libblobd_lite/op/
create_object.rs1use super::OpResult;
2use crate::ctx::Ctx;
3use crate::incomplete_token::IncompleteToken;
4use crate::object::calc_object_layout;
5use crate::object::ObjectLayout;
6use crate::object::OBJECT_OFF;
7use crate::op::key_debug_str;
8use crate::op::OpError;
9use crate::util::get_now_ms;
10use off64::int::Off64AsyncWriteInt;
11use off64::int::Off64WriteMutInt;
12use off64::u16;
13use off64::usz;
14use off64::Off64WriteMut;
15use std::sync::Arc;
16use tinybuf::TinyBuf;
17use tracing::trace;
18
19pub struct OpCreateObjectInput {
20 pub key: TinyBuf,
21 pub size: u64,
22 pub assoc_data: TinyBuf,
23}
24
25pub struct OpCreateObjectOutput {
26 pub token: IncompleteToken,
27}
28
29pub(crate) async fn op_create_object(
30 ctx: Arc<Ctx>,
31 req: OpCreateObjectInput,
32) -> OpResult<OpCreateObjectOutput> {
33 let key_len = u16!(req.key.len());
34 let ObjectLayout {
35 lpage_count,
36 tail_page_sizes_pow2,
37 } = calc_object_layout(&ctx.pages, req.size);
38
39 let off = OBJECT_OFF
40 .with_key_len(key_len)
41 .with_lpages(lpage_count)
42 .with_tail_pages(tail_page_sizes_pow2.len())
43 .with_assoc_data_len(u16!(req.assoc_data.len()));
44 let meta_size = off._total_size();
45 if meta_size > ctx.pages.lpage_size() {
46 return Err(OpError::ObjectMetadataTooLarge);
47 };
48
49 trace!(
50 key = key_debug_str(&req.key),
51 meta_size,
52 size = req.size,
53 lpage_count,
54 tail_page_count = tail_page_sizes_pow2.len(),
55 "creating object"
56 );
57
58 let created_ms = get_now_ms();
59
60 let mut raw = vec![0u8; usz!(meta_size)];
61 raw.write_u48_be_at(off.created_ms(), created_ms);
62 raw.write_u40_be_at(off.size(), req.size);
63 raw.write_u16_be_at(off.key_len(), key_len);
64 raw.write_at(off.key(), &req.key);
65 raw.write_u16_be_at(off.assoc_data_len(), u16!(req.assoc_data.len()));
66 raw.write_at(off.assoc_data(), &req.assoc_data);
67
68 let (txn, dev_offset, object_id) = {
69 let mut state = ctx.state.lock().await;
70 let mut txn = ctx.journal.begin_transaction();
71
72 let object_id = state.object_id_serial.next(&mut txn);
73 raw.write_u64_be_at(off.id(), object_id);
74
75 trace!(
76 key = key_debug_str(&req.key),
77 "allocating metadata for object"
78 );
79 let (dev_offset, meta_size_pow2) = state
80 .allocator
81 .allocate_and_ret_with_size(&mut txn, meta_size)
82 .await;
83
84 ctx
86 .device
87 .write_u48_be_at(dev_offset + off.created_ms(), created_ms)
88 .await;
89
90 for i in 0..lpage_count {
91 trace!(
92 key = key_debug_str(&req.key),
93 lpage_index = i,
94 "allocating lpage for object"
95 );
96 let lpage_dev_offset = state
97 .allocator
98 .allocate(&mut txn, ctx.pages.lpage_size())
99 .await;
100 raw.write_u48_be_at(off.lpage(i), lpage_dev_offset);
101 }
102 for (i, tail_page_size_pow2) in tail_page_sizes_pow2 {
103 trace!(
104 key = key_debug_str(&req.key),
105 tail_page_size_pow2,
106 "allocating tail page for object"
107 );
108 let page_dev_offset = state
109 .allocator
110 .allocate(&mut txn, 1 << tail_page_size_pow2)
111 .await;
112 raw.write_u48_be_at(off.tail_page(i), page_dev_offset);
113 }
114
115 state
116 .incomplete_list
117 .attach(&mut txn, dev_offset, meta_size_pow2)
118 .await;
119
120 ctx.metrics.incr_object_count(&mut txn, 1);
121 ctx.metrics.incr_object_data_bytes(&mut txn, req.size);
122 ctx.metrics.incr_object_metadata_bytes(&mut txn, meta_size);
123
124 (txn, dev_offset, object_id)
125 };
126
127 trace!(
128 key = key_debug_str(&req.key),
129 object_id,
130 dev_offset,
131 "allocated object"
132 );
133
134 ctx.device.write_at(dev_offset, raw).await;
135 ctx.journal.commit_transaction(txn).await;
136 trace!(key = key_debug_str(&req.key), object_id, "created object");
137
138 Ok(OpCreateObjectOutput {
139 token: IncompleteToken {
140 created_sec: created_ms / 1000,
141 object_dev_offset: dev_offset,
142 },
143 })
144}