iroh 0.98.0

p2p quic connections dialed by public key
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
//! Example for using an iroh hook to collect information about remote endpoints.
//!
//! This implements a [`RemoteMap`] which collects information about all connections and paths from an iroh endpoint.
//! The remote map can be cloned and inspected from other tasks at any time. It contains both data about all
//! currently active connections, and an aggregate status for each remote that remains available even after
//! all connections to the endpoint have been closed.

use std::time::{Duration, SystemTime};

use iroh::{Endpoint, EndpointAddr, endpoint::presets};
use n0_error::{Result, StackResultExt, StdResultExt, ensure_any};
use n0_future::IterExt;
use tracing::{Instrument, info, info_span};

use crate::remote_map::RemoteMap;

const ALPN: &[u8] = b"iroh/test";

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
        )
        .init();

    // Create the remote map and hook.
    let (hook, remote_map) = RemoteMap::new();

    // Bind our endpoint and install the remote map hook.
    let server = Endpoint::builder(presets::N0)
        .alpns(vec![ALPN.to_vec()])
        .hooks(hook)
        .bind()
        .instrument(info_span!("server"))
        .await?;
    // Wait for our endpoint to be fully online.
    server.online().await;
    let server_addr = server.addr();

    // Spawn a task that creates `count` client endpoints that each connect to our server.
    let count = 3;
    let client_task = tokio::spawn(run_clients(server_addr, count));

    // Spawn a task that prints info from the remote map while some connections are active.
    // You can use this info to make decisions about remotes.
    let _inspect_task = tokio::task::spawn({
        let remote_map = remote_map.clone();
        async move {
            // Wait a bit.
            tokio::time::sleep(Duration::from_millis(500)).await;
            println!("== while connections are active == ");
            log_active(&remote_map);
            log_aggregate(&remote_map);
            println!();
        }
    });

    // Let the server accept `count` connections in parallel.
    // The server keeps all connections open for at least 500 milliseconds.
    std::iter::repeat_with(async || {
        let conn = server
            .accept()
            .await
            .context("server endpoint closed")?
            .await?;
        info!("accepted");
        let mut s = conn.open_uni().await.anyerr()?;
        // wait a bit.
        tokio::time::sleep(Duration::from_millis(500)).await;
        s.write_all(b"hi").await.anyerr()?;
        s.finish().anyerr()?;
        conn.closed().await;
        info!("closed");
        n0_error::Ok(())
    })
    .take(count)
    .enumerate()
    .map(|(i, fut)| fut.instrument(info_span!("server-conn", %i)))
    .try_join_all()
    .await?;

    // Print the remote map again.
    println!("== all connections closed ==");
    log_active(&remote_map);
    log_aggregate(&remote_map);

    server.close().await;
    client_task.await.std_context("client")?.context("client")?;

    Ok(())
}

/// Uses the current connection info to print info about a remote.
///
/// Uses the info about *currently active* connections, which return `None` if no connections are active.
fn log_active(remote_map: &RemoteMap) {
    println!("current remote state:");
    for (id, info) in remote_map.read().iter() {
        println!(
            "[{}] is_active {}, connections {}, ip_path {:?}, relay_path {:?}, current_min_rtt {:?}",
            id.fmt_short(),
            info.is_active(),
            info.connections().count(),
            info.has_ip_path(),
            info.has_relay_path(),
            info.current_min_rtt()
        );
    }
}

/// Uses the aggregated info to print info about a remote.
///
/// The aggregated info is updated for all connection and path changes, and stays at the latest values
/// even if all connections are closed.
fn log_aggregate(remote_map: &RemoteMap) {
    println!("aggregate remote state:");
    for (id, info) in remote_map.read().iter() {
        let aggregate = info.aggregate();
        println!(
            "[{}] min_rtt {:?}, max_rtt {:?}, ip_path {:?}, relay_path {}, last_update {:?} ago",
            id.fmt_short(),
            aggregate.rtt_min,
            aggregate.rtt_max,
            aggregate.ip_path,
            aggregate.relay_path,
            SystemTime::now()
                .duration_since(aggregate.last_update)
                .unwrap()
        );
    }
}

async fn run_clients(server_addr: EndpointAddr, count: usize) -> Result {
    std::iter::repeat_with(async || {
        let client = Endpoint::builder(presets::N0)
            .bind()
            .instrument(info_span!("client"))
            .await?;
        let conn = client.connect(server_addr.clone(), ALPN).await?;
        info!("connected");
        let mut s = conn.accept_uni().await.anyerr()?;
        let data = s.read_to_end(2).await.anyerr()?;
        ensure_any!(data == b"hi", "unexpected data");
        conn.close(23u32.into(), b"bye");
        info!("closed");
        client.close().await;
        n0_error::Ok(())
    })
    .take(count)
    .enumerate()
    .map(|(i, fut)| fut.instrument(info_span!("client", %i)))
    .try_join_all()
    .await?;
    Ok(())
}

