pub mod cache;
pub mod operators;
pub mod parallel;
pub mod pipeline;
pub mod plan;
pub mod stats;
pub use cache::{CacheConfig, CacheEntry, QueryCache};
pub use operators::{
Aggregate, AggregateFunction, EdgeScan, Filter, HyperedgeScan, Join, JoinType, Limit, NodeScan,
Operator, Project, ScanMode, Sort,
};
pub use parallel::{ParallelConfig, ParallelExecutor};
pub use pipeline::{ExecutionContext, Pipeline, RowBatch};
pub use plan::{LogicalPlan, PhysicalPlan, PlanNode};
pub use stats::{ColumnStats, Histogram, Statistics, TableStats};
use std::error::Error;
use std::fmt;
use std::sync::Arc;
/// Errors that can occur while planning or executing a query.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare errors
/// directly instead of matching on their `Display` strings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionError {
    /// The logical plan was invalid or could not be lowered.
    InvalidPlan(String),
    /// An operator reported a failure during execution.
    OperatorError(String),
    /// A type mismatch was encountered between values.
    TypeMismatch(String),
    /// A resource budget (memory, threads, …) was exhausted.
    ResourceExhausted(String),
    /// An internal invariant was violated; indicates a bug in the executor.
    Internal(String),
}
impl fmt::Display for ExecutionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutionError::InvalidPlan(msg) => write!(f, "Invalid plan: {}", msg),
ExecutionError::OperatorError(msg) => write!(f, "Operator error: {}", msg),
ExecutionError::TypeMismatch(msg) => write!(f, "Type mismatch: {}", msg),
ExecutionError::ResourceExhausted(msg) => write!(f, "Resource exhausted: {}", msg),
ExecutionError::Internal(msg) => write!(f, "Internal error: {}", msg),
}
}
}
/// Marker impl so `ExecutionError` can be boxed as `dyn Error` and used with `?`.
impl Error for ExecutionError {}
/// Convenience alias: every fallible executor operation fails with [`ExecutionError`].
pub type Result<T> = std::result::Result<T, ExecutionError>;
/// Executes logical query plans, with result caching and optional
/// parallel execution.
pub struct QueryExecutor {
    // Cache of previously-computed results, keyed by the plan's cache key.
    cache: Arc<QueryCache>,
    // Statistics consumed when lowering a logical plan to a physical plan.
    stats: Arc<Statistics>,
    // Governs whether the parallel execution path is taken and how it runs.
    parallel_config: ParallelConfig,
}
impl QueryExecutor {
    /// Creates an executor with default cache and parallelism settings.
    pub fn new() -> Self {
        Self {
            cache: Arc::new(QueryCache::new(CacheConfig::default())),
            stats: Arc::new(Statistics::new()),
            parallel_config: ParallelConfig::default(),
        }
    }

    /// Creates an executor with explicit cache and parallelism settings.
    pub fn with_config(cache_config: CacheConfig, parallel_config: ParallelConfig) -> Self {
        Self {
            cache: Arc::new(QueryCache::new(cache_config)),
            stats: Arc::new(Statistics::new()),
            parallel_config,
        }
    }

    /// Executes a logical plan and returns the produced row batches.
    ///
    /// The result is served from the query cache when an entry for the
    /// plan's cache key exists. Otherwise the plan is lowered to a physical
    /// plan, executed — in parallel when parallelism is enabled and the
    /// plan reports itself parallelizable — and the results are cached
    /// before being returned.
    ///
    /// # Errors
    ///
    /// Propagates any [`ExecutionError`] raised during optimization or
    /// execution.
    pub fn execute(&self, plan: &LogicalPlan) -> Result<Vec<RowBatch>> {
        let cache_key = plan.cache_key();
        if let Some(cached) = self.cache.get(&cache_key) {
            return Ok(cached.results.clone());
        }
        let physical_plan = self.optimize(plan)?;
        let results = if self.parallel_config.enabled && plan.is_parallelizable() {
            self.execute_parallel(&physical_plan)?
        } else {
            self.execute_sequential(&physical_plan)?
        };
        // Cache a copy so an identical plan can skip execution next time.
        self.cache.insert(cache_key, results.clone());
        Ok(results)
    }

    /// Lowers a logical plan to a physical plan using collected statistics.
    fn optimize(&self, plan: &LogicalPlan) -> Result<PhysicalPlan> {
        // `?` + `Ok` kept so any error-type conversion `?` performs is preserved.
        Ok(PhysicalPlan::from_logical(plan, &self.stats)?)
    }

    /// Single-threaded execution path.
    // TODO(review): stub — currently produces no batches. Confirm whether
    // sequential execution is implemented elsewhere or still pending.
    fn execute_sequential(&self, _plan: &PhysicalPlan) -> Result<Vec<RowBatch>> {
        Ok(Vec::new())
    }

    /// Multi-threaded execution path: delegates to a [`ParallelExecutor`]
    /// built from this executor's parallel configuration.
    fn execute_parallel(&self, plan: &PhysicalPlan) -> Result<Vec<RowBatch>> {
        let executor = ParallelExecutor::new(self.parallel_config.clone());
        executor.execute(plan)
    }

    /// Returns a shared handle to this executor's statistics.
    pub fn stats(&self) -> Arc<Statistics> {
        Arc::clone(&self.stats)
    }

    /// Drops all cached query results.
    pub fn clear_cache(&self) {
        self.cache.clear();
    }
}
impl Default for QueryExecutor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly-built executor starts with empty statistics.
    #[test]
    fn test_executor_creation() {
        let exec = QueryExecutor::new();
        assert!(exec.stats().is_empty());
    }

    /// Explicit cache/parallel settings still yield empty initial statistics.
    #[test]
    fn test_executor_with_config() {
        let executor = QueryExecutor::with_config(
            CacheConfig {
                max_entries: 100,
                max_memory_bytes: 1024 * 1024,
                ttl_seconds: 300,
            },
            ParallelConfig {
                enabled: true,
                num_threads: 4,
                batch_size: 1000,
            },
        );
        assert!(executor.stats().is_empty());
    }
}