use crate::algebra::Algebra;
use crate::cardinality_estimator::CardinalityEstimator;
use crate::cost_model::CostModel;
use anyhow::{anyhow, Result};
use scirs2_core::metrics::{Counter, Timer};
use scirs2_core::profiling::Profiler;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tracing::{debug, info};
/// Query executor that can re-optimize a running query mid-flight
/// (adaptive execution) based on observed runtime statistics.
pub struct AdaptiveExecutor {
    /// Shared optimizer used to produce refined plans during execution.
    optimizer: Arc<RwLock<AdaptiveOptimizer>>,
    /// Thresholds and switches controlling when re-optimization fires.
    config: AdaptiveConfig,
    /// Profiler started around each re-optimization attempt.
    profiler: Profiler,
    /// Counters/timers tracking adaptive-execution activity.
    metrics: AdaptiveMetrics,
}
/// Tunable thresholds controlling adaptive (mid-query) re-optimization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// Master switch: when `false`, re-optimization never triggers.
    pub enable_adaptive: bool,
    /// Progress-percentage re-optimization trigger.
    // NOTE(review): not consulted by the visible code — confirm intended use.
    pub re_opt_trigger_percent: f64,
    /// Wall-clock seconds of execution after which re-optimization fires.
    pub re_opt_trigger_seconds: u64,
    /// Minimum seconds between consecutive re-optimization attempts.
    pub min_reopt_interval_seconds: u64,
    /// Minimum estimated cost-improvement ratio required to switch plans.
    pub plan_switch_threshold: f64,
    /// Cardinality deviation (actual/estimated) that triggers re-optimization.
    pub deviation_threshold: f64,
    /// Upper bound on re-optimization attempts per query.
    pub max_reoptimizations: usize,
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
enable_adaptive: true,
re_opt_trigger_percent: 0.1, re_opt_trigger_seconds: 5,
min_reopt_interval_seconds: 5,
plan_switch_threshold: 2.0, deviation_threshold: 5.0, max_reoptimizations: 3,
}
}
}
/// Statistics accumulated while a query runs, used to detect
/// estimation errors that warrant re-optimization.
#[derive(Debug, Clone)]
pub struct RuntimeStatistics {
    /// Per-operator actuals vs. estimates, keyed by operator id.
    pub operator_stats: HashMap<OperatorId, OperatorStats>,
    /// Elapsed time since `start_time`, refreshed on each batch update.
    pub execution_time: Duration,
    /// Total rows produced across all batches so far.
    pub rows_processed: u64,
    /// Moment execution started; basis for `execution_time`.
    pub start_time: Instant,
}
impl Default for RuntimeStatistics {
fn default() -> Self {
Self {
operator_stats: HashMap::new(),
execution_time: Duration::ZERO,
rows_processed: 0,
start_time: Instant::now(),
}
}
}
impl RuntimeStatistics {
    /// Folds one batch of execution feedback into the running totals,
    /// creating per-operator stats entries on first sight and refreshing
    /// each operator's deviation ratio.
    pub fn update_from_batch(&mut self, batch: &BatchResult) -> Result<()> {
        self.rows_processed += batch.rows_produced;
        self.execution_time = self.start_time.elapsed();
        for (op_id, op_result) in &batch.operator_results {
            let entry = self
                .operator_stats
                .entry(op_id.clone())
                .or_insert_with(|| OperatorStats::new(op_id.clone()));
            entry.actual_cardinality += op_result.rows_produced;
            entry.actual_time_ms += op_result.execution_time_ms;
            entry.update_deviation();
        }
        Ok(())
    }

