use crate::CaskControlError;
use chrono::Local;
use mcap::records::MessageHeader as McapMessageHeader;
use nodo::prelude::{Message as NodoMessage, Pubtime};
use serde::Serialize;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fs, io,
path::{Path, PathBuf},
time::Duration,
};
#[derive(Clone)]
pub struct CaskWriterConfig {
pub filename: String,
pub channels: HashSet<CaskChannel>,
pub max_clip_messages: Option<usize>,
pub max_clip_bytes: Option<usize>,
pub max_clip_duration: Option<Duration>,
pub clip_filename_digits: usize,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CaskChannel {
pub topic: String,
}
#[derive(Default)]
pub struct CaskWriterConfigBuilder {
filename: Option<String>,
channels: HashSet<CaskChannel>,
max_clip_messages: Option<usize>,
max_clip_bytes: Option<usize>,
max_clip_duration: Option<Duration>,
clip_filename_digits: Option<usize>,
}
impl CaskWriterConfigBuilder {
pub fn with_filename(mut self, filename: &str) -> Self {
self.filename = Some(filename.into());
self
}
pub fn with_timestamp_filename(mut self, id: &str) -> Self {
let timestamp = Local::now().format("%Y%m%d_%H%M%S");
self.filename = Some(format!("cask_{id}_{:05}", timestamp));
self
}
pub fn with_channel(mut self, channel: CaskChannel) -> Self {
self.channels.insert(channel);
self
}
pub fn with_channels<'a, I: IntoIterator<Item = CaskChannel>>(mut self, channels: I) -> Self {
for channel in channels.into_iter() {
self = self.with_channel(channel);
}
self
}
pub fn without_channel(mut self, topic: &str) -> Self {
self.channels.retain(|cc| cc.topic != topic);
self
}
pub fn with_max_clip_messages(mut self, max_clip_messages: usize) -> Self {
self.max_clip_messages = Some(max_clip_messages);
self
}
pub fn with_max_clip_bytes(mut self, max_clip_bytes: usize) -> Self {
self.max_clip_bytes = Some(max_clip_bytes);
self
}
pub fn with_max_clip_duration(mut self, max_clip_duration: Duration) -> Self {
self.max_clip_duration = Some(max_clip_duration);
self
}
pub fn with_clip_filename_digits(mut self, clip_filename_digits: usize) -> Self {
self.clip_filename_digits = Some(clip_filename_digits);
self
}
}
impl From<CaskWriterConfigBuilder> for CaskWriterConfig {
fn from(builder: CaskWriterConfigBuilder) -> Self {
CaskWriterConfig {
filename: builder.filename.unwrap_or_else(|| "out".into()),
channels: builder.channels,
max_clip_messages: builder.max_clip_messages,
max_clip_bytes: builder.max_clip_bytes,
max_clip_duration: builder.max_clip_duration,
clip_filename_digits: builder.clip_filename_digits.unwrap_or(5),
}
}
}
pub struct CaskWriter {
directory: PathBuf,
writer: mcap::Writer<io::BufWriter<fs::File>>,
channel_ids: CaskChannelMap<u16>,
path: PathBuf,
buffer: Vec<u8>,
config: CaskWriterConfig,
total_messages_written: usize,
total_bytes_written: usize,
total_clips: usize,
clip_start_pubtime: Option<Pubtime>,
clip_messages_written: usize,
clip_bytes_written: usize,
}
impl CaskWriter {
const INITIAL_BUFFER_SIZE: usize = 1024 * 1024;
pub fn new(directory: &Path, config: &CaskWriterConfig) -> Result<Self, CaskControlError> {
let path = Self::path_impl(config, directory, 1);
let (writer, channel_ids) = Self::open_file_impl(config, &path)?;
Ok(CaskWriter {
directory: directory.into(),
path,
writer,
channel_ids,
buffer: Vec::with_capacity(Self::INITIAL_BUFFER_SIZE),
config: config.clone(),
total_messages_written: 0,
total_bytes_written: 0,
total_clips: 1,
clip_start_pubtime: None,
clip_messages_written: 0,
clip_bytes_written: 0,
})
}
pub fn path(&self) -> &Path {
&self.path
}
fn path_impl(config: &CaskWriterConfig, directory: &Path, clip_index: usize) -> PathBuf {
let filename = if config.max_clip_messages.is_some()
|| config.max_clip_bytes.is_some()
|| config.max_clip_duration.is_some()
{
&format!(
"{}-{:0width$}",
config.filename,
clip_index,
width = config.clip_filename_digits
)
} else {
&config.filename
};
let mut path = directory.join(&filename);
path.set_extension("mcap");
path
}
pub fn total_messages_written(&self) -> usize {
self.total_messages_written
}
pub fn total_bytes_written(&self) -> usize {
self.total_bytes_written
}
pub fn total_clips(&self) -> usize {
self.total_clips
}
pub fn clip_messages_written(&self) -> usize {
self.clip_messages_written
}
pub fn clip_bytes_written(&self) -> usize {
self.clip_bytes_written
}
pub fn write_message<T: Serialize>(
&mut self,
topic: &str,
msg: &NodoMessage<T>,
) -> Result<usize, CaskControlError> {
if let Some(&channel_id) = self.channel_ids.get(topic) {
let header = McapMessageHeader {
channel_id,
sequence: msg.seq as u32,
log_time: msg.stamp.acqtime.as_nanos() as u64,
publish_time: msg.stamp.pubtime.as_nanos() as u64,
};
self.buffer.clear();
bincode::serialize_into(&mut self.buffer, &msg.value)?;
let bytes_written = self.buffer.len();
self.total_messages_written += 1;
self.total_bytes_written += bytes_written;
self.clip_messages_written += 1;
self.clip_bytes_written += bytes_written;
if let Some(max_clip_messages) = self.config.max_clip_messages {
if self.clip_messages_written > max_clip_messages {
self.next_clip()?;
}
}
if let Some(max_clip_bytes) = self.config.max_clip_bytes {
if self.clip_bytes_written > max_clip_bytes {
self.next_clip()?;
}
}
if let Some(max_clip_duration) = self.config.max_clip_duration {
if let Some(clip_start_pubtime) = self.clip_start_pubtime {
if let Some(duration) = msg.stamp.pubtime.checked_sub(clip_start_pubtime) {
if duration > max_clip_duration {
self.next_clip()?;
}
}
} else {
self.clip_start_pubtime = Some(msg.stamp.pubtime);
}
}
self.writer.write_to_known_channel(&header, &self.buffer)?;
Ok(bytes_written)
} else {
Ok(0)
}
}
fn next_clip(&mut self) -> Result<(), CaskControlError> {
self.total_clips += 1;
self.clip_start_pubtime = None;
self.clip_messages_written = 0;
self.clip_bytes_written = 0;
self.channel_ids.0.clear();
let path = Self::path_impl(&self.config, &self.directory, self.total_clips);
let (writer, channel_ids) = Self::open_file_impl(&self.config, &path)?;
self.writer = writer;
self.channel_ids = channel_ids;
Ok(())
}
fn open_file_impl(
config: &CaskWriterConfig,
path: &Path,
) -> Result<(mcap::Writer<io::BufWriter<fs::File>>, CaskChannelMap<u16>), CaskControlError>
{
let file = fs::File::create_new(path)?;
let mut writer = mcap::Writer::new(io::BufWriter::new(file))?;
let mut channel_ids = CaskChannelMap::default();
for channel in config.channels.iter() {
let channel_id = writer.add_channel(0, &channel.topic, "bincode", &BTreeMap::new())?;
channel_ids.set(&channel.topic, channel_id);
}
Ok((writer, channel_ids))
}
}
struct CaskChannelMap<T>(HashMap<String, T>);
impl<T> CaskChannelMap<T> {
fn set<S: Into<String>>(&mut self, channel: S, value: T) {
self.0.insert(channel.into(), value);
}
fn get(&self, channel: &str) -> Option<&T> {
self.0.get(channel)
}
}
impl<T> Default for CaskChannelMap<T> {
fn default() -> Self {
Self(HashMap::new())
}
}