1#![allow(unused_attributes)]
5
6use crate::db_iter::DBIterator;
7
8use crate::cmp::{Cmp, InternalKeyCmp};
9use crate::env::{Env, FileLock};
10use crate::error::{err, Result, StatusCode};
11use crate::filter::{BoxedFilterPolicy, InternalFilterPolicy};
12use crate::infolog::Logger;
13use crate::key_types::{parse_internal_key, InternalKey, LookupKey, ValueType};
14use crate::log::{LogReader, LogWriter};
15use crate::memtable::MemTable;
16use crate::merging_iter::MergingIter;
17use crate::options::Options;
18use crate::snapshot::{Snapshot, SnapshotList};
19use crate::table_builder::TableBuilder;
20use crate::table_cache::{table_file_name, TableCache};
21use crate::types::{
22 parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, SequenceNumber, Shared,
23 MAX_SEQUENCE_NUMBER, NUM_LEVELS,
24};
25use crate::version::Version;
26use crate::version_edit::VersionEdit;
27use crate::version_set::{
28 manifest_file_name, read_current_file, set_current_file, Compaction, VersionSet,
29};
30use crate::write_batch::WriteBatch;
31
32use std::cmp::Ordering;
33use std::io::{self, BufWriter, Write};
34use std::mem;
35use std::ops::Drop;
36use std::path::Path;
37use std::path::PathBuf;
38use std::sync::Arc;
39
/// Handle to a single database directory.
///
/// Bundles the write-ahead log writer, the active and the frozen memtable, the table cache,
/// the version set and the list of snapshots for one database.
pub struct DB {
    /// Database location exactly as passed in by the caller.
    name: PathBuf,
    /// `name` after `canonicalize()`; falls back to `name` if canonicalization fails.
    path: PathBuf,
    /// Lock on the database's `LOCK` file; `None` until `acquire_lock()` has succeeded.
    lock: Option<FileLock>,

    /// The user comparator wrapped in `InternalKeyCmp`.
    internal_cmp: Arc<Box<dyn Cmp>>,
    /// The user filter policy wrapped in `InternalFilterPolicy`.
    fpol: InternalFilterPolicy<BoxedFilterPolicy>,
    /// Options this database was opened with (an info logger is filled in if none was set).
    opt: Options,

    /// Memtable receiving the current writes.
    mem: MemTable,
    /// Frozen memtable waiting to be written out as a table file, if any.
    imm: Option<MemTable>,

    /// Writer for the current write-ahead log file.
    log: Option<LogWriter<BufWriter<Box<dyn Write>>>>,
    /// File number of the current write-ahead log file.
    log_num: Option<FileNum>,
    /// Table cache, shared with the version set.
    cache: Shared<TableCache>,
    vset: Shared<VersionSet>,
    /// Snapshots handed out by `get_snapshot()`.
    snaps: SnapshotList,

    /// Accumulated compaction statistics, one entry per level.
    cstats: [CompactionStats; NUM_LEVELS],
}
62
// SAFETY(review): nothing visible in this file establishes why `DB` may be sent to another
// thread. Its fields include `Shared<..>` handles that are accessed via `borrow()` /
// `borrow_mut()`, and clones of them are handed to iterators in `new_iter_at()`.
// TODO confirm the invariant that makes this sound.
unsafe impl Send for DB {}
64
65impl DB {
66 fn new<P: AsRef<Path>>(name: P, mut opt: Options) -> DB {
70 let name = name.as_ref();
71 if opt.log.is_none() {
72 let log = open_info_log(opt.env.as_ref().as_ref(), name);
73 opt.log = Some(share(log));
74 }
75 let path = name.canonicalize().unwrap_or(name.to_owned());
76
77 let cache = share(TableCache::new(&name, opt.clone(), opt.max_open_files - 10));
78 let vset = VersionSet::new(&name, opt.clone(), cache.clone());
79
80 DB {
81 name: name.to_owned(),
82 path,
83 lock: None,
84 internal_cmp: Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))),
85 fpol: InternalFilterPolicy::new(opt.filter_policy.clone()),
86
87 mem: MemTable::new(opt.cmp.clone()),
88 imm: None,
89
90 opt,
91
92 log: None,
93 log_num: None,
94 cache,
95 vset: share(vset),
96 snaps: SnapshotList::new(),
97
98 cstats: Default::default(),
99 }
100 }
101
102 fn current(&self) -> Shared<Version> {
103 self.vset.borrow().current()
104 }
105
106 pub fn open<P: AsRef<Path>>(name: P, opt: Options) -> Result<DB> {
112 let name = name.as_ref();
113 let mut db = DB::new(name, opt);
114 let mut ve = VersionEdit::new();
115 let save_manifest = db.recover(&mut ve)?;
116
117 if db.log.is_none() {
119 let lognum = db.vset.borrow_mut().new_file_number();
120 let logfile = db
121 .opt
122 .env
123 .open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
124 ve.set_log_num(lognum);
125 db.log = Some(LogWriter::new(BufWriter::new(logfile)));
126 db.log_num = Some(lognum);
127 }
128
129 if save_manifest {
130 ve.set_log_num(db.log_num.unwrap_or(0));
131 db.vset.borrow_mut().log_and_apply(ve)?;
132 }
133
134 db.delete_obsolete_files()?;
135 db.maybe_do_compaction()?;
136 Ok(db)
137 }
138
139 fn initialize_db(&mut self) -> Result<()> {
141 let mut ve = VersionEdit::new();
142 ve.set_comparator_name(self.opt.cmp.id());
143 ve.set_log_num(0);
144 ve.set_next_file(2);
145 ve.set_last_seq(0);
146
147 {
148 let manifest = manifest_file_name(&self.path, 1);
149 let manifest_file = self.opt.env.open_writable_file(Path::new(&manifest))?;
150 let mut lw = LogWriter::new(manifest_file);
151 lw.add_record(&ve.encode())?;
152 lw.flush()?;
153 }
154 set_current_file(&self.opt.env, &self.path, 1)
155 }
156
    /// Brings the in-memory state in line with what is stored in the database directory.
    ///
    /// In order: rejects an existing database if `error_if_exists` is set, creates and locks
    /// the directory, initializes a new database if `read_current_file()` reports `NotFound`
    /// and `create_if_missing` is set, recovers the version set, checks that all live files
    /// are present, and replays the relevant log files (table files produced during replay
    /// are recorded in `ve`).
    ///
    /// Returns `Ok(true)` if either the version set recovery or a replayed log asked for a
    /// new manifest record to be written.
    ///
    /// # Errors
    ///
    /// `AlreadyExists`, `InvalidArgument`, `Corruption` (missing live files), lock errors, and
    /// any error from the environment or from log replay.
    fn recover(&mut self, ve: &mut VersionEdit) -> Result<bool> {
        if self.opt.error_if_exists && self.opt.env.exists(&self.path.as_ref()).unwrap_or(false) {
            return err(StatusCode::AlreadyExists, "database already exists");
        }

        // Best effort: a failure to create the directory surfaces when taking the lock.
        let _ = self.opt.env.mkdir(Path::new(&self.path));
        self.acquire_lock()?;

        if let Err(e) = read_current_file(&self.opt.env, &self.path) {
            if e.code == StatusCode::NotFound && self.opt.create_if_missing {
                self.initialize_db()?;
            } else {
                // NOTE(review): errors other than NotFound are reported with this message too.
                return err(
                    StatusCode::InvalidArgument,
                    "database does not exist and create_if_missing is false",
                );
            }
        }

        let mut save_manifest = self.vset.borrow_mut().recover()?;

        let mut max_seq = 0;
        let filenames = self.opt.env.children(&self.path)?;
        // Every live file number seen in the directory is removed from `expected`; whatever
        // remains afterwards is missing on disk.
        let mut expected = self.vset.borrow().live_files();
        let mut log_files = vec![];

        for file in &filenames {
            match parse_file_name(&file) {
                Ok((num, typ)) => {
                    expected.remove(&num);
                    // Only logs at or after the version set's log number, or the previous
                    // log, are replayed.
                    if typ == FileType::Log
                        && (num >= self.vset.borrow().log_num
                            || num == self.vset.borrow().prev_log_num)
                    {
                        log_files.push(num);
                    }
                }
                Err(e) => return Err(e.annotate(format!("While parsing {:?}", file))),
            }
        }
        if !expected.is_empty() {
            log!(self.opt.log, "Missing at least these files: {:?}", expected);
            return err(StatusCode::Corruption, "missing live files (see log)");
        }

        // Replay logs in ascending file number order; only the last one may be reused.
        log_files.sort();
        for i in 0..log_files.len() {
            let (save_manifest_, max_seq_) =
                self.recover_log_file(log_files[i], i == log_files.len() - 1, ve)?;
            if save_manifest_ {
                save_manifest = true;
            }
            if max_seq_ > max_seq {
                max_seq = max_seq_;
            }
            self.vset.borrow_mut().mark_file_number_used(log_files[i]);
        }

        // Never move the last sequence number backwards.
        if self.vset.borrow().last_seq < max_seq {
            self.vset.borrow_mut().last_seq = max_seq;
        }

        Ok(save_manifest)
    }
