use std::io;
use std::net::Ipv4Addr;
use crate::config::{Config, DataPlaneMode};
use crate::engine::{Action, BufferedPacket};
/// Events a data plane hands back to its caller.
#[derive(Debug)]
pub enum DataPlaneEvent {
    /// A data packet paired with the IPv4 destination address it carries.
    Packet {
        /// Destination address associated with `packet`.
        destination: Ipv4Addr,
        /// The packet itself.
        packet: BufferedPacket,
    },
    /// A packet with no separate destination field.
    ///
    /// NOTE(review): presumably a packet addressed to this node; nothing in the
    /// code visible here constructs this variant — confirm against the callers.
    LocalDelivery {
        /// The packet itself.
        packet: BufferedPacket,
    },
}
/// The data plane backend in use.
///
/// Platform-specific variants only exist on the targets named in their `cfg`
/// attributes.
pub enum DataPlane {
    /// No packet handling; actions are only logged.
    ControlOnly,
    /// Packets flow through a TUN device and a UDP socket (Linux and Windows).
    #[cfg(any(target_os = "linux", target_os = "windows"))]
    TunOverlay(tun_overlay::TunOverlay),
    /// Routes are installed through rtnetlink (Linux only).
    #[cfg(target_os = "linux")]
    KernelRoutes(linux_routes::KernelRoutes),
}
impl DataPlane {
    /// Builds the backend selected by `config.data_plane`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the selected backend's constructor. When the
    /// requested mode is not compiled for the current platform the error comes
    /// from `unsupported`.
    pub async fn new(config: &Config) -> io::Result<Self> {
        match config.data_plane {
            DataPlaneMode::ControlOnly => Ok(Self::ControlOnly),
            DataPlaneMode::TunOverlay => {
                #[cfg(any(target_os = "linux", target_os = "windows"))]
                {
                    let overlay = tun_overlay::TunOverlay::new(config).await?;
                    Ok(Self::TunOverlay(overlay))
                }
                #[cfg(not(any(target_os = "linux", target_os = "windows")))]
                {
                    unsupported("tun-overlay")
                }
            }
            DataPlaneMode::KernelRoutes => {
                #[cfg(target_os = "linux")]
                {
                    let kernel = linux_routes::KernelRoutes::new(config).await?;
                    Ok(Self::KernelRoutes(kernel))
                }
                #[cfg(not(target_os = "linux"))]
                {
                    unsupported("kernel-routes")
                }
            }
        }
    }

    /// Returns `true` only for the TUN overlay variant.
    ///
    /// For every other variant [`DataPlane::next_event`] awaits a future that
    /// never completes.
    pub fn has_events(&self) -> bool {
        #[cfg(any(target_os = "linux", target_os = "windows"))]
        {
            matches!(self, Self::TunOverlay(_))
        }
        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
        {
            false
        }
    }

    /// Waits for the next event from the backend.
    ///
    /// Only the TUN overlay yields events; the other variants stay pending
    /// forever.
    pub async fn next_event(&mut self) -> io::Result<DataPlaneEvent> {
        match self {
            #[cfg(any(target_os = "linux", target_os = "windows"))]
            Self::TunOverlay(overlay) => overlay.next_event().await,
            Self::ControlOnly => std::future::pending::<io::Result<DataPlaneEvent>>().await,
            #[cfg(target_os = "linux")]
            Self::KernelRoutes(_) => std::future::pending::<io::Result<DataPlaneEvent>>().await,
        }
    }

    /// Hands `action` to the active backend; the control-only variant just logs it.
    pub async fn handle_action(&mut self, action: Action) -> io::Result<()> {
        match self {
            #[cfg(any(target_os = "linux", target_os = "windows"))]
            Self::TunOverlay(overlay) => overlay.handle_action(action).await,
            #[cfg(target_os = "linux")]
            Self::KernelRoutes(kernel) => kernel.handle_action(action).await,
            Self::ControlOnly => log_control_action(action),
        }
    }

