use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use super::chunk::DataChunk;
use super::operators::OperatorError;
use super::pipeline::{ChunkSizeHint, PushOperator, Sink};
/// Default ratio between actual and estimated cardinality beyond which a
/// checkpoint counts as significantly misestimated (in either direction).
pub const DEFAULT_REOPTIMIZATION_THRESHOLD: f64 = 3.0;
/// Minimum number of observed rows before a deviation may trigger
/// re-optimization, to avoid reacting to statistically tiny samples.
pub const MIN_ROWS_FOR_REOPTIMIZATION: u64 = 1000;
/// Compares a planner's cardinality estimate for a single operator against
/// the row count actually observed at runtime.
#[derive(Debug, Clone)]
pub struct CardinalityCheckpoint {
    pub operator_id: String,
    pub estimated: f64,
    pub actual: u64,
    pub recorded: bool,
}
impl CardinalityCheckpoint {
    /// Creates a checkpoint for `operator_id` carrying the planner's
    /// `estimated` cardinality, with no observation recorded yet.
    #[must_use]
    pub fn new(operator_id: &str, estimated: f64) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            estimated,
            recorded: false,
            actual: 0,
        }
    }
    /// Stores the observed row count and marks the checkpoint as recorded.
    pub fn record(&mut self, actual: u64) {
        self.recorded = true;
        self.actual = actual;
    }
    /// Ratio of actual to estimated rows. A non-positive estimate yields
    /// `1.0` when nothing was observed and `f64::INFINITY` otherwise.
    #[must_use]
    pub fn deviation_ratio(&self) -> f64 {
        if self.estimated <= 0.0 {
            if self.actual == 0 {
                return 1.0;
            }
            return f64::INFINITY;
        }
        self.actual as f64 / self.estimated
    }
    /// Absolute difference between observed and estimated row counts.
    #[must_use]
    pub fn absolute_deviation(&self) -> f64 {
        (self.actual as f64 - self.estimated).abs()
    }
    /// True when a recorded observation deviates from the estimate by more
    /// than `threshold` in either direction (over- or under-estimate).
    /// Always false before [`Self::record`] has been called.
    #[must_use]
    pub fn is_significant_deviation(&self, threshold: f64) -> bool {
        if !self.recorded {
            return false;
        }
        let ratio = self.deviation_ratio();
        ratio > threshold || ratio < 1.0 / threshold
    }
}
/// Collects observed row counts per operator: final totals plus thread-safe
/// running counters that can be bumped through a shared `&self`.
#[derive(Debug, Default)]
pub struct CardinalityFeedback {
    actuals: HashMap<String, u64>,
    running_counts: HashMap<String, AtomicU64>,
}
impl CardinalityFeedback {
    /// Creates an empty feedback collector.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Records the final observed row count for `operator_id`, replacing any
    /// previously recorded value.
    pub fn record(&mut self, operator_id: &str, count: u64) {
        self.actuals.insert(operator_id.to_string(), count);
    }
    /// Adds `count` rows to the running counter for `operator_id`. Counts
    /// for operators whose counter was never initialized are silently
    /// dropped.
    pub fn add_rows(&self, operator_id: &str, count: u64) {
        if let Some(running) = self.running_counts.get(operator_id) {
            running.fetch_add(count, Ordering::Relaxed);
        }
    }
    /// Installs a fresh zeroed running counter for `operator_id`.
    pub fn init_counter(&mut self, operator_id: &str) {
        self.running_counts
            .insert(operator_id.to_string(), AtomicU64::new(0));
    }
    /// Copies the current running counter value (if one exists) into the
    /// final totals; the running counter itself is left in place.
    pub fn finalize_counter(&mut self, operator_id: &str) {
        let total = self
            .running_counts
            .get(operator_id)
            .map(|counter| counter.load(Ordering::Relaxed));
        if let Some(total) = total {
            self.actuals.insert(operator_id.to_string(), total);
        }
    }
    /// Final recorded row count for `operator_id`, if any.
    #[must_use]
    pub fn get(&self, operator_id: &str) -> Option<u64> {
        self.actuals.get(operator_id).copied()
    }
    /// Current value of the running counter for `operator_id`, if it was
    /// initialized.
    #[must_use]
    pub fn get_running(&self, operator_id: &str) -> Option<u64> {
        self.running_counts
            .get(operator_id)
            .map(|counter| counter.load(Ordering::Relaxed))
    }
    /// All finalized counts keyed by operator id.
    #[must_use]
    pub fn all_actuals(&self) -> &HashMap<String, u64> {
        &self.actuals
    }
}
/// Tracks per-operator cardinality checkpoints during execution and decides
/// whether observed deviations justify re-optimizing the plan.
#[derive(Debug)]
pub struct AdaptiveContext {
    // Checkpoints keyed by operator id.
    checkpoints: HashMap<String, CardinalityCheckpoint>,
    // Deviation ratio (in either direction) that counts as significant.
    reoptimization_threshold: f64,
    // Minimum observed rows before a checkpoint may trigger re-optimization.
    min_rows: u64,
    // Latched once a trigger fires; prevents repeated triggers.
    reoptimization_triggered: bool,
    // Operator id of the checkpoint that fired the trigger, if any.
    trigger_operator: Option<String>,
}
impl AdaptiveContext {
    /// Creates a context with the default deviation threshold and minimum
    /// row gate.
    #[must_use]
    pub fn new() -> Self {
        Self::with_thresholds(DEFAULT_REOPTIMIZATION_THRESHOLD, MIN_ROWS_FOR_REOPTIMIZATION)
    }
    /// Creates a context with a custom deviation `threshold` and `min_rows`
    /// gate.
    #[must_use]
    pub fn with_thresholds(threshold: f64, min_rows: u64) -> Self {
        Self {
            checkpoints: HashMap::new(),
            reoptimization_threshold: threshold,
            min_rows,
            reoptimization_triggered: false,
            trigger_operator: None,
        }
    }
    /// Registers (or replaces) the planner estimate for `operator_id`,
    /// discarding any previously recorded observation for it.
    pub fn set_estimate(&mut self, operator_id: &str, estimate: f64) {
        self.checkpoints.insert(
            operator_id.to_string(),
            CardinalityCheckpoint::new(operator_id, estimate),
        );
    }
    /// Records the observed row count for `operator_id`, creating a
    /// zero-estimate checkpoint when no estimate was registered.
    pub fn record_actual(&mut self, operator_id: &str, actual: u64) {
        self.checkpoints
            .entry(operator_id.to_string())
            .or_insert_with(|| CardinalityCheckpoint::new(operator_id, 0.0))
            .record(actual);
    }
    /// Applies all finalized counts from `feedback` to this context.
    pub fn apply_feedback(&mut self, feedback: &CardinalityFeedback) {
        for (op_id, &actual) in feedback.all_actuals() {
            self.record_actual(op_id, actual);
        }
    }
    /// True when any recorded checkpoint deviates beyond the configured
    /// threshold. Ignores the `min_rows` gate.
    #[must_use]
    pub fn has_significant_deviation(&self) -> bool {
        self.checkpoints
            .values()
            .any(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
    }
    /// Returns `true` at most once: the first time a checkpoint with at
    /// least `min_rows` observed rows deviates beyond the threshold. The
    /// triggering operator is remembered and the trigger latches.
    ///
    /// Checkpoints are scanned in sorted operator-id order so that the
    /// reported trigger operator is deterministic even when several
    /// checkpoints qualify (`HashMap` iteration order is randomized).
    #[must_use]
    pub fn should_reoptimize(&mut self) -> bool {
        if self.reoptimization_triggered {
            return false;
        }
        let mut op_ids: Vec<String> = self.checkpoints.keys().cloned().collect();
        op_ids.sort_unstable();
        for op_id in op_ids {
            let Some(checkpoint) = self.checkpoints.get(&op_id) else {
                continue;
            };
            // Too few rows observed to be statistically meaningful.
            if checkpoint.actual < self.min_rows {
                continue;
            }
            if checkpoint.is_significant_deviation(self.reoptimization_threshold) {
                self.reoptimization_triggered = true;
                self.trigger_operator = Some(op_id);
                return true;
            }
        }
        false
    }
    /// Operator id that fired the re-optimization trigger, if any.
    #[must_use]
    pub fn trigger_operator(&self) -> Option<&str> {
        self.trigger_operator.as_deref()
    }
    /// Checkpoint registered for `operator_id`, if any.
    #[must_use]
    pub fn get_checkpoint(&self, operator_id: &str) -> Option<&CardinalityCheckpoint> {
        self.checkpoints.get(operator_id)
    }
    /// All checkpoints keyed by operator id.
    #[must_use]
    pub fn all_checkpoints(&self) -> &HashMap<String, CardinalityCheckpoint> {
        &self.checkpoints
    }
    /// Multiplier (actual / estimated) to correct the estimate for
    /// `operator_id`; `1.0` when unknown or not yet recorded.
    #[must_use]
    pub fn correction_factor(&self, operator_id: &str) -> f64 {
        self.checkpoints
            .get(operator_id)
            .filter(|cp| cp.recorded)
            .map_or(1.0, CardinalityCheckpoint::deviation_ratio)
    }
    /// Aggregated statistics over all checkpoints.
    #[must_use]
    pub fn summary(&self) -> AdaptiveSummary {
        let recorded_count = self.checkpoints.values().filter(|cp| cp.recorded).count();
        let deviation_count = self
            .checkpoints
            .values()
            .filter(|cp| cp.is_significant_deviation(self.reoptimization_threshold))
            .count();
        // Mean of actual/estimated ratios over recorded checkpoints;
        // defaults to 1.0 (no deviation) when nothing was recorded.
        let avg_deviation = if recorded_count > 0 {
            self.checkpoints
                .values()
                .filter(|cp| cp.recorded)
                .map(CardinalityCheckpoint::deviation_ratio)
                .sum::<f64>()
                / recorded_count as f64
        } else {
            1.0
        };
        // Normalize so over- and under-estimates compare on the same >= 1.0
        // scale; a zero ratio maps to infinity.
        let max_deviation = self
            .checkpoints
            .values()
            .filter(|cp| cp.recorded)
            .map(|cp| {
                let ratio = cp.deviation_ratio();
                if ratio > 1.0 { ratio } else { 1.0 / ratio }
            })
            .fold(1.0_f64, f64::max);
        AdaptiveSummary {
            checkpoint_count: self.checkpoints.len(),
            recorded_count,
            deviation_count,
            avg_deviation_ratio: avg_deviation,
            max_deviation_ratio: max_deviation,
            reoptimization_triggered: self.reoptimization_triggered,
            trigger_operator: self.trigger_operator.clone(),
        }
    }
    /// Clears all observations and the trigger latch while keeping the
    /// registered estimates.
    pub fn reset(&mut self) {
        for checkpoint in self.checkpoints.values_mut() {
            checkpoint.actual = 0;
            checkpoint.recorded = false;
        }
        self.reoptimization_triggered = false;
        self.trigger_operator = None;
    }
}
impl Default for AdaptiveContext {
    /// Equivalent to [`AdaptiveContext::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Point-in-time statistics derived from an [`AdaptiveContext`].
#[derive(Debug, Clone, Default)]
pub struct AdaptiveSummary {
    /// Total number of registered checkpoints.
    pub checkpoint_count: usize,
    /// Checkpoints that have a recorded observation.
    pub recorded_count: usize,
    /// Recorded checkpoints deviating beyond the threshold.
    pub deviation_count: usize,
    /// Mean actual/estimated ratio over recorded checkpoints (1.0 if none).
    pub avg_deviation_ratio: f64,
    /// Largest deviation, normalized so over- and under-estimates compare
    /// on the same >= 1.0 scale.
    pub max_deviation_ratio: f64,
    /// Whether the one-shot re-optimization trigger has fired.
    pub reoptimization_triggered: bool,
    /// Operator that fired the trigger, if any.
    pub trigger_operator: Option<String>,
}
/// Thread-safe, cloneable handle to an [`AdaptiveContext`], shared across
/// pipeline operators via `Arc<RwLock<_>>`.
#[derive(Debug, Clone)]
pub struct SharedAdaptiveContext {
    inner: Arc<RwLock<AdaptiveContext>>,
}
impl SharedAdaptiveContext {
    /// Creates a shared wrapper around a fresh default context.
    #[must_use]
    pub fn new() -> Self {
        Self::from_context(AdaptiveContext::new())
    }
    /// Wraps an existing context for shared access.
    #[must_use]
    pub fn from_context(ctx: AdaptiveContext) -> Self {
        Self {
            inner: Arc::new(RwLock::new(ctx)),
        }
    }
    /// Records an observed row count; silently a no-op when the lock is
    /// poisoned.
    pub fn record_actual(&self, operator_id: &str, actual: u64) {
        let Ok(mut guard) = self.inner.write() else {
            return;
        };
        guard.record_actual(operator_id, actual);
    }
    /// Evaluates the re-optimization trigger; returns `false` when the lock
    /// is poisoned.
    #[must_use]
    pub fn should_reoptimize(&self) -> bool {
        self.inner
            .write()
            .map(|mut guard| guard.should_reoptimize())
            .unwrap_or(false)
    }
    /// Deep-copies the current context state; `None` when the lock is
    /// poisoned.
    #[must_use]
    pub fn snapshot(&self) -> Option<AdaptiveContext> {
        let guard = self.inner.read().ok()?;
        Some(AdaptiveContext {
            checkpoints: guard.checkpoints.clone(),
            reoptimization_threshold: guard.reoptimization_threshold,
            min_rows: guard.min_rows,
            reoptimization_triggered: guard.reoptimization_triggered,
            trigger_operator: guard.trigger_operator.clone(),
        })
    }
}
impl Default for SharedAdaptiveContext {
    /// Equivalent to [`SharedAdaptiveContext::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Push-operator decorator that counts every row flowing through the wrapped
/// operator and reports the total to the adaptive context on finalize.
pub struct CardinalityTrackingOperator {
    inner: Box<dyn PushOperator>,
    operator_id: String,
    row_count: u64,
    context: SharedAdaptiveContext,
}
impl CardinalityTrackingOperator {
    /// Wraps `inner` so that rows pushed through it are counted and reported
    /// to `context` under `operator_id`.
    pub fn new(
        inner: Box<dyn PushOperator>,
        operator_id: &str,
        context: SharedAdaptiveContext,
    ) -> Self {
        Self {
            operator_id: operator_id.to_owned(),
            row_count: 0,
            inner,
            context,
        }
    }
    /// Rows counted so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
}
impl PushOperator for CardinalityTrackingOperator {
    /// Counts the chunk's rows, then forwards it to the wrapped operator.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        let rows = chunk.len() as u64;
        self.row_count += rows;
        self.inner.push(chunk, sink)
    }
    /// Reports the accumulated row count to the adaptive context before
    /// finalizing the wrapped operator.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        self.context.record_actual(&self.operator_id, self.row_count);
        self.inner.finalize(sink)
    }
    /// Delegates to the wrapped operator.
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        self.inner.preferred_chunk_size()
    }
    /// Delegates to the wrapped operator.
    fn name(&self) -> &'static str {
        self.inner.name()
    }
}
/// Sink decorator that counts every row it consumes and reports the total to
/// the adaptive context when finalized.
pub struct CardinalityTrackingSink {
    inner: Box<dyn Sink>,
    operator_id: String,
    row_count: u64,
    context: SharedAdaptiveContext,
}
impl CardinalityTrackingSink {
    /// Wraps `inner`, reporting row counts under `operator_id` to `context`.
    pub fn new(inner: Box<dyn Sink>, operator_id: &str, context: SharedAdaptiveContext) -> Self {
        Self {
            operator_id: operator_id.to_owned(),
            row_count: 0,
            inner,
            context,
        }
    }
    /// Rows consumed so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
}
impl Sink for CardinalityTrackingSink {
    /// Counts the chunk's rows, then forwards it to the wrapped sink.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        let rows = chunk.len() as u64;
        self.row_count += rows;
        self.inner.consume(chunk)
    }
    /// Reports the total row count to the adaptive context, then finalizes
    /// the wrapped sink.
    fn finalize(&mut self) -> Result<(), OperatorError> {
        self.context.record_actual(&self.operator_id, self.row_count);
        self.inner.finalize()
    }
    /// Delegates to the wrapped sink.
    fn name(&self) -> &'static str {
        self.inner.name()
    }
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
        self
    }
}
/// Outcome of evaluating an [`AdaptiveContext`] for mid-query re-planning.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ReoptimizationDecision {
    /// No trigger has fired: keep executing the current plan.
    Continue,
    /// Re-plan, applying per-operator correction factors (actual/estimated).
    Reoptimize {
        trigger: String,
        corrections: HashMap<String, f64>,
    },
    /// Deviation is so large that continuing is considered pointless.
    Abort {
        reason: String,
    },
}
/// Maps the context's current state to a decision: `Continue` while no
/// trigger has fired, `Abort` on a catastrophic (>100x normalized)
/// misestimate, otherwise `Reoptimize` with per-operator correction factors
/// for every recorded checkpoint.
#[must_use]
pub fn evaluate_reoptimization(ctx: &AdaptiveContext) -> ReoptimizationDecision {
    let summary = ctx.summary();
    if !summary.reoptimization_triggered {
        return ReoptimizationDecision::Continue;
    }
    if summary.max_deviation_ratio > 100.0 {
        let reason = format!(
            "Catastrophic cardinality misestimate: {}x deviation",
            summary.max_deviation_ratio
        );
        return ReoptimizationDecision::Abort { reason };
    }
    let mut corrections: HashMap<String, f64> = HashMap::new();
    for (id, cp) in ctx.all_checkpoints() {
        if cp.recorded {
            corrections.insert(id.clone(), cp.deviation_ratio());
        }
    }
    ReoptimizationDecision::Reoptimize {
        trigger: summary.trigger_operator.unwrap_or_default(),
        corrections,
    }
}
/// Factory that builds a fresh operator pipeline from the (possibly
/// corrected) adaptive context after a re-optimization.
pub type PlanFactory = Box<dyn Fn(&AdaptiveContext) -> Vec<Box<dyn PushOperator>> + Send + Sync>;
/// Tuning knobs for adaptive pipeline execution.
#[derive(Debug, Clone)]
pub struct AdaptivePipelineConfig {
    /// Row interval at which the re-optimization trigger is polled.
    pub check_interval: u64,
    /// Deviation ratio considered significant.
    pub reoptimization_threshold: f64,
    /// Minimum observed rows before a trigger may fire.
    pub min_rows_for_reoptimization: u64,
    /// Upper bound on the number of plan switches.
    pub max_reoptimizations: usize,
}
impl Default for AdaptivePipelineConfig {
    /// Defaults: poll every 10 000 rows, module-default threshold and
    /// minimum-row gate, at most 3 re-optimizations.
    fn default() -> Self {
        Self::new(
            10_000,
            DEFAULT_REOPTIMIZATION_THRESHOLD,
            MIN_ROWS_FOR_REOPTIMIZATION,
        )
    }
}
impl AdaptivePipelineConfig {
    /// Builds a config with the given check interval, deviation threshold,
    /// and minimum-row gate; the re-optimization cap starts at 3.
    #[must_use]
    pub fn new(check_interval: u64, threshold: f64, min_rows: u64) -> Self {
        Self {
            check_interval,
            reoptimization_threshold: threshold,
            min_rows_for_reoptimization: min_rows,
            max_reoptimizations: 3,
        }
    }
    /// Builder-style override of the re-optimization cap.
    #[must_use]
    pub fn with_max_reoptimizations(self, max: usize) -> Self {
        Self {
            max_reoptimizations: max,
            ..self
        }
    }
}
/// Outcome of an adaptive execution run.
///
/// NOTE(review): not constructed anywhere in this file — presumably filled
/// in by callers elsewhere; confirm before removing.
#[derive(Debug, Clone)]
pub struct AdaptiveExecutionResult {
    /// Total rows produced.
    pub total_rows: u64,
    /// Number of re-optimizations performed.
    pub reoptimization_count: usize,
    /// Trigger descriptions (presumably checkpoint/operator ids) for each
    /// re-optimization — verify against the producer.
    pub triggers: Vec<String>,
    /// Summary of the adaptive context after execution finished.
    pub final_context: AdaptiveSummary,
}
/// A point in the pipeline (after operator index `after_operator`) where the
/// accumulated row count is compared against the planner's estimate.
#[derive(Debug)]
pub struct AdaptiveCheckpoint {
    pub id: String,
    pub after_operator: usize,
    pub estimated_cardinality: f64,
    pub actual_rows: u64,
    pub triggered: bool,
}
impl AdaptiveCheckpoint {
    /// Creates a checkpoint with no rows observed and the trigger unset.
    #[must_use]
    pub fn new(id: &str, after_operator: usize, estimated: f64) -> Self {
        Self {
            id: id.to_owned(),
            after_operator,
            estimated_cardinality: estimated,
            triggered: false,
            actual_rows: 0,
        }
    }
    /// Accumulates `count` observed rows (unlike `CardinalityCheckpoint`,
    /// counts here add up rather than overwrite).
    pub fn record_rows(&mut self, count: u64) {
        self.actual_rows += count;
    }
    /// True when at least `min_rows` rows were observed and the count
    /// deviates from the estimate by more than `threshold` in either
    /// direction. A non-positive estimate deviates as soon as any row is
    /// observed.
    #[must_use]
    pub fn exceeds_threshold(&self, threshold: f64, min_rows: u64) -> bool {
        if self.actual_rows < min_rows {
            false
        } else if self.estimated_cardinality <= 0.0 {
            self.actual_rows > 0
        } else {
            let ratio = self.actual_rows as f64 / self.estimated_cardinality;
            ratio > threshold || ratio < 1.0 / threshold
        }
    }
}
/// Events emitted to an optional callback during adaptive execution.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AdaptiveEvent {
    /// A checkpoint's observed row count was recorded.
    CheckpointReached {
        id: String,
        actual_rows: u64,
        estimated: f64,
    },
    /// A checkpoint's deviation crossed the threshold.
    ReoptimizationTriggered {
        checkpoint_id: String,
        deviation_ratio: f64,
    },
    /// The running plan was swapped for a re-optimized one.
    PlanSwitched {
        old_operator_count: usize,
        new_operator_count: usize,
    },
    /// Execution finished.
    ExecutionCompleted {
        total_rows: u64,
    },
}
/// Callback invoked for each [`AdaptiveEvent`] during execution.
pub type AdaptiveEventCallback = Box<dyn Fn(AdaptiveEvent) + Send + Sync>;
/// Builder for an [`AdaptiveExecutionConfig`]: collects checkpoints,
/// configuration, a seed context, and an optional event callback.
pub struct AdaptivePipelineBuilder {
    checkpoints: Vec<AdaptiveCheckpoint>,
    config: AdaptivePipelineConfig,
    context: AdaptiveContext,
    event_callback: Option<AdaptiveEventCallback>,
}
impl AdaptivePipelineBuilder {
    /// Starts an empty builder with default config and context.
    #[must_use]
    pub fn new() -> Self {
        Self {
            checkpoints: Vec::new(),
            config: AdaptivePipelineConfig::default(),
            context: AdaptiveContext::new(),
            event_callback: None,
        }
    }
    /// Replaces the pipeline configuration.
    #[must_use]
    pub fn with_config(mut self, config: AdaptivePipelineConfig) -> Self {
        self.config = config;
        self
    }
    /// Adds a checkpoint after operator index `after_operator` and registers
    /// its estimate with the context.
    #[must_use]
    pub fn with_checkpoint(mut self, id: &str, after_operator: usize, estimated: f64) -> Self {
        self.context.set_estimate(id, estimated);
        self.checkpoints
            .push(AdaptiveCheckpoint::new(id, after_operator, estimated));
        self
    }
    /// Installs a callback invoked on adaptive execution events.
    #[must_use]
    pub fn with_event_callback(mut self, callback: AdaptiveEventCallback) -> Self {
        self.event_callback = Some(callback);
        self
    }
    /// Replaces the seed adaptive context.
    #[must_use]
    pub fn with_context(mut self, context: AdaptiveContext) -> Self {
        self.context = context;
        self
    }
    /// Finishes the builder, producing the execution configuration.
    #[must_use]
    pub fn build(self) -> AdaptiveExecutionConfig {
        let Self {
            checkpoints,
            config,
            context,
            event_callback,
        } = self;
        AdaptiveExecutionConfig {
            checkpoints,
            config,
            context,
            event_callback,
        }
    }
}
impl Default for AdaptivePipelineBuilder {
    /// Equivalent to [`AdaptivePipelineBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Fully-built adaptive execution state: checkpoints, configuration, the
/// tracking context, and an optional event callback.
pub struct AdaptiveExecutionConfig {
    pub checkpoints: Vec<AdaptiveCheckpoint>,
    pub config: AdaptivePipelineConfig,
    pub context: AdaptiveContext,
    pub event_callback: Option<AdaptiveEventCallback>,
}
impl AdaptiveExecutionConfig {
#[must_use]
pub fn summary(&self) -> AdaptiveSummary {
self.context.summary()
}
pub fn record_checkpoint(&mut self, checkpoint_id: &str, actual: u64) {
self.context.record_actual(checkpoint_id, actual);
if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
cp.actual_rows = actual;
}
if let Some(ref callback) = self.event_callback {
let estimated = self
.context
.get_checkpoint(checkpoint_id)
.map_or(0.0, |cp| cp.estimated);
callback(AdaptiveEvent::CheckpointReached {
id: checkpoint_id.to_string(),
actual_rows: actual,
estimated,
});
}
}
#[must_use]
pub fn should_reoptimize(&self) -> Option<&AdaptiveCheckpoint> {
self.checkpoints.iter().find(|cp| {
!cp.triggered
&& cp.exceeds_threshold(
self.config.reoptimization_threshold,
self.config.min_rows_for_reoptimization,
)
})
}
pub fn mark_triggered(&mut self, checkpoint_id: &str) {
if let Some(cp) = self.checkpoints.iter_mut().find(|c| c.id == checkpoint_id) {
cp.triggered = true;
}
if let Some(ref callback) = self.event_callback {
let ratio = self
.context
.get_checkpoint(checkpoint_id)
.filter(|cp| cp.recorded)
.map_or(1.0, |cp| cp.deviation_ratio());
callback(AdaptiveEvent::ReoptimizationTriggered {
checkpoint_id: checkpoint_id.to_string(),
deviation_ratio: ratio,
});
}
}
}
use super::operators::{Operator, OperatorResult};
/// Pull-based counterpart of `CardinalityTrackingSink`: counts rows drawn
/// from a wrapped [`Operator`] and reports the total to the adaptive context
/// exactly once.
pub struct CardinalityTrackingWrapper {
    inner: Box<dyn Operator>,
    operator_id: String,
    row_count: u64,
    context: SharedAdaptiveContext,
    finalized: bool,
}
impl CardinalityTrackingWrapper {
    /// Wraps `inner`, reporting row counts under `operator_id` to `context`.
    pub fn new(
        inner: Box<dyn Operator>,
        operator_id: &str,
        context: SharedAdaptiveContext,
    ) -> Self {
        Self {
            operator_id: operator_id.to_owned(),
            row_count: 0,
            finalized: false,
            inner,
            context,
        }
    }
    /// Rows counted so far.
    #[must_use]
    pub fn current_count(&self) -> u64 {
        self.row_count
    }
    /// Reports the accumulated count to the context; idempotent.
    fn report_final(&mut self) {
        if self.finalized {
            return;
        }
        self.finalized = true;
        self.context.record_actual(&self.operator_id, self.row_count);
    }
}
impl Operator for CardinalityTrackingWrapper {
    /// Pulls the next chunk from the wrapped operator, counting its rows.
    /// The final count is reported when the stream ends or errors.
    fn next(&mut self) -> OperatorResult {
        let result = self.inner.next();
        match &result {
            Ok(Some(chunk)) => self.row_count += chunk.row_count() as u64,
            Ok(None) | Err(_) => self.report_final(),
        }
        result
    }
    /// Clears the counters and resets the wrapped operator so the stream
    /// can be re-executed (and re-reported).
    fn reset(&mut self) {
        self.inner.reset();
        self.row_count = 0;
        self.finalized = false;
    }
    /// Name of the wrapped operator.
    fn name(&self) -> &'static str {
        self.inner.name()
    }
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
impl Drop for CardinalityTrackingWrapper {
    /// Ensures the count is reported even if the wrapper is dropped before
    /// the stream was fully drained (`report_final` is idempotent, so a
    /// normal completion does not double-report).
    fn drop(&mut self) {
        self.report_final();
    }
}
use super::pipeline::{DEFAULT_CHUNK_SIZE, Source}; use super::sink::CollectorSink;
use super::source::OperatorSource;
/// Drains a pull-based [`Operator`] (via [`OperatorSource`]) while feeding
/// row counts into a shared adaptive context.
pub struct AdaptivePipelineExecutor {
    source: OperatorSource,
    context: SharedAdaptiveContext,
    config: AdaptivePipelineConfig,
}
impl AdaptivePipelineExecutor {
    /// Creates an executor over `operator` with the given adaptive context
    /// and the default pipeline configuration.
    pub fn new(operator: Box<dyn Operator>, context: AdaptiveContext) -> Self {
        Self {
            source: OperatorSource::new(operator),
            context: SharedAdaptiveContext::from_context(context),
            config: AdaptivePipelineConfig::default(),
        }
    }
    /// Creates an executor with an explicit pipeline configuration.
    pub fn with_config(
        operator: Box<dyn Operator>,
        context: AdaptiveContext,
        config: AdaptivePipelineConfig,
    ) -> Self {
        Self {
            source: OperatorSource::new(operator),
            context: SharedAdaptiveContext::from_context(context),
            config,
        }
    }
    /// Drains the source into a collector sink wrapped with cardinality
    /// tracking (recorded under the "output" operator id) and returns the
    /// adaptive summary.
    ///
    /// NOTE(review): the chunks consumed by the inner `CollectorSink` are
    /// never extracted — this method always returns an empty `Vec`. Callers
    /// that need the data should use `execute_collecting`; confirm whether
    /// that is intended before relying on the first tuple element.
    pub fn execute(mut self) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
        let mut sink = CardinalityTrackingSink::new(
            Box::new(CollectorSink::new()),
            "output",
            self.context.clone(),
        );
        let chunk_size = DEFAULT_CHUNK_SIZE;
        let mut total_rows: u64 = 0;
        let check_interval = self.config.check_interval;
        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            let chunk_rows = chunk.len() as u64;
            total_rows += chunk_rows;
            let continue_exec = sink.consume(chunk)?;
            if !continue_exec {
                break;
            }
            // Periodically poll the trigger. NOTE(review): the body is
            // empty — the trigger latches inside the context but no plan
            // switch happens here. Also, the `is_multiple_of` gate only
            // fires when `total_rows` lands exactly on a multiple of
            // `check_interval`, which variable chunk sizes may never hit.
            if total_rows >= check_interval
                && total_rows.is_multiple_of(check_interval)
                && self.context.should_reoptimize()
            {
            }
        }
        sink.finalize()?;
        let summary = self
            .context
            .snapshot()
            .map(|ctx| ctx.summary())
            .unwrap_or_default();
        Ok((Vec::new(), summary))
    }
    /// Drains the source, collecting non-empty chunks, recording the running
    /// total under the "root" operator id, and polling the re-optimization
    /// trigger every `check_interval` rows (same exact-multiple caveat as
    /// `execute`).
    pub fn execute_collecting(
        mut self,
    ) -> Result<(Vec<DataChunk>, AdaptiveSummary), OperatorError> {
        let mut chunks = Vec::new();
        let chunk_size = DEFAULT_CHUNK_SIZE;
        let mut total_rows: u64 = 0;
        let check_interval = self.config.check_interval;
        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            let chunk_rows = chunk.len() as u64;
            total_rows += chunk_rows;
            // record_actual overwrites, so "root" tracks the running total.
            self.context.record_actual("root", total_rows);
            if !chunk.is_empty() {
                chunks.push(chunk);
            }
            if total_rows >= check_interval && total_rows.is_multiple_of(check_interval) {
                // Latch the trigger; the outcome surfaces via the summary.
                let _ = self.context.should_reoptimize();
            }
        }
        let summary = self
            .context
            .snapshot()
            .map(|ctx| ctx.summary())
            .unwrap_or_default();
        Ok((chunks, summary))
    }
    /// Shared adaptive context driving this executor.
    pub fn context(&self) -> &SharedAdaptiveContext {
        &self.context
    }
}
/// Convenience entry point: runs `operator` through an adaptive executor,
/// using default context/config where `None` is given, and returns the
/// collected chunks plus the final summary.
pub fn execute_adaptive(
    operator: Box<dyn Operator>,
    context: Option<AdaptiveContext>,
    config: Option<AdaptivePipelineConfig>,
) -> Result<(Vec<DataChunk>, Option<AdaptiveSummary>), OperatorError> {
    let executor = AdaptivePipelineExecutor::with_config(
        operator,
        context.unwrap_or_default(),
        config.unwrap_or_default(),
    );
    executor
        .execute_collecting()
        .map(|(chunks, summary)| (chunks, Some(summary)))
}
#[cfg(test)]
/// Unit tests for checkpoints, feedback collection, the adaptive context,
/// decision evaluation, and the builder/config plumbing.
mod tests {
    use super::*;
    // --- CardinalityCheckpoint ---
    #[test]
    fn test_checkpoint_deviation_ratio() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(200);
        assert!((cp.deviation_ratio() - 2.0).abs() < 0.001);
    }
    #[test]
    fn test_checkpoint_underestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(500);
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }
    #[test]
    fn test_checkpoint_overestimate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(20);
        assert!((cp.deviation_ratio() - 0.2).abs() < 0.001);
        assert!(cp.is_significant_deviation(3.0));
    }
    #[test]
    fn test_checkpoint_accurate() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(110);
        assert!((cp.deviation_ratio() - 1.1).abs() < 0.001);
        assert!(!cp.is_significant_deviation(3.0));
    }
    #[test]
    fn test_checkpoint_zero_estimate() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(100);
        assert!(cp.deviation_ratio().is_infinite());
    }
    #[test]
    fn test_checkpoint_zero_both() {
        let mut cp = CardinalityCheckpoint::new("test", 0.0);
        cp.record(0);
        assert!((cp.deviation_ratio() - 1.0).abs() < 0.001);
    }
    // --- CardinalityFeedback ---
    #[test]
    fn test_feedback_collection() {
        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan_1", 1000);
        feedback.record("filter_1", 100);
        assert_eq!(feedback.get("scan_1"), Some(1000));
        assert_eq!(feedback.get("filter_1"), Some(100));
        assert_eq!(feedback.get("unknown"), None);
    }
    #[test]
    fn test_feedback_running_counter() {
        let mut feedback = CardinalityFeedback::new();
        feedback.init_counter("op_1");
        feedback.add_rows("op_1", 100);
        feedback.add_rows("op_1", 200);
        feedback.add_rows("op_1", 50);
        assert_eq!(feedback.get_running("op_1"), Some(350));
        feedback.finalize_counter("op_1");
        assert_eq!(feedback.get("op_1"), Some(350));
    }
    // --- AdaptiveContext ---
    #[test]
    fn test_adaptive_context_basic() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("scan", 1000);
        ctx.record_actual("filter", 500);
        let cp = ctx.get_checkpoint("filter").unwrap();
        assert!((cp.deviation_ratio() - 5.0).abs() < 0.001);
    }
    #[test]
    fn test_adaptive_context_should_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 100);
        ctx.set_estimate("scan", 10000.0);
        ctx.set_estimate("filter", 1000.0);
        ctx.record_actual("scan", 10000);
        ctx.record_actual("filter", 5000);
        assert!(ctx.should_reoptimize());
        assert_eq!(ctx.trigger_operator(), Some("filter"));
        // The trigger latches: a second call must not fire again.
        assert!(!ctx.should_reoptimize());
    }
    #[test]
    fn test_adaptive_context_min_rows() {
        // 500 observed rows is below the 1000-row gate, so no trigger
        // despite the 5x deviation.
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 1000);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);
        assert!(!ctx.should_reoptimize());
    }
    #[test]
    fn test_adaptive_context_no_deviation() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);
        assert!(!ctx.has_significant_deviation());
        assert!(!ctx.should_reoptimize());
    }
    #[test]
    fn test_adaptive_context_correction_factor() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 300);
        assert!((ctx.correction_factor("filter") - 3.0).abs() < 0.001);
        // Unknown operators fall back to a neutral factor of 1.0.
        assert!((ctx.correction_factor("unknown") - 1.0).abs() < 0.001);
    }
    #[test]
    fn test_adaptive_context_apply_feedback() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.set_estimate("filter", 100.0);
        let mut feedback = CardinalityFeedback::new();
        feedback.record("scan", 1000);
        feedback.record("filter", 500);
        ctx.apply_feedback(&feedback);
        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 1000);
        assert_eq!(ctx.get_checkpoint("filter").unwrap().actual, 500);
    }
    #[test]
    fn test_adaptive_summary() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("op1", 100.0);
        ctx.set_estimate("op2", 200.0);
        ctx.set_estimate("op3", 300.0);
        ctx.record_actual("op1", 100);
        ctx.record_actual("op2", 600);
        let _ = ctx.should_reoptimize();
        let summary = ctx.summary();
        assert_eq!(summary.checkpoint_count, 3);
        assert_eq!(summary.recorded_count, 2);
        assert_eq!(summary.deviation_count, 1);
        assert!(summary.reoptimization_triggered);
    }
    #[test]
    fn test_adaptive_context_reset() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 5000);
        let _ = ctx.should_reoptimize();
        assert!(ctx.reoptimization_triggered);
        ctx.reset();
        assert!(!ctx.reoptimization_triggered);
        // Reset clears observations but keeps the registered estimate.
        assert_eq!(ctx.get_checkpoint("scan").unwrap().actual, 0);
        assert!(!ctx.get_checkpoint("scan").unwrap().recorded);
        assert!((ctx.get_checkpoint("scan").unwrap().estimated - 1000.0).abs() < 0.001);
    }
    #[test]
    fn test_shared_context() {
        let ctx = SharedAdaptiveContext::new();
        ctx.record_actual("op1", 1000);
        let snapshot = ctx.snapshot().unwrap();
        assert_eq!(snapshot.get_checkpoint("op1").unwrap().actual, 1000);
    }
    // --- evaluate_reoptimization ---
    #[test]
    fn test_reoptimization_decision_continue() {
        let mut ctx = AdaptiveContext::new();
        ctx.set_estimate("scan", 1000.0);
        ctx.record_actual("scan", 1100);
        let decision = evaluate_reoptimization(&ctx);
        assert_eq!(decision, ReoptimizationDecision::Continue);
    }
    #[test]
    fn test_reoptimization_decision_reoptimize() {
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 100.0);
        ctx.record_actual("filter", 500);
        let _ = ctx.should_reoptimize();
        let decision = evaluate_reoptimization(&ctx);
        if let ReoptimizationDecision::Reoptimize {
            trigger,
            corrections,
        } = decision
        {
            assert_eq!(trigger, "filter");
            assert!((corrections.get("filter").copied().unwrap_or(0.0) - 5.0).abs() < 0.001);
        } else {
            panic!("Expected Reoptimize decision");
        }
    }
    #[test]
    fn test_reoptimization_decision_abort() {
        // A 1000x deviation is beyond the 100x abort cutoff.
        let mut ctx = AdaptiveContext::with_thresholds(2.0, 0);
        ctx.set_estimate("filter", 1.0);
        ctx.record_actual("filter", 1000);
        let _ = ctx.should_reoptimize();
        let decision = evaluate_reoptimization(&ctx);
        if let ReoptimizationDecision::Abort { reason } = decision {
            assert!(reason.contains("Catastrophic"));
        } else {
            panic!("Expected Abort decision");
        }
    }
    #[test]
    fn test_absolute_deviation() {
        let mut cp = CardinalityCheckpoint::new("test", 100.0);
        cp.record(150);
        assert!((cp.absolute_deviation() - 50.0).abs() < 0.001);
    }
    // --- AdaptiveCheckpoint ---
    #[test]
    fn test_adaptive_checkpoint_basic() {
        let mut cp = AdaptiveCheckpoint::new("filter_1", 0, 100.0);
        assert_eq!(cp.actual_rows, 0);
        assert!(!cp.triggered);
        cp.record_rows(50);
        assert_eq!(cp.actual_rows, 50);
        cp.record_rows(100);
        assert_eq!(cp.actual_rows, 150);
    }
    #[test]
    fn test_adaptive_checkpoint_exceeds_threshold() {
        let mut cp = AdaptiveCheckpoint::new("filter", 0, 100.0);
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));
        cp.record_rows(50);
        assert!(!cp.exceeds_threshold(2.0, 100));
        cp.actual_rows = 0;
        cp.record_rows(500);
        assert!(cp.exceeds_threshold(2.0, 100));
        // Under-estimate in the other direction (200 observed vs 1000).
        let mut cp2 = AdaptiveCheckpoint::new("filter2", 0, 1000.0);
        cp2.record_rows(200);
        assert!(cp2.exceeds_threshold(2.0, 100));
    }
    // --- AdaptivePipelineConfig / builder ---
    #[test]
    fn test_adaptive_pipeline_config_default() {
        let config = AdaptivePipelineConfig::default();
        assert_eq!(config.check_interval, 10_000);
        assert!((config.reoptimization_threshold - DEFAULT_REOPTIMIZATION_THRESHOLD).abs() < 0.001);
        assert_eq!(
            config.min_rows_for_reoptimization,
            MIN_ROWS_FOR_REOPTIMIZATION
        );
        assert_eq!(config.max_reoptimizations, 3);
    }
    #[test]
    fn test_adaptive_pipeline_config_custom() {
        let config = AdaptivePipelineConfig::new(5000, 2.0, 500).with_max_reoptimizations(5);
        assert_eq!(config.check_interval, 5000);
        assert!((config.reoptimization_threshold - 2.0).abs() < 0.001);
        assert_eq!(config.min_rows_for_reoptimization, 500);
        assert_eq!(config.max_reoptimizations, 5);
    }
    #[test]
    fn test_adaptive_pipeline_builder() {
        let config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("scan", 0, 10000.0)
            .with_checkpoint("filter", 1, 1000.0)
            .build();
        assert_eq!(config.checkpoints.len(), 2);
        assert_eq!(config.checkpoints[0].id, "scan");
        assert!((config.checkpoints[0].estimated_cardinality - 10000.0).abs() < 0.001);
        assert_eq!(config.checkpoints[1].id, "filter");
        assert!((config.checkpoints[1].estimated_cardinality - 1000.0).abs() < 0.001);
    }
    #[test]
    fn test_adaptive_execution_config_record_checkpoint() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();
        config.record_checkpoint("filter", 500);
        let cp = config.context.get_checkpoint("filter").unwrap();
        assert_eq!(cp.actual, 500);
        assert!(cp.recorded);
        let acp = config
            .checkpoints
            .iter()
            .find(|c| c.id == "filter")
            .unwrap();
        assert_eq!(acp.actual_rows, 500);
    }
    #[test]
    fn test_adaptive_execution_config_should_reoptimize() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_config(AdaptivePipelineConfig::new(1000, 2.0, 100))
            .with_checkpoint("filter", 0, 100.0)
            .build();
        assert!(config.should_reoptimize().is_none());
        config.record_checkpoint("filter", 150);
        assert!(config.should_reoptimize().is_none());
        config.checkpoints[0].actual_rows = 0;
        config.record_checkpoint("filter", 500);
        config.checkpoints[0].actual_rows = 500;
        let trigger = config.should_reoptimize();
        assert!(trigger.is_some());
        assert_eq!(trigger.unwrap().id, "filter");
    }
    #[test]
    fn test_adaptive_execution_config_mark_triggered() {
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .build();
        assert!(!config.checkpoints[0].triggered);
        config.mark_triggered("filter");
        assert!(config.checkpoints[0].triggered);
    }
    #[test]
    fn test_adaptive_event_callback() {
        use std::sync::atomic::AtomicUsize;
        let event_count = Arc::new(AtomicUsize::new(0));
        let counter = event_count.clone();
        let mut config = AdaptivePipelineBuilder::new()
            .with_checkpoint("filter", 0, 100.0)
            .with_event_callback(Box::new(move |_event| {
                counter.fetch_add(1, Ordering::Relaxed);
            }))
            .build();
        config.record_checkpoint("filter", 500);
        assert_eq!(event_count.load(Ordering::Relaxed), 1);
        config.mark_triggered("filter");
        assert_eq!(event_count.load(Ordering::Relaxed), 2);
    }
    #[test]
    fn test_adaptive_checkpoint_with_zero_estimate() {
        let mut cp = AdaptiveCheckpoint::new("test", 0, 0.0);
        cp.record_rows(100);
        assert!(cp.exceeds_threshold(2.0, 50));
    }
}