use crate::{
config::Config,
discref::{DiscRef, Page, PageIndex, PageSeek},
error::PRes,
flush_checksum::{double_buffer_check, prepare_buffer_flush},
};
use byteorder::{BigEndian, ByteOrder};
use linked_hash_map::LinkedHashMap;
use std::{
io::{self, Read, Write},
sync::Mutex,
};
/// Size exponent handed to `create_page` when `Allocator::init` creates the
/// page that stores the persisted free list.
const ALLOCATOR_PAGE_EXP: u8 = 10;
/// Size-bounded page cache: entries are refreshed on `get` and evicted from
/// the front of the linked map by `put` once `size` exceeds `limit`.
pub struct Cache {
// Cached pages keyed by the `u64` key passed to `put` (the allocator uses the page index).
cache: LinkedHashMap<u64, Page>,
// Running total of `1 << size_exp` for the pages accounted by `put`.
size: u64,
// Threshold above which `put` starts evicting entries.
limit: u64,
}
impl Cache {
    /// Creates an empty cache that starts evicting once the accounted size
    /// of its pages exceeds `limit`.
    pub fn new(limit: u64) -> Cache {
        Cache {
            cache: LinkedHashMap::new(),
            size: 0,
            limit,
        }
    }

    /// Looks up `key`, refreshing its position in the eviction order, and
    /// returns a copy produced by `Page::clone_resetted`.
    ///
    /// Returns `None` when the key is not cached.
    fn get(&mut self, key: u64) -> Option<Page> {
        self.cache.get_refresh(&key).map(|val| val.clone_resetted())
    }

