Skip to main content

zeph_memory/sqlite/
acp_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::error::MemoryError;
5use crate::sqlite::SqliteStore;
6use crate::types::ConversationId;
7
/// A single persisted ACP session event, as read back from the
/// `acp_session_events` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionEvent {
    /// Value of the `event_type` column.
    pub event_type: String,
    /// Value of the `payload` column, stored and returned as text.
    pub payload: String,
    /// Value of the `created_at` column, returned as the text stored in the database.
    pub created_at: String,
}
13
/// Summary metadata for one ACP session row plus its event count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionInfo {
    /// Session identifier (`acp_sessions.id`).
    pub id: String,
    /// Session title; `None` when the `title` column is NULL.
    pub title: Option<String>,
    /// Value of the `created_at` column, returned as the text stored in the database.
    pub created_at: String,
    /// Value of the `updated_at` column, returned as the text stored in the database.
    pub updated_at: String,
    /// Number of rows in `acp_session_events` that reference this session.
    pub message_count: i64,
}
21
22impl SqliteStore {
23    /// Create a new ACP session record.
24    ///
25    /// # Errors
26    ///
27    /// Returns an error if the database write fails.
28    pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
29        sqlx::query("INSERT OR IGNORE INTO acp_sessions (id) VALUES (?)")
30            .bind(session_id)
31            .execute(&self.pool)
32            .await?;
33        Ok(())
34    }
35
36    /// Persist a single ACP session event.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the database write fails.
41    pub async fn save_acp_event(
42        &self,
43        session_id: &str,
44        event_type: &str,
45        payload: &str,
46    ) -> Result<(), MemoryError> {
47        sqlx::query(
48            "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
49        )
50        .bind(session_id)
51        .bind(event_type)
52        .bind(payload)
53        .execute(&self.pool)
54        .await?;
55        Ok(())
56    }
57
58    /// Load all events for an ACP session in insertion order.
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if the database query fails.
63    pub async fn load_acp_events(
64        &self,
65        session_id: &str,
66    ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
67        let rows = sqlx::query_as::<_, (String, String, String)>(
68            "SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id",
69        )
70        .bind(session_id)
71        .fetch_all(&self.pool)
72        .await?;
73
74        Ok(rows
75            .into_iter()
76            .map(|(event_type, payload, created_at)| AcpSessionEvent {
77                event_type,
78                payload,
79                created_at,
80            })
81            .collect())
82    }
83
84    /// Delete an ACP session and its events (cascade).
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the database write fails.
89    pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
90        sqlx::query("DELETE FROM acp_sessions WHERE id = ?")
91            .bind(session_id)
92            .execute(&self.pool)
93            .await?;
94        Ok(())
95    }
96
97    /// List ACP sessions ordered by last activity descending.
98    ///
99    /// Includes title, `updated_at`, and message count per session.
100    /// Pass `limit = 0` for unlimited results.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the database query fails.
105    pub async fn list_acp_sessions(
106        &self,
107        limit: usize,
108    ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
109        // LIMIT -1 in SQLite means no limit; cast limit=0 sentinel to -1.
110        #[allow(clippy::cast_possible_wrap)]
111        let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
112        let rows = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
113            "SELECT s.id, s.title, s.created_at, s.updated_at, \
114             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
115             FROM acp_sessions s \
116             ORDER BY s.updated_at DESC \
117             LIMIT ?",
118        )
119        .bind(sql_limit)
120        .fetch_all(&self.pool)
121        .await?;
122
123        Ok(rows
124            .into_iter()
125            .map(
126                |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
127                    id,
128                    title,
129                    created_at,
130                    updated_at,
131                    message_count,
132                },
133            )
134            .collect())
135    }
136
137    /// Fetch metadata for a single ACP session.
138    ///
139    /// Returns `None` if the session does not exist.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if the database query fails.
144    pub async fn get_acp_session_info(
145        &self,
146        session_id: &str,
147    ) -> Result<Option<AcpSessionInfo>, MemoryError> {
148        let row = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
149            "SELECT s.id, s.title, s.created_at, s.updated_at, \
150             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
151             FROM acp_sessions s \
152             WHERE s.id = ?",
153        )
154        .bind(session_id)
155        .fetch_optional(&self.pool)
156        .await?;
157
158        Ok(row.map(
159            |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
160                id,
161                title,
162                created_at,
163                updated_at,
164                message_count,
165            },
166        ))
167    }
168
169    /// Insert multiple events for a session inside a single transaction.
170    ///
171    /// Atomically writes all events or none. More efficient than individual inserts
172    /// for bulk import use cases.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if the transaction or any insert fails.
177    pub async fn import_acp_events(
178        &self,
179        session_id: &str,
180        events: &[(&str, &str)],
181    ) -> Result<(), MemoryError> {
182        let mut tx = self.pool.begin().await?;
183        for (event_type, payload) in events {
184            sqlx::query(
185                "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
186            )
187            .bind(session_id)
188            .bind(event_type)
189            .bind(payload)
190            .execute(&mut *tx)
191            .await?;
192        }
193        tx.commit().await?;
194        Ok(())
195    }
196
197    /// Update the title of an ACP session.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the database write fails.
202    pub async fn update_session_title(
203        &self,
204        session_id: &str,
205        title: &str,
206    ) -> Result<(), MemoryError> {
207        sqlx::query("UPDATE acp_sessions SET title = ? WHERE id = ?")
208            .bind(title)
209            .bind(session_id)
210            .execute(&self.pool)
211            .await?;
212        Ok(())
213    }
214
215    /// Check whether an ACP session record exists.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if the database query fails.
220    pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
221        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM acp_sessions WHERE id = ?")
222            .bind(session_id)
223            .fetch_one(&self.pool)
224            .await?;
225        Ok(count > 0)
226    }
227
228    /// Create a new ACP session record with an associated conversation.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if the database write fails.
233    pub async fn create_acp_session_with_conversation(
234        &self,
235        session_id: &str,
236        conversation_id: ConversationId,
237    ) -> Result<(), MemoryError> {
238        sqlx::query("INSERT OR IGNORE INTO acp_sessions (id, conversation_id) VALUES (?, ?)")
239            .bind(session_id)
240            .bind(conversation_id)
241            .execute(&self.pool)
242            .await?;
243        Ok(())
244    }
245
246    /// Get the conversation ID associated with an ACP session.
247    ///
248    /// Returns `None` if the session has no conversation mapping (legacy session).
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the database query fails.
253    pub async fn get_acp_session_conversation_id(
254        &self,
255        session_id: &str,
256    ) -> Result<Option<ConversationId>, MemoryError> {
257        let row: Option<(Option<ConversationId>,)> =
258            sqlx::query_as("SELECT conversation_id FROM acp_sessions WHERE id = ?")
259                .bind(session_id)
260                .fetch_optional(&self.pool)
261                .await?;
262        Ok(row.and_then(|(cid,)| cid))
263    }
264
265    /// Update the conversation mapping for an ACP session.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the database write fails.
270    pub async fn set_acp_session_conversation_id(
271        &self,
272        session_id: &str,
273        conversation_id: ConversationId,
274    ) -> Result<(), MemoryError> {
275        sqlx::query("UPDATE acp_sessions SET conversation_id = ? WHERE id = ?")
276            .bind(conversation_id)
277            .bind(session_id)
278            .execute(&self.pool)
279            .await?;
280        Ok(())
281    }
282
283    /// Copy all messages from one conversation to another, preserving order.
284    ///
285    /// Summaries are intentionally NOT copied: their `first_message_id`/`last_message_id`
286    /// reference message IDs from the source conversation which differ from the new IDs
287    /// assigned to the copied messages, making the compaction cursor incorrect. The forked
288    /// session inherits the full message history and builds its own compaction state from
289    /// scratch. Other per-conversation state also excluded: embeddings (re-indexed on demand),
290    /// deferred tool summaries (treated as fresh context budget).
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the database write fails.
295    pub async fn copy_conversation(
296        &self,
297        source: ConversationId,
298        target: ConversationId,
299    ) -> Result<(), MemoryError> {
300        let mut tx = self.pool.begin().await?;
301
302        // Copy messages in order. Only columns present across all migrations are included;
303        // per-message auto-fields (id, created_at, last_accessed, access_count, qdrant_cleaned)
304        // are excluded so they are generated fresh for the target conversation.
305        sqlx::query(
306            "INSERT INTO messages \
307                (conversation_id, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at) \
308             SELECT ?, role, content, parts, agent_visible, user_visible, compacted_at, deleted_at \
309             FROM messages WHERE conversation_id = ? ORDER BY id",
310        )
311        .bind(target)
312        .bind(source)
313        .execute(&mut *tx)
314        .await?;
315
316        // Summaries are NOT copied — their message ID boundaries reference the source
317        // conversation and would corrupt the compaction cursor in the forked session.
318        // The forked session builds compaction state from its own messages.
319
320        tx.commit().await?;
321        Ok(())
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::*;
328
329    async fn make_store() -> SqliteStore {
330        SqliteStore::new(":memory:")
331            .await
332            .expect("SqliteStore::new")
333    }
334
335    #[tokio::test]
336    async fn create_and_exists() {
337        let store = make_store().await;
338        store.create_acp_session("sess-1").await.unwrap();
339        assert!(store.acp_session_exists("sess-1").await.unwrap());
340        assert!(!store.acp_session_exists("sess-2").await.unwrap());
341    }
342
343    #[tokio::test]
344    async fn save_and_load_events() {
345        let store = make_store().await;
346        store.create_acp_session("sess-1").await.unwrap();
347        store
348            .save_acp_event("sess-1", "user_message", "hello")
349            .await
350            .unwrap();
351        store
352            .save_acp_event("sess-1", "agent_message", "world")
353            .await
354            .unwrap();
355
356        let events = store.load_acp_events("sess-1").await.unwrap();
357        assert_eq!(events.len(), 2);
358        assert_eq!(events[0].event_type, "user_message");
359        assert_eq!(events[0].payload, "hello");
360        assert_eq!(events[1].event_type, "agent_message");
361        assert_eq!(events[1].payload, "world");
362    }
363
364    #[tokio::test]
365    async fn delete_cascades_events() {
366        let store = make_store().await;
367        store.create_acp_session("sess-1").await.unwrap();
368        store
369            .save_acp_event("sess-1", "user_message", "hello")
370            .await
371            .unwrap();
372        store.delete_acp_session("sess-1").await.unwrap();
373
374        assert!(!store.acp_session_exists("sess-1").await.unwrap());
375        let events = store.load_acp_events("sess-1").await.unwrap();
376        assert!(events.is_empty());
377    }
378
379    #[tokio::test]
380    async fn load_events_empty_for_unknown() {
381        let store = make_store().await;
382        let events = store.load_acp_events("no-such").await.unwrap();
383        assert!(events.is_empty());
384    }
385
386    #[tokio::test]
387    async fn list_sessions_includes_title_and_message_count() {
388        let store = make_store().await;
389        store.create_acp_session("sess-b").await.unwrap();
390
391        // Sleep so that sess-a's events land in a different second than sess-b's
392        // created_at, making the updated_at DESC ordering deterministic.
393        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
394
395        store.create_acp_session("sess-a").await.unwrap();
396        store.save_acp_event("sess-a", "user", "hi").await.unwrap();
397        store
398            .save_acp_event("sess-a", "agent", "hello")
399            .await
400            .unwrap();
401        store
402            .update_session_title("sess-a", "My Chat")
403            .await
404            .unwrap();
405
406        let sessions = store.list_acp_sessions(100).await.unwrap();
407        // sess-a has events so updated_at is newer — should be first
408        assert_eq!(sessions[0].id, "sess-a");
409        assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
410        assert_eq!(sessions[0].message_count, 2);
411
412        // sess-b has no events
413        let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
414        assert!(b.title.is_none());
415        assert_eq!(b.message_count, 0);
416    }
417
418    #[tokio::test]
419    async fn list_sessions_respects_limit() {
420        let store = make_store().await;
421        for i in 0..5u8 {
422            store
423                .create_acp_session(&format!("sess-{i}"))
424                .await
425                .unwrap();
426        }
427        let sessions = store.list_acp_sessions(3).await.unwrap();
428        assert_eq!(sessions.len(), 3);
429    }
430
431    #[tokio::test]
432    async fn list_sessions_limit_one_boundary() {
433        let store = make_store().await;
434        for i in 0..3u8 {
435            store
436                .create_acp_session(&format!("sess-{i}"))
437                .await
438                .unwrap();
439        }
440        let sessions = store.list_acp_sessions(1).await.unwrap();
441        assert_eq!(sessions.len(), 1);
442    }
443
444    #[tokio::test]
445    async fn list_sessions_unlimited_when_zero() {
446        let store = make_store().await;
447        for i in 0..5u8 {
448            store
449                .create_acp_session(&format!("sess-{i}"))
450                .await
451                .unwrap();
452        }
453        let sessions = store.list_acp_sessions(0).await.unwrap();
454        assert_eq!(sessions.len(), 5);
455    }
456
457    #[tokio::test]
458    async fn get_acp_session_info_returns_none_for_missing() {
459        let store = make_store().await;
460        let info = store.get_acp_session_info("no-such").await.unwrap();
461        assert!(info.is_none());
462    }
463
464    #[tokio::test]
465    async fn get_acp_session_info_returns_data() {
466        let store = make_store().await;
467        store.create_acp_session("sess-x").await.unwrap();
468        store
469            .save_acp_event("sess-x", "user", "hello")
470            .await
471            .unwrap();
472        store.update_session_title("sess-x", "Test").await.unwrap();
473
474        let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
475        assert_eq!(info.id, "sess-x");
476        assert_eq!(info.title.as_deref(), Some("Test"));
477        assert_eq!(info.message_count, 1);
478    }
479
480    #[tokio::test]
481    async fn updated_at_trigger_fires_on_event_insert() {
482        let store = make_store().await;
483        store.create_acp_session("sess-t").await.unwrap();
484
485        let before = store
486            .get_acp_session_info("sess-t")
487            .await
488            .unwrap()
489            .unwrap()
490            .updated_at
491            .clone();
492
493        // Small sleep so datetime('now') differs
494        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
495
496        store
497            .save_acp_event("sess-t", "user", "ping")
498            .await
499            .unwrap();
500
501        let after = store
502            .get_acp_session_info("sess-t")
503            .await
504            .unwrap()
505            .unwrap()
506            .updated_at;
507
508        assert!(
509            after > before,
510            "updated_at should increase after event insert: before={before} after={after}"
511        );
512    }
513
514    #[tokio::test]
515    async fn create_session_with_conversation_and_retrieve() {
516        let store = make_store().await;
517        let cid = store.create_conversation().await.unwrap();
518        store
519            .create_acp_session_with_conversation("sess-1", cid)
520            .await
521            .unwrap();
522        let retrieved = store
523            .get_acp_session_conversation_id("sess-1")
524            .await
525            .unwrap();
526        assert_eq!(retrieved, Some(cid));
527    }
528
529    #[tokio::test]
530    async fn get_conversation_id_returns_none_for_legacy_session() {
531        let store = make_store().await;
532        store.create_acp_session("legacy").await.unwrap();
533        let cid = store
534            .get_acp_session_conversation_id("legacy")
535            .await
536            .unwrap();
537        assert!(cid.is_none());
538    }
539
540    #[tokio::test]
541    async fn get_conversation_id_returns_none_for_missing_session() {
542        let store = make_store().await;
543        let cid = store
544            .get_acp_session_conversation_id("no-such")
545            .await
546            .unwrap();
547        assert!(cid.is_none());
548    }
549
550    #[tokio::test]
551    async fn set_conversation_id_updates_existing_session() {
552        let store = make_store().await;
553        store.create_acp_session("sess-2").await.unwrap();
554        let cid = store.create_conversation().await.unwrap();
555        store
556            .set_acp_session_conversation_id("sess-2", cid)
557            .await
558            .unwrap();
559        let retrieved = store
560            .get_acp_session_conversation_id("sess-2")
561            .await
562            .unwrap();
563        assert_eq!(retrieved, Some(cid));
564    }
565
566    #[tokio::test]
567    async fn copy_conversation_copies_messages_in_order() {
568        use zeph_llm::provider::Role;
569        let store = make_store().await;
570        let src = store.create_conversation().await.unwrap();
571        store.save_message(src, "user", "hello").await.unwrap();
572        store.save_message(src, "assistant", "world").await.unwrap();
573
574        let dst = store.create_conversation().await.unwrap();
575        store.copy_conversation(src, dst).await.unwrap();
576
577        let msgs = store.load_history(dst, 100).await.unwrap();
578        assert_eq!(msgs.len(), 2);
579        assert_eq!(msgs[0].role, Role::User);
580        assert_eq!(msgs[0].content, "hello");
581        assert_eq!(msgs[1].role, Role::Assistant);
582        assert_eq!(msgs[1].content, "world");
583    }
584
585    #[tokio::test]
586    async fn copy_conversation_empty_source_is_noop() {
587        let store = make_store().await;
588        let src = store.create_conversation().await.unwrap();
589        let dst = store.create_conversation().await.unwrap();
590        store.copy_conversation(src, dst).await.unwrap();
591        let msgs = store.load_history(dst, 100).await.unwrap();
592        assert!(msgs.is_empty());
593    }
594
595    #[tokio::test]
596    async fn copy_conversation_does_not_copy_summaries() {
597        // Summaries are intentionally excluded because their first/last_message_id
598        // boundaries would reference source message IDs, corrupting the compaction cursor.
599        let store = make_store().await;
600        let src = store.create_conversation().await.unwrap();
601        store.save_message(src, "user", "hello").await.unwrap();
602        // Insert a summary directly so we can verify it is not copied.
603        sqlx::query(
604            "INSERT INTO summaries (conversation_id, content, first_message_id, last_message_id, token_estimate) \
605             VALUES (?, 'summary text', 1, 1, 10)",
606        )
607        .bind(src)
608        .execute(&store.pool)
609        .await
610        .unwrap();
611
612        let dst = store.create_conversation().await.unwrap();
613        store.copy_conversation(src, dst).await.unwrap();
614
615        let count: i64 =
616            sqlx::query_scalar("SELECT COUNT(*) FROM summaries WHERE conversation_id = ?")
617                .bind(dst)
618                .fetch_one(&store.pool)
619                .await
620                .unwrap();
621        assert_eq!(
622            count, 0,
623            "summaries must not be copied to forked conversation"
624        );
625    }
626
627    #[tokio::test]
628    async fn concurrent_sessions_get_distinct_conversation_ids() {
629        let store = make_store().await;
630        let cid1 = store.create_conversation().await.unwrap();
631        let cid2 = store.create_conversation().await.unwrap();
632        store
633            .create_acp_session_with_conversation("sess-a", cid1)
634            .await
635            .unwrap();
636        store
637            .create_acp_session_with_conversation("sess-b", cid2)
638            .await
639            .unwrap();
640
641        let retrieved1 = store
642            .get_acp_session_conversation_id("sess-a")
643            .await
644            .unwrap();
645        let retrieved2 = store
646            .get_acp_session_conversation_id("sess-b")
647            .await
648            .unwrap();
649
650        assert!(retrieved1.is_some());
651        assert!(retrieved2.is_some());
652        assert_ne!(
653            retrieved1, retrieved2,
654            "concurrent sessions must get distinct conversation_ids"
655        );
656    }
657}