libblobd_lite/
bucket.rs

use crate::ctx::State;
use crate::object::OBJECT_KEY_LEN_MAX;
use crate::object::OBJECT_OFF;
use crate::page::ObjectPageHeader;
use crate::page::ObjectState;
use crate::page::Pages;
use crate::page::MIN_PAGE_SIZE_POW2;
use crate::stream::CreatedStreamEvent;
use crate::stream::StreamEvent;
use crate::stream::StreamEventType;
#[cfg(test)]
use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
#[cfg(test)]
use crate::test_util::journal::TestTransaction as Transaction;
#[cfg(test)]
use crate::test_util::journal::TestWriteJournal as WriteJournal;
use itertools::Itertools;
use off64::int::create_u40_be;
use off64::int::Off64ReadInt;
use off64::u16;
use off64::usz;
use off64::Off64Read;
#[cfg(not(test))]
use seekable_async_file::SeekableAsyncFile;
use std::ops::Deref;
use std::sync::Arc;
use tokio::join;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
use tracing::debug;
use twox_hash::xxh3::hash64;
#[cfg(not(test))]
use write_journal::Transaction;
#[cfg(not(test))]
use write_journal::WriteJournal;

/**

BUCKET
======

Since keys are hashed, a key has no correlation to its bucket. As such, consecutive lookups will likely jump between random buckets, and if those buckets are not loaded into memory, we'll continuously page to and from disk, ruining any performance gain of using a hash map structure.

The limit on the number of buckets is somewhat arbitrary, but it provides reasonable constraints that we can target and optimise for.

Because we allow deletion of objects and aim to free their space immediately, each bucket must be guarded by a lock: deleting requires detaching the inode, which means modifying non-atomic heap data. If we modified that data without a lock, concurrent readers/writers could jump to arbitrary positions and possibly leak sensitive data or crash; simultaneous creations/deletions on the same bucket would also race. Creating an object requires the lock too: an atomic CAS on the in-memory bucket head is possible, but the bucket would then point to an inode whose "next" link is still uninitialised or zero.

Structure
---------

u8 count_log2_between_12_and_40_inclusive
u40[] dev_offset_rshift8_or_zero

**/

pub(crate) const BUCKETS_OFFSETOF_COUNT_LOG2: u64 = 0;

pub(crate) fn BUCKETS_OFFSETOF_BUCKET(bkt_id: u64) -> u64 {
  // Bucket head pointers start immediately after the one-byte count_log2 field.
  BUCKETS_OFFSETOF_COUNT_LOG2 + 1 + (bkt_id * 5)
}

pub(crate) fn BUCKETS_SIZE(bkt_cnt: u64) -> u64 {
  BUCKETS_OFFSETOF_BUCKET(bkt_cnt)
}
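
// Worked example of the arithmetic above (illustrative values, not tied to any
// particular deployment): with count_log2 = 12 there are 4096 buckets, so the
// region is 1 + 5 * 4096 = 20481 bytes; bucket 0's head pointer lives at offset 1
// and bucket 1's at offset 6.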

pub(crate) struct FoundObject {
  pub prev_dev_offset: Option<u64>,
  pub next_dev_offset: Option<u64>,
  pub dev_offset: u64,
  pub id: u64,
  pub size: u64,
}

// This type exists to make sure its methods are only called while holding the appropriate lock.
pub(crate) struct ReadableLockedBucket<'b, 'k> {
  bucket_id: u64,
  buckets: &'b Buckets,
  key_len: u16,
  key: &'k [u8],
}

impl<'b, 'k> ReadableLockedBucket<'b, 'k> {
  pub fn bucket_id(&self) -> u64 {
    self.bucket_id
  }

  pub async fn find_object(&self, expected_id: Option<u64>) -> Option<FoundObject> {
    let mut dev_offset = self.get_head().await;
    let mut prev_dev_offset = None;
    while dev_offset > 0 {
      let (hdr, raw) = join! {
        // SAFETY: We're holding a read lock, so the linked list cannot be in an invalid/intermediate state, and all elements should be committed objects.
        self.buckets.pages.read_page_header::<ObjectPageHeader>(dev_offset),
        self.buckets.dev.read_at(dev_offset, OBJECT_OFF.with_key_len(OBJECT_KEY_LEN_MAX).lpages()),
      };
      debug_assert_eq!(hdr.state, ObjectState::Committed);
      debug_assert_eq!(hdr.deleted_sec, None);
      let object_id = raw.read_u64_be_at(OBJECT_OFF.id());
      if (expected_id.is_none() || expected_id.unwrap() == object_id)
        && raw.read_u16_be_at(OBJECT_OFF.key_len()) == self.key_len
        && raw.read_at(OBJECT_OFF.key(), self.key_len.into()) == self.key
      {
        return Some(FoundObject {
          dev_offset,
          next_dev_offset: Some(hdr.next).filter(|o| *o > 0),
          id: object_id,
          prev_dev_offset,
          size: raw.read_u40_be_at(OBJECT_OFF.size()),
        });
      }
      prev_dev_offset = Some(dev_offset);
      dev_offset = hdr.next;
    }
    None
  }

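  // The head is stored as the device offset right-shifted by MIN_PAGE_SIZE_POW2 and
  // packed into 5 bytes (see the layout comment above); the offset is assumed to be
  // page aligned, and zero means the bucket is empty.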
  pub async fn get_head(&self) -> u64 {
    self
      .buckets
      .journal
      .read_with_overlay(
        self.buckets.dev_offset + BUCKETS_OFFSETOF_BUCKET(self.bucket_id),
        5,
      )
      .await
      .read_u40_be_at(0)
      << MIN_PAGE_SIZE_POW2
  }
}

pub(crate) struct BucketReadLocked<'b, 'k, 'l> {
  state: ReadableLockedBucket<'b, 'k>,
  // We just hold this guard; we never use it directly.
  #[allow(unused)]
  lock: RwLockReadGuard<'l, ()>,
}

impl<'b, 'k, 'l> Deref for BucketReadLocked<'b, 'k, 'l> {
  type Target = ReadableLockedBucket<'b, 'k>;

  fn deref(&self) -> &Self::Target {
    &self.state
  }
}

// This struct's methods take `&mut self` even though it's not strictly necessary, as a reminder that they mutate the bucket and require the write lock.
pub(crate) struct BucketWriteLocked<'b, 'k, 'l> {
  state: ReadableLockedBucket<'b, 'k>,
  // We just hold this guard; we never use it directly.
  #[allow(unused)]
  lock: RwLockWriteGuard<'l, ()>,
}

impl<'b, 'k, 'l> BucketWriteLocked<'b, 'k, 'l> {
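  // Hypothetical caller-side sketch (names like `buckets`, `txn` and `state` are
  // illustrative, not part of this module):
  //
  //   let mut bkt = buckets.get_bucket_mut_for_key(key).await;
  //   if let Some(event) = bkt
  //     .move_object_to_deleted_list_if_exists(&mut txn, &mut state, None)
  //     .await
  //   {
  //     // Commit `txn`, then act on `event` as needed.
  //   }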
  pub async fn move_object_to_deleted_list_if_exists(
    &mut self,
    txn: &mut Transaction,
    // TODO: This is a workaround for the borrow checker, which won't let us mutably borrow both `deleted_list` and `stream` through `State`.
    state: &mut State,
    id: Option<u64>,
  ) -> Option<CreatedStreamEvent> {
    let Some(FoundObject {
      prev_dev_offset: prev_obj,
      next_dev_offset: next_obj,
      dev_offset: obj_dev_offset,
      id: object_id,
      ..
    }) = self.find_object(id).await else {
      return None;
    };

    // Detach from the bucket.
    match prev_obj {
      Some(prev_inode_dev_offset) => {
        // Update the next pointer of the previous inode.
        self
          .buckets
          .pages
          .update_page_header::<ObjectPageHeader>(txn, prev_inode_dev_offset, |p| {
            debug_assert_eq!(p.state, ObjectState::Committed);
            debug_assert_eq!(p.deleted_sec, None);
            p.next = next_obj.unwrap_or(0);
          })
          .await;
      }
      None => {
        // Update the bucket head.
        self.mutate_head(txn, next_obj.unwrap_or(0));
      }
    };

    // Attach to the deleted list.
    state.deleted_list.attach(txn, obj_dev_offset).await;

    // Create the stream event.
    let e = state.stream.create_event_on_device(txn, StreamEvent {
      typ: StreamEventType::ObjectDelete,
      bucket_id: self.state.bucket_id,
      object_id,
    });

    Some(e)
  }

  pub fn mutate_head(&mut self, txn: &mut Transaction, dev_offset: u64) {
    txn.write_with_overlay(
      self.buckets.dev_offset + BUCKETS_OFFSETOF_BUCKET(self.bucket_id),
      create_u40_be(dev_offset >> MIN_PAGE_SIZE_POW2),
    );
  }
}

impl<'b, 'k, 'l> Deref for BucketWriteLocked<'b, 'k, 'l> {
  type Target = ReadableLockedBucket<'b, 'k>;

  fn deref(&self) -> &Self::Target {
    &self.state
  }
}

pub(crate) struct Buckets {
  bucket_count_pow2: u8,
  bucket_lock_count_pow2: u8,
  bucket_locks: Vec<RwLock<()>>,
  dev_offset: u64,
  dev: SeekableAsyncFile,
  journal: Arc<WriteJournal>,
  pages: Arc<Pages>,
}

impl Buckets {
  pub async fn load_from_device(
    dev: SeekableAsyncFile,
    journal: Arc<WriteJournal>,
    pages: Arc<Pages>,
    dev_offset: u64,
    bucket_lock_count_pow2: u8,
  ) -> Buckets {
    let bucket_count_pow2 = dev.read_at(dev_offset, 1).await[0];
    let bucket_locks = (0..1 << bucket_lock_count_pow2)
      .map(|_| RwLock::new(()))
      .collect_vec();
    debug!(
      bucket_count = 1 << bucket_count_pow2,
      bucket_lock_count = 1 << bucket_lock_count_pow2,
      "buckets loaded"
    );
    Buckets {
      bucket_count_pow2,
      bucket_lock_count_pow2,
      bucket_locks,
      dev_offset,
      dev,
      journal,
      pages,
    }
  }

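  // Formats the bucket region: the count byte is written and every head pointer is
  // left as zero, i.e. all buckets start empty (see the layout comment at the top of
  // this file). `load_from_device` reads the count byte back on startup.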
  pub async fn format_device(dev: &SeekableAsyncFile, dev_offset: u64, bucket_count_pow2: u8) {
    let mut raw = vec![0u8; usz!(BUCKETS_SIZE(1 << bucket_count_pow2))];
    raw[usz!(BUCKETS_OFFSETOF_COUNT_LOG2)] = bucket_count_pow2;
    dev.write_at(dev_offset, raw).await;
  }

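  // The next two helpers map a key to its bucket (top `bucket_count_pow2` bits of the
  // key hash) and a bucket to its lock stripe (top `bucket_lock_count_pow2` bits of
  // the bucket id). For example, with bucket_count_pow2 = 20 and
  // bucket_lock_count_pow2 = 12, each of the 2^12 locks covers 2^8 consecutive buckets.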
  fn bucket_id_for_key(&self, key: &[u8]) -> u64 {
    hash64(key) >> (64 - self.bucket_count_pow2)
  }

  fn bucket_lock_id_for_bucket_id(&self, bkt_id: u64) -> usize {
    usz!(bkt_id >> (self.bucket_count_pow2 - self.bucket_lock_count_pow2))
  }

  fn build_readable_locked_bucket<'b, 'k>(&'b self, key: &'k [u8]) -> ReadableLockedBucket<'b, 'k> {
    let bucket_id = self.bucket_id_for_key(key);
    ReadableLockedBucket {
      bucket_id,
      buckets: self,
      key,
      key_len: u16!(key.len()),
    }
  }

  pub async fn get_bucket_for_key<'b, 'k>(&'b self, key: &'k [u8]) -> BucketReadLocked<'b, 'k, '_> {
    let state = self.build_readable_locked_bucket(key);
    let lock = self.bucket_locks[self.bucket_lock_id_for_bucket_id(state.bucket_id)]
      .read()
      .await;
    BucketReadLocked { state, lock }
  }

  pub async fn get_bucket_mut_for_key<'b, 'k>(
    &'b self,
    key: &'k [u8],
  ) -> BucketWriteLocked<'b, 'k, '_> {
    let state = self.build_readable_locked_bucket(key);
    let lock = self.bucket_locks[self.bucket_lock_id_for_bucket_id(state.bucket_id)]
      .write()
      .await;
    BucketWriteLocked { state, lock }
  }
}
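
// A minimal sanity check of the pure layout arithmetic above. It is only a sketch:
// it exercises the offset helpers with illustrative values and assumes nothing about
// devices, journals, or pages.
#[cfg(test)]
mod bucket_layout_tests {
  use super::*;

  #[test]
  fn bucket_heads_follow_the_count_byte() {
    // One count byte at offset 0, then a 5-byte (u40) head pointer per bucket.
    assert_eq!(BUCKETS_OFFSETOF_BUCKET(0), 1);
    assert_eq!(BUCKETS_OFFSETOF_BUCKET(1), 6);
    assert_eq!(BUCKETS_SIZE(1 << 12), 1 + 5 * (1 << 12));
  }
}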