enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! JSONL State Store - File-based state snapshot storage
//!
//! Stores execution snapshots and key-value state in JSONL format.
//! - Snapshots: `{dir}/snapshots/{execution_id}.json`
//! - Key-values: `{dir}/kv/{key}.json`

use crate::kernel::persistence::{ExecutionSnapshot, StateStore, StorageBackend};
use crate::kernel::{ExecutionId, TenantId};
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use std::path::PathBuf;
use std::time::Duration;
use tokio::fs;
use tracing::debug;

/// JSONL-based state store
///
/// Stores snapshots as individual JSON files and key-value pairs in a separate directory.
pub struct JsonlStateStore {
    /// Base directory for state files
    base_dir: PathBuf,
}

impl JsonlStateStore {
    /// Create a new JSONL state store at the given directory
    pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
        // Ensure directories exist
        fs::create_dir_all(base_dir.join("snapshots")).await?;
        fs::create_dir_all(base_dir.join("kv")).await?;

        Ok(Self { base_dir })
    }

    /// Get the file path for a snapshot
    fn snapshot_path(&self, execution_id: &ExecutionId) -> PathBuf {
        self.base_dir
            .join("snapshots")
            .join(format!("{}.json", execution_id.as_str()))
    }

    /// Get the file path for a key-value entry
    fn kv_path(&self, key: &str) -> PathBuf {
        // Sanitize key for filesystem (replace special chars)
        let safe_key = key.replace(['/', ':', '\\'], "_");
        self.base_dir.join("kv").join(format!("{}.json", safe_key))
    }
}

impl std::fmt::Debug for JsonlStateStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JsonlStateStore")
            .field("base_dir", &self.base_dir)
            .finish()
    }
}

#[async_trait]
impl StorageBackend for JsonlStateStore {
    fn name(&self) -> &str {
        "jsonl"
    }

    fn requires_network(&self) -> bool {
        false
    }

    async fn health_check(&self) -> anyhow::Result<()> {
        // Check that directories exist and are writable
        if !self.base_dir.exists() {
            anyhow::bail!("State store directory does not exist: {:?}", self.base_dir);
        }
        Ok(())
    }

    async fn shutdown(&self) -> anyhow::Result<()> {
        Ok(())
    }
}

#[async_trait]
impl StateStore for JsonlStateStore {
    async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()> {
        let path = self.snapshot_path(&snapshot.execution_id);

        let json = serde_json::to_string_pretty(&snapshot)?;
        fs::write(&path, json).await?;

        debug!(
            "Saved snapshot for execution {} to {:?}",
            snapshot.execution_id.as_str(),
            path
        );

        Ok(())
    }

    async fn load_snapshot(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Option<ExecutionSnapshot>> {
        let path = self.snapshot_path(execution_id);

        if !path.exists() {
            return Ok(None);
        }

        let json = fs::read_to_string(&path).await?;
        let snapshot: ExecutionSnapshot = serde_json::from_str(&json)?;

        debug!(
            "Loaded snapshot for execution {} from {:?}",
            execution_id.as_str(),
            path
        );

        Ok(Some(snapshot))
    }

    async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
        let path = self.snapshot_path(execution_id);

        if path.exists() {
            fs::remove_file(&path).await?;
            debug!(
                "Deleted snapshot for execution {} from {:?}",
                execution_id.as_str(),
                path
            );
        }

        Ok(())
    }

    async fn set(&self, key: &str, value: &[u8], _ttl: Option<Duration>) -> anyhow::Result<()> {
        let path = self.kv_path(key);

        // Store as JSON with metadata
        let entry = serde_json::json!({
            "key": key,
            "value": BASE64.encode(value),
            "timestamp": chrono::Utc::now().to_rfc3339(),
        });

        let json = serde_json::to_string_pretty(&entry)?;
        fs::write(&path, json).await?;

        // Note: TTL is not implemented for JSONL store (would need background cleanup)
        debug!("Set key {} to {:?}", key, path);

        Ok(())
    }

    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        let path = self.kv_path(key);

        if !path.exists() {
            return Ok(None);
        }

        let json = fs::read_to_string(&path).await?;
        let entry: serde_json::Value = serde_json::from_str(&json)?;

        if let Some(value_str) = entry.get("value").and_then(|v| v.as_str()) {
            let value = BASE64.decode(value_str)?;
            return Ok(Some(value));
        }

        Ok(None)
    }

    async fn delete(&self, key: &str) -> anyhow::Result<()> {
        let path = self.kv_path(key);

        if path.exists() {
            fs::remove_file(&path).await?;
            debug!("Deleted key {} from {:?}", key, path);
        }

        Ok(())
    }

    async fn list_snapshots(
        &self,
        _tenant_id: &TenantId,
        limit: usize,
    ) -> anyhow::Result<Vec<ExecutionId>> {
        let snapshot_dir = self.base_dir.join("snapshots");
        let mut execution_ids = Vec::new();

        let mut entries = fs::read_dir(&snapshot_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if path.extension().map(|e| e == "json").unwrap_or(false) {
                if let Some(id_str) = path.file_stem().and_then(|s| s.to_str()) {
                    execution_ids.push(ExecutionId::from(id_str));
                }
            }
            if execution_ids.len() >= limit {
                break;
            }
        }

        Ok(execution_ids)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionState;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_jsonl_state_store_snapshot() {
        let dir = tempdir().unwrap();
        let store = JsonlStateStore::new(dir.path().to_path_buf())
            .await
            .unwrap();

        let exec_id = ExecutionId::new();
        let tenant_id = TenantId::from("test");
        let snapshot =
            ExecutionSnapshot::new(exec_id.clone(), tenant_id, ExecutionState::Running, 5);

        // Save
        store.save_snapshot(snapshot.clone()).await.unwrap();

        // Load
        let loaded = store.load_snapshot(&exec_id).await.unwrap();
        assert!(loaded.is_some());
        let loaded = loaded.unwrap();
        assert_eq!(loaded.execution_id, exec_id);
        assert_eq!(loaded.state, ExecutionState::Running);
    }

    #[tokio::test]
    async fn test_jsonl_state_store_kv() {
        let dir = tempdir().unwrap();
        let store = JsonlStateStore::new(dir.path().to_path_buf())
            .await
            .unwrap();

        let key = "test:key:123";
        let value = b"hello world";

        // Set
        store.set(key, value, None).await.unwrap();

        // Get
        let loaded = store.get(key).await.unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), value);

        // Delete
        store.delete(key).await.unwrap();
        let loaded = store.get(key).await.unwrap();
        assert!(loaded.is_none());
    }
}