enact_core/kernel/persistence/state_store.rs
1//! StateStore - Hot snapshots and working memory
2//!
3//! The StateStore provides fast access to execution state for resumption.
4//! It is a **cache**, not a source of truth - the EventStore is authoritative.
5//!
6//! ## Guarantees
7//!
8//! - **Durability**: Best-effort - may be lost on failure
9//! - **Freshness**: May be stale - always verify sequence with EventStore
10//! - **Authority**: Non-authoritative - cache only
11//! - **Availability**: Designed for fast reads, not durability
12//!
13//! ## Use Cases
14//!
15//! - Fast execution resumption (avoid replaying all events)
16//! - Working memory during execution
17//! - Temporary state that doesn't need event sourcing
18//!
19//! @see docs/TECHNICAL/14-PERSISTENCE-LAYER.md
20
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::time::Duration;
26
27/// Extension trait for JSON operations on StateStore
28///
29/// This is separate from StateStore to maintain dyn-compatibility.
30#[async_trait]
31pub trait StateStoreJsonExt: StateStore {
32 /// Set a JSON value
33 async fn set_json<T: Serialize + Send + Sync>(
34 &self,
35 key: &str,
36 value: &T,
37 ttl: Option<Duration>,
38 ) -> anyhow::Result<()> {
39 let bytes = serde_json::to_vec(value)?;
40 self.set(key, &bytes, ttl).await
41 }
42
43 /// Get a JSON value
44 async fn get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Option<T>> {
45 match self.get(key).await? {
46 Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
47 None => Ok(None),
48 }
49 }
50}
51
52// Blanket implementation for all StateStore implementors
53impl<S: StateStore + ?Sized> StateStoreJsonExt for S {}
54
55use crate::kernel::{ExecutionId, ExecutionState, StepId, TenantId, UserId};
56
57use super::StorageBackend;
58
59/// Execution state snapshot
60///
61/// This captures the current state of an execution for fast resumption.
62/// The `sequence_number` should match the EventStore sequence to verify freshness.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ExecutionSnapshot {
65 /// The execution this snapshot is for
66 pub execution_id: ExecutionId,
67 /// Tenant for multi-tenancy isolation
68 pub tenant_id: TenantId,
69 /// User who initiated this execution (for observability)
70 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub user_id: Option<UserId>,
72 /// Current execution state
73 pub state: ExecutionState,
74 /// Currently executing step (if any)
75 pub current_step_id: Option<StepId>,
76 /// Outputs from completed steps
77 pub step_outputs: HashMap<StepId, serde_json::Value>,
78 /// Working variables
79 pub variables: HashMap<String, serde_json::Value>,
80 /// When this snapshot was created
81 pub timestamp: DateTime<Utc>,
82 /// EventStore sequence number at snapshot time
83 ///
84 /// Used to verify freshness - if EventStore has more events,
85 /// the snapshot may be stale.
86 pub sequence_number: u64,
87}
88
89impl ExecutionSnapshot {
90 /// Create a new snapshot
91 pub fn new(
92 execution_id: ExecutionId,
93 tenant_id: TenantId,
94 state: ExecutionState,
95 sequence_number: u64,
96 ) -> Self {
97 Self {
98 execution_id,
99 tenant_id,
100 user_id: None,
101 state,
102 current_step_id: None,
103 step_outputs: HashMap::new(),
104 variables: HashMap::new(),
105 timestamp: Utc::now(),
106 sequence_number,
107 }
108 }
109
110 /// Create a new snapshot with user_id
111 pub fn with_user(
112 execution_id: ExecutionId,
113 tenant_id: TenantId,
114 user_id: Option<UserId>,
115 state: ExecutionState,
116 sequence_number: u64,
117 ) -> Self {
118 Self {
119 execution_id,
120 tenant_id,
121 user_id,
122 state,
123 current_step_id: None,
124 step_outputs: HashMap::new(),
125 variables: HashMap::new(),
126 timestamp: Utc::now(),
127 sequence_number,
128 }
129 }
130
131 /// Check if this snapshot is fresh compared to an EventStore sequence
132 pub fn is_fresh(&self, event_store_sequence: u64) -> bool {
133 self.sequence_number >= event_store_sequence
134 }
135}
136
137/// StateStore trait - mutable snapshot cache
138///
139/// Provides fast state access for execution resumption and working memory.
140/// This is NOT the source of truth - always verify freshness against EventStore.
141#[async_trait]
142pub trait StateStore: StorageBackend {
143 // =========================================================================
144 // Snapshot operations
145 // =========================================================================
146
147 /// Save a state snapshot
148 ///
149 /// # Guarantees
150 /// - Best-effort durability
151 /// - May be overwritten by subsequent saves
152 /// - May be lost on backend failure
153 async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()>;
154
155 /// Load the latest snapshot for an execution
156 ///
157 /// Returns `None` if no snapshot exists.
158 ///
159 /// # Important
160 /// Always check `snapshot.sequence_number` against EventStore to verify freshness.
161 async fn load_snapshot(
162 &self,
163 execution_id: &ExecutionId,
164 ) -> anyhow::Result<Option<ExecutionSnapshot>>;
165
166 /// Delete a snapshot
167 ///
168 /// Called when an execution completes or is cleaned up.
169 async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()>;
170
171 // =========================================================================
172 // Key-value operations (working memory)
173 // =========================================================================
174
175 /// Set a key-value pair
176 ///
177 /// # Arguments
178 /// * `key` - The key (should be namespaced, e.g., "exec:{id}:var:{name}")
179 /// * `value` - The value as bytes
180 /// * `ttl` - Optional time-to-live (auto-delete after this duration)
181 async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> anyhow::Result<()>;
182
183 /// Get a value by key
184 ///
185 /// Returns `None` if the key doesn't exist or has expired.
186 async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>>;
187
188 /// Delete a key
189 async fn delete(&self, key: &str) -> anyhow::Result<()>;
190
191 /// Check if a key exists
192 async fn exists(&self, key: &str) -> anyhow::Result<bool> {
193 Ok(self.get(key).await?.is_some())
194 }
195
196 // =========================================================================
197 // Batch operations
198 // =========================================================================
199
200 /// Delete all state for an execution
201 ///
202 /// Cleans up snapshot and any keys prefixed with the execution ID.
203 async fn delete_execution_state(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
204 self.delete_snapshot(execution_id).await
205 // Implementations may also clean up related keys
206 }
207
208 /// List snapshots for a tenant (for cleanup/admin)
209 async fn list_snapshots(
210 &self,
211 tenant_id: &TenantId,
212 limit: usize,
213 ) -> anyhow::Result<Vec<ExecutionId>>;
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219
220 #[test]
221 fn test_snapshot_freshness() {
222 let snapshot = ExecutionSnapshot::new(
223 ExecutionId::new(),
224 TenantId::from("test"),
225 ExecutionState::Running,
226 10,
227 );
228
229 assert!(snapshot.is_fresh(10));
230 assert!(snapshot.is_fresh(5));
231 assert!(!snapshot.is_fresh(15));
232 }
233
234 #[test]
235 fn test_snapshot_serialization() {
236 let mut snapshot = ExecutionSnapshot::new(
237 ExecutionId::new(),
238 TenantId::from("test"),
239 ExecutionState::Running,
240 5,
241 );
242 snapshot
243 .variables
244 .insert("foo".to_string(), serde_json::json!("bar"));
245
246 let json = serde_json::to_string(&snapshot).unwrap();
247 let parsed: ExecutionSnapshot = serde_json::from_str(&json).unwrap();
248
249 assert_eq!(parsed.state, ExecutionState::Running);
250 assert_eq!(parsed.variables.get("foo").unwrap(), "bar");
251 }
252}