1use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// A single output column of a [`ProjectOperator`].
///
/// Each variant describes how one output column is computed from the rows of
/// the incoming chunk. All `column`/`first`/`second` fields are indices into
/// the *input* chunk.
pub enum ProjectExpr {
    /// Copies the values of the input column at this index.
    Column(usize),
    /// Emits a clone of this value for every selected input row.
    Constant(Value),
    /// Reads a named property from the entity stored in an input column.
    ///
    /// The entry is tried as a node id first, then as an edge id, and finally
    /// as a map value; `Value::Null` is produced when nothing matches.
    /// Requires the operator to have a store.
    PropertyAccess {
        /// Input column holding node ids, edge ids, or map values.
        column: usize,
        /// Name of the property to read.
        property: String,
    },
    /// Produces the type of the edge referenced by an input column as a
    /// string value, or `Value::Null` when the entry is not an edge or the
    /// type cannot be found. Requires the operator to have a store.
    EdgeType {
        /// Input column holding edge ids.
        column: usize,
    },
    /// Evaluates an arbitrary expression per row through
    /// `ExpressionPredicate`. Requires the operator to have a store.
    Expression {
        /// The expression to evaluate.
        expr: FilterExpression,
        /// Column index per variable name; handed to the evaluator as-is.
        // NOTE(review): presumably the variables referenced by `expr` — confirm
        // against `ExpressionPredicate::new`.
        variable_columns: HashMap<String, usize>,
    },
    /// Expands a node id into a map containing `_id`, `_labels` and the
    /// node's properties; `Value::Null` when the node cannot be fetched.
    /// Requires the operator to have a store.
    NodeResolve {
        /// Input column holding node ids.
        column: usize,
    },
    /// Expands an edge id into a map containing `_id`, `_type`, `_source`,
    /// `_target` and the edge's properties; `Value::Null` when the edge
    /// cannot be fetched. Requires the operator to have a store.
    EdgeResolve {
        /// Input column holding edge ids.
        column: usize,
    },
    /// Takes the value from `first` unless it is missing or `Value::Null`,
    /// in which case the value from `second` is used (or `Value::Null` if
    /// that is missing too).
    Coalesce {
        /// Preferred input column.
        first: usize,
        /// Fallback input column.
        second: usize,
    },
}
55
/// Operator that computes a new set of columns from each chunk produced by
/// its child, one output column per [`ProjectExpr`].
pub struct ProjectOperator {
    /// Upstream operator supplying the input chunks.
    child: Box<dyn Operator>,
    /// One expression per output column, in output order.
    projections: Vec<ProjectExpr>,
    /// Logical type of each output column; same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store used by projections that look up nodes or edges.
    /// `None` when the operator was built with [`ProjectOperator::new`].
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction used for versioned store reads; only consulted when
    /// `viewing_epoch` is also set.
    transaction_id: Option<TransactionId>,
    /// Epoch at which store reads are performed; `None` reads the store
    /// through its unversioned accessors.
    viewing_epoch: Option<EpochId>,
    /// Session context forwarded to the expression evaluator.
    session_context: SessionContext,
}
73
74impl ProjectOperator {
75 pub fn new(
77 child: Box<dyn Operator>,
78 projections: Vec<ProjectExpr>,
79 output_types: Vec<LogicalType>,
80 ) -> Self {
81 assert_eq!(projections.len(), output_types.len());
82 Self {
83 child,
84 projections,
85 output_types,
86 store: None,
87 transaction_id: None,
88 viewing_epoch: None,
89 session_context: SessionContext::default(),
90 }
91 }
92
93 pub fn with_store(
95 child: Box<dyn Operator>,
96 projections: Vec<ProjectExpr>,
97 output_types: Vec<LogicalType>,
98 store: Arc<dyn GraphStore>,
99 ) -> Self {
100 assert_eq!(projections.len(), output_types.len());
101 Self {
102 child,
103 projections,
104 output_types,
105 store: Some(store),
106 transaction_id: None,
107 viewing_epoch: None,
108 session_context: SessionContext::default(),
109 }
110 }
111
112 pub fn with_transaction_context(
114 mut self,
115 epoch: EpochId,
116 transaction_id: Option<TransactionId>,
117 ) -> Self {
118 self.viewing_epoch = Some(epoch);
119 self.transaction_id = transaction_id;
120 self
121 }
122
123 pub fn with_session_context(mut self, context: SessionContext) -> Self {
125 self.session_context = context;
126 self
127 }
128
129 pub fn select_columns(
131 child: Box<dyn Operator>,
132 columns: Vec<usize>,
133 types: Vec<LogicalType>,
134 ) -> Self {
135 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
136 Self::new(child, projections, types)
137 }
138}
139
140impl Operator for ProjectOperator {
141 fn next(&mut self) -> OperatorResult {
142 let Some(input) = self.child.next()? else {
144 return Ok(None);
145 };
146
147 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
149
150 for (i, proj) in self.projections.iter().enumerate() {
152 match proj {
153 ProjectExpr::Column(col_idx) => {
154 let input_col = input.column(*col_idx).ok_or_else(|| {
156 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
157 })?;
158
159 let output_col = output
160 .column_mut(i)
161 .expect("column exists: index matches projection schema");
162
163 for row in input.selected_indices() {
165 if let Some(value) = input_col.get_value(row) {
166 output_col.push_value(value);
167 }
168 }
169 }
170 ProjectExpr::Constant(value) => {
171 let output_col = output
173 .column_mut(i)
174 .expect("column exists: index matches projection schema");
175 for _ in input.selected_indices() {
176 output_col.push_value(value.clone());
177 }
178 }
179 ProjectExpr::PropertyAccess { column, property } => {
180 let input_col = input
182 .column(*column)
183 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
184
185 let output_col = output
186 .column_mut(i)
187 .expect("column exists: index matches projection schema");
188
189 let store = self.store.as_ref().ok_or_else(|| {
190 OperatorError::Execution("Store required for property access".to_string())
191 })?;
192
193 let prop_key = PropertyKey::new(property);
200 let epoch = self.viewing_epoch;
201 let tx_id = self.transaction_id;
202 for row in input.selected_indices() {
203 let value = if let Some(node_id) = input_col.get_node_id(row) {
204 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
205 store.get_node_versioned(node_id, ep, tx)
206 } else if let Some(ep) = epoch {
207 store.get_node_at_epoch(node_id, ep)
208 } else {
209 store.get_node(node_id)
210 };
211 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
212 {
213 prop
214 } else if let Some(edge_id) = input_col.get_edge_id(row) {
215 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
218 store.get_edge_versioned(edge_id, ep, tx)
219 } else if let Some(ep) = epoch {
220 store.get_edge_at_epoch(edge_id, ep)
221 } else {
222 store.get_edge(edge_id)
223 };
224 edge.and_then(|e| e.get_property(property).cloned())
225 .unwrap_or(Value::Null)
226 } else {
227 Value::Null
228 }
229 } else if let Some(edge_id) = input_col.get_edge_id(row) {
230 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
231 store.get_edge_versioned(edge_id, ep, tx)
232 } else if let Some(ep) = epoch {
233 store.get_edge_at_epoch(edge_id, ep)
234 } else {
235 store.get_edge(edge_id)
236 };
237 edge.and_then(|e| e.get_property(property).cloned())
238 .unwrap_or(Value::Null)
239 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
240 map.get(&prop_key).cloned().unwrap_or(Value::Null)
241 } else {
242 Value::Null
243 };
244 output_col.push_value(value);
245 }
246 }
247 ProjectExpr::EdgeType { column } => {
248 let input_col = input
250 .column(*column)
251 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
252
253 let output_col = output
254 .column_mut(i)
255 .expect("column exists: index matches projection schema");
256
257 let store = self.store.as_ref().ok_or_else(|| {
258 OperatorError::Execution("Store required for edge type access".to_string())
259 })?;
260
261 let epoch = self.viewing_epoch;
262 let tx_id = self.transaction_id;
263 for row in input.selected_indices() {
264 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
265 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
266 store.edge_type_versioned(edge_id, ep, tx)
267 } else {
268 store.edge_type(edge_id)
269 };
270 etype.map_or(Value::Null, Value::String)
271 } else {
272 Value::Null
273 };
274 output_col.push_value(value);
275 }
276 }
277 ProjectExpr::Expression {
278 expr,
279 variable_columns,
280 } => {
281 let output_col = output
282 .column_mut(i)
283 .expect("column exists: index matches projection schema");
284
285 let store = self.store.as_ref().ok_or_else(|| {
286 OperatorError::Execution(
287 "Store required for expression evaluation".to_string(),
288 )
289 })?;
290
291 let mut evaluator = ExpressionPredicate::new(
293 expr.clone(),
294 variable_columns.clone(),
295 Arc::clone(store),
296 )
297 .with_session_context(self.session_context.clone());
298 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
299 evaluator = evaluator.with_transaction_context(ep, tx_id);
300 }
301
302 for row in input.selected_indices() {
303 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
304 output_col.push_value(value);
305 }
306 }
307 ProjectExpr::NodeResolve { column } => {
308 let input_col = input
309 .column(*column)
310 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
311
312 let output_col = output
313 .column_mut(i)
314 .expect("column exists: index matches projection schema");
315
316 let store = self.store.as_ref().ok_or_else(|| {
317 OperatorError::Execution("Store required for node resolution".to_string())
318 })?;
319
320 let epoch = self.viewing_epoch;
321 let tx_id = self.transaction_id;
322 for row in input.selected_indices() {
323 let value = if let Some(node_id) = input_col.get_node_id(row) {
324 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
325 store.get_node_versioned(node_id, ep, tx)
326 } else if let Some(ep) = epoch {
327 store.get_node_at_epoch(node_id, ep)
328 } else {
329 store.get_node(node_id)
330 };
331 node.map_or(Value::Null, |n| node_to_map(&n))
332 } else {
333 Value::Null
334 };
335 output_col.push_value(value);
336 }
337 }
338 ProjectExpr::EdgeResolve { column } => {
339 let input_col = input
340 .column(*column)
341 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
342
343 let output_col = output
344 .column_mut(i)
345 .expect("column exists: index matches projection schema");
346
347 let store = self.store.as_ref().ok_or_else(|| {
348 OperatorError::Execution("Store required for edge resolution".to_string())
349 })?;
350
351 let epoch = self.viewing_epoch;
352 let tx_id = self.transaction_id;
353 for row in input.selected_indices() {
354 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
355 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
356 store.get_edge_versioned(edge_id, ep, tx)
357 } else if let Some(ep) = epoch {
358 store.get_edge_at_epoch(edge_id, ep)
359 } else {
360 store.get_edge(edge_id)
361 };
362 edge.map_or(Value::Null, |e| edge_to_map(&e))
363 } else {
364 Value::Null
365 };
366 output_col.push_value(value);
367 }
368 }
369 ProjectExpr::Coalesce { first, second } => {
370 let first_col = input
371 .column(*first)
372 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
373 let second_col = input
374 .column(*second)
375 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
376
377 let output_col = output
378 .column_mut(i)
379 .expect("column exists: index matches projection schema");
380
381 for row in input.selected_indices() {
382 let value = match first_col.get_value(row) {
383 Some(Value::Null) | None => {
384 second_col.get_value(row).unwrap_or(Value::Null)
385 }
386 Some(v) => v,
387 };
388 output_col.push_value(value);
389 }
390 }
391 }
392 }
393
394 output.set_count(input.row_count());
395 Ok(Some(output))
396 }
397
398 fn reset(&mut self) {
399 self.child.reset();
400 }
401
402 fn name(&self) -> &'static str {
403 "Project"
404 }
405}
406
407fn node_to_map(node: &Node) -> Value {
412 let mut map = BTreeMap::new();
413 map.insert(
414 PropertyKey::new("_id"),
415 Value::Int64(node.id.as_u64() as i64),
416 );
417 let labels: Vec<Value> = node
418 .labels
419 .iter()
420 .map(|l| Value::String(l.clone()))
421 .collect();
422 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
423 for (key, value) in &node.properties {
424 map.insert(key.clone(), value.clone());
425 }
426 Value::Map(Arc::new(map))
427}
428
429fn edge_to_map(edge: &Edge) -> Value {
434 let mut map = BTreeMap::new();
435 map.insert(
436 PropertyKey::new("_id"),
437 Value::Int64(edge.id.as_u64() as i64),
438 );
439 map.insert(
440 PropertyKey::new("_type"),
441 Value::String(edge.edge_type.clone()),
442 );
443 map.insert(
444 PropertyKey::new("_source"),
445 Value::Int64(edge.src.as_u64() as i64),
446 );
447 map.insert(
448 PropertyKey::new("_target"),
449 Value::Int64(edge.dst.as_u64() as i64),
450 );
451 for (key, value) in &edge.properties {
452 map.insert(key.clone(), value.clone());
453 }
454 Value::Map(Arc::new(map))
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Child operator stub that replays a fixed list of chunks.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Hand the chunk out by value, leaving an empty one in its place.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan positioned at the first of `chunks`.
    fn scan_over(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a chunk with a single `Int64` column, one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        let schema = [LogicalType::Int64, LogicalType::String, LogicalType::Int64];
        let mut builder = DataChunkBuilder::new(&schema);

        for (id, text, amount) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(amount);
            builder.advance_row();
        }

        // Keep columns 2 and 0, in that order.
        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1, 2])]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        let constants = result.column(1).unwrap();
        assert_eq!(constants.get_string(0), Some("constant"));
        assert_eq!(constants.get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // The input has one column; index 5 does not exist.
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(node_id.as_u64() as i64))
                );
                assert!(map.get(&PropertyKey::new("_labels")).is_some());
                assert_eq!(
                    map.get(&PropertyKey::new("name")),
                    Some(&Value::String("Alix".into()))
                );
                assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(edge_id.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_type")),
                    Some(&Value::String("WORKS_AT".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_source")),
                    Some(&Value::Int64(src.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_target")),
                    Some(&Value::Int64(dst.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("since")),
                    Some(&Value::Int64(2020))
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        // Empty store: the id below refers to no node.
        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}