#[cfg(not(wasm_browser))]
use std::net::{SocketAddrV4, SocketAddrV6};
use std::{
collections::BTreeSet,
net::{IpAddr, SocketAddr},
sync::Arc,
};
use http::StatusCode;
use iroh_base::RelayUrl;
use iroh_relay::{
RelayConfig, RelayMap, defaults::DEFAULT_RELAY_QUIC_PORT, http::RELAY_PROBE_PATH,
};
#[cfg(not(wasm_browser))]
use iroh_relay::{
dns::{DnsError, DnsResolver, StaggeredError},
quic::QuicClient,
};
use n0_error::{e, stack_error};
#[cfg(wasm_browser)]
use n0_future::future::Pending;
use n0_future::{
StreamExt as _,
task::{self, AbortOnDropHandle, JoinSet},
time::{self, Duration, Instant},
};
use rand::seq::IteratorRandom;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, trace, warn, warn_span};
use url::Host;
#[cfg(not(wasm_browser))]
use super::defaults::timeouts::DNS_TIMEOUT;
use super::{
Report,
probes::{Probe, ProbePlan},
};
#[cfg(not(wasm_browser))]
use crate::address_lookup::DNS_STAGGERING_MS;
use crate::{
net_report::defaults::timeouts::{
CAPTIVE_PORTAL_DELAY, CAPTIVE_PORTAL_TIMEOUT, OVERALL_REPORT_TIMEOUT, PROBES_TIMEOUT,
},
util::reqwest_client_builder,
};
/// Handle to a running report-generation actor (see [`Client::new`]).
///
/// It only holds the actor task's [`AbortOnDropHandle`], so dropping the `Client` aborts
/// the actor task.
#[derive(Debug)]
pub(super) struct Client {
    _drop_guard: AbortOnDropHandle<()>,
}
/// The subset of the local network interface state that report generation cares about.
#[derive(Debug, Clone, Default)]
pub(crate) struct IfStateDetails {
    /// Copied from `netwatch::netmon::State::have_v4`; presumably whether IPv4 is usable.
    pub(crate) have_v4: bool,
    /// Copied from `netwatch::netmon::State::have_v6`; presumably whether IPv6 is usable.
    pub(crate) have_v6: bool,
}
impl IfStateDetails {
    /// Test helper: interface details with both address families marked as present.
    #[cfg(test)]
    pub(super) fn fake() -> Self {
        let both_families = true;
        Self {
            have_v6: both_families,
            have_v4: both_families,
        }
    }
}
impl From<netwatch::netmon::State> for IfStateDetails {
    /// Keeps only the IPv4/IPv6 availability flags of a full interface state snapshot.
    fn from(state: netwatch::netmon::State) -> Self {
        let have_v4 = state.have_v4;
        let have_v6 = state.have_v6;
        Self { have_v4, have_v6 }
    }
}
/// Networking resources handed to every probe (not available in browsers).
#[cfg(not(wasm_browser))]
#[derive(Debug, Clone)]
pub(super) struct SocketState {
    /// QUIC client, if any. NOTE(review): not used in this file; presumably used by the
    /// QAD probes that run elsewhere — confirm.
    pub(super) quic_client: Option<QuicClient>,
    /// Resolver used for relay hostnames by the HTTPS probe and the captive portal check.
    pub(super) dns_resolver: DnsResolver,
}
impl Client {
pub(super) fn new(
last_report: Option<Report>,
relay_map: RelayMap,
protocols: BTreeSet<Probe>,
if_state: IfStateDetails,
shutdown_token: CancellationToken,
#[cfg(not(wasm_browser))] socket_state: SocketState,
#[cfg(not(wasm_browser))] tls_config: rustls::ClientConfig,
) -> (Self, mpsc::Receiver<ProbeFinished>) {
let (msg_tx, msg_rx) = mpsc::channel(32);
let actor = Actor {
msg_tx,
last_report,
relay_map,
protocols,
#[cfg(not(wasm_browser))]
socket_state,
#[cfg(not(wasm_browser))]
tls_config,
if_state,
};
let task = task::spawn(
actor
.run(shutdown_token)
.instrument(warn_span!("reportgen-actor")),
);
(
Self {
_drop_guard: AbortOnDropHandle::new(task),
},
msg_rx,
)
}
}
/// State owned by the report-generation task spawned in [`Client::new`].
#[derive(Debug)]
struct Actor {
    /// Every finished probe task's result is sent here.
    msg_tx: mpsc::Sender<ProbeFinished>,
    /// Previous report, if any. It selects the probe plan, and the captive portal check only
    /// runs when this is `None`.
    last_report: Option<Report>,
    /// Relays to probe.
    relay_map: RelayMap,
    /// Probe protocols passed to the probe plan.
    protocols: BTreeSet<Probe>,
    /// Shared networking resources cloned into every probe.
    #[cfg(not(wasm_browser))]
    socket_state: SocketState,
    /// TLS configuration for the HTTPS probes and the captive portal check.
    #[cfg(not(wasm_browser))]
    tls_config: rustls::ClientConfig,
    /// Local interface details (currently only logged when spawning probes).
    if_state: IfStateDetails,
}
/// Why a regular probe task did not produce a [`ProbeReport`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub(super) enum ProbesError {
    /// The probe ran to completion but returned an error.
    #[error("Probe failed")]
    ProbeFailure { source: ProbeError },
    /// NOTE(review): not constructed in the visible code.
    #[error("All probes failed")]
    AllProbesFailed,
    /// The probe's cancellation token fired before it finished.
    #[error("Probe cancelled")]
    Cancelled,
    /// The probe did not finish within `PROBES_TIMEOUT`.
    #[error("Probe timed out")]
    Timeout,
}
/// Message the actor sends for every finished probe task.
#[derive(Debug)]
pub(super) enum ProbeFinished {
    /// Outcome of a regular relay probe.
    Regular(Result<ProbeReport, ProbesError>),
    /// Outcome of the captive portal check:
    /// - `Some(true)` if a captive portal is suspected;
    /// - `Some(false)` if none was found;
    /// - `None` if the check failed, timed out, or was cancelled.
    #[cfg(not(wasm_browser))]
    CaptivePortal(Option<bool>),
}
impl Actor {
    /// Drives the actor to completion.
    ///
    /// Stops when all probes finish, when `OVERALL_REPORT_TIMEOUT` elapses, or when
    /// `shutdown_token` is cancelled, whichever happens first.
    async fn run(self, shutdown_token: CancellationToken) {
        shutdown_token
            .run_until_cancelled_owned(async {
                match time::timeout(OVERALL_REPORT_TIMEOUT, self.run_inner()).await {
                    Ok(()) => trace!("reportgen actor finished"),
                    Err(time::Elapsed { .. }) => {
                        warn!("reportgen timed out");
                    }
                }
            })
            .await;
    }

