use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info};

use crate::planner::planning::types::{ExecutionPlan, QueryInfo, QueryType};

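/// Configuration for the execution-plan optimization cache: capacity, TTLs,
/// similarity matching, and cache-warming behavior.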
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationCacheConfig {
    pub max_cached_plans: usize,
    pub plan_ttl: Duration,
    pub enable_adaptive_caching: bool,
    pub performance_threshold: f64,
    pub enable_similarity_matching: bool,
    pub similarity_threshold: f64,
    pub max_performance_age: Duration,
    pub enable_cache_warming: bool,
    pub cache_warming_interval: Duration,
}

impl Default for OptimizationCacheConfig {
    fn default() -> Self {
        Self {
            max_cached_plans: 1000,
            plan_ttl: Duration::from_secs(3600),
            enable_adaptive_caching: true,
            performance_threshold: 0.15,
            enable_similarity_matching: true,
            similarity_threshold: 0.8,
            max_performance_age: Duration::from_secs(7200),
            enable_cache_warming: true,
            cache_warming_interval: Duration::from_secs(300),
        }
    }
}

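/// Structural fingerprint of a query, used as the cache key. Queries with the
/// same shape (type, pattern/variable/filter counts, complexity bucket) can
/// reuse each other's cached plans.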
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct QueryFingerprint {
    pub query_type: QueryType,
    pub pattern_count: usize,
    pub variable_count: usize,
    pub filter_count: usize,
    pub complexity_bucket: u8,
    pub service_count: usize,
    pub structure_hash: u64,
}

impl QueryFingerprint {
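    /// Builds a fingerprint from parsed query metadata. The structure hash is
    /// derived from the original query text; the service count currently
    /// defaults to 1 and is not derived from SERVICE clauses here.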
    pub fn from_query_info(query_info: &QueryInfo) -> Self {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        query_info.original_query.hash(&mut hasher);

        Self {
            query_type: query_info.query_type,
            pattern_count: query_info.patterns.len(),
            variable_count: query_info.variables.len(),
            filter_count: query_info.filters.len(),
            complexity_bucket: Self::bucket_complexity(query_info.complexity),
            service_count: 1,
            structure_hash: hasher.finish(),
        }
    }

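    /// Maps a raw complexity estimate into one of six coarse buckets so that
    /// queries of similar complexity produce comparable fingerprints.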
    fn bucket_complexity(complexity: u64) -> u8 {
        match complexity {
            0..=10 => 1,
            11..=50 => 2,
            51..=100 => 3,
            101..=500 => 4,
            501..=1000 => 5,
            _ => 6,
        }
    }

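    /// Computes a weighted similarity score in `[0.0, 1.0]` between two
    /// fingerprints. Query type carries the largest weight (3.0), followed by
    /// pattern count (2.0), variable count (1.5), and complexity bucket and
    /// service count (1.0 each); the filter count is currently not weighted.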
    pub fn similarity(&self, other: &QueryFingerprint) -> f64 {
        let mut score = 0.0;
        let mut total_weight = 0.0;

        if self.query_type == other.query_type {
            score += 3.0;
        }
        total_weight += 3.0;

        let pattern_similarity = 1.0
            - ((self.pattern_count as f64 - other.pattern_count as f64).abs()
                / (self.pattern_count.max(other.pattern_count) as f64 + 1.0));
        score += pattern_similarity * 2.0;
        total_weight += 2.0;

        let variable_similarity = 1.0
            - ((self.variable_count as f64 - other.variable_count as f64).abs()
                / (self.variable_count.max(other.variable_count) as f64 + 1.0));
        score += variable_similarity * 1.5;
        total_weight += 1.5;

        if self.complexity_bucket == other.complexity_bucket {
            score += 1.0;
        }
        total_weight += 1.0;

        let service_similarity = 1.0
            - ((self.service_count as f64 - other.service_count as f64).abs()
                / (self.service_count.max(other.service_count) as f64 + 1.0));
        score += service_similarity * 1.0;
        total_weight += 1.0;

        score / total_weight
    }
}

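/// An execution plan stored in the cache together with its usage statistics
/// and the fingerprint it was cached under.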
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedPlan {
    pub plan: ExecutionPlan,
    pub cached_at: SystemTime,
    pub usage_count: u32,
    pub avg_execution_time: Duration,
    pub success_rate: f64,
    pub services: Vec<String>,
    pub performance_improvement: Option<f64>,
    pub fingerprint: QueryFingerprint,
}

impl CachedPlan {
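    /// Wraps a freshly optimized plan in a cache entry stamped with the
    /// current time and default usage metrics.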
    pub fn new(plan: ExecutionPlan, fingerprint: QueryFingerprint) -> Self {
        Self {
            plan,
            cached_at: SystemTime::now(),
            usage_count: 0,
            avg_execution_time: Duration::from_millis(0),
            success_rate: 1.0,
            services: Vec::new(),
            performance_improvement: None,
            fingerprint,
        }
    }

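    /// Records one execution of this plan, updating the running average
    /// execution time and the success rate incrementally.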
    pub fn update_metrics(&mut self, execution_time: Duration, success: bool) {
        self.usage_count += 1;

        let current_avg_ms = self.avg_execution_time.as_millis() as f64;
        let new_time_ms = execution_time.as_millis() as f64;
        let new_avg_ms = (current_avg_ms * (self.usage_count - 1) as f64 + new_time_ms)
            / self.usage_count as f64;
        self.avg_execution_time = Duration::from_millis(new_avg_ms as u64);

        let current_successes = (self.success_rate * (self.usage_count - 1) as f64).round() as u32;
        let new_successes = if success {
            current_successes + 1
        } else {
            current_successes
        };
        self.success_rate = new_successes as f64 / self.usage_count as f64;
    }

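    /// Returns `true` if the entry is younger than `ttl`. Entries whose age
    /// cannot be determined (e.g. after a clock adjustment) are treated as
    /// invalid.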
    pub fn is_valid(&self, ttl: Duration) -> bool {
        match self.cached_at.elapsed() {
            Ok(age) => age < ttl,
            Err(_) => false,
        }
    }

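    /// Heuristic retention score used for eviction: faster average execution,
    /// higher usage, higher success rate, and a larger measured improvement
    /// all raise the score, so the lowest-scoring entry is evicted first.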
    pub fn calculate_score(&self) -> f64 {
        let time_factor = 1.0 / (self.avg_execution_time.as_millis() as f64 + 1.0);
        let usage_factor = (self.usage_count as f64).ln() + 1.0;
        let success_factor = self.success_rate;
        let improvement_factor = self.performance_improvement.unwrap_or(0.0) + 1.0;

        time_factor * usage_factor * success_factor * improvement_factor
    }
}

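/// A single execution observation kept in the rolling performance history.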
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceData {
    pub execution_time: Duration,
    pub success: bool,
    pub memory_usage: u64,
    pub services_contacted: Vec<String>,
    pub timestamp: SystemTime,
}

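/// Aggregate cache counters exposed to callers via `get_statistics`.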
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheStatistics {
    pub hits: u64,
    pub misses: u64,
    pub evictions: u64,
    pub avg_reuse_count: f64,
    pub hit_rate: f64,
    pub performance_improvement: f64,
    pub last_updated: SystemTime,
}

impl Default for CacheStatistics {
    fn default() -> Self {
        Self {
            hits: 0,
            misses: 0,
            evictions: 0,
            avg_reuse_count: 0.0,
            hit_rate: 0.0,
            performance_improvement: 0.0,
            last_updated: SystemTime::now(),
        }
    }
}

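/// Thread-safe cache of optimized execution plans keyed by query fingerprint,
/// with performance tracking, score-based eviction, similarity matching, and
/// periodic cache warming.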
pub struct OptimizationCache {
    config: OptimizationCacheConfig,
    cached_plans: Arc<RwLock<HashMap<QueryFingerprint, CachedPlan>>>,
    performance_history: Arc<RwLock<VecDeque<PerformanceData>>>,
    statistics: Arc<RwLock<CacheStatistics>>,
    last_warming: Arc<RwLock<Instant>>,
}

impl OptimizationCache {
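    /// Creates an empty cache with the given configuration.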
    pub fn new(config: OptimizationCacheConfig) -> Self {
        Self {
            config,
            cached_plans: Arc::new(RwLock::new(HashMap::new())),
            performance_history: Arc::new(RwLock::new(VecDeque::new())),
            statistics: Arc::new(RwLock::new(CacheStatistics::default())),
            last_warming: Arc::new(RwLock::new(Instant::now())),
        }
    }

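    /// Looks up a cached plan for `fingerprint`. Tries an exact, non-expired
    /// match first; if similarity matching is enabled, falls back to the best
    /// valid entry whose similarity exceeds the configured threshold. Records
    /// a hit or miss in the statistics either way.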
    pub async fn get_plan(&self, fingerprint: &QueryFingerprint) -> Option<CachedPlan> {
        let cached_plans = self.cached_plans.read().await;

        if let Some(plan) = cached_plans.get(fingerprint) {
            if plan.is_valid(self.config.plan_ttl) {
                self.record_hit().await;
                debug!("Cache hit for fingerprint: {:?}", fingerprint);
                return Some(plan.clone());
            }
        }

        if self.config.enable_similarity_matching {
            let mut best_match: Option<CachedPlan> = None;
            let mut best_similarity = 0.0;

            for plan in cached_plans.values() {
                if !plan.is_valid(self.config.plan_ttl) {
                    continue;
                }

                let similarity = fingerprint.similarity(&plan.fingerprint);
                if similarity > self.config.similarity_threshold && similarity > best_similarity {
                    best_similarity = similarity;
                    best_match = Some(plan.clone());
                }
            }

            if let Some(plan) = best_match {
                self.record_hit().await;
                debug!("Similarity cache hit with score: {:.3}", best_similarity);
                return Some(plan);
            }
        }

        self.record_miss().await;
        None
    }

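    /// Inserts a newly optimized plan, evicting the lowest-scoring entry first
    /// if the cache is already at capacity.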
    pub async fn cache_plan(&self, fingerprint: QueryFingerprint, plan: ExecutionPlan) {
        let mut cached_plans = self.cached_plans.write().await;

        if cached_plans.len() >= self.config.max_cached_plans {
            self.evict_least_valuable(&mut cached_plans).await;
        }

        let cached_plan = CachedPlan::new(plan, fingerprint.clone());
        cached_plans.insert(fingerprint.clone(), cached_plan);

        info!(
            "Cached new execution plan with fingerprint: {:?}",
            fingerprint
        );
    }

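    /// Feeds an execution result back into the cache: updates the matching
    /// plan's metrics (if still cached) and appends an entry to the bounded
    /// performance history.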
    pub async fn update_performance(
        &self,
        fingerprint: &QueryFingerprint,
        execution_time: Duration,
        success: bool,
    ) {
        let mut cached_plans = self.cached_plans.write().await;

        if let Some(plan) = cached_plans.get_mut(fingerprint) {
            plan.update_metrics(execution_time, success);
            debug!("Updated performance metrics for plan: {:?}", fingerprint);
        }

        let performance_data = PerformanceData {
            execution_time,
            success,
            memory_usage: 0,
            services_contacted: Vec::new(),
            timestamp: SystemTime::now(),
        };

        let mut history = self.performance_history.write().await;
        history.push_back(performance_data);

        while history.len() > 10000 {
            history.pop_front();
        }
    }

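    /// Removes the cached plan with the lowest retention score. The caller
    /// already holds the write lock and passes the map in directly.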
    async fn evict_least_valuable(&self, cached_plans: &mut HashMap<QueryFingerprint, CachedPlan>) {
        if cached_plans.is_empty() {
            return;
        }

        let mut lowest_score = f64::INFINITY;
        let mut evict_key: Option<QueryFingerprint> = None;

        for (fingerprint, plan) in cached_plans.iter() {
            let score = plan.calculate_score();
            if score < lowest_score {
                lowest_score = score;
                evict_key = Some(fingerprint.clone());
            }
        }

        if let Some(key) = evict_key {
            cached_plans.remove(&key);
            self.record_eviction().await;
            debug!("Evicted plan with lowest score: {:.3}", lowest_score);
        }
    }

    async fn record_hit(&self) {
        let mut stats = self.statistics.write().await;
        stats.hits += 1;
        stats.hit_rate = stats.hits as f64 / (stats.hits + stats.misses) as f64;
        stats.last_updated = SystemTime::now();
    }

    async fn record_miss(&self) {
        let mut stats = self.statistics.write().await;
        stats.misses += 1;
        stats.hit_rate = stats.hits as f64 / (stats.hits + stats.misses) as f64;
        stats.last_updated = SystemTime::now();
    }

    async fn record_eviction(&self) {
        let mut stats = self.statistics.write().await;
        stats.evictions += 1;
        stats.last_updated = SystemTime::now();
    }

    pub async fn get_statistics(&self) -> CacheStatistics {
        let stats = self.statistics.read().await;
        stats.clone()
    }

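    /// Periodic cache-warming pass. Runs at most once per configured interval,
    /// summarizes the recent performance history into latency buckets, and
    /// checks a small set of common query shapes; actual pre-population of
    /// plans is not performed here, only logged.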
    pub async fn warm_cache(&self) -> Result<()> {
        let now = Instant::now();
        // Copy the timestamp out so the read guard is released before the
        // write lock is taken at the end of this method.
        let last_warming = *self.last_warming.read().await;

        if now.duration_since(last_warming) < self.config.cache_warming_interval {
            return Ok(());
        }

        info!("Starting cache warming process");

        let history = self.performance_history.read().await;
        let mut pattern_performance: HashMap<String, Vec<Duration>> = HashMap::new();

        for data in history.iter() {
            if data.success {
                let bucket = Self::get_time_bucket(data.execution_time);
                pattern_performance
                    .entry(bucket)
                    .or_default()
                    .push(data.execution_time);
            }
        }

        let common_patterns = vec![
            ("simple_select", QueryType::Select, 1, 1, 0),
            ("complex_select", QueryType::Select, 5, 3, 2),
            ("construct", QueryType::Construct, 3, 2, 1),
            ("ask", QueryType::Ask, 1, 0, 0),
        ];

        for (name, query_type, patterns, vars, filters) in common_patterns {
            let fingerprint = QueryFingerprint {
                query_type,
                pattern_count: patterns,
                variable_count: vars,
                filter_count: filters,
                complexity_bucket: 2,
                service_count: 1,
                structure_hash: name
                    .as_bytes()
                    .iter()
                    .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64)),
            };

            if !self.cached_plans.read().await.contains_key(&fingerprint) {
                debug!("Would pre-warm pattern: {}", name);
            }
        }

        *self.last_warming.write().await = now;

        info!("Cache warming completed");
        Ok(())
    }

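    /// Buckets an execution time into a coarse latency class
    /// (`fast` up to 100 ms, `medium` up to 1 s, `slow` up to 5 s, `very_slow` above).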
    fn get_time_bucket(duration: Duration) -> String {
        let ms = duration.as_millis();
        match ms {
            0..=100 => "fast".to_string(),
            101..=1000 => "medium".to_string(),
            1001..=5000 => "slow".to_string(),
            _ => "very_slow".to_string(),
        }
    }

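    /// Summarizes how well the cache is performing: hit rate, average reuse
    /// per entry, a rough memory estimate, tuning recommendations, and an
    /// overall effectiveness score.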
    pub async fn analyze_effectiveness(&self) -> Result<CacheAnalysis> {
        let stats = self.statistics.read().await;
        let cached_plans = self.cached_plans.read().await;
        let _performance_history = self.performance_history.read().await;

        let total_requests = stats.hits + stats.misses;
        let hit_rate = if total_requests > 0 {
            stats.hits as f64 / total_requests as f64
        } else {
            0.0
        };

        let mut avg_reuse = 0.0;
        if !cached_plans.is_empty() {
            avg_reuse = cached_plans
                .values()
                .map(|p| p.usage_count as f64)
                .sum::<f64>()
                / cached_plans.len() as f64;
        }

        let recommendations = self
            .generate_recommendations(hit_rate, avg_reuse, &cached_plans)
            .await;

        Ok(CacheAnalysis {
            hit_rate,
            avg_reuse_count: avg_reuse,
            total_cached_plans: cached_plans.len(),
            memory_usage_estimate: cached_plans.len() * 1024,
            recommendations,
            effectiveness_score: self.calculate_effectiveness_score(hit_rate, avg_reuse),
        })
    }

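    /// Produces human-readable tuning suggestions based on hit rate, reuse,
    /// cache utilization, and the share of expired entries.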
    async fn generate_recommendations(
        &self,
        hit_rate: f64,
        avg_reuse: f64,
        cached_plans: &HashMap<QueryFingerprint, CachedPlan>,
    ) -> Vec<String> {
        let mut recommendations = Vec::new();

        if hit_rate < 0.3 {
            recommendations.push(
                "Consider increasing cache size or adjusting similarity threshold".to_string(),
            );
        }

        if avg_reuse < 2.0 {
            recommendations
                .push("Query patterns show low reuse - consider query optimization".to_string());
        }

        if cached_plans.len() < self.config.max_cached_plans / 10 {
            recommendations
                .push("Cache utilization is low - consider more aggressive caching".to_string());
        }

        let expired_count = cached_plans
            .values()
            .filter(|p| !p.is_valid(self.config.plan_ttl))
            .count();

        if expired_count > cached_plans.len() / 4 {
            recommendations.push("Many cached plans are expired - consider longer TTL".to_string());
        }

        recommendations
    }

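    /// Combines hit rate (70%) and normalized reuse (30%) into a single
    /// effectiveness score clamped to `[0.0, 1.0]`.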
    fn calculate_effectiveness_score(&self, hit_rate: f64, avg_reuse: f64) -> f64 {
        let hit_score = hit_rate;
        let reuse_score = (avg_reuse - 1.0).max(0.0) / 10.0;

        (hit_score * 0.7 + reuse_score * 0.3).min(1.0)
    }

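    /// Drops all expired entries and returns how many were removed.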
    pub async fn cleanup_expired(&self) -> Result<usize> {
        let mut cached_plans = self.cached_plans.write().await;
        let initial_count = cached_plans.len();

        cached_plans.retain(|_, plan| plan.is_valid(self.config.plan_ttl));

        let removed_count = initial_count - cached_plans.len();
        if removed_count > 0 {
            info!("Cleaned up {} expired cache entries", removed_count);
        }

        Ok(removed_count)
    }
}

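/// Snapshot produced by `analyze_effectiveness`.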
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheAnalysis {
    pub hit_rate: f64,
    pub avg_reuse_count: f64,
    pub total_cached_plans: usize,
    pub memory_usage_estimate: usize,
    pub recommendations: Vec<String>,
    pub effectiveness_score: f64,
}

impl Default for OptimizationCache {
    fn default() -> Self {
        Self::new(OptimizationCacheConfig::default())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_query_fingerprint_similarity() {
        let fp1 = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 3,
            variable_count: 2,
            filter_count: 1,
            complexity_bucket: 2,
            service_count: 1,
            structure_hash: 12345,
        };

        let fp2 = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 3,
            variable_count: 2,
            filter_count: 1,
            complexity_bucket: 2,
            service_count: 1,
            structure_hash: 54321,
        };

        let similarity = fp1.similarity(&fp2);
        assert!(
            similarity > 0.8,
            "Similar queries should have high similarity score"
        );
    }

    #[tokio::test]
    async fn test_cache_operations() {
        let cache = OptimizationCache::default();

        let fingerprint = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 1,
            variable_count: 1,
            filter_count: 0,
            complexity_bucket: 1,
            service_count: 1,
            structure_hash: 67890,
        };

        assert!(cache.get_plan(&fingerprint).await.is_none());

        let plan = ExecutionPlan {
            query_id: "test-query".to_string(),
            steps: Vec::new(),
            estimated_total_cost: 100.0,
            max_parallelism: 4,
            planning_time: Duration::from_millis(50),
            cache_key: None,
            metadata: HashMap::new(),
            parallelizable_steps: Vec::new(),
        };

        cache.cache_plan(fingerprint.clone(), plan).await;
        assert!(cache.get_plan(&fingerprint).await.is_some());
    }

    #[tokio::test]
    async fn test_cache_statistics() {
        let cache = OptimizationCache::default();
        let fingerprint = QueryFingerprint {
            query_type: QueryType::Select,
            pattern_count: 1,
            variable_count: 1,
            filter_count: 0,
            complexity_bucket: 1,
            service_count: 1,
            structure_hash: 11111,
        };

        cache.get_plan(&fingerprint).await;
        cache.get_plan(&fingerprint).await;

        let stats = cache.get_statistics().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 2);
        assert_eq!(stats.hit_rate, 0.0);
    }
}