Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(feature = "algos")]
8pub mod user_procedure;
9
10use std::time::Instant;
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::types::{LogicalType, Value};
15use grafeo_common::utils::error::{Error, QueryError, Result};
16use grafeo_core::execution::operators::{Operator, OperatorError};
17use grafeo_core::execution::{
18    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
19};
20
21/// Executes a physical operator tree and collects results.
22pub struct Executor {
23    /// Column names for the result.
24    columns: Vec<String>,
25    /// Column types for the result.
26    column_types: Vec<LogicalType>,
27    /// Wall-clock deadline after which execution is aborted.
28    deadline: Option<Instant>,
29}
30
31impl Executor {
32    /// Creates a new executor.
33    #[must_use]
34    pub fn new() -> Self {
35        Self {
36            columns: Vec::new(),
37            column_types: Vec::new(),
38            deadline: None,
39        }
40    }
41
42    /// Creates an executor with specified column names.
43    #[must_use]
44    pub fn with_columns(columns: Vec<String>) -> Self {
45        let len = columns.len();
46        Self {
47            columns,
48            column_types: vec![LogicalType::Any; len],
49            deadline: None,
50        }
51    }
52
53    /// Creates an executor with specified column names and types.
54    #[must_use]
55    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
56        Self {
57            columns,
58            column_types,
59            deadline: None,
60        }
61    }
62
63    /// Sets a wall-clock deadline for query execution.
64    #[must_use]
65    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
66        self.deadline = deadline;
67        self
68    }
69
70    /// Checks whether the deadline has been exceeded.
71    fn check_deadline(&self) -> Result<()> {
72        #[cfg(not(target_arch = "wasm32"))]
73        if let Some(deadline) = self.deadline
74            && Instant::now() >= deadline
75        {
76            return Err(Error::Query(QueryError::timeout()));
77        }
78        Ok(())
79    }
80
81    /// Executes a physical operator and collects all results.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if operator execution fails or the query timeout is exceeded.
86    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
87        let _span = tracing::debug_span!("grafeo::query::execute").entered();
88        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
89        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
90
91        loop {
92            self.check_deadline()?;
93
94            match operator.next() {
95                Ok(Some(chunk)) => {
96                    // Capture column types from first non-empty chunk
97                    if !types_captured && chunk.column_count() > 0 {
98                        self.capture_column_types(&chunk, &mut result);
99                        types_captured = true;
100                    }
101                    self.collect_chunk(&chunk, &mut result)?;
102                }
103                Ok(None) => break,
104                Err(err) => return Err(convert_operator_error(err)),
105            }
106        }
107
108        Ok(result)
109    }
110
111    /// Executes and returns at most `limit` rows.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if operator execution fails or the query timeout is exceeded.
116    pub fn execute_with_limit(
117        &self,
118        operator: &mut dyn Operator,
119        limit: usize,
120    ) -> Result<QueryResult> {
121        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
122        let mut collected = 0;
123        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
124
125        loop {
126            if collected >= limit {
127                break;
128            }
129
130            self.check_deadline()?;
131
132            match operator.next() {
133                Ok(Some(chunk)) => {
134                    // Capture column types from first non-empty chunk
135                    if !types_captured && chunk.column_count() > 0 {
136                        self.capture_column_types(&chunk, &mut result);
137                        types_captured = true;
138                    }
139                    let remaining = limit - collected;
140                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
141                }
142                Ok(None) => break,
143                Err(err) => return Err(convert_operator_error(err)),
144            }
145        }
146
147        Ok(result)
148    }
149
150    /// Captures column types from a DataChunk.
151    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
152        let col_count = chunk.column_count();
153        result.column_types = Vec::with_capacity(col_count);
154        for col_idx in 0..col_count {
155            let col_type = chunk
156                .column(col_idx)
157                .map_or(LogicalType::Any, |col| col.data_type().clone());
158            result.column_types.push(col_type);
159        }
160    }
161
162    /// Collects all rows from a DataChunk into the result.
163    ///
164    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
165    /// (e.g., after filtering operations).
166    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
167        let col_count = chunk.column_count();
168        let mut collected = 0;
169
170        for row_idx in chunk.selected_indices() {
171            let mut row = Vec::with_capacity(col_count);
172            for col_idx in 0..col_count {
173                let value = chunk
174                    .column(col_idx)
175                    .and_then(|col| col.get_value(row_idx))
176                    .unwrap_or(Value::Null);
177                row.push(value);
178            }
179            result.rows.push(row);
180            collected += 1;
181        }
182
183        Ok(collected)
184    }
185
186    /// Collects up to `limit` rows from a DataChunk.
187    ///
188    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
189    /// (e.g., after filtering operations).
190    fn collect_chunk_limited(
191        &self,
192        chunk: &DataChunk,
193        result: &mut QueryResult,
194        limit: usize,
195    ) -> Result<usize> {
196        let col_count = chunk.column_count();
197        let mut collected = 0;
198
199        for row_idx in chunk.selected_indices() {
200            if collected >= limit {
201                break;
202            }
203            let mut row = Vec::with_capacity(col_count);
204            for col_idx in 0..col_count {
205                let value = chunk
206                    .column(col_idx)
207                    .and_then(|col| col.get_value(row_idx))
208                    .unwrap_or(Value::Null);
209                row.push(value);
210            }
211            result.rows.push(row);
212            collected += 1;
213        }
214
215        Ok(collected)
216    }
217
218    /// Executes a physical operator with adaptive cardinality tracking.
219    ///
220    /// This wraps the operator in a cardinality tracking layer and monitors
221    /// deviation from estimates during execution. The adaptive summary is
222    /// returned alongside the query result.
223    ///
224    /// # Arguments
225    ///
226    /// * `operator` - The root physical operator to execute
227    /// * `adaptive_context` - Context with cardinality estimates from planning
228    /// * `config` - Adaptive execution configuration
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if operator execution fails.
233    pub fn execute_adaptive(
234        &self,
235        operator: Box<dyn Operator>,
236        adaptive_context: Option<AdaptiveContext>,
237        config: &AdaptiveConfig,
238    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
239        // If adaptive is disabled or no context, fall back to normal execution
240        if !config.enabled {
241            let mut op = operator;
242            let result = self.execute(op.as_mut())?;
243            return Ok((result, None));
244        }
245
246        let Some(ctx) = adaptive_context else {
247            let mut op = operator;
248            let result = self.execute(op.as_mut())?;
249            return Ok((result, None));
250        };
251
252        // Create shared context for tracking
253        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
254            config.threshold,
255            config.min_rows,
256        ));
257
258        // Copy estimates from the planning context to the shared tracking context
259        for (op_id, checkpoint) in ctx.all_checkpoints() {
260            if let Some(mut inner) = shared_ctx.snapshot() {
261                inner.set_estimate(op_id, checkpoint.estimated);
262            }
263        }
264
265        // Wrap operator with tracking
266        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
267
268        // Execute with tracking
269        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
270        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
271        let mut total_rows: u64 = 0;
272        let check_interval = config.min_rows;
273
274        loop {
275            self.check_deadline()?;
276
277            match wrapped.next() {
278                Ok(Some(chunk)) => {
279                    let chunk_rows = chunk.row_count();
280                    total_rows += chunk_rows as u64;
281
282                    // Capture column types from first non-empty chunk
283                    if !types_captured && chunk.column_count() > 0 {
284                        self.capture_column_types(&chunk, &mut result);
285                        types_captured = true;
286                    }
287                    self.collect_chunk(&chunk, &mut result)?;
288
289                    // Periodically check for significant deviation
290                    if total_rows >= check_interval
291                        && total_rows.is_multiple_of(check_interval)
292                        && shared_ctx.should_reoptimize()
293                    {
294                        // For now, just log/note that re-optimization would trigger
295                        // Full re-optimization would require plan regeneration
296                        // which is a more invasive change
297                    }
298                }
299                Ok(None) => break,
300                Err(err) => return Err(convert_operator_error(err)),
301            }
302        }
303
304        // Get final summary
305        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
306
307        Ok((result, summary))
308    }
309}
310
311impl Default for Executor {
312    fn default() -> Self {
313        Self::new()
314    }
315}
316
317/// Converts an operator error to a common error.
318fn convert_operator_error(err: OperatorError) -> Error {
319    match err {
320        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
321        OperatorError::ColumnNotFound(name) => {
322            Error::InvalidValue(format!("Column not found: {name}"))
323        }
324        OperatorError::Execution(msg) => Error::Internal(msg),
325        OperatorError::ConstraintViolation(msg) => {
326            Error::InvalidValue(format!("Constraint violation: {msg}"))
327        }
328        OperatorError::WriteConflict(msg) => {
329            Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
330        }
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use grafeo_common::types::LogicalType;
338    use grafeo_core::execution::DataChunk;
339
340    /// A mock operator that generates chunks with integer data on demand.
341    struct MockIntOperator {
342        values: Vec<i64>,
343        position: usize,
344        chunk_size: usize,
345    }
346
347    impl MockIntOperator {
348        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
349            Self {
350                values,
351                position: 0,
352                chunk_size,
353            }
354        }
355    }
356
357    impl Operator for MockIntOperator {
358        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
359            if self.position >= self.values.len() {
360                return Ok(None);
361            }
362
363            let end = (self.position + self.chunk_size).min(self.values.len());
364            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
365
366            {
367                let col = chunk.column_mut(0).unwrap();
368                for i in self.position..end {
369                    col.push_int64(self.values[i]);
370                }
371            }
372            chunk.set_count(end - self.position);
373            self.position = end;
374
375            Ok(Some(chunk))
376        }
377
378        fn reset(&mut self) {
379            self.position = 0;
380        }
381
382        fn name(&self) -> &'static str {
383            "MockInt"
384        }
385    }
386
387    /// Empty mock operator for testing empty results.
388    struct EmptyOperator;
389
390    impl Operator for EmptyOperator {
391        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
392            Ok(None)
393        }
394
395        fn reset(&mut self) {}
396
397        fn name(&self) -> &'static str {
398            "Empty"
399        }
400    }
401
402    #[test]
403    fn test_executor_empty() {
404        let executor = Executor::with_columns(vec!["a".to_string()]);
405        let mut op = EmptyOperator;
406
407        let result = executor.execute(&mut op).unwrap();
408        assert!(result.is_empty());
409        assert_eq!(result.column_count(), 1);
410    }
411
412    #[test]
413    fn test_executor_single_chunk() {
414        let executor = Executor::with_columns(vec!["value".to_string()]);
415        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
416
417        let result = executor.execute(&mut op).unwrap();
418        assert_eq!(result.row_count(), 3);
419        assert_eq!(result.rows[0][0], Value::Int64(1));
420        assert_eq!(result.rows[1][0], Value::Int64(2));
421        assert_eq!(result.rows[2][0], Value::Int64(3));
422    }
423
424    #[test]
425    fn test_executor_with_limit() {
426        let executor = Executor::with_columns(vec!["value".to_string()]);
427        let mut op = MockIntOperator::new((0..10).collect(), 100);
428
429        let result = executor.execute_with_limit(&mut op, 5).unwrap();
430        assert_eq!(result.row_count(), 5);
431    }
432
433    #[test]
434    fn test_executor_timeout_expired() {
435        use std::time::{Duration, Instant};
436
437        // Set a deadline that has already passed
438        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
439            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
440        ));
441        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
442
443        let result = executor.execute(&mut op);
444        assert!(result.is_err());
445        let err = result.unwrap_err();
446        assert!(
447            err.to_string().contains("Query exceeded timeout"),
448            "Expected timeout error, got: {err}"
449        );
450    }
451
452    #[test]
453    fn test_executor_no_timeout() {
454        // No deadline set - should execute normally
455        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
456        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
457
458        let result = executor.execute(&mut op).unwrap();
459        assert_eq!(result.row_count(), 3);
460    }
461}