use std::{fmt, ops::Range, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::try_join_all;
use crate::runtime_bridge::bridge_sync_to_async_send;
/// Random-access byte source whose ranges may have to be fetched asynchronously.
///
/// Implementors must be `Send + Sync` so they can be shared behind an `Arc`.
#[async_trait]
pub trait LazyByteSource: Send + Sync {
    /// Total number of bytes this source exposes.
    fn size(&self) -> u64;

    /// Fetches `len` bytes beginning at absolute offset `start`.
    async fn range(&self, start: u64, len: u64) -> Result<Bytes, LazyByteSourceError>;

    /// Optional non-blocking fast path for `start..start + len`.
    ///
    /// The default returns `None`, meaning "only available through [`LazyByteSource::range`]".
    fn try_get_range_sync(&self, _start: u64, _len: u64) -> Option<Bytes> {
        None
    }

    /// Reads the final `len` bytes (clamped to the source size).
    ///
    /// Returns the bytes together with the total source size.
    ///
    /// # Errors
    /// `OutOfBounds { start: 0, len, size: 0 }` when the source is empty (even for
    /// `len == 0`); otherwise whatever [`LazyByteSource::range`] reports.
    async fn tail(&self, len: u64) -> Result<(Bytes, u64), LazyByteSourceError> {
        match self.size() {
            0 => Err(LazyByteSourceError::OutOfBounds {
                start: 0,
                len,
                size: 0,
            }),
            total => {
                let take = len.min(total);
                self.range(total - take, take)
                    .await
                    .map(|bytes| (bytes, total))
            }
        }
    }
}
/// Errors returned by `LazyByteSource` implementations and the `Source` fetch helpers.
#[derive(Debug, thiserror::Error)]
pub enum LazyByteSourceError {
/// Error from the storage layer; converted automatically via `#[from]`.
#[error("lazy source storage: {0}")]
Storage(#[from] crate::storage::StorageError),
/// The requested `start..start + len` does not fit inside a source of `size` bytes.
#[error("range out of bounds: start={start} len={len} size={size}")]
OutOfBounds { start: u64, len: u64, size: u64 },
/// NOTE(review): not constructed in this file; presumably raised by backends that
/// return fewer bytes (`got`) than `requested` for a read at `start` — confirm.
#[error("short read: start={start} requested={requested} got={got}")]
ShortRead {
start: u64,
requested: u64,
got: u64,
},
}
/// Byte storage that is either fully resident in memory or fetched on demand.
///
/// Cloning is cheap: both variants hold reference-counted handles.
#[derive(Clone)]
pub(crate) enum Source {
/// Entire payload in memory; ranges are served as zero-copy slices.
InMemory(Bytes),
/// Payload fetched on demand through a shared `LazyByteSource`.
Lazy(Arc<dyn LazyByteSource>),
}
/// Debug output reports only the buffer length (in-memory) or an opaque marker
/// (lazy), so payload bytes are never dumped into logs.
impl fmt::Debug for Source {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lazy(_) => f.debug_struct("Lazy").finish_non_exhaustive(),
            Self::InMemory(bytes) => {
                let byte_count = bytes.len();
                f.debug_tuple("InMemory").field(&byte_count).finish()
            }
        }
    }
}
impl Source {
    /// Total number of addressable bytes.
    ///
    /// NOTE(review): for a lazy source this is `size() as usize`, which truncates on
    /// 32-bit targets for sources larger than 4 GiB.
    pub(crate) fn len(&self) -> usize {
        match self {
            Self::InMemory(b) => b.len(),
            Self::Lazy(s) => s.size() as usize,
        }
    }

    /// Returns `range` without awaiting, when that is possible.
    ///
    /// In-memory sources return a zero-copy slice. They return `None` when `range` is
    /// reversed (`start > end`) or extends past the end of the buffer. Lazy sources
    /// defer to the backend's `try_get_range_sync`.
    pub fn try_get_range_sync(&self, range: Range<usize>) -> Option<Bytes> {
        match self {
            Self::InMemory(b) => {
                // `Bytes::slice` panics on a reversed range; report it as unservable instead.
                if range.start > range.end || range.end > b.len() {
                    return None;
                }
                Some(b.slice(range))
            }
            Self::Lazy(s) => s.try_get_range_sync(range.start as u64, range.len() as u64),
        }
    }

