persy/
allocator.rs

1use crate::{
2    address::segment::{SEGMENTS_ROOT_PAGE_VERSION, SEGMENTS_ROOT_PAGE_VERSION_0},
3    allocator::{cache::Cache, free_list::FreeList},
4    config::Config,
5    device::{Device, Page, PageOps, ReadPage, UpdateList},
6    error::PERes,
7    flush_checksum::{double_buffer_check, write_root_page},
8    snapshot::data::PendingClean,
9    util::io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat},
10};
11use std::{io::Write, sync::Arc, sync::Mutex};
12
13mod cache;
14pub(crate) mod free_list;
15#[cfg(test)]
16mod tests;
17
// Size exponent of the allocator root page: 2^10 bytes.
const ALLOCATOR_PAGE_EXP: u8 = 10; // 2^10
// Known on-disk format versions of the allocator root page.
const ALLOCATOR_ROOT_PAGE_VERSION_V0: u8 = 0;
const ALLOCATOR_ROOT_PAGE_VERSION_V1: u8 = 1;
// Version tagged onto newly written allocator root pages.
const ALLOCATOR_ROOT_PAGE_VERSION: u8 = ALLOCATOR_ROOT_PAGE_VERSION_V1;
22
/// Snapshot of a dirty root page, taken under the root monitor lock and
/// later written to disk by `write_root_page_info` during `device_sync`.
struct RootWriteInfo {
    page: u64,       // index of the root page to rewrite
    buffer: Vec<u8>, // payload to store in the root page
    version: u8,     // on-disk format version to tag the page with
}
28
/// Ids of the double-buffered content pages backing the address root:
/// `page` holds the most recently written payload, `other_page` the
/// previous generation (reused or freed on the next write).
#[derive(Clone, Default)]
struct AddressData {
    page: u64,
    other_page: u64,
}
34
/// In-memory state of one root page: the latest payload and whether it
/// still has to be flushed to disk.
#[derive(Default)]
pub struct RootPageHolder {
    page: u64,               // index of the root page on the device
    buffer: Option<Vec<u8>>, // latest payload; None when never set
    dirty: bool,             // true when `buffer` is not yet persisted
    version: u8,             // on-disk format version of the payload
}
42
43impl RootPageHolder {
44    fn write_data(&mut self) -> Option<RootWriteInfo> {
45        if let Some(buff) = &self.buffer {
46            if self.dirty {
47                self.dirty = false;
48                return Some(RootWriteInfo {
49                    page: self.page,
50                    buffer: buff.clone(),
51                    version: self.version,
52                });
53            }
54        }
55        None
56    }
57}
58
/// Holds the flush counter passed to and returned by
/// `flush_checksum::write_root_page` for one root page.
#[derive(Default)]
pub struct Counter {
    flush_counter: u8,
}
/// Per-root flush counters; the address root additionally carries the ids
/// of its double-buffered content pages (`AddressData`).
#[derive(Default)]
struct FlushCount {
    free_list: Counter,
    journal: Counter,
    address: (Counter, AddressData),
}
69
// Root pages monitor: prevents writing a root page before the corresponding
// backup root page has been flushed.
#[derive(Default)]
struct RootMonitor {
    free_list_holder: RootPageHolder,
    journal_holder: RootPageHolder,
    address_holder: RootPageHolder,
}
78impl RootMonitor {
79    fn is_dirty(&self) -> bool {
80        self.free_list_holder.dirty || self.journal_holder.dirty || self.address_holder.dirty
81    }
82}
83
/// Pending snapshot cleanups queued to be dropped right after the next
/// successful device sync (see `device_sync`).
#[derive(Default)]
struct ReleaseNextSync {
    to_release: Vec<Arc<PendingClean>>,
}
88
// TODO: Manage defragmentation by merging/splitting pages in the free list
/// Page allocator: hands out and frees device pages, caches page reads, and
/// coordinates flushing of the free-list, journal and address root pages.
pub struct Allocator {
    device: Box<dyn Device>,          // underlying storage device
    free_list: Mutex<FreeList>,       // reusable pages, grouped by size exponent
    cache: Mutex<Cache>,              // read cache of recently loaded pages
    root_monitor: Mutex<RootMonitor>, // root page payloads pending flush
    flush_count: Mutex<FlushCount>,   // double-buffer flush counters per root
    release_next_sync: Mutex<ReleaseNextSync>, // cleanups deferred to next sync
    page: u64,                        // index of the allocator root page
}
99
100impl Allocator {
    /// Loads an existing allocator whose free-list root page is at `page`.
    ///
    /// Reads the free list out of the root page (populating the free-list
    /// holder and flush counter as a side effect), lets the free list check
    /// and clean its on-disk state, and sizes the page cache from `config`.
    pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PERes<Self> {
        let mut root_monitor = RootMonitor::default();
        let mut flush_count = FlushCount::default();
        let mut pg = dr.load_page(page)?;
        let mut freelist = FreeList::read(&mut pg, &mut root_monitor.free_list_holder, &mut flush_count.free_list)?;
        // NOTE(review): presumably repairs a half-completed free-list update
        // left by a crash — confirm against FreeList::check_and_clean.
        freelist.check_and_clean(&*dr)?;

        let cache_size = config.cache_size();
        let cache_age_limit = config.cache_age_limit();
        Ok(Allocator {
            device: dr,
            free_list: Mutex::new(freelist),
            cache: Mutex::new(Cache::new(cache_size, cache_age_limit)),
            root_monitor: Mutex::new(root_monitor),
            flush_count: Mutex::new(flush_count),
            release_next_sync: Default::default(),
            page,
        })
    }
120
    /// Bootstraps a brand new allocator: creates a root page, writes an
    /// empty free list into it, flushes it, then loads the allocator back
    /// through [`Allocator::new`].
    ///
    /// Returns the root page index together with the ready allocator.
    pub fn init(dr: Box<dyn Device>, config: &Config) -> PERes<(u64, Allocator)> {
        let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut list = FreeList::default();
        let mut counter = Counter::default();
        let buffer = list.write_list();
        Allocator::write_root_page(&mut page, &mut counter, buffer.to_vec(), ALLOCATOR_ROOT_PAGE_VERSION)?;
        dr.flush_page(&page)?;
        let allocate_page = page.get_index();
        Ok((allocate_page, Allocator::new(dr, config, allocate_page)?))
    }
131
132    pub fn load_page_not_free(&self, page: u64) -> PERes<Option<ReadPage>> {
133        {
134            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
135            if let Some(pg) = cache.get(page) {
136                if pg.is_free()? {
137                    return Ok(None);
138                } else {
139                    return Ok(Some(pg));
140                }
141            }
142        }
143        if let Some(load) = self.device.load_page_if_exists(page)? {
144            if load.is_free()? {
145                Ok(None)
146            } else {
147                let mut cache = self.cache.lock().expect("cache lock is not poisoned");
148                cache.put(page, load.clone_read());
149                Ok(Some(load))
150            }
151        } else {
152            Ok(None)
153        }
154    }
155
156    pub(crate) fn to_release_next_sync(&self, to_release: Arc<PendingClean>) {
157        self.release_next_sync
158            .lock()
159            .expect("next sync lock not poisoned")
160            .to_release
161            .push(to_release);
162    }
163
    /// Loads a page for reading, going through the cache.
    ///
    /// The page is expected to be allocated; in debug builds loading a page
    /// marked free trips an assertion.
    pub fn load_page(&self, page: u64) -> PERes<ReadPage> {
        let load = self.read_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }
169
    /// Loads a page for writing, going through the cache.
    ///
    /// The page is expected to be allocated; in debug builds loading a page
    /// marked free trips an assertion.
    pub fn write_page(&self, page: u64) -> PERes<Page> {
        let load = self.write_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }
175
176    fn read_page_int(&self, page: u64) -> PERes<ReadPage> {
177        {
178            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
179            if let Some(pg) = cache.get(page) {
180                return Ok(pg);
181            }
182        }
183        let load = self.device.load_page(page)?;
184        {
185            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
186            cache.put(page, load.clone_read());
187        }
188        Ok(load)
189    }
190
191    fn write_page_int(&self, page: u64) -> PERes<Page> {
192        let cache_result;
193        {
194            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
195            cache_result = cache.get(page);
196        }
197        if let Some(pg) = cache_result {
198            return Ok(pg.clone_write());
199        }
200        let load = self.device.load_page(page)?;
201        {
202            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
203            cache.put(page, load.clone_read());
204        }
205        Ok(load.clone_write())
206    }
207
208    pub fn allocate(&self, exp: u8) -> PERes<Page> {
209        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
210        let page = fl.get_next_available(exp);
211        if page != 0u64 {
212            let next = self.device.mark_allocated(page)?;
213            fl.set_next_available_if_match(exp, page, next);
214            {
215                let mut cache = self.cache.lock().expect("cache lock is not poisoned");
216                cache.remove(page);
217            }
218            Ok(Page::new_alloc(page, exp))
219        } else {
220            self.device.create_page(exp)
221        }
222    }
223
224    pub fn flush_journal(&self, page: &Page) -> PERes<()> {
225        self.device.flush_page(page)?;
226        let mut cache = self.cache.lock().expect("cache lock is not poisoned");
227        cache.remove(page.get_index());
228        Ok(())
229    }
230
231    pub fn flush_page(&self, page: Page) -> PERes<()> {
232        self.device.flush_page(&page)?;
233        {
234            let mut cache = self.cache.lock().expect("cache lock is not poisoned");
235            cache.put(page.get_index(), page.make_read());
236        }
237        Ok(())
238    }
239
    /// Unlinks `page` from the free list of size exponent `exp`.
    ///
    /// Head pages (no previous entry) just move the list head to their
    /// successor; interior pages have their neighbours re-linked around
    /// them. Pages not marked free are left untouched.
    pub fn remove_from_free(&self, page: u64, exp: u8) -> PERes<()> {
        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
        let mut pg = self.device.load_free_page(page)?;
        if pg.is_free()? {
            if pg.get_prev_free() == 0 {
                fl.set_free(exp, pg.get_next_free());
            } else {
                // NOTE(review): the successor is loaded unconditionally;
                // confirm that a list tail (get_next_free() == 0) cannot
                // reach this branch, or that loading page id 0 is harmless.
                let mut next = self.device.load_free_page(pg.get_next_free())?;
                next.set_prev_free(pg.get_prev_free());
                self.device.flush_free_page(&next)?;
                let mut prev = self.device.load_free_page(pg.get_prev_free())?;
                prev.set_next_free(pg.get_next_free());
                self.device.flush_free_page(&prev)?;
            }
            pg.set_free(false)?;
            self.device.flush_free_page(&pg)?;
        } else {
            // do nothing: the free list has its own logic to re-compute heads and tails.
        }
        Ok(())
    }
261
    /// Recovery-time free: unlike the regular paths it does not have debug
    /// asserts for already freed pages.
    ///
    /// A page not yet marked free is freed normally; a page already marked
    /// free is handed to `FreeList::recover_free` instead.
    pub fn recover_free(&self, page: u64) -> PERes<()> {
        if let Ok(p) = self.device.load_free_page(page) {
            if !p.is_free()? {
                self.free(page)?;
            } else {
                self.free_list
                    .lock()
                    .expect("free list lock not poisoned")
                    .recover_free(p)?;
            }
        }
        Ok(())
    }
276    pub fn recover_sync(&self) -> PERes<bool> {
277        self.free_list
278            .lock()
279            .expect("free list lock not poisoned")
280            .check_and_clean(&*self.device)?;
281        self.device_sync()
282    }
283    pub fn trim_free_at_end(&self) -> PERes<()> {
284        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
285        let list: &mut FreeList = &mut fl;
286        self.device.trim_end_pages(list)?;
287        Ok(())
288    }
289
290    pub fn free_pages(&self, pages: &[u64]) -> PERes<()> {
291        let mut fl = self.free_list.lock().expect("free list lock not poisoned");
292        let list: &mut FreeList = &mut fl;
293        self.cache.lock().expect("cache lock is not poisoned").remove_all(pages);
294        for page in pages {
295            self.device.trim_or_free_page(*page, list)?;
296        }
297        Ok(())
298    }
299
    /// Frees a single page; see [`Allocator::free_pages`].
    pub fn free(&self, page: u64) -> PERes<()> {
        self.free_pages(&[page])
    }
303
    /// Stages the in-memory free list for persistence when it changed since
    /// the last flush.
    ///
    /// Only the root monitor holder is updated here; the disk write and sync
    /// happen later in `device_sync`.
    pub fn flush_free_list(&self) -> PERes<()> {
        let mut lock = self.free_list.lock().expect("free list lock not poisoned");
        if lock.is_changed() {
            let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
            // NOTE(review): the page is loaded only to take its index, which
            // should equal `self.page` — confirm whether the load doubles as
            // a validity check before simplifying.
            let page = self.device.load_page(self.page)?.clone_write();
            let mut buffer = lock.write_list().to_vec();
            let holder = &mut monitor.free_list_holder;
            self.write_root(page.get_index(), holder, &mut buffer, ALLOCATOR_ROOT_PAGE_VERSION)?;
            // No disk sync here: the caller always performs it.
            lock.reset_changed_flag();
        }
        Ok(())
    }
317
    /// Stages a new payload for the address root page at `root`; persisted
    /// on the next `device_sync`.
    pub fn write_address_root(&self, root: u64, buffer: &mut [u8], version: u8) -> PERes<()> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        self.write_root(root, &mut monitor.address_holder, buffer, version)
    }
    /// Stages a new payload for the journal root page; persisted on the
    /// next `device_sync`.
    pub fn write_journal_root(&self, root: Page, buffer: &mut [u8], version: u8) -> PERes<()> {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        self.write_root(root.get_index(), &mut monitor.journal_holder, buffer, version)
    }
