use std::{
future::Future,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
time::Duration,
};
use anyhow::{anyhow, bail, Context, Result};
use iroh_metrics::inc;
use netwatch::{interfaces, UdpSocket};
use rand::seq::IteratorRandom;
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
time::{self, Instant},
};
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument, Span};
use super::NetcheckMetrics;
use crate::{
defaults::DEFAULT_STUN_PORT,
dns::{DnsResolver, ResolverExt},
netcheck::{self, Report},
ping::{PingError, Pinger},
relay::{RelayMap, RelayNode, RelayUrl},
stun,
util::MaybeFuture,
};
mod hairpin;
mod probes;
use probes::{Probe, ProbePlan, ProbeProto};
use crate::defaults::timeouts::{
CAPTIVE_PORTAL_DELAY, CAPTIVE_PORTAL_TIMEOUT, DNS_TIMEOUT, OVERALL_REPORT_TIMEOUT,
PROBES_TIMEOUT,
};
/// Number of relay nodes with latency reports that counts as "enough" to stop probing early.
const ENOUGH_NODES: usize = 3;

/// Delays (in ms) of the staggered follow-up DNS lookups after the first one is started.
const DNS_STAGGERING_MS: &[u64] = &[200, 300];
/// Handle to a reportgen actor running in its own tokio task.
#[derive(Debug)]
pub(super) struct Client {
    // Aborts the spawned actor task when this Client is dropped.
    _drop_guard: AbortOnDropHandle<()>,
}
impl Client {
    /// Spawns the reportgen actor and returns a handle to it.
    ///
    /// The actor runs until it either delivers a finished report or aborts; dropping
    /// the returned `Client` aborts the actor task.
    pub(super) fn new(
        netcheck: netcheck::Addr,
        last_report: Option<Arc<Report>>,
        port_mapper: Option<portmapper::Client>,
        relay_map: RelayMap,
        stun_sock4: Option<Arc<UdpSocket>>,
        stun_sock6: Option<Arc<UdpSocket>>,
        dns_resolver: DnsResolver,
    ) -> Self {
        let (sender, receiver) = mpsc::channel(32);
        // The hairpin actor gets its own address so it can message us back.
        let hairpin_addr = Addr {
            sender: sender.clone(),
        };
        let mut actor = Actor {
            msg_tx: sender,
            msg_rx: receiver,
            netcheck: netcheck.clone(),
            last_report,
            port_mapper,
            relay_map,
            stun_sock4,
            stun_sock6,
            report: Report::default(),
            hairpin_actor: hairpin::Client::new(netcheck, hairpin_addr),
            outstanding_tasks: OutstandingTasks::default(),
            dns_resolver,
        };
        let actor_task = tokio::spawn(
            async move { actor.run().await }.instrument(info_span!("reportgen.actor")),
        );
        Self {
            _drop_guard: AbortOnDropHandle::new(actor_task),
        }
    }
}
/// Cheaply cloneable address of the reportgen actor: the sender side of its channel.
#[derive(Debug, Clone)]
pub(super) struct Addr {
    sender: mpsc::Sender<Message>,
}

impl Addr {
    /// Sends a message to the actor, waiting for channel capacity if needed.
    ///
    /// Fails only when the actor's receiver has been dropped.
    async fn send(&self, msg: Message) -> Result<(), mpsc::error::SendError<Message>> {
        trace!(
            "sending {:?} to channel with cap {}",
            msg,
            self.sender.capacity()
        );
        self.sender.send(msg).await
    }
}
/// Messages handled by the reportgen [`Actor`].
#[derive(Debug)]
enum Message {
    /// Outcome of the hairpin check run by the hairpin actor.
    HairpinResult(bool),
    /// Asks whether running this probe against this relay node would still improve the
    /// report; the boolean answer is sent back on the oneshot channel.
    ProbeWouldHelp(Probe, Arc<RelayNode>, oneshot::Sender<bool>),
    /// Requests that all still-running probes be aborted.
    AbortProbes,
}
/// The reportgen actor: builds a single [`Report`] and sends it to the netcheck actor.
#[derive(Debug)]
struct Actor {
    // Sender half of our own channel, used to hand out new `Addr`s.
    msg_tx: mpsc::Sender<Message>,
    // Receiver half of our own channel.
    msg_rx: mpsc::Receiver<Message>,
    // Address of the parent netcheck actor; receives the finished (or aborted) report.
    netcheck: super::Addr,
    // Previously generated report, if any; influences probe planning and timeouts.
    last_report: Option<Arc<Report>>,
    // Optional port mapper client driving the portmap probe.
    port_mapper: Option<portmapper::Client>,
    // The relay nodes to probe.
    relay_map: RelayMap,
    // IPv4 STUN socket, if available; without it StunIpv4 probes are skipped.
    stun_sock4: Option<Arc<UdpSocket>>,
    // IPv6 STUN socket, if available; without it StunIpv6 probes are skipped.
    stun_sock6: Option<Arc<UdpSocket>>,
    // The report being accumulated from probe results.
    report: Report,
    // Client to the hairpin-check actor.
    hairpin_actor: hairpin::Client,
    // Tracks which categories of work are still pending.
    outstanding_tasks: OutstandingTasks,
    // Resolver used for relay hostname lookups.
    dns_resolver: DnsResolver,
}
impl Actor {
    /// Returns a fresh [`Addr`] for this actor.
    fn addr(&self) -> Addr {
        Addr {
            sender: self.msg_tx.clone(),
        }
    }

