Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13pub enum ProjectExpr {
14    /// Reference to an input column.
15    Column(usize),
16    /// A constant value.
17    Constant(Value),
18    /// Property access on a node/edge column.
19    PropertyAccess {
20        /// The column containing the node or edge ID.
21        column: usize,
22        /// The property name to access.
23        property: String,
24    },
25    /// Edge type accessor (for type(r) function).
26    EdgeType {
27        /// The column containing the edge ID.
28        column: usize,
29    },
30    /// Full expression evaluation (for CASE WHEN, etc.).
31    Expression {
32        /// The filter expression to evaluate.
33        expr: FilterExpression,
34        /// Variable name to column index mapping.
35        variable_columns: HashMap<String, usize>,
36    },
37    /// Resolve a node ID column to a full node map with metadata and properties.
38    NodeResolve {
39        /// The column containing the node ID.
40        column: usize,
41    },
42    /// Resolve an edge ID column to a full edge map with metadata and properties.
43    EdgeResolve {
44        /// The column containing the edge ID.
45        column: usize,
46    },
47}
48
49/// A project operator that selects and transforms columns.
50pub struct ProjectOperator {
51    /// Child operator to read from.
52    child: Box<dyn Operator>,
53    /// Projection expressions.
54    projections: Vec<ProjectExpr>,
55    /// Output column types.
56    output_types: Vec<LogicalType>,
57    /// Optional store for property access.
58    store: Option<Arc<dyn GraphStore>>,
59}
60
61impl ProjectOperator {
62    /// Creates a new project operator.
63    pub fn new(
64        child: Box<dyn Operator>,
65        projections: Vec<ProjectExpr>,
66        output_types: Vec<LogicalType>,
67    ) -> Self {
68        assert_eq!(projections.len(), output_types.len());
69        Self {
70            child,
71            projections,
72            output_types,
73            store: None,
74        }
75    }
76
77    /// Creates a new project operator with store access for property lookups.
78    pub fn with_store(
79        child: Box<dyn Operator>,
80        projections: Vec<ProjectExpr>,
81        output_types: Vec<LogicalType>,
82        store: Arc<dyn GraphStore>,
83    ) -> Self {
84        assert_eq!(projections.len(), output_types.len());
85        Self {
86            child,
87            projections,
88            output_types,
89            store: Some(store),
90        }
91    }
92
93    /// Creates a project operator that selects specific columns.
94    pub fn select_columns(
95        child: Box<dyn Operator>,
96        columns: Vec<usize>,
97        types: Vec<LogicalType>,
98    ) -> Self {
99        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
100        Self::new(child, projections, types)
101    }
102}
103
104impl Operator for ProjectOperator {
105    fn next(&mut self) -> OperatorResult {
106        // Get next chunk from child
107        let Some(input) = self.child.next()? else {
108            return Ok(None);
109        };
110
111        // Create output chunk
112        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
113
114        // Evaluate each projection
115        for (i, proj) in self.projections.iter().enumerate() {
116            match proj {
117                ProjectExpr::Column(col_idx) => {
118                    // Copy column from input to output
119                    let input_col = input.column(*col_idx).ok_or_else(|| {
120                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
121                    })?;
122
123                    let output_col = output
124                        .column_mut(i)
125                        .expect("column exists: index matches projection schema");
126
127                    // Copy selected rows
128                    for row in input.selected_indices() {
129                        if let Some(value) = input_col.get_value(row) {
130                            output_col.push_value(value);
131                        }
132                    }
133                }
134                ProjectExpr::Constant(value) => {
135                    // Push constant for each row
136                    let output_col = output
137                        .column_mut(i)
138                        .expect("column exists: index matches projection schema");
139                    for _ in input.selected_indices() {
140                        output_col.push_value(value.clone());
141                    }
142                }
143                ProjectExpr::PropertyAccess { column, property } => {
144                    // Access property from node/edge in the specified column
145                    let input_col = input
146                        .column(*column)
147                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
148
149                    let output_col = output
150                        .column_mut(i)
151                        .expect("column exists: index matches projection schema");
152
153                    let store = self.store.as_ref().ok_or_else(|| {
154                        OperatorError::Execution("Store required for property access".to_string())
155                    })?;
156
157                    // Extract property for each row
158                    let prop_key = PropertyKey::new(property);
159                    for row in input.selected_indices() {
160                        // Try node ID, then edge ID, then map value (for UNWIND maps)
161                        let value = if let Some(node_id) = input_col.get_node_id(row) {
162                            store
163                                .get_node(node_id)
164                                .and_then(|node| node.get_property(property).cloned())
165                                .unwrap_or(Value::Null)
166                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
167                            store
168                                .get_edge(edge_id)
169                                .and_then(|edge| edge.get_property(property).cloned())
170                                .unwrap_or(Value::Null)
171                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
172                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
173                        } else {
174                            Value::Null
175                        };
176                        output_col.push_value(value);
177                    }
178                }
179                ProjectExpr::EdgeType { column } => {
180                    // Get edge type string from an edge column
181                    let input_col = input
182                        .column(*column)
183                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
184
185                    let output_col = output
186                        .column_mut(i)
187                        .expect("column exists: index matches projection schema");
188
189                    let store = self.store.as_ref().ok_or_else(|| {
190                        OperatorError::Execution("Store required for edge type access".to_string())
191                    })?;
192
193                    for row in input.selected_indices() {
194                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
195                            store.edge_type(edge_id).map_or(Value::Null, Value::String)
196                        } else {
197                            Value::Null
198                        };
199                        output_col.push_value(value);
200                    }
201                }
202                ProjectExpr::Expression {
203                    expr,
204                    variable_columns,
205                } => {
206                    let output_col = output
207                        .column_mut(i)
208                        .expect("column exists: index matches projection schema");
209
210                    let store = self.store.as_ref().ok_or_else(|| {
211                        OperatorError::Execution(
212                            "Store required for expression evaluation".to_string(),
213                        )
214                    })?;
215
216                    // Use the ExpressionPredicate for expression evaluation
217                    let evaluator = ExpressionPredicate::new(
218                        expr.clone(),
219                        variable_columns.clone(),
220                        Arc::clone(store),
221                    );
222
223                    for row in input.selected_indices() {
224                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
225                        output_col.push_value(value);
226                    }
227                }
228                ProjectExpr::NodeResolve { column } => {
229                    let input_col = input
230                        .column(*column)
231                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
232
233                    let output_col = output
234                        .column_mut(i)
235                        .expect("column exists: index matches projection schema");
236
237                    let store = self.store.as_ref().ok_or_else(|| {
238                        OperatorError::Execution("Store required for node resolution".to_string())
239                    })?;
240
241                    for row in input.selected_indices() {
242                        let value = if let Some(node_id) = input_col.get_node_id(row) {
243                            store
244                                .get_node(node_id)
245                                .map_or(Value::Null, |n| node_to_map(&n))
246                        } else {
247                            Value::Null
248                        };
249                        output_col.push_value(value);
250                    }
251                }
252                ProjectExpr::EdgeResolve { column } => {
253                    let input_col = input
254                        .column(*column)
255                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
256
257                    let output_col = output
258                        .column_mut(i)
259                        .expect("column exists: index matches projection schema");
260
261                    let store = self.store.as_ref().ok_or_else(|| {
262                        OperatorError::Execution("Store required for edge resolution".to_string())
263                    })?;
264
265                    for row in input.selected_indices() {
266                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
267                            store
268                                .get_edge(edge_id)
269                                .map_or(Value::Null, |e| edge_to_map(&e))
270                        } else {
271                            Value::Null
272                        };
273                        output_col.push_value(value);
274                    }
275                }
276            }
277        }
278
279        output.set_count(input.row_count());
280        Ok(Some(output))
281    }
282
283    fn reset(&mut self) {
284        self.child.reset();
285    }
286
287    fn name(&self) -> &'static str {
288        "Project"
289    }
290}
291
292/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
293///
294/// The map contains `_id` (integer), `_labels` (list of strings), and
295/// all node properties at the top level.
296fn node_to_map(node: &Node) -> Value {
297    let mut map = BTreeMap::new();
298    map.insert(
299        PropertyKey::new("_id"),
300        Value::Int64(node.id.as_u64() as i64),
301    );
302    let labels: Vec<Value> = node
303        .labels
304        .iter()
305        .map(|l| Value::String(l.clone()))
306        .collect();
307    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
308    for (key, value) in &node.properties {
309        map.insert(key.clone(), value.clone());
310    }
311    Value::Map(Arc::new(map))
312}
313
314/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
315///
316/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
317/// properties at the top level.
318fn edge_to_map(edge: &Edge) -> Value {
319    let mut map = BTreeMap::new();
320    map.insert(
321        PropertyKey::new("_id"),
322        Value::Int64(edge.id.as_u64() as i64),
323    );
324    map.insert(
325        PropertyKey::new("_type"),
326        Value::String(edge.edge_type.clone()),
327    );
328    map.insert(
329        PropertyKey::new("_source"),
330        Value::Int64(edge.src.as_u64() as i64),
331    );
332    map.insert(
333        PropertyKey::new("_target"),
334        Value::Int64(edge.dst.as_u64() as i64),
335    );
336    for (key, value) in &edge.properties {
337        map.insert(key.clone(), value.clone());
338    }
339    Value::Map(Arc::new(map))
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test source that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder in its slot.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        /// Rewinds to the first slot. Chunks that were already handed out
        /// have been swapped for empty placeholders, so a replay yields those.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan positioned at the first of `chunks`.
    fn scan_of(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a chunk with a single Int64 column holding one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int].
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        for (id, text, score) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(score);
            builder.advance_row();
        }

        // Select columns 2 and 0, i.e. reorder and drop the string column.
        let mut project = ProjectOperator::select_columns(
            scan_of(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Both rows must come through reordered, not just the first.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(200));
        assert_eq!(result.column(1).unwrap().get_int64(1), Some(2));
    }

    #[test]
    fn test_project_constant() {
        let mut project = ProjectOperator::new(
            scan_of(vec![int64_chunk(&[1, 2])]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        // The constant is repeated once per input row.
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan_of(Vec::new()), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // Column index 5 does not exist in a one-column chunk.
        let mut project = ProjectOperator::new(
            scan_of(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        // Check the variant, so an unrelated failure cannot satisfy the test.
        assert!(
            matches!(result, Err(OperatorError::ColumnNotFound(_))),
            "Should fail with ColumnNotFound"
        );
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut project = ProjectOperator::new(
            scan_of(vec![int64_chunk(&[1])]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Select all columns in their original order.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_of(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan_of(Vec::new()), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store with one labelled node carrying two properties.
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk with a single NodeId cell.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };

        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        // Labels must be present as a list, not merely present.
        assert!(matches!(
            map.get(&PropertyKey::new("_labels")),
            Some(Value::List(_))
        ));
        assert_eq!(
            map.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk with a single EdgeId cell.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };

        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_type")),
            Some(&Value::String("WORKS_AT".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_source")),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_target")),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // NodeId 999 was never created in the store.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}