use crate::error::AgentRuntimeError;
use crate::util::djb2;
use async_trait::async_trait;
use futures::future::try_join_all;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
#[async_trait]
pub trait PersistenceBackend: Send + Sync {
async fn save(&self, key: &str, value: &[u8]) -> Result<(), AgentRuntimeError>;
async fn load(&self, key: &str) -> Result<Option<Vec<u8>>, AgentRuntimeError>;
async fn delete(&self, key: &str) -> Result<(), AgentRuntimeError>;
async fn batch_save(&self, items: &[(&str, &[u8])]) -> Result<(), AgentRuntimeError> {
try_join_all(items.iter().map(|(key, value)| self.save(key, value))).await?;
Ok(())
}
async fn batch_load(&self, keys: &[&str]) -> Result<Vec<Option<Vec<u8>>>, AgentRuntimeError> {
try_join_all(keys.iter().map(|key| self.load(key))).await
}
}
#[derive(Debug, Clone)]
pub struct FilePersistenceBackend {
base_dir: Arc<PathBuf>,
}
impl FilePersistenceBackend {
pub fn new(base_dir: impl Into<PathBuf>) -> Self {
Self {
base_dir: Arc::new(base_dir.into()),
}
}
pub fn base_dir(&self) -> &std::path::Path {
self.base_dir.as_ref()
}
fn path_for(&self, key: &str) -> PathBuf {
let hash = djb2(key);
let sanitized = key.replace(['/', '\\', ':', '*', '?', '"', '<', '>', '|'], "_");
self.base_dir
.join(format!("{sanitized}-{hash:016x}.bin"))
}
}
#[async_trait]
impl PersistenceBackend for FilePersistenceBackend {
#[tracing::instrument(skip(self, value), fields(key))]
async fn save(&self, key: &str, value: &[u8]) -> Result<(), AgentRuntimeError> {
let path = self.path_for(key);
let tmp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4().simple()));
tokio::fs::write(&tmp_path, value)
.await
.map_err(|e| AgentRuntimeError::Persistence(format!("write tmp {tmp_path:?}: {e}")))?;
tokio::fs::rename(&tmp_path, &path).await.map_err(|e| {
AgentRuntimeError::Persistence(format!(
"rename {tmp_path:?} -> {path:?}: {e}"
))
})?;
Ok(())
}
#[tracing::instrument(skip(self), fields(key))]
async fn load(&self, key: &str) -> Result<Option<Vec<u8>>, AgentRuntimeError> {
let path = self.path_for(key);
match tokio::fs::read(&path).await {
Ok(bytes) => Ok(Some(bytes)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(AgentRuntimeError::Persistence(format!(
"read {path:?}: {e}"
))),
}
}
async fn delete(&self, key: &str) -> Result<(), AgentRuntimeError> {
let path = self.path_for(key);
match tokio::fs::remove_file(&path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(AgentRuntimeError::Persistence(format!(
"delete {path:?}: {e}"
))),
}
}
async fn batch_save(&self, items: &[(&str, &[u8])]) -> Result<(), AgentRuntimeError> {
let futs: Vec<_> = items.iter().map(|(key, value)| self.save(key, value)).collect();
try_join_all(futs).await?;
Ok(())
}
async fn batch_load(&self, keys: &[&str]) -> Result<Vec<Option<Vec<u8>>>, AgentRuntimeError> {
let futs: Vec<_> = keys.iter().map(|key| self.load(key)).collect();
try_join_all(futs).await
}
}
impl FilePersistenceBackend {
pub async fn list_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
let mut entries = tokio::fs::read_dir(self.base_dir.as_ref())
.await
.map_err(|e| AgentRuntimeError::Persistence(format!("list_keys readdir: {e}")))?;
let mut keys = Vec::new();
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| AgentRuntimeError::Persistence(format!("list_keys entry: {e}")))?
{
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".bin") {
if let Some(stem) = name.strip_suffix(".bin") {
if stem.len() > 17 {
let prefix = &stem[..stem.len() - 17]; keys.push(prefix.to_owned());
}
}
}
}
}
Ok(keys)
}
pub async fn exists(&self, key: &str) -> Result<bool, AgentRuntimeError> {
let path = self.path_for(key);
Ok(tokio::fs::metadata(&path).await.is_ok())
}
pub async fn key_count(&self) -> Result<usize, AgentRuntimeError> {
let mut entries = tokio::fs::read_dir(self.base_dir.as_ref())
.await
.map_err(|e| AgentRuntimeError::Persistence(format!("key_count readdir: {e}")))?;
let mut count = 0usize;
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| AgentRuntimeError::Persistence(format!("key_count entry: {e}")))?
{
if entry
.file_name()
.to_str()
.map_or(false, |n| n.ends_with(".bin"))
{
count += 1;
}
}
Ok(count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
async fn temp_backend() -> (FilePersistenceBackend, tempdir::Handle) {
let dir = std::env::temp_dir().join(format!("agent_runtime_test_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let backend = FilePersistenceBackend::new(dir.clone());
(backend, tempdir::Handle { path: dir })
}
mod tempdir {
pub struct Handle {
pub path: std::path::PathBuf,
}
impl Drop for Handle {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.path);
}
}
}
#[tokio::test]
async fn test_file_backend_save_and_load() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("test-key", b"hello world").await.unwrap();
let loaded = backend.load("test-key").await.unwrap();
assert_eq!(loaded, Some(b"hello world".to_vec()));
}
#[tokio::test]
async fn test_file_backend_load_missing_returns_none() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
let loaded = backend.load("nonexistent").await.unwrap();
assert_eq!(loaded, None);
}
#[tokio::test]
async fn test_file_backend_delete_removes_file() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("to-delete", b"data").await.unwrap();
backend.delete("to-delete").await.unwrap();
let loaded = backend.load("to-delete").await.unwrap();
assert_eq!(loaded, None);
}
#[tokio::test]
async fn test_file_backend_delete_missing_is_noop() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.delete("never-existed").await.unwrap();
}
#[tokio::test]
async fn test_file_backend_key_sanitization() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("agent/session:1", b"data").await.unwrap();
let loaded = backend.load("agent/session:1").await.unwrap();
assert_eq!(loaded, Some(b"data".to_vec()));
}
#[tokio::test]
async fn test_file_backend_collision_resistant_keys() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("a/b", b"slash").await.unwrap();
backend.save("a_b", b"underscore").await.unwrap();
let loaded_slash = backend.load("a/b").await.unwrap();
let loaded_under = backend.load("a_b").await.unwrap();
assert_eq!(loaded_slash, Some(b"slash".to_vec()));
assert_eq!(loaded_under, Some(b"underscore".to_vec()));
assert_ne!(
loaded_slash, loaded_under,
"keys with the same sanitized form must not collide"
);
}
#[tokio::test]
async fn test_batch_save_and_load() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
let data: Vec<(&str, Vec<u8>)> = vec![
("batch-key-1", b"value1".to_vec()),
("batch-key-2", b"value2".to_vec()),
];
let refs: Vec<(&str, &[u8])> = data.iter().map(|(k, v)| (*k, v.as_slice())).collect();
backend.batch_save(&refs).await.unwrap();
let keys = vec!["batch-key-1", "batch-key-2", "batch-key-missing"];
let results = backend.batch_load(&keys).await.unwrap();
assert_eq!(results[0], Some(b"value1".to_vec()));
assert_eq!(results[1], Some(b"value2".to_vec()));
assert_eq!(results[2], None);
}
#[tokio::test]
async fn test_file_backend_list_keys() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("my-session", b"data1").await.unwrap();
backend.save("another-key", b"data2").await.unwrap();
let keys = backend.list_keys().await.unwrap();
assert_eq!(keys.len(), 2);
}
#[tokio::test]
async fn test_persistence_backend_object_safe() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend: Arc<dyn PersistenceBackend> = Arc::new(FilePersistenceBackend::new(&dir));
backend.save("obj-safe", b"ok").await.unwrap();
let r = backend.load("obj-safe").await.unwrap();
assert_eq!(r, Some(b"ok".to_vec()));
}
#[tokio::test]
async fn test_key_count_zero_for_empty_directory() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
assert_eq!(backend.key_count().await.unwrap(), 0);
}
#[tokio::test]
async fn test_key_count_matches_number_of_saves() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("k1", b"v1").await.unwrap();
backend.save("k2", b"v2").await.unwrap();
backend.save("k3", b"v3").await.unwrap();
assert_eq!(backend.key_count().await.unwrap(), 3);
}
#[tokio::test]
async fn test_key_count_decrements_after_delete() {
let dir = std::env::temp_dir().join(format!("art_{}", uuid::Uuid::new_v4()));
tokio::fs::create_dir_all(&dir).await.unwrap();
let _guard = tempdir::Handle { path: dir.clone() };
let backend = FilePersistenceBackend::new(&dir);
backend.save("key", b"val").await.unwrap();
assert_eq!(backend.key_count().await.unwrap(), 1);
backend.delete("key").await.unwrap();
assert_eq!(backend.key_count().await.unwrap(), 0);
}
}