#[cfg(test)]
pub mod tests;

use crate::metrics::BlobdMetrics;
use crate::page::FreePagePageHeader;
use crate::page::Pages;
use crate::page::MAX_PAGE_SIZE_POW2;
use crate::page::MIN_PAGE_SIZE_POW2;
#[cfg(test)]
use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
#[cfg(test)]
use crate::test_util::journal::TestTransaction as Transaction;
use crate::util::floor_pow2;
use crate::util::mod_pow2;
use async_recursion::async_recursion;
use futures::future::join_all;
use off64::int::create_u64_be;
use off64::int::Off64AsyncReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
#[cfg(not(test))]
use seekable_async_file::SeekableAsyncFile;
use std::cmp::max;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::debug;
use tracing::info;
use tracing::trace;
#[cfg(not(test))]
use write_journal::Transaction;

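// Layout of the persisted allocator state: a u64 heap frontier device offset,
// followed by one u64 free list head (a page device offset, or zero for an
// empty list) per page size from MIN_PAGE_SIZE_POW2 to MAX_PAGE_SIZE_POW2
// inclusive.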
const ALLOCSTATE_OFFSETOF_FRONTIER: u64 = 0;
const fn ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(page_size_pow2: u8) -> u64 {
  ALLOCSTATE_OFFSETOF_FRONTIER + 8 + 8 * ((page_size_pow2 - MIN_PAGE_SIZE_POW2) as u64)
}
pub(crate) const ALLOCSTATE_SIZE: u64 =
  ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(MAX_PAGE_SIZE_POW2 + 1);

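/// Buddy allocator over the heap region of the device. Pages come in
/// power-of-two sizes between the spage and lpage sizes; free pages of each
/// size are kept in doubly-linked lists threaded through their page headers,
/// and new blocks are carved off a monotonically advancing frontier when the
/// lists run dry.
///
/// A minimal usage sketch (illustrative only, not a doctest; `dev`, `txn`,
/// `pages`, `metrics` and the offsets are assumed to come from the
/// surrounding blobd initialisation):
///
/// ```ignore
/// let mut alloc = Allocator::load_from_device(
///   &dev, device_size, state_dev_offset, pages, metrics, heap_dev_offset,
/// ).await;
/// let (page_dev_offset, page_size_pow2) =
///   alloc.allocate_and_ret_with_size(&mut txn, 4096).await;
/// alloc.release(&mut txn, page_dev_offset, page_size_pow2).await;
/// ```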
pub(crate) struct Allocator {
  state_dev_offset: u64,
  pages: Arc<Pages>,
  metrics: Arc<BlobdMetrics>,
  // Device offset of the next block to be carved out of the heap.
  frontier_dev_offset: u64,
  // Current device size in bytes; the frontier must not advance past it.
  device_size: Arc<AtomicU64>,
  // In-memory copy of the persisted free list heads, one per page size from
  // spage to lpage; zero means the list is empty.
  free_list_head: Vec<u64>,
}

impl Allocator {
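  /// Loads the in-memory allocator from its persisted state: the heap
  /// frontier and the free list head for every page size from spage to lpage.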
  pub async fn load_from_device(
    dev: &SeekableAsyncFile,
    device_size: Arc<AtomicU64>,
    state_dev_offset: u64,
    pages: Arc<Pages>,
    metrics: Arc<BlobdMetrics>,
    heap_dev_offset: u64,
  ) -> Self {
    assert_eq!(mod_pow2(heap_dev_offset, pages.lpage_size_pow2), 0);
    let frontier_dev_offset = dev
      .read_u64_be_at(state_dev_offset + ALLOCSTATE_OFFSETOF_FRONTIER)
      .await;
    let free_list_head = join_all((pages.spage_size_pow2..=pages.lpage_size_pow2).map(|i| {
      dev.read_u64_be_at(state_dev_offset + ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(i))
    }))
    .await;
    debug!(frontier_dev_offset, "allocator loaded");
    Self {
      device_size,
      free_list_head,
      frontier_dev_offset,
      metrics,
      pages,
      state_dev_offset,
    }
  }

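  /// Writes the initial allocator state during device formatting: the
  /// frontier starts at the heap device offset and every free list is empty.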
  pub async fn format_device(dev: &SeekableAsyncFile, state_dev_offset: u64, heap_dev_offset: u64) {
    let mut raw = vec![0u8; usz!(ALLOCSTATE_SIZE)];
    raw.write_u64_be_at(ALLOCSTATE_OFFSETOF_FRONTIER, heap_dev_offset);
    dev.write_at(state_dev_offset, raw).await;
  }

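  /// Returns the device offset of the first free page of the given size, or
  /// zero if that free list is empty.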
  fn get_free_list_head(&mut self, page_size_pow2: u8) -> u64 {
    let pow2_idx = usz!(page_size_pow2 - self.pages.spage_size_pow2);
    self.free_list_head[pow2_idx]
  }

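  /// Points the free list for the given page size at a new head page, both in
  /// memory and, through the journal transaction, in the persisted state.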
  fn update_free_list_head(
    &mut self,
    txn: &mut Transaction,
    page_size_pow2: u8,
    new_head_page_dev_offset: u64,
  ) {
    let pow2_idx = usz!(page_size_pow2 - self.pages.spage_size_pow2);
    txn.write(
      self.state_dev_offset + ALLOCSTATE_OFFSETOF_PAGE_SIZE_FREE_LIST_HEAD(page_size_pow2),
      create_u64_be(new_head_page_dev_offset),
    );
    self.free_list_head[pow2_idx] = new_head_page_dev_offset;
    trace!(
      new_head_page_dev_offset,
      page_size_pow2,
      "updated free list head"
    );
  }

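  /// Unlinks a page from its doubly-linked free list by splicing its `prev`
  /// and `next` neighbours together, updating the list head if it was first.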
  async fn detach_page_from_free_list(
    &mut self,
    txn: &mut Transaction,
    page_dev_offset: u64,
    page_size_pow2: u8,
  ) {
    let hdr = self
      .pages
      .read_page_header::<FreePagePageHeader>(page_dev_offset)
      .await;
    if hdr.prev == 0 {
      self.update_free_list_head(txn, page_size_pow2, hdr.next);
    } else {
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, hdr.prev, |h| h.next = hdr.next)
        .await;
    };
    if hdr.next != 0 {
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, hdr.next, |h| h.prev = hdr.prev)
        .await;
    };
    trace!(
      page_dev_offset,
      page_size_pow2,
      "detached page from free list"
    );
  }

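  /// Pops and returns the page at the head of the free list for the given
  /// size, or `None` if that list is empty.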
  async fn try_consume_page_at_free_list_head(
    &mut self,
    txn: &mut Transaction,
    page_size_pow2: u8,
  ) -> Option<u64> {
    let page_dev_offset = self.get_free_list_head(page_size_pow2);
    if page_dev_offset == 0 {
      return None;
    };
    trace!(page_size_pow2, page_dev_offset, "found free page");
    let new_free_page = self
      .pages
      .read_page_header::<FreePagePageHeader>(page_dev_offset)
      .await
      .next;
    self.update_free_list_head(txn, page_size_pow2, new_free_page);
    if new_free_page != 0 {
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, new_free_page, |h| h.prev = 0)
        .await;
    };
    Some(page_dev_offset)
  }

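  /// Pushes a page onto the front of the free list for the given size,
  /// linking the previous head behind it.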
  async fn insert_page_into_free_list(
    &mut self,
    txn: &mut Transaction,
    page_dev_offset: u64,
    page_size_pow2: u8,
  ) {
    let cur_head = self.get_free_list_head(page_size_pow2);
    self
      .pages
      .write_page_header(txn, page_dev_offset, FreePagePageHeader {
        prev: 0,
        next: cur_head,
      });
    if cur_head != 0 {
      self
        .pages
        .update_page_header::<FreePagePageHeader>(txn, cur_head, |f| f.prev = page_dev_offset)
        .await;
    };
    self.update_free_list_head(txn, page_size_pow2, page_dev_offset);
    trace!(
      page_size_pow2,
      page_dev_offset,
      cur_head,
      "inserted page into free list"
    );
  }

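  /// Advances the frontier by one block when the lpage free list is empty.
  /// The block's first lpage is initialised but never handed out (presumably
  /// it holds the block's page metadata) and is counted as used bytes; the
  /// second lpage is returned as the freshly allocated lpage; all remaining
  /// lpages are pushed onto the lpage free list. Panics if the new frontier
  /// would exceed the device size.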
  async fn allocate_new_block_and_then_allocate_lpage(&mut self, txn: &mut Transaction) -> u64 {
    let lpage_size = 1 << self.pages.lpage_size_pow2;
    let block_dev_offset = self.frontier_dev_offset;
    let new_frontier = block_dev_offset + self.pages.block_size;
    info!(block_dev_offset, new_frontier, "allocating new block");
    if new_frontier > self.device_size.load(Ordering::Relaxed) {
      panic!("out of space");
    };
    self.metrics.incr_allocated_block_count(txn, 1);
    txn.write(
      self.state_dev_offset + ALLOCSTATE_OFFSETOF_FRONTIER,
      create_u64_be(new_frontier),
    );
    self.frontier_dev_offset = new_frontier;

    // Initialise the block's first lpage (never handed out to callers) with
    // all bits set.
    for i in (0..self.pages.lpage_size()).step_by(8) {
      txn.write_with_overlay(block_dev_offset + i, create_u64_be(u64::MAX));
    }

    {
      // Push every lpage except the first two onto the lpage free list; the
      // first lpage stays reserved and the second is returned as the freshly
      // allocated lpage.
      let mut lpage_dev_offset = new_frontier - lpage_size;
      while lpage_dev_offset >= block_dev_offset + 2 * lpage_size {
        self
          .insert_page_into_free_list(txn, lpage_dev_offset, self.pages.lpage_size_pow2)
          .await;
        lpage_dev_offset -= lpage_size;
      }
    };
    // Account for the reserved first lpage, which is permanently in use.
    self.metrics.incr_used_bytes(txn, self.pages.lpage_size());

    let first_data_lpage_dev_offset = block_dev_offset + lpage_size;
    first_data_lpage_dev_offset
  }

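  /// Allocates one page of exactly the requested power-of-two size. If the
  /// free list for that size is empty, a page of the next size up is
  /// allocated and split into two buddies: the right half goes onto this
  /// size's free list and the left half is returned. At the lpage size, an
  /// empty free list triggers allocation of a whole new block instead.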
  #[async_recursion]
  async fn allocate_page(&mut self, txn: &mut Transaction, page_size_pow2: u8) -> u64 {
    assert!(
      page_size_pow2 >= self.pages.spage_size_pow2 && page_size_pow2 <= self.pages.lpage_size_pow2
    );
    let page_dev_offset = match self
      .try_consume_page_at_free_list_head(txn, page_size_pow2)
      .await
    {
      Some(page_dev_offset) => page_dev_offset,
      None if page_size_pow2 == self.pages.lpage_size_pow2 => {
        trace!(page_size_pow2, "ran out of lpages, will allocate new block");
        self.allocate_new_block_and_then_allocate_lpage(txn).await
      }
      None => {
        trace!(
          page_size_pow2,
          "ran out of pages, will allocate page of the next bigger size"
        );
        let larger_page_dev_offset = self.allocate_page(txn, page_size_pow2 + 1).await;
        // Split the larger page into two buddies: keep the left half and put
        // the right half on this size's free list.
        let right_page_dev_offset = larger_page_dev_offset + (1 << page_size_pow2);
        self
          .insert_page_into_free_list(txn, right_page_dev_offset, page_size_pow2)
          .await;
        self
          .pages
          .mark_page_as_free(txn, right_page_dev_offset, page_size_pow2)
          .await;
        larger_page_dev_offset
      }
    };
    self
      .pages
      .mark_page_as_not_free(txn, page_dev_offset, page_size_pow2)
      .await;
    trace!(page_size_pow2, page_dev_offset, "allocated page");
    page_dev_offset
  }

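  /// Allocates the smallest page that can hold `size` bytes, clamped to at
  /// least the spage size, and returns its device offset together with the
  /// chosen page size as a power of two; allocation metrics are updated here.
  /// For example (assuming an spage size of 2^13 or smaller), a 5000-byte
  /// request rounds up to a 2^13 = 8192-byte page.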
  pub async fn allocate_and_ret_with_size(
    &mut self,
    txn: &mut Transaction,
    size: u64,
  ) -> (u64, u8) {
    let pow2 = max(
      self.pages.spage_size_pow2,
      size.next_power_of_two().ilog2().try_into().unwrap(),
    );
    assert!(pow2 <= self.pages.lpage_size_pow2);
    self.metrics.incr_allocated_page_count(txn, 1);
    self.metrics.incr_used_bytes(txn, 1 << pow2);
    (self.allocate_page(txn, pow2).await, pow2)
  }

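  /// Like [`Self::allocate_and_ret_with_size`], but returns only the page
  /// device offset.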
  pub async fn allocate(&mut self, txn: &mut Transaction, size: u64) -> u64 {
    self.allocate_and_ret_with_size(txn, size).await.0
  }

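  /// Returns a page to the free lists. If its buddy (the sibling at
  /// `page_dev_offset ^ (1 << page_size_pow2)`) is also free, the buddy is
  /// detached from its free list, the two halves are merged, and the parent
  /// page is released one size up, recursively, until the lpage size is
  /// reached or a buddy is still in use.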
  #[async_recursion]
  async fn release_internal(
    &mut self,
    txn: &mut Transaction,
    page_dev_offset: u64,
    page_size_pow2: u8,
  ) {
    if page_size_pow2 < self.pages.lpage_size_pow2 {
      // The buddy is the other half of the parent page one size up.
      let buddy_page_dev_offset = page_dev_offset ^ (1 << page_size_pow2);
      if self
        .pages
        .is_page_free(buddy_page_dev_offset, page_size_pow2)
        .await
      {
        trace!(
          page_dev_offset,
          page_size_pow2,
          buddy_page_dev_offset,
          "buddy is also free"
        );
        // Merge with the buddy: detach it from its free list, clear both
        // halves' free bits, and release the parent page one size up instead.
        self
          .detach_page_from_free_list(txn, buddy_page_dev_offset, page_size_pow2)
          .await;
        self
          .pages
          .mark_page_as_not_free(txn, page_dev_offset, page_size_pow2)
          .await;
        self
          .pages
          .mark_page_as_not_free(txn, buddy_page_dev_offset, page_size_pow2)
          .await;
        let parent_page_dev_offset = floor_pow2(page_dev_offset, page_size_pow2 + 1);
        self
          .release_internal(txn, parent_page_dev_offset, page_size_pow2 + 1)
          .await;
        return;
      };
    };
    // No merge possible: put the page on its free list and mark it free.
    self
      .insert_page_into_free_list(txn, page_dev_offset, page_size_pow2)
      .await;
    self
      .pages
      .mark_page_as_free(txn, page_dev_offset, page_size_pow2)
      .await;
  }

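  /// Releases a previously allocated page, merging buddies where possible,
  /// and updates allocation metrics.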
  pub async fn release(&mut self, txn: &mut Transaction, page_dev_offset: u64, page_size_pow2: u8) {
    self.metrics.decr_allocated_page_count(txn, 1);
    self.metrics.decr_used_bytes(txn, 1 << page_size_pow2);
    self
      .release_internal(txn, page_dev_offset, page_size_pow2)
      .await
  }
}