#![doc = include_str!("../README.md")]
extern crate ts_netstack_smoltcp as netstack;
use core::time::Duration;
use std::sync::Arc;
use kameo::{
actor::{ActorRef, Spawn, WeakActorRef},
mailbox::Signal,
};
use netstack::netcore::Channel;
use tokio::sync::watch;
use crate::{
control_runner::ControlRunner, dataplane::DataplaneActor, direct::DirectManager,
forwarder_actor::ForwarderActor, multiderp::Multiderp, netstack_actor::NetstackActor,
};
pub mod capture;
pub mod control_runner;
mod dataplane;
mod derp_latency;
pub mod device_state;
mod direct;
mod env;
mod error;
pub mod fallback_tcp;
mod forwarder_actor;
pub mod funnel;
mod magic_dns;
mod multiderp;
mod netstack_actor;
mod packetfilter;
pub mod peer_tracker;
mod peerapi;
mod peerapi_doh;
mod route_updater;
pub mod serve;
mod src_filter;
pub mod status;
pub mod taildrop;
pub mod taildrop_send;
mod tka_sync;
#[cfg(feature = "tun")]
mod tun_actor;
pub use device_state::{DeviceState, RegistrationError};
pub(crate) use env::Env;
pub use error::{Error, ErrorKind};
pub use status::{FileTarget, NetcheckReport, RegionLatency, Status, StatusNode, WhoIs};
pub use ts_dataplane::{CaptureHook, CapturePath};
use crate::peer_tracker::PeerTracker;
/// Handle that bundles the actor references, the shared environment and the
/// watch channels created by `Runtime::spawn`.
///
/// Dropping the value triggers actor shutdown (see the `Drop` impl).
pub struct Runtime {
/// Reference to the `ControlRunner` actor; target of the control-plane
/// requests issued by this type (self-node lookup, logout, DNS, ...).
pub control: ActorRef<ControlRunner>,
/// Reference to the `DataplaneActor`; used for capture installation and
/// stopped on shutdown.
dataplane: ActorRef<DataplaneActor>,
/// Reference to the `DirectManager` actor; asked for rebinds and for the
/// best address of each peer.
direct: ActorRef<DirectManager>,
/// Weak reference to the `NetstackActor`; `None` when the runtime was
/// spawned in TUN transport mode.
netstack: Option<WeakActorRef<NetstackActor>>,
/// Weak reference to the `PeerTracker` actor.
pub peer_tracker: WeakActorRef<PeerTracker>,
/// Fallback TCP manager; `None` when the runtime was spawned in TUN
/// transport mode.
fallback_tcp: Option<fallback_tcp::FallbackTcpManager>,
/// Reference to the `ForwarderActor`; receives advertised-route updates.
forwarder: ActorRef<ForwarderActor>,
/// Shared environment; a clone is handed to every actor spawned by
/// `Runtime::spawn`.
env: Env,
/// Shutdown flag; set to `true` by `graceful_shutdown` and by `Drop`.
shutdown: watch::Sender<bool>,
/// Sender through which `set_exit_node` publishes the requested selector.
exit_node_tx: watch::Sender<Option<ts_control::ExitNodeSelector>>,
/// Receiver read by `active_exit_node`; the sending half is given to the
/// route updater at spawn time.
active_exit_rx: watch::Receiver<Option<ts_control::StableNodeId>>,
/// Receiver for the current `DeviceState`; the sending half is given to
/// the control runner at spawn time.
state_rx: watch::Receiver<DeviceState>,
}
impl Runtime {
/// Spawns the actors that make up a runtime and returns the handle that
/// refers to them.
///
/// `config` supplies the forwarder configuration, the TCP buffer size and the
/// transport mode, and is then moved into the control runner together with
/// `auth_key`. `keys` is handed to the shared `Env`.
///
/// # Errors
///
/// Returns an error when one of the setup `ask` calls fails, or an error of
/// kind `ErrorKind::TunUnavailable` when `config` requests TUN transport but
/// the `tun` feature is not compiled in.
pub async fn spawn(
config: ts_control::Config,
auth_key: Option<String>,
keys: ts_keys::NodeState,
) -> Result<Self, Error> {
// The receiving half of the shutdown flag travels inside `Env`; the sending
// half is kept by the returned `Runtime`.
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (env, exit_node_tx) = Env::new_with_exit_tx(
keys,
shutdown_rx,
env::ForwarderConfig::from_control_config(&config),
);
let netstack_config = netstack_config_from(config.tcp_buffer_size);
let dataplane = DataplaneActor::spawn(env.clone());
// Two overlay transports are requested from the dataplane: the first is wired
// to the netstack (or TUN) actor below, the second to the forwarder.
let (netstack_id, netstack_up, netstack_down) =
dataplane.ask(dataplane::NewOverlayTransport).await?;
let (forwarder_id, forwarder_up, forwarder_down) =
dataplane.ask(dataplane::NewOverlayTransport).await?;
let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));
let direct = DirectManager::spawn((env.clone(), dataplane.clone(), multiderp.clone()));
let forwarder = ForwarderActor::spawn((
env.clone(),
netstack_config.clone(),
forwarder_up,
forwarder_down,
));
// `forwarder_channel` is only consumed by the TUN arm of the match below,
// hence the conditional `allow` when the `tun` feature is off.
#[cfg_attr(not(feature = "tun"), allow(unused_variables))]
let (forwarder_channel,) = forwarder.ask(forwarder_actor::GetChannel).await?;
let (active_exit_tx, active_exit_rx) = watch::channel(None);
// The handles returned by the next three spawns are not kept.
route_updater::RouteUpdater::spawn((
multiderp.clone(),
direct.clone(),
env.clone(),
netstack_id,
forwarder_id,
active_exit_tx,
));
packetfilter::PacketfilterUpdater::spawn(env.clone());
src_filter::SourceFilterUpdater::spawn(env.clone());
// Only a weak reference to the peer tracker is retained.
let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();
// Transport-specific wiring: netstack mode yields a netstack actor plus a
// fallback TCP manager; TUN mode yields neither.
let (netstack, fallback_tcp) = match &config.transport_mode {
ts_control::TransportMode::Netstack => {
let netstack = NetstackActor::spawn((
env.clone(),
netstack_config,
netstack_up,
netstack_down,
));
let (channel,) = netstack.ask(netstack_actor::GetChannel).await?;
let fallback_tcp = fallback_tcp::FallbackTcpManager::new(channel.clone());
magic_dns::MagicDnsActor::spawn((env.clone(), channel));
(Some(netstack.downgrade()), Some(fallback_tcp))
}
#[cfg(feature = "tun")]
ts_control::TransportMode::Tun(tun_cfg) => {
tun_actor::TunActor::spawn((
env.clone(),
tun_cfg.clone(),
netstack_up,
netstack_down,
tun_actor::HostRouteGating {
accept_routes: env.accept_routes,
exit_node_configured: env.exit_node().is_some(),
},
forwarder_channel.clone(),
));
(None, None)
}
// Without the `tun` feature a TUN request cannot be served.
#[cfg(not(feature = "tun"))]
ts_control::TransportMode::Tun(_) => {
return Err(Error {
kind: ErrorKind::TunUnavailable,
target_actor: None,
message_ty: None,
});
}
};
// The device state starts out as `Connecting`; the sending half goes to the
// control runner.
let (state_tx, state_rx) = watch::channel(DeviceState::Connecting);
let control = ControlRunner::spawn(control_runner::Params {
config,
auth_key,
env: env.clone(),
state_tx,
});
Ok(Self {
control,
dataplane,
direct,
peer_tracker,
fallback_tcp,
forwarder,
netstack,
env,
shutdown: shutdown_tx,
exit_node_tx,
active_exit_rx,
state_rx,
})
}
pub fn register_fallback_tcp_handler(
&self,
cb: Arc<
dyn Fn(core::net::SocketAddr, core::net::SocketAddr) -> fallback_tcp::FallbackDecision
+ Send
+ Sync,
>,
) -> Result<fallback_tcp::FallbackTcpHandle, Error> {
Ok(self
.fallback_tcp
.as_ref()
.ok_or(Error {
kind: ErrorKind::UnsupportedInTunMode,
target_actor: None,
message_ty: None,
})?
.register(cb))
}
pub async fn channel(&self) -> Result<Channel, Error> {
let (channel,) = self
.netstack
.as_ref()
.ok_or(Error {
kind: ErrorKind::UnsupportedInTunMode,
target_actor: None,
message_ty: None,
})?
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(netstack_actor::GetChannel)
.await?;
Ok(channel)
}
/// Returns a new `Arc` handle to the taildrop store held by the environment,
/// or `None` when the environment has none.
pub fn taildrop_store(&self) -> Option<Arc<crate::taildrop::TaildropStore>> {
    self.env.taildrop_store.as_ref().map(Arc::clone)
}
/// Returns a clone of the environment's funnel ingress slot.
pub fn funnel_ingress_slot(&self) -> crate::funnel::FunnelIngressSlot {
    let slot = &self.env.funnel_ingress;
    slot.clone()
}
/// Returns a new `Arc` handle to the environment's `ingress_active` flag.
pub fn ingress_active_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
    std::sync::Arc::clone(&self.env.ingress_active)
}
/// Sends an `InstallCapture` request carrying `hook` to the dataplane actor
/// and waits for the reply.
///
/// `hook` is forwarded unchanged; passing `None` presumably removes a
/// previously installed hook (not verifiable from this file).
///
/// # Errors
///
/// Returns the request failure converted into `Error`.
pub async fn install_capture(
    &self,
    hook: Option<ts_dataplane::CaptureHook>,
) -> Result<(), Error> {
    let outcome = self
        .dataplane
        .ask(dataplane::InstallCapture { hook })
        .await;
    outcome.map_err(Into::into)
}
/// Sends a `Rebind` request to the direct manager actor and waits for the
/// reply.
///
/// # Errors
///
/// Returns the request failure converted into `Error`.
pub async fn rebind(&self) -> Result<(), Error> {
    self.direct.ask(direct::Rebind).await?;
    Ok(())
}
pub async fn status(&self) -> Result<Status, Error> {
let self_node_domain = self.control.ask(control_runner::SelfNode).await?;
let magic_dns_suffix = self_node_domain.as_ref().and_then(|n| n.tailnet.clone());
let self_node = self_node_domain.as_ref().map(StatusNode::from_node);
let peers_with_ids = self
.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::GetStatus)
.await?;
let ids: Vec<ts_transport::PeerId> = peers_with_ids.iter().map(|(id, _)| *id).collect();
let best_addrs = self
.direct
.ask(direct::BestAddrs { ids })
.await
.unwrap_or_default();
let peers = peers_with_ids
.into_iter()
.map(|(id, mut node)| {
node.cur_addr = best_addrs.get(&id).copied();
node
})
.collect();
Ok(Status {
self_node,
peers,
active_exit_node: self.active_exit_node(),
magic_dns_suffix,
})
}
pub async fn file_targets(&self) -> Result<Vec<FileTarget>, Error> {
let self_node = self.control.ask(control_runner::SelfNode).await?;
let Some(self_node) = self_node else {
return Ok(Vec::new()); };
if !self_node.can_share_files() {
return Ok(Vec::new()); }
let self_user_id = self_node.user_id;
let peers = self
.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::AllPeers)
.await?;
Ok(status::build_file_targets(peers, self_user_id))
}
/// Returns a copy of the value currently held by the active-exit-node watch
/// channel.
pub fn active_exit_node(&self) -> Option<ts_control::StableNodeId> {
    let current = self.active_exit_rx.borrow();
    current.clone()
}
/// Sends a `FetchIdToken` request for `audience` to the control runner and
/// returns the token string from its reply.
///
/// # Errors
///
/// Handler errors are returned as-is; any other request failure is mapped
/// by `flatten_send_err`.
pub async fn fetch_id_token(
    &self,
    audience: String,
) -> Result<String, ts_control::IdTokenError> {
    let request = control_runner::FetchIdToken { audience };
    let reply = self.control.ask(request).await;
    reply.map_err(flatten_send_err)
}
/// Sends a `Logout` request to the control runner and waits for the reply.
///
/// # Errors
///
/// Handler errors are returned as-is; any other request failure is mapped
/// by `flatten_logout_send_err`.
pub async fn logout(&self) -> Result<(), ts_control::LogoutError> {
    let reply = self.control.ask(control_runner::Logout).await;
    reply.map_err(flatten_logout_send_err)
}
/// Sends a `SetDns` request carrying `name` and `value` to the control runner
/// and waits for the reply.
///
/// # Errors
///
/// Handler errors are returned as-is; any other request failure is mapped
/// by `flatten_set_dns_send_err`.
pub async fn set_dns(
    &self,
    name: String,
    value: String,
) -> Result<(), ts_control::SetDnsError> {
    let request = control_runner::SetDns { name, value };
    let reply = self.control.ask(request).await;
    reply.map_err(flatten_set_dns_send_err)
}
/// Sends a `GetCertificate` request for `name` to the control runner and
/// returns the certified key from its reply.
///
/// Only compiled with the `acme` feature.
///
/// # Errors
///
/// Handler errors are returned as-is; any other request failure is mapped
/// by `flatten_cert_send_err`.
#[cfg(feature = "acme")]
pub async fn get_certificate(
    &self,
    name: String,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let request = control_runner::GetCertificate { name };
    let reply = self.control.ask(request).await;
    reply.map_err(flatten_cert_send_err)
}
pub async fn whois(&self, addr: core::net::SocketAddr) -> Result<Option<WhoIs>, Error> {
self.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::Whois { addr })
.await
.map_err(Into::into)
}
pub async fn set_exit_node(
&self,
selector: Option<ts_control::ExitNodeSelector>,
) -> Result<(), Error> {
self.exit_node_tx.send_replace(selector);
self.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::RepublishState)
.await
.map_err(Into::into)
}
/// Returns the exit-node selector reported by the environment's
/// `exit_node()` accessor.
pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
    let env = &self.env;
    env.exit_node()
}
/// Filters `routes` through `filter_advertise_routes`, then sends the result
/// first to the forwarder (`UpdateRoutes`) and afterwards to the control
/// runner (`SetAdvertiseRoutes`).
///
/// # Errors
///
/// Returns the first request failure. When the forwarder request fails the
/// control runner is not contacted.
pub async fn set_advertise_routes(&self, routes: Vec<ipnet::IpNet>) -> Result<(), Error> {
    let routes = filter_advertise_routes(routes);

    let forwarder_update = forwarder_actor::UpdateRoutes {
        routes: routes.clone(),
    };
    self.forwarder.ask(forwarder_update).await?;

    let control_update = control_runner::SetAdvertiseRoutes { routes };
    let advertised = self.control.ask(control_update).await;
    advertised.map_err(Into::into)
}
pub async fn watch_netmap(&self) -> Result<watch::Receiver<Vec<StatusNode>>, Error> {
self.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::WatchNetmap)
.await
.map_err(Into::into)
}
/// Returns a copy of the `DeviceState` currently held by the state watch
/// channel.
pub fn device_state(&self) -> DeviceState {
    let current = self.state_rx.borrow();
    current.clone()
}
/// Returns a new receiver cloned from the runtime's `DeviceState` watch
/// receiver.
pub fn watch_state(&self) -> watch::Receiver<DeviceState> {
    let receiver = &self.state_rx;
    receiver.clone()
}
/// Delegates to `device_state::wait_for_running` with a clone of the state
/// receiver and the given `timeout`, returning whatever it returns.
pub async fn wait_until_running(
    &self,
    timeout: Option<Duration>,
) -> Result<(), RegistrationError> {
    let state_rx = self.state_rx.clone();
    device_state::wait_for_running(state_rx, timeout).await
}
/// Sets the shutdown flag, asks the control runner, the dataplane and the bus
/// actor to stop gracefully, and waits until all three have shut down.
///
/// Returns `true` when the teardown finished, and `false` when `timeout`
/// elapsed first. With `timeout == None` the call waits without a limit.
///
/// The runtime is consumed. It is dropped when the teardown future finishes
/// or is cancelled by the timeout; because the shutdown flag is already set
/// at that point, the `Drop` impl kills the three actors.
pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
    // Owns the runtime for the duration of the teardown so that it is dropped
    // together with the future.
    async fn stop_and_wait(runtime: Runtime) {
        // Failures to deliver the stop request are intentionally ignored.
        let _control_stop = runtime.control.stop_gracefully().await;
        let _dataplane_stop = runtime.dataplane.stop_gracefully().await;
        let _bus_stop = runtime.env.bus.stop_gracefully().await;
        tokio::join!(
            runtime.control.wait_for_shutdown(),
            runtime.dataplane.wait_for_shutdown(),
            runtime.env.bus.wait_for_shutdown(),
        );
    }

    self.shutdown.send_replace(true);
    let teardown = stop_and_wait(self);

    let Some(limit) = timeout else {
        teardown.await;
        return true;
    };
    tokio::time::timeout(limit, teardown).await.is_ok()
}
}
/// Tears the actors down when the runtime handle goes away.
impl Drop for Runtime {
    /// If the shutdown flag is already set, the control runner, dataplane and
    /// bus actors are killed outright. Otherwise the flag is set and each of
    /// the three is sent a stop signal via `try_shutdown`.
    fn drop(&mut self) {
        let already_signalled = *self.shutdown.borrow();
        if !already_signalled {
            self.shutdown.send_replace(true);
            try_shutdown(&self.control);
            try_shutdown(&self.dataplane);
            try_shutdown(&self.env.bus);
        } else {
            self.control.kill();
            self.dataplane.kill();
            self.env.bus.kill();
        }
    }
}
fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
if let Err(e) = a.mailbox_sender().try_send(Signal::Stop) {
tracing::error!(error = %e, "graceful shutdown failed, killing actor");
a.kill();
}
}
/// Builds the netstack configuration: the crate default, with
/// `tcp_buffer_size` overridden when a value is supplied.
fn netstack_config_from(tcp_buffer_size: Option<usize>) -> netstack::netcore::Config {
    let mut config = netstack::netcore::Config::default();
    let Some(size) = tcp_buffer_size else {
        return config;
    };
    config.tcp_buffer_size = size;
    config
}
/// Keeps the IPv4 prefixes of `routes`, dropping exact duplicates while
/// preserving first-occurrence order. Every non-IPv4 prefix is dropped with a
/// warning.
fn filter_advertise_routes(routes: Vec<ipnet::IpNet>) -> Vec<ipnet::IpNet> {
    routes.into_iter().fold(Vec::new(), |mut kept, candidate| {
        match candidate {
            // Already present: nothing to add.
            ipnet::IpNet::V4(_) if kept.contains(&candidate) => {}
            ipnet::IpNet::V4(_) => kept.push(candidate),
            _ => {
                tracing::warn!(prefix = %candidate, "dropping IPv6 advertise route (IPv6-off posture)");
            }
        }
        kept
    })
}
fn flatten_send_err<M>(
e: kameo::error::SendError<M, ts_control::IdTokenError>,
) -> ts_control::IdTokenError {
match e {
kameo::error::SendError::HandlerError(err) => err,
_ => ts_control::IdTokenError::NetworkError,
}
}
fn flatten_logout_send_err<M>(
e: kameo::error::SendError<M, ts_control::LogoutError>,
) -> ts_control::LogoutError {
match e {
kameo::error::SendError::HandlerError(err) => err,
_ => ts_control::LogoutError::NetworkError,
}
}
fn flatten_set_dns_send_err<M>(
e: kameo::error::SendError<M, ts_control::SetDnsError>,
) -> ts_control::SetDnsError {
match e {
kameo::error::SendError::HandlerError(err) => err,
_ => ts_control::SetDnsError::NetworkError,
}
}
/// Collapses a kameo `SendError` into a `CertError`: a handler error is
/// passed through unchanged, every other variant becomes a `CertError::Io`
/// wrapping an `std::io::Error` with a fixed message.
///
/// Only compiled with the `acme` feature.
#[cfg(feature = "acme")]
fn flatten_cert_send_err<M>(
    e: kameo::error::SendError<M, ts_control::CertError>,
) -> ts_control::CertError {
    let kameo::error::SendError::HandlerError(handler_err) = e else {
        let unavailable =
            std::io::Error::other("control runner unavailable for certificate issuance");
        return ts_control::CertError::Io(unavailable);
    };
    handler_err
}
#[cfg(test)]
mod tests {
    use super::*;

