1#![allow(clippy::field_reassign_with_default)]
2use std::{
3 fs,
4 path::{Path, PathBuf},
5 sync::atomic::Ordering,
6};
7
8use log::error;
9
10use crate::{
11 batch::{log_record_key_with_seq, parse_log_record_key, NON_TXN_SEQ_NO},
12 data::{
13 data_file::{
14 get_data_file_name, DataFile, DATA_FILE_NAME_SUFFIX, HINT_FILE_NAME,
15 MERGE_FINISHED_FILE_NAME, SEQ_NO_FILE_NAME,
16 },
17 log_record::{decode_log_record_pos, LogRecord, LogRecordType},
18 },
19 db::{Engine, FILE_LOCK_NAME},
20 errors::{Errors, Result},
21 option::{IOManagerType, Options},
22 util,
23};
24
/// Suffix appended (after a `-`) to the database directory name to form the
/// name of the temporary merge directory.
const MERGE_DIR_NAME: &str = "merge";
/// Key of the single record stored in the merge-finished marker file.
const MERGE_FIN_KEY: &[u8] = b"merge.finished";
27
28impl Engine {
29 pub fn merge(&self) -> Result<()> {
31 if self.is_engine_empty() {
33 return Ok(());
34 }
35
36 let lock = self.merging_lock.try_lock();
38 if lock.is_none() {
39 return Err(Errors::MergeInProgress);
40 }
41
42 let reclaim_size = self.reclaim_size.load(Ordering::SeqCst);
44 let total_size = util::file::dir_disk_size(&self.options.dir_path);
45 let ratio = reclaim_size as f32 / total_size as f32;
46 if ratio < self.options.file_merge_threshold {
47 return Err(Errors::MergeThresholdUnreached);
48 }
49
50 let available_space = util::file::available_disk_space();
51 if total_size - reclaim_size as u64 >= available_space {
52 return Err(Errors::MergeNoEnoughSpace);
53 }
54
55 let merge_path = get_merge_path(&self.options.dir_path);
56
57 if merge_path.is_dir() {
59 fs::remove_dir_all(merge_path.clone()).unwrap();
60 }
61
62 if let Err(e) = fs::create_dir(merge_path.clone()) {
64 error!("fail to create merge path {}", e);
65 return Err(Errors::FailedToCreateDatabaseDir);
66 }
67
68 let merge_files = self.rotate_merge_files()?;
70
71 let mut merge_db_opts = Options::default();
73 merge_db_opts.dir_path = merge_path.clone();
74 merge_db_opts.data_file_size = self.options.data_file_size;
75 let merge_db = Engine::open(merge_db_opts)?;
76
77 let hint_file = DataFile::new_hint_file(&merge_path)?;
79
80 for data_file in merge_files.iter() {
82 let mut offset = 0;
83 loop {
84 let (mut log_record, size) = match data_file.read_log_record(offset) {
85 Ok(result) => (result.record, result.size),
86 Err(e) => {
87 if e == Errors::ReadDataFileEOF {
88 break;
89 }
90 return Err(e);
91 }
92 };
93
94 let (real_key, _) = parse_log_record_key(log_record.key.clone());
96 if let Some(index_pos) = self.index.get(real_key.clone()) {
97 if index_pos.file_id == data_file.get_file_id() && index_pos.offset == offset {
99 log_record.key = log_record_key_with_seq(real_key.clone(), NON_TXN_SEQ_NO);
101 let log_record_pos = merge_db.append_log_record(&mut log_record)?;
102 hint_file.write_hint_record(real_key.clone(), log_record_pos)?;
104 }
105 }
106 offset += size as u64;
107 }
108 }
109
110 merge_db.sync()?;
112 hint_file.sync()?;
113
114 let non_merge_file_id = merge_files.last().unwrap().get_file_id() + 1;
116 let merge_fin_file = DataFile::new_merge_fin_file(&merge_path)?;
117 let merge_fin_record = LogRecord {
118 key: MERGE_FIN_KEY.to_vec(),
119 value: non_merge_file_id.to_string().into_bytes(),
120 rec_type: LogRecordType::Normal,
121 };
122 let enc_record = merge_fin_record.encode();
123 merge_fin_file.write(&enc_record)?;
124 merge_fin_file.sync()?;
125
126 Ok(())
127 }
128
129 fn is_engine_empty(&self) -> bool {
130 let active_file = self.active_data_file.read();
131 let old_files = self.old_data_files.read();
132 active_file.get_write_off() == 0 && old_files.len() == 0
133 }
134
135 fn rotate_merge_files(&self) -> Result<Vec<DataFile>> {
136 let mut merge_file_ids = Vec::new();
138 let mut old_files = self.old_data_files.write();
139 for fid in old_files.keys() {
140 merge_file_ids.push(*fid);
141 }
142
143 let mut active_file = self.active_data_file.write();
145
146 active_file.sync()?;
148 let active_file_id = active_file.get_file_id();
149 let new_active_file = DataFile::new(
150 &self.options.dir_path,
151 active_file_id + 1,
152 IOManagerType::StandardFileIO,
153 )?;
154 *active_file = new_active_file;
155
156 let old_file = DataFile::new(
158 &self.options.dir_path,
159 active_file_id,
160 IOManagerType::StandardFileIO,
161 )?;
162 old_files.insert(active_file_id, old_file);
163
164 merge_file_ids.push(active_file_id);
166
167 merge_file_ids.sort();
169
170 let mut merge_files = Vec::new();
172 for file_id in merge_file_ids {
173 let data_file = DataFile::new(
174 &self.options.dir_path,
175 file_id,
176 IOManagerType::StandardFileIO,
177 )?;
178 merge_files.push(data_file);
179 }
180
181 Ok(merge_files)
182 }
183
184 pub(crate) fn load_index_from_hint_file(&self) -> Result<()> {
186 let hint_file_name = self.options.dir_path.join(HINT_FILE_NAME);
187
188 if !hint_file_name.is_file() {
190 return Ok(());
191 }
192
193 let hint_file = DataFile::new_hint_file(&self.options.dir_path)?;
194 let mut offset = 0;
195 loop {
196 let (log_record, size) = match hint_file.read_log_record(offset) {
197 Ok(result) => (result.record, result.size),
198 Err(e) => {
199 if e == Errors::ReadDataFileEOF {
200 break;
201 }
202 return Err(e);
203 }
204 };
205
206 let log_record_pos = decode_log_record_pos(log_record.value);
208 self.index.put(log_record.key, log_record_pos);
209
210 offset += size as u64;
211 }
212
213 Ok(())
214 }
215}
216
217fn get_merge_path<P>(dir_path: P) -> PathBuf
218where
219 P: AsRef<Path>,
220{
221 let file_name = dir_path.as_ref().file_name().unwrap();
222 let merge_name = format!("{}-{}", file_name.to_str().unwrap(), MERGE_DIR_NAME);
223 let parent = dir_path.as_ref().parent().unwrap();
224 parent.to_path_buf().join(merge_name)
225}
226
227pub(crate) fn load_merge_files<P>(dir_path: P) -> Result<()>
229where
230 P: AsRef<Path>,
231{
232 let merge_path = get_merge_path(&dir_path);
233 if !merge_path.is_dir() {
235 return Ok(());
236 }
237
238 let dir = match fs::read_dir(&merge_path) {
239 Ok(dir) => dir,
240 Err(e) => {
241 error!("fail to read merge dir: {}", e);
242 return Err(Errors::FailedToReadDatabaseDir);
243 }
244 };
245
246 let mut merge_file_names = Vec::new();
248 let mut merge_finished = false;
249 for file in dir.flatten() {
250 let file_os_str = file.file_name();
251 let file_name = file_os_str.to_str().unwrap();
252
253 if file_name.ends_with(MERGE_FINISHED_FILE_NAME) {
254 merge_finished = true;
255 }
256
257 if file_name.ends_with(SEQ_NO_FILE_NAME) {
258 continue;
259 }
260
261 if file_name.ends_with(FILE_LOCK_NAME) {
262 continue;
263 }
264
265 let meta = file.metadata().unwrap();
267 if file_name.ends_with(DATA_FILE_NAME_SUFFIX) && meta.len() == 0 {
268 continue;
269 }
270
271 merge_file_names.push(file.file_name());
272 }
273
274 if !merge_finished {
276 fs::remove_dir_all(merge_path.clone()).unwrap();
277 return Ok(());
278 }
279
280 let merge_fin_file = DataFile::new_merge_fin_file(&merge_path)?;
282 let merge_fin_record = merge_fin_file.read_log_record(0)?;
283 let v = String::from_utf8(merge_fin_record.record.value).unwrap();
284 let non_merge_file_id = v.parse::<u32>().unwrap();
285
286 for fid in 0..non_merge_file_id {
288 let file = get_data_file_name(&dir_path, fid);
289 if file.is_file() {
290 fs::remove_file(file).unwrap();
291 }
292 }
293
294 for file_name in merge_file_names {
296 let src_path = merge_path.join(&file_name);
297 let dst_path = dir_path.as_ref().join(&file_name);
298 fs::rename(src_path, dst_path).unwrap();
299 }
300
301 fs::remove_dir_all(merge_path.clone()).unwrap();
303
304 Ok(())
305}
306
#[cfg(test)]
mod tests {
    use std::{sync::Arc, thread};

