Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13pub enum ProjectExpr {
14    /// Reference to an input column.
15    Column(usize),
16    /// A constant value.
17    Constant(Value),
18    /// Property access on a node/edge column.
19    PropertyAccess {
20        /// The column containing the node or edge ID.
21        column: usize,
22        /// The property name to access.
23        property: String,
24    },
25    /// Edge type accessor (for type(r) function).
26    EdgeType {
27        /// The column containing the edge ID.
28        column: usize,
29    },
30    /// Full expression evaluation (for CASE WHEN, etc.).
31    Expression {
32        /// The filter expression to evaluate.
33        expr: FilterExpression,
34        /// Variable name to column index mapping.
35        variable_columns: HashMap<String, usize>,
36    },
37    /// Resolve a node ID column to a full node map with metadata and properties.
38    NodeResolve {
39        /// The column containing the node ID.
40        column: usize,
41    },
42    /// Resolve an edge ID column to a full edge map with metadata and properties.
43    EdgeResolve {
44        /// The column containing the edge ID.
45        column: usize,
46    },
47}
48
49/// A project operator that selects and transforms columns.
50pub struct ProjectOperator {
51    /// Child operator to read from.
52    child: Box<dyn Operator>,
53    /// Projection expressions.
54    projections: Vec<ProjectExpr>,
55    /// Output column types.
56    output_types: Vec<LogicalType>,
57    /// Optional store for property access.
58    store: Option<Arc<dyn GraphStore>>,
59}
60
61impl ProjectOperator {
62    /// Creates a new project operator.
63    pub fn new(
64        child: Box<dyn Operator>,
65        projections: Vec<ProjectExpr>,
66        output_types: Vec<LogicalType>,
67    ) -> Self {
68        assert_eq!(projections.len(), output_types.len());
69        Self {
70            child,
71            projections,
72            output_types,
73            store: None,
74        }
75    }
76
77    /// Creates a new project operator with store access for property lookups.
78    pub fn with_store(
79        child: Box<dyn Operator>,
80        projections: Vec<ProjectExpr>,
81        output_types: Vec<LogicalType>,
82        store: Arc<dyn GraphStore>,
83    ) -> Self {
84        assert_eq!(projections.len(), output_types.len());
85        Self {
86            child,
87            projections,
88            output_types,
89            store: Some(store),
90        }
91    }
92
93    /// Creates a project operator that selects specific columns.
94    pub fn select_columns(
95        child: Box<dyn Operator>,
96        columns: Vec<usize>,
97        types: Vec<LogicalType>,
98    ) -> Self {
99        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
100        Self::new(child, projections, types)
101    }
102}
103
104impl Operator for ProjectOperator {
105    fn next(&mut self) -> OperatorResult {
106        // Get next chunk from child
107        let Some(input) = self.child.next()? else {
108            return Ok(None);
109        };
110
111        // Create output chunk
112        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
113
114        // Evaluate each projection
115        for (i, proj) in self.projections.iter().enumerate() {
116            match proj {
117                ProjectExpr::Column(col_idx) => {
118                    // Copy column from input to output
119                    let input_col = input.column(*col_idx).ok_or_else(|| {
120                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
121                    })?;
122
123                    let output_col = output
124                        .column_mut(i)
125                        .expect("column exists: index matches projection schema");
126
127                    // Copy selected rows
128                    for row in input.selected_indices() {
129                        if let Some(value) = input_col.get_value(row) {
130                            output_col.push_value(value);
131                        }
132                    }
133                }
134                ProjectExpr::Constant(value) => {
135                    // Push constant for each row
136                    let output_col = output
137                        .column_mut(i)
138                        .expect("column exists: index matches projection schema");
139                    for _ in input.selected_indices() {
140                        output_col.push_value(value.clone());
141                    }
142                }
143                ProjectExpr::PropertyAccess { column, property } => {
144                    // Access property from node/edge in the specified column
145                    let input_col = input
146                        .column(*column)
147                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
148
149                    let output_col = output
150                        .column_mut(i)
151                        .expect("column exists: index matches projection schema");
152
153                    let store = self.store.as_ref().ok_or_else(|| {
154                        OperatorError::Execution("Store required for property access".to_string())
155                    })?;
156
157                    // Extract property for each row.
158                    // For typed columns (VectorData::NodeId / EdgeId) there is
159                    // no ambiguity. For Generic/Any columns (e.g. after a hash
160                    // join), both get_node_id and get_edge_id can succeed on the
161                    // same Int64 value, so we verify against the store to resolve
162                    // the entity type.
163                    let prop_key = PropertyKey::new(property);
164                    for row in input.selected_indices() {
165                        let value = if let Some(node_id) = input_col.get_node_id(row) {
166                            if let Some(prop) = store
167                                .get_node(node_id)
168                                .and_then(|node| node.get_property(property).cloned())
169                            {
170                                prop
171                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
172                                // Node lookup failed: the ID may belong to an
173                                // edge (common with Generic columns after joins).
174                                store
175                                    .get_edge(edge_id)
176                                    .and_then(|edge| edge.get_property(property).cloned())
177                                    .unwrap_or(Value::Null)
178                            } else {
179                                Value::Null
180                            }
181                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
182                            store
183                                .get_edge(edge_id)
184                                .and_then(|edge| edge.get_property(property).cloned())
185                                .unwrap_or(Value::Null)
186                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
187                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
188                        } else {
189                            Value::Null
190                        };
191                        output_col.push_value(value);
192                    }
193                }
194                ProjectExpr::EdgeType { column } => {
195                    // Get edge type string from an edge column
196                    let input_col = input
197                        .column(*column)
198                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
199
200                    let output_col = output
201                        .column_mut(i)
202                        .expect("column exists: index matches projection schema");
203
204                    let store = self.store.as_ref().ok_or_else(|| {
205                        OperatorError::Execution("Store required for edge type access".to_string())
206                    })?;
207
208                    for row in input.selected_indices() {
209                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
210                            store.edge_type(edge_id).map_or(Value::Null, Value::String)
211                        } else {
212                            Value::Null
213                        };
214                        output_col.push_value(value);
215                    }
216                }
217                ProjectExpr::Expression {
218                    expr,
219                    variable_columns,
220                } => {
221                    let output_col = output
222                        .column_mut(i)
223                        .expect("column exists: index matches projection schema");
224
225                    let store = self.store.as_ref().ok_or_else(|| {
226                        OperatorError::Execution(
227                            "Store required for expression evaluation".to_string(),
228                        )
229                    })?;
230
231                    // Use the ExpressionPredicate for expression evaluation
232                    let evaluator = ExpressionPredicate::new(
233                        expr.clone(),
234                        variable_columns.clone(),
235                        Arc::clone(store),
236                    );
237
238                    for row in input.selected_indices() {
239                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
240                        output_col.push_value(value);
241                    }
242                }
243                ProjectExpr::NodeResolve { column } => {
244                    let input_col = input
245                        .column(*column)
246                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
247
248                    let output_col = output
249                        .column_mut(i)
250                        .expect("column exists: index matches projection schema");
251
252                    let store = self.store.as_ref().ok_or_else(|| {
253                        OperatorError::Execution("Store required for node resolution".to_string())
254                    })?;
255
256                    for row in input.selected_indices() {
257                        let value = if let Some(node_id) = input_col.get_node_id(row) {
258                            store
259                                .get_node(node_id)
260                                .map_or(Value::Null, |n| node_to_map(&n))
261                        } else {
262                            Value::Null
263                        };
264                        output_col.push_value(value);
265                    }
266                }
267                ProjectExpr::EdgeResolve { column } => {
268                    let input_col = input
269                        .column(*column)
270                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
271
272                    let output_col = output
273                        .column_mut(i)
274                        .expect("column exists: index matches projection schema");
275
276                    let store = self.store.as_ref().ok_or_else(|| {
277                        OperatorError::Execution("Store required for edge resolution".to_string())
278                    })?;
279
280                    for row in input.selected_indices() {
281                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
282                            store
283                                .get_edge(edge_id)
284                                .map_or(Value::Null, |e| edge_to_map(&e))
285                        } else {
286                            Value::Null
287                        };
288                        output_col.push_value(value);
289                    }
290                }
291            }
292        }
293
294        output.set_count(input.row_count());
295        Ok(Some(output))
296    }
297
298    fn reset(&mut self) {
299        self.child.reset();
300    }
301
302    fn name(&self) -> &'static str {
303        "Project"
304    }
305}
306
307/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
308///
309/// The map contains `_id` (integer), `_labels` (list of strings), and
310/// all node properties at the top level.
311fn node_to_map(node: &Node) -> Value {
312    let mut map = BTreeMap::new();
313    map.insert(
314        PropertyKey::new("_id"),
315        Value::Int64(node.id.as_u64() as i64),
316    );
317    let labels: Vec<Value> = node
318        .labels
319        .iter()
320        .map(|l| Value::String(l.clone()))
321        .collect();
322    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
323    for (key, value) in &node.properties {
324        map.insert(key.clone(), value.clone());
325    }
326    Value::Map(Arc::new(map))
327}
328
329/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
330///
331/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
332/// properties at the top level.
333fn edge_to_map(edge: &Edge) -> Value {
334    let mut map = BTreeMap::new();
335    map.insert(
336        PropertyKey::new("_id"),
337        Value::Int64(edge.id.as_u64() as i64),
338    );
339    map.insert(
340        PropertyKey::new("_type"),
341        Value::String(edge.edge_type.clone()),
342    );
343    map.insert(
344        PropertyKey::new("_source"),
345        Value::Int64(edge.src.as_u64() as i64),
346    );
347    map.insert(
348        PropertyKey::new("_target"),
349        Value::Int64(edge.dst.as_u64() as i64),
350    );
351    for (key, value) in &edge.properties {
352        map.insert(key.clone(), value.clone());
353    }
354    Value::Map(Arc::new(map))
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockScanOperator {
        /// Wraps `chunks` in a boxed scan positioned at the first chunk.
        fn boxed(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
            Box::new(Self {
                chunks,
                position: 0,
            })
        }
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Builds a chunk with a single Int64 column holding `values`, one per row.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int]
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (first, text, last) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(first);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(last);
            builder.advance_row();
        }
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        // Select columns 2 and 0, i.e. drop the string and swap the ints.
        let mut project = ProjectOperator::select_columns(
            scan,
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // First row must come back reordered.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1, 2])]);

        // Pass column 0 through and append a constant string column.
        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![]),
            vec![0],
            vec![LogicalType::Int64],
        );

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        // Column index 5 does not exist in a one-column chunk.
        let mut project = ProjectOperator::new(
            scan,
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Every column selected in its original position.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::select_columns(
            scan,
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![]),
            vec![0],
            vec![LogicalType::Int64],
        );
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store containing one labelled node with two properties.
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Single-row chunk whose only column carries that node's ID.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };
        let entry = |key: &'static str| map.get(&PropertyKey::new(key));

        assert_eq!(
            entry("_id"),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        assert!(entry("_labels").is_some());
        assert_eq!(entry("name"), Some(&Value::String("Alix".into())));
        assert_eq!(entry("age"), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Single-row chunk whose only column carries that edge's ID.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };
        let entry = |key: &'static str| map.get(&PropertyKey::new(key));

        assert_eq!(
            entry("_id"),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(entry("_type"), Some(&Value::String("WORKS_AT".into())));
        assert_eq!(
            entry("_source"),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            entry("_target"),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(entry("since"), Some(&Value::Int64(2020)));
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // The chunk references a node ID that was never created in the store.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}