rusty_leveldb_arc/
db_impl.rs

1//! db_impl contains the implementation of the database interface and high-level compaction and
2//! maintenance logic.
3
4#![allow(unused_attributes)]
5
6use crate::db_iter::DBIterator;
7
8use crate::cmp::{Cmp, InternalKeyCmp};
9use crate::env::{Env, FileLock};
10use crate::error::{err, Result, StatusCode};
11use crate::filter::{BoxedFilterPolicy, InternalFilterPolicy};
12use crate::infolog::Logger;
13use crate::key_types::{parse_internal_key, InternalKey, LookupKey, ValueType};
14use crate::log::{LogReader, LogWriter};
15use crate::memtable::MemTable;
16use crate::merging_iter::MergingIter;
17use crate::options::Options;
18use crate::snapshot::{Snapshot, SnapshotList};
19use crate::table_builder::TableBuilder;
20use crate::table_cache::{table_file_name, TableCache};
21use crate::types::{
22    parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, SequenceNumber, Shared,
23    MAX_SEQUENCE_NUMBER, NUM_LEVELS,
24};
25use crate::version::Version;
26use crate::version_edit::VersionEdit;
27use crate::version_set::{
28    manifest_file_name, read_current_file, set_current_file, Compaction, VersionSet,
29};
30use crate::write_batch::WriteBatch;
31
32use std::cmp::Ordering;
33use std::io::{self, BufWriter, Write};
34use std::mem;
35use std::ops::Drop;
36use std::path::Path;
37use std::path::PathBuf;
38use std::sync::Arc;
39
/// DB contains the actual database implementation. As opposed to the original, this implementation
/// is not concurrent (yet).
pub struct DB {
    /// Database directory exactly as it was passed to `DB::new`.
    name: PathBuf,
    /// Canonicalized form of `name`; equal to `name` when canonicalization failed.
    path: PathBuf,
    /// Lock file handle; set by `acquire_lock` and taken again by `release_lock`.
    lock: Option<FileLock>,

    /// The user comparator from the options, wrapped in an `InternalKeyCmp`.
    internal_cmp: Arc<Box<dyn Cmp>>,
    /// Filter policy built from `opt.filter_policy`.
    fpol: InternalFilterPolicy<BoxedFilterPolicy>,
    /// Options this database was opened with (with the info log filled in by `DB::new`).
    opt: Options,

    /// Memtable that receives incoming writes.
    mem: MemTable,
    /// Memtable that has been swapped out of `mem` and still has to be written to a table file.
    imm: Option<MemTable>,

    /// Writer for the log file that write batches are appended to, once one is open.
    log: Option<LogWriter<BufWriter<Box<dyn Write>>>>,
    /// File number of the log file behind `log`.
    log_num: Option<FileNum>,
    /// Table cache, shared with the version set.
    cache: Shared<TableCache>,
    /// Version set tracking table files, file numbers and the last sequence number.
    vset: Shared<VersionSet>,
    /// Snapshots handed out by `get_snapshot`.
    snaps: SnapshotList,

