Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13pub enum ProjectExpr {
14    /// Reference to an input column.
15    Column(usize),
16    /// A constant value.
17    Constant(Value),
18    /// Property access on a node/edge column.
19    PropertyAccess {
20        /// The column containing the node or edge ID.
21        column: usize,
22        /// The property name to access.
23        property: String,
24    },
25    /// Edge type accessor (for type(r) function).
26    EdgeType {
27        /// The column containing the edge ID.
28        column: usize,
29    },
30    /// Full expression evaluation (for CASE WHEN, etc.).
31    Expression {
32        /// The filter expression to evaluate.
33        expr: FilterExpression,
34        /// Variable name to column index mapping.
35        variable_columns: HashMap<String, usize>,
36    },
37    /// Resolve a node ID column to a full node map with metadata and properties.
38    NodeResolve {
39        /// The column containing the node ID.
40        column: usize,
41    },
42    /// Resolve an edge ID column to a full edge map with metadata and properties.
43    EdgeResolve {
44        /// The column containing the edge ID.
45        column: usize,
46    },
47}
48
49/// A project operator that selects and transforms columns.
50pub struct ProjectOperator {
51    /// Child operator to read from.
52    child: Box<dyn Operator>,
53    /// Projection expressions.
54    projections: Vec<ProjectExpr>,
55    /// Output column types.
56    output_types: Vec<LogicalType>,
57    /// Optional store for property access.
58    store: Option<Arc<dyn GraphStore>>,
59    /// Transaction ID for MVCC-aware property lookups.
60    transaction_id: Option<TransactionId>,
61    /// Viewing epoch for MVCC-aware property lookups.
62    viewing_epoch: Option<EpochId>,
63}
64
65impl ProjectOperator {
66    /// Creates a new project operator.
67    pub fn new(
68        child: Box<dyn Operator>,
69        projections: Vec<ProjectExpr>,
70        output_types: Vec<LogicalType>,
71    ) -> Self {
72        assert_eq!(projections.len(), output_types.len());
73        Self {
74            child,
75            projections,
76            output_types,
77            store: None,
78            transaction_id: None,
79            viewing_epoch: None,
80        }
81    }
82
83    /// Creates a new project operator with store access for property lookups.
84    pub fn with_store(
85        child: Box<dyn Operator>,
86        projections: Vec<ProjectExpr>,
87        output_types: Vec<LogicalType>,
88        store: Arc<dyn GraphStore>,
89    ) -> Self {
90        assert_eq!(projections.len(), output_types.len());
91        Self {
92            child,
93            projections,
94            output_types,
95            store: Some(store),
96            transaction_id: None,
97            viewing_epoch: None,
98        }
99    }
100
101    /// Sets the transaction context for MVCC-aware property lookups.
102    pub fn with_transaction_context(
103        mut self,
104        epoch: EpochId,
105        transaction_id: Option<TransactionId>,
106    ) -> Self {
107        self.viewing_epoch = Some(epoch);
108        self.transaction_id = transaction_id;
109        self
110    }
111
112    /// Creates a project operator that selects specific columns.
113    pub fn select_columns(
114        child: Box<dyn Operator>,
115        columns: Vec<usize>,
116        types: Vec<LogicalType>,
117    ) -> Self {
118        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
119        Self::new(child, projections, types)
120    }
121}
122
123impl Operator for ProjectOperator {
124    fn next(&mut self) -> OperatorResult {
125        // Get next chunk from child
126        let Some(input) = self.child.next()? else {
127            return Ok(None);
128        };
129
130        // Create output chunk
131        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
132
133        // Evaluate each projection
134        for (i, proj) in self.projections.iter().enumerate() {
135            match proj {
136                ProjectExpr::Column(col_idx) => {
137                    // Copy column from input to output
138                    let input_col = input.column(*col_idx).ok_or_else(|| {
139                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
140                    })?;
141
142                    let output_col = output
143                        .column_mut(i)
144                        .expect("column exists: index matches projection schema");
145
146                    // Copy selected rows
147                    for row in input.selected_indices() {
148                        if let Some(value) = input_col.get_value(row) {
149                            output_col.push_value(value);
150                        }
151                    }
152                }
153                ProjectExpr::Constant(value) => {
154                    // Push constant for each row
155                    let output_col = output
156                        .column_mut(i)
157                        .expect("column exists: index matches projection schema");
158                    for _ in input.selected_indices() {
159                        output_col.push_value(value.clone());
160                    }
161                }
162                ProjectExpr::PropertyAccess { column, property } => {
163                    // Access property from node/edge in the specified column
164                    let input_col = input
165                        .column(*column)
166                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
167
168                    let output_col = output
169                        .column_mut(i)
170                        .expect("column exists: index matches projection schema");
171
172                    let store = self.store.as_ref().ok_or_else(|| {
173                        OperatorError::Execution("Store required for property access".to_string())
174                    })?;
175
176                    // Extract property for each row.
177                    // For typed columns (VectorData::NodeId / EdgeId) there is
178                    // no ambiguity. For Generic/Any columns (e.g. after a hash
179                    // join), both get_node_id and get_edge_id can succeed on the
180                    // same Int64 value, so we verify against the store to resolve
181                    // the entity type.
182                    let prop_key = PropertyKey::new(property);
183                    let epoch = self.viewing_epoch;
184                    let tx_id = self.transaction_id;
185                    for row in input.selected_indices() {
186                        let value = if let Some(node_id) = input_col.get_node_id(row) {
187                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
188                                store.get_node_versioned(node_id, ep, tx)
189                            } else {
190                                store.get_node(node_id)
191                            };
192                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
193                            {
194                                prop
195                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
196                                // Node lookup failed: the ID may belong to an
197                                // edge (common with Generic columns after joins).
198                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
199                                    store.get_edge_versioned(edge_id, ep, tx)
200                                } else {
201                                    store.get_edge(edge_id)
202                                };
203                                edge.and_then(|e| e.get_property(property).cloned())
204                                    .unwrap_or(Value::Null)
205                            } else {
206                                Value::Null
207                            }
208                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
209                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
210                                store.get_edge_versioned(edge_id, ep, tx)
211                            } else {
212                                store.get_edge(edge_id)
213                            };
214                            edge.and_then(|e| e.get_property(property).cloned())
215                                .unwrap_or(Value::Null)
216                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
217                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
218                        } else {
219                            Value::Null
220                        };
221                        output_col.push_value(value);
222                    }
223                }
224                ProjectExpr::EdgeType { column } => {
225                    // Get edge type string from an edge column
226                    let input_col = input
227                        .column(*column)
228                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
229
230                    let output_col = output
231                        .column_mut(i)
232                        .expect("column exists: index matches projection schema");
233
234                    let store = self.store.as_ref().ok_or_else(|| {
235                        OperatorError::Execution("Store required for edge type access".to_string())
236                    })?;
237
238                    let epoch = self.viewing_epoch;
239                    let tx_id = self.transaction_id;
240                    for row in input.selected_indices() {
241                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
242                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
243                                store.edge_type_versioned(edge_id, ep, tx)
244                            } else {
245                                store.edge_type(edge_id)
246                            };
247                            etype.map_or(Value::Null, Value::String)
248                        } else {
249                            Value::Null
250                        };
251                        output_col.push_value(value);
252                    }
253                }
254                ProjectExpr::Expression {
255                    expr,
256                    variable_columns,
257                } => {
258                    let output_col = output
259                        .column_mut(i)
260                        .expect("column exists: index matches projection schema");
261
262                    let store = self.store.as_ref().ok_or_else(|| {
263                        OperatorError::Execution(
264                            "Store required for expression evaluation".to_string(),
265                        )
266                    })?;
267
268                    // Use the ExpressionPredicate for expression evaluation
269                    let mut evaluator = ExpressionPredicate::new(
270                        expr.clone(),
271                        variable_columns.clone(),
272                        Arc::clone(store),
273                    );
274                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
275                        evaluator = evaluator.with_transaction_context(ep, tx_id);
276                    }
277
278                    for row in input.selected_indices() {
279                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
280                        output_col.push_value(value);
281                    }
282                }
283                ProjectExpr::NodeResolve { column } => {
284                    let input_col = input
285                        .column(*column)
286                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
287
288                    let output_col = output
289                        .column_mut(i)
290                        .expect("column exists: index matches projection schema");
291
292                    let store = self.store.as_ref().ok_or_else(|| {
293                        OperatorError::Execution("Store required for node resolution".to_string())
294                    })?;
295
296                    let epoch = self.viewing_epoch;
297                    let tx_id = self.transaction_id;
298                    for row in input.selected_indices() {
299                        let value = if let Some(node_id) = input_col.get_node_id(row) {
300                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
301                                store.get_node_versioned(node_id, ep, tx)
302                            } else {
303                                store.get_node(node_id)
304                            };
305                            node.map_or(Value::Null, |n| node_to_map(&n))
306                        } else {
307                            Value::Null
308                        };
309                        output_col.push_value(value);
310                    }
311                }
312                ProjectExpr::EdgeResolve { column } => {
313                    let input_col = input
314                        .column(*column)
315                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
316
317                    let output_col = output
318                        .column_mut(i)
319                        .expect("column exists: index matches projection schema");
320
321                    let store = self.store.as_ref().ok_or_else(|| {
322                        OperatorError::Execution("Store required for edge resolution".to_string())
323                    })?;
324
325                    let epoch = self.viewing_epoch;
326                    let tx_id = self.transaction_id;
327                    for row in input.selected_indices() {
328                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
329                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
330                                store.get_edge_versioned(edge_id, ep, tx)
331                            } else {
332                                store.get_edge(edge_id)
333                            };
334                            edge.map_or(Value::Null, |e| edge_to_map(&e))
335                        } else {
336                            Value::Null
337                        };
338                        output_col.push_value(value);
339                    }
340                }
341            }
342        }
343
344        output.set_count(input.row_count());
345        Ok(Some(output))
346    }
347
348    fn reset(&mut self) {
349        self.child.reset();
350    }
351
352    fn name(&self) -> &'static str {
353        "Project"
354    }
355}
356
357/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
358///
359/// The map contains `_id` (integer), `_labels` (list of strings), and
360/// all node properties at the top level.
361fn node_to_map(node: &Node) -> Value {
362    let mut map = BTreeMap::new();
363    map.insert(
364        PropertyKey::new("_id"),
365        Value::Int64(node.id.as_u64() as i64),
366    );
367    let labels: Vec<Value> = node
368        .labels
369        .iter()
370        .map(|l| Value::String(l.clone()))
371        .collect();
372    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
373    for (key, value) in &node.properties {
374        map.insert(key.clone(), value.clone());
375    }
376    Value::Map(Arc::new(map))
377}
378
379/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
380///
381/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
382/// properties at the top level.
383fn edge_to_map(edge: &Edge) -> Value {
384    let mut map = BTreeMap::new();
385    map.insert(
386        PropertyKey::new("_id"),
387        Value::Int64(edge.id.as_u64() as i64),
388    );
389    map.insert(
390        PropertyKey::new("_type"),
391        Value::String(edge.edge_type.clone()),
392    );
393    map.insert(
394        PropertyKey::new("_source"),
395        Value::Int64(edge.src.as_u64() as i64),
396    );
397    map.insert(
398        PropertyKey::new("_target"),
399        Value::Int64(edge.dst.as_u64() as i64),
400    );
401    for (key, value) in &edge.properties {
402        map.insert(key.clone(), value.clone());
403    }
404    Value::Map(Arc::new(map))
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next()` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockScanOperator {
        /// Wraps `chunks` in a boxed scan positioned at the first chunk.
        fn boxed(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
            Box::new(Self {
                chunks,
                position: 0,
            })
        }
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Builds a chunk with a single Int64 column holding `values`, one per row.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Input layout: [int, string, int], two rows.
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
        for (first, text, last) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(first);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(last);
            builder.advance_row();
        }
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        // Pick columns 2 and 0, i.e. drop the string and swap the ints.
        let mut project = ProjectOperator::select_columns(
            scan,
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // The first output column must come from input column 2.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1, 2])]);

        // Pass column 0 through and append a constant string column.
        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let scan = MockScanOperator::boxed(vec![]);

        let mut project = ProjectOperator::select_columns(scan, vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        // Column index 5 is out of range for a one-column chunk.
        let mut project = ProjectOperator::new(
            scan,
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let scan = MockScanOperator::boxed(vec![int64_chunk(&[1])]);

        let mut project = ProjectOperator::new(
            scan,
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        // Every column selected, in its original position.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::select_columns(
            scan,
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let scan = MockScanOperator::boxed(vec![]);
        let project = ProjectOperator::select_columns(scan, vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        // Store containing a single labelled node with two properties.
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        // One-row input whose only column carries that node's ID.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        match &value {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(node_id.as_u64() as i64))
                );
                assert!(map.get(&PropertyKey::new("_labels")).is_some());
                assert_eq!(
                    map.get(&PropertyKey::new("name")),
                    Some(&Value::String("Alix".into()))
                );
                assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        // One-row input whose only column carries that edge's ID.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        match &value {
            Value::Map(map) => {
                assert_eq!(
                    map.get(&PropertyKey::new("_id")),
                    Some(&Value::Int64(edge_id.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_type")),
                    Some(&Value::String("WORKS_AT".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_source")),
                    Some(&Value::Int64(src.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("_target")),
                    Some(&Value::Int64(dst.as_u64() as i64))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("since")),
                    Some(&Value::Int64(2020))
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        // The store is empty, so node 999 cannot be resolved.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let scan = MockScanOperator::boxed(vec![builder.finish()]);

        let mut project = ProjectOperator::with_store(
            scan,
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}