// grafeo_engine/query/executor/mod.rs

//! Query executor.
//!
//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::types::{LogicalType, Value};
15use grafeo_common::utils::error::{Error, QueryError, Result};
16use grafeo_core::execution::operators::{Operator, OperatorError};
17use grafeo_core::execution::{
18    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
19};
20
21/// Executes a physical operator tree and collects results.
22pub struct Executor {
23    /// Column names for the result.
24    columns: Vec<String>,
25    /// Column types for the result.
26    column_types: Vec<LogicalType>,
27    /// Wall-clock deadline after which execution is aborted.
28    deadline: Option<Instant>,
29}
30
31impl Executor {
32    /// Creates a new executor.
33    #[must_use]
34    pub fn new() -> Self {
35        Self {
36            columns: Vec::new(),
37            column_types: Vec::new(),
38            deadline: None,
39        }
40    }
41
42    /// Creates an executor with specified column names.
43    #[must_use]
44    pub fn with_columns(columns: Vec<String>) -> Self {
45        let len = columns.len();
46        Self {
47            columns,
48            column_types: vec![LogicalType::Any; len],
49            deadline: None,
50        }
51    }
52
53    /// Creates an executor with specified column names and types.
54    #[must_use]
55    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
56        Self {
57            columns,
58            column_types,
59            deadline: None,
60        }
61    }
62
63    /// Sets a wall-clock deadline for query execution.
64    #[must_use]
65    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
66        self.deadline = deadline;
67        self
68    }
69
70    /// Checks whether the deadline has been exceeded.
71    fn check_deadline(&self) -> Result<()> {
72        #[cfg(not(target_arch = "wasm32"))]
73        if let Some(deadline) = self.deadline
74            && Instant::now() >= deadline
75        {
76            return Err(Error::Query(QueryError::timeout()));
77        }
78        Ok(())
79    }
80
81    /// Executes a physical operator and collects all results.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if operator execution fails or the query timeout is exceeded.
86    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
87        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
88        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
89
90        loop {
91            self.check_deadline()?;
92
93            match operator.next() {
94                Ok(Some(chunk)) => {
95                    // Capture column types from first non-empty chunk
96                    if !types_captured && chunk.column_count() > 0 {
97                        self.capture_column_types(&chunk, &mut result);
98                        types_captured = true;
99                    }
100                    self.collect_chunk(&chunk, &mut result)?;
101                }
102                Ok(None) => break,
103                Err(err) => return Err(convert_operator_error(err)),
104            }
105        }
106
107        Ok(result)
108    }
109
110    /// Executes and returns at most `limit` rows.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if operator execution fails or the query timeout is exceeded.
115    pub fn execute_with_limit(
116        &self,
117        operator: &mut dyn Operator,
118        limit: usize,
119    ) -> Result<QueryResult> {
120        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
121        let mut collected = 0;
122        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
123
124        loop {
125            if collected >= limit {
126                break;
127            }
128
129            self.check_deadline()?;
130
131            match operator.next() {
132                Ok(Some(chunk)) => {
133                    // Capture column types from first non-empty chunk
134                    if !types_captured && chunk.column_count() > 0 {
135                        self.capture_column_types(&chunk, &mut result);
136                        types_captured = true;
137                    }
138                    let remaining = limit - collected;
139                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
140                }
141                Ok(None) => break,
142                Err(err) => return Err(convert_operator_error(err)),
143            }
144        }
145
146        Ok(result)
147    }
148
149    /// Captures column types from a DataChunk.
150    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
151        let col_count = chunk.column_count();
152        result.column_types = Vec::with_capacity(col_count);
153        for col_idx in 0..col_count {
154            let col_type = chunk
155                .column(col_idx)
156                .map_or(LogicalType::Any, |col| col.data_type().clone());
157            result.column_types.push(col_type);
158        }
159    }
160
161    /// Collects all rows from a DataChunk into the result.
162    ///
163    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
164    /// (e.g., after filtering operations).
165    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
166        let col_count = chunk.column_count();
167        let mut collected = 0;
168
169        for row_idx in chunk.selected_indices() {
170            let mut row = Vec::with_capacity(col_count);
171            for col_idx in 0..col_count {
172                let value = chunk
173                    .column(col_idx)
174                    .and_then(|col| col.get_value(row_idx))
175                    .unwrap_or(Value::Null);
176                row.push(value);
177            }
178            result.rows.push(row);
179            collected += 1;
180        }
181
182        Ok(collected)
183    }
184
185    /// Collects up to `limit` rows from a DataChunk.
186    ///
187    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
188    /// (e.g., after filtering operations).
189    fn collect_chunk_limited(
190        &self,
191        chunk: &DataChunk,
192        result: &mut QueryResult,
193        limit: usize,
194    ) -> Result<usize> {
195        let col_count = chunk.column_count();
196        let mut collected = 0;
197
198        for row_idx in chunk.selected_indices() {
199            if collected >= limit {
200                break;
201            }
202            let mut row = Vec::with_capacity(col_count);
203            for col_idx in 0..col_count {
204                let value = chunk
205                    .column(col_idx)
206                    .and_then(|col| col.get_value(row_idx))
207                    .unwrap_or(Value::Null);
208                row.push(value);
209            }
210            result.rows.push(row);
211            collected += 1;
212        }
213
214        Ok(collected)
215    }
216
217    /// Executes a physical operator with adaptive cardinality tracking.
218    ///
219    /// This wraps the operator in a cardinality tracking layer and monitors
220    /// deviation from estimates during execution. The adaptive summary is
221    /// returned alongside the query result.
222    ///
223    /// # Arguments
224    ///
225    /// * `operator` - The root physical operator to execute
226    /// * `adaptive_context` - Context with cardinality estimates from planning
227    /// * `config` - Adaptive execution configuration
228    ///
229    /// # Errors
230    ///
231    /// Returns an error if operator execution fails.
232    pub fn execute_adaptive(
233        &self,
234        operator: Box<dyn Operator>,
235        adaptive_context: Option<AdaptiveContext>,
236        config: &AdaptiveConfig,
237    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
238        // If adaptive is disabled or no context, fall back to normal execution
239        if !config.enabled {
240            let mut op = operator;
241            let result = self.execute(op.as_mut())?;
242            return Ok((result, None));
243        }
244
245        let Some(ctx) = adaptive_context else {
246            let mut op = operator;
247            let result = self.execute(op.as_mut())?;
248            return Ok((result, None));
249        };
250
251        // Create shared context for tracking
252        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
253            config.threshold,
254            config.min_rows,
255        ));
256
257        // Copy estimates from the planning context to the shared tracking context
258        for (op_id, checkpoint) in ctx.all_checkpoints() {
259            if let Some(mut inner) = shared_ctx.snapshot() {
260                inner.set_estimate(op_id, checkpoint.estimated);
261            }
262        }
263
264        // Wrap operator with tracking
265        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
266
267        // Execute with tracking
268        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
269        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
270        let mut total_rows: u64 = 0;
271        let check_interval = config.min_rows;
272
273        loop {
274            self.check_deadline()?;
275
276            match wrapped.next() {
277                Ok(Some(chunk)) => {
278                    let chunk_rows = chunk.row_count();
279                    total_rows += chunk_rows as u64;
280
281                    // Capture column types from first non-empty chunk
282                    if !types_captured && chunk.column_count() > 0 {
283                        self.capture_column_types(&chunk, &mut result);
284                        types_captured = true;
285                    }
286                    self.collect_chunk(&chunk, &mut result)?;
287
288                    // Periodically check for significant deviation
289                    if total_rows >= check_interval
290                        && total_rows.is_multiple_of(check_interval)
291                        && shared_ctx.should_reoptimize()
292                    {
293                        // For now, just log/note that re-optimization would trigger
294                        // Full re-optimization would require plan regeneration
295                        // which is a more invasive change
296                    }
297                }
298                Ok(None) => break,
299                Err(err) => return Err(convert_operator_error(err)),
300            }
301        }
302
303        // Get final summary
304        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
305
306        Ok((result, summary))
307    }
308}
309
310impl Default for Executor {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
316/// Converts an operator error to a common error.
317fn convert_operator_error(err: OperatorError) -> Error {
318    match err {
319        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
320        OperatorError::ColumnNotFound(name) => {
321            Error::InvalidValue(format!("Column not found: {name}"))
322        }
323        OperatorError::Execution(msg) => Error::Internal(msg),
324        OperatorError::ConstraintViolation(msg) => {
325            Error::InvalidValue(format!("Constraint violation: {msg}"))
326        }
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// Test operator that serves a fixed list of integers in chunks of at most
    /// `chunk_size` values.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            // Everything not yet handed out; empty once the input is exhausted.
            let pending = self.values.get(self.position..).unwrap_or(&[]);
            if pending.is_empty() {
                return Ok(None);
            }

            let batch = &pending[..pending.len().min(self.chunk_size)];
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            let column = chunk.column_mut(0).unwrap();
            for &value in batch {
                column.push_int64(value);
            }
            chunk.set_count(batch.len());
            self.position += batch.len();

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Test operator that never yields a chunk.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    /// Executor with a single column named `value`, shared by most tests.
    fn value_executor() -> Executor {
        Executor::with_columns(vec!["value".to_string()])
    }

    #[test]
    fn test_executor_empty() {
        let mut op = EmptyOperator;
        let executor = Executor::with_columns(vec!["a".to_string()]);

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = value_executor().execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        for (row, expected) in result.rows.iter().zip([1, 2, 3]) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_with_limit() {
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = value_executor().execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // A deadline one second in the past is already expired when execution starts.
        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let executor = value_executor().with_deadline(Some(expired));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let outcome = executor.execute(&mut op);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        // Without a deadline the query runs to completion.
        let executor = value_executor().with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}