#![doc = include_str!("../README.md")]
extern crate ts_netstack_smoltcp as netstack;
use core::time::Duration;
use kameo::{
actor::{ActorRef, Spawn, WeakActorRef},
mailbox::Signal,
};
use netstack::netcore::Channel;
use tokio::sync::watch;
use crate::{
control_runner::ControlRunner, dataplane::DataplaneActor, multiderp::Multiderp,
netstack_actor::NetstackActor,
};
pub mod control_runner;
mod dataplane;
mod derp_latency;
mod env;
mod error;
mod multiderp;
mod netstack_actor;
mod packetfilter;
pub mod peer_tracker;
mod route_updater;
mod src_filter;
pub(crate) use env::Env;
pub use error::{Error, ErrorKind};
use crate::peer_tracker::PeerTracker;
pub struct Runtime {
pub control: ActorRef<ControlRunner>,
dataplane: ActorRef<DataplaneActor>,
netstack: WeakActorRef<NetstackActor>,
pub peer_tracker: WeakActorRef<PeerTracker>,
env: Env,
shutdown: watch::Sender<bool>,
}
impl Runtime {
pub async fn spawn(
config: ts_control::Config,
auth_key: Option<String>,
keys: ts_keys::NodeState,
) -> Result<Self, Error> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let env = Env::new(keys, shutdown_rx);
let dataplane = DataplaneActor::spawn(env.clone());
let (netstack_id, netstack_up, netstack_down) =
dataplane.ask(dataplane::NewOverlayTransport).await?;
let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));
route_updater::RouteUpdater::spawn((multiderp.clone(), env.clone(), netstack_id));
packetfilter::PacketfilterUpdater::spawn(env.clone());
src_filter::SourceFilterUpdater::spawn(env.clone());
let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();
let netstack =
NetstackActor::spawn((env.clone(), Default::default(), netstack_up, netstack_down));
let control = ControlRunner::spawn(control_runner::Params {
config,
auth_key,
env: env.clone(),
});
Ok(Self {
control,
dataplane,
peer_tracker,
netstack: netstack.downgrade(),
env,
shutdown: shutdown_tx,
})
}
pub async fn channel(&self) -> Result<Channel, Error> {
let (channel,) = self
.netstack
.upgrade()
.ok_or(Error {
kind: ErrorKind::ActorGone,
target_actor: None,
message_ty: None,
})?
.ask(netstack_actor::GetChannel)
.await?;
Ok(channel)
}
pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
self.shutdown.send_replace(true);
async fn _shutdown_all(runtime: Runtime) {
let _ignore = runtime.control.stop_gracefully().await;
let _ignore = runtime.dataplane.stop_gracefully().await;
let _ignore = runtime.env.bus.stop_gracefully().await;
tokio::join![
runtime.control.wait_for_shutdown(),
runtime.dataplane.wait_for_shutdown(),
runtime.env.bus.wait_for_shutdown(),
];
}
let fut = _shutdown_all(self);
match timeout {
Some(timeout) => tokio::time::timeout(timeout, fut).await.is_ok(),
None => {
fut.await;
true
}
}
}
}
impl Drop for Runtime {
fn drop(&mut self) {
if *self.shutdown.borrow() {
self.control.kill();
self.dataplane.kill();
self.env.bus.kill();
return;
}
self.shutdown.send_replace(true);
try_shutdown(&self.control);
try_shutdown(&self.dataplane);
try_shutdown(&self.env.bus);
}
}
fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
if let Err(e) = a.mailbox_sender().try_send(Signal::Stop) {
tracing::error!(error = %e, "graceful shutdown failed, killing actor");
a.kill();
}
}