1use crate::memory::Message;
4use crate::memory_db::MemoryDatabase;
5use crate::cache_management::cache_config::{KVCacheConfig, SnapshotStrategy};
6use crate::cache_management::cache_extractor::{CacheExtractor, ExtractedCacheEntry, KVEntry};
7use crate::cache_management::cache_scorer::{CacheEntryScorer, CacheScoringConfig};
8use crate::cache_management::cache_bridge::CacheContextBridge;
9
10use std::sync::Arc;
11use std::collections::HashMap;
12use tracing::{info, debug, warn};
13use chrono::{Utc, DateTime};
14use serde::Serialize;
15
/// Coordinates KV-cache clearing, snapshotting, summarisation and context
/// retrieval, tracking a small amount of in-memory state per session.
pub struct KVCacheManager {
    /// Settings consulted by the manager (clear thresholds, snapshot strategy,
    /// retrieval / embedding feature flags).
    config: KVCacheConfig,
    /// Shared database handle used for snapshots, session summaries,
    /// conversation messages, embeddings and cache metadata.
    database: Arc<MemoryDatabase>,
    /// Extracts entries from the live cache and filters the ones to preserve on a clear.
    cache_extractor: CacheExtractor,
    /// Scores entries; also used for keyword extraction and engagement updates.
    cache_scorer: CacheEntryScorer,
    /// Builds the human-readable bridge messages for clear / retrieval / restore.
    context_bridge: CacheContextBridge,
    /// Running counters and recent operation history.
    statistics: CacheStatistics,
    /// In-memory state keyed by session id.
    session_state: HashMap<String, SessionCacheState>,
    /// Optional LLM worker; when absent, summaries and embeddings are skipped.
    llm_worker: Option<Arc<crate::worker_threads::LLMWorker>>,
}
27
/// Descriptor of a KV snapshot.
///
/// NOTE(review): this type is not constructed anywhere in the visible part of
/// the file; field meanings below are inferred from their names — confirm
/// against the database layer.
#[derive(Debug, Clone)]
pub struct KvSnapshot {
    /// Snapshot identifier.
    pub id: i64,
    /// Session the snapshot belongs to.
    pub session_id: String,
    /// Associated message identifier.
    pub message_id: i64,
    /// Free-form snapshot type label.
    pub snapshot_type: String,
    /// Snapshot size in bytes.
    pub size_bytes: i64,
    /// Creation timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
}
37
/// Per-session bookkeeping kept in memory by `KVCacheManager` and persisted
/// through `update_kv_cache_metadata`.
#[derive(Debug, Clone, Serialize)]
pub struct SessionCacheState {
    /// Session this state belongs to.
    pub session_id: String,
    /// Number of `process_conversation` calls since the last clear
    /// (reset to 0 right after a clear).
    pub conversation_count: usize,
    /// When the cache was last cleared for this session, if ever.
    pub last_cleared_at: Option<DateTime<Utc>>,
    /// Snapshot id recorded by the most recent clear or restore, if any.
    pub last_snapshot_id: Option<i64>,
    /// Sum of `size_bytes` over the entries seen by the last `process_conversation`.
    pub cache_size_bytes: usize,
    /// Entry count last recorded (live entries, preserved entries after a
    /// clear, or restored entries after a restore).
    pub entry_count: usize,
    /// Free-form key/value annotations; `clear_cache` writes `last_clear_reason`.
    pub metadata: HashMap<String, String>,
}
48
/// Aggregate counters for cache operations plus a bounded history of recent
/// operations (the visible `record_*` methods keep at most 100 entries).
#[derive(Debug, Clone, Serialize, Default)]
pub struct CacheStatistics {
    /// Number of clears recorded.
    pub total_clears: usize,
    /// Number of retrievals recorded.
    pub total_retrievals: usize,
    /// Cumulative count of entries preserved across clears.
    pub entries_preserved: usize,
    /// Cumulative count of entries dropped across clears.
    pub entries_cleared: usize,
    /// Cumulative count of entries returned by retrievals.
    pub entries_retrieved: usize,
    /// Timestamp of the most recent recorded operation, if any.
    pub last_operation: Option<DateTime<Utc>>,
    /// Most recent operations, oldest first.
    pub operation_history: Vec<CacheOperation>,
}
59
/// One entry in `CacheStatistics::operation_history`.
#[derive(Debug, Clone, Serialize)]
pub struct CacheOperation {
    /// Which kind of operation was performed.
    pub operation_type: CacheOperationType,
    /// When the operation was recorded (UTC).
    pub timestamp: DateTime<Utc>,
    /// Number of entries involved (total for a clear, returned count for a
    /// retrieval, restored count for a restore).
    pub entries_affected: usize,
    /// Session the operation applied to.
    pub session_id: String,
    /// Free-form description (e.g. the clear reason, or tiers/keyword count).
    pub details: String,
}
68
/// Kind of operation stored in the statistics history.
#[derive(Debug, Clone, Serialize)]
pub enum CacheOperationType {
    /// Cache was cleared (recorded by `record_clear`).
    Clear,
    /// Context was retrieved (recorded by `record_retrieval`).
    Retrieve,
    /// NOTE(review): presumably recorded by `record_snapshot`, whose body is
    /// not visible here — confirm.
    Snapshot,
    /// Cache was restored from a snapshot (recorded by `record_restore`).
    Restore,
}
76
/// Why a cache clear was triggered.
#[derive(Debug, Clone, Serialize)]
pub enum ClearReason {
    /// The per-session turn count reached `clear_after_conversations`.
    ConversationLimit,
    /// Measured cache usage reached the configured memory threshold.
    MemoryThreshold,
    /// Explicit request via `manual_clear_cache`.
    Manual,
    /// Not produced by any code visible in this file.
    ErrorRecovery,
}
84
/// Outcome of `clear_cache`.
#[derive(Debug, Clone)]
pub struct CacheClearResult {
    /// Entries selected for preservation by the extractor's filter.
    pub entries_to_keep: Vec<ExtractedCacheEntry>,
    /// Number of entries dropped (total minus preserved, saturating at 0).
    pub entries_cleared: usize,
    /// Message to inject into the conversation: the generated summary when one
    /// is available, otherwise the context bridge's clear message.
    pub bridge_message: String,
    /// Snapshot created for the preserved entries, if the strategy asked for one.
    pub snapshot_id: Option<i64>,
    /// Up to 10 keywords taken from the preserved entries.
    pub preserved_keywords: Vec<String>,
    /// The reason passed to `clear_cache`.
    pub clear_reason: ClearReason,
}
94
/// Outcome of `retrieve_context`.
#[derive(Debug, Clone, Default)]
pub struct RetrievalResult {
    /// Matches sorted by descending similarity, at most 20.
    pub retrieved_entries: Vec<RetrievedEntry>,
    /// Bridge message describing the retrieval; `None` when nothing matched.
    pub bridge_message: Option<String>,
    /// Wall-clock duration of the search in milliseconds.
    pub search_duration_ms: u64,
    /// Keywords extracted from the query.
    pub keywords_used: Vec<String>,
    /// Tiers consulted: 1 = live cache entries, 2 = recent snapshots,
    /// 3 = stored conversation messages.
    pub tiers_searched: Vec<u8>,
}
103
/// A single match returned by one of the retrieval tiers.
#[derive(Debug, Clone)]
pub struct RetrievedEntry {
    /// The matched entry (for tier 3, synthesised from a stored message).
    pub entry: KVEntry,
    /// Fraction of query keywords that matched the entry's keywords.
    pub similarity_score: f32,
    /// Tier that produced the match (1, 2 or 3).
    pub source_tier: u8,
    /// Query keywords associated with this match.
    pub matched_keywords: Vec<String>,
    /// When the match was produced (UTC).
    pub retrieval_time: DateTime<Utc>,
}
112
/// Outcome of `process_conversation` for a single turn.
#[derive(Debug, Clone)]
pub struct CacheProcessingResult {
    /// `true` when a clear was performed during this turn.
    pub should_clear_cache: bool,
    /// Details of that clear, when one happened.
    pub clear_result: Option<CacheClearResult>,
    /// `true` when retrieval ran and returned at least one entry.
    pub should_retrieve: bool,
    /// The retrieval outcome, only set when it returned entries.
    pub retrieval_result: Option<RetrievalResult>,
    /// Bridge / summary messages produced this turn, in injection order.
    pub bridge_messages: Vec<String>,
    /// Copy of the session state after this turn's updates.
    pub updated_session_state: SessionCacheState,
}
122
123impl KVCacheManager {
124 pub fn new(
127 config: KVCacheConfig,
128 database: Arc<MemoryDatabase>,
129 llm_worker: Option<Arc<crate::worker_threads::LLMWorker>>,
130 ) -> anyhow::Result<Self> {
131 let cache_extractor = CacheExtractor::new(Default::default());
132 let scoring_config = CacheScoringConfig::default();
133 let cache_scorer = CacheEntryScorer::new(scoring_config);
134 let context_bridge = CacheContextBridge::new(20);
135
136 Ok(Self {
137 config,
138 database,
139 cache_extractor,
140 cache_scorer,
141 context_bridge,
142 statistics: CacheStatistics::new(),
143 session_state: HashMap::new(),
144 llm_worker,
145 })
146 }
147
148 fn get_or_create_session_state(&mut self, session_id: &str) -> &mut SessionCacheState {
150 self.session_state.entry(session_id.to_string())
151 .or_insert_with(|| SessionCacheState {
152 session_id: session_id.to_string(),
153 conversation_count: 0,
154 last_cleared_at: None,
155 last_snapshot_id: None,
156 cache_size_bytes: 0,
157 entry_count: 0,
158 metadata: HashMap::new(),
159 })
160 }
161
162 pub async fn process_conversation(
164 &mut self,
165 session_id: &str,
166 messages: &[Message],
167 current_kv_entries: &[KVEntry],
168 _current_cache_size_bytes: usize,
169 max_cache_size_bytes: usize,
170 ) -> anyhow::Result<CacheProcessingResult> {
171 debug!("Processing conversation for session: {}", session_id);
172
173 let current_conversation_count = self.session_state
175 .get(session_id)
176 .map(|s| s.conversation_count)
177 .unwrap_or(0);
178
179 let actual_cache_memory = self.calculate_cache_memory_usage(current_kv_entries);
181 let should_clear_by_conversation = self.should_clear_by_conversation(current_conversation_count + 1);
182 let should_clear_by_memory = self.should_clear_by_memory(actual_cache_memory, max_cache_size_bytes);
183
184 let session_state = self.get_or_create_session_state(session_id);
186 session_state.conversation_count += 1;
187 session_state.cache_size_bytes = actual_cache_memory;
188 session_state.entry_count = current_kv_entries.len();
189
190 let mut result = CacheProcessingResult {
191 should_clear_cache: false,
192 clear_result: None,
193 should_retrieve: false,
194 retrieval_result: None,
195 bridge_messages: Vec::new(),
196 updated_session_state: session_state.clone(),
197 };
198
199 if should_clear_by_conversation || should_clear_by_memory {
200 let clear_reason = if should_clear_by_conversation {
201 ClearReason::ConversationLimit
202 } else {
203 ClearReason::MemoryThreshold
204 };
205
206 let _ = session_state;
208
209 let clear_result = self.clear_cache(session_id, current_kv_entries, messages, clear_reason).await?;
210 result.should_clear_cache = true;
211 result.clear_result = Some(clear_result.clone());
212 result.bridge_messages.push(clear_result.bridge_message);
213
214 if let Some(state) = self.session_state.get_mut(session_id) {
216 state.conversation_count = 0;
217 state.last_cleared_at = Some(Utc::now());
218 result.updated_session_state = state.clone();
219 }
220 }
221
222 if !result.should_clear_cache {
225 let was_previously_cleared = self.session_state
226 .get(session_id)
227 .and_then(|s| s.last_cleared_at)
228 .is_some();
229
230 if was_previously_cleared {
231 match self.database.session_summaries.get(session_id) {
232 Ok(Some(summary)) => {
233 let summary_msg = format!(
234 "[Prior conversation summary — context before memory compression:]\n{}",
235 summary.summary_text
236 );
237 result.bridge_messages.insert(0, summary_msg);
238 debug!(
239 "Prepended pre-clear summary for session {} ({} tokens, {} messages summarized)",
240 session_id, summary.token_count, summary.total_message_count
241 );
242 }
243 Ok(None) => {}
244 Err(e) => {
245 debug!("Could not fetch summary for session {}: {}", session_id, e);
246 }
247 }
248 }
249 }
250
251 let should_retrieve = self.should_retrieve_context(messages);
253 if should_retrieve {
254 let last_user_message = messages.iter()
255 .rev()
256 .find(|m| m.role == "user")
257 .map(|m| &m.content)
258 .map_or("", |v| v);
259
260 if !last_user_message.is_empty() {
261 let retrieval_result = self.retrieve_context(session_id, last_user_message, current_kv_entries).await?;
262 if !retrieval_result.retrieved_entries.is_empty() {
263 result.should_retrieve = true;
264 result.retrieval_result = Some(retrieval_result.clone());
265 if let Some(bridge_msg) = &retrieval_result.bridge_message {
266 result.bridge_messages.push(bridge_msg.clone());
267 }
268 }
269 }
270 }
271
272 if let Some(state) = self.session_state.get(session_id) {
274 self.update_session_metadata(session_id, state).await?;
275
276 if self.config.generate_cache_embeddings && !current_kv_entries.is_empty() {
278 self.generate_and_store_kv_embeddings(session_id, current_kv_entries).await?;
279 }
280 }
281
282 Ok(result)
283 }
284
285 pub fn should_clear_by_conversation(&self, conversation_count: usize) -> bool {
287 conversation_count >= self.config.clear_after_conversations
288 }
289
290 pub fn should_clear_by_memory(&self, current_usage_bytes: usize, max_memory_bytes: usize) -> bool {
292 if max_memory_bytes == 0 {
293 let max_memory = self.estimate_max_cache_memory();
295 let usage_percent = current_usage_bytes as f32 / max_memory as f32;
296 return usage_percent >= self.config.memory_threshold_percent;
297 }
298
299 let usage_percent = current_usage_bytes as f32 / max_memory_bytes as f32;
300 usage_percent >= self.config.memory_threshold_percent
301 }
302
303 fn estimate_max_cache_memory(&self) -> usize {
310 if self.config.max_cache_entries > 0 {
311 return self.config.max_cache_entries * 1024;
312 }
313
314 let available = {
315 let mut sys = sysinfo::System::new();
316 sys.refresh_memory();
317 sys.available_memory() as usize
318 };
319
320 if available > 0 {
321 let target = available / 4;
323 const MIN: usize = 256 * 1024 * 1024; const MAX: usize = 8 * 1024 * 1024 * 1024; target.clamp(MIN, MAX)
326 } else {
327 1024 * 1024 * 1024 }
329 }
330
331 pub fn calculate_cache_memory_usage(&self, entries: &[KVEntry]) -> usize {
333 entries.iter().map(|entry| entry.size_bytes).sum()
334 }
335
336 fn should_retrieve_context(&self, messages: &[Message]) -> bool {
338 if !self.config.retrieval_enabled {
339 return false;
340 }
341
342 if let Some(last_user) = messages.iter().rev().find(|m| m.role == "user") {
344 let content = &last_user.content;
345 content.contains('?') ||
347 content.len() > 100 ||
348 content.contains("```") ||
349 content.contains("explain") ||
350 content.contains("how to") ||
351 content.contains("what is")
352 } else {
353 false
354 }
355 }
356
357 pub async fn clear_cache(
361 &mut self,
362 session_id: &str,
363 current_entries: &[KVEntry],
364 messages: &[crate::memory::Message],
365 reason: ClearReason,
366 ) -> anyhow::Result<CacheClearResult> {
367 info!("Clearing KV cache for session {}: {:?}", session_id, reason);
368
369 let start_time = std::time::Instant::now();
370
371 let extracted = self.cache_extractor.extract_entries(current_entries, &self.cache_scorer);
373
374 let to_preserve = self.cache_extractor.filter_preserved_entries(
376 &extracted,
377 self.config.min_importance_to_preserve,
378 self.config.preserve_system_prompts,
379 self.config.preserve_code_entries,
380 );
381
382 let snapshot_id = if self.should_create_snapshot(&reason) {
384 Some(self.create_snapshot(session_id, &to_preserve).await?)
385 } else {
386 None
387 };
388
389 let preserved_keywords: Vec<String> = to_preserve.iter()
391 .flat_map(|e| e.keywords.clone())
392 .take(10)
393 .collect();
394
395 let summary_text = self.generate_pre_clear_summary(session_id, messages).await;
398
399 if let Some(ref text) = summary_text {
401 let token_estimate = (text.len() / 4) as i32;
402 let msg_count = messages.len() as i32;
403 if let Err(e) = self.database.session_summaries.upsert(
404 session_id, text, token_estimate, msg_count,
405 ) {
406 info!("Could not persist cumulative summary for {}: {}", session_id, e);
408 }
409 }
410
411 let bridge_message = match summary_text {
414 Some(ref text) => format!(
415 "[Memory compressed — conversation summary up to this point:]\n{}",
416 text
417 ),
418 None => self.context_bridge.create_clear_bridge(
419 current_entries.len().saturating_sub(to_preserve.len()),
420 to_preserve.len(),
421 &preserved_keywords,
422 ),
423 };
424
425 self.statistics.record_clear(
427 current_entries.len(),
428 to_preserve.len(),
429 reason.clone(),
430 session_id,
431 );
432
433 if let Some(state) = self.session_state.get_mut(session_id) {
435 state.entry_count = to_preserve.len();
436 state.last_snapshot_id = snapshot_id;
437 state.last_cleared_at = Some(Utc::now());
438 state.metadata.insert("last_clear_reason".to_string(), format!("{:?}", reason));
439 }
440
441 let duration = start_time.elapsed();
442 debug!("Cache clear completed in {:?}", duration);
443
444 Ok(CacheClearResult {
445 entries_to_keep: to_preserve.clone(),
446 entries_cleared: current_entries.len().saturating_sub(to_preserve.len()),
447 bridge_message,
448 snapshot_id,
449 preserved_keywords,
450 clear_reason: reason,
451 })
452 }
453
454 async fn generate_pre_clear_summary(
463 &self,
464 session_id: &str,
465 messages: &[crate::memory::Message],
466 ) -> Option<String> {
467 if messages.len() < 4 {
469 return None;
470 }
471
472 let llm_worker = self.llm_worker.as_ref()?;
473
474 let existing_summary = self.database.session_summaries.get(session_id)
476 .unwrap_or(None);
477
478 let system_content = match &existing_summary {
480 Some(prev) => format!(
481 "You are a concise summarizer. You will be given a running summary of a \
482 conversation followed by new messages that have occurred since that summary. \
483 Produce a single updated summary that covers EVERYTHING — the prior summary \
484 and the new messages combined. Be brief (target under 400 tokens). \
485 Include key facts, decisions, code, numbers, and names. No commentary.\n\n\
486 PRIOR SUMMARY (covers everything before these new messages):\n{}",
487 prev.summary_text
488 ),
489 None => "You are a concise summarizer. Summarize the following conversation \
490 into key facts, decisions, code snippets, and figures. \
491 Be brief — target under 300 tokens. No commentary.".to_string(),
492 };
493
494 let mut context: Vec<crate::memory::Message> = vec![
495 crate::memory::Message {
496 role: "system".to_string(),
497 content: system_content,
498 },
499 ];
500
501 let tail = if messages.len() > 40 {
503 &messages[messages.len() - 40..]
504 } else {
505 messages
506 };
507 context.extend_from_slice(tail);
508
509 let user_prompt = if existing_summary.is_some() {
510 "Please produce the updated cumulative summary now, covering both the prior summary and these new messages."
511 } else {
512 "Please summarize the above conversation now."
513 };
514 context.push(crate::memory::Message {
515 role: "user".to_string(),
516 content: user_prompt.to_string(),
517 });
518
519 match llm_worker.generate_response(session_id.to_string(), context).await {
520 Ok(summary) if !summary.trim().is_empty() => {
521 let clear_num = existing_summary.as_ref().map(|s| s.clear_count + 1).unwrap_or(1);
522 info!(
523 "Generated cumulative summary #{} for session {} ({} chars)",
524 clear_num, session_id, summary.len()
525 );
526 Some(summary)
527 }
528 Ok(_) => {
529 debug!("Pre-clear summary was empty for session {}", session_id);
530 None
531 }
532 Err(e) => {
533 info!("Pre-clear summary generation skipped for {}: {}", session_id, e);
534 None
535 }
536 }
537 }
538
539 fn should_create_snapshot(&self, reason: &ClearReason) -> bool {
541 if !self.config.enabled {
542 return false;
543 }
544
545 match &self.config.snapshot_strategy {
546 SnapshotStrategy::None => false,
547 SnapshotStrategy::Full { interval_conversations: _ } => true,
548 SnapshotStrategy::Incremental { interval_conversations: _, max_snapshots: _ } => {
549 matches!(reason, ClearReason::ConversationLimit)
550 }
551 SnapshotStrategy::Adaptive { min_importance_threshold: _, max_snapshots: _ } => true,
552 }
553 }
554
555 async fn create_snapshot(
557 &self,
558 session_id: &str,
559 preserved_entries: &[ExtractedCacheEntry],
560 ) -> anyhow::Result<i64> {
561 debug!("Creating KV snapshot for session: {}", session_id);
562
563 let db_entries: Vec<KVEntry> = preserved_entries.iter()
565 .map(|entry| {
566 KVEntry {
567 key_hash: entry.key_hash.clone(),
568 key_data: entry.key_data.clone(),
569 value_data: entry.value_data.clone(),
570 key_type: entry.entry_type.to_string(),
571 layer_index: entry.layer_index,
572 head_index: entry.head_index,
573 importance_score: entry.importance_score,
574 access_count: entry.access_count,
575 last_accessed: Utc::now(),
576 token_positions: None,
577 embedding: None,
578 size_bytes: entry.value_data.len(),
579 is_persistent: true,
580 }
581 })
582 .collect();
583
584 let snapshot_id = self.database.create_kv_snapshot(session_id, &db_entries).await?;
586
587 info!("Created KV snapshot {} with {} entries", snapshot_id, db_entries.len());
588 Ok(snapshot_id)
589 }
590
591 pub async fn retrieve_context(
593 &mut self,
594 session_id: &str,
595 query: &str,
596 current_cache_entries: &[KVEntry],
597 ) -> anyhow::Result<RetrievalResult> {
598 debug!("Retrieving context for query: {}", query);
599
600 let start_time = std::time::Instant::now();
601 let keywords = self.extract_keywords(query);
602
603 let mut results = Vec::new();
604 let mut searched_tiers = Vec::new();
605
606 if !current_cache_entries.is_empty() {
608 searched_tiers.push(1);
609 let tier1_results = self.search_tier1(current_cache_entries, &keywords).await?;
610 results.extend(tier1_results);
611 }
612
613 if results.len() < 5 {
615 searched_tiers.push(2);
616 let tier2_results = self.search_tier2(session_id, &keywords).await?;
617 results.extend(tier2_results);
618 }
619
620 if results.len() < 3 {
622 searched_tiers.push(3);
623 let tier3_results = self.search_tier3(session_id, &keywords).await?;
624 results.extend(tier3_results);
625 }
626
627 results.sort_by(|a, b| b.similarity_score.partial_cmp(&a.similarity_score)
629 .unwrap_or(std::cmp::Ordering::Equal));
630
631 results.truncate(20);
633
634 for result in &results {
636 self.cache_scorer.update_engagement(&result.entry.key_hash, true);
637 }
638
639 let bridge_message = if !results.is_empty() {
641 let primary_tier = results.iter()
642 .map(|r| r.source_tier)
643 .max()
644 .unwrap_or(1);
645
646 let avg_similarity = results.iter()
647 .map(|r| r.similarity_score)
648 .sum::<f32>() / results.len() as f32;
649
650 Some(self.context_bridge.create_retrieval_bridge(
651 results.len(),
652 primary_tier,
653 &keywords,
654 Some(avg_similarity),
655 ))
656 } else {
657 None
658 };
659
660 let duration = start_time.elapsed();
661
662 self.statistics.record_retrieval(
664 results.len(),
665 searched_tiers.clone(),
666 keywords.len(),
667 session_id,
668 );
669
670 Ok(RetrievalResult {
671 retrieved_entries: results,
672 bridge_message,
673 search_duration_ms: duration.as_millis() as u64,
674 keywords_used: keywords,
675 tiers_searched: searched_tiers,
676 })
677 }
678
679 async fn search_tier1(
681 &self,
682 entries: &[KVEntry],
683 keywords: &[String],
684 ) -> anyhow::Result<Vec<RetrievedEntry>> {
685 let mut results = Vec::new();
686
687 for entry in entries {
688 let similarity = self.calculate_keyword_similarity(entry, keywords);
689 if similarity > 0.3 { let matched_keywords = self.get_matching_keywords(entry, keywords);
691 results.push(RetrievedEntry {
692 entry: entry.clone(),
693 similarity_score: similarity,
694 source_tier: 1,
695 matched_keywords,
696 retrieval_time: Utc::now(),
697 });
698 }
699 }
700
701 results.sort_by(|a, b| {
703 b.similarity_score.partial_cmp(&a.similarity_score)
704 .unwrap_or(std::cmp::Ordering::Equal)
705 .then(b.entry.access_count.cmp(&a.entry.access_count))
706 });
707
708 results.truncate(10); debug!("Tier 1 search found {} results", results.len());
711 Ok(results)
712 }
713
714 async fn search_tier2(
716 &self,
717 session_id: &str,
718 keywords: &[String],
719 ) -> anyhow::Result<Vec<RetrievedEntry>> {
720 let snapshots = self.database.get_recent_kv_snapshots(session_id, 3).await?;
722
723 let mut all_results = Vec::new();
724
725 for snapshot in snapshots {
726 let entries = self.database.get_kv_snapshot_entries(snapshot.id).await?;
728
729 for entry in entries {
730 let similarity = self.calculate_keyword_similarity(&entry, keywords);
731 if similarity > 0.4 { let matched_keywords = self.get_matching_keywords(&entry, keywords);
733 all_results.push(RetrievedEntry {
734 entry,
735 similarity_score: similarity,
736 source_tier: 2,
737 matched_keywords,
738 retrieval_time: Utc::now(),
739 });
740 }
741 }
742 }
743
744 all_results.sort_by(|a, b| b.similarity_score.partial_cmp(&a.similarity_score)
746 .unwrap_or(std::cmp::Ordering::Equal));
747 all_results.truncate(15);
748
749 debug!("Tier 2 search found {} results", all_results.len());
750 Ok(all_results)
751 }
752
753 async fn search_tier3(
755 &self,
756 session_id: &str,
757 keywords: &[String],
758 ) -> anyhow::Result<Vec<RetrievedEntry>> {
759 if keywords.is_empty() {
760 return Ok(Vec::new());
761 }
762
763 let messages = self.database.conversations.search_messages_by_keywords(
765 session_id,
766 keywords,
767 20,
768 ).await?;
769
770 let mut results = Vec::new();
771
772 for message in messages {
773 let entry = KVEntry {
775 key_hash: format!("msg_{}", message.id),
776 key_data: Some(message.content.as_bytes().to_vec()),
777 value_data: message.content.as_bytes().to_vec(),
778 key_type: "message".to_string(),
779 layer_index: 0,
780 head_index: None,
781 importance_score: message.importance_score,
782 access_count: 1,
783 last_accessed: message.timestamp,
784 token_positions: None,
785 embedding: None,
786 size_bytes: message.content.len(),
787 is_persistent: false,
788 };
789
790 let similarity = self.calculate_keyword_similarity(&entry, keywords);
791 if similarity > 0.5 { results.push(RetrievedEntry {
793 entry,
794 similarity_score: similarity,
795 source_tier: 3,
796 matched_keywords: keywords.to_vec(),
797 retrieval_time: Utc::now(),
798 });
799 }
800 }
801
802 results.sort_by(|a, b| {
804 b.entry.last_accessed.cmp(&a.entry.last_accessed)
805 .then(b.similarity_score.partial_cmp(&a.similarity_score)
806 .unwrap_or(std::cmp::Ordering::Equal))
807 });
808
809 results.truncate(10);
810
811 debug!("Tier 3 search found {} results", results.len());
812 Ok(results)
813 }
814
815 fn calculate_keyword_similarity(&self, entry: &KVEntry, keywords: &[String]) -> f32 {
816 if keywords.is_empty() {
817 return 0.0;
818 }
819
820 let entry_keywords = self.cache_scorer.extract_keywords(entry.key_data.as_deref());
821 if entry_keywords.is_empty() {
822 return 0.0;
823 }
824
825 let mut matches = 0.0;
827 for keyword in keywords {
828 let keyword_lower = keyword.to_lowercase();
829 for entry_keyword in &entry_keywords {
830 let entry_lower = entry_keyword.to_lowercase();
831 if entry_lower.contains(&keyword_lower) || keyword_lower.contains(&entry_lower) {
832 matches += 1.0;
833 break;
834 }
835 }
836 }
837
838 matches / keywords.len() as f32
839 }
840
841 fn get_matching_keywords(&self, entry: &KVEntry, keywords: &[String]) -> Vec<String> {
842 let entry_keywords = self.cache_scorer.extract_keywords(entry.key_data.as_deref());
843 let mut matches = Vec::new();
844
845 for keyword in keywords {
846 let keyword_lower = keyword.to_lowercase();
847 for entry_keyword in &entry_keywords {
848 let entry_lower = entry_keyword.to_lowercase();
849 if entry_lower.contains(&keyword_lower) || keyword_lower.contains(&entry_lower) {
850 matches.push(keyword.clone());
851 break;
852 }
853 }
854 }
855
856 matches
857 }
858
859 fn extract_keywords(&self, text: &str) -> Vec<String> {
860 let words: Vec<&str> = text.split_whitespace().collect();
861 words.iter()
862 .filter(|w| w.len() > 3)
863 .map(|w| w.to_lowercase())
864 .filter(|w| !self.is_stop_word(w))
865 .collect()
866 }
867
868 fn is_stop_word(&self, word: &str) -> bool {
869 let stop_words = [
870 "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
871 "of", "with", "by", "is", "am", "are", "was", "were", "be", "been",
872 "being", "have", "has", "had", "do", "does", "did", "will", "would",
873 "shall", "should", "may", "might", "must", "can", "could",
874 ];
875 stop_words.contains(&word)
876 }
877
878 async fn update_session_metadata(
880 &self,
881 session_id: &str,
882 state: &SessionCacheState,
883 ) -> anyhow::Result<()> {
884 self.database.update_kv_cache_metadata(session_id, state).await
885 }
886
887 async fn generate_and_store_kv_embeddings(
895 &self,
896 session_id: &str,
897 kv_entries: &[KVEntry],
898 ) -> anyhow::Result<()> {
899 debug!("Generating embeddings for {} KV cache entries", kv_entries.len());
900
901 let text_entries: Vec<(usize, String)> = kv_entries.iter().enumerate()
903 .filter_map(|(i, entry)| {
904 let text = entry.key_data.as_ref()
905 .and_then(|d| String::from_utf8(d.clone()).ok())
906 .or_else(|| String::from_utf8(entry.value_data.clone()).ok())?;
907 if text.trim().is_empty() { None } else { Some((i, text)) }
908 })
909 .collect();
910
911 if text_entries.is_empty() {
912 debug!("No text content in KV entries — skipping embedding generation");
913 return Ok(());
914 }
915
916 let stored_messages = self.database.conversations
919 .get_session_messages(session_id, Some(1000), None)
920 .unwrap_or_default();
921 let content_to_id: HashMap<&str, i64> = stored_messages.iter()
922 .map(|m| (m.content.as_str(), m.id))
923 .collect();
924
925 let llm_worker = match self.get_llm_worker() {
926 Some(w) => w,
927 None => {
928 debug!("No LLM worker — cannot generate KV embeddings");
929 return Ok(());
930 }
931 };
932
933 let texts: Vec<String> = text_entries.iter().map(|(_, t)| t.clone()).collect();
934
935 match llm_worker.generate_embeddings(texts).await {
936 Ok(embeddings) => {
937 let mut stored = 0usize;
938 let now = Utc::now();
939
940 for (pos, embedding_vec) in embeddings.iter().enumerate() {
941 if embedding_vec.is_empty() { continue; }
942 let Some((_, ref text)) = text_entries.get(pos) else { continue };
943
944 let message_id = match content_to_id.get(text.as_str()).copied() {
946 Some(id) => id,
947 None => {
948 debug!(
949 "KV entry text has no matching stored message — skipping embedding"
950 );
951 continue;
952 }
953 };
954
955 let embedding = crate::memory_db::schema::Embedding {
956 id: 0, message_id,
958 embedding: embedding_vec.clone(),
959 embedding_model: "local-llm".to_string(),
960 generated_at: now,
961 };
962
963 if let Err(e) = self.database.embeddings.store_embedding(&embedding) {
964 warn!("Failed to store KV embedding for message {}: {}", message_id, e);
965 } else {
966 let _ = self.database.conversations.mark_embedding_generated(message_id);
968 stored += 1;
969 }
970 }
971
972 info!(
973 "Stored {}/{} KV embeddings for session {}",
974 stored, embeddings.len(), session_id
975 );
976 }
977 Err(e) => {
978 warn!("Failed to generate embeddings for KV cache entries: {}", e);
979 }
980 }
981
982 Ok(())
983 }
984
985 fn get_llm_worker(&self) -> Option<&crate::worker_threads::LLMWorker> {
987 self.llm_worker.as_ref().map(|worker| worker.as_ref())
988 }
989
990 pub fn set_llm_worker(&mut self, llm_worker: Arc<crate::worker_threads::LLMWorker>) {
992 self.llm_worker = Some(llm_worker);
993 }
994
995 pub fn get_statistics(&self) -> &CacheStatistics {
997 &self.statistics
998 }
999
1000 pub fn get_session_state(&self, session_id: &str) -> Option<&SessionCacheState> {
1002 self.session_state.get(session_id)
1003 }
1004
1005 pub fn get_all_session_states(&self) -> &HashMap<String, SessionCacheState> {
1007 &self.session_state
1008 }
1009
1010 pub async fn restore_from_snapshot(
1012 &mut self,
1013 session_id: &str,
1014 snapshot_id: i64,
1015 ) -> anyhow::Result<Vec<KVEntry>> {
1016 info!("Restoring cache from snapshot {} for session {}", snapshot_id, session_id);
1017
1018 let entries: Vec<KVEntry> = self.database.get_kv_snapshot_entries(snapshot_id).await?;
1019
1020 if let Some(state) = self.session_state.get_mut(session_id) {
1022 state.entry_count = entries.len();
1023 state.last_snapshot_id = Some(snapshot_id);
1024 }
1025
1026 let bridge_message = self.context_bridge.create_restore_bridge(
1028 entries.len(),
1029 None, );
1031
1032 info!("{}", bridge_message);
1033
1034 self.statistics.record_restore(entries.len(), session_id);
1036
1037 Ok(entries)
1038 }
1039
1040 pub async fn manual_clear_cache(
1043 &mut self,
1044 session_id: &str,
1045 current_entries: &[KVEntry],
1046 ) -> anyhow::Result<CacheClearResult> {
1047 self.clear_cache(session_id, current_entries, &[], ClearReason::Manual).await
1048 }
1049
1050 pub async fn perform_maintenance(&mut self) -> anyhow::Result<MaintenanceResult> {
1052 let mut result = MaintenanceResult {
1053 sessions_cleaned: 0,
1054 snapshots_pruned: 0,
1055 errors: Vec::new(),
1056 };
1057
1058 let cutoff = Utc::now() - chrono::Duration::hours(24);
1060 let sessions_to_clean: Vec<String> = self.session_state.iter()
1061 .filter(|(_, state)| {
1062 state.last_cleared_at.is_none_or(|dt| dt < cutoff)
1063 })
1064 .map(|(id, _)| id.clone())
1065 .collect();
1066
1067 for session_id in sessions_to_clean {
1068 if let Err(e) = self.cleanup_session(&session_id).await {
1069 result.errors.push(format!("Failed to cleanup session {}: {}", session_id, e));
1070 } else {
1071 result.sessions_cleaned += 1;
1072 }
1073 }
1074
1075 if let SnapshotStrategy::Incremental { max_snapshots, .. } = &self.config.snapshot_strategy {
1077 let pruned = self.prune_old_snapshots(*max_snapshots).await?;
1078 result.snapshots_pruned = pruned;
1079 }
1080
1081 Ok(result)
1082 }
1083
1084 async fn cleanup_session(&mut self, session_id: &str) -> anyhow::Result<()> {
1086 self.session_state.remove(session_id);
1087 self.database.cleanup_session_snapshots(session_id).await?;
1088 Ok(())
1089 }
1090
1091 async fn prune_old_snapshots(&self, keep_max: usize) -> anyhow::Result<usize> {
1093 self.database.prune_old_kv_snapshots(keep_max).await
1094 }
1095
1096 pub fn export_statistics(&self) -> CacheStatisticsExport {
1098 CacheStatisticsExport {
1099 total_clears: self.statistics.total_clears,
1100 total_retrievals: self.statistics.total_retrievals,
1101 entries_preserved: self.statistics.entries_preserved,
1102 entries_cleared: self.statistics.entries_cleared,
1103 entries_retrieved: self.statistics.entries_retrieved,
1104 active_sessions: self.session_state.len(),
1105 last_operation: self.statistics.last_operation,
1106 operation_history_count: self.statistics.operation_history.len(),
1107 }
1108 }
1109
1110 pub fn get_config(&self) -> &KVCacheConfig {
1112 &self.config
1113 }
1114
1115 pub fn update_config(&mut self, config: KVCacheConfig) {
1117 self.config = config;
1118 }
1119
1120 pub async fn flush_to_database(&self, session_id: &str, current_entries: &[KVEntry]) -> anyhow::Result<Option<i64>> {
1122 if current_entries.is_empty() {
1123 debug!("No entries to flush for session: {}", session_id);
1124 return Ok(None);
1125 }
1126
1127 debug!("Flushing {} cache entries to database for session: {}", current_entries.len(), session_id);
1128
1129 let snapshot_id = self.database.create_kv_snapshot(session_id, current_entries).await?;
1131
1132 if let Some(state) = self.session_state.get(session_id) {
1134 let mut updated_state = state.clone();
1135 updated_state.entry_count = current_entries.len();
1136 updated_state.last_snapshot_id = Some(snapshot_id);
1137
1138 self.update_session_metadata(session_id, &updated_state).await?;
1139 }
1140
1141 info!("Flushed {} entries to database snapshot {} for session {}",
1142 current_entries.len(), snapshot_id, session_id);
1143
1144 Ok(Some(snapshot_id))
1145 }
1146
1147 pub fn cache_scorer(&self) -> &CacheEntryScorer {
1149 &self.cache_scorer
1150 }
1151
1152 pub fn cache_scorer_mut(&mut self) -> &mut CacheEntryScorer {
1154 &mut self.cache_scorer
1155 }
1156
1157 pub fn reset_statistics(&mut self) {
1159 self.statistics = CacheStatistics::new();
1160 }
1161
1162 pub async fn shutdown_flush(&self) -> anyhow::Result<()> {
1164 info!("Starting KV cache manager shutdown flush...");
1165
1166 let all_states = self.get_all_session_states().clone();
1168
1169 for (session_id, state) in all_states {
1170 info!("Flushing cache data for session: {} (entries: {}, size: {} bytes)",
1171 session_id, state.entry_count, state.cache_size_bytes);
1172
1173 self.update_session_metadata(&session_id, &state).await?;
1177 }
1178
1179 info!("KV cache manager shutdown flush completed");
1180 Ok(())
1181 }
1182}
1183
1184impl CacheStatistics {
1185 pub fn new() -> Self {
1186 Self::default()
1187 }
1188
1189 pub fn record_clear(
1190 &mut self,
1191 total_entries: usize,
1192 preserved_entries: usize,
1193 reason: ClearReason,
1194 session_id: &str,
1195 ) {
1196 self.total_clears += 1;
1197 self.entries_preserved += preserved_entries;
1198 self.entries_cleared += total_entries - preserved_entries;
1199 self.last_operation = Some(Utc::now());
1200
1201 self.operation_history.push(CacheOperation {
1202 operation_type: CacheOperationType::Clear,
1203 timestamp: Utc::now(),
1204 entries_affected: total_entries,
1205 session_id: session_id.to_string(),
1206 details: format!("{:?}", reason),
1207 });
1208
1209 if self.operation_history.len() > 100 {
1211 self.operation_history.remove(0);
1212 }
1213 }
1214
1215 pub fn record_retrieval(
1216 &mut self,
1217 retrieved_count: usize,
1218 tiers_searched: Vec<u8>,
1219 keywords_count: usize,
1220 session_id: &str,
1221 ) {
1222 self.total_retrievals += 1;
1223 self.entries_retrieved += retrieved_count;
1224 self.last_operation = Some(Utc::now());
1225
1226 self.operation_history.push(CacheOperation {
1227 operation_type: CacheOperationType::Retrieve,
1228 timestamp: Utc::now(),
1229 entries_affected: retrieved_count,
1230 session_id: session_id.to_string(),
1231 details: format!("Tiers: {:?}, Keywords: {}", tiers_searched, keywords_count),
1232 });
1233
1234 if self.operation_history.len() > 100 {
1236 self.operation_history.remove(0);
1237 }
1238 }
1239
1240 pub fn record_restore(&mut self, restored_count: usize, session_id: &str) {
1241 self.operation_history.push(CacheOperation {
1242 operation_type: CacheOperationType::Restore,
1243 timestamp: Utc::now(),
1244 entries_affected: restored_count,
1245 session_id: session_id.to_string(),
1246 details: "Cache restored from snapshot".to_string(),
1247 });
1248
1249 if self.operation_history.len() > 100 {
1251 self.operation_history.remove(0);
1252 }
1253 }
1254
1255 pub fn record_snapshot(&mut self, snapshot_id: i64, entry_count: usize, session_id: &str) {
1256 self.operation_history.push(CacheOperation {
1257 operation_type: CacheOperationType::Snapshot,
1258 timestamp: Utc::now(),
1259 entries_affected: entry_count,
1260 session_id: session_id.to_string(),
1261 details: format!("Snapshot ID: {}", snapshot_id),
1262 });
1263
1264 if self.operation_history.len() > 100 {
1266 self.operation_history.remove(0);
1267 }
1268 }
1269}
1270
1271impl RetrievalResult {
1272 pub fn new() -> Self {
1273 Self::default()
1274 }
1275
1276 pub fn total_entries(&self) -> usize {
1277 self.retrieved_entries.len()
1278 }
1279
1280 pub fn average_similarity(&self) -> f32 {
1281 if self.retrieved_entries.is_empty() {
1282 return 0.0;
1283 }
1284 self.retrieved_entries.iter()
1285 .map(|e| e.similarity_score)
1286 .sum::<f32>() / self.retrieved_entries.len() as f32
1287 }
1288
1289 pub fn is_empty(&self) -> bool {
1290 self.retrieved_entries.is_empty()
1291 }
1292
1293 pub fn entries_by_tier(&self, tier: u8) -> Vec<&RetrievedEntry> {
1294 self.retrieved_entries.iter()
1295 .filter(|e| e.source_tier == tier)
1296 .collect()
1297 }
1298
1299 pub fn primary_tier(&self) -> u8 {
1300 if self.retrieved_entries.is_empty() {
1301 return 0;
1302 }
1303
1304 let mut tier_counts = HashMap::new();
1306 for entry in &self.retrieved_entries {
1307 *tier_counts.entry(entry.source_tier).or_insert(0) += 1;
1308 }
1309
1310 tier_counts.into_iter()
1312 .max_by_key(|(_, count)| *count)
1313 .map(|(tier, _)| tier)
1314 .unwrap_or(0)
1315 }
1316
1317 pub fn add_tier_results(&mut self, tier: u8, results: Vec<RetrievedEntry>) {
1318 self.tiers_searched.push(tier);
1319 self.retrieved_entries.extend(results);
1320 }
1321}
1322
/// Serializable, flat summary of cache statistics.
///
/// The counter fields and `last_operation` share their names and types with the fields of
/// `CacheStatistics`. The code that builds this struct is not in this part of the file.
/// NOTE(review): presumably filled from `CacheStatistics` plus the manager's session map;
/// confirm at the construction site.
#[derive(Debug, Clone, Serialize)]
pub struct CacheStatisticsExport {
    /// Same name and type as `CacheStatistics::total_clears`.
    pub total_clears: usize,
    /// Same name and type as `CacheStatistics::total_retrievals`.
    pub total_retrievals: usize,
    /// Same name and type as `CacheStatistics::entries_preserved`.
    pub entries_preserved: usize,
    /// Same name and type as `CacheStatistics::entries_cleared`.
    pub entries_cleared: usize,
    /// Same name and type as `CacheStatistics::entries_retrieved`.
    pub entries_retrieved: usize,
    /// NOTE(review): presumably the number of sessions tracked at export time; confirm.
    pub active_sessions: usize,
    /// Same name and type as `CacheStatistics::last_operation`.
    pub last_operation: Option<DateTime<Utc>>,
    /// NOTE(review): presumably the length of `operation_history`, exported instead of the
    /// history itself; confirm.
    pub operation_history_count: usize,
}
1334
/// Serializable outcome of a maintenance run.
///
/// NOTE(review): the code that fills this in is outside this section, so the field notes
/// below are inferred from names and types only; confirm against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct MaintenanceResult {
    /// Count of sessions cleaned (presumably by the maintenance pass; confirm).
    pub sessions_cleaned: usize,
    /// Count of snapshots pruned (presumably by the maintenance pass; confirm).
    pub snapshots_pruned: usize,
    /// Error messages as plain strings (presumably one per failed step; confirm).
    pub errors: Vec<String>,
}