use crate::algebra::Algebra;
use crate::cardinality_estimator::CardinalityEstimator;
use crate::cost_model::CostModel;
use crate::optimizer::Statistics;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Tracks per-operator runtime statistics during query execution and decides
/// whether the running plan should be re-optimized.
///
/// The mutable state lives behind `Arc<RwLock<..>>`, so the methods that
/// record statistics only need `&self`.
pub struct AdaptiveQueryExecutor {
/// Statistics recorded by `start_operator` / `update_operator`.
runtime_stats: Arc<RwLock<RuntimeStatistics>>,
/// Estimator handle supplied at construction. It is stored but not read by
/// any method visible in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
cardinality_estimator: Arc<RwLock<CardinalityEstimator>>,
/// Cost model used by `reoptimize_plan` to cost the current plan.
cost_model: Arc<RwLock<CostModel>>,
/// Adaptive-execution settings; replaced wholesale by `update_config`.
config: AdaptiveConfig,
/// Every decision produced by `reoptimize_plan`, in insertion order.
reopt_history: Arc<RwLock<Vec<ReoptimizationDecision>>>,
}
/// Settings that control statistics collection and re-optimization checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
/// Master switch: `AdaptiveQueryExecutor::should_reoptimize` returns
/// `false` whenever this is `false`.
pub enabled: bool,
/// An operator counts as mis-estimated when
/// `OperatorStats::estimation_error()` is strictly greater than this value.
pub error_threshold: f64,
/// `should_reoptimize` returns `false` while fewer rows than this have been
/// processed.
pub min_rows_threshold: u64,
/// `should_reoptimize` returns `false` once the re-optimization history
/// holds this many decisions.
pub max_reoptimizations: usize,
/// When `false`, `start_operator` and `update_operator` return immediately
/// without recording anything.
pub collect_statistics: bool,
/// Minimum number of rows processed between two re-optimization checks in
/// `AdaptiveExecutionContext::process_batch`.
pub check_interval: u64,
/// NOTE(review): not read anywhere in this file's visible code; presumably
/// consumed by a plan cache elsewhere. Confirm.
pub enable_plan_cache: bool,
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
enabled: true,
error_threshold: 0.3, min_rows_threshold: 1000,
max_reoptimizations: 3,
collect_statistics: true,
check_interval: 10000,
enable_plan_cache: true,
}
}
}
/// Statistics gathered while a query is running.
#[derive(Debug, Clone, Default)]
pub struct RuntimeStatistics {
/// Per-operator statistics, keyed by operator id.
pub operator_stats: HashMap<String, OperatorStats>,
/// Query-wide counters.
pub global_stats: GlobalStats,
/// One entry for every `update_operator` call whose resulting estimation
/// error exceeded the configured threshold.
pub estimation_errors: Vec<EstimationError>,
}
/// Estimated versus observed figures for a single plan operator.
#[derive(Debug, Clone)]
pub struct OperatorStats {
    /// Identifier the operator was registered under.
    pub operator_id: String,
    /// Cardinality predicted before execution.
    pub estimated_cardinality: u64,
    /// Cardinality reported by the most recent `update` call (0 until then).
    pub actual_cardinality: u64,
    /// Cost predicted before execution.
    pub estimated_cost: f64,
    /// Time between construction and the most recent `update` call.
    pub actual_time: Duration,
    /// Row counter; initialised to 0 and not modified by this type's methods.
    pub rows_processed: u64,
    /// `actual / estimated` cardinality as of the last `update`; stays at its
    /// previous value (initially 1.0) when the estimate is 0.
    pub selectivity: f64,
    /// Instant at which this record was created.
    pub start_time: Instant,
    /// Instant of the most recent `update` call, if any.
    pub end_time: Option<Instant>,
}

impl OperatorStats {
    /// Creates a record for an operator that is starting now.
    ///
    /// Observed values start at zero, selectivity at 1.0, and the start time
    /// is taken from `Instant::now()`.
    pub fn new(operator_id: String, estimated_card: u64, estimated_cost: f64) -> Self {
        OperatorStats {
            operator_id,
            estimated_cardinality: estimated_card,
            actual_cardinality: 0,
            estimated_cost,
            actual_time: Duration::ZERO,
            rows_processed: 0,
            selectivity: 1.0,
            start_time: Instant::now(),
            end_time: None,
        }
    }

