#[cfg(feature = "crossbeam_channel")]
use crossbeam_channel::{unbounded as channel, Receiver, Sender};
use std::io::{self, Cursor, Read};
#[cfg(not(feature = "crossbeam_channel"))]
use std::sync::mpsc::{channel, Receiver, Sender};
/// A fixed-capacity byte buffer that is filled from a reader in one pass.
///
/// After a call to `refill`, `data[..end]` holds the bytes that were obtained
/// and `interrupted` tells whether filling stopped early because the reader
/// reported `io::ErrorKind::Interrupted`.
#[derive(Debug)]
struct Buffer {
    /// Backing storage; its length is the capacity chosen in `new`.
    data: Box<[u8]>,
    /// Number of valid bytes at the start of `data`.
    end: usize,
    /// Whether the most recent `refill` stopped on `ErrorKind::Interrupted`.
    interrupted: bool,
}

impl Buffer {
    /// Creates a zero-filled buffer with room for `size` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    #[inline]
    fn new(size: usize) -> Buffer {
        assert!(size > 0);
        let data = vec![0; size].into_boxed_slice();
        Buffer {
            data,
            end: 0,
            interrupted: false,
        }
    }

    /// Reads from `reader` until the buffer is full, the reader reports
    /// end-of-input (a zero-length read), or the reader is interrupted.
    ///
    /// On success `end` is set to the number of bytes stored. An
    /// `Interrupted` error is not returned; it is recorded in `interrupted`
    /// and the bytes read so far are kept.
    ///
    /// # Errors
    ///
    /// Any other I/O error is returned as-is; in that case `end` keeps its
    /// previous value.
    #[inline]
    fn refill<R: Read>(&mut self, mut reader: R) -> io::Result<()> {
        self.interrupted = false;
        let capacity = self.data.len();
        let mut filled = 0;
        // `!=` rather than `<`: if a misbehaving reader claims more bytes than
        // it was offered, the slice expression below panics instead of the
        // loop ending quietly with an out-of-range `end`.
        while filled != capacity {
            match reader.read(&mut self.data[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                    self.interrupted = true;
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        self.end = filled;
        Ok(())
    }
}
/// Consumer-side handle that hands out bytes from buffers filled elsewhere.
///
/// Filled buffers arrive on `full_recv`; once a buffer has been consumed it
/// is sent back through `empty_send` so it can be refilled. A `None` on
/// `empty_send` is what `done` uses to signal that no more data is wanted.
#[derive(Debug)]
pub struct Reader {
    /// Source of filled buffers (or the I/O error that prevented filling).
    full_recv: Receiver<io::Result<Buffer>>,
    /// Return path for consumed buffers; `None` is the shutdown message.
    empty_send: Sender<Option<Buffer>>,
    /// The buffer currently being consumed, if any.
    buffer: Option<Buffer>,
    /// Read position inside `buffer`.
    pos: usize,
}

impl Reader {
    /// Builds a reader and queues `queuelen` fresh buffers of `bufsize` bytes
    /// on `empty_send`.
    ///
    /// Send failures are ignored.
    #[inline]
    fn new(
        full_recv: Receiver<io::Result<Buffer>>,
        empty_send: Sender<Option<Buffer>>,
        bufsize: usize,
        queuelen: usize,
    ) -> Self {
        for _ in 0..queuelen {
            let fresh = Buffer::new(bufsize);
            let _ = empty_send.send(Some(fresh));
        }
        Self {
            full_recv,
            empty_send,
            buffer: None,
            pos: 0,
        }
    }

    /// Sends the `None` shutdown message on `empty_send`, ignoring failure.
    #[inline]
    fn done(&self) {
        let _ = self.empty_send.send(None);
    }

    /// Copies unread bytes from the current buffer into `buf`.
    ///
    /// Returns the read result together with a flag telling the caller that
    /// the current buffer is finished and should be exchanged for the next
    /// one. An interruption recorded in the buffer is reported only after all
    /// of its bytes have been handed out.
    ///
    /// # Panics
    ///
    /// Panics if there is no current buffer.
    #[inline]
    fn _read(&mut self, buf: &mut [u8]) -> (io::Result<usize>, bool) {
        let source = self.buffer.as_ref().unwrap();
        let at_end = self.pos == source.end;
        if at_end && source.interrupted {
            return (Err(io::ErrorKind::Interrupted.into()), true);
        }
        // Reading from an in-memory cursor cannot fail.
        let mut unread = Cursor::new(&source.data[self.pos..source.end]);
        let copied = unread.read(buf).unwrap();
        self.pos += copied;
        // An interrupted buffer is kept so the next call can report the error.
        let drained = !source.interrupted && self.pos == source.end;
        (Ok(copied), drained)
    }
}
impl io::Read for Reader {
    /// Copies bytes from the current filled buffer into `buf`, fetching the
    /// next filled buffer from the background side when none is held.
    ///
    /// When a buffer has been fully consumed it is sent back for refilling
    /// and the read position is reset.
    ///
    /// # Errors
    ///
    /// * Any I/O error delivered by the background side is returned as-is.
    /// * `ErrorKind::Interrupted` is returned once the bytes preceding a
    ///   recorded interruption have been handed out.
    /// * If the channel delivering filled buffers is disconnected (for
    ///   example because the background side already stopped after an
    ///   error), an `ErrorKind::Other` error is returned instead of
    ///   panicking.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.buffer.is_none() {
            // The outer `?` handles a disconnected channel, the inner one an
            // I/O error forwarded by the producer.
            let filled = self.full_recv.recv().map_err(|_| {
                io::Error::new(
                    io::ErrorKind::Other,
                    "background reader is no longer running",
                )
            })??;
            self.buffer = Some(filled);
        }
        let (rv, recv_next) = self._read(buf);
        if recv_next {
            // Best effort: if the producer is gone the buffer is simply dropped.
            self.empty_send.send(self.buffer.take()).ok();
            self.pos = 0;
        }
        rv
    }
}
/// Producer side of the buffer exchange: takes empty buffers, fills them from
/// a reader and sends them on.
#[derive(Debug)]
struct BackgroundReader {
    /// Incoming buffers to fill; `None` asks the producer to stop.
    empty_recv: Receiver<Option<Buffer>>,
    /// Outgoing filled buffers, or the I/O error that ended filling.
    full_send: Sender<io::Result<Buffer>>,
}

impl BackgroundReader {
    /// Bundles the two channel ends used by `serve`.
    #[inline]
    fn new(empty_recv: Receiver<Option<Buffer>>, full_send: Sender<io::Result<Buffer>>) -> Self {
        Self {
            empty_recv,
            full_send,
        }
    }

