use core::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use kameo::{
actor::ActorRef,
message::{Context, Message},
};
use tokio::sync::watch;
use ts_bart::RoutingTable;
use ts_overlay_router::{
inbound::RouteAction as InboundRouteAction, outbound::RouteAction as OutboundRouteAction,
};
use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
use crate::{
Error,
direct::{self, DirectManager},
env::Env,
multiderp,
multiderp::Multiderp,
peer_tracker::PeerState,
};
/// Period of the background timer that sends `RecomputeRoutes` to the `RouteUpdater` actor.
const RECOMPUTE_INTERVAL: Duration = Duration::from_secs(2);
/// Actor that turns peer state and self-node state into routing tables and publishes them
/// through [`Env`].
///
/// It reacts to `Arc<PeerState>` and `Arc<ts_control::StateUpdate>` messages and to an internal
/// periodic `RecomputeRoutes` tick.
pub struct RouteUpdater {
/// Queried for a peer's DERP region and for the underlay transport serving a region.
multiderp: ActorRef<Multiderp>,
/// Queried for the direct transport id and for the set of peers that have a direct path.
direct: ActorRef<DirectManager>,
/// Underlay transport id used for direct-path peers; `None` when it could not be obtained at
/// startup, in which case peers keep their DERP underlay route.
direct_tid: Option<UnderlayTransportId>,
/// Overlay transport for accepted routes that are not subnet routes.
default_overlay_transport: OverlayTransportId,
/// Overlay transport for accepted routes that `Node::is_subnet_route` reports as subnet routes.
forwarder_overlay_transport: OverlayTransportId,
/// Subscription/publication handle; also supplies `accept_routes` and the exit-node selector.
env: Env,
/// Most recently received peer state; `None` until the first `PeerState` message arrives.
last_peer_state: Option<Arc<PeerState>>,
/// Underlay map from the last published `PeerRouteUpdate`, used to detect changes.
last_underlay: HashMap<PeerId, UnderlayTransportId>,
/// Active exit node id as of the last rebuild, used to detect changes.
last_exit_node_id: Option<ts_control::StableNodeId>,
/// Watch channel that receives the active exit node id whenever it changes.
active_exit_tx: watch::Sender<Option<ts_control::StableNodeId>>,
}
/// Internal timer message asking the actor to rebuild routes from its cached peer state.
///
/// Handled with `force = false`, so an unchanged result is not republished.
#[derive(Clone)]
struct RecomputeRoutes;
impl kameo::Actor for RouteUpdater {
    /// Startup arguments, in order: multiderp actor, direct manager actor, environment, default
    /// (application) overlay transport, forwarder overlay transport, and the watch sender that
    /// tracks the active exit node id.
    type Args = (
        ActorRef<Multiderp>,
        ActorRef<DirectManager>,
        Env,
        OverlayTransportId,
        OverlayTransportId,
        watch::Sender<Option<ts_control::StableNodeId>>,
    );
    type Error = Error;

