Skip to main content

commonware_runtime/storage/
faulty.rs

1//! A storage wrapper that injects deterministic faults for testing crash recovery.
2
3use crate::{deterministic::BoxDynRng, Error, IoBufs, IoBufsMut};
4use bytes::Buf;
5use rand::Rng;
6use std::{
7    io::Error as IoError,
8    sync::{
9        atomic::{AtomicU64, Ordering},
10        Arc, Mutex, RwLock,
11    },
12};
13
/// The kinds of storage operations that can have faults injected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    /// Opening a blob (`open_versioned`).
    Open,
    /// Reading from a blob (`read_at`).
    Read,
    /// Writing to a blob (`write_at`).
    Write,
    /// Flushing a blob (`sync`).
    Sync,
    /// Resizing a blob (`resize`).
    Resize,
    /// Removing a blob or partition (`remove`).
    Remove,
    /// Listing a partition's blobs (`scan`).
    Scan,
}
25
/// Fault-injection configuration, one probability per operation type.
///
/// Every rate is interpreted as a probability in `[0.0, 1.0]`; `None` (the
/// default) is equivalent to `0.0`, i.e. the operation never fails.
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Probability that `open_versioned` fails.
    pub open_rate: Option<f64>,

    /// Probability that `read_at` fails.
    pub read_rate: Option<f64>,

    /// Probability that `write_at` fails.
    pub write_rate: Option<f64>,

    /// Given that a write fails (per `write_rate`), the probability that the
    /// failure is *partial* — a strict prefix of the buffer is persisted
    /// before the error — rather than *complete* (nothing persisted).
    /// `0.0` means always complete; `1.0` means always partial.
    pub partial_write_rate: Option<f64>,

    /// Probability that `sync` fails.
    pub sync_rate: Option<f64>,

    /// Probability that `resize` fails.
    pub resize_rate: Option<f64>,

    /// Given that a resize fails (per `resize_rate`), the probability that the
    /// blob lands on a random intermediate size before the error, rather than
    /// keeping its original size.
    /// `0.0` means always complete failure; `1.0` means always partial.
    pub partial_resize_rate: Option<f64>,

    /// Probability that `remove` fails.
    pub remove_rate: Option<f64>,

    /// Probability that `scan` fails.
    pub scan_rate: Option<f64>,
}
64
65impl Config {
66    /// Get the failure rate for an operation type.
67    fn rate_for(&self, op: Op) -> f64 {
68        match op {
69            Op::Open => self.open_rate,
70            Op::Read => self.read_rate,
71            Op::Write => self.write_rate,
72            Op::Sync => self.sync_rate,
73            Op::Resize => self.resize_rate,
74            Op::Remove => self.remove_rate,
75            Op::Scan => self.scan_rate,
76        }
77        .unwrap_or(0.0)
78    }
79
80    /// Set the open failure rate.
81    pub const fn open(mut self, rate: f64) -> Self {
82        self.open_rate = Some(rate);
83        self
84    }
85
86    /// Set the read failure rate.
87    pub const fn read(mut self, rate: f64) -> Self {
88        self.read_rate = Some(rate);
89        self
90    }
91
92    /// Set the write failure rate.
93    pub const fn write(mut self, rate: f64) -> Self {
94        self.write_rate = Some(rate);
95        self
96    }
97
98    /// Set the partial write rate (probability of partial vs complete write failure).
99    pub const fn partial_write(mut self, rate: f64) -> Self {
100        self.partial_write_rate = Some(rate);
101        self
102    }
103
104    /// Set the sync failure rate.
105    pub const fn sync(mut self, rate: f64) -> Self {
106        self.sync_rate = Some(rate);
107        self
108    }
109
110    /// Set the resize failure rate.
111    pub const fn resize(mut self, rate: f64) -> Self {
112        self.resize_rate = Some(rate);
113        self
114    }
115
116    /// Set the partial resize rate (probability of partial vs complete resize failure).
117    pub const fn partial_resize(mut self, rate: f64) -> Self {
118        self.partial_resize_rate = Some(rate);
119        self
120    }
121
122    /// Set the remove failure rate.
123    pub const fn remove(mut self, rate: f64) -> Self {
124        self.remove_rate = Some(rate);
125        self
126    }
127
128    /// Set the scan failure rate.
129    pub const fn scan(mut self, rate: f64) -> Self {
130        self.scan_rate = Some(rate);
131        self
132    }
133}
134
135/// Shared fault injection context.
136#[derive(Clone)]
137struct Oracle {
138    rng: Arc<Mutex<BoxDynRng>>,
139    config: Arc<RwLock<Config>>,
140}
141
142impl Oracle {
143    /// Check if a fault should be injected for the given operation.
144    fn should_fail(&self, op: Op) -> bool {
145        self.roll(Some(self.config.read().unwrap().rate_for(op)))
146    }
147
148    /// Check if a write fault should be injected. Returns (should_fail, partial_rate).
149    /// Reads config once to avoid nested lock acquisition.
150    fn check_write_fault(&self) -> (bool, Option<f64>) {
151        let config = self.config.read().unwrap();
152        let fail = self.roll(Some(config.rate_for(Op::Write)));
153        (fail, config.partial_write_rate)
154    }
155
156    /// Check if a resize fault should be injected. Returns (should_fail, partial_rate).
157    /// Reads config once to avoid nested lock acquisition.
158    fn check_resize_fault(&self) -> (bool, Option<f64>) {
159        let config = self.config.read().unwrap();
160        let fail = self.roll(Some(config.rate_for(Op::Resize)));
161        (fail, config.partial_resize_rate)
162    }
163
164    /// Check if an event should occur based on a probability rate.
165    fn roll(&self, rate: Option<f64>) -> bool {
166        let rate = rate.unwrap_or(0.0);
167        if rate <= 0.0 {
168            return false;
169        }
170        if rate >= 1.0 {
171            return true;
172        }
173        self.rng.lock().unwrap().gen::<f64>() < rate
174    }
175
176    /// Generate a random value strictly between `from` and `to`, or None if not possible.
177    fn random_between(&self, from: u64, to: u64) -> Option<u64> {
178        if from == to {
179            return None;
180        }
181        let (min, max) = if from < to { (from, to) } else { (to, from) };
182        if max - min <= 1 {
183            return None;
184        }
185        Some(self.rng.lock().unwrap().gen_range(min + 1..max))
186    }
187
188    /// Try to generate a partial operation target. Returns Some if both the rate
189    /// check passes and an intermediate value exists between `from` and `to`.
190    fn try_partial(&self, rate: Option<f64>, from: u64, to: u64) -> Option<u64> {
191        if self.roll(rate) {
192            self.random_between(from, to)
193        } else {
194            None
195        }
196    }
197}
198
199/// A storage wrapper that injects deterministic faults based on configuration.
200///
201/// Uses a shared RNG for determinism.
202#[derive(Clone)]
203pub struct Storage<S: crate::Storage> {
204    inner: S,
205    ctx: Oracle,
206}
207
208impl<S: crate::Storage> Storage<S> {
209    /// Create a new faulty storage wrapper.
210    pub fn new(inner: S, rng: Arc<Mutex<BoxDynRng>>, config: Arc<RwLock<Config>>) -> Self {
211        Self {
212            inner,
213            ctx: Oracle { rng, config },
214        }
215    }
216
217    /// Get a reference to the inner storage.
218    pub const fn inner(&self) -> &S {
219        &self.inner
220    }
221
222    /// Get access to the fault configuration for dynamic modification.
223    pub fn config(&self) -> Arc<RwLock<Config>> {
224        self.ctx.config.clone()
225    }
226}
227
/// Build the `IoError` reported for every injected fault.
fn injected_io_error() -> IoError {
    IoError::new(std::io::ErrorKind::Other, "injected storage fault")
}
232
233impl<S: crate::Storage> crate::Storage for Storage<S> {
234    type Blob = Blob<S::Blob>;
235
236    async fn open_versioned(
237        &self,
238        partition: &str,
239        name: &[u8],
240        versions: std::ops::RangeInclusive<u16>,
241    ) -> Result<(Self::Blob, u64, u16), Error> {
242        if self.ctx.should_fail(Op::Open) {
243            return Err(Error::Io(injected_io_error()));
244        }
245        self.inner
246            .open_versioned(partition, name, versions)
247            .await
248            .map(|(blob, len, blob_version)| {
249                (Blob::new(self.ctx.clone(), blob, len), len, blob_version)
250            })
251    }
252
253    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
254        if self.ctx.should_fail(Op::Remove) {
255            return Err(Error::Io(injected_io_error()));
256        }
257        self.inner.remove(partition, name).await
258    }
259
260    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
261        if self.ctx.should_fail(Op::Scan) {
262            return Err(Error::Io(injected_io_error()));
263        }
264        self.inner.scan(partition).await
265    }
266}
267
268/// A blob wrapper that injects deterministic faults based on configuration.
269#[derive(Clone)]
270pub struct Blob<B: crate::Blob> {
271    inner: B,
272    ctx: Oracle,
273    /// Tracked size for partial resize support.
274    size: Arc<AtomicU64>,
275}
276
277impl<B: crate::Blob> Blob<B> {
278    fn new(ctx: Oracle, inner: B, size: u64) -> Self {
279        Self {
280            inner,
281            ctx,
282            size: Arc::new(AtomicU64::new(size)),
283        }
284    }
285}
286
287impl<B: crate::Blob> crate::Blob for Blob<B> {
288    async fn read_at(
289        &self,
290        offset: u64,
291        buf: impl Into<IoBufsMut> + Send,
292    ) -> Result<IoBufsMut, Error> {
293        if self.ctx.should_fail(Op::Read) {
294            return Err(Error::Io(injected_io_error()));
295        }
296        self.inner.read_at(offset, buf.into()).await
297    }
298
299    async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
300        let buf: IoBufs = buf.into();
301        let total_bytes = buf.remaining() as u64;
302
303        let (should_fail, partial_rate) = self.ctx.check_write_fault();
304        if should_fail {
305            if let Some(bytes) = self.ctx.try_partial(partial_rate, 0, total_bytes) {
306                // Partial write: write some bytes, sync, then fail
307                self.inner
308                    .write_at(offset, buf.coalesce().slice(..bytes as usize))
309                    .await?;
310                self.inner.sync().await?;
311                self.size
312                    .fetch_max(offset.saturating_add(bytes), Ordering::Relaxed);
313                return Err(Error::Io(injected_io_error()));
314            }
315            return Err(Error::Io(injected_io_error()));
316        }
317
318        self.inner.write_at(offset, buf).await?;
319        self.size
320            .fetch_max(offset.saturating_add(total_bytes), Ordering::Relaxed);
321        Ok(())
322    }
323
324    async fn resize(&self, len: u64) -> Result<(), Error> {
325        let (should_fail, partial_rate) = self.ctx.check_resize_fault();
326        if should_fail {
327            let current = self.size.load(Ordering::Relaxed);
328            if let Some(len) = self.ctx.try_partial(partial_rate, current, len) {
329                // Partial resize: resize to intermediate size, sync, then fail
330                self.inner.resize(len).await?;
331                self.inner.sync().await?;
332                self.size.store(len, Ordering::Relaxed);
333                return Err(Error::Io(injected_io_error()));
334            }
335            return Err(Error::Io(injected_io_error()));
336        }
337        self.inner.resize(len).await?;
338        self.size.store(len, Ordering::Relaxed);
339        Ok(())
340    }
341
342    async fn sync(&self) -> Result<(), Error> {
343        if self.ctx.should_fail(Op::Sync) {
344            return Err(Error::Io(injected_io_error()));
345        }
346        self.inner.sync().await
347    }
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        storage::{memory::Storage as MemStorage, tests::run_storage_tests},
        Blob as _, Storage as _,
    };
    use rand::{rngs::StdRng, SeedableRng};

    /// Test harness with faulty storage wrapping memory storage.
    struct Harness {
        // Direct handle to the underlying memory storage, used to inspect
        // what actually persisted despite (or because of) injected faults.
        inner: MemStorage,
        // The fault-injecting wrapper under test.
        storage: Storage<MemStorage>,
        // Shared config handle so tests can toggle fault rates mid-test.
        config: Arc<RwLock<Config>>,
    }

    impl Harness {
        /// Build a harness with a fixed default seed (42).
        fn new(config: Config) -> Self {
            Self::with_seed(42, config)
        }

        /// Build a harness with an explicit seed (used by determinism tests).
        fn with_seed(seed: u64, config: Config) -> Self {
            let inner = MemStorage::default();
            let rng = Arc::new(Mutex::new(
                Box::new(StdRng::seed_from_u64(seed)) as BoxDynRng
            ));
            let config = Arc::new(RwLock::new(config));
            let storage = Storage::new(inner.clone(), rng, config.clone());
            Self {
                inner,
                storage,
                config,
            }
        }
    }

    /// With all rates unset, the wrapper must be transparent: the generic
    /// storage conformance suite should pass unchanged.
    #[tokio::test]
    async fn test_faulty_storage_no_faults() {
        let h = Harness::new(Config::default());
        run_storage_tests(h.storage).await;
    }

    #[tokio::test]
    async fn test_faulty_storage_sync_always_fails() {
        let h = Harness::new(Config::default().sync(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();

        assert!(matches!(blob.sync().await, Err(Error::Io(_))));
    }

    #[tokio::test]
    async fn test_faulty_storage_write_always_fails() {
        let h = Harness::new(Config::default().write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();

        assert!(matches!(
            blob.write_at(0, b"data".to_vec()).await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_read_always_fails() {
        let h = Harness::new(Config::default());

        // Write some data first (no faults)
        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();
        blob.sync().await.unwrap();

        // Enable read faults
        h.config.write().unwrap().read_rate = Some(1.0);

        assert!(matches!(
            blob.read_at(0, vec![0u8; 4]).await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_open_always_fails() {
        let h = Harness::new(Config::default().open(1.0));

        assert!(matches!(
            h.storage.open("partition", b"test").await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_remove_always_fails() {
        let h = Harness::new(Config::default());

        // Create a blob first
        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        // Enable remove faults
        h.config.write().unwrap().remove_rate = Some(1.0);

        assert!(matches!(
            h.storage.remove("partition", Some(b"test")).await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_scan_always_fails() {
        let h = Harness::new(Config::default());

        // Create some blobs first
        for i in 0..3 {
            let name = format!("blob{i}");
            let (blob, _) = h.storage.open("partition", name.as_bytes()).await.unwrap();
            blob.write_at(0, b"data".to_vec()).await.unwrap();
            blob.sync().await.unwrap();
        }

        // Enable scan faults
        h.config.write().unwrap().scan_rate = Some(1.0);

        assert!(matches!(
            h.storage.scan("partition").await,
            Err(Error::Io(_))
        ));
    }

    /// Same seed + same rate must produce the exact same fault sequence;
    /// a different seed is expected to diverge (probabilistic but near-certain
    /// over 20 trials at rate 0.5).
    #[tokio::test]
    async fn test_faulty_storage_determinism() {
        async fn run_ops(seed: u64, rate: f64) -> Vec<bool> {
            let h = Harness::with_seed(seed, Config::default().open(rate));
            let mut results = Vec::new();
            for i in 0..20 {
                let name = format!("blob{i}");
                results.push(h.storage.open("partition", name.as_bytes()).await.is_ok());
            }
            results
        }

        let results1 = run_ops(42, 0.5).await;
        let results2 = run_ops(42, 0.5).await;
        assert_eq!(results1, results2, "Same seed should produce same results");

        let results3 = run_ops(999, 0.5).await;
        assert_ne!(
            results1, results3,
            "Different seeds should produce different results"
        );
    }

    #[tokio::test]
    async fn test_faulty_storage_rate_for() {
        let config = Config::default().open(0.1).sync(0.9);

        assert!((config.rate_for(Op::Open) - 0.1).abs() < f64::EPSILON);
        assert!((config.rate_for(Op::Sync) - 0.9).abs() < f64::EPSILON);
        // Unset rates default to 0.0.
        assert!(config.rate_for(Op::Write).abs() < f64::EPSILON);
    }

    /// Rates can be flipped on and off through the shared config handle while
    /// a blob handle stays open.
    #[tokio::test]
    async fn test_faulty_storage_dynamic_config() {
        let h = Harness::new(Config::default());

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.sync().await.unwrap();

        h.config.write().unwrap().sync_rate = Some(1.0);
        assert!(matches!(blob.sync().await, Err(Error::Io(_))));

        h.config.write().unwrap().sync_rate = Some(0.0);
        blob.sync().await.unwrap();
    }

    /// A partial write must persist a strict, non-empty prefix of the buffer
    /// and still report an error to the caller.
    #[tokio::test]
    async fn test_faulty_storage_partial_write() {
        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let data = b"hello world".to_vec();
        let result = blob.write_at(0, data.clone()).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // Inspect the inner storage directly to see what actually persisted.
        let (inner_blob, size) = h.inner.open("partition", b"test").await.unwrap();
        let bytes_written = size as usize;
        assert!(
            bytes_written > 0 && bytes_written < data.len(),
            "Expected partial write: {bytes_written} bytes out of {}",
            data.len()
        );

        let read_buf = vec![0u8; bytes_written];
        let read_result = inner_blob.read_at(0, read_buf).await.unwrap();
        assert_eq!(read_result.coalesce().as_ref(), &data[..bytes_written]);
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_write_disabled() {
        let h = Harness::new(Config::default().write(1.0).partial_write(0.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.write_at(0, b"hello world".to_vec()).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(
            size, 0,
            "Expected no bytes written when partial_write_rate is 0"
        );
    }

    /// A one-byte buffer has no strict non-empty prefix, so a "partial" write
    /// fault must degrade to a complete failure.
    #[tokio::test]
    async fn test_faulty_storage_partial_write_single_byte() {
        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.write_at(0, b"x".to_vec()).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0, "No partial write possible for single byte");
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_resize_grow() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
        assert_eq!(initial_size, 0);

        let target_size = 100u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > 0 && actual_size < target_size,
            "Expected partial resize: size {actual_size} should be between 0 and {target_size}"
        );
    }

    /// Partial resize also applies when shrinking: the blob must land strictly
    /// between the target and its original size.
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_shrink() {
        let h = Harness::new(Config::default());

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.resize(100).await.unwrap();
        blob.sync().await.unwrap();

        {
            let mut cfg = h.config.write().unwrap();
            cfg.resize_rate = Some(1.0);
            cfg.partial_resize_rate = Some(1.0);
        }

        let target_size = 10u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > target_size && actual_size < 100,
            "Expected partial shrink: size {actual_size} should be between {target_size} and 100"
        );
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_resize_disabled() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(0.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.resize(100).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0, "Expected no resize when partial_resize_rate is 0");
    }

    /// Resizing to the current size leaves no intermediate value, so a
    /// "partial" resize fault must degrade to a complete failure.
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_same_size() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.resize(0).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0);
    }

    /// Writes must advance the tracked size, so a later partial shrink picks
    /// its intermediate size relative to the written extent (50), not 0.
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_after_write_extends() {
        let h = Harness::new(Config::default());

        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
        assert_eq!(initial_size, 0);

        blob.write_at(0, vec![0xABu8; 50]).await.unwrap();
        blob.sync().await.unwrap();

        let (_, size_after_write) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size_after_write, 50);

        {
            let mut cfg = h.config.write().unwrap();
            cfg.resize_rate = Some(1.0);
            cfg.partial_resize_rate = Some(1.0);
        }

        let target_size = 10u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > target_size && actual_size < 50,
            "Expected partial shrink from 50: size {actual_size} should be between {target_size} and 50"
        );
    }

    /// Adjacent sizes (0 -> 1) have no strictly-intermediate value, so the
    /// partial resize must degrade to a complete failure.
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_one_byte_difference() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.resize(1).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0);
    }
}