fused_reader/
lib.rs

1/*!
2When working with data pipes it is often necessary to distinguish between EOF on the reader side caused by writer thread finishing write and writer panicking. This crate provides fused reader type that if writer thread dies while holding armed fuse the reader will get `BrokenPipe` error.
3
4Fuses can also be blown with custom error that is passed to the reader end.
5
6Example usage
7=============
8
9Writer panics and reader gets `BrokenPipe` error.
10
11```rust
12use pipe::pipe;
13use fused_reader::fuse;
14use std::io::{Read, Write, ErrorKind};
15use std::thread;
16
17let (reader, mut writer) = pipe();
18let (mut reader, fuse) = fuse(reader);
19
20thread::spawn(move || {
21    let _fuse = fuse.arm().unwrap();
22    writer.write(&[1]).unwrap();
23    panic!("boom");
24});
25
26let mut data = Vec::new();
27
28assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::BrokenPipe);
29assert_eq!(&data, &[1]); // data that was written before panic
30```
31
32Writer fails with error passed to reader.
33
34```rust
35use pipe::pipe;
36use fused_reader::fuse;
37use std::io::{Read, Write, Error as IoError, ErrorKind};
38use std::thread;
39
40let (reader, mut writer) = pipe();
41let (mut reader, fuse) = fuse(reader);
42
43thread::spawn(move || {
44    let fuse = fuse.arm().unwrap();
45    writer.write(&[1]).unwrap();
46    fuse.blow(IoError::new(ErrorKind::UnexpectedEof, "uh! oh!"))
47});
48
49let mut data = Vec::new();
50
51assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::UnexpectedEof);
52assert_eq!(&data, &[1]); // data that was written before error
53```
54!*/
55use std::io::{Read, Error as IoError, ErrorKind};
56use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
57
58/// Fuses reader so that if writer thread dies while holding armed fuse the reader will get `BrokenPipe` error.
59pub fn fuse<R: Read>(reader: R) -> (FusedReader<R>, Fuse) {
60    let reader_fuse = Arc::new(Mutex::new(Ok(())));
61    let writer_fuse = reader_fuse.clone();
62( FusedReader {
63            reader,
64            fuse: reader_fuse,
65        },
66        Fuse(writer_fuse),
67    )
68}
69
70/// Reader that will fail with I/O error if fuse was blown.
71#[derive(Debug)]
72pub struct FusedReader<R: Read> {
73    reader: R,
74    fuse: Arc<Mutex<Result<(), IoError>>>,
75}
76
77/// Status of the fuse.
78#[derive(Debug)]
79pub enum FuseStatus {
80    /// Fuse was not armed or guard got dropped.
81    Unarmed,
82    /// Fuse armed.
83    Armed,
84    /// Fuse blown with custom error.
85    Blown(IoError),
86    /// Fuse blown by panic unwind.
87    Poisoned,
88}
89
90impl<R: Read> FusedReader<R> {
91    /// Checks status of the fuse.
92    ///
93    /// Note that the variant `FuseStatus::Blown` is provided only once and following calls will
94    /// return `FuseStatus::Unarmed` instead.
95    pub fn check_fuse(&mut self) -> FuseStatus {
96        match self.fuse.try_lock() {
97            Err(TryLockError::Poisoned(_)) => FuseStatus::Poisoned,
98            Ok(mut guard) => {
99                if guard.is_err() {
100                    let mut res = Ok(());
101                    std::mem::swap(&mut *guard, &mut res);
102                    FuseStatus::Blown(res.unwrap_err())
103                } else {
104                    FuseStatus::Unarmed
105                }
106            }
107            Err(TryLockError::WouldBlock) => FuseStatus::Armed,
108        }
109    }
110
111    /// Returns inner reader.
112    pub fn into_inner(self) -> R {
113        self.reader
114    }
115}
116
117impl<R: Read> Read for FusedReader<R> {
118    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
119        // let it read to end before checking fuse
120        self.reader.read(buf).and_then(|bytes| if bytes == 0 {
121            match self.check_fuse() {
122                FuseStatus::Blown(err) => Err(err),
123                FuseStatus::Poisoned => Err(IoError::new(ErrorKind::BrokenPipe, "writer end dropped due to panic")),
124                FuseStatus::Unarmed |
125                FuseStatus::Armed => Ok(bytes),
126            }
127        } else {
128            Ok(bytes)
129        })
130    }
131}
132
133/// Fuse that can be armed.
134#[derive(Debug)]
135pub struct Fuse(Arc<Mutex<Result<(), IoError>>>);
136
137impl Fuse {
138    /// Arms the fuse.
139    ///
140    /// Returns `BrokenPipe` error if reader was dropped due to panic.
141    pub fn arm(&self) -> Result<FuseGuard, IoError> {
142        self.0.lock().map(FuseGuard).map_err(|_| IoError::new(ErrorKind::BrokenPipe, "reader end dropped due to panic"))
143    }
144}
145
146/// Armed fuse that if dropped due to panic will signal reader to fail with `BrokenPipe` error.
147#[derive(Debug)]
148pub struct FuseGuard<'a>(MutexGuard<'a, Result<(), IoError>>);
149
150impl<'a> FuseGuard<'a> {
151    /// Blows the fuse with given error.
152    ///
153    /// The reader end will fail with this error after reaching EOF.
154    pub fn blow(mut self, err: IoError) {
155        *self.0 = Err(err);
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162    use std::thread;
163    use std::io::Write;
164    use pipe::pipe;
165
166    #[test]
167    fn test_unfused_panic() {
168        let (mut reader, mut writer) = pipe();
169
170        thread::spawn(move || {
171            writer.write(&[1]).unwrap();
172            panic!("boom");
173        });
174
175        let mut data = Vec::new();
176
177        assert!(reader.read_to_end(&mut data).is_ok());
178        assert_eq!(&data, &[1]);
179    }
180
181    #[test]
182    fn test_fused_nopanic() {
183        let (reader, mut writer) = pipe();
184
185        let (mut reader, fuse) = fuse(reader);
186
187        thread::spawn(move || {
188            let _fuse = fuse.arm().unwrap();
189            writer.write(&[1]).unwrap();
190        });
191
192        let mut data = Vec::new();
193
194        assert!(reader.read_to_end(&mut data).is_ok());
195        assert_eq!(&data, &[1]);
196    }
197
198    #[test]
199    fn test_fused_panic() {
200        let (reader, mut writer) = pipe();
201
202        let (mut reader, fuse) = fuse(reader);
203
204        thread::spawn(move || {
205            let _fuse = fuse.arm().unwrap();
206            writer.write(&[1]).unwrap();
207            panic!("boom");
208        });
209
210        let mut data = Vec::new();
211
212        assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::BrokenPipe);
213        assert_eq!(&data, &[1]);
214    }
215
216    #[test]
217    fn test_fused_blow() {
218        let (reader, mut writer) = pipe();
219
220        let (mut reader, fuse) = fuse(reader);
221
222        thread::spawn(move || {
223            let fuse = fuse.arm().unwrap();
224            writer.write(&[1]).unwrap();
225            fuse.blow(IoError::new(ErrorKind::UnexpectedEof, "uh! oh!"))
226        });
227
228        let mut data = Vec::new();
229
230        assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::UnexpectedEof);
231        assert_eq!(&data, &[1]);
232    }
233}