Skip to main content

zeph_memory/store/
acp_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::error::MemoryError;
5use crate::store::SqliteStore;
6use crate::types::ConversationId;
7use zeph_db::ActiveDialect;
8#[allow(unused_imports)]
9use zeph_db::sql;
10
11pub struct AcpSessionEvent {
12    pub event_type: String,
13    pub payload: String,
14    pub created_at: String,
15}
16
17pub struct AcpSessionInfo {
18    pub id: String,
19    pub title: Option<String>,
20    pub created_at: String,
21    pub updated_at: String,
22    pub message_count: i64,
23}
24
25impl SqliteStore {
26    /// Create a new ACP session record.
27    ///
28    /// # Errors
29    ///
30    /// Returns an error if the database write fails.
31    pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
32        let sql = format!(
33            "{} INTO acp_sessions (id) VALUES (?){}",
34            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
35            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
36        );
37        zeph_db::query(&sql)
38            .bind(session_id)
39            .execute(&self.pool)
40            .await?;
41        Ok(())
42    }
43
44    /// Persist a single ACP session event.
45    ///
46    /// # Errors
47    ///
48    /// Returns an error if the database write fails.
49    pub async fn save_acp_event(
50        &self,
51        session_id: &str,
52        event_type: &str,
53        payload: &str,
54    ) -> Result<(), MemoryError> {
55        zeph_db::query(sql!(
56            "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
57        ))
58        .bind(session_id)
59        .bind(event_type)
60        .bind(payload)
61        .execute(&self.pool)
62        .await?;
63        Ok(())
64    }
65
66    /// Load all events for an ACP session in insertion order.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the database query fails.
71    pub async fn load_acp_events(
72        &self,
73        session_id: &str,
74    ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
75        let rows = zeph_db::query_as::<_, (String, String, String)>(
76            sql!("SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id"),
77        )
78        .bind(session_id)
79        .fetch_all(&self.pool)
80        .await?;
81
82        Ok(rows
83            .into_iter()
84            .map(|(event_type, payload, created_at)| AcpSessionEvent {
85                event_type,
86                payload,
87                created_at,
88            })
89            .collect())
90    }
91
92    /// Delete an ACP session only if it exists; returns `true` when a row was deleted.
93    ///
94    /// Eliminates the separate exists-check + delete TOCTOU race by relying on a
95    /// single DELETE statement and inspecting affected rows.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the database write fails.
100    pub async fn delete_acp_session_checked(&self, session_id: &str) -> Result<bool, MemoryError> {
101        let result = zeph_db::query(sql!("DELETE FROM acp_sessions WHERE id = ?"))
102            .bind(session_id)
103            .execute(&self.pool)
104            .await?;
105        Ok(result.rows_affected() > 0)
106    }
107
108    /// List ACP sessions ordered by last activity descending.
109    ///
110    /// Includes title, `updated_at`, and message count per session.
111    /// Pass `limit = 0` for unlimited results.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the database query fails.
116    pub async fn list_acp_sessions(
117        &self,
118        limit: usize,
119    ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
120        // LIMIT -1 in SQLite means no limit; cast limit=0 sentinel to -1.
121        #[allow(clippy::cast_possible_wrap)]
122        let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
123        let rows = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
124            "SELECT s.id, s.title, s.created_at, s.updated_at, \
125             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
126             FROM acp_sessions s \
127             ORDER BY s.updated_at DESC \
128             LIMIT ?",
129        )
130        .bind(sql_limit)
131        .fetch_all(&self.pool)
132        .await?;
133
134        Ok(rows
135            .into_iter()
136            .map(
137                |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
138                    id,
139                    title,
140                    created_at,
141                    updated_at,
142                    message_count,
143                },
144            )
145            .collect())
146    }
147
148    /// Fetch metadata for a single ACP session.
149    ///
150    /// Returns `None` if the session does not exist.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the database query fails.
155    pub async fn get_acp_session_info(
156        &self,
157        session_id: &str,
158    ) -> Result<Option<AcpSessionInfo>, MemoryError> {
159        let row = zeph_db::query_as::<_, (String, Option<String>, String, String, i64)>(
160            "SELECT s.id, s.title, s.created_at, s.updated_at, \
161             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
162             FROM acp_sessions s \
163             WHERE s.id = ?",
164        )
165        .bind(session_id)
166        .fetch_optional(&self.pool)
167        .await?;
168
169        Ok(row.map(
170            |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
171                id,
172                title,
173                created_at,
174                updated_at,
175                message_count,
176            },
177        ))
178    }
179
180    /// Insert multiple events for a session inside a single transaction.
181    ///
182    /// Atomically writes all events or none. More efficient than individual inserts
183    /// for bulk import use cases.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the transaction or any insert fails.
188    pub async fn import_acp_events(
189        &self,
190        session_id: &str,
191        events: &[(&str, &str)],
192    ) -> Result<(), MemoryError> {
193        let mut tx = self.pool.begin().await?;
194        for (event_type, payload) in events {
195            zeph_db::query(sql!(
196                "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)"
197            ))
198            .bind(session_id)
199            .bind(event_type)
200            .bind(payload)
201            .execute(&mut *tx)
202            .await?;
203        }
204        tx.commit().await?;
205        Ok(())
206    }
207
208    /// Update the title of an ACP session.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the database write fails.
213    pub async fn update_session_title(
214        &self,
215        session_id: &str,
216        title: &str,
217    ) -> Result<(), MemoryError> {
218        zeph_db::query(sql!("UPDATE acp_sessions SET title = ? WHERE id = ?"))
219            .bind(title)
220            .bind(session_id)
221            .execute(&self.pool)
222            .await?;
223        Ok(())
224    }
225
226    /// Update the title of an ACP session; returns `true` when the row was found and updated.
227    ///
228    /// Eliminates the separate exists-check + update TOCTOU race by relying on a
229    /// single UPDATE statement and inspecting affected rows.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the database write fails.
234    pub async fn update_session_title_checked(
235        &self,
236        session_id: &str,
237        title: &str,
238    ) -> Result<bool, MemoryError> {
239        let result = zeph_db::query(sql!("UPDATE acp_sessions SET title = ? WHERE id = ?"))
240            .bind(title)
241            .bind(session_id)
242            .execute(&self.pool)
243            .await?;
244        Ok(result.rows_affected() > 0)
245    }
246
247    /// Check whether an ACP session record exists.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the database query fails.
252    pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
253        let count: i64 =
254            zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM acp_sessions WHERE id = ?"))
255                .bind(session_id)
256                .fetch_one(&self.pool)
257                .await?;
258        Ok(count > 0)
259    }
260
261    /// Create a new ACP session record with an associated conversation.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the database write fails.
266    pub async fn create_acp_session_with_conversation(
267        &self,
268        session_id: &str,
269        conversation_id: ConversationId,
270    ) -> Result<(), MemoryError> {
271        let sql = format!(
272            "{} INTO acp_sessions (id, conversation_id) VALUES (?, ?){}",
273            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
274            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
275        );
276        zeph_db::query(&sql)
277            .bind(session_id)
278            .bind(conversation_id)
279            .execute(&self.pool)
280            .await?;
281        Ok(())
282    }
283
284    /// Get the conversation ID associated with an ACP session.
285    ///
286    /// Returns `None` if the session has no conversation mapping (legacy session).
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if the database query fails.
291    pub async fn get_acp_session_conversation_id(
292        &self,
293        session_id: &str,
294    ) -> Result<Option<ConversationId>, MemoryError> {
295        let row: Option<(Option<ConversationId>,)> = zeph_db::query_as(sql!(
296            "SELECT conversation_id FROM acp_sessions WHERE id = ?"
297        ))
298        .bind(session_id)
299        .fetch_optional(&self.pool)
300        .await?;
301        Ok(row.and_then(|(cid,)| cid))
302    }
303
304    /// Update the conversation mapping for an ACP session.
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if the database write fails.
309    pub async fn set_acp_session_conversation_id(
310        &self,
311        session_id: &str,
312        conversation_id: ConversationId,
313    ) -> Result<(), MemoryError> {
314        zeph_db::query(sql!(
315            "UPDATE acp_sessions SET conversation_id = ? WHERE id = ?"
316        ))
317        .bind(conversation_id)
318        .bind(session_id)
319        .execute(&self.pool)
320        .await?;
321        Ok(())
322    }
323
324    /// Copy all messages from one conversation to another, preserving order.
325    ///
326    /// Summaries are intentionally NOT copied: their `first_message_id`/`last_message_id`
327    /// reference message IDs from the source conversation which differ from the new IDs
328    /// assigned to the copied messages, making the compaction cursor incorrect. The forked
329    /// session inherits the full message history and builds its own compaction state from
330    /// scratch. Other per-conversation state also excluded: embeddings (re-indexed on demand),
331    /// deferred tool summaries (treated as fresh context budget).
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the database write fails.
336    pub async fn copy_conversation(
337        &self,
338        source: ConversationId,
339        target: ConversationId,
340    ) -> Result<(), MemoryError> {
341        let mut tx = self.pool.begin().await?;
342
343        // Copy messages in order. Only columns present across all migrations are included;
344        // per-message auto-fields (id, created_at, last_accessed, access_count, qdrant_cleaned)
345        // are excluded so they are generated fresh for the target conversation.
346        zeph_db::query(sql!(
347            "INSERT INTO messages \
348                (conversation_id, role, content, parts, visibility, compacted_at, deleted_at) \
349             SELECT ?, role, content, parts, visibility, compacted_at, deleted_at \
350             FROM messages WHERE conversation_id = ? ORDER BY id"
351        ))
352        .bind(target)
353        .bind(source)
354        .execute(&mut *tx)
355        .await?;
356
357        // Summaries are NOT copied — their message ID boundaries reference the source
358        // conversation and would corrupt the compaction cursor in the forked session.
359        // The forked session builds compaction state from its own messages.
360
361        tx.commit().await?;
362        Ok(())
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369
370    async fn make_store() -> SqliteStore {
371        SqliteStore::new(":memory:")
372            .await
373            .expect("SqliteStore::new")
374    }
375
376    #[tokio::test]
377    async fn create_and_exists() {
378        let store = make_store().await;
379        store.create_acp_session("sess-1").await.unwrap();
380        assert!(store.acp_session_exists("sess-1").await.unwrap());
381        assert!(!store.acp_session_exists("sess-2").await.unwrap());
382    }
383
384    #[tokio::test]
385    async fn save_and_load_events() {
386        let store = make_store().await;
387        store.create_acp_session("sess-1").await.unwrap();
388        store
389            .save_acp_event("sess-1", "user_message", "hello")
390            .await
391            .unwrap();
392        store
393            .save_acp_event("sess-1", "agent_message", "world")
394            .await
395            .unwrap();
396
397        let events = store.load_acp_events("sess-1").await.unwrap();
398        assert_eq!(events.len(), 2);
399        assert_eq!(events[0].event_type, "user_message");
400        assert_eq!(events[0].payload, "hello");
401        assert_eq!(events[1].event_type, "agent_message");
402        assert_eq!(events[1].payload, "world");
403    }
404
405    #[tokio::test]
406    async fn delete_cascades_events() {
407        let store = make_store().await;
408        store.create_acp_session("sess-1").await.unwrap();
409        store
410            .save_acp_event("sess-1", "user_message", "hello")
411            .await
412            .unwrap();
413        store.delete_acp_session_checked("sess-1").await.unwrap();
414
415        assert!(!store.acp_session_exists("sess-1").await.unwrap());
416        let events = store.load_acp_events("sess-1").await.unwrap();
417        assert!(events.is_empty());
418    }
419
420    #[tokio::test]
421    async fn load_events_empty_for_unknown() {
422        let store = make_store().await;
423        let events = store.load_acp_events("no-such").await.unwrap();
424        assert!(events.is_empty());
425    }
426
427    #[tokio::test]
428    async fn list_sessions_includes_title_and_message_count() {
429        let store = make_store().await;
430        store.create_acp_session("sess-b").await.unwrap();
431
432        // Sleep so that sess-a's events land in a different second than sess-b's
433        // created_at, making the updated_at DESC ordering deterministic.
434        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
435
436        store.create_acp_session("sess-a").await.unwrap();
437        store.save_acp_event("sess-a", "user", "hi").await.unwrap();
438        store
439            .save_acp_event("sess-a", "agent", "hello")
440            .await
441            .unwrap();
442        store
443            .update_session_title("sess-a", "My Chat")
444            .await
445            .unwrap();
446
447        let sessions = store.list_acp_sessions(100).await.unwrap();
448        // sess-a has events so updated_at is newer — should be first
449        assert_eq!(sessions[0].id, "sess-a");
450        assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
451        assert_eq!(sessions[0].message_count, 2);
452
453        // sess-b has no events
454        let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
455        assert!(b.title.is_none());
456        assert_eq!(b.message_count, 0);
457    }
458
459    #[tokio::test]
460    async fn list_sessions_respects_limit() {
461        let store = make_store().await;
462        for i in 0..5u8 {
463            store
464                .create_acp_session(&format!("sess-{i}"))
465                .await
466                .unwrap();
467        }
468        let sessions = store.list_acp_sessions(3).await.unwrap();
469        assert_eq!(sessions.len(), 3);
470    }
471
472    #[tokio::test]
473    async fn list_sessions_limit_one_boundary() {
474        let store = make_store().await;
475        for i in 0..3u8 {
476            store
477                .create_acp_session(&format!("sess-{i}"))
478                .await
479                .unwrap();
480        }
481        let sessions = store.list_acp_sessions(1).await.unwrap();
482        assert_eq!(sessions.len(), 1);
483    }
484
485    #[tokio::test]
486    async fn list_sessions_unlimited_when_zero() {
487        let store = make_store().await;
488        for i in 0..5u8 {
489            store
490                .create_acp_session(&format!("sess-{i}"))
491                .await
492                .unwrap();
493        }
494        let sessions = store.list_acp_sessions(0).await.unwrap();
495        assert_eq!(sessions.len(), 5);
496    }
497
498    #[tokio::test]
499    async fn get_acp_session_info_returns_none_for_missing() {
500        let store = make_store().await;
501        let info = store.get_acp_session_info("no-such").await.unwrap();
502        assert!(info.is_none());
503    }
504
505    #[tokio::test]
506    async fn get_acp_session_info_returns_data() {
507        let store = make_store().await;
508        store.create_acp_session("sess-x").await.unwrap();
509        store
510            .save_acp_event("sess-x", "user", "hello")
511            .await
512            .unwrap();
513        store.update_session_title("sess-x", "Test").await.unwrap();
514
515        let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
516        assert_eq!(info.id, "sess-x");
517        assert_eq!(info.title.as_deref(), Some("Test"));
518        assert_eq!(info.message_count, 1);
519    }
520
521    #[tokio::test]
522    async fn updated_at_trigger_fires_on_event_insert() {
523        let store = make_store().await;
524        store.create_acp_session("sess-t").await.unwrap();
525
526        let before = store
527            .get_acp_session_info("sess-t")
528            .await
529            .unwrap()
530            .unwrap()
531            .updated_at
532            .clone();
533
534        // Small sleep so datetime('now') differs
535        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
536
537        store
538            .save_acp_event("sess-t", "user", "ping")
539            .await
540            .unwrap();
541
542        let after = store
543            .get_acp_session_info("sess-t")
544            .await
545            .unwrap()
546            .unwrap()
547            .updated_at;
548
549        assert!(
550            after > before,
551            "updated_at should increase after event insert: before={before} after={after}"
552        );
553    }
554
555    #[tokio::test]
556    async fn create_session_with_conversation_and_retrieve() {
557        let store = make_store().await;
558        let cid = store.create_conversation().await.unwrap();
559        store
560            .create_acp_session_with_conversation("sess-1", cid)
561            .await
562            .unwrap();
563        let retrieved = store
564            .get_acp_session_conversation_id("sess-1")
565            .await
566            .unwrap();
567        assert_eq!(retrieved, Some(cid));
568    }
569
570    #[tokio::test]
571    async fn get_conversation_id_returns_none_for_legacy_session() {
572        let store = make_store().await;
573        store.create_acp_session("legacy").await.unwrap();
574        let cid = store
575            .get_acp_session_conversation_id("legacy")
576            .await
577            .unwrap();
578        assert!(cid.is_none());
579    }
580
581    #[tokio::test]
582    async fn get_conversation_id_returns_none_for_missing_session() {
583        let store = make_store().await;
584        let cid = store
585            .get_acp_session_conversation_id("no-such")
586            .await
587            .unwrap();
588        assert!(cid.is_none());
589    }
590
591    #[tokio::test]
592    async fn set_conversation_id_updates_existing_session() {
593        let store = make_store().await;
594        store.create_acp_session("sess-2").await.unwrap();
595        let cid = store.create_conversation().await.unwrap();
596        store
597            .set_acp_session_conversation_id("sess-2", cid)
598            .await
599            .unwrap();
600        let retrieved = store
601            .get_acp_session_conversation_id("sess-2")
602            .await
603            .unwrap();
604        assert_eq!(retrieved, Some(cid));
605    }
606
607    #[tokio::test]
608    async fn copy_conversation_copies_messages_in_order() {
609        use zeph_llm::provider::Role;
610        let store = make_store().await;
611        let src = store.create_conversation().await.unwrap();
612        store.save_message(src, "user", "hello").await.unwrap();
613        store.save_message(src, "assistant", "world").await.unwrap();
614
615        let dst = store.create_conversation().await.unwrap();
616        store.copy_conversation(src, dst).await.unwrap();
617
618        let msgs = store.load_history(dst, 100).await.unwrap();
619        assert_eq!(msgs.len(), 2);
620        assert_eq!(msgs[0].role, Role::User);
621        assert_eq!(msgs[0].content, "hello");
622        assert_eq!(msgs[1].role, Role::Assistant);
623        assert_eq!(msgs[1].content, "world");
624    }
625
626    #[tokio::test]
627    async fn copy_conversation_empty_source_is_noop() {
628        let store = make_store().await;
629        let src = store.create_conversation().await.unwrap();
630        let dst = store.create_conversation().await.unwrap();
631        store.copy_conversation(src, dst).await.unwrap();
632        let msgs = store.load_history(dst, 100).await.unwrap();
633        assert!(msgs.is_empty());
634    }
635
636    #[tokio::test]
637    async fn copy_conversation_does_not_copy_summaries() {
638        // Summaries are intentionally excluded because their first/last_message_id
639        // boundaries would reference source message IDs, corrupting the compaction cursor.
640        let store = make_store().await;
641        let src = store.create_conversation().await.unwrap();
642        store.save_message(src, "user", "hello").await.unwrap();
643        // Insert a summary directly so we can verify it is not copied.
644        zeph_db::query(
645            sql!("INSERT INTO summaries (conversation_id, content, first_message_id, last_message_id, token_estimate) \
646             VALUES (?, 'summary text', 1, 1, 10)"),
647        )
648        .bind(src)
649        .execute(&store.pool)
650        .await
651        .unwrap();
652
653        let dst = store.create_conversation().await.unwrap();
654        store.copy_conversation(src, dst).await.unwrap();
655
656        let count: i64 = zeph_db::query_scalar(sql!(
657            "SELECT COUNT(*) FROM summaries WHERE conversation_id = ?"
658        ))
659        .bind(dst)
660        .fetch_one(&store.pool)
661        .await
662        .unwrap();
663        assert_eq!(
664            count, 0,
665            "summaries must not be copied to forked conversation"
666        );
667    }
668
669    #[tokio::test]
670    async fn concurrent_sessions_get_distinct_conversation_ids() {
671        let store = make_store().await;
672        let cid1 = store.create_conversation().await.unwrap();
673        let cid2 = store.create_conversation().await.unwrap();
674        store
675            .create_acp_session_with_conversation("sess-a", cid1)
676            .await
677            .unwrap();
678        store
679            .create_acp_session_with_conversation("sess-b", cid2)
680            .await
681            .unwrap();
682
683        let retrieved1 = store
684            .get_acp_session_conversation_id("sess-a")
685            .await
686            .unwrap();
687        let retrieved2 = store
688            .get_acp_session_conversation_id("sess-b")
689            .await
690            .unwrap();
691
692        assert!(retrieved1.is_some());
693        assert!(retrieved2.is_some());
694        assert_ne!(
695            retrieved1, retrieved2,
696            "concurrent sessions must get distinct conversation_ids"
697        );
698    }
699}