    // Shorthands for the `SendError` instantiations exercised below.
    type IdTokenSendError =
        kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError>;
    type LogoutSendError = kameo::error::SendError<control_runner::Logout, ts_control::LogoutError>;
    type SetDnsSendError = kameo::error::SendError<control_runner::SetDns, ts_control::SetDnsError>;

    /// Parses a prefix literal, panicking on malformed input.
    fn net(text: &str) -> ipnet::IpNet {
        text.parse().unwrap()
    }

    // --- netstack_config_from -------------------------------------------

    #[test]
    fn netstack_config_none_uses_netstack_default() {
        let reference = netstack::netcore::Config::default();
        let config = netstack_config_from(None);
        assert_eq!(
            config.tcp_buffer_size, reference.tcp_buffer_size,
            "None must inherit the netstack default TCP buffer size"
        );
    }

    #[test]
    fn netstack_config_some_overrides_buffer() {
        let config = netstack_config_from(Some(64 * 1024));
        assert_eq!(
            config.tcp_buffer_size,
            64 * 1024,
            "Some(n) must override the TCP buffer size that both netstacks use"
        );
    }

    // --- filter_advertise_routes ----------------------------------------

    #[test]
    fn filter_advertise_routes_keeps_v4_dedups_drops_v6() {
        let first_v4 = net("10.0.0.0/24");
        let second_v4 = net("192.168.1.0/24");
        let only_v6 = net("2001:db8::/32");
        let filtered = filter_advertise_routes(vec![first_v4, only_v6, second_v4, first_v4]);
        assert_eq!(
            filtered,
            vec![first_v4, second_v4],
            "v6 dropped, duplicate v4 collapsed, first-occurrence order preserved"
        );
    }

