1#![allow(unused_attributes)]
5
6use crate::cache::Cache;
7use crate::db_iter::DBIterator;
8
9use crate::cmp::{Cmp, InternalKeyCmp};
10use crate::env::{Env, FileLock};
11use crate::error::{err, Result, StatusCode};
12use crate::filter::{BoxedFilterPolicy, InternalFilterPolicy};
13use crate::infolog::Logger;
14use crate::key_types::{parse_internal_key, InternalKey, LookupKey, ValueType};
15use crate::log::{LogReader, LogWriter};
16use crate::memtable::MemTable;
17use crate::merging_iter::MergingIter;
18use crate::options::Options;
19use crate::snapshot::{Snapshot, SnapshotList};
20use crate::table_builder::TableBuilder;
21use crate::table_cache::{table_file_name, TableCache};
22use crate::types::{
23 parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, SequenceNumber, Shared,
24 MAX_SEQUENCE_NUMBER, NUM_LEVELS,
25};
26use crate::version::Version;
27use crate::version_edit::VersionEdit;
28use crate::version_set::{
29 manifest_file_name, read_current_file, set_current_file, Compaction, VersionSet,
30};
31use crate::write_batch::WriteBatch;
32
33use bytes::Bytes;
34use std::cmp::Ordering;
35use std::io::{self, BufWriter, Write};
36use std::mem;
37use std::ops::Drop;
38use std::path::Path;
39use std::path::PathBuf;
40use std::rc::Rc;
41
42pub struct DB {
45 name: PathBuf,
46 path: PathBuf,
47 lock: Option<FileLock>,
48
49 internal_cmp: Rc<Box<dyn Cmp>>,
50 fpol: InternalFilterPolicy<BoxedFilterPolicy>,
51 opt: Options,
52
53 mem: MemTable,
54 imm: Option<MemTable>,
55
56 log: Option<LogWriter<BufWriter<Box<dyn Write>>>>,
57 log_num: Option<FileNum>,
58 cache: Shared<TableCache>,
59 vset: Shared<VersionSet>,
60 snaps: SnapshotList,
61
62 cstats: [CompactionStats; NUM_LEVELS],
63}
64
65unsafe impl Send for DB {}
66
67impl DB {
68 fn new<P: AsRef<Path>>(name: P, mut opt: Options) -> DB {
72 let name = name.as_ref();
73 if opt.log.is_none() {
74 let log = open_info_log(opt.env.as_ref().as_ref(), name);
75 opt.log = Some(share(log));
76 }
77 let path = name.canonicalize().unwrap_or(name.to_owned());
78
79 let cache = share(TableCache::new(
80 name,
81 opt.clone(),
82 share(Cache::new(opt.block_cache_capacity_bytes / opt.block_size)),
83 opt.max_open_files - 10,
84 ));
85 let vset = VersionSet::new(name, opt.clone(), cache.clone());
86
87 DB {
88 name: name.to_owned(),
89 path,
90 lock: None,
91 internal_cmp: Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))),
92 fpol: InternalFilterPolicy::new(opt.filter_policy.clone()),
93
94 mem: MemTable::new(opt.cmp.clone()),
95 imm: None,
96
97 opt,
98
99 log: None,
100 log_num: None,
101 cache,
102 vset: share(vset),
103 snaps: SnapshotList::new(),
104
105 cstats: Default::default(),
106 }
107 }
108
109 fn current(&self) -> Shared<Version> {
110 self.vset.borrow().current()
111 }
112
113 pub fn open<P: AsRef<Path>>(name: P, opt: Options) -> Result<DB> {
119 let name = name.as_ref();
120 let mut db = DB::new(name, opt);
121 let mut ve = VersionEdit::new();
122 let save_manifest = db.recover(&mut ve)?;
123
124 if db.log.is_none() {
126 let lognum = db.vset.borrow_mut().new_file_number();
127 let logfile = db
128 .opt
129 .env
130 .open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
131 ve.set_log_num(lognum);
132 db.log = Some(LogWriter::new(BufWriter::new(logfile)));
133 db.log_num = Some(lognum);
134 }
135
136 if save_manifest {
137 ve.set_log_num(db.log_num.unwrap_or(0));
138 db.vset.borrow_mut().log_and_apply(ve)?;
139 }
140
141 db.delete_obsolete_files()?;
142 db.maybe_do_compaction()?;
143 Ok(db)
144 }
145
146 fn initialize_db(&mut self) -> Result<()> {
148 let mut ve = VersionEdit::new();
149 ve.set_comparator_name(self.opt.cmp.id());
150 ve.set_log_num(0);
151 ve.set_next_file(2);
152 ve.set_last_seq(0);
153
154 {
155 let manifest = manifest_file_name(&self.path, 1);
156 let manifest_file = self.opt.env.open_writable_file(Path::new(&manifest))?;
157 let mut lw = LogWriter::new(manifest_file);
158 lw.add_record(&ve.encode())?;
159 lw.flush()?;
160 }
161 set_current_file(self.opt.env.as_ref().as_ref(), &self.path, 1)
162 }
163
164 fn recover(&mut self, ve: &mut VersionEdit) -> Result<bool> {
167 if self.opt.error_if_exists && self.opt.env.exists(self.path.as_ref()).unwrap_or(false) {
168 return err(StatusCode::AlreadyExists, "database already exists");
169 }
170
171 let _ = self.opt.env.mkdir(Path::new(&self.path));
172 self.acquire_lock()?;
173
174 if let Err(e) = read_current_file(self.opt.env.as_ref().as_ref(), &self.path) {
175 if e.code == StatusCode::NotFound && self.opt.create_if_missing {
176 self.initialize_db()?;
177 } else {
178 return err(
179 StatusCode::InvalidArgument,
180 "database does not exist and create_if_missing is false",
181 );
182 }
183 }
184
185 let mut save_manifest = self.vset.borrow_mut().recover()?;
188
189 let mut max_seq = 0;
191 let filenames = self.opt.env.children(&self.path)?;
192 let mut expected = self.vset.borrow().live_files();
193 let mut log_files = vec![];
194
195 for file in &filenames {
196 if let Ok((num, typ)) = parse_file_name(file) {
197 expected.remove(&num);
198 if typ == FileType::Log
199 && (num >= self.vset.borrow().log_num || num == self.vset.borrow().prev_log_num)
200 {
201 log_files.push(num);
202 }
203 }
204 }
205 if !expected.is_empty() {
206 log!(self.opt.log, "Missing at least these files: {:?}", expected);
207 return err(StatusCode::Corruption, "missing live files (see log)");
208 }
209
210 log_files.sort();
211 for i in 0..log_files.len() {
212 let (save_manifest_, max_seq_) =
213 self.recover_log_file(log_files[i], i == log_files.len() - 1, ve)?;
214 if save_manifest_ {
215 save_manifest = true;
216 }
217 if max_seq_ > max_seq {
218 max_seq = max_seq_;
219 }
220 self.vset.borrow_mut().mark_file_number_used(log_files[i]);
221 }
222
223 if self.vset.borrow().last_seq < max_seq {
224 self.vset.borrow_mut().last_seq = max_seq;
225 }
226
227 Ok(save_manifest)
228 }
229
230 fn recover_log_file(
234 &mut self,
235 log_num: FileNum,
236 is_last: bool,
237 ve: &mut VersionEdit,
238 ) -> Result<(bool, SequenceNumber)> {
239 let filename = log_file_name(&self.path, log_num);
240 let logfile = self.opt.env.open_sequential_file(Path::new(&filename))?;
241 let cmp: Rc<Box<dyn Cmp>> = self.opt.cmp.clone();
243
244 let mut logreader = LogReader::new(
245 logfile, true,
247 );
248 log!(self.opt.log, "Recovering log file {:?}", filename);
249 let mut scratch = vec![];
250 let mut mem = MemTable::new(cmp.clone());
251 let mut batch = WriteBatch::new();
252
253 let mut compactions = 0;
254 let mut max_seq = 0;
255 let mut save_manifest = false;
256
257 while let Ok(len) = logreader.read(&mut scratch) {
258 if len == 0 {
259 break;
260 }
261 if len < 12 {
262 log!(
263 self.opt.log,
264 "corruption in log file {:06}: record shorter than 12B",
265 log_num
266 );
267 continue;
268 }
269
270 batch.set_contents(&scratch);
271 batch.insert_into_memtable(batch.sequence(), &mut mem);
272
273 let last_seq = batch.sequence() + batch.count() as u64 - 1;
274 if last_seq > max_seq {
275 max_seq = last_seq
276 }
277 if mem.approx_mem_usage() > self.opt.write_buffer_size {
278 compactions += 1;
279 self.write_l0_table(&mem, ve, None)?;
280 save_manifest = true;
281 mem = MemTable::new(cmp.clone());
282 }
283 batch.clear();
284 }
285
286 if self.opt.reuse_logs && is_last && compactions == 0 {
288 assert!(self.log.is_none());
289 log!(self.opt.log, "reusing log file {:?}", filename);
290 let oldsize = self.opt.env.size_of(Path::new(&filename))?;
291 let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
292 let lw = LogWriter::new_with_off(BufWriter::new(oldfile), oldsize);
293 self.log = Some(lw);
294 self.log_num = Some(log_num);
295 self.mem = mem;
296 } else if mem.len() > 0 {
297 save_manifest = true;
299 self.write_l0_table(&mem, ve, None)?;
300 }
301
302 Ok((save_manifest, max_seq))
303 }
304
305 fn delete_obsolete_files(&mut self) -> Result<()> {
307 let files = self.vset.borrow().live_files();
308 let filenames = self.opt.env.children(Path::new(&self.path))?;
309 for name in filenames {
310 if let Ok((num, typ)) = parse_file_name(&name) {
311 match typ {
312 FileType::Log => {
313 if num >= self.vset.borrow().log_num {
314 continue;
315 }
316 }
317 FileType::Descriptor => {
318 if num >= self.vset.borrow().manifest_num {
319 continue;
320 }
321 }
322 FileType::Table => {
323 if files.contains(&num) {
324 continue;
325 }
326 }
327 FileType::Temp => {
330 if files.contains(&num) {
331 continue;
332 }
333 }
334 FileType::Current | FileType::DBLock | FileType::InfoLog => continue,
335 }
336
337 if typ == FileType::Table {
339 let _ = self.cache.borrow_mut().evict(num);
340 }
341 log!(self.opt.log, "Deleting file type={:?} num={}", typ, num);
342 if let Err(e) = self.opt.env.delete(&self.path.join(&name)) {
343 log!(self.opt.log, "Deleting file num={} failed: {}", num, e);
344 }
345 }
346 }
347 Ok(())
348 }
349
350 fn acquire_lock(&mut self) -> Result<()> {
352 let lock_r = self.opt.env.lock(Path::new(&lock_file_name(&self.path)));
353 match lock_r {
354 Ok(lockfile) => {
355 self.lock = Some(lockfile);
356 Ok(())
357 }
358 Err(ref e) if e.code == StatusCode::LockError => err(
359 StatusCode::LockError,
360 "database lock is held by another instance",
361 ),
362 Err(e) => Err(e),
363 }
364 }
365
366 fn release_lock(&mut self) -> Result<()> {
368 if let Some(l) = self.lock.take() {
369 self.opt.env.unlock(l)
370 } else {
371 Ok(())
372 }
373 }
374
375 pub fn close(&mut self) -> Result<()> {
377 self.flush()?;
378 self.release_lock()?;
379 Ok(())
380 }
381}
382
383impl DB {
384 pub fn put(&mut self, k: &[u8], v: &[u8]) -> Result<()> {
389 let mut wb = WriteBatch::new();
390 wb.put(k, v);
391 self.write(wb, false)
392 }
393
394 pub fn delete(&mut self, k: &[u8]) -> Result<()> {
397 let mut wb = WriteBatch::new();
398 wb.delete(k);
399 self.write(wb, false)
400 }
401
402 pub fn write(&mut self, batch: WriteBatch, sync: bool) -> Result<()> {
405 assert!(self.log.is_some());
406
407 self.make_room_for_write(false)?;
408
409 let entries = batch.count() as u64;
410 let log = self.log.as_mut().unwrap();
411 let next = self.vset.borrow().last_seq + 1;
412
413 batch.insert_into_memtable(next, &mut self.mem);
414 log.add_record(&batch.encode(next))?;
415 if sync {
416 log.flush()?;
417 }
418 self.vset.borrow_mut().last_seq += entries;
419 Ok(())
420 }
421
422 pub fn flush(&mut self) -> Result<()> {
424 if let Some(ref mut log) = self.log.as_mut() {
425 log.flush()?;
426 }
427 Ok(())
428 }
429}
430
431impl DB {
432 fn get_internal(&mut self, seq: SequenceNumber, key: &[u8]) -> Result<Option<Bytes>> {
435 let lkey = LookupKey::new(key, seq);
438
439 match self.mem.get(&lkey) {
440 (Some(v), _) => return Ok(Some(v)),
441 (None, true) => return Ok(None),
443 (None, false) => {}
445 }
446
447 if let Some(imm) = self.imm.as_ref() {
448 match imm.get(&lkey) {
449 (Some(v), _) => return Ok(Some(v)),
450 (None, true) => return Ok(None),
452 (None, false) => {}
454 }
455 }
456
457 let mut do_compaction = false;
458 let mut result = None;
459
460 {
462 let current = self.current();
463 let mut current = current.borrow_mut();
464 if let Ok(Some((v, st))) = current.get(lkey.internal_key()) {
465 if current.update_stats(st) {
466 do_compaction = true;
467 }
468 result = Some(v)
469 }
470 }
471
472 if do_compaction {
473 if let Err(e) = self.maybe_do_compaction() {
474 log!(self.opt.log, "error while doing compaction in get: {}", e);
475 }
476 }
477 Ok(result)
478 }
479
480 pub fn get_at(&mut self, snapshot: &Snapshot, key: &[u8]) -> Result<Option<Bytes>> {
483 self.get_internal(snapshot.sequence(), key)
484 }
485
486 pub fn get(&mut self, key: &[u8]) -> Option<Bytes> {
488 let seq = self.vset.borrow().last_seq;
489 self.get_internal(seq, key).unwrap_or_default()
490 }
491}
492
493impl DB {
494 pub fn new_iter(&mut self) -> Result<DBIterator> {
499 let snapshot = self.get_snapshot();
500 self.new_iter_at(snapshot)
501 }
502
503 pub fn new_iter_at(&mut self, ss: Snapshot) -> Result<DBIterator> {
505 Ok(DBIterator::new(
506 self.opt.cmp.clone(),
507 self.vset.clone(),
508 self.merge_iterators()?,
509 ss,
510 ))
511 }
512
513 fn merge_iterators(&mut self) -> Result<MergingIter> {
516 let mut iters: Vec<Box<dyn LdbIterator>> = vec![];
517 if self.mem.len() > 0 {
518 iters.push(Box::new(self.mem.iter()));
519 }
520 if let Some(ref imm) = self.imm {
521 if imm.len() > 0 {
522 iters.push(Box::new(imm.iter()));
523 }
524 }
525
526 let current = self.current();
528 let current = current.borrow();
529 iters.extend(current.new_iters()?);
530
531 Ok(MergingIter::new(self.internal_cmp.clone(), iters))
532 }
533}
534
535impl DB {
536 pub fn get_snapshot(&mut self) -> Snapshot {
541 self.snaps.new_snapshot(self.vset.borrow().last_seq)
542 }
543}
544
545impl DB {
546 fn add_stats(&mut self, level: usize, cs: CompactionStats) {
548 assert!(level < NUM_LEVELS);
549 self.cstats[level].add(cs);
550 }
551
552 fn record_read_sample(&mut self, k: InternalKey<'_>) {
554 let current = self.current();
555 if current.borrow_mut().record_read_sample(k) {
556 if let Err(e) = self.maybe_do_compaction() {
557 log!(self.opt.log, "record_read_sample: compaction failed: {}", e);
558 }
559 }
560 }
561}
562
563impl DB {
564 fn make_room_for_write(&mut self, force: bool) -> Result<()> {
569 if !force && self.mem.approx_mem_usage() < self.opt.write_buffer_size || self.mem.len() == 0
570 {
571 Ok(())
572 } else {
573 let logn = self.vset.borrow_mut().new_file_number();
575 let logf = self
576 .opt
577 .env
578 .open_writable_file(Path::new(&log_file_name(&self.path, logn)));
579
580 if let Ok(log) = logf {
581 self.log = Some(LogWriter::new(BufWriter::new(log)));
582 self.log_num = Some(logn);
583
584 let mut imm = MemTable::new(self.opt.cmp.clone());
585 mem::swap(&mut imm, &mut self.mem);
586 self.imm = Some(imm);
587 self.maybe_do_compaction()
588 } else {
589 self.vset.borrow_mut().reuse_file_number(logn);
590 Err(logf.err().unwrap())
591 }
592 }
593 }
594
595 fn maybe_do_compaction(&mut self) -> Result<()> {
597 if self.imm.is_some() {
598 self.compact_memtable()?;
599 }
600 if self.vset.borrow().needs_compaction() {
603 let c = self.vset.borrow_mut().pick_compaction();
604 if let Some(c) = c {
605 self.start_compaction(c)
606 } else {
607 Ok(())
608 }
609 } else {
610 Ok(())
611 }
612 }
613
614 pub fn compact_range(&mut self, from: &[u8], to: &[u8]) -> Result<()> {
620 let mut max_level = 1;
621 {
622 let v = self.vset.borrow().current();
623 let v = v.borrow();
624 for l in 1..NUM_LEVELS - 1 {
625 if v.overlap_in_level(l, from, to) {
626 max_level = l;
627 }
628 }
629 }
630
631 self.make_room_for_write(true)?;
633
634 let mut ifrom = LookupKey::new(from, MAX_SEQUENCE_NUMBER)
635 .internal_key()
636 .to_vec();
637 let iend = LookupKey::new_full(to, 0, ValueType::TypeDeletion);
638
639 for l in 0..max_level + 1 {
640 loop {
641 let c_ = self
642 .vset
643 .borrow_mut()
644 .compact_range(l, &ifrom, iend.internal_key());
645 if let Some(c) = c_ {
646 let ix = c.num_inputs(0) - 1;
648 ifrom.clone_from(&c.input(0, ix).largest.into());
649 self.start_compaction(c)?;
650 } else {
651 break;
652 }
653 }
654 }
655 Ok(())
656 }
657
658 fn start_compaction(&mut self, mut compaction: Compaction) -> Result<()> {
661 if compaction.is_trivial_move() {
662 assert_eq!(1, compaction.num_inputs(0));
663 let f = compaction.input(0, 0);
664 let num = f.num;
665 let size = f.size;
666 let level = compaction.level();
667
668 compaction.edit().delete_file(level, num);
669 compaction.edit().add_file(level + 1, f);
670
671 let r = self.vset.borrow_mut().log_and_apply(compaction.into_edit());
672 if let Err(e) = r {
673 log!(self.opt.log, "trivial move failed: {}", e);
674 Err(e)
675 } else {
676 log!(
677 self.opt.log,
678 "Moved num={} bytes={} from L{} to L{}",
679 num,
680 size,
681 level,
682 level + 1
683 );
684 log!(
685 self.opt.log,
686 "Summary: {}",
687 self.vset.borrow().current_summary()
688 );
689 Ok(())
690 }
691 } else {
692 let smallest = if self.snaps.empty() {
693 self.vset.borrow().last_seq
694 } else {
695 self.snaps.oldest()
696 };
697 let mut state = CompactionState::new(compaction, smallest);
698 if let Err(e) = self.do_compaction_work(&mut state) {
699 state.cleanup(self.opt.env.as_ref().as_ref(), &self.path);
700 log!(self.opt.log, "Compaction work failed: {}", e);
701 }
702 self.install_compaction_results(state)?;
703 log!(
704 self.opt.log,
705 "Compaction finished: {}",
706 self.vset.borrow().current_summary()
707 );
708
709 self.delete_obsolete_files()
710 }
711 }
712
713 fn compact_memtable(&mut self) -> Result<()> {
714 assert!(self.imm.is_some());
715
716 let mut ve = VersionEdit::new();
717 let base = self.current();
718
719 let imm = self.imm.take().unwrap();
720 if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) {
721 self.imm = Some(imm);
722 return Err(e);
723 }
724 ve.set_log_num(self.log_num.unwrap_or(0));
725 self.vset.borrow_mut().log_and_apply(ve)?;
726 if let Err(e) = self.delete_obsolete_files() {
727 log!(self.opt.log, "Error deleting obsolete files: {}", e);
728 }
729 Ok(())
730 }
731
732 fn write_l0_table(
734 &mut self,
735 memt: &MemTable,
736 ve: &mut VersionEdit,
737 base: Option<&Version>,
738 ) -> Result<()> {
739 let start_ts = self.opt.env.micros();
740 let num = self.vset.borrow_mut().new_file_number();
741 log!(self.opt.log, "Start write of L0 table {:06}", num);
742 let fmd = build_table(&self.path, &self.opt, memt.iter(), num)?;
743 log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size);
744
745 if fmd.size == 0 {
747 self.vset.borrow_mut().reuse_file_number(num);
748 return Ok(());
749 }
750
751 let cache_result = self.cache.borrow_mut().get_table(num);
752 if let Err(e) = cache_result {
753 log!(
754 self.opt.log,
755 "L0 table {:06} not returned by cache: {}",
756 num,
757 e
758 );
759 let _ = self
760 .opt
761 .env
762 .delete(Path::new(&table_file_name(&self.path, num)));
763 return Err(e);
764 }
765
766 let stats = CompactionStats {
767 micros: self.opt.env.micros() - start_ts,
768 written: fmd.size,
769 ..Default::default()
770 };
771
772 let mut level = 0;
773 if let Some(b) = base {
774 level = b.pick_memtable_output_level(
775 parse_internal_key(&fmd.smallest).2,
776 parse_internal_key(&fmd.largest).2,
777 );
778 }
779
780 self.add_stats(level, stats);
781 ve.add_file(level, fmd);
782
783 Ok(())
784 }
785
786 fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
787 {
788 let current = self.vset.borrow().current();
789 assert!(current.borrow().num_level_files(cs.compaction.level()) > 0);
790 assert!(cs.builder.is_none());
791 }
792 let start_ts = self.opt.env.micros();
793 log!(
794 self.opt.log,
795 "Compacting {} files at L{} and {} files at L{}",
796 cs.compaction.num_inputs(0),
797 cs.compaction.level(),
798 cs.compaction.num_inputs(1),
799 cs.compaction.level() + 1
800 );
801
802 let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
803 input.seek_to_first();
804
805 let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;
806
807 let mut have_ukey = false;
808 let mut current_ukey = vec![];
809
810 while input.valid() {
811 let (key_bytes, val_bytes) = input.current().expect("Iterator should be valid here");
814
815 if cs.compaction.should_stop_before(&key_bytes) && cs.builder.is_some() {
816 self.finish_compaction_output(cs)?;
817 }
818 let (ktyp, seq, ukey) = parse_internal_key(&key_bytes);
819 if seq == 0 {
820 log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key_bytes);
822 last_seq_for_key = MAX_SEQUENCE_NUMBER;
823 have_ukey = false;
824 current_ukey.clear();
825 input.advance();
826 continue;
827 }
828
829 if !have_ukey || self.opt.cmp.cmp(ukey, ¤t_ukey) != Ordering::Equal {
830 current_ukey.clear();
832 current_ukey.extend_from_slice(ukey);
833 have_ukey = true;
834 last_seq_for_key = MAX_SEQUENCE_NUMBER;
835 }
836
837 if last_seq_for_key <= cs.smallest_seq {
839 last_seq_for_key = seq;
840 input.advance();
841 continue;
842 }
843 if ktyp == ValueType::TypeDeletion
846 && seq <= cs.smallest_seq
847 && cs.compaction.is_base_level_for(ukey)
848 {
849 last_seq_for_key = seq;
850 input.advance();
851 continue;
852 }
853
854 last_seq_for_key = seq;
855
856 if cs.builder.is_none() {
857 let fnum = self.vset.borrow_mut().new_file_number();
858 let fmd = FileMetaData {
859 num: fnum,
860 ..Default::default()
861 };
862
863 let fname = table_file_name(&self.path, fnum);
864 let f = self.opt.env.open_writable_file(Path::new(&fname))?;
865 let f = Box::new(BufWriter::new(f));
866 cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
867 cs.outputs.push(fmd);
868 }
869 if cs.builder.as_ref().unwrap().entries() == 0 {
870 cs.current_output().smallest.clone_from(&key_bytes);
871 }
872 cs.current_output().largest.clone_from(&key_bytes);
873 cs.builder.as_mut().unwrap().add(&key_bytes, &val_bytes)?;
874 if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
876 self.finish_compaction_output(cs)?;
877 }
878
879 input.advance();
880 }
881
882 if cs.builder.is_some() {
883 self.finish_compaction_output(cs)?;
884 }
885
886 let mut stats = CompactionStats {
887 micros: self.opt.env.micros() - start_ts,
888 ..Default::default()
889 };
890 for parent in 0..2 {
891 for inp in 0..cs.compaction.num_inputs(parent) {
892 stats.read += cs.compaction.input(parent, inp).size;
893 }
894 }
895 for output in &cs.outputs {
896 stats.written += output.size;
897 }
898 self.cstats[cs.compaction.level()].add(stats);
899 Ok(())
900 }
901
902 fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> {
903 assert!(cs.builder.is_some());
904 let output_num = cs.current_output().num;
905 assert!(output_num > 0);
906
907 let b = cs.builder.take().unwrap();
911 let entries = b.entries();
912 let bytes = b.finish()?;
913 cs.total_bytes += bytes;
914
915 cs.current_output().size = bytes;
916
917 if entries > 0 {
918 let r = self.cache.borrow_mut().get_table(output_num);
921 if let Err(e) = r {
922 log!(self.opt.log, "New table can't be read: {}", e);
923 return Err(e);
924 }
925 log!(
926 self.opt.log,
927 "New table num={}: keys={} size={}",
928 output_num,
929 entries,
930 bytes
931 );
932 }
933 Ok(())
934 }
935
936 fn install_compaction_results(&mut self, mut cs: CompactionState) -> Result<()> {
937 log!(
938 self.opt.log,
939 "Compacted {} L{} files + {} L{} files => {}B",
940 cs.compaction.num_inputs(0),
941 cs.compaction.level(),
942 cs.compaction.num_inputs(1),
943 cs.compaction.level() + 1,
944 cs.total_bytes
945 );
946 cs.compaction.add_input_deletions();
947 let level = cs.compaction.level();
948 for output in &cs.outputs {
949 cs.compaction.edit().add_file(level + 1, output.clone());
950 }
951 self.vset
952 .borrow_mut()
953 .log_and_apply(cs.compaction.into_edit())
954 }
955}
956
957impl Drop for DB {
958 fn drop(&mut self) {
959 self.flush().ok();
960 let _ = self.release_lock();
961 }
962}
963
964struct CompactionState {
965 compaction: Compaction,
966 smallest_seq: SequenceNumber,
967 outputs: Vec<FileMetaData>,
968 builder: Option<TableBuilder<Box<dyn Write>>>,
969 total_bytes: usize,
970}
971
972impl CompactionState {
973 fn new(c: Compaction, smallest: SequenceNumber) -> CompactionState {
974 CompactionState {
975 compaction: c,
976 smallest_seq: smallest,
977 outputs: vec![],
978 builder: None,
979 total_bytes: 0,
980 }
981 }
982
983 fn current_output(&mut self) -> &mut FileMetaData {
984 let len = self.outputs.len();
985 &mut self.outputs[len - 1]
986 }
987
988 fn cleanup<P: AsRef<Path>>(&mut self, env: &dyn Env, name: P) {
990 for o in self.outputs.drain(..) {
991 let name = table_file_name(name.as_ref(), o.num);
992 let _ = env.delete(&name);
993 }
994 }
995}
996
997#[derive(Debug, Default)]
998struct CompactionStats {
999 micros: u64,
1000 read: usize,
1001 written: usize,
1002}
1003
1004impl CompactionStats {
1005 fn add(&mut self, cs: CompactionStats) {
1006 self.micros += cs.micros;
1007 self.read += cs.read;
1008 self.written += cs.written;
1009 }
1010}
1011
1012pub fn build_table<I: LdbIterator, P: AsRef<Path>>(
1013 dbname: P,
1014 opt: &Options,
1015 mut from: I,
1016 num: FileNum,
1017) -> Result<FileMetaData> {
1018 from.reset();
1019 let filename = table_file_name(dbname.as_ref(), num);
1020
1021 let (mut kbuf, mut vbuf) = (vec![], vec![]);
1022 let mut firstkey = None;
1023 let r = (|| -> Result<()> {
1029 let f = opt.env.open_writable_file(Path::new(&filename))?;
1030 let f = BufWriter::new(f);
1031 let mut builder = TableBuilder::new(opt.clone(), f);
1032 while from.advance() {
1033 if let Some((key_bytes, val_bytes)) = from.current() {
1034 kbuf = key_bytes.to_vec();
1035 vbuf = val_bytes.to_vec();
1036 if firstkey.is_none() {
1037 firstkey = Some(kbuf.clone());
1038 }
1039 builder.add(&kbuf, &vbuf)?;
1040 } else {
1041 panic!("Iterator should be valid here");
1042 }
1043 }
1044 builder.finish()?;
1045 Ok(())
1046 })();
1047
1048 if let Err(e) = r {
1049 let _ = opt.env.delete(Path::new(&filename));
1050 return Err(e);
1051 }
1052
1053 let mut md = FileMetaData::default();
1054 match firstkey {
1055 None => {
1056 let _ = opt.env.delete(Path::new(&filename));
1057 }
1058 Some(key) => {
1059 md.num = num;
1060 md.size = opt.env.size_of(Path::new(&filename))?;
1061 md.smallest = key.into();
1062 md.largest = kbuf.into();
1063 }
1064 }
1065 Ok(md)
1066}
1067
1068fn log_file_name(db: &Path, num: FileNum) -> PathBuf {
1069 db.join(format!("{:06}.log", num))
1070}
1071
1072fn lock_file_name(db: &Path) -> PathBuf {
1073 db.join("LOCK")
1074}
1075
1076fn open_info_log<E: Env + ?Sized, P: AsRef<Path>>(env: &E, db: P) -> Logger {
1079 let db = db.as_ref();
1080 let logfilename = db.join("LOG");
1081 let oldlogfilename = db.join("LOG.old");
1082 let _ = env.mkdir(Path::new(db));
1083 if let Ok(e) = env.exists(Path::new(&logfilename)) {
1084 if e {
1085 let _ = env.rename(Path::new(&logfilename), Path::new(&oldlogfilename));
1086 }
1087 }
1088 if let Ok(w) = env.open_writable_file(Path::new(&logfilename)) {
1089 Logger(w)
1090 } else {
1091 Logger(Box::new(io::sink()))
1092 }
1093}
1094
1095#[cfg(test)]
1096pub mod testutil {
1097 use super::*;
1098
1099 use crate::version::testutil::make_version;
1100
1101 pub fn build_db() -> (DB, Options) {
1103 let name = "db";
1104 let (v, mut opt) = make_version();
1105 opt.reuse_logs = false;
1106 opt.reuse_manifest = false;
1107 let mut ve = VersionEdit::new();
1108 ve.set_comparator_name(opt.cmp.id());
1109 ve.set_log_num(0);
1110 ve.set_next_file(11);
1112 ve.set_last_seq(30);
1114
1115 for l in 0..NUM_LEVELS {
1116 for f in &v.files[l] {
1117 ve.add_file(l, f.borrow().clone());
1118 }
1119 }
1120
1121 let manifest = manifest_file_name(name, 10);
1122 let manifest_file = opt.env.open_writable_file(Path::new(&manifest)).unwrap();
1123 let mut lw = LogWriter::new(manifest_file);
1124 lw.add_record(&ve.encode()).unwrap();
1125 lw.flush().unwrap();
1126 set_current_file(opt.env.as_ref().as_ref(), name, 10).unwrap();
1127
1128 (DB::open(name, opt.clone()).unwrap(), opt)
1129 }
1130
1131 pub fn set_file_to_compact(db: &mut DB, num: FileNum) {
1133 let v = db.current();
1134 let mut v = v.borrow_mut();
1135
1136 let mut ftc = None;
1137 for l in 0..NUM_LEVELS {
1138 for f in &v.files[l] {
1139 if f.borrow().num == num {
1140 ftc = Some((f.clone(), l));
1141 }
1142 }
1143 }
1144 if let Some((f, l)) = ftc {
1145 v.file_to_compact = Some(f);
1146 v.file_to_compact_lvl = l;
1147 } else {
1148 panic!("file number not found");
1149 }
1150 }
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155 use super::testutil::{build_db, set_file_to_compact};
1156 use super::*;
1157
1158 use crate::error::Status;
1159 use crate::key_types::LookupKey;
1160 use crate::mem_env::MemEnv;
1161 use crate::options;
1162 use crate::test_util::LdbIteratorIter;
1163 use crate::version::testutil::make_version;
1164
1165 #[test]
1166 fn test_db_impl_open_info_log() {
1167 let e = MemEnv::new();
1168 {
1169 let l = Some(share(open_info_log(&e, "abc")));
1170 assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
1171 log!(l, "hello {}", "world");
1172 assert_eq!(12, e.size_of(&Path::new("abc").join("LOG")).unwrap());
1173 }
1174 {
1175 let l = Some(share(open_info_log(&e, "abc")));
1176 assert!(e.exists(&Path::new("abc").join("LOG.old")).unwrap());
1177 assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
1178 assert_eq!(12, e.size_of(&Path::new("abc").join("LOG.old")).unwrap());
1179 assert_eq!(0, e.size_of(&Path::new("abc").join("LOG")).unwrap());
1180 log!(l, "something else");
1181 log!(l, "and another {}", 1);
1182
1183 let mut s = String::new();
1184 let mut r = e
1185 .open_sequential_file(&Path::new("abc").join("LOG"))
1186 .unwrap();
1187 r.read_to_string(&mut s).unwrap();
1188 assert_eq!("something else\nand another 1\n", &s);
1189 }
1190 }
1191
1192 fn build_memtable() -> MemTable {
1193 let mut mt = MemTable::new(options::for_test().cmp);
1194 let mut i = 1;
1195 for k in ["abc", "def", "ghi", "jkl", "mno", "aabc", "test123"].iter() {
1196 mt.add(i, ValueType::TypeValue, k.as_bytes(), b"looooongval");
1197 i += 1;
1198 }
1199 mt
1200 }
1201
1202 #[test]
1203 fn test_db_impl_init() {
1204 let opt = options::for_test();
1206 let env = opt.env.clone();
1207
1208 {
1212 let mut opt = opt.clone();
1213 opt.reuse_manifest = false;
1214 let _ = DB::open("otherdb", opt.clone()).unwrap();
1215
1216 eprintln!(
1217 "children after: {:?}",
1218 env.children(Path::new("otherdb")).unwrap()
1219 );
1220 assert!(env.exists(&Path::new("otherdb").join("CURRENT")).unwrap());
1221 assert!(!env
1223 .exists(&Path::new("otherdb").join("MANIFEST-000001"))
1224 .unwrap());
1225 assert!(env
1226 .exists(&Path::new("otherdb").join("MANIFEST-000002"))
1227 .unwrap());
1228 assert!(env
1229 .exists(&Path::new("otherdb").join("000003.log"))
1230 .unwrap());
1231 }
1232
1233 {
1234 let mut opt = opt.clone();
1235 opt.reuse_manifest = true;
1236 let mut db = DB::open("db", opt.clone()).unwrap();
1237
1238 eprintln!(
1239 "children after: {:?}",
1240 env.children(&Path::new("db").join("")).unwrap()
1241 );
1242 assert!(env.exists(&Path::new("db").join("CURRENT")).unwrap());
1243 assert!(env
1245 .exists(&Path::new("db").join("MANIFEST-000001"))
1246 .unwrap());
1247 assert!(env.exists(&Path::new("db").join("LOCK")).unwrap());
1248 assert!(env.exists(&Path::new("db").join("000003.log")).unwrap());
1249
1250 db.put(b"abc", b"def").unwrap();
1251 db.put(b"abd", b"def").unwrap();
1252 }
1253
1254 {
1255 eprintln!(
1256 "children before: {:?}",
1257 env.children(&Path::new("db").join("")).unwrap()
1258 );
1259 let mut opt = opt.clone();
1260 opt.reuse_manifest = false;
1261 opt.reuse_logs = false;
1262 let mut db = DB::open("db", opt.clone()).unwrap();
1263
1264 eprintln!(
1265 "children after: {:?}",
1266 env.children(&Path::new("db").join("")).unwrap()
1267 );
1268 assert!(!env
1270 .exists(&Path::new("db").join("MANIFEST-000001"))
1271 .unwrap());
1272 assert!(env
1274 .exists(&Path::new("db").join("MANIFEST-000002"))
1275 .unwrap());
1276 assert!(!env.exists(&Path::new("db").join("000003.log")).unwrap());
1278 assert!(env.exists(&Path::new("db").join("000003.ldb")).unwrap());
1280 assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
1281 let current = db.current();
1283 log!(opt.log, "files: {:?}", current.borrow().files);
1284 assert_eq!(
1285 b"def",
1286 current
1287 .borrow_mut()
1288 .get(LookupKey::new(b"abc", 1).internal_key())
1289 .unwrap()
1290 .unwrap()
1291 .0
1292 .as_ref()
1293 );
1294 db.put(b"abe", b"def").unwrap();
1295 }
1296
1297 {
1298 eprintln!(
1299 "children before: {:?}",
1300 env.children(Path::new("db")).unwrap()
1301 );
1302 let mut opt = opt.clone();
1305 opt.reuse_logs = true;
1306 let db = DB::open("db", opt).unwrap();
1307
1308 eprintln!(
1309 "children after: {:?}",
1310 env.children(Path::new("db")).unwrap()
1311 );
1312 assert!(!env
1313 .exists(&Path::new("db").join("MANIFEST-000001"))
1314 .unwrap());
1315 assert!(env
1316 .exists(&Path::new("db").join("MANIFEST-000002"))
1317 .unwrap());
1318 assert!(!env
1319 .exists(&Path::new("db").join("MANIFEST-000005"))
1320 .unwrap());
1321 assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
1322 assert!(!env.exists(&Path::new("db").join("000006.log")).unwrap());
1324 assert_eq!(1, db.mem.len());
1326 assert_eq!(b"def", &*db.mem.get(&LookupKey::new(b"abe", 3)).0.unwrap());
1327 }
1328 }
1329
1330 #[test]
1331 fn test_db_impl_compact_range() {
1332 let (mut db, opt) = build_db();
1333 let env = &opt.env;
1334
1335 eprintln!(
1336 "children before: {:?}",
1337 env.children(&Path::new("db").join("")).unwrap()
1338 );
1339 db.compact_range(b"aaa", b"dba").unwrap();
1340 eprintln!(
1341 "children after: {:?}",
1342 env.children(&Path::new("db").join("")).unwrap()
1343 );
1344
1345 assert_eq!(
1346 250,
1347 opt.env
1348 .size_of(&Path::new("db").join("000007.ldb"))
1349 .unwrap()
1350 );
1351 assert_eq!(
1352 200,
1353 opt.env
1354 .size_of(&Path::new("db").join("000008.ldb"))
1355 .unwrap()
1356 );
1357 assert_eq!(
1358 200,
1359 opt.env
1360 .size_of(&Path::new("db").join("000009.ldb"))
1361 .unwrap()
1362 );
1363 assert_eq!(
1364 435,
1365 opt.env
1366 .size_of(&Path::new("db").join("000015.ldb"))
1367 .unwrap()
1368 );
1369
1370 assert!(!opt.env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1371 assert!(!opt.env.exists(&Path::new("db").join("000002.ldb")).unwrap());
1372 assert!(!opt.env.exists(&Path::new("db").join("000004.ldb")).unwrap());
1373 assert!(!opt.env.exists(&Path::new("db").join("000005.ldb")).unwrap());
1374 assert!(!opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1375 assert!(!opt.env.exists(&Path::new("db").join("000013.ldb")).unwrap());
1376 assert!(!opt.env.exists(&Path::new("db").join("000014.ldb")).unwrap());
1377
1378 assert_eq!(b"val1".to_vec(), db.get(b"aaa").unwrap());
1379 assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
1380 assert_eq!(b"val3".to_vec(), db.get(b"aba").unwrap());
1381 assert_eq!(b"val3".to_vec(), db.get(b"fab").unwrap());
1382 }
1383
1384 #[test]
1385 fn test_db_impl_compact_range_memtable() {
1386 let (mut db, opt) = build_db();
1387 let env = &opt.env;
1388
1389 db.put(b"xxx", b"123").unwrap();
1390
1391 eprintln!(
1392 "children before: {:?}",
1393 env.children(Path::new("db")).unwrap()
1394 );
1395 db.compact_range(b"aaa", b"dba").unwrap();
1396 eprintln!(
1397 "children after: {:?}",
1398 env.children(Path::new("db")).unwrap()
1399 );
1400
1401 assert_eq!(
1402 250,
1403 opt.env
1404 .size_of(&Path::new("db").join("000007.ldb"))
1405 .unwrap()
1406 );
1407 assert_eq!(
1408 200,
1409 opt.env
1410 .size_of(&Path::new("db").join("000008.ldb"))
1411 .unwrap()
1412 );
1413 assert_eq!(
1414 200,
1415 opt.env
1416 .size_of(&Path::new("db").join("000009.ldb"))
1417 .unwrap()
1418 );
1419 assert_eq!(
1420 182,
1421 opt.env
1422 .size_of(&Path::new("db").join("000014.ldb"))
1423 .unwrap()
1424 );
1425 assert_eq!(
1426 435,
1427 opt.env
1428 .size_of(&Path::new("db").join("000017.ldb"))
1429 .unwrap()
1430 );
1431
1432 assert!(!opt.env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1433 assert!(!opt.env.exists(&Path::new("db").join("000002.ldb")).unwrap());
1434 assert!(!opt.env.exists(&Path::new("db").join("000003.ldb")).unwrap());
1435 assert!(!opt.env.exists(&Path::new("db").join("000004.ldb")).unwrap());
1436 assert!(!opt.env.exists(&Path::new("db").join("000005.ldb")).unwrap());
1437 assert!(!opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1438 assert!(!opt.env.exists(&Path::new("db").join("000015.ldb")).unwrap());
1439 assert!(!opt.env.exists(&Path::new("db").join("000016.ldb")).unwrap());
1440
1441 assert_eq!(b"val1".to_vec(), db.get(b"aaa").unwrap());
1442 assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
1443 assert_eq!(b"val3".to_vec(), db.get(b"aba").unwrap());
1444 assert_eq!(b"val3".to_vec(), db.get(b"fab").unwrap());
1445 assert_eq!(b"123".to_vec(), db.get(b"xxx").unwrap());
1446 }
1447
1448 #[allow(unused_variables)]
1449 #[test]
1450 fn test_db_impl_locking() {
1451 let opt = options::for_test();
1452 let db = DB::open("db", opt.clone()).unwrap();
1453 let want_err = Status::new(
1454 StatusCode::LockError,
1455 "database lock is held by another instance",
1456 );
1457 assert_eq!(want_err, DB::open("db", opt.clone()).err().unwrap());
1458 }
1459
1460 #[test]
1461 fn test_db_impl_build_table() {
1462 let mut opt = options::for_test();
1463 opt.block_size = 128;
1464 let mt = build_memtable();
1465
1466 let f = build_table("db", &opt, mt.iter(), 123).unwrap();
1467 let path = &Path::new("db").join("000123.ldb");
1468
1469 assert_eq!(LookupKey::new(b"aabc", 6).internal_key(), &f.smallest);
1470 assert_eq!(LookupKey::new(b"test123", 7).internal_key(), &f.largest);
1471 assert_eq!(379, f.size);
1472 assert_eq!(123, f.num);
1473 assert!(opt.env.exists(path).unwrap());
1474
1475 {
1476 let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100);
1478 let tbl = tc.get_table(123).unwrap();
1479 assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut tbl.iter()).count());
1480 }
1481
1482 {
1483 let mut buf = vec![];
1485 opt.env
1486 .open_sequential_file(path)
1487 .unwrap()
1488 .read_to_end(&mut buf)
1489 .unwrap();
1490 buf[150] += 1;
1491 opt.env
1492 .open_writable_file(path)
1493 .unwrap()
1494 .write_all(&buf)
1495 .unwrap();
1496
1497 let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100);
1498 let tbl = tc.get_table(123).unwrap();
1499 assert_eq!(
1501 5,
1502 LdbIteratorIter::wrap(&mut tbl.iter())
1503 .map(|v| eprintln!("{:?}", v))
1504 .count()
1505 );
1506 }
1507 }
1508
1509 #[allow(unused_variables)]
1510 #[test]
1511 fn test_db_impl_build_db_sanity() {
1512 let db = build_db().0;
1513 let env = &db.opt.env;
1514 let name = &db.name;
1515
1516 assert!(env.exists(Path::new(&log_file_name(name, 12))).unwrap());
1517 }
1518
1519 #[test]
1520 fn test_db_impl_get_from_table_with_snapshot() {
1521 let mut db = build_db().0;
1522
1523 assert_eq!(30, db.vset.borrow().last_seq);
1524
1525 db.put(b"xyy", b"123").unwrap();
1527 let old_ss = db.get_snapshot();
1528 db.put(b"xyz", b"123").unwrap();
1530 db.flush().unwrap();
1531 assert!(db.get_at(&old_ss, b"xyy").unwrap().is_some());
1532 assert!(db.get_at(&old_ss, b"xyz").unwrap().is_none());
1533
1534 assert_eq!(b"123", db.get(b"xyz").unwrap().as_ref());
1536 assert!(db.get_internal(31, b"xyy").unwrap().is_some());
1537 assert!(db.get_internal(32, b"xyy").unwrap().is_some());
1538
1539 assert!(db.get_internal(31, b"xyz").unwrap().is_none());
1540 assert!(db.get_internal(32, b"xyz").unwrap().is_some());
1541
1542 assert_eq!(b"val2", db.get(b"eab").unwrap().as_ref());
1544 assert!(db.get_internal(3, b"eab").unwrap().is_none());
1545 assert!(db.get_internal(32, b"eab").unwrap().is_some());
1546
1547 {
1548 let ss = db.get_snapshot();
1549 assert_eq!(b"val2", db.get_at(&ss, b"eab").unwrap().unwrap().as_ref());
1550 }
1551
1552 assert_eq!(b"val2", db.get(b"cab").unwrap().as_ref());
1554 }
1555
1556 #[test]
1557 fn test_db_impl_delete() {
1558 let mut db = build_db().0;
1559
1560 db.put(b"xyy", b"123").unwrap();
1561 db.put(b"xyz", b"123").unwrap();
1562
1563 assert!(db.get(b"xyy").is_some());
1564 assert!(db.get(b"gaa").is_some());
1565
1566 db.delete(b"xyy").unwrap();
1568 db.delete(b"gaa").unwrap();
1569
1570 assert!(db.get(b"xyy").is_none());
1571 assert!(db.get(b"gaa").is_none());
1572 assert!(db.get(b"xyz").is_some());
1573 }
1574
1575 #[test]
1576 fn test_db_impl_compact_single_file() {
1577 let mut db = build_db().0;
1578 set_file_to_compact(&mut db, 4);
1579 db.maybe_do_compaction().unwrap();
1580
1581 let env = &db.opt.env;
1582 let name = &db.name;
1583 assert!(!env.exists(Path::new(&table_file_name(name, 3))).unwrap());
1584 assert!(!env.exists(Path::new(&table_file_name(name, 4))).unwrap());
1585 assert!(!env.exists(Path::new(&table_file_name(name, 5))).unwrap());
1586 assert!(env.exists(Path::new(&table_file_name(name, 13))).unwrap());
1587 }
1588
1589 #[test]
1590 fn test_db_impl_compaction_trivial_move() {
1591 let mut db = DB::open("db", options::for_test()).unwrap();
1592
1593 db.put(b"abc", b"xyz").unwrap();
1594 db.put(b"ab3", b"xyz").unwrap();
1595 db.put(b"ab0", b"xyz").unwrap();
1596 db.put(b"abz", b"xyz").unwrap();
1597 assert_eq!(4, db.mem.len());
1598 let mut imm = MemTable::new(db.opt.cmp.clone());
1599 mem::swap(&mut imm, &mut db.mem);
1600 db.imm = Some(imm);
1601 db.compact_memtable().unwrap();
1602
1603 eprintln!(
1604 "children after: {:?}",
1605 db.opt.env.children(Path::new("db")).unwrap()
1606 );
1607 assert!(db
1608 .opt
1609 .env
1610 .exists(&Path::new("db").join("000004.ldb"))
1611 .unwrap());
1612
1613 {
1614 let v = db.current();
1615 let mut v = v.borrow_mut();
1616 v.file_to_compact = Some(v.files[2][0].clone());
1617 v.file_to_compact_lvl = 2;
1618 }
1619
1620 db.maybe_do_compaction().unwrap();
1621
1622 {
1623 let v = db.current();
1624 let v = v.borrow_mut();
1625 assert_eq!(1, v.files[3].len());
1626 }
1627 }
1628
1629 #[test]
1630 fn test_db_impl_memtable_compaction() {
1631 let mut opt = options::for_test();
1632 opt.write_buffer_size = 25;
1633 let mut db = DB::new("db", opt);
1634
1635 db.mem = build_memtable();
1637
1638 db.make_room_for_write(true).unwrap();
1640 assert_eq!(0, db.mem.len());
1641 assert!(db
1642 .opt
1643 .env
1644 .exists(&Path::new("db").join("000002.log"))
1645 .unwrap());
1646 assert!(db
1647 .opt
1648 .env
1649 .exists(&Path::new("db").join("000003.ldb"))
1650 .unwrap());
1651 assert_eq!(
1652 351,
1653 db.opt
1654 .env
1655 .size_of(&Path::new("db").join("000003.ldb"))
1656 .unwrap()
1657 );
1658 assert_eq!(
1659 7,
1660 LdbIteratorIter::wrap(&mut db.cache.borrow_mut().get_table(3).unwrap().iter()).count()
1661 );
1662 }
1663
1664 #[test]
1665 fn test_db_impl_compaction() {
1666 let mut db = build_db().0;
1667 let v = db.current();
1668 v.borrow_mut().compaction_score = Some(2.0);
1669 v.borrow_mut().compaction_level = Some(1);
1670
1671 db.maybe_do_compaction().unwrap();
1672
1673 assert!(!db
1674 .opt
1675 .env
1676 .exists(&Path::new("db").join("000003.ldb"))
1677 .unwrap());
1678 assert!(db
1679 .opt
1680 .env
1681 .exists(&Path::new("db").join("000013.ldb"))
1682 .unwrap());
1683 assert_eq!(
1684 345,
1685 db.opt
1686 .env
1687 .size_of(&Path::new("db").join("000013.ldb"))
1688 .unwrap()
1689 );
1690
1691 let v = db.current();
1693 assert_eq!(0, v.borrow().files[1].len());
1694 assert_eq!(2, v.borrow().files[2].len());
1695 }
1696
1697 #[test]
1698 fn test_db_impl_compaction_trivial() {
1699 let (mut v, opt) = make_version();
1700
1701 let to_compact = v.files[2][0].clone();
1702 v.file_to_compact = Some(to_compact);
1703 v.file_to_compact_lvl = 2;
1704
1705 let mut db = DB::new("db", opt.clone());
1706 db.vset.borrow_mut().add_version(v);
1707 db.vset.borrow_mut().next_file_num = 10;
1708
1709 db.maybe_do_compaction().unwrap();
1710
1711 assert!(opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1712 assert!(!opt.env.exists(&Path::new("db").join("000010.ldb")).unwrap());
1713 assert_eq!(
1714 218,
1715 opt.env
1716 .size_of(&Path::new("db").join("000006.ldb"))
1717 .unwrap()
1718 );
1719
1720 let v = db.current();
1721 assert_eq!(1, v.borrow().files[2].len());
1722 assert_eq!(3, v.borrow().files[3].len());
1723 }
1724
1725 #[test]
1726 fn test_db_impl_compaction_state_cleanup() {
1727 let env: Box<dyn Env> = Box::new(MemEnv::new());
1728 let name = "db";
1729
1730 let stuff = b"abcdefghijkl";
1731 env.open_writable_file(&Path::new("db").join("000001.ldb"))
1732 .unwrap()
1733 .write_all(stuff)
1734 .unwrap();
1735 let fmd = FileMetaData {
1736 num: 1,
1737 ..Default::default()
1738 };
1739
1740 let mut cs = CompactionState::new(Compaction::new(&options::for_test(), 2, None), 12);
1741 cs.outputs = vec![fmd];
1742 cs.cleanup(env.as_ref(), name);
1743
1744 assert!(!env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1745 }
1746
1747 #[test]
1748 fn test_db_impl_open_close_reopen() {
1749 let opt;
1750 {
1751 let mut db = build_db().0;
1752 opt = db.opt.clone();
1753 db.put(b"xx1", b"111").unwrap();
1754 db.put(b"xx2", b"112").unwrap();
1755 db.put(b"xx3", b"113").unwrap();
1756 db.put(b"xx4", b"114").unwrap();
1757 db.put(b"xx5", b"115").unwrap();
1758 db.delete(b"xx2").unwrap();
1759 }
1760
1761 {
1762 let mut db = DB::open("db", opt.clone()).unwrap();
1763 db.delete(b"xx5").unwrap();
1764 }
1765
1766 {
1767 let mut db = DB::open("db", opt.clone()).unwrap();
1768
1769 assert_eq!(None, db.get(b"xx5"));
1770
1771 let ss = db.get_snapshot();
1772 db.put(b"xx4", b"222").unwrap();
1773 let ss2 = db.get_snapshot();
1774
1775 assert_eq!(
1776 Some(b"113".to_vec()),
1777 db.get_at(&ss, b"xx3").unwrap().map(|b| b.to_vec())
1778 );
1779 assert_eq!(None, db.get_at(&ss, b"xx2").unwrap());
1780 assert_eq!(None, db.get_at(&ss, b"xx5").unwrap());
1781
1782 assert_eq!(
1783 Some(b"114".to_vec()),
1784 db.get_at(&ss, b"xx4").unwrap().map(|b| b.to_vec())
1785 );
1786 assert_eq!(
1787 Some(b"222".to_vec()),
1788 db.get_at(&ss2, b"xx4").unwrap().map(|b| b.to_vec())
1789 );
1790 }
1791
1792 {
1793 let mut db = DB::open("db", opt).unwrap();
1794
1795 let ss = db.get_snapshot();
1796 assert_eq!(
1797 Some(b"113".to_vec()),
1798 db.get_at(&ss, b"xx3").unwrap().map(|b| b.to_vec())
1799 );
1800 assert_eq!(
1801 Some(b"222".to_vec()),
1802 db.get_at(&ss, b"xx4").unwrap().map(|b| b.to_vec())
1803 );
1804 assert_eq!(None, db.get_at(&ss, b"xx2").unwrap());
1805 }
1806 }
1807
1808 #[test]
1809 fn test_drop_memtable() {
1810 let mut db = DB::open("db", options::for_test()).unwrap();
1811 let mut cnt = 0;
1812 let mut buf: Vec<u8> = Vec::with_capacity(8);
1813 let entries_per_batch = 1;
1814 let max_num = 100000;
1815 while cnt < max_num {
1816 let mut write_batch = WriteBatch::new();
1817 for i in 0..entries_per_batch {
1818 buf.clear();
1819 buf.extend_from_slice(format!("{}-{}", cnt, i).as_bytes());
1820 write_batch.put(buf.as_slice(), buf.as_slice());
1821 }
1822 db.write(write_batch, false).unwrap();
1823 cnt += entries_per_batch;
1824 }
1825 }
1826}