libblobd_direct/op/create_object.rs

use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::op::key_debug_str;
use crate::util::ceil_pow2;
use chrono::Utc;
use off64::u64;
use off64::u8;
use off64::Off64WriteMut;
use serde::Serialize;
use std::cmp::max;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tracing::trace;

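/// Input for the create-object operation.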
pub struct OpCreateObjectInput {
  /// Key of the object to create.
  pub key: TinyBuf,
  /// Total size of the object's data in bytes, fixed at creation time.
  pub size: u64,
}

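/// Output of the create-object operation.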
pub struct OpCreateObjectOutput {
  /// Token identifying the new incomplete object in subsequent operations.
  pub token: IncompleteToken,
}

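/// Creates a new incomplete object: reserves heap space for its data,
/// serialises and persists its metadata, records its tuple, and returns a
/// token for addressing the object until it is committed.
///
/// A rough usage sketch, assuming an already-constructed `Ctx` (setup elided;
/// the `TinyBuf` conversion shown is illustrative):
///
/// ```ignore
/// let out = op_create_object(ctx.clone(), OpCreateObjectInput {
///   key: b"my-key".as_slice().into(),
///   size: 1024,
/// })
/// .await?;
/// // `out.token` is handed to later write and commit operations.
/// ```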
pub(crate) async fn op_create_object(
  ctx: Arc<Ctx>,
  req: OpCreateObjectInput,
) -> OpResult<OpCreateObjectOutput> {
  trace!(
    key = key_debug_str(&req.key),
    size = req.size,
    "creating object"
  );

  // Work out how the object's data maps onto whole lpages plus smaller tail pages.
  let layout = calc_object_layout(&ctx.pages, req.size);

  // Allocate heap space for the object's data: whole lpages first, then the
  // smaller pages that make up the tail.
  let mut lpage_dev_offsets = Vec::new();
  let mut tail_page_dev_offsets = Vec::new();
  {
    let mut allocator = ctx.heap_allocator.lock();
    for _ in 0..layout.lpage_count {
      let lpage_dev_offset = allocator.allocate(ctx.pages.lpage_size()).unwrap();
      lpage_dev_offsets.push(lpage_dev_offset);
    }
    for (_, tail_page_size_pow2) in layout.tail_page_sizes_pow2 {
      let page_dev_offset = allocator.allocate(1 << tail_page_size_pow2).unwrap();
      tail_page_dev_offsets.push(page_dev_offset);
    }
  };

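  // Snapshot everything needed to locate the object's data; this struct is
  // serialised and persisted as the object's metadata.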
  let metadata = ObjectMetadata {
    created: Utc::now(),
    key: req.key,
    size: req.size,
    lpage_dev_offsets,
    tail_page_dev_offsets,
  };

  let mut metadata_raw = Vec::new();
  metadata
    .serialize(&mut rmp_serde::Serializer::new(&mut metadata_raw))
    .unwrap();
  let metadata_size = u64!(metadata_raw.len());
  // TODO Check before allocating space and return `OpError::ObjectMetadataTooLarge`.
  assert!(metadata_size <= ctx.pages.lpage_size());
  let metadata_page_size = max(
    ctx.pages.spage_size(),
    ceil_pow2(metadata_size, ctx.pages.spage_size_pow2),
  );
  // Allocate the full page, not just `metadata_size`: the entire page is written
  // to the device, and its size is what gets recorded in the tuple below.
  let metadata_dev_offset = ctx
    .heap_allocator
    .lock()
    .allocate(metadata_page_size)
    .unwrap();

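  // Assign a fresh object ID and build the tuple that records where the
  // metadata page lives and how large it is.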
  let object_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
  let tuple = ObjectTuple {
    id: object_id,
    state: ObjectState::Incomplete,
    metadata_dev_offset,
    metadata_page_size_pow2: u8!(metadata_page_size.ilog2()),
  };

  // NOTE: This is not the same as `allocate_from_data`, as `metadata_page_size`
  // may be much larger than the actual `metadata_size`.
  let mut write_page = ctx.pages.allocate_uninitialised(metadata_page_size);
  write_page.write_at(0, &metadata_raw);

  ctx.device.write_at(metadata_dev_offset, write_page).await;

  ctx.tuples.insert_object(tuple).await;

  // Out of an abundance of caution, insert AFTER the tuple is certain to have persisted.
  let None = ctx.incomplete_objects.write().insert(
    object_id,
    Object::new(object_id, ObjectState::Incomplete, metadata, metadata_size),
  ) else {
    unreachable!();
  };

  ctx
    .metrics
    .0
    .create_op_count
    .fetch_add(1, Ordering::Relaxed);
  ctx
    .metrics
    .0
    .incomplete_object_count
    .fetch_add(1, Ordering::Relaxed);
  ctx
    .metrics
    .0
    .object_metadata_bytes
    .fetch_add(metadata_size, Ordering::Relaxed);
  ctx
    .metrics
    .0
    .object_data_bytes
    .fetch_add(req.size, Ordering::Relaxed);

  trace!(
    id = object_id,
    size = req.size,
    metadata_dev_offset = metadata_dev_offset,
    metadata_page_size = metadata_page_size,
    "created object"
  );

  Ok(OpCreateObjectOutput {
    token: IncompleteToken {
      partition_idx: ctx.partition_idx,
      object_id,
    },
  })
}