    /// Delivers `packet` locally.
    ///
    /// The control-only variant logs the packet id and payload length, the TUN
    /// overlay forwards the call to its own `deliver_local`, and the
    /// kernel-routes variant does nothing.
    pub async fn deliver_local(&mut self, packet: BufferedPacket) -> io::Result<()> {
        match self {
            #[cfg(any(target_os = "linux", target_os = "windows"))]
            Self::TunOverlay(overlay) => overlay.deliver_local(packet).await,
            #[cfg(target_os = "linux")]
            Self::KernelRoutes(_) => Ok(()),
            Self::ControlOnly => {
                let length = packet.payload.len();
                tracing::debug!(id = packet.id, length, "local data packet");
                Ok(())
            }
        }
    }
}
/// Emits one tracing event describing `action` and always returns `Ok(())`.
///
/// `Action::Send` is ignored without logging.
fn log_control_action(action: Action) -> io::Result<()> {
    match action {
        Action::Send(_) => {}
        Action::ForwardBufferedPackets {
            destination: dest,
            next_hop: hop,
            packets: queued,
        } => {
            tracing::info!(
                destination = %dest,
                next_hop = %hop,
                count = queued.len(),
                "buffered packets ready to forward"
            );
        }
        Action::DropBufferedPackets {
            destination: dest,
            packets: queued,
        } => {
            tracing::warn!(
                destination = %dest,
                count = queued.len(),
                "dropping buffered packets"
            );
        }
        Action::RouteDiscovered {
            destination: dest,
            next_hop: hop,
            hop_count: hops,
        } => {
            tracing::info!(
                destination = %dest,
                next_hop = %hop,
                hop_count = hops,
                "route discovered"
            );
        }
        Action::RouteInvalidated {
            destination: dest,
            next_hop: hop,
        } => {
            tracing::info!(destination = %dest, next_hop = %hop, "route invalidated");
        }
        Action::RouteDiscoveryFailed { destination: dest } => {
            tracing::warn!(destination = %dest, "route discovery failed");
        }
        Action::LocalRepairStarted {
            destination: dest,
            ttl: hop_limit,
        } => {
            tracing::info!(destination = %dest, ttl = hop_limit, "local repair started");
        }
        Action::LocalRepairFailed { destination: dest } => {
            tracing::warn!(destination = %dest, "local repair failed");
        }
    }
    Ok(())
}
/// Builds the `Unsupported` error returned when `mode` is not available on
/// this platform.
#[cfg(not(target_os = "linux"))]
fn unsupported(mode: &str) -> io::Result<DataPlane> {
    let message = format!("{mode} data plane is not supported on this platform");
    Err(io::Error::new(io::ErrorKind::Unsupported, message))
}
#[cfg(any(target_os = "linux", target_os = "windows"))]
mod tun_overlay {
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use etherparse::Ipv4HeaderSlice;
use tokio::net::UdpSocket;
use tun_rs::{AsyncDevice, DeviceBuilder};
use super::{DataPlaneEvent, log_control_action};
use crate::config::Config;
use crate::engine::{Action, BufferedPacket};
#[cfg(target_os = "linux")]
use super::linux_routes::LinuxRoutes;
/// Data plane that moves packets between a TUN device and a UDP socket.
pub struct TunOverlay {
    /// TUN device that packets are read from and written to.
    device: AsyncDevice,
    /// UDP socket bound to the configured bind address and data port.
    data_socket: UdpSocket,
    /// UDP port that forwarded packets are sent to on the next hop.
    data_port: u16,
    /// Configured TUN MTU; used to size the receive buffers.
    mtu: usize,
    /// Route manager for the TUN interface (Linux only).
    #[cfg(target_os = "linux")]
    routes: LinuxRoutes,
}
impl TunOverlay {
    /// Creates the TUN device, binds the UDP data socket and, on Linux, opens
    /// the route manager for the device.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` when `config.tun_ip` is set and differs from
    ///   `config.local_ip`, or (Windows only) when `config.tun_ip` is missing.
    /// * The device builder's error, re-wrapped with the device name, when the
    ///   TUN device cannot be created or opened.
    /// * Any error from reading the device name, binding the UDP socket or
    ///   (Linux only) constructing `LinuxRoutes`.
    pub async fn new(config: &Config) -> io::Result<Self> {
        if let Some(tun_ip) = config.tun_ip
            && config.local_ip != tun_ip
        {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "--local-ip must match --tun-ip in tun-overlay mode; use --bind-ip for the underlay socket address",
            ));
        }
        let mut builder = DeviceBuilder::new()
            .name(config.tun_name.clone())
            .mtu(config.tun_mtu);
        if let Some(tun_ip) = config.tun_ip {
            builder = builder.ipv4(tun_ip, config.tun_prefix, None);
        }
        #[cfg(target_os = "windows")]
        {
            if config.tun_ip.is_none() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "--tun-ip is required for Windows tun-overlay mode",
                ));
            }
            builder = builder
                .description(format!("aodv-rs {}", config.tun_name))
                // The metric setter takes a u16, so clamp before narrowing.
                .metric(config.route_metric.min(u16::MAX as u32) as u16);
        }
        let device = builder.build_async().map_err(|error| {
            #[cfg(target_os = "windows")]
            {
                if error.kind() == io::ErrorKind::NotFound {
                    return io::Error::new(
                        error.kind(),
                        format!(
                            "failed to create/open Wintun adapter {}: {error}; ensure wintun.dll is available next to the executable or in PATH",
                            config.tun_name
                        ),
                    );
                }
            }
            io::Error::new(
                error.kind(),
                format!(
                    "failed to create/open TUN device {}: {error}",
                    config.tun_name
                ),
            )
        })?;
        // Use the name reported by the device rather than the requested one.
        let actual_name = device.name()?;
        let data_socket = UdpSocket::bind((config.bind_ip, config.data_port())).await?;
        #[cfg(target_os = "linux")]
        let routes = LinuxRoutes::new(actual_name.clone(), config.route_metric).await?;
        tracing::info!(
            tun = %actual_name,
            data_port = config.data_port(),
            mtu = config.tun_mtu,
            tun_ip = ?config.tun_ip,
            tun_prefix = config.tun_prefix,
            "TUN overlay data plane started"
        );
        Ok(Self {
            device,
            data_socket,
            data_port: config.data_port(),
            mtu: config.tun_mtu as usize,
            #[cfg(target_os = "linux")]
            routes,
        })
    }

    /// Waits for the next IPv4 packet from either the TUN device or the UDP
    /// data socket and returns it as a [`DataPlaneEvent::Packet`].
    ///
    /// Packets that do not parse as IPv4 are logged at debug level and
    /// skipped, so a single stray or malformed frame (from the local stack or
    /// from a remote sender on the data port) does not surface as a data-plane
    /// error.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if reading from the TUN device or the
    /// UDP socket fails.
    pub async fn next_event(&mut self) -> io::Result<DataPlaneEvent> {
        let capacity = self.mtu.max(1500);
        let mut tun_buffer = vec![0_u8; capacity];
        let mut udp_buffer = vec![0_u8; capacity];
        loop {
            tokio::select! {
                result = self.device.recv(&mut tun_buffer) => {
                    let length = result?;
                    match ipv4_destination(&tun_buffer[..length]) {
                        Ok(destination) => {
                            tun_buffer.truncate(length);
                            return Ok(DataPlaneEvent::Packet {
                                destination,
                                packet: BufferedPacket {
                                    id: 0,
                                    payload: tun_buffer,
                                },
                            });
                        }
                        Err(error) => {
                            tracing::debug!(
                                length,
                                %error,
                                "ignoring non-IPv4 packet read from TUN device"
                            );
                        }
                    }
                }
                result = self.data_socket.recv_from(&mut udp_buffer) => {
                    let (length, source) = result?;
                    match ipv4_destination(&udp_buffer[..length]) {
                        Ok(destination) => {
                            udp_buffer.truncate(length);
                            // The source address is only used for logging.
                            let source = match source.ip() {
                                IpAddr::V4(ip) => ip,
                                IpAddr::V6(_) => Ipv4Addr::UNSPECIFIED,
                            };
                            tracing::debug!(%source, %destination, length, "received overlay data packet");
                            return Ok(DataPlaneEvent::Packet {
                                destination,
                                packet: BufferedPacket {
                                    id: 0,
                                    payload: udp_buffer,
                                },
                            });
                        }
                        Err(error) => {
                            tracing::debug!(
                                %source,
                                length,
                                %error,
                                "ignoring non-IPv4 datagram received on data socket"
                            );
                        }
                    }
                }
            }
        }
    }

    /// Applies `action` to the overlay.
    ///
    /// Buffered packets are sent to the next hop, route discovery installs a
    /// TUN route, route invalidation or discovery failure removes it, and every
    /// other action is only logged.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first send or route-update error.
    pub async fn handle_action(&mut self, action: Action) -> io::Result<()> {
        match action {
            Action::ForwardBufferedPackets {
                destination,
                next_hop,
                packets,
            } => {
                for packet in packets {
                    self.forward_packet(destination, next_hop, packet).await?;
                }
                Ok(())
            }
            Action::RouteDiscovered { destination, .. } => {
                self.install_tun_route(destination).await
            }
            Action::RouteInvalidated { destination, .. }
            | Action::RouteDiscoveryFailed { destination } => {
                self.remove_tun_route(destination).await
            }
            other => log_control_action(other),
        }
    }

    /// Writes the packet payload to the TUN device.
    pub async fn deliver_local(&self, packet: BufferedPacket) -> io::Result<()> {
        self.device.send(&packet.payload).await?;
        Ok(())
    }

    /// Sends the packet payload over UDP to `next_hop` on the data port.
    async fn forward_packet(
        &self,
        destination: Ipv4Addr,
        next_hop: Ipv4Addr,
        packet: BufferedPacket,
    ) -> io::Result<()> {
        let target = SocketAddr::new(IpAddr::V4(next_hop), self.data_port);
        self.data_socket.send_to(&packet.payload, target).await?;
        tracing::debug!(
            %destination,
            %next_hop,
            id = packet.id,
            length = packet.payload.len(),
            "forwarded overlay data packet"
        );
        Ok(())
    }

    /// Installs a device route for `destination` via the route manager.
    #[cfg(target_os = "linux")]
    async fn install_tun_route(&mut self, destination: Ipv4Addr) -> io::Result<()> {
        self.routes.install_dev_route(destination).await
    }

    /// Windows: no route is installed here; the discovery is only logged.
    #[cfg(target_os = "windows")]
    async fn install_tun_route(&mut self, destination: Ipv4Addr) -> io::Result<()> {
        tracing::debug!(%destination, "route discovered for Wintun overlay");
        Ok(())
    }

    /// Removes the managed route for `destination` via the route manager.
    #[cfg(target_os = "linux")]
    async fn remove_tun_route(&mut self, destination: Ipv4Addr) -> io::Result<()> {
        self.routes.remove_route(destination).await
    }

    /// Windows: no route is removed here; the event is only logged.
    #[cfg(target_os = "windows")]
    async fn remove_tun_route(&mut self, destination: Ipv4Addr) -> io::Result<()> {
        tracing::debug!(%destination, "route removed from Wintun overlay state");
        Ok(())
    }
}
/// Parses `packet` as an IPv4 header and returns its destination address.
///
/// # Errors
///
/// Returns `InvalidData` when the bytes do not parse as an IPv4 header.
fn ipv4_destination(packet: &[u8]) -> io::Result<Ipv4Addr> {
    match Ipv4HeaderSlice::from_slice(packet) {
        Ok(header) => Ok(header.destination_addr()),
        Err(error) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("data plane packet is not valid IPv4: {error}"),
        )),
    }
}
}
#[cfg(target_os = "linux")]
mod linux_routes {
use std::collections::BTreeSet;
use std::io;
use std::net::Ipv4Addr;
use futures_util::TryStreamExt;
use rtnetlink::{Handle, RouteMessageBuilder, new_connection};
use super::log_control_action;
use crate::config::Config;
use crate::engine::Action;
/// Data plane that only manages routes, through a [`LinuxRoutes`] instance.
#[derive(Debug)]
pub struct KernelRoutes {
    /// Route manager bound to the configured interface.
    routes: LinuxRoutes,
}
/// Installs and removes /32 IPv4 routes on one interface through rtnetlink.
#[derive(Debug)]
pub struct LinuxRoutes {
    /// rtnetlink handle used for every request.
    handle: Handle,
    /// Name of the interface that routes are attached to.
    interface: String,
    /// Priority value attached to every installed route.
    route_metric: u32,
    /// Destinations for which this instance has installed a route.
    managed: BTreeSet<Ipv4Addr>,
}
impl KernelRoutes {
    /// Creates the route manager for `config.interface`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when no interface is configured, or whatever
    /// error `LinuxRoutes::new` reports.
    pub async fn new(config: &Config) -> io::Result<Self> {
        let Some(interface) = config.interface.clone() else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "--interface is required for kernel-routes data plane",
            ));
        };
        let routes = LinuxRoutes::new(interface, config.route_metric).await?;
        Ok(Self { routes })
    }

    /// Applies `action` to the route manager.
    ///
    /// A discovered route installs a gateway route via its next hop, an
    /// invalidated route or failed discovery removes the managed route, and
    /// every other action is only logged.
    pub async fn handle_action(&mut self, action: Action) -> io::Result<()> {
        let table = &mut self.routes;
        match action {
            Action::RouteInvalidated { destination, .. }
            | Action::RouteDiscoveryFailed { destination } => {
                table.remove_route(destination).await
            }
            Action::RouteDiscovered {
                destination,
                next_hop,
                ..
            } => table.install_gateway_route(destination, next_hop).await,
            other => log_control_action(other),
        }
    }
}
impl LinuxRoutes {
    /// Opens an rtnetlink connection and returns a manager for `interface`.
    ///
    /// The connection future is spawned onto the current tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the netlink connection cannot be created.
    pub async fn new(interface: String, route_metric: u32) -> io::Result<Self> {
        let (connection, handle, _) =
            new_connection().map_err(|error| io::Error::other(error.to_string()))?;
        tokio::spawn(connection);
        Ok(Self {
            handle,
            interface,
            route_metric,
            managed: BTreeSet::new(),
        })
    }

