// grafeo_core/execution/operators/project.rs

//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13pub enum ProjectExpr {
14    /// Reference to an input column.
15    Column(usize),
16    /// A constant value.
17    Constant(Value),
18    /// Property access on a node/edge column.
19    PropertyAccess {
20        /// The column containing the node or edge ID.
21        column: usize,
22        /// The property name to access.
23        property: String,
24    },
25    /// Edge type accessor (for type(r) function).
26    EdgeType {
27        /// The column containing the edge ID.
28        column: usize,
29    },
30    /// Full expression evaluation (for CASE WHEN, etc.).
31    Expression {
32        /// The filter expression to evaluate.
33        expr: FilterExpression,
34        /// Variable name to column index mapping.
35        variable_columns: HashMap<String, usize>,
36    },
37    /// Resolve a node ID column to a full node map with metadata and properties.
38    NodeResolve {
39        /// The column containing the node ID.
40        column: usize,
41    },
42    /// Resolve an edge ID column to a full edge map with metadata and properties.
43    EdgeResolve {
44        /// The column containing the edge ID.
45        column: usize,
46    },
47    /// Returns the first non-null value from two columns (used for RIGHT/FULL join dedup).
48    Coalesce {
49        /// Primary column index.
50        first: usize,
51        /// Fallback column index.
52        second: usize,
53    },
54}
55
56/// A project operator that selects and transforms columns.
57pub struct ProjectOperator {
58    /// Child operator to read from.
59    child: Box<dyn Operator>,
60    /// Projection expressions.
61    projections: Vec<ProjectExpr>,
62    /// Output column types.
63    output_types: Vec<LogicalType>,
64    /// Optional store for property access.
65    store: Option<Arc<dyn GraphStore>>,
66    /// Transaction ID for MVCC-aware property lookups.
67    transaction_id: Option<TransactionId>,
68    /// Viewing epoch for MVCC-aware property lookups.
69    viewing_epoch: Option<EpochId>,
70    /// Session context for introspection functions in expression evaluation.
71    session_context: SessionContext,
72}
73
74impl ProjectOperator {
75    /// Creates a new project operator.
76    pub fn new(
77        child: Box<dyn Operator>,
78        projections: Vec<ProjectExpr>,
79        output_types: Vec<LogicalType>,
80    ) -> Self {
81        assert_eq!(projections.len(), output_types.len());
82        Self {
83            child,
84            projections,
85            output_types,
86            store: None,
87            transaction_id: None,
88            viewing_epoch: None,
89            session_context: SessionContext::default(),
90        }
91    }
92
93    /// Creates a new project operator with store access for property lookups.
94    pub fn with_store(
95        child: Box<dyn Operator>,
96        projections: Vec<ProjectExpr>,
97        output_types: Vec<LogicalType>,
98        store: Arc<dyn GraphStore>,
99    ) -> Self {
100        assert_eq!(projections.len(), output_types.len());
101        Self {
102            child,
103            projections,
104            output_types,
105            store: Some(store),
106            transaction_id: None,
107            viewing_epoch: None,
108            session_context: SessionContext::default(),
109        }
110    }
111
112    /// Sets the transaction context for MVCC-aware property lookups.
113    pub fn with_transaction_context(
114        mut self,
115        epoch: EpochId,
116        transaction_id: Option<TransactionId>,
117    ) -> Self {
118        self.viewing_epoch = Some(epoch);
119        self.transaction_id = transaction_id;
120        self
121    }
122
123    /// Sets the session context for introspection functions.
124    pub fn with_session_context(mut self, context: SessionContext) -> Self {
125        self.session_context = context;
126        self
127    }
128
129    /// Creates a project operator that selects specific columns.
130    pub fn select_columns(
131        child: Box<dyn Operator>,
132        columns: Vec<usize>,
133        types: Vec<LogicalType>,
134    ) -> Self {
135        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
136        Self::new(child, projections, types)
137    }
138}
139
140impl Operator for ProjectOperator {
141    fn next(&mut self) -> OperatorResult {
142        // Get next chunk from child
143        let Some(input) = self.child.next()? else {
144            return Ok(None);
145        };
146
147        // Create output chunk
148        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
149
150        // Evaluate each projection
151        for (i, proj) in self.projections.iter().enumerate() {
152            match proj {
153                ProjectExpr::Column(col_idx) => {
154                    // Copy column from input to output
155                    let input_col = input.column(*col_idx).ok_or_else(|| {
156                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
157                    })?;
158
159                    let output_col = output
160                        .column_mut(i)
161                        .expect("column exists: index matches projection schema");
162
163                    // Copy selected rows
164                    for row in input.selected_indices() {
165                        if let Some(value) = input_col.get_value(row) {
166                            output_col.push_value(value);
167                        }
168                    }
169                }
170                ProjectExpr::Constant(value) => {
171                    // Push constant for each row
172                    let output_col = output
173                        .column_mut(i)
174                        .expect("column exists: index matches projection schema");
175                    for _ in input.selected_indices() {
176                        output_col.push_value(value.clone());
177                    }
178                }
179                ProjectExpr::PropertyAccess { column, property } => {
180                    // Access property from node/edge in the specified column
181                    let input_col = input
182                        .column(*column)
183                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
184
185                    let output_col = output
186                        .column_mut(i)
187                        .expect("column exists: index matches projection schema");
188
189                    let store = self.store.as_ref().ok_or_else(|| {
190                        OperatorError::Execution("Store required for property access".to_string())
191                    })?;
192
193                    // Extract property for each row.
194                    // For typed columns (VectorData::NodeId / EdgeId) there is
195                    // no ambiguity. For Generic/Any columns (e.g. after a hash
196                    // join), both get_node_id and get_edge_id can succeed on the
197                    // same Int64 value, so we verify against the store to resolve
198                    // the entity type.
199                    let prop_key = PropertyKey::new(property);
200                    let epoch = self.viewing_epoch;
201                    let tx_id = self.transaction_id;
202                    for row in input.selected_indices() {
203                        let value = if let Some(node_id) = input_col.get_node_id(row) {
204                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
205                                store.get_node_versioned(node_id, ep, tx)
206                            } else if let Some(ep) = epoch {
207                                store.get_node_at_epoch(node_id, ep)
208                            } else {
209                                store.get_node(node_id)
210                            };
211                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
212                            {
213                                prop
214                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
215                                // Node lookup failed: the ID may belong to an
216                                // edge (common with Generic columns after joins).
217                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
218                                    store.get_edge_versioned(edge_id, ep, tx)
219                                } else if let Some(ep) = epoch {
220                                    store.get_edge_at_epoch(edge_id, ep)
221                                } else {
222                                    store.get_edge(edge_id)
223                                };
224                                edge.and_then(|e| e.get_property(property).cloned())
225                                    .unwrap_or(Value::Null)
226                            } else {
227                                Value::Null
228                            }
229                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
230                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
231                                store.get_edge_versioned(edge_id, ep, tx)
232                            } else if let Some(ep) = epoch {
233                                store.get_edge_at_epoch(edge_id, ep)
234                            } else {
235                                store.get_edge(edge_id)
236                            };
237                            edge.and_then(|e| e.get_property(property).cloned())
238                                .unwrap_or(Value::Null)
239                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
240                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
241                        } else {
242                            Value::Null
243                        };
244                        output_col.push_value(value);
245                    }
246                }
247                ProjectExpr::EdgeType { column } => {
248                    // Get edge type string from an edge column
249                    let input_col = input
250                        .column(*column)
251                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
252
253                    let output_col = output
254                        .column_mut(i)
255                        .expect("column exists: index matches projection schema");
256
257                    let store = self.store.as_ref().ok_or_else(|| {
258                        OperatorError::Execution("Store required for edge type access".to_string())
259                    })?;
260
261                    let epoch = self.viewing_epoch;
262                    let tx_id = self.transaction_id;
263                    for row in input.selected_indices() {
264                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
265                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
266                                store.edge_type_versioned(edge_id, ep, tx)
267                            } else {
268                                store.edge_type(edge_id)
269                            };
270                            etype.map_or(Value::Null, Value::String)
271                        } else {
272                            Value::Null
273                        };
274                        output_col.push_value(value);
275                    }
276                }
277                ProjectExpr::Expression {
278                    expr,
279                    variable_columns,
280                } => {
281                    let output_col = output
282                        .column_mut(i)
283                        .expect("column exists: index matches projection schema");
284
285                    let store = self.store.as_ref().ok_or_else(|| {
286                        OperatorError::Execution(
287                            "Store required for expression evaluation".to_string(),
288                        )
289                    })?;
290
291                    // Use the ExpressionPredicate for expression evaluation
292                    let mut evaluator = ExpressionPredicate::new(
293                        expr.clone(),
294                        variable_columns.clone(),
295                        Arc::clone(store),
296                    )
297                    .with_session_context(self.session_context.clone());
298                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
299                        evaluator = evaluator.with_transaction_context(ep, tx_id);
300                    }
301
302                    for row in input.selected_indices() {
303                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
304                        output_col.push_value(value);
305                    }
306                }
307                ProjectExpr::NodeResolve { column } => {
308                    let input_col = input
309                        .column(*column)
310                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
311
312                    let output_col = output
313                        .column_mut(i)
314                        .expect("column exists: index matches projection schema");
315
316                    let store = self.store.as_ref().ok_or_else(|| {
317                        OperatorError::Execution("Store required for node resolution".to_string())
318                    })?;
319
320                    let epoch = self.viewing_epoch;
321                    let tx_id = self.transaction_id;
322                    for row in input.selected_indices() {
323                        let value = if let Some(node_id) = input_col.get_node_id(row) {
324                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
325                                store.get_node_versioned(node_id, ep, tx)
326                            } else if let Some(ep) = epoch {
327                                store.get_node_at_epoch(node_id, ep)
328                            } else {
329                                store.get_node(node_id)
330                            };
331                            node.map_or(Value::Null, |n| node_to_map(&n))
332                        } else {
333                            Value::Null
334                        };
335                        output_col.push_value(value);
336                    }
337                }
338                ProjectExpr::EdgeResolve { column } => {
339                    let input_col = input
340                        .column(*column)
341                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
342
343                    let output_col = output
344                        .column_mut(i)
345                        .expect("column exists: index matches projection schema");
346
347                    let store = self.store.as_ref().ok_or_else(|| {
348                        OperatorError::Execution("Store required for edge resolution".to_string())
349                    })?;
350
351                    let epoch = self.viewing_epoch;
352                    let tx_id = self.transaction_id;
353                    for row in input.selected_indices() {
354                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
355                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
356                                store.get_edge_versioned(edge_id, ep, tx)
357                            } else if let Some(ep) = epoch {
358                                store.get_edge_at_epoch(edge_id, ep)
359                            } else {
360                                store.get_edge(edge_id)
361                            };
362                            edge.map_or(Value::Null, |e| edge_to_map(&e))
363                        } else {
364                            Value::Null
365                        };
366                        output_col.push_value(value);
367                    }
368                }
369                ProjectExpr::Coalesce { first, second } => {
370                    let first_col = input
371                        .column(*first)
372                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
373                    let second_col = input
374                        .column(*second)
375                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
376
377                    let output_col = output
378                        .column_mut(i)
379                        .expect("column exists: index matches projection schema");
380
381                    for row in input.selected_indices() {
382                        let value = match first_col.get_value(row) {
383                            Some(Value::Null) | None => {
384                                second_col.get_value(row).unwrap_or(Value::Null)
385                            }
386                            Some(v) => v,
387                        };
388                        output_col.push_value(value);
389                    }
390                }
391            }
392        }
393
394        output.set_count(input.row_count());
395        Ok(Some(output))
396    }
397
398    fn reset(&mut self) {
399        self.child.reset();
400    }
401
402    fn name(&self) -> &'static str {
403        "Project"
404    }
405}
406
407/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
408///
409/// The map contains `_id` (integer), `_labels` (list of strings), and
410/// all node properties at the top level.
411fn node_to_map(node: &Node) -> Value {
412    let mut map = BTreeMap::new();
413    map.insert(
414        PropertyKey::new("_id"),
415        Value::Int64(node.id.as_u64() as i64),
416    );
417    let labels: Vec<Value> = node
418        .labels
419        .iter()
420        .map(|l| Value::String(l.clone()))
421        .collect();
422    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
423    for (key, value) in &node.properties {
424        map.insert(key.clone(), value.clone());
425    }
426    Value::Map(Arc::new(map))
427}
428
429/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
430///
431/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
432/// properties at the top level.
433fn edge_to_map(edge: &Edge) -> Value {
434    let mut map = BTreeMap::new();
435    map.insert(
436        PropertyKey::new("_id"),
437        Value::Int64(edge.id.as_u64() as i64),
438    );
439    map.insert(
440        PropertyKey::new("_type"),
441        Value::String(edge.edge_type.clone()),
442    );
443    map.insert(
444        PropertyKey::new("_source"),
445        Value::Int64(edge.src.as_u64() as i64),
446    );
447    map.insert(
448        PropertyKey::new("_target"),
449        Value::Int64(edge.dst.as_u64() as i64),
450    );
451    for (key, value) in &edge.properties {
452        map.insert(key.clone(), value.clone());
453    }
454    Value::Map(Arc::new(map))
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test source that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockScanOperator {
        /// Boxes a scan positioned at the first of `chunks`.
        fn boxed(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
            Box::new(Self {
                chunks,
                position: 0,
            })
        }
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Builds a chunk with a single Int64 column holding `values`, one per row.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int]
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (id, word, score) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(word);
            builder.column_mut(2).unwrap().push_int64(score);
            builder.advance_row();
        }
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        // Select columns 2 and 0, i.e. reorder them
        let mut project = ProjectOperator::select_columns(
            scan,
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Values must come out in the new column order
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1, 2])]);

        // One pass-through column plus one constant column
        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let scan = MockScanOperator::boxed(Vec::new());

        let mut project = ProjectOperator::select_columns(scan, vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        // Column index 5 does not exist in the single-column input
        let mut project = ProjectOperator::new(
            scan,
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Select every column in its original order
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::select_columns(
            scan,
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let scan = MockScanOperator::boxed(Vec::new());
        let project = ProjectOperator::select_columns(scan, vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store with one labelled node carrying two properties
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk: a single NodeId column with that node
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        let map = match &value {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {:?}", other),
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        assert!(map.get(&PropertyKey::new("_labels")).is_some());
        assert_eq!(
            map.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk: a single EdgeId column with that edge
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        let map = match &value {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {:?}", other),
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_type")),
            Some(&Value::String("WORKS_AT".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_source")),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_target")),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // Input chunk references a NodeId the store has never created
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}