// grafeo_core/execution/adaptive.rs
1//! Adaptive query execution with runtime cardinality feedback.
2//!
3//! This module provides adaptive execution capabilities that allow the query
4//! engine to adjust its execution strategy based on actual runtime cardinalities.
5//!
6//! The key insight is that cardinality estimates can be significantly wrong,
7//! especially for complex predicates or skewed data. By tracking actual row
8//! counts during execution, we can detect when our estimates are off and
9//! potentially re-optimize the remaining query plan.
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
15//! │  Optimizer  │────>│ Estimated Cards  │     │ CardinalityFeed │
16//! └─────────────┘     └──────────────────┘     │     back        │
17//!                              │               └────────┬────────┘
18//!                              v                        │
19//!                     ┌──────────────────┐              │
20//!                     │ AdaptiveContext  │<─────────────┘
21//!                     └────────┬─────────┘
22//!                              │
23//!                     ┌────────v─────────┐
24//!                     │ ReoptimizeCheck  │
25//!                     └──────────────────┘
26//! ```
27//!
28//! # Example
29//!
30//! ```rust
31//! use grafeo_core::execution::adaptive::{AdaptiveContext, CardinalityCheckpoint};
32//!
33//! // Set up adaptive context with estimated cardinalities
34//! let mut ctx = AdaptiveContext::new();
35//! ctx.set_estimate("scan_1", 1000.0);
36//! ctx.set_estimate("filter_1", 100.0);  // Expected 10% selectivity
37//!
38//! // During execution, record actuals
39//! ctx.record_actual("scan_1", 1000);
40//! ctx.record_actual("filter_1", 500);   // Actual 50% selectivity!
41//!
42//! // Check if re-optimization is warranted
43//! if ctx.should_reoptimize() {
44//!     // Re-plan remaining operators with updated statistics
45//! }
46//! ```
47
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, RwLock};
51
52use super::chunk::DataChunk;
53use super::operators::OperatorError;
54use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
55
/// Threshold for the deviation ratio that triggers re-optimization
/// consideration. A value of 3.0 means a checkpoint is significant once
/// the actual cardinality is more than 3x, or less than 1/3 of, the
/// estimate (see `CardinalityCheckpoint::is_significant_deviation`).
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;

/// Minimum number of rows before considering re-optimization.
/// Helps avoid thrashing on small result sets.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
63
64/// A checkpoint for tracking cardinality at a specific point in the plan.
/// A checkpoint for tracking cardinality at a specific point in the plan.
///
/// Pairs an optimizer estimate with the actual row count observed at
/// runtime so the two can be compared for deviation.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    /// Unique identifier for this checkpoint (typically operator name/id).
    pub operator_id: String,
    /// Estimated cardinality from the optimizer.
    pub estimated: f64,
    /// Actual row count observed during execution.
    pub actual: u64,
    /// Whether this checkpoint has been recorded.
    pub recorded: bool,
}

impl CardinalityCheckpoint {
    /// Creates a new checkpoint with an estimate and no recorded actual.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            estimated,
            actual: 0,
            recorded: false,
        }
    }

    /// Records the actual cardinality and marks the checkpoint as recorded.
    pub fn record(&mut self, actual: u64) {
        self.actual = actual;
        self.recorded = true;
    }

    /// Returns the deviation ratio (actual / estimated).
    ///
    /// A ratio > 1.0 means we underestimated, < 1.0 means overestimated.
    /// When the estimate is zero (or negative), division is avoided:
    /// returns 1.0 if the actual is also zero, and `f64::INFINITY`
    /// otherwise, so rows that were not predicted at all are flagged as
    /// a maximal deviation.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        if self.estimated <= 0.0 {
            return if self.actual == 0 { 1.0 } else { f64::INFINITY };
        }
        self.actual as f64 / self.estimated
    }

    /// Returns the absolute deviation (|actual - estimated|).
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        (self.actual as f64 - self.estimated).abs()
    }

    /// Checks if this checkpoint shows significant deviation.
    ///
    /// Always `false` until an actual has been recorded. Deviation is
    /// significant when the ratio is strictly above `threshold` or
    /// strictly below `1.0 / threshold`.
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        if !self.recorded {
            return false;
        }
        let ratio = self.deviation_ratio();
        ratio > threshold || ratio < 1.0 / threshold
    }
}
123
/// Feedback from runtime execution about actual cardinalities.
///
/// Collects actual row counts at various points during query execution so
/// they can be compared with the optimizer's estimates. Finalized counts
/// live in `actuals`; in-flight counts use atomic counters so row totals
/// can be accumulated through a shared reference.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    /// Actual row counts by operator ID.
    actuals: HashMap<String, u64>,
    /// Running count (for streaming updates).
    running_counts: HashMap<String, AtomicU64>,
}

impl CardinalityFeedback {
    /// Creates a new empty feedback collector.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the final actual cardinality for an operator.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_owned(), count);
    }

    /// Adds to the running count for an operator (thread-safe).
    ///
    /// Does nothing unless `init_counter` was called for this operator.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        if let Some(cell) = self.running_counts.get(operator_id) {
            cell.fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Initializes a running counter (starting at zero) for an operator.
    pub fn init_counter(&mut self, operator_id: &str) {
        self.running_counts
            .insert(operator_id.to_owned(), AtomicU64::new(0));
    }

    /// Finalizes the running count into the actuals.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        let total = match self.running_counts.get(operator_id) {
            Some(cell) => cell.load(Ordering::Relaxed),
            None => return,
        };
        self.actuals.insert(operator_id.to_owned(), total);
    }

    /// Gets the finalized actual count for an operator.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        self.actuals.get(operator_id).cloned()
    }

    /// Gets the current running count for an operator.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        self.running_counts
            .get(operator_id)
            .map(|cell| cell.load(Ordering::Relaxed))
    }

    /// Returns all finalized actuals.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
