iroh_ping/
lib.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use anyhow::Result;
5use iroh::{Endpoint, NodeAddr, endpoint::Connection, protocol::ProtocolHandler};
6use iroh_metrics::{Counter, MetricsGroup};
7use n0_future::boxed::BoxFuture;
8
9/// Each protocol is identified by its ALPN string.
10///
11/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
12/// and the connection is aborted unless both nodes pass the same bytestring.
13pub const ALPN: &[u8] = b"iroh/ping/0";
14
15/// Ping is a struct that holds both the client ping method, and the endpoint
16/// protocol implementation
17#[derive(Debug, Clone)]
18pub struct Ping {
19    metrics: Arc<Metrics>,
20}
21
22impl Ping {
23    /// create a new Ping
24    pub fn new() -> Self {
25        Self {
26            metrics: Arc::new(Metrics::default()),
27        }
28    }
29
30    /// handle to ping metrics
31    pub fn metrics(&self) -> &Arc<Metrics> {
32        &self.metrics
33    }
34
35    /// send a ping on the provided endpoint to a given node address
36    pub async fn ping(&self, endpoint: &Endpoint, addr: NodeAddr) -> Result<Duration> {
37        let start = Instant::now();
38        // Open a connection to the accepting node
39        let conn = endpoint.connect(addr, ALPN).await?;
40
41        // Open a bidirectional QUIC stream
42        let (mut send, mut recv) = conn.open_bi().await?;
43
44        // Send some data to be pinged
45        send.write_all(b"PING").await?;
46
47        // Signal the end of data for this particular stream
48        send.finish()?;
49
50        // read the response, which must be PONG as bytes
51        let response = recv.read_to_end(4).await?;
52        assert_eq!(&response, b"PONG");
53
54        // Explicitly close the whole connection.
55        conn.close(0u32.into(), b"bye!");
56
57        // The above call only queues a close message to be sent (see how it's not async!).
58        // We need to actually call this to make sure this message is sent out.
59        endpoint.close().await;
60
61        // at this point we've successfully pinged, mark the metric
62        self.metrics.pings_sent.inc();
63
64        // If we don't call this, but continue using the endpoint, we then the queued
65        // close call will eventually be picked up and sent.
66        // But always try to wait for endpoint.close().await to go through before dropping
67        // the endpoint to ensure any queued messages are sent through and connections are
68        // closed gracefully.
69        Ok(Duration::from_millis(
70            Instant::now().duration_since(start).as_millis() as u64,
71        ))
72    }
73}
74
75impl ProtocolHandler for Ping {
76    /// The `accept` method is called for each incoming connection for our ALPN.
77    ///
78    /// The returned future runs on a newly spawned tokio task, so it can run as long as
79    /// the connection lasts.
80    fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
81        // We have to return a boxed future from the handler.
82        let metrics = self.metrics.clone();
83        Box::pin(async move {
84            // We can get the remote's node id from the connection.
85            let node_id = connection.remote_node_id()?;
86            println!("accepted connection from {node_id}");
87
88            // Our protocol is a simple request-response protocol, so we expect the
89            // connecting peer to open a single bi-directional stream.
90            let (mut send, mut recv) = connection.accept_bi().await?;
91
92            let req = recv.read_to_end(4).await?;
93            assert_eq!(&req, b"PING");
94
95            // send back "PONG" bytes
96            send.write_all(b"PONG").await?;
97
98            // By calling `finish` on the send stream we signal that we will not send anything
99            // further, which makes the receive stream on the other end terminate.
100            send.finish()?;
101
102            // Wait until the remote closes the connection, which it does once it
103            // received the response.
104            connection.closed().await;
105
106            // increment count of pings we've received
107            metrics.pings_recv.inc();
108
109            Ok(())
110        })
111    }
112}
113
114/// Enum of metrics for the module
115#[derive(Debug, Default, MetricsGroup)]
116#[metrics(name = "ping")]
117pub struct Metrics {
118    /// count of valid ping messages sent
119    pub pings_sent: Counter,
120    /// count of valid ping messages received
121    pub pings_recv: Counter,
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127    use anyhow::Result;
128    use iroh::{Endpoint, protocol::Router};
129
130    #[tokio::test]
131    async fn test_ping() -> Result<()> {
132        let ep = Endpoint::builder().discovery_n0().bind().await?;
133        let router = Router::builder(ep).accept(ALPN, Ping::new()).spawn();
134        let addr = router.endpoint().node_addr().await?;
135
136        let client = Endpoint::builder().discovery_n0().bind().await?;
137        client.node_addr().await?;
138        let ping_client = Ping::new();
139        let res = ping_client.ping(&client, addr.clone()).await?;
140        println!("ping response: {:?}", res);
141
142        Ok(())
143    }
144}