use crate::{
error::{FusekiError, FusekiResult},
store::Store,
};
use scirs2_core::ndarray_ext::{Array1, Array2, ArrayView1, Axis};
use scirs2_core::parallel_ops::{par_chunks, par_join};
use scirs2_core::profiling::Profiler;
use scirs2_core::random::Random;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, instrument, warn};
/// Adaptive SPARQL query execution engine.
///
/// Combines recorded execution history, a statistical cost model, a
/// graph-based join optimizer and a lightweight ML-style predictor to
/// produce [`AdaptiveQueryPlan`]s. All state is behind `Arc`, so clones
/// are cheap and share the same underlying engine.
#[derive(Clone)]
pub struct AdaptiveExecutionEngine {
    // Raw execution records plus derived per-pattern statistics.
    performance_history: Arc<RwLock<QueryPerformanceHistory>>,
    // Selectivity matrix and cost factors (currently neutral defaults).
    cost_model: Arc<StatisticalCostModel>,
    // Join graph and node/edge weights used for join ordering.
    graph_optimizer: Arc<GraphBasedOptimizer>,
    // Feature/target data for execution-time prediction.
    ml_predictor: Arc<RwLock<PerformancePredictor>>,
    // Profiler started/stopped around each plan optimization.
    profiler: Arc<RwLock<Profiler>>,
    // Immutable tuning knobs shared by all clones.
    config: Arc<AdaptiveExecutionConfig>,
}
/// Tuning knobs for [`AdaptiveExecutionEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveExecutionConfig {
    /// Enable learning from recorded executions.
    pub enable_adaptive_learning: bool,
    /// Minimum recorded samples before history/ML estimates are trusted.
    pub min_sample_size: usize,
    /// Confidence level for cost confidence intervals (e.g. 0.95).
    pub confidence_level: f64,
    /// Allow tuning of the statistical cost model.
    pub enable_cost_model_tuning: bool,
    /// Use the ML predictor for execution-time estimates.
    pub enable_ml_prediction: bool,
    /// Genetic-algorithm population size.
    pub ga_population_size: usize,
    /// Genetic-algorithm generation cap; 0 disables the GA path.
    pub ga_max_generations: usize,
    /// Evaluate candidate plans in parallel (required for the GA path).
    pub enable_parallel_evaluation: bool,
    /// Worker count for parallel strategies (defaults to CPU count).
    pub parallel_workers: usize,
}
impl Default for AdaptiveExecutionConfig {
    /// Sensible defaults: every adaptive feature enabled, 95% confidence,
    /// and one parallel worker per logical CPU.
    fn default() -> Self {
        let workers = num_cpus::get();
        Self {
            enable_adaptive_learning: true,
            enable_cost_model_tuning: true,
            enable_ml_prediction: true,
            enable_parallel_evaluation: true,
            min_sample_size: 10,
            confidence_level: 0.95,
            ga_population_size: 50,
            ga_max_generations: 100,
            parallel_workers: workers,
        }
    }
}
/// Execution history keyed by query pattern (the first five lowercased
/// tokens of the query text).
#[derive(Debug)]
pub struct QueryPerformanceHistory {
    // pattern -> execution records in insertion order
    records: HashMap<String, Vec<ExecutionRecord>>,
    // pattern -> statistics, recomputed whenever a record is added
    statistics: HashMap<String, QueryStatistics>,
}
/// One completed query execution, as recorded by the engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
    /// Record timestamp (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Wall-clock execution time in milliseconds.
    pub execution_time_ms: f64,
    /// Number of results produced.
    pub result_cardinality: u64,
    /// Identifier of the plan that was executed.
    pub plan_id: String,
    /// Memory used in bytes (peak or total — TODO confirm semantics).
    pub memory_used_bytes: u64,
    /// CPU time consumed in milliseconds.
    pub cpu_time_ms: f64,
    /// Number of I/O operations performed.
    pub io_operations: u64,
    /// Cache hits observed during execution.
    pub cache_hits: u64,
    /// Cache misses observed during execution.
    pub cache_misses: u64,
}
/// Aggregate statistics derived from one pattern's execution records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatistics {
    /// Number of records the statistics were computed from.
    pub sample_count: usize,
    /// Mean execution time (ms).
    pub mean_execution_time_ms: f64,
    /// Population standard deviation of execution time (ms).
    pub std_dev_execution_time_ms: f64,
    /// Median execution time (ms).
    pub median_execution_time_ms: f64,
    /// 95th percentile execution time (ms).
    pub p95_execution_time_ms: f64,
    /// 99th percentile execution time (ms).
    pub p99_execution_time_ms: f64,
    /// Mean result cardinality across records.
    pub mean_cardinality: f64,
    /// Pearson correlation between result cardinality and execution time.
    pub correlation_cardinality_time: f64,
    /// Least-squares slope of execution time over successive runs.
    pub trend_slope: f64,
    /// R-squared of the trend fit, clamped to [0, 1].
    pub trend_confidence: f64,
}
/// Statistical cost model (currently initialized with neutral defaults
/// and not yet tuned at runtime).
#[derive(Debug)]
pub struct StatisticalCostModel {
    // 10x10 selectivity matrix — presumably indexed by predicate pair;
    // TODO confirm intended indexing scheme.
    join_selectivity: RwLock<Array2<f64>>,
    // Per-operator cost multipliers, initialized to 1.0.
    cost_factors: RwLock<Array1<f64>>,
}
/// Graph-based join optimizer state.
///
/// `node_weights` drives join ordering (lower weight joins first);
/// unknown predicates default to weight 1.0.
#[derive(Debug)]
pub struct GraphBasedOptimizer {
    // Adjacency/weight matrix of the join graph (10x10 initial capacity).
    join_graph: RwLock<Array2<f64>>,
    // predicate -> cost weight used by join ordering
    node_weights: RwLock<HashMap<String, f64>>,
    // (predicate, predicate) -> edge weight; not yet consumed here
    edge_weights: RwLock<HashMap<(String, String), f64>>,
}
/// Simple execution-time predictor over 8 query features.
///
/// Predictions are refused until `sample_count` reaches the configured
/// minimum; inputs are z-normalized with `feature_mean`/`feature_std`.
#[derive(Debug)]
pub struct PerformancePredictor {
    // One row per observed execution (8 feature columns).
    features: Array2<f64>,
    // Observed execution times (ms), one per row of `features`.
    targets: Array1<f64>,
    // Per-feature mean used for normalization.
    feature_mean: Array1<f64>,
    // Per-feature std-dev used for normalization (ones initially, so
    // normalization is a no-op before training).
    feature_std: Array1<f64>,
    // Number of samples recorded so far.
    sample_count: usize,
}
/// Optimized plan produced by [`AdaptiveExecutionEngine::optimize_query`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveQueryPlan {
    /// Stable id: first 16 hex chars of the query's SHA-256 digest.
    pub plan_id: String,
    /// Pattern key (first five lowercased tokens of the query).
    pub query_pattern: String,
    /// Selected execution strategy.
    pub execution_strategy: ExecutionStrategy,
    /// Estimated cost: mean historical time (ms) when history suffices,
    /// otherwise a coarse feature-based estimate.
    pub estimated_cost: f64,
    /// (low, high) confidence bounds around `estimated_cost`.
    pub confidence_interval: (f64, f64),
    /// ML-predicted execution time in ms (0.0 when prediction is
    /// disabled or under-sampled).
    pub predicted_execution_time_ms: f64,
    /// Predicted result cardinality.
    pub predicted_cardinality: u64,
    /// Name of the optimization pipeline that produced this plan.
    pub optimization_method: String,
    /// Recommended degree of parallelism.
    pub parallel_degree: usize,
    /// Advisory tuning hints for the executor.
    pub adaptive_hints: Vec<AdaptiveHint>,
}
/// How a plan should be executed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutionStrategy {
    /// Single-threaded execution for small queries.
    Sequential,
    /// Fixed-degree parallel execution.
    Parallel { degree: usize },
    /// Work-stealing scheduler with the given worker count.
    WorkStealing { workers: usize },
    /// Start at `initial_degree` and adapt at runtime (not yet selected
    /// by `select_execution_strategy`).
    Adaptive { initial_degree: usize },
}
/// Advisory optimization hint attached to a plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveHint {
    /// Hint category, e.g. "parallelization" or "indexing".
    pub hint_type: String,
    /// Human-readable explanation of the hint.
    pub description: String,
    /// Confidence in the hint, 0.0-1.0.
    pub confidence: f64,
    /// Expected speedup factor if the hint is applied.
    pub expected_improvement: f64,
    /// Which analysis produced the hint.
    pub source: String,
}
/// Lexical features extracted from a query by cheap keyword counting
/// (no parsing); inputs to cost estimation and prediction.
#[derive(Debug, Clone)]
pub struct QueryFeatures {
    /// Estimated triple-pattern count ('?' occurrences / 3).
    pub triple_count: f64,
    /// Count of '.' characters, used as a join-count proxy.
    pub join_count: f64,
    /// Number of "filter" keyword occurrences.
    pub filter_count: f64,
    /// Number of "optional" keyword occurrences.
    pub optional_count: f64,
    /// Number of "union" keyword occurrences.
    pub union_count: f64,
    /// Number of SELECT keywords (the outer query contributes too).
    pub subquery_depth: f64,
    /// Heuristic selectivity estimate.
    pub avg_selectivity: f64,
    /// Join count plus twice the OPTIONAL count.
    pub graph_complexity: f64,
}
impl AdaptiveExecutionEngine {
/// Build a new engine from `config`.
///
/// # Errors
/// Returns an error if any collaborator fails to construct.
pub fn new(config: AdaptiveExecutionConfig) -> FusekiResult<Self> {
    // Construct each collaborator up front so a failure aborts engine
    // creation instead of producing a half-initialized engine.
    let performance_history = Arc::new(RwLock::new(QueryPerformanceHistory::new()?));
    let cost_model = Arc::new(StatisticalCostModel::new()?);
    let graph_optimizer = Arc::new(GraphBasedOptimizer::new()?);
    let ml_predictor = Arc::new(RwLock::new(PerformancePredictor::new()?));
    Ok(Self {
        performance_history,
        cost_model,
        graph_optimizer,
        ml_predictor,
        profiler: Arc::new(RwLock::new(Profiler::new())),
        config: Arc::new(config),
    })
}
/// Produce an [`AdaptiveQueryPlan`] for `query`.
///
/// Stages: feature extraction -> optional ML time prediction ->
/// cost/confidence estimation -> join ordering -> plan selection
/// (genetic when `ga_max_generations > 0`, greedy otherwise) ->
/// strategy selection -> hint generation, all wrapped in a profiler
/// start/stop pair.
///
/// NOTE(review): `store` is currently unused — presumably reserved for
/// dataset statistics lookups; confirm before relying on it.
///
/// # Errors
/// Propagates errors from any optimization stage.
#[instrument(skip(self, store))]
pub async fn optimize_query(
    &self,
    query: &str,
    store: &Store,
) -> FusekiResult<AdaptiveQueryPlan> {
    self.profiler.write().await.start();
    let features = self.extract_query_features(query).await?;
    // Prediction is 0.0 when disabled (and when under-sampled).
    let predicted_time = if self.config.enable_ml_prediction {
        self.predict_execution_time(&features).await?
    } else {
        0.0
    };
    let (cost_estimate, confidence_interval) =
        self.estimate_cost_with_confidence(query, &features).await?;
    let join_order = self.optimize_join_order(query).await?;
    // NOTE(review): `optimal_plan` is computed but never used below —
    // it looks like it should feed `plan_id`; TODO confirm.
    let optimal_plan = if self.config.ga_max_generations > 0 {
        self.genetic_plan_optimization(query, &features, &join_order)
            .await?
    } else {
        self.greedy_plan_selection(query, &features).await?
    };
    let execution_strategy = self
        .select_execution_strategy(&features, predicted_time)
        .await?;
    let adaptive_hints = self.generate_adaptive_hints(query, &features).await?;
    self.profiler.write().await.stop();
    Ok(AdaptiveQueryPlan {
        plan_id: self.generate_plan_id(query),
        query_pattern: self.extract_query_pattern(query),
        execution_strategy,
        estimated_cost: cost_estimate,
        confidence_interval,
        predicted_execution_time_ms: predicted_time,
        predicted_cardinality: self.predict_cardinality(&features).await?,
        optimization_method: "adaptive_scirs2".to_string(),
        parallel_degree: self.calculate_optimal_parallelism(&features).await?,
        adaptive_hints,
    })
}
/// Extract lexical [`QueryFeatures`] from the query text.
///
/// Pure keyword counting over the lowercased query — no parsing.
async fn extract_query_features(&self, query: &str) -> FusekiResult<QueryFeatures> {
    let lowered = query.to_lowercase();
    // Rough heuristic: about one triple pattern per three '?' variables
    // — TODO confirm against a real parse.
    let triple_count = lowered.matches('?').count() as f64 / 3.0;
    Ok(QueryFeatures {
        triple_count,
        // '.' separators stand in for join count (also matches decimals).
        join_count: lowered.matches('.').count() as f64,
        filter_count: lowered.matches("filter").count() as f64,
        optional_count: lowered.matches("optional").count() as f64,
        union_count: lowered.matches("union").count() as f64,
        subquery_depth: self.calculate_subquery_depth(query),
        avg_selectivity: self.estimate_selectivity(query, &lowered).await?,
        graph_complexity: self.calculate_graph_complexity(query).await?,
    })
}
async fn predict_execution_time(&self, features: &QueryFeatures) -> FusekiResult<f64> {
let predictor = self.ml_predictor.read().await;
if predictor.sample_count < self.config.min_sample_size {
debug!(
"Insufficient samples for ML prediction: {} < {}",
predictor.sample_count, self.config.min_sample_size
);
return Ok(0.0);
}
let feature_vec = Array1::from_vec(vec![
features.triple_count,
features.join_count,
features.filter_count,
features.optional_count,
features.union_count,
features.subquery_depth,
features.avg_selectivity,
features.graph_complexity,
]);
let normalized = (&feature_vec - &predictor.feature_mean) / &predictor.feature_std;
let prediction = normalized.iter().sum::<f64>() / normalized.len() as f64;
Ok(prediction.max(0.0)) }
/// Estimate execution cost (ms) with a confidence interval.
///
/// With enough history for this query pattern, returns the observed
/// mean execution time and a normal-approximation interval at the
/// configured `confidence_level` (previously hard-coded to the 95%
/// z-value of 1.96 regardless of configuration). Without usable
/// history, falls back to a feature-based estimate with a wide
/// +/-50% band.
///
/// # Errors
/// Propagates failures from the feature-based fallback estimate.
async fn estimate_cost_with_confidence(
    &self,
    query: &str,
    features: &QueryFeatures,
) -> FusekiResult<(f64, (f64, f64))> {
    let query_pattern = self.extract_query_pattern(query);
    let history = self.performance_history.read().await;
    if let Some(stats) = history.statistics.get(&query_pattern) {
        if stats.sample_count >= self.config.min_sample_size {
            let mean = stats.mean_execution_time_ms;
            let std_dev = stats.std_dev_execution_time_ms;
            let n = stats.sample_count as f64;
            // Standard normal z-values for the common confidence levels;
            // the default 0.95 keeps the previous 1.96 behavior.
            let z_value = if self.config.confidence_level >= 0.99 {
                2.576
            } else if self.config.confidence_level >= 0.95 {
                1.96
            } else {
                1.645
            };
            let margin = z_value * (std_dev / n.sqrt());
            return Ok((mean, (mean - margin, mean + margin)));
        }
    }
    // No usable history: coarse feature-based estimate, wide interval.
    let estimated_cost = self.estimate_cost_from_features(features).await?;
    Ok((estimated_cost, (estimated_cost * 0.5, estimated_cost * 1.5)))
}
/// Order join predicates by ascending node weight (cheapest first).
///
/// Unknown predicates default to weight 1.0. A stable sort preserves
/// input order among equal weights, matching the previous O(n^2)
/// repeated-extract-min behavior exactly (min_by returns the first of
/// equal minima). Also fixes two defects of the old version: an unused
/// `join_graph` read lock was acquired and a needless clone of the
/// predicate list was made.
async fn optimize_join_order(&self, query: &str) -> FusekiResult<Vec<String>> {
    let node_weights = self.graph_optimizer.node_weights.read().await;
    let mut ordered = self.extract_predicates(query);
    ordered.sort_by(|a, b| {
        let weight_a = node_weights.get(a).unwrap_or(&1.0);
        let weight_b = node_weights.get(b).unwrap_or(&1.0);
        weight_a
            .partial_cmp(weight_b)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    Ok(ordered)
}
/// Genetic-algorithm plan search (simplified placeholder).
///
/// Falls back to greedy selection when parallel evaluation is disabled,
/// since a serial GA would be too slow to be worthwhile.
async fn genetic_plan_optimization(
    &self,
    query: &str,
    features: &QueryFeatures,
    join_order: &[String],
) -> FusekiResult<String> {
    // NOTE(review): placeholder — `join_order` is not yet consumed by
    // the simplified GA.
    let _ = join_order;
    if self.config.enable_parallel_evaluation {
        info!("Using simplified genetic optimization");
        Ok(format!("genetic_plan_{}", self.generate_plan_id(query)))
    } else {
        self.greedy_plan_selection(query, features).await
    }
}
/// Greedy plan selection (placeholder).
///
/// Currently derives a deterministic plan id from the query text;
/// `features` is reserved for future cost-aware selection.
async fn greedy_plan_selection(
    &self,
    query: &str,
    features: &QueryFeatures,
) -> FusekiResult<String> {
    let _ = features;
    let plan = format!("greedy_plan_{}", self.generate_plan_id(query));
    Ok(plan)
}
/// Pick an execution strategy from the query features and the
/// predicted execution time.
async fn select_execution_strategy(
    &self,
    features: &QueryFeatures,
    predicted_time_ms: f64,
) -> FusekiResult<ExecutionStrategy> {
    // Heavy joins or slow predictions: fully parallel, capped at 8.
    if features.join_count >= 3.0 || predicted_time_ms > 1000.0 {
        return Ok(ExecutionStrategy::Parallel {
            degree: self.config.parallel_workers.min(8),
        });
    }
    // Many triple patterns but few joins: work-stealing balances load.
    if features.triple_count > 10.0 {
        return Ok(ExecutionStrategy::WorkStealing {
            workers: self.config.parallel_workers,
        });
    }
    // Small queries are cheapest run sequentially.
    Ok(ExecutionStrategy::Sequential)
}
/// Generate advisory hints for the executor from the query features.
///
/// `query` is currently unused and reserved for query-specific hints.
async fn generate_adaptive_hints(
    &self,
    query: &str,
    features: &QueryFeatures,
) -> FusekiResult<Vec<AdaptiveHint>> {
    let _ = query;
    // Small constructor to keep the hint list below readable.
    let hint = |hint_type: &str,
                description: &str,
                confidence: f64,
                expected_improvement: f64,
                source: &str| AdaptiveHint {
        hint_type: hint_type.to_string(),
        description: description.to_string(),
        confidence,
        expected_improvement,
        source: source.to_string(),
    };
    let mut hints = Vec::new();
    if features.join_count >= 3.0 {
        hints.push(hint(
            "parallelization",
            "Query has multiple joins, parallel execution recommended",
            0.9,
            2.5,
            "statistical_analysis",
        ));
    }
    if features.filter_count >= 2.0 {
        hints.push(hint(
            "indexing",
            "Multiple filters detected, consider adding indexes",
            0.85,
            1.8,
            "graph_analysis",
        ));
    }
    if features.subquery_depth >= 2.0 {
        hints.push(hint(
            "materialization",
            "Deep subquery nesting, consider materializing intermediate results",
            0.75,
            1.5,
            "ml_prediction",
        ));
    }
    Ok(hints)
}
/// Record a completed execution and refresh derived state.
///
/// Appends the record under the query's pattern key, recomputes that
/// pattern's statistics, then (when enabled) updates the ML model. The
/// history write lock is held for the whole update so readers never
/// observe a record without matching statistics. The previous version
/// cloned `record` needlessly; it is owned and moved into the history.
///
/// # Errors
/// Propagates failures from the statistics or model update.
pub async fn record_execution(&self, query: &str, record: ExecutionRecord) -> FusekiResult<()> {
    let query_pattern = self.extract_query_pattern(query);
    let mut history = self.performance_history.write().await;
    history
        .records
        .entry(query_pattern.clone())
        .or_default()
        .push(record);
    self.update_statistics(&query_pattern, &mut history).await?;
    if self.config.enable_ml_prediction {
        self.update_ml_model(&query_pattern, &history).await?;
    }
    Ok(())
}
/// Recompute aggregate statistics for `pattern` from its raw records.
///
/// Computes mean / population std-dev / median / p95 / p99 execution
/// times, mean cardinality, the cardinality-time correlation and a
/// linear trend fit, then stores them in `history.statistics`.
/// Percentile indices are clamped so float rounding can never index
/// past the end of the sorted slice. (Also repairs the garbled
/// `&times` references in the original correlation/trend calls.)
///
/// # Errors
/// Returns an internal error when no records exist for `pattern`.
async fn update_statistics(
    &self,
    pattern: &str,
    history: &mut QueryPerformanceHistory,
) -> FusekiResult<()> {
    let records = history.records.get(pattern).ok_or_else(|| {
        FusekiError::internal(format!("No records found for pattern: {}", pattern))
    })?;
    if records.is_empty() {
        return Ok(());
    }
    let times: Vec<f64> = records.iter().map(|r| r.execution_time_ms).collect();
    let cardinalities: Vec<f64> = records
        .iter()
        .map(|r| r.result_cardinality as f64)
        .collect();
    let n = times.len() as f64;
    let mean_time = times.iter().sum::<f64>() / n;
    // Population variance (divides by n, not n - 1).
    let variance_time = times.iter().map(|t| (t - mean_time).powi(2)).sum::<f64>() / n;
    let std_dev_time = variance_time.sqrt();
    let mut sorted_times = times.clone();
    sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let last = sorted_times.len() - 1;
    // Nearest-rank-style percentile with a defensive upper clamp.
    let percentile =
        |p: f64| sorted_times[(((sorted_times.len() as f64) * p) as usize).min(last)];
    let median_time = sorted_times[sorted_times.len() / 2];
    let p95_time = percentile(0.95);
    let p99_time = percentile(0.99);
    let mean_cardinality = cardinalities.iter().sum::<f64>() / cardinalities.len() as f64;
    let correlation = self.calculate_correlation(&cardinalities, &times);
    let (slope, confidence) = self.fit_trend_line(&times);
    let stats = QueryStatistics {
        sample_count: records.len(),
        mean_execution_time_ms: mean_time,
        std_dev_execution_time_ms: std_dev_time,
        median_execution_time_ms: median_time,
        p95_execution_time_ms: p95_time,
        p99_execution_time_ms: p99_time,
        mean_cardinality,
        correlation_cardinality_time: correlation,
        trend_slope: slope,
        trend_confidence: confidence,
    };
    history.statistics.insert(pattern.to_string(), stats);
    Ok(())
}
/// Placeholder for incremental ML model updates.
///
/// Intended to fold the newly recorded execution for `pattern` into the
/// predictor's feature/target matrices; currently a no-op, so both
/// parameters are unused.
async fn update_ml_model(
    &self,
    pattern: &str,
    history: &QueryPerformanceHistory,
) -> FusekiResult<()> {
    Ok(())
}
/// Deterministic plan id: the first 16 hex characters (64 bits) of the
/// query's SHA-256 digest.
fn generate_plan_id(&self, query: &str) -> String {
    use sha2::{Digest, Sha256};
    let digest = Sha256::digest(query.as_bytes());
    let mut hex = format!("{:x}", digest);
    hex.truncate(16);
    hex
}
/// History key for a query: its first five lowercased whitespace-split
/// tokens joined with underscores.
fn extract_query_pattern(&self, query: &str) -> String {
    let mut pattern = String::new();
    for (i, token) in query.to_lowercase().split_whitespace().take(5).enumerate() {
        if i > 0 {
            pattern.push('_');
        }
        pattern.push_str(token);
    }
    pattern
}
/// Placeholder predicate extraction: real extraction needs a query
/// parse; `query` is currently ignored.
fn extract_predicates(&self, query: &str) -> Vec<String> {
    let _ = query;
    ["pred1", "pred2"].iter().map(|p| p.to_string()).collect()
}
/// Estimate query selectivity in (0, 1].
///
/// Prefers history: with at least 5 samples for this pattern, derives
/// selectivity from the mean result cardinality. Otherwise applies
/// keyword heuristics (DISTINCT/FILTER shrink, OPTIONAL/UNION grow,
/// LIMIT caps) and clamps the result to [0.01, 0.95]. Uses idiomatic
/// `clamp` instead of chained `min`/`max` (values are never NaN here)
/// and folds the redundant `contains("limit")` pre-check into the
/// `find` call.
async fn estimate_selectivity(&self, query: &str, query_lower: &str) -> FusekiResult<f64> {
    let query_pattern = self.extract_query_pattern(query);
    let history = self.performance_history.read().await;
    if let Some(stats) = history.statistics.get(&query_pattern) {
        if stats.sample_count >= 5 {
            // Log-scaled cardinality as a selectivity proxy.
            let selectivity = if stats.mean_cardinality > 0.0 {
                (stats.mean_cardinality.ln() / 10.0).clamp(0.01, 1.0)
            } else {
                0.01
            };
            debug!(
                "Estimated selectivity from history: {:.4} (based on {} samples)",
                selectivity, stats.sample_count
            );
            return Ok(selectivity);
        }
    }
    // No usable history: keyword heuristics from a neutral 0.5.
    let mut selectivity = 0.5;
    if query_lower.contains("distinct") {
        selectivity *= 0.7;
    }
    if query_lower.contains("filter") {
        // Each FILTER is assumed to drop ~40% of rows.
        let filter_count = query_lower.matches("filter").count();
        selectivity *= 0.6_f64.powi(filter_count as i32);
    }
    if query_lower.contains("optional") {
        // OPTIONAL tends to widen results.
        selectivity *= 1.3;
    }
    if query_lower.contains("union") {
        let union_count = query_lower.matches("union").count();
        selectivity *= 1.5_f64.powi(union_count as i32);
    }
    // LIMIT n caps selectivity at n / 10_000.
    if let Some(limit_pos) = query_lower.find("limit") {
        let after_limit = &query_lower[limit_pos + 5..];
        if let Some(number_str) = after_limit.split_whitespace().next() {
            if let Ok(limit) = number_str.parse::<f64>() {
                selectivity = (limit / 10000.0).min(selectivity);
            }
        }
    }
    selectivity = selectivity.clamp(0.01, 0.95);
    debug!(
        "Estimated selectivity from patterns: {:.4} (heuristic)",
        selectivity
    );
    Ok(selectivity)
}
/// Rough nesting measure: the number of `select` keywords in the query,
/// counted case-insensitively (the outer query contributes 1).
///
/// The previous version matched only uppercase `SELECT`, so an
/// all-lowercase query always reported 0.
fn calculate_subquery_depth(&self, query: &str) -> f64 {
    query.to_lowercase().matches("select").count() as f64
}
/// Heuristic graph complexity: '.' separators (join proxy) plus twice
/// the OPTIONAL count (OPTIONAL is treated as costlier than a join).
///
/// Lowercases the query once instead of once per pattern match.
async fn calculate_graph_complexity(&self, query: &str) -> FusekiResult<f64> {
    let lowered = query.to_lowercase();
    let joins = lowered.matches('.').count() as f64;
    let optionals = lowered.matches("optional").count() as f64;
    Ok(joins + optionals * 2.0)
}
/// Coarse fallback cost proxy: triples x joins x constant factor.
async fn estimate_cost_from_features(&self, features: &QueryFeatures) -> FusekiResult<f64> {
    let cost = features.triple_count * features.join_count * 10.0;
    Ok(cost)
}
/// Heuristic cardinality prediction: ~100 results per estimated triple
/// pattern, truncated to an integer.
async fn predict_cardinality(&self, features: &QueryFeatures) -> FusekiResult<u64> {
    let predicted = features.triple_count * 100.0;
    Ok(predicted as u64)
}
/// Recommended parallel degree: configured workers (capped at 8) for
/// join-heavy queries, otherwise sequential.
async fn calculate_optimal_parallelism(&self, features: &QueryFeatures) -> FusekiResult<usize> {
    let degree = if features.join_count >= 3.0 {
        self.config.parallel_workers.min(8)
    } else {
        1
    };
    Ok(degree)
}
/// Pearson correlation of two equal-length samples.
///
/// Returns 0.0 for empty or mismatched inputs, and for degenerate
/// (zero-variance) samples.
fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
    if x.is_empty() || x.len() != y.len() {
        return 0.0;
    }
    let n = x.len() as f64;
    let mean_x = x.iter().sum::<f64>() / n;
    let mean_y = y.iter().sum::<f64>() / n;
    // Single pass accumulating covariance and both variances.
    let mut cov_sum = 0.0;
    let mut var_x_sum = 0.0;
    let mut var_y_sum = 0.0;
    for (xi, yi) in x.iter().zip(y.iter()) {
        let dx = xi - mean_x;
        let dy = yi - mean_y;
        cov_sum += dx * dy;
        var_x_sum += dx * dx;
        var_y_sum += dy * dy;
    }
    let covariance = cov_sum / n;
    let std_x = (var_x_sum / n).sqrt();
    let std_y = (var_y_sum / n).sqrt();
    if std_x == 0.0 || std_y == 0.0 {
        0.0
    } else {
        covariance / (std_x * std_y)
    }
}
/// Ordinary least-squares fit of `data[i]` against index `i`.
///
/// Returns `(slope, r_squared)` with R-squared clamped to [0, 1];
/// `(0.0, 0.0)` for fewer than two points.
fn fit_trend_line(&self, data: &[f64]) -> (f64, f64) {
    if data.len() < 2 {
        return (0.0, 0.0);
    }
    let n = data.len() as f64;
    let mean_x = (0..data.len()).map(|i| i as f64).sum::<f64>() / n;
    let mean_y = data.iter().sum::<f64>() / n;
    // Slope = sum((x - mx)(y - my)) / sum((x - mx)^2), fused in one pass.
    let mut numerator = 0.0;
    let mut denominator = 0.0;
    for (i, yi) in data.iter().enumerate() {
        let dx = i as f64 - mean_x;
        numerator += dx * (yi - mean_y);
        denominator += dx * dx;
    }
    let slope = if denominator != 0.0 {
        numerator / denominator
    } else {
        0.0
    };
    // Goodness of fit: R^2 = 1 - SS_res / SS_tot.
    let mut ss_res = 0.0;
    let mut ss_tot = 0.0;
    for (i, yi) in data.iter().enumerate() {
        let predicted = slope * (i as f64 - mean_x) + mean_y;
        ss_res += (yi - predicted).powi(2);
        ss_tot += (yi - mean_y).powi(2);
    }
    let r_squared = if ss_tot != 0.0 {
        1.0 - (ss_res / ss_tot)
    } else {
        0.0
    };
    (slope, r_squared.max(0.0).min(1.0))
}
}
impl QueryPerformanceHistory {
    /// Create an empty history with no records or statistics.
    ///
    /// Returns `FusekiResult` for signature consistency with the other
    /// constructors; it cannot currently fail.
    fn new() -> FusekiResult<Self> {
        let records = HashMap::new();
        let statistics = HashMap::new();
        Ok(Self {
            records,
            statistics,
        })
    }
}
impl StatisticalCostModel {
    /// Neutral starting model: zero selectivities and unit cost factors
    /// for a fixed 10-slot capacity. Cannot currently fail; the Result
    /// keeps constructor signatures uniform.
    fn new() -> FusekiResult<Self> {
        Ok(Self {
            join_selectivity: RwLock::new(Array2::zeros((10, 10))),
            cost_factors: RwLock::new(Array1::from_vec(vec![1.0; 10])),
        })
    }
}
impl GraphBasedOptimizer {
    /// Empty optimizer state: a zeroed 10x10 join graph and no recorded
    /// node/edge weights (so join ordering starts from uniform weights).
    fn new() -> FusekiResult<Self> {
        Ok(Self {
            join_graph: RwLock::new(Array2::zeros((10, 10))),
            node_weights: RwLock::new(HashMap::new()),
            edge_weights: RwLock::new(HashMap::new()),
        })
    }
}
impl PerformancePredictor {
    /// Untrained predictor: no samples, zero means and unit std-devs,
    /// so z-normalization is a no-op until training data arrives.
    fn new() -> FusekiResult<Self> {
        Ok(Self {
            // 0 rows x 8 feature columns.
            features: Array2::zeros((0, 8)),
            targets: Array1::zeros(0),
            feature_mean: Array1::zeros(8),
            feature_std: Array1::ones(8),
            sample_count: 0,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_adaptive_execution_engine_creation() {
        let config = AdaptiveExecutionConfig::default();
        let engine = AdaptiveExecutionEngine::new(config);
        assert!(engine.is_ok());
    }

    #[tokio::test]
    async fn test_query_feature_extraction() {
        let config = AdaptiveExecutionConfig::default();
        let engine = AdaptiveExecutionEngine::new(config).unwrap();
        let query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o . FILTER(?o > 10) }";
        let features = engine.extract_query_features(query).await;
        assert!(features.is_ok());
        let f = features.unwrap();
        assert!(f.filter_count >= 1.0);
        assert!(f.triple_count > 0.0);
    }

    #[tokio::test]
    async fn test_execution_strategy_selection() {
        let config = AdaptiveExecutionConfig::default();
        let engine = AdaptiveExecutionEngine::new(config).unwrap();
        let features = QueryFeatures {
            triple_count: 10.0,
            join_count: 5.0,
            filter_count: 2.0,
            optional_count: 0.0,
            union_count: 0.0,
            subquery_depth: 1.0,
            avg_selectivity: 0.5,
            graph_complexity: 5.0,
        };
        let strategy = engine
            .select_execution_strategy(&features, 1500.0)
            .await
            .expect("strategy selection should not fail");
        // join_count >= 3 and predicted time > 1000 ms must select the
        // Parallel strategy. (The previous `if let` version silently
        // passed when any other variant was returned.)
        match strategy {
            ExecutionStrategy::Parallel { degree } => assert!(degree > 0),
            other => panic!("expected Parallel strategy, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_correlation_calculation() {
        let config = AdaptiveExecutionConfig::default();
        let engine = AdaptiveExecutionEngine::new(config).unwrap();
        // Perfectly linear data => Pearson correlation of 1.
        let x = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let y = vec![2.0, 4.0, 6.0, 8.0, 10.0];
        let correlation = engine.calculate_correlation(&x, &y);
        assert!((correlation - 1.0).abs() < 0.01);
        // Degenerate inputs are defined to return 0.
        assert_eq!(engine.calculate_correlation(&[], &[]), 0.0);
        assert_eq!(engine.calculate_correlation(&x, &[1.0]), 0.0);
    }

    #[tokio::test]
    async fn test_trend_line_fitting() {
        let config = AdaptiveExecutionConfig::default();
        let engine = AdaptiveExecutionEngine::new(config).unwrap();
        // data[i] = i + 1: slope 1 with a perfect fit (R^2 = 1).
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let (slope, confidence) = engine.fit_trend_line(&data);
        assert!((slope - 1.0).abs() < 0.01);
        assert!(confidence > 0.99);
        // Fewer than two points => (0, 0).
        assert_eq!(engine.fit_trend_line(&[1.0]), (0.0, 0.0));
    }
}