Skip to main content

offline_intelligence/worker_threads/
context_worker.rs

1//! Context worker thread implementation
2//!
3//! Handles conversation context optimization and management in a dedicated thread.
4
5use std::sync::Arc;
6use tracing::{info, debug, warn};
7
8use crate::{
9    shared_state::SharedState,
10    memory::Message,
11};
12
13pub struct ContextWorker {
14    shared_state: Arc<SharedState>,
15}
16
17impl ContextWorker {
18    pub fn new(shared_state: Arc<SharedState>) -> Self {
19        Self { shared_state }
20    }
21
22    /// Process conversation context optimization
23    pub async fn process_conversation(
24        &self,
25        session_id: String,
26        messages: Vec<Message>,
27        user_query: Option<&str>,
28    ) -> anyhow::Result<Vec<Message>> {
29        debug!("Context worker processing conversation for session: {}", session_id);
30
31        // Access context orchestrator through shared state (tokio RwLock)
32        let orchestrator_guard = self.shared_state.context_orchestrator.read().await;
33
34        if let Some(ref orchestrator) = *orchestrator_guard {
35            match orchestrator.process_conversation(&session_id, &messages, user_query).await {
36                Ok(optimized) => {
37                    debug!("Context optimized: {} -> {} messages", messages.len(), optimized.len());
38                    Ok(optimized)
39                }
40                Err(e) => {
41                    warn!("Context optimization failed: {}, using original", e);
42                    Ok(messages)
43                }
44            }
45        } else {
46            warn!("Context orchestrator not available, using original messages");
47            Ok(messages)
48        }
49    }
50
51    /// Save assistant response to database
52    pub async fn save_assistant_response(
53        &self,
54        session_id: &str,
55        assistant_content: &str,
56    ) -> anyhow::Result<()> {
57        debug!("Saving assistant response for session: {}", session_id);
58
59        let orchestrator_guard = self.shared_state.context_orchestrator.read().await;
60        if let Some(ref orchestrator) = *orchestrator_guard {
61            orchestrator.save_assistant_response(session_id, assistant_content).await?;
62            info!("Assistant response saved for session: {}", session_id);
63        }
64
65        Ok(())
66    }
67
68    /// Ensure session exists in database
69    pub async fn ensure_session_exists(
70        &self,
71        session_id: &str,
72        title: Option<&str>,
73    ) -> anyhow::Result<()> {
74        let orchestrator_guard = self.shared_state.context_orchestrator.read().await;
75        if let Some(ref orchestrator) = *orchestrator_guard {
76            let tier_manager_guard = orchestrator.tier_manager().read().await;
77            if let Err(e) = tier_manager_guard.ensure_session_exists(session_id, title.map(|s| s.to_string())).await {
78                return Err(anyhow::anyhow!("Failed to ensure session exists: {}", e));
79            }
80            info!("Ensured session {} exists", session_id);
81        }
82
83        Ok(())
84    }
85}