1use crate::{deterministic::BoxDynRng, Error, IoBufs, IoBufsMut};
4use bytes::Buf;
5use rand::Rng;
6use std::{
7 io::Error as IoError,
8 sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc, Mutex, RwLock,
11 },
12};
13
/// The storage operation a fault decision applies to.
///
/// Partial writes/resizes are not separate operations: they refine how a
/// `Write`/`Resize` fault manifests (see `Config::partial_write_rate` and
/// `Config::partial_resize_rate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    Open,
    Read,
    Write,
    Sync,
    Resize,
    Remove,
    Scan,
}
25
/// Failure-injection rates, each a probability in `[0.0, 1.0]`.
///
/// `None` (the default) means the operation never fails. The config is
/// shared behind an `Arc<RwLock<_>>`, so rates may be changed at runtime
/// and take effect on already-open blobs.
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Probability that opening a blob fails.
    pub open_rate: Option<f64>,

    /// Probability that a read fails.
    pub read_rate: Option<f64>,

    /// Probability that a write fails.
    pub write_rate: Option<f64>,

    /// Given a write fault, probability that a random strict prefix of the
    /// buffer is written and synced to the inner blob before the injected
    /// error is returned.
    pub partial_write_rate: Option<f64>,

    /// Probability that a sync fails.
    pub sync_rate: Option<f64>,

    /// Probability that a resize fails.
    pub resize_rate: Option<f64>,

    /// Given a resize fault, probability that the blob is resized (and
    /// synced) to a random size strictly between the current and target
    /// sizes before the injected error is returned.
    pub partial_resize_rate: Option<f64>,

    /// Probability that removing a blob or partition fails.
    pub remove_rate: Option<f64>,

    /// Probability that scanning a partition fails.
    pub scan_rate: Option<f64>,
}
64
65impl Config {
66 fn rate_for(&self, op: Op) -> f64 {
68 match op {
69 Op::Open => self.open_rate,
70 Op::Read => self.read_rate,
71 Op::Write => self.write_rate,
72 Op::Sync => self.sync_rate,
73 Op::Resize => self.resize_rate,
74 Op::Remove => self.remove_rate,
75 Op::Scan => self.scan_rate,
76 }
77 .unwrap_or(0.0)
78 }
79
80 pub const fn open(mut self, rate: f64) -> Self {
82 self.open_rate = Some(rate);
83 self
84 }
85
86 pub const fn read(mut self, rate: f64) -> Self {
88 self.read_rate = Some(rate);
89 self
90 }
91
92 pub const fn write(mut self, rate: f64) -> Self {
94 self.write_rate = Some(rate);
95 self
96 }
97
98 pub const fn partial_write(mut self, rate: f64) -> Self {
100 self.partial_write_rate = Some(rate);
101 self
102 }
103
104 pub const fn sync(mut self, rate: f64) -> Self {
106 self.sync_rate = Some(rate);
107 self
108 }
109
110 pub const fn resize(mut self, rate: f64) -> Self {
112 self.resize_rate = Some(rate);
113 self
114 }
115
116 pub const fn partial_resize(mut self, rate: f64) -> Self {
118 self.partial_resize_rate = Some(rate);
119 self
120 }
121
122 pub const fn remove(mut self, rate: f64) -> Self {
124 self.remove_rate = Some(rate);
125 self
126 }
127
128 pub const fn scan(mut self, rate: f64) -> Self {
130 self.scan_rate = Some(rate);
131 self
132 }
133}
134
/// Shared fault-decision state, cloned into every wrapped storage/blob handle.
#[derive(Clone)]
struct Oracle {
    // Boxed RNG behind a mutex so every handle draws from one deterministic
    // stream — required for seed-reproducible fault sequences.
    rng: Arc<Mutex<BoxDynRng>>,
    // Live rates; tests mutate these while handles are in use.
    config: Arc<RwLock<Config>>,
}
141
142impl Oracle {
143 fn should_fail(&self, op: Op) -> bool {
145 self.roll(Some(self.config.read().unwrap().rate_for(op)))
146 }
147
148 fn check_write_fault(&self) -> (bool, Option<f64>) {
151 let config = self.config.read().unwrap();
152 let fail = self.roll(Some(config.rate_for(Op::Write)));
153 (fail, config.partial_write_rate)
154 }
155
156 fn check_resize_fault(&self) -> (bool, Option<f64>) {
159 let config = self.config.read().unwrap();
160 let fail = self.roll(Some(config.rate_for(Op::Resize)));
161 (fail, config.partial_resize_rate)
162 }
163
164 fn roll(&self, rate: Option<f64>) -> bool {
166 let rate = rate.unwrap_or(0.0);
167 if rate <= 0.0 {
168 return false;
169 }
170 if rate >= 1.0 {
171 return true;
172 }
173 self.rng.lock().unwrap().gen::<f64>() < rate
174 }
175
176 fn random_between(&self, from: u64, to: u64) -> Option<u64> {
178 if from == to {
179 return None;
180 }
181 let (min, max) = if from < to { (from, to) } else { (to, from) };
182 if max - min <= 1 {
183 return None;
184 }
185 Some(self.rng.lock().unwrap().gen_range(min + 1..max))
186 }
187
188 fn try_partial(&self, rate: Option<f64>, from: u64, to: u64) -> Option<u64> {
191 if self.roll(rate) {
192 self.random_between(from, to)
193 } else {
194 None
195 }
196 }
197}
198
/// Fault-injecting wrapper around another [`crate::Storage`] implementation.
///
/// Each operation first consults the shared [`Oracle`]; on a fault it
/// returns an injected I/O error instead of delegating to `inner`.
#[derive(Clone)]
pub struct Storage<S: crate::Storage> {
    inner: S,
    ctx: Oracle,
}
207
208impl<S: crate::Storage> Storage<S> {
209 pub fn new(inner: S, rng: Arc<Mutex<BoxDynRng>>, config: Arc<RwLock<Config>>) -> Self {
211 Self {
212 inner,
213 ctx: Oracle { rng, config },
214 }
215 }
216
217 pub const fn inner(&self) -> &S {
219 &self.inner
220 }
221
222 pub fn config(&self) -> Arc<RwLock<Config>> {
224 self.ctx.config.clone()
225 }
226}
227
/// Build the `std::io::Error` returned for every injected fault.
fn injected_io_error() -> IoError {
    IoError::new(std::io::ErrorKind::Other, "injected storage fault")
}
232
233impl<S: crate::Storage> crate::Storage for Storage<S> {
234 type Blob = Blob<S::Blob>;
235
236 async fn open_versioned(
237 &self,
238 partition: &str,
239 name: &[u8],
240 versions: std::ops::RangeInclusive<u16>,
241 ) -> Result<(Self::Blob, u64, u16), Error> {
242 if self.ctx.should_fail(Op::Open) {
243 return Err(Error::Io(injected_io_error()));
244 }
245 self.inner
246 .open_versioned(partition, name, versions)
247 .await
248 .map(|(blob, len, blob_version)| {
249 (Blob::new(self.ctx.clone(), blob, len), len, blob_version)
250 })
251 }
252
253 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
254 if self.ctx.should_fail(Op::Remove) {
255 return Err(Error::Io(injected_io_error()));
256 }
257 self.inner.remove(partition, name).await
258 }
259
260 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
261 if self.ctx.should_fail(Op::Scan) {
262 return Err(Error::Io(injected_io_error()));
263 }
264 self.inner.scan(partition).await
265 }
266}
267
/// Fault-injecting wrapper around an underlying blob.
#[derive(Clone)]
pub struct Blob<B: crate::Blob> {
    inner: B,
    ctx: Oracle,
    // Tracked blob length, updated on successful/partial writes and resizes;
    // used as the starting point when picking a partial-resize target.
    size: Arc<AtomicU64>,
}
276
277impl<B: crate::Blob> Blob<B> {
278 fn new(ctx: Oracle, inner: B, size: u64) -> Self {
279 Self {
280 inner,
281 ctx,
282 size: Arc::new(AtomicU64::new(size)),
283 }
284 }
285}
286
287impl<B: crate::Blob> crate::Blob for Blob<B> {
288 async fn read_at(
289 &self,
290 offset: u64,
291 buf: impl Into<IoBufsMut> + Send,
292 ) -> Result<IoBufsMut, Error> {
293 if self.ctx.should_fail(Op::Read) {
294 return Err(Error::Io(injected_io_error()));
295 }
296 self.inner.read_at(offset, buf.into()).await
297 }
298
299 async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
300 let buf: IoBufs = buf.into();
301 let total_bytes = buf.remaining() as u64;
302
303 let (should_fail, partial_rate) = self.ctx.check_write_fault();
304 if should_fail {
305 if let Some(bytes) = self.ctx.try_partial(partial_rate, 0, total_bytes) {
306 self.inner
308 .write_at(offset, buf.coalesce().slice(..bytes as usize))
309 .await?;
310 self.inner.sync().await?;
311 self.size
312 .fetch_max(offset.saturating_add(bytes), Ordering::Relaxed);
313 return Err(Error::Io(injected_io_error()));
314 }
315 return Err(Error::Io(injected_io_error()));
316 }
317
318 self.inner.write_at(offset, buf).await?;
319 self.size
320 .fetch_max(offset.saturating_add(total_bytes), Ordering::Relaxed);
321 Ok(())
322 }
323
324 async fn resize(&self, len: u64) -> Result<(), Error> {
325 let (should_fail, partial_rate) = self.ctx.check_resize_fault();
326 if should_fail {
327 let current = self.size.load(Ordering::Relaxed);
328 if let Some(len) = self.ctx.try_partial(partial_rate, current, len) {
329 self.inner.resize(len).await?;
331 self.inner.sync().await?;
332 self.size.store(len, Ordering::Relaxed);
333 return Err(Error::Io(injected_io_error()));
334 }
335 return Err(Error::Io(injected_io_error()));
336 }
337 self.inner.resize(len).await?;
338 self.size.store(len, Ordering::Relaxed);
339 Ok(())
340 }
341
342 async fn sync(&self) -> Result<(), Error> {
343 if self.ctx.should_fail(Op::Sync) {
344 return Err(Error::Io(injected_io_error()));
345 }
346 self.inner.sync().await
347 }
348}
349
350#[cfg(test)]
351mod tests {
352 use super::*;
353 use crate::{
354 storage::{memory::Storage as MemStorage, tests::run_storage_tests},
355 Blob as _, Storage as _,
356 };
357 use rand::{rngs::StdRng, SeedableRng};
358
    /// Test fixture pairing a faulty wrapper with direct access to the
    /// wrapped in-memory storage and the live fault configuration.
    struct Harness {
        // Unwrapped storage, for asserting what actually got persisted.
        inner: MemStorage,
        storage: Storage<MemStorage>,
        config: Arc<RwLock<Config>>,
    }
365
366 impl Harness {
367 fn new(config: Config) -> Self {
368 Self::with_seed(42, config)
369 }
370
371 fn with_seed(seed: u64, config: Config) -> Self {
372 let inner = MemStorage::default();
373 let rng = Arc::new(Mutex::new(
374 Box::new(StdRng::seed_from_u64(seed)) as BoxDynRng
375 ));
376 let config = Arc::new(RwLock::new(config));
377 let storage = Storage::new(inner.clone(), rng, config.clone());
378 Self {
379 inner,
380 storage,
381 config,
382 }
383 }
384 }
385
386 #[tokio::test]
387 async fn test_faulty_storage_no_faults() {
388 let h = Harness::new(Config::default());
389 run_storage_tests(h.storage).await;
390 }
391
392 #[tokio::test]
393 async fn test_faulty_storage_sync_always_fails() {
394 let h = Harness::new(Config::default().sync(1.0));
395
396 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
397 blob.write_at(0, b"data".to_vec()).await.unwrap();
398
399 assert!(matches!(blob.sync().await, Err(Error::Io(_))));
400 }
401
402 #[tokio::test]
403 async fn test_faulty_storage_write_always_fails() {
404 let h = Harness::new(Config::default().write(1.0));
405
406 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
407
408 assert!(matches!(
409 blob.write_at(0, b"data".to_vec()).await,
410 Err(Error::Io(_))
411 ));
412 }
413
414 #[tokio::test]
415 async fn test_faulty_storage_read_always_fails() {
416 let h = Harness::new(Config::default());
417
418 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
420 blob.write_at(0, b"data".to_vec()).await.unwrap();
421 blob.sync().await.unwrap();
422
423 h.config.write().unwrap().read_rate = Some(1.0);
425
426 assert!(matches!(
427 blob.read_at(0, vec![0u8; 4]).await,
428 Err(Error::Io(_))
429 ));
430 }
431
432 #[tokio::test]
433 async fn test_faulty_storage_open_always_fails() {
434 let h = Harness::new(Config::default().open(1.0));
435
436 assert!(matches!(
437 h.storage.open("partition", b"test").await,
438 Err(Error::Io(_))
439 ));
440 }
441
442 #[tokio::test]
443 async fn test_faulty_storage_remove_always_fails() {
444 let h = Harness::new(Config::default());
445
446 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
448 blob.write_at(0, b"data".to_vec()).await.unwrap();
449 blob.sync().await.unwrap();
450 drop(blob);
451
452 h.config.write().unwrap().remove_rate = Some(1.0);
454
455 assert!(matches!(
456 h.storage.remove("partition", Some(b"test")).await,
457 Err(Error::Io(_))
458 ));
459 }
460
461 #[tokio::test]
462 async fn test_faulty_storage_scan_always_fails() {
463 let h = Harness::new(Config::default());
464
465 for i in 0..3 {
467 let name = format!("blob{i}");
468 let (blob, _) = h.storage.open("partition", name.as_bytes()).await.unwrap();
469 blob.write_at(0, b"data".to_vec()).await.unwrap();
470 blob.sync().await.unwrap();
471 }
472
473 h.config.write().unwrap().scan_rate = Some(1.0);
475
476 assert!(matches!(
477 h.storage.scan("partition").await,
478 Err(Error::Io(_))
479 ));
480 }
481
    #[tokio::test]
    async fn test_faulty_storage_determinism() {
        // Run 20 opens at a 50% fault rate and record which ones succeed.
        async fn run_ops(seed: u64, rate: f64) -> Vec<bool> {
            let h = Harness::with_seed(seed, Config::default().open(rate));
            let mut results = Vec::new();
            for i in 0..20 {
                let name = format!("blob{i}");
                results.push(h.storage.open("partition", name.as_bytes()).await.is_ok());
            }
            results
        }

        let results1 = run_ops(42, 0.5).await;
        let results2 = run_ops(42, 0.5).await;
        assert_eq!(results1, results2, "Same seed should produce same results");

        // NOTE(review): two seeds *could* coincidentally yield identical
        // 20-flip sequences; 42 vs 999 is assumed (and observed) to differ.
        let results3 = run_ops(999, 0.5).await;
        assert_ne!(
            results1, results3,
            "Different seeds should produce different results"
        );
    }
504
    #[tokio::test]
    async fn test_faulty_storage_rate_for() {
        // Explicitly set rates are returned as-is; unset rates default to 0.0.
        let config = Config::default().open(0.1).sync(0.9);

        assert!((config.rate_for(Op::Open) - 0.1).abs() < f64::EPSILON);
        assert!((config.rate_for(Op::Sync) - 0.9).abs() < f64::EPSILON);
        assert!(config.rate_for(Op::Write).abs() < f64::EPSILON);
    }
513
514 #[tokio::test]
515 async fn test_faulty_storage_dynamic_config() {
516 let h = Harness::new(Config::default());
517
518 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
519 blob.sync().await.unwrap();
520
521 h.config.write().unwrap().sync_rate = Some(1.0);
522 assert!(matches!(blob.sync().await, Err(Error::Io(_))));
523
524 h.config.write().unwrap().sync_rate = Some(0.0);
525 blob.sync().await.unwrap();
526 }
527
    #[tokio::test]
    async fn test_faulty_storage_partial_write() {
        // Writes always fault, and the fault always manifests as a torn write.
        let h = Harness::new(Config::default().write(1.0).partial_write(1.0));

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        let data = b"hello world".to_vec();
        let result = blob.write_at(0, data.clone()).await;

        // The caller still observes a failure...
        assert!(matches!(result, Err(Error::Io(_))));

        // ...but a strict prefix must have reached the inner storage.
        let (inner_blob, size) = h.inner.open("partition", b"test").await.unwrap();
        let bytes_written = size as usize;
        assert!(
            bytes_written > 0 && bytes_written < data.len(),
            "Expected partial write: {bytes_written} bytes out of {}",
            data.len()
        );

        // The persisted bytes are exactly the head of the original buffer.
        let read_buf = vec![0u8; bytes_written];
        let read_result = inner_blob.read_at(0, read_buf).await.unwrap();
        assert_eq!(read_result.coalesce().as_ref(), &data[..bytes_written]);
    }
550
551 #[tokio::test]
552 async fn test_faulty_storage_partial_write_disabled() {
553 let h = Harness::new(Config::default().write(1.0).partial_write(0.0));
554
555 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
556 let result = blob.write_at(0, b"hello world".to_vec()).await;
557
558 assert!(matches!(result, Err(Error::Io(_))));
559
560 let (_, size) = h.inner.open("partition", b"test").await.unwrap();
561 assert_eq!(
562 size, 0,
563 "Expected no bytes written when partial_write_rate is 0"
564 );
565 }
566
567 #[tokio::test]
568 async fn test_faulty_storage_partial_write_single_byte() {
569 let h = Harness::new(Config::default().write(1.0).partial_write(1.0));
570
571 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
572 let result = blob.write_at(0, b"x".to_vec()).await;
573
574 assert!(matches!(result, Err(Error::Io(_))));
575
576 let (_, size) = h.inner.open("partition", b"test").await.unwrap();
577 assert_eq!(size, 0, "No partial write possible for single byte");
578 }
579
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_grow() {
        // Resizes always fault, always leaving a partially applied new size.
        let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));

        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
        assert_eq!(initial_size, 0);

        let target_size = 100u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // The blob grew, but strictly short of the requested size.
        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > 0 && actual_size < target_size,
            "Expected partial resize: size {actual_size} should be between 0 and {target_size}"
        );
    }
