// bitkv_rs — merge.rs

1#![allow(clippy::field_reassign_with_default)]
2use std::{
3  fs,
4  path::{Path, PathBuf},
5  sync::atomic::Ordering,
6};
7
8use log::error;
9
10use crate::{
11  batch::{log_record_key_with_seq, parse_log_record_key, NON_TXN_SEQ_NO},
12  data::{
13    data_file::{
14      get_data_file_name, DataFile, DATA_FILE_NAME_SUFFIX, HINT_FILE_NAME,
15      MERGE_FINISHED_FILE_NAME, SEQ_NO_FILE_NAME,
16    },
17    log_record::{decode_log_record_pos, LogRecord, LogRecordType},
18  },
19  db::{Engine, FILE_LOCK_NAME},
20  errors::{Errors, Result},
21  option::{IOManagerType, Options},
22  util,
23};
24
25const MERGE_DIR_NAME: &str = "merge";
26const MERGE_FIN_KEY: &[u8] = "merge.finished".as_bytes();
27
28impl Engine {
29  /// merge data directories, produce valid data and create hint file
30  pub fn merge(&self) -> Result<()> {
31    // if engine is empty, just return
32    if self.is_engine_empty() {
33      return Ok(());
34    }
35
36    // if merge is running, just return
37    let lock = self.merging_lock.try_lock();
38    if lock.is_none() {
39      return Err(Errors::MergeInProgress);
40    }
41
42    // determine if the merge is necessary
43    let reclaim_size = self.reclaim_size.load(Ordering::SeqCst);
44    let total_size = util::file::dir_disk_size(&self.options.dir_path);
45    let ratio = reclaim_size as f32 / total_size as f32;
46    if ratio < self.options.file_merge_threshold {
47      return Err(Errors::MergeThresholdUnreached);
48    }
49
50    let available_space = util::file::available_disk_space();
51    if total_size - reclaim_size as u64 >= available_space {
52      return Err(Errors::MergeNoEnoughSpace);
53    }
54
55    let merge_path = get_merge_path(&self.options.dir_path);
56
57    // if dir exists, remove it
58    if merge_path.is_dir() {
59      fs::remove_dir_all(merge_path.clone()).unwrap();
60    }
61
62    // create merge dir
63    if let Err(e) = fs::create_dir(merge_path.clone()) {
64      error!("fail to create merge path {}", e);
65      return Err(Errors::FailedToCreateDatabaseDir);
66    }
67
68    // Retrieve all data files for merging
69    let merge_files = self.rotate_merge_files()?;
70
71    // open a new temporary database instance for merging
72    let mut merge_db_opts = Options::default();
73    merge_db_opts.dir_path = merge_path.clone();
74    merge_db_opts.data_file_size = self.options.data_file_size;
75    let merge_db = Engine::open(merge_db_opts)?;
76
77    // open hint file
78    let hint_file = DataFile::new_hint_file(&merge_path)?;
79
80    // iterate over all data files and rewrite valid files
81    for data_file in merge_files.iter() {
82      let mut offset = 0;
83      loop {
84        let (mut log_record, size) = match data_file.read_log_record(offset) {
85          Ok(result) => (result.record, result.size),
86          Err(e) => {
87            if e == Errors::ReadDataFileEOF {
88              break;
89            }
90            return Err(e);
91          }
92        };
93
94        // deserialize log record and get real key
95        let (real_key, _) = parse_log_record_key(log_record.key.clone());
96        if let Some(index_pos) = self.index.get(real_key.clone()) {
97          // if file id and offset are the same, which means the record is valid
98          if index_pos.file_id == data_file.get_file_id() && index_pos.offset == offset {
99            // remove transaction sequence number
100            log_record.key = log_record_key_with_seq(real_key.clone(), NON_TXN_SEQ_NO);
101            let log_record_pos = merge_db.append_log_record(&mut log_record)?;
102            // update hint file
103            hint_file.write_hint_record(real_key.clone(), log_record_pos)?;
104          }
105        }
106        offset += size as u64;
107      }
108    }
109
110    // sync all files
111    merge_db.sync()?;
112    hint_file.sync()?;
113
114    // get latest unmerged file id
115    let non_merge_file_id = merge_files.last().unwrap().get_file_id() + 1;
116    let merge_fin_file = DataFile::new_merge_fin_file(&merge_path)?;
117    let merge_fin_record = LogRecord {
118      key: MERGE_FIN_KEY.to_vec(),
119      value: non_merge_file_id.to_string().into_bytes(),
120      rec_type: LogRecordType::Normal,
121    };
122    let enc_record = merge_fin_record.encode();
123    merge_fin_file.write(&enc_record)?;
124    merge_fin_file.sync()?;
125
126    Ok(())
127  }
128
129  fn is_engine_empty(&self) -> bool {
130    let active_file = self.active_data_file.read();
131    let old_files = self.old_data_files.read();
132    active_file.get_write_off() == 0 && old_files.len() == 0
133  }
134
135  fn rotate_merge_files(&self) -> Result<Vec<DataFile>> {
136    // retrieve old data files id
137    let mut merge_file_ids = Vec::new();
138    let mut old_files = self.old_data_files.write();
139    for fid in old_files.keys() {
140      merge_file_ids.push(*fid);
141    }
142
143    // create a new active file for writing
144    let mut active_file = self.active_data_file.write();
145
146    // sync active file
147    active_file.sync()?;
148    let active_file_id = active_file.get_file_id();
149    let new_active_file = DataFile::new(
150      &self.options.dir_path,
151      active_file_id + 1,
152      IOManagerType::StandardFileIO,
153    )?;
154    *active_file = new_active_file;
155
156    // load current active data file to old data files
157    let old_file = DataFile::new(
158      &self.options.dir_path,
159      active_file_id,
160      IOManagerType::StandardFileIO,
161    )?;
162    old_files.insert(active_file_id, old_file);
163
164    // load id to merge file ids list
165    merge_file_ids.push(active_file_id);
166
167    // sort for an ascending merge order
168    merge_file_ids.sort();
169
170    // retrieve data files
171    let mut merge_files = Vec::new();
172    for file_id in merge_file_ids {
173      let data_file = DataFile::new(
174        &self.options.dir_path,
175        file_id,
176        IOManagerType::StandardFileIO,
177      )?;
178      merge_files.push(data_file);
179    }
180
181    Ok(merge_files)
182  }
183
184  /// load index from hint file
185  pub(crate) fn load_index_from_hint_file(&self) -> Result<()> {
186    let hint_file_name = self.options.dir_path.join(HINT_FILE_NAME);
187
188    // if hint file doesn't exist, just return
189    if !hint_file_name.is_file() {
190      return Ok(());
191    }
192
193    let hint_file = DataFile::new_hint_file(&self.options.dir_path)?;
194    let mut offset = 0;
195    loop {
196      let (log_record, size) = match hint_file.read_log_record(offset) {
197        Ok(result) => (result.record, result.size),
198        Err(e) => {
199          if e == Errors::ReadDataFileEOF {
200            break;
201          }
202          return Err(e);
203        }
204      };
205
206      // deserialize log record and get real key
207      let log_record_pos = decode_log_record_pos(log_record.value);
208      self.index.put(log_record.key, log_record_pos);
209
210      offset += size as u64;
211    }
212
213    Ok(())
214  }
215}
216
217fn get_merge_path<P>(dir_path: P) -> PathBuf
218where
219  P: AsRef<Path>,
220{
221  let file_name = dir_path.as_ref().file_name().unwrap();
222  let merge_name = format!("{}-{}", file_name.to_str().unwrap(), MERGE_DIR_NAME);
223  let parent = dir_path.as_ref().parent().unwrap();
224  parent.to_path_buf().join(merge_name)
225}
226
227// load merge files
228pub(crate) fn load_merge_files<P>(dir_path: P) -> Result<()>
229where
230  P: AsRef<Path>,
231{
232  let merge_path = get_merge_path(&dir_path);
233  // merge never happened, just return
234  if !merge_path.is_dir() {
235    return Ok(());
236  }
237
238  let dir = match fs::read_dir(&merge_path) {
239    Ok(dir) => dir,
240    Err(e) => {
241      error!("fail to read merge dir: {}", e);
242      return Err(Errors::FailedToReadDatabaseDir);
243    }
244  };
245
246  // check if merge finished file exists
247  let mut merge_file_names = Vec::new();
248  let mut merge_finished = false;
249  for file in dir.flatten() {
250    let file_os_str = file.file_name();
251    let file_name = file_os_str.to_str().unwrap();
252
253    if file_name.ends_with(MERGE_FINISHED_FILE_NAME) {
254      merge_finished = true;
255    }
256
257    if file_name.ends_with(SEQ_NO_FILE_NAME) {
258      continue;
259    }
260
261    if file_name.ends_with(FILE_LOCK_NAME) {
262      continue;
263    }
264
265    // data file volume is 0 and ends with .data, just skip
266    let meta = file.metadata().unwrap();
267    if file_name.ends_with(DATA_FILE_NAME_SUFFIX) && meta.len() == 0 {
268      continue;
269    }
270
271    merge_file_names.push(file.file_name());
272  }
273
274  // if merge doesn't finish, remove merge dir and return
275  if !merge_finished {
276    fs::remove_dir_all(merge_path.clone()).unwrap();
277    return Ok(());
278  }
279
280  // open merge finished files, get the latest unmerged file id
281  let merge_fin_file = DataFile::new_merge_fin_file(&merge_path)?;
282  let merge_fin_record = merge_fin_file.read_log_record(0)?;
283  let v = String::from_utf8(merge_fin_record.record.value).unwrap();
284  let non_merge_file_id = v.parse::<u32>().unwrap();
285
286  // remove old data files
287  for fid in 0..non_merge_file_id {
288    let file = get_data_file_name(&dir_path, fid);
289    if file.is_file() {
290      fs::remove_file(file).unwrap();
291    }
292  }
293
294  // move temporary merge files to database dir
295  for file_name in merge_file_names {
296    let src_path = merge_path.join(&file_name);
297    let dst_path = dir_path.as_ref().join(&file_name);
298    fs::rename(src_path, dst_path).unwrap();
299  }
300
301  // remove merge dir
302  fs::remove_dir_all(merge_path.clone()).unwrap();
303
304  Ok(())
305}
306
#[cfg(test)]
mod tests {
  use std::{sync::Arc, thread};

