use super::{Contiguous, Many, Reader as _};
use crate::{
journal::{contiguous::Mutable, Error},
Persistable,
};
use commonware_utils::NZUsize;
use futures::{future::BoxFuture, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
pub(super) trait PersistableContiguous:
Mutable<Item = u64> + Persistable<Error = Error>
{
}
impl<T> PersistableContiguous for T where T: Mutable<Item = u64> + Persistable<Error = Error> {}
async fn get_bounds<J: Contiguous>(journal: &J) -> std::ops::Range<u64> {
let reader = journal.reader().await;
reader.bounds()
}
async fn read_item<J: Contiguous>(journal: &J, position: u64) -> Result<J::Item, Error> {
let reader = journal.reader().await;
reader.read(position).await
}
pub(super) async fn run_contiguous_tests<F, J>(factory: F)
where
F: Fn(String, usize) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let counter = AtomicUsize::new(0);
let indexed_factory = |name: String| {
let idx = counter.fetch_add(1, Ordering::SeqCst);
factory(name, idx)
};
test_empty_journal_bounds(&indexed_factory).await;
test_bounds_with_items(&indexed_factory).await;
test_bounds_after_prune(&indexed_factory).await;
test_append_and_size(&indexed_factory).await;
test_sequential_appends(&indexed_factory).await;
test_replay_from_start(&indexed_factory).await;
test_replay_from_middle(&indexed_factory).await;
test_prune_retains_size(&indexed_factory).await;
test_through_trait(&indexed_factory).await;
test_replay_after_prune(&indexed_factory).await;
test_prune_then_append(&indexed_factory).await;
test_position_stability(&indexed_factory).await;
test_sync_behavior(&indexed_factory).await;
test_replay_on_empty(&indexed_factory).await;
test_replay_at_exact_size(&indexed_factory).await;
test_multiple_prunes(&indexed_factory).await;
test_prune_beyond_size(&indexed_factory).await;
test_persistence_basic(&indexed_factory).await;
test_persistence_after_prune(&indexed_factory).await;
test_read_by_position(&indexed_factory).await;
test_read_out_of_range(&indexed_factory).await;
test_read_after_prune(&indexed_factory).await;
test_rewind_to_middle(&indexed_factory).await;
test_rewind_to_zero(&indexed_factory).await;
test_rewind_current_size(&indexed_factory).await;
test_rewind_invalid_forward(&indexed_factory).await;
test_rewind_invalid_pruned(&indexed_factory).await;
test_rewind_then_append(&indexed_factory).await;
test_rewind_zero_then_append(&indexed_factory).await;
test_rewind_after_prune(&indexed_factory).await;
test_section_boundary_behavior(&indexed_factory).await;
test_destroy_and_reinit(&indexed_factory).await;
test_append_many_empty(&indexed_factory).await;
test_append_many_basic(&indexed_factory).await;
test_append_many_across_sections(&indexed_factory).await;
test_append_many_then_append(&indexed_factory).await;
test_append_many_single_item(&indexed_factory).await;
}
async fn test_empty_journal_bounds<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let journal = factory("empty".into()).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 0);
assert_eq!(bounds.end, 0);
assert!(bounds.is_empty());
journal.destroy().await.unwrap();
}
async fn test_bounds_with_items<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("bounds-with-items".into()).await.unwrap();
for i in 0..10 {
journal.append(&(i * 100)).await.unwrap();
}
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 0);
assert_eq!(bounds.end, 10);
assert!(!bounds.is_empty());
journal.destroy().await.unwrap();
}
async fn test_bounds_after_prune<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("bounds-after-prune".into()).await.unwrap();
for i in 0..30 {
journal.append(&(i * 100)).await.unwrap();
}
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 0);
assert_eq!(bounds.end, 30);
journal.prune(10).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 10);
assert_eq!(bounds.end, 30);
journal.prune(25).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 20);
assert_eq!(bounds.end, 30);
journal.prune(30).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 30);
assert_eq!(bounds.end, 30);
assert!(bounds.is_empty());
journal.sync().await.unwrap();
drop(journal);
let journal = factory("bounds-after-prune".into()).await.unwrap();
let bounds = get_bounds(&journal).await;
assert!(bounds.is_empty());
journal.destroy().await.unwrap();
}
async fn test_append_and_size<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("append-and-size".into()).await.unwrap();
let pos1 = journal.append(&100).await.unwrap();
let pos2 = journal.append(&200).await.unwrap();
let pos3 = journal.append(&300).await.unwrap();
assert_eq!(pos1, 0);
assert_eq!(pos2, 1);
assert_eq!(pos3, 2);
assert_eq!(get_bounds(&journal).await.end, 3);
assert_eq!(read_item(&journal, 0).await.unwrap(), 100);
assert_eq!(read_item(&journal, 1).await.unwrap(), 200);
assert_eq!(read_item(&journal, 2).await.unwrap(), 300);
journal.destroy().await.unwrap();
}
async fn test_sequential_appends<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("sequential-appends".into()).await.unwrap();
for i in 0..25u64 {
let pos = journal.append(&(i * 10)).await.unwrap();
assert_eq!(pos, i);
}
assert_eq!(get_bounds(&journal).await.end, 25);
for i in 0..25u64 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 10);
}
journal.destroy().await.unwrap();
}
async fn test_replay_from_start<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("replay-from-start".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&(i * 10)).await.unwrap();
}
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 0).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 10);
for (i, (pos, value)) in items.iter().enumerate() {
assert_eq!(*pos, i as u64);
assert_eq!(*value, (i as u64) * 10);
}
}
journal.destroy().await.unwrap();
}
async fn test_replay_from_middle<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("replay-from-middle".into()).await.unwrap();
for i in 0..15u64 {
journal.append(&(i * 10)).await.unwrap();
}
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 7).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 8);
for (i, (pos, value)) in items.iter().enumerate() {
assert_eq!(*pos, (i + 7) as u64);
assert_eq!(*value, ((i + 7) as u64) * 10);
}
}
journal.destroy().await.unwrap();
}
async fn test_prune_retains_size<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("prune-retains-size".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&i).await.unwrap();
}
let size_before = get_bounds(&journal).await.end;
journal.prune(10).await.unwrap();
let size_after = get_bounds(&journal).await.end;
assert_eq!(size_before, size_after);
assert_eq!(size_after, 20);
journal.prune(20).await.unwrap();
let size_after_all = get_bounds(&journal).await.end;
assert_eq!(size_after, size_after_all);
journal.sync().await.unwrap();
drop(journal);
let journal = factory("prune-retains-size".into()).await.unwrap();
let size_after_close = get_bounds(&journal).await.end;
assert_eq!(size_after_close, size_after_all);
journal.destroy().await.unwrap();
}
async fn test_through_trait<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("through-trait".into()).await.unwrap();
let pos1 = Mutable::append(&mut journal, &42).await.unwrap();
let pos2 = Mutable::append(&mut journal, &100).await.unwrap();
assert_eq!(pos1, 0);
assert_eq!(pos2, 1);
let size = Contiguous::size(&journal).await;
assert_eq!(size, 2);
journal.destroy().await.unwrap();
}
async fn test_replay_after_prune<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("replay-after-prune".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&(i * 10)).await.unwrap();
}
journal.prune(10).await.unwrap();
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 10).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 10);
for (i, (pos, value)) in items.iter().enumerate() {
assert_eq!(*pos, (i + 10) as u64);
assert_eq!(*value, ((i + 10) as u64) * 10);
}
}
journal.destroy().await.unwrap();
}
async fn test_prune_then_append<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("prune-then-append".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&i).await.unwrap();
}
journal.prune(10).await.unwrap();
assert!(get_bounds(&journal).await.is_empty());
let pos = journal.append(&999).await.unwrap();
assert_eq!(pos, 10);
assert_eq!(get_bounds(&journal).await.end, 11);
journal.destroy().await.unwrap();
}
async fn test_position_stability<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("position-stability".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&(i * 100)).await.unwrap();
}
journal.prune(10).await.unwrap();
for i in 20..25u64 {
let pos = journal.append(&(i * 100)).await.unwrap();
assert_eq!(pos, i);
}
assert_eq!(read_item(&journal, 10).await.unwrap(), 1000);
assert_eq!(read_item(&journal, 15).await.unwrap(), 1500);
assert_eq!(read_item(&journal, 20).await.unwrap(), 2000);
assert_eq!(read_item(&journal, 24).await.unwrap(), 2400);
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 10).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 15);
for (i, (pos, value)) in items.iter().enumerate() {
let expected_pos = (i + 10) as u64;
assert_eq!(*pos, expected_pos);
assert_eq!(*value, expected_pos * 100);
}
}
journal.destroy().await.unwrap();
}
async fn test_sync_behavior<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("sync-behavior".into()).await.unwrap();
for i in 0..5u64 {
journal.append(&i).await.unwrap();
}
journal.sync().await.unwrap();
assert_eq!(read_item(&journal, 0).await.unwrap(), 0);
let pos = journal.append(&100).await.unwrap();
assert_eq!(pos, 5);
assert_eq!(read_item(&journal, 5).await.unwrap(), 100);
assert_eq!(get_bounds(&journal).await.end, 6);
journal.destroy().await.unwrap();
}
async fn test_replay_on_empty<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let journal = factory("replay-on-empty".into()).await.unwrap();
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 0).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 0);
}
journal.destroy().await.unwrap();
}
async fn test_replay_at_exact_size<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("replay-at-exact-size".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&i).await.unwrap();
}
let bounds = get_bounds(&journal).await;
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), bounds.end).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 0);
}
journal.destroy().await.unwrap();
}
async fn test_multiple_prunes<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("multiple-prunes".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&i).await.unwrap();
}
let pruned1 = journal.prune(10).await.unwrap();
let pruned2 = journal.prune(10).await.unwrap();
assert!(pruned1);
assert!(!pruned2);
assert_eq!(get_bounds(&journal).await.end, 20);
assert_eq!(read_item(&journal, 10).await.unwrap(), 10);
assert_eq!(read_item(&journal, 19).await.unwrap(), 19);
journal.destroy().await.unwrap();
}
async fn test_prune_beyond_size<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("prune-beyond-size".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&i).await.unwrap();
}
journal.prune(100).await.unwrap();
assert_eq!(get_bounds(&journal).await.end, 10);
let pos = journal.append(&999).await.unwrap();
assert_eq!(pos, 10);
assert_eq!(read_item(&journal, 10).await.unwrap(), 999);
journal.destroy().await.unwrap();
}
async fn test_persistence_basic<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let test_name = "persistence-basic".to_string();
{
let mut journal = factory(test_name.clone()).await.unwrap();
for i in 0..15u64 {
let pos = journal.append(&(i * 10)).await.unwrap();
assert_eq!(pos, i);
}
assert_eq!(get_bounds(&journal).await.end, 15);
journal.sync().await.unwrap();
}
{
let journal = factory(test_name.clone()).await.unwrap();
assert_eq!(get_bounds(&journal).await.end, 15);
for i in 0..15u64 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 10);
}
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 0).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 15);
for (i, (pos, value)) in items.iter().enumerate() {
assert_eq!(*pos, i as u64);
assert_eq!(*value, (i as u64) * 10);
}
}
journal.destroy().await.unwrap();
}
}
async fn test_persistence_after_prune<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let test_name = "persistence-after-prune".to_string();
{
let mut journal = factory(test_name.clone()).await.unwrap();
for i in 0..25u64 {
journal.append(&(i * 100)).await.unwrap();
}
let pruned = journal.prune(10).await.unwrap();
assert!(pruned);
assert_eq!(get_bounds(&journal).await.end, 25);
journal.sync().await.unwrap();
}
{
let mut journal = factory(test_name.clone()).await.unwrap();
assert_eq!(get_bounds(&journal).await.end, 25);
for i in 0..10u64 {
assert!(matches!(
read_item(&journal, i).await,
Err(Error::ItemPruned(_))
));
}
for i in 10..25u64 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 100);
}
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 10).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert_eq!(items.len(), 15);
for (i, (pos, value)) in items.iter().enumerate() {
let expected_pos = (i + 10) as u64;
assert_eq!(*pos, expected_pos);
assert_eq!(*value, expected_pos * 100);
}
}
let pos = journal.append(&999).await.unwrap();
assert_eq!(pos, 25);
assert_eq!(read_item(&journal, 25).await.unwrap(), 999);
journal.destroy().await.unwrap();
}
}
pub(super) async fn test_read_by_position<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("read-by-position".into()).await.unwrap();
for i in 0..1000u64 {
journal.append(&(i * 100)).await.unwrap();
assert_eq!(read_item(&journal, i).await.unwrap(), i * 100);
}
for i in 0..1000u64 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 100);
}
journal.destroy().await.unwrap();
}
pub(super) async fn test_read_out_of_range<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("read-out-of-range".into()).await.unwrap();
journal.append(&42).await.unwrap();
let result = read_item(&journal, 10).await;
assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
journal.destroy().await.unwrap();
}
pub(super) async fn test_read_after_prune<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("read-after-prune".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&i).await.unwrap();
}
journal.prune(10).await.unwrap();
let bounds = get_bounds(&journal).await;
let result = read_item(&journal, bounds.start - 1).await;
assert!(matches!(result, Err(Error::ItemPruned(_))));
journal.destroy().await.unwrap();
}
async fn test_rewind_to_middle<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-to-middle".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&(i * 100)).await.unwrap();
}
journal.rewind(12).await.unwrap();
assert_eq!(get_bounds(&journal).await.end, 12);
for i in 0..12u64 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 100);
}
for i in 12..20u64 {
assert!(matches!(
read_item(&journal, i).await,
Err(Error::ItemOutOfRange(_))
));
}
let pos = journal.append(&999).await.unwrap();
assert_eq!(pos, 12);
assert_eq!(read_item(&journal, 12).await.unwrap(), 999);
journal.destroy().await.unwrap();
}
async fn test_rewind_to_zero<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-to-zero".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&i).await.unwrap();
}
journal.rewind(0).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.end, 0);
assert!(bounds.is_empty());
let pos = journal.append(&42).await.unwrap();
assert_eq!(pos, 0);
journal.destroy().await.unwrap();
}
async fn test_rewind_current_size<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-current-size".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&i).await.unwrap();
}
journal.rewind(10).await.unwrap();
assert_eq!(get_bounds(&journal).await.end, 10);
journal.destroy().await.unwrap();
}
async fn test_rewind_invalid_forward<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-invalid-forward".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&i).await.unwrap();
}
let result = journal.rewind(20).await;
assert!(matches!(result, Err(Error::InvalidRewind(20))));
journal.destroy().await.unwrap();
}
async fn test_rewind_invalid_pruned<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-invalid-pruned".into()).await.unwrap();
for i in 0..20u64 {
journal.append(&i).await.unwrap();
}
journal.prune(10).await.unwrap();
let result = journal.rewind(5).await;
assert!(matches!(result, Err(Error::ItemPruned(5))));
journal.destroy().await.unwrap();
}
async fn test_rewind_then_append<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-then-append".into()).await.unwrap();
for i in 0..15u64 {
journal.append(&i).await.unwrap();
}
journal.rewind(8).await.unwrap();
let pos1 = journal.append(&888).await.unwrap();
let pos2 = journal.append(&999).await.unwrap();
assert_eq!(pos1, 8);
assert_eq!(pos2, 9);
assert_eq!(read_item(&journal, 8).await.unwrap(), 888);
assert_eq!(read_item(&journal, 9).await.unwrap(), 999);
journal.destroy().await.unwrap();
}
async fn test_rewind_zero_then_append<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-zero-then-append".into()).await.unwrap();
for i in 0..10u64 {
journal.append(&(i * 100)).await.unwrap();
}
journal.rewind(0).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.end, 0);
assert!(bounds.is_empty());
let pos = journal.append(&42).await.unwrap();
assert_eq!(pos, 0);
assert_eq!(get_bounds(&journal).await.end, 1);
assert_eq!(read_item(&journal, 0).await.unwrap(), 42);
journal.destroy().await.unwrap();
}
async fn test_rewind_after_prune<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("rewind-after-prune".into()).await.unwrap();
for i in 0..30u64 {
journal.append(&(i * 100)).await.unwrap();
}
journal.prune(10).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.start, 10);
journal.rewind(20).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.end, 20);
assert_eq!(bounds.start, 10);
for i in bounds.start..20 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 100);
}
let result = journal.rewind(5).await;
assert!(matches!(result, Err(Error::ItemPruned(5))));
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.end, 20);
assert_eq!(bounds.start, 10);
let pos = journal.append(&999).await.unwrap();
assert_eq!(pos, 20);
assert_eq!(read_item(&journal, 20).await.unwrap(), 999);
assert_eq!(get_bounds(&journal).await.start, 10);
journal.destroy().await.unwrap();
}
async fn test_section_boundary_behavior<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("section-boundary".into()).await.unwrap();
for i in 0..10u64 {
let pos = journal.append(&(i * 100)).await.unwrap();
assert_eq!(pos, i);
}
assert_eq!(get_bounds(&journal).await.end, 10);
let pos = journal.append(&999).await.unwrap();
assert_eq!(pos, 10);
assert_eq!(get_bounds(&journal).await.end, 11);
journal.prune(10).await.unwrap();
assert_eq!(get_bounds(&journal).await.start, 10);
assert!(matches!(
read_item(&journal, 9).await,
Err(Error::ItemPruned(_))
));
assert_eq!(read_item(&journal, 10).await.unwrap(), 999);
let pos = journal.append(&888).await.unwrap();
assert_eq!(pos, 11);
assert_eq!(get_bounds(&journal).await.end, 12);
journal.rewind(10).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.end, 10);
assert!(bounds.is_empty());
let pos = journal.append(&777).await.unwrap();
assert_eq!(pos, 10);
assert_eq!(get_bounds(&journal).await.end, 11);
assert_eq!(read_item(&journal, 10).await.unwrap(), 777);
assert_eq!(get_bounds(&journal).await.start, 10);
journal.destroy().await.unwrap();
}
async fn test_destroy_and_reinit<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let test_name = "destroy-and-reinit".to_string();
{
let mut journal = factory(test_name.clone()).await.unwrap();
for i in 0..20u64 {
journal.append(&(i * 100)).await.unwrap();
}
journal.prune(10).await.unwrap();
assert_eq!(get_bounds(&journal).await.end, 20);
assert!(!get_bounds(&journal).await.is_empty());
journal.destroy().await.unwrap();
}
{
let journal = factory(test_name.clone()).await.unwrap();
let bounds = get_bounds(&journal).await;
assert_eq!(bounds.end, 0);
assert!(bounds.is_empty());
{
let reader = journal.reader().await;
let stream = reader.replay(NZUsize!(1024), 0).await.unwrap();
futures::pin_mut!(stream);
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result.unwrap());
}
assert!(items.is_empty());
}
journal.destroy().await.unwrap();
}
}
async fn test_append_many_empty<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("append-many-empty".into()).await.unwrap();
journal.append(&10).await.unwrap();
journal.append(&20).await.unwrap();
assert!(matches!(
journal.append_many(Many::Flat(&[])).await,
Err(Error::EmptyAppend)
));
assert_eq!(get_bounds(&journal).await.end, 2);
journal.destroy().await.unwrap();
}
async fn test_append_many_basic<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("append-many-basic".into()).await.unwrap();
let pos = journal
.append_many(Many::Flat(&[100, 200, 300]))
.await
.unwrap();
assert_eq!(pos, 2);
assert_eq!(get_bounds(&journal).await.end, 3);
assert_eq!(read_item(&journal, 0).await.unwrap(), 100);
assert_eq!(read_item(&journal, 1).await.unwrap(), 200);
assert_eq!(read_item(&journal, 2).await.unwrap(), 300);
journal.destroy().await.unwrap();
}
async fn test_append_many_across_sections<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("append-many-sections".into()).await.unwrap();
let items: Vec<u64> = (0..25).map(|i| i * 10).collect();
let pos = journal.append_many(Many::Flat(&items)).await.unwrap();
assert_eq!(pos, 24);
assert_eq!(get_bounds(&journal).await.end, 25);
for i in 0..25u64 {
assert_eq!(read_item(&journal, i).await.unwrap(), i * 10);
}
journal.destroy().await.unwrap();
}
async fn test_append_many_then_append<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("append-many-then-single".into()).await.unwrap();
journal
.append_many(Many::Flat(&[10, 20, 30]))
.await
.unwrap();
let pos = journal.append(&40).await.unwrap();
assert_eq!(pos, 3);
assert_eq!(read_item(&journal, 0).await.unwrap(), 10);
assert_eq!(read_item(&journal, 1).await.unwrap(), 20);
assert_eq!(read_item(&journal, 2).await.unwrap(), 30);
assert_eq!(read_item(&journal, 3).await.unwrap(), 40);
journal.destroy().await.unwrap();
}
async fn test_append_many_single_item<F, J>(factory: &F)
where
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
J: PersistableContiguous,
{
let mut journal = factory("append-many-single".into()).await.unwrap();
let pos = journal.append_many(Many::Flat(&[42])).await.unwrap();
assert_eq!(pos, 0);
assert_eq!(read_item(&journal, 0).await.unwrap(), 42);
journal.destroy().await.unwrap();
}