cellos-telemetry 0.5.1

In-guest telemetry agent for CellOS — runs as PID 2 inside Firecracker microVMs, emits CBOR-over-vsock observations. No signing key by design (ADR-0006).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
//! In-guest telemetry agent for the CellOS runner-evidence wedge (Phase F).
//!
//! Phase F3a — implementation. ADR-0006 is the doctrine reference.
//!
//! Operating model (ADR-0006 §5):
//! - This agent runs as PID 2, forked by `cellos-init` BEFORE the workload
//!   process (PID 3+) starts.
//! - Workload seccomp profile blocks `kill(2)`, `tgkill(2)`, and `ptrace(2)`
//!   against PIDs ≤ 2; the agent is structurally unreachable from the
//!   workload. (TODO seccomp gate — config lives in `cellos-host-firecracker`.)
//! - The agent holds NO private key. Authenticity comes from WHICH vsock
//!   CID:port its frames arrive on — the host bound the channel before the
//!   workload existed.
//! - The agent fills only `probe_source`, `guest_pid`, `guest_comm`,
//!   `guest_monotonic_ns`, and the leading `content_version`. The supervisor
//!   host-stamps `cell_id`, `run_id`, `host_received_at`, `spec_signature_hash`,
//!   and the ADG `output` block on receipt; anything the agent puts in those
//!   fields is overwritten.
//!
//! Wire shape (ADR-0006 §12):
//!
//! ```text
//! u32 LE length || CBOR map(5) {
//!     "content_version"     => u16  (FIRST — host short-circuits unknown major)
//!     "probe_source"        => text
//!     "guest_pid"           => u32
//!     "guest_comm"          => text
//!     "guest_monotonic_ns"  => u64
//! }
//! ```
//!
//! The CBOR encoder/decoder is hand-rolled and *minimal*: definite-length
//! map(5), uint major (0), text major (3), no floats, no tags, no indefinite
//! lengths. `content_version` is always emitted first so the host can reject
//! unknown majors before walking unknown probe-source strings.
//!
//! Probes the agent declares (Phase F3):
//! - `process.spawned`, `process.exited` (`/proc` delta walker)
//! - `capability.denied` (stub — kernel surface not yet wired)
//! - declared `inotify` watch (one)
//! - `net.connect_attempted` (stub)
//!
//! Back-pressure (ADR-0006 §5.3): drop-with-counter. The agent surfaces
//! drops via `cell.observability.guest.telemetry.dropped`, never by
//! blocking the workload.
//!
//! See [docs/adr/0006-in-vm-observability-runner-evidence.md] for the
//! complete decision record.

// Crate-wide policy: CBOR + framing core is pure-safe Rust. Syscall surfaces
// (probes/* and the binary entrypoint) opt out per-module with
// `#[allow(unsafe_code)]` because libc::inotify_init1/socket/connect/fork
// have no safe wrapper at this layer.
#![deny(unsafe_code)]
#![warn(missing_docs)]

/// CBOR wire-format major version. Must match
/// `cellos_host_telemetry::WIRE_CONTENT_VERSION_MAJOR` or the host rejects
/// the frame (ADR-0006 §12 wire-schema versioning).
pub const WIRE_CONTENT_VERSION_MAJOR: u16 = 1;

/// Vsock port the guest agent connects to on the host.
/// Must match `cellos_host_telemetry::VSOCK_TELEMETRY_PORT`.
pub const VSOCK_TELEMETRY_PORT: u32 = 9001;

/// Vsock CID for the host (`VMADDR_CID_HOST`). Firecracker maps guest-initiated
/// connect(CID=2) to the host listener.
pub const VMADDR_CID_HOST: u32 = 2;

/// Maximum frame body size (CBOR map payload) the agent ever emits.
/// 4 KiB is generous for our 5-field map; the host enforces the same bound.
pub const MAX_FRAME_BODY_BYTES: usize = 4096;

