use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
#[cfg(target_os = "linux")]
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, trace};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HostTelemetryKind {
Buddyinfo,
Rptaddrs,
}
#[derive(Default)]
pub struct HostTelemetry {
pending: Mutex<Vec<String>>,
tasks: Mutex<HashMap<HostTelemetryKind, JoinHandle<()>>>,
#[cfg(target_os = "linux")]
rptaddrs_cache: Mutex<RptaddrsCache>,
}
#[cfg(target_os = "linux")]
#[derive(Default)]
struct RptaddrsCache {
last_hash: Option<u64>,
last_emit: Option<u64>,
}
impl HostTelemetry {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
async fn push(&self, line: String) {
let mut p = self.pending.lock().await;
p.push(line);
}
pub async fn drain(&self) -> Vec<String> {
let mut p = self.pending.lock().await;
std::mem::take(&mut *p)
}
pub async fn schedule_buddyinfo(
self: &Arc<Self>,
interval: u64,
lowmem: Option<u32>,
msm_id: Option<u32>,
) {
self.schedule(HostTelemetryKind::Buddyinfo, interval, move |_| {
buddyinfo_line(lowmem, msm_id)
})
.await;
}
pub async fn schedule_rptaddrs(self: &Arc<Self>, interval: u64, msm_id: u32) {
self.schedule(HostTelemetryKind::Rptaddrs, interval, move |me| {
rptaddrs_line(msm_id, &me)
})
.await;
}
async fn schedule<F>(self: &Arc<Self>, kind: HostTelemetryKind, interval: u64, make_line: F)
where
F: Fn(Arc<Self>) -> Option<String> + Send + Sync + 'static,
{
let me = Arc::clone(self);
let new_handle = tokio::spawn(async move {
loop {
match make_line(Arc::clone(&me)) {
Some(line) => me.push(line).await,
None => trace!(?kind, "telemetry reporter produced no line"),
}
if interval == 0 {
return;
}
tokio::time::sleep(Duration::from_secs(interval)).await;
}
});
let mut tasks = self.tasks.lock().await;
if let Some(old) = tasks.insert(kind, new_handle) {
old.abort();
debug!(?kind, interval, "replaced existing telemetry schedule");
} else {
debug!(?kind, interval, "scheduled telemetry");
}
}
}
#[cfg(target_os = "linux")]
fn unix_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
pub fn buddyinfo_line(lowmem: Option<u32>, _msm_id: Option<u32>) -> Option<String> {
#[cfg(target_os = "linux")]
{
let raw = std::fs::read_to_string("/proc/buddyinfo").ok()?;
let mut nums: Vec<u64> = Vec::new();
for line in raw.lines() {
for tok in line.split_whitespace().skip(4) {
if let Ok(n) = tok.parse::<u64>() {
nums.push(n);
}
}
if !nums.is_empty() {
break;
}
}
if nums.is_empty() {
return None;
}
let mut freemem: u64 = 0;
let mut block_kb: u64 = 4;
for n in &nums {
freemem = freemem.saturating_add(block_kb.saturating_mul(*n));
block_kb = block_kb.saturating_mul(2);
}
let arr = nums
.iter()
.map(u64::to_string)
.collect::<Vec<_>>()
.join(", ");
let uptime = read_uptime_secs().unwrap_or(0);
let now = unix_now();
let _ = lowmem;
Some(format!(
"RESULT {{ \"id\": \"9001\", \"time\": {}, \"uptime\": {}, \"buddyinfo\": [ {} ], \
\"freemem\": {} }}\n",
now, uptime, arr, freemem
))
}
#[cfg(not(target_os = "linux"))]
{
let _ = lowmem;
None
}
}
#[cfg(target_os = "linux")]
fn read_uptime_secs() -> Option<u64> {
let s = std::fs::read_to_string("/proc/uptime").ok()?;
s.split_whitespace().next()?.split('.').next()?.parse().ok()
}
pub fn rptaddrs_line(msm_id: u32, telem: &Arc<HostTelemetry>) -> Option<String> {
#[cfg(target_os = "linux")]
{
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let body = build_rptaddrs_body()?;
let mut hasher = DefaultHasher::new();
body.hash(&mut hasher);
let h = hasher.finish();
let now = unix_now();
let should_emit = match telem.rptaddrs_cache.try_lock() {
Ok(mut cache) => {
let stale = cache
.last_emit
.map(|t| now.saturating_sub(t) > 3600)
.unwrap_or(true);
let changed = cache.last_hash.map(|prev| prev != h).unwrap_or(true);
if changed || stale {
cache.last_hash = Some(h);
cache.last_emit = Some(now);
true
} else {
false
}
}
Err(_) => true,
};
if !should_emit {
return None;
}
Some(format!(
"RESULT {{ \"id\": \"{}\", \"time\": {}, {} }}\n",
msm_id, now, body
))
}
#[cfg(not(target_os = "linux"))]
{
let _ = (msm_id, telem);
None
}
}
#[cfg(target_os = "linux")]
fn build_rptaddrs_body() -> Option<String> {
let mut parts: Vec<String> = Vec::new();
parts.push(format!("\"inet-addresses\": [ {} ]", inet4_addresses()));
parts.push(format!("\"inet-routes\": [ {} ]", inet4_routes()));
if let Some(addrs) = inet6_addresses() {
parts.push(format!("\"inet6-addresses\": [ {} ]", addrs));
}
if let Some(routes) = inet6_routes() {
parts.push(format!("\"inet6-routes\": [ {} ]", routes));
}
parts.push(format!("\"dns\": [ {} ]", dns_resolvers()));
Some(parts.join(", "))
}
#[cfg(target_os = "linux")]
fn inet4_addresses() -> String {
use std::ffi::CStr;
use std::mem::{size_of, zeroed};
unsafe {
let s = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, libc::IPPROTO_IP);
if s < 0 {
return String::new();
}
let mut req_buf: [libc::ifreq; 32] = zeroed();
let mut ifc: libc::ifconf = zeroed();
ifc.ifc_len = (size_of::<libc::ifreq>() * req_buf.len()) as i32;
ifc.ifc_ifcu.ifcu_buf = req_buf.as_mut_ptr() as *mut libc::c_char;
if libc::ioctl(s, libc::SIOCGIFCONF as _, &mut ifc) < 0 {
libc::close(s);
return String::new();
}
let count = ifc.ifc_len as usize / size_of::<libc::ifreq>();
let mut out = String::new();
for ifr in req_buf.iter().take(count) {
let name = CStr::from_ptr(ifr.ifr_name.as_ptr())
.to_string_lossy()
.into_owned();
let addr = sockaddr_in_to_string(&ifr.ifr_ifru.ifru_addr);
let mut ifr_mask: libc::ifreq = zeroed();
ifr_mask.ifr_name = ifr.ifr_name;
let netmask = if libc::ioctl(s, libc::SIOCGIFNETMASK as _, &mut ifr_mask) == 0 {
sockaddr_in_to_string(&ifr_mask.ifr_ifru.ifru_addr)
} else {
String::new()
};
if !out.is_empty() {
out.push_str(", ");
}
out.push_str(&format!(
"{{ \"inet-addr\": \"{}\", \"netmask\": \"{}\", \"interface\": \"{}\" }}",
addr, netmask, name
));
}
libc::close(s);
out
}
}
#[cfg(target_os = "linux")]
fn sockaddr_in_to_string(sa: &libc::sockaddr) -> String {
if sa.sa_family as i32 != libc::AF_INET {
return String::new();
}
let sin = unsafe { &*(sa as *const libc::sockaddr as *const libc::sockaddr_in) };
let bytes = sin.sin_addr.s_addr.to_le_bytes();
format!("{}.{}.{}.{}", bytes[0], bytes[1], bytes[2], bytes[3])
}
#[cfg(target_os = "linux")]
fn inet4_routes() -> String {
let raw = match std::fs::read_to_string("/proc/net/route") {
Ok(s) => s,
Err(_) => return String::new(),
};
let mut out = String::new();
for line in raw.lines().skip(1) {
let f: Vec<&str> = line.split_whitespace().collect();
if f.len() < 8 {
continue;
}
let iface = f[0];
let dest = hex_to_v4(f[1]);
let gateway = hex_to_v4(f[2]);
let mask = hex_to_v4(f[7]);
if !out.is_empty() {
out.push_str(", ");
}
out.push_str(&format!(
"{{ \"destination\": \"{}\", \"netmask\": \"{}\", \"next-hop\": \"{}\", \
\"interface\": \"{}\" }}",
dest, mask, gateway, iface
));
}
out
}
#[cfg(target_os = "linux")]
fn hex_to_v4(hex: &str) -> String {
let v = u32::from_str_radix(hex, 16).unwrap_or(0);
let bytes = v.to_le_bytes();
format!("{}.{}.{}.{}", bytes[0], bytes[1], bytes[2], bytes[3])
}
#[cfg(target_os = "linux")]
fn inet6_addresses() -> Option<String> {
let raw = std::fs::read_to_string("/proc/net/if_inet6").ok()?;
if raw.is_empty() {
return None;
}
let mut out = String::new();
for line in raw.lines() {
let f: Vec<&str> = line.split_whitespace().collect();
if f.len() < 6 || f[0].len() != 32 {
continue;
}
let addr = hex32_to_v6(f[0]);
let prefix = u32::from_str_radix(f[2], 16).unwrap_or(0);
let scope_raw = u32::from_str_radix(f[3], 16).unwrap_or(0);
let iface = f[5];
let scope = match scope_raw & 0x00f0 {
0x0000 => "Global",
0x0010 => "Host",
0x0020 => "Link",
0x0040 => "Site",
0x0080 => "Compat",
_ => "Unknown",
};
if !out.is_empty() {
out.push_str(", ");
}
out.push_str(&format!(
"{{ \"inet6-addr\": \"{}\", \"prefix-length\": {}, \"scope\": \"{}\", \"interface\": \
\"{}\" }}",
addr, prefix, scope, iface
));
}
if out.is_empty() {
None
} else {
Some(out)
}
}
#[cfg(target_os = "linux")]
fn hex32_to_v6(hex: &str) -> String {
let mut groups: Vec<String> = Vec::with_capacity(8);
for i in 0..8 {
let g = &hex[i * 4..(i + 1) * 4];
let trimmed = g.trim_start_matches('0');
groups.push(if trimmed.is_empty() {
"0".to_string()
} else {
trimmed.to_string()
});
}
groups.join(":")
}
#[cfg(target_os = "linux")]
fn inet6_routes() -> Option<String> {
let raw = std::fs::read_to_string("/proc/net/ipv6_route").ok()?;
let mut out = String::new();
for line in raw.lines() {
let f: Vec<&str> = line.split_whitespace().collect();
if f.len() < 10 {
continue;
}
let dst_hex = f[0];
if dst_hex.len() != 32 {
continue;
}
let prefix_len = u32::from_str_radix(f[1], 16).unwrap_or(0);
let nh_hex = f[4];
let metric = u32::from_str_radix(f[5], 16).unwrap_or(0);
let flags = u32::from_str_radix(f[8], 16).unwrap_or(0);
let iface = f[9];
const RTF_UP: u32 = 0x0001;
const RTF_ADDRCONF: u32 = 0x0400_0000;
const RTF_CACHE: u32 = 0x0100_0000;
if flags & RTF_UP == 0 {
continue;
}
if (flags & RTF_ADDRCONF) != 0 && (flags & RTF_CACHE) != 0 {
continue;
}
if dst_hex.starts_with("ff02") || dst_hex.starts_with("ff00") {
continue;
}
if prefix_len == 128 {
continue;
}
let dest = hex32_to_v6(dst_hex);
let nh = hex32_to_v6(nh_hex);
if !out.is_empty() {
out.push_str(", ");
}
out.push_str(&format!(
"{{ \"destination\": \"{}\", \"prefix-length\": {}, \"next-hop\": \"{}\", \"metric\": \
{}, \"interface\": \"{}\" }}",
dest, prefix_len, nh, metric, iface
));
}
if out.is_empty() {
None
} else {
Some(out)
}
}
#[cfg(target_os = "linux")]
fn dns_resolvers() -> String {
let raw = match std::fs::read_to_string("/etc/resolv.conf") {
Ok(s) => s,
Err(_) => return String::new(),
};
let mut out = String::new();
for line in raw.lines() {
let line = line.trim();
if let Some(rest) = line.strip_prefix("nameserver") {
let ns = rest.trim();
if ns.is_empty() {
continue;
}
if !out.is_empty() {
out.push_str(", ");
}
out.push_str(&format!("{{ \"nameserver\": \"{}\" }}", ns));
}
}
out
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(target_os = "linux")]
#[test]
fn hex_v4_round_trip() {
assert_eq!(hex_to_v4("0100A8C0"), "192.168.0.1");
assert_eq!(hex_to_v4("00000000"), "0.0.0.0");
}
#[cfg(target_os = "linux")]
#[test]
fn hex_v6_groups() {
assert_eq!(
hex32_to_v6("20010db8000000000000000000000001"),
"2001:db8:0:0:0:0:0:1"
);
}
#[tokio::test]
async fn drain_is_idempotent() {
let t = HostTelemetry::new();
let drained = t.drain().await;
assert!(drained.is_empty());
}
#[tokio::test]
async fn buddyinfo_returns_something_on_linux() {
if cfg!(target_os = "linux") {
let _ = buddyinfo_line(None, None);
}
}
#[cfg(target_os = "linux")]
#[test]
fn buddyinfo_line_shape_matches_official_probe() {
let Some(line) = buddyinfo_line(Some(4), None) else {
return; };
assert!(line.starts_with("RESULT { "));
assert!(line.contains("\"id\": \"9001\""));
assert!(line.contains("\"buddyinfo\":"));
assert!(line.contains("\"freemem\":"));
assert!(line.ends_with("}\n"));
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn rptaddrs_emits_once_then_caches() {
let t = HostTelemetry::new();
let first = rptaddrs_line(9104, &t);
assert!(first.is_some(), "rptaddrs should emit first time");
let second = rptaddrs_line(9104, &t);
assert!(second.is_none(), "rptaddrs should suppress unchanged");
}
#[tokio::test]
async fn one_shot_schedule_drains() {
let t = HostTelemetry::new();
if cfg!(target_os = "linux") {
t.schedule_buddyinfo(0, None, None).await;
for _ in 0..20 {
tokio::task::yield_now().await;
let pending = t.drain().await;
if !pending.is_empty() {
assert!(pending[0].contains("\"id\": \"9001\""));
return;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
panic!("buddyinfo never landed in drain");
}
}
}