use std::sync::Arc;
use std::time::SystemTime;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
use cellos_core::error::CellosError;
use cellos_core::ports::EventSink;
use cellos_core::CloudEventV1;
// Submodules defined in separate files that are not visible from here
// (presumably one per host probe — confirm against those files). Their names
// coincide with the probe short names matched in `build_host_probe_envelope`.
pub mod cgroup;
pub mod fc_metrics;
pub mod nftables;
pub mod tap_link;
/// Value written to the CloudEvent `source` attribute of every envelope built
/// by `build_host_probe_envelope`.
pub const HOST_PROBE_EVENT_SOURCE: &str = "cellos-host-telemetry/probes";
/// Common prefix of the CloudEvent `type` strings produced by the `f1b_stub`
/// helpers; each helper appends `.` and a probe-specific suffix.
pub const HOST_PROBE_EVENT_TYPE_PREFIX: &str = "dev.cellos.events.cell.observability.host.v1";
/// Attribution identifiers that accompany a host probe reading.
///
/// `build_host_probe_envelope` copies the three fields into the event payload
/// under the keys `cellId`, `runId` and `specSignatureHash`.
#[derive(Debug, Clone)]
pub struct ProbeContext {
    /// Emitted as `cellId` in the event payload.
    pub cell_id: String,
    /// Emitted as `runId` in the event payload.
    pub run_id: String,
    /// Emitted as `specSignatureHash` in the event payload.
    pub spec_signature_hash: String,
}

