bramble_common/
duplex.rs

1//! Duplex pipes for testing
2
3use crate::transport::{Id, Latency};
4use futures::{
5    io,
6    lock::BiLock,
7    ready,
8    task::{AtomicWaker, Context, Poll},
9    AsyncRead, AsyncWrite,
10};
11use pin_project::pin_project;
12use std::{cmp::min as at_most, collections::VecDeque, pin::Pin, sync::Arc};
13
14/// The read end of a pipe
15#[pin_project]
16pub struct ReadPipe {
17    #[pin]
18    buf: BiLock<VecDeque<u8>>,
19    waker: Arc<AtomicWaker>,
20}
21
22/// The write end of a pipe
23#[pin_project]
24pub struct WritePipe {
25    #[pin]
26    buf: BiLock<VecDeque<u8>>,
27    waker: Arc<AtomicWaker>,
28}
29
30/// Creates a new pipe
31pub fn make_pipe() -> (ReadPipe, WritePipe) {
32    let bufs = BiLock::new(VecDeque::new());
33    let waker = Arc::new(AtomicWaker::new());
34    (
35        ReadPipe {
36            buf: bufs.0,
37            waker: waker.clone(),
38        },
39        WritePipe { buf: bufs.1, waker },
40    )
41}
42
43impl AsyncRead for ReadPipe {
44    fn poll_read(
45        mut self: Pin<&mut Self>,
46        cx: &mut Context,
47        buf: &mut [u8],
48    ) -> Poll<IoResult<usize>> {
49        if buf.is_empty() {
50            return Poll::Ready(Ok(0));
51        }
52        let this = self.as_mut().project();
53        this.waker.register(cx.waker());
54        let mut data = ready!(this.buf.poll_lock(cx));
55        let (slice, _) = data.as_slices();
56        let available = slice.len();
57        if available == 0 {
58            return Poll::Pending;
59        }
60        let to_read = at_most(available, buf.len());
61        buf[..to_read].copy_from_slice(&slice[..to_read]);
62        data.drain(..to_read);
63        Poll::Ready(Ok(to_read))
64    }
65}
66
67impl AsyncWrite for WritePipe {
68    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<IoResult<usize>> {
69        if buf.is_empty() {
70            return Poll::Ready(Ok(0));
71        }
72        let this = self.as_mut().project();
73        let mut data = ready!(this.buf.poll_lock(cx));
74        data.extend(buf);
75        this.waker.wake();
76        Poll::Ready(Ok(buf.len()))
77    }
78
79    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<IoResult<()>> {
80        Poll::Ready(Ok(()))
81    }
82
83    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<IoResult<()>> {
84        Poll::Ready(Ok(()))
85    }
86}
87
88/// One end of a duplex pipe
89#[pin_project]
90pub struct Duplex {
91    #[pin]
92    reader: ReadPipe,
93    #[pin]
94    writer: WritePipe,
95}
96
97impl AsyncRead for Duplex {
98    fn poll_read(
99        mut self: Pin<&mut Self>,
100        cx: &mut Context,
101        buf: &mut [u8],
102    ) -> Poll<IoResult<usize>> {
103        self.as_mut().project().reader.poll_read(cx, buf)
104    }
105}
106
107impl AsyncWrite for Duplex {
108    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<IoResult<usize>> {
109        self.as_mut().project().writer.poll_write(cx, buf)
110    }
111    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<IoResult<()>> {
112        self.as_mut().project().writer.poll_flush(cx)
113    }
114    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<IoResult<()>> {
115        self.as_mut().project().writer.poll_close(cx)
116    }
117}
118
119impl Id for Duplex {
120    const ID: &'static [u8] = b"test.transport.duplex";
121}
122
123impl Latency for Duplex {
124    const MAX_LATENCY_SECONDS: u32 = 1;
125}
126
127/// Creates a new pipe-based duplex transport
128pub fn make_duplex() -> (Duplex, Duplex) {
129    let (read2, write1) = make_pipe();
130    let (read1, write2) = make_pipe();
131
132    (
133        Duplex {
134            reader: read1,
135            writer: write1,
136        },
137        Duplex {
138            reader: read2,
139            writer: write2,
140        },
141    )
142}
143
144type IoResult<T> = std::result::Result<T, io::Error>;