1#![deny(unsafe_code)]
55#![warn(missing_docs)]
56
/// Major version of the wire content schema.
///
/// The encoder writes this as the `content_version` map value, and the decoder
/// rejects a body whose `content_version` differs from it.
pub const WIRE_CONTENT_VERSION_MAJOR: u16 = 1;

/// vsock port number of the telemetry channel.
// NOTE(review): which side listens on this port is not visible in this file — confirm.
pub const VSOCK_TELEMETRY_PORT: u32 = 9_001;

/// vsock context ID of the host (same value as Linux's `VMADDR_CID_HOST`).
pub const VMADDR_CID_HOST: u32 = 2;

/// Largest frame body, in bytes, that the frame encoder emits and the frame
/// decoder accepts. The 4-byte length prefix is not counted.
pub const MAX_FRAME_BODY_BYTES: usize = 4 * 1024;
73
/// The closed set of `probe_source` identifiers that may appear on the wire.
pub mod probe_source {
    /// Identifier for the process-spawned probe.
    pub const PROCESS_SPAWNED: &str = "process.spawned";
    /// Identifier for the process-exited probe.
    pub const PROCESS_EXITED: &str = "process.exited";
    /// Identifier for the capability-denied probe.
    pub const CAPABILITY_DENIED: &str = "capability.denied";
    /// Identifier for the inotify-fired probe.
    pub const FS_INOTIFY_FIRED: &str = "fs.inotify_fired";
    /// Identifier for the connect-attempted probe.
    pub const NET_CONNECT_ATTEMPTED: &str = "net.connect_attempted";

    /// Every identifier defined in this module, in declaration order.
    pub const ALL: &[&str] = &[
        PROCESS_SPAWNED,
        PROCESS_EXITED,
        CAPABILITY_DENIED,
        FS_INOTIFY_FIRED,
        NET_CONNECT_ATTEMPTED,
    ];

    /// Returns `true` when `s` is exactly (case-sensitively) one of [`ALL`].
    pub fn is_known(s: &str) -> bool {
        ALL.iter().any(|known| *known == s)
    }
}
107
108pub mod probes;
109
110#[derive(Debug, Clone)]
118pub struct GuestTelemetryDeclaration {
119 pub declared_surface: cellos_core::authority::DeclaredAuthoritySurface,
122 pub agent_version: String,
125}
126
/// One telemetry event as carried in a frame body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeEvent {
    /// Which probe produced the event. The decoder only ever yields one of the
    /// `'static` identifiers listed in `probe_source::ALL`.
    pub probe_source: &'static str,
    /// Process ID as reported by the guest.
    pub guest_pid: u32,
    /// Command name as reported by the guest; carried as UTF-8 text on the wire.
    pub guest_comm: String,
    /// Guest-side timestamp in nanoseconds.
    // NOTE(review): presumably a monotonic-clock reading; the clock source is not visible here.
    pub guest_monotonic_ns: u64,
}
139
/// Everything that can go wrong while framing or decoding a telemetry event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireError {
    /// A frame body is larger than the permitted maximum.
    FrameTooLarge {
        /// Offending body length in bytes.
        len: usize,
    },
    /// The length prefix promises more body bytes than the frame contains.
    FrameTruncated {
        /// Body length announced by the prefix.
        declared: u32,
        /// Body bytes actually present after the prefix.
        actual: usize,
    },
    /// The frame is shorter than its 4-byte length prefix.
    ShortHeader {
        /// Number of bytes that were available.
        got: usize,
    },
    /// The CBOR buffer ended in the middle of an item.
    UnexpectedEof,
    /// A CBOR major type the decoder does not handle.
    // NOTE(review): no code in this file constructs this variant.
    UnsupportedMajor {
        /// The major type (top three bits of the initial byte).
        major: u8,
    },
    /// A CBOR additional-information value outside `0..=27`
    /// (reserved values and indefinite length).
    UnsupportedAdditional {
        /// The additional-information value (low five bits of the initial byte).
        additional: u8,
    },
    /// The outermost item is not a definite-length map with five entries.
    NotMap5,
    /// The first map key is something other than `content_version`.
    ContentVersionMustBeFirst,
    /// The body announces a `content_version` this decoder does not speak.
    UnsupportedContentVersion(u16),
    /// A field is missing, appears twice, or (with the name `<unknown key>`)
    /// an unrecognised key was found.
    MissingField(&'static str),
    /// A field's value has the wrong CBOR major type.
    FieldType {
        /// Name of the field being read.
        field: &'static str,
        /// Major type that was found instead.
        got_major: u8,
    },
    /// An integer field does not fit its destination type.
    IntegerOverflow {
        /// Name of the field being read.
        field: &'static str,
    },
    /// The `probe_source` text is not one of the known identifiers.
    UnknownProbeSource(String),
    /// A text field is not valid UTF-8.
    InvalidUtf8 {
        /// Name of the field being read.
        field: &'static str,
    },
}

