// enact-core 0.0.2
//
// Core agent runtime for Enact - Graph-Native AI agents
// Documentation
//! Event Log - Append-only execution event store
//!
//! The event log is the source of truth for all execution events.
//! It is:
//! - Append-only (events cannot be modified or deleted)
//! - Ordered (events have a sequence number)
//! - Durable (persisted to storage)

use crate::kernel::{ExecutionEvent, ExecutionId};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Event log entry with sequence number
///
/// Pairs an [`ExecutionEvent`] with the position it was given when it was
/// appended to the log. Entries are handed out as clones by the store, so
/// holding one does not keep any lock on the log.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EventLogEntry {
    /// Global sequence number (monotonically increasing)
    ///
    /// `InMemoryEventStore` assigns the entry's index in the log, so the
    /// first appended event has sequence `0`.
    pub sequence: u64,
    /// The event
    pub event: ExecutionEvent,
}

impl EventLogEntry {
    /// Create a new entry
    pub fn new(sequence: u64, event: ExecutionEvent) -> Self {
        Self { sequence, event }
    }
}

/// Event store trait - persistence backend
///
/// Implementations must be `Send + Sync` so a single store can be shared
/// behind an `Arc` (this is how [`EventLog`] holds it). All methods take
/// `&self`, so implementations are responsible for their own interior
/// synchronization.
#[async_trait]
pub trait EventStore: Send + Sync {
    /// Append an event to the log
    ///
    /// Returns the stored entry, including the sequence number that was
    /// assigned to the event.
    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry>;

    /// Get events for an execution
    ///
    /// Returns every entry whose event belongs to `execution_id`. The
    /// in-memory implementation returns them in append order and yields an
    /// empty vector for an unknown execution.
    async fn get_by_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Vec<EventLogEntry>>;

    /// Get events after a sequence number
    ///
    /// The bound is exclusive: the entry with sequence `after_sequence` is
    /// not part of the result. At most `limit` entries are returned.
    async fn get_after(
        &self,
        after_sequence: u64,
        limit: usize,
    ) -> anyhow::Result<Vec<EventLogEntry>>;

    /// Get the latest sequence number
    ///
    /// NOTE(review): the return type cannot express "no events yet". The
    /// in-memory implementation returns `0` for an empty log, which is also
    /// the sequence of the first entry - confirm how other backends behave.
    async fn latest_sequence(&self) -> anyhow::Result<u64>;
}

/// In-memory event store (for testing/development)
///
/// Nothing is persisted: all entries live in process memory behind
/// `std::sync::RwLock`s and are lost when the store is dropped.
#[derive(Debug, Default)]
pub struct InMemoryEventStore {
    // All entries in append order; an entry's index equals its sequence number.
    events: RwLock<Vec<EventLogEntry>>,
    // Execution id (as a string) -> indices into `events` for that execution.
    by_execution: RwLock<HashMap<String, Vec<usize>>>,
}

impl InMemoryEventStore {
    /// Create a new in-memory event store
    pub fn new() -> Self {
        Self {
            events: RwLock::new(Vec::new()),
            by_execution: RwLock::new(HashMap::new()),
        }
    }
}

#[async_trait]
impl EventStore for InMemoryEventStore {
    /// Appends `event` to the end of the log and indexes it by execution id.
    ///
    /// The assigned sequence number is the entry's index in the log, so the
    /// first appended event gets sequence `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if either internal lock is poisoned. Both locks are
    /// acquired before anything is mutated, so a failed append leaves the log
    /// and the index unchanged and consistent with each other.
    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
        // Lock order (`events`, then `by_execution`) matches `get_by_execution`.
        let mut events = self
            .events
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        let mut by_execution = self
            .by_execution
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        let index = events.len();
        // Take the index key first so the event can be moved, not cloned.
        let execution_key = event.context.execution_id.as_str().to_string();
        // usize -> u64 is lossless on every supported target.
        let entry = EventLogEntry::new(index as u64, event);

        // Store the event
        events.push(entry.clone());

        // Index by execution
        by_execution.entry(execution_key).or_default().push(index);

