libblobd_direct/op/
read_object.rs

1use super::OpError;
2use super::OpResult;
3use crate::ctx::Ctx;
4use crate::object::calc_object_layout;
5use crate::object::Object;
6use crate::object::ObjectState;
7use crate::util::ceil_pow2;
8use crate::util::div_pow2;
9use crate::util::floor_pow2;
10use crate::util::mod_pow2;
11use bufpool::buf::Buf;
12use futures::stream::empty;
13use futures::Stream;
14use off64::u32;
15use off64::u64;
16use off64::u8;
17use off64::usz;
18use std::cmp::max;
19use std::pin::Pin;
20use std::sync::atomic::Ordering::Relaxed;
21use std::sync::Arc;
22use tracing::trace;
23
/// Request parameters for reading all or part of an object's data.
pub struct OpReadObjectInput {
  /// Key of the object to read.
  pub key: Vec<u8>,
  /// If `Some`, the committed object stored under `key` must also have this ID; `None` accepts
  /// whichever committed object is stored under `key`. Only useful if versioning is enabled.
  pub id: Option<u128>,
  /// Offset of the first byte to read (inclusive).
  pub start: u64,
  /// Offset one past the last byte to read (exclusive); `None` reads to the end of the object.
  pub end: Option<u64>,
  /// Upper bound on the size of each chunk yielded by the output data stream.
  pub stream_buffer_size: u64,
}
33
34pub struct OpReadObjectOutput {
35  pub data_stream: Pin<Box<dyn Stream<Item = OpResult<Buf>> + Send>>,
36  pub start: u64,
37  pub end: u64,
38  pub object_size: u64,
39  pub object_id: u128,
40}
41
42/// Both `offset` and `len` do not have to be multiples of the spage size.
43async fn unaligned_read(ctx: &Ctx, offset: u64, len: u64) -> Buf {
44  let a_start = floor_pow2(offset, ctx.pages.spage_size_pow2);
45  let a_end = max(
46    ceil_pow2(offset + len, ctx.pages.spage_size_pow2),
47    ctx.pages.spage_size(),
48  );
49  let mut buf = ctx.device.read_at(a_start, a_end - a_start).await;
50  ctx
51    .metrics
52    .0
53    .read_op_bytes_discarded
54    .fetch_add((a_end - a_start) - len, Relaxed);
55  buf.copy_within(usz!(offset - a_start)..usz!(offset - a_start + len), 0);
56  buf.truncate(usz!(len));
57  buf
58}
59
60fn object_is_still_valid(obj: &Object) -> OpResult<()> {
61  if obj.get_state() == ObjectState::Committed {
62    Ok(())
63  } else {
64    Err(OpError::ObjectNotFound)
65  }
66}
67
68pub(crate) async fn op_read_object(
69  ctx: Arc<Ctx>,
70  req: OpReadObjectInput,
71) -> OpResult<OpReadObjectOutput> {
72  let Some(obj) = ctx
73    .committed_objects
74    .get(&req.key)
75    .filter(|o| req.id.is_none() || Some(o.id()) == req.id)
76    .map(|e| e.value().clone())
77  else {
78    return Err(OpError::ObjectNotFound);
79  };
80  let object_id = obj.id();
81  let object_size = obj.size;
82  let layout = calc_object_layout(&ctx.pages, object_size);
83  let lpage_count = layout.lpage_count;
84  let tail_page_count = layout.tail_page_sizes_pow2.len();
85
86  let start = req.start;
87  // Exclusive.
88  let end = req.end.unwrap_or(object_size);
89  if start > end || start > object_size || end > object_size {
90    return Err(OpError::RangeOutOfBounds);
91  };
92
93  ctx.metrics.0.read_op_count.fetch_add(1, Relaxed);
94  ctx
95    .metrics
96    .0
97    .read_op_bytes_requested
98    .fetch_add(end - start, Relaxed);
99
100  // Special handling for empty ranges. Note that we must handle this in case object has size of zero.
101  if start == end || start == object_size {
102    return Ok(OpReadObjectOutput {
103      data_stream: Box::pin(empty()),
104      start,
105      end,
106      object_size,
107      object_id,
108    });
109  }
110
111  let data_stream = async_stream::try_stream! {
112    // This is the lpage index (incremented every lpage) or tail page index (incremented every tail page **which differ in size**).
113    let mut idx = u32!(div_pow2(start, ctx.pages.lpage_size_pow2));
114    if idx >= lpage_count {
115      // We're starting inside the tail data, but that doesn't mean we're starting from the first tail page.
116      // WARNING: Convert values to u64 BEFORE multiplying.
117      let mut accum = u64!(idx) * ctx.pages.lpage_size();
118      for (_, sz_pow2) in layout.tail_page_sizes_pow2 {
119        accum += 1 << sz_pow2;
120        // This should be `>` not `>=`. For example, if lpage size is 16 MiB and first tail page is 8 MiB, and `start` is 24 MiB exactly, then it needs to start on the *second* tail page, not the first.
121        if accum > start {
122          break;
123        };
124        idx += 1;
125      };
126    };
127    let mut next = start;
128    while next < end {
129      let (page_dev_offset, page_size_pow2) = {
130        if idx < lpage_count {
131          let dev_offset = obj.lpage_dev_offsets[usz!(idx)];
132          let page_size_pow2 = ctx.pages.lpage_size_pow2;
133          (dev_offset, page_size_pow2)
134        } else {
135          let tail_idx = u8!(idx - lpage_count);
136          assert!(tail_idx < tail_page_count);
137          let dev_offset = obj.tail_page_dev_offsets[usz!(tail_idx)];
138          let page_size_pow2 = layout.tail_page_sizes_pow2.get(tail_idx).unwrap();
139          (dev_offset, page_size_pow2)
140        }
141      };
142      // The device offset of the current lpage or tail page changes each lpage amount, so this is not the same as `next`. Think of `next` as the virtual pointer within a contiguous span of the object's data bytes, and this as the physical offset within the physical page that backs the current position of the virtual pointer within the object's data made from many pages at random device locations of different sizes.
143      let offset_within_page = mod_pow2(next, page_size_pow2);
144      let rem_within_page = (1 << page_size_pow2) - offset_within_page;
145
146      // Can't read past current page, as we'll need to switch to a different page then.
147      let chunk_len = req.stream_buffer_size
148        .min(end - next)
149        .min(rem_within_page);
150      trace!(idx, page_size_pow2, page_dev_offset, offset_within_page, chunk_len, start, next, end, "reading chunk");
151      object_is_still_valid(&obj)?;
152      let data = unaligned_read(&ctx, page_dev_offset + offset_within_page, chunk_len).await;
153      assert_eq!(u64!(data.len()), chunk_len);
154      if chunk_len == rem_within_page {
155        idx += 1;
156      };
157      next += chunk_len;
158
159      // Check again before yielding; we may have read junk.
160      object_is_still_valid(&obj)?;
161      ctx.metrics.0.read_op_bytes_sent.fetch_add(chunk_len, Relaxed);
162      yield data;
163    };
164  };
165
166  Ok(OpReadObjectOutput {
167    data_stream: Box::pin(data_stream),
168    end,
169    object_id,
170    object_size,
171    start,
172  })
173}