Skip to main content

dev_chaos/
io.rs

1//! Synchronous IO wrappers that inject failures into real `Read`/`Write`
2//! types.
3//!
4//! Each wrapper holds a [`FailureSchedule`] and increments an attempt
5//! counter on every read/write call. When the schedule fires, the call
6//! returns an `io::Error` derived from the [`FailureMode`]. On
7//! non-failing attempts, the wrapper delegates to the underlying type
8//! and preserves its bytes-on-disk behavior.
9//!
10//! [`FailureMode`]: crate::FailureMode
11
12use std::io::{self, Read, Write};
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15use crate::FailureSchedule;
16
17/// Wraps a `Read` and injects failures per a [`FailureSchedule`].
18///
19/// On non-failing attempts, behavior is identical to the underlying
20/// reader. On failing attempts, returns an `io::Error` with the
21/// schedule's [`FailureMode`](crate::FailureMode) mapped to a
22/// matching `ErrorKind`.
23///
24/// # Example
25///
26/// ```
27/// use dev_chaos::{io::ChaosReader, FailureMode, FailureSchedule};
28/// use std::io::Read;
29///
30/// let data: &[u8] = b"hello";
31/// let schedule = FailureSchedule::on_attempts(&[2], FailureMode::IoError);
32/// let mut reader = ChaosReader::new(data, schedule);
33///
34/// let mut buf = [0u8; 1];
35/// reader.read(&mut buf).unwrap();           // attempt 1: ok
36/// assert!(reader.read(&mut buf).is_err()); // attempt 2: fails
37/// ```
38pub struct ChaosReader<R: Read> {
39    inner: R,
40    schedule: FailureSchedule,
41    attempt: AtomicUsize,
42}
43
44impl<R: Read> ChaosReader<R> {
45    /// Wrap `inner` with the given schedule.
46    pub fn new(inner: R, schedule: FailureSchedule) -> Self {
47        Self {
48            inner,
49            schedule,
50            attempt: AtomicUsize::new(0),
51        }
52    }
53
54    /// Number of read attempts (successful or failed) so far.
55    pub fn attempt_count(&self) -> usize {
56        self.attempt.load(Ordering::Relaxed)
57    }
58
59    /// Consume the wrapper and return the underlying reader.
60    pub fn into_inner(self) -> R {
61        self.inner
62    }
63}
64
65impl<R: Read> Read for ChaosReader<R> {
66    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
67        let n = self.attempt.fetch_add(1, Ordering::Relaxed) + 1;
68        if let Err(f) = self.schedule.maybe_fail(n) {
69            return Err(f.into());
70        }
71        self.inner.read(buf)
72    }
73}
74
75/// Wraps a `Write` and injects failures per a [`FailureSchedule`].
76///
77/// On `PartialWrite` failures, the wrapper writes one byte (when the
78/// caller asked for at least one) before returning the error, so the
79/// caller observes a partial-flush state.
80///
81/// On all other failure modes, the wrapper returns the error without
82/// writing any bytes.
83///
84/// # Example
85///
86/// ```
87/// use dev_chaos::{io::ChaosWriter, FailureMode, FailureSchedule};
88/// use std::io::Write;
89///
90/// let mut sink: Vec<u8> = Vec::new();
91/// let schedule = FailureSchedule::on_attempts(&[2], FailureMode::IoError);
92/// let mut writer = ChaosWriter::new(&mut sink, schedule);
93///
94/// writer.write_all(b"a").unwrap();        // attempt 1: ok
95/// assert!(writer.write_all(b"b").is_err()); // attempt 2: fails
96/// drop(writer);
97/// assert_eq!(sink, b"a");
98/// ```
99pub struct ChaosWriter<W: Write> {
100    inner: W,
101    schedule: FailureSchedule,
102    attempt: AtomicUsize,
103}
104
105impl<W: Write> ChaosWriter<W> {
106    /// Wrap `inner` with the given schedule.
107    pub fn new(inner: W, schedule: FailureSchedule) -> Self {
108        Self {
109            inner,
110            schedule,
111            attempt: AtomicUsize::new(0),
112        }
113    }
114
115    /// Number of write attempts (successful or failed) so far.
116    pub fn attempt_count(&self) -> usize {
117        self.attempt.load(Ordering::Relaxed)
118    }
119
120    /// Consume the wrapper and return the underlying writer.
121    pub fn into_inner(self) -> W {
122        self.inner
123    }
124}
125
126impl<W: Write> Write for ChaosWriter<W> {
127    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
128        let n = self.attempt.fetch_add(1, Ordering::Relaxed) + 1;
129        if let Err(f) = self.schedule.maybe_fail(n) {
130            // Partial-write semantics: write one byte then error so
131            // the caller sees a torn state.
132            if matches!(f.mode, crate::FailureMode::PartialWrite) && !buf.is_empty() {
133                let _ = self.inner.write(&buf[..1])?;
134            }
135            return Err(f.into());
136        }
137        self.inner.write(buf)
138    }
139
140    fn flush(&mut self) -> io::Result<()> {
141        self.inner.flush()
142    }
143}
144
145/// Convenience: a `ChaosWriter<File>`.
146///
147/// Open a real file and wrap it for failure injection on writes.
148///
149/// # Example (ignored: requires a real filesystem path)
150///
151/// ```ignore
152/// use dev_chaos::{io::ChaosFile, FailureMode, FailureSchedule};
153/// use std::io::Write;
154///
155/// let schedule = FailureSchedule::on_attempts(&[3], FailureMode::PartialWrite);
156/// let mut f = ChaosFile::create("/tmp/x.log", schedule).unwrap();
157/// f.write_all(b"data").unwrap();
158/// ```
159pub type ChaosFile = ChaosWriter<std::fs::File>;
160
161impl ChaosFile {
162    /// Create a new file at `path` and wrap it.
163    pub fn create(
164        path: impl AsRef<std::path::Path>,
165        schedule: FailureSchedule,
166    ) -> io::Result<Self> {
167        let f = std::fs::File::create(path)?;
168        Ok(Self::new(f, schedule))
169    }
170
171    /// Open an existing file at `path` for appending and wrap it.
172    pub fn append(
173        path: impl AsRef<std::path::Path>,
174        schedule: FailureSchedule,
175    ) -> io::Result<Self> {
176        let f = std::fs::OpenOptions::new()
177            .create(true)
178            .append(true)
179            .open(path)?;
180        Ok(Self::new(f, schedule))
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FailureMode;

    /// The schedule only fires on attempt 10, so a single read must
    /// return the source bytes unchanged.
    #[test]
    fn reader_passes_through_when_schedule_does_not_fire() {
        let source: &[u8] = b"hello";
        let mut reader = ChaosReader::new(
            source,
            FailureSchedule::on_attempts(&[10], FailureMode::IoError),
        );
        let mut out = [0u8; 5];
        let read = reader.read(&mut out).unwrap();
        assert_eq!(read, 5);
        assert_eq!(&out, b"hello");
    }

    /// A `Timeout` failure on the first attempt surfaces as `TimedOut`.
    #[test]
    fn reader_fails_when_schedule_fires() {
        let source: &[u8] = b"hello";
        let mut reader = ChaosReader::new(
            source,
            FailureSchedule::on_attempts(&[1], FailureMode::Timeout),
        );
        let mut out = [0u8; 5];
        let failure = reader.read(&mut out).unwrap_err();
        assert_eq!(failure.kind(), io::ErrorKind::TimedOut);
    }

    /// Three reads, regardless of outcome, are recorded as three attempts.
    #[test]
    fn reader_attempt_count_increments() {
        let source: &[u8] = b"abc";
        let mut reader = ChaosReader::new(
            source,
            FailureSchedule::on_attempts(&[], FailureMode::IoError),
        );
        let mut one_byte = [0u8; 1];
        (0..3).for_each(|_| {
            let _ = reader.read(&mut one_byte);
        });
        assert_eq!(reader.attempt_count(), 3);
    }

    /// With an empty schedule every byte reaches the underlying writer.
    #[test]
    fn writer_passes_through_bytes() {
        let target: Vec<u8> = Vec::new();
        let mut writer = ChaosWriter::new(
            target,
            FailureSchedule::on_attempts(&[], FailureMode::IoError),
        );
        writer.write_all(b"hello").unwrap();
        let written = writer.into_inner();
        assert_eq!(written, b"hello");
    }

    /// The second write fails with `ConnectionReset` and adds no bytes.
    #[test]
    fn writer_fails_on_scheduled_attempt() {
        let target: Vec<u8> = Vec::new();
        let mut writer = ChaosWriter::new(
            target,
            FailureSchedule::on_attempts(&[2], FailureMode::ConnectionReset),
        );
        writer.write_all(b"a").unwrap();
        let failure = writer.write_all(b"b").unwrap_err();
        assert_eq!(failure.kind(), io::ErrorKind::ConnectionReset);
        let written = writer.into_inner();
        assert_eq!(written, b"a");
    }

    /// `PartialWrite` lets exactly one byte through and then reports
    /// `WriteZero`.
    #[test]
    fn writer_partial_write_emits_one_byte_then_error() {
        let target: Vec<u8> = Vec::new();
        let mut writer = ChaosWriter::new(
            target,
            FailureSchedule::on_attempts(&[1], FailureMode::PartialWrite),
        );
        let failure = writer.write(b"abcd").unwrap_err();
        assert_eq!(failure.kind(), io::ErrorKind::WriteZero);
        let written = writer.into_inner();
        assert_eq!(written, b"a");
    }

    /// Same partial-write behavior, checked against a real file on disk.
    #[test]
    fn chaos_file_writes_and_truncates_on_partial() {
        let scratch = tempfile::tempdir().unwrap();
        let log_path = scratch.path().join("log.txt");
        let mut file = ChaosFile::create(
            &log_path,
            FailureSchedule::on_attempts(&[2], FailureMode::PartialWrite),
        )
        .unwrap();
        file.write_all(b"first").unwrap();
        // Attempt 2 is the scheduled PartialWrite: one byte lands, then
        // the call errors. The error itself is deliberately ignored.
        let _ = file.write(b"second");
        drop(file);
        let on_disk = std::fs::read(&log_path).unwrap();
        // "first" followed by the single leading byte of "second".
        assert_eq!(on_disk, b"firsts");
    }

    /// `into_inner` hands back the exact reader that was wrapped.
    #[test]
    fn into_inner_returns_underlying() {
        let source: &[u8] = b"x";
        let reader = ChaosReader::new(
            source,
            FailureSchedule::on_attempts(&[], FailureMode::IoError),
        );
        let recovered = reader.into_inner();
        assert_eq!(recovered, b"x");
    }
}