1use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt::Display,
21 io,
22 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
23 pin::Pin,
24 sync::{
25 atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
26 Arc, RwLock,
27 },
28 task::{Context, Poll, Waker},
29 time::{Duration, Instant},
30};
31
32use anyhow::{anyhow, Context as _, Result};
33use bytes::Bytes;
34use futures_lite::{FutureExt, Stream, StreamExt};
35use futures_util::stream::BoxStream;
36use iroh_base::key::NodeId;
37use iroh_metrics::{inc, inc_by};
38use netwatch::{interfaces, ip::LocalAddresses, netmon};
39use quinn::AsyncUdpSocket;
40use rand::{seq::SliceRandom, Rng, SeedableRng};
41use smallvec::{smallvec, SmallVec};
42use tokio::{
43 sync::{self, mpsc, Mutex},
44 task::JoinSet,
45 time,
46};
47use tokio_util::sync::CancellationToken;
48use tracing::{
49 debug, error, error_span, event, info, info_span, instrument, trace, trace_span, warn,
50 Instrument, Level, Span,
51};
52use url::Url;
53use watchable::Watchable;
54
55use self::{
56 metrics::Metrics as MagicsockMetrics,
57 node_map::{NodeMap, PingAction, PingRole, SendPing},
58 relay_actor::{RelayActor, RelayActorMessage, RelayReadResult},
59 udp_conn::UdpConn,
60};
61use crate::{
62 defaults::timeouts::NETCHECK_REPORT_TIMEOUT,
63 disco::{self, CallMeMaybe, SendAddr},
64 discovery::{Discovery, DiscoveryItem},
65 dns::DnsResolver,
66 endpoint::NodeAddr,
67 key::{PublicKey, SecretKey, SharedSecret},
68 netcheck,
69 relay::{RelayMap, RelayUrl},
70 stun, AddrInfo,
71};
72
73mod metrics;
74mod node_map;
75mod relay_actor;
76mod timer;
77mod udp_conn;
78
79pub use node_map::Source;
80
81pub(super) use self::timer::Timer;
82pub use self::{
83 metrics::Metrics,
84 node_map::{ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, RemoteInfo},
85};
86
/// A fixed duration of 27 seconds.
///
/// NOTE(review): presumably the maximum age at which the discovered direct
/// addresses still count as fresh (see the `fresh_enough` check used by
/// `send_or_queue_call_me_maybe`); the consumer is not visible here — confirm.
const ENDPOINTS_FRESH_ENOUGH_DURATION: Duration = Duration::from_secs(27);

/// A fixed interval of 5 seconds.
///
/// NOTE(review): presumably the period of a heartbeat timer; the consumer is
/// not visible in this part of the file — confirm.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
92
/// Configuration consumed when a [`MagicSock`] is created.
#[derive(derive_more::Debug)]
pub(crate) struct Options {
    /// Optional IPv4 address handed to `bind` when the sockets are created.
    pub(crate) addr_v4: Option<SocketAddrV4>,
    /// Optional IPv6 address handed to `bind` when the sockets are created.
    pub(crate) addr_v6: Option<SocketAddrV6>,

    /// Secret key of this node; its public half identifies the node and it is
    /// used to seal and open disco messages.
    pub(crate) secret_key: SecretKey,

    /// The relay map stored on the socket.
    pub(crate) relay_map: RelayMap,

    /// Optional list of known nodes used to pre-populate the node map
    /// (an empty list is used when `None`).
    pub(crate) node_map: Option<Vec<NodeAddr>>,

    /// Optional discovery service; when set, our addressing info is published to it.
    pub(crate) discovery: Option<Box<dyn Discovery>>,

    /// DNS resolver stored on the socket and handed to the netcheck client.
    pub(crate) dns_resolver: DnsResolver,

    /// Optional proxy URL, exposed again through `MagicSock::proxy_url`.
    pub(crate) proxy_url: Option<Url>,

    /// Only present in test builds or with the `test-utils` feature.
    ///
    /// NOTE(review): by its name this disables certificate verification for
    /// relay connections; the consumer is not visible here — confirm.
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
    pub(crate) insecure_skip_relay_cert_verify: bool,
}
133
134impl Default for Options {
135 fn default() -> Self {
136 Options {
137 addr_v4: None,
138 addr_v6: None,
139 secret_key: SecretKey::generate(),
140 relay_map: RelayMap::empty(),
141 node_map: None,
142 discovery: None,
143 proxy_url: None,
144 dns_resolver: crate::dns::default_resolver().clone(),
145 #[cfg(any(test, feature = "test-utils"))]
146 insecure_skip_relay_cert_verify: false,
147 }
148 }
149}
150
/// Packet payloads handed to the relay actor in one `RelayActorMessage::Send`.
///
/// Stored in a [`SmallVec`] with inline room for a single [`Bytes`] payload.
type RelayContents = SmallVec<[Bytes; 1]>;
154
/// Cloneable handle to a [`MagicSock`].
///
/// Dereferences (forwarded through the `Arc`) to the shared [`MagicSock`].
#[derive(Clone, Debug, derive_more::Deref)]
pub(crate) struct Handle {
    /// The shared socket state.
    #[deref(forward)]
    msock: Arc<MagicSock>,
    /// Set of spawned tasks, shared between clones behind an async mutex.
    // NOTE(review): presumably the actor tasks started while constructing the
    // handle; the spawning code is not fully visible here — confirm.
    actor_tasks: Arc<Mutex<JoinSet<()>>>,
}
165
/// Shared state of the magic socket: the UDP sockets, the channels to the
/// actors, and the bookkeeping needed to send and receive over UDP and relays.
#[derive(derive_more::Debug)]
pub(crate) struct MagicSock {
    /// Channel for sending [`ActorMessage`]s (e.g. network-change notifications).
    actor_sender: mpsc::Sender<ActorMessage>,
    /// Channel for sending [`RelayActorMessage`]s, such as packets to relay.
    relay_actor_sender: mpsc::Sender<RelayActorMessage>,
    /// Short form of our public key, used as a tracing field.
    me: String,
    /// Optional proxy URL taken from [`Options`].
    proxy_url: Option<Url>,

    /// Receiving end of the channel carrying packets that arrived via a relay.
    relay_recv_receiver: parking_lot::Mutex<mpsc::Receiver<RelayRecvResult>>,
    /// Waker stored by `poll_recv_relay` when the relay channel is empty.
    network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
    /// Waker slot shared with the `IoPoller` created by `create_io_poller`.
    network_send_wakers: Arc<parking_lot::Mutex<Option<Waker>>>,

    /// DNS resolver taken from [`Options`].
    dns_resolver: DnsResolver,

    /// Secret key of this node.
    secret_key: SecretKey,

    /// Local addresses of the IPv4 socket and, if present, the IPv6 socket.
    local_addrs: std::sync::RwLock<(SocketAddr, Option<SocketAddr>)>,

    /// Port number, initialised from the IPv4 socket's port.
    port: AtomicU16,

    /// Flag read by `is_closing`.
    closing: AtomicBool,
    /// Flag read by `is_closed`; once set, sends fail and receives stay pending.
    closed: AtomicBool,
    /// Flag passed to the node map when selecting send addresses.
    // NOTE(review): presumably set when netcheck reports working IPv6 — confirm.
    ipv6_reported: Arc<AtomicBool>,

    /// The relay map taken from [`Options`].
    relay_map: RelayMap,
    /// Our current relay, if any; observable through `watch_home_relay`.
    my_relay: Watchable<Option<RelayUrl>>,
    /// Per-node state: addresses, paths and ping bookkeeping.
    node_map: NodeMap,
    /// The IPv4 UDP socket.
    pconn4: UdpConn,
    /// The IPv6 UDP socket, if one could be bound.
    pconn6: Option<UdpConn>,
    /// Handle to the netcheck client; received STUN packets are forwarded to it.
    net_checker: netcheck::Addr,
    /// Secrets used to seal and open disco messages.
    disco_secrets: DiscoSecrets,

    /// Queue of disco messages (destination, node key, message) to send over UDP.
    udp_disco_sender: mpsc::Sender<(SocketAddr, PublicKey, disco::Message)>,

    /// Optional discovery service to which our addressing info is published.
    discovery: Option<Box<dyn Discovery>>,

    /// Our own discovered direct addresses.
    direct_addrs: DiscoveredDirectAddrs,

    /// Call-me-maybe messages (node and relay) queued until the direct
    /// addresses have been refreshed.
    pending_call_me_maybes: parking_lot::Mutex<HashMap<PublicKey, RelayUrl>>,

    /// Tracks whether a direct-address update is running or wanted.
    direct_addr_update_state: DirectAddrUpdateState,

