1use crate::{
2 address::segment::{SEGMENTS_ROOT_PAGE_VERSION, SEGMENTS_ROOT_PAGE_VERSION_0},
3 allocator::{cache::Cache, free_list::FreeList},
4 config::Config,
5 device::{Device, Page, PageOps, ReadPage, UpdateList},
6 error::PERes,
7 flush_checksum::{double_buffer_check, write_root_page},
8 snapshot::data::PendingClean,
9 util::io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat},
10};
11use std::{io::Write, sync::Arc, sync::Mutex};
12
13mod cache;
14pub(crate) mod free_list;
15#[cfg(test)]
16mod tests;
17
/// Size exponent used when creating the allocator's own root page.
const ALLOCATOR_PAGE_EXP: u8 = 10;
/// Legacy on-disk format version of the allocator root page.
const ALLOCATOR_ROOT_PAGE_VERSION_V0: u8 = 0;
/// Current on-disk format version of the allocator root page.
const ALLOCATOR_ROOT_PAGE_VERSION_V1: u8 = 1;
/// Version byte written for newly flushed allocator roots.
const ALLOCATOR_ROOT_PAGE_VERSION: u8 = ALLOCATOR_ROOT_PAGE_VERSION_V1;
22
/// Snapshot of a dirty root page, taken under the root-monitor lock so the
/// actual device write can happen after the lock is released.
struct RootWriteInfo {
    /// Index of the root page to rewrite.
    page: u64,
    /// Serialized root content to persist.
    buffer: Vec<u8>,
    /// Format version byte to write with the content.
    version: u8,
}
28
/// Rotation state for the address root's indirect content pages
/// (see `write_root_page_info`: the two page ids are swapped on each flush).
#[derive(Clone, Default)]
struct AddressData {
    // Page currently holding the address-root content.
    page: u64,
    // Previously used content page; reused (or freed and reallocated) on the
    // next flush. 0 means no previous page exists yet.
    other_page: u64,
}
34
/// In-memory state of one root page awaiting flush to the device.
#[derive(Default)]
pub struct RootPageHolder {
    // Index of the root page on the device.
    page: u64,
    // Latest serialized root content; `None` until first read or staged write.
    buffer: Option<Vec<u8>>,
    // True when `buffer` holds changes not yet written to the device.
    dirty: bool,
    // Format version byte for this root page.
    version: u8,
}
42
43impl RootPageHolder {
44 fn write_data(&mut self) -> Option<RootWriteInfo> {
45 if let Some(buff) = &self.buffer {
46 if self.dirty {
47 self.dirty = false;
48 return Some(RootWriteInfo {
49 page: self.page,
50 buffer: buff.clone(),
51 version: self.version,
52 });
53 }
54 }
55 None
56 }
57}
58
/// Tracks the alternating flush counter of one double-buffered root record.
#[derive(Default)]
pub struct Counter {
    // Last value returned by `write_root_page` / `double_buffer_check`; fed
    // back on the next flush to pick the other half of the double buffer.
    flush_counter: u8,
}
/// Flush counters for each of the three root pages; the address root
/// additionally tracks its content-page rotation state.
#[derive(Default)]
struct FlushCount {
    // Double-buffer counter for the free-list root.
    free_list: Counter,
    // Double-buffer counter for the journal root.
    journal: Counter,
    // Counter plus current/previous content-page ids for the address root.
    address: (Counter, AddressData),
}
69
/// The three root-page holders, guarded together behind a single mutex.
#[derive(Default)]
struct RootMonitor {
    // Staged state of the free-list root page.
    free_list_holder: RootPageHolder,
    // Staged state of the journal root page.
    journal_holder: RootPageHolder,
    // Staged state of the address (segments) root page.
    address_holder: RootPageHolder,
}
78impl RootMonitor {
79 fn is_dirty(&self) -> bool {
80 self.free_list_holder.dirty || self.journal_holder.dirty || self.address_holder.dirty
81 }
82}
83
/// Cleanups queued between syncs; drained (and dropped) by `device_sync`.
#[derive(Default)]
struct ReleaseNextSync {
    // Pending cleanups to drop after the next successful device sync.
    to_release: Vec<Arc<PendingClean>>,
}
88
/// Page allocator layered over a [`Device`]: page caching, free-list
/// management, and double-buffered root-page flushing.
pub struct Allocator {
    // Underlying storage device; owns all page I/O.
    device: Box<dyn Device>,
    // Free pages indexed by size exponent.
    free_list: Mutex<FreeList>,
    // Read cache of recently loaded pages.
    cache: Mutex<Cache>,
    // Staged (dirty) state of the three root pages.
    root_monitor: Mutex<RootMonitor>,
    // Double-buffer flush counters for the root pages.
    flush_count: Mutex<FlushCount>,
    // Cleanups queued for release at the next sync.
    release_next_sync: Mutex<ReleaseNextSync>,
    // Index of the allocator's own root (free-list) page.
    page: u64,
}
99
impl Allocator {
    /// Opens an allocator whose root (free-list) page already exists at `page`.
    ///
    /// Reads the free list from the root page, validates/cleans it against the
    /// device, and builds the page cache from the configured limits.
    pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PERes<Self> {
        let mut root_monitor = RootMonitor::default();
        let mut flush_count = FlushCount::default();
        let mut pg = dr.load_page(page)?;
        // FreeList::read also initializes the free-list root holder and its flush counter.
        let mut freelist = FreeList::read(&mut pg, &mut root_monitor.free_list_holder, &mut flush_count.free_list)?;
        freelist.check_and_clean(&*dr)?;

        let cache_size = config.cache_size();
        let cache_age_limit = config.cache_age_limit();
        Ok(Allocator {
            device: dr,
            free_list: Mutex::new(freelist),
            cache: Mutex::new(Cache::new(cache_size, cache_age_limit)),
            root_monitor: Mutex::new(root_monitor),
            flush_count: Mutex::new(flush_count),
            release_next_sync: Default::default(),
            page,
        })
    }

