Skip to main content

grafeo_core/execution/
adaptive.rs

1//! Adaptive query execution with runtime cardinality feedback.
2//!
3//! This module provides adaptive execution capabilities that allow the query
4//! engine to adjust its execution strategy based on actual runtime cardinalities.
5//!
6//! The key insight is that cardinality estimates can be significantly wrong,
7//! especially for complex predicates or skewed data. By tracking actual row
8//! counts during execution, we can detect when our estimates are off and
9//! potentially re-optimize the remaining query plan.
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
15//! │  Optimizer  │────>│ Estimated Cards  │     │ CardinalityFeed │
16//! └─────────────┘     └──────────────────┘     │     back        │
17//!                              │               └────────┬────────┘
18//!                              v                        │
19//!                     ┌──────────────────┐              │
20//!                     │ AdaptiveContext  │<─────────────┘
21//!                     └────────┬─────────┘
22//!                              │
23//!                     ┌────────v─────────┐
24//!                     │ ReoptimizeCheck  │
25//!                     └──────────────────┘
26//! ```
27//!
28//! # Example
29//!
30//! ```rust
31//! use grafeo_core::execution::adaptive::{AdaptiveContext, CardinalityCheckpoint};
32//!
33//! // Set up adaptive context with estimated cardinalities
34//! let mut ctx = AdaptiveContext::new();
//! ctx.set_estimate("scan_1", 10000.0);
//! ctx.set_estimate("filter_1", 1000.0);  // Expected 10% selectivity
//!
//! // During execution, record actuals
//! ctx.record_actual("scan_1", 10000);
//! ctx.record_actual("filter_1", 5000);   // Actual 50% selectivity — a 5x miss!
41//!
42//! // Check if re-optimization is warranted
43//! if ctx.should_reoptimize() {
44//!     // Re-plan remaining operators with updated statistics
45//! }
46//! ```
47
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, RwLock};
51
52use super::chunk::DataChunk;
53use super::operators::OperatorError;
54use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
55
/// Threshold for the deviation ratio that triggers re-optimization
/// consideration. A value of 3.0 means re-optimization is considered when
/// the actual cardinality is more than 3x or less than 1/3 of the estimate.
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;

/// Minimum number of rows before considering re-optimization.
/// Helps avoid thrashing on small result sets.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
63
/// A checkpoint for tracking cardinality at a specific point in the plan.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    /// Unique identifier for this checkpoint (typically operator name/id).
    pub operator_id: String,
    /// Estimated cardinality from the optimizer.
    pub estimated: f64,
    /// Actual row count observed during execution.
    pub actual: u64,
    /// Whether this checkpoint has been recorded.
    pub recorded: bool,
}

impl CardinalityCheckpoint {
    /// Creates a new checkpoint carrying only the optimizer's estimate.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        Self {
            operator_id: operator_id.to_owned(),
            estimated,
            actual: 0,
            recorded: false,
        }
    }

    /// Records the actual cardinality observed at runtime.
    pub fn record(&mut self, actual: u64) {
        self.recorded = true;
        self.actual = actual;
    }

    /// Returns the deviation ratio (actual / estimated).
    ///
    /// A ratio > 1.0 means we underestimated, < 1.0 means overestimated.
    /// Returns 1.0 if estimate is 0 to avoid division by zero.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        // Note: uses the negated comparison so a NaN estimate still flows
        // through the division, matching IEEE semantics.
        if !(self.estimated <= 0.0) {
            return self.actual as f64 / self.estimated;
        }
        // Zero (or negative) estimate: exact only when nothing arrived.
        if self.actual == 0 { 1.0 } else { f64::INFINITY }
    }

    /// Returns the absolute deviation (|actual - estimated|).
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        let diff = self.actual as f64 - self.estimated;
        diff.abs()
    }

    /// Checks if this checkpoint shows significant deviation.
    ///
    /// Unrecorded checkpoints never count as deviating.
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        self.recorded && {
            let ratio = self.deviation_ratio();
            ratio > threshold || ratio < 1.0 / threshold
        }
    }
}
123
/// Feedback from runtime execution about actual cardinalities.
///
/// Collects actual row counts at various points during query execution so
/// they can be compared against the optimizer's estimates.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    /// Actual row counts by operator ID.
    actuals: HashMap<String, u64>,
    /// Running count (for streaming updates).
    running_counts: HashMap<String, AtomicU64>,
}

impl CardinalityFeedback {
    /// Creates a new empty feedback collector.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the final actual cardinality for an operator.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_owned(), count);
    }

    /// Adds to the running count for an operator (thread-safe).
    ///
    /// Counts for operators without an initialized counter are silently
    /// dropped; call [`Self::init_counter`] first.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        let Some(counter) = self.running_counts.get(operator_id) else {
            return;
        };
        counter.fetch_add(count, Ordering::Relaxed);
    }

    /// Initializes a running counter (starting at zero) for an operator.
    pub fn init_counter(&mut self, operator_id: &str) {
        let key = operator_id.to_owned();
        self.running_counts.insert(key, AtomicU64::new(0));
    }

    /// Finalizes the running count into the actuals.
    ///
    /// Does nothing if no counter was initialized for the operator.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        let snapshot = self
            .running_counts
            .get(operator_id)
            .map(|counter| counter.load(Ordering::Relaxed));
        if let Some(count) = snapshot {
            self.actuals.insert(operator_id.to_owned(), count);
        }
    }

    /// Gets the finalized actual count for an operator.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        Some(*self.actuals.get(operator_id)?)
    }

    /// Gets the current running count for an operator.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        let counter = self.running_counts.get(operator_id)?;
        Some(counter.load(Ordering::Relaxed))
    }

    /// Returns all recorded actuals.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
