use core::net::IpAddr;
use std::sync::Arc;
use kameo::{
actor::ActorRef,
message::{Context, Message},
};
use netstack::{
HasChannel,
netcore::{Channel, NetstackControl},
};
use tokio::task::JoinSet;
use ts_packet::PacketMut;
use crate::{
Error,
dataplane::{OverlayFromDataplane, OverlayToDataplane},
env::Env,
};
pub struct NetstackActor {
_joinset: JoinSet<()>,
channel: Channel,
enable_ipv6: bool,
}
fn overlay_addresses(self_node: &ts_control::Node, enable_ipv6: bool) -> Vec<IpAddr> {
let tailnet_address = &self_node.tailnet_address;
let mut addrs = vec![tailnet_address.ipv4.addr().into()];
if enable_ipv6 {
addrs.push(tailnet_address.ipv6.addr().into());
}
addrs.push(core::net::Ipv4Addr::new(100, 100, 100, 100).into());
for vip in self_node.service_addresses() {
if vip.is_ipv6() && !enable_ipv6 {
continue;
}
if !addrs.contains(&vip) {
addrs.push(vip);
}
}
addrs
}
impl kameo::Actor for NetstackActor {
type Args = (
Env,
netstack::netcore::Config,
OverlayToDataplane,
OverlayFromDataplane,
);
type Error = Error;
async fn on_start(
(env, config, netstack_up, mut netstack_down): Self::Args,
slf: ActorRef<Self>,
) -> Result<Self, Self::Error> {
env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
let enable_ipv6 = env.enable_ipv6;
let (
mut netstack,
netstack::WakingPipe {
rx: mut netstack_down_rx,
tx: netstack_down_tx,
},
) = netstack::piped(config);
let channel = netstack.command_channel();
let mut joinset = JoinSet::new();
joinset.spawn(async move {
netstack.run_tokio().await;
});
joinset.spawn(async move {
while let Some(buf) = netstack_down_rx.recv_async().await {
if netstack_up.send(vec![buf.to_vec().into()]).is_err() {
break;
}
}
tracing::warn!("netstack downlink shut down!");
});
joinset.spawn(async move {
while let Some(bufs) = netstack_down.recv().await {
for buf in bufs {
let buf: PacketMut = buf;
netstack_down_tx.send_async(buf.as_ref()).await;
}
}
tracing::warn!("netstack uplink shut down!");
});
Ok(Self {
_joinset: joinset,
channel,
enable_ipv6,
})
}
}
#[kameo::messages]
impl NetstackActor {
#[message]
pub fn get_channel(&self) -> (Channel,) {
(self.channel.clone(),)
}
}
impl Message<Arc<ts_control::StateUpdate>> for NetstackActor {
type Reply = ();
async fn handle(
&mut self,
msg: Arc<ts_control::StateUpdate>,
_ctx: &mut Context<Self, Self::Reply>,
) {
let Some(self_node) = &msg.node else {
return;
};
tracing::debug!(new_tailnet_ips = ?self_node.tailnet_address, self.enable_ipv6);
let ips = overlay_addresses(self_node, self.enable_ipv6);
if let Err(e) = self.channel.set_ips(ips).await {
tracing::error!(error = %e, "setting netstack ips");
}
}
}
#[cfg(test)]
mod tests {
use core::net::{IpAddr, Ipv4Addr};
use ipnet::{Ipv4Net, Ipv6Net};
use ts_control::{Node, NodeCapMap, StableNodeId, TailnetAddress};
use super::overlay_addresses;
fn tailnet_address() -> TailnetAddress {
TailnetAddress {
ipv4: Ipv4Net::new(Ipv4Addr::new(100, 64, 0, 1), 32).unwrap(),
ipv6: Ipv6Net::new(
core::net::Ipv6Addr::new(0xfd7a, 0x115c, 0xa1e0, 0, 0, 0, 0, 1),
128,
)
.unwrap(),
}
}
fn self_node(service_addresses: Vec<IpAddr>) -> Node {
let addr = tailnet_address();
let mut service_vips: std::collections::BTreeMap<String, Vec<IpAddr>> =
std::collections::BTreeMap::new();
if !service_addresses.is_empty() {
service_vips.insert("svc:test".to_string(), service_addresses);
}
Node {
id: 1,
stable_id: StableNodeId("n1".to_string()),
hostname: "host".to_string(),
user_id: 0,
tailnet: Some("tail1.ts.net".to_string()),
tags: vec![],
tailnet_address: addr,
node_key: [0u8; 32].into(),
node_key_expiry: None,
key_signature: vec![],
machine_key: None,
disco_key: None,
accepted_routes: vec![],
underlay_addresses: vec![],
derp_region: None,
cap: Default::default(),
cap_map: NodeCapMap::new(),
peerapi_port: None,
peerapi_dns_proxy: false,
is_wireguard_only: false,
exit_node_dns_resolvers: vec![],
peer_relay: false,
service_vips,
}
}
#[test]
fn gate_off_drops_ipv6_overlay_address() {
let node = self_node(vec![]);
let addr = &node.tailnet_address;
let ips = overlay_addresses(&node, false);
assert!(
!ips.iter().any(|ip| ip.is_ipv6()),
"gate-off address list must contain no IPv6 address: {ips:?}"
);
assert_eq!(
ips,
vec![
IpAddr::V4(addr.ipv4.addr()),
IpAddr::V4(Ipv4Addr::new(100, 100, 100, 100)),
],
"gate-off list must be exactly [ipv4, 100.100.100.100]"
);
}
#[test]
fn gate_on_includes_ipv6_overlay_address() {
let node = self_node(vec![]);
let addr = &node.tailnet_address;
let ips = overlay_addresses(&node, true);
assert!(
ips.contains(&IpAddr::V6(addr.ipv6.addr())),
"gate-on address list must contain the IPv6 overlay address: {ips:?}"
);
assert_eq!(
ips,
vec![
IpAddr::V4(addr.ipv4.addr()),
IpAddr::V6(addr.ipv6.addr()),
IpAddr::V4(Ipv4Addr::new(100, 100, 100, 100)),
],
"gate-on list must be exactly [ipv4, ipv6, 100.100.100.100]"
);
}
#[test]
fn vip_service_v4_address_is_accepted() {
let vip = IpAddr::V4(Ipv4Addr::new(100, 65, 32, 1));
let node = self_node(vec![vip]);
let ips = overlay_addresses(&node, false);
assert!(
ips.contains(&vip),
"the VIP-service address must be in the accepted set: {ips:?}"
);
}
#[test]
fn vip_service_v6_address_dropped_when_ipv6_disabled() {
let vip6: IpAddr = "fd7a:115c:a1e0::1234".parse().unwrap();
let vip4 = IpAddr::V4(Ipv4Addr::new(100, 65, 32, 1));
let node = self_node(vec![vip4, vip6]);
let ips = overlay_addresses(&node, false);
assert!(ips.contains(&vip4));
assert!(
!ips.contains(&vip6),
"IPv6 VIP must be dropped when IPv6 is disabled: {ips:?}"
);
}
#[test]
fn vip_service_v6_address_accepted_when_ipv6_enabled() {
let vip6: IpAddr = "fd7a:115c:a1e0::1234".parse().unwrap();
let node = self_node(vec![vip6]);
let ips = overlay_addresses(&node, true);
assert!(ips.contains(&vip6));
}
}