use binrw::BinRead;
use crate::{
parse_record,
records::{Footer, Record},
sans_io::linear_reader::{LinearReadEvent, LinearReader, LinearReaderOptions},
McapError, McapResult, Summary, MAGIC,
};
use std::io::SeekFrom;
/// Number of bytes at the very end of the file that this reader loads to find the summary:
/// 1 (record opcode) + 8 (bytes skipped between the opcode and the body; presumably the
/// little-endian record length) + 8 + 8 + 4 (the 20-byte body parsed as `Footer`)
/// + 8 (closing magic).
const FOOTER_RECORD_AND_END_MAGIC: usize = 1 + 8 + 8 + 8 + 4 + 8;
/// An action that [`SummaryReader`] asks its caller to perform on the underlying stream.
///
/// The reader never performs I/O itself; it yields these events from
/// `SummaryReader::next_event` and expects the outcome to be reported back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SummaryReadEvent {
    /// Read up to this many bytes into the slice returned by `SummaryReader::insert`, then
    /// report the number of bytes actually read with `SummaryReader::notify_read`.
    ReadRequest(usize),
    /// Seek the stream as described, then report the resulting absolute position with
    /// `SummaryReader::notify_seeked`.
    SeekRequest(std::io::SeekFrom),
}
/// Internal state machine of `SummaryReader`, advanced by `next_event_inner` and rewound by
/// `notify_seeked` when the caller reports an unexpected position.
#[derive(Default)]
enum State {
/// Initial state: the stream still has to be positioned at the start of the footer record.
#[default]
SeekingToFooter,
/// The footer record and closing magic are being collected into `footer_buf`.
ReadingFooter {
/// Number of footer bytes the caller has confirmed through `notify_read` so far.
loaded_bytes: usize,
},
/// The footer was parsed and named a non-zero summary offset; the stream has to be moved there.
SeekingToSummary {
/// Absolute offset taken from `Footer::summary_start`.
summary_start: u64,
},
/// Records are being pulled from the summary section through a `LinearReader`.
ReadingSummary {
/// Kept so `notify_seeked` can fall back to `SeekingToSummary` if the position changes.
summary_start: u64,
/// Sans-io reader that yields the records read from `summary_start` onwards.
reader: Box<LinearReader>,
/// Collects schema and channel records; its `schemas` and `channels` are copied into the
/// summary once `reader` reports that it is finished.
channeler: crate::read::ChannelAccumulator<'static>,
},
}
/// Sans-io reader that locates the footer at the end of an MCAP file and, when the footer
/// names a summary section, loads that section into a `Summary`.
///
/// The caller drives it: `next_event` yields read/seek requests, `insert` hands out the buffer
/// to read into, and `notify_read` / `notify_seeked` report what actually happened.
#[derive(Default)]
pub struct SummaryReader {
/// Current stream position as reported by the caller (`notify_read` adds to it,
/// `notify_seeked` overwrites it).
pos: u64,
/// Buffer the footer record and closing magic are read into.
footer_buf: Vec<u8>,
/// File size, either supplied through the options or inferred in `notify_seeked` from the
/// first reported seek position.
file_size: Option<u64>,
/// Current step of the footer/summary state machine.
state: State,
/// Summary being accumulated from the records of the summary section.
summary: crate::Summary,
/// Set once a footer with a non-zero `summary_start` was parsed; decides what `finish` returns.
summary_present: bool,
/// True when the most recent `notify_read` reported zero bytes.
at_eof: bool,
/// Options this reader was constructed with.
options: SummaryReaderOptions,
}
/// Options for constructing a [`SummaryReader`] via `SummaryReader::new_with_options`.
///
/// The type is `Clone` so one configuration can be reused for several readers.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SummaryReaderOptions {
    /// Total size of the file in bytes, if known up front. When `None`, the reader requests an
    /// end-relative seek and infers the size from the position the caller reports back.
    pub file_size: Option<u64>,
    /// Upper bound on the record length passed to the linear reader that parses the summary
    /// section. When a file size is known, the bound is additionally capped by the number of
    /// bytes between the summary start and the end of the file.
    pub record_length_limit: Option<usize>,
}