    /// Spawns all regular probes plus the captive portal check, then forwards each result to
    /// `msg_tx` as soon as its task completes.
    ///
    /// Once every regular probe has finished, the captive portal check is cancelled if any
    /// of them succeeded over UDP.
    async fn run_inner(self) {
        trace!("reportgen actor starting");
        let mut probes = JoinSet::default();
        // The parent token of all probe tokens; it is never cancelled in this function.
        let _probes_token = self.spawn_probes_task(self.if_state.clone(), &mut probes);
        // Count only the regular probes: the captive portal task joins the same set below.
        let mut num_probes = probes.len();
        let captive_token = self.prepare_captive_portal_task(&mut probes);
        // Whether any regular probe reported a UDP (QAD) success.
        let mut have_udp = false;
        while let Some(probe_result) = probes.join_next().await {
            trace!(?probe_result, num_probes, "processing finished probe");
            match probe_result {
                Ok(report) => {
                    #[cfg_attr(wasm_browser, allow(irrefutable_let_patterns))]
                    if let ProbeFinished::Regular(report) = &report {
                        have_udp |= report.as_ref().map(|r| r.is_udp()).unwrap_or_default();
                        num_probes -= 1;
                        if num_probes == 0 {
                            trace!("all regular probes done");
                            // Only the captive portal task may still be pending.
                            debug_assert!(probes.len() <= 1, "{} probes", probes.len());
                            // UDP worked, so stop (or never start) the captive portal check.
                            if have_udp {
                                captive_token.cancel();
                            }
                        }
                    }
                    // A closed receiver is ignored; the remaining tasks still run to completion.
                    self.msg_tx.send(report).await.ok();
                }
                Err(e) => {
                    // A panicking probe ends the whole run.
                    if e.is_panic() {
                        error!("Task panicked {:?}", e);
                        break;
                    }
                    // NOTE(review): non-panic join errors do not decrement `num_probes`, so the
                    // captive portal cancellation above is then skipped — confirm intended.
                    warn!("probes task join error: {:?}", e);
                }
            }
        }
    }

