Skip to main content

a3s_flow/
flow_store.rs

1//! [`FlowStore`] — flow definition storage extension point.
2//!
3//! Implement [`FlowStore`] to store and retrieve named JSON flow definitions
4//! using any backend (in-memory, SQLite, PostgreSQL, remote API, …).
5//!
6//! Register a store on [`FlowEngine`](crate::engine::FlowEngine) via
7//! [`with_flow_store`](crate::engine::FlowEngine::with_flow_store) to enable
8//! [`start_named`](crate::engine::FlowEngine::start_named) — which loads a
9//! definition by name and starts it in one step.
10//!
11//! [`MemoryFlowStore`] is provided as an in-process default.
12
13use std::collections::HashMap;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use serde_json::Value;
18use tokio::sync::RwLock;
19
20use crate::error::Result;
21
22/// Named storage for JSON flow definitions.
23///
24/// # Example
25///
26/// ```rust
27/// use a3s_flow::{FlowStore, MemoryFlowStore};
28/// use serde_json::json;
29/// use std::sync::Arc;
30///
31/// # #[tokio::main] async fn main() {
32/// let store = MemoryFlowStore::new();
33///
34/// let def = json!({ "nodes": [{ "id": "a", "type": "noop" }], "edges": [] });
35/// store.save("my-flow", &def).await.unwrap();
36///
37/// let loaded = store.load("my-flow").await.unwrap().unwrap();
38/// assert_eq!(loaded, def);
39/// # }
40/// ```
41#[async_trait]
42pub trait FlowStore: Send + Sync {
43    /// Persist a flow definition under the given name (upsert semantics).
44    async fn save(&self, name: &str, definition: &Value) -> Result<()>;
45
46    /// Load a flow definition by name.
47    ///
48    /// Returns `None` if no definition exists with that name.
49    async fn load(&self, name: &str) -> Result<Option<Value>>;
50
51    /// List all stored flow names.
52    async fn list(&self) -> Result<Vec<String>>;
53
54    /// Delete a stored flow definition. No-op if the name is not found.
55    async fn delete(&self, name: &str) -> Result<()>;
56}
57
58/// An in-memory [`FlowStore`] backed by a `HashMap` under an `RwLock`.
59///
60/// Suitable for testing and short-lived processes. Data is lost on restart.
61pub struct MemoryFlowStore {
62    inner: Arc<RwLock<HashMap<String, Value>>>,
63}
64
65impl MemoryFlowStore {
66    /// Create a new empty store.
67    pub fn new() -> Self {
68        Self {
69            inner: Arc::new(RwLock::new(HashMap::new())),
70        }
71    }
72}
73
74impl Default for MemoryFlowStore {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80#[async_trait]
81impl FlowStore for MemoryFlowStore {
82    async fn save(&self, name: &str, definition: &Value) -> Result<()> {
83        self.inner
84            .write()
85            .await
86            .insert(name.to_string(), definition.clone());
87        Ok(())
88    }
89
90    async fn load(&self, name: &str) -> Result<Option<Value>> {
91        Ok(self.inner.read().await.get(name).cloned())
92    }
93
94    async fn list(&self) -> Result<Vec<String>> {
95        Ok(self.inner.read().await.keys().cloned().collect())
96    }
97
98    async fn delete(&self, name: &str) -> Result<()> {
99        self.inner.write().await.remove(name);
100        Ok(())
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Minimal flow definition: one `noop` node with the given id, no edges.
    fn noop_flow(node_id: &str) -> Value {
        json!({ "nodes": [{ "id": node_id, "type": "noop" }], "edges": [] })
    }

    // A saved definition is returned unchanged by `load`.
    #[tokio::test]
    async fn save_and_load_round_trip() {
        let store = MemoryFlowStore::new();
        let def = noop_flow("a");

        store.save("my-flow", &def).await.unwrap();

        let loaded = store.load("my-flow").await.unwrap().unwrap();
        assert_eq!(loaded, def);
    }

    // Loading a name that was never saved yields `None`, not an error.
    #[tokio::test]
    async fn load_unknown_name_returns_none() {
        let store = MemoryFlowStore::new();

        let result = store.load("nonexistent").await.unwrap();

        assert!(result.is_none());
    }

    // `list` reports every saved name; order is unspecified, so sort first.
    #[tokio::test]
    async fn list_returns_all_saved_names() {
        let store = MemoryFlowStore::new();
        let def = noop_flow("a");

        store.save("flow-a", &def).await.unwrap();
        store.save("flow-b", &def).await.unwrap();

        let mut names = store.list().await.unwrap();
        names.sort();
        assert_eq!(names, vec!["flow-a", "flow-b"]);
    }

    // After `delete`, the entry is gone from both `load` and `list`.
    #[tokio::test]
    async fn delete_removes_entry() {
        let store = MemoryFlowStore::new();
        let def = noop_flow("a");

        store.save("flow-a", &def).await.unwrap();
        store.delete("flow-a").await.unwrap();

        assert!(store.load("flow-a").await.unwrap().is_none());
        assert!(store.list().await.unwrap().is_empty());
    }

    // Saving twice under one name keeps only the latest definition.
    #[tokio::test]
    async fn save_overwrites_existing_definition() {
        let store = MemoryFlowStore::new();
        let v1 = noop_flow("a");
        let v2 = noop_flow("b");

        store.save("flow", &v1).await.unwrap();
        store.save("flow", &v2).await.unwrap();

        let loaded = store.load("flow").await.unwrap().unwrap();
        assert_eq!(loaded, v2);
        assert_eq!(store.list().await.unwrap().len(), 1);
    }
}