mod remote_map {
    //! Implementation of a remote map and hook to track information about all remote endpoints to which an iroh endpoint
    //! has connections with.

    use std::{
        collections::HashMap,
        sync::{Arc, RwLock, RwLockReadGuard},
        time::{Duration, SystemTime},
    };

    use iroh::{
        EndpointId, Watcher,
        endpoint::{AfterHandshakeOutcome, ConnectionInfo, EndpointHooks, PathInfo},
    };
    use n0_future::task::AbortOnDropHandle;
    use tokio::{sync::mpsc, task::JoinSet};
    use tokio_stream::StreamExt;
    use tracing::{Instrument, debug, info, info_span};

    /// Information about a remote info.
    #[derive(Debug, Default)]
    pub struct RemoteInfo {
        aggregate: Aggregate,
        connections: HashMap<u64, ConnectionInfo>,
    }

    /// Aggregate information about a remote info.
    #[derive(Debug)]
    pub struct Aggregate {
        /// Minimal RTT observed over all paths to this remote.
        pub rtt_min: Duration,
        /// Maximal RTT observed over all paths to this remote.
        pub rtt_max: Duration,
        /// Whether we ever had an IP path to this remote.
        pub ip_path: bool,
        /// Whether we ever had a relay path to this remote.
        pub relay_path: bool,
        /// Time this aggregate was last updated.
        pub last_update: SystemTime,
    }

    impl Default for Aggregate {
        fn default() -> Self {
            Self {
                rtt_min: Duration::MAX,
                rtt_max: Duration::ZERO,
                ip_path: false,
                relay_path: false,
                last_update: SystemTime::UNIX_EPOCH,
            }
        }
    }

    impl Aggregate {
        fn update(&mut self, path: &PathInfo) {
            self.last_update = SystemTime::now();
            if path.is_ip() {
                self.ip_path = true;
            }
            if path.is_relay() {
                self.relay_path = true;
            }
            if let Some(stats) = path.stats() {
                debug!("path update addr {:?} {stats:?}", path.remote_addr());
                self.rtt_min = self.rtt_min.min(stats.rtt);
                self.rtt_max = self.rtt_max.max(stats.rtt);
            }
        }
    }

    impl RemoteInfo {
        /// Returns an aggregate of stats for this remote.
        ///
        /// This includes info from closed connections.
        pub fn aggregate(&self) -> &Aggregate {
            &self.aggregate
        }

        /// Returns the minimal RTT of all currently active paths.
        ///
        /// Returns `None` if there are no active connections.
        pub fn current_min_rtt(&self) -> Option<Duration> {
            self.connections()
                .flat_map(|c| c.paths().get().into_iter())
                .flat_map(|p| p.stats())
                .map(|s| s.rtt)
                .min()
        }

        /// Returns whether any active connection to the remote has an active IP path.
        ///
        /// Returns `None` if there are no active connections.
        pub fn has_ip_path(&self) -> Option<bool> {
            self.connections()
                .flat_map(|c| c.paths().get())
                .filter(|path| path.is_ip())
                .map(|_| true)
                .next()
        }

        /// Returns whether any active connection to the remote has an active relay path.
        ///
        /// Returns `None` if there are no active connections.
        pub fn has_relay_path(&self) -> Option<bool> {
            self.connections()
                .flat_map(|c| c.paths().get())
                .filter(|path| path.is_relay())
                .map(|_| true)
                .next()
        }

        /// Returns `true` if there are active connections to this node.
        pub fn is_active(&self) -> bool {
            !self.connections.is_empty()
        }

        /// Returns an iterator over [`ConnectionInfo`] for currently active connections to this remote.
        pub fn connections(&self) -> impl Iterator<Item = &ConnectionInfo> {
            self.connections.values()
        }
    }

    type RemoteMapInner = Arc<RwLock<HashMap<EndpointId, RemoteInfo>>>;

    /// Contains information about remote nodes our endpoint has or had connections with.
    #[derive(Clone, Debug)]
    pub struct RemoteMap {
        map: RemoteMapInner,
        _task: Arc<AbortOnDropHandle<()>>,
    }

    /// Hook to collect information about remote endpoints from an endpoint.
    #[derive(Debug)]
    pub struct RemoteMapHook {
        tx: mpsc::Sender<ConnectionInfo>,
    }

