//! Wire encoding for guest telemetry probe events.
//!
//! An event body is a five-entry CBOR map (see [`encode_event_body`]); a frame
//! is that body preceded by a 4-byte little-endian length (see
//! [`encode_frame`]).
#![deny(unsafe_code)]
#![warn(missing_docs)]
/// Content-version number written under the `content_version` key by
/// [`encode_event_body`]; [`decode_event_body`] rejects any other value.
pub const WIRE_CONTENT_VERSION_MAJOR: u16 = 1;
/// vsock port number for telemetry. Not referenced elsewhere in this file —
/// presumably consumed by the transport code (TODO confirm).
pub const VSOCK_TELEMETRY_PORT: u32 = 9001;
/// vsock context ID of the host; 2 is the value Linux assigns to
/// `VMADDR_CID_HOST`.
pub const VMADDR_CID_HOST: u32 = 2;
/// Largest body length, in bytes, that [`encode_frame`] will emit and
/// [`decode_frame`] will accept.
pub const MAX_FRAME_BODY_BYTES: usize = 4096;
/// String identifiers that may appear in the `probe_source` field of an event.
///
/// The decoder accepts only the identifiers listed in [`ALL`](self::ALL).
pub mod probe_source {
    /// Identifier `"process.spawned"`.
    pub const PROCESS_SPAWNED: &str = "process.spawned";
    /// Identifier `"process.exited"`.
    pub const PROCESS_EXITED: &str = "process.exited";
    /// Identifier `"capability.denied"`.
    pub const CAPABILITY_DENIED: &str = "capability.denied";
    /// Identifier `"fs.inotify_fired"`.
    pub const FS_INOTIFY_FIRED: &str = "fs.inotify_fired";
    /// Identifier `"net.connect_attempted"`.
    pub const NET_CONNECT_ATTEMPTED: &str = "net.connect_attempted";

    /// Every identifier defined in this module, in declaration order.
    pub const ALL: &[&str] = &[
        PROCESS_SPAWNED,
        PROCESS_EXITED,
        CAPABILITY_DENIED,
        FS_INOTIFY_FIRED,
        NET_CONNECT_ATTEMPTED,
    ];

