1use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, RwLock};
51
52use super::chunk::DataChunk;
53use super::operators::OperatorError;
54use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
55
/// Deviation ratio (actual vs. estimated, in either direction) beyond which a
/// cardinality misestimate is treated as significant.
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;

/// Minimum number of observed rows before a deviation may trigger
/// re-optimization; avoids reacting to noise on tiny inputs.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
63
/// Compares an optimizer cardinality estimate against the row count actually
/// observed at one point in a query plan.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    /// Identifier of the plan operator this checkpoint watches.
    pub operator_id: String,
    /// Row count the optimizer predicted for this operator.
    pub estimated: f64,
    /// Row count observed at runtime (meaningful once `recorded` is true).
    pub actual: u64,
    /// Whether an actual count has been recorded yet.
    pub recorded: bool,
}

impl CardinalityCheckpoint {
    /// Creates a checkpoint for `operator_id` holding the optimizer's
    /// `estimated` row count; no actual count is recorded yet.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            estimated,
            actual: 0,
            recorded: false,
        }
    }

    /// Stores the observed row count and marks the checkpoint as recorded.
    pub fn record(&mut self, actual: u64) {
        self.actual = actual;
        self.recorded = true;
    }

    /// Ratio of observed rows to the estimate (> 1.0 means the optimizer
    /// underestimated). A non-positive estimate yields 1.0 when nothing was
    /// observed and +inf otherwise.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        if self.estimated <= 0.0 {
            if self.actual == 0 { 1.0 } else { f64::INFINITY }
        } else {
            self.actual as f64 / self.estimated
        }
    }

    /// Absolute difference between observed and estimated row counts.
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        (self.actual as f64 - self.estimated).abs()
    }

    /// True when a recorded actual deviates from the estimate by more than
    /// `threshold` in either direction (over- or underestimate).
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        if !self.recorded {
            return false;
        }
        let ratio = self.deviation_ratio();
        ratio > threshold || ratio < 1.0 / threshold
    }
}
123
/// Collects observed row counts from executed operators so they can be fed
/// back into the optimizer's cardinality model.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    /// Finalized observed row count per operator id.
    actuals: HashMap<String, u64>,
    /// Live, thread-safe counters for operators still executing.
    running_counts: HashMap<String, AtomicU64>,
}

impl CardinalityFeedback {
    /// Creates an empty feedback store.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the final row count for `operator_id`, replacing any
    /// previously recorded value.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_string(), count);
    }

    /// Adds `count` rows to the running counter for `operator_id`. Silently
    /// does nothing if `init_counter` was never called for that id.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        if let Some(counter) = self.running_counts.get(operator_id) {
            counter.fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Installs a zeroed running counter for `operator_id`.
    pub fn init_counter(&mut self, operator_id: &str) {
        self.running_counts
            .insert(operator_id.to_string(), AtomicU64::new(0));
    }

    /// Copies the current value of `operator_id`'s running counter into the
    /// finalized actuals; the counter itself is left in place.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        let total = self
            .running_counts
            .get(operator_id)
            .map(|counter| counter.load(Ordering::Relaxed));
        if let Some(total) = total {
            self.actuals.insert(operator_id.to_string(), total);
        }
    }

    /// Finalized row count for `operator_id`, if one was recorded.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        self.actuals.get(operator_id).copied()
    }

    /// Current value of the running counter for `operator_id`, if any.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        self.running_counts
            .get(operator_id)
            .map(|counter| counter.load(Ordering::Relaxed))
    }

    /// All finalized counts, keyed by operator id.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
