iroh_ping/lib.rs
1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use anyhow::Result;
5use iroh::{Endpoint, NodeAddr, endpoint::Connection, protocol::ProtocolHandler};
6use iroh_metrics::{Counter, MetricsGroup};
7use n0_future::boxed::BoxFuture;
8
9/// Each protocol is identified by its ALPN string.
10///
11/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
12/// and the connection is aborted unless both nodes pass the same bytestring.
13pub const ALPN: &[u8] = b"iroh/ping/0";
14
15/// Ping is a struct that holds both the client ping method, and the endpoint
16/// protocol implementation
17#[derive(Debug, Clone)]
18pub struct Ping {
19 metrics: Arc<Metrics>,
20}
21
22impl Ping {
23 /// create a new Ping
24 pub fn new() -> Self {
25 Self {
26 metrics: Arc::new(Metrics::default()),
27 }
28 }
29
30 /// handle to ping metrics
31 pub fn metrics(&self) -> &Arc<Metrics> {
32 &self.metrics
33 }
34
35 /// send a ping on the provided endpoint to a given node address
36 pub async fn ping(&self, endpoint: &Endpoint, addr: NodeAddr) -> Result<Duration> {
37 let start = Instant::now();
38 // Open a connection to the accepting node
39 let conn = endpoint.connect(addr, ALPN).await?;
40
41 // Open a bidirectional QUIC stream
42 let (mut send, mut recv) = conn.open_bi().await?;
43
44 // Send some data to be pinged
45 send.write_all(b"PING").await?;
46
47 // Signal the end of data for this particular stream
48 send.finish()?;
49
50 // read the response, which must be PONG as bytes
51 let response = recv.read_to_end(4).await?;
52 assert_eq!(&response, b"PONG");
53
54 // Explicitly close the whole connection.
55 conn.close(0u32.into(), b"bye!");
56
57 // The above call only queues a close message to be sent (see how it's not async!).
58 // We need to actually call this to make sure this message is sent out.
59 endpoint.close().await;
60
61 // at this point we've successfully pinged, mark the metric
62 self.metrics.pings_sent.inc();
63
64 // If we don't call this, but continue using the endpoint, we then the queued
65 // close call will eventually be picked up and sent.
66 // But always try to wait for endpoint.close().await to go through before dropping
67 // the endpoint to ensure any queued messages are sent through and connections are
68 // closed gracefully.
69 Ok(Duration::from_millis(
70 Instant::now().duration_since(start).as_millis() as u64,
71 ))
72 }
73}
74
75impl ProtocolHandler for Ping {
76 /// The `accept` method is called for each incoming connection for our ALPN.
77 ///
78 /// The returned future runs on a newly spawned tokio task, so it can run as long as
79 /// the connection lasts.
80 fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
81 // We have to return a boxed future from the handler.
82 let metrics = self.metrics.clone();
83 Box::pin(async move {
84 // We can get the remote's node id from the connection.
85 let node_id = connection.remote_node_id()?;
86 println!("accepted connection from {node_id}");
87
88 // Our protocol is a simple request-response protocol, so we expect the
89 // connecting peer to open a single bi-directional stream.
90 let (mut send, mut recv) = connection.accept_bi().await?;
91
92 let req = recv.read_to_end(4).await?;
93 assert_eq!(&req, b"PING");
94
95 // send back "PONG" bytes
96 send.write_all(b"PONG").await?;
97
98 // By calling `finish` on the send stream we signal that we will not send anything
99 // further, which makes the receive stream on the other end terminate.
100 send.finish()?;
101
102 // Wait until the remote closes the connection, which it does once it
103 // received the response.
104 connection.closed().await;
105
106 // increment count of pings we've received
107 metrics.pings_recv.inc();
108
109 Ok(())
110 })
111 }
112}
113
114/// Enum of metrics for the module
115#[derive(Debug, Default, MetricsGroup)]
116#[metrics(name = "ping")]
117pub struct Metrics {
118 /// count of valid ping messages sent
119 pub pings_sent: Counter,
120 /// count of valid ping messages received
121 pub pings_recv: Counter,
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use anyhow::Result;
128 use iroh::{Endpoint, protocol::Router};
129
130 #[tokio::test]
131 async fn test_ping() -> Result<()> {
132 let ep = Endpoint::builder().discovery_n0().bind().await?;
133 let router = Router::builder(ep).accept(ALPN, Ping::new()).spawn();
134 let addr = router.endpoint().node_addr().await?;
135
136 let client = Endpoint::builder().discovery_n0().bind().await?;
137 client.node_addr().await?;
138 let ping_client = Ping::new();
139 let res = ping_client.ping(&client, addr.clone()).await?;
140 println!("ping response: {:?}", res);
141
142 Ok(())
143 }
144}