enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! Storage Factory - Create storage backends from configuration
//!
//! This module provides factory functions to create EventStore and StateStore
//! instances based on configuration. It reads from `config.yaml` and creates
//! the appropriate backend (jsonl, sqlite, etc.).
//!
//! ## Supported Backends
//!
//! - **jsonl**: File-based JSONL storage (default, good for testing/development)
//! - **sqlite**: SQLite-based storage (planned; currently logs a warning and falls back to JSONL)
//! - **memory**: In-memory storage (for testing only)

use crate::kernel::persistence::StateStore;
use crate::streaming::{
    EventLog, EventStore, InMemoryEventStore, JsonlEventStore, JsonlStateStore,
};
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Storage configuration (mirrors enact_config::storage::EventStore)
///
/// Plain data describing which event store backend to build. Derives
/// `PartialEq`/`Eq` so configurations can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventStoreConfig {
    /// Backend selector matched by `create_event_store`
    /// (`"jsonl"`, `"memory"` or `"sqlite"`).
    pub store_type: String,
    /// Storage location joined onto the base directory; when `None` the
    /// factory uses `events`.
    pub path: Option<String>,
    /// Connection string. Not read by the factory functions in this module;
    /// presumably reserved for database-backed stores.
    pub dsn: Option<String>,
}

/// Storage configuration (mirrors enact_config::storage::StateStore)
///
/// Plain data describing which state store backend to build. Derives
/// `PartialEq`/`Eq` so configurations can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateStoreConfig {
    /// Backend selector matched by `create_state_store` (e.g. `"jsonl"`).
    pub store_type: String,
    /// Storage location joined onto the base directory; when `None` the
    /// factory uses `state`.
    pub path: Option<String>,
    /// Connection string. Not read by the factory functions in this module;
    /// presumably reserved for database-backed stores.
    pub dsn: Option<String>,
}

/// Build the event store selected by `config.store_type`.
///
/// * `"memory"` yields an [`InMemoryEventStore`]; `path` is not consulted.
/// * `"jsonl"` yields a [`JsonlEventStore`] located at `base_dir` joined with
///   `config.path` (or with `events` when no path is configured).
/// * `"sqlite"` is not implemented: a warning is logged and the JSONL store is
///   used instead, with the same path rules.
///
/// # Arguments
/// * `config` - Event store configuration
/// * `base_dir` - Base directory (typically ENACT_HOME) for relative paths
///
/// # Returns
/// An Arc-wrapped EventStore implementation
///
/// # Errors
/// Fails when `store_type` is none of the values above, or when constructing
/// the JSONL store fails.
pub async fn create_event_store(
    config: &EventStoreConfig,
    base_dir: &Path,
) -> anyhow::Result<Arc<dyn EventStore>> {
    let kind = config.store_type.as_str();

    // The in-memory backend needs no filesystem location.
    if kind == "memory" {
        return Ok(Arc::new(InMemoryEventStore::new()));
    }

    // Everything that is not file-backed beyond this point is unsupported.
    if kind != "jsonl" && kind != "sqlite" {
        anyhow::bail!("Unknown event store type: {}", kind);
    }

    if kind == "sqlite" {
        // For now, fallback to JSONL since SQLite isn't implemented
        tracing::warn!("SQLite event store not implemented, falling back to JSONL");
    }

    let location = base_dir.join(config.path.as_deref().unwrap_or("events"));
    let jsonl = JsonlEventStore::new(location).await?;
    Ok(Arc::new(jsonl))
}

/// Create a StateStore from configuration
///
/// # Arguments
/// * `config` - State store configuration
/// * `base_dir` - Base directory (typically ENACT_HOME) for relative paths
///
/// # Returns
/// An Arc-wrapped StateStore implementation
pub async fn create_state_store(
    config: &StateStoreConfig,
    base_dir: &Path,
) -> anyhow::Result<Arc<dyn StateStore>> {
    match config.store_type.as_str() {
        "jsonl" => {
            let path = match &config.path {
                Some(p) => base_dir.join(p),
                None => base_dir.join("state"),
            };
            let store = JsonlStateStore::new(path).await?;
            Ok(Arc::new(store))
        }
        "sqlite" => {
            // For now, fallback to JSONL since SQLite isn't implemented
            tracing::warn!("SQLite state store not implemented, falling back to JSONL");
            let path = match &config.path {
                Some(p) => base_dir.join(p),
                None => base_dir.join("state"),
            };
            let store = JsonlStateStore::new(path).await?;
            Ok(Arc::new(store))
        }
        other => anyhow::bail!("Unknown state store type: {}", other),
    }
}

/// Build an [`EventLog`] on top of the configured event store.
///
/// Convenience wrapper: delegates to [`create_event_store`] and wraps the
/// resulting store with `EventLog::new`. Any error from store creation is
/// returned unchanged.
pub async fn create_event_log(
    config: &EventStoreConfig,
    base_dir: &Path,
) -> anyhow::Result<EventLog> {
    create_event_store(config, base_dir)
        .await
        .map(EventLog::new)
}

/// Storage context holding all configured stores
///
/// Use this to pass storage configuration throughout the application.
///
/// Cloning copies the two `Arc` handles (the underlying stores are shared
/// between clones) and the `base_dir` path.
#[derive(Clone)]
pub struct StorageContext {
    /// Shared handle to the event store backend.
    pub event_store: Arc<dyn EventStore>,
    /// Shared handle to the state store backend.
    pub state_store: Arc<dyn StateStore>,
    /// Base directory the context was created with.
    pub base_dir: PathBuf,
}