    #[test]
    fn filter_advertise_routes_all_v6_is_empty() {
        let filtered = filter_advertise_routes(vec![net("2001:db8::/32")]);
        assert!(filtered.is_empty());
    }

    // --- flatten_send_err -----------------------------------------------

    #[test]
    fn flatten_send_err_handler_error_passes_through() {
        // Invalid UTF-8 yields a `Utf8Error`, which converts into the
        // `Internal` variant checked below.
        let invalid = [0xffu8, 0xfe];
        let utf8_err = core::str::from_utf8(&invalid).unwrap_err();
        let handler_err: ts_control::IdTokenError = utf8_err.into();
        assert!(matches!(
            handler_err,
            ts_control::IdTokenError::Internal(_)
        ));
        let send_err = IdTokenSendError::HandlerError(handler_err.clone());
        assert_eq!(flatten_send_err(send_err), handler_err);
    }

    #[test]
    fn flatten_send_err_actor_stopped_is_network_error() {
        let send_err = IdTokenSendError::ActorStopped;
        assert_eq!(
            flatten_send_err(send_err),
            ts_control::IdTokenError::NetworkError
        );
    }

    #[test]
    fn flatten_send_err_actor_not_running_is_network_error() {
        let undelivered = control_runner::FetchIdToken {
            audience: "sts.amazonaws.com".to_string(),
        };
        let send_err = IdTokenSendError::ActorNotRunning(undelivered);
        assert_eq!(
            flatten_send_err(send_err),
            ts_control::IdTokenError::NetworkError
        );
    }