    /// Returns `true` when `s` is byte-for-byte equal to an entry of [`ALL`].
    pub fn is_known(s: &str) -> bool {
        ALL.iter().any(|known| *known == s)
    }
}
/// Probe submodule; its contents live in a separate source file that is not
/// shown here.
pub mod probes;
/// Pairs a declared authority surface with an agent version string.
///
/// NOTE(review): nothing else in this file constructs or reads this type, so
/// how and when it is exchanged cannot be stated from here — confirm against
/// its users.
#[derive(Debug, Clone)]
pub struct GuestTelemetryDeclaration {
/// Authority surface being declared (type defined in `cellos_core`).
pub declared_surface: cellos_core::authority::DeclaredAuthoritySurface,
/// Version string of the agent — presumably the guest telemetry agent
/// (TODO confirm format and origin).
pub agent_version: String,
}
/// One telemetry event, as carried in a frame body.
///
/// [`encode_event_body`] serialises the four fields below (plus a
/// `content_version` entry) and [`decode_event_body`] reconstructs them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeEvent {
/// Identifier of the probe that produced the event. The decoder only ever
/// yields entries of [`probe_source::ALL`]; the encoder writes whatever
/// string is stored here without checking it.
pub probe_source: &'static str,
/// Process ID carried by the event — presumably as seen inside the guest
/// (TODO confirm).
pub guest_pid: u32,
/// Text field sent under the `guest_comm` key — presumably the process
/// command name (TODO confirm).
pub guest_comm: String,
/// Timestamp sent under the `guest_monotonic_ns` key — presumably guest
/// monotonic-clock nanoseconds (TODO confirm).
pub guest_monotonic_ns: u64,
}
/// Failures reported while framing, encoding or decoding a probe event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireError {
    /// A frame body is longer than the permitted maximum.
    FrameTooLarge {
        /// Offending body length in bytes.
        len: usize,
    },
    /// The length prefix promises more body bytes than the frame contains.
    FrameTruncated {
        /// Body length announced by the prefix.
        declared: u32,
        /// Body bytes actually present after the prefix.
        actual: usize,
    },
    /// The input is too short to hold the length prefix.
    ShortHeader {
        /// Number of bytes that were supplied.
        got: usize,
    },
    /// The CBOR buffer ended in the middle of an item.
    UnexpectedEof,
    /// A CBOR major type the decoder does not handle (not constructed by the
    /// code in this file).
    UnsupportedMajor {
        /// The major type encountered.
        major: u8,
    },
    /// A CBOR header used an additional-information value outside 0..=27.
    UnsupportedAdditional {
        /// The additional-information value encountered.
        additional: u8,
    },
    /// The outermost CBOR item is not a map with exactly five entries.
    NotMap5,
    /// The first map key is something other than `content_version`.
    ContentVersionMustBeFirst,
    /// The `content_version` value is not the one this code understands.
    UnsupportedContentVersion(u16),
    /// A field is missing or duplicated, or an unrecognised key was seen
    /// (reported with the name `<unknown key>`).
    MissingField(&'static str),
    /// A field carries a CBOR item of the wrong major type.
    FieldType {
        /// Name of the field being read.
        field: &'static str,
        /// Major type that was found instead.
        got_major: u8,
    },
    /// An integer does not fit the width of its destination field.
    IntegerOverflow {
        /// Name of the field being read.
        field: &'static str,
    },
    /// The `probe_source` string is not one of the known identifiers.
    UnknownProbeSource(String),
    /// A text field does not contain valid UTF-8.
    InvalidUtf8 {
        /// Name of the field being read.
        field: &'static str,
    },
}

impl core::fmt::Display for WireError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            // Messages without any payload are written verbatim.
            WireError::UnexpectedEof => f.write_str("unexpected end of CBOR buffer"),
            WireError::NotMap5 => f.write_str("outer CBOR item is not map(5)"),
            WireError::ContentVersionMustBeFirst => {
                f.write_str("content_version must be the first map key")
            }

            // Framing errors.
            WireError::FrameTooLarge { len } => write!(f, "frame too large: {len} bytes"),
            WireError::FrameTruncated { declared, actual } => {
                write!(f, "frame truncated: declared {declared}, got {actual}")
            }
            WireError::ShortHeader { got } => write!(f, "short header: {got} bytes"),

            // CBOR header errors.
            WireError::UnsupportedMajor { major } => {
                write!(f, "unsupported CBOR major {major}")
            }
            WireError::UnsupportedAdditional { additional } => {
                write!(f, "unsupported CBOR additional {additional}")
            }

            // Field-level errors.
            WireError::UnsupportedContentVersion(version) => {
                write!(f, "unsupported content_version major {version}")
            }
            WireError::MissingField(name) => write!(f, "missing or duplicate field {name}"),
            WireError::FieldType { field, got_major } => {
                write!(f, "field {field}: wrong major {got_major}")
            }
            WireError::IntegerOverflow { field } => {
                write!(f, "integer overflow in field {field}")
            }
            WireError::UnknownProbeSource(name) => write!(f, "unknown probe_source {name:?}"),
            WireError::InvalidUtf8 { field } => write!(f, "invalid utf8 in field {field}"),
        }
    }
}

