Skip to main content

zeph_memory/sqlite/
acp_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::error::MemoryError;
5use crate::sqlite::SqliteStore;
6
/// A single persisted event belonging to an ACP session.
///
/// Values of this type are produced from rows of the `acp_session_events`
/// table; both fields are carried as plain strings without interpretation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionEvent {
    /// Caller-defined event discriminator (for example `"user_message"`).
    pub event_type: String,
    /// Event body exactly as it was stored.
    pub payload: String,
}
11
/// Summary of a stored ACP session, as returned when listing sessions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionInfo {
    /// Session identifier (the `id` column of `acp_sessions`).
    pub id: String,
    /// Creation timestamp as text, taken verbatim from the `created_at` column.
    /// NOTE(review): the exact timestamp format is set by the schema, which is
    /// not visible in this file — confirm before parsing it.
    pub created_at: String,
}
16
17impl SqliteStore {
18    /// Create a new ACP session record.
19    ///
20    /// # Errors
21    ///
22    /// Returns an error if the database write fails.
23    pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
24        sqlx::query("INSERT OR IGNORE INTO acp_sessions (id) VALUES (?)")
25            .bind(session_id)
26            .execute(&self.pool)
27            .await?;
28        Ok(())
29    }
30
31    /// Persist a single ACP session event.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if the database write fails.
36    pub async fn save_acp_event(
37        &self,
38        session_id: &str,
39        event_type: &str,
40        payload: &str,
41    ) -> Result<(), MemoryError> {
42        sqlx::query(
43            "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
44        )
45        .bind(session_id)
46        .bind(event_type)
47        .bind(payload)
48        .execute(&self.pool)
49        .await?;
50        Ok(())
51    }
52
53    /// Load all events for an ACP session in insertion order.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the database query fails.
58    pub async fn load_acp_events(
59        &self,
60        session_id: &str,
61    ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
62        let rows = sqlx::query_as::<_, (String, String)>(
63            "SELECT event_type, payload FROM acp_session_events WHERE session_id = ? ORDER BY id",
64        )
65        .bind(session_id)
66        .fetch_all(&self.pool)
67        .await?;
68
69        Ok(rows
70            .into_iter()
71            .map(|(event_type, payload)| AcpSessionEvent {
72                event_type,
73                payload,
74            })
75            .collect())
76    }
77
78    /// Delete an ACP session and its events (cascade).
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the database write fails.
83    pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
84        sqlx::query("DELETE FROM acp_sessions WHERE id = ?")
85            .bind(session_id)
86            .execute(&self.pool)
87            .await?;
88        Ok(())
89    }
90
91    /// List all ACP sessions ordered by creation time descending.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the database query fails.
96    pub async fn list_acp_sessions(&self) -> Result<Vec<AcpSessionInfo>, MemoryError> {
97        let rows = sqlx::query_as::<_, (String, String)>(
98            "SELECT id, created_at FROM acp_sessions ORDER BY created_at DESC",
99        )
100        .fetch_all(&self.pool)
101        .await?;
102        Ok(rows
103            .into_iter()
104            .map(|(id, created_at)| AcpSessionInfo { id, created_at })
105            .collect())
106    }
107
108    /// Insert multiple events for a session inside a single transaction.
109    ///
110    /// Atomically writes all events or none. More efficient than individual inserts
111    /// for bulk import use cases.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the transaction or any insert fails.
116    pub async fn import_acp_events(
117        &self,
118        session_id: &str,
119        events: &[(&str, &str)],
120    ) -> Result<(), MemoryError> {
121        let mut tx = self.pool.begin().await?;
122        for (event_type, payload) in events {
123            sqlx::query(
124                "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
125            )
126            .bind(session_id)
127            .bind(event_type)
128            .bind(payload)
129            .execute(&mut *tx)
130            .await?;
131        }
132        tx.commit().await?;
133        Ok(())
134    }
135
136    /// Check whether an ACP session record exists.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the database query fails.
141    pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
142        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM acp_sessions WHERE id = ?")
143            .bind(session_id)
144            .fetch_one(&self.pool)
145            .await?;
146        Ok(count > 0)
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh store on an in-memory SQLite database for one test.
    async fn make_store() -> SqliteStore {
        let opened = SqliteStore::new(":memory:").await;
        opened.expect("SqliteStore::new")
    }

    /// A created session is reported as existing; an unknown id is not.
    #[tokio::test]
    async fn create_and_exists() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();

        let known = store.acp_session_exists("sess-1").await.unwrap();
        let unknown = store.acp_session_exists("sess-2").await.unwrap();
        assert!(known);
        assert!(!unknown);
    }

    /// Saved events come back with the same contents, in the order written.
    #[tokio::test]
    async fn save_and_load_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        for (event_type, payload) in [("user_message", "hello"), ("agent_message", "world")] {
            store
                .save_acp_event("sess-1", event_type, payload)
                .await
                .unwrap();
        }

        let events = store.load_acp_events("sess-1").await.unwrap();
        let observed: Vec<(&str, &str)> = events
            .iter()
            .map(|ev| (ev.event_type.as_str(), ev.payload.as_str()))
            .collect();
        assert_eq!(
            observed,
            [("user_message", "hello"), ("agent_message", "world")]
        );
    }

    /// Deleting a session removes both the session row and its events.
    #[tokio::test]
    async fn delete_cascades_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();

        store.delete_acp_session("sess-1").await.unwrap();

        let still_there = store.acp_session_exists("sess-1").await.unwrap();
        assert!(!still_there);
        let leftover = store.load_acp_events("sess-1").await.unwrap();
        assert!(leftover.is_empty());
    }

    /// Loading events for a session that was never created yields nothing.
    #[tokio::test]
    async fn load_events_empty_for_unknown() {
        let store = make_store().await;
        let loaded = store.load_acp_events("no-such").await.unwrap();
        assert!(loaded.is_empty());
    }
}