1use kyu_common::{KyuError, KyuResult};
4use kyu_planner::*;
5
6use crate::operators::*;
7use crate::physical_plan::PhysicalOperator;
8
9fn all_variable_indices(exprs: &[kyu_expression::BoundExpression]) -> Option<Vec<usize>> {
12 let mut indices = Vec::with_capacity(exprs.len());
13 for expr in exprs {
14 match expr {
15 kyu_expression::BoundExpression::Variable { index, .. } => {
16 indices.push(*index as usize);
17 }
18 _ => return None,
19 }
20 }
21 let mut seen = std::collections::HashSet::new();
23 if !indices.iter().all(|i| seen.insert(*i)) {
24 return None;
25 }
26 Some(indices)
27}
28
29pub fn map_plan(logical: &LogicalPlan) -> KyuResult<PhysicalOperator> {
31 match logical {
32 LogicalPlan::ScanNode(scan) => {
33 Ok(PhysicalOperator::ScanNode(ScanNodeOp::new(scan.table_id)))
34 }
35
36 LogicalPlan::ScanRel(scan) => {
37 Ok(PhysicalOperator::ScanNode(ScanNodeOp::new(scan.table_id)))
39 }
40
41 LogicalPlan::Filter(f) => {
42 let child = map_plan(&f.child)?;
43 Ok(PhysicalOperator::Filter(FilterOp::new(
44 child,
45 f.predicate.clone(),
46 )))
47 }
48
49 LogicalPlan::Projection(p) => {
50 let child = map_plan(&p.child)?;
51 if let PhysicalOperator::ScanNode(mut scan) = child {
54 if let Some(indices) = all_variable_indices(&p.expressions) {
55 scan.column_indices = Some(indices);
56 return Ok(PhysicalOperator::ScanNode(scan));
57 }
58 return Ok(PhysicalOperator::Projection(ProjectionOp::new(
60 PhysicalOperator::ScanNode(scan),
61 p.expressions.clone(),
62 )));
63 }
64 Ok(PhysicalOperator::Projection(ProjectionOp::new(
65 child,
66 p.expressions.clone(),
67 )))
68 }
69
70 LogicalPlan::HashJoin(j) => {
71 let build = map_plan(&j.build)?;
72 let probe = map_plan(&j.probe)?;
73 Ok(PhysicalOperator::HashJoin(HashJoinOp::new(
74 build,
75 probe,
76 j.build_keys.clone(),
77 j.probe_keys.clone(),
78 )))
79 }
80
81 LogicalPlan::CrossProduct(cp) => {
82 let left = map_plan(&cp.left)?;
83 let right = map_plan(&cp.right)?;
84 Ok(PhysicalOperator::CrossProduct(CrossProductOp::new(
85 left, right,
86 )))
87 }
88
89 LogicalPlan::Aggregate(a) => {
90 let child = map_plan(&a.child)?;
91 Ok(PhysicalOperator::Aggregate(AggregateOp::new(
92 child,
93 a.group_by.clone(),
94 a.aggregates.clone(),
95 )))
96 }
97
98 LogicalPlan::OrderBy(o) => {
99 let child = map_plan(&o.child)?;
100 Ok(PhysicalOperator::OrderBy(OrderByOp::new(
101 child,
102 o.order_by.clone(),
103 )))
104 }
105
106 LogicalPlan::Limit(l) => {
107 let child = map_plan(&l.child)?;
108 Ok(PhysicalOperator::Limit(LimitOp::new(
109 child,
110 l.skip.unwrap_or(0),
111 l.limit.unwrap_or(u64::MAX),
112 )))
113 }
114
115 LogicalPlan::Distinct(d) => {
116 let child = map_plan(&d.child)?;
117 Ok(PhysicalOperator::Distinct(DistinctOp::new(child)))
118 }
119
120 LogicalPlan::Unwind(u) => {
121 let child = map_plan(&u.child)?;
122 Ok(PhysicalOperator::Unwind(UnwindOp::new(
123 child,
124 u.expression.clone(),
125 )))
126 }
127
128 LogicalPlan::RecursiveJoin(rj) => {
129 let child = map_plan(&rj.child)?;
130 Ok(PhysicalOperator::RecursiveJoin(RecursiveJoinOp::new(
131 child,
132 crate::operators::recursive_join::RecursiveJoinConfig {
133 rel_table_id: rj.rel_table_id,
134 dest_table_id: rj.dest_table_id,
135 direction: rj.direction,
136 min_hops: rj.min_hops,
137 max_hops: rj.max_hops,
138 src_key_col: rj.src_key_col as usize,
139 dest_key_col: rj.dest_key_col as usize,
140 dest_ncols: rj.dest_columns.len(),
141 },
142 )))
143 }
144
145 LogicalPlan::Empty(e) => Ok(PhysicalOperator::Empty(EmptyOp::new(e.num_columns))),
146
147 LogicalPlan::Union(u) => {
148 if u.children.len() == 1 {
151 map_plan(&u.children[0])
152 } else {
153 Err(KyuError::NotImplemented("UNION execution".into()))
154 }
155 }
156
157 LogicalPlan::CreateNode(_) | LogicalPlan::SetProperty(_) | LogicalPlan::Delete(_) => Err(
158 KyuError::NotImplemented("mutation operators not yet executable".into()),
159 ),
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use kyu_common::id::TableId;
    use kyu_expression::BoundExpression;
    use kyu_types::{LogicalType, TypedValue};
    use smol_str::SmolStr;

    /// An empty logical plan maps to the empty physical operator.
    #[test]
    fn map_empty() {
        let logical = LogicalPlan::Empty(LogicalEmpty { num_columns: 2 });
        let physical = map_plan(&logical).unwrap();
        assert!(matches!(physical, PhysicalOperator::Empty(_)));
    }

    /// A node scan maps to the scan operator.
    #[test]
    fn map_scan_node() {
        let scan = LogicalScanNode {
            table_id: TableId(0),
            variable_index: Some(0),
            output_columns: vec![(SmolStr::new("x"), LogicalType::Int64)],
        };
        let physical = map_plan(&LogicalPlan::ScanNode(scan)).unwrap();
        assert!(matches!(physical, PhysicalOperator::ScanNode(_)));
    }

    /// A projection of a literal over a non-scan child stays a projection.
    #[test]
    fn map_projection() {
        let literal_one = BoundExpression::Literal {
            value: TypedValue::Int64(1),
            result_type: LogicalType::Int64,
        };
        let projection = LogicalProjection {
            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 0 }),
            expressions: vec![literal_one],
            aliases: vec![SmolStr::new("x")],
        };
        let physical = map_plan(&LogicalPlan::Projection(Box::new(projection))).unwrap();
        assert!(matches!(physical, PhysicalOperator::Projection(_)));
    }

    /// A filter maps to the filter operator.
    #[test]
    fn map_filter() {
        let always_true = BoundExpression::Literal {
            value: TypedValue::Bool(true),
            result_type: LogicalType::Bool,
        };
        let filter = LogicalFilter {
            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 1 }),
            predicate: always_true,
        };
        let physical = map_plan(&LogicalPlan::Filter(Box::new(filter))).unwrap();
        assert!(matches!(physical, PhysicalOperator::Filter(_)));
    }

    /// A limit with both skip and limit set maps to the limit operator.
    #[test]
    fn map_limit() {
        let limit = LogicalLimit {
            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 0 }),
            skip: Some(1),
            limit: Some(10),
        };
        let physical = map_plan(&LogicalPlan::Limit(Box::new(limit))).unwrap();
        assert!(matches!(physical, PhysicalOperator::Limit(_)));
    }
}