1use crate::executor::operators::{AggregateFunction, JoinType, Operator, ScanMode};
6use crate::executor::stats::Statistics;
7use crate::executor::{ExecutionError, Result};
8use ordered_float::OrderedFloat;
9use std::collections::hash_map::DefaultHasher;
10use std::collections::HashMap;
11use std::fmt;
12use std::hash::{Hash, Hasher};
13
14#[derive(Debug, Clone)]
16pub struct LogicalPlan {
17 pub root: PlanNode,
18 pub schema: QuerySchema,
19}
20
21impl LogicalPlan {
22 pub fn new(root: PlanNode, schema: QuerySchema) -> Self {
24 Self { root, schema }
25 }
26
27 pub fn cache_key(&self) -> String {
29 let mut hasher = DefaultHasher::new();
30 format!("{:?}", self).hash(&mut hasher);
31 format!("plan_{:x}", hasher.finish())
32 }
33
34 pub fn is_parallelizable(&self) -> bool {
36 self.root.is_parallelizable()
37 }
38
39 pub fn estimate_cardinality(&self) -> usize {
41 self.root.estimate_cardinality()
42 }
43}
44
45pub struct PhysicalPlan {
47 pub operators: Vec<Box<dyn Operator>>,
48 pub pipeline_breakers: Vec<usize>,
49 pub parallelism: usize,
50}
51
52impl fmt::Debug for PhysicalPlan {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("PhysicalPlan")
55 .field("operator_count", &self.operators.len())
56 .field("pipeline_breakers", &self.pipeline_breakers)
57 .field("parallelism", &self.parallelism)
58 .finish()
59 }
60}
61
62impl PhysicalPlan {
63 pub fn from_logical(logical: &LogicalPlan, stats: &Statistics) -> Result<Self> {
65 let mut operators = Vec::new();
66 let mut pipeline_breakers = Vec::new();
67
68 Self::compile_node(&logical.root, stats, &mut operators, &mut pipeline_breakers)?;
69
70 let parallelism = if logical.is_parallelizable() {
71 num_cpus::get()
72 } else {
73 1
74 };
75
76 Ok(Self {
77 operators,
78 pipeline_breakers,
79 parallelism,
80 })
81 }
82
83 fn compile_node(
84 node: &PlanNode,
85 stats: &Statistics,
86 operators: &mut Vec<Box<dyn Operator>>,
87 pipeline_breakers: &mut Vec<usize>,
88 ) -> Result<()> {
89 match node {
90 PlanNode::NodeScan { mode, filter } => {
91 operators.push(Box::new(crate::executor::operators::NodeScan::new(
93 mode.clone(),
94 filter.clone(),
95 )));
96 }
97 PlanNode::EdgeScan { mode, filter } => {
98 operators.push(Box::new(crate::executor::operators::EdgeScan::new(
99 mode.clone(),
100 filter.clone(),
101 )));
102 }
103 PlanNode::Filter { input, predicate } => {
104 Self::compile_node(input, stats, operators, pipeline_breakers)?;
105 operators.push(Box::new(crate::executor::operators::Filter::new(
106 predicate.clone(),
107 )));
108 }
109 PlanNode::Join {
110 left,
111 right,
112 join_type,
113 on,
114 } => {
115 Self::compile_node(left, stats, operators, pipeline_breakers)?;
116 pipeline_breakers.push(operators.len());
117 Self::compile_node(right, stats, operators, pipeline_breakers)?;
118 operators.push(Box::new(crate::executor::operators::Join::new(
119 *join_type,
120 on.clone(),
121 )));
122 }
123 PlanNode::Aggregate {
124 input,
125 group_by,
126 aggregates,
127 } => {
128 Self::compile_node(input, stats, operators, pipeline_breakers)?;
129 pipeline_breakers.push(operators.len());
130 operators.push(Box::new(crate::executor::operators::Aggregate::new(
131 group_by.clone(),
132 aggregates.clone(),
133 )));
134 }
135 PlanNode::Sort { input, order_by } => {
136 Self::compile_node(input, stats, operators, pipeline_breakers)?;
137 pipeline_breakers.push(operators.len());
138 operators.push(Box::new(crate::executor::operators::Sort::new(
139 order_by.clone(),
140 )));
141 }
142 PlanNode::Limit {
143 input,
144 limit,
145 offset,
146 } => {
147 Self::compile_node(input, stats, operators, pipeline_breakers)?;
148 operators.push(Box::new(crate::executor::operators::Limit::new(
149 *limit, *offset,
150 )));
151 }
152 PlanNode::Project { input, columns } => {
153 Self::compile_node(input, stats, operators, pipeline_breakers)?;
154 operators.push(Box::new(crate::executor::operators::Project::new(
155 columns.clone(),
156 )));
157 }
158 PlanNode::HyperedgeScan { mode, filter } => {
159 operators.push(Box::new(crate::executor::operators::HyperedgeScan::new(
160 mode.clone(),
161 filter.clone(),
162 )));
163 }
164 }
165 Ok(())
166 }
167}
168
169#[derive(Debug, Clone)]
171pub enum PlanNode {
172 NodeScan {
174 mode: ScanMode,
175 filter: Option<Predicate>,
176 },
177 EdgeScan {
179 mode: ScanMode,
180 filter: Option<Predicate>,
181 },
182 HyperedgeScan {
184 mode: ScanMode,
185 filter: Option<Predicate>,
186 },
187 Filter {
189 input: Box<PlanNode>,
190 predicate: Predicate,
191 },
192 Join {
194 left: Box<PlanNode>,
195 right: Box<PlanNode>,
196 join_type: JoinType,
197 on: Vec<(String, String)>,
198 },
199 Aggregate {
201 input: Box<PlanNode>,
202 group_by: Vec<String>,
203 aggregates: Vec<(AggregateFunction, String)>,
204 },
205 Sort {
207 input: Box<PlanNode>,
208 order_by: Vec<(String, SortOrder)>,
209 },
210 Limit {
212 input: Box<PlanNode>,
213 limit: usize,
214 offset: usize,
215 },
216 Project {
218 input: Box<PlanNode>,
219 columns: Vec<String>,
220 },
221}
222
223impl PlanNode {
224 pub fn is_parallelizable(&self) -> bool {
226 match self {
227 PlanNode::NodeScan { .. } => true,
228 PlanNode::EdgeScan { .. } => true,
229 PlanNode::HyperedgeScan { .. } => true,
230 PlanNode::Filter { input, .. } => input.is_parallelizable(),
231 PlanNode::Join { .. } => true,
232 PlanNode::Aggregate { .. } => true,
233 PlanNode::Sort { .. } => true,
234 PlanNode::Limit { .. } => false,
235 PlanNode::Project { input, .. } => input.is_parallelizable(),
236 }
237 }
238
239 pub fn estimate_cardinality(&self) -> usize {
241 match self {
242 PlanNode::NodeScan { .. } => 1000, PlanNode::EdgeScan { .. } => 5000,
244 PlanNode::HyperedgeScan { .. } => 500,
245 PlanNode::Filter { input, .. } => input.estimate_cardinality() / 10,
246 PlanNode::Join { left, right, .. } => {
247 left.estimate_cardinality() * right.estimate_cardinality() / 100
248 }
249 PlanNode::Aggregate { input, .. } => input.estimate_cardinality() / 20,
250 PlanNode::Sort { input, .. } => input.estimate_cardinality(),
251 PlanNode::Limit { limit, .. } => *limit,
252 PlanNode::Project { input, .. } => input.estimate_cardinality(),
253 }
254 }
255}
256
257#[derive(Debug, Clone)]
259pub struct QuerySchema {
260 pub columns: Vec<ColumnDef>,
261}
262
263impl QuerySchema {
264 pub fn new(columns: Vec<ColumnDef>) -> Self {
265 Self { columns }
266 }
267}
268
269#[derive(Debug, Clone)]
271pub struct ColumnDef {
272 pub name: String,
273 pub data_type: DataType,
274 pub nullable: bool,
275}
276
/// Column types that a [`ColumnDef`] can declare.
#[derive(Clone, Debug, PartialEq)]
pub enum DataType {
    /// 64-bit signed integer.
    Int64,
    /// 64-bit floating point.
    Float64,
    /// Text.
    String,
    /// Boolean.
    Boolean,
    /// Raw bytes.
    Bytes,
    /// A list type, parameterised by the boxed element type.
    List(Box<DataType>),
}
287
/// Direction for a sort key in a `PlanNode::Sort`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SortOrder {
    /// Smallest values first.
    Ascending,
    /// Largest values first.
    Descending,
}
294
295#[derive(Debug, Clone)]
297pub enum Predicate {
298 Equals(String, Value),
300 NotEquals(String, Value),
302 GreaterThan(String, Value),
304 GreaterThanOrEqual(String, Value),
306 LessThan(String, Value),
308 LessThanOrEqual(String, Value),
310 In(String, Vec<Value>),
312 Like(String, String),
314 And(Vec<Predicate>),
316 Or(Vec<Predicate>),
318 Not(Box<Predicate>),
320}
321
/// A scalar value appearing in predicates.
///
/// `PartialEq` is written by hand rather than derived so that equality is
/// reflexive for every value, as the `Eq` impl below promises:
/// - Two `Float64` NaNs compare equal, and `0.0 == -0.0`. This matches
///   `ordered_float::OrderedFloat` equality, which the `Hash` impl uses for floats.
/// - All other variants compare structurally.
/// - Values of different variants are never equal; there is no numeric promotion.
#[derive(Debug, Clone)]
pub enum Value {
    Int64(i64),
    Float64(f64),
    String(String),
    Boolean(bool),
    Bytes(Vec<u8>),
    Null,
}

