libp2p_ping/
handler.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::VecDeque,
23    convert::Infallible,
24    error::Error,
25    fmt, io,
26    task::{Context, Poll},
27    time::Duration,
28};
29
30use futures::{
31    future::{BoxFuture, Either},
32    prelude::*,
33};
34use futures_timer::Delay;
35use libp2p_core::upgrade::ReadyUpgrade;
36use libp2p_swarm::{
37    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
38    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
39    SubstreamProtocol,
40};
41
42use crate::{protocol, PROTOCOL_NAME};
43
44/// The configuration for outbound pings.
45#[derive(Debug, Clone)]
46pub struct Config {
47    /// The timeout of an outbound ping.
48    timeout: Duration,
49    /// The duration between outbound pings.
50    interval: Duration,
51}
52
53impl Config {
54    /// Creates a new [`Config`] with the following default settings:
55    ///
56    ///   * [`Config::with_interval`] 15s
57    ///   * [`Config::with_timeout`] 20s
58    ///
59    /// These settings have the following effect:
60    ///
61    ///   * A ping is sent every 15 seconds on a healthy connection.
62    ///   * Every ping sent must yield a response within 20 seconds in order to be successful.
63    pub fn new() -> Self {
64        Self {
65            timeout: Duration::from_secs(20),
66            interval: Duration::from_secs(15),
67        }
68    }
69
70    /// Sets the ping timeout.
71    pub fn with_timeout(mut self, d: Duration) -> Self {
72        self.timeout = d;
73        self
74    }
75
76    /// Sets the ping interval.
77    pub fn with_interval(mut self, d: Duration) -> Self {
78        self.interval = d;
79        self
80    }
81}
82
83impl Default for Config {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89/// An outbound ping failure.
90#[derive(Debug)]
91pub enum Failure {
92    /// The ping timed out, i.e. no response was received within the
93    /// configured ping timeout.
94    Timeout,
95    /// The peer does not support the ping protocol.
96    Unsupported,
97    /// The ping failed for reasons other than a timeout.
98    Other {
99        error: Box<dyn std::error::Error + Send + Sync + 'static>,
100    },
101}
102
103impl Failure {
104    fn other(e: impl std::error::Error + Send + Sync + 'static) -> Self {
105        Self::Other { error: Box::new(e) }
106    }
107}
108
109impl fmt::Display for Failure {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        match self {
112            Failure::Timeout => f.write_str("Ping timeout"),
113            Failure::Other { error } => write!(f, "Ping error: {error}"),
114            Failure::Unsupported => write!(f, "Ping protocol not supported"),
115        }
116    }
117}
118
119impl Error for Failure {
120    fn source(&self) -> Option<&(dyn Error + 'static)> {
121        match self {
122            Failure::Timeout => None,
123            Failure::Other { error } => Some(&**error),
124            Failure::Unsupported => None,
125        }
126    }
127}
128
129/// Protocol handler that handles pinging the remote at a regular period
130/// and answering ping queries.
131pub struct Handler {
132    /// Configuration options.
133    config: Config,
134    /// The timer used for the delay to the next ping.
135    interval: Delay,
136    /// Outbound ping failures that are pending to be processed by `poll()`.
137    pending_errors: VecDeque<Failure>,
138    /// The number of consecutive ping failures that occurred.
139    ///
140    /// Each successful ping resets this counter to 0.
141    failures: u32,
142    /// The outbound ping state.
143    outbound: Option<OutboundState>,
144    /// The inbound pong handler, i.e. if there is an inbound
145    /// substream, this is always a future that waits for the
146    /// next inbound ping to be answered.
147    inbound: Option<PongFuture>,
148    /// Tracks the state of our handler.
149    state: State,
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153enum State {
154    /// We are inactive because the other peer doesn't support ping.
155    Inactive {
156        /// Whether or not we've reported the missing support yet.
157        ///
158        /// This is used to avoid repeated events being emitted for a specific connection.
159        reported: bool,
160    },
161    /// We are actively pinging the other peer.
162    Active,
163}
164
165impl Handler {
166    /// Builds a new [`Handler`] with the given configuration.
167    pub fn new(config: Config) -> Self {
168        Handler {
169            config,
170            interval: Delay::new(Duration::new(0, 0)),
171            pending_errors: VecDeque::with_capacity(2),
172            failures: 0,
173            outbound: None,
174            inbound: None,
175            state: State::Active,
176        }
177    }
178
179    fn on_dial_upgrade_error(
180        &mut self,
181        DialUpgradeError { error, .. }: DialUpgradeError<
182            (),
183            <Self as ConnectionHandler>::OutboundProtocol,
184        >,
185    ) {
186        self.outbound = None; // Request a new substream on the next `poll`.
187
188        // Timer is already polled and expired before substream request is initiated
189        // and will be polled again later on in our `poll` because we reset `self.outbound`.
190        //
191        // `futures-timer` allows an expired timer to be polled again and returns
192        // immediately `Poll::Ready`. However in its WASM implementation there is
193        // a bug that causes the expired timer to panic.
194        // This is a workaround until a proper fix is merged and released.
195        // See libp2p/rust-libp2p#5447 for more info.
196        //
197        // TODO: remove when async-rs/futures-timer#74 gets merged.
198        self.interval.reset(Duration::new(0, 0));
199
200        let error = match error {
201            StreamUpgradeError::NegotiationFailed => {
202                debug_assert_eq!(self.state, State::Active);
203
204                self.state = State::Inactive { reported: false };
205                return;
206            }
207            // Note: This timeout only covers protocol negotiation.
208            StreamUpgradeError::Timeout => Failure::Other {
209                error: Box::new(std::io::Error::new(
210                    std::io::ErrorKind::TimedOut,
211                    "ping protocol negotiation timed out",
212                )),
213            },
214            // TODO: remove when Rust 1.82 is MSRV
215            #[allow(unreachable_patterns)]
216            StreamUpgradeError::Apply(e) => libp2p_core::util::unreachable(e),
217            StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
218        };
219
220        self.pending_errors.push_front(error);
221    }
222}
223
224impl ConnectionHandler for Handler {
225    type FromBehaviour = Infallible;
226    type ToBehaviour = Result<Duration, Failure>;
227    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
228    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
229    type OutboundOpenInfo = ();
230    type InboundOpenInfo = ();
231
232    fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>> {
233        SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
234    }
235
236    fn on_behaviour_event(&mut self, _: Infallible) {}
237
238    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
239    fn poll(
240        &mut self,
241        cx: &mut Context<'_>,
242    ) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
243    {
244        match self.state {
245            State::Inactive { reported: true } => {
246                return Poll::Pending; // nothing to do on this connection
247            }
248            State::Inactive { reported: false } => {
249                self.state = State::Inactive { reported: true };
250                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
251                    Failure::Unsupported,
252                )));
253            }
254            State::Active => {}
255        }
256
257        // Respond to inbound pings.
258        if let Some(fut) = self.inbound.as_mut() {
259            match fut.poll_unpin(cx) {
260                Poll::Pending => {}
261                Poll::Ready(Err(e)) => {
262                    tracing::debug!("Inbound ping error: {:?}", e);
263                    self.inbound = None;
264                }
265                Poll::Ready(Ok(stream)) => {
266                    tracing::trace!("answered inbound ping from peer");
267
268                    // A ping from a remote peer has been answered, wait for the next.
269                    self.inbound = Some(protocol::recv_ping(stream).boxed());
270                }
271            }
272        }
273
274        loop {
275            // Check for outbound ping failures.
276            if let Some(error) = self.pending_errors.pop_back() {
277                tracing::debug!("Ping failure: {:?}", error);
278
279                self.failures += 1;
280
281                // Note: For backward-compatibility the first failure is always "free"
282                // and silent. This allows peers who use a new substream
283                // for each ping to have successful ping exchanges with peers
284                // that use a single substream, since every successful ping
285                // resets `failures` to `0`.
286                if self.failures > 1 {
287                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
288                }
289            }
290
291            // Continue outbound pings.
292            match self.outbound.take() {
293                Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
294                    Poll::Pending => {
295                        self.outbound = Some(OutboundState::Ping(ping));
296                        break;
297                    }
298                    Poll::Ready(Ok((stream, rtt))) => {
299                        tracing::debug!(?rtt, "ping succeeded");
300                        self.failures = 0;
301                        self.interval.reset(self.config.interval);
302                        self.outbound = Some(OutboundState::Idle(stream));
303                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
304                    }
305                    Poll::Ready(Err(e)) => {
306                        self.interval.reset(self.config.interval);
307                        self.pending_errors.push_front(e);
308                    }
309                },
310                Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
311                    Poll::Pending => {
312                        self.outbound = Some(OutboundState::Idle(stream));
313                        break;
314                    }
315                    Poll::Ready(()) => {
316                        self.outbound = Some(OutboundState::Ping(
317                            send_ping(stream, self.config.timeout).boxed(),
318                        ));
319                    }
320                },
321                Some(OutboundState::OpenStream) => {
322                    self.outbound = Some(OutboundState::OpenStream);
323                    break;
324                }
325                None => match self.interval.poll_unpin(cx) {
326                    Poll::Pending => break,
327                    Poll::Ready(()) => {
328                        self.outbound = Some(OutboundState::OpenStream);
329                        let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
330                        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
331                            protocol,
332                        });
333                    }
334                },
335            }
336        }
337
338        Poll::Pending
339    }
340
341    fn on_connection_event(
342        &mut self,
343        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
344    ) {
345        match event {
346            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
347                protocol: mut stream,
348                ..
349            }) => {
350                stream.ignore_for_keep_alive();
351                self.inbound = Some(protocol::recv_ping(stream).boxed());
352            }
353            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
354                protocol: mut stream,
355                ..
356            }) => {
357                stream.ignore_for_keep_alive();
358                self.outbound = Some(OutboundState::Ping(
359                    send_ping(stream, self.config.timeout).boxed(),
360                ));
361            }
362            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
363                self.on_dial_upgrade_error(dial_upgrade_error)
364            }
365            _ => {}
366        }
367    }
368}
369
370type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
371type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
372
373/// The current state w.r.t. outbound pings.
374enum OutboundState {
375    /// A new substream is being negotiated for the ping protocol.
376    OpenStream,
377    /// The substream is idle, waiting to send the next ping.
378    Idle(Stream),
379    /// A ping is being sent and the response awaited.
380    Ping(PingFuture),
381}
382
383/// A wrapper around [`protocol::send_ping`] that enforces a time out.
384async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
385    let ping = protocol::send_ping(stream);
386    futures::pin_mut!(ping);
387
388    match future::select(ping, Delay::new(timeout)).await {
389        Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
390        Either::Left((Err(e), _)) => Err(Failure::other(e)),
391        Either::Right(((), _)) => Err(Failure::Timeout),
392    }
393}