Skip to main content

offline_intelligence/cache_management/
cache_manager.rs

1//! Main KV cache management engine
2
3use crate::memory::Message;
4use crate::memory_db::MemoryDatabase;
5use crate::cache_management::cache_config::{KVCacheConfig, SnapshotStrategy};
6use crate::cache_management::cache_extractor::{CacheExtractor, ExtractedCacheEntry, KVEntry};
7use crate::cache_management::cache_scorer::{CacheEntryScorer, CacheScoringConfig};
8use crate::cache_management::cache_bridge::CacheContextBridge;
9
10use std::sync::Arc;
11use std::collections::HashMap;
12use tracing::{info, debug, warn};
13use chrono::{Utc, DateTime};
14use serde::Serialize;
15
/// Main KV cache management engine.
///
/// Orchestrates per-session clearing, snapshotting, pre-clear summarization,
/// and tiered context retrieval of KV cache entries.
pub struct KVCacheManager {
    // Tuning knobs: clear thresholds, snapshot strategy, retrieval toggles.
    config: KVCacheConfig,
    // Persistent store for snapshots, session summaries, and metadata.
    database: Arc<MemoryDatabase>,
    // Extracts and filters cache entries worth preserving across clears.
    cache_extractor: CacheExtractor,
    // Scores entries and extracts keywords from their key data.
    cache_scorer: CacheEntryScorer,
    // Builds the human-readable bridge messages emitted after clears/retrievals.
    context_bridge: CacheContextBridge,
    // Running counters and operation history across all sessions.
    statistics: CacheStatistics,
    // Per-session bookkeeping, keyed by session id.
    session_state: HashMap<String, SessionCacheState>,
    // Optional LLM handle used for pre-clear summarization; `None` disables it.
    llm_worker: Option<Arc<crate::worker_threads::LLMWorker>>,
}
27
/// Metadata describing one persisted KV snapshot row.
#[derive(Debug, Clone)]
pub struct KvSnapshot {
    pub id: i64,
    pub session_id: String,
    // NOTE(review): presumably the message the snapshot was taken at —
    // confirm against the snapshot-creation code.
    pub message_id: i64,
    pub snapshot_type: String,
    pub size_bytes: i64,
    pub created_at: chrono::DateTime<chrono::Utc>,
}
37
/// Per-session cache bookkeeping tracked by the manager.
#[derive(Debug, Clone, Serialize)]
pub struct SessionCacheState {
    pub session_id: String,
    // Conversation turns observed since the last clear (reset to 0 on clear).
    pub conversation_count: usize,
    // When the cache was last cleared for this session, if ever.
    pub last_cleared_at: Option<DateTime<Utc>>,
    // Id of the most recent KV snapshot, if one was created.
    pub last_snapshot_id: Option<i64>,
    // Cache footprint in bytes, as last measured from live entries.
    pub cache_size_bytes: usize,
    // Number of live KV entries at last measurement.
    pub entry_count: usize,
    // Free-form annotations (e.g. "last_clear_reason").
    pub metadata: HashMap<String, String>,
}
48
/// Aggregate counters and operation history for cache activity across sessions.
#[derive(Debug, Clone, Serialize, Default)]
pub struct CacheStatistics {
    pub total_clears: usize,
    pub total_retrievals: usize,
    pub entries_preserved: usize,
    pub entries_cleared: usize,
    pub entries_retrieved: usize,
    // Timestamp of the most recent clear/retrieval, if any.
    pub last_operation: Option<DateTime<Utc>>,
    // Chronological log of operations. NOTE(review): growth looks unbounded
    // here — confirm it is trimmed in the recording methods.
    pub operation_history: Vec<CacheOperation>,
}
59
/// One entry in the cache operation history log.
#[derive(Debug, Clone, Serialize)]
pub struct CacheOperation {
    pub operation_type: CacheOperationType,
    pub timestamp: DateTime<Utc>,
    // How many cache entries the operation touched.
    pub entries_affected: usize,
    pub session_id: String,
    // Human-readable description of what happened.
    pub details: String,
}
68
/// Kind of cache operation recorded in [`CacheOperation`].
#[derive(Debug, Clone, Serialize)]
pub enum CacheOperationType {
    Clear,
    Retrieve,
    Snapshot,
    Restore,
}
76
/// Why a cache clear was triggered.
#[derive(Debug, Clone, Serialize)]
pub enum ClearReason {
    // The per-session conversation counter hit the configured limit.
    ConversationLimit,
    // Cache memory usage crossed the configured threshold percentage.
    MemoryThreshold,
    // Explicitly requested by a caller.
    Manual,
    // Cleared as part of recovering from an error.
    ErrorRecovery,
}
84
/// Outcome of a cache clear: what survived, what was dropped, and the
/// bridge message to hand to the LLM for continuity.
#[derive(Debug, Clone)]
pub struct CacheClearResult {
    // Entries that passed the importance/preservation filters.
    pub entries_to_keep: Vec<ExtractedCacheEntry>,
    // Count of entries dropped by the clear.
    pub entries_cleared: usize,
    // Summary or generic bridge text injected into the next prompt.
    pub bridge_message: String,
    // Snapshot id if one was persisted as part of the clear.
    pub snapshot_id: Option<i64>,
    // Up to 10 keywords taken from the preserved entries.
    pub preserved_keywords: Vec<String>,
    pub clear_reason: ClearReason,
}
94
/// Outcome of a tiered context retrieval pass.
#[derive(Debug, Clone, Default)]
pub struct RetrievalResult {
    // Matches, sorted by descending similarity, capped at 20.
    pub retrieved_entries: Vec<RetrievedEntry>,
    // Bridge text describing the retrieval; `None` when nothing matched.
    pub bridge_message: Option<String>,
    pub search_duration_ms: u64,
    // Keywords extracted from the query and used for matching.
    pub keywords_used: Vec<String>,
    // Which tiers (1 = active cache, 2 = snapshots, 3 = messages) were scanned.
    pub tiers_searched: Vec<u8>,
}
103
/// A single retrieval hit with its provenance and match quality.
#[derive(Debug, Clone)]
pub struct RetrievedEntry {
    pub entry: KVEntry,
    // Keyword-overlap score in [0, 1] from `calculate_keyword_similarity`.
    pub similarity_score: f32,
    // 1 = active cache, 2 = KV snapshot, 3 = stored message.
    pub source_tier: u8,
    pub matched_keywords: Vec<String>,
    pub retrieval_time: DateTime<Utc>,
}
112
/// Combined result of processing one conversation turn through the manager.
#[derive(Debug, Clone)]
pub struct CacheProcessingResult {
    pub should_clear_cache: bool,
    // Populated only when `should_clear_cache` is true.
    pub clear_result: Option<CacheClearResult>,
    pub should_retrieve: bool,
    // Populated only when retrieval ran and found entries.
    pub retrieval_result: Option<RetrievalResult>,
    // All bridge texts (summary first, when present) to prepend to the prompt.
    pub bridge_messages: Vec<String>,
    // Snapshot of the session state after this turn's updates.
    pub updated_session_state: SessionCacheState,
}
122
123impl KVCacheManager {
124    /// Create a new KV cache manager.
125    /// Pass `llm_worker` to enable pre-clear summarization; pass `None` to disable it.
126    pub fn new(
127        config: KVCacheConfig,
128        database: Arc<MemoryDatabase>,
129        llm_worker: Option<Arc<crate::worker_threads::LLMWorker>>,
130    ) -> anyhow::Result<Self> {
131        let cache_extractor = CacheExtractor::new(Default::default());
132        let scoring_config = CacheScoringConfig::default();
133        let cache_scorer = CacheEntryScorer::new(scoring_config);
134        let context_bridge = CacheContextBridge::new(20);
135
136        Ok(Self {
137            config,
138            database,
139            cache_extractor,
140            cache_scorer,
141            context_bridge,
142            statistics: CacheStatistics::new(),
143            session_state: HashMap::new(),
144            llm_worker,
145        })
146    }
147    
148    /// Initialize or get session state
149    fn get_or_create_session_state(&mut self, session_id: &str) -> &mut SessionCacheState {
150        self.session_state.entry(session_id.to_string())
151            .or_insert_with(|| SessionCacheState {
152                session_id: session_id.to_string(),
153                conversation_count: 0,
154                last_cleared_at: None,
155                last_snapshot_id: None,
156                cache_size_bytes: 0,
157                entry_count: 0,
158                metadata: HashMap::new(),
159            })
160    }
161    
162    /// Process a conversation and manage cache
163    pub async fn process_conversation(
164        &mut self,
165        session_id: &str,
166        messages: &[Message],
167        current_kv_entries: &[KVEntry],
168        _current_cache_size_bytes: usize,
169        max_cache_size_bytes: usize,
170    ) -> anyhow::Result<CacheProcessingResult> {
171        debug!("Processing conversation for session: {}", session_id);
172        
173        // First, check conditions without mutable borrow
174        let current_conversation_count = self.session_state
175            .get(session_id)
176            .map(|s| s.conversation_count)
177            .unwrap_or(0);
178        
179        // Calculate actual cache memory usage
180        let actual_cache_memory = self.calculate_cache_memory_usage(current_kv_entries);
181        let should_clear_by_conversation = self.should_clear_by_conversation(current_conversation_count + 1);
182        let should_clear_by_memory = self.should_clear_by_memory(actual_cache_memory, max_cache_size_bytes);
183        
184        // Now get mutable reference
185        let session_state = self.get_or_create_session_state(session_id);
186        session_state.conversation_count += 1;
187        session_state.cache_size_bytes = actual_cache_memory;
188        session_state.entry_count = current_kv_entries.len();
189        
190        let mut result = CacheProcessingResult {
191            should_clear_cache: false,
192            clear_result: None,
193            should_retrieve: false,
194            retrieval_result: None,
195            bridge_messages: Vec::new(),
196            updated_session_state: session_state.clone(),
197        };
198        
199        if should_clear_by_conversation || should_clear_by_memory {
200            let clear_reason = if should_clear_by_conversation {
201                ClearReason::ConversationLimit
202            } else {
203                ClearReason::MemoryThreshold
204            };
205
206            // Release the mutable borrow before calling clear_cache
207            let _ = session_state;
208
209            let clear_result = self.clear_cache(session_id, current_kv_entries, messages, clear_reason).await?;
210            result.should_clear_cache = true;
211            result.clear_result = Some(clear_result.clone());
212            result.bridge_messages.push(clear_result.bridge_message);
213            
214            // Update session state after clearing
215            if let Some(state) = self.session_state.get_mut(session_id) {
216                state.conversation_count = 0;
217                state.last_cleared_at = Some(Utc::now());
218                result.updated_session_state = state.clone();
219            }
220        }
221        
222        // If the cache was cleared in a *previous* turn (not just now), prepend the stored
223        // summary so the LLM gets cheap continuity without recomputing the full history.
224        if !result.should_clear_cache {
225            let was_previously_cleared = self.session_state
226                .get(session_id)
227                .and_then(|s| s.last_cleared_at)
228                .is_some();
229
230            if was_previously_cleared {
231                match self.database.session_summaries.get(session_id) {
232                    Ok(Some(summary)) => {
233                        let summary_msg = format!(
234                            "[Prior conversation summary — context before memory compression:]\n{}",
235                            summary.summary_text
236                        );
237                        result.bridge_messages.insert(0, summary_msg);
238                        debug!(
239                            "Prepended pre-clear summary for session {} ({} tokens, {} messages summarized)",
240                            session_id, summary.token_count, summary.total_message_count
241                        );
242                    }
243                    Ok(None) => {}
244                    Err(e) => {
245                        debug!("Could not fetch summary for session {}: {}", session_id, e);
246                    }
247                }
248            }
249        }
250
251        // Check if we should retrieve context
252        let should_retrieve = self.should_retrieve_context(messages);
253        if should_retrieve {
254            let last_user_message = messages.iter()
255                .rev()
256                .find(|m| m.role == "user")
257                .map(|m| &m.content)
258                .map_or("", |v| v);
259            
260            if !last_user_message.is_empty() {
261                let retrieval_result = self.retrieve_context(session_id, last_user_message, current_kv_entries).await?;
262                if !retrieval_result.retrieved_entries.is_empty() {
263                    result.should_retrieve = true;
264                    result.retrieval_result = Some(retrieval_result.clone());
265                    if let Some(bridge_msg) = &retrieval_result.bridge_message {
266                        result.bridge_messages.push(bridge_msg.clone());
267                    }
268                }
269            }
270        }
271        
272        // Update database metadata
273        if let Some(state) = self.session_state.get(session_id) {
274            self.update_session_metadata(session_id, state).await?;
275            
276            // Generate embeddings for preserved KV entries if enabled
277            if self.config.generate_cache_embeddings && !current_kv_entries.is_empty() {
278                self.generate_and_store_kv_embeddings(session_id, current_kv_entries).await?;
279            }
280        }
281        
282        Ok(result)
283    }
284    
285    /// Check if cache needs to be cleared based on conversation count
286    pub fn should_clear_by_conversation(&self, conversation_count: usize) -> bool {
287        conversation_count >= self.config.clear_after_conversations
288    }
289    
290    /// Check if cache needs to be cleared based on memory usage
291    pub fn should_clear_by_memory(&self, current_usage_bytes: usize, max_memory_bytes: usize) -> bool {
292        if max_memory_bytes == 0 {
293            // If no max is set, use the configuration default
294            let max_memory = self.estimate_max_cache_memory();
295            let usage_percent = current_usage_bytes as f32 / max_memory as f32;
296            return usage_percent >= self.config.memory_threshold_percent;
297        }
298        
299        let usage_percent = current_usage_bytes as f32 / max_memory_bytes as f32;
300        usage_percent >= self.config.memory_threshold_percent
301    }
302    
303    /// Estimate maximum KV cache memory.
304    ///
305    /// Priority order:
306    /// 1. `config.max_cache_entries * ~1KB` when the operator has set an explicit entry limit
307    /// 2. 25% of system available memory (queried via sysinfo), clamped to [256 MB, 8 GB]
308    /// 3. Hard-coded 1 GB fallback if sysinfo returns zero
309    fn estimate_max_cache_memory(&self) -> usize {
310        if self.config.max_cache_entries > 0 {
311            return self.config.max_cache_entries * 1024;
312        }
313
314        let available = {
315            let mut sys = sysinfo::System::new();
316            sys.refresh_memory();
317            sys.available_memory() as usize
318        };
319
320        if available > 0 {
321            // Allocate 25% of currently available RAM to the KV cache, within sane bounds
322            let target = available / 4;
323            const MIN: usize = 256 * 1024 * 1024;  // 256 MB
324            const MAX: usize = 8 * 1024 * 1024 * 1024; // 8 GB
325            target.clamp(MIN, MAX)
326        } else {
327            1024 * 1024 * 1024 // 1 GB hard fallback
328        }
329    }
330    
331    /// Calculate the actual memory usage of current cache entries
332    pub fn calculate_cache_memory_usage(&self, entries: &[KVEntry]) -> usize {
333        entries.iter().map(|entry| entry.size_bytes).sum()
334    }
335    
336    /// Check if we should retrieve context for current messages
337    fn should_retrieve_context(&self, messages: &[Message]) -> bool {
338        if !self.config.retrieval_enabled {
339            return false;
340        }
341        
342        // Check last user message for complex queries
343        if let Some(last_user) = messages.iter().rev().find(|m| m.role == "user") {
344            let content = &last_user.content;
345            // Retrieve for questions, complex requests, or code
346            content.contains('?') ||
347            content.len() > 100 ||
348            content.contains("```") ||
349            content.contains("explain") ||
350            content.contains("how to") ||
351            content.contains("what is")
352        } else {
353            false
354        }
355    }
356    
357    /// Clear KV cache intelligently.
358    /// `messages` is the full conversation up to this point — used to generate a compact
359    /// pre-clear summary so the LLM can be re-fed cheaply when the session continues.
360    pub async fn clear_cache(
361        &mut self,
362        session_id: &str,
363        current_entries: &[KVEntry],
364        messages: &[crate::memory::Message],
365        reason: ClearReason,
366    ) -> anyhow::Result<CacheClearResult> {
367        info!("Clearing KV cache for session {}: {:?}", session_id, reason);
368
369        let start_time = std::time::Instant::now();
370
371        // 1. Extract important entries
372        let extracted = self.cache_extractor.extract_entries(current_entries, &self.cache_scorer);
373
374        // 2. Filter entries to preserve
375        let to_preserve = self.cache_extractor.filter_preserved_entries(
376            &extracted,
377            self.config.min_importance_to_preserve,
378            self.config.preserve_system_prompts,
379            self.config.preserve_code_entries,
380        );
381
382        // 3. Create snapshot if configured
383        let snapshot_id = if self.should_create_snapshot(&reason) {
384            Some(self.create_snapshot(session_id, &to_preserve).await?)
385        } else {
386            None
387        };
388
389        // 4. Extract keywords from preserved entries
390        let preserved_keywords: Vec<String> = to_preserve.iter()
391            .flat_map(|e| e.keywords.clone())
392            .take(10)
393            .collect();
394
395        // 5. Generate a compact pre-clear summary so re-feeding after the clear
396        //    costs only ~300 tokens instead of the full conversation history.
397        let summary_text = self.generate_pre_clear_summary(session_id, messages).await;
398
399        // 6. Persist the updated cumulative summary (one row per session, replaced each time).
400        if let Some(ref text) = summary_text {
401            let token_estimate = (text.len() / 4) as i32;
402            let msg_count = messages.len() as i32;
403            if let Err(e) = self.database.session_summaries.upsert(
404                session_id, text, token_estimate, msg_count,
405            ) {
406                // Non-fatal: log and continue without the summary
407                info!("Could not persist cumulative summary for {}: {}", session_id, e);
408            }
409        }
410
411        // 7. Build bridge message: use the summary if available, otherwise fall back
412        //    to the generic bridge sentence from CacheContextBridge.
413        let bridge_message = match summary_text {
414            Some(ref text) => format!(
415                "[Memory compressed — conversation summary up to this point:]\n{}",
416                text
417            ),
418            None => self.context_bridge.create_clear_bridge(
419                current_entries.len().saturating_sub(to_preserve.len()),
420                to_preserve.len(),
421                &preserved_keywords,
422            ),
423        };
424
425        // 8. Update statistics
426        self.statistics.record_clear(
427            current_entries.len(),
428            to_preserve.len(),
429            reason.clone(),
430            session_id,
431        );
432
433        // 9. Update session state
434        if let Some(state) = self.session_state.get_mut(session_id) {
435            state.entry_count = to_preserve.len();
436            state.last_snapshot_id = snapshot_id;
437            state.last_cleared_at = Some(Utc::now());
438            state.metadata.insert("last_clear_reason".to_string(), format!("{:?}", reason));
439        }
440
441        let duration = start_time.elapsed();
442        debug!("Cache clear completed in {:?}", duration);
443
444        Ok(CacheClearResult {
445            entries_to_keep: to_preserve.clone(),
446            entries_cleared: current_entries.len().saturating_sub(to_preserve.len()),
447            bridge_message,
448            snapshot_id,
449            preserved_keywords,
450            clear_reason: reason,
451        })
452    }
453
    /// Call the LLM to produce an updated cumulative summary before clearing.
    ///
    /// If a previous summary exists for this session, it is included in the prompt so the
    /// LLM produces a single unified summary covering the entire conversation history —
    /// not just the current window. This way there is always exactly one summary per session
    /// and it grows more complete with every cache clear.
    ///
    /// Returns `None` if the LLM is unavailable or there is nothing worth summarizing.
    async fn generate_pre_clear_summary(
        &self,
        session_id: &str,
        messages: &[crate::memory::Message],
    ) -> Option<String> {
        // Only worth summarizing if there's real content — fewer than 4
        // messages is essentially just greetings/setup.
        if messages.len() < 4 {
            return None;
        }

        // Summarization is disabled when no LLM worker was configured.
        let llm_worker = self.llm_worker.as_ref()?;

        // Fetch the existing cumulative summary for this session, if any.
        // DB errors are folded into "no summary" — best-effort by design.
        let existing_summary = self.database.session_summaries.get(session_id)
            .unwrap_or(None);

        // Build system prompt — tell the LLM what it already knows vs what is new
        let system_content = match &existing_summary {
            Some(prev) => format!(
                "You are a concise summarizer. You will be given a running summary of a \
                 conversation followed by new messages that have occurred since that summary. \
                 Produce a single updated summary that covers EVERYTHING — the prior summary \
                 and the new messages combined. Be brief (target under 400 tokens). \
                 Include key facts, decisions, code, numbers, and names. No commentary.\n\n\
                 PRIOR SUMMARY (covers everything before these new messages):\n{}",
                prev.summary_text
            ),
            None => "You are a concise summarizer. Summarize the following conversation \
                     into key facts, decisions, code snippets, and figures. \
                     Be brief — target under 300 tokens. No commentary.".to_string(),
        };

        let mut context: Vec<crate::memory::Message> = vec![
            crate::memory::Message {
                role: "system".to_string(),
                content: system_content,
            },
        ];

        // Include at most the last 40 messages to keep the summarization call cheap
        let tail = if messages.len() > 40 {
            &messages[messages.len() - 40..]
        } else {
            messages
        };
        context.extend_from_slice(tail);

        // Closing instruction matches whether we are updating or creating.
        let user_prompt = if existing_summary.is_some() {
            "Please produce the updated cumulative summary now, covering both the prior summary and these new messages."
        } else {
            "Please summarize the above conversation now."
        };
        context.push(crate::memory::Message {
            role: "user".to_string(),
            content: user_prompt.to_string(),
        });

        match llm_worker.generate_response(session_id.to_string(), context).await {
            Ok(summary) if !summary.trim().is_empty() => {
                // NOTE(review): assumes the summary row carries a `clear_count`
                // maintained by the summaries store — confirm in its schema.
                let clear_num = existing_summary.as_ref().map(|s| s.clear_count + 1).unwrap_or(1);
                info!(
                    "Generated cumulative summary #{} for session {} ({} chars)",
                    clear_num, session_id, summary.len()
                );
                Some(summary)
            }
            Ok(_) => {
                debug!("Pre-clear summary was empty for session {}", session_id);
                None
            }
            Err(e) => {
                // LLM failure is non-fatal; the clear proceeds without a summary.
                info!("Pre-clear summary generation skipped for {}: {}", session_id, e);
                None
            }
        }
    }
538    
539    /// Check if we should create a snapshot
540    fn should_create_snapshot(&self, reason: &ClearReason) -> bool {
541        if !self.config.enabled {
542            return false;
543        }
544        
545        match &self.config.snapshot_strategy {
546            SnapshotStrategy::None => false,
547            SnapshotStrategy::Full { interval_conversations: _ } => true,
548            SnapshotStrategy::Incremental { interval_conversations: _, max_snapshots: _ } => {
549                matches!(reason, ClearReason::ConversationLimit)
550            }
551            SnapshotStrategy::Adaptive { min_importance_threshold: _, max_snapshots: _ } => true,
552        }
553    }
554    
555    /// Create a snapshot of preserved entries
556    async fn create_snapshot(
557        &self,
558        session_id: &str,
559        preserved_entries: &[ExtractedCacheEntry],
560    ) -> anyhow::Result<i64> {
561        debug!("Creating KV snapshot for session: {}", session_id);
562        
563        // Convert to database format
564        let db_entries: Vec<KVEntry> = preserved_entries.iter()
565            .map(|entry| {
566                KVEntry {
567                    key_hash: entry.key_hash.clone(),
568                    key_data: entry.key_data.clone(),
569                    value_data: entry.value_data.clone(),
570                    key_type: entry.entry_type.to_string(),
571                    layer_index: entry.layer_index,
572                    head_index: entry.head_index,
573                    importance_score: entry.importance_score,
574                    access_count: entry.access_count,
575                    last_accessed: Utc::now(),
576                    token_positions: None,
577                    embedding: None,
578                    size_bytes: entry.value_data.len(),
579                    is_persistent: true,
580                }
581            })
582            .collect();
583        
584        // Store in database
585        let snapshot_id = self.database.create_kv_snapshot(session_id, &db_entries).await?;
586        
587        info!("Created KV snapshot {} with {} entries", snapshot_id, db_entries.len());
588        Ok(snapshot_id)
589    }
590    
591    /// Retrieve relevant context from all tiers
592    pub async fn retrieve_context(
593        &mut self,
594        session_id: &str,
595        query: &str,
596        current_cache_entries: &[KVEntry],
597    ) -> anyhow::Result<RetrievalResult> {
598        debug!("Retrieving context for query: {}", query);
599        
600        let start_time = std::time::Instant::now();
601        let keywords = self.extract_keywords(query);
602        
603        let mut results = Vec::new();
604        let mut searched_tiers = Vec::new();
605        
606        // Tier 1: Search active cache
607        if !current_cache_entries.is_empty() {
608            searched_tiers.push(1);
609            let tier1_results = self.search_tier1(current_cache_entries, &keywords).await?;
610            results.extend(tier1_results);
611        }
612        
613        // Tier 2: Search KV snapshots if Tier 1 insufficient
614        if results.len() < 5 {
615            searched_tiers.push(2);
616            let tier2_results = self.search_tier2(session_id, &keywords).await?;
617            results.extend(tier2_results);
618        }
619        
620        // Tier 3: Search complete messages if still insufficient
621        if results.len() < 3 {
622            searched_tiers.push(3);
623            let tier3_results = self.search_tier3(session_id, &keywords).await?;
624            results.extend(tier3_results);
625        }
626        
627        // Sort all results by similarity score
628        results.sort_by(|a, b| b.similarity_score.partial_cmp(&a.similarity_score)
629            .unwrap_or(std::cmp::Ordering::Equal));
630        
631        // Limit total results
632        results.truncate(20);
633        
634        // Update engagement scores for retrieved entries
635        for result in &results {
636            self.cache_scorer.update_engagement(&result.entry.key_hash, true);
637        }
638        
639        // Generate bridge message if we found results
640        let bridge_message = if !results.is_empty() {
641            let primary_tier = results.iter()
642                .map(|r| r.source_tier)
643                .max()
644                .unwrap_or(1);
645            
646            let avg_similarity = results.iter()
647                .map(|r| r.similarity_score)
648                .sum::<f32>() / results.len() as f32;
649            
650            Some(self.context_bridge.create_retrieval_bridge(
651                results.len(),
652                primary_tier,
653                &keywords,
654                Some(avg_similarity),
655            ))
656        } else {
657            None
658        };
659        
660        let duration = start_time.elapsed();
661        
662        // Update statistics
663        self.statistics.record_retrieval(
664            results.len(),
665            searched_tiers.clone(),
666            keywords.len(),
667            session_id,
668        );
669        
670        Ok(RetrievalResult {
671            retrieved_entries: results,
672            bridge_message,
673            search_duration_ms: duration.as_millis() as u64,
674            keywords_used: keywords,
675            tiers_searched: searched_tiers,
676        })
677    }
678    
679    /// Search Tier 1 (Active KV cache)
680    async fn search_tier1(
681        &self,
682        entries: &[KVEntry],
683        keywords: &[String],
684    ) -> anyhow::Result<Vec<RetrievedEntry>> {
685        let mut results = Vec::new();
686        
687        for entry in entries {
688            let similarity = self.calculate_keyword_similarity(entry, keywords);
689            if similarity > 0.3 { // Threshold for Tier 1
690                let matched_keywords = self.get_matching_keywords(entry, keywords);
691                results.push(RetrievedEntry {
692                    entry: entry.clone(),
693                    similarity_score: similarity,
694                    source_tier: 1,
695                    matched_keywords,
696                    retrieval_time: Utc::now(),
697                });
698            }
699        }
700        
701        // Sort by similarity and access count
702        results.sort_by(|a, b| {
703            b.similarity_score.partial_cmp(&a.similarity_score)
704                .unwrap_or(std::cmp::Ordering::Equal)
705                .then(b.entry.access_count.cmp(&a.entry.access_count))
706        });
707        
708        results.truncate(10); // Limit results
709        
710        debug!("Tier 1 search found {} results", results.len());
711        Ok(results)
712    }
713    
714    /// Search Tier 2 (KV snapshots)
715    async fn search_tier2(
716        &self,
717        session_id: &str,
718        keywords: &[String],
719    ) -> anyhow::Result<Vec<RetrievedEntry>> {
720        // Get recent snapshots (max 3 for performance)
721        let snapshots = self.database.get_recent_kv_snapshots(session_id, 3).await?;
722        
723        let mut all_results = Vec::new();
724        
725        for snapshot in snapshots {
726            // Search snapshot entries
727            let entries = self.database.get_kv_snapshot_entries(snapshot.id).await?;
728            
729            for entry in entries {
730                let similarity = self.calculate_keyword_similarity(&entry, keywords);
731                if similarity > 0.4 { // Higher threshold for Tier 2
732                    let matched_keywords = self.get_matching_keywords(&entry, keywords);
733                    all_results.push(RetrievedEntry {
734                        entry,
735                        similarity_score: similarity,
736                        source_tier: 2,
737                        matched_keywords,
738                        retrieval_time: Utc::now(),
739                    });
740                }
741            }
742        }
743        
744        // Sort and limit
745        all_results.sort_by(|a, b| b.similarity_score.partial_cmp(&a.similarity_score)
746            .unwrap_or(std::cmp::Ordering::Equal));
747        all_results.truncate(15);
748        
749        debug!("Tier 2 search found {} results", all_results.len());
750        Ok(all_results)
751    }
752    
753    /// Search Tier 3 (Complete messages)
754    async fn search_tier3(
755        &self,
756        session_id: &str,
757        keywords: &[String],
758    ) -> anyhow::Result<Vec<RetrievedEntry>> {
759        if keywords.is_empty() {
760            return Ok(Vec::new());
761        }
762        
763        // Search messages by keywords
764        let messages = self.database.conversations.search_messages_by_keywords(
765            session_id,
766            keywords,
767            20,
768        ).await?;
769        
770        let mut results = Vec::new();
771        
772        for message in messages {
773            // Convert message to KV entry for consistency
774            let entry = KVEntry {
775                key_hash: format!("msg_{}", message.id),
776                key_data: Some(message.content.as_bytes().to_vec()),
777                value_data: message.content.as_bytes().to_vec(),
778                key_type: "message".to_string(),
779                layer_index: 0,
780                head_index: None,
781                importance_score: message.importance_score,
782                access_count: 1,
783                last_accessed: message.timestamp,
784                token_positions: None,
785                embedding: None,
786                size_bytes: message.content.len(),
787                is_persistent: false,
788            };
789            
790            let similarity = self.calculate_keyword_similarity(&entry, keywords);
791            if similarity > 0.5 { // Highest threshold for Tier 3
792                results.push(RetrievedEntry {
793                    entry,
794                    similarity_score: similarity,
795                    source_tier: 3,
796                    matched_keywords: keywords.to_vec(),
797                    retrieval_time: Utc::now(),
798                });
799            }
800        }
801        
802        // Sort by timestamp (most recent first) and similarity
803        results.sort_by(|a, b| {
804            b.entry.last_accessed.cmp(&a.entry.last_accessed)
805                .then(b.similarity_score.partial_cmp(&a.similarity_score)
806                    .unwrap_or(std::cmp::Ordering::Equal))
807        });
808        
809        results.truncate(10);
810        
811        debug!("Tier 3 search found {} results", results.len());
812        Ok(results)
813    }
814    
815    fn calculate_keyword_similarity(&self, entry: &KVEntry, keywords: &[String]) -> f32 {
816        if keywords.is_empty() {
817            return 0.0;
818        }
819        
820        let entry_keywords = self.cache_scorer.extract_keywords(entry.key_data.as_deref());
821        if entry_keywords.is_empty() {
822            return 0.0;
823        }
824        
825        // Simple keyword matching with partial matches
826        let mut matches = 0.0;
827        for keyword in keywords {
828            let keyword_lower = keyword.to_lowercase();
829            for entry_keyword in &entry_keywords {
830                let entry_lower = entry_keyword.to_lowercase();
831                if entry_lower.contains(&keyword_lower) || keyword_lower.contains(&entry_lower) {
832                    matches += 1.0;
833                    break;
834                }
835            }
836        }
837        
838        matches / keywords.len() as f32
839    }
840    
841    fn get_matching_keywords(&self, entry: &KVEntry, keywords: &[String]) -> Vec<String> {
842        let entry_keywords = self.cache_scorer.extract_keywords(entry.key_data.as_deref());
843        let mut matches = Vec::new();
844        
845        for keyword in keywords {
846            let keyword_lower = keyword.to_lowercase();
847            for entry_keyword in &entry_keywords {
848                let entry_lower = entry_keyword.to_lowercase();
849                if entry_lower.contains(&keyword_lower) || keyword_lower.contains(&entry_lower) {
850                    matches.push(keyword.clone());
851                    break;
852                }
853            }
854        }
855        
856        matches
857    }
858    
859    fn extract_keywords(&self, text: &str) -> Vec<String> {
860        let words: Vec<&str> = text.split_whitespace().collect();
861        words.iter()
862            .filter(|w| w.len() > 3)
863            .map(|w| w.to_lowercase())
864            .filter(|w| !self.is_stop_word(w))
865            .collect()
866    }
867    
868    fn is_stop_word(&self, word: &str) -> bool {
869        let stop_words = [
870            "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
871            "of", "with", "by", "is", "am", "are", "was", "were", "be", "been",
872            "being", "have", "has", "had", "do", "does", "did", "will", "would",
873            "shall", "should", "may", "might", "must", "can", "could",
874        ];
875        stop_words.contains(&word)
876    }
877    
    /// Update session metadata in database.
    ///
    /// Thin delegation to the database layer; kept as a method so the
    /// manager has a single persistence entry point for session state.
    async fn update_session_metadata(
        &self,
        session_id: &str,
        state: &SessionCacheState,
    ) -> anyhow::Result<()> {
        self.database.update_kv_cache_metadata(session_id, state).await
    }
886    
    /// Generate and store embeddings for KV cache entries that carry text content.
    ///
    /// Each KV entry is matched against the session's stored messages by content.
    /// Only entries whose text matches an actual stored message get an embedding —
    /// purely structural entries (attention heads with no readable content) are skipped.
    /// The embedding is stored via `EmbeddingStore` so it becomes searchable by
    /// `SmartRetrieval` on future context misses.
    ///
    /// Best-effort by design: every failure path below logs and continues, so
    /// this function currently always returns `Ok(())`.
    async fn generate_and_store_kv_embeddings(
        &self,
        session_id: &str,
        kv_entries: &[KVEntry],
    ) -> anyhow::Result<()> {
        debug!("Generating embeddings for {} KV cache entries", kv_entries.len());

        // Build a (entry_index, text) list from entries that carry readable content.
        // key_data is preferred; value_data is the UTF-8 fallback.
        let text_entries: Vec<(usize, String)> = kv_entries.iter().enumerate()
            .filter_map(|(i, entry)| {
                let text = entry.key_data.as_ref()
                    .and_then(|d| String::from_utf8(d.clone()).ok())
                    .or_else(|| String::from_utf8(entry.value_data.clone()).ok())?;
                if text.trim().is_empty() { None } else { Some((i, text)) }
            })
            .collect();

        if text_entries.is_empty() {
            debug!("No text content in KV entries — skipping embedding generation");
            return Ok(());
        }

        // Build a content→message_id map for the session so we can link embeddings
        // back to the correct messages row (required for EmbeddingStore foreign key).
        // NOTE(review): a DB error here is silently collapsed to "no messages",
        // which makes every embedding below skip — consider logging the error.
        // Duplicate message contents collapse to the last message's id.
        let stored_messages = self.database.conversations
            .get_session_messages(session_id, Some(1000), None)
            .unwrap_or_default();
        let content_to_id: HashMap<&str, i64> = stored_messages.iter()
            .map(|m| (m.content.as_str(), m.id))
            .collect();

        // Without an LLM worker we cannot embed; bail out quietly (best-effort).
        let llm_worker = match self.get_llm_worker() {
            Some(w) => w,
            None => {
                debug!("No LLM worker — cannot generate KV embeddings");
                return Ok(());
            }
        };

        let texts: Vec<String> = text_entries.iter().map(|(_, t)| t.clone()).collect();

        match llm_worker.generate_embeddings(texts).await {
            Ok(embeddings) => {
                let mut stored = 0usize;
                let now = Utc::now();

                // `embeddings` is positionally aligned with `text_entries`.
                for (pos, embedding_vec) in embeddings.iter().enumerate() {
                    if embedding_vec.is_empty() { continue; }
                    let Some((_, ref text)) = text_entries.get(pos) else { continue };

                    // Find the message_id this text belongs to
                    let message_id = match content_to_id.get(text.as_str()).copied() {
                        Some(id) => id,
                        None => {
                            debug!(
                                "KV entry text has no matching stored message — skipping embedding"
                            );
                            continue;
                        }
                    };

                    let embedding = crate::memory_db::schema::Embedding {
                        id: 0, // auto-assigned by DB
                        message_id,
                        embedding: embedding_vec.clone(),
                        embedding_model: "local-llm".to_string(),
                        generated_at: now,
                    };

                    // Per-embedding store failures are logged and do not stop the loop.
                    if let Err(e) = self.database.embeddings.store_embedding(&embedding) {
                        warn!("Failed to store KV embedding for message {}: {}", message_id, e);
                    } else {
                        // Mark the message so we don't re-embed it later
                        let _ = self.database.conversations.mark_embedding_generated(message_id);
                        stored += 1;
                    }
                }

                info!(
                    "Stored {}/{} KV embeddings for session {}",
                    stored, embeddings.len(), session_id
                );
            }
            Err(e) => {
                warn!("Failed to generate embeddings for KV cache entries: {}", e);
            }
        }

        Ok(())
    }
984    
985    /// Get reference to LLM worker for embedding generation
986    fn get_llm_worker(&self) -> Option<&crate::worker_threads::LLMWorker> {
987        self.llm_worker.as_ref().map(|worker| worker.as_ref())
988    }
989    
    /// Set the LLM worker reference used for on-device embedding generation.
    pub fn set_llm_worker(&mut self, llm_worker: Arc<crate::worker_threads::LLMWorker>) {
        self.llm_worker = Some(llm_worker);
    }
    
    /// Get cache statistics (borrowed; cumulative across all sessions).
    pub fn get_statistics(&self) -> &CacheStatistics {
        &self.statistics
    }
    
    /// Get session state, or `None` for an unknown / cleaned-up session.
    pub fn get_session_state(&self, session_id: &str) -> Option<&SessionCacheState> {
        self.session_state.get(session_id)
    }
    
    /// Get all session states, keyed by session id.
    pub fn get_all_session_states(&self) -> &HashMap<String, SessionCacheState> {
        &self.session_state
    }
1009    
1010    /// Restore cache from snapshot
1011    pub async fn restore_from_snapshot(
1012        &mut self,
1013        session_id: &str,
1014        snapshot_id: i64,
1015    ) -> anyhow::Result<Vec<KVEntry>> {
1016        info!("Restoring cache from snapshot {} for session {}", snapshot_id, session_id);
1017        
1018        let entries: Vec<KVEntry> = self.database.get_kv_snapshot_entries(snapshot_id).await?;
1019        
1020        // Update session state
1021        if let Some(state) = self.session_state.get_mut(session_id) {
1022            state.entry_count = entries.len();
1023            state.last_snapshot_id = Some(snapshot_id);
1024        }
1025        
1026        // Generate bridge message
1027        let bridge_message = self.context_bridge.create_restore_bridge(
1028            entries.len(),
1029            None, // Could calculate snapshot age if needed
1030        );
1031        
1032        info!("{}", bridge_message);
1033        
1034        // Update statistics
1035        self.statistics.record_restore(entries.len(), session_id);
1036        
1037        Ok(entries)
1038    }
1039    
    /// Manual cache clear (for testing or admin purposes).
    /// No messages are available in this path, so no pre-clear summary is generated.
    ///
    /// Delegates to `clear_cache` with an empty message slice and
    /// `ClearReason::Manual`.
    pub async fn manual_clear_cache(
        &mut self,
        session_id: &str,
        current_entries: &[KVEntry],
    ) -> anyhow::Result<CacheClearResult> {
        self.clear_cache(session_id, current_entries, &[], ClearReason::Manual).await
    }
1049    
1050    /// Check cache health and perform maintenance if needed
1051    pub async fn perform_maintenance(&mut self) -> anyhow::Result<MaintenanceResult> {
1052        let mut result = MaintenanceResult {
1053            sessions_cleaned: 0,
1054            snapshots_pruned: 0,
1055            errors: Vec::new(),
1056        };
1057        
1058        // Clean up old session states (inactive for > 24 hours)
1059        let cutoff = Utc::now() - chrono::Duration::hours(24);
1060        let sessions_to_clean: Vec<String> = self.session_state.iter()
1061            .filter(|(_, state)| {
1062                state.last_cleared_at.is_none_or(|dt| dt < cutoff)
1063            })
1064            .map(|(id, _)| id.clone())
1065            .collect();
1066        
1067        for session_id in sessions_to_clean {
1068            if let Err(e) = self.cleanup_session(&session_id).await {
1069                result.errors.push(format!("Failed to cleanup session {}: {}", session_id, e));
1070            } else {
1071                result.sessions_cleaned += 1;
1072            }
1073        }
1074        
1075        // Prune old snapshots if configured
1076        if let SnapshotStrategy::Incremental { max_snapshots, .. } = &self.config.snapshot_strategy {
1077            let pruned = self.prune_old_snapshots(*max_snapshots).await?;
1078            result.snapshots_pruned = pruned;
1079        }
1080        
1081        Ok(result)
1082    }
1083    
    /// Cleanup a specific session: drop the in-memory state first, then
    /// remove its snapshots from the database.
    /// Note: if the DB cleanup fails, the in-memory state is already gone.
    async fn cleanup_session(&mut self, session_id: &str) -> anyhow::Result<()> {
        self.session_state.remove(session_id);
        self.database.cleanup_session_snapshots(session_id).await?;
        Ok(())
    }
    
    /// Prune old snapshots, keeping at most `keep_max`; returns the number pruned.
    async fn prune_old_snapshots(&self, keep_max: usize) -> anyhow::Result<usize> {
        self.database.prune_old_kv_snapshots(keep_max).await
    }
1095    
1096    /// Export cache statistics
1097    pub fn export_statistics(&self) -> CacheStatisticsExport {
1098        CacheStatisticsExport {
1099            total_clears: self.statistics.total_clears,
1100            total_retrievals: self.statistics.total_retrievals,
1101            entries_preserved: self.statistics.entries_preserved,
1102            entries_cleared: self.statistics.entries_cleared,
1103            entries_retrieved: self.statistics.entries_retrieved,
1104            active_sessions: self.session_state.len(),
1105            last_operation: self.statistics.last_operation,
1106            operation_history_count: self.statistics.operation_history.len(),
1107        }
1108    }
1109    
    /// Get configuration (borrowed).
    pub fn get_config(&self) -> &KVCacheConfig {
        &self.config
    }
    
    /// Update configuration, replacing the previous one wholesale.
    /// Takes effect on the next operation that consults `self.config`.
    pub fn update_config(&mut self, config: KVCacheConfig) {
        self.config = config;
    }
1119    
1120    /// Flush current cache state to database for persistence
1121    pub async fn flush_to_database(&self, session_id: &str, current_entries: &[KVEntry]) -> anyhow::Result<Option<i64>> {
1122        if current_entries.is_empty() {
1123            debug!("No entries to flush for session: {}", session_id);
1124            return Ok(None);
1125        }
1126
1127        debug!("Flushing {} cache entries to database for session: {}", current_entries.len(), session_id);
1128
1129        // Create a snapshot of current entries
1130        let snapshot_id = self.database.create_kv_snapshot(session_id, current_entries).await?;
1131        
1132        // Update session metadata
1133        if let Some(state) = self.session_state.get(session_id) {
1134            let mut updated_state = state.clone();
1135            updated_state.entry_count = current_entries.len();
1136            updated_state.last_snapshot_id = Some(snapshot_id);
1137            
1138            self.update_session_metadata(session_id, &updated_state).await?;
1139        }
1140
1141        info!("Flushed {} entries to database snapshot {} for session {}", 
1142              current_entries.len(), snapshot_id, session_id);
1143
1144        Ok(Some(snapshot_id))
1145    }
1146    
    /// Get cache scorer reference.
    pub fn cache_scorer(&self) -> &CacheEntryScorer {
        &self.cache_scorer
    }
    
    /// Get mutable cache scorer reference (e.g. to tune scoring at runtime).
    pub fn cache_scorer_mut(&mut self) -> &mut CacheEntryScorer {
        &mut self.cache_scorer
    }
    
    /// Reset statistics to an all-zero state; session state is untouched.
    pub fn reset_statistics(&mut self) {
        self.statistics = CacheStatistics::new();
    }
1161    
1162    /// Shutdown method to flush all pending cache data to database
1163    pub async fn shutdown_flush(&self) -> anyhow::Result<()> {
1164        info!("Starting KV cache manager shutdown flush...");
1165
1166        // Get all current session states
1167        let all_states = self.get_all_session_states().clone();
1168        
1169        for (session_id, state) in all_states {
1170            info!("Flushing cache data for session: {} (entries: {}, size: {} bytes)", 
1171                  session_id, state.entry_count, state.cache_size_bytes);
1172            
1173            // KVCacheManager stores metadata, not the raw tensors — those live inside
1174            // the llama-server process and are accessed via LlamaKVCacheInterface.
1175            // Persisting metadata is the correct shutdown action here.
1176            self.update_session_metadata(&session_id, &state).await?;
1177        }
1178        
1179        info!("KV cache manager shutdown flush completed");
1180        Ok(())
1181    }
1182}
1183
1184impl CacheStatistics {
1185    pub fn new() -> Self {
1186        Self::default()
1187    }
1188
1189    pub fn record_clear(
1190        &mut self,
1191        total_entries: usize,
1192        preserved_entries: usize,
1193        reason: ClearReason,
1194        session_id: &str,
1195    ) {
1196        self.total_clears += 1;
1197        self.entries_preserved += preserved_entries;
1198        self.entries_cleared += total_entries - preserved_entries;
1199        self.last_operation = Some(Utc::now());
1200        
1201        self.operation_history.push(CacheOperation {
1202            operation_type: CacheOperationType::Clear,
1203            timestamp: Utc::now(),
1204            entries_affected: total_entries,
1205            session_id: session_id.to_string(),
1206            details: format!("{:?}", reason),
1207        });
1208        
1209        // Keep only last 100 operations
1210        if self.operation_history.len() > 100 {
1211            self.operation_history.remove(0);
1212        }
1213    }
1214    
1215    pub fn record_retrieval(
1216        &mut self,
1217        retrieved_count: usize,
1218        tiers_searched: Vec<u8>,
1219        keywords_count: usize,
1220        session_id: &str,
1221    ) {
1222        self.total_retrievals += 1;
1223        self.entries_retrieved += retrieved_count;
1224        self.last_operation = Some(Utc::now());
1225        
1226        self.operation_history.push(CacheOperation {
1227            operation_type: CacheOperationType::Retrieve,
1228            timestamp: Utc::now(),
1229            entries_affected: retrieved_count,
1230            session_id: session_id.to_string(),
1231            details: format!("Tiers: {:?}, Keywords: {}", tiers_searched, keywords_count),
1232        });
1233        
1234        // Keep only last 100 operations
1235        if self.operation_history.len() > 100 {
1236            self.operation_history.remove(0);
1237        }
1238    }
1239    
1240    pub fn record_restore(&mut self, restored_count: usize, session_id: &str) {
1241        self.operation_history.push(CacheOperation {
1242            operation_type: CacheOperationType::Restore,
1243            timestamp: Utc::now(),
1244            entries_affected: restored_count,
1245            session_id: session_id.to_string(),
1246            details: "Cache restored from snapshot".to_string(),
1247        });
1248        
1249        // Keep only last 100 operations
1250        if self.operation_history.len() > 100 {
1251            self.operation_history.remove(0);
1252        }
1253    }
1254    
1255    pub fn record_snapshot(&mut self, snapshot_id: i64, entry_count: usize, session_id: &str) {
1256        self.operation_history.push(CacheOperation {
1257            operation_type: CacheOperationType::Snapshot,
1258            timestamp: Utc::now(),
1259            entries_affected: entry_count,
1260            session_id: session_id.to_string(),
1261            details: format!("Snapshot ID: {}", snapshot_id),
1262        });
1263        
1264        // Keep only last 100 operations
1265        if self.operation_history.len() > 100 {
1266            self.operation_history.remove(0);
1267        }
1268    }
1269}
1270
1271impl RetrievalResult {
1272    pub fn new() -> Self {
1273        Self::default()
1274    }
1275
1276    pub fn total_entries(&self) -> usize {
1277        self.retrieved_entries.len()
1278    }
1279    
1280    pub fn average_similarity(&self) -> f32 {
1281        if self.retrieved_entries.is_empty() {
1282            return 0.0;
1283        }
1284        self.retrieved_entries.iter()
1285            .map(|e| e.similarity_score)
1286            .sum::<f32>() / self.retrieved_entries.len() as f32
1287    }
1288    
1289    pub fn is_empty(&self) -> bool {
1290        self.retrieved_entries.is_empty()
1291    }
1292    
1293    pub fn entries_by_tier(&self, tier: u8) -> Vec<&RetrievedEntry> {
1294        self.retrieved_entries.iter()
1295            .filter(|e| e.source_tier == tier)
1296            .collect()
1297    }
1298    
1299    pub fn primary_tier(&self) -> u8 {
1300        if self.retrieved_entries.is_empty() {
1301            return 0;
1302        }
1303        
1304        // Count entries by tier
1305        let mut tier_counts = HashMap::new();
1306        for entry in &self.retrieved_entries {
1307            *tier_counts.entry(entry.source_tier).or_insert(0) += 1;
1308        }
1309        
1310        // Return tier with most entries
1311        tier_counts.into_iter()
1312            .max_by_key(|(_, count)| *count)
1313            .map(|(tier, _)| tier)
1314            .unwrap_or(0)
1315    }
1316    
1317    pub fn add_tier_results(&mut self, tier: u8, results: Vec<RetrievedEntry>) {
1318        self.tiers_searched.push(tier);
1319        self.retrieved_entries.extend(results);
1320    }
1321}
1322
/// Serializable snapshot of `CacheStatistics` augmented with live-session
/// info; produced by `KVCacheManager::export_statistics`.
#[derive(Debug, Clone, Serialize)]
pub struct CacheStatisticsExport {
    /// Number of cache-clear operations performed.
    pub total_clears: usize,
    /// Number of tiered retrieval operations performed.
    pub total_retrievals: usize,
    /// Cumulative entries preserved across clears.
    pub entries_preserved: usize,
    /// Cumulative entries discarded across clears.
    pub entries_cleared: usize,
    /// Cumulative entries returned by retrievals.
    pub entries_retrieved: usize,
    /// Sessions currently tracked by the manager at export time.
    pub active_sessions: usize,
    /// Timestamp of the most recent recorded operation, if any.
    pub last_operation: Option<DateTime<Utc>>,
    /// Length of the (capped) operation history at export time.
    pub operation_history_count: usize,
}
1334
/// Outcome of `KVCacheManager::perform_maintenance`.
#[derive(Debug, Clone, Serialize)]
pub struct MaintenanceResult {
    /// Stale sessions whose state was successfully removed.
    pub sessions_cleaned: usize,
    /// Snapshots pruned under the incremental snapshot strategy.
    pub snapshots_pruned: usize,
    /// Human-readable descriptions of per-session cleanup failures.
    pub errors: Vec<String>,
}