Skip to main content

zeph_memory/store/
acp_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::error::MemoryError;
5use crate::store::SqliteStore;
6use crate::types::ConversationId;
7use zeph_db::ActiveDialect;
8#[allow(unused_imports)]
9use zeph_db::sql;
10
/// A single persisted ACP session event, as produced by
/// [`SqliteStore::load_acp_events`].
///
/// Each field carries the value of the same-named column of the
/// `acp_session_events` table, passed through as text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionEvent {
    /// Event kind label that was supplied when the event was saved.
    pub event_type: String,
    /// Event body that was supplied when the event was saved.
    pub payload: String,
    /// Value of the `created_at` column, exactly as the database returned it.
    pub created_at: String,
}
16
/// Summary metadata for one ACP session, as produced by
/// [`SqliteStore::list_acp_sessions`] and [`SqliteStore::get_acp_session_info`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionInfo {
    /// Session identifier (`acp_sessions.id`).
    pub id: String,
    /// Session title; `None` when the `title` column is NULL.
    pub title: Option<String>,
    /// Value of the `created_at` column, exactly as the database returned it.
    pub created_at: String,
    /// Value of the `updated_at` column, exactly as the database returned it.
    pub updated_at: String,
    /// Number of `acp_session_events` rows that reference this session.
    pub message_count: i64,
}
24
25impl SqliteStore {
26    /// Create a new ACP session record.
27    ///
28    /// # Errors
29    ///
30    /// Returns an error if the database write fails.
31    pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
32        let sql = format!(
33            "{} INTO acp_sessions (id) VALUES (?){}",
34            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
35            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
36        );
37        zeph_db::query(&sql)
38            .bind(session_id)
39            .execute(&self.pool)
40            .await?;
41        Ok(())
42    }
43
44    /// Persist a single ACP session event.
45    ///
46    /// # Errors
47    ///
48    /// Returns an error if the database write fails.
49    pub async fn save_acp_event(
50        &self,
51        session_id: &str,
52        event_type: &str,
53        payload: &str,
54    ) -> Result<(), MemoryError> {
55        zeph_db::query(sql!(
56            "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
57        ))
58        .bind(session_id)
59        .bind(event_type)
60        .bind(payload)
61        .execute(&self.pool)
62        .await?;
63        Ok(())
64    }
65
66    /// Load all events for an ACP session in insertion order.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the database query fails.
71    pub async fn load_acp_events(
72        &self,
73        session_id: &str,
74    ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
75        let rows = zeph_db::query_as::<_, (String, String, String)>(
76            sql!("SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id"),
77        )
78        .bind(session_id)
79        .fetch_all(&self.pool)
80        .await?;
81
82        Ok(rows
83            .into_iter()
84            .map(|(event_type, payload, created_at)| AcpSessionEvent {
85                event_type,
86                payload,
87                created_at,
88            })
89            .collect())
90    }
91
92    /// Delete an ACP session and its events (cascade).
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the database write fails.
97    pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
98        zeph_db::query(sql!("DELETE FROM acp_sessions WHERE id = ?"))
99            .bind(session_id)
100            .execute(&self.pool)
101            .await?;
102        Ok(())
103    }
104
105    /// List ACP sessions ordered by last activity descending.
106    ///
107    /// Includes title, `updated_at`, and message count per session.
108    /// Pass `limit = 0` for unlimited results.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the database query fails.
113    pub async fn list_acp_sessions(
114        &self,
115        limit: usize,
116    ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
117        // LIMIT -1 in SQLite means no limit; cast limit=0 sentinel to -1.
118        #[allow(clippy::cast_possible_wrap)]
119        let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
120        let rows = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
121            "SELECT s.id, s.title, s.created_at, s.updated_at, \
122             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
123             FROM acp_sessions s \
124             ORDER BY s.updated_at DESC \
125             LIMIT ?",
126        )
127        .bind(sql_limit)
128        .fetch_all(&self.pool)
129        .await?;
130
131        Ok(rows
132            .into_iter()
133            .map(
134                |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
135                    id,
136                    title,
137                    created_at,
138                    updated_at,
139                    message_count,
140                },
141            )
142            .collect())
143    }
144
145    /// Fetch metadata for a single ACP session.
146    ///
147    /// Returns `None` if the session does not exist.
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if the database query fails.
152    pub async fn get_acp_session_info(
153        &self,
154        session_id: &str,
155    ) -> Result<Option<AcpSessionInfo>, MemoryError> {
156        let row = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
157            "SELECT s.id, s.title, s.created_at, s.updated_at, \
158             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
159             FROM acp_sessions s \
160             WHERE s.id = ?",
161        )
162        .bind(session_id)
163        .fetch_optional(&self.pool)
164        .await?;
165
166        Ok(row.map(
167            |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
168                id,
169                title,
170                created_at,
171                updated_at,
172                message_count,
173            },
174        ))
175    }
176
177    /// Insert multiple events for a session inside a single transaction.
178    ///
179    /// Atomically writes all events or none. More efficient than individual inserts
180    /// for bulk import use cases.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the transaction or any insert fails.
185    pub async fn import_acp_events(
186        &self,
187        session_id: &str,
188        events: &[(&str, &str)],
189    ) -> Result<(), MemoryError> {
190        let mut tx = self.pool.begin().await?;
191        for (event_type, payload) in events {
192            zeph_db::query(sql!(
193                "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
194            ))
195            .bind(session_id)
196            .bind(event_type)
197            .bind(payload)
198            .execute(&mut *tx)
199            .await?;
200        }
201        tx.commit().await?;
202        Ok(())
203    }
204
205    /// Update the title of an ACP session.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if the database write fails.
210    pub async fn update_session_title(
211        &self,
212        session_id: &str,
213        title: &str,
214    ) -> Result<(), MemoryError> {
215        zeph_db::query(sql!("UPDATE acp_sessions SET title = ? WHERE id = ?"))
216            .bind(title)
217            .bind(session_id)
218            .execute(&self.pool)
219            .await?;
220        Ok(())
221    }
222
223    /// Check whether an ACP session record exists.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the database query fails.
228    pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
229        let count: i64 =
230            zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM acp_sessions WHERE id = ?"))
231                .bind(session_id)
232                .fetch_one(&self.pool)
233                .await?;
234        Ok(count > 0)
235    }
236
237    /// Create a new ACP session record with an associated conversation.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if the database write fails.
242    pub async fn create_acp_session_with_conversation(
243        &self,
244        session_id: &str,
245        conversation_id: ConversationId,
246    ) -> Result<(), MemoryError> {
247        let sql = format!(
248            "{} INTO acp_sessions (id, conversation_id) VALUES (?, ?){}",
249            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
250            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
251        );
252        zeph_db::query(&sql)
253            .bind(session_id)
254            .bind(conversation_id)
255            .execute(&self.pool)
256            .await?;
257        Ok(())
258    }
259
260    /// Get the conversation ID associated with an ACP session.
261    ///
262    /// Returns `None` if the session has no conversation mapping (legacy session).
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if the database query fails.
267    pub async fn get_acp_session_conversation_id(
268        &self,
269        session_id: &str,
270    ) -> Result<Option<ConversationId>, MemoryError> {
271        let row: Option<(Option<ConversationId>,)> = zeph_db::query_as(sql!(
272            "SELECT conversation_id FROM acp_sessions WHERE id = ?"
273        ))
274        .bind(session_id)
275        .fetch_optional(&self.pool)
276        .await?;
277        Ok(row.and_then(|(cid,)| cid))
278    }
279
280    /// Update the conversation mapping for an ACP session.
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if the database write fails.
285    pub async fn set_acp_session_conversation_id(
286        &self,
287        session_id: &str,
288        conversation_id: ConversationId,
289    ) -> Result<(), MemoryError> {
290        zeph_db::query(sql!(
291            "UPDATE acp_sessions SET conversation_id = ? WHERE id = ?"
292        ))
293        .bind(conversation_id)
294        .bind(session_id)
295        .execute(&self.pool)
296        .await?;
297        Ok(())
298    }
299
300    /// Copy all messages from one conversation to another, preserving order.
301    ///
302    /// Summaries are intentionally NOT copied: their `first_message_id`/`last_message_id`
303    /// reference message IDs from the source conversation which differ from the new IDs
304    /// assigned to the copied messages, making the compaction cursor incorrect. The forked
305    /// session inherits the full message history and builds its own compaction state from
306    /// scratch. Other per-conversation state also excluded: embeddings (re-indexed on demand),
307    /// deferred tool summaries (treated as fresh context budget).
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the database write fails.
312    pub async fn copy_conversation(
313        &self,
314        source: ConversationId,
315        target: ConversationId,
316    ) -> Result<(), MemoryError> {
317        let mut tx = self.pool.begin().await?;
318
319        // Copy messages in order. Only columns present across all migrations are included;
320        // per-message auto-fields (id, created_at, last_accessed, access_count, qdrant_cleaned)
321        // are excluded so they are generated fresh for the target conversation.
322        zeph_db::query(
323            sql!("INSERT INTO messages \
324                (conversation_id, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at) \
325             SELECT ?, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at \
326             FROM messages WHERE conversation_id = ? ORDER BY id"),
327        )
328        .bind(target)
329        .bind(source)
330        .execute(&mut *tx)
331        .await?;
332
333        // Summaries are NOT copied — their message ID boundaries reference the source
334        // conversation and would corrupt the compaction cursor in the forked session.
335        // The forked session builds compaction state from its own messages.
336
337        tx.commit().await?;
338        Ok(())
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345
346    async fn make_store() -> SqliteStore {
347        SqliteStore::new(":memory:")
348            .await
349            .expect("SqliteStore::new")
350    }
351
352    #[tokio::test]
353    async fn create_and_exists() {
354        let store = make_store().await;
355        store.create_acp_session("sess-1").await.unwrap();
356        assert!(store.acp_session_exists("sess-1").await.unwrap());
357        assert!(!store.acp_session_exists("sess-2").await.unwrap());
358    }
359
360    #[tokio::test]
361    async fn save_and_load_events() {
362        let store = make_store().await;
363        store.create_acp_session("sess-1").await.unwrap();
364        store
365            .save_acp_event("sess-1", "user_message", "hello")
366            .await
367            .unwrap();
368        store
369            .save_acp_event("sess-1", "agent_message", "world")
370            .await
371            .unwrap();
372
373        let events = store.load_acp_events("sess-1").await.unwrap();
374        assert_eq!(events.len(), 2);
375        assert_eq!(events[0].event_type, "user_message");
376        assert_eq!(events[0].payload, "hello");
377        assert_eq!(events[1].event_type, "agent_message");
378        assert_eq!(events[1].payload, "world");
379    }
380
381    #[tokio::test]
382    async fn delete_cascades_events() {
383        let store = make_store().await;
384        store.create_acp_session("sess-1").await.unwrap();
385        store
386            .save_acp_event("sess-1", "user_message", "hello")
387            .await
388            .unwrap();
389        store.delete_acp_session("sess-1").await.unwrap();
390
391        assert!(!store.acp_session_exists("sess-1").await.unwrap());
392        let events = store.load_acp_events("sess-1").await.unwrap();
393        assert!(events.is_empty());
394    }
395
396    #[tokio::test]
397    async fn load_events_empty_for_unknown() {
398        let store = make_store().await;
399        let events = store.load_acp_events("no-such").await.unwrap();
400        assert!(events.is_empty());
401    }
402
403    #[tokio::test]
404    async fn list_sessions_includes_title_and_message_count() {
405        let store = make_store().await;
406        store.create_acp_session("sess-b").await.unwrap();
407
408        // Sleep so that sess-a's events land in a different second than sess-b's
409        // created_at, making the updated_at DESC ordering deterministic.
410        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
411
412        store.create_acp_session("sess-a").await.unwrap();
413        store.save_acp_event("sess-a", "user", "hi").await.unwrap();
414        store
415            .save_acp_event("sess-a", "agent", "hello")
416            .await
417            .unwrap();
418        store
419            .update_session_title("sess-a", "My Chat")
420            .await
421            .unwrap();
422
423        let sessions = store.list_acp_sessions(100).await.unwrap();
424        // sess-a has events so updated_at is newer — should be first
425        assert_eq!(sessions[0].id, "sess-a");
426        assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
427        assert_eq!(sessions[0].message_count, 2);
428
429        // sess-b has no events
430        let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
431        assert!(b.title.is_none());
432        assert_eq!(b.message_count, 0);
433    }
434
435    #[tokio::test]
436    async fn list_sessions_respects_limit() {
437        let store = make_store().await;
438        for i in 0..5u8 {
439            store
440                .create_acp_session(&format!("sess-{i}"))
441                .await
442                .unwrap();
443        }
444        let sessions = store.list_acp_sessions(3).await.unwrap();
445        assert_eq!(sessions.len(), 3);
446    }
447
448    #[tokio::test]
449    async fn list_sessions_limit_one_boundary() {
450        let store = make_store().await;
451        for i in 0..3u8 {
452            store
453                .create_acp_session(&format!("sess-{i}"))
454                .await
455                .unwrap();
456        }
457        let sessions = store.list_acp_sessions(1).await.unwrap();
458        assert_eq!(sessions.len(), 1);
459    }
460
461    #[tokio::test]
462    async fn list_sessions_unlimited_when_zero() {
463        let store = make_store().await;
464        for i in 0..5u8 {
465            store
466                .create_acp_session(&format!("sess-{i}"))
467                .await
468                .unwrap();
469        }
470        let sessions = store.list_acp_sessions(0).await.unwrap();
471        assert_eq!(sessions.len(), 5);
472    }
473
474    #[tokio::test]
475    async fn get_acp_session_info_returns_none_for_missing() {
476        let store = make_store().await;
477        let info = store.get_acp_session_info("no-such").await.unwrap();
478        assert!(info.is_none());
479    }
480
481    #[tokio::test]
482    async fn get_acp_session_info_returns_data() {
483        let store = make_store().await;
484        store.create_acp_session("sess-x").await.unwrap();
485        store
486            .save_acp_event("sess-x", "user", "hello")
487            .await
488            .unwrap();
489        store.update_session_title("sess-x", "Test").await.unwrap();
490
491        let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
492        assert_eq!(info.id, "sess-x");
493        assert_eq!(info.title.as_deref(), Some("Test"));
494        assert_eq!(info.message_count, 1);
495    }
496
497    #[tokio::test]
498    async fn updated_at_trigger_fires_on_event_insert() {
499        let store = make_store().await;
500        store.create_acp_session("sess-t").await.unwrap();
501
502        let before = store
503            .get_acp_session_info("sess-t")
504            .await
505            .unwrap()
506            .unwrap()
507            .updated_at
508            .clone();
509
510        // Small sleep so datetime('now') differs
511        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
512
513        store
514            .save_acp_event("sess-t", "user", "ping")
515            .await
516            .unwrap();
517
518        let after = store
519            .get_acp_session_info("sess-t")
520            .await
521            .unwrap()
522            .unwrap()
523            .updated_at;
524
525        assert!(
526            after > before,
527            "updated_at should increase after event insert: before={before} after={after}"
528        );
529    }
530
531    #[tokio::test]
532    async fn create_session_with_conversation_and_retrieve() {
533        let store = make_store().await;
534        let cid = store.create_conversation().await.unwrap();
535        store
536            .create_acp_session_with_conversation("sess-1", cid)
537            .await
538            .unwrap();
539        let retrieved = store
540            .get_acp_session_conversation_id("sess-1")
541            .await
542            .unwrap();
543        assert_eq!(retrieved, Some(cid));
544    }
545
546    #[tokio::test]
547    async fn get_conversation_id_returns_none_for_legacy_session() {
548        let store = make_store().await;
549        store.create_acp_session("legacy").await.unwrap();
550        let cid = store
551            .get_acp_session_conversation_id("legacy")
552            .await
553            .unwrap();
554        assert!(cid.is_none());
555    }
556
557    #[tokio::test]
558    async fn get_conversation_id_returns_none_for_missing_session() {
559        let store = make_store().await;
560        let cid = store
561            .get_acp_session_conversation_id("no-such")
562            .await
563            .unwrap();
564        assert!(cid.is_none());
565    }
566
567    #[tokio::test]
568    async fn set_conversation_id_updates_existing_session() {
569        let store = make_store().await;
570        store.create_acp_session("sess-2").await.unwrap();
571        let cid = store.create_conversation().await.unwrap();
572        store
573            .set_acp_session_conversation_id("sess-2", cid)
574            .await
575            .unwrap();
576        let retrieved = store
577            .get_acp_session_conversation_id("sess-2")
578            .await
579            .unwrap();
580        assert_eq!(retrieved, Some(cid));
581    }
582
583    #[tokio::test]
584    async fn copy_conversation_copies_messages_in_order() {
585        use zeph_llm::provider::Role;
586        let store = make_store().await;
587        let src = store.create_conversation().await.unwrap();
588        store.save_message(src, "user", "hello").await.unwrap();
589        store.save_message(src, "assistant", "world").await.unwrap();
590
591        let dst = store.create_conversation().await.unwrap();
592        store.copy_conversation(src, dst).await.unwrap();
593
594        let msgs = store.load_history(dst, 100).await.unwrap();
595        assert_eq!(msgs.len(), 2);
596        assert_eq!(msgs[0].role, Role::User);
597        assert_eq!(msgs[0].content, "hello");
598        assert_eq!(msgs[1].role, Role::Assistant);
599        assert_eq!(msgs[1].content, "world");
600    }
601
602    #[tokio::test]
603    async fn copy_conversation_empty_source_is_noop() {
604        let store = make_store().await;
605        let src = store.create_conversation().await.unwrap();
606        let dst = store.create_conversation().await.unwrap();
607        store.copy_conversation(src, dst).await.unwrap();
608        let msgs = store.load_history(dst, 100).await.unwrap();
609        assert!(msgs.is_empty());
610    }
611
612    #[tokio::test]
613    async fn copy_conversation_does_not_copy_summaries() {
614        // Summaries are intentionally excluded because their first/last_message_id
615        // boundaries would reference source message IDs, corrupting the compaction cursor.
616        let store = make_store().await;
617        let src = store.create_conversation().await.unwrap();
618        store.save_message(src, "user", "hello").await.unwrap();
619        // Insert a summary directly so we can verify it is not copied.
620        zeph_db::query(
621            sql!("INSERT INTO summaries (conversation_id, content, first_message_id, last_message_id, token_estimate) \
622             VALUES (?, 'summary text', 1, 1, 10)"),
623        )
624        .bind(src)
625        .execute(&store.pool)
626        .await
627        .unwrap();
628
629        let dst = store.create_conversation().await.unwrap();
630        store.copy_conversation(src, dst).await.unwrap();
631
632        let count: i64 = zeph_db::query_scalar(sql!(
633            "SELECT COUNT(*) FROM summaries WHERE conversation_id = ?"
634        ))
635        .bind(dst)
636        .fetch_one(&store.pool)
637        .await
638        .unwrap();
639        assert_eq!(
640            count, 0,
641            "summaries must not be copied to forked conversation"
642        );
643    }
644
645    #[tokio::test]
646    async fn concurrent_sessions_get_distinct_conversation_ids() {
647        let store = make_store().await;
648        let cid1 = store.create_conversation().await.unwrap();
649        let cid2 = store.create_conversation().await.unwrap();
650        store
651            .create_acp_session_with_conversation("sess-a", cid1)
652            .await
653            .unwrap();
654        store
655            .create_acp_session_with_conversation("sess-b", cid2)
656            .await
657            .unwrap();
658
659        let retrieved1 = store
660            .get_acp_session_conversation_id("sess-a")
661            .await
662            .unwrap();
663        let retrieved2 = store
664            .get_acp_session_conversation_id("sess-b")
665            .await
666            .unwrap();
667
668        assert!(retrieved1.is_some());
669        assert!(retrieved2.is_some());
670        assert_ne!(
671            retrieved1, retrieved2,
672            "concurrent sessions must get distinct conversation_ids"
673        );
674    }
675}