Skip to main content

grafeo_core/execution/
adaptive.rs

1//! Adaptive query execution with runtime cardinality feedback.
2//!
3//! This module provides adaptive execution capabilities that allow the query
4//! engine to adjust its execution strategy based on actual runtime cardinalities.
5//!
6//! The key insight is that cardinality estimates can be significantly wrong,
7//! especially for complex predicates or skewed data. By tracking actual row
8//! counts during execution, we can detect when our estimates are off and
9//! potentially re-optimize the remaining query plan.
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
15//! │  Optimizer  │────>│ Estimated Cards  │     │ CardinalityFeed │
16//! └─────────────┘     └──────────────────┘     │     back        │
17//!                              │               └────────┬────────┘
18//!                              v                        │
19//!                     ┌──────────────────┐              │
20//!                     │ AdaptiveContext  │<─────────────┘
21//!                     └────────┬─────────┘
22//!                              │
23//!                     ┌────────v─────────┐
24//!                     │ ReoptimizeCheck  │
25//!                     └──────────────────┘
26//! ```
27//!
28//! # Example
29//!
30//! ```ignore
31//! use grafeo_core::execution::adaptive::{AdaptiveContext, CardinalityCheckpoint};
32//!
33//! // Set up adaptive context with estimated cardinalities
34//! let mut ctx = AdaptiveContext::new();
35//! ctx.set_estimate("scan_1", 10000.0);
36//! ctx.set_estimate("filter_1", 1000.0);  // Expected 10% selectivity
37//!
38//! // During execution, record actuals
39//! ctx.record_actual("scan_1", 10000);
40//! ctx.record_actual("filter_1", 5000);   // Actual 50% selectivity!
41//!
42//! // Check if re-optimization is warranted
43//! if ctx.should_reoptimize() {
44//!     // Re-plan remaining operators with updated statistics
45//! }
46//! ```
47
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, RwLock};
51
52use super::chunk::DataChunk;
53use super::operators::OperatorError;
54use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
55
/// Default deviation ratio that triggers re-optimization consideration.
/// With the value 3.0, an actual cardinality more than 3x or less than
/// 1/3 of the estimate counts as a significant deviation.
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;

/// Minimum number of rows before considering re-optimization.
/// Helps avoid thrashing on small result sets.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
63
/// A checkpoint for tracking cardinality at a specific point in the plan.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    /// Unique identifier for this checkpoint (typically operator name/id).
    pub operator_id: String,
    /// Estimated cardinality from the optimizer.
    pub estimated: f64,
    /// Actual row count observed during execution.
    pub actual: u64,
    /// Whether this checkpoint has been recorded.
    pub recorded: bool,
}

impl CardinalityCheckpoint {
    /// Creates a checkpoint holding the optimizer's estimate; no actual
    /// count has been observed yet.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            estimated,
            actual: 0,
            recorded: false,
        }
    }

    /// Stores the observed row count and marks the checkpoint as recorded.
    pub fn record(&mut self, actual: u64) {
        self.recorded = true;
        self.actual = actual;
    }

    /// Deviation ratio (actual / estimated).
    ///
    /// Values above 1.0 mean the optimizer underestimated; below 1.0 it
    /// overestimated. A non-positive estimate yields 1.0 when no rows were
    /// seen and infinity otherwise, avoiding a division by zero.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        if self.estimated > 0.0 {
            self.actual as f64 / self.estimated
        } else if self.actual == 0 {
            1.0
        } else {
            f64::INFINITY
        }
    }

    /// Absolute difference between observed and estimated cardinality.
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        (self.actual as f64 - self.estimated).abs()
    }

    /// True when the recorded ratio falls outside `[1/threshold, threshold]`.
    /// Unrecorded checkpoints never count as deviating.
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        if !self.recorded {
            return false;
        }
        let ratio = self.deviation_ratio();
        ratio < 1.0 / threshold || ratio > threshold
    }
}
123
/// Feedback from runtime execution about actual cardinalities.
///
/// Collects observed row counts at various points of a running query so
/// they can later be compared against the optimizer's estimates.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    /// Finalized actual row counts by operator ID.
    actuals: HashMap<String, u64>,
    /// Live counters for streaming updates, by operator ID.
    running_counts: HashMap<String, AtomicU64>,
}

impl CardinalityFeedback {
    /// Creates an empty feedback collector.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the final actual cardinality for an operator, replacing any
    /// previously recorded value.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_string(), count);
    }

    /// Adds rows to an operator's running counter (thread-safe).
    ///
    /// The counter must have been created via `init_counter` first;
    /// updates for unknown operators are silently ignored.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        if let Some(counter) = self.running_counts.get(operator_id) {
            counter.fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Creates (or resets to zero) the running counter for an operator.
    pub fn init_counter(&mut self, operator_id: &str) {
        self.running_counts
            .insert(operator_id.to_string(), AtomicU64::new(0));
    }

    /// Copies the current running count into the finalized actuals.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        let total = self
            .running_counts
            .get(operator_id)
            .map(|c| c.load(Ordering::Relaxed));
        if let Some(total) = total {
            self.actuals.insert(operator_id.to_string(), total);
        }
    }

    /// Finalized actual count for an operator, if one was recorded.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        self.actuals.get(operator_id).copied()
    }

    /// Current value of an operator's running counter, if initialized.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        let counter = self.running_counts.get(operator_id)?;
        Some(counter.load(Ordering::Relaxed))
    }

    /// All finalized actual counts keyed by operator ID.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
