1use crate::error::MemoryError;
5use crate::store::SqliteStore;
6use crate::types::ConversationId;
7use zeph_db::ActiveDialect;
8#[allow(unused_imports)]
9use zeph_db::sql;
10
11pub struct AcpSessionEvent {
12 pub event_type: String,
13 pub payload: String,
14 pub created_at: String,
15}
16
17pub struct AcpSessionInfo {
18 pub id: String,
19 pub title: Option<String>,
20 pub created_at: String,
21 pub updated_at: String,
22 pub message_count: i64,
23}
24
25impl SqliteStore {
26 pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
32 let sql = format!(
33 "{} INTO acp_sessions (id) VALUES (?){}",
34 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
35 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
36 );
37 zeph_db::query(&sql)
38 .bind(session_id)
39 .execute(&self.pool)
40 .await?;
41 Ok(())
42 }
43
44 pub async fn save_acp_event(
50 &self,
51 session_id: &str,
52 event_type: &str,
53 payload: &str,
54 ) -> Result<(), MemoryError> {
55 zeph_db::query(sql!(
56 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
57 ))
58 .bind(session_id)
59 .bind(event_type)
60 .bind(payload)
61 .execute(&self.pool)
62 .await?;
63 Ok(())
64 }
65
66 pub async fn load_acp_events(
72 &self,
73 session_id: &str,
74 ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
75 let rows = zeph_db::query_as::<_, (String, String, String)>(
76 sql!("SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id"),
77 )
78 .bind(session_id)
79 .fetch_all(&self.pool)
80 .await?;
81
82 Ok(rows
83 .into_iter()
84 .map(|(event_type, payload, created_at)| AcpSessionEvent {
85 event_type,
86 payload,
87 created_at,
88 })
89 .collect())
90 }
91
92 pub async fn delete_acp_session_checked(&self, session_id: &str) -> Result<bool, MemoryError> {
101 let result = zeph_db::query(sql!("DELETE FROM acp_sessions WHERE id = ?"))
102 .bind(session_id)
103 .execute(&self.pool)
104 .await?;
105 Ok(result.rows_affected() > 0)
106 }
107
108 pub async fn list_acp_sessions(
117 &self,
118 limit: usize,
119 ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
120 #[allow(clippy::cast_possible_wrap)]
122 let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
123 let rows = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
124 "SELECT s.id, s.title, s.created_at, s.updated_at, \
125 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
126 FROM acp_sessions s \
127 ORDER BY s.updated_at DESC \
128 LIMIT ?",
129 )
130 .bind(sql_limit)
131 .fetch_all(&self.pool)
132 .await?;
133
134 Ok(rows
135 .into_iter()
136 .map(
137 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
138 id,
139 title,
140 created_at,
141 updated_at,
142 message_count,
143 },
144 )
145 .collect())
146 }
147
148 pub async fn get_acp_session_info(
156 &self,
157 session_id: &str,
158 ) -> Result<Option<AcpSessionInfo>, MemoryError> {
159 let row = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
160 "SELECT s.id, s.title, s.created_at, s.updated_at, \
161 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
162 FROM acp_sessions s \
163 WHERE s.id = ?",
164 )
165 .bind(session_id)
166 .fetch_optional(&self.pool)
167 .await?;
168
169 Ok(row.map(
170 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
171 id,
172 title,
173 created_at,
174 updated_at,
175 message_count,
176 },
177 ))
178 }
179
180 pub async fn import_acp_events(
189 &self,
190 session_id: &str,
191 events: &[(&str, &str)],
192 ) -> Result<(), MemoryError> {
193 let mut tx = self.pool.begin().await?;
194 for (event_type, payload) in events {
195 zeph_db::query(sql!(
196 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
197 ))
198 .bind(session_id)
199 .bind(event_type)
200 .bind(payload)
201 .execute(&mut *tx)
202 .await?;
203 }
204 tx.commit().await?;
205 Ok(())
206 }
207
208 pub async fn update_session_title(
214 &self,
215 session_id: &str,
216 title: &str,
217 ) -> Result<(), MemoryError> {
218 zeph_db::query(sql!("UPDATE acp_sessions SET title = ? WHERE id = ?"))
219 .bind(title)
220 .bind(session_id)
221 .execute(&self.pool)
222 .await?;
223 Ok(())
224 }
225
226 pub async fn update_session_title_checked(
235 &self,
236 session_id: &str,
237 title: &str,
238 ) -> Result<bool, MemoryError> {
239 let result = zeph_db::query(sql!("UPDATE acp_sessions SET title = ? WHERE id = ?"))
240 .bind(title)
241 .bind(session_id)
242 .execute(&self.pool)
243 .await?;
244 Ok(result.rows_affected() > 0)
245 }
246
247 pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
253 let count: i64 =
254 zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM acp_sessions WHERE id = ?"))
255 .bind(session_id)
256 .fetch_one(&self.pool)
257 .await?;
258 Ok(count > 0)
259 }
260
261 pub async fn create_acp_session_with_conversation(
267 &self,
268 session_id: &str,
269 conversation_id: ConversationId,
270 ) -> Result<(), MemoryError> {
271 let sql = format!(
272 "{} INTO acp_sessions (id, conversation_id) VALUES (?, ?){}",
273 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
274 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
275 );
276 zeph_db::query(&sql)
277 .bind(session_id)
278 .bind(conversation_id)
279 .execute(&self.pool)
280 .await?;
281 Ok(())
282 }
283
284 pub async fn get_acp_session_conversation_id(
292 &self,
293 session_id: &str,
294 ) -> Result<Option<ConversationId>, MemoryError> {
295 let row: Option<(Option<ConversationId>,)> = zeph_db::query_as(sql!(
296 "SELECT conversation_id FROM acp_sessions WHERE id = ?"
297 ))
298 .bind(session_id)
299 .fetch_optional(&self.pool)
300 .await?;
301 Ok(row.and_then(|(cid,)| cid))
302 }
303
304 pub async fn set_acp_session_conversation_id(
310 &self,
311 session_id: &str,
312 conversation_id: ConversationId,
313 ) -> Result<(), MemoryError> {
314 zeph_db::query(sql!(
315 "UPDATE acp_sessions SET conversation_id = ? WHERE id = ?"
316 ))
317 .bind(conversation_id)
318 .bind(session_id)
319 .execute(&self.pool)
320 .await?;
321 Ok(())
322 }
323
324 pub async fn copy_conversation(
337 &self,
338 source: ConversationId,
339 target: ConversationId,
340 ) -> Result<(), MemoryError> {
341 let mut tx = self.pool.begin().await?;
342
343 zeph_db::query(sql!(
347 "INSERT INTO messages \
348 (conversation_id, role, content, parts, visibility, compacted_at, deleted_at) \
349 SELECT ?, role, content, parts, visibility, compacted_at, deleted_at \
350 FROM messages WHERE conversation_id = ? ORDER BY id"
351 ))
352 .bind(target)
353 .bind(source)
354 .execute(&mut *tx)
355 .await?;
356
357 tx.commit().await?;
362 Ok(())
363 }
364}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369
370 async fn make_store() -> SqliteStore {
371 SqliteStore::new(":memory:")
372 .await
373 .expect("SqliteStore::new")
374 }
375
376 #[tokio::test]
377 async fn create_and_exists() {
378 let store = make_store().await;
379 store.create_acp_session("sess-1").await.unwrap();
380 assert!(store.acp_session_exists("sess-1").await.unwrap());
381 assert!(!store.acp_session_exists("sess-2").await.unwrap());
382 }
383
384 #[tokio::test]
385 async fn save_and_load_events() {
386 let store = make_store().await;
387 store.create_acp_session("sess-1").await.unwrap();
388 store
389 .save_acp_event("sess-1", "user_message", "hello")
390 .await
391 .unwrap();
392 store
393 .save_acp_event("sess-1", "agent_message", "world")
394 .await
395 .unwrap();
396
397 let events = store.load_acp_events("sess-1").await.unwrap();
398 assert_eq!(events.len(), 2);
399 assert_eq!(events[0].event_type, "user_message");
400 assert_eq!(events[0].payload, "hello");
401 assert_eq!(events[1].event_type, "agent_message");
402 assert_eq!(events[1].payload, "world");
403 }
404
405 #[tokio::test]
406 async fn delete_cascades_events() {
407 let store = make_store().await;
408 store.create_acp_session("sess-1").await.unwrap();
409 store
410 .save_acp_event("sess-1", "user_message", "hello")
411 .await
412 .unwrap();
413 store.delete_acp_session_checked("sess-1").await.unwrap();
414
415 assert!(!store.acp_session_exists("sess-1").await.unwrap());
416 let events = store.load_acp_events("sess-1").await.unwrap();
417 assert!(events.is_empty());
418 }
419
420 #[tokio::test]
421 async fn load_events_empty_for_unknown() {
422 let store = make_store().await;
423 let events = store.load_acp_events("no-such").await.unwrap();
424 assert!(events.is_empty());
425 }
426
427 #[tokio::test]
428 async fn list_sessions_includes_title_and_message_count() {
429 let store = make_store().await;
430 store.create_acp_session("sess-b").await.unwrap();
431
432 tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
435
436 store.create_acp_session("sess-a").await.unwrap();
437 store.save_acp_event("sess-a", "user", "hi").await.unwrap();
438 store
439 .save_acp_event("sess-a", "agent", "hello")
440 .await
441 .unwrap();
442 store
443 .update_session_title("sess-a", "My Chat")
444 .await
445 .unwrap();
446
447 let sessions = store.list_acp_sessions(100).await.unwrap();
448 assert_eq!(sessions[0].id, "sess-a");
450 assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
451 assert_eq!(sessions[0].message_count, 2);
452
453 let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
455 assert!(b.title.is_none());
456 assert_eq!(b.message_count, 0);
457 }
458
459 #[tokio::test]
460 async fn list_sessions_respects_limit() {
461 let store = make_store().await;
462 for i in 0..5u8 {
463 store
464 .create_acp_session(&format!("sess-{i}"))
465 .await
466 .unwrap();
467 }
468 let sessions = store.list_acp_sessions(3).await.unwrap();
469 assert_eq!(sessions.len(), 3);
470 }
471
472 #[tokio::test]
473 async fn list_sessions_limit_one_boundary() {
474 let store = make_store().await;
475 for i in 0..3u8 {
476 store
477 .create_acp_session(&format!("sess-{i}"))
478 .await
479 .unwrap();
480 }
481 let sessions = store.list_acp_sessions(1).await.unwrap();
482 assert_eq!(sessions.len(), 1);
483 }
484
485 #[tokio::test]
486 async fn list_sessions_unlimited_when_zero() {
487 let store = make_store().await;
488 for i in 0..5u8 {
489 store
490 .create_acp_session(&format!("sess-{i}"))
491 .await
492 .unwrap();
493 }
494 let sessions = store.list_acp_sessions(0).await.unwrap();
495 assert_eq!(sessions.len(), 5);
496 }
497
498 #[tokio::test]
499 async fn get_acp_session_info_returns_none_for_missing() {
500 let store = make_store().await;
501 let info = store.get_acp_session_info("no-such").await.unwrap();
502 assert!(info.is_none());
503 }
504
505 #[tokio::test]
506 async fn get_acp_session_info_returns_data() {
507 let store = make_store().await;
508 store.create_acp_session("sess-x").await.unwrap();
509 store
510 .save_acp_event("sess-x", "user", "hello")
511 .await
512 .unwrap();
513 store.update_session_title("sess-x", "Test").await.unwrap();
514
515 let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
516 assert_eq!(info.id, "sess-x");
517 assert_eq!(info.title.as_deref(), Some("Test"));
518 assert_eq!(info.message_count, 1);
519 }
520
521 #[tokio::test]
522 async fn updated_at_trigger_fires_on_event_insert() {
523 let store = make_store().await;
524 store.create_acp_session("sess-t").await.unwrap();
525
526 let before = store
527 .get_acp_session_info("sess-t")
528 .await
529 .unwrap()
530 .unwrap()
531 .updated_at
532 .clone();
533
534 tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
536
537 store
538 .save_acp_event("sess-t", "user", "ping")
539 .await
540 .unwrap();
541
542 let after = store
543 .get_acp_session_info("sess-t")
544 .await
545 .unwrap()
546 .unwrap()
547 .updated_at;
548
549 assert!(
550 after > before,
551 "updated_at should increase after event insert: before={before} after={after}"
552 );
553 }
554
555 #[tokio::test]
556 async fn create_session_with_conversation_and_retrieve() {
557 let store = make_store().await;
558 let cid = store.create_conversation().await.unwrap();
559 store
560 .create_acp_session_with_conversation("sess-1", cid)
561 .await
562 .unwrap();
563 let retrieved = store
564 .get_acp_session_conversation_id("sess-1")
565 .await
566 .unwrap();
567 assert_eq!(retrieved, Some(cid));
568 }
569
570 #[tokio::test]
571 async fn get_conversation_id_returns_none_for_legacy_session() {
572 let store = make_store().await;
573 store.create_acp_session("legacy").await.unwrap();
574 let cid = store
575 .get_acp_session_conversation_id("legacy")
576 .await
577 .unwrap();
578 assert!(cid.is_none());
579 }
580
581 #[tokio::test]
582 async fn get_conversation_id_returns_none_for_missing_session() {
583 let store = make_store().await;
584 let cid = store
585 .get_acp_session_conversation_id("no-such")
586 .await
587 .unwrap();
588 assert!(cid.is_none());
589 }
590
591 #[tokio::test]
592 async fn set_conversation_id_updates_existing_session() {
593 let store = make_store().await;
594 store.create_acp_session("sess-2").await.unwrap();
595 let cid = store.create_conversation().await.unwrap();
596 store
597 .set_acp_session_conversation_id("sess-2", cid)
598 .await
599 .unwrap();
600 let retrieved = store
601 .get_acp_session_conversation_id("sess-2")
602 .await
603 .unwrap();
604 assert_eq!(retrieved, Some(cid));
605 }
606
607 #[tokio::test]
608 async fn copy_conversation_copies_messages_in_order() {
609 use zeph_llm::provider::Role;
610 let store = make_store().await;
611 let src = store.create_conversation().await.unwrap();
612 store.save_message(src, "user", "hello").await.unwrap();
613 store.save_message(src, "assistant", "world").await.unwrap();
614
615 let dst = store.create_conversation().await.unwrap();
616 store.copy_conversation(src, dst).await.unwrap();
617
618 let msgs = store.load_history(dst, 100).await.unwrap();
619 assert_eq!(msgs.len(), 2);
620 assert_eq!(msgs[0].role, Role::User);
621 assert_eq!(msgs[0].content, "hello");
622 assert_eq!(msgs[1].role, Role::Assistant);
623 assert_eq!(msgs[1].content, "world");
624 }
625
626 #[tokio::test]
627 async fn copy_conversation_empty_source_is_noop() {
628 let store = make_store().await;
629 let src = store.create_conversation().await.unwrap();
630 let dst = store.create_conversation().await.unwrap();
631 store.copy_conversation(src, dst).await.unwrap();
632 let msgs = store.load_history(dst, 100).await.unwrap();
633 assert!(msgs.is_empty());
634 }
635
636 #[tokio::test]
637 async fn copy_conversation_does_not_copy_summaries() {
638 let store = make_store().await;
641 let src = store.create_conversation().await.unwrap();
642 store.save_message(src, "user", "hello").await.unwrap();
643 zeph_db::query(
645 sql!("INSERT INTO summaries (conversation_id, content, first_message_id, last_message_id, token_estimate) \
646 VALUES (?, 'summary text', 1, 1, 10)"),
647 )
648 .bind(src)
649 .execute(&store.pool)
650 .await
651 .unwrap();
652
653 let dst = store.create_conversation().await.unwrap();
654 store.copy_conversation(src, dst).await.unwrap();
655
656 let count: i64 = zeph_db::query_scalar(sql!(
657 "SELECT COUNT(*) FROM summaries WHERE conversation_id = ?"
658 ))
659 .bind(dst)
660 .fetch_one(&store.pool)
661 .await
662 .unwrap();
663 assert_eq!(
664 count, 0,
665 "summaries must not be copied to forked conversation"
666 );
667 }
668
669 #[tokio::test]
670 async fn concurrent_sessions_get_distinct_conversation_ids() {
671 let store = make_store().await;
672 let cid1 = store.create_conversation().await.unwrap();
673 let cid2 = store.create_conversation().await.unwrap();
674 store
675 .create_acp_session_with_conversation("sess-a", cid1)
676 .await
677 .unwrap();
678 store
679 .create_acp_session_with_conversation("sess-b", cid2)
680 .await
681 .unwrap();
682
683 let retrieved1 = store
684 .get_acp_session_conversation_id("sess-a")
685 .await
686 .unwrap();
687 let retrieved2 = store
688 .get_acp_session_conversation_id("sess-b")
689 .await
690 .unwrap();
691
692 assert!(retrieved1.is_some());
693 assert!(retrieved2.is_some());
694 assert_ne!(
695 retrieved1, retrieved2,
696 "concurrent sessions must get distinct conversation_ids"
697 );
698 }
699}