Skip to main content

a3s_event/
state.rs

1//! EventBus state persistence
2//!
3//! Provides pluggable persistence for subscription filters so they
4//! survive process restarts. The `EventBus` auto-saves on changes
5//! and auto-loads on creation when a `StateStore` is configured.
6
7use crate::error::{EventError, Result};
8use crate::types::SubscriptionFilter;
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11
12/// Trait for persisting EventBus subscription state
13pub trait StateStore: Send + Sync {
14    /// Save all subscription filters
15    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()>;
16
17    /// Load all subscription filters
18    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>>;
19}
20
/// JSON file-based state store
///
/// Persists subscription filters as a JSON file on disk.
/// Atomic writes via temp file + rename to prevent corruption.
///
/// The store only holds the target path, so it is cheap to clone; clones
/// refer to the same file on disk.
#[derive(Debug, Clone)]
pub struct FileStateStore {
    /// Location of the JSON state file.
    path: PathBuf,
}

impl FileStateStore {
    /// Create a new file state store at the given path.
    ///
    /// No I/O happens here: the file and its parent directories are only
    /// touched when state is saved or loaded.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self { path: path.into() }
    }

    /// Get the path of the state file this store reads and writes.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
40
41impl StateStore for FileStateStore {
42    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
43        let json = serde_json::to_string_pretty(subscriptions)?;
44
45        // Atomic write: write to temp file, then rename
46        let tmp_path = self.path.with_extension("tmp");
47
48        if let Some(parent) = self.path.parent() {
49            std::fs::create_dir_all(parent).map_err(|e| {
50                EventError::Config(format!(
51                    "Failed to create state directory {}: {}",
52                    parent.display(),
53                    e
54                ))
55            })?;
56        }
57
58        std::fs::write(&tmp_path, json).map_err(|e| {
59            EventError::Config(format!(
60                "Failed to write state file {}: {}",
61                tmp_path.display(),
62                e
63            ))
64        })?;
65
66        std::fs::rename(&tmp_path, &self.path).map_err(|e| {
67            EventError::Config(format!(
68                "Failed to rename state file {} → {}: {}",
69                tmp_path.display(),
70                self.path.display(),
71                e
72            ))
73        })?;
74
75        tracing::debug!(path = %self.path.display(), "State saved");
76        Ok(())
77    }
78
79    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
80        if !self.path.exists() {
81            return Ok(HashMap::new());
82        }
83
84        let json = std::fs::read_to_string(&self.path).map_err(|e| {
85            EventError::Config(format!(
86                "Failed to read state file {}: {}",
87                self.path.display(),
88                e
89            ))
90        })?;
91
92        let subscriptions: HashMap<String, SubscriptionFilter> =
93            serde_json::from_str(&json).map_err(|e| {
94                EventError::Config(format!(
95                    "Failed to parse state file {}: {}",
96                    self.path.display(),
97                    e
98                ))
99            })?;
100
101        tracing::debug!(
102            path = %self.path.display(),
103            count = subscriptions.len(),
104            "State loaded"
105        );
106        Ok(subscriptions)
107    }
108}
109
110/// In-memory state store for testing
111///
112/// Stores state in memory — lost on drop, but useful for tests.
113#[derive(Default)]
114pub struct MemoryStateStore {
115    state: std::sync::RwLock<HashMap<String, SubscriptionFilter>>,
116}
117
118impl StateStore for MemoryStateStore {
119    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
120        let mut state = self.state.write().map_err(|e| {
121            EventError::Config(format!("Failed to acquire state lock: {}", e))
122        })?;
123        *state = subscriptions.clone();
124        Ok(())
125    }
126
127    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
128        let state = self.state.read().map_err(|e| {
129            EventError::Config(format!("Failed to acquire state lock: {}", e))
130        })?;
131        Ok(state.clone())
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::SubscriptionFilter;

    /// Unique scratch location under the system temp dir.
    ///
    /// The directory is NOT created by `new`; it is removed (if present) on
    /// drop, so a failing assertion does not leak files between runs.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn new() -> Self {
            Self(std::env::temp_dir().join(format!("a3s-event-test-{}", uuid::Uuid::new_v4())))
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created, and
            // panicking inside drop during an unwinding test would abort.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Build a single-subject filter with no options.
    fn filter(subscriber_id: &str, subject: &str, durable: bool) -> SubscriptionFilter {
        SubscriptionFilter {
            subscriber_id: subscriber_id.to_string(),
            subjects: vec![subject.to_string()],
            durable,
            options: None,
        }
    }

    /// Two filters ("analyst" durable, "monitor" non-durable) keyed by subscriber id.
    fn sample_filters() -> HashMap<String, SubscriptionFilter> {
        vec![
            filter("analyst", "events.market.>", true),
            filter("monitor", "events.system.>", false),
        ]
        .into_iter()
        .map(|f| (f.subscriber_id.clone(), f))
        .collect()
    }

    #[test]
    fn test_memory_store_save_load() {
        let store = MemoryStateStore::default();
        let filters = sample_filters();

        store.save(&filters).unwrap();
        let loaded = store.load().unwrap();

        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded["analyst"].subscriber_id, "analyst");
        assert!(loaded["analyst"].durable);
        assert_eq!(loaded["monitor"].subjects, vec!["events.system.>"]);
    }

    #[test]
    fn test_memory_store_empty_load() {
        let store = MemoryStateStore::default();
        let loaded = store.load().unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_memory_store_overwrite() {
        let store = MemoryStateStore::default();
        store.save(&sample_filters()).unwrap();

        let mut updated = HashMap::new();
        updated.insert("new-sub".to_string(), filter("new-sub", "events.>", true));
        store.save(&updated).unwrap();

        // A second save replaces the earlier state rather than merging into it.
        let loaded = store.load().unwrap();
        assert_eq!(loaded.len(), 1);
        assert!(loaded.contains_key("new-sub"));
    }

    #[test]
    fn test_file_store_save_load() {
        let scratch = ScratchDir::new();
        std::fs::create_dir_all(scratch.path()).unwrap();
        let path = scratch.path().join("state.json");

        let store = FileStateStore::new(&path);
        let filters = sample_filters();

        store.save(&filters).unwrap();
        assert!(path.exists());

        let loaded = store.load().unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded["analyst"].subscriber_id, "analyst");

        // Verify JSON is human-readable: contains the data and is
        // pretty-printed across multiple lines.
        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("analyst"));
        assert!(content.contains('\n'));
    }

    #[test]
    fn test_file_store_load_nonexistent() {
        // The scratch directory is never created, so this path cannot exist
        // on any platform (no hard-coded `/tmp`).
        let scratch = ScratchDir::new();
        let store = FileStateStore::new(scratch.path().join("state.json"));

        let loaded = store.load().unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_file_store_load_invalid_json() {
        let scratch = ScratchDir::new();
        std::fs::create_dir_all(scratch.path()).unwrap();
        let path = scratch.path().join("state.json");
        std::fs::write(&path, "this is not json").unwrap();

        // A corrupt state file must surface as an error, not as empty state.
        let store = FileStateStore::new(&path);
        assert!(store.load().is_err());
    }

    #[test]
    fn test_file_store_creates_parent_dirs() {
        let scratch = ScratchDir::new();
        let path = scratch
            .path()
            .join("nested")
            .join("deep")
            .join("state.json");

        let store = FileStateStore::new(&path);
        store.save(&HashMap::new()).unwrap();
        assert!(path.exists());
    }

    #[test]
    fn test_file_store_atomic_write() {
        let scratch = ScratchDir::new();
        let path = scratch.path().join("state.json");
        let store = FileStateStore::new(&path);

        // Save initial state
        let filters = sample_filters();
        store.save(&filters).unwrap();

        // Save again — tmp file should not linger
        store.save(&filters).unwrap();
        let tmp_path = path.with_extension("tmp");
        assert!(!tmp_path.exists());
        assert!(path.exists());
    }
}
268        assert!(!tmp_path.exists());
269
270        std::fs::remove_dir_all(&dir).unwrap();
271    }
272}