libblobd_direct/object/
mod.rs1use self::tail::TailPageSizes;
2use crate::objects::ObjectId;
3use crate::op::OpError;
4use crate::pages::Pages;
5use crate::util::ceil_pow2;
6use crate::util::div_mod_pow2;
7use chrono::serde::ts_microseconds;
8use chrono::DateTime;
9use chrono::Utc;
10use num_derive::FromPrimitive;
11use num_traits::FromPrimitive;
12use off64::int::Off64ReadInt;
13use off64::int::Off64WriteMutInt;
14use off64::u32;
15use off64::u8;
16use off64::usz;
17use serde::Deserialize;
18use serde::Serialize;
19use std::ops::Deref;
20use std::sync::atomic::AtomicU8;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use tokio::sync::RwLock;
24use tokio::sync::RwLockReadGuard;
25
26pub mod tail;
27
28#[derive(PartialEq, Eq, Clone, Copy, Debug, FromPrimitive)]
29#[repr(u8)]
30pub(crate) enum ObjectState {
31 _EndOfBundleTuples = 0,
34 Incomplete,
36 Committed,
37}
38
39pub(crate) struct ObjectLayout {
40 pub lpage_count: u32,
41 pub tail_page_sizes_pow2: TailPageSizes,
42}
43
44pub(crate) fn calc_object_layout(pages: &Pages, object_size: u64) -> ObjectLayout {
45 let (lpage_count, tail_size) = div_mod_pow2(object_size, pages.lpage_size_pow2);
46 let lpage_count = u32!(lpage_count);
47 let mut rem = ceil_pow2(tail_size, pages.spage_size_pow2);
48 let mut tail_page_sizes_pow2 = TailPageSizes::new();
49 loop {
50 let pos = rem.leading_zeros();
51 if pos == 64 {
52 break;
53 };
54 let pow2 = u8!(63 - pos);
55 tail_page_sizes_pow2.push(pow2);
56 rem &= !(1 << pow2);
57 }
58 ObjectLayout {
59 lpage_count,
60 tail_page_sizes_pow2,
61 }
62}
63
// Byte layout of a serialised `ObjectTuple`. Each offset is the previous field's offset plus
// the previous field's width, so the fields are packed back to back.

/// Offset of the state byte (1 byte wide).
const TUPLE_OFFSETOF_STATE: u64 = 0;
/// Offset of the object ID, written as a little-endian `u128` (16 bytes wide).
const TUPLE_OFFSETOF_ID: u64 = 1 + TUPLE_OFFSETOF_STATE;
/// Offset of the metadata device offset, written as a little-endian 48-bit integer (6 bytes wide).
const TUPLE_OFFSETOF_METADATA_DEV_OFFSET: u64 = 16 + TUPLE_OFFSETOF_ID;
/// Offset of the metadata page size exponent (1 byte wide).
const TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2: u64 = 6 + TUPLE_OFFSETOF_METADATA_DEV_OFFSET;
/// Total length in bytes of a serialised `ObjectTuple`.
pub(crate) const OBJECT_TUPLE_SERIALISED_LEN: u64 = 1 + TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2;
69
70#[derive(Clone, Debug)]
71pub(crate) struct ObjectTuple {
72 pub state: ObjectState,
73 pub id: ObjectId,
74 pub metadata_dev_offset: u64,
75 pub metadata_page_size_pow2: u8,
76}
77
78impl ObjectTuple {
79 pub fn serialise(&self, out: &mut [u8]) {
80 assert_eq!(out.len(), usz!(OBJECT_TUPLE_SERIALISED_LEN));
81 out[usz!(TUPLE_OFFSETOF_STATE)] = self.state as u8;
82 out.write_u128_le_at(TUPLE_OFFSETOF_ID, self.id);
83 out.write_u48_le_at(TUPLE_OFFSETOF_METADATA_DEV_OFFSET, self.metadata_dev_offset);
84 out[usz!(TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2)] = self.metadata_page_size_pow2;
85 }
86
87 pub fn deserialise(raw: &[u8]) -> Self {
88 assert_eq!(raw.len(), usz!(OBJECT_TUPLE_SERIALISED_LEN));
89 Self {
90 state: ObjectState::from_u8(raw[usz!(TUPLE_OFFSETOF_STATE)]).unwrap(),
91 id: raw.read_u128_le_at(TUPLE_OFFSETOF_ID),
92 metadata_dev_offset: raw.read_u48_le_at(TUPLE_OFFSETOF_METADATA_DEV_OFFSET),
93 metadata_page_size_pow2: raw[usz!(TUPLE_OFFSETOF_METADATA_PAGE_SIZE_POW2)],
94 }
95 }
96}
97
98#[derive(Serialize, Deserialize, Clone)]
100pub(crate) struct ObjectMetadata {
101 pub size: u64,
102 #[serde(with = "ts_microseconds")]
103 pub created: DateTime<Utc>,
104 pub key: Vec<u8>,
105 pub lpage_dev_offsets: Vec<u64>,
106 pub tail_page_dev_offsets: Vec<u64>,
107}
108
109struct ObjectInner {
110 id: ObjectId,
111 metadata_size: u64, state: AtomicU8,
113 lock: RwLock<()>,
114 metadata: ObjectMetadata,
115}
116
117#[derive(Clone)]
118pub(crate) struct Object {
119 inner: Arc<ObjectInner>,
120}
121
122impl Object {
123 pub fn new(
124 id: ObjectId,
125 state: ObjectState,
126 metadata: ObjectMetadata,
127 metadata_size: u64,
128 ) -> Self {
129 Self {
130 inner: Arc::new(ObjectInner {
131 id,
132 lock: RwLock::new(()),
133 metadata,
134 metadata_size,
135 state: AtomicU8::new(state as u8),
136 }),
137 }
138 }
139
140 pub fn with_new_id(self, new_id: ObjectId) -> Self {
141 Self::new(
142 new_id,
143 self.get_state(),
144 self.inner.metadata.clone(),
145 self.inner.metadata_size,
146 )
147 }
148
149 pub fn id(&self) -> ObjectId {
150 self.inner.id
151 }
152
153 pub fn metadata_size(&self) -> u64 {
154 self.inner.metadata_size
155 }
156
157 pub fn get_state(&self) -> ObjectState {
158 ObjectState::from_u8(self.inner.state.load(Ordering::Relaxed)).unwrap()
159 }
160
161 pub async fn update_state_then_ensure_no_writers(&self, new_state: ObjectState) {
162 self.inner.state.store(new_state as u8, Ordering::Relaxed);
164 let lock = self.inner.lock.write().await;
165 drop(lock);
166 }
167
168 pub async fn lock_for_writing_if_still_valid(
169 &self,
170 expected_state: ObjectState,
171 ) -> Result<RwLockReadGuard<'_, ()>, OpError> {
172 let lock = self.inner.lock.read().await;
174 if self.get_state() != expected_state {
175 return Err(OpError::ObjectNotFound);
176 };
177 Ok(lock)
178 }
179}
180
181impl Deref for Object {
182 type Target = ObjectMetadata;
183
184 fn deref(&self) -> &Self::Target {
185 &self.inner.metadata
186 }
187}