    /// Only present in test builds or with the `test-utils` feature.
    // NOTE(review): by its name this disables relay certificate verification — confirm.
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
    insecure_skip_relay_cert_verify: bool,
}
248
249impl MagicSock {
250 pub(crate) async fn spawn(opts: Options) -> Result<Handle> {
252 Handle::new(opts).await
253 }
254
255 pub(crate) fn my_relay(&self) -> Option<RelayUrl> {
259 self.my_relay.get()
260 }
261
262 pub(crate) fn proxy_url(&self) -> Option<&Url> {
264 self.proxy_url.as_ref()
265 }
266
267 fn set_my_relay(&self, my_relay: Option<RelayUrl>) -> Option<RelayUrl> {
271 self.my_relay.replace(my_relay)
272 }
273
274 fn is_closing(&self) -> bool {
275 self.closing.load(Ordering::Relaxed)
276 }
277
278 fn is_closed(&self) -> bool {
279 self.closed.load(Ordering::SeqCst)
280 }
281
282 fn public_key(&self) -> PublicKey {
283 self.secret_key.public()
284 }
285
286 pub(crate) fn local_addr(&self) -> (SocketAddr, Option<SocketAddr>) {
288 *self.local_addrs.read().expect("not poisoned")
289 }
290
291 pub(crate) fn has_send_address(&self, node_key: PublicKey) -> bool {
293 self.remote_info(node_key)
294 .map(|info| info.has_send_address())
295 .unwrap_or(false)
296 }
297
298 pub(crate) fn list_remote_infos(&self) -> Vec<RemoteInfo> {
300 self.node_map.list_remote_infos(Instant::now())
301 }
302
303 pub(crate) fn remote_info(&self, node_id: NodeId) -> Option<RemoteInfo> {
305 self.node_map.remote_info(node_id)
306 }
307
308 pub(crate) fn direct_addresses(&self) -> DirectAddrsStream {
323 self.direct_addrs.updates_stream()
324 }
325
326 pub(crate) fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
331 let current = futures_lite::stream::iter(self.my_relay());
332 let changes = self
333 .my_relay
334 .watch()
335 .into_stream()
336 .filter_map(|maybe_relay| maybe_relay);
337 current.chain(changes)
338 }
339
340 pub(crate) fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
354 self.node_map.conn_type_stream(node_id)
355 }
356
357 pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option<QuicMappedAddr> {
359 self.node_map.get_quic_mapped_addr_for_node_key(node_id)
360 }
361
362 #[instrument(skip_all, fields(me = %self.me))]
364 pub fn add_node_addr(&self, mut addr: NodeAddr, source: node_map::Source) -> Result<()> {
365 let mut pruned = 0;
366 for my_addr in self.direct_addrs.sockaddrs() {
367 if addr.info.direct_addresses.remove(&my_addr) {
368 warn!( node_id=addr.node_id.fmt_short(), %my_addr, %source, "not adding our addr for node");
369 pruned += 1;
370 }
371 }
372 if !addr.info.is_empty() {
373 self.node_map.add_node_addr(addr, source);
374 Ok(())
375 } else if pruned != 0 {
376 Err(anyhow::anyhow!(
377 "empty addressing info, {pruned} direct addresses have been pruned"
378 ))
379 } else {
380 Err(anyhow::anyhow!("empty addressing info"))
381 }
382 }
383
384 pub(super) fn store_direct_addresses(&self, addrs: BTreeSet<DirectAddr>) {
389 let updated = self.direct_addrs.update(addrs);
390 if updated {
391 self.node_map
392 .on_direct_addr_discovered(self.direct_addrs.sockaddrs());
393 self.publish_my_addr();
394 }
395 }
396
397 pub(crate) fn dns_resolver(&self) -> &DnsResolver {
399 &self.dns_resolver
400 }
401
402 pub(crate) fn discovery(&self) -> Option<&dyn Discovery> {
404 self.discovery.as_ref().map(Box::as_ref)
405 }
406
407 pub(crate) async fn network_change(&self) {
409 self.actor_sender
410 .send(ActorMessage::NetworkChange)
411 .await
412 .ok();
413 }
414
/// Test helper: asks the actor to handle a forced network change.
///
/// Best effort: a failure to deliver the message is ignored.
#[cfg(test)]
async fn force_network_change(&self, is_major: bool) {
    let message = ActorMessage::ForceNetworkChange(is_major);
    let _ = self.actor_sender.send(message).await;
}
422
423 #[cfg_attr(windows, allow(dead_code))]
424 fn normalized_local_addr(&self) -> io::Result<SocketAddr> {
425 let (v4, v6) = self.local_addr();
426 let addr = if let Some(v6) = v6 { v6 } else { v4 };
427 Ok(addr)
428 }
429
430 fn create_io_poller(&self) -> Pin<Box<dyn quinn::UdpPoller>> {
431 let ipv4_poller = Arc::new(self.pconn4.clone()).create_io_poller();
446 let ipv6_poller = self
447 .pconn6
448 .as_ref()
449 .map(|sock| Arc::new(sock.clone()).create_io_poller());
450 let relay_sender = self.relay_actor_sender.clone();
451 Box::pin(IoPoller {
452 ipv4_poller,
453 ipv6_poller,
454 relay_sender,
455 relay_send_waker: self.network_send_wakers.clone(),
456 })
457 }
458
459 #[instrument(skip_all)]
461 fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
462 inc_by!(MagicsockMetrics, send_data, transmit.contents.len() as _);
463
464 if self.is_closed() {
465 inc_by!(
466 MagicsockMetrics,
467 send_data_network_down,
468 transmit.contents.len() as _
469 );
470 return Err(io::Error::new(
471 io::ErrorKind::NotConnected,
472 "connection closed",
473 ));
474 }
475
476 let dest = QuicMappedAddr(transmit.destination);
477 trace!(
478 dst = %dest,
479 src = ?transmit.src_ip,
480 len = %transmit.contents.len(),
481 "sending",
482 );
483 let mut transmit = transmit.clone();
484 match self
485 .node_map
486 .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed))
487 {
488 Some((node_id, udp_addr, relay_url, msgs)) => {
489 let mut pings_sent = false;
490 if !msgs.is_empty() {
492 if let Err(err) = self.try_send_ping_actions(msgs) {
493 warn!(
494 node = %node_id.fmt_short(),
495 "failed to handle ping actions: {err:#}",
496 );
497 }
498 pings_sent = true;
499 }
500
501 let mut udp_sent = false;
502 let mut udp_error = None;
503 let mut relay_sent = false;
504 let mut relay_error = None;
505
506 if let Some(addr) = udp_addr {
508 transmit.destination = addr;
510 match self.try_send_udp(addr, &transmit) {
511 Ok(()) => {
512 trace!(node = %node_id.fmt_short(), dst = %addr,
513 "sent transmit over UDP");
514 udp_sent = true;
515 }
516 Err(err) => {
517 error!(node = %node_id.fmt_short(), dst = %addr,
518 "failed to send udp: {err:#}");
519 udp_error = Some(err);
520 }
521 }
522 }
523
524 if let Some(ref relay_url) = relay_url {
526 match self.try_send_relay(relay_url, node_id, split_packets(&transmit)) {
527 Ok(()) => {
528 relay_sent = true;
529 }
530 Err(err) => {
531 relay_error = Some(err);
532 }
533 }
534 }
535
536 let udp_pending = udp_error
537 .as_ref()
538 .map(|err| err.kind() == io::ErrorKind::WouldBlock)
539 .unwrap_or_default();
540 let relay_pending = relay_error
541 .as_ref()
542 .map(|err| err.kind() == io::ErrorKind::WouldBlock)
543 .unwrap_or_default();
544 if udp_pending && relay_pending {
545 Err(io::Error::new(io::ErrorKind::WouldBlock, "pending"))
547 } else {
548 if relay_sent || udp_sent {
549 trace!(
550 node = %node_id.fmt_short(),
551 send_udp = ?udp_addr,
552 send_relay = ?relay_url,
553 "sent transmit",
554 );
555 } else if !pings_sent {
556 error!(
563 node = %node_id.fmt_short(),
564 "no UDP or relay paths available for node",
565 );
566 }
567 Ok(())
568 }
569 }
570 None => {
571 error!(%dest, "no NodeState for mapped address");
572 Ok(())
579 }
580 }
581 }
582
583 fn try_send_relay(
584 &self,
585 url: &RelayUrl,
586 node: NodeId,
587 contents: RelayContents,
588 ) -> io::Result<()> {
589 trace!(
590 node = %node.fmt_short(),
591 relay_url = %url,
592 count = contents.len(),
593 len = contents.iter().map(|c| c.len()).sum::<usize>(),
594 "send relay",
595 );
596 let msg = RelayActorMessage::Send {
597 url: url.clone(),
598 contents,
599 remote_node: node,
600 };
601 match self.relay_actor_sender.try_send(msg) {
602 Ok(_) => {
603 trace!(node = %node.fmt_short(), relay_url = %url,
604 "send relay: message queued");
605 Ok(())
606 }
607 Err(mpsc::error::TrySendError::Closed(_)) => {
608 warn!(node = %node.fmt_short(), relay_url = %url,
609 "send relay: message dropped, channel to actor is closed");
610 Err(io::Error::new(
611 io::ErrorKind::ConnectionReset,
612 "channel to actor is closed",
613 ))
614 }
615 Err(mpsc::error::TrySendError::Full(_)) => {
616 warn!(node = %node.fmt_short(), relay_url = %url,
617 "send relay: message dropped, channel to actor is full");
618 Err(io::Error::new(
619 io::ErrorKind::WouldBlock,
620 "channel to actor is full",
621 ))
622 }
623 }
624 }
625
626 fn try_send_udp(&self, addr: SocketAddr, transmit: &quinn_udp::Transmit) -> io::Result<()> {
627 let conn = self.conn_for_addr(addr)?;
628 conn.try_send(transmit)?;
629 let total_bytes: u64 = transmit.contents.len() as u64;
630 if addr.is_ipv6() {
631 inc_by!(MagicsockMetrics, send_ipv6, total_bytes);
632 } else {
633 inc_by!(MagicsockMetrics, send_ipv4, total_bytes);
634 }
635 Ok(())
636 }
637
638 fn conn_for_addr(&self, addr: SocketAddr) -> io::Result<&UdpConn> {
639 let sock = match addr {
640 SocketAddr::V4(_) => &self.pconn4,
641 SocketAddr::V6(_) => self
642 .pconn6
643 .as_ref()
644 .ok_or(io::Error::new(io::ErrorKind::Other, "no IPv6 connection"))?,
645 };
646 Ok(sock)
647 }
648
649 #[instrument(skip_all)]
651 fn poll_recv(
652 &self,
653 cx: &mut Context,
654 bufs: &mut [io::IoSliceMut<'_>],
655 metas: &mut [quinn_udp::RecvMeta],
656 ) -> Poll<io::Result<usize>> {
657 debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
659 if self.is_closed() {
660 return Poll::Pending;
661 }
662
663 let (msgs, from_ipv4) = match self.pconn4.poll_recv(cx, bufs, metas)? {
665 Poll::Pending | Poll::Ready(0) => match &self.pconn6 {
666 Some(conn) => match conn.poll_recv(cx, bufs, metas)? {
667 Poll::Pending | Poll::Ready(0) => {
668 return self.poll_recv_relay(cx, bufs, metas);
669 }
670 Poll::Ready(n) => (n, false),
671 },
672 None => {
673 return self.poll_recv_relay(cx, bufs, metas);
674 }
675 },
676 Poll::Ready(n) => (n, true),
677 };
678
679 #[cfg(not(windows))]
691 let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip());
692 #[cfg(windows)]
695 let dst_ip = None;
696
697 let mut quic_packets_total = 0;
698
699 for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()).take(msgs) {
700 let mut is_quic = false;
701 let mut quic_packets_count = 0;
702 if meta.len > meta.stride {
703 trace!(%meta.len, %meta.stride, "GRO datagram received");
704 inc!(MagicsockMetrics, recv_gro_datagrams);
705 }
706
707 for packet in buf[..meta.len].chunks_mut(meta.stride) {
709 if packet.len() < meta.stride {
710 trace!(
711 len = %packet.len(),
712 %meta.stride,
713 "Last GRO datagram smaller than stride",
714 );
715 }
716
717 let packet_is_quic = if stun::is(packet) {
718 trace!(src = %meta.addr, len = %meta.stride, "UDP recv: stun packet");
719 let packet2 = Bytes::copy_from_slice(packet);
720 self.net_checker.receive_stun_packet(packet2, meta.addr);
721 false
722 } else if let Some((sender, sealed_box)) = disco::source_and_box(packet) {
723 trace!(src = %meta.addr, len = %meta.stride, "UDP recv: disco packet");
725 self.handle_disco_message(
726 sender,
727 sealed_box,
728 DiscoMessageSource::Udp(meta.addr),
729 );
730 false
731 } else {
732 trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet");
733 if from_ipv4 {
734 inc_by!(MagicsockMetrics, recv_data_ipv4, packet.len() as _);
735 } else {
736 inc_by!(MagicsockMetrics, recv_data_ipv6, packet.len() as _);
737 }
738 true
739 };
740
741 if packet_is_quic {
742 quic_packets_count += 1;
743 is_quic = true;
744 } else {
745 packet[0] = 0u8;
750 }
751 }
752
753 if is_quic {
754 match self.node_map.receive_udp(meta.addr) {
756 None => {
757 warn!(src = ?meta.addr, count = %quic_packets_count, len = meta.len, "UDP recv quic packets: no node state found, skipping");
758 meta.len = 0;
760 }
761 Some((node_id, quic_mapped_addr)) => {
762 trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets");
763 quic_packets_total += quic_packets_count;
764 meta.addr = quic_mapped_addr.0;
765 }
766 }
767 } else {
768 meta.len = 0;
771 }
772 meta.dst_ip = dst_ip;
774 }
775
776 if quic_packets_total > 0 {
777 inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _);
778 trace!("UDP recv: {} packets", quic_packets_total);
779 }
780
781 Poll::Ready(Ok(msgs))
782 }
783
784 #[instrument(skip_all)]
785 fn poll_recv_relay(
786 &self,
787 cx: &mut Context,
788 bufs: &mut [io::IoSliceMut<'_>],
789 metas: &mut [quinn_udp::RecvMeta],
790 ) -> Poll<io::Result<usize>> {
791 let mut num_msgs = 0;
792 for (buf_out, meta_out) in bufs.iter_mut().zip(metas.iter_mut()) {
793 if self.is_closed() {
794 break;
795 }
796 let mut relay_recv_receiver = self.relay_recv_receiver.lock();
797 match relay_recv_receiver.try_recv() {
798 Err(mpsc::error::TryRecvError::Empty) => {
799 self.network_recv_wakers.lock().replace(cx.waker().clone());
800 break;
801 }
802 Err(mpsc::error::TryRecvError::Disconnected) => {
803 return Poll::Ready(Err(io::Error::new(
804 io::ErrorKind::NotConnected,
805 "connection closed",
806 )));
807 }
808 Ok(Err(err)) => return Poll::Ready(Err(err)),
809 Ok(Ok((node_id, meta, bytes))) => {
810 inc_by!(MagicsockMetrics, recv_data_relay, bytes.len() as _);
811 trace!(src = %meta.addr, node = %node_id.fmt_short(), count = meta.len / meta.stride, len = meta.len, "recv quic packets from relay");
812 buf_out[..bytes.len()].copy_from_slice(&bytes);
813 *meta_out = meta;
814 num_msgs += 1;
815 }
816 }
817 }
818
819 if num_msgs > 0 {
821 inc_by!(MagicsockMetrics, recv_datagrams, num_msgs as _);
822 Poll::Ready(Ok(num_msgs))
823 } else {
824 Poll::Pending
825 }
826 }
827
828 #[instrument("disco_in", skip_all, fields(node = %sender.fmt_short(), %src))]
830 fn handle_disco_message(&self, sender: PublicKey, sealed_box: &[u8], src: DiscoMessageSource) {
831 trace!("handle_disco_message start");
832 if self.is_closed() {
833 return;
834 }
835
836 let dm = match self.disco_secrets.unseal_and_decode(
839 &self.secret_key,
840 sender,
841 sealed_box.to_vec(),
842 ) {
843 Ok(dm) => dm,
844 Err(DiscoBoxError::Open(err)) => {
845 warn!(?err, "failed to open disco box");
846 inc!(MagicsockMetrics, recv_disco_bad_key);
847 return;
848 }
849 Err(DiscoBoxError::Parse(err)) => {
850 inc!(MagicsockMetrics, recv_disco_bad_parse);
857 debug!(?err, "failed to parse disco message");
858 return;
859 }
860 };
861
862 if src.is_relay() {
863 inc!(MagicsockMetrics, recv_disco_relay);
864 } else {
865 inc!(MagicsockMetrics, recv_disco_udp);
866 }
867
868 let span = trace_span!("handle_disco", ?dm);
869 let _guard = span.enter();
870 trace!("receive disco message");
871 match dm {
872 disco::Message::Ping(ping) => {
873 inc!(MagicsockMetrics, recv_disco_ping);
874 self.handle_ping(ping, sender, src);
875 }
876 disco::Message::Pong(pong) => {
877 inc!(MagicsockMetrics, recv_disco_pong);
878 self.node_map.handle_pong(sender, &src, pong);
879 }
880 disco::Message::CallMeMaybe(cm) => {
881 inc!(MagicsockMetrics, recv_disco_call_me_maybe);
882 match src {
883 DiscoMessageSource::Relay { url, .. } => {
884 event!(
885 target: "events.net.call-me-maybe.recv",
886 Level::DEBUG,
887 remote_node = sender.fmt_short(),
888 via = ?url,
889 their_addrs = ?cm.my_numbers,
890 );
891 }
892 _ => {
893 warn!("call-me-maybe packets should only come via relay");
894 return;
895 }
896 }
897 let ping_actions = self.node_map.handle_call_me_maybe(sender, cm);
898 for action in ping_actions {
899 match action {
900 PingAction::SendCallMeMaybe { .. } => {
901 warn!("Unexpected CallMeMaybe as response of handling a CallMeMaybe");
902 }
903 PingAction::SendPing(ping) => {
904 self.send_ping_queued(ping);
905 }
906 }
907 }
908 }
909 }
910 trace!("disco message handled");
911 }
912
913 fn handle_ping(&self, dm: disco::Ping, sender: NodeId, src: DiscoMessageSource) {
915 let addr: SendAddr = src.clone().into();
918 let handled = self.node_map.handle_ping(sender, addr.clone(), dm.tx_id);
919 match handled.role {
920 PingRole::Duplicate => {
921 debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path already confirmed, skip");
922 return;
923 }
924 PingRole::LikelyHeartbeat => {}
925 PingRole::NewPath => {
926 debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: new path");
927 }
928 PingRole::Activate => {
929 debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: path active");
930 }
931 }
932
933 debug!(tx = %hex::encode(dm.tx_id), %addr, dstkey = %sender.fmt_short(),
935 "sending pong");
936 let pong = disco::Message::Pong(disco::Pong {
937 tx_id: dm.tx_id,
938 ping_observed_addr: addr.clone(),
939 });
940 event!(
941 target: "events.net.pong.sent",
942 Level::DEBUG,
943 remote_node = %sender.fmt_short(),
944 dst = ?addr,
945 txn = ?dm.tx_id,
946 );
947
948 if !self.send_disco_message_queued(addr.clone(), sender, pong) {
949 warn!(%addr, "failed to queue pong");
950 }
951
952 if let Some(ping) = handled.needs_ping_back {
953 debug!(
954 %addr,
955 dstkey = %sender.fmt_short(),
956 "sending direct ping back",
957 );
958 self.send_ping_queued(ping);
959 }
960 }
961
962 fn encode_disco_message(&self, dst_key: PublicKey, msg: &disco::Message) -> Bytes {
963 self.disco_secrets
964 .encode_and_seal(&self.secret_key, dst_key, msg)
965 }
966
967 fn send_ping_queued(&self, ping: SendPing) {
968 let SendPing {
969 id,
970 dst,
971 dst_node,
972 tx_id,
973 purpose,
974 } = ping;
975 let msg = disco::Message::Ping(disco::Ping {
976 tx_id,
977 node_key: self.public_key(),
978 });
979 let sent = match dst {
980 SendAddr::Udp(addr) => self
981 .udp_disco_sender
982 .try_send((addr, dst_node, msg))
983 .is_ok(),
984 SendAddr::Relay(ref url) => self.send_disco_message_relay(url, dst_node, msg),
985 };
986 if sent {
987 let msg_sender = self.actor_sender.clone();
988 trace!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (queued)");
989 self.node_map
990 .notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
991 } else {
992 warn!(dst = ?dst, tx = %hex::encode(tx_id), ?purpose, "failed to send ping: queues full");
993 }
994 }
995
996 fn try_send_ping_actions(&self, msgs: Vec<PingAction>) -> io::Result<()> {
1002 for msg in msgs {
1003 if self.is_closing() || self.is_closed() {
1005 return Ok(());
1006 }
1007 match msg {
1008 PingAction::SendCallMeMaybe {
1009 ref relay_url,
1010 dst_node,
1011 } => {
1012 self.send_or_queue_call_me_maybe(relay_url, dst_node);
1013 }
1014 PingAction::SendPing(ping) => {
1015 self.try_send_ping(ping)?;
1016 }
1017 }
1018 }
1019 Ok(())
1020 }
1021
1022 fn send_disco_message_queued(
1030 &self,
1031 dst: SendAddr,
1032 dst_key: PublicKey,
1033 msg: disco::Message,
1034 ) -> bool {
1035 match dst {
1036 SendAddr::Udp(addr) => self.udp_disco_sender.try_send((addr, dst_key, msg)).is_ok(),
1037 SendAddr::Relay(ref url) => self.send_disco_message_relay(url, dst_key, msg),
1038 }
1039 }
1040
1041 fn try_send_disco_message(
1043 &self,
1044 dst: SendAddr,
1045 dst_key: PublicKey,
1046 msg: disco::Message,
1047 ) -> io::Result<()> {
1048 match dst {
1049 SendAddr::Udp(addr) => {
1050 self.try_send_disco_message_udp(addr, dst_key, &msg)?;
1051 }
1052 SendAddr::Relay(ref url) => {
1053 self.send_disco_message_relay(url, dst_key, msg);
1054 }
1055 }
1056 Ok(())
1057 }
1058
1059 fn send_disco_message_relay(&self, url: &RelayUrl, dst: NodeId, msg: disco::Message) -> bool {
1060 debug!(node = %dst.fmt_short(), %url, %msg, "send disco message (relay)");
1061 let pkt = self.encode_disco_message(dst, &msg);
1062 inc!(MagicsockMetrics, send_disco_relay);
1063 match self.try_send_relay(url, dst, smallvec![pkt]) {
1064 Ok(()) => {
1065 if let disco::Message::CallMeMaybe(CallMeMaybe { ref my_numbers }) = msg {
1066 event!(
1067 target: "events.net.call-me-maybe.sent",
1068 Level::DEBUG,
1069 remote_node = %dst.fmt_short(),
1070 via = ?url,
1071 addrs = ?my_numbers,
1072 );
1073 }
1074 inc!(MagicsockMetrics, sent_disco_relay);
1075 disco_message_sent(&msg);
1076 true
1077 }
1078 Err(_) => false,
1079 }
1080 }
1081
1082 async fn send_disco_message_udp(
1083 &self,
1084 dst: SocketAddr,
1085 dst_node: NodeId,
1086 msg: &disco::Message,
1087 ) -> io::Result<()> {
1088 futures_lite::future::poll_fn(move |cx| {
1089 loop {
1090 match self.try_send_disco_message_udp(dst, dst_node, msg) {
1091 Ok(()) => return Poll::Ready(Ok(())),
1092 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1093 let sock = self.conn_for_addr(dst)?;
1095 let sock = Arc::new(sock.clone());
1096 let mut poller = sock.create_io_poller();
1097 match poller.as_mut().poll_writable(cx)? {
1098 Poll::Ready(()) => continue,
1099 Poll::Pending => return Poll::Pending,
1100 }
1101 }
1102 Err(err) => return Poll::Ready(Err(err)),
1103 }
1104 }
1105 })
1106 .await
1107 }
1108
1109 fn try_send_disco_message_udp(
1110 &self,
1111 dst: SocketAddr,
1112 dst_node: NodeId,
1113 msg: &disco::Message,
1114 ) -> std::io::Result<()> {
1115 trace!(%dst, %msg, "send disco message (UDP)");
1116 if self.is_closed() {
1117 return Err(io::Error::new(
1118 io::ErrorKind::NotConnected,
1119 "connection closed",
1120 ));
1121 }
1122 let pkt = self.encode_disco_message(dst_node, msg);
1123 inc!(MagicsockMetrics, send_disco_udp);
1126 let transmit = quinn_udp::Transmit {
1127 destination: dst,
1128 contents: &pkt,
1129 ecn: None,
1130 segment_size: None,
1131 src_ip: None, };
1133 let sent = self.try_send_udp(dst, &transmit);
1134 match sent {
1135 Ok(()) => {
1136 trace!(%dst, node = %dst_node.fmt_short(), %msg, "sent disco message");
1137 inc!(MagicsockMetrics, sent_disco_udp);
1138 disco_message_sent(msg);
1139 Ok(())
1140 }
1141 Err(err) => {
1142 warn!(%dst, node = %dst_node.fmt_short(), ?msg, ?err,
1143 "failed to send disco message");
1144 Err(err)
1145 }
1146 }
1147 }
1148
1149 #[instrument(skip_all)]
1150 async fn handle_ping_actions(&mut self, msgs: Vec<PingAction>) {
1151 if let Err(err) = self.try_send_ping_actions(msgs) {
1156 warn!("Not all ping actions were sent: {err:#}");
1157 }
1158 }
1159
1160 fn try_send_ping(&self, ping: SendPing) -> io::Result<()> {
1161 let SendPing {
1162 id,
1163 dst,
1164 dst_node,
1165 tx_id,
1166 purpose,
1167 } = ping;
1168 let msg = disco::Message::Ping(disco::Ping {
1169 tx_id,
1170 node_key: self.public_key(),
1171 });
1172 self.try_send_disco_message(dst.clone(), dst_node, msg)?;
1173 debug!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (polled)");
1174 let msg_sender = self.actor_sender.clone();
1175 self.node_map
1176 .notify_ping_sent(id, dst.clone(), tx_id, purpose, msg_sender);
1177 Ok(())
1178 }
1179
1180 fn poll_send_relay(
1181 &self,
1182 url: &RelayUrl,
1183 node: PublicKey,
1184 contents: RelayContents,
1185 ) -> Poll<bool> {
1186 trace!(node = %node.fmt_short(), relay_url = %url, count = contents.len(), len = contents.iter().map(|c| c.len()).sum::<usize>(), "send relay");
1187 let msg = RelayActorMessage::Send {
1188 url: url.clone(),
1189 contents,
1190 remote_node: node,
1191 };
1192 match self.relay_actor_sender.try_send(msg) {
1193 Ok(_) => {
1194 trace!(node = %node.fmt_short(), relay_url = %url, "send relay: message queued");
1195 Poll::Ready(true)
1196 }
1197 Err(mpsc::error::TrySendError::Closed(_)) => {
1198 warn!(node = %node.fmt_short(), relay_url = %url, "send relay: message dropped, channel to actor is closed");
1199 Poll::Ready(false)
1200 }
1201 Err(mpsc::error::TrySendError::Full(_)) => {
1202 warn!(node = %node.fmt_short(), relay_url = %url, "send relay: message dropped, channel to actor is full");
1203 Poll::Pending
1204 }
1205 }
1206 }
1207
1208 fn send_queued_call_me_maybes(&self) {
1209 let msg = self.direct_addrs.to_call_me_maybe_message();
1210 let msg = disco::Message::CallMeMaybe(msg);
1211 for (public_key, url) in self.pending_call_me_maybes.lock().drain() {
1212 if !self.send_disco_message_relay(&url, public_key, msg.clone()) {
1213 warn!(node = %public_key.fmt_short(), "relay channel full, dropping call-me-maybe");
1214 }
1215 }
1216 }
1217
1218 fn send_or_queue_call_me_maybe(&self, url: &RelayUrl, dst_node: NodeId) {
1224 match self.direct_addrs.fresh_enough() {
1225 Ok(()) => {
1226 let msg = self.direct_addrs.to_call_me_maybe_message();
1227 let msg = disco::Message::CallMeMaybe(msg);
1228 if !self.send_disco_message_relay(url, dst_node, msg) {
1229 warn!(dstkey = %dst_node.fmt_short(), relayurl = %url,
1230 "relay channel full, dropping call-me-maybe");
1231 } else {
1232 debug!(dstkey = %dst_node.fmt_short(), relayurl = %url, "call-me-maybe sent");
1233 }
1234 }
1235 Err(last_refresh_ago) => {
1236 self.pending_call_me_maybes
1237 .lock()
1238 .insert(dst_node, url.clone());
1239 debug!(
1240 ?last_refresh_ago,
1241 "want call-me-maybe but direct addrs stale; queuing after restun",
1242 );
1243 self.re_stun("refresh-for-peering");
1244 }
1245 }
1246 }
1247
1248 #[instrument(skip_all)]
1250 fn re_stun(&self, why: &'static str) {
1251 debug!("re_stun: {}", why);
1252 inc!(MagicsockMetrics, re_stun_calls);
1253 self.direct_addr_update_state.schedule_run(why);
1254 }
1255
1256 fn publish_my_addr(&self) {
1260 if let Some(ref discovery) = self.discovery {
1261 let info = AddrInfo {
1262 relay_url: self.my_relay(),
1263 direct_addresses: self.direct_addrs.sockaddrs(),
1264 };
1265 discovery.publish(&info);
1266 }
1267 }
1268}
1269
/// The path over which a disco message was received.
#[derive(Clone, Debug)]
enum DiscoMessageSource {
    /// Received over UDP from the given socket address.
    Udp(SocketAddr),
    /// Received via the relay at `url`.
    // NOTE(review): `key` is presumably the public key of the sending node — confirm.
    Relay { url: RelayUrl, key: PublicKey },
}
1275
1276impl Display for DiscoMessageSource {
1277 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1278 match self {
1279 Self::Udp(addr) => write!(f, "Udp({addr})"),
1280 Self::Relay { ref url, key } => write!(f, "Relay({url}, {})", key.fmt_short()),
1281 }
1282 }
1283}
1284
1285impl From<DiscoMessageSource> for SendAddr {
1286 fn from(value: DiscoMessageSource) -> Self {
1287 match value {
1288 DiscoMessageSource::Udp(addr) => SendAddr::Udp(addr),
1289 DiscoMessageSource::Relay { url, .. } => SendAddr::Relay(url),
1290 }
1291 }
1292}
1293
1294impl From<&DiscoMessageSource> for SendAddr {
1295 fn from(value: &DiscoMessageSource) -> Self {
1296 match value {
1297 DiscoMessageSource::Udp(addr) => SendAddr::Udp(*addr),
1298 DiscoMessageSource::Relay { url, .. } => SendAddr::Relay(url.clone()),
1299 }
1300 }
1301}
1302
1303impl DiscoMessageSource {
1304 fn is_relay(&self) -> bool {
1305 matches!(self, DiscoMessageSource::Relay { .. })
1306 }
1307}
1308
/// Tracks whether a direct-address update is running and whether another one
/// has been requested in the meantime.
#[derive(Debug)]
struct DirectAddrUpdateState {
    /// Holds the reason of the update currently running, or `None` when idle.
    running: sync::watch::Sender<Option<&'static str>>,
    /// Reason of an update requested while another one was running, if any.
    want_update: parking_lot::Mutex<Option<&'static str>>,
}
1322
1323impl DirectAddrUpdateState {
1324 fn new() -> Self {
1325 let (running, _) = sync::watch::channel(None);
1326 DirectAddrUpdateState {
1327 running,
1328 want_update: Default::default(),
1329 }
1330 }
1331
1332 fn schedule_run(&self, why: &'static str) {
1335 if self.is_running() {
1336 let _ = self.want_update.lock().insert(why);
1337 } else {
1338 self.run(why);
1339 }
1340 }
1341
1342 fn is_running(&self) -> bool {
1344 self.running.borrow().is_some()
1345 }
1346
1347 fn run(&self, why: &'static str) {
1349 self.running.send(Some(why)).ok();
1350 }
1351
1352 fn finish_run(&self) {
1354 self.running.send(None).ok();
1355 }
1356
1357 fn next_update(&self) -> Option<&'static str> {
1359 self.want_update.lock().take()
1360 }
1361}
1362
1363impl Handle {
1364 async fn new(opts: Options) -> Result<Self> {
1366 let me = opts.secret_key.public().fmt_short();
1367 if crate::util::relay_only_mode() {
1368 warn!(
1369 "creating a MagicSock that will only send packets over a relay relay connection."
1370 );
1371 }
1372
1373 Self::with_name(me, opts)
1374 .instrument(error_span!("magicsock"))
1375 .await
1376 }
1377
1378 async fn with_name(me: String, opts: Options) -> Result<Self> {
1379 let port_mapper = portmapper::Client::default();
1380
1381 let Options {
1382 addr_v4,
1383 addr_v6,
1384 secret_key,
1385 relay_map,
1386 node_map,
1387 discovery,
1388 dns_resolver,
1389 proxy_url,
1390 #[cfg(any(test, feature = "test-utils"))]
1391 insecure_skip_relay_cert_verify,
1392 } = opts;
1393
1394 let (relay_recv_sender, relay_recv_receiver) = mpsc::channel(128);
1395
1396 let (pconn4, pconn6) = bind(addr_v4, addr_v6)?;
1397 let port = pconn4.port();
1398
1399 match port.try_into() {
1401 Ok(non_zero_port) => {
1402 port_mapper.update_local_port(non_zero_port);
1403 }
1404 Err(_zero_port) => debug!("Skipping port mapping with zero local port"),
1405 }
1406 let ipv4_addr = pconn4.local_addr()?;
1407 let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok());
1408
1409 let net_checker = netcheck::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?;
1410
1411 let (actor_sender, actor_receiver) = mpsc::channel(256);
1412 let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256);
1413 let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256);
1414
1415 let node_map = node_map.unwrap_or_default();
1417 let node_map = NodeMap::load_from_vec(node_map);
1418
1419 let inner = Arc::new(MagicSock {
1420 me,
1421 port: AtomicU16::new(port),
1422 secret_key,
1423 proxy_url,
1424 local_addrs: std::sync::RwLock::new((ipv4_addr, ipv6_addr)),
1425 closing: AtomicBool::new(false),
1426 closed: AtomicBool::new(false),
1427 relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver),
1428 network_recv_wakers: parking_lot::Mutex::new(None),
1429 network_send_wakers: Arc::new(parking_lot::Mutex::new(None)),
1430 actor_sender: actor_sender.clone(),
1431 ipv6_reported: Arc::new(AtomicBool::new(false)),
1432 relay_map,
1433 my_relay: Default::default(),
1434 pconn4: pconn4.clone(),
1435 pconn6: pconn6.clone(),
1436 net_checker: net_checker.addr(),
1437 disco_secrets: DiscoSecrets::default(),
1438 node_map,
1439 relay_actor_sender: relay_actor_sender.clone(),
1440 udp_disco_sender,
1441 discovery,
1442 direct_addrs: Default::default(),
1443 pending_call_me_maybes: Default::default(),
1444 direct_addr_update_state: DirectAddrUpdateState::new(),
1445 dns_resolver,
1446 #[cfg(any(test, feature = "test-utils"))]
1447 insecure_skip_relay_cert_verify,
1448 });
1449
1450 let mut actor_tasks = JoinSet::default();
1451
1452 let relay_actor = RelayActor::new(inner.clone(), actor_sender.clone());
1453 let relay_actor_cancel_token = relay_actor.cancel_token();
1454 actor_tasks.spawn(
1455 async move {
1456 relay_actor.run(relay_actor_receiver).await;
1457 }
1458 .instrument(info_span!("relay-actor")),
1459 );
1460
1461 let inner2 = inner.clone();
1462 actor_tasks.spawn(async move {
1463 while let Some((dst, dst_key, msg)) = udp_disco_receiver.recv().await {
1464 if let Err(err) = inner2.send_disco_message_udp(dst, dst_key, &msg).await {
1465 warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)");
1466 }
1467 }
1468 });
1469
1470 let inner2 = inner.clone();
1471 let network_monitor = netmon::Monitor::new().await?;
1472 actor_tasks.spawn(
1473 async move {
1474 let actor = Actor {
1475 msg_receiver: actor_receiver,
1476 msg_sender: actor_sender,
1477 relay_actor_sender,
1478 relay_actor_cancel_token,
1479 msock: inner2,
1480 relay_recv_sender,
1481 periodic_re_stun_timer: new_re_stun_timer(false),
1482 net_info_last: None,
1483 port_mapper,
1484 pconn4,
1485 pconn6,
1486 no_v4_send: false,
1487 net_checker,
1488 network_monitor,
1489 };
1490
1491 if let Err(err) = actor.run().await {
1492 warn!("relay handler errored: {:?}", err);
1493 }
1494 }
1495 .instrument(info_span!("actor")),
1496 );
1497
1498 let c = Handle {
1499 msock: inner,
1500 actor_tasks: Arc::new(Mutex::new(actor_tasks)),
1501 };
1502
1503 Ok(c)
1504 }
1505
1506 #[instrument(skip_all, fields(me = %self.msock.me))]
1512 pub(crate) async fn close(&self) -> Result<()> {
1513 if self.msock.is_closed() {
1514 return Ok(());
1515 }
1516 self.msock.closing.store(true, Ordering::Relaxed);
1517 self.msock.actor_sender.send(ActorMessage::Shutdown).await?;
1518 self.msock.closed.store(true, Ordering::SeqCst);
1519 self.msock.direct_addrs.addrs.shutdown();
1520
1521 let mut tasks = self.actor_tasks.lock().await;
1522
1523 let tasks_ref = &mut tasks;
1525 let shutdown_done = time::timeout(Duration::from_millis(100), async move {
1526 while let Some(task) = tasks_ref.join_next().await {
1527 if let Err(err) = task {
1528 warn!("unexpected error in task shutdown: {:?}", err);
1529 }
1530 }
1531 })
1532 .await;
1533 if shutdown_done.is_ok() {
1534 debug!("tasks shutdown complete");
1535 } else {
1536 debug!("aborting remaining {}/3 tasks", tasks.len());
1538 tasks.shutdown().await;
1539 }
1540
1541 Ok(())
1542 }
1543}
1544
1545#[derive(Debug, Default)]
1546struct DiscoSecrets(parking_lot::Mutex<HashMap<PublicKey, SharedSecret>>);
1547
1548impl DiscoSecrets {
1549 fn get(
1550 &self,
1551 secret: &SecretKey,
1552 node_id: PublicKey,
1553 ) -> parking_lot::MappedMutexGuard<SharedSecret> {
1554 parking_lot::MutexGuard::map(self.0.lock(), |inner| {
1555 inner
1556 .entry(node_id)
1557 .or_insert_with(|| secret.shared(&node_id))
1558 })
1559 }
1560
1561 pub fn encode_and_seal(
1562 &self,
1563 secret_key: &SecretKey,
1564 node_id: PublicKey,
1565 msg: &disco::Message,
1566 ) -> Bytes {
1567 let mut seal = msg.as_bytes();
1568 self.get(secret_key, node_id).seal(&mut seal);
1569 disco::encode_message(&secret_key.public(), seal).into()
1570 }
1571
1572 pub fn unseal_and_decode(
1573 &self,
1574 secret: &SecretKey,
1575 node_id: PublicKey,
1576 mut sealed_box: Vec<u8>,
1577 ) -> Result<disco::Message, DiscoBoxError> {
1578 self.get(secret, node_id)
1579 .open(&mut sealed_box)
1580 .map_err(DiscoBoxError::Open)?;
1581 disco::Message::from_bytes(&sealed_box).map_err(DiscoBoxError::Parse)
1582 }
1583}
1584
/// Errors produced while unsealing and decoding a disco message.
#[derive(Debug, thiserror::Error)]
enum DiscoBoxError {
    /// The sealed box could not be opened with the shared secret.
    #[error("Failed to open crypto box")]
    Open(anyhow::Error),
    /// The opened bytes could not be parsed as a disco message.
    #[error("Failed to parse disco message")]
    Parse(anyhow::Error),
}
1592
/// One packet received via a relay: the sending node's key, the receive
/// metadata and the payload, or the I/O error hit while splitting the packet.
type RelayRecvResult = Result<(PublicKey, quinn_udp::RecvMeta, Bytes), io::Error>;
1594
1595impl AsyncUdpSocket for Handle {
1596 fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn quinn::UdpPoller>> {
1597 self.msock.create_io_poller()
1598 }
1599
1600 fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
1601 self.msock.try_send(transmit)
1602 }
1603
1604 fn poll_recv(
1606 &self,
1607 cx: &mut Context,
1608 bufs: &mut [io::IoSliceMut<'_>],
1609 metas: &mut [quinn_udp::RecvMeta],
1610 ) -> Poll<io::Result<usize>> {
1611 self.msock.poll_recv(cx, bufs, metas)
1612 }
1613
1614 fn local_addr(&self) -> io::Result<SocketAddr> {
1615 match &*self.msock.local_addrs.read().expect("not poisoned") {
1616 (ipv4, None) => {
1617 let ip: IpAddr = match ipv4.ip() {
1620 IpAddr::V4(ip) => ip.to_ipv6_mapped().into(),
1621 IpAddr::V6(ip) => ip.into(),
1622 };
1623 Ok(SocketAddr::new(ip, ipv4.port()))
1624 }
1625 (_, Some(ipv6)) => Ok(*ipv6),
1626 }
1627 }
1628
1629 fn max_transmit_segments(&self) -> usize {
1630 if let Some(pconn6) = self.pconn6.as_ref() {
1631 std::cmp::min(
1632 pconn6.max_transmit_segments(),
1633 self.pconn4.max_transmit_segments(),
1634 )
1635 } else {
1636 self.pconn4.max_transmit_segments()
1637 }
1638 }
1639
1640 fn max_receive_segments(&self) -> usize {
1641 if let Some(pconn6) = self.pconn6.as_ref() {
1642 std::cmp::max(
1649 pconn6.max_receive_segments(),
1650 self.pconn4.max_receive_segments(),
1651 )
1652 } else {
1653 self.pconn4.max_receive_segments()
1654 }
1655 }
1656
1657 fn may_fragment(&self) -> bool {
1658 if let Some(pconn6) = self.pconn6.as_ref() {
1659 pconn6.may_fragment() || self.pconn4.may_fragment()
1660 } else {
1661 self.pconn4.may_fragment()
1662 }
1663 }
1664}
1665
1666#[derive(Debug)]
1667struct IoPoller {
1668 ipv4_poller: Pin<Box<dyn quinn::UdpPoller>>,
1669 ipv6_poller: Option<Pin<Box<dyn quinn::UdpPoller>>>,
1670 relay_sender: mpsc::Sender<RelayActorMessage>,
1671 relay_send_waker: Arc<parking_lot::Mutex<Option<Waker>>>,
1672}
1673
1674impl quinn::UdpPoller for IoPoller {
1675 fn poll_writable(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
1676 let this = &mut *self;
1678 match this.ipv4_poller.as_mut().poll_writable(cx) {
1679 Poll::Ready(_) => return Poll::Ready(Ok(())),
1680 Poll::Pending => (),
1681 }
1682 if let Some(ref mut ipv6_poller) = this.ipv6_poller {
1683 match ipv6_poller.as_mut().poll_writable(cx) {
1684 Poll::Ready(_) => return Poll::Ready(Ok(())),
1685 Poll::Pending => (),
1686 }
1687 }
1688 match this.relay_sender.capacity() {
1689 0 => {
1690 self.relay_send_waker.lock().replace(cx.waker().clone());
1691 Poll::Pending
1692 }
1693 _ => Poll::Ready(Ok(())),
1694 }
1695 }
1696}
1697
/// Messages handled by the main actor.
#[derive(Debug)]
enum ActorMessage {
    /// Stops the actor: sockets are closed and the actor loop returns.
    Shutdown,
    /// Data read from a relay that still has to be split and dispatched.
    ReceiveRelay(RelayReadResult),
    /// A ping timed out; forwarded to the node map together with the id and
    /// transaction id carried here.
    EndpointPingExpired(usize, stun::TransactionId),
    /// Outcome of a netcheck run, plus the reason the run was started.
    NetcheckReport(Result<Option<Arc<netcheck::Report>>>, &'static str),
    /// Asks the network monitor to process a network change.
    NetworkChange,
    /// Test-only: runs the network change handling directly; the flag says
    /// whether the change is treated as major.
    #[cfg(test)]
    ForceNetworkChange(bool),
}
1708
/// The main actor: owns the sockets and helpers and reacts to messages,
/// timers and network events.
struct Actor {
    /// Shared socket state.
    msock: Arc<MagicSock>,
    /// Incoming actor messages.
    msg_receiver: mpsc::Receiver<ActorMessage>,
    /// Sender for the actor's own inbox, used to post netcheck results back.
    msg_sender: mpsc::Sender<ActorMessage>,
    /// Channel to the relay actor.
    relay_actor_sender: mpsc::Sender<RelayActorMessage>,
    /// Cancelled on shutdown to stop the relay actor.
    relay_actor_cancel_token: CancellationToken,
    /// Receives relay packets that were not handled as disco messages.
    relay_recv_sender: mpsc::Sender<RelayRecvResult>,
    /// Each tick triggers a periodic re-STUN; replaced when an update ends.
    periodic_re_stun_timer: time::Interval,
    /// Most recently stored network information, if any.
    net_info_last: Option<NetInfo>,