326
327    fn write_root(&self, root: u64, holder: &mut RootPageHolder, buffer: &mut [u8], version: u8) -> PERes<()> {
328        holder.page = root;
329        holder.version = version;
330        holder.buffer = Some(Vec::from(buffer));
331        holder.dirty = true;
332        Ok(())
333    }
334
335    fn write_root_page(root: &mut Page, holder: &mut Counter, mut buffer: Vec<u8>, version: u8) -> PERes<()> {
336        let last_flush = holder.flush_counter;
337        let order = write_root_page(root, &mut buffer, version, last_flush)?;
338        holder.flush_counter = order;
339        Ok(())
340    }
    /// Writes one staged root page to disk.
    ///
    /// When `ad` is `Some` the payload lives in a separate, double-buffered
    /// content page: the payload is written there and the root page stores
    /// only the current and previous content page ids. Otherwise the payload
    /// is written directly into the root page.
    fn write_root_page_info(
        &self,
        mut info: RootWriteInfo,
        holder: &mut Counter,
        ad: Option<&mut AddressData>,
    ) -> PERes<()> {
        let mut root = self.write_page(info.page)?;
        let last_flush = holder.flush_counter;
        let order = if let Some(bp) = ad {
            let exp = self.exp_from_content_size(info.buffer.len() as u64);
            // Reuse the previous-generation content page if it has the right
            // size, otherwise free it and allocate a fresh one.
            let mut content_page = if bp.other_page == 0 {
                self.allocate(exp)?
            } else {
                let mut page = self.write_page(bp.other_page)?;
                if page.get_size_exp() != exp {
                    self.free(bp.other_page)?;
                    page = self.allocate(exp)?;
                }
                page
            };
            let content_page_id = content_page.get_index();
            content_page.write_all(&info.buffer)?;
            self.flush_page(content_page)?;

            // Root payload: current + previous content page ids (16 bytes
            // used; 19-byte buffer matches the read side in
            // `read_address_buffer`).
            let mut root_buffer = [0; 19];
            write_u64(&mut root_buffer[0..8], content_page_id);
            write_u64(&mut root_buffer[8..16], bp.page);
            let result = write_root_page(&mut root, &mut root_buffer, info.version, last_flush)?;
            // Rotate the double buffer: the old current page becomes the spare.
            bp.other_page = bp.page;
            bp.page = content_page_id;
            result
        } else {
            write_root_page(&mut root, &mut info.buffer, info.version, last_flush)?
        };
        self.flush_page(root)?;
        holder.flush_counter = order;
        Ok(())
    }
