1use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// One output-column expression evaluated by [`ProjectOperator`].
///
/// Every variant produces exactly one value per selected input row.
pub enum ProjectExpr {
    /// Copies the value found at this input column index.
    Column(usize),
    /// Emits the same value for every row.
    Constant(Value),
    /// Reads `property` from the node or edge referenced by `column`; when the
    /// column holds a `Value::Map` instead, looks the key up in that map.
    /// Missing entities or properties produce `Value::Null`.
    PropertyAccess {
        /// Input column holding the entity reference (or map value).
        column: usize,
        /// Name of the property to read.
        property: String,
    },
    /// Emits the type name of the edge referenced by `column`, or NULL when
    /// the row holds no edge id or the edge cannot be found.
    EdgeType {
        /// Input column holding the edge id.
        column: usize,
    },
    /// Evaluates an arbitrary expression per row through `ExpressionPredicate`;
    /// rows for which evaluation yields no value become NULL.
    Expression {
        /// The expression to evaluate.
        expr: FilterExpression,
        /// Variable-name to input-column-index bindings handed to the evaluator.
        variable_columns: HashMap<String, usize>,
    },
    /// Materializes the node referenced by `column` into a `Value::Map` holding
    /// `_id`, `_labels` and every node property (NULL if the node is missing).
    NodeResolve {
        /// Input column holding the node id.
        column: usize,
    },
    /// Materializes the edge referenced by `column` into a `Value::Map` holding
    /// `_id`, `_type`, `_source`, `_target` and every edge property
    /// (NULL if the edge is missing).
    EdgeResolve {
        /// Input column holding the edge id.
        column: usize,
    },
    /// Emits the `first` column's value unless it is NULL or absent, otherwise
    /// the `second` column's value (NULL if that is absent too).
    Coalesce {
        /// Preferred input column.
        first: usize,
        /// Fallback input column.
        second: usize,
    },
}
55
/// Physical operator that evaluates a list of [`ProjectExpr`]s over each chunk
/// produced by its child, emitting one output column per expression.
pub struct ProjectOperator {
    /// Upstream operator supplying input chunks.
    child: Box<dyn Operator>,
    /// Expressions evaluated in order; expression `i` fills output column `i`.
    projections: Vec<ProjectExpr>,
    /// Logical type of each output column (constructors assert it has the
    /// same length as `projections`).
    output_types: Vec<LogicalType>,
    /// Graph store for entity lookups; required by every projection except
    /// `Column`, `Constant` and `Coalesce`.
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction id passed, together with `viewing_epoch`, to the store's
    /// versioned lookups and to expression evaluation.
    transaction_id: Option<TransactionId>,
    /// Epoch used for versioned lookups; when `None`, the unversioned store
    /// accessors are used.
    viewing_epoch: Option<EpochId>,
    /// Session state forwarded to expression evaluation.
    session_context: SessionContext,
}
73
74impl ProjectOperator {
75 pub fn new(
81 child: Box<dyn Operator>,
82 projections: Vec<ProjectExpr>,
83 output_types: Vec<LogicalType>,
84 ) -> Self {
85 assert_eq!(projections.len(), output_types.len());
86 Self {
87 child,
88 projections,
89 output_types,
90 store: None,
91 transaction_id: None,
92 viewing_epoch: None,
93 session_context: SessionContext::default(),
94 }
95 }
96
97 pub fn with_store(
103 child: Box<dyn Operator>,
104 projections: Vec<ProjectExpr>,
105 output_types: Vec<LogicalType>,
106 store: Arc<dyn GraphStore>,
107 ) -> Self {
108 assert_eq!(projections.len(), output_types.len());
109 Self {
110 child,
111 projections,
112 output_types,
113 store: Some(store),
114 transaction_id: None,
115 viewing_epoch: None,
116 session_context: SessionContext::default(),
117 }
118 }
119
120 pub fn with_transaction_context(
122 mut self,
123 epoch: EpochId,
124 transaction_id: Option<TransactionId>,
125 ) -> Self {
126 self.viewing_epoch = Some(epoch);
127 self.transaction_id = transaction_id;
128 self
129 }
130
131 pub fn with_session_context(mut self, context: SessionContext) -> Self {
133 self.session_context = context;
134 self
135 }
136
137 pub fn select_columns(
139 child: Box<dyn Operator>,
140 columns: Vec<usize>,
141 types: Vec<LogicalType>,
142 ) -> Self {
143 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
144 Self::new(child, projections, types)
145 }
146}
147
148impl Operator for ProjectOperator {
149 fn next(&mut self) -> OperatorResult {
150 let Some(input) = self.child.next()? else {
152 return Ok(None);
153 };
154
155 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
157
158 for (i, proj) in self.projections.iter().enumerate() {
160 match proj {
161 ProjectExpr::Column(col_idx) => {
162 let input_col = input.column(*col_idx).ok_or_else(|| {
164 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
165 })?;
166
167 let output_col = output
168 .column_mut(i)
169 .expect("column exists: index matches projection schema");
170
171 for row in input.selected_indices() {
173 if let Some(value) = input_col.get_value(row) {
174 output_col.push_value(value);
175 }
176 }
177 }
178 ProjectExpr::Constant(value) => {
179 let output_col = output
181 .column_mut(i)
182 .expect("column exists: index matches projection schema");
183 for _ in input.selected_indices() {
184 output_col.push_value(value.clone());
185 }
186 }
187 ProjectExpr::PropertyAccess { column, property } => {
188 let input_col = input
190 .column(*column)
191 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
192
193 let output_col = output
194 .column_mut(i)
195 .expect("column exists: index matches projection schema");
196
197 let store = self.store.as_ref().ok_or_else(|| {
198 OperatorError::Execution("Store required for property access".to_string())
199 })?;
200
201 let prop_key = PropertyKey::new(property);
208 let epoch = self.viewing_epoch;
209 let tx_id = self.transaction_id;
210 for row in input.selected_indices() {
211 let value = if let Some(node_id) = input_col.get_node_id(row) {
212 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
213 store.get_node_versioned(node_id, ep, tx)
214 } else if let Some(ep) = epoch {
215 store.get_node_at_epoch(node_id, ep)
216 } else {
217 store.get_node(node_id)
218 };
219 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
220 {
221 prop
222 } else if let Some(edge_id) = input_col.get_edge_id(row) {
223 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
226 store.get_edge_versioned(edge_id, ep, tx)
227 } else if let Some(ep) = epoch {
228 store.get_edge_at_epoch(edge_id, ep)
229 } else {
230 store.get_edge(edge_id)
231 };
232 edge.and_then(|e| e.get_property(property).cloned())
233 .unwrap_or(Value::Null)
234 } else {
235 Value::Null
236 }
237 } else if let Some(edge_id) = input_col.get_edge_id(row) {
238 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
239 store.get_edge_versioned(edge_id, ep, tx)
240 } else if let Some(ep) = epoch {
241 store.get_edge_at_epoch(edge_id, ep)
242 } else {
243 store.get_edge(edge_id)
244 };
245 edge.and_then(|e| e.get_property(property).cloned())
246 .unwrap_or(Value::Null)
247 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
248 map.get(&prop_key).cloned().unwrap_or(Value::Null)
249 } else {
250 Value::Null
251 };
252 output_col.push_value(value);
253 }
254 }
255 ProjectExpr::EdgeType { column } => {
256 let input_col = input
258 .column(*column)
259 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
260
261 let output_col = output
262 .column_mut(i)
263 .expect("column exists: index matches projection schema");
264
265 let store = self.store.as_ref().ok_or_else(|| {
266 OperatorError::Execution("Store required for edge type access".to_string())
267 })?;
268
269 let epoch = self.viewing_epoch;
270 let tx_id = self.transaction_id;
271 for row in input.selected_indices() {
272 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
273 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
274 store.edge_type_versioned(edge_id, ep, tx)
275 } else {
276 store.edge_type(edge_id)
277 };
278 etype.map_or(Value::Null, Value::String)
279 } else {
280 Value::Null
281 };
282 output_col.push_value(value);
283 }
284 }
285 ProjectExpr::Expression {
286 expr,
287 variable_columns,
288 } => {
289 let output_col = output
290 .column_mut(i)
291 .expect("column exists: index matches projection schema");
292
293 let store = self.store.as_ref().ok_or_else(|| {
294 OperatorError::Execution(
295 "Store required for expression evaluation".to_string(),
296 )
297 })?;
298
299 let mut evaluator = ExpressionPredicate::new(
301 expr.clone(),
302 variable_columns.clone(),
303 Arc::clone(store),
304 )
305 .with_session_context(self.session_context.clone());
306 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
307 evaluator = evaluator.with_transaction_context(ep, tx_id);
308 }
309
310 for row in input.selected_indices() {
311 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
312 output_col.push_value(value);
313 }
314 }
315 ProjectExpr::NodeResolve { column } => {
316 let input_col = input
317 .column(*column)
318 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
319
320 let output_col = output
321 .column_mut(i)
322 .expect("column exists: index matches projection schema");
323
324 let store = self.store.as_ref().ok_or_else(|| {
325 OperatorError::Execution("Store required for node resolution".to_string())
326 })?;
327
328 let epoch = self.viewing_epoch;
329 let tx_id = self.transaction_id;
330 for row in input.selected_indices() {
331 let value = if let Some(node_id) = input_col.get_node_id(row) {
332 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
333 store.get_node_versioned(node_id, ep, tx)
334 } else if let Some(ep) = epoch {
335 store.get_node_at_epoch(node_id, ep)
336 } else {
337 store.get_node(node_id)
338 };
339 node.map_or(Value::Null, |n| node_to_map(&n))
340 } else {
341 Value::Null
342 };
343 output_col.push_value(value);
344 }
345 }
346 ProjectExpr::EdgeResolve { column } => {
347 let input_col = input
348 .column(*column)
349 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
350
351 let output_col = output
352 .column_mut(i)
353 .expect("column exists: index matches projection schema");
354
355 let store = self.store.as_ref().ok_or_else(|| {
356 OperatorError::Execution("Store required for edge resolution".to_string())
357 })?;
358
359 let epoch = self.viewing_epoch;
360 let tx_id = self.transaction_id;
361 for row in input.selected_indices() {
362 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
363 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
364 store.get_edge_versioned(edge_id, ep, tx)
365 } else if let Some(ep) = epoch {
366 store.get_edge_at_epoch(edge_id, ep)
367 } else {
368 store.get_edge(edge_id)
369 };
370 edge.map_or(Value::Null, |e| edge_to_map(&e))
371 } else {
372 Value::Null
373 };
374 output_col.push_value(value);
375 }
376 }
377 ProjectExpr::Coalesce { first, second } => {
378 let first_col = input
379 .column(*first)
380 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
381 let second_col = input
382 .column(*second)
383 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
384
385 let output_col = output
386 .column_mut(i)
387 .expect("column exists: index matches projection schema");
388
389 for row in input.selected_indices() {
390 let value = match first_col.get_value(row) {
391 Some(Value::Null) | None => {
392 second_col.get_value(row).unwrap_or(Value::Null)
393 }
394 Some(v) => v,
395 };
396 output_col.push_value(value);
397 }
398 }
399 }
400 }
401
402 output.set_count(input.row_count());
403 Ok(Some(output))
404 }
405
406 fn reset(&mut self) {
407 self.child.reset();
408 }
409
410 fn name(&self) -> &'static str {
411 "Project"
412 }
413}
414
415fn node_to_map(node: &Node) -> Value {
420 let mut map = BTreeMap::new();
421 map.insert(
422 PropertyKey::new("_id"),
423 Value::Int64(node.id.as_u64() as i64),
424 );
425 let labels: Vec<Value> = node
426 .labels
427 .iter()
428 .map(|l| Value::String(l.clone()))
429 .collect();
430 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
431 for (key, value) in &node.properties {
432 map.insert(key.clone(), value.clone());
433 }
434 Value::Map(Arc::new(map))
435}
436
437fn edge_to_map(edge: &Edge) -> Value {
442 let mut map = BTreeMap::new();
443 map.insert(
444 PropertyKey::new("_id"),
445 Value::Int64(edge.id.as_u64() as i64),
446 );
447 map.insert(
448 PropertyKey::new("_type"),
449 Value::String(edge.edge_type.clone()),
450 );
451 map.insert(
452 PropertyKey::new("_source"),
453 Value::Int64(edge.src.as_u64() as i64),
454 );
455 map.insert(
456 PropertyKey::new("_target"),
457 Value::Int64(edge.dst.as_u64() as i64),
458 );
459 for (key, value) in &edge.properties {
460 map.insert(key.clone(), value.clone());
461 }
462 Value::Map(Arc::new(map))
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that yields a fixed list of chunks in order, then `None`.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out, leaving an empty chunk in its slot.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            // NOTE(review): emitted chunks were replaced with empty ones above,
            // so after a reset this mock replays empty chunks, not the originals.
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// `select_columns` can reorder and drop input columns.
    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Output = [input col 2, input col 0].
        let mut project = ProjectOperator::select_columns(
            Box::new(mock_scan),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    /// A constant projection repeats its value on every row.
    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    /// An exhausted child makes the projection return `None`.
    #[test]
    fn test_project_empty_input() {
        let mock_scan = MockScanOperator {
            chunks: vec![],
            position: 0,
        };

        let mut project =
            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    /// Referencing a non-existent input column is an error, not a panic.
    #[test]
    fn test_project_column_not_found() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    /// Several constants of different types can be projected side by side.
    #[test]
    fn test_project_multiple_constants() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    /// Selecting every column in order reproduces the input.
    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::select_columns(
            Box::new(mock_scan),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    /// The operator reports its name as "Project".
    #[test]
    fn test_project_name() {
        let mock_scan = MockScanOperator {
            chunks: vec![],
            position: 0,
        };
        let project =
            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    /// `NodeResolve` turns a node id into a map with `_id`, `_labels` and properties.
    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::with_store(
            Box::new(mock_scan),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(node_id.as_u64() as i64))
            );
            assert!(map.get(&PropertyKey::new("_labels")).is_some());
            assert_eq!(
                map.get(&PropertyKey::new("name")),
                Some(&Value::String("Alix".into()))
            );
            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    /// `EdgeResolve` turns an edge id into a map with `_id`, `_type`,
    /// `_source`, `_target` and properties.
    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::with_store(
            Box::new(mock_scan),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(edge_id.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_type")),
                Some(&Value::String("WORKS_AT".into()))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_source")),
                Some(&Value::Int64(src.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_target")),
                Some(&Value::Int64(dst.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("since")),
                Some(&Value::Int64(2020))
            );
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    /// Resolving an id that is not in the store yields NULL rather than an error.
    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::with_store(
            Box::new(mock_scan),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}