iroh_net/magicsock/
node_map.rs

1use std::{
2    collections::{hash_map::Entry, BTreeSet, HashMap},
3    hash::Hash,
4    net::{IpAddr, SocketAddr},
5    pin::Pin,
6    task::{Context, Poll},
7    time::Instant,
8};
9
10use futures_lite::stream::Stream;
11use iroh_base::key::NodeId;
12use iroh_metrics::inc;
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use stun_rs::TransactionId;
16use tracing::{debug, info, instrument, trace, warn};
17
18use self::{
19    best_addr::ClearReason,
20    node_state::{NodeState, Options, PingHandled},
21};
22use super::{
23    metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr,
24};
25use crate::{
26    disco::{CallMeMaybe, Pong, SendAddr},
27    key::PublicKey,
28    relay::RelayUrl,
29    stun, NodeAddr,
30};
31
32mod best_addr;
33mod node_state;
34mod path_state;
35mod udp_paths;
36
37pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo};
38pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing};
39
/// Maximum number of inactive nodes for which we keep information.
///
/// This limit is enforced periodically via [`NodeMap::prune_inactive`].
const MAX_INACTIVE_NODES: usize = 30;
43
/// Map of the [`NodeState`] information for all the known nodes.
///
/// The nodes can be looked up by:
///
/// - The node's ID in this map, only useful if you know the ID from an insert or lookup.
///   This is static and never changes.
///
/// - The [`QuicMappedAddr`] which internally identifies the node to the QUIC stack.  This
///   is static and never changes.
///
/// - The node's public key, aka `PublicKey` or "node_key".  This is static and never changes,
///   however a node could be added when this is not yet known.
///
/// - A public socket address on which they are reachable on the internet, known as ip-port.
///   These come and go as the node moves around on the internet.
///
/// In short: an index of [`NodeState`]s by node key, [`QuicMappedAddr`], and discovered
/// ip:port endpoints.
///
/// All state lives in a [`NodeMapInner`] behind a [`Mutex`]; each method of this type
/// locks it for the duration of the call.
#[derive(Default, Debug)]
pub(super) struct NodeMap {
    inner: Mutex<NodeMapInner>,
}
65
/// The lock-protected state of a [`NodeMap`].
///
/// Every node is stored once in `by_id`; the other maps are secondary indices which
/// resolve a lookup key to the id of a node.
#[derive(Default, Debug)]
pub(super) struct NodeMapInner {
    /// Index from a node's [`NodeId`] to its id.
    by_node_key: HashMap<NodeId, usize>,
    /// Index from an [`IpPort`] to the id of the node associated with that address.
    by_ip_port: HashMap<IpPort, usize>,
    /// Index from a node's [`QuicMappedAddr`] to its id.
    by_quic_mapped_addr: HashMap<QuicMappedAddr, usize>,
    /// The [`NodeState`] of every known node, keyed by the node's id.
    by_id: HashMap<usize, NodeState>,
    /// The id which will be assigned to the next inserted node.
    next_id: usize,
}
74
/// Identifier to look up a [`NodeState`] in the [`NodeMap`].
///
/// You can look up entries in [`NodeMap`] with various keys, depending on the context you
/// have for the node.  These are all the keys the [`NodeMap`] can use.
#[derive(Debug, Clone)]
enum NodeStateKey {
    /// The id of the node inside the map.
    Idx(usize),
    /// The [`NodeId`] of the node.
    NodeId(NodeId),
    /// The [`QuicMappedAddr`] of the node.
    QuicMappedAddr(QuicMappedAddr),
    /// An ip:port pair associated with the node.
    IpPort(IpPort),
}
86
/// The origin or *source* through which an address associated with a remote node
/// was discovered.
///
/// The aggregate of the [`Source`]s of all the addresses of a node describes the
/// [`Source`]s of the node itself.
///
/// A [`Source`] helps track how and where an address was learned. Multiple
/// sources can be associated with a single address, if we have discovered this
/// address through multiple means.
///
/// Each time a [`NodeAddr`] is added to the node map, usually through
/// [`crate::endpoint::Endpoint::add_node_addr_with_source`], a [`Source`] must be supplied to indicate
/// how the address was obtained.
///
/// A [`Source`] can describe a variety of places that an address or node was
/// discovered, such as a configured discovery service, the network itself
/// (if another node has reached out to us), or as a user supplied [`NodeAddr`].
#[derive(Serialize, Deserialize, strum::Display, Debug, Clone, Eq, PartialEq, Hash)]
#[strum(serialize_all = "kebab-case")]
pub enum Source {
    /// Address was loaded from the file system.
    Saved,
    /// A node communicated with us first via UDP.
    Udp,
    /// A node communicated with us first via relay.
    Relay,
    /// Application layer added the address directly.
    App,
    /// The address was discovered by a discovery service.
    #[strum(serialize = "{name}")]
    Discovery {
        /// The name of the discovery service that discovered the address.
        name: String,
    },
    /// Application layer with a specific name added the node directly.
    #[strum(serialize = "{name}")]
    NamedApp {
        /// The name of the application that added the node.
        name: String,
    },
}
129
130impl NodeMap {
131    /// Create a new [`NodeMap`] from a list of [`NodeAddr`]s.
132    pub(super) fn load_from_vec(nodes: Vec<NodeAddr>) -> Self {
133        Self::from_inner(NodeMapInner::load_from_vec(nodes))
134    }
135
136    fn from_inner(inner: NodeMapInner) -> Self {
137        Self {
138            inner: Mutex::new(inner),
139        }
140    }
141
142    /// Add the contact information for a node.
143    pub(super) fn add_node_addr(&self, node_addr: NodeAddr, source: Source) {
144        self.inner.lock().add_node_addr(node_addr, source)
145    }
146
147    /// Number of nodes currently listed.
148    pub(super) fn node_count(&self) -> usize {
149        self.inner.lock().node_count()
150    }
151
152    pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> {
153        self.inner.lock().receive_udp(udp_addr)
154    }
155
156    pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr {
157        self.inner.lock().receive_relay(relay_url, src)
158    }
159
160    pub(super) fn notify_ping_sent(
161        &self,
162        id: usize,
163        dst: SendAddr,
164        tx_id: stun::TransactionId,
165        purpose: DiscoPingPurpose,
166        msg_sender: tokio::sync::mpsc::Sender<ActorMessage>,
167    ) {
168        if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) {
169            ep.ping_sent(dst, tx_id, purpose, msg_sender);
170        }
171    }
172
173    pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun::TransactionId) {
174        if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) {
175            ep.ping_timeout(tx_id);
176        }
177    }
178
179    pub(super) fn get_quic_mapped_addr_for_node_key(
180        &self,
181        node_key: NodeId,
182    ) -> Option<QuicMappedAddr> {
183        self.inner
184            .lock()
185            .get(NodeStateKey::NodeId(node_key))
186            .map(|ep| *ep.quic_mapped_addr())
187    }
188
189    /// Insert a received ping into the node map, and return whether a ping with this tx_id was already
190    /// received.
191    pub(super) fn handle_ping(
192        &self,
193        sender: PublicKey,
194        src: SendAddr,
195        tx_id: TransactionId,
196    ) -> PingHandled {
197        self.inner.lock().handle_ping(sender, src, tx_id)
198    }
199
200    pub(super) fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) {
201        self.inner.lock().handle_pong(sender, src, pong)
202    }
203
204    #[must_use = "actions must be handled"]
205    pub(super) fn handle_call_me_maybe(
206        &self,
207        sender: PublicKey,
208        cm: CallMeMaybe,
209    ) -> Vec<PingAction> {
210        self.inner.lock().handle_call_me_maybe(sender, cm)
211    }
212
213    #[allow(clippy::type_complexity)]
214    pub(super) fn get_send_addrs(
215        &self,
216        addr: QuicMappedAddr,
217        have_ipv6: bool,
218    ) -> Option<(
219        PublicKey,
220        Option<SocketAddr>,
221        Option<RelayUrl>,
222        Vec<PingAction>,
223    )> {
224        let mut inner = self.inner.lock();
225        let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?;
226        let public_key = *ep.public_key();
227        trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId");
228        let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6);
229        Some((public_key, udp_addr, relay_url, msgs))
230    }
231
232    pub(super) fn notify_shutdown(&self) {
233        let mut inner = self.inner.lock();
234        for (_, ep) in inner.node_states_mut() {
235            ep.reset();
236        }
237    }
238
239    pub(super) fn reset_node_states(&self) {
240        let mut inner = self.inner.lock();
241        for (_, ep) in inner.node_states_mut() {
242            ep.note_connectivity_change();
243        }
244    }
245
246    pub(super) fn nodes_stayin_alive(&self) -> Vec<PingAction> {
247        let mut inner = self.inner.lock();
248        inner
249            .node_states_mut()
250            .flat_map(|(_idx, node_state)| node_state.stayin_alive())
251            .collect()
252    }
253
254    /// Returns the [`RemoteInfo`]s for each node in the node map.
255    pub(super) fn list_remote_infos(&self, now: Instant) -> Vec<RemoteInfo> {
256        // NOTE: calls to this method will often call `into_iter` (or similar methods). Note that
257        // we can't avoid `collect` here since it would hold a lock for an indefinite time. Even if
258        // we were to find this acceptable, dealing with the lifetimes of the mutex's guard and the
259        // internal iterator will be a hassle, if possible at all.
260        self.inner.lock().remote_infos_iter(now).collect()
261    }
262
263    /// Returns a stream of [`ConnectionType`].
264    ///
265    /// Sends the current [`ConnectionType`] whenever any changes to the
266    /// connection type for `public_key` has occurred.
267    ///
268    /// # Errors
269    ///
270    /// Will return an error if there is not an entry in the [`NodeMap`] for
271    /// the `public_key`
272    pub(super) fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result<ConnectionTypeStream> {
273        self.inner.lock().conn_type_stream(node_id)
274    }
275
276    /// Get the [`RemoteInfo`]s for the node identified by [`NodeId`].
277    pub(super) fn remote_info(&self, node_id: NodeId) -> Option<RemoteInfo> {
278        self.inner.lock().remote_info(node_id)
279    }
280
281    /// Prunes nodes without recent activity so that at most [`MAX_INACTIVE_NODES`] are kept.
282    pub(super) fn prune_inactive(&self) {
283        self.inner.lock().prune_inactive();
284    }
285
286    pub(crate) fn on_direct_addr_discovered(&self, discovered: BTreeSet<SocketAddr>) {
287        self.inner.lock().on_direct_addr_discovered(discovered);
288    }
289}
290
291impl NodeMapInner {
292    /// Create a new [`NodeMap`] from a list of [`NodeAddr`]s.
293    fn load_from_vec(nodes: Vec<NodeAddr>) -> Self {
294        let mut me = Self::default();
295        for node_addr in nodes {
296            me.add_node_addr(node_addr, Source::Saved);
297        }
298        me
299    }
300
301    /// Add the contact information for a node.
302    #[instrument(skip_all, fields(node = %node_addr.node_id.fmt_short()))]
303    fn add_node_addr(&mut self, node_addr: NodeAddr, source: Source) {
304        let NodeAddr { node_id, info } = node_addr;
305
306        let source0 = source.clone();
307        let node_state = self.get_or_insert_with(NodeStateKey::NodeId(node_id), || Options {
308            node_id,
309            relay_url: info.relay_url.clone(),
310            active: false,
311            source,
312        });
313        node_state.update_from_node_addr(&info, source0);
314        let id = node_state.id();
315        for addr in &info.direct_addresses {
316            self.set_node_state_for_ip_port(*addr, id);
317        }
318    }
319
320    /// Prunes direct addresses from nodes that claim to share an address we know points to us.
321    pub(super) fn on_direct_addr_discovered(&mut self, discovered: BTreeSet<SocketAddr>) {
322        for addr in discovered {
323            self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr)
324        }
325    }
326
327    /// Removes a direct address from a node.
328    fn remove_by_ipp(&mut self, ipp: IpPort, reason: ClearReason) {
329        if let Some(id) = self.by_ip_port.remove(&ipp) {
330            if let Entry::Occupied(mut entry) = self.by_id.entry(id) {
331                let node = entry.get_mut();
332                node.remove_direct_addr(&ipp, reason);
333                if node.direct_addresses().count() == 0 {
334                    let node_id = node.public_key();
335                    let mapped_addr = node.quic_mapped_addr();
336                    self.by_node_key.remove(node_id);
337                    self.by_quic_mapped_addr.remove(mapped_addr);
338                    debug!(node_id=%node_id.fmt_short(), ?reason, "removing node");
339                    entry.remove();
340                }
341            }
342        }
343    }
344
345    fn get_id(&self, id: NodeStateKey) -> Option<usize> {
346        match id {
347            NodeStateKey::Idx(id) => Some(id),
348            NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(),
349            NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(),
350            NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(),
351        }
352    }
353
354    fn get_mut(&mut self, id: NodeStateKey) -> Option<&mut NodeState> {
355        self.get_id(id).and_then(|id| self.by_id.get_mut(&id))
356    }
357
358    fn get(&self, id: NodeStateKey) -> Option<&NodeState> {
359        self.get_id(id).and_then(|id| self.by_id.get(&id))
360    }
361
362    fn get_or_insert_with(
363        &mut self,
364        id: NodeStateKey,
365        f: impl FnOnce() -> Options,
366    ) -> &mut NodeState {
367        let id = self.get_id(id);
368        match id {
369            None => self.insert_node(f()),
370            Some(id) => self.by_id.get_mut(&id).expect("is not empty"),
371        }
372    }
373
374    /// Number of nodes currently listed.
375    fn node_count(&self) -> usize {
376        self.by_id.len()
377    }
378
379    /// Marks the node we believe to be at `ipp` as recently used.
380    fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> {
381        let ip_port: IpPort = udp_addr.into();
382        let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else {
383            info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore");
384            return None;
385        };
386        node_state.receive_udp(ip_port, Instant::now());
387        Some((*node_state.public_key(), *node_state.quic_mapped_addr()))
388    }
389
390    #[instrument(skip_all, fields(src = %src.fmt_short()))]
391    fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr {
392        let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || {
393            trace!("packets from unknown node, insert into node map");
394            Options {
395                node_id: src,
396                relay_url: Some(relay_url.clone()),
397                active: true,
398                source: Source::Relay,
399            }
400        });
401        node_state.receive_relay(relay_url, src, Instant::now());
402        *node_state.quic_mapped_addr()
403    }
404
405    fn node_states(&self) -> impl Iterator<Item = (&usize, &NodeState)> {
406        self.by_id.iter()
407    }
408
409    fn node_states_mut(&mut self) -> impl Iterator<Item = (&usize, &mut NodeState)> {
410        self.by_id.iter_mut()
411    }
412
413    /// Get the [`RemoteInfo`]s for all nodes.
414    fn remote_infos_iter(&self, now: Instant) -> impl Iterator<Item = RemoteInfo> + '_ {
415        self.node_states().map(move |(_, ep)| ep.info(now))
416    }
417
418    /// Get the [`RemoteInfo`]s for each node.
419    fn remote_info(&self, node_id: NodeId) -> Option<RemoteInfo> {
420        self.get(NodeStateKey::NodeId(node_id))
421            .map(|ep| ep.info(Instant::now()))
422    }
423
424    /// Returns a stream of [`ConnectionType`].
425    ///
426    /// Sends the current [`ConnectionType`] whenever any changes to the
427    /// connection type for `public_key` has occurred.
428    ///
429    /// # Errors
430    ///
431    /// Will return an error if there is not an entry in the [`NodeMap`] for
432    /// the `public_key`
433    fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result<ConnectionTypeStream> {
434        match self.get(NodeStateKey::NodeId(node_id)) {
435            Some(ep) => Ok(ConnectionTypeStream {
436                initial: Some(ep.conn_type()),
437                inner: ep.conn_type_stream(),
438            }),
439            None => anyhow::bail!("No endpoint for {node_id:?} found"),
440        }
441    }
442
443    fn handle_pong(&mut self, sender: NodeId, src: &DiscoMessageSource, pong: Pong) {
444        if let Some(ns) = self.get_mut(NodeStateKey::NodeId(sender)).as_mut() {
445            let insert = ns.handle_pong(&pong, src.into());
446            if let Some((src, key)) = insert {
447                self.set_node_key_for_ip_port(src, &key);
448            }
449            trace!(?insert, "received pong")
450        } else {
451            warn!("received pong: node unknown, ignore")
452        }
453    }
454
455    #[must_use = "actions must be handled"]
456    fn handle_call_me_maybe(&mut self, sender: NodeId, cm: CallMeMaybe) -> Vec<PingAction> {
457        let ns_id = NodeStateKey::NodeId(sender);
458        if let Some(id) = self.get_id(ns_id.clone()) {
459            for number in &cm.my_numbers {
460                // ensure the new addrs are known
461                self.set_node_state_for_ip_port(*number, id);
462            }
463        }
464        match self.get_mut(ns_id) {
465            None => {
466                inc!(MagicsockMetrics, recv_disco_call_me_maybe_bad_disco);
467                debug!("received call-me-maybe: ignore, node is unknown");
468                vec![]
469            }
470            Some(ns) => {
471                debug!(endpoints = ?cm.my_numbers, "received call-me-maybe");
472
473                ns.handle_call_me_maybe(cm)
474            }
475        }
476    }
477
478    fn handle_ping(&mut self, sender: NodeId, src: SendAddr, tx_id: TransactionId) -> PingHandled {
479        let node_state = self.get_or_insert_with(NodeStateKey::NodeId(sender), || {
480            debug!("received ping: node unknown, add to node map");
481            let source = if src.is_relay() {
482                Source::Relay
483            } else {
484                Source::Udp
485            };
486            Options {
487                node_id: sender,
488                relay_url: src.relay_url(),
489                active: true,
490                source,
491            }
492        });
493
494        let handled = node_state.handle_ping(src.clone(), tx_id);
495        if let SendAddr::Udp(ref addr) = src {
496            if matches!(handled.role, PingRole::NewPath) {
497                self.set_node_key_for_ip_port(*addr, &sender);
498            }
499        }
500        handled
501    }
502
503    /// Inserts a new node into the [`NodeMap`].
504    fn insert_node(&mut self, options: Options) -> &mut NodeState {
505        info!(
506            node = %options.node_id.fmt_short(),
507            relay_url = ?options.relay_url,
508            source = %options.source,
509            "inserting new node in NodeMap",
510        );
511        let id = self.next_id;
512        self.next_id = self.next_id.wrapping_add(1);
513        let node_state = NodeState::new(id, options);
514
515        // update indices
516        self.by_quic_mapped_addr
517            .insert(*node_state.quic_mapped_addr(), id);
518        self.by_node_key.insert(*node_state.public_key(), id);
519
520        self.by_id.insert(id, node_state);
521        self.by_id.get_mut(&id).expect("just inserted")
522    }
523
524    /// Makes future node lookups by ipp return the same endpoint as a lookup by nk.
525    ///
526    /// This should only be called with a fully verified mapping of ipp to
527    /// nk, because calling this function defines the endpoint we hand to
528    /// WireGuard for packets received from ipp.
529    fn set_node_key_for_ip_port(&mut self, ipp: impl Into<IpPort>, nk: &PublicKey) {
530        let ipp = ipp.into();
531        if let Some(id) = self.by_ip_port.get(&ipp) {
532            if !self.by_node_key.contains_key(nk) {
533                self.by_node_key.insert(*nk, *id);
534            }
535            self.by_ip_port.remove(&ipp);
536        }
537        if let Some(id) = self.by_node_key.get(nk) {
538            trace!("insert ip -> id: {:?} -> {}", ipp, id);
539            self.by_ip_port.insert(ipp, *id);
540        }
541    }
542
543    fn set_node_state_for_ip_port(&mut self, ipp: impl Into<IpPort>, id: usize) {
544        let ipp = ipp.into();
545        trace!(?ipp, ?id, "set endpoint for ip:port");
546        self.by_ip_port.insert(ipp, id);
547    }
548
549    /// Prunes nodes without recent activity so that at most [`MAX_INACTIVE_NODES`] are kept.
550    fn prune_inactive(&mut self) {
551        let now = Instant::now();
552        let mut prune_candidates: Vec<_> = self
553            .by_id
554            .values()
555            .filter(|node| !node.is_active(&now))
556            .map(|node| (*node.public_key(), node.last_used()))
557            .collect();
558
559        let prune_count = prune_candidates.len().saturating_sub(MAX_INACTIVE_NODES);
560        if prune_count == 0 {
561            // within limits
562            return;
563        }
564
565        prune_candidates.sort_unstable_by_key(|(_pk, last_used)| *last_used);
566        prune_candidates.truncate(prune_count);
567        for (public_key, last_used) in prune_candidates.into_iter() {
568            let node = public_key.fmt_short();
569            match last_used.map(|instant| instant.elapsed()) {
570                Some(last_used) => trace!(%node, ?last_used, "pruning inactive"),
571                None => trace!(%node, last_used=%"never", "pruning inactive"),
572            }
573
574            let Some(id) = self.by_node_key.remove(&public_key) else {
575                debug_assert!(false, "missing by_node_key entry for pk in by_id");
576                continue;
577            };
578
579            let Some(ep) = self.by_id.remove(&id) else {
580                debug_assert!(false, "missing by_id entry for id in by_node_key");
581                continue;
582            };
583
584            for ip_port in ep.direct_addresses() {
585                self.by_ip_port.remove(&ip_port);
586            }
587
588            self.by_quic_mapped_addr.remove(ep.quic_mapped_addr());
589        }
590    }
591}
592
/// Stream returning [`ConnectionType`]s.
///
/// The stream first yields the connection type it was created with and afterwards
/// the items of the inner watcher stream.
#[derive(Debug)]
pub struct ConnectionTypeStream {
    /// The connection type at creation; taken out by the first poll.
    initial: Option<ConnectionType>,
    /// The watcher stream polled once `initial` has been yielded.
    inner: watchable::WatcherStream<ConnectionType>,
}
599
600impl Stream for ConnectionTypeStream {
601    type Item = ConnectionType;
602
603    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
604        let this = &mut *self;
605        if let Some(initial_conn_type) = this.initial.take() {
606            return Poll::Ready(Some(initial_conn_type));
607        }
608        Pin::new(&mut this.inner).poll_next(cx)
609    }
610}
611
/// An (Ip, Port) pair.
///
/// NOTE: storing an [`IpPort`] is safer than storing a [`SocketAddr`] because for IPv6 socket
/// addresses include fields that can't be assumed consistent even within a single connection.
///
/// It is displayed the same way as the [`SocketAddr`] it converts to.
#[derive(Debug, derive_more::Display, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[display("{}", SocketAddr::from(*self))]
pub struct IpPort {
    /// The IP address.
    ip: IpAddr,
    /// The port.
    port: u16,
}
622
623impl From<SocketAddr> for IpPort {
624    fn from(socket_addr: SocketAddr) -> Self {
625        Self {
626            ip: socket_addr.ip(),
627            port: socket_addr.port(),
628        }
629    }
630}
631
632impl From<IpPort> for SocketAddr {
633    fn from(ip_port: IpPort) -> Self {
634        let IpPort { ip, port } = ip_port;
635        (ip, port).into()
636    }
637}
638
639impl IpPort {
640    pub fn ip(&self) -> &IpAddr {
641        &self.ip
642    }
643
644    pub fn port(&self) -> u16 {
645        self.port
646    }
647}
648
#[cfg(test)]
mod tests {
    use std::net::Ipv4Addr;

