zeph_memory/store/
overflow.rs1use uuid::Uuid;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use crate::error::MemoryError;
9use crate::store::SqliteStore;
10
11impl SqliteStore {
12 pub async fn save_overflow(
18 &self,
19 conversation_id: i64,
20 content: &[u8],
21 ) -> Result<String, MemoryError> {
22 let id = Uuid::new_v4().to_string();
23 let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
24 zeph_db::query(sql!(
25 "INSERT INTO tool_overflow (id, conversation_id, content, byte_size) \
26 VALUES (?, ?, ?, ?)"
27 ))
28 .bind(&id)
29 .bind(conversation_id)
30 .bind(content)
31 .bind(byte_size)
32 .execute(&self.pool)
33 .await?;
34 Ok(id)
35 }
36
37 pub async fn load_overflow(
44 &self,
45 id: &str,
46 conversation_id: i64,
47 ) -> Result<Option<Vec<u8>>, MemoryError> {
48 let row: Option<(Vec<u8>,)> = zeph_db::query_as(sql!(
49 "SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?"
50 ))
51 .bind(id)
52 .bind(conversation_id)
53 .fetch_optional(&self.pool)
54 .await?;
55 Ok(row.map(|(content,)| content))
56 }
57
58 pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
65 let result = zeph_db::query(sql!(
66 "DELETE FROM tool_overflow \
67 WHERE created_at < datetime('now', printf('-%d seconds', ?))"
68 ))
69 .bind(max_age_secs.cast_signed())
70 .execute(&self.pool)
71 .await?;
72 Ok(result.rows_affected())
73 }
74
75 pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
81 let total: Option<i64> = zeph_db::query_scalar(sql!(
82 "SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?"
83 ))
84 .bind(conversation_id)
85 .fetch_one(&self.pool)
86 .await?;
87 Ok(total.unwrap_or(0).cast_unsigned())
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use super::*;
94
95 async fn make_store() -> (SqliteStore, i64) {
96 let store = SqliteStore::new(":memory:")
97 .await
98 .expect("SqliteStore::new");
99 let cid = store
100 .create_conversation()
101 .await
102 .expect("create_conversation");
103 (store, cid.0)
104 }
105
106 #[tokio::test]
107 async fn save_and_load_roundtrip() {
108 let (store, cid) = make_store().await;
109 let content = b"hello overflow world";
110 let id = store.save_overflow(cid, content).await.expect("save");
111 let loaded = store.load_overflow(&id, cid).await.expect("load");
112 assert_eq!(loaded, Some(content.to_vec()));
113 }
114
115 #[tokio::test]
116 async fn load_missing_returns_none() {
117 let (store, cid) = make_store().await;
118 let loaded = store
119 .load_overflow("00000000-0000-0000-0000-000000000000", cid)
120 .await
121 .expect("load");
122 assert!(loaded.is_none());
123 }
124
125 #[tokio::test]
126 async fn load_wrong_conversation_returns_none() {
127 let (store, cid1) = make_store().await;
128 let cid2 = store
129 .create_conversation()
130 .await
131 .expect("create_conversation")
132 .0;
133 let id = store.save_overflow(cid1, b"secret").await.expect("save");
134 let loaded = store.load_overflow(&id, cid2).await.expect("load");
136 assert!(
137 loaded.is_none(),
138 "overflow entry must not be accessible from a different conversation"
139 );
140 }
141
142 #[tokio::test]
143 async fn overflow_size_empty_returns_zero() {
144 let (store, cid) = make_store().await;
145 let size = store.overflow_size(cid).await.expect("size");
146 assert_eq!(size, 0);
147 }
148
149 #[tokio::test]
150 async fn overflow_size_sums_byte_sizes() {
151 let (store, cid) = make_store().await;
152 store.save_overflow(cid, b"aaa").await.expect("save1");
153 store.save_overflow(cid, b"bb").await.expect("save2");
154 let size = store.overflow_size(cid).await.expect("size");
155 assert_eq!(size, 5);
156 }
157
158 #[tokio::test]
159 async fn cascade_delete_removes_overflow() {
160 let (store, cid) = make_store().await;
161 let id = store.save_overflow(cid, b"data").await.expect("save");
162 zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
164 .bind(cid)
165 .execute(store.pool())
166 .await
167 .expect("delete conversation");
168 let count: i64 =
171 zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM tool_overflow WHERE id = ?"))
172 .bind(&id)
173 .fetch_one(store.pool())
174 .await
175 .expect("count");
176 assert_eq!(count, 0, "overflow row should be removed by CASCADE");
177 }
178
179 #[tokio::test]
180 async fn cleanup_removes_old_entries() {
181 let (store, cid) = make_store().await;
182 let id = Uuid::new_v4().to_string();
184 zeph_db::query(sql!(
185 "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
186 VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
187 ))
188 .bind(&id)
189 .bind(cid)
190 .bind(b"old data".as_slice())
191 .bind(8i64)
192 .execute(store.pool())
193 .await
194 .expect("insert old row");
195
196 let fresh_id = store.save_overflow(cid, b"fresh").await.expect("fresh");
198
199 let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
200 assert_eq!(deleted, 1, "one old row should be deleted");
201
202 assert!(
203 store
204 .load_overflow(&id, cid)
205 .await
206 .expect("load old")
207 .is_none()
208 );
209 assert!(
210 store
211 .load_overflow(&fresh_id, cid)
212 .await
213 .expect("load fresh")
214 .is_some()
215 );
216 }
217
218 #[tokio::test]
219 async fn cleanup_fresh_entries_not_removed() {
220 let (store, cid) = make_store().await;
221 store.save_overflow(cid, b"a").await.expect("save");
222 store.save_overflow(cid, b"b").await.expect("save");
223 let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
225 assert_eq!(deleted, 0);
226 }
227}