libblobd_direct/op/
read_object.rs1use super::OpError;
2use super::OpResult;
3use crate::ctx::Ctx;
4use crate::object::calc_object_layout;
5use crate::object::Object;
6use crate::object::ObjectState;
7use crate::util::ceil_pow2;
8use crate::util::div_pow2;
9use crate::util::floor_pow2;
10use crate::util::mod_pow2;
11use bufpool::buf::Buf;
12use futures::stream::empty;
13use futures::Stream;
14use off64::u32;
15use off64::u64;
16use off64::u8;
17use off64::usz;
18use std::cmp::max;
19use std::pin::Pin;
20use std::sync::atomic::Ordering::Relaxed;
21use std::sync::Arc;
22use tinybuf::TinyBuf;
23use tracing::trace;
24
25pub struct OpReadObjectInput {
26 pub key: TinyBuf,
27 pub id: Option<u64>,
29 pub start: u64,
30 pub end: Option<u64>,
32 pub stream_buffer_size: u64,
33}
34
35pub struct OpReadObjectOutput {
36 pub data_stream: Pin<Box<dyn Stream<Item = OpResult<Buf>> + Send>>,
37 pub start: u64,
38 pub end: u64,
39 pub object_size: u64,
40 pub object_id: u64,
41}
42
43async fn unaligned_read(ctx: &Ctx, offset: u64, len: u64) -> Buf {
45 let a_start = floor_pow2(offset, ctx.pages.spage_size_pow2);
46 let a_end = max(
47 ceil_pow2(offset + len, ctx.pages.spage_size_pow2),
48 ctx.pages.spage_size(),
49 );
50 let mut buf = ctx.device.read_at(a_start, a_end - a_start).await;
51 ctx
52 .metrics
53 .0
54 .read_op_bytes_discarded
55 .fetch_add((a_end - a_start) - len, Relaxed);
56 buf.copy_within(usz!(offset - a_start)..usz!(offset - a_start + len), 0);
57 buf.truncate(usz!(len));
58 buf
59}
60
61fn object_is_still_valid(obj: &Object) -> OpResult<()> {
62 if obj.get_state() == ObjectState::Committed {
63 Ok(())
64 } else {
65 Err(OpError::ObjectNotFound)
66 }
67}
68
69pub(crate) async fn op_read_object(
70 ctx: Arc<Ctx>,
71 req: OpReadObjectInput,
72) -> OpResult<OpReadObjectOutput> {
73 let Some(obj) = ctx.committed_objects.get(&req.key).filter(|o| req.id.is_none() || Some(o.id()) == req.id).map(|e| e.value().clone()) else {
74 return Err(OpError::ObjectNotFound);
75 };
76 let object_id = obj.id();
77 let object_size = obj.size;
78 let layout = calc_object_layout(&ctx.pages, object_size);
79 let lpage_count = layout.lpage_count;
80 let tail_page_count = layout.tail_page_sizes_pow2.len();
81
82 let start = req.start;
83 let end = req.end.unwrap_or(object_size);
85 if start > end || start > object_size || end > object_size {
86 return Err(OpError::RangeOutOfBounds);
87 };
88
89 ctx.metrics.0.read_op_count.fetch_add(1, Relaxed);
90 ctx
91 .metrics
92 .0
93 .read_op_bytes_requested
94 .fetch_add(end - start, Relaxed);
95
96 if start == end || start == object_size {
98 return Ok(OpReadObjectOutput {
99 data_stream: Box::pin(empty()),
100 start,
101 end,
102 object_size,
103 object_id,
104 });
105 }
106
107 let data_stream = async_stream::try_stream! {
108 let mut idx = u32!(div_pow2(start, ctx.pages.lpage_size_pow2));
110 if idx >= lpage_count {
111 let mut accum = u64!(idx) * ctx.pages.lpage_size();
114 for (_, sz_pow2) in layout.tail_page_sizes_pow2 {
115 accum += 1 << sz_pow2;
116 if accum > start {
118 break;
119 };
120 idx += 1;
121 };
122 };
123 let mut next = start;
124 while next < end {
125 let (page_dev_offset, page_size_pow2) = {
126 if idx < lpage_count {
127 let dev_offset = obj.lpage_dev_offsets[usz!(idx)];
128 let page_size_pow2 = ctx.pages.lpage_size_pow2;
129 (dev_offset, page_size_pow2)
130 } else {
131 let tail_idx = u8!(idx - lpage_count);
132 assert!(tail_idx < tail_page_count);
133 let dev_offset = obj.tail_page_dev_offsets[usz!(tail_idx)];
134 let page_size_pow2 = layout.tail_page_sizes_pow2.get(tail_idx).unwrap();
135 (dev_offset, page_size_pow2)
136 }
137 };
138 let offset_within_page = mod_pow2(next, page_size_pow2);
140 let rem_within_page = (1 << page_size_pow2) - offset_within_page;
141
142 let chunk_len = req.stream_buffer_size
144 .min(end - next)
145 .min(rem_within_page);
146 trace!(idx, page_size_pow2, page_dev_offset, offset_within_page, chunk_len, start, next, end, "reading chunk");
147 object_is_still_valid(&obj)?;
148 let data = unaligned_read(&ctx, page_dev_offset + offset_within_page, chunk_len).await;
149 assert_eq!(u64!(data.len()), chunk_len);
150 if chunk_len == rem_within_page {
151 idx += 1;
152 };
153 next += chunk_len;
154
155 object_is_still_valid(&obj)?;
157 ctx.metrics.0.read_op_bytes_sent.fetch_add(chunk_len, Relaxed);
158 yield data;
159 };
160 };
161
162 Ok(OpReadObjectOutput {
163 data_stream: Box::pin(data_stream),
164 end,
165 object_id,
166 object_size,
167 start,
168 })
169}