db_core/storage/
datastore.rs

1//! Datastore provides access to data blocks, versioning store, and checkpoint data on disk.
2//! 
//! A set of files represents the datastore on disk. Each file consists of individual blocks.
4//! There are several types of blocks:
5//!  - file header block - the first block of the file, contains metadata about the file.
6//!  - extent header block - the first block of each extent.
7//!  - free info block - optional block containing bitmap of free blocks/extents in extent/file.
8//!  - data block - a block containing data itself.
9//!
10//! Sample file structure:
11//!
12//! +---- file header ----+
13//! |  file header block  |
14//! | [ free info block ] |
15//! | [ free info block ] |
16//! | [      ...        ] |
17//! +----- extent 1 ------+
18//! | extent header block |
19//! | [ free info block ] |
20//! | [ free info block ] |
21//! | [      ...        ] |
22//! |     data block      |
23//! |     data block      |
24//! |     data block      |
25//! |     data block      |
26//! |        ...          |
27//! +----- extent 2 ------+
28//! | extent header block |
29//! | [ free info block ] |
30//! | [ free info block ] |
31//! | [      ...        ] |
32//! |     data block      |
33//! |     data block      |
34//! |     data block      |
35//! |     data block      |
36//! |        ...          |
37//! +---------------------+
38//!
39//! The first 4 bytes of block is crc32 sum. 
40//! After crc depending on block type follow several metadata fields.
41//! After metadata follows free info bitmap for header blocks, or data entries list for data
42//! blocks.
43//!
//! Extent is an extension unit for a data file, a continuous sequence of blocks.
45//!
46//! Entries start from the bottom and are added from bottom to top. 
47//!
//! Exact fields and structure are given in module crate::block_mgr::block.
49//!
50
51use crate::common::errors::Error;
52use crate::common::defs::BlockId;
53use crate::common::crc32;
54use crate::common::misc::SliceToIntConverter;
55use crate::common::misc::alloc_buf;
56use crate::common::misc::dealloc_buf;
57use crate::block_mgr::block::DataBlock;
58use crate::block_mgr::block::FreeInfoSection;
59use crate::block_mgr::block::FreeInfoHeaderSection;
60use crate::block_mgr::block::ExtentHeaderBlock;
61use crate::block_mgr::block::FileHeaderBlock;
62use crate::block_mgr::block::FreeInfoBlock;
63use crate::block_mgr::block::BasicBlock;
64use crate::block_mgr::block::FIBLOCK_HEADER_LEN;
65use crate::block_mgr::block::EHBLOCK_HEADER_LEN;
66use crate::block_mgr::block::FHBLOCK_HEADER_LEN;
67use crate::buf_mgr::buf_mgr::Pinned;
68use crate::buf_mgr::buf_mgr::BlockArea;
69use crate::storage::fs_ops;
70use crate::system::config::ConfigMt;
71use std::io::{Read, Write, Seek, SeekFrom};
72use std::sync::RwLock;
73use std::sync::Arc;
74use std::sync::Mutex;
75use std::sync::RwLockReadGuard;
76use std::sync::atomic::AtomicU16;
77use std::sync::atomic::AtomicU64;
78use std::sync::atomic::Ordering; 
79use std::collections::HashMap;
80use std::cell::Ref;
81use std::cell::RefCell;
82use fs2::FileExt;
83use log::warn;
84use std::ops::DerefMut;
85
86
/// Magic byte stored in the header of a regular data file.
pub const DATA_FILE_MAGIC: u8 = 0x14;
/// Magic byte stored in the header of a versioning store file.
pub const VERSIONING_FILE_MAGIC: u8 = 0x85;
/// Magic byte stored in the header of a checkpoint store file.
pub const CHECKPOINT_FILE_MAGIC: u8 = 0x35;
/// Name of the lock file created inside the datastore directory.
pub const LOCK_FILE: &str = "db.lock";

/// Largest accepted extent size, in blocks.
pub const MAX_EXTENT_SIZE: usize = 65500;
/// Smallest accepted block size, in bytes.
pub const MIN_BLOCK_SIZE: usize = 1024;
/// Largest accepted block size, in bytes.
///
/// The block size is persisted in the file header as a `u16`, and must be a power of two,
/// so 32768 is the largest value that survives a write/read round trip (65536 would be
/// truncated to 0 when the header is written).
pub const MAX_BLOCK_SIZE: usize = 32768;

/// Block size used when no data file is present to read the actual size from.
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

pub const MAX_FI_BLOCKS: usize = MAX_EXTENT_SIZE / 8 / MIN_BLOCK_SIZE + 1;  // max number of free info blocks (excluding header).

pub const MIN_EXTENT_SIZE: usize = MAX_FI_BLOCKS + 2;  // max fi blocks + header block + at least one data block.