    /// Creates a brand-new allocator root page holding an empty free list,
    /// flushes it, and returns the root page index with the opened allocator.
    pub fn init(dr: Box<dyn Device>, config: &Config) -> PERes<(u64, Allocator)> {
        let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut list = FreeList::default();
        let mut counter = Counter::default();
        let buffer = list.write_list();
        Allocator::write_root_page(&mut page, &mut counter, buffer.to_vec(), ALLOCATOR_ROOT_PAGE_VERSION)?;
        dr.flush_page(&page)?;
        let allocate_page = page.get_index();
        Ok((allocate_page, Allocator::new(dr, config, allocate_page)?))
    }

    /// Loads `page` unless it does not exist or is marked free, consulting the
    /// cache first and caching the page on a device hit.
    pub fn load_page_not_free(&self, page: u64) -> PERes<Option<ReadPage>> {
        {
            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
            if let Some(pg) = cache.get(page) {
                if pg.is_free()? {
                    return Ok(None);
                } else {
                    return Ok(Some(pg));
                }
            }
        }
        if let Some(load) = self.device.load_page_if_exists(page)? {
            if load.is_free()? {
                Ok(None)
            } else {
                let mut cache = self.cache.lock().expect("cache lock is not poisoned");
                cache.put(page, load.clone_read());
                Ok(Some(load))
            }
        } else {
            Ok(None)
        }
    }

    /// Queues `to_release` to be dropped after the next successful `device_sync`.
    pub(crate) fn to_release_next_sync(&self, to_release: Arc<PendingClean>) {
        self.release_next_sync
            .lock()
            .expect("next sync lock not poisoned")
            .to_release
            .push(to_release);
    }