impl std::error::Error for WireError {}
/// Appends a CBOR item header to `out`.
///
/// `major` is the three-bit major type and `value` the header argument. The
/// shortest encoding that can hold `value` is chosen: inline for values below
/// 24, otherwise a 1-, 2-, 4- or 8-byte big-endian argument.
fn push_uint(out: &mut Vec<u8>, major: u8, value: u64) {
    debug_assert!(major <= 7);
    // The major type occupies the top three bits of the initial byte.
    let type_bits = major << 5;
    match value {
        0..=23 => out.push(type_bits | value as u8),
        24..=0xff => out.extend_from_slice(&[type_bits | 24, value as u8]),
        0x100..=0xffff => {
            out.push(type_bits | 25);
            out.extend_from_slice(&(value as u16).to_be_bytes());
        }
        0x1_0000..=0xffff_ffff => {
            out.push(type_bits | 26);
            out.extend_from_slice(&(value as u32).to_be_bytes());
        }
        _ => {
            out.push(type_bits | 27);
            out.extend_from_slice(&value.to_be_bytes());
        }
    }
}
/// Appends `s` to `out` as a CBOR text string: a major-type-3 header carrying
/// the byte length, followed by the UTF-8 bytes themselves.
fn push_text(out: &mut Vec<u8>, s: &str) {
    let utf8 = s.as_bytes();
    push_uint(out, 3, utf8.len() as u64);
    out.extend_from_slice(utf8);
}
pub fn encode_event_body(ev: &ProbeEvent) -> Vec<u8> {
let mut out = Vec::with_capacity(128);
out.push((5u8 << 5) | 5);
push_text(&mut out, "content_version");
push_uint(&mut out, 0, WIRE_CONTENT_VERSION_MAJOR as u64);
push_text(&mut out, "probe_source");
push_text(&mut out, ev.probe_source);
push_text(&mut out, "guest_pid");
push_uint(&mut out, 0, ev.guest_pid as u64);
push_text(&mut out, "guest_comm");
push_text(&mut out, &ev.guest_comm);
push_text(&mut out, "guest_monotonic_ns");
push_uint(&mut out, 0, ev.guest_monotonic_ns);
out
}
/// Encodes `ev` and wraps it in a frame: a 4-byte little-endian body length
/// followed by the body produced by [`encode_event_body`].
///
/// # Errors
///
/// Returns [`WireError::FrameTooLarge`] when the encoded body is longer than
/// [`MAX_FRAME_BODY_BYTES`].
pub fn encode_frame(ev: &ProbeEvent) -> Result<Vec<u8>, WireError> {
    let body = encode_event_body(ev);
    let len = body.len();
    if len > MAX_FRAME_BODY_BYTES {
        return Err(WireError::FrameTooLarge { len });
    }

    // `len` is bounded by MAX_FRAME_BODY_BYTES here, so it fits in a u32.
    let prefix = (len as u32).to_le_bytes();
    let mut frame = Vec::with_capacity(prefix.len() + len);
    frame.extend_from_slice(&prefix);
    frame.extend_from_slice(&body);
    Ok(frame)
}
/// Forward-only reader over a borrowed byte slice, used to pull CBOR items
/// out of an event body.
struct Cursor<'a> {
    /// The complete input being decoded.
    buf: &'a [u8],
    /// Offset of the next unread byte; never exceeds `buf.len()`.
    pos: usize,
}

impl<'a> Cursor<'a> {
    /// Creates a cursor positioned at the start of `buf`.
    fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Number of bytes that have not been consumed yet.
    fn remaining(&self) -> usize {
        // `pos` only ever advances by amounts checked against this value, so
        // it stays within `buf` and the subtraction cannot underflow.
        self.buf.len() - self.pos
    }

