1#[cfg(feature = "io-pipe")]
5pub mod pipe {
6 use std::io;
7
8 use bytes::{Buf, BufMut, BytesMut};
9
10 pub struct Writer {
14 pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
16 buf: BytesMut,
17 }
18
19 pub struct Reader {
21 channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
22 buf: BytesMut,
23 }
24
25 impl io::BufRead for Reader {
26 fn fill_buf(&mut self) -> io::Result<&[u8]> {
27 if self.buf.is_empty() {
28 match self.channel.recv() {
29 Ok(Ok(buf)) => self.buf = buf,
30 Ok(Err(err)) => return Err(err),
31 Err(_) => {}
32 }
33 };
34 Ok(&self.buf)
35 }
36
37 fn consume(&mut self, amt: usize) {
38 self.buf.advance(amt.min(self.buf.len()));
39 }
40 }
41
42 impl io::Read for Reader {
43 fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
44 let mut written = 0;
45 while !out.is_empty() {
46 if self.buf.is_empty() {
47 match self.channel.recv() {
48 Ok(Ok(buf)) => self.buf = buf,
49 Ok(Err(err)) => return Err(err),
50 Err(_) => break,
51 }
52 }
53 let bytes_to_write = self.buf.len().min(out.len());
54 let (to_write, rest) = out.split_at_mut(bytes_to_write);
55 self.buf.split_to(bytes_to_write).copy_to_slice(to_write);
56 out = rest;
57 written += bytes_to_write;
58 }
59 Ok(written)
60 }
61 }
62
63 impl io::Write for Writer {
64 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
65 self.buf.put_slice(buf);
66 self.channel
67 .send(Ok(self.buf.split()))
68 .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
69 Ok(buf.len())
70 }
71
72 fn flush(&mut self) -> io::Result<()> {
73 Ok(())
74 }
75 }
76
77 pub fn unidirectional(in_flight_writes: impl Into<Option<usize>>) -> (Writer, Reader) {
82 let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes.into().unwrap_or(0));
83 (
84 Writer {
85 channel: tx,
86 buf: BytesMut::with_capacity(4096),
87 },
88 Reader {
89 channel: rx,
90 buf: BytesMut::new(),
91 },
92 )
93 }
94}