Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::lpg::{Edge, LpgStore, Node};
7use grafeo_common::types::{LogicalType, PropertyKey, Value};
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11/// A projection expression.
12pub enum ProjectExpr {
13    /// Reference to an input column.
14    Column(usize),
15    /// A constant value.
16    Constant(Value),
17    /// Property access on a node/edge column.
18    PropertyAccess {
19        /// The column containing the node or edge ID.
20        column: usize,
21        /// The property name to access.
22        property: String,
23    },
24    /// Edge type accessor (for type(r) function).
25    EdgeType {
26        /// The column containing the edge ID.
27        column: usize,
28    },
29    /// Full expression evaluation (for CASE WHEN, etc.).
30    Expression {
31        /// The filter expression to evaluate.
32        expr: FilterExpression,
33        /// Variable name to column index mapping.
34        variable_columns: HashMap<String, usize>,
35    },
36    /// Resolve a node ID column to a full node map with metadata and properties.
37    NodeResolve {
38        /// The column containing the node ID.
39        column: usize,
40    },
41    /// Resolve an edge ID column to a full edge map with metadata and properties.
42    EdgeResolve {
43        /// The column containing the edge ID.
44        column: usize,
45    },
46}
47
48/// A project operator that selects and transforms columns.
49pub struct ProjectOperator {
50    /// Child operator to read from.
51    child: Box<dyn Operator>,
52    /// Projection expressions.
53    projections: Vec<ProjectExpr>,
54    /// Output column types.
55    output_types: Vec<LogicalType>,
56    /// Optional store for property access.
57    store: Option<Arc<LpgStore>>,
58}
59
60impl ProjectOperator {
61    /// Creates a new project operator.
62    pub fn new(
63        child: Box<dyn Operator>,
64        projections: Vec<ProjectExpr>,
65        output_types: Vec<LogicalType>,
66    ) -> Self {
67        assert_eq!(projections.len(), output_types.len());
68        Self {
69            child,
70            projections,
71            output_types,
72            store: None,
73        }
74    }
75
76    /// Creates a new project operator with store access for property lookups.
77    pub fn with_store(
78        child: Box<dyn Operator>,
79        projections: Vec<ProjectExpr>,
80        output_types: Vec<LogicalType>,
81        store: Arc<LpgStore>,
82    ) -> Self {
83        assert_eq!(projections.len(), output_types.len());
84        Self {
85            child,
86            projections,
87            output_types,
88            store: Some(store),
89        }
90    }
91
92    /// Creates a project operator that selects specific columns.
93    pub fn select_columns(
94        child: Box<dyn Operator>,
95        columns: Vec<usize>,
96        types: Vec<LogicalType>,
97    ) -> Self {
98        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
99        Self::new(child, projections, types)
100    }
101}
102
103impl Operator for ProjectOperator {
104    fn next(&mut self) -> OperatorResult {
105        // Get next chunk from child
106        let Some(input) = self.child.next()? else {
107            return Ok(None);
108        };
109
110        // Create output chunk
111        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
112
113        // Evaluate each projection
114        for (i, proj) in self.projections.iter().enumerate() {
115            match proj {
116                ProjectExpr::Column(col_idx) => {
117                    // Copy column from input to output
118                    let input_col = input.column(*col_idx).ok_or_else(|| {
119                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
120                    })?;
121
122                    let output_col = output.column_mut(i).unwrap();
123
124                    // Copy selected rows
125                    for row in input.selected_indices() {
126                        if let Some(value) = input_col.get_value(row) {
127                            output_col.push_value(value);
128                        }
129                    }
130                }
131                ProjectExpr::Constant(value) => {
132                    // Push constant for each row
133                    let output_col = output.column_mut(i).unwrap();
134                    for _ in input.selected_indices() {
135                        output_col.push_value(value.clone());
136                    }
137                }
138                ProjectExpr::PropertyAccess { column, property } => {
139                    // Access property from node/edge in the specified column
140                    let input_col = input
141                        .column(*column)
142                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
143
144                    let output_col = output.column_mut(i).unwrap();
145
146                    let store = self.store.as_ref().ok_or_else(|| {
147                        OperatorError::Execution("Store required for property access".to_string())
148                    })?;
149
150                    // Extract property for each row
151                    let prop_key = PropertyKey::new(property);
152                    for row in input.selected_indices() {
153                        // Try node ID, then edge ID, then map value (for UNWIND maps)
154                        let value = if let Some(node_id) = input_col.get_node_id(row) {
155                            store
156                                .get_node(node_id)
157                                .and_then(|node| node.get_property(property).cloned())
158                                .unwrap_or(Value::Null)
159                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
160                            store
161                                .get_edge(edge_id)
162                                .and_then(|edge| edge.get_property(property).cloned())
163                                .unwrap_or(Value::Null)
164                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
165                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
166                        } else {
167                            Value::Null
168                        };
169                        output_col.push_value(value);
170                    }
171                }
172                ProjectExpr::EdgeType { column } => {
173                    // Get edge type string from an edge column
174                    let input_col = input
175                        .column(*column)
176                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
177
178                    let output_col = output.column_mut(i).unwrap();
179
180                    let store = self.store.as_ref().ok_or_else(|| {
181                        OperatorError::Execution("Store required for edge type access".to_string())
182                    })?;
183
184                    for row in input.selected_indices() {
185                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
186                            store.edge_type(edge_id).map_or(Value::Null, Value::String)
187                        } else {
188                            Value::Null
189                        };
190                        output_col.push_value(value);
191                    }
192                }
193                ProjectExpr::Expression {
194                    expr,
195                    variable_columns,
196                } => {
197                    let output_col = output.column_mut(i).unwrap();
198
199                    let store = self.store.as_ref().ok_or_else(|| {
200                        OperatorError::Execution(
201                            "Store required for expression evaluation".to_string(),
202                        )
203                    })?;
204
205                    // Use the ExpressionPredicate for expression evaluation
206                    let evaluator = ExpressionPredicate::new(
207                        expr.clone(),
208                        variable_columns.clone(),
209                        Arc::clone(store),
210                    );
211
212                    for row in input.selected_indices() {
213                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
214                        output_col.push_value(value);
215                    }
216                }
217                ProjectExpr::NodeResolve { column } => {
218                    let input_col = input
219                        .column(*column)
220                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
221
222                    let output_col = output.column_mut(i).unwrap();
223
224                    let store = self.store.as_ref().ok_or_else(|| {
225                        OperatorError::Execution("Store required for node resolution".to_string())
226                    })?;
227
228                    for row in input.selected_indices() {
229                        let value = if let Some(node_id) = input_col.get_node_id(row) {
230                            store
231                                .get_node(node_id)
232                                .map_or(Value::Null, |n| node_to_map(&n))
233                        } else {
234                            Value::Null
235                        };
236                        output_col.push_value(value);
237                    }
238                }
239                ProjectExpr::EdgeResolve { column } => {
240                    let input_col = input
241                        .column(*column)
242                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
243
244                    let output_col = output.column_mut(i).unwrap();
245
246                    let store = self.store.as_ref().ok_or_else(|| {
247                        OperatorError::Execution("Store required for edge resolution".to_string())
248                    })?;
249
250                    for row in input.selected_indices() {
251                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
252                            store
253                                .get_edge(edge_id)
254                                .map_or(Value::Null, |e| edge_to_map(&e))
255                        } else {
256                            Value::Null
257                        };
258                        output_col.push_value(value);
259                    }
260                }
261            }
262        }
263
264        output.set_count(input.row_count());
265        Ok(Some(output))
266    }
267
268    fn reset(&mut self) {
269        self.child.reset();
270    }
271
272    fn name(&self) -> &'static str {
273        "Project"
274    }
275}
276
277/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
278///
279/// The map contains `_id` (integer), `_labels` (list of strings), and
280/// all node properties at the top level.
281fn node_to_map(node: &Node) -> Value {
282    let mut map = BTreeMap::new();
283    map.insert(
284        PropertyKey::new("_id"),
285        Value::Int64(node.id.as_u64() as i64),
286    );
287    let labels: Vec<Value> = node
288        .labels
289        .iter()
290        .map(|l| Value::String(l.clone()))
291        .collect();
292    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
293    for (key, value) in &node.properties {
294        map.insert(key.clone(), value.clone());
295    }
296    Value::Map(Arc::new(map))
297}
298
299/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
300///
301/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
302/// properties at the top level.
303fn edge_to_map(edge: &Edge) -> Value {
304    let mut map = BTreeMap::new();
305    map.insert(
306        PropertyKey::new("_id"),
307        Value::Int64(edge.id.as_u64() as i64),
308    );
309    map.insert(
310        PropertyKey::new("_type"),
311        Value::String(edge.edge_type.clone()),
312    );
313    map.insert(
314        PropertyKey::new("_source"),
315        Value::Int64(edge.src.as_u64() as i64),
316    );
317    map.insert(
318        PropertyKey::new("_target"),
319        Value::Int64(edge.dst.as_u64() as i64),
320    );
321    for (key, value) in &edge.properties {
322        map.insert(key.clone(), value.clone());
323    }
324    Value::Map(Arc::new(map))
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan positioned at the start of `chunks`.
    fn scan_of(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a single-column Int64 chunk with one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int].
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (first, text, last) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(first);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(last);
            builder.advance_row();
        }

        // Select columns 2 and 0, i.e. reorder them.
        let mut project = ProjectOperator::select_columns(
            scan_of(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Values must appear in the new column order.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let projections = vec![
            ProjectExpr::Column(0),
            ProjectExpr::Constant(Value::String("constant".into())),
        ];
        let mut project = ProjectOperator::new(
            scan_of(vec![int64_chunk(&[1, 2])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(scan_of(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // Column index 5 does not exist in a one-column input.
        let mut project = ProjectOperator::new(
            scan_of(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let projections = vec![
            ProjectExpr::Constant(Value::Int64(42)),
            ProjectExpr::Constant(Value::String("fixed".into())),
            ProjectExpr::Constant(Value::Bool(true)),
        ];
        let mut project = ProjectOperator::new(
            scan_of(vec![int64_chunk(&[1])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Select every column in its original order.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_of(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(scan_of(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store containing one labelled node with two properties.
        let store = LpgStore::new();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alice".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk whose only column holds that node's ID.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(node_id.as_u64() as i64))
                );
                assert!(map.get(&PropertyKey::new("_labels")).is_some());
                assert_eq!(
                    map.get(&PropertyKey::new("name")),
                    Some(&Value::String("Alice".into()))
                );
                assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk whose only column holds that edge's ID.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(edge_id.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_type")),
                    Some(&Value::String("WORKS_AT".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_source")),
                    Some(&Value::Int64(src.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_target")),
                    Some(&Value::Int64(dst.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("since")),
                    Some(&Value::Int64(2020))
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new();

        // The chunk references a node ID that was never created in the store.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}
664}