Skip to main content

kyu_executor/operators/
projection.rs

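//! Projection operator — evaluates an expression list against every row of a chunk.
//!
//! Evaluation cascade per expression:
//! 0. Column move — a bare variable reference under an identity selection takes
//!    ownership of the input column instead of copying it.
//! 1. JIT compiled (if available) — native code writing to flat buffers.
//! 2. Batch column evaluation — pattern-matched common patterns.
//! 3. Scalar fallback — tree-walking `evaluate()` per row.
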
8use kyu_common::KyuResult;
9use kyu_expression::{evaluate, BoundExpression};
10
11use crate::batch_eval::evaluate_column;
12use crate::context::ExecutionContext;
13use crate::data_chunk::DataChunk;
14use crate::physical_plan::PhysicalOperator;
15use crate::value_vector::{SelectionVector, ValueVector};
16
/// Physical operator that evaluates a list of expressions against every chunk produced
/// by `child`, emitting one output column per expression.
pub struct ProjectionOp {
    /// Input operator whose chunks are projected.
    pub child: Box<PhysicalOperator>,
    /// Expressions to evaluate; output column `i` is produced from `expressions[i]`.
    pub expressions: Vec<BoundExpression>,
    /// Per-expression JIT state, index-aligned with `expressions`. An entry is `None`
    /// when `JitState::new_projection` returned `None` for that expression
    /// (presumably because it is not JIT-compilable — confirm in `crate::jit`).
    #[cfg(feature = "jit")]
    jit_states: Vec<Option<crate::jit::JitState>>,
}
23
24impl ProjectionOp {
25    pub fn new(child: PhysicalOperator, expressions: Vec<BoundExpression>) -> Self {
26        #[cfg(feature = "jit")]
27        let jit_states = expressions
28            .iter()
29            .map(|e| crate::jit::JitState::new_projection(e, 100_000))
30            .collect();
31
32        Self {
33            child: Box::new(child),
34            expressions,
35            #[cfg(feature = "jit")]
36            jit_states,
37        }
38    }
39
40    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
41        let mut chunk = match self.child.next(ctx)? {
42            Some(c) => c,
43            None => return Ok(None),
44        };
45
46        let n = chunk.num_rows();
47        let is_identity = chunk.selection().is_identity();
48        let mut out_columns = Vec::with_capacity(self.expressions.len());
49
50        #[cfg(feature = "jit")]
51        let jit_cache = ctx.jit_cache();
52
53        #[allow(unused_variables)]
54        for (i, expr) in self.expressions.iter().enumerate() {
55            // Fast path: Variable ref with identity selection — move column, no clone.
56            if let BoundExpression::Variable { index, .. } = expr
57                && is_identity
58            {
59                out_columns.push(chunk.take_column(*index as usize));
60                continue;
61            }
62
63            // Tier 1: JIT compiled projection
64            #[cfg(feature = "jit")]
65            {
66                if let Some(ref jit) = self.jit_states[i] {
67                    if let Some(cache) = jit_cache {
68                        jit.observe_rows(n as u64, cache);
69                    }
70                    if let Some(col) = jit.try_eval_projection(&chunk) {
71                        out_columns.push(col);
72                        continue;
73                    }
74                }
75            }
76
77            // Tier 2: Batch column evaluation
78            if let Some(result) = evaluate_column(expr, &chunk) {
79                out_columns.push(result?);
80            } else {
81                // Tier 3: Scalar fallback — evaluate per-row, collect into Owned
82                let mut col = Vec::with_capacity(n);
83                for row_idx in 0..n {
84                    col.push(evaluate(expr, &chunk.row_ref(row_idx))?);
85                }
86                out_columns.push(ValueVector::Owned(col));
87            }
88        }
89
90        Ok(Some(DataChunk::from_vectors(
91            out_columns,
92            SelectionVector::identity(n),
93        )))
94    }
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use crate::operators::empty::EmptyOp;
    use crate::operators::scan::ScanNodeOp;
    use kyu_catalog::CatalogContent;
    use kyu_common::id::TableId;
    use kyu_parser::ast::BinaryOp;
    use kyu_types::{LogicalType, TypedValue};

    /// Builds an `Int64` literal expression holding `v`.
    fn int64_literal(v: i64) -> BoundExpression {
        BoundExpression::Literal {
            value: TypedValue::Int64(v),
            result_type: LogicalType::Int64,
        }
    }

    /// A constant projection over a single empty row yields that constant.
    #[test]
    fn project_literal() {
        let storage = MockStorage::new();
        let ctx = ExecutionContext::new(CatalogContent::new(), &storage);

        let source = PhysicalOperator::Empty(EmptyOp::new(0));
        let mut projection = ProjectionOp::new(source, vec![int64_literal(42)]);

        let out = projection.next(&ctx).unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.get_value(0, 0), TypedValue::Int64(42));
    }

    /// `variable[0] + 1` is evaluated for every scanned row.
    #[test]
    fn project_expression() {
        let table = TableId(0);

        let mut storage = MockStorage::new();
        storage.insert_table(
            table,
            vec![vec![TypedValue::Int64(10)], vec![TypedValue::Int64(20)]],
        );
        let ctx = ExecutionContext::new(CatalogContent::new(), &storage);

        let source = PhysicalOperator::ScanNode(ScanNodeOp::new(table));

        let column_ref = BoundExpression::Variable {
            index: 0,
            result_type: LogicalType::Int64,
        };
        let plus_one = BoundExpression::BinaryOp {
            op: BinaryOp::Add,
            left: Box::new(column_ref),
            right: Box::new(int64_literal(1)),
            result_type: LogicalType::Int64,
        };

        let mut projection = ProjectionOp::new(source, vec![plus_one]);
        let out = projection.next(&ctx).unwrap().unwrap();

        assert_eq!(out.num_rows(), 2);
        assert_eq!(out.get_value(0, 0), TypedValue::Int64(11));
        assert_eq!(out.get_value(1, 0), TypedValue::Int64(21));
    }
}