bitkv_rs/db.rs

#![allow(clippy::redundant_closure)]
use crate::{
  batch::{log_record_key_with_seq, parse_log_record_key, NON_TXN_SEQ_NO},
  data::{
    data_file::{DataFile, DATA_FILE_NAME_SUFFIX, MERGE_FINISHED_FILE_NAME, SEQ_NO_FILE_NAME},
    log_record::{LogRecord, LogRecordPos, LogRecordType, TransactionRecord},
  },
  errors::{Errors, Result},
  index,
  merge::load_merge_files,
  option::{IOManagerType, IndexType, Options},
  util,
};
use bytes::Bytes;
use fs2::FileExt;
use log::{error, warn};
use parking_lot::{Mutex, RwLock};
use std::{
  collections::HashMap,
  fs::{self, File},
  path::Path,
  sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
  },
};

const INITIAL_FILE_ID: u32 = 0;
const SEQ_NO_KEY: &str = "seq.no";
pub(crate) const FILE_LOCK_NAME: &str = "flock";

pub enum SeqNoExist {
  Yes(usize),
  None,
}

// Storage engine
pub struct Engine {
  pub(crate) options: Arc<Options>,
  pub(crate) active_data_file: Arc<RwLock<DataFile>>, // current active data file
  pub(crate) old_data_files: Arc<RwLock<HashMap<u32, DataFile>>>, // older data files
  pub(crate) index: Box<dyn index::Indexer>,          // in-memory data index
  file_ids: Vec<u32>, // file id list captured at startup; used only during setup and must not be modified elsewhere
  pub(crate) batch_commit_lock: Mutex<()>, // transaction commit lock, ensures serializable commits
  pub(crate) seq_no: Arc<AtomicUsize>, // transaction sequence number
  pub(crate) merging_lock: Mutex<()>, // prevents multiple threads from merging data files at the same time
  pub(crate) seq_file_exists: bool,   // whether the seq_no file exists
  pub(crate) is_initial: bool,        // whether the engine is being initialized for the first time
  lock_file: File, // file lock ensuring only one engine instance can open the database directory
  bytes_write: Arc<AtomicUsize>, // accumulated number of bytes written since the last sync
  pub(crate) reclaim_size: Arc<AtomicUsize>, // accumulated number of bytes reclaimable by a merge
}

// engine statistics info
#[derive(Debug, Clone)]
pub struct Stat {
  // number of keys
  pub key_num: usize,

  // number of data files
  pub data_file_num: usize,

  // number of bytes reclaimable by a merge
  pub reclaim_size: usize,

  // total directory size on disk
  pub disk_size: u64,
}

impl Engine {
  /// open bitkv storage engine instance
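  ///
  /// A minimal usage sketch, not taken from this crate's docs: it assumes
  /// `Options` implements `Default`, that `dir_path` is a public `PathBuf`
  /// field, and that `Engine`/`Options` are exported at the paths shown.
  ///
  /// ```no_run
  /// use bitkv_rs::{db::Engine, option::Options};
  ///
  /// let mut opts = Options::default();
  /// opts.dir_path = std::path::PathBuf::from("/tmp/bitkv-demo");
  /// let engine = Engine::open(opts).expect("failed to open engine");
  /// ```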
  pub fn open(opts: Options) -> Result<Self> {
    // check user options
    if let Some(e) = check_options(&opts) {
      return Err(e);
    };
    let mut is_initial = false;
    let options = Arc::new(opts);

    // if the directory does not exist, create it
    let dir_path = &options.dir_path;
    if !dir_path.is_dir() {
      is_initial = true;
      if let Err(e) = fs::create_dir(dir_path.as_path()) {
        warn!("failed to create database directory, error: {}", e);
        return Err(Errors::FailedToCreateDatabaseDir);
      };
    }

    // determine if the directory is empty; if so, this is a fresh database
    // (checked before the lock file is created below, which would otherwise
    // make the directory always appear non-empty)
    let entry = fs::read_dir(dir_path).unwrap();
    if entry.count() == 0 {
      is_initial = true;
    }

    // acquire the file lock, ensuring only one engine instance opens this directory
    let lock_file = fs::OpenOptions::new()
      .read(true)
      .create(true)
      .append(true)
      .open(dir_path.join(FILE_LOCK_NAME))
      .unwrap();
    if lock_file.try_lock_exclusive().is_err() {
      return Err(Errors::DatabaseIsUsing);
    }
    // load merge files
    load_merge_files(dir_path)?;

    // load data files
    let mut data_files = load_data_files(dir_path, options.mmap_at_startup)?;

    // record the file id list discovered at startup
    let mut file_ids = Vec::new();
    for v in data_files.iter() {
      file_ids.push(v.get_file_id());
    }
    // reverse data_files so the newest file ends up as the last one popped
    data_files.reverse();

    // move every file except the newest into older_files
    let mut older_files = HashMap::new();
    if data_files.len() > 1 {
      for _ in 0..=data_files.len() - 2 {
        let file = data_files.pop().unwrap();
        older_files.insert(file.get_file_id(), file);
      }
    }

    // the remaining file is the active data file; create one if none exists
    let active_file = match data_files.pop() {
      Some(v) => v,
      None => DataFile::new(dir_path, INITIAL_FILE_ID, IOManagerType::StandardFileIO)?,
    };

    // create a new engine instance
    let mut engine = Self {
      options: options.clone(),
      active_data_file: Arc::new(RwLock::new(active_file)),
      old_data_files: Arc::new(RwLock::new(older_files)),
      index: index::new_indexer(&options.index_type, &options.dir_path),
      file_ids,
      batch_commit_lock: Mutex::new(()),
      seq_no: Arc::new(AtomicUsize::new(1)),
      merging_lock: Mutex::new(()),
      seq_file_exists: false,
      is_initial,
      lock_file,
      bytes_write: Arc::new(AtomicUsize::new(0)),
      reclaim_size: Arc::new(AtomicUsize::new(0)),
    };

    // load the index: the B+Tree index lives on disk, other index types are
    // rebuilt from the hint file and the data files
    match engine.options.index_type {
      IndexType::BPlusTree => {
        // load the seq_no saved at the last close
        let (is_exists, seq_no) = engine.load_seq_no();
        if is_exists {
          engine.seq_no.store(seq_no, Ordering::SeqCst);
          engine.seq_file_exists = is_exists;
        }

        // update the offset of the active data file
        let active_file = engine.active_data_file.write();
        active_file.set_write_off(active_file.file_size());
      }
      _ => {
        // load index from hint file
        engine.load_index_from_hint_file()?;

        // load index from data files
        let curr_seq_no = engine.load_index_from_data_files()?;

        // update seq_no
        if curr_seq_no > 0 {
          engine.seq_no.store(curr_seq_no + 1, Ordering::Relaxed);
        }

        // reset the io_manager type now that startup loading is done
        if engine.options.mmap_at_startup {
          engine.reset_io_type();
        }
      }
    }

    Ok(engine)
  }

  /// close engine, release resources
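  ///
  /// `Drop` for `Engine` calls `close` automatically, so an explicit call is
  /// only needed when the caller wants to handle the error itself.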
  pub fn close(&self) -> Result<()> {
    // if dir_path doesn't exist, there is nothing to close
    if !self.options.dir_path.is_dir() {
      return Ok(());
    }
    // persist the current transaction seq_no
    let seq_no_file = DataFile::new_seq_no_file(&self.options.dir_path)?;
    let seq_no = self.seq_no.load(Ordering::SeqCst);
    let record = LogRecord {
      key: SEQ_NO_KEY.as_bytes().to_vec(),
      value: seq_no.to_string().into(),
      rec_type: LogRecordType::Normal,
    };
    seq_no_file.write(&record.encode())?;
    seq_no_file.sync()?;

    let read_guard = self.active_data_file.read();
    read_guard.sync()?;

    // release the file lock
    self.lock_file.unlock().unwrap();

    Ok(())
  }

  /// sync current active data file to disk
  pub fn sync(&self) -> Result<()> {
    let read_guard = self.active_data_file.read();
    read_guard.sync()
  }

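  /// returns engine statistics: key count, data file count, reclaimable
  /// bytes, and total disk usage of the directory.
  ///
  /// A minimal usage sketch under the same path/`Default` assumptions as the
  /// `open` example:
  ///
  /// ```no_run
  /// # use bitkv_rs::{db::Engine, option::Options};
  /// # let engine = Engine::open(Options::default()).unwrap();
  /// let stat = engine.get_engine_stat().unwrap();
  /// println!("keys: {}, disk size: {} bytes", stat.key_num, stat.disk_size);
  /// ```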
  pub fn get_engine_stat(&self) -> Result<Stat> {
    let keys = self.list_keys()?;
    let old_files = self.old_data_files.read();

    Ok(Stat {
      key_num: keys.len(),
      data_file_num: old_files.len() + 1,
      reclaim_size: self.reclaim_size.load(Ordering::SeqCst),
      disk_size: util::file::dir_disk_size(&self.options.dir_path),
    })
  }

  /// backup data directory
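  ///
  /// The copy excludes the file lock, so the backup directory can later be
  /// opened as a database of its own. A minimal sketch under the same
  /// assumptions as the `open` example:
  ///
  /// ```no_run
  /// # use bitkv_rs::{db::Engine, option::Options};
  /// # let engine = Engine::open(Options::default()).unwrap();
  /// engine.backup("/tmp/bitkv-backup").expect("backup failed");
  /// ```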
  pub fn backup<P>(&self, dir_path: P) -> Result<()>
  where
    P: AsRef<Path>,
  {
    let exclude = &[FILE_LOCK_NAME];
    if let Err(e) = util::file::copy_dir(
      &self.options.dir_path,
      &dir_path.as_ref().to_path_buf(),
      exclude,
    ) {
      log::error!("failed to copy data directory, error: {}", e);
      return Err(Errors::FailedToCopyDirectory);
    }
    Ok(())
  }

  /// store a key/value pair; the key must not be empty
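  ///
  /// A put/get round trip, sketched under the same assumptions as the
  /// `open` example:
  ///
  /// ```no_run
  /// # use bitkv_rs::{db::Engine, option::Options};
  /// use bytes::Bytes;
  ///
  /// # let engine = Engine::open(Options::default()).unwrap();
  /// engine.put(Bytes::from("name"), Bytes::from("bitkv")).unwrap();
  /// assert_eq!(engine.get(Bytes::from("name")).unwrap(), Bytes::from("bitkv"));
  /// ```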
  pub fn put(&self, key: Bytes, value: Bytes) -> Result<()> {
    // reject empty keys
    if key.is_empty() {
      return Err(Errors::KeyIsEmpty);
    }

    // construct LogRecord
    let mut record = LogRecord {
      key: log_record_key_with_seq(key.to_vec(), NON_TXN_SEQ_NO),
      value: value.to_vec(),
      rec_type: LogRecordType::Normal,
    };

    // append the record to the active data file
    let log_record_pos = self.append_log_record(&mut record)?;

    // update the index; an overwritten entry adds to the reclaimable size
    if let Some(old_pos) = self.index.put(key.to_vec(), log_record_pos) {
      self
        .reclaim_size
        .fetch_add(old_pos.size as usize, Ordering::SeqCst);
    }
    Ok(())
  }

  /// delete the data associated with the specified key
  pub fn delete(&self, key: Bytes) -> Result<()> {
    // reject empty keys
    if key.is_empty() {
      return Err(Errors::KeyIsEmpty);
    }

    // if the key does not exist in the index, there is nothing to delete
    let pos = self.index.get(key.to_vec());
    if pos.is_none() {
      return Ok(());
    }

    // construct a tombstone LogRecord
    let mut record = LogRecord {
      key: log_record_key_with_seq(key.to_vec(), NON_TXN_SEQ_NO),
      value: Default::default(),
      rec_type: LogRecordType::Deleted,
    };

    // append the tombstone to the active data file; the tombstone itself
    // becomes reclaimable once merged
    let pos = self.append_log_record(&mut record)?;
    self
      .reclaim_size
      .fetch_add(pos.size as usize, Ordering::SeqCst);

    // delete the key in the index
    if let Some(old_pos) = self.index.delete(key.to_vec()) {
      self
        .reclaim_size
        .fetch_add(old_pos.size as usize, Ordering::SeqCst);
    }
    Ok(())
  }

  /// Retrieves the data associated with the specified key.
  pub fn get(&self, key: Bytes) -> Result<Bytes> {
    // reject empty keys
    if key.is_empty() {
      return Err(Errors::KeyIsEmpty);
    }

    // retrieve the position for the specified key from the in-memory index
    let pos = self.index.get(key.to_vec());

    // if the key is not found, return an error
    if pos.is_none() {
      return Err(Errors::KeyNotFound);
    }

    // retrieve the LogRecord from the data file at that position
    self.get_value_by_position(&pos.unwrap())
  }

  /// Retrieves the data by position.
  pub(crate) fn get_value_by_position(&self, log_record_pos: &LogRecordPos) -> Result<Bytes> {
    // read the LogRecord from the active file or the matching older file
    let active_file = self.active_data_file.read();
    let older_files = self.old_data_files.read();
    let log_record = match active_file.get_file_id() == log_record_pos.file_id {
      true => active_file.read_log_record(log_record_pos.offset)?.record,
      false => {
        let data_file = older_files.get(&log_record_pos.file_id);
        if data_file.is_none() {
          // return an error if the corresponding data file is not found
          return Err(Errors::DataFileNotFound);
        }
        data_file
          .unwrap()
          .read_log_record(log_record_pos.offset)?
          .record
      }
    };

    // a tombstone means the key no longer exists
    if let LogRecordType::Deleted = log_record.rec_type {
      return Err(Errors::KeyNotFound);
    };

    // return the corresponding value
    Ok(log_record.value.into())
  }

  /// append a log record to the current active data file
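  ///
  /// If the encoded record would push the active file past
  /// `options.data_file_size`, the active file is synced, moved into
  /// `old_data_files`, and a new active file with the next file id is
  /// opened. The write is synced immediately when `options.sync_writes` is
  /// set, or once the unsynced byte count reaches `options.bytes_per_sync`.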
  pub(crate) fn append_log_record(&self, log_record: &mut LogRecord) -> Result<LogRecordPos> {
    let dir_path = &self.options.dir_path;

    // encode input data
    let enc_record = log_record.encode();
    let record_len = enc_record.len() as u64;

    // obtain the current active file
    let mut active_file = self.active_data_file.write();
    if active_file.get_write_off() + record_len > self.options.data_file_size {
      // persist the active file before rotating
      active_file.sync()?;

      let current_fid = active_file.get_file_id();

      // move the current file into the older-files map
      let mut old_files = self.old_data_files.write();
      let old_file = DataFile::new(dir_path, current_fid, IOManagerType::StandardFileIO)?;
      old_files.insert(current_fid, old_file);

      // open a new active data file
      let new_file = DataFile::new(dir_path, current_fid + 1, IOManagerType::StandardFileIO)?;
      *active_file = new_file;
    }

    // append the record to the active file
    let write_off = active_file.get_write_off();
    active_file.write(&enc_record)?;

    let previous = self
      .bytes_write
      .fetch_add(enc_record.len(), Ordering::SeqCst);

    // decide whether this write needs to be synced
    let mut need_sync = self.options.sync_writes;
    if !need_sync
      && self.options.bytes_per_sync > 0
      && previous + enc_record.len() >= self.options.bytes_per_sync
    {
      need_sync = true;
    }

    if need_sync {
      active_file.sync()?;

      // reset the unsynced byte counter
      self.bytes_write.store(0, Ordering::SeqCst);
    }

    // construct the position info for the written record
    Ok(LogRecordPos {
      file_id: active_file.get_file_id(),
      offset: write_off,
      size: enc_record.len() as u32,
    })
  }

  /// load memory index from data files
  /// traverse all data files, and process each log record
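  ///
  /// Records written inside a transaction are buffered per sequence number
  /// and applied to the index only when the matching `TxnFinished` record is
  /// seen, so uncommitted batches never reach the index.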
  fn load_index_from_data_files(&self) -> Result<usize> {
    let mut current_seq_no = NON_TXN_SEQ_NO;
    // if there are no data files, return directly
    if self.file_ids.is_empty() {
      return Ok(current_seq_no);
    }

    // get the first file id that has not been merged
    let mut has_merged = false;
    let mut non_merge_fid = 0;
    let merge_fin_file = self.options.dir_path.join(MERGE_FINISHED_FILE_NAME);
    if merge_fin_file.is_file() {
      let merge_file = DataFile::new_merge_fin_file(&self.options.dir_path)?;
      let merge_fin_record = merge_file.read_log_record(0)?;
      let v = String::from_utf8(merge_fin_record.record.value).unwrap();

      non_merge_fid = v.parse::<u32>().unwrap();
      has_merged = true;
    }

    // temporarily buffer records that belong to transactions
    let mut transaction_records = HashMap::new();

    let active_file = self.active_data_file.read();
    let old_files = self.old_data_files.read();

    // traverse each file_id, retrieve the data file and load its data
    for (i, file_id) in self.file_ids.iter().enumerate() {
      // files below non_merge_fid were already loaded via the hint file, skip them
      if has_merged && *file_id < non_merge_fid {
        continue;
      }

      let mut offset = 0;
      loop {
        // read records in a loop until EOF
        let log_record_res = match *file_id == active_file.get_file_id() {
          true => active_file.read_log_record(offset),
          _ => {
            let data_file = old_files.get(file_id).unwrap();
            data_file.read_log_record(offset)
          }
        };

        let (mut log_record, size) = match log_record_res {
          Ok(result) => (result.record, result.size),
          Err(e) => {
            if e == Errors::ReadDataFileEOF {
              break;
            }
            return Err(e);
          }
        };

        // construct the in-memory index entry
        let log_record_pos = LogRecordPos {
          file_id: *file_id,
          offset,
          size: size as u32,
        };

        // parse the key into the actual key and its seq_no
        let (real_key, seq_no) = parse_log_record_key(log_record.key.clone());
        // non-transactional log record, update the index as usual
        if seq_no == NON_TXN_SEQ_NO {
          self.update_index(real_key, log_record.rec_type, log_record_pos)?;
        } else {
          // the transaction has committed, apply its buffered records to the index
          if log_record.rec_type == LogRecordType::TxnFinished {
            let records: &Vec<TransactionRecord> = transaction_records.get(&seq_no).unwrap();
            for txn_record in records.iter() {
              self.update_index(
                txn_record.record.key.clone(),
                txn_record.record.rec_type,
                txn_record.pos,
              )?;
            }
            transaction_records.remove(&seq_no);
          } else {
            log_record.key = real_key;
            transaction_records
              .entry(seq_no)
              .or_insert_with(|| Vec::new())
              .push(TransactionRecord {
                record: log_record,
                pos: log_record_pos,
              });
          }
        }

        // track the largest seq_no seen so far
        if seq_no > current_seq_no {
          current_seq_no = seq_no;
        }

        // advance the offset to the next log record
        offset += size as u64;
      }

      // for the last (active) file, record its write offset
      if i == self.file_ids.len() - 1 {
        active_file.set_write_off(offset);
      }
    }
    Ok(current_seq_no)
  }

  /// load seq_no under the B+Tree index type
  fn load_seq_no(&self) -> (bool, usize) {
    let file_name = self.options.dir_path.join(SEQ_NO_FILE_NAME);
    if !file_name.is_file() {
      return (false, 0);
    }
    let seq_no_file = DataFile::new_seq_no_file(&self.options.dir_path).unwrap();
    let record = match seq_no_file.read_log_record(0) {
      Ok(res) => res.record,
      Err(e) => panic!("failed to read seq_no: {}", e),
    };
    let v = String::from_utf8(record.value).unwrap();
    let seq_no = v.parse::<usize>().unwrap();

    // remove the seq_no file to avoid appending duplicate entries
    fs::remove_file(file_name).unwrap();

    (true, seq_no)
  }

  /// Updates the in-memory index upon loading
  ///
  /// This function updates the in-memory index based on the type of log record (normal or deleted).
  /// For a normal record, it adds or updates the key's position in the index. If the key previously existed,
  /// it increments the reclaimed space counter by the old position's size.
  /// For a deleted record, it removes the key from the index and updates the reclaimed space counter accordingly.
  ///
  fn update_index(&self, key: Vec<u8>, rec_type: LogRecordType, pos: LogRecordPos) -> Result<()> {
    if rec_type == LogRecordType::Normal {
      if let Some(old_pos) = self.index.put(key.clone(), pos) {
        // increment the reclaimed space counter by the size of the old position
        self
          .reclaim_size
          .fetch_add(old_pos.size as usize, Ordering::SeqCst);
      }
    }

    if rec_type == LogRecordType::Deleted {
      // start with the current record's size, since the tombstone itself is reclaimable
      let mut size = pos.size;
      // attempt to remove the key from the index; if it existed, add the old position's size
      if let Some(old_pos) = self.index.delete(key) {
        size += old_pos.size;
      }
      // update the reclaimed space counter
      self.reclaim_size.fetch_add(size as usize, Ordering::SeqCst);
    }
    Ok(())
  }

  /// reset the io_manager type for all data files back to standard file I/O
  fn reset_io_type(&self) {
    let mut active_file = self.active_data_file.write();
    active_file.set_io_manager(&self.options.dir_path, IOManagerType::StandardFileIO);
    let mut old_files = self.old_data_files.write();
    for file in old_files.values_mut() {
      file.set_io_manager(&self.options.dir_path, IOManagerType::StandardFileIO);
    }
  }
}

impl Drop for Engine {
  fn drop(&mut self) {
    if let Err(e) = self.close() {
      error!("error while closing engine: {}", e);
    }
  }
}

// load data files from the database directory
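// Data files are named "<file_id>.data" (the numeric id followed by
// DATA_FILE_NAME_SUFFIX); the id is parsed from the portion before the
// first '.' below.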
fn load_data_files<P>(dir_path: P, use_mmap: bool) -> Result<Vec<DataFile>>
where
  P: AsRef<Path>,
{
  // read the database directory
  let dir = fs::read_dir(&dir_path);
  if dir.is_err() {
    return Err(Errors::FailedToReadDatabaseDir);
  }

  let mut file_ids: Vec<u32> = Vec::new();
  let mut data_files: Vec<DataFile> = Vec::new();

  for file in dir.unwrap().flatten() {
    // retrieve the file name
    let file_os_str = file.file_name();
    let file_name = file_os_str.to_str().unwrap();

    // only consider files ending with the data file suffix
    if file_name.ends_with(DATA_FILE_NAME_SUFFIX) {
      let split_names: Vec<&str> = file_name.split('.').collect();
      let file_id = match split_names[0].parse::<u32>() {
        Ok(fid) => fid,
        Err(_) => {
          return Err(Errors::DatabaseDirectoryCorrupted);
        }
      };

      file_ids.push(file_id);
    }
  }

  // if no data files were found, return the empty list
  if file_ids.is_empty() {
    return Ok(data_files);
  }

  // sort file_ids so files load from the smallest id to the largest
  file_ids.sort();

  // traverse file_ids, sequentially loading data files
  for file_id in file_ids.iter() {
    let io_type = if use_mmap {
      IOManagerType::MemoryMap
    } else {
      IOManagerType::StandardFileIO
    };
    let data_file = DataFile::new(&dir_path, *file_id, io_type)?;
    data_files.push(data_file);
  }
  Ok(data_files)
}

fn check_options(opts: &Options) -> Option<Errors> {
  let dir_path = opts.dir_path.to_str();
  if dir_path.is_none() || dir_path.unwrap().is_empty() {
    return Some(Errors::DirPathIsEmpty);
  }

  if opts.data_file_size == 0 {
    return Some(Errors::DataFileSizeTooSmall);
  }

  if opts.file_merge_threshold < 0f32 || opts.file_merge_threshold > 1f32 {
    return Some(Errors::InvalidMergeThreshold);
  }

  None
}