    /// The IPv4 UDP socket.
    pconn4: UdpConn,
    /// The IPv6 UDP socket, when binding it succeeded.
    pconn6: Option<UdpConn>,

    /// Port mapping client.
    port_mapper: portmapper::Client,

    /// Set to the negation of `ipv4_can_send` from the latest netcheck report.
    no_v4_send: bool,

    /// Netcheck client used to request reports.
    net_checker: netcheck::Client,

    /// Network monitor whose link change notifications the actor subscribes to.
    network_monitor: netmon::Monitor,
}
1739
1740impl Actor {
    /// Runs the actor loop until a shutdown message has been handled.
    ///
    /// Each iteration waits for whichever comes first: an actor message, the
    /// periodic re-STUN timer, a port mapping change, the direct address
    /// heartbeat, a direct address update request, a link change or a
    /// discovery event.
    ///
    /// # Errors
    ///
    /// Fails when subscribing to the network monitor fails. Once the loop is
    /// running, the only exit is `Ok(())` after shutdown.
    async fn run(mut self) -> Result<()> {
        // Link change notifications are bridged into a channel so they can be
        // awaited inside the select below.
        let (link_change_s, mut link_change_r) = mpsc::channel(8);
        // The token is held for the whole lifetime of the loop.
        let _token = self
            .network_monitor
            .subscribe(move |is_major| {
                let link_change_s = link_change_s.clone();
                async move {
                    link_change_s.send(is_major).await.ok();
                }
                .boxed()
            })
            .await?;

        // First heartbeat fires one full interval from now, not immediately.
        let mut direct_addr_heartbeat_timer = time::interval_at(
            time::Instant::now() + HEARTBEAT_INTERVAL,
            HEARTBEAT_INTERVAL,
        );
        let mut direct_addr_update_receiver =
            self.msock.direct_addr_update_state.running.subscribe();
        let mut portmap_watcher = self.port_mapper.watch_external_address();

        // Without a discovery service, or without a subscription from it,
        // this stays an empty stream.
        let mut discovery_events: BoxStream<DiscoveryItem> =
            Box::pin(futures_lite::stream::empty());
        if let Some(d) = self.msock.discovery() {
            if let Some(events) = d.subscribe() {
                discovery_events = events;
            }
        }
        loop {
            inc!(Metrics, actor_tick_main);
            tokio::select! {
                Some(msg) = self.msg_receiver.recv() => {
                    trace!(?msg, "tick: msg");
                    inc!(Metrics, actor_tick_msg);
                    // `true` means a shutdown was handled.
                    if self.handle_actor_message(msg).await {
                        return Ok(());
                    }
                }
                tick = self.periodic_re_stun_timer.tick() => {
                    trace!("tick: re_stun {:?}", tick);
                    inc!(Metrics, actor_tick_re_stun);
                    self.msock.re_stun("periodic");
                }
                Ok(()) = portmap_watcher.changed() => {
                    trace!("tick: portmap changed");
                    inc!(Metrics, actor_tick_portmap_changed);
                    let new_external_address = *portmap_watcher.borrow();
                    debug!("external address updated: {new_external_address:?}");
                    self.msock.re_stun("portmap_updated");
                },
                _ = direct_addr_heartbeat_timer.tick() => {
                    trace!(
                        "tick: direct addr heartbeat {} direct addrs",
                        self.msock.node_map.node_count(),
                    );
                    inc!(Metrics, actor_tick_direct_addr_heartbeat);
                    self.msock.node_map.prune_inactive();
                    let msgs = self.msock.node_map.nodes_stayin_alive();
                    self.handle_ping_actions(msgs).await;
                }
                _ = direct_addr_update_receiver.changed() => {
                    let reason = *direct_addr_update_receiver.borrow();
                    trace!("tick: direct addr update receiver {:?}", reason);
                    inc!(Metrics, actor_tick_direct_addr_update_receiver);
                    // `None` means an update just finished; nothing to start.
                    if let Some(reason) = reason {
                        self.refresh_direct_addrs(reason).await;
                    }
                }
                Some(is_major) = link_change_r.recv() => {
                    trace!("tick: link change {}", is_major);
                    inc!(Metrics, actor_link_change);
                    self.handle_network_change(is_major).await;
                }
                Some(discovery_item) = discovery_events.next() => {
                    trace!("tick: discovery event, address discovered: {discovery_item:?}");
                    let node_addr = NodeAddr {node_id: discovery_item.node_id, info: discovery_item.addr_info};
                    if let Err(e) = self.msock.add_node_addr(node_addr.clone(), Source::Discovery { name: discovery_item.provenance.into() }) {
                        warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}");
                    }
                }
                else => {
                    trace!("tick: other");
                    inc!(Metrics, actor_tick_other);
                }
            }
        }
    }
1832
1833 async fn handle_network_change(&mut self, is_major: bool) {
1834 debug!("link change detected: major? {}", is_major);
1835
1836 if is_major {
1837 self.msock.dns_resolver.clear_cache();
1838 self.msock.re_stun("link-change-major");
1839 self.close_stale_relay_connections().await;
1840 self.reset_endpoint_states();
1841 } else {
1842 self.msock.re_stun("link-change-minor");
1843 }
1844 }
1845
1846 #[instrument(skip_all)]
1847 async fn handle_ping_actions(&mut self, msgs: Vec<PingAction>) {
1848 if let Err(err) = self.msock.try_send_ping_actions(msgs) {
1853 warn!("Not all ping actions were sent: {err:#}");
1854 }
1855 }
1856
1857 async fn handle_actor_message(&mut self, msg: ActorMessage) -> bool {
1861 match msg {
1862 ActorMessage::Shutdown => {
1863 debug!("shutting down");
1864
1865 self.msock.node_map.notify_shutdown();
1866 self.port_mapper.deactivate();
1867 self.relay_actor_cancel_token.cancel();
1868
1869 debug!("stopping connections");
1872 if let Some(ref conn) = self.pconn6 {
1873 conn.close().await.ok();
1874 }
1875 self.pconn4.close().await.ok();
1876
1877 debug!("shutdown complete");
1878 return true;
1879 }
1880 ActorMessage::ReceiveRelay(read_result) => {
1881 let passthroughs = self.process_relay_read_result(read_result);
1882 for passthrough in passthroughs {
1883 self.relay_recv_sender
1884 .send(passthrough)
1885 .await
1886 .expect("missing recv sender");
1887 let mut wakers = self.msock.network_recv_wakers.lock();
1888 if let Some(waker) = wakers.take() {
1889 waker.wake();
1890 }
1891 }
1892 }
1893 ActorMessage::EndpointPingExpired(id, txid) => {
1894 self.msock.node_map.notify_ping_timeout(id, txid);
1895 }
1896 ActorMessage::NetcheckReport(report, why) => {
1897 match report {
1898 Ok(report) => {
1899 self.handle_netcheck_report(report).await;
1900 }
1901 Err(err) => {
1902 warn!("failed to generate netcheck report for: {}: {:?}", why, err);
1903 }
1904 }
1905 self.finalize_direct_addrs_update(why);
1906 }
1907 ActorMessage::NetworkChange => {
1908 self.network_monitor.network_change().await.ok();
1909 }
1910 #[cfg(test)]
1911 ActorMessage::ForceNetworkChange(is_major) => {
1912 self.handle_network_change(is_major).await;
1913 }
1914 }
1915
1916 false
1917 }
1918
    /// Returns the normalized local address as reported by the inner socket.
    ///
    /// Only called from non-Windows code in this file, hence the dead-code
    /// allowance on Windows.
    #[cfg_attr(windows, allow(dead_code))]
    fn normalized_local_addr(&self) -> io::Result<SocketAddr> {
        self.msock.normalized_local_addr()
    }
1923
1924 fn process_relay_read_result(&mut self, dm: RelayReadResult) -> Vec<RelayRecvResult> {
1925 trace!("process_relay_read {} bytes", dm.buf.len());
1926 if dm.buf.is_empty() {
1927 warn!("received empty relay packet");
1928 return Vec::new();
1929 }
1930 let url = &dm.url;
1931
1932 let quic_mapped_addr = self.msock.node_map.receive_relay(url, dm.src);
1933
1934 let parts = PacketSplitIter::new(dm.buf);
1938 #[cfg(not(windows))]
1940 let dst_ip = self.normalized_local_addr().ok().map(|addr| addr.ip());
1941 #[cfg(windows)]
1943 let dst_ip = None;
1944
1945 let mut out = Vec::new();
1946 for part in parts {
1947 match part {
1948 Ok(part) => {
1949 if self.handle_relay_disco_message(&part, url, dm.src) {
1950 continue;
1952 }
1953
1954 let meta = quinn_udp::RecvMeta {
1955 len: part.len(),
1956 stride: part.len(),
1957 addr: quic_mapped_addr.0,
1958 dst_ip,
1959 ecn: None,
1960 };
1961 out.push(Ok((dm.src, meta, part)));
1962 }
1963 Err(e) => {
1964 out.push(Err(e));
1965 }
1966 }
1967 }
1968
1969 out
1970 }
1971
    /// Starts a direct address update for the reason `why`.
    ///
    /// Bumps the update metric, asks the port mapper to procure a mapping and
    /// then kicks off a network information update.
    #[instrument(level = "debug", skip_all)]
    async fn refresh_direct_addrs(&mut self, why: &'static str) {
        inc!(MagicsockMetrics, update_direct_addrs);

        debug!("starting direct addr update ({})", why);
        self.port_mapper.procure_mapping();
        self.update_net_info(why).await;
    }
1987
    /// Collects the current direct addresses and stores them on the socket.
    ///
    /// Addresses come from, in this order: the port mapper, the netcheck
    /// report (when given) and the local sockets or interfaces. Each address
    /// keeps the type of the first source that contributed it. Local
    /// interface enumeration, storing the result and sending queued
    /// call-me-maybe messages all run in a spawned task, so this function
    /// returns before the addresses are stored.
    fn update_direct_addresses(&mut self, netcheck_report: Option<Arc<netcheck::Report>>) {
        let portmap_watcher = self.port_mapper.watch_external_address();

        // `entry(..).or_insert(..)` never overwrites, so earlier sources win
        // when the same address shows up more than once.
        let mut addrs: BTreeMap<SocketAddr, DirectAddrType> = BTreeMap::new();

        let maybe_port_mapped = *portmap_watcher.borrow();
        if let Some(portmap_ext) = maybe_port_mapped.map(SocketAddr::V4) {
            addrs
                .entry(portmap_ext)
                .or_insert(DirectAddrType::Portmapped);
            self.set_net_info_have_port_map();
        }

        if let Some(netcheck_report) = netcheck_report {
            if let Some(global_v4) = netcheck_report.global_v4 {
                addrs
                    .entry(global_v4.into())
                    .or_insert(DirectAddrType::Stun);

                // When the report says the mapping varies by destination IP,
                // also offer the global IPv4 address combined with our local
                // port (skipped for port 0).
                let port = self.msock.port.load(Ordering::Relaxed);
                if netcheck_report
                    .mapping_varies_by_dest_ip
                    .unwrap_or_default()
                    && port != 0
                {
                    let mut addr = global_v4;
                    addr.set_port(port);
                    addrs
                        .entry(addr.into())
                        .or_insert(DirectAddrType::Stun4LocalPort);
                }
            }
            if let Some(global_v6) = netcheck_report.global_v6 {
                addrs
                    .entry(global_v6.into())
                    .or_insert(DirectAddrType::Stun);
            }
        }

        let local_addr_v4 = self.pconn4.local_addr().ok();
        let local_addr_v6 = self.pconn6.as_ref().and_then(|c| c.local_addr().ok());

        // A socket whose address is unknown counts as "not unspecified".
        let is_unspecified_v4 = local_addr_v4
            .map(|a| a.ip().is_unspecified())
            .unwrap_or(false);
        let is_unspecified_v6 = local_addr_v6
            .map(|a| a.ip().is_unspecified())
            .unwrap_or(false);

        let msock = self.msock.clone();

        tokio::spawn(
            async move {
                if is_unspecified_v4 || is_unspecified_v6 {
                    // A socket bound to the unspecified address is expanded
                    // into one entry per local interface address of the same
                    // family. Enumeration runs on the blocking pool; the
                    // `unwrap` panics if that blocking task failed.
                    let LocalAddresses {
                        regular: mut ips,
                        loopback,
                    } = tokio::task::spawn_blocking(LocalAddresses::new)
                        .await
                        .unwrap();
                    // Loopback addresses are only used when nothing else was
                    // found at all.
                    if ips.is_empty() && addrs.is_empty() {
                        ips = loopback;
                    }
                    for ip in ips {
                        // Only families whose socket is bound to the
                        // unspecified address get expanded.
                        let port_if_unspecified = match ip {
                            IpAddr::V4(_) if is_unspecified_v4 => {
                                local_addr_v4.map(|addr| addr.port())
                            }
                            IpAddr::V6(_) if is_unspecified_v6 => {
                                local_addr_v6.map(|addr| addr.port())
                            }
                            _ => None,
                        };
                        if let Some(port) = port_if_unspecified {
                            let addr = SocketAddr::new(ip, port);
                            addrs.entry(addr).or_insert(DirectAddrType::Local);
                        }
                    }
                }

                // A socket bound to a specific address contributes exactly
                // that address.
                if !is_unspecified_v4 {
                    if let Some(addr) = local_addr_v4 {
                        addrs.entry(addr).or_insert(DirectAddrType::Local);
                    }
                }
                if !is_unspecified_v6 {
                    if let Some(addr) = local_addr_v6 {
                        addrs.entry(addr).or_insert(DirectAddrType::Local);
                    }
                }

                msock.store_direct_addresses(
                    addrs
                        .iter()
                        .map(|(addr, typ)| DirectAddr {
                            addr: *addr,
                            typ: *typ,
                        })
                        .collect(),
                );
                msock.send_queued_call_me_maybes();
            }
            .instrument(Span::current()),
        );
    }
2122
2123 fn finalize_direct_addrs_update(&mut self, why: &'static str) {
2125 let new_why = self.msock.direct_addr_update_state.next_update();
2126 if !self.msock.is_closed() {
2127 if let Some(new_why) = new_why {
2128 self.msock.direct_addr_update_state.run(new_why);
2129 return;
2130 }
2131 self.periodic_re_stun_timer = new_re_stun_timer(true);
2132 }
2133
2134 self.msock.direct_addr_update_state.finish_run();
2135 debug!("direct addr update done ({})", why);
2136 }
2137
2138 #[instrument(level = "debug", skip_all)]
2140 fn set_net_info_have_port_map(&mut self) {
2141 if let Some(ref mut net_info_last) = self.net_info_last {
2142 if net_info_last.have_port_map {
2143 return;
2145 }
2146 net_info_last.have_port_map = true;
2147 self.net_info_last = Some(net_info_last.clone());
2148 }
2149 }
2150
2151 #[instrument(level = "debug", skip_all)]
2152 async fn call_net_info_callback(&mut self, ni: NetInfo) {
2153 if let Some(ref net_info_last) = self.net_info_last {
2154 if ni.basically_equal(net_info_last) {
2155 return;
2156 }
2157 }
2158
2159 self.net_info_last = Some(ni);
2160 }
2161
2162 #[instrument(level = "debug", skip_all)]
2168 async fn update_net_info(&mut self, why: &'static str) {
2169 if self.msock.relay_map.is_empty() {
2170 debug!("skipping netcheck, empty RelayMap");
2171 self.msg_sender
2172 .send(ActorMessage::NetcheckReport(Ok(None), why))
2173 .await
2174 .ok();
2175 return;
2176 }
2177
2178 let relay_map = self.msock.relay_map.clone();
2179 let pconn4 = Some(self.pconn4.as_socket());
2180 let pconn6 = self.pconn6.as_ref().map(|p| p.as_socket());
2181
2182 debug!("requesting netcheck report");
2183 match self
2184 .net_checker
2185 .get_report_channel(relay_map, pconn4, pconn6)
2186 .await
2187 {
2188 Ok(rx) => {
2189 let msg_sender = self.msg_sender.clone();
2190 tokio::task::spawn(async move {
2191 let report = time::timeout(NETCHECK_REPORT_TIMEOUT, rx).await;
2192 let report: anyhow::Result<_> = match report {
2193 Ok(Ok(Ok(report))) => Ok(Some(report)),
2194 Ok(Ok(Err(err))) => Err(err),
2195 Ok(Err(_)) => Err(anyhow!("netcheck report not received")),
2196 Err(err) => Err(anyhow!("netcheck report timeout: {:?}", err)),
2197 };
2198 msg_sender
2199 .send(ActorMessage::NetcheckReport(report, why))
2200 .await
2201 .ok();
2202 });
2205 }
2206 Err(err) => {
2207 warn!("unable to start netcheck generation: {:?}", err);
2208 self.finalize_direct_addrs_update(why);
2209 }
2210 }
2211 }
2212
2213 async fn handle_netcheck_report(&mut self, report: Option<Arc<netcheck::Report>>) {
2214 if let Some(ref report) = report {
2215 self.msock
2216 .ipv6_reported
2217 .store(report.ipv6, Ordering::Relaxed);
2218 let r = &report;
2219 trace!(
2220 "setting no_v4_send {} -> {}",
2221 self.no_v4_send,
2222 !r.ipv4_can_send
2223 );
2224 self.no_v4_send = !r.ipv4_can_send;
2225
2226 let have_port_map = self.port_mapper.watch_external_address().borrow().is_some();
2227 let mut ni = NetInfo {
2228 relay_latency: Default::default(),
2229 mapping_varies_by_dest_ip: r.mapping_varies_by_dest_ip,
2230 hair_pinning: r.hair_pinning,
2231 portmap_probe: r.portmap_probe.clone(),
2232 have_port_map,
2233 working_ipv6: Some(r.ipv6),
2234 os_has_ipv6: Some(r.os_has_ipv6),
2235 working_udp: Some(r.udp),
2236 working_icmp_v4: r.icmpv4,
2237 working_icmp_v6: r.icmpv6,
2238 preferred_relay: r.preferred_relay.clone(),
2239 };
2240 for (rid, d) in r.relay_v4_latency.iter() {
2241 ni.relay_latency
2242 .insert(format!("{rid}-v4"), d.as_secs_f64());
2243 }
2244 for (rid, d) in r.relay_v6_latency.iter() {
2245 ni.relay_latency
2246 .insert(format!("{rid}-v6"), d.as_secs_f64());
2247 }
2248
2249 if ni.preferred_relay.is_none() {
2250 ni.preferred_relay = self.pick_relay_fallback();
2252 }
2253
2254 if !self.set_nearest_relay(ni.preferred_relay.clone()) {
2255 ni.preferred_relay = None;
2256 }
2257
2258 self.call_net_info_callback(ni).await;
2260 }
2261 self.update_direct_addresses(report);
2262 }
2263
2264 fn set_nearest_relay(&mut self, relay_url: Option<RelayUrl>) -> bool {
2265 let my_relay = self.msock.my_relay();
2266 if relay_url == my_relay {
2267 return true;
2269 }
2270 let old_relay = self.msock.set_my_relay(relay_url.clone());
2271
2272 if let Some(ref relay_url) = relay_url {
2273 inc!(MagicsockMetrics, relay_home_change);
2274
2275 info!("home is now relay {}, was {:?}", relay_url, old_relay);
2278 self.msock.publish_my_addr();
2279
2280 self.send_relay_actor(RelayActorMessage::SetHome {
2281 url: relay_url.clone(),
2282 });
2283 }
2284
2285 true
2286 }
2287
2288 fn pick_relay_fallback(&self) -> Option<RelayUrl> {
2294 let my_relay = self.msock.my_relay();
2305 if my_relay.is_some() {
2306 return my_relay;
2307 }
2308
2309 let ids = self.msock.relay_map.urls().collect::<Vec<_>>();
2310 let mut rng = rand::rngs::StdRng::seed_from_u64(0);
2311 ids.choose(&mut rng).map(|c| (*c).clone())
2312 }
2313
    /// Resets the state of all nodes in the node map.
    ///
    /// Called from this file after a major link change.
    #[instrument(skip_all, fields(me = %self.msock.me))]
    fn reset_endpoint_states(&mut self) {
        self.msock.node_map.reset_node_states()
    }
2320
2321 async fn close_stale_relay_connections(&self) {
2327 let ifs = interfaces::State::new().await;
2328 let local_ips = ifs
2329 .interfaces
2330 .values()
2331 .flat_map(|netif| netif.addrs())
2332 .map(|ipnet| ipnet.addr())
2333 .collect();
2334 self.send_relay_actor(RelayActorMessage::MaybeCloseRelaysOnRebind(local_ips));
2335 }
2336
2337 fn send_relay_actor(&self, msg: RelayActorMessage) {
2338 match self.relay_actor_sender.try_send(msg) {
2339 Ok(_) => {}
2340 Err(mpsc::error::TrySendError::Closed(_)) => {
2341 warn!("unable to send to relay actor, already closed");
2342 }
2343 Err(mpsc::error::TrySendError::Full(_)) => {
2344 warn!("dropping message for relay actor, channel is full");
2345 }
2346 }
2347 }
2348
2349 fn handle_relay_disco_message(
2350 &mut self,
2351 msg: &[u8],
2352 url: &RelayUrl,
2353 relay_node_src: PublicKey,
2354 ) -> bool {
2355 match disco::source_and_box(msg) {
2356 Some((source, sealed_box)) => {
2357 if relay_node_src != source {
2358 warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short());
2360 }
2361 self.msock.handle_disco_message(
2362 source,
2363 sealed_box,
2364 DiscoMessageSource::Relay {
2365 url: url.clone(),
2366 key: relay_node_src,
2367 },
2368 );
2369 true
2370 }
2371 None => false,
2372 }
2373 }
2374}
2375
2376fn new_re_stun_timer(initial_delay: bool) -> time::Interval {
2377 let mut rng = rand::thread_rng();
2380 let d: Duration = rng.gen_range(Duration::from_secs(20)..=Duration::from_secs(26));
2381 if initial_delay {
2382 debug!("scheduling periodic_stun to run in {}s", d.as_secs());
2383 time::interval_at(time::Instant::now() + d, d)
2384 } else {
2385 debug!(
2386 "scheduling periodic_stun to run immediately and in {}s",
2387 d.as_secs()
2388 );
2389 time::interval(d)
2390 }
2391}
2392
2393fn bind(
2395 addr_v4: Option<SocketAddrV4>,
2396 addr_v6: Option<SocketAddrV6>,
2397) -> Result<(UdpConn, Option<UdpConn>)> {
2398 let addr_v4 = addr_v4.unwrap_or_else(|| SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
2399 let pconn4 = UdpConn::bind(SocketAddr::V4(addr_v4)).context("bind IPv4 failed")?;
2400
2401 let ip4_port = pconn4.local_addr()?.port();
2402 let ip6_port = ip4_port.checked_add(1).unwrap_or(ip4_port - 1);
2403 let addr_v6 =
2404 addr_v6.unwrap_or_else(|| SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, ip6_port, 0, 0));
2405 let pconn6 = match UdpConn::bind(SocketAddr::V6(addr_v6)) {
2406 Ok(conn) => Some(conn),
2407 Err(err) => {
2408 info!("bind ignoring IPv6 bind failure: {:?}", err);
2409 None
2410 }
2411 };
2412
2413 Ok((pconn4, pconn6))
2414}
2415
2416#[derive(derive_more::Debug, Default, Clone)]
2422struct DiscoveredDirectAddrs {
2423 addrs: Watchable<BTreeSet<DirectAddr>>,
2425
2426 updated_at: Arc<RwLock<Option<Instant>>>,
2430}
2431
2432impl DiscoveredDirectAddrs {
2433 fn update(&self, addrs: BTreeSet<DirectAddr>) -> bool {
2435 *self.updated_at.write().unwrap() = Some(Instant::now());
2436 let updated = self.addrs.update(addrs).is_ok();
2437 if updated {
2438 event!(
2439 target: "events.net.direct_addrs",
2440 Level::DEBUG,
2441 addrs = ?self.addrs.get(),
2442 );
2443 }
2444 updated
2445 }
2446
2447 fn sockaddrs(&self) -> BTreeSet<SocketAddr> {
2448 self.addrs.read().iter().map(|da| da.addr).collect()
2449 }
2450
2451 fn fresh_enough(&self) -> Result<(), Duration> {
2460 match *self.updated_at.read().expect("poisoned") {
2461 None => Err(Duration::ZERO),
2462 Some(time) => {
2463 let elapsed = time.elapsed();
2464 if elapsed <= ENDPOINTS_FRESH_ENOUGH_DURATION {
2465 Ok(())
2466 } else {
2467 Err(elapsed)
2468 }
2469 }
2470 }
2471 }
2472
2473 fn to_call_me_maybe_message(&self) -> disco::CallMeMaybe {
2474 let my_numbers = self.addrs.read().iter().map(|da| da.addr).collect();
2475 disco::CallMeMaybe { my_numbers }
2476 }
2477
2478 fn updates_stream(&self) -> DirectAddrsStream {
2479 DirectAddrsStream {
2480 initial: Some(self.addrs.get()),
2481 inner: self.addrs.watch().into_stream(),
2482 }
2483 }
2484}
2485
2486#[derive(Debug)]
2488pub struct DirectAddrsStream {
2489 initial: Option<BTreeSet<DirectAddr>>,
2490 inner: watchable::WatcherStream<BTreeSet<DirectAddr>>,
2491}
2492
2493impl Stream for DirectAddrsStream {
2494 type Item = BTreeSet<DirectAddr>;
2495
2496 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2497 let this = &mut *self;
2498 if let Some(addrs) = this.initial.take() {
2499 if !addrs.is_empty() {
2500 return Poll::Ready(Some(addrs));
2501 }
2502 }
2503 loop {
2504 match Pin::new(&mut this.inner).poll_next(cx) {
2505 Poll::Pending => break Poll::Pending,
2506 Poll::Ready(Some(addrs)) => {
2507 if addrs.is_empty() {
2508 continue;
2516 } else {
2517 break Poll::Ready(Some(addrs));
2518 }
2519 }
2520 Poll::Ready(None) => break Poll::Ready(None),
2521 }
2522 }
2523 }
2524}
2525
2526fn split_packets(transmit: &quinn_udp::Transmit) -> RelayContents {
2536 let mut res = SmallVec::with_capacity(1);
2537 let contents = transmit.contents;
2538 if let Some(segment_size) = transmit.segment_size {
2539 for chunk in contents.chunks(segment_size) {
2540 res.push(Bytes::from(chunk.to_vec()));
2541 }
2542 } else {
2543 res.push(Bytes::from(contents.to_vec()));
2544 }
2545 res
2546}
2547
2548#[derive(Debug)]
2550struct PacketSplitIter {
2551 bytes: Bytes,
2552}
2553
2554impl PacketSplitIter {
2555 fn new(bytes: Bytes) -> Self {
2559 Self { bytes }
2560 }
2561
2562 fn fail(&mut self) -> Option<std::io::Result<Bytes>> {
2563 self.bytes.clear();
2564 Some(Err(std::io::Error::new(
2565 std::io::ErrorKind::UnexpectedEof,
2566 "",
2567 )))
2568 }
2569}
2570
2571impl Iterator for PacketSplitIter {
2572 type Item = std::io::Result<Bytes>;
2573
2574 fn next(&mut self) -> Option<Self::Item> {
2575 use bytes::Buf;
2576 if self.bytes.has_remaining() {
2577 if self.bytes.remaining() < 2 {
2578 return self.fail();
2579 }
2580 let len = self.bytes.get_u16_le() as usize;
2581 if self.bytes.remaining() < len {
2582 return self.fail();
2583 }
2584 let item = self.bytes.split_to(len);
2585 Some(Ok(item))
2586 } else {
2587 None
2588 }
2589 }
2590}
2591
/// A generated IPv6 socket address wrapped in its own type.
///
/// Every value produced by [`QuicMappedAddr::generate`] has the same 8-byte
/// network part and a unique counter in the lower 8 bytes.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub(crate) struct QuicMappedAddr(pub(crate) SocketAddr);