192
/// Context for adaptive query execution.
///
/// Holds both estimated and actual cardinalities, and provides methods
/// to determine when re-optimization should occur.
#[derive(Debug)]
pub struct AdaptiveContext {
    /// Checkpoints (estimate plus observed actual) keyed by operator ID.
    checkpoints: HashMap<String, CardinalityCheckpoint>,
    /// Deviation ratio beyond which re-optimization is considered.
    reoptimization_threshold: f64,
    /// Minimum observed rows before re-optimization may trigger.
    min_rows: u64,
    /// Set once re-optimization has been triggered; it fires at most once.
    reoptimization_triggered: bool,
    /// ID of the operator whose deviation triggered re-optimization, if any.
    trigger_operator: Option<String>,
}
210
211impl AdaptiveContext {
212    /// Creates a new adaptive context with default settings.
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            checkpoints: HashMap::new(),
217            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
218            min_rows: MIN_ROWS_FOR_REOPTIMIZATION,
219            reoptimization_triggered: false,
220            trigger_operator: None,
221        }
222    }
223
224    /// Creates a context with custom thresholds.
225    #[must_use]
226    pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
227        Self {
228            checkpoints: HashMap::new(),
229            reoptimization_threshold: threshold,
230            min_rows,
231            reoptimization_triggered: false,
232            trigger_operator: None,
233        }
234    }
235
236    /// Sets the estimated cardinality for an operator.
237    pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
238        self.checkpoints.insert(
239            operator_id.to_string(),
240            CardinalityCheckpoint::new(operator_id, estimate),
241        );
242    }
243
244    /// Records the actual cardinality for an operator.
245    pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
246        if let Some(checkpoint) = self.checkpoints.get_mut(operator_id) {
247            checkpoint.record(actual);
248        } else {
249            // Create checkpoint with unknown estimate
250            let mut checkpoint = CardinalityCheckpoint::new(operator_id, 0.0);
251            checkpoint.record(actual);
252            self.checkpoints.insert(operator_id.to_string(), checkpoint);
253        }
254    }
255
256    /// Applies feedback from a `CardinalityFeedback` collector.
257    pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
258        for (op_id, &actual) in feedback.all_actuals() {
259            self.record_actual(op_id, actual);
260        }
261    }
262
263    /// Checks if any checkpoint shows significant deviation.
264    #[must_use]
265    pub fn has_significant_deviation(&self) -> bool {
266        self.checkpoints
267            .values()
268            .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
269    }
270
271    /// Determines if re-optimization should be triggered.
272    ///
273    /// Returns true if:
274    /// - There's significant deviation from estimates
275    /// - We've processed enough rows to make a meaningful decision
276    /// - Re-optimization hasn't already been triggered
277    #[must_use]
278    pub fn should_reoptimize(&mut self) -> bool {
279        if self.reoptimization_triggered {
280            return false;
281        }
282
283        for (op_id, checkpoint) in &self.checkpoints {
284            if checkpoint.actual < self.min_rows {
285                continue;
286            }
287
288            if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
289                self.reoptimization_triggered = true;
290                self.trigger_operator = Some(op_id.clone());
291                return true;
292            }
293        }
294
295        false
296    }
297
298    /// Returns the operator that triggered re-optimization, if any.
299    #[must_use]
300    pub fn trigger_operator(&self) -> Option<&str> {
301        self.trigger_operator.as_deref()
302    }
303
304    /// Gets the checkpoint for an operator.
305    #[must_use]
306    pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
307        self.checkpoints.get(operator_id)
308    }
309
310    /// Returns all checkpoints.
311    #[must_use]
312    pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
313        &self.checkpoints
314    }
315
316    /// Calculates a correction factor for a specific operator.
317    ///
318    /// This factor can be used to adjust remaining cardinality estimates.
319    #[must_use]
320    pub fn correction_factor(&self, operator_id: &str) -> f64 {
321        self.checkpoints
322            .get(operator_id)
323            .filter(|cp| cp.recorded)
324            .map_or(1.0, CardinalityCheckpoint::deviation_ratio)
325    }
326
327    /// Returns summary statistics about the adaptive execution.
328    #[must_use]
329    pub fn summary(&self) -> AdaptiveSummary {
330        let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
331        let deviation_count = self
332            .checkpoints
333            .values()
334            .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
335            .count();
336
337        let avg_deviation = if recorded_count > 0 {
338            self.checkpoints
339                .values()
340                .filter(|cp| cp.recorded)
341                .map(CardinalityCheckpoint::deviation_ratio)
342                .sum::<f64>()
343                / recorded_count as f64
344        } else {
345            1.0
346        };
347
348        let max_deviation = self
349            .checkpoints
350            .values()
351            .filter(|cp| cp.recorded)
352            .map(|cp| {
353                let ratio = cp.deviation_ratio();
354                if ratio > 1.0 { ratio } else { 1.0 / ratio }
355            })
356            .fold(1.0_f64, f64::max);
357
358        AdaptiveSummary {
359            checkpoint_count: self.checkpoints.len(),
360            recorded_count,
361            deviation_count,
362            avg_deviation_ratio: avg_deviation,
363            max_deviation_ratio: max_deviation,
364            reoptimization_triggered: self.reoptimization_triggered,
365            trigger_operator: self.trigger_operator.clone(),
366        }
367    }
368
369    /// Resets the context for a new execution.
370    pub fn reset(&mut self) {
371        for checkpoint in self.checkpoints.values_mut() {
372            checkpoint.actual = 0;
373            checkpoint.recorded = false;
374        }
375        self.reoptimization_triggered = false;
376        self.trigger_operator = None;
377    }
378}
379
380impl Default for AdaptiveContext {
381    fn default() -> Self {
382        Self::new()
383    }
384}
385
/// Summary of adaptive execution statistics, produced by
/// `AdaptiveContext::summary`.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of checkpoints.
    pub checkpoint_count: usize,
    /// Number of checkpoints with recorded actuals.
    pub recorded_count: usize,
    /// Number of checkpoints with significant deviation.
    pub deviation_count: usize,
    /// Average deviation ratio across recorded checkpoints (1.0 if none).
    pub avg_deviation_ratio: f64,
    /// Maximum normalized deviation ratio observed (always >= 1.0 when
    /// produced by `AdaptiveContext::summary`; `Default` yields 0.0).
    pub max_deviation_ratio: f64,
    /// Whether re-optimization was triggered.
    pub reoptimization_triggered: bool,
    /// Operator that triggered re-optimization.
    pub trigger_operator: Option<String>,
}
404
405/// Thread-safe wrapper for `AdaptiveContext`.
406///
407/// Allows multiple operators to report cardinalities concurrently.
408#[derive(Debug, Clone)]
409pub struct SharedAdaptiveContext {
410    inner: Arc<RwLock<AdaptiveContext>>,
411}
412
413impl SharedAdaptiveContext {
414    /// Creates a new shared context.
415    #[must_use]
416    pub fn new() -> Self {
417        Self {
418            inner: Arc::new(RwLock::new(AdaptiveContext::new())),
419        }
420    }
421
422    /// Creates from an existing context.
423    #[must_use]
424    pub fn from_context(ctx: AdaptiveContext) -> Self {
425        Self {
426            inner: Arc::new(RwLock::new(ctx)),
427        }
428    }
429
430    /// Records actual cardinality for an operator.
431    pub fn record_actual(&self, operator_id: &str, actual: u64) {
432        if let Ok(mut ctx) = self.inner.write() {
433            ctx.record_actual(operator_id, actual);
434        }
435    }
436
437    /// Checks if re-optimization should be triggered.
438    #[must_use]
439    pub fn should_reoptimize(&self) -> bool {
440        if let Ok(mut ctx) = self.inner.write() {
441            ctx.should_reoptimize()
442        } else {
443            false
444        }
445    }
446
447    /// Gets a read-only snapshot of the context.
448    #[must_use]
449    pub fn snapshot(&self) -> Option<AdaptiveContext> {
450        self.inner.read().ok().map(|guard| AdaptiveContext {
451            checkpoints: guard.checkpoints.clone(),
452            reoptimization_threshold: guard.reoptimization_threshold,
453            min_rows: guard.min_rows,
454            reoptimization_triggered: guard.reoptimization_triggered,
455            trigger_operator: guard.trigger_operator.clone(),
456        })
457    }
458}
459
460impl Default for SharedAdaptiveContext {
461    fn default() -> Self {
462        Self::new()
463    }
464}
465
466/// A wrapper operator that tracks cardinality and reports to an adaptive context.
467///
468/// This wraps any `PushOperator` and counts the rows flowing through it,
469/// reporting the count to the adaptive context.
470pub struct CardinalityTrackingOperator {
471    /// The wrapped operator.
472    inner: Box<dyn PushOperator>,
473    /// Operator identifier for reporting.
474    operator_id: String,
475    /// Row counter.
476    row_count: u64,
477    /// Shared adaptive context for reporting.
478    context: SharedAdaptiveContext,
479}
480
481impl CardinalityTrackingOperator {
482    /// Creates a new tracking wrapper.
483    pub fn new(
484        inner: Box<dyn PushOperator>,
485        operator_id: &str,
486        context: SharedAdaptiveContext,
487    ) -> Self {
488        Self {
489            inner,
490            operator_id: operator_id.to_string(),
491            row_count: 0,
492            context,
493        }
494    }
495
496    /// Returns the current row count.
497    #[must_use]
498    pub fn current_count(&self) -> u64 {
499        self.row_count
500    }
501}
502
503impl PushOperator for CardinalityTrackingOperator {
504    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
505        // Track input rows
506        self.row_count += chunk.len() as u64;
507
508        // Push through to inner operator
509        self.inner.push(chunk, sink)
510    }
511
512    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
513        // Report final cardinality to context
514        self.context
515            .record_actual(&self.operator_id, self.row_count);
516
517        // Finalize inner operator
518        self.inner.finalize(sink)
519    }
520
521    fn preferred_chunk_size(&self) -> ChunkSizeHint {
522        self.inner.preferred_chunk_size()
523    }
524
525    fn name(&self) -> &'static str {
526        // Return the inner operator's name
527        self.inner.name()
528    }
529}
530
531/// A sink that tracks cardinality of data flowing through it.
532pub struct CardinalityTrackingSink {
533    /// The wrapped sink.
534    inner: Box<dyn Sink>,
535    /// Operator identifier for reporting.
536    operator_id: String,
537    /// Row counter.
538    row_count: u64,
539    /// Shared adaptive context for reporting.
540    context: SharedAdaptiveContext,
541}
542
543impl CardinalityTrackingSink {
544    /// Creates a new tracking sink wrapper.
545    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
546        Self {
547            inner,
548            operator_id: operator_id.to_string(),
549            row_count: 0,
550            context,
551        }
552    }
553
554    /// Returns the current row count.
555    #[must_use]
556    pub fn current_count(&self) -> u64 {
557        self.row_count
558    }
559}
560
561impl Sink for CardinalityTrackingSink {
562    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
563        self.row_count += chunk.len() as u64;
564        self.inner.consume(chunk)
565    }
566
567    fn finalize(&mut self) -> Result<(), OperatorError> {
568        // Report final cardinality
569        self.context
570            .record_actual(&self.operator_id, self.row_count);
571        self.inner.finalize()
572    }
573
574    fn name(&self) -> &'static str {
575        self.inner.name()
576    }
577}
578
/// Decision about whether to re-optimize a query.
#[derive(Debug, Clone, PartialEq)]
pub enum ReoptimizationDecision {
    /// Continue with current plan.
    Continue,
    /// Re-optimize the remaining plan.
    Reoptimize {
        /// The operator that triggered re-optimization.
        trigger: String,
        /// Correction factors to apply to remaining estimates.
        corrections: HashMap<String, f64>,
    },
    /// Abort the query (catastrophic misestimate).
    Abort {
        /// Human-readable explanation of why the query was aborted.
        reason: String,
    },
}
594
595/// Evaluates whether re-optimization should occur based on context.
596#[must_use]
597pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
598    let summary = ctx.summary();
599
600    // If no significant deviations, continue
601    if !summary.reoptimization_triggered {
602        return ReoptimizationDecision::Continue;
603    }
604
605    // If deviation is catastrophic (>100x), consider aborting
606    if summary.max_deviation_ratio > 100.0 {
607        return ReoptimizationDecision::Abort {
608            reason: format!(
609                "Catastrophic cardinality misestimate: {}x deviation",
610                summary.max_deviation_ratio
611            ),
612        };
613    }
614
615    // Build correction factors
616    let corrections: HashMap<String, f64> = ctx
617        .all_checkpoints()
618        .iter()
619        .filter(|(_, cp)| cp.recorded)
620        .map(|(id, cp)| (id.clone(), cp.deviation_ratio()))
621        .collect();
622
623    ReoptimizationDecision::Reoptimize {
624        trigger: summary.trigger_operator.unwrap_or_default(),
625        corrections,
626    }
627}
628
/// Callback for creating a new plan based on observed cardinalities.
///
/// This is called when the adaptive pipeline detects significant deviation
/// and decides to re-optimize. The callback receives the adaptive context
/// with recorded actuals and should return the replacement operator chain.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
635
/// Configuration for adaptive pipeline execution.
#[derive(Debug, Clone)]
pub struct AdaptivePipelineConfig {
    /// Number of rows to process before checking for re-optimization.
    pub check_interval: u64,
    /// Deviation ratio that triggers re-optimization.
    pub reoptimization_threshold: f64,
    /// Minimum rows before re-optimization is considered.
    pub min_rows_for_reoptimization: u64,
    /// Maximum number of re-optimizations allowed per query.
    pub max_reoptimizations: usize,
}
648
649impl Default for AdaptivePipelineConfig {
650    fn default() -> Self {
651        Self {
652            check_interval: 10_000,
653            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
654            min_rows_for_reoptimization: MIN_ROWS_FOR_REOPTIMIZATION,
655            max_reoptimizations: 3,
656        }
657    }
658}
659
660impl AdaptivePipelineConfig {
661    /// Creates a new configuration with custom thresholds.
662    #[must_use]
663    pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
664        Self {
665            check_interval,
666            reoptimization_threshold: threshold,
667            min_rows_for_reoptimization: min_rows,
668            max_reoptimizations: 3,
669        }
670    }
671
672    /// Sets the maximum number of re-optimizations allowed.
673    #[must_use]
674    pub fn with_max_reoptimizations(mut self, max: usize) -> Self {
675        self.max_reoptimizations = max;
676        self
677    }
678}
679
/// Result of executing an adaptive pipeline.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows processed.
    pub total_rows: u64,
    /// Number of re-optimizations performed.
    pub reoptimization_count: usize,
    /// Operators that triggered re-optimization.
    pub triggers: Vec<String>,
    /// Summary of the final adaptive context, including recorded actuals.
    pub final_context: AdaptiveSummary,
}
692
/// A checkpoint during adaptive execution where plan switching can occur.
///
/// Checkpoints sit at strategic points in the pipeline (typically after
/// filters or joins) where switching to a new plan makes sense.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    /// Unique identifier for this checkpoint.
    pub id: String,
    /// Operator index in the pipeline (after which this checkpoint occurs).
    pub after_operator: usize,
    /// Estimated cardinality at this point.
    pub estimated_cardinality: f64,
    /// Actual rows seen so far.
    pub actual_rows: u64,
    /// Whether this checkpoint has triggered re-optimization.
    pub triggered: bool,
}

