io_pipe/
sync_pipe.rs

1use std::collections::VecDeque;
2use std::fmt::Arguments;
3use std::io::{BufRead, IoSlice};
4use std::io::{Error, ErrorKind, Read, Result as IOResult, Write};
5
6use loole::{Receiver, Sender, unbounded};
7
8use crate::state::SharedState;
9
10/// Creates a pair of synchronous writer and reader objects.
11///
12/// This function returns a tuple containing a `Writer` and a `Reader`.
13/// The `Writer` can be used to write data, which can then be read from the `Reader`.
14///
15/// # Returns
16///
17/// A tuple containing `(Writer, Reader)`.
18///
19/// # Example
20///
21/// ```rust
22/// use std::io::{read_to_string, Write};
23/// use io_pipe::pipe;
24///
25/// let (mut writer, reader) = pipe();
26/// writer.write_all("hello".as_bytes()).unwrap();
27/// drop(writer);
28///
29/// assert_eq!("hello".to_string(), read_to_string(reader).unwrap());
30/// ```
31pub fn pipe() -> (Writer, Reader) {
32    let (sender, receiver) = unbounded();
33
34    let state = SharedState::default();
35
36    (
37        Writer {
38            sender,
39            state: state.clone(),
40        },
41        Reader {
42            receiver,
43            state,
44            buf: VecDeque::new(),
45        },
46    )
47}
48
49/// A synchronous writer that implements `Write`.
50///
51/// This struct allows writing data synchronously, which can be read from a corresponding `Reader`.
52/// Multiple `Writer` instances can be created by cloning, allowing writes from different threads.
53///
54/// # Notes
55///
56/// - All write calls are executed immediately without blocking the thread.
57/// - It's safe to use this writer inside async operations.
58/// - Write method will return an error when the reader is dropped.
59///
60/// # Example
61///
62/// ```rust
63/// use std::io::Write;
64/// use io_pipe::pipe;
65///
66/// let (mut writer, reader) = pipe();
67/// writer.write_all("hello".as_bytes()).unwrap();
68/// ```
69#[derive(Clone, Debug)]
70pub struct Writer {
71    pub(crate) sender: Sender<()>,
72    pub(crate) state: SharedState,
73}
74
75impl Write for Writer {
76    fn write(&mut self, buf: &[u8]) -> IOResult<usize> {
77        let n = self.state.write(buf)?;
78        match self.sender.send(()) {
79            Ok(_) => Ok(n),
80            Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
81        }
82    }
83
84    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> IOResult<usize> {
85        let n = self.state.write_vectored(bufs)?;
86        match self.sender.send(()) {
87            Ok(_) => Ok(n),
88            Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
89        }
90    }
91
92    fn flush(&mut self) -> IOResult<()> {
93        self.state.flush()
94    }
95
96    fn write_all(&mut self, buf: &[u8]) -> IOResult<()> {
97        self.state.write_all(buf)?;
98        match self.sender.send(()) {
99            Ok(_) => Ok(()),
100            Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
101        }
102    }
103
104    fn write_fmt(&mut self, fmt: Arguments<'_>) -> IOResult<()> {
105        self.state.write_fmt(fmt)?;
106        match self.sender.send(()) {
107            Ok(_) => Ok(()),
108            Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
109        }
110    }
111}
112
113/// A synchronous reader that implements `Read` and `BufRead`.
114///
115/// This struct allows reading data synchronously that was written to a corresponding `Writer`.
116/// The reader will produce bytes until all writers are dropped.
117///
118/// # Notes
119///
120/// - Reads may block the thread until a writer sends data.
121/// - Implements the `BufRead` trait for buffered reading.
122/// - Be cautious of potential deadlocks when reading from the reader before dropping the writer in a single thread.
123///
124/// # Example
125///
126/// ```rust
127/// use std::io::{read_to_string, Write};
128/// use io_pipe::pipe;
129///
130/// let (mut writer, reader) = pipe();
131/// writer.write_all("hello".as_bytes()).unwrap();
132/// drop(writer);
133///
134/// assert_eq!("hello".to_string(), read_to_string(reader).unwrap());
135/// ```
136#[derive(Debug)]
137pub struct Reader {
138    pub(crate) receiver: Receiver<()>,
139    pub(crate) buf: VecDeque<u8>,
140    pub(crate) state: SharedState,
141}
142
143impl BufRead for Reader {
144    fn fill_buf(&mut self) -> IOResult<&[u8]> {
145        while self.buf.is_empty() {
146            let n = self.state.copy_to(&mut self.buf)?;
147            if n == 0 && self.receiver.recv().is_err() {
148                break;
149            }
150        }
151
152        self.buf.fill_buf()
153    }
154
155    fn consume(&mut self, amt: usize) {
156        self.buf.consume(amt)
157    }
158}
159
160impl Read for Reader {
161    fn read(&mut self, mut buf: &mut [u8]) -> IOResult<usize> {
162        let n = buf.write(self.fill_buf()?)?;
163        self.consume(n);
164        Ok(n)
165    }
166}
167#[cfg(test)]
168mod tests {
169    use std::io::{BufRead, IoSlice, Read, Write, read_to_string};
170    use std::thread::spawn;
171
172    #[test]
173    fn base_write_case() {
174        // Checking non-blocking buffer inside writer
175        let (mut writer, reader) = crate::pipe();
176        for _ in 0..1000 {
177            writer.write_all("hello".as_bytes()).unwrap();
178        }
179        drop(reader)
180    }
181
182    #[test]
183    fn base_read_case() {
184        let (mut writer, reader) = crate::pipe();
185        writer.write_all("hello ".as_bytes()).unwrap();
186        writer.write_all("world".as_bytes()).unwrap();
187        drop(writer);
188
189        assert_eq!("hello world".to_string(), read_to_string(reader).unwrap());
190    }
191    #[test]
192    fn base_vectored_case() {
193        let (mut writer, reader) = crate::pipe();
194        _ = writer
195            .write_vectored(&[
196                IoSlice::new("hello ".as_bytes()),
197                IoSlice::new("world".as_bytes()),
198            ])
199            .unwrap();
200        drop(writer);
201
202        assert_eq!("hello world".to_string(), read_to_string(reader).unwrap());
203    }
204
205    #[test]
206    fn thread_case() {
207        let (writer, reader) = crate::pipe();
208        for _ in 0..1000 {
209            let mut writer = writer.clone();
210            spawn(move || {
211                writer.write_all("hello".as_bytes()).unwrap();
212            });
213        }
214        drop(writer);
215
216        assert_eq!("hello".repeat(1000), read_to_string(reader).unwrap());
217    }
218
219    #[test]
220    fn writer_err_case() {
221        let (mut writer, reader) = crate::pipe();
222        drop(reader);
223
224        assert!(writer.write("hello".as_bytes()).is_err());
225    }
226
227    #[test]
228    fn bufread_case() {
229        let (mut writer, mut reader) = crate::pipe();
230        writer.write_all("hello\n".as_bytes()).unwrap();
231        writer.write_all("world".as_bytes()).unwrap();
232        drop(writer);
233
234        let mut str = String::new();
235        assert_ne!(0, reader.read_line(&mut str).unwrap());
236        assert_eq!("hello\n".to_string(), str);
237
238        let mut str = String::new();
239        assert_ne!(0, reader.read_line(&mut str).unwrap());
240        assert_eq!("world".to_string(), str);
241
242        let mut str = String::new();
243        assert_eq!(0, reader.read_line(&mut str).unwrap());
244    }
245
246    #[test]
247    fn bufread_lines_case() {
248        let (mut writer, reader) = crate::pipe();
249        writer.write_all("hello\n".as_bytes()).unwrap();
250        writer.write_all("world".as_bytes()).unwrap();
251        drop(writer);
252
253        assert_eq!(2, reader.lines().map(|l| assert!(l.is_ok())).count())
254    }
255
256    #[test]
257    fn threads_write_and_read_case() {
258        let (writer, mut reader) = crate::pipe();
259
260        for _ in 0..1000 {
261            let mut writer = writer.clone();
262            spawn(move || {
263                writer.write_all(&[0; 4]).unwrap();
264            });
265
266            let mut buf = [0; 4];
267            assert_eq!(buf.len(), reader.read(&mut buf).unwrap());
268        }
269        drop(writer);
270    }
271}