    use super::{node_state::MAX_INACTIVE_DIRECT_ADDRESSES, *};
    use crate::key::SecretKey;

    impl NodeMap {
        /// Adds `node_addr` with a fixed [`Source::NamedApp`] named "test".
        #[track_caller]
        fn add_test_addr(&self, node_addr: NodeAddr) {
            let source = Source::NamedApp {
                name: "test".into(),
            };
            self.add_node_addr(node_addr, source)
        }
    }

    /// Returns the localhost socket address with the given port.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Lists the nodes of `node_map` as [`NodeAddr`]s, leaving out those whose address
    /// info is empty.
    fn non_empty_node_addrs(node_map: &NodeMap) -> Vec<NodeAddr> {
        node_map
            .list_remote_infos(Instant::now())
            .into_iter()
            .filter_map(|info| {
                let node_addr: NodeAddr = info.into();
                (!node_addr.info.is_empty()).then_some(node_addr)
            })
            .collect()
    }

    /// Test persisting and loading of known nodes.
    #[tokio::test]
    async fn restore_from_vec() {
        let _guard = iroh_test::logging::setup();

        let node_map = NodeMap::default();

        let node_a = SecretKey::generate().public();
        let node_b = SecretKey::generate().public();
        let node_c = SecretKey::generate().public();
        let node_d = SecretKey::generate().public();

        let relay_x: RelayUrl = "https://my-relay-1.com".parse().unwrap();
        let relay_y: RelayUrl = "https://my-relay-2.com".parse().unwrap();

        // One node with relay and direct addresses, one with only a relay, one with
        // only a direct address and one without any addressing info.
        let node_addrs = [
            NodeAddr::new(node_a)
                .with_relay_url(relay_x)
                .with_direct_addresses([addr(4000), addr(4001)]),
            NodeAddr::new(node_b).with_relay_url(relay_y),
            NodeAddr::new(node_c).with_direct_addresses([addr(5000)]),
            NodeAddr::new(node_d),
        ];
        for node_addr in node_addrs {
            node_map.add_test_addr(node_addr);
        }

        let mut addrs = non_empty_node_addrs(&node_map);
        let loaded_node_map = NodeMap::load_from_vec(addrs.clone());
        let mut loaded = non_empty_node_addrs(&loaded_node_map);

        loaded.sort_unstable();
        addrs.sort_unstable();

        // compare the node maps via their known nodes
        assert_eq!(addrs, loaded);
    }

