rusty_leveldb/
db_impl.rs

1//! db_impl contains the implementation of the database interface and high-level compaction and
2//! maintenance logic.
3
4#![allow(unused_attributes)]
5
6use crate::cache::Cache;
7use crate::db_iter::DBIterator;
8
9use crate::cmp::{Cmp, InternalKeyCmp};
10use crate::env::{Env, FileLock};
11use crate::error::{err, Result, StatusCode};
12use crate::filter::{BoxedFilterPolicy, InternalFilterPolicy};
13use crate::infolog::Logger;
14use crate::key_types::{parse_internal_key, InternalKey, LookupKey, ValueType};
15use crate::log::{LogReader, LogWriter};
16use crate::memtable::MemTable;
17use crate::merging_iter::MergingIter;
18use crate::options::Options;
19use crate::snapshot::{Snapshot, SnapshotList};
20use crate::table_builder::TableBuilder;
21use crate::table_cache::{table_file_name, TableCache};
22use crate::types::{
23    parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, SequenceNumber, Shared,
24    MAX_SEQUENCE_NUMBER, NUM_LEVELS,
25};
26use crate::version::Version;
27use crate::version_edit::VersionEdit;
28use crate::version_set::{
29    manifest_file_name, read_current_file, set_current_file, Compaction, VersionSet,
30};
31use crate::write_batch::WriteBatch;
32
33use bytes::Bytes;
34use std::cmp::Ordering;
35use std::io::{self, BufWriter, Write};
36use std::mem;
37use std::ops::Drop;
38use std::path::Path;
39use std::path::PathBuf;
40use std::rc::Rc;
41
42/// DB contains the actual database implemenation. As opposed to the original, this implementation
43/// is not concurrent (yet).
44pub struct DB {
45    name: PathBuf,
46    path: PathBuf,
47    lock: Option<FileLock>,
48
49    internal_cmp: Rc<Box<dyn Cmp>>,
50    fpol: InternalFilterPolicy<BoxedFilterPolicy>,
51    opt: Options,
52
53    mem: MemTable,
54    imm: Option<MemTable>,
55
56    log: Option<LogWriter<BufWriter<Box<dyn Write>>>>,
57    log_num: Option<FileNum>,
58    cache: Shared<TableCache>,
59    vset: Shared<VersionSet>,
60    snaps: SnapshotList,
61
62    cstats: [CompactionStats; NUM_LEVELS],
63}
64
65unsafe impl Send for DB {}
66
67impl DB {
68    // RECOVERY AND INITIALIZATION //
69
70    /// new initializes a new DB object, but doesn't touch disk.
71    fn new<P: AsRef<Path>>(name: P, mut opt: Options) -> DB {
72        let name = name.as_ref();
73        if opt.log.is_none() {
74            let log = open_info_log(opt.env.as_ref().as_ref(), name);
75            opt.log = Some(share(log));
76        }
77        let path = name.canonicalize().unwrap_or(name.to_owned());
78
79        let cache = share(TableCache::new(
80            name,
81            opt.clone(),
82            share(Cache::new(opt.block_cache_capacity_bytes / opt.block_size)),
83            opt.max_open_files - 10,
84        ));
85        let vset = VersionSet::new(name, opt.clone(), cache.clone());
86
87        DB {
88            name: name.to_owned(),
89            path,
90            lock: None,
91            internal_cmp: Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))),
92            fpol: InternalFilterPolicy::new(opt.filter_policy.clone()),
93
94            mem: MemTable::new(opt.cmp.clone()),
95            imm: None,
96
97            opt,
98
99            log: None,
100            log_num: None,
101            cache,
102            vset: share(vset),
103            snaps: SnapshotList::new(),
104
105            cstats: Default::default(),
106        }
107    }
108
109    fn current(&self) -> Shared<Version> {
110        self.vset.borrow().current()
111    }
112
113    /// Opens or creates a new or existing database. `name` is the name of the directory containing
114    /// the database.
115    ///
116    /// Whether a new database is created and what happens if a database exists at the given path
117    /// depends on the options set (`create_if_missing`, `error_if_exists`).
118    pub fn open<P: AsRef<Path>>(name: P, opt: Options) -> Result<DB> {
119        let name = name.as_ref();
120        let mut db = DB::new(name, opt);
121        let mut ve = VersionEdit::new();
122        let save_manifest = db.recover(&mut ve)?;
123
124        // Create log file if an old one is not being reused.
125        if db.log.is_none() {
126            let lognum = db.vset.borrow_mut().new_file_number();
127            let logfile = db
128                .opt
129                .env
130                .open_writable_file(Path::new(&log_file_name(&db.name, lognum)))?;
131            ve.set_log_num(lognum);
132            db.log = Some(LogWriter::new(BufWriter::new(logfile)));
133            db.log_num = Some(lognum);
134        }
135
136        if save_manifest {
137            ve.set_log_num(db.log_num.unwrap_or(0));
138            db.vset.borrow_mut().log_and_apply(ve)?;
139        }
140
141        db.delete_obsolete_files()?;
142        db.maybe_do_compaction()?;
143        Ok(db)
144    }
145
146    /// initialize_db initializes a new database.
147    fn initialize_db(&mut self) -> Result<()> {
148        let mut ve = VersionEdit::new();
149        ve.set_comparator_name(self.opt.cmp.id());
150        ve.set_log_num(0);
151        ve.set_next_file(2);
152        ve.set_last_seq(0);
153
154        {
155            let manifest = manifest_file_name(&self.path, 1);
156            let manifest_file = self.opt.env.open_writable_file(Path::new(&manifest))?;
157            let mut lw = LogWriter::new(manifest_file);
158            lw.add_record(&ve.encode())?;
159            lw.flush()?;
160        }
161        set_current_file(self.opt.env.as_ref().as_ref(), &self.path, 1)
162    }
163
164    /// recover recovers from the existing state on disk. If the wrapped result is `true`, then
165    /// log_and_apply() should be called after recovery has finished.
166    fn recover(&mut self, ve: &mut VersionEdit) -> Result<bool> {
167        if self.opt.error_if_exists && self.opt.env.exists(self.path.as_ref()).unwrap_or(false) {
168            return err(StatusCode::AlreadyExists, "database already exists");
169        }
170
171        let _ = self.opt.env.mkdir(Path::new(&self.path));
172        self.acquire_lock()?;
173
174        if let Err(e) = read_current_file(self.opt.env.as_ref().as_ref(), &self.path) {
175            if e.code == StatusCode::NotFound && self.opt.create_if_missing {
176                self.initialize_db()?;
177            } else {
178                return err(
179                    StatusCode::InvalidArgument,
180                    "database does not exist and create_if_missing is false",
181                );
182            }
183        }
184
185        // If save_manifest is true, we should log_and_apply() later in order to write the new
186        // manifest.
187        let mut save_manifest = self.vset.borrow_mut().recover()?;
188
189        // Recover from all log files not in the descriptor.
190        let mut max_seq = 0;
191        let filenames = self.opt.env.children(&self.path)?;
192        let mut expected = self.vset.borrow().live_files();
193        let mut log_files = vec![];
194
195        for file in &filenames {
196            if let Ok((num, typ)) = parse_file_name(file) {
197                expected.remove(&num);
198                if typ == FileType::Log
199                    && (num >= self.vset.borrow().log_num || num == self.vset.borrow().prev_log_num)
200                {
201                    log_files.push(num);
202                }
203            }
204        }
205        if !expected.is_empty() {
206            log!(self.opt.log, "Missing at least these files: {:?}", expected);
207            return err(StatusCode::Corruption, "missing live files (see log)");
208        }
209
210        log_files.sort();
211        for i in 0..log_files.len() {
212            let (save_manifest_, max_seq_) =
213                self.recover_log_file(log_files[i], i == log_files.len() - 1, ve)?;
214            if save_manifest_ {
215                save_manifest = true;
216            }
217            if max_seq_ > max_seq {
218                max_seq = max_seq_;
219            }
220            self.vset.borrow_mut().mark_file_number_used(log_files[i]);
221        }
222
223        if self.vset.borrow().last_seq < max_seq {
224            self.vset.borrow_mut().last_seq = max_seq;
225        }
226
227        Ok(save_manifest)
228    }
229
230    /// recover_log_file reads a single log file into a memtable, writing new L0 tables if
231    /// necessary. If is_last is true, it checks whether the log file can be reused, and sets up
232    /// the database's logging handles appropriately if that's the case.
233    fn recover_log_file(
234        &mut self,
235        log_num: FileNum,
236        is_last: bool,
237        ve: &mut VersionEdit,
238    ) -> Result<(bool, SequenceNumber)> {
239        let filename = log_file_name(&self.path, log_num);
240        let logfile = self.opt.env.open_sequential_file(Path::new(&filename))?;
241        // Use the user-supplied comparator; it will be wrapped inside a MemtableKeyCmp.
242        let cmp: Rc<Box<dyn Cmp>> = self.opt.cmp.clone();
243
244        let mut logreader = LogReader::new(
245            logfile, // checksum=
246            true,
247        );
248        log!(self.opt.log, "Recovering log file {:?}", filename);
249        let mut scratch = vec![];
250        let mut mem = MemTable::new(cmp.clone());
251        let mut batch = WriteBatch::new();
252
253        let mut compactions = 0;
254        let mut max_seq = 0;
255        let mut save_manifest = false;
256
257        while let Ok(len) = logreader.read(&mut scratch) {
258            if len == 0 {
259                break;
260            }
261            if len < 12 {
262                log!(
263                    self.opt.log,
264                    "corruption in log file {:06}: record shorter than 12B",
265                    log_num
266                );
267                continue;
268            }
269
270            batch.set_contents(&scratch);
271            batch.insert_into_memtable(batch.sequence(), &mut mem);
272
273            let last_seq = batch.sequence() + batch.count() as u64 - 1;
274            if last_seq > max_seq {
275                max_seq = last_seq
276            }
277            if mem.approx_mem_usage() > self.opt.write_buffer_size {
278                compactions += 1;
279                self.write_l0_table(&mem, ve, None)?;
280                save_manifest = true;
281                mem = MemTable::new(cmp.clone());
282            }
283            batch.clear();
284        }
285
286        // Check if we can reuse the last log file.
287        if self.opt.reuse_logs && is_last && compactions == 0 {
288            assert!(self.log.is_none());
289            log!(self.opt.log, "reusing log file {:?}", filename);
290            let oldsize = self.opt.env.size_of(Path::new(&filename))?;
291            let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
292            let lw = LogWriter::new_with_off(BufWriter::new(oldfile), oldsize);
293            self.log = Some(lw);
294            self.log_num = Some(log_num);
295            self.mem = mem;
296        } else if mem.len() > 0 {
297            // Log is not reused, so write out the accumulated memtable.
298            save_manifest = true;
299            self.write_l0_table(&mem, ve, None)?;
300        }
301
302        Ok((save_manifest, max_seq))
303    }
304
305    /// delete_obsolete_files removes files that are no longer needed from the file system.
306    fn delete_obsolete_files(&mut self) -> Result<()> {
307        let files = self.vset.borrow().live_files();
308        let filenames = self.opt.env.children(Path::new(&self.path))?;
309        for name in filenames {
310            if let Ok((num, typ)) = parse_file_name(&name) {
311                match typ {
312                    FileType::Log => {
313                        if num >= self.vset.borrow().log_num {
314                            continue;
315                        }
316                    }
317                    FileType::Descriptor => {
318                        if num >= self.vset.borrow().manifest_num {
319                            continue;
320                        }
321                    }
322                    FileType::Table => {
323                        if files.contains(&num) {
324                            continue;
325                        }
326                    }
327                    // NOTE: In this non-concurrent implementation, we likely never find temp
328                    // files.
329                    FileType::Temp => {
330                        if files.contains(&num) {
331                            continue;
332                        }
333                    }
334                    FileType::Current | FileType::DBLock | FileType::InfoLog => continue,
335                }
336
337                // If we're here, delete this file.
338                if typ == FileType::Table {
339                    let _ = self.cache.borrow_mut().evict(num);
340                }
341                log!(self.opt.log, "Deleting file type={:?} num={}", typ, num);
342                if let Err(e) = self.opt.env.delete(&self.path.join(&name)) {
343                    log!(self.opt.log, "Deleting file num={} failed: {}", num, e);
344                }
345            }
346        }
347        Ok(())
348    }
349
350    /// acquire_lock acquires the lock file.
351    fn acquire_lock(&mut self) -> Result<()> {
352        let lock_r = self.opt.env.lock(Path::new(&lock_file_name(&self.path)));
353        match lock_r {
354            Ok(lockfile) => {
355                self.lock = Some(lockfile);
356                Ok(())
357            }
358            Err(ref e) if e.code == StatusCode::LockError => err(
359                StatusCode::LockError,
360                "database lock is held by another instance",
361            ),
362            Err(e) => Err(e),
363        }
364    }
365
366    /// release_lock releases the lock file, if it's currently held.
367    fn release_lock(&mut self) -> Result<()> {
368        if let Some(l) = self.lock.take() {
369            self.opt.env.unlock(l)
370        } else {
371            Ok(())
372        }
373    }
374
375    /// Flush data to disk and release lock.
376    pub fn close(&mut self) -> Result<()> {
377        self.flush()?;
378        self.release_lock()?;
379        Ok(())
380    }
381}
382
383impl DB {
384    // WRITE //
385
386    /// Adds a single entry. It's a short, non-synchronous, form of `write()`; in order to make
387    /// sure that the written entry is on disk, call `flush()` afterwards.
388    pub fn put(&mut self, k: &[u8], v: &[u8]) -> Result<()> {
389        let mut wb = WriteBatch::new();
390        wb.put(k, v);
391        self.write(wb, false)
392    }
393
394    /// Deletes a single entry. Like with `put()`, you can call `flush()` to guarantee that
395    /// the operation made it to disk.
396    pub fn delete(&mut self, k: &[u8]) -> Result<()> {
397        let mut wb = WriteBatch::new();
398        wb.delete(k);
399        self.write(wb, false)
400    }
401
402    /// Writes an entire WriteBatch. `sync` determines whether the write should be flushed to
403    /// disk.
404    pub fn write(&mut self, batch: WriteBatch, sync: bool) -> Result<()> {
405        assert!(self.log.is_some());
406
407        self.make_room_for_write(false)?;
408
409        let entries = batch.count() as u64;
410        let log = self.log.as_mut().unwrap();
411        let next = self.vset.borrow().last_seq + 1;
412
413        batch.insert_into_memtable(next, &mut self.mem);
414        log.add_record(&batch.encode(next))?;
415        if sync {
416            log.flush()?;
417        }
418        self.vset.borrow_mut().last_seq += entries;
419        Ok(())
420    }
421
422    /// flush makes sure that all pending changes (e.g. from put()) are stored on disk.
423    pub fn flush(&mut self) -> Result<()> {
424        if let Some(ref mut log) = self.log.as_mut() {
425            log.flush()?;
426        }
427        Ok(())
428    }
429}
430
431impl DB {
432    // READ //
433
434    fn get_internal(&mut self, seq: SequenceNumber, key: &[u8]) -> Result<Option<Bytes>> {
435        // Using this lookup key will skip all entries with higher sequence numbers, because they
436        // will compare "Lesser" using the InternalKeyCmp
437        let lkey = LookupKey::new(key, seq);
438
439        match self.mem.get(&lkey) {
440            (Some(v), _) => return Ok(Some(v)),
441            // deleted entry
442            (None, true) => return Ok(None),
443            // not found entry
444            (None, false) => {}
445        }
446
447        if let Some(imm) = self.imm.as_ref() {
448            match imm.get(&lkey) {
449                (Some(v), _) => return Ok(Some(v)),
450                // deleted entry
451                (None, true) => return Ok(None),
452                // not found entry
453                (None, false) => {}
454            }
455        }
456
457        let mut do_compaction = false;
458        let mut result = None;
459
460        // Limiting the borrow scope of self.current.
461        {
462            let current = self.current();
463            let mut current = current.borrow_mut();
464            if let Ok(Some((v, st))) = current.get(lkey.internal_key()) {
465                if current.update_stats(st) {
466                    do_compaction = true;
467                }
468                result = Some(v)
469            }
470        }
471
472        if do_compaction {
473            if let Err(e) = self.maybe_do_compaction() {
474                log!(self.opt.log, "error while doing compaction in get: {}", e);
475            }
476        }
477        Ok(result)
478    }
479
480    /// get_at reads the value for a given key at or before snapshot. It returns Ok(None) if the
481    /// entry wasn't found, and Err(_) if an error occurred.
482    pub fn get_at(&mut self, snapshot: &Snapshot, key: &[u8]) -> Result<Option<Bytes>> {
483        self.get_internal(snapshot.sequence(), key)
484    }
485
486    /// get is a simplified version of get_at(), translating errors to None.
487    pub fn get(&mut self, key: &[u8]) -> Option<Bytes> {
488        let seq = self.vset.borrow().last_seq;
489        self.get_internal(seq, key).unwrap_or_default()
490    }
491}
492
493impl DB {
494    // ITERATOR //
495
496    /// new_iter returns a DBIterator over the current state of the database. The iterator will not
497    /// return elements added to the database after its creation.
498    pub fn new_iter(&mut self) -> Result<DBIterator> {
499        let snapshot = self.get_snapshot();
500        self.new_iter_at(snapshot)
501    }
502
503    /// new_iter_at returns a DBIterator at the supplied snapshot.
504    pub fn new_iter_at(&mut self, ss: Snapshot) -> Result<DBIterator> {
505        Ok(DBIterator::new(
506            self.opt.cmp.clone(),
507            self.vset.clone(),
508            self.merge_iterators()?,
509            ss,
510        ))
511    }
512
513    /// merge_iterators produces a MergingIter merging the entries in the memtable, the immutable
514    /// memtable, and table files from all levels.
515    fn merge_iterators(&mut self) -> Result<MergingIter> {
516        let mut iters: Vec<Box<dyn LdbIterator>> = vec![];
517        if self.mem.len() > 0 {
518            iters.push(Box::new(self.mem.iter()));
519        }
520        if let Some(ref imm) = self.imm {
521            if imm.len() > 0 {
522                iters.push(Box::new(imm.iter()));
523            }
524        }
525
526        // Add iterators for table files.
527        let current = self.current();
528        let current = current.borrow();
529        iters.extend(current.new_iters()?);
530
531        Ok(MergingIter::new(self.internal_cmp.clone(), iters))
532    }
533}
534
535impl DB {
536    // SNAPSHOTS //
537
538    /// Returns a snapshot at the current state. It can be used to retrieve entries from the
539    /// database as they were at an earlier point in time.
540    pub fn get_snapshot(&mut self) -> Snapshot {
541        self.snaps.new_snapshot(self.vset.borrow().last_seq)
542    }
543}
544
545impl DB {
546    // STATISTICS //
547    fn add_stats(&mut self, level: usize, cs: CompactionStats) {
548        assert!(level < NUM_LEVELS);
549        self.cstats[level].add(cs);
550    }
551
552    /// Trigger a compaction based on where this key is located in the different levels.
553    fn record_read_sample(&mut self, k: InternalKey<'_>) {
554        let current = self.current();
555        if current.borrow_mut().record_read_sample(k) {
556            if let Err(e) = self.maybe_do_compaction() {
557                log!(self.opt.log, "record_read_sample: compaction failed: {}", e);
558            }
559        }
560    }
561}
562
563impl DB {
564    // COMPACTIONS //
565
566    /// make_room_for_write checks if the memtable has become too large, and triggers a compaction
567    /// if it's the case.
568    fn make_room_for_write(&mut self, force: bool) -> Result<()> {
569        if !force && self.mem.approx_mem_usage() < self.opt.write_buffer_size || self.mem.len() == 0
570        {
571            Ok(())
572        } else {
573            // Create new memtable.
574            let logn = self.vset.borrow_mut().new_file_number();
575            let logf = self
576                .opt
577                .env
578                .open_writable_file(Path::new(&log_file_name(&self.path, logn)));
579
580            if let Ok(log) = logf {
581                self.log = Some(LogWriter::new(BufWriter::new(log)));
582                self.log_num = Some(logn);
583
584                let mut imm = MemTable::new(self.opt.cmp.clone());
585                mem::swap(&mut imm, &mut self.mem);
586                self.imm = Some(imm);
587                self.maybe_do_compaction()
588            } else {
589                self.vset.borrow_mut().reuse_file_number(logn);
590                Err(logf.err().unwrap())
591            }
592        }
593    }
594
595    /// maybe_do_compaction starts a blocking compaction if it makes sense.
596    fn maybe_do_compaction(&mut self) -> Result<()> {
597        if self.imm.is_some() {
598            self.compact_memtable()?;
599        }
600        // Issue #34 PR #36: after compacting a memtable into an L0 file, it is possible that the
601        // L0 files need to be merged and promoted.
602        if self.vset.borrow().needs_compaction() {
603            let c = self.vset.borrow_mut().pick_compaction();
604            if let Some(c) = c {
605                self.start_compaction(c)
606            } else {
607                Ok(())
608            }
609        } else {
610            Ok(())
611        }
612    }
613
614    /// compact_range triggers an immediate compaction on the specified key range. Repeatedly
615    /// calling this without actually adding new keys is not useful.
616    ///
617    /// Compactions in general will cause the database to find entries more quickly, and take up
618    /// less space on disk.
619    pub fn compact_range(&mut self, from: &[u8], to: &[u8]) -> Result<()> {
620        let mut max_level = 1;
621        {
622            let v = self.vset.borrow().current();
623            let v = v.borrow();
624            for l in 1..NUM_LEVELS - 1 {
625                if v.overlap_in_level(l, from, to) {
626                    max_level = l;
627                }
628            }
629        }
630
631        // Compact memtable.
632        self.make_room_for_write(true)?;
633
634        let mut ifrom = LookupKey::new(from, MAX_SEQUENCE_NUMBER)
635            .internal_key()
636            .to_vec();
637        let iend = LookupKey::new_full(to, 0, ValueType::TypeDeletion);
638
639        for l in 0..max_level + 1 {
640            loop {
641                let c_ = self
642                    .vset
643                    .borrow_mut()
644                    .compact_range(l, &ifrom, iend.internal_key());
645                if let Some(c) = c_ {
646                    // Update ifrom to the largest key of the last file in this compaction.
647                    let ix = c.num_inputs(0) - 1;
648                    ifrom.clone_from(&c.input(0, ix).largest.into());
649                    self.start_compaction(c)?;
650                } else {
651                    break;
652                }
653            }
654        }
655        Ok(())
656    }
657
658    /// start_compaction dispatches the different kinds of compactions depending on the current
659    /// state of the database.
660    fn start_compaction(&mut self, mut compaction: Compaction) -> Result<()> {
661        if compaction.is_trivial_move() {
662            assert_eq!(1, compaction.num_inputs(0));
663            let f = compaction.input(0, 0);
664            let num = f.num;
665            let size = f.size;
666            let level = compaction.level();
667
668            compaction.edit().delete_file(level, num);
669            compaction.edit().add_file(level + 1, f);
670
671            let r = self.vset.borrow_mut().log_and_apply(compaction.into_edit());
672            if let Err(e) = r {
673                log!(self.opt.log, "trivial move failed: {}", e);
674                Err(e)
675            } else {
676                log!(
677                    self.opt.log,
678                    "Moved num={} bytes={} from L{} to L{}",
679                    num,
680                    size,
681                    level,
682                    level + 1
683                );
684                log!(
685                    self.opt.log,
686                    "Summary: {}",
687                    self.vset.borrow().current_summary()
688                );
689                Ok(())
690            }
691        } else {
692            let smallest = if self.snaps.empty() {
693                self.vset.borrow().last_seq
694            } else {
695                self.snaps.oldest()
696            };
697            let mut state = CompactionState::new(compaction, smallest);
698            if let Err(e) = self.do_compaction_work(&mut state) {
699                state.cleanup(self.opt.env.as_ref().as_ref(), &self.path);
700                log!(self.opt.log, "Compaction work failed: {}", e);
701            }
702            self.install_compaction_results(state)?;
703            log!(
704                self.opt.log,
705                "Compaction finished: {}",
706                self.vset.borrow().current_summary()
707            );
708
709            self.delete_obsolete_files()
710        }
711    }
712
713    fn compact_memtable(&mut self) -> Result<()> {
714        assert!(self.imm.is_some());
715
716        let mut ve = VersionEdit::new();
717        let base = self.current();
718
719        let imm = self.imm.take().unwrap();
720        if let Err(e) = self.write_l0_table(&imm, &mut ve, Some(&base.borrow())) {
721            self.imm = Some(imm);
722            return Err(e);
723        }
724        ve.set_log_num(self.log_num.unwrap_or(0));
725        self.vset.borrow_mut().log_and_apply(ve)?;
726        if let Err(e) = self.delete_obsolete_files() {
727            log!(self.opt.log, "Error deleting obsolete files: {}", e);
728        }
729        Ok(())
730    }
731
732    /// write_l0_table writes the given memtable to a table file.
733    fn write_l0_table(
734        &mut self,
735        memt: &MemTable,
736        ve: &mut VersionEdit,
737        base: Option<&Version>,
738    ) -> Result<()> {
739        let start_ts = self.opt.env.micros();
740        let num = self.vset.borrow_mut().new_file_number();
741        log!(self.opt.log, "Start write of L0 table {:06}", num);
742        let fmd = build_table(&self.path, &self.opt, memt.iter(), num)?;
743        log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size);
744
745        // Wrote empty table.
746        if fmd.size == 0 {
747            self.vset.borrow_mut().reuse_file_number(num);
748            return Ok(());
749        }
750
751        let cache_result = self.cache.borrow_mut().get_table(num);
752        if let Err(e) = cache_result {
753            log!(
754                self.opt.log,
755                "L0 table {:06} not returned by cache: {}",
756                num,
757                e
758            );
759            let _ = self
760                .opt
761                .env
762                .delete(Path::new(&table_file_name(&self.path, num)));
763            return Err(e);
764        }
765
766        let stats = CompactionStats {
767            micros: self.opt.env.micros() - start_ts,
768            written: fmd.size,
769            ..Default::default()
770        };
771
772        let mut level = 0;
773        if let Some(b) = base {
774            level = b.pick_memtable_output_level(
775                parse_internal_key(&fmd.smallest).2,
776                parse_internal_key(&fmd.largest).2,
777            );
778        }
779
780        self.add_stats(level, stats);
781        ve.add_file(level, fmd);
782
783        Ok(())
784    }
785
786    fn do_compaction_work(&mut self, cs: &mut CompactionState) -> Result<()> {
787        {
788            let current = self.vset.borrow().current();
789            assert!(current.borrow().num_level_files(cs.compaction.level()) > 0);
790            assert!(cs.builder.is_none());
791        }
792        let start_ts = self.opt.env.micros();
793        log!(
794            self.opt.log,
795            "Compacting {} files at L{} and {} files at L{}",
796            cs.compaction.num_inputs(0),
797            cs.compaction.level(),
798            cs.compaction.num_inputs(1),
799            cs.compaction.level() + 1
800        );
801
802        let mut input = self.vset.borrow().make_input_iterator(&cs.compaction);
803        input.seek_to_first();
804
805        let mut last_seq_for_key = MAX_SEQUENCE_NUMBER;
806
807        let mut have_ukey = false;
808        let mut current_ukey = vec![];
809
810        while input.valid() {
811            // TODO: Do we need to do a memtable compaction here? Probably not, in the sequential
812            // case.
813            let (key_bytes, val_bytes) = input.current().expect("Iterator should be valid here");
814
815            if cs.compaction.should_stop_before(&key_bytes) && cs.builder.is_some() {
816                self.finish_compaction_output(cs)?;
817            }
818            let (ktyp, seq, ukey) = parse_internal_key(&key_bytes);
819            if seq == 0 {
820                // Parsing failed.
821                log!(self.opt.log, "Encountered seq=0 in key: {:?}", &key_bytes);
822                last_seq_for_key = MAX_SEQUENCE_NUMBER;
823                have_ukey = false;
824                current_ukey.clear();
825                input.advance();
826                continue;
827            }
828
829            if !have_ukey || self.opt.cmp.cmp(ukey, &current_ukey) != Ordering::Equal {
830                // First occurrence of this key.
831                current_ukey.clear();
832                current_ukey.extend_from_slice(ukey);
833                have_ukey = true;
834                last_seq_for_key = MAX_SEQUENCE_NUMBER;
835            }
836
837            // We can omit the key under the following conditions:
838            if last_seq_for_key <= cs.smallest_seq {
839                last_seq_for_key = seq;
840                input.advance();
841                continue;
842            }
843            // Entry is deletion; no older version is observable by any snapshot; and all entries
844            // in compacted levels with smaller sequence numbers will
845            if ktyp == ValueType::TypeDeletion
846                && seq <= cs.smallest_seq
847                && cs.compaction.is_base_level_for(ukey)
848            {
849                last_seq_for_key = seq;
850                input.advance();
851                continue;
852            }
853
854            last_seq_for_key = seq;
855
856            if cs.builder.is_none() {
857                let fnum = self.vset.borrow_mut().new_file_number();
858                let fmd = FileMetaData {
859                    num: fnum,
860                    ..Default::default()
861                };
862
863                let fname = table_file_name(&self.path, fnum);
864                let f = self.opt.env.open_writable_file(Path::new(&fname))?;
865                let f = Box::new(BufWriter::new(f));
866                cs.builder = Some(TableBuilder::new(self.opt.clone(), f));
867                cs.outputs.push(fmd);
868            }
869            if cs.builder.as_ref().unwrap().entries() == 0 {
870                cs.current_output().smallest.clone_from(&key_bytes);
871            }
872            cs.current_output().largest.clone_from(&key_bytes);
873            cs.builder.as_mut().unwrap().add(&key_bytes, &val_bytes)?;
874            // NOTE: Adjust max file size based on level.
875            if cs.builder.as_ref().unwrap().size_estimate() > self.opt.max_file_size {
876                self.finish_compaction_output(cs)?;
877            }
878
879            input.advance();
880        }
881
882        if cs.builder.is_some() {
883            self.finish_compaction_output(cs)?;
884        }
885
886        let mut stats = CompactionStats {
887            micros: self.opt.env.micros() - start_ts,
888            ..Default::default()
889        };
890        for parent in 0..2 {
891            for inp in 0..cs.compaction.num_inputs(parent) {
892                stats.read += cs.compaction.input(parent, inp).size;
893            }
894        }
895        for output in &cs.outputs {
896            stats.written += output.size;
897        }
898        self.cstats[cs.compaction.level()].add(stats);
899        Ok(())
900    }
901
902    fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> {
903        assert!(cs.builder.is_some());
904        let output_num = cs.current_output().num;
905        assert!(output_num > 0);
906
907        // The original checks if the input iterator has an OK status. For this, we'd need to
908        // extend the LdbIterator interface though -- let's see if we can without for now.
909        // (it's not good for corruptions, in any case)
910        let b = cs.builder.take().unwrap();
911        let entries = b.entries();
912        let bytes = b.finish()?;
913        cs.total_bytes += bytes;
914
915        cs.current_output().size = bytes;
916
917        if entries > 0 {
918            // Verify that table can be used. (Separating get_table() because borrowing in an if
919            // let expression is dangerous).
920            let r = self.cache.borrow_mut().get_table(output_num);
921            if let Err(e) = r {
922                log!(self.opt.log, "New table can't be read: {}", e);
923                return Err(e);
924            }
925            log!(
926                self.opt.log,
927                "New table num={}: keys={} size={}",
928                output_num,
929                entries,
930                bytes
931            );
932        }
933        Ok(())
934    }
935
936    fn install_compaction_results(&mut self, mut cs: CompactionState) -> Result<()> {
937        log!(
938            self.opt.log,
939            "Compacted {} L{} files + {} L{} files => {}B",
940            cs.compaction.num_inputs(0),
941            cs.compaction.level(),
942            cs.compaction.num_inputs(1),
943            cs.compaction.level() + 1,
944            cs.total_bytes
945        );
946        cs.compaction.add_input_deletions();
947        let level = cs.compaction.level();
948        for output in &cs.outputs {
949            cs.compaction.edit().add_file(level + 1, output.clone());
950        }
951        self.vset
952            .borrow_mut()
953            .log_and_apply(cs.compaction.into_edit())
954    }
955}
956
957impl Drop for DB {
958    fn drop(&mut self) {
959        self.flush().ok();
960        let _ = self.release_lock();
961    }
962}
963
964struct CompactionState {
965    compaction: Compaction,
966    smallest_seq: SequenceNumber,
967    outputs: Vec<FileMetaData>,
968    builder: Option<TableBuilder<Box<dyn Write>>>,
969    total_bytes: usize,
970}
971
972impl CompactionState {
973    fn new(c: Compaction, smallest: SequenceNumber) -> CompactionState {
974        CompactionState {
975            compaction: c,
976            smallest_seq: smallest,
977            outputs: vec![],
978            builder: None,
979            total_bytes: 0,
980        }
981    }
982
983    fn current_output(&mut self) -> &mut FileMetaData {
984        let len = self.outputs.len();
985        &mut self.outputs[len - 1]
986    }
987
988    /// cleanup cleans up after an aborted compaction.
989    fn cleanup<P: AsRef<Path>>(&mut self, env: &dyn Env, name: P) {
990        for o in self.outputs.drain(..) {
991            let name = table_file_name(name.as_ref(), o.num);
992            let _ = env.delete(&name);
993        }
994    }
995}
996
/// Accumulated compaction statistics: time spent and bytes read/written.
#[derive(Debug, Default)]
struct CompactionStats {
    /// Time spent compacting, in microseconds, as measured with `env.micros()`.
    micros: u64,
    /// Total size of the compaction input tables.
    read: usize,
    /// Total size of the compaction output tables.
    written: usize,
}

