1use crate::{deterministic::BoxDynRng, Error, IoBufs, IoBufsMut};
4use bytes::Buf;
5use commonware_utils::sync::{Mutex, RwLock};
6use rand::Rng;
7use std::{
8 io::Error as IoError,
9 sync::{
10 atomic::{AtomicU64, Ordering},
11 Arc,
12 },
13};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Identifies the kind of storage operation a fault may be injected into.
///
/// Each variant maps to one `*_rate` field in [`Config`] (see
/// [`Config::rate_for`]). The partial-write and partial-resize rates are
/// consulted separately and have no variant here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    /// Opening a blob.
    Open,
    /// Reading from an open blob.
    Read,
    /// Writing to an open blob.
    Write,
    /// Syncing (flushing) a blob.
    Sync,
    /// Resizing a blob.
    Resize,
    /// Removing a blob or partition.
    Remove,
    /// Listing the blobs in a partition.
    Scan,
}
26
/// Failure rates for each storage operation.
///
/// Every rate is a probability in `[0.0, 1.0]`; `None` (the default) behaves
/// as `0.0`, i.e. never fail. Rates `>= 1.0` always fail and `<= 0.0` never
/// fail without consuming randomness (see `Oracle::roll`). The config is
/// shared behind an `RwLock`, so rates can be changed while storage is in use.
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Probability that opening a blob fails.
    pub open_rate: Option<f64>,

    /// Probability that a read fails.
    pub read_rate: Option<f64>,

    /// Probability that a write fails.
    pub write_rate: Option<f64>,

    /// Given a write fault, probability that a strict prefix of the data is
    /// persisted (and synced) before the error is reported, emulating a
    /// crash mid-write.
    pub partial_write_rate: Option<f64>,

    /// Probability that a sync fails.
    pub sync_rate: Option<f64>,

    /// Probability that a resize fails.
    pub resize_rate: Option<f64>,

    /// Given a resize fault, probability that the blob is resized (and
    /// synced) to some size strictly between the current and requested
    /// sizes before the error is reported.
    pub partial_resize_rate: Option<f64>,

    /// Probability that a remove fails.
    pub remove_rate: Option<f64>,

    /// Probability that a partition scan fails.
    pub scan_rate: Option<f64>,
}
65
66impl Config {
67 fn rate_for(&self, op: Op) -> f64 {
69 match op {
70 Op::Open => self.open_rate,
71 Op::Read => self.read_rate,
72 Op::Write => self.write_rate,
73 Op::Sync => self.sync_rate,
74 Op::Resize => self.resize_rate,
75 Op::Remove => self.remove_rate,
76 Op::Scan => self.scan_rate,
77 }
78 .unwrap_or(0.0)
79 }
80
81 pub const fn open(mut self, rate: f64) -> Self {
83 self.open_rate = Some(rate);
84 self
85 }
86
87 pub const fn read(mut self, rate: f64) -> Self {
89 self.read_rate = Some(rate);
90 self
91 }
92
93 pub const fn write(mut self, rate: f64) -> Self {
95 self.write_rate = Some(rate);
96 self
97 }
98
99 pub const fn partial_write(mut self, rate: f64) -> Self {
101 self.partial_write_rate = Some(rate);
102 self
103 }
104
105 pub const fn sync(mut self, rate: f64) -> Self {
107 self.sync_rate = Some(rate);
108 self
109 }
110
111 pub const fn resize(mut self, rate: f64) -> Self {
113 self.resize_rate = Some(rate);
114 self
115 }
116
117 pub const fn partial_resize(mut self, rate: f64) -> Self {
119 self.partial_resize_rate = Some(rate);
120 self
121 }
122
123 pub const fn remove(mut self, rate: f64) -> Self {
125 self.remove_rate = Some(rate);
126 self
127 }
128
129 pub const fn scan(mut self, rate: f64) -> Self {
131 self.scan_rate = Some(rate);
132 self
133 }
134}
135
/// Shared fault-injection state: a seeded RNG plus the live [`Config`].
///
/// Cloned into every wrapped blob so all handles draw from the same RNG
/// stream (deterministic for a given seed) and observe runtime changes to
/// the failure rates.
#[derive(Clone)]
struct Oracle {
    // Boxed RNG behind a mutex so rolls from all clones are serialized.
    rng: Arc<Mutex<BoxDynRng>>,
    // Current failure rates; tests may mutate these at runtime via the lock.
    config: Arc<RwLock<Config>>,
}
142
143impl Oracle {
144 fn should_fail(&self, op: Op) -> bool {
146 self.roll(Some(self.config.read().rate_for(op)))
147 }
148
149 fn check_write_fault(&self) -> (bool, Option<f64>) {
152 let config = self.config.read();
153 let fail = self.roll(Some(config.rate_for(Op::Write)));
154 (fail, config.partial_write_rate)
155 }
156
157 fn check_resize_fault(&self) -> (bool, Option<f64>) {
160 let config = self.config.read();
161 let fail = self.roll(Some(config.rate_for(Op::Resize)));
162 (fail, config.partial_resize_rate)
163 }
164
165 fn roll(&self, rate: Option<f64>) -> bool {
167 let rate = rate.unwrap_or(0.0);
168 if rate <= 0.0 {
169 return false;
170 }
171 if rate >= 1.0 {
172 return true;
173 }
174 self.rng.lock().gen::<f64>() < rate
175 }
176
177 fn random_between(&self, from: u64, to: u64) -> Option<u64> {
179 if from == to {
180 return None;
181 }
182 let (min, max) = if from < to { (from, to) } else { (to, from) };
183 if max - min <= 1 {
184 return None;
185 }
186 Some(self.rng.lock().gen_range(min + 1..max))
187 }
188
189 fn try_partial(&self, rate: Option<f64>, from: u64, to: u64) -> Option<u64> {
192 if self.roll(rate) {
193 self.random_between(from, to)
194 } else {
195 None
196 }
197 }
198}
199
/// A [`crate::Storage`] wrapper that injects random faults into operations
/// according to a runtime-adjustable [`Config`].
#[derive(Clone)]
pub struct Storage<S: crate::Storage> {
    // The real storage that performs the successful operations.
    inner: S,
    // Fault-injection oracle, shared with every blob opened through this storage.
    ctx: Oracle,
}
208
209impl<S: crate::Storage> Storage<S> {
210 pub fn new(inner: S, rng: Arc<Mutex<BoxDynRng>>, config: Arc<RwLock<Config>>) -> Self {
212 Self {
213 inner,
214 ctx: Oracle { rng, config },
215 }
216 }
217
218 pub const fn inner(&self) -> &S {
220 &self.inner
221 }
222
223 pub fn config(&self) -> Arc<RwLock<Config>> {
225 self.ctx.config.clone()
226 }
227}
228
/// Builds the `std::io::Error` returned for every injected fault.
fn injected_io_error() -> IoError {
    const MSG: &str = "injected storage fault";
    IoError::other(MSG)
}
233
234impl<S: crate::Storage> crate::Storage for Storage<S> {
235 type Blob = Blob<S::Blob>;
236
237 async fn open_versioned(
238 &self,
239 partition: &str,
240 name: &[u8],
241 versions: std::ops::RangeInclusive<u16>,
242 ) -> Result<(Self::Blob, u64, u16), Error> {
243 if self.ctx.should_fail(Op::Open) {
244 return Err(Error::Io(injected_io_error()));
245 }
246 self.inner
247 .open_versioned(partition, name, versions)
248 .await
249 .map(|(blob, len, blob_version)| {
250 (Blob::new(self.ctx.clone(), blob, len), len, blob_version)
251 })
252 }
253
254 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
255 if self.ctx.should_fail(Op::Remove) {
256 return Err(Error::Io(injected_io_error()));
257 }
258 self.inner.remove(partition, name).await
259 }
260
261 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
262 if self.ctx.should_fail(Op::Scan) {
263 return Err(Error::Io(injected_io_error()));
264 }
265 self.inner.scan(partition).await
266 }
267}
268
/// A [`crate::Blob`] wrapper that injects faults into blob operations.
#[derive(Clone)]
pub struct Blob<B: crate::Blob> {
    // The real blob that performs the successful operations.
    inner: B,
    // Fault-injection oracle shared with the owning storage.
    ctx: Oracle,
    // Tracked logical size: seeded at open, advanced on successful or
    // partial writes, replaced on resizes. Used as the starting bound when
    // picking a partial-resize target.
    size: Arc<AtomicU64>,
}
277
278impl<B: crate::Blob> Blob<B> {
279 fn new(ctx: Oracle, inner: B, size: u64) -> Self {
280 Self {
281 inner,
282 ctx,
283 size: Arc::new(AtomicU64::new(size)),
284 }
285 }
286}
287
288impl<B: crate::Blob> crate::Blob for Blob<B> {
289 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
290 if self.ctx.should_fail(Op::Read) {
291 return Err(Error::Io(injected_io_error()));
292 }
293 self.inner.read_at(offset, len).await
294 }
295
296 async fn read_at_buf(
297 &self,
298 offset: u64,
299 len: usize,
300 bufs: impl Into<IoBufsMut> + Send,
301 ) -> Result<IoBufsMut, Error> {
302 if self.ctx.should_fail(Op::Read) {
303 return Err(Error::Io(injected_io_error()));
304 }
305 self.inner.read_at_buf(offset, len, bufs.into()).await
306 }
307
308 async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
309 let bufs = bufs.into();
310 let total_bytes = bufs.remaining() as u64;
311
312 let (should_fail, partial_rate) = self.ctx.check_write_fault();
313 if should_fail {
314 if let Some(bytes) = self.ctx.try_partial(partial_rate, 0, total_bytes) {
315 self.inner
317 .write_at(offset, bufs.coalesce().slice(..bytes as usize))
318 .await?;
319 self.inner.sync().await?;
320 self.size
321 .fetch_max(offset.saturating_add(bytes), Ordering::Relaxed);
322 return Err(Error::Io(injected_io_error()));
323 }
324 return Err(Error::Io(injected_io_error()));
325 }
326
327 self.inner.write_at(offset, bufs).await?;
328 self.size
329 .fetch_max(offset.saturating_add(total_bytes), Ordering::Relaxed);
330 Ok(())
331 }
332
333 async fn resize(&self, len: u64) -> Result<(), Error> {
334 let (should_fail, partial_rate) = self.ctx.check_resize_fault();
335 if should_fail {
336 let current = self.size.load(Ordering::Relaxed);
337 if let Some(len) = self.ctx.try_partial(partial_rate, current, len) {
338 self.inner.resize(len).await?;
340 self.inner.sync().await?;
341 self.size.store(len, Ordering::Relaxed);
342 return Err(Error::Io(injected_io_error()));
343 }
344 return Err(Error::Io(injected_io_error()));
345 }
346 self.inner.resize(len).await?;
347 self.size.store(len, Ordering::Relaxed);
348 Ok(())
349 }
350
351 async fn sync(&self) -> Result<(), Error> {
352 if self.ctx.should_fail(Op::Sync) {
353 return Err(Error::Io(injected_io_error()));
354 }
355 self.inner.sync().await
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        storage::{memory::Storage as MemStorage, tests::run_storage_tests},
        Blob as _, BufferPool, BufferPoolConfig, Storage as _,
    };
    use rand::{rngs::StdRng, SeedableRng};

    /// Builds a buffer pool with the default storage configuration for tests.
    fn test_pool() -> BufferPool {
        BufferPool::new(
            BufferPoolConfig::for_storage(),
            &mut prometheus_client::registry::Registry::default(),
        )
    }

    /// Test fixture bundling the faulty wrapper, the underlying in-memory
    /// storage (for fault-free inspection of what was actually persisted),
    /// and a handle to the live fault configuration.
    struct Harness {
        // Direct handle to the wrapped storage, bypassing fault injection.
        inner: MemStorage,
        // The fault-injecting wrapper under test.
        storage: Storage<MemStorage>,
        // Shared config; tests mutate rates at runtime through this handle.
        config: Arc<RwLock<Config>>,
    }

    impl Harness {
        /// Creates a harness with a fixed default seed (42).
        fn new(config: Config) -> Self {
            Self::with_seed(42, config)
        }

        /// Creates a harness whose RNG is seeded with `seed`, making every
        /// fault roll deterministic for that seed.
        fn with_seed(seed: u64, config: Config) -> Self {
            let inner = MemStorage::new(test_pool());
            let rng = Arc::new(Mutex::new(
                Box::new(StdRng::seed_from_u64(seed)) as BoxDynRng
            ));
            let config = Arc::new(RwLock::new(config));
            let storage = Storage::new(inner.clone(), rng, config.clone());
            Self {
                inner,
                storage,
                config,
            }
        }
    }

    // With all rates unset (0.0), the wrapper must pass the standard
    // storage conformance suite unchanged.
    #[tokio::test]
    async fn test_faulty_storage_no_faults() {
        let h = Harness::new(Config::default());
        run_storage_tests(h.storage).await;
    }

    #[tokio::test]
    async fn test_faulty_storage_sync_always_fails() {
        let h = Harness::new(Config::default().sync(1.0));

        // Open and write succeed; only sync is configured to fail.
        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();

        assert!(matches!(blob.sync().await, Err(Error::Io(_))));
    }

    #[tokio::test]
    async fn test_faulty_storage_write_always_fails() {
        let h = Harness::new(Config::default().write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();

        assert!(matches!(
            blob.write_at(0, b"data".to_vec()).await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_read_always_fails() {
        let h = Harness::new(Config::default());

        // Write some data fault-free first.
        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();
        blob.sync().await.unwrap();

        // Enable read faults only after the data is in place.
        h.config.write().read_rate = Some(1.0);

        assert!(matches!(blob.read_at(0, 4).await, Err(Error::Io(_))));
    }

    #[tokio::test]
    async fn test_faulty_storage_open_always_fails() {
        let h = Harness::new(Config::default().open(1.0));

        assert!(matches!(
            h.storage.open("partition", b"test").await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_remove_always_fails() {
        let h = Harness::new(Config::default());

        // Create a blob fault-free, then enable remove faults.
        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        h.config.write().remove_rate = Some(1.0);

        assert!(matches!(
            h.storage.remove("partition", Some(b"test")).await,
            Err(Error::Io(_))
        ));
    }

    #[tokio::test]
    async fn test_faulty_storage_scan_always_fails() {
        let h = Harness::new(Config::default());

        // Populate the partition fault-free, then enable scan faults.
        for i in 0..3 {
            let name = format!("blob{i}");
            let (blob, _) = h.storage.open("partition", name.as_bytes()).await.unwrap();
            blob.write_at(0, b"data".to_vec()).await.unwrap();
            blob.sync().await.unwrap();
        }

        h.config.write().scan_rate = Some(1.0);

        assert!(matches!(
            h.storage.scan("partition").await,
            Err(Error::Io(_))
        ));
    }

    // Same seed must produce the same sequence of injected faults; a
    // different seed should (with overwhelming probability over 20 rolls
    // at rate 0.5) produce a different sequence.
    #[tokio::test]
    async fn test_faulty_storage_determinism() {
        async fn run_ops(seed: u64, rate: f64) -> Vec<bool> {
            let h = Harness::with_seed(seed, Config::default().open(rate));
            let mut results = Vec::new();
            for i in 0..20 {
                let name = format!("blob{i}");
                results.push(h.storage.open("partition", name.as_bytes()).await.is_ok());
            }
            results
        }

        let results1 = run_ops(42, 0.5).await;
        let results2 = run_ops(42, 0.5).await;
        assert_eq!(results1, results2, "Same seed should produce same results");

        let results3 = run_ops(999, 0.5).await;
        assert_ne!(
            results1, results3,
            "Different seeds should produce different results"
        );
    }

    #[tokio::test]
    async fn test_faulty_storage_rate_for() {
        let config = Config::default().open(0.1).sync(0.9);

        assert!((config.rate_for(Op::Open) - 0.1).abs() < f64::EPSILON);
        assert!((config.rate_for(Op::Sync) - 0.9).abs() < f64::EPSILON);
        // Unset rates default to 0.0.
        assert!(config.rate_for(Op::Write).abs() < f64::EPSILON);
    }

    // Rates can be flipped on and off at runtime through the shared config.
    #[tokio::test]
    async fn test_faulty_storage_dynamic_config() {
        let h = Harness::new(Config::default());

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.sync().await.unwrap();

        h.config.write().sync_rate = Some(1.0);
        assert!(matches!(blob.sync().await, Err(Error::Io(_))));

        h.config.write().sync_rate = Some(0.0);
        blob.sync().await.unwrap();
    }

    // With write(1.0) + partial_write(1.0), every write fails but persists
    // a strict, non-empty prefix of the data first.
    #[tokio::test]
    async fn test_faulty_storage_partial_write() {
        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let data = b"hello world".to_vec();
        let result = blob.write_at(0, data.clone()).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // Inspect what was actually persisted via the fault-free inner storage.
        let (inner_blob, size) = h.inner.open("partition", b"test").await.unwrap();
        let bytes_written = size as usize;
        assert!(
            bytes_written > 0 && bytes_written < data.len(),
            "Expected partial write: {bytes_written} bytes out of {}",
            data.len()
        );

        // The persisted bytes must be a prefix of the original data.
        let read_result = inner_blob.read_at(0, bytes_written).await.unwrap();
        assert_eq!(read_result.coalesce().as_ref(), &data[..bytes_written]);
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_write_disabled() {
        let h = Harness::new(Config::default().write(1.0).partial_write(0.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.write_at(0, b"hello world".to_vec()).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(
            size, 0,
            "Expected no bytes written when partial_write_rate is 0"
        );
    }

    // A 1-byte write has no strict, non-empty prefix, so no partial write
    // can occur even with partial_write(1.0).
    #[tokio::test]
    async fn test_faulty_storage_partial_write_single_byte() {
        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.write_at(0, b"x".to_vec()).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0, "No partial write possible for single byte");
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_resize_grow() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
        assert_eq!(initial_size, 0);

        let target_size = 100u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // The blob should have grown to some size strictly between 0 and target.
        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > 0 && actual_size < target_size,
            "Expected partial resize: size {actual_size} should be between 0 and {target_size}"
        );
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_resize_shrink() {
        let h = Harness::new(Config::default());

        // Grow the blob fault-free to 100 bytes first.
        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.resize(100).await.unwrap();
        blob.sync().await.unwrap();

        {
            let mut cfg = h.config.write();
            cfg.resize_rate = Some(1.0);
            cfg.partial_resize_rate = Some(1.0);
        }

        let target_size = 10u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // The blob should have shrunk partway, strictly between target and 100.
        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > target_size && actual_size < 100,
            "Expected partial shrink: size {actual_size} should be between {target_size} and 100"
        );
    }

    #[tokio::test]
    async fn test_faulty_storage_partial_resize_disabled() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(0.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.resize(100).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0, "Expected no resize when partial_resize_rate is 0");
    }

    // Resizing to the current size leaves no intermediate size, so no
    // partial resize can occur.
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_same_size() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.resize(0).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0);
    }

    // Writes advance the wrapper's tracked size, so a later partial shrink
    // starts from the post-write size (50), not the size at open (0).
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_after_write_extends() {
        let h = Harness::new(Config::default());

        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
        assert_eq!(initial_size, 0);

        blob.write_at(0, vec![0xABu8; 50]).await.unwrap();
        blob.sync().await.unwrap();

        let (_, size_after_write) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size_after_write, 50);

        {
            let mut cfg = h.config.write();
            cfg.resize_rate = Some(1.0);
            cfg.partial_resize_rate = Some(1.0);
        }

        let target_size = 10u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > target_size && actual_size < 50,
            "Expected partial shrink from 50: size {actual_size} should be between {target_size} and 50"
        );
    }

    // A resize differing from the current size by exactly 1 has no strict
    // intermediate value, so no partial resize can occur.
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_one_byte_difference() {
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let result = blob.resize(1).await;

        assert!(matches!(result, Err(Error::Io(_))));

        let (_, size) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0);
    }
}
706}