iroh_net/
magicsock.rs

//! Implements a socket that can change its communication path while in use, actively searching for the best way to communicate.
//!
//! Based on tailscale/wgengine/magicsock
//!
//! ### `DEV_RELAY_ONLY` env var:
//! When present at *compile time*, this env var will force all packets
//! to be sent over the relay connection, regardless of whether or
//! not we have a direct UDP address for the given node.
//!
//! The intended use is for testing the relay protocol inside the MagicSock
//! to ensure that we can rely on the relay to send packets when two nodes
//! are unable to find direct UDP connections to each other.
//!
//! This also prevents this node from attempting to hole punch and prevents it
//! from responding to any hole punching attempts. This node will still,
//! however, read any packets that come off the UDP sockets.
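//!
//! For example, building with `DEV_RELAY_ONLY=true cargo build` (a sketch; per the
//! note above only the variable's presence at compile time matters).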

use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    fmt::Display,
    io,
    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
    pin::Pin,
    sync::{
        atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
        Arc, RwLock,
    },
    task::{Context, Poll, Waker},
    time::{Duration, Instant},
};

use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{FutureExt, Stream, StreamExt};
use futures_util::stream::BoxStream;
use iroh_base::key::NodeId;
use iroh_metrics::{inc, inc_by};
use netwatch::{interfaces, ip::LocalAddresses, netmon};
use quinn::AsyncUdpSocket;
use rand::{seq::SliceRandom, Rng, SeedableRng};
use smallvec::{smallvec, SmallVec};
use tokio::{
    sync::{self, mpsc, Mutex},
    task::JoinSet,
    time,
};
use tokio_util::sync::CancellationToken;
use tracing::{
    debug, error, error_span, event, info, info_span, instrument, trace, trace_span, warn,
    Instrument, Level, Span,
};
use url::Url;
use watchable::Watchable;

use self::{
    metrics::Metrics as MagicsockMetrics,
    node_map::{NodeMap, PingAction, PingRole, SendPing},
    relay_actor::{RelayActor, RelayActorMessage, RelayReadResult},
    udp_conn::UdpConn,
};
use crate::{
    defaults::timeouts::NETCHECK_REPORT_TIMEOUT,
    disco::{self, CallMeMaybe, SendAddr},
    discovery::{Discovery, DiscoveryItem},
    dns::DnsResolver,
    endpoint::NodeAddr,
    key::{PublicKey, SecretKey, SharedSecret},
    netcheck,
    relay::{RelayMap, RelayUrl},
    stun, AddrInfo,
};

mod metrics;
mod node_map;
mod relay_actor;
mod timer;
mod udp_conn;

pub use node_map::Source;

pub(super) use self::timer::Timer;
pub use self::{
    metrics::Metrics,
    node_map::{ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, RemoteInfo},
};

/// How long we consider a STUN-derived endpoint valid for. UDP NAT mappings typically
/// expire after about 30 seconds, so this is a few seconds shy of that.
const ENDPOINTS_FRESH_ENOUGH_DURATION: Duration = Duration::from_secs(27);

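/// Interval at which the heartbeat timer fires, presumably driving keep-alive pings
/// for active paths (the timer itself lives outside this part of the module).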
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// Contains options for [`MagicSock::spawn`].
#[derive(derive_more::Debug)]
pub(crate) struct Options {
    /// The IPv4 address to listen on.
    ///
    /// If set to `None` it will choose a random port and listen on `0.0.0.0:0`.
    pub(crate) addr_v4: Option<SocketAddrV4>,
    /// The IPv6 address to listen on.
    ///
    /// If set to `None` it will choose a random port and listen on `[::]:0`.
    pub(crate) addr_v6: Option<SocketAddrV6>,

    /// Secret key for this node.
    pub(crate) secret_key: SecretKey,

    /// The [`RelayMap`] to use, leave empty to not use a relay server.
    pub(crate) relay_map: RelayMap,

    /// An optional [`NodeMap`], to restore information about nodes.
    pub(crate) node_map: Option<Vec<NodeAddr>>,

    /// Optional node discovery mechanism.
    pub(crate) discovery: Option<Box<dyn Discovery>>,

    /// A DNS resolver to use for resolving relay URLs.
    ///
    /// You can use [`crate::dns::default_resolver`] for a resolver that uses the system's DNS
    /// configuration.
    pub(crate) dns_resolver: DnsResolver,

    /// Proxy configuration.
    pub(crate) proxy_url: Option<Url>,

    /// Skip verification of SSL certificates from relay servers
    ///
    /// May only be used in tests.
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
    pub(crate) insecure_skip_relay_cert_verify: bool,
}

impl Default for Options {
    fn default() -> Self {
        Options {
            addr_v4: None,
            addr_v6: None,
            secret_key: SecretKey::generate(),
            relay_map: RelayMap::empty(),
            node_map: None,
            discovery: None,
            proxy_url: None,
            dns_resolver: crate::dns::default_resolver().clone(),
            #[cfg(any(test, feature = "test-utils"))]
            insecure_skip_relay_cert_verify: false,
        }
    }
}

/// Contents of a relay message. Use a SmallVec to avoid allocations for the very
/// common case of a single packet.
type RelayContents = SmallVec<[Bytes; 1]>;

/// Handle for [`MagicSock`].
///
/// Dereferences to [`MagicSock`], and handles closing.
#[derive(Clone, Debug, derive_more::Deref)]
pub(crate) struct Handle {
    #[deref(forward)]
    msock: Arc<MagicSock>,
    // Empty when closed
    actor_tasks: Arc<Mutex<JoinSet<()>>>,
}

/// Iroh connectivity layer.
///
/// This is responsible for routing packets to nodes based on node IDs. It will initially
/// route packets via a relay and transparently try to establish a node-to-node
/// connection and upgrade to it.  It will also keep looking for better connections as the
/// network details of both nodes change.
///
/// It is usually only necessary to use a single [`MagicSock`] instance in an application; this
/// means any QUIC endpoints on top will share as much information about nodes as
/// possible.
#[derive(derive_more::Debug)]
pub(crate) struct MagicSock {
    actor_sender: mpsc::Sender<ActorMessage>,
    relay_actor_sender: mpsc::Sender<RelayActorMessage>,
    /// String representation of the node_id of this node.
    me: String,
    /// Proxy
    proxy_url: Option<Url>,

    /// Used for receiving relay messages.
    relay_recv_receiver: parking_lot::Mutex<mpsc::Receiver<RelayRecvResult>>,
    /// Stores wakers, to be called when relay_recv_ch receives new data.
    network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
    network_send_wakers: Arc<parking_lot::Mutex<Option<Waker>>>,

    /// The DNS resolver to be used in this magicsock.
    dns_resolver: DnsResolver,

    /// Key for this node.
    secret_key: SecretKey,

    /// Cached version of the Ipv4 and Ipv6 addrs of the current connection.
    local_addrs: std::sync::RwLock<(SocketAddr, Option<SocketAddr>)>,

    /// Preferred port from the bind addresses in [`Options`]; 0 means auto.
    port: AtomicU16,

    /// Close is in progress (or done)
    closing: AtomicBool,
    /// Close was called.
    closed: AtomicBool,
    /// Whether the last netcheck report reported IPv6 to be available.
    ipv6_reported: Arc<AtomicBool>,

    /// None (or zero nodes) means relay is disabled.
    relay_map: RelayMap,
    /// The relay node with the best latency; `None` means none/unknown.
    my_relay: Watchable<Option<RelayUrl>>,
    /// Tracks the networkmap node entity for each node discovery key.
    node_map: NodeMap,
    /// UDP IPv4 socket
    pconn4: UdpConn,
    /// UDP IPv6 socket
    pconn6: Option<UdpConn>,
    /// Netcheck client
    net_checker: netcheck::Addr,
    /// The state for an active DiscoKey.
    disco_secrets: DiscoSecrets,

    /// UDP disco (ping) queue
    udp_disco_sender: mpsc::Sender<(SocketAddr, PublicKey, disco::Message)>,

    /// Optional discovery service
    discovery: Option<Box<dyn Discovery>>,

    /// Our discovered direct addresses.
    direct_addrs: DiscoveredDirectAddrs,

    /// List of CallMeMaybe disco messages that should be sent out after the next endpoint update
    /// completes
    pending_call_me_maybes: parking_lot::Mutex<HashMap<PublicKey, RelayUrl>>,

    /// Indicates the direct addr update state.
    direct_addr_update_state: DirectAddrUpdateState,

    /// Skip verification of SSL certificates from relay servers
    ///
    /// May only be used in tests.
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
    insecure_skip_relay_cert_verify: bool,
}

impl MagicSock {
    /// Creates a [`MagicSock`] listening on [`Options::addr_v4`] and [`Options::addr_v6`].
    pub(crate) async fn spawn(opts: Options) -> Result<Handle> {
        Handle::new(opts).await
    }

    /// Returns the relay node we are connected to that has the best latency.
    ///
    /// If `None`, then we are not connected to any relay nodes.
    pub(crate) fn my_relay(&self) -> Option<RelayUrl> {
        self.my_relay.get()
    }

    /// Get the current proxy configuration.
    pub(crate) fn proxy_url(&self) -> Option<&Url> {
        self.proxy_url.as_ref()
    }

    /// Sets the relay node with the best latency.
    ///
    /// If we are not connected to any relay nodes, set this to `None`.
    fn set_my_relay(&self, my_relay: Option<RelayUrl>) -> Option<RelayUrl> {
        self.my_relay.replace(my_relay)
    }

    fn is_closing(&self) -> bool {
        self.closing.load(Ordering::Relaxed)
    }

    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }

    fn public_key(&self) -> PublicKey {
        self.secret_key.public()
    }

    /// Get the cached version of the Ipv4 and Ipv6 addrs of the current connection.
    pub(crate) fn local_addr(&self) -> (SocketAddr, Option<SocketAddr>) {
        *self.local_addrs.read().expect("not poisoned")
    }

    /// Returns `true` if we have at least one candidate address we can send packets to.
    pub(crate) fn has_send_address(&self, node_key: PublicKey) -> bool {
        self.remote_info(node_key)
            .map(|info| info.has_send_address())
            .unwrap_or(false)
    }

    /// Return the [`RemoteInfo`]s of all nodes in the node map.
    pub(crate) fn list_remote_infos(&self) -> Vec<RemoteInfo> {
        self.node_map.list_remote_infos(Instant::now())
    }

    /// Return the [`RemoteInfo`] for a single node in the node map.
    pub(crate) fn remote_info(&self, node_id: NodeId) -> Option<RemoteInfo> {
        self.node_map.remote_info(node_id)
    }

    /// Returns the direct addresses as a stream.
    ///
    /// The [`MagicSock`] continuously monitors the direct addresses, the network addresses
    /// it might be able to be contacted on, for changes.  Whenever changes are detected
    /// this stream will yield a new list of addresses.
    ///
    /// Upon creation of the [`MagicSock`] it may not yet have completed its first
    /// direct address discovery; in this case the first item of the stream will not be
    /// immediately available.  Once this first set of direct addresses is discovered, the
    /// stream will always yield the most recently discovered set of addresses
    /// immediately.
    ///
    /// To get the current direct addresses, drop the stream after the first item was
    /// received.
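    ///
    /// E.g., a minimal sketch to get just the current addresses (assuming a stream
    /// combinator such as `StreamExt::next` is in scope):
    ///
    /// ```ignore
    /// let current_addrs = msock.direct_addresses().next().await;
    /// ```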
    pub(crate) fn direct_addresses(&self) -> DirectAddrsStream {
        self.direct_addrs.updates_stream()
    }

    /// Watch for changes to the home relay.
    ///
    /// Note that this can be used to wait for the initial home relay to be known. If the home
    /// relay is known at this point, it will be the first item in the stream.
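    ///
    /// E.g., a sketch of waiting for the initial home relay (again assuming a
    /// `StreamExt::next` combinator is in scope):
    ///
    /// ```ignore
    /// let home_relay = msock.watch_home_relay().next().await;
    /// ```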
    pub(crate) fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
        let current = futures_lite::stream::iter(self.my_relay());
        let changes = self
            .my_relay
            .watch()
            .into_stream()
            .filter_map(|maybe_relay| maybe_relay);
        current.chain(changes)
    }

    /// Returns a stream that reports the [`ConnectionType`] we have to the
    /// given `node_id`.
    ///
    /// The `NodeMap` continuously monitors the `node_id`'s endpoint for
    /// [`ConnectionType`] changes, and sends the latest [`ConnectionType`]
    /// on the stream.
    ///
    /// The current [`ConnectionType`] will be the initial entry on the stream.
    ///
    /// # Errors
    ///
    /// Will return an error if there is no address information known about the
    /// given `node_id`.
    pub(crate) fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
        self.node_map.conn_type_stream(node_id)
    }

    /// Returns the socket address which can be used by the QUIC layer to dial this node.
    pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option<QuicMappedAddr> {
        self.node_map.get_quic_mapped_addr_for_node_key(node_id)
    }

    /// Add addresses for a node to the magic socket's address book.
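    ///
    /// A usage sketch (`addr` is a [`NodeAddr`] obtained out of band; the `Source::App`
    /// variant here is an assumption, any appropriate [`Source`] works):
    ///
    /// ```ignore
    /// msock.add_node_addr(addr, Source::App)?;
    /// ```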
    #[instrument(skip_all, fields(me = %self.me))]
    pub fn add_node_addr(&self, mut addr: NodeAddr, source: node_map::Source) -> Result<()> {
        let mut pruned = 0;
        for my_addr in self.direct_addrs.sockaddrs() {
            if addr.info.direct_addresses.remove(&my_addr) {
                warn!(node_id=addr.node_id.fmt_short(), %my_addr, %source, "not adding our addr for node");
                pruned += 1;
            }
        }
        if !addr.info.is_empty() {
            self.node_map.add_node_addr(addr, source);
            Ok(())
        } else if pruned != 0 {
            Err(anyhow::anyhow!(
                "empty addressing info, {pruned} direct addresses have been pruned"
            ))
        } else {
            Err(anyhow::anyhow!("empty addressing info"))
        }
    }

    /// Stores a new set of direct addresses.
    ///
    /// If the direct addresses have changed from the previous set, they are published to
    /// discovery.
    pub(super) fn store_direct_addresses(&self, addrs: BTreeSet<DirectAddr>) {
        let updated = self.direct_addrs.update(addrs);
        if updated {
            self.node_map
                .on_direct_addr_discovered(self.direct_addrs.sockaddrs());
            self.publish_my_addr();
        }
    }

    /// Get a reference to the DNS resolver used in this [`MagicSock`].
    pub(crate) fn dns_resolver(&self) -> &DnsResolver {
        &self.dns_resolver
    }

    /// Reference to optional discovery service
    pub(crate) fn discovery(&self) -> Option<&dyn Discovery> {
        self.discovery.as_ref().map(Box::as_ref)
    }

    /// Call to notify the system of potential network changes.
    pub(crate) async fn network_change(&self) {
        self.actor_sender
            .send(ActorMessage::NetworkChange)
            .await
            .ok();
    }

    #[cfg(test)]
    async fn force_network_change(&self, is_major: bool) {
        self.actor_sender
            .send(ActorMessage::ForceNetworkChange(is_major))
            .await
            .ok();
    }

    #[cfg_attr(windows, allow(dead_code))]
    fn normalized_local_addr(&self) -> io::Result<SocketAddr> {
        let (v4, v6) = self.local_addr();
        let addr = if let Some(v6) = v6 { v6 } else { v4 };
        Ok(addr)
    }

    fn create_io_poller(&self) -> Pin<Box<dyn quinn::UdpPoller>> {
        // To do this properly the MagicSock would need a registry of pollers.  For each
        // node we would look up the poller or create one.  Then on each try_send we can
        // look up the correct poller and configure it to poll the paths it needs.
        //
        // Note however that the current quinn impl calls UdpPoller::poll_writable()
        // **before** it calls try_send(), as opposed to how it is documented.  That is a
        // problem as we would not yet know the path that needs to be polled.  To avoid such
        // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context,
        // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll<io::Result<()>>
        // instead of the existing .try_send() because then we would have control over this.
        //
        // Right now however we have one single poller behaving the same for each
        // connection.  It checks all paths and returns Poll::Ready as soon as any path is
        // ready.
        let ipv4_poller = Arc::new(self.pconn4.clone()).create_io_poller();
        let ipv6_poller = self
            .pconn6
            .as_ref()
            .map(|sock| Arc::new(sock.clone()).create_io_poller());
        let relay_sender = self.relay_actor_sender.clone();
        Box::pin(IoPoller {
            ipv4_poller,
            ipv6_poller,
            relay_sender,
            relay_send_waker: self.network_send_wakers.clone(),
        })
    }

    /// Implementation for AsyncUdpSocket::try_send
    #[instrument(skip_all)]
    fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
        inc_by!(MagicsockMetrics, send_data, transmit.contents.len() as _);

        if self.is_closed() {
            inc_by!(
                MagicsockMetrics,
                send_data_network_down,
                transmit.contents.len() as _
            );
            return Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "connection closed",
            ));
        }

        let dest = QuicMappedAddr(transmit.destination);
        trace!(
            dst = %dest,
            src = ?transmit.src_ip,
            len = %transmit.contents.len(),
            "sending",
        );
        let mut transmit = transmit.clone();
        match self
            .node_map
            .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed))
        {
            Some((node_id, udp_addr, relay_url, msgs)) => {
                let mut pings_sent = false;
                // If we have pings to send, we *have* to send them out first.
                if !msgs.is_empty() {
                    if let Err(err) = self.try_send_ping_actions(msgs) {
                        warn!(
                            node = %node_id.fmt_short(),
                            "failed to handle ping actions: {err:#}",
                        );
                    }
                    pings_sent = true;
                }

                let mut udp_sent = false;
                let mut udp_error = None;
                let mut relay_sent = false;
                let mut relay_error = None;

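                // Try to send on every path we have.  Delivering the same datagram
                // via both UDP and the relay is harmless: QUIC discards packets with
                // duplicate packet numbers.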
                // send udp
                if let Some(addr) = udp_addr {
                    // rewrite target address
                    transmit.destination = addr;
                    match self.try_send_udp(addr, &transmit) {
                        Ok(()) => {
                            trace!(node = %node_id.fmt_short(), dst = %addr,
                                   "sent transmit over UDP");
                            udp_sent = true;
                        }
                        Err(err) => {
                            error!(node = %node_id.fmt_short(), dst = %addr,
                                   "failed to send udp: {err:#}");
                            udp_error = Some(err);
                        }
                    }
                }

                // send relay
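                // `split_packets` (defined elsewhere in this module) is expected to
                // split a possibly GSO-segmented transmit into its individual
                // datagrams, since the relay forwards discrete packets.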
                if let Some(ref relay_url) = relay_url {
                    match self.try_send_relay(relay_url, node_id, split_packets(&transmit)) {
                        Ok(()) => {
                            relay_sent = true;
                        }
                        Err(err) => {
                            relay_error = Some(err);
                        }
                    }
                }

                let udp_pending = udp_error
                    .as_ref()
                    .map(|err| err.kind() == io::ErrorKind::WouldBlock)
                    .unwrap_or_default();
                let relay_pending = relay_error
                    .as_ref()
                    .map(|err| err.kind() == io::ErrorKind::WouldBlock)
                    .unwrap_or_default();
                if udp_pending && relay_pending {
                    // Handle backpressure.
                    Err(io::Error::new(io::ErrorKind::WouldBlock, "pending"))
                } else {
                    if relay_sent || udp_sent {
                        trace!(
                            node = %node_id.fmt_short(),
                            send_udp = ?udp_addr,
                            send_relay = ?relay_url,
                            "sent transmit",
                        );
                    } else if !pings_sent {
                        // Returning Ok here means we let QUIC handle a timeout for a lost
                        // packet, same would happen if we returned any errors.  The
                        // philosophy of quinn-udp is that a UDP connection could come back
                        // at any time so these errors should be treated as transient and
                        // are just timeouts.  Hence we opt for returning Ok.  See
                        // test_try_send_no_udp_addr_or_relay_url to explore this further.
                        error!(
                            node = %node_id.fmt_short(),
                            "no UDP or relay paths available for node",
                        );
                    }
                    Ok(())
                }
            }
            None => {
                error!(%dest, "no NodeState for mapped address");
                // Returning Ok here means we let QUIC time out.  Returning WouldBlock
                // triggers a hot loop.  Returning an error would immediately fail a
                // connection.  The philosophy of quinn-udp is that a UDP connection could
                // come back at any time, so these kinds of errors should be treated as
                // transient and allowed to time out.  See test_try_send_no_send_addr to
                // try this out.
                Ok(())
            }
        }
    }

    fn try_send_relay(
        &self,
        url: &RelayUrl,
        node: NodeId,
        contents: RelayContents,
    ) -> io::Result<()> {
        trace!(
            node = %node.fmt_short(),
            relay_url = %url,
            count = contents.len(),
            len = contents.iter().map(|c| c.len()).sum::<usize>(),
            "send relay",
        );
        let msg = RelayActorMessage::Send {
            url: url.clone(),
            contents,
            remote_node: node,
        };
        match self.relay_actor_sender.try_send(msg) {
            Ok(_) => {
                trace!(node = %node.fmt_short(), relay_url = %url,
                       "send relay: message queued");
                Ok(())
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                warn!(node = %node.fmt_short(), relay_url = %url,
                      "send relay: message dropped, channel to actor is closed");
                Err(io::Error::new(
                    io::ErrorKind::ConnectionReset,
                    "channel to actor is closed",
                ))
            }
            Err(mpsc::error::TrySendError::Full(_)) => {
                warn!(node = %node.fmt_short(), relay_url = %url,
                      "send relay: message dropped, channel to actor is full");
                Err(io::Error::new(
                    io::ErrorKind::WouldBlock,
                    "channel to actor is full",
                ))
            }
        }
    }

    fn try_send_udp(&self, addr: SocketAddr, transmit: &quinn_udp::Transmit) -> io::Result<()> {
        let conn = self.conn_for_addr(addr)?;
        conn.try_send(transmit)?;
        let total_bytes: u64 = transmit.contents.len() as u64;
        if addr.is_ipv6() {
            inc_by!(MagicsockMetrics, send_ipv6, total_bytes);
        } else {
            inc_by!(MagicsockMetrics, send_ipv4, total_bytes);
        }
        Ok(())
    }

    fn conn_for_addr(&self, addr: SocketAddr) -> io::Result<&UdpConn> {
        let sock = match addr {
            SocketAddr::V4(_) => &self.pconn4,
            SocketAddr::V6(_) => self
                .pconn6
                .as_ref()
                .ok_or(io::Error::new(io::ErrorKind::Other, "no IPv6 connection"))?,
        };
        Ok(sock)
    }

    /// NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] indefinitely.
    #[instrument(skip_all)]
    fn poll_recv(
        &self,
        cx: &mut Context,
        bufs: &mut [io::IoSliceMut<'_>],
        metas: &mut [quinn_udp::RecvMeta],
    ) -> Poll<io::Result<usize>> {
        // FIXME: currently ipv4 load results in ipv6 traffic being ignored
        debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
        if self.is_closed() {
            return Poll::Pending;
        }

        // order of polling is: UDPv4, UDPv6, relay
        let (msgs, from_ipv4) = match self.pconn4.poll_recv(cx, bufs, metas)? {
            Poll::Pending | Poll::Ready(0) => match &self.pconn6 {
                Some(conn) => match conn.poll_recv(cx, bufs, metas)? {
                    Poll::Pending | Poll::Ready(0) => {
                        return self.poll_recv_relay(cx, bufs, metas);
                    }
                    Poll::Ready(n) => (n, false),
                },
                None => {
                    return self.poll_recv_relay(cx, bufs, metas);
                }
            },
            Poll::Ready(n) => (n, true),
        };

        // Adding the IP address we received something on results in Quinn using this
        // address on the send path to send from.  However we let Quinn use a
        // QuicMappedAddr, not a real address.  So we used to substitute our bind address
        // here so that Quinn would send on the right address.  But that would sometimes
        // result in the wrong address family and Windows trips up on that.
        //
        // What should be done is that this dst_ip from the RecvMeta is stored in the
        // NodeState/PathState.  Then on the send path it should be retrieved from the
        // NodeState/PathState together with the send address and substituted at send time.
        // This is relevant for IPv6 link-local addresses where the OS otherwise does not
        // know which interface to send from.
        #[cfg(not(windows))]
        let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip());
        // Reasoning for this here:
        // https://github.com/n0-computer/iroh/pull/2595#issuecomment-2290947319
        #[cfg(windows)]
        let dst_ip = None;

        let mut quic_packets_total = 0;

        for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()).take(msgs) {
            let mut is_quic = false;
            let mut quic_packets_count = 0;
            if meta.len > meta.stride {
                trace!(%meta.len, %meta.stride, "GRO datagram received");
                inc!(MagicsockMetrics, recv_gro_datagrams);
            }

            // find disco and stun packets and forward them to the actor
            for packet in buf[..meta.len].chunks_mut(meta.stride) {
                if packet.len() < meta.stride {
                    trace!(
                        len = %packet.len(),
                        %meta.stride,
                        "Last GRO datagram smaller than stride",
                    );
                }

                let packet_is_quic = if stun::is(packet) {
                    trace!(src = %meta.addr, len = %meta.stride, "UDP recv: stun packet");
                    let packet2 = Bytes::copy_from_slice(packet);
                    self.net_checker.receive_stun_packet(packet2, meta.addr);
                    false
                } else if let Some((sender, sealed_box)) = disco::source_and_box(packet) {
                    // Disco?
                    trace!(src = %meta.addr, len = %meta.stride, "UDP recv: disco packet");
                    self.handle_disco_message(
                        sender,
                        sealed_box,
                        DiscoMessageSource::Udp(meta.addr),
                    );
                    false
                } else {
                    trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet");
                    if from_ipv4 {
                        inc_by!(MagicsockMetrics, recv_data_ipv4, packet.len() as _);
                    } else {
                        inc_by!(MagicsockMetrics, recv_data_ipv6, packet.len() as _);
                    }
                    true
                };

                if packet_is_quic {
                    quic_packets_count += 1;
                    is_quic = true;
                } else {
                    // overwrite the first byte of the packets with zero.
                    // this makes quinn reliably and quickly ignore the packet as long as
                    // [`quinn::EndpointConfig::grease_quic_bit`] is set to `false`
                    // (which we always do in Endpoint::bind).
                    packet[0] = 0u8;
                }
            }

            if is_quic {
                // remap addr
                match self.node_map.receive_udp(meta.addr) {
                    None => {
                        warn!(src = ?meta.addr, count = %quic_packets_count, len = meta.len, "UDP recv quic packets: no node state found, skipping");
                        // if we have no node state for the from addr, set len to 0 to make quinn skip the buf completely.
                        meta.len = 0;
                    }
                    Some((node_id, quic_mapped_addr)) => {
                        trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets");
                        quic_packets_total += quic_packets_count;
                        meta.addr = quic_mapped_addr.0;
                    }
                }
            } else {
                // if there is no non-stun, non-disco packet in the chunk, set len to zero to make
                // quinn skip the buf completely.
                meta.len = 0;
            }
            // Normalize local_ip
            meta.dst_ip = dst_ip;
        }

        if quic_packets_total > 0 {
            inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _);
            trace!("UDP recv: {} packets", quic_packets_total);
        }

        Poll::Ready(Ok(msgs))
    }

    #[instrument(skip_all)]
    fn poll_recv_relay(
        &self,
        cx: &mut Context,
        bufs: &mut [io::IoSliceMut<'_>],
        metas: &mut [quinn_udp::RecvMeta],
    ) -> Poll<io::Result<usize>> {
        let mut num_msgs = 0;
        for (buf_out, meta_out) in bufs.iter_mut().zip(metas.iter_mut()) {
            if self.is_closed() {
                break;
            }
            let mut relay_recv_receiver = self.relay_recv_receiver.lock();
            match relay_recv_receiver.try_recv() {
                Err(mpsc::error::TryRecvError::Empty) => {
                    self.network_recv_wakers.lock().replace(cx.waker().clone());
                    break;
                }
                Err(mpsc::error::TryRecvError::Disconnected) => {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::NotConnected,
                        "connection closed",
                    )));
                }
                Ok(Err(err)) => return Poll::Ready(Err(err)),
                Ok(Ok((node_id, meta, bytes))) => {
                    inc_by!(MagicsockMetrics, recv_data_relay, bytes.len() as _);
                    trace!(src = %meta.addr, node = %node_id.fmt_short(), count = meta.len / meta.stride, len = meta.len, "recv quic packets from relay");
                    buf_out[..bytes.len()].copy_from_slice(&bytes);
                    *meta_out = meta;
                    num_msgs += 1;
                }
            }
        }

        // If we have any msgs to report, they are in the first `num_msgs` slots
        if num_msgs > 0 {
            inc_by!(MagicsockMetrics, recv_datagrams, num_msgs as _);
            Poll::Ready(Ok(num_msgs))
        } else {
            Poll::Pending
        }
    }

    /// Handles a discovery message.
    #[instrument("disco_in", skip_all, fields(node = %sender.fmt_short(), %src))]
    fn handle_disco_message(&self, sender: PublicKey, sealed_box: &[u8], src: DiscoMessageSource) {
        trace!("handle_disco_message start");
        if self.is_closed() {
            return;
        }

        // We're now reasonably sure we're expecting communication from
        // this node; do the heavy crypto lifting to see what they want.
        let dm = match self.disco_secrets.unseal_and_decode(
            &self.secret_key,
            sender,
            sealed_box.to_vec(),
        ) {
            Ok(dm) => dm,
            Err(DiscoBoxError::Open(err)) => {
                warn!(?err, "failed to open disco box");
                inc!(MagicsockMetrics, recv_disco_bad_key);
                return;
            }
            Err(DiscoBoxError::Parse(err)) => {
                // Couldn't parse it, but it was inside a correctly
                // signed box, so just ignore it, assuming it's from a
                // newer version of Tailscale that we don't
                // understand. Not even worth logging about, lest it
                // be too spammy for old clients.

                inc!(MagicsockMetrics, recv_disco_bad_parse);
                debug!(?err, "failed to parse disco message");
                return;
            }
        };

        if src.is_relay() {
            inc!(MagicsockMetrics, recv_disco_relay);
        } else {
            inc!(MagicsockMetrics, recv_disco_udp);
        }

        let span = trace_span!("handle_disco", ?dm);
        let _guard = span.enter();
        trace!("receive disco message");
        match dm {
            disco::Message::Ping(ping) => {
                inc!(MagicsockMetrics, recv_disco_ping);
                self.handle_ping(ping, sender, src);
            }
            disco::Message::Pong(pong) => {
                inc!(MagicsockMetrics, recv_disco_pong);
                self.node_map.handle_pong(sender, &src, pong);
            }
            disco::Message::CallMeMaybe(cm) => {
                inc!(MagicsockMetrics, recv_disco_call_me_maybe);
                match src {
                    DiscoMessageSource::Relay { url, .. } => {
                        event!(
                            target: "events.net.call-me-maybe.recv",
                            Level::DEBUG,
                            remote_node = sender.fmt_short(),
                            via = ?url,
                            their_addrs = ?cm.my_numbers,
                        );
                    }
                    _ => {
                        warn!("call-me-maybe packets should only come via relay");
                        return;
                    }
                }
                let ping_actions = self.node_map.handle_call_me_maybe(sender, cm);
                for action in ping_actions {
                    match action {
                        PingAction::SendCallMeMaybe { .. } => {
                            warn!("Unexpected CallMeMaybe as response of handling a CallMeMaybe");
                        }
                        PingAction::SendPing(ping) => {
                            self.send_ping_queued(ping);
                        }
                    }
                }
            }
        }
        trace!("disco message handled");
    }

    /// Handle a ping message.
    fn handle_ping(&self, dm: disco::Ping, sender: NodeId, src: DiscoMessageSource) {
        // Insert the ping into the node map, and return whether a ping with this tx_id was already
        // received.
        let addr: SendAddr = src.clone().into();
        let handled = self.node_map.handle_ping(sender, addr.clone(), dm.tx_id);
        match handled.role {
            PingRole::Duplicate => {
                debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path already confirmed, skip");
                return;
            }
            PingRole::LikelyHeartbeat => {}
            PingRole::NewPath => {
                debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new path");
            }
            PingRole::Activate => {
                debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path active");
            }
        }

        // Send a pong.
        debug!(tx = %hex::encode(dm.tx_id), %addr, dstkey = %sender.fmt_short(),
               "sending pong");
        let pong = disco::Message::Pong(disco::Pong {
            tx_id: dm.tx_id,
            ping_observed_addr: addr.clone(),
        });
        event!(
            target: "events.net.pong.sent",
            Level::DEBUG,
            remote_node = %sender.fmt_short(),
            dst = ?addr,
            txn = ?dm.tx_id,
        );

        if !self.send_disco_message_queued(addr.clone(), sender, pong) {
            warn!(%addr, "failed to queue pong");
        }

        if let Some(ping) = handled.needs_ping_back {
            debug!(
                %addr,
                dstkey = %sender.fmt_short(),
                "sending direct ping back",
            );
            self.send_ping_queued(ping);
        }
    }

    fn encode_disco_message(&self, dst_key: PublicKey, msg: &disco::Message) -> Bytes {
        self.disco_secrets
            .encode_and_seal(&self.secret_key, dst_key, msg)
    }

    fn send_ping_queued(&self, ping: SendPing) {
        let SendPing {
            id,
            dst,
            dst_node,
            tx_id,
            purpose,
        } = ping;
        let msg = disco::Message::Ping(disco::Ping {
            tx_id,
            node_key: self.public_key(),
        });
        let sent = match dst {
            SendAddr::Udp(addr) => self
                .udp_disco_sender
                .try_send((addr, dst_node, msg))
                .is_ok(),
            SendAddr::Relay(ref url) => self.send_disco_message_relay(url, dst_node, msg),
        };
        if sent {
            let msg_sender = self.actor_sender.clone();
            trace!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (queued)");
            self.node_map
                .notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
        } else {
            warn!(dst = ?dst, tx = %hex::encode(tx_id), ?purpose, "failed to send ping: queues full");
        }
    }

    /// Tries to send the ping actions.
    ///
    /// Note that on failure the (remaining) ping actions are simply dropped.  That's bad!
    /// The Endpoint will think a full ping was done and not request a new full-ping for a
    /// while.  We should probably be buffering the pings.
    fn try_send_ping_actions(&self, msgs: Vec<PingAction>) -> io::Result<()> {
        for msg in msgs {
            // Abort sending as soon as we know we are shutting down.
            if self.is_closing() || self.is_closed() {
                return Ok(());
            }
            match msg {
                PingAction::SendCallMeMaybe {
                    ref relay_url,
                    dst_node,
                } => {
                    self.send_or_queue_call_me_maybe(relay_url, dst_node);
                }
                PingAction::SendPing(ping) => {
                    self.try_send_ping(ping)?;
                }
            }
        }
        Ok(())
    }

    /// Send a disco message. UDP messages will be queued.
    ///
    /// If `dst` is [`SendAddr::Relay`], the message will be pushed into the relay client channel.
    /// If `dst` is [`SendAddr::Udp`], the message will be pushed into the udp disco send channel.
    ///
    /// Returns true if the channel had capacity for the message, and false if the message was
    /// dropped.
    fn send_disco_message_queued(
        &self,
        dst: SendAddr,
        dst_key: PublicKey,
        msg: disco::Message,
    ) -> bool {
        match dst {
            SendAddr::Udp(addr) => self.udp_disco_sender.try_send((addr, dst_key, msg)).is_ok(),
            SendAddr::Relay(ref url) => self.send_disco_message_relay(url, dst_key, msg),
        }
    }

    /// Send a disco message. UDP messages will be polled to send directly on the UDP socket.
    fn try_send_disco_message(
        &self,
        dst: SendAddr,
        dst_key: PublicKey,
        msg: disco::Message,
    ) -> io::Result<()> {
        match dst {
            SendAddr::Udp(addr) => {
                self.try_send_disco_message_udp(addr, dst_key, &msg)?;
            }
            SendAddr::Relay(ref url) => {
                self.send_disco_message_relay(url, dst_key, msg);
            }
        }
        Ok(())
    }

    fn send_disco_message_relay(&self, url: &RelayUrl, dst: NodeId, msg: disco::Message) -> bool {
        debug!(node = %dst.fmt_short(), %url, %msg, "send disco message (relay)");
        let pkt = self.encode_disco_message(dst, &msg);
        inc!(MagicsockMetrics, send_disco_relay);
        match self.try_send_relay(url, dst, smallvec![pkt]) {
            Ok(()) => {
                if let disco::Message::CallMeMaybe(CallMeMaybe { ref my_numbers }) = msg {
                    event!(
                        target: "events.net.call-me-maybe.sent",
                        Level::DEBUG,
                        remote_node = %dst.fmt_short(),
                        via = ?url,
                        addrs = ?my_numbers,
                    );
                }
                inc!(MagicsockMetrics, sent_disco_relay);
                disco_message_sent(&msg);
                true
            }
            Err(_) => false,
        }
    }

    async fn send_disco_message_udp(
        &self,
        dst: SocketAddr,
        dst_node: NodeId,
        msg: &disco::Message,
    ) -> io::Result<()> {
        futures_lite::future::poll_fn(move |cx| {
            loop {
                match self.try_send_disco_message_udp(dst, dst_node, msg) {
                    Ok(()) => return Poll::Ready(Ok(())),
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
                        // This is the socket that .try_send_disco_message_udp used.
                        let sock = self.conn_for_addr(dst)?;
                        let sock = Arc::new(sock.clone());
                        let mut poller = sock.create_io_poller();
                        match poller.as_mut().poll_writable(cx)? {
                            Poll::Ready(()) => continue,
                            Poll::Pending => return Poll::Pending,
                        }
                    }
                    Err(err) => return Poll::Ready(Err(err)),
                }
            }
        })
        .await
    }

    fn try_send_disco_message_udp(
        &self,
        dst: SocketAddr,
        dst_node: NodeId,
        msg: &disco::Message,
    ) -> std::io::Result<()> {
        trace!(%dst, %msg, "send disco message (UDP)");
        if self.is_closed() {
            return Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "connection closed",
            ));
        }
        let pkt = self.encode_disco_message(dst_node, msg);
        // TODO: These metrics will be wrong with the poll impl
        // Also - do we need it? I'd say the `sent_disco_udp` below is enough.
        inc!(MagicsockMetrics, send_disco_udp);
        let transmit = quinn_udp::Transmit {
            destination: dst,
            contents: &pkt,
            ecn: None,
            segment_size: None,
            src_ip: None, // TODO
        };
        let sent = self.try_send_udp(dst, &transmit);
        match sent {
            Ok(()) => {
                trace!(%dst, node = %dst_node.fmt_short(), %msg, "sent disco message");
                inc!(MagicsockMetrics, sent_disco_udp);
                disco_message_sent(msg);
                Ok(())
            }
            Err(err) => {
                warn!(%dst, node = %dst_node.fmt_short(), ?msg, ?err,
                      "failed to send disco message");
                Err(err)
            }
        }
    }

    #[instrument(skip_all)]
    async fn handle_ping_actions(&mut self, msgs: Vec<PingAction>) {
        // TODO: This used to make sure that all ping actions are sent.  Though on the
        // poll_send/try_send path we also do fire-and-forget.  try_send_ping_actions()
        // really should store any unsent pings on the Inner and send them at the next
        // possible time.
        if let Err(err) = self.try_send_ping_actions(msgs) {
            warn!("Not all ping actions were sent: {err:#}");
        }
    }

    fn try_send_ping(&self, ping: SendPing) -> io::Result<()> {
        let SendPing {
            id,
            dst,
            dst_node,
            tx_id,
            purpose,
        } = ping;
        let msg = disco::Message::Ping(disco::Ping {
            tx_id,
            node_key: self.public_key(),
        });
        self.try_send_disco_message(dst.clone(), dst_node, msg)?;
        debug!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (polled)");
        let msg_sender = self.actor_sender.clone();
        self.node_map
            .notify_ping_sent(id, dst.clone(), tx_id, purpose, msg_sender);
        Ok(())
    }

    fn poll_send_relay(
        &self,
        url: &RelayUrl,
        node: PublicKey,
        contents: RelayContents,
    ) -> Poll<bool> {
        trace!(node = %node.fmt_short(), relay_url = %url, count = contents.len(), len = contents.iter().map(|c| c.len()).sum::<usize>(), "send relay");
        let msg = RelayActorMessage::Send {
            url: url.clone(),
            contents,
            remote_node: node,
        };
        match self.relay_actor_sender.try_send(msg) {
            Ok(_) => {
                trace!(node = %node.fmt_short(), relay_url = %url, "send relay: message queued");
                Poll::Ready(true)
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                warn!(node = %node.fmt_short(), relay_url = %url, "send relay: message dropped, channel to actor is closed");
                Poll::Ready(false)
            }
            Err(mpsc::error::TrySendError::Full(_)) => {
                warn!(node = %node.fmt_short(), relay_url = %url, "send relay: message dropped, channel to actor is full");
                Poll::Pending
            }
        }
    }

    fn send_queued_call_me_maybes(&self) {
        let msg = self.direct_addrs.to_call_me_maybe_message();
        let msg = disco::Message::CallMeMaybe(msg);
        for (public_key, url) in self.pending_call_me_maybes.lock().drain() {
            if !self.send_disco_message_relay(&url, public_key, msg.clone()) {
                warn!(node = %public_key.fmt_short(), "relay channel full, dropping call-me-maybe");
            }
        }
    }

    /// Sends the call-me-maybe DISCO message, queuing if addresses are too stale.
    ///
    /// To send the call-me-maybe message, we need to know our current direct addresses.  If
    /// this information is too stale, the call-me-maybe is queued while a netcheck run is
    /// scheduled.  Once this run finishes, the call-me-maybe will be sent.
    fn send_or_queue_call_me_maybe(&self, url: &RelayUrl, dst_node: NodeId) {
        match self.direct_addrs.fresh_enough() {
            Ok(()) => {
                let msg = self.direct_addrs.to_call_me_maybe_message();
                let msg = disco::Message::CallMeMaybe(msg);
                if !self.send_disco_message_relay(url, dst_node, msg) {
                    warn!(dstkey = %dst_node.fmt_short(), relayurl = %url,
                      "relay channel full, dropping call-me-maybe");
                } else {
                    debug!(dstkey = %dst_node.fmt_short(), relayurl = %url, "call-me-maybe sent");
                }
            }
            Err(last_refresh_ago) => {
                self.pending_call_me_maybes
                    .lock()
                    .insert(dst_node, url.clone());
                debug!(
                    ?last_refresh_ago,
                    "want call-me-maybe but direct addrs stale; queuing after restun",
                );
                self.re_stun("refresh-for-peering");
            }
        }
    }

    /// Triggers an address discovery. The provided why string is for debug logging only.
    #[instrument(skip_all)]
    fn re_stun(&self, why: &'static str) {
        debug!("re_stun: {}", why);
        inc!(MagicsockMetrics, re_stun_calls);
        self.direct_addr_update_state.schedule_run(why);
    }

    /// Publishes our address to a discovery service, if configured.
    ///
    /// Called whenever our addresses or home relay node changes.
    fn publish_my_addr(&self) {
        if let Some(ref discovery) = self.discovery {
            let info = AddrInfo {
                relay_url: self.my_relay(),
                direct_addresses: self.direct_addrs.sockaddrs(),
            };
            discovery.publish(&info);
        }
    }
}

#[derive(Clone, Debug)]
enum DiscoMessageSource {
    Udp(SocketAddr),
    Relay { url: RelayUrl, key: PublicKey },
}

impl Display for DiscoMessageSource {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Udp(addr) => write!(f, "Udp({addr})"),
            Self::Relay { ref url, key } => write!(f, "Relay({url}, {})", key.fmt_short()),
        }
    }
}

impl From<DiscoMessageSource> for SendAddr {
    fn from(value: DiscoMessageSource) -> Self {
        match value {
            DiscoMessageSource::Udp(addr) => SendAddr::Udp(addr),
            DiscoMessageSource::Relay { url, .. } => SendAddr::Relay(url),
        }
    }
}

impl From<&DiscoMessageSource> for SendAddr {
    fn from(value: &DiscoMessageSource) -> Self {
        match value {
            DiscoMessageSource::Udp(addr) => SendAddr::Udp(*addr),
            DiscoMessageSource::Relay { url, .. } => SendAddr::Relay(url.clone()),
        }
    }
}

impl DiscoMessageSource {
    fn is_relay(&self) -> bool {
        matches!(self, DiscoMessageSource::Relay { .. })
    }
}

/// Manages currently running direct addr discovery, aka netcheck runs.
///
/// Invariants:
/// - only one direct addr update must be running at a time
/// - if an update is scheduled while another one is running, remember that
///   and start a new one when the current one has finished
#[derive(Debug)]
struct DirectAddrUpdateState {
    /// If running, set to the reason for the currently running update.
    running: sync::watch::Sender<Option<&'static str>>,
    /// If set, start a new update as soon as the current one is finished.
    want_update: parking_lot::Mutex<Option<&'static str>>,
}

impl DirectAddrUpdateState {
    fn new() -> Self {
        let (running, _) = sync::watch::channel(None);
        DirectAddrUpdateState {
            running,
            want_update: Default::default(),
        }
    }

    /// Schedules a new run, either starting it immediately if none is running or
    /// scheduling it for later.
    fn schedule_run(&self, why: &'static str) {
        if self.is_running() {
            let _ = self.want_update.lock().insert(why);
        } else {
            self.run(why);
        }
    }

    /// Returns `true` if an update is currently in progress.
    fn is_running(&self) -> bool {
        self.running.borrow().is_some()
    }

    /// Trigger a new run.
    fn run(&self, why: &'static str) {
        self.running.send(Some(why)).ok();
    }

    /// Clears the current running state.
    fn finish_run(&self) {
        self.running.send(None).ok();
    }

    /// Returns the next update, if one is set.
    fn next_update(&self) -> Option<&'static str> {
        self.want_update.lock().take()
    }
}

impl Handle {
    /// Creates a [`MagicSock`] listening on [`Options::addr_v4`] and [`Options::addr_v6`].
    async fn new(opts: Options) -> Result<Self> {
        let me = opts.secret_key.public().fmt_short();
        if crate::util::relay_only_mode() {
            warn!(
                "creating a MagicSock that will only send packets over a relay connection."
            );
        }

        Self::with_name(me, opts)
            .instrument(error_span!("magicsock"))
            .await
    }

    async fn with_name(me: String, opts: Options) -> Result<Self> {
        let port_mapper = portmapper::Client::default();

        let Options {
            addr_v4,
            addr_v6,
            secret_key,
            relay_map,
            node_map,
            discovery,
            dns_resolver,
            proxy_url,
            #[cfg(any(test, feature = "test-utils"))]
            insecure_skip_relay_cert_verify,
        } = opts;

        let (relay_recv_sender, relay_recv_receiver) = mpsc::channel(128);

        let (pconn4, pconn6) = bind(addr_v4, addr_v6)?;
        let port = pconn4.port();

1399        // NOTE: we can end up with a zero port if `std::net::UdpSocket::local_addr` fails
1400        match port.try_into() {
1401            Ok(non_zero_port) => {
1402                port_mapper.update_local_port(non_zero_port);
1403            }
1404            Err(_zero_port) => debug!("Skipping port mapping with zero local port"),
1405        }
1406        let ipv4_addr = pconn4.local_addr()?;
1407        let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok());
1408
1409        let net_checker = netcheck::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?;
1410
1411        let (actor_sender, actor_receiver) = mpsc::channel(256);
1412        let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256);
1413        let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256);
1414
1415        // load the node data
1416        let node_map = node_map.unwrap_or_default();
1417        let node_map = NodeMap::load_from_vec(node_map);
1418
1419        let inner = Arc::new(MagicSock {
1420            me,
1421            port: AtomicU16::new(port),
1422            secret_key,
1423            proxy_url,
1424            local_addrs: std::sync::RwLock::new((ipv4_addr, ipv6_addr)),
1425            closing: AtomicBool::new(false),
1426            closed: AtomicBool::new(false),
1427            relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver),
1428            network_recv_wakers: parking_lot::Mutex::new(None),
1429            network_send_wakers: Arc::new(parking_lot::Mutex::new(None)),
1430            actor_sender: actor_sender.clone(),
1431            ipv6_reported: Arc::new(AtomicBool::new(false)),
1432            relay_map,
1433            my_relay: Default::default(),
1434            pconn4: pconn4.clone(),
1435            pconn6: pconn6.clone(),
1436            net_checker: net_checker.addr(),
1437            disco_secrets: DiscoSecrets::default(),
1438            node_map,
1439            relay_actor_sender: relay_actor_sender.clone(),
1440            udp_disco_sender,
1441            discovery,
1442            direct_addrs: Default::default(),
1443            pending_call_me_maybes: Default::default(),
1444            direct_addr_update_state: DirectAddrUpdateState::new(),
1445            dns_resolver,
1446            #[cfg(any(test, feature = "test-utils"))]
1447            insecure_skip_relay_cert_verify,
1448        });
1449
1450        let mut actor_tasks = JoinSet::default();
1451
1452        let relay_actor = RelayActor::new(inner.clone(), actor_sender.clone());
1453        let relay_actor_cancel_token = relay_actor.cancel_token();
1454        actor_tasks.spawn(
1455            async move {
1456                relay_actor.run(relay_actor_receiver).await;
1457            }
1458            .instrument(info_span!("relay-actor")),
1459        );
1460
1461        let inner2 = inner.clone();
1462        actor_tasks.spawn(async move {
1463            while let Some((dst, dst_key, msg)) = udp_disco_receiver.recv().await {
1464                if let Err(err) = inner2.send_disco_message_udp(dst, dst_key, &msg).await {
1465                    warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)");
1466                }
1467            }
1468        });
1469
1470        let inner2 = inner.clone();
1471        let network_monitor = netmon::Monitor::new().await?;
1472        actor_tasks.spawn(
1473            async move {
1474                let actor = Actor {
1475                    msg_receiver: actor_receiver,
1476                    msg_sender: actor_sender,
1477                    relay_actor_sender,
1478                    relay_actor_cancel_token,
1479                    msock: inner2,
1480                    relay_recv_sender,
1481                    periodic_re_stun_timer: new_re_stun_timer(false),
1482                    net_info_last: None,
1483                    port_mapper,
1484                    pconn4,
1485                    pconn6,
1486                    no_v4_send: false,
1487                    net_checker,
1488                    network_monitor,
1489                };
1490
1491                if let Err(err) = actor.run().await {
1492                    warn!("relay handler errored: {:?}", err);
1493                }
1494            }
1495            .instrument(info_span!("actor")),
1496        );
1497
1498        let c = Handle {
1499            msock: inner,
1500            actor_tasks: Arc::new(Mutex::new(actor_tasks)),
1501        };
1502
1503        Ok(c)
1504    }
1505
1506    /// Closes the connection.
1507    ///
1508    /// Only the first close does anything; any later closes are no-ops that return `Ok(())`.
1509    /// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`]
1510    /// indefinitely after this call.
1511    #[instrument(skip_all, fields(me = %self.msock.me))]
1512    pub(crate) async fn close(&self) -> Result<()> {
1513        if self.msock.is_closed() {
1514            return Ok(());
1515        }
1516        self.msock.closing.store(true, Ordering::Relaxed);
1517        self.msock.actor_sender.send(ActorMessage::Shutdown).await?;
1518        self.msock.closed.store(true, Ordering::SeqCst);
1519        self.msock.direct_addrs.addrs.shutdown();
1520
1521        let mut tasks = self.actor_tasks.lock().await;
1522
1523        // give the tasks a moment to shutdown cleanly
1524        let tasks_ref = &mut tasks;
1525        let shutdown_done = time::timeout(Duration::from_millis(100), async move {
1526            while let Some(task) = tasks_ref.join_next().await {
1527                if let Err(err) = task {
1528                    warn!("unexpected error in task shutdown: {:?}", err);
1529                }
1530            }
1531        })
1532        .await;
1533        if shutdown_done.is_ok() {
1534            debug!("tasks shutdown complete");
1535        } else {
1536            // shutdown all tasks
1537            debug!("aborting remaining {}/3 tasks", tasks.len());
1538            tasks.shutdown().await;
1539        }
1540
1541        Ok(())
1542    }
1543}
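
// Hedged sketch (not in the original source): intended usage of
// [`Handle::close`].  Constructing `Options` is elided here since it depends
// on keys and relay configuration; this only demonstrates that repeated
// closes are harmless no-ops.
#[cfg(test)]
#[allow(dead_code)]
async fn close_twice_sketch(handle: Handle) -> Result<()> {
    handle.close().await?; // the first close shuts the actor tasks down
    handle.close().await?; // later closes return Ok(()) without doing work
    Ok(())
}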
1544
1545#[derive(Debug, Default)]
1546struct DiscoSecrets(parking_lot::Mutex<HashMap<PublicKey, SharedSecret>>);
1547
1548impl DiscoSecrets {
1549    fn get(
1550        &self,
1551        secret: &SecretKey,
1552        node_id: PublicKey,
1553    ) -> parking_lot::MappedMutexGuard<SharedSecret> {
1554        parking_lot::MutexGuard::map(self.0.lock(), |inner| {
1555            inner
1556                .entry(node_id)
1557                .or_insert_with(|| secret.shared(&node_id))
1558        })
1559    }
1560
1561    pub fn encode_and_seal(
1562        &self,
1563        secret_key: &SecretKey,
1564        node_id: PublicKey,
1565        msg: &disco::Message,
1566    ) -> Bytes {
1567        let mut seal = msg.as_bytes();
1568        self.get(secret_key, node_id).seal(&mut seal);
1569        disco::encode_message(&secret_key.public(), seal).into()
1570    }
1571
1572    pub fn unseal_and_decode(
1573        &self,
1574        secret: &SecretKey,
1575        node_id: PublicKey,
1576        mut sealed_box: Vec<u8>,
1577    ) -> Result<disco::Message, DiscoBoxError> {
1578        self.get(secret, node_id)
1579            .open(&mut sealed_box)
1580            .map_err(DiscoBoxError::Open)?;
1581        disco::Message::from_bytes(&sealed_box).map_err(DiscoBoxError::Parse)
1582    }
1583}
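
// Hedged sketch (not in the original source): the symmetric seal/open round
// trip that `encode_and_seal` and `unseal_and_decode` build on.  Both sides
// derive the same `SharedSecret` via ECDH, so what one seals the other opens.
#[cfg(test)]
#[test]
fn disco_secrets_round_trip() {
    let a = SecretKey::generate();
    let b = SecretKey::generate();
    let secrets = DiscoSecrets::default();

    // A seals a payload for B...
    let mut buf = b"hello disco".to_vec();
    secrets.get(&a, b.public()).seal(&mut buf);

    // ...and B, deriving the same shared secret, opens it again.
    secrets
        .get(&b, a.public())
        .open(&mut buf)
        .expect("failed to open sealed box");
    assert_eq!(buf, b"hello disco".to_vec());
}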
1584
1585#[derive(Debug, thiserror::Error)]
1586enum DiscoBoxError {
1587    #[error("Failed to open crypto box")]
1588    Open(anyhow::Error),
1589    #[error("Failed to parse disco message")]
1590    Parse(anyhow::Error),
1591}
1592
1593type RelayRecvResult = Result<(PublicKey, quinn_udp::RecvMeta, Bytes), io::Error>;
1594
1595impl AsyncUdpSocket for Handle {
1596    fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn quinn::UdpPoller>> {
1597        self.msock.create_io_poller()
1598    }
1599
1600    fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
1601        self.msock.try_send(transmit)
1602    }
1603
1604    /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely.
1605    fn poll_recv(
1606        &self,
1607        cx: &mut Context,
1608        bufs: &mut [io::IoSliceMut<'_>],
1609        metas: &mut [quinn_udp::RecvMeta],
1610    ) -> Poll<io::Result<usize>> {
1611        self.msock.poll_recv(cx, bufs, metas)
1612    }
1613
1614    fn local_addr(&self) -> io::Result<SocketAddr> {
1615        match &*self.msock.local_addrs.read().expect("not poisoned") {
1616            (ipv4, None) => {
1617                // Pretend to be IPv6, because our QuinnMappedAddrs
1618                // need to be IPv6.
1619                let ip: IpAddr = match ipv4.ip() {
1620                    IpAddr::V4(ip) => ip.to_ipv6_mapped().into(),
1621                    IpAddr::V6(ip) => ip.into(),
1622                };
1623                Ok(SocketAddr::new(ip, ipv4.port()))
1624            }
1625            (_, Some(ipv6)) => Ok(*ipv6),
1626        }
1627    }
1628
1629    fn max_transmit_segments(&self) -> usize {
1630        if let Some(pconn6) = self.pconn6.as_ref() {
1631            std::cmp::min(
1632                pconn6.max_transmit_segments(),
1633                self.pconn4.max_transmit_segments(),
1634            )
1635        } else {
1636            self.pconn4.max_transmit_segments()
1637        }
1638    }
1639
1640    fn max_receive_segments(&self) -> usize {
1641        if let Some(pconn6) = self.pconn6.as_ref() {
1642            // `max_receive_segments` controls the size of the `RecvMeta` buffer
1643            // that quinn creates. Having buffers slightly bigger than necessary
1644            // isn't terrible, and makes sure a single socket can read the maximum
1645            // amount with a single poll. We considered summing these numbers
1646            // instead, but `poll_recv` never reads from both sockets in the same
1647            // call, so taking the maximum of the two is sufficient.
1648            std::cmp::max(
1649                pconn6.max_receive_segments(),
1650                self.pconn4.max_receive_segments(),
1651            )
1652        } else {
1653            self.pconn4.max_receive_segments()
1654        }
1655    }
1656
1657    fn may_fragment(&self) -> bool {
1658        if let Some(pconn6) = self.pconn6.as_ref() {
1659            pconn6.may_fragment() || self.pconn4.may_fragment()
1660        } else {
1661            self.pconn4.may_fragment()
1662        }
1663    }
1664}
1665
1666#[derive(Debug)]
1667struct IoPoller {
1668    ipv4_poller: Pin<Box<dyn quinn::UdpPoller>>,
1669    ipv6_poller: Option<Pin<Box<dyn quinn::UdpPoller>>>,
1670    relay_sender: mpsc::Sender<RelayActorMessage>,
1671    relay_send_waker: Arc<parking_lot::Mutex<Option<Waker>>>,
1672}
1673
1674impl quinn::UdpPoller for IoPoller {
1675    fn poll_writable(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
1676        // This version returns Ready as soon as any of them are ready.
1677        let this = &mut *self;
1678        match this.ipv4_poller.as_mut().poll_writable(cx) {
1679            Poll::Ready(_) => return Poll::Ready(Ok(())),
1680            Poll::Pending => (),
1681        }
1682        if let Some(ref mut ipv6_poller) = this.ipv6_poller {
1683            match ipv6_poller.as_mut().poll_writable(cx) {
1684                Poll::Ready(_) => return Poll::Ready(Ok(())),
1685                Poll::Pending => (),
1686            }
1687        }
1688        match this.relay_sender.capacity() {
1689            0 => {
1690                self.relay_send_waker.lock().replace(cx.waker().clone());
1691                Poll::Pending
1692            }
1693            _ => Poll::Ready(Ok(())),
1694        }
1695    }
1696}
1697
1698#[derive(Debug)]
1699enum ActorMessage {
1700    Shutdown,
1701    ReceiveRelay(RelayReadResult),
1702    EndpointPingExpired(usize, stun::TransactionId),
1703    NetcheckReport(Result<Option<Arc<netcheck::Report>>>, &'static str),
1704    NetworkChange,
1705    #[cfg(test)]
1706    ForceNetworkChange(bool),
1707}
1708
1709struct Actor {
1710    msock: Arc<MagicSock>,
1711    msg_receiver: mpsc::Receiver<ActorMessage>,
1712    msg_sender: mpsc::Sender<ActorMessage>,
1713    relay_actor_sender: mpsc::Sender<RelayActorMessage>,
1714    relay_actor_cancel_token: CancellationToken,
1715    /// Channel to send received relay messages on, for processing.
1716    relay_recv_sender: mpsc::Sender<RelayRecvResult>,
1717    /// Interval timer that periodically triggers a re-STUN run via [`MagicSock::re_stun`].
1718    periodic_re_stun_timer: time::Interval,
1719    /// The `NetInfo` provided in the last call to `call_net_info_callback`. It's used to deduplicate calls.
1720    net_info_last: Option<NetInfo>,
1721
1722    // The underlying UDP sockets used to send and receive packets.
1723    pconn4: UdpConn,
1724    pconn6: Option<UdpConn>,
1725
1726    /// The NAT-PMP/PCP/UPnP prober/client, for requesting port mappings from NAT devices.
1727    port_mapper: portmapper::Client,
1728
1729    /// Whether IPv4 UDP is known to be unable to transmit
1730    /// at all. This could happen if the socket is in an invalid state
1731    /// (as can happen on darwin after a network link status change).
1732    no_v4_send: bool,
1733
1734    /// The prober that discovers local network conditions, including the closest relay and NAT mappings.
1735    net_checker: netcheck::Client,
1736
1737    network_monitor: netmon::Monitor,
1738}
1739
1740impl Actor {
1741    async fn run(mut self) -> Result<()> {
1742        // Setup network monitoring
1743        let (link_change_s, mut link_change_r) = mpsc::channel(8);
1744        let _token = self
1745            .network_monitor
1746            .subscribe(move |is_major| {
1747                let link_change_s = link_change_s.clone();
1748                async move {
1749                    link_change_s.send(is_major).await.ok();
1750                }
1751                .boxed()
1752            })
1753            .await?;
1754
1755        // Let the heartbeat start only a couple of seconds later
1756        let mut direct_addr_heartbeat_timer = time::interval_at(
1757            time::Instant::now() + HEARTBEAT_INTERVAL,
1758            HEARTBEAT_INTERVAL,
1759        );
1760        let mut direct_addr_update_receiver =
1761            self.msock.direct_addr_update_state.running.subscribe();
1762        let mut portmap_watcher = self.port_mapper.watch_external_address();
1763
1764        let mut discovery_events: BoxStream<DiscoveryItem> =
1765            Box::pin(futures_lite::stream::empty());
1766        if let Some(d) = self.msock.discovery() {
1767            if let Some(events) = d.subscribe() {
1768                discovery_events = events;
1769            }
1770        }
1771        loop {
1772            inc!(Metrics, actor_tick_main);
1773            tokio::select! {
1774                Some(msg) = self.msg_receiver.recv() => {
1775                    trace!(?msg, "tick: msg");
1776                    inc!(Metrics, actor_tick_msg);
1777                    if self.handle_actor_message(msg).await {
1778                        return Ok(());
1779                    }
1780                }
1781                tick = self.periodic_re_stun_timer.tick() => {
1782                    trace!("tick: re_stun {:?}", tick);
1783                    inc!(Metrics, actor_tick_re_stun);
1784                    self.msock.re_stun("periodic");
1785                }
1786                Ok(()) = portmap_watcher.changed() => {
1787                    trace!("tick: portmap changed");
1788                    inc!(Metrics, actor_tick_portmap_changed);
1789                    let new_external_address = *portmap_watcher.borrow();
1790                    debug!("external address updated: {new_external_address:?}");
1791                    self.msock.re_stun("portmap_updated");
1792                },
1793                _ = direct_addr_heartbeat_timer.tick() => {
1794                    trace!(
1795                        "tick: direct addr heartbeat, {} known nodes",
1796                        self.msock.node_map.node_count(),
1797                    );
1798                    inc!(Metrics, actor_tick_direct_addr_heartbeat);
1799                    // TODO: this might trigger too many packets at once, pace this
1800
1801                    self.msock.node_map.prune_inactive();
1802                    let msgs = self.msock.node_map.nodes_stayin_alive();
1803                    self.handle_ping_actions(msgs).await;
1804                }
1805                _ = direct_addr_update_receiver.changed() => {
1806                    let reason = *direct_addr_update_receiver.borrow();
1807                    trace!("tick: direct addr update receiver {:?}", reason);
1808                    inc!(Metrics, actor_tick_direct_addr_update_receiver);
1809                    if let Some(reason) = reason {
1810                        self.refresh_direct_addrs(reason).await;
1811                    }
1812                }
1813                Some(is_major) = link_change_r.recv() => {
1814                    trace!("tick: link change {}", is_major);
1815                    inc!(Metrics, actor_link_change);
1816                    self.handle_network_change(is_major).await;
1817                }
1818                Some(discovery_item) = discovery_events.next() => {
1819                    trace!("tick: discovery event, address discovered: {discovery_item:?}");
1820                    let node_addr = NodeAddr {node_id: discovery_item.node_id, info: discovery_item.addr_info};
1821                    if let Err(e) = self.msock.add_node_addr(node_addr.clone(), Source::Discovery { name: discovery_item.provenance.into() }) {
1822                        warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}");
1823                    }
1824                }
1825                else => {
1826                    trace!("tick: other");
1827                    inc!(Metrics, actor_tick_other);
1828                }
1829            }
1830        }
1831    }
1832
1833    async fn handle_network_change(&mut self, is_major: bool) {
1834        debug!("link change detected: major? {}", is_major);
1835
1836        if is_major {
1837            self.msock.dns_resolver.clear_cache();
1838            self.msock.re_stun("link-change-major");
1839            self.close_stale_relay_connections().await;
1840            self.reset_endpoint_states();
1841        } else {
1842            self.msock.re_stun("link-change-minor");
1843        }
1844    }
1845
1846    #[instrument(skip_all)]
1847    async fn handle_ping_actions(&mut self, msgs: Vec<PingAction>) {
1848        // TODO: This used to make sure that all ping actions are sent.  Though on the
1849        // poll_send/try_send path we also do fire-and-forget.  try_send_ping_actions()
1850        // really should store any unsent pings on the Inner and send them at the next
1851        // possible time.
1852        if let Err(err) = self.msock.try_send_ping_actions(msgs) {
1853            warn!("Not all ping actions were sent: {err:#}");
1854        }
1855    }
1856
1857    /// Processes an incoming actor message.
1858    ///
1859    /// Returns `true` if it was a shutdown.
1860    async fn handle_actor_message(&mut self, msg: ActorMessage) -> bool {
1861        match msg {
1862            ActorMessage::Shutdown => {
1863                debug!("shutting down");
1864
1865                self.msock.node_map.notify_shutdown();
1866                self.port_mapper.deactivate();
1867                self.relay_actor_cancel_token.cancel();
1868
1869                // Ignore errors from pconnN
1870                // They will frequently have been closed already by a call to connBind.Close.
1871                debug!("stopping connections");
1872                if let Some(ref conn) = self.pconn6 {
1873                    conn.close().await.ok();
1874                }
1875                self.pconn4.close().await.ok();
1876
1877                debug!("shutdown complete");
1878                return true;
1879            }
1880            ActorMessage::ReceiveRelay(read_result) => {
1881                let passthroughs = self.process_relay_read_result(read_result);
1882                for passthrough in passthroughs {
1883                    self.relay_recv_sender
1884                        .send(passthrough)
1885                        .await
1886                        .expect("missing recv sender");
1887                    let mut wakers = self.msock.network_recv_wakers.lock();
1888                    if let Some(waker) = wakers.take() {
1889                        waker.wake();
1890                    }
1891                }
1892            }
1893            ActorMessage::EndpointPingExpired(id, txid) => {
1894                self.msock.node_map.notify_ping_timeout(id, txid);
1895            }
1896            ActorMessage::NetcheckReport(report, why) => {
1897                match report {
1898                    Ok(report) => {
1899                        self.handle_netcheck_report(report).await;
1900                    }
1901                    Err(err) => {
1902                        warn!("failed to generate netcheck report for: {}: {:?}", why, err);
1903                    }
1904                }
1905                self.finalize_direct_addrs_update(why);
1906            }
1907            ActorMessage::NetworkChange => {
1908                self.network_monitor.network_change().await.ok();
1909            }
1910            #[cfg(test)]
1911            ActorMessage::ForceNetworkChange(is_major) => {
1912                self.handle_network_change(is_major).await;
1913            }
1914        }
1915
1916        false
1917    }
1918
1919    #[cfg_attr(windows, allow(dead_code))]
1920    fn normalized_local_addr(&self) -> io::Result<SocketAddr> {
1921        self.msock.normalized_local_addr()
1922    }
1923
1924    fn process_relay_read_result(&mut self, dm: RelayReadResult) -> Vec<RelayRecvResult> {
1925        trace!("process_relay_read {} bytes", dm.buf.len());
1926        if dm.buf.is_empty() {
1927            warn!("received empty relay packet");
1928            return Vec::new();
1929        }
1930        let url = &dm.url;
1931
1932        let quic_mapped_addr = self.msock.node_map.receive_relay(url, dm.src);
1933
1934        // the relay packet is made up of multiple udp packets, each prefixed by a u16 le length prefix
1935        //
1936        // split the packet into these parts
1937        let parts = PacketSplitIter::new(dm.buf);
1938        // Normalize local_ip
1939        #[cfg(not(windows))]
1940        let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip());
1941        // Reasoning for this here: https://github.com/n0-computer/iroh/pull/2595#issuecomment-2290947319
1942        #[cfg(windows)]
1943        let dst_ip = None;
1944
1945        let mut out = Vec::new();
1946        for part in parts {
1947            match part {
1948                Ok(part) => {
1949                    if self.handle_relay_disco_message(&part, url, dm.src) {
1950                        // Message was internal, do not bubble up.
1951                        continue;
1952                    }
1953
1954                    let meta = quinn_udp::RecvMeta {
1955                        len: part.len(),
1956                        stride: part.len(),
1957                        addr: quic_mapped_addr.0,
1958                        dst_ip,
1959                        ecn: None,
1960                    };
1961                    out.push(Ok((dm.src, meta, part)));
1962                }
1963                Err(e) => {
1964                    out.push(Err(e));
1965                }
1966            }
1967        }
1968
1969        out
1970    }
1971
1972    /// Refreshes knowledge about our direct addresses.
1973    ///
1974    /// In other words, this triggers a netcheck run.
1975    ///
1976    /// Note that invoking this is managed by the [`DirectAddrUpdateState`] and this should
1977    /// never be invoked directly.  Some day this will be refactored to not allow this easy
1978    /// mistake to be made.
1979    #[instrument(level = "debug", skip_all)]
1980    async fn refresh_direct_addrs(&mut self, why: &'static str) {
1981        inc!(MagicsockMetrics, update_direct_addrs);
1982
1983        debug!("starting direct addr update ({})", why);
1984        self.port_mapper.procure_mapping();
1985        self.update_net_info(why).await;
1986    }
1987
1988    /// Updates the direct addresses of this magic socket.
1989    ///
1990    /// Updates the [`DiscoveredDirectAddrs`] of this [`MagicSock`] with the current set of
1991    /// direct addresses from:
1992    ///
1993    /// - The portmapper.
1994    /// - A netcheck report.
1995    /// - The local interfaces IP addresses.
1996    fn update_direct_addresses(&mut self, netcheck_report: Option<Arc<netcheck::Report>>) {
1997        let portmap_watcher = self.port_mapper.watch_external_address();
1998
1999        // We only want to have one DirectAddr for each SocketAddr we have.  So we store
2000        // this as a map of SocketAddr -> DirectAddrType.  At the end we will construct a
2001        // DirectAddr from each entry.
2002        let mut addrs: BTreeMap<SocketAddr, DirectAddrType> = BTreeMap::new();
2003
2004        // First add PortMapper provided addresses.
2005        let maybe_port_mapped = *portmap_watcher.borrow();
2006        if let Some(portmap_ext) = maybe_port_mapped.map(SocketAddr::V4) {
2007            addrs
2008                .entry(portmap_ext)
2009                .or_insert(DirectAddrType::Portmapped);
2010            self.set_net_info_have_port_map();
2011        }
2012
2013        // Next add STUN addresses from the netcheck report.
2014        if let Some(netcheck_report) = netcheck_report {
2015            if let Some(global_v4) = netcheck_report.global_v4 {
2016                addrs
2017                    .entry(global_v4.into())
2018                    .or_insert(DirectAddrType::Stun);
2019
2020                // If they're behind a hard NAT and are using a fixed
2021                // port locally, assume they might've added a static
2022                // port mapping on their router to the same explicit
2023                // port that we are running with. Worst case it's an invalid candidate mapping.
2024                let port = self.msock.port.load(Ordering::Relaxed);
2025                if netcheck_report
2026                    .mapping_varies_by_dest_ip
2027                    .unwrap_or_default()
2028                    && port != 0
2029                {
2030                    let mut addr = global_v4;
2031                    addr.set_port(port);
2032                    addrs
2033                        .entry(addr.into())
2034                        .or_insert(DirectAddrType::Stun4LocalPort);
2035                }
2036            }
2037            if let Some(global_v6) = netcheck_report.global_v6 {
2038                addrs
2039                    .entry(global_v6.into())
2040                    .or_insert(DirectAddrType::Stun);
2041            }
2042        }
2043
2044        let local_addr_v4 = self.pconn4.local_addr().ok();
2045        let local_addr_v6 = self.pconn6.as_ref().and_then(|c| c.local_addr().ok());
2046
2047        let is_unspecified_v4 = local_addr_v4
2048            .map(|a| a.ip().is_unspecified())
2049            .unwrap_or(false);
2050        let is_unspecified_v6 = local_addr_v6
2051            .map(|a| a.ip().is_unspecified())
2052            .unwrap_or(false);
2053
2054        let msock = self.msock.clone();
2055
2056        // The following code can be slow; we do not want to block the caller, since
2057        // that would block the actor loop.
2058        tokio::spawn(
2059            async move {
2060                // If a socket is bound to the unspecified address, create SocketAddrs for
2061                // each local IP address by pairing it with the port the socket is bound on.
2062                if is_unspecified_v4 || is_unspecified_v6 {
2063                    // Depending on the OS and network interfaces attached and their state
2064                    // enumerating the local interfaces can take a long time.  Especially
2065                    // Windows is very slow.
2066                    let LocalAddresses {
2067                        regular: mut ips,
2068                        loopback,
2069                    } = tokio::task::spawn_blocking(LocalAddresses::new)
2070                        .await
2071                        .unwrap();
2072                    if ips.is_empty() && addrs.is_empty() {
2073                        // Include loopback addresses only if there are no other interfaces
2074                        // or public addresses, this allows testing offline.
2075                        ips = loopback;
2076                    }
2077                    for ip in ips {
2078                        let port_if_unspecified = match ip {
2079                            IpAddr::V4(_) if is_unspecified_v4 => {
2080                                local_addr_v4.map(|addr| addr.port())
2081                            }
2082                            IpAddr::V6(_) if is_unspecified_v6 => {
2083                                local_addr_v6.map(|addr| addr.port())
2084                            }
2085                            _ => None,
2086                        };
2087                        if let Some(port) = port_if_unspecified {
2088                            let addr = SocketAddr::new(ip, port);
2089                            addrs.entry(addr).or_insert(DirectAddrType::Local);
2090                        }
2091                    }
2092                }
2093
2094                // If a socket is bound to a specific address, add it.
2095                if !is_unspecified_v4 {
2096                    if let Some(addr) = local_addr_v4 {
2097                        addrs.entry(addr).or_insert(DirectAddrType::Local);
2098                    }
2099                }
2100                if !is_unspecified_v6 {
2101                    if let Some(addr) = local_addr_v6 {
2102                        addrs.entry(addr).or_insert(DirectAddrType::Local);
2103                    }
2104                }
2105
2106                // Finally create and store all these direct addresses and send any
2107                // queued call-me-maybe messages.
2108                msock.store_direct_addresses(
2109                    addrs
2110                        .iter()
2111                        .map(|(addr, typ)| DirectAddr {
2112                            addr: *addr,
2113                            typ: *typ,
2114                        })
2115                        .collect(),
2116                );
2117                msock.send_queued_call_me_maybes();
2118            }
2119            .instrument(Span::current()),
2120        );
2121    }
2122
2123    /// Called when a direct addr update is done, no matter if it was successful or not.
2124    fn finalize_direct_addrs_update(&mut self, why: &'static str) {
2125        let new_why = self.msock.direct_addr_update_state.next_update();
2126        if !self.msock.is_closed() {
2127            if let Some(new_why) = new_why {
2128                self.msock.direct_addr_update_state.run(new_why);
2129                return;
2130            }
2131            self.periodic_re_stun_timer = new_re_stun_timer(true);
2132        }
2133
2134        self.msock.direct_addr_update_state.finish_run();
2135        debug!("direct addr update done ({})", why);
2136    }
2137
2138    /// Updates `NetInfo.HavePortMap` to true.
2139    #[instrument(level = "debug", skip_all)]
2140    fn set_net_info_have_port_map(&mut self) {
2141        if let Some(ref mut net_info_last) = self.net_info_last {
2142            if net_info_last.have_port_map {
2143                // No change.
2144                return;
2145            }
2146            net_info_last.have_port_map = true;
2147            self.net_info_last = Some(net_info_last.clone());
2148        }
2149    }
2150
2151    #[instrument(level = "debug", skip_all)]
2152    async fn call_net_info_callback(&mut self, ni: NetInfo) {
2153        if let Some(ref net_info_last) = self.net_info_last {
2154            if ni.basically_equal(net_info_last) {
2155                return;
2156            }
2157        }
2158
2159        self.net_info_last = Some(ni);
2160    }
2161
2162    /// Calls netcheck.
2163    ///
2164    /// Note that invoking this is managed by [`DirectAddrUpdateState`] via
2165    /// [`Actor::refresh_direct_addrs`] and this should never be invoked directly.  Some day
2166    /// this will be refactored to not allow this easy mistake to be made.
2167    #[instrument(level = "debug", skip_all)]
2168    async fn update_net_info(&mut self, why: &'static str) {
2169        if self.msock.relay_map.is_empty() {
2170            debug!("skipping netcheck, empty RelayMap");
2171            self.msg_sender
2172                .send(ActorMessage::NetcheckReport(Ok(None), why))
2173                .await
2174                .ok();
2175            return;
2176        }
2177
2178        let relay_map = self.msock.relay_map.clone();
2179        let pconn4 = Some(self.pconn4.as_socket());
2180        let pconn6 = self.pconn6.as_ref().map(|p| p.as_socket());
2181
2182        debug!("requesting netcheck report");
2183        match self
2184            .net_checker
2185            .get_report_channel(relay_map, pconn4, pconn6)
2186            .await
2187        {
2188            Ok(rx) => {
2189                let msg_sender = self.msg_sender.clone();
2190                tokio::task::spawn(async move {
2191                    let report = time::timeout(NETCHECK_REPORT_TIMEOUT, rx).await;
                    // Flatten the nesting: the outer `Result` is the timeout, the
                    // middle one the oneshot channel recv, the inner one netcheck.
2192                    let report: anyhow::Result<_> = match report {
2193                        Ok(Ok(Ok(report))) => Ok(Some(report)),
2194                        Ok(Ok(Err(err))) => Err(err),
2195                        Ok(Err(_)) => Err(anyhow!("netcheck report not received")),
2196                        Err(err) => Err(anyhow!("netcheck report timeout: {:?}", err)),
2197                    };
2198                    msg_sender
2199                        .send(ActorMessage::NetcheckReport(report, why))
2200                        .await
2201                        .ok();
2202                    // The receiver of the NetcheckReport message will call
2203                    // .finalize_direct_addrs_update().
2204                });
2205            }
2206            Err(err) => {
2207                warn!("unable to start netcheck generation: {:?}", err);
2208                self.finalize_direct_addrs_update(why);
2209            }
2210        }
2211    }
2212
2213    async fn handle_netcheck_report(&mut self, report: Option<Arc<netcheck::Report>>) {
2214        if let Some(ref report) = report {
2215            self.msock
2216                .ipv6_reported
2217                .store(report.ipv6, Ordering::Relaxed);
2218            let r = &report;
2219            trace!(
2220                "setting no_v4_send {} -> {}",
2221                self.no_v4_send,
2222                !r.ipv4_can_send
2223            );
2224            self.no_v4_send = !r.ipv4_can_send;
2225
2226            let have_port_map = self.port_mapper.watch_external_address().borrow().is_some();
2227            let mut ni = NetInfo {
2228                relay_latency: Default::default(),
2229                mapping_varies_by_dest_ip: r.mapping_varies_by_dest_ip,
2230                hair_pinning: r.hair_pinning,
2231                portmap_probe: r.portmap_probe.clone(),
2232                have_port_map,
2233                working_ipv6: Some(r.ipv6),
2234                os_has_ipv6: Some(r.os_has_ipv6),
2235                working_udp: Some(r.udp),
2236                working_icmp_v4: r.icmpv4,
2237                working_icmp_v6: r.icmpv6,
2238                preferred_relay: r.preferred_relay.clone(),
2239            };
2240            for (rid, d) in r.relay_v4_latency.iter() {
2241                ni.relay_latency
2242                    .insert(format!("{rid}-v4"), d.as_secs_f64());
2243            }
2244            for (rid, d) in r.relay_v6_latency.iter() {
2245                ni.relay_latency
2246                    .insert(format!("{rid}-v6"), d.as_secs_f64());
2247            }
2248
2249            if ni.preferred_relay.is_none() {
2250                // Perhaps UDP is blocked. Pick a deterministic but arbitrary one.
2251                ni.preferred_relay = self.pick_relay_fallback();
2252            }
2253
2254            if !self.set_nearest_relay(ni.preferred_relay.clone()) {
2255                ni.preferred_relay = None;
2256            }
2257
2258            // TODO: set link type
2259            self.call_net_info_callback(ni).await;
2260        }
2261        self.update_direct_addresses(report);
2262    }
2263
2264    fn set_nearest_relay(&mut self, relay_url: Option<RelayUrl>) -> bool {
2265        let my_relay = self.msock.my_relay();
2266        if relay_url == my_relay {
2267            // No change.
2268            return true;
2269        }
2270        let old_relay = self.msock.set_my_relay(relay_url.clone());
2271
2272        if let Some(ref relay_url) = relay_url {
2273            inc!(MagicsockMetrics, relay_home_change);
2274
2275            // On change, notify all currently connected relay servers and
2276            // start connecting to our home relay if we are not already.
2277            info!("home is now relay {}, was {:?}", relay_url, old_relay);
2278            self.msock.publish_my_addr();
2279
2280            self.send_relay_actor(RelayActorMessage::SetHome {
2281                url: relay_url.clone(),
2282            });
2283        }
2284
2285        true
2286    }
2287
2288    /// Returns a deterministic relay node to connect to. This is only used if netcheck
2289    /// couldn't find the nearest one, for instance, if UDP is blocked and thus STUN
2290    /// latency checks aren't working.
2291    ///
2292    /// If the [`RelayMap`] is empty, returns `None`.
2293    fn pick_relay_fallback(&self) -> Option<RelayUrl> {
2294        // TODO: figure out which relay node most of our nodes are using,
2295        // and use that region as our fallback.
2296        //
2297        // If we already had selected something in the past and it has any
2298        // nodes, we want to stay on it. If there are no nodes at all,
2299        // stay on whatever relay we previously picked. If we need to pick
2300        // one and have no node info, pick a node randomly.
2301        //
2302        // We used to do the above for legacy clients, but never updated it for disco.
2303
2304        let my_relay = self.msock.my_relay();
2305        if my_relay.is_some() {
2306            return my_relay;
2307        }
2308
2309        let ids = self.msock.relay_map.urls().collect::<Vec<_>>();
2310        let mut rng = rand::rngs::StdRng::seed_from_u64(0);
2311        ids.choose(&mut rng).map(|c| (*c).clone())
2312    }
2313
2314    /// Resets the preferred address for all nodes.
2315    /// This is called when connectivity changes enough that we no longer trust the old routes.
2316    #[instrument(skip_all, fields(me = %self.msock.me))]
2317    fn reset_endpoint_states(&mut self) {
2318        self.msock.node_map.reset_node_states()
2319    }
2320
2321    /// Tells the relay actor to close stale relay connections.
2322    ///
2323    /// The relay connections whose local endpoints no longer exist after a network change
2324    /// will error out soon enough.  Closing them eagerly speeds this up however and allows
2325    /// re-establishing a relay connection faster.
2326    async fn close_stale_relay_connections(&self) {
2327        let ifs = interfaces::State::new().await;
2328        let local_ips = ifs
2329            .interfaces
2330            .values()
2331            .flat_map(|netif| netif.addrs())
2332            .map(|ipnet| ipnet.addr())
2333            .collect();
2334        self.send_relay_actor(RelayActorMessage::MaybeCloseRelaysOnRebind(local_ips));
2335    }
2336
2337    fn send_relay_actor(&self, msg: RelayActorMessage) {
2338        match self.relay_actor_sender.try_send(msg) {
2339            Ok(_) => {}
2340            Err(mpsc::error::TrySendError::Closed(_)) => {
2341                warn!("unable to send to relay actor, already closed");
2342            }
2343            Err(mpsc::error::TrySendError::Full(_)) => {
2344                warn!("dropping message for relay actor, channel is full");
2345            }
2346        }
2347    }
2348
2349    fn handle_relay_disco_message(
2350        &mut self,
2351        msg: &[u8],
2352        url: &RelayUrl,
2353        relay_node_src: PublicKey,
2354    ) -> bool {
2355        match disco::source_and_box(msg) {
2356            Some((source, sealed_box)) => {
2357                if relay_node_src != source {
2358                    // TODO: return here?
2359                    warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short());
2360                }
2361                self.msock.handle_disco_message(
2362                    source,
2363                    sealed_box,
2364                    DiscoMessageSource::Relay {
2365                        url: url.clone(),
2366                        key: relay_node_src,
2367                    },
2368                );
2369                true
2370            }
2371            None => false,
2372        }
2373    }
2374}
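
// Small sketch (not in the original source): the de-duplication rule used by
// `update_direct_addresses` above.  `BTreeMap::entry().or_insert()` keeps the
// first `DirectAddrType` recorded for a `SocketAddr`, so a port-mapped
// classification is not overwritten by a later STUN result for the same address.
#[cfg(test)]
#[test]
fn direct_addr_first_source_wins() {
    let addr: SocketAddr = "203.0.113.7:4242".parse().unwrap();
    let mut addrs: BTreeMap<SocketAddr, DirectAddrType> = BTreeMap::new();

    addrs.entry(addr).or_insert(DirectAddrType::Portmapped);
    addrs.entry(addr).or_insert(DirectAddrType::Stun);

    assert_eq!(addrs[&addr], DirectAddrType::Portmapped);
}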
2375
2376fn new_re_stun_timer(initial_delay: bool) -> time::Interval {
2377    // Pick a random duration between 20 and 26 seconds (just under 30s,
2378    // a common UDP NAT timeout on Linux, etc.)
2379    let mut rng = rand::thread_rng();
2380    let d: Duration = rng.gen_range(Duration::from_secs(20)..=Duration::from_secs(26));
2381    if initial_delay {
2382        debug!("scheduling periodic_stun to run in {}s", d.as_secs());
2383        time::interval_at(time::Instant::now() + d, d)
2384    } else {
2385        debug!(
2386            "scheduling periodic_stun to run immediately and in {}s",
2387            d.as_secs()
2388        );
2389        time::interval(d)
2390    }
2391}
2392
2393/// Initial connection setup.
2394fn bind(
2395    addr_v4: Option<SocketAddrV4>,
2396    addr_v6: Option<SocketAddrV6>,
2397) -> Result<(UdpConn, Option<UdpConn>)> {
2398    let addr_v4 = addr_v4.unwrap_or_else(|| SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
2399    let pconn4 = UdpConn::bind(SocketAddr::V4(addr_v4)).context("bind IPv4 failed")?;
2400
2401    let ip4_port = pconn4.local_addr()?.port();
2402    let ip6_port = ip4_port.checked_add(1).unwrap_or(ip4_port - 1);
2403    let addr_v6 =
2404        addr_v6.unwrap_or_else(|| SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, ip6_port, 0, 0));
2405    let pconn6 = match UdpConn::bind(SocketAddr::V6(addr_v6)) {
2406        Ok(conn) => Some(conn),
2407        Err(err) => {
2408            info!("bind ignoring IPv6 bind failure: {:?}", err);
2409            None
2410        }
2411    };
2412
2413    Ok((pconn4, pconn6))
2414}
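
// Hedged sketch (not in the original source): `bind` binds IPv4 first and
// then attempts IPv6 on the next port up, so the two sockets usually come
// out paired.  IPv6 may be unavailable on a host, hence the second socket
// being optional; the assertions are deliberately weak.
#[cfg(test)]
#[test]
fn bind_yields_usable_sockets() -> Result<()> {
    let (pconn4, pconn6) = bind(None, None)?;
    assert_ne!(pconn4.local_addr()?.port(), 0);
    if let Some(pconn6) = pconn6 {
        // Normally the IPv4 port plus one, unless that port was taken.
        assert_ne!(pconn6.local_addr()?.port(), 0);
    }
    Ok(())
}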
2415
2416/// The discovered direct addresses of this [`MagicSock`].
2417///
2418/// These are all the [`DirectAddr`]s that this [`MagicSock`] is aware of for itself.
2419/// They include all locally bound ones as well as those discovered by other mechanisms like
2420/// STUN.
2421#[derive(derive_more::Debug, Default, Clone)]
2422struct DiscoveredDirectAddrs {
2423    /// The last set of discovered direct addresses.
2424    addrs: Watchable<BTreeSet<DirectAddr>>,
2425
2426    /// The last time the direct addresses were updated, even if there was no change.
2427    ///
2428    /// This is only ever None at startup.
2429    updated_at: Arc<RwLock<Option<Instant>>>,
2430}
2431
2432impl DiscoveredDirectAddrs {
2433    /// Updates the direct addresses, returns `true` if they changed, `false` if not.
2434    fn update(&self, addrs: BTreeSet<DirectAddr>) -> bool {
2435        *self.updated_at.write().unwrap() = Some(Instant::now());
2436        let updated = self.addrs.update(addrs).is_ok();
2437        if updated {
2438            event!(
2439                target: "events.net.direct_addrs",
2440                Level::DEBUG,
2441                addrs = ?self.addrs.get(),
2442            );
2443        }
2444        updated
2445    }
2446
2447    fn sockaddrs(&self) -> BTreeSet<SocketAddr> {
2448        self.addrs.read().iter().map(|da| da.addr).collect()
2449    }
2450
2451    /// Whether the direct addr information is considered "fresh".
2452    ///
2453    /// If not fresh you should probably update the direct addresses before using this info.
2454    ///
2455    /// Returns `Ok(())` if fresh enough and `Err(elapsed)` if not fresh enough.
2456    /// `elapsed` is the time elapsed since the direct addresses were last updated.
2457    ///
2458    /// If there is no direct address information `Err(Duration::ZERO)` is returned.
2459    fn fresh_enough(&self) -> Result<(), Duration> {
2460        match *self.updated_at.read().expect("poisoned") {
2461            None => Err(Duration::ZERO),
2462            Some(time) => {
2463                let elapsed = time.elapsed();
2464                if elapsed <= ENDPOINTS_FRESH_ENOUGH_DURATION {
2465                    Ok(())
2466                } else {
2467                    Err(elapsed)
2468                }
2469            }
2470        }
2471    }
2472
2473    fn to_call_me_maybe_message(&self) -> disco::CallMeMaybe {
2474        let my_numbers = self.addrs.read().iter().map(|da| da.addr).collect();
2475        disco::CallMeMaybe { my_numbers }
2476    }
2477
2478    fn updates_stream(&self) -> DirectAddrsStream {
2479        DirectAddrsStream {
2480            initial: Some(self.addrs.get()),
2481            inner: self.addrs.watch().into_stream(),
2482        }
2483    }
2484}
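
// Small sketch (not in the original source): the freshness contract of
// [`DiscoveredDirectAddrs::fresh_enough`].  Before the first update there is
// no information at all, reported as `Err(Duration::ZERO)`; immediately
// after an update the information is fresh.
#[cfg(test)]
#[test]
fn discovered_direct_addrs_freshness() {
    let addrs = DiscoveredDirectAddrs::default();
    assert_eq!(addrs.fresh_enough(), Err(Duration::ZERO));

    addrs.update(BTreeSet::new());
    assert_eq!(addrs.fresh_enough(), Ok(()));
}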
2485
2486/// Stream returning local endpoints as they change.
2487#[derive(Debug)]
2488pub struct DirectAddrsStream {
2489    initial: Option<BTreeSet<DirectAddr>>,
2490    inner: watchable::WatcherStream<BTreeSet<DirectAddr>>,
2491}
2492
2493impl Stream for DirectAddrsStream {
2494    type Item = BTreeSet<DirectAddr>;
2495
2496    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2497        let this = &mut *self;
2498        if let Some(addrs) = this.initial.take() {
2499            if !addrs.is_empty() {
2500                return Poll::Ready(Some(addrs));
2501            }
2502        }
2503        loop {
2504            match Pin::new(&mut this.inner).poll_next(cx) {
2505                Poll::Pending => break Poll::Pending,
2506                Poll::Ready(Some(addrs)) => {
2507                    if addrs.is_empty() {
2508                        // When we start up we might initially have empty direct addrs as
2509                        // the magic socket has not yet figured this out.  Later on this set
2510                        // should never be empty.  However, even if it were, the magicsock
2511                        // would not be in a very usable state, so skipping those events is
2512                        // probably fine.
2513                        // To make sure we install the right waker we loop rather than
2514                        // returning Poll::Pending immediately here.
2515                        continue;
2516                    } else {
2517                        break Poll::Ready(Some(addrs));
2518                    }
2519                }
2520                Poll::Ready(None) => break Poll::Ready(None),
2521            }
2522        }
2523    }
2524}
2525
2526/// Split a transmit containing a GSO payload into individual packets.
2527///
2528/// This allocates the data.
2529///
2530/// If the transmit has a segment size it contains multiple GSO packets.  It will be split
2531/// into multiple packets according to that segment size.  If it does not have a segment
2532/// size, the contents will be sent as a single packet.
2533// TODO: If quinn stayed on `Bytes` this would probably be much cheaper.  Need to
2534// figure out where they allocate the Vec.
2535fn split_packets(transmit: &quinn_udp::Transmit) -> RelayContents {
2536    let mut res = SmallVec::with_capacity(1);
2537    let contents = transmit.contents;
2538    if let Some(segment_size) = transmit.segment_size {
2539        for chunk in contents.chunks(segment_size) {
2540            res.push(Bytes::from(chunk.to_vec()));
2541        }
2542    } else {
2543        res.push(Bytes::from(contents.to_vec()));
2544    }
2545    res
2546}
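
// Small sketch (not in the original source) of the chunking rule applied by
// `split_packets`, shown on a plain slice since constructing a
// `quinn_udp::Transmit` is elided: with a segment size of 2, a 5 byte GSO
// payload yields chunks of 2, 2 and 1 bytes.
#[cfg(test)]
#[test]
fn gso_chunking_rule() {
    let contents = b"abcde";
    let chunks: Vec<&[u8]> = contents.chunks(2).collect();
    assert_eq!(chunks, vec![&b"ab"[..], &b"cd"[..], &b"e"[..]]);
}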
2547
2548/// Splits a packet into its component items.
2549#[derive(Debug)]
2550struct PacketSplitIter {
2551    bytes: Bytes,
2552}
2553
2554impl PacketSplitIter {
2555    /// Creates a new `PacketSplitIter` from a packet.
2556    ///
2557    /// Malformed packets are reported lazily as errors while iterating.
2558    fn new(bytes: Bytes) -> Self {
2559        Self { bytes }
2560    }
2561
2562    fn fail(&mut self) -> Option<std::io::Result<Bytes>> {
2563        self.bytes.clear();
2564        Some(Err(std::io::Error::new(
2565            std::io::ErrorKind::UnexpectedEof,
2566            "",
2567        )))
2568    }
2569}
2570
2571impl Iterator for PacketSplitIter {
2572    type Item = std::io::Result<Bytes>;
2573
2574    fn next(&mut self) -> Option<Self::Item> {
2575        use bytes::Buf;
2576        if self.bytes.has_remaining() {
2577            if self.bytes.remaining() < 2 {
2578                return self.fail();
2579            }
2580            let len = self.bytes.get_u16_le() as usize;
2581            if self.bytes.remaining() < len {
2582                return self.fail();
2583            }
2584            let item = self.bytes.split_to(len);
2585            Some(Ok(item))
2586        } else {
2587            None
2588        }
2589    }
2590}
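
// Sketch (not in the original source) of the relay frame format parsed by
// `PacketSplitIter`: each UDP packet is prefixed by its length as a
// little-endian u16, and a well-formed buffer round-trips cleanly.
#[cfg(test)]
#[test]
fn packet_split_iter_round_trip() {
    use bytes::BufMut;

    let mut buf = Vec::new();
    for packet in [&b"hello"[..], &b"relay"[..]] {
        buf.put_u16_le(packet.len() as u16);
        buf.put_slice(packet);
    }

    let parts: Vec<Bytes> = PacketSplitIter::new(Bytes::from(buf))
        .collect::<std::io::Result<_>>()
        .expect("well-formed packets");
    assert_eq!(
        parts,
        vec![Bytes::from_static(b"hello"), Bytes::from_static(b"relay")]
    );
}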
2591
2592/// The fake address used by the QUIC layer to address a node.
2593///
2594/// You can consider this as nothing more than a lookup key for a node the [`MagicSock`] knows
2595/// about.
2596///
2597/// [`MagicSock`] can reach a node by several real socket addresses, or maybe even via the relay
2598/// node.  The QUIC layer however needs to address a node by a stable [`SocketAddr`] so
2599/// that normal socket APIs can function.  Thus when a new node is introduced to a [`MagicSock`]
2600/// it is given a new fake address.  This is the type of that address.
2601///
2602/// It is but a newtype.  In our QUIC-facing socket APIs like [`AsyncUdpSocket`] it
2603/// comes in as the inner [`SocketAddr`]; in those interfaces we have to be careful to
2604/// do the conversion to this type.
2605#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
2606pub(crate) struct QuicMappedAddr(pub(crate) SocketAddr);
2607
2608/// Counter to always generate unique addresses for [`QuicMappedAddr`].
2609static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1);
2610
2611impl QuicMappedAddr {
2612    /// The Prefix/L of our Unique Local Addresses.
2613    const ADDR_PREFIXL: u8 = 0xfd;
2614    /// The Global ID used in our Unique Local Addresses.
2615    const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11];
2616    /// The Subnet ID used in our Unique Local Addresses.
2617    const ADDR_SUBNET: [u8; 2] = [0; 2];
2618
2619    /// Generates a globally unique fake UDP address.
2620    ///
2621    /// This generates an IPv6 Unique Local Address according to RFC 4193.
2622    pub(crate) fn generate() -> Self {
2623        let mut addr = [0u8; 16];
2624        addr[0] = Self::ADDR_PREFIXL;
2625        addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID);
2626        addr[6..8].copy_from_slice(&Self::ADDR_SUBNET);
2627
2628        let counter = ADDR_COUNTER.fetch_add(1, Ordering::Relaxed);
2629        addr[8..16].copy_from_slice(&counter.to_be_bytes());
2630
2631        Self(SocketAddr::new(IpAddr::V6(Ipv6Addr::from(addr)), 12345))
2632    }
2633}
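
// Sketch (not in the original source): generated addresses live in the IPv6
// Unique Local Address range (fd00::/8) and are unique per call, which is
// all the QUIC layer needs from this lookup key.
#[cfg(test)]
#[test]
fn quic_mapped_addr_is_unique_ula() {
    let a = QuicMappedAddr::generate();
    let b = QuicMappedAddr::generate();
    assert_ne!(a, b);
    match a.0.ip() {
        IpAddr::V6(ip) => assert_eq!(ip.octets()[0], 0xfd),
        IpAddr::V4(_) => panic!("expected an IPv6 address"),
    }
}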
2634
2635impl std::fmt::Display for QuicMappedAddr {
2636    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2637        write!(f, "QuicMappedAddr({})", self.0)
2638    }
2639}
2640fn disco_message_sent(msg: &disco::Message) {
2641    match msg {
2642        disco::Message::Ping(_) => {
2643            inc!(MagicsockMetrics, sent_disco_ping);
2644        }
2645        disco::Message::Pong(_) => {
2646            inc!(MagicsockMetrics, sent_disco_pong);
2647        }
2648        disco::Message::CallMeMaybe(_) => {
2649            inc!(MagicsockMetrics, sent_disco_call_me_maybe);
2650        }
2651    }
2652}
2653
2654/// A *direct address* on which an iroh-node might be contactable.
2655///
2656/// Direct addresses are UDP socket addresses on which an iroh-net node could potentially be
2657/// contacted.  These can come from various sources depending on the network topology of the
2658/// iroh-net node, see [`DirectAddrType`] for the several kinds of sources.
2659#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
2660pub struct DirectAddr {
2661    /// The address.
2662    pub addr: SocketAddr,
2663    /// The origin of this direct address.
2664    pub typ: DirectAddrType,
2665}
2666
2667/// The type of direct address.
2668///
2669/// These are the various sources or origins from which an iroh-net node might have found a
2670/// possible [`DirectAddr`].
2671#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
2672pub enum DirectAddrType {
2673    /// Not yet determined.
2674    Unknown,
2675    /// A locally bound socket address.
2676    Local,
2677    /// Public internet address discovered via STUN.
2678    ///
2679    /// When possible an iroh-net node will perform STUN to discover the address from
2680    /// which it sends data on the public internet.  This can be different from locally
2681    /// bound addresses when the node is on a local network which performs NAT or similar.
2682    Stun,
2683    /// An address assigned by the router using port mapping.
2684    ///
2685    /// When possible an iroh-net node will request a port mapping from the local router to
2686    /// get a publicly routable direct address.
2687    Portmapped,
2688    /// Hard NAT: STUN'ed IPv4 address + local fixed port.
2689    ///
2690    /// It is possible to configure iroh-net to bind to a specific port and independently
2691    /// configure the router to forward this port to the iroh-net node.  This variant
2692    /// indicates such a setup, which still uses STUN to discover the public address.
2693    Stun4LocalPort,
2694}
2695
2696impl Display for DirectAddrType {
2697    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2698        match self {
2699            DirectAddrType::Unknown => write!(f, "?"),
2700            DirectAddrType::Local => write!(f, "local"),
2701            DirectAddrType::Stun => write!(f, "stun"),
2702            DirectAddrType::Portmapped => write!(f, "portmap"),
2703            DirectAddrType::Stun4LocalPort => write!(f, "stun4localport"),
2704        }
2705    }
2706}
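
// Tiny check (not in the original source) of the `Display` mapping above.
#[cfg(test)]
#[test]
fn direct_addr_type_display() {
    assert_eq!(DirectAddrType::Stun.to_string(), "stun");
    assert_eq!(DirectAddrType::Unknown.to_string(), "?");
}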
2707
2708/// Contains information about the host's network state.
2709#[derive(Debug, Clone, PartialEq)]
2710struct NetInfo {
2711    /// Says whether the host's NAT mappings vary based on the destination IP.
2712    mapping_varies_by_dest_ip: Option<bool>,
2713
2714    /// Whether the host's router does hairpinning. Reported as `true` even if there's no NAT involved.
2715    hair_pinning: Option<bool>,
2716
2717    /// Whether the host has IPv6 internet connectivity.
2718    working_ipv6: Option<bool>,
2719
2720    /// Whether the OS supports IPv6 at all, regardless of whether IPv6 internet connectivity is available.
2721    os_has_ipv6: Option<bool>,
2722
2723    /// Whether the host has UDP internet connectivity.
2724    working_udp: Option<bool>,
2725
2726    /// Whether ICMPv4 works, `None` means not checked.
2727    working_icmp_v4: Option<bool>,
2728
2729    /// Whether ICMPv6 works, `None` means not checked.
2730    working_icmp_v6: Option<bool>,
2731
2732    /// Whether we have an existing portmap open (UPnP, PMP, or PCP).
2733    have_port_map: bool,
2734
2735    /// Probe indicating the presence of port mapping protocols on the LAN.
2736    portmap_probe: Option<portmapper::ProbeOutput>,
2737
2738    /// This node's preferred relay server for incoming traffic.
2739    ///
2740    /// The node might be temporarily connected to multiple relay servers (to send to
2741    /// other nodes) but this is the relay on which you can always contact this node.  Also
2742    /// known as the home relay.
2743    preferred_relay: Option<RelayUrl>,
2744
2745    /// The fastest recent time to reach various relay STUN servers, in seconds.
2746    ///
2747    /// This should only be updated rarely, or when there's a
2748    /// material change, as any change here also gets uploaded to the control plane.
2749    relay_latency: BTreeMap<String, f64>,
2750}
2751
2752impl NetInfo {
2753    /// Checks if this is probably still the same network as *other*.
2754    ///
2755    /// This compares the network situation while ignoring things expected to fluctuate
2756    /// a little, e.g. latency to the relay server (`test_netinfo_basically_equal` sketches this).
2757    fn basically_equal(&self, other: &Self) -> bool {
2758        let eq_icmp_v4 = match (self.working_icmp_v4, other.working_icmp_v4) {
2759            (Some(slf), Some(other)) => slf == other,
2760            _ => true, // ignore for comparison if only one report had this info
2761        };
2762        let eq_icmp_v6 = match (self.working_icmp_v6, other.working_icmp_v6) {
2763            (Some(slf), Some(other)) => slf == other,
2764            _ => true, // ignore for comparison if only one report had this info
2765        };
2766        self.mapping_varies_by_dest_ip == other.mapping_varies_by_dest_ip
2767            && self.hair_pinning == other.hair_pinning
2768            && self.working_ipv6 == other.working_ipv6
2769            && self.os_has_ipv6 == other.os_has_ipv6
2770            && self.working_udp == other.working_udp
2771            && eq_icmp_v4
2772            && eq_icmp_v6
2773            && self.have_port_map == other.have_port_map
2774            && self.portmap_probe == other.portmap_probe
2775            && self.preferred_relay == other.preferred_relay
2776    }
2777}
2778
2779#[cfg(test)]
2780mod tests {
2781    use anyhow::Context;
2782    use iroh_test::CallOnDrop;
2783    use rand::RngCore;
2784    use tokio_util::task::AbortOnDropHandle;
2785
2786    use super::*;
2787    use crate::{defaults::staging::EU_RELAY_HOSTNAME, relay::RelayMode, tls, Endpoint};
2788
2789    const ALPN: &[u8] = b"n0/test/1";
2790
2791    impl MagicSock {
2792        #[track_caller]
2793        pub fn add_test_addr(&self, node_addr: NodeAddr) {
2794            self.add_node_addr(
2795                node_addr,
2796                Source::NamedApp {
2797                    name: "test".into(),
2798                },
2799            )
2800            .unwrap()
2801        }
2802    }
2803
2804    /// Magicsock plus wrappers for sending packets.
2805    #[derive(Clone)]
2806    struct MagicStack {
2807        secret_key: SecretKey,
2808        endpoint: Endpoint,
2809    }
2810
2811    impl MagicStack {
2812        async fn new(relay_mode: RelayMode) -> Result<Self> {
2813            let secret_key = SecretKey::generate();
2814
2815            let mut transport_config = quinn::TransportConfig::default();
2816            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
2817
2818            let endpoint = Endpoint::builder()
2819                .secret_key(secret_key.clone())
2820                .transport_config(transport_config)
2821                .relay_mode(relay_mode)
2822                .alpns(vec![ALPN.to_vec()])
2823                .bind()
2824                .await?;
2825
2826            Ok(Self {
2827                secret_key,
2828                endpoint,
2829            })
2830        }
2831
2832        fn tracked_endpoints(&self) -> Vec<PublicKey> {
2833            self.endpoint
2834                .magic_sock()
2835                .list_remote_infos()
2836                .into_iter()
2837                .map(|ep| ep.node_id)
2838                .collect()
2839        }
2840
2841        fn public(&self) -> PublicKey {
2842            self.secret_key.public()
2843        }
2844    }
2845
2846    /// Monitors endpoint changes and plumbs things together.
2847    ///
2848    /// This is a way of connecting endpoints without a relay server.  Whenever the local
2849    /// endpoints of a magic endpoint change, they are added to all the other magic
2850    /// sockets.  This function waits until all endpoints know of each other before
2851    /// returning.
2852    ///
2853    /// When the returned drop guard is dropped, the tasks doing this updating are stopped.
2854    #[instrument(skip_all)]
2855    async fn mesh_stacks(stacks: Vec<MagicStack>) -> Result<CallOnDrop> {
2856        /// Registers endpoint addresses of a node to all other nodes.
2857        fn update_direct_addrs(
2858            stacks: &[MagicStack],
2859            my_idx: usize,
2860            new_addrs: BTreeSet<DirectAddr>,
2861        ) {
2862            let me = &stacks[my_idx];
2863            for (i, m) in stacks.iter().enumerate() {
2864                if i == my_idx {
2865                    continue;
2866                }
2867
2868                let addr = NodeAddr {
2869                    node_id: me.public(),
2870                    info: crate::AddrInfo {
2871                        relay_url: None,
2872                        direct_addresses: new_addrs.iter().map(|ep| ep.addr).collect(),
2873                    },
2874                };
2875                m.endpoint.magic_sock().add_test_addr(addr);
2876            }
2877        }
2878
2879        // For each node, start a task which monitors its local endpoints and registers them
2880        // with the other nodes as local endpoints become known.
2881        let mut tasks = JoinSet::new();
2882        for (my_idx, m) in stacks.iter().enumerate() {
2883            let m = m.clone();
2884            let stacks = stacks.clone();
2885            tasks.spawn(async move {
2886                let me = m.endpoint.node_id().fmt_short();
2887                let mut stream = m.endpoint.direct_addresses();
2888                while let Some(new_eps) = stream.next().await {
2889                    info!(%me, "conn{} endpoints update: {:?}", my_idx + 1, new_eps);
2890                    update_direct_addrs(&stacks, my_idx, new_eps);
2891                }
2892            });
2893        }
2894        let guard = CallOnDrop::new(move || {
2895            tasks.abort_all();
2896        });
2897
2898        // Wait for all nodes to be registered with each other.
2899        time::timeout(Duration::from_secs(10), async move {
2900            let all_node_ids: Vec<_> = stacks.iter().map(|ms| ms.endpoint.node_id()).collect();
2901            loop {
2902                let mut ready = Vec::with_capacity(stacks.len());
2903                for ms in stacks.iter() {
2904                    let endpoints = ms.tracked_endpoints();
2905                    let my_node_id = ms.endpoint.node_id();
2906                    let all_nodes_meshed = all_node_ids
2907                        .iter()
2908                        .filter(|node_id| **node_id != my_node_id)
2909                        .all(|node_id| endpoints.contains(node_id));
2910                    ready.push(all_nodes_meshed);
2911                }
2912                if ready.iter().all(|meshed| *meshed) {
2913                    break;
2914                }
2915                tokio::time::sleep(Duration::from_millis(200)).await;
2916            }
2917        })
2918        .await
2919        .context("failed to connect nodes")?;
2920        info!("all nodes meshed");
2921        Ok(guard)
2922    }
2923
2924    #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))]
2925    async fn echo_receiver(ep: MagicStack) -> Result<()> {
2926        info!("accepting conn");
2927        let conn = ep.endpoint.accept().await.expect("no conn");
2928
2929        info!("connecting");
2930        let conn = conn.await.context("[receiver] connecting")?;
2931        info!("accepting bi");
2932        let (mut send_bi, mut recv_bi) =
2933            conn.accept_bi().await.context("[receiver] accepting bi")?;
2934
2935        info!("reading");
2936        let val = recv_bi
2937            .read_to_end(usize::MAX)
2938            .await
2939            .context("[receiver] reading to end")?;
2940
2941        info!("replying");
2942        for chunk in val.chunks(12) {
2943            send_bi
2944                .write_all(chunk)
2945                .await
2946                .context("[receiver] sending chunk")?;
2947        }
2948
2949        info!("finishing");
2950        send_bi.finish().context("[receiver] finishing")?;
2951        send_bi.stopped().await.context("[receiver] stopped")?;
2952
2953        let stats = conn.stats();
2954        info!("stats: {:#?}", stats);
2955        // TODO: ensure panics in this function are reported ok
2956        assert!(
2957            stats.path.lost_packets < 10,
2958            "[receiver] should not loose many packets",
2959        );
2960
2961        info!("close");
2962        conn.close(0u32.into(), b"done");
2963        info!("wait idle");
2964        ep.endpoint.endpoint().wait_idle().await;
2965
2966        Ok(())
2967    }
2968
2969    #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))]
2970    async fn echo_sender(ep: MagicStack, dest_id: PublicKey, msg: &[u8]) -> Result<()> {
2971        info!("connecting to {}", dest_id.fmt_short());
2972        let dest = NodeAddr::new(dest_id);
2973        let conn = ep
2974            .endpoint
2975            .connect(dest, ALPN)
2976            .await
2977            .context("[sender] connect")?;
2978
2979        info!("opening bi");
2980        let (mut send_bi, mut recv_bi) = conn.open_bi().await.context("[sender] open bi")?;
2981
2982        info!("writing message");
2983        send_bi.write_all(msg).await.context("[sender] write all")?;
2984
2985        info!("finishing");
2986        send_bi.finish().context("[sender] finish")?;
2987        send_bi.stopped().await.context("[sender] stopped")?;
2988
2989        info!("reading_to_end");
2990        let val = recv_bi.read_to_end(usize::MAX).await.context("[sender]")?;
2991        assert_eq!(
2992            val,
2993            msg,
2994            "[sender] expected {}, got {}",
2995            hex::encode(msg),
2996            hex::encode(&val)
2997        );
2998
2999        let stats = conn.stats();
3000        info!("stats: {:#?}", stats);
3001        assert!(
3002            stats.path.lost_packets < 10,
3003            "[sender] should not loose many packets",
3004        );
3005
3006        info!("close");
3007        conn.close(0u32.into(), b"done");
3008        info!("wait idle");
3009        ep.endpoint.endpoint().wait_idle().await;
3010        Ok(())
3011    }
3012
3013    /// Runs a roundtrip between the [`echo_sender`] and [`echo_receiver`].
3014    async fn run_roundtrip(sender: MagicStack, receiver: MagicStack, payload: &[u8]) {
3015        let send_node_id = sender.endpoint.node_id();
3016        let recv_node_id = receiver.endpoint.node_id();
3017        info!("\nroundtrip: {send_node_id:#} -> {recv_node_id:#}");
3018
3019        let receiver_task = tokio::spawn(echo_receiver(receiver));
3020        let sender_res = echo_sender(sender, recv_node_id, payload).await;
3021        let sender_is_err = match sender_res {
3022            Ok(()) => false,
3023            Err(err) => {
3024                eprintln!("[sender] Error:\n{err:#?}");
3025                true
3026            }
3027        };
3028        let receiver_is_err = match receiver_task.await {
3029            Ok(Ok(())) => false,
3030            Ok(Err(err)) => {
3031                eprintln!("[receiver] Error:\n{err:#?}");
3032                true
3033            }
3034            Err(joinerr) => {
3035                if joinerr.is_panic() {
3036                    std::panic::resume_unwind(joinerr.into_panic());
3037                } else {
3038                    eprintln!("[receiver] Error:\n{joinerr:#?}");
3039                }
3040                true
3041            }
3042        };
3043        if sender_is_err || receiver_is_err {
3044            panic!("Sender or receiver errored");
3045        }
3046    }
3047
3048    #[tokio::test(flavor = "multi_thread")]
3049    async fn test_two_devices_roundtrip_quinn_magic() -> Result<()> {
3050        iroh_test::logging::setup_multithreaded();
3051
3052        let m1 = MagicStack::new(RelayMode::Disabled).await?;
3053        let m2 = MagicStack::new(RelayMode::Disabled).await?;
3054
3055        let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3056
3057        for i in 0..5 {
3058            info!("\n-- round {i}");
3059            run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3060            run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3061
3062            info!("\n-- larger data");
3063            let mut data = vec![0u8; 10 * 1024];
3064            rand::thread_rng().fill_bytes(&mut data);
3065            run_roundtrip(m1.clone(), m2.clone(), &data).await;
3066            run_roundtrip(m2.clone(), m1.clone(), &data).await;
3067        }
3068
3069        Ok(())
3070    }
3071
3072    #[tokio::test(flavor = "multi_thread")]
3073    async fn test_two_devices_roundtrip_network_change() -> Result<()> {
3074        time::timeout(
3075            Duration::from_secs(90),
3076            test_two_devices_roundtrip_network_change_impl(),
3077        )
3078        .await?
3079    }
3080
3081    /// Same structure as `test_two_devices_roundtrip_quinn_magic`, but interrupts regularly
3082    /// with (simulated) network changes.
3083    async fn test_two_devices_roundtrip_network_change_impl() -> Result<()> {
3084        iroh_test::logging::setup_multithreaded();
3085
3086        let m1 = MagicStack::new(RelayMode::Disabled).await?;
3087        let m2 = MagicStack::new(RelayMode::Disabled).await?;
3088
3089        let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3090
3091        let offset = || {
3092            let delay = rand::thread_rng().gen_range(10..=500);
3093            Duration::from_millis(delay)
3094        };
3095        let rounds = 5;
3096
3097        // Regular network changes to m1 only.
3098        let m1_network_change_guard = {
3099            let m1 = m1.clone();
3100            let task = tokio::spawn(async move {
3101                loop {
3102                    println!("[m1] network change");
3103                    m1.endpoint.magic_sock().force_network_change(true).await;
3104                    time::sleep(offset()).await;
3105                }
3106            });
3107            CallOnDrop::new(move || {
3108                task.abort();
3109            })
3110        };
3111
3112        for i in 0..rounds {
3113            println!("-- [m1 changes] round {}", i + 1);
3114            run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3115            run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3116
3117            println!("-- [m1 changes] larger data");
3118            let mut data = vec![0u8; 10 * 1024];
3119            rand::thread_rng().fill_bytes(&mut data);
3120            run_roundtrip(m1.clone(), m2.clone(), &data).await;
3121            run_roundtrip(m2.clone(), m1.clone(), &data).await;
3122        }
3123
3124        std::mem::drop(m1_network_change_guard);
3125
3126        // Regular network changes to m2 only.
3127        let m2_network_change_guard = {
3128            let m2 = m2.clone();
3129            let task = tokio::spawn(async move {
3130                loop {
3131                    println!("[m2] network change");
3132                    m2.endpoint.magic_sock().force_network_change(true).await;
3133                    time::sleep(offset()).await;
3134                }
3135            });
3136            CallOnDrop::new(move || {
3137                task.abort();
3138            })
3139        };
3140
3141        for i in 0..rounds {
3142            println!("-- [m2 changes] round {}", i + 1);
3143            run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3144            run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3145
3146            println!("-- [m2 changes] larger data");
3147            let mut data = vec![0u8; 10 * 1024];
3148            rand::thread_rng().fill_bytes(&mut data);
3149            run_roundtrip(m1.clone(), m2.clone(), &data).await;
3150            run_roundtrip(m2.clone(), m1.clone(), &data).await;
3151        }
3152
3153        std::mem::drop(m2_network_change_guard);
3154
3155        // Regular network changes to both m1 and m2.
3156        let m1_m2_network_change_guard = {
3157            let m1 = m1.clone();
3158            let m2 = m2.clone();
3159            let task = tokio::spawn(async move {
3160                loop {
3161                    println!("-- [m1] network change");
3162                    m1.endpoint.magic_sock().force_network_change(true).await;
3163                    println!("-- [m2] network change");
3164                    m2.endpoint.magic_sock().force_network_change(true).await;
3165                    time::sleep(offset()).await;
                }
            });
3166            CallOnDrop::new(move || {
3167                task.abort();
3168            })
3169        };
3170
3171        for i in 0..rounds {
3172            println!("-- [m1 & m2 changes] round {}", i + 1);
3173            run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3174            run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3175
3176            println!("-- [m1 & m2 changes] larger data");
3177            let mut data = vec![0u8; 10 * 1024];
3178            rand::thread_rng().fill_bytes(&mut data);
3179            run_roundtrip(m1.clone(), m2.clone(), &data).await;
3180            run_roundtrip(m2.clone(), m1.clone(), &data).await;
3181        }
3182
3183        std::mem::drop(m1_m2_network_change_guard);
3184        Ok(())
3185    }
3186
3187    #[tokio::test(flavor = "multi_thread")]
3188    async fn test_two_devices_setup_teardown() -> Result<()> {
3189        iroh_test::logging::setup_multithreaded();
3190        for i in 0..10 {
3191            println!("-- round {i}");
3192            println!("setting up magic stack");
3193            let m1 = MagicStack::new(RelayMode::Disabled).await?;
3194            let m2 = MagicStack::new(RelayMode::Disabled).await?;
3195
3196            let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3197
3198            println!("closing endpoints");
3199            let msock1 = m1.endpoint.magic_sock();
3200            let msock2 = m2.endpoint.magic_sock();
3201            m1.endpoint.close(0u32.into(), b"done").await?;
3202            m2.endpoint.close(0u32.into(), b"done").await?;
3203
3204            assert!(msock1.msock.is_closed());
3205            assert!(msock2.msock.is_closed());
3206        }
3207        Ok(())
3208    }
3209
3210    #[tokio::test]
3211    async fn test_two_devices_roundtrip_quinn_raw() -> Result<()> {
3212        let _guard = iroh_test::logging::setup();
3213
3214        let make_conn = |addr: SocketAddr| -> anyhow::Result<quinn::Endpoint> {
3215            let key = SecretKey::generate();
3216            let conn = std::net::UdpSocket::bind(addr)?;
3217
3218            let quic_server_config = tls::make_server_config(&key, vec![ALPN.to_vec()], false)?;
3219            let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
3220            let mut transport_config = quinn::TransportConfig::default();
3221            transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
3222            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3223            server_config.transport_config(Arc::new(transport_config));
3224            let mut quic_ep = quinn::Endpoint::new(
3225                quinn::EndpointConfig::default(),
3226                Some(server_config),
3227                conn,
3228                Arc::new(quinn::TokioRuntime),
3229            )?;
3230
3231            let quic_client_config =
3232                tls::make_client_config(&key, None, vec![ALPN.to_vec()], false)?;
3233            let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
3234            let mut transport_config = quinn::TransportConfig::default();
3235            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3236            client_config.transport_config(Arc::new(transport_config));
3237            quic_ep.set_default_client_config(client_config);
3238
3239            Ok(quic_ep)
3240        };
3241
3242        let m1 = make_conn("127.0.0.1:0".parse().unwrap())?;
3243        let m2 = make_conn("127.0.0.1:0".parse().unwrap())?;
3244
3245        // msg from a -> b
3246        macro_rules! roundtrip {
3247            ($a:expr, $b:expr, $msg:expr) => {
3248                let a = $a.clone();
3249                let b = $b.clone();
3250                let a_name = stringify!($a);
3251                let b_name = stringify!($b);
3252                println!("{} -> {} ({} bytes)", a_name, b_name, $msg.len());
3253
3254                let a_addr = a.local_addr()?;
3255                let b_addr = b.local_addr()?;
3256
3257                println!("{}: {}, {}: {}", a_name, a_addr, b_name, b_addr);
3258
3259                let b_task = tokio::task::spawn(async move {
3260                    println!("[{b_name}] accepting conn");
3261                    let conn = b.accept().await.expect("no conn");
3262                    println!("[{}] connecting", b_name);
3263                    let conn = conn
3264                        .await
3265                        .with_context(|| format!("[{b_name}] connecting"))?;
3266                    println!("[{}] accepting bi", b_name);
3267                    let (mut send_bi, mut recv_bi) = conn
3268                        .accept_bi()
3269                        .await
3270                        .with_context(|| format!("[{b_name}] accepting bi"))?;
3271
3272                    println!("[{b_name}] reading");
3273                    let val = recv_bi
3274                        .read_to_end(usize::MAX)
3275                        .await
3276                        .with_context(|| format!("[{b_name}] reading to end"))?;
3277                    println!("[{b_name}] finishing");
3278                    send_bi
3279                        .finish()
3280                        .with_context(|| format!("[{b_name}] finishing"))?;
3281                    send_bi
3282                        .stopped()
3283                        .await
3284                        .with_context(|| format!("[{b_name}] stopped"))?;
3285
3286                    println!("[{b_name}] close");
3287                    conn.close(0u32.into(), b"done");
3288                    println!("[{b_name}] closed");
3289
3290                    Ok::<_, anyhow::Error>(val)
3291                });
3292
3293                println!("[{a_name}] connecting to {b_addr}");
3294                let conn = a
3295                    .connect(b_addr, "localhost")?
3296                    .await
3297                    .with_context(|| format!("[{a_name}] connect"))?;
3298
3299                println!("[{a_name}] opening bi");
3300                let (mut send_bi, mut recv_bi) = conn
3301                    .open_bi()
3302                    .await
3303                    .with_context(|| format!("[{a_name}] open bi"))?;
3304                println!("[{a_name}] writing message");
3305                send_bi
3306                    .write_all(&$msg[..])
3307                    .await
3308                    .with_context(|| format!("[{a_name}] write all"))?;
3309
3310                println!("[{a_name}] finishing");
3311                send_bi
3312                    .finish()
3313                    .with_context(|| format!("[{a_name}] finish"))?;
3314                send_bi
3315                    .stopped()
3316                    .await
3317                    .with_context(|| format!("[{a_name}] stopped"))?;
3318
3319                println!("[{a_name}] reading_to_end");
3320                let _ = recv_bi
3321                    .read_to_end(usize::MAX)
3322                    .await
3323                    .with_context(|| format!("[{a_name}] reading_to_end"))?;
3324                println!("[{a_name}] close");
3325                conn.close(0u32.into(), b"done");
3326                println!("[{a_name}] wait idle");
3327                a.wait_idle().await;
3328
3329                drop(send_bi);
3330
3331                // make sure the right values arrived
3332                println!("[{a_name}] waiting for channel");
3333                let val = b_task.await??;
3334                anyhow::ensure!(
3335                    val == $msg,
3336                    "expected {}, got {}",
3337                    hex::encode($msg),
3338                    hex::encode(val)
3339                );
3340            };
3341        }
3342
3343        for i in 0..10 {
3344            println!("-- round {}", i + 1);
3345            roundtrip!(m1, m2, b"hello m1");
3346            roundtrip!(m2, m1, b"hello m2");
3347
3348            println!("-- larger data");
3349
3350            let mut data = vec![0u8; 10 * 1024];
3351            rand::thread_rng().fill_bytes(&mut data);
3352            roundtrip!(m1, m2, data);
3353            roundtrip!(m2, m1, data);
3354        }
3355
3356        Ok(())
3357    }
3358
3359    #[tokio::test]
3360    async fn test_two_devices_roundtrip_quinn_rebinding_conn() -> Result<()> {
3361        let _guard = iroh_test::logging::setup();
3362
3363        fn make_conn(addr: SocketAddr) -> anyhow::Result<quinn::Endpoint> {
3364            let key = SecretKey::generate();
3365            let conn = UdpConn::bind(addr)?;
3366
3367            let quic_server_config = tls::make_server_config(&key, vec![ALPN.to_vec()], false)?;
3368            let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
3369            let mut transport_config = quinn::TransportConfig::default();
3370            transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
3371            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3372            server_config.transport_config(Arc::new(transport_config));
3373            let mut quic_ep = quinn::Endpoint::new_with_abstract_socket(
3374                quinn::EndpointConfig::default(),
3375                Some(server_config),
3376                Arc::new(conn),
3377                Arc::new(quinn::TokioRuntime),
3378            )?;
3379
3380            let quic_client_config =
3381                tls::make_client_config(&key, None, vec![ALPN.to_vec()], false)?;
3382            let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
3383            let mut transport_config = quinn::TransportConfig::default();
3384            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3385            client_config.transport_config(Arc::new(transport_config));
3386            quic_ep.set_default_client_config(client_config);
3387
3388            Ok(quic_ep)
3389        }
3390
3391        let m1 = make_conn("127.0.0.1:7770".parse().unwrap())?;
3392        let m2 = make_conn("127.0.0.1:7771".parse().unwrap())?;
3393
3394        // msg from a -> b
3395        macro_rules! roundtrip {
3396            ($a:expr, $b:expr, $msg:expr) => {
3397                let a = $a.clone();
3398                let b = $b.clone();
3399                let a_name = stringify!($a);
3400                let b_name = stringify!($b);
3401                println!("{} -> {} ({} bytes)", a_name, b_name, $msg.len());
3402
3403                let a_addr: SocketAddr = format!("127.0.0.1:{}", a.local_addr()?.port())
3404                    .parse()
3405                    .unwrap();
3406                let b_addr: SocketAddr = format!("127.0.0.1:{}", b.local_addr()?.port())
3407                    .parse()
3408                    .unwrap();
3409
3410                println!("{}: {}, {}: {}", a_name, a_addr, b_name, b_addr);
3411
3412                let b_task = tokio::task::spawn(async move {
3413                    println!("[{}] accepting conn", b_name);
3414                    let conn = b.accept().await.expect("no conn");
3415                    println!("[{}] connecting", b_name);
3416                    let conn = conn
3417                        .await
3418                        .with_context(|| format!("[{}] connecting", b_name))?;
3419                    println!("[{}] accepting bi", b_name);
3420                    let (mut send_bi, mut recv_bi) = conn
3421                        .accept_bi()
3422                        .await
3423                        .with_context(|| format!("[{}] accepting bi", b_name))?;
3424
3425                    println!("[{}] reading", b_name);
3426                    let val = recv_bi
3427                        .read_to_end(usize::MAX)
3428                        .await
3429                        .with_context(|| format!("[{}] reading to end", b_name))?;
3430                    println!("[{}] finishing", b_name);
3431                    send_bi
3432                        .finish()
3433                        .with_context(|| format!("[{}] finishing", b_name))?;
3434                    send_bi
3435                        .stopped()
3436                        .await
3437                        .with_context(|| format!("[{b_name}] stopped"))?;
3438
3439                    println!("[{}] close", b_name);
3440                    conn.close(0u32.into(), b"done");
3441                    println!("[{}] closed", b_name);
3442
3443                    Ok::<_, anyhow::Error>(val)
3444                });
3445
3446                println!("[{}] connecting to {}", a_name, b_addr);
3447                let conn = a
3448                    .connect(b_addr, "localhost")?
3449                    .await
3450                    .with_context(|| format!("[{}] connect", a_name))?;
3451
3452                println!("[{}] opening bi", a_name);
3453                let (mut send_bi, mut recv_bi) = conn
3454                    .open_bi()
3455                    .await
3456                    .with_context(|| format!("[{}] open bi", a_name))?;
3457                println!("[{}] writing message", a_name);
3458                send_bi
3459                    .write_all(&$msg[..])
3460                    .await
3461                    .with_context(|| format!("[{}] write all", a_name))?;
3462
3463                println!("[{}] finishing", a_name);
3464                send_bi
3465                    .finish()
3466                    .with_context(|| format!("[{}] finish", a_name))?;
3467                send_bi
3468                    .stopped()
3469                    .await
3470                    .with_context(|| format!("[{a_name}] stopped"))?;
3471
3472                println!("[{}] reading_to_end", a_name);
3473                let _ = recv_bi
3474                    .read_to_end(usize::MAX)
3475                    .await
3476                    .with_context(|| format!("[{}]", a_name))?;
3477                println!("[{}] close", a_name);
3478                conn.close(0u32.into(), b"done");
3479                println!("[{}] wait idle", a_name);
3480                a.wait_idle().await;
3481
3482                drop(send_bi);
3483
3484                // make sure the right values arrived
3485                println!("[{}] waiting for channel", a_name);
3486                let val = b_task.await??;
3487                anyhow::ensure!(
3488                    val == $msg,
3489                    "expected {}, got {}",
3490                    hex::encode($msg),
3491                    hex::encode(val)
3492                );
3493            };
3494        }
3495
3496        for i in 0..10 {
3497            println!("-- round {}", i + 1);
3498            roundtrip!(m1, m2, b"hello m1");
3499            roundtrip!(m2, m1, b"hello m2");
3500
3501            println!("-- larger data");
3502
3503            let mut data = vec![0u8; 10 * 1024];
3504            rand::thread_rng().fill_bytes(&mut data);
3505            roundtrip!(m1, m2, data);
3506            roundtrip!(m2, m1, data);
3507        }
3508
3509        Ok(())
3510    }
3511
3512    #[test]
3513    fn test_split_packets() {
3514        fn mk_transmit(contents: &[u8], segment_size: Option<usize>) -> quinn_udp::Transmit<'_> {
3515            let destination = "127.0.0.1:0".parse().unwrap();
3516            quinn_udp::Transmit {
3517                destination,
3518                ecn: None,
3519                contents,
3520                segment_size,
3521                src_ip: None,
3522            }
3523        }
3524        fn mk_expected(parts: impl IntoIterator<Item = &'static str>) -> RelayContents {
3525            parts
3526                .into_iter()
3527                .map(|p| p.as_bytes().to_vec().into())
3528                .collect()
3529        }
3530        // no split
3531        assert_eq!(
3532            split_packets(&mk_transmit(b"hello", None)),
3533            mk_expected(["hello"])
3534        );
3535        // split without rest
3536        assert_eq!(
3537            split_packets(&mk_transmit(b"helloworld", Some(5))),
3538            mk_expected(["hello", "world"])
3539        );
3540        // split with rest and second transmit
3541        assert_eq!(
3542            split_packets(&mk_transmit(b"hello world", Some(5))),
3543            mk_expected(["hello", " worl", "d"]) // spellchecker:disable-line
3544        );
3545        // split that results in 1 packet
3546        assert_eq!(
3547            split_packets(&mk_transmit(b"hello world", Some(1000))),
3548            mk_expected(["hello world"])
3549        );
3550    }
3551
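    // A hedged sketch of `NetInfo::basically_equal` semantics (all field values
    // below are made up): ICMP reachability is ignored when only one report has
    // the information, relay latency is never compared, but a change in e.g.
    // hairpinning counts as a different network.
    #[test]
    fn test_netinfo_basically_equal() {
        let reference = NetInfo {
            mapping_varies_by_dest_ip: Some(false),
            hair_pinning: Some(true),
            working_ipv6: Some(false),
            os_has_ipv6: Some(true),
            working_udp: Some(true),
            working_icmp_v4: None,
            working_icmp_v6: None,
            have_port_map: false,
            portmap_probe: None,
            preferred_relay: None,
            relay_latency: BTreeMap::new(),
        };

        // Only one report knows about ICMPv4, so it is ignored in the comparison.
        let mut other = reference.clone();
        other.working_icmp_v4 = Some(true);
        assert!(reference.basically_equal(&other));

        // Latency is expected to fluctuate and is not part of the comparison.
        let mut other = reference.clone();
        other.relay_latency.insert("relay.example".into(), 0.2);
        assert!(reference.basically_equal(&other));

        // A change in hairpinning behaviour, however, is a material change.
        let mut other = reference.clone();
        other.hair_pinning = Some(false);
        assert!(!reference.basically_equal(&other));
    }
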
3552    #[tokio::test]
3553    async fn test_local_endpoints() {
3554        let _guard = iroh_test::logging::setup();
3555        let ms = Handle::new(Default::default()).await.unwrap();
3556
3557        // See if we can get endpoints.
3558        let eps0 = ms.direct_addresses().next().await.unwrap();
3559        println!("{eps0:?}");
3560        assert!(!eps0.is_empty());
3561
3562        // Getting the endpoints again immediately should give the same results.
3563        let eps1 = ms.direct_addresses().next().await.unwrap();
3564        println!("{eps1:?}");
3565        assert_eq!(eps0, eps1);
3566    }
3567
3568    #[tokio::test]
3569    async fn test_watch_home_relay() {
3570        // use an empty relay map to get full control of the changes during the test
3571        let ops = Options {
3572            relay_map: RelayMap::empty(),
3573            ..Default::default()
3574        };
3575        let msock = MagicSock::spawn(ops).await.unwrap();
3576        let mut relay_stream = msock.watch_home_relay();
3577
3578        // no relay, nothing to report
3579        assert_eq!(
3580            futures_lite::future::poll_once(relay_stream.next()).await,
3581            None
3582        );
3583
3584        let url: RelayUrl = format!("https://{}", EU_RELAY_HOSTNAME).parse().unwrap();
3585        msock.set_my_relay(Some(url.clone()));
3586
3587        assert_eq!(relay_stream.next().await, Some(url.clone()));
3588
3589        // drop the stream and query it again, the result should be immediately available
3590
3591        let mut relay_stream = msock.watch_home_relay();
3592        assert_eq!(
3593            futures_lite::future::poll_once(relay_stream.next()).await,
3594            Some(Some(url))
3595        );
3596    }
3597
3598    /// Creates a new [`quinn::Endpoint`] hooked up to a [`MagicSock`].
3599    ///
3600    /// This is without involving [`crate::endpoint::Endpoint`].  The socket will accept
3601    /// connections using [`ALPN`].
3602    ///
3603    /// Use [`magicsock_connect`] to establish connections.
3604    #[instrument(name = "ep", skip_all, fields(me = secret_key.public().fmt_short()))]
3605    async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result<(quinn::Endpoint, Handle)> {
3606        let opts = Options {
3607            addr_v4: None,
3608            addr_v6: None,
3609            secret_key: secret_key.clone(),
3610            relay_map: RelayMap::empty(),
3611            node_map: None,
3612            discovery: None,
3613            dns_resolver: crate::dns::default_resolver().clone(),
3614            proxy_url: None,
3615            insecure_skip_relay_cert_verify: true,
3616        };
3617        let msock = MagicSock::spawn(opts).await?;
3618        let server_config = crate::endpoint::make_server_config(
3619            &secret_key,
3620            vec![ALPN.to_vec()],
3621            Arc::new(quinn::TransportConfig::default()),
3622            true,
3623        )?;
3624        let mut endpoint_config = quinn::EndpointConfig::default();
3625        endpoint_config.grease_quic_bit(false);
3626        let endpoint = quinn::Endpoint::new_with_abstract_socket(
3627            endpoint_config,
3628            Some(server_config),
3629            Arc::new(msock.clone()),
3630            Arc::new(quinn::TokioRuntime),
3631        )?;
3632        Ok((endpoint, msock))
3633    }
3634
3635    /// Connects from `ep` returned by [`magicsock_ep`] to the `node_id`.
3636    ///
3637    /// Uses [`ALPN`]; `node_id` must match `addr`.
3638    #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))]
3639    async fn magicsock_connect(
3640        ep: &quinn::Endpoint,
3641        ep_secret_key: SecretKey,
3642        addr: QuicMappedAddr,
3643        node_id: NodeId,
3644    ) -> Result<quinn::Connection> {
3645        // Endpoint::connect sets this, do the same to have similar behaviour.
3646        let mut transport_config = quinn::TransportConfig::default();
3647        transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
3648
3649        magicsock_connect_with_transport_config(
3650            ep,
3651            ep_secret_key,
3652            addr,
3653            node_id,
3654            Arc::new(transport_config),
3655        )
3656        .await
3657    }
3658
3659    /// Connects from `ep` returned by [`magicsock_ep`] to the `node_id`.
3660    ///
3661    /// This version allows customising the transport config.
3662    ///
3663    /// Uses [`ALPN`]; `node_id` must match `addr`.
3664    #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))]
3665    async fn magicsock_connect_with_transport_config(
3666        ep: &quinn::Endpoint,
3667        ep_secret_key: SecretKey,
3668        addr: QuicMappedAddr,
3669        node_id: NodeId,
3670        transport_config: Arc<quinn::TransportConfig>,
3671    ) -> Result<quinn::Connection> {
3672        let alpns = vec![ALPN.to_vec()];
3673        let quic_client_config =
3674            tls::make_client_config(&ep_secret_key, Some(node_id), alpns, true)?;
3675        let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
3676        client_config.transport_config(transport_config);
3677        let connect = ep.connect_with(client_config, addr.0, "localhost")?;
3678        let connection = connect.await?;
3679        Ok(connection)
3680    }
3681
3682    #[tokio::test]
3683    async fn test_try_send_no_send_addr() {
3684        // Regression test: if there is no send_addr we should keep being able to use the
3685        // Endpoint.
3686        let _guard = iroh_test::logging::setup();
3687
3688        let secret_key_1 = SecretKey::from_bytes(&[1u8; 32]);
3689        let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]);
3690        let node_id_2 = secret_key_2.public();
3691        let secret_key_missing_node = SecretKey::from_bytes(&[255u8; 32]);
3692        let node_id_missing_node = secret_key_missing_node.public();
3693
3694        let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap();
3695
3696        // Generate an address not present in the NodeMap.
3697        let bad_addr = QuicMappedAddr::generate();
3698
3699        // 500ms is rather fast here.  Running this locally it should always be enough
3700        // to hit the timeout.  Even if it is too short, the test will not become flaky,
3701        // since we expect the timeout anyway; we would just hit it for the wrong reason.
3702        // Keeping it short speeds up the test.
3703        let res = tokio::time::timeout(
3704            Duration::from_millis(500),
3705            magicsock_connect(&ep_1, secret_key_1.clone(), bad_addr, node_id_missing_node),
3706        )
3707        .await;
3708        assert!(res.is_err(), "expecting timeout");
3709
3710        // Now check we can still create another connection with this endpoint.
3711        let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap();
3712
3713        // This needs an accept task
3714        let accept_task = tokio::spawn({
3715            async fn accept(ep: quinn::Endpoint) -> Result<()> {
3716                let incoming = ep.accept().await.ok_or(anyhow!("no incoming"))?;
3717                let _conn = incoming.accept()?.await?;
3718
3719                // Keep this connection alive for a while
3720                tokio::time::sleep(Duration::from_secs(10)).await;
3721                info!("accept finished");
3722                Ok(())
3723            }
3724            let ep_2 = ep_2.clone();
3725            async move {
3726                if let Err(err) = accept(ep_2).await {
3727                    error!("{err:#}");
3728                }
3729            }
3730            .instrument(info_span!("ep2.accept", me = node_id_2.fmt_short()))
3731        });
3732        let _accept_task = AbortOnDropHandle::new(accept_task);
3733
3734        let node_addr_2 = NodeAddr {
3735            node_id: node_id_2,
3736            info: AddrInfo {
3737                relay_url: None,
3738                direct_addresses: msock_2
3739                    .direct_addresses()
3740                    .next()
3741                    .await
3742                    .expect("no direct addrs")
3743                    .into_iter()
3744                    .map(|x| x.addr)
3745                    .collect(),
3746            },
3747        };
3748        msock_1
3749            .add_node_addr(
3750                node_addr_2,
3751                Source::NamedApp {
3752                    name: "test".into(),
3753                },
3754            )
3755            .unwrap();
3756        let addr = msock_1.get_mapping_addr(node_id_2).unwrap();
3757        let res = tokio::time::timeout(
3758            Duration::from_secs(10),
3759            magicsock_connect(&ep_1, secret_key_1.clone(), addr, node_id_2),
3760        )
3761        .await
3762        .expect("timeout while connecting");
3763
3764        // aka assert!(res.is_ok()) but with nicer error reporting.
3765        res.unwrap();
3766
3767        // TODO: Now check if we can connect to a repaired ep_3, but we can't modify that
3768        // much internal state for now.
3769    }
3770
3771    #[tokio::test]
3772    async fn test_try_send_no_udp_addr_or_relay_url() {
3773        // This specifically tests the `if udp_addr.is_none() && relay_url.is_none()`
3774        // behaviour of MagicSock::try_send.
3775        let _logging_guard = iroh_test::logging::setup();
3776
3777        let secret_key_1 = SecretKey::from_bytes(&[1u8; 32]);
3778        let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]);
3779        let node_id_2 = secret_key_2.public();
3780
3781        let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap();
3782        let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap();
3783
3784        // We need a task to accept the connection.
3785        let accept_task = tokio::spawn({
3786            async fn accept(ep: quinn::Endpoint) -> Result<()> {
3787                let incoming = ep.accept().await.ok_or(anyhow!("no incoming"))?;
3788                let conn = incoming.accept()?.await?;
3789                let mut stream = conn.accept_uni().await?;
3790                stream.read_to_end(1 << 16).await?;
3791                info!("accept finished");
3792                Ok(())
3793            }
3794            let ep_2 = ep_2.clone();
3795            async move {
3796                if let Err(err) = accept(ep_2).await {
3797                    error!("{err:#}");
3798                }
3799            }
3800            .instrument(info_span!("ep2.accept", me = node_id_2.fmt_short()))
3801        });
3802        let _accept_task = AbortOnDropHandle::new(accept_task);
3803
3804        // Add an empty entry in the NodeMap of ep_1
3805        msock_1.node_map.add_node_addr(
3806            NodeAddr {
3807                node_id: node_id_2,
3808                info: AddrInfo::default(),
3809            },
3810            Source::NamedApp {
3811                name: "test".into(),
3812            },
3813        );
3814        let addr_2 = msock_1.get_mapping_addr(node_id_2).unwrap();
3815
3816        // Set a low max_idle_timeout so quinn gives up on this quickly and our test does
3817        // not take forever.  You need to check the log output to verify this is really
3818        // triggering the correct error.
3819        // In test_try_send_no_send_addr() above you may have noticed we used
3820        // tokio::time::timeout() on the connection attempt instead.  Here however we want
3821        // Quinn itself to have fully given up on the connection attempt because we will
3822        // later connect to **the same** node.  If Quinn did not give up on the connection
3823        // we'd close it on drop, and the retransmits of the close packets would interfere
3824        // with the next handshake, closing it during the handshake.  This makes the test a
3825        // little slower though.
3826        let mut transport_config = quinn::TransportConfig::default();
3827        transport_config.max_idle_timeout(Some(Duration::from_millis(200).try_into().unwrap()));
3828        let res = magicsock_connect_with_transport_config(
3829            &ep_1,
3830            secret_key_1.clone(),
3831            addr_2,
3832            node_id_2,
3833            Arc::new(transport_config),
3834        )
3835        .await;
3836        assert!(res.is_err(), "expected timeout");
3837        info!("first connect timed out as expected");
3838
3839        // Provide correct addressing information
3840        msock_1.node_map.add_node_addr(
3841            NodeAddr {
3842                node_id: node_id_2,
3843                info: AddrInfo {
3844                    relay_url: None,
3845                    direct_addresses: msock_2
3846                        .direct_addresses()
3847                        .next()
3848                        .await
3849                        .expect("no direct addrs")
3850                        .into_iter()
3851                        .map(|x| x.addr)
3852                        .collect(),
3853                },
3854            },
3855            Source::NamedApp {
3856                name: "test".into(),
3857            },
3858        );
3859
3860        // We can now connect
3861        tokio::time::timeout(Duration::from_secs(10), async move {
3862            info!("establishing new connection");
3863            let conn = magicsock_connect(&ep_1, secret_key_1.clone(), addr_2, node_id_2)
3864                .await
3865                .unwrap();
3866            info!("have connection");
3867            let mut stream = conn.open_uni().await.unwrap();
3868            stream.write_all(b"hello").await.unwrap();
3869            stream.finish().unwrap();
3870            stream.stopped().await.unwrap();
3871            info!("finished stream");
3872        })
3873        .await
3874        .expect("connection timed out");
3875
3876        // TODO: could remove the addresses again, send, add it back and see it recover.
3877        // But we don't have that much private access to the NodeMap.  This will do for now.
3878    }
3879}