/// Source of the per-address counter; starts at 1.
static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1);

impl QuicMappedAddr {
    /// First byte of every generated address.
    const ADDR_PREFIXL: u8 = 0xfd;
    /// Bytes 1 to 5 of every generated address.
    const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11];
    /// Bytes 6 and 7 of every generated address.
    const ADDR_SUBNET: [u8; 2] = [0; 2];

    /// Generates a fresh address.
    ///
    /// The upper half is prefix, global id and subnet; the lower half is the
    /// next counter value in big-endian order. The port is always 12345.
    pub(crate) fn generate() -> Self {
        let counter = ADDR_COUNTER.fetch_add(1, Ordering::Relaxed);

        let mut octets = [0u8; 16];
        {
            let (network, host) = octets.split_at_mut(8);
            network[0] = Self::ADDR_PREFIXL;
            network[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID);
            network[6..8].copy_from_slice(&Self::ADDR_SUBNET);
            host.copy_from_slice(&counter.to_be_bytes());
        }

        Self(SocketAddr::new(IpAddr::V6(Ipv6Addr::from(octets)), 12345))
    }
}

impl std::fmt::Display for QuicMappedAddr {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self(addr) = self;
        write!(f, "QuicMappedAddr({})", addr)
    }
}
/// Bumps the sent-message metric that matches the kind of `msg`.
fn disco_message_sent(msg: &disco::Message) {
    match msg {
        disco::Message::Ping(_) => {
            inc!(MagicsockMetrics, sent_disco_ping);
        }
        disco::Message::Pong(_) => {
            inc!(MagicsockMetrics, sent_disco_pong);
        }
        disco::Message::CallMeMaybe(_) => {
            inc!(MagicsockMetrics, sent_disco_call_me_maybe);
        }
    }
}
2653
/// A direct address of this socket together with how it was learned.
///
/// Ordering compares `addr` first, then `typ`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct DirectAddr {
    /// The socket address.
    pub addr: SocketAddr,
    /// The kind of source the address came from.
    pub typ: DirectAddrType,
}
2666
/// How a [`DirectAddr`] was learned.
///
/// Variants order by declaration, `Unknown` being the smallest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum DirectAddrType {
    /// The source is not known.
    Unknown,
    /// An address of a local socket or interface.
    Local,
    /// An address reported by netcheck.
    Stun,
    /// An address obtained from the port mapper.
    Portmapped,
    /// The global IPv4 address reported by netcheck, combined with the local
    /// port.
    Stun4LocalPort,
}