impl SummaryReaderOptions {
    /// Returns the options with the file size set to `size` bytes.
    #[must_use]
    pub fn with_file_size(mut self, size: u64) -> Self {
        self.file_size = Some(size);
        self
    }

    /// Returns the options with the record length limit set to `limit` bytes.
    #[must_use]
    pub fn with_record_length_limit(mut self, limit: usize) -> Self {
        self.record_length_limit = Some(limit);
        self
    }
}
/// Picks the record length limit to hand to the linear reader.
///
/// When `file_size` is known, no record starting at `pos` can be longer than the bytes left
/// in the file (`file_size - pos`, saturating at zero and clamped to `usize::MAX`). The result
/// is the smaller of that bound and the caller-supplied `record_length_limit`, whichever of
/// the two exist; `None` when neither is available.
fn compute_record_length_limit(
    pos: u64,
    file_size: Option<u64>,
    record_length_limit: Option<usize>,
) -> Option<usize> {
    let bytes_left = file_size.map(|size| {
        let left = size.saturating_sub(pos);
        usize::try_from(left).unwrap_or(usize::MAX)
    });
    // Each `Option` contributes zero or one candidate; the tightest candidate wins.
    bytes_left.into_iter().chain(record_length_limit).min()
}
impl SummaryReader {
    /// Creates a reader with default options: the file size is unknown and no record length
    /// limit is configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a reader from `options`. A file size given in the options is adopted right
    /// away, so the footer can be located with an absolute seek.
    pub fn new_with_options(options: SummaryReaderOptions) -> Self {
        let file_size = options.file_size;
        Self {
            options,
            file_size,
            ..Default::default()
        }
    }

    /// Returns the next action the caller has to perform.
    ///
    /// `None` means the reader has nothing further to request; `Some(Err(_))` reports a
    /// failure while locating or parsing the footer or the summary section.
    pub fn next_event(&mut self) -> Option<McapResult<SummaryReadEvent>> {
        match self.next_event_inner() {
            Ok(event) => event.map(Ok),
            Err(err) => Some(Err(err)),
        }
    }

