runnel-rs 0.2.2

A Rust proxy and tunnel toolbox with WireGuard-style, TUN, SOCKS, and TLS-based transports.
Documentation
use std::{collections::BTreeMap, net::SocketAddr, process::Stdio};

use tokio::{
    io::{AsyncBufReadExt, BufReader},
    process::Command,
    task::JoinHandle,
};
use tracing::{debug, info, warn};

use crate::telemetry;

/// Owner of the background tcpdump monitor task created by `start`.
///
/// The `Drop` implementation in this file aborts the task, so the monitor
/// runs only for as long as the guard is kept alive.
pub(crate) struct TcpdumpGuard {
    /// Join handle of the spawned `run_tcpdump` task.
    handle: JoinHandle<()>,
}

/// One UDP packet as parsed from a single line of tcpdump output.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PacketLine {
    /// Token immediately before the `>` separator, trailing `:` removed
    /// (for example `192.168.3.72.64306`).
    src: String,
    /// Token immediately after the `>` separator, trailing `:` removed
    /// (for example `172.235.244.118.1443`).
    dst: String,
    /// Number that follows the ` UDP, length ` marker on the line.
    length: u64,
    /// `Some("in")` / `Some("out")` when the line carries an `In` / `Out`
    /// token; `None` when neither token is present.
    direction: Option<&'static str>,
}

/// Selects which UDP traffic the tcpdump monitor captures.
#[derive(Clone, Debug)]
pub(crate) enum TcpdumpFilter {
    /// Capture UDP traffic to or from `endpoint` (the capture expression
    /// matches both its host and its port).
    Client { endpoint: SocketAddr },
    /// Capture UDP traffic on the port of `listen`; only the port is used
    /// when building the capture expression.
    Server { listen: SocketAddr },
}

/// Stops the monitor when the guard goes out of scope.
impl Drop for TcpdumpGuard {
    /// Aborts the background monitor task held by this guard.
    fn drop(&mut self) {
        // The task builds its tcpdump `Command` with `kill_on_drop(true)`, so
        // cancelling the task is expected to take the child process down too.
        self.handle.abort();
    }
}

/// Spawns the background tcpdump monitor and returns a guard that stops it
/// when dropped.
///
/// * `role` - label attached to every log line and telemetry event.
/// * `interface` - capture interface; `None`, an empty string, or a
///   whitespace-only string selects `default_interface()`. Surrounding
///   whitespace is stripped so the name handed to tcpdump is exactly the one
///   that passed the emptiness check.
/// * `filter` - which UDP traffic to capture.
///
/// Must be called from within a Tokio runtime because it uses `tokio::spawn`.
pub(crate) fn start(
    role: &'static str,
    interface: Option<&str>,
    filter: TcpdumpFilter,
) -> TcpdumpGuard {
    let interface = interface
        // Trim first, so validation and the value actually used agree.
        .map(str::trim)
        .filter(|name| !name.is_empty())
        .unwrap_or(default_interface())
        .to_owned();
    let handle = tokio::spawn(run_tcpdump(role, interface, filter));
    TcpdumpGuard { handle }
}

