use crate::error::{FusekiError, FusekiResult};
use crate::store::Store;
use scirs2_core::metrics::{Counter, Histogram, Timer};
use scirs2_core::parallel_ops::{par_chunks, par_join};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Notify, RwLock, Semaphore};
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument, warn};
use uuid::Uuid;
/// Configuration knobs for the query batching subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
    /// Master switch; when `false`, submitted queries bypass batching
    /// and run immediately.
    pub enabled: bool,
    /// Hard upper bound on the number of queries in one batch.
    pub max_batch_size: usize,
    /// Minimum number of queries before a batch may be flushed on age.
    pub min_batch_size: usize,
    /// Maximum time a partially-filled batch may wait before flushing.
    pub max_wait_time_ms: u64,
    /// Whether batch sizing should adapt to load.
    /// NOTE(review): not referenced by the visible executor code —
    /// confirm it is actually wired up somewhere.
    pub adaptive_sizing: bool,
    /// Number of batches allowed to execute concurrently
    /// (semaphore permits in `BatchExecutor`).
    pub max_parallel_batches: usize,
    /// Whether to analyze write/read dependencies between queries
    /// before executing a batch.
    pub analyze_dependencies: bool,
    /// Parallelism bound used when estimating parallel efficiency.
    pub max_parallel_queries: usize,
}
impl Default for BatchConfig {
fn default() -> Self {
BatchConfig {
enabled: true,
max_batch_size: 100,
min_batch_size: 10,
max_wait_time_ms: 100,
adaptive_sizing: true,
max_parallel_batches: 4,
analyze_dependencies: true,
max_parallel_queries: 20,
}
}
}
/// One query submitted for batched execution.
#[derive(Debug, Clone)]
pub struct BatchQuery {
    /// Unique id (UUID v4 string) used to route the result back.
    pub id: String,
    /// Target dataset name; empty string means the default dataset.
    pub dataset: String,
    /// The SPARQL query text.
    pub query: String,
    /// Optional id of the submitting user, for attribution.
    pub user_id: Option<String>,
    /// When the query entered the system (used for wait-time tracking).
    pub submitted_at: Instant,
    /// Per-query timeout budget.
    /// NOTE(review): `submit_query` currently waits a fixed 60s rather
    /// than honoring this value — confirm intended semantics.
    pub timeout: Duration,
    /// Free-form key/value metadata attached by the caller.
    pub metadata: HashMap<String, String>,
}
impl BatchQuery {
    /// Creates a query with a fresh UUID, a 30-second timeout budget,
    /// no user attribution, and empty metadata.
    pub fn new(query: String, dataset: String) -> Self {
        Self {
            id: Uuid::new_v4().to_string(),
            query,
            dataset,
            user_id: None,
            submitted_at: Instant::now(),
            timeout: Duration::from_secs(30),
            metadata: HashMap::new(),
        }
    }
}
/// Outcome of one batched query, delivered back to the submitter.
#[derive(Debug, Clone)]
pub struct BatchQueryResult {
    /// Id of the `BatchQuery` this result belongs to.
    pub query_id: String,
    /// Whether the query executed without error.
    pub success: bool,
    /// JSON-serialized result summary when `success` is true.
    pub result: Option<String>,
    /// Error description when `success` is false.
    pub error: Option<String>,
    /// Wall-clock time spent executing this query.
    pub execution_time: Duration,
}
/// A group of queries accumulated for one dataset, executed together.
#[derive(Debug)]
pub struct QueryBatch {
    /// Unique batch id (UUID v4 string), used for logging.
    pub id: String,
    /// The queries collected so far.
    pub queries: Vec<BatchQuery>,
    /// Creation time; drives the age-based flush in the executor.
    pub created_at: Instant,
    /// Dataset all queries in this batch target.
    pub dataset: String,
}
impl QueryBatch {
    /// Starts an empty batch for `dataset` with a fresh UUID.
    pub fn new(dataset: String) -> Self {
        Self {
            id: Uuid::new_v4().to_string(),
            queries: Vec::new(),
            created_at: Instant::now(),
            dataset,
        }
    }

