nodo_cask 0.18.5

Message recording to MCAP for NODO
Documentation
// Copyright 2025 David Weikersdorfer

use crate::common::*;
use nodo_cask::{cask_read_all_nodo_messages, CaskChannel, CaskWriter, CaskWriterConfigBuilder};
use std::time::Duration;
use tempfile::tempdir;

#[allow(dead_code)]
mod common;

/// Round-trips messages through a cask for several message counts, including zero.
///
/// For every count, messages built by `Foo::message_from_index` are written to the
/// `"foo"` topic inside a fresh temporary directory, then read back with
/// `cask_read_all_nodo_messages` and compared against what was written.
#[test]
fn test_cask_read_write() -> eyre::Result<()> {
    for count in [0, 1, 10, 100, 1000] {
        let tmp_dir = tempdir()?;

        let expected: Vec<_> = (0..count).map(Foo::message_from_index).collect();

        let mut writer = CaskWriter::new(
            &tmp_dir.path(),
            &CaskWriterConfigBuilder::default()
                .with_timestamp_filename(&format!("test_cask_read_write_{count}"))
                .with_channel(CaskChannel {
                    topic: "foo".into(),
                })
                .into(),
        )?;

        for msg in &expected {
            writer.write_message("foo", &msg)?;
        }

        // Remember where the cask lives, then release the writer before reading the
        // file back (presumably this finalizes the recording; not visible from here).
        let path = writer.path().to_owned();
        drop(writer);

        let msgs = cask_read_all_nodo_messages(&path)?;
        assert_eq!(msgs, expected);
    }

    Ok(())
}

/// Round-trips messages through a cask configured with a per-clip message limit.
///
/// After every write the number of messages in the current clip must not exceed
/// `CLIP_MSG_COUNT`, and once all messages are written the total number of clips
/// must equal `count / CLIP_MSG_COUNT` rounded up. Finally the messages read back
/// must equal the ones that were written.
#[test]
fn test_cask_read_write_max_clip_messages() -> eyre::Result<()> {
    const CLIP_MSG_COUNT: usize = 100;

    for count in [100, 1015] {
        let tmp_dir = tempdir()?;

        let expected: Vec<_> = (0..count).map(Foo::message_from_index).collect();

        let mut writer = CaskWriter::new(
            &tmp_dir.path(),
            &CaskWriterConfigBuilder::default()
                .with_timestamp_filename(&format!("test_cask_read_write_{count}"))
                .with_channel(CaskChannel {
                    topic: "foo".into(),
                })
                .with_max_clip_messages(CLIP_MSG_COUNT)
                .into(),
        )?;

        for msg in &expected {
            writer.write_message("foo", &msg)?;
            assert!(writer.clip_messages_written() <= CLIP_MSG_COUNT);
        }

        // Ceiling division: a partially filled last clip still counts as a clip.
        let expected_clips = (count + CLIP_MSG_COUNT - 1) / CLIP_MSG_COUNT;
        assert_eq!(writer.total_clips(), expected_clips);

        // Remember where the cask lives, then release the writer before reading the
        // file back (presumably this finalizes the recording; not visible from here).
        let path = writer.path().to_owned();
        drop(writer);

        let msgs = cask_read_all_nodo_messages(&path)?;
        assert_eq!(msgs, expected);
    }

    Ok(())
}

/// Round-trips messages through a cask configured with a per-clip byte limit.
///
/// After every write the number of bytes in the current clip must not exceed
/// `CLIP_BYTES`, and once all messages are written the recording must consist of
/// at least two clips. Finally the messages read back must equal the ones that
/// were written.
#[test]
fn test_cask_read_write_clipped_max_clip_bytes() -> eyre::Result<()> {
    const CLIP_BYTES: usize = 100;

    for count in [100, 1015] {
        let tmp_dir = tempdir()?;

        let expected = (0..count)
            .map(|i| Foo::message_from_index(i))
            .collect::<Vec<_>>();

        let path = {
            let mut writer = CaskWriter::new(
                &tmp_dir.path(),
                &CaskWriterConfigBuilder::default()
                    .with_timestamp_filename(&format!("test_cask_read_write_{count}"))
                    .with_channel(CaskChannel {
                        topic: "foo".into(),
                    })
                    .with_max_clip_bytes(CLIP_BYTES)
                    .into(),
            )?;

            for msg in expected.iter() {
                writer.write_message("foo", &msg)?;

                // Report the observed size only when the limit is violated instead of
                // printing it unconditionally for every message.
                let written = writer.clip_bytes_written();
                assert!(
                    written <= CLIP_BYTES,
                    "current clip holds {written:?} bytes, limit is {CLIP_BYTES}"
                );
            }

            let clips = writer.total_clips();
            assert!(
                clips >= 2,
                "expected the byte limit to produce at least 2 clips, got {clips:?}"
            );

            // The writer goes out of scope at the end of this block, before the file
            // is read back below.
            writer.path().to_owned()
        };

        {
            let msgs = cask_read_all_nodo_messages(&path)?;

            assert_eq!(msgs, expected);
        }
    }

    Ok(())
}

/// Round-trips messages through a cask configured with a per-clip duration limit.
///
/// Once all messages are written the total number of clips must equal
/// `count / CLIP_DURATION_MS` rounded up, and the messages read back must equal
/// the ones that were written.
#[test]
fn test_cask_read_write_clipped_max_clip_duration() -> eyre::Result<()> {
    const CLIP_DURATION_MS: u64 = 250;

    for count in [250, 1015] {
        let tmp_dir = tempdir()?;

        let expected: Vec<_> = (0..count).map(Foo::message_from_index).collect();

        let mut writer = CaskWriter::new(
            &tmp_dir.path(),
            &CaskWriterConfigBuilder::default()
                .with_timestamp_filename(&format!("test_cask_read_write_{count}"))
                .with_channel(CaskChannel {
                    topic: "foo".into(),
                })
                .with_max_clip_duration(Duration::from_millis(CLIP_DURATION_MS))
                .into(),
        )?;

        for msg in &expected {
            writer.write_message("foo", &msg)?;
        }

        // Note that message_from_index creates pubtimes at 1ms intervals, so a clip
        // of CLIP_DURATION_MS milliseconds holds that many messages; round up for a
        // partially filled last clip.
        let msgs_per_clip = CLIP_DURATION_MS as usize;
        let expected_clips = (count + msgs_per_clip - 1) / msgs_per_clip;
        assert_eq!(writer.total_clips(), expected_clips);

        // Remember where the cask lives, then release the writer before reading the
        // file back (presumably this finalizes the recording; not visible from here).
        let path = writer.path().to_owned();
        drop(writer);

        let msgs = cask_read_all_nodo_messages(&path)?;
        assert_eq!(msgs, expected);
    }

    Ok(())
}