use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
use commonware_runtime::{
buffer::paged::{CacheRef, Replay},
Blob, Buf, Metrics, Storage,
};
use futures::{
stream::{self, Stream},
StreamExt,
};
use std::{marker::PhantomData, num::NonZeroUsize};
use tracing::{trace, warn};
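/// State for a single section while it is being replayed as part of the
/// stream returned by [`Journal::replay`].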
struct ReplayState<B: Blob> {
section: u64,
replay: Replay<B>,
position: u64,
done: bool,
}
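/// Configuration for a [`Journal`] of fixed-size items.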
#[derive(Clone)]
pub struct Config {
pub partition: String,
pub page_cache: CacheRef,
pub write_buffer: NonZeroUsize,
}
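/// A journal of fixed-size items split into numbered sections, each backed by
/// its own blob. Items are addressed by `(section, position)`, where
/// `position` is the item's index within its section.
///
/// A sketch of typical use, assuming a runtime `context` implementing
/// [`Storage`] + [`Metrics`] and the page-cache constants from the tests
/// below:
///
/// ```ignore
/// let cfg = Config {
///     partition: "my-journal".into(),
///     page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
///     write_buffer: NZUsize!(2048),
/// };
/// let mut journal = Journal::<_, Digest>::init(context, cfg).await?;
/// let position = journal.append(1, &Sha256::hash(b"hello")).await?;
/// journal.sync(1).await?;
/// assert_eq!(journal.get(1, position).await?, Sha256::hash(b"hello"));
/// ```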
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
manager: Manager<E, AppendFactory>,
_array: PhantomData<A>,
}
impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
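/// Number of bytes occupied by each encoded item.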
pub const CHUNK_SIZE: usize = A::SIZE;
const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
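/// Initializes the journal over `cfg.partition`, scanning all existing
/// sections and truncating any trailing bytes that do not form a complete
/// item (e.g. after an unclean shutdown).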
pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
let manager_cfg = ManagerConfig {
partition: cfg.partition,
factory: AppendFactory {
write_buffer: cfg.write_buffer,
page_cache_ref: cfg.page_cache,
},
};
let mut manager = Manager::init(context, manager_cfg).await?;
let sections: Vec<_> = manager.sections().collect();
for section in sections {
let size = manager.size(section).await?;
if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
let valid_size = size - (size % Self::CHUNK_SIZE_U64);
warn!(
section,
invalid_size = size,
new_size = valid_size,
"trailing bytes detected: truncating"
);
manager.rewind_section(section, valid_size).await?;
}
}
Ok(Self {
manager,
_array: PhantomData,
})
}
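/// Appends `item` to `section` (creating the section if needed) and returns
/// the item's position within that section.
///
/// Returns [`Error::InvalidBlobSize`] if the section's current size is not a
/// multiple of [`Self::CHUNK_SIZE`].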
pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
let blob = self.manager.get_or_create(section).await?;
let size = blob.size().await;
if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
return Err(Error::InvalidBlobSize(section, size));
}
let position = size / Self::CHUNK_SIZE_U64;
let buf = item.encode_mut();
blob.append(&buf).await?;
trace!(section, position, "appended item");
Ok(position)
}
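/// Appends one or more pre-encoded items to `section`. `buf` must be
/// non-empty and a whole number of chunks long; both conditions are asserted.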
pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<(), Error> {
assert!(!buf.is_empty());
assert!(buf.len().is_multiple_of(Self::CHUNK_SIZE));
let blob = self.manager.get_or_create(section).await?;
blob.append(buf).await?;
trace!(
section,
count = buf.len() / Self::CHUNK_SIZE,
"appended items"
);
Ok(())
}
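/// Reads the item at `position` within `section`.
///
/// Fails with [`Error::SectionOutOfRange`] (or [`Error::AlreadyPrunedToSection`]
/// for pruned sections) if the section is unavailable, and with
/// [`Error::ItemOutOfRange`] if `position` is past the end of the section.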
pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
let blob = self
.manager
.get(section)?
.ok_or(Error::SectionOutOfRange(section))?;
let offset = position
.checked_mul(Self::CHUNK_SIZE_U64)
.ok_or(Error::ItemOutOfRange(position))?;
let end = offset
.checked_add(Self::CHUNK_SIZE_U64)
.ok_or(Error::ItemOutOfRange(position))?;
if end > blob.size().await {
return Err(Error::ItemOutOfRange(position));
}
let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
A::decode(buf.coalesce()).map_err(Error::Codec)
}
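/// Best-effort synchronous read of the item at `position` within `section`.
///
/// Returns `None` if the section does not exist, the position is out of
/// range, or the bytes cannot be served without awaiting storage.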
pub fn try_get_sync(&self, section: u64, position: u64) -> Option<A> {
let blob = self.manager.get(section).ok()??;
let offset = position.checked_mul(Self::CHUNK_SIZE_U64)?;
let remaining = blob.try_size()?.checked_sub(offset)?;
if remaining < Self::CHUNK_SIZE_U64 {
return None;
}
let mut buf = vec![0u8; Self::CHUNK_SIZE];
if !blob.try_read_sync(offset, &mut buf) {
return None;
}
A::decode(&buf[..]).ok()
}
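/// Returns the last item in `section`, or `None` if the section is empty.
///
/// Fails if the section does not exist or has been pruned.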
pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
let blob = self
.manager
.get(section)?
.ok_or(Error::SectionOutOfRange(section))?;
let size = blob.size().await;
if size < Self::CHUNK_SIZE_U64 {
return Ok(None);
}
let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
let offset = last_position * Self::CHUNK_SIZE_U64;
let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
}
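/// Returns a stream of `(section, position, item)` tuples for every item at
/// or after (`start_section`, `start_position`), in section order.
///
/// `buffer` controls the size of the read buffer used while replaying each
/// section. Fails with [`Error::ItemOutOfRange`] if `start_position` is past
/// the end of `start_section`.
///
/// A sketch of draining the stream (mirroring the tests below):
///
/// ```ignore
/// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
/// futures::pin_mut!(stream);
/// while let Some(result) = stream.next().await {
///     let (section, position, item) = result?;
///     // process the item...
/// }
/// ```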
pub async fn replay(
&self,
start_section: u64,
start_position: u64,
buffer: NonZeroUsize,
) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
let mut blob_info = Vec::new();
for (&section, blob) in self.manager.sections_from(start_section) {
let blob_size = blob.size().await;
let mut replay = blob.replay(buffer).await?;
let initial_position = if section == start_section {
let start = start_position * Self::CHUNK_SIZE_U64;
if start > blob_size {
return Err(Error::ItemOutOfRange(start_position));
}
replay.seek_to(start)?;
start_position
} else {
0
};
blob_info.push((section, replay, initial_position));
}
Ok(
stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
stream::unfold(
ReplayState {
section,
replay,
position: initial_position,
done: false,
},
move |mut state| async move {
if state.done {
return None;
}
let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
loop {
match state.replay.ensure(Self::CHUNK_SIZE).await {
Ok(true) => {}
Ok(false) => {
state.done = true;
return if batch.is_empty() {
None
} else {
Some((batch, state))
};
}
Err(err) => {
batch.push(Err(Error::Runtime(err)));
state.done = true;
return Some((batch, state));
}
}
while state.replay.remaining() >= Self::CHUNK_SIZE {
match A::read(&mut state.replay) {
Ok(item) => {
batch.push(Ok((state.section, state.position, item)));
state.position += 1;
}
Err(err) => {
batch.push(Err(Error::Codec(err)));
state.done = true;
return Some((batch, state));
}
}
}
if !batch.is_empty() {
return Some((batch, state));
}
}
},
)
.flat_map(stream::iter)
}),
)
}
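/// Flushes any pending writes for `section` to durable storage.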
pub async fn sync(&self, section: u64) -> Result<(), Error> {
self.manager.sync(section).await
}
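/// Flushes any pending writes for all sections to durable storage.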
pub async fn sync_all(&self) -> Result<(), Error> {
self.manager.sync_all().await
}
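/// Prunes all sections numbered below `min`, returning whether any were
/// removed.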
pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
self.manager.prune(min).await
}
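/// Returns the lowest retained section number, or `None` if the journal is
/// empty.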
pub fn oldest_section(&self) -> Option<u64> {
self.manager.oldest_section()
}
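/// Returns the highest retained section number, or `None` if the journal is
/// empty.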
pub fn newest_section(&self) -> Option<u64> {
self.manager.newest_section()
}
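/// Returns an iterator over the numbers of all retained sections.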
pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
self.manager.sections_from(0).map(|(section, _)| *section)
}
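/// Returns the number of items in `section` (zero if the section holds no
/// data).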
pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
let size = self.manager.size(section).await?;
Ok(size / Self::CHUNK_SIZE_U64)
}
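/// Returns the size of `section` in bytes.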
pub async fn size(&self, section: u64) -> Result<u64, Error> {
self.manager.size(section).await
}
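/// Rewinds the journal so that `section` is the newest section: all later
/// sections are removed and `section` is truncated to `offset` bytes.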
pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
self.manager.rewind(section, offset).await
}
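/// Truncates `section` to `size` bytes without affecting other sections.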
pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.manager.rewind_section(section, size).await
}
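/// Consumes the journal and removes all of its underlying storage.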
pub async fn destroy(self) -> Result<(), Error> {
self.manager.destroy().await
}
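/// Removes all sections while leaving the journal usable for new appends.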
pub async fn clear(&mut self) -> Result<(), Error> {
self.manager.clear().await
}
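/// Ensures the blob backing `section` exists, creating it if necessary.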
pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
self.manager.get_or_create(section).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
use commonware_macros::test_traced;
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner,
};
use commonware_utils::{NZUsize, NZU16};
use core::num::NonZeroU16;
use futures::{pin_mut, StreamExt};
const PAGE_SIZE: NonZeroU16 = NZU16!(44);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
fn test_digest(value: u64) -> Digest {
Sha256::hash(&value.to_be_bytes())
}
fn test_cfg(pooler: &impl BufferPooler) -> Config {
Config {
partition: "test-partition".into(),
page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(2048),
}
}
#[test_traced]
fn test_segmented_fixed_append_and_get() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
let pos0 = journal
.append(1, &test_digest(0))
.await
.expect("failed to append");
assert_eq!(pos0, 0);
let pos1 = journal
.append(1, &test_digest(1))
.await
.expect("failed to append");
assert_eq!(pos1, 1);
let pos2 = journal
.append(2, &test_digest(2))
.await
.expect("failed to append");
assert_eq!(pos2, 0);
let item0 = journal.get(1, 0).await.expect("failed to get");
assert_eq!(item0, test_digest(0));
let item1 = journal.get(1, 1).await.expect("failed to get");
assert_eq!(item1, test_digest(1));
let item2 = journal.get(2, 0).await.expect("failed to get");
assert_eq!(item2, test_digest(2));
let err = journal.get(1, 2).await;
assert!(matches!(err, Err(Error::ItemOutOfRange(2))));
let err = journal.get(3, 0).await;
assert!(matches!(err, Err(Error::SectionOutOfRange(3))));
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_replay() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
for i in 0u64..10 {
journal
.append(1, &test_digest(i))
.await
.expect("failed to append");
}
for i in 10u64..20 {
journal
.append(2, &test_digest(i))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
drop(journal);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
let items = {
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
match result {
Ok((section, pos, item)) => items.push((section, pos, item)),
Err(err) => panic!("replay error: {err}"),
}
}
items
};
assert_eq!(items.len(), 20);
for (i, item) in items.iter().enumerate().take(10) {
assert_eq!(item.0, 1);
assert_eq!(item.1, i as u64);
assert_eq!(item.2, test_digest(i as u64));
}
for (i, item) in items.iter().enumerate().skip(10).take(10) {
assert_eq!(item.0, 2);
assert_eq!(item.1, (i - 10) as u64);
assert_eq!(item.2, test_digest(i as u64));
}
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_replay_with_start_offset() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
for i in 0u64..10 {
journal
.append(1, &test_digest(i))
.await
.expect("failed to append");
}
for i in 10u64..15 {
journal
.append(2, &test_digest(i))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
drop(journal);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
{
let stream = journal
.replay(1, 5, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, pos, item) = result.expect("replay error");
items.push((section, pos, item));
}
assert_eq!(
items.len(),
10,
"Should have 5 items from section 1 + 5 from section 2"
);
for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
assert_eq!(*section, 1);
assert_eq!(*pos, (i + 5) as u64);
assert_eq!(*item, test_digest((i + 5) as u64));
}
for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
assert_eq!(*section, 2);
assert_eq!(*pos, (i - 5) as u64);
assert_eq!(*item, test_digest((i + 5) as u64));
}
}
{
let stream = journal
.replay(1, 9, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, pos, item) = result.expect("replay error");
items.push((section, pos, item));
}
assert_eq!(
items.len(),
6,
"Should have 1 item from section 1 + 5 from section 2"
);
assert_eq!(items[0], (1, 9, test_digest(9)));
for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
assert_eq!(*section, 2);
assert_eq!(*pos, (i - 1) as u64);
assert_eq!(*item, test_digest((i + 9) as u64));
}
}
{
let stream = journal
.replay(2, 3, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, pos, item) = result.expect("replay error");
items.push((section, pos, item));
}
assert_eq!(items.len(), 2, "Should have 2 items from section 2");
assert_eq!(items[0], (2, 3, test_digest(13)));
assert_eq!(items[1], (2, 4, test_digest(14)));
}
let result = journal.replay(1, 100, NZUsize!(1024)).await;
assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
drop(result);
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_prune() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
for section in 1u64..=5 {
journal
.append(section, &test_digest(section))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
journal.prune(3).await.expect("failed to prune");
let err = journal.get(1, 0).await;
assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
let err = journal.get(2, 0).await;
assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
let item = journal.get(3, 0).await.expect("should exist");
assert_eq!(item, test_digest(3));
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_rewind() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
for section in 1u64..=3 {
journal
.append(section, &test_digest(section))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
for section in 1u64..=3 {
let size = journal.size(section).await.expect("failed to get size");
assert!(size > 0, "section {section} should have data");
}
let size = journal.size(1).await.expect("failed to get size");
journal.rewind(1, size).await.expect("failed to rewind");
let size = journal.size(1).await.expect("failed to get size");
assert!(size > 0, "section 1 should still have data");
for section in 2u64..=3 {
let size = journal.size(section).await.expect("failed to get size");
assert_eq!(size, 0, "section {section} should be removed");
}
let item = journal.get(1, 0).await.expect("failed to get");
assert_eq!(item, test_digest(1));
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_rewind_many_sections() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
for section in 1u64..=10 {
journal
.append(section, &test_digest(section))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
let size = journal.size(5).await.expect("failed to get size");
journal.rewind(5, size).await.expect("failed to rewind");
for section in 1u64..=5 {
let size = journal.size(section).await.expect("failed to get size");
assert!(size > 0, "section {section} should still have data");
}
for section in 6u64..=10 {
let size = journal.size(section).await.expect("failed to get size");
assert_eq!(size, 0, "section {section} should be removed");
}
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, item) = result.expect("failed to read");
items.push((section, item));
}
assert_eq!(items.len(), 5);
for (i, (section, item)) in items.iter().enumerate() {
assert_eq!(*section, (i + 1) as u64);
assert_eq!(*item, test_digest((i + 1) as u64));
}
}
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_rewind_persistence() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
for section in 1u64..=5 {
journal
.append(section, &test_digest(section))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
let size = journal.size(2).await.expect("failed to get size");
journal.rewind(2, size).await.expect("failed to rewind");
journal.sync_all().await.expect("failed to sync");
drop(journal);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
for section in 1u64..=2 {
let size = journal.size(section).await.expect("failed to get size");
assert!(size > 0, "section {section} should have data after restart");
}
for section in 3u64..=5 {
let size = journal.size(section).await.expect("failed to get size");
assert_eq!(size, 0, "section {section} should be gone after restart");
}
let item1 = journal.get(1, 0).await.expect("failed to get");
assert_eq!(item1, test_digest(1));
let item2 = journal.get(2, 0).await.expect("failed to get");
assert_eq!(item2, test_digest(2));
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_corruption_recovery() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
for i in 0u64..5 {
journal
.append(1, &test_digest(i))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
drop(journal);
let (blob, size) = context
.open(&cfg.partition, &1u64.to_be_bytes())
.await
.expect("failed to open blob");
blob.resize(size - 1).await.expect("failed to truncate");
blob.sync().await.expect("failed to sync");
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
let count = {
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut count = 0;
while let Some(result) = stream.next().await {
result.expect("should be ok");
count += 1;
}
count
};
assert_eq!(count, 4);
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_persistence() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
for i in 0u64..5 {
journal
.append(1, &test_digest(i))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
drop(journal);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
.await
.expect("failed to re-init");
for i in 0u64..5 {
let item = journal.get(1, i).await.expect("failed to get");
assert_eq!(item, test_digest(i));
}
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_section_len() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
assert_eq!(journal.section_len(1).await.unwrap(), 0);
for i in 0u64..5 {
journal
.append(1, &test_digest(i))
.await
.expect("failed to append");
}
assert_eq!(journal.section_len(1).await.unwrap(), 5);
assert_eq!(journal.section_len(2).await.unwrap(), 0);
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_non_contiguous_sections() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
journal
.append(1, &test_digest(100))
.await
.expect("failed to append");
journal
.append(5, &test_digest(500))
.await
.expect("failed to append");
journal
.append(10, &test_digest(1000))
.await
.expect("failed to append");
journal.sync_all().await.expect("failed to sync");
assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));
for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
let result = journal.get(missing_section, 0).await;
assert!(
matches!(result, Err(Error::SectionOutOfRange(_))),
"Expected SectionOutOfRange for section {}, got {:?}",
missing_section,
result
);
}
drop(journal);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, item) = result.expect("replay error");
items.push((section, item));
}
assert_eq!(items.len(), 3, "Should have 3 items");
assert_eq!(items[0], (1, test_digest(100)));
assert_eq!(items[1], (5, test_digest(500)));
assert_eq!(items[2], (10, test_digest(1000)));
}
{
let stream = journal
.replay(5, 0, NZUsize!(1024))
.await
.expect("failed to replay from section 5");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, item) = result.expect("replay error");
items.push((section, item));
}
assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
assert_eq!(items[0], (5, test_digest(500)));
assert_eq!(items[1], (10, test_digest(1000)));
}
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_empty_section_in_middle() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
journal
.append(1, &test_digest(100))
.await
.expect("failed to append");
journal
.append(2, &test_digest(200))
.await
.expect("failed to append");
journal.sync(2).await.expect("failed to sync");
journal
.rewind_section(2, 0)
.await
.expect("failed to rewind");
journal
.append(3, &test_digest(300))
.await
.expect("failed to append");
journal.sync_all().await.expect("failed to sync");
assert_eq!(journal.section_len(1).await.unwrap(), 1);
assert_eq!(journal.section_len(2).await.unwrap(), 0);
assert_eq!(journal.section_len(3).await.unwrap(), 1);
drop(journal);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
{
let stream = journal
.replay(0, 0, NZUsize!(1024))
.await
.expect("failed to replay");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, item) = result.expect("replay error");
items.push((section, item));
}
assert_eq!(
items.len(),
2,
"Should have 2 items (skipping empty section)"
);
assert_eq!(items[0], (1, test_digest(100)));
assert_eq!(items[1], (3, test_digest(300)));
}
{
let stream = journal
.replay(2, 0, NZUsize!(1024))
.await
.expect("failed to replay from section 2");
pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
let (section, _, item) = result.expect("replay error");
items.push((section, item));
}
assert_eq!(items.len(), 1, "Should have 1 item from section 3");
assert_eq!(items[0], (3, test_digest(300)));
}
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.with_label("first"), cfg.clone())
.await
.expect("failed to init");
for i in 0u64..3 {
journal
.append(1, &test_digest(i))
.await
.expect("failed to append");
}
journal.sync_all().await.expect("failed to sync");
for i in 0u64..3 {
let item = journal.get(1, i).await.expect("failed to get");
assert_eq!(item, test_digest(i));
}
drop(journal);
let (blob, size) = context
.open(&cfg.partition, &1u64.to_be_bytes())
.await
.expect("failed to open blob");
blob.resize(size - 1).await.expect("failed to truncate");
blob.sync().await.expect("failed to sync");
drop(blob);
let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
.await
.expect("failed to re-init");
assert_eq!(journal.section_len(1).await.unwrap(), 2);
assert_eq!(journal.size(1).await.unwrap(), 64);
let item0 = journal.get(1, 0).await.expect("failed to get item 0");
assert_eq!(item0, test_digest(0));
let item1 = journal.get(1, 1).await.expect("failed to get item 1");
assert_eq!(item1, test_digest(1));
let err = journal.get(1, 2).await;
assert!(
matches!(err, Err(Error::ItemOutOfRange(2))),
"expected ItemOutOfRange(2), got {:?}",
err
);
journal.destroy().await.expect("failed to destroy");
});
}
#[test_traced]
fn test_journal_clear() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "clear-test".into(),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
write_buffer: NZUsize!(1024),
};
let mut journal: Journal<_, Digest> =
Journal::init(context.with_label("journal"), cfg.clone())
.await
.expect("Failed to initialize journal");
for section in 0..5u64 {
for i in 0..10u64 {
journal
.append(section, &test_digest(section * 1000 + i))
.await
.expect("Failed to append");
}
journal.sync(section).await.expect("Failed to sync");
}
assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));
journal.clear().await.expect("Failed to clear");
for section in 0..5u64 {
assert!(matches!(
journal.get(section, 0).await,
Err(Error::SectionOutOfRange(s)) if s == section
));
}
for i in 0..5u64 {
journal
.append(10, &test_digest(i * 100))
.await
.expect("Failed to append after clear");
}
journal.sync(10).await.expect("Failed to sync after clear");
assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));
assert!(matches!(
journal.get(0, 0).await,
Err(Error::SectionOutOfRange(0))
));
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_last_missing_section_returns_error() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
assert!(matches!(
journal.last(0).await,
Err(Error::SectionOutOfRange(0))
));
assert!(matches!(
journal.last(99).await,
Err(Error::SectionOutOfRange(99))
));
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_last_after_rewind_to_zero() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
journal.append(0, &test_digest(0)).await.unwrap();
journal.append(0, &test_digest(1)).await.unwrap();
journal.sync(0).await.unwrap();
assert!(journal.last(0).await.unwrap().is_some());
journal.rewind(0, 0).await.unwrap();
assert_eq!(journal.last(0).await.unwrap(), None);
journal.destroy().await.unwrap();
});
}
#[test_traced]
fn test_last_pruned_section_returns_error() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = test_cfg(&context);
let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
.await
.expect("failed to init");
journal.append(0, &test_digest(0)).await.unwrap();
journal.append(1, &test_digest(1)).await.unwrap();
journal.sync_all().await.unwrap();
journal.prune(1).await.unwrap();
assert!(matches!(
journal.last(0).await,
Err(Error::AlreadyPrunedToSection(1))
));
assert!(journal.last(1).await.unwrap().is_some());
journal.destroy().await.unwrap();
});
}
}