/// Runs `tcpdump` as a child process and forwards its output until it exits.
///
/// Stdout lines are parsed into packets and emitted as telemetry by
/// `read_tcpdump_stdout`; stderr lines are logged by `read_tcpdump_stderr`.
/// Failing to spawn the process or to wait for it is logged as a warning and
/// ends the monitor; nothing is returned to the caller.
///
/// * `role` - label attached to every log line and telemetry event.
/// * `interface` - capture interface passed to `tcpdump -i`.
/// * `filter` - which UDP traffic to capture.
async fn run_tcpdump(role: &'static str, interface: String, filter: TcpdumpFilter) {
    let args = tcpdump_args(&interface, &filter);
    // The capture expression is everything after `-i <interface>`. Locating
    // the flag (instead of slicing at a fixed index) keeps the interface name
    // out of the logged `filter` field.
    let filter_expression = args
        .iter()
        .position(|arg| arg == "-i")
        .and_then(|flag| args.get(flag + 2..))
        .unwrap_or_default()
        .join(" ");
    info!(
        role,
        interface = %interface,
        filter = %filter_expression,
        "starting wg tcpdump monitor"
    );

    let mut command = Command::new("tcpdump");
    command.args(&args);
    command.stdout(Stdio::piped());
    command.stderr(Stdio::piped());
    // Ensures the child is killed if this task is aborted mid-flight.
    command.kill_on_drop(true);

    let mut child = match command.spawn() {
        Ok(child) => child,
        Err(error) => {
            warn!(
                role,
                interface = %interface,
                error = %error,
                "failed to start wg tcpdump monitor"
            );
            return;
        }
    };

    let stdout_task = child.stdout.take().map(|stdout| {
        tokio::spawn(read_tcpdump_stdout(
            role,
            interface.clone(),
            filter.clone(),
            stdout,
        ))
    });
    let stderr_task = child
        .stderr
        .take()
        .map(|stderr| tokio::spawn(read_tcpdump_stderr(role, interface.clone(), stderr)));

    // `true` once the child is known to have exited, i.e. its pipes will
    // reach end-of-file and the reader tasks will finish on their own.
    let exited = match child.wait().await {
        Ok(status) if status.success() => {
            debug!(role, interface = %interface, "wg tcpdump monitor exited");
            true
        }
        Ok(status) => {
            warn!(
                role,
                interface = %interface,
                status = %status,
                "wg tcpdump monitor exited"
            );
            true
        }
        Err(error) => {
            warn!(
                role,
                interface = %interface,
                error = %error,
                "failed to wait for wg tcpdump monitor"
            );
            false
        }
    };

    for task in [stdout_task, stderr_task].into_iter().flatten() {
        if exited {
            // Let the reader drain what is still buffered in the pipe so the
            // last lines (typically the reason tcpdump quit) are not lost.
            let _ = task.await;
        } else {
            // The child's state is unknown; do not block on its pipes.
            task.abort();
        }
    }
}

/// Consumes tcpdump's stdout line by line and emits one telemetry event for
/// every line that parses as a UDP packet.
///
/// Lines that do not parse are skipped. Reading stops at end-of-file or on
/// the first read error.
async fn read_tcpdump_stdout(
    role: &'static str,
    interface: String,
    filter: TcpdumpFilter,
    stdout: tokio::process::ChildStdout,
) {
    let mut reader = BufReader::new(stdout).lines();
    loop {
        let text = match reader.next_line().await {
            Ok(Some(text)) => text,
            // End of stream or read failure: nothing more to forward.
            Ok(None) | Err(_) => break,
        };
        match parse_packet_line(&text) {
            Some(packet) => emit_packet(role, &interface, &filter, packet),
            None => continue,
        }
    }
}

/// Consumes tcpdump's stderr line by line and relays it to the log.
///
/// The start-up banner lines are logged at debug level; every other non-empty
/// line is logged as a warning. Reading stops at end-of-file or on the first
/// read error.
async fn read_tcpdump_stderr(
    role: &'static str,
    interface: String,
    stderr: tokio::process::ChildStderr,
) {
    // Prefixes of the informational lines tcpdump prints when it starts.
    const BANNER_PREFIXES: [&str; 2] = ["listening on ", "tcpdump: verbose output"];

    let mut reader = BufReader::new(stderr).lines();
    loop {
        let raw = match reader.next_line().await {
            Ok(Some(raw)) => raw,
            Ok(None) | Err(_) => break,
        };
        let line = raw.trim();
        if line.is_empty() {
            continue;
        }
        let is_banner = BANNER_PREFIXES
            .iter()
            .any(|prefix| line.starts_with(*prefix));
        if !is_banner {
            warn!(role, interface = %interface, message = %line, "wg tcpdump monitor");
        } else {
            debug!(role, interface = %interface, message = %line, "wg tcpdump monitor");
        }
    }
}

