Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5use std::time::Instant;
6
7use crate::config::AdaptiveConfig;
8use crate::database::QueryResult;
9use grafeo_common::types::{LogicalType, Value};
10use grafeo_common::utils::error::{Error, QueryError, Result};
11use grafeo_core::execution::operators::{Operator, OperatorError};
12use grafeo_core::execution::{
13    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
14};
15
16/// Executes a physical operator tree and collects results.
17pub struct Executor {
18    /// Column names for the result.
19    columns: Vec<String>,
20    /// Column types for the result.
21    column_types: Vec<LogicalType>,
22    /// Wall-clock deadline after which execution is aborted.
23    deadline: Option<Instant>,
24}
25
26impl Executor {
27    /// Creates a new executor.
28    #[must_use]
29    pub fn new() -> Self {
30        Self {
31            columns: Vec::new(),
32            column_types: Vec::new(),
33            deadline: None,
34        }
35    }
36
37    /// Creates an executor with specified column names.
38    #[must_use]
39    pub fn with_columns(columns: Vec<String>) -> Self {
40        let len = columns.len();
41        Self {
42            columns,
43            column_types: vec![LogicalType::Any; len],
44            deadline: None,
45        }
46    }
47
48    /// Creates an executor with specified column names and types.
49    #[must_use]
50    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
51        Self {
52            columns,
53            column_types,
54            deadline: None,
55        }
56    }
57
58    /// Sets a wall-clock deadline for query execution.
59    #[must_use]
60    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
61        self.deadline = deadline;
62        self
63    }
64
65    /// Checks whether the deadline has been exceeded.
66    fn check_deadline(&self) -> Result<()> {
67        if let Some(deadline) = self.deadline
68            && Instant::now() >= deadline
69        {
70            return Err(Error::Query(QueryError::timeout()));
71        }
72        Ok(())
73    }
74
75    /// Executes a physical operator and collects all results.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if operator execution fails or the query timeout is exceeded.
80    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
81        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
82        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
83
84        loop {
85            self.check_deadline()?;
86
87            match operator.next() {
88                Ok(Some(chunk)) => {
89                    // Capture column types from first non-empty chunk
90                    if !types_captured && chunk.column_count() > 0 {
91                        self.capture_column_types(&chunk, &mut result);
92                        types_captured = true;
93                    }
94                    self.collect_chunk(&chunk, &mut result)?;
95                }
96                Ok(None) => break,
97                Err(err) => return Err(convert_operator_error(err)),
98            }
99        }
100
101        Ok(result)
102    }
103
104    /// Executes and returns at most `limit` rows.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if operator execution fails or the query timeout is exceeded.
109    pub fn execute_with_limit(
110        &self,
111        operator: &mut dyn Operator,
112        limit: usize,
113    ) -> Result<QueryResult> {
114        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
115        let mut collected = 0;
116        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
117
118        loop {
119            if collected >= limit {
120                break;
121            }
122
123            self.check_deadline()?;
124
125            match operator.next() {
126                Ok(Some(chunk)) => {
127                    // Capture column types from first non-empty chunk
128                    if !types_captured && chunk.column_count() > 0 {
129                        self.capture_column_types(&chunk, &mut result);
130                        types_captured = true;
131                    }
132                    let remaining = limit - collected;
133                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
134                }
135                Ok(None) => break,
136                Err(err) => return Err(convert_operator_error(err)),
137            }
138        }
139
140        Ok(result)
141    }
142
143    /// Captures column types from a DataChunk.
144    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
145        let col_count = chunk.column_count();
146        result.column_types = Vec::with_capacity(col_count);
147        for col_idx in 0..col_count {
148            let col_type = chunk
149                .column(col_idx)
150                .map_or(LogicalType::Any, |col| col.data_type().clone());
151            result.column_types.push(col_type);
152        }
153    }
154
155    /// Collects all rows from a DataChunk into the result.
156    ///
157    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
158    /// (e.g., after filtering operations).
159    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
160        let col_count = chunk.column_count();
161        let mut collected = 0;
162
163        for row_idx in chunk.selected_indices() {
164            let mut row = Vec::with_capacity(col_count);
165            for col_idx in 0..col_count {
166                let value = chunk
167                    .column(col_idx)
168                    .and_then(|col| col.get_value(row_idx))
169                    .unwrap_or(Value::Null);
170                row.push(value);
171            }
172            result.rows.push(row);
173            collected += 1;
174        }
175
176        Ok(collected)
177    }
178
179    /// Collects up to `limit` rows from a DataChunk.
180    ///
181    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
182    /// (e.g., after filtering operations).
183    fn collect_chunk_limited(
184        &self,
185        chunk: &DataChunk,
186        result: &mut QueryResult,
187        limit: usize,
188    ) -> Result<usize> {
189        let col_count = chunk.column_count();
190        let mut collected = 0;
191
192        for row_idx in chunk.selected_indices() {
193            if collected >= limit {
194                break;
195            }
196            let mut row = Vec::with_capacity(col_count);
197            for col_idx in 0..col_count {
198                let value = chunk
199                    .column(col_idx)
200                    .and_then(|col| col.get_value(row_idx))
201                    .unwrap_or(Value::Null);
202                row.push(value);
203            }
204            result.rows.push(row);
205            collected += 1;
206        }
207
208        Ok(collected)
209    }
210
211    /// Executes a physical operator with adaptive cardinality tracking.
212    ///
213    /// This wraps the operator in a cardinality tracking layer and monitors
214    /// deviation from estimates during execution. The adaptive summary is
215    /// returned alongside the query result.
216    ///
217    /// # Arguments
218    ///
219    /// * `operator` - The root physical operator to execute
220    /// * `adaptive_context` - Context with cardinality estimates from planning
221    /// * `config` - Adaptive execution configuration
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if operator execution fails.
226    pub fn execute_adaptive(
227        &self,
228        operator: Box<dyn Operator>,
229        adaptive_context: Option<AdaptiveContext>,
230        config: &AdaptiveConfig,
231    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
232        // If adaptive is disabled or no context, fall back to normal execution
233        if !config.enabled {
234            let mut op = operator;
235            let result = self.execute(op.as_mut())?;
236            return Ok((result, None));
237        }
238
239        let Some(ctx) = adaptive_context else {
240            let mut op = operator;
241            let result = self.execute(op.as_mut())?;
242            return Ok((result, None));
243        };
244
245        // Create shared context for tracking
246        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
247            config.threshold,
248            config.min_rows,
249        ));
250
251        // Copy estimates from the planning context to the shared tracking context
252        for (op_id, checkpoint) in ctx.all_checkpoints() {
253            if let Some(mut inner) = shared_ctx.snapshot() {
254                inner.set_estimate(op_id, checkpoint.estimated);
255            }
256        }
257
258        // Wrap operator with tracking
259        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
260
261        // Execute with tracking
262        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
263        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
264        let mut total_rows: u64 = 0;
265        let check_interval = config.min_rows;
266
267        loop {
268            self.check_deadline()?;
269
270            match wrapped.next() {
271                Ok(Some(chunk)) => {
272                    let chunk_rows = chunk.row_count();
273                    total_rows += chunk_rows as u64;
274
275                    // Capture column types from first non-empty chunk
276                    if !types_captured && chunk.column_count() > 0 {
277                        self.capture_column_types(&chunk, &mut result);
278                        types_captured = true;
279                    }
280                    self.collect_chunk(&chunk, &mut result)?;
281
282                    // Periodically check for significant deviation
283                    if total_rows >= check_interval
284                        && total_rows.is_multiple_of(check_interval)
285                        && shared_ctx.should_reoptimize()
286                    {
287                        // For now, just log/note that re-optimization would trigger
288                        // Full re-optimization would require plan regeneration
289                        // which is a more invasive change
290                    }
291                }
292                Ok(None) => break,
293                Err(err) => return Err(convert_operator_error(err)),
294            }
295        }
296
297        // Get final summary
298        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
299
300        Ok((result, summary))
301    }
302}
303
304impl Default for Executor {
305    fn default() -> Self {
306        Self::new()
307    }
308}
309
310/// Converts an operator error to a common error.
311fn convert_operator_error(err: OperatorError) -> Error {
312    match err {
313        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
314        OperatorError::ColumnNotFound(name) => {
315            Error::InvalidValue(format!("Column not found: {name}"))
316        }
317        OperatorError::Execution(msg) => Error::Internal(msg),
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// A mock operator that generates chunks with integer data on demand.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk.column_mut(0).unwrap();
                for &value in &self.values[self.position..end] {
                    col.push_int64(value);
                }
            }
            chunk.set_count(end - self.position);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Empty mock operator for testing empty results.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Builds an executor whose deadline has already passed.
    fn expired_executor() -> Executor {
        use std::time::{Duration, Instant};

        Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ))
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks_preserve_order() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3, 4, 5], 2);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 5);
        for (idx, expected) in (1..=5).enumerate() {
            assert_eq!(result.rows[idx][0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_captures_column_types() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![7], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.column_types.len(), 1);
        assert!(result.column_types[0] == LogicalType::Int64);
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_limit_spans_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 3);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
        assert_eq!(result.rows[4][0], Value::Int64(4));
    }

    #[test]
    fn test_executor_zero_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_executor_timeout_expired() {
        let executor = expired_executor();
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_with_limit_timeout_expired() {
        let executor = expired_executor();
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        assert!(executor.execute_with_limit(&mut op, 2).is_err());
    }

    #[test]
    fn test_executor_no_timeout() {
        // No deadline set - should execute normally
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}