use std::path::{Path, PathBuf};
use async_trait::async_trait;
use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};
/// Source identifier handed to `ProbeReading::new` for every reading this probe builds.
pub const PROBE_SOURCE: &str = "host.tap_link";
/// Host probe bound to a single network interface, identified by name.
///
/// The name is resolved to a directory under `/sys/class/net` when the probe is read.
#[derive(Clone, Debug)]
pub struct TapLinkProbe {
    /// Name of the interface to inspect (for example `cfc-abcd1234`).
    pub iface: String,
}
impl TapLinkProbe {
    /// Creates a probe for the interface called `iface`.
    ///
    /// Anything convertible into a `String` is accepted; the name is stored exactly as given.
    pub fn new(iface: impl Into<String>) -> Self {
        let iface = iface.into();
        Self { iface }
    }

    /// Returns the sysfs directory for this interface, `/sys/class/net/<iface>`.
    fn sys_class_net(&self) -> PathBuf {
        // NOTE(review): the name is appended verbatim, so an absolute or `..`-containing
        // name would leave `/sys/class/net` — presumably callers pass a validated name.
        let mut dir = PathBuf::from("/sys/class/net");
        dir.push(&self.iface);
        dir
    }
}
#[async_trait]
impl HostProbe for TapLinkProbe {
fn probe_name(&self) -> &'static str {
"tap_link"
}
async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
let sysfs = self.sys_class_net();
let operstate = read_trimmed(&sysfs.join("operstate")).await?;
let carrier = read_trimmed(&sysfs.join("carrier")).await?;
let rx_packets = read_u64(&sysfs.join("statistics/rx_packets")).await?;
let tx_packets = read_u64(&sysfs.join("statistics/tx_packets")).await?;
let rx_bytes = read_u64(&sysfs.join("statistics/rx_bytes")).await?;
let tx_bytes = read_u64(&sysfs.join("statistics/tx_bytes")).await?;
let inputs = serde_json::json!({
"iface": self.iface,
"sysfs": sysfs.to_string_lossy(),
});
let output = serde_json::json!({
"operstate": operstate,
"carrier": carrier,
"rxPackets": rx_packets,
"txPackets": tx_packets,
"rxBytes": rx_bytes,
"txBytes": tx_bytes,
});
Ok(ProbeReading::new(PROBE_SOURCE, inputs, output))
}
}
/// Reads the whole file at `path` and returns its contents with leading and trailing
/// whitespace removed.
///
/// # Errors
///
/// Any read failure is reported as [`ProbeError::Io`]; the message carries the path and
/// the underlying error.
#[cfg(target_os = "linux")]
async fn read_trimmed(path: &Path) -> Result<String, ProbeError> {
    tokio::fs::read_to_string(path)
        .await
        .map(|contents| contents.trim().to_string())
        .map_err(|e| {
            ProbeError::Io(format!(
                "tap-link read failed at {}: {e}",
                path.display()
            ))
        })
}
/// Reads the file at `path` via `read_trimmed` and parses its trimmed contents as a `u64`.
///
/// # Errors
///
/// Propagates the error from `read_trimmed`, or returns [`ProbeError::Parse`] (including
/// the path, the offending text and the parse error) when the contents are not a `u64`.
#[cfg(target_os = "linux")]
async fn read_u64(path: &Path) -> Result<u64, ProbeError> {
    let raw = read_trimmed(path).await?;
    match raw.parse::<u64>() {
        Ok(value) => Ok(value),
        Err(e) => Err(ProbeError::Parse(format!(
            "tap-link counter at {} not u64 ({raw:?}): {e}",
            path.display()
        ))),
    }
}
/// Non-Linux fallback for `read_trimmed`.
///
/// Never touches the filesystem; the path is ignored and the call always fails with
/// [`ProbeError::PlatformUnsupported`].
#[cfg(not(target_os = "linux"))]
async fn read_trimmed(_path: &Path) -> Result<String, ProbeError> {
    Err(ProbeError::PlatformUnsupported)
}
/// Non-Linux fallback for `read_u64`.
///
/// Never touches the filesystem; the path is ignored and the call always fails with
/// [`ProbeError::PlatformUnsupported`].
#[cfg(not(target_os = "linux"))]
async fn read_u64(_path: &Path) -> Result<u64, ProbeError> {
    Err(ProbeError::PlatformUnsupported)
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use cellos_core::ports::EventSink;

    use super::super::test_support::CapturingSink;
    use super::super::{emit_reading, ProbeContext, ProbeReading};
    use super::PROBE_SOURCE;

    /// Emits one hand-built tap-link reading through a capturing sink and checks that the
    /// single resulting event carries the probe source, the context ids, a string
    /// `hostReceivedAt`, the reading's inputs/output, and the expected event-type suffix.
    #[tokio::test]
    async fn emitted_event_carries_d12_fields() {
        // Keep a concrete handle for inspection and hand the emitter a trait-object view.
        let capture = Arc::new(CapturingSink::new());
        let sink: Arc<dyn EventSink> = capture.clone();

        let inputs = serde_json::json!({
            "iface": "cfc-abcd1234",
            "sysfs": "/sys/class/net/cfc-abcd1234",
        });
        let output = serde_json::json!({
            "operstate": "up",
            "carrier": "1",
            "rxPackets": 7,
            "txPackets": 12,
            "rxBytes": 840,
            "txBytes": 1500,
        });
        let reading = ProbeReading::new(PROBE_SOURCE, inputs, output);
        let ctx = ProbeContext::new("cell-tap-1", "run-tap-1", "sha256:tap");

        emit_reading(&sink, &ctx, reading).await.expect("emit");

        let events = capture.snapshot().await;
        assert_eq!(events.len(), 1);
        let event = &events[0];
        let data = event.data.as_ref().expect("data");

        // Envelope fields.
        assert_eq!(data["probeSource"], PROBE_SOURCE);
        assert_eq!(data["cellId"], "cell-tap-1");
        assert_eq!(data["runId"], "run-tap-1");
        assert!(data["hostReceivedAt"].is_string());

        // Payload passed through from the reading.
        assert_eq!(data["inputs"]["iface"], "cfc-abcd1234");
        assert_eq!(data["output"]["operstate"], "up");
        assert_eq!(data["output"]["carrier"], "1");
        assert_eq!(data["output"]["txBytes"], 1500);

        assert!(event.ty.ends_with(".cell.observability.host.v1.tap_link"));
    }
}