libblobd_lite/op/read_object.rs

use super::OpError;
use super::OpResult;
use crate::bucket::FoundObject;
use crate::ctx::Ctx;
use crate::object::calc_object_layout;
use crate::object::OBJECT_OFF;
use crate::op::key_debug_str;
use crate::page::ObjectPageHeader;
use crate::page::ObjectState;
use crate::util::div_pow2;
use crate::util::mod_pow2;
use futures::Stream;
use off64::int::Off64AsyncReadInt;
use off64::u16;
use off64::u8;
use std::cmp::min;
use std::pin::Pin;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tokio::time::Instant;
use tracing::trace;

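/// Parameters for a ranged object read. `start` and `end` are byte offsets into the
/// object's data; `end` is exclusive and defaults to the object's size.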
pub struct OpReadObjectInput {
  pub key: TinyBuf,
  // Only useful if versioning is enabled.
  pub id: Option<u64>,
  pub start: u64,
  // Exclusive.
  pub end: Option<u64>,
  pub stream_buffer_size: u64,
}

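/// The resolved read range and object metadata, plus a lazy stream that yields the data
/// in chunks that never cross a page boundary.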
pub struct OpReadObjectOutput {
  pub data_stream: Pin<Box<dyn Stream<Item = OpResult<TinyBuf>> + Send>>,
  pub start: u64,
  pub end: u64,
  pub object_size: u64,
  pub object_id: u64,
}

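/// Looks up the object for `key` (and `id`, if versioning is enabled), validates the
/// requested range, and returns a stream over `[start, end)`. While streaming, the
/// object's page header is re-read roughly once a minute to verify it is still committed.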
pub(crate) async fn op_read_object(
  ctx: Arc<Ctx>,
  req: OpReadObjectInput,
) -> OpResult<OpReadObjectOutput> {
  trace!(
    key = key_debug_str(&req.key),
    start = req.start,
    end = req.end,
    "reading object"
  );

  // WARNING: Drop bucket lock immediately.
  let Some(FoundObject {
    dev_offset: object_dev_offset,
    id: object_id,
    size: object_size,
    ..
  }) = ctx.buckets.get_bucket_for_key(&req.key).await.find_object(req.id).await
  else {
    return Err(OpError::ObjectNotFound);
  };
  let start = req.start;
  // Exclusive.
  let end = req.end.unwrap_or(object_size);
  // Note: disallow empty ranges.
  if start >= end || start >= object_size || end > object_size {
    return Err(OpError::RangeOutOfBounds);
  };
  trace!(
    key = key_debug_str(&req.key),
    object_dev_offset,
    object_id,
    object_size,
    start,
    end,
    "found object to read"
  );

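  // Recompute the object's layout from its size so that the offsets of the lpage and tail
  // page pointers within the object's on-device metadata can be derived.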
  let alloc_cfg = calc_object_layout(&ctx.pages, object_size);
  let off = OBJECT_OFF
    .with_key_len(u16!(req.key.len()))
    .with_lpages(alloc_cfg.lpage_count)
    .with_tail_pages(alloc_cfg.tail_page_sizes_pow2.len());

  let data_stream = async_stream::try_stream! {
    // This is the lpage index (incremented every lpage) or the tail page index (incremented every tail page; tail pages **differ in size**).
    let mut idx = div_pow2(start, ctx.pages.lpage_size_pow2);
    if idx >= alloc_cfg.lpage_count {
      // We're starting inside the tail data, but that doesn't mean we're starting from the first tail page.
      // All lpages precede the tail data, so accumulation starts at the byte offset where the tail pages begin.
      let mut accum = alloc_cfg.lpage_count << ctx.pages.lpage_size_pow2;
      for (_, sz_pow2) in alloc_cfg.tail_page_sizes_pow2 {
        accum += 1 << sz_pow2;
        // This should be `>` not `>=`. For example, if lpage size is 16 MiB and first tail page is 8 MiB, and `start` is 24 MiB exactly, then it needs to start on the *second* tail page, not the first.
        if accum > start {
          break;
        };
        idx += 1;
      };
    };
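    // `next` is the offset (within the object's contiguous data bytes) of the next byte to
    // read; `last_checked_valid` throttles the periodic revalidation below.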
    let mut next = start;
    let mut last_checked_valid = Instant::now();
    while next < end {
      let now = Instant::now();
      if now.duration_since(last_checked_valid).as_secs() >= 60 {
        // Check that object is still valid.
        let hdr = ctx
          .pages
          .read_page_header::<ObjectPageHeader>(object_dev_offset)
          .await;
        // We can't use `return Err(...)` in `try_stream!`.
        if hdr.state == ObjectState::Committed {
          Ok(())
        } else {
          Err(OpError::ObjectNotFound)
        }?;
        last_checked_valid = now;
      }
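      // Resolve the device offset and size of the page backing `next`. The object's metadata
      // lists the lpage device offsets first, followed by the tail page device offsets.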
      let (page_dev_offset, page_size_pow2) = if idx < alloc_cfg.lpage_count {
        let dev_offset = ctx.device.read_u48_be_at(object_dev_offset + off.lpage(idx)).await;
        let page_size_pow2 = ctx.pages.lpage_size_pow2;
        (dev_offset, page_size_pow2)
      } else {
        let tail_idx = u8!(idx - alloc_cfg.lpage_count);
        debug_assert!(tail_idx < alloc_cfg.tail_page_sizes_pow2.len());
        let dev_offset = ctx.device.read_u48_be_at(object_dev_offset + off.tail_page(tail_idx)).await;
        let page_size_pow2 = alloc_cfg.tail_page_sizes_pow2.get(tail_idx).unwrap();
        (dev_offset, page_size_pow2)
      };
      // `next` cannot be used directly as a device offset: it is a virtual pointer into the
      // object's contiguous data bytes, whereas the data is physically backed by many pages of
      // different sizes. `offset_within_page` is where that virtual position falls within the
      // page resolved above.
      let offset_within_page = mod_pow2(next, page_size_pow2);

      // Can't read past current page, as we'll need to switch to a different page then.
      // TODO We could read in smaller amounts instead, to avoid higher memory usage from buffering.
      let chunk_len = min(
        end - next,
        (1 << page_size_pow2) - offset_within_page,
      );
      trace!(
        idx,
        page_size_pow2,
        page_dev_offset,
        offset_within_page,
        chunk_len,
        start,
        next,
        end,
        "reading chunk"
      );
      let data = ctx.device.read_at(page_dev_offset + offset_within_page, chunk_len).await;
      idx += 1;
      next += chunk_len;
      yield data;
    };
  };

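  // Note that the stream is lazy: no device reads happen until the caller polls `data_stream`.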
  Ok(OpReadObjectOutput {
    data_stream: Box::pin(data_stream),
    end,
    object_id,
    object_size,
    start,
  })
}