cellos_host_telemetry/probes/mod.rs
1//! Host-side observability probes (Slot F1a / Path B).
2//!
3//! These are the **host-side** counterpart to the in-guest `cellos-telemetry`
4//! agent's Path A declarations. Where Path A asks the guest to *declare* what
5//! happened ("a process spawned, here's its `comm`"), Path B watches the cell
6//! from outside the guest using primitives the supervisor already controls:
7//!
8//! - the Firecracker VMM's `/metrics` endpoint
9//! - cgroup-v2 leaf files (`memory.events`, `cpu.stat`, `pids.events`)
10//! - per-cell nftables byte/packet counters
11//! - the host-side TAP link's carrier / flags state
12//!
13//! Each of these runs **on the host**, in the supervisor's address space, and
14//! produces a [`ProbeReading`] that gets dispatched into the supervisor's
15//! existing [`cellos_core::ports::EventSink`]. The guest cannot forge these
16//! readings — the data comes from kernel files / VMM endpoints the guest has
17//! no path to. That makes Path B the cross-witness for Path A: a guest
18//! `connect_attempted` declaration without a matching host-side
19//! `network_flow_decision` is the silence the projector flags.
20//!
21//! ## D12 — `probe_source` / `inputs` / `output` are mandatory
22//!
23//! Doctrine D12 ("every probe-emitted event is attributable to a probe and
24//! its concrete inputs/outputs") means a `ProbeReading` MUST carry:
25//!
//! - `probe_source` — stable string identifying which probe produced this
//!   reading (e.g. `"host.fc_metrics"`).
//! - `inputs` — JSON object describing the concrete inputs the probe read
//!   (file path, endpoint URL, table name, interface name…). This is what
//!   makes the reading audit-replayable.
//! - `output` — JSON object holding the structured fields the probe
//!   extracted (counters, gauges, link state). Free-form per probe.
33//!
34//! Each per-probe module (`fc_metrics`, `cgroup`, `nftables`, `tap_link`)
35//! defines the concrete shape of its `inputs` and `output` blocks.
36//!
37//! ## Linux-only — non-Linux returns `ProbeError::PlatformUnsupported`
38//!
39//! Per the `d9f3b7a` precedent (gating Linux-only host surface so the
40//! workspace builds on Windows), the per-probe `read()` implementations are
41//! `#[cfg(target_os = "linux")]`. The non-Linux fallback is a single arm
42//! that returns [`ProbeError::PlatformUnsupported`]. Tests that depend on
43//! reading concrete kernel files only run under Linux; the rest run on every
44//! platform via the `CapturingSink` test surface.
45//!
46//! ## F1a/F1b boundary
47//!
//! F1b (a sibling slot, not in this changeset) owns event-type registration
//! in `cellos-core::events`. Until F1b lands, the `f1b_stub` submodule
//! defines local stub functions named
//! `cell_observability_host_<probe>_v1_type` (the TAP stub abbreviates
//! `<probe>` to `tap`) that return the canonical
//! `dev.cellos.events.cell.observability.host.v1.<probe>` event-type string;
//! [`build_host_probe_envelope`] uses that string when it mints the
//! [`cellos_core::CloudEventV1`] envelope. When F1b lands, the local stubs
//! collapse into thin wrappers around the `cellos-core` originals; the
//! per-probe modules' call sites do not change.
55
56use std::sync::Arc;
57use std::time::SystemTime;
58
59use async_trait::async_trait;
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use thiserror::Error;
63
64use cellos_core::error::CellosError;
65use cellos_core::ports::EventSink;
66use cellos_core::CloudEventV1;
67
68pub mod cgroup;
69pub mod fc_metrics;
70pub mod nftables;
71pub mod tap_link;
72
/// CloudEvents `source` attribute stamped on every host-probe event.
///
/// Stable identifier for the publisher; consumers use it to route Path B
/// readings without parsing the event type. The pair `(source, type)`
/// identifies the *kind* of reading, not an individual event: under the
/// CloudEvents 1.0 spec the uniqueness / de-duplication key is
/// `(source, id)`, and every envelope built in this module carries a fresh
/// UUID-based `id`.
pub const HOST_PROBE_EVENT_SOURCE: &str = "cellos-host-telemetry/probes";
79
/// CloudEvents `type` prefix shared by all host-probe events.
///
/// The full type is `<prefix>.<probe>`, where `<probe>` is the short probe
/// name (e.g. `fc_metrics`) — i.e. [`ProbeReading::probe_source`] without
/// its optional `host.` prefix. No trailing dot is included here; the
/// `f1b_stub` functions add the separator.
pub const HOST_PROBE_EVENT_TYPE_PREFIX: &str = "dev.cellos.events.cell.observability.host.v1";
85
/// Per-cell context every host probe needs to attribute its readings.
///
/// Mirrors the host-stamped attribution fields ADR-0006 §6 declares
/// non-negotiable. `cell_id` and `run_id` come from the supervisor at cell
/// creation; `spec_signature_hash` is the canonical sha256 of the spec the
/// cell was admitted under (matches `events::canonical_spec_hash` shape).
///
/// All three fields are copied verbatim into the `data` payload of every
/// host-probe event (`cellId`, `runId`, `specSignatureHash`).
#[derive(Debug, Clone)]
pub struct ProbeContext {
    /// Per-run cell identifier.
    pub cell_id: String,
    /// Per-run identifier.
    pub run_id: String,
    /// `sha256:...` digest of the spec admitted for this run.
    pub spec_signature_hash: String,
}

