Skip to main content

grafeo_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::lpg::LpgStore;
6use grafeo_common::types::{LogicalType, Value};
7use std::sync::Arc;
8
9/// A projection expression.
10pub enum ProjectExpr {
11    /// Reference to an input column.
12    Column(usize),
13    /// A constant value.
14    Constant(Value),
15    /// Property access on a node/edge column.
16    PropertyAccess {
17        /// The column containing the node or edge ID.
18        column: usize,
19        /// The property name to access.
20        property: String,
21    },
22}
23
24/// A project operator that selects and transforms columns.
25pub struct ProjectOperator {
26    /// Child operator to read from.
27    child: Box<dyn Operator>,
28    /// Projection expressions.
29    projections: Vec<ProjectExpr>,
30    /// Output column types.
31    output_types: Vec<LogicalType>,
32    /// Optional store for property access.
33    store: Option<Arc<LpgStore>>,
34}
35
36impl ProjectOperator {
37    /// Creates a new project operator.
38    pub fn new(
39        child: Box<dyn Operator>,
40        projections: Vec<ProjectExpr>,
41        output_types: Vec<LogicalType>,
42    ) -> Self {
43        assert_eq!(projections.len(), output_types.len());
44        Self {
45            child,
46            projections,
47            output_types,
48            store: None,
49        }
50    }
51
52    /// Creates a new project operator with store access for property lookups.
53    pub fn with_store(
54        child: Box<dyn Operator>,
55        projections: Vec<ProjectExpr>,
56        output_types: Vec<LogicalType>,
57        store: Arc<LpgStore>,
58    ) -> Self {
59        assert_eq!(projections.len(), output_types.len());
60        Self {
61            child,
62            projections,
63            output_types,
64            store: Some(store),
65        }
66    }
67
68    /// Creates a project operator that selects specific columns.
69    pub fn select_columns(
70        child: Box<dyn Operator>,
71        columns: Vec<usize>,
72        types: Vec<LogicalType>,
73    ) -> Self {
74        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
75        Self::new(child, projections, types)
76    }
77}
78
79impl Operator for ProjectOperator {
80    fn next(&mut self) -> OperatorResult {
81        // Get next chunk from child
82        let input = match self.child.next()? {
83            Some(c) => c,
84            None => return Ok(None),
85        };
86
87        // Create output chunk
88        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
89
90        // Evaluate each projection
91        for (i, proj) in self.projections.iter().enumerate() {
92            match proj {
93                ProjectExpr::Column(col_idx) => {
94                    // Copy column from input to output
95                    let input_col = input.column(*col_idx).ok_or_else(|| {
96                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
97                    })?;
98
99                    let output_col = output.column_mut(i).unwrap();
100
101                    // Copy selected rows
102                    for row in input.selected_indices() {
103                        if let Some(value) = input_col.get_value(row) {
104                            output_col.push_value(value);
105                        }
106                    }
107                }
108                ProjectExpr::Constant(value) => {
109                    // Push constant for each row
110                    let output_col = output.column_mut(i).unwrap();
111                    for _ in input.selected_indices() {
112                        output_col.push_value(value.clone());
113                    }
114                }
115                ProjectExpr::PropertyAccess { column, property } => {
116                    // Access property from node/edge in the specified column
117                    let input_col = input
118                        .column(*column)
119                        .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
120
121                    let output_col = output.column_mut(i).unwrap();
122
123                    let store = self.store.as_ref().ok_or_else(|| {
124                        OperatorError::Execution("Store required for property access".to_string())
125                    })?;
126
127                    // Extract property for each row
128                    for row in input.selected_indices() {
129                        // Try to get node ID first, then edge ID
130                        let value = if let Some(node_id) = input_col.get_node_id(row) {
131                            store
132                                .get_node(node_id)
133                                .and_then(|node| node.get_property(property).cloned())
134                                .unwrap_or(Value::Null)
135                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
136                            store
137                                .get_edge(edge_id)
138                                .and_then(|edge| edge.get_property(property).cloned())
139                                .unwrap_or(Value::Null)
140                        } else {
141                            Value::Null
142                        };
143                        output_col.push_value(value);
144                    }
145                }
146            }
147        }
148
149        output.set_count(input.row_count());
150        Ok(Some(output))
151    }
152
153    fn reset(&mut self) {
154        self.child.reset();
155    }
156
157    fn name(&self) -> &'static str {
158        "Project"
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out, leaving an empty placeholder in its slot.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            // Only rewinds the cursor; chunks already handed out stay replaced
            // by their empty placeholders.
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    #[test]
    fn test_project_select_columns() {
        // Create input with 3 columns: [int, string, int]
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Project to select columns 2 and 0 (reordering)
        let mut project = ProjectOperator::select_columns(
            Box::new(mock_scan),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Check values are reordered, for every row rather than just the first
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(200));
        assert_eq!(result.column(1).unwrap().get_int64(1), Some(2));

        // The single input chunk has been consumed
        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let chunk = builder.finish();

        let mock_scan = MockScanOperator {
            chunks: vec![chunk],
            position: 0,
        };

        // Project with a constant
        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // The passthrough column is unchanged
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(2));

        // The constant is repeated for every row
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_exhausted_child() {
        let mock_scan = MockScanOperator {
            chunks: Vec::new(),
            position: 0,
        };

        let mut project =
            ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);

        // A child with nothing to yield makes the projection yield nothing
        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_missing_column() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        let mock_scan = MockScanOperator {
            chunks: vec![builder.finish()],
            position: 0,
        };

        // Column 5 does not exist in a one-column input
        let mut project =
            ProjectOperator::select_columns(Box::new(mock_scan), vec![5], vec![LogicalType::Int64]);

        assert!(matches!(
            project.next(),
            Err(OperatorError::ColumnNotFound(_))
        ));
    }

    #[test]
    fn test_project_property_access_requires_store() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        let mock_scan = MockScanOperator {
            chunks: vec![builder.finish()],
            position: 0,
        };

        // Built with `new`, so no store is attached
        let mut project = ProjectOperator::new(
            Box::new(mock_scan),
            vec![ProjectExpr::PropertyAccess {
                column: 0,
                property: "name".to_string(),
            }],
            vec![LogicalType::String],
        );

        assert!(matches!(project.next(), Err(OperatorError::Execution(_))));
    }
}