    /// Largest per-operator deviation observed so far, or 1.0 (i.e.
    /// "estimates exact") when no operator stats have been recorded.
    pub fn max_deviation(&self) -> f64 {
        let mut best: Option<f64> = None;
        for op in self.operator_stats.values() {
            match best {
                Some(current) if current >= op.deviation => {}
                _ => best = Some(op.deviation),
            }
        }
        best.unwrap_or(1.0)
    }
}
/// Observed vs. estimated metrics for a single plan operator.
#[derive(Debug, Clone)]
pub struct OperatorStats {
    pub operator_id: OperatorId,
    /// Rows actually produced so far.
    pub actual_cardinality: u64,
    /// Optimizer's row estimate (initialized to 1 to keep ratios defined).
    pub estimated_cardinality: u64,
    /// Accumulated execution time in milliseconds.
    pub actual_time_ms: f64,
    /// Optimizer's time estimate in milliseconds.
    pub estimated_time_ms: f64,
    /// actual / estimated cardinality ratio (1.0 = estimate was exact).
    pub deviation: f64,
}
impl OperatorStats {
pub fn new(operator_id: OperatorId) -> Self {
Self {
operator_id,
actual_cardinality: 0,
estimated_cardinality: 1,
actual_time_ms: 0.0,
estimated_time_ms: 1.0,
deviation: 1.0,
}
}
pub fn update_deviation(&mut self) {
if self.estimated_cardinality > 0 {
self.deviation = self.actual_cardinality as f64 / self.estimated_cardinality as f64;
}
}
pub fn set_estimates(&mut self, cardinality: u64, time_ms: f64) {
self.estimated_cardinality = cardinality;
self.estimated_time_ms = time_ms;
}
}
/// Identifier of a single operator within a query plan.
pub type OperatorId = String;
/// Outcome of executing one batch of the query.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Rows produced by this batch overall.
    pub rows_produced: u64,
    /// Per-operator results for this batch, keyed by operator id.
    pub operator_results: HashMap<OperatorId, OperatorResult>,
    /// True once the query has produced all of its rows.
    pub is_complete: bool,
}
/// Per-operator contribution to a single batch.
#[derive(Debug, Clone)]
pub struct OperatorResult {
    /// Rows this operator produced during the batch.
    pub rows_produced: u64,
    /// Time this operator spent during the batch, in milliseconds.
    pub execution_time_ms: f64,
}
/// A physical plan for a query together with the optimizer's estimates.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Algebra expression this plan executes.
    pub algebra: Algebra,
    /// Estimated total cost of running the plan to completion.
    pub estimated_cost: f64,
    /// Estimated total number of rows the plan will produce.
    pub estimated_total_rows: u64,
    /// Estimated cardinality per operator, keyed by operator id.
    pub operator_estimates: HashMap<OperatorId, u64>,
}
/// Optimizer facade that refines query plans using runtime feedback.
// dead_code: fields are held for future use by the placeholder methods.
#[allow(dead_code)]
pub struct AdaptiveOptimizer {
    /// Shared cardinality estimator to feed observed row counts back into.
    cardinality_estimator: Arc<RwLock<CardinalityEstimator>>,
    /// Shared cost model to feed observed timings back into.
    cost_model: Arc<RwLock<CostModel>>,
}
impl AdaptiveOptimizer {
pub fn new(
cardinality_estimator: Arc<RwLock<CardinalityEstimator>>,
cost_model: Arc<RwLock<CostModel>>,
) -> Self {
Self {
cardinality_estimator,
cost_model,
}
}
pub fn update_cardinality_estimate(&mut self, _op_id: OperatorId, actual: u64) -> Result<()> {
debug!("Updated cardinality estimate: actual={}", actual);
Ok(())
}
pub fn update_cost_estimate(&mut self, _op_id: OperatorId, actual_time_ms: f64) -> Result<()> {
debug!("Updated cost estimate: actual_time_ms={}", actual_time_ms);
Ok(())
}
pub fn optimize(&self, _algebra: &Algebra) -> Result<QueryPlan> {
Ok(QueryPlan {
algebra: Algebra::Bgp(vec![]),
estimated_cost: 100.0,
estimated_total_rows: 1000,
operator_estimates: HashMap::new(),
})
}
}
/// Counters and timers describing adaptive-execution activity.
pub struct AdaptiveMetrics {
    /// Number of re-optimization attempts.
    pub reoptimizations: Counter,
    /// Number of times execution switched to a refined plan.
    pub plan_switches: Counter,
    /// Time spent in re-optimization.
    pub reopt_time: Timer,
    /// Queries that improved thanks to a plan switch.
    // NOTE(review): never incremented in the visible code — confirm.
    pub queries_improved: Counter,
}
impl Default for AdaptiveMetrics {
fn default() -> Self {
Self {
reoptimizations: Counter::new("adaptive.reoptimizations".to_string()),
plan_switches: Counter::new("adaptive.plan_switches".to_string()),
reopt_time: Timer::new("adaptive.reopt_time".to_string()),
queries_improved: Counter::new("adaptive.queries_improved".to_string()),
}
}
}
impl AdaptiveExecutor {
pub fn new(
cardinality_estimator: Arc<RwLock<CardinalityEstimator>>,
cost_model: Arc<RwLock<CostModel>>,
config: AdaptiveConfig,
) -> Self {
let optimizer = Arc::new(RwLock::new(AdaptiveOptimizer::new(
cardinality_estimator,
cost_model,
)));
Self {
optimizer,
config,
profiler: Profiler::new(),
metrics: AdaptiveMetrics::default(),
}
}
pub async fn execute_adaptive(
&mut self,
query: &Algebra,
initial_plan: QueryPlan,
) -> Result<QueryResults> {
let mut current_plan = initial_plan;
let mut stats = RuntimeStatistics {
start_time: Instant::now(),
..Default::default()
};
let mut last_reopt = Instant::now();
let start_time = Instant::now();
let mut executor = CheckpointedExecutor::new(current_plan.clone())?;
loop {
let batch_result = executor.execute_batch(1000).await?;
stats.update_from_batch(&batch_result)?;
let elapsed = start_time.elapsed();
let should_reopt = self.should_reoptimize(&stats, elapsed, last_reopt.elapsed())?;
if should_reopt {
info!(
"Triggering adaptive re-optimization at {}s",
elapsed.as_secs_f64()
);
self.metrics.reoptimizations.inc();
self.profiler.start();
let refined_plan = self.reoptimize_with_statistics(query, &stats)?;
if self.is_plan_significantly_better(¤t_plan, &refined_plan, &stats)? {
let improvement =
self.estimate_improvement(¤t_plan, &refined_plan, &stats)?;
info!(
"Switching to new plan (estimated {}x improvement)",
improvement
);
let checkpoint = executor.checkpoint()?;
current_plan = refined_plan;
executor = CheckpointedExecutor::new_from_checkpoint(
current_plan.clone(),
checkpoint,
)?;
self.metrics.plan_switches.inc();
last_reopt = Instant::now();
} else {
info!("New plan not significantly better, continuing with current plan");
}
}
if batch_result.is_complete {
break;
}
}
executor.finalize()
}
fn should_reoptimize(
&self,
stats: &RuntimeStatistics,
elapsed: Duration,
since_last_reopt: Duration,
) -> Result<bool> {
if !self.config.enable_adaptive {
return Ok(false);
}
if since_last_reopt.as_secs() < self.config.min_reopt_interval_seconds {
return Ok(false);
}
if elapsed.as_secs() >= self.config.re_opt_trigger_seconds {
debug!("Re-optimization triggered by time threshold");
return Ok(true);
}
let max_deviation = stats.max_deviation();
if max_deviation > self.config.deviation_threshold {
info!("Large deviation detected: {}x", max_deviation);
return Ok(true);
}
Ok(false)
}
fn reoptimize_with_statistics(
&self,
query: &Algebra,
stats: &RuntimeStatistics,
) -> Result<QueryPlan> {
let mut optimizer = self
.optimizer
.write()
.map_err(|e| anyhow!("Failed to acquire optimizer lock: {}", e))?;
for (op_id, op_stats) in &stats.operator_stats {
optimizer.update_cardinality_estimate(op_id.clone(), op_stats.actual_cardinality)?;
optimizer.update_cost_estimate(op_id.clone(), op_stats.actual_time_ms)?;
}
let new_plan = optimizer.optimize(query)?;
Ok(new_plan)
}
fn is_plan_significantly_better(
&self,
current_plan: &QueryPlan,
new_plan: &QueryPlan,
stats: &RuntimeStatistics,
) -> Result<bool> {
let current_remaining_cost = self.estimate_remaining_cost(current_plan, stats)?;
let new_remaining_cost = self.estimate_remaining_cost(new_plan, stats)?;
let improvement = current_remaining_cost / new_remaining_cost;
Ok(improvement > self.config.plan_switch_threshold)
}
fn estimate_remaining_cost(&self, plan: &QueryPlan, stats: &RuntimeStatistics) -> Result<f64> {
let processed = stats.rows_processed;
let total_estimated = plan.estimated_total_rows.max(1);
let remaining_percent = if processed < total_estimated {
(total_estimated - processed) as f64 / total_estimated as f64
} else {
0.1 };
Ok(plan.estimated_cost * remaining_percent)
}
fn estimate_improvement(
&self,
current: &QueryPlan,
new: &QueryPlan,
stats: &RuntimeStatistics,
) -> Result<f64> {
let current_cost = self.estimate_remaining_cost(current, stats)?;
let new_cost = self.estimate_remaining_cost(new, stats)?.max(0.1);
Ok(current_cost / new_cost)
}
pub fn get_config(&self) -> &AdaptiveConfig {
&self.config
}
pub fn get_profiler(&self) -> &Profiler {
&self.profiler
}
pub fn get_metrics(&self) -> &AdaptiveMetrics {
&self.metrics
}
}
/// Executor that can snapshot its state so execution may resume under a
/// different plan after an adaptive switch.
// dead_code: `plan` is stored but not read by the simulated execution below.
#[allow(dead_code)]
pub struct CheckpointedExecutor {
    /// Plan currently being executed.
    plan: QueryPlan,
    /// Resumable execution state (what `checkpoint` clones).
    state: ExecutorState,
    /// Rows produced so far; drives batch-completion detection.
    rows_produced: u64,
}
/// Snapshot of executor progress, captured by `checkpoint` and used to
/// resume execution under a new plan after an adaptive switch.
// (removed a field-level `#[allow(clippy::derivable_impls)]`: that lint
// targets trait impls, so the attribute had no effect on a field)
#[derive(Debug, Clone, Default)]
pub struct ExecutorState {
    /// Per-operator resumption state, keyed by operator id.
    pub operator_states: HashMap<OperatorId, OperatorState>,
    /// Total rows processed so far across all batches.
    pub rows_processed: u64,
    /// Opaque buffer of intermediate results.
    // NOTE(review): serialization format is not defined here — confirm
    // against producer/consumer.
    pub intermediate_results: Vec<u8>,
}
/// Resumable state for one operator inside an `ExecutorState` snapshot.
#[derive(Debug, Clone)]
pub struct OperatorState {
    pub operator_id: OperatorId,
    /// Opaque serialized operator state.
    // NOTE(review): serialization format is not defined here — confirm.
    pub data: Vec<u8>,
    /// Rows this operator had processed at checkpoint time.
    pub rows_processed: u64,
}
impl CheckpointedExecutor {
    /// Starts a fresh executor for `plan` with empty state.
    pub fn new(plan: QueryPlan) -> Result<Self> {
        Ok(Self {
            plan,
            state: ExecutorState::default(),
            rows_produced: 0,
        })
    }

