libblobd_direct/op/
read_object.rs1use super::OpError;
2use super::OpResult;
3use crate::ctx::Ctx;
4use crate::object::calc_object_layout;
5use crate::object::Object;
6use crate::object::ObjectState;
7use crate::util::ceil_pow2;
8use crate::util::div_pow2;
9use crate::util::floor_pow2;
10use crate::util::mod_pow2;
11use bufpool::buf::Buf;
12use futures::stream::empty;
13use futures::Stream;
14use off64::u32;
15use off64::u64;
16use off64::u8;
17use off64::usz;
18use std::cmp::max;
19use std::pin::Pin;
20use std::sync::atomic::Ordering::Relaxed;
21use std::sync::Arc;
22use tracing::trace;
23
/// Request parameters for reading a byte range out of a committed object.
pub struct OpReadObjectInput {
    /// Key under which the object is looked up among the committed objects.
    pub key: Vec<u8>,
    /// When `Some`, the object found under `key` must also have this ID, otherwise the read
    /// fails as if the object did not exist. `None` accepts whichever object is stored under
    /// `key`.
    pub id: Option<u128>,
    /// Offset, relative to the start of the object, of the first byte to read.
    pub start: u64,
    /// Exclusive end offset of the range to read. `None` means "up to the object's size".
    pub end: Option<u64>,
    /// Upper bound on the length of each buffer yielded by the resulting data stream.
    pub stream_buffer_size: u64,
}
33
34pub struct OpReadObjectOutput {
35 pub data_stream: Pin<Box<dyn Stream<Item = OpResult<Buf>> + Send>>,
36 pub start: u64,
37 pub end: u64,
38 pub object_size: u64,
39 pub object_id: u128,
40}
41
42async fn unaligned_read(ctx: &Ctx, offset: u64, len: u64) -> Buf {
44 let a_start = floor_pow2(offset, ctx.pages.spage_size_pow2);
45 let a_end = max(
46 ceil_pow2(offset + len, ctx.pages.spage_size_pow2),
47 ctx.pages.spage_size(),
48 );
49 let mut buf = ctx.device.read_at(a_start, a_end - a_start).await;
50 ctx
51 .metrics
52 .0
53 .read_op_bytes_discarded
54 .fetch_add((a_end - a_start) - len, Relaxed);
55 buf.copy_within(usz!(offset - a_start)..usz!(offset - a_start + len), 0);
56 buf.truncate(usz!(len));
57 buf
58}
59
60fn object_is_still_valid(obj: &Object) -> OpResult<()> {
61 if obj.get_state() == ObjectState::Committed {
62 Ok(())
63 } else {
64 Err(OpError::ObjectNotFound)
65 }
66}
67
68pub(crate) async fn op_read_object(
69 ctx: Arc<Ctx>,
70 req: OpReadObjectInput,
71) -> OpResult<OpReadObjectOutput> {
72 let Some(obj) = ctx
73 .committed_objects
74 .get(&req.key)
75 .filter(|o| req.id.is_none() || Some(o.id()) == req.id)
76 .map(|e| e.value().clone())
77 else {
78 return Err(OpError::ObjectNotFound);
79 };
80 let object_id = obj.id();
81 let object_size = obj.size;
82 let layout = calc_object_layout(&ctx.pages, object_size);
83 let lpage_count = layout.lpage_count;
84 let tail_page_count = layout.tail_page_sizes_pow2.len();
85
86 let start = req.start;
87 let end = req.end.unwrap_or(object_size);
89 if start > end || start > object_size || end > object_size {
90 return Err(OpError::RangeOutOfBounds);
91 };
92
93 ctx.metrics.0.read_op_count.fetch_add(1, Relaxed);
94 ctx
95 .metrics
96 .0
97 .read_op_bytes_requested
98 .fetch_add(end - start, Relaxed);
99
100 if start == end || start == object_size {
102 return Ok(OpReadObjectOutput {
103 data_stream: Box::pin(empty()),
104 start,
105 end,
106 object_size,
107 object_id,
108 });
109 }
110
111 let data_stream = async_stream::try_stream! {
112 let mut idx = u32!(div_pow2(start, ctx.pages.lpage_size_pow2));
114 if idx >= lpage_count {
115 let mut accum = u64!(idx) * ctx.pages.lpage_size();
118 for (_, sz_pow2) in layout.tail_page_sizes_pow2 {
119 accum += 1 << sz_pow2;
120 if accum > start {
122 break;
123 };
124 idx += 1;
125 };
126 };
127 let mut next = start;
128 while next < end {
129 let (page_dev_offset, page_size_pow2) = {
130 if idx < lpage_count {
131 let dev_offset = obj.lpage_dev_offsets[usz!(idx)];
132 let page_size_pow2 = ctx.pages.lpage_size_pow2;
133 (dev_offset, page_size_pow2)
134 } else {
135 let tail_idx = u8!(idx - lpage_count);
136 assert!(tail_idx < tail_page_count);
137 let dev_offset = obj.tail_page_dev_offsets[usz!(tail_idx)];
138 let page_size_pow2 = layout.tail_page_sizes_pow2.get(tail_idx).unwrap();
139 (dev_offset, page_size_pow2)
140 }
141 };
142 let offset_within_page = mod_pow2(next, page_size_pow2);
144 let rem_within_page = (1 << page_size_pow2) - offset_within_page;
145
146 let chunk_len = req.stream_buffer_size
148 .min(end - next)
149 .min(rem_within_page);
150 trace!(idx, page_size_pow2, page_dev_offset, offset_within_page, chunk_len, start, next, end, "reading chunk");
151 object_is_still_valid(&obj)?;
152 let data = unaligned_read(&ctx, page_dev_offset + offset_within_page, chunk_len).await;
153 assert_eq!(u64!(data.len()), chunk_len);
154 if chunk_len == rem_within_page {
155 idx += 1;
156 };
157 next += chunk_len;
158
159 object_is_still_valid(&obj)?;
161 ctx.metrics.0.read_op_bytes_sent.fetch_add(chunk_len, Relaxed);
162 yield data;
163 };
164 };
165
166 Ok(OpReadObjectOutput {
167 data_stream: Box::pin(data_stream),
168 end,
169 object_id,
170 object_size,
171 start,
172 })
173}