    /// Installs a /32 route to `destination` directly on the interface.
    ///
    /// # Errors
    ///
    /// Fails if the interface cannot be resolved or the add request fails.
    pub async fn install_dev_route(&mut self, destination: Ipv4Addr) -> io::Result<()> {
        self.install_route(destination, None).await
    }

    /// Installs a /32 route to `destination` via gateway `next_hop` on the
    /// interface.
    ///
    /// # Errors
    ///
    /// Fails if the interface cannot be resolved or the add request fails.
    async fn install_gateway_route(
        &mut self,
        destination: Ipv4Addr,
        next_hop: Ipv4Addr,
    ) -> io::Result<()> {
        self.install_route(destination, Some(next_hop)).await
    }

    /// Removes the route this instance installed for `destination`, if any.
    ///
    /// Removal is best effort: the destination is dropped from the managed set
    /// first, and a failed delete request is logged at debug level and still
    /// reported as success.
    pub async fn remove_route(&mut self, destination: Ipv4Addr) -> io::Result<()> {
        if !self.managed.remove(&destination) {
            return Ok(());
        }
        let mut route = RouteMessageBuilder::<Ipv4Addr>::new()
            .destination_prefix(destination, 32)
            .build();
        // Include the metric the route was installed with so the delete
        // request targets that route rather than any other route to the same
        // /32 destination.
        route.attributes.push(self.metric_attribute());
        if let Err(error) = self.handle.route().del(route).execute().await {
            tracing::debug!(%destination, %error, "managed route removal failed");
        }
        Ok(())
    }