    /// Subscribes the actor to peer-state and control-state updates, looks up the direct
    /// transport id, and starts the periodic recompute timer.
    ///
    /// # Errors
    ///
    /// Returns the error from either `Env::subscribe` call. A failed direct-transport lookup is
    /// only logged; the actor then starts with `direct_tid = None`.
    async fn on_start(
        (multiderp, direct, env, default_transport, forwarder_transport, active_exit_tx): Self::Args,
        actor_ref: ActorRef<Self>,
    ) -> Result<Self, Self::Error> {
        env.subscribe::<Arc<PeerState>>(&actor_ref).await?;
        env.subscribe::<Arc<ts_control::StateUpdate>>(&actor_ref)
            .await?;

        // Without a direct transport id every peer simply keeps its DERP underlay route.
        let direct_tid = direct
            .ask(direct::DirectTransportId)
            .await
            .unwrap_or_else(|e| {
                tracing::error!(error = %e, "direct transport id unavailable at startup, staying on derp");
                None
            });

        // The timer task only holds a weak reference so it cannot keep the actor alive; it exits
        // once the actor is gone or stops accepting messages.
        let weak_self = actor_ref.downgrade();
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(RECOMPUTE_INTERVAL);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                ticker.tick().await;
                let delivered = match weak_self.upgrade() {
                    Some(strong) => strong.tell(RecomputeRoutes).await.is_ok(),
                    None => false,
                };
                if !delivered {
                    break;
                }
            }
        });

        Ok(Self {
            multiderp,
            direct,
            direct_tid,
            default_overlay_transport: default_transport,
            forwarder_overlay_transport: forwarder_transport,
            env,
            last_peer_state: None,
            last_underlay: HashMap::default(),
            last_exit_node_id: None,
            active_exit_tx,
        })
    }
}
/// Published after a control `StateUpdate` that carries this node's own record.
#[derive(Clone)]
pub struct SelfRouteUpdate {
/// Inbound table mapping each of this node's accepted routes to an overlay transport.
pub overlay_in_routes: Arc<ts_bart::Table<InboundRouteAction>>,
}
/// Published whenever the active exit node changes.
#[derive(Clone)]
pub struct ActiveExitNode {
/// The peer whose `stable_id` matches the active exit node id, or `None` when no exit node is
/// active (or no peer with that id is present in the current peer state).
pub node: Option<Arc<ts_control::Node>>,
}
/// Published with the freshly computed peer routes; cloning only clones the `Arc`.
#[derive(Clone)]
pub struct PeerRouteUpdate {
/// Shared route tables carried by this update.
pub inner: Arc<PeerRoutesInner>,
}
/// Route tables carried by a [`PeerRouteUpdate`].
pub struct PeerRoutesInner {
/// Underlay transport per peer: the DERP transport for the peer's region, or the direct
/// transport for peers reported as having a direct path.
pub underlay_routes: HashMap<PeerId, UnderlayTransportId>,
/// Outbound overlay table mapping each installed peer route to
/// `OutboundRouteAction::Wireguard(peer)`.
pub overlay_out_routes: ts_bart::Table<OutboundRouteAction>,
}
/// Layers direct-path routes on top of a DERP underlay map.
///
/// Every peer in `direct_ready` is mapped to `direct_tid`, replacing its DERP entry if it had
/// one and adding a new entry if it had none. All other entries are returned unchanged.
fn overlay_direct(
    mut derp_underlay: HashMap<PeerId, UnderlayTransportId>,
    direct_ready: &HashSet<PeerId>,
    direct_tid: UnderlayTransportId,
) -> HashMap<PeerId, UnderlayTransportId> {
    derp_underlay.extend(direct_ready.iter().map(|peer| (*peer, direct_tid)));
    derp_underlay
}
impl RouteUpdater {
async fn rebuild_and_publish(&mut self, force: bool) {
let Some(state) = self.last_peer_state.clone() else {
return;
};
let mut overlay_out = ts_bart::Table::default();
let mut derp_underlay = HashMap::default();
let mut peer_ids = Vec::new();
let exit_node_selector = self.env.exit_node();
let exit_node_id = exit_node_selector
.as_ref()
.and_then(|sel| sel.resolve(state.peers.peers().values()));
let mut exit_node_satisfied = exit_node_selector.is_none();
for (id, peer) in state.peers.peers() {
peer_ids.push(*id);
let span = tracing::trace_span!(
"peer_update",
peer_key = %peer.node_key,
region = ?peer.derp_region,
peer_id = ?id,
);
for route in peer.routes_to_install(self.env.accept_routes, exit_node_id.as_ref()) {
if route.prefix_len() == 0 {
exit_node_satisfied = true;
}
overlay_out.insert(*route, OutboundRouteAction::Wireguard(*id));
}
let region = match peer.derp_region {
Some(region) => Some(region),
None => match self
.multiderp
.ask(multiderp::RegionForPeer { peer: *id })
.await
{
Ok(region) => region,
Err(e) => {
tracing::error!(error = %e, "multiderp unavailable");
None
}
},
};
let Some(region) = region else {
tracing::trace!(parent: &span, "peer has no derp region and none could be inferred");
continue;
};
match self
.multiderp
.ask(multiderp::TransportIdForRegion { id: region })
.await
{
Ok(Some(transport_id)) => {
derp_underlay.insert(*id, transport_id);
}
Ok(None) => {
tracing::error!(parent: &span, "no region stored in multiderp, no underlay route");
}
Err(e) => {
tracing::error!(error = %e, "multiderp unavailable");
}
}
}
if !exit_node_satisfied {
if exit_node_id.is_some() {
tracing::warn!(
exit_node = ?exit_node_selector,
resolved = ?exit_node_id,
"configured exit node resolved to a peer that advertises no default route \
(0.0.0.0/0); internet-bound traffic will be dropped (fail-closed). If you only \
need to reach this peer's ports over the tailnet (not full-tunnel egress), \
leave `exit_node` unset — direct peer dials don't require it."
);
} else {
tracing::warn!(
exit_node = ?exit_node_selector,
"configured exit node not found among peers (stale or typo'd selector); \
internet-bound traffic will be dropped (fail-closed)"
);
}
}
let active_exit_id = exit_node_satisfied.then(|| exit_node_id.clone()).flatten();
if active_exit_id != self.last_exit_node_id {
self.last_exit_node_id = active_exit_id.clone();
self.active_exit_tx.send_replace(active_exit_id.clone());
let node = active_exit_id.and_then(|id| {
state
.peers
.peers()
.values()
.find(|peer| peer.stable_id == id)
.cloned()
.map(Arc::new)
});
if let Err(e) = self.env.publish(ActiveExitNode { node }).await {
tracing::error!(error = %e, "publishing active exit node");
}
}
let direct_ready = match self
.direct
.ask(direct::PeersWithDirectPath { ids: peer_ids })
.await
{
Ok(ready) => ready,
Err(e) => {
tracing::error!(error = %e, "direct manager unavailable, staying on derp");
HashSet::new()
}
};
let underlay_out = match self.direct_tid {
Some(direct_tid) if !direct_ready.is_empty() => {
overlay_direct(derp_underlay, &direct_ready, direct_tid)
}
_ => derp_underlay,
};
if !force && underlay_out == self.last_underlay {
tracing::trace!("routes unchanged, skipping republish");
return;
}
self.last_underlay = underlay_out.clone();
if let Err(e) = self
.env
.publish(PeerRouteUpdate {
inner: Arc::new(PeerRoutesInner {
underlay_routes: underlay_out,
overlay_out_routes: overlay_out,
}),
})
.await
{
tracing::error!(error = %e, "publishing peer route update");
}
}
}
impl Message<Arc<PeerState>> for RouteUpdater {
    type Reply = ();