    #[test]
    fn test_prune_direct_addresses() {
        let _guard = iroh_test::logging::setup();

        let node_map = NodeMap::default();
        let public_key = SecretKey::generate().public();
        let options = Options {
            node_id: public_key,
            relay_url: None,
            active: false,
            source: Source::NamedApp {
                name: "test".into(),
            },
        };
        let id = node_map.inner.lock().insert_node(options).id();

        // add [`MAX_INACTIVE_DIRECT_ADDRESSES`] active direct addresses and double
        // [`MAX_INACTIVE_DIRECT_ADDRESSES`] that are inactive

        info!("Adding active addresses");
        for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES {
            let active_addr = addr(5000 + i as u16);
            // add address
            node_map.add_test_addr(NodeAddr::new(public_key).with_direct_addresses([active_addr]));
            // make it active
            node_map.inner.lock().receive_udp(active_addr);
        }

        info!("Adding offline/inactive addresses");
        for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES * 2 {
            let offline_addr = addr(6000 + i as u16);
            node_map.add_test_addr(NodeAddr::new(public_key).with_direct_addresses([offline_addr]));
        }

        let mut node_map_inner = node_map.inner.lock();
        let endpoint = node_map_inner.by_id.get_mut(&id).unwrap();

        info!("Adding alive addresses");
        for i in 0..MAX_INACTIVE_DIRECT_ADDRESSES {
            let alive_addr = SendAddr::Udp(addr(7000 + i as u16));
            let txid = stun::TransactionId::from([i as u8; 12]);
            // Note that this already invokes .prune_direct_addresses() because these are
            // new UDP paths.
            endpoint.handle_ping(alive_addr, txid);
        }

        info!("Pruning addresses");
        endpoint.prune_direct_addresses();

        // Half the offline addresses should have been pruned.  All the active and alive
        // addresses should have been kept.
        let remaining = endpoint.direct_addresses().count();
        assert_eq!(remaining, MAX_INACTIVE_DIRECT_ADDRESSES * 3);

        // We should have both offline and alive addresses which are not active.
        let not_active = endpoint
            .direct_address_states()
            .filter(|(_addr, state)| !state.is_active())
            .count();
        assert_eq!(not_active, MAX_INACTIVE_DIRECT_ADDRESSES * 2)
    }

    #[test]
    fn test_prune_inactive() {
        let node_map = NodeMap::default();
        // add one active node and more than MAX_INACTIVE_NODES inactive nodes
        let active_node = SecretKey::generate().public();
        let active_addr = addr(167);
        node_map.add_test_addr(NodeAddr::new(active_node).with_direct_addresses([active_addr]));
        node_map
            .inner
            .lock()
            .receive_udp(active_addr)
            .expect("registered");

        for _ in 0..=MAX_INACTIVE_NODES {
            let inactive_node = SecretKey::generate().public();
            node_map.add_test_addr(NodeAddr::new(inactive_node));
        }

        assert_eq!(node_map.node_count(), MAX_INACTIVE_NODES + 2);
        node_map.prune_inactive();
        assert_eq!(node_map.node_count(), MAX_INACTIVE_NODES + 1);
        node_map
            .inner
            .lock()
            .get(NodeStateKey::NodeId(active_node))
            .expect("should not be pruned");
    }
}