// offline_intelligence/worker_threads/database_worker.rs
use std::sync::Arc;
6use tracing::{info, debug};
7
8use crate::{
9 shared_state::SharedState,
10 memory::Message,
11 memory_db::{StoredMessage, Transaction, DatabaseStats},
12 cache_management::cache_scorer::score_message_importance,
13};
14
15pub struct DatabaseWorker {
16 shared_state: Arc<SharedState>,
17}
18
19impl DatabaseWorker {
20 pub fn new(shared_state: Arc<SharedState>) -> Self {
21 Self { shared_state }
22 }
23
24 pub async fn store_messages(
26 &self,
27 session_id: String,
28 messages: Vec<Message>,
29 ) -> anyhow::Result<()> {
30 debug!("Database worker storing {} messages for session: {}", messages.len(), session_id);
31
32 let batch: Vec<(String, String, i32, i32, f32)> = messages.iter().enumerate()
33 .map(|(i, m)| (
34 m.role.clone(),
35 m.content.clone(),
36 i as i32,
37 (m.content.len() / 4) as i32,
38 score_message_importance(&m.role, &m.content),
39 ))
40 .collect();
41
42 self.shared_state.database_pool.conversations
43 .store_messages_batch(&session_id, &batch)?;
44
45 info!("Stored {} messages for session {}", messages.len(), session_id);
46 Ok(())
47 }
48
49 pub async fn get_conversation(
51 &self,
52 session_id: &str,
53 ) -> anyhow::Result<Vec<StoredMessage>> {
54 debug!("Database worker retrieving conversation: {}", session_id);
55
56 let messages = self.shared_state.database_pool.conversations
57 .get_session_messages(session_id, None, None)?;
58
59 info!("Retrieved conversation {} with {} messages", session_id, messages.len());
60 Ok(messages)
61 }
62
63 pub async fn update_conversation_title(
65 &self,
66 session_id: &str,
67 title: &str,
68 ) -> anyhow::Result<()> {
69 debug!("Database worker updating title for session: {}", session_id);
70
71 self.shared_state.database_pool.conversations
72 .update_session_title(session_id, title)?;
73
74 info!("Updated conversation title for session {}", session_id);
75 Ok(())
76 }
77
78 pub async fn delete_conversation(
80 &self,
81 session_id: &str,
82 ) -> anyhow::Result<()> {
83 debug!("Database worker deleting conversation: {}", session_id);
84
85 let deleted = self.shared_state.database_pool.conversations
86 .delete_session(session_id)?;
87
88 info!("Deleted conversation {} ({} records removed)", session_id, deleted);
89 Ok(())
90 }
91
92 pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_>> {
94 debug!("Database worker beginning transaction");
95
96 let transaction = self.shared_state.database_pool.begin_transaction()?;
98 Ok(transaction)
99 }
100
101 pub async fn get_stats(&self) -> anyhow::Result<DatabaseStats> {
103 debug!("Database worker getting statistics");
104
105 let stats = self.shared_state.database_pool.get_stats()?;
106 Ok(stats)
107 }
108
109 pub async fn cleanup_old_data(&self, older_than_days: i32) -> anyhow::Result<usize> {
111 debug!("Database worker cleaning up data older than {} days", older_than_days);
112
113 let deleted_count = self.shared_state.database_pool.cleanup_old_data(older_than_days)?;
114 info!("Cleaned up {} old records", deleted_count);
115 Ok(deleted_count)
116 }
117}