1use futures::prelude::*;
22use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
23use libp2p_swarm::NegotiatedSubstream;
24use rand::{distributions, prelude::*};
25use std::{io, iter, time::Duration};
26use void::Void;
27use wasm_timer::Instant;
28
/// Marker type for the ping protocol upgrade.
///
/// It carries no state. In this file it implements `UpgradeInfo` (advertising
/// `/ipfs/ping/1.0.0`) and both `InboundUpgrade` and `OutboundUpgrade`, each of
/// which returns the negotiated substream unchanged.
#[derive(Clone, Copy, Debug, Default)]
pub struct Ping;
50
/// Length, in bytes, of a ping payload. `send_ping` and `recv_ping` both size
/// their fixed read/write buffers with this constant.
const PING_SIZE: usize = 32;
52
53impl UpgradeInfo for Ping {
54 type Info = &'static [u8];
55 type InfoIter = iter::Once<Self::Info>;
56
57 fn protocol_info(&self) -> Self::InfoIter {
58 iter::once(b"/ipfs/ping/1.0.0")
59 }
60}
61
62impl InboundUpgrade<NegotiatedSubstream> for Ping {
63 type Output = NegotiatedSubstream;
64 type Error = Void;
65 type Future = future::Ready<Result<Self::Output, Self::Error>>;
66
67 fn upgrade_inbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
68 future::ok(stream)
69 }
70}
71
72impl OutboundUpgrade<NegotiatedSubstream> for Ping {
73 type Output = NegotiatedSubstream;
74 type Error = Void;
75 type Future = future::Ready<Result<Self::Output, Self::Error>>;
76
77 fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
78 future::ok(stream)
79 }
80}
81
82pub async fn send_ping<S>(mut stream: S) -> io::Result<(S, Duration)>
84where
85 S: AsyncRead + AsyncWrite + Unpin
86{
87 let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
88 log::debug!("Preparing ping payload {:?}", payload);
89 stream.write_all(&payload).await?;
90 stream.flush().await?;
91 let started = Instant::now();
92 let mut recv_payload = [0u8; PING_SIZE];
93 log::debug!("Awaiting pong for {:?}", payload);
94 stream.read_exact(&mut recv_payload).await?;
95 if recv_payload == payload {
96 Ok((stream, started.elapsed()))
97 } else {
98 Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch"))
99 }
100}
101
102pub async fn recv_ping<S>(mut stream: S) -> io::Result<S>
104where
105 S: AsyncRead + AsyncWrite + Unpin
106{
107 let mut payload = [0u8; PING_SIZE];
108 log::debug!("Waiting for ping ...");
109 stream.read_exact(&mut payload).await?;
110 log::debug!("Sending pong for {:?}", payload);
111 stream.write_all(&payload).await?;
112 stream.flush().await?;
113 Ok(stream)
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p_core::{
        multiaddr::multiaddr,
        transport::{memory::MemoryTransport, ListenerEvent, Transport},
    };
    use rand::{thread_rng, Rng};
    use std::time::Duration;

    /// Runs one ping/pong exchange over an in-memory transport: a spawned
    /// task accepts the connection and answers with `recv_ping`, while the
    /// test body dials and calls `send_ping`.
    #[test]
    fn ping_pong() {
        let mem_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
        let mut listener = MemoryTransport.listen_on(mem_addr).unwrap();

        // The first listener event must already be available and must be the
        // address the listener is bound to.
        let listener_addr = match listener.next().now_or_never() {
            Some(Some(Ok(ListenerEvent::NewAddress(addr)))) => addr,
            _ => panic!("MemoryTransport not listening on an address!"),
        };

        // Responder side: accept a single connection and answer one ping.
        async_std::task::spawn(async move {
            let event = listener.next().await.unwrap().unwrap();
            let (upgrade, _) = event.into_upgrade().unwrap();
            let inbound = upgrade.await.unwrap();
            recv_ping(inbound).await.unwrap();
        });

        // Initiator side: dial the listener and measure one round trip.
        async_std::task::block_on(async move {
            let outbound = MemoryTransport.dial(listener_addr).unwrap().await.unwrap();
            let (_, rtt) = send_ping(outbound).await.unwrap();
            assert!(rtt > Duration::from_secs(0));
        });
    }
}