192
193#[derive(Debug)]
198pub struct AdaptiveContext {
199 checkpoints: HashMap<String, CardinalityCheckpoint>,
201 reoptimization_threshold: f64,
203 min_rows: u64,
205 reoptimization_triggered: bool,
207 trigger_operator: Option<String>,
209}
210
211impl AdaptiveContext {
212 #[must_use]
214 pub fn new() -> Self {
215 Self {
216 checkpoints: HashMap::new(),
217 reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
218 min_rows: MIN_ROWS_FOR_REOPTIMIZATION,
219 reoptimization_triggered: false,
220 trigger_operator: None,
221 }
222 }
223
224 #[must_use]
226 pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
227 Self {
228 checkpoints: HashMap::new(),
229 reoptimization_threshold: threshold,
230 min_rows,
231 reoptimization_triggered: false,
232 trigger_operator: None,
233 }
234 }
235
236 pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
238 self.checkpoints.insert(
239 operator_id.to_string(),
240 CardinalityCheckpoint::new(operator_id, estimate),
241 );
242 }
243
244 pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
246 if let Some(checkpoint) = self.checkpoints.get_mut(operator_id) {
247 checkpoint.record(actual);
248 } else {
249 let mut checkpoint = CardinalityCheckpoint::new(operator_id, 0.0);
251 checkpoint.record(actual);
252 self.checkpoints.insert(operator_id.to_string(), checkpoint);
253 }
254 }
255
256 pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
258 for (op_id, &actual) in feedback.all_actuals() {
259 self.record_actual(op_id, actual);
260 }
261 }
262
263 #[must_use]
265 pub fn has_significant_deviation(&self) -> bool {
266 self.checkpoints
267 .values()
268 .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
269 }
270
271 #[must_use]
278 pub fn should_reoptimize(&mut self) -> bool {
279 if self.reoptimization_triggered {
280 return false;
281 }
282
283 for (op_id, checkpoint) in &self.checkpoints {
284 if checkpoint.actual < self.min_rows {
285 continue;
286 }
287
288 if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
289 self.reoptimization_triggered = true;
290 self.trigger_operator = Some(op_id.clone());
291 return true;
292 }
293 }
294
295 false
296 }
297
298 #[must_use]
300 pub fn trigger_operator(&self) -> Option<&str> {
301 self.trigger_operator.as_deref()
302 }
303
304 #[must_use]
306 pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
307 self.checkpoints.get(operator_id)
308 }
309
310 #[must_use]
312 pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
313 &self.checkpoints
314 }
315
316 #[must_use]
320 pub fn correction_factor(&self, operator_id: &str) -> f64 {
321 self.checkpoints
322 .get(operator_id)
323 .filter(|cp| cp.recorded)
324 .map_or(1.0, CardinalityCheckpoint::deviation_ratio)
325 }
326
327 #[must_use]
329 pub fn summary(&self) -> AdaptiveSummary {
330 let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
331 let deviation_count = self
332 .checkpoints
333 .values()
334 .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
335 .count();
336
337 let avg_deviation = if recorded_count > 0 {
338 self.checkpoints
339 .values()
340 .filter(|cp| cp.recorded)
341 .map(CardinalityCheckpoint::deviation_ratio)
342 .sum::<f64>()
343 / recorded_count as f64
344 } else {
345 1.0
346 };
347
348 let max_deviation = self
349 .checkpoints
350 .values()
351 .filter(|cp| cp.recorded)
352 .map(|cp| {
353 let ratio = cp.deviation_ratio();
354 if ratio > 1.0 { ratio } else { 1.0 / ratio }
355 })
356 .fold(1.0_f64, f64::max);
357
358 AdaptiveSummary {
359 checkpoint_count: self.checkpoints.len(),
360 recorded_count,
361 deviation_count,
362 avg_deviation_ratio: avg_deviation,
363 max_deviation_ratio: max_deviation,
364 reoptimization_triggered: self.reoptimization_triggered,
365 trigger_operator: self.trigger_operator.clone(),
366 }
367 }
368
369 pub fn reset(&mut self) {
371 for checkpoint in self.checkpoints.values_mut() {
372 checkpoint.actual = 0;
373 checkpoint.recorded = false;
374 }
375 self.reoptimization_triggered = false;
376 self.trigger_operator = None;
377 }
378}
379
380impl Default for AdaptiveContext {
381 fn default() -> Self {
382 Self::new()
383 }
384}
385
/// Point-in-time aggregate of an `AdaptiveContext`'s checkpoints.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of registered checkpoints.
    pub checkpoint_count: usize,
    /// Checkpoints with a recorded actual count.
    pub recorded_count: usize,
    /// Checkpoints whose deviation exceeds the threshold.
    pub deviation_count: usize,
    /// Mean deviation ratio over recorded checkpoints (1.0 when none).
    pub avg_deviation_ratio: f64,
    /// Largest deviation, normalized to >= 1.0 regardless of direction.
    pub max_deviation_ratio: f64,
    /// Whether re-optimization has been triggered.
    pub reoptimization_triggered: bool,
    /// Operator id that triggered it, if any.
    pub trigger_operator: Option<String>,
}
404
405#[derive(Debug, Clone)]
409pub struct SharedAdaptiveContext {
410 inner: Arc<RwLock<AdaptiveContext>>,
411}
412
413impl SharedAdaptiveContext {
414 #[must_use]
416 pub fn new() -> Self {
417 Self {
418 inner: Arc::new(RwLock::new(AdaptiveContext::new())),
419 }
420 }
421
422 #[must_use]
424 pub fn from_context(ctx: AdaptiveContext) -> Self {
425 Self {
426 inner: Arc::new(RwLock::new(ctx)),
427 }
428 }
429
430 pub fn record_actual(&self, operator_id: &str, actual: u64) {
432 if let Ok(mut ctx) = self.inner.write() {
433 ctx.record_actual(operator_id, actual);
434 }
435 }
436
437 #[must_use]
439 pub fn should_reoptimize(&self) -> bool {
440 if let Ok(mut ctx) = self.inner.write() {
441 ctx.should_reoptimize()
442 } else {
443 false
444 }
445 }
446
447 #[must_use]
449 pub fn snapshot(&self) -> Option<AdaptiveContext> {
450 self.inner.read().ok().map(|guard| AdaptiveContext {
451 checkpoints: guard.checkpoints.clone(),
452 reoptimization_threshold: guard.reoptimization_threshold,
453 min_rows: guard.min_rows,
454 reoptimization_triggered: guard.reoptimization_triggered,
455 trigger_operator: guard.trigger_operator.clone(),
456 })
457 }
458}
459
460impl Default for SharedAdaptiveContext {
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
/// Push-operator decorator that counts the rows flowing into its inner
/// operator and reports the total to a shared adaptive context at finalize.
pub struct CardinalityTrackingOperator {
    /// Wrapped push operator; every chunk is forwarded to it.
    inner: Box<dyn PushOperator>,
    /// Id under which the final row count is recorded.
    operator_id: String,
    /// Rows pushed so far (input cardinality, counted before `inner` runs).
    row_count: u64,
    /// Shared context receiving the final count.
    context: SharedAdaptiveContext,
}

impl CardinalityTrackingOperator {
    /// Wraps `inner`, recording its observed input cardinality under
    /// `operator_id` in `context` when the pipeline finalizes.
    pub fn new(
        inner: Box<dyn PushOperator>,
        operator_id: &str,
        context: SharedAdaptiveContext,
    ) -> Self {
        Self {
            inner,
            operator_id: operator_id.to_string(),
            row_count: 0,
            context,
        }
    }

    /// Rows counted so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
}

impl PushOperator for CardinalityTrackingOperator {
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        // Count the incoming chunk, then delegate; this measures rows as
        // pushed in, not what `inner` emits downstream.
        self.row_count += chunk.len() as u64;

        self.inner.push(chunk, sink)
    }

    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        // Report the accumulated total before finalizing the inner operator.
        self.context
            .record_actual(&self.operator_id, self.row_count);

        self.inner.finalize(sink)
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        self.inner.preferred_chunk_size()
    }

    fn name(&self) -> &'static str {
        // Transparent wrapper: expose the inner operator's name.
        self.inner.name()
    }
}
530
/// Sink decorator that counts consumed rows and reports the total to a
/// shared adaptive context when finalized.
pub struct CardinalityTrackingSink {
    /// Wrapped sink; every chunk is forwarded to it.
    inner: Box<dyn Sink>,
    /// Id under which the final row count is recorded.
    operator_id: String,
    /// Rows consumed so far.
    row_count: u64,
    /// Shared context receiving the final count.
    context: SharedAdaptiveContext,
}

