enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! StateStore - Hot snapshots and working memory
//!
//! The StateStore provides fast access to execution state for resumption.
//! It is a **cache**, not a source of truth - the EventStore is authoritative.
//!
//! ## Guarantees
//!
//! - **Durability**: Best-effort - may be lost on failure
//! - **Freshness**: May be stale - always verify sequence with EventStore
//! - **Authority**: Non-authoritative - cache only
//! - **Availability**: Designed for fast reads, not durability
//!
//! ## Use Cases
//!
//! - Fast execution resumption (avoid replaying all events)
//! - Working memory during execution
//! - Temporary state that doesn't need event sourcing
//!
//! @see docs/TECHNICAL/14-PERSISTENCE-LAYER.md

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

/// Extension trait for JSON operations on StateStore
///
/// This is separate from StateStore to maintain dyn-compatibility.
#[async_trait]
pub trait StateStoreJsonExt: StateStore {
    /// Set a JSON value
    async fn set_json<T: Serialize + Send + Sync>(
        &self,
        key: &str,
        value: &T,
        ttl: Option<Duration>,
    ) -> anyhow::Result<()> {
        let bytes = serde_json::to_vec(value)?;
        self.set(key, &bytes, ttl).await
    }

    /// Get a JSON value
    async fn get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Option<T>> {
        match self.get(key).await? {
            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
            None => Ok(None),
        }
    }
}

// Blanket implementation for all StateStore implementors
impl<S: StateStore + ?Sized> StateStoreJsonExt for S {}

use crate::kernel::{ExecutionId, ExecutionState, StepId, TenantId, UserId};

use super::StorageBackend;

/// Execution state snapshot
///
/// This captures the current state of an execution for fast resumption.
/// The `sequence_number` should match the EventStore sequence to verify freshness.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
    /// The execution this snapshot is for
    pub execution_id: ExecutionId,
    /// Tenant for multi-tenancy isolation
    pub tenant_id: TenantId,
    /// User who initiated this execution (for observability)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub user_id: Option<UserId>,
    /// Current execution state
    pub state: ExecutionState,
    /// Currently executing step (if any)
    pub current_step_id: Option<StepId>,
    /// Outputs from completed steps
    pub step_outputs: HashMap<StepId, serde_json::Value>,
    /// Working variables
    pub variables: HashMap<String, serde_json::Value>,
    /// When this snapshot was created
    pub timestamp: DateTime<Utc>,
    /// EventStore sequence number at snapshot time
    ///
    /// Used to verify freshness - if EventStore has more events,
    /// the snapshot may be stale.
    pub sequence_number: u64,
}

impl ExecutionSnapshot {
    /// Create a new snapshot
    pub fn new(
        execution_id: ExecutionId,
        tenant_id: TenantId,
        state: ExecutionState,
        sequence_number: u64,
    ) -> Self {
        Self {
            execution_id,
            tenant_id,
            user_id: None,
            state,
            current_step_id: None,
            step_outputs: HashMap::new(),
            variables: HashMap::new(),
            timestamp: Utc::now(),
            sequence_number,
        }
    }

    /// Create a new snapshot with user_id
    pub fn with_user(
        execution_id: ExecutionId,
        tenant_id: TenantId,
        user_id: Option<UserId>,
        state: ExecutionState,
        sequence_number: u64,
    ) -> Self {
        Self {
            execution_id,
            tenant_id,
            user_id,
            state,
            current_step_id: None,
            step_outputs: HashMap::new(),
            variables: HashMap::new(),
            timestamp: Utc::now(),
            sequence_number,
        }
    }

    /// Check if this snapshot is fresh compared to an EventStore sequence
    pub fn is_fresh(&self, event_store_sequence: u64) -> bool {
        self.sequence_number >= event_store_sequence
    }
}

/// StateStore trait - mutable snapshot cache
///
/// Provides fast state access for execution resumption and working memory.
/// This is NOT the source of truth - always verify freshness against EventStore.
#[async_trait]
pub trait StateStore: StorageBackend {
    // =========================================================================
    // Snapshot operations
    // =========================================================================

    /// Save a state snapshot
    ///
    /// # Guarantees
    /// - Best-effort durability
    /// - May be overwritten by subsequent saves
    /// - May be lost on backend failure
    async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()>;

    /// Load the latest snapshot for an execution
    ///
    /// Returns `None` if no snapshot exists.
    ///
    /// # Important
    /// Always check `snapshot.sequence_number` against EventStore to verify freshness.
    async fn load_snapshot(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Option<ExecutionSnapshot>>;

    /// Delete a snapshot
    ///
    /// Called when an execution completes or is cleaned up.
    async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()>;

    // =========================================================================
    // Key-value operations (working memory)
    // =========================================================================

    /// Set a key-value pair
    ///
    /// # Arguments
    /// * `key` - The key (should be namespaced, e.g., "exec:{id}:var:{name}")
    /// * `value` - The value as bytes
    /// * `ttl` - Optional time-to-live (auto-delete after this duration)
    async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> anyhow::Result<()>;

    /// Get a value by key
    ///
    /// Returns `None` if the key doesn't exist or has expired.
    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>>;

    /// Delete a key
    async fn delete(&self, key: &str) -> anyhow::Result<()>;

    /// Check if a key exists
    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
        Ok(self.get(key).await?.is_some())
    }

    // =========================================================================
    // Batch operations
    // =========================================================================

    /// Delete all state for an execution
    ///
    /// Cleans up snapshot and any keys prefixed with the execution ID.
    async fn delete_execution_state(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
        self.delete_snapshot(execution_id).await
        // Implementations may also clean up related keys
    }

    /// List snapshots for a tenant (for cleanup/admin)
    async fn list_snapshots(
        &self,
        tenant_id: &TenantId,
        limit: usize,
    ) -> anyhow::Result<Vec<ExecutionId>>;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_snapshot_freshness() {
        let snapshot = ExecutionSnapshot::new(
            ExecutionId::new(),
            TenantId::from("test"),
            ExecutionState::Running,
            10,
        );

        assert!(snapshot.is_fresh(10));
        assert!(snapshot.is_fresh(5));
        assert!(!snapshot.is_fresh(15));
    }

    #[test]
    fn test_snapshot_serialization() {
        let mut snapshot = ExecutionSnapshot::new(
            ExecutionId::new(),
            TenantId::from("test"),
            ExecutionState::Running,
            5,
        );
        snapshot
            .variables
            .insert("foo".to_string(), serde_json::json!("bar"));

        let json = serde_json::to_string(&snapshot).unwrap();
        let parsed: ExecutionSnapshot = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.state, ExecutionState::Running);
        assert_eq!(parsed.variables.get("foo").unwrap(), "bar");
    }
}