impl ProbeContext {
    /// Builds a context from anything convertible into an owned `String`.
    ///
    /// Each argument is converted with `Into::into` and stored unchanged; no
    /// validation is performed.
    pub fn new(
        cell_id: impl Into<String>,
        run_id: impl Into<String>,
        spec_signature_hash: impl Into<String>,
    ) -> Self {
        let cell_id: String = cell_id.into();
        let run_id: String = run_id.into();
        let spec_signature_hash: String = spec_signature_hash.into();
        Self {
            cell_id,
            run_id,
            spec_signature_hash,
        }
    }
}
/// One observation produced by a host probe.
///
/// Serialized and deserialized with camelCase field names (`probeSource`,
/// `inputs`, `output`, `hostReceivedAt`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProbeReading {
/// Name of the probe that produced the reading. `build_host_probe_envelope`
/// strips an optional leading `host.` and uses the remainder to select the
/// event type.
pub probe_source: String,
/// Free-form JSON describing what the probe was pointed at; copied verbatim
/// into the event payload.
pub inputs: Value,
/// Free-form JSON holding what the probe observed; copied verbatim into the
/// event payload.
pub output: Value,
/// Timestamp of host-side receipt. Left out of the serialized form when
/// `None`; `ProbeReading::new` leaves it unset and `emit_reading` fills it in.
#[serde(skip_serializing_if = "Option::is_none")]
pub host_received_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl ProbeReading {
    /// Creates a reading that has not been timestamped yet.
    ///
    /// `probe_source` is converted into an owned `String`; `inputs` and
    /// `output` are stored as given. `host_received_at` starts out as `None`.
    pub fn new(probe_source: impl Into<String>, inputs: Value, output: Value) -> Self {
        let probe_source: String = probe_source.into();
        Self {
            probe_source,
            inputs,
            output,
            host_received_at: None,
        }
    }
}
/// Failure modes surfaced by host probes and by the emit helpers in this module.
///
/// The payload-carrying variants hold an already-formatted message rather than
/// the originating error value.
#[derive(Debug, Error)]
pub enum ProbeError {
/// The probe cannot run on the current platform.
#[error("host probe is not supported on this platform")]
PlatformUnsupported,
/// An I/O operation failed; the string is the failure description.
#[error("host probe i/o error: {0}")]
Io(String),
/// Input could not be interpreted. Also returned by
/// `build_host_probe_envelope` for an unrecognized probe short name.
#[error("host probe parse error: {0}")]
Parse(String),
/// The event sink rejected or failed to accept an event. Produced by the
/// `From<CellosError>` conversion.
#[error("host probe sink error: {0}")]
Sink(String),
}
impl From<CellosError> for ProbeError {
fn from(err: CellosError) -> Self {
ProbeError::Sink(err.to_string())
}
}
/// Interface implemented by every host probe.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait HostProbe: Send + Sync {
/// Static name identifying the probe.
// NOTE(review): how this relates to `ProbeReading::probe_source` is not
// visible from this file — confirm against the implementations.
fn probe_name(&self) -> &'static str;
/// Takes one reading for the cell/run described by `ctx`.
///
/// # Errors
///
/// Returns a `ProbeError` when the reading cannot be produced.
async fn read(&self, ctx: &ProbeContext) -> Result<ProbeReading, ProbeError>;
}
pub async fn emit_reading(
sink: &Arc<dyn EventSink>,
ctx: &ProbeContext,
mut reading: ProbeReading,
) -> Result<(), ProbeError> {
let stamp = chrono::DateTime::<chrono::Utc>::from(SystemTime::now());
reading.host_received_at = Some(stamp);
let event = build_host_probe_envelope(ctx, &reading)?;
sink.emit(&event).await?;
Ok(())
}
/// Builds the CloudEvent envelope for a single host probe reading.
///
/// The event `type` is selected from `reading.probe_source` after an optional
/// leading `host.` is stripped. The payload carries the attribution fields
/// from `ctx` plus the reading's `probeSource`, `inputs` and `output`. The
/// same RFC 3339 timestamp (millisecond precision, `Z` suffix) is written to
/// both the payload's `hostReceivedAt` and the envelope's `time`.
///
/// When `reading.host_received_at` is `None`, the current wall-clock time is
/// used instead. Each call assigns a fresh random (v4) UUID as the event id.
///
/// # Errors
///
/// Returns `ProbeError::Parse` when the probe short name is not one of
/// `fc_metrics`, `cgroup`, `nftables` or `tap_link`.
pub fn build_host_probe_envelope(
    ctx: &ProbeContext,
    reading: &ProbeReading,
) -> Result<CloudEventV1, ProbeError> {
    let event_type = match probe_short_name(&reading.probe_source) {
        "fc_metrics" => f1b_stub::cell_observability_host_fc_metrics_v1_type(),
        "cgroup" => f1b_stub::cell_observability_host_cgroup_v1_type(),
        "nftables" => f1b_stub::cell_observability_host_nftables_v1_type(),
        "tap_link" => f1b_stub::cell_observability_host_tap_v1_type(),
        other => {
            return Err(ProbeError::Parse(format!(
                "unknown host probe short name: {other}"
            )));
        }
    };

    // Resolve the timestamp first so the formatting call appears only once.
    let received_at = reading
        .host_received_at
        .unwrap_or_else(|| chrono::DateTime::<chrono::Utc>::from(SystemTime::now()));
    let stamp_rfc3339 = received_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);

    // `json!` serializes its operands by reference, so the timestamp string
    // can be passed without cloning and is still available for `time` below.
    let data = serde_json::json!({
        "cellId": ctx.cell_id,
        "runId": ctx.run_id,
        "specSignatureHash": ctx.spec_signature_hash,
        "hostReceivedAt": stamp_rfc3339,
        "probeSource": reading.probe_source,
        "inputs": reading.inputs,
        "output": reading.output,
    });

    Ok(CloudEventV1 {
        specversion: "1.0".into(),
        id: format!("urn:cellos:event:{}", uuid::Uuid::new_v4()),
        source: HOST_PROBE_EVENT_SOURCE.into(),
        ty: event_type,
        datacontenttype: Some("application/json".into()),
        data: Some(data),
        time: Some(stamp_rfc3339),
        traceparent: None,
    })
}
/// Returns `probe_source` without a leading `host.`.
///
/// Only one occurrence at the very start is removed; input that does not
/// start with `host.` is returned unchanged.
fn probe_short_name(probe_source: &str) -> &str {
    match probe_source.strip_prefix("host.") {
        Some(short) => short,
        None => probe_source,
    }
}
/// Builders for the CloudEvent `type` strings of the host probe events.
///
/// Every function returns `HOST_PROBE_EVENT_TYPE_PREFIX` followed by `.` and a
/// probe-specific suffix.
pub mod f1b_stub {
    use super::HOST_PROBE_EVENT_TYPE_PREFIX;

    /// Joins the shared prefix and `suffix` with a single `.`.
    fn host_event_type(suffix: &str) -> String {
        [HOST_PROBE_EVENT_TYPE_PREFIX, suffix].join(".")
    }

    /// Event type with the `fc_metrics` suffix.
    pub fn cell_observability_host_fc_metrics_v1_type() -> String {
        host_event_type("fc_metrics")
    }

    /// Event type with the `cgroup` suffix.
    pub fn cell_observability_host_cgroup_v1_type() -> String {
        host_event_type("cgroup")
    }

    /// Event type with the `nftables` suffix.
    pub fn cell_observability_host_nftables_v1_type() -> String {
        host_event_type("nftables")
    }

