Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::grafeo_debug_span;
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, QueryError, Result};
17use grafeo_core::execution::operators::{Operator, OperatorError};
18use grafeo_core::execution::{
19    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
20};
21
22/// Executes a physical operator tree and collects results.
23pub struct Executor {
24    /// Column names for the result.
25    columns: Vec<String>,
26    /// Column types for the result.
27    column_types: Vec<LogicalType>,
28    /// Wall-clock deadline after which execution is aborted.
29    deadline: Option<Instant>,
30}
31
32impl Executor {
33    /// Creates a new executor.
34    #[must_use]
35    pub fn new() -> Self {
36        Self {
37            columns: Vec::new(),
38            column_types: Vec::new(),
39            deadline: None,
40        }
41    }
42
43    /// Creates an executor with specified column names.
44    #[must_use]
45    pub fn with_columns(columns: Vec<String>) -> Self {
46        let len = columns.len();
47        Self {
48            columns,
49            column_types: vec![LogicalType::Any; len],
50            deadline: None,
51        }
52    }
53
54    /// Creates an executor with specified column names and types.
55    #[must_use]
56    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
57        Self {
58            columns,
59            column_types,
60            deadline: None,
61        }
62    }
63
64    /// Sets a wall-clock deadline for query execution.
65    #[must_use]
66    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
67        self.deadline = deadline;
68        self
69    }
70
71    /// Checks whether the deadline has been exceeded.
72    fn check_deadline(&self) -> Result<()> {
73        #[cfg(not(target_arch = "wasm32"))]
74        if let Some(deadline) = self.deadline
75            && Instant::now() >= deadline
76        {
77            return Err(Error::Query(QueryError::timeout()));
78        }
79        Ok(())
80    }
81
82    /// Executes a physical operator and collects all results.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if operator execution fails or the query timeout is exceeded.
87    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
88        let _span = grafeo_debug_span!("grafeo::query::execute");
89        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
90        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
91
92        loop {
93            self.check_deadline()?;
94
95            match operator.next() {
96                Ok(Some(chunk)) => {
97                    // Capture column types from first non-empty chunk
98                    if !types_captured && chunk.column_count() > 0 {
99                        self.capture_column_types(&chunk, &mut result);
100                        types_captured = true;
101                    }
102                    self.collect_chunk(&chunk, &mut result)?;
103                }
104                Ok(None) => break,
105                Err(err) => return Err(convert_operator_error(err)),
106            }
107        }
108
109        Ok(result)
110    }
111
112    /// Executes and returns at most `limit` rows.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if operator execution fails or the query timeout is exceeded.
117    pub fn execute_with_limit(
118        &self,
119        operator: &mut dyn Operator,
120        limit: usize,
121    ) -> Result<QueryResult> {
122        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
123        let mut collected = 0;
124        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
125
126        loop {
127            if collected >= limit {
128                break;
129            }
130
131            self.check_deadline()?;
132
133            match operator.next() {
134                Ok(Some(chunk)) => {
135                    // Capture column types from first non-empty chunk
136                    if !types_captured && chunk.column_count() > 0 {
137                        self.capture_column_types(&chunk, &mut result);
138                        types_captured = true;
139                    }
140                    let remaining = limit - collected;
141                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
142                }
143                Ok(None) => break,
144                Err(err) => return Err(convert_operator_error(err)),
145            }
146        }
147
148        Ok(result)
149    }
150
151    /// Captures column types from a DataChunk.
152    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
153        let col_count = chunk.column_count();
154        result.column_types = Vec::with_capacity(col_count);
155        for col_idx in 0..col_count {
156            let col_type = chunk
157                .column(col_idx)
158                .map_or(LogicalType::Any, |col| col.data_type().clone());
159            result.column_types.push(col_type);
160        }
161    }
162
163    /// Collects all rows from a DataChunk into the result.
164    ///
165    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
166    /// (e.g., after filtering operations).
167    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
168        let col_count = chunk.column_count();
169        let mut collected = 0;
170
171        for row_idx in chunk.selected_indices() {
172            let mut row = Vec::with_capacity(col_count);
173            for col_idx in 0..col_count {
174                let value = chunk
175                    .column(col_idx)
176                    .and_then(|col| col.get_value(row_idx))
177                    .unwrap_or(Value::Null);
178                row.push(value);
179            }
180            result.rows.push(row);
181            collected += 1;
182        }
183
184        Ok(collected)
185    }
186
187    /// Collects up to `limit` rows from a DataChunk.
188    ///
189    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
190    /// (e.g., after filtering operations).
191    fn collect_chunk_limited(
192        &self,
193        chunk: &DataChunk,
194        result: &mut QueryResult,
195        limit: usize,
196    ) -> Result<usize> {
197        let col_count = chunk.column_count();
198        let mut collected = 0;
199
200        for row_idx in chunk.selected_indices() {
201            if collected >= limit {
202                break;
203            }
204            let mut row = Vec::with_capacity(col_count);
205            for col_idx in 0..col_count {
206                let value = chunk
207                    .column(col_idx)
208                    .and_then(|col| col.get_value(row_idx))
209                    .unwrap_or(Value::Null);
210                row.push(value);
211            }
212            result.rows.push(row);
213            collected += 1;
214        }
215
216        Ok(collected)
217    }
218
219    /// Executes a physical operator with adaptive cardinality tracking.
220    ///
221    /// This wraps the operator in a cardinality tracking layer and monitors
222    /// deviation from estimates during execution. The adaptive summary is
223    /// returned alongside the query result.
224    ///
225    /// # Arguments
226    ///
227    /// * `operator` - The root physical operator to execute
228    /// * `adaptive_context` - Context with cardinality estimates from planning
229    /// * `config` - Adaptive execution configuration
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if operator execution fails.
234    pub fn execute_adaptive(
235        &self,
236        operator: Box<dyn Operator>,
237        adaptive_context: Option<AdaptiveContext>,
238        config: &AdaptiveConfig,
239    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
240        // If adaptive is disabled or no context, fall back to normal execution
241        if !config.enabled {
242            let mut op = operator;
243            let result = self.execute(op.as_mut())?;
244            return Ok((result, None));
245        }
246
247        let Some(ctx) = adaptive_context else {
248            let mut op = operator;
249            let result = self.execute(op.as_mut())?;
250            return Ok((result, None));
251        };
252
253        // Create shared context for tracking
254        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
255            config.threshold,
256            config.min_rows,
257        ));
258
259        // Copy estimates from the planning context to the shared tracking context
260        for (op_id, checkpoint) in ctx.all_checkpoints() {
261            if let Some(mut inner) = shared_ctx.snapshot() {
262                inner.set_estimate(op_id, checkpoint.estimated);
263            }
264        }
265
266        // Wrap operator with tracking
267        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
268
269        // Execute with tracking
270        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
271        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
272        let mut total_rows: u64 = 0;
273        let check_interval = config.min_rows;
274
275        loop {
276            self.check_deadline()?;
277
278            match wrapped.next() {
279                Ok(Some(chunk)) => {
280                    let chunk_rows = chunk.row_count();
281                    total_rows += chunk_rows as u64;
282
283                    // Capture column types from first non-empty chunk
284                    if !types_captured && chunk.column_count() > 0 {
285                        self.capture_column_types(&chunk, &mut result);
286                        types_captured = true;
287                    }
288                    self.collect_chunk(&chunk, &mut result)?;
289
290                    // Periodically check for significant deviation
291                    if total_rows >= check_interval
292                        && total_rows.is_multiple_of(check_interval)
293                        && shared_ctx.should_reoptimize()
294                    {
295                        // For now, just log/note that re-optimization would trigger
296                        // Full re-optimization would require plan regeneration
297                        // which is a more invasive change
298                    }
299                }
300                Ok(None) => break,
301                Err(err) => return Err(convert_operator_error(err)),
302            }
303        }
304
305        // Get final summary
306        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
307
308        Ok((result, summary))
309    }
310}
311
312impl Default for Executor {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318/// Converts an operator error to a common error.
319fn convert_operator_error(err: OperatorError) -> Error {
320    match err {
321        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
322        OperatorError::ColumnNotFound(name) => {
323            Error::InvalidValue(format!("Column not found: {name}"))
324        }
325        OperatorError::Execution(msg) => Error::Internal(msg),
326        OperatorError::ConstraintViolation(msg) => {
327            Error::InvalidValue(format!("Constraint violation: {msg}"))
328        }
329        OperatorError::WriteConflict(msg) => {
330            Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
331        }
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::LogicalType;
    use grafeo_core::execution::DataChunk;

    /// A mock operator that generates chunks with integer data on demand.
    struct MockIntOperator {
        values: Vec<i64>,
        position: usize,
        chunk_size: usize,
    }

    impl MockIntOperator {
        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
            Self {
                values,
                position: 0,
                chunk_size,
            }
        }
    }

    impl Operator for MockIntOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            if self.position >= self.values.len() {
                return Ok(None);
            }

            let end = (self.position + self.chunk_size).min(self.values.len());
            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);

            {
                let col = chunk.column_mut(0).unwrap();
                for &value in &self.values[self.position..end] {
                    col.push_int64(value);
                }
            }
            chunk.set_count(end - self.position);
            self.position = end;

            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockInt"
        }
    }

    /// Empty mock operator for testing empty results.
    struct EmptyOperator;

    impl Operator for EmptyOperator {
        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "Empty"
        }
    }

    #[test]
    fn test_executor_empty() {
        let executor = Executor::with_columns(vec!["a".to_string()]);
        let mut op = EmptyOperator;

        let result = executor.execute(&mut op).unwrap();
        assert!(result.is_empty());
        assert_eq!(result.column_count(), 1);
    }

    #[test]
    fn test_executor_single_chunk() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
        assert_eq!(result.rows[0][0], Value::Int64(1));
        assert_eq!(result.rows[1][0], Value::Int64(2));
        assert_eq!(result.rows[2][0], Value::Int64(3));
    }

    #[test]
    fn test_executor_multiple_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..25).collect(), 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 25);
        for (expected, row) in (0_i64..).zip(result.rows.iter()) {
            assert_eq!(row[0], Value::Int64(expected));
        }
    }

    #[test]
    fn test_executor_captures_column_types() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![7], 10);

        let result = executor.execute(&mut op).unwrap();
        assert!(result.column_types == vec![LogicalType::Int64]);
    }

    #[test]
    fn test_executor_with_limit() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..10).collect(), 100);

        let result = executor.execute_with_limit(&mut op, 5).unwrap();
        assert_eq!(result.row_count(), 5);
    }

    #[test]
    fn test_executor_with_limit_across_chunks() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new((0..25).collect(), 10);

        let result = executor.execute_with_limit(&mut op, 15).unwrap();
        assert_eq!(result.row_count(), 15);
        assert_eq!(result.rows[14][0], Value::Int64(14));
        // The limit is reached inside the second chunk; the third is never pulled.
        assert_eq!(op.position, 20);
    }

    #[test]
    fn test_executor_with_zero_limit_pulls_nothing() {
        let executor = Executor::with_columns(vec!["value".to_string()]);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute_with_limit(&mut op, 0).unwrap();
        assert!(result.is_empty());
        assert_eq!(op.position, 0);
    }

    // The deadline check is compiled out on wasm32, so a timeout cannot occur there.
    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_executor_timeout_expired() {
        use std::time::{Duration, Instant};

        // Set a deadline that has already passed
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
        ));
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_executor_no_timeout() {
        // No deadline set - should execute normally
        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);

        let result = executor.execute(&mut op).unwrap();
        assert_eq!(result.row_count(), 3);
    }
}