    impl EndpointHooks for RemoteMapHook {
        async fn after_handshake(&self, conn: &ConnectionInfo) -> AfterHandshakeOutcome {
            info!(remote=%conn.remote_id().fmt_short(), "after_handshake");
            self.tx.send(conn.clone()).await.ok();
            AfterHandshakeOutcome::Accept
        }
    }

    impl RemoteMap {
        /// Creates a new [`RemoteMapHook`] and [`RemoteMap`].
        pub fn new() -> (RemoteMapHook, Self) {
            Self::with_max_retention(Duration::from_secs(60 * 5))
        }

        /// Creates a new [`RemoteMapHook`] and [`RemoteMap`] and configure the retention time.
        ///
        /// `retention_time` is the time entries for remote endpoints remain in the map after the last connection has closed.
        pub fn with_max_retention(retention_time: Duration) -> (RemoteMapHook, Self) {
            let (tx, rx) = mpsc::channel(8);
            let map = RemoteMapInner::default();
            let task = tokio::spawn(
                Self::run(rx, map.clone(), retention_time)
                    .instrument(info_span!("remote-map-task")),
            );
            let map = Self {
                map,
                _task: Arc::new(AbortOnDropHandle::new(task)),
            };
            let hook = RemoteMapHook { tx };
            (hook, map)
        }

        /// Read the current state of the remote map.
        ///
        /// Returns a [`RwLockReadGuard`] with the actual remote map. Don't hold over await points!
        pub fn read(&self) -> RwLockReadGuard<'_, HashMap<EndpointId, RemoteInfo>> {
            self.map.read().expect("poisoned")
        }

        async fn run(
            mut rx: mpsc::Receiver<ConnectionInfo>,
            map: RemoteMapInner,
            retention_time: Duration,
        ) {
            let mut tasks = JoinSet::new();
            let mut conn_id = 0;

            // Spawn a task to clear expired entries.
            let expiry_task = tasks.spawn(Self::clear_expired(retention_time, map.clone()));

            // Main loop
            loop {
                tokio::select! {
                    conn = rx.recv() => {
                        match conn {
                            Some(conn) => {
                                conn_id += 1;
                                Self::on_connection(&mut tasks, map.clone(), conn_id, conn);
                            },
                            None => break,
                        }
                    }
                    Some(res) = tasks.join_next(), if !tasks.is_empty() => {
                        res.expect("conn close task panicked");
                    }
                }
            }

            // Abort expiry task and join remaining tasks.
            expiry_task.abort();
            while let Some(res) = tasks.join_next().await {
                if let Err(err) = &res
                    && !err.is_cancelled()
                {
                    res.expect("conn close task panicked");
                }
            }
        }

        fn on_connection(
            tasks: &mut JoinSet<()>,
            map: RemoteMapInner,
            conn_id: u64,
            conn: ConnectionInfo,
        ) {
            // Store conn info for full introspection possibility.
            {
                let mut inner = map.write().expect("poisoned");
                inner
                    .entry(conn.remote_id())
                    .or_default()
                    .connections
                    .insert(conn_id, conn.clone());
            }

            // Track connection closing to clear up the map.
            tasks.spawn({
                let conn = conn.clone();
                let map = map.clone();
                async move {
                    conn.closed().await;
                    {
                        let mut inner = map.write().expect("poisoned");
                        let info = inner.entry(conn.remote_id()).or_default();
                        info.connections.remove(&conn_id);
                        info.aggregate.last_update = SystemTime::now();
                    }
                }
                .instrument(tracing::Span::current())
            });

            // Track path changes to update stats aggregate.
            tasks.spawn({
                async move {
                    let mut path_updates = conn.paths().stream();
                    while let Some(paths) = path_updates.next().await {
                        {
                            let mut inner = map.write().expect("poisoned");
                            let info = inner.entry(conn.remote_id()).or_default();
                            for path in paths {
                                info.aggregate.update(&path);
                            }
                        }
                    }
                }
                .instrument(tracing::Span::current())
            });
        }

        async fn clear_expired(
            retention_time: Duration,
            map: Arc<RwLock<HashMap<iroh::PublicKey, RemoteInfo>>>,
        ) {
            let mut interval = tokio::time::interval(retention_time);
            loop {
                interval.tick().await;
                let now = SystemTime::now();
                let mut inner = map.write().expect("poisoned");
                inner.retain(|_remote, info| {
                    info.is_active()
                        || now.duration_since(info.aggregate().last_update).unwrap()
                            < retention_time
                });
            }
        }
    }
}