Skip to main content

grafeo_engine/query/executor/
mod.rs

//! Query executor.
//!
//! Executes physical plans and produces results.
5use crate::config::AdaptiveConfig;
6use crate::database::QueryResult;
7use grafeo_common::types::{LogicalType, Value};
8use grafeo_common::utils::error::{Error, Result};
9use grafeo_core::execution::operators::{Operator, OperatorError};
10use grafeo_core::execution::{
11    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
12};
13
/// Executes a physical operator tree and collects results.
///
/// The executor pulls `DataChunk`s from a root operator and materializes
/// them into a `QueryResult`, row by row.
pub struct Executor {
    /// Column names for the result.
    columns: Vec<String>,
    /// Column types for the result. Entries left as `LogicalType::Any` are
    /// refined from the first non-empty chunk during execution.
    column_types: Vec<LogicalType>,
}
21
22impl Executor {
23    /// Creates a new executor.
24    #[must_use]
25    pub fn new() -> Self {
26        Self {
27            columns: Vec::new(),
28            column_types: Vec::new(),
29        }
30    }
31
32    /// Creates an executor with specified column names.
33    #[must_use]
34    pub fn with_columns(columns: Vec<String>) -> Self {
35        let len = columns.len();
36        Self {
37            columns,
38            column_types: vec![LogicalType::Any; len],
39        }
40    }
41
42    /// Creates an executor with specified column names and types.
43    #[must_use]
44    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
45        Self {
46            columns,
47            column_types,
48        }
49    }
50
51    /// Executes a physical operator and collects all results.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if operator execution fails.
56    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
57        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
58        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
59
60        loop {
61            match operator.next() {
62                Ok(Some(chunk)) => {
63                    // Capture column types from first non-empty chunk
64                    if !types_captured && chunk.column_count() > 0 {
65                        self.capture_column_types(&chunk, &mut result);
66                        types_captured = true;
67                    }
68                    self.collect_chunk(&chunk, &mut result)?;
69                }
70                Ok(None) => break,
71                Err(err) => return Err(convert_operator_error(err)),
72            }
73        }
74
75        Ok(result)
76    }
77
78    /// Executes and returns at most `limit` rows.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if operator execution fails.
83    pub fn execute_with_limit(
84        &self,
85        operator: &mut dyn Operator,
86        limit: usize,
87    ) -> Result<QueryResult> {
88        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
89        let mut collected = 0;
90        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
91
92        loop {
93            if collected >= limit {
94                break;
95            }
96
97            match operator.next() {
98                Ok(Some(chunk)) => {
99                    // Capture column types from first non-empty chunk
100                    if !types_captured && chunk.column_count() > 0 {
101                        self.capture_column_types(&chunk, &mut result);
102                        types_captured = true;
103                    }
104                    let remaining = limit - collected;
105                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
106                }
107                Ok(None) => break,
108                Err(err) => return Err(convert_operator_error(err)),
109            }
110        }
111
112        Ok(result)
113    }
114
115    /// Captures column types from a DataChunk.
116    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
117        let col_count = chunk.column_count();
118        result.column_types = Vec::with_capacity(col_count);
119        for col_idx in 0..col_count {
120            let col_type = chunk
121                .column(col_idx)
122                .map(|col| col.data_type().clone())
123                .unwrap_or(LogicalType::Any);
124            result.column_types.push(col_type);
125        }
126    }
127
128    /// Collects all rows from a DataChunk into the result.
129    ///
130    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
131    /// (e.g., after filtering operations).
132    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
133        let col_count = chunk.column_count();
134        let mut collected = 0;
135
136        for row_idx in chunk.selected_indices() {
137            let mut row = Vec::with_capacity(col_count);
138            for col_idx in 0..col_count {
139                let value = chunk
140                    .column(col_idx)
141                    .and_then(|col| col.get_value(row_idx))
142                    .unwrap_or(Value::Null);
143                row.push(value);
144            }
145            result.rows.push(row);
146            collected += 1;
147        }
148
149        Ok(collected)
150    }
151
152    /// Collects up to `limit` rows from a DataChunk.
153    ///
154    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
155    /// (e.g., after filtering operations).
156    fn collect_chunk_limited(
157        &self,
158        chunk: &DataChunk,
159        result: &mut QueryResult,
160        limit: usize,
161    ) -> Result<usize> {
162        let col_count = chunk.column_count();
163        let mut collected = 0;
164
165        for row_idx in chunk.selected_indices() {
166            if collected >= limit {
167                break;
168            }
169            let mut row = Vec::with_capacity(col_count);
170            for col_idx in 0..col_count {
171                let value = chunk
172                    .column(col_idx)
173                    .and_then(|col| col.get_value(row_idx))
174                    .unwrap_or(Value::Null);
175                row.push(value);
176            }
177            result.rows.push(row);
178            collected += 1;
179        }
180
181        Ok(collected)
182    }
183
184    /// Executes a physical operator with adaptive cardinality tracking.
185    ///
186    /// This wraps the operator in a cardinality tracking layer and monitors
187    /// deviation from estimates during execution. The adaptive summary is
188    /// returned alongside the query result.
189    ///
190    /// # Arguments
191    ///
192    /// * `operator` - The root physical operator to execute
193    /// * `adaptive_context` - Context with cardinality estimates from planning
194    /// * `config` - Adaptive execution configuration
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if operator execution fails.
199    pub fn execute_adaptive(
200        &self,
201        operator: Box<dyn Operator>,
202        adaptive_context: Option<AdaptiveContext>,
203        config: &AdaptiveConfig,
204    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
205        // If adaptive is disabled or no context, fall back to normal execution
206        if !config.enabled {
207            let mut op = operator;
208            let result = self.execute(op.as_mut())?;
209            return Ok((result, None));
210        }
211
212        let ctx = match adaptive_context {
213            Some(ctx) => ctx,
214            None => {
215                let mut op = operator;
216                let result = self.execute(op.as_mut())?;
217                return Ok((result, None));
218            }
219        };
220
221        // Create shared context for tracking
222        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
223            config.threshold,
224            config.min_rows,
225        ));
226
227        // Copy estimates from the planning context to the shared tracking context
228        for (op_id, checkpoint) in ctx.all_checkpoints() {
229            if let Some(mut inner) = shared_ctx.snapshot() {
230                inner.set_estimate(op_id, checkpoint.estimated);
231            }
232        }
233
234        // Wrap operator with tracking
235        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
236
237        // Execute with tracking
238        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
239        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
240        let mut total_rows: u64 = 0;
241        let check_interval = config.min_rows;
242
243        loop {
244            match wrapped.next() {
245                Ok(Some(chunk)) => {
246                    let chunk_rows = chunk.row_count();
247                    total_rows += chunk_rows as u64;
248
249                    // Capture column types from first non-empty chunk
250                    if !types_captured && chunk.column_count() > 0 {
251                        self.capture_column_types(&chunk, &mut result);
252                        types_captured = true;
253                    }
254                    self.collect_chunk(&chunk, &mut result)?;
255
256                    // Periodically check for significant deviation
257                    if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
258                        if shared_ctx.should_reoptimize() {
259                            // For now, just log/note that re-optimization would trigger
260                            // Full re-optimization would require plan regeneration
261                            // which is a more invasive change
262                        }
263                    }
264                }
265                Ok(None) => break,
266                Err(err) => return Err(convert_operator_error(err)),
267            }
268        }
269
270        // Get final summary
271        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
272
273        Ok((result, summary))
274    }
275}
276
277impl Default for Executor {
278    fn default() -> Self {
279        Self::new()
280    }
281}
282
283/// Converts an operator error to a common error.
284fn convert_operator_error(err: OperatorError) -> Error {
285    match err {
286        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
287        OperatorError::ColumnNotFound(name) => {
288            Error::InvalidValue(format!("Column not found: {name}"))
289        }
290        OperatorError::Execution(msg) => Error::Internal(msg),
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// A mock operator that serves a fixed list of integers in chunks.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            // Exhausted: signal end of stream.
            if self.position >= self.values.len() {
                return Ok(None);
            }

            // Serve the next window of at most `chunk_size` values.
            let upper = usize::min(self.position + self.chunk_size, self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
            {
                let col = chunk.column_mut(0).unwrap();
                for &value in &self.values[self.position..upper] {
                    col.push_int64(value);
                }
            }
            chunk.set_count(upper - self.position);
            self.position = upper;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Operator that immediately reports end-of-stream, for empty-result tests.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let mut source = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut source).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut source = MockIntOperator::new(vec![1, 2, 3], 10);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute(&mut source).unwrap();
        assert_eq!(result.row_count(), 3);
        for (i, expected) in [1_i64, 2, 3].into_iter().enumerate() {
            assert_eq!(result.rows[i][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let mut source = MockIntOperator::new((0..10).collect(), 100);
        let executor = Executor::with_columns(vec!["value".to_string()]);

        let result = executor.execute_with_limit(&mut source, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }
}