// db_core/storage/checkpoint_store.rs

//! Block storage for checkpoint restore.
//!
//! `CheckpointStore` represents the part of `BlockStorageDriver` functionality related to
//! checkpointing. Before any data block can be modified, its original state must be saved in
//! the special block checkpoint store.
//!
//! When the system starts, it searches the transaction log for the last checkpoint mark and
//! then uses the checkpoint store to restore all blocks saved for it. After all blocks are
//! restored, the database represents its state as of the checkpoint start, and the system can
//! read the changes from the transaction log and apply them to the database.
//!
//! Each checkpoint is identified by a unique checkpoint sequence number (CSN). A checkpoint can
//! be in a not-completed state, because when a new checkpoint begins, a sequence of actions,
//! such as writing all of the previous checkpoint's blocks to disk, is required. As soon as all
//! of them are done, the checkpoint is marked as completed and a corresponding record is made in
//! the transaction log.
//!
//! The system can use only a completed checkpoint for restore. That means the checkpoint store
//! keeps the previous checkpoint's blocks until the new checkpoint is marked as completed. As
//! soon as the new checkpoint is completed, all blocks of the previous checkpoint are discarded
//! and their space can be reused by the current and future checkpoints.
17
18
19use crate::common::errors::Error;
20use crate::common::defs::BlockId;
21use crate::storage::datastore::FileDesc;
22use crate::block_mgr::block_mgr::BlockMgr;
23use crate::block_mgr::allocator::BlockAllocator;
24use crate::block_mgr::block::BlockLockedMut;
25use crate::block_mgr::block::DataBlock;
26use crate::block_mgr::block::BasicBlock;
27use std::cell::RefCell;
28use std::cell::Ref;
29use std::rc::Rc;
30
31
32#[derive(Clone)]
33pub struct CheckpointStore {
34    block_mgr: Rc<BlockMgr>,
35    block_allocator: Rc<BlockAllocator>,
36    file_info: RefCell<Vec<FileDesc>>,
37}
38
39impl CheckpointStore {
40
41    pub fn new(block_mgr: Rc<BlockMgr>, block_allocator: Rc<BlockAllocator>) -> Result<Self, Error> {
42        let file_info = RefCell::new(vec![]);
43        Ok(CheckpointStore {
44            block_mgr,
45            block_allocator,
46            file_info,
47        })
48    }
49
50    /// Add block to checkpoint store.
51    pub fn add_block(&self, block: &BlockLockedMut<DataBlock>, checkpoint_csn: u64) -> Result<(), Error> {
52        let mut tgt_block = self.block_allocator.get_free_checkpoint_block(checkpoint_csn)?;
53        tgt_block.copy_from(&block);
54        tgt_block.set_original_id(block.get_id());
55        self.block_mgr.set_checkpoint_block_id(block.get_buf_idx(), tgt_block.get_id());
56        self.block_mgr.set_checkpoint_written(block.get_buf_idx(), false);
57
58        Ok(())
59    }
60
61    /// Return iterator over checkpoint store blocks.
62    pub fn get_iter(&self, checkpoint_csn: u64) -> Result<Iterator, Error> {
63        self.block_mgr.get_checkpoint_files(&mut self.file_info.borrow_mut());
64        Ok(Iterator::new(&self.block_mgr, self.file_info.borrow(), checkpoint_csn))
65    }
66}
67
68/// Iterator over blocks of checkpoint store for certain checkpoint sequence number.
69pub struct Iterator<'a> {
70    block_mgr:      &'a BlockMgr,
71    file_desc:      Ref<'a, Vec<FileDesc>>,
72    cur_extent_id:  u16,
73    cur_block_id:   u16,
74    checkpoint_csn: u64,
75    cur_file_idx:   usize,
76}
77
78impl<'a> Iterator<'a> {
79
80    fn new(block_mgr: &'a BlockMgr, file_desc: Ref<'a, Vec<FileDesc>>, checkpoint_csn: u64) -> Self {
81        Iterator {
82            block_mgr,
83            file_desc,
84            cur_extent_id: (checkpoint_csn & 0x1) as u16 + 1,   // avoid extent 0 by adding 1
85            cur_block_id: 0,
86            checkpoint_csn,
87            cur_file_idx: 0,
88        }
89    }
90
91    pub fn get_next(&mut self) -> Result<Option<(BlockId, DataBlock)>, Error> {
92        while let Some(block_id) = self.calc_next_block_id() {
93            let block = self.block_mgr.get_block_mut_no_lock(&block_id)?;
94            if block.get_checkpoint_csn() == self.checkpoint_csn {
95                return Ok(Some((block.get_original_id(), block)));
96            } else {
97                break;
98            }
99        }
100
101        Ok(None)
102    }
103
104    fn calc_next_block_id(&mut self) -> Option<BlockId> {
105        self.cur_block_id += 1;
106        if self.cur_block_id == self.file_desc[self.cur_file_idx].extent_size {
107            self.cur_block_id = 0;
108            self.cur_extent_id += 2;
109            if self.cur_extent_id >= self.file_desc[self.cur_file_idx].extent_num {
110                self.cur_extent_id = (self.checkpoint_csn & 0x1) as u16;
111                self.cur_file_idx += 1;
112                if self.cur_file_idx == self.file_desc.len() {
113                    return None;
114                }
115            }
116        }
117
118        Some(BlockId {
119            file_id: self.file_desc[self.cur_file_idx].file_id,
120            extent_id: self.cur_extent_id,
121            block_id: self.cur_block_id,
122        })
123    }
124}
125
126
#[cfg(test)]
mod tests {

