airio_core/muxing/
boxed.rs1use crate::muxing::{StreamMuxer, StreamMuxerEvent};
2use futures::{AsyncRead, AsyncWrite};
3use pin_project::pin_project;
4use std::{
5 error::Error,
6 fmt, io,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11pub struct StreamMuxerBox {
12 inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
13}
14
15#[pin_project]
16struct Wrap<T>
17where
18 T: StreamMuxer,
19{
20 #[pin]
21 inner: T,
22}
23
24impl<T> StreamMuxer for Wrap<T>
25where
26 T: StreamMuxer + Send + 'static,
27 T::Substream: Send + 'static,
28 T::Error: Send + Sync + 'static,
29{
30 type Substream = SubstreamBox;
31 type Error = io::Error;
32
33 fn poll_inbound(
34 self: Pin<&mut Self>,
35 cx: &mut Context<'_>,
36 ) -> Poll<Result<Self::Substream, Self::Error>> {
37 self.project()
38 .inner
39 .poll_inbound(cx)
40 .map_ok(SubstreamBox::new)
41 .map_err(into_io_error)
42 }
43
44 fn poll_outbound(
45 self: Pin<&mut Self>,
46 cx: &mut Context<'_>,
47 ) -> Poll<Result<Self::Substream, Self::Error>> {
48 self.project()
49 .inner
50 .poll_outbound(cx)
51 .map_ok(SubstreamBox::new)
52 .map_err(into_io_error)
53 }
54
55 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56 self.project().inner.poll_close(cx).map_err(into_io_error)
57 }
58
59 fn poll(
60 self: Pin<&mut Self>,
61 cx: &mut Context<'_>,
62 ) -> Poll<Result<super::StreamMuxerEvent, Self::Error>> {
63 self.project().inner.poll(cx).map_err(into_io_error)
64 }
65}
66
67impl StreamMuxer for StreamMuxerBox {
68 type Substream = SubstreamBox;
69 type Error = io::Error;
70
71 fn poll_inbound(
72 self: Pin<&mut Self>,
73 cx: &mut Context<'_>,
74 ) -> Poll<Result<Self::Substream, Self::Error>> {
75 self.get_mut().inner.as_mut().poll_inbound(cx)
76 }
77
78 fn poll_outbound(
79 self: Pin<&mut Self>,
80 cx: &mut Context<'_>,
81 ) -> Poll<Result<Self::Substream, Self::Error>> {
82 self.get_mut().inner.as_mut().poll_outbound(cx)
83 }
84
85 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 self.get_mut().inner.as_mut().poll_close(cx)
87 }
88
89 fn poll(
90 self: Pin<&mut Self>,
91 cx: &mut Context<'_>,
92 ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
93 self.get_mut().inner.as_mut().poll(cx)
94 }
95}
96
/// Wraps an arbitrary error in an [`io::Error`] of kind [`io::ErrorKind::Other`].
///
/// The original error is kept as the payload, so it remains reachable through
/// `io::Error::get_ref` / `into_inner`.
fn into_io_error<E: Error + Send + Sync + 'static>(err: E) -> io::Error {
    let wrapped = io::Error::other(err);
    wrapped
}
103
104impl StreamMuxerBox {
105 pub fn new<T>(muxer: T) -> StreamMuxerBox
106 where
107 T: StreamMuxer + Send + 'static,
108 T::Substream: Send + 'static,
109 T::Error: Send + Sync + 'static,
110 {
111 let wrap = Wrap { inner: muxer };
112 StreamMuxerBox {
113 inner: Box::pin(wrap),
114 }
115 }
116}
117
118pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);
119
120impl SubstreamBox {
121 pub fn new<S>(substream: S) -> Self
122 where
123 S: AsyncRead + AsyncWrite + Send + 'static,
124 {
125 SubstreamBox(Box::pin(substream))
126 }
127}
128
129impl fmt::Debug for SubstreamBox {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 write!(f, "StreamMuxerBox({})", self.0.type_name())
132 }
133}
134
135trait AsyncReadWrite: AsyncRead + AsyncWrite {
136 fn type_name(&self) -> &'static str;
137}
138
139impl<S> AsyncReadWrite for S
140where
141 S: AsyncRead + AsyncWrite,
142{
143 fn type_name(&self) -> &'static str {
144 std::any::type_name::<S>()
145 }
146}
147
148impl AsyncRead for SubstreamBox {
149 fn poll_read(
150 mut self: Pin<&mut Self>,
151 cx: &mut Context<'_>,
152 buf: &mut [u8],
153 ) -> Poll<io::Result<usize>> {
154 self.0.as_mut().poll_read(cx, buf)
155 }
156
157 fn poll_read_vectored(
158 mut self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 bufs: &mut [io::IoSliceMut<'_>],
161 ) -> Poll<io::Result<usize>> {
162 self.0.as_mut().poll_read_vectored(cx, bufs)
163 }
164}
165
166impl AsyncWrite for SubstreamBox {
167 fn poll_write(
168 mut self: Pin<&mut Self>,
169 cx: &mut Context<'_>,
170 buf: &[u8],
171 ) -> Poll<io::Result<usize>> {
172 self.0.as_mut().poll_write(cx, buf)
173 }
174
175 fn poll_write_vectored(
176 mut self: Pin<&mut Self>,
177 cx: &mut Context<'_>,
178 bufs: &[io::IoSlice<'_>],
179 ) -> Poll<io::Result<usize>> {
180 self.0.as_mut().poll_write_vectored(cx, bufs)
181 }
182
183 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
184 self.0.as_mut().poll_flush(cx)
185 }
186
187 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
188 self.0.as_mut().poll_close(cx)
189 }
190}