vor 0.2.1

Cross-platform performance instrumentation with an in-app egui panel and live system and GPU metrics.
Documentation
//! The `.vor` stream format: a small uncompressed header followed by
//! a sequence of independently lz4-compressed, length-delimited
//! records.
//!
//! ```text
//! [header] [u32 len][lz4 record] [u32 len][lz4 record] ...
//! ```
//!
//! Per-record (not whole-stream) compression is what makes the stream
//! tailable and crash-safe: a reader repeatedly reads a length prefix,
//! then reads and decompresses that many bytes, stopping at EOF. A job
//! killed mid-write leaves a truncated final record, which the reader
//! drops while keeping every earlier record intact.
//!
//! Every metric is a column - a `(name, unit)` pair with a stable id,
//! declared once before any value references it. System columns are
//! declared in the header (their ids are `0..n_system` in order, known
//! at startup). User-metric columns
//! ([`record_metric`](crate::record_metric)) are declared by a
//! [`Decl`](StreamRecord::Decl) record the first time the metric
//! appears, taking ids from `n_system` up. Frame records then carry
//! values by id, so a name or unit is never repeated per frame.

use std::io::{self, Read, Write};

/// Four-byte signature written at the very start of every `.vor` stream.
const MAGIC: [u8; 4] = [b'V', b'O', b'R', b'1'];

/// Bumped on any incompatible change to the header or record layout.
pub(crate) const VERSION: u16 = 2;

/// First byte of a frame record's decompressed body.
const TAG_FRAME: u8 = 0;
/// First byte of a user-column declaration record's decompressed body.
const TAG_DECL: u8 = 1;

/// One metric column: the name and unit the viewer displays for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Column {
    pub name: String,
    pub unit: String,
}

/// Stream header: format version, whether records may carry flame
/// frames, and the system-metric columns (ids `0..len`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Header {
    /// Format version; the reader only accepts streams whose version
    /// equals the crate's `VERSION`.
    pub version: u16,
    /// Whether frame records may carry flame frames.
    pub flame_enabled: bool,
    /// System-metric columns; a column's id is its index in this list.
    pub columns: Vec<Column>,
}

/// One frame's record: system values (ids `0..n_system`, positional),
/// the frame's user scalars by column id, and an optional puffin flame
/// frame.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// One value per header column, in column order (no ids on the wire).
    pub system: Vec<f64>,
    /// `(column id, value)` pairs for user metrics; each id must have
    /// been declared by an earlier `Decl` record.
    pub user: Vec<(u16, f64)>,
    /// Serialized flame frame bytes. `Some` with an empty buffer is
    /// encoded as length 0, so it reads back as `None`.
    pub flame: Option<Vec<u8>>,
}

/// One decoded stream record: either a user-column declaration or a
/// frame.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum StreamRecord {
    /// Introduces user column `id` with its display name and unit.
    Decl { id: u16, name: String, unit: String },
    /// A frame's values.
    Frame(Record),
}

pub(crate) fn write_header(w: &mut impl Write, header: &Header) -> io::Result<()> {
    let Header {
        version,
        flame_enabled,
        columns,
    } = header;
    w.write_all(&MAGIC)?;
    w.write_all(&version.to_le_bytes())?;
    w.write_all(&[u8::from(*flame_enabled)])?;
    w.write_all(&(columns.len() as u16).to_le_bytes())?;
    for Column { name, unit } in columns {
        write_str(w, name)?;
        write_str(w, unit)?;
    }
    Ok(())
}

