1use crate::error::MemoryError;
5use crate::store::SqliteStore;
6use crate::types::ConversationId;
7use zeph_db::ActiveDialect;
8#[allow(unused_imports)]
9use zeph_db::sql;
10
/// One persisted event of an ACP session.
///
/// The fields correspond to the `event_type`, `payload` and `created_at` columns that
/// `SqliteStore::load_acp_events` selects from `acp_session_events`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionEvent {
    /// Event kind label, returned exactly as it was stored.
    pub event_type: String,
    /// Event body, returned exactly as it was stored.
    pub payload: String,
    /// Creation timestamp in the textual form the database returns.
    pub created_at: String,
}
16
/// Summary row describing one ACP session.
///
/// Produced by `SqliteStore::list_acp_sessions` and `SqliteStore::get_acp_session_info`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionInfo {
    /// Session identifier (the `id` column of `acp_sessions`).
    pub id: String,
    /// Optional title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Creation timestamp in the textual form the database returns.
    pub created_at: String,
    /// Last-update timestamp in the textual form the database returns.
    pub updated_at: String,
    /// Number of `acp_session_events` rows whose `session_id` matches this session.
    pub message_count: i64,
}
24
25impl SqliteStore {
26 pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
32 let sql = format!(
33 "{} INTO acp_sessions (id) VALUES (?){}",
34 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
35 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
36 );
37 zeph_db::query(&sql)
38 .bind(session_id)
39 .execute(&self.pool)
40 .await?;
41 Ok(())
42 }
43
44 pub async fn save_acp_event(
50 &self,
51 session_id: &str,
52 event_type: &str,
53 payload: &str,
54 ) -> Result<(), MemoryError> {
55 zeph_db::query(sql!(
56 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
57 ))
58 .bind(session_id)
59 .bind(event_type)
60 .bind(payload)
61 .execute(&self.pool)
62 .await?;
63 Ok(())
64 }
65
66 pub async fn load_acp_events(
72 &self,
73 session_id: &str,
74 ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
75 let rows = zeph_db::query_as::<_, (String, String, String)>(
76 sql!("SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id"),
77 )
78 .bind(session_id)
79 .fetch_all(&self.pool)
80 .await?;
81
82 Ok(rows
83 .into_iter()
84 .map(|(event_type, payload, created_at)| AcpSessionEvent {
85 event_type,
86 payload,
87 created_at,
88 })
89 .collect())
90 }
91
92 pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
98 zeph_db::query(sql!("DELETE FROM acp_sessions WHERE id = ?"))
99 .bind(session_id)
100 .execute(&self.pool)
101 .await?;
102 Ok(())
103 }
104
105 pub async fn list_acp_sessions(
114 &self,
115 limit: usize,
116 ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
117 #[allow(clippy::cast_possible_wrap)]
119 let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
120 let rows = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
121 "SELECT s.id, s.title, s.created_at, s.updated_at, \
122 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
123 FROM acp_sessions s \
124 ORDER BY s.updated_at DESC \
125 LIMIT ?",
126 )
127 .bind(sql_limit)
128 .fetch_all(&self.pool)
129 .await?;
130
131 Ok(rows
132 .into_iter()
133 .map(
134 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
135 id,
136 title,
137 created_at,
138 updated_at,
139 message_count,
140 },
141 )
142 .collect())
143 }
144
145 pub async fn get_acp_session_info(
153 &self,
154 session_id: &str,
155 ) -> Result<Option<AcpSessionInfo>, MemoryError> {
156 let row = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
157 "SELECT s.id, s.title, s.created_at, s.updated_at, \
158 (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
159 FROM acp_sessions s \
160 WHERE s.id = ?",
161 )
162 .bind(session_id)
163 .fetch_optional(&self.pool)
164 .await?;
165
166 Ok(row.map(
167 |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
168 id,
169 title,
170 created_at,
171 updated_at,
172 message_count,
173 },
174 ))
175 }
176
177 pub async fn import_acp_events(
186 &self,
187 session_id: &str,
188 events: &[(&str, &str)],
189 ) -> Result<(), MemoryError> {
190 let mut tx = self.pool.begin().await?;
191 for (event_type, payload) in events {
192 zeph_db::query(sql!(
193 "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
194 ))
195 .bind(session_id)
196 .bind(event_type)
197 .bind(payload)
198 .execute(&mut *tx)
199 .await?;
200 }
201 tx.commit().await?;
202 Ok(())
203 }
204
205 pub async fn update_session_title(
211 &self,
212 session_id: &str,
213 title: &str,
214 ) -> Result<(), MemoryError> {
215 zeph_db::query(sql!("UPDATE acp_sessions SET title = ? WHERE id = ?"))
216 .bind(title)
217 .bind(session_id)
218 .execute(&self.pool)
219 .await?;
220 Ok(())
221 }
222
223 pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
229 let count: i64 =
230 zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM acp_sessions WHERE id = ?"))
231 .bind(session_id)
232 .fetch_one(&self.pool)
233 .await?;
234 Ok(count > 0)
235 }
236
237 pub async fn create_acp_session_with_conversation(
243 &self,
244 session_id: &str,
245 conversation_id: ConversationId,
246 ) -> Result<(), MemoryError> {
247 let sql = format!(
248 "{} INTO acp_sessions (id, conversation_id) VALUES (?, ?){}",
249 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
250 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
251 );
252 zeph_db::query(&sql)
253 .bind(session_id)
254 .bind(conversation_id)
255 .execute(&self.pool)
256 .await?;
257 Ok(())
258 }
259
260 pub async fn get_acp_session_conversation_id(
268 &self,
269 session_id: &str,
270 ) -> Result<Option<ConversationId>, MemoryError> {
271 let row: Option<(Option<ConversationId>,)> = zeph_db::query_as(sql!(
272 "SELECT conversation_id FROM acp_sessions WHERE id = ?"
273 ))
274 .bind(session_id)
275 .fetch_optional(&self.pool)
276 .await?;
277 Ok(row.and_then(|(cid,)| cid))
278 }
279
280 pub async fn set_acp_session_conversation_id(
286 &self,
287 session_id: &str,
288 conversation_id: ConversationId,
289 ) -> Result<(), MemoryError> {
290 zeph_db::query(sql!(
291 "UPDATE acp_sessions SET conversation_id = ? WHERE id = ?"
292 ))
293 .bind(conversation_id)
294 .bind(session_id)
295 .execute(&self.pool)
296 .await?;
297 Ok(())
298 }
299
300 pub async fn copy_conversation(
313 &self,
314 source: ConversationId,
315 target: ConversationId,
316 ) -> Result<(), MemoryError> {
317 let mut tx = self.pool.begin().await?;
318
319 zeph_db::query(
323 sql!("INSERT INTO messages \
324 (conversation_id, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at) \
325 SELECT ?, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at \
326 FROM messages WHERE conversation_id = ? ORDER BY id"),
327 )
328 .bind(target)
329 .bind(source)
330 .execute(&mut *tx)
331 .await?;
332
333 tx.commit().await?;
338 Ok(())
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a fresh in-memory store for a single test.
    async fn make_store() -> SqliteStore {
        let opened = SqliteStore::new(":memory:").await;
        opened.expect("SqliteStore::new")
    }

    /// Create `count` sessions named `sess-0`, `sess-1`, ... in `store`.
    async fn seed_sessions(store: &SqliteStore, count: u8) {
        for i in 0..count {
            let id = format!("sess-{i}");
            store.create_acp_session(&id).await.unwrap();
        }
    }

    /// A created session is reported as existing; an unrelated id is not.
    #[tokio::test]
    async fn create_and_exists() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();

        let known = store.acp_session_exists("sess-1").await.unwrap();
        let unknown = store.acp_session_exists("sess-2").await.unwrap();
        assert!(known);
        assert!(!unknown);
    }

    /// Saved events come back in insertion order with their type and payload intact.
    #[tokio::test]
    async fn save_and_load_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();
        store
            .save_acp_event("sess-1", "agent_message", "world")
            .await
            .unwrap();

        let events = store.load_acp_events("sess-1").await.unwrap();
        assert_eq!(events.len(), 2);

        let (first, second) = (&events[0], &events[1]);
        assert_eq!(first.event_type, "user_message");
        assert_eq!(first.payload, "hello");
        assert_eq!(second.event_type, "agent_message");
        assert_eq!(second.payload, "world");
    }

    /// Deleting a session also leaves no events behind for it.
    #[tokio::test]
    async fn delete_cascades_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();

        store.delete_acp_session("sess-1").await.unwrap();

        let still_there = store.acp_session_exists("sess-1").await.unwrap();
        assert!(!still_there);
        let leftover = store.load_acp_events("sess-1").await.unwrap();
        assert!(leftover.is_empty());
    }

    /// Loading events for an id that was never created yields an empty list, not an error.
    #[tokio::test]
    async fn load_events_empty_for_unknown() {
        let store = make_store().await;
        let loaded = store.load_acp_events("no-such").await.unwrap();
        assert!(loaded.is_empty());
    }

    /// The listing is newest-first and carries title and event count per session.
    #[tokio::test]
    async fn list_sessions_includes_title_and_message_count() {
        let store = make_store().await;
        store.create_acp_session("sess-b").await.unwrap();

        // Sleep past a one-second boundary; presumably the stored timestamps have
        // second resolution, so this makes the `updated_at` ordering deterministic.
        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;

        store.create_acp_session("sess-a").await.unwrap();
        for (kind, body) in [("user", "hi"), ("agent", "hello")] {
            store.save_acp_event("sess-a", kind, body).await.unwrap();
        }
        store
            .update_session_title("sess-a", "My Chat")
            .await
            .unwrap();

        let sessions = store.list_acp_sessions(100).await.unwrap();

        let newest = &sessions[0];
        assert_eq!(newest.id, "sess-a");
        assert_eq!(newest.title.as_deref(), Some("My Chat"));
        assert_eq!(newest.message_count, 2);

        let older = sessions.iter().find(|s| s.id == "sess-b").unwrap();
        assert!(older.title.is_none());
        assert_eq!(older.message_count, 0);
    }

    /// A positive limit smaller than the row count truncates the listing.
    #[tokio::test]
    async fn list_sessions_respects_limit() {
        let store = make_store().await;
        seed_sessions(&store, 5).await;

        let listed = store.list_acp_sessions(3).await.unwrap();
        assert_eq!(listed.len(), 3);
    }

    /// The smallest positive limit returns exactly one row.
    #[tokio::test]
    async fn list_sessions_limit_one_boundary() {
        let store = make_store().await;
        seed_sessions(&store, 3).await;

        let listed = store.list_acp_sessions(1).await.unwrap();
        assert_eq!(listed.len(), 1);
    }

    /// A limit of zero means "no limit": every session is returned.
    #[tokio::test]
    async fn list_sessions_unlimited_when_zero() {
        let store = make_store().await;
        seed_sessions(&store, 5).await;

        let listed = store.list_acp_sessions(0).await.unwrap();
        assert_eq!(listed.len(), 5);
    }

    /// Asking for the info of an unknown session yields `None`.
    #[tokio::test]
    async fn get_acp_session_info_returns_none_for_missing() {
        let store = make_store().await;
        let missing = store.get_acp_session_info("no-such").await.unwrap();
        assert!(missing.is_none());
    }

    /// Info for an existing session reflects its id, title and event count.
    #[tokio::test]
    async fn get_acp_session_info_returns_data() {
        let store = make_store().await;
        store.create_acp_session("sess-x").await.unwrap();
        store
            .save_acp_event("sess-x", "user", "hello")
            .await
            .unwrap();
        store.update_session_title("sess-x", "Test").await.unwrap();

        let fetched = store.get_acp_session_info("sess-x").await.unwrap();
        let info = fetched.unwrap();
        assert_eq!(info.id, "sess-x");
        assert_eq!(info.title.as_deref(), Some("Test"));
        assert_eq!(info.message_count, 1);
    }

    /// Saving an event moves the session's `updated_at` forward.
    #[tokio::test]
    async fn updated_at_trigger_fires_on_event_insert() {
        /// Read the current `updated_at` of an existing session.
        async fn updated_at_of(store: &SqliteStore, id: &str) -> String {
            let info = store.get_acp_session_info(id).await.unwrap().unwrap();
            info.updated_at
        }

        let store = make_store().await;
        store.create_acp_session("sess-t").await.unwrap();

        let before = updated_at_of(&store, "sess-t").await;

        // Sleep past a one-second boundary so a changed timestamp is observable even
        // if the stored value only has second resolution.
        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;

        store
            .save_acp_event("sess-t", "user", "ping")
            .await
            .unwrap();

        let after = updated_at_of(&store, "sess-t").await;

        assert!(
            after > before,
            "updated_at should increase after event insert: before={before} after={after}"
        );
    }

    /// A session created together with a conversation returns that conversation id.
    #[tokio::test]
    async fn create_session_with_conversation_and_retrieve() {
        let store = make_store().await;
        let cid = store.create_conversation().await.unwrap();
        store
            .create_acp_session_with_conversation("sess-1", cid)
            .await
            .unwrap();

        let linked = store
            .get_acp_session_conversation_id("sess-1")
            .await
            .unwrap();
        assert_eq!(linked, Some(cid));
    }

    /// A session created without a conversation reports no conversation id.
    #[tokio::test]
    async fn get_conversation_id_returns_none_for_legacy_session() {
        let store = make_store().await;
        store.create_acp_session("legacy").await.unwrap();

        let linked = store
            .get_acp_session_conversation_id("legacy")
            .await
            .unwrap();
        assert!(linked.is_none());
    }

    /// An unknown session reports no conversation id rather than an error.
    #[tokio::test]
    async fn get_conversation_id_returns_none_for_missing_session() {
        let store = make_store().await;

        let linked = store
            .get_acp_session_conversation_id("no-such")
            .await
            .unwrap();
        assert!(linked.is_none());
    }

    /// Setting the conversation id on an existing session makes it retrievable.
    #[tokio::test]
    async fn set_conversation_id_updates_existing_session() {
        let store = make_store().await;
        store.create_acp_session("sess-2").await.unwrap();
        let cid = store.create_conversation().await.unwrap();

        store
            .set_acp_session_conversation_id("sess-2", cid)
            .await
            .unwrap();

        let linked = store
            .get_acp_session_conversation_id("sess-2")
            .await
            .unwrap();
        assert_eq!(linked, Some(cid));
    }

    /// Copying a conversation reproduces its messages, in the same order, under the target.
    #[tokio::test]
    async fn copy_conversation_copies_messages_in_order() {
        use zeph_llm::provider::Role;

        let store = make_store().await;
        let src = store.create_conversation().await.unwrap();
        store.save_message(src, "user", "hello").await.unwrap();
        store.save_message(src, "assistant", "world").await.unwrap();

        let dst = store.create_conversation().await.unwrap();
        store.copy_conversation(src, dst).await.unwrap();

        let copied = store.load_history(dst, 100).await.unwrap();
        assert_eq!(copied.len(), 2);

        let expected = [(Role::User, "hello"), (Role::Assistant, "world")];
        for (msg, (role, content)) in copied.iter().zip(expected) {
            assert_eq!(msg.role, role);
            assert_eq!(msg.content, content);
        }
    }

    /// Copying from a conversation without messages succeeds and leaves the target empty.
    #[tokio::test]
    async fn copy_conversation_empty_source_is_noop() {
        let store = make_store().await;
        let src = store.create_conversation().await.unwrap();
        let dst = store.create_conversation().await.unwrap();

        store.copy_conversation(src, dst).await.unwrap();

        let copied = store.load_history(dst, 100).await.unwrap();
        assert!(copied.is_empty());
    }

    /// Summaries attached to the source conversation are not duplicated onto the target.
    #[tokio::test]
    async fn copy_conversation_does_not_copy_summaries() {
        let store = make_store().await;
        let src = store.create_conversation().await.unwrap();
        store.save_message(src, "user", "hello").await.unwrap();

        // Insert a summary row directly for the source conversation.
        zeph_db::query(
            sql!("INSERT INTO summaries (conversation_id, content, first_message_id, last_message_id, token_estimate) \
                VALUES (?, 'summary text', 1, 1, 10)"),
        )
        .bind(src)
        .execute(&store.pool)
        .await
        .unwrap();

        let dst = store.create_conversation().await.unwrap();
        store.copy_conversation(src, dst).await.unwrap();

        let copied_summaries: i64 = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM summaries WHERE conversation_id = ?"
        ))
        .bind(dst)
        .fetch_one(&store.pool)
        .await
        .unwrap();
        assert_eq!(
            copied_summaries, 0,
            "summaries must not be copied to forked conversation"
        );
    }

    /// Two sessions linked to two conversations keep their own, distinct conversation ids.
    #[tokio::test]
    async fn concurrent_sessions_get_distinct_conversation_ids() {
        let store = make_store().await;
        let cid1 = store.create_conversation().await.unwrap();
        let cid2 = store.create_conversation().await.unwrap();

        for (session, cid) in [("sess-a", cid1), ("sess-b", cid2)] {
            store
                .create_acp_session_with_conversation(session, cid)
                .await
                .unwrap();
        }

        let retrieved1 = store
            .get_acp_session_conversation_id("sess-a")
            .await
            .unwrap();
        let retrieved2 = store
            .get_acp_session_conversation_id("sess-b")
            .await
            .unwrap();

        assert!(retrieved1.is_some());
        assert!(retrieved2.is_some());
        assert_ne!(
            retrieved1, retrieved2,
            "concurrent sessions must get distinct conversation_ids"
        );
    }
}