    /// Caches the new peer state and republishes routes unconditionally (`force = true`).
    async fn handle(&mut self, msg: Arc<PeerState>, _ctx: &mut Context<Self, Self::Reply>) {
        let n_peers = msg.peers.peers().len();
        tracing::trace!(n_peers, "reconstructing routes for peer update");

        self.last_peer_state = Some(msg);
        self.rebuild_and_publish(true).await;
    }
}
/// Timer-driven rebuild from the cached peer state.
impl Message<RecomputeRoutes> for RouteUpdater {
type Reply = ();
/// Rebuilds routes with `force = false`, so an unchanged result is not republished.
async fn handle(&mut self, _msg: RecomputeRoutes, _ctx: &mut Context<Self, Self::Reply>) {
self.rebuild_and_publish(false).await;
}
}
/// Builds the inbound overlay table for this node's accepted routes.
///
/// Each accepted route that `node.is_subnet_route` reports as a subnet route is sent to
/// `forwarder_transport`; every other accepted route is sent to `app_transport`.
fn split_inbound_routes(
    node: &ts_control::Node,
    app_transport: OverlayTransportId,
    forwarder_transport: OverlayTransportId,
) -> ts_bart::Table<InboundRouteAction> {
    let mut table = ts_bart::Table::default();
    for route in node.accepted_routes.iter().copied() {
        let is_subnet = node.is_subnet_route(&route);
        let target = if is_subnet {
            forwarder_transport
        } else {
            app_transport
        };
        table.insert(route, InboundRouteAction::ToOverlay(target));
    }
    table
}
impl Message<Arc<ts_control::StateUpdate>> for RouteUpdater {
    type Reply = ();

