// zeph_memory/store/overflow.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use uuid::Uuid;
#[allow(unused_imports)]
use zeph_db::sql;

use crate::error::MemoryError;
use crate::store::SqliteStore;

11impl SqliteStore {
12    /// Save overflow content associated with a conversation, returning the generated UUID.
13    ///
14    /// # Errors
15    ///
16    /// Returns an error if the database insert fails.
17    pub async fn save_overflow(
18        &self,
19        conversation_id: i64,
20        content: &[u8],
21    ) -> Result<String, MemoryError> {
22        let id = Uuid::new_v4().to_string();
23        let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
24        zeph_db::query(sql!(
25            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, archive_type) \
26             VALUES (?, ?, ?, ?, 'overflow')"
27        ))
28        .bind(&id)
29        .bind(conversation_id)
30        .bind(content)
31        .bind(byte_size)
32        .execute(&self.pool)
33        .await?;
34        Ok(id)
35    }
36
37    /// Save a Memex compaction-time archive, returning the generated UUID.
38    ///
39    /// Archives use `archive_type = 'archive'` and are excluded from the short-lived
40    /// `cleanup_overflow()` job. They persist as long as the conversation exists.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if the database insert fails.
45    pub async fn save_archive(
46        &self,
47        conversation_id: i64,
48        content: &[u8],
49    ) -> Result<String, MemoryError> {
50        let id = Uuid::new_v4().to_string();
51        let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
52        zeph_db::query(sql!(
53            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, archive_type) \
54             VALUES (?, ?, ?, ?, 'archive')"
55        ))
56        .bind(&id)
57        .bind(conversation_id)
58        .bind(content)
59        .bind(byte_size)
60        .execute(&self.pool)
61        .await?;
62        Ok(id)
63    }
64
65    /// Load overflow content by UUID, scoped to the given conversation.
66    /// Returns `None` if the entry does not exist or belongs to a different conversation.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the database query fails.
71    pub async fn load_overflow(
72        &self,
73        id: &str,
74        conversation_id: i64,
75    ) -> Result<Option<Vec<u8>>, MemoryError> {
76        let row: Option<(Vec<u8>,)> = zeph_db::query_as(sql!(
77            "SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?"
78        ))
79        .bind(id)
80        .bind(conversation_id)
81        .fetch_optional(&self.pool)
82        .await?;
83        Ok(row.map(|(content,)| content))
84    }
85
86    /// Delete execution-time overflow entries (`archive_type = 'overflow'`) older than
87    /// `max_age_secs` seconds. Compaction-time archives (`archive_type = 'archive'`) are
88    /// intentionally excluded — they persist for the lifetime of the conversation.
89    ///
90    /// Returns the number of deleted rows.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the database delete fails.
95    pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
96        let result = zeph_db::query(sql!(
97            "DELETE FROM tool_overflow \
98             WHERE archive_type = 'overflow' \
99             AND created_at < datetime('now', printf('-%d seconds', ?))"
100        ))
101        .bind(max_age_secs.cast_signed())
102        .execute(&self.pool)
103        .await?;
104        Ok(result.rows_affected())
105    }
106
107    /// Return total overflow bytes stored for a conversation.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if the database query fails.
112    pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
113        let total: Option<i64> = zeph_db::query_scalar(sql!(
114            "SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?"
115        ))
116        .bind(conversation_id)
117        .fetch_one(&self.pool)
118        .await?;
119        Ok(total.unwrap_or(0).cast_unsigned())
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an in-memory store together with a freshly created conversation id.
    async fn fixture() -> (SqliteStore, i64) {
        let store = SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new");
        let conv = store
            .create_conversation()
            .await
            .expect("create_conversation");
        (store, conv.0)
    }

    #[tokio::test]
    async fn save_and_load_roundtrip() {
        let (store, conv) = fixture().await;
        let payload: &[u8] = b"hello overflow world";
        let key = store.save_overflow(conv, payload).await.expect("save");
        let got = store.load_overflow(&key, conv).await.expect("load");
        assert_eq!(got, Some(payload.to_vec()));
    }

    #[tokio::test]
    async fn load_missing_returns_none() {
        let (store, conv) = fixture().await;
        let nil_id = "00000000-0000-0000-0000-000000000000";
        let got = store.load_overflow(nil_id, conv).await.expect("load");
        assert!(got.is_none());
    }

    #[tokio::test]
    async fn load_wrong_conversation_returns_none() {
        let (store, owner) = fixture().await;
        let other = store
            .create_conversation()
            .await
            .expect("create_conversation")
            .0;
        let key = store.save_overflow(owner, b"secret").await.expect("save");
        // Entries are scoped to their owning conversation, UUID alone is not enough.
        let got = store.load_overflow(&key, other).await.expect("load");
        assert!(
            got.is_none(),
            "overflow entry must not be accessible from a different conversation"
        );
    }

    #[tokio::test]
    async fn overflow_size_empty_returns_zero() {
        let (store, conv) = fixture().await;
        assert_eq!(store.overflow_size(conv).await.expect("size"), 0);
    }

    #[tokio::test]
    async fn overflow_size_sums_byte_sizes() {
        let (store, conv) = fixture().await;
        for (payload, label) in [(b"aaa".as_slice(), "save1"), (b"bb".as_slice(), "save2")] {
            store.save_overflow(conv, payload).await.expect(label);
        }
        assert_eq!(store.overflow_size(conv).await.expect("size"), 5);
    }

    #[tokio::test]
    async fn cascade_delete_removes_overflow() {
        let (store, conv) = fixture().await;
        let key = store.save_overflow(conv, b"data").await.expect("save");
        // Removing the parent conversation should cascade to tool_overflow.
        zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
            .bind(conv)
            .execute(store.pool())
            .await
            .expect("delete conversation");
        // Confirm directly in SQL that the child row disappeared.
        let remaining: i64 =
            zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM tool_overflow WHERE id = ?"))
                .bind(&key)
                .fetch_one(store.pool())
                .await
                .expect("count");
        assert_eq!(remaining, 0, "overflow row should be removed by CASCADE");
    }

    #[tokio::test]
    async fn cleanup_removes_old_entries() {
        let (store, conv) = fixture().await;

        // Seed one row whose created_at lies two days in the past.
        let stale_id = Uuid::new_v4().to_string();
        zeph_db::query(sql!(
            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
        ))
        .bind(&stale_id)
        .bind(conv)
        .bind(b"old data".as_slice())
        .bind(8i64)
        .execute(store.pool())
        .await
        .expect("insert old row");

        // And one row written just now.
        let fresh_id = store.save_overflow(conv, b"fresh").await.expect("fresh");

        let removed = store.cleanup_overflow(86400).await.expect("cleanup");
        assert_eq!(removed, 1, "one old row should be deleted");

        let stale = store.load_overflow(&stale_id, conv).await.expect("load old");
        assert!(stale.is_none());
        let fresh = store
            .load_overflow(&fresh_id, conv)
            .await
            .expect("load fresh");
        assert!(fresh.is_some());
    }

    #[tokio::test]
    async fn cleanup_fresh_entries_not_removed() {
        let (store, conv) = fixture().await;
        for payload in [b"a".as_slice(), b"b".as_slice()] {
            store.save_overflow(conv, payload).await.expect("save");
        }
        // A one-day retention window must leave just-written rows untouched.
        assert_eq!(store.cleanup_overflow(86400).await.expect("cleanup"), 0);
    }

    #[tokio::test]
    async fn save_archive_and_load_roundtrip() {
        let (store, conv) = fixture().await;
        let payload: &[u8] = b"archived tool output body";
        let key = store
            .save_archive(conv, payload)
            .await
            .expect("save_archive");
        // Archives live in the same table, so load_overflow retrieves them too.
        let got = store.load_overflow(&key, conv).await.expect("load");
        assert_eq!(got, Some(payload.to_vec()));
    }

    #[tokio::test]
    async fn cleanup_does_not_remove_old_archives() {
        let (store, conv) = fixture().await;

        // A 30-day-old archive row: cleanup must never touch it.
        let archive_key = Uuid::new_v4().to_string();
        zeph_db::query(sql!(
            "INSERT INTO tool_overflow \
             (id, conversation_id, content, byte_size, archive_type, created_at) \
             VALUES (?, ?, ?, ?, 'archive', datetime('now', '-30 days'))"
        ))
        .bind(&archive_key)
        .bind(conv)
        .bind(b"old archive".as_slice())
        .bind(11i64)
        .execute(store.pool())
        .await
        .expect("insert old archive");

        // A 30-day-old overflow row: cleanup must delete it.
        let overflow_key = Uuid::new_v4().to_string();
        zeph_db::query(sql!(
            "INSERT INTO tool_overflow \
             (id, conversation_id, content, byte_size, archive_type, created_at) \
             VALUES (?, ?, ?, ?, 'overflow', datetime('now', '-30 days'))"
        ))
        .bind(&overflow_key)
        .bind(conv)
        .bind(b"old overflow".as_slice())
        .bind(12i64)
        .execute(store.pool())
        .await
        .expect("insert old overflow");

        let removed = store.cleanup_overflow(86400).await.expect("cleanup");
        assert_eq!(removed, 1, "only the overflow-type row should be deleted");

        let archived = store
            .load_overflow(&archive_key, conv)
            .await
            .expect("load archive");
        assert!(archived.is_some(), "archive must not be removed by cleanup");

        let overflowed = store
            .load_overflow(&overflow_key, conv)
            .await
            .expect("load overflow");
        assert!(overflowed.is_none(), "old overflow must be removed by cleanup");
    }
}