use crate::{
config::Config,
discref::{Device, Page, PageOps, ReadPage, UpdateList},
error::PERes,
flush_checksum::{double_buffer_check, write_root_page},
io::{read_u64, write_u64, InfallibleReadFormat},
};
use linked_hash_map::LinkedHashMap;
use std::{
io::Read,
sync::Mutex,
time::{Duration, Instant},
};
/// Size exponent passed to `create_page` when the allocator root page is first created.
const ALLOCATOR_PAGE_EXP: u8 = 10;
/// First format version of the allocator root page.
const ALLOCATER_ROOT_PAGE_VERSION_V0: u8 = 0;
/// Format version written whenever the free list is persisted.
const ALLOCATER_ROOT_PAGE_VERSION: u8 = ALLOCATER_ROOT_PAGE_VERSION_V0;
/// A cached page together with the moment it was put in the cache.
struct CacheEntry {
    /// Read handle of the cached page.
    page: ReadPage,
    /// Creation time of this entry; compared against the cache age limit on eviction.
    instant: Instant,
}

impl CacheEntry {
    /// Wraps `page` and stamps the entry with the current time.
    fn new(page: ReadPage) -> Self {
        let instant = Instant::now();
        CacheEntry { page, instant }
    }
}
/// Page cache bounded both by the total size of the cached pages and by entry age.
///
/// Entries live in a `LinkedHashMap`: `get` refreshes the entry it returns and every
/// eviction removes the entry currently at the front of the map.
pub struct Cache {
    /// Cached pages keyed by page index.
    cache: LinkedHashMap<u64, CacheEntry>,
    /// Sum of `1 << size_exp` over all cached pages.
    size: u64,
    /// Maximum allowed value of `size`; `put` evicts while it is exceeded.
    limit: u64,
    /// Front entries older than this are evicted by `put`.
    age_limit: Duration,
}

impl Cache {
    /// Creates an empty cache with the given size and age limits.
    pub fn new(limit: u64, age_limit: Duration) -> Cache {
        Cache {
            cache: LinkedHashMap::new(),
            size: 0,
            limit,
            age_limit,
        }
    }

    /// Size accounted for a page: `1 << size_exp`.
    fn page_bytes(page: &ReadPage) -> u64 {
        1 << page.get_size_exp()
    }

    /// Returns a read clone of the page stored under `key`, refreshing its position.
    fn get(&mut self, key: u64) -> Option<ReadPage> {
        let entry = self.cache.get_refresh(&key)?;
        Some(entry.page.clone_read())
    }

    /// Stores `value` under `key`, then evicts front entries until the size limit is
    /// respected and the front entry is no older than the age limit.
    fn put(&mut self, key: u64, value: ReadPage) {
        self.size += Self::page_bytes(&value);
        // A replaced entry no longer counts toward the total.
        if let Some(replaced) = self.cache.insert(key, CacheEntry::new(value)) {
            self.size -= Self::page_bytes(&replaced.page);
        }

        // Size based eviction.
        while self.size > self.limit {
            match self.cache.pop_front() {
                Some((_, evicted)) => self.size -= Self::page_bytes(&evicted.page),
                None => break,
            }
        }

        // Age based eviction: stop at the first front entry that is still young enough.
        loop {
            let front_expired = match self.cache.front() {
                Some((_, entry)) => entry.instant.elapsed() > self.age_limit,
                None => false,
            };
            if !front_expired {
                break;
            }
            if let Some((_, evicted)) = self.cache.pop_front() {
                self.size -= Self::page_bytes(&evicted.page);
            }
        }
    }