379
    /// Returns the page size exponent needed to hold `size` bytes of
    /// content (delegated to the device).
    pub fn exp_from_content_size(&self, size: u64) -> u8 {
        self.device.exp_from_content_size(size)
    }
383
384    pub fn read_root_journal(&self, page: &mut ReadPage, buffer_size: usize) -> Vec<u8> {
385        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
386        let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
387        Allocator::read_root_page_int(
388            page,
389            buffer_size,
390            &mut monitor.journal_holder,
391            &mut counter_monitor.journal,
392        )
393    }
394
395    pub fn read_root_address(&self, page: &mut ReadPage, buffer_size: usize) -> Vec<u8> {
396        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
397        let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
398        Allocator::read_root_page_int(
399            page,
400            buffer_size,
401            &mut monitor.address_holder,
402            &mut counter_monitor.address.0,
403        )
404    }
405    pub fn create_address_root(&self, page: Page) -> PERes<()> {
406        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
407        monitor.address_holder.page = page.get_index();
408        monitor.address_holder.version = SEGMENTS_ROOT_PAGE_VERSION;
409        monitor.address_holder.buffer = Some(Vec::new());
410        monitor.address_holder.dirty = true;
411        Ok(())
412    }
413
    /// Reads the address payload referenced by root page `page`, restoring
    /// the address holder, flush counter and double-buffer page ids as a
    /// side effect.
    ///
    /// Returns `Ok(None)` when no content page has been written yet.
    ///
    /// # Panics
    /// Panics if the root page carries an unsupported version byte.
    pub fn read_address_buffer(&self, page: u64) -> PERes<Option<Vec<u8>>> {
        let mut root = self.load_page(page)?;
        match root.read_u8() {
            SEGMENTS_ROOT_PAGE_VERSION_0 => {
                let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
                monitor.address_holder.page = page;
                monitor.address_holder.dirty = false;
                monitor.address_holder.version = SEGMENTS_ROOT_PAGE_VERSION_0;
                let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
                // Two 19-byte slots, same layout written by write_root_page_info.
                let mut buffer_0 = vec![0; 19];
                let mut buffer_1 = vec![0; 19];
                InfallibleRead::read_exact(&mut root, &mut buffer_0);
                InfallibleRead::read_exact(&mut root, &mut buffer_1);
                // Pick whichever copy the double-buffer check deems current.
                let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
                let buffer = if first { buffer_0 } else { buffer_1 };
                counter_monitor.address.0.flush_counter = flush;
                let page_id = read_u64(&buffer[0..8]);
                let other_page_id = read_u64(&buffer[8..16]);
                counter_monitor.address.1.page = page_id;
                counter_monitor.address.1.other_page = other_page_id;
                if page_id != 0 {
                    // The actual payload lives in a separate content page.
                    let page = self.load_page(page_id)?;
                    let buffer = page.content();
                    monitor.address_holder.buffer = Some(buffer.clone());
                    Ok(Some(buffer))
                } else {
                    monitor.address_holder.buffer = None;
                    Ok(None)
                }
            }
            _ => panic!("version not supported"),
        }
    }
