use crate::kernel::persistence::{ExecutionSnapshot, StateStore, StorageBackend};
use crate::kernel::{ExecutionId, TenantId};
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use std::path::PathBuf;
use std::time::Duration;
use tokio::fs;
use tracing::debug;
/// File-backed state store rooted at a single directory.
///
/// Execution snapshots are written under `<base_dir>/snapshots` and key/value
/// entries under `<base_dir>/kv`, one JSON file per item.
pub struct JsonlStateStore {
    /// Root directory containing the `snapshots` and `kv` subdirectories.
base_dir: PathBuf,
}
impl JsonlStateStore {
    /// Creates a store rooted at `base_dir`.
    ///
    /// The `snapshots` and `kv` subdirectories (and any missing parents) are
    /// created before the store is returned.
    ///
    /// # Errors
    /// Returns an error if either subdirectory cannot be created.
    pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
        for subdir in ["snapshots", "kv"] {
            fs::create_dir_all(base_dir.join(subdir)).await?;
        }
        Ok(Self { base_dir })
    }

    /// Returns `<base_dir>/snapshots/<execution_id>.json`.
    fn snapshot_path(&self, execution_id: &ExecutionId) -> PathBuf {
        let mut path = self.base_dir.join("snapshots");
        path.push(format!("{}.json", execution_id.as_str()));
        path
    }

    /// Returns `<base_dir>/kv/<sanitized key>.json`.
    ///
    /// Every `/`, `:` and `\` in `key` is replaced by `_` so the key fits in a
    /// single file name. The mapping is lossy: keys that differ only in those
    /// characters (for example `a/b` and `a:b`) resolve to the same file.
    fn kv_path(&self, key: &str) -> PathBuf {
        let file_name: String = key
            .chars()
            .map(|ch| match ch {
                '/' | ':' | '\\' => '_',
                other => other,
            })
            .chain(".json".chars())
            .collect();
        self.base_dir.join("kv").join(file_name)
    }
}
/// Debug output lists the store's root directory.
impl std::fmt::Debug for JsonlStateStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("JsonlStateStore");
        builder.field("base_dir", &self.base_dir);
        builder.finish()
    }
}
#[async_trait]
impl StorageBackend for JsonlStateStore {
fn name(&self) -> &str {
"jsonl"
}
fn requires_network(&self) -> bool {
false
}
async fn health_check(&self) -> anyhow::Result<()> {
if !self.base_dir.exists() {
anyhow::bail!("State store directory does not exist: {:?}", self.base_dir);
}
Ok(())
}
async fn shutdown(&self) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
impl StateStore for JsonlStateStore {
async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()> {
let path = self.snapshot_path(&snapshot.execution_id);
let json = serde_json::to_string_pretty(&snapshot)?;
fs::write(&path, json).await?;
debug!(
"Saved snapshot for execution {} to {:?}",
snapshot.execution_id.as_str(),
path
);
Ok(())
}
async fn load_snapshot(
&self,
execution_id: &ExecutionId,
) -> anyhow::Result<Option<ExecutionSnapshot>> {
let path = self.snapshot_path(execution_id);
if !path.exists() {
return Ok(None);
}
let json = fs::read_to_string(&path).await?;
let snapshot: ExecutionSnapshot = serde_json::from_str(&json)?;
debug!(
"Loaded snapshot for execution {} from {:?}",
execution_id.as_str(),
path
);
Ok(Some(snapshot))
}
async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
let path = self.snapshot_path(execution_id);
if path.exists() {
fs::remove_file(&path).await?;
debug!(
"Deleted snapshot for execution {} from {:?}",
execution_id.as_str(),
path
);
}
Ok(())
}
async fn set(&self, key: &str, value: &[u8], _ttl: Option<Duration>) -> anyhow::Result<()> {
let path = self.kv_path(key);
let entry = serde_json::json!({
"key": key,
"value": BASE64.encode(value),
"timestamp": chrono::Utc::now().to_rfc3339(),
});
let json = serde_json::to_string_pretty(&entry)?;
fs::write(&path, json).await?;
debug!("Set key {} to {:?}", key, path);
Ok(())
}
async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
let path = self.kv_path(key);
if !path.exists() {
return Ok(None);
}
let json = fs::read_to_string(&path).await?;
let entry: serde_json::Value = serde_json::from_str(&json)?;
if let Some(value_str) = entry.get("value").and_then(|v| v.as_str()) {
let value = BASE64.decode(value_str)?;
return Ok(Some(value));
}
Ok(None)
}
async fn delete(&self, key: &str) -> anyhow::Result<()> {
let path = self.kv_path(key);
if path.exists() {
fs::remove_file(&path).await?;
debug!("Deleted key {} from {:?}", key, path);
}
Ok(())
}
async fn list_snapshots(
&self,
_tenant_id: &TenantId,
limit: usize,
) -> anyhow::Result<Vec<ExecutionId>> {
let snapshot_dir = self.base_dir.join("snapshots");
let mut execution_ids = Vec::new();
let mut entries = fs::read_dir(&snapshot_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().map(|e| e == "json").unwrap_or(false) {
if let Some(id_str) = path.file_stem().and_then(|s| s.to_str()) {
execution_ids.push(ExecutionId::from(id_str));
}
}
if execution_ids.len() >= limit {
break;
}
}
Ok(execution_ids)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionState;
    use tempfile::tempdir;

    /// Builds a store inside a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the store so the directory
    /// stays alive for the duration of the test.
    async fn temp_store() -> (tempfile::TempDir, JsonlStateStore) {
        let dir = tempdir().unwrap();
        let store = JsonlStateStore::new(dir.path().to_path_buf())
            .await
            .unwrap();
        (dir, store)
    }

    /// A saved snapshot can be loaded back with the same id and state.
    #[tokio::test]
    async fn test_jsonl_state_store_snapshot() {
        let (_dir, store) = temp_store().await;
        let exec_id = ExecutionId::new();

        store
            .save_snapshot(ExecutionSnapshot::new(
                exec_id.clone(),
                TenantId::from("test"),
                ExecutionState::Running,
                5,
            ))
            .await
            .unwrap();

        let restored = store.load_snapshot(&exec_id).await.unwrap();
        assert!(restored.is_some());
        let restored = restored.unwrap();
        assert_eq!(restored.execution_id, exec_id);
        assert_eq!(restored.state, ExecutionState::Running);
    }

    /// A key/value entry round-trips and disappears after deletion.
    #[tokio::test]
    async fn test_jsonl_state_store_kv() {
        let (_dir, store) = temp_store().await;
        let key = "test:key:123";
        let value = b"hello world";

        store.set(key, value, None).await.unwrap();
        let fetched = store.get(key).await.unwrap();
        assert_eq!(fetched, Some(value.to_vec()));

        store.delete(key).await.unwrap();
        let after_delete = store.get(key).await.unwrap();
        assert_eq!(after_delete, None);
    }
}