impl core::fmt::Display for WireError {
    /// Writes a short, single-line, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            // Fixed messages: nothing to interpolate.
            Self::UnexpectedEof => f.write_str("unexpected end of CBOR buffer"),
            Self::NotMap5 => f.write_str("outer CBOR item is not map(5)"),
            Self::ContentVersionMustBeFirst => {
                f.write_str("content_version must be the first map key")
            }

            // Framing errors.
            Self::ShortHeader { got } => write!(f, "short header: {got} bytes"),
            Self::FrameTooLarge { len } => write!(f, "frame too large: {len} bytes"),
            Self::FrameTruncated { declared, actual } => {
                write!(f, "frame truncated: declared {declared}, got {actual}")
            }

            // CBOR structural errors.
            Self::UnsupportedMajor { major } => write!(f, "unsupported CBOR major {major}"),
            Self::UnsupportedAdditional { additional } => {
                write!(f, "unsupported CBOR additional {additional}")
            }

            // Schema errors.
            Self::UnsupportedContentVersion(v) => {
                write!(f, "unsupported content_version major {v}")
            }
            Self::MissingField(n) => write!(f, "missing or duplicate field {n}"),
            Self::FieldType { field, got_major } => {
                write!(f, "field {field}: wrong major {got_major}")
            }
            Self::IntegerOverflow { field } => write!(f, "integer overflow in field {field}"),
            Self::InvalidUtf8 { field } => write!(f, "invalid utf8 in field {field}"),
            Self::UnknownProbeSource(s) => write!(f, "unknown probe_source {s:?}"),
        }
    }
}