    /// Event type with the `tap_link` suffix (the function name says `tap`,
    /// the emitted suffix is `tap_link`).
    pub fn cell_observability_host_tap_v1_type() -> String {
        host_event_type("tap_link")
    }
}
/// Test-only helpers: an in-memory `EventSink` that records what it is given.
#[cfg(test)]
pub mod test_support {
    use async_trait::async_trait;
    use cellos_core::error::CellosError;
    use cellos_core::ports::EventSink;
    use cellos_core::CloudEventV1;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Sink that appends a clone of every emitted event to an internal list.
    #[derive(Default)]
    pub struct CapturingSink {
        events: Mutex<Vec<CloudEventV1>>,
    }

    impl CapturingSink {
        /// Creates a sink with no captured events.
        pub fn new() -> Self {
            Self::default()
        }

        /// Consumes the sink and returns it as a shared trait object.
        pub fn as_arc(self) -> Arc<dyn EventSink> {
            let shared: Arc<dyn EventSink> = Arc::new(self);
            shared
        }

        /// Returns a copy of the events captured so far, in emission order.
        pub async fn snapshot(&self) -> Vec<CloudEventV1> {
            let captured = self.events.lock().await;
            captured.clone()
        }
    }

    #[async_trait]
    impl EventSink for CapturingSink {
        /// Records a clone of `event`; always succeeds.
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            let mut captured = self.events.lock().await;
            captured.push(event.clone());
            Ok(())
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both the prefixed and the bare spelling resolve to the same short name.
    #[test]
    fn probe_short_name_strips_host_prefix() {
        for raw in ["host.fc_metrics", "fc_metrics"] {
            assert_eq!(probe_short_name(raw), "fc_metrics");
        }
    }

    /// Each type helper yields the shared prefix plus its probe suffix.
    #[test]
    fn f1b_stub_types_match_adr0006_shape() {
        let cases = [
            (
                f1b_stub::cell_observability_host_fc_metrics_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.fc_metrics",
            ),
            (
                f1b_stub::cell_observability_host_cgroup_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.cgroup",
            ),
            (
                f1b_stub::cell_observability_host_nftables_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.nftables",
            ),
            (
                f1b_stub::cell_observability_host_tap_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.tap_link",
            ),
        ];
        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    /// The envelope payload carries the context fields and the reading.
    #[test]
    fn build_envelope_carries_attribution_fields() {
        let ctx = ProbeContext::new("cell-1", "run-7", "sha256:abcd");
        let mut reading = ProbeReading::new(
            "host.fc_metrics",
            serde_json::json!({"endpoint": "http://localhost/metrics"}),
            serde_json::json!({"vmm": {"reads": 0}}),
        );
        reading.host_received_at = Some(chrono::Utc::now());

        let envelope = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        assert_eq!(envelope.specversion, "1.0");
        assert_eq!(envelope.source, HOST_PROBE_EVENT_SOURCE);
        assert!(envelope.ty.ends_with(".fc_metrics"));

        let payload = envelope.data.expect("data present");
        let expected_strings = [
            ("cellId", "cell-1"),
            ("runId", "run-7"),
            ("specSignatureHash", "sha256:abcd"),
            ("probeSource", "host.fc_metrics"),
        ];
        for (key, expected) in expected_strings {
            assert_eq!(payload[key], expected);
        }
        for key in ["inputs", "output"] {
            assert!(payload[key].is_object());
        }
        assert!(payload["hostReceivedAt"].is_string());
    }

    /// `emit_reading` fills in the timestamp and delivers exactly one event.
    #[tokio::test]
    async fn emit_reading_stamps_host_received_at_and_dispatches() {
        let capturing = Arc::new(test_support::CapturingSink::new());
        let dyn_sink: Arc<dyn EventSink> = capturing.clone();
        let ctx = ProbeContext::new("cell-1", "run-7", "sha256:abcd");
        let unstamped = ProbeReading::new(
            "host.fc_metrics",
            serde_json::json!({"endpoint": "http://127.0.0.1:0/metrics"}),
            serde_json::json!({"vmm": {"reads": 1}}),
        );

        emit_reading(&dyn_sink, &ctx, unstamped)
            .await
            .expect("emit succeeds");

        let captured = capturing.snapshot().await;
        assert_eq!(captured.len(), 1);

        let payload = captured[0].data.as_ref().expect("data");
        assert!(payload["hostReceivedAt"].is_string());
        let expected_strings = [
            ("probeSource", "host.fc_metrics"),
            ("cellId", "cell-1"),
            ("runId", "run-7"),
        ];
        for (key, expected) in expected_strings {
            assert_eq!(payload[key], expected);
        }
    }
}