impl PartialEq for Value {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Value::Int64(a), Value::Int64(b)) => a == b,
            // Plain `f64 ==` makes NaN unequal to itself, which would break `Eq`
            // (and lose NaN keys in hash maps).
            (Value::Float64(a), Value::Float64(b)) => {
                if a.is_nan() {
                    b.is_nan()
                } else {
                    a == b
                }
            }
            (Value::String(a), Value::String(b)) => a == b,
            (Value::Boolean(a), Value::Boolean(b)) => a == b,
            (Value::Bytes(a), Value::Bytes(b)) => a == b,
            (Value::Null, Value::Null) => true,
            _ => false,
        }
    }
}

// `f64` is not `Eq`. Asserting it here is sound because `PartialEq` above is
// reflexive for NaN.
impl Eq for Value {}
334
335impl Hash for Value {
336 fn hash<H: Hasher>(&self, state: &mut H) {
337 std::mem::discriminant(self).hash(state);
338 match self {
339 Value::Int64(v) => v.hash(state),
340 Value::Float64(v) => OrderedFloat(*v).hash(state),
341 Value::String(v) => v.hash(state),
342 Value::Boolean(v) => v.hash(state),
343 Value::Bytes(v) => v.hash(state),
344 Value::Null => {}
345 }
346 }
347}
348
349impl Value {
350 pub fn compare(&self, other: &Value) -> Option<std::cmp::Ordering> {
352 match (self, other) {
353 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
354 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
355 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
356 (Value::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
357 _ => None,
358 }
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Schema with a single non-nullable `id: Int64` column.
    fn id_schema() -> QuerySchema {
        QuerySchema::new(vec![ColumnDef {
            name: String::from("id"),
            data_type: DataType::Int64,
            nullable: false,
        }])
    }

    #[test]
    fn test_logical_plan_creation() {
        let scan = PlanNode::NodeScan {
            mode: ScanMode::Sequential,
            filter: None,
        };
        let plan = LogicalPlan::new(scan, id_schema());

        // A lone scan reports itself parallelizable.
        assert!(plan.is_parallelizable());
    }

    #[test]
    fn test_value_comparison() {
        let (small, large) = (Value::Int64(42), Value::Int64(100));
        assert_eq!(small.compare(&large), Some(Ordering::Less));
    }
}