libp2p_ping/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use futures::prelude::*;
22use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
23use libp2p_swarm::NegotiatedSubstream;
24use rand::{distributions, prelude::*};
25use std::{io, iter, time::Duration};
26use void::Void;
27use wasm_timer::Instant;
28
/// Protocol upgrade for `Ping`.
///
/// An outbound ping consists of 32 random bytes that are written to a
/// single outbound substream at configurable intervals; the remote is
/// expected to echo exactly those bytes. Pings arriving on inbound
/// substreams are answered in the same way, by returning the bytes that
/// were received.
///
/// No more than one inbound and one outbound substream are kept open at
/// a time. A substream is dropped when a ping on it times out or hits
/// any other error. Once a configurable number of outbound pings have
/// failed in a row, the connection is closed.
///
/// Every successful ping reports its round-trip time.
///
/// > **Note**: The reported round-trip time can include delays added by the
/// >           underlying transport. With TCP, for instance, Nagle's
/// >           algorithm, delayed acks and related configuration options
/// >           can affect latencies, especially on connections that
/// >           otherwise carry little traffic.
#[derive(Clone, Copy, Debug, Default)]
pub struct Ping;
50
/// Length in bytes of a ping payload; sizes the buffers that `send_ping`
/// and `recv_ping` write and read.
const PING_SIZE: usize = 32;
52
53impl UpgradeInfo for Ping {
54    type Info = &'static [u8];
55    type InfoIter = iter::Once<Self::Info>;
56
57    fn protocol_info(&self) -> Self::InfoIter {
58        iter::once(b"/ipfs/ping/1.0.0")
59    }
60}
61
62impl InboundUpgrade<NegotiatedSubstream> for Ping {
63    type Output = NegotiatedSubstream;
64    type Error = Void;
65    type Future = future::Ready<Result<Self::Output, Self::Error>>;
66
67    fn upgrade_inbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
68        future::ok(stream)
69    }
70}
71
72impl OutboundUpgrade<NegotiatedSubstream> for Ping {
73    type Output = NegotiatedSubstream;
74    type Error = Void;
75    type Future = future::Ready<Result<Self::Output, Self::Error>>;
76
77    fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
78        future::ok(stream)
79    }
80}
81
82/// Sends a ping and waits for the pong.
83pub async fn send_ping<S>(mut stream: S) -> io::Result<(S, Duration)>
84where
85    S: AsyncRead + AsyncWrite + Unpin
86{
87    let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
88    log::debug!("Preparing ping payload {:?}", payload);
89    stream.write_all(&payload).await?;
90    stream.flush().await?;
91    let started = Instant::now();
92    let mut recv_payload = [0u8; PING_SIZE];
93    log::debug!("Awaiting pong for {:?}", payload);
94    stream.read_exact(&mut recv_payload).await?;
95    if recv_payload == payload {
96        Ok((stream, started.elapsed()))
97    } else {
98        Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch"))
99    }
100}
101
102/// Waits for a ping and sends a pong.
103pub async fn recv_ping<S>(mut stream: S) -> io::Result<S>
104where
105    S: AsyncRead + AsyncWrite + Unpin
106{
107    let mut payload = [0u8; PING_SIZE];
108    log::debug!("Waiting for ping ...");
109    stream.read_exact(&mut payload).await?;
110    log::debug!("Sending pong for {:?}", payload);
111    stream.write_all(&payload).await?;
112    stream.flush().await?;
113    Ok(stream)
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p_core::{
        multiaddr::multiaddr,
        transport::{memory::MemoryTransport, ListenerEvent, Transport},
    };
    use rand::{thread_rng, Rng};
    use std::time::Duration;

    /// One ping/pong exchange over an in-memory transport: a spawned task
    /// answers with `recv_ping` while the test body drives `send_ping` and
    /// checks that a non-zero round-trip time is reported.
    #[test]
    fn ping_pong() {
        let mem_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
        let mut listener = MemoryTransport.listen_on(mem_addr).unwrap();

        // The first listener event must already be available and must
        // announce the address the listener is bound to.
        let listener_addr = match listener.next().now_or_never() {
            Some(Some(Ok(ListenerEvent::NewAddress(addr)))) => addr,
            _ => panic!("MemoryTransport not listening on an address!"),
        };

        // Responder: accept one connection and answer one ping.
        async_std::task::spawn(async move {
            let event = listener.next().await.unwrap();
            let (upgrade, _) = event.unwrap().into_upgrade().unwrap();
            let inbound = upgrade.await.unwrap();
            recv_ping(inbound).await.unwrap();
        });

        // Initiator: dial the listener and send one ping.
        let rtt = async_std::task::block_on(async move {
            let outbound = MemoryTransport.dial(listener_addr).unwrap().await.unwrap();
            let (_, rtt) = send_ping(outbound).await.unwrap();
            rtt
        });

        assert!(rtt > Duration::from_secs(0));
    }
}