nodo_cask 0.18.5

Message recording to MCPA for NODO
Documentation
// Copyright 2024 David Weikersdorfer

use eyre::{eyre, WrapErr};
use mcap::{read::RawMessageStream as McapRawMessageStream, Message as McapMessage};
use memmap::Mmap;
use nodo::prelude::{Message as NodoMessage, Stamp};
use std::{
    fs,
    path::{Path, PathBuf},
    time::Duration,
};

pub struct CaskReader {
    _file: fs::File,
    mmap: Mmap,
}

impl CaskReader {
    pub fn open(path: &Path) -> eyre::Result<Self> {
        let file = fs::File::open(path).context("Couldn't open MCAP file")?;
        let mmap = unsafe { Mmap::map(&file) }.context("Couldn't map MCAP file")?;
        Ok(CaskReader { _file: file, mmap })
    }

    pub fn summary(&self) -> eyre::Result<mcap::Summary> {
        Ok(mcap::Summary::read(&self.mmap)?.ok_or_else(|| eyre!("no summary"))?)
    }

    pub fn messages(&self) -> eyre::Result<mcap::MessageStream> {
        Ok(mcap::MessageStream::new(&self.mmap)?)
    }

    pub fn raw_message_stream(&self) -> eyre::Result<McapRawMessageStream> {
        Ok(McapRawMessageStream::new(&self.mmap)?)
    }
}

pub fn mcap_to_nodo_message<T>(msg: &McapMessage) -> eyre::Result<NodoMessage<T>>
where
    T: for<'a> serde::Deserialize<'a>,
{
    let value = bincode::deserialize(&msg.data).context("bincode deserialization failed")?;

    Ok(NodoMessage {
        seq: msg.sequence as u64,
        stamp: Stamp {
            acqtime: Duration::from_nanos(msg.log_time).into(),
            pubtime: Duration::from_nanos(msg.publish_time).into(),
        },
        value,
    })
}

/// If given path is a clip path gets the next clip path. For example for the clip path
/// "/tmp/foo-0313.mcap" this function would return "/tmp/foo-0314.mcap".
pub fn next_clip_path(path: &Path) -> Option<PathBuf> {
    let file_name = path.file_name()?.to_str()?;
    if !file_name.ends_with(".mcap") {
        return None;
    }

    let stem = &file_name[..file_name.len() - 5]; // remove ".mcap"
    let dash_pos = stem.rfind('-')?;
    let (prefix, number_str) = stem.split_at(dash_pos);
    let number_str = &number_str[1..]; // skip the dash

    let number: u64 = number_str.parse().ok()?;
    let incremented = number + 1;

    let new_file_name = format!(
        "{}-{:0width$}.mcap",
        prefix,
        incremented,
        width = number_str.len()
    );

    let mut new_path = path.to_path_buf();
    new_path.set_file_name(new_file_name);
    Some(new_path)
}

/// Reads all messages stored in a cask potentially divided into multiple clips. Note that this
/// function might allocate a lot of memory and should only be used on small casks.
pub fn cask_read_all_nodo_messages<T>(path: &Path) -> eyre::Result<Vec<NodoMessage<T>>>
where
    T: for<'a> serde::Deserialize<'a>,
{
    let mut result = Vec::new();
    let mut current_path = path.to_path_buf();

    while current_path.exists() {
        for msg in CaskReader::open(&current_path)?.messages()? {
            let msg = msg?;
            result.push(mcap_to_nodo_message(&msg)?);
        }

        match next_clip_path(&current_path) {
            Some(next_path) => current_path = next_path,
            None => break,
        }
    }

    Ok(result)
}

#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_next_clip_path() {
        assert_eq!(
            next_clip_path(&PathBuf::from("/hello/world-001.mcap")),
            Some(PathBuf::from("/hello/world-002.mcap"))
        );

        assert_eq!(
            next_clip_path(&PathBuf::from("N:\\hello\\world-010.mcap")),
            Some(PathBuf::from("N:\\hello\\world-011.mcap"))
        );

        assert_eq!(
            next_clip_path(&PathBuf::from("/hello/world-999.mcap")),
            Some(PathBuf::from("/hello/world-1000.mcap"))
        );

        assert_eq!(next_clip_path(&PathBuf::from("/hello/world-001.mca")), None);

        assert_eq!(next_clip_path(&PathBuf::from("/hello/world.mcap")), None);
    }
}