impl CompactionStats {
    /// Adds every counter of `cs` onto the matching counter of `self`.
    fn add(&mut self, cs: CompactionStats) {
        let CompactionStats {
            micros,
            read,
            written,
        } = cs;
        self.micros += micros;
        self.read += read;
        self.written += written;
    }
}
1011
1012pub fn build_table<I: LdbIterator, P: AsRef<Path>>(
1013    dbname: P,
1014    opt: &Options,
1015    mut from: I,
1016    num: FileNum,
1017) -> Result<FileMetaData> {
1018    from.reset();
1019    let filename = table_file_name(dbname.as_ref(), num);
1020
1021    let (mut kbuf, mut vbuf) = (vec![], vec![]);
1022    let mut firstkey = None;
1023    // lastkey is what remains in kbuf.
1024
1025    // Clean up file if write fails at any point.
1026    //
1027    // TODO: Replace with catch {} when available.
1028    let r = (|| -> Result<()> {
1029        let f = opt.env.open_writable_file(Path::new(&filename))?;
1030        let f = BufWriter::new(f);
1031        let mut builder = TableBuilder::new(opt.clone(), f);
1032        while from.advance() {
1033            if let Some((key_bytes, val_bytes)) = from.current() {
1034                kbuf = key_bytes.to_vec();
1035                vbuf = val_bytes.to_vec();
1036                if firstkey.is_none() {
1037                    firstkey = Some(kbuf.clone());
1038                }
1039                builder.add(&kbuf, &vbuf)?;
1040            } else {
1041                panic!("Iterator should be valid here");
1042            }
1043        }
1044        builder.finish()?;
1045        Ok(())
1046    })();
1047
1048    if let Err(e) = r {
1049        let _ = opt.env.delete(Path::new(&filename));
1050        return Err(e);
1051    }
1052
1053    let mut md = FileMetaData::default();
1054    match firstkey {
1055        None => {
1056            let _ = opt.env.delete(Path::new(&filename));
1057        }
1058        Some(key) => {
1059            md.num = num;
1060            md.size = opt.env.size_of(Path::new(&filename))?;
1061            md.smallest = key.into();
1062            md.largest = kbuf.into();
1063        }
1064    }
1065    Ok(md)
1066}
1067
1068fn log_file_name(db: &Path, num: FileNum) -> PathBuf {
1069    db.join(format!("{:06}.log", num))
1070}
1071
/// Returns the path of the database's `LOCK` file inside directory `db`.
fn lock_file_name(db: &Path) -> PathBuf {
    let mut lock_path = db.to_path_buf();
    lock_path.push("LOCK");
    lock_path
}
1075
1076/// open_info_log opens an info log file in the given database. It transparently returns a
1077/// /dev/null logger in case the open fails.
1078fn open_info_log<E: Env + ?Sized, P: AsRef<Path>>(env: &E, db: P) -> Logger {
1079    let db = db.as_ref();
1080    let logfilename = db.join("LOG");
1081    let oldlogfilename = db.join("LOG.old");
1082    let _ = env.mkdir(Path::new(db));
1083    if let Ok(e) = env.exists(Path::new(&logfilename)) {
1084        if e {
1085            let _ = env.rename(Path::new(&logfilename), Path::new(&oldlogfilename));
1086        }
1087    }
1088    if let Ok(w) = env.open_writable_file(Path::new(&logfilename)) {
1089        Logger(w)
1090    } else {
1091        Logger(Box::new(io::sink()))
1092    }
1093}
1094
#[cfg(test)]
pub mod testutil {
    use super::*;