192
193/// Context for adaptive query execution.
194///
195/// Holds both estimated and actual cardinalities, and provides methods
196/// to determine when re-optimization should occur.
197#[derive(Debug)]
198pub struct AdaptiveContext {
199    /// Checkpoints with estimates and actuals.
200    checkpoints: HashMap<String, CardinalityCheckpoint>,
201    /// Threshold ratio for triggering re-optimization.
202    reoptimization_threshold: f64,
203    /// Minimum rows before considering re-optimization.
204    min_rows: u64,
205    /// Whether re-optimization has been triggered.
206    reoptimization_triggered: bool,
207    /// Operator that caused re-optimization (if any).
208    trigger_operator: Option<String>,
209}
210
211impl AdaptiveContext {
212    /// Creates a new adaptive context with default settings.
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            checkpoints: HashMap::new(),
217            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
218            min_rows: MIN_ROWS_FOR_REOPTIMIZATION,
219            reoptimization_triggered: false,
220            trigger_operator: None,
221        }
222    }
223
224    /// Creates a context with custom thresholds.
225    #[must_use]
226    pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
227        Self {
228            checkpoints: HashMap::new(),
229            reoptimization_threshold: threshold,
230            min_rows,
231            reoptimization_triggered: false,
232            trigger_operator: None,
233        }
234    }
235
236    /// Sets the estimated cardinality for an operator.
237    pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
238        self.checkpoints.insert(
239            operator_id.to_string(),
240            CardinalityCheckpoint::new(operator_id, estimate),
241        );
242    }
243
244    /// Records the actual cardinality for an operator.
245    pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
246        if let Some(checkpoint) = self.checkpoints.get_mut(operator_id) {
247            checkpoint.record(actual);
248        } else {
249            // Create checkpoint with unknown estimate
250            let mut checkpoint = CardinalityCheckpoint::new(operator_id, 0.0);
251            checkpoint.record(actual);
252            self.checkpoints.insert(operator_id.to_string(), checkpoint);
253        }
254    }
255
256    /// Applies feedback from a `CardinalityFeedback` collector.
257    pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
258        for (op_id, &actual) in feedback.all_actuals() {
259            self.record_actual(op_id, actual);
260        }
261    }
262
263    /// Checks if any checkpoint shows significant deviation.
264    #[must_use]
265    pub fn has_significant_deviation(&self) -> bool {
266        self.checkpoints
267            .values()
268            .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
269    }
270
271    /// Determines if re-optimization should be triggered.
272    ///
273    /// Returns true if:
274    /// - There's significant deviation from estimates
275    /// - We've processed enough rows to make a meaningful decision
276    /// - Re-optimization hasn't already been triggered
277    #[must_use]
278    pub fn should_reoptimize(&mut self) -> bool {
279        if self.reoptimization_triggered {
280            return false;
281        }
282
283        for (op_id, checkpoint) in &self.checkpoints {
284            if checkpoint.actual < self.min_rows {
285                continue;
286            }
287
288            if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
289                self.reoptimization_triggered = true;
290                self.trigger_operator = Some(op_id.clone());
291                return true;
292            }
293        }
294
295        false
296    }
297
298    /// Returns the operator that triggered re-optimization, if any.
299    #[must_use]
300    pub fn trigger_operator(&self) -> Option<&str> {
301        self.trigger_operator.as_deref()
302    }
303
304    /// Gets the checkpoint for an operator.
305    #[must_use]
306    pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
307        self.checkpoints.get(operator_id)
308    }
309
310    /// Returns all checkpoints.
311    #[must_use]
312    pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
313        &self.checkpoints
314    }
315
316    /// Calculates a correction factor for a specific operator.
317    ///
318    /// This factor can be used to adjust remaining cardinality estimates.
319    #[must_use]
320    pub fn correction_factor(&self, operator_id: &str) -> f64 {
321        self.checkpoints
322            .get(operator_id)
323            .filter(|cp| cp.recorded)
324            .map_or(1.0, CardinalityCheckpoint::deviation_ratio)
325    }
326
327    /// Returns summary statistics about the adaptive execution.
328    #[must_use]
329    pub fn summary(&self) -> AdaptiveSummary {
330        let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
331        let deviation_count = self
332            .checkpoints
333            .values()
334            .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
335            .count();
336
337        let avg_deviation = if recorded_count > 0 {
338            self.checkpoints
339                .values()
340                .filter(|cp| cp.recorded)
341                .map(CardinalityCheckpoint::deviation_ratio)
342                .sum::<f64>()
343                / recorded_count as f64
344        } else {
345            1.0
346        };
347
348        let max_deviation = self
349            .checkpoints
350            .values()
351            .filter(|cp| cp.recorded)
352            .map(|cp| {
353                let ratio = cp.deviation_ratio();
354                if ratio > 1.0 { ratio } else { 1.0 / ratio }
355            })
356            .fold(1.0_f64, f64::max);
357
358        AdaptiveSummary {
359            checkpoint_count: self.checkpoints.len(),
360            recorded_count,
361            deviation_count,
362            avg_deviation_ratio: avg_deviation,
363            max_deviation_ratio: max_deviation,
364            reoptimization_triggered: self.reoptimization_triggered,
365            trigger_operator: self.trigger_operator.clone(),
366        }
367    }
368
369    /// Resets the context for a new execution.
370    pub fn reset(&mut self) {
371        for checkpoint in self.checkpoints.values_mut() {
372            checkpoint.actual = 0;
373            checkpoint.recorded = false;
374        }
375        self.reoptimization_triggered = false;
376        self.trigger_operator = None;
377    }
378}
379
380impl Default for AdaptiveContext {
381    fn default() -> Self {
382        Self::new()
383    }
384}
385
/// Summary of adaptive execution statistics.
///
/// Snapshot produced by `AdaptiveContext::summary`.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of checkpoints.
    pub checkpoint_count: usize,
    /// Number of checkpoints with recorded actuals.
    pub recorded_count: usize,
    /// Number of checkpoints with significant deviation.
    pub deviation_count: usize,
    /// Average deviation ratio across all recorded checkpoints
    /// (1.0 when no actuals are recorded).
    pub avg_deviation_ratio: f64,
    /// Maximum deviation ratio observed, normalized so that under- and
    /// overestimates both map to a value >= 1.0.
    pub max_deviation_ratio: f64,
    /// Whether re-optimization was triggered.
    pub reoptimization_triggered: bool,
    /// Operator that triggered re-optimization.
    pub trigger_operator: Option<String>,
}
404
405/// Thread-safe wrapper for `AdaptiveContext`.
406///
407/// Allows multiple operators to report cardinalities concurrently.
408#[derive(Debug, Clone)]
409pub struct SharedAdaptiveContext {
410    inner: Arc<RwLock<AdaptiveContext>>,
411}
412
413impl SharedAdaptiveContext {
414    /// Creates a new shared context.
415    #[must_use]
416    pub fn new() -> Self {
417        Self {
418            inner: Arc::new(RwLock::new(AdaptiveContext::new())),
419        }
420    }
421
422    /// Creates from an existing context.
423    #[must_use]
424    pub fn from_context(ctx: AdaptiveContext) -> Self {
425        Self {
426            inner: Arc::new(RwLock::new(ctx)),
427        }
428    }
429
430    /// Records actual cardinality for an operator.
431    pub fn record_actual(&self, operator_id: &str, actual: u64) {
432        if let Ok(mut ctx) = self.inner.write() {
433            ctx.record_actual(operator_id, actual);
434        }
435    }
436
437    /// Checks if re-optimization should be triggered.
438    #[must_use]
439    pub fn should_reoptimize(&self) -> bool {
440        if let Ok(mut ctx) = self.inner.write() {
441            ctx.should_reoptimize()
442        } else {
443            false
444        }
445    }
446
447    /// Gets a read-only snapshot of the context.
448    #[must_use]
449    pub fn snapshot(&self) -> Option<AdaptiveContext> {
450        self.inner.read().ok().map(|guard| AdaptiveContext {
451            checkpoints: guard.checkpoints.clone(),
452            reoptimization_threshold: guard.reoptimization_threshold,
453            min_rows: guard.min_rows,
454            reoptimization_triggered: guard.reoptimization_triggered,
455            trigger_operator: guard.trigger_operator.clone(),
456        })
457    }
458}
459
460impl Default for SharedAdaptiveContext {
461    fn default() -> Self {
462        Self::new()
463    }
464}
465
466/// A wrapper operator that tracks cardinality and reports to an adaptive context.
467///
468/// This wraps any `PushOperator` and counts the rows flowing through it,
469/// reporting the count to the adaptive context.
470pub struct CardinalityTrackingOperator {
471    /// The wrapped operator.
472    inner: Box<dyn PushOperator>,
473    /// Operator identifier for reporting.
474    operator_id: String,
475    /// Row counter.
476    row_count: u64,
477    /// Shared adaptive context for reporting.
478    context: SharedAdaptiveContext,
479}
480
481impl CardinalityTrackingOperator {
482    /// Creates a new tracking wrapper.
483    pub fn new(
484        inner: Box<dyn PushOperator>,
485        operator_id: &str,
486        context: SharedAdaptiveContext,
487    ) -> Self {
488        Self {
489            inner,
490            operator_id: operator_id.to_string(),
491            row_count: 0,
492            context,
493        }
494    }
495
496    /// Returns the current row count.
497    #[must_use]
498    pub fn current_count(&self) -> u64 {
499        self.row_count
500    }
501}
502
503impl PushOperator for CardinalityTrackingOperator {
504    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
505        // Track input rows
506        self.row_count += chunk.len() as u64;
507
508        // Push through to inner operator
509        self.inner.push(chunk, sink)
510    }
511
512    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
513        // Report final cardinality to context
514        self.context
515            .record_actual(&self.operator_id, self.row_count);
516
517        // Finalize inner operator
518        self.inner.finalize(sink)
519    }
520
521    fn preferred_chunk_size(&self) -> ChunkSizeHint {
522        self.inner.preferred_chunk_size()
523    }
524
525    fn name(&self) -> &'static str {
526        // Return the inner operator's name
527        self.inner.name()
528    }
529}
530
531/// A sink that tracks cardinality of data flowing through it.
532pub struct CardinalityTrackingSink {
533    /// The wrapped sink.
534    inner: Box<dyn Sink>,
535    /// Operator identifier for reporting.
536    operator_id: String,
537    /// Row counter.
538    row_count: u64,
539    /// Shared adaptive context for reporting.
540    context: SharedAdaptiveContext,
541}
542
543impl CardinalityTrackingSink {
544    /// Creates a new tracking sink wrapper.
545    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
546        Self {
547            inner,
548            operator_id: operator_id.to_string(),
549            row_count: 0,
550            context,
551        }
552    }
553
554    /// Returns the current row count.
555    #[must_use]
556    pub fn current_count(&self) -> u64 {
557        self.row_count
558    }
559}
560
561impl Sink for CardinalityTrackingSink {
562    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
563        self.row_count += chunk.len() as u64;
564        self.inner.consume(chunk)
565    }
566
567    fn finalize(&mut self) -> Result<(), OperatorError> {
568        // Report final cardinality
569        self.context
570            .record_actual(&self.operator_id, self.row_count);
571        self.inner.finalize()
572    }
573
574    fn name(&self) -> &'static str {
575        self.inner.name()
576    }
577}
578
/// Decision about whether to re-optimize a query.
///
/// Produced by `evaluate_reoptimization` from an `AdaptiveContext`.
#[derive(Debug, Clone, PartialEq)]
pub enum ReoptimizationDecision {
    /// Continue with current plan.
    Continue,
    /// Re-optimize the remaining plan.
    Reoptimize {
        /// The operator that triggered re-optimization.
        trigger: String,
        /// Correction factors (actual / estimated) to apply to remaining
        /// estimates, keyed by operator ID.
        corrections: HashMap<String, f64>,
    },
    /// Abort the query (catastrophic misestimate).
    Abort {
        /// The reason for aborting the query.
        reason: String,
    },
}
597
598/// Evaluates whether re-optimization should occur based on context.
599#[must_use]
600pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
601    let summary = ctx.summary();
602
603    // If no significant deviations, continue
604    if !summary.reoptimization_triggered {
605        return ReoptimizationDecision::Continue;
606    }
607
608    // If deviation is catastrophic (>100x), consider aborting
609    if summary.max_deviation_ratio > 100.0 {
610        return ReoptimizationDecision::Abort {
611            reason: format!(
612                "Catastrophic cardinality misestimate: {}x deviation",
613                summary.max_deviation_ratio
614            ),
615        };
616    }
617
618    // Build correction factors
619    let corrections: HashMap<String, f64> = ctx
620        .all_checkpoints()
621        .iter()
622        .filter(|(_, cp)| cp.recorded)
623        .map(|(id, cp)| (id.clone(), cp.deviation_ratio()))
624        .collect();
625
626    ReoptimizationDecision::Reoptimize {
627        trigger: summary.trigger_operator.unwrap_or_default(),
628        corrections,
629    }
630}
631
/// Callback for creating a new plan based on observed cardinalities.
///
/// This is called when the adaptive pipeline detects significant deviation
/// and decides to re-optimize. The callback receives the adaptive context
/// with recorded actuals and should return a new set of operators for the
/// remaining work.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
638
639/// Configuration for adaptive pipeline execution.
640#[derive(Debug, Clone)]
641pub struct AdaptivePipelineConfig {
642    /// Number of rows to process before checking for re-optimization.
643    pub check_interval: u64,
644    /// Threshold for deviation that triggers re-optimization.
645    pub reoptimization_threshold: f64,
646    /// Minimum rows before considering re-optimization.
647    pub min_rows_for_reoptimization: u64,
648    /// Maximum number of re-optimizations allowed per query.
649    pub max_reoptimizations: usize,
650}
651
652impl Default for AdaptivePipelineConfig {
653    fn default() -> Self {
654        Self {
655            check_interval: 10_000,
656            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
657            min_rows_for_reoptimization: MIN_ROWS_FOR_REOPTIMIZATION,
658            max_reoptimizations: 3,
659        }
660    }
661}
662
663impl AdaptivePipelineConfig {
664    /// Creates a new configuration with custom thresholds.
665    #[must_use]
666    pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
667        Self {
668            check_interval,
669            reoptimization_threshold: threshold,
670            min_rows_for_reoptimization: min_rows,
671            max_reoptimizations: 3,
672        }
673    }
674
675    /// Sets the maximum number of re-optimizations allowed.
676    #[must_use]
677    pub fn with_max_reoptimizations(mut self, max: usize) -> Self {
678        self.max_reoptimizations = max;
679        self
680    }
681}
682
/// Result of executing an adaptive pipeline.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows processed.
    pub total_rows: u64,
    /// Number of re-optimizations performed.
    pub reoptimization_count: usize,
    /// Operators that triggered re-optimization.
    pub triggers: Vec<String>,
    /// Summary of the final adaptive context with all recorded actuals.
    pub final_context: AdaptiveSummary,
}
695
/// A checkpoint during adaptive execution where plan switching can occur.
///
/// Checkpoints are placed at strategic points in the pipeline (typically after
/// filters or joins) where switching to a new plan makes sense.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    /// Unique identifier for this checkpoint.
    pub id: String,
    /// Operator index in the pipeline (after which this checkpoint occurs).
    pub after_operator: usize,
    /// Estimated cardinality at this point.
    pub estimated_cardinality: f64,
    /// Actual rows seen so far.
    pub actual_rows: u64,
    /// Whether this checkpoint has triggered re-optimization.
    pub triggered: bool,
}

