1use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// A single output-column expression evaluated by [`ProjectOperator`].
///
/// Every variant produces exactly one output column; the operator evaluates it
/// once per selected row of the input chunk.
pub enum ProjectExpr {
    /// Copies values from the input column at this index.
    Column(usize),
    /// Emits a clone of this value for every selected input row.
    Constant(Value),
    /// Reads a named property from the entity referenced by an input column.
    ///
    /// The operator tries the cell as a node id, then as an edge id, then as a
    /// `Value::Map`; anything else yields `Value::Null`. Requires a store.
    PropertyAccess {
        /// Index of the input column holding the entity (or map) to read from.
        column: usize,
        /// Name of the property to look up.
        property: String,
    },
    /// Resolves the type of the edge referenced by an input column into a
    /// `Value::String`, or `Value::Null` when it cannot be resolved.
    /// Requires a store.
    EdgeType {
        /// Index of the input column holding edge ids.
        column: usize,
    },
    /// Evaluates an arbitrary expression per row through `ExpressionPredicate`.
    /// Requires a store.
    Expression {
        /// The expression to evaluate.
        expr: FilterExpression,
        /// Mapping from variable name to input column index, handed to the
        /// evaluator together with `expr`.
        variable_columns: HashMap<String, usize>,
    },
    /// Expands the node referenced by an input column into a `Value::Map`
    /// (see `node_to_map`), or `Value::Null` when the node is not found.
    /// Requires a store.
    NodeResolve {
        /// Index of the input column holding node ids.
        column: usize,
    },
    /// Expands the edge referenced by an input column into a `Value::Map`
    /// (see `edge_to_map`), or `Value::Null` when the edge is not found.
    /// Requires a store.
    EdgeResolve {
        /// Index of the input column holding edge ids.
        column: usize,
    },
}
48
/// Operator that evaluates a list of [`ProjectExpr`]s against each chunk
/// produced by its child and emits the results as a new chunk.
pub struct ProjectOperator {
    /// Upstream operator supplying the input chunks.
    child: Box<dyn Operator>,
    /// One expression per output column, in output order.
    projections: Vec<ProjectExpr>,
    /// Logical type of each output column; the constructors assert that this
    /// has the same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store used by the store-backed projections; `None` when the
    /// operator was built with `new`/`select_columns`.
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction id used for versioned store lookups, when set.
    transaction_id: Option<TransactionId>,
    /// Epoch used for versioned store lookups, when set.
    viewing_epoch: Option<EpochId>,
}
64
65impl ProjectOperator {
66 pub fn new(
68 child: Box<dyn Operator>,
69 projections: Vec<ProjectExpr>,
70 output_types: Vec<LogicalType>,
71 ) -> Self {
72 assert_eq!(projections.len(), output_types.len());
73 Self {
74 child,
75 projections,
76 output_types,
77 store: None,
78 transaction_id: None,
79 viewing_epoch: None,
80 }
81 }
82
83 pub fn with_store(
85 child: Box<dyn Operator>,
86 projections: Vec<ProjectExpr>,
87 output_types: Vec<LogicalType>,
88 store: Arc<dyn GraphStore>,
89 ) -> Self {
90 assert_eq!(projections.len(), output_types.len());
91 Self {
92 child,
93 projections,
94 output_types,
95 store: Some(store),
96 transaction_id: None,
97 viewing_epoch: None,
98 }
99 }
100
101 pub fn with_transaction_context(
103 mut self,
104 epoch: EpochId,
105 transaction_id: Option<TransactionId>,
106 ) -> Self {
107 self.viewing_epoch = Some(epoch);
108 self.transaction_id = transaction_id;
109 self
110 }
111
112 pub fn select_columns(
114 child: Box<dyn Operator>,
115 columns: Vec<usize>,
116 types: Vec<LogicalType>,
117 ) -> Self {
118 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
119 Self::new(child, projections, types)
120 }
121}
122
123impl Operator for ProjectOperator {
124 fn next(&mut self) -> OperatorResult {
125 let Some(input) = self.child.next()? else {
127 return Ok(None);
128 };
129
130 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
132
133 for (i, proj) in self.projections.iter().enumerate() {
135 match proj {
136 ProjectExpr::Column(col_idx) => {
137 let input_col = input.column(*col_idx).ok_or_else(|| {
139 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
140 })?;
141
142 let output_col = output
143 .column_mut(i)
144 .expect("column exists: index matches projection schema");
145
146 for row in input.selected_indices() {
148 if let Some(value) = input_col.get_value(row) {
149 output_col.push_value(value);
150 }
151 }
152 }
153 ProjectExpr::Constant(value) => {
154 let output_col = output
156 .column_mut(i)
157 .expect("column exists: index matches projection schema");
158 for _ in input.selected_indices() {
159 output_col.push_value(value.clone());
160 }
161 }
162 ProjectExpr::PropertyAccess { column, property } => {
163 let input_col = input
165 .column(*column)
166 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
167
168 let output_col = output
169 .column_mut(i)
170 .expect("column exists: index matches projection schema");
171
172 let store = self.store.as_ref().ok_or_else(|| {
173 OperatorError::Execution("Store required for property access".to_string())
174 })?;
175
176 let prop_key = PropertyKey::new(property);
183 let epoch = self.viewing_epoch;
184 let tx_id = self.transaction_id;
185 for row in input.selected_indices() {
186 let value = if let Some(node_id) = input_col.get_node_id(row) {
187 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
188 store.get_node_versioned(node_id, ep, tx)
189 } else {
190 store.get_node(node_id)
191 };
192 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
193 {
194 prop
195 } else if let Some(edge_id) = input_col.get_edge_id(row) {
196 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
199 store.get_edge_versioned(edge_id, ep, tx)
200 } else {
201 store.get_edge(edge_id)
202 };
203 edge.and_then(|e| e.get_property(property).cloned())
204 .unwrap_or(Value::Null)
205 } else {
206 Value::Null
207 }
208 } else if let Some(edge_id) = input_col.get_edge_id(row) {
209 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
210 store.get_edge_versioned(edge_id, ep, tx)
211 } else {
212 store.get_edge(edge_id)
213 };
214 edge.and_then(|e| e.get_property(property).cloned())
215 .unwrap_or(Value::Null)
216 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
217 map.get(&prop_key).cloned().unwrap_or(Value::Null)
218 } else {
219 Value::Null
220 };
221 output_col.push_value(value);
222 }
223 }
224 ProjectExpr::EdgeType { column } => {
225 let input_col = input
227 .column(*column)
228 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
229
230 let output_col = output
231 .column_mut(i)
232 .expect("column exists: index matches projection schema");
233
234 let store = self.store.as_ref().ok_or_else(|| {
235 OperatorError::Execution("Store required for edge type access".to_string())
236 })?;
237
238 let epoch = self.viewing_epoch;
239 let tx_id = self.transaction_id;
240 for row in input.selected_indices() {
241 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
242 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
243 store.edge_type_versioned(edge_id, ep, tx)
244 } else {
245 store.edge_type(edge_id)
246 };
247 etype.map_or(Value::Null, Value::String)
248 } else {
249 Value::Null
250 };
251 output_col.push_value(value);
252 }
253 }
254 ProjectExpr::Expression {
255 expr,
256 variable_columns,
257 } => {
258 let output_col = output
259 .column_mut(i)
260 .expect("column exists: index matches projection schema");
261
262 let store = self.store.as_ref().ok_or_else(|| {
263 OperatorError::Execution(
264 "Store required for expression evaluation".to_string(),
265 )
266 })?;
267
268 let mut evaluator = ExpressionPredicate::new(
270 expr.clone(),
271 variable_columns.clone(),
272 Arc::clone(store),
273 );
274 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
275 evaluator = evaluator.with_transaction_context(ep, tx_id);
276 }
277
278 for row in input.selected_indices() {
279 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
280 output_col.push_value(value);
281 }
282 }
283 ProjectExpr::NodeResolve { column } => {
284 let input_col = input
285 .column(*column)
286 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
287
288 let output_col = output
289 .column_mut(i)
290 .expect("column exists: index matches projection schema");
291
292 let store = self.store.as_ref().ok_or_else(|| {
293 OperatorError::Execution("Store required for node resolution".to_string())
294 })?;
295
296 let epoch = self.viewing_epoch;
297 let tx_id = self.transaction_id;
298 for row in input.selected_indices() {
299 let value = if let Some(node_id) = input_col.get_node_id(row) {
300 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
301 store.get_node_versioned(node_id, ep, tx)
302 } else {
303 store.get_node(node_id)
304 };
305 node.map_or(Value::Null, |n| node_to_map(&n))
306 } else {
307 Value::Null
308 };
309 output_col.push_value(value);
310 }
311 }
312 ProjectExpr::EdgeResolve { column } => {
313 let input_col = input
314 .column(*column)
315 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
316
317 let output_col = output
318 .column_mut(i)
319 .expect("column exists: index matches projection schema");
320
321 let store = self.store.as_ref().ok_or_else(|| {
322 OperatorError::Execution("Store required for edge resolution".to_string())
323 })?;
324
325 let epoch = self.viewing_epoch;
326 let tx_id = self.transaction_id;
327 for row in input.selected_indices() {
328 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
329 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
330 store.get_edge_versioned(edge_id, ep, tx)
331 } else {
332 store.get_edge(edge_id)
333 };
334 edge.map_or(Value::Null, |e| edge_to_map(&e))
335 } else {
336 Value::Null
337 };
338 output_col.push_value(value);
339 }
340 }
341 }
342 }
343
344 output.set_count(input.row_count());
345 Ok(Some(output))
346 }
347
348 fn reset(&mut self) {
349 self.child.reset();
350 }
351
352 fn name(&self) -> &'static str {
353 "Project"
354 }
355}
356
357fn node_to_map(node: &Node) -> Value {
362 let mut map = BTreeMap::new();
363 map.insert(
364 PropertyKey::new("_id"),
365 Value::Int64(node.id.as_u64() as i64),
366 );
367 let labels: Vec<Value> = node
368 .labels
369 .iter()
370 .map(|l| Value::String(l.clone()))
371 .collect();
372 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
373 for (key, value) in &node.properties {
374 map.insert(key.clone(), value.clone());
375 }
376 Value::Map(Arc::new(map))
377}
378
379fn edge_to_map(edge: &Edge) -> Value {
384 let mut map = BTreeMap::new();
385 map.insert(
386 PropertyKey::new("_id"),
387 Value::Int64(edge.id.as_u64() as i64),
388 );
389 map.insert(
390 PropertyKey::new("_type"),
391 Value::String(edge.edge_type.clone()),
392 );
393 map.insert(
394 PropertyKey::new("_source"),
395 Value::Int64(edge.src.as_u64() as i64),
396 );
397 map.insert(
398 PropertyKey::new("_target"),
399 Value::Int64(edge.dst.as_u64() as i64),
400 );
401 for (key, value) in &edge.properties {
402 map.insert(key.clone(), value.clone());
403 }
404 Value::Map(Arc::new(map))
405}
406
// Unit tests for `ProjectOperator`, driven by an in-memory mock child operator.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Child operator that hands out pre-built chunks in order.
    struct MockScanOperator {
        /// Chunks still to be yielded; each slot is swapped for an empty chunk
        /// once it has been returned.
        chunks: Vec<DataChunk>,
        /// Index of the next chunk to yield.
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out without cloning, leaving an empty placeholder.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            // Only rewinds the cursor; chunks already yielded stay replaced by
            // their empty placeholders.
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Selecting columns `[2, 0]` reorders and drops columns.
    #[test]
    fn test_project_select_columns() {
        // Input: two rows of (Int64, String, Int64).
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Project input column 2 first, then input column 0.
        let mut project = ProjectOperator::select_columns(
            Box::new(mock_scan),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Output column 0 comes from input column 2, output column 1 from input column 0.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    /// A constant projection repeats its value for every input row.
    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Pass the input column through and append a constant string column.
        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    /// With an exhausted child the operator yields `None` rather than an error.
    #[test]
    fn test_project_empty_input() {
        let mock_scan = MockScanOperator {
            chunks: vec![],
            position: 0,
        };

        let mut project =
            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    /// Referencing an input column that does not exist is reported as an error.
    #[test]
    fn test_project_column_not_found() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Column 5 is out of range for a one-column input chunk.
        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    /// Several constant projections of different types can coexist.
    #[test]
    fn test_project_multiple_constants() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    /// Selecting every column in its original order reproduces the input values.
    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::select_columns(
            Box::new(mock_scan),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    /// The operator reports its name as "Project".
    #[test]
    fn test_project_name() {
        let mock_scan = MockScanOperator {
            chunks: vec![],
            position: 0,
        };
        let project =
            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    /// `NodeResolve` expands a node id into a map with `_id`, `_labels` and properties.
    #[test]
    fn test_project_node_resolve() {
        // Store with a single labelled node carrying two properties.
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk holding just that node's id.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::with_store(
            Box::new(mock_scan),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(node_id.as_u64() as i64))
            );
            assert!(map.get(&PropertyKey::new("_labels")).is_some());
            assert_eq!(
                map.get(&PropertyKey::new("name")),
                Some(&Value::String("Alix".into()))
            );
            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    /// `EdgeResolve` expands an edge id into a map with `_id`, `_type`,
    /// `_source`, `_target` and properties.
    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk holding just that edge's id.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::with_store(
            Box::new(mock_scan),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(edge_id.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_type")),
                Some(&Value::String("WORKS_AT".into()))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_source")),
                Some(&Value::Int64(src.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_target")),
                Some(&Value::Int64(dst.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("since")),
                Some(&Value::Int64(2020))
            );
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    /// Resolving an id that the store does not contain yields `Value::Null`.
    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        // Empty store: no node was ever created.
        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        let mut project = ProjectOperator::with_store(
            Box::new(mock_scan),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}