    /// Spawns the captive portal check into `tasks`, but only when there is no previous
    /// report.
    ///
    /// The check waits `CAPTIVE_PORTAL_DELAY` before starting and is bounded by
    /// `CAPTIVE_PORTAL_TIMEOUT`. It always yields a [`ProbeFinished::CaptivePortal`].
    ///
    /// Returns a token that cancels the check. In browsers nothing is spawned and the token
    /// is inert.
    fn prepare_captive_portal_task(&self, tasks: &mut JoinSet<ProbeFinished>) -> CancellationToken {
        let token = CancellationToken::new();
        #[cfg(not(wasm_browser))]
        if self.last_report.is_none() {
            // NOTE(review): inside this branch `last_report` is always `None`, so
            // `preferred_relay` is always `None` and a random relay gets picked.
            let preferred_relay = self
                .last_report
                .as_ref()
                .and_then(|l| l.preferred_relay.clone());
            let dns_resolver = self.socket_state.dns_resolver.clone();
            let dm = self.relay_map.clone();
            let token = token.clone();
            // NOTE(review): this `cfg` is redundant; the enclosing `if` already carries it.
            #[cfg(not(wasm_browser))]
            let tls_config = self.tls_config.clone();
            tasks.spawn(
                async move {
                    let res = token
                        .run_until_cancelled_owned(async move {
                            // Give the regular probes a head start before checking.
                            time::sleep(CAPTIVE_PORTAL_DELAY).await;
                            trace!("check started after {CAPTIVE_PORTAL_DELAY:?}");
                            time::timeout(
                                CAPTIVE_PORTAL_TIMEOUT,
                                check_captive_portal(
                                    &dns_resolver,
                                    &dm,
                                    preferred_relay,
                                    tls_config,
                                ),
                            )
                            .await
                        })
                        .await;
                    // Outer `Option`: cancelled or not. Middle `Result`: timeout. Inner
                    // `Result`: the check itself.
                    let res = match res {
                        Some(Ok(Ok(found))) => Some(found),
                        Some(Ok(Err(err))) => {
                            // Connection failures are logged at debug level only; every
                            // other error is a warning.
                            match err {
                                CaptivePortalError::CreateReqwestClient { source, .. }
                                | CaptivePortalError::HttpRequest { source, .. }
                                    if source.is_connect() =>
                                {
                                    debug!("check_captive_portal failed: {source:#}");
                                }
                                err => warn!("check_captive_portal error: {err:#}"),
                            }
                            None
                        }
                        Some(Err(time::Elapsed { .. })) => {
                            warn!("probe timed out");
                            None
                        }
                        None => {
                            trace!("probe cancelled");
                            None
                        }
                    };
                    ProbeFinished::CaptivePortal(res)
                }
                .instrument(warn_span!("captive-portal")),
            );
        }
        token
    }

