#![cfg_attr(iroh_docsrs, feature(doc_cfg))]
#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
#![cfg_attr(not(test), deny(clippy::unwrap_used))]
#![cfg_attr(wasm_browser, allow(unused))]
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
net::SocketAddr,
sync::Arc,
};
use defaults::timeouts::PROBES_TIMEOUT;
pub use defaults::timeouts::TIMEOUT;
use iroh_base::RelayUrl;
#[cfg(not(wasm_browser))]
use iroh_relay::RelayConfig;
#[cfg(not(wasm_browser))]
use iroh_relay::dns::DnsResolver;
#[cfg(not(wasm_browser))]
use iroh_relay::quic::QuicClient;
use iroh_relay::{
RelayMap,
quic::{QUIC_ADDR_DISC_CLOSE_CODE, QUIC_ADDR_DISC_CLOSE_REASON},
};
use n0_error::e;
#[cfg(not(wasm_browser))]
use n0_error::stack_error;
#[cfg(not(wasm_browser))]
use n0_future::task;
use n0_future::{
StreamExt,
task::AbortOnDropHandle,
time::{self, Duration, Instant},
};
use n0_watcher::{Watchable, Watcher};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
#[cfg(not(wasm_browser))]
use self::reportgen::QadProbeReport;
use self::reportgen::{ProbeFinished, ProbeReport};
mod defaults;
mod metrics;
mod probes;
mod report;
mod reportgen;
mod options;
pub(crate) use self::reportgen::IfStateDetails;
#[cfg(not(wasm_browser))]
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
enum QadProbeError {
#[error("Failed to resolve relay address")]
GetRelayAddr {
source: self::reportgen::GetRelayAddrError,
},
#[error("Missing host in relay URL")]
MissingHost,
#[error("QUIC connection failed")]
Quic { source: iroh_relay::quic::Error },
#[error("Receiver dropped")]
ReceiverDropped,
}
#[cfg(not(wasm_browser))]
use self::reportgen::SocketState;
pub use self::{
metrics::Metrics,
probes::Probe,
report::{RelayLatencies, Report},
};
pub(crate) use self::{options::Options, reportgen::QuicConfig};
const FULL_REPORT_INTERVAL: Duration = Duration::from_secs(5 * 60);
const ENOUGH_ENDPOINTS: usize = 3;
#[derive(Debug)]
pub(crate) struct Client {
#[cfg(not(wasm_browser))]
socket_state: SocketState,
metrics: Arc<Metrics>,
probes: BTreeSet<Probe>,
relay_map: RelayMap,
#[cfg(not(wasm_browser))]
qad_conns: QadConns,
#[cfg(not(wasm_browser))]
tls_config: rustls::ClientConfig,
reports: Reports,
}
#[cfg(not(wasm_browser))]
#[derive(Debug, Default)]
struct QadConns {
v4: Option<(RelayUrl, QadConn)>,
v6: Option<(RelayUrl, QadConn)>,
}
#[cfg(not(wasm_browser))]
impl QadConns {
fn clear(&mut self) {
if let Some((_, conn)) = self.v4.take() {
conn.conn
.close(QUIC_ADDR_DISC_CLOSE_CODE, QUIC_ADDR_DISC_CLOSE_REASON);
}
if let Some((_, conn)) = self.v6.take() {
conn.conn
.close(QUIC_ADDR_DISC_CLOSE_CODE, QUIC_ADDR_DISC_CLOSE_REASON);
}
}
fn current_v4(&self) -> Option<ProbeReport> {
if let Some((_, ref conn)) = self.v4
&& let Some(mut r) = conn.observer.get()
{
use noq_proto::PathId;
if let Some(latency) = conn.conn.rtt(PathId::ZERO) {
r.latency = latency;
}
return Some(ProbeReport::QadIpv4(r));
}
None
}
fn current_v6(&self) -> Option<ProbeReport> {
if let Some((_, ref conn)) = self.v6
&& let Some(mut r) = conn.observer.get()
{
use noq_proto::PathId;
if let Some(latency) = conn.conn.rtt(PathId::ZERO) {
r.latency = latency;
}
return Some(ProbeReport::QadIpv6(r));
}
None
}
fn watch_v4(&self) -> impl n0_future::Stream<Item = Option<QadProbeReport>> + Unpin + use<> {
let watcher = self.v4.as_ref().map(|(_url, conn)| conn.observer.watch());
if let Some(watcher) = watcher {
watcher.stream_updates_only().boxed()
} else {
n0_future::stream::empty().boxed()
}
}
fn watch_v6(&self) -> impl n0_future::Stream<Item = Option<QadProbeReport>> + Unpin + use<> {
let watcher = self.v6.as_ref().map(|(_url, conn)| conn.observer.watch());
if let Some(watcher) = watcher {
watcher.stream_updates_only().boxed()
} else {
n0_future::stream::empty().boxed()
}
}
}
#[cfg(not(wasm_browser))]
#[derive(Debug)]
struct QadConn {
conn: noq::Connection,
observer: Watchable<Option<QadProbeReport>>,
_handle: AbortOnDropHandle<Option<()>>,
}
#[derive(Debug)]
struct Reports {
next_full: bool,
prev: BTreeMap<Instant, Report>,
last: Option<Report>,
last_full: Instant,
}
impl Default for Reports {
fn default() -> Self {
Self {
next_full: true,
prev: Default::default(),
last: Default::default(),
last_full: Instant::now(),
}
}
}
impl Client {
pub(crate) fn new(
#[cfg(not(wasm_browser))] dns_resolver: DnsResolver,
relay_map: RelayMap,
opts: Options,
metrics: Arc<Metrics>,
) -> Self {
let probes = opts.as_protocols();
#[cfg(not(wasm_browser))]
let quic_client = opts
.quic_config
.map(|c| iroh_relay::quic::QuicClient::new(c.ep, c.client_config));
#[cfg(not(wasm_browser))]
let socket_state = SocketState {
quic_client,
dns_resolver,
};
Client {
#[cfg(not(wasm_browser))]
socket_state,
metrics,
reports: Reports::default(),
probes,
relay_map,
#[cfg(not(wasm_browser))]
qad_conns: QadConns::default(),
#[cfg(not(wasm_browser))]
tls_config: opts.tls_config,
}
}
pub(crate) async fn get_report(
&mut self,
if_state: IfStateDetails,
is_major: bool,
shutdown_token: CancellationToken,
) -> Report {
let now = Instant::now();
let mut do_full = is_major
|| self.reports.next_full
|| now.duration_since(self.reports.last_full) > FULL_REPORT_INTERVAL;
debug!(%do_full, "net_report starting");
if !do_full
&& let Some(ref last) = self.reports.last
&& !last.has_udp()
&& last.captive_portal == Some(true)
{
do_full = true;
}
if do_full {
self.reports.last = None; self.reports.next_full = false;
self.reports.last_full = now;
self.metrics.reports_full.inc();
}
self.metrics.reports.inc();
let num_relays = self.relay_map.len();
let enough_relays = std::cmp::min(num_relays, ENOUGH_ENDPOINTS);
#[cfg(wasm_browser)]
let if_state = IfStateDetails::default();
#[cfg(not(wasm_browser))]
let if_state = IfStateDetails {
have_v4: if_state.have_v4,
have_v6: if_state.have_v6,
};
let mut report = Report::default();
let (actor, mut probe_rx) = reportgen::Client::new(
self.reports.last.clone(),
self.relay_map.clone(),
self.probes.clone(),
if_state.clone(),
shutdown_token.child_token(),
#[cfg(not(wasm_browser))]
self.socket_state.clone(),
#[cfg(not(wasm_browser))]
self.tls_config.clone(),
);
#[cfg(not(wasm_browser))]
let reports = self
.spawn_qad_probes(
&if_state,
enough_relays,
do_full,
shutdown_token.child_token(),
)
.await;
#[cfg(not(wasm_browser))]
for r in reports {
report.update(&r);
}
if self.have_enough_reports(&if_state, do_full, num_relays, &report) {
trace!("have enough probe reports, aborting further probes");
drop(actor);
} else {
#[cfg(not(wasm_browser))]
let mut qad_v4_stream = self.qad_conns.watch_v4();
#[cfg(wasm_browser)]
let mut qad_v4_stream = n0_future::stream::empty::<Option<()>>();
#[cfg(not(wasm_browser))]
let mut qad_v6_stream = self.qad_conns.watch_v6();
#[cfg(wasm_browser)]
let mut qad_v6_stream = n0_future::stream::empty::<Option<()>>();
loop {
tokio::select! {
biased;
Some(Some(r)) = qad_v4_stream.next() => {
#[cfg(not(wasm_browser))]
{
trace!(?r, "new report from QAD V4");
report.update(&ProbeReport::QadIpv4(r));
}
}
Some(Some(r)) = qad_v6_stream.next() => {
#[cfg(not(wasm_browser))]
{
trace!(?r, "new report from QAD V6");
report.update(&ProbeReport::QadIpv6(r));
}
}
maybe_probe = probe_rx.recv() => {
let Some(probe_res) = maybe_probe else {
break;
};
match probe_res {
ProbeFinished::Regular(probe) => match probe {
Ok(probe) => {
report.update(&probe);
if self.have_enough_reports(&if_state, do_full, num_relays, &report) {
trace!("have enough probe reports, aborting further probes");
drop(actor);
break;
}
}
Err(err) => {
trace!("probe failed: {:?}", err);
}
},
#[cfg(not(wasm_browser))]
ProbeFinished::CaptivePortal(portal) => {
report.captive_portal = portal;
}
}
}
}
}
}
self.add_report_history_and_set_preferred_relay(&mut report);
debug!(
?report,
duration = ?now.elapsed(),
"net_report generated",
);
report
}
#[cfg(not(wasm_browser))]
async fn spawn_qad_probes(
&mut self,
if_state: &IfStateDetails,
enough_relays: usize,
do_full: bool,
shutdown_token: CancellationToken,
) -> Vec<ProbeReport> {
use tracing::{Instrument, warn_span};
let Some(ref quic_client) = self.socket_state.quic_client else {
return Vec::new();
};
if do_full {
self.qad_conns.clear();
}
if let Some((url, conn)) = &self.qad_conns.v4 {
if let Some(reason) = conn.conn.close_reason() {
trace!(?url, "QAD v4 conn closed: {}", reason);
self.qad_conns.v4.take();
}
}
if let Some((url, conn)) = &self.qad_conns.v6 {
if let Some(reason) = conn.conn.close_reason() {
trace!(?url, "QAD v6 conn closed: {}", reason);
self.qad_conns.v6.take();
}
}
let v4_report = self.qad_conns.current_v4();
let v6_report = self.qad_conns.current_v6();
let needs_v4_probe = v4_report.is_none();
let needs_v6_probe = v6_report.is_some() != if_state.have_v6;
let mut reports = Vec::new();
if let Some(report) = v4_report {
reports.push(report);
}
if let Some(report) = v6_report {
reports.push(report);
}
if !needs_v4_probe && !needs_v6_probe {
return reports;
}
trace!("spawning QAD probes");
const MAX_RELAYS: usize = 5;
let mut v4_buf = JoinSet::new();
let cancel_v4 = shutdown_token.child_token();
let mut v6_buf = JoinSet::new();
let cancel_v6 = shutdown_token.child_token();
let relays = self.relay_map.relays::<Vec<_>>();
for relay in relays.into_iter().take(MAX_RELAYS) {
if if_state.have_v4 && needs_v4_probe {
trace!(?relay.url, "v4 QAD probe starting");
let relay = relay.clone();
let dns_resolver = self.socket_state.dns_resolver.clone();
let quic_client = quic_client.clone();
let relay_url = relay.url.clone();
let inner_token = cancel_v4.child_token();
v4_buf.spawn(
cancel_v4
.child_token()
.run_until_cancelled_owned(time::timeout(
PROBES_TIMEOUT,
run_probe_v4(relay, quic_client, dns_resolver, inner_token),
))
.instrument(warn_span!("QADv4", %relay_url)),
);
}
if if_state.have_v6 && needs_v6_probe {
trace!(?relay.url, "v6 QAD probe starting");
let relay = relay.clone();
let dns_resolver = self.socket_state.dns_resolver.clone();
let quic_client = quic_client.clone();
let relay_url = relay.url.clone();
let inner_token = cancel_v6.child_token();
v6_buf.spawn(
cancel_v6
.child_token()
.run_until_cancelled_owned(time::timeout(
PROBES_TIMEOUT,
run_probe_v6(relay, quic_client, dns_resolver, inner_token),
))
.instrument(warn_span!("QADv6", %relay_url)),
);
}
}
let mut ipv4_pending = !v4_buf.is_empty();
let mut ipv6_pending = !v6_buf.is_empty();
while !v4_buf.is_empty() || !v6_buf.is_empty() {
if reports.len() >= enough_relays && !ipv4_pending && !ipv6_pending {
debug!("enough probes: {}", reports.len());
cancel_v4.cancel();
cancel_v6.cancel();
break;
}
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("qad report cancelled");
break;
}
val = v4_buf.join_next(), if !v4_buf.is_empty() => {
let span = warn_span!("QADv4");
let _guard = span.enter();
ipv4_pending = false;
match val {
Some(Ok(Some(Ok(res)))) => {
match res {
Ok((r, conn)) => {
debug!(?r, "probe report");
let url = r.relay.clone();
reports.push(ProbeReport::QadIpv4(r));
if self.qad_conns.v4.is_none() {
self.qad_conns.v4.replace((url, conn));
} else {
conn.conn.close(QUIC_ADDR_DISC_CLOSE_CODE, QUIC_ADDR_DISC_CLOSE_REASON);
}
}
Err(err) => {
debug!("probe failed: {err:#}");
}
}
}
Some(Err(err)) => {
if err.is_panic() {
panic!("probe panicked: {err:#}");
}
warn!("probe failed: {err:#}");
}
Some(Ok(None)) => {
debug!("probe canceled");
}
Some(Ok(Some(Err(time::Elapsed { .. })))) => {
debug!("probe timed out");
}
None => {
trace!("report canceled");
}
}
}
val = v6_buf.join_next(), if !v6_buf.is_empty() => {
let span = warn_span!("QADv6");
let _guard = span.enter();
ipv6_pending = false;
match val {
Some(Ok(Some(Ok(res)))) => {
match res {
Ok((r, conn)) => {
debug!(?r, "probe report");
let url = r.relay.clone();
reports.push(ProbeReport::QadIpv6(r));
if self.qad_conns.v6.is_none() {
self.qad_conns.v6.replace((url, conn));
} else {
conn.conn.close(QUIC_ADDR_DISC_CLOSE_CODE, QUIC_ADDR_DISC_CLOSE_REASON);
}
}
Err(err) => {
debug!("probe failed: {err:#}");
}
}
}
Some(Err(err)) => {
if err.is_panic() {
panic!("probe panicked: {err:#}");
}
warn!("probe failed: {err:#}");
}
Some(Ok(None)) => {
debug!("probe canceled");
}
Some(Ok(Some(Err(time::Elapsed { .. })))) => {
debug!("probe timed out");
}
None => {
trace!("report canceled");
}
}
}
else => {
break;
}
}
}
v4_buf.abort_all();
v6_buf.abort_all();
reports
}
fn have_enough_reports(
&self,
state: &IfStateDetails,
do_full: bool,
num_relays: usize,
report: &Report,
) -> bool {
#[cfg_attr(wasm_browser, allow(unused_mut))]
let mut num_ipv4 = 0;
#[cfg_attr(wasm_browser, allow(unused_mut))]
let mut num_ipv6 = 0;
let mut num_https = 0;
for (typ, _, _) in report.relay_latency.iter() {
match typ {
#[cfg(not(wasm_browser))]
Probe::QadIpv4 => {
num_ipv4 += 1;
}
#[cfg(not(wasm_browser))]
Probe::QadIpv6 => {
num_ipv6 += 1;
}
Probe::Https => {
num_https += 1;
}
}
}
if do_full {
match (state.have_v4, state.have_v6) {
(true, true) => {
if num_ipv4 >= 2 && num_ipv6 >= 1 || num_ipv6 >= 2 && num_ipv4 >= 1 {
return true;
}
}
(true, false) => {
if num_ipv4 >= 2 {
return true;
}
}
(false, true) => {
if num_ipv6 >= 2 {
return true;
}
}
(false, false) => {}
}
if num_https >= num_relays {
return true;
}
false
} else {
match (state.have_v4, state.have_v6) {
(true, true) => {
if num_ipv4 >= 1 && num_ipv6 >= 1 {
return true;
}
}
(true, false) => {
if num_ipv4 >= 1 {
return true;
}
}
(false, true) => {
if num_ipv6 >= 1 {
return true;
}
}
(false, false) => {}
}
if num_https >= num_relays {
return true;
}
false
}
}
fn add_report_history_and_set_preferred_relay(&mut self, r: &mut Report) {
let mut prev_relay = None;
if let Some(ref last) = self.reports.last {
prev_relay.clone_from(&last.preferred_relay);
if r.mapping_varies_by_dest_ipv4.is_none() {
r.mapping_varies_by_dest_ipv4 = last.mapping_varies_by_dest_ipv4;
}
if r.mapping_varies_by_dest_ipv6.is_none() {
r.mapping_varies_by_dest_ipv6 = last.mapping_varies_by_dest_ipv6;
}
}
let now = Instant::now();
const MAX_AGE: Duration = Duration::from_secs(5 * 60);
let mut best_recent = RelayLatencies::default();
let prevs_iter = self
.reports
.prev
.iter()
.map(|(a, b)| -> (&Instant, &Report) { (a, b) });
let mut to_remove = Vec::new();
for (t, pr) in prevs_iter {
if now.duration_since(*t) > MAX_AGE {
to_remove.push(*t);
continue;
}
best_recent.merge(&pr.relay_latency);
}
best_recent.merge(&r.relay_latency);
for t in to_remove {
self.reports.prev.remove(&t);
}
let mut best_any = Duration::default();
let mut old_relay_cur_latency = Duration::default();
{
for (_, url, duration) in r.relay_latency.iter() {
if Some(url) == prev_relay.as_ref() {
old_relay_cur_latency = duration;
}
if let Some(best) = best_recent.get(url)
&& (r.preferred_relay.is_none() || best < best_any)
{
best_any = best;
r.preferred_relay.replace(url.clone());
}
}
if prev_relay.is_some()
&& r.preferred_relay != prev_relay
&& !old_relay_cur_latency.is_zero()
&& best_any > old_relay_cur_latency / 3 * 2
{
r.preferred_relay = prev_relay;
}
}
self.reports.prev.insert(now, r.clone());
self.reports.last = Some(r.clone());
}
}
#[cfg(not(wasm_browser))]
async fn run_probe_v4(
relay: Arc<RelayConfig>,
quic_client: QuicClient,
dns_resolver: DnsResolver,
shutdown_token: CancellationToken,
) -> n0_error::Result<(QadProbeReport, QadConn), QadProbeError> {
use noq_proto::PathId;
let relay_addr = reportgen::get_relay_addr_ipv4(&dns_resolver, &relay)
.await
.map_err(|source| e!(QadProbeError::GetRelayAddr { source }))?;
trace!(?relay_addr, "resolved relay server address");
let host = relay
.url
.host_str()
.ok_or_else(|| e!(QadProbeError::MissingHost))?;
let conn = quic_client
.create_conn(relay_addr.into(), host)
.await
.map_err(|source| e!(QadProbeError::Quic { source }))?;
let mut watcher = conn.observed_external_addr();
let addr = watcher
.next()
.await
.ok_or_else(|| e!(QadProbeError::ReceiverDropped))?;
let report = QadProbeReport {
relay: relay.url.clone(),
addr: SocketAddr::new(addr.ip().to_canonical(), addr.port()),
latency: conn.rtt(PathId::ZERO).unwrap_or_default(),
};
let observer = Watchable::new(None);
let endpoint = relay.url.clone();
let handle = task::spawn(shutdown_token.run_until_cancelled_owned({
let conn = conn.clone();
let observer = observer.clone();
async move {
while let Some(val) = watcher.next().await {
let val = SocketAddr::new(val.ip().to_canonical(), val.port());
let latency = conn.rtt(PathId::ZERO).unwrap_or_default();
observer
.set(Some(QadProbeReport {
relay: endpoint.clone(),
addr: val,
latency,
}))
.ok();
}
}
}));
let handle = AbortOnDropHandle::new(handle);
Ok((
report,
QadConn {
conn,
observer,
_handle: handle,
},
))
}
#[cfg(not(wasm_browser))]
async fn run_probe_v6(
relay: Arc<RelayConfig>,
quic_client: QuicClient,
dns_resolver: DnsResolver,
shutdown_token: CancellationToken,
) -> n0_error::Result<(QadProbeReport, QadConn), QadProbeError> {
use noq_proto::PathId;
let relay_addr = reportgen::get_relay_addr_ipv6(&dns_resolver, &relay)
.await
.map_err(|source| e!(QadProbeError::GetRelayAddr { source }))?;
trace!(?relay_addr, "resolved relay server address");
let host = relay
.url
.host_str()
.ok_or_else(|| e!(QadProbeError::MissingHost))?;
let conn = quic_client
.create_conn(relay_addr.into(), host)
.await
.map_err(|source| e!(QadProbeError::Quic { source }))?;
let mut watcher = conn.observed_external_addr();
let addr = watcher
.next()
.await
.ok_or_else(|| e!(QadProbeError::ReceiverDropped))?;
let report = QadProbeReport {
relay: relay.url.clone(),
addr: SocketAddr::new(addr.ip().to_canonical(), addr.port()),
latency: conn.rtt(PathId::ZERO).unwrap_or_default(),
};
let observer = Watchable::new(None);
let endpoint = relay.url.clone();
let handle = task::spawn(shutdown_token.run_until_cancelled_owned({
let observer = observer.clone();
let conn = conn.clone();
async move {
while let Some(val) = watcher.next().await {
let val = SocketAddr::new(val.ip().to_canonical(), val.port());
let latency = conn.rtt(PathId::ZERO).unwrap_or_default();
observer
.set(Some(QadProbeReport {
relay: endpoint.clone(),
addr: val,
latency,
}))
.ok();
}
}
}));
let handle = AbortOnDropHandle::new(handle);
Ok((
report,
QadConn {
conn,
observer,
_handle: handle,
},
))
}
#[cfg(test)]
mod test_utils {
use iroh_relay::{RelayConfig, RelayQuicConfig, server};
pub(crate) async fn relay() -> (server::Server, RelayConfig) {
let server = server::Server::spawn(server::testing::server_config())
.await
.expect("should serve relay");
let quic = Some(RelayQuicConfig::new(
server.quic_addr().expect("server should run quic").port(),
));
let endpoint_desc =
RelayConfig::new(server.https_url().expect("should work as relay"), quic);
(server, endpoint_desc)
}
pub(crate) async fn relay_map(relays: usize) -> (Vec<server::Server>, crate::RelayMap) {
let mut servers = Vec::with_capacity(relays);
let mut endpoints = Vec::with_capacity(relays);
for _ in 0..relays {
let (relay_server, endpoint) = relay().await;
servers.push(relay_server);
endpoints.push(endpoint);
}
(servers, crate::RelayMap::from_iter(endpoints))
}
}
#[cfg(all(test, with_crypto_provider))]
mod tests {
use std::net::{Ipv4Addr, SocketAddr};
use iroh_base::RelayUrl;
use iroh_relay::{
dns::DnsResolver,
tls::{CaRootsConfig, default_provider},
};
use n0_error::{Result, StdResultExt};
use n0_tracing_test::traced_test;
use tokio_util::sync::CancellationToken;
use super::*;
use crate::net_report::probes::Probe;
#[tokio::test(flavor = "multi_thread")]
#[traced_test]
async fn test_basic() -> Result<()> {
let (server, relay) = test_utils::relay().await;
let client_config = iroh_relay::tls::make_dangerous_client_config();
let ep = noq::Endpoint::client(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).anyerr()?;
let quic_addr_disc = QuicConfig {
ep: ep.clone(),
client_config,
ipv4: true,
ipv6: true,
};
let relay_map = RelayMap::from(relay);
let resolver = DnsResolver::new();
let tls_config = CaRootsConfig::insecure_skip_verify()
.client_config(default_provider())
.expect("infallible");
let opts = Options::new(tls_config).quic_config(Some(quic_addr_disc.clone()));
let mut client = Client::new(
resolver.clone(),
relay_map.clone(),
opts.clone(),
Default::default(),
);
let if_state = IfStateDetails::fake();
for i in 0..5 {
let cancel = CancellationToken::new();
println!("--round {i}");
let r = client
.get_report(if_state.clone(), false, cancel.child_token())
.await;
assert!(r.has_udp(), "want UDP");
dbg!(&r);
assert!(
!r.relay_latency.is_empty(),
"expected at least 1 key in RelayLatency; got none",
);
assert!(
r.relay_latency.iter().next().is_some(),
"expected key 1 in RelayLatency; got {:?}",
r.relay_latency
);
assert!(r.global_v4.is_some(), "expected globalV4 set");
assert!(r.preferred_relay.is_some(),);
cancel.cancel();
}
drop(client);
ep.wait_idle().await;
server.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_add_report_history_set_preferred_relay() -> Result {
fn relay_url(i: u16) -> RelayUrl {
format!("http://{i}.com").parse().unwrap()
}
fn report(a: impl IntoIterator<Item = (&'static str, u64)>) -> Option<Report> {
let mut report = Report::default();
for (s, d) in a {
assert!(s.starts_with('d'), "invalid relay server key");
let id: u16 = s[1..].parse().unwrap();
report.relay_latency.update_relay(
relay_url(id),
Duration::from_secs(d),
Probe::QadIpv4,
);
}
Some(report)
}
struct Step {
after: u64,
r: Option<Report>,
}
struct Test {
name: &'static str,
steps: Vec<Step>,
want_relay: Option<RelayUrl>,
want_prev_len: usize,
}
let tests = [
Test {
name: "first_reading",
steps: vec![Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
}],
want_prev_len: 1,
want_relay: Some(relay_url(1)),
},
Test {
name: "with_two",
steps: vec![
Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
],
want_prev_len: 2,
want_relay: Some(relay_url(1)), },
Test {
name: "but_now_d1_gone",
steps: vec![
Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
Step {
after: 2,
r: report([("d2", 3)]),
},
],
want_prev_len: 3,
want_relay: Some(relay_url(2)), },
Test {
name: "d1_is_back",
steps: vec![
Step {
after: 0,
r: report([("d1", 2), ("d2", 3)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
Step {
after: 2,
r: report([("d2", 3)]),
},
Step {
after: 3,
r: report([("d1", 4), ("d2", 3)]),
}, ],
want_prev_len: 4,
want_relay: Some(relay_url(1)), },
Test {
name: "things_clean_up",
steps: vec![
Step {
after: 0,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 1,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 2,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 3,
r: report([("d1", 1), ("d2", 2)]),
},
Step {
after: 10 * 60,
r: report([("d3", 3)]),
},
],
want_prev_len: 1, want_relay: Some(relay_url(3)), },
Test {
name: "preferred_relay_hysteresis_no_switch",
steps: vec![
Step {
after: 0,
r: report([("d1", 4), ("d2", 5)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 3)]),
},
],
want_prev_len: 2,
want_relay: Some(relay_url(1)), },
Test {
name: "preferred_relay_hysteresis_do_switch",
steps: vec![
Step {
after: 0,
r: report([("d1", 4), ("d2", 5)]),
},
Step {
after: 1,
r: report([("d1", 4), ("d2", 1)]),
},
],
want_prev_len: 2,
want_relay: Some(relay_url(2)), },
];
let resolver = DnsResolver::new();
let tls_config = CaRootsConfig::insecure_skip_verify()
.client_config(default_provider())
.expect("infallible");
for mut tt in tests {
println!("test: {}", tt.name);
let relay_map = RelayMap::empty();
let opts = Options::new(tls_config.clone());
let mut client = Client::new(resolver.clone(), relay_map, opts, Default::default());
for s in &mut tt.steps {
tokio::time::advance(Duration::from_secs(s.after)).await;
client.add_report_history_and_set_preferred_relay(s.r.as_mut().unwrap());
}
let last_report = tt.steps.last().unwrap().r.clone().unwrap();
let got = client.reports.prev.len();
let want = tt.want_prev_len;
assert_eq!(got, want, "prev length");
let got = &last_report.preferred_relay;
let want = &tt.want_relay;
assert_eq!(got, want, "preferred_relay");
}
Ok(())
}
}