    /// Resumes execution of `plan` from a previously captured checkpoint.
    ///
    /// The produced-row counter is restored from the checkpoint (it was
    /// previously reset to 0, which discarded progress made under the old
    /// plan, re-ran work after a plan switch, and made `finalize`
    /// under-report the row total).
    pub fn new_from_checkpoint(plan: QueryPlan, checkpoint: ExecutorState) -> Result<Self> {
        let rows_produced = checkpoint.rows_processed;
        Ok(Self {
            plan,
            state: checkpoint,
            rows_produced,
        })
    }

    /// Simulated batch execution: produces up to `batch_size` rows (capped
    /// at 100 per call), attributes them to a single "scan_op" operator,
    /// and reports completion once 1000 rows have been produced.
    pub async fn execute_batch(&mut self, batch_size: u64) -> Result<BatchResult> {
        let rows_produced = batch_size.min(100);
        self.rows_produced += rows_produced;
        self.state.rows_processed += rows_produced;
        let mut operator_results = HashMap::new();
        operator_results.insert(
            "scan_op".to_string(),
            OperatorResult {
                rows_produced,
                execution_time_ms: 10.0,
            },
        );
        let is_complete = self.rows_produced >= 1000;
        Ok(BatchResult {
            rows_produced,
            operator_results,
            is_complete,
        })
    }

