Skip to main content

offline_intelligence/worker_threads/
cache_worker.rs

1//! Cache worker thread implementation
2//!
3//! Handles KV cache operations in a dedicated thread.
4
5use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9    shared_state::SharedState,
10    cache_management::cache_extractor::KVEntry,
11};
12
13pub struct CacheWorker {
14    shared_state: Arc<SharedState>,
15}
16
17impl CacheWorker {
18    pub fn new(shared_state: Arc<SharedState>) -> Self {
19        Self { shared_state }
20    }
21    
22    /// Update cache with new entries
23    pub async fn update_cache(
24        &self,
25        session_id: String,
26        entries: Vec<KVEntry>,
27    ) -> anyhow::Result<()> {
28        debug!("Cache worker updating cache for session: {}", session_id);
29        
30        let cache_guard = self.shared_state.cache_manager.read()
31            .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager read lock"))?;
32        if let Some(cache_manager) = &*cache_guard {
33            // Update cache entries
34            // This would call the actual cache manager methods
35            info!("Updated cache for session {} with {} entries", session_id, entries.len());
36            self.shared_state.counters.inc_cache_hit();
37        } else {
38            debug!("Cache manager not available");
39            self.shared_state.counters.inc_cache_miss();
40        }
41        
42        Ok(())
43    }
44    
45    /// Retrieve cache entries for session
46    pub async fn get_cache_entries(
47        &self,
48        session_id: &str,
49    ) -> anyhow::Result<Vec<KVEntry>> {
50        debug!("Cache worker retrieving entries for session: {}", session_id);
51        
52        let cache_guard = self.shared_state.cache_manager.read()
53            .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager read lock"))?;
54        if let Some(cache_manager) = &*cache_guard {
55            // Retrieve cache entries
56            // This would call the actual cache manager methods
57            let entries = Vec::new(); // Placeholder
58            info!("Retrieved {} cache entries for session {}", entries.len(), session_id);
59            Ok(entries)
60        } else {
61            debug!("Cache manager not available");
62            Ok(Vec::new())
63        }
64    }
65    
66    /// Create KV snapshot
67    pub async fn create_snapshot(
68        &self,
69        session_id: &str,
70        entries: &[KVEntry],
71    ) -> anyhow::Result<i64> {
72        debug!("Creating KV snapshot for session: {}", session_id);
73        
74        // Use database pool from shared state
75        let snapshot_id = self.shared_state.database_pool
76            .create_kv_snapshot(session_id, entries)
77            .await?;
78            
79        info!("Created KV snapshot {} for session {}", snapshot_id, session_id);
80        Ok(snapshot_id)
81    }
82}