use uuid::Uuid;
#[allow(unused_imports)]
use zeph_db::sql;
use crate::error::MemoryError;
use crate::store::SqliteStore;
impl SqliteStore {
pub async fn save_overflow(
&self,
conversation_id: i64,
content: &[u8],
) -> Result<String, MemoryError> {
let id = Uuid::new_v4().to_string();
let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
zeph_db::query(sql!(
"INSERT INTO tool_overflow (id, conversation_id, content, byte_size, archive_type) \
VALUES (?, ?, ?, ?, 'overflow')"
))
.bind(&id)
.bind(conversation_id)
.bind(content)
.bind(byte_size)
.execute(&self.pool)
.await?;
Ok(id)
}
pub async fn save_archive(
&self,
conversation_id: i64,
content: &[u8],
) -> Result<String, MemoryError> {
let id = Uuid::new_v4().to_string();
let byte_size = i64::try_from(content.len()).unwrap_or(i64::MAX);
zeph_db::query(sql!(
"INSERT INTO tool_overflow (id, conversation_id, content, byte_size, archive_type) \
VALUES (?, ?, ?, ?, 'archive')"
))
.bind(&id)
.bind(conversation_id)
.bind(content)
.bind(byte_size)
.execute(&self.pool)
.await?;
Ok(id)
}
pub async fn load_overflow(
&self,
id: &str,
conversation_id: i64,
) -> Result<Option<Vec<u8>>, MemoryError> {
let row: Option<(Vec<u8>,)> = zeph_db::query_as(sql!(
"SELECT content FROM tool_overflow WHERE id = ? AND conversation_id = ?"
))
.bind(id)
.bind(conversation_id)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|(content,)| content))
}
pub async fn cleanup_overflow(&self, max_age_secs: u64) -> Result<u64, MemoryError> {
let result = zeph_db::query(sql!(
"DELETE FROM tool_overflow \
WHERE archive_type = 'overflow' \
AND created_at < datetime('now', printf('-%d seconds', ?))"
))
.bind(max_age_secs.cast_signed())
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
pub async fn overflow_size(&self, conversation_id: i64) -> Result<u64, MemoryError> {
let total: Option<i64> = zeph_db::query_scalar(sql!(
"SELECT COALESCE(SUM(byte_size), 0) FROM tool_overflow WHERE conversation_id = ?"
))
.bind(conversation_id)
.fetch_one(&self.pool)
.await?;
Ok(total.unwrap_or(0).cast_unsigned())
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn make_store() -> (SqliteStore, i64) {
let store = SqliteStore::new(":memory:")
.await
.expect("SqliteStore::new");
let cid = store
.create_conversation()
.await
.expect("create_conversation");
(store, cid.0)
}
#[tokio::test]
async fn save_and_load_roundtrip() {
let (store, cid) = make_store().await;
let content = b"hello overflow world";
let id = store.save_overflow(cid, content).await.expect("save");
let loaded = store.load_overflow(&id, cid).await.expect("load");
assert_eq!(loaded, Some(content.to_vec()));
}
#[tokio::test]
async fn load_missing_returns_none() {
let (store, cid) = make_store().await;
let loaded = store
.load_overflow("00000000-0000-0000-0000-000000000000", cid)
.await
.expect("load");
assert!(loaded.is_none());
}
#[tokio::test]
async fn load_wrong_conversation_returns_none() {
let (store, cid1) = make_store().await;
let cid2 = store
.create_conversation()
.await
.expect("create_conversation")
.0;
let id = store.save_overflow(cid1, b"secret").await.expect("save");
let loaded = store.load_overflow(&id, cid2).await.expect("load");
assert!(
loaded.is_none(),
"overflow entry must not be accessible from a different conversation"
);
}
#[tokio::test]
async fn overflow_size_empty_returns_zero() {
let (store, cid) = make_store().await;
let size = store.overflow_size(cid).await.expect("size");
assert_eq!(size, 0);
}
#[tokio::test]
async fn overflow_size_sums_byte_sizes() {
let (store, cid) = make_store().await;
store.save_overflow(cid, b"aaa").await.expect("save1");
store.save_overflow(cid, b"bb").await.expect("save2");
let size = store.overflow_size(cid).await.expect("size");
assert_eq!(size, 5);
}
#[tokio::test]
async fn cascade_delete_removes_overflow() {
let (store, cid) = make_store().await;
let id = store.save_overflow(cid, b"data").await.expect("save");
zeph_db::query(sql!("DELETE FROM conversations WHERE id = ?"))
.bind(cid)
.execute(store.pool())
.await
.expect("delete conversation");
let count: i64 =
zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM tool_overflow WHERE id = ?"))
.bind(&id)
.fetch_one(store.pool())
.await
.expect("count");
assert_eq!(count, 0, "overflow row should be removed by CASCADE");
}
#[tokio::test]
async fn cleanup_removes_old_entries() {
let (store, cid) = make_store().await;
let id = Uuid::new_v4().to_string();
zeph_db::query(sql!(
"INSERT INTO tool_overflow (id, conversation_id, content, byte_size, created_at) \
VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
))
.bind(&id)
.bind(cid)
.bind(b"old data".as_slice())
.bind(8i64)
.execute(store.pool())
.await
.expect("insert old row");
let fresh_id = store.save_overflow(cid, b"fresh").await.expect("fresh");
let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
assert_eq!(deleted, 1, "one old row should be deleted");
assert!(
store
.load_overflow(&id, cid)
.await
.expect("load old")
.is_none()
);
assert!(
store
.load_overflow(&fresh_id, cid)
.await
.expect("load fresh")
.is_some()
);
}
#[tokio::test]
async fn cleanup_fresh_entries_not_removed() {
let (store, cid) = make_store().await;
store.save_overflow(cid, b"a").await.expect("save");
store.save_overflow(cid, b"b").await.expect("save");
let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
assert_eq!(deleted, 0);
}
#[tokio::test]
async fn save_archive_and_load_roundtrip() {
let (store, cid) = make_store().await;
let content = b"archived tool output body";
let id = store
.save_archive(cid, content)
.await
.expect("save_archive");
let loaded = store.load_overflow(&id, cid).await.expect("load");
assert_eq!(loaded, Some(content.to_vec()));
}
#[tokio::test]
async fn cleanup_does_not_remove_old_archives() {
let (store, cid) = make_store().await;
let archive_id = Uuid::new_v4().to_string();
zeph_db::query(sql!(
"INSERT INTO tool_overflow \
(id, conversation_id, content, byte_size, archive_type, created_at) \
VALUES (?, ?, ?, ?, 'archive', datetime('now', '-30 days'))"
))
.bind(&archive_id)
.bind(cid)
.bind(b"old archive".as_slice())
.bind(11i64)
.execute(store.pool())
.await
.expect("insert old archive");
let overflow_id = Uuid::new_v4().to_string();
zeph_db::query(sql!(
"INSERT INTO tool_overflow \
(id, conversation_id, content, byte_size, archive_type, created_at) \
VALUES (?, ?, ?, ?, 'overflow', datetime('now', '-30 days'))"
))
.bind(&overflow_id)
.bind(cid)
.bind(b"old overflow".as_slice())
.bind(12i64)
.execute(store.pool())
.await
.expect("insert old overflow");
let deleted = store.cleanup_overflow(86400).await.expect("cleanup");
assert_eq!(deleted, 1, "only the overflow-type row should be deleted");
assert!(
store
.load_overflow(&archive_id, cid)
.await
.expect("load archive")
.is_some(),
"archive must not be removed by cleanup"
);
assert!(
store
.load_overflow(&overflow_id, cid)
.await
.expect("load overflow")
.is_none(),
"old overflow must be removed by cleanup"
);
}
}