io_pipe/
async_pipe.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::io::{BufRead, Error, ErrorKind, IoSlice, Write};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
8use loole::{Receiver, RecvFuture, SendFuture, Sender, unbounded};
9
10use crate::state::SharedState;
11
12/// Creates a pair of asynchronous writer and reader objects.
13///
14/// This function returns a tuple containing an `AsyncWriter` and an `AsyncReader`.
15/// The `AsyncWriter` can be used to write data, which can then be read from the `AsyncReader`.
16///
17/// # Arguments
18///
19/// * `buffer_size` - The size of the internal buffer used for communication between the writer and reader.
20///
21/// # Returns
22///
23/// A tuple containing `(AsyncWriter, AsyncReader)`.
24///
25/// # Example
26///
27/// ```rust
28/// use io_pipe::async_pipe;
29///
30/// let (writer, reader) = async_pipe();
31/// // Use writer to write data and reader to read data asynchronously
32/// ```
33pub fn async_pipe() -> (AsyncWriter, AsyncReader) {
34    let (sender, receiver) = unbounded();
35    let state = SharedState::default();
36
37    (
38        AsyncWriter {
39            sender,
40            state: state.clone(),
41            write_state: None,
42        },
43        AsyncReader {
44            receiver,
45            state,
46            buf: VecDeque::new(),
47            reading: None,
48        },
49    )
50}
51
/// Creates a pair of synchronous writer and asynchronous reader objects.
///
/// This function returns a tuple containing a `Writer` and an [`AsyncReader`].
/// The `Writer` can be used to write data, which can then be read from the `AsyncReader`.
/// Both halves share a single `SharedState` and are linked by a channel created with
/// `unbounded()`.
///
/// This function takes no arguments. It is only compiled when both the `async` and `sync`
/// features are enabled.
///
/// # Returns
///
/// A tuple containing `(Writer, AsyncReader)`.
///
/// # Example
///
/// ```rust
/// use io_pipe::async_reader_pipe;
///
/// let (writer, reader) = async_reader_pipe();
/// // Use writer to write data synchronously and reader to read data asynchronously
/// ```
#[cfg(all(feature = "async", feature = "sync"))]
pub fn async_reader_pipe() -> (crate::Writer, AsyncReader) {
    let (sender, receiver) = unbounded();
    let state = SharedState::default();

    // The writer gets a clone of the shared state; the reader takes the original handle.
    let writer = crate::Writer {
        sender,
        state: state.clone(),
    };
    let reader = AsyncReader {
        receiver,
        state,
        buf: VecDeque::new(),
        reading: None,
    };

    (writer, reader)
}
92
/// Creates a pair of asynchronous writer and synchronous reader objects.
///
/// This function returns a tuple containing an [`AsyncWriter`] and a `Reader`.
/// The `AsyncWriter` can be used to write data, which can then be read from the `Reader`.
/// Both halves share a single `SharedState` and are linked by a channel created with
/// `unbounded()`.
///
/// This function takes no arguments. It is only compiled when both the `async` and `sync`
/// features are enabled.
///
/// # Returns
///
/// A tuple containing `(AsyncWriter, Reader)`.
///
/// # Example
///
/// ```rust
/// use io_pipe::async_writer_pipe;
///
/// let (writer, reader) = async_writer_pipe();
/// // Use writer to write data asynchronously and reader to read data synchronously
/// ```
#[cfg(all(feature = "async", feature = "sync"))]
pub fn async_writer_pipe() -> (AsyncWriter, crate::Reader) {
    let (sender, receiver) = unbounded();
    let state = SharedState::default();

    // The writer gets a clone of the shared state; the reader takes the original handle.
    let writer = AsyncWriter {
        sender,
        state: state.clone(),
        write_state: None,
    };
    let reader = crate::Reader {
        receiver,
        state,
        buf: VecDeque::new(),
    };

    (writer, reader)
}
133
/// Bookkeeping for a write whose channel notification has not finished sending yet.
///
/// Created by `poll_write` / `poll_write_vectored` and cleared by `AsyncWriter::poll_send`
/// once the send future resolves (successfully or not).
#[derive(Debug)]
struct WriteState {
    /// Future returned by `sender.send_async(())`; kept so it can be polled across calls.
    send_future: SendFuture<()>,
    /// Value returned by `SharedState::write` / `write_vectored` for this write
    /// (presumably the number of bytes accepted); reported to the caller when the
    /// send future completes with `Ok`.
    n: usize,
}
139
/// An asynchronous writer that implements `AsyncWrite`.
///
/// This struct allows writing data asynchronously, which can be read from a corresponding `AsyncReader`.
#[derive(Debug)]
pub struct AsyncWriter {
    /// Sending half of the `()` channel; one message is sent per write call.
    sender: Sender<()>,
    /// `Some` while a write has been started and its send future is still being polled;
    /// `None` when the writer is idle.
    write_state: Option<WriteState>,
    /// Handle to the state shared with the reader; write calls put their bytes here.
    state: SharedState,
}
149
150impl Clone for AsyncWriter {
151    fn clone(&self) -> Self {
152        Self {
153            sender: self.sender.clone(),
154            write_state: None,
155            state: self.state.clone(),
156        }
157    }
158}
159
160impl AsyncWriter {
161    fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<usize>> {
162        debug_assert!(self.write_state.is_some());
163        match Pin::new(&mut self.write_state.as_mut().unwrap().send_future).poll(cx) {
164            Poll::Ready(Ok(_)) => {
165                let n = self.write_state.as_ref().map(|s| s.n).unwrap();
166                self.write_state = None;
167                Poll::Ready(Ok(n))
168            }
169            Poll::Ready(Err(e)) => {
170                self.write_state = None;
171                Poll::Ready(Err(Error::new(ErrorKind::WriteZero, e)))
172            }
173            Poll::Pending => Poll::Pending,
174        }
175    }
176}
177
178impl AsyncWrite for AsyncWriter {
179    fn poll_write(
180        mut self: Pin<&mut Self>,
181        cx: &mut Context<'_>,
182        buf: &[u8],
183    ) -> Poll<std::io::Result<usize>> {
184        if self.write_state.is_none() {
185            self.write_state = Some(WriteState {
186                send_future: self.sender.send_async(()),
187                n: self.state.write(buf)?,
188            });
189        }
190        match self.poll_send(cx)? {
191            Poll::Ready(n) => Poll::Ready(Ok(n)),
192            Poll::Pending => Poll::Pending,
193        }
194    }
195
196    fn poll_write_vectored(
197        mut self: Pin<&mut Self>,
198        cx: &mut Context<'_>,
199        bufs: &[IoSlice<'_>],
200    ) -> Poll<std::io::Result<usize>> {
201        if self.write_state.is_none() {
202            self.write_state = Some(WriteState {
203                send_future: self.sender.send_async(()),
204                n: self.state.write_vectored(bufs)?,
205            });
206        }
207        match self.poll_send(cx)? {
208            Poll::Ready(n) => Poll::Ready(Ok(n)),
209            Poll::Pending => Poll::Pending,
210        }
211    }
212
213    fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
214        Poll::Ready(self.state.flush())
215    }
216
217    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
218        self.sender.close();
219        Poll::Ready(Ok(()))
220    }
221}
222
/// An asynchronous reader that implements `AsyncRead` and `AsyncBufRead`.
///
/// This struct allows reading data asynchronously that was written to a corresponding `AsyncWriter`.
#[derive(Debug)]
pub struct AsyncReader {
    /// Receiving half of the `()` channel; awaited when the shared state has no data.
    receiver: Receiver<()>,
    /// Local buffer filled from the shared state via `copy_to`; `poll_fill_buf` exposes
    /// it and `consume` drains it.
    buf: VecDeque<u8>,
    /// Receive future kept between polls while waiting for a channel message;
    /// `None` when no receive is in progress.
    reading: Option<RecvFuture<()>>,
    /// Handle to the state shared with the writer side.
    state: SharedState,
}
233
234impl AsyncBufRead for AsyncReader {
235    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
236        let this = self.get_mut();
237        while this.buf.is_empty() {
238            let n = this.state.copy_to(&mut this.buf)?;
239            if n == 0 {
240                if this.reading.is_none() {
241                    this.reading = Some(this.receiver.recv_async())
242                }
243
244                match Pin::new(this.reading.as_mut().unwrap()).poll(cx) {
245                    Poll::Ready(Ok(_)) => {
246                        this.reading = None;
247                    }
248                    Poll::Ready(Err(_)) => {
249                        this.reading = None;
250                        break;
251                    }
252                    Poll::Pending => return Poll::Pending,
253                }
254            }
255        }
256
257        if this.buf.is_empty() {
258            _ = this.state.copy_to(&mut this.buf)?;
259        }
260
261        Poll::Ready(this.buf.fill_buf())
262    }
263
264    fn consume(mut self: Pin<&mut Self>, amt: usize) {
265        self.buf.consume(amt)
266    }
267}
268
269impl AsyncRead for AsyncReader {
270    fn poll_read(
271        mut self: Pin<&mut Self>,
272        cx: &mut Context<'_>,
273        mut buf: &mut [u8],
274    ) -> Poll<std::io::Result<usize>> {
275        let data = match self.as_mut().poll_fill_buf(cx)? {
276            Poll::Ready(buf) => buf,
277            Poll::Pending => return Poll::Pending,
278        };
279        let n = buf.write(data)?;
280        self.consume(n);
281        Poll::Ready(Ok(n))
282    }
283}
284
#[cfg(test)]
mod tests {
    use std::io::IoSlice;
    use std::thread::spawn;

