Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// A projection expression.
///
/// Each expression describes how one output column of a [`ProjectOperator`]
/// is computed from the rows of an input chunk.
pub enum ProjectExpr {
    /// Reference to an input column.
    ///
    /// The payload is the index of the column in the input chunk.
    Column(usize),
    /// A constant value.
    ///
    /// The value is cloned once for every selected input row.
    Constant(Value),
    /// Property access on a node/edge column.
    ///
    /// The operator tries the row as a node ID, then as an edge ID, and
    /// finally as a `Value::Map` keyed by the property name; anything else
    /// yields `Value::Null`.
    PropertyAccess {
        /// The column containing the node or edge ID.
        column: usize,
        /// The property name to access.
        property: String,
    },
    /// Edge type accessor (for type(r) function).
    ///
    /// Produces the edge type as a `Value::String`, or `Value::Null` when the
    /// row holds no edge ID or the store does not know the edge.
    EdgeType {
        /// The column containing the edge ID.
        column: usize,
    },
    /// Full expression evaluation (for CASE WHEN, etc.).
    ///
    /// The expression is evaluated per row through an `ExpressionPredicate`.
    Expression {
        /// The filter expression to evaluate.
        expr: FilterExpression,
        /// Variable name to column index mapping.
        variable_columns: HashMap<String, usize>,
    },
    /// Resolve a node ID column to a full node map with metadata and properties.
    ///
    /// Rows whose node cannot be found resolve to `Value::Null`.
    NodeResolve {
        /// The column containing the node ID.
        column: usize,
    },
    /// Resolve an edge ID column to a full edge map with metadata and properties.
    ///
    /// Rows whose edge cannot be found resolve to `Value::Null`.
    EdgeResolve {
        /// The column containing the edge ID.
        column: usize,
    },
}
48
/// A project operator that selects and transforms columns.
///
/// For every chunk pulled from `child`, one output column is produced per
/// entry in `projections`, typed according to the matching entry in
/// `output_types`.
pub struct ProjectOperator {
    /// Child operator to read from.
    child: Box<dyn Operator>,
    /// Projection expressions, one per output column.
    projections: Vec<ProjectExpr>,
    /// Output column types; the constructors assert that this has the same
    /// length as `projections`.
    output_types: Vec<LogicalType>,
    /// Optional store for property access.
    ///
    /// Every projection other than `Column` and `Constant` fails with an
    /// execution error when this is `None`.
    store: Option<Arc<dyn GraphStore>>,
    /// Transaction ID for MVCC-aware property lookups.
    transaction_id: Option<TransactionId>,
    /// Viewing epoch for MVCC-aware property lookups.
    viewing_epoch: Option<EpochId>,
    /// Session context for introspection functions in expression evaluation.
    session_context: SessionContext,
}
66
67impl ProjectOperator {
68    /// Creates a new project operator.
69    pub fn new(
70        child: Box<dyn Operator>,
71        projections: Vec<ProjectExpr>,
72        output_types: Vec<LogicalType>,
73    ) -> Self {
74        assert_eq!(projections.len(), output_types.len());
75        Self {
76            child,
77            projections,
78            output_types,
79            store: None,
80            transaction_id: None,
81            viewing_epoch: None,
82            session_context: SessionContext::default(),
83        }
84    }
85
86    /// Creates a new project operator with store access for property lookups.
87    pub fn with_store(
88        child: Box<dyn Operator>,
89        projections: Vec<ProjectExpr>,
90        output_types: Vec<LogicalType>,
91        store: Arc<dyn GraphStore>,
92    ) -> Self {
93        assert_eq!(projections.len(), output_types.len());
94        Self {
95            child,
96            projections,
97            output_types,
98            store: Some(store),
99            transaction_id: None,
100            viewing_epoch: None,
101            session_context: SessionContext::default(),
102        }
103    }
104
105    /// Sets the transaction context for MVCC-aware property lookups.
106    pub fn with_transaction_context(
107        mut self,
108        epoch: EpochId,
109        transaction_id: Option<TransactionId>,
110    ) -> Self {
111        self.viewing_epoch = Some(epoch);
112        self.transaction_id = transaction_id;
113        self
114    }
115
116    /// Sets the session context for introspection functions.
117    pub fn with_session_context(mut self, context: SessionContext) -> Self {
118        self.session_context = context;
119        self
120    }
121
122    /// Creates a project operator that selects specific columns.
123    pub fn select_columns(
124        child: Box<dyn Operator>,
125        columns: Vec<usize>,
126        types: Vec<LogicalType>,
127    ) -> Self {
128        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
129        Self::new(child, projections, types)
130    }
131}
132
133impl Operator for ProjectOperator {
134    fn next(&mut self) -> OperatorResult {
135        // Get next chunk from child
136        let Some(input) = self.child.next()? else {
137            return Ok(None);
138        };
139
140        // Create output chunk
141        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
142
143        // Evaluate each projection
144        for (i, proj) in self.projections.iter().enumerate() {
145            match proj {
146                ProjectExpr::Column(col_idx) => {
147                    // Copy column from input to output
148                    let input_col = input.column(*col_idx).ok_or_else(|| {
149                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
150                    })?;
151
152                    let output_col = output
153                        .column_mut(i)
154                        .expect("column exists: index matches projection schema");
155
156                    // Copy selected rows
157                    for row in input.selected_indices() {
158                        if let Some(value) = input_col.get_value(row) {
159                            output_col.push_value(value);
160                        }
161                    }
162                }
163                ProjectExpr::Constant(value) => {
164                    // Push constant for each row
165                    let output_col = output
166                        .column_mut(i)
167                        .expect("column exists: index matches projection schema");
168                    for _ in input.selected_indices() {
169                        output_col.push_value(value.clone());
170                    }
171                }
172                ProjectExpr::PropertyAccess { column, property } => {
173                    // Access property from node/edge in the specified column
174                    let input_col = input
175                        .column(*column)
176                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
177
178                    let output_col = output
179                        .column_mut(i)
180                        .expect("column exists: index matches projection schema");
181
182                    let store = self.store.as_ref().ok_or_else(|| {
183                        OperatorError::Execution("Store required for property access".to_string())
184                    })?;
185
186                    // Extract property for each row.
187                    // For typed columns (VectorData::NodeId / EdgeId) there is
188                    // no ambiguity. For Generic/Any columns (e.g. after a hash
189                    // join), both get_node_id and get_edge_id can succeed on the
190                    // same Int64 value, so we verify against the store to resolve
191                    // the entity type.
192                    let prop_key = PropertyKey::new(property);
193                    let epoch = self.viewing_epoch;
194                    let tx_id = self.transaction_id;
195                    for row in input.selected_indices() {
196                        let value = if let Some(node_id) = input_col.get_node_id(row) {
197                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
198                                store.get_node_versioned(node_id, ep, tx)
199                            } else {
200                                store.get_node(node_id)
201                            };
202                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
203                            {
204                                prop
205                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
206                                // Node lookup failed: the ID may belong to an
207                                // edge (common with Generic columns after joins).
208                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
209                                    store.get_edge_versioned(edge_id, ep, tx)
210                                } else {
211                                    store.get_edge(edge_id)
212                                };
213                                edge.and_then(|e| e.get_property(property).cloned())
214                                    .unwrap_or(Value::Null)
215                            } else {
216                                Value::Null
217                            }
218                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
219                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
220                                store.get_edge_versioned(edge_id, ep, tx)
221                            } else {
222                                store.get_edge(edge_id)
223                            };
224                            edge.and_then(|e| e.get_property(property).cloned())
225                                .unwrap_or(Value::Null)
226                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
227                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
228                        } else {
229                            Value::Null
230                        };
231                        output_col.push_value(value);
232                    }
233                }
234                ProjectExpr::EdgeType { column } => {
235                    // Get edge type string from an edge column
236                    let input_col = input
237                        .column(*column)
238                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
239
240                    let output_col = output
241                        .column_mut(i)
242                        .expect("column exists: index matches projection schema");
243
244                    let store = self.store.as_ref().ok_or_else(|| {
245                        OperatorError::Execution("Store required for edge type access".to_string())
246                    })?;
247
248                    let epoch = self.viewing_epoch;
249                    let tx_id = self.transaction_id;
250                    for row in input.selected_indices() {
251                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
252                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
253                                store.edge_type_versioned(edge_id, ep, tx)
254                            } else {
255                                store.edge_type(edge_id)
256                            };
257                            etype.map_or(Value::Null, Value::String)
258                        } else {
259                            Value::Null
260                        };
261                        output_col.push_value(value);
262                    }
263                }
264                ProjectExpr::Expression {
265                    expr,
266                    variable_columns,
267                } => {
268                    let output_col = output
269                        .column_mut(i)
270                        .expect("column exists: index matches projection schema");
271
272                    let store = self.store.as_ref().ok_or_else(|| {
273                        OperatorError::Execution(
274                            "Store required for expression evaluation".to_string(),
275                        )
276                    })?;
277
278                    // Use the ExpressionPredicate for expression evaluation
279                    let mut evaluator = ExpressionPredicate::new(
280                        expr.clone(),
281                        variable_columns.clone(),
282                        Arc::clone(store),
283                    )
284                    .with_session_context(self.session_context.clone());
285                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
286                        evaluator = evaluator.with_transaction_context(ep, tx_id);
287                    }
288
289                    for row in input.selected_indices() {
290                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
291                        output_col.push_value(value);
292                    }
293                }
294                ProjectExpr::NodeResolve { column } => {
295                    let input_col = input
296                        .column(*column)
297                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
298
299                    let output_col = output
300                        .column_mut(i)
301                        .expect("column exists: index matches projection schema");
302
303                    let store = self.store.as_ref().ok_or_else(|| {
304                        OperatorError::Execution("Store required for node resolution".to_string())
305                    })?;
306
307                    let epoch = self.viewing_epoch;
308                    let tx_id = self.transaction_id;
309                    for row in input.selected_indices() {
310                        let value = if let Some(node_id) = input_col.get_node_id(row) {
311                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
312                                store.get_node_versioned(node_id, ep, tx)
313                            } else {
314                                store.get_node(node_id)
315                            };
316                            node.map_or(Value::Null, |n| node_to_map(&n))
317                        } else {
318                            Value::Null
319                        };
320                        output_col.push_value(value);
321                    }
322                }
323                ProjectExpr::EdgeResolve { column } => {
324                    let input_col = input
325                        .column(*column)
326                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
327
328                    let output_col = output
329                        .column_mut(i)
330                        .expect("column exists: index matches projection schema");
331
332                    let store = self.store.as_ref().ok_or_else(|| {
333                        OperatorError::Execution("Store required for edge resolution".to_string())
334                    })?;
335
336                    let epoch = self.viewing_epoch;
337                    let tx_id = self.transaction_id;
338                    for row in input.selected_indices() {
339                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
340                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
341                                store.get_edge_versioned(edge_id, ep, tx)
342                            } else {
343                                store.get_edge(edge_id)
344                            };
345                            edge.map_or(Value::Null, |e| edge_to_map(&e))
346                        } else {
347                            Value::Null
348                        };
349                        output_col.push_value(value);
350                    }
351                }
352            }
353        }
354
355        output.set_count(input.row_count());
356        Ok(Some(output))
357    }
358
359    fn reset(&mut self) {
360        self.child.reset();
361    }
362
363    fn name(&self) -> &'static str {
364        "Project"
365    }
366}
367
368/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
369///
370/// The map contains `_id` (integer), `_labels` (list of strings), and
371/// all node properties at the top level.
372fn node_to_map(node: &Node) -> Value {
373    let mut map = BTreeMap::new();
374    map.insert(
375        PropertyKey::new("_id"),
376        Value::Int64(node.id.as_u64() as i64),
377    );
378    let labels: Vec<Value> = node
379        .labels
380        .iter()
381        .map(|l| Value::String(l.clone()))
382        .collect();
383    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
384    for (key, value) in &node.properties {
385        map.insert(key.clone(), value.clone());
386    }
387    Value::Map(Arc::new(map))
388}
389
390/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
391///
392/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
393/// properties at the top level.
394fn edge_to_map(edge: &Edge) -> Value {
395    let mut map = BTreeMap::new();
396    map.insert(
397        PropertyKey::new("_id"),
398        Value::Int64(edge.id.as_u64() as i64),
399    );
400    map.insert(
401        PropertyKey::new("_type"),
402        Value::String(edge.edge_type.clone()),
403    );
404    map.insert(
405        PropertyKey::new("_source"),
406        Value::Int64(edge.src.as_u64() as i64),
407    );
408    map.insert(
409        PropertyKey::new("_target"),
410        Value::Int64(edge.dst.as_u64() as i64),
411    );
412    for (key, value) in &edge.properties {
413        map.insert(key.clone(), value.clone());
414    }
415    Value::Map(Arc::new(map))
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxed mock scan that yields exactly one chunk.
    fn scan_of(chunk: DataChunk) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        })
    }

    /// Boxed mock scan that yields nothing.
    fn empty_scan() -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks: Vec::new(),
            position: 0,
        })
    }

    /// Single-column Int64 chunk with one row per entry in `values`.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input has 3 columns: [int, string, int]
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (first, text, last) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(first);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(last);
            builder.advance_row();
        }

        // Select columns 2 and 0, i.e. reorder while dropping the string column.
        let mut project = ProjectOperator::select_columns(
            scan_of(builder.finish()),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let projected = project.next().unwrap().unwrap();

        assert_eq!(projected.column_count(), 2);
        assert_eq!(projected.row_count(), 2);

        // The first row must show the swapped order.
        assert_eq!(projected.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(projected.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        // Pass the input column through and append a constant column.
        let mut project = ProjectOperator::new(
            scan_of(int64_chunk(&[1, 2])),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let projected = project.next().unwrap().unwrap();

        assert_eq!(projected.column_count(), 2);
        let constants = projected.column(1).unwrap();
        assert_eq!(constants.get_string(0), Some("constant"));
        assert_eq!(constants.get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(empty_scan(), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // Column index 5 does not exist in a one-column chunk.
        let mut project = ProjectOperator::new(
            scan_of(int64_chunk(&[1])),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let outcome = project.next();
        assert!(outcome.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut project = ProjectOperator::new(
            scan_of(int64_chunk(&[1])),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let projected = project.next().unwrap().unwrap();
        assert_eq!(projected.column_count(), 3);
        assert_eq!(projected.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(projected.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            projected.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Every column is selected in its original position.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            scan_of(builder.finish()),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let projected = project.next().unwrap().unwrap();
        assert_eq!(projected.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(projected.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(empty_scan(), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store holding a single labelled node with two properties.
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // Input chunk with a NodeId column pointing at that node.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(builder.finish()),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let projected = project.next().unwrap().unwrap();
        assert_eq!(projected.column_count(), 1);

        let resolved = projected.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &resolved else {
            panic!("Expected Value::Map, got {:?}", resolved);
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        assert!(map.contains_key(&PropertyKey::new("_labels")));
        assert_eq!(
            map.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // Input chunk with an EdgeId column pointing at that edge.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(builder.finish()),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let projected = project.next().unwrap().unwrap();
        let resolved = projected.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &resolved else {
            panic!("Expected Value::Map, got {:?}", resolved);
        };
        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_type")),
            Some(&Value::String("WORKS_AT".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_source")),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_target")),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // The chunk references a NodeId the empty store has never seen.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_of(builder.finish()),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let projected = project.next().unwrap().unwrap();
        assert_eq!(projected.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}