enact_core/streaming/
jsonl_event_store.rs1use crate::kernel::{ExecutionEvent, ExecutionId};
7use crate::streaming::event_logger::{EventLogEntry, EventStore};
8use async_trait::async_trait;
9use std::collections::HashMap;
10use std::io::{BufRead, BufReader, Write};
11use std::path::PathBuf;
12use std::sync::RwLock;
13use tokio::fs;
14use tracing::debug;
15
/// Event store that persists execution events as JSON Lines files.
///
/// Each execution gets its own `<execution_id>.jsonl` file inside `base_dir`;
/// every line of such a file is one serialized `EventLogEntry`.
pub struct JsonlEventStore {
    /// Directory holding one `.jsonl` file per execution.
    base_dir: PathBuf,
    /// Most recently assigned sequence number (0 when nothing has been stored).
    sequence: RwLock<u64>,
    /// Execution id -> sequence numbers recorded for that execution.
    index: RwLock<HashMap<String, Vec<u64>>>,
}
28
29impl JsonlEventStore {
30 pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
32 fs::create_dir_all(&base_dir).await?;
34
35 let store = Self {
36 base_dir,
37 sequence: RwLock::new(0),
38 index: RwLock::new(HashMap::new()),
39 };
40
41 store.rebuild_index().await?;
43
44 Ok(store)
45 }
46
47 fn execution_file(&self, execution_id: &str) -> PathBuf {
49 self.base_dir.join(format!("{}.jsonl", execution_id))
50 }
51
52 async fn rebuild_index(&self) -> anyhow::Result<()> {
54 let mut max_sequence = 0u64;
55 let mut index = HashMap::new();
56
57 let mut entries = match fs::read_dir(&self.base_dir).await {
58 Ok(e) => e,
59 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
60 Err(e) => return Err(e.into()),
61 };
62
63 while let Some(entry) = entries.next_entry().await? {
64 let path = entry.path();
65 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
66 let execution_id = path
67 .file_stem()
68 .and_then(|s| s.to_str())
69 .unwrap_or("")
70 .to_string();
71
72 if let Ok(file) = std::fs::File::open(&path) {
73 let reader = BufReader::new(file);
74 let mut sequences = Vec::new();
75
76 for line in reader.lines().map_while(Result::ok) {
77 if let Ok(entry) = serde_json::from_str::<EventLogEntry>(&line) {
78 sequences.push(entry.sequence);
79 if entry.sequence > max_sequence {
80 max_sequence = entry.sequence;
81 }
82 }
83 }
84
85 if !sequences.is_empty() {
86 index.insert(execution_id, sequences);
87 }
88 }
89 }
90 }
91
92 let num_executions = index.len();
93 *self
94 .sequence
95 .write()
96 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = max_sequence;
97 *self
98 .index
99 .write()
100 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = index;
101
102 debug!(
103 "Rebuilt JSONL event store index: {} executions, max_sequence={}",
104 num_executions, max_sequence
105 );
106
107 Ok(())
108 }
109
110 fn read_execution_file(&self, execution_id: &str) -> anyhow::Result<Vec<EventLogEntry>> {
112 let path = self.execution_file(execution_id);
113 if !path.exists() {
114 return Ok(Vec::new());
115 }
116
117 let file = std::fs::File::open(&path)?;
118 let reader = BufReader::new(file);
119 let mut entries = Vec::new();
120
121 for line in reader.lines() {
122 let line = line?;
123 if !line.trim().is_empty() {
124 let entry: EventLogEntry = serde_json::from_str(&line)?;
125 entries.push(entry);
126 }
127 }
128
129 Ok(entries)
130 }
131}
132
133impl std::fmt::Debug for JsonlEventStore {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 f.debug_struct("JsonlEventStore")
136 .field("base_dir", &self.base_dir)
137 .finish()
138 }
139}
140
141#[async_trait]
142impl EventStore for JsonlEventStore {
143 async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
144 let execution_id = event.context.execution_id.as_str().to_string();
145
146 let sequence = {
148 let mut seq = self
149 .sequence
150 .write()
151 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
152 *seq += 1;
153 *seq
154 };
155
156 let entry = EventLogEntry::new(sequence, event);
157
158 let json = serde_json::to_string(&entry)?;
160
161 let path = self.execution_file(&execution_id);
163 let mut file = std::fs::OpenOptions::new()
164 .create(true)
165 .append(true)
166 .open(&path)?;
167 writeln!(file, "{}", json)?;
168 file.flush()?;
169
170 {
172 let mut index = self
173 .index
174 .write()
175 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
176 index
177 .entry(execution_id)
178 .or_insert_with(Vec::new)
179 .push(sequence);
180 }
181
182 debug!("Appended event {} to JSONL store", sequence);
183
184 Ok(entry)
185 }
186
187 async fn get_by_execution(
188 &self,
189 execution_id: &ExecutionId,
190 ) -> anyhow::Result<Vec<EventLogEntry>> {
191 self.read_execution_file(execution_id.as_str())
192 }
193
194 async fn get_after(
195 &self,
196 after_sequence: u64,
197 limit: usize,
198 ) -> anyhow::Result<Vec<EventLogEntry>> {
199 let index = self
202 .index
203 .read()
204 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
205
206 let mut all_entries = Vec::new();
207
208 for execution_id in index.keys() {
209 let entries = self.read_execution_file(execution_id)?;
210 for entry in entries {
211 if entry.sequence > after_sequence {
212 all_entries.push(entry);
213 }
214 }
215 }
216
217 all_entries.sort_by_key(|e| e.sequence);
219 all_entries.truncate(limit);
220
221 Ok(all_entries)
222 }
223
224 async fn latest_sequence(&self) -> anyhow::Result<u64> {
225 Ok(*self
226 .sequence
227 .read()
228 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?)
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::{ExecutionContext, ExecutionEventType};
    use tempfile::tempdir;

    /// Builds an `ExecutionStart` event whose context carries `exec_id`.
    fn start_event(exec_id: &ExecutionId) -> ExecutionEvent {
        ExecutionEvent::new(
            ExecutionEventType::ExecutionStart,
            ExecutionContext::new(exec_id.clone()),
        )
    }

    /// A freshly appended event gets sequence 1 and can be read back by
    /// execution id.
    #[tokio::test]
    async fn test_jsonl_store_append_and_retrieve() {
        let dir = tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let store = JsonlEventStore::new(root).await.unwrap();

        let exec_id = ExecutionId::new();

        let appended = store.append(start_event(&exec_id)).await.unwrap();
        assert_eq!(appended.sequence, 1);

        let stored = store.get_by_execution(&exec_id).await.unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].sequence, 1);
    }

    /// Events written by one store instance are visible to a second instance
    /// opened on the same directory.
    #[tokio::test]
    async fn test_jsonl_store_persistence() {
        let dir = tempdir().unwrap();
        let exec_id = ExecutionId::new();

        // First instance: write one event, then let the store go away.
        let writer = JsonlEventStore::new(dir.path().to_path_buf())
            .await
            .unwrap();
        writer.append(start_event(&exec_id)).await.unwrap();
        drop(writer);

        // Second instance: the event must still be there.
        let reader = JsonlEventStore::new(dir.path().to_path_buf())
            .await
            .unwrap();
        let stored = reader.get_by_execution(&exec_id).await.unwrap();
        assert_eq!(stored.len(), 1);
    }
}