Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStoreSearch;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12/// A projection expression.
13#[non_exhaustive]
14pub enum ProjectExpr {
15    /// Reference to an input column.
16    Column(usize),
17    /// A constant value.
18    Constant(Value),
19    /// Property access on a node/edge column.
20    PropertyAccess {
21        /// The column containing the node or edge ID.
22        column: usize,
23        /// The property name to access.
24        property: String,
25    },
26    /// Edge type accessor (for type(r) function).
27    EdgeType {
28        /// The column containing the edge ID.
29        column: usize,
30    },
31    /// Full expression evaluation (for CASE WHEN, etc.).
32    Expression {
33        /// The filter expression to evaluate.
34        expr: FilterExpression,
35        /// Variable name to column index mapping.
36        variable_columns: HashMap<String, usize>,
37    },
38    /// Resolve a node ID column to a full node map with metadata and properties.
39    NodeResolve {
40        /// The column containing the node ID.
41        column: usize,
42    },
43    /// Resolve an edge ID column to a full edge map with metadata and properties.
44    EdgeResolve {
45        /// The column containing the edge ID.
46        column: usize,
47    },
48    /// Returns the first non-null value from two columns (used for RIGHT/FULL join dedup).
49    Coalesce {
50        /// Primary column index.
51        first: usize,
52        /// Fallback column index.
53        second: usize,
54    },
55}
56
57/// A project operator that selects and transforms columns.
58pub struct ProjectOperator {
59    /// Child operator to read from.
60    child: Box<dyn Operator>,
61    /// Projection expressions.
62    projections: Vec<ProjectExpr>,
63    /// Output column types.
64    output_types: Vec<LogicalType>,
65    /// Optional store for property access.
66    store: Option<Arc<dyn GraphStoreSearch>>,
67    /// Transaction ID for MVCC-aware property lookups.
68    transaction_id: Option<TransactionId>,
69    /// Viewing epoch for MVCC-aware property lookups.
70    viewing_epoch: Option<EpochId>,
71    /// Session context for introspection functions in expression evaluation.
72    session_context: SessionContext,
73}
74
75impl ProjectOperator {
76    /// Creates a new project operator.
77    ///
78    /// # Panics
79    ///
80    /// Panics if `projections` and `output_types` have different lengths.
81    pub fn new(
82        child: Box<dyn Operator>,
83        projections: Vec<ProjectExpr>,
84        output_types: Vec<LogicalType>,
85    ) -> Self {
86        assert_eq!(projections.len(), output_types.len());
87        Self {
88            child,
89            projections,
90            output_types,
91            store: None,
92            transaction_id: None,
93            viewing_epoch: None,
94            session_context: SessionContext::default(),
95        }
96    }
97
98    /// Creates a new project operator with store access for property lookups.
99    ///
100    /// # Panics
101    ///
102    /// Panics if `projections` and `output_types` have different lengths.
103    pub fn with_store(
104        child: Box<dyn Operator>,
105        projections: Vec<ProjectExpr>,
106        output_types: Vec<LogicalType>,
107        store: Arc<dyn GraphStoreSearch>,
108    ) -> Self {
109        assert_eq!(projections.len(), output_types.len());
110        Self {
111            child,
112            projections,
113            output_types,
114            store: Some(store),
115            transaction_id: None,
116            viewing_epoch: None,
117            session_context: SessionContext::default(),
118        }
119    }
120
121    /// Sets the transaction context for MVCC-aware property lookups.
122    pub fn with_transaction_context(
123        mut self,
124        epoch: EpochId,
125        transaction_id: Option<TransactionId>,
126    ) -> Self {
127        self.viewing_epoch = Some(epoch);
128        self.transaction_id = transaction_id;
129        self
130    }
131
132    /// Sets the session context for introspection functions.
133    pub fn with_session_context(mut self, context: SessionContext) -> Self {
134        self.session_context = context;
135        self
136    }
137
138    /// Decomposes this operator into its child and projections for push-based conversion.
139    pub fn into_parts(self) -> (Box<dyn Operator>, Vec<ProjectExpr>, Vec<LogicalType>) {
140        (self.child, self.projections, self.output_types)
141    }
142
143    /// Creates a project operator that selects specific columns.
144    pub fn select_columns(
145        child: Box<dyn Operator>,
146        columns: Vec<usize>,
147        types: Vec<LogicalType>,
148    ) -> Self {
149        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
150        Self::new(child, projections, types)
151    }
152}
153
154impl Operator for ProjectOperator {
155    fn next(&mut self) -> OperatorResult {
156        // Get next chunk from child
157        let Some(input) = self.child.next()? else {
158            return Ok(None);
159        };
160
161        // Create output chunk
162        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
163
164        // Evaluate each projection
165        for (i, proj) in self.projections.iter().enumerate() {
166            match proj {
167                ProjectExpr::Column(col_idx) => {
168                    // Copy column from input to output
169                    let input_col = input.column(*col_idx).ok_or_else(|| {
170                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
171                    })?;
172
173                    let output_col = output
174                        .column_mut(i)
175                        .expect("column exists: index matches projection schema");
176
177                    // Copy selected rows
178                    for row in input.selected_indices() {
179                        if let Some(value) = input_col.get_value(row) {
180                            output_col.push_value(value);
181                        }
182                    }
183                }
184                ProjectExpr::Constant(value) => {
185                    // Push constant for each row
186                    let output_col = output
187                        .column_mut(i)
188                        .expect("column exists: index matches projection schema");
189                    for _ in input.selected_indices() {
190                        output_col.push_value(value.clone());
191                    }
192                }
193                ProjectExpr::PropertyAccess { column, property } => {
194                    // Access property from node/edge in the specified column
195                    let input_col = input
196                        .column(*column)
197                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
198
199                    let output_col = output
200                        .column_mut(i)
201                        .expect("column exists: index matches projection schema");
202
203                    let store = self.store.as_ref().ok_or_else(|| {
204                        OperatorError::Execution("Store required for property access".to_string())
205                    })?;
206
207                    // Extract property for each row.
208                    // For typed columns (VectorData::NodeId / EdgeId) there is
209                    // no ambiguity. For Generic/Any columns (e.g. after a hash
210                    // join), both get_node_id and get_edge_id can succeed on the
211                    // same Int64 value, so we verify against the store to resolve
212                    // the entity type.
213                    let prop_key = PropertyKey::new(property);
214                    let epoch = self.viewing_epoch;
215                    let tx_id = self.transaction_id;
216                    for row in input.selected_indices() {
217                        let value = if let Some(node_id) = input_col.get_node_id(row) {
218                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
219                                store.get_node_versioned(node_id, ep, tx)
220                            } else if let Some(ep) = epoch {
221                                store.get_node_at_epoch(node_id, ep)
222                            } else {
223                                store.get_node(node_id)
224                            };
225                            if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
226                            {
227                                prop
228                            } else if let Some(edge_id) = input_col.get_edge_id(row) {
229                                // Node lookup failed: the ID may belong to an
230                                // edge (common with Generic columns after joins).
231                                let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
232                                    store.get_edge_versioned(edge_id, ep, tx)
233                                } else if let Some(ep) = epoch {
234                                    store.get_edge_at_epoch(edge_id, ep)
235                                } else {
236                                    store.get_edge(edge_id)
237                                };
238                                edge.and_then(|e| e.get_property(property).cloned())
239                                    .unwrap_or(Value::Null)
240                            } else {
241                                Value::Null
242                            }
243                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
244                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
245                                store.get_edge_versioned(edge_id, ep, tx)
246                            } else if let Some(ep) = epoch {
247                                store.get_edge_at_epoch(edge_id, ep)
248                            } else {
249                                store.get_edge(edge_id)
250                            };
251                            edge.and_then(|e| e.get_property(property).cloned())
252                                .unwrap_or(Value::Null)
253                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
254                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
255                        } else {
256                            Value::Null
257                        };
258                        output_col.push_value(value);
259                    }
260                }
261                ProjectExpr::EdgeType { column } => {
262                    // Get edge type string from an edge column
263                    let input_col = input
264                        .column(*column)
265                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
266
267                    let output_col = output
268                        .column_mut(i)
269                        .expect("column exists: index matches projection schema");
270
271                    let store = self.store.as_ref().ok_or_else(|| {
272                        OperatorError::Execution("Store required for edge type access".to_string())
273                    })?;
274
275                    let epoch = self.viewing_epoch;
276                    let tx_id = self.transaction_id;
277                    for row in input.selected_indices() {
278                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
279                            let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
280                                store.edge_type_versioned(edge_id, ep, tx)
281                            } else {
282                                store.edge_type(edge_id)
283                            };
284                            etype.map_or(Value::Null, Value::String)
285                        } else {
286                            Value::Null
287                        };
288                        output_col.push_value(value);
289                    }
290                }
291                ProjectExpr::Expression {
292                    expr,
293                    variable_columns,
294                } => {
295                    let output_col = output
296                        .column_mut(i)
297                        .expect("column exists: index matches projection schema");
298
299                    let store = self.store.as_ref().ok_or_else(|| {
300                        OperatorError::Execution(
301                            "Store required for expression evaluation".to_string(),
302                        )
303                    })?;
304
305                    // Use the ExpressionPredicate for expression evaluation
306                    let mut evaluator = ExpressionPredicate::new(
307                        expr.clone(),
308                        variable_columns.clone(),
309                        Arc::clone(store),
310                    )
311                    .with_session_context(self.session_context.clone());
312                    if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
313                        evaluator = evaluator.with_transaction_context(ep, tx_id);
314                    }
315
316                    for row in input.selected_indices() {
317                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
318                        output_col.push_value(value);
319                    }
320                }
321                ProjectExpr::NodeResolve { column } => {
322                    let input_col = input
323                        .column(*column)
324                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
325
326                    let output_col = output
327                        .column_mut(i)
328                        .expect("column exists: index matches projection schema");
329
330                    let store = self.store.as_ref().ok_or_else(|| {
331                        OperatorError::Execution("Store required for node resolution".to_string())
332                    })?;
333
334                    let epoch = self.viewing_epoch;
335                    let tx_id = self.transaction_id;
336                    for row in input.selected_indices() {
337                        let value = if let Some(node_id) = input_col.get_node_id(row) {
338                            let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
339                                store.get_node_versioned(node_id, ep, tx)
340                            } else if let Some(ep) = epoch {
341                                store.get_node_at_epoch(node_id, ep)
342                            } else {
343                                store.get_node(node_id)
344                            };
345                            node.map_or(Value::Null, |n| node_to_map(&n))
346                        } else {
347                            Value::Null
348                        };
349                        output_col.push_value(value);
350                    }
351                }
352                ProjectExpr::EdgeResolve { column } => {
353                    let input_col = input
354                        .column(*column)
355                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
356
357                    let output_col = output
358                        .column_mut(i)
359                        .expect("column exists: index matches projection schema");
360
361                    let store = self.store.as_ref().ok_or_else(|| {
362                        OperatorError::Execution("Store required for edge resolution".to_string())
363                    })?;
364
365                    let epoch = self.viewing_epoch;
366                    let tx_id = self.transaction_id;
367                    for row in input.selected_indices() {
368                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
369                            let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
370                                store.get_edge_versioned(edge_id, ep, tx)
371                            } else if let Some(ep) = epoch {
372                                store.get_edge_at_epoch(edge_id, ep)
373                            } else {
374                                store.get_edge(edge_id)
375                            };
376                            edge.map_or(Value::Null, |e| edge_to_map(&e))
377                        } else {
378                            Value::Null
379                        };
380                        output_col.push_value(value);
381                    }
382                }
383                ProjectExpr::Coalesce { first, second } => {
384                    let first_col = input
385                        .column(*first)
386                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
387                    let second_col = input
388                        .column(*second)
389                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
390
391                    let output_col = output
392                        .column_mut(i)
393                        .expect("column exists: index matches projection schema");
394
395                    for row in input.selected_indices() {
396                        let value = match first_col.get_value(row) {
397                            Some(Value::Null) | None => {
398                                second_col.get_value(row).unwrap_or(Value::Null)
399                            }
400                            Some(v) => v,
401                        };
402                        output_col.push_value(value);
403                    }
404                }
405            }
406        }
407
408        output.set_count(input.row_count());
409        Ok(Some(output))
410    }
411
412    fn reset(&mut self) {
413        self.child.reset();
414    }
415
416    fn name(&self) -> &'static str {
417        "Project"
418    }
419
420    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
421        self
422    }
423}
424
425/// Converts a [`Node`] to a `Value::Map` with metadata and properties.
426///
427/// The map contains `_id` (integer), `_labels` (list of strings), and
428/// all node properties at the top level.
429fn node_to_map(node: &Node) -> Value {
430    let mut map = BTreeMap::new();
431    // reason: entity IDs stored as i64, standard encoding
432    #[allow(clippy::cast_possible_wrap)]
433    let node_id_i64 = node.id.as_u64() as i64;
434    map.insert(PropertyKey::new("_id"), Value::Int64(node_id_i64));
435    let labels: Vec<Value> = node
436        .labels
437        .iter()
438        .map(|l| Value::String(l.clone()))
439        .collect();
440    map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
441    for (key, value) in &node.properties {
442        map.insert(key.clone(), value.clone());
443    }
444    Value::Map(Arc::new(map))
445}
446
447/// Converts an [`Edge`] to a `Value::Map` with metadata and properties.
448///
449/// The map contains `_id`, `_type`, `_source`, `_target`, and all edge
450/// properties at the top level.
451fn edge_to_map(edge: &Edge) -> Value {
452    let mut map = BTreeMap::new();
453    // reason: entity IDs stored as i64, standard encoding
454    #[allow(clippy::cast_possible_wrap)]
455    let edge_id_i64 = edge.id.as_u64() as i64;
456    // reason: entity IDs stored as i64, standard encoding
457    #[allow(clippy::cast_possible_wrap)]
458    let src_id_i64 = edge.src.as_u64() as i64;
459    // reason: entity IDs stored as i64, standard encoding
460    #[allow(clippy::cast_possible_wrap)]
461    let dst_id_i64 = edge.dst.as_u64() as i64;
462    map.insert(PropertyKey::new("_id"), Value::Int64(edge_id_i64));
463    map.insert(
464        PropertyKey::new("_type"),
465        Value::String(edge.edge_type.clone()),
466    );
467    map.insert(PropertyKey::new("_source"), Value::Int64(src_id_i64));
468    map.insert(PropertyKey::new("_target"), Value::Int64(dst_id_i64));
469    for (key, value) in &edge.properties {
470        map.insert(key.clone(), value.clone());
471    }
472    Value::Map(Arc::new(map))
473}
474
475#[cfg(all(test, feature = "lpg"))]
476mod tests {
477    use super::*;
478    use crate::execution::chunk::DataChunkBuilder;
479    use crate::graph::lpg::LpgStore;
480    use grafeo_common::types::Value;
481
482    struct MockScanOperator {
483        chunks: Vec<DataChunk>,
484        position: usize,
485    }
486
487    impl Operator for MockScanOperator {
488        fn next(&mut self) -> OperatorResult {
489            if self.position < self.chunks.len() {
490                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
491                self.position += 1;
492                Ok(Some(chunk))
493            } else {
494                Ok(None)
495            }
496        }
497
498        fn reset(&mut self) {
499            self.position = 0;
500        }
501
502        fn name(&self) -> &'static str {
503            "MockScan"
504        }
505
506        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
507            self
508        }
509    }
510
511    #[test]
512    fn test_project_select_columns() {
513        // Create input with 3 columns: [int, string, int]
514        let mut builder =
515            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
516
517        builder.column_mut(0).unwrap().push_int64(1);
518        builder.column_mut(1).unwrap().push_string("hello");
519        builder.column_mut(2).unwrap().push_int64(100);
520        builder.advance_row();
521
522        builder.column_mut(0).unwrap().push_int64(2);
523        builder.column_mut(1).unwrap().push_string("world");
524        builder.column_mut(2).unwrap().push_int64(200);
525        builder.advance_row();
526
527        let chunk = builder.finish();
528
529        let mock_scan = MockScanOperator {
530            chunks: vec![chunk],
531            position: 0,
532        };
533
534        // Project to select columns 2 and 0 (reordering)
535        let mut project = ProjectOperator::select_columns(
536            Box::new(mock_scan),
537            vec![2, 0],
538            vec![LogicalType::Int64, LogicalType::Int64],
539        );
540
541        let result = project.next().unwrap().unwrap();
542
543        assert_eq!(result.column_count(), 2);
544        assert_eq!(result.row_count(), 2);
545
546        // Check values are reordered
547        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
548        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
549    }
550
551    #[test]
552    fn test_project_constant() {
553        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
554        builder.column_mut(0).unwrap().push_int64(1);
555        builder.advance_row();
556        builder.column_mut(0).unwrap().push_int64(2);
557        builder.advance_row();
558
559        let chunk = builder.finish();
560
561        let mock_scan = MockScanOperator {
562            chunks: vec![chunk],
563            position: 0,
564        };
565
566        // Project with a constant
567        let mut project = ProjectOperator::new(
568            Box::new(mock_scan),
569            vec![
570                ProjectExpr::Column(0),
571                ProjectExpr::Constant(Value::String("constant".into())),
572            ],
573            vec![LogicalType::Int64, LogicalType::String],
574        );
575
576        let result = project.next().unwrap().unwrap();
577
578        assert_eq!(result.column_count(), 2);
579        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
580        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
581    }
582
583    #[test]
584    fn test_project_empty_input() {
585        let mock_scan = MockScanOperator {
586            chunks: vec![],
587            position: 0,
588        };
589
590        let mut project =
591            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
592
593        assert!(project.next().unwrap().is_none());
594    }
595
596    #[test]
597    fn test_project_column_not_found() {
598        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
599        builder.column_mut(0).unwrap().push_int64(1);
600        builder.advance_row();
601        let chunk = builder.finish();
602
603        let mock_scan = MockScanOperator {
604            chunks: vec![chunk],
605            position: 0,
606        };
607
608        // Reference column index 5 which doesn't exist
609        let mut project = ProjectOperator::new(
610            Box::new(mock_scan),
611            vec![ProjectExpr::Column(5)],
612            vec![LogicalType::Int64],
613        );
614
615        let result = project.next();
616        assert!(result.is_err(), "Should fail with ColumnNotFound");
617    }
618
619    #[test]
620    fn test_project_multiple_constants() {
621        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
622        builder.column_mut(0).unwrap().push_int64(1);
623        builder.advance_row();
624        let chunk = builder.finish();
625
626        let mock_scan = MockScanOperator {
627            chunks: vec![chunk],
628            position: 0,
629        };
630
631        let mut project = ProjectOperator::new(
632            Box::new(mock_scan),
633            vec![
634                ProjectExpr::Constant(Value::Int64(42)),
635                ProjectExpr::Constant(Value::String("fixed".into())),
636                ProjectExpr::Constant(Value::Bool(true)),
637            ],
638            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
639        );
640
641        let result = project.next().unwrap().unwrap();
642        assert_eq!(result.column_count(), 3);
643        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
644        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
645        assert_eq!(
646            result.column(2).unwrap().get_value(0),
647            Some(Value::Bool(true))
648        );
649    }
650
651    #[test]
652    fn test_project_identity() {
653        // Select all columns in original order
654        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
655        builder.column_mut(0).unwrap().push_int64(10);
656        builder.column_mut(1).unwrap().push_string("test");
657        builder.advance_row();
658        let chunk = builder.finish();
659
660        let mock_scan = MockScanOperator {
661            chunks: vec![chunk],
662            position: 0,
663        };
664
665        let mut project = ProjectOperator::select_columns(
666            Box::new(mock_scan),
667            vec![0, 1],
668            vec![LogicalType::Int64, LogicalType::String],
669        );
670
671        let result = project.next().unwrap().unwrap();
672        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
673        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
674    }
675
676    #[test]
677    fn test_project_name() {
678        let mock_scan = MockScanOperator {
679            chunks: vec![],
680            position: 0,
681        };
682        let project =
683            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
684        assert_eq!(project.name(), "Project");
685    }
686
687    #[test]
688    // reason: test IDs are small sequential counters
689    #[allow(clippy::cast_possible_wrap)]
690    fn test_project_node_resolve() {
691        // Create a store with a test node
692        let store = LpgStore::new().unwrap();
693        let node_id = store.create_node(&["Person"]);
694        store.set_node_property(node_id, "name", Value::String("Alix".into()));
695        store.set_node_property(node_id, "age", Value::Int64(30));
696
697        // Create input chunk with a NodeId column
698        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
699        builder.column_mut(0).unwrap().push_node_id(node_id);
700        builder.advance_row();
701        let chunk = builder.finish();
702
703        let mock_scan = MockScanOperator {
704            chunks: vec![chunk],
705            position: 0,
706        };
707
708        let mut project = ProjectOperator::with_store(
709            Box::new(mock_scan),
710            vec![ProjectExpr::NodeResolve { column: 0 }],
711            vec![LogicalType::Any],
712            Arc::new(store),
713        );
714
715        let result = project.next().unwrap().unwrap();
716        assert_eq!(result.column_count(), 1);
717
718        let value = result.column(0).unwrap().get_value(0).unwrap();
719        if let Value::Map(map) = value {
720            assert_eq!(
721                map.get(&PropertyKey::new("_id")),
722                Some(&Value::Int64(node_id.as_u64() as i64))
723            );
724            assert!(map.get(&PropertyKey::new("_labels")).is_some());
725            assert_eq!(
726                map.get(&PropertyKey::new("name")),
727                Some(&Value::String("Alix".into()))
728            );
729            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
730        } else {
731            panic!("Expected Value::Map, got {:?}", value);
732        }
733    }
734
735    #[test]
736    // reason: test IDs are small sequential counters
737    #[allow(clippy::cast_possible_wrap)]
738    fn test_project_edge_resolve() {
739        let store = LpgStore::new().unwrap();
740        let src = store.create_node(&["Person"]);
741        let dst = store.create_node(&["Company"]);
742        let edge_id = store.create_edge(src, dst, "WORKS_AT");
743        store.set_edge_property(edge_id, "since", Value::Int64(2020));
744
745        // Create input chunk with an EdgeId column
746        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
747        builder.column_mut(0).unwrap().push_edge_id(edge_id);
748        builder.advance_row();
749        let chunk = builder.finish();
750
751        let mock_scan = MockScanOperator {
752            chunks: vec![chunk],
753            position: 0,
754        };
755
756        let mut project = ProjectOperator::with_store(
757            Box::new(mock_scan),
758            vec![ProjectExpr::EdgeResolve { column: 0 }],
759            vec![LogicalType::Any],
760            Arc::new(store),
761        );
762
763        let result = project.next().unwrap().unwrap();
764        let value = result.column(0).unwrap().get_value(0).unwrap();
765        if let Value::Map(map) = value {
766            assert_eq!(
767                map.get(&PropertyKey::new("_id")),
768                Some(&Value::Int64(edge_id.as_u64() as i64))
769            );
770            assert_eq!(
771                map.get(&PropertyKey::new("_type")),
772                Some(&Value::String("WORKS_AT".into()))
773            );
774            assert_eq!(
775                map.get(&PropertyKey::new("_source")),
776                Some(&Value::Int64(src.as_u64() as i64))
777            );
778            assert_eq!(
779                map.get(&PropertyKey::new("_target")),
780                Some(&Value::Int64(dst.as_u64() as i64))
781            );
782            assert_eq!(
783                map.get(&PropertyKey::new("since")),
784                Some(&Value::Int64(2020))
785            );
786        } else {
787            panic!("Expected Value::Map, got {:?}", value);
788        }
789    }
790
791    #[test]
792    fn test_project_resolve_missing_entity() {
793        use grafeo_common::types::NodeId;
794
795        let store = LpgStore::new().unwrap();
796
797        // Create input chunk with a NodeId that doesn't exist in the store
798        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
799        builder
800            .column_mut(0)
801            .unwrap()
802            .push_node_id(NodeId::new(999));
803        builder.advance_row();
804        let chunk = builder.finish();
805
806        let mock_scan = MockScanOperator {
807            chunks: vec![chunk],
808            position: 0,
809        };
810
811        let mut project = ProjectOperator::with_store(
812            Box::new(mock_scan),
813            vec![ProjectExpr::NodeResolve { column: 0 }],
814            vec![LogicalType::Any],
815            Arc::new(store),
816        );
817
818        let result = project.next().unwrap().unwrap();
819        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
820    }
821
822    #[test]
823    fn test_project_into_any() {
824        let mock = MockScanOperator {
825            chunks: vec![],
826            position: 0,
827        };
828        let op = ProjectOperator::select_columns(Box::new(mock), vec![0], vec![LogicalType::Int64]);
829        let any = Box::new(op).into_any();
830        assert!(any.downcast::<ProjectOperator>().is_ok());
831    }
832
833    #[test]
834    fn test_project_into_parts() {
835        let mock = MockScanOperator {
836            chunks: vec![],
837            position: 0,
838        };
839        let op = ProjectOperator::new(
840            Box::new(mock),
841            vec![
842                ProjectExpr::Column(0),
843                ProjectExpr::Constant(Value::Int64(1)),
844            ],
845            vec![LogicalType::Int64, LogicalType::Int64],
846        );
847        let (child, projections, output_types) = op.into_parts();
848        assert_eq!(projections.len(), 2);
849        assert_eq!(output_types.len(), 2);
850        // Verify child is still functional
851        let mut child = child;
852        assert!(child.next().unwrap().is_none());
853    }
854}