libblobd_direct/op/read_object.rs

use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectState;
use crate::util::ceil_pow2;
use crate::util::div_pow2;
use crate::util::floor_pow2;
use crate::util::mod_pow2;
use bufpool::buf::Buf;
use futures::stream::empty;
use futures::Stream;
use off64::u32;
use off64::u64;
use off64::u8;
use off64::usz;
use std::cmp::max;
use std::pin::Pin;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tracing::trace;

pub struct OpReadObjectInput {
  pub key: TinyBuf,
  // Only useful if versioning is enabled.
  pub id: Option<u64>,
  pub start: u64,
  // Exclusive.
  pub end: Option<u64>,
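  // Upper bound on the size of each `Buf` yielded by the returned `data_stream`.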
  pub stream_buffer_size: u64,
}

pub struct OpReadObjectOutput {
  pub data_stream: Pin<Box<dyn Stream<Item = OpResult<Buf>> + Send>>,
  pub start: u64,
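  // Exclusive.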
  pub end: u64,
  pub object_size: u64,
  pub object_id: u64,
}

/// Neither `offset` nor `len` needs to be a multiple of the spage size.
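/// Reads the covering spage-aligned range from the device and trims the result to exactly the
/// requested bytes; the discarded excess is recorded in the `read_op_bytes_discarded` metric.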
async fn unaligned_read(ctx: &Ctx, offset: u64, len: u64) -> Buf {
  let a_start = floor_pow2(offset, ctx.pages.spage_size_pow2);
  let a_end = max(
    ceil_pow2(offset + len, ctx.pages.spage_size_pow2),
    ctx.pages.spage_size(),
  );
  let mut buf = ctx.device.read_at(a_start, a_end - a_start).await;
  ctx
    .metrics
    .0
    .read_op_bytes_discarded
    .fetch_add((a_end - a_start) - len, Relaxed);
  buf.copy_within(usz!(offset - a_start)..usz!(offset - a_start + len), 0);
  buf.truncate(usz!(len));
  buf
}

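/// The object's state may change (e.g. it may be deleted) while a read stream is in progress;
/// this is checked before and after each device read so that junk data is never yielded.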
fn object_is_still_valid(obj: &Object) -> OpResult<()> {
  if obj.get_state() == ObjectState::Committed {
    Ok(())
  } else {
    Err(OpError::ObjectNotFound)
  }
}

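/// Looks up a committed object by key (and optionally by ID) and returns a lazy stream over the
/// requested byte range. Chunks are read from the device only as the stream is polled.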
pub(crate) async fn op_read_object(
  ctx: Arc<Ctx>,
  req: OpReadObjectInput,
) -> OpResult<OpReadObjectOutput> {
  let Some(obj) = ctx
    .committed_objects
    .get(&req.key)
    .filter(|o| req.id.is_none() || Some(o.id()) == req.id)
    .map(|e| e.value().clone())
  else {
    return Err(OpError::ObjectNotFound);
  };
  let object_id = obj.id();
  let object_size = obj.size;
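  // The object's data is laid out as `lpage_count` full lpages followed by tail pages of varying
  // power-of-two sizes (see `calc_object_layout`).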
  let layout = calc_object_layout(&ctx.pages, object_size);
  let lpage_count = layout.lpage_count;
  let tail_page_count = layout.tail_page_sizes_pow2.len();

  let start = req.start;
  // Exclusive.
  let end = req.end.unwrap_or(object_size);
  if start > end || start > object_size || end > object_size {
    return Err(OpError::RangeOutOfBounds);
  };

  ctx.metrics.0.read_op_count.fetch_add(1, Relaxed);
  ctx
    .metrics
    .0
    .read_op_bytes_requested
    .fetch_add(end - start, Relaxed);

  // Special handling for empty ranges. Note that we must handle this case, as the object may have a size of zero.
  if start == end || start == object_size {
    return Ok(OpReadObjectOutput {
      data_stream: Box::pin(empty()),
      start,
      end,
      object_size,
      object_id,
    });
  }

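  // Build the stream lazily: each iteration reads at most one chunk (bounded by the stream buffer
  // size, the remaining range, and the end of the current page) and yields it.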
  let data_stream = async_stream::try_stream! {
    // This is the lpage index (incremented for every lpage) or the tail page index (incremented for every tail page, **which differ in size**).
    let mut idx = u32!(div_pow2(start, ctx.pages.lpage_size_pow2));
    if idx >= lpage_count {
      // We're starting inside the tail data, but that doesn't mean we're starting from the first tail page.
      // WARNING: Convert values to u64 BEFORE multiplying.
      let mut accum = u64!(idx) * ctx.pages.lpage_size();
      for (_, sz_pow2) in layout.tail_page_sizes_pow2 {
        accum += 1 << sz_pow2;
        // This should be `>` not `>=`. For example, if the lpage size is 16 MiB and the first tail page is 8 MiB, and `start` is exactly 24 MiB, then we need to start on the *second* tail page, not the first.
        if accum > start {
          break;
        };
        idx += 1;
      };
    };
    let mut next = start;
    while next < end {
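      // Resolve the device offset and size of the lpage or tail page that backs `next`.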
      let (page_dev_offset, page_size_pow2) = {
        if idx < lpage_count {
          let dev_offset = obj.lpage_dev_offsets[usz!(idx)];
          let page_size_pow2 = ctx.pages.lpage_size_pow2;
          (dev_offset, page_size_pow2)
        } else {
          let tail_idx = u8!(idx - lpage_count);
          assert!(tail_idx < tail_page_count);
          let dev_offset = obj.tail_page_dev_offsets[usz!(tail_idx)];
          let page_size_pow2 = layout.tail_page_sizes_pow2.get(tail_idx).unwrap();
          (dev_offset, page_size_pow2)
        }
      };
      // The device offset of the current lpage or tail page changes with every page, so this is not the same as `next`. Think of `next` as a virtual pointer within the contiguous span of the object's data bytes, and this as the physical offset within the page that currently backs that virtual pointer; the object's data is made up of many pages of different sizes at arbitrary device locations.
      let offset_within_page = mod_pow2(next, page_size_pow2);
      let rem_within_page = (1 << page_size_pow2) - offset_within_page;

      // We can't read past the current page, as we'd need to switch to a different page.
      let chunk_len = req.stream_buffer_size
        .min(end - next)
        .min(rem_within_page);
      trace!(idx, page_size_pow2, page_dev_offset, offset_within_page, chunk_len, start, next, end, "reading chunk");
      object_is_still_valid(&obj)?;
      let data = unaligned_read(&ctx, page_dev_offset + offset_within_page, chunk_len).await;
      assert_eq!(u64!(data.len()), chunk_len);
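      // If this chunk reached the end of the current page, advance to the next page.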
      if chunk_len == rem_within_page {
        idx += 1;
      };
      next += chunk_len;

      // Check again before yielding; we may have read junk.
      object_is_still_valid(&obj)?;
      ctx.metrics.0.read_op_bytes_sent.fetch_add(chunk_len, Relaxed);
      yield data;
    };
  };

  Ok(OpReadObjectOutput {
    data_stream: Box::pin(data_stream),
    end,
    object_id,
    object_size,
    start,
  })
}