/// Probe source identifiers the agent emits.
///
/// String constants rather than an enum so the host's CBOR decoder can match
/// without coupling to this crate's Rust types — Path A is intentionally a
/// data wire, not a Rust API.
// TODO cfg-gate when F1a lands: future probes belong in #[cfg(target_os = "linux")] pub mod probes;
pub mod probe_source {
    /// Process spawn observed via guest-side `/proc` delta walk.
    pub const PROCESS_SPAWNED: &str = "process.spawned";
    /// Process exit observed via guest-side `/proc` delta walk.
    pub const PROCESS_EXITED: &str = "process.exited";
    /// `capability.denied` event from the kernel via guest probe.
    pub const CAPABILITY_DENIED: &str = "capability.denied";
    /// `inotify` watch fired on a declared FS path.
    pub const FS_INOTIFY_FIRED: &str = "fs.inotify_fired";
    /// `connect(2)` attempt observed pre-syscall.
    pub const NET_CONNECT_ATTEMPTED: &str = "net.connect_attempted";

    /// All known probe sources. Used by the decoder to reject unknown sources
    /// without bringing the channel down (drop-with-counter semantics).
    pub const ALL: &[&str] = &[
        PROCESS_SPAWNED,
        PROCESS_EXITED,
        CAPABILITY_DENIED,
        FS_INOTIFY_FIRED,
        NET_CONNECT_ATTEMPTED,
    ];

    /// True iff `s` matches one of the declared probe sources.
    pub fn is_known(s: &str) -> bool {
        ALL.contains(&s)
    }
}

pub mod probes;

/// Guest-side telemetry declaration — what the in-VM agent claims it will
/// exercise. The host validates `declared ⊆ authorized` before accepting
/// these events (ADR-0006 §11 admission validation, Doctrine #11).
///
/// Added 2026-05-16 as F3 prep alongside ADR-0006 acceptance. This is the
/// type the F3-next-tranche admission path will project from the spec's
/// `telemetry` block.
#[derive(Debug, Clone)]
pub struct GuestTelemetryDeclaration {
    /// The set of capabilities the workload claims it will exercise.
    /// Subset-checked against the cell's authorized surface at admission.
    pub declared_surface: cellos_core::authority::DeclaredAuthoritySurface,
    /// Version string of the in-guest telemetry agent emitting this
    /// declaration. Used for compatibility gating at the host.
    pub agent_version: String,
}

/// One probe-firing the agent emits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeEvent {
    /// One of the [`probe_source`] constants.
    pub probe_source: &'static str,
    /// Guest-side process id (`getpid()` or as observed).
    pub guest_pid: u32,
    /// Guest-side process command name (truncated to 16 chars to match Linux `comm`).
    pub guest_comm: String,
    /// Guest-side monotonic timestamp at probe-fire.
    pub guest_monotonic_ns: u64,
}

// ---------------------------------------------------------------------------
// Hand-rolled minimal CBOR encoder/decoder.
//
// Subset only:
//   - major 0 (uint, 1/2/4/8-byte argument)
//   - major 3 (text string, 1/2/4-byte length argument)
//   - major 5 (map, definite-length only — we only ever emit map(5))
//
// No floats, no tags, no indefinite lengths, no negative ints, no arrays, no
// byte strings. If we ever add fields, the encoder grows; the decoder rejects
// anything outside this subset with `WireError::Unsupported*`.
// ---------------------------------------------------------------------------

