use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};
pub const PROBE_SOURCE: &str = "host.nftables";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NftCounterRow {
pub family: String,
pub table: String,
pub chain: String,
pub handle: u64,
pub packets: u64,
pub bytes: u64,
pub rule_repr: String,
}
#[derive(Debug, Clone)]
pub struct NftablesProbe {
pub family: String,
pub table_name: String,
}
impl NftablesProbe {
pub fn new(family: impl Into<String>, table_name: impl Into<String>) -> Self {
Self {
family: family.into(),
table_name: table_name.into(),
}
}
}
#[async_trait]
impl HostProbe for NftablesProbe {
fn probe_name(&self) -> &'static str {
"nftables"
}
async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
let rows = scrape_counter_rows(&self.family, &self.table_name).await?;
let inputs = serde_json::json!({
"family": self.family,
"table": self.table_name,
"command": format!("nft -j list table {} {}", self.family, self.table_name),
});
let output = serde_json::json!({
"rows": rows,
});
Ok(ProbeReading::new(PROBE_SOURCE, inputs, output))
}
}
#[cfg(target_os = "linux")]
async fn scrape_counter_rows(
_family: &str,
_table: &str,
) -> Result<Vec<NftCounterRow>, ProbeError> {
Err(ProbeError::Io(
"nftables counter scrape not yet wired (Phase F1a — pending supervisor integration)".into(),
))
}
#[cfg(not(target_os = "linux"))]
async fn scrape_counter_rows(
_family: &str,
_table: &str,
) -> Result<Vec<NftCounterRow>, ProbeError> {
Err(ProbeError::PlatformUnsupported)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use cellos_core::ports::EventSink;
use super::super::test_support::CapturingSink;
use super::super::{emit_reading, ProbeContext, ProbeReading};
use super::{NftCounterRow, PROBE_SOURCE};
#[tokio::test]
async fn emitted_event_carries_d12_fields() {
let snapshot_handle = Arc::new(CapturingSink::new());
let probe_sink: Arc<dyn EventSink> = snapshot_handle.clone();
let rows = vec![NftCounterRow {
family: "inet".into(),
table: "cellos_test".into(),
chain: "egress".into(),
handle: 4,
packets: 12,
bytes: 1284,
rule_repr: "ip daddr 1.2.3.4 tcp dport 443 accept".into(),
}];
let reading = ProbeReading::new(
PROBE_SOURCE,
serde_json::json!({
"family": "inet",
"table": "cellos_test",
"command": "nft -j list table inet cellos_test",
}),
serde_json::json!({"rows": rows}),
);
let ctx = ProbeContext::new("cell-nft-1", "run-nft-1", "sha256:nft");
emit_reading(&probe_sink, &ctx, reading)
.await
.expect("emit");
let events = snapshot_handle.snapshot().await;
assert_eq!(events.len(), 1);
let data = events[0].data.as_ref().expect("data");
assert_eq!(data["probeSource"], PROBE_SOURCE);
assert_eq!(data["cellId"], "cell-nft-1");
assert_eq!(data["runId"], "run-nft-1");
assert!(data["hostReceivedAt"].is_string());
assert_eq!(data["inputs"]["table"], "cellos_test");
let out_rows = data["output"]["rows"].as_array().expect("rows is array");
assert_eq!(out_rows.len(), 1);
assert_eq!(out_rows[0]["packets"], 12);
assert_eq!(out_rows[0]["bytes"], 1284);
assert!(events[0]
.ty
.ends_with(".cell.observability.host.v1.nftables"));
}
}