#![allow(dead_code)]
use crate::indexing::IndexStats;
use crate::model::Variable;
use crate::query::algebra::{
AlgebraTriplePattern, GraphPattern, Query as AlgebraQuery, QueryForm, TermPattern,
};
use crate::query::plan::{ExecutionPlan, QueryPlanner};
use crate::OxirsError;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Learned cost model combining recorded execution history with per-predicate
/// and per-join multipliers to price candidate execution plans.
#[derive(Debug, Clone)]
pub struct CostModel {
    /// Bounded FIFO of past (pattern, metrics) observations used to scale estimates.
    execution_history: Arc<RwLock<QueryHistory>>,
    /// Multipliers learned from past executions (scan/join/filter).
    learned_parameters: Arc<RwLock<LearnedParameters>>,
    /// Statistics of the underlying indexes (held for future use; not read here yet).
    index_stats: Arc<IndexStats>,
}
/// Bounded FIFO of past query executions used to bias cost estimates.
#[derive(Debug, Default)]
struct QueryHistory {
    /// Oldest-first (pattern, metrics) pairs.
    patterns: VecDeque<(QueryPattern, ExecutionMetrics)>,
    /// Maximum retained entries; the oldest are evicted beyond this bound.
    max_size: usize,
}
/// Multipliers learned from past executions, consulted during cost estimation.
#[derive(Debug, Default)]
struct LearnedParameters {
    /// Per-predicate scan-cost multipliers, keyed by predicate IRI string.
    scan_costs: HashMap<String, f64>,
    /// Learned selectivity per join shape (currently populated elsewhere, if at all).
    join_selectivities: HashMap<JoinPattern, f64>,
    /// Learned selectivity per filter-expression key.
    filter_selectivities: HashMap<String, f64>,
}
/// Structural fingerprint of a query, used to look up similar past executions.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct QueryPattern {
    /// Number of triple patterns in the WHERE-clause BGP.
    num_patterns: usize,
    /// IRIs of the bound predicates, in pattern order.
    predicates: Vec<String>,
    /// Classified joins between every pair of patterns sharing a variable.
    join_types: Vec<JoinType>,
    /// Whether the pattern tree contains a FILTER.
    has_filter: bool,
}
/// Shape of a join used as a key for learned join selectivities.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct JoinPattern {
    /// Number of variables participating in the join.
    num_vars: usize,
    /// Textual descriptors of the joined term kinds.
    term_types: Vec<String>,
}
/// Which positions of two triple patterns share a join variable.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
enum JoinType {
    /// Both patterns bind the same variable in subject position.
    SubjectSubject,
    /// One pattern's subject variable equals the other's object variable.
    SubjectObject,
    /// Both patterns bind the same variable in object position.
    ObjectObject,
    /// Both patterns bind the same variable in predicate position.
    PredicatePredicate,
}
/// Observed (or, currently, synthetic) runtime metrics for one query execution.
#[derive(Debug, Clone)]
struct ExecutionMetrics {
    /// Wall-clock time the execution took.
    execution_time: Duration,
    /// Number of results produced.
    result_count: usize,
    /// Peak memory used, in bytes.
    memory_used: usize,
    /// CPU utilisation during execution, as a percentage.
    cpu_percent: f32,
}
/// Query optimizer that layers a learned cost model, a predictive result
/// cache, and hardware-aware plan tuning on top of the base planner.
pub struct AIQueryOptimizer {
    /// Conventional planner used to produce the baseline candidate plan.
    base_planner: QueryPlanner,
    /// Learned cost model for ranking candidate plans.
    cost_model: CostModel,
    /// Cache of previously produced results (predictive lookup is a stub).
    query_cache: Arc<RwLock<QueryCache>>,
    /// Capabilities of the machine, detected once at construction.
    hardware_info: HardwareInfo,
}
/// Size-bounded cache of serialized query results plus access telemetry.
#[derive(Debug, Default)]
struct QueryCache {
    /// Cached results keyed by query hash.
    cache: HashMap<String, CachedResult>,
    /// Chronological record of cache accesses (for future predictive caching).
    access_patterns: VecDeque<AccessPattern>,
    /// Maximum number of cached entries.
    max_size: usize,
}
/// One cached query result together with its usage statistics.
#[derive(Debug, Clone)]
struct CachedResult {
    /// Serialized result payload.
    data: Vec<u8>,
    /// When the entry was inserted.
    cached_at: Instant,
    /// How many times the entry has been read.
    access_count: usize,
    /// When the entry was last read.
    last_accessed: Instant,
}
/// A single cache-access event, recorded for access-pattern analysis.
#[derive(Debug, Clone)]
struct AccessPattern {
    /// Hash identifying the accessed query.
    query_hash: String,
    /// When the access happened.
    accessed_at: Instant,
    /// Session that performed the access.
    session_id: String,
}
/// Capabilities of the machine the optimizer is running on.
#[derive(Debug, Clone)]
struct HardwareInfo {
    /// Number of logical CPU cores available.
    cpu_cores: usize,
    /// Total memory in bytes (currently a fixed placeholder, see `detect`).
    memory_bytes: usize,
    /// Compile-time detected CPU instruction-set features.
    cpu_features: CpuFeatures,
    /// Whether a usable GPU is present (detection not implemented).
    gpu_available: bool,
}
/// CPU instruction-set features relevant to plan execution.
#[derive(Debug, Clone)]
struct CpuFeatures {
    /// SSE2-level SIMD available (compile-time check).
    has_simd: bool,
    /// AVX2 available (compile-time check).
    has_avx2: bool,
    /// Assumed cache-line size in bytes.
    cache_line_size: usize,
}
impl AIQueryOptimizer {
    /// Creates an optimizer backed by `index_stats`, with empty learning
    /// state and hardware capabilities detected at construction time.
    pub fn new(index_stats: Arc<IndexStats>) -> Self {
        Self {
            base_planner: QueryPlanner::new(),
            cost_model: CostModel::new(index_stats),
            query_cache: Arc::new(RwLock::new(QueryCache::new())),
            hardware_info: HardwareInfo::detect(),
        }
    }

