libblobd_kv/op/write_object.rs

use super::OpError;
use super::OpResult;
use crate::allocator::Allocator;
use crate::ctx::Ctx;
use crate::metrics::BlobdMetrics;
use crate::object::ObjectTupleData;
use crate::object::LOG_ENTRY_DATA_LEN_INLINE_THRESHOLD;
use crate::object::OBJECT_SIZE_MAX;
use crate::pages::Pages;
use crate::util::ceil_pow2;
use off64::u32;
use off64::u64;
use off64::Off64WriteMut;
use std::cmp::max;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use tinybuf::TinyBuf;

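/// Parameters for a single object write: the object's key and its full payload.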
pub struct OpWriteObjectInput {
  pub key: TinyBuf,
  pub data: Vec<u8>,
}

pub struct OpWriteObjectOutput {}

// This only updates the allocator and doesn't create the write buffer or actually perform the async write, so that a batch of allocations can be performed efficiently without repeatedly acquiring and releasing the allocator lock.
// Returns the device offset and the padded size.
pub(crate) fn allocate_object_on_heap(
  heap_allocator: &mut Allocator,
  pages: &Pages,
  metrics: &BlobdMetrics,
  size: u32,
) -> Result<(u64, u64), OpError> {
  let Ok(dev_offset) = heap_allocator.allocate(size) else {
    return Err(OpError::OutOfSpace);
  };
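  // Round the on-device size up to a whole number of spages (at least one), so the
  // eventual device write covers full spages; the surplus over `size` is tracked as padding.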
  let size_on_dev = max(
    pages.spage_size(),
    ceil_pow2(size.into(), pages.spage_size_pow2),
  );
  let padding = size_on_dev - u64!(size);
  metrics
    .0
    .heap_object_data_bytes
    .fetch_add(size.into(), Relaxed);
  metrics.0.write_op_bytes_padding.fetch_add(padding, Relaxed);
  Ok((dev_offset, size_on_dev))
}

pub(crate) async fn op_write_object(
  ctx: Arc<Ctx>,
  req: OpWriteObjectInput,
) -> OpResult<OpWriteObjectOutput> {
  let size = req.data.len();
  if size > OBJECT_SIZE_MAX {
    return Err(OpError::ObjectTooLarge);
  };

  // This entry goes into the log first, so the inline threshold is `LOG_ENTRY_DATA_LEN_INLINE_THRESHOLD` rather than `OBJECT_TUPLE_DATA_LEN_INLINE_THRESHOLD`.
  let tuple_data = if size <= LOG_ENTRY_DATA_LEN_INLINE_THRESHOLD {
    ObjectTupleData::Inline(req.data.into())
  } else {
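    // Heap path: reserve heap space (rounded up to whole spages), copy the payload into
    // a buffer of the padded size, and write it to the device before logging the tuple.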
    let (dev_offset, size_on_dev) = allocate_object_on_heap(
      &mut ctx.heap_allocator.lock(),
      &ctx.pages,
      &ctx.metrics,
      u32!(size),
    )?;
    let mut buf = ctx.pages.allocate_uninitialised(size_on_dev);
    buf.write_at(0, &req.data);
    ctx.device.write_at(dev_offset, buf).await;
    ObjectTupleData::Heap {
      size: u32!(size),
      dev_offset,
    }
  };
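  // Account for the completed write: one op, `size` bytes of payload persisted.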
  ctx.metrics.0.write_op_count.fetch_add(1, Relaxed);
  ctx
    .metrics
    .0
    .write_op_bytes_persisted
    .fetch_add(u64!(size), Relaxed);

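  // Record the key → tuple mapping via the log buffer; for small objects the data itself
  // is carried inline in the log entry rather than on the heap.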
  ctx.log_buffer.upsert_tuple(req.key, tuple_data).await;

  Ok(OpWriteObjectOutput {})
}