Skip to main content

offline_intelligence/worker_threads/
cache_worker.rs

//! Cache worker thread implementation
//!
//! Handles KV cache operations in a dedicated thread.
4
5use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9    shared_state::SharedState,
10    cache_management::cache_extractor::KVEntry,
11};
12
13pub struct CacheWorker {
14    shared_state: Arc<SharedState>,
15}
16
17impl CacheWorker {
18    pub fn new(shared_state: Arc<SharedState>) -> Self {
19        Self { shared_state }
20    }
21    
22    /// Persist cache entries for a session by creating a KV snapshot.
23    pub async fn update_cache(
24        &self,
25        session_id: String,
26        entries: Vec<KVEntry>,
27    ) -> anyhow::Result<()> {
28        debug!("Cache worker updating cache for session: {}", session_id);
29
30        let cache_guard = self.shared_state.cache_manager.read()
31            .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager read lock"))?;
32        if let Some(cache_manager) = &*cache_guard {
33            let snapshot_id = cache_manager.flush_to_database(&session_id, &entries).await?;
34            info!("Persisted cache for session {} with {} entries (snapshot: {:?})",
35                  session_id, entries.len(), snapshot_id);
36            self.shared_state.counters.inc_cache_hit();
37        } else {
38            debug!("Cache manager not available");
39            self.shared_state.counters.inc_cache_miss();
40        }
41
42        Ok(())
43    }
44
45    /// Retrieve the most-recently persisted KV cache entries for a session.
46    pub async fn get_cache_entries(
47        &self,
48        session_id: &str,
49    ) -> anyhow::Result<Vec<KVEntry>> {
50        debug!("Cache worker retrieving entries for session: {}", session_id);
51
52        // Look up the latest KV snapshot persisted for this session
53        let snapshots = self.shared_state.database_pool
54            .get_recent_kv_snapshots(session_id, 1)
55            .await?;
56
57        if let Some(snapshot) = snapshots.first() {
58            let entries = self.shared_state.database_pool
59                .get_kv_snapshot_entries(snapshot.id)
60                .await?;
61            info!("Retrieved {} cache entries for session {}", entries.len(), session_id);
62            Ok(entries)
63        } else {
64            debug!("No KV snapshots found for session {}", session_id);
65            Ok(Vec::new())
66        }
67    }
68    
69    /// Create KV snapshot
70    pub async fn create_snapshot(
71        &self,
72        session_id: &str,
73        entries: &[KVEntry],
74    ) -> anyhow::Result<i64> {
75        debug!("Creating KV snapshot for session: {}", session_id);
76        
77        // Use database pool from shared state
78        let snapshot_id = self.shared_state.database_pool
79            .create_kv_snapshot(session_id, entries)
80            .await?;
81            
82        info!("Created KV snapshot {} for session {}", snapshot_id, session_id);
83        Ok(snapshot_id)
84    }
85}