226
    /// Replays the write-ahead log with number `log_num` into a fresh memtable.
    ///
    /// Whenever the memtable grows beyond `write_buffer_size` it is written out as a table
    /// file recorded in `ve`. After the last record, the log is either kept as the active log
    /// (only if `reuse_logs` is set, `is_last` is true and no table was written during
    /// replay), or the remaining memtable contents are written out as a table.
    ///
    /// Returns `(save_manifest, max_seq)`: whether a table was written (so `ve` has to be
    /// persisted) and the highest sequence number computed from the replayed batches.
    fn recover_log_file(
        &mut self,
        log_num: FileNum,
        is_last: bool,
        ve: &mut VersionEdit,
    ) -> Result<(bool, SequenceNumber)> {
        let filename = log_file_name(&self.path, log_num);
        let logfile = self.opt.env.open_sequential_file(Path::new(&filename))?;
        let cmp: Arc<Box<dyn Cmp>> = self.opt.cmp.clone();

        // NOTE(review): the `true` flag presumably enables checksum verification — confirm
        // against `LogReader::new`.
        let mut logreader = LogReader::new(
            logfile, true,
        );
        log!(self.opt.log, "Recovering log file {:?}", filename);
        let mut scratch = vec![];
        let mut mem = MemTable::new(cmp.clone());
        let mut batch = WriteBatch::new();

        let mut compactions = 0;
        let mut max_seq = 0;
        let mut save_manifest = false;

        // A read error ends the replay just like the end of the file does.
        while let Ok(len) = logreader.read(&mut scratch) {
            if len == 0 {
                break;
            }
            // Records shorter than 12 bytes are reported and skipped.
            if len < 12 {
                log!(
                    self.opt.log,
                    "corruption in log file {:06}: record shorter than 12B",
                    log_num
                );
                continue;
            }

            batch.set_contents(&scratch);
            batch.insert_into_memtable(batch.sequence(), &mut mem);

            let last_seq = batch.sequence() + batch.count() as u64 - 1;
            if last_seq > max_seq {
                max_seq = last_seq
            }
            if mem.approx_mem_usage() > self.opt.write_buffer_size {
                compactions += 1;
                self.write_l0_table(&mem, ve, None)?;
                save_manifest = true;
                mem = MemTable::new(cmp.clone());
            }
            batch.clear();
        }

        if self.opt.reuse_logs && is_last && compactions == 0 {
            // Keep appending to this log and continue with the recovered memtable.
            assert!(self.log.is_none());
            log!(self.opt.log, "reusing log file {:?}", filename);
            let oldsize = self.opt.env.size_of(Path::new(&filename))?;
            let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
            let lw = LogWriter::new_with_off(BufWriter::new(oldfile), oldsize);
            self.log = Some(lw);
            self.log_num = Some(log_num);
            self.mem = mem;
        } else if mem.len() > 0 {
            // Persist what is left so that the log is no longer needed.
            save_manifest = true;
            self.write_l0_table(&mem, ve, None)?;
        }

        Ok((save_manifest, max_seq))
    }
