#[cfg(test)]
use std::net::SocketAddr;
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
net::IpAddr,
pin::{Pin, pin},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use backon::{Backoff, BackoffBuilder, ExponentialBuilder};
use iroh_base::{EndpointId, RelayUrl, SecretKey};
use iroh_relay::{
self as relay, PingTracker,
client::{Client, ConnectError, RecvError, SendError},
protos::relay::{ClientToRelayMsg, Datagrams, RelayToClientMsg, Status},
};
use n0_error::{e, stack_error};
use n0_future::{
FuturesUnorderedBounded, SinkExt, StreamExt,
task::JoinSet,
time::{self, Duration, Instant, MissedTickBehavior},
};
use n0_watcher::Watchable;
use netwatch::interfaces;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, debug, error, event, info, info_span, instrument, trace, warn};
use url::Url;
#[cfg(not(wasm_browser))]
use crate::dns::DnsResolver;
use crate::{net_report::Report, socket::Metrics as SocketMetrics, util::MaybeFuture};
/// How long a relay connection that is not the home relay may go without outgoing traffic
/// before its actor exits.
const RELAY_INACTIVE_CLEANUP_TIME: Duration = Duration::from_secs(60);
/// Interval between pings sent to the relay server; also used as the timeout for a single
/// send operation.
const PING_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum number of queued send items taken from the send queue in one batch.
const SEND_DATAGRAM_BATCH_SIZE: usize = 20;
/// Upper bound on the duration of a single dial attempt.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// While dialing, queued outgoing datagrams are discarded every time this interval elapses.
const UNDELIVERABLE_DATAGRAM_TIMEOUT: Duration = Duration::from_secs(3);
/// Actor that owns the connection to a single relay server.
///
/// It dials the relay, reconnects on failure, forwards datagrams in both directions and
/// answers control messages from the [`RelayActor`].
#[derive(Debug)]
struct ActiveRelayActor {
/// Priority control messages; polled ahead of the regular inbox.
prio_inbox: mpsc::Receiver<ActiveRelayPrioMessage>,
/// Regular control messages.
inbox: mpsc::Receiver<ActiveRelayMessage>,
/// Queue onto which datagrams received from the relay server are pushed.
relay_datagrams_recv: mpsc::Sender<RelayRecvDatagram>,
/// Queue of datagrams that should be sent to the relay server.
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
/// URL of the relay server this actor talks to.
url: RelayUrl,
/// Builder that is cloned for every dial attempt.
relay_client_builder: relay::client::ClientBuilder,
/// Whether this relay is currently the home relay; when set, the inactivity timeout is
/// not polled.
is_home_relay: bool,
/// Timer that makes a non-home actor exit once it elapses.
inactive_timeout: Pin<Box<time::Sleep>>,
/// Cancelling this token shuts the actor down.
stop_token: CancellationToken,
/// Socket metrics updated when datagrams are sent.
metrics: Arc<SocketMetrics>,
/// Shared home relay state; this actor updates the status for its own URL.
my_relay: HomeRelayWatch,
}
/// Regular control messages understood by an [`ActiveRelayActor`].
#[derive(Debug)]
enum ActiveRelayMessage {
/// Asks the actor to verify its connection against the given set of local IP addresses.
///
/// When connected, a ping is sent if the connection's local IP is in `local_ips`;
/// otherwise the connection is terminated with an error. Ignored while dialing.
CheckConnection { local_ips: Vec<IpAddr> },
/// Tells the actor whether it is now the home relay.
SetHomeRelay(bool),
/// Test only: replies with the local socket address of the relay connection, if any.
#[cfg(test)]
GetLocalAddr(oneshot::Sender<Option<SocketAddr>>),
/// Test only: pings the relay server and signals the sender once the matching pong arrives.
#[cfg(test)]
PingServer(oneshot::Sender<()>),
}
/// Priority control messages understood by an [`ActiveRelayActor`].
///
/// These are handled ahead of regular messages and also while a send is in progress.
#[derive(Debug)]
enum ActiveRelayPrioMessage {
/// Asks whether the actor has seen the given endpoint on its relay connection; the answer
/// is delivered through the oneshot sender.
HasEndpointRoute(EndpointId, oneshot::Sender<bool>),
}
/// Everything needed to construct an [`ActiveRelayActor`].
#[derive(Debug)]
struct ActiveRelayActorOptions {
/// URL of the relay server to connect to.
url: RelayUrl,
/// Receiving end of the priority inbox.
prio_inbox_: mpsc::Receiver<ActiveRelayPrioMessage>,
/// Receiving end of the regular inbox.
inbox: mpsc::Receiver<ActiveRelayMessage>,
/// Queue of datagrams to send to the relay server.
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
/// Queue onto which received datagrams are pushed.
relay_datagrams_recv: mpsc::Sender<RelayRecvDatagram>,
/// Options used to build the relay client.
connection_opts: RelayConnectionOptions,
/// Token used to stop the actor.
stop_token: CancellationToken,
/// Socket metrics.
metrics: Arc<SocketMetrics>,
/// Shared home relay state.
my_relay: HomeRelayWatch,
}
/// Configuration for establishing a connection to a relay server.
#[derive(Debug, Clone)]
struct RelayConnectionOptions {
/// Secret key handed to the relay client builder.
secret_key: SecretKey,
/// DNS resolver handed to the relay client builder (not available in browsers).
#[cfg(not(wasm_browser))]
dns_resolver: DnsResolver,
/// Optional proxy to connect through.
proxy_url: Option<Url>,
/// Flag read on every dial to select the address family preference.
prefer_ipv6: Arc<AtomicBool>,
/// TLS client configuration used for the relay connection.
tls_config: rustls::ClientConfig,
}
/// Reason a relay connection attempt or session ended, classified by the phase in which
/// it failed. The phase decides whether the actor backs off before reconnecting.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
enum RelayConnectionError {
/// Dialing the relay server failed.
#[error("Failed to connect to relay server")]
Dial { source: DialError },
/// The connection failed before a pong was received from the relay server.
#[error("Failed to handshake with relay server")]
Handshake { source: RunError },
/// The connection failed after a pong had been received from the relay server.
#[error("Lost connection to relay server")]
Established { source: RunError },
}
/// Errors that terminate an already dialed relay connection.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
enum RunError {
/// A send to the relay server did not complete in time.
#[error("Send timeout")]
SendTimeout,
/// The ping tracker reported a timeout.
#[error("Ping timeout")]
PingTimeout,
/// The connection's local IP was not among the IPs supplied in a connection check.
#[error("Local IP no longer valid")]
LocalIpInvalid,
/// The connection reported no local address during a connection check.
#[error("No local address")]
LocalAddrMissing,
/// The receive stream from the relay server ended.
#[error("Stream closed by server.")]
StreamClosedServer,
/// Reading from the relay connection failed.
#[error("Client stream read failed")]
ClientStreamRead {
#[error(std_err)]
source: RecvError,
},
/// Writing to the relay connection failed.
#[error("Client stream write failed")]
ClientStreamWrite {
#[error(std_err)]
source: SendError,
},
}
/// Errors that can occur while dialing a relay server.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
enum DialError {
/// The dial attempt did not finish within `timeout`.
#[error("timeout (>{timeout:?}) trying to establish a connection")]
Timeout { timeout: Duration },
/// The relay client failed to connect.
#[error("unable to connect")]
Connect { source: ConnectError },
}
impl ActiveRelayActor {
fn new(opts: ActiveRelayActorOptions) -> Self {
let ActiveRelayActorOptions {
url,
prio_inbox_: prio_inbox,
inbox,
relay_datagrams_send,
relay_datagrams_recv,
connection_opts,
stop_token,
metrics,
my_relay,
} = opts;
let relay_client_builder = Self::create_relay_builder(url.clone(), connection_opts);
ActiveRelayActor {
prio_inbox,
inbox,
relay_datagrams_recv,
relay_datagrams_send,
url,
relay_client_builder,
is_home_relay: false,
inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)),
stop_token,
metrics,
my_relay,
}
}
/// Assembles the relay client builder for `url` from the connection options.
///
/// The address family preference is read from the shared flag each time the selector
/// closure is invoked. A proxy is only configured when one was supplied.
fn create_relay_builder(
    url: RelayUrl,
    opts: RelayConnectionOptions,
) -> relay::client::ClientBuilder {
    let prefer_ipv6 = opts.prefer_ipv6;
    let builder = relay::client::ClientBuilder::new(
        url,
        opts.secret_key,
        #[cfg(not(wasm_browser))]
        opts.dns_resolver,
    )
    .tls_client_config(opts.tls_config)
    .address_family_selector(move || prefer_ipv6.load(Ordering::Relaxed));
    match opts.proxy_url {
        Some(proxy_url) => builder.proxy_url(proxy_url),
        None => builder,
    }
}
/// Runs the actor until it is told to stop, reconnecting after failures.
///
/// Every failed run marks this relay as disconnected in the home relay watch. Failures
/// during dialing or handshake wait for the next backoff delay before retrying; a failure
/// of an established connection resets the backoff and retries right away.
async fn run(mut self) {
    let mut backoff = Self::build_backoff();
    loop {
        // A successful return means the actor was asked to stop or became inactive.
        let Err(err) = self.run_once().await else {
            break;
        };
        warn!("{err:#}");
        self.my_relay
            .set_status(&self.url, HomeRelayStatus::Disconnected);
        match err {
            RelayConnectionError::Established { .. } => {
                // The connection worked before it broke: start over with a fresh backoff.
                backoff = Self::build_backoff();
            }
            RelayConnectionError::Dial { .. } | RelayConnectionError::Handshake { .. } => {
                let Some(delay) = backoff.next() else {
                    warn!("retries exceeded");
                    break;
                };
                debug!("retry in {delay:?}");
                time::sleep(delay).await;
            }
        }
    }
    debug!("exiting");
}
/// Creates the reconnect backoff: exponential with jitter, delays between 10ms and 16s,
/// and no limit on the number of retries.
fn build_backoff() -> impl Backoff {
    let min_delay = Duration::from_millis(10);
    let max_delay = Duration::from_secs(16);
    let builder = ExponentialBuilder::new()
        .with_min_delay(min_delay)
        .with_max_delay(max_delay);
    builder.with_jitter().without_max_times().build()
}
/// Performs one dial-and-run cycle.
///
/// Publishes `Connecting` before dialing and `Connected` once the dial succeeded. Returns
/// `Ok(())` when the actor should stop and an error when the connection attempt or the
/// connection itself failed.
async fn run_once(&mut self) -> Result<(), RelayConnectionError> {
    self.my_relay
        .set_status(&self.url, HomeRelayStatus::Connecting);
    let dialed = self.run_dialing().instrument(info_span!("dialing")).await;
    // `None` means dialing was abandoned because the actor should exit.
    let Some(dial_result) = dialed else {
        return Ok(());
    };
    let client = dial_result.map_err(|err| e!(RelayConnectionError::Dial, err))?;
    self.my_relay
        .set_status(&self.url, HomeRelayStatus::Connected);
    let connected = self.run_connected(client);
    connected.instrument(info_span!("connected")).await
}
/// Pushes the inactivity deadline out to [`RELAY_INACTIVE_CLEANUP_TIME`] from now.
fn reset_inactive_timeout(&mut self) {
    let deadline = Instant::now() + RELAY_INACTIVE_CLEANUP_TIME;
    self.inactive_timeout.as_mut().reset(deadline);
}
/// Records whether this relay is the home relay and emits a `home_changed` event when
/// the value actually changes.
fn set_home_relay(&mut self, is_home: bool) {
    let was_home = self.is_home_relay;
    self.is_home_relay = is_home;
    if was_home == is_home {
        return;
    }
    event!(
        target: "iroh::_events::relay::home_changed",
        Level::DEBUG,
        url = %self.url,
        home_relay = self.is_home_relay,
    );
}
/// Drives one dial attempt while continuing to service the inboxes.
///
/// Returns `Some(Ok(client))` once connected, `Some(Err(_))` if the dial failed, and
/// `None` if the actor should exit (cancelled, an inbox was closed, or the actor was
/// inactive for too long while not being the home relay).
async fn run_dialing(&mut self) -> Option<Result<iroh_relay::client::Client, DialError>> {
trace!("Actor loop: connecting to relay.");
// Periodically drain the outgoing queue so it does not fill up with datagrams that can
// not be delivered while there is no connection.
let mut send_datagram_flush = time::interval(UNDELIVERABLE_DATAGRAM_TIMEOUT);
send_datagram_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
// NOTE(review): presumably this skips the interval's immediate first tick - confirm.
send_datagram_flush.reset();
let dialing_fut = self.dial_relay();
tokio::pin!(dialing_fut);
loop {
// `biased` polls the branches top to bottom: shutdown first, then priority messages,
// then the dial itself.
tokio::select! {
biased;
_ = self.stop_token.cancelled() => {
debug!("Shutdown.");
break None;
}
msg = self.prio_inbox.recv() => {
let Some(msg) = msg else {
warn!("Priority inbox closed, shutdown.");
break None;
};
match msg {
// Without a connection no endpoint can be known to this relay.
ActiveRelayPrioMessage::HasEndpointRoute(_peer, sender) => {
sender.send(false).ok();
}
}
}
res = &mut dialing_fut => {
match res {
Ok(client) => {
break Some(Ok(client));
}
Err(err) => {
break Some(Err(err));
}
}
}
msg = self.inbox.recv() => {
let Some(msg) = msg else {
debug!("Inbox closed, shutdown.");
break None;
};
match msg {
ActiveRelayMessage::SetHomeRelay(is_home) => {
self.set_home_relay(is_home);
}
// There is no connection to check yet.
ActiveRelayMessage::CheckConnection { .. } => {}
#[cfg(test)]
ActiveRelayMessage::GetLocalAddr(sender) => {
sender.send(None).ok();
}
#[cfg(test)]
ActiveRelayMessage::PingServer(sender) => {
// Dropping the sender makes the waiting receiver observe an error.
drop(sender);
}
}
}
_ = send_datagram_flush.tick() => {
self.reset_inactive_timeout();
// Discard everything currently queued; log only once per flush.
let mut logged = false;
while self.relay_datagrams_send.try_recv().is_ok() {
if !logged {
debug!(?UNDELIVERABLE_DATAGRAM_TIMEOUT, "Dropping datagrams to send.");
logged = true;
}
}
}
// The inactivity timer is only considered when this is not the home relay.
_ = &mut self.inactive_timeout, if !self.is_home_relay => {
debug!(?RELAY_INACTIVE_CLEANUP_TIME, "Inactive, exiting.");
break None;
}
}
}
}
/// Returns a future that connects to the relay server, bounded by [`CONNECT_TIMEOUT`].
///
/// The returned future owns a clone of the client builder and does not borrow `self`.
fn dial_relay(&self) -> impl Future<Output = Result<Client, DialError>> + use<> {
    let client_builder = self.relay_client_builder.clone();
    async move {
        let outcome = time::timeout(CONNECT_TIMEOUT, client_builder.connect()).await;
        match outcome {
            // The timer fired before the connection attempt finished.
            Err(_) => Err(e!(DialError::Timeout {
                timeout: CONNECT_TIMEOUT
            })),
            Ok(connect_res) => connect_res.map_err(|err| e!(DialError::Connect, err)),
        }
    }
}
/// Runs the actor on an established relay connection.
///
/// Forwards datagrams in both directions, sends periodic pings and handles control
/// messages. Returns `Ok(())` when the actor should exit and an error when the connection
/// failed; the error is classified as handshake or established failure depending on
/// whether a pong was received on this connection.
async fn run_connected(
&mut self,
client: iroh_relay::client::Client,
) -> Result<(), RelayConnectionError> {
debug!("Actor loop: connected to relay");
event!(
target: "iroh::_events::relay::connected",
Level::DEBUG,
url = %self.url,
home_relay = self.is_home_relay,
);
let (mut client_stream, client_sink) = client.split();
// Convert sink errors into `RunError` so all send futures share one error type.
let mut client_sink = client_sink.sink_map_err(|e| e!(RunError::ClientStreamWrite, e));
let mut state = ConnectedRelayState {
ping_tracker: PingTracker::default(),
endpoints_present: BTreeSet::new(),
last_packet_src: None,
pong_pending: None,
established: false,
#[cfg(test)]
test_pong: None,
};
// Reused buffer for batches of outgoing datagrams.
let mut send_datagrams_buf = Vec::with_capacity(SEND_DATAGRAM_BATCH_SIZE);
let mut ping_interval = time::interval(PING_INTERVAL);
ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let res = loop {
// Answer a ping from the server before doing anything else.
if let Some(data) = state.pong_pending.take() {
let fut = client_sink.send(ClientToRelayMsg::Pong(data));
self.run_sending(fut, &mut state, &mut client_stream)
.await?;
}
// `biased` polls the branches in the order written.
tokio::select! {
biased;
_ = self.stop_token.cancelled() => {
debug!("Shutdown.");
break Ok(());
}
msg = self.prio_inbox.recv() => {
let Some(msg) = msg else {
warn!("Priority inbox closed, shutdown.");
break Ok(());
};
match msg {
ActiveRelayPrioMessage::HasEndpointRoute(peer, sender) => {
let has_peer = state.endpoints_present.contains(&peer);
sender.send(has_peer).ok();
}
}
}
_ = state.ping_tracker.timeout() => {
break Err(e!(RunError::PingTimeout));
}
_ = ping_interval.tick() => {
let data = state.ping_tracker.new_ping();
let fut = client_sink.send(ClientToRelayMsg::Ping(data));
self.run_sending(fut, &mut state, &mut client_stream).await?;
}
msg = self.inbox.recv() => {
let Some(msg) = msg else {
warn!("Inbox closed, shutdown.");
break Ok(());
};
match msg {
ActiveRelayMessage::SetHomeRelay(is_home) => {
self.set_home_relay(is_home);
// Becoming the home relay while already connected: publish that status.
if is_home {
self.my_relay.set_status(
&self.url,
HomeRelayStatus::Connected,
);
}
}
ActiveRelayMessage::CheckConnection { local_ips } => {
match client_stream.local_addr() {
// The local address is still present: verify the connection with a ping.
Some(addr) if local_ips.contains(&addr.ip()) => {
let data = state.ping_tracker.new_ping();
let fut = client_sink.send(ClientToRelayMsg::Ping(data));
self.run_sending(fut, &mut state, &mut client_stream).await?;
}
// Otherwise fail the connection so that the caller reconnects.
Some(_) => break Err(e!(RunError::LocalIpInvalid)),
None => break Err(e!(RunError::LocalAddrMissing)),
}
}
#[cfg(test)]
ActiveRelayMessage::GetLocalAddr(sender) => {
let addr = client_stream.local_addr();
sender.send(addr).ok();
}
#[cfg(test)]
ActiveRelayMessage::PingServer(sender) => {
let data = rand::random();
state.test_pong = Some((data, sender));
let fut = client_sink.send(ClientToRelayMsg::Ping(data));
self.run_sending(fut, &mut state, &mut client_stream).await?;
}
}
}
count = self.relay_datagrams_send.recv_many(
&mut send_datagrams_buf,
SEND_DATAGRAM_BATCH_SIZE,
) => {
// Zero received items is treated as the queue having been closed.
if count == 0 {
warn!("Datagram inbox closed, shutdown");
break Ok(());
};
// Outgoing traffic counts as activity.
self.reset_inactive_timeout();
let metrics = self.metrics.clone();
let packet_iter = send_datagrams_buf.drain(..).map(|item| {
metrics.send_relay.inc_by(item.datagrams.contents.len() as _);
Ok(ClientToRelayMsg::Datagrams {
dst_endpoint_id: item.remote_endpoint,
datagrams: item.datagrams,
})
});
let mut packet_stream = n0_future::stream::iter(packet_iter);
let fut = client_sink.send_all(&mut packet_stream);
self.run_sending(fut, &mut state, &mut client_stream).await?;
}
msg = client_stream.next() => {
let Some(msg) = msg else {
break Err(e!(RunError::StreamClosedServer));
};
match msg {
Ok(msg) => {
self.handle_relay_msg(msg, &mut state);
// A message was just received, so restart the ping interval.
ping_interval.reset();
},
Err(err) => break Err(e!(RunError::ClientStreamRead, err)),
}
}
// The inactivity timer is only considered when this is not the home relay.
_ = &mut self.inactive_timeout, if !self.is_home_relay => {
debug!("Inactive for {RELAY_INACTIVE_CLEANUP_TIME:?}, exiting (running).");
break Ok(());
}
}
};
// Only attempt a graceful close when the loop ended without an error.
if res.is_ok()
&& let Err(err) = client_sink.close().await
{
debug!("Failed to close client sink gracefully: {err:#}");
}
res.map_err(|err| state.map_err(err))
}
/// Processes a single message received from the relay server.
///
/// - Datagrams are forwarded to the receive queue (and dropped with a warning if the
///   queue does not accept them); their sender is recorded as reachable via this relay.
/// - `EndpointGone` forgets the endpoint, including the last-sender cache, so that a
///   returning endpoint is registered again.
/// - A ping schedules a pong; a pong is passed to the ping tracker and marks the
///   connection as established.
/// - Status and health messages are only logged.
fn handle_relay_msg(&mut self, msg: RelayToClientMsg, state: &mut ConnectedRelayState) {
    match msg {
        RelayToClientMsg::Datagrams {
            remote_endpoint_id,
            datagrams,
        } => {
            trace!(len = datagrams.contents.len(), "received msg");
            // `last_packet_src` caches the most recent sender so that a run of packets
            // from the same peer does not touch the set for every packet.
            if state.last_packet_src.as_ref() != Some(&remote_endpoint_id) {
                state.last_packet_src = Some(remote_endpoint_id);
                state.endpoints_present.insert(remote_endpoint_id);
            }
            if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
                url: self.url.clone(),
                src: remote_endpoint_id,
                datagrams,
            }) {
                warn!("Dropping received relay packet: {err:#}");
            }
        }
        RelayToClientMsg::EndpointGone(endpoint_id) => {
            state.endpoints_present.remove(&endpoint_id);
            // Invalidate the cache as well. Otherwise the next datagram from this
            // endpoint would be considered already registered and the endpoint would
            // never be re-added to `endpoints_present`.
            if state.last_packet_src.as_ref() == Some(&endpoint_id) {
                state.last_packet_src = None;
            }
        }
        // The pong is sent by the connected loop before its next select.
        RelayToClientMsg::Ping(data) => state.pong_pending = Some(data),
        RelayToClientMsg::Pong(data) => {
            #[cfg(test)]
            {
                if let Some((expected_data, sender)) = state.test_pong.take() {
                    if data == expected_data {
                        sender.send(()).ok();
                    } else {
                        // Not the pong the test is waiting for, keep waiting.
                        state.test_pong = Some((expected_data, sender));
                    }
                }
            }
            state.ping_tracker.pong_received(data);
            state.established = true;
        }
        RelayToClientMsg::Status(status) => match status {
            Status::Healthy => info!("Relay server reports: {status}"),
            _ => warn!("Relay server reports problem: {status}"),
        },
        RelayToClientMsg::Restarting { .. } => {
            trace!("Ignoring {msg:?}")
        }
        RelayToClientMsg::Health { problem } => {
            warn!("Relay server reports problem: {problem}");
        }
        _ => unreachable!(
            "got unknown RelayToClientMsg but iroh is released in sync with iroh-relay"
        ),
    }
}
/// Drives `sending_fut` to completion while continuing to service the priority inbox and
/// the incoming relay stream.
///
/// Returns `Ok(())` when the send finished, but also when the actor was cancelled, the
/// priority inbox was closed or the inactivity timer fired. Returns an error when the send
/// failed or timed out, the ping tracker timed out, or reading from the relay failed; the
/// error is classified via [`ConnectedRelayState::map_err`].
#[instrument(name = "tx", skip_all)]
async fn run_sending<T>(
&mut self,
sending_fut: impl Future<Output = Result<T, RunError>>,
state: &mut ConnectedRelayState,
client_stream: &mut iroh_relay::client::ClientStream,
) -> Result<(), RelayConnectionError> {
// A send may take as long as one ping interval.
let send_timeout = PING_INTERVAL;
let mut timeout = pin!(time::sleep(send_timeout));
let mut sending_fut = pin!(sending_fut);
let res = loop {
// `biased` polls the branches in the order written.
tokio::select! {
biased;
_ = self.stop_token.cancelled() => {
break Ok(());
}
_ = &mut timeout => {
break Err(e!(RunError::SendTimeout));
}
msg = self.prio_inbox.recv() => {
let Some(msg) = msg else {
warn!("Priority inbox closed, shutdown.");
break Ok(());
};
match msg {
ActiveRelayPrioMessage::HasEndpointRoute(peer, sender) => {
let has_peer = state.endpoints_present.contains(&peer);
sender.send(has_peer).ok();
}
}
}
res = &mut sending_fut => {
match res {
Ok(_) => break Ok(()),
Err(err) => break Err(err),
}
}
_ = state.ping_tracker.timeout() => {
break Err(e!(RunError::PingTimeout));
}
// Keep reading from the relay while the send is in progress.
msg = client_stream.next() => {
let Some(msg) = msg else {
break Err(e!(RunError::StreamClosedServer));
};
match msg {
Ok(msg) => self.handle_relay_msg(msg, state),
Err(err) => break Err(e!(RunError::ClientStreamRead, err)),
}
}
// The inactivity timer is only considered when this is not the home relay.
_ = &mut self.inactive_timeout, if !self.is_home_relay => {
debug!("Inactive for {RELAY_INACTIVE_CLEANUP_TIME:?}, exiting (sending).");
break Ok(());
}
}
};
res.map_err(|err| state.map_err(err))
}
}
/// Per-connection state of an [`ActiveRelayActor`]; created fresh for every connection.
#[derive(Debug)]
struct ConnectedRelayState {
/// Tracks the outstanding ping and its timeout.
ping_tracker: PingTracker,
/// Endpoints from which datagrams were received on this connection and which were not
/// reported gone since.
endpoints_present: BTreeSet<EndpointId>,
/// Sender of the most recently received datagram; used to skip redundant set insertions.
last_packet_src: Option<EndpointId>,
/// Payload of a ping from the server that still has to be answered with a pong.
pong_pending: Option<[u8; 8]>,
/// Set once a pong was received from the relay server.
established: bool,
/// Test only: expected pong payload and the channel to signal when it arrives.
#[cfg(test)]
test_pong: Option<([u8; 8], oneshot::Sender<()>)>,
}
impl ConnectedRelayState {
fn map_err(&self, error: RunError) -> RelayConnectionError {
if self.established {
e!(RelayConnectionError::Established, error)
} else {
e!(RelayConnectionError::Handshake, error)
}
}
}
/// Messages handled by the [`RelayActor`].
pub(super) enum RelayActorMessage {
/// Triggers a connection check on all active relay connections and logs the active relays.
MaybeCloseRelaysOnRebind,
/// A new net report is available; its preferred relay becomes the home relay.
NetworkChange {
report: Report,
},
/// Triggers a connection check on all active relay connections.
CheckConnectionAfterNetworkChange,
}
/// Datagrams queued for sending through a relay server.
#[derive(Debug, Clone)]
pub(crate) struct RelaySendItem {
/// The endpoint the datagrams are addressed to.
pub(crate) remote_endpoint: EndpointId,
/// The relay server through which the datagrams should be sent.
pub(crate) url: RelayUrl,
/// The datagrams to send.
pub(crate) datagrams: Datagrams,
}
/// Manages one [`ActiveRelayActor`] per relay server and routes outgoing datagrams to them.
pub(super) struct RelayActor {
/// Shared configuration handed to every spawned relay connection.
config: Config,
/// Queue onto which all relay connections push received datagrams.
relay_datagram_recv_queue: mpsc::Sender<RelayRecvDatagram>,
/// Handles of the currently known relay actors, keyed by relay URL.
active_relays: BTreeMap<RelayUrl, ActiveRelayHandle>,
/// The spawned relay actor tasks.
active_relay_tasks: JoinSet<()>,
/// Cancelling this token stops this actor and, through child tokens, all relay actors.
cancel_token: CancellationToken,
}
/// Configuration of the [`RelayActor`].
#[derive(Debug, Clone)]
pub(crate) struct Config {
/// Shared home relay URL and connection status.
pub my_relay: HomeRelayWatch,
/// Secret key used for relay connections.
pub secret_key: SecretKey,
/// DNS resolver used for relay connections (not available in browsers).
#[cfg(not(wasm_browser))]
pub dns_resolver: DnsResolver,
/// Optional proxy for relay connections.
pub proxy_url: Option<Url>,
/// Flag passed to relay connections as their IPv6 preference.
pub ipv6_reported: Arc<AtomicBool>,
/// TLS client configuration for relay connections.
pub tls_config: rustls::ClientConfig,
/// Socket metrics.
pub metrics: Arc<SocketMetrics>,
}
/// Connection status of the home relay.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) enum HomeRelayStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The relay connection is up.
    Connected,
    /// The relay connection failed or was lost.
    Disconnected,
}

