kyu_executor/operators/
projection.rs1use kyu_common::KyuResult;
9use kyu_expression::{BoundExpression, evaluate};
10
11use crate::batch_eval::evaluate_column;
12use crate::context::ExecutionContext;
13use crate::data_chunk::DataChunk;
14use crate::physical_plan::PhysicalOperator;
15use crate::value_vector::{SelectionVector, ValueVector};
16
17pub struct ProjectionOp {
18 pub child: Box<PhysicalOperator>,
19 pub expressions: Vec<BoundExpression>,
20 #[cfg(feature = "jit")]
21 jit_states: Vec<Option<crate::jit::JitState>>,
22}
23
24impl ProjectionOp {
25 pub fn new(child: PhysicalOperator, expressions: Vec<BoundExpression>) -> Self {
26 #[cfg(feature = "jit")]
27 let jit_states = expressions
28 .iter()
29 .map(|e| crate::jit::JitState::new_projection(e, 100_000))
30 .collect();
31
32 Self {
33 child: Box::new(child),
34 expressions,
35 #[cfg(feature = "jit")]
36 jit_states,
37 }
38 }
39
40 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
41 let mut chunk = match self.child.next(ctx)? {
42 Some(c) => c,
43 None => return Ok(None),
44 };
45
46 let n = chunk.num_rows();
47 let is_identity = chunk.selection().is_identity();
48 let mut out_columns = Vec::with_capacity(self.expressions.len());
49
50 #[cfg(feature = "jit")]
51 let jit_cache = ctx.jit_cache();
52
53 #[allow(unused_variables)]
54 for (i, expr) in self.expressions.iter().enumerate() {
55 if let BoundExpression::Variable { index, .. } = expr
57 && is_identity
58 {
59 out_columns.push(chunk.take_column(*index as usize));
60 continue;
61 }
62
63 #[cfg(feature = "jit")]
65 {
66 if let Some(ref jit) = self.jit_states[i] {
67 if let Some(cache) = jit_cache {
68 jit.observe_rows(n as u64, cache);
69 }
70 if let Some(col) = jit.try_eval_projection(&chunk) {
71 out_columns.push(col);
72 continue;
73 }
74 }
75 }
76
77 if let Some(result) = evaluate_column(expr, &chunk) {
79 out_columns.push(result?);
80 } else {
81 let mut col = Vec::with_capacity(n);
83 for row_idx in 0..n {
84 col.push(evaluate(expr, &chunk.row_ref(row_idx))?);
85 }
86 out_columns.push(ValueVector::Owned(col));
87 }
88 }
89
90 Ok(Some(DataChunk::from_vectors(
91 out_columns,
92 SelectionVector::identity(n),
93 )))
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100 use crate::context::MockStorage;
101 use kyu_parser::ast::BinaryOp;
102 use kyu_types::{LogicalType, TypedValue};
103
104 #[test]
105 fn project_literal() {
106 let storage = MockStorage::new();
107 let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
108 let empty = PhysicalOperator::Empty(crate::operators::empty::EmptyOp::new(0));
109 let mut proj = ProjectionOp::new(
110 empty,
111 vec![BoundExpression::Literal {
112 value: TypedValue::Int64(42),
113 result_type: LogicalType::Int64,
114 }],
115 );
116 let chunk = proj.next(&ctx).unwrap().unwrap();
117 assert_eq!(chunk.num_rows(), 1);
118 assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(42));
119 }
120
121 #[test]
122 fn project_expression() {
123 let mut storage = MockStorage::new();
124 storage.insert_table(
125 kyu_common::id::TableId(0),
126 vec![vec![TypedValue::Int64(10)], vec![TypedValue::Int64(20)]],
127 );
128 let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
129
130 let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
131 kyu_common::id::TableId(0),
132 ));
133
134 let expr = BoundExpression::BinaryOp {
136 op: BinaryOp::Add,
137 left: Box::new(BoundExpression::Variable {
138 index: 0,
139 result_type: LogicalType::Int64,
140 }),
141 right: Box::new(BoundExpression::Literal {
142 value: TypedValue::Int64(1),
143 result_type: LogicalType::Int64,
144 }),
145 result_type: LogicalType::Int64,
146 };
147
148 let mut proj = ProjectionOp::new(scan, vec![expr]);
149 let chunk = proj.next(&ctx).unwrap().unwrap();
150 assert_eq!(chunk.num_rows(), 2);
151 assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(11));
152 assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(21));
153 }
154}