    /// Produces an optimized execution plan for `query`.
    ///
    /// Pipeline: extract a structural pattern, consult the predictive cache,
    /// enumerate candidate plans, pick the cheapest according to the learned
    /// cost model, apply hardware-specific tuning, and record the outcome so
    /// future estimates improve.
    ///
    /// # Errors
    /// Returns an error for unsupported query forms, when no candidate plan
    /// could be generated, or when cost estimation fails.
    pub fn optimize_query(&self, query: &AlgebraQuery) -> Result<OptimizedPlan, OxirsError> {
        let pattern = self.extract_query_pattern(query)?;
        if let Some(cached) = self.check_predictive_cache(&pattern) {
            return Ok(cached);
        }
        let candidates = self.generate_candidate_plans(query)?;
        let mut best_plan = None;
        let mut best_cost = f64::MAX;
        for candidate in candidates {
            let cost = self.estimate_cost(&candidate, &pattern)?;
            if cost < best_cost {
                best_cost = cost;
                best_plan = Some(candidate);
            }
        }
        let plan = best_plan
            .ok_or_else(|| OxirsError::Query("No valid execution plan found".to_string()))?;
        let optimized = self.apply_hardware_optimizations(plan)?;
        self.update_learning_model(&pattern, &optimized);
        Ok(optimized)
    }

