#![allow(clippy::redundant_closure)]
use crate::{
    batch::{log_record_key_with_seq, parse_log_record_key, NON_TXN_SEQ_NO},
    data::{
        data_file::{DataFile, DATA_FILE_NAME_SUFFIX, MERGE_FINISHED_FILE_NAME, SEQ_NO_FILE_NAME},
        log_record::{LogRecord, LogRecordPos, LogRecordType, TransactionRecord},
    },
    errors::{Errors, Result},
    index,
    merge::load_merge_files,
    option::{IOManagerType, IndexType, Options},
    util,
};
use bytes::Bytes;
use fs2::FileExt;
use log::{error, warn};
use parking_lot::{Mutex, RwLock};
use std::{
    collections::HashMap,
    fs::{self, File},
    path::Path,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

const INITIAL_FILE_ID: u32 = 0;
const SEQ_NO_KEY: &str = "seq.no";
pub(crate) const FILE_LOCK_NAME: &str = "flock";

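/// Whether a persisted transaction sequence number was found, and its value
/// if so.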
pub enum SeqNoExist {
    Yes(usize),
    None,
}

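/// Bitcask storage engine: owns the data files, the in-memory index, and all
/// bookkeeping state.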
pub struct Engine {
    /// Engine configuration.
    pub(crate) options: Arc<Options>,
    /// The current active data file; all appends go here.
    pub(crate) active_data_file: Arc<RwLock<DataFile>>,
    /// Older, read-only data files, keyed by file id.
    pub(crate) old_data_files: Arc<RwLock<HashMap<u32, DataFile>>>,
    /// In-memory index mapping keys to log record positions.
    pub(crate) index: Box<dyn index::Indexer>,
    /// Ids of the data files found at startup; only used while loading the index.
    file_ids: Vec<u32>,
    /// Serializes write-batch commits.
    pub(crate) batch_commit_lock: Mutex<()>,
    /// Globally increasing transaction sequence number.
    pub(crate) seq_no: Arc<AtomicUsize>,
    /// Ensures at most one merge runs at a time.
    pub(crate) merging_lock: Mutex<()>,
    /// Whether a seq-no file was found at startup (B+-tree index only).
    pub(crate) seq_file_exists: bool,
    /// Whether this engine initialized the data directory for the first time.
    pub(crate) is_initial: bool,
    /// File lock guaranteeing exclusive access to the data directory.
    lock_file: File,
    /// Bytes written since the last sync.
    bytes_write: Arc<AtomicUsize>,
    /// Stale bytes eligible to be reclaimed by a merge.
    pub(crate) reclaim_size: Arc<AtomicUsize>,
}
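/// Point-in-time statistics about the engine.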
#[derive(Debug, Clone)]
pub struct Stat {
    /// Number of keys in the engine.
    pub key_num: usize,
    /// Number of data files on disk, including the active file.
    pub data_file_num: usize,
    /// Stale data, in bytes, that a merge could reclaim.
    pub reclaim_size: usize,
    /// Total disk space taken by the data directory, in bytes.
    pub disk_size: u64,
}

impl Engine {
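    /// Opens a bitcask storage engine with the given options.
    ///
    /// Creates the data directory on first use, takes an exclusive file
    /// lock so that only one process can use the directory, replays any
    /// pending merge artifacts, and rebuilds the in-memory index.
    ///
    /// Illustrative sketch (assumes `Options` implements `Default` and that
    /// the crate root re-exports `Engine` and `Options`):
    ///
    /// ```ignore
    /// let mut opts = Options::default();
    /// opts.dir_path = std::path::PathBuf::from("/tmp/bitcask-example");
    /// let engine = Engine::open(opts)?;
    /// ```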
    pub fn open(opts: Options) -> Result<Self> {
        // Validate the user-supplied options up front.
        if let Some(e) = check_options(&opts) {
            return Err(e);
        }
        let mut is_initial = false;
        let options = Arc::new(opts);

        // Create the data directory if it does not exist yet.
        let dir_path = &options.dir_path;
        if !dir_path.is_dir() {
            is_initial = true;
            if let Err(e) = fs::create_dir(dir_path.as_path()) {
                warn!("failed to create database directory: {}", e);
                return Err(Errors::FailedToCreateDatabaseDir);
            }
        }

        // Acquire the file lock; only one process may open the directory.
        let lock_file = fs::OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(dir_path.join(FILE_LOCK_NAME))
            .unwrap();
        if lock_file.try_lock_exclusive().is_err() {
            return Err(Errors::DatabaseIsUsing);
        }

        let entries = fs::read_dir(dir_path).unwrap();
        if entries.count() == 0 {
            is_initial = true;
        }
        // Apply any pending merge artifacts before loading data files.
        load_merge_files(dir_path)?;

        let mut data_files = load_data_files(dir_path, options.mmap_at_startup)?;

        // Remember the file ids (ascending) for index loading later.
        let mut file_ids = Vec::new();
        for v in data_files.iter() {
            file_ids.push(v.get_file_id());
        }
        data_files.reverse();

        // Every file except the newest one becomes an old (read-only) file.
        let mut older_files = HashMap::new();
        if data_files.len() > 1 {
            for _ in 0..=data_files.len() - 2 {
                let file = data_files.pop().unwrap();
                older_files.insert(file.get_file_id(), file);
            }
        }

        // The newest file (or a fresh one) becomes the active file.
        let active_file = match data_files.pop() {
            Some(v) => v,
            None => DataFile::new(dir_path, INITIAL_FILE_ID, IOManagerType::StandardFileIO)?,
        };

        let mut engine = Self {
            options: options.clone(),
            active_data_file: Arc::new(RwLock::new(active_file)),
            old_data_files: Arc::new(RwLock::new(older_files)),
            index: index::new_indexer(&options.index_type, &options.dir_path),
            file_ids,
            batch_commit_lock: Mutex::new(()),
            seq_no: Arc::new(AtomicUsize::new(1)),
            merging_lock: Mutex::new(()),
            seq_file_exists: false,
            is_initial,
            lock_file,
            bytes_write: Arc::new(AtomicUsize::new(0)),
            reclaim_size: Arc::new(AtomicUsize::new(0)),
        };

        match engine.options.index_type {
            // The B+-tree index lives on disk, so only the sequence number
            // and the active file's write offset need to be restored.
            IndexType::BPlusTree => {
                let (is_exists, seq_no) = engine.load_seq_no();
                if is_exists {
                    engine.seq_no.store(seq_no, Ordering::SeqCst);
                    engine.seq_file_exists = is_exists;
                }

                let active_file = engine.active_data_file.write();
                active_file.set_write_off(active_file.file_size());
            }
            // In-memory indexes are rebuilt from the hint file and data files.
            _ => {
                engine.load_index_from_hint_file()?;

                let curr_seq_no = engine.load_index_from_data_files()?;

                if curr_seq_no > 0 {
                    engine.seq_no.store(curr_seq_no + 1, Ordering::Relaxed);
                }

                // Memory-mapped IO is only used to speed up startup; switch
                // back to standard file IO for regular operation.
                if engine.options.mmap_at_startup {
                    engine.reset_io_type();
                }
            }
        }

        Ok(engine)
    }

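    /// Closes the engine: persists the current transaction sequence number,
    /// syncs the active data file, and releases the directory file lock.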
    pub fn close(&self) -> Result<()> {
        // Nothing to do if the directory is already gone.
        if !self.options.dir_path.is_dir() {
            return Ok(());
        }
        // Persist the current transaction sequence number.
        let seq_no_file = DataFile::new_seq_no_file(&self.options.dir_path)?;
        let seq_no = self.seq_no.load(Ordering::SeqCst);
        let record = LogRecord {
            key: SEQ_NO_KEY.as_bytes().to_vec(),
            value: seq_no.to_string().into(),
            rec_type: LogRecordType::Normal,
        };
        seq_no_file.write(&record.encode())?;
        seq_no_file.sync()?;

        let read_guard = self.active_data_file.read();
        read_guard.sync()?;

        // Release the directory file lock.
        self.lock_file.unlock().unwrap();

        Ok(())
    }

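    /// Flushes the active data file to durable storage.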
    pub fn sync(&self) -> Result<()> {
        let read_guard = self.active_data_file.read();
        read_guard.sync()
    }

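    /// Returns statistics about the engine: key and data file counts,
    /// reclaimable bytes, and total disk usage.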
    pub fn get_engine_stat(&self) -> Result<Stat> {
        let keys = self.list_keys()?;
        let old_files = self.old_data_files.read();

        Ok(Stat {
            key_num: keys.len(),
            data_file_num: old_files.len() + 1,
            reclaim_size: self.reclaim_size.load(Ordering::SeqCst),
            disk_size: util::file::dir_disk_size(&self.options.dir_path),
        })
    }

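    /// Copies the data directory to `dir_path`, excluding the file lock.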
    pub fn backup<P>(&self, dir_path: P) -> Result<()>
    where
        P: AsRef<Path>,
    {
        let exclude = &[FILE_LOCK_NAME];
        if let Err(e) = util::file::copy_dir(
            &self.options.dir_path,
            &dir_path.as_ref().to_path_buf(),
            exclude,
        ) {
            log::error!("failed to copy data directory: {}", e);
            return Err(Errors::FailedToCopyDirectory);
        }
        Ok(())
    }

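    /// Stores `value` under `key`, appending a record to the active data
    /// file and updating the in-memory index. If the key already existed,
    /// the superseded record's size is added to the reclaimable byte count.
    ///
    /// Illustrative sketch (assumes an `engine` opened as shown in
    /// [`Engine::open`]):
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// engine.put(Bytes::from("name"), Bytes::from("bitcask"))?;
    /// assert_eq!(engine.get(Bytes::from("name"))?, Bytes::from("bitcask"));
    /// engine.delete(Bytes::from("name"))?;
    /// ```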
    pub fn put(&self, key: Bytes, value: Bytes) -> Result<()> {
        if key.is_empty() {
            return Err(Errors::KeyIsEmpty);
        }

        let mut record = LogRecord {
            key: log_record_key_with_seq(key.to_vec(), NON_TXN_SEQ_NO),
            value: value.to_vec(),
            rec_type: LogRecordType::Normal,
        };

        let log_record_pos = self.append_log_record(&mut record)?;

        // If the key was already present, the old record is now stale.
        if let Some(old_pos) = self.index.put(key.to_vec(), log_record_pos) {
            self.reclaim_size
                .fetch_add(old_pos.size as usize, Ordering::SeqCst);
        }
        Ok(())
    }

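    /// Removes `key` from the store. A tombstone record is appended and the
    /// key is dropped from the index; deleting a missing key is a no-op.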
    pub fn delete(&self, key: Bytes) -> Result<()> {
        if key.is_empty() {
            return Err(Errors::KeyIsEmpty);
        }

        // Nothing to do if the key does not exist.
        let pos = self.index.get(key.to_vec());
        if pos.is_none() {
            return Ok(());
        }

        // Append a tombstone record marking the key as deleted.
        let mut record = LogRecord {
            key: log_record_key_with_seq(key.to_vec(), NON_TXN_SEQ_NO),
            value: Default::default(),
            rec_type: LogRecordType::Deleted,
        };

        // The tombstone itself is immediately reclaimable.
        let pos = self.append_log_record(&mut record)?;
        self.reclaim_size
            .fetch_add(pos.size as usize, Ordering::SeqCst);

        if let Some(old_pos) = self.index.delete(key.to_vec()) {
            self.reclaim_size
                .fetch_add(old_pos.size as usize, Ordering::SeqCst);
        }
        Ok(())
    }

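    /// Fetches the value stored under `key`, or `Errors::KeyNotFound` if the
    /// key does not exist (or was deleted).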
    pub fn get(&self, key: Bytes) -> Result<Bytes> {
        if key.is_empty() {
            return Err(Errors::KeyIsEmpty);
        }

        let pos = self.index.get(key.to_vec());

        if pos.is_none() {
            return Err(Errors::KeyNotFound);
        }

        self.get_value_by_position(&pos.unwrap())
    }

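    /// Reads the log record at `log_record_pos` from the active or an old
    /// data file and returns its value.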
    pub(crate) fn get_value_by_position(&self, log_record_pos: &LogRecordPos) -> Result<Bytes> {
        let active_file = self.active_data_file.read();
        let older_files = self.old_data_files.read();
        let log_record = match active_file.get_file_id() == log_record_pos.file_id {
            true => active_file.read_log_record(log_record_pos.offset)?.record,
            false => {
                let data_file = older_files.get(&log_record_pos.file_id);
                if data_file.is_none() {
                    return Err(Errors::DataFileNotFound);
                }
                data_file
                    .unwrap()
                    .read_log_record(log_record_pos.offset)?
                    .record
            }
        };

        // A tombstone means the key has been deleted.
        if let LogRecordType::Deleted = log_record.rec_type {
            return Err(Errors::KeyNotFound);
        }

        Ok(log_record.value.into())
    }

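    /// Appends `log_record` to the active data file and returns its position.
    ///
    /// If the encoded record would push the active file past
    /// `data_file_size`, the active file is synced and replaced by a fresh
    /// file with the next id, and the full file joins the old-file map.
    /// Writes are synced according to `sync_writes` and `bytes_per_sync`.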
    pub(crate) fn append_log_record(&self, log_record: &mut LogRecord) -> Result<LogRecordPos> {
        let dir_path = &self.options.dir_path;

        let enc_record = log_record.encode();
        let record_len = enc_record.len() as u64;

        // Rotate to a new active file if this record would overflow it.
        let mut active_file = self.active_data_file.write();
        if active_file.get_write_off() + record_len > self.options.data_file_size {
            active_file.sync()?;

            let current_fid = active_file.get_file_id();

            // The full file becomes an old (read-only) data file.
            let mut old_files = self.old_data_files.write();
            let old_file = DataFile::new(dir_path, current_fid, IOManagerType::StandardFileIO)?;
            old_files.insert(current_fid, old_file);

            let new_file = DataFile::new(dir_path, current_fid + 1, IOManagerType::StandardFileIO)?;
            *active_file = new_file;
        }

        let write_off = active_file.get_write_off();
        active_file.write(&enc_record)?;

        let previous = self
            .bytes_write
            .fetch_add(enc_record.len(), Ordering::SeqCst);

        // Sync if requested, or once enough bytes have accumulated.
        let mut need_sync = self.options.sync_writes;
        if !need_sync
            && self.options.bytes_per_sync > 0
            && previous + enc_record.len() >= self.options.bytes_per_sync
        {
            need_sync = true;
        }

        if need_sync {
            active_file.sync()?;
            self.bytes_write.store(0, Ordering::SeqCst);
        }

        Ok(LogRecordPos {
            file_id: active_file.get_file_id(),
            offset: write_off,
            size: enc_record.len() as u32,
        })
    }

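    /// Rebuilds the in-memory index by replaying every data file in order,
    /// returning the largest transaction sequence number seen.
    ///
    /// Records written by a batch are buffered per sequence number and only
    /// applied once the matching `TxnFinished` record is found, so that
    /// uncommitted batches are never indexed.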
    fn load_index_from_data_files(&self) -> Result<usize> {
        let mut current_seq_no = NON_TXN_SEQ_NO;
        // No data files means an empty database.
        if self.file_ids.is_empty() {
            return Ok(current_seq_no);
        }

        // If a merge has completed, files older than `non_merge_fid` were
        // already indexed from the hint file and can be skipped.
        let mut has_merged = false;
        let mut non_merge_fid = 0;
        let merge_fin_file = self.options.dir_path.join(MERGE_FINISHED_FILE_NAME);
        if merge_fin_file.is_file() {
            let merge_file = DataFile::new_merge_fin_file(&self.options.dir_path)?;
            let merge_fin_record = merge_file.read_log_record(0)?;
            let v = String::from_utf8(merge_fin_record.record.value).unwrap();

            non_merge_fid = v.parse::<u32>().unwrap();
            has_merged = true;
        }

        // Batch records buffered per sequence number until their commit mark.
        let mut transaction_records = HashMap::new();

        let active_file = self.active_data_file.read();
        let old_files = self.old_data_files.read();

        for (i, file_id) in self.file_ids.iter().enumerate() {
            if has_merged && *file_id < non_merge_fid {
                continue;
            }

            let mut offset = 0;
            loop {
                let log_record_res = match *file_id == active_file.get_file_id() {
                    true => active_file.read_log_record(offset),
                    _ => {
                        let data_file = old_files.get(file_id).unwrap();
                        data_file.read_log_record(offset)
                    }
                };

                let (mut log_record, size) = match log_record_res {
                    Ok(result) => (result.record, result.size),
                    Err(e) => {
                        if e == Errors::ReadDataFileEOF {
                            break;
                        }
                        return Err(e);
                    }
                };

                let log_record_pos = LogRecordPos {
                    file_id: *file_id,
                    offset,
                    size: size as u32,
                };

                let (real_key, seq_no) = parse_log_record_key(log_record.key.clone());
                if seq_no == NON_TXN_SEQ_NO {
                    // Non-transactional records are applied immediately.
                    self.update_index(real_key, log_record.rec_type, log_record_pos)?;
                } else if log_record.rec_type == LogRecordType::TxnFinished {
                    // Commit mark: apply every buffered record of this batch.
                    let records: &Vec<TransactionRecord> =
                        transaction_records.get(&seq_no).unwrap();
                    for txn_record in records.iter() {
                        self.update_index(
                            txn_record.record.key.clone(),
                            txn_record.record.rec_type,
                            txn_record.pos,
                        )?;
                    }
                    transaction_records.remove(&seq_no);
                } else {
                    log_record.key = real_key;
                    transaction_records
                        .entry(seq_no)
                        .or_insert_with(|| Vec::new())
                        .push(TransactionRecord {
                            record: log_record,
                            pos: log_record_pos,
                        });
                }

                if seq_no > current_seq_no {
                    current_seq_no = seq_no;
                }

                offset += size as u64;
            }

            // Remember the write offset of the last (active) file.
            if i == self.file_ids.len() - 1 {
                active_file.set_write_off(offset);
            }
        }
        Ok(current_seq_no)
    }

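    /// Loads the transaction sequence number persisted by `close`, returning
    /// whether the seq-no file existed and the stored value.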
    fn load_seq_no(&self) -> (bool, usize) {
        let file_name = self.options.dir_path.join(SEQ_NO_FILE_NAME);
        if !file_name.is_file() {
            return (false, 0);
        }
        let seq_no_file = DataFile::new_seq_no_file(&self.options.dir_path).unwrap();
        let record = match seq_no_file.read_log_record(0) {
            Ok(res) => res.record,
            Err(e) => panic!("failed to read seq_no: {}", e),
        };
        let v = String::from_utf8(record.value).unwrap();
        let seq_no = v.parse::<usize>().unwrap();

        // Remove the file once its value has been loaded.
        fs::remove_file(file_name).unwrap();

        (true, seq_no)
    }

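    /// Applies a single replayed record to the in-memory index, tracking how
    /// many bytes become reclaimable when records are superseded or deleted.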
    fn update_index(&self, key: Vec<u8>, rec_type: LogRecordType, pos: LogRecordPos) -> Result<()> {
        if rec_type == LogRecordType::Normal {
            if let Some(old_pos) = self.index.put(key.clone(), pos) {
                self.reclaim_size
                    .fetch_add(old_pos.size as usize, Ordering::SeqCst);
            }
        }

        if rec_type == LogRecordType::Deleted {
            // Both the tombstone and the record it deletes are reclaimable.
            let mut size = pos.size;
            if let Some(old_pos) = self.index.delete(key) {
                size += old_pos.size;
            }
            self.reclaim_size.fetch_add(size as usize, Ordering::SeqCst);
        }
        Ok(())
    }

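    /// Switches every data file from memory-mapped IO (used only to speed up
    /// startup) back to standard file IO.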
    fn reset_io_type(&self) {
        let mut active_file = self.active_data_file.write();
        active_file.set_io_manager(&self.options.dir_path, IOManagerType::StandardFileIO);
        let mut old_files = self.old_data_files.write();
        for (_, file) in old_files.iter_mut() {
            file.set_io_manager(&self.options.dir_path, IOManagerType::StandardFileIO);
        }
    }
}

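/// Closing on drop ensures the sequence number is persisted and the
/// directory file lock is released.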
impl Drop for Engine {
    fn drop(&mut self) {
        if let Err(e) = self.close() {
            error!("error while closing engine: {}", e);
        }
    }
}

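/// Scans `dir_path` for data files (names ending in `DATA_FILE_NAME_SUFFIX`),
/// returning them sorted by ascending file id. Returns
/// `DatabaseDirectoryCorrupted` when a data file name does not start with a
/// numeric id.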
fn load_data_files<P>(dir_path: P, use_mmap: bool) -> Result<Vec<DataFile>>
where
    P: AsRef<Path>,
{
    let dir = fs::read_dir(&dir_path);
    if dir.is_err() {
        return Err(Errors::FailedToReadDatabaseDir);
    }

    let mut file_ids: Vec<u32> = Vec::new();
    let mut data_files: Vec<DataFile> = Vec::new();

    for file in dir.unwrap().flatten() {
        let file_os_str = file.file_name();
        let file_name = file_os_str.to_str().unwrap();

        // Data file names have the form `<file_id><suffix>`.
        if file_name.ends_with(DATA_FILE_NAME_SUFFIX) {
            let split_names: Vec<&str> = file_name.split('.').collect();
            let file_id = match split_names[0].parse::<u32>() {
                Ok(fid) => fid,
                Err(_) => {
                    return Err(Errors::DatabaseDirectoryCorrupted);
                }
            };

            file_ids.push(file_id);
        }
    }

    if file_ids.is_empty() {
        return Ok(data_files);
    }

    // Open the files in ascending id order; the last one is the active file.
    file_ids.sort();

    for file_id in file_ids.iter() {
        let io_type = if use_mmap {
            IOManagerType::MemoryMap
        } else {
            IOManagerType::StandardFileIO
        };
        let data_file = DataFile::new(&dir_path, *file_id, io_type)?;
        data_files.push(data_file);
    }
    Ok(data_files)
}

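/// Validates the user-supplied options, returning the first violation found.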
fn check_options(opts: &Options) -> Option<Errors> {
    let dir_path = opts.dir_path.to_str();
    if dir_path.is_none() || dir_path.unwrap().is_empty() {
        return Some(Errors::DirPathIsEmpty);
    }

    if opts.data_file_size == 0 {
        return Some(Errors::DataFileSizeTooSmall);
    }

    // The merge threshold is a ratio and must lie in [0, 1].
    if opts.file_merge_threshold < 0f32 || opts.file_merge_threshold > 1f32 {
        return Some(Errors::InvalidMergeThreshold);
    }

    None
}