    /// Consumes and returns the next `n` bytes.
    ///
    /// Fails with [`WireError::UnexpectedEof`], leaving the position
    /// untouched, when fewer than `n` bytes remain.
    fn take(&mut self, n: usize) -> Result<&'a [u8], WireError> {
        // Compare against what is left rather than computing `pos + n`: `n`
        // can come straight from untrusted input and the sum may overflow.
        if n > self.remaining() {
            return Err(WireError::UnexpectedEof);
        }
        let start = self.pos;
        self.pos += n;
        Ok(&self.buf[start..self.pos])
    }

    /// Consumes and returns a single byte.
    fn read_u8(&mut self) -> Result<u8, WireError> {
        Ok(self.take(1)?[0])
    }

    /// Reads one CBOR item header and returns `(major type, argument)`.
    ///
    /// Additional-information values 0..=23 carry the argument inline; 24,
    /// 25, 26 and 27 are followed by a 1-, 2-, 4- or 8-byte big-endian
    /// argument. Any other value (including 31, indefinite length) is
    /// rejected with [`WireError::UnsupportedAdditional`].
    fn read_header(&mut self) -> Result<(u8, u64), WireError> {
        let b = self.read_u8()?;
        let major = b >> 5;
        let additional = b & 0x1f;
        let arg = match additional {
            0..=23 => additional as u64,
            24 => self.read_u8()? as u64,
            25 => {
                let bs = self.take(2)?;
                u16::from_be_bytes([bs[0], bs[1]]) as u64
            }
            26 => {
                let bs = self.take(4)?;
                u32::from_be_bytes([bs[0], bs[1], bs[2], bs[3]]) as u64
            }
            27 => {
                let bs = self.take(8)?;
                u64::from_be_bytes([bs[0], bs[1], bs[2], bs[3], bs[4], bs[5], bs[6], bs[7]])
            }
            other => return Err(WireError::UnsupportedAdditional { additional: other }),
        };
        Ok((major, arg))
    }

    /// Reads a CBOR text string (major type 3) and returns it as an owned
    /// `String`. `field` is only used to label errors.
    ///
    /// Fails with [`WireError::FieldType`] for any other major type,
    /// [`WireError::UnexpectedEof`] when the declared length exceeds the
    /// bytes remaining, and [`WireError::InvalidUtf8`] for malformed text.
    fn read_text(&mut self, field: &'static str) -> Result<String, WireError> {
        let (major, len) = self.read_header()?;
        if major != 3 {
            return Err(WireError::FieldType {
                field,
                got_major: major,
            });
        }
        // Check the declared length as a u64 *before* narrowing it, so an
        // oversized value can neither be truncated by the cast on 32-bit
        // targets nor overflow the position arithmetic.
        if len > self.remaining() as u64 {
            return Err(WireError::UnexpectedEof);
        }
        let bytes = self.take(len as usize)?;
        std::str::from_utf8(bytes)
            .map(|s| s.to_owned())
            .map_err(|_| WireError::InvalidUtf8 { field })
    }

    /// Reads a CBOR unsigned integer (major type 0). `field` is only used to
    /// label errors.
    ///
    /// Fails with [`WireError::FieldType`] for any other major type.
    fn read_uint(&mut self, field: &'static str) -> Result<u64, WireError> {
        let (major, val) = self.read_header()?;
        if major != 0 {
            return Err(WireError::FieldType {
                field,
                got_major: major,
            });
        }
        Ok(val)
    }
}
/// Decodes a frame body produced by [`encode_event_body`].
///
/// The body must be a CBOR map with exactly five entries whose first key is
/// `content_version`. The version is validated before any other entry is
/// read, so an unsupported version is reported ahead of every later problem.
/// The remaining four keys may appear in any order.
///
/// # Errors
///
/// * [`WireError::NotMap5`] — the outer item is not a five-entry map.
/// * [`WireError::ContentVersionMustBeFirst`] — the first key is wrong.
/// * [`WireError::IntegerOverflow`] — `content_version` or `guest_pid` does
///   not fit its field width.
/// * [`WireError::UnsupportedContentVersion`] — the version is not
///   [`WIRE_CONTENT_VERSION_MAJOR`].
/// * [`WireError::MissingField`] — a key is repeated or absent, or an
///   unrecognised key was seen (named `<unknown key>`).
/// * [`WireError::UnknownProbeSource`] — the probe source is not listed in
///   [`probe_source::ALL`].
/// * Any error raised by the low-level reader (truncated input, wrong item
///   type, invalid UTF-8, unsupported header encoding).
pub fn decode_event_body(body: &[u8]) -> Result<ProbeEvent, WireError> {
    let mut reader = Cursor::new(body);

    // Outer item: major type 5 (map) with an entry count of exactly 5.
    if reader.read_header()? != (5, 5) {
        return Err(WireError::NotMap5);
    }

    // The version gate comes first so nothing else is interpreted under an
    // unknown schema.
    if reader.read_text("<map key 0>")? != "content_version" {
        return Err(WireError::ContentVersionMustBeFirst);
    }
    let raw_version = reader.read_uint("content_version")?;
    if raw_version > u64::from(u16::MAX) {
        return Err(WireError::IntegerOverflow {
            field: "content_version",
        });
    }
    let version = raw_version as u16; // lossless: range-checked just above
    if version != WIRE_CONTENT_VERSION_MAJOR {
        return Err(WireError::UnsupportedContentVersion(version));
    }

    let mut source_name: Option<String> = None;
    let mut pid: Option<u32> = None;
    let mut comm: Option<String> = None;
    let mut mono_ns: Option<u64> = None;

    for _ in 0..4 {
        let key = reader.read_text("<map key>")?;
        match key.as_str() {
            // First occurrence of each key: read and store its value.
            "probe_source" if source_name.is_none() => {
                source_name = Some(reader.read_text("probe_source")?);
            }
            "guest_pid" if pid.is_none() => {
                let raw = reader.read_uint("guest_pid")?;
                if raw > u64::from(u32::MAX) {
                    return Err(WireError::IntegerOverflow { field: "guest_pid" });
                }
                pid = Some(raw as u32);
            }
            "guest_comm" if comm.is_none() => {
                comm = Some(reader.read_text("guest_comm")?);
            }
            "guest_monotonic_ns" if mono_ns.is_none() => {
                mono_ns = Some(reader.read_uint("guest_monotonic_ns")?);
            }

            // A known key whose guard failed has already been seen; reject it
            // before looking at its value.
            "probe_source" => return Err(WireError::MissingField("probe_source")),
            "guest_pid" => return Err(WireError::MissingField("guest_pid")),
            "guest_comm" => return Err(WireError::MissingField("guest_comm")),
            "guest_monotonic_ns" => {
                return Err(WireError::MissingField("guest_monotonic_ns"));
            }

            _ => return Err(WireError::MissingField("<unknown key>")),
        }
    }

    // Swap the decoded string for the matching `'static` constant; a string
    // with no match is an unknown probe source.
    let source_name = source_name.ok_or(WireError::MissingField("probe_source"))?;
    let matched = probe_source::ALL
        .iter()
        .copied()
        .find(|known| *known == source_name);
    let canonical: &'static str = match matched {
        Some(known) => known,
        None => return Err(WireError::UnknownProbeSource(source_name)),
    };

    Ok(ProbeEvent {
        probe_source: canonical,
        guest_pid: pid.ok_or(WireError::MissingField("guest_pid"))?,
        guest_comm: comm.ok_or(WireError::MissingField("guest_comm"))?,
        guest_monotonic_ns: mono_ns.ok_or(WireError::MissingField("guest_monotonic_ns"))?,
    })
}
/// Decodes one length-prefixed frame into a [`ProbeEvent`].
///
/// The first four bytes are a little-endian body length; exactly that many
/// following bytes are handed to [`decode_event_body`]. Bytes beyond the
/// declared body are ignored.
///
/// # Errors
///
/// * [`WireError::ShortHeader`] — fewer than four bytes were supplied.
/// * [`WireError::FrameTooLarge`] — the declared length exceeds
///   [`MAX_FRAME_BODY_BYTES`]; checked before truncation.
/// * [`WireError::FrameTruncated`] — fewer body bytes are present than
///   declared.
/// * Any error from [`decode_event_body`].
pub fn decode_frame(frame: &[u8]) -> Result<ProbeEvent, WireError> {
    if frame.len() < 4 {
        return Err(WireError::ShortHeader { got: frame.len() });
    }
    let (prefix, rest) = frame.split_at(4);
    let declared = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
    let body_len = declared as usize;

    if body_len > MAX_FRAME_BODY_BYTES {
        return Err(WireError::FrameTooLarge { len: body_len });
    }

    // `get` yields `None` exactly when the declared length runs past the end.
    match rest.get(..body_len) {
        Some(body) => decode_event_body(body),
        None => Err(WireError::FrameTruncated {
            declared,
            actual: rest.len(),
        }),
    }
}
// Unit tests for the wire codec: constants, round-trips, and each decoder
// rejection path. Several tests build CBOR bodies by hand with the private
// `push_text` / `push_uint` helpers so that malformed input can be produced.
#[cfg(test)]
mod tests {
use super::*;
// Well-formed event shared by the round-trip and truncation tests.
fn sample() -> ProbeEvent {
ProbeEvent {
probe_source: probe_source::PROCESS_SPAWNED,
guest_pid: 4242,
guest_comm: "workload".to_owned(),
guest_monotonic_ns: 123_456_789_012,
}
}
// Pins the numeric wire constants so an accidental edit is caught.
#[test]
fn wire_versions_match() {
assert_eq!(WIRE_CONTENT_VERSION_MAJOR, 1);
assert_eq!(VSOCK_TELEMETRY_PORT, 9001);
assert_eq!(VMADDR_CID_HOST, 2);
}
// Pins two identifier strings and checks `is_known` for a hit and a miss.
#[test]
fn probe_source_constants_are_stable_strings() {
assert_eq!(probe_source::PROCESS_SPAWNED, "process.spawned");
assert_eq!(probe_source::CAPABILITY_DENIED, "capability.denied");
assert!(probe_source::is_known("process.spawned"));
assert!(!probe_source::is_known("rogue.event"));
}
// encode_frame followed by decode_frame returns the original event.
#[test]
fn round_trip_encode_decode() {
let ev = sample();
let frame = encode_frame(&ev).expect("encode");
let back = decode_frame(&frame).expect("decode");
assert_eq!(back, ev);
}
// Same round-trip, once for every identifier in `probe_source::ALL`.
#[test]
fn round_trip_each_probe_source() {
for &ps in probe_source::ALL {
let ev = ProbeEvent {
probe_source: ps,
guest_pid: 7,
guest_comm: "p".to_owned(),
guest_monotonic_ns: 1,
};
let frame = encode_frame(&ev).unwrap();
let back = decode_frame(&frame).unwrap();
assert_eq!(back, ev, "round-trip failed for {ps}");
}
}
// A body with both an unsupported version (999) and an unknown probe source
// must be rejected for the version: that check runs before other fields are
// read.
#[test]
fn content_version_first_short_circuits_unknown_major() {
let mut body = Vec::new();
body.push((5u8 << 5) | 5); push_text(&mut body, "content_version");
push_uint(&mut body, 0, 999); push_text(&mut body, "probe_source");
push_text(&mut body, "rogue.event"); push_text(&mut body, "guest_pid");
push_uint(&mut body, 0, 1);
push_text(&mut body, "guest_comm");
push_text(&mut body, "x");
push_text(&mut body, "guest_monotonic_ns");
push_uint(&mut body, 0, 1);
match decode_event_body(&body) {
Err(WireError::UnsupportedContentVersion(999)) => {}
other => panic!(
"expected UnsupportedContentVersion(999), got {other:?} \
(host MUST short-circuit before probe-source validation)"
),
}
}
// An otherwise valid body whose first key is `guest_pid` rather than
// `content_version` is rejected.
#[test]
fn content_version_must_be_first_key() {
let mut body = Vec::new();
body.push((5u8 << 5) | 5);
push_text(&mut body, "guest_pid");
push_uint(&mut body, 0, 1);
push_text(&mut body, "content_version");
push_uint(&mut body, 0, WIRE_CONTENT_VERSION_MAJOR as u64);
push_text(&mut body, "probe_source");
push_text(&mut body, probe_source::PROCESS_SPAWNED);
push_text(&mut body, "guest_comm");
push_text(&mut body, "x");
push_text(&mut body, "guest_monotonic_ns");
push_uint(&mut body, 0, 1);
assert_eq!(
decode_event_body(&body),
Err(WireError::ContentVersionMustBeFirst)
);
}
// With a supported version, an identifier outside `probe_source::ALL` is
// rejected and the offending string is returned in the error.
#[test]
fn unknown_probe_source_rejected() {
let mut body = Vec::new();
body.push((5u8 << 5) | 5);
push_text(&mut body, "content_version");
push_uint(&mut body, 0, WIRE_CONTENT_VERSION_MAJOR as u64);
push_text(&mut body, "probe_source");
push_text(&mut body, "rogue.event");
push_text(&mut body, "guest_pid");
push_uint(&mut body, 0, 1);
push_text(&mut body, "guest_comm");
push_text(&mut body, "x");
push_text(&mut body, "guest_monotonic_ns");
push_uint(&mut body, 0, 1);
match decode_event_body(&body) {
Err(WireError::UnknownProbeSource(s)) => assert_eq!(s, "rogue.event"),
other => panic!("expected UnknownProbeSource, got {other:?}"),
}
}
// Dropping the last byte of a valid frame leaves the length prefix intact,
// so the decoder reports the declared length and the one-byte shortfall.
#[test]
fn frame_truncation_detected() {
let ev = sample();
let mut frame = encode_frame(&ev).unwrap();
let original_len = frame.len();
frame.pop();
match decode_frame(&frame) {
Err(WireError::FrameTruncated { declared, actual }) => {
assert_eq!(declared as usize, original_len - 4);
assert_eq!(actual, original_len - 4 - 1);
}
other => panic!("expected FrameTruncated, got {other:?}"),
}
}
// Inputs shorter than the 4-byte length prefix report how many bytes they
// had.
#[test]
fn short_header_detected() {
assert_eq!(
decode_frame(&[0x00, 0x00, 0x00]),
Err(WireError::ShortHeader { got: 3 })
);
assert_eq!(decode_frame(&[]), Err(WireError::ShortHeader { got: 0 }));
}
// A prefix declaring MAX_FRAME_BODY_BYTES + 1 is rejected on size alone; no
// body bytes are supplied, so the size check must come before the
// truncation check.
#[test]
fn frame_too_large_rejected() {
let mut frame = Vec::new();
frame.extend_from_slice(&((MAX_FRAME_BODY_BYTES as u32) + 1).to_le_bytes());
match decode_frame(&frame) {
Err(WireError::FrameTooLarge { .. }) => {}
other => panic!("expected FrameTooLarge, got {other:?}"),
}
}
// An outer item of major type 4 (count 5) instead of a map is reported as
// NotMap5 — not as UnsupportedMajor, despite this test's name.
#[test]
fn unsupported_major_rejected() {
let body = vec![(4u8 << 5) | 5];
assert_eq!(decode_event_body(&body), Err(WireError::NotMap5));
}
// Additional-information value 31 on the outer map header is rejected while
// the header is being read.
#[test]
fn indefinite_length_rejected() {
let body = vec![(5u8 << 5) | 31];
match decode_event_body(&body) {
Err(WireError::UnsupportedAdditional { additional: 31 }) => {}
other => panic!("expected UnsupportedAdditional(31), got {other:?}"),
}
}
}