Skip to main content

volans_core/muxing/
boxed.rs

1use crate::StreamMuxer;
2use futures::{AsyncRead, AsyncWrite};
3use pin_project::pin_project;
4use std::{
5    error::Error,
6    fmt, io,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11pub struct StreamMuxerBox {
12    inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
13}
14
15impl fmt::Debug for StreamMuxerBox {
16    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17        f.debug_struct("StreamMuxerBox").finish_non_exhaustive()
18    }
19}
20
21#[pin_project]
22struct Wrap<T>
23where
24    T: StreamMuxer,
25{
26    #[pin]
27    inner: T,
28}
29
30impl<T> StreamMuxer for Wrap<T>
31where
32    T: StreamMuxer + Send + 'static,
33    T::Substream: Send + 'static,
34    T::Error: Send + Sync + 'static,
35{
36    type Substream = SubstreamBox;
37    type Error = io::Error;
38
39    fn poll_inbound(
40        self: Pin<&mut Self>,
41        cx: &mut Context<'_>,
42    ) -> Poll<Result<Self::Substream, Self::Error>> {
43        self.project()
44            .inner
45            .poll_inbound(cx)
46            .map_ok(SubstreamBox::new)
47            .map_err(into_io_error)
48    }
49
50    fn poll_outbound(
51        self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53    ) -> Poll<Result<Self::Substream, Self::Error>> {
54        self.project()
55            .inner
56            .poll_outbound(cx)
57            .map_ok(SubstreamBox::new)
58            .map_err(into_io_error)
59    }
60
61    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62        self.project().inner.poll_close(cx).map_err(into_io_error)
63    }
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66        self.project().inner.poll(cx).map_err(into_io_error)
67    }
68}
69
70impl StreamMuxer for StreamMuxerBox {
71    type Substream = SubstreamBox;
72    type Error = io::Error;
73
74    fn poll_inbound(
75        self: Pin<&mut Self>,
76        cx: &mut Context<'_>,
77    ) -> Poll<Result<Self::Substream, Self::Error>> {
78        self.get_mut().inner.as_mut().poll_inbound(cx)
79    }
80
81    fn poll_outbound(
82        self: Pin<&mut Self>,
83        cx: &mut Context<'_>,
84    ) -> Poll<Result<Self::Substream, Self::Error>> {
85        self.get_mut().inner.as_mut().poll_outbound(cx)
86    }
87
88    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89        self.get_mut().inner.as_mut().poll_close(cx)
90    }
91
92    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93        self.get_mut().inner.as_mut().poll(cx)
94    }
95}
96
/// Wraps an arbitrary error in an [`io::Error`] of kind
/// [`io::ErrorKind::Other`], keeping the original error as its payload.
fn into_io_error<E: Error + Send + Sync + 'static>(err: E) -> io::Error {
    // Box up front; `io::Error::other` accepts the boxed error as-is.
    let payload: Box<dyn Error + Send + Sync> = Box::new(err);
    io::Error::other(payload)
}
103
104impl StreamMuxerBox {
105    pub fn new<T>(muxer: T) -> StreamMuxerBox
106    where
107        T: StreamMuxer + Send + 'static,
108        T::Substream: Send + 'static,
109        T::Error: Send + Sync + 'static,
110    {
111        let wrap = Wrap { inner: muxer };
112        StreamMuxerBox {
113            inner: Box::pin(wrap),
114        }
115    }
116}
117
118pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);
119
120impl SubstreamBox {
121    pub fn new<S>(substream: S) -> Self
122    where
123        S: AsyncRead + AsyncWrite + Send + 'static,
124    {
125        SubstreamBox(Box::pin(substream))
126    }
127}
128
129impl fmt::Debug for SubstreamBox {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        write!(f, "StreamMuxerBox({})", self.0.type_name())
132    }
133}
134
135trait AsyncReadWrite: AsyncRead + AsyncWrite {
136    fn type_name(&self) -> &'static str;
137}
138
139impl<S> AsyncReadWrite for S
140where
141    S: AsyncRead + AsyncWrite,
142{
143    fn type_name(&self) -> &'static str {
144        std::any::type_name::<S>()
145    }
146}
147
148impl AsyncRead for SubstreamBox {
149    fn poll_read(
150        mut self: Pin<&mut Self>,
151        cx: &mut Context<'_>,
152        buf: &mut [u8],
153    ) -> Poll<io::Result<usize>> {
154        self.0.as_mut().poll_read(cx, buf)
155    }
156
157    fn poll_read_vectored(
158        mut self: Pin<&mut Self>,
159        cx: &mut Context<'_>,
160        bufs: &mut [io::IoSliceMut<'_>],
161    ) -> Poll<io::Result<usize>> {
162        self.0.as_mut().poll_read_vectored(cx, bufs)
163    }
164}
165
166impl AsyncWrite for SubstreamBox {
167    fn poll_write(
168        mut self: Pin<&mut Self>,
169        cx: &mut Context<'_>,
170        buf: &[u8],
171    ) -> Poll<io::Result<usize>> {
172        self.0.as_mut().poll_write(cx, buf)
173    }
174
175    fn poll_write_vectored(
176        mut self: Pin<&mut Self>,
177        cx: &mut Context<'_>,
178        bufs: &[io::IoSlice<'_>],
179    ) -> Poll<io::Result<usize>> {
180        self.0.as_mut().poll_write_vectored(cx, bufs)
181    }
182
183    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
184        self.0.as_mut().poll_flush(cx)
185    }
186
187    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
188        self.0.as_mut().poll_close(cx)
189    }
190}