    use futures::{
        AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, StreamExt, TryStreamExt, executor::block_on,
    };

    // Repeated writes must complete even though nothing is reading yet.
    #[test]
    fn base_write_case() {
        block_on(async {
            // Checking non-blocking buffer inside writer
            let (mut tx, rx) = crate::async_pipe();
            for _ in 0..1000 {
                tx.write_all(b"hello").await.unwrap();
            }
            drop(rx);
        });
    }

    // Two sequential writes are read back as one concatenated string.
    #[test]
    fn base_read_case() {
        block_on(async {
            let (mut tx, mut rx) = crate::async_pipe();

            tx.write_all(b"hello ").await.unwrap();
            tx.write_all(b"world").await.unwrap();
            drop(tx);

            let mut received = String::new();
            rx.read_to_string(&mut received).await.unwrap();

            assert_eq!("hello world".to_string(), received);
        });
    }

    // A single vectored write delivers all slices in order.
    #[test]
    fn base_vectored_case() {
        block_on(async {
            let (mut tx, mut rx) = crate::async_pipe();
            let parts = [IoSlice::new(b"hello "), IoSlice::new(b"world")];
            _ = tx.write_vectored(&parts).await.unwrap();
            drop(tx);

            let mut received = String::new();
            rx.read_to_string(&mut received).await.unwrap();

            assert_eq!("hello world".to_string(), received);
        });
    }

