Skip to main content

grafeo_core/execution/
adaptive.rs

1//! Adaptive query execution with runtime cardinality feedback.
2//!
3//! This module provides adaptive execution capabilities that allow the query
4//! engine to adjust its execution strategy based on actual runtime cardinalities.
5//!
6//! The key insight is that cardinality estimates can be significantly wrong,
7//! especially for complex predicates or skewed data. By tracking actual row
8//! counts during execution, we can detect when our estimates are off and
9//! potentially re-optimize the remaining query plan.
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
15//! │  Optimizer  │────>│ Estimated Cards  │     │ CardinalityFeed │
16//! └─────────────┘     └──────────────────┘     │     back        │
17//!                              │               └────────┬────────┘
18//!                              v                        │
19//!                     ┌──────────────────┐              │
20//!                     │ AdaptiveContext  │<─────────────┘
21//!                     └────────┬─────────┘
22//!                              │
23//!                     ┌────────v─────────┐
24//!                     │ ReoptimizeCheck  │
25//!                     └──────────────────┘
26//! ```
27//!
28//! # Example
29//!
30//! ```ignore
31//! use grafeo_core::execution::adaptive::{AdaptiveContext, CardinalityCheckpoint};
32//!
33//! // Set up adaptive context with estimated cardinalities
34//! let mut ctx = AdaptiveContext::new();
35//! ctx.set_estimate("scan_1", 1000.0);
36//! ctx.set_estimate("filter_1", 100.0);  // Expected 10% selectivity
37//!
38//! // During execution, record actuals
39//! ctx.record_actual("scan_1", 1000);
40//! ctx.record_actual("filter_1", 500);   // Actual 50% selectivity!
41//!
42//! // Check if re-optimization is warranted
43//! if ctx.should_reoptimize() {
44//!     // Re-plan remaining operators with updated statistics
45//! }
46//! ```
47
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, RwLock};
51
52use super::chunk::DataChunk;
53use super::operators::OperatorError;
54use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
55
/// Default deviation ratio at which re-optimization is considered.
///
/// The threshold is applied symmetrically: a checkpoint is flagged when
/// `actual / estimated` is greater than the threshold or less than its
/// reciprocal. With the default of 3.0 that means the actual cardinality is
/// more than 3x, or less than one third of, the estimate.
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;

/// Minimum number of rows before considering re-optimization.
/// Helps avoid thrashing on small result sets.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
63
/// Pairs an optimizer estimate with the row count actually observed at one
/// point in the plan.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    /// Identifier of the operator this checkpoint belongs to.
    pub operator_id: String,
    /// Cardinality the optimizer predicted.
    pub estimated: f64,
    /// Row count seen at runtime (0 until [`record`](Self::record) is called).
    pub actual: u64,
    /// `true` once an actual row count has been stored.
    pub recorded: bool,
}

impl CardinalityCheckpoint {
    /// Builds an unrecorded checkpoint carrying only the estimate.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        let operator_id = operator_id.to_owned();
        Self {
            operator_id,
            estimated,
            actual: 0,
            recorded: false,
        }
    }

    /// Stores the observed row count and marks the checkpoint as recorded.
    pub fn record(&mut self, actual: u64) {
        self.recorded = true;
        self.actual = actual;
    }

    /// Returns `actual / estimated`.
    ///
    /// Values above 1.0 indicate an underestimate, values below 1.0 an
    /// overestimate. When the estimate is zero or negative no division is
    /// performed: the result is 1.0 if no rows were seen and
    /// `f64::INFINITY` otherwise.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        match (self.estimated <= 0.0, self.actual) {
            (true, 0) => 1.0,
            (true, _) => f64::INFINITY,
            (false, observed) => observed as f64 / self.estimated,
        }
    }

    /// Returns `|actual - estimated|`.
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        let observed = self.actual as f64;
        (observed - self.estimated).abs()
    }

    /// Reports whether a recorded checkpoint deviates by more than
    /// `threshold` in either direction.
    ///
    /// An unrecorded checkpoint is never significant. Otherwise the ratio
    /// must exceed `threshold` or fall below `1.0 / threshold`.
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        self.recorded && {
            let ratio = self.deviation_ratio();
            let lower_bound = 1.0 / threshold;
            ratio > threshold || ratio < lower_bound
        }
    }
}
123
/// Collector for row counts observed while a query runs.
///
/// Final counts live in `actuals`; `running_counts` holds atomic counters
/// that can be bumped through a shared reference and later copied into
/// `actuals`.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    /// Final row counts keyed by operator id.
    actuals: HashMap<String, u64>,
    /// In-progress counters keyed by operator id.
    running_counts: HashMap<String, AtomicU64>,
}

impl CardinalityFeedback {
    /// Creates a collector with no counts and no counters.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `count` as the final cardinality of `operator_id`, replacing
    /// any previous value.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_owned(), count);
    }

    /// Adds `count` to the running counter of `operator_id`.
    ///
    /// Only needs `&self` because the counter is atomic. The call has no
    /// effect unless `init_counter` was called for this operator first.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        match self.running_counts.get(operator_id) {
            Some(counter) => {
                counter.fetch_add(count, Ordering::Relaxed);
            }
            // No counter registered for this operator: the rows are not tracked.
            None => {}
        }
    }

    /// Registers a running counter for `operator_id`, starting at zero.
    /// An existing counter for the same id is replaced.
    pub fn init_counter(&mut self, operator_id: &str) {
        let fresh = AtomicU64::new(0);
        self.running_counts.insert(operator_id.to_owned(), fresh);
    }

    /// Copies the current running count of `operator_id` into the final
    /// actuals. Does nothing when no counter exists for the operator.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        if let Some(total) = self.get_running(operator_id) {
            self.actuals.insert(operator_id.to_owned(), total);
        }
    }

    /// Returns the final count recorded for `operator_id`, if any.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        let found = self.actuals.get(operator_id)?;
        Some(*found)
    }

    /// Returns the current value of the running counter for `operator_id`,
    /// or `None` when no counter was initialised.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        let counter = self.running_counts.get(operator_id)?;
        Some(counter.load(Ordering::Relaxed))
    }

    /// Borrows the map of all final counts.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
