1use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// One output column of a [`ProjectOperator`], described as a recipe that is
/// evaluated for every selected row of the input chunk.
#[non_exhaustive]
pub enum ProjectExpr {
    /// Copies the values of the input column at this index.
    Column(usize),
    /// Emits a clone of this value for every selected input row.
    Constant(Value),
    /// Reads a property from the entity stored in an input column.
    ///
    /// The column entry is tried as a node id first, then as an edge id, and
    /// finally (when it is neither) as a map value keyed by the property name.
    /// A missing entity or property yields a null. Requires a graph store.
    PropertyAccess {
        /// Index of the input column holding the node id, edge id or map.
        column: usize,
        /// Name of the property to read.
        property: String,
    },
    /// Emits the type of the edge stored in an input column as a string value,
    /// or a null when the entry is not an edge id or the store returns no
    /// type. Requires a graph store.
    EdgeType {
        /// Index of the input column holding the edge id.
        column: usize,
    },
    /// Evaluates an arbitrary expression per row through `ExpressionPredicate`;
    /// rows for which evaluation yields nothing become nulls. Requires a graph
    /// store.
    Expression {
        /// The expression to evaluate.
        expr: FilterExpression,
        /// Passed to the evaluator together with `expr`; presumably maps a
        /// variable name to the input column that holds it (confirm against
        /// `ExpressionPredicate`).
        variable_columns: HashMap<String, usize>,
    },
    /// Resolves the node id in an input column to a map value containing
    /// `_id`, `_labels` and the node's properties; null when the entry is not
    /// a node id or the node cannot be found. Requires a graph store.
    NodeResolve {
        /// Index of the input column holding the node id.
        column: usize,
    },
    /// Resolves the edge id in an input column to a map value containing
    /// `_id`, `_type`, `_source`, `_target` and the edge's properties; null
    /// when the entry is not an edge id or the edge cannot be found. Requires
    /// a graph store.
    EdgeResolve {
        /// Index of the input column holding the edge id.
        column: usize,
    },
    /// Emits the value from `first` unless it is null or absent, in which case
    /// the value from `second` is used (or a null when that is absent too).
    Coalesce {
        /// Index of the preferred input column.
        first: usize,
        /// Index of the fallback input column.
        second: usize,
    },
}
56
/// Operator that pulls chunks from a child operator and produces a new chunk
/// with one column per [`ProjectExpr`].
pub struct ProjectOperator {
    /// Upstream operator whose chunks are projected.
    child: Box<dyn Operator>,
    /// One expression per output column, in output order.
    projections: Vec<ProjectExpr>,
    /// Logical types used to allocate the output chunk; the constructors
    /// assert that this has the same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store used by the store-backed projections (property access,
    /// edge type, expression, node/edge resolution). `None` makes those
    /// projections fail with an execution error.
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction id used, together with `viewing_epoch`, to select the
    /// versioned store lookups.
    transaction_id: Option<TransactionId>,
    /// Epoch at which entities are read; `None` means the plain
    /// (non-versioned) store lookups are used.
    viewing_epoch: Option<EpochId>,
    /// Session context forwarded to the expression evaluator.
    session_context: SessionContext,
}
74
75impl ProjectOperator {
76 pub fn new(
82 child: Box<dyn Operator>,
83 projections: Vec<ProjectExpr>,
84 output_types: Vec<LogicalType>,
85 ) -> Self {
86 assert_eq!(projections.len(), output_types.len());
87 Self {
88 child,
89 projections,
90 output_types,
91 store: None,
92 transaction_id: None,
93 viewing_epoch: None,
94 session_context: SessionContext::default(),
95 }
96 }
97
98 pub fn with_store(
104 child: Box<dyn Operator>,
105 projections: Vec<ProjectExpr>,
106 output_types: Vec<LogicalType>,
107 store: Arc<dyn GraphStore>,
108 ) -> Self {
109 assert_eq!(projections.len(), output_types.len());
110 Self {
111 child,
112 projections,
113 output_types,
114 store: Some(store),
115 transaction_id: None,
116 viewing_epoch: None,
117 session_context: SessionContext::default(),
118 }
119 }
120
121 pub fn with_transaction_context(
123 mut self,
124 epoch: EpochId,
125 transaction_id: Option<TransactionId>,
126 ) -> Self {
127 self.viewing_epoch = Some(epoch);
128 self.transaction_id = transaction_id;
129 self
130 }
131
132 pub fn with_session_context(mut self, context: SessionContext) -> Self {
134 self.session_context = context;
135 self
136 }
137
138 pub fn select_columns(
140 child: Box<dyn Operator>,
141 columns: Vec<usize>,
142 types: Vec<LogicalType>,
143 ) -> Self {
144 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
145 Self::new(child, projections, types)
146 }
147}
148
149impl Operator for ProjectOperator {
150 fn next(&mut self) -> OperatorResult {
151 let Some(input) = self.child.next()? else {
153 return Ok(None);
154 };
155
156 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
158
159 for (i, proj) in self.projections.iter().enumerate() {
161 match proj {
162 ProjectExpr::Column(col_idx) => {
163 let input_col = input.column(*col_idx).ok_or_else(|| {
165 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
166 })?;
167
168 let output_col = output
169 .column_mut(i)
170 .expect("column exists: index matches projection schema");
171
172 for row in input.selected_indices() {
174 if let Some(value) = input_col.get_value(row) {
175 output_col.push_value(value);
176 }
177 }
178 }
179 ProjectExpr::Constant(value) => {
180 let output_col = output
182 .column_mut(i)
183 .expect("column exists: index matches projection schema");
184 for _ in input.selected_indices() {
185 output_col.push_value(value.clone());
186 }
187 }
188 ProjectExpr::PropertyAccess { column, property } => {
189 let input_col = input
191 .column(*column)
192 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
193
194 let output_col = output
195 .column_mut(i)
196 .expect("column exists: index matches projection schema");
197
198 let store = self.store.as_ref().ok_or_else(|| {
199 OperatorError::Execution("Store required for property access".to_string())
200 })?;
201
202 let prop_key = PropertyKey::new(property);
209 let epoch = self.viewing_epoch;
210 let tx_id = self.transaction_id;
211 for row in input.selected_indices() {
212 let value = if let Some(node_id) = input_col.get_node_id(row) {
213 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
214 store.get_node_versioned(node_id, ep, tx)
215 } else if let Some(ep) = epoch {
216 store.get_node_at_epoch(node_id, ep)
217 } else {
218 store.get_node(node_id)
219 };
220 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
221 {
222 prop
223 } else if let Some(edge_id) = input_col.get_edge_id(row) {
224 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
227 store.get_edge_versioned(edge_id, ep, tx)
228 } else if let Some(ep) = epoch {
229 store.get_edge_at_epoch(edge_id, ep)
230 } else {
231 store.get_edge(edge_id)
232 };
233 edge.and_then(|e| e.get_property(property).cloned())
234 .unwrap_or(Value::Null)
235 } else {
236 Value::Null
237 }
238 } else if let Some(edge_id) = input_col.get_edge_id(row) {
239 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
240 store.get_edge_versioned(edge_id, ep, tx)
241 } else if let Some(ep) = epoch {
242 store.get_edge_at_epoch(edge_id, ep)
243 } else {
244 store.get_edge(edge_id)
245 };
246 edge.and_then(|e| e.get_property(property).cloned())
247 .unwrap_or(Value::Null)
248 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
249 map.get(&prop_key).cloned().unwrap_or(Value::Null)
250 } else {
251 Value::Null
252 };
253 output_col.push_value(value);
254 }
255 }
256 ProjectExpr::EdgeType { column } => {
257 let input_col = input
259 .column(*column)
260 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
261
262 let output_col = output
263 .column_mut(i)
264 .expect("column exists: index matches projection schema");
265
266 let store = self.store.as_ref().ok_or_else(|| {
267 OperatorError::Execution("Store required for edge type access".to_string())
268 })?;
269
270 let epoch = self.viewing_epoch;
271 let tx_id = self.transaction_id;
272 for row in input.selected_indices() {
273 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
274 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
275 store.edge_type_versioned(edge_id, ep, tx)
276 } else {
277 store.edge_type(edge_id)
278 };
279 etype.map_or(Value::Null, Value::String)
280 } else {
281 Value::Null
282 };
283 output_col.push_value(value);
284 }
285 }
286 ProjectExpr::Expression {
287 expr,
288 variable_columns,
289 } => {
290 let output_col = output
291 .column_mut(i)
292 .expect("column exists: index matches projection schema");
293
294 let store = self.store.as_ref().ok_or_else(|| {
295 OperatorError::Execution(
296 "Store required for expression evaluation".to_string(),
297 )
298 })?;
299
300 let mut evaluator = ExpressionPredicate::new(
302 expr.clone(),
303 variable_columns.clone(),
304 Arc::clone(store),
305 )
306 .with_session_context(self.session_context.clone());
307 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
308 evaluator = evaluator.with_transaction_context(ep, tx_id);
309 }
310
311 for row in input.selected_indices() {
312 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
313 output_col.push_value(value);
314 }
315 }
316 ProjectExpr::NodeResolve { column } => {
317 let input_col = input
318 .column(*column)
319 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
320
321 let output_col = output
322 .column_mut(i)
323 .expect("column exists: index matches projection schema");
324
325 let store = self.store.as_ref().ok_or_else(|| {
326 OperatorError::Execution("Store required for node resolution".to_string())
327 })?;
328
329 let epoch = self.viewing_epoch;
330 let tx_id = self.transaction_id;
331 for row in input.selected_indices() {
332 let value = if let Some(node_id) = input_col.get_node_id(row) {
333 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
334 store.get_node_versioned(node_id, ep, tx)
335 } else if let Some(ep) = epoch {
336 store.get_node_at_epoch(node_id, ep)
337 } else {
338 store.get_node(node_id)
339 };
340 node.map_or(Value::Null, |n| node_to_map(&n))
341 } else {
342 Value::Null
343 };
344 output_col.push_value(value);
345 }
346 }
347 ProjectExpr::EdgeResolve { column } => {
348 let input_col = input
349 .column(*column)
350 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
351
352 let output_col = output
353 .column_mut(i)
354 .expect("column exists: index matches projection schema");
355
356 let store = self.store.as_ref().ok_or_else(|| {
357 OperatorError::Execution("Store required for edge resolution".to_string())
358 })?;
359
360 let epoch = self.viewing_epoch;
361 let tx_id = self.transaction_id;
362 for row in input.selected_indices() {
363 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
364 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
365 store.get_edge_versioned(edge_id, ep, tx)
366 } else if let Some(ep) = epoch {
367 store.get_edge_at_epoch(edge_id, ep)
368 } else {
369 store.get_edge(edge_id)
370 };
371 edge.map_or(Value::Null, |e| edge_to_map(&e))
372 } else {
373 Value::Null
374 };
375 output_col.push_value(value);
376 }
377 }
378 ProjectExpr::Coalesce { first, second } => {
379 let first_col = input
380 .column(*first)
381 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
382 let second_col = input
383 .column(*second)
384 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
385
386 let output_col = output
387 .column_mut(i)
388 .expect("column exists: index matches projection schema");
389
390 for row in input.selected_indices() {
391 let value = match first_col.get_value(row) {
392 Some(Value::Null) | None => {
393 second_col.get_value(row).unwrap_or(Value::Null)
394 }
395 Some(v) => v,
396 };
397 output_col.push_value(value);
398 }
399 }
400 }
401 }
402
403 output.set_count(input.row_count());
404 Ok(Some(output))
405 }
406
407 fn reset(&mut self) {
408 self.child.reset();
409 }
410
411 fn name(&self) -> &'static str {
412 "Project"
413 }
414}
415
416fn node_to_map(node: &Node) -> Value {
421 let mut map = BTreeMap::new();
422 map.insert(
423 PropertyKey::new("_id"),
424 Value::Int64(node.id.as_u64() as i64),
425 );
426 let labels: Vec<Value> = node
427 .labels
428 .iter()
429 .map(|l| Value::String(l.clone()))
430 .collect();
431 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
432 for (key, value) in &node.properties {
433 map.insert(key.clone(), value.clone());
434 }
435 Value::Map(Arc::new(map))
436}
437
438fn edge_to_map(edge: &Edge) -> Value {
443 let mut map = BTreeMap::new();
444 map.insert(
445 PropertyKey::new("_id"),
446 Value::Int64(edge.id.as_u64() as i64),
447 );
448 map.insert(
449 PropertyKey::new("_type"),
450 Value::String(edge.edge_type.clone()),
451 );
452 map.insert(
453 PropertyKey::new("_source"),
454 Value::Int64(edge.src.as_u64() as i64),
455 );
456 map.insert(
457 PropertyKey::new("_target"),
458 Value::Int64(edge.dst.as_u64() as i64),
459 );
460 for (key, value) in &edge.properties {
461 map.insert(key.clone(), value.clone());
462 }
463 Value::Map(Arc::new(map))
464}
465
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan that starts at the first of `chunks`.
    fn scan_over(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a single-column Int64 chunk with one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (id, text, amount) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(amount);
            builder.advance_row();
        }

        // Keep columns 2 and 0, in that order.
        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let projections = vec![
            ProjectExpr::Column(0),
            ProjectExpr::Constant(Value::String("constant".into())),
        ];
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1, 2])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // The input has a single column, so index 5 cannot be resolved.
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let projections = vec![
            ProjectExpr::Constant(Value::Int64(42)),
            ProjectExpr::Constant(Value::String("fixed".into())),
            ProjectExpr::Constant(Value::Bool(true)),
        ];
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(node_id.as_u64() as i64))
                );
                assert!(map.get(&PropertyKey::new("_labels")).is_some());
                assert_eq!(
                    map.get(&PropertyKey::new("name")),
                    Some(&Value::String("Alix".into()))
                );
                assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
            }
            value => panic!("Expected Value::Map, got {:?}", value),
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(edge_id.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_type")),
                    Some(&Value::String("WORKS_AT".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_source")),
                    Some(&Value::Int64(src.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_target")),
                    Some(&Value::Int64(dst.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("since")),
                    Some(&Value::Int64(2020))
                );
            }
            value => panic!("Expected Value::Map, got {:?}", value),
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // Node 999 was never created in this store.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}