use std::time::Duration;
use crate::node::background_task_monitor::BackgroundTaskMonitor;
use crate::transport::TRANSPORT_METRICS;
const MONITOR_INTERVAL: Duration = Duration::from_secs(1);
const PROC_NET_DEV: &str = "/proc/net/dev";
pub(crate) fn spawn_iface_tx_monitor(local_peer_id: String, monitor: &BackgroundTaskMonitor) {
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(MONITOR_INTERVAL);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
let mut prev_total = read_total_tx_bytes().await;
let mut prev_own = TRANSPORT_METRICS.cumulative_bytes_sent();
loop {
ticker.tick().await;
let now_total = read_total_tx_bytes().await;
let now_own = TRANSPORT_METRICS.cumulative_bytes_sent();
if let (Some(prev), Some(now)) = (prev_total, now_total) {
let total_delta = now.saturating_sub(prev);
let own_delta = now_own.saturating_sub(prev_own);
let op_delta = iface_op(total_delta, own_delta);
emit_iface_snapshot(&local_peer_id, total_delta, own_delta, op_delta);
}
if now_total.is_some() {
prev_total = now_total;
prev_own = now_own;
}
}
});
monitor.register("shadow_iface_tx_monitor", handle);
}
async fn read_total_tx_bytes() -> Option<u64> {
let contents = tokio::fs::read_to_string(PROC_NET_DEV).await.ok()?;
parse_total_tx_bytes(&contents)
}
fn parse_total_tx_bytes(contents: &str) -> Option<u64> {
const TX_BYTES_FIELD: usize = 8;
let mut total: u64 = 0;
let mut found = false;
for line in contents.lines() {
let Some((iface, rest)) = line.split_once(':') else {
continue;
};
let iface = iface.trim();
if iface.is_empty() || iface == "lo" {
continue;
}
if let Some(tx) = rest
.split_whitespace()
.nth(TX_BYTES_FIELD)
.and_then(|s| s.parse::<u64>().ok())
{
total = total.saturating_add(tx);
found = true;
}
}
found.then_some(total)
}
fn iface_op(total_delta: u64, own_delta: u64) -> u64 {
total_delta.saturating_sub(own_delta)
}
fn emit_iface_snapshot(
local_peer_id: &str,
total_tx_bytes: u64,
own_tx_bytes: u64,
op_tx_bytes: u64,
) {
tracing::debug!(
target: "freenet::transport::shadow_iface_tx",
total_tx_bytes,
own_tx_bytes,
op_tx_bytes,
"shadow_iface_tx"
);
crate::tracing::telemetry::send_standalone_event_with_peer_id(
"shadow_iface_tx",
local_peer_id,
serde_json::json!({
"total_tx_bytes_per_sec": total_tx_bytes,
"freenet_own_tx_bytes_per_sec": own_tx_bytes,
"op_tx_bytes_per_sec": op_tx_bytes,
}),
);
}
#[cfg(test)]
mod tests {
use super::*;
const SAMPLE: &str = "Inter-| Receive | Transmit\n\
face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed\n\
lo: 1000 10 0 0 0 0 0 0 1000 10 0 0 0 0 0 0\n\
eth0: 500000 1000 0 0 0 0 0 0 250000 800 0 0 0 0 0 0\n\
wlan0: 10 1 0 0 0 0 0 0 7500 5 0 0 0 0 0 0\n";
#[test]
fn parse_sums_tx_excluding_loopback() {
assert_eq!(parse_total_tx_bytes(SAMPLE), Some(257_500));
}
#[test]
fn parse_returns_none_when_no_interface_lines() {
let headers = "Inter-| Receive | Transmit\n face |bytes ... |bytes ...\n";
assert_eq!(parse_total_tx_bytes(headers), None);
assert_eq!(parse_total_tx_bytes(""), None);
}
#[test]
fn parse_skips_only_loopback_named_lo() {
let input = " lodev: 1 2 3 4 5 6 7 8 999 10\n";
assert_eq!(parse_total_tx_bytes(input), Some(999));
}
#[test]
fn parse_tolerates_short_or_garbage_lines() {
let input = " eth0: 1 2 3\n eth1: 1 2 3 4 5 6 7 8 4242 9\n";
assert_eq!(parse_total_tx_bytes(input), Some(4242));
}
#[test]
fn parse_handles_colon_glued_to_first_counter() {
let glued = "eth0:100 1 2 3 4 5 6 7 9999 10\n";
assert_eq!(parse_total_tx_bytes(glued), Some(9999));
}
#[test]
fn parse_handles_huge_counters_without_overflow() {
let big = u64::MAX - 1;
let input = format!("eth0: 1 2 3 4 5 6 7 8 {big} 9\neth1: 1 2 3 4 5 6 7 8 5 9\n");
assert_eq!(parse_total_tx_bytes(&input), Some(u64::MAX));
}
#[test]
fn iface_op_is_total_minus_own() {
assert_eq!(iface_op(10_000, 3_000), 7_000);
assert_eq!(iface_op(0, 0), 0);
}
#[test]
fn iface_op_saturates_when_own_exceeds_total() {
assert_eq!(iface_op(1_000, 4_000), 0);
}
#[tokio::test(start_paused = true)]
async fn monitor_survives_multiple_ticks() {
let monitor = BackgroundTaskMonitor::new();
spawn_iface_tx_monitor("test-peer".to_string(), &monitor);
tokio::time::advance(MONITOR_INTERVAL * 4 + Duration::from_millis(100)).await;
tokio::task::yield_now().await;
let exit = monitor.wait_for_any_exit();
tokio::pin!(exit);
let still_running = tokio::time::timeout(Duration::from_millis(50), &mut exit)
.await
.is_err();
assert!(
still_running,
"iface-tx monitor task should still be alive after a few ticks"
);
}
}