// enact-core 0.0.2
//
// Core agent runtime for Enact - Graph-Native AI agents
// Documentation
//! EventStore - Append-only event log
//!
//! The EventStore is the **authoritative source of truth** for execution history.
//! It provides:
//! - Append-only semantics (events are never modified)
//! - Ordered events per execution
//! - Durability guarantees (event is persisted before append returns)
//!
//! ## Guarantees
//!
//! - **Durability**: Event is durable before `append()` returns
//! - **Ordering**: Events are strictly ordered by sequence number within an execution
//! - **Immutability**: Events are append-only, never modified or deleted
//! - **Authority**: EventStore is the single source of truth for execution history
//!
//! @see docs/TECHNICAL/14-PERSISTENCE-LAYER.md

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::kernel::{
    ExecutionError, ExecutionId, ParentLink, StepId, StepSourceType, StepType, TenantId,
};

use super::StorageBackend;

/// A persisted execution event together with its storage metadata.
///
/// This is the record type returned by [`EventStore::load_events`] and
/// [`EventStore::load_events_after`]: the event payload plus the sequence
/// number, owning execution, tenant, and timestamp it was stored with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
    /// Unique sequence number within the execution.
    ///
    /// This is the value events are ordered by when loaded from an
    /// [`EventStore`].
    pub sequence: u64,
    /// The execution this event belongs to
    pub execution_id: ExecutionId,
    /// Tenant for multi-tenancy
    pub tenant_id: TenantId,
    /// The event payload
    pub event: ExecutionEventData,
    /// When this event occurred
    ///
    /// NOTE(review): whether this is stamped by the store at append time or
    /// supplied by the producer is not visible in this file — confirm against
    /// the concrete `EventStore` implementations.
    pub timestamp: DateTime<Utc>,
}

/// Event data that can be stored
///
/// These events represent all state transitions in an execution.
/// They can be replayed to reconstruct execution state.
///
/// # Wire format
///
/// The enum is serialized with serde's internally tagged representation:
/// each value becomes an object whose `"type"` key holds the variant name in
/// `snake_case` (for example `"step_started"`), with the variant's fields as
/// sibling keys. Renaming a variant or a field therefore changes the stored
/// format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionEventData {
    /// Execution started
    ExecutionStarted {
        /// Link to a parent, if this execution has one.
        parent: Option<ParentLink>,
        /// Optional free-form JSON metadata attached to the execution.
        metadata: Option<serde_json::Value>,
    },

    /// A step started
    StepStarted {
        /// ID of the step that started.
        step_id: StepId,
        /// ID of the parent step, if any.
        parent_step_id: Option<StepId>,
        /// Kind of step being executed.
        step_type: StepType,
        /// Name of the step.
        name: String,
    },

    /// A step completed successfully
    StepCompleted {
        /// ID of the step that completed.
        step_id: StepId,
        /// Output of the step as text, if it produced any.
        output: Option<String>,
        /// Step duration in milliseconds.
        duration_ms: u64,
    },

    /// A step failed
    StepFailed {
        /// ID of the step that failed.
        step_id: StepId,
        /// The error that caused the failure.
        error: ExecutionError,
    },

    /// Execution paused
    ExecutionPaused {
        /// Why the execution was paused.
        reason: String,
    },

    /// Execution resumed
    ///
    /// Carries no payload; serialized as an object containing only the
    /// `"type"` tag.
    ExecutionResumed,

    /// Execution completed successfully
    ExecutionCompleted {
        /// Final output of the execution as text, if it produced any.
        output: Option<String>,
        /// Execution duration in milliseconds.
        duration_ms: u64,
    },

    /// Execution failed
    ExecutionFailed {
        /// The error that caused the failure.
        error: ExecutionError,
    },

    /// Execution cancelled
    ExecutionCancelled {
        /// Why the execution was cancelled.
        reason: String,
    },

    /// Artifact stored
    ArtifactStored {
        /// Identifier of the stored artifact.
        artifact_id: String,
        /// Step the artifact is associated with.
        step_id: StepId,
        /// Hash of the artifact content.
        ///
        /// NOTE(review): the hash algorithm and encoding are not specified
        /// here — confirm against the artifact store.
        content_hash: String,
        /// Size of the artifact in bytes.
        size_bytes: u64,
    },

    /// A step was dynamically discovered during execution
    ///
    /// Emitted when the agentic loop or LLM output identifies new work.
    /// Enables audit trails for discovered steps and replay of discovery sequences.
    StepDiscovered {
        /// The discovered step's ID
        step_id: StepId,
        /// Which step discovered this new work (if any)
        discovered_by: Option<StepId>,
        /// What triggered the discovery
        source_type: StepSourceType,
        /// Human-readable reason for discovery
        reason: String,
        /// Discovery depth (how deep in the discovery chain)
        depth: u32,
    },

    /// A tool call started (input available)
    ToolCallStarted {
        /// Step the tool call belongs to, if any.
        step_id: Option<StepId>,
        /// Identifier of this tool call; `ToolCallCompleted` carries a field
        /// of the same name.
        tool_call_id: String,
        /// Name of the tool being invoked.
        tool_name: String,
        /// Input passed to the tool, as JSON.
        input: serde_json::Value,
    },

    /// A tool call completed (output available)
    ToolCallCompleted {
        /// Step the tool call belongs to, if any.
        step_id: Option<StepId>,
        /// Identifier of the tool call (presumably the same value as in the
        /// corresponding `ToolCallStarted` — verify against the emitter).
        tool_call_id: String,
        /// Output produced by the tool, as JSON.
        output: serde_json::Value,
    },

    /// A checkpoint was saved
    CheckpointSaved {
        /// Step the checkpoint is associated with, if any.
        step_id: Option<StepId>,
        /// Identifier of the saved checkpoint.
        checkpoint_id: String,
        /// Hash of the checkpointed state.
        ///
        /// NOTE(review): the hash algorithm is not specified here — confirm
        /// against the checkpoint writer.
        state_hash: String,
    },

    /// A goal was evaluated
    GoalEvaluated {
        /// Step during which the goal was evaluated, if any.
        step_id: Option<StepId>,
        /// Identifier of the evaluated goal.
        goal_id: String,
        /// Evaluation outcome. Expected values are `"met"`, `"not_met"`, and
        /// `"progressing"`; this is a plain string, so the type does not
        /// enforce them.
        status: String,
        /// Optional numeric score for the evaluation.
        ///
        /// NOTE(review): the valid range is not defined here.
        score: Option<f64>,
        /// Optional explanation of the outcome.
        reason: Option<String>,
    },

    /// Custom event (for extensibility)
    Custom {
        /// Caller-defined event type name. This is serialized under the
        /// `"event_type"` key, separate from the enum's own `"type"` tag.
        event_type: String,
        /// Arbitrary JSON payload for the custom event.
        data: serde_json::Value,
    },
}

