Skip to main content

opendal_layer_chaos/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Chaos layer implementation for Apache OpenDAL.
19
20#![cfg_attr(docsrs, feature(doc_cfg))]
21#![deny(missing_docs)]
22
23use std::sync::Arc;
24use std::sync::Mutex;
25
26use opendal_core::raw::*;
27use opendal_core::*;
28use rand::prelude::*;
29use rand::rngs::StdRng;
30
31/// Inject chaos into underlying services for robustness test.
32///
33/// # Chaos
34///
35/// Chaos tests is a part of stress test. By generating errors at specified
36/// error ratio, we can reproduce underlying services error more reliable.
37///
38/// Running tests under ChaosLayer will make your application more robust.
39///
40/// For example: If we specify an error rate of 0.5, there is a 50% chance
41/// of an EOF error for every read operation.
42///
43/// # Note
44///
45/// For now, ChaosLayer only injects read operations. More operations may
46/// be added in the future.
47///
48/// # Examples
49///
50/// ```no_run
51/// # use opendal_core::services;
52/// # use opendal_core::Operator;
53/// # use opendal_core::Result;
54/// # use opendal_layer_chaos::ChaosLayer;
55/// #
56/// # fn main() -> Result<()> {
57/// let _ = Operator::new(services::Memory::default())?
58///     .layer(ChaosLayer::new(0.1))
59///     .finish();
60/// # Ok(())
61/// # }
62/// ```
63#[derive(Clone)]
64pub struct ChaosLayer {
65    error_ratio: f64,
66}
67
68impl ChaosLayer {
69    /// Create a new [`ChaosLayer`] with specified error ratio.
70    ///
71    /// # Panics
72    ///
73    /// Input error_ratio must in [0.0..=1.0]
74    pub fn new(error_ratio: f64) -> Self {
75        assert!(
76            (0.0..=1.0).contains(&error_ratio),
77            "error_ratio must between 0.0 and 1.0"
78        );
79        Self { error_ratio }
80    }
81}
82
83impl<A: Access> Layer<A> for ChaosLayer {
84    type LayeredAccess = ChaosAccessor<A>;
85
86    fn layer(&self, inner: A) -> Self::LayeredAccess {
87        ChaosAccessor {
88            inner,
89            rng: Arc::new(Mutex::new(rand::make_rng())),
90            error_ratio: self.error_ratio,
91        }
92    }
93}
94
95#[doc(hidden)]
96#[derive(Debug)]
97pub struct ChaosAccessor<A> {
98    inner: A,
99    rng: Arc<Mutex<StdRng>>,
100
101    error_ratio: f64,
102}
103
104impl<A: Access> LayeredAccess for ChaosAccessor<A> {
105    type Inner = A;
106    type Reader = ChaosReader<A::Reader>;
107    type Writer = A::Writer;
108    type Lister = A::Lister;
109    type Deleter = A::Deleter;
110    type Copier = A::Copier;
111
112    fn inner(&self) -> &Self::Inner {
113        &self.inner
114    }
115
116    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
117        self.inner.read(path, args).await.map(|(rp, r)| {
118            (
119                rp,
120                ChaosReader::new(r, Arc::clone(&self.rng), self.error_ratio),
121            )
122        })
123    }
124
125    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
126        self.inner.write(path, args).await
127    }
128
129    async fn copy(
130        &self,
131        from: &str,
132        to: &str,
133        args: OpCopy,
134        opts: OpCopier,
135    ) -> Result<(RpCopy, Self::Copier)> {
136        self.inner.copy(from, to, args, opts).await
137    }
138
139    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
140        self.inner.list(path, args).await
141    }
142
143    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
144        self.inner.delete().await
145    }
146}
147
148#[doc(hidden)]
149pub struct ChaosReader<R> {
150    inner: R,
151    rng: Arc<Mutex<StdRng>>,
152
153    error_ratio: f64,
154}
155
156impl<R> ChaosReader<R> {
157    fn new(inner: R, rng: Arc<Mutex<StdRng>>, error_ratio: f64) -> Self {
158        Self {
159            inner,
160            rng,
161            error_ratio,
162        }
163    }
164
165    /// If I feel lucky, we can return the correct response. Otherwise,
166    /// we need to generate an error.
167    fn i_feel_lucky(&self) -> bool {
168        let point: u32 = self.rng.lock().unwrap().random_range(0..100);
169        point >= (self.error_ratio * 100.0) as u32
170    }
171
172    fn unexpected_eof() -> Error {
173        Error::new(ErrorKind::Unexpected, "I am your chaos!")
174            .with_operation("chaos")
175            .set_temporary()
176    }
177}
178
179impl<R: oio::Read> oio::Read for ChaosReader<R> {
180    async fn read(&mut self) -> Result<Buffer> {
181        if self.i_feel_lucky() {
182            self.inner.read().await
183        } else {
184            Err(Self::unexpected_eof())
185        }
186    }
187}