    /// Stores a `clone_resetted` copy of `value` under `key` and evicts
    /// entries from the front of the map until the accounted size is back
    /// within `limit`.
    ///
    /// Each page is accounted as `1 << get_size_exp()`.
    fn put(&mut self, key: u64, value: &Page) {
        self.size += 1u64 << value.get_size_exp();
        // `insert` hands back the entry it replaced. Without subtracting it,
        // re-putting an already cached page (which `flush_page` does) would
        // inflate `size` forever and trigger needless evictions.
        if let Some(replaced) = self.cache.insert(key, value.clone_resetted()) {
            self.size -= 1u64 << replaced.get_size_exp();
        }
        while self.size > self.limit {
            match self.cache.pop_front() {
                Some((_, evicted)) => self.size -= 1u64 << evicted.get_size_exp(),
                // Nothing left to evict: stop even if the limit is still exceeded.
                None => break,
            }
        }
    }
}
/// In-memory state of the allocator's free lists.
struct FreeList {
// Head page of each free list, indexed by page size exponent; 0 is treated as "empty".
list: [u64; 32],
// Flush number obtained from `double_buffer_check` / `prepare_buffer_flush`.
last_flush: u8,
}
/// Hands out, recycles and caches pages on top of a `DiscRef`.
pub struct Allocator {
// Backing storage used to create, load and flush pages.
disc: DiscRef,
// Free-list heads and last flush number, guarded by a mutex.
freelist: Mutex<FreeList>,
// Page cache consulted before going to `disc`.
cache: Mutex<Cache>,
// Index of the page where the free list is persisted.
page: u64,
}
/// Wrapper around a `Page` that exposes only reading, seeking, the page index
/// and the size exponent.
pub struct ReadPage {
// The wrapped page all operations are delegated to.
page: Page,
}
impl ReadPage {
    /// Returns the size exponent of the wrapped page.
    ///
    /// Takes `&self`: the underlying `Page::get_size_exp` is callable through
    /// a shared reference, so requiring exclusive access here was unnecessary.
    pub fn get_size_exp(&self) -> u8 {
        self.page.get_size_exp()
    }
}
/// Byte reads are forwarded to the wrapped page.
impl Read for ReadPage {
// Delegates to the `read` of the wrapped `Page`.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.page.read(buf)
}
}
/// Seeking is forwarded to the wrapped page.
impl PageSeek for ReadPage {
// Delegates to the `seek` of the wrapped `Page`, passing `pos` through unchanged.
fn seek(&mut self, pos: u32) -> PRes<()> {
self.page.seek(pos)
}
}
/// The page index is the one reported by the wrapped page.
impl PageIndex for ReadPage {
// Delegates to the `get_index` of the wrapped `Page`.
fn get_index(&self) -> u64 {
self.page.get_index()
}
}
impl Allocator {
/// Opens the allocator whose free list is persisted in `page`.
///
/// Two consecutive 259-byte copies of the free list are read from the page;
/// `double_buffer_check` reports the flush number and whether the first copy
/// is the one to decode. The page cache is sized from `config.cache_size()`.
///
/// # Errors
/// Propagates failures from loading the page or reading either copy.
pub fn new(dr: DiscRef, config: &Config, page: u64) -> PRes<Allocator> {
    let mut copy_a = [0u8; 259];
    let mut copy_b = [0u8; 259];
    let (last_flush, use_first) = {
        let mut root = dr.load_page(page)?;
        root.read_exact(&mut copy_a)?;
        root.read_exact(&mut copy_b)?;
        double_buffer_check(&copy_a, &copy_b)
    };
    let list = Allocator::read_free_list(if use_first { &copy_a } else { &copy_b });
    let cache = Cache::new(config.cache_size());
    Ok(Allocator {
        disc: dr,
        freelist: Mutex::new(FreeList { list, last_flush }),
        cache: Mutex::new(cache),
        page,
    })
}
/// Decodes the 32 free-list heads from `buffer`.
///
/// Head `i` is read as a big-endian `u64` from bytes `8 * i .. 8 * i + 8`.
///
/// # Panics
/// Panics if `buffer` is shorter than 256 bytes.
pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
    let mut heads = [0u64; 32];
    for (index, head) in heads.iter_mut().enumerate() {
        let start = index * 8;
        *head = BigEndian::read_u64(&buffer[start..start + 8]);
    }
    heads
}
/// Creates the page that will hold the free list, writes an all-zero free
/// list prepared with flush number 0 into it, flushes it, and returns the
/// page index.
///
/// # Errors
/// Propagates failures from creating, loading, writing or flushing the page.
pub fn init(dr: &DiscRef) -> PRes<u64> {
    let root = dr.create_page(ALLOCATOR_PAGE_EXP)?;
    let mut empty_list = Allocator::write_free_list(&[0; 32]);
    prepare_buffer_flush(&mut empty_list, 0);
    let mut target = dr.load_page(root)?;
    target.write_all(&empty_list)?;
    dr.flush_page(&target)?;
    Ok(root)
}
/// Loads `page` through `write_page` and wraps it in a read-only `ReadPage`.
///
/// # Errors
/// Propagates any failure from `write_page`.
pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
    self.write_page(page).map(|page| ReadPage { page })
}
/// Returns the page with index `page`, from the cache when possible.
///
/// In debug builds this asserts that the returned page is not marked free.
///
/// # Errors
/// Propagates failures from loading the page (and, in debug builds, from
/// reading its free flag).
pub fn write_page(&self, page: u64) -> PRes<Page> {
    self.write_page_int(page).and_then(|loaded| {
        debug_assert!(!loaded.is_free()?);
        Ok(loaded)
    })
}
/// Returns the page with index `page` without checking its free flag.
///
/// The cache is consulted first. On a miss the page is loaded from `disc`
/// and a copy is stored in the cache. The cache lock is not held while the
/// page is being loaded.
fn write_page_int(&self, page: u64) -> PRes<Page> {
    let cached = {
        let mut guard = self.cache.lock()?;
        guard.get(page)
    };
    match cached {
        Some(hit) => Ok(hit),
        None => {
            let loaded = self.disc.load_page(page)?;
            self.cache.lock()?.put(page, &loaded);
            Ok(loaded)
        }
    }
}
/// Returns the index of a page with size exponent `exp`.
///
/// When the free list for `exp` has a head, that page is reset, marked as
/// not free, flushed, and the list head advances to its successor.
/// Otherwise a new page is requested from `disc`.
///
/// # Panics
/// Panics if `exp` is 32 or larger (the free list has 32 slots).
pub fn allocate(&self, exp: u8) -> PRes<u64> {
    let slot = exp as usize;
    let mut fl = self.freelist.lock()?;
    let head = fl.list[slot];
    if head == 0 {
        // Nothing to reuse: release the free-list lock before asking the
        // disc for a new page.
        drop(fl);
        return self.disc.create_page(exp);
    }
    let mut reused = self.write_page_int(head)?;
    let next = reused.get_next_free()?;
    debug_assert!(reused.is_free()?);
    reused.reset()?;
    reused.set_free(false)?;
    self.flush_page(&mut reused)?;
    fl.list[slot] = next;
    Ok(head)
}
/// Flushes `page` through `disc`, then stores a copy of it in the cache
/// under its page index.
///
/// # Errors
/// Propagates failures from the flush or from locking the cache.
pub fn flush_page(&self, page: &mut Page) -> PRes<()> {
    self.disc.flush_page(page)?;
    self.cache.lock()?.put(page.get_index(), page);
    Ok(())
}
/// Detaches `page` from the free list of size exponent `exp`.
///
/// The list is cut at the removed page rather than re-linked:
/// * if `page` is the head, the head becomes 0;
/// * otherwise the list is walked, and the page whose successor is `page`
///   gets its next pointer set to 0 and is flushed.
///
/// Pages that followed `page` in the list are therefore no longer reachable
/// from it. Nothing happens if `page` is not found.
///
/// # Panics
/// Panics if `exp` is 32 or larger.
pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
    let slot = exp as usize;
    let mut fl = self.freelist.lock()?;
    if fl.list[slot] == page {
        fl.list[slot] = 0;
        return Ok(());
    }
    let mut current = fl.list[slot];
    while current != 0 {
        let mut holder = self.write_page_int(current)?;
        current = holder.get_next_free()?;
        if current == page {
            holder.set_next_free(0)?;
            self.flush_page(&mut holder)?;
            break;
        }
    }
    Ok(())
}
/// Marks `page` as free and pushes it onto the head of the free list for
/// its size exponent.
///
/// When `trim_or_load_page` yields no page (presumably because the disc
/// layer trimmed it away; confirm against `DiscRef`), nothing else is done.
/// In debug builds, freeing a page that is already free trips an assertion.
pub fn free(&self, page: u64) -> PRes<()> {
    let mut fl = self.freelist.lock()?;
    let mut pag = match self.disc.trim_or_load_page(page)? {
        Some(loaded) => loaded,
        None => return Ok(()),
    };
    debug_assert!(!pag.is_free()?, "freeing: {} already freed ", page);
    pag.set_free(true)?;
    let slot = pag.get_size_exp() as usize;
    let previous_head = fl.list[slot];
    debug_assert!(
        previous_head != page,
        "freeing: {} already free: {} ",
        page,
        previous_head
    );
    fl.list[slot] = page;
    // The freed page becomes the new head and links to the former one.
    pag.set_next_free(previous_head)?;
    self.flush_page(&mut pag)?;
    Ok(())
}
/// Encodes the free-list heads into a 259-byte buffer.
///
/// Head `i` is stored as a big-endian `u64` in bytes `8 * i .. 8 * i + 8`,
/// the layout `read_free_list` decodes. The three trailing bytes are left
/// zeroed for the caller to fill in.
///
/// The buffer has room for 32 heads. Passing more is a caller bug, caught by
/// a debug assertion; in release builds the extra entries are ignored.
pub fn write_free_list(list: &[u64]) -> [u8; 259] {
    debug_assert!(
        list.len() <= 32,
        "free list has {} entries but only 32 fit",
        list.len()
    );
    let mut buffer = [0u8; 259];
    // Walking the buffer in 8-byte chunks gives every head its own slot.
    // Indexing by the entry number alone made consecutive heads overwrite
    // each other.
    for (slot, page) in buffer.chunks_exact_mut(8).zip(list.iter()) {
        slot.copy_from_slice(&page.to_be_bytes());
    }
    buffer
}
/// Persists the current free-list heads to the allocator page.
///
/// The heads are encoded, `prepare_buffer_flush` finalises the buffer and
/// returns the new flush number together with the offset to write at, and
/// the buffer is then written at that offset and flushed. The free-list lock
/// is held for the whole operation.
pub fn flush_free_list(&self) -> PRes<()> {
    let mut fl = self.freelist.lock()?;
    let mut encoded = Allocator::write_free_list(&fl.list);
    let (flush_number, offset) = prepare_buffer_flush(&mut encoded, fl.last_flush);
    fl.last_flush = flush_number;
    let mut root = self.disc.load_page(self.page)?;
    root.seek(offset)?;
    root.write_all(&encoded)?;
    self.disc.flush_page(&root)?;
    Ok(())
}
/// Returns a reference to the `DiscRef` this allocator operates on.
pub fn disc(&self) -> &DiscRef {
&self.disc
}
}
#[cfg(test)]
mod tests {
    use super::{Allocator, Cache};
    use crate::{
        config::Config,
        discref::{DiscRef, Page},
    };
    use std::rc::Rc;
    use tempfile::Builder;