impl ProbeContext {
    /// Build a context from anything convertible into an owned `String`.
    ///
    /// Convenience for tests and call-sites holding `&str` or `String`
    /// values; the arguments are stored as given, with no validation or
    /// normalisation.
    pub fn new(
        cell_id: impl Into<String>,
        run_id: impl Into<String>,
        spec_signature_hash: impl Into<String>,
    ) -> Self {
        Self {
            cell_id: cell_id.into(),
            run_id: run_id.into(),
            spec_signature_hash: spec_signature_hash.into(),
        }
    }
}
116
117/// One reading produced by a [`HostProbe::read`] call.
118///
119/// The mandatory triple per D12: which probe, what concrete inputs it read,
120/// and the structured output extracted. `host_received_at` is stamped by
121/// [`emit_reading`] at dispatch time, not by the probe itself, so the
122/// ordering between "probe ran" and "reading hit the sink" stays observable.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124#[serde(rename_all = "camelCase")]
125pub struct ProbeReading {
126 /// Stable identifier for the probe (e.g. `"host.fc_metrics"`,
127 /// `"host.cgroup"`, `"host.nftables"`, `"host.tap_link"`). Matches
128 /// the CloudEvents type suffix after the canonical prefix is stripped.
129 pub probe_source: String,
130 /// Concrete inputs the probe consumed (file path, endpoint, etc.).
131 /// Audit-replayable: re-reading the same `inputs` should reproduce
132 /// the reading on a quiescent system.
133 pub inputs: Value,
134 /// Structured output extracted from the inputs. Free-form per probe.
135 pub output: Value,
136 /// Wallclock the supervisor stamped at dispatch (not at probe-fire).
137 /// `None` until [`emit_reading`] runs; `Some(_)` on every event the
138 /// sink receives.
139 #[serde(skip_serializing_if = "Option::is_none")]
140 pub host_received_at: Option<chrono::DateTime<chrono::Utc>>,
141}
142
143impl ProbeReading {
144 /// Construct a reading without a host-receive stamp; [`emit_reading`]
145 /// stamps it during dispatch.
146 pub fn new(probe_source: impl Into<String>, inputs: Value, output: Value) -> Self {
147 Self {
148 probe_source: probe_source.into(),
149 inputs,
150 output,
151 host_received_at: None,
152 }
153 }
154}
155
156/// Errors a host probe can surface.
157#[derive(Debug, Error)]
158pub enum ProbeError {
159 /// The probe's underlying primitive is not available on this host
160 /// platform (e.g. cgroup-v2 files outside Linux). Per `d9f3b7a`, the
161 /// non-Linux build path returns this rather than panicking.
162 #[error("host probe is not supported on this platform")]
163 PlatformUnsupported,
164
165 /// The probe could not read its underlying source (file, endpoint…).
166 #[error("host probe i/o error: {0}")]
167 Io(String),
168
169 /// The probe's output could not be parsed (e.g. malformed `cpu.stat`).
170 #[error("host probe parse error: {0}")]
171 Parse(String),
172
173 /// The downstream [`EventSink`] rejected the emitted event.
174 #[error("host probe sink error: {0}")]
175 Sink(String),
176}
177
178impl From<CellosError> for ProbeError {
179 fn from(err: CellosError) -> Self {
180 ProbeError::Sink(err.to_string())
181 }
182}
183
184/// A single host-side probe.
185///
186/// The trait is async so an implementation can fan a single `read` out to
187/// I/O without blocking (e.g. an HTTP scrape of Firecracker `/metrics`).
188/// The default supervisor wiring will hold a `Vec<Arc<dyn HostProbe>>` and
189/// poll each one on a `cellos.host_probe.tick`.
190#[async_trait]
191pub trait HostProbe: Send + Sync {
192 /// Stable short identifier; used as the CloudEvents type suffix.
193 /// Examples: `"fc_metrics"`, `"cgroup"`, `"nftables"`, `"tap_link"`.
194 fn probe_name(&self) -> &'static str;
195
196 /// Read the probe's underlying source and return one [`ProbeReading`].
197 ///
198 /// Probes are stateless from the trait's perspective; per-probe
199 /// concrete types may hold deltas (e.g. `cgroup.memory.events.oom`)
200 /// inside `&self` but `read` does not borrow mutably on purpose so a
201 /// single probe can be polled from multiple cells in parallel via
202 /// [`Arc`].
203 async fn read(&self, ctx: &ProbeContext) -> Result<ProbeReading, ProbeError>;
204}
205
206/// Stamp `host_received_at` on `reading`, build the CloudEvent envelope
207/// via the F1b-shaped constructors, and dispatch via the supervisor's
208/// existing [`EventSink`].
209///
210/// This is the function every probe call site uses — it is the single point
211/// where "a probe produced something" turns into "the sink saw an event",
212/// so the host-stamping discipline is non-bypassable.
213pub async fn emit_reading(
214 sink: &Arc<dyn EventSink>,
215 ctx: &ProbeContext,
216 mut reading: ProbeReading,
217) -> Result<(), ProbeError> {
218 let stamp = chrono::DateTime::<chrono::Utc>::from(SystemTime::now());
219 reading.host_received_at = Some(stamp);
220
221 let event = build_host_probe_envelope(ctx, &reading)?;
222 sink.emit(&event).await?;
223 Ok(())
224}
225
226/// Build a [`CloudEventV1`] envelope for a host-probe reading.
227///
228/// Stamps the non-negotiable attribution fields per ADR-0006 §6 into `data`,
229/// then defers to the per-probe F1b stub constructor for the event type
230/// suffix. Public so call-sites that need to inspect the envelope before
231/// emit (e.g. integration tests, `evidence_bundle` builders) can re-use the
232/// same shape the sink will see.
233pub fn build_host_probe_envelope(
234 ctx: &ProbeContext,
235 reading: &ProbeReading,
236) -> Result<CloudEventV1, ProbeError> {
237 let probe_short = probe_short_name(&reading.probe_source);
238 let event_type = match probe_short {
239 "fc_metrics" => f1b_stub::cell_observability_host_fc_metrics_v1_type(),
240 "cgroup" => f1b_stub::cell_observability_host_cgroup_v1_type(),
241 "nftables" => f1b_stub::cell_observability_host_nftables_v1_type(),
242 "tap_link" => f1b_stub::cell_observability_host_tap_v1_type(),
243 other => {
244 return Err(ProbeError::Parse(format!(
245 "unknown host probe short name: {other}"
246 )))
247 }
248 };
249
250 let stamp_rfc3339 = reading
251 .host_received_at
252 .map(|t| t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
253 .unwrap_or_else(|| {
254 chrono::DateTime::<chrono::Utc>::from(SystemTime::now())
255 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
256 });
257
258 let data = serde_json::json!({
259 "cellId": ctx.cell_id,
260 "runId": ctx.run_id,
261 "specSignatureHash": ctx.spec_signature_hash,
262 "hostReceivedAt": stamp_rfc3339.clone(),
263 "probeSource": reading.probe_source,
264 "inputs": reading.inputs,
265 "output": reading.output,
266 });
267
268 Ok(CloudEventV1 {
269 specversion: "1.0".into(),
270 id: format!("urn:cellos:event:{}", uuid::Uuid::new_v4()),
271 source: HOST_PROBE_EVENT_SOURCE.into(),
272 ty: event_type,
273 datacontenttype: Some("application/json".into()),
274 data: Some(data),
275 time: Some(stamp_rfc3339),
276 traceparent: None,
277 })
278}
279
/// Strip the optional `host.` prefix the per-probe modules use for
/// `probe_source` so it lines up with the canonical CloudEvents type
/// suffix ("fc_metrics", "cgroup", "nftables", "tap_link"). Tolerates
/// either shape so external test fixtures can pass either form.
///
/// Only one leading `host.` is removed; input without that exact prefix is
/// returned unchanged. The result borrows from `probe_source`.
fn probe_short_name(probe_source: &str) -> &str {
    probe_source.strip_prefix("host.").unwrap_or(probe_source)
}
287
288/// F1b-shaped stub constructors.
289///
290/// F1b owns the canonical event-type registration in `cellos-core::events`.
291/// Until that lands, these stubs return the canonical CloudEvents `type`
292/// strings so the per-probe modules can build envelopes without forking
293/// nomenclature. When F1b is merged, this module collapses into thin
294/// wrappers; nothing else changes.
295pub mod f1b_stub {
296 use super::HOST_PROBE_EVENT_TYPE_PREFIX;
297
298 /// Type for `dev.cellos.events.cell.observability.host.v1.fc_metrics`.
299 pub fn cell_observability_host_fc_metrics_v1_type() -> String {
300 format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.fc_metrics")
301 }
302
303 /// Type for `dev.cellos.events.cell.observability.host.v1.cgroup`.
304 pub fn cell_observability_host_cgroup_v1_type() -> String {
305 format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.cgroup")
306 }
307
308 /// Type for `dev.cellos.events.cell.observability.host.v1.nftables`.
309 pub fn cell_observability_host_nftables_v1_type() -> String {
310 format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.nftables")
311 }
312
313 /// Type for `dev.cellos.events.cell.observability.host.v1.tap_link`.
314 pub fn cell_observability_host_tap_v1_type() -> String {
315 format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.tap_link")
316 }
317}
318
/// In-memory test sink that records every [`CloudEventV1`] it sees.
///
/// Lives behind `cfg(test)` so production builds don't pay the lock cost.
/// Used by the per-probe tests to assert each emitted event carries
/// `probeSource`, `inputs`, `output`, `cellId`, `runId`, and `hostReceivedAt`.
#[cfg(test)]
pub mod test_support {
    use std::sync::Arc;

    use async_trait::async_trait;
    use tokio::sync::Mutex;

    use cellos_core::error::CellosError;
    use cellos_core::ports::EventSink;
    use cellos_core::CloudEventV1;

    /// Captures every event emitted through it for per-phase assertions.
    ///
    /// Events are stored in a `Vec` behind an async mutex, in the order
    /// `emit` was called.
    #[derive(Default)]
    pub struct CapturingSink {
        events: Mutex<Vec<CloudEventV1>>,
    }

    impl CapturingSink {
        /// Construct an empty sink.
        pub fn new() -> Self {
            Self {
                events: Mutex::new(Vec::new()),
            }
        }

        /// Wrap `self` as an `Arc<dyn EventSink>` for ergonomic call sites.
        ///
        /// This consumes the sink and erases its concrete type, so
        /// `snapshot` is no longer reachable through the returned handle.
        /// Tests that need to read events back should hold an
        /// `Arc<CapturingSink>` and coerce a clone of it instead.
        pub fn as_arc(self) -> Arc<dyn EventSink> {
            Arc::new(self)
        }

        /// Snapshot every captured event in emission order.
        ///
        /// Returns a clone of the buffer; the sink keeps its events.
        pub async fn snapshot(&self) -> Vec<CloudEventV1> {
            self.events.lock().await.clone()
        }
    }

    #[async_trait]
    impl EventSink for CapturingSink {
        /// Record a clone of `event`; never fails.
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            self.events.lock().await.push(event.clone());
            Ok(())
        }
    }
}
368
#[cfg(test)]
mod tests {
    use super::*;

