use std::io::{Read, Error as IoError, ErrorKind};
use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
/// Wraps `reader` in a [`FusedReader`] and returns it together with the
/// [`Fuse`] handle that controls it.
///
/// Both halves share a single mutex-protected `Result`, which starts out as
/// `Ok(())` (no error recorded).
pub fn fuse<R: Read>(reader: R) -> (FusedReader<R>, Fuse) {
    let shared = Arc::new(Mutex::new(Ok(())));
    // The handle gets its own reference to the very same state.
    let handle = Fuse(Arc::clone(&shared));
    let fused = FusedReader {
        reader,
        fuse: shared,
    };
    (fused, handle)
}
/// A reader paired with a shared "fuse" slot that can carry an error from
/// another party.
#[derive(Debug)]
pub struct FusedReader<R>
where
    R: Read,
{
    /// The wrapped reader that all reads are forwarded to.
    reader: R,
    /// Shared state: `Ok(())` while no error is recorded, `Err(_)` once an
    /// error has been stored in it.
    fuse: Arc<Mutex<Result<(), IoError>>>,
}
/// State of the shared fuse as observed by `FusedReader::check_fuse`.
#[derive(Debug)]
pub enum FuseStatus {
    /// The lock was free and no error was stored.
    Unarmed,
    /// The lock is currently held elsewhere (acquiring it would block).
    Armed,
    /// An error had been stored; it is handed over to the caller here.
    Blown(IoError),
    /// The mutex protecting the fuse state is poisoned.
    Poisoned,
}
impl<R: Read> FusedReader<R> {
pub fn check_fuse(&mut self) -> FuseStatus {
match self.fuse.try_lock() {
Err(TryLockError::Poisoned(_)) => FuseStatus::Poisoned,
Ok(mut guard) => {
if guard.is_err() {
let mut res = Ok(());
std::mem::swap(&mut *guard, &mut res);
FuseStatus::Blown(res.unwrap_err())
} else {
FuseStatus::Unarmed
}
}
Err(TryLockError::WouldBlock) => FuseStatus::Armed,
}
}
pub fn into_inner(self) -> R {
self.reader
}
}
impl<R: Read> Read for FusedReader<R> {
    /// Reads from the wrapped reader and consults the fuse once the stream
    /// ends.
    ///
    /// Calls that yield data are passed through unchanged. When the wrapped
    /// reader returns `Ok(0)` for a non-empty `buf` (end of stream), the fuse
    /// is checked and decides the result.
    ///
    /// A zero-length `buf` can legitimately produce `Ok(0)` without the stream
    /// having ended, so the fuse is deliberately left alone in that case.
    /// Checking it would hand out (and thereby consume) a stored error before
    /// the remaining data has been read.
    ///
    /// # Errors
    ///
    /// - Any error returned by the wrapped reader.
    /// - At end of stream, the error stored in the fuse, if there is one.
    /// - At end of stream, `ErrorKind::BrokenPipe` if the fuse mutex is
    ///   poisoned.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
        let bytes = self.reader.read(buf)?;
        // Only an empty result for a non-empty buffer signals end of stream.
        if bytes != 0 || buf.is_empty() {
            return Ok(bytes);
        }
        match self.check_fuse() {
            FuseStatus::Blown(err) => Err(err),
            FuseStatus::Poisoned => Err(IoError::new(
                ErrorKind::BrokenPipe,
                "writer end dropped due to panic",
            )),
            FuseStatus::Unarmed | FuseStatus::Armed => Ok(bytes),
        }
    }
}
/// Handle to the fuse state shared with a `FusedReader`.
///
/// Wraps the same mutex-protected `Result` the reader side inspects.
#[derive(Debug)]
pub struct Fuse(
    /// Shared state: `Ok(())` while no error is recorded.
    Arc<Mutex<Result<(), IoError>>>,
);
impl Fuse {
pub fn arm(&self) -> Result<FuseGuard, IoError> {
self.0.lock().map(FuseGuard).map_err(|_| IoError::new(ErrorKind::BrokenPipe, "reader end dropped due to panic"))
}
}
/// Guard that keeps the shared fuse state locked for as long as it lives.
#[derive(Debug)]
pub struct FuseGuard<'a>(
    /// The held lock on the shared `Result`.
    MutexGuard<'a, Result<(), IoError>>,
);
impl<'a> FuseGuard<'a> {
    /// Stores `err` in the shared fuse state, replacing whatever was there.
    ///
    /// The guard is consumed, so the lock is released when this call returns.
    pub fn blow(mut self, err: IoError) {
        let slot: &mut Result<(), IoError> = &mut self.0;
        *slot = Err(err);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::io::Write;
    use pipe::pipe;

    /// Baseline: with a plain pipe, a writer thread that panics after writing
    /// still lets `read_to_end` succeed with the data written so far.
    #[test]
    fn test_unfused_panic() {
        let (mut reader, mut writer) = pipe();
        thread::spawn(move || {
            // `write_all` rather than `write`: a bare `write` may report a
            // short write that `unwrap()` would silently accept.
            writer.write_all(&[1]).unwrap();
            panic!("boom");
        });
        let mut data = Vec::new();
        assert!(reader.read_to_end(&mut data).is_ok());
        assert_eq!(&data, &[1]);
    }

    /// An armed fuse whose guard is dropped normally does not turn the end of
    /// stream into an error.
    #[test]
    fn test_fused_nopanic() {
        let (reader, mut writer) = pipe();
        let (mut reader, fuse) = fuse(reader);
        thread::spawn(move || {
            let _fuse = fuse.arm().unwrap();
            writer.write_all(&[1]).unwrap();
        });
        let mut data = Vec::new();
        assert!(reader.read_to_end(&mut data).is_ok());
        assert_eq!(&data, &[1]);
    }

    /// A panic while the fuse guard is held makes the reader fail with
    /// `BrokenPipe` at end of stream, after delivering the written data.
    #[test]
    fn test_fused_panic() {
        let (reader, mut writer) = pipe();
        let (mut reader, fuse) = fuse(reader);
        thread::spawn(move || {
            let _fuse = fuse.arm().unwrap();
            writer.write_all(&[1]).unwrap();
            panic!("boom");
        });
        let mut data = Vec::new();
        assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::BrokenPipe);
        assert_eq!(&data, &[1]);
    }

    /// Blowing the fuse hands the given error to the reader at end of stream,
    /// after delivering the written data.
    #[test]
    fn test_fused_blow() {
        let (reader, mut writer) = pipe();
        let (mut reader, fuse) = fuse(reader);
        thread::spawn(move || {
            let fuse = fuse.arm().unwrap();
            writer.write_all(&[1]).unwrap();
            fuse.blow(IoError::new(ErrorKind::UnexpectedEof, "uh! oh!"))
        });
        let mut data = Vec::new();
        assert_eq!(reader.read_to_end(&mut data).unwrap_err().kind(), ErrorKind::UnexpectedEof);
        assert_eq!(&data, &[1]);
    }
}