#![doc = include_str!("../README.md")]
extern crate ts_netstack_smoltcp as netstack;
use core::time::Duration;
use std::sync::Arc;
use kameo::{
actor::{ActorRef, Spawn, WeakActorRef},
mailbox::Signal,
};
use netstack::netcore::Channel;
use tokio::sync::watch;
use crate::{
control_runner::ControlRunner, dataplane::DataplaneActor, direct::DirectManager,
forwarder_actor::ForwarderActor, multiderp::Multiderp, netstack_actor::NetstackActor,
};
pub mod capture;
pub mod control_runner;
mod dataplane;
mod derp_latency;
pub mod device_state;
mod direct;
mod env;
mod error;
pub mod fallback_tcp;
mod forwarder_actor;
pub mod funnel;
pub mod ipn_bus;
mod magic_dns;
mod multiderp;
mod netstack_actor;
mod packetfilter;
pub mod peer_tracker;
mod peerapi;
mod peerapi_doh;
mod route_updater;
pub mod serve;
mod src_filter;
pub mod status;
pub mod taildrop;
pub mod taildrop_send;
mod tka_sync;
#[cfg(feature = "tun")]
mod tun_actor;
pub use device_state::{DeviceState, RegistrationError};
pub(crate) use env::Env;
pub use error::{Error, ErrorKind};
pub use ipn_bus::{IpnBusWatcher, Notify, NotifyWatchOpt};
pub use status::{FileTarget, NetcheckReport, RegionLatency, Status, StatusNode, WhoIs};
pub use ts_dataplane::{CaptureHook, CapturePath};
use crate::peer_tracker::PeerTracker;
/// Handle to the set of actors and watch channels created by [`Runtime::spawn`].
///
/// Dropping a `Runtime` signals shutdown and stops the control, dataplane and
/// bus actors (or kills them if shutdown had already been signalled).
pub struct Runtime {
/// Reference to the [`ControlRunner`] actor.
pub control: ActorRef<ControlRunner>,
/// Reference to the dataplane actor.
dataplane: ActorRef<DataplaneActor>,
/// Reference to the direct-connection manager actor.
direct: ActorRef<DirectManager>,
/// Weak reference to the netstack actor; `None` when spawned in TUN mode.
netstack: Option<WeakActorRef<NetstackActor>>,
/// Weak reference to the [`PeerTracker`] actor.
pub peer_tracker: WeakActorRef<PeerTracker>,
/// Fallback TCP manager; `None` when spawned in TUN mode.
fallback_tcp: Option<fallback_tcp::FallbackTcpManager>,
/// Reference to the forwarder actor.
forwarder: ActorRef<ForwarderActor>,
/// Reference to the multiderp actor.
multiderp: ActorRef<Multiderp>,
/// Shared environment handed to every actor at spawn time.
env: Env,
/// Shutdown flag; set to `true` by `graceful_shutdown` and by `Drop`.
shutdown: watch::Sender<bool>,
/// Sender for the requested exit-node selector (written by `set_exit_node`).
exit_node_tx: watch::Sender<Option<ts_control::ExitNodeSelector>>,
/// Receiver for the active exit node; its sender is given to the route updater.
active_exit_rx: watch::Receiver<Option<ts_control::StableNodeId>>,
/// Receiver for the device state; its sender is given to the control runner.
state_rx: watch::Receiver<DeviceState>,
/// Receiver for capability grants; its sender is given to the packet-filter updater.
cap_grants_rx: watch::Receiver<packetfilter::CapGrants>,
/// Last requested advertised routes and exit-node flag.
advertise: std::sync::Mutex<AdvertiseState>,
}
impl Runtime {
/// Spawns the actors that make up a runtime and wires them together.
///
/// `config` selects the transport mode and supplies forwarder, netstack and
/// advertise settings; `keys` goes into the shared [`Env`]; `auth_key` is
/// passed on to the control runner.
///
/// # Errors
///
/// Returns an error if any actor `ask` issued during setup fails, or
/// [`ErrorKind::TunUnavailable`] when TUN transport is requested but the
/// `tun` feature is not compiled in.
///
/// NOTE(review): actors spawned before a failing step are not explicitly
/// stopped here — confirm they shut down once their references are dropped.
pub async fn spawn(
config: ts_control::Config,
auth_key: Option<String>,
keys: ts_keys::NodeState,
) -> Result<Self, Error> {
// The receiver goes into `Env`; the sender is kept on the `Runtime`.
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (env, exit_node_tx) = Env::new_with_exit_tx(
keys,
shutdown_rx,
env::ForwarderConfig::from_control_config(&config),
);
let netstack_config = netstack_config_from(config.tcp_buffer_size);
let dataplane = DataplaneActor::spawn(env.clone());
// Two overlay transports are requested from the dataplane: one for the
// netstack/TUN side and one for the forwarder.
let (netstack_id, netstack_up, netstack_down) =
dataplane.ask(dataplane::NewOverlayTransport).await?;
let (forwarder_id, forwarder_up, forwarder_down) =
dataplane.ask(dataplane::NewOverlayTransport).await?;
let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));
let direct = DirectManager::spawn((env.clone(), dataplane.clone(), multiderp.clone()));
let forwarder = ForwarderActor::spawn((
env.clone(),
netstack_config.clone(),
forwarder_up,
forwarder_down,
));
// `forwarder_channel` is only consumed by the TUN branch below.
#[cfg_attr(not(feature = "tun"), allow(unused_variables))]
let (forwarder_channel,) = forwarder.ask(forwarder_actor::GetChannel).await?;
// The route updater gets both transport ids and the sender for the
// active exit node; the receiver is kept for `active_exit_node`.
let (active_exit_tx, active_exit_rx) = watch::channel(None);
route_updater::RouteUpdater::spawn((
multiderp.clone(),
direct.clone(),
env.clone(),
netstack_id,
forwarder_id,
active_exit_tx,
));
let (cap_grants_tx, cap_grants_rx) = watch::channel(Default::default());
packetfilter::PacketfilterUpdater::spawn((env.clone(), cap_grants_tx));
src_filter::SourceFilterUpdater::spawn(env.clone());
let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();
// The transport mode decides what sits on the first overlay transport.
let (netstack, fallback_tcp) = match &config.transport_mode {
ts_control::TransportMode::Netstack => {
let netstack = NetstackActor::spawn((
env.clone(),
netstack_config,
netstack_up,
netstack_down,
));
// The netstack channel is shared by the fallback TCP manager and
// the MagicDNS actor.
let (channel,) = netstack.ask(netstack_actor::GetChannel).await?;
let fallback_tcp = fallback_tcp::FallbackTcpManager::new(channel.clone());
magic_dns::MagicDnsActor::spawn((env.clone(), channel));
(Some(netstack.downgrade()), Some(fallback_tcp))
}
#[cfg(feature = "tun")]
ts_control::TransportMode::Tun(tun_cfg) => {
tun_actor::TunActor::spawn((
env.clone(),
tun_cfg.clone(),
netstack_up,
netstack_down,
tun_actor::HostRouteGating {
accept_routes: env.accept_routes,
exit_node_configured: env.exit_node().is_some(),
},
forwarder_channel.clone(),
));
// No netstack actor or fallback TCP manager exists in TUN mode.
(None, None)
}
#[cfg(not(feature = "tun"))]
ts_control::TransportMode::Tun(_) => {
return Err(Error {
kind: ErrorKind::TunUnavailable,
target_actor: None,
message_ty: None,
});
}
};
// The device state starts at `Connecting`; the sender goes to the control runner.
let (state_tx, state_rx) = watch::channel(DeviceState::Connecting);
// Captured before `config` is moved into the control runner below.
let advertise = std::sync::Mutex::new(AdvertiseState {
routes: config.advertise_routes.clone(),
exit_node: config.advertise_exit_node,
});
let control = ControlRunner::spawn(control_runner::Params {
config,
auth_key,
env: env.clone(),
state_tx,
});
Ok(Self {
control,
dataplane,
direct,
peer_tracker,
fallback_tcp,
forwarder,
multiderp,
netstack,
env,
shutdown: shutdown_tx,
exit_node_tx,
active_exit_rx,
state_rx,
cap_grants_rx,
advertise,
})
}
pub fn register_fallback_tcp_handler(
&self,
cb: Arc<
dyn Fn(core::net::SocketAddr, core::net::SocketAddr) -> fallback_tcp::FallbackDecision
+ Send
+ Sync,
>,
) -> Result<fallback_tcp::FallbackTcpHandle, Error> {
Ok(self
.fallback_tcp
.as_ref()
.ok_or(Error {
kind: ErrorKind::UnsupportedInTunMode,
target_actor: None,
message_ty: None,
})?
.register(cb))
}
pub async fn channel(&self) -> Result<Channel, Error> {
let (channel,) = self
.netstack
.as_ref()
.ok_or(Error {
kind: ErrorKind::UnsupportedInTunMode,
target_actor: None,
message_ty: None,
})?
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(netstack_actor::GetChannel)
.await?;
Ok(channel)
}
/// Returns a shared handle to the environment's Taildrop store, if one is set.
pub fn taildrop_store(&self) -> Option<Arc<crate::taildrop::TaildropStore>> {
    self.env.taildrop_store.as_ref().map(Arc::clone)
}
/// Returns a clone of the environment's funnel ingress slot.
pub fn funnel_ingress_slot(&self) -> crate::funnel::FunnelIngressSlot {
    crate::funnel::FunnelIngressSlot::clone(&self.env.funnel_ingress)
}
/// Returns a shared handle to the environment's `ingress_active` flag.
pub fn ingress_active_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
    // Only the reference count is bumped; the flag itself stays shared.
    std::sync::Arc::clone(&self.env.ingress_active)
}
/// Sends an `InstallCapture` request carrying `hook` to the dataplane actor.
///
/// NOTE(review): passing `None` presumably removes an installed hook —
/// confirm against the dataplane actor's handler.
///
/// # Errors
///
/// Returns the converted send/handler error if the request fails.
pub async fn install_capture(
    &self,
    hook: Option<ts_dataplane::CaptureHook>,
) -> Result<(), Error> {
    let request = dataplane::InstallCapture { hook };
    match self.dataplane.ask(request).await {
        Ok(reply) => Ok(reply),
        Err(send_err) => Err(send_err.into()),
    }
}
pub async fn rebind(&self) -> Result<(), Error> {
self.direct.ask(direct::Rebind).await.map_err(Error::from)
}
pub async fn status(&self) -> Result<Status, Error> {
let self_node_domain = self.control.ask(control_runner::SelfNode).await?;
let magic_dns_suffix = self_node_domain.as_ref().and_then(|n| n.tailnet.clone());
let self_node = self_node_domain.as_ref().map(StatusNode::from_node);
let peers_with_ids = self
.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::GetStatus)
.await?;
let ids: Vec<ts_transport::PeerId> = peers_with_ids.iter().map(|(id, _)| *id).collect();
let best_addrs = self
.direct
.ask(direct::BestAddrs { ids: ids.clone() })
.await
.unwrap_or_default();
let relay_ids: Vec<ts_transport::PeerId> = ids
.into_iter()
.filter(|id| !best_addrs.contains_key(id))
.collect();
let relay_codes = if relay_ids.is_empty() {
Default::default()
} else {
self.multiderp
.ask(multiderp::RelayCodesForPeers { ids: relay_ids })
.await
.unwrap_or_default()
};
let peers = peers_with_ids
.into_iter()
.map(|(id, mut node)| match best_addrs.get(&id).copied() {
Some(addr) => {
node.cur_addr = Some(addr);
node
}
None => {
node.relay = relay_codes.get(&id).cloned();
node
}
})
.collect();
Ok(Status {
self_node,
peers,
active_exit_node: self.active_exit_node(),
magic_dns_suffix,
})
}
pub async fn file_targets(&self) -> Result<Vec<FileTarget>, Error> {
let self_node = self.control.ask(control_runner::SelfNode).await?;
let Some(self_node) = self_node else {
return Ok(Vec::new()); };
if !self_node.can_share_files() {
return Ok(Vec::new()); }
let self_user_id = self_node.user_id;
let peers = self
.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::AllPeers)
.await?;
Ok(status::build_file_targets(peers, self_user_id))
}
/// Returns a copy of the value currently held in the active-exit-node watch channel.
pub fn active_exit_node(&self) -> Option<ts_control::StableNodeId> {
    let current = self.active_exit_rx.borrow();
    current.clone()
}
pub async fn fetch_id_token(
&self,
audience: String,
) -> Result<String, ts_control::IdTokenError> {
self.control
.ask(control_runner::FetchIdToken { audience })
.await
.map_err(flatten_send_err)
}
pub async fn logout(&self) -> Result<(), ts_control::LogoutError> {
self.control
.ask(control_runner::Logout)
.await
.map_err(flatten_logout_send_err)
}
pub async fn set_dns(
&self,
name: String,
value: String,
) -> Result<(), ts_control::SetDnsError> {
self.control
.ask(control_runner::SetDns { name, value })
.await
.map_err(flatten_set_dns_send_err)
}
/// Asks the control runner for a certificate for `name`.
///
/// Only available with the `acme` feature.
///
/// # Errors
///
/// Handler errors are returned unchanged; any other send failure becomes a
/// `CertError::Io` (see `flatten_cert_send_err`).
#[cfg(feature = "acme")]
pub async fn get_certificate(
    &self,
    name: String,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let request = control_runner::GetCertificate { name };
    match self.control.ask(request).await {
        Ok(certified_key) => Ok(certified_key),
        Err(send_err) => Err(flatten_cert_send_err(send_err)),
    }
}
pub async fn whois(&self, addr: core::net::SocketAddr) -> Result<Option<WhoIs>, Error> {
let whois = self
.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::Whois { addr })
.await?;
let Some(mut whois) = whois else {
return Ok(None);
};
let dst = addr.ip();
if let Some(self_node) = self.control.ask(control_runner::SelfNode).await? {
let src: core::net::IpAddr = if dst.is_ipv6() {
self_node.tailnet_address.ipv6.addr().into()
} else {
self_node.tailnet_address.ipv4.addr().into()
};
let grants = self.cap_grants_rx.borrow();
whois.cap_map = ts_packetfilter_state::caps_for(&grants, src, dst, |cap| {
self_node.has_node_attr(cap)
});
}
Ok(Some(whois))
}
pub async fn direct_path(
&self,
dst: core::net::IpAddr,
) -> Result<Option<(core::net::SocketAddr, Duration)>, Error> {
let peer_tracker = self.peer_tracker.upgrade().ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?;
let Some(node) = peer_tracker
.ask(peer_tracker::PeerByTailnetIp { ip: dst })
.await?
else {
return Ok(None);
};
let Some(disco) = node.disco_key else {
return Ok(None);
};
self.direct
.ask(direct::DirectPathLatency { disco })
.await
.map_err(Into::into)
}
pub async fn ping_disco(
&self,
dst: core::net::IpAddr,
timeout: Duration,
) -> Result<Option<(core::net::SocketAddr, Duration)>, Error> {
let peer_tracker = self.peer_tracker.upgrade().ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?;
let Some(node) = peer_tracker
.ask(peer_tracker::PeerByTailnetIp { ip: dst })
.await?
else {
return Ok(None);
};
let Some(disco) = node.disco_key else {
return Ok(None);
};
let Some(sock) = self.direct.ask(direct::SockHandle).await? else {
return Ok(None);
};
sock.ping_now(&disco, timeout).await.map_err(|_| Error {
kind: ErrorKind::ReplyErr,
target_actor: None,
message_ty: None,
})
}
pub async fn set_exit_node(
&self,
selector: Option<ts_control::ExitNodeSelector>,
) -> Result<(), Error> {
self.exit_node_tx.send_replace(selector);
self.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::RepublishState)
.await
.map_err(Into::into)
}
/// Returns the exit-node selector reported by the shared environment.
pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
self.env.exit_node()
}
/// Replaces the advertised subnet routes and applies the result.
///
/// The stored exit-node flag is kept; the composed list (see
/// `compose_advertised_routes`) is sent to the forwarder and control runner.
/// The stored routes are updated before the list is applied, so they stay
/// updated even when applying fails.
///
/// # Errors
///
/// Returns any error from `apply_advertised_routes`.
pub async fn set_advertise_routes(&self, routes: Vec<ipnet::IpNet>) -> Result<(), Error> {
    // The guard is confined to this block so it is released before the await.
    let composed = {
        let mut state = match self.advertise.lock() {
            Ok(guard) => guard,
            // A poisoned lock still holds usable data; carry on with it.
            Err(poisoned) => poisoned.into_inner(),
        };
        state.routes = routes;
        compose_advertised_routes(state.routes.clone(), state.exit_node)
    };
    self.apply_advertised_routes(composed).await
}
/// Turns exit-node advertisement on or off and applies the result.
///
/// The stored subnet routes are kept; the composed list (see
/// `compose_advertised_routes`) is sent to the forwarder and control runner.
/// The stored flag is updated before the list is applied, so it stays
/// updated even when applying fails.
///
/// # Errors
///
/// Returns any error from `apply_advertised_routes`.
pub async fn set_advertise_exit_node(&self, enable: bool) -> Result<(), Error> {
    // The guard is confined to this block so it is released before the await.
    let composed = {
        let mut state = match self.advertise.lock() {
            Ok(guard) => guard,
            // A poisoned lock still holds usable data; carry on with it.
            Err(poisoned) => poisoned.into_inner(),
        };
        state.exit_node = enable;
        compose_advertised_routes(state.routes.clone(), state.exit_node)
    };
    self.apply_advertised_routes(composed).await
}
/// Pushes the composed route list to the forwarder first and then to the
/// control runner.
///
/// # Errors
///
/// Stops at the first failing request. If the forwarder update succeeds and
/// the control-runner request fails, the forwarder keeps the new routes.
async fn apply_advertised_routes(&self, composed: Vec<ipnet::IpNet>) -> Result<(), Error> {
    let forwarder_routes = composed.clone();
    self.forwarder
        .ask(forwarder_actor::UpdateRoutes {
            routes: forwarder_routes,
        })
        .await?;

    let request = control_runner::SetAdvertiseRoutes { routes: composed };
    match self.control.ask(request).await {
        Ok(reply) => Ok(reply),
        Err(send_err) => Err(send_err.into()),
    }
}
/// Sends a `SetHostname` request carrying `hostname` to the control runner.
///
/// # Errors
///
/// Returns the converted send/handler error if the request fails.
pub async fn set_hostname(&self, hostname: String) -> Result<(), Error> {
    let request = control_runner::SetHostname { hostname };
    match self.control.ask(request).await {
        Ok(reply) => Ok(reply),
        Err(send_err) => Err(send_err.into()),
    }
}
pub async fn watch_netmap(&self) -> Result<watch::Receiver<Vec<StatusNode>>, Error> {
self.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::WatchNetmap)
.await
.map_err(Into::into)
}
/// Returns a copy of the value currently held in the device-state watch channel.
pub fn device_state(&self) -> DeviceState {
    let current = self.state_rx.borrow();
    current.clone()
}
/// Returns a new receiver for the device-state watch channel.
pub fn watch_state(&self) -> watch::Receiver<DeviceState> {
    watch::Receiver::clone(&self.state_rx)
}
/// Delegates to `device_state::wait_for_running` with a fresh device-state
/// receiver and the given `timeout`.
///
/// # Errors
///
/// Returns whatever [`RegistrationError`] `wait_for_running` reports.
pub async fn wait_until_running(
    &self,
    timeout: Option<Duration>,
) -> Result<(), RegistrationError> {
    let state_rx = self.state_rx.clone();
    device_state::wait_for_running(state_rx, timeout).await
}
pub async fn watch_ipn_bus(&self, mask: NotifyWatchOpt) -> Result<IpnBusWatcher, Error> {
let peer_rx = self
.peer_tracker
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(peer_tracker::WatchNetmap)
.await?;
let browser_rx = self.control.ask(control_runner::WatchBrowserUrl).await?;
Ok(ipn_bus::spawn_watcher(
mask,
self.state_rx.clone(),
peer_rx,
browser_rx,
self.shutdown.subscribe(),
))
}
/// Signals shutdown, asks the control, dataplane and bus actors to stop
/// gracefully, and waits for all three to finish.
///
/// Returns `true` once the actors have shut down. With `Some(timeout)` it
/// returns `false` if the timeout elapses first. The runtime is consumed in
/// both cases, so its `Drop` impl still runs afterwards.
pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
    self.shutdown.send_replace(true);

    /// Requests a graceful stop of each actor, then waits for all of them.
    async fn stop_and_wait(runtime: Runtime) {
        // A failed stop request is ignored on purpose; the waits below
        // still run.
        let _ = runtime.control.stop_gracefully().await;
        let _ = runtime.dataplane.stop_gracefully().await;
        let _ = runtime.env.bus.stop_gracefully().await;
        tokio::join!(
            runtime.control.wait_for_shutdown(),
            runtime.dataplane.wait_for_shutdown(),
            runtime.env.bus.wait_for_shutdown(),
        );
    }

    let shutdown = stop_and_wait(self);
    let Some(limit) = timeout else {
        shutdown.await;
        return true;
    };
    tokio::time::timeout(limit, shutdown).await.is_ok()
}
}
impl Drop for Runtime {
    /// Stops the control, dataplane and bus actors when the runtime goes away.
    ///
    /// If shutdown was already signalled, the actors are killed outright.
    /// Otherwise shutdown is signalled and each actor is sent a stop signal
    /// through `try_shutdown`.
    fn drop(&mut self) {
        let already_signalled = *self.shutdown.borrow();
        if already_signalled {
            self.control.kill();
            self.dataplane.kill();
            self.env.bus.kill();
        } else {
            self.shutdown.send_replace(true);
            try_shutdown(&self.control);
            try_shutdown(&self.dataplane);
            try_shutdown(&self.env.bus);
        }
    }
}
/// Sends a stop signal to `a` without waiting; kills the actor if the signal
/// cannot be queued.
fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
    let Err(e) = a.mailbox_sender().try_send(Signal::Stop) else {
        return;
    };
    tracing::error!(error = %e, "graceful shutdown failed, killing actor");
    a.kill();
}
/// Builds a netstack configuration from its defaults, overriding the TCP
/// buffer size only when `tcp_buffer_size` is `Some`.
fn netstack_config_from(tcp_buffer_size: Option<usize>) -> netstack::netcore::Config {
    let mut config = netstack::netcore::Config::default();
    if let Some(size) = tcp_buffer_size {
        config.tcp_buffer_size = size;
    }
    config
}
/// Keeps the IPv4 prefixes of `routes`, removing duplicates while preserving
/// first-occurrence order. Every IPv6 prefix is dropped with a warning.
fn filter_advertise_routes(routes: Vec<ipnet::IpNet>) -> Vec<ipnet::IpNet> {
    routes.into_iter().fold(Vec::new(), |mut kept, net| {
        match net {
            ipnet::IpNet::V4(_) => {
                if !kept.contains(&net) {
                    kept.push(net);
                }
            }
            ipnet::IpNet::V6(_) => {
                tracing::warn!(prefix = %net, "dropping IPv6 advertise route (IPv6-off posture)");
            }
        }
        kept
    })
}
/// Returns the filtered `routes` (see `filter_advertise_routes`), with
/// `0.0.0.0/0` appended when `exit_node` is set and it is not already present.
fn compose_advertised_routes(routes: Vec<ipnet::IpNet>, exit_node: bool) -> Vec<ipnet::IpNet> {
    let mut composed = filter_advertise_routes(routes);
    if !exit_node {
        return composed;
    }
    let default_v4 = ipnet::IpNet::V4(
        ipnet::Ipv4Net::new(core::net::Ipv4Addr::UNSPECIFIED, 0)
            .expect("0.0.0.0/0 is a valid prefix"),
    );
    // An explicitly listed default route must not be duplicated.
    if !composed.contains(&default_v4) {
        composed.push(default_v4);
    }
    composed
}
/// The inputs that `compose_advertised_routes` turns into the advertised
/// route list.
#[derive(Debug, Default, Clone)]
struct AdvertiseState {
/// Routes as last requested, stored before any filtering.
routes: Vec<ipnet::IpNet>,
/// Whether the default IPv4 route should be added to the advertised set.
exit_node: bool,
}
fn flatten_send_err<M>(
e: kameo::error::SendError<M, ts_control::IdTokenError>,
) -> ts_control::IdTokenError {
match e {
kameo::error::SendError::HandlerError(err) => err,
_ => ts_control::IdTokenError::NetworkError,
}
}
fn flatten_logout_send_err<M>(
e: kameo::error::SendError<M, ts_control::LogoutError>,
) -> ts_control::LogoutError {
match e {
kameo::error::SendError::HandlerError(err) => err,
_ => ts_control::LogoutError::NetworkError,
}
}
fn flatten_set_dns_send_err<M>(
e: kameo::error::SendError<M, ts_control::SetDnsError>,
) -> ts_control::SetDnsError {
match e {
kameo::error::SendError::HandlerError(err) => err,
_ => ts_control::SetDnsError::NetworkError,
}
}
/// Collapses a `SendError` into a `CertError`: a handler error is passed
/// through unchanged, every other variant becomes a `CertError::Io` with a
/// fixed message.
#[cfg(feature = "acme")]
fn flatten_cert_send_err<M>(
    e: kameo::error::SendError<M, ts_control::CertError>,
) -> ts_control::CertError {
    if let kameo::error::SendError::HandlerError(handler_err) = e {
        handler_err
    } else {
        ts_control::CertError::Io(std::io::Error::other(
            "control runner unavailable for certificate issuance",
        ))
    }
}
// Unit tests for the free helper functions in this file: netstack config
// construction, advertised-route filtering/composition, and the `SendError`
// flattening helpers.
#[cfg(test)]
mod tests {
use super::*;
// With no override, the TCP buffer size equals the netstack default.
#[test]
fn netstack_config_none_uses_netstack_default() {
let default = netstack::netcore::Config::default();
let built = netstack_config_from(None);
assert_eq!(
built.tcp_buffer_size, default.tcp_buffer_size,
"None must inherit the netstack default TCP buffer size"
);
}
// An explicit size replaces the default.
#[test]
fn netstack_config_some_overrides_buffer() {
let built = netstack_config_from(Some(64 * 1024));
assert_eq!(
built.tcp_buffer_size,
64 * 1024,
"Some(n) must override the TCP buffer size that both netstacks use"
);
}
// IPv6 prefixes are dropped and duplicate IPv4 prefixes collapse, keeping order.
#[test]
fn filter_advertise_routes_keeps_v4_dedups_drops_v6() {
let v4a: ipnet::IpNet = "10.0.0.0/24".parse().unwrap();
let v4b: ipnet::IpNet = "192.168.1.0/24".parse().unwrap();
let v6: ipnet::IpNet = "2001:db8::/32".parse().unwrap();
let out = filter_advertise_routes(vec![v4a, v6, v4b, v4a]);
assert_eq!(
out,
vec![v4a, v4b],
"v6 dropped, duplicate v4 collapsed, first-occurrence order preserved"
);
}
// An all-IPv6 input filters down to nothing.
#[test]
fn filter_advertise_routes_all_v6_is_empty() {
let v6: ipnet::IpNet = "2001:db8::/32".parse().unwrap();
assert!(filter_advertise_routes(vec![v6]).is_empty());
}
// The exit-node flag appends 0.0.0.0/0 exactly once.
#[test]
fn compose_advertised_routes_folds_exit_node() {
let subnet: ipnet::IpNet = "10.0.0.0/24".parse().unwrap();
let default_v4: ipnet::IpNet = "0.0.0.0/0".parse().unwrap();
assert_eq!(
compose_advertised_routes(vec![subnet], false),
vec![subnet],
"exit-node off ⇒ no default route"
);
assert_eq!(
compose_advertised_routes(vec![subnet], true),
vec![subnet, default_v4],
"exit-node on ⇒ 0.0.0.0/0 appended"
);
assert_eq!(
compose_advertised_routes(vec![], true),
vec![default_v4],
"exit-node alone advertises only 0.0.0.0/0"
);
assert_eq!(
compose_advertised_routes(vec![default_v4], true),
vec![default_v4],
"the exit-node fold dedups against an explicit default route"
);
}
// A handler error from the control runner is returned unchanged.
#[test]
fn flatten_send_err_handler_error_passes_through() {
let bytes = vec![0xffu8, 0xfe];
let utf8_err = core::str::from_utf8(&bytes).unwrap_err();
let inner = ts_control::IdTokenError::from(utf8_err);
assert!(matches!(inner, ts_control::IdTokenError::Internal(_)));
let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
kameo::error::SendError::HandlerError(inner.clone());
assert_eq!(flatten_send_err(e), inner);
}
// A stopped actor is reported as a network error.
#[test]
fn flatten_send_err_actor_stopped_is_network_error() {
let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
kameo::error::SendError::ActorStopped;
assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
}
// An actor that is not running is reported as a network error too.
#[test]
fn flatten_send_err_actor_not_running_is_network_error() {
let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
kameo::error::SendError::ActorNotRunning(control_runner::FetchIdToken {
audience: "sts.amazonaws.com".to_string(),
});
assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
}
// Logout: a handler error is returned unchanged.
#[test]
fn flatten_logout_send_err_handler_error_passes_through() {
let inner = ts_control::LogoutError::Internal(ts_control::LogoutInternalErrorKind::Http);
assert!(matches!(inner, ts_control::LogoutError::Internal(_)));
let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
kameo::error::SendError::HandlerError(inner.clone());
assert_eq!(flatten_logout_send_err(e), inner);
}
// Logout: a stopped actor is reported as a network error.
#[test]
fn flatten_logout_send_err_actor_stopped_is_network_error() {
let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
kameo::error::SendError::ActorStopped;
assert_eq!(
flatten_logout_send_err(e),
ts_control::LogoutError::NetworkError
);
}
// SetDns: a handler error is returned unchanged.
#[test]
fn flatten_set_dns_send_err_handler_error_passes_through() {
let inner = ts_control::SetDnsError::Internal(ts_control::SetDnsInternalErrorKind::Http);
assert!(matches!(inner, ts_control::SetDnsError::Internal(_)));
let e: kameo::error::SendError<control_runner::SetDns, ts_control::SetDnsError> =
kameo::error::SendError::HandlerError(inner.clone());
assert_eq!(flatten_set_dns_send_err(e), inner);
}
// SetDns: a stopped actor is reported as a network error.
#[test]
fn flatten_set_dns_send_err_actor_stopped_is_network_error() {
let e: kameo::error::SendError<control_runner::SetDns, ts_control::SetDnsError> =
kameo::error::SendError::ActorStopped;
assert_eq!(
flatten_set_dns_send_err(e),
ts_control::SetDnsError::NetworkError
);
}
}