io_pipe/
async_pipe.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::io::{BufRead, Error, ErrorKind, IoSlice, Write};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
8use loole::{Receiver, RecvFuture, SendFuture, Sender, unbounded};
9
10use crate::state::SharedState;
11
12/// Creates a pair of asynchronous writer and reader objects.
13///
14/// This function returns a tuple containing an `AsyncWriter` and an `AsyncReader`.
15/// The `AsyncWriter` can be used to write data, which can then be read from the `AsyncReader`.
16///
17/// # Arguments
18///
19/// * `buffer_size` - The size of the internal buffer used for communication between the writer and reader.
20///
21/// # Returns
22///
23/// A tuple containing `(AsyncWriter, AsyncReader)`.
24///
25/// # Example
26///
27/// ```rust
28/// use io_pipe::async_pipe;
29///
30/// let (writer, reader) = async_pipe();
31/// // Use writer to write data and reader to read data asynchronously
32/// ```
33pub fn async_pipe() -> (AsyncWriter, AsyncReader) {
34    let (sender, receiver) = unbounded();
35    let state = SharedState::default();
36
37    (
38        AsyncWriter {
39            sender,
40            state: state.clone(),
41            send_future: None,
42        },
43        AsyncReader {
44            receiver,
45            state,
46            buf: VecDeque::new(),
47            reading: None,
48        },
49    )
50}
51
52/// Creates a pair of synchronous writer and asynchronous reader objects.
53///
54/// This function returns a tuple containing an `Writer` and an `AsyncReader`.
55/// The `Writer` can be used to write data, which can then be read from the `AsyncReader`.
56///
57/// # Arguments
58///
59/// * `buffer_size` - The size of the internal buffer used for communication between the writer and reader.
60///
61/// # Returns
62///
63/// A tuple containing `(Writer, AsyncReader)`.
64///
65/// # Example
66///
67/// ```rust
68/// use io_pipe::async_reader_pipe;
69///
70/// let (writer, reader) = async_reader_pipe();
71/// // Use writer to write data synchronously and reader to read data asynchronously
72/// ```
73#[cfg(feature = "async")]
74#[cfg(feature = "sync")]
75pub fn async_reader_pipe() -> (crate::Writer, AsyncReader) {
76    let (sender, receiver) = unbounded();
77    let state = SharedState::default();
78
79    (
80        crate::Writer {
81            sender,
82            state: state.clone(),
83        },
84        AsyncReader {
85            receiver,
86            state,
87            buf: VecDeque::new(),
88            reading: None,
89        },
90    )
91}
92
93/// Creates a pair of synchronous writer and asynchronous reader objects.
94///
95/// This function returns a tuple containing an `Writer` and an `AsyncReader`.
96/// The `Writer` can be used to write data, which can then be read from the `AsyncReader`.
97///
98/// # Arguments
99///
100/// * `buffer_size` - The size of the internal buffer used for communication between the writer and reader.
101///
102/// # Returns
103///
104/// A tuple containing `(Writer, AsyncReader)`.
105///
106/// # Example
107///
108/// ```rust
109/// use io_pipe::async_writer_pipe;
110///
111/// let (writer, reader) = async_writer_pipe();
112/// // Use writer to write data synchronously and reader to read data asynchronously
113/// ```
114#[cfg(feature = "async")]
115#[cfg(feature = "sync")]
116pub fn async_writer_pipe() -> (AsyncWriter, crate::Reader) {
117    let (sender, receiver) = unbounded();
118    let state = SharedState::default();
119
120    (
121        AsyncWriter {
122            sender,
123            state: state.clone(),
124            send_future: None,
125        },
126        crate::Reader {
127            receiver,
128            state,
129            buf: VecDeque::new(),
130        },
131    )
132}
133
134/// An asynchronous writer that implements `AsyncWrite`.
135///
136/// This struct allows writing data asynchronously, which can be read from a corresponding `AsyncReader`.
137#[derive(Debug)]
138pub struct AsyncWriter {
139    sender: Sender<()>,
140    send_future: Option<SendFuture<()>>,
141    state: SharedState,
142}
143
144impl Clone for AsyncWriter {
145    fn clone(&self) -> Self {
146        Self {
147            sender: self.sender.clone(),
148            send_future: None,
149            state: self.state.clone(),
150        }
151    }
152}
153
154impl AsyncWriter {
155    fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
156        if self.send_future.is_none() {
157            self.send_future = Some(self.sender.send_async(()));
158        }
159        match Pin::new(&mut self.send_future.as_mut().unwrap()).poll(cx) {
160            Poll::Ready(Ok(_)) => {
161                self.send_future = None;
162                Poll::Ready(Ok(()))
163            }
164            Poll::Ready(Err(e)) => {
165                self.send_future = None;
166                Poll::Ready(Err(Error::new(ErrorKind::WriteZero, e)))
167            }
168            Poll::Pending => Poll::Pending,
169        }
170    }
171}
172
173impl AsyncWrite for AsyncWriter {
174    fn poll_write(
175        mut self: Pin<&mut Self>,
176        cx: &mut Context<'_>,
177        buf: &[u8],
178    ) -> Poll<std::io::Result<usize>> {
179        let n = self.state.write(buf)?;
180        match self.poll_send(cx)? {
181            Poll::Ready(_) => Poll::Ready(Ok(n)),
182            Poll::Pending => Poll::Pending,
183        }
184    }
185
186    fn poll_write_vectored(
187        mut self: Pin<&mut Self>,
188        cx: &mut Context<'_>,
189        bufs: &[IoSlice<'_>],
190    ) -> Poll<std::io::Result<usize>> {
191        let n = self.state.write_vectored(bufs)?;
192        match self.poll_send(cx)? {
193            Poll::Ready(_) => Poll::Ready(Ok(n)),
194            Poll::Pending => Poll::Pending,
195        }
196    }
197
198    fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
199        Poll::Ready(self.state.flush())
200    }
201
202    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
203        self.sender.close();
204        Poll::Ready(Ok(()))
205    }
206}
207
208/// An asynchronous reader that implements `AsyncRead` and `AsyncBufRead`.
209///
210/// This struct allows reading data asynchronously that was written to a corresponding `AsyncWriter`.
211#[derive(Debug)]
212pub struct AsyncReader {
213    receiver: Receiver<()>,
214    buf: VecDeque<u8>,
215    reading: Option<RecvFuture<()>>,
216    state: SharedState,
217}
218
219impl AsyncBufRead for AsyncReader {
220    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
221        let this = self.get_mut();
222        while this.buf.is_empty() {
223            let n = this.state.copy_to(&mut this.buf)?;
224            if n == 0 {
225                if this.reading.is_none() {
226                    this.reading = Some(this.receiver.recv_async())
227                }
228
229                match Pin::new(this.reading.as_mut().unwrap()).poll(cx) {
230                    Poll::Ready(Ok(_)) => {
231                        this.reading = None;
232                    }
233                    Poll::Ready(Err(_)) => {
234                        this.reading = None;
235                        break;
236                    }
237                    Poll::Pending => return Poll::Pending,
238                }
239            }
240        }
241
242        if this.buf.is_empty() {
243            _ = this.state.copy_to(&mut this.buf)?;
244        }
245
246        Poll::Ready(this.buf.fill_buf())
247    }
248
249    fn consume(mut self: Pin<&mut Self>, amt: usize) {
250        self.buf.consume(amt)
251    }
252}
253
254impl AsyncRead for AsyncReader {
255    fn poll_read(
256        mut self: Pin<&mut Self>,
257        cx: &mut Context<'_>,
258        mut buf: &mut [u8],
259    ) -> Poll<std::io::Result<usize>> {
260        let data = match self.as_mut().poll_fill_buf(cx)? {
261            Poll::Ready(buf) => buf,
262            Poll::Pending => return Poll::Pending,
263        };
264        let n = buf.write(data)?;
265        self.consume(n);
266        Poll::Ready(Ok(n))
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use std::io::IoSlice;
273    use std::thread::spawn;
274
275    use futures::{
276        AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, StreamExt, TryStreamExt, executor::block_on,
277    };
278
279    #[test]
280    fn base_write_case() {
281        block_on(async {
282            // Checking non-blocking buffer inside writer
283            let (mut writer, reader) = crate::async_pipe();
284            for _ in 0..1000 {
285                writer.write_all("hello".as_bytes()).await.unwrap();
286            }
287            drop(reader)
288        })
289    }
290
291    #[test]
292    fn base_read_case() {
293        block_on(async {
294            let (mut writer, mut reader) = crate::async_pipe();
295
296            writer.write_all("hello ".as_bytes()).await.unwrap();
297            writer.write_all("world".as_bytes()).await.unwrap();
298            drop(writer);
299
300            let mut str = String::new();
301            reader.read_to_string(&mut str).await.unwrap();
302
303            assert_eq!("hello world".to_string(), str);
304        });
305    }
306
307    #[test]
308    fn base_vectored_case() {
309        block_on(async {
310            let (mut writer, mut reader) = crate::async_pipe();
311            _ = writer
312                .write_vectored(&[
313                    IoSlice::new("hello ".as_bytes()),
314                    IoSlice::new("world".as_bytes()),
315                ])
316                .await
317                .unwrap();
318            drop(writer);
319
320            let mut str = String::new();
321            reader.read_to_string(&mut str).await.unwrap();
322
323            assert_eq!("hello world".to_string(), str);
324        });
325    }
326
327    #[test]
328    fn thread_case() {
329        block_on(async {
330            let (writer, mut reader) = crate::async_pipe();
331            let writers = (0..1000).map(|_| writer.clone()).collect::<Vec<_>>();
332            let writers_len = writers.len();
333            drop(writer);
334            let write_fut = futures::stream::iter(writers)
335                .map(|mut writer| async move { writer.write_all("hello".as_bytes()).await })
336                .buffer_unordered(writers_len)
337                .try_collect::<Vec<()>>();
338
339            let mut str = String::new();
340            let read_fut = reader.read_to_string(&mut str);
341            futures::join!(
342                async {
343                    write_fut.await.unwrap();
344                },
345                async { read_fut.await.unwrap() }
346            );
347
348            assert_eq!("hello".repeat(writers_len), str);
349        });
350    }
351
352    #[test]
353    fn writer_err_case() {
354        block_on(async {
355            let (mut writer, reader) = crate::async_pipe();
356            drop(reader);
357
358            assert!(writer.write("hello".as_bytes()).await.is_err());
359        });
360    }
361
362    #[test]
363    fn bufread_case() {
364        block_on(async {
365            let (mut writer, mut reader) = crate::async_pipe();
366            writer.write_all("hello\n".as_bytes()).await.unwrap();
367            writer.write_all("world".as_bytes()).await.unwrap();
368            drop(writer);
369
370            let mut str = String::new();
371            assert_ne!(0, reader.read_line(&mut str).await.unwrap());
372            assert_eq!("hello\n".to_string(), str);
373
374            let mut str = String::new();
375            assert_ne!(0, reader.read_line(&mut str).await.unwrap());
376            assert_eq!("world".to_string(), str);
377
378            let mut str = String::new();
379            assert_eq!(0, reader.read_line(&mut str).await.unwrap());
380        });
381    }
382
383    #[test]
384    fn bufread_lines_case() {
385        block_on(async {
386            let (mut writer, reader) = crate::async_pipe();
387            writer.write_all("hello\n".as_bytes()).await.unwrap();
388            writer.write_all("world".as_bytes()).await.unwrap();
389            drop(writer);
390
391            assert_eq!(2, reader.lines().map(|l| assert!(l.is_ok())).count().await)
392        });
393    }
394
395    #[test]
396    fn thread_writer_case() {
397        use std::io::Write;
398
399        let (writer, mut reader) = crate::async_reader_pipe();
400        for _ in 0..1000 {
401            let mut writer = writer.clone();
402            spawn(move || {
403                writer.write_all("hello".as_bytes()).unwrap();
404            });
405        }
406        drop(writer);
407
408        block_on(async {
409            let mut str = String::new();
410            reader.read_to_string(&mut str).await.unwrap();
411
412            assert_eq!("hello".repeat(1000), str);
413        })
414    }
415
416    #[test]
417    fn thread_reader_case() {
418        use std::io::Read;
419
420        let (writer, mut reader) = crate::async_writer_pipe();
421        for _ in 0..1000 {
422            let mut writer = writer.clone();
423            spawn(move || {
424                block_on(async {
425                    writer.write_all("hello".as_bytes()).await.unwrap();
426                })
427            });
428        }
429        drop(writer);
430
431        let mut str = String::new();
432        reader.read_to_string(&mut str).unwrap();
433
434        assert_eq!("hello".repeat(1000), str);
435    }
436
437    #[test]
438    fn threads_write_and_read_case() {
439        let (writer, mut reader) = crate::async_pipe();
440
441        for _ in 0..1000 {
442            let mut writer = writer.clone();
443
444            spawn(move || {
445                block_on(async {
446                    writer.write_all(&[0; 4]).await.unwrap();
447                })
448            });
449
450            block_on(async {
451                let mut buf = [0; 4];
452                assert_eq!(buf.len(), reader.read(&mut buf).await.unwrap());
453            })
454        }
455        drop(writer);
456    }
457}