mwc_libp2p_core/transport/
timeout.rs1use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
28use futures::prelude::*;
29use futures_timer::Delay;
30use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
31
/// Pairs an inner transport value with two connection timeouts.
///
/// `outgoing_timeout` and `incoming_timeout` are stored separately so that
/// each direction can be limited independently; the constructors below decide
/// which of the two receives the caller-supplied duration.
#[derive(Debug, Copy, Clone)]
pub struct TransportTimeout<InnerTrans> {
    /// The wrapped value the timeouts are attached to.
    inner: InnerTrans,
    /// Time limit associated with outgoing connections.
    outgoing_timeout: Duration,
    /// Time limit associated with incoming connections.
    incoming_timeout: Duration,
}

impl<InnerTrans> TransportTimeout<InnerTrans> {
    /// Stand-in for "no limit": one hundred 365-day years, expressed in seconds.
    const NO_TIMEOUT: Duration = Duration::from_secs(100 * 365 * 24 * 3600);

    /// Wraps `trans`, using `timeout` for both the outgoing and the incoming
    /// direction.
    pub fn new(trans: InnerTrans, timeout: Duration) -> Self {
        Self {
            inner: trans,
            outgoing_timeout: timeout,
            incoming_timeout: timeout,
        }
    }

    /// Wraps `trans`, using `timeout` for the outgoing direction only.
    ///
    /// The incoming timeout is set to a hundred years, which in practice
    /// means it never fires.
    pub fn with_outgoing_timeout(trans: InnerTrans, timeout: Duration) -> Self {
        Self {
            inner: trans,
            outgoing_timeout: timeout,
            incoming_timeout: Self::NO_TIMEOUT,
        }
    }

    /// Wraps `trans`, using `timeout` for the incoming direction only.
    ///
    /// The outgoing timeout is set to a hundred years, which in practice
    /// means it never fires.
    pub fn with_ingoing_timeout(trans: InnerTrans, timeout: Duration) -> Self {
        Self {
            inner: trans,
            outgoing_timeout: Self::NO_TIMEOUT,
            incoming_timeout: timeout,
        }
    }
}
72
73impl<InnerTrans> Transport for TransportTimeout<InnerTrans>
74where
75 InnerTrans: Transport,
76 InnerTrans::Error: 'static,
77{
78 type Output = InnerTrans::Output;
79 type Error = TransportTimeoutError<InnerTrans::Error>;
80 type Listener = TimeoutListener<InnerTrans::Listener>;
81 type ListenerUpgrade = Timeout<InnerTrans::ListenerUpgrade>;
82 type Dial = Timeout<InnerTrans::Dial>;
83
84 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
85 let listener = self.inner.listen_on(addr)
86 .map_err(|err| err.map(TransportTimeoutError::Other))?;
87
88 let listener = TimeoutListener {
89 inner: listener,
90 timeout: self.incoming_timeout,
91 };
92
93 Ok(listener)
94 }
95
96 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
97 let dial = self.inner.dial(addr)
98 .map_err(|err| err.map(TransportTimeoutError::Other))?;
99 Ok(Timeout {
100 inner: dial,
101 timer: Delay::new(self.outgoing_timeout),
102 })
103 }
104
105 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
106 self.inner.address_translation(server, observed)
107 }
108}
109
110#[pin_project::pin_project]
113pub struct TimeoutListener<InnerStream> {
114 #[pin]
115 inner: InnerStream,
116 timeout: Duration,
117}
118
119impl<InnerStream, O, E> Stream for TimeoutListener<InnerStream>
120where
121 InnerStream: TryStream<Ok = ListenerEvent<O, E>, Error = E>,
122{
123 type Item = Result<ListenerEvent<Timeout<O>, TransportTimeoutError<E>>, TransportTimeoutError<E>>;
124
125 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 let this = self.project();
127
128 let poll_out = match TryStream::try_poll_next(this.inner, cx) {
129 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(TransportTimeoutError::Other(err)))),
130 Poll::Ready(Some(Ok(v))) => v,
131 Poll::Ready(None) => return Poll::Ready(None),
132 Poll::Pending => return Poll::Pending,
133 };
134
135 let timeout = *this.timeout;
136 let event = poll_out
137 .map(move |inner_fut| {
138 Timeout {
139 inner: inner_fut,
140 timer: Delay::new(timeout),
141 }
142 })
143 .map_err(TransportTimeoutError::Other);
144
145 Poll::Ready(Some(Ok(event)))
146 }
147}
148
149#[pin_project::pin_project]
154#[must_use = "futures do nothing unless polled"]
155pub struct Timeout<InnerFut> {
156 #[pin]
157 inner: InnerFut,
158 timer: Delay,
159}
160
161impl<InnerFut> Future for Timeout<InnerFut>
162where
163 InnerFut: TryFuture,
164{
165 type Output = Result<InnerFut::Ok, TransportTimeoutError<InnerFut::Error>>;
166
167 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
168 let mut this = self.project();
174
175 match TryFuture::try_poll(this.inner, cx) {
176 Poll::Pending => {},
177 Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
178 Poll::Ready(Err(err)) => return Poll::Ready(Err(TransportTimeoutError::Other(err))),
179 }
180
181 match Pin::new(&mut this.timer).poll(cx) {
182 Poll::Pending => Poll::Pending,
183 Poll::Ready(()) => Poll::Ready(Err(TransportTimeoutError::Timeout))
184 }
185 }
186}
187
/// Error produced by the timeout wrappers in this file.
#[derive(Debug)]
pub enum TransportTimeoutError<TErr> {
    /// The timer fired before the wrapped operation completed.
    Timeout,
    /// An I/O error attributed to the timer.
    TimerError(io::Error),
    /// An error produced by the wrapped transport, stream or future.
    Other(TErr),
}

impl<TErr> fmt::Display for TransportTimeoutError<TErr>
where
    TErr: fmt::Display,
{
    /// Writes a fixed message for `Timeout`, a prefixed message for
    /// `TimerError`, and the wrapped error's own text for `Other`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("Timeout has been reached"),
            Self::TimerError(err) => write!(f, "Error in the timer: {}", err),
            Self::Other(err) => write!(f, "{}", err),
        }
    }
}

impl<TErr> error::Error for TransportTimeoutError<TErr>
where
    TErr: error::Error + 'static,
{
    /// Returns the wrapped error for `TimerError` and `Other`; `Timeout` has
    /// no underlying cause.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = match self {
            Self::Timeout => return None,
            Self::TimerError(err) => err,
            Self::Other(err) => err,
        };
        Some(cause)
    }
}