db_core/block_mgr/
allocator.rs

/// Search for free blocks and allocation of new blocks.
2
3
4use crate::common::errors::Error;
5use crate::common::errors::ErrorKind;
6use crate::common::defs::BlockId;
7use crate::common::defs::Sequence;
8use crate::storage::datastore::FileDesc;
9use crate::system::config::ConfigMt;
10use crate::block_mgr::block_mgr::BlockMgr;
11use crate::block_mgr::block::DataBlock;
12use crate::block_mgr::block::BlockLockedMut;
13use crate::block_mgr::free_info::FreeInfo;
14use crate::block_mgr::free_info::FiData;
15use crate::block_mgr::free_info::FreeInfoSharedState;
16use crate::buf_mgr::buf_mgr::BlockType;
17use std::cell::RefCell;
18use std::rc::Rc;
19
20
21/// Shared state that can be sent to other threads.
22pub struct BlockAllocatorSharedState {
23    fi_ss:                  FreeInfoSharedState,
24    checkpoint_store_seq:   Sequence,
25}
26
27
28pub struct BlockAllocator {
29    block_mgr:              Rc<BlockMgr>,
30    file_desc_buf:          RefCell<Vec<FileDesc>>,
31    free_info:              FreeInfo,
32    file_fi_data:           RefCell<FiData>,
33    extent_fi_data:         RefCell<FiData>,
34    checkpoint_store_seq:   Sequence,
35}
36
37impl BlockAllocator {
38
39    pub fn new(conf: ConfigMt, block_mgr: Rc<BlockMgr>) -> Self {
40        let free_info = FreeInfo::new(conf.clone(), block_mgr.clone());
41        let file_desc_buf = RefCell::new(vec![]);
42        let file_fi_data = RefCell::new(FiData:: new());
43        let extent_fi_data = RefCell::new(FiData:: new());
44        let checkpoint_store_seq = Sequence::new(1);
45
46        BlockAllocator {
47            block_mgr,
48            free_info,
49            file_desc_buf,
50            file_fi_data,
51            extent_fi_data,
52            checkpoint_store_seq,
53        }
54    }
55
56    /// Build instance from shared state.
57    pub fn from_shared_state(block_mgr: Rc<BlockMgr>, ss: BlockAllocatorSharedState) -> Result<Self, Error> {
58        let BlockAllocatorSharedState { fi_ss, checkpoint_store_seq } = ss;
59
60        let free_info = FreeInfo::from_shared_state(block_mgr.clone(), fi_ss)?;
61        let file_desc_buf = RefCell::new(vec![]);
62        let file_fi_data = RefCell::new(FiData:: new());
63        let extent_fi_data = RefCell::new(FiData:: new());
64
65        Ok(BlockAllocator {
66            block_mgr,
67            free_info,
68            file_desc_buf,
69            file_fi_data,
70            extent_fi_data,
71            checkpoint_store_seq,
72        })
73    }
74
75    /// Return shared state that can be sent to other threads.
76    pub fn get_shared_state(&self) -> BlockAllocatorSharedState {
77        BlockAllocatorSharedState {
78            fi_ss:                  self.free_info.get_shared_state(), 
79            checkpoint_store_seq:   self.checkpoint_store_seq.clone(),
80        }
81    }
82
83    /// Find or allocate a new data block.
84    pub fn get_free(&self, file_id: u16) -> Result<BlockLockedMut<DataBlock>, Error> {
85        if let Some(block) = self.find_free_block(file_id)? {
86            Ok(block)
87        } else {
88            self.allocate_block(file_id)
89        }
90    }
91
92    /// mark an extent as full.
93    pub fn mark_extent_full(&self, file_id: u16, extent_id: u16) -> Result<(), Error> {
94        self.free_info.set_extent_bit(file_id, extent_id, true)
95    }
96
97    /// In some of datastore files add a new extent, return first free block from that extent.
98    pub fn allocate_block(&self, file_id: u16) -> Result<BlockLockedMut<DataBlock>, Error> {
99        // in some of data files add a new extent;
100        // try to get free block in that extent and return the block;
101        // if free block was not found then add extent and repeat attempt.
102        let desc = self.block_mgr.get_file_desc(file_id).ok_or(Error::file_does_not_exist())?;
103        // try adding a new extent to datastore file
104        self.free_info.get_fi_for_file(desc.file_id, &mut self.file_fi_data.borrow_mut())?;
105        if self.file_fi_data.borrow().size() < desc.max_extent_num {
106            self.block_mgr.add_extent(desc.file_id)?;
107            self.free_info.add_extent(desc.file_id)?;
108            let extent_id = self.file_fi_data.borrow().size();
109            if let Some(block) = self.find_free_block_in_extent(desc.file_id, extent_id)? {
110                return Ok(block);
111            }
112        }
113        
114        return Err(Error::db_size_limit_reached());
115    }
116
117    /// Mark block as full in free info section.
118    pub fn set_free_info_used(&self, block_id: &BlockId) -> Result<(), Error> {
119        // lock extent free info and set the bit for the block accordingly
120        // check if extent changed from full to free or wise versa
121        // if extent changed then lock file free info, and then release extent lock
122        // update file free info accordingly
123        self.free_info.set_block_bit(block_id, true)
124    }
125
126    /// Mark block as having free space in free info section.
127    pub fn set_free_info_free(&self, block_id: &BlockId) -> Result<(), Error> {
128        // lock extent free info and set the bit for the block accordingly
129        // check if extent changed from full to free or wise versa
130        // if extent changed then lock file free info, and then release extent lock
131        // update file free info accordingly
132        self.free_info.set_block_bit(block_id, false)
133    }
134
135    /// Return free data block from checkpoint store.
136    pub fn get_free_checkpoint_block(&self, checkpoint_csn: u64) -> Result<DataBlock, Error> {
137        // determine next available block_id;
138        // find a free block from buffer and assign it to block_id;
139        // return the block.
140        let block_id = self.get_next_checkpoint_block_id(checkpoint_csn);
141        self.block_mgr.allocate_on_cache_mut_no_lock(block_id, BlockType::CheckpointBlock)
142    }
143
144    /// Allocate extent in the versioning store.
145    pub fn allocate_versioning_extent(&self) -> Result<(u16, u16, u16), Error> {
146        self.block_mgr.get_versioning_files(&mut self.file_desc_buf.borrow_mut());
147        let file_desc_set = &self.file_desc_buf.borrow();
148        for desc in file_desc_set.iter() {
149            if desc.extent_num < desc.max_extent_num {
150                self.block_mgr.add_extent(desc.file_id)?;
151                return Ok((desc.file_id, desc.extent_num, desc.extent_size));
152            }
153        }
154        return Err(Error::db_size_limit_reached());
155    }
156
157    /// Allocate block with specified id.
158    pub fn allocate_block_with_id(&self, block_id: &BlockId) -> Result<(), Error> {
159        self.block_mgr.get_data_files(&mut self.file_desc_buf.borrow_mut());
160        let file_desc_set = &self.file_desc_buf.borrow();
161        for desc in file_desc_set.iter() {
162            if desc.file_id == block_id.file_id {
163                let mut extent_num = desc.extent_num;
164                while extent_num <= block_id.extent_id {
165                    if desc.extent_num >= desc.max_extent_num {
166                        return Err(Error::db_size_limit_reached());
167                    } else {
168                        self.block_mgr.add_extent(desc.file_id)?;
169                        self.free_info.add_extent(desc.file_id)?;
170                    }
171                    extent_num += 1;
172                }
173
174                return Ok(());
175            }
176        }
177        return Err(Error::file_does_not_exist());
178    }
179
180    // return next generated checkpoint block_id.
181    fn get_next_checkpoint_block_id(&self, checkpoint_csn: u64) -> BlockId {
182        // use fake block_id because it is not considered by writer when writer
183        // writes checkpoint block to disk.
184        let seq_num = self.checkpoint_store_seq.get_next();
185        let file_id = (checkpoint_csn & 0x1) as u16;                 // file_id 0 and 1 are reserved for checkpointing store file ids.
186        let block_id = (seq_num & 0xffff) as u16;
187        let seq_num = seq_num >> 16;
188        let extent_id = (seq_num & 0xffff) as u16;
189
190        BlockId {
191            file_id,
192            extent_id,
193            block_id,
194        }
195    }
196
197    // find and return block with free space.
198    fn find_free_block(&self, file_id: u16) -> Result<Option<BlockLockedMut<DataBlock>>, Error> {
199        // find exntents with free blocks;
200        // try getting the block.
201        let desc = self.block_mgr.get_file_desc(file_id).ok_or(Error::file_does_not_exist())?;
202        self.free_info.get_fi_for_file(desc.file_id, &mut self.file_fi_data.borrow_mut())?;
203        let file_fi_data = self.file_fi_data.borrow();
204        let mut free_iter = file_fi_data.free_iter();
205        while let Some(extent_id) = free_iter.next() {
206            if let Some(block) = self.find_free_block_in_extent(desc.file_id, extent_id)? {
207                return Ok(Some(block));
208            }
209        }
210        Ok(None)
211    }
212
213    fn find_free_block_in_extent(&self, file_id: u16, extent_id: u16) -> Result<Option<BlockLockedMut<DataBlock>>, Error> {
214        // find a free block in given extent;
215        // try locking the block; 
216        // if the block is already locked then continue search,
217        // else check the used space in the block;
218        // if block is full then unlock and continue search,
219        // else return the block.
220        self.free_info.get_fi_for_extent(file_id, extent_id, &mut self.extent_fi_data.borrow_mut())?;
221        let extent_fi_data = self.extent_fi_data.borrow();
222        let mut free_iter = extent_fi_data.free_iter();
223        while let Some(block_id) = free_iter.next() {
224            let blid = BlockId {
225                file_id,
226                extent_id: extent_id,
227                block_id,
228            };
229
230            match self.block_mgr.get_block_for_write::<DataBlock>(&blid, DataBlock::new, true, 0) {
231                Ok(block) => {
232                    if block.get_used_space() < self.block_mgr.block_fill_size() {
233                        return Ok(Some(block));
234                    }
235                    drop(block);
236                },
237                Err(e) => {
238                    match e.kind() {
239                        ErrorKind::TryLockError => {},
240                        _ => return Err(e)
241                    }
242                },
243            };
244        }
245        Ok(None)
246    }
247}
248
#[cfg(test)]
mod tests {