    use super::*;
    use crate::util::rand_kv::{get_test_key, get_test_value};
    use bytes::Bytes;

    /// Value written over existing keys so that the old versions become
    /// reclaimable.
    const OVERWRITTEN_VALUE: &str = "new value in merge";

    /// Options shared by every merge test: the given directory and 32 MiB
    /// data files.
    fn base_options(dir: &str) -> Options {
        let mut opts = Options::default();
        opts.dir_path = PathBuf::from(dir);
        opts.data_file_size = 32 * 1024 * 1024;
        opts
    }

    /// Like `base_options`, with the merge threshold set to zero.
    fn zero_threshold_options(dir: &str) -> Options {
        let mut opts = base_options(dir);
        opts.file_merge_threshold = 0.0;
        opts
    }

    /// Opens an engine with a copy of `opts`, failing the test on error.
    fn open_engine(opts: &Options) -> Engine {
        Engine::open(opts.clone()).expect("failed to open engine")
    }

    /// Deletes the test's database directory.
    fn remove_test_dir(opts: &Options) {
        std::fs::remove_dir_all(&opts.dir_path).expect("failed to remove path");
    }

    /// Merging an engine that holds no data succeeds.
    #[test]
    fn test_merge_1() {
        let opts = base_options("/tmp/bitkv-rs-merge-1");
        let engine = open_engine(&opts);

        assert!(engine.merge().is_ok());

        remove_test_dir(&opts);
    }

