volans_core/transport/
boxed.rs1use std::{
2 error, io,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures::{TryFutureExt, ready};
8
9use crate::{Listener, ListenerEvent, Multiaddr, Transport, TransportError};
10
11trait Abstract<O> {
12 fn dial(&self, addr: Multiaddr) -> Result<BoxedUpgrade<O>, TransportError<io::Error>>;
13 fn listen(&self, addr: Multiaddr) -> Result<BoxedListener<O>, TransportError<io::Error>>;
14}
15
16impl<T, O> Abstract<O> for T
17where
18 T: Transport<Output = O> + 'static,
19 T::Error: Send + Sync,
20 T::Dial: Send + 'static,
21 T::Incoming: Send + 'static,
22 T::Listener: Send + Unpin,
23{
24 fn dial(&self, addr: Multiaddr) -> Result<BoxedUpgrade<O>, TransportError<io::Error>> {
25 let fut = Transport::dial(self, addr)
26 .map_err(|e| e.map(box_err))?
27 .map_err(|e| box_err(e));
28 Ok(Box::pin(fut) as BoxedUpgrade<O>)
29 }
30
31 fn listen(&self, addr: Multiaddr) -> Result<BoxedListener<O>, TransportError<io::Error>> {
32 let listener = Transport::listen(self, addr).map_err(|e| e.map(box_err))?;
33
34 Ok(BoxedListener {
35 inner: Box::pin(ListenerSendWrapper::new(listener)),
36 })
37 }
38}
39
/// Thin adapter around a concrete listener `L`.
///
/// The wrapper stores the listener unchanged; its `Listener` implementation is where
/// errors become `io::Error` and upgrades become `BoxedUpgrade`s.
struct ListenerSendWrapper<L> {
    /// The wrapped listener, polled by the `Listener` implementation.
    inner: L,
}

impl<L> ListenerSendWrapper<L> {
    /// Wraps `inner` without modifying it.
    fn new(inner: L) -> ListenerSendWrapper<L> {
        ListenerSendWrapper { inner }
    }
}
49
50impl<L> Listener for ListenerSendWrapper<L>
51where
52 L: Listener + Send + Unpin,
53 L::Upgrade: Send + 'static,
54 L::Error: Send + Sync + 'static,
55{
56 type Output = L::Output;
57 type Error = io::Error;
58 type Upgrade = BoxedUpgrade<L::Output>;
59
60 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61 Listener::poll_close(Pin::new(&mut self.get_mut().inner), cx).map_err(box_err)
62 }
63
64 fn poll_event(
65 self: Pin<&mut Self>,
66 cx: &mut Context<'_>,
67 ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
68 let event = ready!(Pin::new(&mut self.get_mut().inner).poll_event(cx))
69 .map_err(box_err)
70 .map_upgrade(|up| Box::pin(up.map_err(box_err)) as BoxedUpgrade<L::Output>);
71
72 Poll::Ready(event)
73 }
74}
75
/// Heap-allocated, pinned, `Send` future that resolves to `Ok(O)` or an [`io::Error`].
///
/// Used for both the dial future and the incoming-connection upgrade of a boxed transport.
type BoxedUpgrade<O> = Pin<Box<dyn Future<Output = Result<O, io::Error>> + Send>>;
77
78pub struct BoxedListener<O> {
79 inner: Pin<Box<dyn Listener<Output = O, Error = io::Error, Upgrade = BoxedUpgrade<O>> + Send>>,
80}
81
82impl<O> Listener for BoxedListener<O> {
83 type Output = O;
84 type Error = io::Error;
85 type Upgrade = BoxedUpgrade<O>;
86
87 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88 self.get_mut().inner.as_mut().poll_close(cx)
89 }
90
91 fn poll_event(
92 self: Pin<&mut Self>,
93 cx: &mut Context<'_>,
94 ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
95 self.get_mut().inner.as_mut().poll_event(cx)
96 }
97}
98
99pub struct Boxed<O> {
100 inner: Box<dyn Abstract<O> + Send + Unpin>,
101}
102
103impl<O> Transport for Boxed<O> {
104 type Output = O;
105 type Error = io::Error;
106 type Dial = BoxedUpgrade<O>;
107 type Incoming = BoxedUpgrade<O>;
108 type Listener = BoxedListener<O>;
109
110 fn dial(&self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
111 self.inner.dial(addr)
112 }
113
114 fn listen(&self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
115 self.inner.listen(addr)
116 }
117}
118
/// Wraps any thread-safe, `'static` error in an [`io::Error`] of kind `Other`.
///
/// The original error is kept as the payload of the returned `io::Error`.
fn box_err<E>(err: E) -> io::Error
where
    E: error::Error + Send + Sync + 'static,
{
    io::Error::other(err)
}
122
123pub(crate) fn boxed<T>(transport: T) -> Boxed<T::Output>
124where
125 T: Transport + Unpin + Send + 'static,
126 T::Error: Send + Sync,
127 T::Dial: Send + 'static,
128 T::Incoming: Send + 'static,
129 T::Listener: Send + Unpin,
130{
131 Boxed {
132 inner: Box::new(transport) as Box<_>,
133 }
134}