mod append;
pub mod pool;
mod read;
mod tip;
mod write;
pub use append::Append;
pub use pool::{Pool, PoolRef};
pub use read::Read;
pub use write::Write;
#[cfg(test)]
mod tests {
use super::*;
use crate::{deterministic, Blob as _, Error, Runner, Storage};
use commonware_macros::test_traced;
use commonware_utils::NZUsize;
#[test_traced]
fn test_read_basic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"Hello, world! This is a test.";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(data.to_vec(), 0).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::new(blob, size, NZUsize!(10));
let mut buf = [0u8; 5];
reader.read_exact(&mut buf, 5).await.unwrap();
assert_eq!(&buf, b"Hello");
let mut buf = [0u8; 14];
reader.read_exact(&mut buf, 14).await.unwrap();
assert_eq!(&buf, b", world! This ");
assert_eq!(reader.position(), 19);
let mut buf = [0u8; 10];
reader.read_exact(&mut buf, 7).await.unwrap();
assert_eq!(&buf[..7], b"is a te");
let mut buf = [0u8; 5];
let result = reader.read_exact(&mut buf, 5).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
/// Issues `read_exact` calls that are each larger than the size (10) given
/// to the reader and that together consume the whole 26-byte blob.
#[test_traced]
fn test_read_cross_boundary() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // Store the alphabet in a fresh blob.
        let alphabet = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
        let (blob, initial_len) = context.open("partition", b"test").await.unwrap();
        assert_eq!(initial_len, 0);
        blob.write_at(alphabet.to_vec(), 0).await.unwrap();

        let blob_len = alphabet.len() as u64;
        let mut reader = Read::new(blob, blob_len, NZUsize!(10));

        // 15 bytes in a single call.
        let mut first = [0u8; 15];
        reader.read_exact(&mut first, 15).await.unwrap();
        assert_eq!(&first, b"ABCDEFGHIJKLMNO");
        assert_eq!(reader.position(), 15);

        // The remaining 11 bytes.
        let mut rest = [0u8; 11];
        reader.read_exact(&mut rest, 11).await.unwrap();
        assert_eq!(&rest, b"PQRSTUVWXYZ");