    /// Runs the actor; on failure the abort is reported to the netcheck actor.
    async fn run(&mut self) {
        match self.run_inner().await {
            Ok(_) => debug!("reportgen actor finished"),
            Err(err) => {
                // Best-effort delivery: the netcheck actor may already be gone.
                self.netcheck
                    .send(netcheck::Message::ReportAborted { err })
                    .await
                    .ok();
            }
        }
    }

    /// The main actor loop.
    ///
    /// Spawns the sub-tasks (probe sets, portmapper probe, captive portal check),
    /// then loops handling their results and incoming [`Message`]s until no task is
    /// outstanding, finally delivering the finished report to the netcheck actor.
    async fn run_inner(&mut self) -> Result<()> {
        debug!(
            port_mapper = %self.port_mapper.is_some(),
            "reportstate actor starting",
        );

        self.report.os_has_ipv6 = super::os_has_ipv6();

        let mut port_mapping = self.prepare_portmapper_task();
        let mut captive_task = self.prepare_captive_portal_task();
        let mut probes = self.spawn_probes_task().await?;

        // Hard deadline for the whole report, plus a renewable deadline for probes.
        let total_timer = tokio::time::sleep(OVERALL_REPORT_TIMEOUT);
        tokio::pin!(total_timer);
        let probe_timer = tokio::time::sleep(PROBES_TIMEOUT);
        tokio::pin!(probe_timer);

        loop {
            trace!(awaiting = ?self.outstanding_tasks, "tick; awaiting tasks");
            if self.outstanding_tasks.all_done() {
                debug!("all tasks done");
                break;
            }
            // `biased` makes arm order the priority order: timeouts are always
            // serviced before task results and messages.
            tokio::select! {
                biased;
                _ = &mut total_timer => {
                    trace!("tick: total_timer expired");
                    bail!("report timed out");
                }

                _ = &mut probe_timer => {
                    warn!("tick: probes timed out");
                    // Reset the timer so this arm does not fire on every iteration.
                    probe_timer.as_mut().reset(Instant::now() + PROBES_TIMEOUT);
                    probes.abort_all();
                    self.handle_abort_probes();
                }

                // Drive the portmapper probe future, while one is still pending.
                pm = &mut port_mapping, if self.outstanding_tasks.port_mapper => {
                    debug!(report=?pm, "tick: portmapper probe report");
                    self.report.portmap_probe = pm;
                    port_mapping.inner = None;
                    self.outstanding_tasks.port_mapper = false;
                }

                // Collect finished probe sets.
                set_result = probes.join_next(), if self.outstanding_tasks.probes => {
                    trace!("tick: probes done: {:?}", set_result);
                    match set_result {
                        Some(Ok(Ok(report))) => self.handle_probe_report(report),
                        // A probe set failing entirely is not fatal for the report.
                        Some(Ok(Err(_))) => (),
                        Some(Err(e)) => {
                            warn!("probes task error: {:?}", e);
                        }
                        None => {
                            // All probe sets have finished.
                            self.handle_abort_probes();
                        }
                    }
                    trace!("tick: probes handled");
                }

                // Drive the captive portal check future, while one is still pending.
                found = &mut captive_task, if self.outstanding_tasks.captive_task => {
                    trace!("tick: captive portal task done");
                    self.report.captive_portal = found;
                    captive_task.inner = None;
                    self.outstanding_tasks.captive_task = false;
                }

                // Handle messages from the hairpin actor and from probes.
                msg = self.msg_rx.recv() => {
                    trace!("tick: msg recv: {:?}", msg);
                    match msg {
                        Some(msg) => self.handle_message(msg),
                        None => bail!("msg_rx closed, reportgen client must be dropped"),
                    }
                }
            }
        }

        // Outstanding tasks are done but probe sets may still be running; drop them.
        if !probes.is_empty() {
            debug!(
                "aborting {} probe sets, already have enough reports",
                probes.len()
            );
            drop(probes);
        }

        debug!("Sending report to netcheck actor");
        self.netcheck
            .send(netcheck::Message::ReportReady {
                report: Box::new(self.report.clone()),
            })
            .await?;

        Ok(())
    }

    /// Dispatches a single incoming [`Message`].
    fn handle_message(&mut self, msg: Message) {
        trace!(?msg, "handling message");
        match msg {
            Message::HairpinResult(works) => {
                self.report.hair_pinning = Some(works);
                self.outstanding_tasks.hairpin = false;
            }
            Message::ProbeWouldHelp(probe, relay_node, response_tx) => {
                let res = self.probe_would_help(probe, relay_node);
                if response_tx.send(res).is_err() {
                    debug!("probe dropped before ProbeWouldHelp response sent");
                }
            }
            Message::AbortProbes => {
                self.handle_abort_probes();
            }
        }
    }

    /// Merges a finished probe's results into the report and reacts to them:
    /// starts the hairpin check once a global IPv4 address is known, and schedules
    /// an `AbortProbes` once enough relay nodes have latency reports.
    fn handle_probe_report(&mut self, probe_report: ProbeReport) {
        debug!(?probe_report, "finished probe");
        update_report(&mut self.report, probe_report);

        // Once we know our global IPv4 address, kick off the hairpin check (once).
        if let Some(ref addr) = self.report.global_v4 {
            if !self.hairpin_actor.has_started() {
                self.hairpin_actor.start_check(*addr);
                self.outstanding_tasks.hairpin = true;
            }
        }

        let enough_relays = std::cmp::min(self.relay_map.len(), ENOUGH_NODES);
        if self.report.relay_latency.len() == enough_relays {
            // Wait a grace period (based on the slowest relay so far) before aborting
            // the remaining probes; doubled when there is no previous report.
            let timeout = self.report.relay_latency.max_latency();
            let timeout = match self.last_report.is_some() {
                true => timeout,
                false => timeout * 2,
            };
            let reportcheck = self.addr();
            debug!(
                reports=self.report.relay_latency.len(),
                delay=?timeout,
                "Have enough probe reports, aborting further probes soon",
            );
            tokio::spawn(
                async move {
                    time::sleep(timeout).await;
                    reportcheck
                        .send(Message::AbortProbes)
                        .await
                        .map_err(|err| trace!("Failed to abort all probes: {err:#}"))
                        .ok();
                }
                .instrument(Span::current()),
            );
        }
    }

