Skip to main content

cellos_host_telemetry/probes/
mod.rs

1//! Host-side observability probes (Slot F1a / Path B).
2//!
3//! These are the **host-side** counterpart to the in-guest `cellos-telemetry`
4//! agent's Path A declarations. Where Path A asks the guest to *declare* what
5//! happened ("a process spawned, here's its `comm`"), Path B watches the cell
6//! from outside the guest using primitives the supervisor already controls:
7//!
8//! - the Firecracker VMM's `/metrics` endpoint
9//! - cgroup-v2 leaf files (`memory.events`, `cpu.stat`, `pids.events`)
10//! - per-cell nftables byte/packet counters
11//! - the host-side TAP link's carrier / flags state
12//!
13//! Each of these runs **on the host**, in the supervisor's address space, and
14//! produces a [`ProbeReading`] that gets dispatched into the supervisor's
15//! existing [`cellos_core::ports::EventSink`]. The guest cannot forge these
16//! readings — the data comes from kernel files / VMM endpoints the guest has
17//! no path to. That makes Path B the cross-witness for Path A: a guest
18//! `connect_attempted` declaration without a matching host-side
19//! `network_flow_decision` is the silence the projector flags.
20//!
21//! ## D12 — `probe_source` / `inputs` / `output` are mandatory
22//!
23//! Doctrine D12 ("every probe-emitted event is attributable to a probe and
24//! its concrete inputs/outputs") means a `ProbeReading` MUST carry:
25//!
26//! - `probe_source` — stable string identifying which probe produced this
27//!   reading (e.g. `"host.fc_metrics"`).
28//! - `inputs` — JSON object describing the concrete inputs the probe read
29//!   (file path, endpoint URL, table name, interface name…). This is what
30//!   makes the reading audit-replayable.
31//! - `output` — JSON object holding the structured fields the probe
32//!   extracted (counters, gauges, link state). Free-form per probe.
33//!
34//! Each per-probe module (`fc_metrics`, `cgroup`, `nftables`, `tap_link`)
35//! defines the concrete shape of its `inputs` and `output` blocks.
36//!
37//! ## Linux-only — non-Linux returns `ProbeError::PlatformUnsupported`
38//!
39//! Per the `d9f3b7a` precedent (gating Linux-only host surface so the
40//! workspace builds on Windows), the per-probe `read()` implementations are
41//! `#[cfg(target_os = "linux")]`. The non-Linux fallback is a single arm
42//! that returns [`ProbeError::PlatformUnsupported`]. Tests that depend on
43//! reading concrete kernel files only run under Linux; the rest run on every
44//! platform via the `CapturingSink` test surface.
45//!
46//! ## F1a/F1b boundary
47//!
48//! F1b (a sibling slot, not in this changeset) owns event-type registration
//! in `cellos-core::events`. Until F1b lands, this module defines local stub
//! functions in [`f1b_stub`] (e.g. `cell_observability_host_cgroup_v1_type`)
//! that return the canonical `dev.cellos.events.cell.observability.host.v1.<probe>`
//! CloudEvents type string; [`build_host_probe_envelope`] uses them when it
//! mints [`cellos_core::CloudEventV1`] envelopes. When F1b lands, the local
//! stubs collapse into thin wrappers around the `cellos-core` originals; the
//! per-probe modules' call sites do not change.
55
56use std::sync::Arc;
57use std::time::SystemTime;
58
59use async_trait::async_trait;
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use thiserror::Error;
63
64use cellos_core::error::CellosError;
65use cellos_core::ports::EventSink;
66use cellos_core::CloudEventV1;
67
68pub mod cgroup;
69pub mod fc_metrics;
70pub mod nftables;
71pub mod tap_link;
72
/// CloudEvents `source` attribute for host-probe-emitted events.
///
/// Stable identifier for the publisher; consumers use it to route Path B
/// readings without parsing the event type. Per the CloudEvents spec, it is
/// the pair `(source, id)` that must be unique per distinct event — not
/// `(source, type)`. [`build_host_probe_envelope`] pairs this source with a
/// freshly generated `urn:cellos:event:<uuid-v4>` id, so re-emitting the same
/// reading yields a new, distinct event rather than a deduplicated one.
pub const HOST_PROBE_EVENT_SOURCE: &str = "cellos-host-telemetry/probes";
79
/// Common prefix of every host-probe CloudEvents `type`.
///
/// A concrete event type is this prefix, a `.`, and the probe's short name
/// (for example `fc_metrics`), i.e. the [`ProbeReading::probe_source`] with
/// any leading `host.` removed.
pub const HOST_PROBE_EVENT_TYPE_PREFIX: &str = "dev.cellos.events.cell.observability.host.v1";
85
/// Per-cell context every host probe needs to attribute its readings.
///
/// Mirrors the host-stamped attribution fields ADR-0006 §6 declares
/// non-negotiable. `cell_id` and `run_id` come from the supervisor at cell
/// creation; `spec_signature_hash` is the canonical sha256 of the spec the
/// cell was admitted under (matches `events::canonical_spec_hash` shape).
///
/// Besides `Debug`/`Clone`, the type derives `PartialEq`, `Eq` and `Hash` so
/// contexts can be compared in assertions and used as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProbeContext {
    /// Per-run cell identifier.
    pub cell_id: String,
    /// Per-run identifier.
    pub run_id: String,
    /// `sha256:...` digest of the spec admitted for this run.
    pub spec_signature_hash: String,
}

