use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};
pub const PROBE_SOURCE: &str = "host.cgroup";
#[derive(Debug, Clone)]
pub struct CgroupProbe {
pub cgroup_leaf: PathBuf,
}
impl CgroupProbe {
pub fn new(cgroup_leaf: impl Into<PathBuf>) -> Self {
Self {
cgroup_leaf: cgroup_leaf.into(),
}
}
}
#[async_trait]
impl HostProbe for CgroupProbe {
fn probe_name(&self) -> &'static str {
"cgroup"
}
async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
let memory = read_flat_kv_at(&self.cgroup_leaf.join("memory.events")).await?;
let cpu = read_flat_kv_at(&self.cgroup_leaf.join("cpu.stat")).await?;
let pids = read_flat_kv_at(&self.cgroup_leaf.join("pids.events")).await?;
let inputs = serde_json::json!({
"cgroupLeaf": self.cgroup_leaf.to_string_lossy(),
"files": ["memory.events", "cpu.stat", "pids.events"],
});
let output = serde_json::json!({
"memory_events": memory,
"cpu_stat": cpu,
"pids_events": pids,
});
Ok(ProbeReading::new(PROBE_SOURCE, inputs, output))
}
}
#[cfg(target_os = "linux")]
async fn read_flat_kv_at(path: &Path) -> Result<BTreeMap<String, u64>, ProbeError> {
match tokio::fs::read_to_string(path).await {
Ok(text) => parse_flat_kv(&text),
Err(e) => Err(ProbeError::Io(format!(
"cgroup read failed at {}: {e}",
path.display()
))),
}
}
#[cfg(not(target_os = "linux"))]
async fn read_flat_kv_at(_path: &Path) -> Result<BTreeMap<String, u64>, ProbeError> {
Err(ProbeError::PlatformUnsupported)
}
pub fn parse_flat_kv(text: &str) -> Result<BTreeMap<String, u64>, ProbeError> {
let mut out = BTreeMap::new();
for (lineno, raw) in text.lines().enumerate() {
let line = raw.trim();
if line.is_empty() {
continue;
}
let mut parts = line.split_whitespace();
let key = parts
.next()
.ok_or_else(|| ProbeError::Parse(format!("flat-kv line {} has no key", lineno + 1)))?;
let val = parts.next().ok_or_else(|| {
ProbeError::Parse(format!("flat-kv line {} has no value", lineno + 1))
})?;
let parsed: u64 = val.parse().map_err(|e| {
ProbeError::Parse(format!(
"flat-kv line {} value {val:?} not u64: {e}",
lineno + 1
))
})?;
out.insert(key.to_string(), parsed);
}
Ok(out)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use cellos_core::ports::EventSink;
use super::super::test_support::CapturingSink;
use super::super::{emit_reading, ProbeContext, ProbeReading};
use super::{parse_flat_kv, PROBE_SOURCE};
#[test]
fn parse_flat_kv_handles_canonical_memory_events() {
let text = "low 0\nhigh 0\nmax 12\noom 1\noom_kill 1\n";
let parsed = parse_flat_kv(text).expect("parse");
assert_eq!(parsed["low"], 0);
assert_eq!(parsed["max"], 12);
assert_eq!(parsed["oom"], 1);
assert_eq!(parsed["oom_kill"], 1);
}
#[test]
fn parse_flat_kv_rejects_non_numeric_value() {
let text = "low not-a-number\n";
let err = parse_flat_kv(text).expect_err("rejects bad value");
let msg = format!("{err}");
assert!(msg.contains("flat-kv"), "expected parse error, got {msg}");
}
#[tokio::test]
async fn emitted_event_carries_d12_fields() {
let snapshot_handle = Arc::new(CapturingSink::new());
let probe_sink: Arc<dyn EventSink> = snapshot_handle.clone();
let ctx = ProbeContext::new("cell-cg-1", "run-cg-1", "sha256:cg");
let reading = ProbeReading::new(
PROBE_SOURCE,
serde_json::json!({
"cgroupLeaf": "/sys/fs/cgroup/cellos/test",
"files": ["memory.events", "cpu.stat", "pids.events"],
}),
serde_json::json!({
"memory_events": {"oom": 1, "oom_kill": 1},
"cpu_stat": {"usage_usec": 12345},
"pids_events": {"max": 0},
}),
);
emit_reading(&probe_sink, &ctx, reading)
.await
.expect("emit");
let events = snapshot_handle.snapshot().await;
assert_eq!(events.len(), 1);
let data = events[0].data.as_ref().expect("data");
assert_eq!(data["probeSource"], PROBE_SOURCE);
assert_eq!(data["cellId"], "cell-cg-1");
assert_eq!(data["runId"], "run-cg-1");
assert!(data["hostReceivedAt"].is_string());
assert!(data["inputs"]["cgroupLeaf"].is_string());
assert_eq!(data["output"]["memory_events"]["oom_kill"], 1);
assert!(events[0].ty.ends_with(".cell.observability.host.v1.cgroup"));
}
}