/// Errors surfaced by the wire encoder/decoder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireError {
    /// Frame body exceeded [`MAX_FRAME_BODY_BYTES`].
    FrameTooLarge {
        /// Length of the offending frame body, in bytes.
        len: usize,
    },
    /// Frame body shorter than the declared length prefix.
    FrameTruncated {
        /// Length declared by the 4-byte big-endian length prefix.
        declared: u32,
        /// Actual number of body bytes available after the header.
        actual: usize,
    },
    /// Length-prefix header itself is shorter than 4 bytes.
    ShortHeader {
        /// Number of header bytes actually received.
        got: usize,
    },
    /// CBOR ran past end of buffer.
    UnexpectedEof,
    /// CBOR major type is not in the supported subset (0, 3, 5).
    UnsupportedMajor {
        /// Offending CBOR major type encountered in the buffer.
        major: u8,
    },
    /// CBOR additional info encodes indefinite-length or > 8-byte length.
    UnsupportedAdditional {
        /// Offending CBOR additional-info nibble (0..=31).
        additional: u8,
    },
    /// Outer item is not a 5-entry definite map.
    NotMap5,
    /// First map key is not the literal text `"content_version"` —
    /// host short-circuits unknown majors before walking the rest.
    ContentVersionMustBeFirst,
    /// `content_version` major is not [`WIRE_CONTENT_VERSION_MAJOR`].
    UnsupportedContentVersion(u16),
    /// Required field missing or duplicated.
    MissingField(&'static str),
    /// Field present but wrong CBOR major.
    FieldType {
        /// Name of the field whose CBOR major did not match the schema.
        field: &'static str,
        /// CBOR major type actually observed for that field.
        got_major: u8,
    },
    /// Integer field overflowed its declared width.
    IntegerOverflow {
        /// Name of the integer field that exceeded its declared width.
        field: &'static str,
    },
    /// Probe-source string not in [`probe_source::ALL`].
    UnknownProbeSource(String),
    /// Text field is not valid UTF-8.
    InvalidUtf8 {
        /// Name of the text field that failed UTF-8 validation.
        field: &'static str,
    },
}

impl core::fmt::Display for WireError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::FrameTooLarge { len } => write!(f, "frame too large: {len} bytes"),
            Self::FrameTruncated { declared, actual } => {
                write!(f, "frame truncated: declared {declared}, got {actual}")
            }
            Self::ShortHeader { got } => write!(f, "short header: {got} bytes"),
            Self::UnexpectedEof => write!(f, "unexpected end of CBOR buffer"),
            Self::UnsupportedMajor { major } => write!(f, "unsupported CBOR major {major}"),
            Self::UnsupportedAdditional { additional } => {
                write!(f, "unsupported CBOR additional {additional}")
            }
            Self::NotMap5 => write!(f, "outer CBOR item is not map(5)"),
            Self::ContentVersionMustBeFirst => {
                write!(f, "content_version must be the first map key")
            }
            Self::UnsupportedContentVersion(v) => {
                write!(f, "unsupported content_version major {v}")
            }
            Self::MissingField(n) => write!(f, "missing or duplicate field {n}"),
            Self::FieldType { field, got_major } => {
                write!(f, "field {field}: wrong major {got_major}")
            }
            Self::IntegerOverflow { field } => write!(f, "integer overflow in field {field}"),
            Self::UnknownProbeSource(s) => write!(f, "unknown probe_source {s:?}"),
            Self::InvalidUtf8 { field } => write!(f, "invalid utf8 in field {field}"),
        }
    }
}

impl std::error::Error for WireError {}

// ---- Encoder ---------------------------------------------------------------

/// Push the type byte (`major << 5 | additional`) and big-endian argument
/// for a CBOR uint. We always pick the smallest legal encoding; that is the
/// only encoding the decoder accepts as canonical-equivalent.
fn push_uint(out: &mut Vec<u8>, major: u8, value: u64) {
    debug_assert!(major <= 7);
    let m = major << 5;
    if value < 24 {
        out.push(m | (value as u8));
    } else if value <= u8::MAX as u64 {
        out.push(m | 24);
        out.push(value as u8);
    } else if value <= u16::MAX as u64 {
        out.push(m | 25);
        out.extend_from_slice(&(value as u16).to_be_bytes());
    } else if value <= u32::MAX as u64 {
        out.push(m | 26);
        out.extend_from_slice(&(value as u32).to_be_bytes());
    } else {
        out.push(m | 27);
        out.extend_from_slice(&value.to_be_bytes());
    }
}

