use std::sync::Arc;
use std::time::Duration;
use kameo::actor::ActorRef;
use kameo::message::Message;
use ts_netmon::{LinkMonitor, LinkMonitorHandle};
use crate::derp_latency::MeasureNow;
use crate::direct::{DirectManager, RebindAndReprobe};
use crate::{Env, Error};
/// Minimum spacing between two link-change reactions (rebind + re-probe).
///
/// Link changes that arrive sooner than this after the previous reaction are
/// coalesced into a single deferred reaction.
const MIN_REBIND_INTERVAL: Duration = Duration::from_millis(1_000);
/// Actor that reacts to link-change events from a [`LinkMonitor`] by asking the
/// direct-connection manager to rebind and re-probe.
///
/// The actor only owns the monitor handle; the reaction loop itself runs in a
/// task spawned from `on_start`.
pub struct NetmonSupervisor {
    /// Held for the actor's lifetime and never read. Presumably dropping it
    /// stops the monitor (ts_netmon behavior; confirm).
    _handle: LinkMonitorHandle,
}

/// Start-up arguments for [`NetmonSupervisor`].
pub struct NetmonSupervisorArgs {
    /// Source of link-change events. If `watch` fails, a no-op monitor is used instead.
    pub monitor: Arc<dyn LinkMonitor>,
    /// Receives `RebindAndReprobe` for each rate-limited link change.
    pub direct: ActorRef<DirectManager>,
    /// Supplies the shutdown signal and the `MeasureNow` publisher.
    pub env: Env,
}
impl kameo::Actor for NetmonSupervisor {
    type Args = NetmonSupervisorArgs;
    type Error = Error;

    /// Starts watching for link changes and spawns the reaction loop.
    ///
    /// If `args.monitor` fails to start, the error is logged and
    /// `ts_netmon::NoopLinkMonitor` is watched instead. The actor therefore still
    /// starts, but link-change auto-rebind is effectively disabled.
    ///
    /// # Errors
    /// Never returns `Err`. The only failure path is the `expect` on the no-op
    /// monitor, which panics.
    async fn on_start(args: Self::Args, _slf: ActorRef<Self>) -> Result<Self, Self::Error> {
        let NetmonSupervisorArgs {
            monitor,
            direct,
            env,
        } = args;

        // Both outcomes produce the same (event stream, handle) pair,
        // so a single spawn below serves both.
        let (mut events, handle) = match monitor.watch(env.shutdown.clone()) {
            Ok(pair) => {
                tracing::debug!("network-monitor supervisor running");
                pair
            }
            Err(e) => {
                tracing::error!(error = %e, "network monitor failed to start; link-change auto-rebind disabled");
                ts_netmon::NoopLinkMonitor
                    .watch(env.shutdown.clone())
                    .expect("noop monitor watch is infallible")
            }
        };

        // Detached task. `run` returns on shutdown or when the event stream closes.
        tokio::spawn(async move {
            run(&mut events, &direct, &env).await;
        });

        Ok(Self { _handle: handle })
    }
}
async fn run<A>(
events: &mut tokio::sync::mpsc::Receiver<ts_netmon::LinkChangeEvent>,
direct: &ActorRef<A>,
env: &Env,
) where
A: kameo::Actor + Message<RebindAndReprobe, Reply = Result<(), ts_magicsock::Error>>,
{
async fn react<A>(direct: &ActorRef<A>, env: &Env) -> tokio::time::Instant
where
A: kameo::Actor + Message<RebindAndReprobe, Reply = Result<(), ts_magicsock::Error>>,
{
tracing::debug!("link change: rebinding + re-probing connectivity");
if let Err(e) = direct.ask(RebindAndReprobe).await {
tracing::warn!(error = %e, "rebind-and-reprobe on link change");
}
let done = tokio::time::Instant::now();
if let Err(e) = env.publish(MeasureNow).await {
tracing::warn!(error = %e, "publishing MeasureNow on link change");
}
done
}
let mut shutdown = env.shutdown.clone();
let mut last_rebind: Option<tokio::time::Instant> = None;
let mut pending_event = false;
let timer = tokio::time::sleep(Duration::from_secs(0));
tokio::pin!(timer);
timer.as_mut().await;
loop {
tokio::select! {
biased;
_ = shutdown.changed() => {
if *shutdown.borrow() {
break;
}
}
event = events.recv() => {
match event {
Some(_link_change) => {
if let Some(prev) = last_rebind
&& prev.elapsed() < MIN_REBIND_INTERVAL
{
if !pending_event {
pending_event = true;
timer.as_mut().reset(prev + MIN_REBIND_INTERVAL);
tracing::trace!("link change within min-rebind interval; deferring");
} else {
tracing::trace!("link change within min-rebind interval; already deferred");
}
continue;
}
pending_event = false;
last_rebind = Some(react(direct, env).await);
}
None => {
tracing::trace!("link-change event stream closed; supervisor loop exiting");
break;
}
}
}
_ = &mut timer, if pending_event => {
pending_event = false;
last_rebind = Some(react(direct, env).await);
}
}
}
}
/// Tests for the link-change reaction loop (`run`) and the `NetmonSupervisor` actor.
///
/// Most tests drive `run` directly with a `ManualLinkMonitor` and counting stand-in
/// actors. The last test wires the real `DirectManager` end to end.
#[cfg(test)]
mod tests {
    use super::*;
    use kameo::actor::Spawn;
    use kameo::message::{Context, Message};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::watch;
    use ts_netmon::ManualLinkMonitor;