impl CardinalityTrackingSink {
    /// Wraps `inner`, recording its consumed row count under `operator_id`.
    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
        Self {
            inner,
            operator_id: operator_id.to_string(),
            row_count: 0,
            context,
        }
    }

    /// Rows counted so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
}

impl Sink for CardinalityTrackingSink {
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        self.row_count += chunk.len() as u64;
        self.inner.consume(chunk)
    }

    fn finalize(&mut self) -> Result<(), OperatorError> {
        // Report the accumulated total before finalizing the wrapped sink.
        self.context
            .record_actual(&self.operator_id, self.row_count);
        self.inner.finalize()
    }

    fn name(&self) -> &'static str {
        self.inner.name()
    }

    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
        self
    }
}
582
/// Outcome of evaluating the adaptive context partway through execution.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ReoptimizationDecision {
    /// Estimates are close enough; keep the current plan.
    Continue,
    /// Re-plan using per-operator correction factors.
    Reoptimize {
        /// Operator whose deviation triggered the decision.
        trigger: String,
        /// actual/estimated ratio per recorded operator id.
        corrections: HashMap<String, f64>,
    },
    /// Misestimation too severe to re-plan reliably.
    Abort {
        /// Human-readable explanation.
        reason: String,
    },
}
602
603#[must_use]
605pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
606 let summary = ctx.summary();
607
608 if !summary.reoptimization_triggered {
610 return ReoptimizationDecision::Continue;
611 }
612
613 if summary.max_deviation_ratio > 100.0 {
615 return ReoptimizationDecision::Abort {
616 reason: format!(
617 "Catastrophic cardinality misestimate: {}x deviation",
618 summary.max_deviation_ratio
619 ),
620 };
621 }
622
623 let corrections: HashMap<String, f64> = ctx
625 .all_checkpoints()
626 .iter()
627 .filter(|(_, cp)| cp.recorded)
628 .map(|(id, cp)| (id.clone(), cp.deviation_ratio()))
629 .collect();
630
631 ReoptimizationDecision::Reoptimize {
632 trigger: summary.trigger_operator.unwrap_or_default(),
633 corrections,
634 }
635}
636
/// Factory producing a fresh operator pipeline from the (corrected) adaptive
/// context after a re-optimization decision.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
643
644#[derive(Debug, Clone)]
646pub struct AdaptivePipelineConfig {
647 pub check_interval: u64,
649 pub reoptimization_threshold: f64,
651 pub min_rows_for_reoptimization: u64,
653 pub max_reoptimizations: usize,
655}
656
657impl Default for AdaptivePipelineConfig {
658 fn default() -> Self {
659 Self {
660 check_interval: 10_000,
661 reoptimization_threshold: DEFAULT_REOPTIMIZATION_THRESHOLD,
662 min_rows_for_reoptimization: MIN_ROWS_FOR_REOPTIMIZATION,
663 max_reoptimizations: 3,
664 }
665 }
666}
667
668impl AdaptivePipelineConfig {
669 #[must_use]
671 pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
672 Self {
673 check_interval,
674 reoptimization_threshold: threshold,
675 min_rows_for_reoptimization: min_rows,
676 max_reoptimizations: 3,
677 }
678 }
679
680 #[must_use]
682 pub fn with_max_reoptimizations(mut self, max: usize) -> Self {
683 self.max_reoptimizations = max;
684 self
685 }
686}
687
/// Final statistics of one adaptive execution run.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows produced.
    pub total_rows: u64,
    /// How many times the plan was re-optimized.
    pub reoptimization_count: usize,
    /// Checkpoint/operator ids that triggered re-optimizations.
    pub triggers: Vec<String>,
    /// Summary of the adaptive context at completion.
    pub final_context: AdaptiveSummary,
}
700
/// A point in an executing pipeline where observed rows are compared to the
/// optimizer's estimate; counts accumulate incrementally via `record_rows`.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    /// Checkpoint identifier.
    pub id: String,
    /// Index of the operator this checkpoint sits after.
    pub after_operator: usize,
    /// Optimizer row-count estimate at this point in the plan.
    pub estimated_cardinality: f64,
    /// Rows observed so far (running total).
    pub actual_rows: u64,
    /// Whether this checkpoint has already fired a re-optimization.
    pub triggered: bool,
}

