libp2p_ping/
handler.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::protocol;
22use futures::prelude::*;
23use futures::future::BoxFuture;
24use libp2p_swarm::{
25    KeepAlive,
26    NegotiatedSubstream,
27    SubstreamProtocol,
28    ProtocolsHandler,
29    ProtocolsHandlerUpgrErr,
30    ProtocolsHandlerEvent
31};
32use std::{
33    error::Error,
34    io,
35    fmt,
36    num::NonZeroU32,
37    task::{Context, Poll},
38    time::Duration
39};
40use std::collections::VecDeque;
41use wasm_timer::Delay;
42use void::Void;
43
/// The configuration for outbound pings.
///
/// Build one with [`PingConfig::new`] (or `PingConfig::default()`, which is
/// equivalent) and adjust it with the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct PingConfig {
    /// The timeout of an outbound ping.
    timeout: Duration,
    /// The duration between the last successful outbound ping and the
    /// next outbound ping.
    interval: Duration,
    /// The maximum number of failed outbound pings before the associated
    /// connection is deemed unhealthy, indicating to the `Swarm` that it
    /// should be closed.
    max_failures: NonZeroU32,
    /// Whether the connection should generally be kept alive unless
    /// `max_failures` occur.
    keep_alive: bool,
}

impl PingConfig {
    /// Creates a new `PingConfig` with the following default settings:
    ///
    ///   * [`PingConfig::with_interval`] 15s
    ///   * [`PingConfig::with_timeout`] 20s
    ///   * [`PingConfig::with_max_failures`] 1
    ///   * [`PingConfig::with_keep_alive`] false
    ///
    /// These settings have the following effect:
    ///
    ///   * A ping is sent every 15 seconds on a healthy connection.
    ///   * Every ping sent must yield a response within 20 seconds in order to
    ///     be successful.
    ///   * A single ping failure is sufficient for the connection to be subject
    ///     to being closed (see the note on [`PingConfig::with_max_failures`]).
    ///   * The connection may be closed at any time as far as the ping protocol
    ///     is concerned, i.e. the ping protocol itself does not keep the
    ///     connection alive.
    pub fn new() -> Self {
        Self {
            timeout: Duration::from_secs(20),
            interval: Duration::from_secs(15),
            max_failures: NonZeroU32::new(1).expect("1 != 0"),
            keep_alive: false,
        }
    }

    /// Sets the ping timeout.
    pub fn with_timeout(mut self, d: Duration) -> Self {
        self.timeout = d;
        self
    }

    /// Sets the ping interval.
    pub fn with_interval(mut self, d: Duration) -> Self {
        self.interval = d;
        self
    }

    /// Sets the maximum number of consecutive ping failures upon which the remote
    /// peer is considered unreachable and the connection closed.
    ///
    /// Note: for backward-compatibility the handler treats the very first
    /// failure as silent when this value is `1`, so in that configuration the
    /// connection is closed on the second consecutive failure.
    pub fn with_max_failures(mut self, n: NonZeroU32) -> Self {
        self.max_failures = n;
        self
    }

    /// Sets whether the ping protocol itself should keep the connection alive,
    /// apart from the maximum allowed failures.
    ///
    /// By default, the ping protocol itself allows the connection to be closed
    /// at any time, i.e. in the absence of ping failures the connection lifetime
    /// is determined by other protocol handlers.
    ///
    /// If the maximum number of allowed ping failures is reached, the
    /// connection is always terminated as a result of [`ProtocolsHandler::poll`]
    /// returning an error, regardless of the keep-alive setting.
    pub fn with_keep_alive(mut self, b: bool) -> Self {
        self.keep_alive = b;
        self
    }
}

