use crate::common::errors::Error;
use crate::common::defs::BlockId;
use crate::storage::datastore::FileDesc;
use crate::block_mgr::block_mgr::BlockMgr;
use crate::block_mgr::allocator::BlockAllocator;
use crate::block_mgr::block::BlockLockedMut;
use crate::block_mgr::block::DataBlock;
use crate::block_mgr::block::BasicBlock;
use std::cell::RefCell;
use std::cell::Ref;
use std::rc::Rc;
#[derive(Clone)]
pub struct CheckpointStore {
block_mgr: Rc<BlockMgr>,
block_allocator: Rc<BlockAllocator>,
file_info: RefCell<Vec<FileDesc>>,
}
impl CheckpointStore {
pub fn new(block_mgr: Rc<BlockMgr>, block_allocator: Rc<BlockAllocator>) -> Result<Self, Error> {
let file_info = RefCell::new(vec![]);
Ok(CheckpointStore {
block_mgr,
block_allocator,
file_info,
})
}
pub fn add_block(&self, block: &BlockLockedMut<DataBlock>, checkpoint_csn: u64) -> Result<(), Error> {
let mut tgt_block = self.block_allocator.get_free_checkpoint_block(checkpoint_csn)?;
tgt_block.copy_from(&block);
tgt_block.set_original_id(block.get_id());
self.block_mgr.set_checkpoint_block_id(block.get_buf_idx(), tgt_block.get_id());
self.block_mgr.set_checkpoint_written(block.get_buf_idx(), false);
Ok(())
}
pub fn get_iter(&self, checkpoint_csn: u64) -> Result<Iterator, Error> {
self.block_mgr.get_checkpoint_files(&mut self.file_info.borrow_mut());
Ok(Iterator::new(&self.block_mgr, self.file_info.borrow(), checkpoint_csn))
}
}
pub struct Iterator<'a> {
block_mgr: &'a BlockMgr,
file_desc: Ref<'a, Vec<FileDesc>>,
cur_extent_id: u16,
cur_block_id: u16,
checkpoint_csn: u64,
cur_file_idx: usize,
}
impl<'a> Iterator<'a> {
fn new(block_mgr: &'a BlockMgr, file_desc: Ref<'a, Vec<FileDesc>>, checkpoint_csn: u64) -> Self {
Iterator {
block_mgr,
file_desc,
cur_extent_id: (checkpoint_csn & 0x1) as u16 + 1, cur_block_id: 0,
checkpoint_csn,
cur_file_idx: 0,
}
}
pub fn get_next(&mut self) -> Result<Option<(BlockId, DataBlock)>, Error> {
while let Some(block_id) = self.calc_next_block_id() {
let block = self.block_mgr.get_block_mut_no_lock(&block_id)?;
if block.get_checkpoint_csn() == self.checkpoint_csn {
return Ok(Some((block.get_original_id(), block)));
} else {
break;
}
}
Ok(None)
}
fn calc_next_block_id(&mut self) -> Option<BlockId> {
self.cur_block_id += 1;
if self.cur_block_id == self.file_desc[self.cur_file_idx].extent_size {
self.cur_block_id = 0;
self.cur_extent_id += 2;
if self.cur_extent_id >= self.file_desc[self.cur_file_idx].extent_num {
self.cur_extent_id = (self.checkpoint_csn & 0x1) as u16;
self.cur_file_idx += 1;
if self.cur_file_idx == self.file_desc.len() {
return None;
}
}
}
Some(BlockId {
file_id: self.file_desc[self.cur_file_idx].file_id,
extent_id: self.cur_extent_id,
block_id: self.cur_block_id,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::datastore::DataStore;
use crate::storage::datastore::FileType;
use crate::storage::datastore::FileDesc;
use crate::storage::datastore::FileState;
use crate::buf_mgr::buf_writer::BufWriter;
use crate::system::config::ConfigMt;
use std::time::Duration;
use std::path::Path;
fn init_datastore(dspath: &str, block_size: usize) -> Vec<FileDesc> {
if Path::new(&dspath).exists() {
std::fs::remove_dir_all(&dspath).expect("Failed to delete test dir on cleanup");
}
std::fs::create_dir(&dspath).expect("Failed to create test dir");
let mut fdset = vec![];
let desc1 = FileDesc {
state: FileState::InUse,
file_id: 3,
extent_size: 16,
extent_num: 3,
max_extent_num: 65500,
file_type: FileType::DataStoreFile,
};
let desc2 = FileDesc {
state: FileState::InUse,
file_id: 4,
extent_size: 10,
extent_num: 3,
max_extent_num: 65500,
file_type: FileType::VersioningStoreFile,
};
let desc3 = FileDesc {
state: FileState::InUse,
file_id: 5,
extent_size: 10,
extent_num: 5,
max_extent_num: 65500,
file_type: FileType::CheckpointStoreFile,
};
fdset.push(desc1);
fdset.push(desc2);
fdset.push(desc3);
DataStore::initialize_datastore(dspath, block_size, &fdset).expect("Failed to init datastore");
fdset
}
fn check_added_num(expected_cnt: usize, cs: &CheckpointStore, checkpoint_csn: u64) {
let mut block_num = 0;
let mut iter = cs.get_iter(checkpoint_csn).expect("Failed to get iterator");
while let Some(_block) = iter.get_next().expect("Failed to get next block") {
block_num += 1;
}
assert_eq!(expected_cnt, block_num);
}
fn add_block(file_id: u16, extent_id: u16, block_id: u16, cs: &CheckpointStore, block_mgr: &BlockMgr, checkpoint_csn: u64) -> usize {
let block_id = BlockId::init(file_id, extent_id, block_id);
let mut block = block_mgr.get_block_mut(&block_id).expect("Failed to get block");
let ret = block.get_buf_idx();
block.set_checkpoint_csn(checkpoint_csn);
cs.add_block(&block, checkpoint_csn).expect("Failed to add block");
ret
}
fn flush_blocks(block_mgr: &BlockMgr, idxs: &[usize]) {
let mut i =0;
assert!(loop {
std::thread::sleep(Duration::new(2,0));
let mut dirty = false;
for idx in idxs.iter() {
let desc = block_mgr.get_block_desc(*idx).unwrap();
if desc.dirty {
dirty = true;
}
}
if ! dirty {
break true;
}
i += 1;
if i == 30 {
break false;
}
}, "Writers couldn't complete in 60 secs");
}
#[test]
fn test_checkpoint_store() {
let dspath = "/tmp/test_checkpoint_store_68343467";
let block_size = 8192;
let block_num = 100;
let writer_num = 2;
let conf = ConfigMt::new();
let mut c = conf.get_conf();
c.set_datastore_path(dspath.to_owned());
c.set_block_mgr_n_lock(10);
c.set_free_info_n_file_lock(10);
c.set_free_info_n_extent_lock(10);
c.set_block_buf_size(block_num*block_size as u64);
c.set_checkpoint_data_threshold(10*1024);
c.set_version_retain_time(10_000);
c.set_writer_num(2);
drop(c);
let _init_fdesc = init_datastore(dspath, block_size);
let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create block mgr"));
let block_allocator = Rc::new(BlockAllocator::new(conf.clone(), block_mgr.clone()));
let buf_writer = BufWriter::new(&block_mgr, writer_num).expect("Failed to create buf writer");
let cs = CheckpointStore::new(block_mgr.clone(), block_allocator).expect("Failed to create checkpoint store");
let mut checkpoint_csn = 1;
let add_cnt = 7;
let mut idxs = vec![];
for i in 0..add_cnt {
idxs.push(add_block(3, 1, 1 + i, &cs, &block_mgr, checkpoint_csn));
}
flush_blocks(&block_mgr, &idxs);
check_added_num(add_cnt as usize, &cs, checkpoint_csn);
idxs.truncate(0);
idxs.push(add_block(3, 1, 1 + add_cnt, &cs, &block_mgr, checkpoint_csn));
flush_blocks(&block_mgr, &idxs);
drop(cs);
buf_writer.terminate();
if let Ok(bm) = Rc::try_unwrap(block_mgr) {
drop(bm);
} else {
panic!("Failed to unwrap block mgr");
}
let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create block mgr"));
let block_allocator = Rc::new(BlockAllocator::new(conf.clone(), block_mgr.clone()));
let buf_writer = BufWriter::new(&block_mgr, writer_num).expect("Failed to create buf writer");
let cs = CheckpointStore::new(block_mgr.clone(), block_allocator).expect("Failed to create checkpoint store");
check_added_num(add_cnt as usize + 1, &cs, checkpoint_csn);
checkpoint_csn += 1;
idxs.truncate(0);
for i in 0..add_cnt {
idxs.push(add_block(3, 1, 1 + i, &cs, &block_mgr, checkpoint_csn));
idxs.push(add_block(3, 2, 1 + i, &cs, &block_mgr, checkpoint_csn));
}
flush_blocks(&block_mgr, &idxs);
check_added_num(add_cnt as usize * 2, &cs, checkpoint_csn);
buf_writer.terminate();
}
}