    /// Same as [`Self::next_event`], with the `Result` on the outside.
    ///
    /// Runs the state machine until it needs the caller (a read or a seek), finishes
    /// (`Ok(None)`), or fails.
    pub fn next_event_inner(&mut self) -> McapResult<Option<SummaryReadEvent>> {
        const FOOTER_LEN: u64 = FOOTER_RECORD_AND_END_MAGIC as u64;
        loop {
            match &mut self.state {
                State::SeekingToFooter => {
                    // Without a known size the footer can only be addressed relative to the end.
                    let Some(file_size) = self.file_size else {
                        let from_end = -(FOOTER_RECORD_AND_END_MAGIC as i64);
                        return Ok(Some(SummaryReadEvent::SeekRequest(SeekFrom::End(
                            from_end,
                        ))));
                    };
                    // The file must hold at least 8 more bytes than footer + end magic.
                    if file_size < FOOTER_LEN + 8 {
                        return Err(crate::McapError::UnexpectedEof);
                    }
                    let footer_start = file_size - FOOTER_LEN;
                    if self.pos != footer_start {
                        return Ok(Some(SummaryReadEvent::SeekRequest(SeekFrom::Start(
                            footer_start,
                        ))));
                    }
                    self.state = State::ReadingFooter { loaded_bytes: 0 };
                }
                State::ReadingFooter { loaded_bytes } => {
                    let loaded = *loaded_bytes;
                    if loaded < FOOTER_RECORD_AND_END_MAGIC {
                        // Still short of a full footer: either the file ended or more is needed.
                        if self.at_eof {
                            return Err(McapError::UnexpectedEof);
                        }
                        return Ok(Some(SummaryReadEvent::ReadRequest(
                            FOOTER_RECORD_AND_END_MAGIC - loaded,
                        )));
                    }

                    let magic_start = FOOTER_RECORD_AND_END_MAGIC - 8;
                    let buf = &self.footer_buf;
                    let opcode = buf[0];
                    // Skip the opcode byte and the 8 bytes that follow it.
                    let body = &buf[1 + 8..magic_start];
                    let trailer = &buf[magic_start..loaded];
                    if trailer != MAGIC {
                        return Err(McapError::BadMagic);
                    }
                    if opcode != crate::records::op::FOOTER {
                        return Err(McapError::BadFooter);
                    }
                    let footer = Footer::read_le(&mut std::io::Cursor::new(body))?;
                    // A zero summary offset means there is no summary section to load.
                    if footer.summary_start == 0 {
                        return Ok(None);
                    }
                    self.summary_present = true;
                    self.state = State::SeekingToSummary {
                        summary_start: footer.summary_start,
                    };
                }
                State::SeekingToSummary { summary_start } => {
                    let target = *summary_start;
                    if self.pos != target {
                        return Ok(Some(SummaryReadEvent::SeekRequest(SeekFrom::Start(
                            target,
                        ))));
                    }
                    // The summary section does not begin with the file magic.
                    let base = LinearReaderOptions::default().with_skip_start_magic(true);
                    let reader_options = match compute_record_length_limit(
                        self.pos,
                        self.file_size,
                        self.options.record_length_limit,
                    ) {
                        Some(limit) => base.with_record_length_limit(limit),
                        None => base,
                    };
                    self.state = State::ReadingSummary {
                        summary_start: target,
                        reader: Box::new(LinearReader::new_with_options(reader_options)),
                        channeler: crate::read::ChannelAccumulator::default(),
                    };
                }
                State::ReadingSummary {
                    reader, channeler, ..
                } => match reader.next_event() {
                    None => {
                        // The linear reader is done: publish the accumulated schemas/channels.
                        self.summary.schemas = channeler.schemas.clone();
                        self.summary.channels = channeler.channels.clone();
                        return Ok(None);
                    }
                    Some(Err(err)) => return Err(err),
                    Some(Ok(LinearReadEvent::ReadRequest(n))) => {
                        return Ok(Some(SummaryReadEvent::ReadRequest(n)));
                    }
                    Some(Ok(LinearReadEvent::Record { data, opcode })) => {
                        let record = parse_record(opcode, data)?.into_owned();
                        match record {
                            Record::Schema { header, data } => {
                                channeler.add_schema(header, data)?;
                            }
                            Record::Channel(channel) => {
                                channeler.add_channel(channel)?;
                            }
                            Record::ChunkIndex(index) => {
                                self.summary.chunk_indexes.push(index);
                            }
                            Record::AttachmentIndex(index) => {
                                self.summary.attachment_indexes.push(index);
                            }
                            Record::MetadataIndex(index) => {
                                self.summary.metadata_indexes.push(index);
                            }
                            Record::Statistics(statistics) => {
                                self.summary.stats = Some(statistics);
                            }
                            // Every other record kind is ignored here.
                            _ => {}
                        }
                    }
                },
            }
        }
    }

    /// Reports that `n` bytes were read into the slice last returned by [`Self::insert`].
    ///
    /// `n == 0` is recorded as end of file.
    ///
    /// # Panics
    ///
    /// While the footer is being read, panics if `n` exceeds the length that was inserted.
    pub fn notify_read(&mut self, n: usize) {
        self.at_eof = n == 0;
        match &mut self.state {
            State::ReadingSummary { reader, .. } => reader.notify_read(n),
            State::ReadingFooter { loaded_bytes, .. } => {
                let filled = *loaded_bytes + n;
                assert!(
                    self.footer_buf.len() >= filled,
                    "notify_read called with n > last inserted length",
                );
                *loaded_bytes = filled;
            }
            _ => {}
        }
        self.pos += n as u64;
    }

