1use uuid::Uuid;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use crate::error::MemoryError;
9use crate::store::SqliteStore;
10
11impl SqliteStore {
12 pub async fn save_overflow(
18 &self,
19 conversation_id: i64,
20 content: &[u8],
21 ) -> Result<String, MemoryError> {
22 let id = Uuid::new_v4().to_string();
23 let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
24 zeph_db::query(sql!(
25 "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, archive_type) \
26 VALUES (?, ?, ?, ?, 'overflow')"
27 ))
28 .bind(&id)
29 .bind(conversation_id)
30 .bind(content)
31 .bind(byte_size)
32 .execute(&self.pool)
33 .await?;
34 Ok(id)
35 }
36
37 pub async fn save_archive(
46 &self,
47 conversation_id: i64,
48 content: &[u8],
49 ) -> Result<String, MemoryError> {
50 let id = Uuid::new_v4().to_string();
51 let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
52 zeph_db::query(sql!(
53 "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, archive_type) \
54 VALUES (?, ?, ?, ?, 'archive')"
55 ))
56 .bind(&id)
57 .bind(conversation_id)
58 .bind(content)
59 .bind(byte_size)
60 .execute(&self.pool)
61 .await?;
62 Ok(id)
63 }
64
65 pub async fn load_overflow(
72 &self,
73 id: &str,
74 conversation_id: i64,
75 ) -> Result<Option<Vec<u8>>, MemoryError> {
76 let row: Option<(Vec<u8>,)> = zeph_db::query_as(sql!(
77 "SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?"
78 ))
79 .bind(id)
80 .bind(conversation_id)
81 .fetch_optional(&self.pool)
82 .await?;
83 Ok(row.map(|(content,)| content))
84 }
85
86 pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
96 let result = zeph_db::query(sql!(
97 "DELETE FROM tool_overflow \
98 WHERE archive_type = 'overflow' \
99 AND created_at < datetime('now', printf('-%d seconds', ?))"
100 ))
101 .bind(max_age_secs.cast_signed())
102 .execute(&self.pool)
103 .await?;
104 Ok(result.rows_affected())
105 }
106
107 pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
113 let total: Option<i64> = zeph_db::query_scalar(sql!(
114 "SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?"
115 ))
116 .bind(conversation_id)
117 .fetch_one(&self.pool)
118 .await?;
119 Ok(total.unwrap_or(0).cast_unsigned())
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 async fn make_store() -> (SqliteStore, i64) {
128 let store = SqliteStore::new(":memory:")
129 .await
130 .expect("SqliteStore::new");
131 let cid = store
132 .create_conversation()
133 .await
134 .expect("create_conversation");
135 (store, cid.0)
136 }
137
138 #[tokio::test]
139 async fn save_and_load_roundtrip() {
140 let (store, cid) = make_store().await;
141 let content = b"hello overflow world";
142 let id = store.save_overflow(cid, content).await.expect("save");
143 let loaded = store.load_overflow(&id, cid).await.expect("load");
144 assert_eq!(loaded, Some(content.to_vec()));
145 }
146
147 #[tokio::test]
148 async fn load_missing_returns_none() {
149 let (store, cid) = make_store().await;
150 let loaded = store
151 .load_overflow("00000000-0000-0000-0000-000000000000", cid)
152 .await
153 .expect("load");
154 assert!(loaded.is_none());
155 }
156
157 #[tokio::test]
158 async fn load_wrong_conversation_returns_none() {
159 let (store, cid1) = make_store().await;
160 let cid2 = store
161 .create_conversation()
162 .await
163 .expect("create_conversation")
164 .0;
165 let id = store.save_overflow(cid1, b"secret").await.expect("save");
166 let loaded = store.load_overflow(&id, cid2).await.expect("load");
168 assert!(
169 loaded.is_none(),
170 "overflow entry must not be accessible from a different conversation"
171 );
172 }
173
174 #[tokio::test]
175 async fn overflow_size_empty_returns_zero() {
176 let (store, cid) = make_store().await;
177 let size = store.overflow_size(cid).await.expect("size");
178 assert_eq!(size, 0);
179 }
180
181 #[tokio::test]
182 async fn overflow_size_sums_byte_sizes() {
183 let (store, cid) = make_store().await;
184 store.save_overflow(cid, b"aaa").await.expect("save1");
185 store.save_overflow(cid, b"bb").await.expect("save2");
186 let size = store.overflow_size(cid).await.expect("size");
187 assert_eq!(size, 5);
188 }
189
190 #[tokio::test]
191 async fn cascade_delete_removes_overflow() {
192 let (store, cid) = make_store().await;
193 let id = store.save_overflow(cid, b"data").await.expect("save");
194 zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
196 .bind(cid)
197 .execute(store.pool())
198 .await
199 .expect("delete conversation");
200 let count: i64 =
203 zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM tool_overflow WHERE id = ?"))
204 .bind(&id)
205 .fetch_one(store.pool())
206 .await
207 .expect("count");
208 assert_eq!(count, 0, "overflow row should be removed by CASCADE");
209 }
210
211 #[tokio::test]
212 async fn cleanup_removes_old_entries() {
213 let (store, cid) = make_store().await;
214 let id = Uuid::new_v4().to_string();
216 zeph_db::query(sql!(
217 "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
218 VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
219 ))
220 .bind(&id)
221 .bind(cid)
222 .bind(b"old data".as_slice())
223 .bind(8i64)
224 .execute(store.pool())
225 .await
226 .expect("insert old row");
227
228 let fresh_id = store.save_overflow(cid, b"fresh").await.expect("fresh");
230
231 let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
232 assert_eq!(deleted, 1, "one old row should be deleted");
233
234 assert!(
235 store
236 .load_overflow(&id, cid)
237 .await
238 .expect("load old")
239 .is_none()
240 );
241 assert!(
242 store
243 .load_overflow(&fresh_id, cid)
244 .await
245 .expect("load fresh")
246 .is_some()
247 );
248 }
249
250 #[tokio::test]
251 async fn cleanup_fresh_entries_not_removed() {
252 let (store, cid) = make_store().await;
253 store.save_overflow(cid, b"a").await.expect("save");
254 store.save_overflow(cid, b"b").await.expect("save");
255 let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
257 assert_eq!(deleted, 0);
258 }
259
260 #[tokio::test]
261 async fn save_archive_and_load_roundtrip() {
262 let (store, cid) = make_store().await;
263 let content = b"archived tool output body";
264 let id = store
265 .save_archive(cid, content)
266 .await
267 .expect("save_archive");
268 let loaded = store.load_overflow(&id, cid).await.expect("load");
270 assert_eq!(loaded, Some(content.to_vec()));
271 }
272
273 #[tokio::test]
274 async fn cleanup_does_not_remove_old_archives() {
275 let (store, cid) = make_store().await;
276 let archive_id = Uuid::new_v4().to_string();
278 zeph_db::query(sql!(
279 "INSERT INTO tool_overflow \
280 (id, conversation_id, content, byte_size, archive_type, created_at) \
281 VALUES (?, ?, ?, ?, 'archive', datetime('now', '-30 days'))"
282 ))
283 .bind(&archive_id)
284 .bind(cid)
285 .bind(b"old archive".as_slice())
286 .bind(11i64)
287 .execute(store.pool())
288 .await
289 .expect("insert old archive");
290
291 let overflow_id = Uuid::new_v4().to_string();
293 zeph_db::query(sql!(
294 "INSERT INTO tool_overflow \
295 (id, conversation_id, content, byte_size, archive_type, created_at) \
296 VALUES (?, ?, ?, ?, 'overflow', datetime('now', '-30 days'))"
297 ))
298 .bind(&overflow_id)
299 .bind(cid)
300 .bind(b"old overflow".as_slice())
301 .bind(12i64)
302 .execute(store.pool())
303 .await
304 .expect("insert old overflow");
305
306 let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
307 assert_eq!(deleted, 1, "only the overflow-type row should be deleted");
308
309 assert!(
311 store
312 .load_overflow(&archive_id, cid)
313 .await
314 .expect("load archive")
315 .is_some(),
316 "archive must not be removed by cleanup"
317 );
318 assert!(
320 store
321 .load_overflow(&overflow_id, cid)
322 .await
323 .expect("load overflow")
324 .is_none(),
325 "old overflow must be removed by cleanup"
326 );
327 }
328}