use crate::{
address::segment::{SEGMENTS_ROOT_PAGE_VERSION, SEGMENTS_ROOT_PAGE_VERSION_0},
allocator::{cache::Cache, free_list::FreeList},
config::Config,
device::{Device, Page, PageOps, ReadPage, UpdateList},
error::PERes,
flush_checksum::{double_buffer_check, write_root_page},
io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat},
snapshot::data::PendingClean,
};
use std::{io::Write, sync::Arc, sync::Mutex};
mod cache;
pub(crate) mod free_list;
#[cfg(test)]
mod tests;
/// Size exponent passed to the device when the allocator root page is created.
const ALLOCATOR_PAGE_EXP: u8 = 10;
/// Version tag `0` of the allocator root page.
const ALLOCATOR_ROOT_PAGE_VERSION_V0: u8 = 0;
/// Version tag `1` of the allocator root page.
const ALLOCATOR_ROOT_PAGE_VERSION_V1: u8 = 1;
/// Version tag used whenever this module writes the allocator root page.
const ALLOCATOR_ROOT_PAGE_VERSION: u8 = ALLOCATOR_ROOT_PAGE_VERSION_V1;
/// Snapshot of a root page's pending content, produced by
/// `RootPageHolder::write_data` and consumed when the root is written out.
struct RootWriteInfo {
    /// Index of the root page the content belongs to.
    page: u64,
    /// Bytes to store.
    buffer: Vec<u8>,
    /// Version tag handed to the root page writer.
    version: u8,
}

/// Pair of page indices recorded in the address root page.
#[derive(Clone, Default)]
struct AddressData {
    /// Page index stored in the first 8 bytes of the address root record.
    page: u64,
    /// Page index stored in the following 8 bytes of the address root record.
    other_page: u64,
}

/// In-memory copy of a root page's content, together with a flag telling
/// whether it still has to be written.
#[derive(Default)]
pub struct RootPageHolder {
    /// Index of the root page.
    page: u64,
    /// Latest known content, `None` when nothing has been read or set yet.
    buffer: Option<Vec<u8>>,
    /// `true` while `buffer` holds content that has not been handed out for writing.
    dirty: bool,
    /// Version tag that goes with `buffer`.
    version: u8,
}

impl RootPageHolder {
    /// Hands out the pending content, if any, and clears the dirty flag.
    ///
    /// Returns `None` when the holder is not dirty or has no buffer; in the
    /// latter case the dirty flag is left as it was.
    fn write_data(&mut self) -> Option<RootWriteInfo> {
        if !self.dirty {
            return None;
        }
        // The holder keeps its own copy, so the snapshot gets a clone.
        let buffer = self.buffer.as_ref()?.clone();
        self.dirty = false;
        Some(RootWriteInfo {
            page: self.page,
            buffer,
            version: self.version,
        })
    }
}
/// Flush counter of a single root page.
#[derive(Default)]
pub struct Counter {
// Value produced by the latest root page write or double-buffer check for the page.
flush_counter: u8,
}
/// Flush counters of the three root pages handled by the allocator.
#[derive(Default)]
struct FlushCount {
// Counter of the free list root page.
free_list: Counter,
// Counter of the journal root page.
journal: Counter,
// Counter of the address root page plus the page indices recorded in it.
address: (Counter, AddressData),
}
/// Pending contents of the three root pages, written out by `Allocator::disc_sync`.
#[derive(Default)]
struct RootMonitor {
    /// Pending content of the free list root.
    free_list_holder: RootPageHolder,
    /// Pending content of the journal root.
    journal_holder: RootPageHolder,
    /// Pending content of the address root.
    address_holder: RootPageHolder,
}

