cellos-host-telemetry 0.5.0

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
//! Host-side observability probes (Slot F1a / Path B).
//!
//! These are the **host-side** counterpart to the in-guest `cellos-telemetry`
//! agent's Path A declarations. Where Path A asks the guest to *declare* what
//! happened ("a process spawned, here's its `comm`"), Path B watches the cell
//! from outside the guest using primitives the supervisor already controls:
//!
//! - the Firecracker VMM's `/metrics` endpoint
//! - cgroup-v2 leaf files (`memory.events`, `cpu.stat`, `pids.events`)
//! - per-cell nftables byte/packet counters
//! - the host-side TAP link's carrier / flags state
//!
//! Each of these runs **on the host**, in the supervisor's address space, and
//! produces a [`ProbeReading`] that gets dispatched into the supervisor's
//! existing [`cellos_core::ports::EventSink`]. The guest cannot forge these
//! readings — the data comes from kernel files / VMM endpoints the guest has
//! no path to. That makes Path B the cross-witness for Path A: a guest
//! `connect_attempted` declaration without a matching host-side
//! `network_flow_decision` is the silence the projector flags.
//!
//! ## D12 — `probe_source` / `inputs` / `output` are mandatory
//!
//! Doctrine D12 ("every probe-emitted event is attributable to a probe and
//! its concrete inputs/outputs") means a `ProbeReading` MUST carry:
//!
//! - `probe_source` — stable string identifying which probe produced this
//!   reading (e.g. `"host.fc_metrics"`).
//! - `inputs` — JSON object describing the concrete inputs the probe read
//!   (file path, endpoint URL, table name, interface name…). This is what
//!   makes the reading audit-replayable.
//! - `output` — JSON object holding the structured fields the probe
//!   extracted (counters, gauges, link state). Free-form per probe.
//!
//! Each per-probe module (`fc_metrics`, `cgroup`, `nftables`, `tap_link`)
//! defines the concrete shape of its `inputs` and `output` blocks.
//!
//! ## Linux-only — non-Linux returns `ProbeError::PlatformUnsupported`
//!
//! Per the `d9f3b7a` precedent (gating Linux-only host surface so the
//! workspace builds on Windows), the per-probe `read()` implementations are
//! `#[cfg(target_os = "linux")]`. The non-Linux fallback is a single arm
//! that returns [`ProbeError::PlatformUnsupported`]. Tests that depend on
//! reading concrete kernel files only run under Linux; the rest run on every
//! platform via the `CapturingSink` test surface.
//!
//! ## F1a/F1b boundary
//!
//! F1b (a sibling slot, not in this changeset) owns event-type registration
//! in `cellos-core::events`. Until F1b lands, this module defines local stub
//! constructors named `cell_observability_host_<probe>_v1` that mint
//! [`cellos_core::CloudEventV1`] envelopes with the canonical
//! `dev.cellos.events.cell.observability.host.v1.<probe>` event type. When
//! F1b lands, the local stubs collapse into thin wrappers around the
//! `cellos-core` originals; the per-probe modules' call sites do not change.

use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;

use cellos_core::error::CellosError;
use cellos_core::ports::EventSink;
use cellos_core::CloudEventV1;

pub mod cgroup;
pub mod fc_metrics;
pub mod nftables;
pub mod tap_link;

/// CloudEvents `source` attribute for host-probe-emitted events.
///
/// Stable identifier for the publisher; consumers use it to route Path B
/// readings without parsing the event type. Note that the CloudEvents
/// specification identifies a distinct event by the pair `(source, id)` —
/// not `(source, type)` — so de-duplication must key on this value together
/// with the envelope `id`; `type` only classifies the event.
pub const HOST_PROBE_EVENT_SOURCE: &str = "cellos-host-telemetry/probes";

/// CloudEvents type prefix for host-probe-emitted events.
///
/// The full type is `<prefix>.<probe>` where `<probe>` is the probe's short
/// name, i.e. [`ProbeReading::probe_source`] without its optional `host.`
/// prefix (e.g. `fc_metrics`). The prefix itself carries no trailing dot;
/// the separator is added when the full type string is built.
pub const HOST_PROBE_EVENT_TYPE_PREFIX: &str = "dev.cellos.events.cell.observability.host.v1";

/// Attribution context a host probe needs in order to tag its readings
/// with the cell run they belong to.
///
/// Mirrors the host-stamped attribution fields ADR-0006 §6 declares
/// non-negotiable. `cell_id` and `run_id` come from the supervisor at cell
/// creation; `spec_signature_hash` is the canonical sha256 of the spec the
/// cell was admitted under (matches `events::canonical_spec_hash` shape).
#[derive(Debug, Clone)]
pub struct ProbeContext {
    /// Per-run cell identifier.
    pub cell_id: String,
    /// Per-run identifier.
    pub run_id: String,
    /// `sha256:...` digest of the spec admitted for this run.
    pub spec_signature_hash: String,
}

