use eyre::{eyre, WrapErr};
use mcap::{read::RawMessageStream as McapRawMessageStream, Message as McapMessage};
use memmap::Mmap;
use nodo::prelude::{Message as NodoMessage, Stamp};
use std::{
fs,
path::{Path, PathBuf},
time::Duration,
};
/// Read-only view of a single MCAP file, accessed through a memory map.
///
/// Every reader method on this type parses directly from the mapped bytes.
pub struct CaskReader {
/// File handle the mapping was created from. It is stored but not read by
/// any method visible here.
// NOTE(review): presumably held so the handle stays open for as long as the
// mapping exists — confirm this is the intent.
_file: fs::File,
/// Memory map of the file contents; the byte source for all readers.
mmap: Mmap,
}
impl CaskReader {
    /// Opens the MCAP file at `path` and memory-maps its contents.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or cannot be mapped.
    pub fn open(path: &Path) -> eyre::Result<Self> {
        let handle = fs::File::open(path).context("Couldn't open MCAP file")?;
        // NOTE(review): a file-backed mapping is generally only sound while
        // nothing truncates or rewrites the file underneath it — confirm that
        // recordings are not modified while they are being read.
        let mapped = unsafe { Mmap::map(&handle) }.context("Couldn't map MCAP file")?;
        Ok(Self {
            _file: handle,
            mmap: mapped,
        })
    }

    /// Reads the summary section of the mapped file.
    ///
    /// # Errors
    ///
    /// Fails if the summary cannot be read, or with "no summary" when the
    /// read succeeds but yields no summary.
    pub fn summary(&self) -> eyre::Result<mcap::Summary> {
        match mcap::Summary::read(&self.mmap)? {
            Some(summary) => Ok(summary),
            None => Err(eyre!("no summary")),
        }
    }

    /// Creates a stream of messages over the mapped bytes.
    ///
    /// # Errors
    ///
    /// Fails if the stream cannot be constructed from the mapped bytes.
    pub fn messages(&self) -> eyre::Result<mcap::MessageStream> {
        let stream = mcap::MessageStream::new(&self.mmap)?;
        Ok(stream)
    }

    /// Creates a raw message stream over the mapped bytes.
    ///
    /// # Errors
    ///
    /// Fails if the stream cannot be constructed from the mapped bytes.
    pub fn raw_message_stream(&self) -> eyre::Result<McapRawMessageStream> {
        let stream = McapRawMessageStream::new(&self.mmap)?;
        Ok(stream)
    }
}
/// Converts an MCAP message into a nodo message with a payload of type `T`.
///
/// The payload is decoded from the message bytes with bincode. The MCAP
/// `log_time` becomes the acquisition time and `publish_time` becomes the
/// publication time; both are interpreted as nanosecond counts.
///
/// # Errors
///
/// Fails if the message bytes cannot be deserialized as `T`.
pub fn mcap_to_nodo_message<T>(msg: &McapMessage) -> eyre::Result<NodoMessage<T>>
where
    T: for<'a> serde::Deserialize<'a>,
{
    let stamp = Stamp {
        acqtime: Duration::from_nanos(msg.log_time).into(),
        pubtime: Duration::from_nanos(msg.publish_time).into(),
    };

    let payload = bincode::deserialize(&msg.data).context("bincode deserialization failed")?;

    Ok(NodoMessage {
        seq: msg.sequence as u64,
        stamp,
        value: payload,
    })
}
/// Computes the path of the clip that follows `path` in a numbered sequence.
///
/// The file name must have the form `<prefix>-<number>.mcap`, where `<number>`
/// consists only of ASCII digits. The result keeps the same directory and
/// prefix, with the number incremented by one and zero-padded to at least the
/// original digit count (`world-001.mcap` -> `world-002.mcap`,
/// `world-999.mcap` -> `world-1000.mcap`).
///
/// Returns `None` when:
/// - the path has no file name or it is not valid UTF-8,
/// - the file name does not end in `.mcap`,
/// - there is no `-` separator in the stem,
/// - the text after the last `-` is not a plain decimal `u64`,
/// - incrementing the number would overflow `u64`.
pub fn next_clip_path(path: &Path) -> Option<PathBuf> {
    let file_name = path.file_name()?.to_str()?;
    let stem = file_name.strip_suffix(".mcap")?;
    let (prefix, number_str) = stem.rsplit_once('-')?;

    // `u64::from_str` accepts a leading `+`; only plain digits form a valid
    // clip number. An empty string passes this check and is rejected by the
    // parse below.
    if !number_str.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }

    // `checked_add` turns overflow at `u64::MAX` into `None` rather than a
    // panic (debug) or a wrap to zero (release).
    let incremented = number_str.parse::<u64>().ok()?.checked_add(1)?;

    let new_file_name = format!(
        "{}-{:0width$}.mcap",
        prefix,
        incremented,
        width = number_str.len()
    );
    Some(path.with_file_name(new_file_name))
}
/// Reads all messages from the clip at `path` and from every consecutive clip
/// that follows it, decoding each payload as `T`.
///
/// Starting at `path`, each existing file is opened and all of its messages
/// are appended to the result in stream order. The next file is then derived
/// with [`next_clip_path`]. Reading stops at the first path that does not
/// exist, or when no next path can be derived from the current file name.
/// If `path` itself does not exist, the result is an empty vector.
///
/// # Errors
///
/// Fails if a clip cannot be opened or mapped, if its message stream cannot
/// be created or yields an error, or if a message payload cannot be
/// deserialized as `T`.
pub fn cask_read_all_nodo_messages<T>(path: &Path) -> eyre::Result<Vec<NodoMessage<T>>>
where
    T: for<'a> serde::Deserialize<'a>,
{
    let mut result = Vec::new();
    let mut current_path = path.to_path_buf();
    while current_path.exists() {
        // The reader gets a name so it clearly outlives the message stream
        // that borrows from its mapping.
        let reader = CaskReader::open(&current_path)?;
        for msg in reader.messages()? {
            let msg = msg?;
            result.push(mcap_to_nodo_message(&msg)?);
        }
        match next_clip_path(&current_path) {
            Some(next_path) => current_path = next_path,
            None => break,
        }
    }
    Ok(result)
}
#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    /// Checks `next_clip_path` against a table of inputs and expected results:
    /// a plain increment, a Windows-style path, a digit-count rollover, a
    /// wrong extension, and a name without a number.
    #[test]
    fn test_next_clip_path() {
        let cases: [(&str, Option<&str>); 5] = [
            ("/hello/world-001.mcap", Some("/hello/world-002.mcap")),
            ("N:\\hello\\world-010.mcap", Some("N:\\hello\\world-011.mcap")),
            ("/hello/world-999.mcap", Some("/hello/world-1000.mcap")),
            ("/hello/world-001.mca", None),
            ("/hello/world.mcap", None),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(
                next_clip_path(&PathBuf::from(input)),
                expected.map(PathBuf::from)
            );
        }
    }
}