    /// Reduces a SELECT query to the structural features the cost model
    /// learns from (pattern count, bound predicates, join shapes, filter flag).
    ///
    /// # Errors
    /// Returns an error for non-SELECT query forms.
    fn extract_query_pattern(&self, query: &AlgebraQuery) -> Result<QueryPattern, OxirsError> {
        match &query.form {
            QueryForm::Select { where_clause, .. } => {
                let (num_patterns, predicates, join_types) =
                    self.analyze_graph_pattern(where_clause)?;
                Ok(QueryPattern {
                    num_patterns,
                    predicates,
                    join_types,
                    has_filter: self.has_filter(where_clause),
                })
            }
            _ => Err(OxirsError::Query("Unsupported query form".to_string())),
        }
    }

    /// For a BGP, collects the bound predicate IRIs and classifies every
    /// pairwise join between its triple patterns. Non-BGP shapes yield empty
    /// results rather than an error.
    fn analyze_graph_pattern(
        &self,
        pattern: &GraphPattern,
    ) -> Result<(usize, Vec<String>, Vec<JoinType>), OxirsError> {
        match pattern {
            GraphPattern::Bgp(patterns) => {
                let num_patterns = patterns.len();
                let mut predicates = Vec::new();
                let mut join_types = Vec::new();
                for triple in patterns {
                    if let TermPattern::NamedNode(pred) = &triple.predicate {
                        predicates.push(pred.as_str().to_string());
                    }
                }
                // Classify every unordered pair of patterns that share a variable.
                for i in 0..patterns.len() {
                    for j in (i + 1)..patterns.len() {
                        if let Some(join_type) = self.get_join_type(&patterns[i], &patterns[j]) {
                            join_types.push(join_type);
                        }
                    }
                }
                Ok((num_patterns, predicates, join_types))
            }
            _ => Ok((0, Vec::new(), Vec::new())),
        }
    }

    /// Classifies the join between two triple patterns by the first shared
    /// variable position found; subject-subject takes precedence, then
    /// subject-object, object-object, and predicate-predicate.
    fn get_join_type(
        &self,
        left: &AlgebraTriplePattern,
        right: &AlgebraTriplePattern,
    ) -> Option<JoinType> {
        if self.patterns_match(&left.subject, &right.subject) {
            return Some(JoinType::SubjectSubject);
        }
        if self.patterns_match(&left.subject, &right.object) {
            return Some(JoinType::SubjectObject);
        }
        if self.patterns_match(&left.object, &right.object) {
            return Some(JoinType::ObjectObject);
        }
        if self.patterns_match(&left.predicate, &right.predicate) {
            return Some(JoinType::PredicatePredicate);
        }
        None
    }

    /// Two term patterns "match" only when both are the same variable;
    /// shared constants are deliberately not treated as joins.
    fn patterns_match(&self, left: &TermPattern, right: &TermPattern) -> bool {
        match (left, right) {
            (TermPattern::Variable(v1), TermPattern::Variable(v2)) => v1 == v2,
            _ => false,
        }
    }

    /// Recursively checks whether the pattern tree contains a FILTER.
    /// NOTE(review): only `Union` is recursed into; any other nesting
    /// variants of `GraphPattern` would hide a filter — confirm the variant
    /// set before relying on this for correctness.
    #[allow(clippy::only_used_in_recursion)]
    fn has_filter(&self, pattern: &GraphPattern) -> bool {
        match pattern {
            GraphPattern::Filter { .. } => true,
            GraphPattern::Bgp(_) => false,
            GraphPattern::Union(left, right) => self.has_filter(left) || self.has_filter(right),
            _ => false,
        }
    }

    /// Enumerates candidate plans: the base planner's plan, alternative join
    /// orders for simple BGP SELECTs, and any index-specific plans.
    fn generate_candidate_plans(
        &self,
        query: &AlgebraQuery,
    ) -> Result<Vec<ExecutionPlan>, OxirsError> {
        let mut candidates = Vec::new();
        // Push the baseline plan directly (previously cloned needlessly).
        candidates.push(self.base_planner.plan_query(query)?);
        if let QueryForm::Select {
            where_clause: GraphPattern::Bgp(patterns),
            ..
        } = &query.form
        {
            let join_orders = self.generate_join_orders(patterns);
            for order in join_orders {
                if let Ok(plan) = self.create_plan_with_order(patterns, &order) {
                    candidates.push(plan);
                }
            }
        }
        candidates.extend(self.generate_index_plans(query)?);
        Ok(candidates)
    }