    /// Builds a test `Env` from a freshly generated `NodeState` and `shutdown_rx`.
    ///
    /// Route/port forwarding and IPv6 are off; `network_monitor` is on.
    fn test_env(shutdown_rx: watch::Receiver<bool>) -> Env {
        Env::new(
            ts_keys::NodeState::generate(),
            shutdown_rx,
            crate::env::ForwarderConfig {
                accept_routes: false,
                accept_dns: true,
                exit_node: None,
                forward_routes: vec![],
                forward_tcp_ports: vec![],
                forward_udp_ports: vec![],
                forward_all_ports: false,
                forward_exit_egress: false,
                block_incoming: false,
                exit_proxy: None,
                peerapi_port: None,
                taildrop_dir: None,
                enable_ipv6: false,
                wireguard_listen_port: None,
                network_monitor: true,
                persistent_keepalive_interval: None,
                ingress_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            },
        )
    }

    /// Stand-in for `DirectManager`: counts `RebindAndReprobe` requests and always
    /// replies `Ok(())`.
    struct RebindCounter {
        count: Arc<AtomicUsize>,
    }

    impl kameo::Actor for RebindCounter {
        type Args = Arc<AtomicUsize>;
        type Error = Error;
        async fn on_start(count: Self::Args, _s: ActorRef<Self>) -> Result<Self, Self::Error> {
            Ok(Self { count })
        }
    }

