Skip to main content

grafeo_core/execution/
adaptive.rs

1//! Adaptive query execution with runtime cardinality feedback.
2//!
3//! This module provides adaptive execution capabilities that allow the query
4//! engine to adjust its execution strategy based on actual runtime cardinalities.
5//!
6//! The key insight is that cardinality estimates can be significantly wrong,
7//! especially for complex predicates or skewed data. By tracking actual row
8//! counts during execution, we can detect when our estimates are off and
9//! potentially re-optimize the remaining query plan.
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
15//! │  Optimizer  │────>│ Estimated Cards  │     │ CardinalityFeed │
16//! └─────────────┘     └──────────────────┘     │     back        │
17//!                              │               └────────┬────────┘
18//!                              v                        │
19//!                     ┌──────────────────┐              │
20//!                     │ AdaptiveContext  │<─────────────┘
21//!                     └────────┬─────────┘
22//!                              │
23//!                     ┌────────v─────────┐
24//!                     │ ReoptimizeCheck  │
25//!                     └──────────────────┘
26//! ```
27//!
28//! # Example
29//!
30//! ```ignore
31//! use grafeo_core::execution::adaptive::{AdaptiveContext, CardinalityCheckpoint};
32//!
33//! // Set up adaptive context with estimated cardinalities
34//! let mut ctx = AdaptiveContext::new();
//! ctx.set_estimate("scan_1", 10_000.0);
//! ctx.set_estimate("filter_1", 1_000.0);  // Expected 10% selectivity
//!
//! // During execution, record actuals
//! ctx.record_actual("scan_1", 10_000);
//! ctx.record_actual("filter_1", 5_000);    // Actual 50% selectivity, and
//!                                          // enough rows to clear the min-rows gate
41//!
42//! // Check if re-optimization is warranted
43//! if ctx.should_reoptimize() {
44//!     // Re-plan remaining operators with updated statistics
45//! }
46//! ```
47
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, RwLock};
51
52use super::chunk::DataChunk;
53use super::operators::OperatorError;
54use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
55
/// Threshold for deviation that triggers re-optimization consideration.
/// With the default of 3.0, a checkpoint is considered significantly off when
/// the actual cardinality is more than 3x or less than 1/3 of the estimate
/// (see `CardinalityCheckpoint::is_significant_deviation`).
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;

/// Minimum number of rows before considering re-optimization.
/// Helps avoid thrashing on small result sets.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
63
/// A checkpoint for tracking cardinality at a specific point in the plan.
///
/// Pairs the optimizer's estimated row count with the actual row count
/// observed at runtime so the two can be compared after execution.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    /// Unique identifier for this checkpoint (typically operator name/id).
    pub operator_id: String,
    /// Estimated cardinality from the optimizer.
    pub estimated: f64,
    /// Actual row count observed during execution; meaningful only once
    /// `recorded` is true.
    pub actual: u64,
    /// Whether an actual count has been recorded via `record`.
    pub recorded: bool,
}

impl CardinalityCheckpoint {
    /// Creates a new checkpoint holding an estimate and no recorded actual.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            estimated,
            actual: 0,
            recorded: false,
        }
    }

    /// Records the actual cardinality observed at runtime.
    pub fn record(&mut self, actual: u64) {
        self.actual = actual;
        self.recorded = true;
    }

    /// Returns the deviation ratio (actual / estimated).
    ///
    /// A ratio > 1.0 means the optimizer underestimated; < 1.0 means it
    /// overestimated. A zero (or negative) estimate yields 1.0 when the
    /// actual is also zero and infinity otherwise, avoiding division by zero.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        if self.estimated > 0.0 {
            self.actual as f64 / self.estimated
        } else if self.actual == 0 {
            1.0
        } else {
            f64::INFINITY
        }
    }

    /// Returns the absolute deviation (|actual - estimated|).
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        (self.actual as f64 - self.estimated).abs()
    }

    /// Checks whether the recorded actual deviates from the estimate by more
    /// than `threshold` in either direction (above `threshold`x or below
    /// 1/`threshold`x). Always false while no actual has been recorded.
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        if !self.recorded {
            return false;
        }
        let ratio = self.deviation_ratio();
        ratio > threshold || ratio < 1.0 / threshold
    }
}
123
/// Feedback from runtime execution about actual cardinalities.
///
/// Collects actual row counts at various points during query execution so
/// they can be compared against the optimizer's estimates.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    /// Final row counts keyed by operator ID.
    actuals: HashMap<String, u64>,
    /// In-flight counters for streaming updates (see `init_counter`).
    running_counts: HashMap<String, AtomicU64>,
}

impl CardinalityFeedback {
    /// Creates a new empty feedback collector.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the final actual cardinality for an operator.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_string(), count);
    }

    /// Adds to the running count for an operator (thread-safe).
    ///
    /// Silently ignores operators whose counter was never set up via
    /// `init_counter`.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        if let Some(counter) = self.running_counts.get(operator_id) {
            counter.fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Initializes (or resets to zero) the running counter for an operator.
    pub fn init_counter(&mut self, operator_id: &str) {
        let fresh = AtomicU64::new(0);
        self.running_counts.insert(operator_id.to_string(), fresh);
    }

    /// Copies the current running count for an operator into the actuals.
    ///
    /// Does nothing when no counter was initialized for the operator.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        let final_count = self
            .running_counts
            .get(operator_id)
            .map(|counter| counter.load(Ordering::Relaxed));
        if let Some(count) = final_count {
            self.actuals.insert(operator_id.to_string(), count);
        }
    }

    /// Gets the recorded actual count for an operator, if any.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        self.actuals.get(operator_id).copied()
    }

    /// Gets the current running count for an operator, if initialized.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        let counter = self.running_counts.get(operator_id)?;
        Some(counter.load(Ordering::Relaxed))
    }

    /// Returns all recorded actuals.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