    /// Proposes join orders to evaluate: the textual order plus a greedy
    /// most-selective-first order (skipped when identical, to avoid costing
    /// the same plan twice), capped at five orders.
    fn generate_join_orders(&self, patterns: &[AlgebraTriplePattern]) -> Vec<Vec<usize>> {
        let mut orders: Vec<Vec<usize>> = Vec::new();
        orders.push((0..patterns.len()).collect());
        let mut selective_order: Vec<usize> = (0..patterns.len()).collect();
        selective_order.sort_by_key(|&i| self.estimate_selectivity(&patterns[i]));
        if selective_order != orders[0] {
            orders.push(selective_order);
        }
        orders.truncate(5);
        orders
    }

    /// Scores a pattern for greedy ordering; lower (more negative) means
    /// more selective. Bound subjects/objects weigh ten times more than a
    /// bound predicate, reflecting typical RDF cardinalities.
    fn estimate_selectivity(&self, pattern: &AlgebraTriplePattern) -> i64 {
        let mut score = 0;
        if !matches!(pattern.subject, TermPattern::Variable(_)) {
            score -= 1000;
        }
        if !matches!(pattern.predicate, TermPattern::Variable(_)) {
            score -= 100;
        }
        if !matches!(pattern.object, TermPattern::Variable(_)) {
            score -= 1000;
        }
        score
    }

    /// Builds a left-deep hash-join tree that scans `patterns` in the
    /// sequence given by `order`.
    ///
    /// # Errors
    /// Returns an error when `order` is empty.
    fn create_plan_with_order(
        &self,
        patterns: &[AlgebraTriplePattern],
        order: &[usize],
    ) -> Result<ExecutionPlan, OxirsError> {
        if order.is_empty() {
            return Err(OxirsError::Query("Empty join order".to_string()));
        }
        let mut plan = ExecutionPlan::TripleScan {
            pattern: crate::query::plan::convert_algebra_triple_pattern(&patterns[order[0]]),
        };
        for &idx in &order[1..] {
            let right_plan = ExecutionPlan::TripleScan {
                pattern: crate::query::plan::convert_algebra_triple_pattern(&patterns[idx]),
            };
            plan = ExecutionPlan::HashJoin {
                left: Box::new(plan),
                right: Box::new(right_plan),
                // Join variables are resolved later during execution.
                join_vars: Vec::new(),
            };
        }
        Ok(plan)
    }

    /// Placeholder for index-aware plan generation; currently contributes
    /// no candidates.
    fn generate_index_plans(
        &self,
        _query: &AlgebraQuery,
    ) -> Result<Vec<ExecutionPlan>, OxirsError> {
        Ok(Vec::new())
    }

    /// Estimates a plan's cost from learned parameters, scaled by how
    /// structurally similar past queries performed.
    ///
    /// # Errors
    /// Fails when the learned-parameter lock is poisoned.
    fn estimate_cost(
        &self,
        plan: &ExecutionPlan,
        pattern: &QueryPattern,
    ) -> Result<f64, OxirsError> {
        let params = self
            .cost_model
            .learned_parameters
            .read()
            .map_err(|e| OxirsError::Query(format!("Failed to read parameters: {e}")))?;
        let base_cost = self.estimate_plan_cost(plan, &params)?;
        let history_factor = self.get_history_factor(pattern);
        Ok(base_cost * history_factor)
    }

