libblobd_lite/op/
create_object.rs

1use super::OpResult;
2use crate::ctx::Ctx;
3use crate::incomplete_token::IncompleteToken;
4use crate::object::calc_object_layout;
5use crate::object::ObjectLayout;
6use crate::object::OBJECT_OFF;
7use crate::op::key_debug_str;
8use crate::op::OpError;
9use crate::util::get_now_ms;
10use off64::int::Off64AsyncWriteInt;
11use off64::int::Off64WriteMutInt;
12use off64::u16;
13use off64::usz;
14use off64::Off64WriteMut;
15use std::sync::Arc;
16use tinybuf::TinyBuf;
17use tracing::trace;
18
/// Request parameters for creating a new object.
pub struct OpCreateObjectInput {
  /// Key bytes of the object. The key and its length (as a 16-bit value) are written into the
  /// object's metadata record.
  pub key: TinyBuf,
  /// Size of the object's data in bytes. Used to calculate the page layout, and written into the
  /// metadata record as a 40-bit big-endian value.
  pub size: u64,
  /// Caller-supplied associated data. It is written into the metadata record, preceded by its
  /// length as a 16-bit value.
  pub assoc_data: TinyBuf,
}
24
/// Result of successfully creating an object.
pub struct OpCreateObjectOutput {
  /// Token for the newly created object, which has been attached to the incomplete list. It
  /// carries the creation time in seconds and the device offset of the object's metadata.
  pub token: IncompleteToken,
}
28
29pub(crate) async fn op_create_object(
30  ctx: Arc<Ctx>,
31  req: OpCreateObjectInput,
32) -> OpResult<OpCreateObjectOutput> {
33  let key_len = u16!(req.key.len());
34  let ObjectLayout {
35    lpage_count,
36    tail_page_sizes_pow2,
37  } = calc_object_layout(&ctx.pages, req.size);
38
39  let off = OBJECT_OFF
40    .with_key_len(key_len)
41    .with_lpages(lpage_count)
42    .with_tail_pages(tail_page_sizes_pow2.len())
43    .with_assoc_data_len(u16!(req.assoc_data.len()));
44  let meta_size = off._total_size();
45  if meta_size > ctx.pages.lpage_size() {
46    return Err(OpError::ObjectMetadataTooLarge);
47  };
48
49  trace!(
50    key = key_debug_str(&req.key),
51    meta_size,
52    size = req.size,
53    lpage_count,
54    tail_page_count = tail_page_sizes_pow2.len(),
55    "creating object"
56  );
57
58  let created_ms = get_now_ms();
59
60  let mut raw = vec![0u8; usz!(meta_size)];
61  raw.write_u48_be_at(off.created_ms(), created_ms);
62  raw.write_u40_be_at(off.size(), req.size);
63  raw.write_u16_be_at(off.key_len(), key_len);
64  raw.write_at(off.key(), &req.key);
65  raw.write_u16_be_at(off.assoc_data_len(), u16!(req.assoc_data.len()));
66  raw.write_at(off.assoc_data(), &req.assoc_data);
67
68  let (txn, dev_offset, object_id) = {
69    let mut state = ctx.state.lock().await;
70    let mut txn = ctx.journal.begin_transaction();
71
72    let object_id = state.object_id_serial.next(&mut txn);
73    raw.write_u64_be_at(off.id(), object_id);
74
75    trace!(
76      key = key_debug_str(&req.key),
77      "allocating metadata for object"
78    );
79    let (dev_offset, meta_size_pow2) = state
80      .allocator
81      .allocate_and_ret_with_size(&mut txn, meta_size)
82      .await;
83
84    // This is a one-time special direct write to prevent an edge case where the object is attached to the incomplete list, and the incomplete list tries to read the object's metadata (e.g. creation time), *before* the journal has actually written it out to the mmap. Normally this is dangerous because an unexpected page writeback could leave the device in a corrupted state, but in this specific case it's fine because we're writing to free space, so even if we crash right after this it's just junk. This specific issue can only occur here because object metadata is immutable and all other ops on the object can only occur after the journal has committed and the reseponse has been returned.
85    ctx
86      .device
87      .write_u48_be_at(dev_offset + off.created_ms(), created_ms)
88      .await;
89
90    for i in 0..lpage_count {
91      trace!(
92        key = key_debug_str(&req.key),
93        lpage_index = i,
94        "allocating lpage for object"
95      );
96      let lpage_dev_offset = state
97        .allocator
98        .allocate(&mut txn, ctx.pages.lpage_size())
99        .await;
100      raw.write_u48_be_at(off.lpage(i), lpage_dev_offset);
101    }
102    for (i, tail_page_size_pow2) in tail_page_sizes_pow2 {
103      trace!(
104        key = key_debug_str(&req.key),
105        tail_page_size_pow2,
106        "allocating tail page for object"
107      );
108      let page_dev_offset = state
109        .allocator
110        .allocate(&mut txn, 1 << tail_page_size_pow2)
111        .await;
112      raw.write_u48_be_at(off.tail_page(i), page_dev_offset);
113    }
114
115    state
116      .incomplete_list
117      .attach(&mut txn, dev_offset, meta_size_pow2)
118      .await;
119
120    ctx.metrics.incr_object_count(&mut txn, 1);
121    ctx.metrics.incr_object_data_bytes(&mut txn, req.size);
122    ctx.metrics.incr_object_metadata_bytes(&mut txn, meta_size);
123
124    (txn, dev_offset, object_id)
125  };
126
127  trace!(
128    key = key_debug_str(&req.key),
129    object_id,
130    dev_offset,
131    "allocated object"
132  );
133
134  ctx.device.write_at(dev_offset, raw).await;
135  ctx.journal.commit_transaction(txn).await;
136  trace!(key = key_debug_str(&req.key), object_id, "created object");
137
138  Ok(OpCreateObjectOutput {
139    token: IncompleteToken {
140      created_sec: created_ms / 1000,
141      object_dev_offset: dev_offset,
142    },
143  })
144}