impl StorageContext {
    /// Create a new storage context from configuration
    pub async fn new(
        event_config: &EventStoreConfig,
        state_config: &StateStoreConfig,
        base_dir: PathBuf,
    ) -> anyhow::Result<Self> {
        let event_store = create_event_store(event_config, &base_dir).await?;
        let state_store = create_state_store(state_config, &base_dir).await?;

        Ok(Self {
            event_store,
            state_store,
            base_dir,
        })
    }

    /// Create an in-memory storage context (for testing)
    pub fn in_memory() -> Self {
        Self {
            event_store: Arc::new(InMemoryEventStore::new()),
            state_store: Arc::new(InMemoryStateStore::new()),
            base_dir: PathBuf::from("."),
        }
    }

    /// Get the event log
    pub fn event_log(&self) -> EventLog {
        EventLog::new(self.event_store.clone())
    }
}

/// In-memory state store for testing
///
/// Holds everything in two `HashMap`s, each behind its own
/// `std::sync::RwLock`.
#[derive(Debug, Default)]
pub struct InMemoryStateStore {
    /// Execution snapshots, keyed by the string form of their execution id.
    snapshots: std::sync::RwLock<
        std::collections::HashMap<String, crate::kernel::persistence::ExecutionSnapshot>,
    >,
    /// Generic key/value entries stored as raw bytes.
    kv: std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>,
}

impl InMemoryStateStore {
    /// Create an empty store (no snapshots, no key/value entries).
    pub fn new() -> Self {
        Default::default()
    }
}

#[async_trait::async_trait]
impl crate::kernel::persistence::StorageBackend for InMemoryStateStore {
    /// Always succeeds: this implementation performs no check.
    async fn health_check(&self) -> anyhow::Result<()> {
        Ok(())
    }

    /// The in-memory backend never needs network access.
    fn requires_network(&self) -> bool {
        false
    }

    /// Backend identifier: `"memory"`.
    fn name(&self) -> &str {
        "memory"
    }
}

#[async_trait::async_trait]
impl StateStore for InMemoryStateStore {
    async fn save_snapshot(
        &self,
        snapshot: crate::kernel::persistence::ExecutionSnapshot,
    ) -> anyhow::Result<()> {
        let mut snapshots = self
            .snapshots
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        snapshots.insert(snapshot.execution_id.as_str().to_string(), snapshot);
        Ok(())
    }

    async fn load_snapshot(
        &self,
        execution_id: &crate::kernel::ExecutionId,
    ) -> anyhow::Result<Option<crate::kernel::persistence::ExecutionSnapshot>> {
        let snapshots = self
            .snapshots
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        Ok(snapshots.get(execution_id.as_str()).cloned())
    }

    async fn delete_snapshot(
        &self,
        execution_id: &crate::kernel::ExecutionId,
    ) -> anyhow::Result<()> {
        let mut snapshots = self
            .snapshots
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        snapshots.remove(execution_id.as_str());
        Ok(())
    }

    async fn set(
        &self,
        key: &str,
        value: &[u8],
        _ttl: Option<std::time::Duration>,
    ) -> anyhow::Result<()> {
        let mut kv = self
            .kv
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        kv.insert(key.to_string(), value.to_vec());
        Ok(())
    }

    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        let kv = self
            .kv
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        Ok(kv.get(key).cloned())
    }

    async fn delete(&self, key: &str) -> anyhow::Result<()> {
        let mut kv = self
            .kv
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        kv.remove(key);
        Ok(())
    }

    async fn list_snapshots(
        &self,
        _tenant_id: &crate::kernel::TenantId,
        limit: usize,
    ) -> anyhow::Result<Vec<crate::kernel::ExecutionId>> {
        let snapshots = self
            .snapshots
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        Ok(snapshots
            .keys()
            .take(limit)
            .map(|k| crate::kernel::ExecutionId::from(k.as_str()))
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// JSONL event-store configuration rooted at `rel_path`.
    fn jsonl_event_config(rel_path: &str) -> EventStoreConfig {
        EventStoreConfig {
            store_type: "jsonl".to_string(),
            path: Some(rel_path.to_string()),
            dsn: None,
        }
    }

    /// JSONL state-store configuration rooted at `rel_path`.
    fn jsonl_state_config(rel_path: &str) -> StateStoreConfig {
        StateStoreConfig {
            store_type: "jsonl".to_string(),
            path: Some(rel_path.to_string()),
            dsn: None,
        }
    }

    #[tokio::test]
    async fn test_create_jsonl_event_store() {
        let tmp = tempdir().unwrap();
        let root = tmp.path();

        let _store = create_event_store(&jsonl_event_config("events"), root)
            .await
            .unwrap();

        // The factory is expected to have created the events path on disk.
        assert!(root.join("events").exists());
    }

    #[tokio::test]
    async fn test_create_storage_context() {
        let tmp = tempdir().unwrap();
        let root = tmp.path();
        let events = jsonl_event_config("events");
        let state = jsonl_state_config("state");

        let _ctx = StorageContext::new(&events, &state, root.to_path_buf())
            .await
            .unwrap();

        // Both backing paths should exist once the context is built.
        assert!(root.join("events").exists());
        assert!(root.join("state").exists());
    }
}