    /// Compaction statistics, one slot per level.
    cstats: [CompactionStats; NUM_LEVELS],
}
62
// NOTE(review): this asserts that a `DB` may be moved to another thread. Nothing in this file
// proves that: it depends on every field (notably the `Shared<_>` handles, the boxed comparator
// and the boxed log writer) being safe to transfer between threads — TODO confirm and record the
// invariant here as a proper `SAFETY:` argument.
unsafe impl Send for DB {}
64
65impl DB {
66    // RECOVERY AND INITIALIZATION //
67
68    /// new initializes a new DB object, but doesn't touch disk.
69    fn new<P: AsRef<Path>>(name: P, mut opt: Options) -> DB {
70        let name = name.as_ref();
71        if opt.log.is_none() {
72            let log = open_info_log(opt.env.as_ref().as_ref(), name);
73            opt.log = Some(share(log));
74        }
75        let path = name.canonicalize().unwrap_or(name.to_owned());
76
77        let cache = share(TableCache::new(&name, opt.clone(), opt.max_open_files - 10));
78        let vset = VersionSet::new(&name, opt.clone(), cache.clone());
79
80        DB {
81            name: name.to_owned(),
82            path,
83            lock: None,
84            internal_cmp: Arc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))),
85            fpol: InternalFilterPolicy::new(opt.filter_policy.clone()),
86
87            mem: MemTable::new(opt.cmp.clone()),
88            imm: None,
89
90            opt,
91
92            log: None,
93            log_num: None,
94            cache,
95            vset: share(vset),
96            snaps: SnapshotList::new(),
97
98            cstats: Default::default(),
99        }
100    }
101
102    fn current(&self) -> Shared<Version> {
103        self.vset.borrow().current()
104    }
105
106    /// Opens or creates a new or existing database. `name` is the name of the directory containing
107    /// the database.
108    ///
109    /// Whether a new database is created and what happens if a database exists at the given path
110    /// depends on the options set (`create_if_missing`, `error_if_exists`).
111    pub fn open<P: AsRef<Path>>(name: P, opt: Options) -> Result<DB> {
112        let name = name.as_ref();
113        let mut db = DB::new(name, opt);
114        let mut ve = VersionEdit::new();
115        let save_manifest = db.recover(&mut ve)?;
116
117        // Create log file if an old one is not being reused.
118        if db.log.is_none() {
119            let lognum = db.vset.borrow_mut().new_file_number();
120            let logfile = db
121                .opt
122                .env
123                .open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
124            ve.set_log_num(lognum);
125            db.log = Some(LogWriter::new(BufWriter::new(logfile)));
126            db.log_num = Some(lognum);
127        }
128
129        if save_manifest {
130            ve.set_log_num(db.log_num.unwrap_or(0));
131            db.vset.borrow_mut().log_and_apply(ve)?;
132        }
133
134        db.delete_obsolete_files()?;
135        db.maybe_do_compaction()?;
136        Ok(db)
137    }
138
139    /// initialize_db initializes a new database.
140    fn initialize_db(&mut self) -> Result<()> {
141        let mut ve = VersionEdit::new();
142        ve.set_comparator_name(self.opt.cmp.id());
143        ve.set_log_num(0);
144        ve.set_next_file(2);
145        ve.set_last_seq(0);
146
147        {
148            let manifest = manifest_file_name(&self.path, 1);
149            let manifest_file = self.opt.env.open_writable_file(Path::new(&manifest))?;
150            let mut lw = LogWriter::new(manifest_file);
151            lw.add_record(&ve.encode())?;
152            lw.flush()?;
153        }
154        set_current_file(&self.opt.env, &self.path, 1)
155    }
156
157    /// recover recovers from the existing state on disk. If the wrapped result is `true`, then
158    /// log_and_apply() should be called after recovery has finished.
159    fn recover(&mut self, ve: &mut VersionEdit) -> Result<bool> {
160        if self.opt.error_if_exists && self.opt.env.exists(&self.path.as_ref()).unwrap_or(false) {
161            return err(StatusCode::AlreadyExists, "database already exists");
162        }
163
164        let _ = self.opt.env.mkdir(Path::new(&self.path));
165        self.acquire_lock()?;
166
167        if let Err(e) = read_current_file(&self.opt.env, &self.path) {
168            if e.code == StatusCode::NotFound && self.opt.create_if_missing {
169                self.initialize_db()?;
170            } else {
171                return err(
172                    StatusCode::InvalidArgument,
173                    "database does not exist and create_if_missing is false",
174                );
175            }
176        }
177
178        // If save_manifest is true, we should log_and_apply() later in order to write the new
179        // manifest.
180        let mut save_manifest = self.vset.borrow_mut().recover()?;
181
182        // Recover from all log files not in the descriptor.
183        let mut max_seq = 0;
184        let filenames = self.opt.env.children(&self.path)?;
185        let mut expected = self.vset.borrow().live_files();
186        let mut log_files = vec![];
187
188        for file in &filenames {
189            match parse_file_name(&file) {
190                Ok((num, typ)) => {
191                    expected.remove(&num);
192                    if typ == FileType::Log
193                        && (num >= self.vset.borrow().log_num
194                            || num == self.vset.borrow().prev_log_num)
195                    {
196                        log_files.push(num);
197                    }
198                }
199                Err(e) => return Err(e.annotate(format!("While parsing {:?}", file))),
200            }
201        }
202        if !expected.is_empty() {
203            log!(self.opt.log, "Missing at least these files: {:?}", expected);
204            return err(StatusCode::Corruption, "missing live files (see log)");
205        }
206
207        log_files.sort();
208        for i in 0..log_files.len() {
209            let (save_manifest_, max_seq_) =
210                self.recover_log_file(log_files[i], i == log_files.len() - 1, ve)?;
211            if save_manifest_ {
212                save_manifest = true;
213            }
214            if max_seq_ > max_seq {
215                max_seq = max_seq_;
216            }
217            self.vset.borrow_mut().mark_file_number_used(log_files[i]);
218        }
219
220        if self.vset.borrow().last_seq < max_seq {
221            self.vset.borrow_mut().last_seq = max_seq;
222        }
223
224        Ok(save_manifest)
225    }
226
227    /// recover_log_file reads a single log file into a memtable, writing new L0 tables if
228    /// necessary. If is_last is true, it checks whether the log file can be reused, and sets up
229    /// the database's logging handles appropriately if that's the case.
230    fn recover_log_file(
231        &mut self,
232        log_num: FileNum,
233        is_last: bool,
234        ve: &mut VersionEdit,
235    ) -> Result<(bool, SequenceNumber)> {
236        let filename = log_file_name(&self.path, log_num);
237        let logfile = self.opt.env.open_sequential_file(Path::new(&filename))?;
238        // Use the user-supplied comparator; it will be wrapped inside a MemtableKeyCmp.
239        let cmp: Arc<Box<dyn Cmp>> = self.opt.cmp.clone();
240
241        let mut logreader = LogReader::new(
242            logfile, // checksum=
243            true,
244        );
245        log!(self.opt.log, "Recovering log file {:?}", filename);
246        let mut scratch = vec![];
247        let mut mem = MemTable::new(cmp.clone());
248        let mut batch = WriteBatch::new();
249
250        let mut compactions = 0;
251        let mut max_seq = 0;
252        let mut save_manifest = false;
253
254        while let Ok(len) = logreader.read(&mut scratch) {
255            if len == 0 {
256                break;
257            }
258            if len < 12 {
259                log!(
260                    self.opt.log,
261                    "corruption in log file {:06}: record shorter than 12B",
262                    log_num
263                );
264                continue;
265            }
266
267            batch.set_contents(&scratch);
268            batch.insert_into_memtable(batch.sequence(), &mut mem);
269
270            let last_seq = batch.sequence() + batch.count() as u64 - 1;
271            if last_seq > max_seq {
272                max_seq = last_seq
273            }
274            if mem.approx_mem_usage() > self.opt.write_buffer_size {
275                compactions += 1;
276                self.write_l0_table(&mem, ve, None)?;
277                save_manifest = true;
278                mem = MemTable::new(cmp.clone());
279            }
280            batch.clear();
281        }
282
283        // Check if we can reuse the last log file.
284        if self.opt.reuse_logs && is_last && compactions == 0 {
285            assert!(self.log.is_none());
286            log!(self.opt.log, "reusing log file {:?}", filename);
287            let oldsize = self.opt.env.size_of(Path::new(&filename))?;
288            let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
289            let lw = LogWriter::new_with_off(BufWriter::new(oldfile), oldsize);
290            self.log = Some(lw);
291            self.log_num = Some(log_num);
292            self.mem = mem;
293        } else if mem.len() > 0 {
294            // Log is not reused, so write out the accumulated memtable.
295            save_manifest = true;
296            self.write_l0_table(&mem, ve, None)?;
297        }
298
299        Ok((save_manifest, max_seq))
300    }
301
302    /// delete_obsolete_files removes files that are no longer needed from the file system.
303    fn delete_obsolete_files(&mut self) -> Result<()> {
304        let files = self.vset.borrow().live_files();
305        let filenames = self.opt.env.children(Path::new(&self.path))?;
306        for name in filenames {
307            if let Ok((num, typ)) = parse_file_name(&name) {
308                match typ {
309                    FileType::Log => {
310                        if num >= self.vset.borrow().log_num {
311                            continue;
312                        }
313                    }
314                    FileType::Descriptor => {
315                        if num >= self.vset.borrow().manifest_num {
316                            continue;
317                        }
318                    }
319                    FileType::Table => {
320                        if files.contains(&num) {
321                            continue;
322                        }
323                    }
324                    // NOTE: In this non-concurrent implementation, we likely never find temp
325                    // files.
326                    FileType::Temp => {
327                        if files.contains(&num) {
328                            continue;
329                        }
330                    }
331                    FileType::Current | FileType::DBLock | FileType::InfoLog => continue,
332                }
333
334                // If we're here, delete this file.
335                if typ == FileType::Table {
336                    let _ = self.cache.borrow_mut().evict(num);
337                }
338                log!(self.opt.log, "Deleting file type={:?} num={}", typ, num);
339                if let Err(e) = self.opt.env.delete(&self.path.join(&name)) {
340                    log!(self.opt.log, "Deleting file num={} failed: {}", num, e);
341                }
342            }
343        }
344        Ok(())
345    }
346
347    /// acquire_lock acquires the lock file.
348    fn acquire_lock(&mut self) -> Result<()> {
349        let lock_r = self.opt.env.lock(Path::new(&lock_file_name(&self.path)));
350        match lock_r {
351            Ok(lockfile) => {
352                self.lock = Some(lockfile);
353                Ok(())
354            }
355            Err(ref e) if e.code == StatusCode::LockError => err(
356                StatusCode::LockError,
357                "database lock is held by another instance",
358            ),
359            Err(e) => Err(e),
360        }
361    }
362
363    /// release_lock releases the lock file, if it's currently held.
364    fn release_lock(&mut self) -> Result<()> {
365        if let Some(l) = self.lock.take() {
366            self.opt.env.unlock(l)
367        } else {
368            Ok(())
369        }
370    }
371
372    /// Flush data to disk and release lock.
373    pub fn close(&mut self) -> Result<()> {
374        self.flush()?;
375        self.release_lock()?;
376        Ok(())
377    }
378}
379
380impl DB {
381    // WRITE //
382
383    /// Adds a single entry. It's a short, non-synchronous, form of `write()`; in order to make
384    /// sure that the written entry is on disk, call `flush()` afterwards.
385    pub fn put(&mut self, k: &[u8], v: &[u8]) -> Result<()> {
386        let mut wb = WriteBatch::new();
387        wb.put(k, v);
388        self.write(wb, false)
389    }
390
391    /// Deletes a single entry. Like with `put()`, you can call `flush()` to guarantee that
392    /// the operation made it to disk.
393    pub fn delete(&mut self, k: &[u8]) -> Result<()> {
394        let mut wb = WriteBatch::new();
395        wb.delete(k);
396        self.write(wb, false)
397    }
398
399    /// Writes an entire WriteBatch. `sync` determines whether the write should be flushed to
400    /// disk.
401    pub fn write(&mut self, batch: WriteBatch, sync: bool) -> Result<()> {
402        assert!(self.log.is_some());
403
404        self.make_room_for_write(false)?;
405
406        let entries = batch.count() as u64;
407        let log = self.log.as_mut().unwrap();
408        let next = self.vset.borrow().last_seq + 1;
409
410        batch.insert_into_memtable(next, &mut self.mem);
411        log.add_record(&batch.encode(next))?;
412        if sync {
413            log.flush()?;
414        }
415        self.vset.borrow_mut().last_seq += entries;
416        Ok(())
417    }
418
419    /// flush makes sure that all pending changes (e.g. from put()) are stored on disk.
420    pub fn flush(&mut self) -> Result<()> {
421        if let Some(ref mut log) = self.log.as_mut() {
422            log.flush()?;
423        }
424        Ok(())
425    }
426}
427
428impl DB {
429    // READ //
430
431    fn get_internal(&mut self, seq: SequenceNumber, key: &[u8]) -> Result<Option<Vec<u8>>> {
432        // Using this lookup key will skip all entries with higher sequence numbers, because they
433        // will compare "Lesser" using the InternalKeyCmp
434        let lkey = LookupKey::new(key, seq);
435
436        match self.mem.get(&lkey) {
437            (Some(v), _) => return Ok(Some(v)),
438            // deleted entry
439            (None, true) => return Ok(None),
440            // not found entry
441            (None, false) => {}
442        }
443
444        if let Some(imm) = self.imm.as_ref() {
445            match imm.get(&lkey) {
446                (Some(v), _) => return Ok(Some(v)),
447                // deleted entry
448                (None, true) => return Ok(None),
449                // not found entry
450                (None, false) => {}
451            }
452        }
453
454        let mut do_compaction = false;
455        let mut result = None;
456
457        // Limiting the borrow scope of self.current.
458        {
459            let current = self.current();
460            let mut current = current.borrow_mut();
461            if let Ok(Some((v, st))) = current.get(lkey.internal_key()) {
462                if current.update_stats(st) {
463                    do_compaction = true;
464                }
465                result = Some(v)
466            }
467        }
468
469        if do_compaction {
470            if let Err(e) = self.maybe_do_compaction() {
471                log!(self.opt.log, "error while doing compaction in get: {}", e);
472            }
473        }
474        Ok(result)
475    }
476
477    /// get_at reads the value for a given key at or before snapshot. It returns Ok(None) if the
478    /// entry wasn't found, and Err(_) if an error occurred.
479    pub fn get_at(&mut self, snapshot: &Snapshot, key: &[u8]) -> Result<Option<Vec<u8>>> {
480        self.get_internal(snapshot.sequence(), key)
481    }
482
483    /// get is a simplified version of get_at(), translating errors to None.
484    pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
485        let seq = self.vset.borrow().last_seq;
486        if let Ok(v) = self.get_internal(seq, key) {
487            v
488        } else {
489            None
490        }
491    }
492}
493
494impl DB {
495    // ITERATOR //
496
497    /// new_iter returns a DBIterator over the current state of the database. The iterator will not
498    /// return elements added to the database after its creation.
499    pub fn new_iter(&mut self) -> Result<DBIterator> {
500        let snapshot = self.get_snapshot();
501        self.new_iter_at(snapshot)
502    }
503
504    /// new_iter_at returns a DBIterator at the supplied snapshot.
505    pub fn new_iter_at(&mut self, ss: Snapshot) -> Result<DBIterator> {
506        Ok(DBIterator::new(
507            self.opt.cmp.clone(),
508            self.vset.clone(),
509            self.merge_iterators()?,
510            ss,
511        ))
512    }
513
514    /// merge_iterators produces a MergingIter merging the entries in the memtable, the immutable
515    /// memtable, and table files from all levels.
516    fn merge_iterators(&mut self) -> Result<MergingIter> {
517        let mut iters: Vec<Box<dyn LdbIterator>> = vec![];
518        if self.mem.len() > 0 {
519            iters.push(Box::new(self.mem.iter()));
520        }
521        if let Some(ref imm) = self.imm {
522            if imm.len() > 0 {
523                iters.push(Box::new(imm.iter()));
524            }
525        }
526
527        // Add iterators for table files.
528        let current = self.current();
529        let current = current.borrow();
530        iters.extend(current.new_iters()?);
531
532        Ok(MergingIter::new(self.internal_cmp.clone(), iters))
533    }
534}
535
536impl DB {
537    // SNAPSHOTS //
538
539    /// Returns a snapshot at the current state. It can be used to retrieve entries from the
540    /// database as they were at an earlier point in time.
541    pub fn get_snapshot(&mut self) -> Snapshot {
542        self.snaps.new_snapshot(self.vset.borrow().last_seq)
543    }
544}
545
546impl DB {
547    // STATISTICS //
548    fn add_stats(&mut self, level: usize, cs: CompactionStats) {
549        assert!(level < NUM_LEVELS);
550        self.cstats[level].add(cs);
551    }
552
553    /// Trigger a compaction based on where this key is located in the different levels.
554    fn record_read_sample<'a>(&mut self, k: InternalKey<'a>) {
555        let current = self.current();
556        if current.borrow_mut().record_read_sample(k) {
557            if let Err(e) = self.maybe_do_compaction() {
558                log!(self.opt.log, "record_read_sample: compaction failed: {}", e);
559            }
560        }
561    }
562}
563
564impl DB {
565    // COMPACTIONS //
566
567    /// make_room_for_write checks if the memtable has become too large, and triggers a compaction
568    /// if it's the case.
569    fn make_room_for_write(&mut self, force: bool) -> Result<()> {
570        if !force && self.mem.approx_mem_usage() < self.opt.write_buffer_size || self.mem.len() == 0 {
571            Ok(())
572        } else {
573            // Create new memtable.
574            let logn = self.vset.borrow_mut().new_file_number();
575            let logf = self
576                .opt
577                .env
578                .open_writable_file(Path::new(&log_file_name(&self.path, logn)));
579            if logf.is_err() {
580                self.vset.borrow_mut().reuse_file_number(logn);
581                Err(logf.err().unwrap())
582            } else {
583                self.log = Some(LogWriter::new(BufWriter::new(logf.unwrap())));
584                self.log_num = Some(logn);
585
586                let mut imm = MemTable::new(self.opt.cmp.clone());
587                mem::swap(&mut imm, &mut self.mem);
588                self.imm = Some(imm);
589                self.maybe_do_compaction()
590            }
591        }
592    }
593
594    /// maybe_do_compaction starts a blocking compaction if it makes sense.
595    fn maybe_do_compaction(&mut self) -> Result<()> {
596        if self.imm.is_some() {
597            self.compact_memtable()?;
598        }
599        // Issue #34 PR #36: after compacting a memtable into an L0 file, it is possible that the
600        // L0 files need to be merged and promoted.
601        if self.vset.borrow().needs_compaction() {
602            let c = self.vset.borrow_mut().pick_compaction();
603            if let Some(c) = c {
604                self.start_compaction(c)
605            } else {
606                Ok(())
607            }
608        } else {
609            Ok(())
610        }
611    }
612
613    /// compact_range triggers an immediate compaction on the specified key range. Repeatedly
614    /// calling this without actually adding new keys is not useful.
615    ///
616    /// Compactions in general will cause the database to find entries more quickly, and take up
617    /// less space on disk.
618    pub fn compact_range(&mut self, from: &[u8], to: &[u8]) -> Result<()> {
619        let mut max_level = 1;
620        {
621            let v = self.vset.borrow().current();
622            let v = v.borrow();
623            for l in 1..NUM_LEVELS - 1 {
624                if v.overlap_in_level(l, from, to) {
625                    max_level = l;
626                }
627            }
628        }
629
630        // Compact memtable.
631        self.make_room_for_write(true)?;
632
633        let mut ifrom = LookupKey::new(from, MAX_SEQUENCE_NUMBER)
634            .internal_key()
635            .to_vec();
636        let iend = LookupKey::new_full(to, 0, ValueType::TypeDeletion);
637
638        for l in 0..max_level + 1 {
639            loop {
640                let c_ = self
641                    .vset
642                    .borrow_mut()
643                    .compact_range(l, &ifrom, iend.internal_key());
644                if let Some(c) = c_ {
645                    // Update ifrom to the largest key of the last file in this compaction.
646                    let ix = c.num_inputs(0) - 1;
647                    ifrom = c.input(0, ix).largest.clone();
648                    self.start_compaction(c)?;
649                } else {
650                    break;
651                }
652            }
653        }
654        Ok(())
655    }
656
657    /// start_compaction dispatches the different kinds of compactions depending on the current
658    /// state of the database.
659    fn start_compaction(&mut self, mut compaction: Compaction) -> Result<()> {
660        if compaction.is_trivial_move() {
661            assert_eq!(1, compaction.num_inputs(0));
662            let f = compaction.input(0, 0);
663            let num = f.num;
664            let size = f.size;
665            let level = compaction.level();
666
667            compaction.edit().delete_file(level, num);
668            compaction.edit().add_file(level + 1, f);
669
670            let r = self.vset.borrow_mut().log_and_apply(compaction.into_edit());
671            if let Err(e) = r {
672                log!(self.opt.log, "trivial move failed: {}", e);
673                Err(e)
674            } else {
675                log!(
676                    self.opt.log,
677                    "Moved num={} bytes={} from L{} to L{}",
678                    num,
679                    size,
680                    level,
681                    level + 1
682                );
683                log!(
684                    self.opt.log,
685                    "Summary: {}",
686                    self.vset.borrow().current_summary()
687                );
688                Ok(())
689            }
690        } else {
691            let smallest = if self.snaps.empty() {
692                self.vset.borrow().last_seq
693            } else {
694                self.snaps.oldest()
695            };
696            let mut state = CompactionState::new(compaction, smallest);
697            if let Err(e) = self.do_compaction_work(&mut state) {
698                state.cleanup(&self.opt.env, &self.path);
699                log!(self.opt.log, "Compaction work failed: {}", e);
700            }
701            self.install_compaction_results(state)?;
702            log!(
703                self.opt.log,
704                "Compaction finished: {}",
705                self.vset.borrow().current_summary()
706            );
707
708            self.delete_obsolete_files()
709        }
710    }
711
712    fn compact_memtable(&mut self) -> Result<()> {
713        assert!(self.imm.is_some());
714
715        let mut ve = VersionEdit::new();
716        let base = self.current();
717
718        let imm = self.imm.take().unwrap();
719        if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) {
720            self.imm = Some(imm);
721            return Err(e);
722        }
723        ve.set_log_num(self.log_num.unwrap_or(0));
724        self.vset.borrow_mut().log_and_apply(ve)?;
725        if let Err(e) = self.delete_obsolete_files() {
726            log!(self.opt.log, "Error deleting obsolete files: {}", e);
727        }
728        Ok(())
729    }
730
731    /// write_l0_table writes the given memtable to a table file.
732    fn write_l0_table(
733        &mut self,
734        memt: &MemTable,
735        ve: &mut VersionEdit,
736        base: Option<&Version>,
737    ) -> Result<()> {
738        let start_ts = self.opt.env.micros();
739        let num = self.vset.borrow_mut().new_file_number();
740        log!(self.opt.log, "Start write of L0 table {:06}", num);
741        let fmd = build_table(&self.path, &self.opt, memt.iter(), num)?;
742        log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size);
743
744        // Wrote empty table.
745        if fmd.size == 0 {
746            self.vset.borrow_mut().reuse_file_number(num);
747            return Ok(());
748        }
749
750        let cache_result = self.cache.borrow_mut().get_table(num);
751        if let Err(e) = cache_result {
752            log!(
753                self.opt.log,
754                "L0 table {:06} not returned by cache: {}",
755                num,
756                e
757            );
758            let _ = self
759                .opt
760                .env
761                .delete(Path::new(&table_file_name(&self.path, num)));
762            return Err(e);
763        }
764
765        let mut stats = CompactionStats::default();
766        stats.micros = self.opt.env.micros() - start_ts;
767        stats.written = fmd.size;
768
769        let mut level = 0;
770        if let Some(b) = base {
771            level = b.pick_memtable_output_level(
772                parse_internal_key(&fmd.smallest).2,
773                parse_internal_key(&fmd.largest).2,
774            );
775        }
776
777        self.add_stats(level, stats);
778        ve.add_file(level, fmd);
779
780        Ok(())
781    }
782
783    fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
784        {
785            let current = self.vset.borrow().current();
786            assert!(current.borrow().num_level_files(cs.compaction.level()) > 0);
787            assert!(cs.builder.is_none());
788        }
789        let start_ts = self.opt.env.micros();
790        log!(
791            self.opt.log,
792            "Compacting {} files at L{} and {} files at L{}",
793            cs.compaction.num_inputs(0),
794            cs.compaction.level(),
795            cs.compaction.num_inputs(1),
796            cs.compaction.level() + 1
797        );
798
799        let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
800        input.seek_to_first();
801
802        let (mut key, mut val) = (vec![], vec![]);
803        let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;
804
805        let mut have_ukey = false;
806        let mut current_ukey = vec![];
807
808        while input.valid() {
809            // TODO: Do we need to do a memtable compaction here? Probably not, in the sequential
810            // case.
811            assert!(input.current(&mut key, &mut val));
812            if cs.compaction.should_stop_before(&key) && cs.builder.is_some() {
813                self.finish_compaction_output(cs)?;
814            }
815            let (ktyp, seq, ukey) = parse_internal_key(&key);
816            if seq == 0 {
817                // Parsing failed.
818                log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key);
819                last_seq_for_key = MAX_SEQUENCE_NUMBER;
820                have_ukey = false;
821                current_ukey.clear();
822                input.advance();
823                continue;
824            }
825
826            if !have_ukey || self.opt.cmp.cmp(ukey, &current_ukey) != Ordering::Equal {
827                // First occurrence of this key.
828                current_ukey.clear();
829                current_ukey.extend_from_slice(ukey);
830                have_ukey = true;
831                last_seq_for_key = MAX_SEQUENCE_NUMBER;
832            }
833
834            // We can omit the key under the following conditions:
835            if last_seq_for_key <= cs.smallest_seq {
836                last_seq_for_key = seq;
837                input.advance();
838                continue;
839            }
840            // Entry is deletion; no older version is observable by any snapshot; and all entries
841            // in compacted levels with smaller sequence numbers will
842            if ktyp == ValueType::TypeDeletion
843                && seq <= cs.smallest_seq
844                && cs.compaction.is_base_level_for(ukey)
845            {
846                last_seq_for_key = seq;
847                input.advance();
848                continue;
849            }
850
851            last_seq_for_key = seq;
852
853            if cs.builder.is_none() {
854                let fnum = self.vset.borrow_mut().new_file_number();
855                let mut fmd = FileMetaData::default();
856                fmd.num = fnum;
857
858                let fname = table_file_name(&self.path, fnum);
859                let f = self.opt.env.open_writable_file(Path::new(&fname))?;
860                let f = Box::new(BufWriter::new(f));
861                cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
862                cs.outputs.push(fmd);
863            }
864            if cs.builder.as_ref().unwrap().entries() == 0 {
865                cs.current_output().smallest = key.clone();
866            }
867            cs.current_output().largest = key.clone();
868            cs.builder.as_mut().unwrap().add(&key, &val)?;
869            // NOTE: Adjust max file size based on level.
870            if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
871                self.finish_compaction_output(cs)?;
872            }
873
874            input.advance();
875        }
876
877        if cs.builder.is_some() {
878            self.finish_compaction_output(cs)?;
879        }
880
881        let mut stats = CompactionStats::default();
882        stats.micros = self.opt.env.micros() - start_ts;
883        for parent in 0..2 {
884            for inp in 0..cs.compaction.num_inputs(parent) {
885                stats.read += cs.compaction.input(parent, inp).size;
886            }
887        }
888        for output in &cs.outputs {
889            stats.written += output.size;
890        }
891        self.cstats[cs.compaction.level()].add(stats);
892        Ok(())
893    }
894
895    fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> {
896        assert!(cs.builder.is_some());
897        let output_num = cs.current_output().num;
898        assert!(output_num > 0);
899
900        // The original checks if the input iterator has an OK status. For this, we'd need to
901        // extend the LdbIterator interface though -- let's see if we can without for now.
902        // (it's not good for corruptions, in any case)
903        let b = cs.builder.take().unwrap();
904        let entries = b.entries();
905        let bytes = b.finish()?;
906        cs.total_bytes += bytes;
907
908        cs.current_output().size = bytes;
909
910        if entries > 0 {
911            // Verify that table can be used. (Separating get_table() because borrowing in an if
912            // let expression is dangerous).
913            let r = self.cache.borrow_mut().get_table(output_num);
914            if let Err(e) = r {
915                log!(self.opt.log, "New table can't be read: {}", e);
916                return Err(e);
917            }
918            log!(
919                self.opt.log,
920                "New table num={}: keys={} size={}",
921                output_num,
922                entries,
923                bytes
924            );
925        }
926        Ok(())
927    }
928
929    fn install_compaction_results(&mut self, mut cs: CompactionState) -> Result<()> {
930        log!(
931            self.opt.log,
932            "Compacted {} L{} files + {} L{} files => {}B",
933            cs.compaction.num_inputs(0),
934            cs.compaction.level(),
935            cs.compaction.num_inputs(1),
936            cs.compaction.level() + 1,
937            cs.total_bytes
938        );
939        cs.compaction.add_input_deletions();
940        let level = cs.compaction.level();
941        for output in &cs.outputs {
942            cs.compaction.edit().add_file(level + 1, output.clone());
943        }
944        self.vset
945            .borrow_mut()
946            .log_and_apply(cs.compaction.into_edit())
947    }
948}
949
950impl Drop for DB {
951    fn drop(&mut self) {
952        self.flush().ok();
953        let _ = self.release_lock();
954    }
955}
956
957struct CompactionState {
958    compaction: Compaction,
959    smallest_seq: SequenceNumber,
960    outputs: Vec<FileMetaData>,
961    builder: Option<TableBuilder<Box<dyn Write>>>,
962    total_bytes: usize,
963}
964
965impl CompactionState {
966    fn new(c: Compaction, smallest: SequenceNumber) -> CompactionState {
967        CompactionState {
968            compaction: c,
969            smallest_seq: smallest,
970            outputs: vec![],
971            builder: None,
972            total_bytes: 0,
973        }
974    }
975
976    fn current_output(&mut self) -> &mut FileMetaData {
977        let len = self.outputs.len();
978        &mut self.outputs[len - 1]
979    }
980
981    /// cleanup cleans up after an aborted compaction.
982    fn cleanup<P: AsRef<Path>>(&mut self, env: &Box<dyn Env>, name: P) {
983        for o in self.outputs.drain(..) {
984            let name = table_file_name(name.as_ref(), o.num);
985            let _ = env.delete(&name);
986        }
987    }
988}
989
/// CompactionStats accumulates time spent and bytes moved by compactions.
#[derive(Debug, Default)]
struct CompactionStats {
    /// Elapsed time in microseconds.
    micros: u64,
    /// Bytes read from input tables.
    read: usize,
    /// Bytes written to output tables.
    written: usize,
}