    /// Reports that the stream is now at absolute position `pos`.
    ///
    /// If the file size is still unknown it is inferred as `pos` plus the footer length, i.e.
    /// the call is taken to answer the end-relative footer seek. If the position differs from
    /// the one tracked so far, a footer or summary read in progress is discarded and the
    /// corresponding seek is requested again.
    pub fn notify_seeked(&mut self, pos: u64) {
        let moved = self.pos != pos;
        if moved {
            self.at_eof = false;
        }
        if self.file_size.is_none() {
            self.file_size = Some(pos + FOOTER_RECORD_AND_END_MAGIC as u64);
        }
        if moved {
            match self.state {
                State::ReadingSummary { summary_start, .. } => {
                    self.state = State::SeekingToSummary { summary_start };
                    self.summary = Summary::default();
                }
                State::ReadingFooter { .. } => {
                    self.footer_buf.clear();
                    self.state = State::SeekingToFooter;
                }
                _ => {}
            }
        }
        self.pos = pos;
    }

    /// Returns the buffer the caller should read the next bytes into.
    ///
    /// While the footer is being read this is an `n`-byte slice following the bytes already
    /// loaded; while the summary is being read the request is forwarded to the linear reader;
    /// in any other state the footer buffer is resized to `n` bytes and returned whole.
    pub fn insert(&mut self, n: usize) -> &mut [u8] {
        let start = match &mut self.state {
            State::ReadingSummary { reader, .. } => return reader.insert(n),
            State::ReadingFooter { loaded_bytes } => *loaded_bytes,
            _ => 0,
        };
        self.footer_buf.resize(start + n, 0);
        &mut self.footer_buf[start..]
    }