    // `host.`-prefixed and bare probe sources resolve to the same short name.
    #[test]
    fn probe_short_name_strips_host_prefix() {
        assert_eq!(probe_short_name("host.fc_metrics"), "fc_metrics");
        assert_eq!(probe_short_name("fc_metrics"), "fc_metrics");
    }

    // Pins the exact event-type strings so a prefix or suffix typo is caught.
    #[test]
    fn f1b_stub_types_match_adr0006_shape() {
        assert_eq!(
            f1b_stub::cell_observability_host_fc_metrics_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.fc_metrics"
        );
        assert_eq!(
            f1b_stub::cell_observability_host_cgroup_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.cgroup"
        );
        assert_eq!(
            f1b_stub::cell_observability_host_nftables_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.nftables"
        );
        assert_eq!(
            f1b_stub::cell_observability_host_tap_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.tap_link"
        );
    }

    // A pre-stamped reading yields an envelope carrying every attribution
    // field plus the D12 triple.
    #[test]
    fn build_envelope_carries_attribution_fields() {
        let ctx = ProbeContext::new("cell-1", "run-7", "sha256:abcd");
        let reading = ProbeReading {
            probe_source: "host.fc_metrics".into(),
            inputs: serde_json::json!({"endpoint": "http://localhost/metrics"}),
            output: serde_json::json!({"vmm": {"reads": 0}}),
            host_received_at: Some(chrono::Utc::now()),
        };
        let env = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        assert_eq!(env.specversion, "1.0");
        assert_eq!(env.source, HOST_PROBE_EVENT_SOURCE);
        assert!(env.ty.ends_with(".fc_metrics"));
        let data = env.data.expect("data present");
        assert_eq!(data["cellId"], "cell-1");
        assert_eq!(data["runId"], "run-7");
        assert_eq!(data["specSignatureHash"], "sha256:abcd");
        assert_eq!(data["probeSource"], "host.fc_metrics");
        assert!(data["inputs"].is_object());
        assert!(data["output"].is_object());
        assert!(data["hostReceivedAt"].is_string());
    }