    /// Drops the entry stored under `key`, if any, and updates the size accounting.
    fn remove(&mut self, key: u64) {
        let removed = self.cache.remove(&key);
        if let Some(entry) = removed {
            self.size -= Self::page_bytes(&entry.page);
        }
    }
}
struct FreeList {
list: [u64; 32],
last_flush: u8,
to_flush: bool,
}
impl FreeList {
fn new(list: [u64; 32], last_flush: u8) -> Self {
FreeList {
list,
last_flush,
to_flush: false,
}
}
fn read(page: &mut ReadPage) -> PERes<FreeList> {
match page.read_u8() {
ALLOCATER_ROOT_PAGE_VERSION_V0 => Self::read_v0(page),
_ => panic!("unsupported format"),
}
}
fn read_v0(page: &mut ReadPage) -> PERes<FreeList> {
let mut buffer_0 = [0; 259];
let mut buffer_1 = [0; 259];
let freelist;
let last_flush;
page.read_exact(&mut buffer_0)?;
page.read_exact(&mut buffer_1)?;
let (flush_number, first) = double_buffer_check(&buffer_0, &buffer_1);
last_flush = flush_number;
if first {
freelist = Self::read_free_list(&buffer_0);
} else {
freelist = Self::read_free_list(&buffer_1);
}
Ok(FreeList::new(freelist, last_flush))
}
pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
let mut freelist = [0; 32];
for p in &mut freelist.iter_mut().enumerate() {
let pos = 8 * p.0;
(*p.1) = read_u64(&buffer[pos..(pos + 8)]);
}
freelist
}
pub fn write_free_list(list: &[u64]) -> [u8; 259] {
let mut buffer = [0; 259];
for (index, page) in list.iter().enumerate() {
let pos = index * 8;
write_u64(&mut buffer[pos..(pos + 8)], *page);
}
buffer
}
pub fn write_list(&mut self, page: &mut Page) -> PERes<()> {
let mut buffer = Self::write_free_list(&self.list);
self.last_flush = write_root_page(page, &mut buffer, ALLOCATER_ROOT_PAGE_VERSION, self.last_flush)?;
Ok(())
}
pub fn set(&mut self, exp: u8, new_page: u64) -> u64 {
let old = self.list[exp as usize];
self.list[exp as usize] = new_page;
self.to_flush = true;
old
}
pub fn get(&self, exp: u8) -> u64 {
self.list[exp as usize]
}
}
/// Records which root pages have been written since the last `Allocator::disc_sync`.
///
/// Each flag is set after the matching root write and cleared by `disc_sync`; when a
/// flag is already set, the writer syncs the device before writing that root again.
#[derive(Default)]
struct RootMonitor {
/// Set by `flush_free_list` after the free list root page has been flushed.
free_list_dirty: bool,
/// Set by `write_juornal_root` after the journal root page has been written.
journal_dirty: bool,
/// Set by `write_address_root` after the address root page has been written.
address_dirty: bool,
}
/// Allocates, frees, loads and flushes pages on a `Device`, keeping a read cache and the
/// free page lists.
pub struct Allocator {
/// Device every page is read from and written to.
disc: Box<dyn Device>,
/// Free list heads, one per page size exponent.
free_list: Mutex<FreeList>,
/// Cache of pages that were loaded or flushed through this allocator.
cache: Mutex<Cache>,
/// Tracks which root pages were written since the last device sync.
root_monitor: Mutex<RootMonitor>,
/// Index of the page where the free list is persisted.
page: u64,
}
impl Allocator {
pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PERes<Allocator> {
let mut pg = dr.load_page(page)?;
let mut freelist = FreeList::read(&mut pg)?;
for pos in 0..freelist.list.len() {
let page = freelist.list[pos];
if page != 0 {
let lp = dr.load_page(page)?;
if !lp.is_free()? {
freelist.list[pos] = 0;
}
}
}
let cache_size = config.cache_size();
let cache_age_limit = config.cache_age_limit();
Ok(Allocator {
disc: dr,
free_list: Mutex::new(freelist),
cache: Mutex::new(Cache::new(cache_size, cache_age_limit)),
root_monitor: Default::default(),
page,
})
}
/// Creates the allocator root page on `dr`, writes an empty free list into it and
/// opens an allocator on top of it.
///
/// Returns the index of the new root page together with the allocator.
pub fn init(dr: Box<dyn Device>, config: &Config) -> PERes<(u64, Allocator)> {
    let mut root = dr.create_page(ALLOCATOR_PAGE_EXP)?;
    let mut empty = FreeList::new([0; 32], 0);
    empty.write_list(&mut root)?;
    dr.flush_page(&root)?;
    let root_index = root.get_index();
    let allocator = Allocator::new(dr, config, root_index)?;
    Ok((root_index, allocator))
}
/// Loads `page` from the cache or the device.
///
/// Returns `None` when the device reports that the page does not exist or when the
/// page is marked free. Pages read from the device are cached only when not free.
pub fn load_page_not_free(&self, page: u64) -> PERes<Option<ReadPage>> {
    {
        let mut cache = self.cache.lock()?;
        if let Some(cached) = cache.get(page) {
            let free = cached.is_free()?;
            return Ok(if free { None } else { Some(cached) });
        }
    }
    let loaded = match self.disc.load_page_if_exists(page)? {
        Some(loaded) => loaded,
        None => return Ok(None),
    };
    if loaded.is_free()? {
        return Ok(None);
    }
    let mut cache = self.cache.lock()?;
    cache.put(page, loaded.clone_read());
    Ok(Some(loaded))
}
/// Loads `page` for reading, through the cache.
///
/// Debug builds assert that the page is not marked free.
pub fn load_page(&self, page: u64) -> PERes<ReadPage> {
    let loaded = self.read_page_int(page)?;
    debug_assert!(!loaded.is_free()?, "page {} should not be marked as free", page);
    Ok(loaded)
}
/// Loads `page` for writing, through the cache.
///
/// Debug builds assert that the page is not marked free.
pub fn write_page(&self, page: u64) -> PERes<Page> {
    let writable = self.write_page_int(page)?;
    debug_assert!(!writable.is_free()?, "page {} should not be marked as free", page);
    Ok(writable)
}
/// Returns the cached copy of `page`, or loads it from the device and caches it.
///
/// The cache lock is not held while the device is read.
fn read_page_int(&self, page: u64) -> PERes<ReadPage> {
    let cached = self.cache.lock()?.get(page);
    match cached {
        Some(hit) => Ok(hit),
        None => {
            let loaded = self.disc.load_page(page)?;
            self.cache.lock()?.put(page, loaded.clone_read());
            Ok(loaded)
        }
    }
}
/// Returns a writable clone of `page`, taken from the cache when present and otherwise
/// loaded from the device (and cached).
///
/// The cache lock is not held while the device is read.
fn write_page_int(&self, page: u64) -> PERes<Page> {
    let cached = self.cache.lock()?.get(page);
    let source = match cached {
        Some(hit) => hit,
        None => {
            let loaded = self.disc.load_page(page)?;
            self.cache.lock()?.put(page, loaded.clone_read());
            loaded
        }
    };
    Ok(source.clone_write())
}
/// Returns a page of size exponent `exp`.
///
/// When the free list for `exp` has a head, that page is marked allocated on the
/// device, the head advances to the page returned by `mark_allocated`, and the page
/// is dropped from the cache. Otherwise a new page is created on the device.
pub fn allocate(&self, exp: u8) -> PERes<Page> {
    let reused = {
        let mut fl = self.free_list.lock()?;
        let head = fl.get(exp);
        if head == 0u64 {
            None
        } else {
            let next = self.disc.mark_allocated(head)?;
            fl.set(exp, next);
            self.cache.lock()?.remove(head);
            Some(Page::new_alloc(head, exp))
        }
    };
    // The free list lock is released before a fresh page is created.
    match reused {
        Some(page) => Ok(page),
        None => Ok(self.disc.create_page(exp)?),
    }
}
/// Writes `page` to the device and stores its read-only form in the cache.
pub fn flush_page(&self, page: Page) -> PERes<()> {
    self.disc.flush_page(&page)?;
    let mut cache = self.cache.lock()?;
    let index = page.get_index();
    cache.put(index, page.make_read());
    Ok(())
}
/// Takes `page` out of the free list for size exponent `exp`.
///
/// * If the page is marked free it is unlinked from its neighbours (or the list head
///   is advanced when it has no predecessor), then its free flag is cleared and the
///   page is flushed.
/// * If the page is not marked free but is still referenced by the list, the reference
///   is cut: the head is reset to 0 when it points to `page`, otherwise the list is
///   walked and the link pointing to `page` is set to 0.
///
/// The free list lock is held for the whole operation.
///
/// # Errors
/// Fails when a page cannot be loaded, inspected or flushed.
pub fn remove_from_free(&self, page: u64, exp: u8) -> PERes<()> {
    let mut fl = self.free_list.lock()?;
    let mut pg = self.write_page_int(page)?;
    if pg.is_free()? {
        let prev_free = pg.get_prev_free();
        let next_free = pg.get_next_free();
        if prev_free == 0 {
            // No predecessor: `page` is the head, so the head moves to its successor.
            // NOTE(review): the successor's prev pointer is not reset here — confirm
            // whether that is handled elsewhere.
            fl.set(exp, next_free);
        } else {
            // 0 is the "no page" marker used throughout this file, so a successor
            // exists only when `next_free` is non-zero. Without this guard, page 0
            // would be loaded and rewritten as if it were a list member.
            if next_free != 0 {
                let mut next = self.write_page_int(next_free)?;
                next.set_prev_free(prev_free);
                self.flush_page(next)?;
            }
            let mut prev = self.write_page_int(prev_free)?;
            prev.set_next_free(next_free);
            self.flush_page(prev)?;
        }
        pg.set_free(false)?;
        self.flush_page(pg)?;
    } else if fl.get(exp) == page {
        // NOTE(review): this drops the whole list for `exp`, not only `page` —
        // confirm this is intended.
        fl.set(exp, 0);
    } else {
        let mut current = fl.get(exp);
        while current != 0 {
            let mut pg_in_list = self.write_page_int(current)?;
            current = pg_in_list.get_next_free();
            if current == page {
                // Cut the list right before `page`.
                pg_in_list.set_next_free(0);
                self.flush_page(pg_in_list)?;
                break;
            }
        }
    }
    Ok(())
}
/// Frees `page` unless it is already marked free.
///
/// A page that cannot be loaded from the device is silently skipped.
pub fn recover_free(&self, page: u64) -> PERes<()> {
    match self.disc.load_page(page) {
        Ok(loaded) => {
            let already_free = loaded.is_free()?;
            if !already_free {
                self.free(page)?;
            }
        }
        // Best effort: an unreadable page is left alone.
        Err(_) => {}
    }
    Ok(())
}
/// Asks the device to trim its end pages, passing the free list (held under its lock)
/// so the device can update it.
pub fn trim_free_at_end(&self) -> PERes<()> {
    let mut guard = self.free_list.lock()?;
    self.disc.trim_end_pages(&mut *guard)?;
    Ok(())
}
/// Releases `page`: it is dropped from the cache, then the device trims or frees it,
/// updating the free list under its lock.
pub fn free(&self, page: u64) -> PERes<()> {
    {
        let mut cache = self.cache.lock()?;
        cache.remove(page);
    }
    let mut guard = self.free_list.lock()?;
    self.disc.trim_or_free_page(page, &mut *guard)?;
    Ok(())
}
/// Persists the free list to its root page when it changed since the last flush.
///
/// If the free list root was already written since the last `disc_sync`, the device
/// is synced before the root page is written again.
pub fn flush_free_list(&self) -> PERes<()> {
    let mut fl = self.free_list.lock()?;
    if !fl.to_flush {
        return Ok(());
    }
    let mut monitor = self.root_monitor.lock()?;
    if monitor.free_list_dirty {
        self.disc().sync()?;
    }
    let mut root = self.disc.load_page(self.page)?.clone_write();
    fl.write_list(&mut root)?;
    self.disc.flush_page(&root)?;
    monitor.free_list_dirty = true;
    fl.to_flush = false;
    Ok(())
}
/// Writes the address root page and marks it as written since the last sync.
///
/// If it was already written since the last `disc_sync`, the device is synced first.
/// Returns the value produced by the root page write.
pub fn write_address_root(&self, root: Page, buffer: &mut [u8], version: u8, last_flush: u8) -> PERes<u8> {
    let mut monitor = self.root_monitor.lock()?;
    if monitor.address_dirty {
        self.disc.sync()?;
    }
    let flush_number = self.write_root_page(root, buffer, version, last_flush)?;
    monitor.address_dirty = true;
    Ok(flush_number)
}
/// Writes the journal root page and marks it as written since the last sync.
///
/// If it was already written since the last `disc_sync`, the device is synced first.
/// Returns the value produced by the root page write.
pub fn write_juornal_root(&self, root: Page, buffer: &mut [u8], version: u8, last_flush: u8) -> PERes<u8> {
    let mut monitor = self.root_monitor.lock()?;
    if monitor.journal_dirty {
        self.disc.sync()?;
    }
    let flush_number = self.write_root_page(root, buffer, version, last_flush)?;
    monitor.journal_dirty = true;
    Ok(flush_number)
}
/// Fills `root` through the module-level `write_root_page` helper, then flushes it.
///
/// Returns the value produced by the helper.
fn write_root_page(&self, mut root: Page, buffer: &mut [u8], version: u8, last_flush: u8) -> PERes<u8> {
    let flush_number = write_root_page(&mut root, buffer, version, last_flush)?;
    self.flush_page(root)?;
    Ok(flush_number)
}
/// Flushes a root page; delegates to `flush_page`.
pub fn flush_root_page(&self, page: Page) -> PERes<()> {
self.flush_page(page)
}
/// Borrows the device this allocator works on.
pub fn disc(&self) -> &dyn Device {
&*self.disc
}
pub fn disc_sync(&self) -> PERes<()> {
let mut monitor = self.root_monitor.lock()?;
self.disc.sync()?;
monitor.free_list_dirty = false;
monitor.journal_dirty = false;
monitor.address_dirty = false;
Ok(())
}
pub fn release(self) -> Box<dyn Device> {
self.disc
}
}
/// Lets the device update the free list heads while it frees or trims pages.
impl UpdateList for FreeList {
    /// Makes `page` the head for size exponent `size` and returns the previous head.
    ///
    /// Debug builds assert that `page` was not already the (non-zero) head.
    fn update(&mut self, size: u8, page: u64) -> PERes<u64> {
        let previous = self.set(size, page);
        debug_assert!(
            previous == 0 || previous != page,
            "freeing: {} already free: {} ",
            page,
            previous
        );
        Ok(previous)
    }