    /// Shared implementation of the two install entry points: resolves the
    /// interface, replaces any previously managed route, then adds the new one.
    async fn install_route(
        &mut self,
        destination: Ipv4Addr,
        gateway: Option<Ipv4Addr>,
    ) -> io::Result<()> {
        // Resolve the interface before touching the existing route so a
        // missing interface leaves the current state untouched.
        let ifindex = self.interface_index().await?;
        self.remove_route(destination).await?;
        let mut builder =
            RouteMessageBuilder::<Ipv4Addr>::new().destination_prefix(destination, 32);
        if let Some(next_hop) = gateway {
            builder = builder.gateway(next_hop);
        }
        let mut route = builder.output_interface(ifindex).build();
        route.attributes.push(self.metric_attribute());
        self.handle
            .route()
            .add(route)
            .execute()
            .await
            .map_err(|error| io::Error::other(error.to_string()))?;
        self.managed.insert(destination);
        Ok(())
    }

    /// Returns the priority attribute carrying the configured route metric.
    fn metric_attribute(&self) -> rtnetlink::packet_route::route::RouteAttribute {
        rtnetlink::packet_route::route::RouteAttribute::Priority(self.route_metric)
    }

    /// Looks up the kernel index of the configured interface by name.
    ///
    /// # Errors
    ///
    /// Returns `NotFound` when no link with that name exists, or the netlink
    /// error converted to `io::Error`.
    async fn interface_index(&self) -> io::Result<u32> {
        let link = self
            .handle
            .link()
            .get()
            .match_name(self.interface.clone())
            .execute()
            .try_next()
            .await
            .map_err(|error| io::Error::other(error.to_string()))?
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("no interface found named {}", self.interface),
                )
            })?;
        Ok(link.header.index)
    }
}
}