192
193/// Context for adaptive query execution.
194///
195/// Holds both estimated and actual cardinalities, and provides methods
196/// to determine when re-optimization should occur.
197#[derive(Debug)]
198pub struct AdaptiveContext {
199    /// Checkpoints with estimates and actuals.
200    checkpoints: HashMap<String, CardinalityCheckpoint>,
201    /// Threshold ratio for triggering re-optimization.
202    reoptimization_threshold: f64,
203    /// Minimum rows before considering re-optimization.
204    min_rows: u64,
205    /// Whether re-optimization has been triggered.
206    reoptimization_triggered: bool,
207    /// Operator that caused re-optimization (if any).
208    trigger_operator: Option<String>,
209}
210
211impl AdaptiveContext {
212    /// Creates a new adaptive context with default settings.
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            checkpoints: HashMap::new(),
217            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
218            min_rows: MIN_ROWS_FOR_REOPTIMIZATION,
219            reoptimization_triggered: false,
220            trigger_operator: None,
221        }
222    }
223
224    /// Creates a context with custom thresholds.
225    #[must_use]
226    pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
227        Self {
228            checkpoints: HashMap::new(),
229            reoptimization_threshold: threshold,
230            min_rows,
231            reoptimization_triggered: false,
232            trigger_operator: None,
233        }
234    }
235
236    /// Sets the estimated cardinality for an operator.
237    pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
238        self.checkpoints.insert(
239            operator_id.to_string(),
240            CardinalityCheckpoint::new(operator_id, estimate),
241        );
242    }
243
244    /// Records the actual cardinality for an operator.
245    pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
246        if let Some(checkpoint) = self.checkpoints.get_mut(operator_id) {
247            checkpoint.record(actual);
248        } else {
249            // Create checkpoint with unknown estimate
250            let mut checkpoint = CardinalityCheckpoint::new(operator_id, 0.0);
251            checkpoint.record(actual);
252            self.checkpoints.insert(operator_id.to_string(), checkpoint);
253        }
254    }
255
256    /// Applies feedback from a `CardinalityFeedback` collector.
257    pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
258        for (op_id, &actual) in feedback.all_actuals() {
259            self.record_actual(op_id, actual);
260        }
261    }
262
263    /// Checks if any checkpoint shows significant deviation.
264    #[must_use]
265    pub fn has_significant_deviation(&self) -> bool {
266        self.checkpoints
267            .values()
268            .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
269    }
270
271    /// Determines if re-optimization should be triggered.
272    ///
273    /// Returns true if:
274    /// - There's significant deviation from estimates
275    /// - We've processed enough rows to make a meaningful decision
276    /// - Re-optimization hasn't already been triggered
277    #[must_use]
278    pub fn should_reoptimize(&mut self) -> bool {
279        if self.reoptimization_triggered {
280            return false;
281        }
282
283        for (op_id, checkpoint) in &self.checkpoints {
284            if checkpoint.actual < self.min_rows {
285                continue;
286            }
287
288            if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
289                self.reoptimization_triggered = true;
290                self.trigger_operator = Some(op_id.clone());
291                return true;
292            }
293        }
294
295        false
296    }
297
298    /// Returns the operator that triggered re-optimization, if any.
299    #[must_use]
300    pub fn trigger_operator(&self) -> Option<&str> {
301        self.trigger_operator.as_deref()
302    }
303
304    /// Gets the checkpoint for an operator.
305    #[must_use]
306    pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
307        self.checkpoints.get(operator_id)
308    }
309
310    /// Returns all checkpoints.
311    #[must_use]
312    pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
313        &self.checkpoints
314    }
315
316    /// Calculates a correction factor for a specific operator.
317    ///
318    /// This factor can be used to adjust remaining cardinality estimates.
319    #[must_use]
320    pub fn correction_factor(&self, operator_id: &str) -> f64 {
321        self.checkpoints
322            .get(operator_id)
323            .filter(|cp| cp.recorded)
324            .map_or(1.0, CardinalityCheckpoint::deviation_ratio)
325    }
326
327    /// Returns summary statistics about the adaptive execution.
328    #[must_use]
329    pub fn summary(&self) -> AdaptiveSummary {
330        let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
331        let deviation_count = self
332            .checkpoints
333            .values()
334            .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
335            .count();
336
337        let avg_deviation = if recorded_count > 0 {
338            self.checkpoints
339                .values()
340                .filter(|cp| cp.recorded)
341                .map(CardinalityCheckpoint::deviation_ratio)
342                .sum::<f64>()
343                / recorded_count as f64
344        } else {
345            1.0
346        };
347
348        let max_deviation = self
349            .checkpoints
350            .values()
351            .filter(|cp| cp.recorded)
352            .map(|cp| {
353                let ratio = cp.deviation_ratio();
354                if ratio > 1.0 { ratio } else { 1.0 / ratio }
355            })
356            .fold(1.0_f64, f64::max);
357
358        AdaptiveSummary {
359            checkpoint_count: self.checkpoints.len(),
360            recorded_count,
361            deviation_count,
362            avg_deviation_ratio: avg_deviation,
363            max_deviation_ratio: max_deviation,
364            reoptimization_triggered: self.reoptimization_triggered,
365            trigger_operator: self.trigger_operator.clone(),
366        }
367    }
368
369    /// Resets the context for a new execution.
370    pub fn reset(&mut self) {
371        for checkpoint in self.checkpoints.values_mut() {
372            checkpoint.actual = 0;
373            checkpoint.recorded = false;
374        }
375        self.reoptimization_triggered = false;
376        self.trigger_operator = None;
377    }
378}
379
impl Default for AdaptiveContext {
    /// Equivalent to [`AdaptiveContext::new`]: default thresholds, no checkpoints.
    fn default() -> Self {
        Self::new()
    }
}
385
/// Summary of adaptive execution statistics.
///
/// Produced by [`AdaptiveContext::summary`]. Deviation ratios are
/// actual/estimated, so 1.0 means the estimate was exact.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of checkpoints.
    pub checkpoint_count: usize,
    /// Number of checkpoints with recorded actuals.
    pub recorded_count: usize,
    /// Number of checkpoints with significant deviation.
    pub deviation_count: usize,
    /// Average deviation ratio across all recorded checkpoints (1.0 if none).
    pub avg_deviation_ratio: f64,
    /// Maximum deviation ratio observed, normalized to be >= 1.0.
    pub max_deviation_ratio: f64,
    /// Whether re-optimization was triggered.
    pub reoptimization_triggered: bool,
    /// Operator that triggered re-optimization.
    pub trigger_operator: Option<String>,
}
404
405/// Thread-safe wrapper for `AdaptiveContext`.
406///
407/// Allows multiple operators to report cardinalities concurrently.
408#[derive(Debug, Clone)]
409pub struct SharedAdaptiveContext {
410    inner: Arc<RwLock<AdaptiveContext>>,
411}
412
413impl SharedAdaptiveContext {
414    /// Creates a new shared context.
415    #[must_use]
416    pub fn new() -> Self {
417        Self {
418            inner: Arc::new(RwLock::new(AdaptiveContext::new())),
419        }
420    }
421
422    /// Creates from an existing context.
423    #[must_use]
424    pub fn from_context(ctx: AdaptiveContext) -> Self {
425        Self {
426            inner: Arc::new(RwLock::new(ctx)),
427        }
428    }
429
430    /// Records actual cardinality for an operator.
431    pub fn record_actual(&self, operator_id: &str, actual: u64) {
432        if let Ok(mut ctx) = self.inner.write() {
433            ctx.record_actual(operator_id, actual);
434        }
435    }
436
437    /// Checks if re-optimization should be triggered.
438    #[must_use]
439    pub fn should_reoptimize(&self) -> bool {
440        if let Ok(mut ctx) = self.inner.write() {
441            ctx.should_reoptimize()
442        } else {
443            false
444        }
445    }
446
447    /// Gets a read-only snapshot of the context.
448    #[must_use]
449    pub fn snapshot(&self) -> Option<AdaptiveContext> {
450        self.inner.read().ok().map(|guard| AdaptiveContext {
451            checkpoints: guard.checkpoints.clone(),
452            reoptimization_threshold: guard.reoptimization_threshold,
453            min_rows: guard.min_rows,
454            reoptimization_triggered: guard.reoptimization_triggered,
455            trigger_operator: guard.trigger_operator.clone(),
456        })
457    }
458}
459
impl Default for SharedAdaptiveContext {
    /// Equivalent to [`SharedAdaptiveContext::new`].
    fn default() -> Self {
        Self::new()
    }
}
465
466/// A wrapper operator that tracks cardinality and reports to an adaptive context.
467///
468/// This wraps any `PushOperator` and counts the rows flowing through it,
469/// reporting the count to the adaptive context.
470pub struct CardinalityTrackingOperator {
471    /// The wrapped operator.
472    inner: Box<dyn PushOperator>,
473    /// Operator identifier for reporting.
474    operator_id: String,
475    /// Row counter.
476    row_count: u64,
477    /// Shared adaptive context for reporting.
478    context: SharedAdaptiveContext,
479}
480
481impl CardinalityTrackingOperator {
482    /// Creates a new tracking wrapper.
483    pub fn new(
484        inner: Box<dyn PushOperator>,
485        operator_id: &str,
486        context: SharedAdaptiveContext,
487    ) -> Self {
488        Self {
489            inner,
490            operator_id: operator_id.to_string(),
491            row_count: 0,
492            context,
493        }
494    }
495
496    /// Returns the current row count.
497    #[must_use]
498    pub fn current_count(&self) -> u64 {
499        self.row_count
500    }
501}
502
503impl PushOperator for CardinalityTrackingOperator {
504    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
505        // Track input rows
506        self.row_count += chunk.len() as u64;
507
508        // Push through to inner operator
509        self.inner.push(chunk, sink)
510    }
511
512    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
513        // Report final cardinality to context
514        self.context
515            .record_actual(&self.operator_id, self.row_count);
516
517        // Finalize inner operator
518        self.inner.finalize(sink)
519    }
520
521    fn preferred_chunk_size(&self) -> ChunkSizeHint {
522        self.inner.preferred_chunk_size()
523    }
524
525    fn name(&self) -> &'static str {
526        // Return the inner operator's name
527        self.inner.name()
528    }
529}
530
531/// A sink that tracks cardinality of data flowing through it.
532pub struct CardinalityTrackingSink {
533    /// The wrapped sink.
534    inner: Box<dyn Sink>,
535    /// Operator identifier for reporting.
536    operator_id: String,
537    /// Row counter.
538    row_count: u64,
539    /// Shared adaptive context for reporting.
540    context: SharedAdaptiveContext,
541}
542
543impl CardinalityTrackingSink {
544    /// Creates a new tracking sink wrapper.
545    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
546        Self {
547            inner,
548            operator_id: operator_id.to_string(),
549            row_count: 0,
550            context,
551        }
552    }
553
554    /// Returns the current row count.
555    #[must_use]
556    pub fn current_count(&self) -> u64 {
557        self.row_count
558    }
559}
560
561impl Sink for CardinalityTrackingSink {
562    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
563        self.row_count += chunk.len() as u64;
564        self.inner.consume(chunk)
565    }
566
567    fn finalize(&mut self) -> Result<(), OperatorError> {
568        // Report final cardinality
569        self.context
570            .record_actual(&self.operator_id, self.row_count);
571        self.inner.finalize()
572    }
573
574    fn name(&self) -> &'static str {
575        self.inner.name()
576    }
577
578    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
579        self
580    }
581}
582
/// Decision about whether to re-optimize a query.
///
/// Produced by [`evaluate_reoptimization`] from an [`AdaptiveContext`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ReoptimizationDecision {
    /// Continue with current plan.
    Continue,
    /// Re-optimize the remaining plan.
    Reoptimize {
        /// The operator that triggered re-optimization.
        trigger: String,
        /// Correction factors (actual/estimated) to apply to remaining estimates.
        corrections: HashMap<String, f64>,
    },
    /// Abort the query (catastrophic misestimate).
    Abort {
        /// The reason for aborting the query.
        reason: String,
    },
}
602
603/// Evaluates whether re-optimization should occur based on context.
604#[must_use]
605pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
606    let summary = ctx.summary();
607
608    // If no significant deviations, continue
609    if !summary.reoptimization_triggered {
610        return ReoptimizationDecision::Continue;
611    }
612
613    // If deviation is catastrophic (>100x), consider aborting
614    if summary.max_deviation_ratio > 100.0 {
615        return ReoptimizationDecision::Abort {
616            reason: format!(
617                "Catastrophic cardinality misestimate: {}x deviation",
618                summary.max_deviation_ratio
619            ),
620        };
621    }
622
623    // Build correction factors
624    let corrections: HashMap<String, f64> = ctx
625        .all_checkpoints()
626        .iter()
627        .filter(|(_, cp)| cp.recorded)
628        .map(|(id, cp)| (id.clone(), cp.deviation_ratio()))
629        .collect();
630
631    ReoptimizationDecision::Reoptimize {
632        trigger: summary.trigger_operator.unwrap_or_default(),
633        corrections,
634    }
635}
636
/// Callback for creating a new plan based on observed cardinalities.
///
/// This is called when the adaptive pipeline detects significant deviation
/// and decides to re-optimize. The callback receives the adaptive context
/// with recorded actuals and should return a new set of operators. The
/// factory must be `Send + Sync`.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
643
644/// Configuration for adaptive pipeline execution.
645#[derive(Debug, Clone)]
646pub struct AdaptivePipelineConfig {
647    /// Number of rows to process before checking for re-optimization.
648    pub check_interval: u64,
649    /// Threshold for deviation that triggers re-optimization.
650    pub reoptimization_threshold: f64,
651    /// Minimum rows before considering re-optimization.
652    pub min_rows_for_reoptimization: u64,
653    /// Maximum number of re-optimizations allowed per query.
654    pub max_reoptimizations: usize,
655}
656
657impl Default for AdaptivePipelineConfig {
658    fn default() -> Self {
659        Self {
660            check_interval: 10_000,
661            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
662            min_rows_for_reoptimization: MIN_ROWS_FOR_REOPTIMIZATION,
663            max_reoptimizations: 3,
664        }
665    }
666}
667
668impl AdaptivePipelineConfig {
669    /// Creates a new configuration with custom thresholds.
670    #[must_use]
671    pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
672        Self {
673            check_interval,
674            reoptimization_threshold: threshold,
675            min_rows_for_reoptimization: min_rows,
676            max_reoptimizations: 3,
677        }
678    }
679
680    /// Sets the maximum number of re-optimizations allowed.
681    #[must_use]
682    pub fn with_max_reoptimizations(mut self, max: usize) -> Self {
683        self.max_reoptimizations = max;
684        self
685    }
686}
687
/// Result of executing an adaptive pipeline.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows processed.
    pub total_rows: u64,
    /// Number of re-optimizations performed.
    pub reoptimization_count: usize,
    /// Operators that triggered re-optimization.
    pub triggers: Vec<String>,
    /// Final adaptive context (summary form) with all recorded actuals.
    pub final_context: AdaptiveSummary,
}
700
/// A checkpoint during adaptive execution where plan switching can occur.
///
/// Checkpoints are placed at strategic points in the pipeline (typically after
/// filters or joins) where switching to a new plan makes sense.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    /// Unique identifier for this checkpoint.
    pub id: String,
    /// Operator index in the pipeline (after which this checkpoint occurs).
    pub after_operator: usize,
    /// Estimated cardinality at this point.
    pub estimated_cardinality: f64,
    /// Actual rows seen so far.
    pub actual_rows: u64,
    /// Whether this checkpoint has triggered re-optimization.
    pub triggered: bool,
}

