Skip to main content

zeph_memory/sqlite/
overflow.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use uuid::Uuid;
5
6use crate::error::MemoryError;
7use crate::sqlite::SqliteStore;
8
9impl SqliteStore {
10    /// Save overflow content associated with a conversation, returning the generated UUID.
11    ///
12    /// # Errors
13    ///
14    /// Returns an error if the database insert fails.
15    pub async fn save_overflow(
16        &self,
17        conversation_id: i64,
18        content: &[u8],
19    ) -> Result<String, MemoryError> {
20        let id = Uuid::new_v4().to_string();
21        let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
22        sqlx::query(
23            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size) \
24             VALUES (?, ?, ?, ?)",
25        )
26        .bind(&id)
27        .bind(conversation_id)
28        .bind(content)
29        .bind(byte_size)
30        .execute(&self.pool)
31        .await?;
32        Ok(id)
33    }
34
35    /// Load overflow content by UUID, scoped to the given conversation.
36    /// Returns `None` if the entry does not exist or belongs to a different conversation.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the database query fails.
41    pub async fn load_overflow(
42        &self,
43        id: &str,
44        conversation_id: i64,
45    ) -> Result<Option<Vec<u8>>, MemoryError> {
46        let row: Option<(Vec<u8>,)> = sqlx::query_as(
47            "SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?",
48        )
49        .bind(id)
50        .bind(conversation_id)
51        .fetch_optional(&self.pool)
52        .await?;
53        Ok(row.map(|(content,)| content))
54    }
55
56    /// Delete overflow entries older than `max_age_secs` seconds.
57    /// Returns the number of deleted rows.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the database delete fails.
62    pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
63        let result = sqlx::query(
64            "DELETE FROM tool_overflow \
65             WHERE created_at < datetime('now', printf('-%d seconds', ?))",
66        )
67        .bind(max_age_secs.cast_signed())
68        .execute(&self.pool)
69        .await?;
70        Ok(result.rows_affected())
71    }
72
73    /// Return total overflow bytes stored for a conversation.
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the database query fails.
78    pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
79        let total: Option<i64> = sqlx::query_scalar(
80            "SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?",
81        )
82        .bind(conversation_id)
83        .fetch_one(&self.pool)
84        .await?;
85        Ok(total.unwrap_or(0).cast_unsigned())
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// Retention window (one day, in seconds) used by the cleanup tests.
    const ONE_DAY_SECS: u64 = 86_400;

    /// Open an in-memory store, create one conversation in it, and hand back the store
    /// together with that conversation's raw id.
    async fn make_store() -> (SqliteStore, i64) {
        let store = SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new");
        let conversation = store
            .create_conversation()
            .await
            .expect("create_conversation");
        (store, conversation.0)
    }

    #[tokio::test]
    async fn save_and_load_roundtrip() {
        let (store, cid) = make_store().await;
        let payload: &[u8] = b"hello overflow world";
        let overflow_id = store.save_overflow(cid, payload).await.expect("save");
        let fetched = store.load_overflow(&overflow_id, cid).await.expect("load");
        assert_eq!(fetched.as_deref(), Some(payload));
    }

    #[tokio::test]
    async fn load_missing_returns_none() {
        let (store, cid) = make_store().await;
        // An id that was never handed out by `save_overflow`.
        let unknown_id = "00000000-0000-0000-0000-000000000000";
        let fetched = store.load_overflow(unknown_id, cid).await.expect("load");
        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn load_wrong_conversation_returns_none() {
        let (store, owner) = make_store().await;
        let other = store
            .create_conversation()
            .await
            .expect("create_conversation")
            .0;
        let overflow_id = store.save_overflow(owner, b"secret").await.expect("save");
        // The entry belongs to `owner`; asking for it through `other` must yield nothing.
        let fetched = store.load_overflow(&overflow_id, other).await.expect("load");
        assert!(
            fetched.is_none(),
            "overflow entry must not be accessible from a different conversation"
        );
    }

    #[tokio::test]
    async fn overflow_size_empty_returns_zero() {
        let (store, cid) = make_store().await;
        let total = store.overflow_size(cid).await.expect("size");
        assert_eq!(total, 0);
    }

    #[tokio::test]
    async fn overflow_size_sums_byte_sizes() {
        let (store, cid) = make_store().await;
        // 3 bytes + 2 bytes.
        store.save_overflow(cid, b"aaa").await.expect("save1");
        store.save_overflow(cid, b"bb").await.expect("save2");
        let total = store.overflow_size(cid).await.expect("size");
        assert_eq!(total, 5);
    }

    #[tokio::test]
    async fn cascade_delete_removes_overflow() {
        let (store, cid) = make_store().await;
        let overflow_id = store.save_overflow(cid, b"data").await.expect("save");
        // Remove the owning conversation; its overflow rows are expected to go with it.
        sqlx::query("DELETE FROM conversations WHERE id = ?")
            .bind(cid)
            .execute(store.pool())
            .await
            .expect("delete conversation");
        // `load_overflow` is scoped by conversation, so look the row up by id with raw SQL.
        let remaining: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tool_overflow WHERE id = ?")
                .bind(&overflow_id)
                .fetch_one(store.pool())
                .await
                .expect("count");
        assert_eq!(remaining, 0, "overflow row should be removed by CASCADE");
    }

    #[tokio::test]
    async fn cleanup_removes_old_entries() {
        let (store, cid) = make_store().await;

        // Plant a row whose timestamp lies two days in the past.
        let stale_id = Uuid::new_v4().to_string();
        let stale_payload: &[u8] = b"old data";
        sqlx::query(
            "INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))",
        )
        .bind(&stale_id)
        .bind(cid)
        .bind(stale_payload)
        .bind(8_i64)
        .execute(store.pool())
        .await
        .expect("insert old row");

        // And one row created just now through the regular API.
        let fresh_id = store.save_overflow(cid, b"fresh").await.expect("fresh");

        let removed = store
            .cleanup_overflow(ONE_DAY_SECS)
            .await
            .expect("cleanup");
        assert_eq!(removed, 1, "one old row should be deleted");

        let stale = store.load_overflow(&stale_id, cid).await.expect("load old");
        assert!(stale.is_none());

        let fresh = store
            .load_overflow(&fresh_id, cid)
            .await
            .expect("load fresh");
        assert!(fresh.is_some());
    }

    #[tokio::test]
    async fn cleanup_fresh_entries_not_removed() {
        let (store, cid) = make_store().await;
        store.save_overflow(cid, b"a").await.expect("save");
        store.save_overflow(cid, b"b").await.expect("save");
        // Both rows are younger than the one-day retention window, so none may be deleted.
        let removed = store
            .cleanup_overflow(ONE_DAY_SECS)
            .await
            .expect("cleanup");
        assert_eq!(removed, 0);
    }
}