volans_core/muxing/
boxed.rs1use crate::StreamMuxer;
2use futures::{AsyncRead, AsyncWrite};
3use pin_project::pin_project;
4use std::{
5 error::Error,
6 fmt, io,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11pub struct StreamMuxerBox {
12 inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
13}
14
15impl fmt::Debug for StreamMuxerBox {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 f.debug_struct("StreamMuxerBox").finish_non_exhaustive()
18 }
19}
20
21#[pin_project]
22struct Wrap<T>
23where
24 T: StreamMuxer,
25{
26 #[pin]
27 inner: T,
28}
29
30impl<T> StreamMuxer for Wrap<T>
31where
32 T: StreamMuxer + Send + 'static,
33 T::Substream: Send + 'static,
34 T::Error: Send + Sync + 'static,
35{
36 type Substream = SubstreamBox;
37 type Error = io::Error;
38
39 fn poll_inbound(
40 self: Pin<&mut Self>,
41 cx: &mut Context<'_>,
42 ) -> Poll<Result<Self::Substream, Self::Error>> {
43 self.project()
44 .inner
45 .poll_inbound(cx)
46 .map_ok(SubstreamBox::new)
47 .map_err(into_io_error)
48 }
49
50 fn poll_outbound(
51 self: Pin<&mut Self>,
52 cx: &mut Context<'_>,
53 ) -> Poll<Result<Self::Substream, Self::Error>> {
54 self.project()
55 .inner
56 .poll_outbound(cx)
57 .map_ok(SubstreamBox::new)
58 .map_err(into_io_error)
59 }
60
61 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62 self.project().inner.poll_close(cx).map_err(into_io_error)
63 }
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66 self.project().inner.poll(cx).map_err(into_io_error)
67 }
68}
69
70impl StreamMuxer for StreamMuxerBox {
71 type Substream = SubstreamBox;
72 type Error = io::Error;
73
74 fn poll_inbound(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 ) -> Poll<Result<Self::Substream, Self::Error>> {
78 self.get_mut().inner.as_mut().poll_inbound(cx)
79 }
80
81 fn poll_outbound(
82 self: Pin<&mut Self>,
83 cx: &mut Context<'_>,
84 ) -> Poll<Result<Self::Substream, Self::Error>> {
85 self.get_mut().inner.as_mut().poll_outbound(cx)
86 }
87
88 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89 self.get_mut().inner.as_mut().poll_close(cx)
90 }
91
92 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93 self.get_mut().inner.as_mut().poll(cx)
94 }
95}
96
/// Wraps an arbitrary error in an [`io::Error`] of kind
/// [`io::ErrorKind::Other`].
///
/// The original error is kept as the boxed payload of the returned value, so
/// it remains reachable through `io::Error::get_ref` / `into_inner`.
fn into_io_error<E>(err: E) -> io::Error
where
    E: Error + Send + Sync + 'static,
{
    // Box explicitly; `io::Error::other` accepts anything convertible into
    // this boxed trait object and stores it as-is.
    let boxed: Box<dyn Error + Send + Sync> = Box::new(err);
    io::Error::other(boxed)
}
103
104impl StreamMuxerBox {
105 pub fn new<T>(muxer: T) -> StreamMuxerBox
106 where
107 T: StreamMuxer + Send + 'static,
108 T::Substream: Send + 'static,
109 T::Error: Send + Sync + 'static,
110 {
111 let wrap = Wrap { inner: muxer };
112 StreamMuxerBox {
113 inner: Box::pin(wrap),
114 }
115 }
116}
117
118pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);
119
120impl SubstreamBox {
121 pub fn new<S>(substream: S) -> Self
122 where
123 S: AsyncRead + AsyncWrite + Send + 'static,
124 {
125 SubstreamBox(Box::pin(substream))
126 }
127}
128
129impl fmt::Debug for SubstreamBox {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 write!(f, "StreamMuxerBox({})", self.0.type_name())
132 }
133}
134
135trait AsyncReadWrite: AsyncRead + AsyncWrite {
136 fn type_name(&self) -> &'static str;
137}
138
139impl<S> AsyncReadWrite for S
140where
141 S: AsyncRead + AsyncWrite,
142{
143 fn type_name(&self) -> &'static str {
144 std::any::type_name::<S>()
145 }
146}
147
148impl AsyncRead for SubstreamBox {
149 fn poll_read(
150 mut self: Pin<&mut Self>,
151 cx: &mut Context<'_>,
152 buf: &mut [u8],
153 ) -> Poll<io::Result<usize>> {
154 self.0.as_mut().poll_read(cx, buf)
155 }
156
157 fn poll_read_vectored(
158 mut self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 bufs: &mut [io::IoSliceMut<'_>],
161 ) -> Poll<io::Result<usize>> {
162 self.0.as_mut().poll_read_vectored(cx, bufs)
163 }
164}
165
166impl AsyncWrite for SubstreamBox {
167 fn poll_write(
168 mut self: Pin<&mut Self>,
169 cx: &mut Context<'_>,
170 buf: &[u8],
171 ) -> Poll<io::Result<usize>> {
172 self.0.as_mut().poll_write(cx, buf)
173 }
174
175 fn poll_write_vectored(
176 mut self: Pin<&mut Self>,
177 cx: &mut Context<'_>,
178 bufs: &[io::IoSlice<'_>],
179 ) -> Poll<io::Result<usize>> {
180 self.0.as_mut().poll_write_vectored(cx, bufs)
181 }
182
183 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
184 self.0.as_mut().poll_flush(cx)
185 }
186
187 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
188 self.0.as_mut().poll_close(cx)
189 }
190}