    /// Returns whether running `probe` against `relay_node` could still add
    /// information to the report.
    fn probe_would_help(&mut self, probe: Probe, relay_node: Arc<RelayNode>) -> bool {
        // No latency for this node yet: any probe helps.
        if self.report.relay_latency.get(&relay_node.url).is_none() {
            return true;
        }
        // No IPv6 STUN result yet: IPv6 STUN probes help.
        if probe.proto() == ProbeProto::StunIpv6 && self.report.relay_v6_latency.is_empty() {
            return true;
        }
        // Not yet determined whether the NAT mapping varies by destination.
        if probe.proto() == ProbeProto::StunIpv4 && self.report.mapping_varies_by_dest_ip.is_none()
        {
            return true;
        }
        false
    }

    /// Stops waiting on probes; once UDP is known to work the portmapper and
    /// captive-portal results are no longer awaited either.
    fn handle_abort_probes(&mut self) {
        trace!("handle abort probes");
        self.outstanding_tasks.probes = false;
        if self.report.udp {
            self.outstanding_tasks.port_mapper = false;
            self.outstanding_tasks.captive_task = false;
        }
    }

    /// Creates the future running the portmapper probe, if a port mapper is configured.
    ///
    /// Errors from the probe are logged and turned into `None`.
    fn prepare_portmapper_task(
        &mut self,
    ) -> MaybeFuture<Pin<Box<impl Future<Output = Option<portmapper::ProbeOutput>>>>> {
        let mut port_mapping = MaybeFuture::default();
        if let Some(port_mapper) = self.port_mapper.clone() {
            port_mapping.inner = Some(Box::pin(async move {
                match port_mapper.probe().await {
                    Ok(Ok(res)) => Some(res),
                    Ok(Err(err)) => {
                        debug!("skipping port mapping: {err:?}");
                        None
                    }
                    Err(recv_err) => {
                        warn!("skipping port mapping: {recv_err:?}");
                        None
                    }
                }
            }));
            self.outstanding_tasks.port_mapper = true;
        }
        port_mapping
    }

    /// Creates the future running the captive portal check, if applicable.
    ///
    /// The check only runs for full (non-incremental) reports, after a short delay
    /// and bounded by `CAPTIVE_PORTAL_TIMEOUT`.
    fn prepare_captive_portal_task(
        &mut self,
    ) -> MaybeFuture<Pin<Box<impl Future<Output = Option<bool>>>>> {
        if self.last_report.is_none() {
            // NOTE(review): inside this branch `last_report` is known to be `None`, so
            // `preferred_relay` is always `None` here — confirm whether this was meant
            // to read the preferred relay from a different source.
            let preferred_relay = self
                .last_report
                .as_ref()
                .and_then(|l| l.preferred_relay.clone());
            let dm = self.relay_map.clone();
            self.outstanding_tasks.captive_task = true;
            MaybeFuture {
                inner: Some(Box::pin(async move {
                    // Wait a bit first; if STUN succeeds the check becomes unnecessary.
                    tokio::time::sleep(CAPTIVE_PORTAL_DELAY).await;
                    debug!("Captive portal check started after {CAPTIVE_PORTAL_DELAY:?}");
                    let captive_portal_check = tokio::time::timeout(
                        CAPTIVE_PORTAL_TIMEOUT,
                        check_captive_portal(&dm, preferred_relay)
                            .instrument(debug_span!("captive-portal")),
                    );
                    match captive_portal_check.await {
                        Ok(Ok(found)) => Some(found),
                        Ok(Err(err)) => {
                            // Connect errors are expected when the network is down;
                            // only log other errors loudly.
                            let err: Result<reqwest::Error, _> = err.downcast();
                            match err {
                                Ok(req_err) if req_err.is_connect() => {
                                    debug!("check_captive_portal failed: {req_err:#}");
                                }
                                Ok(req_err) => warn!("check_captive_portal error: {req_err:#}"),
                                Err(any_err) => warn!("check_captive_portal error: {any_err:#}"),
                            }
                            None
                        }
                        Err(_) => {
                            warn!("check_captive_portal timed out");
                            None
                        }
                    }
                })),
            }
        } else {
            self.outstanding_tasks.captive_task = false;
            MaybeFuture::default()
        }
    }

