use std::{
collections::{BTreeMap, HashMap},
fmt::{self, Debug},
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
sync::Arc,
};
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use hickory_resolver::TokioAsyncResolver as DnsResolver;
use iroh_metrics::inc;
use netwatch::{IpFamily, UdpSocket};
use tokio::{
sync::{self, mpsc, oneshot},
time::{Duration, Instant},
};
use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
use tracing::{debug, error, info_span, trace, warn, Instrument};
use super::{relay::RelayMap, stun};
use crate::relay::RelayUrl;
mod metrics;
mod reportgen;
pub use metrics::Metrics;
use Metrics as NetcheckMetrics;
/// Maximum time between full reports: once more than this has elapsed since the
/// last full report, the next run check is turned into a full one.
const FULL_REPORT_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Fallback returned by `RelayLatencies::max_latency` when no latency is recorded.
const DEFAULT_MAX_LATENCY: Duration = Duration::from_millis(100);
/// Outcome of a single netcheck run: a snapshot of what was learned about the
/// network this node is attached to.
///
/// Reports are produced by the report generator (`reportgen`, not shown here).
/// In this file only `preferred_relay` is filled in before the report is stored.
/// The per-field notes below are inferred from field names and from how the
/// fields are used in this file; confirm details against `reportgen`.
#[derive(Default, Debug, PartialEq, Eq, Clone)]
pub struct Report {
/// Presumably: a UDP STUN round trip completed.
pub udp: bool,
/// Presumably: an IPv6 STUN round trip completed.
pub ipv6: bool,
/// Presumably: an IPv4 STUN round trip completed.
pub ipv4: bool,
/// Presumably: an IPv6 packet could be sent.
pub ipv6_can_send: bool,
/// Presumably: an IPv4 packet could be sent.
pub ipv4_can_send: bool,
/// Presumably: the OS supports IPv6 (compare `os_has_ipv6` in this module,
/// which tries to bind a local IPv6 UDP socket).
pub os_has_ipv6: bool,
/// Presumably: an ICMPv4 round trip completed; `None` likely means "not checked".
pub icmpv4: Option<bool>,
/// Presumably: an ICMPv6 round trip completed; `None` likely means "not checked".
pub icmpv6: Option<bool>,
/// Presumably: whether the observed IPv4 address mapping depends on which
/// server was contacted; `None` when unknown.
pub mapping_varies_by_dest_ip: Option<bool>,
/// Presumably: IPv6 counterpart of `mapping_varies_by_dest_ip`.
pub mapping_varies_by_dest_ipv6: Option<bool>,
/// Presumably: whether the NAT supports hair-pinning; `None` when unknown.
pub hair_pinning: Option<bool>,
/// Output of the port-mapping probe, if one was obtained.
pub portmap_probe: Option<portmapper::ProbeOutput>,
/// Relay chosen by `Actor::add_report_history_and_set_preferred_relay`;
/// `None` when no relay latency is available.
pub preferred_relay: Option<RelayUrl>,
/// Latency per relay; this is the map used to choose `preferred_relay`.
pub relay_latency: RelayLatencies,
/// Presumably: latency per relay measured over IPv4.
pub relay_v4_latency: RelayLatencies,
/// Presumably: latency per relay measured over IPv6.
pub relay_v6_latency: RelayLatencies,
/// Presumably: this node's globally visible IPv4 address and port.
pub global_v4: Option<SocketAddrV4>,
/// Presumably: this node's globally visible IPv6 address and port.
pub global_v6: Option<SocketAddrV6>,
/// Presumably: whether a captive portal was detected. A previous report with
/// `captive_portal == Some(true)` and `udp == false` forces the next report
/// to be a full one (see `Actor::handle_run_check`).
pub captive_portal: Option<bool>,
}
/// `Display` for [`Report`] reuses the derived `Debug` representation.
impl fmt::Display for Report {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Hand the caller's formatter straight to `Debug` so that formatting
        // flags such as the alternate (`#`) flag keep working.
        <Self as fmt::Debug>::fmt(self, f)
    }
}
/// Latencies per relay, keyed by relay URL.
///
/// For every relay only the lowest latency ever passed to `update_relay`
/// (directly or through `merge`) is kept.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct RelayLatencies(BTreeMap<RelayUrl, Duration>);