    /// Consumes the reader and returns the summary, or `None` if no footer with a non-zero
    /// summary offset was parsed.
    pub fn finish(self) -> Option<Summary> {
        self.summary_present.then_some(self.summary)
    }
}
// Tests for the sans-io summary reader. The first two drive `SummaryReader` by hand against
// fixture files; the last one goes through `Summary::read` on a hand-built in-memory file.
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use std::io::{Read, Seek, Write};
// Drives the reader to completion against a real file, answering every read/seek request,
// and checks the number of chunk indexes found in the summary.
#[test]
fn test_smoke() {
let mut f = std::fs::File::open("tests/data/compressed.mcap").expect("could not open file");
let mut summary_loader = SummaryReader::new();
while let Some(event) = summary_loader.next_event() {
let event = event.expect("got error instead of event");
match event {
SummaryReadEvent::ReadRequest(n) => {
let read = f.read(summary_loader.insert(n)).expect("failed file read");
summary_loader.notify_read(read);
}
SummaryReadEvent::SeekRequest(to) => {
let pos = f.seek(to).expect("failed file seek");
summary_loader.notify_seeked(pos);
}
}
}
let Some(summary) = summary_loader.finish() else {
panic!("should have found a summary")
};
assert_eq!(summary.chunk_indexes.len(), 413);
}
// Cuts the last 100 bytes off a file, so the bytes where the footer and closing magic are
// expected hold something else; the reader must fail with `BadMagic`.
#[test]
fn test_truncated_mcap() {
let mut buf = Vec::new();
std::fs::File::open("tests/data/uncompressed.mcap")
.expect("could not open file")
.read_to_end(&mut buf)
.expect("could not read file");
let truncated = &buf[..buf.len() - 100];
let mut cursor = std::io::Cursor::new(truncated);
let mut summary_loader = SummaryReader::new();
let mut failed = false;
while let Some(event) = summary_loader.next_event() {
match event {
Ok(SummaryReadEvent::ReadRequest(n)) => {
let read = cursor
.read(summary_loader.insert(n))
.expect("failed file read");
summary_loader.notify_read(read);
}
Ok(SummaryReadEvent::SeekRequest(to)) => {
let pos = cursor.seek(to).expect("failed file seek");
summary_loader.notify_seeked(pos);
}
Err(err) => {
assert!(matches!(err, McapError::BadMagic));
failed = true;
break;
}
}
}
// The loop must have ended through the error arm, not by running out of events.
assert!(failed);
}
// Builds a minimal file in memory and then patches individual bytes in place, checking after
// each patch what `Summary::read` returns. The steps depend on each other: every patch is
// applied on top of the previous ones.
// NOTE(review): `Summary::read` is defined outside this file; the explanations below assume it
// is backed by `SummaryReader` - confirm.
#[test]
fn test_bounds() {
let buf = Vec::new();
let mut file = std::io::Cursor::new(buf);
// A file consisting of the magic alone is too short to contain a footer.
file.write_all(MAGIC).unwrap();
assert_matches!(Summary::read(file.get_ref()), Err(McapError::UnexpectedEof));
// Record with opcode 0x0b, length 46 and an all-zero body. Presumably a Statistics record:
// a later assertion expects `stats: Some(_)` once the summary starts here.
let stats_record = file.stream_position().unwrap();
file.write_all(&[0x0b]).unwrap();
file.write_all(&46_u64.to_le_bytes()).unwrap();
file.write_all(&[0x0; 46]).unwrap();
// Record with opcode 0x02 (used as the footer here) and a 20-byte body of 0xff, followed by
// the closing magic.
let footer_record = file.stream_position().unwrap();
file.write_all(&[0x02]).unwrap();
file.write_all(&20_u64.to_le_bytes()).unwrap();
let footer_content = file.stream_position().unwrap();
file.write_all(&[0xff; 20]).unwrap();
file.write_all(MAGIC).unwrap();
// An all-0xff footer body makes every footer field maximal; presumably the summary offset
// then lies far beyond the end of the file.
assert_matches!(Summary::read(file.get_ref()), Err(McapError::UnexpectedEof));
// All-zero footer body: a zero summary offset means "no summary".
file.seek(SeekFrom::Start(footer_content)).unwrap();
file.write_all(&[0x00; 20]).unwrap();
file.write_all(MAGIC).unwrap();
assert_matches!(Summary::read(file.get_ref()), Ok(None));
// Summary offset pointing at the footer record itself: a summary exists but holds no stats.
file.seek(SeekFrom::Start(footer_content)).unwrap();
file.write_all(&footer_record.to_le_bytes()).unwrap();
file.write_all(&[0x00; 12]).unwrap();
file.write_all(MAGIC).unwrap();
assert_matches!(
Summary::read(file.get_ref()),
Ok(Some(Summary { stats: None, .. }))
);
// Summary offset pointing at the 0x0b record: the stats are picked up.
file.seek(SeekFrom::Start(footer_content)).unwrap();
file.write_all(&stats_record.to_le_bytes()).unwrap();
file.write_all(&[0x00; 12]).unwrap();
file.write_all(MAGIC).unwrap();
assert_matches!(
Summary::read(file.get_ref()),
Ok(Some(Summary { stats: Some(_), .. }))
);
// Overwrite the length field of the 0x0b record with huge values; both must be rejected
// as too large.
file.seek(SeekFrom::Start(stats_record + 1)).unwrap();
file.write_all(&[0x11; 8]).unwrap();
assert_matches!(
Summary::read(file.get_ref()),
Err(McapError::RecordTooLarge { opcode: 0x0b, .. })
);
file.seek(SeekFrom::Start(stats_record + 1)).unwrap();
file.write_all(&[0xff; 8]).unwrap();
assert_matches!(
Summary::read(file.get_ref()),
Err(McapError::RecordTooLarge { opcode: 0x0b, .. })
);
// Point the summary offset back at the footer record, then corrupt the footer record's own
// length field: 29 is one byte more than the 28 bytes (20-byte body + 8-byte magic) that
// follow the length field.
file.seek(SeekFrom::Start(footer_content)).unwrap();
file.write_all(&footer_record.to_le_bytes()).unwrap();
file.seek(SeekFrom::Start(footer_record + 1)).unwrap();
file.write_all(&(20_u64 + 8 + 1).to_le_bytes()).unwrap();
assert_matches!(Summary::read(file.get_ref()), Err(McapError::UnexpectedEof));
// Huge footer record lengths must be rejected as too large, reported with opcode 2.
file.seek(SeekFrom::Start(footer_record + 1)).unwrap();
file.write_all(&[0x11; 8]).unwrap();
assert_matches!(
Summary::read(file.get_ref()),
Err(McapError::RecordTooLarge { opcode: 2, .. })
);
file.seek(SeekFrom::Start(footer_record + 1)).unwrap();
file.write_all(&[0xff; 8]).unwrap();
assert_matches!(
Summary::read(file.get_ref()),
Err(McapError::RecordTooLarge { opcode: 2, .. })
);
}
}