// ruvector_graph/executor/plan.rs

//! Query execution plan representation
//!
//! Provides logical and physical query plan structures for graph queries
4
5use crate::executor::operators::{AggregateFunction, JoinType, Operator, ScanMode};
6use crate::executor::stats::Statistics;
7use crate::executor::{ExecutionError, Result};
8use ordered_float::OrderedFloat;
9use std::collections::hash_map::DefaultHasher;
10use std::collections::HashMap;
11use std::fmt;
12use std::hash::{Hash, Hasher};
13
14/// Logical query plan (high-level, optimizer input)
15#[derive(Debug, Clone)]
16pub struct LogicalPlan {
17    pub root: PlanNode,
18    pub schema: QuerySchema,
19}
20
21impl LogicalPlan {
22    /// Create a new logical plan
23    pub fn new(root: PlanNode, schema: QuerySchema) -> Self {
24        Self { root, schema }
25    }
26
27    /// Generate cache key for this plan
28    pub fn cache_key(&self) -> String {
29        let mut hasher = DefaultHasher::new();
30        format!("{:?}", self).hash(&mut hasher);
31        format!("plan_{:x}", hasher.finish())
32    }
33
34    /// Check if plan can be parallelized
35    pub fn is_parallelizable(&self) -> bool {
36        self.root.is_parallelizable()
37    }
38
39    /// Estimate output cardinality
40    pub fn estimate_cardinality(&self) -> usize {
41        self.root.estimate_cardinality()
42    }
43}
44
45/// Physical query plan (low-level, executor input)
46pub struct PhysicalPlan {
47    pub operators: Vec<Box<dyn Operator>>,
48    pub pipeline_breakers: Vec<usize>,
49    pub parallelism: usize,
50}
51
52impl fmt::Debug for PhysicalPlan {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("PhysicalPlan")
55            .field("operator_count", &self.operators.len())
56            .field("pipeline_breakers", &self.pipeline_breakers)
57            .field("parallelism", &self.parallelism)
58            .finish()
59    }
60}
61
62impl PhysicalPlan {
63    /// Create physical plan from logical plan
64    pub fn from_logical(logical: &LogicalPlan, stats: &Statistics) -> Result<Self> {
65        let mut operators = Vec::new();
66        let mut pipeline_breakers = Vec::new();
67
68        Self::compile_node(&logical.root, stats, &mut operators, &mut pipeline_breakers)?;
69
70        let parallelism = if logical.is_parallelizable() {
71            num_cpus::get()
72        } else {
73            1
74        };
75
76        Ok(Self {
77            operators,
78            pipeline_breakers,
79            parallelism,
80        })
81    }
82
83    fn compile_node(
84        node: &PlanNode,
85        stats: &Statistics,
86        operators: &mut Vec<Box<dyn Operator>>,
87        pipeline_breakers: &mut Vec<usize>,
88    ) -> Result<()> {
89        match node {
90            PlanNode::NodeScan { mode, filter } => {
91                // Add scan operator
92                operators.push(Box::new(crate::executor::operators::NodeScan::new(
93                    mode.clone(),
94                    filter.clone(),
95                )));
96            }
97            PlanNode::EdgeScan { mode, filter } => {
98                operators.push(Box::new(crate::executor::operators::EdgeScan::new(
99                    mode.clone(),
100                    filter.clone(),
101                )));
102            }
103            PlanNode::Filter { input, predicate } => {
104                Self::compile_node(input, stats, operators, pipeline_breakers)?;
105                operators.push(Box::new(crate::executor::operators::Filter::new(
106                    predicate.clone(),
107                )));
108            }
109            PlanNode::Join {
110                left,
111                right,
112                join_type,
113                on,
114            } => {
115                Self::compile_node(left, stats, operators, pipeline_breakers)?;
116                pipeline_breakers.push(operators.len());
117                Self::compile_node(right, stats, operators, pipeline_breakers)?;
118                operators.push(Box::new(crate::executor::operators::Join::new(
119                    *join_type,
120                    on.clone(),
121                )));
122            }
123            PlanNode::Aggregate {
124                input,
125                group_by,
126                aggregates,
127            } => {
128                Self::compile_node(input, stats, operators, pipeline_breakers)?;
129                pipeline_breakers.push(operators.len());
130                operators.push(Box::new(crate::executor::operators::Aggregate::new(
131                    group_by.clone(),
132                    aggregates.clone(),
133                )));
134            }
135            PlanNode::Sort { input, order_by } => {
136                Self::compile_node(input, stats, operators, pipeline_breakers)?;
137                pipeline_breakers.push(operators.len());
138                operators.push(Box::new(crate::executor::operators::Sort::new(
139                    order_by.clone(),
140                )));
141            }
142            PlanNode::Limit {
143                input,
144                limit,
145                offset,
146            } => {
147                Self::compile_node(input, stats, operators, pipeline_breakers)?;
148                operators.push(Box::new(crate::executor::operators::Limit::new(
149                    *limit, *offset,
150                )));
151            }
152            PlanNode::Project { input, columns } => {
153                Self::compile_node(input, stats, operators, pipeline_breakers)?;
154                operators.push(Box::new(crate::executor::operators::Project::new(
155                    columns.clone(),
156                )));
157            }
158            PlanNode::HyperedgeScan { mode, filter } => {
159                operators.push(Box::new(crate::executor::operators::HyperedgeScan::new(
160                    mode.clone(),
161                    filter.clone(),
162                )));
163            }
164        }
165        Ok(())
166    }
167}
168
169/// Plan node types
170#[derive(Debug, Clone)]
171pub enum PlanNode {
172    /// Sequential or index-based node scan
173    NodeScan {
174        mode: ScanMode,
175        filter: Option<Predicate>,
176    },
177    /// Edge scan
178    EdgeScan {
179        mode: ScanMode,
180        filter: Option<Predicate>,
181    },
182    /// Hyperedge scan
183    HyperedgeScan {
184        mode: ScanMode,
185        filter: Option<Predicate>,
186    },
187    /// Filter rows by predicate
188    Filter {
189        input: Box<PlanNode>,
190        predicate: Predicate,
191    },
192    /// Join two inputs
193    Join {
194        left: Box<PlanNode>,
195        right: Box<PlanNode>,
196        join_type: JoinType,
197        on: Vec<(String, String)>,
198    },
199    /// Aggregate with grouping
200    Aggregate {
201        input: Box<PlanNode>,
202        group_by: Vec<String>,
203        aggregates: Vec<(AggregateFunction, String)>,
204    },
205    /// Sort results
206    Sort {
207        input: Box<PlanNode>,
208        order_by: Vec<(String, SortOrder)>,
209    },
210    /// Limit and offset
211    Limit {
212        input: Box<PlanNode>,
213        limit: usize,
214        offset: usize,
215    },
216    /// Project columns
217    Project {
218        input: Box<PlanNode>,
219        columns: Vec<String>,
220    },
221}
222
223impl PlanNode {
224    /// Check if node can be parallelized
225    pub fn is_parallelizable(&self) -> bool {
226        match self {
227            PlanNode::NodeScan { .. } => true,
228            PlanNode::EdgeScan { .. } => true,
229            PlanNode::HyperedgeScan { .. } => true,
230            PlanNode::Filter { input, .. } => input.is_parallelizable(),
231            PlanNode::Join { .. } => true,
232            PlanNode::Aggregate { .. } => true,
233            PlanNode::Sort { .. } => true,
234            PlanNode::Limit { .. } => false,
235            PlanNode::Project { input, .. } => input.is_parallelizable(),
236        }
237    }
238
239    /// Estimate output cardinality
240    pub fn estimate_cardinality(&self) -> usize {
241        match self {
242            PlanNode::NodeScan { .. } => 1000, // Placeholder
243            PlanNode::EdgeScan { .. } => 5000,
244            PlanNode::HyperedgeScan { .. } => 500,
245            PlanNode::Filter { input, .. } => input.estimate_cardinality() / 10,
246            PlanNode::Join { left, right, .. } => {
247                left.estimate_cardinality() * right.estimate_cardinality() / 100
248            }
249            PlanNode::Aggregate { input, .. } => input.estimate_cardinality() / 20,
250            PlanNode::Sort { input, .. } => input.estimate_cardinality(),
251            PlanNode::Limit { limit, .. } => *limit,
252            PlanNode::Project { input, .. } => input.estimate_cardinality(),
253        }
254    }
255}
256
257/// Query schema definition
258#[derive(Debug, Clone)]
259pub struct QuerySchema {
260    pub columns: Vec<ColumnDef>,
261}
262
263impl QuerySchema {
264    pub fn new(columns: Vec<ColumnDef>) -> Self {
265        Self { columns }
266    }
267}
268
269/// Column definition
270#[derive(Debug, Clone)]
271pub struct ColumnDef {
272    pub name: String,
273    pub data_type: DataType,
274    pub nullable: bool,
275}
276
/// Data types supported in query execution.
///
/// Equality is total for every variant, so the type also implements `Eq`
/// and `Hash` and can be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DataType {
    /// 64-bit signed integer.
    Int64,
    /// 64-bit floating point number.
    Float64,
    /// Text value.
    String,
    /// Boolean value.
    Boolean,
    /// Raw byte sequence.
    Bytes,
    /// List whose elements all have the boxed element type.
    List(Box<DataType>),
}
287
/// Direction in which a sort key is ordered.
///
/// A plain field-less enum, so it also implements `Eq` and `Hash`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SortOrder {
    /// Ascending order.
    Ascending,
    /// Descending order.
    Descending,
}
294
295/// Query predicate for filtering
296#[derive(Debug, Clone)]
297pub enum Predicate {
298    /// column = value
299    Equals(String, Value),
300    /// column != value
301    NotEquals(String, Value),
302    /// column > value
303    GreaterThan(String, Value),
304    /// column >= value
305    GreaterThanOrEqual(String, Value),
306    /// column < value
307    LessThan(String, Value),
308    /// column <= value
309    LessThanOrEqual(String, Value),
310    /// column IN (values)
311    In(String, Vec<Value>),
312    /// column LIKE pattern
313    Like(String, String),
314    /// AND predicates
315    And(Vec<Predicate>),
316    /// OR predicates
317    Or(Vec<Predicate>),
318    /// NOT predicate
319    Not(Box<Predicate>),
320}
321
/// Runtime value.
///
/// Equality is implemented by hand rather than derived: the type is declared
/// `Eq`, which requires `a == a` for every value, and a derived comparison
/// would make `Float64(NaN)` unequal to itself. Here two `Float64` values are
/// equal when the floats compare equal (so `0.0 == -0.0`) or when both are
/// NaN. Values of different variants are never equal.
#[derive(Debug, Clone)]
pub enum Value {
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit floating point number.
    Float64(f64),
    /// Text value.
    String(String),
    /// Boolean value.
    Boolean(bool),
    /// Raw byte sequence.
    Bytes(Vec<u8>),
    /// Absent value.
    Null,
}