impl AdaptiveCheckpoint {
    /// Creates a new, untriggered checkpoint with zero observed rows.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        Self {
            id: id.to_owned(),
            after_operator,
            estimated_cardinality: estimated,
            actual_rows: 0,
            triggered: false,
        }
    }

    /// Records rows passing through this checkpoint (accumulates).
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }

    /// Checks if deviation exceeds threshold.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        // Too few rows to make a meaningful call yet.
        if self.actual_rows < min_rows {
            return false;
        }
        // A zero/negative estimate deviates as soon as any rows show up.
        if self.estimated_cardinality <= 0.0 {
            self.actual_rows > 0
        } else {
            let ratio = self.actual_rows as f64 / self.estimated_cardinality;
            ratio > threshold || ratio < 1.0 / threshold
        }
    }
}
750
/// Event emitted during adaptive execution.
///
/// Delivered through an [`AdaptiveEventCallback`] for observability.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AdaptiveEvent {
    /// A checkpoint was reached.
    CheckpointReached {
        /// The checkpoint identifier.
        id: String,
        /// The actual number of rows observed.
        actual_rows: u64,
        /// The estimated number of rows from the optimizer.
        estimated: f64,
    },
    /// Re-optimization was triggered.
    ReoptimizationTriggered {
        /// The checkpoint that triggered re-optimization.
        checkpoint_id: String,
        /// The ratio between actual and estimated rows.
        deviation_ratio: f64,
    },
    /// Plan was switched.
    PlanSwitched {
        /// The number of operators in the previous plan.
        old_operator_count: usize,
        /// The number of operators in the new plan.
        new_operator_count: usize,
    },
    /// Execution completed.
    ExecutionCompleted {
        /// The total number of rows produced.
        total_rows: u64,
    },
}
784
/// Callback for observing adaptive execution events.
///
/// A boxed `Fn` invoked with each [`AdaptiveEvent`]; must be `Send + Sync`.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
787
/// Builder for creating adaptive pipelines.
pub struct AdaptivePipelineBuilder {
    /// Checkpoints to install, in insertion order.
    checkpoints: Vec<AdaptiveCheckpoint>,
    /// Execution configuration (thresholds, check interval).
    config: AdaptivePipelineConfig,
    /// Context pre-seeded with optimizer estimates.
    context: AdaptiveContext,
    /// Optional observer for execution events.
    event_callback: Option<AdaptiveEventCallback>,
}
795
796impl AdaptivePipelineBuilder {
797    /// Creates a new builder with default configuration.
798    #[must_use]
799    pub fn new() -> Self {
800        Self {
801            checkpoints: Vec::new(),
802            config: AdaptivePipelineConfig::default(),
803            context: AdaptiveContext::new(),
804            event_callback: None,
805        }
806    }
807
808    /// Sets the configuration.
809    #[must_use]
810    pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
811        self.config = config;
812        self
813    }
814
815    /// Adds a checkpoint at the specified operator index.
816    #[must_use]
817    pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
818        self.checkpoints
819            .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
820        self.context.set_estimate(id, estimated);
821        self
822    }
823
824    /// Sets an event callback for observing execution.
825    #[must_use]
826    pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
827        self.event_callback = Some(callback);
828        self
829    }
830
831    /// Sets estimates from a pre-configured context.
832    #[must_use]
833    pub fn with_context(mut self, context: AdaptiveContext) -> Self {
834        self.context = context;
835        self
836    }
837
838    /// Builds the configuration for adaptive execution.
839    #[must_use]
840    pub fn build(self) -> AdaptiveExecutionConfig {
841        AdaptiveExecutionConfig {
842            checkpoints: self.checkpoints,
843            config: self.config,
844            context: self.context,
845            event_callback: self.event_callback,
846        }
847    }
848}
849
impl Default for AdaptivePipelineBuilder {
    /// Equivalent to [`AdaptivePipelineBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
855
/// Configuration for adaptive execution, built by `AdaptivePipelineBuilder`.
///
/// Owns the checkpoints, tuning parameters, estimate context, and optional
/// event callback used while a query runs adaptively.
pub struct AdaptiveExecutionConfig {
    /// Checkpoints for monitoring cardinality.
    pub checkpoints: Vec<AdaptiveCheckpoint>,
    /// Execution configuration.
    pub config: AdaptivePipelineConfig,
    /// Adaptive context with estimates.
    pub context: AdaptiveContext,
    /// Optional event callback.
    pub event_callback: Option<AdaptiveEventCallback>,
}
867
868impl AdaptiveExecutionConfig {
869    /// Returns a summary of the adaptive execution after it completes.
870    #[must_use]
871    pub fn summary(&self) -> AdaptiveSummary {
872        self.context.summary()
873    }
874
875    /// Records actual cardinality for a checkpoint.
876    pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
877        self.context.record_actual(checkpoint_id, actual);
878
879        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
880            cp.actual_rows = actual;
881        }
882
883        if let Some(ref callback) = self.event_callback {
884            let estimated = self
885                .context
886                .get_checkpoint(checkpoint_id)
887                .map_or(0.0, |cp| cp.estimated);
888            callback(AdaptiveEvent::CheckpointReached {
889                id: checkpoint_id.to_string(),
890                actual_rows: actual,
891                estimated,
892            });
893        }
894    }
895
896    /// Checks if any checkpoint exceeds the deviation threshold.
897    #[must_use]
898    pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
899        self.checkpoints.iter().find(|cp| {
900            !cp.triggered
901                && cp.exceeds_threshold(
902                    self.config.reoptimization_threshold,
903                    self.config.min_rows_for_reoptimization,
904                )
905        })
906    }
907
908    /// Marks a checkpoint as having triggered re-optimization.
909    pub fn mark_triggered(&mut self, checkpoint_id: &str) {
910        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
911            cp.triggered = true;
912        }
913
914        if let Some(ref callback) = self.event_callback {
915            let ratio = self
916                .context
917                .get_checkpoint(checkpoint_id)
918                .filter(|cp| cp.recorded)
919                .map_or(1.0, |cp| cp.deviation_ratio());
920            callback(AdaptiveEvent::ReoptimizationTriggered {
921                checkpoint_id: checkpoint_id.to_string(),
922                deviation_ratio: ratio,
923            });
924        }
925    }
926}
927
928// ============= Pull-Based Operator Tracking =============
929
930use super::operators::{Operator, OperatorResult}; // OperatorError imported above
931
/// A wrapper that tracks cardinality for pull-based operators.
///
/// This wraps any `Operator` and counts the rows flowing through it,
/// reporting the count to the adaptive context. Use this for integrating
/// adaptive execution with the standard pull-based executor.
///
/// The final count is reported exactly once: when the stream ends or
/// errors, or at the latest when the wrapper is dropped.
pub struct CardinalityTrackingWrapper {
    /// The wrapped operator whose output rows are counted.
    inner: Box<dyn Operator>,
    /// Operator identifier used as the checkpoint id when reporting.
    operator_id: String,
    /// Rows seen so far; cleared by `reset()`.
    row_count: u64,
    /// Shared adaptive context that receives the final count.
    context: SharedAdaptiveContext,
    /// Whether the final count has been reported already — guards against
    /// double-reporting from both stream exhaustion and `Drop`.
    finalized: bool,
}
949
950impl CardinalityTrackingWrapper {
951    /// Creates a new tracking wrapper for a pull-based operator.
952    pub fn new(
953        inner: Box<dyn Operator>,
954        operator_id: &str,
955        context: SharedAdaptiveContext,
956    ) -> Self {
957        Self {
958            inner,
959            operator_id: operator_id.to_string(),
960            row_count: 0,
961            context,
962            finalized: false,
963        }
964    }
965
966    /// Returns the current row count.
967    #[must_use]
968    pub fn current_count(&self) -> u64 {
969        self.row_count
970    }
971
972    /// Reports the final cardinality to the context.
973    fn report_final(&mut self) {
974        if !self.finalized {
975            self.context
976                .record_actual(&self.operator_id, self.row_count);
977            self.finalized = true;
978        }
979    }
980}
981
982impl Operator for CardinalityTrackingWrapper {
983    fn next(&mut self) -> OperatorResult {
984        match self.inner.next() {
985            Ok(Some(chunk)) => {
986                // Track rows
987                self.row_count += chunk.row_count() as u64;
988                Ok(Some(chunk))
989            }
990            Ok(None) => {
991                // Stream exhausted - report final cardinality
992                self.report_final();
993                Ok(None)
994            }
995            Err(e) => {
996                // Report on error too
997                self.report_final();
998                Err(e)
999            }
1000        }
1001    }
1002
1003    fn reset(&mut self) {
1004        self.row_count = 0;
1005        self.finalized = false;
1006        self.inner.reset();
1007    }
1008
1009    fn name(&self) -> &'static str {
1010        self.inner.name()
1011    }
1012
1013    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1014        self
1015    }
1016}
1017
1018impl Drop for CardinalityTrackingWrapper {
1019    fn drop(&mut self) {
1020        // Ensure we report even if dropped early
1021        self.report_final();
1022    }
1023}
1024
1025// ============= Adaptive Pipeline Execution =============
1026
1027use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; // Sink imported above
1028use super::sink::CollectorSink;
1029use super::source::OperatorSource;
1030
1031/// High-level adaptive pipeline that executes a pull-based operator with
1032/// cardinality tracking using push-based infrastructure.
1033///
1034/// This bridges the pull-based planner output with push-based execution:
1035/// 1. Wraps the pull operator as an `OperatorSource`
1036/// 2. Uses `CardinalityTrackingSink` to track output cardinality
1037/// 3. Provides adaptive feedback through `AdaptiveContext`
1038///
1039/// # Example
1040///
1041/// ```no_run
1042/// # use grafeo_core::execution::adaptive::{AdaptiveContext, AdaptivePipelineExecutor};
1043/// # use grafeo_core::execution::operators::Operator;
1044/// # fn example(operator: Box<dyn Operator>, adaptive_context: AdaptiveContext) -> Result<(), Box<dyn std::error::Error>> {
1045/// let executor = AdaptivePipelineExecutor::new(operator, adaptive_context);
1046/// let (chunks, summary) = executor.execute()?;
1047/// # Ok(())
1048/// # }
1049/// ```
pub struct AdaptivePipelineExecutor {
    /// Pull-based operator adapted into a chunk-producing source.
    source: OperatorSource,
    /// Shared adaptive context; a cloneable handle is exposed via `context()`.
    context: SharedAdaptiveContext,
    /// Execution tuning knobs (check interval, re-optimization thresholds).
    config: AdaptivePipelineConfig,
}
1055
1056impl AdaptivePipelineExecutor {
1057    /// Creates a new adaptive pipeline executor.
1058    ///
1059    /// # Arguments
1060    ///
1061    /// * `operator` - The pull-based operator to execute
1062    /// * `context` - Adaptive context with cardinality estimates
1063    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
1064        Self {
1065            source: OperatorSource::new(operator),
1066            context: SharedAdaptiveContext::from_context(context),
1067            config: AdaptivePipelineConfig::default(),
1068        }
1069    }
1070
1071    /// Creates an executor with custom configuration.
1072    pub fn with_config(
1073        operator: Box<dyn Operator>,
1074        context: AdaptiveContext,
1075        config: AdaptivePipelineConfig,
1076    ) -> Self {
1077        Self {
1078            source: OperatorSource::new(operator),
1079            context: SharedAdaptiveContext::from_context(context),
1080            config,
1081        }
1082    }
1083
1084    /// Executes the pipeline and returns collected chunks with adaptive summary.
1085    ///
1086    /// # Returns
1087    ///
1088    /// A tuple of (collected DataChunks, adaptive execution summary).
1089    ///
1090    /// # Errors
1091    ///
1092    /// Returns an error if execution fails.
1093    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1094        let mut sink = CardinalityTrackingSink::new(
1095            Box::new(CollectorSink::new()),
1096            "output",
1097            self.context.clone(),
1098        );
1099
1100        let chunk_size = DEFAULT_CHUNK_SIZE;
1101        let mut total_rows: u64 = 0;
1102        let check_interval = self.config.check_interval;
1103
1104        // Process all chunks from source
1105        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1106            let chunk_rows = chunk.len() as u64;
1107            total_rows += chunk_rows;
1108
1109            // Push to tracking sink
1110            let continue_exec = sink.consume(chunk)?;
1111            if !continue_exec {
1112                break;
1113            }
1114
1115            // Periodically check for reoptimization need
1116            if total_rows >= check_interval
1117                && total_rows.is_multiple_of(check_interval)
1118                && self.context.should_reoptimize()
1119            {
1120                // Log or emit event that reoptimization would be triggered
1121                // Full re-planning would happen at a higher level
1122            }
1123        }
1124
1125        // Finalize sink
1126        sink.finalize()?;
1127
1128        // Extract results from the inner sink
1129        let summary = self
1130            .context
1131            .snapshot()
1132            .map(|ctx| ctx.summary())
1133            .unwrap_or_default();
1134
1135        // Get collected chunks from the inner CollectorSink
1136        // Note: We need to extract chunks from the wrapped sink
1137        // For now, we'll return the summary and the caller can collect separately
1138        Ok((Vec::new(), summary))
1139    }
1140
1141    /// Executes and collects all results into DataChunks.
1142    ///
1143    /// This is a simpler interface that handles chunk collection internally.
1144    ///
1145    /// # Errors
1146    ///
1147    /// Returns `Err` if any operator in the pipeline fails during execution.
1148    pub fn execute_collecting(
1149        mut self,
1150    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1151        let mut chunks = Vec::new();
1152        let chunk_size = DEFAULT_CHUNK_SIZE;
1153        let mut total_rows: u64 = 0;
1154        let check_interval = self.config.check_interval;
1155
1156        // Process all chunks from source
1157        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1158            let chunk_rows = chunk.len() as u64;
1159            total_rows += chunk_rows;
1160
1161            // Record cardinality
1162            self.context.record_actual("root", total_rows);
1163
1164            // Collect the chunk
1165            if !chunk.is_empty() {
1166                chunks.push(chunk);
1167            }
1168
1169            // Periodically check for reoptimization
1170            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
1171                let _ = self.context.should_reoptimize();
1172            }
1173        }
1174
1175        let summary = self
1176            .context
1177            .snapshot()
1178            .map(|ctx| ctx.summary())
1179            .unwrap_or_default();
1180
1181        Ok((chunks, summary))
1182    }
1183
1184    /// Returns a reference to the shared context for external monitoring.
1185    pub fn context(&self) -> &SharedAdaptiveContext {
1186        &self.context
1187    }
1188}
1189
1190/// Convenience function to execute a pull-based operator with adaptive tracking.
1191///
1192/// This is the recommended entry point for adaptive execution.
1193///
1194/// # Arguments
1195///
1196/// * `operator` - The pull-based operator to execute
1197/// * `context` - Adaptive context with cardinality estimates (or None for default)
1198/// * `config` - Optional configuration (uses defaults if None)
1199///
1200/// # Returns
1201///
1202/// A tuple of (collected DataChunks, adaptive summary if tracking was enabled).
1203///
1204/// # Errors
1205///
1206/// Returns `Err` if any operator in the pipeline fails during execution.
1207pub fn execute_adaptive(
1208    operator: Box<dyn Operator>,
1209    context: Option<AdaptiveContext>,
1210    config: Option<AdaptivePipelineConfig>,
1211) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
1212    let ctx = context.unwrap_or_default();
1213    let cfg = config.unwrap_or_default();
1214
1215    let executor = AdaptivePipelineExecutor::with_config(operator, ctx, cfg);
1216    let (chunks, summary) = executor.execute_collecting()?;
1217
1218    Ok((chunks, Some(summary)))
1219}
1220
#[cfg(test)]
mod tests {
    // Unit tests covering: cardinality checkpoints, feedback collection,
    // the adaptive context, re-optimization decisions, and the
    // plan-switching checkpoint/config/builder types.
    use super::*;

    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(200);