impl ProbeContext {
    /// Build a context from anything convertible into an owned `String`
    /// (`&str`, `String`, …), so tests and call sites need not allocate by
    /// hand.
    pub fn new(
        cell_id: impl Into<String>,
        run_id: impl Into<String>,
        spec_signature_hash: impl Into<String>,
    ) -> Self {
        // Convert up front, then use field-init shorthand.
        let cell_id = cell_id.into();
        let run_id = run_id.into();
        let spec_signature_hash = spec_signature_hash.into();

        Self {
            cell_id,
            run_id,
            spec_signature_hash,
        }
    }
}

/// One reading produced by a [`HostProbe::read`] call.
///
/// The mandatory triple per D12: which probe, what concrete inputs it read,
/// and the structured output extracted. `host_received_at` is stamped by
/// [`emit_reading`] at dispatch time, not by the probe itself, so the
/// ordering between "probe ran" and "reading hit the sink" stays observable.
///
/// When this struct is (de)serialized directly, serde renames the fields to
/// camelCase (`probeSource`, `inputs`, `output`, `hostReceivedAt`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProbeReading {
    /// Stable identifier for the probe (e.g. `"host.fc_metrics"`,
    /// `"host.cgroup"`, `"host.nftables"`, `"host.tap_link"`).
    /// [`build_host_probe_envelope`] strips the optional `host.` prefix and
    /// uses the remainder to select the CloudEvents type suffix, so the
    /// short form (`"fc_metrics"`) is accepted as well.
    pub probe_source: String,
    /// Concrete inputs the probe consumed (file path, endpoint, etc.).
    /// Audit-replayable: re-reading the same `inputs` should reproduce
    /// the reading on a quiescent system.
    pub inputs: Value,
    /// Structured output extracted from the inputs. Free-form per probe.
    pub output: Value,
    /// Wallclock the supervisor stamped at dispatch (not at probe-fire).
    /// `None` until [`emit_reading`] runs; `Some(_)` on every event the
    /// sink receives. Left out of the serialized form while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub host_received_at: Option<chrono::DateTime<chrono::Utc>>,
}

impl ProbeReading {
    /// Create an unstamped reading from the probe identifier and its
    /// `inputs` / `output` JSON blocks.
    ///
    /// `host_received_at` starts out as `None`; [`emit_reading`] fills it in
    /// when the reading is dispatched.
    pub fn new(probe_source: impl Into<String>, inputs: Value, output: Value) -> Self {
        let probe_source = probe_source.into();

        Self {
            // Deliberately unset: the receive stamp belongs to dispatch.
            host_received_at: None,
            probe_source,
            inputs,
            output,
        }
    }
}

/// Errors a host probe can surface.
///
/// Every payload-carrying variant holds a plain `String` message rather than
/// the underlying error value.
#[derive(Debug, Error)]
pub enum ProbeError {
    /// The probe's underlying primitive is not available on this host
    /// platform (e.g. cgroup-v2 files outside Linux). Per `d9f3b7a`, the
    /// non-Linux build path returns this rather than panicking.
    #[error("host probe is not supported on this platform")]
    PlatformUnsupported,

    /// The probe could not read its underlying source (file, endpoint…).
    #[error("host probe i/o error: {0}")]
    Io(String),

    /// The probe's output could not be parsed (e.g. malformed `cpu.stat`).
    /// [`build_host_probe_envelope`] also returns this variant when a
    /// reading names a probe it does not recognise.
    #[error("host probe parse error: {0}")]
    Parse(String),

    /// The downstream [`EventSink`] rejected the emitted event. This is the
    /// variant the `From<CellosError>` conversion produces.
    #[error("host probe sink error: {0}")]
    Sink(String),
}

impl From<CellosError> for ProbeError {
    fn from(err: CellosError) -> Self {
        ProbeError::Sink(err.to_string())
    }
}

/// A single host-side probe.
///
/// The trait is async so an implementation can fan a single `read` out to
/// I/O without blocking (e.g. an HTTP scrape of Firecracker `/metrics`).
/// The default supervisor wiring will hold a `Vec<Arc<dyn HostProbe>>` and
/// poll each one on a `cellos.host_probe.tick`.
///
/// Implementors must be `Send + Sync` so a probe can be shared behind an
/// [`Arc`].
#[async_trait]
pub trait HostProbe: Send + Sync {
    /// Stable short identifier; used as the CloudEvents type suffix.
    /// Examples: `"fc_metrics"`, `"cgroup"`, `"nftables"`, `"tap_link"`.
    ///
    /// This is the short form, i.e. without the `host.` prefix shown in the
    /// [`ProbeReading::probe_source`] examples.
    fn probe_name(&self) -> &'static str;

