1use super::{Store, CONVERSATION_TIMEOUT_MINUTES};
4use crate::error::MemoryError;
5use uuid::Uuid;
6
7impl Store {
8 pub(crate) async fn get_or_create_conversation(
13 &self,
14 channel: &str,
15 sender_id: &str,
16 project: &str,
17 ) -> Result<String, MemoryError> {
18 let row: Option<(String,)> = sqlx::query_as(
19 "SELECT id FROM conversations \
20 WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active' \
21 AND datetime(last_activity) > datetime('now', ? || ' minutes') \
22 ORDER BY last_activity DESC LIMIT 1",
23 )
24 .bind(channel)
25 .bind(sender_id)
26 .bind(project)
27 .bind(-CONVERSATION_TIMEOUT_MINUTES)
28 .fetch_optional(&self.pool)
29 .await
30 .map_err(|e| MemoryError::sqlite("query failed", e))?;
31
32 if let Some((id,)) = row {
33 sqlx::query(
34 "UPDATE conversations SET last_activity = datetime('now'), updated_at = datetime('now') WHERE id = ?",
35 )
36 .bind(&id)
37 .execute(&self.pool)
38 .await
39 .map_err(|e| MemoryError::sqlite("update failed", e))?;
40 return Ok(id);
41 }
42
43 let id = Uuid::new_v4().to_string();
44 sqlx::query(
45 "INSERT INTO conversations (id, channel, sender_id, project, status, last_activity) \
46 VALUES (?, ?, ?, ?, 'active', datetime('now'))",
47 )
48 .bind(&id)
49 .bind(channel)
50 .bind(sender_id)
51 .bind(project)
52 .execute(&self.pool)
53 .await
54 .map_err(|e| MemoryError::sqlite("insert failed", e))?;
55
56 Ok(id)
57 }
58
59 pub async fn find_idle_conversations(
61 &self,
62 ) -> Result<Vec<(String, String, String, String)>, MemoryError> {
63 let rows: Vec<(String, String, String, String)> = sqlx::query_as(
64 "SELECT id, channel, sender_id, project FROM conversations \
65 WHERE status = 'active' \
66 AND datetime(last_activity) <= datetime('now', ? || ' minutes')",
67 )
68 .bind(-CONVERSATION_TIMEOUT_MINUTES)
69 .fetch_all(&self.pool)
70 .await
71 .map_err(|e| MemoryError::sqlite("query failed", e))?;
72
73 Ok(rows)
74 }
75
76 pub async fn find_all_active_conversations(
78 &self,
79 ) -> Result<Vec<(String, String, String, String)>, MemoryError> {
80 let rows: Vec<(String, String, String, String)> = sqlx::query_as(
81 "SELECT id, channel, sender_id, project FROM conversations WHERE status = 'active'",
82 )
83 .fetch_all(&self.pool)
84 .await
85 .map_err(|e| MemoryError::sqlite("query failed", e))?;
86
87 Ok(rows)
88 }
89
90 pub async fn get_conversation_messages(
92 &self,
93 conversation_id: &str,
94 ) -> Result<Vec<(String, String)>, MemoryError> {
95 let rows: Vec<(String, String)> = sqlx::query_as(
96 "SELECT role, content FROM messages \
97 WHERE conversation_id = ? ORDER BY timestamp ASC",
98 )
99 .bind(conversation_id)
100 .fetch_all(&self.pool)
101 .await
102 .map_err(|e| MemoryError::sqlite("query failed", e))?;
103
104 Ok(rows)
105 }
106
107 pub async fn close_conversation(
109 &self,
110 conversation_id: &str,
111 summary: &str,
112 ) -> Result<(), MemoryError> {
113 sqlx::query(
114 "UPDATE conversations SET status = 'closed', summary = ?, updated_at = datetime('now') WHERE id = ?",
115 )
116 .bind(summary)
117 .bind(conversation_id)
118 .execute(&self.pool)
119 .await
120 .map_err(|e| MemoryError::sqlite("update failed", e))?;
121
122 Ok(())
123 }
124
125 pub async fn close_current_conversation(
127 &self,
128 channel: &str,
129 sender_id: &str,
130 project: &str,
131 ) -> Result<bool, MemoryError> {
132 let result = sqlx::query(
133 "UPDATE conversations SET status = 'closed', updated_at = datetime('now') \
134 WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active'",
135 )
136 .bind(channel)
137 .bind(sender_id)
138 .bind(project)
139 .execute(&self.pool)
140 .await
141 .map_err(|e| MemoryError::sqlite("update failed", e))?;
142
143 Ok(result.rows_affected() > 0)
144 }
145
146 pub async fn get_recent_summaries(
148 &self,
149 channel: &str,
150 sender_id: &str,
151 limit: i64,
152 ) -> Result<Vec<(String, String)>, MemoryError> {
153 let rows: Vec<(String, String)> = sqlx::query_as(
154 "SELECT summary, updated_at FROM conversations \
155 WHERE channel = ? AND sender_id = ? AND status = 'closed' AND summary IS NOT NULL \
156 ORDER BY updated_at DESC LIMIT ?",
157 )
158 .bind(channel)
159 .bind(sender_id)
160 .bind(limit)
161 .fetch_all(&self.pool)
162 .await
163 .map_err(|e| MemoryError::sqlite("query failed", e))?;
164
165 Ok(rows)
166 }
167
168 pub async fn get_all_recent_summaries(
170 &self,
171 limit: i64,
172 ) -> Result<Vec<(String, String)>, MemoryError> {
173 let rows: Vec<(String, String)> = sqlx::query_as(
174 "SELECT summary, updated_at FROM conversations \
175 WHERE status = 'closed' AND summary IS NOT NULL \
176 ORDER BY updated_at DESC LIMIT ?",
177 )
178 .bind(limit)
179 .fetch_all(&self.pool)
180 .await
181 .map_err(|e| MemoryError::sqlite("query failed", e))?;
182
183 Ok(rows)
184 }
185
186 pub async fn get_history(
191 &self,
192 channel: &str,
193 sender_id: &str,
194 limit: i64,
195 ) -> Result<Vec<crate::types::HistoryRow>, MemoryError> {
196 let rows: Vec<(String, String, String)> = sqlx::query_as(
197 "SELECT id, COALESCE(summary, '(no summary)'), updated_at FROM conversations \
198 WHERE channel = ? AND sender_id = ? AND status = 'closed' \
199 ORDER BY updated_at DESC LIMIT ?",
200 )
201 .bind(channel)
202 .bind(sender_id)
203 .bind(limit)
204 .fetch_all(&self.pool)
205 .await
206 .map_err(|e| MemoryError::sqlite("query failed", e))?;
207
208 let mut out = Vec::with_capacity(rows.len());
209 for (conversation_id, summary, updated_at) in rows {
210 out.push(crate::types::HistoryRow {
211 conversation_id,
212 summary,
213 updated_at: crate::types::parse_sqlite_timestamp(&updated_at)?,
214 });
215 }
216 Ok(out)
217 }
218
219 pub async fn get_memory_stats(
233 &self,
234 sender_id: &str,
235 ) -> Result<(i64, i64, i64, i64), MemoryError> {
236 let (conv_count,): (i64,) =
237 sqlx::query_as("SELECT COUNT(*) FROM conversations WHERE sender_id = ?")
238 .bind(sender_id)
239 .fetch_one(&self.pool)
240 .await
241 .map_err(|e| MemoryError::sqlite("query failed", e))?;
242
243 let (msg_count,): (i64,) = sqlx::query_as(
244 "SELECT COUNT(*) FROM messages m \
245 JOIN conversations c ON m.conversation_id = c.id \
246 WHERE c.sender_id = ?",
247 )
248 .bind(sender_id)
249 .fetch_one(&self.pool)
250 .await
251 .map_err(|e| MemoryError::sqlite("query failed", e))?;
252
253 let obs_count = self.count_observations(sender_id).await?;
254
255 let (fact_count,): (i64,) = sqlx::query_as(
256 "SELECT COUNT(*) FROM facts \
257 WHERE sender_id = ? AND deleted_at IS NULL",
258 )
259 .bind(sender_id)
260 .fetch_one(&self.pool)
261 .await
262 .map_err(|e| MemoryError::sqlite("query failed", e))?;
263
264 Ok((conv_count, msg_count, obs_count, fact_count))
265 }
266}