1use std::io::{Read, Error as IoError, ErrorKind};
56use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
57
58pub fn fuse<R: Read>(reader: R) -> (FusedReader<R>, Fuse) {
60 let reader_fuse = Arc::new(Mutex::new(Ok(())));
61 let writer_fuse = reader_fuse.clone();
62( FusedReader {
63 reader,
64 fuse: reader_fuse,
65 },
66 Fuse(writer_fuse),
67 )
68}
69
/// A reader bundled with one handle to a shared fuse.
///
/// The fuse is an `Arc<Mutex<..>>` whose content is either `Ok(())` or a stored `IoError`.
#[derive(Debug)]
pub struct FusedReader<R>
where
    R: Read,
{
    /// The wrapped reader that actually supplies the bytes.
    reader: R,
    /// Reader-side handle to the shared fuse state.
    fuse: Arc<Mutex<Result<(), IoError>>>,
}
76
/// Result of inspecting the fuse from the reader side.
#[derive(Debug)]
pub enum FuseStatus {
    /// The fuse mutex could be locked and held `Ok(())`.
    Unarmed,
    /// The fuse mutex was already locked when it was inspected.
    Armed,
    /// The fuse held an error; that error is carried in this variant.
    Blown(IoError),
    /// The fuse mutex was found poisoned.
    Poisoned,
}
89
90impl<R: Read> FusedReader<R> {
91 pub fn check_fuse(&mut self) -> FuseStatus {
96 match self.fuse.try_lock() {
97 Err(TryLockError::Poisoned(_)) => FuseStatus::Poisoned,
98 Ok(mut guard) => {
99 if guard.is_err() {
100 let mut res = Ok(());
101 std::mem::swap(&mut *guard, &mut res);
102 FuseStatus::Blown(res.unwrap_err())
103 } else {
104 FuseStatus::Unarmed
105 }
106 }
107 Err(TryLockError::WouldBlock) => FuseStatus::Armed,
108 }
109 }
110
111 pub fn into_inner(self) -> R {
113 self.reader
114 }
115}
116
117impl<R: Read> Read for FusedReader<R> {
118 fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
119 self.reader.read(buf).and_then(|bytes| if bytes == 0 {
121 match self.check_fuse() {
122 FuseStatus::Blown(err) => Err(err),
123 FuseStatus::Poisoned => Err(IoError::new(ErrorKind::BrokenPipe, "writer end dropped due to panic")),
124 FuseStatus::Unarmed |
125 FuseStatus::Armed => Ok(bytes),
126 }
127 } else {
128 Ok(bytes)
129 })
130 }
131}
132
/// Handle to the shared fuse state that can be armed via `arm`.
#[derive(Debug)]
pub struct Fuse(
    /// Shared fuse state: `Ok(())` or a stored `IoError`, behind a mutex.
    Arc<Mutex<Result<(), IoError>>>,
);
136
137impl Fuse {
138 pub fn arm(&self) -> Result<FuseGuard, IoError> {
142 self.0.lock().map(FuseGuard).map_err(|_| IoError::new(ErrorKind::BrokenPipe, "reader end dropped due to panic"))
143 }
144}
145
/// A held lock on the fuse state.
///
/// While this value is alive the fuse mutex stays locked.
#[derive(Debug)]
pub struct FuseGuard<'a>(
    /// The mutex guard giving access to the stored `Result<(), IoError>`.
    MutexGuard<'a, Result<(), IoError>>,
);
149
150impl<'a> FuseGuard<'a> {
151 pub fn blow(mut self, err: IoError) {
155 *self.0 = Err(err);
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::io::Write;
    use pipe::pipe;

    /// Without a fuse, a panic in the writer thread looks like a clean end-of-stream.
    #[test]
    fn test_unfused_panic() {
        let (mut reader, mut writer) = pipe();

        thread::spawn(move || {
            // `write_all` rather than `write`: the byte count of a bare `write` would be
            // ignored, so a short write could go unnoticed.
            writer.write_all(&[1]).unwrap();
            panic!("boom");
        });

        let mut data = Vec::new();

        assert!(reader.read_to_end(&mut data).is_ok());
        assert_eq!(&data, &[1]);
    }

    /// An armed fuse that is never blown lets the read finish successfully.
    #[test]
    fn test_fused_nopanic() {
        let (reader, mut writer) = pipe();

        let (mut reader, fuse) = fuse(reader);

        thread::spawn(move || {
            let _fuse = fuse.arm().unwrap();
            writer.write_all(&[1]).unwrap();
        });

        let mut data = Vec::new();

        assert!(reader.read_to_end(&mut data).is_ok());
        assert_eq!(&data, &[1]);
    }

    /// A panic while the fuse is armed is reported to the reader as `BrokenPipe`, after
    /// the data written before the panic has been delivered.
    #[test]
    fn test_fused_panic() {
        let (reader, mut writer) = pipe();

        let (mut reader, fuse) = fuse(reader);

        thread::spawn(move || {
            let _fuse = fuse.arm().unwrap();
            writer.write_all(&[1]).unwrap();
            panic!("boom");
        });

        let mut data = Vec::new();

        assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::BrokenPipe);
        assert_eq!(&data, &[1]);
    }

    /// Blowing the fuse hands the given error to the reader once the data is drained.
    #[test]
    fn test_fused_blow() {
        let (reader, mut writer) = pipe();

        let (mut reader, fuse) = fuse(reader);

        thread::spawn(move || {
            let fuse = fuse.arm().unwrap();
            writer.write_all(&[1]).unwrap();
            fuse.blow(IoError::new(ErrorKind::UnexpectedEof, "uh! oh!"))
        });

        let mut data = Vec::new();

        assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::UnexpectedEof);
        assert_eq!(&data, &[1]);
    }
}