Skip to main content

enact_core/streaming/
event_logger.rs

1//! Event Log - Append-only execution event store
2//!
3//! The event log is the source of truth for all execution events.
4//! It is:
5//! - Append-only (events cannot be modified or deleted)
6//! - Ordered (events have a sequence number)
7//! - Durable (persisted to storage)
8
9use crate::kernel::{ExecutionEvent, ExecutionId};
10use async_trait::async_trait;
11use std::collections::HashMap;
12use std::sync::{Arc, RwLock};
13
14/// Event log entry with sequence number
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
16pub struct EventLogEntry {
17    /// Global sequence number (monotonically increasing)
18    pub sequence: u64,
19    /// The event
20    pub event: ExecutionEvent,
21}
22
23impl EventLogEntry {
24    /// Create a new entry
25    pub fn new(sequence: u64, event: ExecutionEvent) -> Self {
26        Self { sequence, event }
27    }
28}
29
30/// Event store trait - persistence backend
31#[async_trait]
32pub trait EventStore: Send + Sync {
33    /// Append an event to the log
34    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry>;
35
36    /// Get events for an execution
37    async fn get_by_execution(
38        &self,
39        execution_id: &ExecutionId,
40    ) -> anyhow::Result<Vec<EventLogEntry>>;
41
42    /// Get events after a sequence number
43    async fn get_after(
44        &self,
45        after_sequence: u64,
46        limit: usize,
47    ) -> anyhow::Result<Vec<EventLogEntry>>;
48
49    /// Get the latest sequence number
50    async fn latest_sequence(&self) -> anyhow::Result<u64>;
51}
52
53/// In-memory event store (for testing/development)
54#[derive(Debug, Default)]
55pub struct InMemoryEventStore {
56    events: RwLock<Vec<EventLogEntry>>,
57    by_execution: RwLock<HashMap<String, Vec<usize>>>,
58}
59
60impl InMemoryEventStore {
61    /// Create a new in-memory event store
62    pub fn new() -> Self {
63        Self {
64            events: RwLock::new(Vec::new()),
65            by_execution: RwLock::new(HashMap::new()),
66        }
67    }
68}
69
70#[async_trait]
71impl EventStore for InMemoryEventStore {
72    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
73        let mut events = self
74            .events
75            .write()
76            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
77        let sequence = events.len() as u64;
78        let entry = EventLogEntry::new(sequence, event.clone());
79
80        // Store the event
81        events.push(entry.clone());
82
83        // Index by execution
84        let mut by_execution = self
85            .by_execution
86            .write()
87            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
88        by_execution
89            .entry(event.context.execution_id.as_str().to_string())
90            .or_insert_with(Vec::new)
91            .push(sequence as usize);
92
93        Ok(entry)
94    }
95
96    async fn get_by_execution(
97        &self,
98        execution_id: &ExecutionId,
99    ) -> anyhow::Result<Vec<EventLogEntry>> {
100        let events = self
101            .events
102            .read()
103            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
104        let by_execution = self
105            .by_execution
106            .read()
107            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
108
109        let indices = by_execution
110            .get(execution_id.as_str())
111            .map(|v| v.as_slice())
112            .unwrap_or(&[]);
113
114        Ok(indices
115            .iter()
116            .filter_map(|&i| events.get(i).cloned())
117            .collect())
118    }
119
120    async fn get_after(
121        &self,
122        after_sequence: u64,
123        limit: usize,
124    ) -> anyhow::Result<Vec<EventLogEntry>> {
125        let events = self
126            .events
127            .read()
128            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
129
130        Ok(events
131            .iter()
132            .skip(after_sequence as usize + 1)
133            .take(limit)
134            .cloned()
135            .collect())
136    }
137
138    async fn latest_sequence(&self) -> anyhow::Result<u64> {
139        let events = self
140            .events
141            .read()
142            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
143        Ok(events.len().saturating_sub(1) as u64)
144    }
145}
146
147/// Event log - wraps an event store with subscription support
148pub struct EventLog {
149    store: Arc<dyn EventStore>,
150}
151
152impl EventLog {
153    /// Create a new event log with the given store
154    pub fn new(store: Arc<dyn EventStore>) -> Self {
155        Self { store }
156    }
157
158    /// Create an in-memory event log (for testing)
159    pub fn in_memory() -> Self {
160        Self::new(Arc::new(InMemoryEventStore::new()))
161    }
162
163    /// Append an event
164    pub async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
165        self.store.append(event).await
166    }
167
168    /// Get events for an execution
169    pub async fn get_by_execution(
170        &self,
171        execution_id: &ExecutionId,
172    ) -> anyhow::Result<Vec<EventLogEntry>> {
173        self.store.get_by_execution(execution_id).await
174    }
175
176    /// Get events after a sequence number (for polling/streaming)
177    pub async fn get_after(
178        &self,
179        after_sequence: u64,
180        limit: usize,
181    ) -> anyhow::Result<Vec<EventLogEntry>> {
182        self.store.get_after(after_sequence, limit).await
183    }
184
185    /// Get the latest sequence number
186    pub async fn latest_sequence(&self) -> anyhow::Result<u64> {
187        self.store.latest_sequence().await
188    }
189
190    /// Get access to the underlying store
191    pub fn store(&self) -> &Arc<dyn EventStore> {
192        &self.store
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199    use crate::kernel::{ExecutionContext, ExecutionEventType};
200
201    #[tokio::test]
202    async fn test_event_log_append_and_retrieve() {
203        let log = EventLog::in_memory();
204
205        let exec_id = ExecutionId::new();
206        let ctx = ExecutionContext::new(exec_id.clone());
207        let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
208
209        // Append
210        let entry = log.append(event).await.unwrap();
211        assert_eq!(entry.sequence, 0);
212
213        // Retrieve
214        let events = log.get_by_execution(&exec_id).await.unwrap();
215        assert_eq!(events.len(), 1);
216        assert_eq!(events[0].sequence, 0);
217    }
218
219    #[tokio::test]
220    async fn test_event_log_get_after() {
221        let log = EventLog::in_memory();
222
223        // Append multiple events
224        for _ in 0..5 {
225            let exec_id = ExecutionId::new();
226            let ctx = ExecutionContext::new(exec_id);
227            let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
228            log.append(event).await.unwrap();
229        }
230
231        // Get after sequence 2
232        let events = log.get_after(2, 10).await.unwrap();
233        assert_eq!(events.len(), 2); // Events 3 and 4
234        assert_eq!(events[0].sequence, 3);
235        assert_eq!(events[1].sequence, 4);
236    }
237}