    /// Fills every buffer received on `empty_recv` from `reader` and forwards
    /// it on `full_send`.
    ///
    /// Returns when a `None` message arrives, when the sending side of
    /// `empty_recv` is gone, or right after a refill error has been
    /// forwarded. Failures to send on `full_send` are ignored.
    #[inline]
    fn serve<R: Read>(&mut self, mut reader: R) {
        loop {
            let mut buffer = match self.empty_recv.recv() {
                Ok(Some(buffer)) => buffer,
                // Explicit stop request, or nobody is left to send buffers.
                Ok(None) | Err(_) => return,
            };
            let outcome = buffer.refill(&mut reader).map(|()| buffer);
            let failed = outcome.is_err();
            let _ = self.full_send.send(outcome);
            if failed {
                return;
            }
        }
    }
}
/// Runs `func` with a [`Reader`] whose data is read from `reader` on a
/// background thread.
///
/// This is a convenience wrapper around [`reader_init`] for the case where
/// the reader already exists: the initialisation closure simply hands over
/// `reader`, which is why `R` must be `Send`.
///
/// `bufsize` is the size of each buffer in bytes and `queuelen` the number of
/// buffers that are created.
///
/// # Panics
///
/// Panics under the same conditions as [`reader_init`] (`queuelen` of zero or
/// `bufsize` of zero).
pub fn reader<R, F, O, E>(bufsize: usize, queuelen: usize, reader: R, func: F) -> Result<O, E>
where
    F: FnOnce(&mut Reader) -> Result<O, E>,
    R: io::Read + Send,
    E: Send,
{
    let hand_over = move || Ok(reader);
    reader_init(bufsize, queuelen, hand_over, func)
}
/// Runs `func` with a [`Reader`] that is fed by a background thread, where
/// the underlying reader is created *inside* that thread by `init_reader`.
///
/// Because the reader never leaves the background thread, `R` does not need
/// to be `Send`; only the `init_reader` closure does.
///
/// `bufsize` is the size of each buffer in bytes and `queuelen` the number of
/// buffers that are created up front.
///
/// # Errors
///
/// An error returned by the background closure (i.e. by `init_reader`) is
/// returned first; otherwise the result of `func` is returned.
///
/// # Panics
///
/// Panics if `queuelen` is zero or `bufsize` is zero, and if the thread scope
/// itself reports an error.
pub fn reader_init<R, I, F, O, E>(
    bufsize: usize,
    queuelen: usize,
    init_reader: I,
    func: F,
) -> Result<O, E>
where
    I: Send + FnOnce() -> Result<R, E>,
    F: FnOnce(&mut Reader) -> Result<O, E>,
    R: io::Read,
    E: Send,
{
    assert!(queuelen >= 1);
    assert!(bufsize > 0);

    let (full_send, full_recv) = channel();
    let (empty_send, empty_recv) = channel();
    let mut background_reader = BackgroundReader::new(empty_recv, full_send);
    let mut reader = Reader::new(full_recv, empty_send, bufsize, queuelen);

    let scoped = crossbeam_utils::thread::scope(|scope| {
        let worker = scope.spawn(move |_| -> Result<(), E> {
            let mut source = init_reader()?;
            background_reader.serve(&mut source);
            Ok(())
        });

        // Catch a panic in `func` so the worker is always told to stop and is
        // joined before the outcome is propagated.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(&mut reader)));
        reader.done();

        // NOTE(review): `unwrap_or_resume_unwind` is defined elsewhere in the
        // crate; presumably it yields the `Ok` value or re-raises the panic
        // payload. Confirm against its definition.
        crate::unwrap_or_resume_unwind(worker.join())?;
        crate::unwrap_or_resume_unwind(outcome)
    });
    scoped.unwrap()
}