fn push_text(out: &mut Vec<u8>, s: &str) {
    push_uint(out, 3, s.len() as u64);
    out.extend_from_slice(s.as_bytes());
}

/// CBOR-encode a [`ProbeEvent`] body (no length prefix).
///
/// `content_version` is emitted FIRST so the host can short-circuit unknown
/// majors before walking unknown probe-source strings (ADR-0006 §12).
pub fn encode_event_body(ev: &ProbeEvent) -> Vec<u8> {
    let mut out = Vec::with_capacity(128);

    // map(5) — definite-length, 5 entries.
    out.push((5u8 << 5) | 5);

    // 1. "content_version" => uint
    push_text(&mut out, "content_version");
    push_uint(&mut out, 0, WIRE_CONTENT_VERSION_MAJOR as u64);

    // 2. "probe_source" => text
    push_text(&mut out, "probe_source");
    push_text(&mut out, ev.probe_source);

    // 3. "guest_pid" => uint
    push_text(&mut out, "guest_pid");
    push_uint(&mut out, 0, ev.guest_pid as u64);

    // 4. "guest_comm" => text
    push_text(&mut out, "guest_comm");
    push_text(&mut out, &ev.guest_comm);

    // 5. "guest_monotonic_ns" => uint
    push_text(&mut out, "guest_monotonic_ns");
    push_uint(&mut out, 0, ev.guest_monotonic_ns);

    out
}

/// CBOR-encode a [`ProbeEvent`] and prepend the 4-byte LE frame-length
/// header.
///
/// Returns [`WireError::FrameTooLarge`] if the body exceeds
/// [`MAX_FRAME_BODY_BYTES`] — that is a budget the host enforces too, so
/// failing locally avoids putting an unreceivable frame on the wire.
pub fn encode_frame(ev: &ProbeEvent) -> Result<Vec<u8>, WireError> {
    let body = encode_event_body(ev);
    if body.len() > MAX_FRAME_BODY_BYTES {
        return Err(WireError::FrameTooLarge { len: body.len() });
    }
    let mut frame = Vec::with_capacity(4 + body.len());
    frame.extend_from_slice(&(body.len() as u32).to_le_bytes());
    frame.extend_from_slice(&body);
    Ok(frame)
}

// ---- Decoder ---------------------------------------------------------------

/// Cursor over a CBOR byte slice. Pure-safe; bounds-checked on every read.
struct Cursor<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> Cursor<'a> {
    fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    fn take(&mut self, n: usize) -> Result<&'a [u8], WireError> {
        if self.pos + n > self.buf.len() {
            return Err(WireError::UnexpectedEof);
        }
        let slice = &self.buf[self.pos..self.pos + n];
        self.pos += n;
        Ok(slice)
    }

    fn read_u8(&mut self) -> Result<u8, WireError> {
        Ok(self.take(1)?[0])
    }

    /// Decode the (major, argument) pair for the next CBOR item.
    /// Refuses indefinite-length (additional=31) and unsupported additional bits.
    fn read_header(&mut self) -> Result<(u8, u64), WireError> {
        let b = self.read_u8()?;
        let major = b >> 5;
        let additional = b & 0x1f;
        let arg = match additional {
            0..=23 => additional as u64,
            24 => self.read_u8()? as u64,
            25 => {
                let bs = self.take(2)?;
                u16::from_be_bytes([bs[0], bs[1]]) as u64
            }
            26 => {
                let bs = self.take(4)?;
                u32::from_be_bytes([bs[0], bs[1], bs[2], bs[3]]) as u64
            }
            27 => {
                let bs = self.take(8)?;
                u64::from_be_bytes([bs[0], bs[1], bs[2], bs[3], bs[4], bs[5], bs[6], bs[7]])
            }
            other => return Err(WireError::UnsupportedAdditional { additional: other }),
        };
        Ok((major, arg))
    }

    fn read_text(&mut self, field: &'static str) -> Result<String, WireError> {
        let (major, len) = self.read_header()?;
        if major != 3 {
            return Err(WireError::FieldType {
                field,
                got_major: major,
            });
        }
        let bytes = self.take(len as usize)?;
        std::str::from_utf8(bytes)
            .map(|s| s.to_owned())
            .map_err(|_| WireError::InvalidUtf8 { field })
    }

    fn read_uint(&mut self, field: &'static str) -> Result<u64, WireError> {
        let (major, val) = self.read_header()?;
        if major != 0 {
            return Err(WireError::FieldType {
                field,
                got_major: major,
            });
        }
        Ok(val)
    }
}

