Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13pub enum ProjectExpr {
14    /// Reference to an input column.
15    Column(usize),
16    /// A constant value.
17    Constant(Value),
18    /// Property access on a node/edge column.
19    PropertyAccess {
20        /// The column containing the node or edge ID.
21        column: usize,
22        /// The property name to access.
23        property: String,
24    },
25    /// Edge type accessor (for type(r) function).
26    EdgeType {
27        /// The column containing the edge ID.
28        column: usize,
29    },
30    /// Full expression evaluation (for CASE WHEN, etc.).
31    Expression {
32        /// The filter expression to evaluate.
33        expr: FilterExpression,
34        /// Variable name to column index mapping.
35        variable_columns: HashMap<String, usize>,
36    },
37    /// Resolve a node ID column to a full node map with metadata and properties.
38    NodeResolve {
39        /// The column containing the node ID.
40        column: usize,
41    },
42    /// Resolve an edge ID column to a full edge map with metadata and properties.
43    EdgeResolve {
44        /// The column containing the edge ID.
45        column: usize,
46    },
47    /// Returns the first non-null value from two columns (used for RIGHT/FULL join dedup).
48    Coalesce {
49        /// Primary column index.
50        first: usize,
51        /// Fallback column index.
52        second: usize,
53    },
54}
55
56/// A project operator that selects and transforms columns.
57pub struct ProjectOperator {
58    /// Child operator to read from.
59    child: Box<dyn Operator>,
60    /// Projection expressions.
61    projections: Vec<ProjectExpr>,
62    /// Output column types.
63    output_types: Vec<LogicalType>,
64    /// Optional store for property access.
65    store: Option<Arc<dyn GraphStore>>,
66    /// Transaction ID for MVCC-aware property lookups.
67    transaction_id: Option<TransactionId>,
68    /// Viewing epoch for MVCC-aware property lookups.
69    viewing_epoch: Option<EpochId>,
70    /// Session context for introspection functions in expression evaluation.
71    session_context: SessionContext,
72}
73
74impl ProjectOperator {
75    /// Creates a new project operator.
76    ///
77    /// # Panics
78    ///
79    /// Panics if `projections` and `output_types` have different lengths.
80    pub fn new(
81        child: Box<dyn Operator>,
82        projections: Vec<ProjectExpr>,
83        output_types: Vec<LogicalType>,
84    ) -> Self {
85        assert_eq!(projections.len(), output_types.len());
86        Self {
87            child,
88            projections,
89            output_types,
90            store: None,
91            transaction_id: None,
92            viewing_epoch: None,
93            session_context: SessionContext::default(),
94        }
95    }
96
97    /// Creates a new project operator with store access for property lookups.
98    ///
99    /// # Panics
100    ///
101    /// Panics if `projections` and `output_types` have different lengths.
102    pub fn with_store(
103        child: Box<dyn Operator>,
104        projections: Vec<ProjectExpr>,
105        output_types: Vec<LogicalType>,
106        store: Arc<dyn GraphStore>,
107    ) -> Self {
108        assert_eq!(projections.len(), output_types.len());
109        Self {
110            child,
111            projections,
112            output_types,
113            store: Some(store),
114            transaction_id: None,
115            viewing_epoch: None,
116            session_context: SessionContext::default(),
117        }
118    }
119
120    /// Sets the transaction context for MVCC-aware property lookups.
121    pub fn with_transaction_context(
122        mut self,
123        epoch: EpochId,
124        transaction_id: Option<TransactionId>,
125    ) -> Self {
126        self.viewing_epoch = Some(epoch);
127        self.transaction_id = transaction_id;
128        self
129    }
130
131    /// Sets the session context for introspection functions.
132    pub fn with_session_context(mut self, context: SessionContext) -> Self {
133        self.session_context = context;
134        self
135    }
136
137    /// Creates a project operator that selects specific columns.
138    pub fn select_columns(
139        child: Box<dyn Operator>,
140        columns: Vec<usize>,
141        types: Vec<LogicalType>,
142    ) -> Self {
143        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
144        Self::new(child, projections, types)
145    }
146}
147
148impl Operator for ProjectOperator {
149    fn next(&mut self) -> OperatorResult {
150        // Get next chunk from child
151        let Some(input) = self.child.next()? else {
152            return Ok(None);
153        };
154
155        // Create output chunk
156        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
157
158        // Evaluate each projection
159        for (i, proj) in self.projections.iter().enumerate() {
160            match proj {
161                ProjectExpr::Column(col_idx) => {
162                    // Copy column from input to output
163                    let input_col = input.column(*col_idx).ok_or_else(|| {
164                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
165                    })?;
166
167                    let output_col = output
168                        .column_mut(i)
169                        .expect("column exists: index matches projection schema");
170
171                    // Copy selected rows
172                    for row in input.selected_indices() {
173                        if let Some(value) = input_col.get_value(row) {
174                            output_col.push_value(value);
175                        }
176                    }
177                }
178                ProjectExpr::Constant(value) => {
179                    // Push constant for each row
180                    let output_col = output
181                        .column_mut(i)
182                        .expect("column exists: index matches projection schema");
183                    for _ in input.selected_indices() {
184                        output_col.push_value(value.clone());
185                    }
186                }
187                ProjectExpr::PropertyAccess { column, property } => {
188                    // Access property from node/edge in the specified column
189                    let input_col = input
190                        .column(*column)
191                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
192
193                    let output_col = output
194                        .column_mut(i)
195                        .expect("column exists: index matches projection schema");
196
197                    let store = self.store.as_ref().ok_or_else(|| {
198                        OperatorError::Execution("Store required for property access".to_string())
199                    })?;
200
201                    // Extract property for each row.
202                    // For typed columns (VectorData::NodeId / EdgeId) there is
203                    // no ambiguity. For Generic/Any columns (e.g. after a hash
204                    // join), both get_node_id and get_edge_id can succeed on the
205                    // same Int64 value, so we verify against the store to resolve
206                    // the entity type.
207                    let prop_key = PropertyKey::new(property);
208                    let epoch = self.viewing_epoch;
209                    let tx_id = self.transaction_id;
210                    for row in input.selected_indices() {
211                        let value = if let Some(node_id) = input_col.get_node_id(row) {
212                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
213                                store.get_node_versioned(node_id, ep, tx)
214                            } else if let Some(ep) = epoch {
215                                store.get_node_at_epoch(node_id, ep)
216                            } else {
217                                store.get_node(node_id)
218                            };
219                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
220                            {
221                                prop
222                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
223                                // Node lookup failed: the ID may belong to an
224                                // edge (common with Generic columns after joins).
225                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
226                                    store.get_edge_versioned(edge_id, ep, tx)
227                                } else if let Some(ep) = epoch {
228                                    store.get_edge_at_epoch(edge_id, ep)
229                                } else {
230                                    store.get_edge(edge_id)
231                                };
232                                edge.and_then(|e| e.get_property(property).cloned())
233                                    .unwrap_or(Value::Null)
234                            } else {
235                                Value::Null
236                            }
237                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
238                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
239                                store.get_edge_versioned(edge_id, ep, tx)
240                            } else if let Some(ep) = epoch {
241                                store.get_edge_at_epoch(edge_id, ep)
242                            } else {
243                                store.get_edge(edge_id)
244                            };
245                            edge.and_then(|e| e.get_property(property).cloned())
246                                .unwrap_or(Value::Null)
247                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
248                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
249                        } else {
250                            Value::Null
251                        };
252                        output_col.push_value(value);
253                    }
254                }
255                ProjectExpr::EdgeType { column } => {
256                    // Get edge type string from an edge column
257                    let input_col = input
258                        .column(*column)
259                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
260
261                    let output_col = output
262                        .column_mut(i)
263                        .expect("column exists: index matches projection schema");
264
265                    let store = self.store.as_ref().ok_or_else(|| {
266                        OperatorError::Execution("Store required for edge type access".to_string())
267                    })?;
268
269                    let epoch = self.viewing_epoch;
270                    let tx_id = self.transaction_id;
271                    for row in input.selected_indices() {
272                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
273                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
274                                store.edge_type_versioned(edge_id, ep, tx)
275                            } else {
276                                store.edge_type(edge_id)
277                            };
278                            etype.map_or(Value::Null, Value::String)
279                        } else {
280                            Value::Null
281                        };
282                        output_col.push_value(value);
283                    }
284                }
285                ProjectExpr::Expression {
286                    expr,
287                    variable_columns,
288                } => {
289                    let output_col = output
290                        .column_mut(i)
291                        .expect("column exists: index matches projection schema");
292
293                    let store = self.store.as_ref().ok_or_else(|| {
294                        OperatorError::Execution(
295                            "Store required for expression evaluation".to_string(),
296                        )
297                    })?;
298
299                    // Use the ExpressionPredicate for expression evaluation
300                    let mut evaluator = ExpressionPredicate::new(
301                        expr.clone(),
302                        variable_columns.clone(),
303                        Arc::clone(store),
304                    )
305                    .with_session_context(self.session_context.clone());
306                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
307                        evaluator = evaluator.with_transaction_context(ep, tx_id);
308                    }
309
310                    for row in input.selected_indices() {
311                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
312                        output_col.push_value(value);
313                    }
314                }
315                ProjectExpr::NodeResolve { column } => {
316                    let input_col = input
317                        .column(*column)
318                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
319
320                    let output_col = output
321                        .column_mut(i)
322                        .expect("column exists: index matches projection schema");
323
324                    let store = self.store.as_ref().ok_or_else(|| {
325                        OperatorError::Execution("Store required for node resolution".to_string())
326                    })?;
327
328                    let epoch = self.viewing_epoch;
329                    let tx_id = self.transaction_id;
330                    for row in input.selected_indices() {
331                        let value = if let Some(node_id) = input_col.get_node_id(row) {
332                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
333                                store.get_node_versioned(node_id, ep, tx)
334                            } else if let Some(ep) = epoch {
335                                store.get_node_at_epoch(node_id, ep)
336                            } else {
337                                store.get_node(node_id)
338                            };
339                            node.map_or(Value::Null, |n| node_to_map(&n))
340                        } else {
341                            Value::Null
342                        };
343                        output_col.push_value(value);
344                    }
345                }
346                ProjectExpr::EdgeResolve { column } => {
347                    let input_col = input
348                        .column(*column)
349                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
350
351                    let output_col = output
352                        .column_mut(i)
353                        .expect("column exists: index matches projection schema");
354
355                    let store = self.store.as_ref().ok_or_else(|| {
356                        OperatorError::Execution("Store required for edge resolution".to_string())
357                    })?;
358
359                    let epoch = self.viewing_epoch;
360                    let tx_id = self.transaction_id;
361                    for row in input.selected_indices() {
362                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
363                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
364                                store.get_edge_versioned(edge_id, ep, tx)
365                            } else if let Some(ep) = epoch {
366                                store.get_edge_at_epoch(edge_id, ep)
367                            } else {
368                                store.get_edge(edge_id)
369                            };
370                            edge.map_or(Value::Null, |e| edge_to_map(&e))
371                        } else {
372                            Value::Null
373                        };
374                        output_col.push_value(value);
375                    }
376                }
377                ProjectExpr::Coalesce { first, second } => {
378                    let first_col = input
379                        .column(*first)
380                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
381                    let second_col = input
382                        .column(*second)
383                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
384
385                    let output_col = output
386                        .column_mut(i)
387                        .expect("column exists: index matches projection schema");
388
389                    for row in input.selected_indices() {
390                        let value = match first_col.get_value(row) {
391                            Some(Value::Null) | None => {
392                                second_col.get_value(row).unwrap_or(Value::Null)
393                            }
394                            Some(v) => v,
395                        };
396                        output_col.push_value(value);
397                    }
398                }
399            }
400        }
401
402        output.set_count(input.row_count());
403        Ok(Some(output))
404    }
405
406    fn reset(&mut self) {
407        self.child.reset();
408    }
409
410    fn name(&self) -> &'static str {
411        "Project"
412    }
413}
414
415/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
416///
417/// The map contains `_id` (integer), `_labels` (list of strings), and
418/// all node properties at the top level.
419fn node_to_map(node: &Node) -> Value {
420    let mut map = BTreeMap::new();
421    map.insert(
422        PropertyKey::new("_id"),
423        Value::Int64(node.id.as_u64() as i64),
424    );
425    let labels: Vec<Value> = node
426        .labels
427        .iter()
428        .map(|l| Value::String(l.clone()))
429        .collect();
430    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
431    for (key, value) in &node.properties {
432        map.insert(key.clone(), value.clone());
433    }
434    Value::Map(Arc::new(map))
435}
436
437/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
438///
439/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
440/// properties at the top level.
441fn edge_to_map(edge: &Edge) -> Value {
442    let mut map = BTreeMap::new();
443    map.insert(
444        PropertyKey::new("_id"),
445        Value::Int64(edge.id.as_u64() as i64),
446    );
447    map.insert(
448        PropertyKey::new("_type"),
449        Value::String(edge.edge_type.clone()),
450    );
451    map.insert(
452        PropertyKey::new("_source"),
453        Value::Int64(edge.src.as_u64() as i64),
454    );
455    map.insert(
456        PropertyKey::new("_target"),
457        Value::Int64(edge.dst.as_u64() as i64),
458    );
459    for (key, value) in &edge.properties {
460        map.insert(key.clone(), value.clone());
461    }
462    Value::Map(Arc::new(map))
463}
464
465#[cfg(test)]
466mod tests {
467    use super::*;
468    use crate::execution::chunk::DataChunkBuilder;
469    use crate::graph::lpg::LpgStore;
470    use grafeo_common::types::Value;
471
472    struct MockScanOperator {
473        chunks: Vec<DataChunk>,
474        position: usize,
475    }
476
477    impl Operator for MockScanOperator {
478        fn next(&mut self) -> OperatorResult {
479            if self.position < self.chunks.len() {
480                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
481                self.position += 1;
482                Ok(Some(chunk))
483            } else {
484                Ok(None)
485            }
486        }
487
488        fn reset(&mut self) {
489            self.position = 0;
490        }
491
492        fn name(&self) -> &'static str {
493            "MockScan"
494        }
495    }
496
497    #[test]
498    fn test_project_select_columns() {
499        // Create input with 3 columns: [int, string, int]
500        let mut builder =
501            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
502
503        builder.column_mut(0).unwrap().push_int64(1);
504        builder.column_mut(1).unwrap().push_string("hello");
505        builder.column_mut(2).unwrap().push_int64(100);
506        builder.advance_row();
507
508        builder.column_mut(0).unwrap().push_int64(2);
509        builder.column_mut(1).unwrap().push_string("world");
510        builder.column_mut(2).unwrap().push_int64(200);
511        builder.advance_row();
512
513        let chunk = builder.finish();
514
515        let mock_scan = MockScanOperator {
516            chunks: vec![chunk],
517            position: 0,
518        };
519
520        // Project to select columns 2 and 0 (reordering)
521        let mut project = ProjectOperator::select_columns(
522            Box::new(mock_scan),
523            vec![2, 0],
524            vec![LogicalType::Int64, LogicalType::Int64],
525        );
526
527        let result = project.next().unwrap().unwrap();
528
529        assert_eq!(result.column_count(), 2);
530        assert_eq!(result.row_count(), 2);
531
532        // Check values are reordered
533        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
534        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
535    }
536
537    #[test]
538    fn test_project_constant() {
539        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
540        builder.column_mut(0).unwrap().push_int64(1);
541        builder.advance_row();
542        builder.column_mut(0).unwrap().push_int64(2);
543        builder.advance_row();
544
545        let chunk = builder.finish();
546
547        let mock_scan = MockScanOperator {
548            chunks: vec![chunk],
549            position: 0,
550        };
551
552        // Project with a constant
553        let mut project = ProjectOperator::new(
554            Box::new(mock_scan),
555            vec![
556                ProjectExpr::Column(0),
557                ProjectExpr::Constant(Value::String("constant".into())),
558            ],
559            vec![LogicalType::Int64, LogicalType::String],
560        );
561
562        let result = project.next().unwrap().unwrap();
563
564        assert_eq!(result.column_count(), 2);
565        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
566        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
567    }
568
569    #[test]
570    fn test_project_empty_input() {
571        let mock_scan = MockScanOperator {
572            chunks: vec![],
573            position: 0,
574        };
575
576        let mut project =
577            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
578
579        assert!(project.next().unwrap().is_none());
580    }
581
582    #[test]
583    fn test_project_column_not_found() {
584        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
585        builder.column_mut(0).unwrap().push_int64(1);
586        builder.advance_row();
587        let chunk = builder.finish();
588
589        let mock_scan = MockScanOperator {
590            chunks: vec![chunk],
591            position: 0,
592        };
593
594        // Reference column index 5 which doesn't exist
595        let mut project = ProjectOperator::new(
596            Box::new(mock_scan),
597            vec![ProjectExpr::Column(5)],
598            vec![LogicalType::Int64],
599        );
600
601        let result = project.next();
602        assert!(result.is_err(), "Should fail with ColumnNotFound");
603    }
604
605    #[test]
606    fn test_project_multiple_constants() {
607        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
608        builder.column_mut(0).unwrap().push_int64(1);
609        builder.advance_row();
610        let chunk = builder.finish();
611
612        let mock_scan = MockScanOperator {
613            chunks: vec![chunk],
614            position: 0,
615        };
616
617        let mut project = ProjectOperator::new(
618            Box::new(mock_scan),
619            vec![
620                ProjectExpr::Constant(Value::Int64(42)),
621                ProjectExpr::Constant(Value::String("fixed".into())),
622                ProjectExpr::Constant(Value::Bool(true)),
623            ],
624            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
625        );
626
627        let result = project.next().unwrap().unwrap();
628        assert_eq!(result.column_count(), 3);
629        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
630        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
631        assert_eq!(
632            result.column(2).unwrap().get_value(0),
633            Some(Value::Bool(true))
634        );
635    }
636
637    #[test]
638    fn test_project_identity() {
639        // Select all columns in original order
640        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
641        builder.column_mut(0).unwrap().push_int64(10);
642        builder.column_mut(1).unwrap().push_string("test");
643        builder.advance_row();
644        let chunk = builder.finish();
645
646        let mock_scan = MockScanOperator {
647            chunks: vec![chunk],
648            position: 0,
649        };
650
651        let mut project = ProjectOperator::select_columns(
652            Box::new(mock_scan),
653            vec![0, 1],
654            vec![LogicalType::Int64, LogicalType::String],
655        );
656
657        let result = project.next().unwrap().unwrap();
658        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
659        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
660    }
661
662    #[test]
663    fn test_project_name() {
664        let mock_scan = MockScanOperator {
665            chunks: vec![],
666            position: 0,
667        };
668        let project =
669            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
670        assert_eq!(project.name(), "Project");
671    }
672
673    #[test]
674    fn test_project_node_resolve() {
675        // Create a store with a test node
676        let store = LpgStore::new().unwrap();
677        let node_id = store.create_node(&["Person"]);
678        store.set_node_property(node_id, "name", Value::String("Alix".into()));
679        store.set_node_property(node_id, "age", Value::Int64(30));
680
681        // Create input chunk with a NodeId column
682        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
683        builder.column_mut(0).unwrap().push_node_id(node_id);
684        builder.advance_row();
685        let chunk = builder.finish();
686
687        let mock_scan = MockScanOperator {
688            chunks: vec![chunk],
689            position: 0,
690        };
691
692        let mut project = ProjectOperator::with_store(
693            Box::new(mock_scan),
694            vec![ProjectExpr::NodeResolve { column: 0 }],
695            vec![LogicalType::Any],
696            Arc::new(store),
697        );
698
699        let result = project.next().unwrap().unwrap();
700        assert_eq!(result.column_count(), 1);
701
702        let value = result.column(0).unwrap().get_value(0).unwrap();
703        if let Value::Map(map) = value {
704            assert_eq!(
705                map.get(&PropertyKey::new("_id")),
706                Some(&Value::Int64(node_id.as_u64() as i64))
707            );
708            assert!(map.get(&PropertyKey::new("_labels")).is_some());
709            assert_eq!(
710                map.get(&PropertyKey::new("name")),
711                Some(&Value::String("Alix".into()))
712            );
713            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
714        } else {
715            panic!("Expected Value::Map, got {:?}", value);
716        }
717    }
718
719    #[test]
720    fn test_project_edge_resolve() {
721        let store = LpgStore::new().unwrap();
722        let src = store.create_node(&["Person"]);
723        let dst = store.create_node(&["Company"]);
724        let edge_id = store.create_edge(src, dst, "WORKS_AT");
725        store.set_edge_property(edge_id, "since", Value::Int64(2020));
726
727        // Create input chunk with an EdgeId column
728        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
729        builder.column_mut(0).unwrap().push_edge_id(edge_id);
730        builder.advance_row();
731        let chunk = builder.finish();
732
733        let mock_scan = MockScanOperator {
734            chunks: vec![chunk],
735            position: 0,
736        };
737
738        let mut project = ProjectOperator::with_store(
739            Box::new(mock_scan),
740            vec![ProjectExpr::EdgeResolve { column: 0 }],
741            vec![LogicalType::Any],
742            Arc::new(store),
743        );
744
745        let result = project.next().unwrap().unwrap();
746        let value = result.column(0).unwrap().get_value(0).unwrap();
747        if let Value::Map(map) = value {
748            assert_eq!(
749                map.get(&PropertyKey::new("_id")),
750                Some(&Value::Int64(edge_id.as_u64() as i64))
751            );
752            assert_eq!(
753                map.get(&PropertyKey::new("_type")),
754                Some(&Value::String("WORKS_AT".into()))
755            );
756            assert_eq!(
757                map.get(&PropertyKey::new("_source")),
758                Some(&Value::Int64(src.as_u64() as i64))
759            );
760            assert_eq!(
761                map.get(&PropertyKey::new("_target")),
762                Some(&Value::Int64(dst.as_u64() as i64))
763            );
764            assert_eq!(
765                map.get(&PropertyKey::new("since")),
766                Some(&Value::Int64(2020))
767            );
768        } else {
769            panic!("Expected Value::Map, got {:?}", value);
770        }
771    }
772
773    #[test]
774    fn test_project_resolve_missing_entity() {
775        use grafeo_common::types::NodeId;
776
777        let store = LpgStore::new().unwrap();
778
779        // Create input chunk with a NodeId that doesn't exist in the store
780        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
781        builder
782            .column_mut(0)
783            .unwrap()
784            .push_node_id(NodeId::new(999));
785        builder.advance_row();
786        let chunk = builder.finish();
787
788        let mock_scan = MockScanOperator {
789            chunks: vec![chunk],
790            position: 0,
791        };
792
793        let mut project = ProjectOperator::with_store(
794            Box::new(mock_scan),
795            vec![ProjectExpr::NodeResolve { column: 0 }],
796            vec![LogicalType::Any],
797            Arc::new(store),
798        );
799
800        let result = project.next().unwrap().unwrap();
801        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
802    }
803}