impl CompactionStats {
    /// Adds every counter of `cs` onto the matching counter of `self`.
    fn add(&mut self, cs: CompactionStats) {
        let CompactionStats {
            micros,
            read,
            written,
        } = cs;
        self.micros += micros;
        self.read += read;
        self.written += written;
    }
}
1004
1005pub fn build_table<I: LdbIterator, P: AsRef<Path>>(
1006    dbname: P,
1007    opt: &Options,
1008    mut from: I,
1009    num: FileNum,
1010) -> Result<FileMetaData> {
1011    from.reset();
1012    let filename = table_file_name(dbname.as_ref(), num);
1013
1014    let (mut kbuf, mut vbuf) = (vec![], vec![]);
1015    let mut firstkey = None;
1016    // lastkey is what remains in kbuf.
1017
1018    // Clean up file if write fails at any point.
1019    //
1020    // TODO: Replace with catch {} when available.
1021    let r = (|| -> Result<()> {
1022        let f = opt.env.open_writable_file(Path::new(&filename))?;
1023        let f = BufWriter::new(f);
1024        let mut builder = TableBuilder::new(opt.clone(), f);
1025        while from.advance() {
1026            assert!(from.current(&mut kbuf, &mut vbuf));
1027            if firstkey.is_none() {
1028                firstkey = Some(kbuf.clone());
1029            }
1030            builder.add(&kbuf, &vbuf)?;
1031        }
1032        builder.finish()?;
1033        Ok(())
1034    })();
1035
1036    if let Err(e) = r {
1037        let _ = opt.env.delete(Path::new(&filename));
1038        return Err(e);
1039    }
1040
1041    let mut md = FileMetaData::default();
1042    match firstkey {
1043        None => {
1044            let _ = opt.env.delete(Path::new(&filename));
1045        }
1046        Some(key) => {
1047            md.num = num;
1048            md.size = opt.env.size_of(Path::new(&filename))?;
1049            md.smallest = key;
1050            md.largest = kbuf;
1051        }
1052    }
1053    Ok(md)
1054}
1055
1056fn log_file_name(db: &Path, num: FileNum) -> PathBuf {
1057    db.join(format!("{:06}.log", num))
1058}
1059
/// Returns the path of the `LOCK` file in database directory `db`.
fn lock_file_name(db: &Path) -> PathBuf {
    let mut path = PathBuf::from(db);
    path.push("LOCK");
    path
}
1063
1064/// open_info_log opens an info log file in the given database. It transparently returns a
1065/// /dev/null logger in case the open fails.
1066fn open_info_log<E: Env + ?Sized, P: AsRef<Path>>(env: &E, db: P) -> Logger {
1067    let db = db.as_ref();
1068    let logfilename = db.join("LOG");
1069    let oldlogfilename = db.join("LOG.old");
1070    let _ = env.mkdir(Path::new(db));
1071    if let Ok(e) = env.exists(Path::new(&logfilename)) {
1072        if e {
1073            let _ = env.rename(Path::new(&logfilename), Path::new(&oldlogfilename));
1074        }
1075    }
1076    if let Ok(w) = env.open_writable_file(Path::new(&logfilename)) {
1077        Logger(w)
1078    } else {
1079        Logger(Box::new(io::sink()))
1080    }
1081}
1082
#[cfg(test)]
pub mod testutil {
    use super::*;