    /// Fetches `range`, going through `bridge_sync_to_async_send` when the sync fast
    /// path misses on a lazy source.
    ///
    /// # Errors
    /// `OutOfBounds` if an in-memory source cannot serve `range`. Otherwise, whatever
    /// the backend's `range` returns.
    pub fn get_range(&self, range: Range<usize>) -> Result<Bytes, LazyByteSourceError> {
        if let Some(bytes) = self.try_get_range_sync(range.clone()) {
            return Ok(bytes);
        }
        let Self::Lazy(s) = self else {
            return Err(self.out_of_bounds(&range));
        };
        let (start, len) = (range.start as u64, range.len() as u64);
        // Move an owned `Arc` into the future so it does not borrow `self`
        // (presumably required by `bridge_sync_to_async_send` — confirm).
        let src = Arc::clone(s);
        bridge_sync_to_async_send(async move { src.range(start, len).await })
    }

    /// Fetches every range in `ranges`, in order.
    ///
    /// Sync-servable ranges are sliced directly. All remaining lazy-backend fetches are
    /// joined concurrently and driven through `bridge_sync_to_async_send`.
    ///
    /// # Errors
    /// `OutOfBounds` for the first range an in-memory source cannot serve. Otherwise,
    /// the first error reported by the backend.
    pub fn get_ranges_parallel(
        &self,
        ranges: &[Range<usize>],
    ) -> Result<Vec<Bytes>, LazyByteSourceError> {
        if ranges.is_empty() {
            return Ok(Vec::new());
        }
        let out = self.resolve_sync(ranges)?;
        let pending = Self::pending_fetches(ranges, &out);
        let order: Vec<usize> = pending.iter().map(|&(slot, _, _)| slot).collect();
        let fetched: Vec<Bytes> = match self {
            Self::Lazy(s) if !pending.is_empty() => {
                let src = Arc::clone(s);
                // Everything the bridged future needs is owned (Arc clones + copied spans).
                let fut = async move {
                    let futs = pending
                        .into_iter()
                        .map(|(_slot, start, len)| {
                            let s = Arc::clone(&src);
                            async move { s.range(start, len).await }
                        })
                        .collect::<Vec<_>>();
                    try_join_all(futs).await
                };
                bridge_sync_to_async_send(fut)?
            }
            // Nothing left to fetch: either every range hit the sync path, or the
            // source is in-memory (whose misses already errored in `resolve_sync`).
            Self::Lazy(_) | Self::InMemory(_) => Vec::new(),
        };
        Ok(Self::fill_slots(out, order, fetched))
    }

    /// Async counterpart of [`Source::get_range`]: awaits the backend directly
    /// instead of bridging.
    ///
    /// # Errors
    /// `OutOfBounds` if an in-memory source cannot serve `range`. Otherwise, whatever
    /// the backend's `range` returns.
    pub async fn range_async(&self, range: Range<usize>) -> Result<Bytes, LazyByteSourceError> {
        if let Some(bytes) = self.try_get_range_sync(range.clone()) {
            return Ok(bytes);
        }
        let Self::Lazy(s) = self else {
            return Err(self.out_of_bounds(&range));
        };
        s.range(range.start as u64, range.len() as u64).await
    }

    /// Async counterpart of [`Source::get_ranges_parallel`]: joins the backend
    /// fetches on the caller's executor.
    ///
    /// # Errors
    /// Same as [`Source::get_ranges_parallel`].
    pub async fn get_ranges_parallel_async(
        &self,
        ranges: &[Range<usize>],
    ) -> Result<Vec<Bytes>, LazyByteSourceError> {
        if ranges.is_empty() {
            return Ok(Vec::new());
        }
        let out = self.resolve_sync(ranges)?;
        let pending = Self::pending_fetches(ranges, &out);
        let order: Vec<usize> = pending.iter().map(|&(slot, _, _)| slot).collect();
        let fetched: Vec<Bytes> = match self {
            Self::Lazy(s) if !pending.is_empty() => {
                let futs = pending
                    .into_iter()
                    .map(|(_slot, start, len)| {
                        let s = Arc::clone(s);
                        async move { s.range(start, len).await }
                    })
                    .collect::<Vec<_>>();
                try_join_all(futs).await?
            }
            Self::Lazy(_) | Self::InMemory(_) => Vec::new(),
        };
        Ok(Self::fill_slots(out, order, fetched))
    }

