1use crate::advanced_caching::{CacheConfig, CacheKey};
9use crate::advanced_caching_multilevel::{CacheInvalidator, MultiLevelCache};
10use anyhow::{anyhow, Result};
11use std::sync::{Arc, RwLock};
12use std::thread::{self, JoinHandle};
13
14pub struct BackgroundCacheWorker {
20 cache: Arc<MultiLevelCache>,
21 invalidator: Arc<CacheInvalidator>,
22 config: CacheConfig,
23 worker_handle: Option<JoinHandle<()>>,
24 shutdown_signal: Arc<RwLock<bool>>,
25}
26
27impl BackgroundCacheWorker {
28 pub fn new(
29 cache: Arc<MultiLevelCache>,
30 invalidator: Arc<CacheInvalidator>,
31 config: CacheConfig,
32 ) -> Self {
33 Self {
34 cache,
35 invalidator,
36 config,
37 worker_handle: None,
38 shutdown_signal: Arc::new(RwLock::new(false)),
39 }
40 }
41
42 pub fn start(&mut self) -> Result<()> {
44 if !self.config.enable_background_updates {
45 return Ok(());
46 }
47
48 let cache = Arc::clone(&self.cache);
49 let invalidator = Arc::clone(&self.invalidator);
50 let interval = self.config.background_update_interval;
51 let shutdown_signal = Arc::clone(&self.shutdown_signal);
52
53 let handle = thread::spawn(move || {
54 while let Ok(shutdown) = shutdown_signal.read() {
55 if *shutdown {
56 break;
57 }
58 drop(shutdown); if let Err(e) = Self::perform_maintenance(&cache, &invalidator) {
62 tracing::warn!("Background cache maintenance error: {}", e);
64 }
65
66 thread::sleep(interval);
68 }
69 });
70
71 self.worker_handle = Some(handle);
72 Ok(())
73 }
74
75 pub fn stop(&mut self) -> Result<()> {
77 {
79 let mut signal = self.shutdown_signal.write().expect("lock poisoned");
80 *signal = true;
81 }
82
83 if let Some(handle) = self.worker_handle.take() {
85 handle
86 .join()
87 .map_err(|e| anyhow!("Failed to join worker thread: {:?}", e))?;
88 }
89
90 Ok(())
91 }
92
93 fn perform_maintenance(
95 cache: &Arc<MultiLevelCache>,
96 invalidator: &Arc<CacheInvalidator>,
97 ) -> Result<()> {
98 let expired_count = invalidator.invalidate_expired()?;
100 if expired_count > 0 {
101 println!("Background worker cleaned {expired_count} expired entries");
102 }
103
104 let memory_stats = cache.get_memory_stats();
106 let utilization = memory_stats.memory_bytes as f64 / memory_stats.max_memory_bytes as f64;
107
108 if utilization > 0.9 {
109 Self::aggressive_cleanup(cache)?;
111 }
112
113 Self::sync_hot_entries(cache)?;
115
116 Ok(())
117 }
118
119 fn aggressive_cleanup(_cache: &Arc<MultiLevelCache>) -> Result<()> {
121 println!("Performing aggressive cache cleanup due to high memory usage");
124 Ok(())
125 }
126
127 fn sync_hot_entries(_cache: &Arc<MultiLevelCache>) -> Result<()> {
129 Ok(())
132 }
133}
134
135impl Drop for BackgroundCacheWorker {
136 fn drop(&mut self) {
137 let _ = self.stop();
138 }
139}
140
141pub struct CacheWarmer {
147 cache: Arc<MultiLevelCache>,
148}
149
150impl CacheWarmer {
151 pub fn new(cache: Arc<MultiLevelCache>) -> Self {
152 Self { cache }
153 }
154
155 pub fn warm_with_data(&self, data: Vec<(CacheKey, crate::Vector)>) -> Result<usize> {
157 let mut loaded_count = 0;
158
159 for (key, vector) in data {
160 if self.cache.insert(key, vector).is_ok() {
161 loaded_count += 1;
162 }
163 }
164
165 Ok(loaded_count)
166 }
167
168 pub fn warm_from_persistent(&self, keys: Vec<CacheKey>) -> Result<usize> {
170 let mut loaded_count = 0;
171
172 for key in keys {
173 if self.cache.get(&key).is_some() {
175 loaded_count += 1;
176 }
177 }
178
179 Ok(loaded_count)
180 }
181
182 pub fn warm_with_generator<F>(&self, count: usize, generator: F) -> Result<usize>
184 where
185 F: Fn(usize) -> Option<(CacheKey, crate::Vector)>,
186 {
187 let mut loaded_count = 0;
188
189 for i in 0..count {
190 if let Some((key, vector)) = generator(i) {
191 if self.cache.insert(key, vector).is_ok() {
192 loaded_count += 1;
193 }
194 }
195 }
196
197 Ok(loaded_count)
198 }
199}
200
201pub struct CacheAnalyzer {
207 cache: Arc<MultiLevelCache>,
208 invalidator: Arc<CacheInvalidator>,
209}
210
/// Snapshot of cache health, as produced by `CacheAnalyzer::analyze`.
#[derive(Debug, Clone)]
pub struct CacheAnalysisReport {
    /// Bytes held by the memory tier divided by its configured byte budget.
    pub memory_utilization: f64,
    /// Memory plus persistent hits divided by total requests.
    pub hit_ratio: f64,
    /// Persistent hits divided by persistent lookups (hits plus misses).
    pub persistent_hit_ratio: f64,
    /// Presumably `(namespace, access count)` pairs.
    ///
    /// The analyzer currently leaves this empty.
    pub most_accessed_namespaces: Vec<(String, usize)>,
    /// Human-readable tuning suggestions.
    pub recommendations: Vec<String>,
    /// Weighted health score clamped to `[0.0, 1.0]`; higher is better.
    pub performance_score: f64,
}
220
221impl CacheAnalyzer {
222 pub fn new(cache: Arc<MultiLevelCache>, invalidator: Arc<CacheInvalidator>) -> Self {
223 Self { cache, invalidator }
224 }
225
226 pub fn analyze(&self) -> CacheAnalysisReport {
228 let stats = self.cache.get_stats();
229 let memory_stats = self.cache.get_memory_stats();
230 let invalidation_stats = self.invalidator.get_stats();
231
232 let memory_utilization =
233 memory_stats.memory_bytes as f64 / memory_stats.max_memory_bytes as f64;
234
235 let total_requests = stats.total_requests;
236 let total_hits = stats.memory_hits + stats.persistent_hits;
237 let hit_ratio = if total_requests > 0 {
238 total_hits as f64 / total_requests as f64
239 } else {
240 0.0
241 };
242
243 let persistent_hit_ratio = if stats.persistent_hits + stats.persistent_misses > 0 {
244 stats.persistent_hits as f64 / (stats.persistent_hits + stats.persistent_misses) as f64
245 } else {
246 0.0
247 };
248
249 let mut recommendations = Vec::new();
250
251 if hit_ratio < 0.5 {
253 recommendations
254 .push("Consider increasing cache size or adjusting eviction policy".to_string());
255 }
256
257 if memory_utilization > 0.9 {
258 recommendations.push(
259 "Memory cache is near capacity - consider increasing max_memory_bytes".to_string(),
260 );
261 }
262
263 if persistent_hit_ratio < 0.3 {
264 recommendations
265 .push("Persistent cache hit ratio is low - review TTL settings".to_string());
266 }
267
268 if invalidation_stats.tracked_namespaces > 100 {
269 recommendations
270 .push("Consider consolidating namespaces to reduce tracking overhead".to_string());
271 }
272
273 let performance_score =
275 (hit_ratio * 0.4 + (1.0 - memory_utilization) * 0.3 + persistent_hit_ratio * 0.3)
276 .clamp(0.0, 1.0);
277
278 CacheAnalysisReport {
279 memory_utilization,
280 hit_ratio,
281 persistent_hit_ratio,
282 most_accessed_namespaces: vec![], recommendations,
284 performance_score,
285 }
286 }
287
288 pub fn get_optimization_recommendations(&self) -> Vec<String> {
290 self.analyze().recommendations
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::{BackgroundCacheWorker, CacheAnalyzer, CacheWarmer};
    use crate::advanced_caching::{CacheConfig, CacheEntry, CacheKey, EvictionPolicy};
    use crate::advanced_caching_eviction::{MemoryCache, PersistentCache};
    use crate::advanced_caching_multilevel::{CacheInvalidator, MultiLevelCache};
    use crate::Vector;
    use anyhow::Result;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Baseline multi-level configuration: ten memory slots plus a persistent
    /// tier stored under `dir`.
    fn persistent_config(dir: &TempDir) -> CacheConfig {
        CacheConfig {
            max_memory_entries: 10,
            persistent_cache_dir: Some(dir.path().to_path_buf()),
            enable_persistent: true,
            ..Default::default()
        }
    }

    /// Payload used wherever the stored vector itself does not matter.
    fn sample_vector() -> Vector {
        Vector::new(vec![1.0, 2.0, 3.0])
    }

    #[test]
    fn test_cache_key() {
        let key = CacheKey::new("embeddings", "test_doc").with_variant("v1");

        assert_eq!(key.namespace, "embeddings");
        assert_eq!(key.key, "test_doc");
        assert_eq!(key.variant, Some("v1".to_string()));
        assert_eq!(key.to_string(), "embeddings:test_doc:v1");
    }

    #[test]
    fn test_memory_cache() -> Result<()> {
        let mut cache = MemoryCache::new(CacheConfig {
            max_memory_entries: 2,
            max_memory_bytes: 1024,
            ..Default::default()
        });

        let first = CacheKey::new("test", "key1");
        let second = CacheKey::new("test", "key2");
        let third = CacheKey::new("test", "key3");

        cache.insert(first.clone(), CacheEntry::new(Vector::new(vec![1.0, 2.0, 3.0])))?;
        cache.insert(second.clone(), CacheEntry::new(Vector::new(vec![4.0, 5.0, 6.0])))?;

        assert!(cache.get(&first).is_some());
        assert!(cache.get(&second).is_some());

        // A third insert exceeds `max_memory_entries`; only two entries may remain.
        cache.insert(third, CacheEntry::new(Vector::new(vec![7.0, 8.0, 9.0])))?;
        assert_eq!(cache.entries.len(), 2);
        Ok(())
    }

    #[test]
    fn test_persistent_cache() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = PersistentCache::new(CacheConfig {
            persistent_cache_dir: Some(temp_dir.path().to_path_buf()),
            enable_compression: true,
            ..Default::default()
        })?;

        let key = CacheKey::new("test", "persistent_key");
        let vector = sample_vector();
        cache.store(&key, &CacheEntry::new(vector.clone()))?;

        let loaded = cache.load(&key)?;
        assert!(loaded.is_some());
        let loaded_entry = loaded.expect("retrieved entry was None");
        assert_eq!(loaded_entry.data.as_f32(), vector.as_f32());
        Ok(())
    }

    #[test]
    fn test_multi_level_cache() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = MultiLevelCache::new(CacheConfig {
            max_memory_entries: 2,
            ..persistent_config(&temp_dir)
        })?;

        let key = CacheKey::new("test", "multi_level");
        let vector = sample_vector();
        cache.insert(key.clone(), vector.clone())?;

        let fetched = cache.get(&key).expect("get returned None");
        assert_eq!(fetched.as_f32(), vector.as_f32());

        let stats = cache.get_stats();
        assert_eq!(stats.total_requests, 1);
        assert_eq!(stats.memory_hits, 1);
        Ok(())
    }

    #[test]
    fn test_cache_expiration() -> Result<()> {
        let mut cache = MemoryCache::new(CacheConfig {
            max_memory_entries: 10,
            ttl: Some(Duration::from_millis(10)),
            ..Default::default()
        });

        let key = CacheKey::new("test", "expiring");
        let entry = CacheEntry::new(sample_vector()).with_ttl(Duration::from_millis(10));
        cache.insert(key.clone(), entry)?;

        assert!(cache.get(&key).is_some());

        // Outlive the 10ms TTL.
        std::thread::sleep(Duration::from_millis(20));

        assert!(cache.get(&key).is_none());
        Ok(())
    }

    #[test]
    fn test_arc_eviction_policy() -> Result<()> {
        let mut cache = MemoryCache::new(CacheConfig {
            max_memory_entries: 3,
            eviction_policy: EvictionPolicy::ARC,
            ..Default::default()
        });

        let [hot, second, third, newcomer] =
            ["arc1", "arc2", "arc3", "arc4"].map(|name| CacheKey::new("test", name));
        let vector = sample_vector();

        for key in [&hot, &second, &third] {
            cache.insert(key.clone(), CacheEntry::new(vector.clone()))?;
        }

        // Access `hot` repeatedly before the fourth insert; it must survive eviction.
        for _ in 0..3 {
            cache.get(&hot);
        }

        cache.insert(newcomer, CacheEntry::new(vector.clone()))?;

        assert!(cache.get(&hot).is_some());
        assert_eq!(cache.entries.len(), 3);
        Ok(())
    }

    #[test]
    fn test_cache_warmer() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = Arc::new(MultiLevelCache::new(persistent_config(&temp_dir))?);
        let warmer = CacheWarmer::new(Arc::clone(&cache));

        let test_data = vec![
            (CacheKey::new("test", "warm1"), Vector::new(vec![1.0, 2.0])),
            (CacheKey::new("test", "warm2"), Vector::new(vec![3.0, 4.0])),
            (CacheKey::new("test", "warm3"), Vector::new(vec![5.0, 6.0])),
        ];

        assert_eq!(warmer.warm_with_data(test_data.clone())?, 3);

        for (key, expected) in test_data {
            let cached = cache.get(&key).expect("cached vector was None");
            assert_eq!(cached.as_f32(), expected.as_f32());
        }
        Ok(())
    }

    #[test]
    fn test_cache_warmer_with_generator() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = Arc::new(MultiLevelCache::new(persistent_config(&temp_dir))?);
        let warmer = CacheWarmer::new(Arc::clone(&cache));

        let loaded = warmer.warm_with_generator(5, |i| {
            let key = CacheKey::new("generated", format!("item_{i}"));
            Some((key, Vector::new(vec![i as f32, (i * 2) as f32])))
        })?;
        assert_eq!(loaded, 5);

        for i in 0..5 {
            let key = CacheKey::new("generated", format!("item_{i}"));
            let cached = cache.get(&key).expect("cached vector was None");
            assert_eq!(cached.as_f32(), vec![i as f32, (i * 2) as f32]);
        }
        Ok(())
    }

    #[test]
    fn test_cache_analyzer() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = Arc::new(MultiLevelCache::new(persistent_config(&temp_dir))?);
        let invalidator = Arc::new(CacheInvalidator::new(Arc::clone(&cache)));
        let analyzer = CacheAnalyzer::new(Arc::clone(&cache), Arc::clone(&invalidator));

        let first = CacheKey::new("test", "analyze1");
        let second = CacheKey::new("test", "analyze2");
        let vector = sample_vector();

        cache.insert(first.clone(), vector.clone())?;
        cache.insert(second.clone(), vector)?;

        // Three lookups of stored keys, then one lookup of a missing key.
        for key in [&first, &second, &first] {
            cache.get(key);
        }
        cache.get(&CacheKey::new("test", "nonexistent"));

        let report = analyzer.analyze();
        assert!(report.hit_ratio > 0.0);
        assert!((0.0..=1.0).contains(&report.memory_utilization));
        assert!((0.0..=1.0).contains(&report.performance_score));

        assert!(!analyzer.get_optimization_recommendations().is_empty());
        Ok(())
    }

    #[test]
    fn test_background_cache_worker() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let config = CacheConfig {
            enable_background_updates: true,
            background_update_interval: Duration::from_millis(100),
            ..persistent_config(&temp_dir)
        };

        let cache = Arc::new(MultiLevelCache::new(config.clone())?);
        let invalidator = Arc::new(CacheInvalidator::new(Arc::clone(&cache)));
        let mut worker =
            BackgroundCacheWorker::new(Arc::clone(&cache), Arc::clone(&invalidator), config);

        worker.start()?;

        let key = CacheKey::new("test", "background");
        cache.insert(key.clone(), sample_vector())?;

        // Let the worker run while the entry is live, then shut it down.
        std::thread::sleep(Duration::from_millis(150));
        worker.stop()?;

        assert!(cache.get(&key).is_some());
        Ok(())
    }

    #[test]
    fn test_cache_invalidation_by_tag() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = Arc::new(MultiLevelCache::new(persistent_config(&temp_dir))?);
        let invalidator = Arc::new(CacheInvalidator::new(Arc::clone(&cache)));

        let key1 = CacheKey::new("test", "tagged1");
        let key2 = CacheKey::new("test", "tagged2");
        let key3 = CacheKey::new("test", "tagged3");

        let vector = sample_vector();
        for key in [&key1, &key2, &key3] {
            cache.insert(key.clone(), vector.clone())?;
        }

        for (key, category) in [(&key1, "embeddings"), (&key2, "embeddings"), (&key3, "vectors")] {
            let tags = HashMap::from([("category".to_string(), category.to_string())]);
            invalidator.register_entry(key, &tags);
        }

        assert_eq!(invalidator.invalidate_by_tag("category", "embeddings")?, 2);

        assert!(cache.get(&key1).is_none());
        assert!(cache.get(&key2).is_none());
        assert!(cache.get(&key3).is_some());
        Ok(())
    }

    #[test]
    fn test_cache_invalidation_by_namespace() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let cache = Arc::new(MultiLevelCache::new(persistent_config(&temp_dir))?);
        let invalidator = Arc::new(CacheInvalidator::new(Arc::clone(&cache)));

        let key1 = CacheKey::new("embeddings", "item1");
        let key2 = CacheKey::new("embeddings", "item2");
        let key3 = CacheKey::new("vectors", "item3");
        let keys = [&key1, &key2, &key3];

        let vector = sample_vector();
        for key in keys {
            cache.insert(key.clone(), vector.clone())?;
        }
        for key in keys {
            invalidator.register_entry(key, &HashMap::new());
        }

        assert_eq!(invalidator.invalidate_namespace("embeddings")?, 2);

        assert!(cache.get(&key1).is_none());
        assert!(cache.get(&key2).is_none());
        assert!(cache.get(&key3).is_some());
        Ok(())
    }
}