    /// Read the probe's underlying source and return one [`ProbeReading`].
    ///
    /// `ctx` identifies the cell run the reading is taken for.
    ///
    /// Probes are stateless from the trait's perspective; per-probe
    /// concrete types may hold deltas (e.g. `cgroup.memory.events.oom`)
    /// inside `&self` but `read` does not borrow mutably on purpose so a
    /// single probe can be polled from multiple cells in parallel via
    /// [`Arc`].
    ///
    /// # Errors
    ///
    /// Returns a [`ProbeError`] when the reading cannot be produced; per the
    /// module docs, non-Linux builds are expected to return
    /// [`ProbeError::PlatformUnsupported`].
    async fn read(&self, ctx: &ProbeContext) -> Result<ProbeReading, ProbeError>;
}

/// Stamp `host_received_at` on `reading`, build the CloudEvent envelope
/// via the F1b-shaped constructors, and dispatch via the supervisor's
/// existing [`EventSink`].
///
/// This is the function every probe call site uses — it is the single point
/// where "a probe produced something" turns into "the sink saw an event",
/// so the host-stamping discipline is non-bypassable.
pub async fn emit_reading(
    sink: &Arc<dyn EventSink>,
    ctx: &ProbeContext,
    mut reading: ProbeReading,
) -> Result<(), ProbeError> {
    let stamp = chrono::DateTime::<chrono::Utc>::from(SystemTime::now());
    reading.host_received_at = Some(stamp);

    let event = build_host_probe_envelope(ctx, &reading)?;
    sink.emit(&event).await?;
    Ok(())
}

/// Wrap a host-probe reading in a [`CloudEventV1`] envelope.
///
/// The attribution fields ADR-0006 §6 declares non-negotiable (`cellId`,
/// `runId`, `specSignatureHash`, `hostReceivedAt`) are written into `data`
/// next to the reading's `probeSource`, `inputs` and `output`. The event
/// type comes from the per-probe F1b stub constructor selected by the
/// reading's short probe name. Public so call-sites that need to inspect
/// the envelope before emit (e.g. integration tests, `evidence_bundle`
/// builders) can re-use the same shape the sink will see.
///
/// If `reading.host_received_at` is `None`, the current wallclock is used
/// for the envelope instead; `reading` itself is not modified. The same
/// RFC 3339 string (millisecond precision, `Z` suffix) is used for both
/// `data.hostReceivedAt` and the envelope `time`.
///
/// # Errors
///
/// Returns [`ProbeError::Parse`] when the short probe name is not one of
/// `fc_metrics`, `cgroup`, `nftables` or `tap_link`.
pub fn build_host_probe_envelope(
    ctx: &ProbeContext,
    reading: &ProbeReading,
) -> Result<CloudEventV1, ProbeError> {
    // Resolve the event type first so an unrecognised probe fails before a
    // timestamp is taken or a payload is built.
    let ty = match probe_short_name(&reading.probe_source) {
        "fc_metrics" => f1b_stub::cell_observability_host_fc_metrics_v1_type(),
        "cgroup" => f1b_stub::cell_observability_host_cgroup_v1_type(),
        "nftables" => f1b_stub::cell_observability_host_nftables_v1_type(),
        "tap_link" => f1b_stub::cell_observability_host_tap_v1_type(),
        other => {
            let message = format!("unknown host probe short name: {other}");
            return Err(ProbeError::Parse(message));
        }
    };

    // Prefer the dispatch stamp; fall back to "now" for unstamped readings.
    let received_at = reading
        .host_received_at
        .unwrap_or_else(|| SystemTime::now().into());
    let time = received_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);

    let payload = serde_json::json!({
        "cellId": ctx.cell_id,
        "runId": ctx.run_id,
        "specSignatureHash": ctx.spec_signature_hash,
        "hostReceivedAt": time.as_str(),
        "probeSource": reading.probe_source,
        "inputs": reading.inputs,
        "output": reading.output,
    });

    let id = format!("urn:cellos:event:{}", uuid::Uuid::new_v4());

    Ok(CloudEventV1 {
        specversion: "1.0".into(),
        id,
        source: HOST_PROBE_EVENT_SOURCE.into(),
        ty,
        datacontenttype: Some("application/json".into()),
        data: Some(payload),
        time: Some(time),
        traceparent: None,
    })
}