    /// Replaces the head for size exponent `size` with `next`.
    ///
    /// Debug builds assert that the previous head was either `page` or 0.
    fn remove(&mut self, size: u8, page: u64, next: u64) -> PERes<()> {
        let previous = self.set(size, next);
        debug_assert!(
            previous == 0 || previous == page,
            "trimmed page not in top list expected:{} current:{} ",
            page,
            previous
        );
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::{Allocator, Cache, FreeList};
use crate::{
config::Config,
discref::{DiscRef, PageOps, ReadPage},
};
use std::{sync::Arc, time::Duration};
use tempfile::Builder;
/// Freed pages are handed out again, most recently freed first, per size exponent.
#[test]
fn test_reuse_freed_page() {
    let file = Builder::new()
        .prefix("all_reuse_test")
        .suffix(".persy")
        .tempfile()
        .unwrap()
        .reopen()
        .unwrap();
    let disc = Box::new(DiscRef::new(file).unwrap());
    let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
    let allocate_index = |exp: u8| allocator.allocate(exp).unwrap().get_index();

    allocator.allocate(10).unwrap();
    let first = allocate_index(10);
    let second = allocate_index(10);
    let third = allocate_index(11);
    let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

    for page in [first, second, third].iter() {
        allocator.free(*page).unwrap();
    }

    assert_eq!(allocate_index(10), second);
    assert_eq!(allocate_index(10), first);
    let fresh = allocate_index(10);
    assert_ne!(fresh, first);
    assert_ne!(fresh, second);
    assert_eq!(allocate_index(11), third);
    assert_ne!(allocate_index(11), third);
}
/// A page removed from the free list is skipped by later allocations.
#[test]
fn test_remove_freed_page() {
    let file = Builder::new()
        .prefix("remove_free_test")
        .suffix(".persy")
        .tempfile()
        .unwrap()
        .reopen()
        .unwrap();
    let disc = Box::new(DiscRef::new(file).unwrap());
    let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
    let allocate_index = |exp: u8| allocator.allocate(exp).unwrap().get_index();

    allocator.allocate(2).unwrap();
    let first = allocate_index(10);
    let second = allocate_index(10);
    let third = allocate_index(10);
    let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

    for page in [first, second, third].iter() {
        allocator.free(*page).unwrap();
    }
    allocator.remove_from_free(second, 10).unwrap();

    assert_eq!(allocate_index(10), third);
    assert_eq!(allocate_index(10), first);
}
/// Exceeding the size limit evicts the oldest inserted page.
#[test]
fn test_cache_limit_evict() {
    // Every page has size exponent 9, i.e. it accounts for 512 bytes.
    let page = || ReadPage::new(Arc::new(Vec::new()), 0, 10, 9);
    let mut cache = Cache::new(1050, Duration::from_secs(3600));
    cache.put(10, page());
    cache.put(20, page());
    cache.put(30, page());
    assert!(cache.size < 1050);
    assert_eq!(cache.cache.len(), 2);
    // Direct assertions report the failing expression, unlike `assert!(false)` in a match.
    assert!(cache.get(10).is_none());
    assert!(cache.get(20).is_some());
    assert!(cache.get(30).is_some());
}
/// A page refreshed by `get` survives eviction; the least recently used one goes.
#[test]
fn test_cache_limit_refresh_evict() {
    // Every page has size exponent 9, i.e. it accounts for 512 bytes.
    let page = || ReadPage::new(Arc::new(Vec::new()), 0, 10, 9);
    let mut cache = Cache::new(1050, Duration::from_secs(3600));
    cache.put(10, page());
    cache.put(20, page());
    // Reading page 10 refreshes it, leaving page 20 at the front.
    assert!(cache.get(10).is_some());
    cache.put(30, page());
    assert!(cache.size < 1050);
    assert_eq!(cache.cache.len(), 2);
    // Direct assertions report the failing expression, unlike `assert!(false)` in a match.
    assert!(cache.get(10).is_some());
    assert!(cache.get(20).is_none());
    assert!(cache.get(30).is_some());
}
/// A free list encoded with `write_free_list` decodes back to the same values.
#[test]
fn test_read_write_free_list() {
    let mut list = [0u64; 32];
    for (position, slot) in list.iter_mut().enumerate() {
        *slot = position as u64 + 10 * 3;
    }
    let encoded = FreeList::write_free_list(&list);
    let decoded = FreeList::read_free_list(&encoded);
    assert_eq!(list, decoded);
}
/// Removing every remaining page brings the accounted size back to zero.
#[test]
fn test_cache_limit_remove() {
    let page = || ReadPage::new(Arc::new(Vec::new()), 0, 10, 9);
    let mut cache = Cache::new(1050u64, Duration::from_secs(3600));
    for key in [10u64, 20, 30].iter() {
        cache.put(*key, page());
    }
    assert!(cache.size < 1050);
    assert_eq!(cache.cache.len(), 2);
    cache.remove(20);
    cache.remove(30);
    assert_eq!(cache.size, 0);
}
/// Replacing a page under the same key adjusts the accounted size by the difference.
#[test]
fn test_cache_replace_limit_stay() {
    let small_page = || ReadPage::new(Arc::new(Vec::new()), 0, 10, 9);
    let mut cache = Cache::new(100000u64, Duration::from_secs(3600));
    for key in [10u64, 20, 30].iter() {
        cache.put(*key, small_page());
    }
    let before = cache.size;
    // Same size exponent: the total does not move.
    cache.put(20, small_page());
    assert_eq!(cache.size, before);
    // Bigger size exponent: the old size is subtracted and the new one added.
    cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 10));
    assert_eq!(cache.size, before - (1 << 9) + (1 << 10));
}
/// Entries older than the age limit are evicted by the next `put`.
#[test]
fn test_cache_time_clean() {
    let page = || ReadPage::new(Arc::new(Vec::new()), 0, 10, 9);
    let mut cache = Cache::new(100000u64, Duration::from_millis(500));
    for key in [10u64, 20, 30].iter() {
        cache.put(*key, page());
    }
    // Let every entry grow older than the 500 ms age limit.
    std::thread::sleep(Duration::from_millis(600));
    cache.put(20, page());
    // Only the freshly stored page is left.
    assert_eq!(cache.size, 1 << 9);
}
}