Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::lpg::{Edge, LpgStore, Node};
7use grafeo_common::types::{LogicalType, PropertyKey, Value};
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11/// A projection expression.
12pub enum ProjectExpr {
13    /// Reference to an input column.
14    Column(usize),
15    /// A constant value.
16    Constant(Value),
17    /// Property access on a node/edge column.
18    PropertyAccess {
19        /// The column containing the node or edge ID.
20        column: usize,
21        /// The property name to access.
22        property: String,
23    },
24    /// Edge type accessor (for type(r) function).
25    EdgeType {
26        /// The column containing the edge ID.
27        column: usize,
28    },
29    /// Full expression evaluation (for CASE WHEN, etc.).
30    Expression {
31        /// The filter expression to evaluate.
32        expr: FilterExpression,
33        /// Variable name to column index mapping.
34        variable_columns: HashMap<String, usize>,
35    },
36    /// Resolve a node ID column to a full node map with metadata and properties.
37    NodeResolve {
38        /// The column containing the node ID.
39        column: usize,
40    },
41    /// Resolve an edge ID column to a full edge map with metadata and properties.
42    EdgeResolve {
43        /// The column containing the edge ID.
44        column: usize,
45    },
46}
47
48/// A project operator that selects and transforms columns.
49pub struct ProjectOperator {
50    /// Child operator to read from.
51    child: Box<dyn Operator>,
52    /// Projection expressions.
53    projections: Vec<ProjectExpr>,
54    /// Output column types.
55    output_types: Vec<LogicalType>,
56    /// Optional store for property access.
57    store: Option<Arc<LpgStore>>,
58}
59
60impl ProjectOperator {
61    /// Creates a new project operator.
62    pub fn new(
63        child: Box<dyn Operator>,
64        projections: Vec<ProjectExpr>,
65        output_types: Vec<LogicalType>,
66    ) -> Self {
67        assert_eq!(projections.len(), output_types.len());
68        Self {
69            child,
70            projections,
71            output_types,
72            store: None,
73        }
74    }
75
76    /// Creates a new project operator with store access for property lookups.
77    pub fn with_store(
78        child: Box<dyn Operator>,
79        projections: Vec<ProjectExpr>,
80        output_types: Vec<LogicalType>,
81        store: Arc<LpgStore>,
82    ) -> Self {
83        assert_eq!(projections.len(), output_types.len());
84        Self {
85            child,
86            projections,
87            output_types,
88            store: Some(store),
89        }
90    }
91
92    /// Creates a project operator that selects specific columns.
93    pub fn select_columns(
94        child: Box<dyn Operator>,
95        columns: Vec<usize>,
96        types: Vec<LogicalType>,
97    ) -> Self {
98        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
99        Self::new(child, projections, types)
100    }
101}
102
103impl Operator for ProjectOperator {
104    fn next(&mut self) -> OperatorResult {
105        // Get next chunk from child
106        let Some(input) = self.child.next()? else {
107            return Ok(None);
108        };
109
110        // Create output chunk
111        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
112
113        // Evaluate each projection
114        for (i, proj) in self.projections.iter().enumerate() {
115            match proj {
116                ProjectExpr::Column(col_idx) => {
117                    // Copy column from input to output
118                    let input_col = input.column(*col_idx).ok_or_else(|| {
119                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
120                    })?;
121
122                    let output_col = output
123                        .column_mut(i)
124                        .expect("column exists: index matches projection schema");
125
126                    // Copy selected rows
127                    for row in input.selected_indices() {
128                        if let Some(value) = input_col.get_value(row) {
129                            output_col.push_value(value);
130                        }
131                    }
132                }
133                ProjectExpr::Constant(value) => {
134                    // Push constant for each row
135                    let output_col = output
136                        .column_mut(i)
137                        .expect("column exists: index matches projection schema");
138                    for _ in input.selected_indices() {
139                        output_col.push_value(value.clone());
140                    }
141                }
142                ProjectExpr::PropertyAccess { column, property } => {
143                    // Access property from node/edge in the specified column
144                    let input_col = input
145                        .column(*column)
146                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
147
148                    let output_col = output
149                        .column_mut(i)
150                        .expect("column exists: index matches projection schema");
151
152                    let store = self.store.as_ref().ok_or_else(|| {
153                        OperatorError::Execution("Store required for property access".to_string())
154                    })?;
155
156                    // Extract property for each row
157                    let prop_key = PropertyKey::new(property);
158                    for row in input.selected_indices() {
159                        // Try node ID, then edge ID, then map value (for UNWIND maps)
160                        let value = if let Some(node_id) = input_col.get_node_id(row) {
161                            store
162                                .get_node(node_id)
163                                .and_then(|node| node.get_property(property).cloned())
164                                .unwrap_or(Value::Null)
165                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
166                            store
167                                .get_edge(edge_id)
168                                .and_then(|edge| edge.get_property(property).cloned())
169                                .unwrap_or(Value::Null)
170                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
171                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
172                        } else {
173                            Value::Null
174                        };
175                        output_col.push_value(value);
176                    }
177                }
178                ProjectExpr::EdgeType { column } => {
179                    // Get edge type string from an edge column
180                    let input_col = input
181                        .column(*column)
182                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
183
184                    let output_col = output
185                        .column_mut(i)
186                        .expect("column exists: index matches projection schema");
187
188                    let store = self.store.as_ref().ok_or_else(|| {
189                        OperatorError::Execution("Store required for edge type access".to_string())
190                    })?;
191
192                    for row in input.selected_indices() {
193                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
194                            store.edge_type(edge_id).map_or(Value::Null, Value::String)
195                        } else {
196                            Value::Null
197                        };
198                        output_col.push_value(value);
199                    }
200                }
201                ProjectExpr::Expression {
202                    expr,
203                    variable_columns,
204                } => {
205                    let output_col = output
206                        .column_mut(i)
207                        .expect("column exists: index matches projection schema");
208
209                    let store = self.store.as_ref().ok_or_else(|| {
210                        OperatorError::Execution(
211                            "Store required for expression evaluation".to_string(),
212                        )
213                    })?;
214
215                    // Use the ExpressionPredicate for expression evaluation
216                    let evaluator = ExpressionPredicate::new(
217                        expr.clone(),
218                        variable_columns.clone(),
219                        Arc::clone(store),
220                    );
221
222                    for row in input.selected_indices() {
223                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
224                        output_col.push_value(value);
225                    }
226                }
227                ProjectExpr::NodeResolve { column } => {
228                    let input_col = input
229                        .column(*column)
230                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
231
232                    let output_col = output
233                        .column_mut(i)
234                        .expect("column exists: index matches projection schema");
235
236                    let store = self.store.as_ref().ok_or_else(|| {
237                        OperatorError::Execution("Store required for node resolution".to_string())
238                    })?;
239
240                    for row in input.selected_indices() {
241                        let value = if let Some(node_id) = input_col.get_node_id(row) {
242                            store
243                                .get_node(node_id)
244                                .map_or(Value::Null, |n| node_to_map(&n))
245                        } else {
246                            Value::Null
247                        };
248                        output_col.push_value(value);
249                    }
250                }
251                ProjectExpr::EdgeResolve { column } => {
252                    let input_col = input
253                        .column(*column)
254                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
255
256                    let output_col = output
257                        .column_mut(i)
258                        .expect("column exists: index matches projection schema");
259
260                    let store = self.store.as_ref().ok_or_else(|| {
261                        OperatorError::Execution("Store required for edge resolution".to_string())
262                    })?;
263
264                    for row in input.selected_indices() {
265                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
266                            store
267                                .get_edge(edge_id)
268                                .map_or(Value::Null, |e| edge_to_map(&e))
269                        } else {
270                            Value::Null
271                        };
272                        output_col.push_value(value);
273                    }
274                }
275            }
276        }
277
278        output.set_count(input.row_count());
279        Ok(Some(output))
280    }
281
282    fn reset(&mut self) {
283        self.child.reset();
284    }
285
286    fn name(&self) -> &'static str {
287        "Project"
288    }
289}
290
291/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
292///
293/// The map contains `_id` (integer), `_labels` (list of strings), and
294/// all node properties at the top level.
295fn node_to_map(node: &Node) -> Value {
296    let mut map = BTreeMap::new();
297    map.insert(
298        PropertyKey::new("_id"),
299        Value::Int64(node.id.as_u64() as i64),
300    );
301    let labels: Vec<Value> = node
302        .labels
303        .iter()
304        .map(|l| Value::String(l.clone()))
305        .collect();
306    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
307    for (key, value) in &node.properties {
308        map.insert(key.clone(), value.clone());
309    }
310    Value::Map(Arc::new(map))
311}
312
313/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
314///
315/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
316/// properties at the top level.
317fn edge_to_map(edge: &Edge) -> Value {
318    let mut map = BTreeMap::new();
319    map.insert(
320        PropertyKey::new("_id"),
321        Value::Int64(edge.id.as_u64() as i64),
322    );
323    map.insert(
324        PropertyKey::new("_type"),
325        Value::String(edge.edge_type.clone()),
326    );
327    map.insert(
328        PropertyKey::new("_source"),
329        Value::Int64(edge.src.as_u64() as i64),
330    );
331    map.insert(
332        PropertyKey::new("_target"),
333        Value::Int64(edge.dst.as_u64() as i64),
334    );
335    for (key, value) in &edge.properties {
336        map.insert(key.clone(), value.clone());
337    }
338    Value::Map(Arc::new(map))
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use grafeo_common::types::Value;

    /// Test double that replays a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Hand the chunk out by value, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan over `chunks`, positioned at the first chunk.
    fn scan_over(chunks: Vec<DataChunk>) -> Box<MockScanOperator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a chunk with a single Int64 column holding `values`, one per row.
    fn int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int]
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (first, text, last) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(first);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(last);
            builder.advance_row();
        }

        // Select columns 2 and 0, i.e. reorder and drop the string column
        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Values arrive in the requested order
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let projections = vec![
            ProjectExpr::Column(0),
            ProjectExpr::Constant(Value::String("constant".into())),
        ];
        let mut project = ProjectOperator::new(
            scan_over(vec![int_chunk(&[1, 2])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // Column index 5 does not exist in a single-column chunk
        let mut project = ProjectOperator::new(
            scan_over(vec![int_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let projections = vec![
            ProjectExpr::Constant(Value::Int64(42)),
            ProjectExpr::Constant(Value::String("fixed".into())),
            ProjectExpr::Constant(Value::Bool(true)),
        ];
        let mut project = ProjectOperator::new(
            scan_over(vec![int_chunk(&[1])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Select every column in its original order
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan_over(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store with one labelled node carrying two properties
        let store = LpgStore::new();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alice".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk with a single NodeId column
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(node_id.as_u64() as i64))
                );
                assert!(map.get(&PropertyKey::new("_labels")).is_some());
                assert_eq!(
                    map.get(&PropertyKey::new("name")),
                    Some(&Value::String("Alice".into()))
                );
                assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk with a single EdgeId column
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(edge_id.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_type")),
                    Some(&Value::String("WORKS_AT".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_source")),
                    Some(&Value::Int64(src.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_target")),
                    Some(&Value::Int64(dst.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("since")),
                    Some(&Value::Int64(2020))
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new();

        // The chunk references a NodeId that was never created in the store
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}