1use std::collections::HashMap;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use serde_json::Value;
18use tokio::sync::RwLock;
19
20use crate::error::Result;
21
22#[async_trait]
42pub trait FlowStore: Send + Sync {
43 async fn save(&self, name: &str, definition: &Value) -> Result<()>;
45
46 async fn load(&self, name: &str) -> Result<Option<Value>>;
50
51 async fn list(&self) -> Result<Vec<String>>;
53
54 async fn delete(&self, name: &str) -> Result<()>;
56}
57
58pub struct MemoryFlowStore {
62 inner: Arc<RwLock<HashMap<String, Value>>>,
63}
64
65impl MemoryFlowStore {
66 pub fn new() -> Self {
68 Self {
69 inner: Arc::new(RwLock::new(HashMap::new())),
70 }
71 }
72}
73
74impl Default for MemoryFlowStore {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80#[async_trait]
81impl FlowStore for MemoryFlowStore {
82 async fn save(&self, name: &str, definition: &Value) -> Result<()> {
83 self.inner
84 .write()
85 .await
86 .insert(name.to_string(), definition.clone());
87 Ok(())
88 }
89
90 async fn load(&self, name: &str) -> Result<Option<Value>> {
91 Ok(self.inner.read().await.get(name).cloned())
92 }
93
94 async fn list(&self) -> Result<Vec<String>> {
95 Ok(self.inner.read().await.keys().cloned().collect())
96 }
97
98 async fn delete(&self, name: &str) -> Result<()> {
99 self.inner.write().await.remove(name);
100 Ok(())
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use serde_json::json;
108
109 #[tokio::test]
110 async fn save_and_load_round_trip() {
111 let store = MemoryFlowStore::new();
112 let def = json!({ "nodes": [{ "id": "a", "type": "noop" }], "edges": [] });
113
114 store.save("my-flow", &def).await.unwrap();
115 let loaded = store.load("my-flow").await.unwrap().unwrap();
116 assert_eq!(loaded, def);
117 }
118
119 #[tokio::test]
120 async fn load_unknown_name_returns_none() {
121 let store = MemoryFlowStore::new();
122 let result = store.load("nonexistent").await.unwrap();
123 assert!(result.is_none());
124 }
125
126 #[tokio::test]
127 async fn list_returns_all_saved_names() {
128 let store = MemoryFlowStore::new();
129 let def = json!({ "nodes": [{ "id": "a", "type": "noop" }], "edges": [] });
130
131 store.save("flow-a", &def).await.unwrap();
132 store.save("flow-b", &def).await.unwrap();
133
134 let mut names = store.list().await.unwrap();
135 names.sort();
136 assert_eq!(names, vec!["flow-a", "flow-b"]);
137 }
138
139 #[tokio::test]
140 async fn delete_removes_entry() {
141 let store = MemoryFlowStore::new();
142 let def = json!({ "nodes": [{ "id": "a", "type": "noop" }], "edges": [] });
143
144 store.save("flow-a", &def).await.unwrap();
145 store.delete("flow-a").await.unwrap();
146
147 assert!(store.load("flow-a").await.unwrap().is_none());
148 assert!(store.list().await.unwrap().is_empty());
149 }
150
151 #[tokio::test]
152 async fn save_overwrites_existing_definition() {
153 let store = MemoryFlowStore::new();
154 let v1 = json!({ "nodes": [{ "id": "a", "type": "noop" }], "edges": [] });
155 let v2 = json!({ "nodes": [{ "id": "b", "type": "noop" }], "edges": [] });
156
157 store.save("flow", &v1).await.unwrap();
158 store.save("flow", &v2).await.unwrap();
159
160 let loaded = store.load("flow").await.unwrap().unwrap();
161 assert_eq!(loaded, v2);
162 assert_eq!(store.list().await.unwrap().len(), 1);
163 }
164}