    use crate::version::testutil::make_version;

    /// build_db creates a database named "db" whose initial contents are the tables created by
    /// make_version().
    ///
    /// It writes manifest number 10 describing those tables, points CURRENT at it, and opens
    /// the database with log and manifest reuse disabled. Returns the database and the
    /// options it was opened with.
    pub fn build_db() -> (DB, Options) {
        let name = "db";
        let (version, mut opt) = make_version();
        opt.reuse_logs = false;
        opt.reuse_manifest = false;

        // Describe the prepared version as a single VersionEdit.
        let mut edit = VersionEdit::new();
        edit.set_comparator_name(opt.cmp.id());
        edit.set_log_num(0);
        // 9 table files plus the manifest written below.
        edit.set_next_file(11);
        // The tables contain 30 entries in total.
        edit.set_last_seq(30);
        for level in 0..NUM_LEVELS {
            for file in version.files[level].iter() {
                edit.add_file(level, file.borrow().clone());
            }
        }

        // Persist the edit as manifest 10 and make CURRENT refer to it.
        let manifest_path = manifest_file_name(name, 10);
        let mut writer =
            LogWriter::new(opt.env.open_writable_file(Path::new(&manifest_path)).unwrap());
        writer.add_record(&edit.encode()).unwrap();
        writer.flush().unwrap();
        set_current_file(opt.env.as_ref().as_ref(), name, 10).unwrap();

        let db = DB::open(name, opt.clone()).unwrap();
        (db, opt)
    }