impl RelayLatencies {
    /// Creates an empty set of latencies.
    fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Records `latency` for `url`, keeping the smaller of the stored and the
    /// new value when the relay is already known.
    fn update_relay(&mut self, url: RelayUrl, latency: Duration) {
        self.0
            .entry(url)
            .and_modify(|best| {
                if latency < *best {
                    *best = latency;
                }
            })
            .or_insert(latency);
    }

    /// Folds every entry of `other` into `self`, keeping the minimum per relay.
    fn merge(&mut self, other: &RelayLatencies) {
        other
            .iter()
            .for_each(|(url, latency)| self.update_relay(url.clone(), latency));
    }

    /// Returns the largest recorded latency, or `DEFAULT_MAX_LATENCY` when
    /// nothing has been recorded.
    fn max_latency(&self) -> Duration {
        match self.0.values().max() {
            Some(slowest) => *slowest,
            None => DEFAULT_MAX_LATENCY,
        }
    }

    /// Iterates over `(relay URL, latency)` pairs in URL order.
    pub fn iter(&self) -> impl Iterator<Item = (&'_ RelayUrl, Duration)> + '_ {
        self.0.iter().map(|(url, latency)| (url, *latency))
    }

    /// Number of relays with a recorded latency.
    fn len(&self) -> usize {
        self.0.len()
    }

    /// Whether no latency has been recorded at all.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Latency recorded for `url`, if any.
    fn get(&self, url: &RelayUrl) -> Option<Duration> {
        self.0.get(url).copied()
    }
}
/// Handle used to request netcheck reports from the netcheck actor.
///
/// `Client::new` spawns the actor task and stores it in an `AbortOnDropHandle`,
/// so the task is presumably aborted once the last reference to that handle is
/// dropped.
#[derive(Debug)]
pub struct Client {
/// Address used to send messages to the actor.
addr: Addr,
/// Owns the spawned actor task; never read, held only for its drop behaviour.
_drop_guard: Arc<AbortOnDropHandle<()>>,
}
/// Report history kept by the actor between runs.
#[derive(Debug)]
struct Reports {
/// When set, the next run is a full report; cleared again once that run starts.
next_full: bool,
/// Stored reports keyed by the instant at which they were recorded. Entries
/// older than five minutes are pruned whenever a new report is added.
prev: HashMap<Instant, Arc<Report>>,
/// Most recently stored report; reset to `None` when a full report is started.
last: Option<Arc<Report>>,
/// Instant at which the last full report was started (initially the creation time).
last_full: Instant,
}
impl Default for Reports {
fn default() -> Self {
Self {
next_full: Default::default(),
prev: Default::default(),
last: Default::default(),
last_full: Instant::now(),
}
}
}
impl Client {
    /// Creates the netcheck actor, spawns it on the tokio runtime and returns a
    /// client connected to it.
    ///
    /// The actor task runs inside the `netcheck.actor` tracing span and is owned
    /// by the returned client through an `AbortOnDropHandle`.
    ///
    /// # Errors
    ///
    /// Returns an error if the actor cannot be constructed.
    pub fn new(port_mapper: Option<portmapper::Client>, dns_resolver: DnsResolver) -> Result<Self> {
        let mut actor = Actor::new(port_mapper, dns_resolver)?;
        let addr = actor.addr();
        let span = info_span!("netcheck.actor");
        let handle = tokio::spawn(async move { actor.run().await }.instrument(span));
        Ok(Self {
            addr,
            _drop_guard: Arc::new(AbortOnDropHandle::new(handle)),
        })
    }

    /// Returns a cloned address for sending messages to the actor.
    pub(crate) fn addr(&self) -> Addr {
        self.addr.clone()
    }

    /// Requests a report and waits for it.
    ///
    /// `dm` is the relay map to probe. `stun_conn4` / `stun_conn6` are optional
    /// sockets to use for STUN; when one is `None` the actor tries to bind a
    /// local socket of that family itself.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be delivered to the actor, if the actor drops
    /// the response channel, or if the report run itself reports an error.
    pub async fn get_report(
        &mut self,
        dm: RelayMap,
        stun_conn4: Option<Arc<UdpSocket>>,
        stun_conn6: Option<Arc<UdpSocket>>,
    ) -> Result<Arc<Report>> {
        let response = self.get_report_channel(dm, stun_conn4, stun_conn6).await?;
        response
            .await
            .unwrap_or_else(|_| Err(anyhow!("channel closed, actor awol")))
    }

