git_features/
io.rs

//! A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
2
3/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
4#[cfg(feature = "io-pipe")]
5pub mod pipe {
6    use std::io;
7
8    use bytes::{Buf, BufMut, BytesMut};
9
10    /// The write-end of the pipe, receiving items to become available in the [`Reader`].
11    ///
12    /// It's commonly used with the [`std::io::Write`] trait it implements.
13    pub struct Writer {
14        /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
15        pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
16        buf: BytesMut,
17    }
18
19    /// The read-end of the pipe, implementing the [`std::io::Read`] trait.
20    pub struct Reader {
21        channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
22        buf: BytesMut,
23    }
24
25    impl io::BufRead for Reader {
26        fn fill_buf(&mut self) -> io::Result<&[u8]> {
27            if self.buf.is_empty() {
28                match self.channel.recv() {
29                    Ok(Ok(buf)) => self.buf = buf,
30                    Ok(Err(err)) => return Err(err),
31                    Err(_) => {}
32                }
33            };
34            Ok(&self.buf)
35        }
36
37        fn consume(&mut self, amt: usize) {
38            self.buf.advance(amt.min(self.buf.len()));
39        }
40    }
41
42    impl io::Read for Reader {
43        fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
44            let mut written = 0;
45            while !out.is_empty() {
46                if self.buf.is_empty() {
47                    match self.channel.recv() {
48                        Ok(Ok(buf)) => self.buf = buf,
49                        Ok(Err(err)) => return Err(err),
50                        Err(_) => break,
51                    }
52                }
53                let bytes_to_write = self.buf.len().min(out.len());
54                let (to_write, rest) = out.split_at_mut(bytes_to_write);
55                self.buf.split_to(bytes_to_write).copy_to_slice(to_write);
56                out = rest;
57                written += bytes_to_write;
58            }
59            Ok(written)
60        }
61    }
62
63    impl io::Write for Writer {
64        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
65            self.buf.put_slice(buf);
66            self.channel
67                .send(Ok(self.buf.split()))
68                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
69            Ok(buf.len())
70        }
71
72        fn flush(&mut self) -> io::Result<()> {
73            Ok(())
74        }
75    }
76
77    /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe.
78    ///
79    /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing.
80    ///    If `None` or `0`, the `write` end will always block until the `read` end consumes the transferred bytes.
81    pub fn unidirectional(in_flight_writes: impl Into<Option<usize>>) -> (Writer, Reader) {
82        let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes.into().unwrap_or(0));
83        (
84            Writer {
85                channel: tx,
86                buf: BytesMut::with_capacity(4096),
87            },
88            Reader {
89                channel: rx,
90                buf: BytesMut::new(),
91            },
92        )
93    }
94}