libblobd_lite/op/
read_object.rs1use super::OpError;
2use super::OpResult;
3use crate::bucket::FoundObject;
4use crate::ctx::Ctx;
5use crate::object::calc_object_layout;
6use crate::object::OBJECT_OFF;
7use crate::op::key_debug_str;
8use crate::page::ObjectPageHeader;
9use crate::page::ObjectState;
10use crate::util::div_pow2;
11use crate::util::mod_pow2;
12use futures::Stream;
13use off64::int::Off64AsyncReadInt;
14use off64::u16;
15use off64::u8;
16use std::cmp::min;
17use std::pin::Pin;
18use std::sync::Arc;
19use tinybuf::TinyBuf;
20use tokio::time::Instant;
21use tracing::trace;
22
23pub struct OpReadObjectInput {
24 pub key: TinyBuf,
25 pub id: Option<u64>,
27 pub start: u64,
28 pub end: Option<u64>,
30 pub stream_buffer_size: u64,
31}
32
33pub struct OpReadObjectOutput {
34 pub data_stream: Pin<Box<dyn Stream<Item = OpResult<TinyBuf>> + Send>>,
35 pub start: u64,
36 pub end: u64,
37 pub object_size: u64,
38 pub object_id: u64,
39}
40
41pub(crate) async fn op_read_object(
42 ctx: Arc<Ctx>,
43 req: OpReadObjectInput,
44) -> OpResult<OpReadObjectOutput> {
45 trace!(
46 key = key_debug_str(&req.key),
47 start = req.start,
48 end = req.end,
49 "reading object"
50 );
51
52 let Some(FoundObject { dev_offset: object_dev_offset, id: object_id, size: object_size, .. }) = ctx.buckets.get_bucket_for_key(&req.key).await.find_object(req.id).await else {
54 return Err(OpError::ObjectNotFound);
55 };
56 let start = req.start;
57 let end = req.end.unwrap_or(object_size);
59 if start >= end || start >= object_size || end > object_size {
61 return Err(OpError::RangeOutOfBounds);
62 };
63 trace!(
64 key = key_debug_str(&req.key),
65 object_dev_offset,
66 object_id,
67 object_size,
68 start,
69 end,
70 "found object to read"
71 );
72
73 let alloc_cfg = calc_object_layout(&ctx.pages, object_size);
74 let off = OBJECT_OFF
75 .with_key_len(u16!(req.key.len()))
76 .with_lpages(alloc_cfg.lpage_count)
77 .with_tail_pages(alloc_cfg.tail_page_sizes_pow2.len());
78
79 let data_stream = async_stream::try_stream! {
80 let mut idx = div_pow2(start, ctx.pages.lpage_size_pow2);
82 if idx >= alloc_cfg.lpage_count {
83 let mut accum = idx * alloc_cfg.lpage_count;
85 for (_, sz_pow2) in alloc_cfg.tail_page_sizes_pow2 {
86 accum += 1 << sz_pow2;
87 if accum > start {
89 break;
90 };
91 idx += 1;
92 };
93 };
94 let mut next = start;
95 let mut last_checked_valid = Instant::now();
96 while next < end {
97 let now = Instant::now();
98 if now.duration_since(last_checked_valid).as_secs() >= 60 {
99 let hdr = ctx
101 .pages
102 .read_page_header::<ObjectPageHeader>(object_dev_offset)
103 .await;
104 if hdr.state == ObjectState::Committed {
106 Ok(())
107 } else {
108 Err(OpError::ObjectNotFound)
109 }?;
110 last_checked_valid = now;
111 }
112 let (page_dev_offset, page_size_pow2) = if idx < alloc_cfg.lpage_count {
113 let dev_offset = ctx.device.read_u48_be_at(object_dev_offset + off.lpage(idx)).await;
114 let page_size_pow2 = ctx.pages.lpage_size_pow2;
115 (dev_offset, page_size_pow2)
116 } else {
117 let tail_idx = u8!(idx - alloc_cfg.lpage_count);
118 debug_assert!(tail_idx < alloc_cfg.tail_page_sizes_pow2.len());
119 let dev_offset = ctx.device.read_u48_be_at(object_dev_offset + off.tail_page(tail_idx)).await;
120 let page_size_pow2 = alloc_cfg.tail_page_sizes_pow2.get(tail_idx).unwrap();
121 (dev_offset, page_size_pow2)
122 };
123 let offset_within_page = mod_pow2(next, page_size_pow2);
125
126 let chunk_len = min(
129 end - next,
130 (1 << page_size_pow2) - offset_within_page,
131 );
132 trace!(idx, page_size_pow2, page_dev_offset, offset_within_page, chunk_len, start, next, end, "reading chunk");
133 let data = ctx.device.read_at(page_dev_offset + offset_within_page, chunk_len).await;
134 idx += 1;
135 next += chunk_len;
136 yield data;
137 };
138 };
139
140 Ok(OpReadObjectOutput {
141 data_stream: Box::pin(data_stream),
142 end,
143 object_id,
144 object_size,
145 start,
146 })
147}