Skip to main content

graphos_core/execution/operators/
project.rs

1//! Project operator for selecting and transforming columns.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use graphos_common::types::LogicalType;
6
7/// A projection expression.
8pub enum ProjectExpr {
9    /// Reference to an input column.
10    Column(usize),
11    /// A constant value.
12    Constant(graphos_common::types::Value),
13    // Future: Add more expression types (arithmetic, function calls, etc.)
14}
15
16/// A project operator that selects and transforms columns.
17pub struct ProjectOperator {
18    /// Child operator to read from.
19    child: Box<dyn Operator>,
20    /// Projection expressions.
21    projections: Vec<ProjectExpr>,
22    /// Output column types.
23    output_types: Vec<LogicalType>,
24}
25
26impl ProjectOperator {
27    /// Creates a new project operator.
28    pub fn new(
29        child: Box<dyn Operator>,
30        projections: Vec<ProjectExpr>,
31        output_types: Vec<LogicalType>,
32    ) -> Self {
33        assert_eq!(projections.len(), output_types.len());
34        Self {
35            child,
36            projections,
37            output_types,
38        }
39    }
40
41    /// Creates a project operator that selects specific columns.
42    pub fn select_columns(child: Box<dyn Operator>, columns: Vec<usize>, types: Vec<LogicalType>) -> Self {
43        let projections = columns.into_iter().map(ProjectExpr::Column).collect();
44        Self::new(child, projections, types)
45    }
46}
47
48impl Operator for ProjectOperator {
49    fn next(&mut self) -> OperatorResult {
50        // Get next chunk from child
51        let input = match self.child.next()? {
52            Some(c) => c,
53            None => return Ok(None),
54        };
55
56        // Create output chunk
57        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
58
59        // Evaluate each projection
60        for (i, proj) in self.projections.iter().enumerate() {
61            match proj {
62                ProjectExpr::Column(col_idx) => {
63                    // Copy column from input to output
64                    let input_col = input.column(*col_idx).ok_or_else(|| {
65                        OperatorError::ColumnNotFound(format!("Column {col_idx}"))
66                    })?;
67
68                    let output_col = output.column_mut(i).unwrap();
69
70                    // Copy selected rows
71                    for row in input.selected_indices() {
72                        if let Some(value) = input_col.get_value(row) {
73                            output_col.push_value(value);
74                        }
75                    }
76                }
77                ProjectExpr::Constant(value) => {
78                    // Push constant for each row
79                    let output_col = output.column_mut(i).unwrap();
80                    for _ in input.selected_indices() {
81                        output_col.push_value(value.clone());
82                    }
83                }
84            }
85        }
86
87        output.set_count(input.row_count());
88        Ok(Some(output))
89    }
90
91    fn reset(&mut self) {
92        self.child.reset();
93    }
94
95    fn name(&self) -> &'static str {
96        "Project"
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use graphos_common::types::Value;

    /// Child operator that hands out a fixed list of chunks, one per call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(
                    &mut self.chunks[self.position],
                    DataChunk::new(&[]),
                );
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan that yields `chunks` in order.
    fn mock_scan(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Two-row chunk with columns `[Int64, String, Int64]`:
    /// `(1, "hello", 100)` and `(2, "world", 200)`.
    fn three_column_chunk() -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[
            LogicalType::Int64,
            LogicalType::String,
            LogicalType::Int64,
        ]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        // Select columns 2 and 0 (reordering).
        let mut project = ProjectOperator::select_columns(
            mock_scan(vec![three_column_chunk()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Every row is reordered, not just the first.
        let first = result.column(0).unwrap();
        let second = result.column(1).unwrap();
        assert_eq!(first.get_int64(0), Some(100));
        assert_eq!(second.get_int64(0), Some(1));
        assert_eq!(first.get_int64(1), Some(200));
        assert_eq!(second.get_int64(1), Some(2));

        // The child had a single chunk, so the projection is now exhausted.
        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let chunk = builder.finish();

        // Pass through column 0 and append a constant column.
        let mut project = ProjectOperator::new(
            mock_scan(vec![chunk]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(2));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_missing_column_is_error() {
        let mut project = ProjectOperator::select_columns(
            mock_scan(vec![three_column_chunk()]),
            vec![5],
            vec![LogicalType::Int64],
        );

        assert!(matches!(
            project.next(),
            Err(OperatorError::ColumnNotFound(_))
        ));
    }

    #[test]
    fn test_project_empty_child() {
        let mut project = ProjectOperator::select_columns(
            mock_scan(Vec::new()),
            vec![0],
            vec![LogicalType::Int64],
        );

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    #[should_panic]
    fn test_project_mismatched_lengths_panics() {
        let _project = ProjectOperator::new(
            mock_scan(Vec::new()),
            vec![ProjectExpr::Column(0)],
            Vec::new(),
        );
    }
}