    use super::*;
    use crate::storage::datastore::DataStore;
    use crate::storage::datastore::FileType;
    use crate::storage::datastore::FileState;
    use crate::block_mgr::block::BasicBlock;
    use std::path::Path;


    /// Recreate the directory `dspath` from scratch and initialize a datastore in it with one
    /// data file (id 3), one versioning file (id 4) and one checkpoint file (id 5).
    /// Returns the file descriptors that were used.
    fn init_datastore(dspath: &str, block_size: usize) -> Vec<FileDesc> {
        let root = Path::new(dspath);
        if root.exists() {
            std::fs::remove_dir_all(root).expect("Failed to delete test dir on cleanup");
        }
        std::fs::create_dir(root).expect("Failed to create test dir");

        let fdset = vec![
            FileDesc {
                state: FileState::InUse,
                file_id: 3,
                extent_size: 16,
                extent_num: 3,
                max_extent_num: 65500,
                file_type: FileType::DataStoreFile,
            },
            FileDesc {
                state: FileState::InUse,
                file_id: 4,
                extent_size: 10,
                extent_num: 3,
                max_extent_num: 65500,
                file_type: FileType::VersioningStoreFile,
            },
            FileDesc {
                state: FileState::InUse,
                file_id: 5,
                extent_size: 10,
                extent_num: 3,
                max_extent_num: 65500,
                file_type: FileType::CheckpointStoreFile,
            },
        ];

        DataStore::initialize_datastore(dspath, block_size, &fdset).expect("Failed to init datastore");
        fdset
    }

