Skip to main content

offline_intelligence/cache_management/
cache_manager.rs

1//! Main KV cache management engine
2
3use crate::memory::Message;
4use crate::memory_db::MemoryDatabase;
5use crate::cache_management::cache_config::{KVCacheConfig, SnapshotStrategy};
6use crate::cache_management::cache_extractor::{CacheExtractor, ExtractedCacheEntry, KVEntry};
7use crate::cache_management::cache_scorer::{CacheEntryScorer, CacheScoringConfig};
8use crate::cache_management::cache_bridge::CacheContextBridge;
9
10use std::sync::Arc;
11use std::collections::HashMap;
12use tracing::{info, debug};
13use chrono::{Utc, DateTime};
14use serde::Serialize;
15
16/// Main KV cache management engine
17pub struct KVCacheManager {
18    config: KVCacheConfig,
19    database: Arc<MemoryDatabase>,
20    cache_extractor: CacheExtractor,
21    cache_scorer: CacheEntryScorer,
22    context_bridge: CacheContextBridge,
23    statistics: CacheStatistics,
24    session_state: HashMap<String, SessionCacheState>,
25}
26
27#[derive(Debug, Clone)]
28pub struct KvSnapshot {
29    pub id: i64,
30    pub session_id: String,
31    pub message_id: i64,
32    pub snapshot_type: String,
33    pub size_bytes: i64,
34    pub created_at: chrono::DateTime<chrono::Utc>,
35}
36
37#[derive(Debug, Clone, Serialize)]
38pub struct SessionCacheState {
39    pub session_id: String,
40    pub conversation_count: usize,
41    pub last_cleared_at: Option<DateTime<Utc>>,
42    pub last_snapshot_id: Option<i64>,
43    pub cache_size_bytes: usize,
44    pub entry_count: usize,
45    pub metadata: HashMap<String, String>,
46}
47
48#[derive(Debug, Clone, Serialize, Default)]
49pub struct CacheStatistics {
50    pub total_clears: usize,
51    pub total_retrievals: usize,
52    pub entries_preserved: usize,
53    pub entries_cleared: usize,
54    pub entries_retrieved: usize,
55    pub last_operation: Option<DateTime<Utc>>,
56    pub operation_history: Vec<CacheOperation>,
57}
58
59#[derive(Debug, Clone, Serialize)]
60pub struct CacheOperation {
61    pub operation_type: CacheOperationType,
62    pub timestamp: DateTime<Utc>,
63    pub entries_affected: usize,
64    pub session_id: String,
65    pub details: String,
66}
67
68#[derive(Debug, Clone, Serialize)]
69pub enum CacheOperationType {
70    Clear,
71    Retrieve,
72    Snapshot,
73    Restore,
74}
75
76#[derive(Debug, Clone, Serialize)]
77pub enum ClearReason {
78    ConversationLimit,
79    MemoryThreshold,
80    Manual,
81    ErrorRecovery,
82}
83
84#[derive(Debug, Clone)]
85pub struct CacheClearResult {
86    pub entries_to_keep: Vec<ExtractedCacheEntry>,
87    pub entries_cleared: usize,
88    pub bridge_message: String,
89    pub snapshot_id: Option<i64>,
90    pub preserved_keywords: Vec<String>,
91    pub clear_reason: ClearReason,
92}
93
94#[derive(Debug, Clone, Default)]
95pub struct RetrievalResult {
96    pub retrieved_entries: Vec<RetrievedEntry>,
97    pub bridge_message: Option<String>,
98    pub search_duration_ms: u64,
99    pub keywords_used: Vec<String>,
100    pub tiers_searched: Vec<u8>,
101}
102
103#[derive(Debug, Clone)]
104pub struct RetrievedEntry {
105    pub entry: KVEntry,
106    pub similarity_score: f32,
107    pub source_tier: u8,
108    pub matched_keywords: Vec<String>,
109    pub retrieval_time: DateTime<Utc>,
110}
111
112#[derive(Debug, Clone)]
113pub struct CacheProcessingResult {
114    pub should_clear_cache: bool,
115    pub clear_result: Option<CacheClearResult>,
116    pub should_retrieve: bool,
117    pub retrieval_result: Option<RetrievalResult>,
118    pub bridge_messages: Vec<String>,
119    pub updated_session_state: SessionCacheState,
120}
121
122impl KVCacheManager {
123    /// Create a new KV cache manager
124    pub fn new(
125        config: KVCacheConfig,
126        database: Arc<MemoryDatabase>,
127    ) -> anyhow::Result<Self> {
128        let cache_extractor = CacheExtractor::new(Default::default());
129        
130        let scoring_config = CacheScoringConfig::default();
131        let cache_scorer = CacheEntryScorer::new(scoring_config);
132        
133        let context_bridge = CacheContextBridge::new(20);
134        
135        Ok(Self {
136            config,
137            database,
138            cache_extractor,
139            cache_scorer,
140            context_bridge,
141            statistics: CacheStatistics::new(),
142            session_state: HashMap::new(),
143        })
144    }
145    
146    /// Initialize or get session state
147    fn get_or_create_session_state(&mut self, session_id: &str) -> &mut SessionCacheState {
148        self.session_state.entry(session_id.to_string())
149            .or_insert_with(|| SessionCacheState {
150                session_id: session_id.to_string(),
151                conversation_count: 0,
152                last_cleared_at: None,
153                last_snapshot_id: None,
154                cache_size_bytes: 0,
155                entry_count: 0,
156                metadata: HashMap::new(),
157            })
158    }
159    
160    /// Process a conversation and manage cache
161    pub async fn process_conversation(
162        &mut self,
163        session_id: &str,
164        messages: &[Message],
165        current_kv_entries: &[KVEntry],
166        current_cache_size_bytes: usize,
167        max_cache_size_bytes: usize,
168    ) -> anyhow::Result<CacheProcessingResult> {
169        debug!("Processing conversation for session: {}", session_id);
170        
171        // First, check conditions without mutable borrow
172        let current_conversation_count = self.session_state
173            .get(session_id)
174            .map(|s| s.conversation_count)
175            .unwrap_or(0);
176        
177        let should_clear_by_conversation = self.should_clear_by_conversation(current_conversation_count + 1);
178        let should_clear_by_memory = self.should_clear_by_memory(current_cache_size_bytes, max_cache_size_bytes);
179        
180        // Now get mutable reference
181        let session_state = self.get_or_create_session_state(session_id);
182        session_state.conversation_count += 1;
183        session_state.cache_size_bytes = current_cache_size_bytes;
184        session_state.entry_count = current_kv_entries.len();
185        
186        let mut result = CacheProcessingResult {
187            should_clear_cache: false,
188            clear_result: None,
189            should_retrieve: false,
190            retrieval_result: None,
191            bridge_messages: Vec::new(),
192            updated_session_state: session_state.clone(),
193        };
194        
195        if should_clear_by_conversation || should_clear_by_memory {
196            let clear_reason = if should_clear_by_conversation {
197                ClearReason::ConversationLimit
198            } else {
199                ClearReason::MemoryThreshold
200            };
201            
202            // Release the mutable borrow before calling clear_cache
203            let _ = session_state;
204            
205            let clear_result = self.clear_cache(session_id, current_kv_entries, clear_reason).await?;
206            result.should_clear_cache = true;
207            result.clear_result = Some(clear_result.clone());
208            result.bridge_messages.push(clear_result.bridge_message);
209            
210            // Update session state after clearing
211            if let Some(state) = self.session_state.get_mut(session_id) {
212                state.conversation_count = 0;
213                state.last_cleared_at = Some(Utc::now());
214                result.updated_session_state = state.clone();
215            }
216        }
217        
218        // Check if we should retrieve context
219        let should_retrieve = self.should_retrieve_context(messages);
220        if should_retrieve {
221            let last_user_message = messages.iter()
222                .rev()
223                .find(|m| m.role == "user")
224                .map(|m| &m.content)
225                .map_or("", |v| v);
226            
227            if !last_user_message.is_empty() {
228                let retrieval_result = self.retrieve_context(session_id, last_user_message, current_kv_entries).await?;
229                if !retrieval_result.retrieved_entries.is_empty() {
230                    result.should_retrieve = true;
231                    result.retrieval_result = Some(retrieval_result.clone());
232                    if let Some(bridge_msg) = &retrieval_result.bridge_message {
233                        result.bridge_messages.push(bridge_msg.clone());
234                    }
235                }
236            }
237        }
238        
239        // Update database metadata
240        if let Some(state) = self.session_state.get(session_id) {
241            self.update_session_metadata(session_id, state).await?;
242        }
243        
244        Ok(result)
245    }
246    
247    /// Check if cache needs to be cleared based on conversation count
248    pub fn should_clear_by_conversation(&self, conversation_count: usize) -> bool {
249        conversation_count >= self.config.clear_after_conversations
250    }
251    
252    /// Check if cache needs to be cleared based on memory usage
253    pub fn should_clear_by_memory(&self, current_usage_bytes: usize, max_memory_bytes: usize) -> bool {
254        if max_memory_bytes == 0 {
255            return false;
256        }
257        
258        let usage_percent = current_usage_bytes as f32 / max_memory_bytes as f32;
259        usage_percent >= self.config.memory_threshold_percent
260    }
261    
262    /// Check if we should retrieve context for current messages
263    fn should_retrieve_context(&self, messages: &[Message]) -> bool {
264        if !self.config.retrieval_enabled {
265            return false;
266        }
267        
268        // Check last user message for complex queries
269        if let Some(last_user) = messages.iter().rev().find(|m| m.role == "user") {
270            let content = &last_user.content;
271            // Retrieve for questions, complex requests, or code
272            content.contains('?') ||
273            content.len() > 100 ||
274            content.contains("```") ||
275            content.contains("explain") ||
276            content.contains("how to") ||
277            content.contains("what is")
278        } else {
279            false
280        }
281    }
282    
283    /// Clear KV cache intelligently
284    pub async fn clear_cache(
285        &mut self,
286        session_id: &str,
287        current_entries: &[KVEntry],
288        reason: ClearReason,
289    ) -> anyhow::Result<CacheClearResult> {
290        info!("Clearing KV cache for session {}: {:?}", session_id, reason);
291        
292        let start_time = std::time::Instant::now();
293        
294        // 1. Extract important entries
295        let extracted = self.cache_extractor.extract_entries(current_entries, &self.cache_scorer);
296        
297        // 2. Filter entries to preserve
298        let to_preserve = self.cache_extractor.filter_preserved_entries(
299            &extracted,
300            self.config.min_importance_to_preserve,
301            self.config.preserve_system_prompts,
302            self.config.preserve_code_entries,
303        );
304        
305        // 3. Create snapshot if configured
306        let snapshot_id = if self.should_create_snapshot(&reason) {
307            Some(self.create_snapshot(session_id, &to_preserve).await?)
308        } else {
309            None
310        };
311        
312        // 4. Extract keywords from preserved entries
313        let preserved_keywords: Vec<String> = to_preserve.iter()
314            .flat_map(|e| e.keywords.clone())
315            .take(10)
316            .collect();
317        
318        // 5. Generate bridge message
319        let bridge_message = self.context_bridge.create_clear_bridge(
320            current_entries.len().saturating_sub(to_preserve.len()),
321            to_preserve.len(),
322            &preserved_keywords,
323        );
324        
325        // 6. Update statistics
326        self.statistics.record_clear(
327            current_entries.len(),
328            to_preserve.len(),
329            reason.clone(),
330            session_id,
331        );
332        
333        // 7. Update session state
334        if let Some(state) = self.session_state.get_mut(session_id) {
335            state.entry_count = to_preserve.len();
336            state.last_snapshot_id = snapshot_id;
337            state.last_cleared_at = Some(Utc::now());
338            state.metadata.insert("last_clear_reason".to_string(), format!("{:?}", reason));
339        }
340        
341        let duration = start_time.elapsed();
342        debug!("Cache clear completed in {:?}", duration);
343        
344        Ok(CacheClearResult {
345            entries_to_keep: to_preserve.clone(), // CLONE FIXED HERE
346            entries_cleared: current_entries.len().saturating_sub(to_preserve.len()),
347            bridge_message,
348            snapshot_id,
349            preserved_keywords,
350            clear_reason: reason,
351        })
352    }
353    
354    /// Check if we should create a snapshot
355    fn should_create_snapshot(&self, reason: &ClearReason) -> bool {
356        if !self.config.enabled {
357            return false;
358        }
359        
360        match &self.config.snapshot_strategy {
361            SnapshotStrategy::None => false,
362            SnapshotStrategy::Full { interval_conversations: _ } => true,
363            SnapshotStrategy::Incremental { interval_conversations: _, max_snapshots: _ } => {
364                matches!(reason, ClearReason::ConversationLimit)
365            }
366            SnapshotStrategy::Adaptive { min_importance_threshold: _, max_snapshots: _ } => true,
367        }
368    }
369    
370    /// Create a snapshot of preserved entries
371    async fn create_snapshot(
372        &self,
373        session_id: &str,
374        preserved_entries: &[ExtractedCacheEntry],
375    ) -> anyhow::Result<i64> {
376        debug!("Creating KV snapshot for session: {}", session_id);
377        
378        // Convert to database format
379        let db_entries: Vec<KVEntry> = preserved_entries.iter()
380            .map(|entry| {
381                KVEntry {
382                    key_hash: entry.key_hash.clone(),
383                    key_data: entry.key_data.clone(),
384                    value_data: entry.value_data.clone(),
385                    key_type: entry.entry_type.to_string(),
386                    layer_index: entry.layer_index,
387                    head_index: entry.head_index,
388                    importance_score: entry.importance_score,
389                    access_count: entry.access_count,
390                    last_accessed: Utc::now(),
391                }
392            })
393            .collect();
394        
395        // Store in database
396        let snapshot_id = self.database.create_kv_snapshot(session_id, &db_entries).await?;
397        
398        info!("Created KV snapshot {} with {} entries", snapshot_id, db_entries.len());
399        Ok(snapshot_id)
400    }
401    
402    /// Retrieve relevant context from all tiers
403    pub async fn retrieve_context(
404        &mut self,
405        session_id: &str,
406        query: &str,
407        current_cache_entries: &[KVEntry],
408    ) -> anyhow::Result<RetrievalResult> {
409        debug!("Retrieving context for query: {}", query);
410        
411        let start_time = std::time::Instant::now();
412        let keywords = self.extract_keywords(query);
413        
414        let mut results = Vec::new();
415        let mut searched_tiers = Vec::new();
416        
417        // Tier 1: Search active cache
418        if !current_cache_entries.is_empty() {
419            searched_tiers.push(1);
420            let tier1_results = self.search_tier1(current_cache_entries, &keywords).await?;
421            results.extend(tier1_results);
422        }
423        
424        // Tier 2: Search KV snapshots if Tier 1 insufficient
425        if results.len() < 5 {
426            searched_tiers.push(2);
427            let tier2_results = self.search_tier2(session_id, &keywords).await?;
428            results.extend(tier2_results);
429        }
430        
431        // Tier 3: Search complete messages if still insufficient
432        if results.len() < 3 {
433            searched_tiers.push(3);
434            let tier3_results = self.search_tier3(session_id, &keywords).await?;
435            results.extend(tier3_results);
436        }
437        
438        // Sort all results by similarity score
439        results.sort_by(|a, b| b.similarity_score.partial_cmp(&a.similarity_score)
440            .unwrap_or(std::cmp::Ordering::Equal));
441        
442        // Limit total results
443        results.truncate(20);
444        
445        // Update engagement scores for retrieved entries
446        for result in &results {
447            self.cache_scorer.update_engagement(&result.entry.key_hash, true);
448        }
449        
450        // Generate bridge message if we found results
451        let bridge_message = if !results.is_empty() {
452            let primary_tier = results.iter()
453                .map(|r| r.source_tier)
454                .max()
455                .unwrap_or(1);
456            
457            let avg_similarity = results.iter()
458                .map(|r| r.similarity_score)
459                .sum::<f32>() / results.len() as f32;
460            
461            Some(self.context_bridge.create_retrieval_bridge(
462                results.len(),
463                primary_tier,
464                &keywords,
465                Some(avg_similarity),
466            ))
467        } else {
468            None
469        };
470        
471        let duration = start_time.elapsed();
472        
473        // Update statistics
474        self.statistics.record_retrieval(
475            results.len(),
476            searched_tiers.clone(),
477            keywords.len(),
478            session_id,
479        );
480        
481        Ok(RetrievalResult {
482            retrieved_entries: results,
483            bridge_message,
484            search_duration_ms: duration.as_millis() as u64,
485            keywords_used: keywords,
486            tiers_searched: searched_tiers,
487        })
488    }
489    
490    /// Search Tier 1 (Active KV cache)
491    async fn search_tier1(
492        &self,
493        entries: &[KVEntry],
494        keywords: &[String],
495    ) -> anyhow::Result<Vec<RetrievedEntry>> {
496        let mut results = Vec::new();
497        
498        for entry in entries {
499            let similarity = self.calculate_keyword_similarity(entry, keywords);
500            if similarity > 0.3 { // Threshold for Tier 1
501                let matched_keywords = self.get_matching_keywords(entry, keywords);
502                results.push(RetrievedEntry {
503                    entry: entry.clone(),
504                    similarity_score: similarity,
505                    source_tier: 1,
506                    matched_keywords,
507                    retrieval_time: Utc::now(),
508                });
509            }
510        }
511        
512        // Sort by similarity and access count
513        results.sort_by(|a, b| {
514            b.similarity_score.partial_cmp(&a.similarity_score)
515                .unwrap_or(std::cmp::Ordering::Equal)
516                .then(b.entry.access_count.cmp(&a.entry.access_count))
517        });
518        
519        results.truncate(10); // Limit results
520        
521        debug!("Tier 1 search found {} results", results.len());
522        Ok(results)
523    }
524    
525    /// Search Tier 2 (KV snapshots)
526    async fn search_tier2(
527        &self,
528        session_id: &str,
529        keywords: &[String],
530    ) -> anyhow::Result<Vec<RetrievedEntry>> {
531        // Get recent snapshots (max 3 for performance)
532        let snapshots = self.database.get_recent_kv_snapshots(session_id, 3).await?;
533        
534        let mut all_results = Vec::new();
535        
536        for snapshot in snapshots {
537            // Search snapshot entries
538            let entries = self.database.get_kv_snapshot_entries(snapshot.id).await?;
539            
540            for entry in entries {
541                let similarity = self.calculate_keyword_similarity(&entry, keywords);
542                if similarity > 0.4 { // Higher threshold for Tier 2
543                    let matched_keywords = self.get_matching_keywords(&entry, keywords);
544                    all_results.push(RetrievedEntry {
545                        entry,
546                        similarity_score: similarity,
547                        source_tier: 2,
548                        matched_keywords,
549                        retrieval_time: Utc::now(),
550                    });
551                }
552            }
553        }
554        
555        // Sort and limit
556        all_results.sort_by(|a, b| b.similarity_score.partial_cmp(&a.similarity_score)
557            .unwrap_or(std::cmp::Ordering::Equal));
558        all_results.truncate(15);
559        
560        debug!("Tier 2 search found {} results", all_results.len());
561        Ok(all_results)
562    }
563    
564    /// Search Tier 3 (Complete messages)
565    async fn search_tier3(
566        &self,
567        session_id: &str,
568        keywords: &[String],
569    ) -> anyhow::Result<Vec<RetrievedEntry>> {
570        if keywords.is_empty() {
571            return Ok(Vec::new());
572        }
573        
574        // Search messages by keywords
575        let messages = self.database.conversations.search_messages_by_keywords(
576            session_id,
577            keywords,
578            20,
579        ).await?;
580        
581        let mut results = Vec::new();
582        
583        for message in messages {
584            // Convert message to KV entry for consistency
585            let entry = KVEntry {
586                key_hash: format!("msg_{}", message.id),
587                key_data: Some(message.content.as_bytes().to_vec()),
588                value_data: message.content.as_bytes().to_vec(),
589                key_type: "message".to_string(),
590                layer_index: 0,
591                head_index: None,
592                importance_score: message.importance_score,
593                access_count: 1,
594                last_accessed: message.timestamp,
595            };
596            
597            let similarity = self.calculate_keyword_similarity(&entry, keywords);
598            if similarity > 0.5 { // Highest threshold for Tier 3
599                results.push(RetrievedEntry {
600                    entry,
601                    similarity_score: similarity,
602                    source_tier: 3,
603                    matched_keywords: keywords.to_vec(),
604                    retrieval_time: Utc::now(),
605                });
606            }
607        }
608        
609        // Sort by timestamp (most recent first) and similarity
610        results.sort_by(|a, b| {
611            b.entry.last_accessed.cmp(&a.entry.last_accessed)
612                .then(b.similarity_score.partial_cmp(&a.similarity_score)
613                    .unwrap_or(std::cmp::Ordering::Equal))
614        });
615        
616        results.truncate(10);
617        
618        debug!("Tier 3 search found {} results", results.len());
619        Ok(results)
620    }
621    
622    fn calculate_keyword_similarity(&self, entry: &KVEntry, keywords: &[String]) -> f32 {
623        if keywords.is_empty() {
624            return 0.0;
625        }
626        
627        let entry_keywords = self.cache_scorer.extract_keywords(entry.key_data.as_deref());
628        if entry_keywords.is_empty() {
629            return 0.0;
630        }
631        
632        // Simple keyword matching with partial matches
633        let mut matches = 0.0;
634        for keyword in keywords {
635            let keyword_lower = keyword.to_lowercase();
636            for entry_keyword in &entry_keywords {
637                let entry_lower = entry_keyword.to_lowercase();
638                if entry_lower.contains(&keyword_lower) || keyword_lower.contains(&entry_lower) {
639                    matches += 1.0;
640                    break;
641                }
642            }
643        }
644        
645        matches / keywords.len() as f32
646    }
647    
648    fn get_matching_keywords(&self, entry: &KVEntry, keywords: &[String]) -> Vec<String> {
649        let entry_keywords = self.cache_scorer.extract_keywords(entry.key_data.as_deref());
650        let mut matches = Vec::new();
651        
652        for keyword in keywords {
653            let keyword_lower = keyword.to_lowercase();
654            for entry_keyword in &entry_keywords {
655                let entry_lower = entry_keyword.to_lowercase();
656                if entry_lower.contains(&keyword_lower) || keyword_lower.contains(&entry_lower) {
657                    matches.push(keyword.clone());
658                    break;
659                }
660            }
661        }
662        
663        matches
664    }
665    
666    fn extract_keywords(&self, text: &str) -> Vec<String> {
667        let words: Vec<&str> = text.split_whitespace().collect();
668        words.iter()
669            .filter(|w| w.len() > 3)
670            .map(|w| w.to_lowercase())
671            .filter(|w| !self.is_stop_word(w))
672            .collect()
673    }
674    
675    fn is_stop_word(&self, word: &str) -> bool {
676        let stop_words = [
677            "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
678            "of", "with", "by", "is", "am", "are", "was", "were", "be", "been",
679            "being", "have", "has", "had", "do", "does", "did", "will", "would",
680            "shall", "should", "may", "might", "must", "can", "could",
681        ];
682        stop_words.contains(&word)
683    }
684    
685    /// Update session metadata in database
686    async fn update_session_metadata(
687        &self,
688        session_id: &str,
689        state: &SessionCacheState,
690    ) -> anyhow::Result<()> {
691        self.database.update_kv_cache_metadata(session_id, state).await
692    }
693    
694    /// Get cache statistics
695    pub fn get_statistics(&self) -> &CacheStatistics {
696        &self.statistics
697    }
698    
699    /// Get session state
700    pub fn get_session_state(&self, session_id: &str) -> Option<&SessionCacheState> {
701        self.session_state.get(session_id)
702    }
703    
704    /// Get all session states
705    pub fn get_all_session_states(&self) -> &HashMap<String, SessionCacheState> {
706        &self.session_state
707    }
708    
709    /// Restore cache from snapshot
710    pub async fn restore_from_snapshot(
711        &mut self,
712        session_id: &str,
713        snapshot_id: i64,
714    ) -> anyhow::Result<Vec<KVEntry>> {
715        info!("Restoring cache from snapshot {} for session {}", snapshot_id, session_id);
716        
717        let entries: Vec<KVEntry> = self.database.get_kv_snapshot_entries(snapshot_id).await?;
718        
719        // Update session state
720        if let Some(state) = self.session_state.get_mut(session_id) {
721            state.entry_count = entries.len();
722            state.last_snapshot_id = Some(snapshot_id);
723        }
724        
725        // Generate bridge message
726        let bridge_message = self.context_bridge.create_restore_bridge(
727            entries.len(),
728            None, // Could calculate snapshot age if needed
729        );
730        
731        info!("{}", bridge_message);
732        
733        // Update statistics
734        self.statistics.record_restore(entries.len(), session_id);
735        
736        Ok(entries)
737    }
738    
739    /// Manual cache clear (for testing or admin purposes)
740    pub async fn manual_clear_cache(
741        &mut self,
742        session_id: &str,
743        current_entries: &[KVEntry],
744    ) -> anyhow::Result<CacheClearResult> {
745        self.clear_cache(session_id, current_entries, ClearReason::Manual).await
746    }
747    
748    /// Check cache health and perform maintenance if needed
749    pub async fn perform_maintenance(&mut self) -> anyhow::Result<MaintenanceResult> {
750        let mut result = MaintenanceResult {
751            sessions_cleaned: 0,
752            snapshots_pruned: 0,
753            errors: Vec::new(),
754        };
755        
756        // Clean up old session states (inactive for > 24 hours)
757        let cutoff = Utc::now() - chrono::Duration::hours(24);
758        let sessions_to_clean: Vec<String> = self.session_state.iter()
759            .filter(|(_, state)| {
760                state.last_cleared_at.is_none_or(|dt| dt < cutoff)
761            })
762            .map(|(id, _)| id.clone())
763            .collect();
764        
765        for session_id in sessions_to_clean {
766            if let Err(e) = self.cleanup_session(&session_id).await {
767                result.errors.push(format!("Failed to cleanup session {}: {}", session_id, e));
768            } else {
769                result.sessions_cleaned += 1;
770            }
771        }
772        
773        // Prune old snapshots if configured
774        if let SnapshotStrategy::Incremental { max_snapshots, .. } = &self.config.snapshot_strategy {
775            let pruned = self.prune_old_snapshots(*max_snapshots).await?;
776            result.snapshots_pruned = pruned;
777        }
778        
779        Ok(result)
780    }
781    
782    /// Cleanup a specific session
783    async fn cleanup_session(&mut self, session_id: &str) -> anyhow::Result<()> {
784        self.session_state.remove(session_id);
785        self.database.cleanup_session_snapshots(session_id).await?;
786        Ok(())
787    }
788    
789    /// Prune old snapshots
790    async fn prune_old_snapshots(&self, keep_max: usize) -> anyhow::Result<usize> {
791        self.database.prune_old_kv_snapshots(keep_max).await
792    }
793    
794    /// Export cache statistics
795    pub fn export_statistics(&self) -> CacheStatisticsExport {
796        CacheStatisticsExport {
797            total_clears: self.statistics.total_clears,
798            total_retrievals: self.statistics.total_retrievals,
799            entries_preserved: self.statistics.entries_preserved,
800            entries_cleared: self.statistics.entries_cleared,
801            entries_retrieved: self.statistics.entries_retrieved,
802            active_sessions: self.session_state.len(),
803            last_operation: self.statistics.last_operation,
804            operation_history_count: self.statistics.operation_history.len(),
805        }
806    }
807    
808    /// Get configuration
809    pub fn get_config(&self) -> &KVCacheConfig {
810        &self.config
811    }
812    
813    /// Update configuration
814    pub fn update_config(&mut self, config: KVCacheConfig) {
815        self.config = config;
816    }
817    
818    /// Get cache scorer reference
819    pub fn cache_scorer(&self) -> &CacheEntryScorer {
820        &self.cache_scorer
821    }
822    
823    /// Get mutable cache scorer reference
824    pub fn cache_scorer_mut(&mut self) -> &mut CacheEntryScorer {
825        &mut self.cache_scorer
826    }
827    
828    /// Reset statistics
829    pub fn reset_statistics(&mut self) {
830        self.statistics = CacheStatistics::new();
831    }
832}
833
834impl CacheStatistics {
835    pub fn new() -> Self {
836        Self::default()
837    }
838
839    pub fn record_clear(
840        &mut self,
841        total_entries: usize,
842        preserved_entries: usize,
843        reason: ClearReason,
844        session_id: &str,
845    ) {
846        self.total_clears += 1;
847        self.entries_preserved += preserved_entries;
848        self.entries_cleared += total_entries - preserved_entries;
849        self.last_operation = Some(Utc::now());
850        
851        self.operation_history.push(CacheOperation {
852            operation_type: CacheOperationType::Clear,
853            timestamp: Utc::now(),
854            entries_affected: total_entries,
855            session_id: session_id.to_string(),
856            details: format!("{:?}", reason),
857        });
858        
859        // Keep only last 100 operations
860        if self.operation_history.len() > 100 {
861            self.operation_history.remove(0);
862        }
863    }
864    
865    pub fn record_retrieval(
866        &mut self,
867        retrieved_count: usize,
868        tiers_searched: Vec<u8>,
869        keywords_count: usize,
870        session_id: &str,
871    ) {
872        self.total_retrievals += 1;
873        self.entries_retrieved += retrieved_count;
874        self.last_operation = Some(Utc::now());
875        
876        self.operation_history.push(CacheOperation {
877            operation_type: CacheOperationType::Retrieve,
878            timestamp: Utc::now(),
879            entries_affected: retrieved_count,
880            session_id: session_id.to_string(),
881            details: format!("Tiers: {:?}, Keywords: {}", tiers_searched, keywords_count),
882        });
883        
884        // Keep only last 100 operations
885        if self.operation_history.len() > 100 {
886            self.operation_history.remove(0);
887        }
888    }
889    
890    pub fn record_restore(&mut self, restored_count: usize, session_id: &str) {
891        self.operation_history.push(CacheOperation {
892            operation_type: CacheOperationType::Restore,
893            timestamp: Utc::now(),
894            entries_affected: restored_count,
895            session_id: session_id.to_string(),
896            details: "Cache restored from snapshot".to_string(),
897        });
898        
899        // Keep only last 100 operations
900        if self.operation_history.len() > 100 {
901            self.operation_history.remove(0);
902        }
903    }
904    
905    pub fn record_snapshot(&mut self, snapshot_id: i64, entry_count: usize, session_id: &str) {
906        self.operation_history.push(CacheOperation {
907            operation_type: CacheOperationType::Snapshot,
908            timestamp: Utc::now(),
909            entries_affected: entry_count,
910            session_id: session_id.to_string(),
911            details: format!("Snapshot ID: {}", snapshot_id),
912        });
913        
914        // Keep only last 100 operations
915        if self.operation_history.len() > 100 {
916            self.operation_history.remove(0);
917        }
918    }
919}
920
921impl RetrievalResult {
922    pub fn new() -> Self {
923        Self::default()
924    }
925
926    pub fn total_entries(&self) -> usize {
927        self.retrieved_entries.len()
928    }
929    
930    pub fn average_similarity(&self) -> f32 {
931        if self.retrieved_entries.is_empty() {
932            return 0.0;
933        }
934        self.retrieved_entries.iter()
935            .map(|e| e.similarity_score)
936            .sum::<f32>() / self.retrieved_entries.len() as f32
937    }
938    
939    pub fn is_empty(&self) -> bool {
940        self.retrieved_entries.is_empty()
941    }
942    
943    pub fn entries_by_tier(&self, tier: u8) -> Vec<&RetrievedEntry> {
944        self.retrieved_entries.iter()
945            .filter(|e| e.source_tier == tier)
946            .collect()
947    }
948    
949    pub fn primary_tier(&self) -> u8 {
950        if self.retrieved_entries.is_empty() {
951            return 0;
952        }
953        
954        // Count entries by tier
955        let mut tier_counts = HashMap::new();
956        for entry in &self.retrieved_entries {
957            *tier_counts.entry(entry.source_tier).or_insert(0) += 1;
958        }
959        
960        // Return tier with most entries
961        tier_counts.into_iter()
962            .max_by_key(|(_, count)| *count)
963            .map(|(tier, _)| tier)
964            .unwrap_or(0)
965    }
966    
967    pub fn add_tier_results(&mut self, tier: u8, results: Vec<RetrievedEntry>) {
968        self.tiers_searched.push(tier);
969        self.retrieved_entries.extend(results);
970    }
971}
972
973#[derive(Debug, Clone, Serialize)]
974pub struct CacheStatisticsExport {
975    pub total_clears: usize,
976    pub total_retrievals: usize,
977    pub entries_preserved: usize,
978    pub entries_cleared: usize,
979    pub entries_retrieved: usize,
980    pub active_sessions: usize,
981    pub last_operation: Option<DateTime<Utc>>,
982    pub operation_history_count: usize,
983}
984
985#[derive(Debug, Clone, Serialize)]
986pub struct MaintenanceResult {
987    pub sessions_cleaned: usize,
988    pub snapshots_pruned: usize,
989    pub errors: Vec<String>,
990}