192
193/// Context for adaptive query execution.
194///
195/// Holds both estimated and actual cardinalities, and provides methods
196/// to determine when re-optimization should occur.
197#[derive(Debug)]
198pub struct AdaptiveContext {
199    /// Checkpoints with estimates and actuals.
200    checkpoints: HashMap<String, CardinalityCheckpoint>,
201    /// Threshold ratio for triggering re-optimization.
202    reoptimization_threshold: f64,
203    /// Minimum rows before considering re-optimization.
204    min_rows: u64,
205    /// Whether re-optimization has been triggered.
206    reoptimization_triggered: bool,
207    /// Operator that caused re-optimization (if any).
208    trigger_operator: Option<String>,
209}
210
211impl AdaptiveContext {
212    /// Creates a new adaptive context with default settings.
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            checkpoints: HashMap::new(),
217            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
218            min_rows: MIN_ROWS_FOR_REOPTIMIZATION,
219            reoptimization_triggered: false,
220            trigger_operator: None,
221        }
222    }
223
224    /// Creates a context with custom thresholds.
225    #[must_use]
226    pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
227        Self {
228            checkpoints: HashMap::new(),
229            reoptimization_threshold: threshold,
230            min_rows,
231            reoptimization_triggered: false,
232            trigger_operator: None,
233        }
234    }
235
236    /// Sets the estimated cardinality for an operator.
237    pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
238        self.checkpoints.insert(
239            operator_id.to_string(),
240            CardinalityCheckpoint::new(operator_id, estimate),
241        );
242    }
243
244    /// Records the actual cardinality for an operator.
245    pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
246        if let Some(checkpoint) = self.checkpoints.get_mut(operator_id) {
247            checkpoint.record(actual);
248        } else {
249            // Create checkpoint with unknown estimate
250            let mut checkpoint = CardinalityCheckpoint::new(operator_id, 0.0);
251            checkpoint.record(actual);
252            self.checkpoints.insert(operator_id.to_string(), checkpoint);
253        }
254    }
255
256    /// Applies feedback from a `CardinalityFeedback` collector.
257    pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
258        for (op_id, &actual) in feedback.all_actuals() {
259            self.record_actual(op_id, actual);
260        }
261    }
262
263    /// Checks if any checkpoint shows significant deviation.
264    #[must_use]
265    pub fn has_significant_deviation(&self) -> bool {
266        self.checkpoints
267            .values()
268            .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
269    }
270
271    /// Determines if re-optimization should be triggered.
272    ///
273    /// Returns true if:
274    /// - There's significant deviation from estimates
275    /// - We've processed enough rows to make a meaningful decision
276    /// - Re-optimization hasn't already been triggered
277    #[must_use]
278    pub fn should_reoptimize(&mut self) -> bool {
279        if self.reoptimization_triggered {
280            return false;
281        }
282
283        for (op_id, checkpoint) in &self.checkpoints {
284            if checkpoint.actual < self.min_rows {
285                continue;
286            }
287
288            if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
289                self.reoptimization_triggered = true;
290                self.trigger_operator = Some(op_id.clone());
291                return true;
292            }
293        }
294
295        false
296    }
297
298    /// Returns the operator that triggered re-optimization, if any.
299    #[must_use]
300    pub fn trigger_operator(&self) -> Option<&str> {
301        self.trigger_operator.as_deref()
302    }
303
304    /// Gets the checkpoint for an operator.
305    #[must_use]
306    pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
307        self.checkpoints.get(operator_id)
308    }
309
310    /// Returns all checkpoints.
311    #[must_use]
312    pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
313        &self.checkpoints
314    }
315
316    /// Calculates a correction factor for a specific operator.
317    ///
318    /// This factor can be used to adjust remaining cardinality estimates.
319    #[must_use]
320    pub fn correction_factor(&self, operator_id: &str) -> f64 {
321        self.checkpoints
322            .get(operator_id)
323            .filter(|cp| cp.recorded)
324            .map(CardinalityCheckpoint::deviation_ratio)
325            .unwrap_or(1.0)
326    }
327
328    /// Returns summary statistics about the adaptive execution.
329    #[must_use]
330    pub fn summary(&self) -> AdaptiveSummary {
331        let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
332        let deviation_count = self
333            .checkpoints
334            .values()
335            .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
336            .count();
337
338        let avg_deviation = if recorded_count > 0 {
339            self.checkpoints
340                .values()
341                .filter(|cp| cp.recorded)
342                .map(CardinalityCheckpoint::deviation_ratio)
343                .sum::<f64>()
344                / recorded_count as f64
345        } else {
346            1.0
347        };
348
349        let max_deviation = self
350            .checkpoints
351            .values()
352            .filter(|cp| cp.recorded)
353            .map(|cp| {
354                let ratio = cp.deviation_ratio();
355                if ratio > 1.0 { ratio } else { 1.0 / ratio }
356            })
357            .fold(1.0_f64, f64::max);
358
359        AdaptiveSummary {
360            checkpoint_count: self.checkpoints.len(),
361            recorded_count,
362            deviation_count,
363            avg_deviation_ratio: avg_deviation,
364            max_deviation_ratio: max_deviation,
365            reoptimization_triggered: self.reoptimization_triggered,
366            trigger_operator: self.trigger_operator.clone(),
367        }
368    }
369
370    /// Resets the context for a new execution.
371    pub fn reset(&mut self) {
372        for checkpoint in self.checkpoints.values_mut() {
373            checkpoint.actual = 0;
374            checkpoint.recorded = false;
375        }
376        self.reoptimization_triggered = false;
377        self.trigger_operator = None;
378    }
379}
380
381impl Default for AdaptiveContext {
382    fn default() -> Self {
383        Self::new()
384    }
385}
386
/// Summary of adaptive execution statistics.
///
/// Produced by `AdaptiveContext::summary`. Note that the derived `Default`
/// yields 0.0 for both ratio fields, whereas `summary` reports 1.0 for them
/// when no checkpoint has been recorded.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of checkpoints.
    pub checkpoint_count: usize,
    /// Number of checkpoints with recorded actuals.
    pub recorded_count: usize,
    /// Number of checkpoints with significant deviation.
    pub deviation_count: usize,
    /// Mean of the raw `actual / estimated` ratios across all recorded
    /// checkpoints.
    pub avg_deviation_ratio: f64,
    /// Largest deviation magnitude observed among recorded checkpoints.
    /// `summary` inverts ratios that are not above 1.0 before comparing.
    pub max_deviation_ratio: f64,
    /// Whether re-optimization was triggered.
    pub reoptimization_triggered: bool,
    /// Operator that triggered re-optimization.
    pub trigger_operator: Option<String>,
}
405
406/// Thread-safe wrapper for `AdaptiveContext`.
407///
408/// Allows multiple operators to report cardinalities concurrently.
409#[derive(Debug, Clone)]
410pub struct SharedAdaptiveContext {
411    inner: Arc<RwLock<AdaptiveContext>>,
412}
413
414impl SharedAdaptiveContext {
415    /// Creates a new shared context.
416    #[must_use]
417    pub fn new() -> Self {
418        Self {
419            inner: Arc::new(RwLock::new(AdaptiveContext::new())),
420        }
421    }
422
423    /// Creates from an existing context.
424    #[must_use]
425    pub fn from_context(ctx: AdaptiveContext) -> Self {
426        Self {
427            inner: Arc::new(RwLock::new(ctx)),
428        }
429    }
430
431    /// Records actual cardinality for an operator.
432    pub fn record_actual(&self, operator_id: &str, actual: u64) {
433        if let Ok(mut ctx) = self.inner.write() {
434            ctx.record_actual(operator_id, actual);
435        }
436    }
437
438    /// Checks if re-optimization should be triggered.
439    #[must_use]
440    pub fn should_reoptimize(&self) -> bool {
441        if let Ok(mut ctx) = self.inner.write() {
442            ctx.should_reoptimize()
443        } else {
444            false
445        }
446    }
447
448    /// Gets a read-only snapshot of the context.
449    #[must_use]
450    pub fn snapshot(&self) -> Option<AdaptiveContext> {
451        self.inner.read().ok().map(|guard| AdaptiveContext {
452            checkpoints: guard.checkpoints.clone(),
453            reoptimization_threshold: guard.reoptimization_threshold,
454            min_rows: guard.min_rows,
455            reoptimization_triggered: guard.reoptimization_triggered,
456            trigger_operator: guard.trigger_operator.clone(),
457        })
458    }
459}
460
461impl Default for SharedAdaptiveContext {
462    fn default() -> Self {
463        Self::new()
464    }
465}
466
467/// A wrapper operator that tracks cardinality and reports to an adaptive context.
468///
469/// This wraps any `PushOperator` and counts the rows flowing through it,
470/// reporting the count to the adaptive context.
471pub struct CardinalityTrackingOperator {
472    /// The wrapped operator.
473    inner: Box<dyn PushOperator>,
474    /// Operator identifier for reporting.
475    operator_id: String,
476    /// Row counter.
477    row_count: u64,
478    /// Shared adaptive context for reporting.
479    context: SharedAdaptiveContext,
480}
481
482impl CardinalityTrackingOperator {
483    /// Creates a new tracking wrapper.
484    pub fn new(
485        inner: Box<dyn PushOperator>,
486        operator_id: &str,
487        context: SharedAdaptiveContext,
488    ) -> Self {
489        Self {
490            inner,
491            operator_id: operator_id.to_string(),
492            row_count: 0,
493            context,
494        }
495    }
496
497    /// Returns the current row count.
498    #[must_use]
499    pub fn current_count(&self) -> u64 {
500        self.row_count
501    }
502}
503
504impl PushOperator for CardinalityTrackingOperator {
505    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
506        // Track input rows
507        self.row_count += chunk.len() as u64;
508
509        // Push through to inner operator
510        self.inner.push(chunk, sink)
511    }
512
513    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
514        // Report final cardinality to context
515        self.context
516            .record_actual(&self.operator_id, self.row_count);
517
518        // Finalize inner operator
519        self.inner.finalize(sink)
520    }
521
522    fn preferred_chunk_size(&self) -> ChunkSizeHint {
523        self.inner.preferred_chunk_size()
524    }
525
526    fn name(&self) -> &'static str {
527        // Return the inner operator's name
528        self.inner.name()
529    }
530}
531
532/// A sink that tracks cardinality of data flowing through it.
533pub struct CardinalityTrackingSink {
534    /// The wrapped sink.
535    inner: Box<dyn Sink>,
536    /// Operator identifier for reporting.
537    operator_id: String,
538    /// Row counter.
539    row_count: u64,
540    /// Shared adaptive context for reporting.
541    context: SharedAdaptiveContext,
542}
543
544impl CardinalityTrackingSink {
545    /// Creates a new tracking sink wrapper.
546    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
547        Self {
548            inner,
549            operator_id: operator_id.to_string(),
550            row_count: 0,
551            context,
552        }
553    }
554
555    /// Returns the current row count.
556    #[must_use]
557    pub fn current_count(&self) -> u64 {
558        self.row_count
559    }
560}
561
562impl Sink for CardinalityTrackingSink {
563    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
564        self.row_count += chunk.len() as u64;
565        self.inner.consume(chunk)
566    }
567
568    fn finalize(&mut self) -> Result<(), OperatorError> {
569        // Report final cardinality
570        self.context
571            .record_actual(&self.operator_id, self.row_count);
572        self.inner.finalize()
573    }
574
575    fn name(&self) -> &'static str {
576        self.inner.name()
577    }
578}
579
/// Decision about whether to re-optimize a query.
///
/// Returned by `evaluate_reoptimization`.
#[derive(Debug, Clone, PartialEq)]
pub enum ReoptimizationDecision {
    /// Continue with current plan.
    Continue,
    /// Re-optimize the remaining plan.
    Reoptimize {
        /// The operator that triggered re-optimization.
        trigger: String,
        /// Correction factors to apply to remaining estimates, keyed by
        /// operator id. `evaluate_reoptimization` fills this with the
        /// deviation ratio of every recorded checkpoint.
        corrections: HashMap<String, f64>,
    },
    /// Abort the query (catastrophic misestimate).
    Abort {
        /// Human-readable explanation of why the query is aborted.
        reason: String,
    },
}
595
596/// Evaluates whether re-optimization should occur based on context.
597#[must_use]
598pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
599    let summary = ctx.summary();
600
601    // If no significant deviations, continue
602    if !summary.reoptimization_triggered {
603        return ReoptimizationDecision::Continue;
604    }
605
606    // If deviation is catastrophic (>100x), consider aborting
607    if summary.max_deviation_ratio > 100.0 {
608        return ReoptimizationDecision::Abort {
609            reason: format!(
610                "Catastrophic cardinality misestimate: {}x deviation",
611                summary.max_deviation_ratio
612            ),
613        };
614    }
615
616    // Build correction factors
617    let corrections: HashMap<String, f64> = ctx
618        .all_checkpoints()
619        .iter()
620        .filter(|(_, cp)| cp.recorded)
621        .map(|(id, cp)| (id.clone(), cp.deviation_ratio()))
622        .collect();
623
624    ReoptimizationDecision::Reoptimize {
625        trigger: summary.trigger_operator.unwrap_or_default(),
626        corrections,
627    }
628}
629
/// Callback for creating a new plan based on observed cardinalities.
///
/// This is called when the adaptive pipeline detects significant deviation
/// and decides to re-optimize. The callback receives the adaptive context
/// with recorded actuals and should return a new set of operators.
///
/// The alias is a boxed `Fn` taking `&AdaptiveContext` and returning a
/// `Vec<Box<dyn PushOperator>>`; the `Send + Sync` bounds allow the factory
/// to be shared across threads.
// NOTE(review): no call site for this alias is visible in this part of the
// file; the calling convention above is taken from the original comment.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
636
637/// Configuration for adaptive pipeline execution.
638#[derive(Debug, Clone)]
639pub struct AdaptivePipelineConfig {
640    /// Number of rows to process before checking for re-optimization.
641    pub check_interval: u64,
642    /// Threshold for deviation that triggers re-optimization.
643    pub reoptimization_threshold: f64,
644    /// Minimum rows before considering re-optimization.
645    pub min_rows_for_reoptimization: u64,
646    /// Maximum number of re-optimizations allowed per query.
647    pub max_reoptimizations: usize,
648}
649
650impl Default for AdaptivePipelineConfig {
651    fn default() -> Self {
652        Self {
653            check_interval: 10_000,
654            reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
655            min_rows_for_reoptimization: MIN_ROWS_FOR_REOPTIMIZATION,
656            max_reoptimizations: 3,
657        }
658    }
659}
660
661impl AdaptivePipelineConfig {
662    /// Creates a new configuration with custom thresholds.
663    #[must_use]
664    pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
665        Self {
666            check_interval,
667            reoptimization_threshold: threshold,
668            min_rows_for_reoptimization: min_rows,
669            max_reoptimizations: 3,
670        }
671    }
672
673    /// Sets the maximum number of re-optimizations allowed.
674    #[must_use]
675    pub fn with_max_reoptimizations(mut self, max: usize) -> Self {
676        self.max_reoptimizations = max;
677        self
678    }
679}
680
/// Result of executing an adaptive pipeline.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows processed.
    pub total_rows: u64,
    /// Number of re-optimizations performed.
    pub reoptimization_count: usize,
    /// Operators that triggered re-optimization.
    pub triggers: Vec<String>,
    /// Summary statistics of the final adaptive context (an
    /// `AdaptiveSummary`, not the context itself).
    pub final_context: AdaptiveSummary,
}
693
/// A point in the pipeline where the plan may be switched.
///
/// Tracks the estimated cardinality at that point together with the rows
/// seen so far, and remembers whether it has already caused a
/// re-optimization.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    /// Identifier of this checkpoint.
    pub id: String,
    /// Index of the pipeline operator this checkpoint follows.
    pub after_operator: usize,
    /// Cardinality the optimizer expected here.
    pub estimated_cardinality: f64,
    /// Rows observed so far.
    pub actual_rows: u64,
    /// `true` once this checkpoint has triggered re-optimization.
    pub triggered: bool,
}

