1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Ping, PingDialer};
use futures::prelude::*;
use libp2p_core::{
    OutboundUpgrade,
    ProtocolsHandler,
    ProtocolsHandlerEvent,
    protocols_handler::ProtocolsHandlerUpgrErr,
    upgrade::DeniedUpgrade
};
use log::warn;
use std::{
    io, mem,
    time::{Duration, Instant},
};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{self, Delay};
use void::{Void, unreachable};

/// Protocol handler that pings the remote at a regular interval over an outbound substream.
///
/// If the remote doesn't answer a ping before the timeout elapses, polling the handler
/// produces an error ("unresponsive node"), which closes the connection.
pub struct PeriodicPingHandler<TSubstream> {
    /// Configuration for the ping protocol. Used as the upgrade when requesting a new
    /// outbound substream.
    ping_config: Ping<Instant>,

    /// State of the outgoing ping.
    out_state: OutState<TSubstream>,

    /// Duration after which we consider that a ping failed. For the very first ping this
    /// budget also covers opening and upgrading the substream.
    ping_timeout: Duration,

    /// After a ping succeeded, wait this long before sending the next ping.
    delay_to_next_ping: Duration,

    /// Decides what happens when the outbound upgrade fails (which is assumed to mean that
    /// the remote doesn't support the ping protocol).
    /// If true, we switch to the `Disabled` state. If false, we close the connection.
    tolerate_unsupported: bool,
}

/// State of the outgoing ping substream.
///
/// The handler moves the current state out with `mem::replace` (leaving `Poisoned` behind)
/// and then writes the follow-up state back.
enum OutState<TSubstream> {
    /// We need to open a new substream.
    NeedToOpen {
        /// Timeout after which we decide that it's not going to work out.
        ///
        /// Theoretically the handler should be polled immediately after we set the state to
        /// `NeedToOpen`, and we then immediately transition away from it. However, if the local
        /// node is busy for some reason, creating the `Delay` here avoids being overly generous
        /// with the ping timeout.
        expires: Delay,
    },

    /// Upgrading a substream to use ping.
    ///
    /// We produced a substream open request, and are waiting for it to be upgraded to a full
    /// ping-powered substream.
    Upgrading {
        /// Timeout after which we decide that it's not going to work out.
        ///
        /// The user of the `ProtocolsHandler` should ensure that there's a timeout when upgrading,
        /// but by storing a timeout here as well we ensure that we keep track of how long the
        /// ping has lasted. The same `Delay` is carried over into `WaitingForPong`.
        expires: Delay,
    },

    /// We sent a ping and we are waiting for the pong.
    WaitingForPong {
        /// Substream where we should receive the pong.
        substream: PingDialer<TSubstream, Instant>,
        /// Timeout after which we decide that we're not going to receive the pong.
        expires: Delay,
    },

    /// We received a pong and now we have nothing to do except wait a bit before sending the
    /// next ping.
    Idle {
        /// The substream to use to send pings.
        substream: PingDialer<TSubstream, Instant>,
        /// Fires when it is time to send the next ping.
        next_ping: Delay,
    },

    /// The ping dialer is disabled. Don't do anything.
    Disabled,

    /// The dialer has been closed. Polling reports a shutdown of the handler.
    Shutdown,

    /// Placeholder left in `out_state` while the real state has been moved out. If it is still
    /// there on the next poll, something went wrong in between; it is then treated like
    /// `Shutdown`.
    Poisoned,
}

/// Event produced by the periodic pinger.
#[derive(Debug, Copy, Clone)]
pub enum OutEvent {
    /// Started pinging the remote. This can be used to print a diagnostic message in the logs.
    ///
    /// Emitted when a ping is sent on an already-open substream once the idle delay has
    /// elapsed. The first ping, sent right after the substream is negotiated, does not produce
    /// this event.
    PingStart,

    /// The node has successfully responded to a ping.
    ///
    /// The duration is the time elapsed since the `Instant` yielded back by the ping substream
    /// (presumably the instant the ping was sent — confirm against `PingDialer`).
    PingSuccess(Duration),
}

impl<TSubstream> PeriodicPingHandler<TSubstream> {
    /// Builds a new `PeriodicPingHandler`.
    pub fn new() -> PeriodicPingHandler<TSubstream> {
        let ping_timeout = Duration::from_secs(30);

        PeriodicPingHandler {
            ping_config: Default::default(),
            out_state: OutState::NeedToOpen {
                expires: Delay::new(Instant::now() + ping_timeout),
            },
            ping_timeout,
            delay_to_next_ping: Duration::from_secs(15),
            tolerate_unsupported: false,
        }
    }
}

impl<TSubstream> Default for PeriodicPingHandler<TSubstream> {
    #[inline]
    fn default() -> Self {
        PeriodicPingHandler::new()
    }
}