/// Value of the header "use mark" byte for a fully initialized file.
const FILE_IN_USE_MARK: u8 = 1;
103
104
105/// Datastore letes manipulate data files.
106pub struct DataStore {
107    desc_repo:          FileDescRepo,
108    block_size:         usize,
109    block_fill_size:    usize,
110    files:              RefCell<HashMap<u16, std::fs::File>>,
111    block_buf:          RefCell<BlockArea>,
112    path:               String,
113    lock_file:          Arc<std::fs::File>,
114    seq_file_id:        Arc<AtomicU16>,
115    path_buf:           RefCell<String>,
116}
117
118
119impl DataStore {
120
121    /// Create instance of DataStore.
122    pub fn new(conf: ConfigMt) -> Result<Self, Error> {
123        let block_fill_ratio = *conf.get_conf().get_block_fill_ratio();
124        let path = conf.get_conf().get_datastore_path().clone();
125
126        let mut ret = Self::open_datastore(path)?;
127        ret.block_fill_size = ret.block_size * block_fill_ratio as usize / 100;
128        Ok(ret)
129    }
130
131    /// Initialize datastore: create data, checkpoint, versioning store files, and lock file.
132    pub fn initialize_datastore(path: &str, block_size: usize, desc_set: &[FileDesc]) -> Result<(), Error> {
133        if (block_size as usize) < MIN_BLOCK_SIZE || (block_size as usize) > MAX_BLOCK_SIZE 
134            || !block_size.is_power_of_two() {
135            return Err(Error::incorrect_block_size());
136        }
137
138        // create lock file, and lock datastore
139        let mut lock_file = std::path::PathBuf::new();
140
141        lock_file.push(path);
142        lock_file.push(LOCK_FILE);
143
144        let f = std::fs::OpenOptions::new()
145                .write(true)
146                .create_new(true)
147                .open(lock_file)?;
148
149        f.try_lock_exclusive()?;
150
151        // initialize empty datastore
152        let desc_repo =          FileDescRepo::new(vec![], vec![]);
153        let block_fill_size =    block_size * 80 / 100;
154        let files =              HashMap::new();
155        let block_buf =          BlockArea::new(alloc_buf(block_size)?, block_size);
156        let path =               path.to_owned();
157        let lock_file =          Arc::new(f);
158        let seq_file_id =        Arc::new(AtomicU16::new(2));
159
160        let ds = DataStore {
161            desc_repo, 
162            block_size,
163            block_fill_size,
164            files: RefCell::new(files),
165            block_buf: RefCell::new(block_buf),
166            path,
167            lock_file,
168            seq_file_id,
169            path_buf: RefCell::new(String::new()),
170        };
171
172        // add data files
173        for desc in desc_set {
174            ds.add_datafile(desc.file_type, desc.extent_size, desc.extent_num, desc.max_extent_num)?;
175        }
176
177        Ok(())
178    }
179
180    /// Load block from disk.
181    pub fn load_block(&self, block_id: &BlockId, file_state: FileState) -> Result<Ref<BlockArea>, Error> {
182
183        self.seek_to_block(block_id, file_state)?;
184        let files = self.files.borrow();
185        let mut f = files.get(&block_id.file_id).unwrap();
186
187        let mut block_buf = self.block_buf.borrow_mut();
188        f.read_exact(&mut block_buf)?;
189        drop(files);
190        drop(block_buf);
191
192        let block_buf = self.block_buf.borrow();
193
194        // check crc
195        let mut crc32 = crc32::crc32_begin();
196        crc32 = crc32::crc32_arr(crc32, &block_buf[4..]);
197        crc32 = crc32::crc32_finalize(crc32);
198
199        if u32::slice_to_int(&block_buf[0..4]).unwrap() != crc32 {
200            Err(Error::block_crc_mismatch())
201        } else {
202            Ok(block_buf)
203        }
204    }
205
206    /// Return file descriptive information.
207    pub fn get_file_desc(&self, file_id: u16) -> Option<FileDesc> {
208        self.desc_repo.get_by_file_id(file_id, FileState::InUse)
209    }
210
211    /// Return block size in bytes.
212    pub fn get_block_size(&self) -> usize {
213        self.block_size
214    }
215
216    /// Return how much bytes are taken in a full block.
217    pub fn block_fill_size(&self) -> usize {
218        self.block_fill_size
219    }
220
221    /// Write a block to disk.
222    pub fn write_block<T: BasicBlock>(&self, block: &mut T, file_state: FileState) -> Result<(), Error> {
223        let block_id = block.get_id();
224
225        self.seek_to_block(&block_id, file_state)?;
226
227        block.prepare_for_write();
228
229        let files = self.files.borrow();
230        let mut f = files.get(&block_id.file_id).unwrap();
231        f.write_all(block.slice())?;
232        drop(files);
233
234        Ok(())
235    }
236
237    /// Return iterator over checkpoint store files.
238    pub fn get_checkpoint_files(&self, files: &mut Vec<FileDesc>) {
239        self.desc_repo.fill_files_vec(files, FileType::CheckpointStoreFile, FileState::InUse);
240    }
241
242    /// Return iterator over versioning store files.
243    pub fn get_versioning_files(&self, files: &mut Vec<FileDesc>) {
244        self.desc_repo.fill_files_vec(files, FileType::VersioningStoreFile, FileState::InUse);
245    }
246
247    /// Return iterator over data files.
248    pub fn get_data_files(&self, files: &mut Vec<FileDesc>) {
249        self.desc_repo.fill_files_vec(files, FileType::DataStoreFile, FileState::InUse);
250    }
251
252    /// Clone this instance. Clone operations opens separate file descriptors for each of data
253    /// files.
254    pub fn clone(&self) -> Result<Self, Error> {
255        let desc_repo          = self.desc_repo.clone();
256        let block_size         = self.block_size;
257        let block_fill_size    = self.block_fill_size;
258        let block_buf          = BlockArea::new(alloc_buf(block_size)?, block_size);
259
260        let mut files          = HashMap::new(); 
261        let mut path_buf       = String::new();
262        for (file_id, _) in self.files.borrow().iter() {
263            let desc = self.desc_repo.get_by_file_id(*file_id, FileState::InUse).unwrap();
264
265            self.desc_repo.get_path(desc.file_id, &mut path_buf);
266            let f = std::fs::OpenOptions::new()
267                    .read(true)
268                    .write(true)
269                    .create_new(false)
270                    .open(&path_buf)?;
271
272            files.insert(*file_id, f);
273        }
274
275        let path        = self.path.clone();
276        let lock_file   = self.lock_file.clone();
277        let seq_file_id = self.seq_file_id.clone();
278
279        Ok(DataStore {
280            desc_repo, 
281            block_size,
282            block_fill_size,
283            files: RefCell::new(files),
284            block_buf: RefCell::new(block_buf),
285            path,
286            lock_file,
287            seq_file_id,
288            path_buf: RefCell::new(String::new()),
289        })
290    }
291
292    /// Add a new extent to a file.
293    pub fn add_extent(&self, file_id: u16, file_state: FileState) -> Result<(), Error> {
294
295        let desc = self.desc_repo.get_by_file_id(file_id, file_state).unwrap();
296
297        if desc.state != file_state {
298            return Err(Error::file_does_not_exist());
299        }
300
301        if desc.extent_num == desc.max_extent_num {
302            return Err(Error::extent_limit_reached());
303        }
304
305        // get mutex for file, and in critical section increase file size and format extent.
306        let slock = self.desc_repo.lock_file(file_id).ok_or(Error::file_does_not_exist())?;
307        let mutex = slock.get(&file_id).unwrap();
308        let file_lock = mutex.lock().unwrap();
309
310        let extent_start_pos = ((desc.extent_num - 1) as u64 * desc.extent_size as u64 
311            + (self.calc_file_fi_block_num(desc.max_extent_num as usize) as u64) + 1) * self.block_size as u64;
312
313        let files = self.files.borrow();
314        let mut f = files.get(&file_id).unwrap();
315        f.set_len(extent_start_pos + (desc.extent_size as u64 * self.block_size as u64))?;
316        f.seek(SeekFrom::Start(extent_start_pos))?;
317
318        // write extent header block and free info block(s)
319        let mut block_id = BlockId {
320            file_id,
321            extent_id: desc.extent_num,
322            block_id: 0,
323        };
324
325        let stub_pin = AtomicU64::new(1000);
326
327        self.zero_block_buf();
328
329        let mut ehb = ExtentHeaderBlock::new(block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), &stub_pin));
330        let fi_block_num = self.calc_extent_fi_block_num(desc.extent_size as usize);
331        ehb.set_fi_size(desc.extent_size);
332        ehb.set_full_cnt(fi_block_num as u16 + 1);
333        for i in 0..fi_block_num + 1 {              // header and fi blocks are always full
334            ehb.fi_slice_mut()[i/8] |= 1 << (i%8);
335        }
336
337        self.write_block_direct(&mut ehb, &f)?;
338
339        if ehb.fi_slice().len() * 8 < desc.extent_size as usize {
340            self.add_fi_blocks(&mut block_id, fi_block_num, &stub_pin, &f)?;
341        }
342
343        // format data blocks
344        self.zero_block_buf();
345
346        let mut db = DataBlock::new(block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), &stub_pin));
347        for _ in 0..desc.extent_size {
348            block_id.block_id += 1;
349            db.set_id(block_id);
350            self.write_block_direct(&mut db, &f)?;
351        }
352
353        f.sync_data()?;
354        drop(files);
355
356        self.desc_repo.add_extent(file_id);
357
358        drop(file_lock);
359        drop(slock);
360        
361        Ok(())
362    }
363
364    /// Add a new file to datastore. extent_num is the number of data extents, and excludes extent 0
365    /// which is file header.
366    pub fn add_datafile(&self, file_type: FileType, extent_size: u16, extent_num: u16, max_extent_num: u16) -> Result<u16, Error> {
367
368        if (extent_size as usize) < MIN_EXTENT_SIZE || (extent_size as usize) > MAX_EXTENT_SIZE {
369            return Err(Error::incorrect_extent_size());
370        }
371
372        if extent_num < 2 || max_extent_num < extent_num {
373            return Err(Error::incorrect_extent_settings());
374        }
375
376        let file_id = self.next_file_id()?;
377
378        let mut file_path = std::path::PathBuf::new();
379        file_path.push(&self.path);
380        file_path.push(format!("{}.dat", file_id));
381
382        if let Some(file_path) = file_path.to_str() {
383            let path = file_path.to_owned();
384
385            let mut desc = FileDesc {
386                state: FileState::Initializing,
387                file_id,
388                extent_size,
389                extent_num: 0,  
390                max_extent_num,
391                file_type,
392            };
393
394            self.desc_repo.add_file(desc, path.clone());
395
396            desc.extent_num = extent_num;
397            self.create_file(&desc, &path)?;
398
399            self.desc_repo.add_extent(desc.file_id);
400
401            for _ in 0..extent_num - 1 {
402                self.add_extent(desc.file_id, FileState::Initializing)?;
403            }
404
405            // set "in use"
406            self.set_file_in_use(file_id)?;
407        } else {
408            return Err(Error::failed_to_build_path());
409        }
410
411        Ok(file_id)
412    }
413
414    /// Return number of free info blocks for an extent of a certain size.
415    pub fn calc_extent_fi_block_num(&self, extent_size: usize) -> usize {
416        // we don't subtract header and fi blocks from extent size that means unused fi block can
417        // be created.
418        let byte_sz = (extent_size + 1) / 8;
419        let hdrfilen = self.block_size - EHBLOCK_HEADER_LEN;
420        if byte_sz > hdrfilen {
421            let divizor = self.block_size - FIBLOCK_HEADER_LEN;
422            (byte_sz - hdrfilen + divizor - 1) / divizor
423        } else {
424            0
425        }
426    }
427
428
429    // private
430
431
432    // calc number of fi blocks for file.
433    fn calc_file_fi_block_num(&self, max_extent_num: usize) -> usize {
434        let byte_sz = (max_extent_num + 1) / 8;
435        let hdrfilen = self.block_size - FHBLOCK_HEADER_LEN;
436        if byte_sz > hdrfilen {
437            let divizor = self.block_size - FIBLOCK_HEADER_LEN;
438            (byte_sz - hdrfilen + divizor - 1) / divizor
439        } else {
440            0
441        }
442    }
443
444    // add free info blocks to file/extent header
445    fn add_fi_blocks(&self, mut block_id: &mut BlockId, fi_block_num: usize, stub_pin: &AtomicU64, f: &std::fs::File) -> Result<(), Error> {
446        self.zero_block_buf();
447
448        let mut fib = FreeInfoBlock::new(*block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), stub_pin));
449
450        for _ in 0..fi_block_num {
451            block_id.block_id += 1;
452            fib.set_id(*block_id);
453            self.write_block_direct(&mut fib, &f)?;
454        }
455
456        Ok(())
457    }
458
459
460    // open a datastore
461    fn open_datastore(path: String) -> Result<DataStore, Error> {
462
463        let mut lock_file = std::path::PathBuf::new();
464
465        lock_file.push(&path);
466        lock_file.push(LOCK_FILE);
467
468        // try to acquire exlusive lock for datastore
469        let f = std::fs::OpenOptions::new()
470                .write(true)
471                .create_new(false)
472                .open(lock_file)?;
473
474        f.try_lock_exclusive()?;
475
476        let (files, desc_set, block_size, paths)   = Self::load_files(&path)?;
477        let block_buf           = BlockArea::new(alloc_buf(block_size)?, block_size);
478
479        let desc_repo           = FileDescRepo::new(desc_set, paths);
480        let block_fill_size     = block_size * 80 / 100;
481
482        let mut seq_file_id = 2;
483        for (file_id, _) in files.iter() {
484            if *file_id > seq_file_id {
485                seq_file_id = *file_id;
486            }
487        }
488
489        let path                = String::from(path);
490        let lock_file           = Arc::new(f);
491        let seq_file_id         = Arc::new(AtomicU16::new(seq_file_id));
492
493        Ok(DataStore {
494            desc_repo, 
495            block_size,
496            block_fill_size,
497            files: RefCell::new(files),
498            block_buf: RefCell::new(block_buf),
499            path,
500            lock_file,
501            seq_file_id,
502            path_buf: RefCell::new(String::new()),
503        })
504    }
505
506
507    /// find and load datastore files
508    fn load_files(datastore_path: &str) -> Result<(HashMap<u16, std::fs::File>, Vec<FileDesc>, usize, Vec<String>), Error> {
509        let mut desc_set = Vec::new();
510        let mut paths = Vec::new();
511        let mut files = HashMap::new();
512        let mut block_size = DEFAULT_BLOCK_SIZE;
513
514        fs_ops::traverse_dir(std::path::Path::new(datastore_path), false, |entry| -> Result<(), Error> {
515            if entry.path().is_file() && 
516                entry.path().extension() == Some(std::ffi::OsStr::new("dat"))
517            {
518                if let Some(path) = entry.path().to_str() {
519                    let (f, desc, block_sz) = Self::open_file(&path)?;
520                    files.insert(desc.file_id, f);
521                    paths.push(path.to_owned());
522                    desc_set.push(desc);
523                    block_size = block_sz;
524                }
525            }
526
527            Ok(())
528        })?;
529
530        Ok((files, desc_set, block_size, paths))
531    }
532
533
534    // open a file and read header information.
535    fn open_file(path: &str) -> Result<(std::fs::File, FileDesc, usize), Error> {
536
537        let mut f = std::fs::OpenOptions::new()
538                .read(true)
539                .write(true)
540                .create_new(false)
541                .open(path)?;
542
543        // crc              - u32: header block crc
544        // checkpoint_csn   - u64: checkpoint csn
545        // original_id      - 3 x u16: in checkpoint blocks points to the original block id.
546        // magic            - u8: either magic for regular data file, or for checkpoint, or versioning data.
547        // use_mark         - u8: 1 - file is initialized and is in use, 0 - otherwise.
548        // block_size       - u16: block size
549        // extent_size      - u16: extent_size size
550        // max_extent_num   - u16: maximum allowed number of extents in this file
551        // file_id          - u16: file id
552        // fi_full_cnt      - u16: number of full blocks in the extent (number of set bits in free_info)
553        // fi_length        - u16: number of bits in free_blocks bitmask
554        let mut header = [0u8; FHBLOCK_HEADER_LEN];
555
556        f.read_exact(&mut header)?;
557        let file_type = match header[18] {
558            DATA_FILE_MAGIC => FileType::DataStoreFile,
559            CHECKPOINT_FILE_MAGIC => FileType::CheckpointStoreFile,
560            VERSIONING_FILE_MAGIC => FileType::VersioningStoreFile,
561            _ => return Err(Error::magic_mismatch())
562        };
563
564        if header[19] != FILE_IN_USE_MARK {
565            return Err(Error::data_file_not_initialized());
566        }
567
568        let block_size = u16::slice_to_int(&header[20..22])?;
569        let extent_size = u16::slice_to_int(&header[22..24])?;
570        let max_extent_num = u16::slice_to_int(&header[24..26])?;
571        let file_id = u16::slice_to_int(&header[26..28])?;
572        let extent_num = if file_type == FileType::CheckpointStoreFile {
573
574            // checkpoint store files don't track extent_num in the header
575            let byte_sz = (max_extent_num + 1) / 8;
576            let hdrfilen = block_size - FHBLOCK_HEADER_LEN as u16;
577            let fhe_size =  if byte_sz > hdrfilen {
578                let divizor = block_size - FIBLOCK_HEADER_LEN as u16;
579                (byte_sz - hdrfilen + divizor - 1) / divizor
580            } else {
581                0
582            } + 1;
583            let fhe_size = fhe_size * block_size;
584            ((f.seek(SeekFrom::End(0))? - fhe_size as u64) / extent_size as u64 / block_size as u64) as u16 + 1
585        } else if file_type == FileType::VersioningStoreFile {
586            // for versioning files existing extents are discarded
587            1
588        } else {
589            u16::slice_to_int(&header[30..32])?
590        };
591
592        Ok((
593            f,
594            FileDesc {
595                state: FileState::InUse,
596                file_id,
597                extent_size,
598                extent_num,
599                max_extent_num,
600                file_type,
601            },
602            block_size as usize
603        ))
604    }
605
606    fn zero_block_buf(&self) {
607        let ba: &mut BlockArea = &mut self.block_buf.borrow_mut();
608        for b in ba.deref_mut() { *b = 0; }
609    }
610
611    fn create_file(&self, desc: &FileDesc, path: &str) -> Result<(), Error> {
612        let f = std::fs::OpenOptions::new()
613                .read(true)
614                .write(true)
615                .create_new(true)
616                .open(path)?;
617
618        let mut block_id = BlockId {
619            file_id: desc.file_id,
620            extent_id: 0,
621            block_id: 0,
622        };
623
624        let stub_pin = AtomicU64::new(1000);
625
626        self.zero_block_buf();
627
628        self.load_fhb_from_desc(desc);
629        let mut fhb = FileHeaderBlock::new(block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), &stub_pin));
630        fhb.set_full_cnt(1);            // extent 0 is always full
631        fhb.fi_slice_mut()[0] = 0x01;
632        fhb.set_block_size(self.get_block_size() as u16);
633        fhb.set_in_use(false);
634        self.write_block_direct(&mut fhb, &f)?;
635
636        if fhb.fi_slice().len() * 8 < desc.max_extent_num as usize {
637            let fi_block_num = self.calc_file_fi_block_num(desc.max_extent_num as usize);
638            self.add_fi_blocks(&mut block_id, fi_block_num, &stub_pin, &f)?;
639        }
640
641        let mut files = self.files.borrow_mut();
642        files.insert(desc.file_id, f);
643        drop(files);
644
645        Ok(())
646    }
647
648    fn set_file_in_use(&self, file_id: u16) -> Result<(), Error> {
649        let block_id = BlockId {
650            file_id,
651            extent_id: 0,
652            block_id: 0,
653        };
654        let stub_pin = AtomicU64::new(1000);
655        let ba = self.load_block(&block_id, FileState::Initializing)?;
656        let mut fhb = FileHeaderBlock::new(block_id, 0, Pinned::<BlockArea>::new(ba.clone(), &stub_pin));
657        drop(ba);
658
659        fhb.set_in_use(true);
660        self.write_block(&mut fhb, FileState::Initializing)?;
661
662        self.desc_repo.set_state(file_id, FileState::InUse);
663
664        Ok(())
665    }
666
667    // load file header block from file description.
668    fn load_fhb_from_desc(&self, desc: &FileDesc) {
669        let data = &mut self.block_buf.borrow_mut();
670        data[18] = desc.file_type as u8;
671        data[22..24].copy_from_slice(&desc.extent_size.to_ne_bytes());
672        data[24..26].copy_from_slice(&desc.max_extent_num.to_ne_bytes());
673        data[26..28].copy_from_slice(&desc.file_id.to_ne_bytes());
674        data[30..32].copy_from_slice(&desc.extent_num.to_ne_bytes());
675    }
676
677    /// return next free file id for a new data file
678    fn next_file_id(&self) -> Result<u16, Error> {
679        let mut current = self.seq_file_id.load(Ordering::Relaxed);
680        loop {
681            if current == std::u16::MAX {
682                return Err(Error::file_id_overflow());
683            }
684
685            if let Err(ret) = self.seq_file_id.compare_exchange(current, current+1, Ordering::Relaxed, Ordering::Relaxed) {
686                current = ret;
687            } else {
688                return Ok(current + 1);
689            }
690        }
691    }
692
693
694    // check if block exists, and seek in file to block position.
695    fn seek_to_block(&self, block_id: &BlockId, file_state: FileState) -> Result<(), Error> {
696        if let Some(desc) = self.desc_repo.get_by_file_id(block_id.file_id, file_state) {
697            let fhe_size = self.calc_file_fi_block_num(desc.max_extent_num as usize) + 1;   // size of extent 0
698            if desc.extent_num > block_id.extent_id {
699                if (desc.extent_size > block_id.block_id && block_id.extent_id > 0 ) || (block_id.extent_id == 0 && block_id.block_id < fhe_size as u16) {
700                    let mut files = self.files.borrow_mut();
701                    let mut f = if files.contains_key(&block_id.file_id) {
702                        files.get(&block_id.file_id).unwrap()
703                    } else {
704                        let mut path_buf = self.path_buf.borrow_mut();
705                        self.desc_repo.get_path(desc.file_id, &mut path_buf);
706                        let file = std::fs::OpenOptions::new()
707                                .read(true)
708                                .write(true)
709                                .create_new(false)
710                                .open(&path_buf as &str)?;
711
712                        files.insert(block_id.file_id, file);
713                        files.get(&block_id.file_id).unwrap()
714                    };
715
716                    let eff_extent_pos = if block_id.extent_id == 0 {
717                        0
718                    } else {
719                        (block_id.extent_id - 1) as u64 * desc.extent_size as u64 + fhe_size as u64
720                    };
721
722                    let file_pos = (eff_extent_pos + block_id.block_id as u64) * self.block_size as u64;
723                    f.seek(SeekFrom::Start(file_pos))?;
724
725                    Ok(())
726                } else {
727                    Err(Error::block_does_not_exist())
728                }
729            } else {
730                Err(Error::extent_does_not_exist())
731            }
732        } else {
733            Err(Error::file_does_not_exist())
734        }
735    }
736
737    // Write a block to disk without seeking.
738    pub fn write_block_direct<T: BasicBlock>(&self, block: &mut T, mut f: &std::fs::File) -> Result<(), Error> {
739        block.prepare_for_write();
740        f.write_all(block.slice())?;
741        Ok(())
742    }
743}
744
745
746impl Drop for DataStore {
747
748    fn drop(&mut self) {
749
750        for (_, f) in self.files.borrow_mut().drain() {
751            if let Err(e) = f.sync_all() {
752                warn!("Error while closing datastore file: {}", e);
753            }
754        }
755
756        if Arc::strong_count(&self.lock_file) == 1 {
757            if let Err(e) = self.lock_file.unlock() {
758                warn!("Error while closing datastore lock file: {}", e);
759            }
760        }
761
762        let block_buf = self.block_buf.borrow();
763        let v = dealloc_buf(block_buf.data_ptr(), block_buf.size());
764        drop(v);
765    }
766}
767
768
/// State of a database file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileState {
    /// The file is registered but its extents are still being created.
    Initializing = 0,
    /// The file is fully initialized and available for block access.
    InUse = 1,
}
775
776/// Type of a database file.
777#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug)]
778pub enum FileType {
779    DataStoreFile = DATA_FILE_MAGIC as isize,
780    CheckpointStoreFile = CHECKPOINT_FILE_MAGIC as isize,
781    VersioningStoreFile = VERSIONING_FILE_MAGIC as isize,
782}
783
784
785
786/// Descriptive information about a data store file.
787#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
788pub struct FileDesc {
789    pub state:          FileState,
790    pub file_id:        u16,
791    pub extent_size:    u16,
792    pub extent_num:     u16,
793    pub max_extent_num: u16,
794    pub file_type:      FileType,
795}
796
797
798/// Repository of data store files metadata shared by all threads.
799#[derive(Clone)]
800struct FileDescRepo {
801    fd:     Arc<RwLock<HashMap<u16, (FileDesc, String)>>>,
802    locks:  Arc<RwLock<HashMap<u16, Mutex<()>>>>,
803}
804
805impl FileDescRepo {
806
807    fn new(mut desc_set: Vec<FileDesc>, mut paths: Vec<String>) -> Self {
808        let mut hm = HashMap::new();
809        let mut hm2 = HashMap::new();
810        for desc in desc_set.drain(..) {
811            hm.insert(desc.file_id, (desc, paths.remove(0)));
812            hm2.insert(desc.file_id, Mutex::new(()));
813        }
814        let fd = Arc::new(RwLock::new(hm));
815        let locks = Arc::new(RwLock::new(hm2));
816
817        FileDescRepo {
818            fd,
819            locks,
820        }
821    }
822
823    fn add_file(&self, desc: FileDesc, path: String) {
824        let mut xlock = self.fd.write().unwrap();
825        xlock.insert(desc.file_id, (desc, path));
826        let mut xlock = self.locks.write().unwrap();
827        xlock.insert(desc.file_id, Mutex::new(()));
828    }
829
830    fn fill_files_vec(&self, files: &mut Vec<FileDesc>, file_type: FileType, state: FileState) {
831        files.truncate(0);
832        let slock = self.fd.read().unwrap();
833        for (_, desc) in slock.iter() {
834            if desc.0.file_type == file_type && desc.0.state == state {
835                files.push(desc.0);
836            }
837        }
838    }
839
840    fn get_by_file_id(&self, file_id: u16, state: FileState) -> Option<FileDesc> {
841        let slock = self.fd.read().unwrap();
842        if let Some(desc) = slock.get(&file_id) {
843            if desc.0.state == state {
844                Some(desc.0)
845            } else {
846                None
847            }
848        } else {
849            None
850        }
851    }
852
853    fn get_path(&self, file_id: u16, dst: &mut String) {
854        let slock = self.fd.read().unwrap();
855        if let Some(desc) = slock.get(&file_id) {
856            dst.truncate(0);
857            dst.push_str(&desc.1);
858        }
859    }
860
861    fn add_extent(&self, file_id: u16) {
862        let mut xlock = self.fd.write().unwrap();
863        let mut desc = xlock.get_mut(&file_id).unwrap();
864        desc.0.extent_num += 1;
865    }
866
867    fn set_state(&self, file_id: u16, state: FileState) {
868        let mut xlock = self.fd.write().unwrap();
869        let mut desc = xlock.get_mut(&file_id).unwrap();
870        desc.0.state = state;
871    }
872
873
874    /// Acquire lock for a certain file_id, and return desc along the way.
875    fn lock_file(&self, file_id: u16) -> Option<RwLockReadGuard<HashMap<u16, Mutex<()>>>> {
876        let slock = self.locks.read().unwrap();
877        if slock.contains_key(&file_id) {
878            Some(slock)
879        } else {
880            None
881        }
882    }
883}
884
885
#[cfg(test)]
mod tests {

