1use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// One output-column expression evaluated by [`ProjectOperator`].
///
/// Every variant produces exactly one output column with one value per
/// selected input row.
pub enum ProjectExpr {
    /// Copies the input column at this index.
    Column(usize),
    /// Repeats this value for every selected input row.
    Constant(Value),
    /// Reads `property` from the node or edge referenced by input column `column`.
    ///
    /// If the column holds neither a node id nor an edge id but holds a map
    /// value, the property name is looked up as a map key instead. Anything
    /// unresolvable yields `NULL`. Requires a graph store.
    PropertyAccess {
        /// Input column holding the entity id (or map value).
        column: usize,
        /// Name of the property to read.
        property: String,
    },
    /// Emits the type of the edge referenced by input column `column` as a
    /// string, or `NULL` when the row holds no edge id. Requires a graph store.
    EdgeType {
        /// Input column holding the edge id.
        column: usize,
    },
    /// Evaluates `expr` per row through `ExpressionPredicate`; evaluation
    /// failures yield `NULL`. Requires a graph store.
    Expression {
        /// Expression to evaluate.
        expr: FilterExpression,
        /// Bindings handed to `ExpressionPredicate` (presumably variable name ->
        /// input column index; confirm against the planner).
        variable_columns: HashMap<String, usize>,
    },
    /// Materializes the node referenced by input column `column` as a map with
    /// `_id`, `_labels` and all node properties (`NULL` if the node is not
    /// found). Requires a graph store.
    NodeResolve {
        /// Input column holding the node id.
        column: usize,
    },
    /// Materializes the edge referenced by input column `column` as a map with
    /// `_id`, `_type`, `_source`, `_target` and all edge properties (`NULL` if
    /// the edge is not found). Requires a graph store.
    EdgeResolve {
        /// Input column holding the edge id.
        column: usize,
    },
}
48
/// Operator that evaluates a list of [`ProjectExpr`]s over every chunk produced
/// by its child, emitting one output column per projection.
pub struct ProjectOperator {
    /// Input operator whose chunks are projected.
    child: Box<dyn Operator>,
    /// Projections to evaluate, in output-column order.
    projections: Vec<ProjectExpr>,
    /// Logical type of each output column; the constructors assert that it has
    /// the same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store needed by property access, edge type, expression and
    /// node/edge resolution projections; when `None` those projections fail.
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction that, together with `viewing_epoch`, selects the
    /// `*_versioned` store lookups.
    transaction_id: Option<TransactionId>,
    /// Epoch used for store lookups when set.
    viewing_epoch: Option<EpochId>,
    /// Session context forwarded to expression evaluation.
    session_context: SessionContext,
}
66
67impl ProjectOperator {
68 pub fn new(
70 child: Box<dyn Operator>,
71 projections: Vec<ProjectExpr>,
72 output_types: Vec<LogicalType>,
73 ) -> Self {
74 assert_eq!(projections.len(), output_types.len());
75 Self {
76 child,
77 projections,
78 output_types,
79 store: None,
80 transaction_id: None,
81 viewing_epoch: None,
82 session_context: SessionContext::default(),
83 }
84 }
85
86 pub fn with_store(
88 child: Box<dyn Operator>,
89 projections: Vec<ProjectExpr>,
90 output_types: Vec<LogicalType>,
91 store: Arc<dyn GraphStore>,
92 ) -> Self {
93 assert_eq!(projections.len(), output_types.len());
94 Self {
95 child,
96 projections,
97 output_types,
98 store: Some(store),
99 transaction_id: None,
100 viewing_epoch: None,
101 session_context: SessionContext::default(),
102 }
103 }
104
105 pub fn with_transaction_context(
107 mut self,
108 epoch: EpochId,
109 transaction_id: Option<TransactionId>,
110 ) -> Self {
111 self.viewing_epoch = Some(epoch);
112 self.transaction_id = transaction_id;
113 self
114 }
115
116 pub fn with_session_context(mut self, context: SessionContext) -> Self {
118 self.session_context = context;
119 self
120 }
121
122 pub fn select_columns(
124 child: Box<dyn Operator>,
125 columns: Vec<usize>,
126 types: Vec<LogicalType>,
127 ) -> Self {
128 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
129 Self::new(child, projections, types)
130 }
131}
132
133impl Operator for ProjectOperator {
134 fn next(&mut self) -> OperatorResult {
135 let Some(input) = self.child.next()? else {
137 return Ok(None);
138 };
139
140 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
142
143 for (i, proj) in self.projections.iter().enumerate() {
145 match proj {
146 ProjectExpr::Column(col_idx) => {
147 let input_col = input.column(*col_idx).ok_or_else(|| {
149 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
150 })?;
151
152 let output_col = output
153 .column_mut(i)
154 .expect("column exists: index matches projection schema");
155
156 for row in input.selected_indices() {
158 if let Some(value) = input_col.get_value(row) {
159 output_col.push_value(value);
160 }
161 }
162 }
163 ProjectExpr::Constant(value) => {
164 let output_col = output
166 .column_mut(i)
167 .expect("column exists: index matches projection schema");
168 for _ in input.selected_indices() {
169 output_col.push_value(value.clone());
170 }
171 }
172 ProjectExpr::PropertyAccess { column, property } => {
173 let input_col = input
175 .column(*column)
176 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
177
178 let output_col = output
179 .column_mut(i)
180 .expect("column exists: index matches projection schema");
181
182 let store = self.store.as_ref().ok_or_else(|| {
183 OperatorError::Execution("Store required for property access".to_string())
184 })?;
185
186 let prop_key = PropertyKey::new(property);
193 let epoch = self.viewing_epoch;
194 let tx_id = self.transaction_id;
195 for row in input.selected_indices() {
196 let value = if let Some(node_id) = input_col.get_node_id(row) {
197 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
198 store.get_node_versioned(node_id, ep, tx)
199 } else if let Some(ep) = epoch {
200 store.get_node_at_epoch(node_id, ep)
201 } else {
202 store.get_node(node_id)
203 };
204 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
205 {
206 prop
207 } else if let Some(edge_id) = input_col.get_edge_id(row) {
208 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
211 store.get_edge_versioned(edge_id, ep, tx)
212 } else if let Some(ep) = epoch {
213 store.get_edge_at_epoch(edge_id, ep)
214 } else {
215 store.get_edge(edge_id)
216 };
217 edge.and_then(|e| e.get_property(property).cloned())
218 .unwrap_or(Value::Null)
219 } else {
220 Value::Null
221 }
222 } else if let Some(edge_id) = input_col.get_edge_id(row) {
223 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
224 store.get_edge_versioned(edge_id, ep, tx)
225 } else if let Some(ep) = epoch {
226 store.get_edge_at_epoch(edge_id, ep)
227 } else {
228 store.get_edge(edge_id)
229 };
230 edge.and_then(|e| e.get_property(property).cloned())
231 .unwrap_or(Value::Null)
232 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
233 map.get(&prop_key).cloned().unwrap_or(Value::Null)
234 } else {
235 Value::Null
236 };
237 output_col.push_value(value);
238 }
239 }
240 ProjectExpr::EdgeType { column } => {
241 let input_col = input
243 .column(*column)
244 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
245
246 let output_col = output
247 .column_mut(i)
248 .expect("column exists: index matches projection schema");
249
250 let store = self.store.as_ref().ok_or_else(|| {
251 OperatorError::Execution("Store required for edge type access".to_string())
252 })?;
253
254 let epoch = self.viewing_epoch;
255 let tx_id = self.transaction_id;
256 for row in input.selected_indices() {
257 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
258 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
259 store.edge_type_versioned(edge_id, ep, tx)
260 } else {
261 store.edge_type(edge_id)
262 };
263 etype.map_or(Value::Null, Value::String)
264 } else {
265 Value::Null
266 };
267 output_col.push_value(value);
268 }
269 }
270 ProjectExpr::Expression {
271 expr,
272 variable_columns,
273 } => {
274 let output_col = output
275 .column_mut(i)
276 .expect("column exists: index matches projection schema");
277
278 let store = self.store.as_ref().ok_or_else(|| {
279 OperatorError::Execution(
280 "Store required for expression evaluation".to_string(),
281 )
282 })?;
283
284 let mut evaluator = ExpressionPredicate::new(
286 expr.clone(),
287 variable_columns.clone(),
288 Arc::clone(store),
289 )
290 .with_session_context(self.session_context.clone());
291 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
292 evaluator = evaluator.with_transaction_context(ep, tx_id);
293 }
294
295 for row in input.selected_indices() {
296 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
297 output_col.push_value(value);
298 }
299 }
300 ProjectExpr::NodeResolve { column } => {
301 let input_col = input
302 .column(*column)
303 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
304
305 let output_col = output
306 .column_mut(i)
307 .expect("column exists: index matches projection schema");
308
309 let store = self.store.as_ref().ok_or_else(|| {
310 OperatorError::Execution("Store required for node resolution".to_string())
311 })?;
312
313 let epoch = self.viewing_epoch;
314 let tx_id = self.transaction_id;
315 for row in input.selected_indices() {
316 let value = if let Some(node_id) = input_col.get_node_id(row) {
317 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
318 store.get_node_versioned(node_id, ep, tx)
319 } else if let Some(ep) = epoch {
320 store.get_node_at_epoch(node_id, ep)
321 } else {
322 store.get_node(node_id)
323 };
324 node.map_or(Value::Null, |n| node_to_map(&n))
325 } else {
326 Value::Null
327 };
328 output_col.push_value(value);
329 }
330 }
331 ProjectExpr::EdgeResolve { column } => {
332 let input_col = input
333 .column(*column)
334 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
335
336 let output_col = output
337 .column_mut(i)
338 .expect("column exists: index matches projection schema");
339
340 let store = self.store.as_ref().ok_or_else(|| {
341 OperatorError::Execution("Store required for edge resolution".to_string())
342 })?;
343
344 let epoch = self.viewing_epoch;
345 let tx_id = self.transaction_id;
346 for row in input.selected_indices() {
347 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
348 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
349 store.get_edge_versioned(edge_id, ep, tx)
350 } else if let Some(ep) = epoch {
351 store.get_edge_at_epoch(edge_id, ep)
352 } else {
353 store.get_edge(edge_id)
354 };
355 edge.map_or(Value::Null, |e| edge_to_map(&e))
356 } else {
357 Value::Null
358 };
359 output_col.push_value(value);
360 }
361 }
362 }
363 }
364
365 output.set_count(input.row_count());
366 Ok(Some(output))
367 }
368
369 fn reset(&mut self) {
370 self.child.reset();
371 }
372
373 fn name(&self) -> &'static str {
374 "Project"
375 }
376}
377
378fn node_to_map(node: &Node) -> Value {
383 let mut map = BTreeMap::new();
384 map.insert(
385 PropertyKey::new("_id"),
386 Value::Int64(node.id.as_u64() as i64),
387 );
388 let labels: Vec<Value> = node
389 .labels
390 .iter()
391 .map(|l| Value::String(l.clone()))
392 .collect();
393 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
394 for (key, value) in &node.properties {
395 map.insert(key.clone(), value.clone());
396 }
397 Value::Map(Arc::new(map))
398}
399
400fn edge_to_map(edge: &Edge) -> Value {
405 let mut map = BTreeMap::new();
406 map.insert(
407 PropertyKey::new("_id"),
408 Value::Int64(edge.id.as_u64() as i64),
409 );
410 map.insert(
411 PropertyKey::new("_type"),
412 Value::String(edge.edge_type.clone()),
413 );
414 map.insert(
415 PropertyKey::new("_source"),
416 Value::Int64(edge.src.as_u64() as i64),
417 );
418 map.insert(
419 PropertyKey::new("_target"),
420 Value::Int64(edge.dst.as_u64() as i64),
421 );
422 for (key, value) in &edge.properties {
423 map.insert(key.clone(), value.clone());
424 }
425 Value::Map(Arc::new(map))
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Child operator that hands out a fixed list of chunks, one per `next` call.
    ///
    /// Chunks are moved out as they are returned (replaced by
    /// `DataChunk::empty()`), so after `reset` the scan yields empty chunks
    /// rather than the originals.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a [`MockScanOperator`] that replays `chunks` from the start.
    fn scan(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        let chunk = builder.finish();

        let mut project = ProjectOperator::select_columns(
            scan(vec![chunk]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(200));
        assert_eq!(result.column(1).unwrap().get_int64(1), Some(2));
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            scan(vec![chunk]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            scan(vec![chunk]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(
            matches!(result, Err(OperatorError::ColumnNotFound(_))),
            "Should fail with ColumnNotFound"
        );
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            scan(vec![chunk]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::select_columns(
            scan(vec![chunk]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            scan(vec![chunk]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(node_id.as_u64() as i64))
            );
            assert!(map.get(&PropertyKey::new("_labels")).is_some());
            assert_eq!(
                map.get(&PropertyKey::new("name")),
                Some(&Value::String("Alix".into()))
            );
            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            scan(vec![chunk]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(edge_id.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_type")),
                Some(&Value::String("WORKS_AT".into()))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_source")),
                Some(&Value::Int64(src.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_target")),
                Some(&Value::Int64(dst.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("since")),
                Some(&Value::Int64(2020))
            );
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            scan(vec![chunk]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }

    #[test]
    fn test_project_node_property_access() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            scan(vec![chunk]),
            vec![
                ProjectExpr::PropertyAccess {
                    column: 0,
                    property: "name".to_string(),
                },
                ProjectExpr::PropertyAccess {
                    column: 0,
                    property: "missing".to_string(),
                },
            ],
            vec![LogicalType::Any, LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::String("Alix".into()))
        );
        assert_eq!(result.column(1).unwrap().get_value(0), Some(Value::Null));
    }

    #[test]
    fn test_project_edge_property_access() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            scan(vec![chunk]),
            vec![ProjectExpr::PropertyAccess {
                column: 0,
                property: "since".to_string(),
            }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_edge_type() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            scan(vec![chunk]),
            vec![ProjectExpr::EdgeType { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::String("WORKS_AT".into()))
        );
    }

    #[test]
    fn test_project_property_access_requires_store() {
        use grafeo_common::types::NodeId;

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(NodeId::new(1));
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            scan(vec![chunk]),
            vec![ProjectExpr::PropertyAccess {
                column: 0,
                property: "name".to_string(),
            }],
            vec![LogicalType::Any],
        );

        assert!(
            matches!(project.next(), Err(OperatorError::Execution(_))),
            "Should fail when no store is configured"
        );
    }
}