    // --- flatten_logout_send_err ----------------------------------------

    #[test]
    fn flatten_logout_send_err_handler_error_passes_through() {
        let handler_err =
            ts_control::LogoutError::Internal(ts_control::LogoutInternalErrorKind::Http);
        assert!(matches!(handler_err, ts_control::LogoutError::Internal(_)));
        let send_err = LogoutSendError::HandlerError(handler_err.clone());
        assert_eq!(flatten_logout_send_err(send_err), handler_err);
    }

    #[test]
    fn flatten_logout_send_err_actor_stopped_is_network_error() {
        let send_err = LogoutSendError::ActorStopped;
        assert_eq!(
            flatten_logout_send_err(send_err),
            ts_control::LogoutError::NetworkError
        );
    }

    // --- flatten_set_dns_send_err ---------------------------------------

    #[test]
    fn flatten_set_dns_send_err_handler_error_passes_through() {
        let handler_err =
            ts_control::SetDnsError::Internal(ts_control::SetDnsInternalErrorKind::Http);
        assert!(matches!(handler_err, ts_control::SetDnsError::Internal(_)));
        let send_err = SetDnsSendError::HandlerError(handler_err.clone());
        assert_eq!(flatten_set_dns_send_err(send_err), handler_err);
    }

    #[test]
    fn flatten_set_dns_send_err_actor_stopped_is_network_error() {
        let send_err = SetDnsSendError::ActorStopped;
        assert_eq!(
            flatten_set_dns_send_err(send_err),
            ts_control::SetDnsError::NetworkError
        );
    }
}