graphos_core/execution/operators/
project.rs1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use graphos_common::types::LogicalType;
6
7pub enum ProjectExpr {
9 Column(usize),
11 Constant(graphos_common::types::Value),
13 }
15
16pub struct ProjectOperator {
18 child: Box<dyn Operator>,
20 projections: Vec<ProjectExpr>,
22 output_types: Vec<LogicalType>,
24}
25
26impl ProjectOperator {
27 pub fn new(
29 child: Box<dyn Operator>,
30 projections: Vec<ProjectExpr>,
31 output_types: Vec<LogicalType>,
32 ) -> Self {
33 assert_eq!(projections.len(), output_types.len());
34 Self {
35 child,
36 projections,
37 output_types,
38 }
39 }
40
41 pub fn select_columns(child: Box<dyn Operator>, columns: Vec<usize>, types: Vec<LogicalType>) -> Self {
43 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
44 Self::new(child, projections, types)
45 }
46}
47
48impl Operator for ProjectOperator {
49 fn next(&mut self) -> OperatorResult {
50 let input = match self.child.next()? {
52 Some(c) => c,
53 None => return Ok(None),
54 };
55
56 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
58
59 for (i, proj) in self.projections.iter().enumerate() {
61 match proj {
62 ProjectExpr::Column(col_idx) => {
63 let input_col = input.column(*col_idx).ok_or_else(|| {
65 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
66 })?;
67
68 let output_col = output.column_mut(i).unwrap();
69
70 for row in input.selected_indices() {
72 if let Some(value) = input_col.get_value(row) {
73 output_col.push_value(value);
74 }
75 }
76 }
77 ProjectExpr::Constant(value) => {
78 let output_col = output.column_mut(i).unwrap();
80 for _ in input.selected_indices() {
81 output_col.push_value(value.clone());
82 }
83 }
84 }
85 }
86
87 output.set_count(input.row_count());
88 Ok(Some(output))
89 }
90
91 fn reset(&mut self) {
92 self.child.reset();
93 }
94
95 fn name(&self) -> &'static str {
96 "Project"
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use graphos_common::types::Value;

    /// Test source that hands out pre-built chunks one at a time.
    ///
    /// Each chunk is moved out and replaced by an empty placeholder, and
    /// `reset` only rewinds the cursor, so a second pass after `reset` yields
    /// the placeholders rather than the original data.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockScanOperator {
        /// Boxes a scan positioned at the first of `chunks`.
        fn boxed(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
            Box::new(Self {
                chunks,
                position: 0,
            })
        }
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty chunk in its slot.
            let chunk = std::mem::replace(slot, DataChunk::new(&[]));
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder = DataChunkBuilder::new(&[
            LogicalType::Int64,
            LogicalType::String,
            LogicalType::Int64,
        ]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        // Select input columns 2 and 0, in that order.
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Check every row, not just the first, so dropped or misordered rows
        // are caught.
        let first = result.column(0).unwrap();
        assert_eq!(first.get_int64(0), Some(100));
        assert_eq!(first.get_int64(1), Some(200));

        let second = result.column(1).unwrap();
        assert_eq!(second.get_int64(0), Some(1));
        assert_eq!(second.get_int64(1), Some(2));

        // The single input chunk is consumed, so the stream must end.
        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let mut project = ProjectOperator::new(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // The passthrough column must survive next to the constant one.
        let passthrough = result.column(0).unwrap();
        assert_eq!(passthrough.get_int64(0), Some(1));
        assert_eq!(passthrough.get_int64(1), Some(2));

        let constant = result.column(1).unwrap();
        assert_eq!(constant.get_string(0), Some("constant"));
        assert_eq!(constant.get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_missing_column() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        // The input has a single column, so index 5 cannot be resolved.
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![5],
            vec![LogicalType::Int64],
        );

        assert!(matches!(
            project.next(),
            Err(OperatorError::ColumnNotFound(_))
        ));
    }
}