use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::op::key_debug_str;
use crate::util::ceil_pow2;
use chrono::Utc;
use off64::u64;
use off64::u8;
use off64::Off64WriteMut;
use serde::Serialize;
use std::cmp::max;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tracing::trace;

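/// Request to create a new object. The object's full `size` in bytes is declared
/// up front so that all of its data pages can be allocated at creation time.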
pub struct OpCreateObjectInput {
  pub key: TinyBuf,
  pub size: u64,
}

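/// Result of a successful create: a token identifying the new incomplete object,
/// for use by later operations on it while it remains incomplete.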
pub struct OpCreateObjectOutput {
  pub token: IncompleteToken,
}

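/// Allocates space for a new object, persists its metadata, and registers it as
/// incomplete. A minimal sketch of a call site (hypothetical; assumes `ctx` and a
/// `TinyBuf` key were obtained elsewhere):
///
/// ```ignore
/// let out = op_create_object(Arc::clone(&ctx), OpCreateObjectInput {
///   key,        // a `TinyBuf` produced by the caller
///   size: 1024, // full object size in bytes
/// })
/// .await?;
/// let token = out.token;
/// ```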
pub(crate) async fn op_create_object(
  ctx: Arc<Ctx>,
  req: OpCreateObjectInput,
) -> OpResult<OpCreateObjectOutput> {
  trace!(
    key = key_debug_str(&req.key),
    size = req.size,
    "creating object"
  );

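  // Work out how many full lpages the object needs, plus the power-of-two sizes
  // of the tail pages that cover the remainder.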
  let layout = calc_object_layout(&ctx.pages, req.size);

  let mut lpage_dev_offsets = Vec::new();
  let mut tail_page_dev_offsets = Vec::new();
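  // Reserve every data page up front, under a single hold of the allocator lock.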
  {
    let mut allocator = ctx.heap_allocator.lock();
    for _ in 0..layout.lpage_count {
      let lpage_dev_offset = allocator.allocate(ctx.pages.lpage_size()).unwrap();
      lpage_dev_offsets.push(lpage_dev_offset);
    }
    for (_, tail_page_size_pow2) in layout.tail_page_sizes_pow2 {
      let page_dev_offset = allocator.allocate(1 << tail_page_size_pow2).unwrap();
      tail_page_dev_offsets.push(page_dev_offset);
    }
  };

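  // Record the object's key, size, and the device offset of every allocated page.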
  let metadata = ObjectMetadata {
    created: Utc::now(),
    key: req.key,
    size: req.size,
    lpage_dev_offsets,
    tail_page_dev_offsets,
  };

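  // Serialise the metadata with MessagePack; it must fit within one lpage.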
  let mut metadata_raw = Vec::new();
  metadata
    .serialize(&mut rmp_serde::Serializer::new(&mut metadata_raw))
    .unwrap();
  let metadata_size = u64!(metadata_raw.len());
  // TODO Check before allocating space and return `OpError::ObjectMetadataTooLarge`.
  assert!(metadata_size <= ctx.pages.lpage_size());
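  // Round the metadata up to a power-of-two device page: at least one spage, and
  // (per the assert above) at most one lpage.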
  let metadata_page_size = max(
    ctx.pages.spage_size(),
    ceil_pow2(metadata_size, ctx.pages.spage_size_pow2),
  );
  let metadata_dev_offset = ctx
    .heap_allocator
    .lock()
    .allocate(metadata_page_size)
    .unwrap();

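  // Mint a fresh object ID and build the tuple that the tuples store will persist
  // to index this object.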
  let object_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
  let tuple = ObjectTuple {
    id: object_id,
    state: ObjectState::Incomplete,
    metadata_dev_offset,
    metadata_page_size_pow2: u8!(metadata_page_size.ilog2()),
  };

  // NOTE: We can't use `allocate_from_data` here, as `metadata_page_size` may be much larger
  // than the actual `metadata_size`; bytes past the serialised metadata are left uninitialised.
  let mut write_page = ctx.pages.allocate_uninitialised(metadata_page_size);
  write_page.write_at(0, &metadata_raw);

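  // Track the new object in memory as incomplete. The ID was just minted, so an
  // existing entry under it would be a logic error.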
  let None = ctx.incomplete_objects.write().insert(
    object_id,
    Object::new(object_id, ObjectState::Incomplete, metadata),
  ) else {
    unreachable!();
  };

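  // Persist the metadata page first, so the tuple written next never points at
  // unwritten data.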
  ctx.device.write_at(metadata_dev_offset, write_page).await;

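  // Now persist the tuple, making the object durable and discoverable.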
  ctx.tuples.insert_object(tuple).await;

  trace!(
    id = object_id,
    size = req.size,
    metadata_dev_offset = metadata_dev_offset,
    metadata_page_size = metadata_page_size,
    "created object"
  );

  Ok(OpCreateObjectOutput {
    token: IncompleteToken {
      partition_idx: ctx.partition_idx,
      object_id,
    },
  })
}