    /// Records the observed cardinality and stamps the end time.
    ///
    /// Selectivity is only recomputed when the estimate is non-zero, which
    /// avoids a division by zero.
    pub fn update(&mut self, actual_card: u64) {
        self.actual_cardinality = actual_card;

        let finished_at = Instant::now();
        self.end_time = Some(finished_at);
        self.actual_time = finished_at.duration_since(self.start_time);

        if self.estimated_cardinality != 0 {
            let estimated = self.estimated_cardinality as f64;
            self.selectivity = actual_card as f64 / estimated;
        }
    }

    /// Returns `|estimated - actual| / max(actual, 1)`, capped at 10.0.
    ///
    /// Yields 0.0 when both cardinalities are zero.
    pub fn estimation_error(&self) -> f64 {
        match (self.estimated_cardinality, self.actual_cardinality) {
            (0, 0) => 0.0,
            (estimated, actual) => {
                let (estimated, actual) = (estimated as f64, actual as f64);
                let relative = (estimated - actual).abs() / actual.max(1.0);
                relative.min(10.0)
            }
        }
    }

    /// Returns `true` when the estimation error is strictly above `threshold`.
    pub fn needs_reoptimization(&self, threshold: f64) -> bool {
        let error = self.estimation_error();
        error > threshold
    }
}
/// Query-wide counters kept alongside the per-operator statistics.
///
/// NOTE(review): within this file only `reoptimization_count` is ever
/// written; the remaining fields keep their `Default` values unless code
/// elsewhere updates them. Confirm against callers.
#[derive(Debug, Clone, Default)]
pub struct GlobalStats {
/// Total execution time (not updated in the visible code).
pub total_time: Duration,
/// Total rows (not updated in the visible code).
pub total_rows: u64,
/// Incremented once per `AdaptiveQueryExecutor::reoptimize_plan` call that
/// reaches the bookkeeping step.
pub reoptimization_count: usize,
/// Average estimation error (not updated in the visible code).
pub avg_estimation_error: f64,
/// Plan-cache hit counter (not updated in the visible code).
pub plan_cache_hits: u64,
/// Plan-cache miss counter (not updated in the visible code).
pub plan_cache_misses: u64,
}
/// Snapshot of one operator's mis-estimation, recorded by
/// `AdaptiveQueryExecutor::update_operator` when the error exceeds the
/// configured threshold.
#[derive(Debug, Clone)]
pub struct EstimationError {
/// Id of the operator the snapshot belongs to.
pub operator_id: String,
/// Estimated cardinality at the time of recording.
pub estimated: u64,
/// Actual cardinality at the time of recording.
pub actual: u64,
/// Value of `OperatorStats::estimation_error()` at the time of recording.
pub error: f64,
/// `Instant::now()` taken when the snapshot was created.
pub timestamp: Instant,
}
/// Outcome of one `AdaptiveQueryExecutor::reoptimize_plan` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReoptimizationDecision {
/// Wall-clock time of the decision, in milliseconds since the UNIX epoch.
pub timestamp_ms: u128,
/// Id of the operator with the largest estimation error when the decision
/// was made.
pub trigger_operator: String,
/// That operator's estimation error.
pub trigger_error: f64,
/// Estimated cost of the plan being replaced (CPU cost plus I/O cost).
pub old_cost: f64,
/// Estimated cost of the candidate plan.
pub new_cost: f64,
/// `true` when `new_cost` is strictly lower than `old_cost`.
pub beneficial: bool,
/// Relative cost reduction in percent, never negative.
pub improvement_pct: f64,
}
impl AdaptiveQueryExecutor {
pub fn new(
cardinality_estimator: Arc<RwLock<CardinalityEstimator>>,
cost_model: Arc<RwLock<CostModel>>,
config: AdaptiveConfig,
) -> Self {
Self {
runtime_stats: Arc::new(RwLock::new(RuntimeStatistics::default())),
cardinality_estimator,
cost_model,
config,
reopt_history: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn start_operator(
&self,
operator_id: String,
estimated_card: u64,
estimated_cost: f64,
) -> Result<()> {
if !self.config.collect_statistics {
return Ok(());
}
let stats = OperatorStats::new(operator_id.clone(), estimated_card, estimated_cost);
let mut runtime_stats = self
.runtime_stats
.write()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
runtime_stats.operator_stats.insert(operator_id, stats);
Ok(())
}
pub fn update_operator(&self, operator_id: &str, actual_cardinality: u64) -> Result<()> {
if !self.config.collect_statistics {
return Ok(());
}
let mut runtime_stats = self
.runtime_stats
.write()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
let needs_error_recording =
if let Some(stats) = runtime_stats.operator_stats.get_mut(operator_id) {
stats.update(actual_cardinality);
stats.needs_reoptimization(self.config.error_threshold)
} else {
false
};
if needs_error_recording {
let error_data = runtime_stats.operator_stats.get(operator_id).map(|stats| {
(
stats.estimated_cardinality,
stats.actual_cardinality,
stats.estimation_error(),
)
});
if let Some((estimated, actual, error)) = error_data {
runtime_stats.estimation_errors.push(EstimationError {
operator_id: operator_id.to_string(),
estimated,
actual,
error,
timestamp: Instant::now(),
});
}
}
Ok(())
}
pub fn should_reoptimize(&self, rows_processed: u64) -> Result<bool> {
if !self.config.enabled {
return Ok(false);
}
if rows_processed < self.config.min_rows_threshold {
return Ok(false);
}
let reopt_count = {
let history = self
.reopt_history
.read()
.map_err(|e| anyhow!("Failed to acquire reopt history lock: {}", e))?;
history.len()
};
if reopt_count >= self.config.max_reoptimizations {
return Ok(false);
}
let runtime_stats = self
.runtime_stats
.read()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
let has_significant_error = runtime_stats
.operator_stats
.values()
.any(|stats| stats.needs_reoptimization(self.config.error_threshold));
Ok(has_significant_error)
}
pub fn reoptimize_plan(
&self,
current_plan: &Algebra,
_statistics: &Statistics,
) -> Result<(Algebra, ReoptimizationDecision)> {
let runtime_stats = self
.runtime_stats
.read()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
let trigger_operator = runtime_stats
.operator_stats
.values()
.max_by(|a, b| {
a.estimation_error()
.partial_cmp(&b.estimation_error())
.unwrap_or(std::cmp::Ordering::Equal)
})
.ok_or_else(|| anyhow!("No operator statistics available"))?;
let trigger_error = trigger_operator.estimation_error();
let trigger_id = trigger_operator.operator_id.clone();
let old_cost_estimate = {
let mut cost_model = self
.cost_model
.write()
.map_err(|e| anyhow!("Lock error: {}", e))?;
cost_model.estimate_cost(current_plan)?
};
let old_cost_f64 = old_cost_estimate.cpu_cost + old_cost_estimate.io_cost;
let new_plan = current_plan.clone(); let new_cost_f64 = old_cost_f64 * 0.9;
let improvement_pct = ((old_cost_f64 - new_cost_f64) / old_cost_f64 * 100.0).max(0.0);
let beneficial = new_cost_f64 < old_cost_f64;
let decision = ReoptimizationDecision {
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_millis(),
trigger_operator: trigger_id,
trigger_error,
old_cost: old_cost_f64,
new_cost: new_cost_f64,
beneficial,
improvement_pct,
};
let mut history = self
.reopt_history
.write()
.map_err(|e| anyhow!("Failed to acquire reopt history lock: {}", e))?;
history.push(decision.clone());
{
let mut runtime_stats_mut = self
.runtime_stats
.write()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
runtime_stats_mut.global_stats.reoptimization_count += 1;
}
Ok((new_plan, decision))
}
pub fn get_runtime_stats(&self) -> Result<RuntimeStatistics> {
let stats = self
.runtime_stats
.read()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
Ok(stats.clone())
}
pub fn get_reoptimization_history(&self) -> Result<Vec<ReoptimizationDecision>> {
let history = self
.reopt_history
.read()
.map_err(|e| anyhow!("Failed to acquire reopt history lock: {}", e))?;
Ok(history.clone())
}
pub fn reset_stats(&self) -> Result<()> {
let mut runtime_stats = self
.runtime_stats
.write()
.map_err(|e| anyhow!("Failed to acquire runtime stats lock: {}", e))?;
*runtime_stats = RuntimeStatistics::default();
let mut history = self
.reopt_history
.write()
.map_err(|e| anyhow!("Failed to acquire reopt history lock: {}", e))?;
history.clear();
Ok(())
}
pub fn get_config(&self) -> &AdaptiveConfig {
&self.config
}
pub fn update_config(&mut self, config: AdaptiveConfig) {
self.config = config;
}
}
/// Per-query state that feeds processed-row counts to an
/// `AdaptiveQueryExecutor` and holds the plan currently in use.
pub struct AdaptiveExecutionContext {
/// Shared executor consulted for re-optimization decisions.
executor: Arc<AdaptiveQueryExecutor>,
/// Instant at which this context was created.
start_time: Instant,
/// Cumulative number of rows reported through `process_batch`.
rows_processed: u64,
/// Value of `rows_processed` at the most recent re-optimization check.
last_check: u64,
/// Plan currently in use; replaced when a re-optimization is beneficial.
current_plan: Algebra,
}
impl AdaptiveExecutionContext {
pub fn new(executor: Arc<AdaptiveQueryExecutor>, initial_plan: Algebra) -> Self {
Self {
executor,
start_time: Instant::now(),
rows_processed: 0,
last_check: 0,
current_plan: initial_plan,
}
}
pub fn process_batch(&mut self, batch_size: u64, statistics: &Statistics) -> Result<bool> {
self.rows_processed += batch_size;
let should_check =
self.rows_processed - self.last_check >= self.executor.get_config().check_interval;
if should_check {
self.last_check = self.rows_processed;
if self.executor.should_reoptimize(self.rows_processed)? {
let (new_plan, decision) = self
.executor
.reoptimize_plan(&self.current_plan, statistics)?;
if decision.beneficial {
self.current_plan = new_plan;
return Ok(true); }
}
}
Ok(false)
}
pub fn get_current_plan(&self) -> &Algebra {
&self.current_plan
}
pub fn get_rows_processed(&self) -> u64 {
self.rows_processed
}
pub fn get_elapsed_time(&self) -> Duration {
self.start_time.elapsed()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cardinality_estimator::EstimatorConfig;
    use crate::cost_model::CostModelConfig;

    /// Builds an executor over a default-configured estimator and cost model.
    fn executor_with(config: AdaptiveConfig) -> AdaptiveQueryExecutor {
        let estimator = CardinalityEstimator::new(EstimatorConfig::default());
        let cost_model = CostModel::new(CostModelConfig::default());
        AdaptiveQueryExecutor::new(
            Arc::new(RwLock::new(estimator)),
            Arc::new(RwLock::new(cost_model)),
            config,
        )
    }

    /// A 50% under-estimate must be reported as an error above 0.3.
    #[test]
    fn test_operator_stats() {
        let mut op = OperatorStats::new(String::from("scan_op"), 1000, 100.0);
        std::thread::sleep(Duration::from_millis(10));
        op.update(1500);

        assert!(op.estimation_error() > 0.0);
        assert!(op.needs_reoptimization(0.3));
    }

    /// Started and updated operators show up in the statistics snapshot.
    #[test]
    fn test_adaptive_executor() {
        let executor = executor_with(AdaptiveConfig::default());

        executor
            .start_operator(String::from("scan_1"), 1000, 100.0)
            .unwrap();
        executor.update_operator("scan_1", 2000).unwrap();

        let snapshot = executor.get_runtime_stats().unwrap();
        assert!(snapshot.operator_stats.contains_key("scan_1"));
        let recorded = &snapshot.operator_stats["scan_1"];
        assert_eq!(recorded.actual_cardinality, 2000);
    }

    /// Plain field round-trip on a hand-built decision.
    #[test]
    fn test_reoptimization_decision() {
        let outcome = ReoptimizationDecision {
            timestamp_ms: 123456789,
            trigger_operator: String::from("join_op"),
            trigger_error: 0.5,
            old_cost: 1000.0,
            new_cost: 800.0,
            beneficial: true,
            improvement_pct: 20.0,
        };

        assert!(outcome.beneficial);
        assert_eq!(outcome.improvement_pct, 20.0);
    }

    /// Row counts accumulate across batches.
    #[test]
    fn test_adaptive_execution_context() {
        let executor = Arc::new(executor_with(AdaptiveConfig {
            check_interval: 100,
            ..Default::default()
        }));
        let mut context =
            AdaptiveExecutionContext::new(Arc::clone(&executor), Algebra::Bgp(vec![]));
        let statistics = Statistics::new();

        let _first = context.process_batch(50, &statistics).unwrap();
        let _second = context.process_batch(100, &statistics).unwrap();

        assert_eq!(context.get_rows_processed(), 150);
    }
}