    /// Recursively costs a plan: scans carry a base cost adjusted by learned
    /// per-predicate multipliers; hash joins add a small product term for the
    /// build/probe work; filters halve their input cost (a selectivity
    /// heuristic); unrecognised nodes get a flat penalty.
    #[allow(clippy::only_used_in_recursion)]
    fn estimate_plan_cost(
        &self,
        plan: &ExecutionPlan,
        params: &LearnedParameters,
    ) -> Result<f64, OxirsError> {
        match plan {
            ExecutionPlan::TripleScan { pattern } => {
                let mut cost = 100.0;
                if let Some(crate::model::pattern::PredicatePattern::NamedNode(pred)) =
                    &pattern.predicate
                {
                    if let Some(&pred_cost) = params.scan_costs.get(pred.as_str()) {
                        cost *= pred_cost;
                    }
                }
                Ok(cost)
            }
            ExecutionPlan::HashJoin { left, right, .. } => {
                let left_cost = self.estimate_plan_cost(left, params)?;
                let right_cost = self.estimate_plan_cost(right, params)?;
                Ok(left_cost + right_cost + (left_cost * right_cost * 0.01))
            }
            ExecutionPlan::Filter { input, .. } => {
                let input_cost = self.estimate_plan_cost(input, params)?;
                Ok(input_cost * 0.5)
            }
            _ => Ok(1000.0),
        }
    }

    /// Scales cost by past performance of the first structurally similar
    /// query found: 0.8 when it ran fast (< 100 ms), 1.2 when slow, and 1.0
    /// when nothing matches or the history lock is unavailable.
    fn get_history_factor(&self, pattern: &QueryPattern) -> f64 {
        if let Ok(history) = self.cost_model.execution_history.read() {
            for (hist_pattern, metrics) in history.patterns.iter() {
                if self.patterns_similar(pattern, hist_pattern) {
                    return if metrics.execution_time.as_millis() < 100 {
                        0.8
                    } else {
                        1.2
                    };
                }
            }
        }
        1.0
    }

    /// Loose structural similarity: same pattern count, same filter flag,
    /// and the same number of bound predicates.
    fn patterns_similar(&self, a: &QueryPattern, b: &QueryPattern) -> bool {
        a.num_patterns == b.num_patterns
            && a.has_filter == b.has_filter
            && a.predicates.len() == b.predicates.len()
    }

    /// Predictive cache lookup; not yet implemented, always misses.
    fn check_predictive_cache(&self, _pattern: &QueryPattern) -> Option<OptimizedPlan> {
        None
    }

    /// Annotates the plan with hardware-aware execution settings: thread
    /// count, SIMD availability, GPU offload (currently never enabled), and
    /// a memory budget.
    fn apply_hardware_optimizations(
        &self,
        plan: ExecutionPlan,
    ) -> Result<OptimizedPlan, OxirsError> {
        Ok(OptimizedPlan {
            parallelism_level: self.calculate_optimal_parallelism(),
            use_simd: self.hardware_info.cpu_features.has_simd,
            use_gpu: self.hardware_info.gpu_available && self.should_use_gpu(&plan),
            memory_budget: self.calculate_memory_budget(),
            base_plan: plan,
        })
    }

    /// Uses roughly 75% of the available cores, but always at least one.
    /// (Previously the truncating cast yielded 0 on single-core machines,
    /// producing a zero-parallelism plan.)
    fn calculate_optimal_parallelism(&self) -> usize {
        ((self.hardware_info.cpu_cores as f32 * 0.75) as usize).max(1)
    }

    /// GPU offload heuristic; disabled until GPU kernels exist.
    fn should_use_gpu(&self, _plan: &ExecutionPlan) -> bool {
        false
    }

    /// Allows a single query to use at most half of the detected memory.
    fn calculate_memory_budget(&self) -> usize {
        self.hardware_info.memory_bytes / 2
    }