    /// Spawns one task into `probes` for every (probe set, relay) pair of the probe plan.
    ///
    /// The plan is derived from the previous report when there is one, and is the initial
    /// plan otherwise. Each task:
    /// - waits for its planned delay;
    /// - runs the probe bounded by `PROBES_TIMEOUT`;
    /// - resolves to a [`ProbeFinished::Regular`].
    ///
    /// `if_state` is only logged. Returns the parent of all per-probe cancellation tokens.
    fn spawn_probes_task(
        &self,
        if_state: IfStateDetails,
        probes: &mut JoinSet<ProbeFinished>,
    ) -> CancellationToken {
        trace!(?if_state, "local interface details");
        let plan = match self.last_report {
            Some(ref report) => {
                ProbePlan::with_last_report(&self.relay_map, report, &self.protocols)
            }
            None => ProbePlan::initial(&self.relay_map, &self.protocols),
        };
        trace!(%plan, "probe plan");
        let token = CancellationToken::new();
        for probe_set in plan.iter() {
            // Token hierarchy: overall token -> per probe set -> per probe.
            let set_token = token.child_token();
            let proto = probe_set.proto();
            for (delay, relay) in probe_set.params() {
                let probe_token = set_token.child_token();
                let fut = probe_token.run_until_cancelled_owned(time::timeout(
                    PROBES_TIMEOUT,
                    proto.run(
                        *delay,
                        relay.clone(),
                        #[cfg(not(wasm_browser))]
                        self.socket_state.clone(),
                        #[cfg(not(wasm_browser))]
                        self.tls_config.clone(),
                    ),
                ));
                probes.spawn(
                    async move {
                        let res = fut.await;
                        // Flatten cancellation / timeout / probe error into `ProbesError`.
                        let res = match res {
                            Some(Ok(Ok(report))) => Ok(report),
                            Some(Ok(Err(err))) => {
                                warn!("probe failed: {:#}", err);
                                Err(e!(ProbesError::ProbeFailure, err))
                            }
                            Some(Err(time::Elapsed { .. })) => Err(e!(ProbesError::Timeout)),
                            None => Err(e!(ProbesError::Cancelled)),
                        };
                        ProbeFinished::Regular(res)
                    }
                    .instrument(warn_span!(
                        "run-probe",
                        ?proto,
                        ?delay,
                        relay=%relay.url,
                    )),
                );
            }
        }
        token
    }
}
/// Successful result of a single probe.
#[derive(Debug, Clone)]
pub(super) enum ProbeReport {
    /// QUIC address discovery over IPv4.
    #[cfg(not(wasm_browser))]
    QadIpv4(QadProbeReport),
    /// QUIC address discovery over IPv6.
    #[cfg(not(wasm_browser))]
    QadIpv6(QadProbeReport),
    /// HTTPS latency probe.
    Https(HttpsProbeReport),
}
impl ProbeReport {
    /// Returns `true` for reports produced by the UDP-based QUIC address discovery probes.
    #[cfg(not(wasm_browser))]
    pub(super) fn is_udp(&self) -> bool {
        match self {
            Self::QadIpv4(_) | Self::QadIpv6(_) => true,
            Self::Https(_) => false,
        }
    }

