libblobd_direct/object/mod.rs

use self::tail::TailPageSizes;
use crate::objects::ObjectId;
use crate::op::OpError;
use crate::pages::Pages;
use crate::util::ceil_pow2;
use crate::util::div_mod_pow2;
use chrono::serde::ts_microseconds;
use chrono::DateTime;
use chrono::Utc;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::u32;
use off64::u8;
use off64::usz;
use serde::Deserialize;
use serde::Serialize;
use std::ops::Deref;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;

pub mod tail;

#[derive(PartialEq, Eq, Clone, Copy, Debug, FromPrimitive)]
#[repr(u8)]
pub(crate) enum ObjectState {
  // This variant is only used when scanning the device and isn't actually an
  // object state. `_EndOfBundleTuples` must be zero so that a zero-filled
  // device is already in that state.
  _EndOfBundleTuples = 0,
  // These are also used in-memory and are actual object states.
  Incomplete,
  Committed,
}

/// How an object's data is split across the device: `lpage_count` full
/// lpages plus tail pages whose power-of-two sizes are listed in
/// `tail_page_sizes_pow2`.
pub(crate) struct ObjectLayout {
  pub lpage_count: u32,
  pub tail_page_sizes_pow2: TailPageSizes,
}

pub(crate) fn calc_object_layout(pages: &Pages, object_size: u64) -> ObjectLayout {
  // Split the object into whole lpages plus a remaining tail.
  let (lpage_count, tail_size) = div_mod_pow2(object_size, pages.lpage_size_pow2);
  let lpage_count = u32!(lpage_count);
  // Round the tail up to a whole number of spages, then decompose it into
  // power-of-two tail pages: each set bit of `rem` becomes one tail page.
  let mut rem = ceil_pow2(tail_size, pages.spage_size_pow2);
  let mut tail_page_sizes_pow2 = TailPageSizes::new();
  loop {
    // Find the highest remaining set bit; 64 leading zeros means `rem` is zero.
    let pos = rem.leading_zeros();
    if pos == 64 {
      break;
    }
    let pow2 = u8!(63 - pos);
    tail_page_sizes_pow2.push(pow2);
    // Clear the bit we just handled.
    rem &= !(1 << pow2);
  }
  ObjectLayout {
    lpage_count,
    tail_page_sizes_pow2,
  }
}
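
// Worked example (illustrative page sizes, not necessarily the crate's
// defaults): with lpage_size_pow2 = 24 (16 MiB) and spage_size_pow2 = 12
// (4 KiB), an object of 2^24 + 2^20 + 100 bytes has lpage_count = 1 and
// tail_size = 2^20 + 100. The tail rounds up to 2^20 + 2^12 bytes, whose two
// set bits yield tail pages of 2^20 and 2^12 bytes, pushed largest first.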

const TUPLE_OFFSETOF_STATE: u64 = 0;
const TUPLE_OFFSETOF_ID: u64 = TUPLE_OFFSETOF_STATE + 1;
const TUPLE_OFFSETOF_METADATA_DEV_OFFSET: u64 = TUPLE_OFFSETOF_ID + 16;
const TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2: u64 = TUPLE_OFFSETOF_METADATA_DEV_OFFSET + 6;
pub(crate) const OBJECT_TUPLE_SERIALISED_LEN: u64 = TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2 + 1;
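
// Serialised tuple layout (24 bytes total), per the offsets above:
//   byte  0        state
//   bytes 1..17    id (u128 LE)
//   bytes 17..23   metadata_dev_offset (u48 LE)
//   byte  23       metadata_page_size_pow2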
#[derive(Clone, Debug)]
pub(crate) struct ObjectTuple {
  pub state: ObjectState,
  pub id: ObjectId,
  pub metadata_dev_offset: u64,
  pub metadata_page_size_pow2: u8,
}

impl ObjectTuple {
  pub fn serialise(&self, out: &mut [u8]) {
    assert_eq!(out.len(), usz!(OBJECT_TUPLE_SERIALISED_LEN));
    out[usz!(TUPLE_OFFSETOF_STATE)] = self.state as u8;
    out.write_u128_le_at(TUPLE_OFFSETOF_ID, self.id);
    out.write_u48_le_at(TUPLE_OFFSETOF_METADATA_DEV_OFFSET, self.metadata_dev_offset);
    out[usz!(TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2)] = self.metadata_page_size_pow2;
  }

  pub fn deserialise(raw: &[u8]) -> Self {
    assert_eq!(raw.len(), usz!(OBJECT_TUPLE_SERIALISED_LEN));
    Self {
      // Panics if the state byte isn't a known `ObjectState`, i.e. the data is corrupt.
      state: ObjectState::from_u8(raw[usz!(TUPLE_OFFSETOF_STATE)]).unwrap(),
      id: raw.read_u128_le_at(TUPLE_OFFSETOF_ID),
      metadata_dev_offset: raw.read_u48_le_at(TUPLE_OFFSETOF_METADATA_DEV_OFFSET),
      metadata_page_size_pow2: raw[usz!(TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2)],
    }
  }
}

// WARNING: Do not reorder or remove the fields in this struct. It is
// serialised without field names, in declaration order, so any change would
// break all existing persisted metadata.
#[derive(Serialize, Deserialize, Clone)]
pub(crate) struct ObjectMetadata {
  pub size: u64,
  #[serde(with = "ts_microseconds")]
  pub created: DateTime<Utc>,
  pub key: Vec<u8>,
  pub lpage_dev_offsets: Vec<u64>,
  pub tail_page_dev_offsets: Vec<u64>,
}

struct ObjectInner {
  id: ObjectId,
  metadata_size: u64, // Size of `metadata` in bytes when serialised.
  state: AtomicU8,
  lock: RwLock<()>,
  metadata: ObjectMetadata,
}
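
/// Handle to an object's in-memory state and metadata. Cloning is cheap: all
/// clones share the same `ObjectInner` through an `Arc`, and `Object` derefs
/// to its `ObjectMetadata`.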
#[derive(Clone)]
pub(crate) struct Object {
  inner: Arc<ObjectInner>,
}

impl Object {
  pub fn new(
    id: ObjectId,
    state: ObjectState,
    metadata: ObjectMetadata,
    metadata_size: u64,
  ) -> Self {
    Self {
      inner: Arc::new(ObjectInner {
        id,
        metadata_size,
        state: AtomicU8::new(state as u8),
        lock: RwLock::new(()),
        metadata,
      }),
    }
  }

  /// Create a new handle with a different id but the same state, metadata,
  /// and metadata size.
  pub fn with_new_id(self, new_id: ObjectId) -> Self {
    Self::new(
      new_id,
      self.get_state(),
      self.inner.metadata.clone(),
      self.inner.metadata_size,
    )
  }

  pub fn id(&self) -> ObjectId {
    self.inner.id
  }

  pub fn metadata_size(&self) -> u64 {
    self.inner.metadata_size
  }

  pub fn get_state(&self) -> ObjectState {
    ObjectState::from_u8(self.inner.state.load(Ordering::Relaxed)).unwrap()
  }
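
  /// Transition the object's state, then wait until every writer that might
  /// have seen the old state has finished. The state is stored before the
  /// lock is touched, so writers that acquire the lock afterwards observe the
  /// new state and bail in `lock_for_writing_if_still_valid`; acquiring the
  /// write lock then drains writers already holding read guards.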
  pub async fn update_state_then_ensure_no_writers(&self, new_state: ObjectState) {
    // Update state BEFORE acquiring lock.
    self.inner.state.store(new_state as u8, Ordering::Relaxed);
    let lock = self.inner.lock.write().await;
    drop(lock);
  }
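
  /// Acquire a shared guard that permits writing to the object, but only if
  /// it is still in `expected_state`. The state is checked only after the
  /// lock is acquired, pairing with `update_state_then_ensure_no_writers`:
  /// either the new state is observed here and the write is rejected, or this
  /// guard is already held and the state updater waits for it to drop.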
  pub async fn lock_for_writing_if_still_valid(
    &self,
    expected_state: ObjectState,
  ) -> Result<RwLockReadGuard<'_, ()>, OpError> {
    // Acquire lock BEFORE checking state.
    let lock = self.inner.lock.read().await;
    if self.get_state() != expected_state {
      return Err(OpError::ObjectNotFound);
    }
    Ok(lock)
  }
}

impl Deref for Object {
  type Target = ObjectMetadata;

  fn deref(&self) -> &Self::Target {
    &self.inner.metadata
  }
}
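
// A minimal round-trip sketch for the tuple format. It assumes `ObjectId` is
// a plain u128 alias, as implied by the `write_u128_le_at`/`read_u128_le_at`
// calls above; adjust the literals if it is a newtype.
#[cfg(test)]
mod tests {
  use super::*;

  #[test]
  fn object_tuple_round_trips() {
    let t = ObjectTuple {
      state: ObjectState::Committed,
      id: 0xdead_beef,
      metadata_dev_offset: 0x1234_5678_9abc,
      metadata_page_size_pow2: 12,
    };
    // Serialise into an exactly-sized buffer, then decode it back.
    let mut raw = vec![0u8; usz!(OBJECT_TUPLE_SERIALISED_LEN)];
    t.serialise(&mut raw);
    let rt = ObjectTuple::deserialise(&raw);
    assert_eq!(rt.state, t.state);
    assert_eq!(rt.id, t.id);
    assert_eq!(rt.metadata_dev_offset, t.metadata_dev_offset);
    assert_eq!(rt.metadata_page_size_pow2, t.metadata_page_size_pow2);
  }
}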