grafeo_core/execution/operators/
project.rs1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::lpg::LpgStore;
6use grafeo_common::types::{LogicalType, Value};
7use std::sync::Arc;
8
/// A single output expression evaluated by [`ProjectOperator`] for each selected input row.
pub enum ProjectExpr {
    /// Copies values from the input column at this index.
    Column(usize),
    /// Emits a clone of this value for every selected row.
    Constant(Value),
    /// Reads a property from the node or edge whose id is stored in an input column.
    ///
    /// Rows whose id or property cannot be resolved produce `Value::Null`.
    PropertyAccess {
        /// Index of the input column holding node or edge ids.
        column: usize,
        /// Name of the property to look up on the resolved node or edge.
        property: String,
    },
}
23
/// Operator that evaluates a list of [`ProjectExpr`]s over each chunk produced by its child.
pub struct ProjectOperator {
    /// Upstream operator supplying the input chunks.
    child: Box<dyn Operator>,
    /// Expressions to evaluate; entry `i` fills output column `i`.
    projections: Vec<ProjectExpr>,
    /// Logical types used to create the output chunk; the constructors assert that
    /// this has the same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store consulted for `ProjectExpr::PropertyAccess`; `None` when the
    /// operator is built without one.
    store: Option<Arc<LpgStore>>,
}
35
36impl ProjectOperator {
37 pub fn new(
39 child: Box<dyn Operator>,
40 projections: Vec<ProjectExpr>,
41 output_types: Vec<LogicalType>,
42 ) -> Self {
43 assert_eq!(projections.len(), output_types.len());
44 Self {
45 child,
46 projections,
47 output_types,
48 store: None,
49 }
50 }
51
52 pub fn with_store(
54 child: Box<dyn Operator>,
55 projections: Vec<ProjectExpr>,
56 output_types: Vec<LogicalType>,
57 store: Arc<LpgStore>,
58 ) -> Self {
59 assert_eq!(projections.len(), output_types.len());
60 Self {
61 child,
62 projections,
63 output_types,
64 store: Some(store),
65 }
66 }
67
68 pub fn select_columns(
70 child: Box<dyn Operator>,
71 columns: Vec<usize>,
72 types: Vec<LogicalType>,
73 ) -> Self {
74 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
75 Self::new(child, projections, types)
76 }
77}
78
79impl Operator for ProjectOperator {
80 fn next(&mut self) -> OperatorResult {
81 let input = match self.child.next()? {
83 Some(c) => c,
84 None => return Ok(None),
85 };
86
87 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
89
90 for (i, proj) in self.projections.iter().enumerate() {
92 match proj {
93 ProjectExpr::Column(col_idx) => {
94 let input_col = input.column(*col_idx).ok_or_else(|| {
96 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
97 })?;
98
99 let output_col = output.column_mut(i).unwrap();
100
101 for row in input.selected_indices() {
103 if let Some(value) = input_col.get_value(row) {
104 output_col.push_value(value);
105 }
106 }
107 }
108 ProjectExpr::Constant(value) => {
109 let output_col = output.column_mut(i).unwrap();
111 for _ in input.selected_indices() {
112 output_col.push_value(value.clone());
113 }
114 }
115 ProjectExpr::PropertyAccess { column, property } => {
116 let input_col = input
118 .column(*column)
119 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
120
121 let output_col = output.column_mut(i).unwrap();
122
123 let store = self.store.as_ref().ok_or_else(|| {
124 OperatorError::Execution("Store required for property access".to_string())
125 })?;
126
127 for row in input.selected_indices() {
129 let value = if let Some(node_id) = input_col.get_node_id(row) {
131 store
132 .get_node(node_id)
133 .and_then(|node| node.get_property(property).cloned())
134 .unwrap_or(Value::Null)
135 } else if let Some(edge_id) = input_col.get_edge_id(row) {
136 store
137 .get_edge(edge_id)
138 .and_then(|edge| edge.get_property(property).cloned())
139 .unwrap_or(Value::Null)
140 } else {
141 Value::Null
142 };
143 output_col.push_value(value);
144 }
145 }
146 }
147 }
148
149 output.set_count(input.row_count());
150 Ok(Some(output))
151 }
152
153 fn reset(&mut self) {
154 self.child.reset();
155 }
156
157 fn name(&self) -> &'static str {
158 "Project"
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    // Move the chunk out and leave an empty placeholder in its slot.
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Wraps a single chunk in a boxed mock scan positioned at the start.
    fn scan_over(chunk: DataChunk) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        })
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        for (id, text, score) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(score);
            builder.advance_row();
        }

        // Keep columns 2 and 0, in that order.
        let mut project = ProjectOperator::select_columns(
            scan_over(builder.finish()),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for id in [1, 2] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.advance_row();
        }

        let projections = vec![
            ProjectExpr::Column(0),
            ProjectExpr::Constant(Value::String("constant".into())),
        ];
        let mut project = ProjectOperator::new(
            scan_over(builder.finish()),
            projections,
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        // The constant must be repeated on every row.
        for row in 0..2 {
            assert_eq!(result.column(1).unwrap().get_string(row), Some("constant"));
        }
    }
}