    /// In browsers only [`ProbeReport::Https`] exists, so no report is UDP-based.
    #[cfg(wasm_browser)]
    pub(super) fn is_udp(&self) -> bool {
        false
    }
}
/// Result of a QUIC address discovery (QAD) probe.
#[cfg(not(wasm_browser))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct QadProbeReport {
    /// The relay that was probed.
    pub(super) relay: RelayUrl,
    /// Measured latency to the relay (presumably round-trip — confirm with the QAD code).
    pub(super) latency: Duration,
    /// Our own address as observed by the relay.
    pub(super) addr: SocketAddr,
}
/// Result of an HTTPS latency probe.
#[derive(Debug, Clone)]
pub(super) struct HttpsProbeReport {
    /// The relay that was probed.
    pub(super) relay: RelayUrl,
    /// Time from sending the probe request until the response was received.
    pub(super) latency: Duration,
}
/// Error returned by running a single [`Probe`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub(super) enum ProbeError {
    /// NOTE(review): not constructed in the visible code.
    #[error("Client is gone")]
    ClientGone,
    /// NOTE(review): not constructed in the visible code.
    #[error("Probe is no longer useful")]
    NotUseful,
    /// The HTTPS latency probe failed.
    #[error("Failed to run HTTPS probe")]
    Https { source: MeasureHttpsLatencyError },
}
/// Errors related to QUIC address discovery probes.
///
/// NOTE(review): not constructed in the visible code; presumably used by the QAD probe code.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub(super) enum QuicError {
    /// No relay was available to probe.
    #[error("No relay available")]
    NoRelay,
    /// The relay URL has no host.
    #[error("URL must have 'host' to use QUIC address discovery probes")]
    InvalidUrl,
}
/// Configuration for QUIC address discovery probes.
#[derive(derive_more::Debug, Clone)]
pub(crate) struct QuicConfig {
    /// QUIC endpoint; `Debug` prints the placeholder `noq::Endpoint` instead of its contents.
    #[debug("noq::Endpoint")]
    pub(crate) ep: noq::Endpoint,
    /// TLS client configuration for the QUIC connections.
    pub(crate) client_config: rustls::ClientConfig,
    /// Presumably whether IPv4 QAD probes are enabled — confirm with callers.
    pub(crate) ipv4: bool,
    /// Presumably whether IPv6 QAD probes are enabled — confirm with callers.
    pub(crate) ipv6: bool,
}
impl Probe {
    /// Runs this probe against `relay`, first sleeping for `delay` if it is non-zero.
    ///
    /// Only [`Probe::Https`] is executed here. The QAD variants must never reach this
    /// function; if they do, it panics.
    async fn run(
        self,
        delay: Duration,
        relay: Arc<RelayConfig>,
        #[cfg(not(wasm_browser))] socket_state: SocketState,
        #[cfg(not(wasm_browser))] tls_config: rustls::ClientConfig,
    ) -> Result<ProbeReport, ProbeError> {
        if !delay.is_zero() {
            trace!("delaying probe");
            time::sleep(delay).await;
        }
        trace!("starting probe");

        let outcome = match self {
            Probe::Https => run_https_probe(
                #[cfg(not(wasm_browser))]
                &socket_state.dns_resolver,
                relay.url.clone(),
                #[cfg(not(wasm_browser))]
                tls_config,
            )
            .await
            .map(ProbeReport::Https)
            .map_err(|err| e!(ProbeError::Https, err)),
            #[cfg(not(wasm_browser))]
            Probe::QadIpv4 | Probe::QadIpv6 => unreachable!("must not be used"),
        };

        debug!(report = ?outcome, "probe finished");
        outcome
    }
}
/// Errors from [`check_captive_portal`].
#[cfg(not(wasm_browser))]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
enum CaptivePortalError {
    /// Resolving the relay hostname failed.
    #[error(transparent)]
    DnsLookup {
        #[error(from)]
        source: StaggeredError<DnsError>,
    },
    /// Building the reqwest client failed.
    #[error("Creating HTTP client failed")]
    CreateReqwestClient {
        #[error(std_err)]
        source: reqwest::Error,
    },
    /// Sending the captive portal request failed.
    #[error("HTTP request failed")]
    HttpRequest {
        #[error(std_err)]
        source: reqwest::Error,
    },
}
#[cfg(not(wasm_browser))]
async fn check_captive_portal(
dns_resolver: &DnsResolver,
dm: &RelayMap,
preferred_relay: Option<RelayUrl>,
tls_config: rustls::ClientConfig,
) -> Result<bool, CaptivePortalError> {
use crate::util::reqwest_client_builder;
let preferred_relay = preferred_relay.and_then(|url| dm.get(&url).map(|_| url));
let url = match preferred_relay {
Some(url) => url,
None => {
let urls: Vec<_> = dm.urls();
if urls.is_empty() {
trace!("No suitable relay for captive portal check");
return Ok(false);
}
let i = (0..urls.len()).choose(&mut rand::rng()).unwrap_or_default();
urls[i].clone()
}
};
let mut builder =
reqwest_client_builder(Some(tls_config)).redirect(reqwest::redirect::Policy::none());
if let Some(Host::Domain(domain)) = url.host() {
let addrs: Vec<_> = dns_resolver
.lookup_ipv4_ipv6_staggered(domain, DNS_TIMEOUT, DNS_STAGGERING_MS)
.await?
.map(|ipaddr| SocketAddr::new(ipaddr, 0))
.collect();
builder = builder.resolve_to_addrs(domain, &addrs);
}
let client = builder
.build()
.map_err(|err| e!(CaptivePortalError::CreateReqwestClient, err))?;
let host_name = url.host_str().unwrap_or_default();
let challenge = format!("ts_{host_name}");
let portal_url = format!("http://{host_name}/generate_204");
let res = client
.request(reqwest::Method::GET, portal_url)
.header("X-Iroh-Challenge", &challenge)
.send()
.await
.map_err(|err| e!(CaptivePortalError::HttpRequest, err))?;
let expected_response = format!("response {challenge}");
let is_valid_response = res
.headers()
.get("X-Iroh-Response")
.map(|s| s.to_str().unwrap_or_default())
== Some(&expected_response);
trace!(
"check_captive_portal url={} status_code={} valid_response={}",
res.url(),
res.status(),
is_valid_response,
);
let has_captive = res.status() != 204 || !is_valid_response;
Ok(has_captive)
}
/// Returns the QUIC port of `relay`, or `None` if the relay has no QUIC configuration.
///
/// A configured port of `0` is replaced by [`DEFAULT_RELAY_QUIC_PORT`].
#[cfg(not(wasm_browser))]
fn get_quic_port(relay: &RelayConfig) -> Option<u16> {
    relay.quic.as_ref().map(|quic| match quic.port {
        0 => DEFAULT_RELAY_QUIC_PORT,
        explicit => explicit,
    })
}
/// Errors from resolving the QUIC socket address of a relay.
#[cfg(not(wasm_browser))]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub(super) enum GetRelayAddrError {
    /// The relay URL has no host.
    #[error("No valid hostname in the relay URL")]
    InvalidHostname,
    /// No address of the requested family was found. `addr_type` is `"A"` for IPv4 and
    /// `"AAAA"` for IPv6.
    #[error("No suitable relay address found for {url} ({addr_type})")]
    NoAddrFound {
        url: RelayUrl,
        addr_type: &'static str,
    },
    /// The DNS lookup itself failed.
    #[error("DNS lookup failed")]
    DnsLookup { source: StaggeredError<DnsError> },
    /// NOTE(review): not constructed in the visible code.
    #[error("Relay is not suitable")]
    UnsupportedRelay,
    /// NOTE(review): not constructed in the visible code.
    #[error("HTTPS probes are not implemented")]
    UnsupportedHttps,
    /// The relay has no QUIC configuration, so no port is known.
    #[error("No port available for this protocol")]
    MissingPort,
}
/// Resolves the IPv4 socket address of `relay`'s QUIC endpoint.
///
/// # Errors
///
/// Returns [`GetRelayAddrError::MissingPort`] if the relay has no QUIC configuration. Any
/// error from the IPv4 lookup is passed through.
#[cfg(not(wasm_browser))]
pub(super) async fn get_relay_addr_ipv4(
    dns_resolver: &DnsResolver,
    relay: &RelayConfig,
) -> Result<SocketAddrV4, GetRelayAddrError> {
    let Some(port) = get_quic_port(relay) else {
        return Err(e!(GetRelayAddrError::MissingPort));
    };
    relay_lookup_ipv4_staggered(dns_resolver, relay, port).await
}
/// Resolves the IPv6 socket address of `relay`'s QUIC endpoint.
///
/// # Errors
///
/// Returns [`GetRelayAddrError::MissingPort`] if the relay has no QUIC configuration. Any
/// error from the IPv6 lookup is passed through.
#[cfg(not(wasm_browser))]
pub(super) async fn get_relay_addr_ipv6(
    dns_resolver: &DnsResolver,
    relay: &RelayConfig,
) -> Result<SocketAddrV6, GetRelayAddrError> {
    let Some(port) = get_quic_port(relay) else {
        return Err(e!(GetRelayAddrError::MissingPort));
    };
    relay_lookup_ipv6_staggered(dns_resolver, relay, port).await
}
/// Determines an IPv4 socket address (with `port`) for `relay`.
///
/// - A literal IPv4 host is used directly.
/// - A literal IPv6 host yields [`GetRelayAddrError::NoAddrFound`].
/// - A domain is resolved with a staggered A lookup, and the first address returned is used.
///
/// # Errors
///
/// - [`GetRelayAddrError::InvalidHostname`] if the URL has no host.
/// - [`GetRelayAddrError::DnsLookup`] if the lookup fails.
/// - [`GetRelayAddrError::NoAddrFound`] if the lookup returns nothing.
#[cfg(not(wasm_browser))]
async fn relay_lookup_ipv4_staggered(
    dns_resolver: &DnsResolver,
    relay: &RelayConfig,
    port: u16,
) -> Result<SocketAddrV4, GetRelayAddrError> {
    let no_a_record = || {
        e!(GetRelayAddrError::NoAddrFound {
            url: relay.url.clone(),
            addr_type: "A",
        })
    };

    match relay.url.host() {
        None => Err(e!(GetRelayAddrError::InvalidHostname)),
        Some(url::Host::Ipv4(addr)) => Ok(SocketAddrV4::new(addr, port)),
        Some(url::Host::Ipv6(_)) => Err(no_a_record()),
        Some(url::Host::Domain(hostname)) => {
            trace!(%hostname, "Performing DNS A lookup for relay addr");
            let mut addrs = dns_resolver
                .lookup_ipv4_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS)
                .await
                .map_err(|err| e!(GetRelayAddrError::DnsLookup, err))?;
            // Canonicalize so that an IPv4-mapped IPv6 address still counts as IPv4.
            match addrs.next().ok_or_else(no_a_record)?.to_canonical() {
                IpAddr::V4(ip) => Ok(SocketAddrV4::new(ip, port)),
                addr @ IpAddr::V6(_) => unreachable!("bad DNS lookup: {:?}", addr),
            }
        }
    }
}
/// Determines an IPv6 socket address (with `port`, flowinfo and scope id `0`) for `relay`.
///
/// - A literal IPv6 host is used directly.
/// - A literal IPv4 host yields [`GetRelayAddrError::NoAddrFound`].
/// - A domain is resolved with a staggered AAAA lookup, and the first address returned is
///   used.
///
/// # Errors
///
/// - [`GetRelayAddrError::InvalidHostname`] if the URL has no host.
/// - [`GetRelayAddrError::DnsLookup`] if the lookup fails.
/// - [`GetRelayAddrError::NoAddrFound`] if the lookup returns nothing.
#[cfg(not(wasm_browser))]
async fn relay_lookup_ipv6_staggered(
    dns_resolver: &DnsResolver,
    relay: &RelayConfig,
    port: u16,
) -> Result<SocketAddrV6, GetRelayAddrError> {
    let no_aaaa_record = || {
        e!(GetRelayAddrError::NoAddrFound {
            url: relay.url.clone(),
            addr_type: "AAAA",
        })
    };

    match relay.url.host() {
        None => Err(e!(GetRelayAddrError::InvalidHostname)),
        Some(url::Host::Ipv6(addr)) => Ok(SocketAddrV6::new(addr, port, 0, 0)),
        Some(url::Host::Ipv4(_)) => Err(no_aaaa_record()),
        Some(url::Host::Domain(hostname)) => {
            trace!(%hostname, "Performing DNS AAAA lookup for relay addr");
            let mut addrs = dns_resolver
                .lookup_ipv6_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS)
                .await
                .map_err(|err| e!(GetRelayAddrError::DnsLookup, err))?;
            match addrs.next().ok_or_else(no_aaaa_record)? {
                IpAddr::V6(ip) => Ok(SocketAddrV6::new(ip, port, 0, 0)),
                addr @ IpAddr::V4(_) => unreachable!("bad DNS lookup: {:?}", addr),
            }
        }
    }
}
/// Errors from [`run_https_probe`].
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub(super) enum MeasureHttpsLatencyError {
    /// Joining the probe path onto the relay URL failed.
    #[error(transparent)]
    InvalidUrl {
        #[error(std_err, from)]
        source: url::ParseError,
    },
    /// Resolving the relay hostname failed.
    #[cfg(not(wasm_browser))]
    #[error(transparent)]
    DnsLookup {
        #[error(from)]
        source: StaggeredError<DnsError>,
    },
    /// Building the reqwest client failed.
    #[error("Creating HTTP client failed")]
    CreateReqwestClient {
        #[error(std_err)]
        source: reqwest::Error,
    },
    /// Sending the probe request failed.
    #[error("HTTP request failed")]
    HttpRequest {
        #[error(std_err)]
        source: reqwest::Error,
    },
    /// The server answered with a non-success status code.
    #[error("Error response from server {status}: {:?}", status.canonical_reason())]
    InvalidResponse { status: StatusCode },
}
/// Measures the HTTPS latency to `relay` by requesting its probe path ([`RELAY_PROBE_PATH`]).
///
/// Latency is measured from just before the `GET` request is sent until the response has
/// been received. On a successful response, up to `MAX_BODY_SIZE` bytes of the body are
/// then read and discarded; draining stops at the first chunk error.
///
/// Outside browsers, the client uses `tls_config`, does not follow redirects, and resolves
/// the relay hostname with `dns_resolver` instead of the system resolver.
///
/// # Errors
///
/// Returns a [`MeasureHttpsLatencyError`] in any of these cases:
/// - the probe URL cannot be built;
/// - DNS resolution fails;
/// - the HTTP client cannot be built;
/// - the request fails;
/// - the server responds with a non-success status.
#[allow(clippy::unused_async)]
async fn run_https_probe(
    #[cfg(not(wasm_browser))] dns_resolver: &DnsResolver,
    relay: RelayUrl,
    #[cfg(not(wasm_browser))] tls_config: rustls::ClientConfig,
) -> Result<HttpsProbeReport, MeasureHttpsLatencyError> {
    // Upper bound on how much of a successful response body is drained (8 KiB).
    const MAX_BODY_SIZE: usize = 8 << 10;

    trace!("HTTPS probe start");
    let url = relay.join(RELAY_PROBE_PATH)?;

    #[cfg(not(wasm_browser))]
    let mut builder =
        reqwest_client_builder(Some(tls_config)).redirect(reqwest::redirect::Policy::none());
    // The browser build never reconfigures the builder, so it is not `mut` there. This
    // avoids an `unused_mut` warning on that target.
    #[cfg(wasm_browser)]
    let builder = reqwest_client_builder(None);

    // Resolve the relay hostname with our own resolver rather than the system one.
    #[cfg(not(wasm_browser))]
    if let Some(Host::Domain(domain)) = url.host() {
        let addrs: Vec<_> = dns_resolver
            .lookup_ipv4_ipv6_staggered(domain, DNS_TIMEOUT, DNS_STAGGERING_MS)
            .await?
            .map(|ipaddr| SocketAddr::new(ipaddr, 0))
            .collect();
        trace!(?addrs, "resolved addrs");
        builder = builder.resolve_to_addrs(domain, &addrs);
    }

    let client = builder
        .build()
        .map_err(|err| e!(MeasureHttpsLatencyError::CreateReqwestClient, err))?;

    let start = Instant::now();
    let response = client
        .request(reqwest::Method::GET, url)
        .send()
        .await
        .map_err(|err| e!(MeasureHttpsLatencyError::HttpRequest, err))?;
    let latency = start.elapsed();

    if !response.status().is_success() {
        return Err(e!(MeasureHttpsLatencyError::InvalidResponse {
            status: response.status()
        }));
    }

    // Drain the body, bounded by `MAX_BODY_SIZE`; stop at the first failing chunk.
    let mut body_size = 0;
    let mut stream = response.bytes_stream();
    while let Some(Ok(chunk)) = stream.next().await {
        body_size += chunk.len();
        if body_size >= MAX_BODY_SIZE {
            break;
        }
    }
    Ok(HttpsProbeReport { relay, latency })
}
// Tests that run probes against a local test relay (see `test_utils::relay`); they are only
// built when a crypto provider is configured.
#[cfg(all(test, with_crypto_provider))]
mod tests {
    use std::net::Ipv4Addr;
    use iroh_relay::{
        dns::DnsResolver,
        tls::{CaRootsConfig, default_provider},
    };
    use n0_error::{Result, StdResultExt};
    use n0_tracing_test::traced_test;
    use super::{super::test_utils, *};