impl AdaptiveCheckpoint {
    /// Creates a new, untriggered checkpoint with zero observed rows.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        Self {
            id: id.to_owned(),
            after_operator,
            estimated_cardinality: estimated,
            actual_rows: 0,
            triggered: false,
        }
    }

    /// Accumulates rows passing through this checkpoint.
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }

    /// Checks if deviation exceeds threshold.
    ///
    /// Never fires before `min_rows` rows have been seen. With a
    /// non-positive estimate, any observed row counts as a deviation.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        if self.actual_rows < min_rows {
            false
        } else if self.estimated_cardinality <= 0.0 {
            self.actual_rows > 0
        } else {
            let ratio = self.actual_rows as f64 / self.estimated_cardinality;
            ratio > threshold || ratio < 1.0 / threshold
        }
    }
}
745
/// Event emitted during adaptive execution.
///
/// Delivered through an `AdaptiveEventCallback` so callers can observe
/// checkpoint progress, re-optimization triggers, and plan switches.
#[derive(Debug, Clone)]
pub enum AdaptiveEvent {
    /// A checkpoint was reached.
    CheckpointReached {
        /// The checkpoint identifier.
        id: String,
        /// The actual number of rows observed.
        actual_rows: u64,
        /// The estimated number of rows from the optimizer.
        estimated: f64,
    },
    /// Re-optimization was triggered.
    ReoptimizationTriggered {
        /// The checkpoint that triggered re-optimization.
        checkpoint_id: String,
        /// The ratio between actual and estimated rows.
        deviation_ratio: f64,
    },
    /// Plan was switched.
    PlanSwitched {
        /// The number of operators in the previous plan.
        old_operator_count: usize,
        /// The number of operators in the new plan.
        new_operator_count: usize,
    },
    /// Execution completed.
    ExecutionCompleted {
        /// The total number of rows produced.
        total_rows: u64,
    },
}

