1use crate::common::errors::Error;
20use crate::common::defs::BlockId;
21use crate::storage::datastore::FileDesc;
22use crate::block_mgr::block_mgr::BlockMgr;
23use crate::block_mgr::allocator::BlockAllocator;
24use crate::block_mgr::block::BlockLockedMut;
25use crate::block_mgr::block::DataBlock;
26use crate::block_mgr::block::BasicBlock;
27use std::cell::RefCell;
28use std::cell::Ref;
29use std::rc::Rc;
30
31
32#[derive(Clone)]
33pub struct CheckpointStore {
34 block_mgr: Rc<BlockMgr>,
35 block_allocator: Rc<BlockAllocator>,
36 file_info: RefCell<Vec<FileDesc>>,
37}
38
39impl CheckpointStore {
40
41 pub fn new(block_mgr: Rc<BlockMgr>, block_allocator: Rc<BlockAllocator>) -> Result<Self, Error> {
42 let file_info = RefCell::new(vec![]);
43 Ok(CheckpointStore {
44 block_mgr,
45 block_allocator,
46 file_info,
47 })
48 }
49
50 pub fn add_block(&self, block: &BlockLockedMut<DataBlock>, checkpoint_csn: u64) -> Result<(), Error> {
52 let mut tgt_block = self.block_allocator.get_free_checkpoint_block(checkpoint_csn)?;
53 tgt_block.copy_from(&block);
54 tgt_block.set_original_id(block.get_id());
55 self.block_mgr.set_checkpoint_block_id(block.get_buf_idx(), tgt_block.get_id());
56 self.block_mgr.set_checkpoint_written(block.get_buf_idx(), false);
57
58 Ok(())
59 }
60
61 pub fn get_iter(&self, checkpoint_csn: u64) -> Result<Iterator, Error> {
63 self.block_mgr.get_checkpoint_files(&mut self.file_info.borrow_mut());
64 Ok(Iterator::new(&self.block_mgr, self.file_info.borrow(), checkpoint_csn))
65 }
66}
67
68pub struct Iterator<'a> {
70 block_mgr: &'a BlockMgr,
71 file_desc: Ref<'a, Vec<FileDesc>>,
72 cur_extent_id: u16,
73 cur_block_id: u16,
74 checkpoint_csn: u64,
75 cur_file_idx: usize,
76}
77
78impl<'a> Iterator<'a> {
79
80 fn new(block_mgr: &'a BlockMgr, file_desc: Ref<'a, Vec<FileDesc>>, checkpoint_csn: u64) -> Self {
81 Iterator {
82 block_mgr,
83 file_desc,
84 cur_extent_id: (checkpoint_csn & 0x1) as u16 + 1, cur_block_id: 0,
86 checkpoint_csn,
87 cur_file_idx: 0,
88 }
89 }
90
91 pub fn get_next(&mut self) -> Result<Option<(BlockId, DataBlock)>, Error> {
92 while let Some(block_id) = self.calc_next_block_id() {
93 let block = self.block_mgr.get_block_mut_no_lock(&block_id)?;
94 if block.get_checkpoint_csn() == self.checkpoint_csn {
95 return Ok(Some((block.get_original_id(), block)));
96 } else {
97 break;
98 }
99 }
100
101 Ok(None)
102 }
103
104 fn calc_next_block_id(&mut self) -> Option<BlockId> {
105 self.cur_block_id += 1;
106 if self.cur_block_id == self.file_desc[self.cur_file_idx].extent_size {
107 self.cur_block_id = 0;
108 self.cur_extent_id += 2;
109 if self.cur_extent_id >= self.file_desc[self.cur_file_idx].extent_num {
110 self.cur_extent_id = (self.checkpoint_csn & 0x1) as u16;
111 self.cur_file_idx += 1;
112 if self.cur_file_idx == self.file_desc.len() {
113 return None;
114 }
115 }
116 }
117
118 Some(BlockId {
119 file_id: self.file_desc[self.cur_file_idx].file_id,
120 extent_id: self.cur_extent_id,
121 block_id: self.cur_block_id,
122 })
123 }
124}
125
126
127#[cfg(test)]
128mod tests {
129
130 use super::*;
131 use crate::storage::datastore::DataStore;
132 use crate::storage::datastore::FileType;
133 use crate::storage::datastore::FileDesc;
134 use crate::storage::datastore::FileState;
135 use crate::buf_mgr::buf_writer::BufWriter;
136 use crate::system::config::ConfigMt;
137 use std::time::Duration;
138 use std::path::Path;
139
140
141 fn init_datastore(dspath: &str, block_size: usize) -> Vec<FileDesc> {
142
143 if Path::new(&dspath).exists() {
144 std::fs::remove_dir_all(&dspath).expect("Failed to delete test dir on cleanup");
145 }
146 std::fs::create_dir(&dspath).expect("Failed to create test dir");
147
148 let mut fdset = vec![];
149 let desc1 = FileDesc {
150 state: FileState::InUse,
151 file_id: 3,
152 extent_size: 16,
153 extent_num: 3,
154 max_extent_num: 65500,
155 file_type: FileType::DataStoreFile,
156 };
157 let desc2 = FileDesc {
158 state: FileState::InUse,
159 file_id: 4,
160 extent_size: 10,
161 extent_num: 3,
162 max_extent_num: 65500,
163 file_type: FileType::VersioningStoreFile,
164 };
165 let desc3 = FileDesc {
166 state: FileState::InUse,
167 file_id: 5,
168 extent_size: 10,
169 extent_num: 5,
170 max_extent_num: 65500,
171 file_type: FileType::CheckpointStoreFile,
172 };
173
174 fdset.push(desc1);
175 fdset.push(desc2);
176 fdset.push(desc3);
177
178 DataStore::initialize_datastore(dspath, block_size, &fdset).expect("Failed to init datastore");
179 fdset
180 }
181
182 fn check_added_num(expected_cnt: usize, cs: &CheckpointStore, checkpoint_csn: u64) {
183 let mut block_num = 0;
184
185 let mut iter = cs.get_iter(checkpoint_csn).expect("Failed to get iterator");
186
187 while let Some(_block) = iter.get_next().expect("Failed to get next block") {
188 block_num += 1;
189 }
190
191 assert_eq!(expected_cnt, block_num);
192 }
193
194 fn add_block(file_id: u16, extent_id: u16, block_id: u16, cs: &CheckpointStore, block_mgr: &BlockMgr, checkpoint_csn: u64) -> usize {
195 let block_id = BlockId::init(file_id, extent_id, block_id);
196 let mut block = block_mgr.get_block_mut(&block_id).expect("Failed to get block");
197 let ret = block.get_buf_idx();
198 block.set_checkpoint_csn(checkpoint_csn);
199 cs.add_block(&block, checkpoint_csn).expect("Failed to add block");
200 ret
201 }
202
203 fn flush_blocks(block_mgr: &BlockMgr, idxs: &[usize]) {
204
205 let mut i =0;
206 assert!(loop {
207 std::thread::sleep(Duration::new(2,0));
208 let mut dirty = false;
209 for idx in idxs.iter() {
210 let desc = block_mgr.get_block_desc(*idx).unwrap();
211 if desc.dirty {
212 dirty = true;
213 }
214 }
215 if ! dirty {
216 break true;
217 }
218 i += 1;
219 if i == 30 {
220 break false;
221 }
222 }, "Writers couldn't complete in 60 secs");
223 }
224
225 #[test]
226 fn test_checkpoint_store() {
227 let dspath = "/tmp/test_checkpoint_store_68343467";
228 let block_size = 8192;
229 let block_num = 100;
230 let writer_num = 2;
231
232 let conf = ConfigMt::new();
233 let mut c = conf.get_conf();
234 c.set_datastore_path(dspath.to_owned());
235 c.set_block_mgr_n_lock(10);
236 c.set_free_info_n_file_lock(10);
237 c.set_free_info_n_extent_lock(10);
238 c.set_block_buf_size(block_num*block_size as u64);
239 c.set_checkpoint_data_threshold(10*1024);
240 c.set_version_retain_time(10_000);
241 c.set_writer_num(2);
242 drop(c);
243
244 let _init_fdesc = init_datastore(dspath, block_size);
245
246 let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create block mgr"));
247 let block_allocator = Rc::new(BlockAllocator::new(conf.clone(), block_mgr.clone()));
248 let buf_writer = BufWriter::new(&block_mgr, writer_num).expect("Failed to create buf writer");
249 let cs = CheckpointStore::new(block_mgr.clone(), block_allocator).expect("Failed to create checkpoint store");
250
251 let mut checkpoint_csn = 1;
252 let add_cnt = 7;
253 let mut idxs = vec![];
254
255 for i in 0..add_cnt {
256 idxs.push(add_block(3, 1, 1 + i, &cs, &block_mgr, checkpoint_csn));
257 }
258 flush_blocks(&block_mgr, &idxs);
259
260 check_added_num(add_cnt as usize, &cs, checkpoint_csn);
261
262 idxs.truncate(0);
263 idxs.push(add_block(3, 1, 1 + add_cnt, &cs, &block_mgr, checkpoint_csn));
264 flush_blocks(&block_mgr, &idxs);
265
266 drop(cs);
268 buf_writer.terminate();
269 if let Ok(bm) = Rc::try_unwrap(block_mgr) {
270 drop(bm);
271 } else {
272 panic!("Failed to unwrap block mgr");
273 }
274
275 let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create block mgr"));
276 let block_allocator = Rc::new(BlockAllocator::new(conf.clone(), block_mgr.clone()));
277 let buf_writer = BufWriter::new(&block_mgr, writer_num).expect("Failed to create buf writer");
278 let cs = CheckpointStore::new(block_mgr.clone(), block_allocator).expect("Failed to create checkpoint store");
279
280 check_added_num(add_cnt as usize + 1, &cs, checkpoint_csn);
281
282
283 checkpoint_csn += 1;
284
285 idxs.truncate(0);
286 for i in 0..add_cnt {
287 idxs.push(add_block(3, 1, 1 + i, &cs, &block_mgr, checkpoint_csn));
288 idxs.push(add_block(3, 2, 1 + i, &cs, &block_mgr, checkpoint_csn));
289 }
290
291 flush_blocks(&block_mgr, &idxs);
292
293 check_added_num(add_cnt as usize * 2, &cs, checkpoint_csn);
294 buf_writer.terminate();
295 }
296}