1use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::lpg::{Edge, LpgStore, Node};
7use grafeo_common::types::{LogicalType, PropertyKey, Value};
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11pub enum ProjectExpr {
13 Column(usize),
15 Constant(Value),
17 PropertyAccess {
19 column: usize,
21 property: String,
23 },
24 EdgeType {
26 column: usize,
28 },
29 Expression {
31 expr: FilterExpression,
33 variable_columns: HashMap<String, usize>,
35 },
36 NodeResolve {
38 column: usize,
40 },
41 EdgeResolve {
43 column: usize,
45 },
46}
47
48pub struct ProjectOperator {
50 child: Box<dyn Operator>,
52 projections: Vec<ProjectExpr>,
54 output_types: Vec<LogicalType>,
56 store: Option<Arc<LpgStore>>,
58}
59
60impl ProjectOperator {
61 pub fn new(
63 child: Box<dyn Operator>,
64 projections: Vec<ProjectExpr>,
65 output_types: Vec<LogicalType>,
66 ) -> Self {
67 assert_eq!(projections.len(), output_types.len());
68 Self {
69 child,
70 projections,
71 output_types,
72 store: None,
73 }
74 }
75
76 pub fn with_store(
78 child: Box<dyn Operator>,
79 projections: Vec<ProjectExpr>,
80 output_types: Vec<LogicalType>,
81 store: Arc<LpgStore>,
82 ) -> Self {
83 assert_eq!(projections.len(), output_types.len());
84 Self {
85 child,
86 projections,
87 output_types,
88 store: Some(store),
89 }
90 }
91
92 pub fn select_columns(
94 child: Box<dyn Operator>,
95 columns: Vec<usize>,
96 types: Vec<LogicalType>,
97 ) -> Self {
98 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
99 Self::new(child, projections, types)
100 }
101}
102
103impl Operator for ProjectOperator {
104 fn next(&mut self) -> OperatorResult {
105 let Some(input) = self.child.next()? else {
107 return Ok(None);
108 };
109
110 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
112
113 for (i, proj) in self.projections.iter().enumerate() {
115 match proj {
116 ProjectExpr::Column(col_idx) => {
117 let input_col = input.column(*col_idx).ok_or_else(|| {
119 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
120 })?;
121
122 let output_col = output
123 .column_mut(i)
124 .expect("column exists: index matches projection schema");
125
126 for row in input.selected_indices() {
128 if let Some(value) = input_col.get_value(row) {
129 output_col.push_value(value);
130 }
131 }
132 }
133 ProjectExpr::Constant(value) => {
134 let output_col = output
136 .column_mut(i)
137 .expect("column exists: index matches projection schema");
138 for _ in input.selected_indices() {
139 output_col.push_value(value.clone());
140 }
141 }
142 ProjectExpr::PropertyAccess { column, property } => {
143 let input_col = input
145 .column(*column)
146 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
147
148 let output_col = output
149 .column_mut(i)
150 .expect("column exists: index matches projection schema");
151
152 let store = self.store.as_ref().ok_or_else(|| {
153 OperatorError::Execution("Store required for property access".to_string())
154 })?;
155
156 let prop_key = PropertyKey::new(property);
158 for row in input.selected_indices() {
159 let value = if let Some(node_id) = input_col.get_node_id(row) {
161 store
162 .get_node(node_id)
163 .and_then(|node| node.get_property(property).cloned())
164 .unwrap_or(Value::Null)
165 } else if let Some(edge_id) = input_col.get_edge_id(row) {
166 store
167 .get_edge(edge_id)
168 .and_then(|edge| edge.get_property(property).cloned())
169 .unwrap_or(Value::Null)
170 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
171 map.get(&prop_key).cloned().unwrap_or(Value::Null)
172 } else {
173 Value::Null
174 };
175 output_col.push_value(value);
176 }
177 }
178 ProjectExpr::EdgeType { column } => {
179 let input_col = input
181 .column(*column)
182 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
183
184 let output_col = output
185 .column_mut(i)
186 .expect("column exists: index matches projection schema");
187
188 let store = self.store.as_ref().ok_or_else(|| {
189 OperatorError::Execution("Store required for edge type access".to_string())
190 })?;
191
192 for row in input.selected_indices() {
193 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
194 store.edge_type(edge_id).map_or(Value::Null, Value::String)
195 } else {
196 Value::Null
197 };
198 output_col.push_value(value);
199 }
200 }
201 ProjectExpr::Expression {
202 expr,
203 variable_columns,
204 } => {
205 let output_col = output
206 .column_mut(i)
207 .expect("column exists: index matches projection schema");
208
209 let store = self.store.as_ref().ok_or_else(|| {
210 OperatorError::Execution(
211 "Store required for expression evaluation".to_string(),
212 )
213 })?;
214
215 let evaluator = ExpressionPredicate::new(
217 expr.clone(),
218 variable_columns.clone(),
219 Arc::clone(store),
220 );
221
222 for row in input.selected_indices() {
223 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
224 output_col.push_value(value);
225 }
226 }
227 ProjectExpr::NodeResolve { column } => {
228 let input_col = input
229 .column(*column)
230 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
231
232 let output_col = output
233 .column_mut(i)
234 .expect("column exists: index matches projection schema");
235
236 let store = self.store.as_ref().ok_or_else(|| {
237 OperatorError::Execution("Store required for node resolution".to_string())
238 })?;
239
240 for row in input.selected_indices() {
241 let value = if let Some(node_id) = input_col.get_node_id(row) {
242 store
243 .get_node(node_id)
244 .map_or(Value::Null, |n| node_to_map(&n))
245 } else {
246 Value::Null
247 };
248 output_col.push_value(value);
249 }
250 }
251 ProjectExpr::EdgeResolve { column } => {
252 let input_col = input
253 .column(*column)
254 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
255
256 let output_col = output
257 .column_mut(i)
258 .expect("column exists: index matches projection schema");
259
260 let store = self.store.as_ref().ok_or_else(|| {
261 OperatorError::Execution("Store required for edge resolution".to_string())
262 })?;
263
264 for row in input.selected_indices() {
265 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
266 store
267 .get_edge(edge_id)
268 .map_or(Value::Null, |e| edge_to_map(&e))
269 } else {
270 Value::Null
271 };
272 output_col.push_value(value);
273 }
274 }
275 }
276 }
277
278 output.set_count(input.row_count());
279 Ok(Some(output))
280 }
281
282 fn reset(&mut self) {
283 self.child.reset();
284 }
285
286 fn name(&self) -> &'static str {
287 "Project"
288 }
289}
290
291fn node_to_map(node: &Node) -> Value {
296 let mut map = BTreeMap::new();
297 map.insert(
298 PropertyKey::new("_id"),
299 Value::Int64(node.id.as_u64() as i64),
300 );
301 let labels: Vec<Value> = node
302 .labels
303 .iter()
304 .map(|l| Value::String(l.clone()))
305 .collect();
306 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
307 for (key, value) in &node.properties {
308 map.insert(key.clone(), value.clone());
309 }
310 Value::Map(Arc::new(map))
311}
312
313fn edge_to_map(edge: &Edge) -> Value {
318 let mut map = BTreeMap::new();
319 map.insert(
320 PropertyKey::new("_id"),
321 Value::Int64(edge.id.as_u64() as i64),
322 );
323 map.insert(
324 PropertyKey::new("_type"),
325 Value::String(edge.edge_type.clone()),
326 );
327 map.insert(
328 PropertyKey::new("_source"),
329 Value::Int64(edge.src.as_u64() as i64),
330 );
331 map.insert(
332 PropertyKey::new("_target"),
333 Value::Int64(edge.dst.as_u64() as i64),
334 );
335 for (key, value) in &edge.properties {
336 map.insert(key.clone(), value.clone());
337 }
338 Value::Map(Arc::new(map))
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockScanOperator {
        /// Boxes a scan positioned at the first of `chunks`.
        fn boxed(chunks: Vec<DataChunk>) -> Box<Self> {
            Box::new(Self {
                chunks,
                position: 0,
            })
        }
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Builds a single-column `Int64` chunk with one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (id, word, score) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(word);
            builder.column_mut(2).unwrap().push_int64(score);
            builder.advance_row();
        }

        // Keep columns 2 and 0, in that order.
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1, 2])]);

        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        let constants = result.column(1).unwrap();
        assert_eq!(constants.get_string(0), Some("constant"));
        assert_eq!(constants.get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![]),
            vec![0],
            vec![LogicalType::Int64],
        );

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        // Column 5 does not exist in a one-column chunk.
        let mut project =
            ProjectOperator::new(scan, vec![ProjectExpr::Column(5)], vec![LogicalType::Int64]);

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![]),
            vec![0],
            vec![LogicalType::Int64],
        );
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alice".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        let map = match value {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {:?}", other),
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        assert!(map.get(&PropertyKey::new("_labels")).is_some());
        assert_eq!(
            map.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        let map = match value {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {:?}", other),
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_type")),
            Some(&Value::String("WORKS_AT".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_source")),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_target")),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        // The store is empty, so node 999 cannot be resolved.
        let store = LpgStore::new();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}