impl AdaptiveCheckpoint {
    /// Creates an untriggered checkpoint with zero observed rows.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        Self {
            id: id.to_string(),
            after_operator,
            estimated_cardinality: estimated,
            actual_rows: 0,
            triggered: false,
        }
    }

    /// Adds `count` to the running row total.
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }

    /// True when at least `min_rows` rows were seen and the actual count
    /// deviates from the estimate by more than `threshold` in either
    /// direction. A non-positive estimate is exceeded by any observed row.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        if self.actual_rows < min_rows {
            return false;
        }
        let estimate = self.estimated_cardinality;
        if estimate <= 0.0 {
            return self.actual_rows > 0;
        }
        let ratio = self.actual_rows as f64 / estimate;
        ratio > threshold || ratio < 1.0 / threshold
    }
}
750
/// Notifications emitted during adaptive execution.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AdaptiveEvent {
    /// A checkpoint's row count was recorded.
    CheckpointReached {
        /// Checkpoint id.
        id: String,
        /// Observed rows.
        actual_rows: u64,
        /// Optimizer estimate at this checkpoint.
        estimated: f64,
    },
    /// A checkpoint's deviation crossed the threshold.
    ReoptimizationTriggered {
        /// Checkpoint that fired.
        checkpoint_id: String,
        /// actual/estimated ratio at trigger time.
        deviation_ratio: f64,
    },
    /// Execution switched to a re-optimized plan.
    PlanSwitched {
        /// Operator count of the abandoned plan.
        old_operator_count: usize,
        /// Operator count of the replacement plan.
        new_operator_count: usize,
    },
    /// The pipeline ran to completion.
    ExecutionCompleted {
        /// Total rows produced.
        total_rows: u64,
    },
}