192
/// Context for adaptive query execution.
///
/// Holds both estimated and actual cardinalities, and provides methods
/// to determine when re-optimization should occur.
#[derive(Debug)]
pub struct AdaptiveContext {
    /// Checkpoints with estimates and actuals, keyed by operator ID.
    checkpoints: HashMap<String, CardinalityCheckpoint>,
    /// Deviation ratio beyond which re-optimization is considered.
    reoptimization_threshold: f64,
    /// Minimum observed rows before re-optimization is considered.
    min_rows: u64,
    /// Whether re-optimization has been triggered; latched so that
    /// `should_reoptimize` fires at most once per execution.
    reoptimization_triggered: bool,
    /// Operator that caused re-optimization (if any).
    trigger_operator: Option<String>,
}
210
211impl AdaptiveContext {
212    /// Creates a new adaptive context with default settings.
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            checkpoints: HashMap::new(),
217            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
218            min_rows: MIN_ROWS_FOR_REOPTIMIZATION,
219            reoptimization_triggered: false,
220            trigger_operator: None,
221        }
222    }
223
224    /// Creates a context with custom thresholds.
225    #[must_use]
226    pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
227        Self {
228            checkpoints: HashMap::new(),
229            reoptimization_threshold: threshold,
230            min_rows,
231            reoptimization_triggered: false,
232            trigger_operator: None,
233        }
234    }
235
236    /// Sets the estimated cardinality for an operator.
237    pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
238        self.checkpoints.insert(
239            operator_id.to_string(),
240            CardinalityCheckpoint::new(operator_id, estimate),
241        );
242    }
243
244    /// Records the actual cardinality for an operator.
245    pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
246        if let Some(checkpoint) = self.checkpoints.get_mut(operator_id) {
247            checkpoint.record(actual);
248        } else {
249            // Create checkpoint with unknown estimate
250            let mut checkpoint = CardinalityCheckpoint::new(operator_id, 0.0);
251            checkpoint.record(actual);
252            self.checkpoints.insert(operator_id.to_string(), checkpoint);
253        }
254    }
255
256    /// Applies feedback from a `CardinalityFeedback` collector.
257    pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
258        for (op_id, &actual) in feedback.all_actuals() {
259            self.record_actual(op_id, actual);
260        }
261    }
262
263    /// Checks if any checkpoint shows significant deviation.
264    #[must_use]
265    pub fn has_significant_deviation(&self) -> bool {
266        self.checkpoints
267            .values()
268            .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
269    }
270
271    /// Determines if re-optimization should be triggered.
272    ///
273    /// Returns true if:
274    /// - There's significant deviation from estimates
275    /// - We've processed enough rows to make a meaningful decision
276    /// - Re-optimization hasn't already been triggered
277    #[must_use]
278    pub fn should_reoptimize(&mut self) -> bool {
279        if self.reoptimization_triggered {
280            return false;
281        }
282
283        for (op_id, checkpoint) in &self.checkpoints {
284            if checkpoint.actual < self.min_rows {
285                continue;
286            }
287
288            if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
289                self.reoptimization_triggered = true;
290                self.trigger_operator = Some(op_id.clone());
291                return true;
292            }
293        }
294
295        false
296    }
297
298    /// Returns the operator that triggered re-optimization, if any.
299    #[must_use]
300    pub fn trigger_operator(&self) -> Option<&str> {
301        self.trigger_operator.as_deref()
302    }
303
304    /// Gets the checkpoint for an operator.
305    #[must_use]
306    pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
307        self.checkpoints.get(operator_id)
308    }
309
310    /// Returns all checkpoints.
311    #[must_use]
312    pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
313        &self.checkpoints
314    }
315
316    /// Calculates a correction factor for a specific operator.
317    ///
318    /// This factor can be used to adjust remaining cardinality estimates.
319    #[must_use]
320    pub fn correction_factor(&self, operator_id: &str) -> f64 {
321        self.checkpoints
322            .get(operator_id)
323            .filter(|cp| cp.recorded)
324            .map_or(1.0, CardinalityCheckpoint::deviation_ratio)
325    }
326
327    /// Returns summary statistics about the adaptive execution.
328    #[must_use]
329    pub fn summary(&self) -> AdaptiveSummary {
330        let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
331        let deviation_count = self
332            .checkpoints
333            .values()
334            .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
335            .count();
336
337        let avg_deviation = if recorded_count > 0 {
338            self.checkpoints
339                .values()
340                .filter(|cp| cp.recorded)
341                .map(CardinalityCheckpoint::deviation_ratio)
342                .sum::<f64>()
343                / recorded_count as f64
344        } else {
345            1.0
346        };
347
348        let max_deviation = self
349            .checkpoints
350            .values()
351            .filter(|cp| cp.recorded)
352            .map(|cp| {
353                let ratio = cp.deviation_ratio();
354                if ratio > 1.0 { ratio } else { 1.0 / ratio }
355            })
356            .fold(1.0_f64, f64::max);
357
358        AdaptiveSummary {
359            checkpoint_count: self.checkpoints.len(),
360            recorded_count,
361            deviation_count,
362            avg_deviation_ratio: avg_deviation,
363            max_deviation_ratio: max_deviation,
364            reoptimization_triggered: self.reoptimization_triggered,
365            trigger_operator: self.trigger_operator.clone(),
366        }
367    }
368
369    /// Resets the context for a new execution.
370    pub fn reset(&mut self) {
371        for checkpoint in self.checkpoints.values_mut() {
372            checkpoint.actual = 0;
373            checkpoint.recorded = false;
374        }
375        self.reoptimization_triggered = false;
376        self.trigger_operator = None;
377    }
378}
379
impl Default for AdaptiveContext {
    /// Equivalent to [`AdaptiveContext::new`] (default thresholds, no checkpoints).
    fn default() -> Self {
        Self::new()
    }
}
385
/// Summary of adaptive execution statistics.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of checkpoints.
    pub checkpoint_count: usize,
    /// Number of checkpoints with recorded actuals.
    pub recorded_count: usize,
    /// Number of checkpoints with significant deviation.
    pub deviation_count: usize,
    /// Average deviation ratio across recorded checkpoints (1.0 when none
    /// are recorded; may be infinite if any estimate was zero with a
    /// non-zero actual).
    pub avg_deviation_ratio: f64,
    /// Maximum deviation ratio observed, normalized so over- and
    /// under-estimates both map to values >= 1.0.
    pub max_deviation_ratio: f64,
    /// Whether re-optimization was triggered.
    pub reoptimization_triggered: bool,
    /// Operator that triggered re-optimization, if any.
    pub trigger_operator: Option<String>,
}
404
/// Thread-safe wrapper for `AdaptiveContext`.
///
/// Cheap to clone — clones share the same underlying context via `Arc` —
/// allowing multiple operators to report cardinalities concurrently.
#[derive(Debug, Clone)]
pub struct SharedAdaptiveContext {
    /// The shared context, guarded by a reader-writer lock.
    inner: Arc<RwLock<AdaptiveContext>>,
}
412
413impl SharedAdaptiveContext {
414    /// Creates a new shared context.
415    #[must_use]
416    pub fn new() -> Self {
417        Self {
418            inner: Arc::new(RwLock::new(AdaptiveContext::new())),
419        }
420    }
421
422    /// Creates from an existing context.
423    #[must_use]
424    pub fn from_context(ctx: AdaptiveContext) -> Self {
425        Self {
426            inner: Arc::new(RwLock::new(ctx)),
427        }
428    }
429
430    /// Records actual cardinality for an operator.
431    pub fn record_actual(&self, operator_id: &str, actual: u64) {
432        if let Ok(mut ctx) = self.inner.write() {
433            ctx.record_actual(operator_id, actual);
434        }
435    }
436
437    /// Checks if re-optimization should be triggered.
438    #[must_use]
439    pub fn should_reoptimize(&self) -> bool {
440        if let Ok(mut ctx) = self.inner.write() {
441            ctx.should_reoptimize()
442        } else {
443            false
444        }
445    }
446
447    /// Gets a read-only snapshot of the context.
448    #[must_use]
449    pub fn snapshot(&self) -> Option<AdaptiveContext> {
450        self.inner.read().ok().map(|guard| AdaptiveContext {
451            checkpoints: guard.checkpoints.clone(),
452            reoptimization_threshold: guard.reoptimization_threshold,
453            min_rows: guard.min_rows,
454            reoptimization_triggered: guard.reoptimization_triggered,
455            trigger_operator: guard.trigger_operator.clone(),
456        })
457    }
458}
459
impl Default for SharedAdaptiveContext {
    /// Equivalent to [`SharedAdaptiveContext::new`].
    fn default() -> Self {
        Self::new()
    }
}
465
/// A wrapper operator that tracks cardinality and reports to an adaptive context.
///
/// This wraps any `PushOperator` and counts the rows flowing through it
/// (input rows, counted in `push` before delegating), reporting the final
/// count to the adaptive context on `finalize`.
pub struct CardinalityTrackingOperator {
    /// The wrapped operator.
    inner: Box<dyn PushOperator>,
    /// Operator identifier used when reporting to the context.
    operator_id: String,
    /// Rows counted so far.
    row_count: u64,
    /// Shared adaptive context that receives the final count.
    context: SharedAdaptiveContext,
}
480
impl CardinalityTrackingOperator {
    /// Wraps `inner` so that rows pushed through it are counted and reported
    /// to `context` under `operator_id`.
    pub fn new(
        inner: Box<dyn PushOperator>,
        operator_id: &str,
        context: SharedAdaptiveContext,
    ) -> Self {
        Self {
            inner,
            operator_id: operator_id.to_string(),
            row_count: 0,
            context,
        }
    }