impl Display for DirectAddrType {
    /// Writes the short lowercase label of the variant (`?` for `Unknown`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            DirectAddrType::Unknown => "?",
            DirectAddrType::Local => "local",
            DirectAddrType::Stun => "stun",
            DirectAddrType::Portmapped => "portmap",
            DirectAddrType::Stun4LocalPort => "stun4localport",
        };
        f.write_str(label)
    }
}
2707
2708#[derive(Debug, Clone, PartialEq)]
2710struct NetInfo {
2711 mapping_varies_by_dest_ip: Option<bool>,
2713
2714 hair_pinning: Option<bool>,
2716
2717 working_ipv6: Option<bool>,
2719
2720 os_has_ipv6: Option<bool>,
2722
2723 working_udp: Option<bool>,
2725
2726 working_icmp_v4: Option<bool>,
2728
2729 working_icmp_v6: Option<bool>,
2731
2732 have_port_map: bool,
2734
2735 portmap_probe: Option<portmapper::ProbeOutput>,
2737
2738 preferred_relay: Option<RelayUrl>,
2744
2745 relay_latency: BTreeMap<String, f64>,
2750}
2751
2752impl NetInfo {
2753 fn basically_equal(&self, other: &Self) -> bool {
2758 let eq_icmp_v4 = match (self.working_icmp_v4, other.working_icmp_v4) {
2759 (Some(slf), Some(other)) => slf == other,
2760 _ => true, };
2762 let eq_icmp_v6 = match (self.working_icmp_v6, other.working_icmp_v6) {
2763 (Some(slf), Some(other)) => slf == other,
2764 _ => true, };
2766 self.mapping_varies_by_dest_ip == other.mapping_varies_by_dest_ip
2767 && self.hair_pinning == other.hair_pinning
2768 && self.working_ipv6 == other.working_ipv6
2769 && self.os_has_ipv6 == other.os_has_ipv6
2770 && self.working_udp == other.working_udp
2771 && eq_icmp_v4
2772 && eq_icmp_v6
2773 && self.have_port_map == other.have_port_map
2774 && self.portmap_probe == other.portmap_probe
2775 && self.preferred_relay == other.preferred_relay
2776 }
2777}
2778
2779#[cfg(test)]
2780mod tests {
2781 use anyhow::Context;
2782 use iroh_test::CallOnDrop;
2783 use rand::RngCore;
2784 use tokio_util::task::AbortOnDropHandle;
2785
2786 use super::*;
2787 use crate::{defaults::staging::EU_RELAY_HOSTNAME, relay::RelayMode, tls, Endpoint};
2788
/// ALPN protocol identifier used by every endpoint created in these tests.
const ALPN: &[u8] = b"n0/test/1";
2790
2791 impl MagicSock {
2792 #[track_caller]
2793 pub fn add_test_addr(&self, node_addr: NodeAddr) {
2794 self.add_node_addr(
2795 node_addr,
2796 Source::NamedApp {
2797 name: "test".into(),
2798 },
2799 )
2800 .unwrap()
2801 }
2802 }
2803
2804 #[derive(Clone)]
2806 struct MagicStack {
2807 secret_key: SecretKey,
2808 endpoint: Endpoint,
2809 }
2810
2811 impl MagicStack {
2812 async fn new(relay_mode: RelayMode) -> Result<Self> {
2813 let secret_key = SecretKey::generate();
2814
2815 let mut transport_config = quinn::TransportConfig::default();
2816 transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
2817
2818 let endpoint = Endpoint::builder()
2819 .secret_key(secret_key.clone())
2820 .transport_config(transport_config)
2821 .relay_mode(relay_mode)
2822 .alpns(vec![ALPN.to_vec()])
2823 .bind()
2824 .await?;
2825
2826 Ok(Self {
2827 secret_key,
2828 endpoint,
2829 })
2830 }
2831
2832 fn tracked_endpoints(&self) -> Vec<PublicKey> {
2833 self.endpoint
2834 .magic_sock()
2835 .list_remote_infos()
2836 .into_iter()
2837 .map(|ep| ep.node_id)
2838 .collect()
2839 }
2840
2841 fn public(&self) -> PublicKey {
2842 self.secret_key.public()
2843 }
2844 }
2845
2846 #[instrument(skip_all)]
2855 async fn mesh_stacks(stacks: Vec<MagicStack>) -> Result<CallOnDrop> {
2856 fn update_direct_addrs(
2858 stacks: &[MagicStack],
2859 my_idx: usize,
2860 new_addrs: BTreeSet<DirectAddr>,
2861 ) {
2862 let me = &stacks[my_idx];
2863 for (i, m) in stacks.iter().enumerate() {
2864 if i == my_idx {
2865 continue;
2866 }
2867
2868 let addr = NodeAddr {
2869 node_id: me.public(),
2870 info: crate::AddrInfo {
2871 relay_url: None,
2872 direct_addresses: new_addrs.iter().map(|ep| ep.addr).collect(),
2873 },
2874 };
2875 m.endpoint.magic_sock().add_test_addr(addr);
2876 }
2877 }
2878
2879 let mut tasks = JoinSet::new();
2882 for (my_idx, m) in stacks.iter().enumerate() {
2883 let m = m.clone();
2884 let stacks = stacks.clone();
2885 tasks.spawn(async move {
2886 let me = m.endpoint.node_id().fmt_short();
2887 let mut stream = m.endpoint.direct_addresses();
2888 while let Some(new_eps) = stream.next().await {
2889 info!(%me, "conn{} endpoints update: {:?}", my_idx + 1, new_eps);
2890 update_direct_addrs(&stacks, my_idx, new_eps);
2891 }
2892 });
2893 }
2894 let guard = CallOnDrop::new(move || {
2895 tasks.abort_all();
2896 });
2897
2898 time::timeout(Duration::from_secs(10), async move {
2900 let all_node_ids: Vec<_> = stacks.iter().map(|ms| ms.endpoint.node_id()).collect();
2901 loop {
2902 let mut ready = Vec::with_capacity(stacks.len());
2903 for ms in stacks.iter() {
2904 let endpoints = ms.tracked_endpoints();
2905 let my_node_id = ms.endpoint.node_id();
2906 let all_nodes_meshed = all_node_ids
2907 .iter()
2908 .filter(|node_id| **node_id != my_node_id)
2909 .all(|node_id| endpoints.contains(node_id));
2910 ready.push(all_nodes_meshed);
2911 }
2912 if ready.iter().all(|meshed| *meshed) {
2913 break;
2914 }
2915 tokio::time::sleep(Duration::from_millis(200)).await;
2916 }
2917 })
2918 .await
2919 .context("failed to connect nodes")?;
2920 info!("all nodes meshed");
2921 Ok(guard)
2922 }
2923
2924 #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))]
2925 async fn echo_receiver(ep: MagicStack) -> Result<()> {
2926 info!("accepting conn");
2927 let conn = ep.endpoint.accept().await.expect("no conn");
2928
2929 info!("connecting");
2930 let conn = conn.await.context("[receiver] connecting")?;
2931 info!("accepting bi");
2932 let (mut send_bi, mut recv_bi) =
2933 conn.accept_bi().await.context("[receiver] accepting bi")?;
2934
2935 info!("reading");
2936 let val = recv_bi
2937 .read_to_end(usize::MAX)
2938 .await
2939 .context("[receiver] reading to end")?;
2940
2941 info!("replying");
2942 for chunk in val.chunks(12) {
2943 send_bi
2944 .write_all(chunk)
2945 .await
2946 .context("[receiver] sending chunk")?;
2947 }
2948
2949 info!("finishing");
2950 send_bi.finish().context("[receiver] finishing")?;
2951 send_bi.stopped().await.context("[receiver] stopped")?;
2952
2953 let stats = conn.stats();
2954 info!("stats: {:#?}", stats);
2955 assert!(
2957 stats.path.lost_packets < 10,
2958 "[receiver] should not loose many packets",
2959 );
2960
2961 info!("close");
2962 conn.close(0u32.into(), b"done");
2963 info!("wait idle");
2964 ep.endpoint.endpoint().wait_idle().await;
2965
2966 Ok(())
2967 }
2968
2969 #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))]
2970 async fn echo_sender(ep: MagicStack, dest_id: PublicKey, msg: &[u8]) -> Result<()> {
2971 info!("connecting to {}", dest_id.fmt_short());
2972 let dest = NodeAddr::new(dest_id);
2973 let conn = ep
2974 .endpoint
2975 .connect(dest, ALPN)
2976 .await
2977 .context("[sender] connect")?;
2978
2979 info!("opening bi");
2980 let (mut send_bi, mut recv_bi) = conn.open_bi().await.context("[sender] open bi")?;
2981
2982 info!("writing message");
2983 send_bi.write_all(msg).await.context("[sender] write all")?;
2984
2985 info!("finishing");
2986 send_bi.finish().context("[sender] finish")?;
2987 send_bi.stopped().await.context("[sender] stopped")?;
2988
2989 info!("reading_to_end");
2990 let val = recv_bi.read_to_end(usize::MAX).await.context("[sender]")?;
2991 assert_eq!(
2992 val,
2993 msg,
2994 "[sender] expected {}, got {}",
2995 hex::encode(msg),
2996 hex::encode(&val)
2997 );
2998
2999 let stats = conn.stats();
3000 info!("stats: {:#?}", stats);
3001 assert!(
3002 stats.path.lost_packets < 10,
3003 "[sender] should not loose many packets",
3004 );
3005
3006 info!("close");
3007 conn.close(0u32.into(), b"done");
3008 info!("wait idle");
3009 ep.endpoint.endpoint().wait_idle().await;
3010 Ok(())
3011 }
3012
3013 async fn run_roundtrip(sender: MagicStack, receiver: MagicStack, payload: &[u8]) {
3015 let send_node_id = sender.endpoint.node_id();
3016 let recv_node_id = receiver.endpoint.node_id();
3017 info!("\nroundtrip: {send_node_id:#} -> {recv_node_id:#}");
3018
3019 let receiver_task = tokio::spawn(echo_receiver(receiver));
3020 let sender_res = echo_sender(sender, recv_node_id, payload).await;
3021 let sender_is_err = match sender_res {
3022 Ok(()) => false,
3023 Err(err) => {
3024 eprintln!("[sender] Error:\n{err:#?}");
3025 true
3026 }
3027 };
3028 let receiver_is_err = match receiver_task.await {
3029 Ok(Ok(())) => false,
3030 Ok(Err(err)) => {
3031 eprintln!("[receiver] Error:\n{err:#?}");
3032 true
3033 }
3034 Err(joinerr) => {
3035 if joinerr.is_panic() {
3036 std::panic::resume_unwind(joinerr.into_panic());
3037 } else {
3038 eprintln!("[receiver] Error:\n{joinerr:#?}");
3039 }
3040 true
3041 }
3042 };
3043 if sender_is_err || receiver_is_err {
3044 panic!("Sender or receiver errored");
3045 }
3046 }
3047
3048 #[tokio::test(flavor = "multi_thread")]
3049 async fn test_two_devices_roundtrip_quinn_magic() -> Result<()> {
3050 iroh_test::logging::setup_multithreaded();
3051
3052 let m1 = MagicStack::new(RelayMode::Disabled).await?;
3053 let m2 = MagicStack::new(RelayMode::Disabled).await?;
3054
3055 let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3056
3057 for i in 0..5 {
3058 info!("\n-- round {i}");
3059 run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3060 run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3061
3062 info!("\n-- larger data");
3063 let mut data = vec![0u8; 10 * 1024];
3064 rand::thread_rng().fill_bytes(&mut data);
3065 run_roundtrip(m1.clone(), m2.clone(), &data).await;
3066 run_roundtrip(m2.clone(), m1.clone(), &data).await;
3067 }
3068
3069 Ok(())
3070 }
3071
3072 #[tokio::test(flavor = "multi_thread")]
3073 async fn test_two_devices_roundtrip_network_change() -> Result<()> {
3074 time::timeout(
3075 Duration::from_secs(90),
3076 test_two_devices_roundtrip_network_change_impl(),
3077 )
3078 .await?
3079 }
3080
3081 async fn test_two_devices_roundtrip_network_change_impl() -> Result<()> {
3084 iroh_test::logging::setup_multithreaded();
3085
3086 let m1 = MagicStack::new(RelayMode::Disabled).await?;
3087 let m2 = MagicStack::new(RelayMode::Disabled).await?;
3088
3089 let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3090
3091 let offset = || {
3092 let delay = rand::thread_rng().gen_range(10..=500);
3093 Duration::from_millis(delay)
3094 };
3095 let rounds = 5;
3096
3097 let m1_network_change_guard = {
3099 let m1 = m1.clone();
3100 let task = tokio::spawn(async move {
3101 loop {
3102 println!("[m1] network change");
3103 m1.endpoint.magic_sock().force_network_change(true).await;
3104 time::sleep(offset()).await;
3105 }
3106 });
3107 CallOnDrop::new(move || {
3108 task.abort();
3109 })
3110 };
3111
3112 for i in 0..rounds {
3113 println!("-- [m1 changes] round {}", i + 1);
3114 run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3115 run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3116
3117 println!("-- [m1 changes] larger data");
3118 let mut data = vec![0u8; 10 * 1024];
3119 rand::thread_rng().fill_bytes(&mut data);
3120 run_roundtrip(m1.clone(), m2.clone(), &data).await;
3121 run_roundtrip(m2.clone(), m1.clone(), &data).await;
3122 }
3123
3124 std::mem::drop(m1_network_change_guard);
3125
3126 let m2_network_change_guard = {
3128 let m2 = m2.clone();
3129 let task = tokio::spawn(async move {
3130 loop {
3131 println!("[m2] network change");
3132 m2.endpoint.magic_sock().force_network_change(true).await;
3133 time::sleep(offset()).await;
3134 }
3135 });
3136 CallOnDrop::new(move || {
3137 task.abort();
3138 })
3139 };
3140
3141 for i in 0..rounds {
3142 println!("-- [m2 changes] round {}", i + 1);
3143 run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3144 run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3145
3146 println!("-- [m2 changes] larger data");
3147 let mut data = vec![0u8; 10 * 1024];
3148 rand::thread_rng().fill_bytes(&mut data);
3149 run_roundtrip(m1.clone(), m2.clone(), &data).await;
3150 run_roundtrip(m2.clone(), m1.clone(), &data).await;
3151 }
3152
3153 std::mem::drop(m2_network_change_guard);
3154
3155 let m1_m2_network_change_guard = {
3157 let m1 = m1.clone();
3158 let m2 = m2.clone();
3159 let task = tokio::spawn(async move {
3160 println!("-- [m1] network change");
3161 m1.endpoint.magic_sock().force_network_change(true).await;
3162 println!("-- [m2] network change");
3163 m2.endpoint.magic_sock().force_network_change(true).await;
3164 time::sleep(offset()).await;
3165 });
3166 CallOnDrop::new(move || {
3167 task.abort();
3168 })
3169 };
3170
3171 for i in 0..rounds {
3172 println!("-- [m1 & m2 changes] round {}", i + 1);
3173 run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await;
3174 run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await;
3175
3176 println!("-- [m1 & m2 changes] larger data");
3177 let mut data = vec![0u8; 10 * 1024];
3178 rand::thread_rng().fill_bytes(&mut data);
3179 run_roundtrip(m1.clone(), m2.clone(), &data).await;
3180 run_roundtrip(m2.clone(), m1.clone(), &data).await;
3181 }
3182
3183 std::mem::drop(m1_m2_network_change_guard);
3184 Ok(())
3185 }
3186
3187 #[tokio::test(flavor = "multi_thread")]
3188 async fn test_two_devices_setup_teardown() -> Result<()> {
3189 iroh_test::logging::setup_multithreaded();
3190 for i in 0..10 {
3191 println!("-- round {i}");
3192 println!("setting up magic stack");
3193 let m1 = MagicStack::new(RelayMode::Disabled).await?;
3194 let m2 = MagicStack::new(RelayMode::Disabled).await?;
3195
3196 let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3197
3198 println!("closing endpoints");
3199 let msock1 = m1.endpoint.magic_sock();
3200 let msock2 = m2.endpoint.magic_sock();
3201 m1.endpoint.close(0u32.into(), b"done").await?;
3202 m2.endpoint.close(0u32.into(), b"done").await?;
3203
3204 assert!(msock1.msock.is_closed());
3205 assert!(msock2.msock.is_closed());
3206 }
3207 Ok(())
3208 }
3209
3210 #[tokio::test]
3211 async fn test_two_devices_roundtrip_quinn_raw() -> Result<()> {
3212 let _guard = iroh_test::logging::setup();
3213
3214 let make_conn = |addr: SocketAddr| -> anyhow::Result<quinn::Endpoint> {
3215 let key = SecretKey::generate();
3216 let conn = std::net::UdpSocket::bind(addr)?;
3217
3218 let quic_server_config = tls::make_server_config(&key, vec![ALPN.to_vec()], false)?;
3219 let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
3220 let mut transport_config = quinn::TransportConfig::default();
3221 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
3222 transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3223 server_config.transport_config(Arc::new(transport_config));
3224 let mut quic_ep = quinn::Endpoint::new(
3225 quinn::EndpointConfig::default(),
3226 Some(server_config),
3227 conn,
3228 Arc::new(quinn::TokioRuntime),
3229 )?;
3230
3231 let quic_client_config =
3232 tls::make_client_config(&key, None, vec![ALPN.to_vec()], false)?;
3233 let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
3234 let mut transport_config = quinn::TransportConfig::default();
3235 transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3236 client_config.transport_config(Arc::new(transport_config));
3237 quic_ep.set_default_client_config(client_config);
3238
3239 Ok(quic_ep)
3240 };
3241
3242 let m1 = make_conn("127.0.0.1:0".parse().unwrap())?;
3243 let m2 = make_conn("127.0.0.1:0".parse().unwrap())?;
3244
3245 macro_rules! roundtrip {
3247 ($a:expr, $b:expr, $msg:expr) => {
3248 let a = $a.clone();
3249 let b = $b.clone();
3250 let a_name = stringify!($a);
3251 let b_name = stringify!($b);
3252 println!("{} -> {} ({} bytes)", a_name, b_name, $msg.len());
3253
3254 let a_addr = a.local_addr()?;
3255 let b_addr = b.local_addr()?;
3256
3257 println!("{}: {}, {}: {}", a_name, a_addr, b_name, b_addr);
3258
3259 let b_task = tokio::task::spawn(async move {
3260 println!("[{b_name}] accepting conn");
3261 let conn = b.accept().await.expect("no conn");
3262 println!("[{}] connecting", b_name);
3263 let conn = conn
3264 .await
3265 .with_context(|| format!("[{b_name}] connecting"))?;
3266 println!("[{}] accepting bi", b_name);
3267 let (mut send_bi, mut recv_bi) = conn
3268 .accept_bi()
3269 .await
3270 .with_context(|| format!("[{b_name}] accepting bi"))?;
3271
3272 println!("[{b_name}] reading");
3273 let val = recv_bi
3274 .read_to_end(usize::MAX)
3275 .await
3276 .with_context(|| format!("[{b_name}] reading to end"))?;
3277 println!("[{b_name}] finishing");
3278 send_bi
3279 .finish()
3280 .with_context(|| format!("[{b_name}] finishing"))?;
3281 send_bi
3282 .stopped()
3283 .await
3284 .with_context(|| format!("[b_name] stopped"))?;
3285
3286 println!("[{b_name}] close");
3287 conn.close(0u32.into(), b"done");
3288 println!("[{b_name}] closed");
3289
3290 Ok::<_, anyhow::Error>(val)
3291 });
3292
3293 println!("[{a_name}] connecting to {b_addr}");
3294 let conn = a
3295 .connect(b_addr, "localhost")?
3296 .await
3297 .with_context(|| format!("[{a_name}] connect"))?;
3298
3299 println!("[{a_name}] opening bi");
3300 let (mut send_bi, mut recv_bi) = conn
3301 .open_bi()
3302 .await
3303 .with_context(|| format!("[{a_name}] open bi"))?;
3304 println!("[{a_name}] writing message");
3305 send_bi
3306 .write_all(&$msg[..])
3307 .await
3308 .with_context(|| format!("[{a_name}] write all"))?;
3309
3310 println!("[{a_name}] finishing");
3311 send_bi
3312 .finish()
3313 .with_context(|| format!("[{a_name}] finish"))?;
3314 send_bi
3315 .stopped()
3316 .await
3317 .with_context(|| format!("[{a_name}] stopped"))?;
3318
3319 println!("[{a_name}] reading_to_end");
3320 let _ = recv_bi
3321 .read_to_end(usize::MAX)
3322 .await
3323 .with_context(|| format!("[{a_name}] reading_to_end"))?;
3324 println!("[{a_name}] close");
3325 conn.close(0u32.into(), b"done");
3326 println!("[{a_name}] wait idle");
3327 a.wait_idle().await;
3328
3329 drop(send_bi);
3330
3331 println!("[{a_name}] waiting for channel");
3333 let val = b_task.await??;
3334 anyhow::ensure!(
3335 val == $msg,
3336 "expected {}, got {}",
3337 hex::encode($msg),
3338 hex::encode(val)
3339 );
3340 };
3341 }
3342
3343 for i in 0..10 {
3344 println!("-- round {}", i + 1);
3345 roundtrip!(m1, m2, b"hello m1");
3346 roundtrip!(m2, m1, b"hello m2");
3347
3348 println!("-- larger data");
3349
3350 let mut data = vec![0u8; 10 * 1024];
3351 rand::thread_rng().fill_bytes(&mut data);
3352 roundtrip!(m1, m2, data);
3353 roundtrip!(m2, m1, data);
3354 }
3355
3356 Ok(())
3357 }
3358
3359 #[tokio::test]
3360 async fn test_two_devices_roundtrip_quinn_rebinding_conn() -> Result<()> {
3361 let _guard = iroh_test::logging::setup();
3362
3363 fn make_conn(addr: SocketAddr) -> anyhow::Result<quinn::Endpoint> {
3364 let key = SecretKey::generate();
3365 let conn = UdpConn::bind(addr)?;
3366
3367 let quic_server_config = tls::make_server_config(&key, vec![ALPN.to_vec()], false)?;
3368 let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
3369 let mut transport_config = quinn::TransportConfig::default();
3370 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
3371 transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3372 server_config.transport_config(Arc::new(transport_config));
3373 let mut quic_ep = quinn::Endpoint::new_with_abstract_socket(
3374 quinn::EndpointConfig::default(),
3375 Some(server_config),
3376 Arc::new(conn),
3377 Arc::new(quinn::TokioRuntime),
3378 )?;
3379
3380 let quic_client_config =
3381 tls::make_client_config(&key, None, vec![ALPN.to_vec()], false)?;
3382 let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
3383 let mut transport_config = quinn::TransportConfig::default();
3384 transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
3385 client_config.transport_config(Arc::new(transport_config));
3386 quic_ep.set_default_client_config(client_config);
3387
3388 Ok(quic_ep)
3389 }
3390
3391 let m1 = make_conn("127.0.0.1:7770".parse().unwrap())?;
3392 let m2 = make_conn("127.0.0.1:7771".parse().unwrap())?;
3393
3394 macro_rules! roundtrip {
3396 ($a:expr, $b:expr, $msg:expr) => {
3397 let a = $a.clone();
3398 let b = $b.clone();
3399 let a_name = stringify!($a);
3400 let b_name = stringify!($b);
3401 println!("{} -> {} ({} bytes)", a_name, b_name, $msg.len());
3402
3403 let a_addr: SocketAddr = format!("127.0.0.1:{}", a.local_addr()?.port())
3404 .parse()
3405 .unwrap();
3406 let b_addr: SocketAddr = format!("127.0.0.1:{}", b.local_addr()?.port())
3407 .parse()
3408 .unwrap();
3409
3410 println!("{}: {}, {}: {}", a_name, a_addr, b_name, b_addr);
3411
3412 let b_task = tokio::task::spawn(async move {
3413 println!("[{}] accepting conn", b_name);
3414 let conn = b.accept().await.expect("no conn");
3415 println!("[{}] connecting", b_name);
3416 let conn = conn
3417 .await
3418 .with_context(|| format!("[{}] connecting", b_name))?;
3419 println!("[{}] accepting bi", b_name);
3420 let (mut send_bi, mut recv_bi) = conn
3421 .accept_bi()
3422 .await
3423 .with_context(|| format!("[{}] accepting bi", b_name))?;
3424
3425 println!("[{}] reading", b_name);
3426 let val = recv_bi
3427 .read_to_end(usize::MAX)
3428 .await
3429 .with_context(|| format!("[{}] reading to end", b_name))?;
3430 println!("[{}] finishing", b_name);
3431 send_bi
3432 .finish()
3433 .with_context(|| format!("[{}] finishing", b_name))?;
3434 send_bi
3435 .stopped()
3436 .await
3437 .with_context(|| format!("[{b_name}] stopped"))?;
3438
3439 println!("[{}] close", b_name);
3440 conn.close(0u32.into(), b"done");
3441 println!("[{}] closed", b_name);
3442
3443 Ok::<_, anyhow::Error>(val)
3444 });
3445
3446 println!("[{}] connecting to {}", a_name, b_addr);
3447 let conn = a
3448 .connect(b_addr, "localhost")?
3449 .await
3450 .with_context(|| format!("[{}] connect", a_name))?;
3451
3452 println!("[{}] opening bi", a_name);
3453 let (mut send_bi, mut recv_bi) = conn
3454 .open_bi()
3455 .await
3456 .with_context(|| format!("[{}] open bi", a_name))?;
3457 println!("[{}] writing message", a_name);
3458 send_bi
3459 .write_all(&$msg[..])
3460 .await
3461 .with_context(|| format!("[{}] write all", a_name))?;
3462
3463 println!("[{}] finishing", a_name);
3464 send_bi
3465 .finish()
3466 .with_context(|| format!("[{}] finish", a_name))?;
3467 send_bi
3468 .stopped()
3469 .await
3470 .with_context(|| format!("[{a_name}] stopped"))?;
3471
3472 println!("[{}] reading_to_end", a_name);
3473 let _ = recv_bi
3474 .read_to_end(usize::MAX)
3475 .await
3476 .with_context(|| format!("[{}]", a_name))?;
3477 println!("[{}] close", a_name);
3478 conn.close(0u32.into(), b"done");
3479 println!("[{}] wait idle", a_name);
3480 a.wait_idle().await;
3481
3482 drop(send_bi);
3483
3484 println!("[{}] waiting for channel", a_name);
3486 let val = b_task.await??;
3487 anyhow::ensure!(
3488 val == $msg,
3489 "expected {}, got {}",
3490 hex::encode($msg),
3491 hex::encode(val)
3492 );
3493 };
3494 }
3495
3496 for i in 0..10 {
3497 println!("-- round {}", i + 1);
3498 roundtrip!(m1, m2, b"hello m1");
3499 roundtrip!(m2, m1, b"hello m2");
3500
3501 println!("-- larger data");
3502
3503 let mut data = vec![0u8; 10 * 1024];
3504 rand::thread_rng().fill_bytes(&mut data);
3505 roundtrip!(m1, m2, data);
3506 roundtrip!(m2, m1, data);
3507 }
3508
3509 Ok(())
3510 }
3511
/// Exercises `split_packets` with and without a segment size, including a payload
/// that does not divide evenly and a segment size larger than the payload.
#[test]
fn test_split_packets() {
    /// Splits a transmit made of `contents` and `segment_size` and compares the
    /// result with `parts`.
    #[track_caller]
    fn assert_split(contents: &[u8], segment_size: Option<usize>, parts: &[&'static str]) {
        let transmit = quinn_udp::Transmit {
            destination: "127.0.0.1:0".parse().unwrap(),
            ecn: None,
            contents,
            segment_size,
            src_ip: None,
        };
        let expected: RelayContents = parts
            .iter()
            .map(|part| part.as_bytes().to_vec().into())
            .collect();
        assert_eq!(split_packets(&transmit), expected);
    }

    // no segment size: a single packet
    assert_split(b"hello", None, &["hello"]);
    // payload is an exact multiple of the segment size
    assert_split(b"helloworld", Some(5), &["hello", "world"]);
    // the last packet is shorter than the segment size
    assert_split(b"hello world", Some(5), &["hello", " worl", "d"]);
    // segment size larger than the payload: a single packet
    assert_split(b"hello world", Some(1000), &["hello world"]);
}
3551
3552 #[tokio::test]
3553 async fn test_local_endpoints() {
3554 let _guard = iroh_test::logging::setup();
3555 let ms = Handle::new(Default::default()).await.unwrap();
3556
3557 let eps0 = ms.direct_addresses().next().await.unwrap();
3559 println!("{eps0:?}");
3560 assert!(!eps0.is_empty());
3561
3562 let eps1 = ms.direct_addresses().next().await.unwrap();
3564 println!("{eps1:?}");
3565 assert_eq!(eps0, eps1);
3566 }
3567
3568 #[tokio::test]
3569 async fn test_watch_home_relay() {
3570 let ops = Options {
3572 relay_map: RelayMap::empty(),
3573 ..Default::default()
3574 };
3575 let msock = MagicSock::spawn(ops).await.unwrap();
3576 let mut relay_stream = msock.watch_home_relay();
3577
3578 assert_eq!(
3580 futures_lite::future::poll_once(relay_stream.next()).await,
3581 None
3582 );
3583
3584 let url: RelayUrl = format!("https://{}", EU_RELAY_HOSTNAME).parse().unwrap();
3585 msock.set_my_relay(Some(url.clone()));
3586
3587 assert_eq!(relay_stream.next().await, Some(url.clone()));
3588
3589 let mut relay_stream = msock.watch_home_relay();
3592 assert_eq!(
3593 futures_lite::future::poll_once(relay_stream.next()).await,
3594 Some(Some(url))
3595 );
3596 }
3597
3598 #[instrument(name = "ep", skip_all, fields(me = secret_key.public().fmt_short()))]
3605 async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result<(quinn::Endpoint, Handle)> {
3606 let opts = Options {
3607 addr_v4: None,
3608 addr_v6: None,
3609 secret_key: secret_key.clone(),
3610 relay_map: RelayMap::empty(),
3611 node_map: None,
3612 discovery: None,
3613 dns_resolver: crate::dns::default_resolver().clone(),
3614 proxy_url: None,
3615 insecure_skip_relay_cert_verify: true,
3616 };
3617 let msock = MagicSock::spawn(opts).await?;
3618 let server_config = crate::endpoint::make_server_config(
3619 &secret_key,
3620 vec![ALPN.to_vec()],
3621 Arc::new(quinn::TransportConfig::default()),
3622 true,
3623 )?;
3624 let mut endpoint_config = quinn::EndpointConfig::default();
3625 endpoint_config.grease_quic_bit(false);
3626 let endpoint = quinn::Endpoint::new_with_abstract_socket(
3627 endpoint_config,
3628 Some(server_config),
3629 Arc::new(msock.clone()),
3630 Arc::new(quinn::TokioRuntime),
3631 )?;
3632 Ok((endpoint, msock))
3633 }
3634
3635 #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))]
3639 async fn magicsock_connect(
3640 ep: &quinn::Endpoint,
3641 ep_secret_key: SecretKey,
3642 addr: QuicMappedAddr,
3643 node_id: NodeId,
3644 ) -> Result<quinn::Connection> {
3645 let mut transport_config = quinn::TransportConfig::default();
3647 transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
3648
3649 magicsock_connet_with_transport_config(
3650 ep,
3651 ep_secret_key,
3652 addr,
3653 node_id,
3654 Arc::new(transport_config),
3655 )
3656 .await
3657 }
3658
3659 #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))]
3665 async fn magicsock_connet_with_transport_config(
3666 ep: &quinn::Endpoint,
3667 ep_secret_key: SecretKey,
3668 addr: QuicMappedAddr,
3669 node_id: NodeId,
3670 transport_config: Arc<quinn::TransportConfig>,
3671 ) -> Result<quinn::Connection> {
3672 let alpns = vec![ALPN.to_vec()];
3673 let quic_client_config =
3674 tls::make_client_config(&ep_secret_key, Some(node_id), alpns, true)?;
3675 let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
3676 client_config.transport_config(transport_config);
3677 let connect = ep.connect_with(client_config, addr.0, "localhost")?;
3678 let connection = connect.await?;
3679 Ok(connection)
3680 }
3681
3682 #[tokio::test]
3683 async fn test_try_send_no_send_addr() {
3684 let _guard = iroh_test::logging::setup();
3687
3688 let secret_key_1 = SecretKey::from_bytes(&[1u8; 32]);
3689 let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]);
3690 let node_id_2 = secret_key_2.public();
3691 let secret_key_missing_node = SecretKey::from_bytes(&[255u8; 32]);
3692 let node_id_missing_node = secret_key_missing_node.public();
3693
3694 let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap();
3695
3696 let bad_addr = QuicMappedAddr::generate();
3698
3699 let res = tokio::time::timeout(
3704 Duration::from_millis(500),
3705 magicsock_connect(&ep_1, secret_key_1.clone(), bad_addr, node_id_missing_node),
3706 )
3707 .await;
3708 assert!(res.is_err(), "expecting timeout");
3709
3710 let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap();
3712
3713 let accept_task = tokio::spawn({
3715 async fn accept(ep: quinn::Endpoint) -> Result<()> {
3716 let incoming = ep.accept().await.ok_or(anyhow!("no incoming"))?;
3717 let _conn = incoming.accept()?.await?;
3718
3719 tokio::time::sleep(Duration::from_secs(10)).await;
3721 info!("accept finished");
3722 Ok(())
3723 }
3724 let ep_2 = ep_2.clone();
3725 async move {
3726 if let Err(err) = accept(ep_2).await {
3727 error!("{err:#}");
3728 }
3729 }
3730 .instrument(info_span!("ep2.accept, me = node_id_2.fmt_short()"))
3731 });
3732 let _accept_task = AbortOnDropHandle::new(accept_task);
3733
3734 let node_addr_2 = NodeAddr {
3735 node_id: node_id_2,
3736 info: AddrInfo {
3737 relay_url: None,
3738 direct_addresses: msock_2
3739 .direct_addresses()
3740 .next()
3741 .await
3742 .expect("no direct addrs")
3743 .into_iter()
3744 .map(|x| x.addr)
3745 .collect(),
3746 },
3747 };
3748 msock_1
3749 .add_node_addr(
3750 node_addr_2,
3751 Source::NamedApp {
3752 name: "test".into(),
3753 },
3754 )
3755 .unwrap();
3756 let addr = msock_1.get_mapping_addr(node_id_2).unwrap();
3757 let res = tokio::time::timeout(
3758 Duration::from_secs(10),
3759 magicsock_connect(&ep_1, secret_key_1.clone(), addr, node_id_2),
3760 )
3761 .await
3762 .expect("timeout while connecting");
3763
3764 res.unwrap();
3766
3767 }
3770
3771 #[tokio::test]
3772 async fn test_try_send_no_udp_addr_or_relay_url() {
3773 let _logging_guard = iroh_test::logging::setup();
3776
3777 let secret_key_1 = SecretKey::from_bytes(&[1u8; 32]);
3778 let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]);
3779 let node_id_2 = secret_key_2.public();
3780
3781 let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap();
3782 let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap();
3783
3784 let accept_task = tokio::spawn({
3786 async fn accept(ep: quinn::Endpoint) -> Result<()> {
3787 let incoming = ep.accept().await.ok_or(anyhow!("no incoming"))?;
3788 let conn = incoming.accept()?.await?;
3789 let mut stream = conn.accept_uni().await?;
3790 stream.read_to_end(1 << 16).await?;
3791 info!("accept finished");
3792 Ok(())
3793 }
3794 let ep_2 = ep_2.clone();
3795 async move {
3796 if let Err(err) = accept(ep_2).await {
3797 error!("{err:#}");
3798 }
3799 }
3800 .instrument(info_span!("ep2.accept", me = node_id_2.fmt_short()))
3801 });
3802 let _accept_task = AbortOnDropHandle::new(accept_task);
3803
3804 msock_1.node_map.add_node_addr(
3806 NodeAddr {
3807 node_id: node_id_2,
3808 info: AddrInfo::default(),
3809 },
3810 Source::NamedApp {
3811 name: "test".into(),
3812 },
3813 );
3814 let addr_2 = msock_1.get_mapping_addr(node_id_2).unwrap();
3815
3816 let mut transport_config = quinn::TransportConfig::default();
3827 transport_config.max_idle_timeout(Some(Duration::from_millis(200).try_into().unwrap()));
3828 let res = magicsock_connet_with_transport_config(
3829 &ep_1,
3830 secret_key_1.clone(),
3831 addr_2,
3832 node_id_2,
3833 Arc::new(transport_config),
3834 )
3835 .await;
3836 assert!(res.is_err(), "expected timeout");
3837 info!("first connect timed out as expected");
3838
3839 msock_1.node_map.add_node_addr(
3841 NodeAddr {
3842 node_id: node_id_2,
3843 info: AddrInfo {
3844 relay_url: None,
3845 direct_addresses: msock_2
3846 .direct_addresses()
3847 .next()
3848 .await
3849 .expect("no direct addrs")
3850 .into_iter()
3851 .map(|x| x.addr)
3852 .collect(),
3853 },
3854 },
3855 Source::NamedApp {
3856 name: "test".into(),
3857 },
3858 );
3859
3860 tokio::time::timeout(Duration::from_secs(10), async move {
3862 info!("establishing new connection");
3863 let conn = magicsock_connect(&ep_1, secret_key_1.clone(), addr_2, node_id_2)
3864 .await
3865 .unwrap();
3866 info!("have connection");
3867 let mut stream = conn.open_uni().await.unwrap();
3868 stream.write_all(b"hello").await.unwrap();
3869 stream.finish().unwrap();
3870 stream.stopped().await.unwrap();
3871 info!("finished stream");
3872 })
3873 .await
3874 .expect("connection timed out");
3875
3876 }
3879}