impl AdaptiveCheckpoint {
    /// Creates a checkpoint placed after the operator at `after_operator`
    /// carrying the optimizer's `estimated` cardinality.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        Self {
            id: id.to_string(),
            estimated_cardinality: estimated,
            after_operator,
            actual_rows: 0,
            triggered: false,
        }
    }

    /// Accumulates rows observed at this checkpoint.
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }

    /// True when at least `min_rows` rows have been seen and the observed
    /// count deviates from the estimate by more than `threshold` in either
    /// direction. A non-positive estimate deviates as soon as any row
    /// arrives.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        if self.actual_rows < min_rows {
            return false;
        }
        if self.estimated_cardinality > 0.0 {
            let ratio = self.actual_rows as f64 / self.estimated_cardinality;
            ratio > threshold || ratio < 1.0 / threshold
        } else {
            self.actual_rows > 0
        }
    }
}
742
/// Event emitted during adaptive execution.
#[derive(Debug, Clone)]
pub enum AdaptiveEvent {
    /// A checkpoint was reached.
    CheckpointReached {
        /// Identifier of the checkpoint that was reached.
        id: String,
        /// Rows actually observed at the checkpoint.
        actual_rows: u64,
        /// Cardinality the optimizer estimated for this point.
        estimated: f64,
    },
    /// Re-optimization was triggered.
    ReoptimizationTriggered {
        /// Identifier of the checkpoint that triggered it.
        checkpoint_id: String,
        /// Deviation ratio (actual / estimated) at that checkpoint.
        deviation_ratio: f64,
    },
    /// Plan was switched.
    PlanSwitched {
        /// Number of operators in the plan that was replaced.
        old_operator_count: usize,
        /// Number of operators in the replacement plan.
        new_operator_count: usize,
    },
    /// Execution completed.
    ExecutionCompleted {
        /// Total rows processed.
        total_rows: u64,
    },
}