    /// Returns the number of rows counted so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
}
502
503impl PushOperator for CardinalityTrackingOperator {
504    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
505        // Track input rows
506        self.row_count += chunk.len() as u64;
507
508        // Push through to inner operator
509        self.inner.push(chunk, sink)
510    }
511
512    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
513        // Report final cardinality to context
514        self.context
515            .record_actual(&self.operator_id, self.row_count);
516
517        // Finalize inner operator
518        self.inner.finalize(sink)
519    }
520
521    fn preferred_chunk_size(&self) -> ChunkSizeHint {
522        self.inner.preferred_chunk_size()
523    }
524
525    fn name(&self) -> &'static str {
526        // Return the inner operator's name
527        self.inner.name()
528    }
529}
530
/// A sink that tracks cardinality of data flowing through it.
///
/// Wraps another `Sink`, counts every row consumed, and reports the final
/// count to the adaptive context on `finalize`.
pub struct CardinalityTrackingSink {
    /// The wrapped sink.
    inner: Box<dyn Sink>,
    /// Operator identifier used when reporting to the context.
    operator_id: String,
    /// Rows counted so far.
    row_count: u64,
    /// Shared adaptive context that receives the final count.
    context: SharedAdaptiveContext,
}
542
impl CardinalityTrackingSink {
    /// Wraps `inner` so that consumed rows are counted and reported to
    /// `context` under `operator_id`.
    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
        Self {
            inner,
            operator_id: operator_id.to_string(),
            row_count: 0,
            context,
        }
    }

    /// Returns the number of rows consumed so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
}
560
561impl Sink for CardinalityTrackingSink {
562    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
563        self.row_count += chunk.len() as u64;
564        self.inner.consume(chunk)
565    }
566
567    fn finalize(&mut self) -> Result<(), OperatorError> {
568        // Report final cardinality
569        self.context
570            .record_actual(&self.operator_id, self.row_count);
571        self.inner.finalize()
572    }
573
574    fn name(&self) -> &'static str {
575        self.inner.name()
576    }
577}
578
/// Decision about whether to re-optimize a query.
#[derive(Debug, Clone, PartialEq)]
pub enum ReoptimizationDecision {
    /// Continue with the current plan unchanged.
    Continue,
    /// Re-optimize the remaining plan.
    Reoptimize {
        /// The operator that triggered re-optimization.
        trigger: String,
        /// Correction factors (actual / estimated) to apply to the remaining
        /// estimates, keyed by operator ID.
        corrections: HashMap<String, f64>,
    },
    /// Abort the query (catastrophic misestimate).
    Abort {
        /// Human-readable reason for aborting the query.
        reason: String,
    },
}
597
598/// Evaluates whether re-optimization should occur based on context.
599#[must_use]
600pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
601    let summary = ctx.summary();
602
603    // If no significant deviations, continue
604    if !summary.reoptimization_triggered {
605        return ReoptimizationDecision::Continue;
606    }
607
608    // If deviation is catastrophic (>100x), consider aborting
609    if summary.max_deviation_ratio > 100.0 {
610        return ReoptimizationDecision::Abort {
611            reason: format!(
612                "Catastrophic cardinality misestimate: {}x deviation",
613                summary.max_deviation_ratio
614            ),
615        };
616    }
617
618    // Build correction factors
619    let corrections: HashMap<String, f64> = ctx
620        .all_checkpoints()
621        .iter()
622        .filter(|(_, cp)| cp.recorded)
623        .map(|(id, cp)| (id.clone(), cp.deviation_ratio()))
624        .collect();
625
626    ReoptimizationDecision::Reoptimize {
627        trigger: summary.trigger_operator.unwrap_or_default(),
628        corrections,
629    }
630}
631
/// Callback for creating a new plan based on observed cardinalities.
///
/// Called when the adaptive pipeline detects significant deviation and
/// decides to re-optimize. The callback receives the adaptive context with
/// recorded actuals and returns the replacement operator chain.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
638
/// Configuration for adaptive pipeline execution.
#[derive(Debug, Clone)]
pub struct AdaptivePipelineConfig {
    /// Number of rows to process before checking for re-optimization.
    pub check_interval: u64,
    /// Deviation ratio that triggers re-optimization
    /// (see `DEFAULT_REOPTIMIZATION_THRESHOLD`).
    pub reoptimization_threshold: f64,
    /// Minimum rows before considering re-optimization.
    pub min_rows_for_reoptimization: u64,
    /// Maximum number of re-optimizations allowed per query.
    pub max_reoptimizations: usize,
}
651
impl Default for AdaptivePipelineConfig {
    /// Defaults: check every 10,000 rows, module default thresholds, and at
    /// most 3 re-optimizations per query.
    fn default() -> Self {
        Self {
            check_interval: 10_000,
            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
            min_rows_for_reoptimization: MIN_ROWS_FOR_REOPTIMIZATION,
            max_reoptimizations: 3,
        }
    }
}
662
663impl AdaptivePipelineConfig {
664    /// Creates a new configuration with custom thresholds.
665    #[must_use]
666    pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
667        Self {
668            check_interval,
669            reoptimization_threshold: threshold,
670            min_rows_for_reoptimization: min_rows,
671            max_reoptimizations: 3,
672        }
673    }
674
675    /// Sets the maximum number of re-optimizations allowed.
676    #[must_use]
677    pub fn with_max_reoptimizations(mut self, max: usize) -> Self {
678        self.max_reoptimizations = max;
679        self
680    }
681}
682
/// Result of executing an adaptive pipeline.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows processed.
    pub total_rows: u64,
    /// Number of re-optimizations performed.
    pub reoptimization_count: usize,
    /// Operators that triggered re-optimization.
    pub triggers: Vec<String>,
    /// Summary of the final adaptive context with all recorded actuals.
    pub final_context: AdaptiveSummary,
}
695
/// A checkpoint during adaptive execution where plan switching can occur.
///
/// Checkpoints are placed at strategic points in the pipeline (typically
/// after filters or joins) where switching to a new plan makes sense.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    /// Unique identifier for this checkpoint.
    pub id: String,
    /// Operator index in the pipeline (after which this checkpoint occurs).
    pub after_operator: usize,
    /// Estimated cardinality at this point.
    pub estimated_cardinality: f64,
    /// Actual rows seen so far (accumulated by `record_rows`).
    pub actual_rows: u64,
    /// Whether this checkpoint has triggered re-optimization.
    pub triggered: bool,
}

