kyu_executor/operators/
projection.rs1use kyu_common::KyuResult;
9use kyu_expression::{evaluate, BoundExpression};
10
11use crate::batch_eval::evaluate_column;
12use crate::context::ExecutionContext;
13use crate::data_chunk::DataChunk;
14use crate::physical_plan::PhysicalOperator;
15use crate::value_vector::{SelectionVector, ValueVector};
16
17pub struct ProjectionOp {
18 pub child: Box<PhysicalOperator>,
19 pub expressions: Vec<BoundExpression>,
20 #[cfg(feature = "jit")]
21 jit_states: Vec<Option<crate::jit::JitState>>,
22}
23
24impl ProjectionOp {
25 pub fn new(child: PhysicalOperator, expressions: Vec<BoundExpression>) -> Self {
26 #[cfg(feature = "jit")]
27 let jit_states = expressions
28 .iter()
29 .map(|e| crate::jit::JitState::new_projection(e, 100_000))
30 .collect();
31
32 Self {
33 child: Box::new(child),
34 expressions,
35 #[cfg(feature = "jit")]
36 jit_states,
37 }
38 }
39
40 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
41 let mut chunk = match self.child.next(ctx)? {
42 Some(c) => c,
43 None => return Ok(None),
44 };
45
46 let n = chunk.num_rows();
47 let is_identity = chunk.selection().is_identity();
48 let mut out_columns = Vec::with_capacity(self.expressions.len());
49
50 #[cfg(feature = "jit")]
51 let jit_cache = ctx.jit_cache();
52
53 #[allow(unused_variables)]
54 for (i, expr) in self.expressions.iter().enumerate() {
55 if let BoundExpression::Variable { index, .. } = expr
57 && is_identity
58 {
59 out_columns.push(chunk.take_column(*index as usize));
60 continue;
61 }
62
63 #[cfg(feature = "jit")]
65 {
66 if let Some(ref jit) = self.jit_states[i] {
67 if let Some(cache) = jit_cache {
68 jit.observe_rows(n as u64, cache);
69 }
70 if let Some(col) = jit.try_eval_projection(&chunk) {
71 out_columns.push(col);
72 continue;
73 }
74 }
75 }
76
77 if let Some(result) = evaluate_column(expr, &chunk) {
79 out_columns.push(result?);
80 } else {
81 let mut col = Vec::with_capacity(n);
83 for row_idx in 0..n {
84 col.push(evaluate(expr, &chunk.row_ref(row_idx))?);
85 }
86 out_columns.push(ValueVector::Owned(col));
87 }
88 }
89
90 Ok(Some(DataChunk::from_vectors(
91 out_columns,
92 SelectionVector::identity(n),
93 )))
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100 use crate::context::MockStorage;
101 use kyu_parser::ast::BinaryOp;
102 use kyu_types::{LogicalType, TypedValue};
103
104 #[test]
105 fn project_literal() {
106 let storage = MockStorage::new();
107 let ctx = ExecutionContext::new(
108 kyu_catalog::CatalogContent::new(),
109 &storage,
110 );
111 let empty = PhysicalOperator::Empty(crate::operators::empty::EmptyOp::new(0));
112 let mut proj = ProjectionOp::new(
113 empty,
114 vec![BoundExpression::Literal {
115 value: TypedValue::Int64(42),
116 result_type: LogicalType::Int64,
117 }],
118 );
119 let chunk = proj.next(&ctx).unwrap().unwrap();
120 assert_eq!(chunk.num_rows(), 1);
121 assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(42));
122 }
123
124 #[test]
125 fn project_expression() {
126 let mut storage = MockStorage::new();
127 storage.insert_table(
128 kyu_common::id::TableId(0),
129 vec![
130 vec![TypedValue::Int64(10)],
131 vec![TypedValue::Int64(20)],
132 ],
133 );
134 let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
135
136 let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
137 kyu_common::id::TableId(0),
138 ));
139
140 let expr = BoundExpression::BinaryOp {
142 op: BinaryOp::Add,
143 left: Box::new(BoundExpression::Variable {
144 index: 0,
145 result_type: LogicalType::Int64,
146 }),
147 right: Box::new(BoundExpression::Literal {
148 value: TypedValue::Int64(1),
149 result_type: LogicalType::Int64,
150 }),
151 result_type: LogicalType::Int64,
152 };
153
154 let mut proj = ProjectionOp::new(scan, vec![expr]);
155 let chunk = proj.next(&ctx).unwrap().unwrap();
156 assert_eq!(chunk.num_rows(), 2);
157 assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(11));
158 assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(21));
159 }
160}