offline_intelligence/worker_threads/
cache_worker.rs1use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9 shared_state::SharedState,
10 cache_management::cache_extractor::KVEntry,
11};
12
13pub struct CacheWorker {
14 shared_state: Arc<SharedState>,
15}
16
17impl CacheWorker {
18 pub fn new(shared_state: Arc<SharedState>) -> Self {
19 Self { shared_state }
20 }
21
22 pub async fn update_cache(
24 &self,
25 session_id: String,
26 entries: Vec<KVEntry>,
27 ) -> anyhow::Result<()> {
28 debug!("Cache worker updating cache for session: {}", session_id);
29
30 let cache_guard = self.shared_state.cache_manager.read()
31 .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager read lock"))?;
32 if let Some(cache_manager) = &*cache_guard {
33 let snapshot_id = cache_manager.flush_to_database(&session_id, &entries).await?;
34 info!("Persisted cache for session {} with {} entries (snapshot: {:?})",
35 session_id, entries.len(), snapshot_id);
36 self.shared_state.counters.inc_cache_hit();
37 } else {
38 debug!("Cache manager not available");
39 self.shared_state.counters.inc_cache_miss();
40 }
41
42 Ok(())
43 }
44
45 pub async fn get_cache_entries(
47 &self,
48 session_id: &str,
49 ) -> anyhow::Result<Vec<KVEntry>> {
50 debug!("Cache worker retrieving entries for session: {}", session_id);
51
52 let snapshots = self.shared_state.database_pool
54 .get_recent_kv_snapshots(session_id, 1)
55 .await?;
56
57 if let Some(snapshot) = snapshots.first() {
58 let entries = self.shared_state.database_pool
59 .get_kv_snapshot_entries(snapshot.id)
60 .await?;
61 info!("Retrieved {} cache entries for session {}", entries.len(), session_id);
62 Ok(entries)
63 } else {
64 debug!("No KV snapshots found for session {}", session_id);
65 Ok(Vec::new())
66 }
67 }
68
69 pub async fn create_snapshot(
71 &self,
72 session_id: &str,
73 entries: &[KVEntry],
74 ) -> anyhow::Result<i64> {
75 debug!("Creating KV snapshot for session: {}", session_id);
76
77 let snapshot_id = self.shared_state.database_pool
79 .create_kv_snapshot(session_id, entries)
80 .await?;
81
82 info!("Created KV snapshot {} for session {}", snapshot_id, session_id);
83 Ok(snapshot_id)
84 }
85}