1use crate::error::MemoryError;
5use crate::sqlite::SqliteStore;
6use crate::types::ConversationId;
7
8pub struct AcpSessionEvent {
9 pub event_type: String,
10 pub payload: String,
11 pub created_at: String,
12}
13
14pub struct AcpSessionInfo {
15 pub id: String,
16 pub title: Option<String>,
17 pub created_at: String,
18 pub updated_at: String,
19 pub message_count: i64,
20}
21
22impl SqliteStore {
23 pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
29 sqlx::query("INSERT OR IGNORE INTO acp_sessions (id) VALUES (?)")
30 .bind(session_id)
31 .execute(&self.pool)
32 .await?;
33 Ok(())
34 }
35
36 pub async fn save_acp_event(
42 &self,
43 session_id: &str,
44 event_type: &str,
45 payload: &str,
46 ) -> Result<(), MemoryError> {
47 sqlx::query(
48 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
49 )
50 .bind(session_id)
51 .bind(event_type)
52 .bind(payload)
53 .execute(&self.pool)
54 .await?;
55 Ok(())
56 }
57
58 pub async fn load_acp_events(
64 &self,
65 session_id: &str,
66 ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
67 let rows = sqlx::query_as::<_, (String, String, String)>(
68 "SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id",
69 )
70 .bind(session_id)
71 .fetch_all(&self.pool)
72 .await?;
73
74 Ok(rows
75 .into_iter()
76 .map(|(event_type, payload, created_at)| AcpSessionEvent {
77 event_type,
78 payload,
79 created_at,
80 })
81 .collect())
82 }
83
84 pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
90 sqlx::query("DELETE FROM acp_sessions WHERE id = ?")
91 .bind(session_id)
92 .execute(&self.pool)
93 .await?;
94 Ok(())
95 }
96
97 pub async fn list_acp_sessions(
106 &self,
107 limit: usize,
108 ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
109 #[allow(clippy::cast_possible_wrap)]
111 let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
112 let rows = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
113 "SELECT s.id, s.title, s.created_at, s.updated_at, \
114 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
115 FROM acp_sessions s \
116 ORDER BY s.updated_at DESC \
117 LIMIT ?",
118 )
119 .bind(sql_limit)
120 .fetch_all(&self.pool)
121 .await?;
122
123 Ok(rows
124 .into_iter()
125 .map(
126 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
127 id,
128 title,
129 created_at,
130 updated_at,
131 message_count,
132 },
133 )
134 .collect())
135 }
136
137 pub async fn get_acp_session_info(
145 &self,
146 session_id: &str,
147 ) -> Result<Option<AcpSessionInfo>, MemoryError> {
148 let row = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
149 "SELECT s.id, s.title, s.created_at, s.updated_at, \
150 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
151 FROM acp_sessions s \
152 WHERE s.id = ?",
153 )
154 .bind(session_id)
155 .fetch_optional(&self.pool)
156 .await?;
157
158 Ok(row.map(
159 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
160 id,
161 title,
162 created_at,
163 updated_at,
164 message_count,
165 },
166 ))
167 }
168
169 pub async fn import_acp_events(
178 &self,
179 session_id: &str,
180 events: &[(&str, &str)],
181 ) -> Result<(), MemoryError> {
182 let mut tx = self.pool.begin().await?;
183 for (event_type, payload) in events {
184 sqlx::query(
185 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
186 )
187 .bind(session_id)
188 .bind(event_type)
189 .bind(payload)
190 .execute(&mut *tx)
191 .await?;
192 }
193 tx.commit().await?;
194 Ok(())
195 }
196
197 pub async fn update_session_title(
203 &self,
204 session_id: &str,
205 title: &str,
206 ) -> Result<(), MemoryError> {
207 sqlx::query("UPDATE acp_sessions SET title = ? WHERE id = ?")
208 .bind(title)
209 .bind(session_id)
210 .execute(&self.pool)
211 .await?;
212 Ok(())
213 }
214
215 pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
221 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM acp_sessions WHERE id = ?")
222 .bind(session_id)
223 .fetch_one(&self.pool)
224 .await?;
225 Ok(count > 0)
226 }
227
228 pub async fn create_acp_session_with_conversation(
234 &self,
235 session_id: &str,
236 conversation_id: ConversationId,
237 ) -> Result<(), MemoryError> {
238 sqlx::query("INSERT OR IGNORE INTO acp_sessions (id, conversation_id) VALUES (?, ?)")
239 .bind(session_id)
240 .bind(conversation_id)
241 .execute(&self.pool)
242 .await?;
243 Ok(())
244 }
245
246 pub async fn get_acp_session_conversation_id(
254 &self,
255 session_id: &str,
256 ) -> Result<Option<ConversationId>, MemoryError> {
257 let row: Option<(Option<ConversationId>,)> =
258 sqlx::query_as("SELECT conversation_id FROM acp_sessions WHERE id = ?")
259 .bind(session_id)
260 .fetch_optional(&self.pool)
261 .await?;
262 Ok(row.and_then(|(cid,)| cid))
263 }
264
265 pub async fn set_acp_session_conversation_id(
271 &self,
272 session_id: &str,
273 conversation_id: ConversationId,
274 ) -> Result<(), MemoryError> {
275 sqlx::query("UPDATE acp_sessions SET conversation_id = ? WHERE id = ?")
276 .bind(conversation_id)
277 .bind(session_id)
278 .execute(&self.pool)
279 .await?;
280 Ok(())
281 }
282
283 pub async fn copy_conversation(
296 &self,
297 source: ConversationId,
298 target: ConversationId,
299 ) -> Result<(), MemoryError> {
300 let mut tx = self.pool.begin().await?;
301
302 sqlx::query(
306 "INSERT INTO messages \
307 (conversation_id, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at) \
308 SELECT ?, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at \
309 FROM messages WHERE conversation_id = ? ORDER BY id",
310 )
311 .bind(target)
312 .bind(source)
313 .execute(&mut *tx)
314 .await?;
315
316 tx.commit().await?;
321 Ok(())
322 }
323}
324
325#[cfg(test)]
326mod tests {
327 use super::*;
328
329 async fn make_store() -> SqliteStore {
330 SqliteStore::new(":memory:")
331 .await
332 .expect("SqliteStore::new")
333 }
334
335 #[tokio::test]
336 async fn create_and_exists() {
337 let store = make_store().await;
338 store.create_acp_session("sess-1").await.unwrap();
339 assert!(store.acp_session_exists("sess-1").await.unwrap());
340 assert!(!store.acp_session_exists("sess-2").await.unwrap());
341 }
342
343 #[tokio::test]
344 async fn save_and_load_events() {
345 let store = make_store().await;
346 store.create_acp_session("sess-1").await.unwrap();
347 store
348 .save_acp_event("sess-1", "user_message", "hello")
349 .await
350 .unwrap();
351 store
352 .save_acp_event("sess-1", "agent_message", "world")
353 .await
354 .unwrap();
355
356 let events = store.load_acp_events("sess-1").await.unwrap();
357 assert_eq!(events.len(), 2);
358 assert_eq!(events[0].event_type, "user_message");
359 assert_eq!(events[0].payload, "hello");
360 assert_eq!(events[1].event_type, "agent_message");
361 assert_eq!(events[1].payload, "world");
362 }
363
364 #[tokio::test]
365 async fn delete_cascades_events() {
366 let store = make_store().await;
367 store.create_acp_session("sess-1").await.unwrap();
368 store
369 .save_acp_event("sess-1", "user_message", "hello")
370 .await
371 .unwrap();
372 store.delete_acp_session("sess-1").await.unwrap();
373
374 assert!(!store.acp_session_exists("sess-1").await.unwrap());
375 let events = store.load_acp_events("sess-1").await.unwrap();
376 assert!(events.is_empty());
377 }
378
379 #[tokio::test]
380 async fn load_events_empty_for_unknown() {
381 let store = make_store().await;
382 let events = store.load_acp_events("no-such").await.unwrap();
383 assert!(events.is_empty());
384 }
385
386 #[tokio::test]
387 async fn list_sessions_includes_title_and_message_count() {
388 let store = make_store().await;
389 store.create_acp_session("sess-b").await.unwrap();
390
391 tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
394
395 store.create_acp_session("sess-a").await.unwrap();
396 store.save_acp_event("sess-a", "user", "hi").await.unwrap();
397 store
398 .save_acp_event("sess-a", "agent", "hello")
399 .await
400 .unwrap();
401 store
402 .update_session_title("sess-a", "My Chat")
403 .await
404 .unwrap();
405
406 let sessions = store.list_acp_sessions(100).await.unwrap();
407 assert_eq!(sessions[0].id, "sess-a");
409 assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
410 assert_eq!(sessions[0].message_count, 2);
411
412 let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
414 assert!(b.title.is_none());
415 assert_eq!(b.message_count, 0);
416 }
417
418 #[tokio::test]
419 async fn list_sessions_respects_limit() {
420 let store = make_store().await;
421 for i in 0..5u8 {
422 store
423 .create_acp_session(&format!("sess-{i}"))
424 .await
425 .unwrap();
426 }
427 let sessions = store.list_acp_sessions(3).await.unwrap();
428 assert_eq!(sessions.len(), 3);
429 }
430
431 #[tokio::test]
432 async fn list_sessions_limit_one_boundary() {
433 let store = make_store().await;
434 for i in 0..3u8 {
435 store
436 .create_acp_session(&format!("sess-{i}"))
437 .await
438 .unwrap();
439 }
440 let sessions = store.list_acp_sessions(1).await.unwrap();
441 assert_eq!(sessions.len(), 1);
442 }
443
444 #[tokio::test]
445 async fn list_sessions_unlimited_when_zero() {
446 let store = make_store().await;
447 for i in 0..5u8 {
448 store
449 .create_acp_session(&format!("sess-{i}"))
450 .await
451 .unwrap();
452 }
453 let sessions = store.list_acp_sessions(0).await.unwrap();
454 assert_eq!(sessions.len(), 5);
455 }
456
457 #[tokio::test]
458 async fn get_acp_session_info_returns_none_for_missing() {
459 let store = make_store().await;
460 let info = store.get_acp_session_info("no-such").await.unwrap();
461 assert!(info.is_none());
462 }
463
464 #[tokio::test]
465 async fn get_acp_session_info_returns_data() {
466 let store = make_store().await;
467 store.create_acp_session("sess-x").await.unwrap();
468 store
469 .save_acp_event("sess-x", "user", "hello")
470 .await
471 .unwrap();
472 store.update_session_title("sess-x", "Test").await.unwrap();
473
474 let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
475 assert_eq!(info.id, "sess-x");
476 assert_eq!(info.title.as_deref(), Some("Test"));
477 assert_eq!(info.message_count, 1);
478 }
479
480 #[tokio::test]
481 async fn updated_at_trigger_fires_on_event_insert() {
482 let store = make_store().await;
483 store.create_acp_session("sess-t").await.unwrap();
484
485 let before = store
486 .get_acp_session_info("sess-t")
487 .await
488 .unwrap()
489 .unwrap()
490 .updated_at
491 .clone();
492
493 tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
495
496 store
497 .save_acp_event("sess-t", "user", "ping")
498 .await
499 .unwrap();
500
501 let after = store
502 .get_acp_session_info("sess-t")
503 .await
504 .unwrap()
505 .unwrap()
506 .updated_at;
507
508 assert!(
509 after > before,
510 "updated_at should increase after event insert: before={before} after={after}"
511 );
512 }
513
514 #[tokio::test]
515 async fn create_session_with_conversation_and_retrieve() {
516 let store = make_store().await;
517 let cid = store.create_conversation().await.unwrap();
518 store
519 .create_acp_session_with_conversation("sess-1", cid)
520 .await
521 .unwrap();
522 let retrieved = store
523 .get_acp_session_conversation_id("sess-1")
524 .await
525 .unwrap();
526 assert_eq!(retrieved, Some(cid));
527 }
528
529 #[tokio::test]
530 async fn get_conversation_id_returns_none_for_legacy_session() {
531 let store = make_store().await;
532 store.create_acp_session("legacy").await.unwrap();
533 let cid = store
534 .get_acp_session_conversation_id("legacy")
535 .await
536 .unwrap();
537 assert!(cid.is_none());
538 }
539
540 #[tokio::test]
541 async fn get_conversation_id_returns_none_for_missing_session() {
542 let store = make_store().await;
543 let cid = store
544 .get_acp_session_conversation_id("no-such")
545 .await
546 .unwrap();
547 assert!(cid.is_none());
548 }
549
550 #[tokio::test]
551 async fn set_conversation_id_updates_existing_session() {
552 let store = make_store().await;
553 store.create_acp_session("sess-2").await.unwrap();
554 let cid = store.create_conversation().await.unwrap();
555 store
556 .set_acp_session_conversation_id("sess-2", cid)
557 .await
558 .unwrap();
559 let retrieved = store
560 .get_acp_session_conversation_id("sess-2")
561 .await
562 .unwrap();
563 assert_eq!(retrieved, Some(cid));
564 }
565
566 #[tokio::test]
567 async fn copy_conversation_copies_messages_in_order() {
568 use zeph_llm::provider::Role;
569 let store = make_store().await;
570 let src = store.create_conversation().await.unwrap();
571 store.save_message(src, "user", "hello").await.unwrap();
572 store.save_message(src, "assistant", "world").await.unwrap();
573
574 let dst = store.create_conversation().await.unwrap();
575 store.copy_conversation(src, dst).await.unwrap();
576
577 let msgs = store.load_history(dst, 100).await.unwrap();
578 assert_eq!(msgs.len(), 2);
579 assert_eq!(msgs[0].role, Role::User);
580 assert_eq!(msgs[0].content, "hello");
581 assert_eq!(msgs[1].role, Role::Assistant);
582 assert_eq!(msgs[1].content, "world");
583 }
584
585 #[tokio::test]
586 async fn copy_conversation_empty_source_is_noop() {
587 let store = make_store().await;
588 let src = store.create_conversation().await.unwrap();
589 let dst = store.create_conversation().await.unwrap();
590 store.copy_conversation(src, dst).await.unwrap();
591 let msgs = store.load_history(dst, 100).await.unwrap();
592 assert!(msgs.is_empty());
593 }
594
595 #[tokio::test]
596 async fn copy_conversation_does_not_copy_summaries() {
597 let store = make_store().await;
600 let src = store.create_conversation().await.unwrap();
601 store.save_message(src, "user", "hello").await.unwrap();
602 sqlx::query(
604 "INSERT INTO summaries (conversation_id, content, first_message_id, last_message_id, token_estimate) \
605 VALUES (?, 'summary text', 1, 1, 10)",
606 )
607 .bind(src)
608 .execute(&store.pool)
609 .await
610 .unwrap();
611
612 let dst = store.create_conversation().await.unwrap();
613 store.copy_conversation(src, dst).await.unwrap();
614
615 let count: i64 =
616 sqlx::query_scalar("SELECT COUNT(*) FROM summaries WHERE conversation_id = ?")
617 .bind(dst)
618 .fetch_one(&store.pool)
619 .await
620 .unwrap();
621 assert_eq!(
622 count, 0,
623 "summaries must not be copied to forked conversation"
624 );
625 }
626
627 #[tokio::test]
628 async fn concurrent_sessions_get_distinct_conversation_ids() {
629 let store = make_store().await;
630 let cid1 = store.create_conversation().await.unwrap();
631 let cid2 = store.create_conversation().await.unwrap();
632 store
633 .create_acp_session_with_conversation("sess-a", cid1)
634 .await
635 .unwrap();
636 store
637 .create_acp_session_with_conversation("sess-b", cid2)
638 .await
639 .unwrap();
640
641 let retrieved1 = store
642 .get_acp_session_conversation_id("sess-a")
643 .await
644 .unwrap();
645 let retrieved2 = store
646 .get_acp_session_conversation_id("sess-b")
647 .await
648 .unwrap();
649
650 assert!(retrieved1.is_some());
651 assert!(retrieved2.is_some());
652 assert_ne!(
653 retrieved1, retrieved2,
654 "concurrent sessions must get distinct conversation_ids"
655 );
656 }
657}