iroh_ping/lib.rs
1use std::{
2 sync::Arc,
3 time::{Duration, Instant},
4};
5
6use iroh::{
7 endpoint::Connection,
8 protocol::{AcceptError, ProtocolHandler},
9 Endpoint, NodeAddr,
10};
11use iroh_metrics::{Counter, MetricsGroup};
12
/// ALPN identifier of the ping protocol.
///
/// ALPN (application-layer protocol negotiation) bytes are exchanged during the
/// connection handshake; the connection is aborted unless both nodes present the
/// same bytestring, so every protocol is told apart by its ALPN.
pub const ALPN: &[u8] = b"iroh/ping/0";
18
19/// Ping is a struct that holds both the client ping method, and the endpoint
20/// protocol implementation
21#[derive(Debug, Clone)]
22pub struct Ping {
23 metrics: Arc<Metrics>,
24}
25
26impl Default for Ping {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl Ping {
33 /// create a new Ping
34 pub fn new() -> Self {
35 Self {
36 metrics: Arc::new(Metrics::default()),
37 }
38 }
39
40 /// handle to ping metrics
41 pub fn metrics(&self) -> &Arc<Metrics> {
42 &self.metrics
43 }
44
45 /// send a ping on the provided endpoint to a given node address
46 pub async fn ping(&self, endpoint: &Endpoint, addr: NodeAddr) -> anyhow::Result<Duration> {
47 let start = Instant::now();
48 // Open a connection to the accepting node
49 let conn = endpoint.connect(addr, ALPN).await?;
50
51 // Open a bidirectional QUIC stream
52 let (mut send, mut recv) = conn.open_bi().await?;
53
54 // Send some data to be pinged
55 send.write_all(b"PING").await?;
56
57 // Signal the end of data for this particular stream
58 send.finish()?;
59
60 // read the response, which must be PONG as bytes
61 let response = recv.read_to_end(4).await?;
62 assert_eq!(&response, b"PONG");
63
64 // Explicitly close the whole connection.
65 conn.close(0u32.into(), b"bye!");
66
67 // The above call only queues a close message to be sent (see how it's not async!).
68 // We need to actually call this to make sure this message is sent out.
69 endpoint.close().await;
70
71 // at this point we've successfully pinged, mark the metric
72 self.metrics.pings_sent.inc();
73
74 // If we don't call this, but continue using the endpoint, we then the queued
75 // close call will eventually be picked up and sent.
76 // But always try to wait for endpoint.close().await to go through before dropping
77 // the endpoint to ensure any queued messages are sent through and connections are
78 // closed gracefully.
79 Ok(Duration::from_millis(
80 Instant::now().duration_since(start).as_millis() as u64,
81 ))
82 }
83}
84
85impl ProtocolHandler for Ping {
86 /// The `accept` method is called for each incoming connection for our ALPN.
87 ///
88 /// The returned future runs on a newly spawned tokio task, so it can run as long as
89 /// the connection lasts.
90 async fn accept(&self, connection: Connection) -> n0_snafu::Result<(), AcceptError> {
91 let metrics = self.metrics.clone();
92
93 // We can get the remote's node id from the connection.
94 let node_id = connection.remote_node_id()?;
95 println!("accepted connection from {node_id}");
96
97 // Our protocol is a simple request-response protocol, so we expect the
98 // connecting peer to open a single bi-directional stream.
99 let (mut send, mut recv) = connection.accept_bi().await?;
100
101 let req = recv.read_to_end(4).await.map_err(AcceptError::from_err)?;
102 assert_eq!(&req, b"PING");
103
104 // send back "PONG" bytes
105 send.write_all(b"PONG")
106 .await
107 .map_err(AcceptError::from_err)?;
108
109 // By calling `finish` on the send stream we signal that we will not send anything
110 // further, which makes the receive stream on the other end terminate.
111 send.finish()?;
112
113 // Wait until the remote closes the connection, which it does once it
114 // received the response.
115 connection.closed().await;
116
117 // increment count of pings we've received
118 metrics.pings_recv.inc();
119
120 Ok(())
121 }
122}
123
124/// Enum of metrics for the module
125#[derive(Debug, Default, MetricsGroup)]
126#[metrics(name = "ping")]
127pub struct Metrics {
128 /// count of valid ping messages sent
129 pub pings_sent: Counter,
130 /// count of valid ping messages received
131 pub pings_recv: Counter,
132}
133
#[cfg(test)]
mod tests {
    use iroh::{protocol::Router, Endpoint, Watcher};

    use super::*;

    /// Round-trips one ping between two locally bound endpoints.
    #[tokio::test]
    async fn test_ping() -> anyhow::Result<()> {
        // Accepting side: a router that serves the ping protocol under its ALPN.
        let server_endpoint = Endpoint::builder().discovery_n0().bind().await?;
        let router = Router::builder(server_endpoint)
            .accept(ALPN, Ping::new())
            .spawn();
        let addr = router.endpoint().node_addr().initialized().await?;

        // Connecting side: a separate endpoint that pings the router's address.
        let client = Endpoint::builder().discovery_n0().bind().await?;
        let res = Ping::new().ping(&client, addr).await?;
        println!("ping response: {res:?}");

        Ok(())
    }
}
153}