use std::fs::File;
use std::io::{self, BufReader, Read};
use std::path::Path;
use crate::record::format::{self, Column, Header, Record, StreamRecord};
const CHUNK: usize = 64 * 1024;
pub struct MetricColumn {
pub name: String,
pub unit: String,
}
pub struct Frame {
pub system: Vec<f64>,
pub user: Vec<(String, f64)>,
pub flame: Option<Vec<u8>>,
}
pub struct Reader<R: Read> {
inner: R,
buf: Vec<u8>,
pos: usize,
columns: Vec<MetricColumn>,
user_columns: Vec<MetricColumn>,
flame_enabled: bool,
}
impl Reader<BufReader<File>> {
pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
Self::new(BufReader::new(File::open(path)?))
}
}
impl<R: Read> Reader<R> {
pub fn new(mut inner: R) -> io::Result<Self> {
let Header {
version: _,
flame_enabled,
columns,
} = format::read_header(&mut inner)?;
let columns = columns
.into_iter()
.map(|Column { name, unit }| MetricColumn { name, unit })
.collect();
Ok(Self {
inner,
buf: Vec::new(),
pos: 0,
columns,
user_columns: Vec::new(),
flame_enabled,
})
}
pub fn columns(&self) -> &[MetricColumn] {
&self.columns
}
pub fn user_columns(&self) -> &[MetricColumn] {
&self.user_columns
}
pub const fn flame_enabled(&self) -> bool {
self.flame_enabled
}
pub fn next_frame(&mut self) -> io::Result<Option<Frame>> {
let n_system = self.columns.len();
loop {
while let Some((consumed, record)) =
format::try_read_record(&self.buf[self.pos..], n_system)
{
self.pos += consumed;
match record {
StreamRecord::Decl { id, name, unit } => {
let expected = (n_system + self.user_columns.len()) as u16;
assert_eq!(id, expected, "out-of-order .vor column declaration");
self.user_columns.push(MetricColumn { name, unit });
}
StreamRecord::Frame(Record {
system,
user,
flame,
}) => {
let user = user
.into_iter()
.map(|(id, value)| (self.name_of(id), value))
.collect();
return Ok(Some(Frame {
system,
user,
flame,
}));
}
}
}
self.buf.drain(..self.pos);
self.pos = 0;
let mut chunk = [0u8; CHUNK];
let n = self.inner.read(&mut chunk)?;
if n == 0 {
return Ok(None);
}
self.buf.extend_from_slice(&chunk[..n]);
}
}
fn name_of(&self, id: u16) -> String {
let idx = id as usize - self.columns.len();
self.user_columns[idx].name.clone()
}
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use crate::record::format::{Column as FmtColumn, Header as FmtHeader, Record, VERSION};
use crate::record::format::{write_decl, write_frame, write_header};
#[test]
fn tails_a_partially_written_stream() {
let mut full = Vec::new();
write_header(
&mut full,
&FmtHeader {
version: VERSION,
flame_enabled: false,
columns: vec![FmtColumn {
name: "frame_ms".to_owned(),
unit: "ms".to_owned(),
}],
},
)
.unwrap();
write_decl(&mut full, 1, "loss", "").unwrap();
write_frame(
&mut full,
&Record {
system: vec![16.0],
user: vec![(1, 0.5)],
flame: None,
},
)
.unwrap();
let after_first = full.len();
write_frame(
&mut full,
&Record {
system: vec![17.0],
user: vec![(1, 0.4)],
flame: None,
},
)
.unwrap();
let cut = after_first + (full.len() - after_first) / 2;
let mut reader = Reader::new(Cursor::new(full[..cut].to_vec())).unwrap();
assert_eq!(reader.columns()[0].name, "frame_ms");
let f0 = reader.next_frame().unwrap().unwrap();
assert_eq!(f0.system, vec![16.0]);
assert_eq!(f0.user, vec![("loss".to_owned(), 0.5)]);
assert!(reader.next_frame().unwrap().is_none());
reader.inner = Cursor::new(full[cut..].to_vec());
let f1 = reader.next_frame().unwrap().unwrap();
assert_eq!(f1.system, vec![17.0]);
assert_eq!(f1.user, vec![("loss".to_owned(), 0.4)]);
assert!(reader.next_frame().unwrap().is_none());
}
}