vor 0.2.0

Cross-platform performance instrumentation with an in-app egui panel and live system and GPU metrics.
Documentation
//! Read back a `.vor` capture: header then frames, one at a time.
//!
//! The reader buffers whole records, so it never consumes a partial
//! trailing record. [`Reader::next_frame`] returns `None` at a clean
//! EOF or when only a partial record has been written so far - which
//! makes the same call serve both a finished file (stop at `None`) and
//! one still being written (retry after `None` to tail new frames).

use std::fs::File;
use std::io::{self, BufReader, Read};
use std::path::Path;

use crate::record::format::{self, Column, Header, Record, StreamRecord};

/// Size of each read issued against the underlying source whenever the
/// record buffer runs out of complete records (64 KiB).
const CHUNK: usize = 1 << 16;

/// A metric column from a capture: name + unit, for labeling the
/// plotted row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricColumn {
    /// Metric name as written in the capture.
    pub name: String,
    /// Unit label as written in the capture; may be empty.
    pub unit: String,
}

/// One captured frame.
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    /// System-metric values, aligned to [`Reader::columns`].
    pub system: Vec<f64>,
    /// The frame's named user scalars (loss, lr, ...); units come from
    /// [`Reader::user_columns`]. Sparse: only metrics recorded this
    /// frame appear.
    pub user: Vec<(String, f64)>,
    /// Serialized puffin flame frame, present when flame capture was
    /// on for this frame. Decode with puffin's `FrameData::read_next`.
    pub flame: Option<Vec<u8>>,
}

/// Streaming reader over a `.vor` capture.
pub struct Reader<R: Read> {
    inner: R,
    buf: Vec<u8>,
    /// Start of the unconsumed bytes in `buf`; advanced as records are
    /// parsed, and dropped from the front only when refilling, so a
    /// long capture is not re-shifted per record.
    pos: usize,
    columns: Vec<MetricColumn>,
    user_columns: Vec<MetricColumn>,
    flame_enabled: bool,
}

impl Reader<BufReader<File>> {
    /// Open the capture at `path`, wrap it in a buffered reader, and
    /// parse its header.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or its header cannot be read.
    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
        let file = File::open(path.as_ref())?;
        Self::new(BufReader::new(file))
    }
}

impl<R: Read> Reader<R> {
    /// Wrap any reader positioned at the start of a `.vor` stream and
    /// consume its header.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading or parsing the header.
    pub fn new(mut inner: R) -> io::Result<Self> {
        let Header {
            version: _,
            flame_enabled,
            columns,
        } = format::read_header(&mut inner)?;
        let columns = columns
            .into_iter()
            .map(|Column { name, unit }| MetricColumn { name, unit })
            .collect();
        Ok(Self {
            inner,
            buf: Vec::new(),
            pos: 0,
            columns,
            user_columns: Vec::new(),
            flame_enabled,
        })
    }

    /// The system-metric columns each [`Frame::system`] aligns to.
    pub fn columns(&self) -> &[MetricColumn] {
        &self.columns
    }

    /// The user-metric columns declared so far. Grows as frames are
    /// read, since a metric is declared the first time it appears.
    pub fn user_columns(&self) -> &[MetricColumn] {
        &self.user_columns
    }

    /// Whether this capture may carry flame frames.
    pub const fn flame_enabled(&self) -> bool {
        self.flame_enabled
    }

    /// Pull the next frame, or `None` at EOF / a partial trailing
    /// record. Column declarations between frames are applied
    /// transparently ([`user_columns`](Self::user_columns)).
    ///
    /// On `None` from a still-growing capture, retry later: buffered
    /// partial bytes are kept, so a frame is emitted once fully
    /// written.
    ///
    /// # Errors
    ///
    /// - Any I/O error from the underlying source, except
    ///   [`io::ErrorKind::Interrupted`], which is retried.
    /// - [`io::ErrorKind::InvalidData`] when a column declaration arrives
    ///   out of order, or when a frame references a user-metric id that was
    ///   never declared. The offending record has already been consumed,
    ///   so a later call continues with the record after it.
    pub fn next_frame(&mut self) -> io::Result<Option<Frame>> {
        let n_system = self.columns.len();
        loop {
            while let Some((consumed, record)) =
                format::try_read_record(&self.buf[self.pos..], n_system)
            {
                self.pos += consumed;
                match record {
                    StreamRecord::Decl { id, name, unit } => {
                        // User ids are dense and assigned in order right after
                        // the system columns. Compare in `usize` so a large
                        // column count cannot wrap the expected id.
                        let expected = n_system + self.user_columns.len();
                        if usize::from(id) != expected {
                            return Err(Self::invalid_data(format!(
                                "out-of-order .vor column declaration: got id {}, expected {}",
                                id, expected
                            )));
                        }
                        self.user_columns.push(MetricColumn { name, unit });
                    }
                    StreamRecord::Frame(Record {
                        system,
                        user,
                        flame,
                    }) => {
                        let user = user
                            .into_iter()
                            .map(|(id, value)| self.name_of(id).map(|name| (name, value)))
                            .collect::<io::Result<Vec<_>>>()?;
                        return Ok(Some(Frame {
                            system,
                            user,
                            flame,
                        }));
                    }
                }
            }
            // No complete record left: pull more bytes. A zero-byte read
            // means EOF (clean or mid-record) - stop for now. Any partial
            // bytes stay buffered for the next call.
            if self.refill()? == 0 {
                return Ok(None);
            }
        }
    }

