nodo_cask 0.18.5

Message recording to MCAP for NODO
Documentation
// Copyright 2024 David Weikersdorfer

use crate::CaskControlError;
use chrono::Local;
use mcap::records::MessageHeader as McapMessageHeader;
use nodo::prelude::{Message as NodoMessage, Pubtime};
use serde::Serialize;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fs, io,
    path::{Path, PathBuf},
    time::Duration,
};

/// Configuration for writing a cask
#[derive(Clone)]
pub struct CaskWriterConfig {
    /// Filename of the MCAP file which is written. The filename will be prepended with the work
    /// directory of CaskControl. Do not include the extension ".mcap".
    pub filename: String,

    /// Set of channels to write. Messages published on topics not in this set are skipped.
    pub channels: HashSet<CaskChannel>,

    /// Maximum number of messages to write per clip. If enabled the cask will be written as
    /// multiple "clips" instead of one single file. The clip filename will be extended with the
    /// clip number (starting at 1), for example "my_cask-00001.mcap".
    pub max_clip_messages: Option<usize>,

    /// Maximum number of serialized message bytes to write per clip. See `max_clip_messages` for
    /// more information.
    pub max_clip_bytes: Option<usize>,

    /// Maximum duration per clip, measured on message publish times. See `max_clip_messages` for
    /// more information.
    pub max_clip_duration: Option<Duration>,

    /// Number of digits used for the zero-padded clip number in clip filenames
    pub clip_filename_digits: usize,
}

/// Information about a channel to be recorded into the casks
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CaskChannel {
    /// Topic name under which messages are stored in the MCAP. Only messages written under
    /// exactly this topic are recorded for the channel.
    pub topic: String,
}

/// Builder for [`CaskWriterConfig`]. Every setting starts out unset; settings left unset are
/// replaced by defaults when the builder is converted into a [`CaskWriterConfig`].
#[derive(Default)]
pub struct CaskWriterConfigBuilder {
    /// Channels which will be recorded
    channels: HashSet<CaskChannel>,

    /// Output filename without extension, if explicitly chosen
    filename: Option<String>,

    /// Width of the zero-padded clip number, if explicitly chosen
    clip_filename_digits: Option<usize>,

    /// Clip limits; setting any of them makes the cask be written as multiple clips
    max_clip_messages: Option<usize>,
    max_clip_bytes: Option<usize>,
    max_clip_duration: Option<Duration>,
}

impl CaskWriterConfigBuilder {
    /// Specifies the filename (without the ".mcap" extension)
    pub fn with_filename(mut self, filename: &str) -> Self {
        self.filename = Some(filename.into());
        self
    }

    /// Sets the filename based on an identifier and the current local time.
    ///
    /// The resulting name has the form `cask_{id}_{YYYYmmdd}_{HHMMSS}`, for example
    /// "cask_robot_20240131_142501".
    pub fn with_timestamp_filename(mut self, id: &str) -> Self {
        let timestamp = Local::now().format("%Y%m%d_%H%M%S");
        self.filename = Some(format!("cask_{id}_{timestamp}"));
        self
    }

    /// Enables a channel to be recorded into this cask
    pub fn with_channel(mut self, channel: CaskChannel) -> Self {
        self.channels.insert(channel);
        self
    }

    /// Enables multiple channels to be recorded into this cask
    pub fn with_channels<I: IntoIterator<Item = CaskChannel>>(mut self, channels: I) -> Self {
        self.channels.extend(channels);
        self
    }

    /// Disables a previously enabled channel so that it is not recorded. Does nothing if no
    /// channel with the given topic is enabled.
    pub fn without_channel(mut self, topic: &str) -> Self {
        self.channels.retain(|cc| cc.topic != topic);
        self
    }

    /// Writes the cask as multiple "clips" and limits the number of messages per clip.
    pub fn with_max_clip_messages(mut self, max_clip_messages: usize) -> Self {
        self.max_clip_messages = Some(max_clip_messages);
        self
    }

    /// Writes the cask as multiple "clips" and limits the number of serialized message bytes per
    /// clip.
    pub fn with_max_clip_bytes(mut self, max_clip_bytes: usize) -> Self {
        self.max_clip_bytes = Some(max_clip_bytes);
        self
    }