    /// Records this optimization in the execution history.
    /// NOTE(review): the metrics are synthetic placeholders — real feedback
    /// should be wired in from actual execution results.
    fn update_learning_model(&self, pattern: &QueryPattern, _plan: &OptimizedPlan) {
        if let Ok(mut history) = self.cost_model.execution_history.write() {
            let metrics = ExecutionMetrics {
                execution_time: Duration::from_millis(50),
                result_count: 100,
                memory_used: 1024 * 1024,
                cpu_percent: 25.0,
            };
            history.add_execution(pattern.clone(), metrics);
        }
    }
}
/// An execution plan augmented with hardware-aware execution settings.
#[derive(Debug)]
pub struct OptimizedPlan {
    /// The underlying plan to execute.
    pub base_plan: ExecutionPlan,
    /// Number of worker threads to use (always >= 1 once tuned).
    pub parallelism_level: usize,
    /// Whether SIMD execution paths may be used.
    pub use_simd: bool,
    /// Whether the plan should be offloaded to a GPU.
    pub use_gpu: bool,
    /// Maximum bytes of memory the execution may consume.
    pub memory_budget: usize,
}
impl CostModel {
    /// Builds a cost model with an empty execution history and default
    /// learned parameters, keyed to the supplied index statistics.
    fn new(index_stats: Arc<IndexStats>) -> Self {
        let execution_history = Arc::new(RwLock::new(QueryHistory::new()));
        let learned_parameters = Arc::new(RwLock::new(LearnedParameters::default()));
        Self {
            execution_history,
            learned_parameters,
            index_stats,
        }
    }
}
impl QueryHistory {
    /// Creates an empty history bounded to 10 000 recorded executions.
    fn new() -> Self {
        Self {
            patterns: VecDeque::new(),
            max_size: 10000,
        }
    }

