pub mod paged;
mod read;
mod tip;
mod write;
pub use read::Read;
pub use write::Write;
#[cfg(test)]
mod tests {
use super::*;
use crate::{
deterministic, Blob as _, BufMut, Clock, Error, IoBufMut, IoBufs, IoBufsMut, Runner,
Spawner, Storage,
};
use commonware_macros::test_traced;
use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize};
use futures::{pin_mut, FutureExt};
use std::{sync::Arc, time::Duration};
struct BlockingReadGate {
read_started: Option<oneshot::Sender<()>>,
release_read: Option<oneshot::Receiver<()>>,
}
#[derive(Clone)]
struct BlockingReadBlob {
data: Arc<Mutex<Vec<u8>>>,
gate: Arc<Mutex<BlockingReadGate>>,
}
impl BlockingReadBlob {
fn new(data: Vec<u8>) -> (Self, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (read_started_tx, read_started_rx) = oneshot::channel();
let (release_read_tx, release_read_rx) = oneshot::channel();
(
Self {
data: Arc::new(Mutex::new(data)),
gate: Arc::new(Mutex::new(BlockingReadGate {
read_started: Some(read_started_tx),
release_read: Some(release_read_rx),
})),
},
read_started_rx,
release_read_tx,
)
}
async fn block_once_on_read(&self) {
let rx = {
let mut gate = self.gate.lock();
gate.read_started.take().map(|read_started| {
let _ = read_started.send(());
gate.release_read.take().expect("release signal missing")
})
};
if let Some(rx) = rx {
let _ = rx.await;
}
}
}
impl crate::Blob for BlockingReadBlob {
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
self.read_at_buf(offset, len, IoBufMut::default()).await
}
async fn read_at_buf(
&self,
offset: u64,
len: usize,
buf: impl Into<IoBufsMut> + Send,
) -> Result<IoBufsMut, Error> {
self.block_once_on_read().await;
let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
let data = self.data.lock();
if end > data.len() {
return Err(Error::BlobInsufficientLength);
}
let mut out = buf.into();
out.put_slice(&data[start..end]);
Ok(out)
}
async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
let buf = buf.into().coalesce();
let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
let mut data = self.data.lock();
if end > data.len() {
data.resize(end, 0);
}
data[start..end].copy_from_slice(buf.as_ref());
Ok(())
}
async fn resize(&self, len: u64) -> Result<(), Error> {
let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
self.data.lock().resize(len, 0);
Ok(())
}
async fn sync(&self) -> Result<(), Error> {
Ok(())
}
}
#[test_traced]
fn test_read_basic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"Hello, world! This is a test.";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"Hello");
let read = reader.read(14).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b", world! This ");
assert_eq!(reader.position(), 19);
let read = reader.read(7).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"is a te");
let result = reader.read(5).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
#[test_traced]
fn test_read_cross_boundary() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = reader.read(15).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNO");
assert_eq!(reader.position(), 15);
let read = reader.read(11).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"PQRSTUVWXYZ");
assert_eq!(reader.position(), 26);
assert_eq!(reader.blob_remaining(), 0);
});
}
#[test_traced]
fn test_read_to_end_then_rewind_and_read_again() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
let read = reader.read(21).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
assert_eq!(reader.position(), 21);
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"VWXYZ");
reader.seek_to(0).unwrap();
let read = reader.read(21).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
});
}
#[test_traced]
fn test_read_with_known_size() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"This is a test with known size limitations.";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
assert_eq!(reader.blob_remaining(), size);
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"This ");
assert_eq!(reader.blob_remaining(), size - 5);
let read = reader.read((size - 5) as usize).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"is a test with known size limitations.");
assert_eq!(reader.blob_remaining(), 0);
let result = reader.read(1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
#[test_traced]
fn test_read_oversized_request_does_not_consume_buffered_bytes() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"abcdefghij";
let (blob, size) = context
.open("partition", b"double-count-regression")
.await
.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(8));
let first = reader.read(6).await.unwrap().coalesce();
assert_eq!(first.as_ref(), b"abcdef");
assert_eq!(reader.position(), 6);
let err = reader.read(5).await.unwrap_err();
assert!(matches!(err, Error::BlobInsufficientLength));
assert_eq!(reader.position(), 6);
let tail = reader.read(4).await.unwrap().coalesce();
assert_eq!(tail.as_ref(), b"ghij");
assert_eq!(reader.position(), 10);
});
}
#[test_traced]
fn test_read_large_data() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data_size = 1024 * 256; let data = vec![0x42; data_size];
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data.clone()).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(64 * 1024));
let mut total_read = 0;
let chunk_size = 8 * 1024;
while total_read < data_size {
let to_read = std::cmp::min(chunk_size, data_size - total_read);
let read = reader.read(to_read).await.unwrap().coalesce();
assert!(
read.as_ref().iter().all(|&b| b == 0x42),
"Data at position {total_read} is not correct"
);
total_read += to_read;
}
assert_eq!(total_read, data_size);
let result = reader.read(1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
#[test_traced]
fn test_read_exact_size_reads() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let buffer_size = 1024;
let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data.clone()).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(buffer_size));
let read = reader.read(buffer_size).await.unwrap().coalesce();
assert!(read.as_ref().iter().all(|&b| b == 0x37));
let read = reader.read(buffer_size).await.unwrap().coalesce();
assert!(read.as_ref().iter().all(|&b| b == 0x37));
let half_buffer = buffer_size / 2;
let read = reader.read(half_buffer).await.unwrap().coalesce();
assert!(read.as_ref().iter().all(|&b| b == 0x37));
assert_eq!(reader.blob_remaining(), 0);
assert_eq!(reader.position(), size);
});
}
#[test_traced]
fn test_read_structure_single_vs_chunked() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKL";
let (blob, size) = context.open("partition", b"structural").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(5));
let first = reader.read(3).await.unwrap();
assert!(first.is_single());
assert_eq!(first.coalesce().as_ref(), b"ABC");
let second = reader.read(7).await.unwrap();
assert!(!second.is_single());
assert_eq!(second.coalesce().as_ref(), b"DEFGHIJ");
});
}
#[test_traced]
fn test_read_seek_to() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ABCDE");
assert_eq!(reader.position(), 5);
reader.seek_to(10).unwrap();
assert_eq!(reader.position(), 10);
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"KLMNO");
reader.seek_to(0).unwrap();
assert_eq!(reader.position(), 0);
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ABCDE");
reader.seek_to(size).unwrap();
assert_eq!(reader.position(), size);
let result = reader.read(1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
let result = reader.seek_to(size + 10);
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
#[test_traced]
fn test_read_seek_with_refill() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data.clone()).await.unwrap();
let size = data.len() as u64;
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let _ = reader.read(5).await.unwrap().coalesce();
reader.seek_to(500).unwrap();
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"AAAAA"); assert_eq!(reader.position(), 505);
reader.seek_to(100).unwrap();
let _ = reader.read(5).await.unwrap().coalesce();
assert_eq!(reader.position(), 105);
});
}
#[test_traced]
fn test_read_seek_within_buffered_range() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
let read = reader.read(6).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ABCDEF");
assert_eq!(reader.position(), 6);
assert_eq!(reader.buffer_remaining(), 4);
reader.seek_to(3).unwrap();
assert_eq!(reader.position(), 3);
assert_eq!(reader.buffer_remaining(), 7);
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"DEFGH");
assert_eq!(reader.position(), 8);
assert_eq!(reader.buffer_remaining(), 2);
});
}
#[test_traced]
fn test_read_seek_within_unread_buffer_does_not_refill() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context
.open("partition", b"seek_unread_no_refill")
.await
.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
let first = reader.read(6).await.unwrap();
assert_eq!(first.coalesce().as_ref(), b"ABCDEF");
assert_eq!(reader.position(), 6);
assert_eq!(reader.buffer_remaining(), 4);
reader.seek_to(7).unwrap();
assert_eq!(reader.position(), 7);
assert_eq!(reader.buffer_remaining(), 3);
let second = reader.read(3).await.unwrap();
assert_eq!(second.coalesce().as_ref(), b"HIJ");
assert_eq!(reader.position(), 10);
assert_eq!(reader.buffer_remaining(), 0);
let third = reader.read(1).await.unwrap();
assert_eq!(third.coalesce().as_ref(), b"K");
assert_eq!(reader.position(), 11);
assert_eq!(reader.buffer_remaining(), 9);
});
}
#[test_traced]
fn test_read_resize() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let data_len = data.len() as u64;
let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
let resize_len = data_len / 2;
reader.resize(resize_len).await.unwrap();
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, resize_len, "Blob should be resized to half size");
let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = new_reader.read(size as usize).await.unwrap().coalesce();
assert_eq!(
read.as_ref(),
b"ABCDEFGHIJKLM",
"Resized content should match"
);
let result = new_reader.read(1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
new_reader.resize(data_len * 2).await.unwrap();
let (blob, new_size) = context.open("partition", b"test").await.unwrap();
assert_eq!(new_size, data_len * 2);
let mut new_reader = Read::from_pooler(&context, blob, new_size, NZUsize!(10));
let read = new_reader.read(new_size as usize).await.unwrap().coalesce();
assert_eq!(&read.as_ref()[..size as usize], b"ABCDEFGHIJKLM");
assert_eq!(
&read.as_ref()[size as usize..],
vec![0u8; new_size as usize - size as usize]
);
});
}
#[test_traced]
fn test_read_resize_to_zero() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let data_len = data.len() as u64;
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0);
blob.write_at(0, data).await.unwrap();
let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
reader.resize(0).await.unwrap();
let (blob, size) = context.open("partition", b"test").await.unwrap();
assert_eq!(size, 0, "Blob should be resized to zero");
let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let result = new_reader.read(1).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
});
}
#[test_traced]
fn test_write_basic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
assert_eq!(size, 0);
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(8));
writer.write_at(0, b"hello").await.unwrap();
assert_eq!(writer.size().await, 5);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 5);
let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
assert_eq!(size, 5);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"hello");
});
}
#[test_traced]
fn test_write_multiple_flushes() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
assert_eq!(size, 0);
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
writer.write_at(0, b"abc").await.unwrap();
assert_eq!(writer.size().await, 3);
writer.write_at(3, b"defg").await.unwrap();
assert_eq!(writer.size().await, 7);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
assert_eq!(size, 7);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
let read = reader.read(7).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"abcdefg");
});
}
#[test_traced]
fn test_write_large_data() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"write_large").await.unwrap();
assert_eq!(size, 0);
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
writer.write_at(0, b"abc").await.unwrap();
assert_eq!(writer.size().await, 3);
writer
.write_at(3, b"defghijklmnopqrstuvwxyz")
.await
.unwrap();
assert_eq!(writer.size().await, 26);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 26);
let (blob, size) = context.open("partition", b"write_large").await.unwrap();
assert_eq!(size, 26);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
let read = reader.read(26).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"abcdefghijklmnopqrstuvwxyz");
});
}
#[test_traced]
fn test_write_append_to_buffer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
writer.write_at(0, b"hello").await.unwrap();
assert_eq!(writer.size().await, 5);
writer.write_at(5, b" world").await.unwrap();
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 11);
let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
assert_eq!(size, 11);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = reader.read(11).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"hello world");
});
}
#[test_traced]
fn test_write_into_middle_of_buffer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
writer.write_at(0, b"abcdefghij").await.unwrap();
assert_eq!(writer.size().await, 10);
writer.write_at(2, b"01234").await.unwrap();
assert_eq!(writer.size().await, 10);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
assert_eq!(size, 10);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = reader.read(10).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ab01234hij");
writer.write_at(10, b"klmnopqrst").await.unwrap();
assert_eq!(writer.size().await, 20);
writer.write_at(9, b"wxyz").await.unwrap();
assert_eq!(writer.size().await, 20);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
assert_eq!(size, 20);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
let read = reader.read(20).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"ab01234hiwxyznopqrst");
});
}
#[test_traced]
fn test_write_before_buffer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
writer.write_at(10, b"0123456789").await.unwrap();
assert_eq!(writer.size().await, 20);
writer.write_at(0, b"abcde").await.unwrap();
assert_eq!(writer.size().await, 20);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
assert_eq!(size, 20);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
let read = reader.read(20).await.unwrap().coalesce();
let mut expected = vec![0u8; 20];
expected[0..5].copy_from_slice("abcde".as_bytes());
expected[10..20].copy_from_slice("0123456789".as_bytes());
assert_eq!(read.as_ref(), expected.as_slice());
writer.write_at(5, b"fghij").await.unwrap();
assert_eq!(writer.size().await, 20);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 20);
let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
assert_eq!(size, 20);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
let read = reader.read(20).await.unwrap().coalesce();
expected[0..10].copy_from_slice("abcdefghij".as_bytes());
assert_eq!(read.as_ref(), expected.as_slice());
});
}
#[test_traced]
fn test_write_resize() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
let writer = Write::from_pooler(&context, blob, size, NZUsize!(10));
writer.write_at(0, b"hello world").await.unwrap();
assert_eq!(writer.size().await, 11);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 11);
let (blob_check, size_check) =
context.open("partition", b"resize_write").await.unwrap();
assert_eq!(size_check, 11);
drop(blob_check);
writer.resize(5).await.unwrap();
assert_eq!(writer.size().await, 5);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
assert_eq!(size, 5);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"hello");
writer.write_at(0, b"X").await.unwrap();
assert_eq!(writer.size().await, 5);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
assert_eq!(size, 5);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
let read = reader.read(5).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"Xello");
writer.resize(10).await.unwrap();
assert_eq!(writer.size().await, 10);
writer.sync().await.unwrap();
let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
assert_eq!(size, 10);
let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
let read = reader.read(10).await.unwrap().coalesce();
assert_eq!(&read.as_ref()[0..5], b"Xello");
assert_eq!(&read.as_ref()[5..10], [0u8; 5]);
let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
let writer_zero = Write::from_pooler(&context, blob_zero.clone(), size, NZUsize!(10));
writer_zero.write_at(0, b"some data").await.unwrap();
assert_eq!(writer_zero.size().await, 9);
writer_zero.sync().await.unwrap();
assert_eq!(writer_zero.size().await, 9);
writer_zero.resize(0).await.unwrap();
assert_eq!(writer_zero.size().await, 0);
writer_zero.sync().await.unwrap();
assert_eq!(writer_zero.size().await, 0);
let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
assert_eq!(size_z, 0);
});
}
#[test_traced]
fn test_write_read_at_on_writer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
writer.write_at(0, b"buffered").await.unwrap();
assert_eq!(writer.size().await, 8);
let read_buf_vec = writer.read_at(0, 4).await.unwrap().coalesce();
assert_eq!(read_buf_vec, b"buff");
let read_buf_vec = writer.read_at(4, 4).await.unwrap().coalesce();
assert_eq!(read_buf_vec, b"ered");
assert!(writer.read_at(8, 1).await.is_err());
writer.write_at(8, b" and flushed").await.unwrap();
assert_eq!(writer.size().await, 20);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 20);
let read_buf_vec_2 = writer.read_at(0, 4).await.unwrap().coalesce();
assert_eq!(read_buf_vec_2, b"buff");
let read_buf_7_vec = writer.read_at(13, 7).await.unwrap().coalesce();
assert_eq!(read_buf_7_vec, b"flushed");
writer.write_at(20, b" more data").await.unwrap();
assert_eq!(writer.size().await, 30);
let read_buf_vec_3 = writer.read_at(20, 5).await.unwrap().coalesce();
assert_eq!(read_buf_vec_3, b" more");
let combo_read_buf_vec = writer.read_at(16, 12).await.unwrap();
assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 30);
let (final_blob, final_size) =
context.open("partition", b"read_at_writer").await.unwrap();
assert_eq!(final_size, 30);
let mut final_reader =
Read::from_pooler(&context, final_blob, final_size, NZUsize!(30));
let read = final_reader.read(30).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"buffered and flushed more data");
});
}
#[test_traced]
fn test_write_zero_length_read_past_eof_errors() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"zero_len_probe").await.unwrap();
let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
writer.write_at(0, b"abc").await.unwrap();
let empty = writer.read_at(3, 0).await.unwrap();
assert!(empty.is_empty());
let err = writer.read_at(4, 0).await.unwrap_err();
assert!(matches!(err, Error::BlobInsufficientLength));
});
}
#[test_traced]
fn test_write_read_at_blocks_concurrent_write_until_persisted_read_completes() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, read_started_rx, release_read_tx) =
BlockingReadBlob::new(b"abcdefghij".to_vec());
let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
let reader = writer.clone();
let verifier = writer.clone();
let read_task = context
.clone()
.spawn(move |_| async move { reader.read_at(0, 4).await.expect("read failed") });
read_started_rx.await.expect("read start signal missing");
let write_task = context.clone().spawn(move |_| async move {
writer.write_at(0, b"WXYZ").await.expect("write failed");
});
pin_mut!(write_task);
context.sleep(Duration::from_secs(1)).await;
assert!(
write_task.as_mut().now_or_never().is_none(),
"write_at completed while read_at still held lock over blob I/O"
);
release_read_tx
.send(())
.expect("failed to release blocked read");
let read_result = read_task.await.expect("read task failed").coalesce();
assert_eq!(read_result.as_ref(), b"abcd");
write_task.await.expect("write task failed");
let updated = verifier.read_at(0, 4).await.unwrap().coalesce();
assert_eq!(updated.as_ref(), b"WXYZ");
});
}
#[test_traced]
fn test_write_read_at_overlap_blocks_concurrent_write_until_persisted_read_completes() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, read_started_rx, release_read_tx) =
BlockingReadBlob::new(b"abcdefghij".to_vec());
let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
let verifier = writer.clone();
writer.write_at(10, b"XYZ").await.unwrap();
let reader = writer.clone();
let read_task = context
.clone()
.spawn(move |_| async move { reader.read_at(8, 5).await.expect("read failed") });
read_started_rx.await.expect("read start signal missing");
let write_task = context.clone().spawn(move |_| async move {
writer.write_at(10, b"UVW").await.expect("write failed");
});
pin_mut!(write_task);
context.sleep(Duration::from_secs(1)).await;
assert!(
write_task.as_mut().now_or_never().is_none(),
"write_at completed while overlap read_at still held lock over blob I/O"
);
release_read_tx
.send(())
.expect("failed to release blocked read");
let read_result = read_task.await.expect("read task failed").coalesce();
assert_eq!(read_result.as_ref(), b"ijXYZ");
write_task.await.expect("write task failed");
let updated = verifier.read_at(8, 5).await.unwrap().coalesce();
assert_eq!(updated.as_ref(), b"ijUVW");
});
}
#[test_traced]
fn test_write_straddling_non_mergeable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
writer.write_at(0, b"0123456789").await.unwrap();
assert_eq!(writer.size().await, 10);
writer.write_at(15, b"abc").await.unwrap();
assert_eq!(writer.size().await, 18);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 18);
let (blob_check, size_check) =
context.open("partition", b"write_straddle").await.unwrap();
assert_eq!(size_check, 18);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(20));
let read = reader.read(18).await.unwrap().coalesce();
let mut expected = vec![0u8; 18];
expected[0..10].copy_from_slice(b"0123456789");
expected[15..18].copy_from_slice(b"abc");
assert_eq!(read.as_ref(), expected.as_slice());
let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
let writer2 = Write::from_pooler(&context, blob2.clone(), size, NZUsize!(10));
writer2.write_at(0, b"0123456789").await.unwrap();
assert_eq!(writer2.size().await, 10);
writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
assert_eq!(writer2.size().await, 17);
writer2.sync().await.unwrap();
assert_eq!(writer2.size().await, 17);
let (blob_check2, size_check2) =
context.open("partition", b"write_straddle2").await.unwrap();
assert_eq!(size_check2, 17);
let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(20));
let read = reader2.read(17).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"01234ABCDEFGHIJKL");
});
}
#[test_traced]
fn test_write_close() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
let writer = Write::from_pooler(&context, blob_orig.clone(), size, NZUsize!(8));
writer.write_at(0, b"pending").await.unwrap();
assert_eq!(writer.size().await, 7);
writer.sync().await.unwrap();
let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
assert_eq!(size_check, 7);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(8));
let read = reader.read(7).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"pending");
});
}
#[test_traced]
fn test_write_direct_due_to_size() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context
.open("partition", b"write_direct_size")
.await
.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
let data_large = b"0123456789";
writer.write_at(0, data_large).await.unwrap();
assert_eq!(writer.size().await, 10);
writer.sync().await.unwrap();
let (blob_check, size_check) = context
.open("partition", b"write_direct_size")
.await
.unwrap();
assert_eq!(size_check, 10);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
let read = reader.read(10).await.unwrap().coalesce();
assert_eq!(read.as_ref(), data_large.as_slice());
writer.write_at(10, b"abc").await.unwrap();
assert_eq!(writer.size().await, 13);
let read_small_buf_vec = writer.read_at(10, 3).await.unwrap().coalesce();
assert_eq!(read_small_buf_vec, b"abc");
writer.sync().await.unwrap();
let (blob_check2, size_check2) = context
.open("partition", b"write_direct_size")
.await
.unwrap();
assert_eq!(size_check2, 13);
let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(13));
let read = reader2.read(13).await.unwrap().coalesce();
assert_eq!(&read.as_ref()[10..], b"abc".as_slice());
});
}
#[test_traced]
fn test_write_overwrite_and_extend_in_buffer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context
.open("partition", b"overwrite_extend_buf")
.await
.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(15));
writer.write_at(0, b"0123456789").await.unwrap();
assert_eq!(writer.size().await, 10);
writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
assert_eq!(writer.size().await, 15);
let read_buf_vec = writer.read_at(0, 15).await.unwrap().coalesce();
assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
writer.sync().await.unwrap();
let (blob_check, size_check) = context
.open("partition", b"overwrite_extend_buf")
.await
.unwrap();
assert_eq!(size_check, 15);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(15));
let read = reader.read(15).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"01234ABCDEFGHIJ".as_slice());
});
}
#[test_traced]
fn test_write_at_size() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context.open("partition", b"write_end").await.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
writer.write_at(0, b"0123456789").await.unwrap();
assert_eq!(writer.size().await, 10);
writer.sync().await.unwrap();
writer.write_at(writer.size().await, b"abc").await.unwrap();
assert_eq!(writer.size().await, 13);
writer.sync().await.unwrap();
let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
assert_eq!(size_check, 13);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(13));
let read = reader.read(13).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"0123456789abc");
});
}
#[test_traced]
fn test_write_at_size_multiple_appends() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context
.open("partition", b"write_multiple_appends_at_size")
.await
.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
writer.write_at(0, b"AAA").await.unwrap();
assert_eq!(writer.size().await, 3);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 3);
writer.write_at(writer.size().await, b"BBB").await.unwrap();
assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
assert_eq!(writer.size().await, 6);
writer.write_at(writer.size().await, b"CCC").await.unwrap();
assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
assert_eq!(writer.size().await, 9);
let (blob_check, size_check) = context
.open("partition", b"write_multiple_appends_at_size")
.await
.unwrap();
assert_eq!(size_check, 9);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(9));
let read = reader.read(9).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"AAABBBCCC");
});
}
#[test_traced]
fn test_write_non_contiguous_then_append_at_size() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context
.open("partition", b"write_non_contiguous_then_append")
.await
.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
writer.write_at(0, b"INITIAL").await.unwrap(); assert_eq!(writer.size().await, 7);
writer.write_at(20, b"NONCONTIG").await.unwrap();
assert_eq!(writer.size().await, 29);
writer.sync().await.unwrap();
assert_eq!(writer.size().await, 29);
writer
.write_at(writer.size().await, b"APPEND")
.await
.unwrap();
assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
assert_eq!(writer.size().await, 35);
let (blob_check, size_check) = context
.open("partition", b"write_non_contiguous_then_append")
.await
.unwrap();
assert_eq!(size_check, 35);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(35));
let read = reader.read(35).await.unwrap().coalesce();
let mut expected = vec![0u8; 35];
expected[0..7].copy_from_slice(b"INITIAL");
expected[20..29].copy_from_slice(b"NONCONTIG");
expected[29..35].copy_from_slice(b"APPEND");
assert_eq!(read.as_ref(), expected.as_slice());
});
}
#[test_traced]
fn test_resize_then_append_at_size() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (blob, size) = context
.open("partition", b"resize_then_append_at_size")
.await
.unwrap();
let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); assert_eq!(writer.size().await, 16);
writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
let resize_to = 5;
writer.resize(resize_to).await.unwrap();
assert_eq!(writer.size().await, resize_to);
writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
writer
.write_at(writer.size().await, b"XXXXX")
.await
.unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
assert_eq!(writer.size().await, 10);
let (blob_check, size_check) = context
.open("partition", b"resize_then_append_at_size")
.await
.unwrap();
assert_eq!(size_check, 10);
let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
let read = reader.read(10).await.unwrap().coalesce();
assert_eq!(read.as_ref(), b"01234XXXXX");
});
}
}