Skip to main content

a3s_flow/
store.rs

1//! [`ExecutionStore`] — execution history persistence extension point.
2//!
3//! Implement [`ExecutionStore`] to persist [`FlowResult`] objects across process
4//! restarts, enabling auditing, replay, and partial-resume workflows.
5//!
6//! Register a store via
7//! [`FlowEngine::with_execution_store`](crate::engine::FlowEngine::with_execution_store).
8//! The engine automatically saves completed results to the store.
9//! [`MemoryExecutionStore`] is provided as an in-process default.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use tokio::sync::RwLock;
16use uuid::Uuid;
17
18use crate::error::Result;
19use crate::result::FlowResult;
20
21/// Persistence layer for completed flow execution results.
22///
23/// # Example
24///
25/// ```rust
26/// use a3s_flow::{ExecutionStore, FlowResult, MemoryExecutionStore};
27/// use std::sync::Arc;
28///
29/// # #[tokio::main] async fn main() {
30/// let store = Arc::new(MemoryExecutionStore::new());
31/// let ids = store.list().await.unwrap();
32/// assert!(ids.is_empty());
33/// # }
34/// ```
35#[async_trait]
36pub trait ExecutionStore: Send + Sync {
37    /// Persist a completed execution result, keyed by `result.execution_id`.
38    async fn save(&self, result: &FlowResult) -> Result<()>;
39
40    /// Load a previously saved result by execution ID.
41    ///
42    /// Returns `None` if no result exists for the given ID.
43    async fn load(&self, id: Uuid) -> Result<Option<FlowResult>>;
44
45    /// List all stored execution IDs.
46    async fn list(&self) -> Result<Vec<Uuid>>;
47
48    /// Delete a stored result. No-op if the ID is not found.
49    async fn delete(&self, id: Uuid) -> Result<()>;
50}
51
52/// An in-memory [`ExecutionStore`] backed by a `HashMap` under an `RwLock`.
53///
54/// Suitable for testing and short-lived processes. Data is lost on restart.
55pub struct MemoryExecutionStore {
56    inner: Arc<RwLock<HashMap<Uuid, FlowResult>>>,
57}
58
59impl MemoryExecutionStore {
60    /// Create a new empty store.
61    pub fn new() -> Self {
62        Self {
63            inner: Arc::new(RwLock::new(HashMap::new())),
64        }
65    }
66}
67
68impl Default for MemoryExecutionStore {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74#[async_trait]
75impl ExecutionStore for MemoryExecutionStore {
76    async fn save(&self, result: &FlowResult) -> Result<()> {
77        self.inner
78            .write()
79            .await
80            .insert(result.execution_id, result.clone());
81        Ok(())
82    }
83
84    async fn load(&self, id: Uuid) -> Result<Option<FlowResult>> {
85        Ok(self.inner.read().await.get(&id).cloned())
86    }
87
88    async fn list(&self) -> Result<Vec<Uuid>> {
89        Ok(self.inner.read().await.keys().cloned().collect())
90    }
91
92    async fn delete(&self, id: Uuid) -> Result<()> {
93        self.inner.write().await.remove(&id);
94        Ok(())
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101    use std::collections::{HashMap, HashSet};
102
103    fn make_result() -> FlowResult {
104        FlowResult {
105            execution_id: Uuid::new_v4(),
106            outputs: HashMap::new(),
107            completed_nodes: HashSet::new(),
108            skipped_nodes: HashSet::new(),
109            context: HashMap::new(),
110        }
111    }
112
113    #[tokio::test]
114    async fn save_and_load_round_trip() {
115        let store = MemoryExecutionStore::new();
116        let r = make_result();
117        let id = r.execution_id;
118
119        store.save(&r).await.unwrap();
120        let loaded = store.load(id).await.unwrap().unwrap();
121        assert_eq!(loaded.execution_id, id);
122    }
123
124    #[tokio::test]
125    async fn load_unknown_id_returns_none() {
126        let store = MemoryExecutionStore::new();
127        let result = store.load(Uuid::new_v4()).await.unwrap();
128        assert!(result.is_none());
129    }
130
131    #[tokio::test]
132    async fn list_returns_all_saved_ids() {
133        let store = MemoryExecutionStore::new();
134        let r1 = make_result();
135        let r2 = make_result();
136        let id1 = r1.execution_id;
137        let id2 = r2.execution_id;
138
139        store.save(&r1).await.unwrap();
140        store.save(&r2).await.unwrap();
141
142        let ids = store.list().await.unwrap();
143        assert_eq!(ids.len(), 2);
144        assert!(ids.contains(&id1));
145        assert!(ids.contains(&id2));
146    }
147
148    #[tokio::test]
149    async fn delete_removes_entry() {
150        let store = MemoryExecutionStore::new();
151        let r = make_result();
152        let id = r.execution_id;
153
154        store.save(&r).await.unwrap();
155        store.delete(id).await.unwrap();
156
157        assert!(store.load(id).await.unwrap().is_none());
158        assert!(store.list().await.unwrap().is_empty());
159    }
160
161    #[tokio::test]
162    async fn save_overwrites_existing_entry() {
163        let store = MemoryExecutionStore::new();
164        let mut r = make_result();
165        let id = r.execution_id;
166
167        store.save(&r).await.unwrap();
168
169        // Save again with a modified outputs map — should overwrite.
170        r.outputs.insert("x".into(), serde_json::json!(42));
171        store.save(&r).await.unwrap();
172
173        let loaded = store.load(id).await.unwrap().unwrap();
174        assert_eq!(loaded.outputs["x"], serde_json::json!(42));
175
176        // Still only one entry.
177        assert_eq!(store.list().await.unwrap().len(), 1);
178    }
179}