1use crate::common::errors::Error;
52use crate::common::defs::BlockId;
53use crate::common::crc32;
54use crate::common::misc::SliceToIntConverter;
55use crate::common::misc::alloc_buf;
56use crate::common::misc::dealloc_buf;
57use crate::block_mgr::block::DataBlock;
58use crate::block_mgr::block::FreeInfoSection;
59use crate::block_mgr::block::FreeInfoHeaderSection;
60use crate::block_mgr::block::ExtentHeaderBlock;
61use crate::block_mgr::block::FileHeaderBlock;
62use crate::block_mgr::block::FreeInfoBlock;
63use crate::block_mgr::block::BasicBlock;
64use crate::block_mgr::block::FIBLOCK_HEADER_LEN;
65use crate::block_mgr::block::EHBLOCK_HEADER_LEN;
66use crate::block_mgr::block::FHBLOCK_HEADER_LEN;
67use crate::buf_mgr::buf_mgr::Pinned;
68use crate::buf_mgr::buf_mgr::BlockArea;
69use crate::storage::fs_ops;
70use crate::system::config::ConfigMt;
71use std::io::{Read, Write, Seek, SeekFrom};
72use std::sync::RwLock;
73use std::sync::Arc;
74use std::sync::Mutex;
75use std::sync::RwLockReadGuard;
76use std::sync::atomic::AtomicU16;
77use std::sync::atomic::AtomicU64;
78use std::sync::atomic::Ordering;
79use std::collections::HashMap;
80use std::cell::Ref;
81use std::cell::RefCell;
82use fs2::FileExt;
83use log::warn;
84use std::ops::DerefMut;
85
86
87pub const DATA_FILE_MAGIC: u8 = 0x14;
88pub const VERSIONING_FILE_MAGIC: u8 = 0x85;
89pub const CHECKPOINT_FILE_MAGIC: u8 = 0x35;
90pub const LOCK_FILE: &str = "db.lock";
91
92pub const MAX_EXTENT_SIZE: usize = 65500;
93pub const MIN_BLOCK_SIZE: usize = 1024;
94pub const MAX_BLOCK_SIZE: usize = 65536;
95
96pub const DEFAULT_BLOCK_SIZE: usize = 4096;
97
98pub const MAX_FI_BLOCKS: usize = MAX_EXTENT_SIZE / 8 / MIN_BLOCK_SIZE + 1; pub const MIN_EXTENT_SIZE: usize = MAX_FI_BLOCKS + 2; const FILE_IN_USE_MARK: u8 = 1;
103
104
105pub struct DataStore {
107 desc_repo: FileDescRepo,
108 block_size: usize,
109 block_fill_size: usize,
110 files: RefCell<HashMap<u16, std::fs::File>>,
111 block_buf: RefCell<BlockArea>,
112 path: String,
113 lock_file: Arc<std::fs::File>,
114 seq_file_id: Arc<AtomicU16>,
115 path_buf: RefCell<String>,
116}
117
118
119impl DataStore {
120
121 pub fn new(conf: ConfigMt) -> Result<Self, Error> {
123 let block_fill_ratio = *conf.get_conf().get_block_fill_ratio();
124 let path = conf.get_conf().get_datastore_path().clone();
125
126 let mut ret = Self::open_datastore(path)?;
127 ret.block_fill_size = ret.block_size * block_fill_ratio as usize / 100;
128 Ok(ret)
129 }
130
131 pub fn initialize_datastore(path: &str, block_size: usize, desc_set: &[FileDesc]) -> Result<(), Error> {
133 if (block_size as usize) < MIN_BLOCK_SIZE || (block_size as usize) > MAX_BLOCK_SIZE
134 || !block_size.is_power_of_two() {
135 return Err(Error::incorrect_block_size());
136 }
137
138 let mut lock_file = std::path::PathBuf::new();
140
141 lock_file.push(path);
142 lock_file.push(LOCK_FILE);
143
144 let f = std::fs::OpenOptions::new()
145 .write(true)
146 .create_new(true)
147 .open(lock_file)?;
148
149 f.try_lock_exclusive()?;
150
151 let desc_repo = FileDescRepo::new(vec![], vec![]);
153 let block_fill_size = block_size * 80 / 100;
154 let files = HashMap::new();
155 let block_buf = BlockArea::new(alloc_buf(block_size)?, block_size);
156 let path = path.to_owned();
157 let lock_file = Arc::new(f);
158 let seq_file_id = Arc::new(AtomicU16::new(2));
159
160 let ds = DataStore {
161 desc_repo,
162 block_size,
163 block_fill_size,
164 files: RefCell::new(files),
165 block_buf: RefCell::new(block_buf),
166 path,
167 lock_file,
168 seq_file_id,
169 path_buf: RefCell::new(String::new()),
170 };
171
172 for desc in desc_set {
174 ds.add_datafile(desc.file_type, desc.extent_size, desc.extent_num, desc.max_extent_num)?;
175 }
176
177 Ok(())
178 }
179
180 pub fn load_block(&self, block_id: &BlockId, file_state: FileState) -> Result<Ref<BlockArea>, Error> {
182
183 self.seek_to_block(block_id, file_state)?;
184 let files = self.files.borrow();
185 let mut f = files.get(&block_id.file_id).unwrap();
186
187 let mut block_buf = self.block_buf.borrow_mut();
188 f.read_exact(&mut block_buf)?;
189 drop(files);
190 drop(block_buf);
191
192 let block_buf = self.block_buf.borrow();
193
194 let mut crc32 = crc32::crc32_begin();
196 crc32 = crc32::crc32_arr(crc32, &block_buf[4..]);
197 crc32 = crc32::crc32_finalize(crc32);
198
199 if u32::slice_to_int(&block_buf[0..4]).unwrap() != crc32 {
200 Err(Error::block_crc_mismatch())
201 } else {
202 Ok(block_buf)
203 }
204 }
205
206 pub fn get_file_desc(&self, file_id: u16) -> Option<FileDesc> {
208 self.desc_repo.get_by_file_id(file_id, FileState::InUse)
209 }
210
211 pub fn get_block_size(&self) -> usize {
213 self.block_size
214 }
215
216 pub fn block_fill_size(&self) -> usize {
218 self.block_fill_size
219 }
220
221 pub fn write_block<T: BasicBlock>(&self, block: &mut T, file_state: FileState) -> Result<(), Error> {
223 let block_id = block.get_id();
224
225 self.seek_to_block(&block_id, file_state)?;
226
227 block.prepare_for_write();
228
229 let files = self.files.borrow();
230 let mut f = files.get(&block_id.file_id).unwrap();
231 f.write_all(block.slice())?;
232 drop(files);
233
234 Ok(())
235 }
236
237 pub fn get_checkpoint_files(&self, files: &mut Vec<FileDesc>) {
239 self.desc_repo.fill_files_vec(files, FileType::CheckpointStoreFile, FileState::InUse);
240 }
241
242 pub fn get_versioning_files(&self, files: &mut Vec<FileDesc>) {
244 self.desc_repo.fill_files_vec(files, FileType::VersioningStoreFile, FileState::InUse);
245 }
246
247 pub fn get_data_files(&self, files: &mut Vec<FileDesc>) {
249 self.desc_repo.fill_files_vec(files, FileType::DataStoreFile, FileState::InUse);
250 }
251
252 pub fn clone(&self) -> Result<Self, Error> {
255 let desc_repo = self.desc_repo.clone();
256 let block_size = self.block_size;
257 let block_fill_size = self.block_fill_size;
258 let block_buf = BlockArea::new(alloc_buf(block_size)?, block_size);
259
260 let mut files = HashMap::new();
261 let mut path_buf = String::new();
262 for (file_id, _) in self.files.borrow().iter() {
263 let desc = self.desc_repo.get_by_file_id(*file_id, FileState::InUse).unwrap();
264
265 self.desc_repo.get_path(desc.file_id, &mut path_buf);
266 let f = std::fs::OpenOptions::new()
267 .read(true)
268 .write(true)
269 .create_new(false)
270 .open(&path_buf)?;
271
272 files.insert(*file_id, f);
273 }
274
275 let path = self.path.clone();
276 let lock_file = self.lock_file.clone();
277 let seq_file_id = self.seq_file_id.clone();
278
279 Ok(DataStore {
280 desc_repo,
281 block_size,
282 block_fill_size,
283 files: RefCell::new(files),
284 block_buf: RefCell::new(block_buf),
285 path,
286 lock_file,
287 seq_file_id,
288 path_buf: RefCell::new(String::new()),
289 })
290 }
291
292 pub fn add_extent(&self, file_id: u16, file_state: FileState) -> Result<(), Error> {
294
295 let desc = self.desc_repo.get_by_file_id(file_id, file_state).unwrap();
296
297 if desc.state != file_state {
298 return Err(Error::file_does_not_exist());
299 }
300
301 if desc.extent_num == desc.max_extent_num {
302 return Err(Error::extent_limit_reached());
303 }
304
305 let slock = self.desc_repo.lock_file(file_id).ok_or(Error::file_does_not_exist())?;
307 let mutex = slock.get(&file_id).unwrap();
308 let file_lock = mutex.lock().unwrap();
309
310 let extent_start_pos = ((desc.extent_num - 1) as u64 * desc.extent_size as u64
311 + (self.calc_file_fi_block_num(desc.max_extent_num as usize) as u64) + 1) * self.block_size as u64;
312
313 let files = self.files.borrow();
314 let mut f = files.get(&file_id).unwrap();
315 f.set_len(extent_start_pos + (desc.extent_size as u64 * self.block_size as u64))?;
316 f.seek(SeekFrom::Start(extent_start_pos))?;
317
318 let mut block_id = BlockId {
320 file_id,
321 extent_id: desc.extent_num,
322 block_id: 0,
323 };
324
325 let stub_pin = AtomicU64::new(1000);
326
327 self.zero_block_buf();
328
329 let mut ehb = ExtentHeaderBlock::new(block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), &stub_pin));
330 let fi_block_num = self.calc_extent_fi_block_num(desc.extent_size as usize);
331 ehb.set_fi_size(desc.extent_size);
332 ehb.set_full_cnt(fi_block_num as u16 + 1);
333 for i in 0..fi_block_num + 1 { ehb.fi_slice_mut()[i/8] |= 1 << (i%8);
335 }
336
337 self.write_block_direct(&mut ehb, &f)?;
338
339 if ehb.fi_slice().len() * 8 < desc.extent_size as usize {
340 self.add_fi_blocks(&mut block_id, fi_block_num, &stub_pin, &f)?;
341 }
342
343 self.zero_block_buf();
345
346 let mut db = DataBlock::new(block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), &stub_pin));
347 for _ in 0..desc.extent_size {
348 block_id.block_id += 1;
349 db.set_id(block_id);
350 self.write_block_direct(&mut db, &f)?;
351 }
352
353 f.sync_data()?;
354 drop(files);
355
356 self.desc_repo.add_extent(file_id);
357
358 drop(file_lock);
359 drop(slock);
360
361 Ok(())
362 }
363
364 pub fn add_datafile(&self, file_type: FileType, extent_size: u16, extent_num: u16, max_extent_num: u16) -> Result<u16, Error> {
367
368 if (extent_size as usize) < MIN_EXTENT_SIZE || (extent_size as usize) > MAX_EXTENT_SIZE {
369 return Err(Error::incorrect_extent_size());
370 }
371
372 if extent_num < 2 || max_extent_num < extent_num {
373 return Err(Error::incorrect_extent_settings());
374 }
375
376 let file_id = self.next_file_id()?;
377
378 let mut file_path = std::path::PathBuf::new();
379 file_path.push(&self.path);
380 file_path.push(format!("{}.dat", file_id));
381
382 if let Some(file_path) = file_path.to_str() {
383 let path = file_path.to_owned();
384
385 let mut desc = FileDesc {
386 state: FileState::Initializing,
387 file_id,
388 extent_size,
389 extent_num: 0,
390 max_extent_num,
391 file_type,
392 };
393
394 self.desc_repo.add_file(desc, path.clone());
395
396 desc.extent_num = extent_num;
397 self.create_file(&desc, &path)?;
398
399 self.desc_repo.add_extent(desc.file_id);
400
401 for _ in 0..extent_num - 1 {
402 self.add_extent(desc.file_id, FileState::Initializing)?;
403 }
404
405 self.set_file_in_use(file_id)?;
407 } else {
408 return Err(Error::failed_to_build_path());
409 }
410
411 Ok(file_id)
412 }
413
414 pub fn calc_extent_fi_block_num(&self, extent_size: usize) -> usize {
416 let byte_sz = (extent_size + 1) / 8;
419 let hdrfilen = self.block_size - EHBLOCK_HEADER_LEN;
420 if byte_sz > hdrfilen {
421 let divizor = self.block_size - FIBLOCK_HEADER_LEN;
422 (byte_sz - hdrfilen + divizor - 1) / divizor
423 } else {
424 0
425 }
426 }
427
428
429 fn calc_file_fi_block_num(&self, max_extent_num: usize) -> usize {
434 let byte_sz = (max_extent_num + 1) / 8;
435 let hdrfilen = self.block_size - FHBLOCK_HEADER_LEN;
436 if byte_sz > hdrfilen {
437 let divizor = self.block_size - FIBLOCK_HEADER_LEN;
438 (byte_sz - hdrfilen + divizor - 1) / divizor
439 } else {
440 0
441 }
442 }
443
444 fn add_fi_blocks(&self, mut block_id: &mut BlockId, fi_block_num: usize, stub_pin: &AtomicU64, f: &std::fs::File) -> Result<(), Error> {
446 self.zero_block_buf();
447
448 let mut fib = FreeInfoBlock::new(*block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), stub_pin));
449
450 for _ in 0..fi_block_num {
451 block_id.block_id += 1;
452 fib.set_id(*block_id);
453 self.write_block_direct(&mut fib, &f)?;
454 }
455
456 Ok(())
457 }
458
459
460 fn open_datastore(path: String) -> Result<DataStore, Error> {
462
463 let mut lock_file = std::path::PathBuf::new();
464
465 lock_file.push(&path);
466 lock_file.push(LOCK_FILE);
467
468 let f = std::fs::OpenOptions::new()
470 .write(true)
471 .create_new(false)
472 .open(lock_file)?;
473
474 f.try_lock_exclusive()?;
475
476 let (files, desc_set, block_size, paths) = Self::load_files(&path)?;
477 let block_buf = BlockArea::new(alloc_buf(block_size)?, block_size);
478
479 let desc_repo = FileDescRepo::new(desc_set, paths);
480 let block_fill_size = block_size * 80 / 100;
481
482 let mut seq_file_id = 2;
483 for (file_id, _) in files.iter() {
484 if *file_id > seq_file_id {
485 seq_file_id = *file_id;
486 }
487 }
488
489 let path = String::from(path);
490 let lock_file = Arc::new(f);
491 let seq_file_id = Arc::new(AtomicU16::new(seq_file_id));
492
493 Ok(DataStore {
494 desc_repo,
495 block_size,
496 block_fill_size,
497 files: RefCell::new(files),
498 block_buf: RefCell::new(block_buf),
499 path,
500 lock_file,
501 seq_file_id,
502 path_buf: RefCell::new(String::new()),
503 })
504 }
505
506
507 fn load_files(datastore_path: &str) -> Result<(HashMap<u16, std::fs::File>, Vec<FileDesc>, usize, Vec<String>), Error> {
509 let mut desc_set = Vec::new();
510 let mut paths = Vec::new();
511 let mut files = HashMap::new();
512 let mut block_size = DEFAULT_BLOCK_SIZE;
513
514 fs_ops::traverse_dir(std::path::Path::new(datastore_path), false, |entry| -> Result<(), Error> {
515 if entry.path().is_file() &&
516 entry.path().extension() == Some(std::ffi::OsStr::new("dat"))
517 {
518 if let Some(path) = entry.path().to_str() {
519 let (f, desc, block_sz) = Self::open_file(&path)?;
520 files.insert(desc.file_id, f);
521 paths.push(path.to_owned());
522 desc_set.push(desc);
523 block_size = block_sz;
524 }
525 }
526
527 Ok(())
528 })?;
529
530 Ok((files, desc_set, block_size, paths))
531 }
532
533
534 fn open_file(path: &str) -> Result<(std::fs::File, FileDesc, usize), Error> {
536
537 let mut f = std::fs::OpenOptions::new()
538 .read(true)
539 .write(true)
540 .create_new(false)
541 .open(path)?;
542
543 let mut header = [0u8; FHBLOCK_HEADER_LEN];
555
556 f.read_exact(&mut header)?;
557 let file_type = match header[18] {
558 DATA_FILE_MAGIC => FileType::DataStoreFile,
559 CHECKPOINT_FILE_MAGIC => FileType::CheckpointStoreFile,
560 VERSIONING_FILE_MAGIC => FileType::VersioningStoreFile,
561 _ => return Err(Error::magic_mismatch())
562 };
563
564 if header[19] != FILE_IN_USE_MARK {
565 return Err(Error::data_file_not_initialized());
566 }
567
568 let block_size = u16::slice_to_int(&header[20..22])?;
569 let extent_size = u16::slice_to_int(&header[22..24])?;
570 let max_extent_num = u16::slice_to_int(&header[24..26])?;
571 let file_id = u16::slice_to_int(&header[26..28])?;
572 let extent_num = if file_type == FileType::CheckpointStoreFile {
573
574 let byte_sz = (max_extent_num + 1) / 8;
576 let hdrfilen = block_size - FHBLOCK_HEADER_LEN as u16;
577 let fhe_size = if byte_sz > hdrfilen {
578 let divizor = block_size - FIBLOCK_HEADER_LEN as u16;
579 (byte_sz - hdrfilen + divizor - 1) / divizor
580 } else {
581 0
582 } + 1;
583 let fhe_size = fhe_size * block_size;
584 ((f.seek(SeekFrom::End(0))? - fhe_size as u64) / extent_size as u64 / block_size as u64) as u16 + 1
585 } else if file_type == FileType::VersioningStoreFile {
586 1
588 } else {
589 u16::slice_to_int(&header[30..32])?
590 };
591
592 Ok((
593 f,
594 FileDesc {
595 state: FileState::InUse,
596 file_id,
597 extent_size,
598 extent_num,
599 max_extent_num,
600 file_type,
601 },
602 block_size as usize
603 ))
604 }
605
606 fn zero_block_buf(&self) {
607 let ba: &mut BlockArea = &mut self.block_buf.borrow_mut();
608 for b in ba.deref_mut() { *b = 0; }
609 }
610
611 fn create_file(&self, desc: &FileDesc, path: &str) -> Result<(), Error> {
612 let f = std::fs::OpenOptions::new()
613 .read(true)
614 .write(true)
615 .create_new(true)
616 .open(path)?;
617
618 let mut block_id = BlockId {
619 file_id: desc.file_id,
620 extent_id: 0,
621 block_id: 0,
622 };
623
624 let stub_pin = AtomicU64::new(1000);
625
626 self.zero_block_buf();
627
628 self.load_fhb_from_desc(desc);
629 let mut fhb = FileHeaderBlock::new(block_id, 0, Pinned::<BlockArea>::new(self.block_buf.borrow().clone(), &stub_pin));
630 fhb.set_full_cnt(1); fhb.fi_slice_mut()[0] = 0x01;
632 fhb.set_block_size(self.get_block_size() as u16);
633 fhb.set_in_use(false);
634 self.write_block_direct(&mut fhb, &f)?;
635
636 if fhb.fi_slice().len() * 8 < desc.max_extent_num as usize {
637 let fi_block_num = self.calc_file_fi_block_num(desc.max_extent_num as usize);
638 self.add_fi_blocks(&mut block_id, fi_block_num, &stub_pin, &f)?;
639 }
640
641 let mut files = self.files.borrow_mut();
642 files.insert(desc.file_id, f);
643 drop(files);
644
645 Ok(())
646 }
647
648 fn set_file_in_use(&self, file_id: u16) -> Result<(), Error> {
649 let block_id = BlockId {
650 file_id,
651 extent_id: 0,
652 block_id: 0,
653 };
654 let stub_pin = AtomicU64::new(1000);
655 let ba = self.load_block(&block_id, FileState::Initializing)?;
656 let mut fhb = FileHeaderBlock::new(block_id, 0, Pinned::<BlockArea>::new(ba.clone(), &stub_pin));
657 drop(ba);
658
659 fhb.set_in_use(true);
660 self.write_block(&mut fhb, FileState::Initializing)?;
661
662 self.desc_repo.set_state(file_id, FileState::InUse);
663
664 Ok(())
665 }
666
667 fn load_fhb_from_desc(&self, desc: &FileDesc) {
669 let data = &mut self.block_buf.borrow_mut();
670 data[18] = desc.file_type as u8;
671 data[22..24].copy_from_slice(&desc.extent_size.to_ne_bytes());
672 data[24..26].copy_from_slice(&desc.max_extent_num.to_ne_bytes());
673 data[26..28].copy_from_slice(&desc.file_id.to_ne_bytes());
674 data[30..32].copy_from_slice(&desc.extent_num.to_ne_bytes());
675 }
676
677 fn next_file_id(&self) -> Result<u16, Error> {
679 let mut current = self.seq_file_id.load(Ordering::Relaxed);
680 loop {
681 if current == std::u16::MAX {
682 return Err(Error::file_id_overflow());
683 }
684
685 if let Err(ret) = self.seq_file_id.compare_exchange(current, current+1, Ordering::Relaxed, Ordering::Relaxed) {
686 current = ret;
687 } else {
688 return Ok(current + 1);
689 }
690 }
691 }
692
693
694 fn seek_to_block(&self, block_id: &BlockId, file_state: FileState) -> Result<(), Error> {
696 if let Some(desc) = self.desc_repo.get_by_file_id(block_id.file_id, file_state) {
697 let fhe_size = self.calc_file_fi_block_num(desc.max_extent_num as usize) + 1; if desc.extent_num > block_id.extent_id {
699 if (desc.extent_size > block_id.block_id && block_id.extent_id > 0 ) || (block_id.extent_id == 0 && block_id.block_id < fhe_size as u16) {
700 let mut files = self.files.borrow_mut();
701 let mut f = if files.contains_key(&block_id.file_id) {
702 files.get(&block_id.file_id).unwrap()
703 } else {
704 let mut path_buf = self.path_buf.borrow_mut();
705 self.desc_repo.get_path(desc.file_id, &mut path_buf);
706 let file = std::fs::OpenOptions::new()
707 .read(true)
708 .write(true)
709 .create_new(false)
710 .open(&path_buf as &str)?;
711
712 files.insert(block_id.file_id, file);
713 files.get(&block_id.file_id).unwrap()
714 };
715
716 let eff_extent_pos = if block_id.extent_id == 0 {
717 0
718 } else {
719 (block_id.extent_id - 1) as u64 * desc.extent_size as u64 + fhe_size as u64
720 };
721
722 let file_pos = (eff_extent_pos + block_id.block_id as u64) * self.block_size as u64;
723 f.seek(SeekFrom::Start(file_pos))?;
724
725 Ok(())
726 } else {
727 Err(Error::block_does_not_exist())
728 }
729 } else {
730 Err(Error::extent_does_not_exist())
731 }
732 } else {
733 Err(Error::file_does_not_exist())
734 }
735 }
736
737 pub fn write_block_direct<T: BasicBlock>(&self, block: &mut T, mut f: &std::fs::File) -> Result<(), Error> {
739 block.prepare_for_write();
740 f.write_all(block.slice())?;
741 Ok(())
742 }
743}
744
745
746impl Drop for DataStore {
747
748 fn drop(&mut self) {
749
750 for (_, f) in self.files.borrow_mut().drain() {
751 if let Err(e) = f.sync_all() {
752 warn!("Error while closing datastore file: {}", e);
753 }
754 }
755
756 if Arc::strong_count(&self.lock_file) == 1 {
757 if let Err(e) = self.lock_file.unlock() {
758 warn!("Error while closing datastore lock file: {}", e);
759 }
760 }
761
762 let block_buf = self.block_buf.borrow();
763 let v = dealloc_buf(block_buf.data_ptr(), block_buf.size());
764 drop(v);
765 }
766}
767
768
769#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
771pub enum FileState {
772 Initializing = 0,
773 InUse = 1,
774}
775
776#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug)]
778pub enum FileType {
779 DataStoreFile = DATA_FILE_MAGIC as isize,
780 CheckpointStoreFile = CHECKPOINT_FILE_MAGIC as isize,
781 VersioningStoreFile = VERSIONING_FILE_MAGIC as isize,
782}
783
784
785
786#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
788pub struct FileDesc {
789 pub state: FileState,
790 pub file_id: u16,
791 pub extent_size: u16,
792 pub extent_num: u16,
793 pub max_extent_num: u16,
794 pub file_type: FileType,
795}
796
797
798#[derive(Clone)]
800struct FileDescRepo {
801 fd: Arc<RwLock<HashMap<u16, (FileDesc, String)>>>,
802 locks: Arc<RwLock<HashMap<u16, Mutex<()>>>>,
803}
804
805impl FileDescRepo {
806
807 fn new(mut desc_set: Vec<FileDesc>, mut paths: Vec<String>) -> Self {
808 let mut hm = HashMap::new();
809 let mut hm2 = HashMap::new();
810 for desc in desc_set.drain(..) {
811 hm.insert(desc.file_id, (desc, paths.remove(0)));
812 hm2.insert(desc.file_id, Mutex::new(()));
813 }
814 let fd = Arc::new(RwLock::new(hm));
815 let locks = Arc::new(RwLock::new(hm2));
816
817 FileDescRepo {
818 fd,
819 locks,
820 }
821 }
822
823 fn add_file(&self, desc: FileDesc, path: String) {
824 let mut xlock = self.fd.write().unwrap();
825 xlock.insert(desc.file_id, (desc, path));
826 let mut xlock = self.locks.write().unwrap();
827 xlock.insert(desc.file_id, Mutex::new(()));
828 }
829
830 fn fill_files_vec(&self, files: &mut Vec<FileDesc>, file_type: FileType, state: FileState) {
831 files.truncate(0);
832 let slock = self.fd.read().unwrap();
833 for (_, desc) in slock.iter() {
834 if desc.0.file_type == file_type && desc.0.state == state {
835 files.push(desc.0);
836 }
837 }
838 }
839
840 fn get_by_file_id(&self, file_id: u16, state: FileState) -> Option<FileDesc> {
841 let slock = self.fd.read().unwrap();
842 if let Some(desc) = slock.get(&file_id) {
843 if desc.0.state == state {
844 Some(desc.0)
845 } else {
846 None
847 }
848 } else {
849 None
850 }
851 }
852
853 fn get_path(&self, file_id: u16, dst: &mut String) {
854 let slock = self.fd.read().unwrap();
855 if let Some(desc) = slock.get(&file_id) {
856 dst.truncate(0);
857 dst.push_str(&desc.1);
858 }
859 }
860
861 fn add_extent(&self, file_id: u16) {
862 let mut xlock = self.fd.write().unwrap();
863 let mut desc = xlock.get_mut(&file_id).unwrap();
864 desc.0.extent_num += 1;
865 }
866
867 fn set_state(&self, file_id: u16, state: FileState) {
868 let mut xlock = self.fd.write().unwrap();
869 let mut desc = xlock.get_mut(&file_id).unwrap();
870 desc.0.state = state;
871 }
872
873
874 fn lock_file(&self, file_id: u16) -> Option<RwLockReadGuard<HashMap<u16, Mutex<()>>>> {
876 let slock = self.locks.read().unwrap();
877 if slock.contains_key(&file_id) {
878 Some(slock)
879 } else {
880 None
881 }
882 }
883}
884
885
886#[cfg(test)]
887mod tests {
888
889 use super::*;
890 use std::path::Path;
891
892 #[test]
893 fn test_datastore() {
894
895 let dspath = "/tmp/test456343567578".to_owned();
896 let block_size = 8192;
897
898 if Path::new(&dspath).exists() {
899 std::fs::remove_dir_all(&dspath).expect("Failed to delete test dir on cleanup");
900 }
901 std::fs::create_dir(&dspath).expect("Failed to create test dir");
902
903
904 let conf = ConfigMt::new();
905 let mut c = conf.get_conf();
906 c.set_datastore_path(dspath.clone());
907 drop(c);
908
909 let mut fdset = vec![];
910 let desc1 = FileDesc {
911 state: FileState::InUse,
912 file_id: 3,
913 extent_size: 10,
914 extent_num: 2,
915 max_extent_num: 2,
916 file_type: FileType::DataStoreFile,
917 };
918 let mut desc2 = FileDesc {
919 state: FileState::InUse,
920 file_id: 4,
921 extent_size: 10,
922 extent_num: 2,
923 max_extent_num: 65500,
924 file_type: FileType::VersioningStoreFile,
925 };
926 let desc3 = FileDesc {
927 state: FileState::InUse,
928 file_id: 5,
929 extent_size: 10,
930 extent_num: 3,
931 max_extent_num: 65500,
932 file_type: FileType::CheckpointStoreFile,
933 };
934
935 fdset.push(desc1);
936 fdset.push(desc2);
937 fdset.push(desc3);
938
939 DataStore::initialize_datastore(&dspath, block_size, &fdset).expect("Failed to init datastore");
940 desc2.extent_num = 1;
941 fdset[1].extent_num = 1; let ds = DataStore::new(conf.clone()).expect("Failed to open datastore");
944
945 for desc in fdset {
946 for extent_id in 0..desc.extent_num {
947 let mut block_id = BlockId {
948 file_id: desc.file_id,
949 extent_id,
950 block_id: 0,
951 };
952
953 let bn = if extent_id == 0 {
954 ds.calc_file_fi_block_num(desc.max_extent_num as usize)
955 } else {
956 desc.extent_num as usize
957 };
958
959 for bid in 0..bn {
960 block_id.block_id = bid as u16;
961 let block: Ref<BlockArea> = ds.load_block(&block_id, FileState::InUse).expect("Failed to load block");
962 drop(block);
963 }
964 }
965 }
966
967 let desc = ds.get_file_desc(3).expect("No file description for file id found");
968 assert_eq!(desc, desc1);
969 let desc = ds.get_file_desc(4).expect("No file description for file id found");
970 assert_eq!(desc, desc2);
971 let desc = ds.get_file_desc(5).expect("No file description for file id found");
972 assert_eq!(desc, desc3);
973
974 let bsz = ds.get_block_size();
975 assert_eq!(block_size, bsz);
976
977 let bfsz = ds.block_fill_size();
978 assert_eq!(*conf.get_conf().get_block_fill_ratio() as usize * block_size / 100, bfsz);
979
980 let block_id = BlockId {
981 file_id: 3,
982 extent_id: 1,
983 block_id: 5,
984 };
985 let stub_pin = AtomicU64::new(1000);
986 let block_buf = BlockArea::new(alloc_buf(block_size).expect("Allocation failure"), block_size);
987 let mut db = DataBlock::new(block_id, 0, Pinned::<BlockArea>::new(block_buf.clone(), &stub_pin));
988 ds.write_block(&mut db, FileState::InUse).expect("Failed to write block to disk");
989 drop(db);
990 dealloc_buf(block_buf.data_ptr(), block_size);
991
992 let mut files = Vec::new();
993 ds.get_checkpoint_files(&mut files);
994 assert_eq!(1, files.len());
995 assert_eq!(desc3, files[0]);
996
997 ds.get_versioning_files(&mut files);
998 assert_eq!(1, files.len());
999 assert_eq!(desc2, files[0]);
1000
1001 ds.get_data_files(&mut files);
1002 assert_eq!(1, files.len());
1003 assert_eq!(desc1, files[0]);
1004
1005 let ds2 = ds.clone().expect("Failed to clone ds");
1006
1007 ds.add_extent(5, FileState::InUse).expect("Failed to add extent");
1008 let desc = ds.get_file_desc(5).expect("No file description for file id found");
1009 assert_eq!(desc.extent_num, desc3.extent_num + 1);
1010
1011 let mut block_id = BlockId {
1012 file_id: 5,
1013 extent_id: desc.extent_num-1,
1014 block_id: 0,
1015 };
1016 for i in 0..desc.extent_size {
1017 block_id.block_id = i;
1018 let block: Ref<BlockArea> = ds.load_block(&block_id, FileState::InUse).expect("Failed to load block");
1019 drop(block);
1020 }
1021
1022 let file_id = ds2.add_datafile(FileType::DataStoreFile, 16, 3, 3).expect("Failed to add a data file");
1023 let desc = ds.get_file_desc(file_id).expect("No file description for file id found");
1024 assert_eq!(FileType::DataStoreFile, desc.file_type);
1025 assert_eq!(16, desc.extent_size);
1026 assert_eq!(3, desc.extent_num);
1027 assert_eq!(3, desc.max_extent_num);
1028 }
1029}
1030