/// Callback invoked for each `AdaptiveEvent`.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
787
788pub struct AdaptivePipelineBuilder {
790 checkpoints: Vec<AdaptiveCheckpoint>,
791 config: AdaptivePipelineConfig,
792 context: AdaptiveContext,
793 event_callback: Option<AdaptiveEventCallback>,
794}
795
796impl AdaptivePipelineBuilder {
797 #[must_use]
799 pub fn new() -> Self {
800 Self {
801 checkpoints: Vec::new(),
802 config: AdaptivePipelineConfig::default(),
803 context: AdaptiveContext::new(),
804 event_callback: None,
805 }
806 }
807
808 #[must_use]
810 pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
811 self.config = config;
812 self
813 }
814
815 #[must_use]
817 pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
818 self.checkpoints
819 .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
820 self.context.set_estimate(id, estimated);
821 self
822 }
823
824 #[must_use]
826 pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
827 self.event_callback = Some(callback);
828 self
829 }
830
831 #[must_use]
833 pub fn with_context(mut self, context: AdaptiveContext) -> Self {
834 self.context = context;
835 self
836 }
837
838 #[must_use]
840 pub fn build(self) -> AdaptiveExecutionConfig {
841 AdaptiveExecutionConfig {
842 checkpoints: self.checkpoints,
843 config: self.config,
844 context: self.context,
845 event_callback: self.event_callback,
846 }
847 }
848}
849
850impl Default for AdaptivePipelineBuilder {
851 fn default() -> Self {
852 Self::new()
853 }
854}
855
856pub struct AdaptiveExecutionConfig {
858 pub checkpoints: Vec<AdaptiveCheckpoint>,
860 pub config: AdaptivePipelineConfig,
862 pub context: AdaptiveContext,
864 pub event_callback: Option<AdaptiveEventCallback>,
866}
867
868impl AdaptiveExecutionConfig {
869 #[must_use]
871 pub fn summary(&self) -> AdaptiveSummary {
872 self.context.summary()
873 }
874
875 pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
877 self.context.record_actual(checkpoint_id, actual);
878
879 if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
880 cp.actual_rows = actual;
881 }
882
883 if let Some(ref callback) = self.event_callback {
884 let estimated = self
885 .context
886 .get_checkpoint(checkpoint_id)
887 .map_or(0.0, |cp| cp.estimated);
888 callback(AdaptiveEvent::CheckpointReached {
889 id: checkpoint_id.to_string(),
890 actual_rows: actual,
891 estimated,
892 });
893 }
894 }
895
896 #[must_use]
898 pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
899 self.checkpoints.iter().find(|cp| {
900 !cp.triggered
901 && cp.exceeds_threshold(
902 self.config.reoptimization_threshold,
903 self.config.min_rows_for_reoptimization,
904 )
905 })
906 }
907
908 pub fn mark_triggered(&mut self, checkpoint_id: &str) {
910 if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
911 cp.triggered = true;
912 }
913
914 if let Some(ref callback) = self.event_callback {
915 let ratio = self
916 .context
917 .get_checkpoint(checkpoint_id)
918 .filter(|cp| cp.recorded)
919 .map_or(1.0, |cp| cp.deviation_ratio());
920 callback(AdaptiveEvent::ReoptimizationTriggered {
921 checkpoint_id: checkpoint_id.to_string(),
922 deviation_ratio: ratio,
923 });
924 }
925 }
926}
927
928use super::operators::{Operator, OperatorResult}; pub struct CardinalityTrackingWrapper {
938 inner: Box<dyn Operator>,
940 operator_id: String,
942 row_count: u64,
944 context: SharedAdaptiveContext,
946 finalized: bool,
948}
949
impl CardinalityTrackingWrapper {
    /// Wraps `inner`, reporting its total output rows under `operator_id`
    /// to `context` when the stream is exhausted, errors, or is dropped.
    pub fn new(
        inner: Box<dyn Operator>,
        operator_id: &str,
        context: SharedAdaptiveContext,
    ) -> Self {
        Self {
            inner,
            operator_id: operator_id.to_string(),
            row_count: 0,
            context,
            finalized: false,
        }
    }

    /// Rows counted so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }

    /// Reports the accumulated count exactly once; subsequent calls are
    /// no-ops until `reset` clears the `finalized` flag.
    fn report_final(&mut self) {
        if !self.finalized {
            self.context
                .record_actual(&self.operator_id, self.row_count);
            self.finalized = true;
        }
    }
}

impl Operator for CardinalityTrackingWrapper {
    fn next(&mut self) -> OperatorResult {
        match self.inner.next() {
            Ok(Some(chunk)) => {
                // Count output rows as they are pulled through.
                self.row_count += chunk.row_count() as u64;
                Ok(Some(chunk))
            }
            Ok(None) => {
                // Stream exhausted: report the final count.
                self.report_final();
                Ok(None)
            }
            Err(e) => {
                // Report whatever was seen before the failure, then propagate.
                self.report_final();
                Err(e)
            }
        }
    }