impl AdaptiveCheckpoint {
    /// Builds a checkpoint with no rows seen and not yet triggered.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        let id = id.to_owned();
        Self {
            id,
            after_operator,
            estimated_cardinality: estimated,
            actual_rows: 0,
            triggered: false,
        }
    }

    /// Adds `count` to the rows seen at this checkpoint.
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }

    /// Reports whether the observed rows deviate from the estimate by more
    /// than `threshold` in either direction.
    ///
    /// Always `false` while fewer than `min_rows` rows have been seen. With a
    /// zero or negative estimate, any non-zero row count counts as exceeding.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        if self.actual_rows < min_rows {
            false
        } else if self.estimated_cardinality <= 0.0 {
            self.actual_rows > 0
        } else {
            let ratio = self.actual_rows as f64 / self.estimated_cardinality;
            let lower_bound = 1.0 / threshold;
            ratio > threshold || ratio < lower_bound
        }
    }
}
743
/// Event emitted during adaptive execution.
///
/// Events are delivered to an `AdaptiveEventCallback`.
#[derive(Debug, Clone)]
pub enum AdaptiveEvent {
    /// A checkpoint was reached.
    ///
    /// Emitted by `AdaptiveExecutionConfig::record_checkpoint`.
    CheckpointReached {
        /// Identifier of the checkpoint.
        id: String,
        /// Row count recorded for the checkpoint.
        actual_rows: u64,
        /// Estimate stored for the checkpoint (0.0 when none is known).
        estimated: f64,
    },
    /// Re-optimization was triggered.
    ///
    /// Emitted by `AdaptiveExecutionConfig::mark_triggered`.
    ReoptimizationTriggered {
        /// Identifier of the checkpoint that triggered.
        checkpoint_id: String,
        /// Deviation ratio (actual / estimated) of that checkpoint.
        deviation_ratio: f64,
    },
    /// Plan was switched.
    // NOTE(review): no emitter for this variant is visible in this part of
    // the file.
    PlanSwitched {
        /// Number of operators in the plan being replaced.
        old_operator_count: usize,
        /// Number of operators in the replacement plan.
        new_operator_count: usize,
    },
    /// Execution completed.
    // NOTE(review): no emitter for this variant is visible in this part of
    // the file.
    ExecutionCompleted {
        /// Total number of rows processed.
        total_rows: u64,
    },
}
766
/// Callback for observing adaptive execution events.
///
/// A boxed `Fn` that receives each `AdaptiveEvent` by value and returns
/// nothing; the `Send + Sync` bounds allow it to be shared across threads.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
769
770/// Builder for creating adaptive pipelines.
771pub struct AdaptivePipelineBuilder {
772    checkpoints: Vec<AdaptiveCheckpoint>,
773    config: AdaptivePipelineConfig,
774    context: AdaptiveContext,
775    event_callback: Option<AdaptiveEventCallback>,
776}
777
778impl AdaptivePipelineBuilder {
779    /// Creates a new builder with default configuration.
780    #[must_use]
781    pub fn new() -> Self {
782        Self {
783            checkpoints: Vec::new(),
784            config: AdaptivePipelineConfig::default(),
785            context: AdaptiveContext::new(),
786            event_callback: None,
787        }
788    }
789
790    /// Sets the configuration.
791    #[must_use]
792    pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
793        self.config = config;
794        self
795    }
796
797    /// Adds a checkpoint at the specified operator index.
798    #[must_use]
799    pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
800        self.checkpoints
801            .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
802        self.context.set_estimate(id, estimated);
803        self
804    }
805
806    /// Sets an event callback for observing execution.
807    #[must_use]
808    pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
809        self.event_callback = Some(callback);
810        self
811    }
812
813    /// Sets estimates from a pre-configured context.
814    #[must_use]
815    pub fn with_context(mut self, context: AdaptiveContext) -> Self {
816        self.context = context;
817        self
818    }
819
820    /// Builds the configuration for adaptive execution.
821    #[must_use]
822    pub fn build(self) -> AdaptiveExecutionConfig {
823        AdaptiveExecutionConfig {
824            checkpoints: self.checkpoints,
825            config: self.config,
826            context: self.context,
827            event_callback: self.event_callback,
828        }
829    }
830}
831
832impl Default for AdaptivePipelineBuilder {
833    fn default() -> Self {
834        Self::new()
835    }
836}
837
838/// Configuration for adaptive execution, built by `AdaptivePipelineBuilder`.
839pub struct AdaptiveExecutionConfig {
840    /// Checkpoints for monitoring cardinality.
841    pub checkpoints: Vec<AdaptiveCheckpoint>,
842    /// Execution configuration.
843    pub config: AdaptivePipelineConfig,
844    /// Adaptive context with estimates.
845    pub context: AdaptiveContext,
846    /// Optional event callback.
847    pub event_callback: Option<AdaptiveEventCallback>,
848}
849
850impl AdaptiveExecutionConfig {
851    /// Returns a summary of the adaptive execution after it completes.
852    #[must_use]
853    pub fn summary(&self) -> AdaptiveSummary {
854        self.context.summary()
855    }
856
857    /// Records actual cardinality for a checkpoint.
858    pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
859        self.context.record_actual(checkpoint_id, actual);
860
861        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
862            cp.actual_rows = actual;
863        }
864
865        if let Some(ref callback) = self.event_callback {
866            let estimated = self
867                .context
868                .get_checkpoint(checkpoint_id)
869                .map(|cp| cp.estimated)
870                .unwrap_or(0.0);
871            callback(AdaptiveEvent::CheckpointReached {
872                id: checkpoint_id.to_string(),
873                actual_rows: actual,
874                estimated,
875            });
876        }
877    }
878
879    /// Checks if any checkpoint exceeds the deviation threshold.
880    #[must_use]
881    pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
882        self.checkpoints.iter().find(|cp| {
883            !cp.triggered
884                && cp.exceeds_threshold(
885                    self.config.reoptimization_threshold,
886                    self.config.min_rows_for_reoptimization,
887                )
888        })
889    }
890
891    /// Marks a checkpoint as having triggered re-optimization.
892    pub fn mark_triggered(&mut self, checkpoint_id: &str) {
893        if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
894            cp.triggered = true;
895        }
896
897        if let Some(ref callback) = self.event_callback {
898            let ratio = self
899                .context
900                .get_checkpoint(checkpoint_id)
901                .filter(|cp| cp.recorded)
902                .map(|cp| cp.deviation_ratio())
903                .unwrap_or(1.0);
904            callback(AdaptiveEvent::ReoptimizationTriggered {
905                checkpoint_id: checkpoint_id.to_string(),
906                deviation_ratio: ratio,
907            });
908        }
909    }
910}
911
912// ============= Pull-Based Operator Tracking =============
913
914use super::operators::{Operator, OperatorResult}; // OperatorError imported above
915
916/// A wrapper that tracks cardinality for pull-based operators.
917///
918/// This wraps any `Operator` and counts the rows flowing through it,
919/// reporting the count to the adaptive context. Use this for integrating
920/// adaptive execution with the standard pull-based executor.
921pub struct CardinalityTrackingWrapper {
922    /// The wrapped operator.
923    inner: Box<dyn Operator>,
924    /// Operator identifier for reporting.
925    operator_id: String,
926    /// Row counter.
927    row_count: u64,
928    /// Shared adaptive context for reporting.
929    context: SharedAdaptiveContext,
930    /// Whether finalization has been reported.
931    finalized: bool,
932}
933
934impl CardinalityTrackingWrapper {
935    /// Creates a new tracking wrapper for a pull-based operator.
936    pub fn new(
937        inner: Box<dyn Operator>,
938        operator_id: &str,
939        context: SharedAdaptiveContext,
940    ) -> Self {
941        Self {
942            inner,
943            operator_id: operator_id.to_string(),
944            row_count: 0,
945            context,
946            finalized: false,
947        }
948    }
949
950    /// Returns the current row count.
951    #[must_use]
952    pub fn current_count(&self) -> u64 {
953        self.row_count
954    }
955
956    /// Reports the final cardinality to the context.
957    fn report_final(&mut self) {
958        if !self.finalized {
959            self.context
960                .record_actual(&self.operator_id, self.row_count);
961            self.finalized = true;
962        }
963    }
964}
965
966impl Operator for CardinalityTrackingWrapper {
967    fn next(&mut self) -> OperatorResult {
968        match self.inner.next() {
969            Ok(Some(chunk)) => {
970                // Track rows
971                self.row_count += chunk.row_count() as u64;
972                Ok(Some(chunk))
973            }
974            Ok(None) => {
975                // Stream exhausted - report final cardinality
976                self.report_final();
977                Ok(None)
978            }
979            Err(e) => {
980                // Report on error too
981                self.report_final();
982                Err(e)
983            }
984        }
985    }
986
987    fn reset(&mut self) {
988        self.row_count = 0;
989        self.finalized = false;
990        self.inner.reset();
991    }
992
993    fn name(&self) -> &'static str {
994        self.inner.name()
995    }
996}
997
998impl Drop for CardinalityTrackingWrapper {
999    fn drop(&mut self) {
1000        // Ensure we report even if dropped early
1001        self.report_final();
1002    }
1003}
1004
1005// ============= Adaptive Pipeline Execution =============
1006
1007use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; // Sink imported above
1008use super::sink::CollectorSink;
1009use super::source::OperatorSource;
1010
1011/// High-level adaptive pipeline that executes a pull-based operator with
1012/// cardinality tracking using push-based infrastructure.
1013///
1014/// This bridges the pull-based planner output with push-based execution:
1015/// 1. Wraps the pull operator as an `OperatorSource`
1016/// 2. Uses `CardinalityTrackingSink` to track output cardinality
1017/// 3. Provides adaptive feedback through `AdaptiveContext`
1018///
1019/// # Example
1020///
1021/// ```ignore
1022/// use grafeo_core::execution::adaptive::AdaptivePipelineExecutor;
1023///
1024/// let executor = AdaptivePipelineExecutor::new(operator, adaptive_context);
1025/// let (chunks, summary) = executor.execute()?;
1026/// ```
1027pub struct AdaptivePipelineExecutor {
1028    source: OperatorSource,
1029    context: SharedAdaptiveContext,
1030    config: AdaptivePipelineConfig,
1031}
1032
1033impl AdaptivePipelineExecutor {
1034    /// Creates a new adaptive pipeline executor.
1035    ///
1036    /// # Arguments
1037    ///
1038    /// * `operator` - The pull-based operator to execute
1039    /// * `context` - Adaptive context with cardinality estimates
1040    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
1041        Self {
1042            source: OperatorSource::new(operator),
1043            context: SharedAdaptiveContext::from_context(context),
1044            config: AdaptivePipelineConfig::default(),
1045        }
1046    }
1047
1048    /// Creates an executor with custom configuration.
1049    pub fn with_config(
1050        operator: Box<dyn Operator>,
1051        context: AdaptiveContext,
1052        config: AdaptivePipelineConfig,
1053    ) -> Self {
1054        Self {
1055            source: OperatorSource::new(operator),
1056            context: SharedAdaptiveContext::from_context(context),
1057            config,
1058        }
1059    }
1060
1061    /// Executes the pipeline and returns collected chunks with adaptive summary.
1062    ///
1063    /// # Returns
1064    ///
1065    /// A tuple of (collected DataChunks, adaptive execution summary).
1066    ///
1067    /// # Errors
1068    ///
1069    /// Returns an error if execution fails.
1070    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1071        let mut sink = CardinalityTrackingSink::new(
1072            Box::new(CollectorSink::new()),
1073            "output",
1074            self.context.clone(),
1075        );
1076
1077        let chunk_size = DEFAULT_CHUNK_SIZE;
1078        let mut total_rows: u64 = 0;
1079        let check_interval = self.config.check_interval;
1080
1081        // Process all chunks from source
1082        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1083            let chunk_rows = chunk.len() as u64;
1084            total_rows += chunk_rows;
1085
1086            // Push to tracking sink
1087            let continue_exec = sink.consume(chunk)?;
1088            if !continue_exec {
1089                break;
1090            }
1091
1092            // Periodically check for reoptimization need
1093            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
1094                if self.context.should_reoptimize() {
1095                    // Log or emit event that reoptimization would be triggered
1096                    // Full re-planning would happen at a higher level
1097                }
1098            }
1099        }
1100
1101        // Finalize sink
1102        sink.finalize()?;
1103
1104        // Extract results from the inner sink
1105        let summary = self
1106            .context
1107            .snapshot()
1108            .map(|ctx| ctx.summary())
1109            .unwrap_or_default();
1110
1111        // Get collected chunks from the inner CollectorSink
1112        // Note: We need to extract chunks from the wrapped sink
1113        // For now, we'll return the summary and the caller can collect separately
1114        Ok((Vec::new(), summary))
1115    }
1116
1117    /// Executes and collects all results into DataChunks.
1118    ///
1119    /// This is a simpler interface that handles chunk collection internally.
1120    pub fn execute_collecting(
1121        mut self,
1122    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
1123        let mut chunks = Vec::new();
1124        let chunk_size = DEFAULT_CHUNK_SIZE;
1125        let mut total_rows: u64 = 0;
1126        let check_interval = self.config.check_interval;
1127
1128        // Process all chunks from source
1129        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
1130            let chunk_rows = chunk.len() as u64;
1131            total_rows += chunk_rows;
1132
1133            // Record cardinality
1134            self.context.record_actual("root", total_rows);
1135
1136            // Collect the chunk
1137            if !chunk.is_empty() {
1138                chunks.push(chunk);
1139            }
1140
1141            // Periodically check for reoptimization
1142            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
1143                let _ = self.context.should_reoptimize();
1144            }
1145        }
1146
1147        let summary = self
1148            .context
1149            .snapshot()
1150            .map(|ctx| ctx.summary())
1151            .unwrap_or_default();
1152
1153        Ok((chunks, summary))
1154    }
1155
1156    /// Returns a reference to the shared context for external monitoring.
1157    pub fn context(&self) -> &SharedAdaptiveContext {
1158        &self.context
1159    }
1160}
1161
1162/// Convenience function to execute a pull-based operator with adaptive tracking.
1163///
1164/// This is the recommended entry point for adaptive execution.
1165///
1166/// # Arguments
1167///
1168/// * `operator` - The pull-based operator to execute
1169/// * `context` - Adaptive context with cardinality estimates (or None for default)
1170/// * `config` - Optional configuration (uses defaults if None)
1171///
1172/// # Returns
1173///
1174/// A tuple of (collected DataChunks, adaptive summary if tracking was enabled).
1175pub fn execute_adaptive(
1176    operator: Box<dyn Operator>,
1177    context: Option<AdaptiveContext>,
1178    config: Option<AdaptivePipelineConfig>,
1179) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
1180    let ctx = context.unwrap_or_default();
1181    let cfg = config.unwrap_or_default();
1182
1183    let executor = AdaptivePipelineExecutor::with_config(operator, ctx, cfg);
1184    let (chunks, summary) = executor.execute_collecting()?;
1185
1186    Ok((chunks, Some(summary)))
1187}
1188
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that two floating-point expressions differ by less than 0.001.
    macro_rules! assert_close {
        ($actual:expr, $expected:expr) => {
            assert!(($actual - $expected).abs() < 0.001)
        };
    }

    // ============= Cardinality Checkpoint Tests =============

    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(200);

        // Twice as many rows as estimated.
        assert_close!(checkpoint.deviation_ratio(), 2.0);
    }

    #[test]
    fn test_checkpoint_underestimate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(500);

        // The estimate was five times too low.
        assert_close!(checkpoint.deviation_ratio(), 5.0);
        assert!(checkpoint.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_overestimate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(20);

        // The estimate was too high: actual is 0.2x, which is below 1/3.
        assert_close!(checkpoint.deviation_ratio(), 0.2);
        assert!(checkpoint.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_accurate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(110);

        // 1.1x stays inside the 3.0x threshold.
        assert_close!(checkpoint.deviation_ratio(), 1.1);
        assert!(!checkpoint.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 0.0);
        checkpoint.record(100);

        // Rows seen against a zero estimate yield an infinite ratio.
        assert!(checkpoint.deviation_ratio().is_infinite());
    }

    #[test]
    fn test_checkpoint_zero_both() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 0.0);
        checkpoint.record(0);

        // Nothing estimated and nothing seen counts as a perfect match.
        assert_close!(checkpoint.deviation_ratio(), 1.0);
    }

    // ============= Feedback Tests =============

    #[test]
    fn test_feedback_collection() {
        let mut collected = CardinalityFeedback::new();
        collected.record("scan_1", 1000);
        collected.record("filter_1", 100);

        assert_eq!(collected.get("scan_1"), Some(1000));
        assert_eq!(collected.get("filter_1"), Some(100));
        assert_eq!(collected.get("unknown"), None);
    }

    #[test]
    fn test_feedback_running_counter() {
        let mut collected = CardinalityFeedback::new();
        collected.init_counter("op_1");

        collected.add_rows("op_1", 100);
        collected.add_rows("op_1", 200);
        collected.add_rows("op_1", 50);

        assert_eq!(collected.get_running("op_1"), Some(350));

        // Finalizing publishes the running total as the recorded value.
        collected.finalize_counter("op_1");
        assert_eq!(collected.get("op_1"), Some(350));
    }

    // ============= Adaptive Context Tests =============

    #[test]
    fn test_adaptive_context_basic() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.set_estimate("filter", 100.0);

        context.record_actual("scan", 1000);
        context.record_actual("filter", 500); // estimate was 5x too low

        let filter = context.get_checkpoint("filter").unwrap();
        assert_close!(filter.deviation_ratio(), 5.0);
    }

    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 100);
        context.set_estimate("scan", 10000.0);
        context.set_estimate("filter", 1000.0);

        context.record_actual("scan", 10000);
        context.record_actual("filter", 5000); // estimate was 5x too low

        assert!(context.should_reoptimize());
        assert_eq!(context.trigger_operator(), Some("filter"));

        // Once triggered, a repeated query must report false.
        assert!(!context.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_min_rows() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 1000);
        context.set_estimate("filter", 100.0);
        context.record_actual("filter", 500); // 5x off, yet only 500 rows

        // Too few rows observed to justify a trigger.
        assert!(!context.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.record_actual("scan", 1100); // near the estimate

        assert!(!context.has_significant_deviation());
        assert!(!context.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("filter", 100.0);
        context.record_actual("filter", 300);

        assert_close!(context.correction_factor("filter"), 3.0);
        assert_close!(context.correction_factor("unknown"), 1.0);
    }

    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.set_estimate("filter", 100.0);

        let mut collected = CardinalityFeedback::new();
        collected.record("scan", 1000);
        collected.record("filter", 500);

        context.apply_feedback(&collected);

        let scan = context.get_checkpoint("scan").unwrap();
        assert_eq!(scan.actual, 1000);
        let filter = context.get_checkpoint("filter").unwrap();
        assert_eq!(filter.actual, 500);
    }

    #[test]
    fn test_adaptive_summary() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 0);
        context.set_estimate("op1", 100.0);
        context.set_estimate("op2", 200.0);
        context.set_estimate("op3", 300.0);

        context.record_actual("op1", 100); // spot on
        context.record_actual("op2", 600); // 3x

        // Evaluate the trigger so the summary reflects it.
        let _ = context.should_reoptimize();

        let report = context.summary();
        assert_eq!(report.checkpoint_count, 3);
        assert_eq!(report.recorded_count, 2);
        assert_eq!(report.deviation_count, 1);
        assert!(report.reoptimization_triggered);
    }

    #[test]
    fn test_adaptive_context_reset() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.record_actual("scan", 5000);
        let _ = context.should_reoptimize(); // fire the trigger

        assert!(context.reoptimization_triggered);

        context.reset();

        assert!(!context.reoptimization_triggered);
        assert_eq!(context.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!context.get_checkpoint("scan").unwrap().recorded);
        // Resetting must keep the estimate.
        assert_close!(context.get_checkpoint("scan").unwrap().estimated, 1000.0);
    }

    #[test]
    fn test_shared_context() {
        let shared = SharedAdaptiveContext::new();

        shared.record_actual("op1", 1000);

        let view = shared.snapshot().unwrap();
        assert_eq!(view.get_checkpoint("op1").unwrap().actual, 1000);
    }

    // ============= Reoptimization Decision Tests =============

    #[test]
    fn test_reoptimization_decision_continue() {
        let mut context = AdaptiveContext::new();
        context.set_estimate("scan", 1000.0);
        context.record_actual("scan", 1100);

        assert_eq!(
            evaluate_reoptimization(&context),
            ReoptimizationDecision::Continue
        );
    }

    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 0);
        context.set_estimate("filter", 100.0);
        context.record_actual("filter", 500);
        let _ = context.should_reoptimize(); // fire the trigger

        let outcome = evaluate_reoptimization(&context);

        let ReoptimizationDecision::Reoptimize {
            trigger,
            corrections,
        } = outcome
        else {
            panic!("Expected Reoptimize decision");
        };

        assert_eq!(trigger, "filter");
        assert_close!(corrections.get("filter").copied().unwrap_or(0.0), 5.0);
    }

    #[test]
    fn test_reoptimization_decision_abort() {
        let mut context = AdaptiveContext::with_thresholds(2.0, 0);
        context.set_estimate("filter", 1.0);
        context.record_actual("filter", 1000); // off by a factor of 1000
        let _ = context.should_reoptimize();

        let outcome = evaluate_reoptimization(&context);

        let ReoptimizationDecision::Abort { reason } = outcome else {
            panic!("Expected Abort decision");
        };

        assert!(reason.contains("Catastrophic"));
    }

    #[test]
    fn test_absolute_deviation() {
        let mut checkpoint = CardinalityCheckpoint::new("test", 100.0);
        checkpoint.record(150);

        assert_close!(checkpoint.absolute_deviation(), 50.0);
    }

    // ============= Plan Switching Tests =============

    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut checkpoint = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(checkpoint.actual_rows, 0);
        assert!(!checkpoint.triggered);

        // Row counts accumulate across calls.
        checkpoint.record_rows(50);
        assert_eq!(checkpoint.actual_rows, 50);

        checkpoint.record_rows(100);
        assert_eq!(checkpoint.actual_rows, 150);
    }

    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut under = AdaptiveCheckpoint::new("filter", 0, 100.0);

        // Fewer rows than the minimum.
        under.record_rows(50);
        assert!(!under.exceeds_threshold(2.0, 100));

        // Minimum reached, ratio 100/100 = 1.0x stays inside the threshold.
        under.record_rows(50);
        assert!(!under.exceeds_threshold(2.0, 100));

        // Estimate too low: 500/100 = 5.0x.
        under.actual_rows = 0;
        under.record_rows(500);
        assert!(under.exceeds_threshold(2.0, 100));

        // Estimate too high: 200/1000 = 0.2x.
        let mut over = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        over.record_rows(200);
        assert!(over.exceeds_threshold(2.0, 100));
    }

    #[test]
    fn test_adaptive_pipeline_config_default() {
        let defaults = AdaptivePipelineConfig::default();

        assert_eq!(defaults.check_interval, 10_000);
        assert_close!(
            defaults.reoptimization_threshold,
            DEFAULT_REOPTIMIZATION_THRESHOLD
        );
        assert_eq!(
            defaults.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(defaults.max_reoptimizations, 3);
    }

    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let custom = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);

        assert_eq!(custom.check_interval, 5000);
        assert_close!(custom.reoptimization_threshold, 2.0);
        assert_eq!(custom.min_rows_for_reoptimization, 500);
        assert_eq!(custom.max_reoptimizations, 5);
    }

    #[test]
    fn test_adaptive_pipeline_builder() {
        let built = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();

        assert_eq!(built.checkpoints.len(), 2);

        assert_eq!(built.checkpoints[0].id, "scan");
        assert_close!(built.checkpoints[0].estimated_cardinality, 10000.0);

        assert_eq!(built.checkpoints[1].id, "filter");
        assert_close!(built.checkpoints[1].estimated_cardinality, 1000.0);
    }

    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut exec = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        exec.record_checkpoint("filter", 500);

        // The context side must reflect the recording.
        let in_context = exec.context.get_checkpoint("filter").unwrap();
        assert_eq!(in_context.actual, 500);
        assert!(in_context.recorded);

        // So must the adaptive checkpoint list.
        let in_list = exec
            .checkpoints
            .iter()
            .find(|candidate| candidate.id == "filter")
            .unwrap();
        assert_eq!(in_list.actual_rows, 500);
    }

    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut exec = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();

        // Nothing recorded yet, so nothing can trigger.
        assert!(exec.should_reoptimize().is_none());

        // 1.5x stays inside the 2.0x threshold.
        exec.record_checkpoint("filter", 150);
        assert!(exec.should_reoptimize().is_none());

        // Start over and record a value beyond the threshold.
        exec.checkpoints[0].actual_rows = 0;
        exec.record_checkpoint("filter", 500);
        exec.checkpoints[0].actual_rows = 500;

        let fired = exec.should_reoptimize();
        assert!(fired.is_some());
        assert_eq!(fired.unwrap().id, "filter");
    }

    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut exec = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(!exec.checkpoints[0].triggered);

        exec.mark_triggered("filter");

        assert!(exec.checkpoints[0].triggered);
    }

    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;

        let seen_events = Arc::new(AtomicUsize::new(0));
        let seen_in_callback = Arc::clone(&seen_events);

        let mut exec = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                seen_in_callback.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        exec.record_checkpoint("filter", 500);

        // Recording emits a single CheckpointReached event.
        assert_eq!(seen_events.load(Ordering::Relaxed), 1);

        exec.mark_triggered("filter");

        // Marking adds a single ReoptimizationTriggered event.
        assert_eq!(seen_events.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut checkpoint = AdaptiveCheckpoint::new("test", 0, 0.0);
        checkpoint.record_rows(100);

        // Any rows against a zero estimate must count as exceeding.
        assert!(checkpoint.exceeds_threshold(2.0, 50));
    }
}