    /// set_file_to_compact marks table file `num` as the current version's file_to_compact
    /// (together with its level), so the next compaction picks it.
    ///
    /// Panics if no file with that number exists in the current version.
    pub fn set_file_to_compact(db: &mut DB, num: FileNum) {
        let current = db.current();
        let mut version = current.borrow_mut();

        // Scan all levels. If several files carry `num`, the last one visited wins.
        let mut found = None;
        for level in 0..NUM_LEVELS {
            if let Some(file) = version.files[level]
                .iter()
                .rev()
                .find(|f| f.borrow().num == num)
            {
                found = Some((file.clone(), level));
            }
        }

        match found {
            Some((file, level)) => {
                version.file_to_compact = Some(file);
                version.file_to_compact_lvl = level;
            }
            None => panic!("file number not found"),
        }
    }
}
1152
1153#[cfg(test)]
1154mod tests {
1155    use super::testutil::{build_db, set_file_to_compact};
1156    use super::*;
1157
1158    use crate::error::Status;
1159    use crate::key_types::LookupKey;
1160    use crate::mem_env::MemEnv;
1161    use crate::options;
1162    use crate::test_util::LdbIteratorIter;
1163    use crate::version::testutil::make_version;
1164
1165    #[test]
1166    fn test_db_impl_open_info_log() {
1167        let e = MemEnv::new();
1168        {
1169            let l = Some(share(open_info_log(&e, "abc")));
1170            assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
1171            log!(l, "hello {}", "world");
1172            assert_eq!(12, e.size_of(&Path::new("abc").join("LOG")).unwrap());
1173        }
1174        {
1175            let l = Some(share(open_info_log(&e, "abc")));
1176            assert!(e.exists(&Path::new("abc").join("LOG.old")).unwrap());
1177            assert!(e.exists(&Path::new("abc").join("LOG")).unwrap());
1178            assert_eq!(12, e.size_of(&Path::new("abc").join("LOG.old")).unwrap());
1179            assert_eq!(0, e.size_of(&Path::new("abc").join("LOG")).unwrap());
1180            log!(l, "something else");
1181            log!(l, "and another {}", 1);
1182
1183            let mut s = String::new();
1184            let mut r = e
1185                .open_sequential_file(&Path::new("abc").join("LOG"))
1186                .unwrap();
1187            r.read_to_string(&mut s).unwrap();
1188            assert_eq!("something else\nand another 1\n", &s);
1189        }
1190    }
1191
1192    fn build_memtable() -> MemTable {
1193        let mut mt = MemTable::new(options::for_test().cmp);
1194        let mut i = 1;
1195        for k in ["abc", "def", "ghi", "jkl", "mno", "aabc", "test123"].iter() {
1196            mt.add(i, ValueType::TypeValue, k.as_bytes(), b"looooongval");
1197            i += 1;
1198        }
1199        mt
1200    }
1201
1202    #[test]
1203    fn test_db_impl_init() {
1204        // A sanity check for recovery and basic persistence.
1205        let opt = options::for_test();
1206        let env = opt.env.clone();
1207
1208        // Several test cases with different options follow. The printlns can eventually be
1209        // removed.
1210
1211        {
1212            let mut opt = opt.clone();
1213            opt.reuse_manifest = false;
1214            let _ = DB::open("otherdb", opt.clone()).unwrap();
1215
1216            eprintln!(
1217                "children after: {:?}",
1218                env.children(Path::new("otherdb")).unwrap()
1219            );
1220            assert!(env.exists(&Path::new("otherdb").join("CURRENT")).unwrap());
1221            // Database is initialized and initial manifest reused.
1222            assert!(!env
1223                .exists(&Path::new("otherdb").join("MANIFEST-000001"))
1224                .unwrap());
1225            assert!(env
1226                .exists(&Path::new("otherdb").join("MANIFEST-000002"))
1227                .unwrap());
1228            assert!(env
1229                .exists(&Path::new("otherdb").join("000003.log"))
1230                .unwrap());
1231        }
1232
1233        {
1234            let mut opt = opt.clone();
1235            opt.reuse_manifest = true;
1236            let mut db = DB::open("db", opt.clone()).unwrap();
1237
1238            eprintln!(
1239                "children after: {:?}",
1240                env.children(&Path::new("db").join("")).unwrap()
1241            );
1242            assert!(env.exists(&Path::new("db").join("CURRENT")).unwrap());
1243            // Database is initialized and initial manifest reused.
1244            assert!(env
1245                .exists(&Path::new("db").join("MANIFEST-000001"))
1246                .unwrap());
1247            assert!(env.exists(&Path::new("db").join("LOCK")).unwrap());
1248            assert!(env.exists(&Path::new("db").join("000003.log")).unwrap());
1249
1250            db.put(b"abc", b"def").unwrap();
1251            db.put(b"abd", b"def").unwrap();
1252        }
1253
1254        {
1255            eprintln!(
1256                "children before: {:?}",
1257                env.children(&Path::new("db").join("")).unwrap()
1258            );
1259            let mut opt = opt.clone();
1260            opt.reuse_manifest = false;
1261            opt.reuse_logs = false;
1262            let mut db = DB::open("db", opt.clone()).unwrap();
1263
1264            eprintln!(
1265                "children after: {:?}",
1266                env.children(&Path::new("db").join("")).unwrap()
1267            );
1268            // Obsolete manifest is deleted.
1269            assert!(!env
1270                .exists(&Path::new("db").join("MANIFEST-000001"))
1271                .unwrap());
1272            // New manifest is created.
1273            assert!(env
1274                .exists(&Path::new("db").join("MANIFEST-000002"))
1275                .unwrap());
1276            // Obsolete log file is deleted.
1277            assert!(!env.exists(&Path::new("db").join("000003.log")).unwrap());
1278            // New L0 table has been added.
1279            assert!(env.exists(&Path::new("db").join("000003.ldb")).unwrap());
1280            assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
1281            // Check that entry exists and is correct. Phew, long call chain!
1282            let current = db.current();
1283            log!(opt.log, "files: {:?}", current.borrow().files);
1284            assert_eq!(
1285                b"def",
1286                current
1287                    .borrow_mut()
1288                    .get(LookupKey::new(b"abc", 1).internal_key())
1289                    .unwrap()
1290                    .unwrap()
1291                    .0
1292                    .as_ref()
1293            );
1294            db.put(b"abe", b"def").unwrap();
1295        }
1296
1297        {
1298            eprintln!(
1299                "children before: {:?}",
1300                env.children(Path::new("db")).unwrap()
1301            );
1302            // reuse_manifest above causes the old manifest to be deleted as obsolete, but no new
1303            // manifest is written. CURRENT becomes stale.
1304            let mut opt = opt.clone();
1305            opt.reuse_logs = true;
1306            let db = DB::open("db", opt).unwrap();
1307
1308            eprintln!(
1309                "children after: {:?}",
1310                env.children(Path::new("db")).unwrap()
1311            );
1312            assert!(!env
1313                .exists(&Path::new("db").join("MANIFEST-000001"))
1314                .unwrap());
1315            assert!(env
1316                .exists(&Path::new("db").join("MANIFEST-000002"))
1317                .unwrap());
1318            assert!(!env
1319                .exists(&Path::new("db").join("MANIFEST-000005"))
1320                .unwrap());
1321            assert!(env.exists(&Path::new("db").join("000004.log")).unwrap());
1322            // 000004 should be reused, no new log file should be created.
1323            assert!(!env.exists(&Path::new("db").join("000006.log")).unwrap());
1324            // Log is reused, so memtable should contain last written entry from above.
1325            assert_eq!(1, db.mem.len());
1326            assert_eq!(b"def", &*db.mem.get(&LookupKey::new(b"abe", 3)).0.unwrap());
1327        }
1328    }
1329
1330    #[test]
1331    fn test_db_impl_compact_range() {
1332        let (mut db, opt) = build_db();
1333        let env = &opt.env;
1334
1335        eprintln!(
1336            "children before: {:?}",
1337            env.children(&Path::new("db").join("")).unwrap()
1338        );
1339        db.compact_range(b"aaa", b"dba").unwrap();
1340        eprintln!(
1341            "children after: {:?}",
1342            env.children(&Path::new("db").join("")).unwrap()
1343        );
1344
1345        assert_eq!(
1346            250,
1347            opt.env
1348                .size_of(&Path::new("db").join("000007.ldb"))
1349                .unwrap()
1350        );
1351        assert_eq!(
1352            200,
1353            opt.env
1354                .size_of(&Path::new("db").join("000008.ldb"))
1355                .unwrap()
1356        );
1357        assert_eq!(
1358            200,
1359            opt.env
1360                .size_of(&Path::new("db").join("000009.ldb"))
1361                .unwrap()
1362        );
1363        assert_eq!(
1364            435,
1365            opt.env
1366                .size_of(&Path::new("db").join("000015.ldb"))
1367                .unwrap()
1368        );
1369
1370        assert!(!opt.env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1371        assert!(!opt.env.exists(&Path::new("db").join("000002.ldb")).unwrap());
1372        assert!(!opt.env.exists(&Path::new("db").join("000004.ldb")).unwrap());
1373        assert!(!opt.env.exists(&Path::new("db").join("000005.ldb")).unwrap());
1374        assert!(!opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1375        assert!(!opt.env.exists(&Path::new("db").join("000013.ldb")).unwrap());
1376        assert!(!opt.env.exists(&Path::new("db").join("000014.ldb")).unwrap());
1377
1378        assert_eq!(b"val1".to_vec(), db.get(b"aaa").unwrap());
1379        assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
1380        assert_eq!(b"val3".to_vec(), db.get(b"aba").unwrap());
1381        assert_eq!(b"val3".to_vec(), db.get(b"fab").unwrap());
1382    }
1383
1384    #[test]
1385    fn test_db_impl_compact_range_memtable() {
1386        let (mut db, opt) = build_db();
1387        let env = &opt.env;
1388
1389        db.put(b"xxx", b"123").unwrap();
1390
1391        eprintln!(
1392            "children before: {:?}",
1393            env.children(Path::new("db")).unwrap()
1394        );
1395        db.compact_range(b"aaa", b"dba").unwrap();
1396        eprintln!(
1397            "children after: {:?}",
1398            env.children(Path::new("db")).unwrap()
1399        );
1400
1401        assert_eq!(
1402            250,
1403            opt.env
1404                .size_of(&Path::new("db").join("000007.ldb"))
1405                .unwrap()
1406        );
1407        assert_eq!(
1408            200,
1409            opt.env
1410                .size_of(&Path::new("db").join("000008.ldb"))
1411                .unwrap()
1412        );
1413        assert_eq!(
1414            200,
1415            opt.env
1416                .size_of(&Path::new("db").join("000009.ldb"))
1417                .unwrap()
1418        );
1419        assert_eq!(
1420            182,
1421            opt.env
1422                .size_of(&Path::new("db").join("000014.ldb"))
1423                .unwrap()
1424        );
1425        assert_eq!(
1426            435,
1427            opt.env
1428                .size_of(&Path::new("db").join("000017.ldb"))
1429                .unwrap()
1430        );
1431
1432        assert!(!opt.env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1433        assert!(!opt.env.exists(&Path::new("db").join("000002.ldb")).unwrap());
1434        assert!(!opt.env.exists(&Path::new("db").join("000003.ldb")).unwrap());
1435        assert!(!opt.env.exists(&Path::new("db").join("000004.ldb")).unwrap());
1436        assert!(!opt.env.exists(&Path::new("db").join("000005.ldb")).unwrap());
1437        assert!(!opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1438        assert!(!opt.env.exists(&Path::new("db").join("000015.ldb")).unwrap());
1439        assert!(!opt.env.exists(&Path::new("db").join("000016.ldb")).unwrap());
1440
1441        assert_eq!(b"val1".to_vec(), db.get(b"aaa").unwrap());
1442        assert_eq!(b"val2".to_vec(), db.get(b"cab").unwrap());
1443        assert_eq!(b"val3".to_vec(), db.get(b"aba").unwrap());
1444        assert_eq!(b"val3".to_vec(), db.get(b"fab").unwrap());
1445        assert_eq!(b"123".to_vec(), db.get(b"xxx").unwrap());
1446    }
1447
1448    #[allow(unused_variables)]
1449    #[test]
1450    fn test_db_impl_locking() {
1451        let opt = options::for_test();
1452        let db = DB::open("db", opt.clone()).unwrap();
1453        let want_err = Status::new(
1454            StatusCode::LockError,
1455            "database lock is held by another instance",
1456        );
1457        assert_eq!(want_err, DB::open("db", opt.clone()).err().unwrap());
1458    }
1459
1460    #[test]
1461    fn test_db_impl_build_table() {
1462        let mut opt = options::for_test();
1463        opt.block_size = 128;
1464        let mt = build_memtable();
1465
1466        let f = build_table("db", &opt, mt.iter(), 123).unwrap();
1467        let path = &Path::new("db").join("000123.ldb");
1468
1469        assert_eq!(LookupKey::new(b"aabc", 6).internal_key(), &f.smallest);
1470        assert_eq!(LookupKey::new(b"test123", 7).internal_key(), &f.largest);
1471        assert_eq!(379, f.size);
1472        assert_eq!(123, f.num);
1473        assert!(opt.env.exists(path).unwrap());
1474
1475        {
1476            // Read table back in.
1477            let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100);
1478            let tbl = tc.get_table(123).unwrap();
1479            assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut tbl.iter()).count());
1480        }
1481
1482        {
1483            // Corrupt table; make sure it doesn't load fully.
1484            let mut buf = vec![];
1485            opt.env
1486                .open_sequential_file(path)
1487                .unwrap()
1488                .read_to_end(&mut buf)
1489                .unwrap();
1490            buf[150] += 1;
1491            opt.env
1492                .open_writable_file(path)
1493                .unwrap()
1494                .write_all(&buf)
1495                .unwrap();
1496
1497            let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100);
1498            let tbl = tc.get_table(123).unwrap();
1499            // The last two entries are skipped due to the corruption above.
1500            assert_eq!(
1501                5,
1502                LdbIteratorIter::wrap(&mut tbl.iter())
1503                    .map(|v| eprintln!("{:?}", v))
1504                    .count()
1505            );
1506        }
1507    }
1508
1509    #[allow(unused_variables)]
1510    #[test]
1511    fn test_db_impl_build_db_sanity() {
1512        let db = build_db().0;
1513        let env = &db.opt.env;
1514        let name = &db.name;
1515
1516        assert!(env.exists(Path::new(&log_file_name(name, 12))).unwrap());
1517    }
1518
1519    #[test]
1520    fn test_db_impl_get_from_table_with_snapshot() {
1521        let mut db = build_db().0;
1522
1523        assert_eq!(30, db.vset.borrow().last_seq);
1524
1525        // seq = 31
1526        db.put(b"xyy", b"123").unwrap();
1527        let old_ss = db.get_snapshot();
1528        // seq = 32
1529        db.put(b"xyz", b"123").unwrap();
1530        db.flush().unwrap();
1531        assert!(db.get_at(&old_ss, b"xyy").unwrap().is_some());
1532        assert!(db.get_at(&old_ss, b"xyz").unwrap().is_none());
1533
1534        // memtable get
1535        assert_eq!(b"123", db.get(b"xyz").unwrap().as_ref());
1536        assert!(db.get_internal(31, b"xyy").unwrap().is_some());
1537        assert!(db.get_internal(32, b"xyy").unwrap().is_some());
1538
1539        assert!(db.get_internal(31, b"xyz").unwrap().is_none());
1540        assert!(db.get_internal(32, b"xyz").unwrap().is_some());
1541
1542        // table get
1543        assert_eq!(b"val2", db.get(b"eab").unwrap().as_ref());
1544        assert!(db.get_internal(3, b"eab").unwrap().is_none());
1545        assert!(db.get_internal(32, b"eab").unwrap().is_some());
1546
1547        {
1548            let ss = db.get_snapshot();
1549            assert_eq!(b"val2", db.get_at(&ss, b"eab").unwrap().unwrap().as_ref());
1550        }
1551
1552        // from table.
1553        assert_eq!(b"val2", db.get(b"cab").unwrap().as_ref());
1554    }
1555
1556    #[test]
1557    fn test_db_impl_delete() {
1558        let mut db = build_db().0;
1559
1560        db.put(b"xyy", b"123").unwrap();
1561        db.put(b"xyz", b"123").unwrap();
1562
1563        assert!(db.get(b"xyy").is_some());
1564        assert!(db.get(b"gaa").is_some());
1565
1566        // Delete one memtable entry and one table entry.
1567        db.delete(b"xyy").unwrap();
1568        db.delete(b"gaa").unwrap();
1569
1570        assert!(db.get(b"xyy").is_none());
1571        assert!(db.get(b"gaa").is_none());
1572        assert!(db.get(b"xyz").is_some());
1573    }
1574
1575    #[test]
1576    fn test_db_impl_compact_single_file() {
1577        let mut db = build_db().0;
1578        set_file_to_compact(&mut db, 4);
1579        db.maybe_do_compaction().unwrap();
1580
1581        let env = &db.opt.env;
1582        let name = &db.name;
1583        assert!(!env.exists(Path::new(&table_file_name(name, 3))).unwrap());
1584        assert!(!env.exists(Path::new(&table_file_name(name, 4))).unwrap());
1585        assert!(!env.exists(Path::new(&table_file_name(name, 5))).unwrap());
1586        assert!(env.exists(Path::new(&table_file_name(name, 13))).unwrap());
1587    }
1588
1589    #[test]
1590    fn test_db_impl_compaction_trivial_move() {
1591        let mut db = DB::open("db", options::for_test()).unwrap();
1592
1593        db.put(b"abc", b"xyz").unwrap();
1594        db.put(b"ab3", b"xyz").unwrap();
1595        db.put(b"ab0", b"xyz").unwrap();
1596        db.put(b"abz", b"xyz").unwrap();
1597        assert_eq!(4, db.mem.len());
1598        let mut imm = MemTable::new(db.opt.cmp.clone());
1599        mem::swap(&mut imm, &mut db.mem);
1600        db.imm = Some(imm);
1601        db.compact_memtable().unwrap();
1602
1603        eprintln!(
1604            "children after: {:?}",
1605            db.opt.env.children(Path::new("db")).unwrap()
1606        );
1607        assert!(db
1608            .opt
1609            .env
1610            .exists(&Path::new("db").join("000004.ldb"))
1611            .unwrap());
1612
1613        {
1614            let v = db.current();
1615            let mut v = v.borrow_mut();
1616            v.file_to_compact = Some(v.files[2][0].clone());
1617            v.file_to_compact_lvl = 2;
1618        }
1619
1620        db.maybe_do_compaction().unwrap();
1621
1622        {
1623            let v = db.current();
1624            let v = v.borrow_mut();
1625            assert_eq!(1, v.files[3].len());
1626        }
1627    }
1628
1629    #[test]
1630    fn test_db_impl_memtable_compaction() {
1631        let mut opt = options::for_test();
1632        opt.write_buffer_size = 25;
1633        let mut db = DB::new("db", opt);
1634
1635        // Fill up memtable.
1636        db.mem = build_memtable();
1637
1638        // Trigger memtable compaction.
1639        db.make_room_for_write(true).unwrap();
1640        assert_eq!(0, db.mem.len());
1641        assert!(db
1642            .opt
1643            .env
1644            .exists(&Path::new("db").join("000002.log"))
1645            .unwrap());
1646        assert!(db
1647            .opt
1648            .env
1649            .exists(&Path::new("db").join("000003.ldb"))
1650            .unwrap());
1651        assert_eq!(
1652            351,
1653            db.opt
1654                .env
1655                .size_of(&Path::new("db").join("000003.ldb"))
1656                .unwrap()
1657        );
1658        assert_eq!(
1659            7,
1660            LdbIteratorIter::wrap(&mut db.cache.borrow_mut().get_table(3).unwrap().iter()).count()
1661        );
1662    }
1663
1664    #[test]
1665    fn test_db_impl_compaction() {
1666        let mut db = build_db().0;
1667        let v = db.current();
1668        v.borrow_mut().compaction_score = Some(2.0);
1669        v.borrow_mut().compaction_level = Some(1);
1670
1671        db.maybe_do_compaction().unwrap();
1672
1673        assert!(!db
1674            .opt
1675            .env
1676            .exists(&Path::new("db").join("000003.ldb"))
1677            .unwrap());
1678        assert!(db
1679            .opt
1680            .env
1681            .exists(&Path::new("db").join("000013.ldb"))
1682            .unwrap());
1683        assert_eq!(
1684            345,
1685            db.opt
1686                .env
1687                .size_of(&Path::new("db").join("000013.ldb"))
1688                .unwrap()
1689        );
1690
1691        // New current version.
1692        let v = db.current();
1693        assert_eq!(0, v.borrow().files[1].len());
1694        assert_eq!(2, v.borrow().files[2].len());
1695    }
1696
1697    #[test]
1698    fn test_db_impl_compaction_trivial() {
1699        let (mut v, opt) = make_version();
1700
1701        let to_compact = v.files[2][0].clone();
1702        v.file_to_compact = Some(to_compact);
1703        v.file_to_compact_lvl = 2;
1704
1705        let mut db = DB::new("db", opt.clone());
1706        db.vset.borrow_mut().add_version(v);
1707        db.vset.borrow_mut().next_file_num = 10;
1708
1709        db.maybe_do_compaction().unwrap();
1710
1711        assert!(opt.env.exists(&Path::new("db").join("000006.ldb")).unwrap());
1712        assert!(!opt.env.exists(&Path::new("db").join("000010.ldb")).unwrap());
1713        assert_eq!(
1714            218,
1715            opt.env
1716                .size_of(&Path::new("db").join("000006.ldb"))
1717                .unwrap()
1718        );
1719
1720        let v = db.current();
1721        assert_eq!(1, v.borrow().files[2].len());
1722        assert_eq!(3, v.borrow().files[3].len());
1723    }
1724
1725    #[test]
1726    fn test_db_impl_compaction_state_cleanup() {
1727        let env: Box<dyn Env> = Box::new(MemEnv::new());
1728        let name = "db";
1729
1730        let stuff = b"abcdefghijkl";
1731        env.open_writable_file(&Path::new("db").join("000001.ldb"))
1732            .unwrap()
1733            .write_all(stuff)
1734            .unwrap();
1735        let fmd = FileMetaData {
1736            num: 1,
1737            ..Default::default()
1738        };
1739
1740        let mut cs = CompactionState::new(Compaction::new(&options::for_test(), 2, None), 12);
1741        cs.outputs = vec![fmd];
1742        cs.cleanup(env.as_ref(), name);
1743
1744        assert!(!env.exists(&Path::new("db").join("000001.ldb")).unwrap());
1745    }
1746
1747    #[test]
1748    fn test_db_impl_open_close_reopen() {
1749        let opt;
1750        {
1751            let mut db = build_db().0;
1752            opt = db.opt.clone();
1753            db.put(b"xx1", b"111").unwrap();
1754            db.put(b"xx2", b"112").unwrap();
1755            db.put(b"xx3", b"113").unwrap();
1756            db.put(b"xx4", b"114").unwrap();
1757            db.put(b"xx5", b"115").unwrap();
1758            db.delete(b"xx2").unwrap();
1759        }
1760
1761        {
1762            let mut db = DB::open("db", opt.clone()).unwrap();
1763            db.delete(b"xx5").unwrap();
1764        }
1765
1766        {
1767            let mut db = DB::open("db", opt.clone()).unwrap();
1768
1769            assert_eq!(None, db.get(b"xx5"));
1770
1771            let ss = db.get_snapshot();
1772            db.put(b"xx4", b"222").unwrap();
1773            let ss2 = db.get_snapshot();
1774
1775            assert_eq!(
1776                Some(b"113".to_vec()),
1777                db.get_at(&ss, b"xx3").unwrap().map(|b| b.to_vec())
1778            );
1779            assert_eq!(None, db.get_at(&ss, b"xx2").unwrap());
1780            assert_eq!(None, db.get_at(&ss, b"xx5").unwrap());
1781
1782            assert_eq!(
1783                Some(b"114".to_vec()),
1784                db.get_at(&ss, b"xx4").unwrap().map(|b| b.to_vec())
1785            );
1786            assert_eq!(
1787                Some(b"222".to_vec()),
1788                db.get_at(&ss2, b"xx4").unwrap().map(|b| b.to_vec())
1789            );
1790        }
1791
1792        {
1793            let mut db = DB::open("db", opt).unwrap();
1794
1795            let ss = db.get_snapshot();
1796            assert_eq!(
1797                Some(b"113".to_vec()),
1798                db.get_at(&ss, b"xx3").unwrap().map(|b| b.to_vec())
1799            );
1800            assert_eq!(
1801                Some(b"222".to_vec()),
1802                db.get_at(&ss, b"xx4").unwrap().map(|b| b.to_vec())
1803            );
1804            assert_eq!(None, db.get_at(&ss, b"xx2").unwrap());
1805        }
1806    }
1807
1808    #[test]
1809    fn test_drop_memtable() {
1810        let mut db = DB::open("db", options::for_test()).unwrap();
1811        let mut cnt = 0;
1812        let mut buf: Vec<u8> = Vec::with_capacity(8);
1813        let entries_per_batch = 1;
1814        let max_num = 100000;
1815        while cnt < max_num {
1816            let mut write_batch = WriteBatch::new();
1817            for i in 0..entries_per_batch {
1818                buf.clear();
1819                buf.extend_from_slice(format!("{}-{}", cnt, i).as_bytes());
1820                write_batch.put(buf.as_slice(), buf.as_slice());
1821            }
1822            db.write(write_batch, false).unwrap();
1823            cnt += entries_per_batch;
1824        }
1825    }
1826}