    /// Drop the already-parsed prefix of `buf`, then read up to [`CHUNK`]
    /// fresh bytes directly into the buffer's tail.
    ///
    /// Returns the number of bytes read; `0` means the source reported
    /// EOF. `Interrupted` reads are retried, as the [`Read`] contract asks.
    fn refill(&mut self) -> io::Result<usize> {
        self.buf.drain(..self.pos);
        self.pos = 0;
        let filled = self.buf.len();
        self.buf.resize(filled + CHUNK, 0);
        let read = loop {
            match self.inner.read(&mut self.buf[filled..]) {
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                other => break other,
            }
        };
        // Cut off the unused zero padding whether or not the read
        // succeeded, so only real stream bytes are ever parsed.
        let n = match &read {
            Ok(n) => *n,
            Err(_) => 0,
        };
        self.buf.truncate(filled + n);
        read
    }

    /// Resolve a frame's user-metric id to its declared name.
    ///
    /// User ids are assigned right after the system columns, so the id
    /// minus the system-column count indexes `user_columns`. An id outside
    /// that range (below the system columns, or not declared yet) is
    /// reported as corrupt data instead of panicking.
    fn name_of(&self, id: u16) -> io::Result<String> {
        usize::from(id)
            .checked_sub(self.columns.len())
            .and_then(|idx| self.user_columns.get(idx))
            .map(|column| column.name.clone())
            .ok_or_else(|| {
                Self::invalid_data(format!(
                    ".vor frame references undeclared user metric id {}",
                    id
                ))
            })
    }

    /// Build the error reported for a structurally invalid capture.
    fn invalid_data(msg: String) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, msg)
    }
}

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::record::format::{Column as FmtColumn, Header as FmtHeader, Record, VERSION};
    use crate::record::format::{write_decl, write_frame, write_header};

    /// Append one frame to `out`. The frame carries a single system value
    /// (`frame_ms`) and the user metric with id 1 (`loss`), with no flame
    /// payload.
    fn push_frame(out: &mut Vec<u8>, frame_ms: f64, loss: f64) {
        let record = Record {
            system: vec![frame_ms],
            user: vec![(1, loss)],
            flame: None,
        };
        write_frame(out, &record).unwrap();
    }

    /// Reading stops cleanly at a record cut in half and keeps what it
    /// has buffered. It then finishes that record once the missing bytes
    /// arrive - the same path used to tail a capture that is still growing.
    #[test]
    fn tails_a_partially_written_stream() {
        let header = FmtHeader {
            version: VERSION,
            flame_enabled: false,
            columns: vec![FmtColumn {
                name: "frame_ms".to_owned(),
                unit: "ms".to_owned(),
            }],
        };
        let mut bytes = Vec::new();
        write_header(&mut bytes, &header).unwrap();
        write_decl(&mut bytes, 1, "loss", "").unwrap();
        push_frame(&mut bytes, 16.0, 0.5);
        let first_frame_end = bytes.len();
        push_frame(&mut bytes, 17.0, 0.4);

        // Split inside the second frame. The head holds the header, the
        // declaration, the first frame and half of the second frame.
        let split = first_frame_end + (bytes.len() - first_frame_end) / 2;
        let (head, tail) = bytes.split_at(split);

        let mut reader = Reader::new(Cursor::new(head.to_vec())).unwrap();
        assert_eq!(reader.columns()[0].name, "frame_ms");

        let first = reader
            .next_frame()
            .unwrap()
            .expect("first frame is complete");
        assert_eq!(first.system, vec![16.0]);
        assert_eq!(first.user, vec![("loss".to_owned(), 0.5)]);
        assert!(
            reader.next_frame().unwrap().is_none(),
            "second frame is still partial"
        );

        // Swap in the rest of the stream as a fresh source and resume.
        reader.inner = Cursor::new(tail.to_vec());
        let second = reader
            .next_frame()
            .unwrap()
            .expect("second frame is now complete");
        assert_eq!(second.system, vec![17.0]);
        assert_eq!(second.user, vec![("loss".to_owned(), 0.4)]);
        assert!(reader.next_frame().unwrap().is_none());
    }
}