301
302 fn delete_obsolete_files(&mut self) -> Result<()> {
304 let files = self.vset.borrow().live_files();
305 let filenames = self.opt.env.children(Path::new(&self.path))?;
306 for name in filenames {
307 if let Ok((num, typ)) = parse_file_name(&name) {
308 match typ {
309 FileType::Log => {
310 if num >= self.vset.borrow().log_num {
311 continue;
312 }
313 }
314 FileType::Descriptor => {
315 if num >= self.vset.borrow().manifest_num {
316 continue;
317 }
318 }
319 FileType::Table => {
320 if files.contains(&num) {
321 continue;
322 }
323 }
324 FileType::Temp => {
327 if files.contains(&num) {
328 continue;
329 }
330 }
331 FileType::Current | FileType::DBLock | FileType::InfoLog => continue,
332 }
333
334 if typ == FileType::Table {
336 let _ = self.cache.borrow_mut().evict(num);
337 }
338 log!(self.opt.log, "Deleting file type={:?} num={}", typ, num);
339 if let Err(e) = self.opt.env.delete(&self.path.join(&name)) {
340 log!(self.opt.log, "Deleting file num={} failed: {}", num, e);
341 }
342 }
343 }
344 Ok(())
345 }
346
347 fn acquire_lock(&mut self) -> Result<()> {
349 let lock_r = self.opt.env.lock(Path::new(&lock_file_name(&self.path)));
350 match lock_r {
351 Ok(lockfile) => {
352 self.lock = Some(lockfile);
353 Ok(())
354 }
355 Err(ref e) if e.code == StatusCode::LockError => err(
356 StatusCode::LockError,
357 "database lock is held by another instance",
358 ),
359 Err(e) => Err(e),
360 }
361 }
362
363 fn release_lock(&mut self) -> Result<()> {
365 if let Some(l) = self.lock.take() {
366 self.opt.env.unlock(l)
367 } else {
368 Ok(())
369 }
370 }
371
372 pub fn close(&mut self) -> Result<()> {
374 self.flush()?;
375 self.release_lock()?;
376 Ok(())
377 }
378}
379
380impl DB {
381 pub fn put(&mut self, k: &[u8], v: &[u8]) -> Result<()> {
386 let mut wb = WriteBatch::new();
387 wb.put(k, v);
388 self.write(wb, false)
389 }
390
391 pub fn delete(&mut self, k: &[u8]) -> Result<()> {
394 let mut wb = WriteBatch::new();
395 wb.delete(k);
396 self.write(wb, false)
397 }
398
    /// Applies `batch` to the database.
    ///
    /// The entries are assigned consecutive sequence numbers starting at `last_seq + 1`,
    /// inserted into the memtable and appended to the write-ahead log. With `sync` set, the
    /// log writer is flushed before returning.
    ///
    /// # Panics
    ///
    /// Panics if no log writer is installed.
    ///
    /// # Errors
    ///
    /// Returns errors from making room for the write and from the log writer.
    pub fn write(&mut self, batch: WriteBatch, sync: bool) -> Result<()> {
        assert!(self.log.is_some());

        // May swap in a new memtable and log, so it has to run before `log` is borrowed.
        self.make_room_for_write(false)?;

        let entries = batch.count() as u64;
        let log = self.log.as_mut().unwrap();
        let next = self.vset.borrow().last_seq + 1;

        // NOTE(review): the memtable is updated before the log record is written, so a failed
        // `add_record` leaves the entries in the memtable while `last_seq` is not advanced.
        // The order may be dictated by `encode` taking the batch by value — confirm against
        // `WriteBatch`.
        batch.insert_into_memtable(next, &mut self.mem);
        log.add_record(&batch.encode(next))?;
        if sync {
            log.flush()?;
        }
        self.vset.borrow_mut().last_seq += entries;
        Ok(())
    }
418
419 pub fn flush(&mut self) -> Result<()> {
421 if let Some(ref mut log) = self.log.as_mut() {
422 log.flush()?;
423 }
424 Ok(())
425 }
426}
427
428impl DB {
429 fn get_internal(&mut self, seq: SequenceNumber, key: &[u8]) -> Result<Option<Vec<u8>>> {
432 let lkey = LookupKey::new(key, seq);
435
436 match self.mem.get(&lkey) {
437 (Some(v), _) => return Ok(Some(v)),
438 (None, true) => return Ok(None),
440 (None, false) => {}
442 }
443
444 if let Some(imm) = self.imm.as_ref() {
445 match imm.get(&lkey) {
446 (Some(v), _) => return Ok(Some(v)),
447 (None, true) => return Ok(None),
449 (None, false) => {}
451 }
452 }
453
454 let mut do_compaction = false;
455 let mut result = None;
456
457 {
459 let current = self.current();
460 let mut current = current.borrow_mut();
461 if let Ok(Some((v, st))) = current.get(lkey.internal_key()) {
462 if current.update_stats(st) {
463 do_compaction = true;
464 }
465 result = Some(v)
466 }
467 }
468
469 if do_compaction {
470 if let Err(e) = self.maybe_do_compaction() {
471 log!(self.opt.log, "error while doing compaction in get: {}", e);
472 }
473 }
474 Ok(result)
475 }
476
477 pub fn get_at(&mut self, snapshot: &Snapshot, key: &[u8]) -> Result<Option<Vec<u8>>> {
480 self.get_internal(snapshot.sequence(), key)
481 }
482
483 pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
485 let seq = self.vset.borrow().last_seq;
486 if let Ok(v) = self.get_internal(seq, key) {
487 v
488 } else {
489 None
490 }
491 }
492}
493
494impl DB {
495 pub fn new_iter(&mut self) -> Result<DBIterator> {
500 let snapshot = self.get_snapshot();
501 self.new_iter_at(snapshot)
502 }
503
504 pub fn new_iter_at(&mut self, ss: Snapshot) -> Result<DBIterator> {
506 Ok(DBIterator::new(
507 self.opt.cmp.clone(),
508 self.vset.clone(),
509 self.merge_iterators()?,
510 ss,
511 ))
512 }
513
514 fn merge_iterators(&mut self) -> Result<MergingIter> {
517 let mut iters: Vec<Box<dyn LdbIterator>> = vec![];
518 if self.mem.len() > 0 {
519 iters.push(Box::new(self.mem.iter()));
520 }
521 if let Some(ref imm) = self.imm {
522 if imm.len() > 0 {
523 iters.push(Box::new(imm.iter()));
524 }
525 }
526
527 let current = self.current();
529 let current = current.borrow();
530 iters.extend(current.new_iters()?);
531
532 Ok(MergingIter::new(self.internal_cmp.clone(), iters))
533 }
534}
535
536impl DB {
537 pub fn get_snapshot(&mut self) -> Snapshot {
542 self.snaps.new_snapshot(self.vset.borrow().last_seq)
543 }
544}
545
546impl DB {
547 fn add_stats(&mut self, level: usize, cs: CompactionStats) {
549 assert!(level < NUM_LEVELS);
550 self.cstats[level].add(cs);
551 }
552
553 fn record_read_sample<'a>(&mut self, k: InternalKey<'a>) {
555 let current = self.current();
556 if current.borrow_mut().record_read_sample(k) {
557 if let Err(e) = self.maybe_do_compaction() {
558 log!(self.opt.log, "record_read_sample: compaction failed: {}", e);
559 }
560 }
561 }
562}
563
564impl DB {
565 fn make_room_for_write(&mut self, force: bool) -> Result<()> {
570 if !force && self.mem.approx_mem_usage() < self.opt.write_buffer_size || self.mem.len() == 0 {
571 Ok(())
572 } else {
573 let logn = self.vset.borrow_mut().new_file_number();
575 let logf = self
576 .opt
577 .env
578 .open_writable_file(Path::new(&log_file_name(&self.path, logn)));
579 if logf.is_err() {
580 self.vset.borrow_mut().reuse_file_number(logn);
581 Err(logf.err().unwrap())
582 } else {
583 self.log = Some(LogWriter::new(BufWriter::new(logf.unwrap())));
584 self.log_num = Some(logn);
585
586 let mut imm = MemTable::new(self.opt.cmp.clone());
587 mem::swap(&mut imm, &mut self.mem);
588 self.imm = Some(imm);
589 self.maybe_do_compaction()
590 }
591 }
592 }
593
594 fn maybe_do_compaction(&mut self) -> Result<()> {
596 if self.imm.is_some() {
597 self.compact_memtable()?;
598 }
599 if self.vset.borrow().needs_compaction() {
602 let c = self.vset.borrow_mut().pick_compaction();
603 if let Some(c) = c {
604 self.start_compaction(c)
605 } else {
606 Ok(())
607 }
608 } else {
609 Ok(())
610 }
611 }
612
613 pub fn compact_range(&mut self, from: &[u8], to: &[u8]) -> Result<()> {
619 let mut max_level = 1;
620 {
621 let v = self.vset.borrow().current();
622 let v = v.borrow();
623 for l in 1..NUM_LEVELS - 1 {
624 if v.overlap_in_level(l, from, to) {
625 max_level = l;
626 }
627 }
628 }
629
630 self.make_room_for_write(true)?;
632
633 let mut ifrom = LookupKey::new(from, MAX_SEQUENCE_NUMBER)
634 .internal_key()
635 .to_vec();
636 let iend = LookupKey::new_full(to, 0, ValueType::TypeDeletion);
637
638 for l in 0..max_level + 1 {
639 loop {
640 let c_ = self
641 .vset
642 .borrow_mut()
643 .compact_range(l, &ifrom, iend.internal_key());
644 if let Some(c) = c_ {
645 let ix = c.num_inputs(0) - 1;
647 ifrom = c.input(0, ix).largest.clone();
648 self.start_compaction(c)?;
649 } else {
650 break;
651 }
652 }
653 }
654 Ok(())
655 }
656
657 fn start_compaction(&mut self, mut compaction: Compaction) -> Result<()> {
660 if compaction.is_trivial_move() {
661 assert_eq!(1, compaction.num_inputs(0));
662 let f = compaction.input(0, 0);
663 let num = f.num;
664 let size = f.size;
665 let level = compaction.level();
666
667 compaction.edit().delete_file(level, num);
668 compaction.edit().add_file(level + 1, f);
669
670 let r = self.vset.borrow_mut().log_and_apply(compaction.into_edit());
671 if let Err(e) = r {
672 log!(self.opt.log, "trivial move failed: {}", e);
673 Err(e)
674 } else {
675 log!(
676 self.opt.log,
677 "Moved num={} bytes={} from L{} to L{}",
678 num,
679 size,
680 level,
681 level + 1
682 );
683 log!(
684 self.opt.log,
685 "Summary: {}",
686 self.vset.borrow().current_summary()
687 );
688 Ok(())
689 }
690 } else {
691 let smallest = if self.snaps.empty() {
692 self.vset.borrow().last_seq
693 } else {
694 self.snaps.oldest()
695 };
696 let mut state = CompactionState::new(compaction, smallest);
697 if let Err(e) = self.do_compaction_work(&mut state) {
698 state.cleanup(&self.opt.env, &self.path);
699 log!(self.opt.log, "Compaction work failed: {}", e);
700 }
701 self.install_compaction_results(state)?;
702 log!(
703 self.opt.log,
704 "Compaction finished: {}",
705 self.vset.borrow().current_summary()
706 );
707
708 self.delete_obsolete_files()
709 }
710 }
711
712 fn compact_memtable(&mut self) -> Result<()> {
713 assert!(self.imm.is_some());
714
715 let mut ve = VersionEdit::new();
716 let base = self.current();
717
718 let imm = self.imm.take().unwrap();
719 if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) {
720 self.imm = Some(imm);
721 return Err(e);
722 }
723 ve.set_log_num(self.log_num.unwrap_or(0));
724 self.vset.borrow_mut().log_and_apply(ve)?;
725 if let Err(e) = self.delete_obsolete_files() {
726 log!(self.opt.log, "Error deleting obsolete files: {}", e);
727 }
728 Ok(())
729 }
730
    /// Writes the contents of `memt` to a new table file and registers that file in `ve`.
    ///
    /// The target level is 0 unless `base` is given, in which case the version picks the
    /// level from the table's smallest and largest user keys. An empty memtable produces no
    /// file: the file number is returned to the version set and `ve` is left untouched.
    ///
    /// # Errors
    ///
    /// Returns errors from building the table and from loading the new table through the
    /// table cache (the table file is deleted in the latter case).
    fn write_l0_table(
        &mut self,
        memt: &MemTable,
        ve: &mut VersionEdit,
        base: Option<&Version>,
    ) -> Result<()> {
        let start_ts = self.opt.env.micros();
        let num = self.vset.borrow_mut().new_file_number();
        log!(self.opt.log, "Start write of L0 table {:06}", num);
        let fmd = build_table(&self.path, &self.opt, memt.iter(), num)?;
        log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size);

        if fmd.size == 0 {
            // Nothing was written; make the file number available again.
            self.vset.borrow_mut().reuse_file_number(num);
            return Ok(());
        }

        // Load the table through the cache once to verify that it is usable.
        let cache_result = self.cache.borrow_mut().get_table(num);
        if let Err(e) = cache_result {
            log!(
                self.opt.log,
                "L0 table {:06} not returned by cache: {}",
                num,
                e
            );
            let _ = self
                .opt
                .env
                .delete(Path::new(&table_file_name(&self.path, num)));
            return Err(e);
        }

        let mut stats = CompactionStats::default();
        stats.micros = self.opt.env.micros() - start_ts;
        stats.written = fmd.size;

        let mut level = 0;
        if let Some(b) = base {
            // Element `.2` of the parsed internal key is the user key.
            level = b.pick_memtable_output_level(
                parse_internal_key(&fmd.smallest).2,
                parse_internal_key(&fmd.largest).2,
            );
        }

        self.add_stats(level, stats);
        ve.add_file(level, fmd);

        Ok(())
    }
