Skip to main content

enact_core/kernel/persistence/
state_store.rs

1//! StateStore - Hot snapshots and working memory
2//!
3//! The StateStore provides fast access to execution state for resumption.
4//! It is a **cache**, not a source of truth - the EventStore is authoritative.
5//!
6//! ## Guarantees
7//!
8//! - **Durability**: Best-effort - may be lost on failure
9//! - **Freshness**: May be stale - always verify sequence with EventStore
10//! - **Authority**: Non-authoritative - cache only
11//! - **Availability**: Designed for fast reads, not durability
12//!
13//! ## Use Cases
14//!
15//! - Fast execution resumption (avoid replaying all events)
16//! - Working memory during execution
17//! - Temporary state that doesn't need event sourcing
18//!
19//! @see docs/TECHNICAL/14-PERSISTENCE-LAYER.md
20
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::time::Duration;
26
27/// Extension trait for JSON operations on StateStore
28///
29/// This is separate from StateStore to maintain dyn-compatibility.
30#[async_trait]
31pub trait StateStoreJsonExt: StateStore {
32    /// Set a JSON value
33    async fn set_json<T: Serialize + Send + Sync>(
34        &self,
35        key: &str,
36        value: &T,
37        ttl: Option<Duration>,
38    ) -> anyhow::Result<()> {
39        let bytes = serde_json::to_vec(value)?;
40        self.set(key, &bytes, ttl).await
41    }
42
43    /// Get a JSON value
44    async fn get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Option<T>> {
45        match self.get(key).await? {
46            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
47            None => Ok(None),
48        }
49    }
50}
51
52// Blanket implementation for all StateStore implementors
53impl<S: StateStore + ?Sized> StateStoreJsonExt for S {}
54
55use crate::kernel::{ExecutionId, ExecutionState, StepId, TenantId, UserId};
56
57use super::StorageBackend;
58
59/// Execution state snapshot
60///
61/// This captures the current state of an execution for fast resumption.
62/// The `sequence_number` should match the EventStore sequence to verify freshness.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ExecutionSnapshot {
65    /// The execution this snapshot is for
66    pub execution_id: ExecutionId,
67    /// Tenant for multi-tenancy isolation
68    pub tenant_id: TenantId,
69    /// User who initiated this execution (for observability)
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub user_id: Option<UserId>,
72    /// Current execution state
73    pub state: ExecutionState,
74    /// Currently executing step (if any)
75    pub current_step_id: Option<StepId>,
76    /// Outputs from completed steps
77    pub step_outputs: HashMap<StepId, serde_json::Value>,
78    /// Working variables
79    pub variables: HashMap<String, serde_json::Value>,
80    /// When this snapshot was created
81    pub timestamp: DateTime<Utc>,
82    /// EventStore sequence number at snapshot time
83    ///
84    /// Used to verify freshness - if EventStore has more events,
85    /// the snapshot may be stale.
86    pub sequence_number: u64,
87}
88
89impl ExecutionSnapshot {
90    /// Create a new snapshot
91    pub fn new(
92        execution_id: ExecutionId,
93        tenant_id: TenantId,
94        state: ExecutionState,
95        sequence_number: u64,
96    ) -> Self {
97        Self {
98            execution_id,
99            tenant_id,
100            user_id: None,
101            state,
102            current_step_id: None,
103            step_outputs: HashMap::new(),
104            variables: HashMap::new(),
105            timestamp: Utc::now(),
106            sequence_number,
107        }
108    }
109
110    /// Create a new snapshot with user_id
111    pub fn with_user(
112        execution_id: ExecutionId,
113        tenant_id: TenantId,
114        user_id: Option<UserId>,
115        state: ExecutionState,
116        sequence_number: u64,
117    ) -> Self {
118        Self {
119            execution_id,
120            tenant_id,
121            user_id,
122            state,
123            current_step_id: None,
124            step_outputs: HashMap::new(),
125            variables: HashMap::new(),
126            timestamp: Utc::now(),
127            sequence_number,
128        }
129    }
130
131    /// Check if this snapshot is fresh compared to an EventStore sequence
132    pub fn is_fresh(&self, event_store_sequence: u64) -> bool {
133        self.sequence_number >= event_store_sequence
134    }
135}
136
137/// StateStore trait - mutable snapshot cache
138///
139/// Provides fast state access for execution resumption and working memory.
140/// This is NOT the source of truth - always verify freshness against EventStore.
141#[async_trait]
142pub trait StateStore: StorageBackend {
143    // =========================================================================
144    // Snapshot operations
145    // =========================================================================
146
147    /// Save a state snapshot
148    ///
149    /// # Guarantees
150    /// - Best-effort durability
151    /// - May be overwritten by subsequent saves
152    /// - May be lost on backend failure
153    async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()>;
154
155    /// Load the latest snapshot for an execution
156    ///
157    /// Returns `None` if no snapshot exists.
158    ///
159    /// # Important
160    /// Always check `snapshot.sequence_number` against EventStore to verify freshness.
161    async fn load_snapshot(
162        &self,
163        execution_id: &ExecutionId,
164    ) -> anyhow::Result<Option<ExecutionSnapshot>>;
165
166    /// Delete a snapshot
167    ///
168    /// Called when an execution completes or is cleaned up.
169    async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()>;
170
171    // =========================================================================
172    // Key-value operations (working memory)
173    // =========================================================================
174
175    /// Set a key-value pair
176    ///
177    /// # Arguments
178    /// * `key` - The key (should be namespaced, e.g., "exec:{id}:var:{name}")
179    /// * `value` - The value as bytes
180    /// * `ttl` - Optional time-to-live (auto-delete after this duration)
181    async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> anyhow::Result<()>;
182
183    /// Get a value by key
184    ///
185    /// Returns `None` if the key doesn't exist or has expired.
186    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>>;
187
188    /// Delete a key
189    async fn delete(&self, key: &str) -> anyhow::Result<()>;
190
191    /// Check if a key exists
192    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
193        Ok(self.get(key).await?.is_some())
194    }
195
196    // =========================================================================
197    // Batch operations
198    // =========================================================================
199
200    /// Delete all state for an execution
201    ///
202    /// Cleans up snapshot and any keys prefixed with the execution ID.
203    async fn delete_execution_state(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
204        self.delete_snapshot(execution_id).await
205        // Implementations may also clean up related keys
206    }
207
208    /// List snapshots for a tenant (for cleanup/admin)
209    async fn list_snapshots(
210        &self,
211        tenant_id: &TenantId,
212        limit: usize,
213    ) -> anyhow::Result<Vec<ExecutionId>>;
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: a running snapshot for tenant "test" taken at
    /// `sequence_number`.
    fn running_snapshot(sequence_number: u64) -> ExecutionSnapshot {
        ExecutionSnapshot::new(
            ExecutionId::new(),
            TenantId::from("test"),
            ExecutionState::Running,
            sequence_number,
        )
    }

    #[test]
    fn test_snapshot_freshness() {
        let taken_at: u64 = 10;
        let snapshot = running_snapshot(taken_at);

        // Store at the same sequence: fresh.
        assert!(snapshot.is_fresh(taken_at));
        // Store behind the snapshot: still fresh.
        assert!(snapshot.is_fresh(taken_at - 5));
        // Store ahead of the snapshot: stale.
        assert!(!snapshot.is_fresh(taken_at + 5));
    }

    #[test]
    fn test_snapshot_serialization() {
        let mut original = running_snapshot(5);
        original
            .variables
            .insert(String::from("foo"), serde_json::json!("bar"));

        // Round-trip through a JSON string.
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ExecutionSnapshot = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.state, ExecutionState::Running);
        assert_eq!(
            decoded.variables.get("foo"),
            Some(&serde_json::json!("bar"))
        );
    }
}