// commonware_runtime/storage/faulty.rs

//! A storage wrapper that injects deterministic faults for testing crash recovery.

3use crate::{deterministic::BoxDynRng, Error, IoBufs, IoBufsMut};
4use bytes::Buf;
5use commonware_utils::sync::{Mutex, RwLock};
6use rand::Rng;
7use std::{
8    io::Error as IoError,
9    sync::{
10        atomic::{AtomicU64, Ordering},
11        Arc,
12    },
13};
14
15/// Operation types for fault injection.
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17enum Op {
18    Open,
19    Read,
20    Write,
21    Sync,
22    Resize,
23    Remove,
24    Scan,
25}
26
27/// Configuration for deterministic storage fault injection.
28///
29/// Each rate is a probability from 0.0 (never fail) to 1.0 (always fail).
30#[derive(Clone, Debug, Default)]
31pub struct Config {
32    /// Failure rate for `open_versioned` operations.
33    pub open_rate: Option<f64>,
34
35    /// Failure rate for `read_at` operations.
36    pub read_rate: Option<f64>,
37
38    /// Failure rate for `write_at` operations.
39    pub write_rate: Option<f64>,
40
41    /// Probability that a write failure is a partial write (some bytes written
42    /// before failure) rather than a complete failure (no bytes written).
43    /// Only applies when `write_rate` triggers a failure.
44    /// Value from 0.0 (always complete failure) to 1.0 (always partial write).
45    pub partial_write_rate: Option<f64>,
46
47    /// Failure rate for `sync` operations.
48    pub sync_rate: Option<f64>,
49
50    /// Failure rate for `resize` operations.
51    pub resize_rate: Option<f64>,
52
53    /// Probability that a resize failure is partial (resized to an intermediate
54    /// size before failure) rather than a complete failure (size unchanged).
55    /// Only applies when `resize_rate` triggers a failure.
56    /// Value from 0.0 (always complete failure) to 1.0 (always partial resize).
57    pub partial_resize_rate: Option<f64>,
58
59    /// Failure rate for `remove` operations.
60    pub remove_rate: Option<f64>,
61
62    /// Failure rate for `scan` operations.
63    pub scan_rate: Option<f64>,
64}
65
66impl Config {
67    /// Get the failure rate for an operation type.
68    fn rate_for(&self, op: Op) -> f64 {
69        match op {
70            Op::Open => self.open_rate,
71            Op::Read => self.read_rate,
72            Op::Write => self.write_rate,
73            Op::Sync => self.sync_rate,
74            Op::Resize => self.resize_rate,
75            Op::Remove => self.remove_rate,
76            Op::Scan => self.scan_rate,
77        }
78        .unwrap_or(0.0)
79    }
80
81    /// Set the open failure rate.
82    pub const fn open(mut self, rate: f64) -> Self {
83        self.open_rate = Some(rate);
84        self
85    }
86
87    /// Set the read failure rate.
88    pub const fn read(mut self, rate: f64) -> Self {
89        self.read_rate = Some(rate);
90        self
91    }
92
93    /// Set the write failure rate.
94    pub const fn write(mut self, rate: f64) -> Self {
95        self.write_rate = Some(rate);
96        self
97    }
98
99    /// Set the partial write rate (probability of partial vs complete write failure).
100    pub const fn partial_write(mut self, rate: f64) -> Self {
101        self.partial_write_rate = Some(rate);
102        self
103    }
104
105    /// Set the sync failure rate.
106    pub const fn sync(mut self, rate: f64) -> Self {
107        self.sync_rate = Some(rate);
108        self
109    }
110
111    /// Set the resize failure rate.
112    pub const fn resize(mut self, rate: f64) -> Self {
113        self.resize_rate = Some(rate);
114        self
115    }
116
117    /// Set the partial resize rate (probability of partial vs complete resize failure).
118    pub const fn partial_resize(mut self, rate: f64) -> Self {
119        self.partial_resize_rate = Some(rate);
120        self
121    }
122
123    /// Set the remove failure rate.
124    pub const fn remove(mut self, rate: f64) -> Self {
125        self.remove_rate = Some(rate);
126        self
127    }
128
129    /// Set the scan failure rate.
130    pub const fn scan(mut self, rate: f64) -> Self {
131        self.scan_rate = Some(rate);
132        self
133    }
134}
135
136/// Shared fault injection context.
137#[derive(Clone)]
138struct Oracle {
139    rng: Arc<Mutex<BoxDynRng>>,
140    config: Arc<RwLock<Config>>,
141}
142
143impl Oracle {
144    /// Check if a fault should be injected for the given operation.
145    fn should_fail(&self, op: Op) -> bool {
146        self.roll(Some(self.config.read().rate_for(op)))
147    }
148
149    /// Check if a write fault should be injected. Returns (should_fail, partial_rate).
150    /// Reads config once to avoid nested lock acquisition.
151    fn check_write_fault(&self) -> (bool, Option<f64>) {
152        let config = self.config.read();
153        let fail = self.roll(Some(config.rate_for(Op::Write)));
154        (fail, config.partial_write_rate)
155    }
156
157    /// Check if a resize fault should be injected. Returns (should_fail, partial_rate).
158    /// Reads config once to avoid nested lock acquisition.
159    fn check_resize_fault(&self) -> (bool, Option<f64>) {
160        let config = self.config.read();
161        let fail = self.roll(Some(config.rate_for(Op::Resize)));
162        (fail, config.partial_resize_rate)
163    }
164
165    /// Check if an event should occur based on a probability rate.
166    fn roll(&self, rate: Option<f64>) -> bool {
167        let rate = rate.unwrap_or(0.0);
168        if rate <= 0.0 {
169            return false;
170        }
171        if rate >= 1.0 {
172            return true;
173        }
174        self.rng.lock().gen::<f64>() < rate
175    }
176
177    /// Generate a random value strictly between `from` and `to`, or None if not possible.
178    fn random_between(&self, from: u64, to: u64) -> Option<u64> {
179        if from == to {
180            return None;
181        }
182        let (min, max) = if from < to { (from, to) } else { (to, from) };
183        if max - min <= 1 {
184            return None;
185        }
186        Some(self.rng.lock().gen_range(min + 1..max))
187    }
188
189    /// Try to generate a partial operation target. Returns Some if both the rate
190    /// check passes and an intermediate value exists between `from` and `to`.
191    fn try_partial(&self, rate: Option<f64>, from: u64, to: u64) -> Option<u64> {
192        if self.roll(rate) {
193            self.random_between(from, to)
194        } else {
195            None
196        }
197    }
198}
199
200/// A storage wrapper that injects deterministic faults based on configuration.
201///
202/// Uses a shared RNG for determinism.
203#[derive(Clone)]
204pub struct Storage<S: crate::Storage> {
205    inner: S,
206    ctx: Oracle,
207}
208
209impl<S: crate::Storage> Storage<S> {
210    /// Create a new faulty storage wrapper.
211    pub fn new(inner: S, rng: Arc<Mutex<BoxDynRng>>, config: Arc<RwLock<Config>>) -> Self {
212        Self {
213            inner,
214            ctx: Oracle { rng, config },
215        }
216    }
217
218    /// Get a reference to the inner storage.
219    pub const fn inner(&self) -> &S {
220        &self.inner
221    }
222
223    /// Get access to the fault configuration for dynamic modification.
224    pub fn config(&self) -> Arc<RwLock<Config>> {
225        self.ctx.config.clone()
226    }
227}
228
229/// Create an IoError for injected faults.
230fn injected_io_error() -> IoError {
231    IoError::other("injected storage fault")
232}
233
234impl<S: crate::Storage> crate::Storage for Storage<S> {
235    type Blob = Blob<S::Blob>;
236
237    async fn open_versioned(
238        &self,
239        partition: &str,
240        name: &[u8],
241        versions: std::ops::RangeInclusive<u16>,
242    ) -> Result<(Self::Blob, u64, u16), Error> {
243        if self.ctx.should_fail(Op::Open) {
244            return Err(Error::Io(injected_io_error()));
245        }
246        self.inner
247            .open_versioned(partition, name, versions)
248            .await
249            .map(|(blob, len, blob_version)| {
250                (Blob::new(self.ctx.clone(), blob, len), len, blob_version)
251            })
252    }
253
254    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
255        if self.ctx.should_fail(Op::Remove) {
256            return Err(Error::Io(injected_io_error()));
257        }
258        self.inner.remove(partition, name).await
259    }
260
261    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
262        if self.ctx.should_fail(Op::Scan) {
263            return Err(Error::Io(injected_io_error()));
264        }
265        self.inner.scan(partition).await
266    }
267}
268
269/// A blob wrapper that injects deterministic faults based on configuration.
270#[derive(Clone)]
271pub struct Blob<B: crate::Blob> {
272    inner: B,
273    ctx: Oracle,
274    /// Tracked size for partial resize support.
275    size: Arc<AtomicU64>,
276}
277
278impl<B: crate::Blob> Blob<B> {
279    fn new(ctx: Oracle, inner: B, size: u64) -> Self {
280        Self {
281            inner,
282            ctx,
283            size: Arc::new(AtomicU64::new(size)),
284        }
285    }
286}
287
288impl<B: crate::Blob> crate::Blob for Blob<B> {
289    async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
290        if self.ctx.should_fail(Op::Read) {
291            return Err(Error::Io(injected_io_error()));
292        }
293        self.inner.read_at(offset, len).await
294    }
295
296    async fn read_at_buf(
297        &self,
298        offset: u64,
299        len: usize,
300        bufs: impl Into<IoBufsMut> + Send,
301    ) -> Result<IoBufsMut, Error> {
302        if self.ctx.should_fail(Op::Read) {
303            return Err(Error::Io(injected_io_error()));
304        }
305        self.inner.read_at_buf(offset, len, bufs.into()).await
306    }
307
308    async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
309        let bufs = bufs.into();
310        let total_bytes = bufs.remaining() as u64;
311
312        let (should_fail, partial_rate) = self.ctx.check_write_fault();
313        if should_fail {
314            if let Some(bytes) = self.ctx.try_partial(partial_rate, 0, total_bytes) {
315                // Partial write: write some bytes, sync, then fail
316                self.inner
317                    .write_at(offset, bufs.coalesce().slice(..bytes as usize))
318                    .await?;
319                self.inner.sync().await?;
320                self.size
321                    .fetch_max(offset.saturating_add(bytes), Ordering::Relaxed);
322                return Err(Error::Io(injected_io_error()));
323            }
324            return Err(Error::Io(injected_io_error()));
325        }
326
327        self.inner.write_at(offset, bufs).await?;
328        self.size
329            .fetch_max(offset.saturating_add(total_bytes), Ordering::Relaxed);
330        Ok(())
331    }
332
333    async fn resize(&self, len: u64) -> Result<(), Error> {
334        let (should_fail, partial_rate) = self.ctx.check_resize_fault();
335        if should_fail {
336            let current = self.size.load(Ordering::Relaxed);
337            if let Some(len) = self.ctx.try_partial(partial_rate, current, len) {
338                // Partial resize: resize to intermediate size, sync, then fail
339                self.inner.resize(len).await?;
340                self.inner.sync().await?;
341                self.size.store(len, Ordering::Relaxed);
342                return Err(Error::Io(injected_io_error()));
343            }
344            return Err(Error::Io(injected_io_error()));
345        }
346        self.inner.resize(len).await?;
347        self.size.store(len, Ordering::Relaxed);
348        Ok(())
349    }
350
351    async fn sync(&self) -> Result<(), Error> {
352        if self.ctx.should_fail(Op::Sync) {
353            return Err(Error::Io(injected_io_error()));
354        }
355        self.inner.sync().await
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use super::*;
362    use crate::{
363        storage::{memory::Storage as MemStorage, tests::run_storage_tests},
364        Blob as _, BufferPool, BufferPoolConfig, Storage as _,
365    };
366    use rand::{rngs::StdRng, SeedableRng};
367
368    fn test_pool() -> BufferPool {
369        BufferPool::new(
370            BufferPoolConfig::for_storage(),
371            &mut prometheus_client::registry::Registry::default(),
372        )
373    }
374
375    /// Test harness with faulty storage wrapping memory storage.
376    struct Harness {
377        inner: MemStorage,
378        storage: Storage<MemStorage>,
379        config: Arc<RwLock<Config>>,
380    }
381
382    impl Harness {
383        fn new(config: Config) -> Self {
384            Self::with_seed(42, config)
385        }
386
387        fn with_seed(seed: u64, config: Config) -> Self {
388            let inner = MemStorage::new(test_pool());
389            let rng = Arc::new(Mutex::new(
390                Box::new(StdRng::seed_from_u64(seed)) as BoxDynRng
391            ));
392            let config = Arc::new(RwLock::new(config));
393            let storage = Storage::new(inner.clone(), rng, config.clone());
394            Self {
395                inner,
396                storage,
397                config,
398            }
399        }
400    }
401
402    #[tokio::test]
403    async fn test_faulty_storage_no_faults() {
404        let h = Harness::new(Config::default());
405        run_storage_tests(h.storage).await;
406    }
407
408    #[tokio::test]
409    async fn test_faulty_storage_sync_always_fails() {
410        let h = Harness::new(Config::default().sync(1.0));
411
412        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
413        blob.write_at(0, b"data".to_vec()).await.unwrap();
414
415        assert!(matches!(blob.sync().await, Err(Error::Io(_))));
416    }
417
418    #[tokio::test]
419    async fn test_faulty_storage_write_always_fails() {
420        let h = Harness::new(Config::default().write(1.0));
421
422        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
423
424        assert!(matches!(
425            blob.write_at(0, b"data".to_vec()).await,
426            Err(Error::Io(_))
427        ));
428    }
429
430    #[tokio::test]
431    async fn test_faulty_storage_read_always_fails() {
432        let h = Harness::new(Config::default());
433
434        // Write some data first (no faults)
435        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
436        blob.write_at(0, b"data".to_vec()).await.unwrap();
437        blob.sync().await.unwrap();
438
439        // Enable read faults
440        h.config.write().read_rate = Some(1.0);
441
442        assert!(matches!(blob.read_at(0, 4).await, Err(Error::Io(_))));
443    }
444
445    #[tokio::test]
446    async fn test_faulty_storage_open_always_fails() {
447        let h = Harness::new(Config::default().open(1.0));
448
449        assert!(matches!(
450            h.storage.open("partition", b"test").await,
451            Err(Error::Io(_))
452        ));
453    }
454
455    #[tokio::test]
456    async fn test_faulty_storage_remove_always_fails() {
457        let h = Harness::new(Config::default());
458
459        // Create a blob first
460        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
461        blob.write_at(0, b"data".to_vec()).await.unwrap();
462        blob.sync().await.unwrap();
463        drop(blob);
464
465        // Enable remove faults
466        h.config.write().remove_rate = Some(1.0);
467
468        assert!(matches!(
469            h.storage.remove("partition", Some(b"test")).await,
470            Err(Error::Io(_))
471        ));
472    }
473
474    #[tokio::test]
475    async fn test_faulty_storage_scan_always_fails() {
476        let h = Harness::new(Config::default());
477
478        // Create some blobs first
479        for i in 0..3 {
480            let name = format!("blob{i}");
481            let (blob, _) = h.storage.open("partition", name.as_bytes()).await.unwrap();
482            blob.write_at(0, b"data".to_vec()).await.unwrap();
483            blob.sync().await.unwrap();
484        }
485
486        // Enable scan faults
487        h.config.write().scan_rate = Some(1.0);
488
489        assert!(matches!(
490            h.storage.scan("partition").await,
491            Err(Error::Io(_))
492        ));
493    }
494
495    #[tokio::test]
496    async fn test_faulty_storage_determinism() {
497        async fn run_ops(seed: u64, rate: f64) -> Vec<bool> {
498            let h = Harness::with_seed(seed, Config::default().open(rate));
499            let mut results = Vec::new();
500            for i in 0..20 {
501                let name = format!("blob{i}");
502                results.push(h.storage.open("partition", name.as_bytes()).await.is_ok());
503            }
504            results
505        }
506
507        let results1 = run_ops(42, 0.5).await;
508        let results2 = run_ops(42, 0.5).await;
509        assert_eq!(results1, results2, "Same seed should produce same results");
510
511        let results3 = run_ops(999, 0.5).await;
512        assert_ne!(
513            results1, results3,
514            "Different seeds should produce different results"
515        );
516    }
517
518    #[tokio::test]
519    async fn test_faulty_storage_rate_for() {
520        let config = Config::default().open(0.1).sync(0.9);
521
522        assert!((config.rate_for(Op::Open) - 0.1).abs() < f64::EPSILON);
523        assert!((config.rate_for(Op::Sync) - 0.9).abs() < f64::EPSILON);
524        assert!(config.rate_for(Op::Write).abs() < f64::EPSILON);
525    }
526
527    #[tokio::test]
528    async fn test_faulty_storage_dynamic_config() {
529        let h = Harness::new(Config::default());
530
531        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
532        blob.sync().await.unwrap();
533
534        h.config.write().sync_rate = Some(1.0);
535        assert!(matches!(blob.sync().await, Err(Error::Io(_))));
536
537        h.config.write().sync_rate = Some(0.0);
538        blob.sync().await.unwrap();
539    }
540
541    #[tokio::test]
542    async fn test_faulty_storage_partial_write() {
543        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));
544
545        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
546        let data = b"hello world".to_vec();
547        let result = blob.write_at(0, data.clone()).await;
548
549        assert!(matches!(result, Err(Error::Io(_))));
550
551        let (inner_blob, size) = h.inner.open("partition", b"test").await.unwrap();
552        let bytes_written = size as usize;
553        assert!(
554            bytes_written > 0 && bytes_written < data.len(),
555            "Expected partial write: {bytes_written} bytes out of {}",
556            data.len()
557        );
558
559        let read_result = inner_blob.read_at(0, bytes_written).await.unwrap();
560        assert_eq!(read_result.coalesce().as_ref(), &data[..bytes_written]);
561    }
562
563    #[tokio::test]
564    async fn test_faulty_storage_partial_write_disabled() {
565        let h = Harness::new(Config::default().write(1.0).partial_write(0.0));
566
567        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
568        let result = blob.write_at(0, b"hello world".to_vec()).await;
569
570        assert!(matches!(result, Err(Error::Io(_))));
571
572        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
573        assert_eq!(
574            size, 0,
575            "Expected no bytes written when partial_write_rate is 0"
576        );
577    }
578
579    #[tokio::test]
580    async fn test_faulty_storage_partial_write_single_byte() {
581        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));
582
583        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
584        let result = blob.write_at(0, b"x".to_vec()).await;
585
586        assert!(matches!(result, Err(Error::Io(_))));
587
588        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
589        assert_eq!(size, 0, "No partial write possible for single byte");
590    }
591
592    #[tokio::test]
593    async fn test_faulty_storage_partial_resize_grow() {
594        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));
595
596        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
597        assert_eq!(initial_size, 0);
598
599        let target_size = 100u64;
600        let result = blob.resize(target_size).await;
601
602        assert!(matches!(result, Err(Error::Io(_))));
603
604        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
605        assert!(
606            actual_size > 0 && actual_size < target_size,
607            "Expected partial resize: size {actual_size} should be between 0 and {target_size}"
608        );
609    }
610
611    #[tokio::test]
612    async fn test_faulty_storage_partial_resize_shrink() {
613        let h = Harness::new(Config::default());
614
615        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
616        blob.resize(100).await.unwrap();
617        blob.sync().await.unwrap();
618
619        {
620            let mut cfg = h.config.write();
621            cfg.resize_rate = Some(1.0);
622            cfg.partial_resize_rate = Some(1.0);
623        }
624
625        let target_size = 10u64;
626        let result = blob.resize(target_size).await;
627
628        assert!(matches!(result, Err(Error::Io(_))));
629
630        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
631        assert!(
632            actual_size > target_size && actual_size < 100,
633            "Expected partial shrink: size {actual_size} should be between {target_size} and 100"
634        );
635    }
636
637    #[tokio::test]
638    async fn test_faulty_storage_partial_resize_disabled() {
639        let h = Harness::new(Config::default().resize(1.0).partial_resize(0.0));
640
641        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
642        let result = blob.resize(100).await;
643
644        assert!(matches!(result, Err(Error::Io(_))));
645
646        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
647        assert_eq!(size, 0, "Expected no resize when partial_resize_rate is 0");
648    }
649
650    #[tokio::test]
651    async fn test_faulty_storage_partial_resize_same_size() {
652        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));
653
654        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
655        let result = blob.resize(0).await;
656
657        assert!(matches!(result, Err(Error::Io(_))));
658
659        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
660        assert_eq!(size, 0);
661    }
662
663    #[tokio::test]
664    async fn test_faulty_storage_partial_resize_after_write_extends() {
665        let h = Harness::new(Config::default());
666
667        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
668        assert_eq!(initial_size, 0);
669
670        blob.write_at(0, vec![0xABu8; 50]).await.unwrap();
671        blob.sync().await.unwrap();
672
673        let (_, size_after_write) = h.inner.open("partition", b"test").await.unwrap();
674        assert_eq!(size_after_write, 50);
675
676        {
677            let mut cfg = h.config.write();
678            cfg.resize_rate = Some(1.0);
679            cfg.partial_resize_rate = Some(1.0);
680        }
681
682        let target_size = 10u64;
683        let result = blob.resize(target_size).await;
684
685        assert!(matches!(result, Err(Error::Io(_))));
686
687        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
688        assert!(
689            actual_size > target_size && actual_size < 50,
690            "Expected partial shrink from 50: size {actual_size} should be between {target_size} and 50"
691        );
692    }
693
694    #[tokio::test]
695    async fn test_faulty_storage_partial_resize_one_byte_difference() {
696        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));
697
698        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
699        let result = blob.resize(1).await;
700
701        assert!(matches!(result, Err(Error::Io(_))));
702
703        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
704        assert_eq!(size, 0);
705    }
706}