    use super::*;
    use crate::storage::datastore::DataStore;
    use crate::storage::datastore::FileType;
    use crate::storage::datastore::FileDesc;
    use crate::storage::datastore::FileState;
    use crate::buf_mgr::buf_writer::BufWriter;
    use crate::system::config::ConfigMt;
    use std::time::Duration;
    use std::time::Instant;
    use std::path::Path;

    /// How long `flush_blocks` waits for the buffer writers before failing the test.
    const FLUSH_TIMEOUT: Duration = Duration::from_secs(60);

    /// Pause between two checks of the dirty flags in `flush_blocks`.
    const FLUSH_POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// Recreates an empty directory at `dspath` and initializes a datastore in it with three
    /// files: a data store file (id 3), a versioning store file (id 4) and a checkpoint
    /// store file (id 5). Returns the descriptors that were used.
    fn init_datastore(dspath: &str, block_size: usize) -> Vec<FileDesc> {

        if Path::new(&dspath).exists() {
            std::fs::remove_dir_all(&dspath).expect("Failed to delete test dir on cleanup");
        }
        std::fs::create_dir(&dspath).expect("Failed to create test dir");

        let mut fdset = vec![];
        let desc1 = FileDesc {
            state:          FileState::InUse,
            file_id:        3,
            extent_size:    16,
            extent_num:     3,
            max_extent_num: 65500,
            file_type:      FileType::DataStoreFile,
        };
        let desc2 = FileDesc {
            state:          FileState::InUse,
            file_id:        4,
            extent_size:    10,
            extent_num:     3,
            max_extent_num: 65500,
            file_type:      FileType::VersioningStoreFile,
        };
        let desc3 = FileDesc {
            state:          FileState::InUse,
            file_id:        5,
            extent_size:    10,
            extent_num:     5,
            max_extent_num: 65500,
            file_type:      FileType::CheckpointStoreFile,
        };

        fdset.push(desc1);
        fdset.push(desc2);
        fdset.push(desc3);

        DataStore::initialize_datastore(dspath, block_size, &fdset).expect("Failed to init datastore");
        fdset
    }

    /// Asserts that the checkpoint store yields exactly `expected_cnt` blocks for
    /// `checkpoint_csn`.
    fn check_added_num(expected_cnt: usize, cs: &CheckpointStore, checkpoint_csn: u64) {
        let mut block_num = 0;

        let mut iter = cs.get_iter(checkpoint_csn).expect("Failed to get iterator");

        while let Some(_block) = iter.get_next().expect("Failed to get next block") {
            block_num += 1;
        }

        assert_eq!(expected_cnt, block_num);
    }

