use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use commonware_codec::{
varint::{UInt, MAX_U32_VARINT_SIZE},
Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
};
use commonware_runtime::{
buffer::paged::{Append, CacheRef, Replay},
Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
};
use futures::stream::{self, Stream, StreamExt};
use std::{io::Cursor, num::NonZeroUsize};
use tracing::{trace, warn};
use zstd::{bulk::compress, decode_all};
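/// Configuration for a variable-length item [Journal]: the storage
/// `partition` backing the journal, an optional zstd `compression` level,
/// the codec configuration used to decode items, a shared page cache for
/// reads, and the size of the per-blob write buffer.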
#[derive(Clone)]
pub struct Config<C> {
pub partition: String,
pub compression: Option<u8>,
pub codec_config: C,
pub page_cache: CacheRef,
pub write_buffer: NonZeroUsize,
}
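/// Decodes the varint length prefix at the front of `buf`, returning the
/// item's payload size and the number of bytes the varint occupied.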
#[inline]
fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
let initial = buf.remaining();
let size = UInt::<u32>::read(buf)?.0 as usize;
let varint_len = initial - buf.remaining();
Ok((size, varint_len))
}
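/// Result of parsing an item header from a bounded read: either the entire
/// payload was already buffered, or only a `prefix_len`-byte prefix of a
/// `total_len`-byte payload was.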
enum ItemInfo {
Complete {
varint_len: usize,
data_len: usize,
},
Incomplete {
varint_len: usize,
prefix_len: usize,
total_len: usize,
},
}
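/// Parses the item header in `buf` (located at `offset` within the blob),
/// returning the offset of the next item and whether the payload is fully
/// buffered.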
fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
let available = buf.remaining();
let (size, varint_len) = decode_length_prefix(buf)?;
let next_offset = offset
.checked_add(varint_len as u64)
.ok_or(Error::OffsetOverflow)?
.checked_add(size as u64)
.ok_or(Error::OffsetOverflow)?;
let buffered = available.saturating_sub(varint_len);
let item = if buffered >= size {
ItemInfo::Complete {
varint_len,
data_len: size,
}
} else {
ItemInfo::Incomplete {
varint_len,
prefix_len: buffered,
total_len: size,
}
};
Ok((next_offset, item))
}
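/// Per-section state threaded through the `unfold` that drives [Journal::replay].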
struct ReplayState<B: Blob, C> {
section: u64,
blob: Append<B>,
replay: Replay<B>,
skip_bytes: u64,
offset: u64,
valid_offset: u64,
codec_config: C,
compressed: bool,
done: bool,
}
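/// Decodes a single item from `item_data`, first decompressing with zstd
/// when `compressed` is set.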
fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
if compressed {
let decompressed =
decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
} else {
V::decode_cfg(item_data, cfg).map_err(Error::Codec)
}
}
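/// An append-only journal of length-prefixed items grouped into numbered
/// sections. Each entry on disk is a varint length prefix followed by the
/// item's codec encoding (zstd-compressed when compression is enabled).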
pub struct Journal<E: Storage + Metrics, V: Codec> {
manager: Manager<E, AppendFactory>,
compression: Option<u8>,
codec_config: V::Cfg,
}
impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
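/// Initializes the journal from the blobs found in `cfg.partition`.
///
/// A minimal usage sketch (not compiled here; assumes a runtime `context`
/// and the `NZU16!`/`NZUsize!` macros from commonware_utils, as used in the
/// tests below):
///
/// ```ignore
/// let cfg = Config {
///     partition: "my-journal".into(),
///     compression: None,
///     codec_config: (),
///     page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
///     write_buffer: NZUsize!(1024),
/// };
/// let mut journal = Journal::<_, u64>::init(context, cfg).await?;
/// let (offset, _len) = journal.append(1, &42u64).await?;
/// journal.sync(1).await?;
/// assert_eq!(journal.get(1, offset).await?, 42);
/// ```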
pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
let manager_cfg = ManagerConfig {
partition: cfg.partition,
factory: AppendFactory {
write_buffer: cfg.write_buffer,
page_cache_ref: cfg.page_cache,
},
};
let manager = Manager::init(context, manager_cfg).await?;
Ok(Self {
manager,
compression: cfg.compression,
codec_config: cfg.codec_config,
})
}
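/// Reads and decodes the item at `offset`, returning the offset of the next
/// item, the payload size, and the decoded value. A first read fetches up to
/// a full varint header; any unbuffered remainder of the payload is then
/// read directly from the blob.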
async fn read(
compressed: bool,
cfg: &V::Cfg,
blob: &Append<E::Blob>,
offset: u64,
) -> Result<(u64, u32, V), Error> {
let (buf, available) = blob
.read_up_to(
offset,
MAX_U32_VARINT_SIZE,
IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
)
.await?;
let buf = buf.freeze();
let mut cursor = Cursor::new(buf.slice(..available));
let (next_offset, item_info) = find_item(&mut cursor, offset)?;
let (item_size, decoded) = match item_info {
ItemInfo::Complete {
varint_len,
data_len,
} => {
let data = buf.slice(varint_len..varint_len + data_len);
let decoded = decode_item::<V>(data, cfg, compressed)?;
(data_len as u32, decoded)
}
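// Only part of the payload arrived with the header read: fetch the
// remainder from the blob and chain the two buffers for decoding.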
ItemInfo::Incomplete {
varint_len,
prefix_len,
total_len,
} => {
let prefix = buf.slice(varint_len..varint_len + prefix_len);
let read_offset = offset + varint_len as u64 + prefix_len as u64;
let remainder_len = total_len - prefix_len;
let mut remainder = vec![0u8; remainder_len];
blob.read_into(&mut remainder, read_offset).await?;
let chained = prefix.chain(IoBuf::from(remainder));
let decoded = decode_item::<V>(chained, cfg, compressed)?;
(total_len as u32, decoded)
}
};
Ok((next_offset, item_size, decoded))
}
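/// Streams every item from `start_section` onward, starting at
/// `start_offset` within that section, as `(section, offset, size, item)`
/// tuples. Trailing or incomplete bytes found at the end of a blob are
/// truncated as the stream advances.
///
/// A minimal consumption sketch (not compiled here; assumes an initialized
/// `journal`):
///
/// ```ignore
/// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
/// pin_mut!(stream);
/// while let Some(result) = stream.next().await {
///     let (section, offset, size, item) = result?;
///     // ...process the item
/// }
/// ```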
pub async fn replay(
&self,
start_section: u64,
mut start_offset: u64,
buffer: NonZeroUsize,
) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
let codec_config = self.codec_config.clone();
let compressed = self.compression.is_some();
let mut blobs = Vec::new();
for (&section, blob) in self.manager.sections_from(start_section) {
blobs.push((
section,
blob.clone(),
blob.replay(buffer).await?,
codec_config.clone(),
compressed,
));
}
Ok(stream::iter(blobs).flat_map(
move |(section, blob, replay, codec_config, compressed)| {
let skip_bytes = if section == start_section {
start_offset
} else {
start_offset = 0;
0
};
stream::unfold(
ReplayState {
section,
blob,
replay,
skip_bytes,
offset: 0,
valid_offset: skip_bytes,
codec_config,
compressed,
done: false,
},
move |mut state| async move {
if state.done {
return None;
}
let blob_size = state.replay.blob_size();
let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
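// Decode as many items as the buffer allows, batching results to
// amortize per-item stream overhead.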
loop {
match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
Ok(true) => {}
Ok(false) => {
if state.replay.remaining() == 0 {
state.done = true;
return if batch.is_empty() {
None
} else {
Some((batch, state))
};
}
}
Err(err) => {
batch.push(Err(err.into()));
state.done = true;
return Some((batch, state));
}
}
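// Consume any bytes remaining from the caller-provided start offset
// before decoding.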
if state.skip_bytes > 0 {
let to_skip =
state.skip_bytes.min(state.replay.remaining() as u64) as usize;
state.replay.advance(to_skip);
state.skip_bytes -= to_skip as u64;
state.offset += to_skip as u64;
continue;
}
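// Decode the next item's length prefix; a failure at the tail of the
// blob indicates trailing garbage to truncate.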
let before_remaining = state.replay.remaining();
let (item_size, varint_len) =
match decode_length_prefix(&mut state.replay) {
Ok(result) => result,
Err(err) => {
if state.replay.is_exhausted()
|| before_remaining < MAX_U32_VARINT_SIZE
{
if state.valid_offset < blob_size
&& state.offset < blob_size
{
warn!(
blob = state.section,
bad_offset = state.offset,
new_size = state.valid_offset,
"trailing bytes detected: truncating"
);
state.blob.resize(state.valid_offset).await.ok()?;
}
state.done = true;
return if batch.is_empty() {
None
} else {
Some((batch, state))
};
}
batch.push(Err(err));
state.done = true;
return Some((batch, state));
}
};
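// Ensure the full payload is buffered; a short read means the final
// item was only partially written.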
match state.replay.ensure(item_size).await {
Ok(true) => {}
Ok(false) => {
warn!(
blob = state.section,
bad_offset = state.offset,
new_size = state.valid_offset,
"incomplete item at end: truncating"
);
state.blob.resize(state.valid_offset).await.ok()?;
state.done = true;
return if batch.is_empty() {
None
} else {
Some((batch, state))
};
}
Err(err) => {
batch.push(Err(err.into()));
state.done = true;
return Some((batch, state));
}
}
let item_offset = state.offset;
let next_offset = match state
.offset
.checked_add(varint_len as u64)
.and_then(|o| o.checked_add(item_size as u64))
{
Some(o) => o,
None => {
batch.push(Err(Error::OffsetOverflow));
state.done = true;
return Some((batch, state));
}
};
match decode_item::<V>(
(&mut state.replay).take(item_size),
&state.codec_config,
state.compressed,
) {
Ok(decoded) => {
batch.push(Ok((
state.section,
item_offset,
item_size as u32,
decoded,
)));
state.valid_offset = next_offset;
state.offset = next_offset;
}
Err(err) => {
batch.push(Err(err));
state.done = true;
return Some((batch, state));
}
}
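// Yield the batch once too few bytes remain to hold a full varint
// header.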
if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
return Some((batch, state));
}
}
},
)
.flat_map(stream::iter)
},
))
}
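/// Frames `item` as a varint length prefix followed by its payload (the
/// zstd-compressed encoding when `compression` is `Some`, the raw codec
/// encoding otherwise), returning the framed bytes and the payload length.
/// For example, a 5-byte payload is framed as six bytes: `0x05` followed by
/// the payload.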
pub(crate) fn encode_item(compression: Option<u8>, item: &V) -> Result<(Vec<u8>, u32), Error> {
if let Some(compression) = compression {
let encoded = item.encode();
let compressed =
compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
let item_len = compressed.len();
let item_len_u32: u32 = match item_len.try_into() {
Ok(len) => len,
Err(_) => return Err(Error::ItemTooLarge(item_len)),
};
let size_len = UInt(item_len_u32).encode_size();
let entry_len = size_len
.checked_add(item_len)
.ok_or(Error::OffsetOverflow)?;
let mut buf = Vec::with_capacity(entry_len);
UInt(item_len_u32).write(&mut buf);
buf.put_slice(&compressed);
Ok((buf, item_len_u32))
} else {
let item_len = item.encode_size();
let item_len_u32: u32 = match item_len.try_into() {
Ok(len) => len,
Err(_) => return Err(Error::ItemTooLarge(item_len)),
};
let size_len = UInt(item_len_u32).encode_size();
let entry_len = size_len
.checked_add(item_len)
.ok_or(Error::OffsetOverflow)?;
let mut buf = Vec::with_capacity(entry_len);
UInt(item_len_u32).write(&mut buf);
item.write(&mut buf);
Ok((buf, item_len_u32))
}
}
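/// Appends `item` to `section`, returning the offset at which it was written
/// and its encoded payload length.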
pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
let (buf, item_len) = Self::encode_item(self.compression, item)?;
self.append_raw(section, &buf)
.await
.map(|offset| (offset, item_len))
}
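/// Appends pre-framed bytes to the end of `section`'s blob, returning the
/// offset at which they were written.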
pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<u64, Error> {
let blob = self.manager.get_or_create(section).await?;
let offset = blob.size().await;
blob.append(buf).await?;
trace!(blob = section, offset, "appended item");
Ok(offset)
}
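/// Reads and decodes the item stored at `offset` within `section`.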
pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
let blob = self
.manager
.get(section)?
.ok_or(Error::SectionOutOfRange(section))?;
let (_, _, item) =
Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
Ok(item)
}
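/// Best-effort synchronous read of the item at `offset` within `section`,
/// returning `None` whenever the blob, header, or payload cannot be read
/// without blocking or fails to decode.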
pub fn try_get_sync(&self, section: u64, offset: u64) -> Option<V> {
let blob = self.manager.get(section).ok()??;
let remaining = blob.try_size()?.checked_sub(offset)?;
let header_len = usize::try_from(remaining.min(MAX_U32_VARINT_SIZE as u64)).ok()?;
if header_len == 0 {
return None;
}
let mut header = [0u8; MAX_U32_VARINT_SIZE];
if !blob.try_read_sync(offset, &mut header[..header_len]) {
return None;
}
let mut cursor = Cursor::new(&header[..header_len]);
let (_, item_info) = find_item(&mut cursor, offset).ok()?;
let (varint_len, data_len) = match item_info {
ItemInfo::Complete {
varint_len,
data_len,
} => (varint_len, data_len),
ItemInfo::Incomplete {
varint_len,
total_len,
..
} => (varint_len, total_len),
};
let item_len = varint_len.checked_add(data_len)?;
if item_len > usize::try_from(remaining).ok()? {
return None;
}
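// Fast path: the entire entry fit within the header read.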
if item_len <= header_len {
return decode_item::<V>(
&header[varint_len..varint_len + data_len],
&self.codec_config,
self.compression.is_some(),
)
.ok();
}
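// Slow path: re-read the full entry, then decode past the varint.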
let mut full = vec![0u8; item_len];
if !blob.try_read_sync(offset, &mut full) {
return None;
}
decode_item::<V>(
&full[varint_len..varint_len + data_len],
&self.codec_config,
self.compression.is_some(),
)
.ok()
}
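/// Returns the size of `section` in bytes; sections that do not exist report
/// a size of zero.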
pub async fn size(&self, section: u64) -> Result<u64, Error> {
self.manager.size(section).await
}
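/// Rewinds the journal to `offset` within `section`, removing any newer
/// sections (equivalent to [Self::rewind]).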
pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
self.manager.rewind(section, offset).await
}
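/// Resizes `section` to `size` and removes all newer sections.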
pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.manager.rewind(section, size).await
}
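/// Resizes `section` to `size`, leaving other sections untouched.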
pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.manager.rewind_section(section, size).await
}
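/// Flushes buffered writes for `section` to durable storage.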
pub async fn sync(&self, section: u64) -> Result<(), Error> {
self.manager.sync(section).await
}
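/// Flushes buffered writes for every tracked section.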
pub async fn sync_all(&self) -> Result<(), Error> {
self.manager.sync_all().await
}
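/// Removes all sections with an index below `min`, returning whether any
/// section was pruned.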
pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
self.manager.prune(min).await
}
pub fn oldest_section(&self) -> Option<u64> {
self.manager.oldest_section()
}
pub fn newest_section(&self) -> Option<u64> {
self.manager.newest_section()
}
pub fn is_empty(&self) -> bool {
self.manager.is_empty()
}
pub fn num_sections(&self) -> usize {
self.manager.num_sections()
}
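/// Removes all underlying blobs and consumes the journal.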
pub async fn destroy(self) -> Result<(), Error> {
self.manager.destroy().await
}
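/// Removes all sections, resetting the journal to empty while keeping it
/// usable for new appends.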
pub async fn clear(&mut self) -> Result<(), Error> {
self.manager.clear().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_macros::test_traced;
use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
use commonware_utils::{NZUsize, NZU16};
use futures::{pin_mut, StreamExt};
use std::num::NonZeroU16;
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
#[test_traced]
fn test_journal_append_and_read() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let index = 1u64;
let data = 10;
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
journal
.append(index, &data)
.await
.expect("Failed to append data");
let buffer = context.encode();
assert!(buffer.contains("first_tracked 1"));
journal.sync(index).await.expect("Failed to sync journal");
drop(journal);
let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::new();
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
assert_eq!(items.len(), 1);
assert_eq!(items[0].0, index);
assert_eq!(items[0].1, data);
let buffer = context.encode();
assert!(buffer.contains("second_tracked 1"));
});
}
#[test_traced]
fn test_journal_multiple_appends_and_reads() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
for (index, data) in &data_items {
journal
.append(*index, data)
.await
.expect("Failed to append data");
journal.sync(*index).await.expect("Failed to sync blob");
}
let buffer = context.encode();
assert!(buffer.contains("first_tracked 3"));
assert!(buffer.contains("first_synced_total 4"));
drop(journal);
let journal = Journal::init(context.with_label("second"), cfg)
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::<(u64, u32)>::new();
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
}
assert_eq!(items.len(), data_items.len());
for ((expected_index, expected_data), (actual_index, actual_data)) in
data_items.iter().zip(items.iter())
{
assert_eq!(actual_index, expected_index);
assert_eq!(actual_data, expected_data);
}
journal.destroy().await.expect("Failed to destroy journal");
});
}
#[test_traced]
fn test_journal_prune_blobs() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
for index in 1u64..=5u64 {
journal
.append(index, &index)
.await
.expect("Failed to append data");
journal.sync(index).await.expect("Failed to sync blob");
}
let data = 99;
journal
.append(2u64, &data)
.await
.expect("Failed to append data");
journal.sync(2u64).await.expect("Failed to sync blob");
journal.prune(3).await.expect("Failed to prune blobs");
let buffer = context.encode();
assert!(buffer.contains("first_pruned_total 2"));
journal.prune(2).await.expect("Failed to no-op prune");
let buffer = context.encode();
assert!(buffer.contains("first_pruned_total 2"));
drop(journal);
let mut journal = Journal::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::<(u64, u64)>::new();
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
}
assert_eq!(items.len(), 3);
let expected_indices = [3u64, 4u64, 5u64];
for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
assert_eq!(item.0, *expected_index);
}
journal.prune(6).await.expect("Failed to prune blobs");
drop(journal);
assert!(context
.scan(&cfg.partition)
.await
.expect("Failed to list blobs")
.is_empty());
});
}
#[test_traced]
fn test_journal_prune_guard() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize journal");
for section in 1u64..=5u64 {
journal
.append(section, &(section as i32))
.await
.expect("Failed to append data");
journal.sync(section).await.expect("Failed to sync");
}
journal.prune(3).await.expect("Failed to prune");
match journal.append(1, &100).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
match journal.append(2, &100).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
match journal.get(1, 0).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
match journal.size(1).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
match journal.rewind(2, 0).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
match journal.rewind_section(1, 0).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
match journal.sync(2).await {
Err(Error::AlreadyPrunedToSection(3)) => {}
other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
}
assert!(journal.get(3, 0).await.is_ok());
assert!(journal.get(4, 0).await.is_ok());
assert!(journal.get(5, 0).await.is_ok());
assert!(journal.size(3).await.is_ok());
assert!(journal.sync(4).await.is_ok());
journal
.append(3, &999)
.await
.expect("Should be able to append to section 3");
journal.prune(5).await.expect("Failed to prune");
match journal.get(3, 0).await {
Err(Error::AlreadyPrunedToSection(5)) => {}
other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
}
match journal.get(4, 0).await {
Err(Error::AlreadyPrunedToSection(5)) => {}
other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
}
assert!(journal.get(5, 0).await.is_ok());
});
}
#[test_traced]
fn test_journal_prune_guard_across_restart() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
{
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
for section in 1u64..=5u64 {
journal
.append(section, &(section as i32))
.await
.expect("Failed to append data");
journal.sync(section).await.expect("Failed to sync");
}
journal.prune(3).await.expect("Failed to prune");
}
{
let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
match journal.get(1, 0).await {
Err(Error::SectionOutOfRange(1)) => {}
other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
}
match journal.get(2, 0).await {
Err(Error::SectionOutOfRange(2)) => {}
other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
}
assert!(journal.get(3, 0).await.is_ok());
assert!(journal.get(4, 0).await.is_ok());
assert!(journal.get(5, 0).await.is_ok());
}
});
}
#[test_traced]
fn test_journal_with_invalid_blob_name() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let invalid_blob_name = b"invalid"; let (blob, _) = context
.open(&cfg.partition, invalid_blob_name)
.await
.expect("Failed to create blob with invalid name");
blob.sync().await.expect("Failed to sync blob");
let result = Journal::<_, u64>::init(context, cfg).await;
assert!(matches!(result, Err(Error::InvalidBlobName(_))));
});
}
#[test_traced]
fn test_journal_read_size_missing() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let section = 1u64;
let blob_name = section.to_be_bytes();
let (blob, _) = context
.open(&cfg.partition, &blob_name)
.await
.expect("Failed to create blob");
let mut incomplete_data = Vec::new();
UInt(u32::MAX).write(&mut incomplete_data);
incomplete_data.truncate(1);
blob.write_at(0, incomplete_data)
.await
.expect("Failed to write incomplete data");
blob.sync().await.expect("Failed to sync blob");
let journal = Journal::init(context, cfg)
.await
.expect("Failed to initialize journal");
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
let mut items = Vec::<(u64, u64)>::new();
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
assert!(items.is_empty());
});
}
#[test_traced]
fn test_journal_read_item_missing() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let section = 1u64;
let blob_name = section.to_be_bytes();
let (blob, _) = context
.open(&cfg.partition, &blob_name)
.await
.expect("Failed to create blob");
let item_size: u32 = 10;
let mut buf = Vec::new();
UInt(item_size).write(&mut buf);
let data = [2u8; 5];
BufMut::put_slice(&mut buf, &data);
blob.write_at(0, buf)
.await
.expect("Failed to write incomplete item");
blob.sync().await.expect("Failed to sync blob");
let journal = Journal::init(context, cfg)
.await
.expect("Failed to initialize journal");
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
let mut items = Vec::<(u64, u64)>::new();
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
assert!(items.is_empty());
});
}
#[test_traced]
fn test_journal_read_checksum_missing() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let section = 1u64;
let blob_name = section.to_be_bytes();
let (blob, _) = context
.open(&cfg.partition, &blob_name)
.await
.expect("Failed to create blob");
let item_data = b"Test data";
let item_size = item_data.len() as u32;
let mut buf = Vec::new();
UInt(item_size).write(&mut buf);
BufMut::put_slice(&mut buf, item_data);
blob.write_at(0, buf)
.await
.expect("Failed to write item without checksum");
blob.sync().await.expect("Failed to sync blob");
let journal = Journal::init(context, cfg)
.await
.expect("Failed to initialize journal");
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
let mut items = Vec::<(u64, u64)>::new();
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
assert!(items.is_empty());
});
}
#[test_traced]
fn test_journal_read_checksum_mismatch() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let section = 1u64;
let blob_name = section.to_be_bytes();
let (blob, _) = context
.open(&cfg.partition, &blob_name)
.await
.expect("Failed to create blob");
let item_data = b"Test data";
let item_size = item_data.len() as u32;
let incorrect_checksum: u32 = 0xDEADBEEF;
let mut buf = Vec::new();
UInt(item_size).write(&mut buf);
BufMut::put_slice(&mut buf, item_data);
buf.put_u32(incorrect_checksum);
blob.write_at(0, buf)
.await
.expect("Failed to write item with bad checksum");
blob.sync().await.expect("Failed to sync blob");
let journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize journal");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
let mut items = Vec::<(u64, u64)>::new();
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
assert!(items.is_empty());
}
drop(journal);
let (_, blob_size) = context
.open(&cfg.partition, &section.to_be_bytes())
.await
.expect("Failed to open blob");
assert_eq!(blob_size, 0);
});
}
#[test_traced]
fn test_journal_truncation_recovery() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
journal.append(1, &1).await.expect("Failed to append data");
let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
for (index, data) in &data_items {
journal
.append(*index, data)
.await
.expect("Failed to append data");
journal.sync(*index).await.expect("Failed to sync blob");
}
journal.sync_all().await.expect("Failed to sync");
drop(journal);
let (blob, blob_size) = context
.open(&cfg.partition, &2u64.to_be_bytes())
.await
.expect("Failed to open blob");
blob.resize(blob_size - 4)
.await
.expect("Failed to corrupt blob");
blob.sync().await.expect("Failed to sync blob");
let journal = Journal::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::<(u64, u32)>::new();
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
}
drop(journal);
assert_eq!(items.len(), 1);
assert_eq!(items[0].0, 1);
assert_eq!(items[0].1, 1);
let (_, blob_size) = context
.open(&cfg.partition, &2u64.to_be_bytes())
.await
.expect("Failed to open blob");
assert_eq!(blob_size, 0);
let mut journal = Journal::init(context.with_label("third"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::<(u64, u32)>::new();
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
}
assert_eq!(items.len(), 1);
assert_eq!(items[0].0, 1);
assert_eq!(items[0].1, 1);
let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
journal.sync(2).await.expect("Failed to sync blob");
let item = journal.get(2, 0).await.expect("Failed to get item");
assert_eq!(item, 5);
drop(journal);
let journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::<(u64, u32)>::new();
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
}
assert_eq!(items.len(), 2);
assert_eq!(items[0].0, 1);
assert_eq!(items[0].1, 1);
assert_eq!(items[1].0, 2);
assert_eq!(items[1].1, 5);
});
}
#[test_traced]
fn test_journal_handling_extra_data() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
journal.append(1, &1).await.expect("Failed to append data");
let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
for (index, data) in &data_items {
journal
.append(*index, data)
.await
.expect("Failed to append data");
journal.sync(*index).await.expect("Failed to sync blob");
}
journal.sync_all().await.expect("Failed to sync");
drop(journal);
let (blob, blob_size) = context
.open(&cfg.partition, &2u64.to_be_bytes())
.await
.expect("Failed to open blob");
blob.write_at(blob_size, vec![0u8; 16])
.await
.expect("Failed to add extra data");
blob.sync().await.expect("Failed to sync blob");
let journal = Journal::init(context.with_label("second"), cfg)
.await
.expect("Failed to re-initialize journal");
let mut items = Vec::<(u64, i32)>::new();
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("unable to setup replay");
pin_mut!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
Err(err) => panic!("Failed to read item: {err}"),
}
}
});
}
#[test_traced]
fn test_journal_rewind() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context, cfg).await.unwrap();
let size = journal.size(1).await.unwrap();
assert_eq!(size, 0);
journal.append(1, &42i32).await.unwrap();
let size = journal.size(1).await.unwrap();
assert!(size > 0);
journal.append(1, &43i32).await.unwrap();
let new_size = journal.size(1).await.unwrap();
assert!(new_size > size);
let size = journal.size(2).await.unwrap();
assert_eq!(size, 0);
journal.append(2, &44i32).await.unwrap();
let size = journal.size(2).await.unwrap();
assert!(size > 0);
journal.rewind(1, 0).await.unwrap();
let size = journal.size(1).await.unwrap();
assert_eq!(size, 0);
let size = journal.size(2).await.unwrap();
assert_eq!(size, 0);
});
}
#[test_traced]
fn test_journal_rewind_section() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context, cfg).await.unwrap();
let size = journal.size(1).await.unwrap();
assert_eq!(size, 0);
journal.append(1, &42i32).await.unwrap();
let size = journal.size(1).await.unwrap();
assert!(size > 0);
journal.append(1, &43i32).await.unwrap();
let new_size = journal.size(1).await.unwrap();
assert!(new_size > size);
let size = journal.size(2).await.unwrap();
assert_eq!(size, 0);
journal.append(2, &44i32).await.unwrap();
let size = journal.size(2).await.unwrap();
assert!(size > 0);
journal.rewind_section(1, 0).await.unwrap();
let size = journal.size(1).await.unwrap();
assert_eq!(size, 0);
let size = journal.size(2).await.unwrap();
assert!(size > 0);
});
}
#[test_traced]
fn test_journal_small_items() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
let num_items = 100;
let mut offsets = Vec::new();
for i in 0..num_items {
let (offset, size) = journal
.append(1, &(i as u8))
.await
.expect("Failed to append data");
assert_eq!(size, 1, "u8 should encode to 1 byte");
offsets.push(offset);
}
journal.sync(1).await.expect("Failed to sync");
for (i, &offset) in offsets.iter().enumerate() {
let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
}
drop(journal);
let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
.await
.expect("Failed to re-initialize journal");
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay");
pin_mut!(stream);
let mut count = 0;
while let Some(result) = stream.next().await {
let (section, offset, size, item) = result.expect("Failed to replay item");
assert_eq!(section, 1);
assert_eq!(offset, offsets[count]);
assert_eq!(size, 1);
assert_eq!(item, count as u8);
count += 1;
}
assert_eq!(count, num_items, "Should replay all items");
});
}
#[test_traced]
fn test_journal_rewind_many_sections() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
for section in 1u64..=10 {
journal.append(section, &(section as i32)).await.unwrap();
}
journal.sync_all().await.unwrap();
for section in 1u64..=10 {
let size = journal.size(section).await.unwrap();
assert!(size > 0, "section {section} should have data");
}
journal
.rewind(5, journal.size(5).await.unwrap())
.await
.unwrap();
for section in 1u64..=5 {
let size = journal.size(section).await.unwrap();
assert!(size > 0, "section {section} should still have data");
}
for section in 6u64..=10 {
let size = journal.size(section).await.unwrap();
assert_eq!(size, 0, "section {section} should be removed");
}
{
let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.unwrap();
items.push((section, item));
}
assert_eq!(items.len(), 5);
for (i, (section, item)) in items.iter().enumerate() {
assert_eq!(*section, (i + 1) as u64);
assert_eq!(*item, (i + 1) as i32);
}
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_rewind_partial_truncation() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
let mut sizes = Vec::new();
for i in 0..5 {
journal.append(1, &i).await.unwrap();
journal.sync(1).await.unwrap();
sizes.push(journal.size(1).await.unwrap());
}
let target_size = sizes[2];
journal.rewind(1, target_size).await.unwrap();
let new_size = journal.size(1).await.unwrap();
assert_eq!(new_size, target_size);
{
let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (_, _, _, item) = result.unwrap();
items.push(item);
}
assert_eq!(items.len(), 3);
for (i, item) in items.iter().enumerate() {
assert_eq!(*item, i as i32);
}
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_rewind_nonexistent_target() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
for section in 5u64..=7 {
journal.append(section, &(section as i32)).await.unwrap();
}
journal.sync_all().await.unwrap();
journal.rewind(3, 0).await.unwrap();
for section in 5u64..=7 {
let size = journal.size(section).await.unwrap();
assert_eq!(size, 0, "section {section} should be removed");
}
{
let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
pin_mut!(stream);
let items: Vec<_> = stream.collect().await;
assert!(items.is_empty());
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_rewind_persistence() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.unwrap();
for section in 1u64..=5 {
journal.append(section, &(section as i32)).await.unwrap();
}
journal.sync_all().await.unwrap();
let size = journal.size(2).await.unwrap();
journal.rewind(2, size).await.unwrap();
journal.sync_all().await.unwrap();
drop(journal);
let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
.await
.unwrap();
for section in 1u64..=2 {
let size = journal.size(section).await.unwrap();
assert!(size > 0, "section {section} should have data after restart");
}
for section in 3u64..=5 {
let size = journal.size(section).await.unwrap();
assert_eq!(size, 0, "section {section} should be gone after restart");
}
{
let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.unwrap();
items.push((section, item));
}
assert_eq!(items.len(), 2);
assert_eq!(items[0], (1, 1));
assert_eq!(items[1], (2, 2));
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_rewind_to_zero_removes_all_newer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
for section in 1u64..=3 {
journal.append(section, &(section as i32)).await.unwrap();
}
journal.sync_all().await.unwrap();
journal.rewind(1, 0).await.unwrap();
let size = journal.size(1).await.unwrap();
assert_eq!(size, 0, "section 1 should be empty");
for section in 2u64..=3 {
let size = journal.size(section).await.unwrap();
assert_eq!(size, 0, "section {section} should be removed");
}
{
let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
pin_mut!(stream);
let items: Vec<_> = stream.collect().await;
assert!(items.is_empty());
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_replay_start_offset_with_trailing_bytes() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
for i in 0..5i32 {
journal.append(1, &i).await.unwrap();
}
journal.sync(1).await.unwrap();
let valid_logical_size = journal.size(1).await.unwrap();
drop(journal);
let (blob, physical_size_before) = context
.open(&cfg.partition, &1u64.to_be_bytes())
.await
.unwrap();
blob.write_at(physical_size_before, vec![0xFF, 0xFF])
.await
.unwrap();
blob.sync().await.unwrap();
let start_offset = valid_logical_size;
{
let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
.await
.unwrap();
let stream = journal
.replay(1, start_offset, NZUsize!(1024))
.await
.unwrap();
pin_mut!(stream);
while let Some(_result) = stream.next().await {}
}
let (_, physical_size_after) = context
.open(&cfg.partition, &1u64.to_be_bytes())
.await
.unwrap();
assert!(
physical_size_after >= physical_size_before,
"Valid data was lost! Physical blob truncated from {physical_size_before} to \
{physical_size_after}. Logical valid size was {valid_logical_size}. \
This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
);
});
}
#[test_traced]
fn test_journal_large_item_spanning_pages() {
const LARGE_SIZE: usize = 2048;
type LargeItem = [u8; LARGE_SIZE];
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(4096),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
let mut large_data: LargeItem = [0u8; LARGE_SIZE];
for (i, byte) in large_data.iter_mut().enumerate() {
*byte = (i % 256) as u8;
}
assert!(
LARGE_SIZE > PAGE_SIZE.get() as usize,
"Item must be larger than page size"
);
let (offset, size) = journal
.append(1, &large_data)
.await
.expect("Failed to append large item");
assert_eq!(size as usize, LARGE_SIZE);
journal.sync(1).await.expect("Failed to sync");
let retrieved: LargeItem = journal
.get(1, offset)
.await
.expect("Failed to get large item");
assert_eq!(retrieved, large_data, "Random access read mismatch");
drop(journal);
let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, off, sz, item) = result.expect("Failed to replay item");
items.push((section, off, sz, item));
}
assert_eq!(items.len(), 1, "Should have exactly one item");
let (section, off, sz, item) = &items[0];
assert_eq!(*section, 1);
assert_eq!(*off, offset);
assert_eq!(*sz as usize, LARGE_SIZE);
assert_eq!(*item, large_data, "Replay read mismatch");
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_non_contiguous_sections() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
let mut offsets = Vec::new();
for (section, data) in &sections_and_data {
let (offset, _) = journal
.append(*section, data)
.await
.expect("Failed to append");
offsets.push(offset);
}
journal.sync_all().await.expect("Failed to sync");
for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
let retrieved: i32 = journal
.get(*section, offsets[i])
.await
.expect("Failed to get item");
assert_eq!(retrieved, *expected_data);
}
for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
let result = journal.get(missing_section, 0).await;
assert!(
matches!(result, Err(Error::SectionOutOfRange(_))),
"Expected SectionOutOfRange for section {}, got {:?}",
missing_section,
result
);
}
drop(journal);
let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.expect("Failed to replay item");
items.push((section, item));
}
assert_eq!(items.len(), 3, "Should have 3 items");
assert_eq!(items[0], (1, 100));
assert_eq!(items[1], (5, 500));
assert_eq!(items[2], (10, 1000));
}
{
let stream = journal
.replay(5, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay from section 5");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.expect("Failed to replay item");
items.push((section, item));
}
assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
assert_eq!(items[0], (5, 500));
assert_eq!(items[1], (10, 1000));
}
{
let stream = journal
.replay(3, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay from section 3");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.expect("Failed to replay item");
items.push((section, item));
}
assert_eq!(items.len(), 2);
assert_eq!(items[0], (5, 500));
assert_eq!(items[1], (10, 1000));
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_empty_section_in_middle() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
journal.append(1, &100i32).await.expect("Failed to append");
journal.append(2, &200i32).await.expect("Failed to append");
journal.sync(2).await.expect("Failed to sync");
journal
.rewind_section(2, 0)
.await
.expect("Failed to rewind");
journal.append(3, &300i32).await.expect("Failed to append");
journal.sync_all().await.expect("Failed to sync");
assert!(journal.size(1).await.unwrap() > 0);
assert_eq!(journal.size(2).await.unwrap(), 0);
assert!(journal.size(3).await.unwrap() > 0);
drop(journal);
let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.expect("Failed to replay item");
items.push((section, item));
}
assert_eq!(
items.len(),
2,
"Should have 2 items (skipping empty section)"
);
assert_eq!(items[0], (1, 100));
assert_eq!(items[1], (3, 300));
}
{
let stream = journal
.replay(2, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay from section 2");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, _, item) = result.expect("Failed to replay item");
items.push((section, item));
}
assert_eq!(items.len(), 1, "Should have 1 item from section 3");
assert_eq!(items[0], (3, 300));
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_item_exactly_page_size() {
const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
type ExactItem = [u8; ITEM_SIZE];
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(4096),
};
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
for (i, byte) in exact_data.iter_mut().enumerate() {
*byte = (i % 256) as u8;
}
let (offset, size) = journal
.append(1, &exact_data)
.await
.expect("Failed to append exact item");
assert_eq!(size as usize, ITEM_SIZE);
journal.sync(1).await.expect("Failed to sync");
let retrieved: ExactItem = journal
.get(1, offset)
.await
.expect("Failed to get exact item");
assert_eq!(retrieved, exact_data, "Random access read mismatch");
drop(journal);
let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("Failed to setup replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, off, sz, item) = result.expect("Failed to replay item");
items.push((section, off, sz, item));
}
assert_eq!(items.len(), 1, "Should have exactly one item");
let (section, off, sz, item) = &items[0];
assert_eq!(*section, 1);
assert_eq!(*off, offset);
assert_eq!(*sz as usize, ITEM_SIZE);
assert_eq!(*item, exact_data, "Replay read mismatch");
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_varint_spanning_page_boundary() {
const SMALL_PAGE: NonZeroU16 = NZU16!(16);
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal: Journal<_, [u8; 128]> =
Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize journal");
let item1: [u8; 128] = [1u8; 128];
let item2: [u8; 128] = [2u8; 128];
let item3: [u8; 128] = [3u8; 128];
let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");
journal.sync(1).await.expect("Failed to sync");
let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
assert_eq!(retrieved1, item1);
assert_eq!(retrieved2, item2);
assert_eq!(retrieved3, item3);
drop(journal);
let journal: Journal<_, [u8; 128]> =
Journal::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to re-initialize journal");
{
let stream = journal
.replay(0, 0, NZUsize!(64))
.await
.expect("Failed to setup replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, off, _, item) = result.expect("Failed to replay item");
items.push((section, off, item));
}
assert_eq!(items.len(), 3, "Should have 3 items");
assert_eq!(items[0], (1, offset1, item1));
assert_eq!(items[1], (1, offset2, item2));
assert_eq!(items[2], (1, offset3, item3));
}
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_journal_clear() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "clear-test".into(),
compression: None,
codec_config: (),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal: Journal<_, u64> =
Journal::init(context.with_label("journal"), cfg.clone())
.await
.expect("Failed to initialize journal");
for section in 0..5u64 {
for i in 0..10u64 {
journal
.append(section, &(section * 1000 + i))
.await
.expect("Failed to append");
}
journal.sync(section).await.expect("Failed to sync");
}
assert_eq!(journal.get(0, 0).await.unwrap(), 0);
assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
journal.clear().await.expect("Failed to clear");
for section in 0..5u64 {
assert!(matches!(
journal.get(section, 0).await,
Err(Error::SectionOutOfRange(s)) if s == section
));
}
for i in 0..5u64 {
journal
.append(10, &(i * 100))
.await
.expect("Failed to append after clear");
}
journal.sync(10).await.expect("Failed to sync after clear");
assert_eq!(journal.get(10, 0).await.unwrap(), 0);
assert!(matches!(
journal.get(0, 0).await,
Err(Error::SectionOutOfRange(0))
));
journal.destroy().await.unwrap();
});
}
}