enact_core/kernel/persistence/event_store.rs
1//! EventStore - Append-only event log
2//!
3//! The EventStore is the **authoritative source of truth** for execution history.
4//! It provides:
5//! - Append-only semantics (events are never modified)
6//! - Ordered events per execution
7//! - Durability guarantees (event is persisted before append returns)
8//!
9//! ## Guarantees
10//!
11//! - **Durability**: Event is durable before `append()` returns
12//! - **Ordering**: Events are strictly ordered by sequence number within an execution
13//! - **Immutability**: Events are append-only, never modified or deleted
14//! - **Authority**: EventStore is the single source of truth for execution history
15//!
16//! @see docs/TECHNICAL/14-PERSISTENCE-LAYER.md
17
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21
22use crate::kernel::{
23 ExecutionError, ExecutionId, ParentLink, StepId, StepSourceType, StepType, TenantId,
24};
25
26use super::StorageBackend;
27
28/// A stored event with metadata
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct StoredEvent {
31 /// Unique sequence number within the execution
32 pub sequence: u64,
33 /// The execution this event belongs to
34 pub execution_id: ExecutionId,
35 /// Tenant for multi-tenancy
36 pub tenant_id: TenantId,
37 /// The event payload
38 pub event: ExecutionEventData,
39 /// When this event occurred
40 pub timestamp: DateTime<Utc>,
41}
42
43/// Event data that can be stored
44///
45/// These events represent all state transitions in an execution.
46/// They can be replayed to reconstruct execution state.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(tag = "type", rename_all = "snake_case")]
49pub enum ExecutionEventData {
50 /// Execution started
51 ExecutionStarted {
52 parent: Option<ParentLink>,
53 metadata: Option<serde_json::Value>,
54 },
55
56 /// A step started
57 StepStarted {
58 step_id: StepId,
59 parent_step_id: Option<StepId>,
60 step_type: StepType,
61 name: String,
62 },
63
64 /// A step completed successfully
65 StepCompleted {
66 step_id: StepId,
67 output: Option<String>,
68 duration_ms: u64,
69 },
70
71 /// A step failed
72 StepFailed {
73 step_id: StepId,
74 error: ExecutionError,
75 },
76
77 /// Execution paused
78 ExecutionPaused { reason: String },
79
80 /// Execution resumed
81 ExecutionResumed,
82
83 /// Execution completed successfully
84 ExecutionCompleted {
85 output: Option<String>,
86 duration_ms: u64,
87 },
88
89 /// Execution failed
90 ExecutionFailed { error: ExecutionError },
91
92 /// Execution cancelled
93 ExecutionCancelled { reason: String },
94
95 /// Artifact stored
96 ArtifactStored {
97 artifact_id: String,
98 step_id: StepId,
99 content_hash: String,
100 size_bytes: u64,
101 },
102
103 /// A step was dynamically discovered during execution
104 ///
105 /// Emitted when the agentic loop or LLM output identifies new work.
106 /// Enables audit trails for discovered steps and replay of discovery sequences.
107 StepDiscovered {
108 /// The discovered step's ID
109 step_id: StepId,
110 /// Which step discovered this new work (if any)
111 discovered_by: Option<StepId>,
112 /// What triggered the discovery
113 source_type: StepSourceType,
114 /// Human-readable reason for discovery
115 reason: String,
116 /// Discovery depth (how deep in the discovery chain)
117 depth: u32,
118 },
119
120 /// A tool call started (input available)
121 ToolCallStarted {
122 step_id: Option<StepId>,
123 tool_call_id: String,
124 tool_name: String,
125 input: serde_json::Value,
126 },
127
128 /// A tool call completed (output available)
129 ToolCallCompleted {
130 step_id: Option<StepId>,
131 tool_call_id: String,
132 output: serde_json::Value,
133 },
134
135 /// A checkpoint was saved
136 CheckpointSaved {
137 step_id: Option<StepId>,
138 checkpoint_id: String,
139 state_hash: String,
140 },
141
142 /// A goal was evaluated
143 GoalEvaluated {
144 step_id: Option<StepId>,
145 goal_id: String,
146 status: String, // "met", "not_met", "progressing"
147 score: Option<f64>,
148 reason: Option<String>,
149 },
150
151 /// Custom event (for extensibility)
152 Custom {
153 event_type: String,
154 data: serde_json::Value,
155 },
156}
157
158/// EventStore trait - append-only event log
159///
160/// This is the core persistence trait for execution events. All execution
161/// state can be reconstructed by replaying events from this store.
162#[async_trait]
163pub trait EventStore: StorageBackend {
164 /// Append an event to the store
165 ///
166 /// # Arguments
167 /// * `execution_id` - The execution this event belongs to
168 /// * `tenant_id` - Tenant for multi-tenancy isolation
169 /// * `event` - The event data to store
170 ///
171 /// # Returns
172 /// * `Ok(sequence)` - The sequence number assigned to this event
173 /// * `Err` - If the event could not be persisted
174 ///
175 /// # Guarantees
176 /// - Event is durable when this returns Ok
177 /// - Sequence numbers are monotonically increasing per execution
178 async fn append(
179 &self,
180 execution_id: &ExecutionId,
181 tenant_id: &TenantId,
182 event: ExecutionEventData,
183 ) -> anyhow::Result<u64>;
184
185 /// Append multiple events atomically
186 ///
187 /// All events are stored with consecutive sequence numbers, or none are stored.
188 ///
189 /// # Returns
190 /// * `Ok(first_sequence)` - The sequence number of the first event in the batch
191 async fn append_batch(
192 &self,
193 execution_id: &ExecutionId,
194 tenant_id: &TenantId,
195 events: Vec<ExecutionEventData>,
196 ) -> anyhow::Result<u64>;
197
198 /// Load all events for an execution
199 ///
200 /// Returns events ordered by sequence number.
201 async fn load_events(&self, execution_id: &ExecutionId) -> anyhow::Result<Vec<StoredEvent>>;
202
203 /// Load events after a specific sequence number
204 ///
205 /// Useful for incremental replay or catching up after a checkpoint.
206 ///
207 /// # Arguments
208 /// * `execution_id` - The execution to load events for
209 /// * `after_seq` - Return events with sequence > after_seq
210 async fn load_events_after(
211 &self,
212 execution_id: &ExecutionId,
213 after_seq: u64,
214 ) -> anyhow::Result<Vec<StoredEvent>>;
215
216 /// Get the latest sequence number for an execution
217 ///
218 /// Returns `None` if no events exist for this execution.
219 async fn get_latest_sequence(&self, execution_id: &ExecutionId) -> anyhow::Result<Option<u64>>;
220
221 /// Check if an execution exists
222 async fn execution_exists(&self, execution_id: &ExecutionId) -> anyhow::Result<bool> {
223 Ok(self.get_latest_sequence(execution_id).await?.is_some())
224 }
225
226 /// List executions for a tenant
227 ///
228 /// Returns execution IDs ordered by creation time (newest first).
229 ///
230 /// # Arguments
231 /// * `tenant_id` - The tenant to list executions for
232 /// * `limit` - Maximum number of executions to return
233 /// * `offset` - Number of executions to skip (for pagination)
234 async fn list_executions(
235 &self,
236 tenant_id: &TenantId,
237 limit: usize,
238 offset: usize,
239 ) -> anyhow::Result<Vec<ExecutionId>>;
240
241 /// Count events for an execution
242 async fn count_events(&self, execution_id: &ExecutionId) -> anyhow::Result<u64> {
243 Ok(self.get_latest_sequence(execution_id).await?.unwrap_or(0))
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250
251 #[test]
252 fn test_event_serialization() {
253 let event = ExecutionEventData::StepStarted {
254 step_id: StepId::new(),
255 parent_step_id: None,
256 step_type: StepType::FunctionNode,
257 name: "test_step".to_string(),
258 };
259
260 let json = serde_json::to_string(&event).unwrap();
261 let parsed: ExecutionEventData = serde_json::from_str(&json).unwrap();
262
263 match parsed {
264 ExecutionEventData::StepStarted { name, .. } => {
265 assert_eq!(name, "test_step");
266 }
267 _ => panic!("Wrong event type"),
268 }
269 }
270}