Skip to main content

volans_core/
transport.rs

1pub(crate) mod boxed;
2
3pub mod and_then;
4pub mod apply;
5pub mod choice;
6pub mod map;
7pub mod map_err;
8pub mod timeout;
9pub mod upgrade;
10
11pub use boxed::{Boxed, BoxedListener};
12
13use futures::TryFuture;
14
15use std::{
16    error, fmt,
17    pin::Pin,
18    task::{Context, Poll},
19    time::Duration,
20};
21
22use crate::{
23    ConnectedPoint, Multiaddr, Negotiated,
24    upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade},
25};
26
27pub trait Listener {
28    type Output;
29    type Error: std::error::Error;
30    type Upgrade: Future<Output = Result<Self::Output, Self::Error>>;
31
32    fn poll_event(
33        self: Pin<&mut Self>,
34        cx: &mut Context<'_>,
35    ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>>;
36
37    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
38}
39pub trait Transport {
40    type Output;
41    type Error: std::error::Error;
42    type Dial: Future<Output = Result<Self::Output, Self::Error>>;
43    type Incoming: Future<Output = Result<Self::Output, Self::Error>>;
44    type Listener: Listener<Output = Self::Output, Error = Self::Error, Upgrade = Self::Incoming>;
45
46    fn dial(&self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>;
47    fn listen(&self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>>;
48
49    fn and_then<D, TMap, TMapFut>(self, map: TMap) -> and_then::AndThen<Self, TMap>
50    where
51        Self: Sized,
52        TMap: FnOnce(Self::Output, ConnectedPoint) -> TMapFut + Clone,
53        TMapFut: TryFuture<Ok = D>,
54        TMapFut::Error: std::error::Error,
55    {
56        and_then::AndThen::new(self, map)
57    }
58
59    fn map<D, TMap>(self, map: TMap) -> map::Map<Self, TMap>
60    where
61        Self: Sized,
62        TMap: FnOnce(Self::Output, ConnectedPoint) -> D + Clone,
63    {
64        map::Map::new(self, map)
65    }
66
67    fn map_err<TErr, F>(self, map: F) -> map_err::MapErr<Self, F>
68    where
69        Self: Sized,
70        F: FnOnce(Self::Error) -> TErr + Clone,
71        TErr: std::error::Error,
72    {
73        map_err::MapErr::new(self, map)
74    }
75
76    fn apply<C, D, E, U>(self, upgrade: U) -> apply::UpgradeApply<Self, U>
77    where
78        Self: Sized,
79        Self: Transport<Output = C>,
80        U: InboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
81        U: OutboundConnectionUpgrade<Negotiated<C>, Output = D, Error = E>,
82        E: std::error::Error,
83    {
84        apply::UpgradeApply::new(self, upgrade)
85    }
86
87    fn choice<B>(self, other: B) -> choice::Choice<Self, B>
88    where
89        Self: Sized,
90        B: Transport,
91    {
92        choice::Choice::new(self, other)
93    }
94
95    fn timeout(self, timeout: Duration) -> timeout::Timeout<Self>
96    where
97        Self: Sized,
98    {
99        timeout::Timeout::new(self, timeout)
100    }
101
102    fn boxed(self) -> boxed::Boxed<Self::Output>
103    where
104        Self: Sized + Send + Unpin + 'static,
105        Self::Dial: Send + 'static,
106        Self::Incoming: Send + 'static,
107        Self::Error: Send + Sync,
108        Self::Listener: Send + Unpin + 'static,
109    {
110        boxed::boxed(self)
111    }
112
113    fn upgrade(self) -> upgrade::Builder<Self>
114    where
115        Self: Sized,
116        Self::Error: 'static,
117    {
118        upgrade::Builder::new(self)
119    }
120}
121
122pub enum ListenerEvent<TUpgr, TErr> {
123    NewAddress(Multiaddr),
124    AddressExpired(Multiaddr),
125    Incoming {
126        local_addr: Multiaddr,
127        remote_addr: Multiaddr,
128        upgrade: TUpgr,
129    },
130    Closed(Result<(), TErr>),
131    Error(TErr),
132}
133
134impl<TUpgr, TErr> ListenerEvent<TUpgr, TErr> {
135    pub fn map_upgrade<F, TUpgr2>(self, map: F) -> ListenerEvent<TUpgr2, TErr>
136    where
137        F: FnOnce(TUpgr) -> TUpgr2,
138    {
139        match self {
140            ListenerEvent::NewAddress(addr) => ListenerEvent::NewAddress(addr),
141            ListenerEvent::AddressExpired(addr) => ListenerEvent::AddressExpired(addr),
142            ListenerEvent::Incoming {
143                local_addr,
144                remote_addr,
145                upgrade,
146            } => ListenerEvent::Incoming {
147                local_addr,
148                remote_addr,
149                upgrade: map(upgrade),
150            },
151            ListenerEvent::Closed(res) => ListenerEvent::Closed(res),
152            ListenerEvent::Error(err) => ListenerEvent::Error(err),
153        }
154    }
155
156    pub fn map_err<F, TErr2>(self, map: F) -> ListenerEvent<TUpgr, TErr2>
157    where
158        F: FnOnce(TErr) -> TErr2,
159    {
160        match self {
161            ListenerEvent::NewAddress(addr) => ListenerEvent::NewAddress(addr),
162            ListenerEvent::AddressExpired(addr) => ListenerEvent::AddressExpired(addr),
163            ListenerEvent::Incoming {
164                local_addr,
165                remote_addr,
166                upgrade,
167            } => ListenerEvent::Incoming {
168                local_addr,
169                remote_addr,
170                upgrade,
171            },
172            ListenerEvent::Closed(res) => ListenerEvent::Closed(res.map_err(map)),
173            ListenerEvent::Error(err) => ListenerEvent::Error(map(err)),
174        }
175    }
176}
177
178#[derive(Debug, Clone)]
179pub enum TransportError<TErr> {
180    NotSupported(Multiaddr),
181    Other(TErr),
182}
183
184impl<TErr> From<TErr> for TransportError<TErr> {
185    fn from(err: TErr) -> Self {
186        TransportError::Other(err)
187    }
188}
189
190impl<TErr> TransportError<TErr> {
191    pub fn map<E, F>(self, map: F) -> TransportError<E>
192    where
193        F: FnOnce(TErr) -> E,
194        E: std::error::Error,
195    {
196        match self {
197            TransportError::NotSupported(addr) => TransportError::NotSupported(addr),
198            TransportError::Other(err) => TransportError::Other(map(err)),
199        }
200    }
201}
202
203impl<TErr> fmt::Display for TransportError<TErr>
204where
205    TErr: fmt::Display,
206{
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        match self {
209            TransportError::NotSupported(addr) => {
210                write!(f, "Transport not supported for address: {}", addr)
211            }
212            TransportError::Other(err) => write!(f, "Transport error: {}", err),
213        }
214    }
215}
216
217impl<TErr> error::Error for TransportError<TErr>
218where
219    TErr: error::Error + 'static,
220{
221    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
222        match self {
223            TransportError::NotSupported(_) => None,
224            TransportError::Other(err) => Some(err),
225        }
226    }
227}