    /// Builds the probe plan and spawns all probe sets.
    ///
    /// Each probe set is one task in the returned [`JoinSet`]; within a set the first
    /// successful probe wins and the remaining probes of that set are abandoned.
    async fn spawn_probes_task(&mut self) -> Result<JoinSet<Result<ProbeReport>>> {
        let if_state = interfaces::State::new().await;
        debug!(%if_state, "Local interfaces");
        let plan = match self.last_report {
            Some(ref report) => ProbePlan::with_last_report(&self.relay_map, &if_state, report),
            None => ProbePlan::initial(&self.relay_map, &if_state),
        };
        trace!(%plan, "probe plan");

        let pinger = Pinger::new();

        let mut probes = JoinSet::default();
        for probe_set in plan.iter() {
            let mut set = JoinSet::default();
            for probe in probe_set {
                let reportstate = self.addr();
                let stun_sock4 = self.stun_sock4.clone();
                let stun_sock6 = self.stun_sock6.clone();
                let relay_node = probe.node().clone();
                let probe = probe.clone();
                let netcheck = self.netcheck.clone();
                let pinger = pinger.clone();
                let dns_resolver = self.dns_resolver.clone();
                set.spawn(
                    run_probe(
                        reportstate,
                        stun_sock4,
                        stun_sock6,
                        relay_node,
                        probe.clone(),
                        netcheck,
                        pinger,
                        dns_resolver,
                    )
                    .instrument(debug_span!("run_probe", %probe)),
                );
            }

            // One task per probe set: returns the first successful probe's report.
            probes.spawn(
                async move {
                    let mut probe_proto = None;
                    while let Some(res) = set.join_next().await {
                        match res {
                            Ok(Ok(report)) => return Ok(report),
                            Ok(Err(ProbeError::Error(err, probe))) => {
                                probe_proto = Some(probe.proto());
                                warn!(?probe, "probe failed: {:#}", err);
                                continue;
                            }
                            Ok(Err(ProbeError::AbortSet(err, probe))) => {
                                debug!(?probe, "probe set aborted: {:#}", err);
                                set.abort_all();
                                return Err(err);
                            }
                            Err(err) => {
                                warn!("fatal probe set error, aborting: {:#}", err);
                                continue;
                            }
                        }
                    }
                    warn!(?probe_proto, "no successful probes in ProbeSet");
                    Err(anyhow!("All probes in ProbeSet failed"))
                }
                .instrument(info_span!("probe")),
            );
        }
        self.outstanding_tasks.probes = true;

        Ok(probes)
    }
}
/// Tracks the work categories the actor still waits on before finishing the report.
#[derive(Debug, Default)]
struct OutstandingTasks {
    probes: bool,
    port_mapper: bool,
    captive_task: bool,
    hairpin: bool,
}

impl OutstandingTasks {
    /// Returns true once no category of work remains outstanding.
    fn all_done(&self) -> bool {
        !self.probes && !self.port_mapper && !self.captive_task && !self.hairpin
    }
}
/// The outcome of a single probe against one relay node.
#[derive(Debug, Clone)]
struct ProbeReport {
    /// Whether the probe sent data from the IPv4 socket.
    ipv4_can_send: bool,
    /// Whether the probe sent data from the IPv6 socket.
    ipv6_can_send: bool,
    /// Whether an ICMPv4 round trip completed (`None` if not attempted).
    icmpv4: Option<bool>,
    /// Whether an ICMPv6 round trip completed (`None` if not attempted).
    icmpv6: Option<bool>,
    /// The measured latency, if the probe got a response.
    latency: Option<Duration>,
    /// The probe that produced this report.
    probe: Probe,
    /// Our publicly visible address as seen by the relay, if discovered.
    addr: Option<SocketAddr>,
}