impl ProbeContext {
    /// Build a context from any values convertible into owned `String`s.
    ///
    /// Convenience constructor for tests and call sites that hold `&str`s;
    /// each argument is converted with [`Into::into`] and stored as-is.
    pub fn new(
        cell_id: impl Into<String>,
        run_id: impl Into<String>,
        spec_signature_hash: impl Into<String>,
    ) -> Self {
        Self {
            cell_id: cell_id.into(),
            run_id: run_id.into(),
            spec_signature_hash: spec_signature_hash.into(),
        }
    }
}
116
117/// One reading produced by a [`HostProbe::read`] call.
118///
119/// The mandatory triple per D12: which probe, what concrete inputs it read,
120/// and the structured output extracted. `host_received_at` is stamped by
121/// [`emit_reading`] at dispatch time, not by the probe itself, so the
122/// ordering between "probe ran" and "reading hit the sink" stays observable.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124#[serde(rename_all = "camelCase")]
125pub struct ProbeReading {
126    /// Stable identifier for the probe (e.g. `"host.fc_metrics"`,
127    /// `"host.cgroup"`, `"host.nftables"`, `"host.tap_link"`). Matches
128    /// the CloudEvents type suffix after the canonical prefix is stripped.
129    pub probe_source: String,
130    /// Concrete inputs the probe consumed (file path, endpoint, etc.).
131    /// Audit-replayable: re-reading the same `inputs` should reproduce
132    /// the reading on a quiescent system.
133    pub inputs: Value,
134    /// Structured output extracted from the inputs. Free-form per probe.
135    pub output: Value,
136    /// Wallclock the supervisor stamped at dispatch (not at probe-fire).
137    /// `None` until [`emit_reading`] runs; `Some(_)` on every event the
138    /// sink receives.
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub host_received_at: Option<chrono::DateTime<chrono::Utc>>,
141}
142
143impl ProbeReading {
144    /// Construct a reading without a host-receive stamp; [`emit_reading`]
145    /// stamps it during dispatch.
146    pub fn new(probe_source: impl Into<String>, inputs: Value, output: Value) -> Self {
147        Self {
148            probe_source: probe_source.into(),
149            inputs,
150            output,
151            host_received_at: None,
152        }
153    }
154}
155
156/// Errors a host probe can surface.
157#[derive(Debug, Error)]
158pub enum ProbeError {
159    /// The probe's underlying primitive is not available on this host
160    /// platform (e.g. cgroup-v2 files outside Linux). Per `d9f3b7a`, the
161    /// non-Linux build path returns this rather than panicking.
162    #[error("host probe is not supported on this platform")]
163    PlatformUnsupported,
164
165    /// The probe could not read its underlying source (file, endpoint…).
166    #[error("host probe i/o error: {0}")]
167    Io(String),
168
169    /// The probe's output could not be parsed (e.g. malformed `cpu.stat`).
170    #[error("host probe parse error: {0}")]
171    Parse(String),
172
173    /// The downstream [`EventSink`] rejected the emitted event.
174    #[error("host probe sink error: {0}")]
175    Sink(String),
176}
177
178impl From<CellosError> for ProbeError {
179    fn from(err: CellosError) -> Self {
180        ProbeError::Sink(err.to_string())
181    }
182}
183
184/// A single host-side probe.
185///
186/// The trait is async so an implementation can fan a single `read` out to
187/// I/O without blocking (e.g. an HTTP scrape of Firecracker `/metrics`).
188/// The default supervisor wiring will hold a `Vec<Arc<dyn HostProbe>>` and
189/// poll each one on a `cellos.host_probe.tick`.
190#[async_trait]
191pub trait HostProbe: Send + Sync {
192    /// Stable short identifier; used as the CloudEvents type suffix.
193    /// Examples: `"fc_metrics"`, `"cgroup"`, `"nftables"`, `"tap_link"`.
194    fn probe_name(&self) -> &'static str;
195
196    /// Read the probe's underlying source and return one [`ProbeReading`].
197    ///
198    /// Probes are stateless from the trait's perspective; per-probe
199    /// concrete types may hold deltas (e.g. `cgroup.memory.events.oom`)
200    /// inside `&self` but `read` does not borrow mutably on purpose so a
201    /// single probe can be polled from multiple cells in parallel via
202    /// [`Arc`].
203    async fn read(&self, ctx: &ProbeContext) -> Result<ProbeReading, ProbeError>;
204}
205
206/// Stamp `host_received_at` on `reading`, build the CloudEvent envelope
207/// via the F1b-shaped constructors, and dispatch via the supervisor's
208/// existing [`EventSink`].
209///
210/// This is the function every probe call site uses — it is the single point
211/// where "a probe produced something" turns into "the sink saw an event",
212/// so the host-stamping discipline is non-bypassable.
213pub async fn emit_reading(
214    sink: &Arc<dyn EventSink>,
215    ctx: &ProbeContext,
216    mut reading: ProbeReading,
217) -> Result<(), ProbeError> {
218    let stamp = chrono::DateTime::<chrono::Utc>::from(SystemTime::now());
219    reading.host_received_at = Some(stamp);
220
221    let event = build_host_probe_envelope(ctx, &reading)?;
222    sink.emit(&event).await?;
223    Ok(())
224}
225
226/// Build a [`CloudEventV1`] envelope for a host-probe reading.
227///
228/// Stamps the non-negotiable attribution fields per ADR-0006 §6 into `data`,
229/// then defers to the per-probe F1b stub constructor for the event type
230/// suffix. Public so call-sites that need to inspect the envelope before
231/// emit (e.g. integration tests, `evidence_bundle` builders) can re-use the
232/// same shape the sink will see.
233pub fn build_host_probe_envelope(
234    ctx: &ProbeContext,
235    reading: &ProbeReading,
236) -> Result<CloudEventV1, ProbeError> {
237    let probe_short = probe_short_name(&reading.probe_source);
238    let event_type = match probe_short {
239        "fc_metrics" => f1b_stub::cell_observability_host_fc_metrics_v1_type(),
240        "cgroup" => f1b_stub::cell_observability_host_cgroup_v1_type(),
241        "nftables" => f1b_stub::cell_observability_host_nftables_v1_type(),
242        "tap_link" => f1b_stub::cell_observability_host_tap_v1_type(),
243        other => {
244            return Err(ProbeError::Parse(format!(
245                "unknown host probe short name: {other}"
246            )))
247        }
248    };
249
250    let stamp_rfc3339 = reading
251        .host_received_at
252        .map(|t| t.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
253        .unwrap_or_else(|| {
254            chrono::DateTime::<chrono::Utc>::from(SystemTime::now())
255                .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
256        });
257
258    let data = serde_json::json!({
259        "cellId": ctx.cell_id,
260        "runId": ctx.run_id,
261        "specSignatureHash": ctx.spec_signature_hash,
262        "hostReceivedAt": stamp_rfc3339.clone(),
263        "probeSource": reading.probe_source,
264        "inputs": reading.inputs,
265        "output": reading.output,
266    });
267
268    Ok(CloudEventV1 {
269        specversion: "1.0".into(),
270        id: format!("urn:cellos:event:{}", uuid::Uuid::new_v4()),
271        source: HOST_PROBE_EVENT_SOURCE.into(),
272        ty: event_type,
273        datacontenttype: Some("application/json".into()),
274        data: Some(data),
275        time: Some(stamp_rfc3339),
276        traceparent: None,
277    })
278}
279
/// Map a `probe_source` onto its canonical CloudEvents type suffix.
///
/// Per-probe modules report names like `"host.cgroup"`; the type suffix is
/// just `"cgroup"`. Only one leading `host.` is removed, and a name without
/// the prefix is returned unchanged, so fixtures may use either form.
fn probe_short_name(probe_source: &str) -> &str {
    match probe_source.strip_prefix("host.") {
        Some(short) => short,
        None => probe_source,
    }
}
287
288/// F1b-shaped stub constructors.
289///
290/// F1b owns the canonical event-type registration in `cellos-core::events`.
291/// Until that lands, these stubs return the canonical CloudEvents `type`
292/// strings so the per-probe modules can build envelopes without forking
293/// nomenclature. When F1b is merged, this module collapses into thin
294/// wrappers; nothing else changes.
295pub mod f1b_stub {
296    use super::HOST_PROBE_EVENT_TYPE_PREFIX;
297
298    /// Type for `dev.cellos.events.cell.observability.host.v1.fc_metrics`.
299    pub fn cell_observability_host_fc_metrics_v1_type() -> String {
300        format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.fc_metrics")
301    }
302
303    /// Type for `dev.cellos.events.cell.observability.host.v1.cgroup`.
304    pub fn cell_observability_host_cgroup_v1_type() -> String {
305        format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.cgroup")
306    }
307
308    /// Type for `dev.cellos.events.cell.observability.host.v1.nftables`.
309    pub fn cell_observability_host_nftables_v1_type() -> String {
310        format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.nftables")
311    }
312
313    /// Type for `dev.cellos.events.cell.observability.host.v1.tap_link`.
314    pub fn cell_observability_host_tap_v1_type() -> String {
315        format!("{HOST_PROBE_EVENT_TYPE_PREFIX}.tap_link")
316    }
317}
318
/// In-memory [`EventSink`] that records every [`CloudEventV1`] it receives.
///
/// Compiled only under `cfg(test)`, so production builds carry neither the
/// type nor its lock. The per-probe tests use it to check that each emitted
/// event carries `probeSource`, `inputs`, `output`, `cellId`, `runId` and
/// `hostReceivedAt`.
#[cfg(test)]
pub mod test_support {
    use std::sync::Arc;

    use async_trait::async_trait;
    use tokio::sync::Mutex;

    use cellos_core::error::CellosError;
    use cellos_core::ports::EventSink;
    use cellos_core::CloudEventV1;

    /// Sink that appends each emitted event to an internal list, in order.
    #[derive(Default)]
    pub struct CapturingSink {
        events: Mutex<Vec<CloudEventV1>>,
    }

    impl CapturingSink {
        /// Create a sink with nothing captured yet.
        pub fn new() -> Self {
            Self::default()
        }

        /// Move `self` into an `Arc<dyn EventSink>`.
        ///
        /// The sink is consumed, so keep an `Arc<CapturingSink>` instead if
        /// you still need [`CapturingSink::snapshot`] afterwards.
        pub fn as_arc(self) -> Arc<dyn EventSink> {
            Arc::new(self)
        }

        /// Return a copy of all captured events, in emission order.
        pub async fn snapshot(&self) -> Vec<CloudEventV1> {
            let captured = self.events.lock().await;
            captured.to_vec()
        }
    }

    #[async_trait]
    impl EventSink for CapturingSink {
        /// Record a clone of `event`; never fails.
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            let mut captured = self.events.lock().await;
            captured.push(event.clone());
            Ok(())
        }
    }
}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Attribution context shared by the envelope/emit tests.
    fn sample_ctx() -> ProbeContext {
        ProbeContext::new("cell-1", "run-7", "sha256:abcd")
    }

    #[test]
    fn probe_short_name_strips_host_prefix() {
        for (source, short) in [("host.fc_metrics", "fc_metrics"), ("fc_metrics", "fc_metrics")] {
            assert_eq!(probe_short_name(source), short);
        }
    }

    #[test]
    fn f1b_stub_types_match_adr0006_shape() {
        let cases = [
            (
                f1b_stub::cell_observability_host_fc_metrics_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.fc_metrics",
            ),
            (
                f1b_stub::cell_observability_host_cgroup_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.cgroup",
            ),
            (
                f1b_stub::cell_observability_host_nftables_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.nftables",
            ),
            (
                f1b_stub::cell_observability_host_tap_v1_type(),
                "dev.cellos.events.cell.observability.host.v1.tap_link",
            ),
        ];
        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn build_envelope_carries_attribution_fields() {
        let ctx = sample_ctx();
        let reading = ProbeReading {
            probe_source: "host.fc_metrics".into(),
            inputs: serde_json::json!({"endpoint": "http://localhost/metrics"}),
            output: serde_json::json!({"vmm": {"reads": 0}}),
            host_received_at: Some(chrono::Utc::now()),
        };

        let envelope = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        assert_eq!(envelope.specversion, "1.0");
        assert_eq!(envelope.source, HOST_PROBE_EVENT_SOURCE);
        assert!(envelope.ty.ends_with(".fc_metrics"));

        let payload = envelope.data.expect("data present");
        for (key, expected) in [
            ("cellId", "cell-1"),
            ("runId", "run-7"),
            ("specSignatureHash", "sha256:abcd"),
            ("probeSource", "host.fc_metrics"),
        ] {
            assert_eq!(payload[key], expected);
        }
        assert!(payload["inputs"].is_object());
        assert!(payload["output"].is_object());
        assert!(payload["hostReceivedAt"].is_string());
    }

    #[tokio::test]
    async fn emit_reading_stamps_host_received_at_and_dispatches() {
        // Keep a concrete `Arc<CapturingSink>` for `snapshot()` and hand a
        // type-erased clone of the same Arc to `emit_reading`.
        let capturing = Arc::new(test_support::CapturingSink::new());
        let sink: Arc<dyn EventSink> = capturing.clone();

        let reading = ProbeReading::new(
            "host.fc_metrics",
            serde_json::json!({"endpoint": "http://127.0.0.1:0/metrics"}),
            serde_json::json!({"vmm": {"reads": 1}}),
        );
        emit_reading(&sink, &sample_ctx(), reading)
            .await
            .expect("emit succeeds");

        let captured = capturing.snapshot().await;
        assert_eq!(captured.len(), 1);
        let payload = captured[0].data.as_ref().expect("data");
        assert!(payload["hostReceivedAt"].is_string());
        assert_eq!(payload["probeSource"], "host.fc_metrics");
        assert_eq!(payload["cellId"], "cell-1");
        assert_eq!(payload["runId"], "run-7");
    }
}