use crate::{
config::Config,
discref::{Device, DiscRef, Page, PageOps, ReadPage, UpdateList},
error::PRes,
flush_checksum::{double_buffer_check, prepare_buffer_flush},
};
use byteorder::{BigEndian, ByteOrder};
use linked_hash_map::LinkedHashMap;
use std::{
io::{Read, Write},
sync::Mutex,
};
const ALLOCATOR_PAGE_EXP: u8 = 10;
pub struct Cache {
cache: LinkedHashMap<u64, ReadPage>,
size: u64,
limit: u64,
}
impl Cache {
pub fn new(limit: u64) -> Cache {
Cache {
cache: LinkedHashMap::new(),
size: 0,
limit,
}
}
fn get(&mut self, key: u64) -> Option<ReadPage> {
self.cache.get_refresh(&key).map(|val| val.clone_read())
}
fn put(&mut self, key: u64, value: ReadPage) {
self.size += 1 << value.get_size_exp();
self.cache.insert(key, value);
while self.size > self.limit {
if let Some(en) = self.cache.pop_front() {
self.size -= 1 << en.1.get_size_exp();
} else {
break;
}
}
}
fn remove(&mut self, key: u64) {
self.cache.remove(&key);
}
}
struct FreeList {
list: [u64; 32],
last_flush: u8,
}
pub struct Allocator {
disc: Box<dyn Device>,
freelist: Mutex<FreeList>,
cache: Mutex<Cache>,
page: u64,
}
impl Allocator {
pub fn new(dr: DiscRef, config: &Config, page: u64) -> PRes<Allocator> {
let mut buffer_0 = [0; 259];
let mut buffer_1 = [0; 259];
let freelist;
let last_flush;
{
let mut page = dr.load_page(page)?;
page.read_exact(&mut buffer_0)?;
page.read_exact(&mut buffer_1)?;
let (flush_number, first) = double_buffer_check(&buffer_0, &buffer_1);
last_flush = flush_number;
if first {
freelist = Allocator::read_free_list(&buffer_0);
} else {
freelist = Allocator::read_free_list(&buffer_1);
}
}
let cache_size = config.cache_size();
Ok(Allocator {
disc: Box::new(dr),
freelist: Mutex::new(FreeList {
list: freelist,
last_flush,
}),
cache: Mutex::new(Cache::new(cache_size)),
page,
})
}
pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
let mut freelist = [0; 32];
for p in &mut freelist.iter_mut().enumerate() {
let pos = 8 * p.0;
(*p.1) = BigEndian::read_u64(&buffer[pos..pos + 8]);
}
freelist
}
pub fn init(dr: &DiscRef) -> PRes<u64> {
let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
let mut buffer = Allocator::write_free_list(&[0; 32]);
prepare_buffer_flush(&mut buffer, 0);
page.write_all(&buffer)?;
dr.flush_page(&page)?;
Ok(page.get_index())
}
pub fn load_page_not_free(&self, page: u64) -> PRes<Option<ReadPage>> {
let load = self.read_page_int(page)?;
if load.is_free()? {
Ok(None)
} else {
Ok(Some(load))
}
}
pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
let load = self.read_page_int(page)?;
debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
Ok(load)
}
pub fn write_page(&self, page: u64) -> PRes<Page> {
let load = self.write_page_int(page)?;
debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
Ok(load)
}
fn read_page_int(&self, page: u64) -> PRes<ReadPage> {
{
let mut cache = self.cache.lock()?;
if let Some(pg) = cache.get(page) {
return Ok(pg);
}
}
let load = self.disc.load_page(page)?;
{
let mut cache = self.cache.lock()?;
cache.put(page, load.clone_read());
}
Ok(load)
}
fn write_page_int(&self, page: u64) -> PRes<Page> {
let cache_result;
{
let mut cache = self.cache.lock()?;
cache_result = cache.get(page);
}
if let Some(pg) = cache_result {
return Ok(pg.clone_write());
}
let load = self.disc.load_page(page)?;
{
let mut cache = self.cache.lock()?;
cache.put(page, load.clone_read());
}
Ok(load.clone_write())
}
pub fn allocate(&self, exp: u8) -> PRes<Page> {
{
let mut fl = self.freelist.lock()?;
let page = fl.list[exp as usize];
if page != 0 as u64 {
let next = self.disc.mark_allocated(page)?;
fl.list[exp as usize] = next;
let mut pg = self.write_page_int(page)?;
pg.reset()?;
return Ok(pg);
}
}
let page = self.disc.create_page(exp)?;
Ok(page)
}
pub fn flush_page(&self, page: Page) -> PRes<()> {
self.disc.flush_page(&page)?;
{
let mut cache = self.cache.lock()?;
cache.put(page.get_index(), page.make_read());
}
Ok(())
}
pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
let mut fl = self.freelist.lock()?;
if fl.list[exp as usize] == page {
fl.list[exp as usize] = 0;
} else {
let mut p = fl.list[exp as usize];
while p != 0 as u64 {
let mut pg = self.write_page_int(p)?;
p = pg.get_next_free()?;
if p == page {
pg.set_free(false)?;
pg.set_next_free(0)?;
self.flush_page(pg)?;
break;
}
}
}
Ok(())
}
pub fn free(&self, page: u64) -> PRes<()> {
self.cache.lock()?.remove(page);
let mut fl = self.freelist.lock()?;
self.disc.trim_or_free_page(page, &mut fl.list)?;
Ok(())
}
pub fn write_free_list(list: &[u64]) -> [u8; 259] {
let mut buffer = [0; 259];
for (pos, page) in list.iter().enumerate() {
BigEndian::write_u64(&mut buffer[pos..pos + 8], *page);
}
buffer
}
pub fn flush_free_list(&self) -> PRes<()> {
let mut lock = self.freelist.lock()?;
let mut buffer = Allocator::write_free_list(&lock.list);
let (last_flush, offset) = prepare_buffer_flush(&mut buffer, lock.last_flush);
lock.last_flush = last_flush;
let mut pag = self.disc.load_page(self.page)?.clone_write();
pag.seek(offset)?;
pag.write_all(&buffer)?;
self.disc.flush_page(&pag)?;
Ok(())
}
pub fn disc(&self) -> &dyn Device {
&*self.disc
}
}
impl UpdateList for [u64; 32] {
fn update(&mut self, size: u8, page: u64) -> PRes<u64> {
let old = self[size as usize];
self[size as usize] = page;
debug_assert!(old != page, "freeing: {} already free: {} ", page, old);
Ok(old)
}
}
#[cfg(test)]
mod tests {
use super::{Allocator, Cache};
use crate::{
config::Config,
discref::{DiscRef, PageOps, ReadPage},
};
use std::rc::Rc;
use std::sync::Arc;
use tempfile::Builder;
#[test]
fn test_reuse_freed_page() {
let file = Builder::new()
.prefix("all_reuse_test")
.suffix(".persy")
.tempfile()
.unwrap()
.reopen()
.unwrap();
let disc = DiscRef::new(file);
let pg = Allocator::init(&disc).unwrap();
let allocator = Allocator::new(disc, &Rc::new(Config::new()), pg).unwrap();
allocator.allocate(2).unwrap();
let first = allocator.allocate(10).unwrap().get_index();
let second = allocator.allocate(10).unwrap().get_index();
let third = allocator.allocate(11).unwrap().get_index();
let _forth_to_avoid_trim = allocator.allocate(11).unwrap();
allocator.free(first).unwrap();
allocator.free(second).unwrap();
allocator.free(third).unwrap();
let val = allocator.allocate(10).unwrap().get_index();
assert_eq!(val, second);
let val = allocator.allocate(10).unwrap().get_index();
assert_eq!(val, first);
let val = allocator.allocate(10).unwrap().get_index();
assert!(val != first);
assert!(val != second);
let val = allocator.allocate(11).unwrap().get_index();
assert_eq!(val, third);
let val = allocator.allocate(11).unwrap().get_index();
assert!(val != third);
}
#[test]
fn test_remove_freed_page() {
let file = Builder::new()
.prefix("remove_free_test")
.suffix(".persy")
.tempfile()
.unwrap()
.reopen()
.unwrap();
let disc = DiscRef::new(file);
let pg = Allocator::init(&disc).unwrap();
let allocator = Allocator::new(disc, &Rc::new(Config::new()), pg).unwrap();
allocator.allocate(2).unwrap();
let first = allocator.allocate(10).unwrap().get_index();
let second = allocator.allocate(10).unwrap().get_index();
let third = allocator.allocate(10).unwrap().get_index();
let _forth_to_avoid_trim = allocator.allocate(11).unwrap();
println!("{} {} {}", first, second, third);
allocator.free(first).unwrap();
allocator.free(second).unwrap();
allocator.free(third).unwrap();
allocator.remove_from_free(second, 10).unwrap();
let val = allocator.allocate(10).unwrap().get_index();
assert_eq!(val, third);
let val = allocator.allocate(10).unwrap().get_index();
assert!(val != first);
assert!(val != second);
assert!(val != third);
}
#[test]
fn test_cache_limit_evict() {
let mut cache = Cache::new(1050 as u64);
cache.put(10, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
cache.put(30, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
assert!(cache.size < 1050);
assert_eq!(cache.cache.len(), 2);
let ten = 10 as u64;
match cache.get(ten) {
Some(_) => assert!(false),
None => assert!(true),
}
let ten = 20 as u64;
match cache.get(ten) {
Some(_) => assert!(true),
None => assert!(false),
}
let ten = 30 as u64;
match cache.get(ten) {
Some(_) => assert!(true),
None => assert!(false),
}
}
#[test]
fn test_cache_limit_refresh_evict() {
let mut cache = Cache::new(1050 as u64);
cache.put(10, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
let ten = 10 as u64;
match cache.get(ten) {
Some(_) => assert!(true),
None => assert!(false),
}
cache.put(30, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
assert!(cache.size < 1050);
assert_eq!(cache.cache.len(), 2);
let ten = 10 as u64;
match cache.get(ten) {
Some(_) => assert!(true),
None => assert!(false),
}
let ten = 20 as u64;
match cache.get(ten) {
Some(_) => assert!(false),
None => assert!(true),
}
let ten = 30 as u64;
match cache.get(ten) {
Some(_) => assert!(true),
None => assert!(false),
}
}
}