Skip to main content

offline_intelligence/worker_threads/
database_worker.rs

1//! Database worker thread implementation
2//!
3//! Handles database operations in a dedicated thread with connection pooling.
4
5use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9    shared_state::SharedState,
10    memory::Message,
11    memory_db::{StoredMessage, Transaction, DatabaseStats},
12    cache_management::cache_scorer::score_message_importance,
13};
14
15pub struct DatabaseWorker {
16    shared_state: Arc<SharedState>,
17}
18
19impl DatabaseWorker {
20    pub fn new(shared_state: Arc<SharedState>) -> Self {
21        Self { shared_state }
22    }
23    
24    /// Store messages in database
25    pub async fn store_messages(
26        &self,
27        session_id: String,
28        messages: Vec<Message>,
29    ) -> anyhow::Result<()> {
30        debug!("Database worker storing {} messages for session: {}", messages.len(), session_id);
31
32        let batch: Vec<(String, String, i32, i32, f32)> = messages.iter().enumerate()
33            .map(|(i, m)| (
34                m.role.clone(),
35                m.content.clone(),
36                i as i32,
37                (m.content.len() / 4) as i32,
38                score_message_importance(&m.role, &m.content),
39            ))
40            .collect();
41
42        self.shared_state.database_pool.conversations
43            .store_messages_batch(&session_id, &batch)?;
44
45        info!("Stored {} messages for session {}", messages.len(), session_id);
46        Ok(())
47    }
48
49    /// Retrieve conversation from database
50    pub async fn get_conversation(
51        &self,
52        session_id: &str,
53    ) -> anyhow::Result<Vec<StoredMessage>> {
54        debug!("Database worker retrieving conversation: {}", session_id);
55
56        let messages = self.shared_state.database_pool.conversations
57            .get_session_messages(session_id, None, None)?;
58
59        info!("Retrieved conversation {} with {} messages", session_id, messages.len());
60        Ok(messages)
61    }
62
63    /// Update conversation title
64    pub async fn update_conversation_title(
65        &self,
66        session_id: &str,
67        title: &str,
68    ) -> anyhow::Result<()> {
69        debug!("Database worker updating title for session: {}", session_id);
70
71        self.shared_state.database_pool.conversations
72            .update_session_title(session_id, title)?;
73
74        info!("Updated conversation title for session {}", session_id);
75        Ok(())
76    }
77
78    /// Delete conversation
79    pub async fn delete_conversation(
80        &self,
81        session_id: &str,
82    ) -> anyhow::Result<()> {
83        debug!("Database worker deleting conversation: {}", session_id);
84
85        let deleted = self.shared_state.database_pool.conversations
86            .delete_session(session_id)?;
87
88        info!("Deleted conversation {} ({} records removed)", session_id, deleted);
89        Ok(())
90    }
91    
92    /// Begin database transaction
93    pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_>> {
94        debug!("Database worker beginning transaction");
95        
96        // Use shared database pool
97        let transaction = self.shared_state.database_pool.begin_transaction()?;
98        Ok(transaction)
99    }
100    
101    /// Get database statistics
102    pub async fn get_stats(&self) -> anyhow::Result<DatabaseStats> {
103        debug!("Database worker getting statistics");
104        
105        let stats = self.shared_state.database_pool.get_stats()?;
106        Ok(stats)
107    }
108    
109    /// Cleanup old data
110    pub async fn cleanup_old_data(&self, older_than_days: i32) -> anyhow::Result<usize> {
111        debug!("Database worker cleaning up data older than {} days", older_than_days);
112        
113        let deleted_count = self.shared_state.database_pool.cleanup_old_data(older_than_days)?;
114        info!("Cleaned up {} old records", deleted_count);
115        Ok(deleted_count)
116    }
117}