// oxirs_federate/optimization_cache.rs

1//! Intelligent Query Optimization Cache Module
2//!
3//! This module provides an advanced caching system for query optimization plans,
4//! execution strategies, and performance metrics to significantly improve federation
5//! query performance through intelligent plan reuse and adaptation.
6
7use anyhow::Result;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::hash::{Hash, Hasher};
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime};
13use tokio::sync::RwLock;
14use tracing::{debug, info};
15
16use crate::planner::planning::types::{ExecutionPlan, QueryInfo, QueryType};
17
/// Configuration for the optimization cache.
///
/// Tunables for plan caching, similarity matching, performance tracking and
/// periodic cache warming. Sensible defaults are provided via `Default`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationCacheConfig {
    /// Maximum number of cached plans; the least valuable plan is evicted
    /// when this limit is reached
    pub max_cached_plans: usize,
    /// Plan cache TTL (time to live); plans older than this count as misses
    pub plan_ttl: Duration,
    /// Enable adaptive caching based on performance
    /// NOTE(review): not read within this module — presumably consumed by
    /// callers; verify before removing
    pub enable_adaptive_caching: bool,
    /// Performance improvement threshold for caching, as a fraction
    /// (e.g. 0.15 = 15%); not read within this module — verify against callers
    pub performance_threshold: f64,
    /// Enable plan similarity matching as a fallback when no exact
    /// fingerprint match exists
    pub enable_similarity_matching: bool,
    /// Similarity score (0.0-1.0) a candidate must exceed to be reused
    pub similarity_threshold: f64,
    /// Maximum age for performance data
    /// NOTE(review): not read within this module — verify against callers
    pub max_performance_age: Duration,
    /// Enable cache warming
    pub enable_cache_warming: bool,
    /// Minimum interval between cache warming runs
    pub cache_warming_interval: Duration,
}
40
41impl Default for OptimizationCacheConfig {
42    fn default() -> Self {
43        Self {
44            max_cached_plans: 1000,
45            plan_ttl: Duration::from_secs(3600), // 1 hour
46            enable_adaptive_caching: true,
47            performance_threshold: 0.15, // 15% improvement
48            enable_similarity_matching: true,
49            similarity_threshold: 0.8,
50            max_performance_age: Duration::from_secs(7200), // 2 hours
51            enable_cache_warming: true,
52            cache_warming_interval: Duration::from_secs(300), // 5 minutes
53        }
54    }
55}
56
/// Query fingerprint for efficient cache lookups.
///
/// Combines an exact hash of the query text (`structure_hash`) with coarse
/// structural features that support fuzzy matching via `similarity()`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct QueryFingerprint {
    /// Query type (SELECT / CONSTRUCT / ASK / ...)
    pub query_type: QueryType,
    /// Number of triple patterns in the query
    pub pattern_count: usize,
    /// Number of variables in the query
    pub variable_count: usize,
    /// Number of filter expressions in the query
    pub filter_count: usize,
    /// Complexity score bucketed into 1..=6 (see `bucket_complexity`) to
    /// improve cache hit rates across near-identical queries
    pub complexity_bucket: u8,
    /// Number of services involved; starts at 1 and is updated by planning
    pub service_count: usize,
    /// Hash of the original query text (exact-match component)
    pub structure_hash: u64,
}
75
76impl QueryFingerprint {
77    /// Create a fingerprint from query information
78    pub fn from_query_info(query_info: &QueryInfo) -> Self {
79        let mut hasher = std::collections::hash_map::DefaultHasher::new();
80        query_info.original_query.hash(&mut hasher);
81
82        Self {
83            query_type: query_info.query_type,
84            pattern_count: query_info.patterns.len(),
85            variable_count: query_info.variables.len(),
86            filter_count: query_info.filters.len(),
87            complexity_bucket: Self::bucket_complexity(query_info.complexity),
88            service_count: 1, // Will be updated based on planning
89            structure_hash: hasher.finish(),
90        }
91    }
92
93    /// Bucket complexity score for better cache hit rates
94    fn bucket_complexity(complexity: u64) -> u8 {
95        match complexity {
96            0..=10 => 1,
97            11..=50 => 2,
98            51..=100 => 3,
99            101..=500 => 4,
100            501..=1000 => 5,
101            _ => 6,
102        }
103    }
104
105    /// Calculate similarity score with another fingerprint
106    pub fn similarity(&self, other: &QueryFingerprint) -> f64 {
107        let mut score = 0.0;
108        let mut total_weight = 0.0;
109
110        // Query type match (high weight)
111        if self.query_type == other.query_type {
112            score += 3.0;
113        }
114        total_weight += 3.0;
115
116        // Pattern count similarity
117        let pattern_similarity = 1.0
118            - ((self.pattern_count as f64 - other.pattern_count as f64).abs()
119                / (self.pattern_count.max(other.pattern_count) as f64 + 1.0));
120        score += pattern_similarity * 2.0;
121        total_weight += 2.0;
122
123        // Variable count similarity
124        let variable_similarity = 1.0
125            - ((self.variable_count as f64 - other.variable_count as f64).abs()
126                / (self.variable_count.max(other.variable_count) as f64 + 1.0));
127        score += variable_similarity * 1.5;
128        total_weight += 1.5;
129
130        // Complexity bucket match
131        if self.complexity_bucket == other.complexity_bucket {
132            score += 1.0;
133        }
134        total_weight += 1.0;
135
136        // Service count similarity
137        let service_similarity = 1.0
138            - ((self.service_count as f64 - other.service_count as f64).abs()
139                / (self.service_count.max(other.service_count) as f64 + 1.0));
140        score += service_similarity * 1.0;
141        total_weight += 1.0;
142
143        score / total_weight
144    }
145}
146
/// Cached execution plan with usage and performance metadata.
///
/// Metrics are updated via `update_metrics` and feed the value-based
/// eviction ranking in `calculate_score`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedPlan {
    /// The execution plan
    pub plan: ExecutionPlan,
    /// When the plan was cached (used for TTL expiry in `is_valid`)
    pub cached_at: SystemTime,
    /// Number of times this plan was used
    pub usage_count: u32,
    /// Running average execution time across recorded uses
    pub avg_execution_time: Duration,
    /// Fraction of recorded executions that succeeded (0.0-1.0);
    /// starts at 1.0 before any execution is recorded
    pub success_rate: f64,
    /// Services used in this plan
    /// NOTE(review): never populated within this module — verify callers
    pub services: Vec<String>,
    /// Performance improvement over baseline, if measured
    pub performance_improvement: Option<f64>,
    /// Query fingerprint for similarity matching
    pub fingerprint: QueryFingerprint,
}
167
168impl CachedPlan {
169    /// Create a new cached plan
170    pub fn new(plan: ExecutionPlan, fingerprint: QueryFingerprint) -> Self {
171        Self {
172            plan,
173            cached_at: SystemTime::now(),
174            usage_count: 0,
175            avg_execution_time: Duration::from_millis(0),
176            success_rate: 1.0,
177            services: Vec::new(),
178            performance_improvement: None,
179            fingerprint,
180        }
181    }
182
183    /// Update performance metrics
184    pub fn update_metrics(&mut self, execution_time: Duration, success: bool) {
185        self.usage_count += 1;
186
187        // Update average execution time
188        let current_avg_ms = self.avg_execution_time.as_millis() as f64;
189        let new_time_ms = execution_time.as_millis() as f64;
190        let new_avg_ms = (current_avg_ms * (self.usage_count - 1) as f64 + new_time_ms)
191            / self.usage_count as f64;
192        self.avg_execution_time = Duration::from_millis(new_avg_ms as u64);
193
194        // Update success rate
195        let current_successes = (self.success_rate * (self.usage_count - 1) as f64).round() as u32;
196        let new_successes = if success {
197            current_successes + 1
198        } else {
199            current_successes
200        };
201        self.success_rate = new_successes as f64 / self.usage_count as f64;
202    }
203
204    /// Check if the plan is still valid (not expired)
205    pub fn is_valid(&self, ttl: Duration) -> bool {
206        match self.cached_at.elapsed() {
207            Ok(age) => age < ttl,
208            Err(_) => false,
209        }
210    }
211
212    /// Calculate plan score for ranking
213    pub fn calculate_score(&self) -> f64 {
214        let time_factor = 1.0 / (self.avg_execution_time.as_millis() as f64 + 1.0);
215        let usage_factor = (self.usage_count as f64).ln() + 1.0;
216        let success_factor = self.success_rate;
217        let improvement_factor = self.performance_improvement.unwrap_or(0.0) + 1.0;
218
219        time_factor * usage_factor * success_factor * improvement_factor
220    }
221}
222
/// One execution sample recorded in the cache's performance history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceData {
    /// Execution time
    pub execution_time: Duration,
    /// Whether execution was successful
    pub success: bool,
    /// Memory usage in bytes
    /// NOTE(review): currently always recorded as 0 by `update_performance`
    pub memory_usage: u64,
    /// Services contacted during execution
    /// NOTE(review): currently always recorded empty by `update_performance`
    pub services_contacted: Vec<String>,
    /// When the sample was recorded
    pub timestamp: SystemTime,
}
237
/// Aggregate statistics for the optimization cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheStatistics {
    /// Total cache hits (exact and similarity matches)
    pub hits: u64,
    /// Total cache misses
    pub misses: u64,
    /// Total evictions
    pub evictions: u64,
    /// Average plan reuse count
    /// NOTE(review): never updated by the record_* methods in this module
    pub avg_reuse_count: f64,
    /// Cache effectiveness: hits / (hits + misses)
    pub hit_rate: f64,
    /// Performance improvement from caching
    /// NOTE(review): never updated by the record_* methods in this module
    pub performance_improvement: f64,
    /// When any counter was last modified
    pub last_updated: SystemTime,
}
256
257impl Default for CacheStatistics {
258    fn default() -> Self {
259        Self {
260            hits: 0,
261            misses: 0,
262            evictions: 0,
263            avg_reuse_count: 0.0,
264            hit_rate: 0.0,
265            performance_improvement: 0.0,
266            last_updated: SystemTime::now(),
267        }
268    }
269}
270
/// Intelligent query optimization cache.
///
/// All shared state sits behind `tokio::sync::RwLock`s wrapped in `Arc`s, so
/// methods take `&self` and the cache can be used concurrently from tasks.
pub struct OptimizationCache {
    /// Tuning knobs; immutable after construction
    config: OptimizationCacheConfig,
    /// Plans keyed by fingerprint; serves exact and similarity lookups
    cached_plans: Arc<RwLock<HashMap<QueryFingerprint, CachedPlan>>>,
    /// Rolling window of execution samples (bounded to 10_000 entries)
    performance_history: Arc<RwLock<VecDeque<PerformanceData>>>,
    /// Hit/miss/eviction counters and derived rates
    statistics: Arc<RwLock<CacheStatistics>>,
    /// Timestamp of the last warming run; rate-limits `warm_cache`
    last_warming: Arc<RwLock<Instant>>,
}
279
280impl OptimizationCache {
281    /// Create a new optimization cache
282    pub fn new(config: OptimizationCacheConfig) -> Self {
283        Self {
284            config,
285            cached_plans: Arc::new(RwLock::new(HashMap::new())),
286            performance_history: Arc::new(RwLock::new(VecDeque::new())),
287            statistics: Arc::new(RwLock::new(CacheStatistics::default())),
288            last_warming: Arc::new(RwLock::new(Instant::now())),
289        }
290    }
291
292    /// Look up a cached plan for a query
293    pub async fn get_plan(&self, fingerprint: &QueryFingerprint) -> Option<CachedPlan> {
294        let cached_plans = self.cached_plans.read().await;
295
296        // Direct lookup first
297        if let Some(plan) = cached_plans.get(fingerprint) {
298            if plan.is_valid(self.config.plan_ttl) {
299                self.record_hit().await;
300                debug!("Cache hit for fingerprint: {:?}", fingerprint);
301                return Some(plan.clone());
302            }
303        }
304
305        // Similarity-based lookup if enabled
306        if self.config.enable_similarity_matching {
307            let mut best_match: Option<CachedPlan> = None;
308            let mut best_similarity = 0.0;
309
310            for plan in cached_plans.values() {
311                if !plan.is_valid(self.config.plan_ttl) {
312                    continue;
313                }
314
315                let similarity = fingerprint.similarity(&plan.fingerprint);
316                if similarity > self.config.similarity_threshold && similarity > best_similarity {
317                    best_similarity = similarity;
318                    best_match = Some(plan.clone());
319                }
320            }
321
322            if let Some(plan) = best_match {
323                self.record_hit().await;
324                debug!("Similarity cache hit with score: {:.3}", best_similarity);
325                return Some(plan);
326            }
327        }
328
329        self.record_miss().await;
330        None
331    }
332
333    /// Cache an execution plan
334    pub async fn cache_plan(&self, fingerprint: QueryFingerprint, plan: ExecutionPlan) {
335        let mut cached_plans = self.cached_plans.write().await;
336
337        // Check cache size limit
338        if cached_plans.len() >= self.config.max_cached_plans {
339            self.evict_least_valuable(&mut cached_plans).await;
340        }
341
342        let cached_plan = CachedPlan::new(plan, fingerprint.clone());
343        cached_plans.insert(fingerprint.clone(), cached_plan);
344
345        info!(
346            "Cached new execution plan with fingerprint: {:?}",
347            fingerprint
348        );
349    }
350
351    /// Update performance metrics for a cached plan
352    pub async fn update_performance(
353        &self,
354        fingerprint: &QueryFingerprint,
355        execution_time: Duration,
356        success: bool,
357    ) {
358        let mut cached_plans = self.cached_plans.write().await;
359
360        if let Some(plan) = cached_plans.get_mut(fingerprint) {
361            plan.update_metrics(execution_time, success);
362            debug!("Updated performance metrics for plan: {:?}", fingerprint);
363        }
364
365        // Also update performance history
366        let performance_data = PerformanceData {
367            execution_time,
368            success,
369            memory_usage: 0,                // Would be provided by caller
370            services_contacted: Vec::new(), // Would be provided by caller
371            timestamp: SystemTime::now(),
372        };
373
374        let mut history = self.performance_history.write().await;
375        history.push_back(performance_data);
376
377        // Limit history size
378        while history.len() > 10000 {
379            history.pop_front();
380        }
381    }
382
383    /// Evict the least valuable cached plan
384    async fn evict_least_valuable(&self, cached_plans: &mut HashMap<QueryFingerprint, CachedPlan>) {
385        if cached_plans.is_empty() {
386            return;
387        }
388
389        let mut lowest_score = f64::INFINITY;
390        let mut evict_key: Option<QueryFingerprint> = None;
391
392        for (fingerprint, plan) in cached_plans.iter() {
393            let score = plan.calculate_score();
394            if score < lowest_score {
395                lowest_score = score;
396                evict_key = Some(fingerprint.clone());
397            }
398        }
399
400        if let Some(key) = evict_key {
401            cached_plans.remove(&key);
402            self.record_eviction().await;
403            debug!("Evicted plan with lowest score: {:.3}", lowest_score);
404        }
405    }
406
407    /// Record a cache hit
408    async fn record_hit(&self) {
409        let mut stats = self.statistics.write().await;
410        stats.hits += 1;
411        stats.hit_rate = stats.hits as f64 / (stats.hits + stats.misses) as f64;
412        stats.last_updated = SystemTime::now();
413    }
414
415    /// Record a cache miss
416    async fn record_miss(&self) {
417        let mut stats = self.statistics.write().await;
418        stats.misses += 1;
419        stats.hit_rate = stats.hits as f64 / (stats.hits + stats.misses) as f64;
420        stats.last_updated = SystemTime::now();
421    }
422
423    /// Record an eviction
424    async fn record_eviction(&self) {
425        let mut stats = self.statistics.write().await;
426        stats.evictions += 1;
427        stats.last_updated = SystemTime::now();
428    }
429
430    /// Get cache statistics
431    pub async fn get_statistics(&self) -> CacheStatistics {
432        let stats = self.statistics.read().await;
433        stats.clone()
434    }
435
436    /// Warm the cache with commonly used query patterns
437    pub async fn warm_cache(&self) -> Result<()> {
438        let now = Instant::now();
439        let last_warming = self.last_warming.read().await;
440
441        if now.duration_since(*last_warming) < self.config.cache_warming_interval {
442            return Ok(());
443        }
444
445        info!("Starting cache warming process");
446
447        // Analyze performance history to identify patterns
448        let history = self.performance_history.read().await;
449        let mut pattern_performance: HashMap<String, Vec<Duration>> = HashMap::new();
450
451        for data in history.iter() {
452            if data.success {
453                // Group by execution time buckets for pattern analysis
454                let bucket = Self::get_time_bucket(data.execution_time);
455                pattern_performance
456                    .entry(bucket)
457                    .or_default()
458                    .push(data.execution_time);
459            }
460        }
461
462        // Pre-create fingerprints for common patterns
463        let common_patterns = vec![
464            ("simple_select", QueryType::Select, 1, 1, 0),
465            ("complex_select", QueryType::Select, 5, 3, 2),
466            ("construct", QueryType::Construct, 3, 2, 1),
467            ("ask", QueryType::Ask, 1, 0, 0),
468        ];
469
470        for (name, query_type, patterns, vars, filters) in common_patterns {
471            let fingerprint = QueryFingerprint {
472                query_type,
473                pattern_count: patterns,
474                variable_count: vars,
475                filter_count: filters,
476                complexity_bucket: 2,
477                service_count: 1,
478                structure_hash: name
479                    .as_bytes()
480                    .iter()
481                    .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64)),
482            };
483
484            // Check if we should pre-warm this pattern
485            if !self.cached_plans.read().await.contains_key(&fingerprint) {
486                debug!("Would pre-warm pattern: {}", name);
487                // In a real implementation, we would create optimized plans here
488            }
489        }
490
491        // Update last warming time
492        *self.last_warming.write().await = now;
493
494        info!("Cache warming completed");
495        Ok(())
496    }
497
498    /// Get time bucket for performance analysis
499    fn get_time_bucket(duration: Duration) -> String {
500        let ms = duration.as_millis();
501        match ms {
502            0..=100 => "fast".to_string(),
503            101..=1000 => "medium".to_string(),
504            1001..=5000 => "slow".to_string(),
505            _ => "very_slow".to_string(),
506        }
507    }
508
509    /// Analyze cache effectiveness and provide recommendations
510    pub async fn analyze_effectiveness(&self) -> Result<CacheAnalysis> {
511        let stats = self.statistics.read().await;
512        let cached_plans = self.cached_plans.read().await;
513        let _performance_history = self.performance_history.read().await;
514
515        let total_requests = stats.hits + stats.misses;
516        let hit_rate = if total_requests > 0 {
517            stats.hits as f64 / total_requests as f64
518        } else {
519            0.0
520        };
521
522        let mut avg_reuse = 0.0;
523        if !cached_plans.is_empty() {
524            avg_reuse = cached_plans
525                .values()
526                .map(|p| p.usage_count as f64)
527                .sum::<f64>()
528                / cached_plans.len() as f64;
529        }
530
531        let recommendations = self
532            .generate_recommendations(hit_rate, avg_reuse, &cached_plans)
533            .await;
534
535        Ok(CacheAnalysis {
536            hit_rate,
537            avg_reuse_count: avg_reuse,
538            total_cached_plans: cached_plans.len(),
539            memory_usage_estimate: cached_plans.len() * 1024, // Rough estimate
540            recommendations,
541            effectiveness_score: self.calculate_effectiveness_score(hit_rate, avg_reuse),
542        })
543    }
544
545    /// Generate optimization recommendations
546    async fn generate_recommendations(
547        &self,
548        hit_rate: f64,
549        avg_reuse: f64,
550        cached_plans: &HashMap<QueryFingerprint, CachedPlan>,
551    ) -> Vec<String> {
552        let mut recommendations = Vec::new();
553
554        if hit_rate < 0.3 {
555            recommendations.push(
556                "Consider increasing cache size or adjusting similarity threshold".to_string(),
557            );
558        }
559
560        if avg_reuse < 2.0 {
561            recommendations
562                .push("Query patterns show low reuse - consider query optimization".to_string());
563        }
564
565        if cached_plans.len() < self.config.max_cached_plans / 10 {
566            recommendations
567                .push("Cache utilization is low - consider more aggressive caching".to_string());
568        }
569
570        let expired_count = cached_plans
571            .values()
572            .filter(|p| !p.is_valid(self.config.plan_ttl))
573            .count();
574
575        if expired_count > cached_plans.len() / 4 {
576            recommendations.push("Many cached plans are expired - consider longer TTL".to_string());
577        }
578
579        recommendations
580    }
581
582    /// Calculate overall effectiveness score
583    fn calculate_effectiveness_score(&self, hit_rate: f64, avg_reuse: f64) -> f64 {
584        let hit_score = hit_rate;
585        let reuse_score = (avg_reuse - 1.0).max(0.0) / 10.0; // Normalize to 0-1 range
586
587        (hit_score * 0.7 + reuse_score * 0.3).min(1.0)
588    }
589
590    /// Clean expired entries from the cache
591    pub async fn cleanup_expired(&self) -> Result<usize> {
592        let mut cached_plans = self.cached_plans.write().await;
593        let initial_count = cached_plans.len();
594
595        cached_plans.retain(|_, plan| plan.is_valid(self.config.plan_ttl));
596
597        let removed_count = initial_count - cached_plans.len();
598        if removed_count > 0 {
599            info!("Cleaned up {} expired cache entries", removed_count);
600        }
601
602        Ok(removed_count)
603    }
604}
605
/// Cache effectiveness analysis result produced by
/// `OptimizationCache::analyze_effectiveness`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheAnalysis {
    /// Hit rate: hits / (hits + misses), or 0.0 when no requests were made
    pub hit_rate: f64,
    /// Average usage count across all cached plans
    pub avg_reuse_count: f64,
    /// Number of plans currently cached (including expired ones)
    pub total_cached_plans: usize,
    /// Rough memory estimate (plan count * 1024 bytes)
    pub memory_usage_estimate: usize,
    /// Human-readable tuning recommendations
    pub recommendations: Vec<String>,
    /// Combined score in [0.0, 1.0] weighting hit rate and reuse
    pub effectiveness_score: f64,
}
616
617impl Default for OptimizationCache {
618    fn default() -> Self {
619        Self::new(OptimizationCacheConfig::default())
620    }
621}
622
#[cfg(test)]
mod tests {
    use super::*;