impl<TSubstream> ProtocolsHandler for PeriodicPingHandler<TSubstream>
where
    TSubstream: AsyncRead + AsyncWrite,
{
    type InEvent = Void;
    type OutEvent = OutEvent;
    type Error = io::Error; // TODO: more precise error type
    type Substream = TSubstream;
    type InboundProtocol = DeniedUpgrade;
    type OutboundProtocol = Ping<Instant>;
    type OutboundOpenInfo = ();

    #[inline]
    fn listen_protocol(&self) -> Self::InboundProtocol {
        DeniedUpgrade
    }

    fn inject_fully_negotiated_inbound(&mut self, protocol: Void) {
        unreachable(protocol)
    }

    fn inject_fully_negotiated_outbound(
        &mut self,
        mut substream: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
        _info: Self::OutboundOpenInfo
    ) {
        if let OutState::Upgrading { expires } = mem::replace(&mut self.out_state, OutState::Poisoned) {
            // We always upgrade with the intent of immediately pinging.
            substream.ping(Instant::now());
            self.out_state = OutState::WaitingForPong { substream, expires }
        }
    }

    fn inject_event(&mut self, _: Self::InEvent) {}

    fn inject_inbound_closed(&mut self) {}

    #[inline]
    fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
        // In case of error while upgrading, there's not much we can do except shut down.
        // TODO: we assume that the error is about ping not being supported, which is not
        //       necessarily the case
        if self.tolerate_unsupported {
            self.out_state = OutState::Disabled;
        } else {
            self.out_state = OutState::Shutdown;
        }
    }

    #[inline]
    fn connection_keep_alive(&self) -> bool {
        false
    }

    fn shutdown(&mut self) {
        // Put `Shutdown` in `self.out_state` if we don't have any substream open.
        // Otherwise, keep the state as it is but call `shutdown()` on the substream. This
        // guarantees that the dialer will return `None` at some point.
        match self.out_state {
            OutState::WaitingForPong {
                ref mut substream, ..
            } => substream.shutdown(),
            OutState::Idle {
                ref mut substream, ..
            } => substream.shutdown(),
            ref mut s => *s = OutState::Shutdown,
        }
    }

    fn poll(
        &mut self,
    ) -> Poll<
        ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
        io::Error,
    > {
        // Shortcut for polling a `tokio_timer::Delay`
        macro_rules! poll_delay {
            ($delay:expr => { NotReady => $notready:expr, Ready => $ready:expr, }) => (
                match $delay.poll() {
                    Ok(Async::NotReady) => $notready,
                    Ok(Async::Ready(())) => $ready,
                    Err(err) => {
                        warn!(target: "sub-libp2p", "Ping timer errored: {:?}", err);
                        return Err(io::Error::new(io::ErrorKind::Other, err));
                    }
                }
            )
        }

        match mem::replace(&mut self.out_state, OutState::Poisoned) {
            OutState::Shutdown | OutState::Poisoned => {
                // This shuts down the whole connection with the remote.
                Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
            },

            OutState::Disabled => {
                Ok(Async::NotReady)
            }

            // Need to open an outgoing substream.
            OutState::NeedToOpen { expires } => {
                // Note that we ignore the expiration here, as it's pretty unlikely to happen.
                // The expiration is only here to be transmitted to the `Upgrading`.
                self.out_state = OutState::Upgrading { expires };
                Ok(Async::Ready(
                    ProtocolsHandlerEvent::OutboundSubstreamRequest {
                        upgrade: self.ping_config,
                        info: (),
                    },
                ))
            }

            // Waiting for the upgrade to be negotiated.
            OutState::Upgrading { mut expires } => poll_delay!(expires => {
                    NotReady => {
                        self.out_state = OutState::Upgrading { expires };
                        Ok(Async::NotReady)
                    },
                    Ready => {
                        self.out_state = OutState::Shutdown;
                        Err(io::Error::new(io::ErrorKind::Other, "unresponsive node"))
                    },
                }),

            // Waiting for the pong.
            OutState::WaitingForPong { mut substream, mut expires } => {
                // We start by dialing the substream, leaving one last chance for it to
                // produce the pong even if the expiration happened.
                match substream.poll()? {
                    Async::Ready(Some(started)) => {
                        self.out_state = OutState::Idle {
                            substream,
                            next_ping: Delay::new(Instant::now() + self.delay_to_next_ping),
                        };
                        let ev = OutEvent::PingSuccess(started.elapsed());
                        return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev)));
                    }
                    Async::NotReady => {}
                    Async::Ready(None) => {
                        self.out_state = OutState::Shutdown;
                        return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown));
                    }
                }

                // Check the expiration.
                poll_delay!(expires => {
                    NotReady => {
                        self.out_state = OutState::WaitingForPong { substream, expires };
                        // Both `substream` and `expires` and not ready, so it's fine to return
                        // not ready.
                        Ok(Async::NotReady)
                    },
                    Ready => {
                        self.out_state = OutState::Shutdown;
                        Err(io::Error::new(io::ErrorKind::Other, "unresponsive node"))
                    },
                })
            }

            OutState::Idle { mut substream, mut next_ping } => {
                // Poll the future that fires when we need to ping the node again.
                poll_delay!(next_ping => {
                    NotReady => {
                        self.out_state = OutState::Idle { substream, next_ping };
                        Ok(Async::NotReady)
                    },
                    Ready => {
                        let expires = Delay::new(Instant::now() + self.ping_timeout);
                        substream.ping(Instant::now());
                        self.out_state = OutState::WaitingForPong { substream, expires };
                        Ok(Async::Ready(ProtocolsHandlerEvent::Custom(OutEvent::PingStart)))
                    },
                })
            }
        }
    }
}