/// Builds the tcpdump command line: fixed flags, `-i <interface>`, then the
/// capture expression.
///
/// The expression always starts with `udp and`. A client filter appends
/// `host <ip> and port <port>` for the endpoint; a server filter appends
/// `port <port>` for the listen address.
fn tcpdump_args(interface: &str, filter: &TcpdumpFilter) -> Vec<String> {
    let selector: Vec<String> = match filter {
        TcpdumpFilter::Client { endpoint } => vec![
            "host".to_owned(),
            endpoint.ip().to_string(),
            "and".to_owned(),
            "port".to_owned(),
            endpoint.port().to_string(),
        ],
        TcpdumpFilter::Server { listen } => vec!["port".to_owned(), listen.port().to_string()],
    };

    ["-l", "-n", "-tt", "-i", interface, "udp", "and"]
        .into_iter()
        .map(str::to_owned)
        .chain(selector)
        .collect()
}

fn emit_packet(role: &'static str, interface: &str, filter: &TcpdumpFilter, packet: PacketLine) {
    let mut fields = BTreeMap::new();
    fields.insert("mode".to_owned(), "wg".to_owned());
    fields.insert("role".to_owned(), role.to_owned());
    fields.insert("interface".to_owned(), interface.to_owned());
    fields.insert("src".to_owned(), packet.src.clone());
    fields.insert("dst".to_owned(), packet.dst.clone());
    fields.insert("length".to_owned(), packet.length.to_string());
    fields.insert("packet".to_owned(), packet_kind(packet.length).to_owned());
    fields.insert(
        "target".to_owned(),
        format!("{} > {}", packet.src, packet.dst),
    );
    fields.insert(
        "detail".to_owned(),
        format!("{} bytes {}", packet.length, packet_kind(packet.length)),
    );
    if let Some(direction) = packet
        .direction
        .or_else(|| infer_direction(filter, &packet.src, &packet.dst))
    {
        fields.insert("direction".to_owned(), direction.to_owned());
    }
    telemetry::emit("INFO", "wg tcpdump packet", fields);
}

/// Parses one line of tcpdump output into a [`PacketLine`].
///
/// Returns `None` when the line has no ` UDP, length N` marker, has no
/// standalone `>` token, or lacks a token on either side of that `>`.
fn parse_packet_line(line: &str) -> Option<PacketLine> {
    let length = parse_udp_length(line)?;
    let tokens: Vec<&str> = line.split_whitespace().collect();
    let arrow = tokens.iter().position(|token| *token == ">")?;

    // Source is the last token left of the arrow, destination the first one
    // right of it; tcpdump terminates the destination with a colon.
    let source = tokens.get(..arrow)?.last()?;
    let destination = tokens.get(arrow + 1..)?.first()?;

    Some(PacketLine {
        src: source.trim_end_matches(':').to_owned(),
        dst: destination.trim_end_matches(':').to_owned(),
        length,
        direction: parse_direction(&tokens),
    })
}

/// Extracts the number that follows the first ` UDP, length ` marker.
///
/// Returns `None` when the marker is missing, when no digits directly follow
/// it, or when the digits do not fit in a `u64`.
fn parse_udp_length(line: &str) -> Option<u64> {
    const MARKER: &str = " UDP, length ";
    let (_, tail) = line.split_once(MARKER)?;
    // Keep only the leading run of ASCII digits.
    let digits_end = tail
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(tail.len());
    tail[..digits_end].parse().ok()
}

/// Reads the capture direction from the tokens of a tcpdump line.
///
/// Returns `Some("in")` when an `In` token is present, otherwise
/// `Some("out")` when an `Out` token is present, otherwise `None`. The match
/// is case-sensitive and `In` wins when both tokens occur.
fn parse_direction(tokens: &[&str]) -> Option<&'static str> {
    let has_token = |wanted: &str| tokens.iter().any(|token| *token == wanted);
    if has_token("In") {
        Some("in")
    } else if has_token("Out") {
        Some("out")
    } else {
        None
    }
}