impl ProbeReport {
    /// Creates an empty report for `probe` with all results unset.
    fn new(probe: Probe) -> Self {
        Self {
            ipv4_can_send: false,
            ipv6_can_send: false,
            icmpv4: None,
            icmpv6: None,
            latency: None,
            probe,
            addr: None,
        }
    }
}
/// Failure of a single probe, distinguishing whether the whole probe set should stop.
#[derive(Debug)]
enum ProbeError {
    /// Abort the entire probe set this probe belongs to.
    AbortSet(anyhow::Error, Probe),
    /// Only this probe failed; the rest of the set may continue.
    Error(anyhow::Error, Probe),
}
/// Executes a single [`Probe`]: waits out its start delay, checks with the actor
/// whether it is still useful, resolves the relay address and runs the protocol-
/// specific probe (STUN, ICMP or HTTPS).
///
/// Returns `ProbeError::AbortSet` when the whole probe set should stop (actor gone,
/// probe no longer useful, unresolvable relay, missing STUN socket) and
/// `ProbeError::Error` for failures of just this probe.
#[allow(clippy::too_many_arguments)]
async fn run_probe(
    reportstate: Addr,
    stun_sock4: Option<Arc<UdpSocket>>,
    stun_sock6: Option<Arc<UdpSocket>>,
    relay_node: Arc<RelayNode>,
    probe: Probe,
    netcheck: netcheck::Addr,
    pinger: Pinger,
    dns_resolver: DnsResolver,
) -> Result<ProbeReport, ProbeError> {
    // Probes of a set are staggered; honour this probe's configured delay.
    if !probe.delay().is_zero() {
        trace!("delaying probe");
        tokio::time::sleep(probe.delay()).await;
    }
    debug!("starting probe");

    // Ask the actor whether this probe would still add information before doing work.
    let (would_help_tx, would_help_rx) = oneshot::channel();
    if let Err(err) = reportstate
        .send(Message::ProbeWouldHelp(
            probe.clone(),
            relay_node.clone(),
            would_help_tx,
        ))
        .await
    {
        debug!("Failed to check if probe would help: {err:#}");
        return Err(ProbeError::AbortSet(err.into(), probe.clone()));
    }
    if !would_help_rx.await.map_err(|_| {
        ProbeError::AbortSet(
            anyhow!("ReportCheck actor dropped sender while waiting for ProbeWouldHelp response"),
            probe.clone(),
        )
    })? {
        return Err(ProbeError::AbortSet(
            anyhow!("ReportCheck says probe set no longer useful"),
            probe,
        ));
    }

    // Resolve the relay node address for this probe's protocol (A vs AAAA).
    let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto())
        .await
        .context("no relay node addr")
        .map_err(|e| ProbeError::AbortSet(e, probe.clone()))?;

    let mut result = ProbeReport::new(probe.clone());
    match probe {
        Probe::StunIpv4 { .. } | Probe::StunIpv6 { .. } => {
            // Pick the socket matching the probe's address family.
            let maybe_sock = if matches!(probe, Probe::StunIpv4 { .. }) {
                stun_sock4.as_ref()
            } else {
                stun_sock6.as_ref()
            };
            match maybe_sock {
                Some(sock) => {
                    result = run_stun_probe(sock, relay_addr, netcheck, probe).await?;
                }
                None => {
                    // Without a socket the whole set for this family is pointless.
                    return Err(ProbeError::AbortSet(
                        anyhow!("No socket for {}, aborting probeset", probe.proto()),
                        probe.clone(),
                    ));
                }
            }
        }
        Probe::IcmpV4 { .. } | Probe::IcmpV6 { .. } => {
            result = run_icmp_probe(probe, relay_addr, pinger).await?
        }
        Probe::Https { ref node, .. } => {
            debug!("sending probe HTTPS");
            match measure_https_latency(node).await {
                Ok((latency, ip)) => {
                    result.latency = Some(latency);
                    match ip {
                        IpAddr::V4(_) => result.ipv4_can_send = true,
                        IpAddr::V6(_) => result.ipv6_can_send = true,
                    }
                }
                Err(err) => {
                    // HTTPS latency is optional; failure does not fail the probe.
                    warn!("https latency measurement failed: {:?}", err);
                }
            }
        }
    }

    trace!("probe successful");
    Ok(result)
}
/// Sends a STUN binding request to `relay_addr` and waits for the response.
///
/// Registers the transaction with the netcheck actor (which owns the receive path and
/// delivers the latency and observed address back via `stun_rx`), then sends the
/// request on `sock`.
async fn run_stun_probe(
    sock: &Arc<UdpSocket>,
    relay_addr: SocketAddr,
    netcheck: netcheck::Addr,
    probe: Probe,
) -> Result<ProbeReport, ProbeError> {
    // Sanity-check the address family matches the probe protocol.
    match probe.proto() {
        ProbeProto::StunIpv4 => debug_assert!(relay_addr.is_ipv4()),
        ProbeProto::StunIpv6 => debug_assert!(relay_addr.is_ipv6()),
        _ => debug_assert!(false, "wrong probe"),
    }
    let txid = stun::TransactionId::default();
    let req = stun::request(txid);

    // Register the in-flight transaction and wait until the actor acknowledges it,
    // so the response cannot race past an unregistered transaction id.
    let (stun_tx, stun_rx) = oneshot::channel();
    let (inflight_ready_tx, inflight_ready_rx) = oneshot::channel();
    netcheck
        .send(netcheck::Message::InFlightStun(
            netcheck::Inflight {
                txn: txid,
                start: Instant::now(),
                s: stun_tx,
            },
            inflight_ready_tx,
        ))
        .await
        .map_err(|e| ProbeError::Error(e.into(), probe.clone()))?;
    inflight_ready_rx
        .await
        .map_err(|e| ProbeError::Error(e.into(), probe.clone()))?;

    match sock.send_to(&req, relay_addr).await {
        Ok(n) if n == req.len() => {
            debug!(%relay_addr, %txid, "sending {} probe", probe.proto());
            let mut result = ProbeReport::new(probe.clone());

            if matches!(probe, Probe::StunIpv4 { .. }) {
                result.ipv4_can_send = true;
                inc!(NetcheckMetrics, stun_packets_sent_ipv4);
            } else {
                result.ipv6_can_send = true;
                inc!(NetcheckMetrics, stun_packets_sent_ipv6);
            }
            // Wait for the response delivered by the netcheck actor.
            let (delay, addr) = stun_rx
                .await
                .map_err(|e| ProbeError::Error(e.into(), probe.clone()))?;
            result.latency = Some(delay);
            result.addr = Some(addr);
            Ok(result)
        }
        Ok(n) => {
            // Partial UDP send: should not happen for a datagram socket.
            let err = anyhow!("Failed to send full STUN request: {}", probe.proto());
            error!(%relay_addr, sent_len=n, req_len=req.len(), "{err:#}");
            Err(ProbeError::Error(err, probe.clone()))
        }
        Err(err) => {
            let kind = err.kind();
            let err = anyhow::Error::new(err)
                .context(format!("Failed to send STUN request: {}", probe.proto()));

            // NOTE(review): the ErrorKind is matched via its Debug string, presumably
            // because `NetworkUnreachable`/`HostUnreachable` were unstable variants
            // when this was written — confirm and switch to direct matching once the
            // MSRV allows it. An unreachable network aborts the whole set.
            match format!("{kind:?}").as_str() {
                "NetworkUnreachable" | "HostUnreachable" => {
                    debug!(%relay_addr, "{err:#}");
                    Err(ProbeError::AbortSet(err, probe.clone()))
                }
                _ => {
                    // Other errors fail only this probe.
                    Err(ProbeError::Error(err, probe.clone()))
                }
            }
        }
    }
}
/// Reports whether the system appears to be behind a captive portal.
///
/// Sends an HTTP request (with redirects disabled) to a relay's `/generate_204`
/// endpoint; anything other than a 204 carrying the expected challenge response
/// header indicates a captive portal intercepting traffic.
///
/// Returns `Ok(false)` without any network activity when no relay node can serve
/// HTTP requests.
async fn check_captive_portal(dm: &RelayMap, preferred_relay: Option<RelayUrl>) -> Result<bool> {
    // Use the preferred relay only if it can serve HTTP, i.e. it is not STUN-only;
    // otherwise fall back to a random HTTP-capable node below.
    //
    // Fix: the guard was inverted (`node.stun_only`), which accepted exactly the
    // nodes that cannot answer HTTP requests and rejected every usable one.
    let preferred_relay = preferred_relay.and_then(|url| match dm.get_node(&url) {
        Some(node) if !node.stun_only => Some(url),
        _ => None,
    });
    let url = match preferred_relay {
        Some(url) => url,
        None => {
            // Pick a random relay node which can serve HTTP requests.
            let urls: Vec<_> = dm
                .nodes()
                .filter(|n| !n.stun_only)
                .map(|n| n.url.clone())
                .collect();
            if urls.is_empty() {
                debug!("No suitable relay node for captive portal check");
                return Ok(false);
            }
            let i = (0..urls.len())
                .choose(&mut rand::thread_rng())
                .unwrap_or_default();
            urls[i].clone()
        }
    };

    // Do not follow redirects: a captive portal typically answers with a redirect,
    // which is exactly the signal we want to observe.
    let client = reqwest::ClientBuilder::new()
        .redirect(reqwest::redirect::Policy::none())
        .build()?;

    // The relay echoes our challenge back in a response header; this lets us detect
    // portals that happen to return a 204 of their own.
    let host_name = url.host_str().unwrap_or_default();
    let challenge = format!("ts_{}", host_name);
    let portal_url = format!("http://{}/generate_204", host_name);
    let res = client
        .request(reqwest::Method::GET, portal_url)
        .header("X-Tailscale-Challenge", &challenge)
        .send()
        .await?;

    let expected_response = format!("response {challenge}");
    let is_valid_response = res
        .headers()
        .get("X-Tailscale-Response")
        .map(|s| s.to_str().unwrap_or_default())
        == Some(&expected_response);

    debug!(
        "check_captive_portal url={} status_code={} valid_response={}",
        res.url(),
        res.status(),
        is_valid_response,
    );
    let has_captive = res.status() != 204 || !is_valid_response;

    Ok(has_captive)
}
/// Resolves a socket address for `relay_node` suitable for `proto`.
///
/// IPv4 probes trigger an A lookup, IPv6 probes an AAAA lookup (both staggered per
/// [`DNS_STAGGERING_MS`]); literal IP hosts are used directly when their family
/// matches the probe, otherwise an error is returned. A STUN port of 0 means the
/// default STUN port.
async fn get_relay_addr(
    dns_resolver: &DnsResolver,
    relay_node: &RelayNode,
    proto: ProbeProto,
) -> Result<SocketAddr> {
    // Port 0 acts as "unset": fall back to the default STUN port.
    let port = if relay_node.stun_port == 0 {
        DEFAULT_STUN_PORT
    } else {
        relay_node.stun_port
    };

    // STUN-only relay nodes cannot serve ICMP/HTTPS probes.
    if relay_node.stun_only && !matches!(proto, ProbeProto::StunIpv4 | ProbeProto::StunIpv6) {
        bail!("Relay node not suitable for non-STUN probes");
    }

    match proto {
        ProbeProto::StunIpv4 | ProbeProto::IcmpV4 => match relay_node.url.host() {
            Some(url::Host::Domain(hostname)) => {
                debug!(?proto, %hostname, "Performing DNS A lookup for relay addr");
                match dns_resolver
                    .lookup_ipv4_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS)
                    .await
                {
                    // Take the first resolved address; `to_canonical` unmaps any
                    // IPv4-mapped IPv6 form.
                    Ok(mut addrs) => addrs
                        .next()
                        .map(|ip| ip.to_canonical())
                        .map(|addr| SocketAddr::new(addr, port))
                        .ok_or(anyhow!("No suitable relay addr found")),
                    Err(err) => Err(err.context("No suitable relay addr found")),
                }
            }
            Some(url::Host::Ipv4(addr)) => Ok(SocketAddr::new(addr.into(), port)),
            // An IPv6 literal cannot serve an IPv4 probe.
            Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")),
            None => Err(anyhow!("No valid hostname in RelayUrl")),
        },

        ProbeProto::StunIpv6 | ProbeProto::IcmpV6 => match relay_node.url.host() {
            Some(url::Host::Domain(hostname)) => {
                debug!(?proto, %hostname, "Performing DNS AAAA lookup for relay addr");
                match dns_resolver
                    .lookup_ipv6_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS)
                    .await
                {
                    Ok(mut addrs) => addrs
                        .next()
                        .map(|ip| ip.to_canonical())
                        .map(|addr| SocketAddr::new(addr, port))
                        .ok_or(anyhow!("No suitable relay addr found")),
                    Err(err) => Err(err.context("No suitable relay addr found")),
                }
            }
            // An IPv4 literal cannot serve an IPv6 probe.
            Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")),
            Some(url::Host::Ipv6(addr)) => Ok(SocketAddr::new(addr.into(), port)),
            None => Err(anyhow!("No valid hostname in RelayUrl")),
        },

        ProbeProto::Https => Err(anyhow!("Not implemented")),
    }
}
/// Runs an ICMP echo probe against `relay_addr` and reports the measured latency.
///
/// Returns `ProbeError::AbortSet` when no pinger client could be created (the whole
/// probe set is then pointless) and `ProbeError::Error` for an individual ping
/// failure.
async fn run_icmp_probe(
    probe: Probe,
    relay_addr: SocketAddr,
    pinger: Pinger,
) -> Result<ProbeReport, ProbeError> {
    // Sanity-check the address family matches the probe protocol.
    match probe.proto() {
        ProbeProto::IcmpV4 => debug_assert!(relay_addr.is_ipv4()),
        ProbeProto::IcmpV6 => debug_assert!(relay_addr.is_ipv6()),
        _ => debug_assert!(false, "wrong probe"),
    }
    const DATA: &[u8; 15] = b"iroh icmp probe";

    debug!(dst = %relay_addr, len = DATA.len(), "ICMP Ping started");
    let ping_result = pinger.send(relay_addr.ip(), DATA).await;
    let latency = ping_result.map_err(|err| match err {
        // Failing to even create the pinger client dooms the whole set.
        PingError::Client(err) => ProbeError::AbortSet(
            anyhow!("Failed to create pinger ({err:#}), aborting probeset"),
            probe.clone(),
        ),
        PingError::Ping(err) => ProbeError::Error(err.into(), probe.clone()),
    })?;
    debug!(dst = %relay_addr, len = DATA.len(), ?latency, "ICMP ping done");

    let mut report = ProbeReport::new(probe);
    report.latency = Some(latency);
    // Record which address family the successful ping used.
    if relay_addr.is_ipv4() {
        report.ipv4_can_send = true;
        report.icmpv4 = Some(true);
    } else {
        report.ipv6_can_send = true;
        report.icmpv6 = Some(true);
    }
    Ok(report)
}
#[allow(clippy::unused_async)]
async fn measure_https_latency(_node: &RelayNode) -> Result<(Duration, IpAddr)> {
bail!("not implemented");
}
/// Merges one finished [`ProbeReport`] into the aggregate [`Report`].
///
/// Latencies feed the per-relay latency maps; STUN results additionally establish
/// UDP reachability, our globally visible addresses, and whether the NAT mapping
/// varies by destination. The `can_send`/`icmp*` flags accumulate monotonically
/// (a later `false` never clears an earlier `true`).
fn update_report(report: &mut Report, probe_report: ProbeReport) {
    let relay_node = probe_report.probe.node();
    if let Some(latency) = probe_report.latency {
        report
            .relay_latency
            .update_relay(relay_node.url.clone(), latency);

        // Only STUN probes tell us about UDP and our mapped address.
        if matches!(
            probe_report.probe.proto(),
            ProbeProto::StunIpv4 | ProbeProto::StunIpv6
        ) {
            report.udp = true;

            match probe_report.addr {
                Some(SocketAddr::V4(ipp)) => {
                    report.ipv4 = true;
                    report
                        .relay_v4_latency
                        .update_relay(relay_node.url.clone(), latency);
                    // First observed address is recorded; subsequent differing
                    // addresses mean the NAT mapping varies by destination.
                    if report.global_v4.is_none() {
                        report.global_v4 = Some(ipp);
                    } else if report.global_v4 != Some(ipp) {
                        report.mapping_varies_by_dest_ip = Some(true);
                    } else if report.mapping_varies_by_dest_ip.is_none() {
                        report.mapping_varies_by_dest_ip = Some(false);
                    }
                }
                Some(SocketAddr::V6(ipp)) => {
                    report.ipv6 = true;
                    report
                        .relay_v6_latency
                        .update_relay(relay_node.url.clone(), latency);
                    if report.global_v6.is_none() {
                        report.global_v6 = Some(ipp);
                    } else if report.global_v6 != Some(ipp) {
                        report.mapping_varies_by_dest_ipv6 = Some(true);
                        warn!("IPv6 Address detected by STUN varies by destination");
                    } else if report.mapping_varies_by_dest_ipv6.is_none() {
                        report.mapping_varies_by_dest_ipv6 = Some(false);
                    }
                }
                None => {
                    // A STUN probe with a latency should always carry an address.
                    debug_assert!(probe_report.addr.is_some());
                }
            }
        }
    }
    report.ipv4_can_send |= probe_report.ipv4_can_send;
    report.ipv6_can_send |= probe_report.ipv6_can_send;
    // Merge ICMP results: once true, stays true; `None` never overwrites `Some`.
    report.icmpv4 = report
        .icmpv4
        .map(|val| val || probe_report.icmpv4.unwrap_or_default())
        .or(probe_report.icmpv4);
    report.icmpv6 = report
        .icmpv6
        .map(|val| val || probe_report.icmpv6.unwrap_or_default())
        .or(probe_report.icmpv6);
}
#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, Ipv6Addr};

    use super::*;
    use crate::defaults::staging::{default_eu_relay_node, default_na_relay_node};

    /// Verifies that STUN probe reports populate the latency maps, global address
    /// and `udp` flag, and that per-relay latencies do not clobber each other.
    #[test]
    fn test_update_report_stun_working() {
        let eu_relayer = Arc::new(default_eu_relay_node());
        let na_relayer = Arc::new(default_na_relay_node());

        let mut report = Report::default();

        // An IPv4 STUN result from the EU relay.
        let probe_report_eu = ProbeReport {
            ipv4_can_send: true,
            ipv6_can_send: false,
            icmpv4: None,
            icmpv6: None,
            latency: Some(Duration::from_millis(5)),
            probe: Probe::StunIpv4 {
                delay: Duration::ZERO,
                node: eu_relayer.clone(),
            },
            addr: Some((Ipv4Addr::new(203, 0, 113, 1), 1234).into()),
        };
        update_report(&mut report, probe_report_eu.clone());

        assert!(report.udp);
        assert_eq!(
            report.relay_latency.get(&eu_relayer.url).unwrap(),
            Duration::from_millis(5)
        );
        assert_eq!(
            report.relay_v4_latency.get(&eu_relayer.url).unwrap(),
            Duration::from_millis(5)
        );
        assert!(report.ipv4_can_send);
        assert!(!report.ipv6_can_send);

        // A slower result from the NA relay must not affect the EU entries.
        let probe_report_na = ProbeReport {
            latency: Some(Duration::from_millis(8)),
            probe: Probe::StunIpv4 {
                delay: Duration::ZERO,
                node: na_relayer.clone(),
            },
            ..probe_report_eu
        };
        update_report(&mut report, probe_report_na);

        assert!(report.udp);
        assert_eq!(
            report.relay_latency.get(&eu_relayer.url).unwrap(),
            Duration::from_millis(5)
        );
        assert_eq!(
            report.relay_v4_latency.get(&eu_relayer.url).unwrap(),
            Duration::from_millis(5)
        );
        assert!(report.ipv4_can_send);
        assert!(!report.ipv6_can_send);

        // A faster IPv6 result lowers the overall EU latency and sets the v6 map.
        let probe_report_eu_ipv6 = ProbeReport {
            ipv4_can_send: false,
            ipv6_can_send: true,
            icmpv4: None,
            icmpv6: None,
            latency: Some(Duration::from_millis(4)),
            probe: Probe::StunIpv6 {
                delay: Duration::ZERO,
                node: eu_relayer.clone(),
            },
            addr: Some((Ipv6Addr::new(2001, 0xdb8, 0, 0, 0, 0, 0, 1), 1234).into()),
        };
        update_report(&mut report, probe_report_eu_ipv6);

        assert!(report.udp);
        assert_eq!(
            report.relay_latency.get(&eu_relayer.url).unwrap(),
            Duration::from_millis(4)
        );
        assert_eq!(
            report.relay_v6_latency.get(&eu_relayer.url).unwrap(),
            Duration::from_millis(4)
        );
        assert!(report.ipv4_can_send);
        assert!(report.ipv6_can_send);
    }

    /// Verifies that ICMP results merge monotonically (`Some(true)` is never
    /// downgraded) and that ICMP alone does not set the `udp` flag.
    #[test]
    fn test_update_report_icmp() {
        let eu_relayer = Arc::new(default_eu_relay_node());
        let na_relayer = Arc::new(default_na_relay_node());

        let mut report = Report::default();

        let probe_report_eu = ProbeReport {
            ipv4_can_send: true,
            ipv6_can_send: false,
            icmpv4: Some(true),
            icmpv6: None,
            latency: Some(Duration::from_millis(5)),
            probe: Probe::IcmpV4 {
                delay: Duration::ZERO,
                node: eu_relayer.clone(),
            },
            addr: Some((Ipv4Addr::new(203, 0, 113, 1), 1234).into()),
        };
        update_report(&mut report, probe_report_eu.clone());

        assert!(!report.udp);
        assert!(report.ipv4_can_send);
        assert_eq!(report.icmpv4, Some(true));

        // A failed ICMP probe must not clear the earlier success.
        let probe_report_na = ProbeReport {
            ipv4_can_send: false,
            ipv6_can_send: false,
            icmpv4: Some(false),
            icmpv6: None,
            latency: None,
            probe: Probe::IcmpV4 {
                delay: Duration::ZERO,
                node: na_relayer.clone(),
            },
            addr: None,
        };
        update_report(&mut report, probe_report_na);
        assert_eq!(report.icmpv4, Some(true));

        // A later STUN result sets `udp` without touching the ICMP state.
        let probe_report_eu_stun = ProbeReport {
            ipv4_can_send: true,
            ipv6_can_send: false,
            icmpv4: None,
            icmpv6: None,
            latency: Some(Duration::from_millis(5)),
            probe: Probe::StunIpv4 {
                delay: Duration::ZERO,
                node: eu_relayer.clone(),
            },
            addr: Some((Ipv4Addr::new(203, 0, 113, 1), 1234).into()),
        };
        update_report(&mut report, probe_report_eu_stun);
        assert!(report.udp);
        assert_eq!(report.icmpv4, Some(true));
    }

    /// Runs staggered ICMP probes against the staging EU relay and expects at least
    /// one to succeed (or the set to abort, e.g. without ICMP permissions).
    // Fix: renamed from `test_icmpk_probe_eu_relayer` (typo).
    #[tokio::test]
    async fn test_icmp_probe_eu_relayer() {
        let _logging_guard = iroh_test::logging::setup();
        let pinger = Pinger::new();
        let relay = default_eu_relay_node();
        let resolver = crate::dns::default_resolver();
        let addr = get_relay_addr(resolver, &relay, ProbeProto::IcmpV4)
            .await
            .map_err(|err| format!("{err:#}"))
            .unwrap();
        let probe = Probe::IcmpV4 {
            delay: Duration::from_secs(0),
            node: Arc::new(relay),
        };

        // Stagger a few attempts so a single lost ping does not fail the test.
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let mut tasks = JoinSet::new();
        for i in 0..8 {
            let probe = probe.clone();
            let pinger = pinger.clone();
            let tx = tx.clone();
            tasks.spawn(async move {
                time::sleep(Duration::from_millis(i * 100)).await;
                let res = run_icmp_probe(probe, addr, pinger).await;
                tx.send(res).ok();
            });
        }
        // Fix: drop our own sender so the receive loop below terminates once every
        // spawned probe has reported in; previously the test hung forever when all
        // probes failed with `ProbeError::Error` because `tx` kept the channel open.
        drop(tx);

        let mut last_err = None;
        while let Some(res) = rx.recv().await {
            match res {
                Ok(report) => {
                    dbg!(&report);
                    assert_eq!(report.icmpv4, Some(true));
                    assert!(
                        report.latency.expect("should have a latency") > Duration::from_secs(0)
                    );
                    break;
                }
                Err(ProbeError::Error(err, _probe)) => {
                    last_err = Some(err);
                }
                Err(ProbeError::AbortSet(_err, _probe)) => {
                    // Expected e.g. when the environment lacks ICMP permissions.
                    break;
                }
            }
        }
        if let Some(err) = last_err {
            panic!("Ping error: {err:#}");
        }
    }
}