/// Callback for observing adaptive execution events.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
781
782/// Builder for creating adaptive pipelines.
783pub struct AdaptivePipelineBuilder {
784    checkpoints: Vec<AdaptiveCheckpoint>,
785    config: AdaptivePipelineConfig,
786    context: AdaptiveContext,
787    event_callback: Option<AdaptiveEventCallback>,
788}
789
790impl AdaptivePipelineBuilder {
791    /// Creates a new builder with default configuration.
792    #[must_use]
793    pub fn new() -> Self {
794        Self {
795            checkpoints: Vec::new(),
796            config: AdaptivePipelineConfig::default(),
797            context: AdaptiveContext::new(),
798            event_callback: None,
799        }
800    }
801
802    /// Sets the configuration.
803    #[must_use]
804    pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
805        self.config = config;
806        self
807    }
808
809    /// Adds a checkpoint at the specified operator index.
810    #[must_use]
811    pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
812        self.checkpoints
813            .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
814        self.context.set_estimate(id, estimated);
815        self
816    }
817
818    /// Sets an event callback for observing execution.
819    #[must_use]
820    pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
821        self.event_callback = Some(callback);
822        self
823    }
824
825    /// Sets estimates from a pre-configured context.
826    #[must_use]
827    pub fn with_context(mut self, context: AdaptiveContext) -> Self {
828        self.context = context;
829        self
830    }
831
832    /// Builds the configuration for adaptive execution.
833    #[must_use]
834    pub fn build(self) -> AdaptiveExecutionConfig {
835        AdaptiveExecutionConfig {
836            checkpoints: self.checkpoints,
837            config: self.config,
838            context: self.context,
839            event_callback: self.event_callback,
840        }
841    }
842}
843
844impl Default for AdaptivePipelineBuilder {
845    fn default() -> Self {
846        Self::new()
847    }
848}
849
/// Configuration for adaptive execution, built by `AdaptivePipelineBuilder`.
pub struct AdaptiveExecutionConfig {
    /// Checkpoints for monitoring cardinality.
    pub checkpoints: Vec<AdaptiveCheckpoint>,
    /// Execution configuration (thresholds, intervals, re-optimization cap).
    pub config: AdaptivePipelineConfig,
    /// Adaptive context with estimates (and, during execution, actuals).
    pub context: AdaptiveContext,
    /// Optional event callback invoked on checkpoint and trigger events.
    pub event_callback: Option<AdaptiveEventCallback>,
}
861
862impl AdaptiveExecutionConfig {
863    /// Returns a summary of the adaptive execution after it completes.
864    #[must_use]
865    pub fn summary(&self) -> AdaptiveSummary {
866        self.context.summary()
867    }
868
869    /// Records actual cardinality for a checkpoint.
870    pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
871        self.context.record_actual(checkpoint_id, actual);
872
873        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
874            cp.actual_rows = actual;
875        }
876
877        if let Some(ref callback) = self.event_callback {
878            let estimated = self
879                .context
880                .get_checkpoint(checkpoint_id)
881                .map_or(0.0, |cp| cp.estimated);
882            callback(AdaptiveEvent::CheckpointReached {
883                id: checkpoint_id.to_string(),
884                actual_rows: actual,
885                estimated,
886            });
887        }
888    }
889
890    /// Checks if any checkpoint exceeds the deviation threshold.
891    #[must_use]
892    pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
893        self.checkpoints.iter().find(|cp| {
894            !cp.triggered
895                && cp.exceeds_threshold(
896                    self.config.reoptimization_threshold,
897                    self.config.min_rows_for_reoptimization,
898                )
899        })
900    }
901
902    /// Marks a checkpoint as having triggered re-optimization.
903    pub fn mark_triggered(&mut self, checkpoint_id: &str) {
904        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
905            cp.triggered = true;
906        }
907
908        if let Some(ref callback) = self.event_callback {
909            let ratio = self
910                .context
911                .get_checkpoint(checkpoint_id)
912                .filter(|cp| cp.recorded)
913                .map_or(1.0, |cp| cp.deviation_ratio());
914            callback(AdaptiveEvent::ReoptimizationTriggered {
915                checkpoint_id: checkpoint_id.to_string(),
916                deviation_ratio: ratio,
917            });
918        }
919    }
920}
921
922// ============= Pull-Based Operator Tracking =============
923
924use super::operators::{Operator, OperatorResult}; // OperatorError imported above
925
/// A wrapper that tracks cardinality for pull-based operators.
///
/// This wraps any `Operator` and counts the rows flowing through it,
/// reporting the count to the adaptive context. Use this for integrating
/// adaptive execution with the standard pull-based executor.
///
/// The final count is reported exactly once: when the stream is exhausted,
/// when the inner operator errors, or when the wrapper is dropped —
/// whichever happens first.
pub struct CardinalityTrackingWrapper {
    /// The wrapped operator.
    inner: Box<dyn Operator>,
    /// Operator identifier used as the checkpoint key when reporting.
    operator_id: String,
    /// Running row counter, accumulated across `next()` calls.
    row_count: u64,
    /// Shared adaptive context the final count is reported to.
    context: SharedAdaptiveContext,
    /// Whether the final count has already been reported (guards
    /// against double-reporting on error + drop).
    finalized: bool,
}
943
944impl CardinalityTrackingWrapper {
945    /// Creates a new tracking wrapper for a pull-based operator.
946    pub fn new(
947        inner: Box<dyn Operator>,
948        operator_id: &str,
949        context: SharedAdaptiveContext,
950    ) -> Self {
951        Self {
952            inner,
953            operator_id: operator_id.to_string(),
954            row_count: 0,
955            context,
956            finalized: false,
957        }
958    }
959
960    /// Returns the current row count.
961    #[must_use]
962    pub fn current_count(&self) -> u64 {
963        self.row_count
964    }
965
966    /// Reports the final cardinality to the context.
967    fn report_final(&mut self) {
968        if !self.finalized {
969            self.context
970                .record_actual(&self.operator_id, self.row_count);
971            self.finalized = true;
972        }
973    }
974}
975
976impl Operator for CardinalityTrackingWrapper {
977    fn next(&mut self) -> OperatorResult {
978        match self.inner.next() {
979            Ok(Some(chunk)) => {
980                // Track rows
981                self.row_count += chunk.row_count() as u64;
982                Ok(Some(chunk))
983            }
984            Ok(None) => {
985                // Stream exhausted - report final cardinality
986                self.report_final();
987                Ok(None)
988            }
989            Err(e) => {
990                // Report on error too
991                self.report_final();
992                Err(e)
993            }
994        }
995    }
996
997    fn reset(&mut self) {
998        self.row_count = 0;
999        self.finalized = false;
1000        self.inner.reset();
1001    }
1002
1003    fn name(&self) -> &'static str {
1004        self.inner.name()
1005    }
1006}
1007
impl Drop for CardinalityTrackingWrapper {
    fn drop(&mut self) {
        // Ensure we report even if the wrapper is dropped before the
        // stream is exhausted (e.g. LIMIT-style early termination);
        // report_final is a no-op when already finalized.
        self.report_final();
    }
}
1014
1015// ============= Adaptive Pipeline Execution =============
1016
1017use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; // Sink imported above
1018use super::sink::CollectorSink;
1019use super::source::OperatorSource;
1020
1021/// High-level adaptive pipeline that executes a pull-based operator with
1022/// cardinality tracking using push-based infrastructure.
1023///
1024/// This bridges the pull-based planner output with push-based execution:
1025/// 1. Wraps the pull operator as an `OperatorSource`
1026/// 2. Uses `CardinalityTrackingSink` to track output cardinality
1027/// 3. Provides adaptive feedback through `AdaptiveContext`
1028///
1029/// # Example
1030///
1031/// ```no_run
1032/// # use grafeo_core::execution::adaptive::{AdaptiveContext, AdaptivePipelineExecutor};
1033/// # use grafeo_core::execution::operators::Operator;
1034/// # fn example(operator: Box<dyn Operator>, adaptive_context: AdaptiveContext) -> Result<(), Box<dyn std::error::Error>> {
1035/// let executor = AdaptivePipelineExecutor::new(operator, adaptive_context);
1036/// let (chunks, summary) = executor.execute()?;
1037/// # Ok(())
1038/// # }
1039/// ```
pub struct AdaptivePipelineExecutor {
    /// The pull-based operator wrapped as a push-pipeline source.
    source: OperatorSource,
    /// Shared context receiving runtime cardinality reports.
    context: SharedAdaptiveContext,
    /// Tuning knobs: check interval and re-optimization thresholds.
    config: AdaptivePipelineConfig,
}
1045
impl AdaptivePipelineExecutor {
    /// Creates a new adaptive pipeline executor with default configuration.
    ///
    /// # Arguments
    ///
    /// * `operator` - The pull-based operator to execute
    /// * `context` - Adaptive context with cardinality estimates
    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
        Self {
            source: OperatorSource::new(operator),
            context: SharedAdaptiveContext::from_context(context),
            config: AdaptivePipelineConfig::default(),
        }
    }

    /// Creates an executor with custom configuration.
    ///
    /// Like [`Self::new`] but uses the provided `config` instead of the
    /// default check interval and thresholds.
    pub fn with_config(
        operator: Box<dyn Operator>,
        context: AdaptiveContext,
        config: AdaptivePipelineConfig,
    ) -> Self {
        Self {
            source: OperatorSource::new(operator),
            context: SharedAdaptiveContext::from_context(context),
            config,
        }
    }

    /// Executes the pipeline, pushing chunks through a cardinality-tracking
    /// sink, and returns the adaptive summary.
    ///
    /// # Returns
    ///
    /// A tuple of (collected DataChunks, adaptive execution summary).
    ///
    /// NOTE(review): the chunk `Vec` is currently **always empty** — chunks
    /// are consumed by the wrapped `CollectorSink`, which this method does
    /// not unwrap (see the comment before the return). Callers that need
    /// the result rows should use [`Self::execute_collecting`] instead.
    ///
    /// # Errors
    ///
    /// Returns an error if execution fails.
    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
        // Track output cardinality under the "output" checkpoint id.
        let mut sink = CardinalityTrackingSink::new(
            Box::new(CollectorSink::new()),
            "output",
            self.context.clone(),
        );

        let chunk_size = DEFAULT_CHUNK_SIZE;
        let mut total_rows: u64 = 0;
        let check_interval = self.config.check_interval;

        // Process all chunks from source
        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            let chunk_rows = chunk.len() as u64;
            total_rows += chunk_rows;

            // Push to tracking sink; `false` means the sink requested
            // early termination.
            let continue_exec = sink.consume(chunk)?;
            if !continue_exec {
                break;
            }

            // Periodically check for reoptimization need.
            // NOTE(review): this fires only when total_rows lands exactly on
            // a multiple of check_interval; arbitrary chunk sizes can skip
            // every multiple, so the check may never run — confirm intended.
            if total_rows >= check_interval
                && total_rows.is_multiple_of(check_interval)
                && self.context.should_reoptimize()
            {
                // Log or emit event that reoptimization would be triggered
                // Full re-planning would happen at a higher level
            }
        }

        // Finalize sink (reports the final "output" cardinality).
        sink.finalize()?;

        // Extract results from the inner sink
        let summary = self
            .context
            .snapshot()
            .map(|ctx| ctx.summary())
            .unwrap_or_default();

        // Get collected chunks from the inner CollectorSink
        // Note: We need to extract chunks from the wrapped sink
        // For now, we'll return the summary and the caller can collect separately
        Ok((Vec::new(), summary))
    }

    /// Executes and collects all results into DataChunks.
    ///
    /// This is a simpler interface that handles chunk collection internally:
    /// it bypasses the sink machinery, collects non-empty chunks directly,
    /// and records the cumulative row count under the "root" checkpoint id.
    ///
    /// # Errors
    ///
    /// Returns an error if pulling a chunk from the source fails.
    pub fn execute_collecting(
        mut self,
    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
        let mut chunks = Vec::new();
        let chunk_size = DEFAULT_CHUNK_SIZE;
        let mut total_rows: u64 = 0;
        let check_interval = self.config.check_interval;

        // Process all chunks from source
        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            let chunk_rows = chunk.len() as u64;
            total_rows += chunk_rows;

            // Record cardinality (cumulative total, re-recorded per chunk)
            self.context.record_actual("root", total_rows);

            // Collect the chunk (empty chunks are dropped)
            if !chunk.is_empty() {
                chunks.push(chunk);
            }

            // Periodically check for reoptimization.
            // NOTE(review): same exact-multiple caveat as in execute();
            // the result is discarded, but should_reoptimize may update
            // trigger state inside the shared context — confirm.
            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
                let _ = self.context.should_reoptimize();
            }
        }

        let summary = self
            .context
            .snapshot()
            .map(|ctx| ctx.summary())
            .unwrap_or_default();

        Ok((chunks, summary))
    }

    /// Returns a reference to the shared context for external monitoring.
    pub fn context(&self) -> &SharedAdaptiveContext {
        &self.context
    }
}
1175
1176/// Convenience function to execute a pull-based operator with adaptive tracking.
1177///
1178/// This is the recommended entry point for adaptive execution.
1179///
1180/// # Arguments
1181///
1182/// * `operator` - The pull-based operator to execute
1183/// * `context` - Adaptive context with cardinality estimates (or None for default)
1184/// * `config` - Optional configuration (uses defaults if None)
1185///
1186/// # Returns
1187///
1188/// A tuple of (collected DataChunks, adaptive summary if tracking was enabled).
1189pub fn execute_adaptive(
1190    operator: Box<dyn Operator>,
1191    context: Option<AdaptiveContext>,
1192    config: Option<AdaptivePipelineConfig>,
1193) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
1194    let ctx = context.unwrap_or_default();
1195    let cfg = config.unwrap_or_default();
1196
1197    let executor = AdaptivePipelineExecutor::with_config(operator, ctx, cfg);
1198    let (chunks, summary) = executor.execute_collecting()?;
1199
1200    Ok((chunks, Some(summary)))
1201}
1202
// Unit tests: checkpoint deviation arithmetic, feedback collection,
// adaptive context triggering, re-optimization decisions, and the
// plan-switching configuration/builder/event machinery.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(200);

        // Actual is 2x estimate
        assert!((cp.deviation_ratio() - 2.0).abs() < 0.001);
    }

    #[test]
    fn test_checkpoint_underestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(500);

        // Underestimated by 5x
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_overestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(20);

        // Overestimated - actual is 0.2x estimate
        assert!((cp.deviation_ratio() - 0.2).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0)); // 0.2 < 1/3
    }

    #[test]
    fn test_checkpoint_accurate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(110);

        // Close to estimate
        assert!((cp.deviation_ratio() - 1.1).abs() < 0.001);
        assert!(!cp.is_significant_deviation(3.0)); // 1.1 is within threshold
    }

    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(100);

        // Zero estimate with actual rows = infinity
        assert!(cp.deviation_ratio().is_infinite());
    }

    #[test]
    fn test_checkpoint_zero_both() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(0);

        // Zero estimate and zero actual = ratio of 1.0
        assert!((cp.deviation_ratio() - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_feedback_collection() {
        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan_1", 1000);
        feedback.record("filter_1", 100);

        assert_eq!(feedback.get("scan_1"), Some(1000));
        assert_eq!(feedback.get("filter_1"), Some(100));
        assert_eq!(feedback.get("unknown"), None);
    }

    #[test]
    fn test_feedback_running_counter() {
        let mut feedback = CardinalityFeedback::new();
        feedback.init_counter("op_1");

        // Running counter accumulates across add_rows calls.
        feedback.add_rows("op_1", 100);
        feedback.add_rows("op_1", 200);
        feedback.add_rows("op_1", 50);

        assert_eq!(feedback.get_running("op_1"), Some(350));

        // Finalizing promotes the running total to the recorded value.
        feedback.finalize_counter("op_1");
        assert_eq!(feedback.get("op_1"), Some(350));
    }

    #[test]
    fn test_adaptive_context_basic() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        ctx.record_actual("scan", 1000);
        ctx.record_actual("filter", 500); // 5x underestimate

        let cp = ctx.get_checkpoint("filter").unwrap();
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 100);
        ctx.set_estimate("scan", 10000.0);
        ctx.set_estimate("filter", 1000.0);

        ctx.record_actual("scan", 10000);
        ctx.record_actual("filter", 5000); // 5x underestimate

        assert!(ctx.should_reoptimize());
        assert_eq!(ctx.trigger_operator(), Some("filter"));

        // Second call should return false (already triggered)
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_min_rows() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 1000);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500); // 5x, but only 500 rows

        // Should not trigger because we haven't seen enough rows
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100); // Close to estimate

        assert!(!ctx.has_significant_deviation());
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 300);

        // Correction factor is actual/estimate; unknown ids default to 1.0.
        assert!((ctx.correction_factor("filter") - 3.0).abs() < 0.001);
        assert!((ctx.correction_factor("unknown") - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan", 1000);
        feedback.record("filter", 500);

        ctx.apply_feedback(&feedback);

        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 1000);
        assert_eq!(ctx.get_checkpoint("filter").unwrap().actual, 500);
    }

    #[test]
    fn test_adaptive_summary() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("op1", 100.0);
        ctx.set_estimate("op2", 200.0);
        ctx.set_estimate("op3", 300.0);

        ctx.record_actual("op1", 100); // Exact
        ctx.record_actual("op2", 600); // 3x

        // Trigger reoptimization
        let _ = ctx.should_reoptimize();

        // op3 was never recorded, hence recorded_count == 2.
        let summary = ctx.summary();
        assert_eq!(summary.checkpoint_count, 3);
        assert_eq!(summary.recorded_count, 2);
        assert_eq!(summary.deviation_count, 1);
        assert!(summary.reoptimization_triggered);
    }

    #[test]
    fn test_adaptive_context_reset() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 5000);
        let _ = ctx.should_reoptimize(); // Trigger

        assert!(ctx.reoptimization_triggered);

        ctx.reset();

        // Reset clears actuals and trigger state but keeps estimates.
        assert!(!ctx.reoptimization_triggered);
        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!ctx.get_checkpoint("scan").unwrap().recorded);
        // Estimate should be preserved
        assert!((ctx.get_checkpoint("scan").unwrap().estimated - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_shared_context() {
        let ctx = SharedAdaptiveContext::new();

        ctx.record_actual("op1", 1000);

        let snapshot = ctx.snapshot().unwrap();
        assert_eq!(snapshot.get_checkpoint("op1").unwrap().actual, 1000);
    }

    #[test]
    fn test_reoptimization_decision_continue() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);

        let decision = evaluate_reoptimization(&ctx);
        assert_eq!(decision, ReoptimizationDecision::Continue);
    }

    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);
        let _ = ctx.should_reoptimize(); // Trigger

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Reoptimize {
            trigger,
            corrections,
        } = decision
        {
            assert_eq!(trigger, "filter");
            assert!((corrections.get("filter").copied().unwrap_or(0.0) - 5.0).abs() < 0.001);
        } else {
            panic!("Expected Reoptimize decision");
        }
    }

    #[test]
    fn test_reoptimization_decision_abort() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 1.0);
        ctx.record_actual("filter", 1000); // 1000x deviation!
        let _ = ctx.should_reoptimize();

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Abort { reason } = decision {
            assert!(reason.contains("Catastrophic"));
        } else {
            panic!("Expected Abort decision");
        }
    }

    #[test]
    fn test_absolute_deviation() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(150);

        assert!((cp.absolute_deviation() - 50.0).abs() < 0.001);
    }

    // ============= Plan Switching Tests =============

    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut cp = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(cp.actual_rows, 0);
        assert!(!cp.triggered);

        // record_rows accumulates rather than overwriting.
        cp.record_rows(50);
        assert_eq!(cp.actual_rows, 50);

        cp.record_rows(100);
        assert_eq!(cp.actual_rows, 150);
    }

    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut cp = AdaptiveCheckpoint::new("filter", 0, 100.0);

        // Below min rows
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));

        // Above min rows but within threshold
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100)); // 100 actual vs 100 estimated = 1.0x

        // Above threshold (underestimate)
        cp.actual_rows = 0;
        cp.record_rows(500);
        assert!(cp.exceeds_threshold(2.0, 100)); // 500 actual vs 100 estimated = 5.0x

        // Above threshold (overestimate)
        let mut cp2 = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        cp2.record_rows(200);
        assert!(cp2.exceeds_threshold(2.0, 100)); // 200 actual vs 1000 estimated = 0.2x
    }

    #[test]
    fn test_adaptive_pipeline_config_default() {
        let config = AdaptivePipelineConfig::default();

        assert_eq!(config.check_interval, 10_000);
        assert!((config.reoptimization_threshold - DEFAULT_REOPTIMIZATION_THRESHOLD).abs() < 0.001);
        assert_eq!(
            config.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(config.max_reoptimizations, 3);
    }

    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let config = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);

        assert_eq!(config.check_interval, 5000);
        assert!((config.reoptimization_threshold - 2.0).abs() < 0.001);
        assert_eq!(config.min_rows_for_reoptimization, 500);
        assert_eq!(config.max_reoptimizations, 5);
    }

    #[test]
    fn test_adaptive_pipeline_builder() {
        let config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();

        // Checkpoints are kept in insertion order.
        assert_eq!(config.checkpoints.len(), 2);
        assert_eq!(config.checkpoints[0].id, "scan");
        assert!((config.checkpoints[0].estimated_cardinality - 10000.0).abs() < 0.001);
        assert_eq!(config.checkpoints[1].id, "filter");
        assert!((config.checkpoints[1].estimated_cardinality - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        config.record_checkpoint("filter", 500);

        // Check context was updated
        let cp = config.context.get_checkpoint("filter").unwrap();
        assert_eq!(cp.actual, 500);
        assert!(cp.recorded);

        // Check checkpoint was updated
        let acp = config
            .checkpoints
            .iter()
            .find(|c| c.id == "filter")
            .unwrap();
        assert_eq!(acp.actual_rows, 500);
    }

    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();

        // No data yet - should not trigger
        assert!(config.should_reoptimize().is_none());

        // Record within threshold
        config.record_checkpoint("filter", 150);
        assert!(config.should_reoptimize().is_none()); // 1.5x is within 2.0x threshold

        // Record exceeding threshold
        config.checkpoints[0].actual_rows = 0; // Reset for new test
        config.record_checkpoint("filter", 500);
        config.checkpoints[0].actual_rows = 500;

        let trigger = config.should_reoptimize();
        assert!(trigger.is_some());
        assert_eq!(trigger.unwrap().id, "filter");
    }

    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(!config.checkpoints[0].triggered);

        config.mark_triggered("filter");

        assert!(config.checkpoints[0].triggered);
    }

    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;

        let event_count = Arc::new(AtomicUsize::new(0));
        let counter = event_count.clone();

        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                counter.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        config.record_checkpoint("filter", 500);

        // Should have received one CheckpointReached event
        assert_eq!(event_count.load(Ordering::Relaxed), 1);

        config.mark_triggered("filter");

        // Should have received one ReoptimizationTriggered event
        assert_eq!(event_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut cp = AdaptiveCheckpoint::new("test", 0, 0.0);
        cp.record_rows(100);

        // Zero estimate should trigger if any rows are seen
        assert!(cp.exceeds_threshold(2.0, 50));
    }
}