vor 0.2.0

Cross-platform performance instrumentation with an in-app egui panel and live system and GPU metrics.
Documentation
//! Headless capture: write each [`frame_mark`](crate::frame_mark) as a
//! record on an append-only `.vor` stream, armed by the `VOR_RECORD`
//! environment variable. Decoupled from `viz` so an ML job records
//! with no panel, egui, or render loop.
//!
//! Environment:
//! - `VOR_RECORD` - output path (`/scratch/run.vor`). Unset = no
//!   recording. A `tcp://host:port` value is reserved for a later
//!   transport and rejected for now.
//! - `VOR_RECORD_FLAME=1` - also capture puffin flame frames (default
//!   off: metrics only, tens of bytes per step).
//! - `VOR_RECORD_EVERY=N` - capture a flame frame only on every Nth
//!   step (default 1, i.e. every step; N must be at least 1).
//! - `VOR_RECORD_MAX_FRAMES=N` - stop capturing flame frames after N
//!   of them (metrics keep flowing). Default: no cap.

pub(crate) mod format;
pub mod read;
mod writer;

use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::mpsc::{Sender, channel, sync_channel};
use std::sync::{Mutex, Once, OnceLock};
use std::thread;

use puffin::GlobalFrameView;
use web_time::Instant;

use crate::metrics_acc::DrainedMetric;
use crate::record::format::{Column, Header, Record, VERSION};
use crate::record::writer::Msg;
use crate::system::{SYSTEM_COLUMNS, sample_now};

/// The armed recorder. Set once by [`arm`] when `VOR_RECORD` is set and
/// left empty otherwise, so the per-frame hook is a cheap no-op.
static RECORDER: OnceLock<Recorder> = OnceLock::new();
/// Ensures [`arm`] reads the environment and builds the recorder at most
/// once per process.
static ARMED: Once = Once::new();

/// Per-frame mutable state, guarded so [`frame_mark`](crate::frame_mark)
/// stays correct even if called off more than one thread.
struct FrameState {
    /// Handed to `sample_now` on every capture. NOTE(review): presumably
    /// the previous frame's timestamp, from which the frame time is
    /// derived; confirm in `system::sample_now`.
    last: Option<Instant>,
    /// Frames captured so far. Read before it is incremented to select
    /// flame steps for `VOR_RECORD_EVERY`.
    count: u64,
    /// Flame frames captured so far. Compared against the
    /// `VOR_RECORD_MAX_FRAMES` cap.
    flames: u64,
    /// User-metric name to column id. Ids start past the system
    /// columns; a name is declared to the stream once, the first frame
    /// it appears.
    user_ids: BTreeMap<String, u16>,
}

/// Headless recorder: samples each frame on the calling thread and hands
/// the resulting messages to the dedicated `vor-record` writer thread
/// over a channel.
struct Recorder {
    // Wraps the channel so the process-global recorder can hand records
    // to the writer from any frame_mark thread.
    // NOTE(review): `std::sync::mpsc::Sender` has been `Sync` since Rust
    // 1.72 (this crate already needs 1.87+ for `u64::is_multiple_of`), so
    // this `Mutex` is probably no longer required. Decl-before-Frame
    // ordering comes from holding `state` in `capture`. Confirm before
    // removing it.
    tx: Mutex<Sender<Msg>>,
    /// Frame counters and the user-metric id table, shared by every
    /// calling thread.
    state: Mutex<FrameState>,
    /// `Some` only when flame capture is on; doubles as the puffin
    /// frame source. Absent in metrics-only mode, so no frame
    /// retention overhead.
    view: Option<GlobalFrameView>,
    /// Capture a flame frame on one step in this many
    /// (`VOR_RECORD_EVERY`); always at least 1.
    flame_every: u64,
    /// Stop capturing flame frames after this many
    /// (`VOR_RECORD_MAX_FRAMES`); `None` means no cap.
    max_flames: Option<u64>,
}

impl Recorder {
    /// Build a recorder streaming to `target` and spawn its `vor-record`
    /// writer thread.
    ///
    /// The stream header lists only the system columns. User metrics are
    /// declared on the stream as they first appear (see
    /// [`column_id`](Self::column_id)). A puffin [`GlobalFrameView`] is
    /// created only when `flame` is set, so metrics-only mode retains no
    /// frames.
    ///
    /// # Panics
    /// If `flame_every` is 0, or the writer thread cannot be spawned.
    fn new(target: PathBuf, flame: bool, flame_every: u64, max_flames: Option<u64>) -> Self {
        assert!(flame_every >= 1, "flame_every must be at least 1");
        // NOTE(review): presumably starts the background GPU sampler that
        // `sample_now` reads from; confirm in `crate::gpu`.
        crate::gpu::ensure_collector();
        let columns = SYSTEM_COLUMNS
            .iter()
            .map(|c| Column {
                name: c.name.to_owned(),
                unit: c.unit.to_owned(),
            })
            .collect();
        let header = Header {
            version: VERSION,
            flame_enabled: flame,
            columns,
        };
        let view = flame.then(GlobalFrameView::default);
        let (tx, rx) = channel();
        thread::Builder::new()
            .name("vor-record".into())
            .spawn(move || writer::run(target, header, rx))
            .expect("failed to spawn the vor-record writer thread");
        Self {
            tx: Mutex::new(tx),
            state: Mutex::new(FrameState {
                last: None,
                count: 0,
                flames: 0,
                user_ids: BTreeMap::new(),
            }),
            view,
            flame_every,
            max_flames,
        }
    }