    /// Captures the current executor state for later resumption.
    pub fn checkpoint(&self) -> Result<ExecutorState> {
        Ok(self.state.clone())
    }

    /// Consumes the executor and returns the final results.
    // NOTE(review): execution_time is a fixed placeholder value.
    pub fn finalize(self) -> Result<QueryResults> {
        Ok(QueryResults {
            rows: self.rows_produced,
            execution_time: Duration::from_millis(100),
        })
    }
}
/// Final results of an adaptively executed query.
#[derive(Debug, Clone)]
pub struct QueryResults {
    /// Total number of rows produced.
    pub rows: u64,
    /// Reported execution time.
    pub execution_time: Duration,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cardinality_estimator::EstimatorConfig;
    use crate::cost_model::CostModelConfig;

    /// End-to-end smoke test: the adaptive executor runs a trivial query
    /// to completion and reports rows produced.
    #[tokio::test]
    async fn test_adaptive_executor_basic() -> Result<()> {
        let estimator = Arc::new(RwLock::new(CardinalityEstimator::new(
            EstimatorConfig::default(),
        )));
        let cost_model = Arc::new(RwLock::new(CostModel::new(CostModelConfig::default())));
        let mut executor =
            AdaptiveExecutor::new(estimator, cost_model, AdaptiveConfig::default());
        let query = Algebra::Bgp(vec![]);
        let plan = QueryPlan {
            algebra: query.clone(),
            estimated_cost: 1000.0,
            estimated_total_rows: 10000,
            operator_estimates: HashMap::new(),
        };
        let results = executor.execute_adaptive(&query, plan).await?;
        assert!(results.rows > 0);
        Ok(())
    }

