Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// A projection expression.
///
/// Each variant describes how [`ProjectOperator`] fills one output column
/// from the rows selected in an input chunk.
#[non_exhaustive]
pub enum ProjectExpr {
    /// Reference to an input column.
    ///
    /// The values of the input column at this index are copied to the output.
    /// An index that is not present in the input chunk is reported as
    /// `OperatorError::ColumnNotFound`.
    Column(usize),
    /// A constant value.
    ///
    /// The value is cloned once for every selected input row.
    Constant(Value),
    /// Property access on a node/edge column.
    ///
    /// The column entry is tried as a node ID first, then as an edge ID, and
    /// finally as a `Value::Map` keyed by the property name. A missing entity
    /// or property yields `Value::Null`. Requires a store.
    PropertyAccess {
        /// The column containing the node or edge ID.
        column: usize,
        /// The property name to access.
        property: String,
    },
    /// Edge type accessor (for type(r) function).
    ///
    /// Yields the edge type as a string, or `Value::Null` when the entry is
    /// not an edge ID or the store has no type for it. Requires a store.
    EdgeType {
        /// The column containing the edge ID.
        column: usize,
    },
    /// Full expression evaluation (for CASE WHEN, etc.).
    ///
    /// Evaluated per row through `ExpressionPredicate`; a row for which the
    /// evaluation yields nothing produces `Value::Null`. Requires a store.
    Expression {
        /// The filter expression to evaluate.
        expr: FilterExpression,
        /// Variable name to column index mapping.
        variable_columns: HashMap<String, usize>,
    },
    /// Resolve a node ID column to a full node map with metadata and properties.
    ///
    /// The map carries `_id` and `_labels` plus the node's properties. Entries
    /// that are not node IDs, or IDs the store cannot find, become
    /// `Value::Null`. Requires a store.
    NodeResolve {
        /// The column containing the node ID.
        column: usize,
    },
    /// Resolve an edge ID column to a full edge map with metadata and properties.
    ///
    /// The map carries `_id`, `_type`, `_source` and `_target` plus the edge's
    /// properties. Entries that are not edge IDs, or IDs the store cannot
    /// find, become `Value::Null`. Requires a store.
    EdgeResolve {
        /// The column containing the edge ID.
        column: usize,
    },
    /// Returns the first non-null value from two columns (used for RIGHT/FULL join dedup).
    ///
    /// When the primary column holds `Value::Null` (or no value) for a row,
    /// the fallback column's value is used; if that is absent too the result
    /// is `Value::Null`.
    Coalesce {
        /// Primary column index.
        first: usize,
        /// Fallback column index.
        second: usize,
    },
}
56
/// A project operator that selects and transforms columns.
///
/// For every chunk pulled from `child`, one output column is produced per
/// entry in `projections`, typed according to the matching entry in
/// `output_types`.
pub struct ProjectOperator {
    /// Child operator to read from.
    child: Box<dyn Operator>,
    /// Projection expressions.
    ///
    /// Has the same length as `output_types`; both constructors assert this.
    projections: Vec<ProjectExpr>,
    /// Output column types.
    output_types: Vec<LogicalType>,
    /// Optional store for property access.
    ///
    /// `None` when the operator is built with `new`; projections that need
    /// the store then fail with an execution error.
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction ID for MVCC-aware property lookups.
    ///
    /// Set through `with_transaction_context`; `None` otherwise.
    transaction_id: Option<TransactionId>,
    /// Viewing epoch for MVCC-aware property lookups.
    ///
    /// Set through `with_transaction_context`; `None` otherwise.
    viewing_epoch: Option<EpochId>,
    /// Session context for introspection functions in expression evaluation.
    ///
    /// Starts as `SessionContext::default()` and can be replaced with
    /// `with_session_context`.
    session_context: SessionContext,
}
74
75impl ProjectOperator {
76    /// Creates a new project operator.
77    ///
78    /// # Panics
79    ///
80    /// Panics if `projections` and `output_types` have different lengths.
81    pub fn new(
82        child: Box<dyn Operator>,
83        projections: Vec<ProjectExpr>,
84        output_types: Vec<LogicalType>,
85    ) -> Self {
86        assert_eq!(projections.len(), output_types.len());
87        Self {
88            child,
89            projections,
90            output_types,
91            store: None,
92            transaction_id: None,
93            viewing_epoch: None,
94            session_context: SessionContext::default(),
95        }
96    }
97
98    /// Creates a new project operator with store access for property lookups.
99    ///
100    /// # Panics
101    ///
102    /// Panics if `projections` and `output_types` have different lengths.
103    pub fn with_store(
104        child: Box<dyn Operator>,
105        projections: Vec<ProjectExpr>,
106        output_types: Vec<LogicalType>,
107        store: Arc<dyn GraphStore>,
108    ) -> Self {
109        assert_eq!(projections.len(), output_types.len());
110        Self {
111            child,
112            projections,
113            output_types,
114            store: Some(store),
115            transaction_id: None,
116            viewing_epoch: None,
117            session_context: SessionContext::default(),
118        }
119    }
120
121    /// Sets the transaction context for MVCC-aware property lookups.
122    pub fn with_transaction_context(
123        mut self,
124        epoch: EpochId,
125        transaction_id: Option<TransactionId>,
126    ) -> Self {
127        self.viewing_epoch = Some(epoch);
128        self.transaction_id = transaction_id;
129        self
130    }
131
132    /// Sets the session context for introspection functions.
133    pub fn with_session_context(mut self, context: SessionContext) -> Self {
134        self.session_context = context;
135        self
136    }
137
138    /// Creates a project operator that selects specific columns.
139    pub fn select_columns(
140        child: Box<dyn Operator>,
141        columns: Vec<usize>,
142        types: Vec<LogicalType>,
143    ) -> Self {
144        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
145        Self::new(child, projections, types)
146    }
147}
148
149impl Operator for ProjectOperator {
150    fn next(&mut self) -> OperatorResult {
151        // Get next chunk from child
152        let Some(input) = self.child.next()? else {
153            return Ok(None);
154        };
155
156        // Create output chunk
157        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
158
159        // Evaluate each projection
160        for (i, proj) in self.projections.iter().enumerate() {
161            match proj {
162                ProjectExpr::Column(col_idx) => {
163                    // Copy column from input to output
164                    let input_col = input.column(*col_idx).ok_or_else(|| {
165                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
166                    })?;
167
168                    let output_col = output
169                        .column_mut(i)
170                        .expect("column exists: index matches projection schema");
171
172                    // Copy selected rows
173                    for row in input.selected_indices() {
174                        if let Some(value) = input_col.get_value(row) {
175                            output_col.push_value(value);
176                        }
177                    }
178                }
179                ProjectExpr::Constant(value) => {
180                    // Push constant for each row
181                    let output_col = output
182                        .column_mut(i)
183                        .expect("column exists: index matches projection schema");
184                    for _ in input.selected_indices() {
185                        output_col.push_value(value.clone());
186                    }
187                }
188                ProjectExpr::PropertyAccess { column, property } => {
189                    // Access property from node/edge in the specified column
190                    let input_col = input
191                        .column(*column)
192                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
193
194                    let output_col = output
195                        .column_mut(i)
196                        .expect("column exists: index matches projection schema");
197
198                    let store = self.store.as_ref().ok_or_else(|| {
199                        OperatorError::Execution("Store required for property access".to_string())
200                    })?;
201
202                    // Extract property for each row.
203                    // For typed columns (VectorData::NodeId / EdgeId) there is
204                    // no ambiguity. For Generic/Any columns (e.g. after a hash
205                    // join), both get_node_id and get_edge_id can succeed on the
206                    // same Int64 value, so we verify against the store to resolve
207                    // the entity type.
208                    let prop_key = PropertyKey::new(property);
209                    let epoch = self.viewing_epoch;
210                    let tx_id = self.transaction_id;
211                    for row in input.selected_indices() {
212                        let value = if let Some(node_id) = input_col.get_node_id(row) {
213                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
214                                store.get_node_versioned(node_id, ep, tx)
215                            } else if let Some(ep) = epoch {
216                                store.get_node_at_epoch(node_id, ep)
217                            } else {
218                                store.get_node(node_id)
219                            };
220                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
221                            {
222                                prop
223                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
224                                // Node lookup failed: the ID may belong to an
225                                // edge (common with Generic columns after joins).
226                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
227                                    store.get_edge_versioned(edge_id, ep, tx)
228                                } else if let Some(ep) = epoch {
229                                    store.get_edge_at_epoch(edge_id, ep)
230                                } else {
231                                    store.get_edge(edge_id)
232                                };
233                                edge.and_then(|e| e.get_property(property).cloned())
234                                    .unwrap_or(Value::Null)
235                            } else {
236                                Value::Null
237                            }
238                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
239                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
240                                store.get_edge_versioned(edge_id, ep, tx)
241                            } else if let Some(ep) = epoch {
242                                store.get_edge_at_epoch(edge_id, ep)
243                            } else {
244                                store.get_edge(edge_id)
245                            };
246                            edge.and_then(|e| e.get_property(property).cloned())
247                                .unwrap_or(Value::Null)
248                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
249                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
250                        } else {
251                            Value::Null
252                        };
253                        output_col.push_value(value);
254                    }
255                }
256                ProjectExpr::EdgeType { column } => {
257                    // Get edge type string from an edge column
258                    let input_col = input
259                        .column(*column)
260                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
261
262                    let output_col = output
263                        .column_mut(i)
264                        .expect("column exists: index matches projection schema");
265
266                    let store = self.store.as_ref().ok_or_else(|| {
267                        OperatorError::Execution("Store required for edge type access".to_string())
268                    })?;
269
270                    let epoch = self.viewing_epoch;
271                    let tx_id = self.transaction_id;
272                    for row in input.selected_indices() {
273                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
274                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
275                                store.edge_type_versioned(edge_id, ep, tx)
276                            } else {
277                                store.edge_type(edge_id)
278                            };
279                            etype.map_or(Value::Null, Value::String)
280                        } else {
281                            Value::Null
282                        };
283                        output_col.push_value(value);
284                    }
285                }
286                ProjectExpr::Expression {
287                    expr,
288                    variable_columns,
289                } => {
290                    let output_col = output
291                        .column_mut(i)
292                        .expect("column exists: index matches projection schema");
293
294                    let store = self.store.as_ref().ok_or_else(|| {
295                        OperatorError::Execution(
296                            "Store required for expression evaluation".to_string(),
297                        )
298                    })?;
299
300                    // Use the ExpressionPredicate for expression evaluation
301                    let mut evaluator = ExpressionPredicate::new(
302                        expr.clone(),
303                        variable_columns.clone(),
304                        Arc::clone(store),
305                    )
306                    .with_session_context(self.session_context.clone());
307                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
308                        evaluator = evaluator.with_transaction_context(ep, tx_id);
309                    }
310
311                    for row in input.selected_indices() {
312                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
313                        output_col.push_value(value);
314                    }
315                }
316                ProjectExpr::NodeResolve { column } => {
317                    let input_col = input
318                        .column(*column)
319                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
320
321                    let output_col = output
322                        .column_mut(i)
323                        .expect("column exists: index matches projection schema");
324
325                    let store = self.store.as_ref().ok_or_else(|| {
326                        OperatorError::Execution("Store required for node resolution".to_string())
327                    })?;
328
329                    let epoch = self.viewing_epoch;
330                    let tx_id = self.transaction_id;
331                    for row in input.selected_indices() {
332                        let value = if let Some(node_id) = input_col.get_node_id(row) {
333                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
334                                store.get_node_versioned(node_id, ep, tx)
335                            } else if let Some(ep) = epoch {
336                                store.get_node_at_epoch(node_id, ep)
337                            } else {
338                                store.get_node(node_id)
339                            };
340                            node.map_or(Value::Null, |n| node_to_map(&n))
341                        } else {
342                            Value::Null
343                        };
344                        output_col.push_value(value);
345                    }
346                }
347                ProjectExpr::EdgeResolve { column } => {
348                    let input_col = input
349                        .column(*column)
350                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
351
352                    let output_col = output
353                        .column_mut(i)
354                        .expect("column exists: index matches projection schema");
355
356                    let store = self.store.as_ref().ok_or_else(|| {
357                        OperatorError::Execution("Store required for edge resolution".to_string())
358                    })?;
359
360                    let epoch = self.viewing_epoch;
361                    let tx_id = self.transaction_id;
362                    for row in input.selected_indices() {
363                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
364                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
365                                store.get_edge_versioned(edge_id, ep, tx)
366                            } else if let Some(ep) = epoch {
367                                store.get_edge_at_epoch(edge_id, ep)
368                            } else {
369                                store.get_edge(edge_id)
370                            };
371                            edge.map_or(Value::Null, |e| edge_to_map(&e))
372                        } else {
373                            Value::Null
374                        };
375                        output_col.push_value(value);
376                    }
377                }
378                ProjectExpr::Coalesce { first, second } => {
379                    let first_col = input
380                        .column(*first)
381                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
382                    let second_col = input
383                        .column(*second)
384                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
385
386                    let output_col = output
387                        .column_mut(i)
388                        .expect("column exists: index matches projection schema");
389
390                    for row in input.selected_indices() {
391                        let value = match first_col.get_value(row) {
392                            Some(Value::Null) | None => {
393                                second_col.get_value(row).unwrap_or(Value::Null)
394                            }
395                            Some(v) => v,
396                        };
397                        output_col.push_value(value);
398                    }
399                }
400            }
401        }
402
403        output.set_count(input.row_count());
404        Ok(Some(output))
405    }
406
407    fn reset(&mut self) {
408        self.child.reset();
409    }
410
411    fn name(&self) -> &'static str {
412        "Project"
413    }
414}
415
416/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
417///
418/// The map contains `_id` (integer), `_labels` (list of strings), and
419/// all node properties at the top level.
420fn node_to_map(node: &Node) -> Value {
421    let mut map = BTreeMap::new();
422    map.insert(
423        PropertyKey::new("_id"),
424        Value::Int64(node.id.as_u64() as i64),
425    );
426    let labels: Vec<Value> = node
427        .labels
428        .iter()
429        .map(|l| Value::String(l.clone()))
430        .collect();
431    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
432    for (key, value) in &node.properties {
433        map.insert(key.clone(), value.clone());
434    }
435    Value::Map(Arc::new(map))
436}
437
438/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
439///
440/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
441/// properties at the top level.
442fn edge_to_map(edge: &Edge) -> Value {
443    let mut map = BTreeMap::new();
444    map.insert(
445        PropertyKey::new("_id"),
446        Value::Int64(edge.id.as_u64() as i64),
447    );
448    map.insert(
449        PropertyKey::new("_type"),
450        Value::String(edge.edge_type.clone()),
451    );
452    map.insert(
453        PropertyKey::new("_source"),
454        Value::Int64(edge.src.as_u64() as i64),
455    );
456    map.insert(
457        PropertyKey::new("_target"),
458        Value::Int64(edge.dst.as_u64() as i64),
459    );
460    for (key, value) in &edge.properties {
461        map.insert(key.clone(), value.clone());
462    }
463    Value::Map(Arc::new(map))
464}
465
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockScanOperator {
        /// Boxes a scan positioned at the first of `chunks`.
        fn boxed(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
            Box::new(Self {
                chunks,
                position: 0,
            })
        }
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Builds a chunk with a single Int64 column holding `values`, one per row.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int]
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        for (id, text, score) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(score);
            builder.advance_row();
        }

        // Select columns 2 and 0 (reordering)
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Values come back in the requested order
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        // Pass the input column through and add a constant next to it
        let mut project = ProjectOperator::new(
            MockScanOperator::boxed(vec![int64_chunk(&[1, 2])]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        let constants = result.column(1).unwrap();
        assert_eq!(constants.get_string(0), Some("constant"));
        assert_eq!(constants.get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![]),
            vec![0],
            vec![LogicalType::Int64],
        );

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // Column index 5 does not exist in the single-column input
        let mut project = ProjectOperator::new(
            MockScanOperator::boxed(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut project = ProjectOperator::new(
            MockScanOperator::boxed(vec![int64_chunk(&[1])]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Select every column in its original order
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project = ProjectOperator::select_columns(
            MockScanOperator::boxed(vec![]),
            vec![0],
            vec![LogicalType::Int64],
        );
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store holding a single node with two properties
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk with a NodeId column
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        assert!(map.get(&PropertyKey::new("_labels")).is_some());
        assert_eq!(
            map.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk with an EdgeId column
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_type")),
            Some(&Value::String("WORKS_AT".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_source")),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_target")),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // The NodeId in the input chunk does not exist in the store
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            MockScanOperator::boxed(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}