    /// Fingerprints that differ only in their structure hash must be rated
    /// highly similar by the weighted scoring.
    #[tokio::test]
    async fn test_query_fingerprint_similarity() {
        let base = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 3,
            variable_count: 2,
            filter_count: 1,
            complexity_bucket: 2,
            service_count: 1,
            structure_hash: 12345,
        };

        // Same structure, different query text hash.
        let variant = QueryFingerprint {
            structure_hash: 54321,
            ..base.clone()
        };

        assert!(
            base.similarity(&variant) > 0.8,
            "Similar queries should have high similarity score"
        );
    }

    /// Storing a plan turns a previously-missing fingerprint into a hit.
    #[tokio::test]
    async fn test_cache_operations() {
        let cache = OptimizationCache::default();

        let fingerprint = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 1,
            variable_count: 1,
            filter_count: 0,
            complexity_bucket: 1,
            service_count: 1,
            structure_hash: 67890,
        };

        // Empty cache: lookup must miss.
        assert!(cache.get_plan(&fingerprint).await.is_none());

        // Insert a minimal plan, then the same fingerprint must hit.
        let plan = ExecutionPlan {
            query_id: "test-query".to_string(),
            steps: Vec::new(),
            estimated_total_cost: 100.0,
            max_parallelism: 4,
            planning_time: Duration::from_millis(50),
            cache_key: None,
            metadata: HashMap::new(),
            parallelizable_steps: Vec::new(),
        };
        cache.cache_plan(fingerprint.clone(), plan).await;
        assert!(cache.get_plan(&fingerprint).await.is_some());
    }

    /// Misses on an empty cache must be reflected in the statistics.
    #[tokio::test]
    async fn test_cache_statistics() {
        let cache = OptimizationCache::default();
        let fingerprint = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 1,
            variable_count: 1,
            filter_count: 0,
            complexity_bucket: 1,
            service_count: 1,
            structure_hash: 11111,
        };

        // Two lookups against an empty cache -> two misses.
        for _ in 0..2 {
            let _ = cache.get_plan(&fingerprint).await;
        }

        let stats = cache.get_statistics().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 2);
        assert_eq!(stats.hit_rate, 0.0);
    }
}