    /// Tags data block (`file_id`, `extent_id`, `block_id`) with `checkpoint_csn`, saves it
    /// into the checkpoint store and returns the index of the buffer slot holding it.
    fn add_block(file_id: u16, extent_id: u16, block_id: u16, cs: &CheckpointStore, block_mgr: &BlockMgr, checkpoint_csn: u64) -> usize {
        let block_id = BlockId::init(file_id, extent_id, block_id);
        let mut block = block_mgr.get_block_mut(&block_id).expect("Failed to get block");
        let ret = block.get_buf_idx();
        block.set_checkpoint_csn(checkpoint_csn);
        cs.add_block(&block, checkpoint_csn).expect("Failed to add block");
        ret
    }

    /// Waits until none of the buffer slots in `idxs` is dirty any more (the buffer
    /// writers have flushed them). Fails the test once `FLUSH_TIMEOUT` has passed.
    ///
    /// Polls every `FLUSH_POLL_INTERVAL` instead of sleeping whole seconds, so a fast
    /// flush does not slow the test down.
    fn flush_blocks(block_mgr: &BlockMgr, idxs: &[usize]) {
        let deadline = Instant::now() + FLUSH_TIMEOUT;
        loop {
            std::thread::sleep(FLUSH_POLL_INTERVAL);

            let mut dirty = false;
            for idx in idxs.iter() {
                let desc = block_mgr.get_block_desc(*idx).unwrap();
                if desc.dirty {
                    dirty = true;
                }
            }
            if !dirty {
                return;
            }

            assert!(Instant::now() < deadline, "Writers couldn't complete in 60 secs");
        }
    }

    #[test]
    fn test_checkpoint_store() {
        let dspath = "/tmp/test_checkpoint_store_68343467";
        let block_size = 8192;
        let block_num = 100;
        let writer_num = 2;

        let conf = ConfigMt::new();
        let mut c = conf.get_conf();
        c.set_datastore_path(dspath.to_owned());
        c.set_block_mgr_n_lock(10);
        c.set_free_info_n_file_lock(10);
        c.set_free_info_n_extent_lock(10);
        c.set_block_buf_size(block_num*block_size as u64);
        c.set_checkpoint_data_threshold(10*1024);
        c.set_version_retain_time(10_000);
        c.set_writer_num(2);
        drop(c);

        let _init_fdesc = init_datastore(dspath, block_size);

        let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create block mgr"));
        let block_allocator = Rc::new(BlockAllocator::new(conf.clone(), block_mgr.clone()));
        let buf_writer = BufWriter::new(&block_mgr, writer_num).expect("Failed to create buf writer");
        let cs = CheckpointStore::new(block_mgr.clone(), block_allocator).expect("Failed to create checkpoint store");

        let mut checkpoint_csn = 1;
        let add_cnt = 7;
        let mut idxs = vec![];

        for i in 0..add_cnt {
            idxs.push(add_block(3, 1, 1 + i, &cs, &block_mgr, checkpoint_csn));
        }
        flush_blocks(&block_mgr, &idxs);

        check_added_num(add_cnt as usize, &cs, checkpoint_csn);

        idxs.truncate(0);
        idxs.push(add_block(3, 1, 1 + add_cnt, &cs, &block_mgr, checkpoint_csn));
        flush_blocks(&block_mgr, &idxs);

        // Emulate a restart; otherwise the buffer can contain unsynced block data previously
        // read from the checkpoint store.
        drop(cs);
        buf_writer.terminate();
        if let Ok(bm) = Rc::try_unwrap(block_mgr) {
            drop(bm);
        } else {
            panic!("Failed to unwrap block mgr");
        }

        let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create block mgr"));
        let block_allocator = Rc::new(BlockAllocator::new(conf.clone(), block_mgr.clone()));
        let buf_writer = BufWriter::new(&block_mgr, writer_num).expect("Failed to create buf writer");
        let cs = CheckpointStore::new(block_mgr.clone(), block_allocator).expect("Failed to create checkpoint store");

        check_added_num(add_cnt as usize + 1, &cs, checkpoint_csn);


        checkpoint_csn += 1;

        idxs.truncate(0);
        for i in 0..add_cnt {
            idxs.push(add_block(3, 1, 1 + i, &cs, &block_mgr, checkpoint_csn));
            idxs.push(add_block(3, 2, 1 + i, &cs, &block_mgr, checkpoint_csn));
        }

        flush_blocks(&block_mgr, &idxs);

        check_added_num(add_cnt as usize * 2, &cs, checkpoint_csn);
        buf_writer.terminate();
    }
}