Skip to main content

grafeo_engine/query/executor/
mod.rs

1//! Query executor.
2//!
3//! Executes physical plans and produces results.
4
5#[cfg(feature = "algos")]
6pub mod procedure_call;
7#[cfg(all(feature = "algos", feature = "gql"))]
8pub mod user_procedure;
9
10use std::time::{Duration, Instant};
11
12use crate::config::AdaptiveConfig;
13use crate::database::QueryResult;
14use grafeo_common::grafeo_debug_span;
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, QueryError, Result};
17use grafeo_core::execution::operators::{Operator, OperatorError};
18use grafeo_core::execution::{
19    AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, Pipeline,
20    SharedAdaptiveContext,
21};
22
23/// Executes a physical operator tree and collects results.
24pub struct Executor {
25    /// Column names for the result.
26    columns: Vec<String>,
27    /// Column types for the result.
28    column_types: Vec<LogicalType>,
29    /// Wall-clock deadline after which execution is aborted.
30    deadline: Option<Instant>,
31    /// The configured timeout duration (for error messages).
32    query_timeout: Option<Duration>,
33}
34
35impl Executor {
36    /// Creates a new executor.
37    #[must_use]
38    pub fn new() -> Self {
39        Self {
40            columns: Vec::new(),
41            column_types: Vec::new(),
42            deadline: None,
43            query_timeout: None,
44        }
45    }
46
47    /// Creates an executor with specified column names.
48    #[must_use]
49    pub fn with_columns(columns: Vec<String>) -> Self {
50        let len = columns.len();
51        Self {
52            columns,
53            column_types: vec![LogicalType::Any; len],
54            deadline: None,
55            query_timeout: None,
56        }
57    }
58
59    /// Creates an executor with specified column names and types.
60    #[must_use]
61    pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
62        Self {
63            columns,
64            column_types,
65            deadline: None,
66            query_timeout: None,
67        }
68    }
69
70    /// Sets a wall-clock deadline for query execution.
71    #[must_use]
72    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
73        self.deadline = deadline;
74        self
75    }
76
77    /// Sets the original timeout duration (used for error messages).
78    #[must_use]
79    pub fn with_timeout_duration(mut self, timeout: Option<Duration>) -> Self {
80        self.query_timeout = timeout;
81        self
82    }
83
84    /// Checks whether the deadline has been exceeded.
85    fn check_deadline(&self) -> Result<()> {
86        #[cfg(not(target_arch = "wasm32"))]
87        if let Some(deadline) = self.deadline
88            && Instant::now() >= deadline
89        {
90            return Err(Error::Query(match self.query_timeout {
91                Some(d) => QueryError::timeout_with_limit(d),
92                None => QueryError::timeout(),
93            }));
94        }
95        Ok(())
96    }
97
98    /// Executes a physical operator and collects all results.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if operator execution fails or the query timeout is exceeded.
103    pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
104        let _span = grafeo_debug_span!("grafeo::query::execute");
105        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
106        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
107
108        loop {
109            self.check_deadline()?;
110
111            match operator.next() {
112                Ok(Some(chunk)) => {
113                    // Capture column types from first non-empty chunk
114                    if !types_captured && chunk.column_count() > 0 {
115                        self.capture_column_types(&chunk, &mut result);
116                        types_captured = true;
117                    }
118                    self.collect_chunk(&chunk, &mut result)?;
119                }
120                Ok(None) => break,
121                Err(err) => return Err(convert_operator_error(err)),
122            }
123        }
124
125        Ok(result)
126    }
127
128    /// Executes a push-based pipeline.
129    ///
130    /// The source operator is wrapped in `OperatorSource`, push operators form
131    /// the pipeline body, and a `ChunkCollector` gathers results.
132    ///
133    /// # Panics
134    ///
135    /// Panics if the internal sink downcast fails (should never happen since we
136    /// create the `ChunkCollector` ourselves).
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if pipeline execution fails or the query timeout is exceeded.
141    pub fn execute_pipeline(
142        &self,
143        source: Box<dyn Operator>,
144        push_ops: Vec<Box<dyn grafeo_core::execution::pipeline::PushOperator>>,
145    ) -> Result<QueryResult> {
146        use grafeo_core::execution::{ChunkCollector, OperatorSource};
147
148        let _span = grafeo_debug_span!("grafeo::query::execute_pipeline");
149
150        let source = Box::new(OperatorSource::new(source));
151        let collector = ChunkCollector::new();
152
153        // Build and execute the pipeline with deadline enforcement
154        let mut pipeline = Pipeline::new(source, push_ops, Box::new(collector));
155        pipeline.set_deadline(self.deadline);
156        pipeline.execute().map_err(convert_operator_error)?;
157
158        // Extract the sink (ChunkCollector) and get the chunks
159        // Safety: we know the sink is a ChunkCollector because we just created it
160        let sink_box = pipeline.into_sink();
161        let any_sink: Box<dyn std::any::Any> = sink_box.into_any();
162        let collector = any_sink
163            .downcast::<ChunkCollector>()
164            .expect("sink should be ChunkCollector");
165        let chunks = collector.into_chunks();
166
167        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
168        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
169
170        for chunk in &chunks {
171            if !types_captured && chunk.column_count() > 0 {
172                self.capture_column_types(chunk, &mut result);
173                types_captured = true;
174            }
175            self.collect_chunk(chunk, &mut result)?;
176        }
177
178        Ok(result)
179    }
180
181    /// Executes and returns at most `limit` rows.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if operator execution fails or the query timeout is exceeded.
186    pub fn execute_with_limit(
187        &self,
188        operator: &mut dyn Operator,
189        limit: usize,
190    ) -> Result<QueryResult> {
191        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
192        let mut collected = 0;
193        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
194
195        loop {
196            if collected >= limit {
197                break;
198            }
199
200            self.check_deadline()?;
201
202            match operator.next() {
203                Ok(Some(chunk)) => {
204                    // Capture column types from first non-empty chunk
205                    if !types_captured && chunk.column_count() > 0 {
206                        self.capture_column_types(&chunk, &mut result);
207                        types_captured = true;
208                    }
209                    let remaining = limit - collected;
210                    collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
211                }
212                Ok(None) => break,
213                Err(err) => return Err(convert_operator_error(err)),
214            }
215        }
216
217        Ok(result)
218    }
219
220    /// Captures column types from a DataChunk.
221    fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
222        let col_count = chunk.column_count();
223        result.column_types = Vec::with_capacity(col_count);
224        for col_idx in 0..col_count {
225            let col_type = chunk
226                .column(col_idx)
227                .map_or(LogicalType::Any, |col| col.data_type().clone());
228            result.column_types.push(col_type);
229        }
230    }
231
232    /// Collects all rows from a DataChunk into the result.
233    ///
234    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
235    /// (e.g., after filtering operations).
236    fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
237        let col_count = chunk.column_count();
238        let mut collected = 0;
239
240        for row_idx in chunk.selected_indices() {
241            let mut row = Vec::with_capacity(col_count);
242            for col_idx in 0..col_count {
243                let value = chunk
244                    .column(col_idx)
245                    .and_then(|col| col.get_value(row_idx))
246                    .unwrap_or(Value::Null);
247                row.push(value);
248            }
249            result.rows.push(row);
250            collected += 1;
251        }
252
253        Ok(collected)
254    }
255
256    /// Collects up to `limit` rows from a DataChunk.
257    ///
258    /// Uses `selected_indices()` to correctly handle chunks with selection vectors
259    /// (e.g., after filtering operations).
260    fn collect_chunk_limited(
261        &self,
262        chunk: &DataChunk,
263        result: &mut QueryResult,
264        limit: usize,
265    ) -> Result<usize> {
266        let col_count = chunk.column_count();
267        let mut collected = 0;
268
269        for row_idx in chunk.selected_indices() {
270            if collected >= limit {
271                break;
272            }
273            let mut row = Vec::with_capacity(col_count);
274            for col_idx in 0..col_count {
275                let value = chunk
276                    .column(col_idx)
277                    .and_then(|col| col.get_value(row_idx))
278                    .unwrap_or(Value::Null);
279                row.push(value);
280            }
281            result.rows.push(row);
282            collected += 1;
283        }
284
285        Ok(collected)
286    }
287
288    /// Executes a physical operator with adaptive cardinality tracking.
289    ///
290    /// This wraps the operator in a cardinality tracking layer and monitors
291    /// deviation from estimates during execution. The adaptive summary is
292    /// returned alongside the query result.
293    ///
294    /// # Arguments
295    ///
296    /// * `operator` - The root physical operator to execute
297    /// * `adaptive_context` - Context with cardinality estimates from planning
298    /// * `config` - Adaptive execution configuration
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if operator execution fails.
303    pub fn execute_adaptive(
304        &self,
305        operator: Box<dyn Operator>,
306        adaptive_context: Option<AdaptiveContext>,
307        config: &AdaptiveConfig,
308    ) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
309        // If adaptive is disabled or no context, fall back to normal execution
310        if !config.enabled {
311            let mut op = operator;
312            let result = self.execute(op.as_mut())?;
313            return Ok((result, None));
314        }
315
316        let Some(ctx) = adaptive_context else {
317            let mut op = operator;
318            let result = self.execute(op.as_mut())?;
319            return Ok((result, None));
320        };
321
322        // Create shared context for tracking
323        let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
324            config.threshold,
325            config.min_rows,
326        ));
327
328        // Copy estimates from the planning context to the shared tracking context
329        for (op_id, checkpoint) in ctx.all_checkpoints() {
330            if let Some(mut inner) = shared_ctx.snapshot() {
331                inner.set_estimate(op_id, checkpoint.estimated);
332            }
333        }
334
335        // Wrap operator with tracking
336        let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
337
338        // Execute with tracking
339        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
340        let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
341        let mut total_rows: u64 = 0;
342        let check_interval = config.min_rows;
343
344        loop {
345            self.check_deadline()?;
346
347            match wrapped.next() {
348                Ok(Some(chunk)) => {
349                    let chunk_rows = chunk.row_count();
350                    total_rows += chunk_rows as u64;
351
352                    // Capture column types from first non-empty chunk
353                    if !types_captured && chunk.column_count() > 0 {
354                        self.capture_column_types(&chunk, &mut result);
355                        types_captured = true;
356                    }
357                    self.collect_chunk(&chunk, &mut result)?;
358
359                    // Periodically check for significant deviation
360                    if total_rows >= check_interval
361                        && total_rows.is_multiple_of(check_interval)
362                        && shared_ctx.should_reoptimize()
363                    {
364                        // For now, just log/note that re-optimization would trigger
365                        // Full re-optimization would require plan regeneration
366                        // which is a more invasive change
367                    }
368                }
369                Ok(None) => break,
370                Err(err) => return Err(convert_operator_error(err)),
371            }
372        }
373
374        // Get final summary
375        let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
376
377        Ok((result, summary))
378    }
379}
380
381impl Default for Executor {
382    fn default() -> Self {
383        Self::new()
384    }
385}
386
387/// Converts an operator error to a common error.
388fn convert_operator_error(err: OperatorError) -> Error {
389    match err {
390        OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
391        OperatorError::ColumnNotFound(name) => {
392            Error::InvalidValue(format!("Column not found: {name}"))
393        }
394        OperatorError::Execution(msg) => Error::Internal(msg),
395        OperatorError::ConstraintViolation(msg) => {
396            Error::InvalidValue(format!("Constraint violation: {msg}"))
397        }
398        OperatorError::WriteConflict(msg) => {
399            Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
400        }
401        _ => Error::Internal(format!("{err}")),
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use grafeo_common::types::LogicalType;
409    use grafeo_core::execution::DataChunk;
410
411    /// A mock operator that generates chunks with integer data on demand.
412    struct MockIntOperator {
413        values: Vec<i64>,
414        position: usize,
415        chunk_size: usize,
416    }
417
418    impl MockIntOperator {
419        fn new(values: Vec<i64>, chunk_size: usize) -> Self {
420            Self {
421                values,
422                position: 0,
423                chunk_size,
424            }
425        }
426    }
427
428    impl Operator for MockIntOperator {
429        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
430            if self.position >= self.values.len() {
431                return Ok(None);
432            }
433
434            let end = (self.position + self.chunk_size).min(self.values.len());
435            let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
436
437            {
438                let col = chunk.column_mut(0).unwrap();
439                for i in self.position..end {
440                    col.push_int64(self.values[i]);
441                }
442            }
443            chunk.set_count(end - self.position);
444            self.position = end;
445
446            Ok(Some(chunk))
447        }
448
449        fn reset(&mut self) {
450            self.position = 0;
451        }
452
453        fn name(&self) -> &'static str {
454            "MockInt"
455        }
456
457        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
458            self
459        }
460    }
461
462    /// Empty mock operator for testing empty results.
463    struct EmptyOperator;
464
465    impl Operator for EmptyOperator {
466        fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
467            Ok(None)
468        }
469
470        fn reset(&mut self) {}
471
472        fn name(&self) -> &'static str {
473            "Empty"
474        }
475
476        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
477            self
478        }
479    }
480
481    #[test]
482    fn test_executor_empty() {
483        let executor = Executor::with_columns(vec!["a".to_string()]);
484        let mut op = EmptyOperator;
485
486        let result = executor.execute(&mut op).unwrap();
487        assert!(result.is_empty());
488        assert_eq!(result.column_count(), 1);
489    }
490
491    #[test]
492    fn test_executor_single_chunk() {
493        let executor = Executor::with_columns(vec!["value".to_string()]);
494        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
495
496        let result = executor.execute(&mut op).unwrap();
497        assert_eq!(result.row_count(), 3);
498        assert_eq!(result.rows[0][0], Value::Int64(1));
499        assert_eq!(result.rows[1][0], Value::Int64(2));
500        assert_eq!(result.rows[2][0], Value::Int64(3));
501    }
502
503    #[test]
504    fn test_executor_with_limit() {
505        let executor = Executor::with_columns(vec!["value".to_string()]);
506        let mut op = MockIntOperator::new((0..10).collect(), 100);
507
508        let result = executor.execute_with_limit(&mut op, 5).unwrap();
509        assert_eq!(result.row_count(), 5);
510    }
511
512    #[test]
513    fn test_executor_timeout_expired() {
514        use std::time::{Duration, Instant};
515
516        // Set a deadline that has already passed
517        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
518            Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
519        ));
520        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
521
522        let result = executor.execute(&mut op);
523        assert!(result.is_err());
524        let err = result.unwrap_err();
525        assert!(
526            err.to_string().contains("Query exceeded timeout"),
527            "Expected timeout error, got: {err}"
528        );
529    }
530
531    #[test]
532    fn test_executor_no_timeout() {
533        // No deadline set - should execute normally
534        let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
535        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
536
537        let result = executor.execute(&mut op).unwrap();
538        assert_eq!(result.row_count(), 3);
539    }
540
541    #[test]
542    fn test_executor_type_capture_from_first_chunk() {
543        // When column_types are all Any, types should be captured from the first
544        // non-empty chunk.
545        let executor = Executor::with_columns(vec!["value".to_string()]);
546        // column_types starts as [Any] from with_columns
547        let mut op = MockIntOperator::new(vec![42, 99], 10);
548
549        let result = executor.execute(&mut op).unwrap();
550        assert_eq!(result.row_count(), 2);
551        // After execution, column types should be captured as Int64
552        assert_eq!(result.column_types, vec![LogicalType::Int64]);
553    }
554
555    #[test]
556    fn test_executor_type_capture_with_explicit_types() {
557        // When column_types are explicitly set (not all Any), types should NOT be
558        // overwritten from chunks.
559        let executor =
560            Executor::with_columns_and_types(vec!["value".to_string()], vec![LogicalType::String]);
561        let mut op = MockIntOperator::new(vec![1], 10);
562
563        let result = executor.execute(&mut op).unwrap();
564        assert_eq!(result.row_count(), 1);
565        // Types should remain as explicitly set (String), not changed to Int64
566        assert_eq!(result.column_types, vec![LogicalType::String]);
567    }
568
569    #[test]
570    fn test_execute_pipeline_basic() {
571        let source = Box::new(MockIntOperator::new(vec![10, 20, 30], 10));
572        let executor = Executor::with_columns(vec!["value".to_string()]);
573
574        let result = executor.execute_pipeline(source, vec![]).unwrap();
575        assert_eq!(result.row_count(), 3);
576        assert_eq!(result.rows[0][0], Value::Int64(10));
577        assert_eq!(result.rows[1][0], Value::Int64(20));
578        assert_eq!(result.rows[2][0], Value::Int64(30));
579    }
580
581    #[test]
582    fn test_execute_pipeline_empty_source() {
583        let source = Box::new(EmptyOperator);
584        let executor = Executor::with_columns(vec!["value".to_string()]);
585
586        let result = executor.execute_pipeline(source, vec![]).unwrap();
587        assert!(result.is_empty());
588    }
589
590    #[test]
591    fn test_execute_pipeline_type_capture() {
592        // Pipeline should capture column types from first non-empty chunk when
593        // column_types are all Any.
594        let source = Box::new(MockIntOperator::new(vec![1, 2], 10));
595        let executor = Executor::with_columns(vec!["value".to_string()]);
596
597        let result = executor.execute_pipeline(source, vec![]).unwrap();
598        assert_eq!(result.column_types, vec![LogicalType::Int64]);
599    }
600
601    #[test]
602    fn test_execute_pipeline_explicit_types_preserved() {
603        // Pipeline should preserve explicitly set column types.
604        let source = Box::new(MockIntOperator::new(vec![1], 10));
605        let executor =
606            Executor::with_columns_and_types(vec!["value".to_string()], vec![LogicalType::String]);
607
608        let result = executor.execute_pipeline(source, vec![]).unwrap();
609        // Explicit types should not be overwritten
610        assert_eq!(result.column_types, vec![LogicalType::String]);
611    }
612
613    #[test]
614    fn test_execute_with_limit_type_capture() {
615        // execute_with_limit should also capture types from first chunk
616        let executor = Executor::with_columns(vec!["value".to_string()]);
617        let mut op = MockIntOperator::new(vec![1, 2, 3, 4, 5], 2);
618
619        let result = executor.execute_with_limit(&mut op, 3).unwrap();
620        assert_eq!(result.row_count(), 3);
621        assert_eq!(result.column_types, vec![LogicalType::Int64]);
622    }
623
624    #[test]
625    fn test_execute_with_limit_timeout_expired() {
626        use std::time::{Duration, Instant};
627
628        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
629        let executor =
630            Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(expired));
631        let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
632
633        let result = executor.execute_with_limit(&mut op, 10);
634        assert!(result.is_err());
635        assert!(
636            result
637                .unwrap_err()
638                .to_string()
639                .contains("Query exceeded timeout")
640        );
641    }
642
643    #[test]
644    fn test_convert_operator_error_variants() {
645        // Test all OperatorError conversion branches
646        let err = convert_operator_error(OperatorError::TypeMismatch {
647            expected: "Int64".to_string(),
648            found: "String".to_string(),
649        });
650        assert!(matches!(err, Error::TypeMismatch { .. }));
651
652        let err = convert_operator_error(OperatorError::ColumnNotFound("col_x".to_string()));
653        assert!(matches!(err, Error::InvalidValue(_)));
654        assert!(err.to_string().contains("col_x"));
655
656        let err = convert_operator_error(OperatorError::Execution("internal issue".to_string()));
657        assert!(matches!(err, Error::Internal(_)));
658
659        let err = convert_operator_error(OperatorError::ConstraintViolation("unique".to_string()));
660        assert!(matches!(err, Error::InvalidValue(_)));
661        assert!(err.to_string().contains("unique"));
662
663        let err =
664            convert_operator_error(OperatorError::WriteConflict("concurrent write".to_string()));
665        assert!(matches!(err, Error::Transaction(_)));
666    }
667
668    #[test]
669    fn test_execute_pipeline_timeout_expired() {
670        use std::time::{Duration, Instant};
671
672        use grafeo_core::execution::pipeline::{Sink as PipelineSink, Source as PipelineSource};
673
674        struct PipelineTestSource {
675            remaining: usize,
676        }
677
678        impl PipelineSource for PipelineTestSource {
679            fn next_chunk(
680                &mut self,
681                _chunk_size: usize,
682            ) -> std::result::Result<Option<DataChunk>, OperatorError> {
683                if self.remaining == 0 {
684                    return Ok(None);
685                }
686                self.remaining -= 1;
687                Ok(Some(DataChunk::empty()))
688            }
689            fn reset(&mut self) {}
690            fn name(&self) -> &'static str {
691                "PipelineTestSource"
692            }
693        }
694
695        struct PipelineTestSink;
696
697        impl PipelineSink for PipelineTestSink {
698            fn consume(&mut self, _chunk: DataChunk) -> std::result::Result<bool, OperatorError> {
699                Ok(true)
700            }
701            fn finalize(&mut self) -> std::result::Result<(), OperatorError> {
702                Ok(())
703            }
704            fn name(&self) -> &'static str {
705                "PipelineTestSink"
706            }
707            fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
708                self
709            }
710        }
711
712        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
713        let mut pipeline = Pipeline::simple(
714            Box::new(PipelineTestSource { remaining: 10 }),
715            Box::new(PipelineTestSink),
716        )
717        .with_deadline(Some(expired));
718
719        let result = pipeline.execute().map_err(convert_operator_error);
720        assert!(result.is_err());
721        let err = result.unwrap_err();
722        assert!(
723            err.to_string().contains("Query exceeded timeout"),
724            "Expected timeout error, got: {err}"
725        );
726    }
727}