libblobd_lite/op/write_object.rs
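//! Handler for the `write_object` operation: streams caller-provided data
//! into the device pages backing an incomplete object.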

use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::calc_object_layout;
use crate::object::ObjectLayout;
use crate::object::OBJECT_OFF;
use crate::page::ObjectPageHeader;
use crate::page::ObjectState;
use crate::util::div_pow2;
use crate::util::is_multiple_of_pow2;
use futures::Stream;
use futures::StreamExt;
use itertools::Itertools;
use off64::int::Off64AsyncReadInt;
use off64::int::Off64ReadInt;
use off64::u64;
use off64::usz;
use std::cmp::min;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use tracing::trace;
use tracing::warn;

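/// Request to write one contiguous, lpage-aligned range of an incomplete
/// object's data. `offset` must be a multiple of the lpage size, and the
/// write must exactly fill the remainder of its lpage or the object's tail
/// (see the validation in `op_write_object`).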
pub struct OpWriteObjectInput<
  D: AsRef<[u8]>,
  S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
> {
  pub offset: u64,
  pub incomplete_token: IncompleteToken,
  pub data_len: u64,
  pub data_stream: S,
}

pub struct OpWriteObjectOutput {}

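/// Streams `data_len` bytes from `data_stream` into the pages backing an
/// incomplete object, re-verifying throughout that the object has not been
/// reaped or committed.
///
/// A minimal usage sketch; `ctx` and `token` are assumed to come from the
/// surrounding crate and are not constructed here:
///
/// ```ignore
/// let data: Vec<u8> = b"hello".to_vec();
/// let req = OpWriteObjectInput {
///   offset: 0,
///   incomplete_token: token,
///   data_len: u64!(data.len()),
///   data_stream: futures::stream::iter([Ok::<_, Box<dyn Error + Send + Sync>>(data)]),
/// };
/// op_write_object(ctx.clone(), req).await?;
/// ```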
pub(crate) async fn op_write_object<
  D: AsRef<[u8]>,
  S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
>(
  ctx: Arc<Ctx>,
  mut req: OpWriteObjectInput<D, S>,
) -> OpResult<OpWriteObjectOutput> {
  let len = req.data_len;
  let object_dev_offset = req.incomplete_token.object_dev_offset;
  trace!(
    dev_offset = object_dev_offset,
    offset = req.offset,
    length = req.data_len,
    "writing object"
  );

  // See IncompleteToken for why, if the token has not expired, the object
  // definitely still exists (i.e. it's safe to read any of its metadata).
  if req
    .incomplete_token
    .has_expired(ctx.reap_objects_after_secs)
  {
    return Err(OpError::ObjectNotFound);
  };

  let incomplete_object_is_still_valid = || async {
    // Our incomplete reaper deletes expired incomplete objects instead of
    // reaping them directly, which avoids some clock drift issues, so we only
    // need to check the object's state here, not whether it has expired based
    // on its creation time. This is always correct: if the page still exists,
    // it's definitely still the same object, as we check well before any
    // deleted object would be reaped.
    let hdr = ctx
      .pages
      .read_page_header::<ObjectPageHeader>(object_dev_offset)
      .await;
    hdr.state == ObjectState::Incomplete
  };

  if !incomplete_object_is_still_valid().await {
    return Err(OpError::ObjectNotFound);
  };

  if !is_multiple_of_pow2(req.offset, ctx.pages.lpage_size_pow2) {
    // The write offset must be lpage-aligned.
    return Err(OpError::UnalignedWrite);
  };
  if len > ctx.pages.lpage_size() {
    // Cannot write more than one lpage (tile) worth of data in one request.
    return Err(OpError::InexactWriteLength);
  };

  // Read the fields before `key`, i.e. `id`, `size`, `key_len`.
  let raw = ctx
    .device
    .read_at(object_dev_offset, OBJECT_OFF.key())
    .await;
  let object_id = raw.read_u64_be_at(OBJECT_OFF.id());
  let size = raw.read_u40_be_at(OBJECT_OFF.size());
  let key_len = raw.read_u16_be_at(OBJECT_OFF.key_len());
  trace!(
    object_id,
    object_dev_offset,
    size,
    "found object to write to"
  );

  if req.offset + len > size {
    // The write would extend past the end of the object.
    return Err(OpError::RangeOutOfBounds);
  };

  if req.offset + len != min(req.offset + ctx.pages.lpage_size(), size) {
    // The write does not fully fill its lpage or the entire tail. Every write
    // must completely fill its range, as otherwise uninitialised data would
    // get exposed.
    return Err(OpError::InexactWriteLength);
  };
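  // Together, the checks above mean a write must cover exactly
  // [offset, min(offset + lpage_size, size)). For illustration (sizes assumed
  // here): with a 16 MiB lpage and a 20 MiB object, the only valid writes are
  // [0 MiB, 16 MiB) and [16 MiB, 20 MiB).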

  // Determine how the object's data is split into lpages and tail pages, and
  // locate the arrays of page device offsets within the object's metadata.
  let ObjectLayout {
    lpage_count,
    tail_page_sizes_pow2,
  } = calc_object_layout(&ctx.pages, size);
  let off = OBJECT_OFF
    .with_key_len(key_len)
    .with_lpages(lpage_count)
    .with_tail_pages(tail_page_sizes_pow2.len());
  // Vec of (page_size, page_dev_offset).
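  // If the write starts within the lpage region, it targets exactly one
  // lpage. Otherwise, it starts at the tail, whose page device offsets are
  // stored contiguously as 48-bit big-endian integers.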
  let write_dev_offsets = {
    let idx = div_pow2(req.offset, ctx.pages.lpage_size_pow2);
    if idx < lpage_count {
      vec![(
        ctx.pages.lpage_size(),
        Off64AsyncReadInt::read_u48_be_at(&ctx.device, object_dev_offset + off.lpage(idx)).await,
      )]
    } else {
      let raw = ctx
        .device
        .read_at(
          object_dev_offset + off.tail_pages(),
          6 * u64!(tail_page_sizes_pow2.len()),
        )
        .await;
      let mut offsets = tail_page_sizes_pow2
        .into_iter()
        .map(|(i, sz)| (1 << sz, raw.read_u48_be_at(u64!(i) * 6)))
        .collect_vec();
      // For the very last tail page, we don't write a full page's worth of
      // bytes, unless the object happens to be an exact multiple of that
      // page's size. Guard with `if let`, as there may not be any tail pages
      // at all.
      if let Some((page_size, _page_dev_offset)) = offsets.last_mut() {
        let mod_ = size % *page_size;
        if mod_ != 0 {
          *page_size = mod_;
        };
      };
      offsets
    }
  };

  let mut written = 0;
  let mut write_page_idx = 0;
  let mut buf = Vec::new();
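  // Consume the stream: buffer chunks until at least one full page's worth of
  // data is available, then write out that page. The object's state is
  // re-checked on every iteration and again just before each page write.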
  loop {
    // See the comment on the identical check further below.
    if !incomplete_object_is_still_valid().await {
      return Err(OpError::ObjectNotFound);
    };
    let Ok(maybe_chunk) = timeout(Duration::from_secs(60), req.data_stream.next()).await else {
      // We timed out; loop back to check that the object is still valid.
      continue;
    };
    if let Some(chunk) = maybe_chunk {
      buf.extend_from_slice(
        chunk
          .map_err(|err| OpError::DataStreamError(Box::from(err)))?
          .as_ref(),
      );
    } else if buf.is_empty() {
      // The stream has ended and the buffer has been fully consumed.
      break;
    };
    let buf_len = u64!(buf.len());
    if written + buf_len > len {
      warn!(
        received = written + buf_len,
        declared = len,
        "stream provided more data than declared"
      );
      return Err(OpError::DataStreamLengthMismatch);
    };

    // We have two reasons to check the object state again:
    // - Prevent use-after-free: the incomplete object may have expired while
    //   we were writing. This is important to check regularly, as we must not
    //   write after an object has been released, which would otherwise cause
    //   corruption. We need to do this well before the actual reap time, to
    //   account for possible clock drift and slow execution delaying checks,
    //   but there's no need to do it every iteration; around every 60 seconds
    //   is enough.
    // - Prevent writing after committing: unlike use-after-free, this doesn't
    //   lead to any corruption, but it assists the user in ensuring that what
    //   they read after committing is always what they wrote, which is very
    //   useful when the creator differs from the reader (e.g. content is
    //   uploaded by a customer, then immediately hashed and processed by a
    //   service). AFAICT, doing this every iteration just before writing
    //   should be good enough; there is only possibly microseconds of delay
    //   due to CPU cache coherence, as we're not using locks, atomics, or
    //   memory barriers to read the object state. This should be reasonably
    //   fast, given the object metadata should be in the page cache.
    if !incomplete_object_is_still_valid().await {
      return Err(OpError::ObjectNotFound);
    };

    // TODO We could write more frequently instead of buffering an entire page
    // if the page is larger than one SSD page/block write. However, if it's
    // smaller, the I/O system (e.g. mmap) would be doing buffering and
    // repeated writes anyway.
    let (amount_to_write, page_dev_offset) = write_dev_offsets[write_page_idx];
    if buf_len < amount_to_write {
      continue;
    };
    // Optimisation: fdatasync at the end of all writes instead of here.
    ctx
      .device
      .write_at(
        page_dev_offset,
        buf.drain(..usz!(amount_to_write)).collect_vec(),
      )
      .await;
    trace!(
      object_id,
      previously_written = written,
      page_write_amount = amount_to_write,
      "wrote page"
    );
    written += amount_to_write;
    write_page_idx += 1;
  }
  if written != len {
    warn!(
      received = written,
      declared = len,
      "stream provided less data than declared"
    );
    return Err(OpError::DataStreamLengthMismatch);
  };

  // Optimisation: perform fdatasync in batches. Submitting an empty write
  // issues no new data; it just waits for the next batched delayed sync, so
  // all preceding writes are durable before we return.
  #[cfg(not(test))]
  ctx
    .device
    .write_at_with_delayed_sync::<&'static [u8]>(vec![])
    .await;

  Ok(OpWriteObjectOutput {})
}