782
783 fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
784 {
785 let current = self.vset.borrow().current();
786 assert!(current.borrow().num_level_files(cs.compaction.level()) > 0);
787 assert!(cs.builder.is_none());
788 }
789 let start_ts = self.opt.env.micros();
790 log!(
791 self.opt.log,
792 "Compacting {} files at L{} and {} files at L{}",
793 cs.compaction.num_inputs(0),
794 cs.compaction.level(),
795 cs.compaction.num_inputs(1),
796 cs.compaction.level() + 1
797 );
798
799 let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
800 input.seek_to_first();
801
802 let (mut key, mut val) = (vec![], vec![]);
803 let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;
804
805 let mut have_ukey = false;
806 let mut current_ukey = vec![];
807
808 while input.valid() {
809 assert!(input.current(&mut key, &mut val));
812 if cs.compaction.should_stop_before(&key) && cs.builder.is_some() {
813 self.finish_compaction_output(cs)?;
814 }
815 let (ktyp, seq, ukey) = parse_internal_key(&key);
816 if seq == 0 {
817 log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key);
819 last_seq_for_key = MAX_SEQUENCE_NUMBER;
820 have_ukey = false;
821 current_ukey.clear();
822 input.advance();
823 continue;
824 }
825
826 if !have_ukey || self.opt.cmp.cmp(ukey, ¤t_ukey) != Ordering::Equal {
827 current_ukey.clear();
829 current_ukey.extend_from_slice(ukey);
830 have_ukey = true;
831 last_seq_for_key = MAX_SEQUENCE_NUMBER;
832 }
833
834 if last_seq_for_key <= cs.smallest_seq {
836 last_seq_for_key = seq;
837 input.advance();
838 continue;
839 }
840 if ktyp == ValueType::TypeDeletion
843 && seq <= cs.smallest_seq
844 && cs.compaction.is_base_level_for(ukey)
845 {
846 last_seq_for_key = seq;
847 input.advance();
848 continue;
849 }
850
851 last_seq_for_key = seq;
852
853 if cs.builder.is_none() {
854 let fnum = self.vset.borrow_mut().new_file_number();
855 let mut fmd = FileMetaData::default();
856 fmd.num = fnum;
857
858 let fname = table_file_name(&self.path, fnum);
859 let f = self.opt.env.open_writable_file(Path::new(&fname))?;
860 let f = Box::new(BufWriter::new(f));
861 cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
862 cs.outputs.push(fmd);
863 }
864 if cs.builder.as_ref().unwrap().entries() == 0 {
865 cs.current_output().smallest = key.clone();
866 }
867 cs.current_output().largest = key.clone();
868 cs.builder.as_mut().unwrap().add(&key, &val)?;
869 if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
871 self.finish_compaction_output(cs)?;
872 }
873
874 input.advance();
875 }
876
877 if cs.builder.is_some() {
878 self.finish_compaction_output(cs)?;
879 }
880
881 let mut stats = CompactionStats::default();
882 stats.micros = self.opt.env.micros() - start_ts;
883 for parent in 0..2 {
884 for inp in 0..cs.compaction.num_inputs(parent) {
885 stats.read += cs.compaction.input(parent, inp).size;
886 }
887 }
888 for output in &cs.outputs {
889 stats.written += output.size;
890 }
891 self.cstats[cs.compaction.level()].add(stats);
892 Ok(())
893 }
894
895 fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> {
896 assert!(cs.builder.is_some());
897 let output_num = cs.current_output().num;
898 assert!(output_num > 0);
899
900 let b = cs.builder.take().unwrap();
904 let entries = b.entries();
905 let bytes = b.finish()?;
906 cs.total_bytes += bytes;
907
908 cs.current_output().size = bytes;
909
910 if entries > 0 {
911 let r = self.cache.borrow_mut().get_table(output_num);
914 if let Err(e) = r {
915 log!(self.opt.log, "New table can't be read: {}", e);
916 return Err(e);
917 }
918 log!(
919 self.opt.log,
920 "New table num={}: keys={} size={}",
921 output_num,
922 entries,
923 bytes
924 );
925 }
926 Ok(())
927 }
928
929 fn install_compaction_results(&mut self, mut cs: CompactionState) -> Result<()> {
930 log!(
931 self.opt.log,
932 "Compacted {} L{} files + {} L{} files => {}B",
933 cs.compaction.num_inputs(0),
934 cs.compaction.level(),
935 cs.compaction.num_inputs(1),
936 cs.compaction.level() + 1,
937 cs.total_bytes
938 );
939 cs.compaction.add_input_deletions();
940 let level = cs.compaction.level();
941 for output in &cs.outputs {
942 cs.compaction.edit().add_file(level + 1, output.clone());
943 }
944 self.vset
945 .borrow_mut()
946 .log_and_apply(cs.compaction.into_edit())
947 }
948}
949
950impl Drop for DB {
951 fn drop(&mut self) {
952 self.flush().ok();
953 let _ = self.release_lock();
954 }
955}
956
957struct CompactionState {
958 compaction: Compaction,
959 smallest_seq: SequenceNumber,
960 outputs: Vec<FileMetaData>,
961 builder: Option<TableBuilder<Box<dyn Write>>>,
962 total_bytes: usize,
963}
964
965impl CompactionState {
966 fn new(c: Compaction, smallest: SequenceNumber) -> CompactionState {
967 CompactionState {
968 compaction: c,
969 smallest_seq: smallest,
970 outputs: vec![],
971 builder: None,
972 total_bytes: 0,
973 }
974 }
975
976 fn current_output(&mut self) -> &mut FileMetaData {
977 let len = self.outputs.len();
978 &mut self.outputs[len - 1]
979 }
980
981 fn cleanup<P: AsRef<Path>>(&mut self, env: &Box<dyn Env>, name: P) {
983 for o in self.outputs.drain(..) {
984 let name = table_file_name(name.as_ref(), o.num);
985 let _ = env.delete(&name);
986 }
987 }
988}
989
/// Running totals of compaction work.
#[derive(Debug, Default)]
struct CompactionStats {
    /// Time spent, in microseconds.
    micros: u64,
    /// Bytes read.
    read: usize,
    /// Bytes written.
    written: usize,
}

