async_ringbuffer/
duplex.rs

1use crate::{ring_buffer, Reader, Writer};
2
3use core::pin::Pin;
4use core::task::Context;
5use futures_io::{AsyncRead, AsyncWrite, Result};
6use std::task::Poll;
7
/// An AsyncRead + AsyncWrite stream, which reads from one
/// ringbuffer, and writes to another.
pub struct Duplex {
    /// Read half: every `AsyncRead` call on the stream is forwarded here.
    r: Reader,
    /// Write half: every `AsyncWrite` call on the stream is forwarded here.
    w: Writer,
}
14
15impl Duplex {
16    /// Create a pair of duplex AsyncRead + AsyncWrite streams,
17    /// for duplex communication between two entities (eg. client and server).
18    pub fn pair(capacity: usize) -> (Duplex, Duplex) {
19        let (a_w, a_r) = ring_buffer(capacity);
20        let (b_w, b_r) = ring_buffer(capacity);
21
22        (Duplex { r: a_r, w: b_w }, Duplex { r: b_r, w: a_w })
23    }
24
25    /// Split duplex AsyncRead + AsyncWrite stream into separate
26    /// (AsyncRead, AsyncWrite) halves.
27    pub fn split(self) -> (Reader, Writer) {
28        let Duplex { r, w } = self;
29        (r, w)
30    }
31}
32
33impl AsyncRead for Duplex {
34    fn poll_read(
35        mut self: Pin<&mut Self>,
36        cx: &mut Context,
37        buf: &mut [u8],
38    ) -> Poll<Result<usize>> {
39        Pin::new(&mut self.r).poll_read(cx, buf)
40    }
41}
42impl AsyncWrite for Duplex {
43    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
44        Pin::new(&mut self.w).poll_write(cx, buf)
45    }
46    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
47        Pin::new(&mut self.w).poll_flush(cx)
48    }
49    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
50        Pin::new(&mut self.w).poll_close(cx)
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use futures::io::{AsyncReadExt, AsyncWriteExt};

    /// Exchanges data in both directions between the two ends of a pair,
    /// then splits one end and checks that its halves still talk to the peer.
    ///
    /// `write_all` / `read_exact` are used instead of `write` / `read` so a
    /// short write or short read cannot go unnoticed: the plain calls return
    /// a byte count that would otherwise be silently discarded.
    #[test]
    fn back_and_forth() {
        let (mut client, mut server) = Duplex::pair(1024);

        block_on(async {
            // Both directions are filled before either side reads.
            client.write_all(&[1, 2, 3, 4, 5]).await.unwrap();
            server.write_all(&[5, 4, 3, 2, 1]).await.unwrap();

            let mut buf = [0; 5];
            server.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, &[1, 2, 3, 4, 5]);
            client.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, &[5, 4, 3, 2, 1]);

            server.write_all(&[6, 7, 8, 9, 10]).await.unwrap();
            client.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, &[6, 7, 8, 9, 10]);

            // After splitting, the separate halves must still reach the peer.
            let mut buf = [0; 3];
            let (mut sr, mut sw) = server.split();
            sw.write_all(&[1, 2, 3]).await.unwrap();
            client.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, &[1, 2, 3]);

            client.write_all(&[3, 2, 1]).await.unwrap();
            sr.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, &[3, 2, 1]);
        });
    }
}