Skip to main content

enact_core/streaming/
jsonl_event_store.rs

//! JSONL Event Store - File-based append-only event storage
//!
//! Stores events in JSONL (JSON Lines) format for easy testing and debugging.
//! Each execution gets its own file: `{dir}/{execution_id}.jsonl`
5
use crate::kernel::{ExecutionEvent, ExecutionId};
use crate::streaming::event_logger::{EventLogEntry, EventStore};
use async_trait::async_trait;
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::RwLock;
use tokio::fs;
use tracing::{debug, warn};
15
/// Event store backed by JSON Lines files.
///
/// Each execution is persisted to its own file under the base directory,
/// laid out as `{base_dir}/{execution_id}.jsonl`.
pub struct JsonlEventStore {
    /// Directory that holds the per-execution event files.
    base_dir: PathBuf,
    /// Store-wide sequence counter; lives in memory and is rebuilt on load.
    sequence: RwLock<u64>,
    /// Sequence numbers known for each execution id, used for lookups.
    index: RwLock<HashMap<String, Vec<u64>>>,
}
28
29impl JsonlEventStore {
30    /// Create a new JSONL event store at the given directory
31    pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
32        // Ensure directory exists
33        fs::create_dir_all(&base_dir).await?;
34
35        let store = Self {
36            base_dir,
37            sequence: RwLock::new(0),
38            index: RwLock::new(HashMap::new()),
39        };
40
41        // Rebuild index from existing files
42        store.rebuild_index().await?;
43
44        Ok(store)
45    }
46
47    /// Get the file path for an execution
48    fn execution_file(&self, execution_id: &str) -> PathBuf {
49        self.base_dir.join(format!("{}.jsonl", execution_id))
50    }
51
52    /// Rebuild the index from existing files
53    async fn rebuild_index(&self) -> anyhow::Result<()> {
54        let mut max_sequence = 0u64;
55        let mut index = HashMap::new();
56
57        let mut entries = match fs::read_dir(&self.base_dir).await {
58            Ok(e) => e,
59            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
60            Err(e) => return Err(e.into()),
61        };
62
63        while let Some(entry) = entries.next_entry().await? {
64            let path = entry.path();
65            if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
66                let execution_id = path
67                    .file_stem()
68                    .and_then(|s| s.to_str())
69                    .unwrap_or("")
70                    .to_string();
71
72                if let Ok(file) = std::fs::File::open(&path) {
73                    let reader = BufReader::new(file);
74                    let mut sequences = Vec::new();
75
76                    for line in reader.lines().map_while(Result::ok) {
77                        if let Ok(entry) = serde_json::from_str::<EventLogEntry>(&line) {
78                            sequences.push(entry.sequence);
79                            if entry.sequence > max_sequence {
80                                max_sequence = entry.sequence;
81                            }
82                        }
83                    }
84
85                    if !sequences.is_empty() {
86                        index.insert(execution_id, sequences);
87                    }
88                }
89            }
90        }
91
92        let num_executions = index.len();
93        *self
94            .sequence
95            .write()
96            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = max_sequence;
97        *self
98            .index
99            .write()
100            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = index;
101
102        debug!(
103            "Rebuilt JSONL event store index: {} executions, max_sequence={}",
104            num_executions, max_sequence
105        );
106
107        Ok(())
108    }
109
110    /// Read all entries for an execution from file
111    fn read_execution_file(&self, execution_id: &str) -> anyhow::Result<Vec<EventLogEntry>> {
112        let path = self.execution_file(execution_id);
113        if !path.exists() {
114            return Ok(Vec::new());
115        }
116
117        let file = std::fs::File::open(&path)?;
118        let reader = BufReader::new(file);
119        let mut entries = Vec::new();
120
121        for line in reader.lines() {
122            let line = line?;
123            if !line.trim().is_empty() {
124                let entry: EventLogEntry = serde_json::from_str(&line)?;
125                entries.push(entry);
126            }
127        }
128
129        Ok(entries)
130    }
131}
132
133impl std::fmt::Debug for JsonlEventStore {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        f.debug_struct("JsonlEventStore")
136            .field("base_dir", &self.base_dir)
137            .finish()
138    }
139}
140
141#[async_trait]
142impl EventStore for JsonlEventStore {
143    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
144        let execution_id = event.context.execution_id.as_str().to_string();
145
146        // Get next sequence number
147        let sequence = {
148            let mut seq = self
149                .sequence
150                .write()
151                .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
152            *seq += 1;
153            *seq
154        };
155
156        let entry = EventLogEntry::new(sequence, event);
157
158        // Serialize to JSON line
159        let json = serde_json::to_string(&entry)?;
160
161        // Append to file
162        let path = self.execution_file(&execution_id);
163        let mut file = std::fs::OpenOptions::new()
164            .create(true)
165            .append(true)
166            .open(&path)?;
167        writeln!(file, "{}", json)?;
168        file.flush()?;
169
170        // Update index
171        {
172            let mut index = self
173                .index
174                .write()
175                .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
176            index
177                .entry(execution_id)
178                .or_insert_with(Vec::new)
179                .push(sequence);
180        }
181
182        debug!("Appended event {} to JSONL store", sequence);
183
184        Ok(entry)
185    }
186
187    async fn get_by_execution(
188        &self,
189        execution_id: &ExecutionId,
190    ) -> anyhow::Result<Vec<EventLogEntry>> {
191        self.read_execution_file(execution_id.as_str())
192    }
193
194    async fn get_after(
195        &self,
196        after_sequence: u64,
197        limit: usize,
198    ) -> anyhow::Result<Vec<EventLogEntry>> {
199        // This is less efficient for JSONL but acceptable for testing
200        // We need to scan all files to find events after the sequence
201        let index = self
202            .index
203            .read()
204            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
205
206        let mut all_entries = Vec::new();
207
208        for execution_id in index.keys() {
209            let entries = self.read_execution_file(execution_id)?;
210            for entry in entries {
211                if entry.sequence > after_sequence {
212                    all_entries.push(entry);
213                }
214            }
215        }
216
217        // Sort by sequence and limit
218        all_entries.sort_by_key(|e| e.sequence);
219        all_entries.truncate(limit);
220
221        Ok(all_entries)
222    }
223
224    async fn latest_sequence(&self) -> anyhow::Result<u64> {
225        Ok(*self
226            .sequence
227            .read()
228            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?)
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::{ExecutionContext, ExecutionEventType};
    use tempfile::tempdir;

    /// Build an `ExecutionStart` event bound to the given execution id.
    fn start_event(id: &ExecutionId) -> ExecutionEvent {
        let context = ExecutionContext::new(id.clone());
        ExecutionEvent::new(ExecutionEventType::ExecutionStart, context)
    }

    /// An appended event gets sequence 1 and can be read back by execution id.
    #[tokio::test]
    async fn test_jsonl_store_append_and_retrieve() {
        let tmp = tempdir().unwrap();
        let store = JsonlEventStore::new(dir_path(&tmp)).await.unwrap();
        let id = ExecutionId::new();

        let appended = store.append(start_event(&id)).await.unwrap();
        assert_eq!(appended.sequence, 1);

        let stored = store.get_by_execution(&id).await.unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].sequence, 1);
    }

    /// Events written by one store instance are visible to a fresh instance
    /// opened on the same directory.
    #[tokio::test]
    async fn test_jsonl_store_persistence() {
        let tmp = tempdir().unwrap();
        let id = ExecutionId::new();

        // First instance: write a single event, then drop the store.
        let writer = JsonlEventStore::new(dir_path(&tmp)).await.unwrap();
        writer.append(start_event(&id)).await.unwrap();
        drop(writer);

        // Second instance: the event must still be there.
        let reader = JsonlEventStore::new(dir_path(&tmp)).await.unwrap();
        let stored = reader.get_by_execution(&id).await.unwrap();
        assert_eq!(stored.len(), 1);
    }

    /// Owned path of the temporary directory used as the store's base dir.
    fn dir_path(tmp: &tempfile::TempDir) -> PathBuf {
        tmp.path().to_path_buf()
    }
}