    /// Records a pattern/metrics pair, evicting the oldest entries once the
    /// configured bound is exceeded.
    fn add_execution(&mut self, pattern: QueryPattern, metrics: ExecutionMetrics) {
        self.patterns.push_back((pattern, metrics));
        let excess = self.patterns.len().saturating_sub(self.max_size);
        for _ in 0..excess {
            self.patterns.pop_front();
        }
    }
}
impl QueryCache {
fn new() -> Self {
Self {
cache: HashMap::new(),
access_patterns: VecDeque::new(),
max_size: 1000,
}
}
}
impl HardwareInfo {
    /// Probes the current machine: core count from the standard library,
    /// SIMD/AVX2 flags from compile-time target features. Memory size is a
    /// fixed 8 GiB placeholder and GPU detection is not implemented.
    fn detect() -> Self {
        let cpu_cores = std::thread::available_parallelism()
            .map(std::num::NonZeroUsize::get)
            .unwrap_or(1);
        Self {
            cpu_cores,
            // NOTE(review): hardcoded 8 GiB — no runtime memory probe here.
            memory_bytes: 8 * 1024 * 1024 * 1024,
            cpu_features: CpuFeatures {
                has_simd: cfg!(target_feature = "sse2"),
                has_avx2: cfg!(target_feature = "avx2"),
                cache_line_size: 64,
            },
            gpu_available: false,
        }
    }
}
/// Batch optimizer that runs the single-query optimizer over many queries
/// and detects subexpressions shared across the batch.
pub struct MultiQueryOptimizer {
    /// Per-query optimizer doing the actual planning.
    single_optimizer: AIQueryOptimizer,
    /// Plans for subexpressions shared between queries (reuse is a stub).
    subexpression_cache: Arc<RwLock<HashMap<String, ExecutionPlan>>>,
}
impl MultiQueryOptimizer {
    /// Wraps a fresh single-query optimizer with an (initially empty)
    /// shared-subexpression cache.
    pub fn new(index_stats: Arc<IndexStats>) -> Self {
        Self {
            single_optimizer: AIQueryOptimizer::new(index_stats),
            subexpression_cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Optimizes every query in the batch, rewriting each plan to reuse
    /// triple patterns that appear in more than one query.
    ///
    /// # Errors
    /// Propagates the first failure from single-query optimization.
    pub fn optimize_batch(
        &self,
        queries: &[AlgebraQuery],
    ) -> Result<Vec<OptimizedPlan>, OxirsError> {
        let shared = self.detect_common_subexpressions(queries)?;
        let mut plans = Vec::with_capacity(queries.len());
        for query in queries {
            let plan = self.single_optimizer.optimize_query(query)?;
            plans.push(self.reuse_common_subexpressions(plan, &shared)?);
        }
        Ok(plans)
    }

    /// Maps every triple pattern occurring in two or more queries to a
    /// placeholder scan plan over fully unbound terms.
    fn detect_common_subexpressions(
        &self,
        queries: &[AlgebraQuery],
    ) -> Result<HashMap<String, ExecutionPlan>, OxirsError> {
        let mut occurrences = HashMap::new();
        for query in queries {
            self.count_patterns(query, &mut occurrences)?;
        }
        let mut shared = HashMap::new();
        for (key, count) in occurrences {
            if count <= 1 {
                continue;
            }
            // Placeholder: an unconstrained (?s ?p ?o) scan stands in for
            // the shared subexpression until real plan sharing exists.
            let placeholder = ExecutionPlan::TripleScan {
                pattern: crate::model::pattern::TriplePattern::new(
                    Some(crate::model::pattern::SubjectPattern::Variable(
                        Variable::new("?s").expect("variable name is valid"),
                    )),
                    Some(crate::model::pattern::PredicatePattern::Variable(
                        Variable::new("?p").expect("variable name is valid"),
                    )),
                    Some(crate::model::pattern::ObjectPattern::Variable(
                        Variable::new("?o").expect("variable name is valid"),
                    )),
                ),
            };
            shared.insert(key, placeholder);
        }
        Ok(shared)
    }

    /// Tallies the triple patterns of a SELECT query into `counts`;
    /// non-SELECT forms contribute nothing.
    fn count_patterns(
        &self,
        query: &AlgebraQuery,
        counts: &mut HashMap<String, usize>,
    ) -> Result<(), OxirsError> {
        match &query.form {
            QueryForm::Select { where_clause, .. } => {
                self.count_graph_patterns(where_clause, counts)
            }
            _ => Ok(()),
        }
    }

    /// Counts each BGP triple pattern keyed by its rendered text; other
    /// graph-pattern shapes are ignored.
    fn count_graph_patterns(
        &self,
        pattern: &GraphPattern,
        counts: &mut HashMap<String, usize>,
    ) -> Result<(), OxirsError> {
        if let GraphPattern::Bgp(triples) = pattern {
            for triple in triples {
                *counts.entry(format!("{triple}")).or_insert(0) += 1;
            }
        }
        Ok(())
    }

    /// Plan rewriting against the shared-subexpression table; currently a
    /// pass-through.
    fn reuse_common_subexpressions(
        &self,
        plan: OptimizedPlan,
        _common: &HashMap<String, ExecutionPlan>,
    ) -> Result<OptimizedPlan, OxirsError> {
        Ok(plan)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built optimizer reports at least one CPU core.
    #[test]
    fn test_ai_optimizer_creation() {
        let optimizer = AIQueryOptimizer::new(Arc::new(IndexStats::new()));
        assert!(optimizer.hardware_info.cpu_cores > 0);
    }

    /// A new cost model starts with an empty execution history.
    #[test]
    fn test_cost_model() {
        let model = CostModel::new(Arc::new(IndexStats::new()));
        let guard = model
            .execution_history
            .read()
            .expect("lock should not be poisoned");
        assert!(guard.patterns.is_empty());
    }

    /// Hardware detection yields sane core, memory, and cache-line values.
    #[test]
    fn test_hardware_detection() {
        let info = HardwareInfo::detect();
        assert!(info.cpu_cores >= 1);
        assert!(info.memory_bytes > 0);
        assert_eq!(info.cpu_features.cache_line_size, 64);
    }
}