447
448    fn read_root_page_int(
449        page: &mut ReadPage,
450        buffer_size: usize,
451        holder: &mut RootPageHolder,
452        counter: &mut Counter,
453    ) -> Vec<u8> {
454        let mut buffer_0 = vec![0; buffer_size];
455        let mut buffer_1 = vec![0; buffer_size];
456        InfallibleRead::read_exact(page, &mut buffer_0);
457        InfallibleRead::read_exact(page, &mut buffer_1);
458        let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
459        let buffer = if first { buffer_0 } else { buffer_1 };
460        holder.buffer = Some(buffer.clone());
461        counter.flush_counter = flush;
462        buffer
463    }
464
    /// Flushes a root page; currently identical to [`Allocator::flush_page`].
    pub fn flush_root_page(&self, page: Page) -> PERes<()> {
        self.flush_page(page)
    }
468
    /// Borrows the underlying device.
    pub fn device(&self) -> &dyn Device {
        &*self.device
    }
472
    /// Writes every dirty root page (free list, journal, address) to the
    /// device, syncs the device, then drains the cleanups queued for release
    /// after this sync.
    ///
    /// Returns `true` when nothing was queued for release.
    pub fn device_sync(&self) -> PERes<bool> {
        self.flush_free_list()?;
        let free_list_data;
        let journal_data;
        let address_data;
        {
            // Snapshot the dirty payloads under the monitor lock, then drop
            // the lock before doing the (slow) disk writes.
            let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
            free_list_data = monitor.free_list_holder.write_data();
            journal_data = monitor.journal_holder.write_data();
            address_data = monitor.address_holder.write_data();
        }
        {
            let mut fm = self.flush_count.lock().expect("flush count lock not poisoned");
            if let Some(info) = free_list_data {
                self.write_root_page_info(info, &mut fm.free_list, None)?;
            }
            if let Some(info) = journal_data {
                self.write_root_page_info(info, &mut fm.journal, None)?;
            }
            if let Some(info) = address_data {
                // Only the address root uses double-buffered content pages.
                let (counter, data) = &mut fm.address;
                self.write_root_page_info(info, counter, Some(data))?;
            }
            self.device.sync()?;
        }
        // NOTE(review): dropping these Arc<PendingClean> after a successful
        // sync presumably triggers the deferred snapshot cleanup — confirm
        // against snapshot::data::PendingClean.
        let result = std::mem::take(
            &mut self
                .release_next_sync
                .lock()
                .expect("next sync lock not poisoned")
                .to_release,
        );

        Ok(result.is_empty())
    }
508
509    pub fn need_sync(&self) -> bool {
510        self.root_monitor
511            .lock()
512            .expect("root monitor lock not poisoned")
513            .is_dirty()
514            || !self
515                .release_next_sync
516                .lock()
517                .expect("release next sync lock not poisoned")
518                .to_release
519                .is_empty()
520    }
521
    /// Consumes the allocator and returns ownership of the underlying device.
    pub fn release(self) -> Box<dyn Device> {
        self.device
    }
525
    /// Test helper: releases the device file lock.
    #[cfg(test)]
    pub fn free_file_lock(&self) -> PERes<()> {
        self.device.release_file_lock()
    }
530
531    #[cfg(feature = "experimental_inspect")]
532    pub fn page_state(&self, page: u64) -> Option<crate::inspect::PageState> {
533        if let Ok(Some(p)) = self.device.load_page_if_exists(page) {
534            Some(crate::inspect::PageState::new(
535                p.get_index(),
536                p.get_size_exp(),
537                p.is_free().unwrap_or(false),
538            ))
539        } else {
540            None
541        }
542    }
543}