impl HomeRelayStatus {
    /// Returns `true` only for [`HomeRelayStatus::Connected`].
    pub(crate) fn is_connected(&self) -> bool {
        *self == Self::Connected
    }
}
/// Watchable holder of the current home relay URL together with its connection status.
///
/// Holds `None` while no home relay is selected.
#[derive(Debug, Clone)]
pub(crate) struct HomeRelayWatch {
/// The watched value: home relay URL and status, if any.
inner: Watchable<Option<(RelayUrl, HomeRelayStatus)>>,
}
impl Default for HomeRelayWatch {
    /// Starts without a home relay.
    fn default() -> Self {
        let inner = Watchable::new(None);
        Self { inner }
    }
}
impl HomeRelayWatch {
    /// Makes `url` the home relay with the given status.
    fn set(&self, url: RelayUrl, status: HomeRelayStatus) {
        let value = Some((url, status));
        let _ = self.inner.set(value);
    }

    /// Removes the home relay.
    fn clear(&self) {
        let _ = self.inner.set(None);
    }

    /// Updates the status, but only if `url` is the current home relay.
    ///
    /// Status reports for any other relay are ignored.
    fn set_status(&self, url: &RelayUrl, status: HomeRelayStatus) {
        let current = self.inner.get();
        let is_home = matches!(&current, Some((home_url, _)) if home_url == url);
        if !is_home {
            return;
        }
        let _ = self.inner.set(Some((url.clone(), status)));
    }

