use core::{
net::{Ipv4Addr, Ipv6Addr},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Instant};
use futures::StreamExt;
use kameo::{
actor::{ActorRef, Spawn},
message::{Context, StreamMessage},
prelude::Message,
};
use tokio::sync::watch;
use ts_control::{
AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
tka_init_begin, tka_init_finish, tka_submit_signature,
};
use ts_magicsock::SelfEndpointType;
use crate::{
derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
direct::EndpointAdvertisement,
};
pub struct ControlRunner {
client: AsyncControlClient,
params: Params,
self_node: watch::Sender<Option<Node>>,
ssh_policy: watch::Sender<Option<SshPolicy>>,
tka: watch::Sender<Option<TkaStatus>>,
tka_synced: Option<crate::tka_sync::SyncedTka>,
tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
tka_syncing: bool,
tka_generation: u64,
cert_domains: watch::Sender<Vec<String>>,
dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
pop_browser_url: watch::Sender<Option<url::Url>>,
netcheck: watch::Sender<crate::status::NetcheckReport>,
home_region: Option<(ts_derp::RegionId, core::time::Duration)>,
derp_report_history: Vec<(Instant, Arc<Vec<ts_netcheck::RegionResult>>)>,
reauth_bridge: tokio::task::JoinHandle<()>,
}
impl Drop for ControlRunner {
fn drop(&mut self) {
self.reauth_bridge.abort();
}
}
pub struct Params {
pub(crate) config: ts_control::Config,
pub(crate) auth_key: Option<String>,
pub(crate) env: crate::Env,
pub(crate) state_tx: watch::Sender<crate::DeviceState>,
pub(crate) tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
}
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
#[error(transparent)]
Control(#[from] ControlError),
#[error(transparent)]
Crate(#[from] crate::Error),
}
impl kameo::Actor for ControlRunner {
type Args = Params;
type Error = ControlRunnerError;
async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
loop {
match AsyncControlClient::check_auth(
¶ms.config,
¶ms.env.keys,
params.auth_key.as_deref(),
)
.await
{
Ok(()) => break,
Err(ControlError::MachineNotAuthorized(u)) => {
tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
params
.state_tx
.send_replace(crate::DeviceState::NeedsLogin(u.clone()));
tokio::time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
let reason = crate::RegistrationError::from(&e);
tracing::error!(error = %e, "registration failed; control runner stopping");
params
.state_tx
.send_replace(crate::DeviceState::Failed(reason));
return Err(e.into());
}
}
}
let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
let bring_up = async {
let (client, stream) = AsyncControlClient::connect(
¶ms.config,
¶ms.env.keys,
params.auth_key.as_deref(),
auth_url_tx,
)
.await?;
DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
slf.attach_stream(stream.boxed(), (), ());
Ok::<_, ControlRunnerError>(client)
};
let client = match bring_up.await {
Ok(client) => client,
Err(e) => {
tracing::error!(error = %e, "bringing up the control session failed");
params.state_tx.send_replace(crate::DeviceState::Failed(
crate::RegistrationError::NetworkUnreachable,
));
return Err(e);
}
};
params.state_tx.send_replace(crate::DeviceState::Running);
let reauth_bridge = {
let state_tx = params.state_tx.clone();
let mut auth_url_rx = auth_url_rx;
tokio::spawn(async move {
while auth_url_rx.changed().await.is_ok() {
let url = auth_url_rx.borrow_and_update().clone();
bridge_reauth_url_to_state(&state_tx, url.as_ref());
}
})
};
let tka_authority = params.tka_authority.clone();
Ok(Self {
client,
params,
self_node: Default::default(),
ssh_policy: Default::default(),
tka: Default::default(),
tka_synced: None,
tka_authority,
tka_syncing: false,
tka_generation: 0,
cert_domains: Default::default(),
dns_config: Default::default(),
pop_browser_url: Default::default(),
netcheck: Default::default(),
home_region: None,
derp_report_history: Vec::new(),
reauth_bridge,
})
}
}
impl ControlRunner {
fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
if !tka.is_enabled() {
self.tka_generation = self.tka_generation.wrapping_add(1);
if self.tka_synced.is_some() {
tracing::info!("TKA lock disabled; clearing enforcement (admitting all peers)");
self.tka_synced = None;
}
self.tka_authority.send_replace(None);
return;
}
if self.tka_syncing {
return; }
if let Some(synced) = &self.tka_synced
&& let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
&& synced.authority.head_matches(&control_head)
{
return;
}
self.tka_syncing = true;
let generation = self.tka_generation;
let current = self.tka_synced.take();
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
if let Err(e) = self_ref.tell(TkaSynced { result, generation }).await {
tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
}
});
}
async fn apply_tka_synced(
&mut self,
result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
generation: u64,
) {
self.tka_syncing = false;
if generation != self.tka_generation {
tracing::info!(
"TKA sync result superseded (lock disabled or re-synced mid-flight); discarding"
);
return;
}
match result {
Ok(Some(synced)) => {
tracing::info!(
head = %synced.authority.head().to_base32(),
"TKA sync succeeded; enforcing verified Authority (Go tkaFilterNetmapLocked)"
);
self.tka_authority
.send_replace(Some(synced.authority.clone()));
if let Some(self_node) = self.self_node.borrow().as_ref() {
log_self_lockout(self_node, &synced.authority);
}
self.tka_synced = Some(synced);
}
Ok(None) => {
if self.tka_synced.is_some() {
tracing::info!("TKA sync: control reports no lock; clearing enforcement");
self.tka_synced = None;
}
self.tka_authority.send_replace(None);
}
Err(e) => {
tracing::warn!(error = %e, "TKA sync failed; keeping prior enforcement state");
}
}
}
fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
where
F: FnOnce(&Node) -> R,
{
let mut sub = self.self_node.subscribe();
let mut shutdown = self.params.env.shutdown.clone();
async move {
tokio::select! {
_ = shutdown.wait_for(|x| *x) => {
None
},
node = sub.wait_for(Option::is_some) => {
Some(f(node.ok()?.as_ref()?))
},
}
}
}
}
fn sticky_update_pop_browser_url(
cell: &watch::Sender<Option<url::Url>>,
incoming: Option<&url::Url>,
) {
if let Some(url) = incoming {
cell.send_if_modified(|current| {
if current.as_ref() == Some(url) {
false
} else {
*current = Some(url.clone());
true
}
});
}
}
pub(crate) fn bridge_reauth_url_to_state(
state_tx: &watch::Sender<crate::DeviceState>,
incoming: Option<&url::Url>,
) {
if let Some(url) = incoming {
let next = crate::DeviceState::NeedsLogin(url.clone());
state_tx.send_if_modified(|current| {
if *current == next {
false
} else {
*current = next.clone();
true
}
});
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum SelfLockVerdict {
Unsigned,
Authorized,
LockedOut(String),
}
fn self_lock_verdict(
node_key: &ts_keys::NodePublicKey,
key_signature: &[u8],
authority: &ts_tka::Authority,
) -> SelfLockVerdict {
if key_signature.is_empty() {
return SelfLockVerdict::Unsigned;
}
match authority.node_key_authorized(&node_key.to_bytes(), key_signature) {
Ok(()) => SelfLockVerdict::Authorized,
Err(e) => SelfLockVerdict::LockedOut(e.to_string()),
}
}
fn log_self_lockout(self_node: &Node, authority: &ts_tka::Authority) {
match self_lock_verdict(&self_node.node_key, &self_node.key_signature, authority) {
SelfLockVerdict::Unsigned => tracing::info!(
"TKA: this node has no key-signature for the active lock; it cannot prove itself to \
locked peers until control signs it (not locked out, just unsigned)"
),
SelfLockVerdict::Authorized => {
tracing::debug!("TKA: self node-key is authorized by the active lock")
}
SelfLockVerdict::LockedOut(error) => tracing::warn!(
%error,
"TKA self locked out: this node's key-signature is not authorized by the active \
network lock; locked peers will reject it until control re-signs this node \
(Go LockedOut)"
),
}
}
pub use msg_impl::*;
#[allow(missing_docs)]
mod msg_impl {
use kameo::{message::Context, reply::DelegatedReply};
use super::*;
#[kameo::messages]
impl ControlRunner {
#[message(ctx)]
pub fn ipv4(
&self,
ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
) -> DelegatedReply<Option<Ipv4Addr>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
tokio::spawn(async move {
let ip = fut.await;
replier.send(ip);
});
}
deleg
}
#[message(ctx)]
pub fn ipv6(
&self,
ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
) -> DelegatedReply<Option<Ipv6Addr>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
tokio::spawn(async move {
let ip = fut.await;
replier.send(ip);
});
}
deleg
}
#[message(ctx)]
pub fn self_node(
&self,
ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
) -> DelegatedReply<Option<Node>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let node = self.with_self_node(|node| node.clone());
tokio::spawn(async move {
let node = node.await;
replier.send(node)
});
}
deleg
}
#[message]
pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
self.ssh_policy.borrow().clone()
}
#[message]
pub fn current_tka_status(&self) -> Option<TkaStatus> {
self.tka.borrow().clone()
}
#[message(ctx)]
pub fn tka_sign(
&self,
ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
node_key: [u8; 32],
) -> DelegatedReply<Result<(), TkaSyncError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let nks = ts_tka::NodeKeySignature::sign_direct(
&node_key,
&keys.network_lock_keys.private.signing_key(),
);
let req = ts_control::TkaSubmitSignatureRequest {
version: Default::default(),
node_key: keys.node_keys.public,
signature: nks.serialize(),
};
let result = tka_submit_signature(
&config.server_url,
&keys,
req,
config.allow_http_key_fetch,
)
.await
.map(|_response| ());
replier.send(result);
});
}
deleg
}
#[message(ctx)]
pub fn tka_disable(
&self,
ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
disablement_secret: Vec<u8>,
) -> DelegatedReply<Result<(), TkaSyncError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = match head {
Some(head) => {
let req = ts_control::TkaDisableRequest {
version: Default::default(),
node_key: keys.node_keys.public,
head,
disablement_secret,
};
tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
.await
.map(|_response| ())
}
None => Err(TkaSyncError::Unsupported),
};
replier.send(result);
});
}
deleg
}
#[message(ctx)]
pub fn tka_init(
&self,
ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
disablement_secret: Vec<u8>,
) -> DelegatedReply<Result<(), TkaSyncError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = tka_init_run(&config, &keys, disablement_secret).await;
replier.send(result);
});
}
deleg
}
#[message]
pub fn cert_domains(&self) -> Vec<String> {
self.cert_domains.borrow().clone()
}
#[message]
pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
self.dns_config.borrow().clone()
}
#[message]
pub fn pop_browser_url(&self) -> Option<url::Url> {
self.pop_browser_url.borrow().clone()
}
#[message(derive(Clone))]
pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
self.pop_browser_url.subscribe()
}
#[message]
pub fn netcheck(&self) -> crate::status::NetcheckReport {
self.netcheck.borrow().clone()
}
#[message(ctx)]
pub fn fetch_id_token(
&self,
ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
audience: String,
) -> DelegatedReply<Result<String, IdTokenError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
replier.send(result);
});
}
deleg
}
#[message(ctx)]
pub fn logout(
&self,
ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
) -> DelegatedReply<Result<(), LogoutError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = ts_control::logout(&config, &keys).await;
replier.send(result);
});
}
deleg
}
#[message(ctx)]
pub fn set_dns(
&self,
ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
name: String,
value: String,
) -> DelegatedReply<Result<(), SetDnsError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
replier.send(result);
});
}
deleg
}
}
#[cfg(feature = "acme")]
pub type CertPairReply = Result<(String, String), ts_control::CertError>;
#[cfg(feature = "acme")]
#[kameo::messages]
impl ControlRunner {
#[message(ctx)]
pub fn get_certificate(
&self,
ctx: &mut Context<
Self,
DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
>,
name: String,
) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = issue_certificate(&config, &keys, &name).await;
replier.send(result);
});
}
deleg
}
#[message(ctx)]
pub fn get_cert_pair(
&self,
ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
name: String,
) -> DelegatedReply<CertPairReply> {
let (deleg, replier) = ctx.reply_sender();
if let Some(replier) = replier {
let config = self.params.config.clone();
let keys = self.params.env.keys.clone();
tokio::spawn(async move {
let result = issue_cert_pair(&config, &keys, &name).await;
replier.send(result);
});
}
deleg
}
}
}
async fn tka_init_run(
config: &ts_control::Config,
keys: &ts_keys::NodeState,
disablement_secret: Vec<u8>,
) -> Result<(), TkaSyncError> {
let nl_public = keys.network_lock_keys.public.to_bytes().to_vec();
let genesis_key = ts_tka::AumKey {
kind: ts_tka::KeyKind::Ed25519,
votes: 1,
public: nl_public,
meta: Vec::new(),
};
let dvalue = ts_tka::disablement_value(&disablement_secret).to_vec();
let mut genesis = ts_tka::Aum::new_genesis_checkpoint(vec![genesis_key], vec![dvalue])
.map_err(|_| TkaSyncError::Internal(ts_control::TkaSyncInternalErrorKind::SerDe))?;
genesis.sign(&keys.network_lock_keys.private.signing_key());
let begin_req = ts_control::TkaInitBeginRequest {
version: Default::default(),
node_key: keys.node_keys.public,
genesis_aum: genesis.serialize(),
};
let begin_resp = tka_init_begin(
&config.server_url,
keys,
begin_req,
config.allow_http_key_fetch,
)
.await?;
if !begin_resp.need_signatures.is_empty() {
tracing::warn!(
need = begin_resp.need_signatures.len(),
"tka_init: control requires re-signing other nodes; the multi-node init is not yet \
implemented (single-node lock-init only)"
);
return Err(TkaSyncError::Unsupported);
}
let finish_req = ts_control::TkaInitFinishRequest {
version: Default::default(),
node_key: keys.node_keys.public,
signatures: std::collections::BTreeMap::new(),
support_disablement: disablement_secret,
};
tka_init_finish(
&config.server_url,
keys,
finish_req,
config.allow_http_key_fetch,
)
.await
.map(|_response| ())
}
#[cfg(feature = "acme")]
async fn issue_certificate(
config: &ts_control::Config,
keys: &ts_keys::NodeState,
name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
issue_cert_pair_inner(config, keys, name)
.await
.map(|issued| issued.certified)
}
#[cfg(feature = "acme")]
async fn issue_cert_pair(
config: &ts_control::Config,
keys: &ts_keys::NodeState,
name: &str,
) -> Result<(String, String), ts_control::CertError> {
issue_cert_pair_inner(config, keys, name)
.await
.map(|issued| (issued.cert_chain_pem, issued.key_pem))
}
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
config: &ts_control::Config,
keys: &ts_keys::NodeState,
name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
let account_key = match keys.acme_account_key.as_deref() {
Some(der) => ts_control::acme::AcmeAccountKey::from_pkcs8(der)?,
None => {
tracing::debug!(
"no persisted ACME account key in key state; generating an ephemeral per-call key \
(a new ACME account this issuance — not persisted back)"
);
ts_control::acme::AcmeAccountKey::generate()?.0
}
};
let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
.parse()
.map_err(|e| {
ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
})?;
ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
type Reply = ();
async fn handle(
&mut self,
msg: StreamMessage<Arc<StateUpdate>, (), ()>,
ctx: &mut Context<Self, Self::Reply>,
) {
match msg {
StreamMessage::Started(_) => {
tracing::trace!("started listening to state updates");
}
StreamMessage::Next(msg) => {
if let Some(node) = msg.node.as_ref() {
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let next = if node.key_expired_at_unix(now_unix) {
crate::DeviceState::Expired
} else {
crate::DeviceState::Running
};
self.params.state_tx.send_if_modified(|s| {
if *s != next {
*s = next.clone();
true
} else {
false
}
});
self.self_node.send_replace(Some(node.clone()));
}
if let Some(policy) = msg.ssh_policy.as_ref() {
self.ssh_policy.send_replace(Some(policy.clone()));
}
if let Some(tka) = msg.tka.as_ref() {
self.tka.send_replace(Some(tka.clone()));
self.maybe_sync_tka(tka, ctx.actor_ref().clone());
}
let cert_domains = msg
.dns_config
.as_ref()
.map(|d| d.cert_domains.clone())
.unwrap_or_default();
self.cert_domains.send_replace(cert_domains);
self.dns_config.send_replace(msg.dns_config.clone());
sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
if let Err(e) = self.params.env.publish(msg).await {
tracing::error!(error = %e, "publishing netmap update");
}
}
StreamMessage::Finished(_) => {
tracing::error!("state update stream terminated")
}
}
}
}
#[doc(hidden)]
pub struct TkaSynced {
pub(crate) result:
Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
pub(crate) generation: u64,
}
impl Message<TkaSynced> for ControlRunner {
type Reply = ();
async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
self.apply_tka_synced(msg.result, msg.generation).await;
}
}
impl Message<DerpLatencyMeasurement> for ControlRunner {
type Reply = ();
async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
let measurements = msg.measurement.as_ref().clone();
self.netcheck
.send_replace(crate::status::NetcheckReport::from_region_results(
&measurements,
));
if measurements.is_empty() {
tracing::debug!("derp latency measurements empty");
return;
};
let now = Instant::now();
self.derp_report_history
.push((now, msg.measurement.clone()));
self.derp_report_history
.retain(|(stamp, _)| now.saturating_duration_since(*stamp) <= DERP_HISTORY_MAX_AGE);
let best_recent = best_recent(&self.derp_report_history, now, DERP_HISTORY_MAX_AGE);
let selected_id = select_home_region(
self.home_region.map(|(id, _)| id),
&measurements,
&best_recent,
)
.expect("non-empty measurements always yield a selection");
let selected_latency = measurements
.iter()
.find(|m| m.id == selected_id)
.expect("the selected region id is always one of the measurements")
.latency;
let iter = measurements.iter().map(|result| {
(
result.latency_map_key.as_str(),
result.latency.as_secs_f64(),
)
});
if self.home_region.map(|(id, _)| id) != Some(selected_id) {
tracing::debug!(selected_region_id = ?selected_id, "updating home region");
}
self.home_region = Some((selected_id, selected_latency));
self.client.set_home_region(selected_id, iter).await;
}
}
const DERP_HISTORY_MAX_AGE: Duration = Duration::from_secs(5 * 60);
fn best_recent(
history: &[(Instant, Arc<Vec<ts_netcheck::RegionResult>>)],
now: Instant,
max_age: Duration,
) -> HashMap<ts_derp::RegionId, Duration> {
let mut best: HashMap<ts_derp::RegionId, Duration> = HashMap::new();
for (stamp, report) in history {
if now.saturating_duration_since(*stamp) > max_age {
continue;
}
for r in report.iter() {
best.entry(r.id)
.and_modify(|d| {
if r.latency < *d {
*d = r.latency;
}
})
.or_insert(r.latency);
}
}
best
}
fn select_home_region(
current: Option<ts_derp::RegionId>,
measurements: &[ts_netcheck::RegionResult],
best_recent: &HashMap<ts_derp::RegionId, Duration>,
) -> Option<ts_derp::RegionId> {
const PREFERRED_DERP_ABSOLUTE_DIFF: Duration = Duration::from_millis(10);
let smoothed = |m: &ts_netcheck::RegionResult| -> Duration {
best_recent.get(&m.id).copied().unwrap_or(m.latency)
};
let best = measurements.iter().min_by_key(|m| smoothed(m))?;
let best_any = smoothed(best);
let Some(old_id) = current.filter(|id| *id != best.id) else {
return Some(best.id);
};
match measurements.iter().find(|m| m.id == old_id) {
Some(old) => {
let keep_old = old.latency.saturating_sub(best_any) < PREFERRED_DERP_ABSOLUTE_DIFF
|| best_any > (old.latency / 3) * 2;
Some(if keep_old { old.id } else { best.id })
}
None => Some(best.id),
}
}
impl Message<EndpointAdvertisement> for ControlRunner {
type Reply = ();
async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
let endpoints: Vec<Endpoint> = msg
.endpoints
.iter()
.map(|ep| Endpoint {
endpoint: ep.addr,
ty: match ep.ty {
SelfEndpointType::Local => EndpointType::Local,
SelfEndpointType::Stun => EndpointType::Stun,
SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
},
})
.collect();
tracing::debug!(
n_endpoints = endpoints.len(),
"advertising endpoints to control"
);
self.client.set_endpoints(endpoints).await;
}
}
#[derive(Debug)]
pub struct SetAdvertiseRoutes {
pub routes: Vec<ipnet::IpNet>,
}
impl Message<SetAdvertiseRoutes> for ControlRunner {
type Reply = ();
async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
self.client.set_routable_ips(msg.routes).await;
}
}
#[derive(Debug)]
pub struct SetHostname {
pub hostname: String,
}
impl Message<SetHostname> for ControlRunner {
type Reply = ();
async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
tracing::debug!("updating hostname at control");
self.client.set_hostname(msg.hostname).await;
}
}
#[cfg(test)]
mod reauth_bridge_tests {
use tokio::sync::watch;
use super::bridge_reauth_url_to_state;
use crate::DeviceState;
fn url(s: &str) -> url::Url {
s.parse().unwrap()
}
#[test]
fn bridge_maps_auth_url_to_needs_login() {
let u = url("https://login.example/auth");
let (tx, rx) = watch::channel(DeviceState::Running);
bridge_reauth_url_to_state(&tx, Some(&u));
assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(u));
}
#[test]
fn bridge_none_leaves_state_unchanged() {
let (tx, rx) = watch::channel(DeviceState::Running);
bridge_reauth_url_to_state(&tx, None);
assert_eq!(*rx.borrow(), DeviceState::Running);
}
#[test]
fn bridge_same_url_does_not_refire() {
let u = url("https://login.example/auth");
let (tx, mut rx) = watch::channel(DeviceState::Running);
bridge_reauth_url_to_state(&tx, Some(&u)); assert!(rx.has_changed().unwrap(), "first NeedsLogin fires");
rx.mark_unchanged();
bridge_reauth_url_to_state(&tx, Some(&u)); assert!(
!rx.has_changed().unwrap(),
"the same re-auth URL must not re-fire the state watch"
);
}
#[test]
fn bridge_new_url_after_prior_fires() {
let a = url("https://login.example/a");
let b = url("https://login.example/b");
let (tx, rx) = watch::channel(DeviceState::Running);
bridge_reauth_url_to_state(&tx, Some(&a));
bridge_reauth_url_to_state(&tx, Some(&b));
assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(b));
}
#[test]
fn running_netmap_clears_needs_login() {
let u = url("https://login.example/auth");
let (tx, rx) = watch::channel(DeviceState::Running);
bridge_reauth_url_to_state(&tx, Some(&u));
assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(u));
tx.send_replace(DeviceState::Running);
assert_eq!(*rx.borrow(), DeviceState::Running);
}
}
#[cfg(test)]
mod sticky_pop_browser_url_tests {
use tokio::sync::watch;
use super::sticky_update_pop_browser_url;
fn url(s: &str) -> url::Url {
s.parse().unwrap()
}
#[test]
fn non_empty_url_publishes() {
let (tx, rx) = watch::channel(None);
let u = url("https://login.example/consent");
sticky_update_pop_browser_url(&tx, Some(&u));
assert_eq!(*rx.borrow(), Some(u));
}
#[test]
fn absent_update_does_not_reset() {
let u = url("https://login.example/consent");
let (tx, rx) = watch::channel(Some(u.clone()));
for _ in 0..5 {
sticky_update_pop_browser_url(&tx, None);
}
assert_eq!(
*rx.borrow(),
Some(u),
"empty updates must not clear the URL"
);
}
#[test]
fn repeated_same_url_does_not_refire() {
let u = url("https://login.example/consent");
let (tx, mut rx) = watch::channel(None);
sticky_update_pop_browser_url(&tx, Some(&u)); assert!(rx.has_changed().unwrap(), "first non-empty URL fires");
rx.mark_unchanged();
sticky_update_pop_browser_url(&tx, Some(&u)); assert!(
!rx.has_changed().unwrap(),
"repeating the same URL must not re-fire the watch"
);
}
#[test]
fn new_url_after_prior_fires() {
let a = url("https://login.example/a");
let b = url("https://login.example/b");
let (tx, rx) = watch::channel(None);
sticky_update_pop_browser_url(&tx, Some(&a));
sticky_update_pop_browser_url(&tx, Some(&b));
assert_eq!(*rx.borrow(), Some(b));
}
#[test]
fn sticky_through_none_gap_then_new_url_fires() {
let a = url("https://login.example/a");
let b = url("https://login.example/b");
let (tx, rx) = watch::channel(None);
sticky_update_pop_browser_url(&tx, Some(&a));
for _ in 0..3 {
sticky_update_pop_browser_url(&tx, None);
}
assert_eq!(*rx.borrow(), Some(a), "stayed sticky through the None gap");
sticky_update_pop_browser_url(&tx, Some(&b));
assert_eq!(
*rx.borrow(),
Some(b),
"a new URL after a None gap still fires"
);
}
#[test]
fn returning_to_prior_url_refires() {
let a = url("https://login.example/a");
let b = url("https://login.example/b");
let (tx, mut rx) = watch::channel(None);
sticky_update_pop_browser_url(&tx, Some(&a));
sticky_update_pop_browser_url(&tx, Some(&b));
rx.mark_unchanged();
sticky_update_pop_browser_url(&tx, Some(&a)); assert!(
rx.has_changed().unwrap(),
"returning to a prior URL re-fires"
);
assert_eq!(*rx.borrow(), Some(a));
}
#[tokio::test]
async fn end_to_end_one_change_survives_none_thrash() {
let u = url("https://login.example/consent");
let (tx, mut rx) = watch::channel(None);
let cadence = [None, None, Some(&u), None, None];
for incoming in cadence {
sticky_update_pop_browser_url(&tx, incoming);
}
let mut changes = 0;
while rx.has_changed().unwrap() {
let v = rx.borrow_and_update().clone();
changes += 1;
assert_eq!(v, Some(u.clone()), "the surviving change carries the URL");
}
assert_eq!(changes, 1, "exactly one change survives the None thrash");
}
}
#[cfg(test)]
mod home_region_hysteresis_tests {
use core::time::Duration;
use std::{collections::HashMap, sync::Arc, time::Instant};
use ts_derp::RegionId;
use ts_netcheck::RegionResult;
use super::{DERP_HISTORY_MAX_AGE, best_recent, select_home_region};
fn region(id: u32, latency_ms: u64) -> RegionResult {
RegionResult {
latency: Duration::from_millis(latency_ms),
id: RegionId(core::num::NonZeroU32::new(id).unwrap()),
latency_map_key: format!("region-{id}"),
connected_remote: "127.0.0.1:0".parse().unwrap(),
}
}
fn rid(id: u32) -> RegionId {
RegionId(core::num::NonZeroU32::new(id).unwrap())
}
fn sel(current: Option<RegionId>, m: &[RegionResult]) -> Option<RegionId> {
select_home_region(current, m, &HashMap::new())
}
#[test]
fn empty_measurements_select_none() {
assert!(sel(Some(rid(1)), &[]).is_none());
assert!(sel(None, &[]).is_none());
}
#[test]
fn first_selection_takes_best() {
let m = [region(1, 20), region(2, 50)];
assert_eq!(sel(None, &m).unwrap(), rid(1));
}
#[test]
fn keeps_current_when_within_absolute_diff() {
let m = [region(1, 20), region(2, 25)];
assert_eq!(
sel(Some(rid(2)), &m).unwrap(),
rid(2),
"a 5ms improvement (< 10ms) must not flap the home region"
);
}
#[test]
fn switches_on_meaningful_improvement() {
let m = [region(1, 20), region(2, 100)];
assert_eq!(
sel(Some(rid(2)), &m).unwrap(),
rid(1),
"a large improvement must switch the home region"
);
}
#[test]
fn keeps_current_when_two_thirds_rule_not_met() {
let m = [region(1, 45), region(2, 60)];
assert_eq!(
sel(Some(rid(2)), &m).unwrap(),
rid(2),
"best (45ms) is not <= 2/3 of current (40ms), so keep current despite >10ms diff"
);
}
#[test]
fn switches_when_current_region_absent() {
let m = [region(1, 20), region(3, 25)];
assert_eq!(
sel(Some(rid(2)), &m).unwrap(),
rid(1),
"a current region absent from the measurements falls through to the best"
);
}
#[test]
fn keeps_current_when_it_is_already_best() {
let m = [region(2, 20), region(1, 50)];
assert_eq!(sel(Some(rid(2)), &m).unwrap(), rid(2));
}
#[test]
fn best_recent_is_min_over_window_and_evicts_aged() {
let now = Instant::now();
let history = vec![
(
now - Duration::from_secs(10 * 60), Arc::new(vec![region(1, 5)]),
),
(
now - Duration::from_secs(60),
Arc::new(vec![region(1, 50), region(2, 30)]),
),
(now, Arc::new(vec![region(1, 20)])),
];
let br = best_recent(&history, now, DERP_HISTORY_MAX_AGE);
assert_eq!(
br.get(&rid(1)).copied(),
Some(Duration::from_millis(20)),
"region 1 min over the window is 20ms (the aged 5ms is excluded)"
);
assert_eq!(br.get(&rid(2)).copied(), Some(Duration::from_millis(30)));
}
#[test]
fn smoothed_best_damps_oscillation_across_boundary() {
let m = [region(1, 20), region(2, 60)];
let mut br = HashMap::new();
br.insert(rid(1), Duration::from_millis(50)); br.insert(rid(2), Duration::from_millis(60));
assert_eq!(
select_home_region(Some(rid(2)), &m, &br).unwrap(),
rid(2),
"a best region whose 5-min min is only marginally better must not flap the home region"
);
assert_eq!(
select_home_region(Some(rid(2)), &m, &HashMap::new()).unwrap(),
rid(1),
"raw-vs-raw (no smoothing) switches on the 20ms-vs-60ms current samples"
);
}
#[test]
fn smoothed_best_may_differ_from_raw_first() {
let m = [region(1, 10), region(2, 12)];
let mut br = HashMap::new();
br.insert(rid(1), Duration::from_millis(40));
br.insert(rid(2), Duration::from_millis(5));
assert_eq!(
select_home_region(None, &m, &br).unwrap(),
rid(2),
"the smoothed-best region wins even when it is not the raw-latency first"
);
}
#[test]
fn two_thirds_boundary_is_integer_not_float() {
let m = [region(1, 24), region(2, 36)];
assert_eq!(
sel(Some(rid(2)), &m).unwrap(),
rid(1),
"at best == old*2/3 the integer rule does NOT keep (Go switches); a float rule would keep"
);
}
#[test]
fn old_faster_than_smoothed_best_keeps_via_absolute_diff() {
let m = [region(1, 15), region(2, 20)];
let mut br = HashMap::new();
br.insert(rid(1), Duration::from_millis(18)); br.insert(rid(2), Duration::from_millis(25)); assert_eq!(
select_home_region(Some(rid(2)), &m, &br).unwrap(),
rid(2),
"old raw (20ms) within 10ms of smoothed-best (18ms) keeps via the absolute-diff arm"
);
}
}
#[cfg(test)]
mod self_lockout_tests {
use ts_tka::{AumHash, Authority, State};
use super::{SelfLockVerdict, self_lock_verdict};
fn node_key() -> ts_keys::NodePublicKey {
ts_keys::NodePrivateKey::random().public_key()
}
#[test]
fn empty_signature_is_unsigned_not_locked_out() {
let authority = Authority::from_state(AumHash([0; 32]), State::default());
assert_eq!(
self_lock_verdict(&node_key(), &[], &authority),
SelfLockVerdict::Unsigned
);
}
#[test]
fn unverifiable_signature_is_locked_out() {
let authority = Authority::from_state(AumHash([0; 32]), State::default());
let verdict = self_lock_verdict(&node_key(), &[0x01, 0x02, 0x03], &authority);
assert!(
matches!(verdict, SelfLockVerdict::LockedOut(_)),
"a signature the lock cannot authorize must classify as LockedOut, got {verdict:?}"
);
}
}