    use crate::version::testutil::make_version;

    /// build_db creates a database filled with the tables created by make_version().
    ///
    /// A manifest describing those tables is written by hand and made current, and the
    /// database is then opened from it. Returns the open database together with the options
    /// it was opened with.
    pub fn build_db() -> (DB, Options) {
        let name = "db";
        let (version, mut opt) = make_version();
        opt.reuse_logs = false;
        opt.reuse_manifest = false;

        // Describe the prepared version in a single edit.
        let mut edit = VersionEdit::new();
        edit.set_comparator_name(opt.cmp.id());
        edit.set_log_num(0);
        // 9 files + 1 manifest we write below.
        edit.set_next_file(11);
        // 30 entries in these tables.
        edit.set_last_seq(30);
        for (level, files) in version.files.iter().enumerate().take(NUM_LEVELS) {
            for f in files {
                edit.add_file(level, f.borrow().clone());
            }
        }

        // Persist the edit as manifest number 10 and point CURRENT at it.
        let manifest_path = manifest_file_name(name, 10);
        let mut lw = LogWriter::new(
            opt.env
                .open_writable_file(Path::new(&manifest_path))
                .unwrap(),
        );
        lw.add_record(&edit.encode()).unwrap();
        lw.flush().unwrap();
        set_current_file(&opt.env, name, 10).unwrap();

        let db = DB::open(name, opt.clone()).unwrap();
        (db, opt)
    }

