1use crate::error::MemoryError;
5use crate::sqlite::SqliteStore;
6
/// A single persisted event belonging to an ACP session.
///
/// Carries the `event_type`, `payload` and `created_at` columns of one row
/// of the `acp_session_events` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionEvent {
    /// Label describing the kind of event (the tests in this file use values
    /// such as `"user_message"` and `"agent_message"`).
    pub event_type: String,
    /// Event payload text.
    pub payload: String,
    /// Creation timestamp as returned by the database. The exact text format
    /// is defined by the schema, which is not visible here — TODO confirm.
    pub created_at: String,
}
12
/// Summary information about one ACP session.
///
/// Combines the columns of an `acp_sessions` row with the number of events
/// recorded for that session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionInfo {
    /// Session identifier (the `id` column of `acp_sessions`).
    pub id: String,
    /// Session title, or `None` when the `title` column is NULL.
    pub title: Option<String>,
    /// Creation timestamp as returned by the database. The exact text format
    /// is defined by the schema, which is not visible here — TODO confirm.
    pub created_at: String,
    /// Last-update timestamp as returned by the database (same format caveat
    /// as `created_at`).
    pub updated_at: String,
    /// Number of rows in `acp_session_events` that reference this session.
    pub message_count: i64,
}
20
21impl SqliteStore {
22 pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
28 sqlx::query("INSERT OR IGNORE INTO acp_sessions (id) VALUES (?)")
29 .bind(session_id)
30 .execute(&self.pool)
31 .await?;
32 Ok(())
33 }
34
35 pub async fn save_acp_event(
41 &self,
42 session_id: &str,
43 event_type: &str,
44 payload: &str,
45 ) -> Result<(), MemoryError> {
46 sqlx::query(
47 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
48 )
49 .bind(session_id)
50 .bind(event_type)
51 .bind(payload)
52 .execute(&self.pool)
53 .await?;
54 Ok(())
55 }
56
57 pub async fn load_acp_events(
63 &self,
64 session_id: &str,
65 ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
66 let rows = sqlx::query_as::<_, (String, String, String)>(
67 "SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id",
68 )
69 .bind(session_id)
70 .fetch_all(&self.pool)
71 .await?;
72
73 Ok(rows
74 .into_iter()
75 .map(|(event_type, payload, created_at)| AcpSessionEvent {
76 event_type,
77 payload,
78 created_at,
79 })
80 .collect())
81 }
82
83 pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
89 sqlx::query("DELETE FROM acp_sessions WHERE id = ?")
90 .bind(session_id)
91 .execute(&self.pool)
92 .await?;
93 Ok(())
94 }
95
96 pub async fn list_acp_sessions(
105 &self,
106 limit: usize,
107 ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
108 #[allow(clippy::cast_possible_wrap)]
110 let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
111 let rows = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
112 "SELECT s.id, s.title, s.created_at, s.updated_at, \
113 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
114 FROM acp_sessions s \
115 ORDER BY s.updated_at DESC \
116 LIMIT ?",
117 )
118 .bind(sql_limit)
119 .fetch_all(&self.pool)
120 .await?;
121
122 Ok(rows
123 .into_iter()
124 .map(
125 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
126 id,
127 title,
128 created_at,
129 updated_at,
130 message_count,
131 },
132 )
133 .collect())
134 }
135
136 pub async fn get_acp_session_info(
144 &self,
145 session_id: &str,
146 ) -> Result<Option<AcpSessionInfo>, MemoryError> {
147 let row = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
148 "SELECT s.id, s.title, s.created_at, s.updated_at, \
149 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
150 FROM acp_sessions s \
151 WHERE s.id = ?",
152 )
153 .bind(session_id)
154 .fetch_optional(&self.pool)
155 .await?;
156
157 Ok(row.map(
158 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
159 id,
160 title,
161 created_at,
162 updated_at,
163 message_count,
164 },
165 ))
166 }
167
168 pub async fn import_acp_events(
177 &self,
178 session_id: &str,
179 events: &[(&str, &str)],
180 ) -> Result<(), MemoryError> {
181 let mut tx = self.pool.begin().await?;
182 for (event_type, payload) in events {
183 sqlx::query(
184 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
185 )
186 .bind(session_id)
187 .bind(event_type)
188 .bind(payload)
189 .execute(&mut *tx)
190 .await?;
191 }
192 tx.commit().await?;
193 Ok(())
194 }
195
196 pub async fn update_session_title(
202 &self,
203 session_id: &str,
204 title: &str,
205 ) -> Result<(), MemoryError> {
206 sqlx::query("UPDATE acp_sessions SET title = ? WHERE id = ?")
207 .bind(title)
208 .bind(session_id)
209 .execute(&self.pool)
210 .await?;
211 Ok(())
212 }
213
214 pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
220 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM acp_sessions WHERE id = ?")
221 .bind(session_id)
222 .fetch_one(&self.pool)
223 .await?;
224 Ok(count > 0)
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh in-memory store for a single test.
    async fn make_store() -> SqliteStore {
        SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new")
    }

    #[tokio::test]
    async fn create_and_exists() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        assert!(store.acp_session_exists("sess-1").await.unwrap());
        assert!(!store.acp_session_exists("sess-2").await.unwrap());
    }

    #[tokio::test]
    async fn save_and_load_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();
        store
            .save_acp_event("sess-1", "agent_message", "world")
            .await
            .unwrap();

        let events = store.load_acp_events("sess-1").await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_type, "user_message");
        assert_eq!(events[0].payload, "hello");
        assert_eq!(events[1].event_type, "agent_message");
        assert_eq!(events[1].payload, "world");
    }

    #[tokio::test]
    async fn import_events_preserves_order() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .import_acp_events("sess-1", &[("user_message", "hello"), ("agent_message", "world")])
            .await
            .unwrap();

        let events = store.load_acp_events("sess-1").await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_type, "user_message");
        assert_eq!(events[0].payload, "hello");
        assert_eq!(events[1].event_type, "agent_message");
        assert_eq!(events[1].payload, "world");
    }

    #[tokio::test]
    async fn delete_cascades_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();
        store.delete_acp_session("sess-1").await.unwrap();

        assert!(!store.acp_session_exists("sess-1").await.unwrap());
        let events = store.load_acp_events("sess-1").await.unwrap();
        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn load_events_empty_for_unknown() {
        let store = make_store().await;
        let events = store.load_acp_events("no-such").await.unwrap();
        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn list_sessions_includes_title_and_message_count() {
        let store = make_store().await;
        store.create_acp_session("sess-b").await.unwrap();

        // Sleep for more than a second so the two sessions get distinct
        // `updated_at` values (presumably second-resolution timestamps —
        // confirm against the schema); the ordering assertion depends on it.
        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;

        store.create_acp_session("sess-a").await.unwrap();
        store.save_acp_event("sess-a", "user", "hi").await.unwrap();
        store
            .save_acp_event("sess-a", "agent", "hello")
            .await
            .unwrap();
        store
            .update_session_title("sess-a", "My Chat")
            .await
            .unwrap();

        let sessions = store.list_acp_sessions(100).await.unwrap();
        // The most recently updated session is listed first.
        assert_eq!(sessions[0].id, "sess-a");
        assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
        assert_eq!(sessions[0].message_count, 2);

        // A session without title or events reports `None` and zero.
        let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
        assert!(b.title.is_none());
        assert_eq!(b.message_count, 0);
    }

    #[tokio::test]
    async fn list_sessions_respects_limit() {
        let store = make_store().await;
        for i in 0..5u8 {
            store
                .create_acp_session(&format!("sess-{i}"))
                .await
                .unwrap();
        }
        let sessions = store.list_acp_sessions(3).await.unwrap();
        assert_eq!(sessions.len(), 3);
    }

    #[tokio::test]
    async fn list_sessions_limit_one_boundary() {
        let store = make_store().await;
        for i in 0..3u8 {
            store
                .create_acp_session(&format!("sess-{i}"))
                .await
                .unwrap();
        }
        let sessions = store.list_acp_sessions(1).await.unwrap();
        assert_eq!(sessions.len(), 1);
    }

    #[tokio::test]
    async fn list_sessions_unlimited_when_zero() {
        let store = make_store().await;
        for i in 0..5u8 {
            store
                .create_acp_session(&format!("sess-{i}"))
                .await
                .unwrap();
        }
        let sessions = store.list_acp_sessions(0).await.unwrap();
        assert_eq!(sessions.len(), 5);
    }

    #[tokio::test]
    async fn get_acp_session_info_returns_none_for_missing() {
        let store = make_store().await;
        let info = store.get_acp_session_info("no-such").await.unwrap();
        assert!(info.is_none());
    }

    #[tokio::test]
    async fn get_acp_session_info_returns_data() {
        let store = make_store().await;
        store.create_acp_session("sess-x").await.unwrap();
        store
            .save_acp_event("sess-x", "user", "hello")
            .await
            .unwrap();
        store.update_session_title("sess-x", "Test").await.unwrap();

        let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
        assert_eq!(info.id, "sess-x");
        assert_eq!(info.title.as_deref(), Some("Test"));
        assert_eq!(info.message_count, 1);
    }

    #[tokio::test]
    async fn updated_at_trigger_fires_on_event_insert() {
        let store = make_store().await;
        store.create_acp_session("sess-t").await.unwrap();

        // The field is moved out of the temporary; no clone is needed.
        let before = store
            .get_acp_session_info("sess-t")
            .await
            .unwrap()
            .unwrap()
            .updated_at;

        // Sleep for more than a second so a refreshed `updated_at` compares
        // strictly greater (presumably second-resolution timestamps —
        // confirm against the schema).
        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;

        store
            .save_acp_event("sess-t", "user", "ping")
            .await
            .unwrap();

        let after = store
            .get_acp_session_info("sess-t")
            .await
            .unwrap()
            .unwrap()
            .updated_at;

        assert!(
            after > before,
            "updated_at should increase after event insert: before={before} after={after}"
        );
    }
}