// ruvector_graph/executor/mod.rs
pub mod cache;
17pub mod operators;
18pub mod parallel;
19pub mod pipeline;
20pub mod plan;
21pub mod stats;
22
23pub use cache::{CacheConfig, CacheEntry, QueryCache};
24pub use operators::{
25 Aggregate, AggregateFunction, EdgeScan, Filter, HyperedgeScan, Join, JoinType, Limit, NodeScan,
26 Operator, Project, ScanMode, Sort,
27};
28pub use parallel::{ParallelConfig, ParallelExecutor};
29pub use pipeline::{ExecutionContext, Pipeline, RowBatch};
30pub use plan::{LogicalPlan, PhysicalPlan, PlanNode};
31pub use stats::{ColumnStats, Histogram, Statistics, TableStats};
32
33use std::error::Error;
34use std::fmt;
35use std::sync::Arc;
36
/// Errors that can occur while planning or executing a query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionError {
    /// The logical or physical plan is malformed or unsupported.
    InvalidPlan(String),
    /// An operator failed while processing input.
    OperatorError(String),
    /// A value had an unexpected type for the requested operation.
    TypeMismatch(String),
    /// A memory, thread, or other resource limit was exceeded.
    ResourceExhausted(String),
    /// An internal invariant was violated; indicates a bug in the executor.
    Internal(String),
}

impl fmt::Display for ExecutionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidPlan(msg) => write!(f, "Invalid plan: {}", msg),
            Self::OperatorError(msg) => write!(f, "Operator error: {}", msg),
            Self::TypeMismatch(msg) => write!(f, "Type mismatch: {}", msg),
            Self::ResourceExhausted(msg) => write!(f, "Resource exhausted: {}", msg),
            Self::Internal(msg) => write!(f, "Internal error: {}", msg),
        }
    }
}

impl Error for ExecutionError {}

/// Convenience alias for results produced by the executor.
pub type Result<T> = std::result::Result<T, ExecutionError>;
67
/// Executes logical query plans with result caching, cost statistics,
/// and optional parallel execution.
pub struct QueryExecutor {
    // Shared cache of previously computed query results, keyed by plan cache key.
    cache: Arc<QueryCache>,
    // Statistics consulted during logical-to-physical optimization.
    stats: Arc<Statistics>,
    // Controls whether and how eligible plans run in parallel.
    parallel_config: ParallelConfig,
}
77
78impl QueryExecutor {
79 pub fn new() -> Self {
81 Self {
82 cache: Arc::new(QueryCache::new(CacheConfig::default())),
83 stats: Arc::new(Statistics::new()),
84 parallel_config: ParallelConfig::default(),
85 }
86 }
87
88 pub fn with_config(cache_config: CacheConfig, parallel_config: ParallelConfig) -> Self {
90 Self {
91 cache: Arc::new(QueryCache::new(cache_config)),
92 stats: Arc::new(Statistics::new()),
93 parallel_config,
94 }
95 }
96
97 pub fn execute(&self, plan: &LogicalPlan) -> Result<Vec<RowBatch>> {
99 let cache_key = plan.cache_key();
101 if let Some(cached) = self.cache.get(&cache_key) {
102 return Ok(cached.results.clone());
103 }
104
105 let physical_plan = self.optimize(plan)?;
107
108 let results = if self.parallel_config.enabled && plan.is_parallelizable() {
110 self.execute_parallel(&physical_plan)?
111 } else {
112 self.execute_sequential(&physical_plan)?
113 };
114
115 self.cache.insert(cache_key, results.clone());
117
118 Ok(results)
119 }
120
121 fn optimize(&self, plan: &LogicalPlan) -> Result<PhysicalPlan> {
123 let physical = PhysicalPlan::from_logical(plan, &self.stats)?;
125 Ok(physical)
126 }
127
128 fn execute_sequential(&self, _plan: &PhysicalPlan) -> Result<Vec<RowBatch>> {
130 Ok(Vec::new())
133 }
134
135 fn execute_parallel(&self, plan: &PhysicalPlan) -> Result<Vec<RowBatch>> {
137 let executor = ParallelExecutor::new(self.parallel_config.clone());
138 executor.execute(plan)
139 }
140
141 pub fn stats(&self) -> Arc<Statistics> {
143 Arc::clone(&self.stats)
144 }
145
146 pub fn clear_cache(&self) {
148 self.cache.clear();
149 }
150}
151
152impl Default for QueryExecutor {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_executor_creation() {
        // A freshly built executor must start with no collected statistics.
        let exec = QueryExecutor::new();
        assert!(exec.stats().is_empty());
    }

    #[test]
    fn test_executor_with_config() {
        let cache = CacheConfig {
            ttl_seconds: 300,
            max_entries: 100,
            max_memory_bytes: 1024 * 1024,
        };
        let parallel = ParallelConfig {
            batch_size: 1000,
            num_threads: 4,
            enabled: true,
        };
        // Custom configuration must not seed any statistics either.
        let exec = QueryExecutor::with_config(cache, parallel);
        assert!(exec.stats().is_empty());
    }
}