    /// Loads a page the caller expects to be allocated (non-free).
    pub fn load_page(&self, page: u64) -> PERes<ReadPage> {
        let load = self.read_page_int(page)?;
        // Debug-only sanity check; the free flag is not evaluated in release builds.
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Loads a writable copy of a page the caller expects to be allocated.
    pub fn write_page(&self, page: u64) -> PERes<Page> {
        let load = self.write_page_int(page)?;
        // Debug-only sanity check; the free flag is not evaluated in release builds.
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Cache-through read: hit the cache, otherwise load from the device and cache it.
    fn read_page_int(&self, page: u64) -> PERes<ReadPage> {
        {
            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
            if let Some(pg) = cache.get(page) {
                return Ok(pg);
            }
        }
        let load = self.device.load_page(page)?;
        {
            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
            cache.put(page, load.clone_read());
        }
        Ok(load)
    }

    /// Like `read_page_int` but returns a writable clone of the page.
    fn write_page_int(&self, page: u64) -> PERes<Page> {
        let cache_result;
        {
            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
            cache_result = cache.get(page);
        }
        if let Some(pg) = cache_result {
            return Ok(pg.clone_write());
        }
        let load = self.device.load_page(page)?;
        {
            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
            cache.put(page, load.clone_read());
        }
        Ok(load.clone_write())
    }

    /// Allocates a page of size exponent `exp`, reusing a free-list entry when
    /// one is available, otherwise creating a fresh page on the device.
    pub fn allocate(&self, exp: u8) -> PERes<Page> {
        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
        let page = fl.get_next_available(exp);
        // 0 is the "no free page available" sentinel.
        if page != 0u64 {
            let next = self.device.mark_allocated(page)?;
            fl.set_next_available_if_match(exp, page, next);
            {
                // Evict any stale cached copy that still carries the free marker.
                let mut cache = self.cache.lock().expect("cache lock is not poisoned");
                cache.remove(page);
            }
            Ok(Page::new_alloc(page, exp))
        } else {
            self.device.create_page(exp)
        }
    }

    /// Flushes a journal page and evicts it from the cache.
    pub fn flush_journal(&self, page: &Page) -> PERes<()> {
        self.device.flush_page(page)?;
        let mut cache = self.cache.lock().expect("cache lock is not poisoned");
        cache.remove(page.get_index());
        Ok(())
    }

    /// Flushes a page to the device and caches its read-only form.
    pub fn flush_page(&self, page: Page) -> PERes<()> {
        self.device.flush_page(&page)?;
        {
            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
            cache.put(page.get_index(), page.make_read());
        }
        Ok(())
    }

    /// Unlinks `page` from the free list for size exponent `exp` and clears its
    /// free marker. A page that is not actually free is left untouched.
    pub fn remove_from_free(&self, page: u64, exp: u8) -> PERes<()> {
        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
        let mut pg = self.device.load_free_page(page)?;
        if pg.is_free()? {
            if pg.get_prev_free() == 0 {
                // Head of the list: repoint the list head to the next entry.
                fl.set_free(exp, pg.get_next_free());
            } else {
                // Middle of the list: stitch prev and next together.
                // NOTE(review): `next_free` is loaded unconditionally here —
                // confirm it can never be 0 for an entry with prev_free != 0,
                // or that load_free_page(0) is valid.
                let mut next = self.device.load_free_page(pg.get_next_free())?;
                next.set_prev_free(pg.get_prev_free());
                self.device.flush_free_page(&next)?;
                let mut prev = self.device.load_free_page(pg.get_prev_free())?;
                prev.set_next_free(pg.get_next_free());
                self.device.flush_free_page(&prev)?;
            }
            pg.set_free(false)?;
            self.device.flush_free_page(&pg)?;
        } else {
            // Not free: nothing to unlink.
        }
        Ok(())
    }

    /// Recovery path: if `page` loads and is not marked free, free it; if it is
    /// already marked free, re-register it with the in-memory free list.
    /// A page that fails to load is silently skipped (best-effort recovery).
    pub fn recover_free(&self, page: u64) -> PERes<()> {
        if let Ok(p) = self.device.load_free_page(page) {
            if !p.is_free()? {
                self.free(page)?;
            } else {
                self.free_list
                    .lock()
                    .expect("free list lock not poisoned")
                    .recover_free(p)?;
            }
        }
        Ok(())
    }
    /// Recovery path: validate/clean the free list, then run a full device sync.
    pub fn recover_sync(&self) -> PERes<bool> {
        self.free_list
            .lock()
            .expect("free list lock not poisoned")
            .check_and_clean(&*self.device)?;
        self.device_sync()
    }
    /// Hands the free list to the device so trailing free pages can be trimmed.
    pub fn trim_free_at_end(&self) -> PERes<()> {
        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
        let list: &mut FreeList = &mut fl;
        self.device.trim_end_pages(list)?;
        Ok(())
    }

    /// Frees a batch of pages: evicts them from the cache, then trims or
    /// free-lists each one on the device.
    pub fn free_pages(&self, pages: &[u64]) -> PERes<()> {
        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
        let list: &mut FreeList = &mut fl;
        self.cache.lock().expect("cache lock is not poisoned").remove_all(pages);
        for page in pages {
            self.device.trim_or_free_page(*page, list)?;
        }
        Ok(())
    }

    /// Frees a single page; see [`Allocator::free_pages`].
    pub fn free(&self, page: u64) -> PERes<()> {
        self.free_pages(&[page])
    }

    /// Stages the current free list into its root holder if it changed since
    /// the last flush; the actual device write happens in `device_sync`.
    pub fn flush_free_list(&self) -> PERes<()> {
        let mut lock = self.free_list.lock().expect("free list lock not poisoned");
        if lock.is_changed() {
            let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
            let page = self.device.load_page(self.page)?.clone_write();
            let mut buffer = lock.write_list().to_vec();
            let holder = &mut monitor.free_list_holder;
            self.write_root(page.get_index(), holder, &mut buffer, ALLOCATOR_ROOT_PAGE_VERSION)?;
            lock.reset_changed_flag();
        }
        Ok(())
    }

    /// Stages new content for the address root page (persisted at next sync).
    pub fn write_address_root(&self, root: u64, buffer: &mut [u8], version: u8) -> PERes<()> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        self.write_root(root, &mut monitor.address_holder, buffer, version)
    }
    /// Stages new content for the journal root page (persisted at next sync).
    pub fn write_journal_root(&self, root: Page, buffer: &mut [u8], version: u8) -> PERes<()> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        self.write_root(root.get_index(), &mut monitor.journal_holder, buffer, version)
    }