/// Callback for observing adaptive execution events.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
768
769/// Builder for creating adaptive pipelines.
770pub struct AdaptivePipelineBuilder {
771    checkpoints: Vec<AdaptiveCheckpoint>,
772    config: AdaptivePipelineConfig,
773    context: AdaptiveContext,
774    event_callback: Option<AdaptiveEventCallback>,
775}
776
777impl AdaptivePipelineBuilder {
778    /// Creates a new builder with default configuration.
779    #[must_use]
780    pub fn new() -> Self {
781        Self {
782            checkpoints: Vec::new(),
783            config: AdaptivePipelineConfig::default(),
784            context: AdaptiveContext::new(),
785            event_callback: None,
786        }
787    }
788
789    /// Sets the configuration.
790    #[must_use]
791    pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
792        self.config = config;
793        self
794    }
795
796    /// Adds a checkpoint at the specified operator index.
797    #[must_use]
798    pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
799        self.checkpoints
800            .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
801        self.context.set_estimate(id, estimated);
802        self
803    }
804
805    /// Sets an event callback for observing execution.
806    #[must_use]
807    pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
808        self.event_callback = Some(callback);
809        self
810    }
811
812    /// Sets estimates from a pre-configured context.
813    #[must_use]
814    pub fn with_context(mut self, context: AdaptiveContext) -> Self {
815        self.context = context;
816        self
817    }
818
819    /// Builds the configuration for adaptive execution.
820    #[must_use]
821    pub fn build(self) -> AdaptiveExecutionConfig {
822        AdaptiveExecutionConfig {
823            checkpoints: self.checkpoints,
824            config: self.config,
825            context: self.context,
826            event_callback: self.event_callback,
827        }
828    }
829}
830
831impl Default for AdaptivePipelineBuilder {
832    fn default() -> Self {
833        Self::new()
834    }
835}
836
/// Configuration for adaptive execution, built by `AdaptivePipelineBuilder`.
pub struct AdaptiveExecutionConfig {
    /// Checkpoints for monitoring cardinality.
    pub checkpoints: Vec<AdaptiveCheckpoint>,
    /// Execution configuration (thresholds, check interval, limits).
    pub config: AdaptivePipelineConfig,
    /// Adaptive context holding estimates and recorded actuals.
    pub context: AdaptiveContext,
    /// Optional callback invoked for execution events.
    pub event_callback: Option<AdaptiveEventCallback>,
}
848
849impl AdaptiveExecutionConfig {
850    /// Returns a summary of the adaptive execution after it completes.
851    #[must_use]
852    pub fn summary(&self) -> AdaptiveSummary {
853        self.context.summary()
854    }
855
856    /// Records actual cardinality for a checkpoint.
857    pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
858        self.context.record_actual(checkpoint_id, actual);
859
860        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
861            cp.actual_rows = actual;
862        }
863
864        if let Some(ref callback) = self.event_callback {
865            let estimated = self
866                .context
867                .get_checkpoint(checkpoint_id)
868                .map_or(0.0, |cp| cp.estimated);
869            callback(AdaptiveEvent::CheckpointReached {
870                id: checkpoint_id.to_string(),
871                actual_rows: actual,
872                estimated,
873            });
874        }
875    }
876
877    /// Checks if any checkpoint exceeds the deviation threshold.
878    #[must_use]
879    pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
880        self.checkpoints.iter().find(|cp| {
881            !cp.triggered
882                && cp.exceeds_threshold(
883                    self.config.reoptimization_threshold,
884                    self.config.min_rows_for_reoptimization,
885                )
886        })
887    }
888
889    /// Marks a checkpoint as having triggered re-optimization.
890    pub fn mark_triggered(&mut self, checkpoint_id: &str) {
891        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
892            cp.triggered = true;
893        }
894
895        if let Some(ref callback) = self.event_callback {
896            let ratio = self
897                .context
898                .get_checkpoint(checkpoint_id)
899                .filter(|cp| cp.recorded)
900                .map_or(1.0, |cp| cp.deviation_ratio());
901            callback(AdaptiveEvent::ReoptimizationTriggered {
902                checkpoint_id: checkpoint_id.to_string(),
903                deviation_ratio: ratio,
904            });
905        }
906    }
907}
908
909// ============= Pull-Based Operator Tracking =============
910
911use super::operators::{Operator, OperatorResult}; // OperatorError imported above
912
/// A wrapper that tracks cardinality for pull-based operators.
///
/// This wraps any `Operator` and counts the rows flowing through it,
/// reporting the count to the adaptive context. Use this for integrating
/// adaptive execution with the standard pull-based executor.
///
/// The final count is reported at most once — on stream exhaustion, on the
/// first error, or on drop, whichever happens first (guarded by `finalized`).
pub struct CardinalityTrackingWrapper {
    /// The wrapped operator.
    inner: Box<dyn Operator>,
    /// Operator identifier for reporting.
    operator_id: String,
    /// Row counter.
    row_count: u64,
    /// Shared adaptive context for reporting.
    context: SharedAdaptiveContext,
    /// Whether finalization has been reported.
    finalized: bool,
}
930
931impl CardinalityTrackingWrapper {
932    /// Creates a new tracking wrapper for a pull-based operator.
933    pub fn new(
934        inner: Box<dyn Operator>,
935        operator_id: &str,
936        context: SharedAdaptiveContext,
937    ) -> Self {
938        Self {
939            inner,
940            operator_id: operator_id.to_string(),
941            row_count: 0,
942            context,
943            finalized: false,
944        }
945    }
946
947    /// Returns the current row count.
948    #[must_use]
949    pub fn current_count(&self) -> u64 {
950        self.row_count
951    }
952
953    /// Reports the final cardinality to the context.
954    fn report_final(&mut self) {
955        if !self.finalized {
956            self.context
957                .record_actual(&self.operator_id, self.row_count);
958            self.finalized = true;
959        }
960    }
961}
962
963impl Operator for CardinalityTrackingWrapper {
964    fn next(&mut self) -> OperatorResult {
965        match self.inner.next() {
966            Ok(Some(chunk)) => {
967                // Track rows
968                self.row_count += chunk.row_count() as u64;
969                Ok(Some(chunk))
970            }
971            Ok(None) => {
972                // Stream exhausted - report final cardinality
973                self.report_final();
974                Ok(None)
975            }
976            Err(e) => {
977                // Report on error too
978                self.report_final();
979                Err(e)
980            }
981        }
982    }
983
984    fn reset(&mut self) {
985        self.row_count = 0;
986        self.finalized = false;
987        self.inner.reset();
988    }
989
990    fn name(&self) -> &'static str {
991        self.inner.name()
992    }
993}
994
impl Drop for CardinalityTrackingWrapper {
    fn drop(&mut self) {
        // Ensure we report even if dropped early (e.g. the consumer stopped
        // pulling before exhaustion). `report_final` is idempotent via the
        // `finalized` flag, so a normal completion is not double-reported.
        self.report_final();
    }
}
1001
1002// ============= Adaptive Pipeline Execution =============
1003
1004use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; // Sink imported above
1005use super::sink::CollectorSink;
1006use super::source::OperatorSource;
1007
/// High-level adaptive pipeline that executes a pull-based operator with
/// cardinality tracking using push-based infrastructure.
///
/// This bridges the pull-based planner output with push-based execution:
/// 1. Wraps the pull operator as an `OperatorSource`
/// 2. Uses `CardinalityTrackingSink` to track output cardinality
/// 3. Provides adaptive feedback through `AdaptiveContext`
///
/// # Example
///
/// ```ignore
/// use grafeo_core::execution::adaptive::AdaptivePipelineExecutor;
///
/// let executor = AdaptivePipelineExecutor::new(operator, adaptive_context);
/// let (chunks, summary) = executor.execute()?;
/// ```
pub struct AdaptivePipelineExecutor {
    /// Pull-based operator adapted to the push-based source interface.
    source: OperatorSource,
    /// Shared context accumulating actual-vs-estimated cardinalities.
    context: SharedAdaptiveContext,
    /// Tuning knobs: check interval, deviation threshold, min rows.
    config: AdaptivePipelineConfig,
}
1029
1030impl AdaptivePipelineExecutor {
1031    /// Creates a new adaptive pipeline executor.
1032    ///
1033    /// # Arguments
1034    ///
1035    /// * `operator` - The pull-based operator to execute
1036    /// * `context` - Adaptive context with cardinality estimates
1037    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
1038        Self {
1039            source: OperatorSource::new(operator),
1040            context: SharedAdaptiveContext::from_context(context),
1041            config: AdaptivePipelineConfig::default(),
1042        }
1043    }
1044
1045    /// Creates an executor with custom configuration.
1046    pub fn with_config(
1047        operator: Box<dyn Operator>,
1048        context: AdaptiveContext,
1049        config: AdaptivePipelineConfig,
1050    ) -> Self {
1051        Self {
1052            source: OperatorSource::new(operator),
1053            context: SharedAdaptiveContext::from_context(context),
1054            config,
1055        }
1056    }
1057
1058    /// Executes the pipeline and returns collected chunks with adaptive summary.
1059    ///
1060    /// # Returns
1061    ///
1062    /// A tuple of (collected DataChunks, adaptive execution summary).
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if execution fails.
1067    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1068        let mut sink = CardinalityTrackingSink::new(
1069            Box::new(CollectorSink::new()),
1070            "output",
1071            self.context.clone(),
1072        );
1073
1074        let chunk_size = DEFAULT_CHUNK_SIZE;
1075        let mut total_rows: u64 = 0;
1076        let check_interval = self.config.check_interval;
1077
1078        // Process all chunks from source
1079        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1080            let chunk_rows = chunk.len() as u64;
1081            total_rows += chunk_rows;
1082
1083            // Push to tracking sink
1084            let continue_exec = sink.consume(chunk)?;
1085            if !continue_exec {
1086                break;
1087            }
1088
1089            // Periodically check for reoptimization need
1090            if total_rows >= check_interval
1091                && total_rows.is_multiple_of(check_interval)
1092                && self.context.should_reoptimize()
1093            {
1094                // Log or emit event that reoptimization would be triggered
1095                // Full re-planning would happen at a higher level
1096            }
1097        }
1098
1099        // Finalize sink
1100        sink.finalize()?;
1101
1102        // Extract results from the inner sink
1103        let summary = self
1104            .context
1105            .snapshot()
1106            .map(|ctx| ctx.summary())
1107            .unwrap_or_default();
1108
1109        // Get collected chunks from the inner CollectorSink
1110        // Note: We need to extract chunks from the wrapped sink
1111        // For now, we'll return the summary and the caller can collect separately
1112        Ok((Vec::new(), summary))
1113    }
1114
1115    /// Executes and collects all results into DataChunks.
1116    ///
1117    /// This is a simpler interface that handles chunk collection internally.
1118    pub fn execute_collecting(
1119        mut self,
1120    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1121        let mut chunks = Vec::new();
1122        let chunk_size = DEFAULT_CHUNK_SIZE;
1123        let mut total_rows: u64 = 0;
1124        let check_interval = self.config.check_interval;
1125
1126        // Process all chunks from source
1127        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1128            let chunk_rows = chunk.len() as u64;
1129            total_rows += chunk_rows;
1130
1131            // Record cardinality
1132            self.context.record_actual("root", total_rows);
1133
1134            // Collect the chunk
1135            if !chunk.is_empty() {
1136                chunks.push(chunk);
1137            }
1138
1139            // Periodically check for reoptimization
1140            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
1141                let _ = self.context.should_reoptimize();
1142            }
1143        }
1144
1145        let summary = self
1146            .context
1147            .snapshot()
1148            .map(|ctx| ctx.summary())
1149            .unwrap_or_default();
1150
1151        Ok((chunks, summary))
1152    }
1153
1154    /// Returns a reference to the shared context for external monitoring.
1155    pub fn context(&self) -> &SharedAdaptiveContext {
1156        &self.context
1157    }
1158}
1159
1160/// Convenience function to execute a pull-based operator with adaptive tracking.
1161///
1162/// This is the recommended entry point for adaptive execution.
1163///
1164/// # Arguments
1165///
1166/// * `operator` - The pull-based operator to execute
1167/// * `context` - Adaptive context with cardinality estimates (or None for default)
1168/// * `config` - Optional configuration (uses defaults if None)
1169///
1170/// # Returns
1171///
1172/// A tuple of (collected DataChunks, adaptive summary if tracking was enabled).
1173pub fn execute_adaptive(
1174    operator: Box<dyn Operator>,
1175    context: Option<AdaptiveContext>,
1176    config: Option<AdaptivePipelineConfig>,
1177) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
1178    let ctx = context.unwrap_or_default();
1179    let cfg = config.unwrap_or_default();
1180
1181    let executor = AdaptivePipelineExecutor::with_config(operator, ctx, cfg);
1182    let (chunks, summary) = executor.execute_collecting()?;
1183
1184    Ok((chunks, Some(summary)))
1185}
1186
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance for floating-point comparisons in these tests.
    const EPS: f64 = 1e-3;

    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(200);

        // Actual is twice the estimate.
        assert!((checkpoint.deviation_ratio() - 2.0).abs() < EPS);
    }

    #[test]
    fn test_checkpoint_underestimate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(500);

        // A 5x underestimate is significant at a 3.0x threshold.
        assert!((checkpoint.deviation_ratio() - 5.0).abs() < EPS);
        assert!(checkpoint.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_overestimate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(20);

        // Actual is 0.2x the estimate; 0.2 < 1/3, so it is significant.
        assert!((checkpoint.deviation_ratio() - 0.2).abs() < EPS);
        assert!(checkpoint.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_accurate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(110);

        // 1.1x is well within a 3.0x threshold.
        assert!((checkpoint.deviation_ratio() - 1.1).abs() < EPS);
        assert!(!checkpoint.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 0.0);
        checkpoint.record(100);

        // Rows against a zero estimate yield an infinite ratio.
        assert!(checkpoint.deviation_ratio().is_infinite());
    }

    #[test]
    fn test_checkpoint_zero_both() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 0.0);
        checkpoint.record(0);

        // Zero estimate and zero actual agree perfectly: ratio 1.0.
        assert!((checkpoint.deviation_ratio() - 1.0).abs() < EPS);
    }

    #[test]
    fn test_feedback_collection() {
        let mut fb = CardinalityFeedback::new();
        fb.record("scan_1", 1000);
        fb.record("filter_1", 100);

        assert_eq!(fb.get("scan_1"), Some(1000));
        assert_eq!(fb.get("filter_1"), Some(100));
        assert_eq!(fb.get("unknown"), None);
    }

    #[test]
    fn test_feedback_running_counter() {
        let mut fb = CardinalityFeedback::new();
        fb.init_counter("op_1");

        // Increments accumulate in the running counter...
        fb.add_rows("op_1", 100);
        fb.add_rows("op_1", 200);
        fb.add_rows("op_1", 50);
        assert_eq!(fb.get_running("op_1"), Some(350));

        // ...and finalizing publishes the total.
        fb.finalize_counter("op_1");
        assert_eq!(fb.get("op_1"), Some(350));
    }

    #[test]
    fn test_adaptive_context_basic() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.set_estimate("filter", 100.0);

        context.record_actual("scan", 1000);
        context.record_actual("filter", 500); // 5x underestimate

        let checkpoint = context.get_checkpoint("filter").unwrap();
        assert!((checkpoint.deviation_ratio() - 5.0).abs() < EPS);
    }

    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 100);
        context.set_estimate("scan", 10000.0);
        context.set_estimate("filter", 1000.0);

        context.record_actual("scan", 10000);
        context.record_actual("filter", 5000); // 5x underestimate

        assert!(context.should_reoptimize());
        assert_eq!(context.trigger_operator(), Some("filter"));

        // Already triggered, so a second check declines.
        assert!(!context.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_min_rows() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 1000);
        context.set_estimate("filter", 100.0);
        context.record_actual("filter", 500); // 5x, but only 500 rows

        // Too few rows observed to justify re-optimization.
        assert!(!context.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.record_actual("scan", 1100); // close to the estimate

        assert!(!context.has_significant_deviation());
        assert!(!context.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("filter", 100.0);
        context.record_actual("filter", 300);

        assert!((context.correction_factor("filter") - 3.0).abs() < EPS);
        // Unknown operators get a neutral factor of 1.0.
        assert!((context.correction_factor("unknown") - 1.0).abs() < EPS);
    }

    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.set_estimate("filter", 100.0);

        let mut fb = CardinalityFeedback::new();
        fb.record("scan", 1000);
        fb.record("filter", 500);

        context.apply_feedback(&fb);

        assert_eq!(context.get_checkpoint("scan").unwrap().actual, 1000);
        assert_eq!(context.get_checkpoint("filter").unwrap().actual, 500);
    }

    #[test]
    fn test_adaptive_summary() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 0);
        context.set_estimate("op1", 100.0);
        context.set_estimate("op2", 200.0);
        context.set_estimate("op3", 300.0);

        context.record_actual("op1", 100); // exact
        context.record_actual("op2", 600); // 3x deviation

        // Force the trigger path so the summary reflects it.
        let _ = context.should_reoptimize();

        let summary = context.summary();
        assert_eq!(summary.checkpoint_count, 3);
        assert_eq!(summary.recorded_count, 2);
        assert_eq!(summary.deviation_count, 1);
        assert!(summary.reoptimization_triggered);
    }

    #[test]
    fn test_adaptive_context_reset() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.record_actual("scan", 5000);
        let _ = context.should_reoptimize(); // trigger

        assert!(context.reoptimization_triggered);

        context.reset();

        // Actuals and the trigger flag are cleared...
        assert!(!context.reoptimization_triggered);
        assert_eq!(context.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!context.get_checkpoint("scan").unwrap().recorded);
        // ...but estimates survive the reset.
        assert!((context.get_checkpoint("scan").unwrap().estimated - 1000.0).abs() < EPS);
    }

    #[test]
    fn test_shared_context() {
        let shared = SharedAdaptiveContext::new();

        shared.record_actual("op1", 1000);

        let snapshot = shared.snapshot().unwrap();
        assert_eq!(snapshot.get_checkpoint("op1").unwrap().actual, 1000);
    }

    #[test]
    fn test_reoptimization_decision_continue() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.record_actual("scan", 1100);

        let decision = evaluate_reoptimization(&context);
        assert_eq!(decision, ReoptimizationDecision::Continue);
    }

    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 0);
        context.set_estimate("filter", 100.0);
        context.record_actual("filter", 500);
        let _ = context.should_reoptimize(); // trigger

        match evaluate_reoptimization(&context) {
            ReoptimizationDecision::Reoptimize {
                trigger,
                corrections,
            } => {
                assert_eq!(trigger, "filter");
                let factor = corrections.get("filter").copied().unwrap_or(0.0);
                assert!((factor - 5.0).abs() < EPS);
            }
            _ => panic!("Expected Reoptimize decision"),
        }
    }

    #[test]
    fn test_reoptimization_decision_abort() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 0);
        context.set_estimate("filter", 1.0);
        context.record_actual("filter", 1000); // 1000x deviation!
        let _ = context.should_reoptimize();

        match evaluate_reoptimization(&context) {
            ReoptimizationDecision::Abort { reason } => {
                assert!(reason.contains("Catastrophic"));
            }
            _ => panic!("Expected Abort decision"),
        }
    }

    #[test]
    fn test_absolute_deviation() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(150);

        assert!((checkpoint.absolute_deviation() - 50.0).abs() < EPS);
    }

    // ============= Plan Switching Tests =============

    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut checkpoint = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(checkpoint.actual_rows, 0);
        assert!(!checkpoint.triggered);

        // record_rows accumulates rather than overwrites.
        checkpoint.record_rows(50);
        assert_eq!(checkpoint.actual_rows, 50);
        checkpoint.record_rows(100);
        assert_eq!(checkpoint.actual_rows, 150);
    }

    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut checkpoint = AdaptiveCheckpoint::new("filter", 0, 100.0);

        // Below the minimum row count: never triggers.
        checkpoint.record_rows(50);
        assert!(!checkpoint.exceeds_threshold(2.0, 100));

        // Enough rows, but 100 actual vs 100 estimated is exactly 1.0x.
        checkpoint.record_rows(50);
        assert!(!checkpoint.exceeds_threshold(2.0, 100));

        // 500 actual vs 100 estimated = 5.0x underestimate.
        checkpoint.actual_rows = 0;
        checkpoint.record_rows(500);
        assert!(checkpoint.exceeds_threshold(2.0, 100));

        // 200 actual vs 1000 estimated = 0.2x overestimate.
        let mut overestimated = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        overestimated.record_rows(200);
        assert!(overestimated.exceeds_threshold(2.0, 100));
    }

    #[test]
    fn test_adaptive_pipeline_config_default() {
        let config = AdaptivePipelineConfig::default();

        assert_eq!(config.check_interval, 10_000);
        assert!((config.reoptimization_threshold - DEFAULT_REOPTIMIZATION_THRESHOLD).abs() < EPS);
        assert_eq!(
            config.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(config.max_reoptimizations, 3);
    }

    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let config = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);

        assert_eq!(config.check_interval, 5000);
        assert!((config.reoptimization_threshold - 2.0).abs() < EPS);
        assert_eq!(config.min_rows_for_reoptimization, 500);
        assert_eq!(config.max_reoptimizations, 5);
    }

    #[test]
    fn test_adaptive_pipeline_builder() {
        let built = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();

        assert_eq!(built.checkpoints.len(), 2);
        assert_eq!(built.checkpoints[0].id, "scan");
        assert!((built.checkpoints[0].estimated_cardinality - 10000.0).abs() < EPS);
        assert_eq!(built.checkpoints[1].id, "filter");
        assert!((built.checkpoints[1].estimated_cardinality - 1000.0).abs() < EPS);
    }

    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        config.record_checkpoint("filter", 500);

        // Both the shared context...
        let context_cp = config.context.get_checkpoint("filter").unwrap();
        assert_eq!(context_cp.actual, 500);
        assert!(context_cp.recorded);

        // ...and the adaptive checkpoint list were updated.
        let adaptive_cp = config
            .checkpoints
            .iter()
            .find(|c| c.id == "filter")
            .unwrap();
        assert_eq!(adaptive_cp.actual_rows, 500);
    }

    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();

        // Nothing recorded yet: no trigger.
        assert!(config.should_reoptimize().is_none());

        // 1.5x is within the 2.0x threshold.
        config.record_checkpoint("filter", 150);
        assert!(config.should_reoptimize().is_none());

        // 5.0x exceeds the threshold.
        config.checkpoints[0].actual_rows = 0; // reset for new scenario
        config.record_checkpoint("filter", 500);
        config.checkpoints[0].actual_rows = 500;

        let trigger = config.should_reoptimize();
        assert!(trigger.is_some());
        assert_eq!(trigger.unwrap().id, "filter");
    }

    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(!config.checkpoints[0].triggered);
        config.mark_triggered("filter");
        assert!(config.checkpoints[0].triggered);
    }

    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;

        let events_seen = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&events_seen);

        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                counter.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        // Recording a checkpoint emits CheckpointReached.
        config.record_checkpoint("filter", 500);
        assert_eq!(events_seen.load(Ordering::Relaxed), 1);

        // Marking it triggered emits ReoptimizationTriggered.
        config.mark_triggered("filter");
        assert_eq!(events_seen.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut checkpoint = AdaptiveCheckpoint::new("test", 0, 0.0);
        checkpoint.record_rows(100);

        // Any rows against a zero estimate count as exceeding the threshold.
        assert!(checkpoint.exceeds_threshold(2.0, 50));
    }
}