Skip to main content

slack_rs/idempotency/
store.rs

1//! Idempotency store implementation with JSON persistence
2
3use super::types::{IdempotencyEntry, RequestFingerprint, ScopedKey};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::fs;
7use std::path::PathBuf;
8use thiserror::Error;
9
/// Default time-to-live applied to stored entries: seven days, in seconds.
pub const DEFAULT_TTL_SECONDS: u64 = 60 * 60 * 24 * 7;

/// Default upper bound on the number of entries kept in the store.
pub const DEFAULT_CAPACITY: usize = 10_000;
15
16/// Idempotency store errors
17#[derive(Debug, Error)]
18pub enum IdempotencyError {
19    #[error("IO error: {0}")]
20    IoError(#[from] std::io::Error),
21
22    #[error("JSON serialization error: {0}")]
23    JsonError(#[from] serde_json::Error),
24
25    #[error("Fingerprint mismatch: different request with same idempotency key")]
26    FingerprintMismatch,
27
28    #[error("Store error: {0}")]
29    StoreError(String),
30}
31
32/// Persistent idempotency store
33#[derive(Debug, Serialize, Deserialize)]
34pub struct IdempotencyStore {
35    /// Map of scoped keys to entries
36    entries: HashMap<String, IdempotencyEntry>,
37
38    /// Capacity limit
39    #[serde(skip)]
40    capacity: usize,
41
42    /// Store file path
43    #[serde(skip)]
44    store_path: PathBuf,
45}
46
47impl IdempotencyStore {
48    /// Create a new store with default config dir
49    pub fn new() -> Result<Self, IdempotencyError> {
50        let store_path = Self::default_store_path()?;
51        Self::load_or_create(store_path, DEFAULT_CAPACITY)
52    }
53
54    /// Create a new store with custom path
55    pub fn with_path(store_path: PathBuf) -> Result<Self, IdempotencyError> {
56        Self::load_or_create(store_path, DEFAULT_CAPACITY)
57    }
58
59    /// Get default store path in config directory
60    fn default_store_path() -> Result<PathBuf, IdempotencyError> {
61        let project_dirs = directories::ProjectDirs::from("", "", "slack-rs")
62            .ok_or_else(|| IdempotencyError::StoreError("Cannot find config directory".into()))?;
63        let config_dir = project_dirs.config_dir();
64
65        // Create directory if it doesn't exist
66        if !config_dir.exists() {
67            fs::create_dir_all(config_dir)?;
68        }
69
70        Ok(config_dir.join("idempotency_store.json"))
71    }
72
73    /// Load store from disk or create new if doesn't exist
74    fn load_or_create(store_path: PathBuf, capacity: usize) -> Result<Self, IdempotencyError> {
75        if store_path.exists() {
76            let content = fs::read_to_string(&store_path)?;
77            let mut store: IdempotencyStore = serde_json::from_str(&content)?;
78            store.store_path = store_path;
79            store.capacity = capacity;
80
81            // Run GC on load
82            store.gc()?;
83            Ok(store)
84        } else {
85            let store = IdempotencyStore {
86                entries: HashMap::new(),
87                capacity,
88                store_path,
89            };
90
91            // Create parent directory if needed
92            if let Some(parent) = store.store_path.parent() {
93                if !parent.exists() {
94                    fs::create_dir_all(parent)?;
95                }
96            }
97
98            // Set file permissions on Unix (0600)
99            #[cfg(unix)]
100            {
101                use std::os::unix::fs::OpenOptionsExt;
102                let mut options = fs::OpenOptions::new();
103                options.write(true).create(true).mode(0o600);
104                options.open(&store.store_path)?;
105            }
106
107            #[cfg(not(unix))]
108            {
109                // On non-Unix, just create the file
110                fs::write(&store.store_path, "{}")?;
111            }
112
113            store.save()?;
114            Ok(store)
115        }
116    }
117
118    /// Get entry if exists
119    pub fn get(&self, key: &ScopedKey) -> Option<&IdempotencyEntry> {
120        let key_str = key.to_string();
121        self.entries.get(&key_str).filter(|e| !e.is_expired())
122    }
123
124    /// Check and validate entry, returning response if valid
125    ///
126    /// Returns:
127    /// - Ok(Some(response)) if entry exists and fingerprint matches
128    /// - Ok(None) if entry doesn't exist
129    /// - Err if entry exists but fingerprint doesn't match
130    pub fn check(
131        &self,
132        key: &ScopedKey,
133        fingerprint: &RequestFingerprint,
134    ) -> Result<Option<serde_json::Value>, IdempotencyError> {
135        if let Some(entry) = self.get(key) {
136            if entry.fingerprint == *fingerprint {
137                Ok(Some(entry.response.clone()))
138            } else {
139                Err(IdempotencyError::FingerprintMismatch)
140            }
141        } else {
142            Ok(None)
143        }
144    }
145
146    /// Store entry
147    pub fn put(
148        &mut self,
149        key: ScopedKey,
150        fingerprint: RequestFingerprint,
151        response: serde_json::Value,
152    ) -> Result<(), IdempotencyError> {
153        // Run GC before adding new entry
154        self.gc()?;
155
156        let entry = IdempotencyEntry::new(fingerprint, response, DEFAULT_TTL_SECONDS);
157        let key_str = key.to_string();
158        self.entries.insert(key_str, entry);
159
160        self.save()
161    }
162
163    /// Garbage collection: remove expired and enforce capacity limit
164    fn gc(&mut self) -> Result<(), IdempotencyError> {
165        // Remove expired entries
166        self.entries.retain(|_, entry| !entry.is_expired());
167
168        // If over capacity, remove oldest entries
169        if self.entries.len() > self.capacity {
170            let mut entries: Vec<_> = self
171                .entries
172                .iter()
173                .map(|(k, v)| (k.clone(), v.created_at))
174                .collect();
175            entries.sort_by_key(|(_, created_at)| *created_at);
176
177            let to_remove = self.entries.len() - self.capacity;
178            for (key, _) in entries.iter().take(to_remove) {
179                self.entries.remove(key);
180            }
181        }
182
183        Ok(())
184    }
185
186    /// Save store to disk
187    fn save(&self) -> Result<(), IdempotencyError> {
188        let content = serde_json::to_string_pretty(&self)?;
189        fs::write(&self.store_path, content)?;
190
191        // Ensure permissions on Unix
192        #[cfg(unix)]
193        {
194            use std::fs::Permissions;
195            use std::os::unix::fs::PermissionsExt;
196            let perms = Permissions::from_mode(0o600);
197            fs::set_permissions(&self.store_path, perms)?;
198        }
199
200        Ok(())
201    }
202
203    /// Get number of entries in store
204    #[allow(dead_code)]
205    pub fn len(&self) -> usize {
206        self.entries.len()
207    }
208
209    /// Check if store is empty
210    #[allow(dead_code)]
211    pub fn is_empty(&self) -> bool {
212        self.entries.is_empty()
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    /// Build a store backed by a file in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the directory stays
    /// alive for as long as the caller holds it.
    fn create_test_store() -> (IdempotencyStore, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let store = IdempotencyStore::with_path(temp_dir.path().join("test_store.json")).unwrap();
        (store, temp_dir)
    }

    /// Scoped key with fixed team/user/method and the given idempotency key.
    fn scoped_key(idempotency_key: &str) -> ScopedKey {
        ScopedKey::new(
            "T123".into(),
            "U456".into(),
            "chat.postMessage".into(),
            idempotency_key.to_owned(),
        )
    }

    /// Fingerprint of a request whose parameters are the given name/value pairs.
    fn fingerprint_of(pairs: &[(&str, serde_json::Value)]) -> RequestFingerprint {
        let params: serde_json::Map<String, serde_json::Value> = pairs
            .iter()
            .map(|(name, value)| ((*name).to_owned(), value.clone()))
            .collect();
        RequestFingerprint::from_params(&params)
    }

    /// Current time as whole seconds since the Unix epoch.
    fn unix_now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    #[test]
    fn test_store_creation() {
        let (store, _temp) = create_test_store();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn test_put_and_get() {
        let (mut store, _temp) = create_test_store();

        let key = scoped_key("test-key");
        let fingerprint = fingerprint_of(&[("channel", json!("C123")), ("text", json!("hello"))]);
        let response = json!({"ok": true, "ts": "1234567890.123456"});

        store
            .put(key.clone(), fingerprint.clone(), response.clone())
            .unwrap();

        // The same key and fingerprint must replay the stored response.
        let replayed = store.check(&key, &fingerprint).unwrap();
        assert_eq!(replayed, Some(response));
    }

    #[test]
    fn test_fingerprint_mismatch() {
        let (mut store, _temp) = create_test_store();

        let key = scoped_key("test-key");
        let original = fingerprint_of(&[("channel", json!("C123")), ("text", json!("hello"))]);
        let response = json!({"ok": true, "ts": "1234567890.123456"});
        store.put(key.clone(), original, response).unwrap();

        // Same idempotency key, different request parameters.
        let altered = fingerprint_of(&[("channel", json!("C123")), ("text", json!("different"))]);

        let outcome = store.check(&key, &altered);
        assert!(matches!(outcome, Err(IdempotencyError::FingerprintMismatch)));
    }

    #[test]
    fn test_gc_expired_entries() {
        let (mut store, _temp) = create_test_store();

        // Insert an entry whose expiry timestamp already lies in the past.
        let now = unix_now();
        let expired = IdempotencyEntry {
            fingerprint: fingerprint_of(&[("channel", json!("C123"))]),
            response: json!({"ok": true}),
            created_at: now - 10,
            expires_at: now - 5,
        };
        store
            .entries
            .insert(scoped_key("test-key").to_string(), expired);

        assert_eq!(store.len(), 1);

        // GC must drop the expired entry.
        store.gc().unwrap();
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn test_gc_capacity_limit() {
        let (mut store, _temp) = create_test_store();
        store.capacity = 3;

        let now = unix_now();

        // Insert five entries directly (bypassing `put` and its GC), each one
        // second newer than the previous.
        for offset in 0..5u64 {
            let entry = IdempotencyEntry {
                fingerprint: fingerprint_of(&[("i", json!(offset))]),
                response: json!({"ok": true, "i": offset}),
                created_at: now + offset,
                expires_at: now + DEFAULT_TTL_SECONDS + offset,
            };
            let key = scoped_key(&format!("key-{}", offset));
            store.entries.insert(key.to_string(), entry);
        }

        store.gc().unwrap();

        // Only `capacity` entries may remain.
        assert_eq!(store.len(), 3);

        // The oldest entries are evicted first, so key-0 must be gone.
        assert!(store.get(&scoped_key("key-0")).is_none());
    }

    #[test]
    fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let store_path = temp_dir.path().join("test_store.json");

        let key = scoped_key("test-key");
        let fingerprint = fingerprint_of(&[("channel", json!("C123"))]);
        let response = json!({"ok": true});

        // First instance: write the entry, then drop the store.
        {
            let mut writer = IdempotencyStore::with_path(store_path.clone()).unwrap();
            writer
                .put(key.clone(), fingerprint.clone(), response.clone())
                .unwrap();
        }

        // Second instance: the entry must be readable from disk.
        {
            let reader = IdempotencyStore::with_path(store_path).unwrap();
            let replayed = reader.check(&key, &fingerprint).unwrap();
            assert_eq!(replayed, Some(response));
        }
    }
}
417}