1use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12pub enum ProjectExpr {
14 Column(usize),
16 Constant(Value),
18 PropertyAccess {
20 column: usize,
22 property: String,
24 },
25 EdgeType {
27 column: usize,
29 },
30 Expression {
32 expr: FilterExpression,
34 variable_columns: HashMap<String, usize>,
36 },
37 NodeResolve {
39 column: usize,
41 },
42 EdgeResolve {
44 column: usize,
46 },
47}
48
49pub struct ProjectOperator {
51 child: Box<dyn Operator>,
53 projections: Vec<ProjectExpr>,
55 output_types: Vec<LogicalType>,
57 store: Option<Arc<dyn GraphStore>>,
59 transaction_id: Option<TransactionId>,
61 viewing_epoch: Option<EpochId>,
63 session_context: SessionContext,
65}
66
67impl ProjectOperator {
68 pub fn new(
70 child: Box<dyn Operator>,
71 projections: Vec<ProjectExpr>,
72 output_types: Vec<LogicalType>,
73 ) -> Self {
74 assert_eq!(projections.len(), output_types.len());
75 Self {
76 child,
77 projections,
78 output_types,
79 store: None,
80 transaction_id: None,
81 viewing_epoch: None,
82 session_context: SessionContext::default(),
83 }
84 }
85
86 pub fn with_store(
88 child: Box<dyn Operator>,
89 projections: Vec<ProjectExpr>,
90 output_types: Vec<LogicalType>,
91 store: Arc<dyn GraphStore>,
92 ) -> Self {
93 assert_eq!(projections.len(), output_types.len());
94 Self {
95 child,
96 projections,
97 output_types,
98 store: Some(store),
99 transaction_id: None,
100 viewing_epoch: None,
101 session_context: SessionContext::default(),
102 }
103 }
104
105 pub fn with_transaction_context(
107 mut self,
108 epoch: EpochId,
109 transaction_id: Option<TransactionId>,
110 ) -> Self {
111 self.viewing_epoch = Some(epoch);
112 self.transaction_id = transaction_id;
113 self
114 }
115
116 pub fn with_session_context(mut self, context: SessionContext) -> Self {
118 self.session_context = context;
119 self
120 }
121
122 pub fn select_columns(
124 child: Box<dyn Operator>,
125 columns: Vec<usize>,
126 types: Vec<LogicalType>,
127 ) -> Self {
128 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
129 Self::new(child, projections, types)
130 }
131}
132
133impl Operator for ProjectOperator {
134 fn next(&mut self) -> OperatorResult {
135 let Some(input) = self.child.next()? else {
137 return Ok(None);
138 };
139
140 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
142
143 for (i, proj) in self.projections.iter().enumerate() {
145 match proj {
146 ProjectExpr::Column(col_idx) => {
147 let input_col = input.column(*col_idx).ok_or_else(|| {
149 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
150 })?;
151
152 let output_col = output
153 .column_mut(i)
154 .expect("column exists: index matches projection schema");
155
156 for row in input.selected_indices() {
158 if let Some(value) = input_col.get_value(row) {
159 output_col.push_value(value);
160 }
161 }
162 }
163 ProjectExpr::Constant(value) => {
164 let output_col = output
166 .column_mut(i)
167 .expect("column exists: index matches projection schema");
168 for _ in input.selected_indices() {
169 output_col.push_value(value.clone());
170 }
171 }
172 ProjectExpr::PropertyAccess { column, property } => {
173 let input_col = input
175 .column(*column)
176 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
177
178 let output_col = output
179 .column_mut(i)
180 .expect("column exists: index matches projection schema");
181
182 let store = self.store.as_ref().ok_or_else(|| {
183 OperatorError::Execution("Store required for property access".to_string())
184 })?;
185
186 let prop_key = PropertyKey::new(property);
193 let epoch = self.viewing_epoch;
194 let tx_id = self.transaction_id;
195 for row in input.selected_indices() {
196 let value = if let Some(node_id) = input_col.get_node_id(row) {
197 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
198 store.get_node_versioned(node_id, ep, tx)
199 } else {
200 store.get_node(node_id)
201 };
202 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
203 {
204 prop
205 } else if let Some(edge_id) = input_col.get_edge_id(row) {
206 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
209 store.get_edge_versioned(edge_id, ep, tx)
210 } else {
211 store.get_edge(edge_id)
212 };
213 edge.and_then(|e| e.get_property(property).cloned())
214 .unwrap_or(Value::Null)
215 } else {
216 Value::Null
217 }
218 } else if let Some(edge_id) = input_col.get_edge_id(row) {
219 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
220 store.get_edge_versioned(edge_id, ep, tx)
221 } else {
222 store.get_edge(edge_id)
223 };
224 edge.and_then(|e| e.get_property(property).cloned())
225 .unwrap_or(Value::Null)
226 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
227 map.get(&prop_key).cloned().unwrap_or(Value::Null)
228 } else {
229 Value::Null
230 };
231 output_col.push_value(value);
232 }
233 }
234 ProjectExpr::EdgeType { column } => {
235 let input_col = input
237 .column(*column)
238 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
239
240 let output_col = output
241 .column_mut(i)
242 .expect("column exists: index matches projection schema");
243
244 let store = self.store.as_ref().ok_or_else(|| {
245 OperatorError::Execution("Store required for edge type access".to_string())
246 })?;
247
248 let epoch = self.viewing_epoch;
249 let tx_id = self.transaction_id;
250 for row in input.selected_indices() {
251 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
252 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
253 store.edge_type_versioned(edge_id, ep, tx)
254 } else {
255 store.edge_type(edge_id)
256 };
257 etype.map_or(Value::Null, Value::String)
258 } else {
259 Value::Null
260 };
261 output_col.push_value(value);
262 }
263 }
264 ProjectExpr::Expression {
265 expr,
266 variable_columns,
267 } => {
268 let output_col = output
269 .column_mut(i)
270 .expect("column exists: index matches projection schema");
271
272 let store = self.store.as_ref().ok_or_else(|| {
273 OperatorError::Execution(
274 "Store required for expression evaluation".to_string(),
275 )
276 })?;
277
278 let mut evaluator = ExpressionPredicate::new(
280 expr.clone(),
281 variable_columns.clone(),
282 Arc::clone(store),
283 )
284 .with_session_context(self.session_context.clone());
285 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
286 evaluator = evaluator.with_transaction_context(ep, tx_id);
287 }
288
289 for row in input.selected_indices() {
290 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
291 output_col.push_value(value);
292 }
293 }
294 ProjectExpr::NodeResolve { column } => {
295 let input_col = input
296 .column(*column)
297 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
298
299 let output_col = output
300 .column_mut(i)
301 .expect("column exists: index matches projection schema");
302
303 let store = self.store.as_ref().ok_or_else(|| {
304 OperatorError::Execution("Store required for node resolution".to_string())
305 })?;
306
307 let epoch = self.viewing_epoch;
308 let tx_id = self.transaction_id;
309 for row in input.selected_indices() {
310 let value = if let Some(node_id) = input_col.get_node_id(row) {
311 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
312 store.get_node_versioned(node_id, ep, tx)
313 } else {
314 store.get_node(node_id)
315 };
316 node.map_or(Value::Null, |n| node_to_map(&n))
317 } else {
318 Value::Null
319 };
320 output_col.push_value(value);
321 }
322 }
323 ProjectExpr::EdgeResolve { column } => {
324 let input_col = input
325 .column(*column)
326 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
327
328 let output_col = output
329 .column_mut(i)
330 .expect("column exists: index matches projection schema");
331
332 let store = self.store.as_ref().ok_or_else(|| {
333 OperatorError::Execution("Store required for edge resolution".to_string())
334 })?;
335
336 let epoch = self.viewing_epoch;
337 let tx_id = self.transaction_id;
338 for row in input.selected_indices() {
339 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
340 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
341 store.get_edge_versioned(edge_id, ep, tx)
342 } else {
343 store.get_edge(edge_id)
344 };
345 edge.map_or(Value::Null, |e| edge_to_map(&e))
346 } else {
347 Value::Null
348 };
349 output_col.push_value(value);
350 }
351 }
352 }
353 }
354
355 output.set_count(input.row_count());
356 Ok(Some(output))
357 }
358
359 fn reset(&mut self) {
360 self.child.reset();
361 }
362
363 fn name(&self) -> &'static str {
364 "Project"
365 }
366}
367
368fn node_to_map(node: &Node) -> Value {
373 let mut map = BTreeMap::new();
374 map.insert(
375 PropertyKey::new("_id"),
376 Value::Int64(node.id.as_u64() as i64),
377 );
378 let labels: Vec<Value> = node
379 .labels
380 .iter()
381 .map(|l| Value::String(l.clone()))
382 .collect();
383 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
384 for (key, value) in &node.properties {
385 map.insert(key.clone(), value.clone());
386 }
387 Value::Map(Arc::new(map))
388}
389
390fn edge_to_map(edge: &Edge) -> Value {
395 let mut map = BTreeMap::new();
396 map.insert(
397 PropertyKey::new("_id"),
398 Value::Int64(edge.id.as_u64() as i64),
399 );
400 map.insert(
401 PropertyKey::new("_type"),
402 Value::String(edge.edge_type.clone()),
403 );
404 map.insert(
405 PropertyKey::new("_source"),
406 Value::Int64(edge.src.as_u64() as i64),
407 );
408 map.insert(
409 PropertyKey::new("_target"),
410 Value::Int64(edge.dst.as_u64() as i64),
411 );
412 for (key, value) in &edge.properties {
413 map.insert(key.clone(), value.clone());
414 }
415 Value::Map(Arc::new(map))
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Child operator that replays a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Hand out the stored chunk and leave an empty one in its place.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan over `chunks`, positioned at the first chunk.
    fn scan_over(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a single-column `Int64` chunk with one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        let rows = [(1, "hello", 100), (2, "world", 200)];

        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (id, text, amount) in rows {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(amount);
            builder.advance_row();
        }

        // Select the third and first columns, in that order.
        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1, 2])]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // The input has a single column, so index 5 cannot be resolved.
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        match &value {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(node_id.as_u64() as i64))
                );
                assert!(map.get(&PropertyKey::new("_labels")).is_some());
                assert_eq!(
                    map.get(&PropertyKey::new("name")),
                    Some(&Value::String("Alix".into()))
                );
                assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        match &value {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(edge_id.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_type")),
                    Some(&Value::String("WORKS_AT".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_source")),
                    Some(&Value::Int64(src.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_target")),
                    Some(&Value::Int64(dst.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("since")),
                    Some(&Value::Int64(2020))
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        // Nothing is created in the store, so node 999 does not exist.
        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}