    /// Requests a report and returns the channel on which the result will be
    /// delivered, without waiting for the report itself.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be delivered to the actor.
    pub async fn get_report_channel(
        &mut self,
        dm: RelayMap,
        stun_conn4: Option<Arc<UdpSocket>>,
        stun_conn6: Option<Arc<UdpSocket>>,
    ) -> Result<oneshot::Receiver<Result<Arc<Report>>>> {
        let (response_tx, response_rx) = oneshot::channel();
        let request = Message::RunCheck {
            relay_map: dm,
            stun_sock_v4: stun_conn4,
            stun_sock_v6: stun_conn6,
            response_tx,
        };
        self.addr.send(request).await?;
        Ok(response_rx)
    }
}
/// A STUN request that has been sent and is awaiting its response.
#[derive(Debug)]
pub(crate) struct Inflight {
/// Transaction id of the request; used as the lookup key for incoming packets.
txn: stun::TransactionId,
/// Start instant from which the elapsed time is computed when the response arrives.
start: Instant,
/// Receives the elapsed time and the socket address extracted for the matching packet.
s: sync::oneshot::Sender<(Duration, SocketAddr)>,
}
/// Messages processed by the netcheck actor.
#[derive(Debug)]
pub(crate) enum Message {
/// Request to run a check and produce a report.
RunCheck {
/// Relay map handed to the report generator.
relay_map: RelayMap,
/// Socket to use for IPv4 STUN; when `None` the actor tries to bind one itself.
stun_sock_v4: Option<Arc<UdpSocket>>,
/// Socket to use for IPv6 STUN; when `None` the actor tries to bind one itself.
stun_sock_v6: Option<Arc<UdpSocket>>,
/// Channel on which the finished report, or the error, is returned.
response_tx: oneshot::Sender<Result<Arc<Report>>>,
},
/// A report run finished; presumably sent by the report generator.
ReportReady { report: Box<Report> },
/// A report run failed; presumably sent by the report generator.
ReportAborted { err: anyhow::Error },
/// A packet received on a STUN socket, together with its source address.
StunPacket {
/// Raw packet bytes.
payload: Bytes,
/// Address the packet was received from.
from_addr: SocketAddr,
},
/// Registers an in-flight STUN request. The sender is signalled once the
/// request has been stored by the actor.
InFlightStun(Inflight, oneshot::Sender<()>),
}
#[derive(Debug, Clone)]
pub(crate) struct Addr {
sender: mpsc::Sender<Message>,
}
impl Addr {
pub fn receive_stun_packet(&self, payload: Bytes, src: SocketAddr) {
if let Err(mpsc::error::TrySendError::Full(_)) = self.sender.try_send(Message::StunPacket {
payload,
from_addr: src,
}) {
inc!(NetcheckMetrics, stun_packets_dropped);
warn!("dropping stun packet from {}", src);
}
}
async fn send(&self, msg: Message) -> Result<(), mpsc::error::SendError<Message>> {
self.sender.send(msg).await.inspect_err(|_| {
error!("netcheck actor lost");
})
}
}
/// The netcheck actor: owns the report history and coordinates report runs.
#[derive(Debug)]
struct Actor {
/// Inbox from which `run` reads messages.
receiver: mpsc::Receiver<Message>,
/// Sending half of the inbox, cloned to create `Addr` handles.
sender: mpsc::Sender<Message>,
/// History of previous reports.
reports: Reports,
/// Optional port mapper client, passed on to the report generator.
port_mapper: Option<portmapper::Client>,
/// STUN requests awaiting a response, keyed by transaction id.
in_flight_stun_requests: HashMap<stun::TransactionId, Inflight>,
/// The report run currently in progress, if any; at most one runs at a time.
current_report_run: Option<ReportRun>,
/// DNS resolver, passed on to the report generator.
dns_resolver: DnsResolver,
}
impl Actor {
fn new(port_mapper: Option<portmapper::Client>, dns_resolver: DnsResolver) -> Result<Self> {
let (sender, receiver) = mpsc::channel(32);
Ok(Self {
receiver,
sender,
reports: Default::default(),
port_mapper,
in_flight_stun_requests: Default::default(),
current_report_run: None,
dns_resolver,
})
}
/// Returns a new address through which messages can be sent to this actor.
fn addr(&self) -> Addr {
    let sender = self.sender.clone();
    Addr { sender }
}
/// Runs the actor's main loop, dispatching each incoming message to its handler.
///
/// Returns once the inbox yields `None`, i.e. the channel has been closed.
async fn run(&mut self) {
    debug!("netcheck actor starting");
    loop {
        let Some(msg) = self.receiver.recv().await else {
            break;
        };
        trace!(?msg, "handling message");
        match msg {
            Message::RunCheck {
                relay_map,
                stun_sock_v4,
                stun_sock_v6,
                response_tx,
            } => self.handle_run_check(relay_map, stun_sock_v4, stun_sock_v6, response_tx),
            Message::ReportReady { report } => self.handle_report_ready(report),
            Message::ReportAborted { err } => self.handle_report_aborted(err),
            Message::StunPacket { payload, from_addr } => {
                self.handle_stun_packet(&payload, from_addr)
            }
            Message::InFlightStun(inflight, response_tx) => {
                self.handle_in_flight_stun(inflight, response_tx)
            }
        }
    }
}
/// Starts a new report run, unless one is already in progress.
///
/// If a run is active, an error is sent on `response_tx` and nothing else
/// happens. Otherwise:
///
/// * For each address family without a caller-provided socket, a local STUN
///   socket is bound (best effort). Its listener is tied to a cancellation
///   token whose drop guard is stored with the run.
/// * The run becomes a *full* report when one was requested via `next_full`,
///   when the last full report is older than `FULL_REPORT_INTERVAL`, or when
///   the last report had no UDP while a captive portal was detected. A full
///   report discards the last report so the generator starts from scratch.
/// * The report generator is created and stored, together with `response_tx`,
///   as the current run.
fn handle_run_check(
    &mut self,
    relay_map: RelayMap,
    stun_sock_v4: Option<Arc<UdpSocket>>,
    stun_sock_v6: Option<Arc<UdpSocket>>,
    response_tx: oneshot::Sender<Result<Arc<Report>>>,
) {
    if self.current_report_run.is_some() {
        let busy = anyhow!("ignoring RunCheck request: reportgen actor already running");
        response_tx.send(Err(busy)).ok();
        return;
    }

    let now = Instant::now();
    let cancel_token = CancellationToken::new();

    // Only bind our own sockets for the families the caller did not supply.
    let stun_sock_v4 = stun_sock_v4
        .or_else(|| bind_local_stun_socket(IpFamily::V4, self.addr(), cancel_token.clone()));
    let stun_sock_v6 = stun_sock_v6
        .or_else(|| bind_local_stun_socket(IpFamily::V6, self.addr(), cancel_token.clone()));

    let interval_elapsed = now.duration_since(self.reports.last_full) > FULL_REPORT_INTERVAL;
    // No UDP behind a captive portal: the previous report may have been
    // incomplete, so start over with a full one.
    let blocked_by_portal = self
        .reports
        .last
        .as_deref()
        .is_some_and(|last| !last.udp && last.captive_portal.unwrap_or_default());

    if self.reports.next_full || interval_elapsed || blocked_by_portal {
        self.reports.last = None;
        self.reports.next_full = false;
        self.reports.last_full = now;
        inc!(NetcheckMetrics, reports_full);
    }
    inc!(NetcheckMetrics, reports);

    let reportgen = reportgen::Client::new(
        self.addr(),
        self.reports.last.clone(),
        self.port_mapper.clone(),
        relay_map,
        stun_sock_v4,
        stun_sock_v6,
        self.dns_resolver.clone(),
    );
    self.current_report_run = Some(ReportRun {
        _reportgen: reportgen,
        _drop_guard: cancel_token.drop_guard(),
        report_tx: response_tx,
    });
}
fn handle_report_ready(&mut self, report: Box<Report>) {
let report = self.finish_and_store_report(*report);
self.in_flight_stun_requests.clear();
if let Some(ReportRun { report_tx, .. }) = self.current_report_run.take() {
report_tx.send(Ok(report)).ok();
}
}
fn handle_report_aborted(&mut self, err: anyhow::Error) {
self.in_flight_stun_requests.clear();
if let Some(ReportRun { report_tx, .. }) = self.current_report_run.take() {
report_tx.send(Err(err.context("report aborted"))).ok();
}
}
fn handle_stun_packet(&mut self, pkt: &[u8], src: SocketAddr) {
trace!(%src, "received STUN packet");
if self.in_flight_stun_requests.is_empty() {
return;
}
match &src {
SocketAddr::V4(_) => {
inc!(NetcheckMetrics, stun_packets_recv_ipv4);
}
SocketAddr::V6(_) => {
inc!(NetcheckMetrics, stun_packets_recv_ipv6);
}
}
match stun::parse_response(pkt) {
Ok((txn, addr_port)) => match self.in_flight_stun_requests.remove(&txn) {
Some(inf) => {
debug!(%src, %txn, "received known STUN packet");
let elapsed = inf.start.elapsed();
inf.s.send((elapsed, addr_port)).ok();
}
None => {
debug!(%src, %txn, "received unexpected STUN message response");
}
},
Err(err) => {
match stun::parse_binding_request(pkt) {
Ok(txn) => {
match self.in_flight_stun_requests.remove(&txn) {
Some(inf) => {
debug!(%src, %txn, "received our hairpin STUN request");
let elapsed = inf.start.elapsed();
inf.s.send((elapsed, src)).ok();
}
None => {
debug!(%src, %txn, "unknown STUN request");
}
}
}
Err(_) => {
debug!(%src, "received invalid STUN response: {err:#}");
}
}
}
}
}
/// Registers an in-flight STUN request under its transaction id and then
/// signals `response_tx` that the registration is done.
fn handle_in_flight_stun(&mut self, inflight: Inflight, response_tx: oneshot::Sender<()>) {
    let txn = inflight.txn;
    self.in_flight_stun_requests.insert(txn, inflight);
    response_tx.send(()).ok();
}
/// Stores `report` in the history, selecting its preferred relay on the way,
/// logs the stored report at debug level and returns it.
fn finish_and_store_report(&mut self, report: Report) -> Arc<Report> {
let report = self.add_report_history_and_set_preferred_relay(report);
debug!("{report:?}");
report
}
fn add_report_history_and_set_preferred_relay(&mut self, mut r: Report) -> Arc<Report> {
let mut prev_relay = None;
if let Some(ref last) = self.reports.last {
prev_relay.clone_from(&last.preferred_relay);
}
let now = Instant::now();
const MAX_AGE: Duration = Duration::from_secs(5 * 60);
let mut best_recent = RelayLatencies::new();
let prevs_iter = self
.reports
.prev
.iter()
.map(|(a, b)| -> (&Instant, &Report) { (a, b) })
.chain(std::iter::once((&now, &r)));
let mut to_remove = Vec::new();
for (t, pr) in prevs_iter {
if now.duration_since(*t) > MAX_AGE {
to_remove.push(*t);
continue;
}
best_recent.merge(&pr.relay_latency);
}
for t in to_remove {
self.reports.prev.remove(&t);
}
let mut best_any = Duration::default();
let mut old_relay_cur_latency = Duration::default();
{
for (url, duration) in r.relay_latency.iter() {
if Some(url) == prev_relay.as_ref() {
old_relay_cur_latency = duration;
}
if let Some(best) = best_recent.get(url) {
if r.preferred_relay.is_none() || best < best_any {
best_any = best;
r.preferred_relay.replace(url.clone());
}
}
}
if prev_relay.is_some()
&& r.preferred_relay != prev_relay
&& !old_relay_cur_latency.is_zero()
&& best_any > old_relay_cur_latency / 3 * 2
{
r.preferred_relay = prev_relay;
}
}
let r = Arc::new(r);
self.reports.prev.insert(now, r.clone());
self.reports.last = Some(r.clone());
r
}
}
/// State of the report run that is currently in progress.
#[derive(Debug)]
struct ReportRun {
/// Handle to the report generator; never read, kept so that it lives as long as the run.
_reportgen: reportgen::Client,
/// Drop guard of the cancellation token handed to the local STUN socket
/// listeners; presumably cancels them when the run is dropped.
_drop_guard: tokio_util::sync::DropGuard,
/// Where the finished report, or the error, is sent.
report_tx: oneshot::Sender<Result<Arc<Report>>>,
}
fn bind_local_stun_socket(
network: IpFamily,
actor_addr: Addr,
cancel_token: CancellationToken,
) -> Option<Arc<UdpSocket>> {
let sock = match UdpSocket::bind(network, 0) {
Ok(sock) => Arc::new(sock),
Err(err) => {
debug!("failed to bind STUN socket: {}", err);
return None;
}
};
let span = info_span!(
"stun_udp_listener",
local_addr = sock
.local_addr()
.map(|a| a.to_string())
.unwrap_or(String::from("-")),
);
{
let sock = sock.clone();
tokio::spawn(
async move {
debug!("udp stun socket listener started");
let mut buf = vec![0u8; 64 << 10];
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => break,
res = recv_stun_once(&sock, &mut buf, &actor_addr) => {
if let Err(err) = res {
warn!(%err, "stun recv failed");
break;
}
}
}
}
debug!("udp stun socket listener stopped");
}
.instrument(span),
);
}
Some(sock)
}
/// Receives a single datagram from `sock` into `buf` and forwards a copy of it
/// to the actor as a `Message::StunPacket`.
///
/// The source IP is converted to its canonical form first, so an IPv4-mapped
/// IPv6 address is reported as plain IPv4.
///
/// # Errors
///
/// Fails if reading from the socket fails or if the actor is no longer running.
async fn recv_stun_once(sock: &UdpSocket, buf: &mut [u8], actor_addr: &Addr) -> Result<()> {
    let received = sock.recv_from(buf).await;
    let (count, mut from_addr) = received.context("Error reading from stun socket")?;

    // `set_ip` rather than rebuilding the address: it keeps the remaining
    // fields when the address family does not change.
    let canonical_ip = from_addr.ip().to_canonical();
    from_addr.set_ip(canonical_ip);

    let payload = Bytes::from(buf[..count].to_vec());
    actor_addr
        .send(Message::StunPacket { payload, from_addr })
        .await
        .context("actor stopped")
}
/// Reports whether a local IPv6 UDP socket can be bound on port 0.
pub(crate) fn os_has_ipv6() -> bool {
    let probe = UdpSocket::bind_local_v6(0);
    probe.is_ok()
}
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
use bytes::BytesMut;
use tokio::time;
use tracing::info;
use super::*;
use crate::{
defaults::{staging::EU_RELAY_HOSTNAME, DEFAULT_STUN_PORT},
ping::Pinger,
relay::RelayNode,
};
/// Runs five reports against a local STUN server and checks the basics of each.
#[tokio::test]
async fn test_basic() -> Result<()> {
    let _guard = iroh_test::logging::setup();
    let (stun_addr, stun_stats, _cleanup_guard) =
        stun::tests::serve("127.0.0.1".parse().unwrap()).await?;

    let mut client = Client::new(None, crate::dns::default_resolver().clone())?;
    let dm = stun::tests::relay_map_of([stun_addr].into_iter());

    for round in 0..5 {
        println!("--round {}", round);
        let report = client.get_report(dm.clone(), None, None).await?;

        assert!(report.udp, "want UDP");
        let latency_count = report.relay_latency.len();
        assert_eq!(
            latency_count, 1,
            "expected 1 key in RelayLatency; got {}",
            latency_count
        );
        assert!(
            report.relay_latency.iter().next().is_some(),
            "expected key 1 in RelayLatency; got {:?}",
            report.relay_latency
        );
        assert!(report.global_v4.is_some(), "expected globalV4 set");
        assert!(report.preferred_relay.is_some());
    }

    let total = stun_stats.total().await;
    assert!(total >= 5, "expected at least 5 stun, got {}", total);

    Ok(())
}
/// Runs ten reports against the staging EU relay's STUN endpoint.
///
/// A report without UDP is tolerated, since the probe may not come back from
/// the network; it is only logged.
#[tokio::test]
async fn test_iroh_computer_stun() -> Result<()> {
    let _guard = iroh_test::logging::setup();

    let mut client = Client::new(None, crate::dns::default_resolver().clone())
        .context("failed to create netcheck client")?;

    let url: RelayUrl = format!("https://{}", EU_RELAY_HOSTNAME).parse().unwrap();
    let node = RelayNode {
        url: url.clone(),
        stun_only: true,
        stun_port: DEFAULT_STUN_PORT,
    };
    let dm = RelayMap::from_nodes([node]).expect("hardcoded");

    for i in 0..10 {
        let ordinal = i + 1;
        println!("starting report {}", ordinal);
        let started = Instant::now();

        let report = client
            .get_report(dm.clone(), None, None)
            .await
            .context("failed to get netcheck report")?;

        if !report.udp {
            eprintln!("missing UDP, probe not returned by network");
        } else {
            let latency_count = report.relay_latency.len();
            assert_eq!(
                latency_count, 1,
                "expected 1 key in RelayLatency; got {}",
                latency_count
            );
            assert!(
                report.relay_latency.iter().next().is_some(),
                "expected key 1 in RelayLatency; got {:?}",
                report.relay_latency
            );
            assert!(
                report.global_v4.is_some() || report.global_v6.is_some(),
                "expected at least one of global_v4 or global_v6"
            );
            assert!(report.preferred_relay.is_some());
        }

        println!("report {} done in {:?}", ordinal, started.elapsed());
    }

    Ok(())
}
#[tokio::test]
async fn test_udp_blocked() -> Result<()> {
let _guard = iroh_test::logging::setup();
let blackhole = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
let stun_addr = blackhole.local_addr()?;
let dm = stun::tests::relay_map_of_opts([(stun_addr, false)].into_iter());
let resolver = crate::dns::default_resolver().clone();
let mut client = Client::new(None, resolver)?;
let r = client.get_report(dm, None, None).await?;
let mut r: Report = (*r).clone();
r.portmap_probe = None;
let pinger = Pinger::new();
let can_ping = pinger.send(Ipv4Addr::LOCALHOST.into(), b"aa").await.is_ok();
let want_icmpv4 = match can_ping {
true => Some(true),
false => None,
};
let want = Report {
ipv4_can_send: can_ping,
os_has_ipv6: r.os_has_ipv6,
captive_portal: r.captive_portal,
icmpv4: want_icmpv4,
relay_latency: can_ping
.then(|| r.relay_latency.clone())
.unwrap_or_default(),
preferred_relay: can_ping
.then_some(r.preferred_relay.clone())
.unwrap_or_default(),
..Default::default()
};
assert_eq!(r, want);
Ok(())
}
/// Table-driven test for `Actor::add_report_history_and_set_preferred_relay`.
///
/// Each case feeds a sequence of reports into a fresh actor while advancing the
/// paused tokio clock, then checks the number of stored reports and the
/// preferred relay of the last report.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_add_report_history_set_preferred_relay() -> Result<()> {
// Builds the relay URL used for relay number `i`.
fn relay_url(i: u16) -> RelayUrl {
format!("http://{i}.com").parse().unwrap()
}
// Builds a report from `("d<N>", seconds)` pairs: relay N with the given latency.
fn report(a: impl IntoIterator<Item = (&'static str, u64)>) -> Option<Arc<Report>> {
let mut report = Report::default();
for (s, d) in a {
assert!(s.starts_with('d'), "invalid relay server key");
let id: u16 = s[1..].parse().unwrap();
report
.relay_latency
.0
.insert(relay_url(id), Duration::from_secs(d));
}
Some(Arc::new(report))
}
struct Step {
// Seconds to advance the clock before this step (relative to the previous step).
after: u64,
// Report to add; replaced by the stored report once the step has run.
r: Option<Arc<Report>>,
}
struct Test {
name: &'static str,
steps: Vec<Step>,
// Expected preferred relay of the last report.
want_relay: Option<RelayUrl>,
// Expected number of reports left in the history.
want_prev_len: usize,
}
let tests = [
Test {
name: "first_reading",
steps: vec![Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
}],
want_prev_len: 1,
want_relay: Some(relay_url(1)),
},
Test {
name: "with_two",
steps: vec![
Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
],
want_prev_len: 2,
// d1's best recent latency (2s, from the first report) still beats d2.
want_relay: Some(relay_url(1)), },
Test {
name: "but_now_d1_gone",
steps: vec![
Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
Step {
after: 2,
r: report([("d2", 3)]),
},
],
want_prev_len: 3,
// d1 is missing from the latest report, so d2 is the only candidate.
want_relay: Some(relay_url(2)), },
Test {
name: "d1_is_back",
steps: vec![
Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
Step {
after: 2,
r: report([("d2", 3)]),
},
// Same readings as the second step.
Step {
after: 3,
r: report([("d1", 4), ("d2", 3)]),
}, ],
want_prev_len: 4,
// d1's best recent latency (2s, from the first report) wins again.
want_relay: Some(relay_url(1)), },
Test {
name: "things_clean_up",
steps: vec![
Step {
after: 0,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 1,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 2,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 3,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 10 * 60,
r: report([("d3", 3)]),
},
],
// The first four reports are older than the five-minute limit by the time
// the last one is added, so only the last remains; d3 is the only candidate.
want_prev_len: 1, want_relay: Some(relay_url(3)), },
Test {
name: "preferred_relay_hysteresis_no_switch",
steps: vec![
Step {
after: 0,
r: report([("d1", 4), ("d2", 5)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
],
want_prev_len: 2,
// d2 (3s) is now faster, but not below 2/3 of d1's current 4s: stay on d1.
want_relay: Some(relay_url(1)), },
Test {
name: "preferred_relay_hysteresis_do_switch",
steps: vec![
Step {
after: 0,
r: report([("d1", 4), ("d2", 5)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 1)]),
},
],
want_prev_len: 2,
// d2 (1s) is well below 2/3 of d1's current 4s: switch to d2.
want_relay: Some(relay_url(2)), },
];
for mut tt in tests {
println!("test: {}", tt.name);
let resolver = crate::dns::default_resolver().clone();
// A fresh actor per case; it is never run, only its report history is used.
let mut actor = Actor::new(None, resolver).unwrap();
for s in &mut tt.steps {
// The clock is paused, so time only moves when advanced explicitly.
time::advance(Duration::from_secs(s.after)).await;
let r = Arc::try_unwrap(s.r.take().unwrap()).unwrap();
s.r = Some(actor.add_report_history_and_set_preferred_relay(r));
}
let last_report = tt.steps.last().unwrap().r.clone().unwrap();
let got = actor.reports.prev.len();
let want = tt.want_prev_len;
assert_eq!(got, want, "prev length");
let got = &last_report.preferred_relay;
let want = &tt.want_relay;
assert_eq!(got, want, "preferred_relay");
}
Ok(())
}
/// Checks that hair-pinning is detected when netcheck is given an externally
/// owned IPv4 socket.
///
/// Because the socket is supplied by the test, the test must forward the
/// packets received on it to the netcheck actor itself; a background task
/// does that until it is aborted at the end of the test.
#[tokio::test]
async fn test_hairpin() -> Result<()> {
    let (stun_addr, _stun_stats, _done) = stun::tests::serve_v4().await?;
    let dm = stun::tests::relay_map_of([stun_addr].into_iter());
    dbg!(&dm);

    let resolver = crate::dns::default_resolver().clone();
    let mut client = Client::new(None, resolver)?;

    let sock = UdpSocket::bind_local(IpFamily::V4, 0)?;
    let sock = Arc::new(sock);
    info!(addr=?sock.local_addr().unwrap(), "Using local addr");

    let task = {
        let sock = sock.clone();
        let addr = client.addr();
        tokio::spawn(
            async move {
                let mut buf = BytesMut::zeroed(64 << 10);
                loop {
                    let (count, src) = sock.recv_from(&mut buf).await.unwrap();
                    info!(
                        addr=?sock.local_addr().unwrap(),
                        %count,
                        "Forwarding payload to netcheck client",
                    );
                    // Copy the datagram out instead of `split_to`: splitting
                    // removes the bytes from `buf`, so the receive buffer would
                    // shrink with every packet and eventually truncate datagrams.
                    let payload = BytesMut::from(&buf[..count]).freeze();
                    addr.receive_stun_packet(payload, src);
                }
            }
            .instrument(info_span!("pkt-fwd")),
        )
    };

    let r = client.get_report(dm, Some(sock), None).await?;
    dbg!(&r);
    assert_eq!(r.hair_pinning, Some(true));

    task.abort();
    Ok(())
}
}