    /// Records `buffer` in `holder` and marks it dirty; performs no device I/O.
    fn write_root(&self, root: u64, holder: &mut RootPageHolder, buffer: &mut [u8], version: u8) -> PERes<()> {
        holder.page = root;
        holder.version = version;
        holder.buffer = Some(Vec::from(buffer));
        holder.dirty = true;
        Ok(())
    }

    /// Writes `buffer` into `root` via the double-buffer protocol and stores
    /// the resulting flush counter in `holder`.
    fn write_root_page(root: &mut Page, holder: &mut Counter, mut buffer: Vec<u8>, version: u8) -> PERes<()> {
        let last_flush = holder.flush_counter;
        let order = write_root_page(root, &mut buffer, version, last_flush)?;
        holder.flush_counter = order;
        Ok(())
    }
    /// Persists one staged root page.
    ///
    /// Without `ad`, the content is written directly into the root page. With
    /// `ad` (the address root), the content goes to a separate content page and
    /// the root records the (current, previous) content-page ids, rotating the
    /// two pages on each flush.
    fn write_root_page_info(
        &self,
        mut info: RootWriteInfo,
        holder: &mut Counter,
        ad: Option<&mut AddressData>,
    ) -> PERes<()> {
        let mut root = self.write_page(info.page)?;
        let last_flush = holder.flush_counter;
        let order = if let Some(bp) = ad {
            let exp = self.exp_from_content_size(info.buffer.len() as u64);
            let mut content_page = if bp.other_page == 0 {
                self.allocate(exp)?
            } else {
                // Reuse the older of the two content pages, reallocating only
                // if the required size exponent changed.
                let mut page = self.write_page(bp.other_page)?;
                if page.get_size_exp() != exp {
                    self.free(bp.other_page)?;
                    page = self.allocate(exp)?;
                }
                page
            };
            let content_page_id = content_page.get_index();
            content_page.write_all(&info.buffer)?;
            self.flush_page(content_page)?;

            // 19-byte record: only the first 16 bytes (two u64 ids) are filled
            // here; the size matches what `read_address_buffer` reads back.
            let mut root_buffer = [0; 19];
            write_u64(&mut root_buffer[0..8], content_page_id);
            write_u64(&mut root_buffer[8..16], bp.page);
            let result = write_root_page(&mut root, &mut root_buffer, info.version, last_flush)?;
            // Rotate: the page just written becomes current; the old current
            // becomes the reuse candidate for the next flush.
            bp.other_page = bp.page;
            bp.page = content_page_id;
            result
        } else {
            write_root_page(&mut root, &mut info.buffer, info.version, last_flush)?
        };
        self.flush_page(root)?;
        holder.flush_counter = order;
        Ok(())
    }