598
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_shrink() {
        // Start fault-free so the blob can be grown to 100 bytes.
        let h = Harness::new(Config::default());

        let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
        blob.resize(100).await.unwrap();
        blob.sync().await.unwrap();

        // Enable always-fail, always-partial resizes for the shrink.
        {
            let mut cfg = h.config.write().unwrap();
            cfg.resize_rate = Some(1.0);
            cfg.partial_resize_rate = Some(1.0);
        }

        let target_size = 10u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // The shrink stopped strictly between the target and the old size.
        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > target_size && actual_size < 100,
            "Expected partial shrink: size {actual_size} should be between {target_size} and 100"
        );
    }
624
625 #[tokio::test]
626 async fn test_faulty_storage_partial_resize_disabled() {
627 let h = Harness::new(Config::default().resize(1.0).partial_resize(0.0));
628
629 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
630 let result = blob.resize(100).await;
631
632 assert!(matches!(result, Err(Error::Io(_))));
633
634 let (_, size) = h.inner.open("partition", b"test").await.unwrap();
635 assert_eq!(size, 0, "Expected no resize when partial_resize_rate is 0");
636 }
637
638 #[tokio::test]
639 async fn test_faulty_storage_partial_resize_same_size() {
640 let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));
641
642 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
643 let result = blob.resize(0).await;
644
645 assert!(matches!(result, Err(Error::Io(_))));
646
647 let (_, size) = h.inner.open("partition", b"test").await.unwrap();
648 assert_eq!(size, 0);
649 }
650
    #[tokio::test]
    async fn test_faulty_storage_partial_resize_after_write_extends() {
        let h = Harness::new(Config::default());

        let (blob, initial_size) = h.storage.open("partition", b"test").await.unwrap();
        assert_eq!(initial_size, 0);

        // Grow the blob to 50 bytes via a write (exercises the wrapper's
        // internal size tracking, not just resize).
        blob.write_at(0, vec![0xABu8; 50]).await.unwrap();
        blob.sync().await.unwrap();

        let (_, size_after_write) = h.inner.open("partition", b"test").await.unwrap();
        assert_eq!(size_after_write, 50);

        // Now make every resize fail partially.
        {
            let mut cfg = h.config.write().unwrap();
            cfg.resize_rate = Some(1.0);
            cfg.partial_resize_rate = Some(1.0);
        }

        let target_size = 10u64;
        let result = blob.resize(target_size).await;

        assert!(matches!(result, Err(Error::Io(_))));

        // The partial shrink must start from the write-extended size (50),
        // proving write_at kept the tracked size up to date.
        let (_, actual_size) = h.inner.open("partition", b"test").await.unwrap();
        assert!(
            actual_size > target_size && actual_size < 50,
            "Expected partial shrink from 50: size {actual_size} should be between {target_size} and 50"
        );
    }
681
682 #[tokio::test]
683 async fn test_faulty_storage_partial_resize_one_byte_difference() {
684 let h = Harness::new(Config::default().resize(1.0).partial_resize(1.0));
685
686 let (blob, _) = h.storage.open("partition", b"test").await.unwrap();
687 let result = blob.resize(1).await;
688
689 assert!(matches!(result, Err(Error::Io(_))));
690
691 let (_, size) = h.inner.open("partition", b"test").await.unwrap();
692 assert_eq!(size, 0);
693 }
694}