    /// The HTTPS probe against the local test relay succeeds with a non-zero latency.
    #[tokio::test]
    async fn test_measure_https_latency() -> Result {
        let (_server, relay) = test_utils::relay().await;
        let dns_resolver = DnsResolver::new();
        tracing::info!(relay_url = ?relay.url , "RELAY_URL");
        let report = run_https_probe(
            &dns_resolver,
            relay.url,
            // Certificate verification is skipped; presumably the test relay's certificate
            // is not signed by a trusted root — confirm in `test_utils`.
            CaRootsConfig::insecure_skip_verify()
                .client_config(default_provider())
                .expect("infallible"),
        )
        .await?;
        assert!(report.latency > Duration::ZERO);
        Ok(())
    }

    /// An IPv4 QAD probe against the local test relay reports our own local address.
    #[tokio::test]
    #[traced_test]
    async fn test_qad_probe_v4() -> Result {
        let (server, relay) = test_utils::relay().await;
        let relay = Arc::new(relay);
        let client_config = iroh_relay::tls::make_dangerous_client_config();
        // Bind to localhost on a random port so the expected observed address is known.
        let ep = noq::Endpoint::client(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).anyerr()?;
        let client_addr = ep.local_addr().anyerr()?;
        let quic_client = iroh_relay::quic::QuicClient::new(ep.clone(), client_config);
        let dns_resolver = DnsResolver::default();
        let (report, conn) =
            super::super::run_probe_v4(relay, quic_client, dns_resolver, CancellationToken::new())
                .await
                .unwrap();
        assert_eq!(report.addr, client_addr);
        // Close the connection and let the endpoint go idle before shutting the server down.
        drop(conn);
        ep.wait_idle().await;
        server.shutdown().await?;
        Ok(())
    }
}