        // Everything has been consumed.
        assert_eq!(reader.position(), 26);
        assert_eq!(reader.blob_remaining(), 0);
    });
}
/// Reads the blob to its end, seeks back to offset 0 and checks that the
/// same leading bytes are returned a second time.
#[test_traced]
fn test_read_to_end_then_rewind_and_read_again() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let alphabet = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
        let (blob, initial_len) = context.open("partition", b"test").await.unwrap();
        assert_eq!(initial_len, 0);
        blob.write_at(alphabet.to_vec(), 0).await.unwrap();

        // Reader constructed with a size of 20; the first read asks for 21.
        let blob_len = alphabet.len() as u64;
        let mut reader = Read::new(blob, blob_len, NZUsize!(20));

        let mut leading = [0u8; 21];
        reader.read_exact(&mut leading, 21).await.unwrap();
        assert_eq!(&leading, b"ABCDEFGHIJKLMNOPQRSTU");
        assert_eq!(reader.position(), 21);

        // Consume the last five bytes, reaching the end of the blob.
        let mut trailing = [0u8; 5];
        reader.read_exact(&mut trailing, 5).await.unwrap();
        assert_eq!(&trailing, b"VWXYZ");

        // Rewind and repeat the first read.
        reader.seek_to(0).unwrap();
        let mut replay = [0u8; 21];
        reader.read_exact(&mut replay, 21).await.unwrap();
        assert_eq!(&replay, b"ABCDEFGHIJKLMNOPQRSTU");
    });
}
#[test_traced]
fn test_read_with_known_size() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"This is a test with known size limitations.";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(data.to_vec(), 0).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::new(blob, size, NZUsize!(10));
assert_eq!(reader.blob_remaining(), size);
let mut buf = [0u8; 5];
reader.read_exact(&mut buf, 5).await.unwrap();
assert_eq!(&buf, b"This ");
assert_eq!(reader.blob_remaining(), size - 5);
let mut buf = vec![0u8; (size - 5) as usize];
reader
.read_exact(&mut buf, (size - 5) as usize)
.await
.unwrap();
assert_eq!(&buf, b"is a test with known size limitations.");
assert_eq!(reader.blob_remaining(), 0);
let mut buf = [0u8; 1];
let result = reader.read_exact(&mut buf, 1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
#[test_traced]
fn test_read_large_data() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data_size = 1024 * 256; let data = vec![0x42; data_size];
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(data.clone(), 0).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::new(blob, size, NZUsize!(64 * 1024));
let mut total_read = 0;
let chunk_size = 8 * 1024; let mut buf = vec![0u8; chunk_size];
while total_read < data_size {
let to_read = std::cmp::min(chunk_size, data_size - total_read);
reader
.read_exact(&mut buf[..to_read], to_read)
.await
.unwrap();
assert!(
buf[..to_read].iter().all(|&b| b == 0x42),
"Data at position {total_read} is not correct"
);
total_read += to_read;
}
assert_eq!(total_read, data_size);
let mut extra_buf = [0u8; 1];
let result = reader.read_exact(&mut extra_buf, 1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
/// Reads a blob that is 2.5x the reader's buffer size using two reads of
/// exactly `buffer_size` bytes followed by one read of half that size.
#[test_traced]
fn test_read_exact_size_reads() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // 2560 bytes of 0x37 with a 1024-byte buffer size.
        let buffer_size = 1024;
        let data_size = buffer_size * 5 / 2;
        let payload = vec![0x37; data_size];
        let (blob, initial_len) = context.open("partition", b"test").await.unwrap();
        assert_eq!(initial_len, 0);
        blob.write_at(payload.clone(), 0).await.unwrap();

        let size = payload.len() as u64;
        let mut reader = Read::new(blob, size, NZUsize!(buffer_size));

        // Two reads, each exactly `buffer_size` bytes into a fresh buffer.
        for _ in 0..2 {
            let mut full = vec![0u8; buffer_size];
            reader.read_exact(&mut full, buffer_size).await.unwrap();
            assert!(full.iter().all(|&b| b == 0x37));
        }

        // The remaining half-buffer of data.
        let half_buffer = buffer_size / 2;
        let mut half = vec![0u8; half_buffer];
        reader.read_exact(&mut half, half_buffer).await.unwrap();
        assert!(half.iter().all(|&b| b == 0x37));

        // The reader sits at the end of the blob.
        assert_eq!(reader.blob_remaining(), 0);
        assert_eq!(reader.position(), size);
    });
}
#[test_traced]
fn test_read_seek_to() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(data.to_vec(), 0).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::new(blob, size, NZUsize!(10));
let mut buf = [0u8; 5];
reader.read_exact(&mut buf, 5).await.unwrap();
assert_eq!(&buf, b"ABCDE");
assert_eq!(reader.position(), 5);
reader.seek_to(10).unwrap();
assert_eq!(reader.position(), 10);
let mut buf = [0u8; 5];
reader.read_exact(&mut buf, 5).await.unwrap();
assert_eq!(&buf, b"KLMNO");
reader.seek_to(0).unwrap();
assert_eq!(reader.position(), 0);
let mut buf = [0u8; 5];
reader.read_exact(&mut buf, 5).await.unwrap();
assert_eq!(&buf, b"ABCDE");
reader.seek_to(size).unwrap();
assert_eq!(reader.position(), size);
let mut buf = [0u8; 1];
let result = reader.read_exact(&mut buf, 1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
let result = reader.seek_to(size + 10);
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
/// Seeks far outside the region the reader has already consumed (forward and
/// then backward) and checks that subsequent reads return the expected bytes
/// and advance the position.
#[test_traced]
fn test_read_seek_with_refill() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // 1000 'A' bytes, far more than the size (10) given to the reader.
        let data = vec![0x41; 1000];
        let (blob, size) = context.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0);
        blob.write_at(data.clone(), 0).await.unwrap();
        let size = data.len() as u64;
        let mut reader = Read::new(blob, size, NZUsize!(10));

        // Initial read from the start of the blob.
        let mut buf = [0u8; 5];
        reader.read_exact(&mut buf, 5).await.unwrap();
        assert_eq!(&buf, b"AAAAA");
        assert_eq!(reader.position(), 5);

        // Seek far ahead and read again.
        reader.seek_to(500).unwrap();
        let mut buf = [0u8; 5];
        reader.read_exact(&mut buf, 5).await.unwrap();
        assert_eq!(&buf, b"AAAAA");
        assert_eq!(reader.position(), 505);

        // Seek backwards and read again. The destination starts zeroed, so
        // checking its content proves the read actually filled it.
        reader.seek_to(100).unwrap();
        let mut buf = [0u8; 5];
        reader.read_exact(&mut buf, 5).await.unwrap();
        assert_eq!(&buf, b"AAAAA");
        assert_eq!(reader.position(), 105);
    });
}
#[test_traced]
fn test_read_resize() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(data.to_vec(), 0).await.unwrap();
let data_len = data.len() as u64;
let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
let resize_len = data_len / 2;
reader.resize(resize_len).await.unwrap();
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, resize_len, "Blob should be resized to half size");
let mut new_reader = Read::new(blob, size, NZUsize!(10));
let mut buf = vec![0u8; size as usize];
new_reader
.read_exact(&mut buf, size as usize)
.await
.unwrap();
assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
let mut extra_buf = [0u8; 1];
let result = new_reader.read_exact(&mut extra_buf, 1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
new_reader.resize(data_len * 2).await.unwrap();
let (blob, new_size) = context.open("partition", b"test").await.unwrap();
assert_eq!(new_size, data_len * 2);
let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
let mut buf = vec![0u8; new_size as usize];
new_reader
.read_exact(&mut buf, new_size as usize)
.await
.unwrap();
assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
assert_eq!(
&buf[size as usize..],
vec![0u8; new_size as usize - size as usize]
);
});
}
#[test_traced]
fn test_read_resize_to_zero() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let data_len = data.len() as u64;
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(data.to_vec(), 0).await.unwrap();
let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
reader.resize(0).await.unwrap();
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0, "Blob should be resized to zero");
let mut new_reader = Read::new(blob, size, NZUsize!(10));
let mut buf = [0u8; 1];
let result = new_reader.read_exact(&mut buf, 1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
/// Writes five bytes through `Write`, syncs, and verifies the bytes by
/// reopening the blob and reading it back.
#[test_traced]
fn test_write_basic() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"write_basic").await.unwrap();
        assert_eq!(initial_len, 0);

        // Writer constructed with a size of 8; "hello" is 5 bytes.
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(8));
        writer.write_at(b"hello".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 5);

        // Syncing does not change the reported size.
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 5);

        // Verify through a freshly opened handle.
        let (reopened, stored_len) = context.open("partition", b"write_basic").await.unwrap();
        assert_eq!(stored_len, 5);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(8));
        let mut contents = [0u8; 5];
        reader.read_exact(&mut contents, 5).await.unwrap();
        assert_eq!(&contents, b"hello");
    });
}
/// Performs two back-to-back writes whose combined length (7) exceeds the
/// size (4) given to the writer, then verifies the stored bytes.
#[test_traced]
fn test_write_multiple_flushes() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"write_multi").await.unwrap();
        assert_eq!(initial_len, 0);
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(4));

        // Three bytes first.
        writer.write_at(b"abc".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 3);

        // Four more directly after them.
        writer.write_at(b"defg".to_vec(), 3).await.unwrap();
        assert_eq!(writer.size().await, 7);

        writer.sync().await.unwrap();

        // Read everything back through a new handle.
        let (reopened, stored_len) = context.open("partition", b"write_multi").await.unwrap();
        assert_eq!(stored_len, 7);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(4));
        let mut contents = [0u8; 7];
        reader.read_exact(&mut contents, 7).await.unwrap();
        assert_eq!(&contents, b"abcdefg");
    });
}
/// Writes a 23-byte payload through a writer constructed with a size of 4,
/// after a small initial write, and verifies the full 26 bytes.
#[test_traced]
fn test_write_large_data() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"write_large").await.unwrap();
        assert_eq!(initial_len, 0);
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(4));

        // Small write first.
        writer.write_at(b"abc".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 3);

        // Then a write much larger than the size given to the writer.
        let remainder = b"defghijklmnopqrstuvwxyz".to_vec();
        writer.write_at(remainder, 3).await.unwrap();
        assert_eq!(writer.size().await, 26);

        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 26);

        // Confirm the whole alphabet is stored.
        let (reopened, stored_len) = context.open("partition", b"write_large").await.unwrap();
        assert_eq!(stored_len, 26);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(4));
        let mut contents = [0u8; 26];
        reader.read_exact(&mut contents, 26).await.unwrap();
        assert_eq!(&contents, b"abcdefghijklmnopqrstuvwxyz");
    });
}
/// Appends a second write directly after the first, where the combined
/// length (11) is one byte more than the size (10) given to the writer.
#[test_traced]
fn test_write_append_to_buffer() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"append_buf").await.unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(10));

        // First five bytes.
        writer.write_at(b"hello".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 5);

        // Six more, starting where the first write ended.
        writer.write_at(b" world".to_vec(), 5).await.unwrap();
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 11);

        // Verify the concatenated result.
        let (reopened, stored_len) = context.open("partition", b"append_buf").await.unwrap();
        assert_eq!(stored_len, 11);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(10));
        let mut contents = vec![0u8; 11];
        reader.read_exact(&mut contents, 11).await.unwrap();
        assert_eq!(&contents, b"hello world");
    });
}
/// Overwrites ranges that lie inside, or straddle the end of, previously
/// written data and verifies the merged result after each sync.
#[test_traced]
fn test_write_into_middle_of_buffer() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"middle_buf").await.unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(20));

        // Lay down ten bytes, then overwrite offsets 2..7.
        writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 10);
        writer.write_at(b"01234".to_vec(), 2).await.unwrap();
        assert_eq!(writer.size().await, 10);
        writer.sync().await.unwrap();

        // The overwrite replaced "cdefg" only.
        let (first_view, first_len) = context.open("partition", b"middle_buf").await.unwrap();
        assert_eq!(first_len, 10);
        let mut reader = Read::new(first_view, first_len, NZUsize!(10));
        let mut contents = vec![0u8; 10];
        reader.read_exact(&mut contents, 10).await.unwrap();
        assert_eq!(&contents, b"ab01234hij");

        // Extend by ten bytes, then overwrite offsets 9..13, which spans the
        // end of the first write and the start of the second.
        writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
        assert_eq!(writer.size().await, 20);
        writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
        assert_eq!(writer.size().await, 20);
        writer.sync().await.unwrap();

        let (second_view, second_len) = context.open("partition", b"middle_buf").await.unwrap();
        assert_eq!(second_len, 20);
        let mut reader = Read::new(second_view, second_len, NZUsize!(20));
        let mut contents = vec![0u8; 20];
        reader.read_exact(&mut contents, 20).await.unwrap();
        assert_eq!(&contents, b"ab01234hiwxyznopqrst");
    });
}
/// Writes at offset 10 first and then fills in earlier offsets, checking
/// that the unwritten gap reads as zeros until it is filled.
#[test_traced]
fn test_write_before_buffer() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"before_buf").await.unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(10));

        // Start at offset 10, leaving 0..10 unwritten.
        writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
        assert_eq!(writer.size().await, 20);

        // Now write at the very beginning.
        writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 20);
        writer.sync().await.unwrap();

        // Offsets 5..10 were never written and are expected to be zero.
        let (first_view, first_len) = context.open("partition", b"before_buf").await.unwrap();
        assert_eq!(first_len, 20);
        let mut reader = Read::new(first_view, first_len, NZUsize!(20));
        let mut contents = vec![0u8; 20];
        reader.read_exact(&mut contents, 20).await.unwrap();
        let mut expected = vec![0u8; 20];
        expected[0..5].copy_from_slice(b"abcde");
        expected[10..20].copy_from_slice(b"0123456789");
        assert_eq!(contents, expected);

        // Fill the gap.
        writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
        assert_eq!(writer.size().await, 20);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 20);

        // All twenty bytes are now populated.
        let (second_view, second_len) = context.open("partition", b"before_buf").await.unwrap();
        assert_eq!(second_len, 20);
        let mut reader = Read::new(second_view, second_len, NZUsize!(20));
        let mut contents = vec![0u8; 20];
        reader.read_exact(&mut contents, 20).await.unwrap();
        expected[0..10].copy_from_slice(b"abcdefghij");
        assert_eq!(contents, expected);
    });
}
/// Exercises `Write::resize`: shrink, overwrite after shrinking, grow (the
/// new region reads as zeros) and, on a second blob, truncate to zero.
#[test_traced]
fn test_write_resize() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"resize_write").await.unwrap();
        let writer = Write::new(blob, initial_len, NZUsize!(10));

        // Write and persist eleven bytes.
        writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 11);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 11);

        // A fresh handle reports the same size.
        let (probe, probe_len) = context.open("partition", b"resize_write").await.unwrap();
        assert_eq!(probe_len, 11);
        drop(probe);

        // Shrink to five bytes.
        writer.resize(5).await.unwrap();
        assert_eq!(writer.size().await, 5);
        writer.sync().await.unwrap();

        let (shrunk, shrunk_len) = context.open("partition", b"resize_write").await.unwrap();
        assert_eq!(shrunk_len, 5);
        let mut reader = Read::new(shrunk, shrunk_len, NZUsize!(5));
        let mut contents = vec![0u8; 5];
        reader.read_exact(&mut contents, 5).await.unwrap();
        assert_eq!(&contents, b"hello");

        // Overwrite the first byte; the size must not change.
        writer.write_at(b"X".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 5);
        writer.sync().await.unwrap();

        let (patched, patched_len) = context.open("partition", b"resize_write").await.unwrap();
        assert_eq!(patched_len, 5);
        let mut reader = Read::new(patched, patched_len, NZUsize!(5));
        let mut contents = vec![0u8; 5];
        reader.read_exact(&mut contents, 5).await.unwrap();
        assert_eq!(&contents, b"Xello");

        // Grow to ten bytes; the tail is expected to be zero-filled.
        writer.resize(10).await.unwrap();
        assert_eq!(writer.size().await, 10);
        writer.sync().await.unwrap();

        let (grown, grown_len) = context.open("partition", b"resize_write").await.unwrap();
        assert_eq!(grown_len, 10);
        let mut reader = Read::new(grown, grown_len, NZUsize!(10));
        let mut contents = vec![0u8; 10];
        reader.read_exact(&mut contents, 10).await.unwrap();
        assert_eq!(&contents[0..5], b"Xello");
        assert_eq!(&contents[5..10], [0u8; 5]);

        // Second blob: write, persist, then truncate to nothing.
        let (blob_zero, zero_initial_len) =
            context.open("partition", b"resize_zero").await.unwrap();
        let writer_zero = Write::new(blob_zero.clone(), zero_initial_len, NZUsize!(10));
        writer_zero.write_at(b"some data".to_vec(), 0).await.unwrap();
        assert_eq!(writer_zero.size().await, 9);
        writer_zero.sync().await.unwrap();
        assert_eq!(writer_zero.size().await, 9);

        writer_zero.resize(0).await.unwrap();
        assert_eq!(writer_zero.size().await, 0);
        writer_zero.sync().await.unwrap();
        assert_eq!(writer_zero.size().await, 0);

        // The reopened blob is empty.
        let (_, emptied_len) = context.open("partition", b"resize_zero").await.unwrap();
        assert_eq!(emptied_len, 0);
    });
}
/// Reads through the writer itself (`Write::read_at`) before and after
/// syncing, including a read spanning data written before and after a sync,
/// and finally verifies the persisted blob.
#[test_traced]
fn test_write_read_at_on_writer() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"read_at_writer").await.unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(10));

        // Eight bytes written, not yet synced.
        writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 8);

        // Read them back in two 4-byte pieces, reusing the returned buffer.
        // The buffer type is inferred from `read_at`'s return value.
        let mut quad = vec![0u8; 4].into();
        quad = writer.read_at(quad, 0).await.unwrap();
        assert_eq!(quad.as_ref(), b"buff");
        quad = writer.read_at(quad, 4).await.unwrap();
        assert_eq!(quad.as_ref(), b"ered");

        // Offset 8 is the current end; reading one byte there must fail.
        let past_end = vec![0u8; 1];
        assert!(writer.read_at(past_end, 8).await.is_err());

        // Extend to 20 bytes and sync.
        writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
        assert_eq!(writer.size().await, 20);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 20);

        // Reads after the sync still work through the writer.
        let mut synced_head = vec![0u8; 4].into();
        synced_head = writer.read_at(synced_head, 0).await.unwrap();
        assert_eq!(synced_head.as_ref(), b"buff");

        let mut synced_tail = vec![0u8; 7].into();
        synced_tail = writer.read_at(synced_tail, 13).await.unwrap();
        assert_eq!(synced_tail.as_ref(), b"flushed");

        // Ten more bytes, not synced yet.
        writer.write_at(b" more data".to_vec(), 20).await.unwrap();
        assert_eq!(writer.size().await, 30);

        // Read only from the newly written region.
        let mut fresh = vec![0u8; 5].into();
        fresh = writer.read_at(fresh, 20).await.unwrap();
        assert_eq!(fresh.as_ref(), b" more");

        // Read across offset 20, i.e. synced bytes followed by unsynced ones.
        let mut spanning = vec![0u8; 12].into();
        spanning = writer.read_at(spanning, 16).await.unwrap();
        assert_eq!(spanning.as_ref(), b"shed more da");

        // Persist and verify the complete content via a new handle.
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 30);

        let (final_blob, final_size) =
            context.open("partition", b"read_at_writer").await.unwrap();
        assert_eq!(final_size, 30);
        let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
        let mut everything = vec![0u8; 30];
        final_reader.read_exact(&mut everything, 30).await.unwrap();
        assert_eq!(&everything, b"buffered and flushed more data");
    });
}
/// Covers two follow-up writes after an initial 10-byte write: one that
/// leaves a gap (offset 15) and one that overlaps the existing data and
/// extends past it (offset 5, 12 bytes).
#[test_traced]
fn test_write_straddling_non_mergeable() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // Case 1: second write starts beyond the end of the first.
        let (gap_blob, gap_initial) = context.open("partition", b"write_straddle").await.unwrap();
        let gap_writer = Write::new(gap_blob.clone(), gap_initial, NZUsize!(10));
        gap_writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
        assert_eq!(gap_writer.size().await, 10);
        gap_writer.write_at(b"abc".to_vec(), 15).await.unwrap();
        assert_eq!(gap_writer.size().await, 18);
        gap_writer.sync().await.unwrap();
        assert_eq!(gap_writer.size().await, 18);

        // Offsets 10..15 were never written and are expected to be zero.
        let (gap_view, gap_len) = context.open("partition", b"write_straddle").await.unwrap();
        assert_eq!(gap_len, 18);
        let mut gap_reader = Read::new(gap_view, gap_len, NZUsize!(20));
        let mut gap_contents = vec![0u8; 18];
        gap_reader.read_exact(&mut gap_contents, 18).await.unwrap();
        let mut expected = vec![0u8; 18];
        expected[0..10].copy_from_slice(b"0123456789");
        expected[15..18].copy_from_slice(b"abc");
        assert_eq!(gap_contents, expected);

        // Case 2: second write overlaps the first and runs past its end.
        let (overlap_blob, overlap_initial) =
            context.open("partition", b"write_straddle2").await.unwrap();
        let overlap_writer = Write::new(overlap_blob.clone(), overlap_initial, NZUsize!(10));
        overlap_writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
        assert_eq!(overlap_writer.size().await, 10);
        overlap_writer.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
        assert_eq!(overlap_writer.size().await, 17);
        overlap_writer.sync().await.unwrap();
        assert_eq!(overlap_writer.size().await, 17);

        // The first five digits survive; the rest is the second payload.
        let (overlap_view, overlap_len) =
            context.open("partition", b"write_straddle2").await.unwrap();
        assert_eq!(overlap_len, 17);
        let mut overlap_reader = Read::new(overlap_view, overlap_len, NZUsize!(20));
        let mut overlap_contents = vec![0u8; 17];
        overlap_reader.read_exact(&mut overlap_contents, 17).await.unwrap();
        assert_eq!(&overlap_contents, b"01234ABCDEFGHIJKL");
    });
}
/// Writes seven bytes, syncs the writer and verifies that the bytes are
/// visible through a freshly opened handle.
#[test_traced]
fn test_write_close() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob_orig, initial_len) = context.open("partition", b"write_close").await.unwrap();
        let writer = Write::new(blob_orig.clone(), initial_len, NZUsize!(8));

        // Seven bytes, one fewer than the size given to the writer.
        writer.write_at(b"pending".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 7);

        // Persist them.
        writer.sync().await.unwrap();

        // Check the stored content.
        let (reopened, stored_len) = context.open("partition", b"write_close").await.unwrap();
        assert_eq!(stored_len, 7);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(8));
        let mut contents = [0u8; 7];
        reader.read_exact(&mut contents, 7).await.unwrap();
        assert_eq!(&contents, b"pending");
    });
}
/// Writes a payload (10 bytes) that is larger than the size (5) given to the
/// writer, then appends a small write, verifying the content after each
/// sync.
#[test_traced]
fn test_write_direct_due_to_size() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (blob, size) = context
            .open("partition", b"write_direct_size")
            .await
            .unwrap();
        let writer = Write::new(blob.clone(), size, NZUsize!(5));

        // A single write twice as large as the size given to the writer.
        let data_large = b"0123456789";
        writer.write_at(data_large.to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 10);
        writer.sync().await.unwrap();

        // The large payload is stored intact.
        let (blob_check, size_check) = context
            .open("partition", b"write_direct_size")
            .await
            .unwrap();
        assert_eq!(size_check, 10);
        let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
        let mut buf = vec![0u8; 10];
        reader.read_exact(&mut buf, 10).await.unwrap();
        assert_eq!(&buf, data_large.as_slice());

        // A small follow-up write at the end, readable via the writer
        // before it is synced.
        writer.write_at(b"abc".to_vec(), 10).await.unwrap();
        assert_eq!(writer.size().await, 13);
        let mut read_small_buf_vec = vec![0u8; 3].into();
        read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
        assert_eq!(read_small_buf_vec.as_ref(), b"abc");
        writer.sync().await.unwrap();

        // Verify the whole blob: the append must not disturb the first ten
        // bytes, so check the prefix as well as the appended suffix.
        let (blob_check2, size_check2) = context
            .open("partition", b"write_direct_size")
            .await
            .unwrap();
        assert_eq!(size_check2, 13);
        let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
        let mut buf2 = vec![0u8; 13];
        reader2.read_exact(&mut buf2, 13).await.unwrap();
        assert_eq!(&buf2[..10], data_large.as_slice());
        assert_eq!(&buf2[10..], b"abc".as_slice());
    });
}
/// Issues a second write that overwrites the tail of the first and extends
/// the data to exactly the size (15) given to the writer.
#[test_traced]
fn test_write_overwrite_and_extend_in_buffer() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context
            .open("partition", b"overwrite_extend_buf")
            .await
            .unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(15));

        // Ten digits first.
        writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 10);

        // Ten letters from offset 5: replaces 5..10 and adds 10..15.
        writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
        assert_eq!(writer.size().await, 15);

        // The merged view is visible through the writer before syncing.
        let mut merged = vec![0u8; 15].into();
        merged = writer.read_at(merged, 0).await.unwrap();
        assert_eq!(merged.as_ref(), b"01234ABCDEFGHIJ");

        writer.sync().await.unwrap();

        // And through a fresh handle afterwards.
        let (reopened, stored_len) = context
            .open("partition", b"overwrite_extend_buf")
            .await
            .unwrap();
        assert_eq!(stored_len, 15);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(15));
        let mut contents = vec![0u8; 15];
        reader.read_exact(&mut contents, 15).await.unwrap();
        assert_eq!(&contents, b"01234ABCDEFGHIJ".as_slice());
    });
}
/// Appends by writing at the offset reported by `size()` after a sync.
#[test_traced]
fn test_write_at_size() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context.open("partition", b"write_end").await.unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(20));

        // Ten bytes, persisted.
        writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 10);
        writer.sync().await.unwrap();

        // Append at whatever the writer reports as its current size.
        let end = writer.size().await;
        writer.write_at(b"abc".to_vec(), end).await.unwrap();
        assert_eq!(writer.size().await, 13);
        writer.sync().await.unwrap();

        // Verify the concatenation.
        let (reopened, stored_len) = context.open("partition", b"write_end").await.unwrap();
        assert_eq!(stored_len, 13);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(13));
        let mut contents = vec![0u8; 13];
        reader.read_exact(&mut contents, 13).await.unwrap();
        assert_eq!(&contents, b"0123456789abc");
    });
}
/// Repeats "write at `size()`, then sync" three times and verifies that the
/// three payloads end up back to back.
#[test_traced]
fn test_write_at_size_multiple_appends() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context
            .open("partition", b"write_multiple_appends_at_size")
            .await
            .unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(5));

        // First payload at offset 0.
        writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 3);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 3);

        // Second payload at the current end (3).
        let end = writer.size().await;
        writer.write_at(b"BBB".to_vec(), end).await.unwrap();
        assert_eq!(writer.size().await, 6);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 6);

        // Third payload at the current end (6).
        let end = writer.size().await;
        writer.write_at(b"CCC".to_vec(), end).await.unwrap();
        assert_eq!(writer.size().await, 9);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 9);

        // All nine bytes are stored in order.
        let (reopened, stored_len) = context
            .open("partition", b"write_multiple_appends_at_size")
            .await
            .unwrap();
        assert_eq!(stored_len, 9);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(9));
        let mut contents = vec![0u8; 9];
        reader.read_exact(&mut contents, 9).await.unwrap();
        assert_eq!(&contents, b"AAABBBCCC");
    });
}
/// Writes at offset 0, then at a non-adjacent offset (20), syncs, and
/// finally appends at `size()`; the gap 7..20 is expected to read as zeros.
#[test_traced]
fn test_write_non_contiguous_then_append_at_size() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context
            .open("partition", b"write_non_contiguous_then_append")
            .await
            .unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(10));

        // Seven bytes at the start.
        writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 7);

        // Nine bytes at offset 20, leaving 7..20 unwritten.
        writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
        assert_eq!(writer.size().await, 29);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 29);

        // Append six bytes at the current end (29).
        let end = writer.size().await;
        writer.write_at(b"APPEND".to_vec(), end).await.unwrap();
        assert_eq!(writer.size().await, 35);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 35);

        // Compare against the expected layout, zeros in the gap.
        let (reopened, stored_len) = context
            .open("partition", b"write_non_contiguous_then_append")
            .await
            .unwrap();
        assert_eq!(stored_len, 35);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(35));
        let mut contents = vec![0u8; 35];
        reader.read_exact(&mut contents, 35).await.unwrap();

        let mut expected = vec![0u8; 35];
        expected[0..7].copy_from_slice(b"INITIAL");
        expected[20..29].copy_from_slice(b"NONCONTIG");
        expected[29..35].copy_from_slice(b"APPEND");
        assert_eq!(contents, expected);
    });
}
/// Shrinks a 16-byte blob to 5 bytes via `Write::resize` and then appends at
/// the new `size()`, verifying that the append lands right after the kept
/// prefix.
#[test_traced]
fn test_resize_then_append_at_size() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (blob, initial_len) = context
            .open("partition", b"resize_then_append_at_size")
            .await
            .unwrap();
        let writer = Write::new(blob.clone(), initial_len, NZUsize!(10));

        // Sixteen bytes, persisted.
        writer.write_at(b"0123456789ABCDEF".to_vec(), 0).await.unwrap();
        assert_eq!(writer.size().await, 16);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 16);

        // Shrink to five bytes and persist the new length.
        let target_len = 5;
        writer.resize(target_len).await.unwrap();
        assert_eq!(writer.size().await, target_len);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, target_len);

        // Append five bytes at the shrunken end.
        let end = writer.size().await;
        writer.write_at(b"XXXXX".to_vec(), end).await.unwrap();
        assert_eq!(writer.size().await, 10);
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 10);

        // Kept prefix followed by the appended bytes.
        let (reopened, stored_len) = context
            .open("partition", b"resize_then_append_at_size")
            .await
            .unwrap();
        assert_eq!(stored_len, 10);
        let mut reader = Read::new(reopened, stored_len, NZUsize!(10));
        let mut contents = vec![0u8; 10];
        reader.read_exact(&mut contents, 10).await.unwrap();
        assert_eq!(&contents, b"01234XXXXX");
    });
}
}