    /// Writes the cask as multiple "clips" and limits the duration of each clip.
    pub fn with_max_clip_duration(mut self, max_clip_duration: Duration) -> Self {
        self.max_clip_duration = Some(max_clip_duration);
        self
    }

    /// Sets the number of digits used for the zero-padded clip number in clip filenames
    /// (5 if never set).
    pub fn with_clip_filename_digits(mut self, clip_filename_digits: usize) -> Self {
        self.clip_filename_digits = Some(clip_filename_digits);
        self
    }
}

impl From<CaskWriterConfigBuilder> for CaskWriterConfig {
    fn from(builder: CaskWriterConfigBuilder) -> Self {
        CaskWriterConfig {
            filename: builder.filename.unwrap_or_else(|| "out".into()),
            channels: builder.channels,
            max_clip_messages: builder.max_clip_messages,
            max_clip_bytes: builder.max_clip_bytes,
            max_clip_duration: builder.max_clip_duration,
            clip_filename_digits: builder.clip_filename_digits.unwrap_or(5),
        }
    }
}

/// Records nodo messages into MCAP files ("casks"), serializing each message payload with
/// bincode. Depending on the configuration the recording is split into multiple clip files.
pub struct CaskWriter {
    /// Settings this writer was created with
    config: CaskWriterConfig,

    /// Directory into which all clip files are written
    directory: PathBuf,

    /// Clip file path reported by [`CaskWriter::path`]
    path: PathBuf,

    /// MCAP writer for the currently open clip
    writer: mcap::Writer<io::BufWriter<fs::File>>,

    /// MCAP channel id for every enabled topic of the currently open clip
    channel_ids: CaskChannelMap<u16>,

    /// Scratch buffer reused for serializing each message
    buffer: Vec<u8>,

    /// Number of clips started so far, including the current one
    total_clips: usize,

    /// Number of messages written across all clips
    total_messages_written: usize,

    /// Number of serialized message bytes written across all clips
    total_bytes_written: usize,

    /// Publish time used as the start of the current clip for the duration limit; `None` until
    /// it has been recorded
    clip_start_pubtime: Option<Pubtime>,

    /// Number of messages written to the current clip
    clip_messages_written: usize,

    /// Number of serialized message bytes written to the current clip
    clip_bytes_written: usize,
}

impl CaskWriter {
    const INITIAL_BUFFER_SIZE: usize = 1024 * 1024;

    /// Creates a new cask in the given directory
    pub fn new(directory: &Path, config: &CaskWriterConfig) -> Result<Self, CaskControlError> {
        let path = Self::path_impl(config, directory, 1);

        let (writer, channel_ids) = Self::open_file_impl(config, &path)?;

        Ok(CaskWriter {
            directory: directory.into(),
            path,
            writer,
            channel_ids,
            buffer: Vec::with_capacity(Self::INITIAL_BUFFER_SIZE),
            config: config.clone(),
            total_messages_written: 0,
            total_bytes_written: 0,
            total_clips: 1,
            clip_start_pubtime: None,
            clip_messages_written: 0,
            clip_bytes_written: 0,
        })
    }

    /// Path under which the current clip is written
    pub fn path(&self) -> &Path {
        &self.path
    }

    fn path_impl(config: &CaskWriterConfig, directory: &Path, clip_index: usize) -> PathBuf {
        let filename = if config.max_clip_messages.is_some()
            || config.max_clip_bytes.is_some()
            || config.max_clip_duration.is_some()
        {
            &format!(
                "{}-{:0width$}",
                config.filename,
                clip_index,
                width = config.clip_filename_digits
            )
        } else {
            &config.filename
        };

        let mut path = directory.join(&filename);
        path.set_extension("mcap");
        path
    }

    /// Total number of messages written to the casks
    pub fn total_messages_written(&self) -> usize {
        self.total_messages_written
    }

    /// Total number of bytes written to the casks
    pub fn total_bytes_written(&self) -> usize {
        self.total_bytes_written
    }

    /// Total number of clips
    pub fn total_clips(&self) -> usize {
        self.total_clips
    }

