Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13pub enum ProjectExpr {
14    /// Reference to an input column.
15    Column(usize),
16    /// A constant value.
17    Constant(Value),
18    /// Property access on a node/edge column.
19    PropertyAccess {
20        /// The column containing the node or edge ID.
21        column: usize,
22        /// The property name to access.
23        property: String,
24    },
25    /// Edge type accessor (for type(r) function).
26    EdgeType {
27        /// The column containing the edge ID.
28        column: usize,
29    },
30    /// Full expression evaluation (for CASE WHEN, etc.).
31    Expression {
32        /// The filter expression to evaluate.
33        expr: FilterExpression,
34        /// Variable name to column index mapping.
35        variable_columns: HashMap<String, usize>,
36    },
37    /// Resolve a node ID column to a full node map with metadata and properties.
38    NodeResolve {
39        /// The column containing the node ID.
40        column: usize,
41    },
42    /// Resolve an edge ID column to a full edge map with metadata and properties.
43    EdgeResolve {
44        /// The column containing the edge ID.
45        column: usize,
46    },
47}
48
49/// A project operator that selects and transforms columns.
50pub struct ProjectOperator {
51    /// Child operator to read from.
52    child: Box<dyn Operator>,
53    /// Projection expressions.
54    projections: Vec<ProjectExpr>,
55    /// Output column types.
56    output_types: Vec<LogicalType>,
57    /// Optional store for property access.
58    store: Option<Arc<dyn GraphStore>>,
59    /// Transaction ID for MVCC-aware property lookups.
60    transaction_id: Option<TransactionId>,
61    /// Viewing epoch for MVCC-aware property lookups.
62    viewing_epoch: Option<EpochId>,
63    /// Session context for introspection functions in expression evaluation.
64    session_context: SessionContext,
65}
66
67impl ProjectOperator {
68    /// Creates a new project operator.
69    pub fn new(
70        child: Box<dyn Operator>,
71        projections: Vec<ProjectExpr>,
72        output_types: Vec<LogicalType>,
73    ) -> Self {
74        assert_eq!(projections.len(), output_types.len());
75        Self {
76            child,
77            projections,
78            output_types,
79            store: None,
80            transaction_id: None,
81            viewing_epoch: None,
82            session_context: SessionContext::default(),
83        }
84    }
85
86    /// Creates a new project operator with store access for property lookups.
87    pub fn with_store(
88        child: Box<dyn Operator>,
89        projections: Vec<ProjectExpr>,
90        output_types: Vec<LogicalType>,
91        store: Arc<dyn GraphStore>,
92    ) -> Self {
93        assert_eq!(projections.len(), output_types.len());
94        Self {
95            child,
96            projections,
97            output_types,
98            store: Some(store),
99            transaction_id: None,
100            viewing_epoch: None,
101            session_context: SessionContext::default(),
102        }
103    }
104
105    /// Sets the transaction context for MVCC-aware property lookups.
106    pub fn with_transaction_context(
107        mut self,
108        epoch: EpochId,
109        transaction_id: Option<TransactionId>,
110    ) -> Self {
111        self.viewing_epoch = Some(epoch);
112        self.transaction_id = transaction_id;
113        self
114    }
115
116    /// Sets the session context for introspection functions.
117    pub fn with_session_context(mut self, context: SessionContext) -> Self {
118        self.session_context = context;
119        self
120    }
121
122    /// Creates a project operator that selects specific columns.
123    pub fn select_columns(
124        child: Box<dyn Operator>,
125        columns: Vec<usize>,
126        types: Vec<LogicalType>,
127    ) -> Self {
128        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
129        Self::new(child, projections, types)
130    }
131}
132
133impl Operator for ProjectOperator {
134    fn next(&mut self) -> OperatorResult {
135        // Get next chunk from child
136        let Some(input) = self.child.next()? else {
137            return Ok(None);
138        };
139
140        // Create output chunk
141        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
142
143        // Evaluate each projection
144        for (i, proj) in self.projections.iter().enumerate() {
145            match proj {
146                ProjectExpr::Column(col_idx) => {
147                    // Copy column from input to output
148                    let input_col = input.column(*col_idx).ok_or_else(|| {
149                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
150                    })?;
151
152                    let output_col = output
153                        .column_mut(i)
154                        .expect("column exists: index matches projection schema");
155
156                    // Copy selected rows
157                    for row in input.selected_indices() {
158                        if let Some(value) = input_col.get_value(row) {
159                            output_col.push_value(value);
160                        }
161                    }
162                }
163                ProjectExpr::Constant(value) => {
164                    // Push constant for each row
165                    let output_col = output
166                        .column_mut(i)
167                        .expect("column exists: index matches projection schema");
168                    for _ in input.selected_indices() {
169                        output_col.push_value(value.clone());
170                    }
171                }
172                ProjectExpr::PropertyAccess { column, property } => {
173                    // Access property from node/edge in the specified column
174                    let input_col = input
175                        .column(*column)
176                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
177
178                    let output_col = output
179                        .column_mut(i)
180                        .expect("column exists: index matches projection schema");
181
182                    let store = self.store.as_ref().ok_or_else(|| {
183                        OperatorError::Execution("Store required for property access".to_string())
184                    })?;
185
186                    // Extract property for each row.
187                    // For typed columns (VectorData::NodeId / EdgeId) there is
188                    // no ambiguity. For Generic/Any columns (e.g. after a hash
189                    // join), both get_node_id and get_edge_id can succeed on the
190                    // same Int64 value, so we verify against the store to resolve
191                    // the entity type.
192                    let prop_key = PropertyKey::new(property);
193                    let epoch = self.viewing_epoch;
194                    let tx_id = self.transaction_id;
195                    for row in input.selected_indices() {
196                        let value = if let Some(node_id) = input_col.get_node_id(row) {
197                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
198                                store.get_node_versioned(node_id, ep, tx)
199                            } else if let Some(ep) = epoch {
200                                store.get_node_at_epoch(node_id, ep)
201                            } else {
202                                store.get_node(node_id)
203                            };
204                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
205                            {
206                                prop
207                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
208                                // Node lookup failed: the ID may belong to an
209                                // edge (common with Generic columns after joins).
210                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
211                                    store.get_edge_versioned(edge_id, ep, tx)
212                                } else if let Some(ep) = epoch {
213                                    store.get_edge_at_epoch(edge_id, ep)
214                                } else {
215                                    store.get_edge(edge_id)
216                                };
217                                edge.and_then(|e| e.get_property(property).cloned())
218                                    .unwrap_or(Value::Null)
219                            } else {
220                                Value::Null
221                            }
222                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
223                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
224                                store.get_edge_versioned(edge_id, ep, tx)
225                            } else if let Some(ep) = epoch {
226                                store.get_edge_at_epoch(edge_id, ep)
227                            } else {
228                                store.get_edge(edge_id)
229                            };
230                            edge.and_then(|e| e.get_property(property).cloned())
231                                .unwrap_or(Value::Null)
232                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
233                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
234                        } else {
235                            Value::Null
236                        };
237                        output_col.push_value(value);
238                    }
239                }
240                ProjectExpr::EdgeType { column } => {
241                    // Get edge type string from an edge column
242                    let input_col = input
243                        .column(*column)
244                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
245
246                    let output_col = output
247                        .column_mut(i)
248                        .expect("column exists: index matches projection schema");
249
250                    let store = self.store.as_ref().ok_or_else(|| {
251                        OperatorError::Execution("Store required for edge type access".to_string())
252                    })?;
253
254                    let epoch = self.viewing_epoch;
255                    let tx_id = self.transaction_id;
256                    for row in input.selected_indices() {
257                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
258                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
259                                store.edge_type_versioned(edge_id, ep, tx)
260                            } else {
261                                store.edge_type(edge_id)
262                            };
263                            etype.map_or(Value::Null, Value::String)
264                        } else {
265                            Value::Null
266                        };
267                        output_col.push_value(value);
268                    }
269                }
270                ProjectExpr::Expression {
271                    expr,
272                    variable_columns,
273                } => {
274                    let output_col = output
275                        .column_mut(i)
276                        .expect("column exists: index matches projection schema");
277
278                    let store = self.store.as_ref().ok_or_else(|| {
279                        OperatorError::Execution(
280                            "Store required for expression evaluation".to_string(),
281                        )
282                    })?;
283
284                    // Use the ExpressionPredicate for expression evaluation
285                    let mut evaluator = ExpressionPredicate::new(
286                        expr.clone(),
287                        variable_columns.clone(),
288                        Arc::clone(store),
289                    )
290                    .with_session_context(self.session_context.clone());
291                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
292                        evaluator = evaluator.with_transaction_context(ep, tx_id);
293                    }
294
295                    for row in input.selected_indices() {
296                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
297                        output_col.push_value(value);
298                    }
299                }
300                ProjectExpr::NodeResolve { column } => {
301                    let input_col = input
302                        .column(*column)
303                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
304
305                    let output_col = output
306                        .column_mut(i)
307                        .expect("column exists: index matches projection schema");
308
309                    let store = self.store.as_ref().ok_or_else(|| {
310                        OperatorError::Execution("Store required for node resolution".to_string())
311                    })?;
312
313                    let epoch = self.viewing_epoch;
314                    let tx_id = self.transaction_id;
315                    for row in input.selected_indices() {
316                        let value = if let Some(node_id) = input_col.get_node_id(row) {
317                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
318                                store.get_node_versioned(node_id, ep, tx)
319                            } else if let Some(ep) = epoch {
320                                store.get_node_at_epoch(node_id, ep)
321                            } else {
322                                store.get_node(node_id)
323                            };
324                            node.map_or(Value::Null, |n| node_to_map(&n))
325                        } else {
326                            Value::Null
327                        };
328                        output_col.push_value(value);
329                    }
330                }
331                ProjectExpr::EdgeResolve { column } => {
332                    let input_col = input
333                        .column(*column)
334                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
335
336                    let output_col = output
337                        .column_mut(i)
338                        .expect("column exists: index matches projection schema");
339
340                    let store = self.store.as_ref().ok_or_else(|| {
341                        OperatorError::Execution("Store required for edge resolution".to_string())
342                    })?;
343
344                    let epoch = self.viewing_epoch;
345                    let tx_id = self.transaction_id;
346                    for row in input.selected_indices() {
347                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
348                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
349                                store.get_edge_versioned(edge_id, ep, tx)
350                            } else if let Some(ep) = epoch {
351                                store.get_edge_at_epoch(edge_id, ep)
352                            } else {
353                                store.get_edge(edge_id)
354                            };
355                            edge.map_or(Value::Null, |e| edge_to_map(&e))
356                        } else {
357                            Value::Null
358                        };
359                        output_col.push_value(value);
360                    }
361                }
362            }
363        }
364
365        output.set_count(input.row_count());
366        Ok(Some(output))
367    }
368
369    fn reset(&mut self) {
370        self.child.reset();
371    }
372
373    fn name(&self) -> &'static str {
374        "Project"
375    }
376}
377
378/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
379///
380/// The map contains `_id` (integer), `_labels` (list of strings), and
381/// all node properties at the top level.
382fn node_to_map(node: &Node) -> Value {
383    let mut map = BTreeMap::new();
384    map.insert(
385        PropertyKey::new("_id"),
386        Value::Int64(node.id.as_u64() as i64),
387    );
388    let labels: Vec<Value> = node
389        .labels
390        .iter()
391        .map(|l| Value::String(l.clone()))
392        .collect();
393    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
394    for (key, value) in &node.properties {
395        map.insert(key.clone(), value.clone());
396    }
397    Value::Map(Arc::new(map))
398}
399
400/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
401///
402/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
403/// properties at the top level.
404fn edge_to_map(edge: &Edge) -> Value {
405    let mut map = BTreeMap::new();
406    map.insert(
407        PropertyKey::new("_id"),
408        Value::Int64(edge.id.as_u64() as i64),
409    );
410    map.insert(
411        PropertyKey::new("_type"),
412        Value::String(edge.edge_type.clone()),
413    );
414    map.insert(
415        PropertyKey::new("_source"),
416        Value::Int64(edge.src.as_u64() as i64),
417    );
418    map.insert(
419        PropertyKey::new("_target"),
420        Value::Int64(edge.dst.as_u64() as i64),
421    );
422    for (key, value) in &edge.properties {
423        map.insert(key.clone(), value.clone());
424    }
425    Value::Map(Arc::new(map))
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use crate::execution::chunk::DataChunkBuilder;
432    use crate::graph::lpg::LpgStore;
433    use grafeo_common::types::Value;
434
435    struct MockScanOperator {
436        chunks: Vec<DataChunk>,
437        position: usize,
438    }
439
440    impl Operator for MockScanOperator {
441        fn next(&mut self) -> OperatorResult {
442            if self.position < self.chunks.len() {
443                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
444                self.position += 1;
445                Ok(Some(chunk))
446            } else {
447                Ok(None)
448            }
449        }
450
451        fn reset(&mut self) {
452            self.position = 0;
453        }
454
455        fn name(&self) -> &'static str {
456            "MockScan"
457        }
458    }
459
460    #[test]
461    fn test_project_select_columns() {
462        // Create input with 3 columns: [int, string, int]
463        let mut builder =
464            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
465
466        builder.column_mut(0).unwrap().push_int64(1);
467        builder.column_mut(1).unwrap().push_string("hello");
468        builder.column_mut(2).unwrap().push_int64(100);
469        builder.advance_row();
470
471        builder.column_mut(0).unwrap().push_int64(2);
472        builder.column_mut(1).unwrap().push_string("world");
473        builder.column_mut(2).unwrap().push_int64(200);
474        builder.advance_row();
475
476        let chunk = builder.finish();
477
478        let mock_scan = MockScanOperator {
479            chunks: vec![chunk],
480            position: 0,
481        };
482
483        // Project to select columns 2 and 0 (reordering)
484        let mut project = ProjectOperator::select_columns(
485            Box::new(mock_scan),
486            vec![2, 0],
487            vec![LogicalType::Int64, LogicalType::Int64],
488        );
489
490        let result = project.next().unwrap().unwrap();
491
492        assert_eq!(result.column_count(), 2);
493        assert_eq!(result.row_count(), 2);
494
495        // Check values are reordered
496        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
497        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
498    }
499
500    #[test]
501    fn test_project_constant() {
502        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
503        builder.column_mut(0).unwrap().push_int64(1);
504        builder.advance_row();
505        builder.column_mut(0).unwrap().push_int64(2);
506        builder.advance_row();
507
508        let chunk = builder.finish();
509
510        let mock_scan = MockScanOperator {
511            chunks: vec![chunk],
512            position: 0,
513        };
514
515        // Project with a constant
516        let mut project = ProjectOperator::new(
517            Box::new(mock_scan),
518            vec![
519                ProjectExpr::Column(0),
520                ProjectExpr::Constant(Value::String("constant".into())),
521            ],
522            vec![LogicalType::Int64, LogicalType::String],
523        );
524
525        let result = project.next().unwrap().unwrap();
526
527        assert_eq!(result.column_count(), 2);
528        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
529        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
530    }
531
532    #[test]
533    fn test_project_empty_input() {
534        let mock_scan = MockScanOperator {
535            chunks: vec![],
536            position: 0,
537        };
538
539        let mut project =
540            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
541
542        assert!(project.next().unwrap().is_none());
543    }
544
545    #[test]
546    fn test_project_column_not_found() {
547        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
548        builder.column_mut(0).unwrap().push_int64(1);
549        builder.advance_row();
550        let chunk = builder.finish();
551
552        let mock_scan = MockScanOperator {
553            chunks: vec![chunk],
554            position: 0,
555        };
556
557        // Reference column index 5 which doesn't exist
558        let mut project = ProjectOperator::new(
559            Box::new(mock_scan),
560            vec![ProjectExpr::Column(5)],
561            vec![LogicalType::Int64],
562        );
563
564        let result = project.next();
565        assert!(result.is_err(), "Should fail with ColumnNotFound");
566    }
567
568    #[test]
569    fn test_project_multiple_constants() {
570        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
571        builder.column_mut(0).unwrap().push_int64(1);
572        builder.advance_row();
573        let chunk = builder.finish();
574
575        let mock_scan = MockScanOperator {
576            chunks: vec![chunk],
577            position: 0,
578        };
579
580        let mut project = ProjectOperator::new(
581            Box::new(mock_scan),
582            vec![
583                ProjectExpr::Constant(Value::Int64(42)),
584                ProjectExpr::Constant(Value::String("fixed".into())),
585                ProjectExpr::Constant(Value::Bool(true)),
586            ],
587            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
588        );
589
590        let result = project.next().unwrap().unwrap();
591        assert_eq!(result.column_count(), 3);
592        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
593        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
594        assert_eq!(
595            result.column(2).unwrap().get_value(0),
596            Some(Value::Bool(true))
597        );
598    }
599
600    #[test]
601    fn test_project_identity() {
602        // Select all columns in original order
603        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
604        builder.column_mut(0).unwrap().push_int64(10);
605        builder.column_mut(1).unwrap().push_string("test");
606        builder.advance_row();
607        let chunk = builder.finish();
608
609        let mock_scan = MockScanOperator {
610            chunks: vec![chunk],
611            position: 0,
612        };
613
614        let mut project = ProjectOperator::select_columns(
615            Box::new(mock_scan),
616            vec![0, 1],
617            vec![LogicalType::Int64, LogicalType::String],
618        );
619
620        let result = project.next().unwrap().unwrap();
621        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
622        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
623    }
624
625    #[test]
626    fn test_project_name() {
627        let mock_scan = MockScanOperator {
628            chunks: vec![],
629            position: 0,
630        };
631        let project =
632            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
633        assert_eq!(project.name(), "Project");
634    }
635
636    #[test]
637    fn test_project_node_resolve() {
638        // Create a store with a test node
639        let store = LpgStore::new().unwrap();
640        let node_id = store.create_node(&["Person"]);
641        store.set_node_property(node_id, "name", Value::String("Alix".into()));
642        store.set_node_property(node_id, "age", Value::Int64(30));
643
644        // Create input chunk with a NodeId column
645        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
646        builder.column_mut(0).unwrap().push_node_id(node_id);
647        builder.advance_row();
648        let chunk = builder.finish();
649
650        let mock_scan = MockScanOperator {
651            chunks: vec![chunk],
652            position: 0,
653        };
654
655        let mut project = ProjectOperator::with_store(
656            Box::new(mock_scan),
657            vec![ProjectExpr::NodeResolve { column: 0 }],
658            vec![LogicalType::Any],
659            Arc::new(store),
660        );
661
662        let result = project.next().unwrap().unwrap();
663        assert_eq!(result.column_count(), 1);
664
665        let value = result.column(0).unwrap().get_value(0).unwrap();
666        if let Value::Map(map) = value {
667            assert_eq!(
668                map.get(&PropertyKey::new("_id")),
669                Some(&Value::Int64(node_id.as_u64() as i64))
670            );
671            assert!(map.get(&PropertyKey::new("_labels")).is_some());
672            assert_eq!(
673                map.get(&PropertyKey::new("name")),
674                Some(&Value::String("Alix".into()))
675            );
676            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
677        } else {
678            panic!("Expected Value::Map, got {:?}", value);
679        }
680    }
681
682    #[test]
683    fn test_project_edge_resolve() {
684        let store = LpgStore::new().unwrap();
685        let src = store.create_node(&["Person"]);
686        let dst = store.create_node(&["Company"]);
687        let edge_id = store.create_edge(src, dst, "WORKS_AT");
688        store.set_edge_property(edge_id, "since", Value::Int64(2020));
689
690        // Create input chunk with an EdgeId column
691        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
692        builder.column_mut(0).unwrap().push_edge_id(edge_id);
693        builder.advance_row();
694        let chunk = builder.finish();
695
696        let mock_scan = MockScanOperator {
697            chunks: vec![chunk],
698            position: 0,
699        };
700
701        let mut project = ProjectOperator::with_store(
702            Box::new(mock_scan),
703            vec![ProjectExpr::EdgeResolve { column: 0 }],
704            vec![LogicalType::Any],
705            Arc::new(store),
706        );
707
708        let result = project.next().unwrap().unwrap();
709        let value = result.column(0).unwrap().get_value(0).unwrap();
710        if let Value::Map(map) = value {
711            assert_eq!(
712                map.get(&PropertyKey::new("_id")),
713                Some(&Value::Int64(edge_id.as_u64() as i64))
714            );
715            assert_eq!(
716                map.get(&PropertyKey::new("_type")),
717                Some(&Value::String("WORKS_AT".into()))
718            );
719            assert_eq!(
720                map.get(&PropertyKey::new("_source")),
721                Some(&Value::Int64(src.as_u64() as i64))
722            );
723            assert_eq!(
724                map.get(&PropertyKey::new("_target")),
725                Some(&Value::Int64(dst.as_u64() as i64))
726            );
727            assert_eq!(
728                map.get(&PropertyKey::new("since")),
729                Some(&Value::Int64(2020))
730            );
731        } else {
732            panic!("Expected Value::Map, got {:?}", value);
733        }
734    }
735
736    #[test]
737    fn test_project_resolve_missing_entity() {
738        use grafeo_common::types::NodeId;
739
740        let store = LpgStore::new().unwrap();
741
742        // Create input chunk with a NodeId that doesn't exist in the store
743        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
744        builder
745            .column_mut(0)
746            .unwrap()
747            .push_node_id(NodeId::new(999));
748        builder.advance_row();
749        let chunk = builder.finish();
750
751        let mock_scan = MockScanOperator {
752            chunks: vec![chunk],
753            position: 0,
754        };
755
756        let mut project = ProjectOperator::with_store(
757            Box::new(mock_scan),
758            vec![ProjectExpr::NodeResolve { column: 0 }],
759            vec![LogicalType::Any],
760            Arc::new(store),
761        );
762
763        let result = project.next().unwrap().unwrap();
764        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
765    }
766}