1use crate::transport::{Id, Latency};
4use futures::{
5 io,
6 lock::BiLock,
7 ready,
8 task::{AtomicWaker, Context, Poll},
9 AsyncRead, AsyncWrite,
10};
11use pin_project::pin_project;
12use std::{cmp::min as at_most, collections::VecDeque, pin::Pin, sync::Arc};
13
14#[pin_project]
16pub struct ReadPipe {
17 #[pin]
18 buf: BiLock<VecDeque<u8>>,
19 waker: Arc<AtomicWaker>,
20}
21
22#[pin_project]
24pub struct WritePipe {
25 #[pin]
26 buf: BiLock<VecDeque<u8>>,
27 waker: Arc<AtomicWaker>,
28}
29
30pub fn make_pipe() -> (ReadPipe, WritePipe) {
32 let bufs = BiLock::new(VecDeque::new());
33 let waker = Arc::new(AtomicWaker::new());
34 (
35 ReadPipe {
36 buf: bufs.0,
37 waker: waker.clone(),
38 },
39 WritePipe { buf: bufs.1, waker },
40 )
41}
42
43impl AsyncRead for ReadPipe {
44 fn poll_read(
45 mut self: Pin<&mut Self>,
46 cx: &mut Context,
47 buf: &mut [u8],
48 ) -> Poll<IoResult<usize>> {
49 if buf.is_empty() {
50 return Poll::Ready(Ok(0));
51 }
52 let this = self.as_mut().project();
53 this.waker.register(cx.waker());
54 let mut data = ready!(this.buf.poll_lock(cx));
55 let (slice, _) = data.as_slices();
56 let available = slice.len();
57 if available == 0 {
58 return Poll::Pending;
59 }
60 let to_read = at_most(available, buf.len());
61 buf[..to_read].copy_from_slice(&slice[..to_read]);
62 data.drain(..to_read);
63 Poll::Ready(Ok(to_read))
64 }
65}
66
67impl AsyncWrite for WritePipe {
68 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<IoResult<usize>> {
69 if buf.is_empty() {
70 return Poll::Ready(Ok(0));
71 }
72 let this = self.as_mut().project();
73 let mut data = ready!(this.buf.poll_lock(cx));
74 data.extend(buf);
75 this.waker.wake();
76 Poll::Ready(Ok(buf.len()))
77 }
78
79 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<IoResult<()>> {
80 Poll::Ready(Ok(()))
81 }
82
83 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<IoResult<()>> {
84 Poll::Ready(Ok(()))
85 }
86}
87
88#[pin_project]
90pub struct Duplex {
91 #[pin]
92 reader: ReadPipe,
93 #[pin]
94 writer: WritePipe,
95}
96
97impl AsyncRead for Duplex {
98 fn poll_read(
99 mut self: Pin<&mut Self>,
100 cx: &mut Context,
101 buf: &mut [u8],
102 ) -> Poll<IoResult<usize>> {
103 self.as_mut().project().reader.poll_read(cx, buf)
104 }
105}
106
107impl AsyncWrite for Duplex {
108 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<IoResult<usize>> {
109 self.as_mut().project().writer.poll_write(cx, buf)
110 }
111 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<IoResult<()>> {
112 self.as_mut().project().writer.poll_flush(cx)
113 }
114 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<IoResult<()>> {
115 self.as_mut().project().writer.poll_close(cx)
116 }
117}
118
119impl Id for Duplex {
120 const ID: &'static [u8] = b"test.transport.duplex";
121}
122
123impl Latency for Duplex {
124 const MAX_LATENCY_SECONDS: u32 = 1;
125}
126
127pub fn make_duplex() -> (Duplex, Duplex) {
129 let (read2, write1) = make_pipe();
130 let (read1, write2) = make_pipe();
131
132 (
133 Duplex {
134 reader: read1,
135 writer: write1,
136 },
137 Duplex {
138 reader: read2,
139 writer: write2,
140 },
141 )
142}
143
/// Shorthand for a `Result` whose error type is `io::Error`.
type IoResult<T> = Result<T, io::Error>;