    #[test]
    fn test_allocator() {
        let dspath = "/tmp/test_allocator_655637";
        let block_size = 8192;
        let block_num = 100;

        let conf = ConfigMt::new();
        {
            // Scope the config handle so it is released before the block manager is created.
            let mut c = conf.get_conf();
            c.set_datastore_path(dspath.to_owned());
            c.set_block_mgr_n_lock(10);
            c.set_block_buf_size(block_num * block_size as u64);
        }

        let _fdset = init_datastore(dspath, block_size);

        let block_mgr = Rc::new(BlockMgr::new(conf.clone()).expect("Failed to create instance"));

        // Round-trip the allocator through its shared state.
        let original = BlockAllocator::new(conf.clone(), block_mgr.clone());
        let ss = original.get_shared_state();
        let ba = BlockAllocator::from_shared_state(block_mgr.clone(), ss).expect("Failed to get block allocator");

        let checkpoint_csn = 34544;

        // Free block lookup follows the used/free bits in free info.
        let file_id = 3;
        let first_free = BlockId::init(file_id, 1, 1);
        {
            let block = ba.get_free(file_id).unwrap();
            assert_eq!(first_free, block.get_id());
        }
        ba.set_free_info_used(&first_free).expect("Failed to set block bit");
        {
            let block = ba.get_free(file_id).unwrap();
            assert_eq!(BlockId::init(file_id, 1, 2), block.get_id());
        }
        ba.set_free_info_free(&first_free).expect("Failed to set block bit");
        {
            let block = ba.get_free(file_id).unwrap();
            assert_eq!(first_free, block.get_id());
        }

        // Allocation adds a new extent and returns a block from it.
        let new_extent_block = BlockId::init(file_id, 3, 1);
        {
            let block = ba.allocate_block(file_id).expect("Failed to allocate block");
            assert_eq!(new_extent_block, block.get_id());
        }

        // Checkpoint blocks get generated ids; the file id alternates with the csn parity.
        {
            let block = ba.get_free_checkpoint_block(checkpoint_csn).expect("Failed to get checkpoint block");
            assert_eq!(BlockId::init(0, 0, 2), block.get_id());
        }
        {
            let block = ba.get_free_checkpoint_block(checkpoint_csn + 1).expect("Failed to get checkpoint block");
            assert_eq!(BlockId::init(1, 0, 3), block.get_id());
        }

        // A block in a not yet existing extent becomes reachable after explicit allocation.
        let far_block = BlockId::init(3, 5, 1);
        assert!(block_mgr.get_block(&far_block).is_err());
        ba.allocate_block_with_id(&far_block).expect("Failed to allocate block");
        assert!(block_mgr.get_block(&far_block).is_ok());
    }
}