Skip to main content

volans_core/transport/
boxed.rs

1use std::{
2    error, io,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures::{TryFutureExt, ready};
8
9use crate::{Listener, ListenerEvent, Multiaddr, Transport, TransportError};
10
11trait Abstract<O> {
12    fn dial(&self, addr: Multiaddr) -> Result<BoxedUpgrade<O>, TransportError<io::Error>>;
13    fn listen(&self, addr: Multiaddr) -> Result<BoxedListener<O>, TransportError<io::Error>>;
14}
15
16impl<T, O> Abstract<O> for T
17where
18    T: Transport<Output = O> + 'static,
19    T::Error: Send + Sync,
20    T::Dial: Send + 'static,
21    T::Incoming: Send + 'static,
22    T::Listener: Send + Unpin,
23{
24    fn dial(&self, addr: Multiaddr) -> Result<BoxedUpgrade<O>, TransportError<io::Error>> {
25        let fut = Transport::dial(self, addr)
26            .map_err(|e| e.map(box_err))?
27            .map_err(|e| box_err(e));
28        Ok(Box::pin(fut) as BoxedUpgrade<O>)
29    }
30
31    fn listen(&self, addr: Multiaddr) -> Result<BoxedListener<O>, TransportError<io::Error>> {
32        let listener = Transport::listen(self, addr).map_err(|e| e.map(box_err))?;
33
34        Ok(BoxedListener {
35            inner: Box::pin(ListenerSendWrapper::new(listener)),
36        })
37    }
38}
39
/// Adapter around a listener `L`.
///
/// Its `Listener` implementation converts the wrapped listener's errors to `io::Error`
/// and boxes its upgrade futures.
struct ListenerSendWrapper<L> {
    /// The wrapped listener.
    inner: L,
}
43
44impl<L> ListenerSendWrapper<L> {
45    fn new(inner: L) -> Self {
46        Self { inner }
47    }
48}
49
50impl<L> Listener for ListenerSendWrapper<L>
51where
52    L: Listener + Send + Unpin,
53    L::Upgrade: Send + 'static,
54    L::Error: Send + Sync + 'static,
55{
56    type Output = L::Output;
57    type Error = io::Error;
58    type Upgrade = BoxedUpgrade<L::Output>;
59
60    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        Listener::poll_close(Pin::new(&mut self.get_mut().inner), cx).map_err(box_err)
62    }
63
64    fn poll_event(
65        self: Pin<&mut Self>,
66        cx: &mut Context<'_>,
67    ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
68        let event = ready!(Pin::new(&mut self.get_mut().inner).poll_event(cx))
69            .map_err(box_err)
70            .map_upgrade(|up| Box::pin(up.map_err(box_err)) as BoxedUpgrade<L::Output>);
71
72        Poll::Ready(event)
73    }
74}
75
/// Heap-allocated, pinned, `Send` future that resolves to `io::Result<O>`.
type BoxedUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
77
78pub struct BoxedListener<O> {
79    inner: Pin<Box<dyn Listener<Output = O, Error = io::Error, Upgrade = BoxedUpgrade<O>> + Send>>,
80}
81
82impl<O> Listener for BoxedListener<O> {
83    type Output = O;
84    type Error = io::Error;
85    type Upgrade = BoxedUpgrade<O>;
86
87    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88        self.get_mut().inner.as_mut().poll_close(cx)
89    }
90
91    fn poll_event(
92        self: Pin<&mut Self>,
93        cx: &mut Context<'_>,
94    ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
95        self.get_mut().inner.as_mut().poll_event(cx)
96    }
97}
98
99pub struct Boxed<O> {
100    inner: Box<dyn Abstract<O> + Send + Unpin>,
101}
102
103impl<O> Transport for Boxed<O> {
104    type Output = O;
105    type Error = io::Error;
106    type Dial = BoxedUpgrade<O>;
107    type Incoming = BoxedUpgrade<O>;
108    type Listener = BoxedListener<O>;
109
110    fn dial(&self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
111        self.inner.dial(addr)
112    }
113
114    fn listen(&self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
115        self.inner.listen(addr)
116    }
117}
118
/// Wraps `source` in an `io::Error` via `io::Error::other`.
fn box_err<E>(source: E) -> io::Error
where
    E: error::Error + Send + Sync + 'static,
{
    io::Error::other(source)
}
122
123pub(crate) fn boxed<T>(transport: T) -> Boxed<T::Output>
124where
125    T: Transport + Unpin + Send + 'static,
126    T::Error: Send + Sync,
127    T::Dial: Send + 'static,
128    T::Incoming: Send + 'static,
129    T::Listener: Send + Unpin,
130{
131    Boxed {
132        inner: Box::new(transport) as Box<_>,
133    }
134}