zeph_memory/sqlite/
overflow.rs1use uuid::Uuid;
5
6use crate::error::MemoryError;
7use crate::sqlite::SqliteStore;
8
9impl SqliteStore {
10 pub async fn save_overflow(
16 &self,
17 conversation_id: i64,
18 content: &[u8],
19 ) -> Result<String, MemoryError> {
20 let id = Uuid::new_v4().to_string();
21 let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
22 sqlx::query(
23 "INSERT INTO tool_overflow (id, conversation_id, content, byte_size) \
24 VALUES (?, ?, ?, ?)",
25 )
26 .bind(&id)
27 .bind(conversation_id)
28 .bind(content)
29 .bind(byte_size)
30 .execute(&self.pool)
31 .await?;
32 Ok(id)
33 }
34
35 pub async fn load_overflow(
42 &self,
43 id: &str,
44 conversation_id: i64,
45 ) -> Result<Option<Vec<u8>>, MemoryError> {
46 let row: Option<(Vec<u8>,)> = sqlx::query_as(
47 "SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?",
48 )
49 .bind(id)
50 .bind(conversation_id)
51 .fetch_optional(&self.pool)
52 .await?;
53 Ok(row.map(|(content,)| content))
54 }
55
56 pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
63 let result = sqlx::query(
64 "DELETE FROM tool_overflow \
65 WHERE created_at < datetime('now', printf('-%d seconds', ?))",
66 )
67 .bind(max_age_secs.cast_signed())
68 .execute(&self.pool)
69 .await?;
70 Ok(result.rows_affected())
71 }
72
73 pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
79 let total: Option<i64> = sqlx::query_scalar(
80 "SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?",
81 )
82 .bind(conversation_id)
83 .fetch_one(&self.pool)
84 .await?;
85 Ok(total.unwrap_or(0).cast_unsigned())
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a store on `:memory:` and creates one conversation in it.
    ///
    /// Returns the store together with the conversation's raw `i64` id.
    async fn make_store() -> (SqliteStore, i64) {
        let store = SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new");
        let conversation = store
            .create_conversation()
            .await
            .expect("create_conversation");
        (store, conversation.0)
    }

    /// A saved payload is returned unchanged under the id handed out by `save_overflow`.
    #[tokio::test]
    async fn save_and_load_roundtrip() {
        let (store, cid) = make_store().await;
        let payload = b"hello overflow world";

        let overflow_id = store.save_overflow(cid, payload).await.expect("save");
        let fetched = store.load_overflow(&overflow_id, cid).await.expect("load");

        assert_eq!(fetched, Some(payload.to_vec()));
    }

    /// Looking up an id that was never stored yields `None` rather than an error.
    #[tokio::test]
    async fn load_missing_returns_none() {
        let (store, cid) = make_store().await;
        let unknown_id = "00000000-0000-0000-0000-000000000000";

        let fetched = store.load_overflow(unknown_id, cid).await.expect("load");

        assert!(fetched.is_none());
    }

    /// An entry saved under one conversation cannot be read through another one.
    #[tokio::test]
    async fn load_wrong_conversation_returns_none() {
        let (store, owner_cid) = make_store().await;
        let other = store
            .create_conversation()
            .await
            .expect("create_conversation");
        let other_cid = other.0;

        let overflow_id = store
            .save_overflow(owner_cid, b"secret")
            .await
            .expect("save");
        let fetched = store
            .load_overflow(&overflow_id, other_cid)
            .await
            .expect("load");

        assert!(
            fetched.is_none(),
            "overflow entry must not be accessible from a different conversation"
        );
    }

    /// A conversation without overflow rows reports a total size of zero.
    #[tokio::test]
    async fn overflow_size_empty_returns_zero() {
        let (store, cid) = make_store().await;

        let total = store.overflow_size(cid).await.expect("size");

        assert_eq!(total, 0);
    }

    /// The reported size is the sum of the lengths of all saved payloads (3 + 2 bytes).
    #[tokio::test]
    async fn overflow_size_sums_byte_sizes() {
        let (store, cid) = make_store().await;
        store.save_overflow(cid, b"aaa").await.expect("save1");
        store.save_overflow(cid, b"bb").await.expect("save2");

        let total = store.overflow_size(cid).await.expect("size");

        assert_eq!(total, 5);
    }

    /// Deleting the conversation row is expected to take its overflow rows with it.
    #[tokio::test]
    async fn cascade_delete_removes_overflow() {
        let (store, cid) = make_store().await;
        let overflow_id = store.save_overflow(cid, b"data").await.expect("save");

        sqlx::query("DELETE FROM conversations WHERE id = ?")
            .bind(cid)
            .execute(store.pool())
            .await
            .expect("delete conversation");

        let count_sql = "SELECT COUNT(*) FROM tool_overflow WHERE id = ?";
        let remaining = sqlx::query_scalar::<_, i64>(count_sql)
            .bind(&overflow_id)
            .fetch_one(store.pool())
            .await
            .expect("count");

        assert_eq!(remaining, 0, "overflow row should be removed by CASCADE");
    }

    /// Cleanup with a one-day cutoff removes a row backdated by two days and keeps a
    /// row that was just saved.
    #[tokio::test]
    async fn cleanup_removes_old_entries() {
        let (store, cid) = make_store().await;

        // Insert directly so that `created_at` can be backdated by two days.
        let stale_id = Uuid::new_v4().to_string();
        sqlx::query(
            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))",
        )
        .bind(&stale_id)
        .bind(cid)
        .bind(b"old data".as_slice())
        .bind(8i64)
        .execute(store.pool())
        .await
        .expect("insert old row");

        let fresh_id = store.save_overflow(cid, b"fresh").await.expect("fresh");

        let removed = store.cleanup_overflow(86400).await.expect("cleanup");
        assert_eq!(removed, 1, "one old row should be deleted");

        let stale = store.load_overflow(&stale_id, cid).await.expect("load old");
        assert!(stale.is_none());

        let fresh = store
            .load_overflow(&fresh_id, cid)
            .await
            .expect("load fresh");
        assert!(fresh.is_some());
    }

    /// Cleanup with a one-day cutoff leaves rows that were just saved untouched.
    #[tokio::test]
    async fn cleanup_fresh_entries_not_removed() {
        let (store, cid) = make_store().await;
        for payload in [b"a", b"b"] {
            store.save_overflow(cid, payload).await.expect("save");
        }

        let removed = store.cleanup_overflow(86400).await.expect("cleanup");

        assert_eq!(removed, 0);
    }
}
224}