        // Actual is 2x estimate
        assert!((cp.deviation_ratio() - 2.0).abs() < 0.001);
    }

    #[test]
    fn test_checkpoint_underestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(500);

        // Underestimated by 5x
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_overestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(20);

        // Overestimated - actual is 0.2x estimate
        assert!((cp.deviation_ratio() - 0.2).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0)); // 0.2 < 1/3
    }

    #[test]
    fn test_checkpoint_accurate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(110);

        // Close to estimate
        assert!((cp.deviation_ratio() - 1.1).abs() < 0.001);
        assert!(!cp.is_significant_deviation(3.0)); // 1.1 is within threshold
    }

    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(100);

        // Zero estimate with actual rows = infinity
        assert!(cp.deviation_ratio().is_infinite());
    }

    #[test]
    fn test_checkpoint_zero_both() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(0);

        // Zero estimate and zero actual = ratio of 1.0
        assert!((cp.deviation_ratio() - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_feedback_collection() {
        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan_1", 1000);
        feedback.record("filter_1", 100);

        assert_eq!(feedback.get("scan_1"), Some(1000));
        assert_eq!(feedback.get("filter_1"), Some(100));
        assert_eq!(feedback.get("unknown"), None);
    }

    #[test]
    fn test_feedback_running_counter() {
        let mut feedback = CardinalityFeedback::new();
        feedback.init_counter("op_1");

        feedback.add_rows("op_1", 100);
        feedback.add_rows("op_1", 200);
        feedback.add_rows("op_1", 50);

        assert_eq!(feedback.get_running("op_1"), Some(350));

        // Finalizing promotes the running total into the recorded counts.
        feedback.finalize_counter("op_1");
        assert_eq!(feedback.get("op_1"), Some(350));
    }

    #[test]
    fn test_adaptive_context_basic() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        ctx.record_actual("scan", 1000);
        ctx.record_actual("filter", 500); // 5x underestimate

        let cp = ctx.get_checkpoint("filter").unwrap();
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 100);
        ctx.set_estimate("scan", 10000.0);
        ctx.set_estimate("filter", 1000.0);

        ctx.record_actual("scan", 10000);
        ctx.record_actual("filter", 5000); // 5x underestimate

        assert!(ctx.should_reoptimize());
        assert_eq!(ctx.trigger_operator(), Some("filter"));

        // Second call should return false (already triggered)
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_min_rows() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 1000);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500); // 5x, but only 500 rows

        // Should not trigger because we haven't seen enough rows
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100); // Close to estimate

        assert!(!ctx.has_significant_deviation());
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 300);

        // Correction factor is actual/estimated; unknown ids default to 1.0.
        assert!((ctx.correction_factor("filter") - 3.0).abs() < 0.001);
        assert!((ctx.correction_factor("unknown") - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan", 1000);
        feedback.record("filter", 500);

        ctx.apply_feedback(&feedback);

        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 1000);
        assert_eq!(ctx.get_checkpoint("filter").unwrap().actual, 500);
    }

    #[test]
    fn test_adaptive_summary() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("op1", 100.0);
        ctx.set_estimate("op2", 200.0);
        ctx.set_estimate("op3", 300.0);

        ctx.record_actual("op1", 100); // Exact
        ctx.record_actual("op2", 600); // 3x

        // Trigger reoptimization
        let _ = ctx.should_reoptimize();

        // op3 was never recorded, hence recorded_count == 2 of 3 checkpoints.
        let summary = ctx.summary();
        assert_eq!(summary.checkpoint_count, 3);
        assert_eq!(summary.recorded_count, 2);
        assert_eq!(summary.deviation_count, 1);
        assert!(summary.reoptimization_triggered);
    }

    #[test]
    fn test_adaptive_context_reset() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 5000);
        let _ = ctx.should_reoptimize(); // Trigger

        assert!(ctx.reoptimization_triggered);

        ctx.reset();

        // Reset clears the trigger flag, actuals, and recorded flags.
        assert!(!ctx.reoptimization_triggered);
        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!ctx.get_checkpoint("scan").unwrap().recorded);
        // Estimate should be preserved
        assert!((ctx.get_checkpoint("scan").unwrap().estimated - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_shared_context() {
        let ctx = SharedAdaptiveContext::new();

        ctx.record_actual("op1", 1000);

        let snapshot = ctx.snapshot().unwrap();
        assert_eq!(snapshot.get_checkpoint("op1").unwrap().actual, 1000);
    }

    #[test]
    fn test_reoptimization_decision_continue() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);

        let decision = evaluate_reoptimization(&ctx);
        assert_eq!(decision, ReoptimizationDecision::Continue);
    }

    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);
        let _ = ctx.should_reoptimize(); // Trigger

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Reoptimize {
            trigger,
            corrections,
        } = decision
        {
            assert_eq!(trigger, "filter");
            assert!((corrections.get("filter").copied().unwrap_or(0.0) - 5.0).abs() < 0.001);
        } else {
            panic!("Expected Reoptimize decision");
        }
    }

    #[test]
    fn test_reoptimization_decision_abort() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 1.0);
        ctx.record_actual("filter", 1000); // 1000x deviation!
        let _ = ctx.should_reoptimize();

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Abort { reason } = decision {
            assert!(reason.contains("Catastrophic"));
        } else {
            panic!("Expected Abort decision");
        }
    }

    #[test]
    fn test_absolute_deviation() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(150);

        assert!((cp.absolute_deviation() - 50.0).abs() < 0.001);
    }

    // ============= Plan Switching Tests =============

    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut cp = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(cp.actual_rows, 0);
        assert!(!cp.triggered);

        // record_rows accumulates rather than overwriting.
        cp.record_rows(50);
        assert_eq!(cp.actual_rows, 50);

        cp.record_rows(100);
        assert_eq!(cp.actual_rows, 150);
    }

    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut cp = AdaptiveCheckpoint::new("filter", 0, 100.0);

        // Below min rows
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));

        // Above min rows but within threshold
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100)); // 100 actual vs 100 estimated = 1.0x

        // Above threshold (underestimate)
        cp.actual_rows = 0;
        cp.record_rows(500);
        assert!(cp.exceeds_threshold(2.0, 100)); // 500 actual vs 100 estimated = 5.0x

        // Above threshold (overestimate)
        let mut cp2 = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        cp2.record_rows(200);
        assert!(cp2.exceeds_threshold(2.0, 100)); // 200 actual vs 1000 estimated = 0.2x
    }

    #[test]
    fn test_adaptive_pipeline_config_default() {
        let config = AdaptivePipelineConfig::default();

        assert_eq!(config.check_interval, 10_000);
        assert!((config.reoptimization_threshold - DEFAULT_REOPTIMIZATION_THRESHOLD).abs() < 0.001);
        assert_eq!(
            config.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(config.max_reoptimizations, 3);
    }

    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let config = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);

        assert_eq!(config.check_interval, 5000);
        assert!((config.reoptimization_threshold - 2.0).abs() < 0.001);
        assert_eq!(config.min_rows_for_reoptimization, 500);
        assert_eq!(config.max_reoptimizations, 5);
    }

    #[test]
    fn test_adaptive_pipeline_builder() {
        let config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();

        // Checkpoints should come out in insertion order.
        assert_eq!(config.checkpoints.len(), 2);
        assert_eq!(config.checkpoints[0].id, "scan");
        assert!((config.checkpoints[0].estimated_cardinality - 10000.0).abs() < 0.001);
        assert_eq!(config.checkpoints[1].id, "filter");
        assert!((config.checkpoints[1].estimated_cardinality - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        config.record_checkpoint("filter", 500);

        // Check context was updated
        let cp = config.context.get_checkpoint("filter").unwrap();
        assert_eq!(cp.actual, 500);
        assert!(cp.recorded);

        // Check checkpoint was updated
        let acp = config
            .checkpoints
            .iter()
            .find(|c| c.id == "filter")
            .unwrap();
        assert_eq!(acp.actual_rows, 500);
    }

    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();

        // No data yet - should not trigger
        assert!(config.should_reoptimize().is_none());

        // Record within threshold
        config.record_checkpoint("filter", 150);
        assert!(config.should_reoptimize().is_none()); // 1.5x is within 2.0x threshold

        // Record exceeding threshold
        config.checkpoints[0].actual_rows = 0; // Reset for new test
        config.record_checkpoint("filter", 500);
        // NOTE(review): redundant - record_checkpoint already copied 500
        // into checkpoints[0].actual_rows.
        config.checkpoints[0].actual_rows = 500;

        let trigger = config.should_reoptimize();
        assert!(trigger.is_some());
        assert_eq!(trigger.unwrap().id, "filter");
    }

    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(!config.checkpoints[0].triggered);

        config.mark_triggered("filter");

        assert!(config.checkpoints[0].triggered);
    }

    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;

        // Count events delivered through the callback via a shared counter.
        let event_count = Arc::new(AtomicUsize::new(0));
        let counter = event_count.clone();

        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                counter.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        config.record_checkpoint("filter", 500);

        // Should have received one CheckpointReached event
        assert_eq!(event_count.load(Ordering::Relaxed), 1);

        config.mark_triggered("filter");

        // Should have received one ReoptimizationTriggered event
        assert_eq!(event_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut cp = AdaptiveCheckpoint::new("test", 0, 0.0);
        cp.record_rows(100);

        // Zero estimate should trigger if any rows are seen
        assert!(cp.exceeds_threshold(2.0, 50));
    }
}