    /// Appends a query to this batch.
    pub fn add_query(&mut self, query: BatchQuery) {
        self.queries.push(query);
    }

    /// True once the batch holds at least `max_size` queries.
    pub fn is_full(&self, max_size: usize) -> bool {
        max_size <= self.queries.len()
    }

    /// Number of queries currently in the batch.
    pub fn size(&self) -> usize {
        self.queries.len()
    }

    /// How long ago this batch was created.
    pub fn age(&self) -> Duration {
        Instant::now().duration_since(self.created_at)
    }
}
/// Aggregate batching statistics, exposed via `BatchExecutor::get_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct BatchStats {
    /// Total batches executed since startup.
    pub total_batches: u64,
    /// Total queries submitted since startup.
    pub total_queries: u64,
    /// Mean queries per batch (total_queries / total_batches).
    pub average_batch_size: f64,
    /// Mean queue wait time in milliseconds.
    /// NOTE(review): never updated in the visible code — confirm.
    pub average_wait_time_ms: f64,
    /// Exponential moving average of batch execution time (ms).
    pub average_execution_time_ms: f64,
    /// Heuristic speedup ratio relative to an estimated sequential run.
    pub parallel_efficiency: f64,
    /// Throughput of the most recent batch (queries / second).
    pub queries_per_second: f64,
}
/// Batches incoming queries per dataset and executes them in parallel,
/// delivering results to submitters through per-query oneshot channels.
pub struct BatchExecutor {
    /// Batching behaviour configuration.
    config: BatchConfig,
    /// Shared store the queries run against.
    store: Arc<Store>,
    /// The currently accumulating batch for each dataset name.
    dataset_batches: Arc<RwLock<HashMap<String, Arc<RwLock<QueryBatch>>>>>,
    /// Queries submitted but not yet sorted into a dataset batch.
    pending_queries: Arc<RwLock<VecDeque<BatchQuery>>>,
    /// NOTE(review): never written to in the visible code — confirm
    /// whether per-batch task handles were meant to be tracked here.
    active_batches: Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
    /// Per-query oneshot senders used to hand results back to submitters.
    result_channels: Arc<RwLock<HashMap<String, tokio::sync::oneshot::Sender<BatchQueryResult>>>>,
    /// Aggregate statistics snapshot served by `get_stats`.
    stats: Arc<RwLock<BatchStats>>,
    /// Monotonic count of executed batches.
    total_batches: Arc<AtomicU64>,
    /// Monotonic count of submitted queries.
    total_queries: Arc<AtomicU64>,
    /// Caps the number of batches executing concurrently.
    batch_semaphore: Arc<Semaphore>,
    /// Wakes the background processor when a new query arrives.
    new_query_notify: Arc<Notify>,
    /// Watch channel used to signal the background task to stop.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
}
impl BatchExecutor {
pub fn new(config: BatchConfig, store: Arc<Store>) -> Arc<Self> {
let batch_semaphore = Arc::new(Semaphore::new(config.max_parallel_batches));
let (shutdown_tx, _) = tokio::sync::watch::channel(false);
let executor = Arc::new(BatchExecutor {
config,
store,
dataset_batches: Arc::new(RwLock::new(HashMap::new())),
pending_queries: Arc::new(RwLock::new(VecDeque::new())),
active_batches: Arc::new(RwLock::new(HashMap::new())),
result_channels: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(RwLock::new(BatchStats {
total_batches: 0,
total_queries: 0,
average_batch_size: 0.0,
average_wait_time_ms: 0.0,
average_execution_time_ms: 0.0,
parallel_efficiency: 1.0,
queries_per_second: 0.0,
})),
total_batches: Arc::new(AtomicU64::new(0)),
total_queries: Arc::new(AtomicU64::new(0)),
batch_semaphore,
new_query_notify: Arc::new(Notify::new()),
shutdown: Arc::new(shutdown_tx),
});
executor.clone().start_batch_processor();
info!(
"Batch executor initialized with max {} queries per batch",
executor.config.max_batch_size
);
executor
}
/// Submits a query for batched execution and awaits its result.
///
/// When batching is disabled the query runs immediately. Otherwise the
/// query is queued, the background processor is notified, and this call
/// waits (up to 60s) on a oneshot channel for the batch to deliver the
/// result.
///
/// Fix: on timeout, the orphaned oneshot sender is now removed from
/// `result_channels`; previously it was leaked, so the map grew without
/// bound under sustained timeouts.
///
/// # Errors
/// Returns `request_timeout` if no result arrives within 60 seconds, or
/// `server_error` if the result channel is dropped by the executor.
#[instrument(skip(self, query))]
pub async fn submit_query(&self, query: BatchQuery) -> FusekiResult<BatchQueryResult> {
    if !self.config.enabled {
        return self.execute_single_query(query).await;
    }
    let (tx, rx) = tokio::sync::oneshot::channel();
    let query_id = query.id.clone();
    {
        // Register the result channel before queueing, so the batch
        // processor can never finish the query before we can receive it.
        let mut channels = self.result_channels.write().await;
        channels.insert(query_id.clone(), tx);
    }
    {
        let mut pending = self.pending_queries.write().await;
        pending.push_back(query);
    }
    self.new_query_notify.notify_one();
    self.total_queries.fetch_add(1, Ordering::Relaxed);
    // 60s is a batch-level grace budget, deliberately larger than the
    // default per-query timeout.
    match tokio::time::timeout(Duration::from_secs(60), rx).await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(_)) => Err(FusekiError::server_error("Result channel closed")),
        Err(_) => {
            // Clean up the sender we registered, otherwise it leaks.
            self.result_channels.write().await.remove(&query_id);
            Err(FusekiError::request_timeout("Batch execution timeout"))
        }
    }
}
/// Spawns the long-lived background task that drives batching.
///
/// The task wakes on three events: the shutdown flag changing, a 10ms
/// ticker (which both drains pending queries and flushes aged batches),
/// and a notification that a new query was submitted.
fn start_batch_processor(self: Arc<Self>) {
    tokio::spawn(async move {
        let mut shutdown_rx = self.shutdown.subscribe();
        // Fixed 10ms cadence; doubles as the timeout-flush heartbeat.
        let mut ticker = tokio::time::interval(Duration::from_millis(10));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    // Exit only if the flag actually flipped to true.
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                _ = ticker.tick() => {
                    self.process_pending_queries().await;
                    self.check_batch_timeouts().await;
                }
                _ = self.new_query_notify.notified() => {
                    self.process_pending_queries().await;
                }
            }
        }
    });
}
/// Drains the pending-query queue into per-dataset batches, executing
/// any batch that becomes full, or that holds at least `min_batch_size`
/// queries and has aged past `max_wait_time_ms`.
///
/// NOTE(review): both the pending queue and batch map write guards are
/// held across the `execute_batch_impl` await, so executing one batch
/// blocks intake of new queries for its duration — confirm this is
/// acceptable under load.
async fn process_pending_queries(&self) {
    let mut pending = self.pending_queries.write().await;
    #[allow(unused_mut)]
    let mut batches = self.dataset_batches.write().await;
    while let Some(query) = pending.pop_front() {
        let dataset = query.dataset.clone();
        // Lazily create this dataset's batch on first use.
        let batch = batches
            .entry(dataset.clone())
            .or_insert_with(|| Arc::new(RwLock::new(QueryBatch::new(dataset.clone()))));
        let mut batch_lock = batch.write().await;
        batch_lock.add_query(query);
        if batch_lock.is_full(self.config.max_batch_size)
            || (batch_lock.size() >= self.config.min_batch_size
                && batch_lock.age() >= Duration::from_millis(self.config.max_wait_time_ms))
        {
            // Swap in a fresh empty batch and execute the full one.
            let executing_batch =
                std::mem::replace(&mut *batch_lock, QueryBatch::new(dataset.clone()));
            drop(batch_lock);
            self.execute_batch_impl(executing_batch).await;
        }
    }
}
/// Flushes batches that hold enough queries and have waited longer than
/// `max_wait_time_ms`, even when no new query arrives to trigger them.
///
/// Candidates are located with `try_read` — a batch currently
/// write-locked is simply skipped until the next tick — then the batch
/// is re-checked under the write lock before being swapped out.
async fn check_batch_timeouts(&self) {
    #[allow(unused_mut)]
    let mut batches = self.dataset_batches.write().await;
    let max_wait = Duration::from_millis(self.config.max_wait_time_ms);
    let datasets_to_execute: Vec<String> = batches
        .iter()
        .filter_map(|(dataset, batch_arc)| {
            if let Ok(batch) = batch_arc.try_read() {
                if batch.size() >= self.config.min_batch_size && batch.age() >= max_wait {
                    return Some(dataset.clone());
                }
            }
            None
        })
        .collect();
    for dataset in datasets_to_execute {
        if let Some(batch_arc) = batches.get(&dataset) {
            let mut batch_lock = batch_arc.write().await;
            // Re-check size under the write lock; the batch may have
            // been flushed between the scan and now.
            if batch_lock.size() > 0 {
                let executing_batch =
                    std::mem::replace(&mut *batch_lock, QueryBatch::new(dataset.clone()));
                drop(batch_lock);
                self.execute_batch_impl(executing_batch).await;
            }
        }
    }
}
/// Executes one batch end-to-end: acquires a concurrency permit, runs
/// the queries (dependency-aware or fully parallel per config), routes
/// each result back to its waiting submitter, and updates statistics.
///
/// Fix: removed a dead `chunk_size` computation that was assigned but
/// never read.
#[instrument(skip(self, batch))]
async fn execute_batch_impl(&self, batch: QueryBatch) {
    let batch_id = batch.id.clone();
    let batch_size = batch.size();
    debug!("Executing batch {} with {} queries", batch_id, batch_size);
    // Bound the number of batches executing concurrently; the permit is
    // held (via `_permit`) until this function returns.
    let _permit = self
        .batch_semaphore
        .acquire()
        .await
        .expect("semaphore should not be closed");
    self.total_batches.fetch_add(1, Ordering::Relaxed);
    let batch_start = Instant::now();
    let queries = batch.queries;
    let results: Vec<BatchQueryResult> = if self.config.analyze_dependencies {
        self.execute_with_dependencies(queries).await
    } else {
        self.execute_parallel(queries).await
    };
    let batch_duration = batch_start.elapsed();
    // Deliver each result to the submitter still waiting on its oneshot;
    // a failed send just means the submitter already timed out.
    for result in results {
        let query_id = result.query_id.clone();
        if let Some(tx) = self.result_channels.write().await.remove(&query_id) {
            let _ = tx.send(result);
        }
    }
    info!(
        "Batch {} completed in {:.2}ms ({} queries, {:.2} q/s)",
        batch_id,
        batch_duration.as_millis(),
        batch_size,
        (batch_size as f64) / batch_duration.as_secs_f64()
    );
    self.update_batch_stats(batch_size, batch_duration).await;
}
async fn execute_parallel(&self, queries: Vec<BatchQuery>) -> Vec<BatchQueryResult> {
let store = self.store.clone();
let tasks: Vec<_> = queries
.into_iter()
.map(|query| {
let store_clone = store.clone();
tokio::spawn(async move {
Self::execute_query_impl_with_store(query, store_clone).await
})
})
.collect();
let mut results = Vec::new();
for task in tasks {
if let Ok(result) = task.await {
results.push(result);
}
}
results
}
/// Executes a batch while respecting write-before-read ordering between
/// its queries: builds a dependency graph, derives a topological order,
/// and runs the queries in dependency waves.
///
/// NOTE(review): `execution_order` is computed and passed through, but
/// the wave scheduler re-derives ordering from the graph itself —
/// confirm whether the precomputed order is still needed.
async fn execute_with_dependencies(&self, queries: Vec<BatchQuery>) -> Vec<BatchQueryResult> {
    if queries.is_empty() {
        return Vec::new();
    }
    let dependency_graph = self.build_dependency_graph(&queries);
    let execution_order = self.topological_sort(&dependency_graph, queries.len());
    self.execute_in_dependency_order(queries, execution_order, &dependency_graph)
        .await
}
/// Builds a directed dependency graph over the queries: an edge i -> j
/// means query j must not start before query i completes.
fn build_dependency_graph(&self, queries: &[BatchQuery]) -> Vec<Vec<usize>> {
    let n = queries.len();
    let mut graph = vec![Vec::new(); n];
    for (i, first) in queries.iter().enumerate() {
        for (offset, second) in queries[i + 1..].iter().enumerate() {
            let j = i + 1 + offset;
            if self.has_dependency(first, second) {
                graph[i].push(j);
            } else if self.has_dependency(second, first) {
                graph[j].push(i);
            }
        }
    }
    graph
}
/// Heuristically decides whether `query2` must wait for `query1`.
///
/// Treats a write (insert/delete/update) followed by a read (select),
/// or two mutations (insert/delete), as conflicting when the queries
/// appear to reference the same graphs.
fn has_dependency(&self, query1: &BatchQuery, query2: &BatchQuery) -> bool {
    let q1 = query1.query.to_lowercase();
    let q2 = query2.query.to_lowercase();
    let is_write =
        |q: &str| q.contains("insert") || q.contains("delete") || q.contains("update");
    let is_mutation = |q: &str| q.contains("insert") || q.contains("delete");
    // Write followed by read on overlapping data.
    if is_write(&q1) && q2.contains("select") {
        return self.queries_reference_same_data(&q1, &q2);
    }
    // Two mutations (insert/delete only, matching the original rule).
    if is_mutation(&q1) && is_mutation(&q2) {
        return self.queries_reference_same_data(&q1, &q2);
    }
    false
}
/// Returns true when the two (lowercased) queries appear to touch
/// overlapping data.
///
/// Conservative: if either query names no explicit graph, overlap is
/// assumed.
fn queries_reference_same_data(&self, query1: &str, query2: &str) -> bool {
    let graphs1 = self.extract_graph_references(query1);
    let graphs2 = self.extract_graph_references(query2);
    if graphs1.is_empty() || graphs2.is_empty() {
        return true;
    }
    graphs1.iter().any(|g| graphs2.contains(g))
}
fn extract_graph_references(&self, query: &str) -> Vec<String> {
let mut graphs = Vec::new();
let mut current = query;
while let Some(pos) = current.find("graph") {
let after = ¤t[pos + 5..];
if let Some(start) = after.find('<') {
if let Some(end) = after[start..].find('>') {
let graph_iri = after[start + 1..start + end].to_string();
graphs.push(graph_iri);
current = &after[start + end..];
continue;
}
}
break;
}
graphs
}
/// Kahn's algorithm over the dependency graph, processed in wavefronts
/// so nodes freed in the same pass stay grouped together.
///
/// Falls back to the natural order `0..n` when a cycle prevents a full
/// topological ordering.
fn topological_sort(&self, graph: &[Vec<usize>], n: usize) -> Vec<usize> {
    // Count incoming edges per node.
    let mut indegree = vec![0; n];
    for targets in graph {
        for &t in targets {
            indegree[t] += 1;
        }
    }
    let mut order = Vec::new();
    // Seed the frontier with all nodes that have no prerequisites.
    let mut frontier: Vec<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    while !frontier.is_empty() {
        let wave = std::mem::take(&mut frontier);
        order.extend(&wave);
        for &node in &wave {
            for &succ in &graph[node] {
                indegree[succ] -= 1;
                if indegree[succ] == 0 {
                    frontier.push(succ);
                }
            }
        }
    }
    if order.len() == n {
        order
    } else {
        // Cycle detected: fall back to submission order.
        (0..n).collect()
    }
}
async fn execute_in_dependency_order(
&self,
queries: Vec<BatchQuery>,
execution_order: Vec<usize>,
graph: &[Vec<usize>],
) -> Vec<BatchQueryResult> {
let mut results = vec![None; queries.len()];
let mut in_degree = vec![0; queries.len()];
for edges in graph {
for &dest in edges {
in_degree[dest] += 1;
}
}
let mut completed = vec![false; queries.len()];
let mut queue: Vec<usize> = (0..queries.len()).filter(|&i| in_degree[i] == 0).collect();
while !queue.is_empty() {
let batch_queries: Vec<BatchQuery> =
queue.iter().map(|&idx| queries[idx].clone()).collect();
let batch_results = self.execute_parallel(batch_queries).await;
for (i, &idx) in queue.iter().enumerate() {
results[idx] = Some(batch_results[i].clone());
completed[idx] = true;
}
let current_batch = queue.clone();
queue.clear();
for &node in ¤t_batch {
for &neighbor in &graph[node] {
in_degree[neighbor] -= 1;
if in_degree[neighbor] == 0 && !completed[neighbor] {
queue.push(neighbor);
}
}
}
}
results.into_iter().flatten().collect()
}
/// Runs a single query against the store and packages the outcome as a
/// `BatchQueryResult`, capturing wall-clock execution time.
async fn execute_query_impl_with_store(
    query: BatchQuery,
    store: Arc<Store>,
) -> BatchQueryResult {
    let start = Instant::now();
    // An empty dataset name means "use the default dataset".
    let dataset_name = (!query.dataset.is_empty()).then(|| query.dataset.as_str());
    match store.query_dataset(&query.query, dataset_name) {
        Ok(query_result) => {
            // Summarize the result as JSON, keyed by result form.
            let result_json = match &query_result.inner {
                oxirs_core::query::QueryResult::Select {
                    variables,
                    bindings,
                } => serde_json::json!({
                    "type": "select",
                    "variables": variables,
                    "bindings": bindings,
                    "count": bindings.len()
                }),
                oxirs_core::query::QueryResult::Ask(boolean) => serde_json::json!({
                    "type": "ask",
                    "boolean": boolean
                }),
                oxirs_core::query::QueryResult::Construct(graph) => serde_json::json!({
                    "type": "construct",
                    "triples": graph.len()
                }),
            };
            BatchQueryResult {
                query_id: query.id,
                success: true,
                result: Some(result_json.to_string()),
                error: None,
                execution_time: start.elapsed(),
            }
        }
        Err(e) => {
            let execution_time = start.elapsed();
            warn!("Query execution failed for {}: {}", query.id, e);
            BatchQueryResult {
                query_id: query.id,
                success: false,
                result: None,
                error: Some(e.to_string()),
                execution_time,
            }
        }
    }
}
async fn execute_single_query(&self, query: BatchQuery) -> FusekiResult<BatchQueryResult> {
Ok(Self::execute_query_impl_with_store(query, self.store.clone()).await)
}
/// Folds one completed batch into the aggregate statistics snapshot.
///
/// Execution time is tracked as an exponential moving average. The
/// parallel-efficiency figure assumes a nominal 10ms per query run
/// sequentially — a rough heuristic, not a measurement.
async fn update_batch_stats(&self, batch_size: usize, duration: Duration) {
    let mut stats = self.stats.write().await;
    let total_batches = self.total_batches.load(Ordering::Relaxed);
    let total_queries = self.total_queries.load(Ordering::Relaxed);
    stats.total_batches = total_batches;
    stats.total_queries = total_queries;
    if total_batches > 0 {
        stats.average_batch_size = (total_queries as f64) / (total_batches as f64);
    }
    // EMA with smoothing 0.1: history dominates each new sample.
    let smoothing = 0.1;
    let sample_ms = duration.as_millis() as f64;
    stats.average_execution_time_ms =
        smoothing * sample_ms + (1.0 - smoothing) * stats.average_execution_time_ms;
    // Heuristic sequential baseline: 10ms per query.
    let est_sequential_ms = batch_size as f64 * 10.0;
    let actual_ms = duration.as_millis() as f64;
    if actual_ms > 0.0 {
        stats.parallel_efficiency = (est_sequential_ms / actual_ms)
            / (batch_size as f64).min(self.config.max_parallel_queries as f64);
    }
    if duration.as_secs_f64() > 0.0 {
        stats.queries_per_second = (batch_size as f64) / duration.as_secs_f64();
    }
}
/// Returns a point-in-time copy of the aggregate batching statistics.
pub async fn get_stats(&self) -> BatchStats {
    let guard = self.stats.read().await;
    guard.clone()
}
/// Signals the background processor to stop, then allows a short grace
/// period for it to observe the signal.
///
/// NOTE(review): the 100ms sleep is best-effort — the spawned task's
/// JoinHandle is not retained, so completion is not actually awaited
/// and in-flight batches may still be running when this returns.
pub async fn shutdown(&self) {
    info!("Shutting down batch executor");
    let _ = self.shutdown.send(true);
    tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh in-memory store for each test; panics if creation fails.
    fn create_test_store() -> Arc<Store> {
        Arc::new(Store::new().expect("Failed to create test store"))
    }

    // A new executor should start with zeroed statistics.
    #[tokio::test]
    async fn test_batch_executor_creation() {
        let config = BatchConfig::default();
        let store = create_test_store();
        let executor = BatchExecutor::new(config, store);
        let stats = executor.get_stats().await;
        assert_eq!(stats.total_batches, 0);
        assert_eq!(stats.total_queries, 0);
    }

    // With batching disabled, submit_query should take the immediate
    // single-query path and return a result (success or error).
    #[tokio::test]
    async fn test_single_query_execution() {
        let config = BatchConfig {
            enabled: false,
            ..Default::default()
        };
        let store = create_test_store();
        let dataset_config = crate::config::DatasetConfig {
            name: "test".to_string(),
            location: "/tmp/test".to_string(),
            read_only: false,
            text_index: None,
            shacl_shapes: Vec::new(),
            services: Vec::new(),
            access_control: None,
            backup: None,
        };
        let _ = store.create_dataset("test", dataset_config);
        let executor = BatchExecutor::new(config, store);
        let query = BatchQuery::new(
            "SELECT * WHERE { ?s ?p ?o }".to_string(),
            "test".to_string(),
        );
        let result = executor.submit_query(query).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        // Either outcome is acceptable; we only require a definite answer.
        assert!(result.success || result.error.is_some());
    }

    // Five concurrent submissions should all complete through the
    // batching path (max_batch_size 5 forces them into one batch).
    #[tokio::test]
    async fn test_batched_execution() {
        let config = BatchConfig {
            enabled: true,
            max_batch_size: 5,
            min_batch_size: 2,
            max_wait_time_ms: 50,
            ..Default::default()
        };
        let store = create_test_store();
        let dataset_config = crate::config::DatasetConfig {
            name: "test".to_string(),
            location: "/tmp/test".to_string(),
            read_only: false,
            text_index: None,
            shacl_shapes: Vec::new(),
            services: Vec::new(),
            access_control: None,
            backup: None,
        };
        let _ = store.create_dataset("test", dataset_config);
        let executor = BatchExecutor::new(config, store);
        let mut handles = Vec::new();
        for i in 0..5 {
            let executor = executor.clone();
            let handle = tokio::spawn(async move {
                // The comment suffix makes each query text unique.
                let query = BatchQuery::new(
                    format!("SELECT * WHERE {{ ?s ?p ?o }} # Query {}", i),
                    "test".to_string(),
                );
                executor.submit_query(query).await
            });
            handles.push(handle);
        }
        for handle in handles {
            let result = handle.await.unwrap();
            assert!(result.is_ok());
        }
        let stats = executor.get_stats().await;
        assert!(stats.total_queries >= 5);
    }
}