Skip to main content

grafeo_engine/query/executor/
mod.rs

//! Query executor.
//!
//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7
8use std::time::Instant;
9
10use crate::config::AdaptiveConfig;
11use crate::database::QueryResult;
12use grafeo_common::types::{LogicalType, Value};
13use grafeo_common::utils::error::{Error, QueryError, Result};
14use grafeo_core::execution::operators::{Operator, OperatorError};
15use grafeo_core::execution::{
16    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
17};
18
19/// Executes a physical operator tree and collects results.
20pub struct Executor {
21    /// Column names for the result.
22    columns: Vec<String>,
23    /// Column types for the result.
24    column_types: Vec<LogicalType>,
25    /// Wall-clock deadline after which execution is aborted.
26    deadline: Option<Instant>,
27}
28
29impl Executor {
30    /// Creates a new executor.
31    #[must_use]
32    pub fn new() -> Self {
33        Self {
34            columns: Vec::new(),
35            column_types: Vec::new(),
36            deadline: None,
37        }
38    }
39
40    /// Creates an executor with specified column names.
41    #[must_use]
42    pub fn with_columns(columns: Vec<String>) -> Self {
43        let len = columns.len();
44        Self {
45            columns,
46            column_types: vec![LogicalType::Any; len],
47            deadline: None,
48        }
49    }
50
51    /// Creates an executor with specified column names and types.
52    #[must_use]
53    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
54        Self {
55            columns,
56            column_types,
57            deadline: None,
58        }
59    }
60
61    /// Sets a wall-clock deadline for query execution.
62    #[must_use]
63    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
64        self.deadline = deadline;
65        self
66    }
67
68    /// Checks whether the deadline has been exceeded.
69    fn check_deadline(&self) -> Result<()> {
70        if let Some(deadline) = self.deadline
71            && Instant::now() >= deadline
72        {
73            return Err(Error::Query(QueryError::timeout()));
74        }
75        Ok(())
76    }
77
78    /// Executes a physical operator and collects all results.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if operator execution fails or the query timeout is exceeded.
83    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
84        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
85        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
86
87        loop {
88            self.check_deadline()?;
89
90            match operator.next() {
91                Ok(Some(chunk)) => {
92                    // Capture column types from first non-empty chunk
93                    if !types_captured && chunk.column_count() > 0 {
94                        self.capture_column_types(&chunk, &mut result);
95                        types_captured = true;
96                    }
97                    self.collect_chunk(&chunk, &mut result)?;
98                }
99                Ok(None) => break,
100                Err(err) => return Err(convert_operator_error(err)),
101            }
102        }
103
104        Ok(result)
105    }
106
107    /// Executes and returns at most `limit` rows.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if operator execution fails or the query timeout is exceeded.
112    pub fn execute_with_limit(
113        &self,
114        operator: &mut dyn Operator,
115        limit: usize,
116    ) -> Result<QueryResult> {
117        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
118        let mut collected = 0;
119        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
120
121        loop {
122            if collected >= limit {
123                break;
124            }
125
126            self.check_deadline()?;
127
128            match operator.next() {
129                Ok(Some(chunk)) => {
130                    // Capture column types from first non-empty chunk
131                    if !types_captured && chunk.column_count() > 0 {
132                        self.capture_column_types(&chunk, &mut result);
133                        types_captured = true;
134                    }
135                    let remaining = limit - collected;
136                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
137                }
138                Ok(None) => break,
139                Err(err) => return Err(convert_operator_error(err)),
140            }
141        }
142
143        Ok(result)
144    }
145
146    /// Captures column types from a DataChunk.
147    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
148        let col_count = chunk.column_count();
149        result.column_types = Vec::with_capacity(col_count);
150        for col_idx in 0..col_count {
151            let col_type = chunk
152                .column(col_idx)
153                .map_or(LogicalType::Any, |col| col.data_type().clone());
154            result.column_types.push(col_type);
155        }
156    }
157
158    /// Collects all rows from a DataChunk into the result.
159    ///
160    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
161    /// (e.g., after filtering operations).
162    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
163        let col_count = chunk.column_count();
164        let mut collected = 0;
165
166        for row_idx in chunk.selected_indices() {
167            let mut row = Vec::with_capacity(col_count);
168            for col_idx in 0..col_count {
169                let value = chunk
170                    .column(col_idx)
171                    .and_then(|col| col.get_value(row_idx))
172                    .unwrap_or(Value::Null);
173                row.push(value);
174            }
175            result.rows.push(row);
176            collected += 1;
177        }
178
179        Ok(collected)
180    }
181
182    /// Collects up to `limit` rows from a DataChunk.
183    ///
184    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
185    /// (e.g., after filtering operations).
186    fn collect_chunk_limited(
187        &self,
188        chunk: &DataChunk,
189        result: &mut QueryResult,
190        limit: usize,
191    ) -> Result<usize> {
192        let col_count = chunk.column_count();
193        let mut collected = 0;
194
195        for row_idx in chunk.selected_indices() {
196            if collected >= limit {
197                break;
198            }
199            let mut row = Vec::with_capacity(col_count);
200            for col_idx in 0..col_count {
201                let value = chunk
202                    .column(col_idx)
203                    .and_then(|col| col.get_value(row_idx))
204                    .unwrap_or(Value::Null);
205                row.push(value);
206            }
207            result.rows.push(row);
208            collected += 1;
209        }
210
211        Ok(collected)
212    }
213
214    /// Executes a physical operator with adaptive cardinality tracking.
215    ///
216    /// This wraps the operator in a cardinality tracking layer and monitors
217    /// deviation from estimates during execution. The adaptive summary is
218    /// returned alongside the query result.
219    ///
220    /// # Arguments
221    ///
222    /// * `operator` - The root physical operator to execute
223    /// * `adaptive_context` - Context with cardinality estimates from planning
224    /// * `config` - Adaptive execution configuration
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if operator execution fails.
229    pub fn execute_adaptive(
230        &self,
231        operator: Box<dyn Operator>,
232        adaptive_context: Option<AdaptiveContext>,
233        config: &AdaptiveConfig,
234    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
235        // If adaptive is disabled or no context, fall back to normal execution
236        if !config.enabled {
237            let mut op = operator;
238            let result = self.execute(op.as_mut())?;
239            return Ok((result, None));
240        }
241
242        let Some(ctx) = adaptive_context else {
243            let mut op = operator;
244            let result = self.execute(op.as_mut())?;
245            return Ok((result, None));
246        };
247
248        // Create shared context for tracking
249        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
250            config.threshold,
251            config.min_rows,
252        ));
253
254        // Copy estimates from the planning context to the shared tracking context
255        for (op_id, checkpoint) in ctx.all_checkpoints() {
256            if let Some(mut inner) = shared_ctx.snapshot() {
257                inner.set_estimate(op_id, checkpoint.estimated);
258            }
259        }
260
261        // Wrap operator with tracking
262        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
263
264        // Execute with tracking
265        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
266        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
267        let mut total_rows: u64 = 0;
268        let check_interval = config.min_rows;
269
270        loop {
271            self.check_deadline()?;
272
273            match wrapped.next() {
274                Ok(Some(chunk)) => {
275                    let chunk_rows = chunk.row_count();
276                    total_rows += chunk_rows as u64;
277
278                    // Capture column types from first non-empty chunk
279                    if !types_captured && chunk.column_count() > 0 {
280                        self.capture_column_types(&chunk, &mut result);
281                        types_captured = true;
282                    }
283                    self.collect_chunk(&chunk, &mut result)?;
284
285                    // Periodically check for significant deviation
286                    if total_rows >= check_interval
287                        && total_rows.is_multiple_of(check_interval)
288                        && shared_ctx.should_reoptimize()
289                    {
290                        // For now, just log/note that re-optimization would trigger
291                        // Full re-optimization would require plan regeneration
292                        // which is a more invasive change
293                    }
294                }
295                Ok(None) => break,
296                Err(err) => return Err(convert_operator_error(err)),
297            }
298        }
299
300        // Get final summary
301        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
302
303        Ok((result, summary))
304    }
305}
306
307impl Default for Executor {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313/// Converts an operator error to a common error.
314fn convert_operator_error(err: OperatorError) -> Error {
315    match err {
316        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
317        OperatorError::ColumnNotFound(name) => {
318            Error::InvalidValue(format!("Column not found: {name}"))
319        }
320        OperatorError::Execution(msg) => Error::Internal(msg),
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Test operator that serves a fixed list of integers as single-column
    /// `Int64` chunks holding at most `chunk_size` rows each.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        /// Starts at the beginning of `values`.
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            // Everything not yet handed out; nothing left means end of stream.
            let pending = match self.values.get(self.position..) {
                Some(rest) if !rest.is_empty() => rest,
                _ => return Ok(None),
            };
            let batch = &pending[..pending.len().min(self.chunk_size)];

            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
            let col = chunk.column_mut(0).unwrap();
            for &value in batch {
                col.push_int64(value);
            }
            chunk.set_count(batch.len());
            self.position += batch.len();

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that never yields a chunk.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Builds the single-column executor shared by the integer tests.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    #[test]
    fn test_executor_empty() {
        let mut op = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut op).unwrap();

        // No rows, but the declared schema is still reported.
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = value_executor().execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
        for (row, expected) in result.rows.iter().zip(1_i64..) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        // Ten values arrive in one chunk; only the first five may be kept.
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = value_executor().execute_with_limit(&mut op, 5).unwrap();

        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // A deadline one second in the past is already exceeded.
        let past = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let executor = value_executor().with_deadline(Some(past));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        // Explicitly clearing the deadline must not affect execution.
        let executor = value_executor().with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();

        assert_eq!(result.row_count(), 3);
    }
}
451}