    /// Snapshot one frame and hand it to the writer thread.
    ///
    /// A frame holds a system sample, the named scalars returned by
    /// `drain_metrics`, and an optional flame frame. A user metric seen
    /// for the first time is declared (id + name + unit) before the frame
    /// that references its id.
    ///
    /// # Panics
    /// If the writer thread has exited or a lock is poisoned.
    fn capture(&self) {
        let drained = crate::metrics_acc::drain_metrics();
        let mut st = self.state.lock().unwrap();
        let sample = sample_now(&mut st.last);
        let system = SYSTEM_COLUMNS.iter().map(|c| (c.get)(&sample)).collect();
        let flame = self.grab_flame(&mut st);
        st.count += 1;

        // `st` stays locked through both sends. That way a metric's Decl
        // and every frame using its id reach the writer in order, whichever
        // thread calls frame_mark.
        let tx = self.tx.lock().unwrap();
        let user = drained
            .into_iter()
            .map(|DrainedMetric { name, value, unit }| {
                let id = self.column_id(&mut st, &tx, name, unit);
                (id, value)
            })
            .collect();
        Self::send(
            &tx,
            Msg::Frame(Record {
                system,
                user,
                flame,
            }),
        );
    }

    /// The column id for a user metric, declaring it to the stream the
    /// first time it appears. Ids follow the system columns.
    ///
    /// # Panics
    /// If more distinct metrics appear than `u16` column ids can address,
    /// or the writer thread has exited.
    fn column_id(
        &self,
        st: &mut FrameState,
        tx: &Sender<Msg>,
        name: String,
        unit: String,
    ) -> u16 {
        if let Some(&id) = st.user_ids.get(&name) {
            return id;
        }
        let next = SYSTEM_COLUMNS.len() + st.user_ids.len();
        // Column ids are `u16`. A bare `as` cast would wrap past 65535 and
        // silently reuse an existing column's id, corrupting the stream.
        assert!(
            next <= usize::from(u16::MAX),
            "vor: too many distinct user metrics for u16 column ids"
        );
        let id = next as u16;
        Self::send(
            tx,
            Msg::Decl {
                id,
                name: name.clone(),
                unit,
            },
        );
        st.user_ids.insert(name, id);
        id
    }

    /// The just-closed puffin frame serialized to bytes, when flame
    /// capture is on and this step is selected by `VOR_RECORD_EVERY`
    /// and under the `VOR_RECORD_MAX_FRAMES` cap.
    fn grab_flame(&self, st: &mut FrameState) -> Option<Vec<u8>> {
        let view = self.view.as_ref()?;
        // `count` is read before `capture` increments it, so steps
        // 0, N, 2N, ... are the ones selected.
        if !st.count.is_multiple_of(self.flame_every) {
            return None;
        }
        if self.max_flames.is_some_and(|max| st.flames >= max) {
            return None;
        }
        let frame_view = view.lock();
        let frame = frame_view.latest_frame()?;
        let mut bytes = Vec::new();
        frame
            .write_into(Some(frame_view.scope_collection()), &mut bytes)
            .expect("serializing a puffin frame into memory failed");
        st.flames += 1;
        Some(bytes)
    }

    /// Ask the writer thread to flush and block until it replies.
    ///
    /// # Panics
    /// If the writer thread has exited.
    fn flush(&self) {
        let (reply, done) = sync_channel(0);
        // The `tx` guard is a temporary, released before waiting below.
        Self::send(&self.tx.lock().unwrap(), Msg::Flush(reply));
        done.recv()
            .expect("vor-record writer thread exited before acknowledging flush");
    }

    /// Hand `msg` to the writer thread.
    ///
    /// # Panics
    /// If the writer thread has exited (its receiver is gone).
    fn send(tx: &Sender<Msg>, msg: Msg) {
        tx.send(msg)
            .expect("vor-record writer thread exited; recording cannot continue");
    }
}