/// Reduce a `probe_source` to the short name used as the CloudEvents type
/// suffix ("fc_metrics", "cgroup", "nftables", "tap_link").
///
/// A single leading `host.` is removed when present; any other input is
/// returned unchanged, so external test fixtures can pass either form.
fn probe_short_name(probe_source: &str) -> &str {
    match probe_source.strip_prefix("host.") {
        Some(short) => short,
        None => probe_source,
    }
}

/// Stand-ins for the F1b event-type constructors.
///
/// F1b owns the canonical event-type registration in `cellos-core::events`.
/// Until that lands, these stubs return the canonical CloudEvents `type`
/// strings so the per-probe modules can build envelopes without forking
/// nomenclature. When F1b is merged, this module collapses into thin
/// wrappers; nothing else changes.
pub mod f1b_stub {
    use super::HOST_PROBE_EVENT_TYPE_PREFIX;

    /// Join the canonical type prefix and `probe` with a single `.`.
    fn host_probe_type(probe: &str) -> String {
        [HOST_PROBE_EVENT_TYPE_PREFIX, probe].join(".")
    }

    /// Type for `dev.cellos.events.cell.observability.host.v1.fc_metrics`.
    pub fn cell_observability_host_fc_metrics_v1_type() -> String {
        host_probe_type("fc_metrics")
    }

    /// Type for `dev.cellos.events.cell.observability.host.v1.cgroup`.
    pub fn cell_observability_host_cgroup_v1_type() -> String {
        host_probe_type("cgroup")
    }

    /// Type for `dev.cellos.events.cell.observability.host.v1.nftables`.
    pub fn cell_observability_host_nftables_v1_type() -> String {
        host_probe_type("nftables")
    }

    /// Type for `dev.cellos.events.cell.observability.host.v1.tap_link`.
    pub fn cell_observability_host_tap_v1_type() -> String {
        host_probe_type("tap_link")
    }
}

/// Test-only [`EventSink`] that keeps every [`CloudEventV1`] it is handed.
///
/// Lives behind `cfg(test)` so production builds don't pay the lock cost.
/// Used by the per-probe tests to assert each emitted event carries
/// `probeSource`, `inputs`, `output`, `cellId`, `runId`, and `hostReceivedAt`.
#[cfg(test)]
pub mod test_support {
    use std::sync::Arc;

    use async_trait::async_trait;
    use tokio::sync::Mutex;

    use cellos_core::error::CellosError;
    use cellos_core::ports::EventSink;
    use cellos_core::CloudEventV1;

    /// Records emitted events, in order, behind an async mutex.
    #[derive(Default)]
    pub struct CapturingSink {
        events: Mutex<Vec<CloudEventV1>>,
    }

    impl CapturingSink {
        /// Create a sink with nothing recorded yet.
        pub fn new() -> Self {
            Self::default()
        }

        /// Consume the sink and return it as an `Arc<dyn EventSink>`.
        ///
        /// The concrete type is erased, so `snapshot` is no longer reachable
        /// through the returned handle.
        pub fn as_arc(self) -> Arc<dyn EventSink> {
            Arc::new(self)
        }

        /// Return a copy of all events recorded so far, oldest first.
        pub async fn snapshot(&self) -> Vec<CloudEventV1> {
            let recorded = self.events.lock().await;
            recorded.clone()
        }
    }

    #[async_trait]
    impl EventSink for CapturingSink {
        /// Append a clone of `event` to the record; never fails.
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            let mut recorded = self.events.lock().await;
            recorded.push(event.clone());
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Context shared by the envelope tests.
    fn sample_ctx() -> ProbeContext {
        ProbeContext::new("cell-1", "run-7", "sha256:abcd")
    }

    /// Both the `host.`-prefixed and the bare form map to the short name,
    /// and only one leading prefix is removed.
    #[test]
    fn probe_short_name_strips_host_prefix() {
        assert_eq!(probe_short_name("host.fc_metrics"), "fc_metrics");
        assert_eq!(probe_short_name("fc_metrics"), "fc_metrics");
        assert_eq!(probe_short_name("host.host.cgroup"), "host.cgroup");
    }

    /// The stub constructors return the canonical type strings verbatim.
    #[test]
    fn f1b_stub_types_match_adr0006_shape() {
        assert_eq!(
            f1b_stub::cell_observability_host_fc_metrics_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.fc_metrics"
        );
        assert_eq!(
            f1b_stub::cell_observability_host_cgroup_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.cgroup"
        );
        assert_eq!(
            f1b_stub::cell_observability_host_nftables_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.nftables"
        );
        assert_eq!(
            f1b_stub::cell_observability_host_tap_v1_type(),
            "dev.cellos.events.cell.observability.host.v1.tap_link"
        );
    }