/// Decode a frame body (no length prefix) into a [`ProbeEvent`].
///
/// Validation order:
/// 1. Outer must be `map(5)`.
/// 2. First key must be the literal text `"content_version"` and its value
///    must equal [`WIRE_CONTENT_VERSION_MAJOR`]. This short-circuits unknown
///    majors before walking unknown probe-source strings.
/// 3. Remaining four keys may appear in any order; each must appear exactly
///    once.
/// 4. `probe_source` must be in [`probe_source::ALL`].
pub fn decode_event_body(body: &[u8]) -> Result<ProbeEvent, WireError> {
    let mut cur = Cursor::new(body);

    // Outer map(5).
    let (major, len) = cur.read_header()?;
    if major != 5 || len != 5 {
        return Err(WireError::NotMap5);
    }

    // First key MUST be content_version. Reject before doing any other work
    // so an unknown major can't trigger probe-source validation, etc.
    let first_key = cur.read_text("<map key 0>")?;
    if first_key != "content_version" {
        return Err(WireError::ContentVersionMustBeFirst);
    }
    let cv = cur.read_uint("content_version")?;
    if cv > u16::MAX as u64 {
        return Err(WireError::IntegerOverflow {
            field: "content_version",
        });
    }
    if cv as u16 != WIRE_CONTENT_VERSION_MAJOR {
        return Err(WireError::UnsupportedContentVersion(cv as u16));
    }

    // Remaining 4 keys, any order.
    let mut probe_source: Option<String> = None;
    let mut guest_pid: Option<u32> = None;
    let mut guest_comm: Option<String> = None;
    let mut guest_monotonic_ns: Option<u64> = None;

    for _ in 0..4 {
        let key = cur.read_text("<map key>")?;
        match key.as_str() {
            "probe_source" => {
                if probe_source.is_some() {
                    return Err(WireError::MissingField("probe_source"));
                }
                probe_source = Some(cur.read_text("probe_source")?);
            }
            "guest_pid" => {
                if guest_pid.is_some() {
                    return Err(WireError::MissingField("guest_pid"));
                }
                let v = cur.read_uint("guest_pid")?;
                if v > u32::MAX as u64 {
                    return Err(WireError::IntegerOverflow { field: "guest_pid" });
                }
                guest_pid = Some(v as u32);
            }
            "guest_comm" => {
                if guest_comm.is_some() {
                    return Err(WireError::MissingField("guest_comm"));
                }
                guest_comm = Some(cur.read_text("guest_comm")?);
            }
            "guest_monotonic_ns" => {
                if guest_monotonic_ns.is_some() {
                    return Err(WireError::MissingField("guest_monotonic_ns"));
                }
                guest_monotonic_ns = Some(cur.read_uint("guest_monotonic_ns")?);
            }
            _ => {
                // Unknown key — reject. Forwarding it would leak attribution
                // shape into the host stamp. We map all unknown keys to a
                // single static label so the decoder allocates nothing for
                // adversarial input.
                return Err(WireError::MissingField("<unknown key>"));
            }
        }
    }

    let ps_owned = probe_source.ok_or(WireError::MissingField("probe_source"))?;
    if !probe_source::is_known(&ps_owned) {
        return Err(WireError::UnknownProbeSource(ps_owned));
    }
    // Map back to the &'static str so downstream code keeps the no-alloc invariant.
    let ps_static: &'static str = probe_source::ALL
        .iter()
        .copied()
        .find(|k| *k == ps_owned)
        .expect("is_known just verified");

    Ok(ProbeEvent {
        probe_source: ps_static,
        guest_pid: guest_pid.ok_or(WireError::MissingField("guest_pid"))?,
        guest_comm: guest_comm.ok_or(WireError::MissingField("guest_comm"))?,
        guest_monotonic_ns: guest_monotonic_ns
            .ok_or(WireError::MissingField("guest_monotonic_ns"))?,
    })
}