    /// Builds an initialised allocator on a fresh temporary file whose name
    /// starts with `prefix`.
    fn temp_allocator(prefix: &str) -> Allocator {
        let file = Builder::new()
            .prefix(prefix)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = DiscRef::new(file);
        let root = Allocator::init(&disc).unwrap();
        Allocator::new(disc, &Rc::new(Config::new()), root).unwrap()
    }

    /// Page used by the cache tests; two of them fit under a limit of 1050,
    /// three do not.
    fn sample_page() -> Page {
        Page::new(Vec::new(), 0, 10, 9)
    }

    /// Freed pages are handed out again, most recently freed first, and only
    /// for the size exponent they were freed under.
    #[test]
    fn test_reuse_freed_page() {
        let allocator = temp_allocator("all_reuse_test");
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap();
        let second = allocator.allocate(10).unwrap();
        let third = allocator.allocate(11).unwrap();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        for page in &[first, second, third] {
            allocator.free(*page).unwrap();
        }

        assert_eq!(allocator.allocate(10).unwrap(), second);
        assert_eq!(allocator.allocate(10).unwrap(), first);

        let fresh = allocator.allocate(10).unwrap();
        assert!(fresh != first);
        assert!(fresh != second);

        assert_eq!(allocator.allocate(11).unwrap(), third);
        assert!(allocator.allocate(11).unwrap() != third);
    }

