Skip to main content

grafeo_engine/query/executor/
mod.rs

//! Query executor.
//!
//! Executes physical operator trees and collects their output rows into a
//! [`QueryResult`].
4
5use crate::database::QueryResult;
6use grafeo_common::types::{LogicalType, Value};
7use grafeo_common::utils::error::{Error, Result};
8use grafeo_core::execution::DataChunk;
9use grafeo_core::execution::operators::{Operator, OperatorError};
10
11/// Executes a physical operator tree and collects results.
12pub struct Executor {
13    /// Column names for the result.
14    columns: Vec<String>,
15    /// Column types for the result.
16    column_types: Vec<LogicalType>,
17}
18
19impl Executor {
20    /// Creates a new executor.
21    #[must_use]
22    pub fn new() -> Self {
23        Self {
24            columns: Vec::new(),
25            column_types: Vec::new(),
26        }
27    }
28
29    /// Creates an executor with specified column names.
30    #[must_use]
31    pub fn with_columns(columns: Vec<String>) -> Self {
32        let len = columns.len();
33        Self {
34            columns,
35            column_types: vec![LogicalType::Any; len],
36        }
37    }
38
39    /// Creates an executor with specified column names and types.
40    #[must_use]
41    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
42        Self {
43            columns,
44            column_types,
45        }
46    }
47
48    /// Executes a physical operator and collects all results.
49    ///
50    /// # Errors
51    ///
52    /// Returns an error if operator execution fails.
53    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
54        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
55        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
56
57        loop {
58            match operator.next() {
59                Ok(Some(chunk)) => {
60                    // Capture column types from first non-empty chunk
61                    if !types_captured && chunk.column_count() > 0 {
62                        self.capture_column_types(&chunk, &mut result);
63                        types_captured = true;
64                    }
65                    self.collect_chunk(&chunk, &mut result)?;
66                }
67                Ok(None) => break,
68                Err(err) => return Err(convert_operator_error(err)),
69            }
70        }
71
72        Ok(result)
73    }
74
75    /// Executes and returns at most `limit` rows.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if operator execution fails.
80    pub fn execute_with_limit(
81        &self,
82        operator: &mut dyn Operator,
83        limit: usize,
84    ) -> Result<QueryResult> {
85        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
86        let mut collected = 0;
87        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
88
89        loop {
90            if collected >= limit {
91                break;
92            }
93
94            match operator.next() {
95                Ok(Some(chunk)) => {
96                    // Capture column types from first non-empty chunk
97                    if !types_captured && chunk.column_count() > 0 {
98                        self.capture_column_types(&chunk, &mut result);
99                        types_captured = true;
100                    }
101                    let remaining = limit - collected;
102                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
103                }
104                Ok(None) => break,
105                Err(err) => return Err(convert_operator_error(err)),
106            }
107        }
108
109        Ok(result)
110    }
111
112    /// Captures column types from a DataChunk.
113    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
114        let col_count = chunk.column_count();
115        result.column_types = Vec::with_capacity(col_count);
116        for col_idx in 0..col_count {
117            let col_type = chunk
118                .column(col_idx)
119                .map(|col| col.data_type().clone())
120                .unwrap_or(LogicalType::Any);
121            result.column_types.push(col_type);
122        }
123    }
124
125    /// Collects all rows from a DataChunk into the result.
126    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
127        let row_count = chunk.row_count();
128        let col_count = chunk.column_count();
129
130        for row_idx in 0..row_count {
131            let mut row = Vec::with_capacity(col_count);
132            for col_idx in 0..col_count {
133                let value = chunk
134                    .column(col_idx)
135                    .and_then(|col| col.get_value(row_idx))
136                    .unwrap_or(Value::Null);
137                row.push(value);
138            }
139            result.rows.push(row);
140        }
141
142        Ok(row_count)
143    }
144
145    /// Collects up to `limit` rows from a DataChunk.
146    fn collect_chunk_limited(
147        &self,
148        chunk: &DataChunk,
149        result: &mut QueryResult,
150        limit: usize,
151    ) -> Result<usize> {
152        let row_count = chunk.row_count().min(limit);
153        let col_count = chunk.column_count();
154
155        for row_idx in 0..row_count {
156            let mut row = Vec::with_capacity(col_count);
157            for col_idx in 0..col_count {
158                let value = chunk
159                    .column(col_idx)
160                    .and_then(|col| col.get_value(row_idx))
161                    .unwrap_or(Value::Null);
162                row.push(value);
163            }
164            result.rows.push(row);
165        }
166
167        Ok(row_count)
168    }
169}
170
171impl Default for Executor {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177/// Converts an operator error to a common error.
178fn convert_operator_error(err: OperatorError) -> Error {
179    match err {
180        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
181        OperatorError::ColumnNotFound(name) => {
182            Error::InvalidValue(format!("Column not found: {name}"))
183        }
184        OperatorError::Execution(msg) => Error::Internal(msg),
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// A mock operator that generates chunks with integer data on demand.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk.column_mut(0).unwrap();
                for &value in &self.values[self.position..end] {
                    col.push_int64(value);
                }
            }
            chunk.set_count(end - self.position);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Empty mock operator for testing empty results.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Mock operator that fails on its first pull, for error-propagation tests.
    struct FailingOperator;

    impl Operator for FailingOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Err(OperatorError::Execution("boom".into()))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Failing"
        }
    }

    /// Expected single-column rows `[Int64(v)]` for every `v` in `values`.
    fn int_rows(values: std::ops::Range<i64>) -> Vec<Vec<Value>> {
        values.map(|v| vec![Value::Int64(v)]).collect()
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..7).collect(), 3);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.rows, int_rows(0..7));
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_limit_cuts_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.rows, int_rows(0..5));
        // Only the two chunks needed to reach the limit were pulled.
        assert_eq!(op.position, 6);
    }

    #[test]
    fn test_executor_limit_zero_does_not_pull() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
        assert_eq!(op.position, 0);
    }

    #[test]
    fn test_executor_limit_larger_than_input() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 2);

        let result = executor.execute_with_limit(&mut op, 100).unwrap();
        assert_eq!(result.row_count(), 3);
    }

    #[test]
    fn test_executor_infers_column_types() {
        let executor = Executor::new();
        let mut op = MockIntOperator::new(vec![42], 4);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.column_types.len(), 1);
        assert!(result.column_types[0] == LogicalType::Int64);
    }

    #[test]
    fn test_executor_propagates_operator_error() {
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let err = executor
            .execute(&mut FailingOperator)
            .err()
            .expect("execution should fail");
        assert!(matches!(err, Error::Internal(_)));
    }
}