    // Many cloned writers driven concurrently on one executor alongside the reader.
    #[test]
    fn thread_case() {
        block_on(async {
            let (tx, mut rx) = crate::async_pipe();
            let clones: Vec<_> = (0..1000).map(|_| tx.clone()).collect();
            let clone_count = clones.len();
            drop(tx);

            let write_all_clones = futures::stream::iter(clones)
                .map(|mut w| async move { w.write_all(b"hello").await })
                .buffer_unordered(clone_count)
                .try_collect::<Vec<()>>();

            let mut received = String::new();
            let read_everything = rx.read_to_string(&mut received);
            futures::join!(
                async {
                    write_all_clones.await.unwrap();
                },
                async { read_everything.await.unwrap() }
            );

            assert_eq!("hello".repeat(clone_count), received);
        });
    }

    // Writing after the reader has been dropped reports an error.
    #[test]
    fn writer_err_case() {
        block_on(async {
            let (mut tx, rx) = crate::async_pipe();
            drop(rx);

            let outcome = tx.write(b"hello").await;
            assert!(outcome.is_err());
        });
    }

    // `read_line` yields each line in turn, then 0 at end of stream.
    #[test]
    fn bufread_case() {
        block_on(async {
            let (mut tx, mut rx) = crate::async_pipe();
            tx.write_all(b"hello\n").await.unwrap();
            tx.write_all(b"world").await.unwrap();
            drop(tx);

            let mut first = String::new();
            assert_ne!(0, rx.read_line(&mut first).await.unwrap());
            assert_eq!("hello\n".to_string(), first);

            let mut second = String::new();
            assert_ne!(0, rx.read_line(&mut second).await.unwrap());
            assert_eq!("world".to_string(), second);

            let mut third = String::new();
            assert_eq!(0, rx.read_line(&mut third).await.unwrap());
        });
    }

    // The `lines()` stream produces exactly two successful items.
    #[test]
    fn bufread_lines_case() {
        block_on(async {
            let (mut tx, rx) = crate::async_pipe();
            tx.write_all(b"hello\n").await.unwrap();
            tx.write_all(b"world").await.unwrap();
            drop(tx);

            let line_count = rx
                .lines()
                .map(|line| assert!(line.is_ok()))
                .count()
                .await;
            assert_eq!(2, line_count);
        });
    }

    // Synchronous writers on separate threads feeding one asynchronous reader.
    #[test]
    fn thread_writer_case() {
        use std::io::Write;

        let (tx, mut rx) = crate::async_reader_pipe();
        for _ in 0..1000 {
            let mut w = tx.clone();
            spawn(move || {
                w.write_all(b"hello").unwrap();
            });
        }
        drop(tx);

        block_on(async {
            let mut received = String::new();
            rx.read_to_string(&mut received).await.unwrap();

            assert_eq!("hello".repeat(1000), received);
        });
    }

    // Asynchronous writers on separate threads feeding one synchronous reader.
    #[test]
    fn thread_reader_case() {
        use std::io::Read;

        let (tx, mut rx) = crate::async_writer_pipe();
        for _ in 0..1000 {
            let mut w = tx.clone();
            spawn(move || {
                block_on(async {
                    w.write_all(b"hello").await.unwrap();
                })
            });
        }
        drop(tx);

        let mut received = String::new();
        rx.read_to_string(&mut received).unwrap();

        assert_eq!("hello".repeat(1000), received);
    }

    // Each iteration spawns one 4-byte write and then reads it back on this thread.
    #[test]
    fn threads_write_and_read_case() {
        let (tx, mut rx) = crate::async_pipe();

        for _ in 0..1000 {
            let mut w = tx.clone();

            spawn(move || {
                block_on(async {
                    w.write_all(&[0; 4]).await.unwrap();
                })
            });

            block_on(async {
                let mut chunk = [0; 4];
                let read = rx.read(&mut chunk).await.unwrap();
                assert_eq!(chunk.len(), read);
            });
        }
        drop(tx);
    }
}