    /// Builds the typed error for a `range` this source cannot serve.
    fn out_of_bounds(&self, range: &Range<usize>) -> LazyByteSourceError {
        LazyByteSourceError::OutOfBounds {
            start: range.start as u64,
            len: range.len() as u64,
            size: self.len() as u64,
        }
    }

    /// Serves each range through the sync fast path.
    ///
    /// Slots that a lazy backend must fetch asynchronously are left as `None`.
    ///
    /// # Errors
    /// `OutOfBounds` for the first range an in-memory source cannot serve, since it
    /// has no backend to fall back to.
    fn resolve_sync(
        &self,
        ranges: &[Range<usize>],
    ) -> Result<Vec<Option<Bytes>>, LazyByteSourceError> {
        let is_lazy = matches!(self, Self::Lazy(_));
        ranges
            .iter()
            .map(|r| match self.try_get_range_sync(r.clone()) {
                Some(bytes) => Ok(Some(bytes)),
                None if is_lazy => Ok(None),
                None => Err(self.out_of_bounds(r)),
            })
            .collect()
    }

    /// Lists `(slot, start, len)` for every slot `resolve_sync` left empty.
    fn pending_fetches(ranges: &[Range<usize>], out: &[Option<Bytes>]) -> Vec<(usize, u64, u64)> {
        ranges
            .iter()
            .zip(out)
            .enumerate()
            .filter(|(_, (_, slot))| slot.is_none())
            .map(|(i, (r, _))| (i, r.start as u64, r.len() as u64))
            .collect()
    }

    /// Writes `fetched[k]` into slot `order[k]`, then unwraps every slot.
    ///
    /// # Panics
    /// If any slot is still empty. That would mean the fetch count did not match the
    /// pending count, which is an internal invariant violation.
    fn fill_slots(
        mut out: Vec<Option<Bytes>>,
        order: Vec<usize>,
        fetched: Vec<Bytes>,
    ) -> Vec<Bytes> {
        for (slot, bytes) in order.into_iter().zip(fetched) {
            out[slot] = Some(bytes);
        }
        out.into_iter()
            .map(|b| b.expect("every slot filled by sync or async path"))
            .collect()
    }
}
/// `LazyByteSource` adapter over a [`Bytes`] buffer that is already fully in memory.
#[derive(Debug, Clone)]
pub struct BytesLazyByteSource {
    bytes: Bytes,
}

impl BytesLazyByteSource {
    /// Takes ownership of `bytes`; the buffer is stored as-is, without copying.
    pub fn new(bytes: Bytes) -> Self {
        BytesLazyByteSource { bytes }
    }
}
/// Every in-bounds range is resident, so the async path is simply the sync path
/// with a typed error for misses.
#[async_trait]
impl LazyByteSource for BytesLazyByteSource {
    fn size(&self) -> u64 {
        self.bytes.len() as u64
    }

    async fn range(&self, start: u64, len: u64) -> Result<Bytes, LazyByteSourceError> {
        self.try_get_range_sync(start, len)
            .ok_or_else(|| LazyByteSourceError::OutOfBounds {
                start,
                len,
                size: self.size(),
            })
    }

    /// Zero-copy slice of `start..start + len`, or `None` if it overruns the buffer.
    fn try_get_range_sync(&self, start: u64, len: u64) -> Option<Bytes> {
        // Saturating so a huge `len` reads as "too long" instead of wrapping around.
        let end = start.saturating_add(len);
        if end > self.size() {
            return None;
        }
        let (lo, hi) = (start as usize, end as usize);
        Some(self.bytes.slice(lo..hi))
    }
}
/// A `len`-byte window of `inner`, starting at `offset`, exposed as its own source
/// (offsets passed to it are relative to the window start).
pub(crate) struct LazySubSource {
// Backing source the window reads from.
inner: Arc<dyn LazyByteSource>,
// Absolute offset in `inner` of the window's first byte.
offset: u64,
// Window length; this is what the source reports as its `size()`.
len: u64,
}
impl LazySubSource {
    /// Creates a view of the `len` bytes of `inner` that start at `offset`.
    ///
    /// The window is only validated in debug builds. In release builds an
    /// out-of-range window is accepted as-is, and reads past `inner` surface as
    /// `inner`'s own errors.
    pub fn new(inner: Arc<dyn LazyByteSource>, offset: u64, len: u64) -> Self {
        // `checked_add` keeps an extreme `offset + len` from tripping the debug
        // overflow check instead of reporting the intended assertion message.
        debug_assert!(
            matches!(offset.checked_add(len), Some(end) if end <= inner.size()),
            "sub-source overruns inner"
        );
        Self { inner, offset, len }
    }
}
/// Shows the window geometry only; the inner source is opaque.
impl fmt::Debug for LazySubSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LazySubSource");
        builder.field("offset", &self.offset);
        builder.field("len", &self.len);
        builder.finish()
    }
}
/// Offsets are window-relative. Bounds are checked against the window length, then
/// the request is shifted by `offset` and forwarded to `inner`.
#[async_trait]
impl LazyByteSource for LazySubSource {
    fn size(&self) -> u64 {
        self.len
    }

