1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
mod io { use bytes::{Buf, BufMut, BytesMut}; use std::io; pub struct Writer { pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>, buf: BytesMut, } pub struct Reader { channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>, buf: BytesMut, } impl io::BufRead for Reader { fn fill_buf(&mut self) -> io::Result<&[u8]> { if self.buf.is_empty() { match self.channel.recv() { Ok(Ok(buf)) => self.buf = buf, Ok(Err(err)) => return Err(err), Err(_) => {} } }; Ok(&self.buf) } fn consume(&mut self, amt: usize) { self.buf.advance(amt.min(self.buf.len())); } } impl io::Read for Reader { fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> { let mut written = 0; while !out.is_empty() { if self.buf.is_empty() { match self.channel.recv() { Ok(Ok(buf)) => self.buf = buf, Ok(Err(err)) => return Err(err), Err(_) => break, } } let bytes_to_write = self.buf.len().min(out.len()); let (to_write, rest) = out.split_at_mut(bytes_to_write); self.buf.split_to(bytes_to_write).copy_to_slice(to_write); out = rest; written += bytes_to_write; } Ok(written) } } impl io::Write for Writer { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.buf.put_slice(buf); self.channel .send(Ok(self.buf.split())) .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?; Ok(buf.len()) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } pub fn unidirectional(in_flight_writes: impl Into<Option<usize>>) -> (Writer, Reader) { let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes.into().unwrap_or(0)); ( Writer { channel: tx, buf: BytesMut::with_capacity(4096), }, Reader { channel: rx, buf: BytesMut::new(), }, ) } } pub use io::{unidirectional, Reader, Writer};