Skip to main content

zeph_memory/sqlite/
acp_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::error::MemoryError;
5use crate::sqlite::SqliteStore;
6
/// A single persisted event belonging to an ACP session.
///
/// Instances are produced by `SqliteStore::load_acp_events`, which fills each
/// field from the column of the same name in `acp_session_events`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionEvent {
    /// Caller-defined event kind, stored and returned verbatim
    /// (for example `"user_message"`).
    pub event_type: String,
    /// Event body, stored and returned verbatim.
    pub payload: String,
    /// Creation timestamp as text, exactly as stored by the database.
    // NOTE(review): presumably produced by SQLite `datetime('now')` — confirm against the schema.
    pub created_at: String,
}
12
/// Summary metadata for one ACP session.
///
/// Instances are produced by `SqliteStore::list_acp_sessions` and
/// `SqliteStore::get_acp_session_info`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpSessionInfo {
    /// Session identifier (the `id` column of `acp_sessions`).
    pub id: String,
    /// Session title, or `None` when the stored title is SQL `NULL`.
    pub title: Option<String>,
    /// Creation timestamp as text, exactly as stored by the database.
    pub created_at: String,
    /// Last-activity timestamp as text, exactly as stored by the database.
    pub updated_at: String,
    /// Number of rows in `acp_session_events` that reference this session.
    pub message_count: i64,
}
20
21impl SqliteStore {
22    /// Create a new ACP session record.
23    ///
24    /// # Errors
25    ///
26    /// Returns an error if the database write fails.
27    pub async fn create_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
28        sqlx::query("INSERT OR IGNORE INTO acp_sessions (id) VALUES (?)")
29            .bind(session_id)
30            .execute(&self.pool)
31            .await?;
32        Ok(())
33    }
34
35    /// Persist a single ACP session event.
36    ///
37    /// # Errors
38    ///
39    /// Returns an error if the database write fails.
40    pub async fn save_acp_event(
41        &self,
42        session_id: &str,
43        event_type: &str,
44        payload: &str,
45    ) -> Result<(), MemoryError> {
46        sqlx::query(
47            "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
48        )
49        .bind(session_id)
50        .bind(event_type)
51        .bind(payload)
52        .execute(&self.pool)
53        .await?;
54        Ok(())
55    }
56
57    /// Load all events for an ACP session in insertion order.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the database query fails.
62    pub async fn load_acp_events(
63        &self,
64        session_id: &str,
65    ) -> Result<Vec<AcpSessionEvent>, MemoryError> {
66        let rows = sqlx::query_as::<_, (String, String, String)>(
67            "SELECT event_type, payload, created_at FROM acp_session_events WHERE session_id = ? ORDER BY id",
68        )
69        .bind(session_id)
70        .fetch_all(&self.pool)
71        .await?;
72
73        Ok(rows
74            .into_iter()
75            .map(|(event_type, payload, created_at)| AcpSessionEvent {
76                event_type,
77                payload,
78                created_at,
79            })
80            .collect())
81    }
82
83    /// Delete an ACP session and its events (cascade).
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the database write fails.
88    pub async fn delete_acp_session(&self, session_id: &str) -> Result<(), MemoryError> {
89        sqlx::query("DELETE FROM acp_sessions WHERE id = ?")
90            .bind(session_id)
91            .execute(&self.pool)
92            .await?;
93        Ok(())
94    }
95
96    /// List ACP sessions ordered by last activity descending.
97    ///
98    /// Includes title, `updated_at`, and message count per session.
99    /// Pass `limit = 0` for unlimited results.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the database query fails.
104    pub async fn list_acp_sessions(
105        &self,
106        limit: usize,
107    ) -> Result<Vec<AcpSessionInfo>, MemoryError> {
108        // LIMIT -1 in SQLite means no limit; cast limit=0 sentinel to -1.
109        #[allow(clippy::cast_possible_wrap)]
110        let sql_limit: i64 = if limit == 0 { -1 } else { limit as i64 };
111        let rows = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
112            "SELECT s.id, s.title, s.created_at, s.updated_at, \
113             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
114             FROM acp_sessions s \
115             ORDER BY s.updated_at DESC \
116             LIMIT ?",
117        )
118        .bind(sql_limit)
119        .fetch_all(&self.pool)
120        .await?;
121
122        Ok(rows
123            .into_iter()
124            .map(
125                |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
126                    id,
127                    title,
128                    created_at,
129                    updated_at,
130                    message_count,
131                },
132            )
133            .collect())
134    }
135
136    /// Fetch metadata for a single ACP session.
137    ///
138    /// Returns `None` if the session does not exist.
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if the database query fails.
143    pub async fn get_acp_session_info(
144        &self,
145        session_id: &str,
146    ) -> Result<Option<AcpSessionInfo>, MemoryError> {
147        let row = sqlx::query_as::<_, (String, Option<String>, String, String, i64)>(
148            "SELECT s.id, s.title, s.created_at, s.updated_at, \
149             (SELECT COUNT(*) FROM acp_session_events WHERE session_id = s.id) AS message_count \
150             FROM acp_sessions s \
151             WHERE s.id = ?",
152        )
153        .bind(session_id)
154        .fetch_optional(&self.pool)
155        .await?;
156
157        Ok(row.map(
158            |(id, title, created_at, updated_at, message_count)| AcpSessionInfo {
159                id,
160                title,
161                created_at,
162                updated_at,
163                message_count,
164            },
165        ))
166    }
167
168    /// Insert multiple events for a session inside a single transaction.
169    ///
170    /// Atomically writes all events or none. More efficient than individual inserts
171    /// for bulk import use cases.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the transaction or any insert fails.
176    pub async fn import_acp_events(
177        &self,
178        session_id: &str,
179        events: &[(&str, &str)],
180    ) -> Result<(), MemoryError> {
181        let mut tx = self.pool.begin().await?;
182        for (event_type, payload) in events {
183            sqlx::query(
184                "INSERT INTO acp_session_events (session_id, event_type, payload) VALUES (?, ?, ?)",
185            )
186            .bind(session_id)
187            .bind(event_type)
188            .bind(payload)
189            .execute(&mut *tx)
190            .await?;
191        }
192        tx.commit().await?;
193        Ok(())
194    }
195
196    /// Update the title of an ACP session.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the database write fails.
201    pub async fn update_session_title(
202        &self,
203        session_id: &str,
204        title: &str,
205    ) -> Result<(), MemoryError> {
206        sqlx::query("UPDATE acp_sessions SET title = ? WHERE id = ?")
207            .bind(title)
208            .bind(session_id)
209            .execute(&self.pool)
210            .await?;
211        Ok(())
212    }
213
214    /// Check whether an ACP session record exists.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the database query fails.
219    pub async fn acp_session_exists(&self, session_id: &str) -> Result<bool, MemoryError> {
220        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM acp_sessions WHERE id = ?")
221            .bind(session_id)
222            .fetch_one(&self.pool)
223            .await?;
224        Ok(count > 0)
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Pause long enough for the database timestamp to change between writes.
    ///
    /// The ordering and trigger tests compare timestamps that are assumed to have
    /// one-second resolution, so the pause must exceed one second.
    const TIMESTAMP_TICK: std::time::Duration = std::time::Duration::from_millis(1100);

    /// Build a fresh in-memory store for one test.
    async fn make_store() -> SqliteStore {
        SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new")
    }

    /// Create `count` sessions named `sess-0`, `sess-1`, ... in `store`.
    async fn seed_sessions(store: &SqliteStore, count: u8) {
        for i in 0..count {
            store
                .create_acp_session(&format!("sess-{i}"))
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn create_and_exists() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        assert!(store.acp_session_exists("sess-1").await.unwrap());
        assert!(!store.acp_session_exists("sess-2").await.unwrap());
    }

    #[tokio::test]
    async fn save_and_load_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();
        store
            .save_acp_event("sess-1", "agent_message", "world")
            .await
            .unwrap();

        let events = store.load_acp_events("sess-1").await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_type, "user_message");
        assert_eq!(events[0].payload, "hello");
        assert_eq!(events[1].event_type, "agent_message");
        assert_eq!(events[1].payload, "world");
    }

    #[tokio::test]
    async fn delete_cascades_events() {
        let store = make_store().await;
        store.create_acp_session("sess-1").await.unwrap();
        store
            .save_acp_event("sess-1", "user_message", "hello")
            .await
            .unwrap();
        store.delete_acp_session("sess-1").await.unwrap();

        assert!(!store.acp_session_exists("sess-1").await.unwrap());
        let events = store.load_acp_events("sess-1").await.unwrap();
        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn load_events_empty_for_unknown() {
        let store = make_store().await;
        let events = store.load_acp_events("no-such").await.unwrap();
        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn list_sessions_includes_title_and_message_count() {
        let store = make_store().await;
        store.create_acp_session("sess-b").await.unwrap();

        // Sleep so that sess-a's events land in a different second than sess-b's
        // created_at, making the updated_at DESC ordering deterministic.
        tokio::time::sleep(TIMESTAMP_TICK).await;

        store.create_acp_session("sess-a").await.unwrap();
        store.save_acp_event("sess-a", "user", "hi").await.unwrap();
        store
            .save_acp_event("sess-a", "agent", "hello")
            .await
            .unwrap();
        store
            .update_session_title("sess-a", "My Chat")
            .await
            .unwrap();

        let sessions = store.list_acp_sessions(100).await.unwrap();
        // Check the length first so a failure reads clearly instead of panicking
        // on the index below.
        assert_eq!(sessions.len(), 2);

        // sess-a has events so updated_at is newer — should be first
        assert_eq!(sessions[0].id, "sess-a");
        assert_eq!(sessions[0].title.as_deref(), Some("My Chat"));
        assert_eq!(sessions[0].message_count, 2);

        // sess-b has no events
        let b = sessions.iter().find(|s| s.id == "sess-b").unwrap();
        assert!(b.title.is_none());
        assert_eq!(b.message_count, 0);
    }

    #[tokio::test]
    async fn list_sessions_respects_limit() {
        let store = make_store().await;
        seed_sessions(&store, 5).await;
        let sessions = store.list_acp_sessions(3).await.unwrap();
        assert_eq!(sessions.len(), 3);
    }

    #[tokio::test]
    async fn list_sessions_limit_one_boundary() {
        let store = make_store().await;
        seed_sessions(&store, 3).await;
        let sessions = store.list_acp_sessions(1).await.unwrap();
        assert_eq!(sessions.len(), 1);
    }

    #[tokio::test]
    async fn list_sessions_unlimited_when_zero() {
        let store = make_store().await;
        seed_sessions(&store, 5).await;
        let sessions = store.list_acp_sessions(0).await.unwrap();
        assert_eq!(sessions.len(), 5);
    }

    #[tokio::test]
    async fn get_acp_session_info_returns_none_for_missing() {
        let store = make_store().await;
        let info = store.get_acp_session_info("no-such").await.unwrap();
        assert!(info.is_none());
    }

    #[tokio::test]
    async fn get_acp_session_info_returns_data() {
        let store = make_store().await;
        store.create_acp_session("sess-x").await.unwrap();
        store
            .save_acp_event("sess-x", "user", "hello")
            .await
            .unwrap();
        store.update_session_title("sess-x", "Test").await.unwrap();

        let info = store.get_acp_session_info("sess-x").await.unwrap().unwrap();
        assert_eq!(info.id, "sess-x");
        assert_eq!(info.title.as_deref(), Some("Test"));
        assert_eq!(info.message_count, 1);
    }

    #[tokio::test]
    async fn updated_at_trigger_fires_on_event_insert() {
        let store = make_store().await;
        store.create_acp_session("sess-t").await.unwrap();

        // The info struct is an owned temporary, so the field can be moved out
        // directly; no clone is needed.
        let before = store
            .get_acp_session_info("sess-t")
            .await
            .unwrap()
            .unwrap()
            .updated_at;

        // Small sleep so datetime('now') differs
        tokio::time::sleep(TIMESTAMP_TICK).await;

        store
            .save_acp_event("sess-t", "user", "ping")
            .await
            .unwrap();

        let after = store
            .get_acp_session_info("sess-t")
            .await
            .unwrap()
            .unwrap()
            .updated_at;

        // String comparison: assumes the stored timestamp text sorts
        // chronologically — TODO confirm against the schema.
        assert!(
            after > before,
            "updated_at should increase after event insert: before={before} after={after}"
        );
    }
}