libblobd_direct/op/write_object.rs

use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::calc_object_layout;
use crate::object::ObjectState;
use crate::util::ceil_pow2;
use crate::util::div_pow2;
use crate::util::is_multiple_of_pow2;
use futures::Stream;
use futures::StreamExt;
use itertools::Itertools;
use off64::u32;
use off64::u64;
use off64::usz;
use std::cmp::max;
use std::cmp::min;
use std::error::Error;
use std::iter::empty;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use tracing::trace;
use tracing::warn;

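/// Input for a single write against an incomplete object. `offset` must be a
/// multiple of the lpage size, and `data_len` must cover exactly one lpage or
/// the entire remaining tail of the object; `data_stream` yields the bytes to
/// write and must produce exactly `data_len` bytes in total.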
pub struct OpWriteObjectInput<
  D: AsRef<[u8]>,
  S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
> {
  pub incomplete_token: IncompleteToken,
  pub offset: u64,
  pub data_len: u64,
  pub data_stream: S,
}

pub struct OpWriteObjectOutput {}

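/// Writes one lpage-aligned chunk of an incomplete object's data. Each call
/// must fill either a whole lpage or the object's entire tail, so repeated
/// calls cover the object without ever exposing uninitialised pages.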
pub(crate) async fn op_write_object<
  D: AsRef<[u8]>,
  S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
>(
  ctx: Arc<Ctx>,
  mut req: OpWriteObjectInput<D, S>,
) -> OpResult<OpWriteObjectOutput> {
  let len = req.data_len;
  let IncompleteToken { object_id, .. } = req.incomplete_token;

  let Some(obj) = ctx.incomplete_objects.read().get(&object_id).cloned() else {
    return Err(OpError::ObjectNotFound);
  };

  if !is_multiple_of_pow2(req.offset, ctx.pages.lpage_size_pow2) {
    // Offsets must be aligned to the lpage size.
    return Err(OpError::UnalignedWrite);
  };
  if len > ctx.pages.lpage_size() {
    // Cannot write more than one lpage in a single request.
    return Err(OpError::InexactWriteLength);
  };

  if req.offset + len > obj.size {
    // The write would extend past the end of the object.
    return Err(OpError::RangeOutOfBounds);
  };

  if req.offset + len != min(req.offset + ctx.pages.lpage_size(), obj.size) {
    // The write must fill an entire lpage or the entire remaining tail; anything shorter would expose uninitialised data.
    return Err(OpError::InexactWriteLength);
  };

  ctx.metrics.0.write_op_count.fetch_add(1, Relaxed);
  ctx
    .metrics
    .0
    .write_op_bytes_requested
    .fetch_add(len, Relaxed);

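  // Recompute the object's page layout from its size: how many full lpages it
  // spans, plus the power-of-two sizes of any tail pages.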
  let layout = calc_object_layout(&ctx.pages, obj.size);

  // Vec of (page_size, page_dev_offset).
  let write_dev_offsets = {
    let idx = u32!(div_pow2(req.offset, ctx.pages.lpage_size_pow2));
    if idx < layout.lpage_count {
      vec![(ctx.pages.lpage_size(), obj.lpage_dev_offsets[usz!(idx)])]
    } else {
      let mut offsets = layout
        .tail_page_sizes_pow2
        .into_iter()
        .map(|(i, sz)| (1 << sz, obj.tail_page_dev_offsets[usz!(i)]))
        .collect_vec();
      // For the very last tail page, we don't write a full page's worth of bytes, unless the object happens to be an exact multiple of that page's size. Use `if let` as there may not be any tail pages at all.
      if let Some((page_size, _page_dev_offset)) = offsets.last_mut() {
        let mod_ = obj.size % *page_size;
        if mod_ != 0 {
          *page_size = mod_;
        };
      };
      offsets
    }
  };

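  // Consume the stream, buffering chunks until there is enough data to fill
  // the next target page, then issue one device write per page.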
  let mut written = 0;
  let mut write_page_idx = 0;
  let mut buf = Vec::new();
  let mut stream_ended = false;
  loop {
    if !stream_ended {
      match req.data_stream.next().await {
        Some(chunk) => buf.extend_from_slice(
          chunk
            .map_err(|err| OpError::DataStreamError(Box::from(err)))?
            .as_ref(),
        ),
        // Once the stream has ended, don't poll it again; leftover buffered data may still span further pages.
        None => stream_ended = true,
      };
    };
    if stream_ended && buf.is_empty() {
      // Stream has ended and the buffer has been fully consumed.
      break;
    };
    let buf_len = u64!(buf.len());
    if written + buf_len > len {
      warn!(
        received = written + buf_len,
        declared = len,
        "stream provided more data than declared"
      );
      return Err(OpError::DataStreamLengthMismatch);
    };

    // TODO We could write more frequently instead of buffering an entire page if the page is larger than one SSD page/block write. However, if it's smaller, the I/O system (e.g. mmap) would be doing buffering and repeated writes anyway.
    let (amount_to_write, page_dev_offset) = write_dev_offsets[write_page_idx];
    if buf_len < amount_to_write {
      if stream_ended {
        // Stream ended partway through a page; break so the length check after the loop reports the shortfall instead of looping forever.
        break;
      };
      continue;
    };
    {
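      // Take the object's write lock and re-check that it is still Incomplete,
      // in case its state changed concurrently.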
      let lock = obj
        .lock_for_writing_if_still_valid(ObjectState::Incomplete)
        .await?;
      // Optimisation: fdatasync at end of all writes instead of here.
      // We can't use `allocate_from_data` as it won't be sized correctly.
      let mut write_data = ctx.pages.allocate_uninitialised(max(
        ctx.pages.spage_size(),
        ceil_pow2(amount_to_write, ctx.pages.spage_size_pow2),
      ));
      write_data[..usz!(amount_to_write)].copy_from_slice(&buf[..usz!(amount_to_write)]);
      buf.splice(..usz!(amount_to_write), empty());
      // NOTE: This is different from `amount_to_write` due to alignment and/or padding (i.e. write amplification).
      let write_data_len = write_data.len();
      ctx.device.write_at(page_dev_offset, write_data).await;
      ctx
        .metrics
        .0
        .write_op_bytes_written
        .fetch_add(u64!(write_data_len), Relaxed);
      trace!(
        object_id,
        previously_written = written,
        page_write_amount = amount_to_write,
        "wrote page"
      );
      written += amount_to_write;
      write_page_idx += 1;
      drop(lock);
    };
  }
  if written != len {
    warn!(
      received = written,
      declared = len,
      "stream provided less data than declared"
    );
    return Err(OpError::DataStreamLengthMismatch);
  };

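  // One sync barrier for all of this request's page writes (see the
  // optimisation note above about deferring fdatasync).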
  ctx.device.sync().await;

  Ok(OpWriteObjectOutput {})
}
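
// A minimal usage sketch (not part of this module). It assumes the caller
// already holds an `Arc<Ctx>` and an `IncompleteToken` from a prior create op,
// that the object fits in one lpage so it can be written with a single call at
// offset 0, and that `payload` is a hypothetical in-memory `Vec<u8>`. It shows
// one way to adapt a buffer into the required `data_stream` using
// `futures::stream::iter`:
//
//   let data_len = u64!(payload.len());
//   let chunks: Vec<Result<Vec<u8>, Box<dyn Error + Send + Sync>>> =
//     payload.chunks(64 * 1024).map(|c| Ok(c.to_vec())).collect();
//   let data_stream = futures::stream::iter(chunks);
//   op_write_object(ctx.clone(), OpWriteObjectInput {
//     incomplete_token,
//     offset: 0,
//     data_len,
//     data_stream,
//   })
//   .await?;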