    async fn range(&self, start: u64, len: u64) -> Result<Bytes, LazyByteSourceError> {
        let window_len = self.len;
        if start.saturating_add(len) <= window_len {
            return self.inner.range(self.offset + start, len).await;
        }
        // Report the window's size, not the inner source's.
        Err(LazyByteSourceError::OutOfBounds {
            start,
            len,
            size: window_len,
        })
    }

    fn try_get_range_sync(&self, start: u64, len: u64) -> Option<Bytes> {
        (start.saturating_add(len) <= self.len)
            .then(|| self.inner.try_get_range_sync(self.offset + start, len))
            .flatten()
    }
}
/// Overlay that serves reads from installed prefetched chunks before falling back to `inner`.
pub(crate) struct PrefetchedSource {
// Source consulted whenever no installed chunk covers a request.
inner: Arc<dyn LazyByteSource>,
// `(absolute start offset, bytes)` chunks, searched in insertion order.
prefetched: Vec<(u64, Bytes)>,
}
impl PrefetchedSource {
    /// Wraps `inner` with an initially empty prefetch overlay.
    pub fn new(inner: Arc<dyn LazyByteSource>) -> Self {
        Self {
            inner,
            prefetched: Vec::new(),
        }
    }

    /// Registers `bytes` as the contents of the source starting at absolute offset `start`.
    ///
    /// Chunks are neither merged nor de-duplicated. Lookups use the first installed
    /// chunk that fully covers a request.
    pub fn install(&mut self, start: u64, bytes: Bytes) {
        self.prefetched.push((start, bytes));
    }

    /// Zero-copy slice of the first installed chunk that fully contains
    /// `start..start + len`.
    ///
    /// Returns `None` if no chunk covers it, or if `start + len` overflows.
    fn lookup(&self, start: u64, len: u64) -> Option<Bytes> {
        let req_end = start.checked_add(len)?;
        self.prefetched.iter().find_map(|(p_start, p_bytes)| {
            // Saturate so a chunk installed near `u64::MAX` cannot overflow. Any request
            // that still fits below the saturated end lies inside the chunk.
            let p_end = p_start.saturating_add(p_bytes.len() as u64);
            (*p_start <= start && req_end <= p_end).then(|| {
                let offset = (start - p_start) as usize;
                p_bytes.slice(offset..offset + len as usize)
            })
        })
    }
}
/// Summarises the overlay (source size, chunk count, total prefetched bytes)
/// without printing any payload.
impl fmt::Debug for PrefetchedSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut total_prefetched = 0usize;
        for (_, chunk) in &self.prefetched {
            total_prefetched += chunk.len();
        }
        f.debug_struct("PrefetchedSource")
            .field("size", &self.inner.size())
            .field("prefetched_count", &self.prefetched.len())
            .field("prefetched_bytes", &total_prefetched)
            .finish()
    }
}
/// Overlay hits never touch `inner`. Misses are forwarded unchanged to the
/// corresponding `inner` method.
#[async_trait]
impl LazyByteSource for PrefetchedSource {
    fn size(&self) -> u64 {
        self.inner.size()
    }

    async fn range(&self, start: u64, len: u64) -> Result<Bytes, LazyByteSourceError> {
        match self.lookup(start, len) {
            Some(hit) => Ok(hit),
            None => self.inner.range(start, len).await,
        }
    }