    /// Removing a page from the middle of a free list cuts the list there:
    /// only the pages in front of it remain reusable.
    #[test]
    fn test_remove_freed_page() {
        let allocator = temp_allocator("remove_free_test");
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap();
        let second = allocator.allocate(10).unwrap();
        let third = allocator.allocate(10).unwrap();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        for page in &[first, second, third] {
            allocator.free(*page).unwrap();
        }
        allocator.remove_from_free(second, 10).unwrap();

        assert_eq!(allocator.allocate(10).unwrap(), third);

        let fresh = allocator.allocate(10).unwrap();
        assert!(fresh != first);
        assert!(fresh != second);
        assert!(fresh != third);
    }

    /// Exceeding the limit evicts the oldest entry.
    #[test]
    fn test_cache_limit_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, &sample_page());
        cache.put(20, &sample_page());
        cache.put(30, &sample_page());

        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_none());
        assert!(cache.get(20).is_some());
        assert!(cache.get(30).is_some());
    }

    /// A `get` refreshes the entry, so the other one is evicted instead.
    #[test]
    fn test_cache_limit_refresh_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, &sample_page());
        cache.put(20, &sample_page());
        assert!(cache.get(10).is_some());

        cache.put(30, &sample_page());

        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_some());
        assert!(cache.get(20).is_none());
        assert!(cache.get(30).is_some());
    }
}