Skip to main content

kyu_executor/
mapper.rs

1//! Plan mapper — translates LogicalPlan → PhysicalOperator (1:1 in Phase 6).
2
3use kyu_common::{KyuError, KyuResult};
4use kyu_planner::*;
5
6use crate::operators::*;
7use crate::physical_plan::PhysicalOperator;
8
9/// If all expressions are simple Variable references with unique indices,
10/// return the column indices in projection order. Otherwise return None.
11fn all_variable_indices(exprs: &[kyu_expression::BoundExpression]) -> Option<Vec<usize>> {
12    let mut indices = Vec::with_capacity(exprs.len());
13    for expr in exprs {
14        match expr {
15            kyu_expression::BoundExpression::Variable { index, .. } => {
16                indices.push(*index as usize);
17            }
18            _ => return None,
19        }
20    }
21    // Check uniqueness (duplicate column refs need projection to clone)
22    let mut seen = std::collections::HashSet::new();
23    if !indices.iter().all(|i| seen.insert(*i)) {
24        return None;
25    }
26    Some(indices)
27}
28
29/// Map a logical plan tree to a physical operator tree.
30pub fn map_plan(logical: &LogicalPlan) -> KyuResult<PhysicalOperator> {
31    match logical {
32        LogicalPlan::ScanNode(scan) => Ok(PhysicalOperator::ScanNode(ScanNodeOp::new(
33            scan.table_id,
34        ))),
35
36        LogicalPlan::ScanRel(scan) => {
37            // For now, relationship scans use the same ScanNodeOp with the rel table id.
38            Ok(PhysicalOperator::ScanNode(ScanNodeOp::new(scan.table_id)))
39        }
40
41        LogicalPlan::Filter(f) => {
42            let child = map_plan(&f.child)?;
43            Ok(PhysicalOperator::Filter(FilterOp::new(
44                child,
45                f.predicate.clone(),
46            )))
47        }
48
49        LogicalPlan::Projection(p) => {
50            let child = map_plan(&p.child)?;
51            // Column pruning: when projecting only Variable refs over a ScanNode,
52            // push column selection into the scan and eliminate the projection.
53            if let PhysicalOperator::ScanNode(mut scan) = child {
54                if let Some(indices) = all_variable_indices(&p.expressions) {
55                    scan.column_indices = Some(indices);
56                    return Ok(PhysicalOperator::ScanNode(scan));
57                }
58                // Not all variables — keep projection
59                return Ok(PhysicalOperator::Projection(ProjectionOp::new(
60                    PhysicalOperator::ScanNode(scan),
61                    p.expressions.clone(),
62                )));
63            }
64            Ok(PhysicalOperator::Projection(ProjectionOp::new(
65                child,
66                p.expressions.clone(),
67            )))
68        }
69
70        LogicalPlan::HashJoin(j) => {
71            let build = map_plan(&j.build)?;
72            let probe = map_plan(&j.probe)?;
73            Ok(PhysicalOperator::HashJoin(HashJoinOp::new(
74                build,
75                probe,
76                j.build_keys.clone(),
77                j.probe_keys.clone(),
78            )))
79        }
80
81        LogicalPlan::CrossProduct(cp) => {
82            let left = map_plan(&cp.left)?;
83            let right = map_plan(&cp.right)?;
84            Ok(PhysicalOperator::CrossProduct(CrossProductOp::new(
85                left, right,
86            )))
87        }
88
89        LogicalPlan::Aggregate(a) => {
90            let child = map_plan(&a.child)?;
91            Ok(PhysicalOperator::Aggregate(AggregateOp::new(
92                child,
93                a.group_by.clone(),
94                a.aggregates.clone(),
95            )))
96        }
97
98        LogicalPlan::OrderBy(o) => {
99            let child = map_plan(&o.child)?;
100            Ok(PhysicalOperator::OrderBy(OrderByOp::new(
101                child,
102                o.order_by.clone(),
103            )))
104        }
105
106        LogicalPlan::Limit(l) => {
107            let child = map_plan(&l.child)?;
108            Ok(PhysicalOperator::Limit(LimitOp::new(
109                child,
110                l.skip.unwrap_or(0),
111                l.limit.unwrap_or(u64::MAX),
112            )))
113        }
114
115        LogicalPlan::Distinct(d) => {
116            let child = map_plan(&d.child)?;
117            Ok(PhysicalOperator::Distinct(DistinctOp::new(child)))
118        }
119
120        LogicalPlan::Unwind(u) => {
121            let child = map_plan(&u.child)?;
122            Ok(PhysicalOperator::Unwind(UnwindOp::new(
123                child,
124                u.expression.clone(),
125            )))
126        }
127
128        LogicalPlan::RecursiveJoin(rj) => {
129            let child = map_plan(&rj.child)?;
130            Ok(PhysicalOperator::RecursiveJoin(RecursiveJoinOp::new(
131                child,
132                crate::operators::recursive_join::RecursiveJoinConfig {
133                    rel_table_id: rj.rel_table_id,
134                    dest_table_id: rj.dest_table_id,
135                    direction: rj.direction,
136                    min_hops: rj.min_hops,
137                    max_hops: rj.max_hops,
138                    src_key_col: rj.src_key_col as usize,
139                    dest_key_col: rj.dest_key_col as usize,
140                    dest_ncols: rj.dest_columns.len(),
141                },
142            )))
143        }
144
145        LogicalPlan::Empty(e) => Ok(PhysicalOperator::Empty(EmptyOp::new(e.num_columns))),
146
147        LogicalPlan::Union(u) => {
148            // Execute union as: first child, then second child, etc.
149            // For now, return NotImplemented for multi-child union.
150            if u.children.len() == 1 {
151                map_plan(&u.children[0])
152            } else {
153                Err(KyuError::NotImplemented("UNION execution".into()))
154            }
155        }
156
157        LogicalPlan::CreateNode(_) | LogicalPlan::SetProperty(_) | LogicalPlan::Delete(_) => {
158            Err(KyuError::NotImplemented(
159                "mutation operators not yet executable".into(),
160            ))
161        }
162    }
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168    use kyu_common::id::TableId;
169    use kyu_expression::BoundExpression;
170    use kyu_types::{LogicalType, TypedValue};
171    use smol_str::SmolStr;
172
173    #[test]
174    fn map_empty() {
175        let plan = LogicalPlan::Empty(LogicalEmpty { num_columns: 2 });
176        let op = map_plan(&plan).unwrap();
177        assert!(matches!(op, PhysicalOperator::Empty(_)));
178    }
179
180    #[test]
181    fn map_scan_node() {
182        let plan = LogicalPlan::ScanNode(LogicalScanNode {
183            table_id: TableId(0),
184            variable_index: Some(0),
185            output_columns: vec![(SmolStr::new("x"), LogicalType::Int64)],
186        });
187        let op = map_plan(&plan).unwrap();
188        assert!(matches!(op, PhysicalOperator::ScanNode(_)));
189    }
190
191    #[test]
192    fn map_projection() {
193        let plan = LogicalPlan::Projection(Box::new(LogicalProjection {
194            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 0 }),
195            expressions: vec![BoundExpression::Literal {
196                value: TypedValue::Int64(1),
197                result_type: LogicalType::Int64,
198            }],
199            aliases: vec![SmolStr::new("x")],
200        }));
201        let op = map_plan(&plan).unwrap();
202        assert!(matches!(op, PhysicalOperator::Projection(_)));
203    }
204
205    #[test]
206    fn map_filter() {
207        let plan = LogicalPlan::Filter(Box::new(LogicalFilter {
208            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 1 }),
209            predicate: BoundExpression::Literal {
210                value: TypedValue::Bool(true),
211                result_type: LogicalType::Bool,
212            },
213        }));
214        let op = map_plan(&plan).unwrap();
215        assert!(matches!(op, PhysicalOperator::Filter(_)));
216    }
217
218    #[test]
219    fn map_limit() {
220        let plan = LogicalPlan::Limit(Box::new(LogicalLimit {
221            child: LogicalPlan::Empty(LogicalEmpty { num_columns: 0 }),
222            skip: Some(1),
223            limit: Some(10),
224        }));
225        let op = map_plan(&plan).unwrap();
226        assert!(matches!(op, PhysicalOperator::Limit(_)));
227    }
228}