/// Read and validate the uncompressed stream header written by
/// [`write_header`].
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidData`] if the stream does not start
/// with the `.vor` magic or its version is not [`VERSION`]. The file is
/// untrusted input, so these are reported as errors rather than panics.
/// Short input and other read failures propagate from `r`.
pub(crate) fn read_header(r: &mut impl Read) -> io::Result<Header> {
    let mut magic = [0u8; 4];
    r.read_exact(&mut magic)?;
    if magic != MAGIC {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "not a .vor stream"));
    }
    let version = read_u16(r)?;
    if version != VERSION {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unsupported .vor version {version} (this build reads {VERSION})"),
        ));
    }
    let flame_enabled = read_u8(r)? != 0;
    let n = read_u16(r)? as usize;
    let columns = (0..n)
        .map(|_| {
            Ok(Column {
                name: read_str(r)?,
                unit: read_str(r)?,
            })
        })
        .collect::<io::Result<_>>()?;
    Ok(Header {
        version,
        flame_enabled,
        columns,
    })
}

/// Append a declaration record that introduces user column `id` with
/// its `name` and `unit`. It is emitted once, before the first frame
/// that carries a value for `id`.
pub(crate) fn write_decl(w: &mut impl Write, id: u16, name: &str, unit: &str) -> io::Result<()> {
    // Body layout: tag byte, u16 id, then two u16-length-prefixed strings.
    let mut body = Vec::with_capacity(1 + 2 + 2 + name.len() + 2 + unit.len());
    body.push(TAG_DECL);
    body.extend_from_slice(&id.to_le_bytes());
    for text in [name, unit] {
        write_str(&mut body, text)?;
    }
    write_blob(w, &body)
}

/// Append one frame record.
pub(crate) fn write_frame(w: &mut impl Write, record: &Record) -> io::Result<()> {
    let Record {
        system,
        user,
        flame,
    } = record;
    let mut body = vec![TAG_FRAME];
    for v in system {
        body.extend_from_slice(&v.to_le_bytes());
    }
    body.extend_from_slice(&(user.len() as u16).to_le_bytes());
    for (id, value) in user {
        body.extend_from_slice(&id.to_le_bytes());
        body.extend_from_slice(&value.to_le_bytes());
    }
    match flame {
        Some(bytes) => {
            body.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
            body.extend_from_slice(bytes);
        }
        None => body.extend_from_slice(&0u32.to_le_bytes()),
    }
    write_blob(w, &body)
}

/// Compress one record body and write it length-delimited.
fn write_blob(w: &mut impl Write, body: &[u8]) -> io::Result<()> {
    let blob = lz4_flex::compress_prepend_size(body);
    w.write_all(&(blob.len() as u32).to_le_bytes())?;
    w.write_all(&blob)
}

/// Try to split one complete record off the front of `buf`.
///
/// On success, returns `Some((consumed, record))`. `consumed` counts the
/// 4-byte length prefix plus the compressed blob.
///
/// Returns `None` while `buf` still ends in a partial record, for
/// example a trailing record still being written while the stream is
/// tailed. The caller keeps those bytes and retries once more arrive.
pub(crate) fn try_read_record(buf: &[u8], n_system: usize) -> Option<(usize, StreamRecord)> {
    let prefix: [u8; 4] = buf.get(..4)?.try_into().ok()?;
    let consumed = 4 + u32::from_le_bytes(prefix) as usize;
    let blob = buf.get(4..consumed)?;
    Some((consumed, decode_blob(blob, n_system)))
}

/// Decode one length-stripped, lz4-compressed record body.
///
/// # Panics
///
/// Panics with a `corrupt .vor record: ...` message if the blob fails
/// to decompress, carries an unknown tag, or has a malformed body. The
/// panic remains only because [`try_read_record`]'s signature has no
/// error channel. [`try_decode_blob`] is the non-panicking core.
fn decode_blob(blob: &[u8], n_system: usize) -> StreamRecord {
    try_decode_blob(blob, n_system).unwrap_or_else(|e| panic!("corrupt .vor record: {e}"))
}

/// Fallible core of [`decode_blob`]: decompress `blob`, then parse the
/// body according to its leading tag byte.
///
/// Decompression failures and unknown tags are returned as
/// [`io::ErrorKind::InvalidData`]. Errors from the field readers (e.g.
/// `UnexpectedEof` on a short body) are propagated unchanged.
fn try_decode_blob(blob: &[u8], n_system: usize) -> io::Result<StreamRecord> {
    let body = lz4_flex::decompress_size_prepended(blob).map_err(|e| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            format!("lz4 decompression failed: {e:?}"),
        )
    })?;
    let mut c = io::Cursor::new(body);
    match read_u8(&mut c)? {
        TAG_FRAME => Ok(StreamRecord::Frame(read_frame_body(&mut c, n_system)?)),
        TAG_DECL => Ok(StreamRecord::Decl {
            id: read_u16(&mut c)?,
            name: read_str(&mut c)?,
            unit: read_str(&mut c)?,
        }),
        other => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unknown .vor record tag {other}"),
        )),
    }
}

/// Decode a frame body (everything after the tag byte) as laid out by
/// [`write_frame`]:
/// - `n_system` positional `f64`s;
/// - a `u16` count of `(u16 id, f64 value)` pairs;
/// - a `u32`-length-prefixed flame frame, where length 0 means no flame.
///
/// # Errors
///
/// Propagates read errors, including `UnexpectedEof` when the body ends
/// early. A flame payload shorter than its declared length is also
/// reported as `UnexpectedEof`, instead of panicking as it used to.
fn read_frame_body(c: &mut impl Read, n_system: usize) -> io::Result<Record> {
    let system = (0..n_system).map(|_| read_f64(c)).collect::<io::Result<_>>()?;
    let n_user = read_u16(c)? as usize;
    let user = (0..n_user)
        .map(|_| Ok((read_u16(c)?, read_f64(c)?)))
        .collect::<io::Result<_>>()?;
    let flame = match read_u32(c)? {
        0 => None,
        flame_len => {
            // Read through `take` instead of pre-allocating `flame_len`
            // bytes. A corrupt length can then only grow the buffer as far
            // as the data actually present.
            let mut bytes = Vec::new();
            c.by_ref().take(u64::from(flame_len)).read_to_end(&mut bytes)?;
            if bytes.len() as u64 != u64::from(flame_len) {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "truncated .vor flame frame",
                ));
            }
            Some(bytes)
        }
    };
    Ok(Record {
        system,
        user,
        flame,
    })
}

/// Write `s` as a `u16` little-endian byte length followed by its UTF-8
/// bytes.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`], writing nothing, if `s` is
/// longer than `u16::MAX` bytes. An `as u16` cast would silently wrap the
/// prefix while still writing every byte, desynchronizing any reader.
/// Otherwise it propagates errors from `w`.
fn write_str(w: &mut impl Write, s: &str) -> io::Result<()> {
    let len = u16::try_from(s.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(".vor string of {} bytes exceeds the u16 length prefix", s.len()),
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(s.as_bytes())
}

/// Read a string written by [`write_str`]: a `u16` little-endian byte
/// length followed by that many UTF-8 bytes.
///
/// # Errors
///
/// Propagates read errors (`UnexpectedEof` on short input). Returns
/// [`io::ErrorKind::InvalidData`] if the bytes are not valid UTF-8,
/// rather than panicking on a corrupt stream.
fn read_str(r: &mut impl Read) -> io::Result<String> {
    let len = read_u16(r)? as usize;
    let mut bytes = vec![0u8; len];
    r.read_exact(&mut bytes)?;
    String::from_utf8(bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Read exactly one byte from `r`, failing with `UnexpectedEof` at EOF.
fn read_u8(r: &mut impl Read) -> io::Result<u8> {
    let mut byte = [0u8; 1];
    r.read_exact(&mut byte).map(|()| byte[0])
}

/// Read a little-endian `u16` from `r`, failing with `UnexpectedEof` if
/// fewer than two bytes remain.
fn read_u16(r: &mut impl Read) -> io::Result<u16> {
    let mut le = [0u8; 2];
    r.read_exact(&mut le).map(|()| u16::from_le_bytes(le))
}

/// Read a little-endian `u32` from `r`, failing with `UnexpectedEof` if
/// fewer than four bytes remain.
fn read_u32(r: &mut impl Read) -> io::Result<u32> {
    let mut le = [0u8; 4];
    r.read_exact(&mut le).map(|()| u32::from_le_bytes(le))
}

/// Read a little-endian IEEE-754 `f64` from `r`, failing with
/// `UnexpectedEof` if fewer than eight bytes remain.
fn read_f64(r: &mut impl Read) -> io::Result<f64> {
    let mut le = [0u8; 8];
    r.read_exact(&mut le).map(|()| f64::from_le_bytes(le))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A two-column header (`frame_ms` in ms, `gpu_util` in %) with
    /// flame frames enabled.
    fn header() -> Header {
        let column = |name: &str, unit: &str| Column {
            name: name.to_owned(),
            unit: unit.to_owned(),
        };
        Header {
            version: VERSION,
            flame_enabled: true,
            columns: vec![column("frame_ms", "ms"), column("gpu_util", "%")],
        }
    }

    /// Build a frame record from its three parts.
    fn frame(system: Vec<f64>, user: Vec<(u16, f64)>, flame: Option<Vec<u8>>) -> Record {
        Record {
            system,
            user,
            flame,
        }
    }

    /// Append each record to `out` with `write_frame`.
    fn push_frames(out: &mut Vec<u8>, records: &[Record]) {
        for record in records {
            write_frame(out, record).unwrap();
        }
    }

    /// Parse every complete record from the front of `buf`, in order,
    /// stopping at the first point where `try_read_record` yields `None`.
    fn read_all(buf: &[u8], n_system: usize) -> Vec<StreamRecord> {
        let mut records = Vec::new();
        let mut rest = buf;
        while let Some((consumed, record)) = try_read_record(rest, n_system) {
            records.push(record);
            rest = &rest[consumed..];
        }
        records
    }

    /// Unwrap a `StreamRecord::Frame`, panicking on anything else.
    fn into_frame(record: StreamRecord) -> Record {
        match record {
            StreamRecord::Frame(r) => r,
            StreamRecord::Decl { .. } => panic!("expected a frame"),
        }
    }

    #[test]
    fn header_round_trip() {
        let mut encoded = Vec::new();
        write_header(&mut encoded, &header()).unwrap();
        let decoded = read_header(&mut io::Cursor::new(encoded)).unwrap();

        assert_eq!(decoded.version, VERSION);
        assert!(decoded.flame_enabled);
        assert_eq!(decoded.columns.len(), 2);
        let gpu = &decoded.columns[1];
        assert_eq!(gpu.name, "gpu_util");
        assert_eq!(gpu.unit, "%");
    }

    #[test]
    fn decl_and_frames_round_trip_and_stop_at_eof() {
        let mut buf = Vec::new();
        write_decl(&mut buf, 2, "loss", "nats").unwrap();
        push_frames(
            &mut buf,
            &[
                frame(vec![16.6, 73.0], vec![(2, 1.25)], Some(vec![1, 2, 3, 4])),
                frame(vec![17.0, 0.0], vec![], None),
            ],
        );

        // Exactly three complete records: the reader stops at EOF.
        let records = read_all(&buf, 2);
        assert_eq!(records.len(), 3);
        let mut records = records.into_iter();

        let Some(StreamRecord::Decl { id, name, unit }) = records.next() else {
            panic!("expected a decl first");
        };
        assert_eq!((id, name.as_str(), unit.as_str()), (2, "loss", "nats"));

        let first = into_frame(records.next().unwrap());
        assert_eq!(first.system, vec![16.6, 73.0]);
        assert_eq!(first.user, vec![(2, 1.25)]);
        assert_eq!(first.flame, Some(vec![1, 2, 3, 4]));

        let second = into_frame(records.next().unwrap());
        assert_eq!(second.user, Vec::new());
        assert_eq!(second.flame, None);
    }

    #[test]
    fn truncated_trailing_record_is_dropped() {
        let mut buf = Vec::new();
        push_frames(&mut buf, &[frame(vec![1.0], vec![], None)]);
        let intact = buf.len();
        push_frames(&mut buf, &[frame(vec![2.0], vec![], None)]);

        // Keep only the front half of the second record's bytes.
        let second_len = buf.len() - intact;
        buf.truncate(intact + second_len / 2);

        let records = read_all(&buf, 1);
        assert_eq!(records.len(), 1);
        let survivor = into_frame(records.into_iter().next().unwrap());
        assert_eq!(survivor.system, vec![1.0]);
    }

    #[test]
    fn try_read_record_needs_a_complete_record() {
        let mut buf = Vec::new();
        push_frames(&mut buf, &[frame(vec![9.0], vec![], None)]);

        // No proper prefix parses; the caller keeps those bytes for later.
        for prefix in (0..buf.len()).map(|cut| &buf[..cut]) {
            assert!(
                try_read_record(prefix, 1).is_none(),
                "a {}-byte prefix parsed as a record",
                prefix.len()
            );
        }

        let (consumed, record) = try_read_record(&buf, 1).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(into_frame(record).system, vec![9.0]);
    }
}