impl AdaptiveCheckpoint {
    /// Creates a checkpoint with no rows recorded and no trigger fired.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        Self {
            id: id.to_string(),
            after_operator,
            estimated_cardinality: estimated,
            actual_rows: 0,
            triggered: false,
        }
    }

    /// Accumulates rows passing through this checkpoint.
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }

    /// Checks whether the observed deviation warrants re-optimization.
    ///
    /// Never fires below `min_rows` observed rows. With a zero (or negative)
    /// estimate, any observed row counts as a deviation. Otherwise fires when
    /// actual/estimated lies above `threshold` or below 1/`threshold`.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        if self.actual_rows < min_rows {
            false
        } else if self.estimated_cardinality <= 0.0 {
            self.actual_rows > 0
        } else {
            let ratio = self.actual_rows as f64 / self.estimated_cardinality;
            ratio > threshold || ratio < 1.0 / threshold
        }
    }
}
745
/// Event emitted during adaptive execution.
#[derive(Debug, Clone)]
pub enum AdaptiveEvent {
    /// A checkpoint was reached and its actual cardinality recorded.
    CheckpointReached {
        /// The checkpoint identifier.
        id: String,
        /// The actual number of rows observed.
        actual_rows: u64,
        /// The estimated number of rows from the optimizer.
        estimated: f64,
    },
    /// Re-optimization was triggered by a checkpoint's deviation.
    ReoptimizationTriggered {
        /// The checkpoint that triggered re-optimization.
        checkpoint_id: String,
        /// The ratio between actual and estimated rows.
        deviation_ratio: f64,
    },
    /// The execution plan was switched for a re-optimized one.
    PlanSwitched {
        /// The number of operators in the previous plan.
        old_operator_count: usize,
        /// The number of operators in the new plan.
        new_operator_count: usize,
    },
    /// Execution completed.
    ExecutionCompleted {
        /// The total number of rows produced.
        total_rows: u64,
    },
}
778
/// Callback invoked for each `AdaptiveEvent` during adaptive execution.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
781
/// Builder for creating adaptive pipelines.
pub struct AdaptivePipelineBuilder {
    /// Checkpoints registered via `with_checkpoint`.
    checkpoints: Vec<AdaptiveCheckpoint>,
    /// Execution configuration (defaults unless overridden).
    config: AdaptivePipelineConfig,
    /// Adaptive context accumulating the checkpoint estimates.
    context: AdaptiveContext,
    /// Optional observer for adaptive execution events.
    event_callback: Option<AdaptiveEventCallback>,
}
789
790impl AdaptivePipelineBuilder {
791    /// Creates a new builder with default configuration.
792    #[must_use]
793    pub fn new() -> Self {
794        Self {
795            checkpoints: Vec::new(),
796            config: AdaptivePipelineConfig::default(),
797            context: AdaptiveContext::new(),
798            event_callback: None,
799        }
800    }
801
802    /// Sets the configuration.
803    #[must_use]
804    pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
805        self.config = config;
806        self
807    }
808
809    /// Adds a checkpoint at the specified operator index.
810    #[must_use]
811    pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
812        self.checkpoints
813            .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
814        self.context.set_estimate(id, estimated);
815        self
816    }
817
818    /// Sets an event callback for observing execution.
819    #[must_use]
820    pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
821        self.event_callback = Some(callback);
822        self
823    }
824
825    /// Sets estimates from a pre-configured context.
826    #[must_use]
827    pub fn with_context(mut self, context: AdaptiveContext) -> Self {
828        self.context = context;
829        self
830    }
831
832    /// Builds the configuration for adaptive execution.
833    #[must_use]
834    pub fn build(self) -> AdaptiveExecutionConfig {
835        AdaptiveExecutionConfig {
836            checkpoints: self.checkpoints,
837            config: self.config,
838            context: self.context,
839            event_callback: self.event_callback,
840        }
841    }
842}
843
impl Default for AdaptivePipelineBuilder {
    /// Equivalent to [`AdaptivePipelineBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
849
/// Configuration for adaptive execution, built by `AdaptivePipelineBuilder`.
pub struct AdaptiveExecutionConfig {
    /// Checkpoints for monitoring cardinality.
    pub checkpoints: Vec<AdaptiveCheckpoint>,
    /// Execution configuration (thresholds, intervals, limits).
    pub config: AdaptivePipelineConfig,
    /// Adaptive context holding estimates and recorded actuals.
    pub context: AdaptiveContext,
    /// Optional callback invoked for each `AdaptiveEvent`.
    pub event_callback: Option<AdaptiveEventCallback>,
}
861
862impl AdaptiveExecutionConfig {
863    /// Returns a summary of the adaptive execution after it completes.
864    #[must_use]
865    pub fn summary(&self) -> AdaptiveSummary {
866        self.context.summary()
867    }
868
869    /// Records actual cardinality for a checkpoint.
870    pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
871        self.context.record_actual(checkpoint_id, actual);
872
873        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
874            cp.actual_rows = actual;
875        }
876
877        if let Some(ref callback) = self.event_callback {
878            let estimated = self
879                .context
880                .get_checkpoint(checkpoint_id)
881                .map_or(0.0, |cp| cp.estimated);
882            callback(AdaptiveEvent::CheckpointReached {
883                id: checkpoint_id.to_string(),
884                actual_rows: actual,
885                estimated,
886            });
887        }
888    }
889
890    /// Checks if any checkpoint exceeds the deviation threshold.
891    #[must_use]
892    pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
893        self.checkpoints.iter().find(|cp| {
894            !cp.triggered
895                && cp.exceeds_threshold(
896                    self.config.reoptimization_threshold,
897                    self.config.min_rows_for_reoptimization,
898                )
899        })
900    }
901
902    /// Marks a checkpoint as having triggered re-optimization.
903    pub fn mark_triggered(&mut self, checkpoint_id: &str) {
904        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
905            cp.triggered = true;
906        }
907
908        if let Some(ref callback) = self.event_callback {
909            let ratio = self
910                .context
911                .get_checkpoint(checkpoint_id)
912                .filter(|cp| cp.recorded)
913                .map_or(1.0, |cp| cp.deviation_ratio());
914            callback(AdaptiveEvent::ReoptimizationTriggered {
915                checkpoint_id: checkpoint_id.to_string(),
916                deviation_ratio: ratio,
917            });
918        }
919    }
920}
921
922// ============= Pull-Based Operator Tracking =============
923
924use super::operators::{Operator, OperatorResult}; // OperatorError imported above
925
/// A wrapper that tracks cardinality for pull-based operators.
///
/// This wraps any `Operator` and counts the rows flowing through it,
/// reporting the count to the adaptive context. Use this for integrating
/// adaptive execution with the standard pull-based executor.
///
/// The final count is reported exactly once: when the wrapped stream is
/// exhausted, when it returns an error, or — as a fallback — when the
/// wrapper is dropped.
pub struct CardinalityTrackingWrapper {
    /// The wrapped operator.
    inner: Box<dyn Operator>,
    /// Operator identifier for reporting.
    operator_id: String,
    /// Row counter.
    row_count: u64,
    /// Shared adaptive context for reporting.
    context: SharedAdaptiveContext,
    /// Whether finalization has been reported.
    finalized: bool,
}
943
944impl CardinalityTrackingWrapper {
945    /// Creates a new tracking wrapper for a pull-based operator.
946    pub fn new(
947        inner: Box<dyn Operator>,
948        operator_id: &str,
949        context: SharedAdaptiveContext,
950    ) -> Self {
951        Self {
952            inner,
953            operator_id: operator_id.to_string(),
954            row_count: 0,
955            context,
956            finalized: false,
957        }
958    }
959
960    /// Returns the current row count.
961    #[must_use]
962    pub fn current_count(&self) -> u64 {
963        self.row_count
964    }
965
966    /// Reports the final cardinality to the context.
967    fn report_final(&mut self) {
968        if !self.finalized {
969            self.context
970                .record_actual(&self.operator_id, self.row_count);
971            self.finalized = true;
972        }
973    }
974}
975
976impl Operator for CardinalityTrackingWrapper {
977    fn next(&mut self) -> OperatorResult {
978        match self.inner.next() {
979            Ok(Some(chunk)) => {
980                // Track rows
981                self.row_count += chunk.row_count() as u64;
982                Ok(Some(chunk))
983            }
984            Ok(None) => {
985                // Stream exhausted - report final cardinality
986                self.report_final();
987                Ok(None)
988            }
989            Err(e) => {
990                // Report on error too
991                self.report_final();
992                Err(e)
993            }
994        }
995    }
996
997    fn reset(&mut self) {
998        self.row_count = 0;
999        self.finalized = false;
1000        self.inner.reset();
1001    }
1002
1003    fn name(&self) -> &'static str {
1004        self.inner.name()
1005    }
1006}
1007
impl Drop for CardinalityTrackingWrapper {
    fn drop(&mut self) {
        // Ensure we report even if dropped early (i.e. before the stream was
        // fully consumed). `report_final` is idempotent, so a count already
        // reported via `next()` is not sent twice.
        self.report_final();
    }
}
1014
1015// ============= Adaptive Pipeline Execution =============
1016
1017use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; // Sink imported above
1018use super::sink::CollectorSink;
1019use super::source::OperatorSource;
1020
1021/// High-level adaptive pipeline that executes a pull-based operator with
1022/// cardinality tracking using push-based infrastructure.
1023///
1024/// This bridges the pull-based planner output with push-based execution:
1025/// 1. Wraps the pull operator as an `OperatorSource`
1026/// 2. Uses `CardinalityTrackingSink` to track output cardinality
1027/// 3. Provides adaptive feedback through `AdaptiveContext`
1028///
1029/// # Example
1030///
1031/// ```ignore
1032/// use grafeo_core::execution::adaptive::AdaptivePipelineExecutor;
1033///
1034/// let executor = AdaptivePipelineExecutor::new(operator, adaptive_context);
1035/// let (chunks, summary) = executor.execute()?;
1036/// ```
pub struct AdaptivePipelineExecutor {
    /// The pull-based operator wrapped as a push source.
    source: OperatorSource,
    /// Shared context used for cardinality recording and reoptimization checks.
    context: SharedAdaptiveContext,
    /// Execution tuning (check interval, deviation thresholds).
    config: AdaptivePipelineConfig,
}
1042
1043impl AdaptivePipelineExecutor {
1044    /// Creates a new adaptive pipeline executor.
1045    ///
1046    /// # Arguments
1047    ///
1048    /// * `operator` - The pull-based operator to execute
1049    /// * `context` - Adaptive context with cardinality estimates
1050    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
1051        Self {
1052            source: OperatorSource::new(operator),
1053            context: SharedAdaptiveContext::from_context(context),
1054            config: AdaptivePipelineConfig::default(),
1055        }
1056    }
1057
1058    /// Creates an executor with custom configuration.
1059    pub fn with_config(
1060        operator: Box<dyn Operator>,
1061        context: AdaptiveContext,
1062        config: AdaptivePipelineConfig,
1063    ) -> Self {
1064        Self {
1065            source: OperatorSource::new(operator),
1066            context: SharedAdaptiveContext::from_context(context),
1067            config,
1068        }
1069    }
1070
1071    /// Executes the pipeline and returns collected chunks with adaptive summary.
1072    ///
1073    /// # Returns
1074    ///
1075    /// A tuple of (collected DataChunks, adaptive execution summary).
1076    ///
1077    /// # Errors
1078    ///
1079    /// Returns an error if execution fails.
1080    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1081        let mut sink = CardinalityTrackingSink::new(
1082            Box::new(CollectorSink::new()),
1083            "output",
1084            self.context.clone(),
1085        );
1086
1087        let chunk_size = DEFAULT_CHUNK_SIZE;
1088        let mut total_rows: u64 = 0;
1089        let check_interval = self.config.check_interval;
1090
1091        // Process all chunks from source
1092        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1093            let chunk_rows = chunk.len() as u64;
1094            total_rows += chunk_rows;
1095
1096            // Push to tracking sink
1097            let continue_exec = sink.consume(chunk)?;
1098            if !continue_exec {
1099                break;
1100            }
1101
1102            // Periodically check for reoptimization need
1103            if total_rows >= check_interval
1104                && total_rows.is_multiple_of(check_interval)
1105                && self.context.should_reoptimize()
1106            {
1107                // Log or emit event that reoptimization would be triggered
1108                // Full re-planning would happen at a higher level
1109            }
1110        }
1111
1112        // Finalize sink
1113        sink.finalize()?;
1114
1115        // Extract results from the inner sink
1116        let summary = self
1117            .context
1118            .snapshot()
1119            .map(|ctx| ctx.summary())
1120            .unwrap_or_default();
1121
1122        // Get collected chunks from the inner CollectorSink
1123        // Note: We need to extract chunks from the wrapped sink
1124        // For now, we'll return the summary and the caller can collect separately
1125        Ok((Vec::new(), summary))
1126    }
1127
1128    /// Executes and collects all results into DataChunks.
1129    ///
1130    /// This is a simpler interface that handles chunk collection internally.
1131    pub fn execute_collecting(
1132        mut self,
1133    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1134        let mut chunks = Vec::new();
1135        let chunk_size = DEFAULT_CHUNK_SIZE;
1136        let mut total_rows: u64 = 0;
1137        let check_interval = self.config.check_interval;
1138
1139        // Process all chunks from source
1140        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1141            let chunk_rows = chunk.len() as u64;
1142            total_rows += chunk_rows;
1143
1144            // Record cardinality
1145            self.context.record_actual("root", total_rows);
1146
1147            // Collect the chunk
1148            if !chunk.is_empty() {
1149                chunks.push(chunk);
1150            }
1151
1152            // Periodically check for reoptimization
1153            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
1154                let _ = self.context.should_reoptimize();
1155            }
1156        }
1157
1158        let summary = self
1159            .context
1160            .snapshot()
1161            .map(|ctx| ctx.summary())
1162            .unwrap_or_default();
1163
1164        Ok((chunks, summary))
1165    }
1166
1167    /// Returns a reference to the shared context for external monitoring.
1168    pub fn context(&self) -> &SharedAdaptiveContext {
1169        &self.context
1170    }
1171}
1172
1173/// Convenience function to execute a pull-based operator with adaptive tracking.
1174///
1175/// This is the recommended entry point for adaptive execution.
1176///
1177/// # Arguments
1178///
1179/// * `operator` - The pull-based operator to execute
1180/// * `context` - Adaptive context with cardinality estimates (or None for default)
1181/// * `config` - Optional configuration (uses defaults if None)
1182///
1183/// # Returns
1184///
1185/// A tuple of (collected DataChunks, adaptive summary if tracking was enabled).
1186pub fn execute_adaptive(
1187    operator: Box<dyn Operator>,
1188    context: Option<AdaptiveContext>,
1189    config: Option<AdaptivePipelineConfig>,
1190) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
1191    let ctx = context.unwrap_or_default();
1192    let cfg = config.unwrap_or_default();
1193
1194    let executor = AdaptivePipelineExecutor::with_config(operator, ctx, cfg);
1195    let (chunks, summary) = executor.execute_collecting()?;
1196
1197    Ok((chunks, Some(summary)))
1198}
1199
#[cfg(test)]
mod tests {
    use super::*;

