use std::io::{self, Write};
use std::mem::replace;
#[cfg(feature = "crossbeam_channel")]
use crossbeam_channel::{unbounded as channel, Receiver, Sender};
#[cfg(not(feature = "crossbeam_channel"))]
use std::sync::mpsc::{channel, Receiver, Sender};
#[derive(Debug)]
enum Message {
Buffer(io::Cursor<Box<[u8]>>),
Flush,
Done,
}
#[derive(Debug)]
pub struct Writer {
empty_recv: Receiver<io::Result<Box<[u8]>>>,
full_send: Sender<Message>,
buffer: io::Cursor<Box<[u8]>>,
}
impl Writer {
#[inline]
fn new(
empty_recv: Receiver<io::Result<Box<[u8]>>>,
full_send: Sender<Message>,
bufsize: usize,
) -> Self {
let buffer = io::Cursor::new(vec![0; bufsize].into_boxed_slice());
Writer {
empty_recv,
full_send,
buffer,
}
}
#[inline]
fn send_to_background(&mut self) -> io::Result<bool> {
if let Ok(empty) = self.empty_recv.recv() {
let full = replace(&mut self.buffer, io::Cursor::new(empty?));
if self.full_send.send(Message::Buffer(full)).is_ok() {
return Ok(true);
}
}
Ok(false)
}
#[inline]
fn done(&mut self) -> io::Result<()> {
self.send_to_background()?;
self.full_send.send(Message::Done).ok();
Ok(())
}
#[inline]
fn fetch_error(&self) -> io::Result<()> {
for res in &self.empty_recv {
res?;
}
Ok(())
}
}
impl Write for Writer {
fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
let mut written = 0;
while written < buffer.len() {
let n = self.buffer.write(&buffer[written..])?;
written += n;
if n == 0 && !self.send_to_background()? {
break;
}
}
Ok(written)
}
fn flush(&mut self) -> io::Result<()> {
self.send_to_background()?;
self.full_send.send(Message::Flush).ok();
Ok(())
}
}
#[derive(Debug)]
struct BackgroundWriter {
full_recv: Receiver<Message>,
empty_send: Sender<io::Result<Box<[u8]>>>,
}
impl BackgroundWriter {
#[inline]
fn new(
full_recv: Receiver<Message>,
empty_send: Sender<io::Result<Box<[u8]>>>,
bufsize: usize,
queuelen: usize,
) -> Self {
for _ in 0..queuelen {
empty_send
.send(Ok(vec![0; bufsize].into_boxed_slice()))
.ok();
}
BackgroundWriter {
full_recv,
empty_send,
}
}
#[inline]
fn listen<W: Write>(&mut self, mut writer: W) -> bool {
while let Ok(msg) = self.full_recv.recv() {
match msg {
Message::Buffer(buf) => {
let pos = buf.position() as usize;
let buffer = buf.into_inner();
let res = writer.write_all(&buffer[..pos]);
let is_err = res.is_err();
self.empty_send.send(res.map(|_| buffer)).ok();
if is_err {
return false;
}
}
Message::Flush => {
if let Err(e) = writer.flush() {
self.empty_send.send(Err(e)).ok();
return false;
}
}
Message::Done => break,
}
}
true
}
}
pub fn writer<W, F, O, E>(bufsize: usize, queuelen: usize, writer: W, func: F) -> Result<O, E>
where
F: FnOnce(&mut Writer) -> Result<O, E>,
W: Write + Send,
E: Send + From<io::Error>,
{
writer_init(bufsize, queuelen, || Ok(writer), func)
}
pub fn writer_init<W, I, F, O, E>(
bufsize: usize,
queuelen: usize,
init_writer: I,
func: F,
) -> Result<O, E>
where
I: Send + FnOnce() -> Result<W, E>,
F: FnOnce(&mut Writer) -> Result<O, E>,
W: Write,
E: Send + From<io::Error>,
{
writer_init_finish(bufsize, queuelen, init_writer, func, |_| ()).map(|(o, _)| o)
}
pub fn writer_finish<W, F, O, F2, O2, E>(
bufsize: usize,
queuelen: usize,
writer: W,
func: F,
finish: F2,
) -> Result<(O, O2), E>
where
F: FnOnce(&mut Writer) -> Result<O, E>,
W: Write + Send,
F2: Send + FnOnce(W) -> O2,
O2: Send,
E: Send + From<io::Error>,
{
writer_init_finish(bufsize, queuelen, || Ok(writer), func, finish)
}
pub fn writer_init_finish<W, I, F, O, F2, O2, E>(
bufsize: usize,
queuelen: usize,
init_writer: I,
func: F,
finish: F2,
) -> Result<(O, O2), E>
where
I: Send + FnOnce() -> Result<W, E>,
F: FnOnce(&mut Writer) -> Result<O, E>,
W: Write,
F2: Send + FnOnce(W) -> O2,
O2: Send,
E: Send + From<io::Error>,
{
assert!(queuelen >= 1);
assert!(bufsize > 0);
let (full_send, full_recv) = channel();
let (empty_send, empty_recv) = channel();
let mut writer = Writer::new(empty_recv, full_send, bufsize);
let mut background_writer = BackgroundWriter::new(full_recv, empty_send, bufsize, queuelen);
crossbeam_utils::thread::scope(|scope| {
let handle = scope.spawn::<_, Result<_, E>>(move |_| {
let mut inner = init_writer()?;
if background_writer.listen(&mut inner) {
return Ok(Some(finish(inner)));
}
Ok(None)
});
let out = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let out = func(&mut writer)?;
writer.flush()?;
Ok::<_, E>(out)
}));
let writer_result = writer.done();
let handle = handle.join();
let of = crate::unwrap_or_resume_unwind(handle)?;
let out = crate::unwrap_or_resume_unwind(out)?;
writer_result?;
writer.fetch_error()?;
Ok((out, of.unwrap()))
})
.unwrap()
}