runnel-rs 0.2.1

A Rust proxy and tunnel toolbox with WireGuard-style, TUN, SOCKS, and TLS-based transports.
Documentation
use std::{collections::BTreeMap, path::PathBuf, time::Duration};

use tracing::{debug, info, warn};

use crate::telemetry;

use super::{
    WgRuntimeConfig,
    uapi::{WgDeviceStats, read_device_stats, read_last_handshake_age, refresh_peer_config},
};

const STATS_INTERVAL: Duration = Duration::from_secs(1);

pub(crate) fn start_stats_poller(role: &'static str, socket_path: PathBuf) {
    std::mem::drop(tokio::spawn(async move {
        let mut ticker = tokio::time::interval(STATS_INTERVAL);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        let mut previous = None;

        loop {
            ticker.tick().await;
            let path = socket_path.clone();
            let sample = tokio::task::spawn_blocking(move || read_device_stats(&path)).await;

            match sample {
                Ok(Ok(current)) => {
                    if let Some((uploaded, downloaded)) = traffic_delta(previous, current) {
                        emit_sample(role, uploaded, downloaded);
                    }
                    previous = Some(current);
                }
                Ok(Err(error)) => {
                    debug!(
                        uapi_socket = %socket_path.display(),
                        error = %error,
                        "failed to poll wg stats"
                    );
                }
                Err(error) => {
                    debug!(
                        uapi_socket = %socket_path.display(),
                        error = %error,
                        "wg stats poll task failed"
                    );
                }
            }
        }
    }));
}

pub(crate) fn start_unhandshaken_peer_refresher(
    role: &'static str,
    socket_path: PathBuf,
    runtime: WgRuntimeConfig,
    interval: Duration,
) {
    if interval.is_zero() {
        return;
    }

    std::mem::drop(tokio::spawn(async move {
        let start = tokio::time::Instant::now() + interval;
        let mut ticker = tokio::time::interval_at(start, interval);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            ticker.tick().await;
            let path = socket_path.clone();
            let sample = tokio::task::spawn_blocking(move || read_last_handshake_age(&path)).await;

            match sample {
                Ok(Ok(Some(_))) => {
                    debug!(
                        role,
                        uapi_socket = %socket_path.display(),
                        "stopping wg unhandshaken peer refresher after successful handshake"
                    );
                    break;
                }
                Ok(Ok(None)) => {
                    let path = socket_path.clone();
                    let runtime = runtime.clone();
                    let refresh =
                        tokio::task::spawn_blocking(move || refresh_peer_config(&path, &runtime))
                            .await;
                    match refresh {
                        Ok(Ok(())) => {
                            info!(
                                role,
                                uapi_socket = %socket_path.display(),
                                interval_secs = interval.as_secs(),
                                "refreshed wg peer while waiting for first handshake"
                            );
                        }
                        Ok(Err(error)) => {
                            warn!(
                                role,
                                uapi_socket = %socket_path.display(),
                                error = %error,
                                "failed to refresh wg peer before first handshake"
                            );
                        }
                        Err(error) => {
                            warn!(
                                role,
                                uapi_socket = %socket_path.display(),
                                error = %error,
                                "wg peer refresh task failed"
                            );
                        }
                    }
                }
                Ok(Err(error)) => {
                    debug!(
                        role,
                        uapi_socket = %socket_path.display(),
                        error = %error,
                        "failed to poll wg handshake before peer refresh"
                    );
                }
                Err(error) => {
                    debug!(
                        role,
                        uapi_socket = %socket_path.display(),
                        error = %error,
                        "wg handshake poll task failed before peer refresh"
                    );
                }
            }
        }
    }));
}

fn traffic_delta(previous: Option<WgDeviceStats>, current: WgDeviceStats) -> Option<(u64, u64)> {
    let previous = previous?;
    if current.rx_bytes < previous.rx_bytes || current.tx_bytes < previous.tx_bytes {
        return None;
    }

    let downloaded = current.rx_bytes - previous.rx_bytes;
    let uploaded = current.tx_bytes - previous.tx_bytes;
    (uploaded > 0 || downloaded > 0).then_some((uploaded, downloaded))
}

fn emit_sample(role: &str, uploaded: u64, downloaded: u64) {
    let mut fields = BTreeMap::new();
    fields.insert("target".to_owned(), "wireguard".to_owned());
    fields.insert("link".to_owned(), "wg://wireguard".to_owned());
    fields.insert("route".to_owned(), role.to_owned());
    fields.insert("mode".to_owned(), "wg".to_owned());
    fields.insert("aggregate".to_owned(), "true".to_owned());
    fields.insert("uploaded".to_owned(), uploaded.to_string());
    fields.insert("downloaded".to_owned(), downloaded.to_string());
    telemetry::emit("INFO", "traffic sample", fields);
}

#[cfg(test)]
mod tests {
    use super::traffic_delta;
    use crate::wg::uapi::WgDeviceStats;

    #[test]
    fn traffic_delta_uses_tx_as_uploaded_and_rx_as_downloaded() {
        assert_eq!(
            traffic_delta(
                Some(WgDeviceStats {
                    rx_bytes: 100,
                    tx_bytes: 200,
                }),
                WgDeviceStats {
                    rx_bytes: 175,
                    tx_bytes: 260,
                },
            ),
            Some((60, 75))
        );
    }

    #[test]
    fn traffic_delta_ignores_initial_zero_and_counter_resets() {
        assert_eq!(
            traffic_delta(
                None,
                WgDeviceStats {
                    rx_bytes: 175,
                    tx_bytes: 260,
                },
            ),
            None
        );
        assert_eq!(
            traffic_delta(
                Some(WgDeviceStats {
                    rx_bytes: 175,
                    tx_bytes: 260,
                }),
                WgDeviceStats {
                    rx_bytes: 100,
                    tx_bytes: 200,
                },
            ),
            None
        );
    }

    #[test]
    fn traffic_delta_drops_empty_samples() {
        let sample = WgDeviceStats {
            rx_bytes: 100,
            tx_bytes: 200,
        };

        assert_eq!(traffic_delta(Some(sample), sample), None);
    }
}