    /// set_file_to_compact ensures that the specified table file will be compacted next.
    ///
    /// Panics if the current version contains no file with number `num`.
    pub fn set_file_to_compact(db: &mut DB, num: FileNum) {
        let current = db.current();
        let mut version = current.borrow_mut();

        // Scan all levels; if the number occurs more than once, the last occurrence wins.
        let mut found = None;
        for (level, files) in version.files.iter().enumerate().take(NUM_LEVELS) {
            for f in files {
                if f.borrow().num == num {
                    found = Some((f.clone(), level));
                }
            }
        }

        match found {
            Some((file, level)) => {
                version.file_to_compact = Some(file);
                version.file_to_compact_lvl = level;
            }
            None => panic!("file number not found"),
        }
    }
}
1140
1141#[cfg(test)]
1142mod tests {
1143    use super::testutil::{build_db, set_file_to_compact};
1144    use super::*;
1145
1146    use crate::error::Status;
1147    use crate::key_types::LookupKey;
1148    use crate::mem_env::MemEnv;
1149    use crate::options;
1150    use crate::test_util::LdbIteratorIter;
1151    use crate::version::testutil::make_version;
1152
1153    #[test]
1154    fn test_db_impl_open_info_log() {
1155        let e = MemEnv::new();
1156        {
1157            let l = Some(share(open_info_log(&e, "abc")));
1158            assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
1159            log!(l, "hello {}", "world");
1160            assert_eq!(12, e.size_of(&Path::new("abc").join("LOG")).unwrap());
1161        }
1162        {
1163            let l = Some(share(open_info_log(&e, "abc")));
1164            assert!(e.exists(&Path::new("abc").join("LOG.old")).unwrap());
1165            assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
1166            assert_eq!(12, e.size_of(&Path::new("abc").join("LOG.old")).unwrap());
1167            assert_eq!(0, e.size_of(&Path::new("abc").join("LOG")).unwrap());
1168            log!(l, "something else");
1169            log!(l, "and another {}", 1);
1170
1171            let mut s = String::new();
1172            let mut r = e
1173                .open_sequential_file(&Path::new("abc").join("LOG"))
1174                .unwrap();
1175            r.read_to_string(&mut s).unwrap();
1176            assert_eq!("something else\nand another 1\n", &s);
1177        }
1178    }
1179
1180    fn build_memtable() -> MemTable {
1181        let mut mt = MemTable::new(options::for_test().cmp);
1182        let mut i = 1;
1183        for k in ["abc", "def", "ghi", "jkl", "mno", "aabc", "test123"].iter() {
1184            mt.add(
1185                i,
1186                ValueType::TypeValue,
1187                k.as_bytes(),
1188                "looooongval".as_bytes(),
1189            );
1190            i += 1;
1191        }
1192        mt
1193    }
1194
1195    #[test]
1196    fn test_db_impl_init() {
1197        // A sanity check for recovery and basic persistence.
1198        let opt = options::for_test();
1199        let env = opt.env.clone();
1200
1201        // Several test cases with different options follow. The printlns can eventually be
1202        // removed.
1203
1204        {
1205            let mut opt = opt.clone();
1206            opt.reuse_manifest = false;
1207            let _ = DB::open("otherdb", opt.clone()).unwrap();
1208
1209            eprintln!(
1210                "children after: {:?}",
1211                env.children(Path::new("otherdb")).unwrap()
1212            );
1213            assert!(env.exists(&Path::new("otherdb").join("CURRENT")).unwrap());
1214            // Database is initialized and initial manifest reused.
1215            assert!(!env
1216                .exists(&Path::new("otherdb").join("MANIFEST-000001"))
1217                .unwrap());
1218            assert!(env
1219                .exists(&Path::new("otherdb").join("MANIFEST-000002"))
1220                .unwrap());
1221            assert!(env
1222                .exists(&Path::new("otherdb").join("000003.log"))
1223                .unwrap());
1224        }
1225
1226        {
1227            let mut opt = opt.clone();
1228            opt.reuse_manifest = true;
1229            let mut db = DB::open("db", opt.clone()).unwrap();
1230
1231            eprintln!(
1232                "children after: {:?}",
1233                env.children(&Path::new("db").join("")).unwrap()
1234            );
1235            assert!(env.exists(&Path::new("db").join("CURRENT")).unwrap());
1236            // Database is initialized and initial manifest reused.
1237            assert!(env
1238                .exists(&Path::new("db").join("MANIFEST-000001"))
1239                .unwrap());
1240            assert!(env.exists(&Path::new("db").join("LOCK")).unwrap());
1241            assert!(env.exists(&Path::new("db").join("000003.log")).unwrap());
1242
1243            db.put("abc".as_bytes(), "def".as_bytes()).unwrap();
1244            db.put("abd".as_bytes(), "def".as_bytes()).unwrap();
1245        }
1246
1247        {
1248            eprintln!(
1249                "children before: {:?}",
1250                env.children(&Path::new("db").join("")).unwrap()
1251            );
1252            let mut opt = opt.clone();
1253            opt.reuse_manifest = false;
1254            opt.reuse_logs = false;
1255            let mut db = DB::open("db", opt.clone()).unwrap();
1256
1257            eprintln!(
1258                "children after: {:?}",
1259                env.children(&Path::new("db").join("")).unwrap()
1260            );
1261            // Obsolete manifest is deleted.
1262            assert!(!env
1263                .exists(&Path::new("db").join("MANIFEST-000001"))
1264                .unwrap());
1265            // New manifest is created.
1266            assert!(env
1267                .exists(&Path::new("db").join("MANIFEST-000002"))
1268                .unwrap());
1269            // Obsolete log file is deleted.
1270            assert!(!env.exists(&Path::new("db").join("000003.log")).unwrap());
1271            // New L0 table has been added.
1272            assert!(env.exists(&Path::new("db").join("000003.ldb")).unwrap());
1273            assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
1274            // Check that entry exists and is correct. Phew, long call chain!
1275            let current = db.current();
1276            log!(opt.log, "files: {:?}", current.borrow().files);
1277            assert_eq!(
1278                "def".as_bytes(),
1279                current
1280                    .borrow_mut()
1281                    .get(LookupKey::new("abc".as_bytes(), 1).internal_key())
1282                    .unwrap()
1283                    .unwrap()
1284                    .0
1285                    .as_slice()
1286            );
1287            db.put("abe".as_bytes(), "def".as_bytes()).unwrap();
1288        }
1289
1290        {
1291            eprintln!(
1292                "children before: {:?}",
1293                env.children(Path::new("db")).unwrap()
1294            );
1295            // reuse_manifest above causes the old manifest to be deleted as obsolete, but no new
1296            // manifest is written. CURRENT becomes stale.
1297            let mut opt = opt.clone();
1298            opt.reuse_logs = true;
1299            let db = DB::open("db", opt).unwrap();
1300
1301            eprintln!(
1302                "children after: {:?}",
1303                env.children(Path::new("db")).unwrap()
1304            );
1305            assert!(!env
1306                .exists(&Path::new("db").join("MANIFEST-000001"))
1307                .unwrap());
1308            assert!(env
1309                .exists(&Path::new("db").join("MANIFEST-000002"))
1310                .unwrap());
1311            assert!(!env
1312                .exists(&Path::new("db").join("MANIFEST-000005"))
1313                .unwrap());
1314            assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
1315            // 000004 should be reused, no new log file should be created.
1316            assert!(!env.exists(&Path::new("db").join("000006.log")).unwrap());
1317            // Log is reused, so memtable should contain last written entry from above.
1318            assert_eq!(1, db.mem.len());
1319            assert_eq!(
1320                "def".as_bytes(),
1321                db.mem
1322                    .get(&LookupKey::new("abe".as_bytes(), 3))
1323                    .0
1324                    .unwrap()
1325                    .as_slice()
1326            );
1327        }
1328    }
1329
1330    #[test]
1331    fn test_db_impl_compact_range() {
1332        let (mut db, opt) = build_db();
1333        let env = &opt.env;
1334
1335        eprintln!(
1336            "children before: {:?}",
1337            env.children(&Path::new("db").join("")).unwrap()
1338        );
1339        db.compact_range(b"aaa", b"dba").unwrap();
1340        eprintln!(
1341            "children after: {:?}",
1342            env.children(&Path::new("db").join("")).unwrap()
1343        );
1344
1345        assert_eq!(
1346            250,
1347            opt.env
1348                .size_of(&Path::new("db").join("000007.ldb"))
1349                .unwrap()
1350        );
1351        assert_eq!(
1352            200,
1353            opt.env
1354                .size_of(&Path::new("db").join("000008.ldb"))
1355                .unwrap()
1356        );
1357        assert_eq!(
1358            200,
1359            opt.env
1360                .size_of(&Path::new("db").join("000009.ldb"))
1361                .unwrap()
1362        );
1363        assert_eq!(
1364            435,
1365            opt.env
1366                .size_of(&Path::new("db").join("000015.ldb"))
1367                .unwrap()
1368        );
1369
1370        assert!(!opt.env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1371        assert!(!opt.env.exists(&Path::new("db").join("000002.ldb")).unwrap());
1372        assert!(!opt.env.exists(&Path::new("db").join("000004.ldb")).unwrap());
1373        assert!(!opt.env.exists(&Path::new("db").join("000005.ldb")).unwrap());
1374        assert!(!opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1375        assert!(!opt.env.exists(&Path::new("db").join("000013.ldb")).unwrap());
1376        assert!(!opt.env.exists(&Path::new("db").join("000014.ldb")).unwrap());
1377
1378        assert_eq!(b"val1".to_vec(), db.get(b"aaa").unwrap());
1379        assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
1380        assert_eq!(b"val3".to_vec(), db.get(b"aba").unwrap());
1381        assert_eq!(b"val3".to_vec(), db.get(b"fab").unwrap());
1382    }
1383
1384    #[test]
1385    fn test_db_impl_compact_range_memtable() {
1386        let (mut db, opt) = build_db();
1387        let env = &opt.env;
1388
1389        db.put(b"xxx", b"123").unwrap();
1390
1391        eprintln!(
1392            "children before: {:?}",
1393            env.children(Path::new("db")).unwrap()
1394        );
1395        db.compact_range(b"aaa", b"dba").unwrap();
1396        eprintln!(
1397            "children after: {:?}",
1398            env.children(Path::new("db")).unwrap()
1399        );
1400
1401        assert_eq!(
1402            250,
1403            opt.env
1404                .size_of(&Path::new("db").join("000007.ldb"))
1405                .unwrap()
1406        );
1407        assert_eq!(
1408            200,
1409            opt.env
1410                .size_of(&Path::new("db").join("000008.ldb"))
1411                .unwrap()
1412        );
1413        assert_eq!(
1414            200,
1415            opt.env
1416                .size_of(&Path::new("db").join("000009.ldb"))
1417                .unwrap()
1418        );
1419        assert_eq!(
1420            182,
1421            opt.env
1422                .size_of(&Path::new("db").join("000014.ldb"))
1423                .unwrap()
1424        );
1425        assert_eq!(
1426            435,
1427            opt.env
1428                .size_of(&Path::new("db").join("000017.ldb"))
1429                .unwrap()
1430        );
1431
1432        assert!(!opt.env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1433        assert!(!opt.env.exists(&Path::new("db").join("000002.ldb")).unwrap());
1434        assert!(!opt.env.exists(&Path::new("db").join("000003.ldb")).unwrap());
1435        assert!(!opt.env.exists(&Path::new("db").join("000004.ldb")).unwrap());
1436        assert!(!opt.env.exists(&Path::new("db").join("000005.ldb")).unwrap());
1437        assert!(!opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1438        assert!(!opt.env.exists(&Path::new("db").join("000015.ldb")).unwrap());
1439        assert!(!opt.env.exists(&Path::new("db").join("000016.ldb")).unwrap());
1440
1441        assert_eq!(b"val1".to_vec(), db.get(b"aaa").unwrap());
1442        assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
1443        assert_eq!(b"val3".to_vec(), db.get(b"aba").unwrap());
1444        assert_eq!(b"val3".to_vec(), db.get(b"fab").unwrap());
1445        assert_eq!(b"123".to_vec(), db.get(b"xxx").unwrap());
1446    }
1447
1448    #[allow(unused_variables)]
1449    #[test]
1450    fn test_db_impl_locking() {
1451        let opt = options::for_test();
1452        let db = DB::open("db", opt.clone()).unwrap();
1453        let want_err = Status::new(
1454            StatusCode::LockError,
1455            "database lock is held by another instance",
1456        );
1457        assert_eq!(want_err, DB::open("db", opt.clone()).err().unwrap());
1458    }
1459
1460    #[test]
1461    fn test_db_impl_build_table() {
1462        let mut opt = options::for_test();
1463        opt.block_size = 128;
1464        let mt = build_memtable();
1465
1466        let f = build_table("db", &opt, mt.iter(), 123).unwrap();
1467        let path = &Path::new("db").join("000123.ldb");
1468
1469        assert_eq!(
1470            LookupKey::new("aabc".as_bytes(), 6).internal_key(),
1471            f.smallest.as_slice()
1472        );
1473        assert_eq!(
1474            LookupKey::new("test123".as_bytes(), 7).internal_key(),
1475            f.largest.as_slice()
1476        );
1477        assert_eq!(379, f.size);
1478        assert_eq!(123, f.num);
1479        assert!(opt.env.exists(path).unwrap());
1480
1481        {
1482            // Read table back in.
1483            let mut tc = TableCache::new("db", opt.clone(), 100);
1484            let tbl = tc.get_table(123).unwrap();
1485            assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut tbl.iter()).count());
1486        }
1487
1488        {
1489            // Corrupt table; make sure it doesn't load fully.
1490            let mut buf = vec![];
1491            opt.env
1492                .open_sequential_file(path)
1493                .unwrap()
1494                .read_to_end(&mut buf)
1495                .unwrap();
1496            buf[150] += 1;
1497            opt.env
1498                .open_writable_file(path)
1499                .unwrap()
1500                .write_all(&buf)
1501                .unwrap();
1502
1503            let mut tc = TableCache::new("db", opt.clone(), 100);
1504            let tbl = tc.get_table(123).unwrap();
1505            // The last two entries are skipped due to the corruption above.
1506            assert_eq!(
1507                5,
1508                LdbIteratorIter::wrap(&mut tbl.iter())
1509                    .map(|v| eprintln!("{:?}", v))
1510                    .count()
1511            );
1512        }
1513    }
1514
1515    #[allow(unused_variables)]
1516    #[test]
1517    fn test_db_impl_build_db_sanity() {
1518        let db = build_db().0;
1519        let env = &db.opt.env;
1520        let name = &db.name;
1521
1522        assert!(env.exists(Path::new(&log_file_name(name, 12))).unwrap());
1523    }
1524
1525    #[test]
1526    fn test_db_impl_get_from_table_with_snapshot() {
1527        let mut db = build_db().0;
1528
1529        assert_eq!(30, db.vset.borrow().last_seq);
1530
1531        // seq = 31
1532        db.put("xyy".as_bytes(), "123".as_bytes()).unwrap();
1533        let old_ss = db.get_snapshot();
1534        // seq = 32
1535        db.put("xyz".as_bytes(), "123".as_bytes()).unwrap();
1536        db.flush().unwrap();
1537        assert!(db.get_at(&old_ss, "xyy".as_bytes()).unwrap().is_some());
1538        assert!(db.get_at(&old_ss, "xyz".as_bytes()).unwrap().is_none());
1539
1540        // memtable get
1541        assert_eq!(
1542            "123".as_bytes(),
1543            db.get("xyz".as_bytes()).unwrap().as_slice()
1544        );
1545        assert!(db.get_internal(31, "xyy".as_bytes()).unwrap().is_some());
1546        assert!(db.get_internal(32, "xyy".as_bytes()).unwrap().is_some());
1547
1548        assert!(db.get_internal(31, "xyz".as_bytes()).unwrap().is_none());
1549        assert!(db.get_internal(32, "xyz".as_bytes()).unwrap().is_some());
1550
1551        // table get
1552        assert_eq!(
1553            "val2".as_bytes(),
1554            db.get("eab".as_bytes()).unwrap().as_slice()
1555        );
1556        assert!(db.get_internal(3, "eab".as_bytes()).unwrap().is_none());
1557        assert!(db.get_internal(32, "eab".as_bytes()).unwrap().is_some());
1558
1559        {
1560            let ss = db.get_snapshot();
1561            assert_eq!(
1562                "val2".as_bytes(),
1563                db.get_at(&ss, "eab".as_bytes())
1564                    .unwrap()
1565                    .unwrap()
1566                    .as_slice()
1567            );
1568        }
1569
1570        // from table.
1571        assert_eq!(
1572            "val2".as_bytes(),
1573            db.get("cab".as_bytes()).unwrap().as_slice()
1574        );
1575    }
1576
1577    #[test]
1578    fn test_db_impl_delete() {
1579        let mut db = build_db().0;
1580
1581        db.put(b"xyy", b"123").unwrap();
1582        db.put(b"xyz", b"123").unwrap();
1583
1584        assert!(db.get(b"xyy").is_some());
1585        assert!(db.get(b"gaa").is_some());
1586
1587        // Delete one memtable entry and one table entry.
1588        db.delete(b"xyy").unwrap();
1589        db.delete(b"gaa").unwrap();
1590
1591        assert!(db.get(b"xyy").is_none());
1592        assert!(db.get(b"gaa").is_none());
1593        assert!(db.get(b"xyz").is_some());
1594    }
1595
1596    #[test]
1597    fn test_db_impl_compact_single_file() {
1598        let mut db = build_db().0;
1599        set_file_to_compact(&mut db, 4);
1600        db.maybe_do_compaction().unwrap();
1601
1602        let env = &db.opt.env;
1603        let name = &db.name;
1604        assert!(!env.exists(Path::new(&table_file_name(name, 3))).unwrap());
1605        assert!(!env.exists(Path::new(&table_file_name(name, 4))).unwrap());
1606        assert!(!env.exists(Path::new(&table_file_name(name, 5))).unwrap());
1607        assert!(env.exists(Path::new(&table_file_name(name, 13))).unwrap());
1608    }
1609
1610    #[test]
1611    fn test_db_impl_compaction_trivial_move() {
1612        let mut db = DB::open("db", options::for_test()).unwrap();
1613
1614        db.put("abc".as_bytes(), "xyz".as_bytes()).unwrap();
1615        db.put("ab3".as_bytes(), "xyz".as_bytes()).unwrap();
1616        db.put("ab0".as_bytes(), "xyz".as_bytes()).unwrap();
1617        db.put("abz".as_bytes(), "xyz".as_bytes()).unwrap();
1618        assert_eq!(4, db.mem.len());
1619        let mut imm = MemTable::new(db.opt.cmp.clone());
1620        mem::swap(&mut imm, &mut db.mem);
1621        db.imm = Some(imm);
1622        db.compact_memtable().unwrap();
1623
1624        eprintln!(
1625            "children after: {:?}",
1626            db.opt.env.children(Path::new("db")).unwrap()
1627        );
1628        assert!(db
1629            .opt
1630            .env
1631            .exists(&Path::new("db").join("000004.ldb"))
1632            .unwrap());
1633
1634        {
1635            let v = db.current();
1636            let mut v = v.borrow_mut();
1637            v.file_to_compact = Some(v.files[2][0].clone());
1638            v.file_to_compact_lvl = 2;
1639        }
1640
1641        db.maybe_do_compaction().unwrap();
1642
1643        {
1644            let v = db.current();
1645            let v = v.borrow_mut();
1646            assert_eq!(1, v.files[3].len());
1647        }
1648    }
1649
1650    #[test]
1651    fn test_db_impl_memtable_compaction() {
1652        let mut opt = options::for_test();
1653        opt.write_buffer_size = 25;
1654        let mut db = DB::new("db", opt);
1655
1656        // Fill up memtable.
1657        db.mem = build_memtable();
1658
1659        // Trigger memtable compaction.
1660        db.make_room_for_write(true).unwrap();
1661        assert_eq!(0, db.mem.len());
1662        assert!(db
1663            .opt
1664            .env
1665            .exists(&Path::new("db").join("000002.log"))
1666            .unwrap());
1667        assert!(db
1668            .opt
1669            .env
1670            .exists(&Path::new("db").join("000003.ldb"))
1671            .unwrap());
1672        assert_eq!(
1673            351,
1674            db.opt
1675                .env
1676                .size_of(&Path::new("db").join("000003.ldb"))
1677                .unwrap()
1678        );
1679        assert_eq!(
1680            7,
1681            LdbIteratorIter::wrap(&mut db.cache.borrow_mut().get_table(3).unwrap().iter()).count()
1682        );
1683    }
1684
1685    #[test]
1686    fn test_db_impl_compaction() {
1687        let mut db = build_db().0;
1688        let v = db.current();
1689        v.borrow_mut().compaction_score = Some(2.0);
1690        v.borrow_mut().compaction_level = Some(1);
1691
1692        db.maybe_do_compaction().unwrap();
1693
1694        assert!(!db
1695            .opt
1696            .env
1697            .exists(&Path::new("db").join("000003.ldb"))
1698            .unwrap());
1699        assert!(db
1700            .opt
1701            .env
1702            .exists(&Path::new("db").join("000013.ldb"))
1703            .unwrap());
1704        assert_eq!(
1705            345,
1706            db.opt
1707                .env
1708                .size_of(&Path::new("db").join("000013.ldb"))
1709                .unwrap()
1710        );
1711
1712        // New current version.
1713        let v = db.current();
1714        assert_eq!(0, v.borrow().files[1].len());
1715        assert_eq!(2, v.borrow().files[2].len());
1716    }
1717
1718    #[test]
1719    fn test_db_impl_compaction_trivial() {
1720        let (mut v, opt) = make_version();
1721
1722        let to_compact = v.files[2][0].clone();
1723        v.file_to_compact = Some(to_compact);
1724        v.file_to_compact_lvl = 2;
1725
1726        let mut db = DB::new("db", opt.clone());
1727        db.vset.borrow_mut().add_version(v);
1728        db.vset.borrow_mut().next_file_num = 10;
1729
1730        db.maybe_do_compaction().unwrap();
1731
1732        assert!(opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1733        assert!(!opt.env.exists(&Path::new("db").join("000010.ldb")).unwrap());
1734        assert_eq!(
1735            218,
1736            opt.env
1737                .size_of(&Path::new("db").join("000006.ldb"))
1738                .unwrap()
1739        );
1740
1741        let v = db.current();
1742        assert_eq!(1, v.borrow().files[2].len());
1743        assert_eq!(3, v.borrow().files[3].len());
1744    }
1745
1746    #[test]
1747    fn test_db_impl_compaction_state_cleanup() {
1748        let env: Box<dyn Env> = Box::new(MemEnv::new());
1749        let name = "db";
1750
1751        let stuff = "abcdefghijkl".as_bytes();
1752        env.open_writable_file(&Path::new("db").join("000001.ldb"))
1753            .unwrap()
1754            .write_all(stuff)
1755            .unwrap();
1756        let mut fmd = FileMetaData::default();
1757        fmd.num = 1;
1758
1759        let mut cs = CompactionState::new(Compaction::new(&options::for_test(), 2, None), 12);
1760        cs.outputs = vec![fmd];
1761        cs.cleanup(&env, name);
1762
1763        assert!(!env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1764    }
1765
1766    #[test]
1767    fn test_db_impl_open_close_reopen() {
1768        let opt;
1769        {
1770            let mut db = build_db().0;
1771            opt = db.opt.clone();
1772            db.put(b"xx1", b"111").unwrap();
1773            db.put(b"xx2", b"112").unwrap();
1774            db.put(b"xx3", b"113").unwrap();
1775            db.put(b"xx4", b"114").unwrap();
1776            db.put(b"xx5", b"115").unwrap();
1777            db.delete(b"xx2").unwrap();
1778        }
1779
1780        {
1781            let mut db = DB::open("db", opt.clone()).unwrap();
1782            db.delete(b"xx5").unwrap();
1783        }
1784
1785        {
1786            let mut db = DB::open("db", opt.clone()).unwrap();
1787
1788            assert_eq!(None, db.get(b"xx5"));
1789
1790            let ss = db.get_snapshot();
1791            db.put(b"xx4", b"222").unwrap();
1792            let ss2 = db.get_snapshot();
1793
1794            assert_eq!(Some(b"113".to_vec()), db.get_at(&ss, b"xx3").unwrap());
1795            assert_eq!(None, db.get_at(&ss, b"xx2").unwrap());
1796            assert_eq!(None, db.get_at(&ss, b"xx5").unwrap());
1797
1798            assert_eq!(Some(b"114".to_vec()), db.get_at(&ss, b"xx4").unwrap());
1799            assert_eq!(Some(b"222".to_vec()), db.get_at(&ss2, b"xx4").unwrap());
1800        }
1801
1802        {
1803            let mut db = DB::open("db", opt).unwrap();
1804
1805            let ss = db.get_snapshot();
1806            assert_eq!(Some(b"113".to_vec()), db.get_at(&ss, b"xx3").unwrap());
1807            assert_eq!(Some(b"222".to_vec()), db.get_at(&ss, b"xx4").unwrap());
1808            assert_eq!(None, db.get_at(&ss, b"xx2").unwrap());
1809        }
1810    }
1811
1812    #[test]
1813    fn test_drop_memtable() {
1814        let mut db = DB::open("db", options::for_test()).unwrap();
1815        let mut cnt = 0;
1816        let mut buf: Vec<u8> = Vec::with_capacity(8);
1817        let entries_per_batch = 1;
1818        let max_num = 100000;
1819        while cnt < max_num {
1820            let mut write_batch = WriteBatch::new();
1821            for i in 0..entries_per_batch {
1822                buf.clear();
1823                buf.extend_from_slice(format!("{}-{}", cnt, i).as_bytes());
1824                write_batch.put(buf.as_slice(), buf.as_slice());
1825            }
1826            db.write(write_batch, false).unwrap();
1827            cnt += entries_per_batch;
1828        }
1829    }
1830}