impl RootMonitor {
    /// Tells whether at least one of the three holders is flagged dirty.
    fn is_dirty(&self) -> bool {
        let holders = [
            &self.free_list_holder,
            &self.journal_holder,
            &self.address_holder,
        ];
        holders.iter().any(|holder| holder.dirty)
    }
}
/// Entries queued through `Allocator::to_release_next_sync`.
#[derive(Default)]
struct ReleaseNextSync {
// `Allocator::disc_sync` takes the whole vector after the device sync and drops it on return.
to_release: Vec<Arc<PendingClean>>,
}
/// Page allocator on top of a `Device`: hands out, loads, caches, flushes and
/// frees pages, and keeps the pending content of the root pages until the next sync.
pub struct Allocator {
// Device every page operation is delegated to.
device: Box<dyn Device>,
// Free page list, guarded by its own lock.
free_list: Mutex<FreeList>,
// Cache of read pages keyed by page index.
cache: Mutex<Cache>,
// Pending contents of the free list, journal and address roots.
root_monitor: Mutex<RootMonitor>,
// Flush counters of the root pages.
flush_count: Mutex<FlushCount>,
// Entries released after the next `disc_sync`.
release_next_sync: Mutex<ReleaseNextSync>,
// Index of the allocator's own root page (the one holding the free list).
page: u64,
}
impl Allocator {
/// Opens an allocator whose free list root lives at `page`.
///
/// The free list is read from that page and passed through `check_and_clean`
/// against the device; the page cache is sized from `config`.
///
/// # Errors
/// Propagates failures from loading the page, reading the free list or
/// checking it.
pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PERes<Allocator> {
    let mut roots = RootMonitor::default();
    let mut counters = FlushCount::default();
    let mut root_page = dr.load_page(page)?;
    let mut free_list = FreeList::read(&mut root_page, &mut roots.free_list_holder, &mut counters.free_list)?;
    free_list.check_and_clean(&*dr)?;
    let cache = Cache::new(config.cache_size(), config.cache_age_limit());
    Ok(Allocator {
        device: dr,
        free_list: Mutex::new(free_list),
        cache: Mutex::new(cache),
        root_monitor: Mutex::new(roots),
        flush_count: Mutex::new(counters),
        release_next_sync: Default::default(),
        page,
    })
}
pub fn init(dr: Box<dyn Device>, config: &Config) -> PERes<(u64, Allocator)> {
let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
let mut list = FreeList::default();
let mut counter = Counter::default();
let buffer = list.write_list();
Allocator::write_root_page(&mut page, &mut counter, buffer.to_vec(), ALLOCATOR_ROOT_PAGE_VERSION)?;
dr.flush_page(&page)?;
let allocate_page = page.get_index();
Ok((allocate_page, Allocator::new(dr, config, allocate_page)?))
}
/// Loads `page` unless it is missing on the device or marked as free.
///
/// The cache is consulted first; a page loaded from the device is put in the
/// cache before being returned.
///
/// # Errors
/// Propagates failures from the device and from the free-flag check.
pub fn load_page_not_free(&self, page: u64) -> PERes<Option<ReadPage>> {
    {
        let mut cache = self.cache.lock().expect("cache lock is not poisoned");
        if let Some(cached) = cache.get(page) {
            return Ok(if cached.is_free()? { None } else { Some(cached) });
        }
    }
    let loaded = match self.device.load_page_if_exists(page)? {
        Some(loaded) => loaded,
        None => return Ok(None),
    };
    if loaded.is_free()? {
        return Ok(None);
    }
    self.cache
        .lock()
        .expect("cache lock is not poisoned")
        .put(page, loaded.clone_read());
    Ok(Some(loaded))
}
/// Queues `to_release` so that it is taken out (and dropped) by the next `disc_sync`.
pub(crate) fn to_release_next_sync(&self, to_release: Arc<PendingClean>) {
    let mut pending = self.release_next_sync.lock().expect("next sync lock not poisoned");
    pending.to_release.push(to_release);
}
/// Loads `page` for reading, going through the cache.
///
/// In debug builds the page is asserted not to be marked as free.
///
/// # Errors
/// Propagates failures from the device (and, in debug builds, from the
/// free-flag check).
pub fn load_page(&self, page: u64) -> PERes<ReadPage> {
    let loaded = self.read_page_int(page)?;
    debug_assert!(!loaded.is_free()?, "page {} should not be marked as free", page);
    Ok(loaded)
}
/// Loads `page` as a writable copy, going through the cache.
///
/// In debug builds the page is asserted not to be marked as free.
///
/// # Errors
/// Propagates failures from the device (and, in debug builds, from the
/// free-flag check).
pub fn write_page(&self, page: u64) -> PERes<Page> {
    let writable = self.write_page_int(page)?;
    debug_assert!(!writable.is_free()?, "page {} should not be marked as free", page);
    Ok(writable)
}
/// Returns the cached copy of `page`, or loads it from the device and caches it.
///
/// The cache lock is never held while the device is being read.
fn read_page_int(&self, page: u64) -> PERes<ReadPage> {
    let cached = self.cache.lock().expect("cache lock is not poisoned").get(page);
    match cached {
        Some(hit) => Ok(hit),
        None => {
            let loaded = self.device.load_page(page)?;
            self.cache
                .lock()
                .expect("cache lock is not poisoned")
                .put(page, loaded.clone_read());
            Ok(loaded)
        }
    }
}
/// Returns a writable copy of `page`, taken from the cache when present and
/// otherwise loaded from the device (a read copy is then put in the cache).
///
/// The cache lock is never held while the device is being read.
fn write_page_int(&self, page: u64) -> PERes<Page> {
    let cached = self.cache.lock().expect("cache lock is not poisoned").get(page);
    if let Some(hit) = cached {
        return Ok(hit.clone_write());
    }
    let loaded = self.device.load_page(page)?;
    self.cache
        .lock()
        .expect("cache lock is not poisoned")
        .put(page, loaded.clone_read());
    Ok(loaded.clone_write())
}
/// Allocates a page of size exponent `exp`.
///
/// A page from the free list is reused when one is available (index `0`
/// means none); otherwise the device creates a new page. The free list lock
/// is held for the whole call.
///
/// # Errors
/// Propagates failures from the device.
pub fn allocate(&self, exp: u8) -> PERes<Page> {
    let mut free_list = self.free_list.lock().expect("free list lock not poisoned");
    let reusable = free_list.get_next_available(exp);
    if reusable == 0 {
        return self.device.create_page(exp);
    }
    let following = self.device.mark_allocated(reusable)?;
    free_list.set_next_available_if_match(exp, reusable, following);
    // Whatever was cached for this index belongs to its previous use.
    self.cache
        .lock()
        .expect("cache lock is not poisoned")
        .remove(reusable);
    Ok(Page::new_alloc(reusable, exp))
}
/// Flushes a journal page to the device and evicts its index from the cache.
///
/// # Errors
/// Propagates the device flush failure; the cache is left untouched in that case.
pub fn flush_journal(&self, page: &Page) -> PERes<()> {
    self.device.flush_page(page)?;
    self.cache
        .lock()
        .expect("cache lock is not poisoned")
        .remove(page.get_index());
    Ok(())
}
/// Flushes `page` to the device and stores its read form in the cache.
///
/// # Errors
/// Propagates the device flush failure; the cache is left untouched in that case.
pub fn flush_page(&self, page: Page) -> PERes<()> {
    self.device.flush_page(&page)?;
    let mut cache = self.cache.lock().expect("cache lock is not poisoned");
    let index = page.get_index();
    cache.put(index, page.make_read());
    Ok(())
}
/// Unlinks `page` from the free page chain of size exponent `exp` and clears
/// its free flag. Does nothing when the page is not marked as free.
///
/// The free list lock is held for the whole call.
///
/// # Errors
/// Propagates failures from loading or flushing the involved free pages.
pub fn remove_from_free(&self, page: u64, exp: u8) -> PERes<()> {
    let mut fl = self.free_list.lock().expect("free list lock not poisoned");
    let mut pg = self.device.load_free_page(page)?;
    if !pg.is_free()? {
        return Ok(());
    }
    let prev_id = pg.get_prev_free();
    let next_id = pg.get_next_free();
    if prev_id == 0 {
        // No predecessor: the page is the head, so the list head moves to the successor.
        // NOTE(review): the successor's `prev` link is not reset here — confirm whether
        // another component takes care of it.
        fl.set_free(exp, next_id);
    } else {
        // Index 0 is used as the "no page" marker for links in this module, so a page
        // without a successor must not trigger a load/flush of page 0.
        if next_id != 0 {
            let mut next = self.device.load_free_page(next_id)?;
            next.set_prev_free(prev_id);
            self.device.flush_free_page(&next)?;
        }
        let mut prev = self.device.load_free_page(prev_id)?;
        prev.set_next_free(next_id);
        self.device.flush_free_page(&prev)?;
    }
    pg.set_free(false)?;
    self.device.flush_free_page(&pg)?;
    Ok(())
}
/// Makes sure `page` ends up free during recovery.
///
/// A page that cannot be loaded is silently skipped. A page not yet marked
/// free is freed; one already marked free is handed to the free list's
/// `recover_free`.
///
/// # Errors
/// Propagates failures from the free-flag check, from freeing the page and
/// from the free list.
pub fn recover_free(&self, page: u64) -> PERes<()> {
    let free_page = match self.device.load_free_page(page) {
        Ok(free_page) => free_page,
        // Best effort: an unreadable page is ignored on purpose.
        Err(_) => return Ok(()),
    };
    if free_page.is_free()? {
        self.free_list
            .lock()
            .expect("free list lock not poisoned")
            .recover_free(free_page)?;
    } else {
        self.free(page)?;
    }
    Ok(())
}
/// Runs `check_and_clean` on the free list and then performs a `disc_sync`,
/// returning its result.
///
/// # Errors
/// Propagates failures from the check and from the sync.
pub fn recover_sync(&self) -> PERes<bool> {
    {
        // The lock has to be released before `disc_sync`, which takes it again.
        let mut free_list = self.free_list.lock().expect("free list lock not poisoned");
        free_list.check_and_clean(&*self.device)?;
    }
    self.disc_sync()
}
/// Asks the device to trim its end pages, giving it access to the free list.
///
/// # Errors
/// Propagates the device failure.
pub fn trim_free_at_end(&self) -> PERes<()> {
    let mut free_list = self.free_list.lock().expect("free list lock not poisoned");
    self.device.trim_end_pages(&mut *free_list)?;
    Ok(())
}
/// Frees all `pages`: they are evicted from the cache, then each one is
/// trimmed or added to the free list by the device.
///
/// The free list lock is held for the whole call.
///
/// # Errors
/// Stops at and returns the first device failure.
pub fn free_pages(&self, pages: &[u64]) -> PERes<()> {
    let mut free_list = self.free_list.lock().expect("free list lock not poisoned");
    self.cache
        .lock()
        .expect("cache lock is not poisoned")
        .remove_all(pages);
    for &page in pages {
        self.device.trim_or_free_page(page, &mut *free_list)?;
    }
    Ok(())
}
/// Frees a single page; shorthand for [`Allocator::free_pages`] with one element.
///
/// # Errors
/// Propagates the failure of `free_pages`.
pub fn free(&self, page: u64) -> PERes<()> {
    self.free_pages(std::slice::from_ref(&page))
}
/// Stages the free list as pending content of the allocator root page when
/// it changed since the last time; otherwise does nothing.
///
/// The content only reaches the device on the next `disc_sync`.
///
/// # Errors
/// Propagates the failure of loading the allocator root page.
pub fn flush_free_list(&self) -> PERes<()> {
    let mut free_list = self.free_list.lock().expect("free list lock not poisoned");
    if !free_list.is_changed() {
        return Ok(());
    }
    let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
    let root_index = self.device.load_page(self.page)?.clone_write().get_index();
    let mut content = free_list.write_list().to_vec();
    self.write_root(
        root_index,
        &mut monitor.free_list_holder,
        &mut content,
        ALLOCATOR_ROOT_PAGE_VERSION,
    )?;
    free_list.reset_changed_flag();
    Ok(())
}
/// Stages `buffer` as the pending content of the address root page `root`.
///
/// Nothing is written to the device here; that happens in `disc_sync`.
pub fn write_address_root(&self, root: u64, buffer: &mut [u8], version: u8) -> PERes<()> {
    let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
    let holder = &mut monitor.address_holder;
    self.write_root(root, holder, buffer, version)
}
/// Stages `buffer` as the pending content of the journal root page `root`.
///
/// Only the index of `root` is used; nothing is written to the device here.
pub fn write_journal_root(&self, root: Page, buffer: &mut [u8], version: u8) -> PERes<()> {
    let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
    let root_index = root.get_index();
    let holder = &mut monitor.journal_holder;
    self.write_root(root_index, holder, buffer, version)
}
fn write_root(&self, root: u64, holder: &mut RootPageHolder, buffer: &mut [u8], version: u8) -> PERes<()> {
holder.page = root;
holder.version = version;
holder.buffer = Some(Vec::from(buffer));
holder.dirty = true;
Ok(())
}
/// Writes `buffer` into `root` through the checksum-aware root page writer
/// and stores the flush counter it returns in `holder`.
///
/// # Errors
/// Propagates the writer failure; `holder` is left unchanged in that case.
fn write_root_page(root: &mut Page, holder: &mut Counter, mut buffer: Vec<u8>, version: u8) -> PERes<()> {
    holder.flush_counter = write_root_page(root, &mut buffer, version, holder.flush_counter)?;
    Ok(())
}
/// Writes the pending root content `info` to its root page, flushes the page
/// and stores the resulting flush counter in `holder`.
///
/// Without `ad` the content itself is written into the root page. With `ad`
/// the content goes into a separate content page and the root page only
/// receives a small record holding two page indices; `ad` is updated to
/// match what was written.
///
/// # Errors
/// Propagates failures from loading, allocating, freeing, writing or
/// flushing the involved pages; `holder` keeps its old counter in that case.
fn write_root_page_info(
&self,
mut info: RootWriteInfo,
holder: &mut Counter,
ad: Option<&mut AddressData>,
) -> PERes<()> {
let mut root = self.write_page(info.page)?;
let last_flush = holder.flush_counter;
let order = if let Some(bp) = ad {
// Pick the page receiving the content: `other_page` when set and of the
// right size, otherwise a newly allocated one.
let exp = self.exp_from_content_size(info.buffer.len() as u64);
let mut content_page = if bp.other_page == 0 {
self.allocate(exp)?
} else {
let mut page = self.write_page(bp.other_page)?;
if page.get_size_exp() != exp {
// Size mismatch: give the old page back and take one that fits.
self.free(bp.other_page)?;
page = self.allocate(exp)?;
}
page
};
let content_page_id = content_page.get_index();
content_page.write_all(&info.buffer)?;
// The content page is flushed before the root record pointing to it is written.
self.flush_page(content_page)?;
// Only the first 16 bytes are filled here; the trailing bytes are presumably
// used by `write_root_page` for its own metadata — TODO confirm.
let mut root_buffer = [0; 19];
write_u64(&mut root_buffer[0..8], content_page_id);
write_u64(&mut root_buffer[8..16], bp.page);
let result = write_root_page(&mut root, &mut root_buffer, info.version, last_flush)?;
// Swap roles: the page just written becomes `page`, the previous `page`
// becomes `other_page` and is the candidate for the next write.
bp.other_page = bp.page;
bp.page = content_page_id;
result
} else {
write_root_page(&mut root, &mut info.buffer, info.version, last_flush)?
};
self.flush_page(root)?;
holder.flush_counter = order;
Ok(())
}
/// Returns the page size exponent the device computes for a content of `size` bytes.
pub fn exp_from_content_size(&self, size: u64) -> u8 {
self.device.exp_from_content_size(size)
}
pub fn read_root_journal(&self, page: &mut ReadPage, buffer_size: usize) -> Vec<u8> {
let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
Allocator::read_root_page_int(
page,
buffer_size,
&mut monitor.journal_holder,
&mut counter_monitor.journal,
)
}
pub fn read_root_address(&self, page: &mut ReadPage, buffer_size: usize) -> Vec<u8> {
let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
Allocator::read_root_page_int(
page,
buffer_size,
&mut monitor.address_holder,
&mut counter_monitor.address.0,
)
}
/// Registers `page` as the address root with an empty, dirty content using
/// the current segments root version. Always succeeds.
pub fn create_address_root(&self, page: Page) -> PERes<()> {
    let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
    let holder = &mut monitor.address_holder;
    holder.page = page.get_index();
    holder.version = SEGMENTS_ROOT_PAGE_VERSION;
    holder.buffer = Some(Vec::new());
    holder.dirty = true;
    Ok(())
}
pub fn read_address_buffer(&self, page: u64) -> PERes<Option<Vec<u8>>> {
let mut root = self.load_page(page)?;
match root.read_u8() {
SEGMENTS_ROOT_PAGE_VERSION_0 => {
let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
monitor.address_holder.page = page;
monitor.address_holder.dirty = false;
monitor.address_holder.version = SEGMENTS_ROOT_PAGE_VERSION_0;
let mut counter_monitor = self.flush_count.lock().expect("flush count lock not poisoned");
let mut buffer_0 = vec![0; 19];
let mut buffer_1 = vec![0; 19];
InfallibleRead::read_exact(&mut root, &mut buffer_0);
InfallibleRead::read_exact(&mut root, &mut buffer_1);
let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
let buffer = if first { buffer_0 } else { buffer_1 };
counter_monitor.address.0.flush_counter = flush;
let page_id = read_u64(&buffer[0..8]);
let other_page_id = read_u64(&buffer[8..16]);
counter_monitor.address.1.page = page_id;
counter_monitor.address.1.other_page = other_page_id;
if page_id != 0 {
let page = self.load_page(page_id)?;
let buffer = page.content();
monitor.address_holder.buffer = Some(buffer.clone());
Ok(Some(buffer))
} else {
monitor.address_holder.buffer = None;
Ok(None)
}
}
_ => panic!("version not supported"),
}
}
/// Reads two consecutive copies of `buffer_size` bytes from `page`, lets the
/// double-buffer check choose one, stores a clone of it in `holder`, records
/// the flush counter in `counter` and returns the chosen copy.
fn read_root_page_int(
    page: &mut ReadPage,
    buffer_size: usize,
    holder: &mut RootPageHolder,
    counter: &mut Counter,
) -> Vec<u8> {
    let mut first_copy = vec![0; buffer_size];
    let mut second_copy = vec![0; buffer_size];
    InfallibleRead::read_exact(page, &mut first_copy);
    InfallibleRead::read_exact(page, &mut second_copy);
    let (flush, use_first) = double_buffer_check(&first_copy, &second_copy);
    counter.flush_counter = flush;
    let chosen = if use_first { first_copy } else { second_copy };
    holder.buffer = Some(chosen.clone());
    chosen
}
/// Flushes a root page; plain delegation to `flush_page`.
pub fn flush_root_page(&self, page: Page) -> PERes<()> {
self.flush_page(page)
}
/// Gives read access to the underlying device.
pub fn disc(&self) -> &dyn Device {
&*self.device
}
/// Writes every pending root content to the device and syncs it.
///
/// The free list is staged first, then the dirty holders are drained under
/// the root monitor lock and written (free list, journal, address — in that
/// order) under the flush count lock, followed by a device sync. Finally the
/// entries queued with `to_release_next_sync` are taken out and dropped.
///
/// Returns `true` when no entry was queued for release, `false` otherwise.
///
/// # Errors
/// Propagates the first failure from staging, writing or syncing; the release
/// queue is left untouched in that case.
pub fn disc_sync(&self) -> PERes<bool> {
    self.flush_free_list()?;

    let (free_list_data, journal_data, address_data) = {
        let mut monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
        let free_list = monitor.free_list_holder.write_data();
        let journal = monitor.journal_holder.write_data();
        let address = monitor.address_holder.write_data();
        (free_list, journal, address)
    };

    {
        let mut counters = self.flush_count.lock().expect("flush count lock not poisoned");
        if let Some(info) = free_list_data {
            self.write_root_page_info(info, &mut counters.free_list, None)?;
        }
        if let Some(info) = journal_data {
            self.write_root_page_info(info, &mut counters.journal, None)?;
        }
        if let Some(info) = address_data {
            let (counter, pages) = &mut counters.address;
            self.write_root_page_info(info, counter, Some(pages))?;
        }
        self.device.sync()?;
    }

    // Taken under the lock, dropped after it has been released.
    let released = {
        let mut pending = self.release_next_sync.lock().expect("next sync lock not poisoned");
        let taken = std::mem::take(&mut pending.to_release);
        taken
    };
    Ok(released.is_empty())
}
/// Tells whether a `disc_sync` has work to do: a root holder is dirty or
/// something is queued for release at the next sync.
pub fn need_sync(&self) -> bool {
    let monitor = self.root_monitor.lock().expect("root monitor lock not poisoned");
    if monitor.is_dirty() {
        return true;
    }
    let pending = self
        .release_next_sync
        .lock()
        .expect("release next sync lock not poisoned");
    !pending.to_release.is_empty()
}
/// Consumes the allocator and hands back the device it was built on.
pub fn release(self) -> Box<dyn Device> {
self.device
}
/// Test-only helper: asks the device to release its file lock.
#[cfg(test)]
pub fn free_file_lock(&self) -> PERes<()> {
self.device.release_file_lock()
}
/// Describes the state of `page` for inspection purposes.
///
/// Returns `None` when the page does not exist or cannot be loaded; a failing
/// free-flag check is reported as "not free".
#[cfg(feature = "experimental_inspect")]
pub fn page_state(&self, page: u64) -> Option<crate::inspect::PageState> {
    let found = self.device.load_page_if_exists(page).ok().flatten()?;
    let is_free = found.is_free().unwrap_or(false);
    Some(crate::inspect::PageState::new(
        found.get_index(),
        found.get_size_exp(),
        is_free,
    ))
}
}