Skip to main content

zeph_memory/store/
overflow.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use uuid::Uuid;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use crate::error::MemoryError;
9use crate::store::SqliteStore;
10
11impl SqliteStore {
12    /// Save overflow content associated with a conversation, returning the generated UUID.
13    ///
14    /// # Errors
15    ///
16    /// Returns an error if the database insert fails.
17    pub async fn save_overflow(
18        &self,
19        conversation_id: i64,
20        content: &[u8],
21    ) -> Result<String, MemoryError> {
22        let id = Uuid::new_v4().to_string();
23        let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
24        zeph_db::query(sql!(
25            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size) \
26             VALUES (?, ?, ?, ?)"
27        ))
28        .bind(&id)
29        .bind(conversation_id)
30        .bind(content)
31        .bind(byte_size)
32        .execute(&self.pool)
33        .await?;
34        Ok(id)
35    }
36
37    /// Load overflow content by UUID, scoped to the given conversation.
38    /// Returns `None` if the entry does not exist or belongs to a different conversation.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error if the database query fails.
43    pub async fn load_overflow(
44        &self,
45        id: &str,
46        conversation_id: i64,
47    ) -> Result<Option<Vec<u8>>, MemoryError> {
48        let row: Option<(Vec<u8>,)> = zeph_db::query_as(sql!(
49            "SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?"
50        ))
51        .bind(id)
52        .bind(conversation_id)
53        .fetch_optional(&self.pool)
54        .await?;
55        Ok(row.map(|(content,)| content))
56    }
57
58    /// Delete overflow entries older than `max_age_secs` seconds.
59    /// Returns the number of deleted rows.
60    ///
61    /// # Errors
62    ///
63    /// Returns an error if the database delete fails.
64    pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
65        let result = zeph_db::query(sql!(
66            "DELETE FROM tool_overflow \
67             WHERE created_at < datetime('now', printf('-%d seconds', ?))"
68        ))
69        .bind(max_age_secs.cast_signed())
70        .execute(&self.pool)
71        .await?;
72        Ok(result.rows_affected())
73    }
74
75    /// Return total overflow bytes stored for a conversation.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the database query fails.
80    pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
81        let total: Option<i64> = zeph_db::query_scalar(sql!(
82            "SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?"
83        ))
84        .bind(conversation_id)
85        .fetch_one(&self.pool)
86        .await?;
87        Ok(total.unwrap_or(0).cast_unsigned())
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::*;

    /// Retention window, in seconds, used by the cleanup tests (one day).
    const ONE_DAY_SECS: u64 = 86_400;

    /// Build an in-memory store and create one conversation in it, returning the store
    /// together with that conversation's raw id.
    async fn make_store() -> (SqliteStore, i64) {
        let store = SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new");
        let conversation = store
            .create_conversation()
            .await
            .expect("create_conversation");
        (store, conversation.0)
    }

    /// A saved payload is returned unchanged when loaded with the same conversation id.
    #[tokio::test]
    async fn save_and_load_roundtrip() {
        let (store, cid) = make_store().await;
        let payload: &[u8] = b"hello overflow world";
        let overflow_id = store.save_overflow(cid, payload).await.expect("save");
        let fetched = store.load_overflow(&overflow_id, cid).await.expect("load");
        assert_eq!(fetched.as_deref(), Some(payload));
    }

    /// Loading an id that was never stored yields `None`.
    #[tokio::test]
    async fn load_missing_returns_none() {
        let (store, cid) = make_store().await;
        let unknown_id = "00000000-0000-0000-0000-000000000000";
        let fetched = store.load_overflow(unknown_id, cid).await.expect("load");
        assert!(fetched.is_none());
    }

    /// An entry saved under one conversation is not visible through another one.
    #[tokio::test]
    async fn load_wrong_conversation_returns_none() {
        let (store, owner_cid) = make_store().await;
        let other_cid = store
            .create_conversation()
            .await
            .expect("create_conversation")
            .0;
        let overflow_id = store
            .save_overflow(owner_cid, b"secret")
            .await
            .expect("save");

        // A lookup under a different conversation id must come back empty.
        let fetched = store
            .load_overflow(&overflow_id, other_cid)
            .await
            .expect("load");
        assert!(
            fetched.is_none(),
            "overflow entry must not be accessible from a different conversation"
        );
    }

    /// A conversation without overflow entries reports a size of zero.
    #[tokio::test]
    async fn overflow_size_empty_returns_zero() {
        let (store, cid) = make_store().await;
        let total = store.overflow_size(cid).await.expect("size");
        assert_eq!(total, 0);
    }

    /// The reported size is the sum of the lengths of all saved payloads.
    #[tokio::test]
    async fn overflow_size_sums_byte_sizes() {
        let (store, cid) = make_store().await;
        for (payload, label) in [(&b"aaa"[..], "save1"), (&b"bb"[..], "save2")] {
            store.save_overflow(cid, payload).await.expect(label);
        }
        let total = store.overflow_size(cid).await.expect("size");
        assert_eq!(total, 5);
    }

    /// Deleting a conversation removes the overflow rows attached to it.
    #[tokio::test]
    async fn cascade_delete_removes_overflow() {
        let (store, cid) = make_store().await;
        let overflow_id = store.save_overflow(cid, b"data").await.expect("save");

        // Remove the parent conversation; its overflow rows are expected to cascade.
        zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
            .bind(cid)
            .execute(store.pool())
            .await
            .expect("delete conversation");

        // Count rows with this id via direct SQL to confirm the row itself is gone.
        let remaining: i64 =
            zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM tool_overflow WHERE id = ?"))
                .bind(&overflow_id)
                .fetch_one(store.pool())
                .await
                .expect("count");
        assert_eq!(remaining, 0, "overflow row should be removed by CASCADE");
    }

    /// Cleanup deletes entries older than the retention window and keeps newer ones.
    #[tokio::test]
    async fn cleanup_removes_old_entries() {
        let (store, cid) = make_store().await;

        // Hand-insert a row whose `created_at` lies two days in the past.
        let stale_id = Uuid::new_v4().to_string();
        zeph_db::query(sql!(
            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
        ))
        .bind(&stale_id)
        .bind(cid)
        .bind(&b"old data"[..])
        .bind(8_i64)
        .execute(store.pool())
        .await
        .expect("insert old row");

        // A second row written through the regular API, i.e. created just now.
        let fresh_id = store.save_overflow(cid, b"fresh").await.expect("fresh");

        let deleted = store
            .cleanup_overflow(ONE_DAY_SECS)
            .await
            .expect("cleanup");
        assert_eq!(deleted, 1, "one old row should be deleted");

        let stale = store.load_overflow(&stale_id, cid).await.expect("load old");
        assert!(stale.is_none());

        let fresh = store
            .load_overflow(&fresh_id, cid)
            .await
            .expect("load fresh");
        assert!(fresh.is_some());
    }

    /// Cleanup with a one-day retention leaves just-created entries untouched.
    #[tokio::test]
    async fn cleanup_fresh_entries_not_removed() {
        let (store, cid) = make_store().await;
        for payload in [b"a", b"b"] {
            store.save_overflow(cid, payload).await.expect("save");
        }

        let deleted = store
            .cleanup_overflow(ONE_DAY_SECS)
            .await
            .expect("cleanup");
        assert_eq!(deleted, 0);
    }
}