enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! JSONL Event Store - File-based append-only event storage
//!
//! Stores events in JSONL (JSON Lines) format for easy testing and debugging.
//! Each execution gets its own file: `{dir}/{execution_id}.jsonl`

use crate::kernel::{ExecutionEvent, ExecutionId};
use crate::streaming::event_logger::{EventLogEntry, EventStore};
use async_trait::async_trait;
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::RwLock;
use tokio::fs;
use tracing::debug;

/// JSONL-based event store
///
/// Stores events in JSONL files, one file per execution.
/// Directory structure: `{base_dir}/{execution_id}.jsonl`
pub struct JsonlEventStore {
    /// Base directory for event files
    base_dir: PathBuf,
    /// Global sequence counter (in-memory, rebuilt on load)
    sequence: RwLock<u64>,
    /// Index of sequence numbers per execution (for efficient lookups)
    index: RwLock<HashMap<String, Vec<u64>>>,
}

impl JsonlEventStore {
    /// Create a new JSONL event store at the given directory
    pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
        // Ensure directory exists
        fs::create_dir_all(&base_dir).await?;

        let store = Self {
            base_dir,
            sequence: RwLock::new(0),
            index: RwLock::new(HashMap::new()),
        };

        // Rebuild index from existing files
        store.rebuild_index().await?;

        Ok(store)
    }

    /// Get the file path for an execution
    fn execution_file(&self, execution_id: &str) -> PathBuf {
        self.base_dir.join(format!("{}.jsonl", execution_id))
    }

    /// Rebuild the index from existing files
    async fn rebuild_index(&self) -> anyhow::Result<()> {
        let mut max_sequence = 0u64;
        let mut index = HashMap::new();

        let mut entries = match fs::read_dir(&self.base_dir).await {
            Ok(e) => e,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e.into()),
        };

        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
                let execution_id = path
                    .file_stem()
                    .and_then(|s| s.to_str())
                    .unwrap_or("")
                    .to_string();

                if let Ok(file) = std::fs::File::open(&path) {
                    let reader = BufReader::new(file);
                    let mut sequences = Vec::new();

                    for line in reader.lines().map_while(Result::ok) {
                        if let Ok(entry) = serde_json::from_str::<EventLogEntry>(&line) {
                            sequences.push(entry.sequence);
                            if entry.sequence > max_sequence {
                                max_sequence = entry.sequence;
                            }
                        }
                    }

                    if !sequences.is_empty() {
                        index.insert(execution_id, sequences);
                    }
                }
            }
        }

        let num_executions = index.len();
        *self
            .sequence
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = max_sequence;
        *self
            .index
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = index;

        debug!(
            "Rebuilt JSONL event store index: {} executions, max_sequence={}",
            num_executions, max_sequence
        );

        Ok(())
    }

    /// Read all entries for an execution from file
    fn read_execution_file(&self, execution_id: &str) -> anyhow::Result<Vec<EventLogEntry>> {
        let path = self.execution_file(execution_id);
        if !path.exists() {
            return Ok(Vec::new());
        }

        let file = std::fs::File::open(&path)?;
        let reader = BufReader::new(file);
        let mut entries = Vec::new();

        for line in reader.lines() {
            let line = line?;
            if !line.trim().is_empty() {
                let entry: EventLogEntry = serde_json::from_str(&line)?;
                entries.push(entry);
            }
        }

        Ok(entries)
    }
}

impl std::fmt::Debug for JsonlEventStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JsonlEventStore")
            .field("base_dir", &self.base_dir)
            .finish()
    }
}

#[async_trait]
impl EventStore for JsonlEventStore {
    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
        let execution_id = event.context.execution_id.as_str().to_string();

        // Get next sequence number
        let sequence = {
            let mut seq = self
                .sequence
                .write()
                .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
            *seq += 1;
            *seq
        };

        let entry = EventLogEntry::new(sequence, event);

        // Serialize to JSON line
        let json = serde_json::to_string(&entry)?;

        // Append to file
        let path = self.execution_file(&execution_id);
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)?;
        writeln!(file, "{}", json)?;
        file.flush()?;

        // Update index
        {
            let mut index = self
                .index
                .write()
                .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
            index
                .entry(execution_id)
                .or_insert_with(Vec::new)
                .push(sequence);
        }

        debug!("Appended event {} to JSONL store", sequence);

        Ok(entry)
    }

    async fn get_by_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        self.read_execution_file(execution_id.as_str())
    }

    async fn get_after(
        &self,
        after_sequence: u64,
        limit: usize,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        // This is less efficient for JSONL but acceptable for testing
        // We need to scan all files to find events after the sequence
        let index = self
            .index
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        let mut all_entries = Vec::new();

        for execution_id in index.keys() {
            let entries = self.read_execution_file(execution_id)?;
            for entry in entries {
                if entry.sequence > after_sequence {
                    all_entries.push(entry);
                }
            }
        }

        // Sort by sequence and limit
        all_entries.sort_by_key(|e| e.sequence);
        all_entries.truncate(limit);

        Ok(all_entries)
    }

    async fn latest_sequence(&self) -> anyhow::Result<u64> {
        Ok(*self
            .sequence
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::{ExecutionContext, ExecutionEventType};
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_jsonl_store_append_and_retrieve() {
        let dir = tempdir().unwrap();
        let store = JsonlEventStore::new(dir.path().to_path_buf())
            .await
            .unwrap();

        let exec_id = ExecutionId::new();
        let ctx = ExecutionContext::new(exec_id.clone());
        let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);

        // Append
        let entry = store.append(event).await.unwrap();
        assert_eq!(entry.sequence, 1);

        // Retrieve
        let events = store.get_by_execution(&exec_id).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].sequence, 1);
    }

    #[tokio::test]
    async fn test_jsonl_store_persistence() {
        let dir = tempdir().unwrap();
        let exec_id = ExecutionId::new();

        // Create store and append event
        {
            let store = JsonlEventStore::new(dir.path().to_path_buf())
                .await
                .unwrap();
            let ctx = ExecutionContext::new(exec_id.clone());
            let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
            store.append(event).await.unwrap();
        }

        // Create new store and verify event persisted
        {
            let store = JsonlEventStore::new(dir.path().to_path_buf())
                .await
                .unwrap();
            let events = store.get_by_execution(&exec_id).await.unwrap();
            assert_eq!(events.len(), 1);
        }
    }
}