    /// Returns the current home relay and its status.
    fn get(&self) -> Option<(RelayUrl, HomeRelayStatus)> {
        self.inner.get()
    }

    /// Returns a watcher for the home relay and its status.
    pub(crate) fn watch(&self) -> n0_watcher::Direct<Option<(RelayUrl, HomeRelayStatus)>> {
        self.inner.watch()
    }
}
impl RelayActor {
pub(super) fn new(
config: Config,
relay_datagram_recv_queue: mpsc::Sender<RelayRecvDatagram>,
cancel_token: CancellationToken,
) -> Self {
Self {
config,
relay_datagram_recv_queue,
active_relays: Default::default(),
active_relay_tasks: JoinSet::new(),
cancel_token,
}
}
/// Runs the relay actor until it is cancelled or one of its input channels is closed.
///
/// `receiver` delivers control messages and `datagram_send_channel` delivers outgoing
/// datagrams. On exit, all active relay actors are asked to stop, with at most three
/// seconds allowed for them to finish.
pub(super) async fn run(
mut self,
mut receiver: mpsc::Receiver<RelayActorMessage>,
mut datagram_send_channel: mpsc::Receiver<RelaySendItem>,
) {
// Holds the pending send to a full relay actor queue, if there is one. While it is set,
// no further datagrams are taken from `datagram_send_channel`.
let mut datagram_send_fut = std::pin::pin!(MaybeFuture::none());
loop {
// `biased` polls the branches in the order written.
tokio::select! {
biased;
_ = self.cancel_token.cancelled() => {
debug!("shutting down");
break;
}
// A relay actor task finished: log abnormal exits and clean up its handle.
Some(res) = self.active_relay_tasks.join_next() => {
match res {
Ok(()) => (),
Err(err) if err.is_panic() => {
error!("ActiveRelayActor task panicked: {err:#?}");
}
Err(err) if err.is_cancelled() => {
error!("ActiveRelayActor cancelled: {err:#?}");
}
Err(err) => error!("ActiveRelayActor failed: {err:#?}"),
}
self.reap_active_relays();
}
msg = receiver.recv() => {
let Some(msg) = msg else {
debug!("Inbox dropped, shutting down.");
break;
};
// Abort message handling if the actor is cancelled in the meantime.
let cancel_token = self.cancel_token.child_token();
cancel_token.run_until_cancelled(self.handle_msg(msg)).await;
}
item = datagram_send_channel.recv(), if datagram_send_fut.is_none() => {
let Some(item) = item else {
debug!("Datagram send channel dropped, shutting down.");
break;
};
let token = self.cancel_token.child_token();
// A returned future means the target queue was full; park it until it completes.
if let Some(Some(fut)) = token.run_until_cancelled(
self.try_send_datagram(item)
).await {
datagram_send_fut.as_mut().set_future(fut);
}
}
_ = &mut datagram_send_fut, if datagram_send_fut.is_some() => {
datagram_send_fut.as_mut().set_none();
}
}
}
if time::timeout(Duration::from_secs(3), self.close_all_active_relays())
.await
.is_err()
{
warn!("Failed to shut down all ActiveRelayActors");
}
}
async fn handle_msg(&mut self, msg: RelayActorMessage) {
match msg {
RelayActorMessage::NetworkChange { report } => {
self.on_network_change(report).await;
}
RelayActorMessage::MaybeCloseRelaysOnRebind => {
self.maybe_close_relays_on_rebind().await;
}
RelayActorMessage::CheckConnectionAfterNetworkChange => {
self.check_connection_after_network_change().await;
}
}
}
async fn try_send_datagram(
&mut self,
item: RelaySendItem,
) -> Option<impl Future<Output = ()> + use<>> {
let url = item.url.clone();
let handle = self
.active_relay_handle_for_endpoint(&item.url, &item.remote_endpoint)
.await;
match handle.datagrams_send_queue.try_send(item) {
Ok(()) => None,
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!(?url, "Dropped datagram(s): ActiveRelayActor closed.");
None
}
Err(mpsc::error::TrySendError::Full(item)) => {
let sender = handle.datagrams_send_queue.clone();
let fut = async move {
if sender.send(item).await.is_err() {
warn!(?url, "Dropped datagram(s): ActiveRelayActor closed.");
}
};
Some(fut)
}
}
}
/// Applies the preferred relay of a new net report.
///
/// Nothing happens when the preferred relay equals the current home relay. Otherwise the
/// new relay becomes the home relay in `Connecting` state, or the home relay is cleared
/// when the report has no preferred relay.
async fn on_network_change(&mut self, report: Report) {
    let prev = self.config.my_relay.get();
    let prev_url = prev.as_ref().map(|(url, _status)| url);
    if prev_url == report.preferred_relay.as_ref() {
        return;
    }
    match report.preferred_relay {
        None => self.config.my_relay.clear(),
        Some(relay_url) => {
            self.config.metrics.relay_home_change.inc();
            info!("home is now relay {}, was {:?}", relay_url, prev_url);
            self.config
                .my_relay
                .set(relay_url.clone(), HomeRelayStatus::Connecting);
            self.set_home_relay(relay_url).await;
        }
    }
}
/// Tells every active relay actor whether it is the home relay and makes sure an actor
/// exists for `home_url`.
async fn set_home_relay(&mut self, home_url: RelayUrl) {
    let target = &home_url;
    let notifications = self.active_relays.iter().map(|(relay_url, relay)| {
        let msg = ActiveRelayMessage::SetHomeRelay(relay_url == target);
        // Failing to reach an actor that already stopped is not an error.
        async move { relay.inbox_addr.send(msg).await.ok() }
    });
    n0_future::join_all(notifications).await;
    self.active_relay_handle(home_url);
}
/// Picks the relay actor through which `remote_endpoint` should be reached.
///
/// An existing actor for `url` is used directly. Otherwise all active relay actors are
/// asked whether they have seen the endpoint, and the first one that answers yes is used.
/// If none does, an actor for `url` is started.
async fn active_relay_handle_for_endpoint(
    &mut self,
    url: &RelayUrl,
    remote_endpoint: &EndpointId,
) -> ActiveRelayHandle {
    if let Some(existing) = self.active_relays.get(url).cloned() {
        return existing;
    }
    let found_relay: Option<RelayUrl> = {
        // One probe per active relay; each resolves to the relay URL on a positive answer.
        let probes = self.active_relays.iter().map(|(relay_url, relay)| async move {
            let (reply_tx, reply_rx) = oneshot::channel();
            relay
                .prio_inbox_addr
                .send(ActiveRelayPrioMessage::HasEndpointRoute(
                    *remote_endpoint,
                    reply_tx,
                ))
                .await
                .ok();
            matches!(reply_rx.await, Ok(true)).then(|| relay_url.clone())
        });
        let mut pending = FuturesUnorderedBounded::from_iter(probes);
        let mut hit = None;
        while let Some(answer) = pending.next().await {
            if answer.is_some() {
                hit = answer;
                break;
            }
        }
        hit
    };
    let target = match found_relay {
        Some(relay_url) => relay_url,
        None => url.clone(),
    };
    self.active_relay_handle(target)
}
/// Returns the handle for the relay actor of `url`, starting the actor if necessary.
///
/// A newly started actor for the current home relay is immediately told that it is the
/// home relay.
fn active_relay_handle(&mut self, url: RelayUrl) -> ActiveRelayHandle {
    if let Some(existing) = self.active_relays.get(&url) {
        return existing.clone();
    }
    let handle = self.start_active_relay(url.clone());
    let home = self.config.my_relay.get();
    let is_home = matches!(&home, Some((home_url, _)) if *home_url == url);
    if is_home
        && let Err(err) = handle
            .inbox_addr
            .try_send(ActiveRelayMessage::SetHomeRelay(true))
    {
        error!("Home relay not set, send to new actor failed: {err:#}.");
    }
    self.active_relays.insert(url, handle.clone());
    self.log_active_relay();
    handle
}
/// Spawns a new [`ActiveRelayActor`] for `url` and returns the handle to talk to it.
///
/// The handle is not inserted into `active_relays`; that is left to the caller.
fn start_active_relay(&mut self, url: RelayUrl) -> ActiveRelayHandle {
    debug!(?url, "Adding relay connection");
    // Channel pairs: the sending halves go into the handle, the receiving halves into
    // the actor.
    let (inbox_addr, inbox) = mpsc::channel(64);
    let (prio_inbox_addr, prio_inbox_) = mpsc::channel(32);
    let (datagrams_send_queue, relay_datagrams_send) = mpsc::channel(64);
    let span = info_span!("active-relay", %url);
    let actor = ActiveRelayActor::new(ActiveRelayActorOptions {
        url,
        prio_inbox_,
        inbox,
        relay_datagrams_send,
        relay_datagrams_recv: self.relay_datagram_recv_queue.clone(),
        connection_opts: RelayConnectionOptions {
            secret_key: self.config.secret_key.clone(),
            #[cfg(not(wasm_browser))]
            dns_resolver: self.config.dns_resolver.clone(),
            proxy_url: self.config.proxy_url.clone(),
            prefer_ipv6: self.config.ipv6_reported.clone(),
            tls_config: self.config.tls_config.clone(),
        },
        stop_token: self.cancel_token.child_token(),
        metrics: self.config.metrics.clone(),
        my_relay: self.config.my_relay.clone(),
    });
    self.active_relay_tasks
        .spawn(async move { actor.run().await }.instrument(span));
    self.log_active_relay();
    ActiveRelayHandle {
        prio_inbox_addr,
        inbox_addr,
        datagrams_send_queue,
    }
}
/// Asks all active relay actors to check their connection after a network change.
async fn check_connection_after_network_change(&mut self) {
self.send_check_connection().await;
}
/// Asks all active relay actors to check their connection and logs the active relays.
///
/// Actors whose connection no longer uses a valid local IP fail their connection in
/// response to the check.
async fn maybe_close_relays_on_rebind(&mut self) {
self.send_check_connection().await;
self.log_active_relay();
}
/// Sends a `CheckConnection` message carrying the current local IP addresses to every
/// active relay actor.
///
/// In browsers the list of local IPs is always empty.
async fn send_check_connection(&self) {
    #[cfg(not(wasm_browser))]
    let local_ips: Vec<_> = {
        let netifs = interfaces::State::new().await;
        let mut ips = Vec::new();
        for netif in netifs.interfaces.values() {
            ips.extend(netif.addrs().map(|ipnet| ipnet.addr()));
        }
        ips
    };
    #[cfg(wasm_browser)]
    let local_ips = Vec::new();
    let notifications = self.active_relays.values().map(|handle| {
        let msg = ActiveRelayMessage::CheckConnection {
            local_ips: local_ips.clone(),
        };
        async move {
            // An actor that already stopped simply does not receive the message.
            handle.inbox_addr.send(msg).await.ok();
        }
    });
    n0_future::join_all(notifications).await;
}
/// Drops the handles of relay actors whose inbox is closed and makes sure an actor for
/// the home relay exists afterwards.
fn reap_active_relays(&mut self) {
    self.active_relays
        .retain(|_, relay| !relay.inbox_addr.is_closed());
    if let Some((home_url, _status)) = self.config.my_relay.get() {
        self.active_relay_handle(home_url);
    }
    self.log_active_relay();
}
async fn close_all_active_relays(&mut self) {
self.cancel_token.cancel();
let tasks = std::mem::take(&mut self.active_relay_tasks);
tasks.join_all().await;
self.log_active_relay();
}
/// Logs the number of active relay connections followed by their URLs.
fn log_active_relay(&self) {
    debug!(
        "{} active relay conns{}",
        self.active_relays.len(),
        if self.active_relays.is_empty() {
            String::new()
        } else {
            self.active_relay_sorted()
                .fold(String::from(":"), |mut listing, endpoint| {
                    listing += &format!(" relay-{endpoint}");
                    listing
                })
        }
    );
}
/// Returns the URLs of all active relays in ascending order.
///
/// The URLs are the keys of a `BTreeMap`, which iterates in ascending key order, so no
/// extra sort is needed. They are collected into an owned list so the returned iterator
/// does not borrow `self`.
fn active_relay_sorted(&self) -> impl Iterator<Item = RelayUrl> + use<> {
    let urls: Vec<RelayUrl> = self.active_relays.keys().cloned().collect();
    urls.into_iter()
}
}
/// The sending halves of the channels of one [`ActiveRelayActor`].
#[derive(Debug, Clone)]
struct ActiveRelayHandle {
/// Sender for priority control messages.
prio_inbox_addr: mpsc::Sender<ActiveRelayPrioMessage>,
/// Sender for regular control messages.
inbox_addr: mpsc::Sender<ActiveRelayMessage>,
/// Sender for datagrams that the actor should send to its relay server.
datagrams_send_queue: mpsc::Sender<RelaySendItem>,
}
/// Datagrams received from a relay server.
#[derive(Debug)]
pub(crate) struct RelayRecvDatagram {
/// The relay server the datagrams were received from.
pub(crate) url: RelayUrl,
/// The endpoint that sent the datagrams.
pub(crate) src: EndpointId,
/// The received datagrams.
pub(crate) datagrams: Datagrams,
}
#[cfg(test)]
mod tests {
use std::{
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use iroh_base::{EndpointId, RelayUrl, SecretKey};
use iroh_relay::{
PingTracker,
protos::relay::Datagrams,
tls::{CaRootsConfig, default_provider},
};
use n0_error::{AnyError as Error, Result, StackResultExt, StdResultExt};
use n0_tracing_test::traced_test;
use tokio::sync::{mpsc, oneshot};
use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
use tracing::{Instrument, info, info_span};
use super::{
ActiveRelayActor, ActiveRelayActorOptions, ActiveRelayMessage, ActiveRelayPrioMessage,
RELAY_INACTIVE_CLEANUP_TIME, RelayConnectionOptions, RelayRecvDatagram, RelaySendItem,
UNDELIVERABLE_DATAGRAM_TIMEOUT,
};
use crate::{dns::DnsResolver, test_utils};
/// Spawns an [`ActiveRelayActor`] for tests.
///
/// The actor uses a fresh DNS resolver, no proxy, prefers IPv6 and skips TLS certificate
/// verification. The task is aborted when the returned handle is dropped.
#[allow(clippy::too_many_arguments)]
fn start_active_relay_actor(
    secret_key: SecretKey,
    stop_token: CancellationToken,
    url: RelayUrl,
    prio_inbox_rx: mpsc::Receiver<ActiveRelayPrioMessage>,
    inbox_rx: mpsc::Receiver<ActiveRelayMessage>,
    relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
    relay_datagrams_recv: mpsc::Sender<RelayRecvDatagram>,
    span: tracing::Span,
) -> AbortOnDropHandle<()> {
    let tls_config = CaRootsConfig::insecure_skip_verify()
        .client_config(default_provider())
        .expect("infallible");
    let connection_opts = RelayConnectionOptions {
        secret_key,
        dns_resolver: DnsResolver::new(),
        proxy_url: None,
        prefer_ipv6: Arc::new(AtomicBool::new(true)),
        tls_config,
    };
    let actor = ActiveRelayActor::new(ActiveRelayActorOptions {
        url,
        prio_inbox_: prio_inbox_rx,
        inbox: inbox_rx,
        relay_datagrams_send,
        relay_datagrams_recv,
        connection_opts,
        stop_token,
        metrics: Default::default(),
        my_relay: Default::default(),
    });
    AbortOnDropHandle::new(tokio::spawn(actor.run().instrument(span)))
}
/// Starts an endpoint that connects to the relay and echoes every received datagram back
/// to its sender.
///
/// Returns the endpoint's id together with a handle; dropping the handle stops the
/// endpoint.
fn start_echo_endpoint(relay_url: RelayUrl) -> (EndpointId, AbortOnDropHandle<()>) {
    let secret_key = SecretKey::from_bytes(&[8u8; 32]);
    let (recv_datagram_tx, mut recv_datagram_rx) = mpsc::channel(16);
    let (send_datagram_tx, send_datagram_rx) = mpsc::channel(16);
    let (prio_inbox_tx, prio_inbox_rx) = mpsc::channel(8);
    let (inbox_tx, inbox_rx) = mpsc::channel(16);
    let cancel_token = CancellationToken::new();
    let actor_task = start_active_relay_actor(
        secret_key.clone(),
        cancel_token.clone(),
        relay_url.clone(),
        prio_inbox_rx,
        inbox_rx,
        send_datagram_rx,
        recv_datagram_tx,
        info_span!("echo-endpoint"),
    );
    let echo_task = tokio::spawn({
        let relay_url = relay_url.clone();
        async move {
            // Stop once the receive channel is closed. Looping on a closed channel would
            // spin, because `recv` then returns `None` immediately on every call.
            while let Some(recv) = recv_datagram_rx.recv().await {
                let RelayRecvDatagram {
                    url: _,
                    src,
                    datagrams,
                } = recv;
                info!(from = %src.fmt_short(), "Received datagram");
                let send = RelaySendItem {
                    remote_endpoint: src,
                    url: relay_url.clone(),
                    datagrams,
                };
                send_datagram_tx.send(send).await.ok();
            }
        }
        .instrument(info_span!("echo-task"))
    });
    let echo_task = AbortOnDropHandle::new(echo_task);
    let supervisor_task = tokio::spawn(async move {
        // Cancel the actor when the supervisor ends, and keep the inbox senders alive
        // until then so that the actor does not see closed inboxes.
        let _guard = cancel_token.drop_guard();
        let _prio_inbox_tx = prio_inbox_tx;
        let _inbox_tx = inbox_tx;
        tokio::select! {
            biased;
            _ = actor_task => (),
            _ = echo_task => (),
        };
    });
    let supervisor_task = AbortOnDropHandle::new(supervisor_task);
    (secret_key.public(), supervisor_task)
}
/// Sends `item` and waits for it to be echoed back, retrying until it succeeds.
///
/// A single attempt is given [`UNDELIVERABLE_DATAGRAM_TIMEOUT`]; the whole operation
/// panics after ten seconds.
async fn send_recv_echo(
item: RelaySendItem,
tx: &mpsc::Sender<RelaySendItem>,
rx: &mut mpsc::Receiver<RelayRecvDatagram>,
) -> Result<()> {
tokio::time::timeout(Duration::from_secs(10), async move {
loop {
let res = tokio::time::timeout(UNDELIVERABLE_DATAGRAM_TIMEOUT, async {
tx.send(item.clone()).await.std_context("send item")?;
let RelayRecvDatagram {
url: _,
src: _,
datagrams,
} = rx.recv().await.unwrap();
assert_eq!(datagrams, item.datagrams);
Ok::<_, Error>(())
})
.await;
// Only the inner timeout is checked here; an attempt that timed out is retried.
if res.is_ok() {
break;
}
}
})
.await
.expect("overall timeout exceeded");
Ok(())
}
/// Checks that an [`ActiveRelayActor`] keeps delivering datagrams across connection checks,
/// including one that reports no valid local IPs.
#[tokio::test]
#[traced_test]
async fn test_active_relay_reconnect() -> Result {
let (_relay_map, relay_url, _server) = test_utils::run_relay_server().await?;
let (peer_endpoint, _echo_endpoint_task) = start_echo_endpoint(relay_url.clone());
let secret_key = SecretKey::from_bytes(&[1u8; 32]);
let (datagram_recv_tx, mut datagram_recv_rx) = mpsc::channel(16);
let (send_datagram_tx, send_datagram_rx) = mpsc::channel(16);
let (_prio_inbox_tx, prio_inbox_rx) = mpsc::channel(8);
let (inbox_tx, inbox_rx) = mpsc::channel(16);
let cancel_token = CancellationToken::new();
let task = start_active_relay_actor(
secret_key,
cancel_token.clone(),
relay_url.clone(),
prio_inbox_rx,
inbox_rx,
send_datagram_rx,
datagram_recv_tx.clone(),
info_span!("actor-under-test"),
);
// Baseline: a datagram makes the round trip through the echo endpoint.
info!("first echo");
let hello_send_item = RelaySendItem {
remote_endpoint: peer_endpoint,
url: relay_url.clone(),
datagrams: Datagrams::from(b"hello"),
};
send_recv_echo(
hello_send_item.clone(),
&send_datagram_tx,
&mut datagram_recv_rx,
)
.await?;
// Fetch the connection's local address to build a connection check that should pass.
let (tx, rx) = oneshot::channel();
inbox_tx
.send(ActiveRelayMessage::GetLocalAddr(tx))
.await
.std_context("send get local addr msg")?;
let local_addr = rx
.await
.std_context("wait for local addr msg")?
.context("no local addr")?;
info!(?local_addr, "check connection with addr");
inbox_tx
.send(ActiveRelayMessage::CheckConnection {
local_ips: vec![local_addr.ip()],
})
.await
.std_context("send check connection message")?;
// Messages are handled in order, so once this reply arrives the check was processed.
let (tx, rx) = oneshot::channel();
inbox_tx
.send(ActiveRelayMessage::GetLocalAddr(tx))
.await
.std_context("send get local addr msg")?;
rx.await.std_context("recv send local addr msg")?;
info!("second echo");
send_recv_echo(
hello_send_item.clone(),
&send_datagram_tx,
&mut datagram_recv_rx,
)
.await?;
// A check without any local IPs makes the actor fail its current connection.
info!("check connection");
inbox_tx
.send(ActiveRelayMessage::CheckConnection {
local_ips: Vec::new(),
})
.await
.std_context("send check connection msg")?;
tokio::time::sleep(Duration::from_millis(10)).await;
// The echo must still work, which requires the actor to have reconnected.
info!("third echo");
send_recv_echo(
hello_send_item.clone(),
&send_datagram_tx,
&mut datagram_recv_rx,
)
.await?;
cancel_token.cancel();
task.await.std_context("wait for task to finish")?;
Ok(())
}
/// Checks that an idle, non-home [`ActiveRelayActor`] exits only after
/// [`RELAY_INACTIVE_CLEANUP_TIME`] has passed.
#[tokio::test]
#[traced_test]
async fn test_active_relay_inactive() -> Result {
let (_relay_map, relay_url, _server) = test_utils::run_relay_server().await?;
let secret_key = SecretKey::from_bytes(&[1u8; 32]);
let (datagram_recv_tx, _datagram_recv_rx) = mpsc::channel(16);
let (_send_datagram_tx, send_datagram_rx) = mpsc::channel(16);
let (_prio_inbox_tx, prio_inbox_rx) = mpsc::channel(8);
let (inbox_tx, inbox_rx) = mpsc::channel(16);
let cancel_token = CancellationToken::new();
let mut task = start_active_relay_actor(
secret_key,
cancel_token.clone(),
relay_url,
prio_inbox_rx,
inbox_rx,
send_datagram_rx,
datagram_recv_tx,
info_span!("actor-under-test"),
);
// Wait until the actor is connected: a server ping is only answered once it is.
tokio::time::timeout(Duration::from_secs(5), async {
loop {
let (tx, rx) = oneshot::channel();
inbox_tx.send(ActiveRelayMessage::PingServer(tx)).await.ok();
if tokio::time::timeout(Duration::from_millis(200), rx)
.await
.map(|resp| resp.is_ok())
.unwrap_or_default()
{
break;
}
}
})
.await
.std_context("timeout")?;
// Half of the cleanup time must not stop the actor.
info!("Stepping time forwards by RELAY_INACTIVE_CLEANUP_TIME / 2");
tokio::time::pause();
tokio::time::advance(RELAY_INACTIVE_CLEANUP_TIME / 2).await;
tokio::time::resume();
assert!(
tokio::time::timeout(Duration::from_millis(100), &mut task)
.await
.is_err(),
"actor task terminated"
);
// After another full cleanup time the actor must have exited.
info!("Stepping time forwards by RELAY_INACTIVE_CLEANUP_TIME");
tokio::time::pause();
tokio::time::advance(RELAY_INACTIVE_CLEANUP_TIME).await;
tokio::time::resume();
assert!(
tokio::time::timeout(Duration::from_secs(1), task)
.await
.is_ok(),
"actor task still running"
);
cancel_token.cancel();
Ok(())
}
/// Exercises the timeout behaviour of [`PingTracker`] with paused time.
#[tokio::test]
async fn test_ping_tracker() {
tokio::time::pause();
let mut tracker = PingTracker::default();
let ping0 = tracker.new_ping();
// One second after sending the ping the timeout has not fired yet.
let res = tokio::time::timeout(Duration::from_secs(1), tracker.timeout()).await;
assert!(res.is_err(), "no ping timeout has elapsed yet");
// Once the pong is received, no timeout fires for this ping.
tracker.pong_received(ping0);
let res = tokio::time::timeout(Duration::from_secs(10), tracker.timeout()).await;
assert!(res.is_err(), "ping completed before timeout");
// An unanswered ping times out within ten seconds.
let _ping1 = tracker.new_ping();
let res = tokio::time::timeout(Duration::from_secs(10), tracker.timeout()).await;
assert!(res.is_ok(), "ping timeout should have happened");
// A timeout that already lies in the past is reported right away, and only once.
let _ping2 = tracker.new_ping();
tokio::time::sleep(Duration::from_secs(10)).await;
let res = tokio::time::timeout(Duration::from_millis(1), tracker.timeout()).await;
assert!(res.is_ok(), "ping timeout happened in the past");
let res = tokio::time::timeout(Duration::from_secs(10), tracker.timeout()).await;
assert!(res.is_err(), "ping timeout should only happen once");
}
/// Checks that `HomeRelayWatch::set_status` only applies to the current home relay URL.
#[test]
fn test_home_relay_watch_url_guard() {
use super::{HomeRelayStatus::*, HomeRelayWatch};
let watch = HomeRelayWatch::default();
let a: RelayUrl = "https://a.example.com".parse().unwrap();
let b: RelayUrl = "https://b.example.com".parse().unwrap();
// A status update for the current home relay is applied.
watch.set(a.clone(), Connecting);
watch.set_status(&a, Connected);
assert_eq!(watch.get(), Some((a.clone(), Connected)));
// After switching to `b`, a late status update for `a` is ignored.
watch.set(b.clone(), Connecting);
watch.set_status(&a, Disconnected);
assert_eq!(watch.get(), Some((b.clone(), Connecting)));
watch.set_status(&b, Connected);
assert_eq!(watch.get(), Some((b.clone(), Connected)));
}
}