/// Derives the packet direction from the capture filter when tcpdump did not
/// print one.
///
/// * Client filter: a packet whose destination equals the endpoint is `out`,
///   one whose source equals the endpoint is `in`.
/// * Server filter: a packet whose destination ends in `.<listen port>` is
///   `in`, one whose source ends in it is `out`.
///
/// The destination is checked first; `None` is returned when neither address
/// matches.
fn infer_direction(filter: &TcpdumpFilter, src: &str, dst: &str) -> Option<&'static str> {
    let (dst_matches, src_matches, label_for_dst, label_for_src) = match filter {
        TcpdumpFilter::Client { endpoint } => {
            let peer = tcpdump_addr(*endpoint);
            (dst == peer, src == peer, "out", "in")
        }
        TcpdumpFilter::Server { listen } => {
            let port_suffix = format!(".{}", listen.port());
            (
                dst.ends_with(&port_suffix),
                src.ends_with(&port_suffix),
                "in",
                "out",
            )
        }
    };

    if dst_matches {
        return Some(label_for_dst);
    }
    if src_matches {
        return Some(label_for_src);
    }
    None
}

/// Formats a socket address as `<ip>.<port>`, the form compared against the
/// source and destination tokens of tcpdump output.
fn tcpdump_addr(addr: SocketAddr) -> String {
    let (ip, port) = (addr.ip(), addr.port());
    format!("{ip}.{port}")
}

/// Labels a packet by its UDP payload length.
///
/// Four exact lengths map to fixed labels; every other length is reported as
/// `data`.
fn packet_kind(length: u64) -> &'static str {
    // Payload length -> label for the recognised fixed-size messages.
    const FIXED_SIZES: [(u64, &str); 4] = [
        (148, "handshake-init"),
        (92, "handshake-response"),
        (64, "cookie-reply"),
        (32, "keepalive"),
    ];

    FIXED_SIZES
        .iter()
        .find_map(|&(size, label)| (size == length).then_some(label))
        .unwrap_or("data")
}

/// Capture interface used when the caller does not name one.
///
/// Every supported target currently resolves to the `any` pseudo-interface,
/// so no per-platform selection is needed.
fn default_interface() -> &'static str {
    const ALL_INTERFACES: &str = "any";
    ALL_INTERFACES
}

// Unit tests for the pure helpers: line parsing, direction inference and
// command-line construction. None of them spawn tcpdump.
#[cfg(test)]
mod tests {
    use super::{PacketLine, TcpdumpFilter, infer_direction, parse_packet_line, tcpdump_args};
    use std::net::SocketAddr;

    // A plain line without an In/Out token yields the addresses, the length,
    // and no direction.
    #[test]
    fn parses_tcpdump_packet_line() {
        let line =
            "1713430000.158956 IP 192.168.3.72.64306 > 172.235.244.118.1443: UDP, length 148";
        assert_eq!(
            parse_packet_line(line),
            Some(PacketLine {
                src: "192.168.3.72.64306".to_owned(),
                dst: "172.235.244.118.1443".to_owned(),
                length: 148,
                direction: None,
            })
        );
    }

    // A line that carries an interface name and an `In` token reports the
    // direction as "in".
    #[test]
    fn parses_linux_any_direction() {
        let line = "1713430000.899841 eth0 In IP 120.231.241.251.3480 > 172.235.244.118.1443: UDP, length 148";
        assert_eq!(parse_packet_line(line).unwrap().direction, Some("in"));
    }

    // With a client filter, traffic towards the endpoint is "out" and traffic
    // from it is "in".
    #[test]
    fn infers_client_direction_from_endpoint() {
        let filter = TcpdumpFilter::Client {
            endpoint: "172.235.244.118:1443".parse::<SocketAddr>().unwrap(),
        };
        assert_eq!(
            infer_direction(&filter, "192.168.3.72.64306", "172.235.244.118.1443"),
            Some("out")
        );
        assert_eq!(
            infer_direction(&filter, "172.235.244.118.1443", "192.168.3.72.64306"),
            Some("in")
        );
    }

    // The client command line is the fixed flags, the interface, then
    // `udp and host <ip> and port <port>`.
    #[test]
    fn builds_client_tcpdump_filter() {
        let args = tcpdump_args(
            "any",
            &TcpdumpFilter::Client {
                endpoint: "172.235.244.118:1443".parse::<SocketAddr>().unwrap(),
            },
        );
        assert_eq!(
            args,
            [
                "-l",
                "-n",
                "-tt",
                "-i",
                "any",
                "udp",
                "and",
                "host",
                "172.235.244.118",
                "and",
                "port",
                "1443"
            ]
        );
    }
}