#![cfg(feature = "utils")]
use std::sync::Arc;
use bytes::Bytes;
use futures::StreamExt;
use sparseio::Builder;
use sparseio::utils::{counting, fixture, flaky, oracle, tracing};
/// Build a `SparseIO` over the given upstream reader/writer pair with the
/// requested chunk size.
///
/// Panics (via `expect`) if the builder rejects the configuration; callers in
/// this file always supply both endpoints and a non-zero chunk size.
async fn build_io<R, W>(reader: R, writer: W, chunk_size: usize) -> sparseio::SparseIO<R, W>
where
    R: sparseio::Reader + Send + Sync + 'static,
    W: sparseio::Writer + Send + Sync + 'static,
{
    let builder = Builder::new()
        .chunk_size(chunk_size)
        .reader(reader)
        .writer(writer);
    builder.build().await.expect("builder should succeed")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn builder_validation_rejects_missing_fields_and_zero_chunk_size() {
    tracing::init();
    let reader = oracle::Reader::new(fixture::bytes(32));
    let writer = oracle::Writer::default();

    // A builder with no reader configured must be rejected before any I/O.
    let no_reader = Builder::<oracle::Reader, oracle::Writer>::new()
        .writer(writer.clone())
        .build()
        .await;
    let Err(missing_reader) = no_reader else {
        panic!("missing reader should fail")
    };
    assert_eq!(missing_reader.kind(), std::io::ErrorKind::InvalidInput);

    // Likewise a builder with no writer configured.
    let no_writer = Builder::<oracle::Reader, oracle::Writer>::new()
        .reader(reader.clone())
        .build()
        .await;
    let Err(missing_writer) = no_writer else {
        panic!("missing writer should fail")
    };
    assert_eq!(missing_writer.kind(), std::io::ErrorKind::InvalidInput);

    // A zero chunk size is invalid even with both endpoints present.
    let zero = Builder::new()
        .chunk_size(0)
        .reader(reader)
        .writer(writer)
        .build()
        .await;
    let Err(zero_chunk) = zero else {
        panic!("zero chunk size should fail")
    };
    assert_eq!(zero_chunk.kind(), std::io::ErrorKind::InvalidInput);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn aligned_and_unaligned_viewer_reads_match_the_fixture() {
    tracing::init();
    let data = fixture::bytes(96);
    let io = Arc::new(
        build_io(
            counting::Reader::new(oracle::Reader::new(data.clone())),
            counting::Writer::new(oracle::Writer::default()),
            16,
        )
        .await,
    );
    let mut viewer = io.viewer();

    // Unaligned read: start mid-chunk (offset 5) and span multiple
    // 16-byte chunk boundaries.
    viewer.seek(5).expect("seek should succeed");
    let mut unaligned = vec![0u8; 37];
    let n = viewer.read(&mut unaligned).await.expect("read should succeed");
    assert_eq!(n, 37);
    assert_eq!(unaligned, data.slice(5..42).to_vec());

    // Aligned read: exactly one chunk starting on a chunk boundary.
    viewer.seek(16).expect("aligned seek should succeed");
    let mut aligned = vec![0u8; 16];
    let n = viewer.read(&mut aligned).await.expect("aligned read should succeed");
    assert_eq!(n, 16);
    assert_eq!(aligned, data.slice(16..32).to_vec());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cache_transitions_from_uncached_to_cached_without_extra_upstream_reads() {
    tracing::init();
    let payload = fixture::bytes(64);
    let upstream = counting::Reader::new(oracle::Reader::new(payload.clone()));
    let backing = counting::Writer::new(oracle::Writer::default());
    let io = Arc::new(build_io(upstream.clone(), backing.clone(), 16).await);

    // The first read populates the cache; the second must be served from
    // the writer-side extent rather than hitting upstream again.
    let initial = io.read_chunk(0).await.expect("first read should succeed");
    let cached = io.read_chunk(0).await.expect("second read should succeed");
    assert_eq!(initial, cached);
    assert_eq!(upstream.read_count(), 1, "upstream reader should be used once");
    assert_eq!(backing.create_count(), 1, "the extent should be materialized once");
    assert_eq!(backing.read_count(), 1, "the cached re-read should come from the writer");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn read_chunk_normalizes_to_the_containing_chunk() {
    tracing::init();
    let payload = fixture::bytes(64);
    let reader = counting::Reader::new(oracle::Reader::new(payload.clone()));
    let writer = counting::Writer::new(oracle::Writer::default());
    let io = Arc::new(build_io(reader, writer, 16).await);

    // Offset 17 lies inside the second 16-byte chunk; the read is expected
    // to snap down to the chunk boundary and return bytes 16..32.
    let chunk = io.read_chunk(17).await.expect("unaligned chunk read should succeed");
    assert_eq!(chunk, payload.slice(16..32));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn same_offset_concurrency_dedupe_shares_one_upstream_fetch() {
    tracing::init();
    let payload = fixture::bytes(128);
    let upstream = counting::Reader::new(oracle::Reader::new(payload.clone()));
    let writer = counting::Writer::new(oracle::Writer::default());
    let io = Arc::new(build_io(upstream.clone(), writer, 32).await);

    // Twelve tasks race on the same chunk; in-flight deduplication should
    // collapse them into a single upstream fetch.
    let mut tasks = Vec::with_capacity(12);
    for _ in 0..12 {
        let io = Arc::clone(&io);
        tasks.push(tokio::spawn(async move { io.read_chunk(0).await }));
    }
    for task in tasks {
        let chunk = task.await.expect("task should join").expect("chunk read should succeed");
        assert_eq!(chunk, payload.slice(0..32));
    }
    assert_eq!(upstream.read_count(), 1, "same-offset concurrency should dedupe");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn different_offset_concurrency_returns_correct_chunks() {
    tracing::init();
    let payload = fixture::bytes(192);
    let upstream = counting::Reader::new(oracle::Reader::new(payload.clone()));
    let writer = counting::Writer::new(oracle::Writer::default());
    let io = Arc::new(build_io(upstream.clone(), writer, 32).await);

    // Four distinct chunk-aligned offsets fetched concurrently; each task
    // reports its offset so the result can be checked against the fixture.
    let offsets = [0usize, 32, 64, 96];
    let mut tasks = Vec::with_capacity(offsets.len());
    for offset in offsets {
        let io = Arc::clone(&io);
        tasks.push(tokio::spawn(async move { (offset, io.read_chunk(offset).await) }));
    }
    for task in tasks {
        let (offset, result) = task.await.expect("task should join");
        let chunk = result.expect("chunk read should succeed");
        assert_eq!(
            chunk,
            payload.slice(offset..offset + 32),
            "chunk at offset {offset} should match the fixture"
        );
    }
    assert_eq!(upstream.read_count(), offsets.len(), "different offsets should fetch independently");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn bytestream_matches_the_fixture_payload() {
    tracing::init();
    let payload = fixture::bytes(80);
    let reader = counting::Reader::new(oracle::Reader::new(payload.clone()));
    let writer = counting::Writer::new(oracle::Writer::default());
    let io = Arc::new(build_io(reader, writer, 16).await);

    let mut viewer = io.viewer();
    viewer.seek(7).expect("seek should succeed");

    // Drain the byte stream from the unaligned position and compare the
    // concatenation against the fixture's tail.
    let mut stream = viewer.to_bytestream().await;
    let mut gathered: Vec<u8> = Vec::new();
    while let Some(piece) = stream.next().await {
        let piece = piece.expect("stream chunk should succeed");
        gathered.extend_from_slice(&piece);
    }
    assert_eq!(Bytes::from(gathered), payload.slice(7..));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn transient_reader_failures_cleanup_flights_and_allow_retry() {
    tracing::init();
    let payload = fixture::bytes(64);
    // Upstream fails exactly once at offset 0, then recovers.
    let upstream = counting::Reader::new(flaky::Reader::fail_once_at(payload.clone(), [0]));
    let writer = counting::Writer::new(oracle::Writer::default());
    let io = Arc::new(build_io(upstream.clone(), writer, 16).await);

    let first = io.read_chunk(0).await;
    assert!(first.is_err(), "first transient failure should surface");

    // After the failed flight is cleaned up, a retry must hit upstream again.
    let chunk = io.read_chunk(0).await.expect("retry should succeed");
    assert_eq!(chunk, payload.slice(0..16));
    assert_eq!(upstream.read_count(), 2, "retry should re-enter the upstream reader after cleanup");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn transient_writer_failures_cleanup_flights_and_allow_retry() {
    tracing::init();
    let payload = fixture::bytes(64);
    let upstream = counting::Reader::new(oracle::Reader::new(payload.clone()));
    // The writer fails exactly once at offset 0, then recovers.
    let writer = flaky::Writer::fail_once_at([0]);
    let io = Arc::new(build_io(upstream.clone(), writer, 16).await);

    let first = io.read_chunk(0).await;
    assert!(first.is_err(), "first transient writer failure should surface");

    // A writer-side failure must also tear down the flight so the retry
    // refetches from upstream rather than reusing a poisoned flight.
    let chunk = io.read_chunk(0).await.expect("retry should succeed");
    assert_eq!(chunk, payload.slice(0..16));
    assert_eq!(upstream.read_count(), 2, "writer failure retry should refetch after cleanup");
}