        Ok(entry)
    }

    /// Returns clones of all entries recorded for `execution_id`, in append
    /// order. An execution with no recorded events yields an empty vector.
    ///
    /// # Errors
    ///
    /// Returns an error if either internal lock is poisoned.
    async fn get_by_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        let events = self
            .events
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        let by_execution = self
            .by_execution
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        let indices = by_execution
            .get(execution_id.as_str())
            .map(|v| v.as_slice())
            .unwrap_or(&[]);

        Ok(indices
            .iter()
            .filter_map(|&i| events.get(i).cloned())
            .collect())
    }

    /// Returns up to `limit` entries whose sequence number is strictly
    /// greater than `after_sequence`, in log order.
    ///
    /// An `after_sequence` at or beyond the last entry (including
    /// `u64::MAX`) yields an empty vector.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock is poisoned.
    async fn get_after(
        &self,
        after_sequence: u64,
        limit: usize,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        let events = self
            .events
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        // Sequence numbers equal indices, so the first wanted entry sits at
        // `after_sequence + 1`. Do the arithmetic in u64 with saturation and
        // clamp to the log length: this cannot overflow, and the clamped
        // value always fits back into usize.
        let len = events.len() as u64;
        let start = after_sequence.saturating_add(1).min(len) as usize;

        Ok(events.iter().skip(start).take(limit).cloned().collect())
    }

    /// Returns the sequence number of the most recently appended entry.
    ///
    /// An empty log reports `0`, which is indistinguishable from a log that
    /// holds exactly one entry; callers that need to tell these apart must
    /// check for entries by other means.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock is poisoned.
    async fn latest_sequence(&self) -> anyhow::Result<u64> {
        let events = self
            .events
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        Ok(events.len().saturating_sub(1) as u64)
    }
}

/// Event log - thin facade over a shared [`EventStore`]
///
/// Every method delegates directly to the wrapped store. There is no push
/// subscription here; consumers follow the log by polling `get_after` with
/// the last sequence number they have seen.
pub struct EventLog {
    // Backend that actually stores the events; shared via `Arc`.
    store: Arc<dyn EventStore>,
}

impl EventLog {
    /// Create a new event log with the given store
    pub fn new(store: Arc<dyn EventStore>) -> Self {
        Self { store }
    }

    /// Create an in-memory event log (for testing)
    pub fn in_memory() -> Self {
        Self::new(Arc::new(InMemoryEventStore::new()))
    }

    /// Append an event
    pub async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
        self.store.append(event).await
    }

    /// Get events for an execution
    pub async fn get_by_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        self.store.get_by_execution(execution_id).await
    }

    /// Get events after a sequence number (for polling/streaming)
    pub async fn get_after(
        &self,
        after_sequence: u64,
        limit: usize,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        self.store.get_after(after_sequence, limit).await
    }

    /// Get the latest sequence number
    pub async fn latest_sequence(&self) -> anyhow::Result<u64> {
        self.store.latest_sequence().await
    }

    /// Get access to the underlying store
    pub fn store(&self) -> &Arc<dyn EventStore> {
        &self.store
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::{ExecutionContext, ExecutionEventType};

    /// Builds an `ExecutionStart` event for the given execution.
    fn start_event(exec_id: ExecutionId) -> ExecutionEvent {
        let ctx = ExecutionContext::new(exec_id);
        ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx)
    }

    #[tokio::test]
    async fn test_event_log_append_and_retrieve() {
        let log = EventLog::in_memory();
        let exec_id = ExecutionId::new();

        // The first appended event is assigned sequence 0.
        let appended = log.append(start_event(exec_id.clone())).await.unwrap();
        assert_eq!(appended.sequence, 0);

        // Looking the execution up returns exactly that entry.
        let found = log.get_by_execution(&exec_id).await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].sequence, 0);
    }

    #[tokio::test]
    async fn test_event_log_get_after() {
        let log = EventLog::in_memory();

        // Five events, each for its own execution: sequences 0 through 4.
        for _ in 0..5 {
            log.append(start_event(ExecutionId::new())).await.unwrap();
        }

        // The bound is exclusive, so only sequences 3 and 4 follow sequence 2.
        let tail = log.get_after(2, 10).await.unwrap();
        let sequences: Vec<u64> = tail.iter().map(|entry| entry.sequence).collect();
        assert_eq!(sequences, vec![3, 4]);
    }
}