  use super::*;
  use crate::util::rand_kv::{get_test_key, get_test_value};
  use bytes::Bytes;

  #[test]
  fn test_merge_1() {
    // case for no data files: merging an empty engine is a no-op Ok(())
    let mut opts = Options::default();
    opts.dir_path = PathBuf::from("/tmp/bitkv-rs-merge-1");
    opts.data_file_size = 32 * 1024 * 1024;

    let engine = Engine::open(opts.clone()).expect("failed to open engine");

    let res1 = engine.merge();
    assert!(res1.is_ok());

    // delete tested files
    std::fs::remove_dir_all(&opts.dir_path).expect("failed to remove path");
  }

  #[test]
  fn test_merge_2() {
    // case for all valid data files: every key must survive the merge
    let mut opts = Options::default();
    opts.dir_path = PathBuf::from("/tmp/bitkv-rs-merge-2");
    opts.data_file_size = 32 * 1024 * 1024;
    opts.file_merge_threshold = 0.0;
    let engine = Engine::open(opts.clone()).expect("failed to open engine");

    for i in 0..50000 {
      let put_res = engine.put(get_test_key(i), get_test_value(i));
      assert!(put_res.is_ok());
    }
    let res1 = engine.merge();
    assert!(res1.is_ok());

    // restart engine
    std::mem::drop(engine);

    let engine2 = Engine::open(opts.clone()).expect("failed to open engine");
    let keys = engine2.list_keys().unwrap();
    assert_eq!(keys.len(), 50000);
    for i in 0..50000 {
      let get_res = engine2.get(get_test_key(i));
      assert!(!get_res.unwrap().is_empty());
    }

    // delete tested files
    std::fs::remove_dir_all(&opts.dir_path).expect("failed to remove path");
  }