    /// A stamped reading yields an envelope carrying every attribution field.
    #[test]
    fn build_envelope_carries_attribution_fields() {
        let ctx = sample_ctx();
        let reading = ProbeReading {
            probe_source: "host.fc_metrics".into(),
            inputs: serde_json::json!({"endpoint": "http://localhost/metrics"}),
            output: serde_json::json!({"vmm": {"reads": 0}}),
            host_received_at: Some(chrono::Utc::now()),
        };
        let env = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        assert_eq!(env.specversion, "1.0");
        assert_eq!(env.source, HOST_PROBE_EVENT_SOURCE);
        assert!(env.ty.ends_with(".fc_metrics"));
        let data = env.data.expect("data present");
        assert_eq!(data["cellId"], "cell-1");
        assert_eq!(data["runId"], "run-7");
        assert_eq!(data["specSignatureHash"], "sha256:abcd");
        assert_eq!(data["probeSource"], "host.fc_metrics");
        assert!(data["inputs"].is_object());
        assert!(data["output"].is_object());
        assert!(data["hostReceivedAt"].is_string());
    }

    /// The envelope `time` and `data.hostReceivedAt` are the same string.
    #[test]
    fn build_envelope_time_matches_host_received_at() {
        let ctx = sample_ctx();
        let reading = ProbeReading {
            probe_source: "host.nftables".into(),
            inputs: serde_json::json!({"table": "cellos"}),
            output: serde_json::json!({"bytes": 0}),
            host_received_at: Some(chrono::Utc::now()),
        };
        let env = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        let data = env.data.as_ref().expect("data present");
        assert!(env.time.is_some());
        assert_eq!(env.time.as_deref(), data["hostReceivedAt"].as_str());
    }

    /// An unstamped reading still gets a timestamp, and the bare short name
    /// (no `host.` prefix) is accepted.
    #[test]
    fn build_envelope_falls_back_to_now_when_unstamped() {
        let ctx = sample_ctx();
        let reading = ProbeReading::new(
            "cgroup",
            serde_json::json!({"path": "/sys/fs/cgroup/cell-1"}),
            serde_json::json!({"oom": 0}),
        );
        let env = build_host_probe_envelope(&ctx, &reading).expect("build envelope");
        assert!(env.ty.ends_with(".cgroup"));
        assert!(env.time.is_some());
        let data = env.data.expect("data present");
        assert!(data["hostReceivedAt"].is_string());
        assert_eq!(data["probeSource"], "cgroup");
    }

    /// A probe name outside the four known ones is rejected with
    /// `ProbeError::Parse` naming the offending short name.
    #[test]
    fn build_envelope_rejects_unknown_probe() {
        let ctx = sample_ctx();
        let reading = ProbeReading::new(
            "host.bogus",
            serde_json::json!({}),
            serde_json::json!({}),
        );
        match build_host_probe_envelope(&ctx, &reading) {
            Err(ProbeError::Parse(msg)) => {
                assert_eq!(msg, "unknown host probe short name: bogus");
            }
            Err(other) => panic!("expected ProbeError::Parse, got {other:?}"),
            Ok(_) => panic!("unknown probe must not produce an envelope"),
        }
    }

    /// `emit_reading` stamps the reading and delivers exactly one event.
    #[tokio::test]
    async fn emit_reading_stamps_host_received_at_and_dispatches() {
        // Hold the sink behind an `Arc<CapturingSink>` so the test can both
        // dispatch through it (as `Arc<dyn EventSink>`) and read its
        // `snapshot()` afterward.
        let sink = Arc::new(test_support::CapturingSink::new());
        let dyn_sink: Arc<dyn EventSink> = sink.clone();

        let ctx = sample_ctx();
        let reading = ProbeReading::new(
            "host.fc_metrics",
            serde_json::json!({"endpoint": "http://127.0.0.1:0/metrics"}),
            serde_json::json!({"vmm": {"reads": 1}}),
        );
        emit_reading(&dyn_sink, &ctx, reading)
            .await
            .expect("emit succeeds");

        let events = sink.snapshot().await;
        assert_eq!(events.len(), 1);
        let data = events[0].data.as_ref().expect("data");
        assert!(data["hostReceivedAt"].is_string());
        assert_eq!(data["probeSource"], "host.fc_metrics");
        assert_eq!(data["cellId"], "cell-1");
        assert_eq!(data["runId"], "run-7");
    }
}