libblobd_direct/op/create_object.rs

use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::op::key_debug_str;
use crate::util::ceil_pow2;
use chrono::Utc;
use off64::u64;
use off64::u8;
use off64::Off64WriteMut;
use serde::Serialize;
use std::cmp::max;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tracing::trace;

pub struct OpCreateObjectInput {
  pub key: TinyBuf,
  pub size: u64,
}

pub struct OpCreateObjectOutput {
  pub token: IncompleteToken,
}
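
/// Creates a new object in the incomplete state: reserves device space for its
/// data, writes its metadata page, records it in the tuples store and the
/// in-memory incomplete map, and returns a token identifying it.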
pub(crate) async fn op_create_object(
  ctx: Arc<Ctx>,
  req: OpCreateObjectInput,
) -> OpResult<OpCreateObjectOutput> {
  trace!(
    key = key_debug_str(&req.key),
    size = req.size,
    "creating object"
  );
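
  // Work out how the object's data will be laid out on the device: how many
  // full lpages, plus the power-of-two sizes of any tail pages.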
  let layout = calc_object_layout(&ctx.pages, req.size);
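
  // Reserve device space for every data page up front, holding the heap
  // allocator lock once across all allocations.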
  let mut lpage_dev_offsets = Vec::new();
  let mut tail_page_dev_offsets = Vec::new();
  {
    let mut allocator = ctx.heap_allocator.lock();
    for _ in 0..layout.lpage_count {
      let lpage_dev_offset = allocator.allocate(ctx.pages.lpage_size()).unwrap();
      lpage_dev_offsets.push(lpage_dev_offset);
    }
    for (_, tail_page_size_pow2) in layout.tail_page_sizes_pow2 {
      let page_dev_offset = allocator.allocate(1 << tail_page_size_pow2).unwrap();
      tail_page_dev_offsets.push(page_dev_offset);
    }
  };
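
  // Assemble the metadata that describes the object and where its pages live.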
  let metadata = ObjectMetadata {
    created: Utc::now(),
    key: req.key,
    size: req.size,
    lpage_dev_offsets,
    tail_page_dev_offsets,
  };
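
  // Serialise the metadata with MessagePack; it must fit within one lpage. The
  // metadata page is at least one spage, and must be a power of two in size,
  // as only the exponent is stored in the object tuple.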
  let mut metadata_raw = Vec::new();
  metadata
    .serialize(&mut rmp_serde::Serializer::new(&mut metadata_raw))
    .unwrap();
  let metadata_size = u64!(metadata_raw.len());
  assert!(metadata_size <= ctx.pages.lpage_size());
  let metadata_page_size = max(
    ctx.pages.spage_size(),
    ceil_pow2(metadata_size, ctx.pages.spage_size_pow2),
  );
  // Allocate the full page, not just `metadata_size` bytes, as the entire page
  // is written at this offset below.
  let metadata_dev_offset = ctx.heap_allocator.lock().allocate(metadata_page_size).unwrap();
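
  // Assign a fresh object ID and build the tuple that records the object's
  // state and the location and size of its metadata page.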
  let object_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
  let tuple = ObjectTuple {
    id: object_id,
    state: ObjectState::Incomplete,
    metadata_dev_offset,
    metadata_page_size_pow2: u8!(metadata_page_size.ilog2()),
  };
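
  // Copy the serialised metadata into a page-sized buffer and write it to the
  // device.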
  let mut write_page = ctx.pages.allocate_uninitialised(metadata_page_size);
  write_page.write_at(0, &metadata_raw);

  ctx.device.write_at(metadata_dev_offset, write_page).await;
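
  // Persist the object tuple.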
  ctx.tuples.insert_object(tuple).await;
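
  // Track the object in the in-memory incomplete map. The ID was freshly
  // generated, so an existing entry is impossible.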
  let None = ctx.incomplete_objects.write().insert(
    object_id,
    Object::new(object_id, ObjectState::Incomplete, metadata, metadata_size),
  ) else {
    unreachable!();
  };
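
  // Bump operation and usage metrics.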
  ctx
    .metrics
    .0
    .create_op_count
    .fetch_add(1, Ordering::Relaxed);
  ctx
    .metrics
    .0
    .incomplete_object_count
    .fetch_add(1, Ordering::Relaxed);
  ctx
    .metrics
    .0
    .object_metadata_bytes
    .fetch_add(metadata_size, Ordering::Relaxed);
  ctx
    .metrics
    .0
    .object_data_bytes
    .fetch_add(req.size, Ordering::Relaxed);

  trace!(
    id = object_id,
    size = req.size,
    metadata_dev_offset = metadata_dev_offset,
    metadata_page_size = metadata_page_size,
    "created object"
  );

  Ok(OpCreateObjectOutput {
    token: IncompleteToken {
      partition_idx: ctx.partition_idx,
      object_id,
    },
  })
}