/// Arm the recorder from the environment. Called by
/// [`enable`](crate::enable); a no-op unless `VOR_RECORD` is set to a
/// non-empty value. Only the first call reads the environment.
///
/// # Panics
/// If `VOR_RECORD` uses the reserved `tcp://` scheme, a numeric variable
/// does not parse, or `VOR_RECORD_EVERY` is 0.
pub(crate) fn arm() {
    ARMED.call_once(|| {
        // `VOR_RECORD=` (set but empty) means "off", just like unset. It
        // must not be treated as an empty output path.
        let Some(target) = std::env::var_os("VOR_RECORD").filter(|v| !v.is_empty()) else {
            return;
        };
        let target = PathBuf::from(target);
        assert!(
            !target.to_string_lossy().starts_with("tcp://"),
            "VOR_RECORD tcp:// transport not yet implemented"
        );
        let flame = env_flag("VOR_RECORD_FLAME");
        let flame_every = env_u64("VOR_RECORD_EVERY").unwrap_or(1);
        // Report the offending variable by name instead of tripping the
        // generic precondition inside `Recorder::new`.
        assert!(flame_every >= 1, "VOR_RECORD_EVERY must be at least 1, got 0");
        let max_flames = env_u64("VOR_RECORD_MAX_FRAMES");
        let recorder = Recorder::new(target, flame, flame_every, max_flames);
        // `Recorder` has no `Debug`, so `set(..).expect(..)` is unavailable.
        if RECORDER.set(recorder).is_err() {
            unreachable!("call_once runs this closure once, so RECORDER is still unset");
        }
    });
}

/// Record the frame that [`frame_mark`](crate::frame_mark) just closed.
/// Returns immediately, doing nothing, when no recorder was armed.
pub(crate) fn on_frame_mark() {
    let Some(active) = RECORDER.get() else {
        return;
    };
    active.capture();
}

/// Wait until every frame captured so far is on disk; does nothing when
/// not recording. Call before exit so a tailing or post-mortem reader
/// sees the run's final frames.
pub(crate) fn flush() {
    let Some(active) = RECORDER.get() else {
        return;
    };
    active.flush();
}

/// True only when the variable `key` is set to exactly `1`. Any other
/// value, or an unset variable, reads as false.
fn env_flag(key: &str) -> bool {
    match std::env::var_os(key) {
        Some(raw) => raw == "1",
        None => false,
    }
}

/// Read the environment variable `key` as a `u64`.
///
/// Returns `None` when the variable is unset. Surrounding whitespace is
/// ignored, so `" 4\n"` reads as 4. Zero is accepted; callers that need
/// a positive value check it themselves.
///
/// # Panics
/// If the variable is set but is not a non-negative integer that fits
/// in a `u64`.
fn env_u64(key: &str) -> Option<u64> {
    let raw = std::env::var_os(key)?;
    let parsed = raw.to_string_lossy().trim().parse::<u64>();
    Some(parsed.unwrap_or_else(|_| panic!("{key} must be a non-negative integer, got {raw:?}")))
}

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::record::format::{StreamRecord, read_header, try_read_record};

    #[test]
    fn recorder_round_trips_system_and_metrics() {
        // Serialize against the other global-accumulator test so the
        // drained metric table is ours alone for the assertion.
        let _guard = crate::metrics_acc::TEST_GUARD.lock().unwrap();

        // The file name is per process, so concurrent test runs on one
        // machine never share a file. Any stale file is removed first
        // because the cleanup at the end only runs when every assertion
        // passes. NOTE(review): this matters if the writer appends to an
        // existing file rather than truncating it; confirm in `writer`.
        let path = std::env::temp_dir()
            .join(format!("vor_recorder_test_{}.vor", std::process::id()));
        let _ = std::fs::remove_file(&path);
        let recorder = Recorder::new(path.clone(), false, 1, None);

        crate::record_metric_unit("loss", "nats");
        for step in 0..3 {
            crate::record_metric("loss", 1.0 / (step as f64 + 1.0));
            recorder.capture();
        }
        recorder.flush();

        let bytes = std::fs::read(&path).unwrap();
        let mut c = Cursor::new(&bytes);
        let header = read_header(&mut c).unwrap();
        assert_eq!(header.columns.len(), SYSTEM_COLUMNS.len());
        assert_eq!(header.columns[0].name, "frame_ms");
        assert!(!header.flame_enabled);

        // loss takes the first id past the system columns, declared once.
        let loss_id = SYSTEM_COLUMNS.len() as u16;
        let n = header.columns.len();
        let mut at = c.position() as usize;
        let mut decls = Vec::new();
        let mut frames = Vec::new();
        while let Some((consumed, rec)) = try_read_record(&bytes[at..], n) {
            at += consumed;
            match rec {
                StreamRecord::Decl { id, name, unit } => decls.push((id, name, unit)),
                StreamRecord::Frame(f) => frames.push(f),
            }
        }
        assert_eq!(decls, vec![(loss_id, "loss".to_owned(), "nats".to_owned())]);
        assert_eq!(frames.len(), 3);
        for (step, f) in frames.iter().enumerate() {
            assert_eq!(f.system.len(), n);
            assert_eq!(f.user, vec![(loss_id, 1.0 / (step as f64 + 1.0))]);
            assert!(f.flame.is_none());
        }
        std::fs::remove_file(&path).unwrap();
    }
}