    /// Publishes a [`SelfRouteUpdate`] built from this node's accepted routes.
    ///
    /// Updates that carry no node record are ignored. A publish failure is logged, not returned.
    async fn handle(
        &mut self,
        msg: Arc<ts_control::StateUpdate>,
        _ctx: &mut Context<Self, Self::Reply>,
    ) {
        let node = match msg.node.as_ref() {
            Some(node) => node,
            None => return,
        };
        tracing::debug!(accepted_routes = ?node.accepted_routes, "populating accepted routes");

        let overlay_in_routes = Arc::new(split_inbound_routes(
            node,
            self.default_overlay_transport,
            self.forwarder_overlay_transport,
        ));
        let published = self
            .env
            .publish(SelfRouteUpdate { overlay_in_routes })
            .await;
        if let Err(e) = published {
            tracing::error!(error = %e, "publishing self route update");
        }
    }
}
// Unit tests for the pure helpers `overlay_direct` and `split_inbound_routes`.
#[cfg(test)]
mod tests {
use super::*;
// Peers in the ready set are mapped to the direct transport; peers outside it keep DERP.
#[test]
fn direct_ready_peers_are_upgraded_others_keep_derp() {
let derp_a = UnderlayTransportId(1);
let derp_b = UnderlayTransportId(2);
let direct_tid = UnderlayTransportId(99);
let peer_a = PeerId(10);
let peer_b = PeerId(20);
let peer_c = PeerId(30);
let mut derp_underlay = HashMap::new();
derp_underlay.insert(peer_a, derp_a);
derp_underlay.insert(peer_b, derp_b);
// peer_c is direct-ready but deliberately has no DERP entry.
let ready: HashSet<PeerId> = [peer_a, peer_c].into_iter().collect();
let out = overlay_direct(derp_underlay, &ready, direct_tid);
assert_eq!(out.get(&peer_a), Some(&direct_tid), "a upgraded to direct");
assert_eq!(out.get(&peer_b), Some(&derp_b), "b stays on derp");
assert_eq!(
out.get(&peer_c),
Some(&direct_tid),
"c routed direct even with no derp route"
);
}
// Fixture: a node accepting its own two tailnet addresses, one LAN subnet and the IPv4
// default route.
fn split_router_node() -> ts_control::Node {
use ts_control::{Node, StableNodeId, TailnetAddress};
Node {
id: 1,
stable_id: StableNodeId("n1".to_string()),
hostname: "router".to_string(),
user_id: 0,
tailnet: Some("ts.net".to_string()),
tags: vec![],
tailnet_address: TailnetAddress {
ipv4: "100.64.0.7/32".parse().unwrap(),
ipv6: "fd7a::7/128".parse().unwrap(),
},
node_key: [0u8; 32].into(),
node_key_expiry: None,
key_signature: vec![],
machine_key: None,
disco_key: None,
accepted_routes: vec![
"100.64.0.7/32".parse().unwrap(),
"fd7a::7/128".parse().unwrap(),
"192.168.1.0/24".parse().unwrap(),
"0.0.0.0/0".parse().unwrap(),
],
underlay_addresses: vec![],
derp_region: None,
cap: Default::default(),
cap_map: Default::default(),
peerapi_port: None,
peerapi_dns_proxy: false,
is_wireguard_only: false,
exit_node_dns_resolvers: vec![],
peer_relay: false,
service_vips: Default::default(),
}
}
// Looks up `ip` in `table` and returns the overlay transport it is routed to, if any.
fn routed_transport(
table: &ts_bart::Table<InboundRouteAction>,
ip: &str,
) -> Option<OverlayTransportId> {
match table.lookup(ip.parse().unwrap()) {
Some(InboundRouteAction::ToOverlay(id)) => Some(*id),
_ => None,
}
}
// The node's own addresses go to the app transport; the LAN subnet and the default route go
// to the forwarder transport.
#[test]
fn inbound_split_sends_subnets_to_forwarder_and_host_addrs_to_app() {
let app = OverlayTransportId(0);
let fwd = OverlayTransportId(1);
let node = split_router_node();
let table = split_inbound_routes(&node, app, fwd);
assert_eq!(routed_transport(&table, "100.64.0.7"), Some(app));
assert_eq!(routed_transport(&table, "fd7a::7"), Some(app));
assert_eq!(routed_transport(&table, "192.168.1.5"), Some(fwd));
// 8.8.8.8 is only covered by the accepted 0.0.0.0/0 route.
assert_eq!(routed_transport(&table, "8.8.8.8"), Some(fwd));
}
// With an empty ready set the DERP map is returned unchanged.
#[test]
fn empty_ready_set_is_pure_derp() {
let derp_a = UnderlayTransportId(1);
let peer_a = PeerId(10);
let mut derp_underlay = HashMap::new();
derp_underlay.insert(peer_a, derp_a);
let out = overlay_direct(
derp_underlay.clone(),
&HashSet::new(),
UnderlayTransportId(99),
);
assert_eq!(
out, derp_underlay,
"no direct-ready peers => unchanged derp map"
);
}
}