ts_runtime 0.2.0

tailscale runtime
Documentation
use std::{collections::HashMap, sync::Arc};

use kameo::{
    actor::ActorRef,
    message::{Context, Message},
};
use ts_bart::RoutingTable;
use ts_keys::NodePublicKey;
use ts_overlay_router::{
    inbound::RouteAction as InboundRouteAction, outbound::RouteAction as OutboundRouteAction,
};
use ts_transport::{OverlayTransportId, UnderlayTransportId};

use crate::{Error, env::Env, multiderp, multiderp::Multiderp, peer_tracker::PeerState};

pub struct RouteUpdater {
    multiderp: ActorRef<Multiderp>,
    default_overlay_transport: OverlayTransportId,
    env: Env,
}

impl kameo::Actor for RouteUpdater {
    type Args = (ActorRef<Multiderp>, Env, OverlayTransportId);
    type Error = Error;

    async fn on_start(
        (multiderp, env, default_transport): Self::Args,
        actor_ref: ActorRef<Self>,
    ) -> Result<Self, Self::Error> {
        env.subscribe::<PeerState>(&actor_ref).await?;
        env.subscribe::<Arc<ts_control::StateUpdate>>(&actor_ref)
            .await?;

        Ok(Self {
            multiderp,
            default_overlay_transport: default_transport,
            env,
        })
    }
}

#[derive(Clone)]
pub struct SelfRouteUpdate {
    pub overlay_in_routes: Arc<ts_bart::Table<InboundRouteAction>>,
}

#[derive(Clone)]
pub struct PeerRouteUpdate {
    pub inner: Arc<PeerRoutesInner>,
}

pub struct PeerRoutesInner {
    pub underlay_routes: HashMap<NodePublicKey, UnderlayTransportId>,
    pub overlay_out_routes: ts_bart::Table<OutboundRouteAction>,
}

impl Message<PeerState> for RouteUpdater {
    type Reply = ();

    async fn handle(&mut self, msg: PeerState, _ctx: &mut Context<Self, Self::Reply>) {
        tracing::trace!(
            n_peers = msg.peers.len(),
            "reconstructing routes for peer update"
        );

        let mut overlay_out = ts_bart::Table::default();
        let mut underlay_out = HashMap::default();

        for peer in msg.peers.values() {
            let span = tracing::trace_span!(
                "peer_update",
                peer = %peer.node_key,
                region = tracing::field::Empty,
                underlay_transport = tracing::field::Empty,
            );

            let Some(region) = peer.derp_region else {
                tracing::trace!(parent: &span, "peer has no derp region");
                continue;
            };
            span.record("region", tracing::field::debug(region));

            tracing::trace!(parent: &span, "ask multiderp for transport id");

            match self
                .multiderp
                .ask(multiderp::TransportIdForRegion { id: region })
                .await
            {
                Ok(Some(transport_id)) => {
                    span.record("underlay_transport", tracing::field::debug(transport_id));
                    underlay_out.insert(peer.node_key, transport_id);
                    tracing::trace!(parent: &span, "set underlay route");
                }
                Ok(None) => {
                    tracing::error!(parent: &span, "no region stored in multiderp, no underlay route");
                }
                Err(e) => {
                    tracing::error!(error = %e, "multiderp unavailable");
                }
            }

            for route in &peer.accepted_routes {
                tracing::trace!(parent: &span, %route, "routes");

                overlay_out.insert(*route, OutboundRouteAction::Wireguard(peer.node_key));
            }
        }

        if let Err(e) = self
            .env
            .publish(PeerRouteUpdate {
                inner: Arc::new(PeerRoutesInner {
                    underlay_routes: underlay_out,
                    overlay_out_routes: overlay_out,
                }),
            })
            .await
        {
            tracing::error!(error = %e, "publishing peer route update");
        }
    }
}

impl Message<Arc<ts_control::StateUpdate>> for RouteUpdater {
    type Reply = ();

    async fn handle(
        &mut self,
        msg: Arc<ts_control::StateUpdate>,
        _ctx: &mut Context<Self, Self::Reply>,
    ) {
        let Some(node) = msg.node.as_ref() else {
            return;
        };

        let mut out = ts_bart::Table::default();

        tracing::debug!(accepted_routes = ?node.accepted_routes, "populating accepted routes");

        for &accepted_route in &node.accepted_routes {
            out.insert(
                accepted_route,
                InboundRouteAction::ToOverlay(self.default_overlay_transport),
            );
        }

        if let Err(e) = self
            .env
            .publish(SelfRouteUpdate {
                overlay_in_routes: Arc::new(out),
            })
            .await
        {
            tracing::error!(error = %e, "publishing self route update");
        }
    }
}