Skip to main content

kyu_executor/
mapper.rs

//! Plan mapper — translates `LogicalPlan` → `PhysicalOperator` (mostly 1:1 in Phase 6; a variable-only projection directly over a scan is folded into the scan).
2
3use kyu_common::{KyuError, KyuResult};
4use kyu_planner::*;
5
6use crate::operators::*;
7use crate::physical_plan::PhysicalOperator;
8
9/// If all expressions are simple Variable references with unique indices,
10/// return the column indices in projection order. Otherwise return None.
11fn all_variable_indices(exprs: &[kyu_expression::BoundExpression]) -> Option<Vec<usize>> {
12    let mut indices = Vec::with_capacity(exprs.len());
13    for expr in exprs {
14        match expr {
15            kyu_expression::BoundExpression::Variable { index, .. } => {
16                indices.push(*index as usize);
17            }
18            _ => return None,
19        }
20    }
21    // Check uniqueness (duplicate column refs need projection to clone)
22    let mut seen = std::collections::HashSet::new();
23    if !indices.iter().all(|i| seen.insert(*i)) {
24        return None;
25    }
26    Some(indices)
27}
28
29/// Map a logical plan tree to a physical operator tree.
30pub fn map_plan(logical: &LogicalPlan) -> KyuResult<PhysicalOperator> {
31    match logical {
32        LogicalPlan::ScanNode(scan) => {
33            Ok(PhysicalOperator::ScanNode(ScanNodeOp::new(scan.table_id)))
34        }
35
36        LogicalPlan::ScanRel(scan) => {
37            // For now, relationship scans use the same ScanNodeOp with the rel table id.
38            Ok(PhysicalOperator::ScanNode(ScanNodeOp::new(scan.table_id)))
39        }
40
41        LogicalPlan::Filter(f) => {
42            let child = map_plan(&f.child)?;
43            Ok(PhysicalOperator::Filter(FilterOp::new(
44                child,
45                f.predicate.clone(),
46            )))
47        }
48
49        LogicalPlan::Projection(p) => {
50            let child = map_plan(&p.child)?;
51            // Column pruning: when projecting only Variable refs over a ScanNode,
52            // push column selection into the scan and eliminate the projection.
53            if let PhysicalOperator::ScanNode(mut scan) = child {
54                if let Some(indices) = all_variable_indices(&p.expressions) {
55                    scan.column_indices = Some(indices);
56                    return Ok(PhysicalOperator::ScanNode(scan));
57                }
58                // Not all variables — keep projection
59                return Ok(PhysicalOperator::Projection(ProjectionOp::new(
60                    PhysicalOperator::ScanNode(scan),
61                    p.expressions.clone(),
62                )));
63            }
64            Ok(PhysicalOperator::Projection(ProjectionOp::new(
65                child,
66                p.expressions.clone(),
67            )))
68        }
69
70        LogicalPlan::HashJoin(j) => {
71            let build = map_plan(&j.build)?;
72            let probe = map_plan(&j.probe)?;
73            Ok(PhysicalOperator::HashJoin(HashJoinOp::new(
74                build,
75                probe,
76                j.build_keys.clone(),
77                j.probe_keys.clone(),
78            )))
79        }
80
81        LogicalPlan::CrossProduct(cp) => {
82            let left = map_plan(&cp.left)?;
83            let right = map_plan(&cp.right)?;
84            Ok(PhysicalOperator::CrossProduct(CrossProductOp::new(
85                left, right,
86            )))
87        }
88
89        LogicalPlan::Aggregate(a) => {
90            let child = map_plan(&a.child)?;
91            Ok(PhysicalOperator::Aggregate(AggregateOp::new(
92                child,
93                a.group_by.clone(),
94                a.aggregates.clone(),
95            )))
96        }
97
98        LogicalPlan::OrderBy(o) => {
99            let child = map_plan(&o.child)?;
100            Ok(PhysicalOperator::OrderBy(OrderByOp::new(
101                child,
102                o.order_by.clone(),
103            )))
104        }
105
106        LogicalPlan::Limit(l) => {
107            let child = map_plan(&l.child)?;
108            Ok(PhysicalOperator::Limit(LimitOp::new(
109                child,
110                l.skip.unwrap_or(0),
111                l.limit.unwrap_or(u64::MAX),
112            )))
113        }
114
115        LogicalPlan::Distinct(d) => {
116            let child = map_plan(&d.child)?;
117            Ok(PhysicalOperator::Distinct(DistinctOp::new(child)))
118        }
119
120        LogicalPlan::Unwind(u) => {
121            let child = map_plan(&u.child)?;
122            Ok(PhysicalOperator::Unwind(UnwindOp::new(
123                child,
124                u.expression.clone(),
125            )))
126        }
127
128        LogicalPlan::RecursiveJoin(rj) => {
129            let child = map_plan(&rj.child)?;
130            Ok(PhysicalOperator::RecursiveJoin(RecursiveJoinOp::new(
131                child,
132                crate::operators::recursive_join::RecursiveJoinConfig {
133                    rel_table_id: rj.rel_table_id,
134                    dest_table_id: rj.dest_table_id,
135                    direction: rj.direction,
136                    min_hops: rj.min_hops,
137                    max_hops: rj.max_hops,
138                    src_key_col: rj.src_key_col as usize,
139                    dest_key_col: rj.dest_key_col as usize,
140                    dest_ncols: rj.dest_columns.len(),
141                },
142            )))
143        }
144
145        LogicalPlan::Empty(e) => Ok(PhysicalOperator::Empty(EmptyOp::new(e.num_columns))),
146
147        LogicalPlan::Union(u) => {
148            // Execute union as: first child, then second child, etc.
149            // For now, return NotImplemented for multi-child union.
150            if u.children.len() == 1 {
151                map_plan(&u.children[0])
152            } else {
153                Err(KyuError::NotImplemented("UNION execution".into()))
154            }
155        }
156
157        LogicalPlan::CreateNode(_) | LogicalPlan::SetProperty(_) | LogicalPlan::Delete(_) => Err(
158            KyuError::NotImplemented("mutation operators not yet executable".into()),
159        ),
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use kyu_common::id::TableId;
    use kyu_expression::BoundExpression;
    use kyu_types::{LogicalType, TypedValue};
    use smol_str::SmolStr;

    /// An `Empty` logical node maps to the `Empty` physical operator.
    #[test]
    fn map_empty() {
        let logical = LogicalPlan::Empty(LogicalEmpty { num_columns: 2 });
        let physical = map_plan(&logical).unwrap();
        assert!(matches!(physical, PhysicalOperator::Empty(_)));
    }

    /// A node scan maps to the `ScanNode` physical operator.
    #[test]
    fn map_scan_node() {
        let scan = LogicalScanNode {
            table_id: TableId(0),
            variable_index: Some(0),
            output_columns: vec![(SmolStr::new("x"), LogicalType::Int64)],
        };
        let physical = map_plan(&LogicalPlan::ScanNode(scan)).unwrap();
        assert!(matches!(physical, PhysicalOperator::ScanNode(_)));
    }

    /// A literal projection over a non-scan child stays a `Projection`.
    #[test]
    fn map_projection() {
        let one = BoundExpression::Literal {
            value: TypedValue::Int64(1),
            result_type: LogicalType::Int64,
        };
        let projection = LogicalProjection {
            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 0 }),
            expressions: vec![one],
            aliases: vec![SmolStr::new("x")],
        };
        let physical = map_plan(&LogicalPlan::Projection(Box::new(projection))).unwrap();
        assert!(matches!(physical, PhysicalOperator::Projection(_)));
    }

    /// A filter node maps to the `Filter` physical operator.
    #[test]
    fn map_filter() {
        let always_true = BoundExpression::Literal {
            value: TypedValue::Bool(true),
            result_type: LogicalType::Bool,
        };
        let filter = LogicalFilter {
            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 1 }),
            predicate: always_true,
        };
        let physical = map_plan(&LogicalPlan::Filter(Box::new(filter))).unwrap();
        assert!(matches!(physical, PhysicalOperator::Filter(_)));
    }

    /// A limit node with both skip and limit maps to the `Limit` physical operator.
    #[test]
    fn map_limit() {
        let limit = LogicalLimit {
            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 0 }),
            skip: Some(1),
            limit: Some(10),
        };
        let physical = map_plan(&LogicalPlan::Limit(Box::new(limit))).unwrap();
        assert!(matches!(physical, PhysicalOperator::Limit(_)));
    }
}