    use super::*;
    use std::path::Path;

    /// Exercises `DataStore` against a scratch directory on disk: initialization,
    /// block loading and writing, file listings, and adding an extent and a data file.
    #[test]
    fn test_datastore() {

        let block_size = 8192;
        let dspath = "/tmp/test456343567578".to_owned();

        // Start from an empty directory; leftovers of an earlier run are removed first.
        // NOTE(review): the directory is not removed when the test finishes.
        if Path::new(&dspath).exists() {
            std::fs::remove_dir_all(&dspath).expect("Failed to delete test dir on cleanup");
        }
        std::fs::create_dir(&dspath).expect("Failed to create test dir");

        // Point the configuration at the scratch directory; the config guard is released
        // at the end of the inner scope.
        let conf = ConfigMt::new();
        {
            let mut c = conf.get_conf();
            c.set_datastore_path(dspath.clone());
        }

        // All three files are in use and share the same extent size.
        let in_use = |file_id, extent_num, max_extent_num, file_type| FileDesc {
            state: FileState::InUse,
            file_id,
            extent_size: 10,
            extent_num,
            max_extent_num,
            file_type,
        };
        let desc1 = in_use(3, 2, 2, FileType::DataStoreFile);
        let mut desc2 = in_use(4, 2, 65500, FileType::VersioningStoreFile);
        let desc3 = in_use(5, 3, 65500, FileType::CheckpointStoreFile);

        let mut fdset = vec![desc1, desc2, desc3];

        DataStore::initialize_datastore(&dspath, block_size, &fdset).expect("Failed to init datastore");

        // versioning store has no extents allocated
        desc2.extent_num = 1;
        fdset[1].extent_num = 1;

        let ds = DataStore::new(conf.clone()).expect("Failed to open datastore");

        // Load blocks from every extent of every file.
        for file in fdset {
            for extent_id in 0..file.extent_num {
                let block_cnt = match extent_id {
                    0 => ds.calc_file_fi_block_num(file.max_extent_num as usize),
                    // NOTE(review): only the first `extent_num` blocks of the extent are
                    // loaded here, while the loop over the added extent further down walks
                    // `extent_size` blocks - confirm `extent_num` is what is intended.
                    _ => file.extent_num as usize,
                };

                for block_num in 0..block_cnt {
                    let block_id = BlockId {
                        file_id: file.file_id,
                        extent_id,
                        block_id: block_num as u16,
                    };
                    let _block: Ref<BlockArea> = ds.load_block(&block_id, FileState::InUse).expect("Failed to load block");
                }
            }
        }

        // The opened datastore must report the descriptions it was initialized with.
        for &(file_id, expected) in [(3, desc1), (4, desc2), (5, desc3)].iter() {
            let desc = ds.get_file_desc(file_id).expect("No file description for file id found");
            assert_eq!(desc, expected);
        }

        assert_eq!(block_size, ds.get_block_size());

        let bfsz = ds.block_fill_size();
        assert_eq!(*conf.get_conf().get_block_fill_ratio() as usize * block_size / 100, bfsz);

        // Write one block; the data block is dropped before its buffer is deallocated.
        let write_target = BlockId {
            file_id: 3,
            extent_id: 1,
            block_id: 5,
        };
        let stub_pin = AtomicU64::new(1000);
        let block_buf = BlockArea::new(alloc_buf(block_size).expect("Allocation failure"), block_size);
        {
            let mut db = DataBlock::new(write_target, 0, Pinned::<BlockArea>::new(block_buf.clone(), &stub_pin));
            ds.write_block(&mut db, FileState::InUse).expect("Failed to write block to disk");
        }
        dealloc_buf(block_buf.data_ptr(), block_size);

        // Each listing must contain exactly the one file of the matching type.
        let mut files = Vec::new();

        ds.get_checkpoint_files(&mut files);
        assert_eq!(vec![desc3], files);

        ds.get_versioning_files(&mut files);
        assert_eq!(vec![desc2], files);

        ds.get_data_files(&mut files);
        assert_eq!(vec![desc1], files);

        let ds2 = ds.clone().expect("Failed to clone ds");

        // Adding an extent bumps the extent count by one.
        ds.add_extent(5, FileState::InUse).expect("Failed to add extent");
        let grown = ds.get_file_desc(5).expect("No file description for file id found");
        assert_eq!(grown.extent_num, desc3.extent_num + 1);

        // Every block of the added extent must load.
        for block_num in 0..grown.extent_size {
            let block_id = BlockId {
                file_id: 5,
                extent_id: grown.extent_num - 1,
                block_id: block_num,
            };
            let _block: Ref<BlockArea> = ds.load_block(&block_id, FileState::InUse).expect("Failed to load block");
        }

        // A data file added through the clone must be visible through the original handle.
        let file_id = ds2.add_datafile(FileType::DataStoreFile, 16, 3, 3).expect("Failed to add a data file");
        let added = ds.get_file_desc(file_id).expect("No file description for file id found");
        assert_eq!(FileType::DataStoreFile, added.file_type);
        assert_eq!(16, added.extent_size);
        assert_eq!(3, added.extent_num);
        assert_eq!(3, added.max_extent_num);
    }
}
1030