use std::io::{self, Read, Write};
const MAGIC: [u8; 4] = *b"VOR1";
pub(crate) const VERSION: u16 = 2;
const TAG_FRAME: u8 = 0;
const TAG_DECL: u8 = 1;
pub(crate) struct Column {
pub name: String,
pub unit: String,
}
pub(crate) struct Header {
pub version: u16,
pub flame_enabled: bool,
pub columns: Vec<Column>,
}
pub(crate) struct Record {
pub system: Vec<f64>,
pub user: Vec<(u16, f64)>,
pub flame: Option<Vec<u8>>,
}
pub(crate) enum StreamRecord {
Decl { id: u16, name: String, unit: String },
Frame(Record),
}
pub(crate) fn write_header(w: &mut impl Write, header: &Header) -> io::Result<()> {
let Header {
version,
flame_enabled,
columns,
} = header;
w.write_all(&MAGIC)?;
w.write_all(&version.to_le_bytes())?;
w.write_all(&[u8::from(*flame_enabled)])?;
w.write_all(&(columns.len() as u16).to_le_bytes())?;
for Column { name, unit } in columns {
write_str(w, name)?;
write_str(w, unit)?;
}
Ok(())
}
pub(crate) fn read_header(r: &mut impl Read) -> io::Result<Header> {
let mut magic = [0u8; 4];
r.read_exact(&mut magic)?;
assert_eq!(magic, MAGIC, "not a .vor stream");
let version = read_u16(r)?;
assert_eq!(version, VERSION, "unsupported .vor version");
let flame_enabled = read_u8(r)? != 0;
let n = read_u16(r)? as usize;
let columns = (0..n)
.map(|_| {
Ok(Column {
name: read_str(r)?,
unit: read_str(r)?,
})
})
.collect::<io::Result<_>>()?;
Ok(Header {
version,
flame_enabled,
columns,
})
}
pub(crate) fn write_decl(w: &mut impl Write, id: u16, name: &str, unit: &str) -> io::Result<()> {
let mut body = vec![TAG_DECL];
body.extend_from_slice(&id.to_le_bytes());
write_str(&mut body, name)?;
write_str(&mut body, unit)?;
write_blob(w, &body)
}
pub(crate) fn write_frame(w: &mut impl Write, record: &Record) -> io::Result<()> {
let Record {
system,
user,
flame,
} = record;
let mut body = vec![TAG_FRAME];
for v in system {
body.extend_from_slice(&v.to_le_bytes());
}
body.extend_from_slice(&(user.len() as u16).to_le_bytes());
for (id, value) in user {
body.extend_from_slice(&id.to_le_bytes());
body.extend_from_slice(&value.to_le_bytes());
}
match flame {
Some(bytes) => {
body.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
body.extend_from_slice(bytes);
}
None => body.extend_from_slice(&0u32.to_le_bytes()),
}
write_blob(w, &body)
}
fn write_blob(w: &mut impl Write, body: &[u8]) -> io::Result<()> {
let blob = lz4_flex::compress_prepend_size(body);
w.write_all(&(blob.len() as u32).to_le_bytes())?;
w.write_all(&blob)
}
pub(crate) fn try_read_record(buf: &[u8], n_system: usize) -> Option<(usize, StreamRecord)> {
if buf.len() < 4 {
return None;
}
let len = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize;
let end = 4 + len;
if buf.len() < end {
return None;
}
Some((end, decode_blob(&buf[4..end], n_system)))
}
fn decode_blob(blob: &[u8], n_system: usize) -> StreamRecord {
let body = lz4_flex::decompress_size_prepended(blob).unwrap();
let mut c = io::Cursor::new(body);
match read_u8(&mut c).unwrap() {
TAG_FRAME => StreamRecord::Frame(read_frame_body(&mut c, n_system).unwrap()),
TAG_DECL => StreamRecord::Decl {
id: read_u16(&mut c).unwrap(),
name: read_str(&mut c).unwrap(),
unit: read_str(&mut c).unwrap(),
},
other => panic!("unknown .vor record tag {other}"),
}
}
fn read_frame_body(c: &mut impl Read, n_system: usize) -> io::Result<Record> {
let system = (0..n_system).map(|_| read_f64(c)).collect::<io::Result<_>>()?;
let n_user = read_u16(c)? as usize;
let user = (0..n_user)
.map(|_| Ok((read_u16(c)?, read_f64(c)?)))
.collect::<io::Result<_>>()?;
let flame_len = read_u32(c)? as usize;
let flame = (flame_len != 0).then(|| {
let mut bytes = vec![0u8; flame_len];
c.read_exact(&mut bytes).unwrap();
bytes
});
Ok(Record {
system,
user,
flame,
})
}
fn write_str(w: &mut impl Write, s: &str) -> io::Result<()> {
w.write_all(&(s.len() as u16).to_le_bytes())?;
w.write_all(s.as_bytes())
}
fn read_str(r: &mut impl Read) -> io::Result<String> {
let len = read_u16(r)? as usize;
let mut bytes = vec![0u8; len];
r.read_exact(&mut bytes)?;
Ok(String::from_utf8(bytes).unwrap())
}
fn read_u8(r: &mut impl Read) -> io::Result<u8> {
let mut b = [0u8; 1];
r.read_exact(&mut b)?;
Ok(b[0])
}
fn read_u16(r: &mut impl Read) -> io::Result<u16> {
let mut b = [0u8; 2];
r.read_exact(&mut b)?;
Ok(u16::from_le_bytes(b))
}
fn read_u32(r: &mut impl Read) -> io::Result<u32> {
let mut b = [0u8; 4];
r.read_exact(&mut b)?;
Ok(u32::from_le_bytes(b))
}
fn read_f64(r: &mut impl Read) -> io::Result<f64> {
let mut b = [0u8; 8];
r.read_exact(&mut b)?;
Ok(f64::from_le_bytes(b))
}
#[cfg(test)]
mod tests {
use super::*;
fn header() -> Header {
Header {
version: VERSION,
flame_enabled: true,
columns: vec![
Column {
name: "frame_ms".to_owned(),
unit: "ms".to_owned(),
},
Column {
name: "gpu_util".to_owned(),
unit: "%".to_owned(),
},
],
}
}
#[test]
fn header_round_trip() {
let mut buf = Vec::new();
write_header(&mut buf, &header()).unwrap();
let h = read_header(&mut io::Cursor::new(buf)).unwrap();
assert_eq!(h.version, VERSION);
assert!(h.flame_enabled);
assert_eq!(h.columns.len(), 2);
assert_eq!(h.columns[1].name, "gpu_util");
assert_eq!(h.columns[1].unit, "%");
}
#[test]
fn decl_and_frames_round_trip_and_stop_at_eof() {
let mut buf = Vec::new();
write_decl(&mut buf, 2, "loss", "nats").unwrap();
write_frame(
&mut buf,
&Record {
system: vec![16.6, 73.0],
user: vec![(2, 1.25)],
flame: Some(vec![1, 2, 3, 4]),
},
)
.unwrap();
write_frame(
&mut buf,
&Record {
system: vec![17.0, 0.0],
user: vec![],
flame: None,
},
)
.unwrap();
let mut at = 0;
let next = |at: &mut usize| {
let (consumed, rec) = try_read_record(&buf[*at..], 2).unwrap();
*at += consumed;
rec
};
let StreamRecord::Decl { id, name, unit } = next(&mut at) else {
panic!("expected a decl first");
};
assert_eq!((id, name.as_str(), unit.as_str()), (2, "loss", "nats"));
let StreamRecord::Frame(r0) = next(&mut at) else {
panic!("expected a frame");
};
assert_eq!(r0.system, vec![16.6, 73.0]);
assert_eq!(r0.user, vec![(2, 1.25)]);
assert_eq!(r0.flame, Some(vec![1, 2, 3, 4]));
let StreamRecord::Frame(r1) = next(&mut at) else {
panic!("expected a frame");
};
assert_eq!(r1.user, Vec::new());
assert_eq!(r1.flame, None);
assert!(try_read_record(&buf[at..], 2).is_none());
}
#[test]
fn truncated_trailing_record_is_dropped() {
let mut buf = Vec::new();
write_frame(
&mut buf,
&Record {
system: vec![1.0],
user: vec![],
flame: None,
},
)
.unwrap();
let full = buf.len();
write_frame(
&mut buf,
&Record {
system: vec![2.0],
user: vec![],
flame: None,
},
)
.unwrap();
buf.truncate(full + (buf.len() - full) / 2);
let (consumed, rec) = try_read_record(&buf, 1).unwrap();
let StreamRecord::Frame(r) = rec else {
panic!("expected a frame");
};
assert_eq!(r.system, vec![1.0]);
assert!(try_read_record(&buf[consumed..], 1).is_none());
}
#[test]
fn try_read_record_needs_a_complete_record() {
let mut buf = Vec::new();
write_frame(
&mut buf,
&Record {
system: vec![9.0],
user: vec![],
flame: None,
},
)
.unwrap();
for cut in 0..buf.len() {
assert!(try_read_record(&buf[..cut], 1).is_none());
}
let (consumed, rec) = try_read_record(&buf, 1).unwrap();
assert_eq!(consumed, buf.len());
let StreamRecord::Frame(r) = rec else {
panic!("expected a frame");
};
assert_eq!(r.system, vec![9.0]);
}
}