1use anyhow::Result;
26use chrono::{DateTime, Duration, Utc};
27use dashmap::DashMap;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
32use tracing::{debug, info};
33use uuid::Uuid;
34
35use crate::content_vectorizer::SemanticSearchResult;
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
39pub struct QueryCacheConfig {
40 pub max_cache_size: usize,
42 pub ttl_minutes: i64,
44 pub similarity_threshold: f32,
46 pub enable_prefetching: bool,
48 pub max_prefetch_variations: usize,
50 pub enable_stats: bool,
52}
53
54impl Default for QueryCacheConfig {
55 fn default() -> Self {
56 Self {
57 max_cache_size: 1000,
58 ttl_minutes: 30,
59 similarity_threshold: 0.85,
60 enable_prefetching: true,
61 max_prefetch_variations: 5,
62 enable_stats: true,
63 }
64 }
65}
66
67#[derive(Debug, Serialize, Deserialize)]
69pub struct CachedQuery {
70 pub id: Uuid,
72 pub query_text: String,
74 pub query_vector: Vec<f32>,
76 pub results: Vec<SemanticSearchResult>,
78 pub cached_at: DateTime<Utc>,
80 last_accessed: AtomicU64,
82 access_count: AtomicU64,
84 pub params_hash: u64,
86 pub session_id: Option<Uuid>,
88 efficiency_score_bits: AtomicU64,
90}
91
92impl CachedQuery {
93 pub fn new(
95 query_text: String,
96 query_vector: Vec<f32>,
97 results: Vec<SemanticSearchResult>,
98 params_hash: u64,
99 session_id: Option<Uuid>,
100 ) -> Self {
101 let now = Utc::now();
102 let now_timestamp = now.timestamp() as u64;
103
104 Self {
105 id: Uuid::new_v4(),
106 query_text,
107 query_vector,
108 results,
109 cached_at: now,
110 last_accessed: AtomicU64::new(now_timestamp),
111 access_count: AtomicU64::new(0),
112 params_hash,
113 session_id,
114 efficiency_score_bits: AtomicU64::new(1.0f32.to_bits() as u64),
115 }
116 }
117
118 pub fn is_expired(&self, ttl_minutes: i64) -> bool {
120 let ttl_duration = Duration::minutes(ttl_minutes);
121 Utc::now() - self.cached_at > ttl_duration
122 }
123
124 pub fn mark_accessed(&self) {
126 let now = Utc::now();
127 let now_timestamp = now.timestamp() as u64;
128 self.last_accessed.store(now_timestamp, Ordering::Relaxed);
129 self.access_count.fetch_add(1, Ordering::Relaxed);
130
131 let hours_since_cached = (now - self.cached_at).num_hours().max(1) as f32;
134 let recency_factor = 1.0 / (1.0 + hours_since_cached / 24.0); loop {
137 let old_bits = self.efficiency_score_bits.load(Ordering::Relaxed);
138 let count = self.access_count.load(Ordering::Relaxed);
139 let frequency_factor = (count as f32).ln().max(1.0);
140 let score = recency_factor * frequency_factor;
141
142 if self
143 .efficiency_score_bits
144 .compare_exchange_weak(
145 old_bits,
146 score.to_bits() as u64,
147 Ordering::Relaxed,
148 Ordering::Relaxed,
149 )
150 .is_ok()
151 {
152 break;
153 }
154 }
155 }
156
157 pub fn efficiency_score(&self) -> f32 {
159 f32::from_bits(self.efficiency_score_bits.load(Ordering::Relaxed) as u32)
160 }
161
162 pub fn similarity_with(&self, other_vector: &[f32]) -> f32 {
164 if self.query_vector.len() != other_vector.len() {
165 return 0.0;
166 }
167
168 let dot_product: f32 = self
169 .query_vector
170 .iter()
171 .zip(other_vector.iter())
172 .map(|(a, b)| a * b)
173 .sum();
174
175 let norm_a: f32 = self.query_vector.iter().map(|x| x * x).sum::<f32>().sqrt();
176 let norm_b: f32 = other_vector.iter().map(|x| x * x).sum::<f32>().sqrt();
177
178 if norm_a == 0.0 || norm_b == 0.0 {
179 0.0
180 } else {
181 dot_product / (norm_a * norm_b)
182 }
183 }
184}
185
186#[derive(Debug, Serialize, Deserialize)]
188pub struct QueryCacheStats {
189 pub total_queries: AtomicU64,
191 pub cache_hits: AtomicU64,
193 pub cache_misses: AtomicU64,
195 pub expired_removed: AtomicU64,
197 pub evicted_entries: AtomicU64,
199 avg_hit_similarity_bits: AtomicU64,
201 pub current_cache_size: AtomicUsize,
203 pub estimated_memory_bytes: AtomicUsize,
205 pub hit_rate: f32,
207 pub avg_time_saved_ms: f32,
209}
210
211impl Default for QueryCacheStats {
212 fn default() -> Self {
213 Self::new()
214 }
215}
216
217impl QueryCacheStats {
218 pub fn new() -> Self {
220 Self {
221 total_queries: AtomicU64::new(0),
222 cache_hits: AtomicU64::new(0),
223 cache_misses: AtomicU64::new(0),
224 expired_removed: AtomicU64::new(0),
225 evicted_entries: AtomicU64::new(0),
226 avg_hit_similarity_bits: AtomicU64::new(0.0f32.to_bits() as u64),
227 current_cache_size: AtomicUsize::new(0),
228 estimated_memory_bytes: AtomicUsize::new(0),
229 hit_rate: 0.0,
230 avg_time_saved_ms: 0.0,
231 }
232 }
233
234 pub fn record_hit(&self, similarity: f32) {
236 self.cache_hits.fetch_add(1, Ordering::Relaxed);
237
238 loop {
240 let old_bits = self.avg_hit_similarity_bits.load(Ordering::Relaxed);
241 let hits = self.cache_hits.load(Ordering::Relaxed);
242 let current_avg = f32::from_bits(old_bits as u32);
243 let new_avg = if hits == 1 {
244 similarity
245 } else {
246 ((current_avg * (hits as f32 - 1.0)) + similarity) / hits as f32
247 };
248
249 if self
250 .avg_hit_similarity_bits
251 .compare_exchange_weak(
252 old_bits,
253 new_avg.to_bits() as u64,
254 Ordering::Relaxed,
255 Ordering::Relaxed,
256 )
257 .is_ok()
258 {
259 break;
260 }
261 }
262 }
263
264 pub fn record_miss(&self) {
266 self.cache_misses.fetch_add(1, Ordering::Relaxed);
267 }
268
269 pub fn avg_hit_similarity(&self) -> f32 {
271 f32::from_bits(self.avg_hit_similarity_bits.load(Ordering::Relaxed) as u32)
272 }
273
274 pub fn snapshot(&self) -> QueryCacheStatsSnapshot {
276 let total = self.total_queries.load(Ordering::Relaxed);
277 let hits = self.cache_hits.load(Ordering::Relaxed);
278
279 let hit_rate = if total > 0 {
280 (hits as f32 / total as f32) * 100.0
281 } else {
282 0.0
283 };
284
285 QueryCacheStatsSnapshot {
286 total_queries: total,
287 cache_hits: hits,
288 cache_misses: self.cache_misses.load(Ordering::Relaxed),
289 expired_removed: self.expired_removed.load(Ordering::Relaxed),
290 evicted_entries: self.evicted_entries.load(Ordering::Relaxed),
291 avg_hit_similarity: self.avg_hit_similarity(),
292 current_cache_size: self.current_cache_size.load(Ordering::Relaxed),
293 estimated_memory_bytes: self.estimated_memory_bytes.load(Ordering::Relaxed),
294 hit_rate,
295 avg_time_saved_ms: 150.0, }
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct QueryCacheStatsSnapshot {
303 pub total_queries: u64,
305 pub cache_hits: u64,
307 pub cache_misses: u64,
309 pub expired_removed: u64,
311 pub evicted_entries: u64,
313 pub avg_hit_similarity: f32,
315 pub current_cache_size: usize,
317 pub estimated_memory_bytes: usize,
319 pub hit_rate: f32,
321 pub avg_time_saved_ms: f32,
323}
324
/// Tracking record for a query text that missed the cache.
#[derive(Debug)]
struct QueryPattern {
    /// How many times the query has been seen.
    frequency: AtomicU64,
    /// Unix timestamp (seconds) of the most recent sighting.
    last_seen: AtomicU64,
}
331
332pub struct QueryCache {
334 cache: Arc<DashMap<Uuid, CachedQuery>>,
336
337 config: QueryCacheConfig,
339
340 stats: Arc<QueryCacheStats>,
342
343 patterns: Arc<DashMap<String, QueryPattern>>,
345
346 recent_queries: Arc<DashMap<String, AtomicU64>>,
348}
349
350impl QueryCache {
351 pub fn new(config: QueryCacheConfig) -> Self {
353 info!(
354 "Initializing query cache with max size: {}",
355 config.max_cache_size
356 );
357
358 Self {
359 cache: Arc::new(DashMap::new()),
360 config,
361 stats: Arc::new(QueryCacheStats::new()),
362 patterns: Arc::new(DashMap::new()),
363 recent_queries: Arc::new(DashMap::new()),
364 }
365 }
366
367 pub fn search(
369 &self,
370 query_text: &str,
371 query_vector: &[f32],
372 params_hash: u64,
373 ) -> Option<Vec<SemanticSearchResult>> {
374 self.stats.total_queries.fetch_add(1, Ordering::Relaxed);
376
377 if let Some(results) = self.find_exact_match(params_hash) {
379 self.stats.record_hit(1.0);
380 return Some(results);
381 }
382
383 if let Some((results, similarity)) = self.find_similar_query(query_vector, params_hash) {
386 self.stats.record_hit(similarity);
387 return Some(results);
388 }
389
390 self.stats.record_miss();
392
393 if self.config.enable_prefetching {
395 self.update_query_patterns(query_text);
396 }
397
398 None
399 }
400
401 pub fn cache_results(
403 &self,
404 query_text: String,
405 query_vector: Vec<f32>,
406 results: Vec<SemanticSearchResult>,
407 params_hash: u64,
408 session_id: Option<Uuid>,
409 ) -> Result<()> {
410 self.cleanup_expired()?;
412
413 let cached_query = CachedQuery::new(
414 query_text.clone(),
415 query_vector,
416 results,
417 params_hash,
418 session_id,
419 );
420
421 let query_id = cached_query.id;
422
423 let current_size = self.cache.len();
425 if current_size >= self.config.max_cache_size {
426 self.evict_least_efficient();
427 }
428
429 self.cache.insert(query_id, cached_query);
431
432 self.stats
434 .current_cache_size
435 .store(self.cache.len(), Ordering::Relaxed);
436 self.stats
437 .estimated_memory_bytes
438 .store(self.estimate_memory_usage(), Ordering::Relaxed);
439
440 debug!("Cached query results for: {}", query_text);
441 Ok(())
442 }
443
444 fn find_exact_match(&self, params_hash: u64) -> Option<Vec<SemanticSearchResult>> {
446 for entry in self.cache.iter() {
447 let cached_query = entry.value();
448 if cached_query.params_hash == params_hash
449 && !cached_query.is_expired(self.config.ttl_minutes)
450 {
451 cached_query.mark_accessed();
453 return Some(cached_query.results.clone());
454 }
455 }
456 None
457 }
458
459 fn find_similar_query(
464 &self,
465 query_vector: &[f32],
466 params_hash: u64,
467 ) -> Option<(Vec<SemanticSearchResult>, f32)> {
468 let mut best_match: Option<(Vec<SemanticSearchResult>, f32, Uuid)> = None;
469 let mut best_similarity = 0.0f32;
470
471 for entry in self.cache.iter() {
472 let cached_query = entry.value();
473
474 if cached_query.is_expired(self.config.ttl_minutes) {
475 continue;
476 }
477
478 if cached_query.params_hash != params_hash {
483 continue;
484 }
485
486 let similarity = cached_query.similarity_with(query_vector);
487
488 if similarity >= self.config.similarity_threshold && similarity > best_similarity {
489 best_similarity = similarity;
490 best_match = Some((cached_query.results.clone(), similarity, cached_query.id));
491 }
492 }
493
494 if let Some((results, similarity, query_id)) = best_match {
495 if let Some(entry) = self.cache.get(&query_id) {
497 entry.mark_accessed();
498 }
499 Some((results, similarity))
500 } else {
501 None
502 }
503 }
504
505 fn cleanup_expired(&self) -> Result<()> {
507 let mut removed_count = 0;
508
509 self.cache.retain(|_, cached_query| {
510 let expired = cached_query.is_expired(self.config.ttl_minutes);
511 if expired {
512 removed_count += 1;
513 }
514 !expired
515 });
516
517 if removed_count > 0 {
518 self.stats
519 .expired_removed
520 .fetch_add(removed_count as u64, Ordering::Relaxed);
521 self.stats
522 .current_cache_size
523 .store(self.cache.len(), Ordering::Relaxed);
524 self.stats
525 .estimated_memory_bytes
526 .store(self.estimate_memory_usage(), Ordering::Relaxed);
527
528 debug!("Removed {} expired cache entries", removed_count);
529 }
530
531 Ok(())
532 }
533
534 fn evict_least_efficient(&self) {
536 if self.cache.is_empty() {
537 return;
538 }
539
540 let mut worst_id: Option<Uuid> = None;
542 let mut worst_score = f32::INFINITY;
543
544 for entry in self.cache.iter() {
545 let score = entry.value().efficiency_score();
546 if score < worst_score {
547 worst_score = score;
548 worst_id = Some(*entry.key());
549 }
550 }
551
552 if let Some(id) = worst_id {
553 if self.cache.remove(&id).is_some() {
556 self.stats.evicted_entries.fetch_add(1, Ordering::Relaxed);
557
558 debug!(
559 "Evicted cache entry with efficiency score: {:.3}",
560 worst_score
561 );
562 }
563 }
564 }
565
566 fn update_query_patterns(&self, query_text: &str) {
568 let now = Utc::now().timestamp() as u64;
569 let query_lower = query_text.to_lowercase();
570
571 self.recent_queries
573 .insert(query_lower.clone(), AtomicU64::new(now));
574
575 self.patterns
577 .entry(query_lower.clone())
578 .and_modify(|pattern| {
579 pattern.frequency.fetch_add(1, Ordering::Relaxed);
580 pattern.last_seen.store(now, Ordering::Relaxed);
581 })
582 .or_insert_with(|| QueryPattern {
583 frequency: AtomicU64::new(1),
584 last_seen: AtomicU64::new(now),
585 });
586
587 }
590
591 pub fn get_stats(&self) -> QueryCacheStatsSnapshot {
593 self.stats.snapshot()
594 }
595
596 pub fn clear(&self) -> Result<()> {
598 let old_size = self.cache.len();
599 self.cache.clear();
600
601 self.stats.current_cache_size.store(0, Ordering::Relaxed);
602 self.stats
603 .estimated_memory_bytes
604 .store(0, Ordering::Relaxed);
605
606 info!("Query cache cleared ({} entries)", old_size);
607 Ok(())
608 }
609
610 pub fn invalidate_session(&self, session_id: Uuid) -> Result<()> {
613 let mut invalidated_count = 0;
614 let mut keys_to_remove = Vec::new();
615
616 for entry in self.cache.iter() {
618 if entry.value().session_id == Some(session_id) {
619 keys_to_remove.push(*entry.key());
620 }
621 }
622
623 for key in keys_to_remove {
625 if self.cache.remove(&key).is_some() {
626 invalidated_count += 1;
627 }
628 }
629
630 let new_size = self.cache.len();
632 self.stats
633 .current_cache_size
634 .store(new_size, Ordering::Relaxed);
635 self.stats
636 .estimated_memory_bytes
637 .store(self.estimate_memory_usage(), Ordering::Relaxed);
638
639 if invalidated_count > 0 {
640 debug!(
641 "Invalidated {} cache entries for session {} (remaining: {})",
642 invalidated_count, session_id, new_size
643 );
644 }
645
646 Ok(())
647 }
648
649 fn estimate_memory_usage(&self) -> usize {
651 self.cache
652 .iter()
653 .map(|entry| {
654 let query = entry.value();
655 let text_size = query.query_text.len();
656 let vector_size = query.query_vector.len() * std::mem::size_of::<f32>();
657 let results_size = query.results.len() * 200; text_size + vector_size + results_size + 100 })
660 .sum()
661 }
662
663 pub fn get_efficiency_metrics(&self) -> HashMap<String, f32> {
665 let stats = self.get_stats();
666 let cache_size = self.cache.len();
667
668 let mut metrics = HashMap::new();
669 metrics.insert("hit_rate".to_string(), stats.hit_rate);
670 metrics.insert("avg_hit_similarity".to_string(), stats.avg_hit_similarity);
671 metrics.insert(
672 "cache_utilization".to_string(),
673 cache_size as f32 / self.config.max_cache_size as f32 * 100.0,
674 );
675 metrics.insert("avg_time_saved_ms".to_string(), stats.avg_time_saved_ms);
676
677 metrics
678 }
679}
680
681impl Default for QueryCache {
682 fn default() -> Self {
683 Self::new(QueryCacheConfig::default())
684 }
685}
686
#[cfg(test)]
mod tests {
    use super::*;

    /// Stores an entry with no results and no session.
    fn store(cache: &QueryCache, text: &str, vector: Vec<f32>, params_hash: u64) {
        cache
            .cache_results(text.to_string(), vector, Vec::new(), params_hash, None)
            .unwrap();
    }

    #[test]
    fn test_query_cache_creation() {
        let stats = QueryCache::new(QueryCacheConfig::default()).get_stats();
        assert_eq!(stats.total_queries, 0);
        assert_eq!(stats.cache_hits, 0);
    }

    #[test]
    fn test_cache_and_retrieve() {
        let cache = QueryCache::default();
        let vector = vec![0.1, 0.2, 0.3];

        store(&cache, "test query", vector.clone(), 12345);

        assert!(cache.search("test query", &vector, 12345).is_some());
    }

    #[test]
    fn test_similarity_matching() {
        let cache = QueryCache::default();
        let params_hash = 123;
        store(&cache, "query1", vec![1.0, 0.0, 0.0], params_hash);

        let nearby = vec![0.9, 0.1, 0.0];
        assert!(cache.search("query2", &nearby, params_hash).is_some());

        assert!(
            cache.search("query2", &nearby, 456).is_none(),
            "Bug fix verification: different params_hash should not return cached results"
        );
    }

    #[test]
    fn test_cache_expiration() {
        let cache = QueryCache::new(QueryCacheConfig {
            ttl_minutes: 0,
            ..Default::default()
        });

        store(&cache, "test", vec![1.0, 0.0], 123);

        assert!(cache.search("test", &[1.0, 0.0], 123).is_none());
    }
}