    // An unstamped reading still gets a timestamp, and `data.hostReceivedAt`
    // matches the envelope's `time`.
    #[test]
    fn build_envelope_stamps_unstamped_reading() {
        let ctx = ProbeContext::new("cell-1", "run-7", "sha256:abcd");
        let reading = ProbeReading::new(
            "cgroup",
            serde_json::json!({"path": "memory.events"}),
            serde_json::json!({"oom": 0}),
        );
        let env = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        assert!(env.ty.ends_with(".cgroup"));
        let data = env.data.as_ref().expect("data present");
        assert!(data["hostReceivedAt"].is_string());
        assert_eq!(data["hostReceivedAt"].as_str(), env.time.as_deref());
    }

    // Probe sources outside the four known names are rejected, not guessed.
    #[test]
    fn build_envelope_rejects_unknown_probe() {
        let ctx = ProbeContext::new("cell-1", "run-7", "sha256:abcd");
        let reading = ProbeReading::new(
            "host.not_a_probe",
            serde_json::json!({}),
            serde_json::json!({}),
        );
        assert!(matches!(
            build_host_probe_envelope(&ctx, &reading),
            Err(ProbeError::Parse(_))
        ));
    }

    #[tokio::test]
    async fn emit_reading_stamps_host_received_at_and_dispatches() {
        // Hold the sink behind an `Arc<CapturingSink>` so the test can both
        // dispatch through it (as `Arc<dyn EventSink>`) and read its
        // `snapshot()` afterward.
        let sink = Arc::new(test_support::CapturingSink::new());
        let dyn_sink: Arc<dyn EventSink> = sink.clone();

        let ctx = ProbeContext::new("cell-1", "run-7", "sha256:abcd");
        let reading = ProbeReading::new(
            "host.fc_metrics",
            serde_json::json!({"endpoint": "http://127.0.0.1:0/metrics"}),
            serde_json::json!({"vmm": {"reads": 1}}),
        );
        emit_reading(&dyn_sink, &ctx, reading)
            .await
            .expect("emit succeeds");

        let events = sink.snapshot().await;
        assert_eq!(events.len(), 1);
        let data = events[0].data.as_ref().expect("data");
        assert!(data["hostReceivedAt"].is_string());
        assert_eq!(data["probeSource"], "host.fc_metrics");
        assert_eq!(data["cellId"], "cell-1");
        assert_eq!(data["runId"], "run-7");
    }
}