offline_intelligence/worker_threads/
cache_worker.rs1use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9 shared_state::SharedState,
10 cache_management::cache_extractor::KVEntry,
11};
12
13pub struct CacheWorker {
14 shared_state: Arc<SharedState>,
15}
16
17impl CacheWorker {
18 pub fn new(shared_state: Arc<SharedState>) -> Self {
19 Self { shared_state }
20 }
21
22 pub async fn update_cache(
24 &self,
25 session_id: String,
26 entries: Vec<KVEntry>,
27 ) -> anyhow::Result<()> {
28 debug!("Cache worker updating cache for session: {}", session_id);
29
30 let cache_guard = self.shared_state.cache_manager.read()
31 .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager read lock"))?;
32 if let Some(cache_manager) = &*cache_guard {
33 info!("Updated cache for session {} with {} entries", session_id, entries.len());
36 self.shared_state.counters.inc_cache_hit();
37 } else {
38 debug!("Cache manager not available");
39 self.shared_state.counters.inc_cache_miss();
40 }
41
42 Ok(())
43 }
44
45 pub async fn get_cache_entries(
47 &self,
48 session_id: &str,
49 ) -> anyhow::Result<Vec<KVEntry>> {
50 debug!("Cache worker retrieving entries for session: {}", session_id);
51
52 let cache_guard = self.shared_state.cache_manager.read()
53 .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager read lock"))?;
54 if let Some(cache_manager) = &*cache_guard {
55 let entries = Vec::new(); info!("Retrieved {} cache entries for session {}", entries.len(), session_id);
59 Ok(entries)
60 } else {
61 debug!("Cache manager not available");
62 Ok(Vec::new())
63 }
64 }
65
66 pub async fn create_snapshot(
68 &self,
69 session_id: &str,
70 entries: &[KVEntry],
71 ) -> anyhow::Result<i64> {
72 debug!("Creating KV snapshot for session: {}", session_id);
73
74 let snapshot_id = self.shared_state.database_pool
76 .create_kv_snapshot(session_id, entries)
77 .await?;
78
79 info!("Created KV snapshot {} for session {}", snapshot_id, session_id);
80 Ok(snapshot_id)
81 }
82}