1use crate::error::{EventError, Result};
8use crate::types::SubscriptionFilter;
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11
12pub trait StateStore: Send + Sync {
14 fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()>;
16
17 fn load(&self) -> Result<HashMap<String, SubscriptionFilter>>;
19}
20
/// A state store backed by a single file on disk.
///
/// Derives `Debug` and `Clone` so the store can be logged and handed to
/// several owners; cloning only copies the path, not the file contents.
#[derive(Debug, Clone)]
pub struct FileStateStore {
    /// Location of the state file.
    path: PathBuf,
}
28
29impl FileStateStore {
30 pub fn new(path: impl Into<PathBuf>) -> Self {
32 Self { path: path.into() }
33 }
34
35 pub fn path(&self) -> &Path {
37 &self.path
38 }
39}
40
41impl StateStore for FileStateStore {
42 fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
43 let json = serde_json::to_string_pretty(subscriptions)?;
44
45 let tmp_path = self.path.with_extension("tmp");
47
48 if let Some(parent) = self.path.parent() {
49 std::fs::create_dir_all(parent).map_err(|e| {
50 EventError::Config(format!(
51 "Failed to create state directory {}: {}",
52 parent.display(),
53 e
54 ))
55 })?;
56 }
57
58 std::fs::write(&tmp_path, json).map_err(|e| {
59 EventError::Config(format!(
60 "Failed to write state file {}: {}",
61 tmp_path.display(),
62 e
63 ))
64 })?;
65
66 std::fs::rename(&tmp_path, &self.path).map_err(|e| {
67 EventError::Config(format!(
68 "Failed to rename state file {} → {}: {}",
69 tmp_path.display(),
70 self.path.display(),
71 e
72 ))
73 })?;
74
75 tracing::debug!(path = %self.path.display(), "State saved");
76 Ok(())
77 }
78
79 fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
80 if !self.path.exists() {
81 return Ok(HashMap::new());
82 }
83
84 let json = std::fs::read_to_string(&self.path).map_err(|e| {
85 EventError::Config(format!(
86 "Failed to read state file {}: {}",
87 self.path.display(),
88 e
89 ))
90 })?;
91
92 let subscriptions: HashMap<String, SubscriptionFilter> =
93 serde_json::from_str(&json).map_err(|e| {
94 EventError::Config(format!(
95 "Failed to parse state file {}: {}",
96 self.path.display(),
97 e
98 ))
99 })?;
100
101 tracing::debug!(
102 path = %self.path.display(),
103 count = subscriptions.len(),
104 "State loaded"
105 );
106 Ok(subscriptions)
107 }
108}
109
110#[derive(Default)]
114pub struct MemoryStateStore {
115 state: std::sync::RwLock<HashMap<String, SubscriptionFilter>>,
116}
117
118impl StateStore for MemoryStateStore {
119 fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
120 let mut state = self.state.write().map_err(|e| {
121 EventError::Config(format!("Failed to acquire state lock: {}", e))
122 })?;
123 *state = subscriptions.clone();
124 Ok(())
125 }
126
127 fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
128 let state = self.state.read().map_err(|e| {
129 EventError::Config(format!("Failed to acquire state lock: {}", e))
130 })?;
131 Ok(state.clone())
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::SubscriptionFilter;

    /// Returns a unique path under the system temp directory.
    ///
    /// The directory is not created; each test decides whether it needs to
    /// exist up front.
    fn unique_temp_dir() -> std::path::PathBuf {
        std::env::temp_dir().join(format!("a3s-event-test-{}", uuid::Uuid::new_v4()))
    }

    /// Builds a two-entry subscription map used by several tests.
    fn sample_filters() -> HashMap<String, SubscriptionFilter> {
        let mut map = HashMap::new();
        map.insert(
            "analyst".to_string(),
            SubscriptionFilter {
                subscriber_id: "analyst".to_string(),
                subjects: vec!["events.market.>".to_string()],
                durable: true,
                options: None,
            },
        );
        map.insert(
            "monitor".to_string(),
            SubscriptionFilter {
                subscriber_id: "monitor".to_string(),
                subjects: vec!["events.system.>".to_string()],
                durable: false,
                options: None,
            },
        );
        map
    }

    #[test]
    fn test_memory_store_save_load() {
        let store = MemoryStateStore::default();
        let filters = sample_filters();

        store.save(&filters).unwrap();
        let loaded = store.load().unwrap();

        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded["analyst"].subscriber_id, "analyst");
        assert!(loaded["analyst"].durable);
        assert_eq!(loaded["monitor"].subjects, vec!["events.system.>"]);
    }

    #[test]
    fn test_memory_store_empty_load() {
        let store = MemoryStateStore::default();
        let loaded = store.load().unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_memory_store_overwrite() {
        let store = MemoryStateStore::default();
        let filters = sample_filters();
        store.save(&filters).unwrap();

        let mut updated = HashMap::new();
        updated.insert(
            "new-sub".to_string(),
            SubscriptionFilter {
                subscriber_id: "new-sub".to_string(),
                subjects: vec!["events.>".to_string()],
                durable: true,
                options: None,
            },
        );
        store.save(&updated).unwrap();

        // A second save replaces the earlier map rather than merging into it.
        let loaded = store.load().unwrap();
        assert_eq!(loaded.len(), 1);
        assert!(loaded.contains_key("new-sub"));
    }

    #[test]
    fn test_file_store_save_load() {
        let dir = unique_temp_dir();
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("state.json");

        let store = FileStateStore::new(&path);
        let filters = sample_filters();

        store.save(&filters).unwrap();
        assert!(path.exists());

        let loaded = store.load().unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded["analyst"].subscriber_id, "analyst");

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("analyst"));

        std::fs::remove_dir_all(&dir).unwrap();
    }

    #[test]
    fn test_file_store_load_nonexistent() {
        // Use a unique path under the temp dir instead of a fixed `/tmp/...`
        // name, so the test is portable and cannot hit a leftover file.
        let path = unique_temp_dir().join("state.json");
        let store = FileStateStore::new(&path);
        let loaded = store.load().unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_file_store_load_invalid_json() {
        let dir = unique_temp_dir();
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("state.json");
        std::fs::write(&path, "not valid json").unwrap();

        let store = FileStateStore::new(&path);
        assert!(store.load().is_err());

        std::fs::remove_dir_all(&dir).unwrap();
    }

    #[test]
    fn test_file_store_creates_parent_dirs() {
        let root = unique_temp_dir();
        let path = root.join("nested").join("deep").join("state.json");

        let store = FileStateStore::new(&path);
        store.save(&HashMap::new()).unwrap();
        assert!(path.exists());

        std::fs::remove_dir_all(&root).unwrap();
    }

    #[test]
    fn test_file_store_atomic_write() {
        let dir = unique_temp_dir();
        let path = dir.join("state.json");
        let store = FileStateStore::new(&path);

        let filters = sample_filters();
        store.save(&filters).unwrap();

        // Saving again must not leave the temporary file behind.
        store.save(&filters).unwrap();
        let tmp_path = path.with_extension("tmp");
        assert!(!tmp_path.exists());

        std::fs::remove_dir_all(&dir).unwrap();
    }
}