rusty_leveldb/
version_set.rs

1use crate::cmp::{Cmp, InternalKeyCmp};
2use crate::env::Env;
3use crate::error::{err, Result, StatusCode};
4use crate::key_types::{parse_internal_key, InternalKey, UserKey};
5use crate::log::{LogReader, LogWriter};
6use crate::merging_iter::MergingIter;
7use crate::options::Options;
8use crate::table_cache::TableCache;
9use crate::types::{
10    parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, Shared, NUM_LEVELS,
11};
12use crate::version::{new_version_iter, total_size, FileMetaHandle, Version};
13use crate::version_edit::VersionEdit;
14
15use bytes::Bytes;
16use std::cmp::Ordering;
17use std::collections::HashSet;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21
22pub struct Compaction {
23    level: usize,
24    max_file_size: usize,
25    input_version: Option<Shared<Version>>,
26    level_ixs: [usize; NUM_LEVELS],
27    cmp: Rc<Box<dyn Cmp>>,
28    icmp: InternalKeyCmp,
29
30    manual: bool,
31
32    // "parent" inputs from level and level+1.
33    inputs: [Vec<FileMetaHandle>; 2],
34    grandparent_ix: usize,
35    // remaining inputs from level+2..NUM_LEVELS
36    grandparents: Option<Vec<FileMetaHandle>>,
37    overlapped_bytes: usize,
38    seen_key: bool,
39    edit: VersionEdit,
40}
41
42impl Compaction {
43    // Note: opt.cmp should be the user-supplied or default comparator (not an InternalKeyCmp).
44    pub fn new(opt: &Options, level: usize, input: Option<Shared<Version>>) -> Compaction {
45        Compaction {
46            level,
47            max_file_size: opt.max_file_size,
48            input_version: input,
49            level_ixs: Default::default(),
50            cmp: opt.cmp.clone(),
51            icmp: InternalKeyCmp(opt.cmp.clone()),
52            manual: false,
53
54            inputs: Default::default(),
55            grandparent_ix: 0,
56            grandparents: Default::default(),
57            overlapped_bytes: 0,
58            seen_key: false,
59            edit: VersionEdit::new(),
60        }
61    }
62
63    fn add_input(&mut self, parent: usize, f: FileMetaHandle) {
64        assert!(parent <= 1);
65        self.inputs[parent].push(f)
66    }
67
68    pub fn level(&self) -> usize {
69        self.level
70    }
71
72    pub fn input(&self, parent: usize, ix: usize) -> FileMetaData {
73        assert!(parent < 2);
74        assert!(ix < self.inputs[parent].len());
75        self.inputs[parent][ix].borrow().clone()
76    }
77
78    pub fn num_inputs(&self, parent: usize) -> usize {
79        assert!(parent < 2);
80        self.inputs[parent].len()
81    }
82
83    pub fn edit(&mut self) -> &mut VersionEdit {
84        &mut self.edit
85    }
86
87    pub fn into_edit(self) -> VersionEdit {
88        self.edit
89    }
90
91    /// add_input_deletions marks the current input files as deleted in the inner VersionEdit.
92    pub fn add_input_deletions(&mut self) {
93        for parent in 0..2 {
94            for f in &self.inputs[parent] {
95                self.edit.delete_file(self.level + parent, f.borrow().num);
96            }
97        }
98    }
99
100    /// is_base_level_for checks whether the given key may exist in levels higher than this
101    /// compaction's level plus 2. I.e., whether the levels for this compaction are the last ones
102    /// to contain the key.
103    pub fn is_base_level_for(&mut self, k: UserKey<'_>) -> bool {
104        assert!(self.input_version.is_some());
105        let inp_version = self.input_version.as_ref().unwrap();
106        for level in self.level + 2..NUM_LEVELS {
107            let files = &inp_version.borrow().files[level];
108            while self.level_ixs[level] < files.len() {
109                let f = files[self.level_ixs[level]].borrow();
110                if self.cmp.cmp(k, parse_internal_key(&f.largest).2) <= Ordering::Equal {
111                    if self.cmp.cmp(k, parse_internal_key(&f.smallest).2) >= Ordering::Equal {
112                        // key is in this file's range, so this is not the base level.
113                        return false;
114                    }
115                    break;
116                }
117                // level_ixs contains cross-call state to speed up following lookups.
118                self.level_ixs[level] += 1;
119            }
120        }
121        true
122    }
123
124    pub fn is_trivial_move(&self) -> bool {
125        if self.manual {
126            return false;
127        }
128
129        let inputs_size;
130        if let Some(gp) = self.grandparents.as_ref() {
131            inputs_size = total_size(gp.iter());
132        } else {
133            inputs_size = 0;
134        }
135        self.num_inputs(0) == 1 && self.num_inputs(1) == 0 && inputs_size < 10 * self.max_file_size
136    }
137
138    pub fn should_stop_before(&mut self, k: InternalKey<'_>) -> bool {
139        if self.grandparents.is_none() {
140            self.seen_key = true;
141            return false;
142        }
143        let grandparents = self.grandparents.as_ref().unwrap();
144        while self.grandparent_ix < grandparents.len()
145            && self
146                .icmp
147                .cmp(k, &grandparents[self.grandparent_ix].borrow().largest)
148                == Ordering::Greater
149        {
150            if self.seen_key {
151                self.overlapped_bytes += grandparents[self.grandparent_ix].borrow().size;
152            }
153            self.grandparent_ix += 1;
154        }
155        self.seen_key = true;
156
157        if self.overlapped_bytes > 10 * self.max_file_size {
158            self.overlapped_bytes = 0;
159            true
160        } else {
161            false
162        }
163    }
164}
165
166/// VersionSet managed the various versions that are live within a database. A single version
167/// contains references to the files on disk as they were at a certain point.
168pub struct VersionSet {
169    dbname: PathBuf,
170    opt: Options,
171    cmp: InternalKeyCmp,
172    cache: Shared<TableCache>,
173
174    pub next_file_num: u64,
175    pub manifest_num: u64,
176    pub last_seq: u64,
177    pub log_num: u64,
178    pub prev_log_num: u64,
179
180    current: Option<Shared<Version>>,
181    compaction_ptrs: [Bytes; NUM_LEVELS],
182
183    descriptor_log: Option<LogWriter<Box<dyn Write>>>,
184}
185
186impl VersionSet {
187    // Note: opt.cmp should not contain an InternalKeyCmp at this point, but instead the default or
188    // user-supplied one.
189    pub fn new<P: AsRef<Path>>(db: P, opt: Options, cache: Shared<TableCache>) -> VersionSet {
190        let v = share(Version::new(cache.clone(), opt.cmp.clone()));
191        VersionSet {
192            dbname: db.as_ref().to_owned(),
193            cmp: InternalKeyCmp(opt.cmp.clone()),
194            opt,
195            cache,
196
197            next_file_num: 2,
198            manifest_num: 0,
199            last_seq: 0,
200            log_num: 0,
201            prev_log_num: 0,
202
203            current: Some(v),
204            compaction_ptrs: Default::default(),
205            descriptor_log: None,
206        }
207    }
208
209    pub fn current_summary(&self) -> String {
210        self.current.as_ref().unwrap().borrow().level_summary()
211    }
212
213    /// live_files returns the files that are currently active.
214    pub fn live_files(&self) -> HashSet<FileNum> {
215        let mut files = HashSet::new();
216        if let Some(ref version) = self.current {
217            for level in 0..NUM_LEVELS {
218                for file in &version.borrow().files[level] {
219                    files.insert(file.borrow().num);
220                }
221            }
222        }
223        files
224    }
225
226    /// current returns a reference to the current version. It panics if there is no current
227    /// version.
228    pub fn current(&self) -> Shared<Version> {
229        assert!(self.current.is_some());
230        self.current.as_ref().unwrap().clone()
231    }
232
233    pub fn add_version(&mut self, v: Version) {
234        self.current = Some(share(v));
235    }
236
237    pub fn new_file_number(&mut self) -> FileNum {
238        self.next_file_num += 1;
239        self.next_file_num - 1
240    }
241
242    pub fn reuse_file_number(&mut self, n: FileNum) {
243        if n == self.next_file_num - 1 {
244            self.next_file_num = n;
245        }
246    }
247
248    pub fn mark_file_number_used(&mut self, n: FileNum) {
249        if self.next_file_num <= n {
250            self.next_file_num = n + 1;
251        }
252    }
253
254    /// needs_compaction returns true if a compaction makes sense at this point.
255    pub fn needs_compaction(&self) -> bool {
256        assert!(self.current.is_some());
257        let v = self.current.as_ref().unwrap();
258        let v = v.borrow();
259        v.compaction_score.unwrap_or(0.0) >= 1.0 || v.file_to_compact.is_some()
260    }
261
262    fn approximate_offset(&self, v: &Shared<Version>, key: InternalKey<'_>) -> usize {
263        let mut offset = 0;
264        for level in 0..NUM_LEVELS {
265            for f in &v.borrow().files[level] {
266                if self.opt.cmp.cmp(&f.borrow().largest, key) <= Ordering::Equal {
267                    offset += f.borrow().size;
268                } else if self.opt.cmp.cmp(&f.borrow().smallest, key) == Ordering::Greater {
269                    // In higher levels, files are sorted; we don't need to search further.
270                    if level > 0 {
271                        break;
272                    }
273                } else if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) {
274                    offset += tbl.approx_offset_of(key);
275                }
276            }
277        }
278        offset
279    }
280
281    pub fn pick_compaction(&mut self) -> Option<Compaction> {
282        assert!(self.current.is_some());
283        let current = self.current();
284        let current = current.borrow();
285
286        let mut c = Compaction::new(&self.opt, 0, self.current.clone());
287        let level;
288
289        // Size compaction?
290        if current.compaction_score.unwrap_or(0.0) >= 1.0 {
291            level = current.compaction_level.unwrap();
292            assert!(level < NUM_LEVELS - 1);
293
294            for f in &current.files[level] {
295                if self.compaction_ptrs[level].is_empty()
296                    || self
297                        .cmp
298                        .cmp(&f.borrow().largest, &self.compaction_ptrs[level])
299                        == Ordering::Greater
300                {
301                    c.add_input(0, f.clone());
302                    break;
303                }
304            }
305
306            if c.num_inputs(0) == 0 {
307                // Add first file in level. This will also reset the compaction pointers.
308                c.add_input(0, current.files[level][0].clone());
309            }
310        } else if let Some(ref ftc) = current.file_to_compact {
311            // Seek compaction?
312            level = current.file_to_compact_lvl;
313            c.add_input(0, ftc.clone());
314        } else {
315            return None;
316        }
317
318        c.level = level;
319        c.input_version.clone_from(&self.current);
320
321        if level == 0 {
322            let (smallest, largest) = get_range(&self.cmp, c.inputs[0].iter());
323            // This call intentionally overwrites the file previously put into c.inputs[0].
324            c.inputs[0] = current.overlapping_inputs(0, &smallest, &largest);
325            assert!(!c.inputs[0].is_empty());
326        }
327
328        self.setup_other_inputs(&mut c);
329        Some(c)
330    }
331
332    pub fn compact_range(
333        &mut self,
334        level: usize,
335        from: InternalKey<'_>,
336        to: InternalKey<'_>,
337    ) -> Option<Compaction> {
338        assert!(self.current.is_some());
339        let mut inputs = self
340            .current
341            .as_ref()
342            .unwrap()
343            .borrow()
344            .overlapping_inputs(level, from, to);
345        if inputs.is_empty() {
346            return None;
347        }
348
349        if level > 0 {
350            let mut total = 0;
351            for i in 0..inputs.len() {
352                total += inputs[i].borrow().size;
353                if total > self.opt.max_file_size {
354                    inputs.truncate(i + 1);
355                    break;
356                }
357            }
358        }
359
360        let mut c = Compaction::new(&self.opt, level, self.current.clone());
361        c.inputs[0] = inputs;
362        c.manual = true;
363        self.setup_other_inputs(&mut c);
364        Some(c)
365    }
366
367    fn setup_other_inputs(&mut self, compaction: &mut Compaction) {
368        assert!(self.current.is_some());
369        let current = self.current.as_ref().unwrap();
370        let current = current.borrow();
371
372        let level = compaction.level;
373        let (mut smallest, mut largest) = get_range(&self.cmp, compaction.inputs[0].iter());
374
375        // Set up level+1 inputs.
376        compaction.inputs[1] = current.overlapping_inputs(level + 1, &smallest, &largest);
377
378        let (mut allstart, mut alllimit) = get_range(
379            &self.cmp,
380            compaction.inputs[0]
381                .iter()
382                .chain(compaction.inputs[1].iter()),
383        );
384
385        // Check if we can add more inputs in the current level without having to compact more
386        // inputs from level+1.
387        if !compaction.inputs[1].is_empty() {
388            let expanded0 = current.overlapping_inputs(level, &allstart, &alllimit);
389            let inputs1_size = total_size(compaction.inputs[1].iter());
390            let expanded0_size = total_size(expanded0.iter());
391            // ...if we picked up more files in the current level, and the total size is acceptable
392            if expanded0.len() > compaction.num_inputs(0)
393                && (inputs1_size + expanded0_size) < 25 * self.opt.max_file_size
394            {
395                let (new_start, new_limit) = get_range(&self.cmp, expanded0.iter());
396                let expanded1 = current.overlapping_inputs(level + 1, &new_start, &new_limit);
397                if expanded1.len() == compaction.num_inputs(1) {
398                    log!(
399                        self.opt.log,
400                        "Expanding inputs@{} {}+{} ({}+{} bytes) to {}+{} ({}+{} bytes)",
401                        level,
402                        compaction.inputs[0].len(),
403                        compaction.inputs[1].len(),
404                        total_size(compaction.inputs[0].iter()),
405                        total_size(compaction.inputs[1].iter()),
406                        expanded0.len(),
407                        expanded1.len(),
408                        total_size(expanded0.iter()),
409                        total_size(expanded1.iter())
410                    );
411
412                    smallest = new_start;
413                    largest = new_limit;
414                    compaction.inputs[0] = expanded0;
415                    compaction.inputs[1] = expanded1;
416                    let (newallstart, newalllimit) = get_range(
417                        &self.cmp,
418                        compaction.inputs[0]
419                            .iter()
420                            .chain(compaction.inputs[1].iter()),
421                    );
422                    allstart = newallstart;
423                    alllimit = newalllimit;
424                }
425            }
426        }
427
428        // Set the list of grandparent (l+2) inputs to the files overlapped by the current overall
429        // range.
430        if level + 2 < NUM_LEVELS {
431            let grandparents = self.current.as_ref().unwrap().borrow().overlapping_inputs(
432                level + 2,
433                &allstart,
434                &alllimit,
435            );
436            compaction.grandparents = Some(grandparents);
437        }
438
439        log!(
440            self.opt.log,
441            "Compacting @{} {:?} .. {:?}",
442            level,
443            smallest,
444            largest
445        );
446
447        compaction.edit().set_compact_pointer(level, &largest);
448        self.compaction_ptrs[level] = largest;
449    }
450
451    /// write_snapshot writes the current version, with all files, to the manifest.
452    fn write_snapshot(&mut self) -> Result<usize> {
453        assert!(self.descriptor_log.is_some());
454
455        let mut edit = VersionEdit::new();
456        edit.set_comparator_name(self.opt.cmp.id());
457
458        // Save compaction pointers.
459        for level in 0..NUM_LEVELS {
460            if !self.compaction_ptrs[level].is_empty() {
461                edit.set_compact_pointer(level, &self.compaction_ptrs[level]);
462            }
463        }
464
465        let current = self.current.as_ref().unwrap().borrow();
466        // Save files.
467        for level in 0..NUM_LEVELS {
468            let fs = &current.files[level];
469            for f in fs {
470                edit.add_file(level, f.borrow().clone());
471            }
472        }
473        self.descriptor_log
474            .as_mut()
475            .unwrap()
476            .add_record(&edit.encode())
477    }
478
479    /// log_and_apply merges the given edit with the current state and generates a new version. It
480    /// writes the VersionEdit to the manifest.
481    pub fn log_and_apply(&mut self, mut edit: VersionEdit) -> Result<()> {
482        assert!(self.current.is_some());
483
484        if edit.log_number.is_none() {
485            edit.set_log_num(self.log_num);
486        } else {
487            assert!(edit.log_number.unwrap() >= self.log_num);
488            assert!(edit.log_number.unwrap() < self.next_file_num);
489        }
490        if edit.prev_log_number.is_none() {
491            edit.set_prev_log_num(self.prev_log_num);
492        }
493        edit.set_next_file(self.next_file_num);
494        edit.set_last_seq(self.last_seq);
495
496        let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone());
497        {
498            let mut builder = Builder::new();
499            builder.apply(&edit, &mut self.compaction_ptrs);
500            builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v);
501        }
502        self.finalize(&mut v);
503
504        if self.descriptor_log.is_none() {
505            let descname = manifest_file_name(&self.dbname, self.manifest_num);
506            edit.set_next_file(self.next_file_num);
507            self.descriptor_log = Some(LogWriter::new(
508                self.opt.env.open_writable_file(Path::new(&descname))?,
509            ));
510            self.write_snapshot()?;
511        }
512
513        let encoded = edit.encode();
514        if let Some(ref mut lw) = self.descriptor_log {
515            lw.add_record(&encoded)?;
516            lw.flush()?;
517        }
518        set_current_file(
519            self.opt.env.as_ref().as_ref(),
520            &self.dbname,
521            self.manifest_num,
522        )?;
523
524        self.add_version(v);
525        // log_number was set above.
526        self.log_num = edit.log_number.unwrap();
527
528        // TODO: Roll back written files if something went wrong.
529        Ok(())
530    }
531
532    fn finalize(&self, v: &mut Version) {
533        let mut best_lvl = None;
534        let mut best_score = None;
535
536        for l in 0..NUM_LEVELS - 1 {
537            let score = if l == 0 {
538                v.files[l].len() as f64 / 4.0
539            } else {
540                let mut max_bytes = 10.0 * f64::from(1 << 20);
541                for _ in 0..l - 1 {
542                    max_bytes *= 10.0;
543                }
544                total_size(v.files[l].iter()) as f64 / max_bytes
545            };
546            if let Some(ref mut b) = best_score {
547                if *b < score {
548                    *b = score;
549                    best_lvl = Some(l);
550                }
551            } else {
552                best_score = Some(score);
553                best_lvl = Some(l);
554            }
555        }
556        v.compaction_score = best_score;
557        v.compaction_level = best_lvl;
558    }
559
560    /// recover recovers the state of a LevelDB instance from the files on disk. If recover()
561    /// returns true, the a manifest needs to be written eventually (using log_and_apply()).
562    pub fn recover(&mut self) -> Result<bool> {
563        assert!(self.current.is_some());
564
565        let mut current = read_current_file(self.opt.env.as_ref().as_ref(), &self.dbname)?;
566        let len = current.len();
567        current.truncate(len - 1);
568        let current = Path::new(&current);
569
570        let descfilename = self.dbname.join(current);
571        let mut builder = Builder::new();
572        {
573            let mut descfile = self
574                .opt
575                .env
576                .open_sequential_file(Path::new(&descfilename))?;
577            let mut logreader = LogReader::new(
578                &mut descfile,
579                // checksum=
580                true,
581            );
582
583            let mut log_number = None;
584            let mut prev_log_number = None;
585            let mut next_file_number = None;
586            let mut last_seq = None;
587
588            let mut buf = Vec::new();
589            while let Ok(size) = logreader.read(&mut buf) {
590                if size == 0 {
591                    break;
592                }
593                let edit = VersionEdit::decode_from(&buf)?;
594                builder.apply(&edit, &mut self.compaction_ptrs);
595                if let Some(ln) = edit.log_number {
596                    log_number = Some(ln);
597                }
598                if let Some(nfn) = edit.next_file_number {
599                    next_file_number = Some(nfn);
600                }
601                if let Some(ls) = edit.last_seq {
602                    last_seq = Some(ls);
603                }
604                if let Some(pln) = edit.prev_log_number {
605                    prev_log_number = Some(pln);
606                }
607            }
608
609            if let Some(ln) = log_number {
610                self.log_num = ln;
611                self.mark_file_number_used(ln);
612            } else {
613                return err(
614                    StatusCode::Corruption,
615                    "no meta-lognumber entry in descriptor",
616                );
617            }
618            if let Some(nfn) = next_file_number {
619                self.next_file_num = nfn + 1;
620            } else {
621                return err(
622                    StatusCode::Corruption,
623                    "no meta-next-file entry in descriptor",
624                );
625            }
626            if let Some(ls) = last_seq {
627                self.last_seq = ls;
628            } else {
629                return err(
630                    StatusCode::Corruption,
631                    "no last-sequence entry in descriptor",
632                );
633            }
634            if let Some(pln) = prev_log_number {
635                self.prev_log_num = pln;
636                self.mark_file_number_used(prev_log_number.unwrap());
637            } else {
638                self.prev_log_num = 0;
639            }
640        }
641
642        let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone());
643        builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v);
644        self.finalize(&mut v);
645        self.add_version(v);
646        self.manifest_num = self.next_file_num - 1;
647        log!(
648            self.opt.log,
649            "Recovered manifest with next_file={} manifest_num={} log_num={} prev_log_num={} \
650             last_seq={}",
651            self.next_file_num,
652            self.manifest_num,
653            self.log_num,
654            self.prev_log_num,
655            self.last_seq
656        );
657
658        // A new manifest needs to be written only if we don't reuse the existing one.
659        Ok(!self.reuse_manifest(&descfilename, current))
660    }
661
662    /// reuse_manifest checks whether the current manifest can be reused.
663    fn reuse_manifest(
664        &mut self,
665        current_manifest_path: &Path,
666        current_manifest_base: &Path,
667    ) -> bool {
668        // Note: The original has only one option, reuse_logs; reuse_logs has to be set in order to
669        // reuse manifests.
670        // However, there's not much that stops us from reusing manifests without reusing logs or
671        // vice versa. One issue exists though: If no write operations are done, empty log files
672        // will accumulate every time a DB is opened, until at least one write happens (otherwise,
673        // the logs won't be compacted and deleted).
674        if !self.opt.reuse_manifest {
675            return false;
676        }
677        // The original doesn't reuse manifests; we do.
678        if let Ok((num, typ)) = parse_file_name(current_manifest_base) {
679            if typ != FileType::Descriptor {
680                return false;
681            }
682            if let Ok(size) = self.opt.env.size_of(Path::new(current_manifest_path)) {
683                if size >= self.opt.max_file_size {
684                    return false;
685                }
686
687                assert!(self.descriptor_log.is_none());
688                let s = self
689                    .opt
690                    .env
691                    .open_appendable_file(Path::new(current_manifest_path));
692                if let Ok(f) = s {
693                    log!(self.opt.log, "reusing manifest {:?}", current_manifest_path);
694                    self.descriptor_log = Some(LogWriter::new_with_off(f, size));
695                    self.manifest_num = num;
696                    return true;
697                } else {
698                    log!(self.opt.log, "reuse_manifest: {}", s.err().unwrap());
699                }
700            }
701        }
702        false
703    }
704
705    /// make_input_iterator returns an iterator over the inputs of a compaction.
706    pub fn make_input_iterator(&self, c: &Compaction) -> Box<dyn LdbIterator> {
707        let cap = if c.level == 0 { c.num_inputs(0) + 1 } else { 2 };
708        let mut iters: Vec<Box<dyn LdbIterator>> = Vec::with_capacity(cap);
709        for i in 0..2 {
710            if c.num_inputs(i) == 0 {
711                continue;
712            }
713            if c.level + i == 0 {
714                // Add individual iterators for L0 tables.
715                for fi in 0..c.num_inputs(i) {
716                    let f = &c.inputs[i][fi];
717                    let s = self.cache.borrow_mut().get_table(f.borrow().num);
718                    if let Ok(tbl) = s {
719                        iters.push(Box::new(tbl.iter()));
720                    } else {
721                        log!(
722                            self.opt.log,
723                            "error opening table {}: {}",
724                            f.borrow().num,
725                            s.err().unwrap()
726                        );
727                    }
728                }
729            } else {
730                // Create concatenating iterator higher levels.
731                iters.push(Box::new(new_version_iter(
732                    c.inputs[i].clone(),
733                    self.cache.clone(),
734                    self.opt.cmp.clone(),
735                )));
736            }
737        }
738        assert!(iters.len() <= cap);
739        let cmp: Rc<Box<dyn Cmp>> = Rc::new(Box::new(self.cmp.clone()));
740        Box::new(MergingIter::new(cmp, iters))
741    }
742}
743
744struct Builder {
745    // (added, deleted) files per level.
746    deleted: [Vec<FileNum>; NUM_LEVELS],
747    added: [Vec<FileMetaHandle>; NUM_LEVELS],
748}
749
750impl Builder {
751    fn new() -> Builder {
752        Builder {
753            deleted: Default::default(),
754            added: Default::default(),
755        }
756    }
757
758    /// apply applies the edits recorded in edit to the builder state. compaction pointers are
759    /// copied to the supplied compaction_ptrs array.
760    fn apply(&mut self, edit: &VersionEdit, compaction_ptrs: &mut [Bytes; NUM_LEVELS]) {
761        for c in edit.compaction_ptrs.iter() {
762            compaction_ptrs[c.level].clone_from(&c.key);
763        }
764        for &(level, num) in edit.deleted.iter() {
765            self.deleted[level].push(num);
766        }
767        for &(level, ref f) in edit.new_files.iter() {
768            let mut f = f.clone();
769            f.allowed_seeks = f.size / 16384;
770            if f.allowed_seeks < 100 {
771                f.allowed_seeks = 100;
772            }
773            // Remove this file from the list of deleted files.
774            self.deleted[level] = self.deleted[level]
775                .iter()
776                .filter_map(|d| if *d != f.num { Some(*d) } else { None })
777                .collect();
778            self.added[level].push(share(f));
779        }
780    }
781
782    /// maybe_add_file adds a file f at level to version v, if it's not already marked as deleted
783    /// in this edit. It also asserts that the ordering of files is preserved.
784    fn maybe_add_file(
785        &mut self,
786        cmp: &InternalKeyCmp,
787        v: &mut Version,
788        level: usize,
789        f: FileMetaHandle,
790    ) {
791        // Only add file if it's not already deleted.
792        if self.deleted[level].iter().any(|d| *d == f.borrow().num) {
793            return;
794        }
795        {
796            let files = &v.files[level];
797            if level > 0 && !files.is_empty() {
798                // File must be after last file in level.
799                assert_eq!(
800                    cmp.cmp(
801                        &files[files.len() - 1].borrow().largest,
802                        &f.borrow().smallest
803                    ),
804                    Ordering::Less
805                );
806            }
807        }
808        v.files[level].push(f);
809    }
810
811    /// save_to saves the edits applied to the builder to v, adding all non-deleted files from
812    /// Version base to v.
813    fn save_to(&mut self, cmp: &InternalKeyCmp, base: &Shared<Version>, v: &mut Version) {
814        for level in 0..NUM_LEVELS {
815            sort_files_by_smallest(cmp, &mut self.added[level]);
816            // The base version should already have sorted files.
817            sort_files_by_smallest(cmp, &mut base.borrow_mut().files[level]);
818
819            let added = self.added[level].clone();
820            let basefiles = base.borrow().files[level].clone();
821            v.files[level].reserve(basefiles.len() + self.added[level].len());
822
823            let iadded = added.into_iter();
824            let ibasefiles = basefiles.into_iter();
825            let merged = merge_iters(iadded, ibasefiles, |a, b| {
826                cmp.cmp(&a.borrow().smallest, &b.borrow().smallest)
827            });
828            for m in merged {
829                self.maybe_add_file(cmp, v, level, m);
830            }
831
832            // Make sure that there is no overlap in higher levels.
833            if level == 0 {
834                continue;
835            }
836            for i in 1..v.files[level].len() {
837                let (prev_end, this_begin) = (
838                    &v.files[level][i - 1].borrow().largest,
839                    &v.files[level][i].borrow().smallest,
840                );
841                assert!(cmp.cmp(prev_end, this_begin) < Ordering::Equal);
842            }
843        }
844    }
845}
846
847fn manifest_name(file_num: FileNum) -> PathBuf {
848    Path::new(&format!("MANIFEST-{:06}", file_num)).to_owned()
849}
850
851pub fn manifest_file_name<P: AsRef<Path>>(dbname: P, file_num: FileNum) -> PathBuf {
852    dbname.as_ref().join(manifest_name(file_num))
853}
854
855fn temp_file_name<P: AsRef<Path>>(dbname: P, file_num: FileNum) -> PathBuf {
856    dbname.as_ref().join(format!("{:06}.dbtmp", file_num))
857}
858
/// current_file_name returns the path of the `CURRENT` file inside `dbname`. That file holds
/// the name of the active manifest.
fn current_file_name<P: AsRef<Path>>(dbname: P) -> PathBuf {
    // `join` already returns an owned PathBuf; no extra copy is needed.
    dbname.as_ref().join("CURRENT")
}
862
863pub fn read_current_file(env: &dyn Env, dbname: &Path) -> Result<String> {
864    let mut current = String::new();
865    let mut f = env.open_sequential_file(Path::new(&current_file_name(dbname)))?;
866    f.read_to_string(&mut current)?;
867    if current.is_empty() || !current.ends_with('\n') {
868        return err(
869            StatusCode::Corruption,
870            "current file is empty or has no newline",
871        );
872    }
873    Ok(current)
874}
875
876pub fn set_current_file<P: AsRef<Path>>(
877    env: &dyn Env,
878    dbname: P,
879    manifest_file_num: FileNum,
880) -> Result<()> {
881    let dbname = dbname.as_ref();
882    let manifest_base = manifest_name(manifest_file_num);
883    let tempfile = temp_file_name(dbname, manifest_file_num);
884    {
885        let mut f = env.open_writable_file(Path::new(&tempfile))?;
886        f.write_all(manifest_base.display().to_string().as_bytes())?;
887        f.write_all(b"\n")?;
888    }
889    let currentfile = current_file_name(dbname);
890    if let Err(e) = env.rename(Path::new(&tempfile), Path::new(&currentfile)) {
891        // ignore error.
892        let _ = env.delete(Path::new(&tempfile));
893        return Err(e);
894    }
895    Ok(())
896}
897
898/// sort_files_by_smallest sorts the list of files by the smallest keys of the files.
899fn sort_files_by_smallest<C: Cmp>(cmp: &C, files: &mut [FileMetaHandle]) {
900    files.sort_by(|a, b| cmp.cmp(&a.borrow().smallest, &b.borrow().smallest))
901}
902
/// merge_iters merges two iterators, each already sorted under `cmp`, into one sorted vector.
///
/// When `cmp` reports two items as equal, the item from `iter_b` is emitted first. Once an
/// input reports exhaustion (`None`), it is never polled again. This keeps the result correct
/// even for iterators that are not fused.
fn merge_iters<Item, C, I, J>(iter_a: I, iter_b: J, cmp: C) -> Vec<Item>
where
    C: Fn(&Item, &Item) -> Ordering,
    I: Iterator<Item = Item>,
    J: Iterator<Item = Item>,
{
    let mut iter_a = iter_a.peekable();
    let mut iter_b = iter_b.peekable();
    let capacity = iter_a
        .size_hint()
        .0
        .saturating_add(iter_b.size_hint().0);
    let mut out = Vec::with_capacity(capacity);

    loop {
        // Pick a side while both inputs still have a pending element; stop as soon as either
        // is exhausted.
        let take_a = match (iter_a.peek(), iter_b.peek()) {
            (Some(a), Some(b)) => cmp(a, b) == Ordering::Less,
            _ => break,
        };
        let next = if take_a { iter_a.next() } else { iter_b.next() };
        out.push(next.expect("peeked element must still be available"));
    }

    // At most one side still has elements. `Peekable` yields its buffered element first and
    // does not poll the underlying iterator again after `peek` has observed its end.
    out.extend(iter_a);
    out.extend(iter_b);
    out
}
945
946/// get_range returns the indices of the files within files that have the smallest lower bound
947/// respectively the largest upper bound.
948fn get_range<'a, C: Cmp, I: Iterator<Item = &'a FileMetaHandle>>(
949    c: &C,
950    files: I,
951) -> (Bytes, Bytes) {
952    let mut smallest = None;
953    let mut largest = None;
954    for f in files {
955        if smallest.is_none() {
956            smallest = Some(f.borrow().smallest.clone());
957        }
958        if largest.is_none() {
959            largest = Some(f.borrow().largest.clone());
960        }
961        let f = f.borrow();
962        if c.cmp(&f.smallest, smallest.as_ref().unwrap()) == Ordering::Less {
963            smallest = Some(f.smallest.clone());
964        }
965        if c.cmp(&f.largest, largest.as_ref().unwrap()) == Ordering::Greater {
966            largest = Some(f.largest.clone());
967        }
968    }
969    (smallest.unwrap(), largest.unwrap())
970}
971
// Tests for the helpers in this file and for VersionSet operations, mostly run against the
// fixture version returned by `make_version()`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::Cache;
    use crate::cmp::DefaultCmp;
    use crate::key_types::LookupKey;
    use crate::test_util::LdbIteratorIter;
    use crate::types::FileMetaData;
    use crate::version::testutil::make_version;

    /// example_files returns four shared files, deliberately not sorted by key, whose sizes sum
    /// to 100 and whose key ranges together span `a`..`z`.
    fn example_files() -> Vec<FileMetaHandle> {
        let f1 = FileMetaData {
            num: 1,
            size: 10,
            smallest: b"f".to_vec().into(),
            largest: b"g".to_vec().into(),
            ..Default::default()
        };
        let f2 = FileMetaData {
            num: 2,
            size: 20,
            smallest: b"e".to_vec().into(),
            largest: b"f".to_vec().into(),
            ..Default::default()
        };
        let f3 = FileMetaData {
            num: 3,
            size: 30,
            smallest: b"a".to_vec().into(),
            largest: b"b".to_vec().into(),
            ..Default::default()
        };
        let f4 = FileMetaData {
            num: 4,
            size: 40,
            smallest: b"q".to_vec().into(),
            largest: b"z".to_vec().into(),
            ..Default::default()
        };
        vec![f1, f2, f3, f4].into_iter().map(share).collect()
    }

    /// merge_iters interleaves two sorted sequences into a single sorted vector.
    #[test]
    fn test_version_set_merge_iters() {
        let v1 = vec![2, 4, 6, 8, 10];
        let v2 = vec![1, 3, 5, 7];
        assert_eq!(
            vec![1, 2, 3, 4, 5, 6, 7, 8, 10],
            merge_iters(v1.into_iter(), v2.into_iter(), |a, b| a.cmp(b))
        );
    }

    /// total_size sums the sizes of all example files: 10 + 20 + 30 + 40.
    #[test]
    fn test_version_set_total_size() {
        assert_eq!(100, total_size(example_files().iter()));
    }

    /// get_range finds the overall smallest (`a`) and largest (`z`) keys regardless of the
    /// order in which the files are supplied.
    #[test]
    fn test_version_set_get_range() {
        let cmp = DefaultCmp;
        let fs = example_files();
        assert_eq!(
            (b"a".to_vec().into(), b"z".to_vec().into()),
            get_range(&cmp, fs.iter())
        );
    }

    /// Builder::apply records added files, deletions and compaction pointers from an edit;
    /// save_to then builds a version with those edits applied on top of the fixture version.
    #[test]
    fn test_version_set_builder() {
        let (v, opt) = make_version();
        let v = share(v);

        let fmd = FileMetaData {
            num: 21,
            size: 123,
            smallest: LookupKey::new(b"klm", 777).internal_key().to_vec().into(),
            largest: LookupKey::new(b"kop", 700).internal_key().to_vec().into(),
            ..Default::default()
        };

        let mut ve = VersionEdit::new();
        ve.add_file(1, fmd);
        // This deletion should be undone by apply().
        ve.delete_file(1, 21);
        ve.delete_file(0, 2);
        ve.set_compact_pointer(2, LookupKey::new(b"xxx", 123).internal_key());

        let mut b = Builder::new();
        let mut ptrs: [Bytes; NUM_LEVELS] = Default::default();
        b.apply(&ve, &mut ptrs);

        // ptrs[2] now holds the encoded internal key passed to set_compact_pointer(2, ...).
        assert_eq!(
            &[120_u8, 120, 120, 1, 123, 0, 0, 0, 0, 0, 0],
            ptrs[2].as_ref()
        );
        assert_eq!(2, b.deleted[0][0]);
        assert_eq!(1, b.added[1].len());

        let mut v2 = Version::new(
            share(TableCache::new(
                "db",
                opt.clone(),
                share(Cache::new(128)),
                100,
            )),
            opt.cmp.clone(),
        );
        b.save_to(&InternalKeyCmp(opt.cmp.clone()), &v, &mut v2);
        // Second file in L0 was removed.
        assert_eq!(1, v2.files[0].len());
        // File was added to L1.
        assert_eq!(4, v2.files[1].len());
        assert_eq!(21, v2.files[1][3].borrow().num);
    }

    /// Writes an initial manifest and CURRENT file, recovers a VersionSet from them, then
    /// applies an edit via log_and_apply and checks the resulting state and files.
    #[test]
    fn test_version_set_log_and_apply() {
        let (_, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new(
                "db",
                opt.clone(),
                share(Cache::new(128)),
                100,
            )),
        );

        assert_eq!(2, vs.new_file_number());
        // Simulate NewDB
        {
            let mut ve = VersionEdit::new();
            ve.set_comparator_name("leveldb.BytewiseComparator");
            ve.set_log_num(10);
            ve.set_next_file(20);
            ve.set_last_seq(30);

            // Write first manifest to be recovered from.
            let manifest = manifest_file_name("db", 19);
            let mffile = opt.env.open_writable_file(Path::new(&manifest)).unwrap();
            let mut lw = LogWriter::new(mffile);
            lw.add_record(&ve.encode()).unwrap();
            lw.flush().unwrap();
            set_current_file(opt.env.as_ref().as_ref(), "db", 19).unwrap();
        }

        // Recover from new state.
        {
            vs.recover().unwrap();
            assert_eq!(10, vs.log_num);
            assert_eq!(21, vs.next_file_num);
            assert_eq!(30, vs.last_seq);
            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[1].len());
            assert_eq!(35, vs.write_snapshot().unwrap());
        }

        // Simulate compaction by adding a file.
        {
            let mut ve = VersionEdit::new();
            ve.set_log_num(11);
            let fmd = FileMetaData {
                num: 21,
                size: 123,
                smallest: LookupKey::new(b"abc", 777).internal_key().to_vec().into(),
                largest: LookupKey::new(b"def", 700).internal_key().to_vec().into(),
                ..Default::default()
            };
            ve.add_file(1, fmd);
            vs.log_and_apply(ve).unwrap();

            assert!(opt.env.exists(&Path::new("db").join("CURRENT")).unwrap());
            assert!(opt
                .env
                .exists(&Path::new("db").join("MANIFEST-000019"))
                .unwrap());
            // next_file_num and last_seq are untouched by log_and_apply
            assert_eq!(21, vs.new_file_number());
            assert_eq!(22, vs.next_file_num);
            assert_eq!(30, vs.last_seq);
            // the following fields are touched by log_and_apply.
            assert_eq!(11, vs.log_num);

            // The previous "compaction" should have added one file to level 1 (and none to
            // level 0) in the current version.
            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
            assert_eq!(1, vs.current.as_ref().unwrap().borrow().files[1].len());
            assert_eq!(63, vs.write_snapshot().unwrap());
        }
    }

    /// Checks live_files, per-level byte/file counts and new_file_number on the fixture
    /// version.
    #[test]
    fn test_version_set_utils() {
        let (v, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new("db", opt, share(Cache::new(128)), 100)),
        );
        vs.add_version(v);
        // live_files()
        assert_eq!(9, vs.live_files().len());
        assert!(vs.live_files().contains(&3));

        let v = vs.current();
        let v = v.borrow();
        // num_level_bytes()
        assert_eq!(483, v.num_level_bytes(0));
        assert_eq!(651, v.num_level_bytes(1));
        assert_eq!(468, v.num_level_bytes(2));
        // num_level_files()
        assert_eq!(2, v.num_level_files(0));
        assert_eq!(3, v.num_level_files(1));
        assert_eq!(2, v.num_level_files(2));
        // new_file_number()
        assert_eq!(2, vs.new_file_number());
        assert_eq!(3, vs.new_file_number());
    }

    /// pick_compaction selects a size compaction when a compaction score is set, and a seek
    /// compaction when only file_to_compact is set.
    #[test]
    fn test_version_set_pick_compaction() {
        let (mut v, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new("db", opt, share(Cache::new(128)), 100)),
        );

        v.compaction_score = Some(2.0);
        v.compaction_level = Some(0);
        vs.add_version(v);

        // Size compaction
        {
            let c = vs.pick_compaction().unwrap();
            assert_eq!(2, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(0, c.level);
            assert!(c.input_version.is_some());
        }
        // Seek compaction
        {
            let current = vs.current();
            current.borrow_mut().compaction_score = None;
            current.borrow_mut().compaction_level = None;
            current.borrow_mut().file_to_compact_lvl = 1;

            let fmd = current.borrow().files[1][0].clone();
            current.borrow_mut().file_to_compact = Some(fmd);

            let c = vs.pick_compaction().unwrap();
            assert_eq!(3, c.inputs[0].len()); // inputs on l+0 are expanded.
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(1, c.level);
            assert!(c.input_version.is_some());
        }
    }

    /// iterator_properties tests that it contains len elements and that they are ordered in
    /// ascending order by cmp.
    fn iterator_properties<It: LdbIterator>(mut it: It, len: usize, cmp: Rc<Box<dyn Cmp>>) {
        let mut wr = LdbIteratorIter::wrap(&mut it);
        let first = wr.next().unwrap();
        let mut count = 1;
        wr.fold(first, |(a, _), (b, c)| {
            assert_eq!(Ordering::Less, cmp.cmp(&a, &b));
            count += 1;
            (b, c)
        });
        assert_eq!(len, count);
    }

    /// Exercises approximate_offset and several compaction helpers (compact_range,
    /// make_input_iterator, is_trivial_move, should_stop_before, is_base_level_for,
    /// add_input_deletions) against one shared VersionSet.
    #[test]
    fn test_version_set_compaction() {
        let (v, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new("db", opt, share(Cache::new(128)), 100)),
        );
        time_test!();
        vs.add_version(v);

        {
            // approximate_offset()
            let v = vs.current();
            assert_eq!(
                0,
                vs.approximate_offset(&v, LookupKey::new(b"aaa", 9000).internal_key())
            );
            assert_eq!(
                232,
                vs.approximate_offset(&v, LookupKey::new(b"bab", 9000).internal_key())
            );
            assert_eq!(
                1134,
                vs.approximate_offset(&v, LookupKey::new(b"fab", 9000).internal_key())
            );
        }
        // The following tests reuse the same version set and verify that various compactions work
        // like they should.
        {
            time_test!("compaction tests");
            // compact level 0 with a partial range.
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"ab", 1010);
            let c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert_eq!(2, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(1, c.grandparents.unwrap().len());

            // compact level 0, but entire range of keys in version.
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert_eq!(2, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(1, c.grandparents.as_ref().unwrap().len());
            iterator_properties(
                vs.make_input_iterator(&c),
                12,
                Rc::new(Box::new(vs.cmp.clone())),
            );

            // Expand input range on higher level.
            let from = LookupKey::new(b"dab", 1000);
            let to = LookupKey::new(b"eab", 1010);
            let c = vs
                .compact_range(1, from.internal_key(), to.internal_key())
                .unwrap();
            assert_eq!(3, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(0, c.grandparents.as_ref().unwrap().len());
            iterator_properties(
                vs.make_input_iterator(&c),
                12,
                Rc::new(Box::new(vs.cmp.clone())),
            );

            // is_trivial_move
            let from = LookupKey::new(b"fab", 1000);
            let to = LookupKey::new(b"fba", 1010);
            let mut c = vs
                .compact_range(2, from.internal_key(), to.internal_key())
                .unwrap();
            // pretend it's not manual
            c.manual = false;
            assert!(c.is_trivial_move());

            // should_stop_before
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let mid = LookupKey::new(b"abc", 1010);
            let mut c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert!(!c.should_stop_before(from.internal_key()));
            assert!(!c.should_stop_before(mid.internal_key()));
            assert!(!c.should_stop_before(to.internal_key()));

            // is_base_level_for
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let mut c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert!(c.is_base_level_for(b"aaa"));
            assert!(!c.is_base_level_for(b"hac"));

            // input/add_input_deletions
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let mut c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            // Each tuple is (input set, index within that set, expected file number).
            for inp in &[(0, 0, 1), (0, 1, 2), (1, 0, 3)] {
                let f = &c.inputs[inp.0][inp.1];
                assert_eq!(inp.2, f.borrow().num);
            }
            c.add_input_deletions();
            assert_eq!(23, c.edit().encode().len())
        }
    }
}