    fn reset(&mut self) {
        // Start a fresh count and allow reporting again after re-execution.
        self.row_count = 0;
        self.finalized = false;
        self.inner.reset();
    }

    fn name(&self) -> &'static str {
        self.inner.name()
    }

    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}

impl Drop for CardinalityTrackingWrapper {
    fn drop(&mut self) {
        // Safety net: report even if the pipeline was abandoned before
        // exhaustion (idempotent thanks to `finalized`).
        self.report_final();
    }
}
1024
1025use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; use super::sink::CollectorSink;
1029use super::source::OperatorSource;
1030
/// Drives a pull operator to completion in chunks while tracking
/// cardinalities in a shared adaptive context.
pub struct AdaptivePipelineExecutor {
    /// Adapts the pull operator into a chunk source.
    source: OperatorSource,
    /// Shared estimate-vs-actual tracking state.
    context: SharedAdaptiveContext,
    /// Tunables (check interval etc.).
    config: AdaptivePipelineConfig,
}
1055
impl AdaptivePipelineExecutor {
    /// Creates an executor over `operator` with the default config.
    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
        Self {
            source: OperatorSource::new(operator),
            context: SharedAdaptiveContext::from_context(context),
            config: AdaptivePipelineConfig::default(),
        }
    }

    /// Creates an executor with an explicit config.
    pub fn with_config(
        operator: Box<dyn Operator>,
        context: AdaptiveContext,
        config: AdaptivePipelineConfig,
    ) -> Self {
        Self {
            source: OperatorSource::new(operator),
            context: SharedAdaptiveContext::from_context(context),
            config,
        }
    }

    /// Runs the pipeline through a counting collector sink and returns the
    /// final adaptive summary.
    ///
    /// NOTE(review): rows are consumed by a `CollectorSink` but the returned
    /// chunk vector is always empty — the collected chunks are discarded.
    /// Use `execute_collecting` to obtain output rows; confirm whether the
    /// discard here is intentional.
    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
        let mut sink = CardinalityTrackingSink::new(
            Box::new(CollectorSink::new()),
            "output",
            self.context.clone(),
        );

        let chunk_size = DEFAULT_CHUNK_SIZE;
        let mut total_rows: u64 = 0;
        let check_interval = self.config.check_interval;

        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            let chunk_rows = chunk.len() as u64;
            total_rows += chunk_rows;

            // The sink may signal early termination (consume returned false).
            let continue_exec = sink.consume(chunk)?;
            if !continue_exec {
                break;
            }

            // Poll for re-optimization at check_interval row boundaries. The
            // call latches the trigger inside the context; the empty body
            // suggests a plan-switch hook belongs here — TODO confirm.
            if total_rows >= check_interval
                && total_rows.is_multiple_of(check_interval)
                && self.context.should_reoptimize()
            {
            }
        }

        sink.finalize()?;

        // Summarize from a snapshot; a poisoned lock yields the default
        // (empty) summary.
        let summary = self
            .context
            .snapshot()
            .map(|ctx| ctx.summary())
            .unwrap_or_default();

        Ok((Vec::new(), summary))
    }

    /// Runs the pipeline, collecting non-empty output chunks, keeping the
    /// "root" actual count up to date, and polling for re-optimization
    /// every `check_interval` rows.
    pub fn execute_collecting(
        mut self,
    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
        let mut chunks = Vec::new();
        let chunk_size = DEFAULT_CHUNK_SIZE;
        let mut total_rows: u64 = 0;
        let check_interval = self.config.check_interval;

        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            let chunk_rows = chunk.len() as u64;
            total_rows += chunk_rows;

            // Record the running total under the "root" id each iteration.
            self.context.record_actual("root", total_rows);

            if !chunk.is_empty() {
                chunks.push(chunk);
            }

            // Result intentionally ignored: the trigger is latched inside
            // the context and surfaces via the final summary.
            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
                let _ = self.context.should_reoptimize();
            }
        }

        let summary = self
            .context
            .snapshot()
            .map(|ctx| ctx.summary())
            .unwrap_or_default();

        Ok((chunks, summary))
    }

    /// Shared context handle for external observation.
    pub fn context(&self) -> &SharedAdaptiveContext {
        &self.context
    }
}
1189
1190pub fn execute_adaptive(
1208 operator: Box<dyn Operator>,
1209 context: Option<AdaptiveContext>,
1210 config: Option<AdaptivePipelineConfig>,
1211) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
1212 let ctx = context.unwrap_or_default();
1213 let cfg = config.unwrap_or_default();
1214
1215 let executor = AdaptivePipelineExecutor::with_config(operator, ctx, cfg);
1216 let (chunks, summary) = executor.execute_collecting()?;
1217
1218 Ok((chunks, Some(summary)))
1219}
1220
#[cfg(test)]
mod tests {
    use super::*;