impl Default for PingConfig {
    /// Returns the same configuration as [`PingConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
122
/// The result of an inbound or outbound ping.
///
/// `Ok` carries a [`PingSuccess`] and `Err` carries a [`PingFailure`].
pub type PingResult = Result<PingSuccess, PingFailure>;
125
/// The outcome of a ping exchange that completed successfully, in either
/// direction.
#[derive(Debug)]
pub enum PingSuccess {
    /// A ping was received and a pong was sent back.
    Pong,
    /// A ping was sent and a pong was received back.
    Ping {
        /// The round-trip time of the exchange.
        rtt: Duration,
    },
}
136
/// The ways in which an outbound ping can fail.
#[derive(Debug)]
pub enum PingFailure {
    /// No response was received within the configured ping timeout.
    Timeout,
    /// The ping failed for a reason other than a timeout.
    Other {
        /// The underlying cause of the failure.
        error: Box<dyn Error + Send + 'static>,
    },
}
146
147impl fmt::Display for PingFailure {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        match self {
150            PingFailure::Timeout => f.write_str("Ping timeout"),
151            PingFailure::Other { error } => write!(f, "Ping error: {}", error)
152        }
153    }
154}
155
156impl Error for PingFailure {
157    fn source(&self) -> Option<&(dyn Error + 'static)> {
158        match self {
159            PingFailure::Timeout => None,
160            PingFailure::Other { error } => Some(&**error)
161        }
162    }
163}
164
/// Protocol handler that handles pinging the remote at a regular period
/// and answering ping queries.
///
/// If the remote doesn't respond, produces an error that closes the connection.
///
/// Outbound failures are counted in `failures`; once that count reaches the
/// configured `max_failures`, `poll()` asks for the connection to be closed.
pub struct PingHandler {
    /// Configuration options.
    config: PingConfig,
    /// The timer used for the delay to the next ping as well as
    /// the ping timeout.
    ///
    /// It is reset to `config.timeout` whenever an outbound ping is started
    /// and to `config.interval` after an outbound ping succeeded.
    timer: Delay,
    /// Outbound ping failures that are pending to be processed by `poll()`.
    ///
    /// New failures are pushed to the front and `poll()` takes them from the
    /// back, so they are processed in the order in which they occurred.
    pending_errors: VecDeque<PingFailure>,
    /// The number of consecutive ping failures that occurred.
    ///
    /// Each successful outbound ping resets this counter to 0.
    failures: u32,
    /// The outbound ping state.
    ///
    /// `None` means that no outbound substream exists or is being negotiated;
    /// the next `poll()` then requests a new one.
    outbound: Option<PingState>,
    /// The inbound pong handler, i.e. if there is an inbound
    /// substream, this is always a future that waits for the
    /// next inbound ping to be answered.
    inbound: Option<PongFuture>,
}
188
189impl PingHandler {
190    /// Builds a new `PingHandler` with the given configuration.
191    pub fn new(config: PingConfig) -> Self {
192        PingHandler {
193            config,
194            timer: Delay::new(Duration::new(0, 0)),
195            pending_errors: VecDeque::with_capacity(2),
196            failures: 0,
197            outbound: None,
198            inbound: None,
199        }
200    }
201}
202
203impl ProtocolsHandler for PingHandler {
204    type InEvent = Void;
205    type OutEvent = PingResult;
206    type Error = PingFailure;
207    type InboundProtocol = protocol::Ping;
208    type OutboundProtocol = protocol::Ping;
209    type OutboundOpenInfo = ();
210    type InboundOpenInfo = ();
211
212    fn listen_protocol(&self) -> SubstreamProtocol<protocol::Ping, ()> {
213        SubstreamProtocol::new(protocol::Ping, ())
214    }
215
216    fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) {
217        self.inbound = Some(protocol::recv_ping(stream).boxed());
218    }
219
220    fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) {
221        self.timer.reset(self.config.timeout);
222        self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed()));
223    }
224
225    fn inject_event(&mut self, _: Void) {}
226
227    fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<Void>) {
228        self.outbound = None; // Request a new substream on the next `poll`.
229        self.pending_errors.push_front(
230            match error {
231                // Note: This timeout only covers protocol negotiation.
232                ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout,
233                e => PingFailure::Other { error: Box::new(e) },
234            })
235    }
236
237    fn connection_keep_alive(&self) -> KeepAlive {
238        if self.config.keep_alive {
239            KeepAlive::Yes
240        } else {
241            KeepAlive::No
242        }
243    }
244
245    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<protocol::Ping, (), PingResult, Self::Error>> {
246        // Respond to inbound pings.
247        if let Some(fut) = self.inbound.as_mut() {
248            match fut.poll_unpin(cx) {
249                Poll::Pending => {},
250                Poll::Ready(Err(e)) => {
251                    log::debug!("Inbound ping error: {:?}", e);
252                    self.inbound = None;
253                }
254                Poll::Ready(Ok(stream)) => {
255                    // A ping from a remote peer has been answered, wait for the next.
256                    self.inbound = Some(protocol::recv_ping(stream).boxed());
257                    return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Pong)))
258                }
259            }
260        }
261
262        loop {
263            // Check for outbound ping failures.
264            if let Some(error) = self.pending_errors.pop_back() {
265                log::debug!("Ping failure: {:?}", error);
266
267                self.failures += 1;
268
269                // Note: For backward-compatibility, with configured
270                // `max_failures == 1`, the first failure is always "free"
271                // and silent. This allows peers who still use a new substream
272                // for each ping to have successful ping exchanges with peers
273                // that use a single substream, since every successful ping
274                // resets `failures` to `0`, while at the same time emitting
275                // events only for `max_failures - 1` failures, as before.
276                if self.failures > 1 || self.config.max_failures.get() > 1 {
277                    if self.failures >= self.config.max_failures.get() {
278                        log::debug!("Too many failures ({}). Closing connection.", self.failures);
279                        return Poll::Ready(ProtocolsHandlerEvent::Close(error))
280                    }
281
282                    return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(error)))
283                }
284            }
285
286            // Continue outbound pings.
287            match self.outbound.take() {
288                Some(PingState::Ping(mut ping)) => match ping.poll_unpin(cx) {
289                    Poll::Pending => {
290                        if self.timer.poll_unpin(cx).is_ready() {
291                            self.pending_errors.push_front(PingFailure::Timeout);
292                        } else {
293                            self.outbound = Some(PingState::Ping(ping));
294                            break
295                        }
296                    },
297                    Poll::Ready(Ok((stream, rtt))) => {
298                        self.failures = 0;
299                        self.timer.reset(self.config.interval);
300                        self.outbound = Some(PingState::Idle(stream));
301                        return Poll::Ready(
302                            ProtocolsHandlerEvent::Custom(
303                                Ok(PingSuccess::Ping { rtt })))
304                    }
305                    Poll::Ready(Err(e)) => {
306                        self.pending_errors.push_front(PingFailure::Other {
307                            error: Box::new(e)
308                        });
309                    }
310                },
311                Some(PingState::Idle(stream)) => match self.timer.poll_unpin(cx) {
312                    Poll::Pending => {
313                        self.outbound = Some(PingState::Idle(stream));
314                        break
315                    },
316                    Poll::Ready(Ok(())) => {
317                        self.timer.reset(self.config.timeout);
318                        self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed()));
319                    },
320                    Poll::Ready(Err(e)) => {
321                        return Poll::Ready(ProtocolsHandlerEvent::Close(
322                            PingFailure::Other {
323                                error: Box::new(e)
324                            }))
325                    }
326                }
327                Some(PingState::OpenStream) => {
328                    self.outbound = Some(PingState::OpenStream);
329                    break
330                }
331                None => {
332                    self.outbound = Some(PingState::OpenStream);
333                    let protocol = SubstreamProtocol::new(protocol::Ping, ())
334                        .with_timeout(self.config.timeout);
335                    return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
336                        protocol
337                    })
338                }
339            }
340        }
341
342        Poll::Pending
343    }
344}
345
/// A boxed future for an outbound ping in flight. On success it yields the
/// substream together with a `Duration`, which `poll()` reports as the
/// round-trip time; on failure it yields an I/O error.
type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>;
/// A boxed future that waits for an inbound ping to be answered. On success
/// it yields the substream so that the next inbound ping can be awaited on
/// it; on failure it yields an I/O error.
type PongFuture = BoxFuture<'static, Result<NegotiatedSubstream, io::Error>>;
348
/// The current state w.r.t. outbound pings.
///
/// The handler stores this as `Option<PingState>`; `None` there means that
/// no outbound substream has been requested yet.
enum PingState {
    /// A new substream is being negotiated for the ping protocol.
    ///
    /// Set by `poll()` right before it requests an outbound substream.
    OpenStream,
    /// The substream is idle, waiting to send the next ping.
    ///
    /// The handler's timer determines when the next ping is started.
    Idle(NegotiatedSubstream),
    /// A ping is being sent and the response awaited.
    ///
    /// While in this state the handler's timer acts as the ping timeout.
    Ping(PingFuture),
}
358