1use super::{Store, CONVERSATION_TIMEOUT_MINUTES};
4use kernex_core::error::KernexError;
5use uuid::Uuid;
6
7impl Store {
8 pub(crate) async fn get_or_create_conversation(
13 &self,
14 channel: &str,
15 sender_id: &str,
16 project: &str,
17 ) -> Result<String, KernexError> {
18 let row: Option<(String,)> = sqlx::query_as(
19 "SELECT id FROM conversations \
20 WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active' \
21 AND datetime(last_activity) > datetime('now', ? || ' minutes') \
22 ORDER BY last_activity DESC LIMIT 1",
23 )
24 .bind(channel)
25 .bind(sender_id)
26 .bind(project)
27 .bind(-CONVERSATION_TIMEOUT_MINUTES)
28 .fetch_optional(&self.pool)
29 .await
30 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
31
32 if let Some((id,)) = row {
33 sqlx::query(
34 "UPDATE conversations SET last_activity = datetime('now'), updated_at = datetime('now') WHERE id = ?",
35 )
36 .bind(&id)
37 .execute(&self.pool)
38 .await
39 .map_err(|e| KernexError::Store(format!("update failed: {e}")))?;
40 return Ok(id);
41 }
42
43 let id = Uuid::new_v4().to_string();
44 sqlx::query(
45 "INSERT INTO conversations (id, channel, sender_id, project, status, last_activity) \
46 VALUES (?, ?, ?, ?, 'active', datetime('now'))",
47 )
48 .bind(&id)
49 .bind(channel)
50 .bind(sender_id)
51 .bind(project)
52 .execute(&self.pool)
53 .await
54 .map_err(|e| KernexError::Store(format!("insert failed: {e}")))?;
55
56 Ok(id)
57 }
58
59 pub async fn find_idle_conversations(
61 &self,
62 ) -> Result<Vec<(String, String, String, String)>, KernexError> {
63 let rows: Vec<(String, String, String, String)> = sqlx::query_as(
64 "SELECT id, channel, sender_id, project FROM conversations \
65 WHERE status = 'active' \
66 AND datetime(last_activity) <= datetime('now', ? || ' minutes')",
67 )
68 .bind(-CONVERSATION_TIMEOUT_MINUTES)
69 .fetch_all(&self.pool)
70 .await
71 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
72
73 Ok(rows)
74 }
75
76 pub async fn find_all_active_conversations(
78 &self,
79 ) -> Result<Vec<(String, String, String, String)>, KernexError> {
80 let rows: Vec<(String, String, String, String)> = sqlx::query_as(
81 "SELECT id, channel, sender_id, project FROM conversations WHERE status = 'active'",
82 )
83 .fetch_all(&self.pool)
84 .await
85 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
86
87 Ok(rows)
88 }
89
90 pub async fn get_conversation_messages(
92 &self,
93 conversation_id: &str,
94 ) -> Result<Vec<(String, String)>, KernexError> {
95 let rows: Vec<(String, String)> = sqlx::query_as(
96 "SELECT role, content FROM messages \
97 WHERE conversation_id = ? ORDER BY timestamp ASC",
98 )
99 .bind(conversation_id)
100 .fetch_all(&self.pool)
101 .await
102 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
103
104 Ok(rows)
105 }
106
107 pub async fn close_conversation(
109 &self,
110 conversation_id: &str,
111 summary: &str,
112 ) -> Result<(), KernexError> {
113 sqlx::query(
114 "UPDATE conversations SET status = 'closed', summary = ?, updated_at = datetime('now') WHERE id = ?",
115 )
116 .bind(summary)
117 .bind(conversation_id)
118 .execute(&self.pool)
119 .await
120 .map_err(|e| KernexError::Store(format!("update failed: {e}")))?;
121
122 Ok(())
123 }
124
125 pub async fn close_current_conversation(
127 &self,
128 channel: &str,
129 sender_id: &str,
130 project: &str,
131 ) -> Result<bool, KernexError> {
132 let result = sqlx::query(
133 "UPDATE conversations SET status = 'closed', updated_at = datetime('now') \
134 WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active'",
135 )
136 .bind(channel)
137 .bind(sender_id)
138 .bind(project)
139 .execute(&self.pool)
140 .await
141 .map_err(|e| KernexError::Store(format!("update failed: {e}")))?;
142
143 Ok(result.rows_affected() > 0)
144 }
145
146 pub async fn get_recent_summaries(
148 &self,
149 channel: &str,
150 sender_id: &str,
151 limit: i64,
152 ) -> Result<Vec<(String, String)>, KernexError> {
153 let rows: Vec<(String, String)> = sqlx::query_as(
154 "SELECT summary, updated_at FROM conversations \
155 WHERE channel = ? AND sender_id = ? AND status = 'closed' AND summary IS NOT NULL \
156 ORDER BY updated_at DESC LIMIT ?",
157 )
158 .bind(channel)
159 .bind(sender_id)
160 .bind(limit)
161 .fetch_all(&self.pool)
162 .await
163 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
164
165 Ok(rows)
166 }
167
168 pub async fn get_all_recent_summaries(
170 &self,
171 limit: i64,
172 ) -> Result<Vec<(String, String)>, KernexError> {
173 let rows: Vec<(String, String)> = sqlx::query_as(
174 "SELECT summary, updated_at FROM conversations \
175 WHERE status = 'closed' AND summary IS NOT NULL \
176 ORDER BY updated_at DESC LIMIT ?",
177 )
178 .bind(limit)
179 .fetch_all(&self.pool)
180 .await
181 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
182
183 Ok(rows)
184 }
185
186 pub async fn get_history(
188 &self,
189 channel: &str,
190 sender_id: &str,
191 limit: i64,
192 ) -> Result<Vec<(String, String)>, KernexError> {
193 let rows: Vec<(String, String)> = sqlx::query_as(
194 "SELECT COALESCE(summary, '(no summary)'), updated_at FROM conversations \
195 WHERE channel = ? AND sender_id = ? AND status = 'closed' \
196 ORDER BY updated_at DESC LIMIT ?",
197 )
198 .bind(channel)
199 .bind(sender_id)
200 .bind(limit)
201 .fetch_all(&self.pool)
202 .await
203 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
204
205 Ok(rows)
206 }
207
208 pub async fn get_memory_stats(&self, sender_id: &str) -> Result<(i64, i64, i64), KernexError> {
210 let (conv_count,): (i64,) =
211 sqlx::query_as("SELECT COUNT(*) FROM conversations WHERE sender_id = ?")
212 .bind(sender_id)
213 .fetch_one(&self.pool)
214 .await
215 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
216
217 let (msg_count,): (i64,) = sqlx::query_as(
218 "SELECT COUNT(*) FROM messages m \
219 JOIN conversations c ON m.conversation_id = c.id \
220 WHERE c.sender_id = ?",
221 )
222 .bind(sender_id)
223 .fetch_one(&self.pool)
224 .await
225 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
226
227 let (fact_count,): (i64,) =
228 sqlx::query_as("SELECT COUNT(*) FROM facts WHERE sender_id = ?")
229 .bind(sender_id)
230 .fetch_one(&self.pool)
231 .await
232 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
233
234 Ok((conv_count, msg_count, fact_count))
235 }
236}