libblobd_lite/allocator/mod.rs

#[cfg(test)]
pub mod tests;

use crate::metrics::BlobdMetrics;
use crate::page::FreePagePageHeader;
use crate::page::Pages;
use crate::page::MAX_PAGE_SIZE_POW2;
use crate::page::MIN_PAGE_SIZE_POW2;
#[cfg(test)]
use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
#[cfg(test)]
use crate::test_util::journal::TestTransaction as Transaction;
use crate::util::floor_pow2;
use crate::util::mod_pow2;
use async_recursion::async_recursion;
use futures::future::join_all;
use off64::int::create_u64_be;
use off64::int::Off64AsyncReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
#[cfg(not(test))]
use seekable_async_file::SeekableAsyncFile;
use std::cmp::max;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::debug;
use tracing::info;
use tracing::trace;
#[cfg(not(test))]
use write_journal::Transaction;

const ALLOCSTATE_OFFSETOF_FRONTIER: u64 = 0;
const fn ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(page_size_pow2: u8) -> u64 {
  ALLOCSTATE_OFFSETOF_FRONTIER + 8 + 8 * ((page_size_pow2 - MIN_PAGE_SIZE_POW2) as u64)
}
pub(crate) const ALLOCSTATE_SIZE: u64 =
  ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(MAX_PAGE_SIZE_POW2 + 1);
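
// Sketch of the on-device allocator state implied by the offsets above (informal; all fields are
// big-endian u64, and a free list head of zero means the list is empty):
//
//   state + 0                                              frontier device offset
//   state + 8 + 8 * (page_size_pow2 - MIN_PAGE_SIZE_POW2)  free list head for pages of 2^page_size_pow2 bytes
//
// One head slot is reserved for every page size from MIN_PAGE_SIZE_POW2 to MAX_PAGE_SIZE_POW2
// inclusive, even though at runtime only spage_size_pow2..=lpage_size_pow2 are used.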

pub(crate) struct Allocator {
  state_dev_offset: u64,
  pages: Arc<Pages>,
  metrics: Arc<BlobdMetrics>,
  // To avoid having to write to the entire device at format time to set up the linked list of free lpages, we simply record where the next block would start if no free lpage is available.
  frontier_dev_offset: u64,
  // This could change during online resizing.
  device_size: Arc<AtomicU64>,
  // One device offset (or zero) for each page size.
  free_list_head: Vec<u64>,
}
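
// Rough picture of how this allocator works (informal summary of the code below):
// - Free pages of each size (spage..=lpage) sit on a per-size doubly linked list, threaded
//   through `FreePagePageHeader { prev, next }` stored in each free page's header.
// - Allocation takes the head of the matching list; if that list is empty, a page of the next
//   larger size is allocated and split in two, with the right half going back on the smaller list.
// - If even the lpage list is empty, a whole new block is carved out at the frontier.
// - Release re-inserts the page and, if its XOR buddy is also free, merges the pair into the
//   parent page and retries one level up.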

impl Allocator {
  pub async fn load_from_device(
    dev: &SeekableAsyncFile,
    device_size: Arc<AtomicU64>,
    state_dev_offset: u64,
    pages: Arc<Pages>,
    metrics: Arc<BlobdMetrics>,
    heap_dev_offset: u64,
  ) -> Self {
    // Getting the buddy of a page using only XOR requires that the heap starts at an address aligned to the lpage size.
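    // Illustrative (made-up offsets): a 4 KiB page at device offset 0x6000 has its buddy at
    // 0x6000 ^ 0x1000 == 0x7000, and vice versa. The XOR only pairs true siblings because, with
    // an lpage-aligned heap, every split produces two halves inside a naturally aligned parent.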
    assert_eq!(mod_pow2(heap_dev_offset, pages.lpage_size_pow2), 0);
    let frontier_dev_offset = dev
      .read_u64_be_at(state_dev_offset + ALLOCSTATE_OFFSETOF_FRONTIER)
      .await;
    let free_list_head = join_all((pages.spage_size_pow2..=pages.lpage_size_pow2).map(|i| {
      dev.read_u64_be_at(state_dev_offset + ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(i))
    }))
    .await;
    debug!(frontier_dev_offset, "allocator loaded");
    Self {
      device_size,
      free_list_head,
      frontier_dev_offset,
      metrics,
      pages,
      state_dev_offset,
    }
  }

  pub async fn format_device(dev: &SeekableAsyncFile, state_dev_offset: u64, heap_dev_offset: u64) {
    let mut raw = vec![0u8; usz!(ALLOCSTATE_SIZE)];
    raw.write_u64_be_at(ALLOCSTATE_OFFSETOF_FRONTIER, heap_dev_offset);
    dev.write_at(state_dev_offset, raw).await;
  }

  fn get_free_list_head(&mut self, page_size_pow2: u8) -> u64 {
    let pow2_idx = usz!(page_size_pow2 - self.pages.spage_size_pow2);
    self.free_list_head[pow2_idx]
  }

  /// - Updates allocator state in memory: ✓
  /// - Updates allocator state on device: ✓
  /// - Updates bitmap: **NO**
  /// - Updates header of sibling page: **NO**
  /// - Updates header of target page: **NO**
  /// - Updates metrics: **NO**
  ///
  /// Updates the free list head pointer so that it points to the given page. Doesn't touch the page itself or anything else.
  fn update_free_list_head(
    &mut self,
    txn: &mut Transaction,
    page_size_pow2: u8,
    new_head_page_dev_offset: u64,
  ) {
    let pow2_idx = usz!(page_size_pow2 - self.pages.spage_size_pow2);
    // We don't need to use overlay as we have our own copy in `self.free_list_head`.
    txn.write(
      self.state_dev_offset + ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(page_size_pow2),
      create_u64_be(new_head_page_dev_offset),
    );
    self.free_list_head[pow2_idx] = new_head_page_dev_offset;
    trace!(
      new_head_page_dev_offset,
      page_size_pow2,
      "updated free list head"
    );
  }

  /// - Updates allocator state in memory: ✓
  /// - Updates allocator state on device: ✓
  /// - Updates bitmap: **NO**
  /// - Updates header of sibling page: ✓
  /// - Updates header of target page: **NO**
  /// - Updates metrics: **NO**
  ///
  /// Updates the list head or the neighbouring free pages' headers, whichever are adjacent to the target. Doesn't touch the target page itself or anything else.
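  ///
  /// Illustrative: detaching `B` from `head -> A <-> B <-> C` leaves `head -> A <-> C`; detaching
  /// `A` (the current head) leaves `head -> B <-> C` with `B`'s `prev` reset to zero.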
  async fn detach_page_from_free_list(
    &mut self,
    txn: &mut Transaction,
    page_dev_offset: u64,
    page_size_pow2: u8,
  ) {
    let hdr = self
      .pages
      .read_page_header::<FreePagePageHeader>(page_dev_offset)
      .await;
    if hdr.prev == 0 {
      // Update head.
      self.update_free_list_head(txn, page_size_pow2, hdr.next);
    } else {
      // Update prev page's next.
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, hdr.prev, |h| h.next = hdr.next)
        .await;
    };
    if hdr.next != 0 {
      // Update next page's prev.
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, hdr.next, |h| h.prev = hdr.prev)
        .await;
    };
    trace!(
      page_dev_offset,
      page_size_pow2,
      "detached page from free list"
    );
  }

  /// - Updates allocator state in memory: ✓ (via `update_free_list_head`)
  /// - Updates allocator state on device: ✓ (via `update_free_list_head`)
  /// - Updates bitmap: **NO**
  /// - Updates header of sibling page: ✓
  /// - Updates header of target page: **NO**
  /// - Updates metrics: **NO**
  ///
  /// Detaches the page at the head (if any) from the list. Does **not** mark it as used; the caller (`allocate_page`) is responsible for that.
  async fn try_consume_page_at_free_list_head(
    &mut self,
    txn: &mut Transaction,
    page_size_pow2: u8,
  ) -> Option<u64> {
    let page_dev_offset = self.get_free_list_head(page_size_pow2);
    if page_dev_offset == 0 {
      return None;
    };
    trace!(page_size_pow2, page_dev_offset, "found free page");
    // We don't need to mark as not free as `allocate_page` will do that.
    let new_free_page = self
      .pages
      .read_page_header::<FreePagePageHeader>(page_dev_offset)
      .await
      .next;
    self.update_free_list_head(txn, page_size_pow2, new_free_page);
    if new_free_page != 0 {
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, new_free_page, |h| h.prev = 0)
        .await;
    };
    Some(page_dev_offset)
  }

  /// - Updates allocator state in memory: ✓ (via `update_free_list_head`)
  /// - Updates allocator state on device: ✓ (via `update_free_list_head`)
  /// - Updates bitmap: **NO**
  /// - Updates header of sibling page: ✓
  /// - Updates header of target page: ✓
  /// - Updates metrics: **NO**
  ///
  /// Attaches page to list, overwriting any existing header. **Does not mark as free.**
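  ///
  /// Illustrative: inserting `P` while the list is `head -> A <-> B` yields `head -> P <-> A <-> B`,
  /// with `P.prev == 0` and `A.prev == P`.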
  async fn insert_page_into_free_list(
    &mut self,
    txn: &mut Transaction,
    page_dev_offset: u64,
    page_size_pow2: u8,
  ) {
    let cur_head = self.get_free_list_head(page_size_pow2);
    self
      .pages
      .write_page_header(txn, page_dev_offset, FreePagePageHeader {
        prev: 0,
        next: cur_head,
      });
    if cur_head != 0 {
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, cur_head, |f| f.prev = page_dev_offset)
        .await;
    };
    self.update_free_list_head(txn, page_size_pow2, page_dev_offset);
    trace!(
      page_size_pow2,
      page_dev_offset,
      cur_head,
      "inserted page into free list"
    );
  }

  async fn allocate_new_block_and_then_allocate_lpage(&mut self, txn: &mut Transaction) -> u64 {
    let lpage_size = 1 << self.pages.lpage_size_pow2;
    let block_dev_offset = self.frontier_dev_offset;
    let new_frontier = block_dev_offset + self.pages.block_size;
    info!(block_dev_offset, new_frontier, "allocating new block");
    if new_frontier > self.device_size.load(Ordering::Relaxed) {
      panic!("out of space");
    };
    self.metrics.incr_allocated_block_count(txn, 1);
    // We don't need to use overlay as we have our own copy in `self.frontier_dev_offset`.
    txn.write(
      self.state_dev_offset + ALLOCSTATE_OFFSETOF_FRONTIER,
      create_u64_be(new_frontier),
    );
    self.frontier_dev_offset = new_frontier;

    // Write the bitmap of free pages into the block's metadata lpage. We must use overlay.
    for i in (0..self.pages.lpage_size()).step_by(8) {
      txn.write_with_overlay(block_dev_offset + i, create_u64_be(u64::MAX));
    }

    // - The first lpage of a block is always reserved for the metadata lpage, so we should not mark that as a free page.
    // - The first data lpage (second lpage) will be immediately returned for usage, so we should not mark that as a free page.
    // - We insert in reverse order so that lpages with a lower offset are used first (see the illustration below). This isn't necessary for correctness but may offer some performance benefit due to sequential I/O, and it makes testing easier too.
    // It may seem inefficient to insert every lpage in a loop, instead of just building a list and then inserting the head only, but:
    // - We only do this once per new block, which should not be very often, so the cost is amortised.
    // - We're only talking about a few thousand elements, so the loop execution is still blisteringly fast (each iteration just reads and writes a cached mmap page and a DashMap entry). There shouldn't be any noticeable system pause/delay.
    // - We previously used an optimised loop that was subtle and confusing, which let some subtle bugs go undiscovered. The complexity is not worth the intangible performance gain.
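    // Illustrative (hypothetical small block of 6 lpages, indices 0..=5): the loop below pushes
    // lpages 5, 4, 3, 2 in that order, so lpage 2 ends up at the head of the free list and is
    // handed out first; lpage 0 is the metadata lpage and lpage 1 is returned directly below.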
    {
      let mut lpage_dev_offset = new_frontier - lpage_size;
      while lpage_dev_offset >= block_dev_offset + 2 * lpage_size {
        self
          .insert_page_into_free_list(txn, lpage_dev_offset, self.pages.lpage_size_pow2)
          .await;
        lpage_dev_offset -= lpage_size;
      }
    };
    // Mark metadata lpage as used space.
    self.metrics.incr_used_bytes(txn, self.pages.lpage_size());

    // We don't need to mark as not free as `allocate_page` will do that.
    let first_data_lpage_dev_offset = block_dev_offset + lpage_size;
    first_data_lpage_dev_offset
  }

  #[async_recursion]
  async fn allocate_page(&mut self, txn: &mut Transaction, page_size_pow2: u8) -> u64 {
    assert!(
      page_size_pow2 >= self.pages.spage_size_pow2 && page_size_pow2 <= self.pages.lpage_size_pow2
    );
    let page_dev_offset = match self
      .try_consume_page_at_free_list_head(txn, page_size_pow2)
      .await
    {
      Some(page_dev_offset) => page_dev_offset,
      None if page_size_pow2 == self.pages.lpage_size_pow2 => {
        trace!(page_size_pow2, "ran out of lpages, will allocate new block");
        // There is no lpage to break, so create a new block at the frontier.
        self.allocate_new_block_and_then_allocate_lpage(txn).await
      }
      None => {
        trace!(
          page_size_pow2,
          "ran out of pages, will allocate page of the next bigger size"
        );
        // Find or create a larger page.
        let larger_page_dev_offset = self.allocate_page(txn, page_size_pow2 + 1).await;
        // Split the larger page in two, and release the right page (we'll take the left one).
        let right_page_dev_offset = larger_page_dev_offset + (1 << page_size_pow2);
        self
          .insert_page_into_free_list(txn, right_page_dev_offset, page_size_pow2)
          .await;
        self
          .pages
          .mark_page_as_free(txn, right_page_dev_offset, page_size_pow2)
          .await;
        larger_page_dev_offset
      }
    };
    // We must always mark an allocated page as not free, including split pages, which definitely cannot be released/merged.
    self
      .pages
      .mark_page_as_not_free(txn, page_dev_offset, page_size_pow2)
      .await;
    trace!(page_size_pow2, page_dev_offset, "allocated page");
    page_dev_offset
  }

  pub async fn allocate_and_ret_with_size(
    &mut self,
    txn: &mut Transaction,
    size: u64,
  ) -> (u64, u8) {
    let pow2 = max(
      self.pages.spage_size_pow2,
      size.next_power_of_two().ilog2().try_into().unwrap(),
    );
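    // For example (illustrative numbers, assuming spage_size_pow2 == 12, i.e. 4 KiB spages):
    // size == 3000 rounds up to 2^12, size == 4096 stays at 2^12, and size == 5000 rounds up to
    // 2^13, since `next_power_of_two().ilog2()` is the ceiling of log2(size).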
    assert!(pow2 <= self.pages.lpage_size_pow2);
    // We increment these metrics here instead of in `Pages::*`, `Allocator::insert_page_into_free_list`, `Allocator::allocate_page`, etc., as many of those are called during intermediate states, like merging/splitting pages, which aren't actual allocations.
    self.metrics.incr_allocated_page_count(txn, 1);
    self.metrics.incr_used_bytes(txn, 1 << pow2);
    (self.allocate_page(txn, pow2).await, pow2)
  }

  pub async fn allocate(&mut self, txn: &mut Transaction, size: u64) -> u64 {
    self.allocate_and_ret_with_size(txn, size).await.0
  }

  #[async_recursion]
  async fn release_internal(
    &mut self,
    txn: &mut Transaction,
    page_dev_offset: u64,
    page_size_pow2: u8,
  ) {
    // Check if the buddy is also free so we can merge (recompact) the pair. This doesn't apply to lpages as they don't have buddies (they aren't split from anything larger).
    if page_size_pow2 < self.pages.lpage_size_pow2 {
      let buddy_page_dev_offset = page_dev_offset ^ (1 << page_size_pow2);
      if self
        .pages
        .is_page_free(buddy_page_dev_offset, page_size_pow2)
        .await
      {
        // Buddy is also free.
        trace!(
          page_dev_offset,
          page_size_pow2,
          buddy_page_dev_offset,
          "buddy is also free"
        );
        self
          .detach_page_from_free_list(txn, buddy_page_dev_offset, page_size_pow2)
          .await;
        self
          .pages
          .mark_page_as_not_free(txn, page_dev_offset, page_size_pow2)
          .await;
        self
          .pages
          .mark_page_as_not_free(txn, buddy_page_dev_offset, page_size_pow2)
          .await;
        // Merge by releasing the parent (larger) page.
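        // Illustrative: buddies at 0x6000 and 0x7000 (4 KiB each) merge into the 8 KiB parent at
        // floor_pow2(0x6000, 13) == floor_pow2(0x7000, 13) == 0x6000, which is then released
        // recursively one level up.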
        let parent_page_dev_offset = floor_pow2(page_dev_offset, page_size_pow2 + 1);
        self
          .release_internal(txn, parent_page_dev_offset, page_size_pow2 + 1)
          .await;
        return;
      };
    };
    // This will overwrite the page's header.
    self
      .insert_page_into_free_list(txn, page_dev_offset, page_size_pow2)
      .await;
    self
      .pages
      .mark_page_as_free(txn, page_dev_offset, page_size_pow2)
      .await;
  }

  pub async fn release(&mut self, txn: &mut Transaction, page_dev_offset: u64, page_size_pow2: u8) {
    // Similar to `allocate_and_ret_with_size`, we need to update the metrics here and delegate to an internal function; see the comment in `allocate_and_ret_with_size` for the reasoning.
    self.metrics.decr_allocated_page_count(txn, 1);
    self.metrics.decr_used_bytes(txn, 1 << page_size_pow2);
    self
      .release_internal(txn, page_dev_offset, page_size_pow2)
      .await
  }
}