enact_core/streaming/
event_logger.rs1use crate::kernel::{ExecutionEvent, ExecutionId};
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::{Arc, RwLock};
13
14#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
16pub struct EventLogEntry {
17 pub sequence: u64,
19 pub event: ExecutionEvent,
21}
22
23impl EventLogEntry {
24 pub fn new(sequence: u64, event: ExecutionEvent) -> Self {
26 Self { sequence, event }
27 }
28}
29
30#[async_trait]
32pub trait EventStore: Send + Sync {
33 async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry>;
35
36 async fn get_by_execution(
38 &self,
39 execution_id: &ExecutionId,
40 ) -> anyhow::Result<Vec<EventLogEntry>>;
41
42 async fn get_after(
44 &self,
45 after_sequence: u64,
46 limit: usize,
47 ) -> anyhow::Result<Vec<EventLogEntry>>;
48
49 async fn latest_sequence(&self) -> anyhow::Result<u64>;
51}
52
53#[derive(Debug, Default)]
55pub struct InMemoryEventStore {
56 events: RwLock<Vec<EventLogEntry>>,
57 by_execution: RwLock<HashMap<String, Vec<usize>>>,
58}
59
60impl InMemoryEventStore {
61 pub fn new() -> Self {
63 Self {
64 events: RwLock::new(Vec::new()),
65 by_execution: RwLock::new(HashMap::new()),
66 }
67 }
68}
69
70#[async_trait]
71impl EventStore for InMemoryEventStore {
72 async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
73 let mut events = self
74 .events
75 .write()
76 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
77 let sequence = events.len() as u64;
78 let entry = EventLogEntry::new(sequence, event.clone());
79
80 events.push(entry.clone());
82
83 let mut by_execution = self
85 .by_execution
86 .write()
87 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
88 by_execution
89 .entry(event.context.execution_id.as_str().to_string())
90 .or_insert_with(Vec::new)
91 .push(sequence as usize);
92
93 Ok(entry)
94 }
95
96 async fn get_by_execution(
97 &self,
98 execution_id: &ExecutionId,
99 ) -> anyhow::Result<Vec<EventLogEntry>> {
100 let events = self
101 .events
102 .read()
103 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
104 let by_execution = self
105 .by_execution
106 .read()
107 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
108
109 let indices = by_execution
110 .get(execution_id.as_str())
111 .map(|v| v.as_slice())
112 .unwrap_or(&[]);
113
114 Ok(indices
115 .iter()
116 .filter_map(|&i| events.get(i).cloned())
117 .collect())
118 }
119
120 async fn get_after(
121 &self,
122 after_sequence: u64,
123 limit: usize,
124 ) -> anyhow::Result<Vec<EventLogEntry>> {
125 let events = self
126 .events
127 .read()
128 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
129
130 Ok(events
131 .iter()
132 .skip(after_sequence as usize + 1)
133 .take(limit)
134 .cloned()
135 .collect())
136 }
137
138 async fn latest_sequence(&self) -> anyhow::Result<u64> {
139 let events = self
140 .events
141 .read()
142 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
143 Ok(events.len().saturating_sub(1) as u64)
144 }
145}
146
147pub struct EventLog {
149 store: Arc<dyn EventStore>,
150}
151
152impl EventLog {
153 pub fn new(store: Arc<dyn EventStore>) -> Self {
155 Self { store }
156 }
157
158 pub fn in_memory() -> Self {
160 Self::new(Arc::new(InMemoryEventStore::new()))
161 }
162
163 pub async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
165 self.store.append(event).await
166 }
167
168 pub async fn get_by_execution(
170 &self,
171 execution_id: &ExecutionId,
172 ) -> anyhow::Result<Vec<EventLogEntry>> {
173 self.store.get_by_execution(execution_id).await
174 }
175
176 pub async fn get_after(
178 &self,
179 after_sequence: u64,
180 limit: usize,
181 ) -> anyhow::Result<Vec<EventLogEntry>> {
182 self.store.get_after(after_sequence, limit).await
183 }
184
185 pub async fn latest_sequence(&self) -> anyhow::Result<u64> {
187 self.store.latest_sequence().await
188 }
189
190 pub fn store(&self) -> &Arc<dyn EventStore> {
192 &self.store
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use crate::kernel::{ExecutionContext, ExecutionEventType};
200
201 #[tokio::test]
202 async fn test_event_log_append_and_retrieve() {
203 let log = EventLog::in_memory();
204
205 let exec_id = ExecutionId::new();
206 let ctx = ExecutionContext::new(exec_id.clone());
207 let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
208
209 let entry = log.append(event).await.unwrap();
211 assert_eq!(entry.sequence, 0);
212
213 let events = log.get_by_execution(&exec_id).await.unwrap();
215 assert_eq!(events.len(), 1);
216 assert_eq!(events[0].sequence, 0);
217 }
218
219 #[tokio::test]
220 async fn test_event_log_get_after() {
221 let log = EventLog::in_memory();
222
223 for _ in 0..5 {
225 let exec_id = ExecutionId::new();
226 let ctx = ExecutionContext::new(exec_id);
227 let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
228 log.append(event).await.unwrap();
229 }
230
231 let events = log.get_after(2, 10).await.unwrap();
233 assert_eq!(events.len(), 2); assert_eq!(events[0].sequence, 3);
235 assert_eq!(events[1].sequence, 4);
236 }
237}