1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use thiserror::Error;
9
10use super::clock::SimClock;
11use super::fault::{FaultInjector, FaultType};
12use super::rng::DeterministicRng;
13
14#[derive(Error, Debug, Clone)]
16pub enum StorageError {
17 #[error("storage write failed: {0}")]
19 Write(String),
20
21 #[error("storage read failed: {0}")]
23 Read(String),
24
25 #[error("storage delete failed: {0}")]
27 Delete(String),
28
29 #[error("storage corruption detected: {0}")]
31 Corruption(String),
32
33 #[error("disk full: {0}")]
35 DiskFull(String),
36}
37
38pub type StorageWriteError = StorageError;
40
41pub type StorageReadError = StorageError;
43
/// Point-in-time snapshot of storage counters.
///
/// The snapshot is plain data, so it can be cloned and compared, which lets
/// callers diff two snapshots or assert on a whole snapshot at once.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of writes recorded.
    pub writes_count: u64,
    /// Number of reads recorded.
    pub reads_count: u64,
    /// Number of deletes recorded.
    pub deletes_count: u64,
    /// Number of entries held when the snapshot was taken.
    pub entries_count: u64,
    /// Total size in bytes of all stored values when the snapshot was taken.
    pub bytes_total: u64,
    /// Number of operations that failed because a fault was injected.
    pub faults_injected_count: u64,
}
54
/// One stored value together with its timestamps, in milliseconds.
#[derive(Debug, Clone)]
struct StorageEntry {
    /// Raw bytes of the stored value.
    data: Vec<u8>,
    /// Timestamp recorded when the entry was created.
    // Not read anywhere in this file; it only surfaces through `Debug`.
    #[allow(dead_code)]
    created_at_ms: u64,
    /// Timestamp recorded when the entry was last written.
    // Not read anywhere in this file; it only surfaces through `Debug`.
    #[allow(dead_code)]
    updated_at_ms: u64,
}
64
65#[derive(Debug)]
73pub struct SimStorage {
74 data: HashMap<String, StorageEntry>,
75 clock: SimClock,
76 #[allow(dead_code)] rng: DeterministicRng,
78 faults: Arc<FaultInjector>,
80 writes_count: AtomicU64,
82 reads_count: AtomicU64,
83 deletes_count: AtomicU64,
84 faults_injected_count: AtomicU64,
85}
86
87impl SimStorage {
88 #[must_use]
92 pub fn new(clock: SimClock, rng: DeterministicRng, faults: Arc<FaultInjector>) -> Self {
93 Self {
94 data: HashMap::new(),
95 clock,
96 rng,
97 faults,
98 writes_count: AtomicU64::new(0),
99 reads_count: AtomicU64::new(0),
100 deletes_count: AtomicU64::new(0),
101 faults_injected_count: AtomicU64::new(0),
102 }
103 }
104
105 pub async fn write(&mut self, key: &str, value: &[u8]) -> Result<(), StorageError> {
110 assert!(!key.is_empty(), "key must not be empty");
112 assert!(value.len() <= 10_000_000, "value too large");
113
114 if let Some(fault) = self.faults.should_inject("storage_write") {
116 self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
117 return Err(self.fault_to_error(fault, "write"));
118 }
119
120 let now = self.clock.now_ms();
121 let entry = StorageEntry {
122 data: value.to_vec(),
123 created_at_ms: now,
124 updated_at_ms: now,
125 };
126
127 self.data.insert(key.to_string(), entry);
128 self.writes_count.fetch_add(1, Ordering::Relaxed);
129
130 assert!(self.data.contains_key(key), "key must exist after write");
132
133 Ok(())
134 }
135
136 pub async fn read(&mut self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
141 assert!(!key.is_empty(), "key must not be empty");
143
144 if let Some(fault) = self.faults.should_inject("storage_read") {
146 self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
147 return Err(self.fault_to_error(fault, "read"));
148 }
149
150 self.reads_count.fetch_add(1, Ordering::Relaxed);
151
152 Ok(self.data.get(key).map(|entry| entry.data.clone()))
153 }
154
155 pub async fn delete(&mut self, key: &str) -> Result<bool, StorageError> {
160 assert!(!key.is_empty(), "key must not be empty");
162
163 if let Some(fault) = self.faults.should_inject("storage_delete") {
165 self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
166 return Err(self.fault_to_error(fault, "delete"));
167 }
168
169 self.deletes_count.fetch_add(1, Ordering::Relaxed);
170
171 Ok(self.data.remove(key).is_some())
172 }
173
174 pub async fn exists(&mut self, key: &str) -> Result<bool, StorageError> {
179 assert!(!key.is_empty(), "key must not be empty");
181
182 if let Some(fault) = self.faults.should_inject("storage_read") {
184 self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
185 return Err(self.fault_to_error(fault, "exists"));
186 }
187
188 Ok(self.data.contains_key(key))
189 }
190
191 pub async fn keys(&mut self, prefix: Option<&str>) -> Result<Vec<String>, StorageError> {
196 if let Some(fault) = self.faults.should_inject("storage_read") {
198 self.faults_injected_count.fetch_add(1, Ordering::Relaxed);
199 return Err(self.fault_to_error(fault, "keys"));
200 }
201
202 let keys: Vec<String> = match prefix {
203 Some(p) => self
204 .data
205 .keys()
206 .filter(|k| k.starts_with(p))
207 .cloned()
208 .collect(),
209 None => self.data.keys().cloned().collect(),
210 };
211
212 Ok(keys)
213 }
214
215 #[must_use]
217 pub fn stats(&self) -> StorageStats {
218 let bytes_total: u64 = self.data.values().map(|e| e.data.len() as u64).sum();
219
220 StorageStats {
221 writes_count: self.writes_count.load(Ordering::Relaxed),
222 reads_count: self.reads_count.load(Ordering::Relaxed),
223 deletes_count: self.deletes_count.load(Ordering::Relaxed),
224 entries_count: self.data.len() as u64,
225 bytes_total,
226 faults_injected_count: self.faults_injected_count.load(Ordering::Relaxed),
227 }
228 }
229
230 #[must_use]
232 pub fn clock(&self) -> &SimClock {
233 &self.clock
234 }
235
236 pub fn clock_mut(&mut self) -> &mut SimClock {
238 &mut self.clock
239 }
240
241 pub fn clear(&mut self) {
243 self.data.clear();
244 }
245
246 fn fault_to_error(&self, fault: FaultType, operation: &str) -> StorageError {
248 match fault {
249 FaultType::StorageWriteFail => {
250 StorageError::Write(format!("injected {} fault", operation))
251 }
252 FaultType::StorageReadFail => {
253 StorageError::Read(format!("injected {} fault", operation))
254 }
255 FaultType::StorageDeleteFail => {
256 StorageError::Delete(format!("injected {} fault", operation))
257 }
258 FaultType::StorageCorruption => {
259 StorageError::Corruption(format!("injected {} fault", operation))
260 }
261 FaultType::StorageDiskFull => {
262 StorageError::DiskFull(format!("injected {} fault", operation))
263 }
264 _ => StorageError::Write(format!("unexpected fault type for {}", operation)),
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dst::fault::{FaultConfig, FaultInjectorBuilder};

    /// Builds a storage whose fault injector has no faults configured.
    fn create_storage() -> SimStorage {
        let clock = SimClock::new();
        let mut rng = DeterministicRng::new(42);
        let faults = Arc::new(FaultInjectorBuilder::new(rng.fork()).build());
        SimStorage::new(clock, rng, faults)
    }

    /// Builds a storage configured with `fault_type` at probability 1.0.
    fn create_faulty_storage(fault_type: FaultType) -> SimStorage {
        let clock = SimClock::new();
        let mut rng = DeterministicRng::new(42);
        let faults = Arc::new(
            FaultInjectorBuilder::new(rng.fork())
                .with_fault(FaultConfig::new(fault_type, 1.0))
                .build(),
        );
        SimStorage::new(clock, rng, faults)
    }

    #[tokio::test]
    async fn test_write_and_read() {
        let mut storage = create_storage();

        storage.write("key1", b"value1").await.unwrap();
        let result = storage.read("key1").await.unwrap();

        assert_eq!(result, Some(b"value1".to_vec()));
    }

    #[tokio::test]
    async fn test_overwrite_replaces_value() {
        let mut storage = create_storage();

        storage.write("key1", b"old").await.unwrap();
        storage.write("key1", b"newer").await.unwrap();

        assert_eq!(
            storage.read("key1").await.unwrap(),
            Some(b"newer".to_vec())
        );

        // Overwriting must not create a second entry or keep the old bytes.
        let stats = storage.stats();
        assert_eq!(stats.writes_count, 2);
        assert_eq!(stats.entries_count, 1);
        assert_eq!(stats.bytes_total, 5);
    }

    #[tokio::test]
    async fn test_read_nonexistent() {
        let mut storage = create_storage();

        let result = storage.read("nonexistent").await.unwrap();

        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_delete() {
        let mut storage = create_storage();

        storage.write("key1", b"value1").await.unwrap();
        let deleted = storage.delete("key1").await.unwrap();

        assert!(deleted);
        assert_eq!(storage.read("key1").await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_delete_nonexistent() {
        let mut storage = create_storage();

        let deleted = storage.delete("nonexistent").await.unwrap();

        assert!(!deleted);
    }

    #[tokio::test]
    async fn test_exists() {
        let mut storage = create_storage();

        assert!(!storage.exists("key1").await.unwrap());

        storage.write("key1", b"value1").await.unwrap();

        assert!(storage.exists("key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_keys() {
        let mut storage = create_storage();

        storage.write("user:1", b"alice").await.unwrap();
        storage.write("user:2", b"bob").await.unwrap();
        storage.write("session:1", b"data").await.unwrap();

        // Sort locally so the test does not depend on the order `keys` returns.
        let mut all_keys = storage.keys(None).await.unwrap();
        all_keys.sort();
        assert_eq!(all_keys, vec!["session:1", "user:1", "user:2"]);

        let mut user_keys = storage.keys(Some("user:")).await.unwrap();
        user_keys.sort();
        assert_eq!(user_keys, vec!["user:1", "user:2"]);
    }

    #[tokio::test]
    async fn test_write_fault_injection() {
        let mut storage = create_faulty_storage(FaultType::StorageWriteFail);

        let result = storage.write("key", b"value").await;

        assert!(matches!(result, Err(StorageError::Write(_))));

        // A failed write is counted as a fault and must not store anything.
        let stats = storage.stats();
        assert_eq!(stats.faults_injected_count, 1);
        assert_eq!(stats.writes_count, 0);
        assert_eq!(stats.entries_count, 0);
    }

    #[tokio::test]
    async fn test_read_fault_injection() {
        let mut storage = create_faulty_storage(FaultType::StorageReadFail);

        let result = storage.read("key").await;

        assert!(matches!(result, Err(StorageError::Read(_))));

        // A failed read is counted as a fault, not as a read.
        let stats = storage.stats();
        assert_eq!(stats.faults_injected_count, 1);
        assert_eq!(stats.reads_count, 0);
    }

    #[tokio::test]
    async fn test_stats_tracking() {
        let mut storage = create_storage();

        storage.write("k1", b"v1").await.unwrap();
        storage.write("k2", b"v2").await.unwrap();
        storage.read("k1").await.unwrap();
        storage.read("k3").await.unwrap();
        storage.delete("k1").await.unwrap();

        let stats = storage.stats();

        assert_eq!(stats.writes_count, 2);
        assert_eq!(stats.reads_count, 2);
        assert_eq!(stats.deletes_count, 1);
        // Only "k2" -> "v2" is left after deleting "k1".
        assert_eq!(stats.entries_count, 1);
        assert_eq!(stats.bytes_total, 2);
        assert_eq!(stats.faults_injected_count, 0);
    }

    #[tokio::test]
    async fn test_clear() {
        let mut storage = create_storage();

        storage.write("k1", b"v1").await.unwrap();
        storage.write("k2", b"v2").await.unwrap();

        storage.clear();

        let stats = storage.stats();
        assert_eq!(stats.entries_count, 0);
        assert_eq!(stats.bytes_total, 0);
        // Clearing drops the data but keeps the operation counters.
        assert_eq!(stats.writes_count, 2);
    }

    #[test]
    #[should_panic(expected = "key must not be empty")]
    fn test_write_empty_key() {
        let mut storage = create_storage();
        let _ = tokio_test::block_on(storage.write("", b"value"));
    }

    #[test]
    #[should_panic(expected = "key must not be empty")]
    fn test_read_empty_key() {
        let mut storage = create_storage();
        let _ = tokio_test::block_on(storage.read(""));
    }

    #[test]
    #[should_panic(expected = "key must not be empty")]
    fn test_delete_empty_key() {
        let mut storage = create_storage();
        let _ = tokio_test::block_on(storage.delete(""));
    }
}