/// Decode a length-prefixed frame: `u32 LE length || CBOR body`.
///
/// Returns [`WireError::ShortHeader`] if `frame.len() < 4` and
/// [`WireError::FrameTruncated`] if the declared length exceeds the bytes
/// actually present.
pub fn decode_frame(frame: &[u8]) -> Result<ProbeEvent, WireError> {
    if frame.len() < 4 {
        return Err(WireError::ShortHeader { got: frame.len() });
    }
    let declared = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]);
    if declared as usize > MAX_FRAME_BODY_BYTES {
        return Err(WireError::FrameTooLarge {
            len: declared as usize,
        });
    }
    let body_present = frame.len() - 4;
    if (declared as usize) > body_present {
        return Err(WireError::FrameTruncated {
            declared,
            actual: body_present,
        });
    }
    decode_event_body(&frame[4..4 + declared as usize])
}

#[cfg(test)]
mod tests {
    use super::*;

    fn sample() -> ProbeEvent {
        ProbeEvent {
            probe_source: probe_source::PROCESS_SPAWNED,
            guest_pid: 4242,
            guest_comm: "workload".to_owned(),
            guest_monotonic_ns: 123_456_789_012,
        }
    }

    #[test]
    fn wire_versions_match() {
        assert_eq!(WIRE_CONTENT_VERSION_MAJOR, 1);
        assert_eq!(VSOCK_TELEMETRY_PORT, 9001);
        assert_eq!(VMADDR_CID_HOST, 2);
    }

    #[test]
    fn probe_source_constants_are_stable_strings() {
        assert_eq!(probe_source::PROCESS_SPAWNED, "process.spawned");
        assert_eq!(probe_source::CAPABILITY_DENIED, "capability.denied");
        assert!(probe_source::is_known("process.spawned"));
        assert!(!probe_source::is_known("rogue.event"));
    }

    #[test]
    fn round_trip_encode_decode() {
        let ev = sample();
        let frame = encode_frame(&ev).expect("encode");
        let back = decode_frame(&frame).expect("decode");
        assert_eq!(back, ev);
    }

    #[test]
    fn round_trip_each_probe_source() {
        for &ps in probe_source::ALL {
            let ev = ProbeEvent {
                probe_source: ps,
                guest_pid: 7,
                guest_comm: "p".to_owned(),
                guest_monotonic_ns: 1,
            };
            let frame = encode_frame(&ev).unwrap();
            let back = decode_frame(&frame).unwrap();
            assert_eq!(back, ev, "round-trip failed for {ps}");
        }
    }

    #[test]
    fn content_version_first_short_circuits_unknown_major() {
        // Build a CBOR map(5) where the FIRST key is content_version but
        // the value is an unsupported major, and a LATER key is an
        // unknown probe source. The decoder MUST return
        // UnsupportedContentVersion before reaching the probe-source check.
        let mut body = Vec::new();
        body.push((5u8 << 5) | 5); // map(5)
        push_text(&mut body, "content_version");
        push_uint(&mut body, 0, 999); // bogus major
        push_text(&mut body, "probe_source");
        push_text(&mut body, "rogue.event"); // would also fail later
        push_text(&mut body, "guest_pid");
        push_uint(&mut body, 0, 1);
        push_text(&mut body, "guest_comm");
        push_text(&mut body, "x");
        push_text(&mut body, "guest_monotonic_ns");
        push_uint(&mut body, 0, 1);

        match decode_event_body(&body) {
            Err(WireError::UnsupportedContentVersion(999)) => {}
            other => panic!(
                "expected UnsupportedContentVersion(999), got {other:?} \
                 (host MUST short-circuit before probe-source validation)"
            ),
        }
    }