    /// Number messages written in this clip
    pub fn clip_messages_written(&self) -> usize {
        self.clip_messages_written
    }

    /// Number bytes written in this clip
    pub fn clip_bytes_written(&self) -> usize {
        self.clip_bytes_written
    }

    /// Writes a message to the cask under a topic. Messages are only written if the topic is
    /// enabled via configuration. Returns total bytes written.
    pub fn write_message<T: Serialize>(
        &mut self,
        topic: &str,
        msg: &NodoMessage<T>,
    ) -> Result<usize, CaskControlError> {
        if let Some(&channel_id) = self.channel_ids.get(topic) {
            let header = McapMessageHeader {
                channel_id,
                sequence: msg.seq as u32,
                log_time: msg.stamp.acqtime.as_nanos() as u64,
                publish_time: msg.stamp.pubtime.as_nanos() as u64,
            };

            self.buffer.clear();
            bincode::serialize_into(&mut self.buffer, &msg.value)?;

            let bytes_written = self.buffer.len();
            self.total_messages_written += 1;
            self.total_bytes_written += bytes_written;
            self.clip_messages_written += 1;
            self.clip_bytes_written += bytes_written;

            if let Some(max_clip_messages) = self.config.max_clip_messages {
                if self.clip_messages_written > max_clip_messages {
                    self.next_clip()?;
                }
            }

            if let Some(max_clip_bytes) = self.config.max_clip_bytes {
                if self.clip_bytes_written > max_clip_bytes {
                    self.next_clip()?;
                }
            }

            if let Some(max_clip_duration) = self.config.max_clip_duration {
                if let Some(clip_start_pubtime) = self.clip_start_pubtime {
                    if let Some(duration) = msg.stamp.pubtime.checked_sub(clip_start_pubtime) {
                        if duration > max_clip_duration {
                            self.next_clip()?;
                        }
                    }
                } else {
                    self.clip_start_pubtime = Some(msg.stamp.pubtime);
                }
            }

            self.writer.write_to_known_channel(&header, &self.buffer)?;

            Ok(bytes_written)
        } else {
            Ok(0)
        }
    }

    fn next_clip(&mut self) -> Result<(), CaskControlError> {
        self.total_clips += 1;
        self.clip_start_pubtime = None;
        self.clip_messages_written = 0;
        self.clip_bytes_written = 0;

        // Clear channel ids so in case this function returns an error we are in a well-defined
        // state.
        self.channel_ids.0.clear();

        let path = Self::path_impl(&self.config, &self.directory, self.total_clips);

        let (writer, channel_ids) = Self::open_file_impl(&self.config, &path)?;
        self.writer = writer;
        // channel ids should not change, but even if they do we only use them internally
        self.channel_ids = channel_ids;

        Ok(())
    }

    fn open_file_impl(
        config: &CaskWriterConfig,
        path: &Path,
    ) -> Result<(mcap::Writer<io::BufWriter<fs::File>>, CaskChannelMap<u16>), CaskControlError>
    {
        let file = fs::File::create_new(path)?;
        let mut writer = mcap::Writer::new(io::BufWriter::new(file))?;

        let mut channel_ids = CaskChannelMap::default();
        for channel in config.channels.iter() {
            let channel_id = writer.add_channel(0, &channel.topic, "bincode", &BTreeMap::new())?;
            channel_ids.set(&channel.topic, channel_id);
        }

        Ok((writer, channel_ids))
    }
}

/// Maps channel (topic) names to a per-channel value, such as the MCAP channel id.
struct CaskChannelMap<T>(HashMap<String, T>);

impl<T> CaskChannelMap<T> {
    /// Associates `value` with `channel`, replacing any previously stored value.
    fn set<S: Into<String>>(&mut self, channel: S, value: T) {
        let key: String = channel.into();
        self.0.insert(key, value);
    }

    /// Looks up the value stored for `channel`, if any.
    fn get(&self, channel: &str) -> Option<&T> {
        let Self(map) = self;
        map.get(channel)
    }
}

impl<T> Default for CaskChannelMap<T> {
    fn default() -> Self {
        CaskChannelMap(HashMap::default())
    }
}