Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5pub mod procedure_call;
6
7use std::time::Instant;
8
9use crate::config::AdaptiveConfig;
10use crate::database::QueryResult;
11use grafeo_common::types::{LogicalType, Value};
12use grafeo_common::utils::error::{Error, QueryError, Result};
13use grafeo_core::execution::operators::{Operator, OperatorError};
14use grafeo_core::execution::{
15    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
16};
17
18/// Executes a physical operator tree and collects results.
19pub struct Executor {
20    /// Column names for the result.
21    columns: Vec<String>,
22    /// Column types for the result.
23    column_types: Vec<LogicalType>,
24    /// Wall-clock deadline after which execution is aborted.
25    deadline: Option<Instant>,
26}
27
28impl Executor {
29    /// Creates a new executor.
30    #[must_use]
31    pub fn new() -> Self {
32        Self {
33            columns: Vec::new(),
34            column_types: Vec::new(),
35            deadline: None,
36        }
37    }
38
39    /// Creates an executor with specified column names.
40    #[must_use]
41    pub fn with_columns(columns: Vec<String>) -> Self {
42        let len = columns.len();
43        Self {
44            columns,
45            column_types: vec![LogicalType::Any; len],
46            deadline: None,
47        }
48    }
49
50    /// Creates an executor with specified column names and types.
51    #[must_use]
52    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
53        Self {
54            columns,
55            column_types,
56            deadline: None,
57        }
58    }
59
60    /// Sets a wall-clock deadline for query execution.
61    #[must_use]
62    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
63        self.deadline = deadline;
64        self
65    }
66
67    /// Checks whether the deadline has been exceeded.
68    fn check_deadline(&self) -> Result<()> {
69        if let Some(deadline) = self.deadline
70            && Instant::now() >= deadline
71        {
72            return Err(Error::Query(QueryError::timeout()));
73        }
74        Ok(())
75    }
76
77    /// Executes a physical operator and collects all results.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if operator execution fails or the query timeout is exceeded.
82    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
83        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
84        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
85
86        loop {
87            self.check_deadline()?;
88
89            match operator.next() {
90                Ok(Some(chunk)) => {
91                    // Capture column types from first non-empty chunk
92                    if !types_captured && chunk.column_count() > 0 {
93                        self.capture_column_types(&chunk, &mut result);
94                        types_captured = true;
95                    }
96                    self.collect_chunk(&chunk, &mut result)?;
97                }
98                Ok(None) => break,
99                Err(err) => return Err(convert_operator_error(err)),
100            }
101        }
102
103        Ok(result)
104    }
105
106    /// Executes and returns at most `limit` rows.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if operator execution fails or the query timeout is exceeded.
111    pub fn execute_with_limit(
112        &self,
113        operator: &mut dyn Operator,
114        limit: usize,
115    ) -> Result<QueryResult> {
116        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
117        let mut collected = 0;
118        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
119
120        loop {
121            if collected >= limit {
122                break;
123            }
124
125            self.check_deadline()?;
126
127            match operator.next() {
128                Ok(Some(chunk)) => {
129                    // Capture column types from first non-empty chunk
130                    if !types_captured && chunk.column_count() > 0 {
131                        self.capture_column_types(&chunk, &mut result);
132                        types_captured = true;
133                    }
134                    let remaining = limit - collected;
135                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
136                }
137                Ok(None) => break,
138                Err(err) => return Err(convert_operator_error(err)),
139            }
140        }
141
142        Ok(result)
143    }
144
145    /// Captures column types from a DataChunk.
146    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
147        let col_count = chunk.column_count();
148        result.column_types = Vec::with_capacity(col_count);
149        for col_idx in 0..col_count {
150            let col_type = chunk
151                .column(col_idx)
152                .map_or(LogicalType::Any, |col| col.data_type().clone());
153            result.column_types.push(col_type);
154        }
155    }
156
157    /// Collects all rows from a DataChunk into the result.
158    ///
159    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
160    /// (e.g., after filtering operations).
161    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
162        let col_count = chunk.column_count();
163        let mut collected = 0;
164
165        for row_idx in chunk.selected_indices() {
166            let mut row = Vec::with_capacity(col_count);
167            for col_idx in 0..col_count {
168                let value = chunk
169                    .column(col_idx)
170                    .and_then(|col| col.get_value(row_idx))
171                    .unwrap_or(Value::Null);
172                row.push(value);
173            }
174            result.rows.push(row);
175            collected += 1;
176        }
177
178        Ok(collected)
179    }
180
181    /// Collects up to `limit` rows from a DataChunk.
182    ///
183    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
184    /// (e.g., after filtering operations).
185    fn collect_chunk_limited(
186        &self,
187        chunk: &DataChunk,
188        result: &mut QueryResult,
189        limit: usize,
190    ) -> Result<usize> {
191        let col_count = chunk.column_count();
192        let mut collected = 0;
193
194        for row_idx in chunk.selected_indices() {
195            if collected >= limit {
196                break;
197            }
198            let mut row = Vec::with_capacity(col_count);
199            for col_idx in 0..col_count {
200                let value = chunk
201                    .column(col_idx)
202                    .and_then(|col| col.get_value(row_idx))
203                    .unwrap_or(Value::Null);
204                row.push(value);
205            }
206            result.rows.push(row);
207            collected += 1;
208        }
209
210        Ok(collected)
211    }
212
213    /// Executes a physical operator with adaptive cardinality tracking.
214    ///
215    /// This wraps the operator in a cardinality tracking layer and monitors
216    /// deviation from estimates during execution. The adaptive summary is
217    /// returned alongside the query result.
218    ///
219    /// # Arguments
220    ///
221    /// * `operator` - The root physical operator to execute
222    /// * `adaptive_context` - Context with cardinality estimates from planning
223    /// * `config` - Adaptive execution configuration
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if operator execution fails.
228    pub fn execute_adaptive(
229        &self,
230        operator: Box<dyn Operator>,
231        adaptive_context: Option<AdaptiveContext>,
232        config: &AdaptiveConfig,
233    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
234        // If adaptive is disabled or no context, fall back to normal execution
235        if !config.enabled {
236            let mut op = operator;
237            let result = self.execute(op.as_mut())?;
238            return Ok((result, None));
239        }
240
241        let Some(ctx) = adaptive_context else {
242            let mut op = operator;
243            let result = self.execute(op.as_mut())?;
244            return Ok((result, None));
245        };
246
247        // Create shared context for tracking
248        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
249            config.threshold,
250            config.min_rows,
251        ));
252
253        // Copy estimates from the planning context to the shared tracking context
254        for (op_id, checkpoint) in ctx.all_checkpoints() {
255            if let Some(mut inner) = shared_ctx.snapshot() {
256                inner.set_estimate(op_id, checkpoint.estimated);
257            }
258        }
259
260        // Wrap operator with tracking
261        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
262
263        // Execute with tracking
264        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
265        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
266        let mut total_rows: u64 = 0;
267        let check_interval = config.min_rows;
268
269        loop {
270            self.check_deadline()?;
271
272            match wrapped.next() {
273                Ok(Some(chunk)) => {
274                    let chunk_rows = chunk.row_count();
275                    total_rows += chunk_rows as u64;
276
277                    // Capture column types from first non-empty chunk
278                    if !types_captured && chunk.column_count() > 0 {
279                        self.capture_column_types(&chunk, &mut result);
280                        types_captured = true;
281                    }
282                    self.collect_chunk(&chunk, &mut result)?;
283
284                    // Periodically check for significant deviation
285                    if total_rows >= check_interval
286                        && total_rows.is_multiple_of(check_interval)
287                        && shared_ctx.should_reoptimize()
288                    {
289                        // For now, just log/note that re-optimization would trigger
290                        // Full re-optimization would require plan regeneration
291                        // which is a more invasive change
292                    }
293                }
294                Ok(None) => break,
295                Err(err) => return Err(convert_operator_error(err)),
296            }
297        }
298
299        // Get final summary
300        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
301
302        Ok((result, summary))
303    }
304}
305
306impl Default for Executor {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312/// Converts an operator error to a common error.
313fn convert_operator_error(err: OperatorError) -> Error {
314    match err {
315        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
316        OperatorError::ColumnNotFound(name) => {
317            Error::InvalidValue(format!("Column not found: {name}"))
318        }
319        OperatorError::Execution(msg) => Error::Internal(msg),
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// A mock operator that emits the given integers in chunks of `chunk_size`.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk.column_mut(0).unwrap();
                for i in self.position..end {
                    col.push_int64(self.values[i]);
                }
            }
            chunk.set_count(end - self.position);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Empty mock operator for testing empty results.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multi_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 10);
        for (row, expected) in result.rows.iter().zip(0_i64..) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_captures_column_types() {
        // `with_columns` declares `Any`, so the type comes from the first chunk.
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![7], 10);

        let result = executor.execute(&mut op).unwrap();
        assert!(result.column_types == vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_with_limit_across_chunks() {
        // Chunks of 3: the limit is reached part-way through the second chunk.
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        assert_eq!(result.rows[3][0], Value::Int64(3));
        assert_eq!(result.rows[4][0], Value::Int64(4));
    }

    #[test]
    fn test_executor_with_zero_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // Set a deadline that has already passed
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        // No deadline set - should execute normally
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}