Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::types::{LogicalType, Value};
15use grafeo_common::utils::error::{Error, QueryError, Result};
16use grafeo_core::execution::operators::{Operator, OperatorError};
17use grafeo_core::execution::{
18    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
19};
20
21/// Executes a physical operator tree and collects results.
22pub struct Executor {
23    /// Column names for the result.
24    columns: Vec<String>,
25    /// Column types for the result.
26    column_types: Vec<LogicalType>,
27    /// Wall-clock deadline after which execution is aborted.
28    deadline: Option<Instant>,
29}
30
31impl Executor {
32    /// Creates a new executor.
33    #[must_use]
34    pub fn new() -> Self {
35        Self {
36            columns: Vec::new(),
37            column_types: Vec::new(),
38            deadline: None,
39        }
40    }
41
42    /// Creates an executor with specified column names.
43    #[must_use]
44    pub fn with_columns(columns: Vec<String>) -> Self {
45        let len = columns.len();
46        Self {
47            columns,
48            column_types: vec![LogicalType::Any; len],
49            deadline: None,
50        }
51    }
52
53    /// Creates an executor with specified column names and types.
54    #[must_use]
55    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
56        Self {
57            columns,
58            column_types,
59            deadline: None,
60        }
61    }
62
63    /// Sets a wall-clock deadline for query execution.
64    #[must_use]
65    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
66        self.deadline = deadline;
67        self
68    }
69
70    /// Checks whether the deadline has been exceeded.
71    fn check_deadline(&self) -> Result<()> {
72        if let Some(deadline) = self.deadline
73            && Instant::now() >= deadline
74        {
75            return Err(Error::Query(QueryError::timeout()));
76        }
77        Ok(())
78    }
79
80    /// Executes a physical operator and collects all results.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if operator execution fails or the query timeout is exceeded.
85    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
86        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
87        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
88
89        loop {
90            self.check_deadline()?;
91
92            match operator.next() {
93                Ok(Some(chunk)) => {
94                    // Capture column types from first non-empty chunk
95                    if !types_captured && chunk.column_count() > 0 {
96                        self.capture_column_types(&chunk, &mut result);
97                        types_captured = true;
98                    }
99                    self.collect_chunk(&chunk, &mut result)?;
100                }
101                Ok(None) => break,
102                Err(err) => return Err(convert_operator_error(err)),
103            }
104        }
105
106        Ok(result)
107    }
108
109    /// Executes and returns at most `limit` rows.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if operator execution fails or the query timeout is exceeded.
114    pub fn execute_with_limit(
115        &self,
116        operator: &mut dyn Operator,
117        limit: usize,
118    ) -> Result<QueryResult> {
119        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
120        let mut collected = 0;
121        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
122
123        loop {
124            if collected >= limit {
125                break;
126            }
127
128            self.check_deadline()?;
129
130            match operator.next() {
131                Ok(Some(chunk)) => {
132                    // Capture column types from first non-empty chunk
133                    if !types_captured && chunk.column_count() > 0 {
134                        self.capture_column_types(&chunk, &mut result);
135                        types_captured = true;
136                    }
137                    let remaining = limit - collected;
138                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
139                }
140                Ok(None) => break,
141                Err(err) => return Err(convert_operator_error(err)),
142            }
143        }
144
145        Ok(result)
146    }
147
148    /// Captures column types from a DataChunk.
149    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
150        let col_count = chunk.column_count();
151        result.column_types = Vec::with_capacity(col_count);
152        for col_idx in 0..col_count {
153            let col_type = chunk
154                .column(col_idx)
155                .map_or(LogicalType::Any, |col| col.data_type().clone());
156            result.column_types.push(col_type);
157        }
158    }
159
160    /// Collects all rows from a DataChunk into the result.
161    ///
162    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
163    /// (e.g., after filtering operations).
164    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
165        let col_count = chunk.column_count();
166        let mut collected = 0;
167
168        for row_idx in chunk.selected_indices() {
169            let mut row = Vec::with_capacity(col_count);
170            for col_idx in 0..col_count {
171                let value = chunk
172                    .column(col_idx)
173                    .and_then(|col| col.get_value(row_idx))
174                    .unwrap_or(Value::Null);
175                row.push(value);
176            }
177            result.rows.push(row);
178            collected += 1;
179        }
180
181        Ok(collected)
182    }
183
184    /// Collects up to `limit` rows from a DataChunk.
185    ///
186    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
187    /// (e.g., after filtering operations).
188    fn collect_chunk_limited(
189        &self,
190        chunk: &DataChunk,
191        result: &mut QueryResult,
192        limit: usize,
193    ) -> Result<usize> {
194        let col_count = chunk.column_count();
195        let mut collected = 0;
196
197        for row_idx in chunk.selected_indices() {
198            if collected >= limit {
199                break;
200            }
201            let mut row = Vec::with_capacity(col_count);
202            for col_idx in 0..col_count {
203                let value = chunk
204                    .column(col_idx)
205                    .and_then(|col| col.get_value(row_idx))
206                    .unwrap_or(Value::Null);
207                row.push(value);
208            }
209            result.rows.push(row);
210            collected += 1;
211        }
212
213        Ok(collected)
214    }
215
216    /// Executes a physical operator with adaptive cardinality tracking.
217    ///
218    /// This wraps the operator in a cardinality tracking layer and monitors
219    /// deviation from estimates during execution. The adaptive summary is
220    /// returned alongside the query result.
221    ///
222    /// # Arguments
223    ///
224    /// * `operator` - The root physical operator to execute
225    /// * `adaptive_context` - Context with cardinality estimates from planning
226    /// * `config` - Adaptive execution configuration
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if operator execution fails.
231    pub fn execute_adaptive(
232        &self,
233        operator: Box<dyn Operator>,
234        adaptive_context: Option<AdaptiveContext>,
235        config: &AdaptiveConfig,
236    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
237        // If adaptive is disabled or no context, fall back to normal execution
238        if !config.enabled {
239            let mut op = operator;
240            let result = self.execute(op.as_mut())?;
241            return Ok((result, None));
242        }
243
244        let Some(ctx) = adaptive_context else {
245            let mut op = operator;
246            let result = self.execute(op.as_mut())?;
247            return Ok((result, None));
248        };
249
250        // Create shared context for tracking
251        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
252            config.threshold,
253            config.min_rows,
254        ));
255
256        // Copy estimates from the planning context to the shared tracking context
257        for (op_id, checkpoint) in ctx.all_checkpoints() {
258            if let Some(mut inner) = shared_ctx.snapshot() {
259                inner.set_estimate(op_id, checkpoint.estimated);
260            }
261        }
262
263        // Wrap operator with tracking
264        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
265
266        // Execute with tracking
267        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
268        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
269        let mut total_rows: u64 = 0;
270        let check_interval = config.min_rows;
271
272        loop {
273            self.check_deadline()?;
274
275            match wrapped.next() {
276                Ok(Some(chunk)) => {
277                    let chunk_rows = chunk.row_count();
278                    total_rows += chunk_rows as u64;
279
280                    // Capture column types from first non-empty chunk
281                    if !types_captured && chunk.column_count() > 0 {
282                        self.capture_column_types(&chunk, &mut result);
283                        types_captured = true;
284                    }
285                    self.collect_chunk(&chunk, &mut result)?;
286
287                    // Periodically check for significant deviation
288                    if total_rows >= check_interval
289                        && total_rows.is_multiple_of(check_interval)
290                        && shared_ctx.should_reoptimize()
291                    {
292                        // For now, just log/note that re-optimization would trigger
293                        // Full re-optimization would require plan regeneration
294                        // which is a more invasive change
295                    }
296                }
297                Ok(None) => break,
298                Err(err) => return Err(convert_operator_error(err)),
299            }
300        }
301
302        // Get final summary
303        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
304
305        Ok((result, summary))
306    }
307}
308
309impl Default for Executor {
310    fn default() -> Self {
311        Self::new()
312    }
313}
314
315/// Converts an operator error to a common error.
316fn convert_operator_error(err: OperatorError) -> Error {
317    match err {
318        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
319        OperatorError::ColumnNotFound(name) => {
320            Error::InvalidValue(format!("Column not found: {name}"))
321        }
322        OperatorError::Execution(msg) => Error::Internal(msg),
323        OperatorError::ConstraintViolation(msg) => {
324            Error::InvalidValue(format!("Constraint violation: {msg}"))
325        }
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// A mock operator that generates chunks with integer data on demand.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        /// Builds a mock emitting `values` in chunks of `chunk_size` rows.
        ///
        /// A `chunk_size` of 0 is bumped to 1 so the mock always makes progress
        /// (otherwise it would emit empty chunks forever).
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size: chunk_size.max(1),
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk.column_mut(0).unwrap();
                for &value in &self.values[self.position..end] {
                    col.push_int64(value);
                }
            }
            chunk.set_count(end - self.position);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Empty mock operator for testing empty results.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks_preserve_order() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..25).collect(), 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 25);
        for (expected, row) in (0_i64..).zip(result.rows.iter()) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_limit_spans_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        assert_eq!(result.rows[4][0], Value::Int64(4));
    }

    #[test]
    fn test_executor_limit_zero_does_not_pull() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert_eq!(result.row_count(), 0);
        assert_eq!(op.position, 0, "operator must not be pulled when limit is 0");
    }

    #[test]
    fn test_executor_limit_larger_than_input() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 2);

        let result = executor.execute_with_limit(&mut op, 100).unwrap();
        assert_eq!(result.row_count(), 3);
    }

    #[test]
    fn test_executor_captures_column_types() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![7], 10);

        let result = executor.execute(&mut op).unwrap();
        assert!(result.column_types == vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // Set a deadline that has already passed
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_with_limit_timeout_expired() {
        use std::time::{Duration, Instant};

        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let err = executor.execute_with_limit(&mut op, 2).unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        // No deadline set - should execute normally
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}