    /// A checkpoint taken after two batches records the processed rows,
    /// and a resumed executor keeps running from it.
    #[tokio::test]
    async fn test_checkpointing() -> Result<()> {
        let plan = QueryPlan {
            algebra: Algebra::Bgp(vec![]),
            estimated_cost: 100.0,
            estimated_total_rows: 1000,
            operator_estimates: HashMap::new(),
        };
        let mut first = CheckpointedExecutor::new(plan.clone())?;
        let _ = first.execute_batch(100).await?;
        let _ = first.execute_batch(100).await?;
        let snapshot = first.checkpoint()?;
        assert_eq!(snapshot.rows_processed, 200);
        let mut resumed = CheckpointedExecutor::new_from_checkpoint(plan, snapshot)?;
        let _ = resumed.execute_batch(100).await?;
        Ok(())
    }

    /// A single batch update is reflected in the aggregate row count.
    #[test]
    fn test_runtime_statistics() {
        let mut stats = RuntimeStatistics {
            start_time: Instant::now(),
            ..Default::default()
        };
        let mut per_operator = HashMap::new();
        per_operator.insert(
            "op1".to_string(),
            OperatorResult {
                rows_produced: 100,
                execution_time_ms: 50.0,
            },
        );
        let batch = BatchResult {
            rows_produced: 100,
            operator_results: per_operator,
            is_complete: false,
        };
        stats.update_from_batch(&batch).ok();
        assert_eq!(stats.rows_processed, 100);
    }

    /// deviation = actual / estimated cardinality.
    #[test]
    fn test_deviation_calculation() {
        let mut op_stats = OperatorStats::new("test_op".to_string());
        op_stats.set_estimates(100, 10.0);
        op_stats.actual_cardinality = 500;
        op_stats.update_deviation();
        assert!((op_stats.deviation - 5.0).abs() < 0.01);
    }

    /// Sanity-check the documented default configuration values.
    #[test]
    fn test_config_defaults() {
        let config = AdaptiveConfig::default();
        assert!(config.enable_adaptive);
        assert_eq!(config.re_opt_trigger_seconds, 5);
        assert_eq!(config.min_reopt_interval_seconds, 5);
        assert_eq!(config.plan_switch_threshold, 2.0);
        assert_eq!(config.deviation_threshold, 5.0);
    }
}