    fn try_get_range_sync(&self, start: u64, len: u64) -> Option<Bytes> {
        self.lookup(start, len)
            .or_else(|| self.inner.try_get_range_sync(start, len))
    }
}
// Unit tests for the byte-source types in this module; async cases run under `#[tokio::test]`.
#[cfg(test)]
mod tests {
use super::*;
// `size()` and an in-bounds `range()` on the in-memory adapter.
#[tokio::test]
async fn bytes_lazy_source_size_and_range() {
let payload = Bytes::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
let src = BytesLazyByteSource::new(payload.clone());
assert_eq!(src.size(), payload.len() as u64);
let slice = src.range(2, 4).await.expect("range");
assert_eq!(slice.as_ref(), &payload[2..6]);
}
// An overrunning `range()` yields the typed `OutOfBounds` error, not a panic.
#[tokio::test]
async fn bytes_lazy_source_out_of_bounds_surfaces_typed_error() {
let src = BytesLazyByteSource::new(Bytes::from(vec![0u8; 4]));
let err = src
.range(2, 100)
.await
.expect_err("must reject out-of-bounds");
assert!(
matches!(err, LazyByteSourceError::OutOfBounds { .. }),
"expected OutOfBounds, got {err:?}"
);
}
// The sync fast path must return a slice aliasing the original buffer (no copy).
#[test]
fn bytes_lazy_source_try_get_range_sync_returns_zero_copy_slice() {
let payload = Bytes::from(vec![10u8, 20, 30, 40, 50, 60, 70, 80]);
let src = BytesLazyByteSource::new(payload.clone());
let got = src
.try_get_range_sync(2, 4)
.expect("in-bounds sync must succeed");
assert_eq!(got.as_ref(), &payload[2..6]);
// SAFETY: offset 2 is within the 8-byte `payload`, so the resulting pointer stays
// inside the same allocation; it is only compared, never dereferenced.
let expected_ptr = unsafe { payload.as_ptr().add(2) };
assert_eq!(got.as_ptr(), expected_ptr);
}
// Overrunning length and a start past the end both miss the sync path.
#[test]
fn bytes_lazy_source_try_get_range_sync_returns_none_out_of_bounds() {
let src = BytesLazyByteSource::new(Bytes::from(vec![0u8; 4]));
assert!(src.try_get_range_sync(2, 100).is_none());
assert!(src.try_get_range_sync(100, 0).is_none());
}
// A request inside an installed chunk is served by both the sync and async paths.
#[tokio::test]
async fn prefetched_source_serves_installed_range_zero_copy() {
let payload = Bytes::from(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let inner = Arc::new(BytesLazyByteSource::new(payload.clone()));
let mut overlay = PrefetchedSource::new(inner);
let installed = payload.slice(2..7);
overlay.install(2, installed.clone());
let got = overlay
.try_get_range_sync(3, 3)
.expect("installed range covers (3..6)");
assert_eq!(got.as_ref(), &payload[3..6]);
let async_got = overlay.range(3, 3).await.expect("async path also serves");
assert_eq!(async_got.as_ref(), &payload[3..6]);
}
// Window-relative offsets map to `offset + start` in the inner source.
#[tokio::test]
async fn lazy_sub_source_translates_offsets_and_reports_slice_size() {
let payload = Bytes::from((0u8..32).collect::<Vec<_>>());
let inner: Arc<dyn LazyByteSource> = Arc::new(BytesLazyByteSource::new(payload.clone()));
let sub = LazySubSource::new(Arc::clone(&inner), 8, 16);
assert_eq!(sub.size(), 16, "sub-source size must equal slice length");
let got = sub.range(0, 4).await.expect("range(0..4) in slice");
assert_eq!(got.as_ref(), &payload[8..12]);
let got = sub.range(12, 4).await.expect("range(12..16) in slice");
assert_eq!(got.as_ref(), &payload[20..24]);
let sync_got = sub.try_get_range_sync(2, 6).expect("sync in slice");
assert_eq!(sync_got.as_ref(), &payload[10..16]);
}
// Bounds errors from a sub-source report the window's size, not the inner source's.
#[tokio::test]
async fn lazy_sub_source_out_of_bounds_uses_slice_size_in_error() {
let payload = Bytes::from(vec![0u8; 32]);
let inner: Arc<dyn LazyByteSource> = Arc::new(BytesLazyByteSource::new(payload));
let sub = LazySubSource::new(Arc::clone(&inner), 8, 16);
let err = sub
.range(10, 10)
.await
.expect_err("10+10 overruns the 16-byte slice");
match err {
LazyByteSourceError::OutOfBounds { start, len, size } => {
assert_eq!(start, 10);
assert_eq!(len, 10);
assert_eq!(size, 16, "size must be the slice's, not the inner's");
}
other => panic!("expected OutOfBounds, got {other:?}"),
}
assert!(sub.try_get_range_sync(10, 10).is_none());
}
// Counts backend `range` calls to prove overlay hits bypass the backend and misses reach it once.
#[tokio::test]
async fn prefetched_source_overlay_hit_skips_underlying_range_call() {
use std::sync::atomic::{AtomicUsize, Ordering};
// Test double: delegates to an in-memory source but counts `range` calls and
// disables the sync fast path so every miss goes through `range`.
#[derive(Debug)]
struct CountingSource {
inner: BytesLazyByteSource,
range_calls: AtomicUsize,
}
#[async_trait]
impl LazyByteSource for CountingSource {
fn size(&self) -> u64 {
self.inner.size()
}
async fn range(&self, start: u64, len: u64) -> Result<Bytes, LazyByteSourceError> {
self.range_calls.fetch_add(1, Ordering::SeqCst);
self.inner.range(start, len).await
}
fn try_get_range_sync(&self, _: u64, _: u64) -> Option<Bytes> {
None
}
}
let payload = Bytes::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7]);
let counting = Arc::new(CountingSource {
inner: BytesLazyByteSource::new(payload.clone()),
range_calls: AtomicUsize::new(0),
});
let prefetched = counting.range(0, 4).await.expect("seed prefetch");
assert_eq!(counting.range_calls.load(Ordering::SeqCst), 1);
let mut overlay = PrefetchedSource::new(counting.clone());
overlay.install(0, prefetched);
let _ = overlay.range(1, 2).await.expect("overlay serves");
let _ = overlay.range(0, 4).await.expect("overlay serves");
assert_eq!(
counting.range_calls.load(Ordering::SeqCst),
1,
"overlay hits must not bump the underlying range counter"
);
let _ = overlay.range(4, 4).await.expect("miss falls through");
assert_eq!(
counting.range_calls.load(Ordering::SeqCst),
2,
"an overlay miss must reach the underlying source exactly once"
);
}
// Smoke test: every hand-written `Debug` impl names its type/variant.
#[test]
fn debug_impls_render_each_source_kind() {
let payload = Bytes::from(vec![0u8; 16]);
let in_mem = Source::InMemory(payload.clone());
assert!(format!("{in_mem:?}").contains("InMemory"));
let inner: Arc<dyn LazyByteSource> = Arc::new(BytesLazyByteSource::new(payload.clone()));
let lazy = Source::Lazy(Arc::clone(&inner));
assert!(format!("{lazy:?}").contains("Lazy"));
let sub = LazySubSource::new(Arc::clone(&inner), 4, 8);
assert!(format!("{sub:?}").contains("LazySubSource"));
let mut overlay = PrefetchedSource::new(Arc::clone(&inner));
overlay.install(0, payload.slice(0..4));
assert!(format!("{overlay:?}").contains("PrefetchedSource"));
}
// Exercises every `Source` fetch method on an in-memory source, including the
// out-of-bounds paths (which have no backend to fall back to).
#[tokio::test]
async fn source_enum_fetch_surface_and_bounds() {
let payload = Bytes::from((0u8..16).collect::<Vec<_>>());
let src = Source::InMemory(payload.clone());
assert_eq!(src.len(), 16);
assert_eq!(
src.try_get_range_sync(2..6)
.expect("in-bounds sync")
.as_ref(),
&payload[2..6]
);
assert_eq!(
src.get_range(0..4).expect("get_range").as_ref(),
&payload[0..4]
);
assert!(matches!(
src.get_range(8..100),
Err(LazyByteSourceError::OutOfBounds { .. })
));
assert!(src.get_ranges_parallel(&[]).expect("empty").is_empty());
let got = src.get_ranges_parallel(&[0..2, 4..8]).expect("multi-range");
assert_eq!(got.len(), 2);
assert!(matches!(
src.get_ranges_parallel(&[0..2, 8..100]),
Err(LazyByteSourceError::OutOfBounds { .. })
));
assert_eq!(
src.range_async(4..8).await.expect("range_async").as_ref(),
&payload[4..8]
);
assert!(matches!(
src.range_async(8..100).await,
Err(LazyByteSourceError::OutOfBounds { .. })
));
assert!(
src.get_ranges_parallel_async(&[])
.await
.expect("empty async")
.is_empty()
);
assert!(matches!(
src.get_ranges_parallel_async(&[0..2, 8..100]).await,
Err(LazyByteSourceError::OutOfBounds { .. })
));
}
}