iroh_ping/
lib.rs

1use std::{
2    sync::Arc,
3    time::{Duration, Instant},
4};
5
6use iroh::{
7    endpoint::Connection,
8    protocol::{AcceptError, ProtocolHandler},
9    Endpoint, NodeAddr,
10};
11use iroh_metrics::{Counter, MetricsGroup};
12
/// ALPN identifier of the ping protocol.
///
/// ALPN (application-layer protocol negotiation) bytes are exchanged during the connection
/// handshake; the connection is aborted unless both nodes present the same byte string.
pub const ALPN: &[u8] = b"iroh/ping/0";
18
19/// Ping is a struct that holds both the client ping method, and the endpoint
20/// protocol implementation
21#[derive(Debug, Clone)]
22pub struct Ping {
23    metrics: Arc<Metrics>,
24}
25
26impl Default for Ping {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl Ping {
33    /// create a new Ping
34    pub fn new() -> Self {
35        Self {
36            metrics: Arc::new(Metrics::default()),
37        }
38    }
39
40    /// handle to ping metrics
41    pub fn metrics(&self) -> &Arc<Metrics> {
42        &self.metrics
43    }
44
45    /// send a ping on the provided endpoint to a given node address
46    pub async fn ping(&self, endpoint: &Endpoint, addr: NodeAddr) -> anyhow::Result<Duration> {
47        let start = Instant::now();
48        // Open a connection to the accepting node
49        let conn = endpoint.connect(addr, ALPN).await?;
50
51        // Open a bidirectional QUIC stream
52        let (mut send, mut recv) = conn.open_bi().await?;
53
54        // Send some data to be pinged
55        send.write_all(b"PING").await?;
56
57        // Signal the end of data for this particular stream
58        send.finish()?;
59
60        // read the response, which must be PONG as bytes
61        let response = recv.read_to_end(4).await?;
62        assert_eq!(&response, b"PONG");
63
64        // Explicitly close the whole connection.
65        conn.close(0u32.into(), b"bye!");
66
67        // The above call only queues a close message to be sent (see how it's not async!).
68        // We need to actually call this to make sure this message is sent out.
69        endpoint.close().await;
70
71        // at this point we've successfully pinged, mark the metric
72        self.metrics.pings_sent.inc();
73
74        // If we don't call this, but continue using the endpoint, we then the queued
75        // close call will eventually be picked up and sent.
76        // But always try to wait for endpoint.close().await to go through before dropping
77        // the endpoint to ensure any queued messages are sent through and connections are
78        // closed gracefully.
79        Ok(Duration::from_millis(
80            Instant::now().duration_since(start).as_millis() as u64,
81        ))
82    }
83}
84
85impl ProtocolHandler for Ping {
86    /// The `accept` method is called for each incoming connection for our ALPN.
87    ///
88    /// The returned future runs on a newly spawned tokio task, so it can run as long as
89    /// the connection lasts.
90    async fn accept(&self, connection: Connection) -> n0_snafu::Result<(), AcceptError> {
91        let metrics = self.metrics.clone();
92
93        // We can get the remote's node id from the connection.
94        let node_id = connection.remote_node_id()?;
95        println!("accepted connection from {node_id}");
96
97        // Our protocol is a simple request-response protocol, so we expect the
98        // connecting peer to open a single bi-directional stream.
99        let (mut send, mut recv) = connection.accept_bi().await?;
100
101        let req = recv.read_to_end(4).await.map_err(AcceptError::from_err)?;
102        assert_eq!(&req, b"PING");
103
104        // send back "PONG" bytes
105        send.write_all(b"PONG")
106            .await
107            .map_err(AcceptError::from_err)?;
108
109        // By calling `finish` on the send stream we signal that we will not send anything
110        // further, which makes the receive stream on the other end terminate.
111        send.finish()?;
112
113        // Wait until the remote closes the connection, which it does once it
114        // received the response.
115        connection.closed().await;
116
117        // increment count of pings we've received
118        metrics.pings_recv.inc();
119
120        Ok(())
121    }
122}
123
124/// Enum of metrics for the module
125#[derive(Debug, Default, MetricsGroup)]
126#[metrics(name = "ping")]
127pub struct Metrics {
128    /// count of valid ping messages sent
129    pub pings_sent: Counter,
130    /// count of valid ping messages received
131    pub pings_recv: Counter,
132}
133
#[cfg(test)]
mod tests {
    use iroh::{protocol::Router, Endpoint, Watcher};

    use super::*;

    /// Starts a router that accepts pings, then pings it from a second endpoint.
    #[tokio::test]
    async fn test_ping() -> anyhow::Result<()> {
        // Accepting side: an endpoint whose router serves the ping protocol.
        let server_endpoint = Endpoint::builder().discovery_n0().bind().await?;
        let server = Router::builder(server_endpoint)
            .accept(ALPN, Ping::new())
            .spawn();
        let server_addr = server.endpoint().node_addr().initialized().await?;

        // Dialing side: a separate endpoint that sends one ping to the server.
        let client_endpoint = Endpoint::builder().discovery_n0().bind().await?;
        let pinger = Ping::new();
        let res = pinger.ping(&client_endpoint, server_addr).await?;
        println!("ping response: {res:?}");

        Ok(())
    }
}