    /// Delegates to the device: size exponent suited to `size` bytes of content.
    pub fn exp_from_content_size(&self, size: u64) -> u8 {
        self.device.exp_from_content_size(size)
    }

    /// Reads the journal root record from `page`, picking the valid half of
    /// the double buffer and priming the journal holder and counter.
    pub fn read_root_journal(&self, page: &mut ReadPage, buffer_size: usize) -> Vec<u8> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
        Allocator::read_root_page_int(
            page,
            buffer_size,
            &mut monitor.journal_holder,
            &mut counter_monitor.journal,
        )
    }

    /// Reads the address root record from `page`, picking the valid half of
    /// the double buffer and priming the address holder and counter.
    pub fn read_root_address(&self, page: &mut ReadPage, buffer_size: usize) -> Vec<u8> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
        Allocator::read_root_page_int(
            page,
            buffer_size,
            &mut monitor.address_holder,
            &mut counter_monitor.address.0,
        )
    }
    /// Registers a freshly created page as the (empty) address root; it is
    /// written out on the next sync.
    pub fn create_address_root(&self, page: Page) -> PERes<()> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        monitor.address_holder.page = page.get_index();
        monitor.address_holder.version = SEGMENTS_ROOT_PAGE_VERSION;
        monitor.address_holder.buffer = Some(Vec::new());
        monitor.address_holder.dirty = true;
        Ok(())
    }

    /// Loads the address root at `page` (indirect layout: the root stores the
    /// id of a content page) and returns the content buffer, or `None` when no
    /// content page has been written yet.
    ///
    /// Also primes the address holder, flush counter, and content-page
    /// rotation state so later flushes continue the double-buffer sequence.
    ///
    /// # Panics
    /// Panics on an unknown address-root version byte.
    pub fn read_address_buffer(&self, page: u64) -> PERes<Option<Vec<u8>>> {
        let mut root = self.load_page(page)?;
        match root.read_u8() {
            SEGMENTS_ROOT_PAGE_VERSION_0 => {
                let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
                monitor.address_holder.page = page;
                monitor.address_holder.dirty = false;
                monitor.address_holder.version = SEGMENTS_ROOT_PAGE_VERSION_0;
                let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
                // Two 19-byte records: the double-buffered entries written by
                // `write_root_page_info`.
                let mut buffer_0 = vec![0; 19];
                let mut buffer_1 = vec![0; 19];
                InfallibleRead::read_exact(&mut root, &mut buffer_0);
                InfallibleRead::read_exact(&mut root, &mut buffer_1);
                let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
                let buffer = if first { buffer_0 } else { buffer_1 };
                counter_monitor.address.0.flush_counter = flush;
                let page_id = read_u64(&buffer[0..8]);
                let other_page_id = read_u64(&buffer[8..16]);
                counter_monitor.address.1.page = page_id;
                counter_monitor.address.1.other_page = other_page_id;
                if page_id != 0 {
                    let page = self.load_page(page_id)?;
                    let buffer = page.content();
                    monitor.address_holder.buffer = Some(buffer.clone());
                    Ok(Some(buffer))
                } else {
                    monitor.address_holder.buffer = None;
                    Ok(None)
                }
            }
            _ => panic!("version not supported"),
        }
    }

    /// Reads both halves of a double-buffered root record, returns the valid
    /// one, and stores it (plus the flush counter) in the holder.
    fn read_root_page_int(
        page: &mut ReadPage,
        buffer_size: usize,
        holder: &mut RootPageHolder,
        counter: &mut Counter,
    ) -> Vec<u8> {
        let mut buffer_0 = vec![0; buffer_size];
        let mut buffer_1 = vec![0; buffer_size];
        InfallibleRead::read_exact(page, &mut buffer_0);
        InfallibleRead::read_exact(page, &mut buffer_1);
        let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
        let buffer = if first { buffer_0 } else { buffer_1 };
        holder.buffer = Some(buffer.clone());
        counter.flush_counter = flush;
        buffer
    }

    /// Alias of [`Allocator::flush_page`] used for root pages.
    pub fn flush_root_page(&self, page: Page) -> PERes<()> {
        self.flush_page(page)
    }

    /// Direct access to the underlying device.
    pub fn device(&self) -> &dyn Device {
        &*self.device
    }

    /// Full sync: stage the free list, write every dirty root page, sync the
    /// device, then drop everything queued via `to_release_next_sync`.
    ///
    /// Returns `true` when nothing had been queued for release.
    pub fn device_sync(&self) -> PERes<bool> {
        self.flush_free_list()?;
        let free_list_data;
        let journal_data;
        let address_data;
        {
            // Snapshot dirty roots under the monitor lock; write them outside it.
            let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
            free_list_data = monitor.free_list_holder.write_data();
            journal_data = monitor.journal_holder.write_data();
            address_data = monitor.address_holder.write_data();
        }
        {
            let mut fm = self.flush_count.lock().expect("flush count lock not poisoned");
            if let Some(info) = free_list_data {
                self.write_root_page_info(info, &mut fm.free_list, None)?;
            }
            if let Some(info) = journal_data {
                self.write_root_page_info(info, &mut fm.journal, None)?;
            }
            if let Some(info) = address_data {
                // Only the address root uses indirect (rotated) content pages.
                let (counter, data) = &mut fm.address;
                self.write_root_page_info(info, counter, Some(data))?;
            }
            self.device.sync()?;
        }
        // Dropping these Arcs after a successful sync presumably performs the
        // pending cleanup in PendingClean's Drop — confirm against snapshot code.
        let result = std::mem::take(
            &mut self
                .release_next_sync
                .lock()
                .expect("next sync lock not poisoned")
                .to_release,
        );

        Ok(result.is_empty())
    }

    /// True when `device_sync` has work to do: a dirty root page or queued
    /// pending-clean releases.
    pub fn need_sync(&self) -> bool {
        self.root_monitor
            .lock()
            .expect("root monitor lock not poisoned")
            .is_dirty()
            || !self
                .release_next_sync
                .lock()
                .expect("release next sync lock not poisoned")
                .to_release
                .is_empty()
    }

    /// Consumes the allocator and hands back the underlying device.
    pub fn release(self) -> Box<dyn Device> {
        self.device
    }

    // Test helper: releases the device's file lock.
    #[cfg(test)]
    pub fn free_file_lock(&self) -> PERes<()> {
        self.device.release_file_lock()
    }

    /// Inspection helper: raw state (index, size exponent, free flag) of a
    /// page, bypassing the cache; `None` when the page does not exist or
    /// fails to load.
    #[cfg(feature = "experimental_inspect")]
    pub fn page_state(&self, page: u64) -> Option<crate::inspect::PageState> {
        if let Ok(Some(p)) = self.device.load_page_if_exists(page) {
            Some(crate::inspect::PageState::new(
                p.get_index(),
                p.get_size_exp(),
                p.is_free().unwrap_or(false),
            ))
        } else {
            None
        }
    }
}