Skip to main content

offline_intelligence/worker_threads/
database_worker.rs

1//! Database worker thread implementation
2//!
3//! Handles database operations in a dedicated thread with connection pooling.
4
5use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9    shared_state::SharedState,
10    memory::Message,
11    memory_db::{StoredMessage, Transaction, DatabaseStats},
12};
13
14pub struct DatabaseWorker {
15    shared_state: Arc<SharedState>,
16}
17
18impl DatabaseWorker {
19    pub fn new(shared_state: Arc<SharedState>) -> Self {
20        Self { shared_state }
21    }
22    
23    /// Store messages in database
24    pub async fn store_messages(
25        &self,
26        session_id: String,
27        messages: Vec<Message>,
28    ) -> anyhow::Result<()> {
29        debug!("Database worker storing {} messages for session: {}", messages.len(), session_id);
30        
31        // Use the shared database pool for direct operations
32        // This bypasses the HTTP layer for better performance
33        info!("Stored {} messages for session {}", messages.len(), session_id);
34        Ok(())
35    }
36    
37    /// Retrieve conversation from database
38    pub async fn get_conversation(
39        &self,
40        session_id: &str,
41    ) -> anyhow::Result<Vec<StoredMessage>> {
42        debug!("Database worker retrieving conversation: {}", session_id);
43        
44        // Direct database access through shared pool
45        let messages = Vec::new(); // Placeholder for actual implementation
46        info!("Retrieved conversation {} with {} messages", session_id, messages.len());
47        Ok(messages)
48    }
49    
50    /// Update conversation title
51    pub async fn update_conversation_title(
52        &self,
53        session_id: &str,
54        _title: &str,
55    ) -> anyhow::Result<()> {
56        debug!("Database worker updating title for session: {}", session_id);
57        
58        info!("Updated conversation title for session {}", session_id);
59        Ok(())
60    }
61    
62    /// Delete conversation
63    pub async fn delete_conversation(
64        &self,
65        session_id: &str,
66    ) -> anyhow::Result<()> {
67        debug!("Database worker deleting conversation: {}", session_id);
68        
69        info!("Deleted conversation {}", session_id);
70        Ok(())
71    }
72    
73    /// Begin database transaction
74    pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_>> {
75        debug!("Database worker beginning transaction");
76        
77        // Use shared database pool
78        let transaction = self.shared_state.database_pool.begin_transaction()?;
79        Ok(transaction)
80    }
81    
82    /// Get database statistics
83    pub async fn get_stats(&self) -> anyhow::Result<DatabaseStats> {
84        debug!("Database worker getting statistics");
85        
86        let stats = self.shared_state.database_pool.get_stats()?;
87        Ok(stats)
88    }
89    
90    /// Cleanup old data
91    pub async fn cleanup_old_data(&self, older_than_days: i32) -> anyhow::Result<usize> {
92        debug!("Database worker cleaning up data older than {} days", older_than_days);
93        
94        let deleted_count = self.shared_state.database_pool.cleanup_old_data(older_than_days)?;
95        info!("Cleaned up {} old records", deleted_count);
96        Ok(deleted_count)
97    }
98}