Skip to main content

kyu_executor/operators/
projection.rs

//! Projection operator — evaluates a list of expressions against each input chunk.
//!
//! Evaluation cascade per expression (the first applicable step wins):
//! 0. Column move — a bare variable reference on an identity selection takes the input column as-is
//! 1. JIT compiled (if available) — native code writing to flat buffers
//! 2. Batch column evaluation — pattern-matched common patterns
//! 3. Scalar fallback — tree-walking `evaluate()` per row
7
8use kyu_common::KyuResult;
9use kyu_expression::{BoundExpression, evaluate};
10
11use crate::batch_eval::evaluate_column;
12use crate::context::ExecutionContext;
13use crate::data_chunk::DataChunk;
14use crate::physical_plan::PhysicalOperator;
15use crate::value_vector::{SelectionVector, ValueVector};
16
17pub struct ProjectionOp {
18    pub child: Box<PhysicalOperator>,
19    pub expressions: Vec<BoundExpression>,
20    #[cfg(feature = "jit")]
21    jit_states: Vec<Option<crate::jit::JitState>>,
22}
23
24impl ProjectionOp {
25    pub fn new(child: PhysicalOperator, expressions: Vec<BoundExpression>) -> Self {
26        #[cfg(feature = "jit")]
27        let jit_states = expressions
28            .iter()
29            .map(|e| crate::jit::JitState::new_projection(e, 100_000))
30            .collect();
31
32        Self {
33            child: Box::new(child),
34            expressions,
35            #[cfg(feature = "jit")]
36            jit_states,
37        }
38    }
39
40    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
41        let mut chunk = match self.child.next(ctx)? {
42            Some(c) => c,
43            None => return Ok(None),
44        };
45
46        let n = chunk.num_rows();
47        let is_identity = chunk.selection().is_identity();
48        let mut out_columns = Vec::with_capacity(self.expressions.len());
49
50        #[cfg(feature = "jit")]
51        let jit_cache = ctx.jit_cache();
52
53        #[allow(unused_variables)]
54        for (i, expr) in self.expressions.iter().enumerate() {
55            // Fast path: Variable ref with identity selection — move column, no clone.
56            if let BoundExpression::Variable { index, .. } = expr
57                && is_identity
58            {
59                out_columns.push(chunk.take_column(*index as usize));
60                continue;
61            }
62
63            // Tier 1: JIT compiled projection
64            #[cfg(feature = "jit")]
65            {
66                if let Some(ref jit) = self.jit_states[i] {
67                    if let Some(cache) = jit_cache {
68                        jit.observe_rows(n as u64, cache);
69                    }
70                    if let Some(col) = jit.try_eval_projection(&chunk) {
71                        out_columns.push(col);
72                        continue;
73                    }
74                }
75            }
76
77            // Tier 2: Batch column evaluation
78            if let Some(result) = evaluate_column(expr, &chunk) {
79                out_columns.push(result?);
80            } else {
81                // Tier 3: Scalar fallback — evaluate per-row, collect into Owned
82                let mut col = Vec::with_capacity(n);
83                for row_idx in 0..n {
84                    col.push(evaluate(expr, &chunk.row_ref(row_idx))?);
85                }
86                out_columns.push(ValueVector::Owned(col));
87            }
88        }
89
90        Ok(Some(DataChunk::from_vectors(
91            out_columns,
92            SelectionVector::identity(n),
93        )))
94    }
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_parser::ast::BinaryOp;
    use kyu_types::{LogicalType, TypedValue};

    /// Builds an `Int64` literal expression holding `value`.
    fn int64_literal(value: i64) -> BoundExpression {
        BoundExpression::Literal {
            value: TypedValue::Int64(value),
            result_type: LogicalType::Int64,
        }
    }

    /// A lone literal projected over an `EmptyOp` child yields one row with that value.
    #[test]
    fn project_literal() {
        let storage = MockStorage::new();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);

        let source = PhysicalOperator::Empty(crate::operators::empty::EmptyOp::new(0));
        let mut projection = ProjectionOp::new(source, vec![int64_literal(42)]);

        let chunk = projection.next(&ctx).unwrap().unwrap();
        assert_eq!(chunk.num_rows(), 1);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(42));
    }

    /// `variable[0] + 1` over a two-row scan yields each input value plus one.
    #[test]
    fn project_expression() {
        let table = kyu_common::id::TableId(0);
        let rows = vec![vec![TypedValue::Int64(10)], vec![TypedValue::Int64(20)]];

        let mut storage = MockStorage::new();
        storage.insert_table(table, rows);
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);

        let source = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
            kyu_common::id::TableId(0),
        ));

        // Project: variable[0] + 1
        let first_column = BoundExpression::Variable {
            index: 0,
            result_type: LogicalType::Int64,
        };
        let plus_one = BoundExpression::BinaryOp {
            op: BinaryOp::Add,
            left: Box::new(first_column),
            right: Box::new(int64_literal(1)),
            result_type: LogicalType::Int64,
        };

        let mut projection = ProjectionOp::new(source, vec![plus_one]);
        let chunk = projection.next(&ctx).unwrap().unwrap();
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(11));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(21));
    }
}