  #[test]
  fn test_merge_3() {
    // partial valid data files, and delete some data cases
    let mut opts = Options::default();
    opts.dir_path = PathBuf::from("/tmp/bitkv-rs-merge-3");
    opts.data_file_size = 32 * 1024 * 1024;
    opts.file_merge_threshold = 0.0;
    let engine = Engine::open(opts.clone()).expect("failed to open engine");

    for i in 0..50000 {
      let put_res = engine.put(get_test_key(i), get_test_value(i));
      assert!(put_res.is_ok());
    }

    // overwrite the first 10k keys so their old records become stale
    for i in 0..10000 {
      let put_res = engine.put(get_test_key(i), Bytes::from("new value in merge"));
      assert!(put_res.is_ok());
    }

    // delete the last 10k keys entirely
    for i in 40000..50000 {
      let del_res = engine.delete(get_test_key(i));
      assert!(del_res.is_ok());
    }

    let res1 = engine.merge();
    assert!(res1.is_ok());

    // restart engine
    std::mem::drop(engine);

    let engine2 = Engine::open(opts.clone()).expect("failed to open engine");
    let keys = engine2.list_keys().unwrap();
    assert_eq!(keys.len(), 40000);

    for i in 0..10000 {
      let get_res = engine2.get(get_test_key(i));
      assert_eq!(Bytes::from("new value in merge"), get_res.ok().unwrap());
    }

    // delete tested files
    std::fs::remove_dir_all(&opts.dir_path).expect("failed to remove path");
  }

