libblobd_kv/op/
write_object.rs1use super::OpError;
2use super::OpResult;
3use crate::allocator::Allocator;
4use crate::ctx::Ctx;
5use crate::metrics::BlobdMetrics;
6use crate::object::ObjectTupleData;
7use crate::object::LOG_ENTRY_DATA_LEN_INLINE_THRESHOLD;
8use crate::object::OBJECT_SIZE_MAX;
9use crate::pages::Pages;
10use crate::util::ceil_pow2;
11use off64::u32;
12use off64::u64;
13use off64::Off64WriteMut;
14use std::cmp::max;
15use std::sync::atomic::Ordering::Relaxed;
16use std::sync::Arc;
17use tinybuf::TinyBuf;
18
19pub struct OpWriteObjectInput {
20 pub key: TinyBuf,
21 pub data: Vec<u8>,
22}
23
24pub struct OpWriteObjectOutput {}
25
26pub(crate) fn allocate_object_on_heap(
29 heap_allocator: &mut Allocator,
30 pages: &Pages,
31 metrics: &BlobdMetrics,
32 size: u32,
33) -> Result<(u64, u64), OpError> {
34 let Ok(dev_offset) = heap_allocator.allocate(size) else {
35 return Err(OpError::OutOfSpace);
36 };
37 let size_on_dev = max(
38 pages.spage_size(),
39 ceil_pow2(size.into(), pages.spage_size_pow2),
40 );
41 let padding = size_on_dev - u64!(size);
42 metrics
43 .0
44 .heap_object_data_bytes
45 .fetch_add(size.into(), Relaxed);
46 metrics.0.write_op_bytes_padding.fetch_add(padding, Relaxed);
47 Ok((dev_offset, size_on_dev))
48}
49
50pub(crate) async fn op_write_object(
51 ctx: Arc<Ctx>,
52 req: OpWriteObjectInput,
53) -> OpResult<OpWriteObjectOutput> {
54 let size = req.data.len();
55 if size > OBJECT_SIZE_MAX {
56 return Err(OpError::ObjectTooLarge);
57 };
58
59 let tuple_data = if size <= LOG_ENTRY_DATA_LEN_INLINE_THRESHOLD {
61 ObjectTupleData::Inline(req.data.into())
62 } else {
63 let (dev_offset, size_on_dev) = allocate_object_on_heap(
64 &mut ctx.heap_allocator.lock(),
65 &ctx.pages,
66 &ctx.metrics,
67 u32!(size),
68 )?;
69 let mut buf = ctx.pages.allocate_uninitialised(size_on_dev);
70 buf.write_at(0, &req.data);
71 ctx.device.write_at(dev_offset, buf).await;
72 ObjectTupleData::Heap {
73 size: u32!(size),
74 dev_offset,
75 }
76 };
77 ctx.metrics.0.write_op_count.fetch_add(1, Relaxed);
78 ctx
79 .metrics
80 .0
81 .write_op_bytes_persisted
82 .fetch_add(u64!(size), Relaxed);
83
84 ctx.log_buffer.upsert_tuple(req.key, tuple_data).await;
85
86 Ok(OpWriteObjectOutput {})
87}