// ruvector_graph/executor/mod.rs

1//! High-performance query execution engine for RuVector graph database
2//!
3//! This module provides a complete query execution system with:
4//! - Logical and physical query plans
5//! - Vectorized operators (scan, filter, join, aggregate)
6//! - Pipeline execution with iterator model
7//! - Parallel execution using rayon
8//! - Query result caching
9//! - Cost-based optimization statistics
10//!
11//! Performance targets:
12//! - 100K+ traversals/second per core
13//! - Sub-millisecond simple lookups
14//! - SIMD-optimized predicate evaluation
15
16pub mod cache;
17pub mod operators;
18pub mod parallel;
19pub mod pipeline;
20pub mod plan;
21pub mod stats;
22
23pub use cache::{CacheConfig, CacheEntry, QueryCache};
24pub use operators::{
25    Aggregate, AggregateFunction, EdgeScan, Filter, HyperedgeScan, Join, JoinType, Limit, NodeScan,
26    Operator, Project, ScanMode, Sort,
27};
28pub use parallel::{ParallelConfig, ParallelExecutor};
29pub use pipeline::{ExecutionContext, Pipeline, RowBatch};
30pub use plan::{LogicalPlan, PhysicalPlan, PlanNode};
31pub use stats::{ColumnStats, Histogram, Statistics, TableStats};
32
33use std::error::Error;
34use std::fmt;
35use std::sync::Arc;
36
/// Query execution error types
///
/// Each variant carries a human-readable message that is surfaced via the
/// `Display` impl below.
#[derive(Debug, Clone)]
pub enum ExecutionError {
    /// The logical or physical query plan is malformed or unsupported
    InvalidPlan(String),
    /// An operator (scan, filter, join, aggregate, ...) failed while running
    OperatorError(String),
    /// Type mismatch in expression evaluation
    TypeMismatch(String),
    /// A resource limit was hit (memory, disk, etc.)
    ResourceExhausted(String),
    /// Internal error: an executor invariant was violated
    Internal(String),
}
51
52impl fmt::Display for ExecutionError {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        match self {
55            ExecutionError::InvalidPlan(msg) => write!(f, "Invalid plan: {}", msg),
56            ExecutionError::OperatorError(msg) => write!(f, "Operator error: {}", msg),
57            ExecutionError::TypeMismatch(msg) => write!(f, "Type mismatch: {}", msg),
58            ExecutionError::ResourceExhausted(msg) => write!(f, "Resource exhausted: {}", msg),
59            ExecutionError::Internal(msg) => write!(f, "Internal error: {}", msg),
60        }
61    }
62}
63
// Marker impl: `Debug` + `Display` are provided above, so the default
// `Error` methods are sufficient.
impl Error for ExecutionError {}

/// Convenience alias used throughout the executor: all fallible operations
/// fail with [`ExecutionError`].
pub type Result<T> = std::result::Result<T, ExecutionError>;
67
/// Query execution engine
///
/// Owns the shared result cache and statistics (both behind `Arc`, so
/// handles can be given out to other components) plus the configuration
/// that decides when a plan is executed in parallel.
pub struct QueryExecutor {
    /// Query result cache, keyed by the plan's `cache_key()`
    cache: Arc<QueryCache>,
    /// Execution statistics, fed into cost-based plan optimization
    stats: Arc<Statistics>,
    /// Parallel execution configuration
    parallel_config: ParallelConfig,
}
77
78impl QueryExecutor {
79    /// Create a new query executor
80    pub fn new() -> Self {
81        Self {
82            cache: Arc::new(QueryCache::new(CacheConfig::default())),
83            stats: Arc::new(Statistics::new()),
84            parallel_config: ParallelConfig::default(),
85        }
86    }
87
88    /// Create executor with custom configuration
89    pub fn with_config(cache_config: CacheConfig, parallel_config: ParallelConfig) -> Self {
90        Self {
91            cache: Arc::new(QueryCache::new(cache_config)),
92            stats: Arc::new(Statistics::new()),
93            parallel_config,
94        }
95    }
96
97    /// Execute a logical plan
98    pub fn execute(&self, plan: &LogicalPlan) -> Result<Vec<RowBatch>> {
99        // Check cache first
100        let cache_key = plan.cache_key();
101        if let Some(cached) = self.cache.get(&cache_key) {
102            return Ok(cached.results.clone());
103        }
104
105        // Optimize logical plan to physical plan
106        let physical_plan = self.optimize(plan)?;
107
108        // Execute physical plan
109        let results = if self.parallel_config.enabled && plan.is_parallelizable() {
110            self.execute_parallel(&physical_plan)?
111        } else {
112            self.execute_sequential(&physical_plan)?
113        };
114
115        // Cache results
116        self.cache.insert(cache_key, results.clone());
117
118        Ok(results)
119    }
120
121    /// Optimize logical plan to physical plan
122    fn optimize(&self, plan: &LogicalPlan) -> Result<PhysicalPlan> {
123        // Cost-based optimization using statistics
124        let physical = PhysicalPlan::from_logical(plan, &self.stats)?;
125        Ok(physical)
126    }
127
128    /// Execute plan sequentially
129    fn execute_sequential(&self, _plan: &PhysicalPlan) -> Result<Vec<RowBatch>> {
130        // Note: In a real implementation, we would need to reconstruct operators
131        // For now, return empty results as placeholder
132        Ok(Vec::new())
133    }
134
135    /// Execute plan in parallel
136    fn execute_parallel(&self, plan: &PhysicalPlan) -> Result<Vec<RowBatch>> {
137        let executor = ParallelExecutor::new(self.parallel_config.clone());
138        executor.execute(plan)
139    }
140
141    /// Get execution statistics
142    pub fn stats(&self) -> Arc<Statistics> {
143        Arc::clone(&self.stats)
144    }
145
146    /// Clear query cache
147    pub fn clear_cache(&self) {
148        self.cache.clear();
149    }
150}
151
// `Default` mirrors `QueryExecutor::new` so the executor can be built via
// `QueryExecutor::default()` or struct-update syntax.
impl Default for QueryExecutor {
    fn default() -> Self {
        Self::new()
    }
}
157
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_executor_creation() {
        // A freshly built executor starts with no recorded statistics.
        let exec = QueryExecutor::new();
        assert!(exec.stats().is_empty());
    }

    #[test]
    fn test_executor_with_config() {
        // Explicit configuration must also yield empty starting statistics.
        let cache = CacheConfig {
            max_entries: 100,
            max_memory_bytes: 1024 * 1024,
            ttl_seconds: 300,
        };
        let parallel = ParallelConfig {
            enabled: true,
            num_threads: 4,
            batch_size: 1000,
        };
        let exec = QueryExecutor::with_config(cache, parallel);
        assert!(exec.stats().is_empty());
    }
}