    // Coverage: checkpoint deviation math, feedback collection, the adaptive
    // context (thresholds, reset, summaries), reoptimization decisions, and
    // the plan-switching configuration built by `AdaptivePipelineBuilder`.

    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(200);

        // Actual is 2x estimate
        assert!((cp.deviation_ratio() - 2.0).abs() < 0.001);
    }

    #[test]
    fn test_checkpoint_underestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(500);

        // Underestimated by 5x
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_overestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(20);

        // Overestimated - actual is 0.2x estimate
        assert!((cp.deviation_ratio() - 0.2).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0)); // 0.2 < 1/3
    }

    #[test]
    fn test_checkpoint_accurate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(110);

        // Close to estimate
        assert!((cp.deviation_ratio() - 1.1).abs() < 0.001);
        assert!(!cp.is_significant_deviation(3.0)); // 1.1 is within threshold
    }

    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(100);

        // Zero estimate with actual rows = infinity
        assert!(cp.deviation_ratio().is_infinite());
    }

    #[test]
    fn test_checkpoint_zero_both() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(0);

        // Zero estimate and zero actual = ratio of 1.0
        assert!((cp.deviation_ratio() - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_feedback_collection() {
        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan_1", 1000);
        feedback.record("filter_1", 100);

        assert_eq!(feedback.get("scan_1"), Some(1000));
        assert_eq!(feedback.get("filter_1"), Some(100));
        assert_eq!(feedback.get("unknown"), None);
    }

    #[test]
    fn test_feedback_running_counter() {
        let mut feedback = CardinalityFeedback::new();
        feedback.init_counter("op_1");

        feedback.add_rows("op_1", 100);
        feedback.add_rows("op_1", 200);
        feedback.add_rows("op_1", 50);

        assert_eq!(feedback.get_running("op_1"), Some(350));

        feedback.finalize_counter("op_1");
        assert_eq!(feedback.get("op_1"), Some(350));
    }

    #[test]
    fn test_adaptive_context_basic() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        ctx.record_actual("scan", 1000);
        ctx.record_actual("filter", 500); // 5x underestimate

        let cp = ctx.get_checkpoint("filter").unwrap();
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 100);
        ctx.set_estimate("scan", 10000.0);
        ctx.set_estimate("filter", 1000.0);

        ctx.record_actual("scan", 10000);
        ctx.record_actual("filter", 5000); // 5x underestimate

        assert!(ctx.should_reoptimize());
        assert_eq!(ctx.trigger_operator(), Some("filter"));

        // Second call should return false (already triggered)
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_min_rows() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 1000);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500); // 5x, but only 500 rows

        // Should not trigger because we haven't seen enough rows
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100); // Close to estimate

        assert!(!ctx.has_significant_deviation());
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 300);

        assert!((ctx.correction_factor("filter") - 3.0).abs() < 0.001);
        assert!((ctx.correction_factor("unknown") - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan", 1000);
        feedback.record("filter", 500);

        ctx.apply_feedback(&feedback);

        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 1000);
        assert_eq!(ctx.get_checkpoint("filter").unwrap().actual, 500);
    }

    #[test]
    fn test_adaptive_summary() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("op1", 100.0);
        ctx.set_estimate("op2", 200.0);
        ctx.set_estimate("op3", 300.0);

        ctx.record_actual("op1", 100); // Exact
        ctx.record_actual("op2", 600); // 3x

        // Trigger reoptimization
        let _ = ctx.should_reoptimize();

        let summary = ctx.summary();
        assert_eq!(summary.checkpoint_count, 3);
        assert_eq!(summary.recorded_count, 2);
        assert_eq!(summary.deviation_count, 1);
        assert!(summary.reoptimization_triggered);
    }

    #[test]
    fn test_adaptive_context_reset() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 5000);
        let _ = ctx.should_reoptimize(); // Trigger

        assert!(ctx.reoptimization_triggered);

        ctx.reset();

        assert!(!ctx.reoptimization_triggered);
        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!ctx.get_checkpoint("scan").unwrap().recorded);
        // Estimate should be preserved
        assert!((ctx.get_checkpoint("scan").unwrap().estimated - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_shared_context() {
        let ctx = SharedAdaptiveContext::new();

        ctx.record_actual("op1", 1000);

        let snapshot = ctx.snapshot().unwrap();
        assert_eq!(snapshot.get_checkpoint("op1").unwrap().actual, 1000);
    }

    #[test]
    fn test_reoptimization_decision_continue() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);

        let decision = evaluate_reoptimization(&ctx);
        assert_eq!(decision, ReoptimizationDecision::Continue);
    }

    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);
        let _ = ctx.should_reoptimize(); // Trigger

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Reoptimize {
            trigger,
            corrections,
        } = decision
        {
            assert_eq!(trigger, "filter");
            assert!((corrections.get("filter").copied().unwrap_or(0.0) - 5.0).abs() < 0.001);
        } else {
            panic!("Expected Reoptimize decision");
        }
    }

    #[test]
    fn test_reoptimization_decision_abort() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 1.0);
        ctx.record_actual("filter", 1000); // 1000x deviation!
        let _ = ctx.should_reoptimize();

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Abort { reason } = decision {
            assert!(reason.contains("Catastrophic"));
        } else {
            panic!("Expected Abort decision");
        }
    }

    #[test]
    fn test_absolute_deviation() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(150);

        assert!((cp.absolute_deviation() - 50.0).abs() < 0.001);
    }

    // ============= Plan Switching Tests =============

    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut cp = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(cp.actual_rows, 0);
        assert!(!cp.triggered);

        cp.record_rows(50);
        assert_eq!(cp.actual_rows, 50);

        cp.record_rows(100);
        assert_eq!(cp.actual_rows, 150);
    }

    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut cp = AdaptiveCheckpoint::new("filter", 0, 100.0);

        // Below min rows
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));

        // Above min rows but within threshold
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100)); // 100 actual vs 100 estimated = 1.0x

        // Above threshold (underestimate)
        cp.actual_rows = 0;
        cp.record_rows(500);
        assert!(cp.exceeds_threshold(2.0, 100)); // 500 actual vs 100 estimated = 5.0x

        // Above threshold (overestimate)
        let mut cp2 = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        cp2.record_rows(200);
        assert!(cp2.exceeds_threshold(2.0, 100)); // 200 actual vs 1000 estimated = 0.2x
    }

    #[test]
    fn test_adaptive_pipeline_config_default() {
        let config = AdaptivePipelineConfig::default();

        assert_eq!(config.check_interval, 10_000);
        assert!((config.reoptimization_threshold - DEFAULT_REOPTIMIZATION_THRESHOLD).abs() < 0.001);
        assert_eq!(
            config.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(config.max_reoptimizations, 3);
    }

    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let config = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);

        assert_eq!(config.check_interval, 5000);
        assert!((config.reoptimization_threshold - 2.0).abs() < 0.001);
        assert_eq!(config.min_rows_for_reoptimization, 500);
        assert_eq!(config.max_reoptimizations, 5);
    }

    #[test]
    fn test_adaptive_pipeline_builder() {
        let config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();

        assert_eq!(config.checkpoints.len(), 2);
        assert_eq!(config.checkpoints[0].id, "scan");
        assert!((config.checkpoints[0].estimated_cardinality - 10000.0).abs() < 0.001);
        assert_eq!(config.checkpoints[1].id, "filter");
        assert!((config.checkpoints[1].estimated_cardinality - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        config.record_checkpoint("filter", 500);

        // Check context was updated
        let cp = config.context.get_checkpoint("filter").unwrap();
        assert_eq!(cp.actual, 500);
        assert!(cp.recorded);

        // Check checkpoint was updated
        let acp = config
            .checkpoints
            .iter()
            .find(|c| c.id == "filter")
            .unwrap();
        assert_eq!(acp.actual_rows, 500);
    }

    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();

        // No data yet - should not trigger
        assert!(config.should_reoptimize().is_none());

        // Record within threshold
        config.record_checkpoint("filter", 150);
        assert!(config.should_reoptimize().is_none()); // 1.5x is within 2.0x threshold

        // Record exceeding threshold
        config.checkpoints[0].actual_rows = 0; // Reset for new test
        config.record_checkpoint("filter", 500);
        config.checkpoints[0].actual_rows = 500;

        let trigger = config.should_reoptimize();
        assert!(trigger.is_some());
        assert_eq!(trigger.unwrap().id, "filter");
    }

    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(!config.checkpoints[0].triggered);

        config.mark_triggered("filter");

        assert!(config.checkpoints[0].triggered);
    }

    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;

        let event_count = Arc::new(AtomicUsize::new(0));
        let counter = event_count.clone();

        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                counter.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        config.record_checkpoint("filter", 500);

        // Should have received one CheckpointReached event
        assert_eq!(event_count.load(Ordering::Relaxed), 1);

        config.mark_triggered("filter");

        // Should have received one ReoptimizationTriggered event
        assert_eq!(event_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut cp = AdaptiveCheckpoint::new("test", 0, 0.0);
        cp.record_rows(100);

        // Zero estimate should trigger if any rows are seen
        assert!(cp.exceeds_threshold(2.0, 50));
    }
}
1633}