    #[test]
    fn content_version_must_be_first_key() {
        // First key is something else — even though all other fields are
        // present and valid. This is the structural guarantee that lets the
        // host reject without indexing the rest of the map.
        let mut body = Vec::new();
        body.push((5u8 << 5) | 5);
        push_text(&mut body, "guest_pid");
        push_uint(&mut body, 0, 1);
        push_text(&mut body, "content_version");
        push_uint(&mut body, 0, WIRE_CONTENT_VERSION_MAJOR as u64);
        push_text(&mut body, "probe_source");
        push_text(&mut body, probe_source::PROCESS_SPAWNED);
        push_text(&mut body, "guest_comm");
        push_text(&mut body, "x");
        push_text(&mut body, "guest_monotonic_ns");
        push_uint(&mut body, 0, 1);

        assert_eq!(
            decode_event_body(&body),
            Err(WireError::ContentVersionMustBeFirst)
        );
    }

    #[test]
    fn unknown_probe_source_rejected() {
        let mut body = Vec::new();
        body.push((5u8 << 5) | 5);
        push_text(&mut body, "content_version");
        push_uint(&mut body, 0, WIRE_CONTENT_VERSION_MAJOR as u64);
        push_text(&mut body, "probe_source");
        push_text(&mut body, "rogue.event");
        push_text(&mut body, "guest_pid");
        push_uint(&mut body, 0, 1);
        push_text(&mut body, "guest_comm");
        push_text(&mut body, "x");
        push_text(&mut body, "guest_monotonic_ns");
        push_uint(&mut body, 0, 1);

        match decode_event_body(&body) {
            Err(WireError::UnknownProbeSource(s)) => assert_eq!(s, "rogue.event"),
            other => panic!("expected UnknownProbeSource, got {other:?}"),
        }
    }

    #[test]
    fn frame_truncation_detected() {
        let ev = sample();
        let mut frame = encode_frame(&ev).unwrap();
        // Drop the last byte — body is now shorter than the declared length.
        let original_len = frame.len();
        frame.pop();
        match decode_frame(&frame) {
            Err(WireError::FrameTruncated { declared, actual }) => {
                assert_eq!(declared as usize, original_len - 4);
                assert_eq!(actual, original_len - 4 - 1);
            }
            other => panic!("expected FrameTruncated, got {other:?}"),
        }
    }

    #[test]
    fn short_header_detected() {
        // Only 3 bytes — the 4-byte length header itself didn't arrive in full.
        assert_eq!(
            decode_frame(&[0x00, 0x00, 0x00]),
            Err(WireError::ShortHeader { got: 3 })
        );
        assert_eq!(decode_frame(&[]), Err(WireError::ShortHeader { got: 0 }));
    }

    #[test]
    fn frame_too_large_rejected() {
        // Forge a frame whose declared length is larger than the cap.
        let mut frame = Vec::new();
        frame.extend_from_slice(&((MAX_FRAME_BODY_BYTES as u32) + 1).to_le_bytes());
        // Body bytes don't matter — header alone trips the check.
        match decode_frame(&frame) {
            Err(WireError::FrameTooLarge { .. }) => {}
            other => panic!("expected FrameTooLarge, got {other:?}"),
        }
    }

    #[test]
    fn unsupported_major_rejected() {
        // Outer item is an array, not a map.
        let body = vec![(4u8 << 5) | 5];
        assert_eq!(decode_event_body(&body), Err(WireError::NotMap5));
    }

    #[test]
    fn indefinite_length_rejected() {
        // additional=31 = indefinite length — not in our subset.
        let body = vec![(5u8 << 5) | 31];
        match decode_event_body(&body) {
            Err(WireError::UnsupportedAdditional { additional: 31 }) => {}
            other => panic!("expected UnsupportedAdditional(31), got {other:?}"),
        }
    }
}