    /// Every key is still readable after a merge followed by a reopen.
    #[test]
    fn test_merge_2() {
        let opts = zero_threshold_options("/tmp/bitkv-rs-merge-2");
        let engine = open_engine(&opts);

        for i in 0..50000 {
            assert!(engine.put(get_test_key(i), get_test_value(i)).is_ok());
        }
        assert!(engine.merge().is_ok());

        // Close the engine before reopening the same directory.
        drop(engine);

        let reopened = open_engine(&opts);
        assert_eq!(reopened.list_keys().unwrap().len(), 50000);
        for i in 0..50000 {
            let value = reopened.get(get_test_key(i)).ok().unwrap();
            assert!(!value.is_empty());
        }

        remove_test_dir(&opts);
    }

    /// Overwritten keys keep their latest value and deleted keys are gone
    /// after a merge followed by a reopen.
    #[test]
    fn test_merge_3() {
        let opts = zero_threshold_options("/tmp/bitkv-rs-merge-3");
        let engine = open_engine(&opts);

        for i in 0..50000 {
            assert!(engine.put(get_test_key(i), get_test_value(i)).is_ok());
        }
        for i in 0..10000 {
            assert!(engine
                .put(get_test_key(i), Bytes::from(OVERWRITTEN_VALUE))
                .is_ok());
        }
        for i in 40000..50000 {
            assert!(engine.delete(get_test_key(i)).is_ok());
        }

        assert!(engine.merge().is_ok());

        // Close the engine before reopening the same directory.
        drop(engine);

        let reopened = open_engine(&opts);
        assert_eq!(reopened.list_keys().unwrap().len(), 40000);
        for i in 0..10000 {
            let value = reopened.get(get_test_key(i)).ok().unwrap();
            assert_eq!(Bytes::from(OVERWRITTEN_VALUE), value);
        }

        remove_test_dir(&opts);
    }

    /// When every key was deleted, nothing is left after a merge followed by
    /// a reopen.
    #[test]
    fn test_merge_4() {
        let opts = zero_threshold_options("/tmp/bitkv-rs-merge-4");
        let engine = open_engine(&opts);

        for i in 0..50000 {
            assert!(engine.put(get_test_key(i), get_test_value(i)).is_ok());
            assert!(engine.delete(get_test_key(i)).is_ok());
        }

        assert!(engine.merge().is_ok());

        // Close the engine before reopening the same directory.
        drop(engine);

        let reopened = open_engine(&opts);
        assert_eq!(reopened.list_keys().unwrap().len(), 0);
        for i in 0..50000 {
            let err = reopened.get(get_test_key(i)).err().unwrap();
            assert_eq!(Errors::KeyNotFound, err);
        }

        remove_test_dir(&opts);
    }

    /// Writes issued from another thread while a merge runs are all present
    /// after a reopen.
    #[test]
    fn test_merge_5() {
        let opts = zero_threshold_options("/tmp/bitkv-rs-merge-5");
        let engine = open_engine(&opts);

        for i in 0..50000 {
            assert!(engine.put(get_test_key(i), get_test_value(i)).is_ok());
        }
        for i in 0..10000 {
            assert!(engine
                .put(get_test_key(i), Bytes::from(OVERWRITTEN_VALUE))
                .is_ok());
        }
        for i in 40000..50000 {
            assert!(engine.delete(get_test_key(i)).is_ok());
        }

        let shared = Arc::new(engine);

        let writer_engine = shared.clone();
        let writer = thread::spawn(move || {
            for i in 60000..100000 {
                assert!(writer_engine
                    .put(get_test_key(i), get_test_value(i))
                    .is_ok());
            }
        });

        let merger_engine = shared.clone();
        let merger = thread::spawn(move || {
            assert!(merger_engine.merge().is_ok());
        });

        for handle in vec![writer, merger] {
            handle.join().unwrap();
        }

        // Drop the last handle to the engine before reopening the directory.
        drop(shared);

        let reopened = open_engine(&opts);
        // 50000 written - 10000 deleted + 40000 written concurrently.
        assert_eq!(reopened.list_keys().unwrap().len(), 80000);

        remove_test_dir(&opts);
    }
}