pub(crate) mod format;
pub mod read;
mod writer;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::mpsc::{Sender, channel, sync_channel};
use std::sync::{Mutex, Once, OnceLock};
use std::thread;
use puffin::GlobalFrameView;
use web_time::Instant;
use crate::metrics_acc::DrainedMetric;
use crate::record::format::{Column, Header, Record, VERSION};
use crate::record::writer::Msg;
use crate::system::{SYSTEM_COLUMNS, sample_now};
static RECORDER: OnceLock<Recorder> = OnceLock::new();
static ARMED: Once = Once::new();
struct FrameState {
last: Option<Instant>,
count: u64,
flames: u64,
user_ids: BTreeMap<String, u16>,
}
struct Recorder {
tx: Mutex<Sender<Msg>>,
state: Mutex<FrameState>,
view: Option<GlobalFrameView>,
flame_every: u64,
max_flames: Option<u64>,
}
impl Recorder {
fn new(target: PathBuf, flame: bool, flame_every: u64, max_flames: Option<u64>) -> Self {
assert!(flame_every >= 1);
crate::gpu::ensure_collector();
let columns = SYSTEM_COLUMNS
.iter()
.map(|c| Column {
name: c.name.to_owned(),
unit: c.unit.to_owned(),
})
.collect();
let header = Header {
version: VERSION,
flame_enabled: flame,
columns,
};
let view = flame.then(GlobalFrameView::default);
let (tx, rx) = channel();
thread::Builder::new()
.name("vor-record".into())
.spawn(move || writer::run(target, header, rx))
.unwrap();
Self {
tx: Mutex::new(tx),
state: Mutex::new(FrameState {
last: None,
count: 0,
flames: 0,
user_ids: BTreeMap::new(),
}),
view,
flame_every,
max_flames,
}
}
fn capture(&self) {
let drained = crate::metrics_acc::drain_metrics();
let mut st = self.state.lock().unwrap();
let sample = sample_now(&mut st.last);
let system = SYSTEM_COLUMNS.iter().map(|c| (c.get)(&sample)).collect();
let flame = self.grab_flame(&mut st);
st.count += 1;
let tx = self.tx.lock().unwrap();
let user = drained
.into_iter()
.map(|DrainedMetric { name, value, unit }| {
let id = self.column_id(&mut st, &tx, name, unit);
(id, value)
})
.collect();
tx.send(Msg::Frame(Record {
system,
user,
flame,
}))
.unwrap();
}
fn column_id(
&self,
st: &mut FrameState,
tx: &Sender<Msg>,
name: String,
unit: String,
) -> u16 {
if let Some(&id) = st.user_ids.get(&name) {
return id;
}
let id = (SYSTEM_COLUMNS.len() + st.user_ids.len()) as u16;
tx.send(Msg::Decl {
id,
name: name.clone(),
unit,
})
.unwrap();
st.user_ids.insert(name, id);
id
}
fn grab_flame(&self, st: &mut FrameState) -> Option<Vec<u8>> {
let view = self.view.as_ref()?;
if !st.count.is_multiple_of(self.flame_every) {
return None;
}
if self.max_flames.is_some_and(|max| st.flames >= max) {
return None;
}
let frame_view = view.lock();
let frame = frame_view.latest_frame()?;
let mut bytes = Vec::new();
frame
.write_into(Some(frame_view.scope_collection()), &mut bytes)
.unwrap();
st.flames += 1;
Some(bytes)
}
fn flush(&self) {
let (reply, done) = sync_channel(0);
self.tx.lock().unwrap().send(Msg::Flush(reply)).unwrap();
done.recv().unwrap();
}
}
pub(crate) fn arm() {
ARMED.call_once(|| {
let Some(target) = std::env::var_os("VOR_RECORD") else {
return;
};
let target = PathBuf::from(target);
assert!(
!target.to_string_lossy().starts_with("tcp://"),
"VOR_RECORD tcp:// transport not yet implemented"
);
let flame = env_flag("VOR_RECORD_FLAME");
let flame_every = env_u64("VOR_RECORD_EVERY").unwrap_or(1);
let max_flames = env_u64("VOR_RECORD_MAX_FRAMES");
RECORDER
.set(Recorder::new(target, flame, flame_every, max_flames))
.ok()
.unwrap();
});
}
pub(crate) fn on_frame_mark() {
if let Some(recorder) = RECORDER.get() {
recorder.capture();
}
}
pub(crate) fn flush() {
if let Some(recorder) = RECORDER.get() {
recorder.flush();
}
}
fn env_flag(key: &str) -> bool {
std::env::var_os(key).is_some_and(|v| v.to_string_lossy() == "1")
}
fn env_u64(key: &str) -> Option<u64> {
std::env::var_os(key).map(|v| {
v.to_string_lossy()
.parse()
.unwrap_or_else(|_| panic!("{key} must be a positive integer, got {v:?}"))
})
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use crate::record::format::{StreamRecord, read_header, try_read_record};
#[test]
fn recorder_round_trips_system_and_metrics() {
let _guard = crate::metrics_acc::TEST_GUARD.lock().unwrap();
let dir = std::env::temp_dir();
let path = dir.join("vor_recorder_test.vor");
let recorder = Recorder::new(path.clone(), false, 1, None);
crate::record_metric_unit("loss", "nats");
for step in 0..3 {
crate::record_metric("loss", 1.0 / (step as f64 + 1.0));
recorder.capture();
}
recorder.flush();
let bytes = std::fs::read(&path).unwrap();
let mut c = Cursor::new(&bytes);
let header = read_header(&mut c).unwrap();
assert_eq!(header.columns.len(), SYSTEM_COLUMNS.len());
assert_eq!(header.columns[0].name, "frame_ms");
assert!(!header.flame_enabled);
let loss_id = SYSTEM_COLUMNS.len() as u16;
let n = header.columns.len();
let mut at = c.position() as usize;
let mut decls = Vec::new();
let mut frames = Vec::new();
while let Some((consumed, rec)) = try_read_record(&bytes[at..], n) {
at += consumed;
match rec {
StreamRecord::Decl { id, name, unit } => decls.push((id, name, unit)),
StreamRecord::Frame(f) => frames.push(f),
}
}
assert_eq!(decls, vec![(loss_id, "loss".to_owned(), "nats".to_owned())]);
assert_eq!(frames.len(), 3);
for (step, f) in frames.iter().enumerate() {
assert_eq!(f.system.len(), n);
assert_eq!(f.user, vec![(loss_id, 1.0 / (step as f64 + 1.0))]);
assert!(f.flame.is_none());
}
std::fs::remove_file(&path).unwrap();
}
}