/// Append-only log of execution events.
///
/// Implementations persist [`ExecutionEventData`] values keyed by execution
/// and hand them back as [`StoredEvent`] records. Replaying the events of an
/// execution in sequence order is how its state is reconstructed.
#[async_trait]
pub trait EventStore: StorageBackend {
    /// Persist a single event for an execution.
    ///
    /// # Arguments
    /// * `execution_id` - Execution the event is recorded under
    /// * `tenant_id` - Tenant that owns the execution (multi-tenancy isolation)
    /// * `event` - Payload to store
    ///
    /// # Returns
    /// * `Ok(sequence)` - Sequence number assigned to the stored event
    /// * `Err` - The event could not be persisted
    ///
    /// # Guarantees
    /// - The event is durable once `Ok` is returned
    /// - Sequence numbers increase monotonically within an execution
    async fn append(
        &self,
        execution_id: &ExecutionId,
        tenant_id: &TenantId,
        event: ExecutionEventData,
    ) -> anyhow::Result<u64>;

    /// Persist several events as one atomic unit.
    ///
    /// Either every event is stored, with consecutive sequence numbers, or
    /// none of them are.
    ///
    /// # Returns
    /// * `Ok(first_sequence)` - Sequence number given to the first event of
    ///   the batch
    async fn append_batch(
        &self,
        execution_id: &ExecutionId,
        tenant_id: &TenantId,
        events: Vec<ExecutionEventData>,
    ) -> anyhow::Result<u64>;

    /// Fetch every event recorded for an execution.
    ///
    /// The returned events are ordered by sequence number.
    async fn load_events(&self, execution_id: &ExecutionId) -> anyhow::Result<Vec<StoredEvent>>;

    /// Fetch the events that follow a given sequence number.
    ///
    /// Intended for incremental replay, e.g. catching up from a checkpoint.
    ///
    /// # Arguments
    /// * `execution_id` - Execution whose events are loaded
    /// * `after_seq` - Exclusive lower bound: only events with
    ///   `sequence > after_seq` are returned
    async fn load_events_after(
        &self,
        execution_id: &ExecutionId,
        after_seq: u64,
    ) -> anyhow::Result<Vec<StoredEvent>>;

    /// Look up the highest sequence number recorded for an execution.
    ///
    /// Yields `None` when the execution has no events.
    async fn get_latest_sequence(&self, execution_id: &ExecutionId) -> anyhow::Result<Option<u64>>;

    /// Report whether any event has been recorded for an execution.
    ///
    /// The default implementation treats an execution as existing exactly
    /// when [`get_latest_sequence`](Self::get_latest_sequence) returns `Some`.
    async fn execution_exists(&self, execution_id: &ExecutionId) -> anyhow::Result<bool> {
        let latest = self.get_latest_sequence(execution_id).await?;
        Ok(latest.is_some())
    }

    /// List the executions that belong to a tenant.
    ///
    /// Execution IDs come back ordered by creation time, newest first.
    ///
    /// # Arguments
    /// * `tenant_id` - Tenant whose executions are listed
    /// * `limit` - Upper bound on the number of IDs returned
    /// * `offset` - How many executions to skip before collecting results
    ///   (pagination)
    async fn list_executions(
        &self,
        tenant_id: &TenantId,
        limit: usize,
        offset: usize,
    ) -> anyhow::Result<Vec<ExecutionId>>;

    /// Count the events recorded for an execution.
    ///
    /// The default implementation returns the latest sequence number, or `0`
    /// when the execution has no events.
    ///
    /// NOTE(review): this equals the true event count only if sequence
    /// numbers start at 1 and have no gaps — confirm against the concrete
    /// stores, and override this method where that does not hold.
    async fn count_events(&self, execution_id: &ExecutionId) -> anyhow::Result<u64> {
        let latest = self.get_latest_sequence(execution_id).await?;
        Ok(latest.unwrap_or_default())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a `StepStarted` event through JSON and checks that both
    /// the variant and its `name` field survive.
    #[test]
    fn test_event_serialization() {
        let original = ExecutionEventData::StepStarted {
            step_id: StepId::new(),
            parent_step_id: None,
            step_type: StepType::FunctionNode,
            name: String::from("test_step"),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ExecutionEventData = serde_json::from_str(&encoded).unwrap();

        // Any other variant means the `type` tag did not round-trip.
        if let ExecutionEventData::StepStarted { name, .. } = decoded {
            assert_eq!(name, "test_step");
        } else {
            panic!("Wrong event type");
        }
    }
}