impl PartialEq for Value {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Value::Int64(a), Value::Int64(b)) => a == b,
            // Treat every NaN as equal to every other NaN so that equality
            // is reflexive, as `Eq` demands.
            (Value::Float64(a), Value::Float64(b)) => a == b || (a.is_nan() && b.is_nan()),
            (Value::String(a), Value::String(b)) => a == b,
            (Value::Boolean(a), Value::Boolean(b)) => a == b,
            (Value::Bytes(a), Value::Bytes(b)) => a == b,
            (Value::Null, Value::Null) => true,
            // Different variants never compare equal.
            _ => false,
        }
    }
}

// Sound because the `PartialEq` impl above is reflexive, symmetric and
// transitive, including for NaN floats.
impl Eq for Value {}
334
335impl Hash for Value {
336    fn hash<H: Hasher>(&self, state: &mut H) {
337        std::mem::discriminant(self).hash(state);
338        match self {
339            Value::Int64(v) => v.hash(state),
340            Value::Float64(v) => OrderedFloat(*v).hash(state),
341            Value::String(v) => v.hash(state),
342            Value::Boolean(v) => v.hash(state),
343            Value::Bytes(v) => v.hash(state),
344            Value::Null => {}
345        }
346    }
347}
348
349impl Value {
350    /// Compare values for predicate evaluation
351    pub fn compare(&self, other: &Value) -> Option<std::cmp::Ordering> {
352        match (self, other) {
353            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
354            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
355            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
356            (Value::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
357            _ => None,
358        }
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Build a minimal plan: a sequential node scan with a single `id` column.
    fn scan_plan() -> LogicalPlan {
        let schema = QuerySchema::new(vec![ColumnDef {
            name: "id".to_string(),
            data_type: DataType::Int64,
            nullable: false,
        }]);

        LogicalPlan::new(
            PlanNode::NodeScan {
                mode: ScanMode::Sequential,
                filter: None,
            },
            schema,
        )
    }

    #[test]
    fn test_logical_plan_creation() {
        let plan = scan_plan();

        assert!(plan.is_parallelizable());
        assert_eq!(plan.schema.columns.len(), 1);
    }

    #[test]
    fn test_cache_key_is_deterministic() {
        let plan = scan_plan();
        let key = plan.cache_key();

        assert!(key.starts_with("plan_"));
        // An identical plan must map to the same key.
        assert_eq!(key, plan.clone().cache_key());
        assert_eq!(key, scan_plan().cache_key());
    }

    #[test]
    fn test_limit_is_not_parallelizable() {
        let limited = PlanNode::Limit {
            input: Box::new(scan_plan().root),
            limit: 10,
            offset: 0,
        };

        assert!(!limited.is_parallelizable());
        assert_eq!(limited.estimate_cardinality(), 10);
    }

    #[test]
    fn test_value_comparison() {
        let v1 = Value::Int64(42);
        let v2 = Value::Int64(100);

        assert_eq!(v1.compare(&v2), Some(Ordering::Less));
        assert_eq!(v2.compare(&v1), Some(Ordering::Greater));
        assert_eq!(v1.compare(&v1), Some(Ordering::Equal));
    }

    #[test]
    fn test_value_comparison_mismatched_types() {
        let number = Value::Int64(1);
        let text = Value::String("1".to_string());

        // Values of different variants have no defined order.
        assert_eq!(number.compare(&text), None);
        assert_eq!(Value::Null.compare(&Value::Null), None);
        assert_eq!(Value::Float64(f64::NAN).compare(&Value::Float64(1.0)), None);
    }
}