use crate::common::*;
use nodo_cask::{cask_read_all_nodo_messages, CaskChannel, CaskWriter, CaskWriterConfigBuilder};
use std::time::Duration;
use tempfile::tempdir;
#[allow(dead_code)]
mod common;
#[test]
fn test_cask_read_write() -> eyre::Result<()> {
for count in [0, 1, 10, 100, 1000] {
let tmp_dir = tempdir()?;
let expected = (0..count)
.map(|i| Foo::message_from_index(i))
.collect::<Vec<_>>();
let path = {
let mut writer = CaskWriter::new(
&tmp_dir.path(),
&CaskWriterConfigBuilder::default()
.with_timestamp_filename(&format!("test_cask_read_write_{count}"))
.with_channel(CaskChannel {
topic: "foo".into(),
})
.into(),
)?;
for msg in expected.iter() {
writer.write_message("foo", &msg)?;
}
writer.path().to_owned()
};
{
let msgs = cask_read_all_nodo_messages(&path)?;
assert_eq!(msgs, expected);
}
}
Ok(())
}
#[test]
fn test_cask_read_write_max_clip_messages() -> eyre::Result<()> {
const CLIP_MSG_COUNT: usize = 100;
for count in [100, 1015] {
let tmp_dir = tempdir()?;
let expected = (0..count)
.map(|i| Foo::message_from_index(i))
.collect::<Vec<_>>();
let path = {
let mut writer = CaskWriter::new(
&tmp_dir.path(),
&CaskWriterConfigBuilder::default()
.with_timestamp_filename(&format!("test_cask_read_write_{count}"))
.with_channel(CaskChannel {
topic: "foo".into(),
})
.with_max_clip_messages(CLIP_MSG_COUNT)
.into(),
)?;
for msg in expected.iter() {
writer.write_message("foo", &msg)?;
assert!(writer.clip_messages_written() <= CLIP_MSG_COUNT);
}
assert_eq!(
writer.total_clips(),
(count + CLIP_MSG_COUNT - 1) / CLIP_MSG_COUNT
);
writer.path().to_owned()
};
{
let msgs = cask_read_all_nodo_messages(&path)?;
assert_eq!(msgs, expected);
}
}
Ok(())
}
#[test]
fn test_cask_read_write_clipped_max_clip_bytes() -> eyre::Result<()> {
const CLIP_BYTES: usize = 100;
for count in [100, 1015] {
let tmp_dir = tempdir()?;
let expected = (0..count)
.map(|i| Foo::message_from_index(i))
.collect::<Vec<_>>();
let path = {
let mut writer = CaskWriter::new(
&tmp_dir.path(),
&CaskWriterConfigBuilder::default()
.with_timestamp_filename(&format!("test_cask_read_write_{count}"))
.with_channel(CaskChannel {
topic: "foo".into(),
})
.with_max_clip_bytes(CLIP_BYTES)
.into(),
)?;
for msg in expected.iter() {
writer.write_message("foo", &msg)?;
println!("{:?}", writer.clip_bytes_written());
assert!(writer.clip_bytes_written() <= CLIP_BYTES);
}
assert!(writer.total_clips() >= 2);
writer.path().to_owned()
};
{
let msgs = cask_read_all_nodo_messages(&path)?;
assert_eq!(msgs, expected);
}
}
Ok(())
}
#[test]
fn test_cask_read_write_clipped_max_clip_duration() -> eyre::Result<()> {
const CLIP_DURATION_MS: u64 = 250;
for count in [250, 1015] {
let tmp_dir = tempdir()?;
let expected = (0..count)
.map(|i| Foo::message_from_index(i))
.collect::<Vec<_>>();
let path = {
let mut writer = CaskWriter::new(
&tmp_dir.path(),
&CaskWriterConfigBuilder::default()
.with_timestamp_filename(&format!("test_cask_read_write_{count}"))
.with_channel(CaskChannel {
topic: "foo".into(),
})
.with_max_clip_duration(Duration::from_millis(CLIP_DURATION_MS))
.into(),
)?;
for msg in expected.iter() {
writer.write_message("foo", &msg)?;
}
assert_eq!(
writer.total_clips(),
(count + CLIP_DURATION_MS as usize - 1) / CLIP_DURATION_MS as usize
);
writer.path().to_owned()
};
{
let msgs = cask_read_all_nodo_messages(&path)?;
assert_eq!(msgs, expected);
}
}
Ok(())
}