impl CompactionStats {
    /// Adds every counter of `cs` to the corresponding counter of `self`.
    fn add(&mut self, cs: CompactionStats) {
        let CompactionStats {
            micros,
            read,
            written,
        } = cs;
        self.micros += micros;
        self.read += read;
        self.written += written;
    }
}
1004
/// Writes all entries produced by `from` into the table file with number `num` inside the
/// database directory `dbname`.
///
/// Returns the metadata of the written file (number, size, smallest and largest key). If
/// the iterator yields no entries, the file is deleted and default metadata is returned.
///
/// # Errors
///
/// Returns errors from opening or writing the table (the file is deleted in that case) and
/// from querying the size of the finished file.
pub fn build_table<I: LdbIterator, P: AsRef<Path>>(
    dbname: P,
    opt: &Options,
    mut from: I,
    num: FileNum,
) -> Result<FileMetaData> {
    from.reset();
    let filename = table_file_name(dbname.as_ref(), num);

    let (mut kbuf, mut vbuf) = (vec![], vec![]);
    let mut firstkey = None;
    // The write steps run inside a closure so that any failure can be followed by a single
    // cleanup of the partially written file below.
    let r = (|| -> Result<()> {
        let f = opt.env.open_writable_file(Path::new(&filename))?;
        let f = BufWriter::new(f);
        let mut builder = TableBuilder::new(opt.clone(), f);
        while from.advance() {
            assert!(from.current(&mut kbuf, &mut vbuf));
            if firstkey.is_none() {
                firstkey = Some(kbuf.clone());
            }
            builder.add(&kbuf, &vbuf)?;
        }
        builder.finish()?;
        Ok(())
    })();

    if let Err(e) = r {
        let _ = opt.env.delete(Path::new(&filename));
        return Err(e);
    }

    let mut md = FileMetaData::default();
    match firstkey {
        None => {
            // No entries were written; do not leave an empty table file behind.
            let _ = opt.env.delete(Path::new(&filename));
        }
        Some(key) => {
            md.num = num;
            md.size = opt.env.size_of(Path::new(&filename))?;
            md.smallest = key;
            // `kbuf` still holds the key of the last entry that was added.
            md.largest = kbuf;
        }
    }
    Ok(md)
}
1055
1056fn log_file_name(db: &Path, num: FileNum) -> PathBuf {
1057 db.join(format!("{:06}.log", num))
1058}
1059
/// Returns the path of the `LOCK` file inside the directory `db`.
fn lock_file_name(db: &Path) -> PathBuf {
    let mut lock_path = db.to_path_buf();
    lock_path.push("LOCK");
    lock_path
}
1063
1064fn open_info_log<E: Env + ?Sized, P: AsRef<Path>>(env: &E, db: P) -> Logger {
1067 let db = db.as_ref();
1068 let logfilename = db.join("LOG");
1069 let oldlogfilename = db.join("LOG.old");
1070 let _ = env.mkdir(Path::new(db));
1071 if let Ok(e) = env.exists(Path::new(&logfilename)) {
1072 if e {
1073 let _ = env.rename(Path::new(&logfilename), Path::new(&oldlogfilename));
1074 }
1075 }
1076 if let Ok(w) = env.open_writable_file(Path::new(&logfilename)) {
1077 Logger(w)
1078 } else {
1079 Logger(Box::new(io::sink()))
1080 }
1081}
1082
1083#[cfg(test)]
1084pub mod testutil {
1085 use super::*;
1086
1087 use crate::version::testutil::make_version;
1088
    /// Builds a database named "db" from the test version returned by `make_version()`.
    ///
    /// A manifest with number 10 describing all files of that version is written by hand and
    /// made current, then the database is opened with log and manifest reuse disabled.
    /// Returns the opened database together with the options used.
    pub fn build_db() -> (DB, Options) {
        let name = "db";
        let (v, mut opt) = make_version();
        opt.reuse_logs = false;
        opt.reuse_manifest = false;
        let mut ve = VersionEdit::new();
        ve.set_comparator_name(opt.cmp.id());
        ve.set_log_num(0);
        // NOTE(review): these two values presumably have to lie above the file numbers and
        // sequence numbers used by `make_version()` — confirm there.
        ve.set_next_file(11);
        ve.set_last_seq(30);

        // Register every file of the test version at its level.
        for l in 0..NUM_LEVELS {
            for f in &v.files[l] {
                ve.add_file(l, f.borrow().clone());
            }
        }

        let manifest = manifest_file_name(name, 10);
        let manifest_file = opt.env.open_writable_file(Path::new(&manifest)).unwrap();
        let mut lw = LogWriter::new(manifest_file);
        lw.add_record(&ve.encode()).unwrap();
        lw.flush().unwrap();
        set_current_file(&opt.env, name, 10).unwrap();

        (DB::open(name, opt.clone()).unwrap(), opt)
    }
1118
1119 pub fn set_file_to_compact(db: &mut DB, num: FileNum) {
1121 let v = db.current();
1122 let mut v = v.borrow_mut();
1123
1124 let mut ftc = None;
1125 for l in 0..NUM_LEVELS {
1126 for f in &v.files[l] {
1127 if f.borrow().num == num {
1128 ftc = Some((f.clone(), l));
1129 }
1130 }
1131 }
1132 if let Some((f, l)) = ftc {
1133 v.file_to_compact = Some(f);
1134 v.file_to_compact_lvl = l;
1135 } else {
1136 panic!("file number not found");
1137 }
1138 }
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143 use super::testutil::{build_db, set_file_to_compact};
1144 use super::*;
1145
1146 use crate::error::Status;
1147 use crate::key_types::LookupKey;
1148 use crate::mem_env::MemEnv;
1149 use crate::options;
1150 use crate::test_util::LdbIteratorIter;
1151 use crate::version::testutil::make_version;
1152
    /// Checks that `open_info_log()` creates `LOG`, and that opening it a second time moves
    /// the previous contents to `LOG.old` and starts with an empty `LOG`.
    #[test]
    fn test_db_impl_open_info_log() {
        let e = MemEnv::new();
        {
            // First open: the log file is created and receives one line.
            let l = Some(share(open_info_log(&e, "abc")));
            assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
            log!(l, "hello {}", "world");
            // "hello world" plus the trailing newline.
            assert_eq!(12, e.size_of(&Path::new("abc").join("LOG")).unwrap());
        }
        {
            // Second open: the old log has been renamed, the new one is empty.
            let l = Some(share(open_info_log(&e, "abc")));
            assert!(e.exists(&Path::new("abc").join("LOG.old")).unwrap());
            assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
            assert_eq!(12, e.size_of(&Path::new("abc").join("LOG.old")).unwrap());
            assert_eq!(0, e.size_of(&Path::new("abc").join("LOG")).unwrap());
            log!(l, "something else");
            log!(l, "and another {}", 1);

            let mut s = String::new();
            let mut r = e
                .open_sequential_file(&Path::new("abc").join("LOG"))
                .unwrap();
            r.read_to_string(&mut s).unwrap();
            assert_eq!("something else\nand another 1\n", &s);
        }
    }
1179
1180 fn build_memtable() -> MemTable {
1181 let mut mt = MemTable::new(options::for_test().cmp);
1182 let mut i = 1;
1183 for k in ["abc", "def", "ghi", "jkl", "mno", "aabc", "test123"].iter() {
1184 mt.add(
1185 i,
1186 ValueType::TypeValue,
1187 k.as_bytes(),
1188 "looooongval".as_bytes(),
1189 );
1190 i += 1;
1191 }
1192 mt
1193 }
1194
    /// Opens databases four times with different `reuse_manifest` / `reuse_logs` settings and
    /// checks which manifest, log and table files exist afterwards.
    #[test]
    fn test_db_impl_init() {
        let opt = options::for_test();
        let env = opt.env.clone();

        {
            // New database without manifest reuse: MANIFEST-000001 is replaced by
            // MANIFEST-000002, and log 000003 is created.
            let mut opt = opt.clone();
            opt.reuse_manifest = false;
            let _ = DB::open("otherdb", opt.clone()).unwrap();

            eprintln!(
                "children after: {:?}",
                env.children(Path::new("otherdb")).unwrap()
            );
            assert!(env.exists(&Path::new("otherdb").join("CURRENT")).unwrap());
            assert!(!env
                .exists(&Path::new("otherdb").join("MANIFEST-000001"))
                .unwrap());
            assert!(env
                .exists(&Path::new("otherdb").join("MANIFEST-000002"))
                .unwrap());
            assert!(env
                .exists(&Path::new("otherdb").join("000003.log"))
                .unwrap());
        }

        {
            // New database with manifest reuse: MANIFEST-000001 stays in place.
            let mut opt = opt.clone();
            opt.reuse_manifest = true;
            let mut db = DB::open("db", opt.clone()).unwrap();

            eprintln!(
                "children after: {:?}",
                env.children(&Path::new("db").join("")).unwrap()
            );
            assert!(env.exists(&Path::new("db").join("CURRENT")).unwrap());
            assert!(env
                .exists(&Path::new("db").join("MANIFEST-000001"))
                .unwrap());
            assert!(env.exists(&Path::new("db").join("LOCK")).unwrap());
            assert!(env.exists(&Path::new("db").join("000003.log")).unwrap());

            db.put("abc".as_bytes(), "def".as_bytes()).unwrap();
            db.put("abd".as_bytes(), "def".as_bytes()).unwrap();
        }

        {
            // Reopen without any reuse: the old log is turned into table 000003.ldb, and a
            // new manifest and a new log are created.
            eprintln!(
                "children before: {:?}",
                env.children(&Path::new("db").join("")).unwrap()
            );
            let mut opt = opt.clone();
            opt.reuse_manifest = false;
            opt.reuse_logs = false;
            let mut db = DB::open("db", opt.clone()).unwrap();

            eprintln!(
                "children after: {:?}",
                env.children(&Path::new("db").join("")).unwrap()
            );
            assert!(!env
                .exists(&Path::new("db").join("MANIFEST-000001"))
                .unwrap());
            assert!(env
                .exists(&Path::new("db").join("MANIFEST-000002"))
                .unwrap());
            assert!(!env.exists(&Path::new("db").join("000003.log")).unwrap());
            assert!(env.exists(&Path::new("db").join("000003.ldb")).unwrap());
            assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
            let current = db.current();
            log!(opt.log, "files: {:?}", current.borrow().files);
            // The value written before the reopen is served from the table file.
            assert_eq!(
                "def".as_bytes(),
                current
                    .borrow_mut()
                    .get(LookupKey::new("abc".as_bytes(), 1).internal_key())
                    .unwrap()
                    .unwrap()
                    .0
                    .as_slice()
            );
            db.put("abe".as_bytes(), "def".as_bytes()).unwrap();
        }

        {
            // Reopen with log reuse: log 000004 is kept, no new manifest or log appears, and
            // the entry from the reused log is back in the memtable.
            eprintln!(
                "children before: {:?}",
                env.children(Path::new("db")).unwrap()
            );
            let mut opt = opt.clone();
            opt.reuse_logs = true;
            let db = DB::open("db", opt).unwrap();

            eprintln!(
                "children after: {:?}",
                env.children(Path::new("db")).unwrap()
            );
            assert!(!env
                .exists(&Path::new("db").join("MANIFEST-000001"))
                .unwrap());
            assert!(env
                .exists(&Path::new("db").join("MANIFEST-000002"))
                .unwrap());
            assert!(!env
                .exists(&Path::new("db").join("MANIFEST-000005"))
                .unwrap());
            assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
            assert!(!env.exists(&Path::new("db").join("000006.log")).unwrap());
            assert_eq!(1, db.mem.len());
            assert_eq!(
                "def".as_bytes(),
                db.mem
                    .get(&LookupKey::new("abe".as_bytes(), 3))
                    .0
                    .unwrap()
                    .as_slice()
            );
        }
    }
1329
/// Compacts the key range `aaa`..`dba` of the `build_db()` fixture, then checks the sizes of
/// the table files that remain, that the superseded tables are gone, and that values can
/// still be read.
#[test]
fn test_db_impl_compact_range() {
    let (mut db, opt) = build_db();
    let env = &opt.env;
    let dir = Path::new("db");

    eprintln!("children before: {:?}", env.children(&dir.join("")).unwrap());
    db.compact_range(b"aaa", b"dba").unwrap();
    eprintln!("children after: {:?}", env.children(&dir.join("")).unwrap());

    // Expected byte sizes of the table files present after the compaction.
    let expected_sizes = [
        (250, "000007.ldb"),
        (200, "000008.ldb"),
        (200, "000009.ldb"),
        (435, "000015.ldb"),
    ];
    for &(size, file) in expected_sizes.iter() {
        assert_eq!(size, env.size_of(&dir.join(file)).unwrap());
    }

    // Table files that must no longer exist.
    let removed = [
        "000001.ldb",
        "000002.ldb",
        "000004.ldb",
        "000005.ldb",
        "000006.ldb",
        "000013.ldb",
        "000014.ldb",
    ];
    for &file in removed.iter() {
        assert!(!env.exists(&dir.join(file)).unwrap());
    }

    // The data itself must be unaffected by the compaction.
    let expected_values = [
        ("aaa", "val1"),
        ("cab", "val2"),
        ("aba", "val3"),
        ("fab", "val3"),
    ];
    for &(key, value) in expected_values.iter() {
        assert_eq!(value.as_bytes().to_vec(), db.get(key.as_bytes()).unwrap());
    }
}
1383
/// Like `test_db_impl_compact_range`, but with one extra entry (`xxx`) written before the
/// range compaction; that entry must also be readable afterwards.
#[test]
fn test_db_impl_compact_range_memtable() {
    let (mut db, opt) = build_db();
    let env = &opt.env;
    let dir = Path::new("db");

    db.put(b"xxx", b"123").unwrap();

    eprintln!("children before: {:?}", env.children(dir).unwrap());
    db.compact_range(b"aaa", b"dba").unwrap();
    eprintln!("children after: {:?}", env.children(dir).unwrap());

    // Expected byte sizes of the table files present after the compaction.
    let expected_sizes = [
        (250, "000007.ldb"),
        (200, "000008.ldb"),
        (200, "000009.ldb"),
        (182, "000014.ldb"),
        (435, "000017.ldb"),
    ];
    for &(size, file) in expected_sizes.iter() {
        assert_eq!(size, env.size_of(&dir.join(file)).unwrap());
    }

    // Table files that must no longer exist.
    let removed = [
        "000001.ldb",
        "000002.ldb",
        "000003.ldb",
        "000004.ldb",
        "000005.ldb",
        "000006.ldb",
        "000015.ldb",
        "000016.ldb",
    ];
    for &file in removed.iter() {
        assert!(!env.exists(&dir.join(file)).unwrap());
    }

    // Fixture values plus the entry written at the start of this test.
    let expected_values = [
        ("aaa", "val1"),
        ("cab", "val2"),
        ("aba", "val3"),
        ("fab", "val3"),
        ("xxx", "123"),
    ];
    for &(key, value) in expected_values.iter() {
        assert_eq!(value.as_bytes().to_vec(), db.get(key.as_bytes()).unwrap());
    }
}
1447
/// Opening a database a second time while a first handle is still alive must fail with a
/// `LockError`.
#[test]
fn test_db_impl_locking() {
    let opt = options::for_test();
    // Deliberately unused: the handle only has to stay alive until the end of the test so
    // that the second `open` below runs while the first instance still exists. The
    // underscore-prefixed name keeps the binding (unlike a bare `_`) and documents the intent
    // without a blanket lint suppression.
    let _db = DB::open("db", opt.clone()).unwrap();
    let want_err = Status::new(
        StatusCode::LockError,
        "database lock is held by another instance",
    );
    assert_eq!(want_err, DB::open("db", opt.clone()).err().unwrap());
}
1459
/// Writes the fixture memtable out as table #123, verifies the returned file metadata, and
/// checks how many entries can be iterated before and after one byte of the file is changed.
#[test]
fn test_db_impl_build_table() {
    let mut opt = options::for_test();
    opt.block_size = 128;
    let mt = build_memtable();

    let meta = build_table("db", &opt, mt.iter(), 123).unwrap();
    let table_path = Path::new("db").join("000123.ldb");

    let want_smallest = LookupKey::new("aabc".as_bytes(), 6);
    let want_largest = LookupKey::new("test123".as_bytes(), 7);
    assert_eq!(want_smallest.internal_key(), meta.smallest.as_slice());
    assert_eq!(want_largest.internal_key(), meta.largest.as_slice());
    assert_eq!(379, meta.size);
    assert_eq!(123, meta.num);
    assert!(opt.env.exists(&table_path).unwrap());

    // Intact file: the table yields exactly as many entries as the memtable holds.
    {
        let mut cache = TableCache::new("db", opt.clone(), 100);
        let table = cache.get_table(123).unwrap();
        let mut entries = table.iter();
        assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut entries).count());
    }

    // Damaged file: bump the byte at offset 150, write the file back, and re-read it through
    // a fresh cache. Only 5 entries are expected to be yielded then.
    {
        let mut contents = vec![];
        opt.env
            .open_sequential_file(&table_path)
            .unwrap()
            .read_to_end(&mut contents)
            .unwrap();
        contents[150] += 1;
        opt.env
            .open_writable_file(&table_path)
            .unwrap()
            .write_all(&contents)
            .unwrap();

        let mut cache = TableCache::new("db", opt.clone(), 100);
        let table = cache.get_table(123).unwrap();
        let readable = LdbIteratorIter::wrap(&mut table.iter())
            .inspect(|entry| eprintln!("{:?}", entry))
            .count();
        assert_eq!(5, readable);
    }
}
1514
/// Sanity check for the `build_db()` fixture: the log file with number 12 must exist in the
/// database's environment.
#[test]
fn test_db_impl_build_db_sanity() {
    let db = build_db().0;
    let env = &db.opt.env;
    let name = &db.name;

    assert!(env.exists(Path::new(&log_file_name(name, 12))).unwrap());
}
1524
/// Checks reads at explicit sequence numbers and through snapshots, including after the
/// written entries have been flushed.
#[test]
fn test_db_impl_get_from_table_with_snapshot() {
    let mut db = build_db().0;

    // The fixture leaves the version set at sequence number 30.
    assert_eq!(30, db.vset.borrow().last_seq);

    // Take a snapshot between the two writes, then flush.
    db.put(b"xyy", b"123").unwrap();
    let old_ss = db.get_snapshot();
    db.put(b"xyz", b"123").unwrap();
    db.flush().unwrap();

    // The snapshot sees the first write but not the second one.
    assert!(db.get_at(&old_ss, b"xyy").unwrap().is_some());
    assert!(db.get_at(&old_ss, b"xyz").unwrap().is_none());

    // Without a snapshot the second write is visible.
    assert_eq!(b"123".to_vec(), db.get(b"xyz").unwrap());

    // `xyy` is visible from sequence number 31 on, `xyz` only from 32 on.
    assert!(db.get_internal(31, b"xyy").unwrap().is_some());
    assert!(db.get_internal(32, b"xyy").unwrap().is_some());

    assert!(db.get_internal(31, b"xyz").unwrap().is_none());
    assert!(db.get_internal(32, b"xyz").unwrap().is_some());

    // A fixture key: readable now, but not yet visible at sequence number 3.
    assert_eq!(b"val2".to_vec(), db.get(b"eab").unwrap());
    assert!(db.get_internal(3, b"eab").unwrap().is_none());
    assert!(db.get_internal(32, b"eab").unwrap().is_some());

    // A fresh snapshot returns the same value; it is scoped so that it is dropped before the
    // final read.
    {
        let ss = db.get_snapshot();
        assert_eq!(Some(b"val2".to_vec()), db.get_at(&ss, b"eab").unwrap());
    }

    assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
}
1576
/// Deleted keys must no longer be readable, whether they were written in this test (`xyy`)
/// or come from the fixture (`gaa`); other keys stay untouched.
#[test]
fn test_db_impl_delete() {
    let mut db = build_db().0;

    db.put(b"xyy", b"123").unwrap();
    db.put(b"xyz", b"123").unwrap();

    let doomed = [b"xyy", b"gaa"];

    // Both keys are present before the deletion ...
    for &key in doomed.iter() {
        assert!(db.get(&key[..]).is_some());
    }

    for &key in doomed.iter() {
        db.delete(&key[..]).unwrap();
    }

    // ... and absent afterwards, while the key that was not deleted remains.
    for &key in doomed.iter() {
        assert!(db.get(&key[..]).is_none());
    }
    assert!(db.get(b"xyz").is_some());
}
1595
/// Marks file 4 for compaction via `set_file_to_compact` and checks that afterwards tables
/// 3, 4 and 5 are gone and table 13 exists.
#[test]
fn test_db_impl_compact_single_file() {
    let mut db = build_db().0;
    set_file_to_compact(&mut db, 4);
    db.maybe_do_compaction().unwrap();

    let env = &db.opt.env;
    let name = &db.name;

    // Tables that must have been removed by the compaction.
    for &num in [3, 4, 5].iter() {
        assert!(!env.exists(Path::new(&table_file_name(name, num))).unwrap());
    }
    // Table that must exist after the compaction.
    assert!(env.exists(Path::new(&table_file_name(name, 13))).unwrap());
}
1609
/// Flushes a small memtable to a table file, marks the first level-2 file for compaction and
/// expects level 3 to contain exactly one file afterwards.
#[test]
fn test_db_impl_compaction_trivial_move() {
    let mut db = DB::open("db", options::for_test()).unwrap();

    for key in ["abc", "ab3", "ab0", "abz"].iter() {
        db.put(key.as_bytes(), b"xyz").unwrap();
    }
    assert_eq!(4, db.mem.len());

    // Move the filled memtable into the immutable slot, leaving an empty one in its place,
    // and compact it.
    let fresh = MemTable::new(db.opt.cmp.clone());
    db.imm = Some(mem::replace(&mut db.mem, fresh));
    db.compact_memtable().unwrap();

    eprintln!(
        "children after: {:?}",
        db.opt.env.children(Path::new("db")).unwrap()
    );
    let table_path = Path::new("db").join("000004.ldb");
    assert!(db.opt.env.exists(&table_path).unwrap());

    // Mark the first file on level 2 as the one to compact. The handle to the current
    // version is scoped so that it is released before the compaction runs.
    {
        let current = db.current();
        let mut version = current.borrow_mut();
        let target = version.files[2][0].clone();
        version.file_to_compact = Some(target);
        version.file_to_compact_lvl = 2;
    }

    db.maybe_do_compaction().unwrap();

    assert_eq!(1, db.current().borrow().files[3].len());
}
1649
/// Installs the fixture memtable in a fresh `DB` and calls `make_room_for_write(true)`;
/// afterwards the memtable must be empty and its 7 entries must be found in table #3.
#[test]
fn test_db_impl_memtable_compaction() {
    let mut opt = options::for_test();
    opt.write_buffer_size = 25;
    let mut db = DB::new("db", opt);

    db.mem = build_memtable();
    db.make_room_for_write(true).unwrap();
    assert_eq!(0, db.mem.len());

    let dir = Path::new("db");
    let log_path = dir.join("000002.log");
    let table_path = dir.join("000003.ldb");

    // A new log file and the table holding the old memtable contents must exist.
    assert!(db.opt.env.exists(&log_path).unwrap());
    assert!(db.opt.env.exists(&table_path).unwrap());
    assert_eq!(351, db.opt.env.size_of(&table_path).unwrap());

    // Every entry of the fixture memtable can be read back from the table.
    assert_eq!(
        7,
        LdbIteratorIter::wrap(&mut db.cache.borrow_mut().get_table(3).unwrap().iter()).count()
    );
}
1684
/// Sets a compaction score of 2.0 for level 1 on the current version, runs a compaction and
/// checks the resulting files and per-level file counts.
#[test]
fn test_db_impl_compaction() {
    let mut db = build_db().0;

    // The handle to the pre-compaction version is intentionally kept until the end of the
    // test; only the mutable borrow is scoped.
    let before = db.current();
    {
        let mut version = before.borrow_mut();
        version.compaction_score = Some(2.0);
        version.compaction_level = Some(1);
    }

    db.maybe_do_compaction().unwrap();

    let dir = Path::new("db");
    let old_table = dir.join("000003.ldb");
    let new_table = dir.join("000013.ldb");
    assert!(!db.opt.env.exists(&old_table).unwrap());
    assert!(db.opt.env.exists(&new_table).unwrap());
    assert_eq!(345, db.opt.env.size_of(&new_table).unwrap());

    // Level 1 has been emptied; level 2 now holds two files.
    let after = db.current();
    assert_eq!(0, after.borrow().files[1].len());
    assert_eq!(2, after.borrow().files[2].len());
}
1717
/// Marks the first level-2 file of the `make_version()` fixture for compaction and checks
/// that table 6 is kept with its size, that no table with the next file number (10) is
/// created, and that levels 2 and 3 end up with 1 and 3 files respectively.
#[test]
fn test_db_impl_compaction_trivial() {
    let (mut version, opt) = make_version();

    let target = version.files[2][0].clone();
    version.file_to_compact = Some(target);
    version.file_to_compact_lvl = 2;

    let mut db = DB::new("db", opt.clone());
    {
        let mut vset = db.vset.borrow_mut();
        vset.add_version(version);
        vset.next_file_num = 10;
    }

    db.maybe_do_compaction().unwrap();

    let dir = Path::new("db");
    let kept_table = dir.join("000006.ldb");
    assert!(opt.env.exists(&kept_table).unwrap());
    assert!(!opt.env.exists(&dir.join("000010.ldb")).unwrap());
    assert_eq!(218, opt.env.size_of(&kept_table).unwrap());

    let current = db.current();
    assert_eq!(1, current.borrow().files[2].len());
    assert_eq!(3, current.borrow().files[3].len());
}
1745
/// `CompactionState::cleanup` must remove the file belonging to an entry in `outputs`.
#[test]
fn test_db_impl_compaction_state_cleanup() {
    let env: Box<dyn Env> = Box::new(MemEnv::new());
    let name = "db";
    let table_path = Path::new("db").join("000001.ldb");

    // Create a file at the location of table #1 in the in-memory environment.
    let stuff = "abcdefghijkl".as_bytes();
    env.open_writable_file(&table_path)
        .unwrap()
        .write_all(stuff)
        .unwrap();

    // Register file number 1 as the only output of the compaction state.
    let mut output = FileMetaData::default();
    output.num = 1;

    let compaction = Compaction::new(&options::for_test(), 2, None);
    let mut state = CompactionState::new(compaction, 12);
    state.outputs = vec![output];
    state.cleanup(&env, name);

    assert!(!env.exists(&table_path).unwrap());
}
1765
/// Writes and deletes entries across several open/close cycles of the same database and
/// checks that each reopened instance, and snapshots taken from it, see the expected state.
#[test]
fn test_db_impl_open_close_reopen() {
    // First session: write five entries and delete one of them. The database handle is
    // dropped at the end of the block; only its options are carried over.
    let opt = {
        let mut db = build_db().0;
        let opt = db.opt.clone();
        let entries = [
            (b"xx1", b"111"),
            (b"xx2", b"112"),
            (b"xx3", b"113"),
            (b"xx4", b"114"),
            (b"xx5", b"115"),
        ];
        for &(key, value) in entries.iter() {
            db.put(&key[..], &value[..]).unwrap();
        }
        db.delete(b"xx2").unwrap();
        opt
    };

    // Second session: delete another entry.
    {
        let mut db = DB::open("db", opt.clone()).unwrap();
        db.delete(b"xx5").unwrap();
    }

    // Third session: both deletions are visible; snapshots taken before and after an
    // overwrite of `xx4` return the old and the new value respectively.
    {
        let mut db = DB::open("db", opt.clone()).unwrap();

        assert_eq!(None, db.get(b"xx5"));

        let before_overwrite = db.get_snapshot();
        db.put(b"xx4", b"222").unwrap();
        let after_overwrite = db.get_snapshot();

        assert_eq!(
            Some(b"113".to_vec()),
            db.get_at(&before_overwrite, b"xx3").unwrap()
        );
        assert_eq!(None, db.get_at(&before_overwrite, b"xx2").unwrap());
        assert_eq!(None, db.get_at(&before_overwrite, b"xx5").unwrap());

        assert_eq!(
            Some(b"114".to_vec()),
            db.get_at(&before_overwrite, b"xx4").unwrap()
        );
        assert_eq!(
            Some(b"222".to_vec()),
            db.get_at(&after_overwrite, b"xx4").unwrap()
        );
    }

    // Fourth session: the overwrite from the previous session is still there.
    {
        let mut db = DB::open("db", opt).unwrap();

        let snapshot = db.get_snapshot();
        assert_eq!(Some(b"113".to_vec()), db.get_at(&snapshot, b"xx3").unwrap());
        assert_eq!(Some(b"222".to_vec()), db.get_at(&snapshot, b"xx4").unwrap());
        assert_eq!(None, db.get_at(&snapshot, b"xx2").unwrap());
    }
}
1811
/// Writes 100000 single-entry batches, each storing the key `"<count>-<index>"` as both key
/// and value. There are no assertions; the test passes if every write succeeds.
// NOTE(review): judging by the name, the volume is presumably chosen so that the memtable is
// replaced several times along the way; confirm against `DB::write`.
#[test]
fn test_drop_memtable() {
    let mut db = DB::open("db", options::for_test()).unwrap();
    let entries_per_batch: usize = 1;
    let max_num = 100000;
    // Scratch buffer for the formatted key, reused across all entries.
    let mut key: Vec<u8> = Vec::with_capacity(8);

    for cnt in (0..max_num).step_by(entries_per_batch) {
        let mut batch = WriteBatch::new();
        for i in 0..entries_per_batch {
            key.clear();
            key.extend_from_slice(format!("{}-{}", cnt, i).as_bytes());
            batch.put(key.as_slice(), key.as_slice());
        }
        db.write(batch, false).unwrap();
    }
}
1830}