impl std::error::Error for WireError {}
247
/// Appends a CBOR item head to `out`.
///
/// `major` (0..=7) goes into the top three bits of the initial byte. `value`
/// is written in the shortest of the five argument forms: inline (`< 24`), or
/// 1, 2, 4 or 8 following bytes in big-endian order.
fn push_uint(out: &mut Vec<u8>, major: u8, value: u64) {
    debug_assert!(major <= 7);
    let high_bits = major << 5;
    match value {
        // The argument fits in the low five bits of the initial byte.
        0..=23 => out.push(high_bits | value as u8),
        24..=0xff => {
            out.push(high_bits | 24);
            out.push(value as u8);
        }
        0x100..=0xffff => {
            out.push(high_bits | 25);
            out.extend_from_slice(&(value as u16).to_be_bytes());
        }
        0x1_0000..=0xffff_ffff => {
            out.push(high_bits | 26);
            out.extend_from_slice(&(value as u32).to_be_bytes());
        }
        _ => {
            out.push(high_bits | 27);
            out.extend_from_slice(&value.to_be_bytes());
        }
    }
}
272
273fn push_text(out: &mut Vec<u8>, s: &str) {
274 push_uint(out, 3, s.len() as u64);
275 out.extend_from_slice(s.as_bytes());
276}
277
278pub fn encode_event_body(ev: &ProbeEvent) -> Vec<u8> {
283 let mut out = Vec::with_capacity(128);
284
285 out.push((5u8 << 5) | 5);
287
288 push_text(&mut out, "content_version");
290 push_uint(&mut out, 0, WIRE_CONTENT_VERSION_MAJOR as u64);
291
292 push_text(&mut out, "probe_source");
294 push_text(&mut out, ev.probe_source);
295
296 push_text(&mut out, "guest_pid");
298 push_uint(&mut out, 0, ev.guest_pid as u64);
299
300 push_text(&mut out, "guest_comm");
302 push_text(&mut out, &ev.guest_comm);
303
304 push_text(&mut out, "guest_monotonic_ns");
306 push_uint(&mut out, 0, ev.guest_monotonic_ns);
307
308 out
309}
310
311pub fn encode_frame(ev: &ProbeEvent) -> Result<Vec<u8>, WireError> {
318 let body = encode_event_body(ev);
319 if body.len() > MAX_FRAME_BODY_BYTES {
320 return Err(WireError::FrameTooLarge { len: body.len() });
321 }
322 let mut frame = Vec::with_capacity(4 + body.len());
323 frame.extend_from_slice(&(body.len() as u32).to_le_bytes());
324 frame.extend_from_slice(&body);
325 Ok(frame)
326}
327
328struct Cursor<'a> {
332 buf: &'a [u8],
333 pos: usize,
334}
335
336impl<'a> Cursor<'a> {
337 fn new(buf: &'a [u8]) -> Self {
338 Self { buf, pos: 0 }
339 }
340
341 fn take(&mut self, n: usize) -> Result<&'a [u8], WireError> {
342 if self.pos + n > self.buf.len() {
343 return Err(WireError::UnexpectedEof);
344 }
345 let slice = &self.buf[self.pos..self.pos + n];
346 self.pos += n;
347 Ok(slice)
348 }
349
350 fn read_u8(&mut self) -> Result<u8, WireError> {
351 Ok(self.take(1)?[0])
352 }
353
354 fn read_header(&mut self) -> Result<(u8, u64), WireError> {
357 let b = self.read_u8()?;
358 let major = b >> 5;
359 let additional = b & 0x1f;
360 let arg = match additional {
361 0..=23 => additional as u64,
362 24 => self.read_u8()? as u64,
363 25 => {
364 let bs = self.take(2)?;
365 u16::from_be_bytes([bs[0], bs[1]]) as u64
366 }
367 26 => {
368 let bs = self.take(4)?;
369 u32::from_be_bytes([bs[0], bs[1], bs[2], bs[3]]) as u64
370 }
371 27 => {
372 let bs = self.take(8)?;
373 u64::from_be_bytes([bs[0], bs[1], bs[2], bs[3], bs[4], bs[5], bs[6], bs[7]])
374 }
375 other => return Err(WireError::UnsupportedAdditional { additional: other }),
376 };
377 Ok((major, arg))
378 }
379
380 fn read_text(&mut self, field: &'static str) -> Result<String, WireError> {
381 let (major, len) = self.read_header()?;
382 if major != 3 {
383 return Err(WireError::FieldType {
384 field,
385 got_major: major,
386 });
387 }
388 let bytes = self.take(len as usize)?;
389 std::str::from_utf8(bytes)
390 .map(|s| s.to_owned())
391 .map_err(|_| WireError::InvalidUtf8 { field })
392 }
393
394 fn read_uint(&mut self, field: &'static str) -> Result<u64, WireError> {
395 let (major, val) = self.read_header()?;
396 if major != 0 {
397 return Err(WireError::FieldType {
398 field,
399 got_major: major,
400 });
401 }
402 Ok(val)
403 }
404}
405
406pub fn decode_event_body(body: &[u8]) -> Result<ProbeEvent, WireError> {
417 let mut cur = Cursor::new(body);
418
419 let (major, len) = cur.read_header()?;
421 if major != 5 || len != 5 {
422 return Err(WireError::NotMap5);
423 }
424
425 let first_key = cur.read_text("<map key 0>")?;
428 if first_key != "content_version" {
429 return Err(WireError::ContentVersionMustBeFirst);
430 }
431 let cv = cur.read_uint("content_version")?;
432 if cv > u16::MAX as u64 {
433 return Err(WireError::IntegerOverflow {
434 field: "content_version",
435 });
436 }
437 if cv as u16 != WIRE_CONTENT_VERSION_MAJOR {
438 return Err(WireError::UnsupportedContentVersion(cv as u16));
439 }
440
441 let mut probe_source: Option<String> = None;
443 let mut guest_pid: Option<u32> = None;
444 let mut guest_comm: Option<String> = None;
445 let mut guest_monotonic_ns: Option<u64> = None;
446
447 for _ in 0..4 {
448 let key = cur.read_text("<map key>")?;
449 match key.as_str() {
450 "probe_source" => {
451 if probe_source.is_some() {
452 return Err(WireError::MissingField("probe_source"));
453 }
454 probe_source = Some(cur.read_text("probe_source")?);
455 }
456 "guest_pid" => {
457 if guest_pid.is_some() {
458 return Err(WireError::MissingField("guest_pid"));
459 }
460 let v = cur.read_uint("guest_pid")?;
461 if v > u32::MAX as u64 {
462 return Err(WireError::IntegerOverflow { field: "guest_pid" });
463 }
464 guest_pid = Some(v as u32);
465 }
466 "guest_comm" => {
467 if guest_comm.is_some() {
468 return Err(WireError::MissingField("guest_comm"));
469 }
470 guest_comm = Some(cur.read_text("guest_comm")?);
471 }
472 "guest_monotonic_ns" => {
473 if guest_monotonic_ns.is_some() {
474 return Err(WireError::MissingField("guest_monotonic_ns"));
475 }
476 guest_monotonic_ns = Some(cur.read_uint("guest_monotonic_ns")?);
477 }
478 _ => {
479 return Err(WireError::MissingField("<unknown key>"));
484 }
485 }
486 }
487
488 let ps_owned = probe_source.ok_or(WireError::MissingField("probe_source"))?;
489 if !probe_source::is_known(&ps_owned) {
490 return Err(WireError::UnknownProbeSource(ps_owned));
491 }
492 let ps_static: &'static str = probe_source::ALL
494 .iter()
495 .copied()
496 .find(|k| *k == ps_owned)
497 .expect("is_known just verified");
498
499 Ok(ProbeEvent {
500 probe_source: ps_static,
501 guest_pid: guest_pid.ok_or(WireError::MissingField("guest_pid"))?,
502 guest_comm: guest_comm.ok_or(WireError::MissingField("guest_comm"))?,
503 guest_monotonic_ns: guest_monotonic_ns
504 .ok_or(WireError::MissingField("guest_monotonic_ns"))?,
505 })
506}
507
508pub fn decode_frame(frame: &[u8]) -> Result<ProbeEvent, WireError> {
514 if frame.len() < 4 {
515 return Err(WireError::ShortHeader { got: frame.len() });
516 }
517 let declared = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]);
518 if declared as usize > MAX_FRAME_BODY_BYTES {
519 return Err(WireError::FrameTooLarge {
520 len: declared as usize,
521 });
522 }
523 let body_present = frame.len() - 4;
524 if (declared as usize) > body_present {
525 return Err(WireError::FrameTruncated {
526 declared,
527 actual: body_present,
528 });
529 }
530 decode_event_body(&frame[4..4 + declared as usize])
531}
532
#[cfg(test)]
mod tests {
    use super::*;