    impl Message<RebindAndReprobe> for RebindCounter {
        type Reply = Result<(), ts_magicsock::Error>;
        async fn handle(
            &mut self,
            _m: RebindAndReprobe,
            _c: &mut Context<Self, Self::Reply>,
        ) -> Self::Reply {
            self.count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// `MeasureNow` subscriber that counts how many times it receives the message.
    struct MeasureNowTap {
        count: Arc<AtomicUsize>,
    }

    impl kameo::Actor for MeasureNowTap {
        type Args = Arc<AtomicUsize>;
        type Error = Error;
        async fn on_start(count: Self::Args, _s: ActorRef<Self>) -> Result<Self, Self::Error> {
            Ok(Self { count })
        }
    }

    impl Message<MeasureNow> for MeasureNowTap {
        type Reply = ();
        async fn handle(&mut self, _m: MeasureNow, _c: &mut Context<Self, Self::Reply>) {
            self.count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Polls `counter` every 10 ms, up to 300 times (about 3 s), until it reaches at
    /// least `want`.
    ///
    /// # Panics
    /// Panics with a message naming `what` if the count never gets there.
    async fn wait_until(counter: &AtomicUsize, want: usize, what: &str) {
        for _ in 0..300 {
            if counter.load(Ordering::SeqCst) >= want {
                return;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        panic!(
            "timed out waiting for {what}: got {} want {want}",
            counter.load(Ordering::SeqCst)
        );
    }

    /// A burst of triggers inside the monitor's settle window yields exactly one
    /// rebind and exactly one `MeasureNow`.
    #[tokio::test]
    async fn one_link_event_fires_one_rebind_and_one_measure_now() {
        // Bound to `_sd_tx` (not `_`) so the shutdown sender stays alive for the test.
        let (_sd_tx, sd_rx) = watch::channel(false);
        let env = test_env(sd_rx);
        let rebinds = Arc::new(AtomicUsize::new(0));
        let measures = Arc::new(AtomicUsize::new(0));
        let direct = RebindCounter::spawn(rebinds.clone());
        let tap = MeasureNowTap::spawn(measures.clone());
        env.subscribe::<MeasureNow>(&tap).await.unwrap();
        let (monitor, trigger) = ManualLinkMonitor::with_settle(Duration::from_millis(50));
        let (mut events, _handle) = monitor.watch(env.shutdown.clone()).unwrap();
        let loop_env = env.clone();
        let loop_task = tokio::spawn(async move { run(&mut events, &direct, &loop_env).await });
        // Four triggers 5 ms apart, all within the 50 ms settle window.
        // Presumably ManualLinkMonitor coalesces them into one event
        // (ts_netmon behavior; confirm).
        for _ in 0..4 {
            assert!(trigger.trigger());
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
        wait_until(&rebinds, 1, "one RebindAndReprobe").await;
        wait_until(&measures, 1, "one MeasureNow").await;
        // Grace period so a late duplicate reaction would show up in the counts.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(rebinds.load(Ordering::SeqCst), 1, "exactly one rebind");
        assert_eq!(
            measures.load(Ordering::SeqCst),
            1,
            "exactly one measure-now"
        );
        drop(trigger);
        // Best-effort join: a timeout or join error is deliberately ignored.
        drop(tokio::time::timeout(Duration::from_secs(1), loop_task).await);
    }

    /// An event arriving within `MIN_REBIND_INTERVAL` of the previous rebind is
    /// deferred rather than dropped: it fires once, later.
    #[tokio::test]
    async fn min_interval_defers_within_interval_event() {
        let (_sd_tx, sd_rx) = watch::channel(false);
        let env = test_env(sd_rx);
        let rebinds = Arc::new(AtomicUsize::new(0));
        let direct = RebindCounter::spawn(rebinds.clone());
        let (monitor, trigger) = ManualLinkMonitor::with_settle(Duration::from_millis(30));
        let (mut events, _handle) = monitor.watch(env.shutdown.clone()).unwrap();
        let loop_env = env.clone();
        let loop_task = tokio::spawn(async move { run(&mut events, &direct, &loop_env).await });
        trigger.trigger();
        wait_until(&rebinds, 1, "first rebind").await;
        // This second event lands well inside MIN_REBIND_INTERVAL of the first rebind.
        trigger.trigger();
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(
            rebinds.load(Ordering::SeqCst),
            1,
            "the within-interval event must be deferred, not run immediately"
        );
        // wait_until's ~3 s budget covers the remainder of the deferral window.
        wait_until(
            &rebinds,
            2,
            "deferred rebind fires after the interval elapses",
        )
        .await;
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(
            rebinds.load(Ordering::SeqCst),
            2,
            "exactly one deferred rebind for the within-interval event (no extra)"
        );
        drop(trigger);
        drop(tokio::time::timeout(Duration::from_secs(1), loop_task).await);
    }

    /// Several events inside one `MIN_REBIND_INTERVAL` window coalesce into a single
    /// deferred rebind.
    #[tokio::test]
    async fn min_interval_coalesces_multiple_deferred_into_one() {
        let (_sd_tx, sd_rx) = watch::channel(false);
        let env = test_env(sd_rx);
        let rebinds = Arc::new(AtomicUsize::new(0));
        let direct = RebindCounter::spawn(rebinds.clone());
        let (monitor, trigger) = ManualLinkMonitor::with_settle(Duration::from_millis(30));
        let (mut events, _handle) = monitor.watch(env.shutdown.clone()).unwrap();
        let loop_env = env.clone();
        let loop_task = tokio::spawn(async move { run(&mut events, &direct, &loop_env).await });
        trigger.trigger();
        wait_until(&rebinds, 1, "first rebind").await;
        // Triggers 70 ms apart exceed the 30 ms settle window, so presumably each
        // becomes its own event. All of them still fall inside the min-rebind interval.
        for _ in 0..3 {
            trigger.trigger();
            tokio::time::sleep(Duration::from_millis(70)).await;
        }
        assert_eq!(
            rebinds.load(Ordering::SeqCst),
            1,
            "within-interval events must not each trigger a rebind"
        );
        wait_until(&rebinds, 2, "one coalesced deferred rebind").await;
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(
            rebinds.load(Ordering::SeqCst),
            2,
            "three within-interval events coalesce to exactly one deferred rebind"
        );
        drop(trigger);
        drop(tokio::time::timeout(Duration::from_secs(1), loop_task).await);
    }

    /// Sending `true` on the shutdown channel makes `run` return promptly.
    #[tokio::test]
    async fn shutdown_ends_loop() {
        let (sd_tx, sd_rx) = watch::channel(false);
        let env = test_env(sd_rx);
        let rebinds = Arc::new(AtomicUsize::new(0));
        let direct = RebindCounter::spawn(rebinds.clone());
        // `_trigger` stays alive, so the loop should exit because of shutdown
        // rather than because the event stream closed (assumes dropping the
        // trigger would close it; confirm against ts_netmon).
        let (monitor, _trigger) = ManualLinkMonitor::new();
        let (mut events, _handle) = monitor.watch(env.shutdown.clone()).unwrap();
        let loop_env = env.clone();
        let loop_task = tokio::spawn(async move { run(&mut events, &direct, &loop_env).await });
        sd_tx.send(true).unwrap();
        tokio::time::timeout(Duration::from_secs(1), loop_task)
            .await
            .expect("loop must end on shutdown")
            .expect("loop task joins");
    }

    /// End to end: the supervisor actor, driven by a manual monitor, makes the real
    /// `DirectManager` rebind its underlay socket to a different port and publishes
    /// exactly one `MeasureNow`.
    #[tokio::test]
    async fn supervisor_reacts_end_to_end_with_manual_monitor() {
        let (_sd_tx, sd_rx) = watch::channel(false);
        let env = test_env(sd_rx);
        let dataplane = crate::dataplane::DataplaneActor::spawn(env.clone());
        let (_home_tx, home_rx) = watch::channel(None);
        let multiderp =
            crate::multiderp::Multiderp::spawn((env.clone(), dataplane.clone(), home_rx));
        let direct = crate::direct::DirectManager::spawn((
            env.clone(),
            dataplane.clone(),
            multiderp.clone(),
        ));
        // Record the underlay socket's local port before any link change.
        let sock_before = direct
            .ask(crate::direct::SockHandle)
            .await
            .expect("direct manager up");
        let port_before = sock_before.as_ref().map(|s| {
            s.local_addr()
                .expect("bound socket has a local addr")
                .port()
        });
        let measures = Arc::new(AtomicUsize::new(0));
        let tap = MeasureNowTap::spawn(measures.clone());
        env.subscribe::<MeasureNow>(&tap).await.unwrap();
        let (monitor, trigger) = ManualLinkMonitor::with_settle(Duration::from_millis(50));
        let monitor: Arc<dyn ts_netmon::LinkMonitor> = Arc::new(monitor);
        let _supervisor = NetmonSupervisor::spawn(NetmonSupervisorArgs {
            monitor,
            direct: direct.clone(),
            env: env.clone(),
        });
        for _ in 0..4 {
            assert!(trigger.trigger());
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
        // MeasureNow is published only after RebindAndReprobe returns, so once it
        // is observed the rebind has completed.
        wait_until(&measures, 1, "MeasureNow published end-to-end").await;
        let sock_after = direct
            .ask(crate::direct::SockHandle)
            .await
            .expect("direct manager up");
        let port_after = sock_after.as_ref().map(|s| {
            s.local_addr()
                .expect("bound socket has a local addr")
                .port()
        });
        assert!(
            port_before.is_some() && port_after.is_some(),
            "the underlay socket must be bound before and after (not inert)"
        );
        assert_ne!(
            port_before, port_after,
            "RebindAndReprobe must have rebound the underlay socket to a new ephemeral port"
        );
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(
            measures.load(Ordering::SeqCst),
            1,
            "exactly one MeasureNow for one coalesced link event"
        );
        drop(trigger);
    }
}