  #[test]
  fn test_merge_4() {
    // every key is deleted before the merge: the result must be empty
    let mut opts = Options::default();
    opts.dir_path = PathBuf::from("/tmp/bitkv-rs-merge-4");
    opts.data_file_size = 32 * 1024 * 1024;
    opts.file_merge_threshold = 0.0;
    let engine = Engine::open(opts.clone()).expect("failed to open engine");

    for i in 0..50000 {
      let put_res = engine.put(get_test_key(i), get_test_value(i));
      assert!(put_res.is_ok());
      let del_res = engine.delete(get_test_key(i));
      assert!(del_res.is_ok());
    }

    let res1 = engine.merge();
    assert!(res1.is_ok());

    // restart engine
    std::mem::drop(engine);

    let engine2 = Engine::open(opts.clone()).expect("failed to open engine");
    let keys = engine2.list_keys().unwrap();
    assert_eq!(keys.len(), 0);

    for i in 0..50000 {
      let get_res = engine2.get(get_test_key(i));
      assert_eq!(Errors::KeyNotFound, get_res.err().unwrap());
    }

    // delete tested files
    std::fs::remove_dir_all(&opts.dir_path).expect("failed to remove path");
  }

  #[test]
  fn test_merge_5() {
    // write and delete process occurs when merging
    let mut opts = Options::default();
    opts.dir_path = PathBuf::from("/tmp/bitkv-rs-merge-5");
    opts.data_file_size = 32 * 1024 * 1024;
    opts.file_merge_threshold = 0.0;
    let engine = Engine::open(opts.clone()).expect("failed to open engine");

    for i in 0..50000 {
      let put_res = engine.put(get_test_key(i), get_test_value(i));
      assert!(put_res.is_ok());
    }

    for i in 0..10000 {
      let put_res = engine.put(get_test_key(i), Bytes::from("new value in merge"));
      assert!(put_res.is_ok());
    }
    for i in 40000..50000 {
      let del_res = engine.delete(get_test_key(i));
      assert!(del_res.is_ok());
    }

    let eng = Arc::new(engine);

    // one thread keeps writing fresh keys while another runs the merge
    let mut handles = vec![];
    let eng1 = eng.clone();
    let handle1 = thread::spawn(move || {
      for i in 60000..100000 {
        let put_res = eng1.put(get_test_key(i), get_test_value(i));
        assert!(put_res.is_ok());
      }
    });
    handles.push(handle1);

    let eng2 = eng.clone();
    let handle2 = thread::spawn(move || {
      let merge_res = eng2.merge();
      assert!(merge_res.is_ok());
    });
    handles.push(handle2);

    for handle in handles {
      handle.join().unwrap();
    }

    // restart engine
    std::mem::drop(eng);

    let engine2 = Engine::open(opts.clone()).expect("failed to open engine");
    let keys = engine2.list_keys().unwrap();

    // 40000 surviving originals + 40000 concurrent writes
    assert_eq!(keys.len(), 80000);

    // delete tested files
    std::fs::remove_dir_all(&opts.dir_path).expect("failed to remove path");
  }
}