    /// A value in a hand-assembled map body.
    enum Raw<'a> {
        /// Encoded as a CBOR text string.
        Text(&'a str),
        /// Encoded as a CBOR unsigned integer.
        Uint(u64),
    }

    /// Assembles a body with a `map(5)` head followed by `entries` in the
    /// given order. Lets tests produce bodies the real encoder never would.
    fn map5_body(entries: &[(&str, Raw<'_>)]) -> Vec<u8> {
        let mut body = vec![(5u8 << 5) | 5];
        for (key, value) in entries {
            push_text(&mut body, key);
            match value {
                Raw::Text(text) => push_text(&mut body, text),
                Raw::Uint(number) => push_uint(&mut body, 0, *number),
            }
        }
        body
    }

    /// A representative, valid event.
    fn sample() -> ProbeEvent {
        ProbeEvent {
            probe_source: probe_source::PROCESS_SPAWNED,
            guest_pid: 4242,
            guest_comm: "workload".to_owned(),
            guest_monotonic_ns: 123_456_789_012,
        }
    }

    #[test]
    fn wire_versions_match() {
        assert_eq!(WIRE_CONTENT_VERSION_MAJOR, 1);
        assert_eq!(VSOCK_TELEMETRY_PORT, 9001);
        assert_eq!(VMADDR_CID_HOST, 2);
    }

    #[test]
    fn probe_source_constants_are_stable_strings() {
        assert_eq!(probe_source::PROCESS_SPAWNED, "process.spawned");
        assert_eq!(probe_source::CAPABILITY_DENIED, "capability.denied");
        assert!(probe_source::is_known("process.spawned"));
        assert!(!probe_source::is_known("rogue.event"));
    }

    #[test]
    fn round_trip_encode_decode() {
        let original = sample();
        let frame = encode_frame(&original).expect("encode");
        let decoded = decode_frame(&frame).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn round_trip_each_probe_source() {
        for ps in probe_source::ALL.iter().copied() {
            let ev = ProbeEvent {
                probe_source: ps,
                guest_pid: 7,
                guest_comm: "p".to_owned(),
                guest_monotonic_ns: 1,
            };
            let frame = encode_frame(&ev).unwrap();
            let back = decode_frame(&frame).unwrap();
            assert_eq!(back, ev, "round-trip failed for {ps}");
        }
    }

    #[test]
    fn content_version_first_short_circuits_unknown_major() {
        // Both the version and the probe source are bad; the version must win.
        let body = map5_body(&[
            ("content_version", Raw::Uint(999)),
            ("probe_source", Raw::Text("rogue.event")),
            ("guest_pid", Raw::Uint(1)),
            ("guest_comm", Raw::Text("x")),
            ("guest_monotonic_ns", Raw::Uint(1)),
        ]);

        match decode_event_body(&body) {
            Err(WireError::UnsupportedContentVersion(999)) => {}
            other => panic!(
                "expected UnsupportedContentVersion(999), got {other:?} \
                 (host MUST short-circuit before probe-source validation)"
            ),
        }
    }

    #[test]
    fn content_version_must_be_first_key() {
        // All five fields are valid, but `content_version` is second.
        let body = map5_body(&[
            ("guest_pid", Raw::Uint(1)),
            ("content_version", Raw::Uint(WIRE_CONTENT_VERSION_MAJOR as u64)),
            ("probe_source", Raw::Text(probe_source::PROCESS_SPAWNED)),
            ("guest_comm", Raw::Text("x")),
            ("guest_monotonic_ns", Raw::Uint(1)),
        ]);

        assert_eq!(
            decode_event_body(&body),
            Err(WireError::ContentVersionMustBeFirst)
        );
    }

    #[test]
    fn unknown_probe_source_rejected() {
        let body = map5_body(&[
            ("content_version", Raw::Uint(WIRE_CONTENT_VERSION_MAJOR as u64)),
            ("probe_source", Raw::Text("rogue.event")),
            ("guest_pid", Raw::Uint(1)),
            ("guest_comm", Raw::Text("x")),
            ("guest_monotonic_ns", Raw::Uint(1)),
        ]);

        match decode_event_body(&body) {
            Err(WireError::UnknownProbeSource(s)) => assert_eq!(s, "rogue.event"),
            other => panic!("expected UnknownProbeSource, got {other:?}"),
        }
    }

    #[test]
    fn frame_truncation_detected() {
        let mut frame = encode_frame(&sample()).unwrap();
        let full_len = frame.len();
        // Drop the last body byte; the prefix still declares the full length.
        frame.pop();
        match decode_frame(&frame) {
            Err(WireError::FrameTruncated { declared, actual }) => {
                assert_eq!(declared as usize, full_len - 4);
                assert_eq!(actual, full_len - 4 - 1);
            }
            other => panic!("expected FrameTruncated, got {other:?}"),
        }
    }

    #[test]
    fn short_header_detected() {
        assert_eq!(
            decode_frame(&[0x00, 0x00, 0x00]),
            Err(WireError::ShortHeader { got: 3 })
        );
        assert_eq!(decode_frame(&[]), Err(WireError::ShortHeader { got: 0 }));
    }

    #[test]
    fn frame_too_large_rejected() {
        // A bare prefix declaring one byte more than the limit.
        let oversize = (MAX_FRAME_BODY_BYTES as u32) + 1;
        let frame = oversize.to_le_bytes().to_vec();
        match decode_frame(&frame) {
            Err(WireError::FrameTooLarge { .. }) => {}
            other => panic!("expected FrameTooLarge, got {other:?}"),
        }
    }

    #[test]
    fn unsupported_major_rejected() {
        // array(5) instead of map(5).
        let body = [(4u8 << 5) | 5];
        assert_eq!(decode_event_body(&body), Err(WireError::NotMap5));
    }

    #[test]
    fn indefinite_length_rejected() {
        // Map head with additional information 31 (indefinite length).
        let body = [(5u8 << 5) | 31];
        match decode_event_body(&body) {
            Err(WireError::UnsupportedAdditional { additional: 31 }) => {}
            other => panic!("expected UnsupportedAdditional(31), got {other:?}"),
        }
    }
}