use super::OpError;
use super::OpResult;
use crate::bucket::FoundObject;
use crate::ctx::Ctx;
use crate::object::calc_object_layout;
use crate::object::OBJECT_OFF;
use crate::op::key_debug_str;
use crate::page::ObjectPageHeader;
use crate::page::ObjectState;
use crate::util::div_pow2;
use crate::util::mod_pow2;
use futures::Stream;
use off64::int::Off64AsyncReadInt;
use off64::u16;
use off64::u8;
use std::cmp::min;
use std::pin::Pin;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tokio::time::Instant;
use tracing::trace;
pub struct OpReadObjectInput {
pub key: TinyBuf,
pub id: Option<u64>,
pub start: u64,
pub end: Option<u64>,
pub stream_buffer_size: u64,
}
pub struct OpReadObjectOutput {
pub data_stream: Pin<Box<dyn Stream<Item = OpResult<TinyBuf>> + Send>>,
pub start: u64,
pub end: u64,
pub object_size: u64,
pub object_id: u64,
}
pub(crate) async fn op_read_object(
ctx: Arc<Ctx>,
req: OpReadObjectInput,
) -> OpResult<OpReadObjectOutput> {
trace!(
key = key_debug_str(&req.key),
start = req.start,
end = req.end,
"reading object"
);
let Some(FoundObject { dev_offset: object_dev_offset, id: object_id, size: object_size, .. }) = ctx.buckets.get_bucket_for_key(&req.key).await.find_object(req.id).await else {
return Err(OpError::ObjectNotFound);
};
let start = req.start;
let end = req.end.unwrap_or(object_size);
if start >= end || start >= object_size || end > object_size {
return Err(OpError::RangeOutOfBounds);
};
trace!(
key = key_debug_str(&req.key),
object_dev_offset,
object_id,
object_size,
start,
end,
"found object to read"
);
let alloc_cfg = calc_object_layout(&ctx.pages, object_size);
let off = OBJECT_OFF
.with_key_len(u16!(req.key.len()))
.with_lpages(alloc_cfg.lpage_count)
.with_tail_pages(alloc_cfg.tail_page_sizes_pow2.len());
let data_stream = async_stream::try_stream! {
let mut idx = div_pow2(start, ctx.pages.lpage_size_pow2);
if idx >= alloc_cfg.lpage_count {
let mut accum = idx * alloc_cfg.lpage_count;
for (_, sz_pow2) in alloc_cfg.tail_page_sizes_pow2 {
accum += 1 << sz_pow2;
if accum > start {
break;
};
idx += 1;
};
};
let mut next = start;
let mut last_checked_valid = Instant::now();
while next < end {
let now = Instant::now();
if now.duration_since(last_checked_valid).as_secs() >= 60 {
let hdr = ctx
.pages
.read_page_header::<ObjectPageHeader>(object_dev_offset)
.await;
if hdr.state == ObjectState::Committed {
Ok(())
} else {
Err(OpError::ObjectNotFound)
}?;
last_checked_valid = now;
}
let (page_dev_offset, page_size_pow2) = if idx < alloc_cfg.lpage_count {
let dev_offset = ctx.device.read_u48_be_at(object_dev_offset + off.lpage(idx)).await;
let page_size_pow2 = ctx.pages.lpage_size_pow2;
(dev_offset, page_size_pow2)
} else {
let tail_idx = u8!(idx - alloc_cfg.lpage_count);
debug_assert!(tail_idx < alloc_cfg.tail_page_sizes_pow2.len());
let dev_offset = ctx.device.read_u48_be_at(object_dev_offset + off.tail_page(tail_idx)).await;
let page_size_pow2 = alloc_cfg.tail_page_sizes_pow2.get(tail_idx).unwrap();
(dev_offset, page_size_pow2)
};
let offset_within_page = mod_pow2(next, page_size_pow2);
let chunk_len = min(
end - next,
(1 << page_size_pow2) - offset_within_page,
);
trace!(idx, page_size_pow2, page_dev_offset, offset_within_page, chunk_len, start, next, end, "reading chunk");
let data = ctx.device.read_at(page_dev_offset + offset_within_page, chunk_len).await;
idx += 1;
next += chunk_len;
yield data;
};
};
Ok(OpReadObjectOutput {
data_stream: Box::pin(data_stream),
end,
object_id,
object_size,
start,
})
}