    // ---- CardinalityCheckpoint ----------------------------------------

    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(200);

        assert!((cp.deviation_ratio() - 2.0).abs() < 0.001);
    }

    #[test]
    fn test_checkpoint_underestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(500);

        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_overestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(20);

        // Overestimates (ratio < 1/threshold) are significant too.
        assert!((cp.deviation_ratio() - 0.2).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_accurate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(110);

        assert!((cp.deviation_ratio() - 1.1).abs() < 0.001);
        assert!(!cp.is_significant_deviation(3.0));
    }

    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(100);

        assert!(cp.deviation_ratio().is_infinite());
    }

    #[test]
    fn test_checkpoint_zero_both() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(0);

        assert!((cp.deviation_ratio() - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_absolute_deviation() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(150);

        assert!((cp.absolute_deviation() - 50.0).abs() < 0.001);
    }

    // ---- CardinalityFeedback ------------------------------------------

    #[test]
    fn test_feedback_collection() {
        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan_1", 1000);
        feedback.record("filter_1", 100);

        assert_eq!(feedback.get("scan_1"), Some(1000));
        assert_eq!(feedback.get("filter_1"), Some(100));
        assert_eq!(feedback.get("unknown"), None);
    }

    #[test]
    fn test_feedback_running_counter() {
        let mut feedback = CardinalityFeedback::new();
        feedback.init_counter("op_1");

        feedback.add_rows("op_1", 100);
        feedback.add_rows("op_1", 200);
        feedback.add_rows("op_1", 50);

        assert_eq!(feedback.get_running("op_1"), Some(350));

        feedback.finalize_counter("op_1");
        assert_eq!(feedback.get("op_1"), Some(350));
    }

    // ---- AdaptiveContext ----------------------------------------------

    #[test]
    fn test_adaptive_context_basic() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        ctx.record_actual("scan", 1000);
        ctx.record_actual("filter", 500);

        let cp = ctx.get_checkpoint("filter").unwrap();
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 100);
        ctx.set_estimate("scan", 10000.0);
        ctx.set_estimate("filter", 1000.0);

        ctx.record_actual("scan", 10000);
        ctx.record_actual("filter", 5000);

        assert!(ctx.should_reoptimize());
        assert_eq!(ctx.trigger_operator(), Some("filter"));

        // The trigger only fires once.
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_min_rows() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 1000);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);

        // 500 observed rows < min_rows(1000): deviation is ignored.
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);

        assert!(!ctx.has_significant_deviation());
        assert!(!ctx.should_reoptimize());
    }

    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 300);

        assert!((ctx.correction_factor("filter") - 3.0).abs() < 0.001);
        assert!((ctx.correction_factor("unknown") - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);

        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan", 1000);
        feedback.record("filter", 500);

        ctx.apply_feedback(&feedback);

        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 1000);
        assert_eq!(ctx.get_checkpoint("filter").unwrap().actual, 500);
    }

    #[test]
    fn test_adaptive_summary() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("op1", 100.0);
        ctx.set_estimate("op2", 200.0);
        ctx.set_estimate("op3", 300.0);

        ctx.record_actual("op1", 100);
        ctx.record_actual("op2", 600);
        let _ = ctx.should_reoptimize();

        let summary = ctx.summary();
        assert_eq!(summary.checkpoint_count, 3);
        assert_eq!(summary.recorded_count, 2);
        assert_eq!(summary.deviation_count, 1);
        assert!(summary.reoptimization_triggered);
    }

    #[test]
    fn test_adaptive_context_reset() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 5000);
        let _ = ctx.should_reoptimize();
        assert!(ctx.reoptimization_triggered);

        ctx.reset();

        assert!(!ctx.reoptimization_triggered);
        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!ctx.get_checkpoint("scan").unwrap().recorded);
        // Estimates survive a reset.
        assert!((ctx.get_checkpoint("scan").unwrap().estimated - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_shared_context() {
        let ctx = SharedAdaptiveContext::new();

        ctx.record_actual("op1", 1000);

        let snapshot = ctx.snapshot().unwrap();
        assert_eq!(snapshot.get_checkpoint("op1").unwrap().actual, 1000);
    }

    // ---- evaluate_reoptimization --------------------------------------

    #[test]
    fn test_reoptimization_decision_continue() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);

        let decision = evaluate_reoptimization(&ctx);
        assert_eq!(decision, ReoptimizationDecision::Continue);
    }

    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);
        let _ = ctx.should_reoptimize();

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Reoptimize {
            trigger,
            corrections,
        } = decision
        {
            assert_eq!(trigger, "filter");
            assert!((corrections.get("filter").copied().unwrap_or(0.0) - 5.0).abs() < 0.001);
        } else {
            panic!("Expected Reoptimize decision");
        }
    }

    #[test]
    fn test_reoptimization_decision_abort() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 1.0);
        ctx.record_actual("filter", 1000);
        let _ = ctx.should_reoptimize();

        let decision = evaluate_reoptimization(&ctx);

        if let ReoptimizationDecision::Abort { reason } = decision {
            assert!(reason.contains("Catastrophic"));
        } else {
            panic!("Expected Abort decision");
        }
    }

    // ---- AdaptiveCheckpoint -------------------------------------------

    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut cp = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(cp.actual_rows, 0);
        assert!(!cp.triggered);

        cp.record_rows(50);
        assert_eq!(cp.actual_rows, 50);

        cp.record_rows(100);
        assert_eq!(cp.actual_rows, 150);
    }

    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut cp = AdaptiveCheckpoint::new("filter", 0, 100.0);

        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));

        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));

        cp.actual_rows = 0;
        cp.record_rows(500);
        assert!(cp.exceeds_threshold(2.0, 100));

        // Underestimate side: 200 actual vs 1000 estimated.
        let mut cp2 = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        cp2.record_rows(200);
        assert!(cp2.exceeds_threshold(2.0, 100));
    }

    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut cp = AdaptiveCheckpoint::new("test", 0, 0.0);
        cp.record_rows(100);

        assert!(cp.exceeds_threshold(2.0, 50));
    }

    // ---- AdaptivePipelineConfig ---------------------------------------

    #[test]
    fn test_adaptive_pipeline_config_default() {
        let config = AdaptivePipelineConfig::default();

        assert_eq!(config.check_interval, 10_000);
        assert!((config.reoptimization_threshold - DEFAULT_REOPTIMIZATION_THRESHOLD).abs() < 0.001);
        assert_eq!(
            config.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(config.max_reoptimizations, 3);
    }

    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let config = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);

        assert_eq!(config.check_interval, 5000);
        assert!((config.reoptimization_threshold - 2.0).abs() < 0.001);
        assert_eq!(config.min_rows_for_reoptimization, 500);
        assert_eq!(config.max_reoptimizations, 5);
    }

    // ---- Builder & execution config -----------------------------------

    #[test]
    fn test_adaptive_pipeline_builder() {
        let config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();

        assert_eq!(config.checkpoints.len(), 2);
        assert_eq!(config.checkpoints[0].id, "scan");
        assert!((config.checkpoints[0].estimated_cardinality - 10000.0).abs() < 0.001);
        assert_eq!(config.checkpoints[1].id, "filter");
        assert!((config.checkpoints[1].estimated_cardinality - 1000.0).abs() < 0.001);
    }

    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        config.record_checkpoint("filter", 500);

        let cp = config.context.get_checkpoint("filter").unwrap();
        assert_eq!(cp.actual, 500);
        assert!(cp.recorded);

        let acp = config
            .checkpoints
            .iter()
            .find(|c| c.id == "filter")
            .unwrap();
        assert_eq!(acp.actual_rows, 500);
    }

    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(config.should_reoptimize().is_none());

        config.record_checkpoint("filter", 150);
        assert!(config.should_reoptimize().is_none());

        config.checkpoints[0].actual_rows = 0;
        config.record_checkpoint("filter", 500);
        config.checkpoints[0].actual_rows = 500;

        let trigger = config.should_reoptimize();
        assert!(trigger.is_some());
        assert_eq!(trigger.unwrap().id, "filter");
    }

    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();

        assert!(!config.checkpoints[0].triggered);

        config.mark_triggered("filter");

        assert!(config.checkpoints[0].triggered);
    }

    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;

        let event_count = Arc::new(AtomicUsize::new(0));
        let counter = event_count.clone();

        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                counter.fetch_add(1, Ordering::Relaxed);
            }))
            .build();

        config.record_checkpoint("filter", 500);

        assert_eq!(event_count.load(Ordering::Relaxed), 1);

        config.mark_triggered("filter");

        assert_eq!(event_count.load(Ordering::Relaxed), 2);
    }
}