Skip to main content

enact_core/kernel/persistence/
event_store.rs

1//! EventStore - Append-only event log
2//!
3//! The EventStore is the **authoritative source of truth** for execution history.
4//! It provides:
5//! - Append-only semantics (events are never modified)
6//! - Ordered events per execution
7//! - Durability guarantees (event is persisted before append returns)
8//!
9//! ## Guarantees
10//!
11//! - **Durability**: Event is durable before `append()` returns
12//! - **Ordering**: Events are strictly ordered by sequence number within an execution
13//! - **Immutability**: Events are append-only, never modified or deleted
14//! - **Authority**: EventStore is the single source of truth for execution history
15//!
16//! @see docs/TECHNICAL/14-PERSISTENCE-LAYER.md
17
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21
22use crate::kernel::{
23    ExecutionError, ExecutionId, ParentLink, StepId, StepSourceType, StepType, TenantId,
24};
25
26use super::StorageBackend;
27
28/// A stored event with metadata
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct StoredEvent {
31    /// Unique sequence number within the execution
32    pub sequence: u64,
33    /// The execution this event belongs to
34    pub execution_id: ExecutionId,
35    /// Tenant for multi-tenancy
36    pub tenant_id: TenantId,
37    /// The event payload
38    pub event: ExecutionEventData,
39    /// When this event occurred
40    pub timestamp: DateTime<Utc>,
41}
42
43/// Event data that can be stored
44///
45/// These events represent all state transitions in an execution.
46/// They can be replayed to reconstruct execution state.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(tag = "type", rename_all = "snake_case")]
49pub enum ExecutionEventData {
50    /// Execution started
51    ExecutionStarted {
52        parent: Option<ParentLink>,
53        metadata: Option<serde_json::Value>,
54    },
55
56    /// A step started
57    StepStarted {
58        step_id: StepId,
59        parent_step_id: Option<StepId>,
60        step_type: StepType,
61        name: String,
62    },
63
64    /// A step completed successfully
65    StepCompleted {
66        step_id: StepId,
67        output: Option<String>,
68        duration_ms: u64,
69    },
70
71    /// A step failed
72    StepFailed {
73        step_id: StepId,
74        error: ExecutionError,
75    },
76
77    /// Execution paused
78    ExecutionPaused { reason: String },
79
80    /// Execution resumed
81    ExecutionResumed,
82
83    /// Execution completed successfully
84    ExecutionCompleted {
85        output: Option<String>,
86        duration_ms: u64,
87    },
88
89    /// Execution failed
90    ExecutionFailed { error: ExecutionError },
91
92    /// Execution cancelled
93    ExecutionCancelled { reason: String },
94
95    /// Artifact stored
96    ArtifactStored {
97        artifact_id: String,
98        step_id: StepId,
99        content_hash: String,
100        size_bytes: u64,
101    },
102
103    /// A step was dynamically discovered during execution
104    ///
105    /// Emitted when the agentic loop or LLM output identifies new work.
106    /// Enables audit trails for discovered steps and replay of discovery sequences.
107    StepDiscovered {
108        /// The discovered step's ID
109        step_id: StepId,
110        /// Which step discovered this new work (if any)
111        discovered_by: Option<StepId>,
112        /// What triggered the discovery
113        source_type: StepSourceType,
114        /// Human-readable reason for discovery
115        reason: String,
116        /// Discovery depth (how deep in the discovery chain)
117        depth: u32,
118    },
119
120    /// A tool call started (input available)
121    ToolCallStarted {
122        step_id: Option<StepId>,
123        tool_call_id: String,
124        tool_name: String,
125        input: serde_json::Value,
126    },
127
128    /// A tool call completed (output available)
129    ToolCallCompleted {
130        step_id: Option<StepId>,
131        tool_call_id: String,
132        output: serde_json::Value,
133    },
134
135    /// A checkpoint was saved
136    CheckpointSaved {
137        step_id: Option<StepId>,
138        checkpoint_id: String,
139        state_hash: String,
140    },
141
142    /// A goal was evaluated
143    GoalEvaluated {
144        step_id: Option<StepId>,
145        goal_id: String,
146        status: String, // "met", "not_met", "progressing"
147        score: Option<f64>,
148        reason: Option<String>,
149    },
150
151    /// Custom event (for extensibility)
152    Custom {
153        event_type: String,
154        data: serde_json::Value,
155    },
156}
157
158/// EventStore trait - append-only event log
159///
160/// This is the core persistence trait for execution events. All execution
161/// state can be reconstructed by replaying events from this store.
162#[async_trait]
163pub trait EventStore: StorageBackend {
164    /// Append an event to the store
165    ///
166    /// # Arguments
167    /// * `execution_id` - The execution this event belongs to
168    /// * `tenant_id` - Tenant for multi-tenancy isolation
169    /// * `event` - The event data to store
170    ///
171    /// # Returns
172    /// * `Ok(sequence)` - The sequence number assigned to this event
173    /// * `Err` - If the event could not be persisted
174    ///
175    /// # Guarantees
176    /// - Event is durable when this returns Ok
177    /// - Sequence numbers are monotonically increasing per execution
178    async fn append(
179        &self,
180        execution_id: &ExecutionId,
181        tenant_id: &TenantId,
182        event: ExecutionEventData,
183    ) -> anyhow::Result<u64>;
184
185    /// Append multiple events atomically
186    ///
187    /// All events are stored with consecutive sequence numbers, or none are stored.
188    ///
189    /// # Returns
190    /// * `Ok(first_sequence)` - The sequence number of the first event in the batch
191    async fn append_batch(
192        &self,
193        execution_id: &ExecutionId,
194        tenant_id: &TenantId,
195        events: Vec<ExecutionEventData>,
196    ) -> anyhow::Result<u64>;
197
198    /// Load all events for an execution
199    ///
200    /// Returns events ordered by sequence number.
201    async fn load_events(&self, execution_id: &ExecutionId) -> anyhow::Result<Vec<StoredEvent>>;
202
203    /// Load events after a specific sequence number
204    ///
205    /// Useful for incremental replay or catching up after a checkpoint.
206    ///
207    /// # Arguments
208    /// * `execution_id` - The execution to load events for
209    /// * `after_seq` - Return events with sequence > after_seq
210    async fn load_events_after(
211        &self,
212        execution_id: &ExecutionId,
213        after_seq: u64,
214    ) -> anyhow::Result<Vec<StoredEvent>>;
215
216    /// Get the latest sequence number for an execution
217    ///
218    /// Returns `None` if no events exist for this execution.
219    async fn get_latest_sequence(&self, execution_id: &ExecutionId) -> anyhow::Result<Option<u64>>;
220
221    /// Check if an execution exists
222    async fn execution_exists(&self, execution_id: &ExecutionId) -> anyhow::Result<bool> {
223        Ok(self.get_latest_sequence(execution_id).await?.is_some())
224    }
225
226    /// List executions for a tenant
227    ///
228    /// Returns execution IDs ordered by creation time (newest first).
229    ///
230    /// # Arguments
231    /// * `tenant_id` - The tenant to list executions for
232    /// * `limit` - Maximum number of executions to return
233    /// * `offset` - Number of executions to skip (for pagination)
234    async fn list_executions(
235        &self,
236        tenant_id: &TenantId,
237        limit: usize,
238        offset: usize,
239    ) -> anyhow::Result<Vec<ExecutionId>>;
240
241    /// Count events for an execution
242    async fn count_events(&self, execution_id: &ExecutionId) -> anyhow::Result<u64> {
243        Ok(self.get_latest_sequence(execution_id).await?.unwrap_or(0))
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a `StepStarted` event through JSON and pins its wire format.
    ///
    /// Events are persisted, so the `type` discriminator produced by
    /// `#[serde(tag = "type", rename_all = "snake_case")]` is part of the stored
    /// format. A round trip alone would keep passing if that attribute changed,
    /// which is why the raw JSON is inspected as well.
    #[test]
    fn test_event_serialization() {
        let event = ExecutionEventData::StepStarted {
            step_id: StepId::new(),
            parent_step_id: None,
            step_type: StepType::FunctionNode,
            name: "test_step".to_string(),
        };

        let json = serde_json::to_string(&event).unwrap();

        // Check the serialized shape: variant tag in snake_case, field name as declared.
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(value["type"], "step_started");
        assert_eq!(value["name"], "test_step");

        // Check that the typed round trip yields the same variant and data.
        let parsed: ExecutionEventData = serde_json::from_str(&json).unwrap();
        match parsed {
            ExecutionEventData::StepStarted { name, .. } => assert_eq!(name, "test_step"),
            other => panic!("Wrong event type: {:?}", other),
        }
    }
}