umi_memory/dst/
storage.rs

1//! SimStorage - Simulated Storage with Fault Injection
2//!
3//! TigerStyle: In-memory storage that can fail deterministically.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use thiserror::Error;
9
10use super::clock::SimClock;
11use super::fault::{FaultInjector, FaultType};
12use super::rng::DeterministicRng;
13
14/// Storage errors.
15#[derive(Error, Debug, Clone)]
16pub enum StorageError {
17    /// Write operation failed.
18    #[error("storage write failed: {0}")]
19    Write(String),
20
21    /// Read operation failed.
22    #[error("storage read failed: {0}")]
23    Read(String),
24
25    /// Delete operation failed.
26    #[error("storage delete failed: {0}")]
27    Delete(String),
28
29    /// Storage corruption detected.
30    #[error("storage corruption detected: {0}")]
31    Corruption(String),
32
33    /// Disk is full.
34    #[error("disk full: {0}")]
35    DiskFull(String),
36}
37
38/// Write-specific error alias.
39pub type StorageWriteError = StorageError;
40
41/// Read-specific error alias.
42pub type StorageReadError = StorageError;
43
/// Point-in-time snapshot of storage counters.
///
/// This is plain data: it can be cloned and compared for equality, which
/// keeps before/after assertions in simulation tests simple.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of writes that completed successfully.
    pub writes_count: u64,
    /// Number of reads that were not failed by fault injection.
    pub reads_count: u64,
    /// Number of deletes that were not failed by fault injection.
    pub deletes_count: u64,
    /// Number of entries held when the snapshot was taken.
    pub entries_count: u64,
    /// Sum of the value sizes, in bytes, of all held entries (keys excluded).
    pub bytes_total: u64,
    /// Number of operations that were failed by fault injection.
    pub faults_injected_count: u64,
}
54
/// A stored value together with the timestamps recorded for it.
#[derive(Clone, Debug)]
struct StorageEntry {
    /// Raw bytes of the value.
    data: Vec<u8>,
    /// Clock reading, in milliseconds, recorded as the creation time.
    #[allow(dead_code)] // Not read yet; kept for future temporal queries
    created_at_ms: u64,
    /// Clock reading, in milliseconds, recorded as the last update time.
    #[allow(dead_code)] // Not read yet; kept for future temporal queries
    updated_at_ms: u64,
}
64
65/// Simulated storage for DST testing.
66///
67/// TigerStyle:
68/// - In-memory HashMap storage
69/// - Fault injection at every operation
70/// - Full statistics tracking
71/// - Shared FaultInjector via Arc (Kelpie pattern)
72#[derive(Debug)]
73pub struct SimStorage {
74    data: HashMap<String, StorageEntry>,
75    clock: SimClock,
76    #[allow(dead_code)] // For future random delays/corruption
77    rng: DeterministicRng,
78    /// Shared fault injector (via Arc for sharing with simulation harness)
79    faults: Arc<FaultInjector>,
80    // Statistics
81    writes_count: AtomicU64,
82    reads_count: AtomicU64,
83    deletes_count: AtomicU64,
84    faults_injected_count: AtomicU64,
85}
86
87impl SimStorage {
88    /// Create a new simulated storage.
89    ///
90    /// TigerStyle: Takes Arc<FaultInjector> to share with simulation harness.
91    #[must_use]
92    pub fn new(clock: SimClock, rng: DeterministicRng, faults: Arc<FaultInjector>) -> Self {
93        Self {
94            data: HashMap::new(),
95            clock,
96            rng,
97            faults,
98            writes_count: AtomicU64::new(0),
99            reads_count: AtomicU64::new(0),
100            deletes_count: AtomicU64::new(0),
101            faults_injected_count: AtomicU64::new(0),
102        }
103    }
104
105    /// Write a value to storage.
106    ///
107    /// # Errors
108    /// Returns error if fault injection triggers a failure.
109    pub async fn write(&mut self, key: &str, value: &[u8]) -> Result<(), StorageError> {
110        // Preconditions
111        assert!(!key.is_empty(), "key must not be empty");
112        assert!(value.len() <= 10_000_000, "value too large");
113
114        // Check for fault injection
115        if let Some(fault) = self.faults.should_inject("storage_write") {
116            self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
117            return Err(self.fault_to_error(fault, "write"));
118        }
119
120        let now = self.clock.now_ms();
121        let entry = StorageEntry {
122            data: value.to_vec(),
123            created_at_ms: now,
124            updated_at_ms: now,
125        };
126
127        self.data.insert(key.to_string(), entry);
128        self.writes_count.fetch_add(1, Ordering::Relaxed);
129
130        // Postcondition
131        assert!(self.data.contains_key(key), "key must exist after write");
132
133        Ok(())
134    }
135
136    /// Read a value from storage.
137    ///
138    /// # Errors
139    /// Returns error if fault injection triggers a failure.
140    pub async fn read(&mut self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
141        // Precondition
142        assert!(!key.is_empty(), "key must not be empty");
143
144        // Check for fault injection
145        if let Some(fault) = self.faults.should_inject("storage_read") {
146            self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
147            return Err(self.fault_to_error(fault, "read"));
148        }
149
150        self.reads_count.fetch_add(1, Ordering::Relaxed);
151
152        Ok(self.data.get(key).map(|entry| entry.data.clone()))
153    }
154
155    /// Delete a value from storage.
156    ///
157    /// # Errors
158    /// Returns error if fault injection triggers a failure.
159    pub async fn delete(&mut self, key: &str) -> Result<bool, StorageError> {
160        // Precondition
161        assert!(!key.is_empty(), "key must not be empty");
162
163        // Check for fault injection
164        if let Some(fault) = self.faults.should_inject("storage_delete") {
165            self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
166            return Err(self.fault_to_error(fault, "delete"));
167        }
168
169        self.deletes_count.fetch_add(1, Ordering::Relaxed);
170
171        Ok(self.data.remove(key).is_some())
172    }
173
174    /// Check if a key exists.
175    ///
176    /// # Errors
177    /// Returns error if fault injection triggers a failure.
178    pub async fn exists(&mut self, key: &str) -> Result<bool, StorageError> {
179        // Precondition
180        assert!(!key.is_empty(), "key must not be empty");
181
182        // Check for fault injection
183        if let Some(fault) = self.faults.should_inject("storage_read") {
184            self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
185            return Err(self.fault_to_error(fault, "exists"));
186        }
187
188        Ok(self.data.contains_key(key))
189    }
190
191    /// List keys matching a prefix.
192    ///
193    /// # Errors
194    /// Returns error if fault injection triggers a failure.
195    pub async fn keys(&mut self, prefix: Option<&str>) -> Result<Vec<String>, StorageError> {
196        // Check for fault injection
197        if let Some(fault) = self.faults.should_inject("storage_read") {
198            self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
199            return Err(self.fault_to_error(fault, "keys"));
200        }
201
202        let keys: Vec<String> = match prefix {
203            Some(p) => self
204                .data
205                .keys()
206                .filter(|k| k.starts_with(p))
207                .cloned()
208                .collect(),
209            None => self.data.keys().cloned().collect(),
210        };
211
212        Ok(keys)
213    }
214
215    /// Get storage statistics.
216    #[must_use]
217    pub fn stats(&self) -> StorageStats {
218        let bytes_total: u64 = self.data.values().map(|e| e.data.len() as u64).sum();
219
220        StorageStats {
221            writes_count: self.writes_count.load(Ordering::Relaxed),
222            reads_count: self.reads_count.load(Ordering::Relaxed),
223            deletes_count: self.deletes_count.load(Ordering::Relaxed),
224            entries_count: self.data.len() as u64,
225            bytes_total,
226            faults_injected_count: self.faults_injected_count.load(Ordering::Relaxed),
227        }
228    }
229
230    /// Get the clock.
231    #[must_use]
232    pub fn clock(&self) -> &SimClock {
233        &self.clock
234    }
235
236    /// Get mutable clock.
237    pub fn clock_mut(&mut self) -> &mut SimClock {
238        &mut self.clock
239    }
240
241    /// Clear all data.
242    pub fn clear(&mut self) {
243        self.data.clear();
244    }
245
246    /// Convert a fault type to an error.
247    fn fault_to_error(&self, fault: FaultType, operation: &str) -> StorageError {
248        match fault {
249            FaultType::StorageWriteFail => {
250                StorageError::Write(format!("injected {} fault", operation))
251            }
252            FaultType::StorageReadFail => {
253                StorageError::Read(format!("injected {} fault", operation))
254            }
255            FaultType::StorageDeleteFail => {
256                StorageError::Delete(format!("injected {} fault", operation))
257            }
258            FaultType::StorageCorruption => {
259                StorageError::Corruption(format!("injected {} fault", operation))
260            }
261            FaultType::StorageDiskFull => {
262                StorageError::DiskFull(format!("injected {} fault", operation))
263            }
264            _ => StorageError::Write(format!("unexpected fault type for {}", operation)),
265        }
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dst::fault::{FaultConfig, FaultInjectorBuilder};

    /// Storage whose fault injector has no faults configured.
    fn create_storage() -> SimStorage {
        let clock = SimClock::new();
        let mut rng = DeterministicRng::new(42);
        let injector = FaultInjectorBuilder::new(rng.fork()).build();
        SimStorage::new(clock, rng, Arc::new(injector))
    }

    /// Storage whose fault injector is configured with `fault_type` at
    /// probability 1.0.
    fn create_faulty_storage(fault_type: FaultType) -> SimStorage {
        let clock = SimClock::new();
        let mut rng = DeterministicRng::new(42);
        let injector = FaultInjectorBuilder::new(rng.fork())
            .with_fault(FaultConfig::new(fault_type, 1.0))
            .build();
        SimStorage::new(clock, rng, Arc::new(injector))
    }

    /// A written value can be read back.
    #[tokio::test]
    async fn test_write_and_read() {
        let mut store = create_storage();

        store.write("key1", b"value1").await.unwrap();
        let fetched = store.read("key1").await.unwrap();

        assert_eq!(fetched.as_deref(), Some(&b"value1"[..]));
    }

    /// Reading a key that was never written yields `None`.
    #[tokio::test]
    async fn test_read_nonexistent() {
        let mut store = create_storage();

        let fetched = store.read("nonexistent").await.unwrap();

        assert!(fetched.is_none());
    }

    /// Deleting an existing key reports `true` and removes the value.
    #[tokio::test]
    async fn test_delete() {
        let mut store = create_storage();
        store.write("key1", b"value1").await.unwrap();

        let was_removed = store.delete("key1").await.unwrap();

        assert!(was_removed);
        assert!(store.read("key1").await.unwrap().is_none());
    }

    /// Deleting a missing key reports `false`.
    #[tokio::test]
    async fn test_delete_nonexistent() {
        let mut store = create_storage();

        let was_removed = store.delete("nonexistent").await.unwrap();

        assert!(!was_removed);
    }

    /// `exists` reflects whether the key has been written.
    #[tokio::test]
    async fn test_exists() {
        let mut store = create_storage();

        let before = store.exists("key1").await.unwrap();
        assert!(!before);

        store.write("key1", b"value1").await.unwrap();

        let after = store.exists("key1").await.unwrap();
        assert!(after);
    }

    /// `keys` lists everything without a prefix and filters with one.
    #[tokio::test]
    async fn test_keys() {
        let mut store = create_storage();

        store.write("user:1", b"alice").await.unwrap();
        store.write("user:2", b"bob").await.unwrap();
        store.write("session:1", b"data").await.unwrap();

        let mut everything = store.keys(None).await.unwrap();
        everything.sort();
        assert_eq!(everything, ["session:1", "user:1", "user:2"]);

        let mut users_only = store.keys(Some("user:")).await.unwrap();
        users_only.sort();
        assert_eq!(users_only, ["user:1", "user:2"]);
    }

    /// An injected write fault surfaces as `StorageError::Write`.
    #[tokio::test]
    async fn test_write_fault_injection() {
        let mut store = create_faulty_storage(FaultType::StorageWriteFail);

        let outcome = store.write("key", b"value").await;

        assert!(matches!(outcome, Err(StorageError::Write(_))));
    }

    /// An injected read fault surfaces as `StorageError::Read`.
    #[tokio::test]
    async fn test_read_fault_injection() {
        let mut store = create_faulty_storage(FaultType::StorageReadFail);

        let outcome = store.read("key").await;

        assert!(matches!(outcome, Err(StorageError::Read(_))));
    }

    /// Counters follow the operations that were performed.
    #[tokio::test]
    async fn test_stats_tracking() {
        let mut store = create_storage();

        store.write("k1", b"v1").await.unwrap();
        store.write("k2", b"v2").await.unwrap();
        store.read("k1").await.unwrap();
        store.read("k3").await.unwrap(); // nonexistent
        store.delete("k1").await.unwrap();

        let StorageStats {
            writes_count,
            reads_count,
            deletes_count,
            entries_count,
            ..
        } = store.stats();

        assert_eq!(writes_count, 2);
        assert_eq!(reads_count, 2);
        assert_eq!(deletes_count, 1);
        assert_eq!(entries_count, 1); // k2 remains
    }

    /// `clear` removes every entry.
    #[tokio::test]
    async fn test_clear() {
        let mut store = create_storage();

        store.write("k1", b"v1").await.unwrap();
        store.write("k2", b"v2").await.unwrap();

        store.clear();

        let remaining = store.stats().entries_count;
        assert_eq!(remaining, 0);
    }

    /// Writing with an empty key trips the precondition assertion.
    #[test]
    #[should_panic(expected = "key must not be empty")]
    fn test_write_empty_key() {
        let mut store = create_storage();
        let _ = tokio_test::block_on(store.write("", b"value"));
    }
}
415}