1use crate::cmp::{Cmp, InternalKeyCmp};
2use crate::env::Env;
3use crate::error::{err, Result, StatusCode};
4use crate::key_types::{parse_internal_key, InternalKey, UserKey};
5use crate::log::{LogReader, LogWriter};
6use crate::merging_iter::MergingIter;
7use crate::options::Options;
8use crate::table_cache::TableCache;
9use crate::types::{
10 parse_file_name, share, FileMetaData, FileNum, FileType, LdbIterator, Shared, NUM_LEVELS,
11};
12use crate::version::{new_version_iter, total_size, FileMetaHandle, Version};
13use crate::version_edit::VersionEdit;
14
15use bytes::Bytes;
16use std::cmp::Ordering;
17use std::collections::HashSet;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21
/// Bookkeeping for one compaction that merges files of `level` with the overlapping files of
/// `level + 1`.
pub struct Compaction {
    /// Level that `inputs[0]` belongs to.
    level: usize,
    /// Copied from `Options::max_file_size`; the overlap limits used below are multiples of it.
    max_file_size: usize,
    /// Version the inputs were chosen from; required by `is_base_level_for`.
    input_version: Option<Shared<Version>>,
    /// Per-level cursor into the file lists of `input_version`, advanced by `is_base_level_for`.
    level_ixs: [usize; NUM_LEVELS],
    /// Comparator taken from `Options::cmp`; applied to user keys in `is_base_level_for`.
    cmp: Rc<Box<dyn Cmp>>,
    /// The same comparator wrapped in `InternalKeyCmp`; applied to internal keys.
    icmp: InternalKeyCmp,

    /// Set by `VersionSet::compact_range`; a manual compaction is never a trivial move.
    manual: bool,

    /// `inputs[0]` holds files of `level`, `inputs[1]` the overlapping files of `level + 1`.
    inputs: [Vec<FileMetaHandle>; 2],
    /// Cursor into `grandparents`, advanced by `should_stop_before`.
    grandparent_ix: usize,
    /// Files of `level + 2` overlapping the inputs; `None` until `setup_other_inputs` fills it
    /// (and left `None` when `level + 2` is not a valid level).
    grandparents: Option<Vec<FileMetaHandle>>,
    /// Sum of the sizes of grandparent files passed so far; see `should_stop_before`.
    overlapped_bytes: usize,
    /// Whether `should_stop_before` has been called at least once.
    seen_key: bool,
    /// Edit collecting the changes this compaction makes to the version.
    edit: VersionEdit,
}
41
42impl Compaction {
43 pub fn new(opt: &Options, level: usize, input: Option<Shared<Version>>) -> Compaction {
45 Compaction {
46 level,
47 max_file_size: opt.max_file_size,
48 input_version: input,
49 level_ixs: Default::default(),
50 cmp: opt.cmp.clone(),
51 icmp: InternalKeyCmp(opt.cmp.clone()),
52 manual: false,
53
54 inputs: Default::default(),
55 grandparent_ix: 0,
56 grandparents: Default::default(),
57 overlapped_bytes: 0,
58 seen_key: false,
59 edit: VersionEdit::new(),
60 }
61 }
62
63 fn add_input(&mut self, parent: usize, f: FileMetaHandle) {
64 assert!(parent <= 1);
65 self.inputs[parent].push(f)
66 }
67
    /// Returns the level this compaction reads its primary inputs (`inputs[0]`) from.
    pub fn level(&self) -> usize {
        self.level
    }
71
72 pub fn input(&self, parent: usize, ix: usize) -> FileMetaData {
73 assert!(parent < 2);
74 assert!(ix < self.inputs[parent].len());
75 self.inputs[parent][ix].borrow().clone()
76 }
77
78 pub fn num_inputs(&self, parent: usize) -> usize {
79 assert!(parent < 2);
80 self.inputs[parent].len()
81 }
82
    /// Gives mutable access to the version edit that records this compaction's changes.
    pub fn edit(&mut self) -> &mut VersionEdit {
        &mut self.edit
    }
86
    /// Consumes the compaction and returns its accumulated version edit.
    pub fn into_edit(self) -> VersionEdit {
        self.edit
    }
90
91 pub fn add_input_deletions(&mut self) {
93 for parent in 0..2 {
94 for f in &self.inputs[parent] {
95 self.edit.delete_file(self.level + parent, f.borrow().num);
96 }
97 }
98 }
99
100 pub fn is_base_level_for(&mut self, k: UserKey<'_>) -> bool {
104 assert!(self.input_version.is_some());
105 let inp_version = self.input_version.as_ref().unwrap();
106 for level in self.level + 2..NUM_LEVELS {
107 let files = &inp_version.borrow().files[level];
108 while self.level_ixs[level] < files.len() {
109 let f = files[self.level_ixs[level]].borrow();
110 if self.cmp.cmp(k, parse_internal_key(&f.largest).2) <= Ordering::Equal {
111 if self.cmp.cmp(k, parse_internal_key(&f.smallest).2) >= Ordering::Equal {
112 return false;
114 }
115 break;
116 }
117 self.level_ixs[level] += 1;
119 }
120 }
121 true
122 }
123
124 pub fn is_trivial_move(&self) -> bool {
125 if self.manual {
126 return false;
127 }
128
129 let inputs_size;
130 if let Some(gp) = self.grandparents.as_ref() {
131 inputs_size = total_size(gp.iter());
132 } else {
133 inputs_size = 0;
134 }
135 self.num_inputs(0) == 1 && self.num_inputs(1) == 0 && inputs_size < 10 * self.max_file_size
136 }
137
138 pub fn should_stop_before(&mut self, k: InternalKey<'_>) -> bool {
139 if self.grandparents.is_none() {
140 self.seen_key = true;
141 return false;
142 }
143 let grandparents = self.grandparents.as_ref().unwrap();
144 while self.grandparent_ix < grandparents.len()
145 && self
146 .icmp
147 .cmp(k, &grandparents[self.grandparent_ix].borrow().largest)
148 == Ordering::Greater
149 {
150 if self.seen_key {
151 self.overlapped_bytes += grandparents[self.grandparent_ix].borrow().size;
152 }
153 self.grandparent_ix += 1;
154 }
155 self.seen_key = true;
156
157 if self.overlapped_bytes > 10 * self.max_file_size {
158 self.overlapped_bytes = 0;
159 true
160 } else {
161 false
162 }
163 }
164}
165
/// Tracks the current `Version` of a database together with the counters and the manifest
/// (descriptor) log needed to persist changes to it.
pub struct VersionSet {
    /// Database directory; manifest and CURRENT file names are joined onto it.
    dbname: PathBuf,
    opt: Options,
    /// `opt.cmp` wrapped in `InternalKeyCmp`.
    cmp: InternalKeyCmp,
    cache: Shared<TableCache>,

    /// Next file number to hand out; see `new_file_number`.
    pub next_file_num: u64,
    /// Number used to build the manifest file name.
    pub manifest_num: u64,
    /// Last sequence number; written into every edit by `log_and_apply`.
    pub last_seq: u64,
    /// Log number recorded in the most recently applied or recovered edit.
    pub log_num: u64,
    /// Previous log number; set to 0 by `recover` when the manifest contains none.
    pub prev_log_num: u64,

    /// The current version. Always `Some` after construction by `new`.
    current: Option<Shared<Version>>,
    /// Per-level compaction pointer: the largest key of the last compaction set up for that
    /// level, or empty if there is none.
    compaction_ptrs: [Bytes; NUM_LEVELS],

    /// Writer for the manifest file; `None` until one is opened by `log_and_apply` or
    /// `reuse_manifest`.
    descriptor_log: Option<LogWriter<Box<dyn Write>>>,
}
185
186impl VersionSet {
187 pub fn new<P: AsRef<Path>>(db: P, opt: Options, cache: Shared<TableCache>) -> VersionSet {
190 let v = share(Version::new(cache.clone(), opt.cmp.clone()));
191 VersionSet {
192 dbname: db.as_ref().to_owned(),
193 cmp: InternalKeyCmp(opt.cmp.clone()),
194 opt,
195 cache,
196
197 next_file_num: 2,
198 manifest_num: 0,
199 last_seq: 0,
200 log_num: 0,
201 prev_log_num: 0,
202
203 current: Some(v),
204 compaction_ptrs: Default::default(),
205 descriptor_log: None,
206 }
207 }
208
209 pub fn current_summary(&self) -> String {
210 self.current.as_ref().unwrap().borrow().level_summary()
211 }
212
213 pub fn live_files(&self) -> HashSet<FileNum> {
215 let mut files = HashSet::new();
216 if let Some(ref version) = self.current {
217 for level in 0..NUM_LEVELS {
218 for file in &version.borrow().files[level] {
219 files.insert(file.borrow().num);
220 }
221 }
222 }
223 files
224 }
225
226 pub fn current(&self) -> Shared<Version> {
229 assert!(self.current.is_some());
230 self.current.as_ref().unwrap().clone()
231 }
232
    /// Makes `v` the current version, replacing the previous one.
    pub fn add_version(&mut self, v: Version) {
        self.current = Some(share(v));
    }
236
237 pub fn new_file_number(&mut self) -> FileNum {
238 self.next_file_num += 1;
239 self.next_file_num - 1
240 }
241
242 pub fn reuse_file_number(&mut self, n: FileNum) {
243 if n == self.next_file_num - 1 {
244 self.next_file_num = n;
245 }
246 }
247
248 pub fn mark_file_number_used(&mut self, n: FileNum) {
249 if self.next_file_num <= n {
250 self.next_file_num = n + 1;
251 }
252 }
253
254 pub fn needs_compaction(&self) -> bool {
256 assert!(self.current.is_some());
257 let v = self.current.as_ref().unwrap();
258 let v = v.borrow();
259 v.compaction_score.unwrap_or(0.0) >= 1.0 || v.file_to_compact.is_some()
260 }
261
262 fn approximate_offset(&self, v: &Shared<Version>, key: InternalKey<'_>) -> usize {
263 let mut offset = 0;
264 for level in 0..NUM_LEVELS {
265 for f in &v.borrow().files[level] {
266 if self.opt.cmp.cmp(&f.borrow().largest, key) <= Ordering::Equal {
267 offset += f.borrow().size;
268 } else if self.opt.cmp.cmp(&f.borrow().smallest, key) == Ordering::Greater {
269 if level > 0 {
271 break;
272 }
273 } else if let Ok(tbl) = self.cache.borrow_mut().get_table(f.borrow().num) {
274 offset += tbl.approx_offset_of(key);
275 }
276 }
277 }
278 offset
279 }
280
281 pub fn pick_compaction(&mut self) -> Option<Compaction> {
282 assert!(self.current.is_some());
283 let current = self.current();
284 let current = current.borrow();
285
286 let mut c = Compaction::new(&self.opt, 0, self.current.clone());
287 let level;
288
289 if current.compaction_score.unwrap_or(0.0) >= 1.0 {
291 level = current.compaction_level.unwrap();
292 assert!(level < NUM_LEVELS - 1);
293
294 for f in ¤t.files[level] {
295 if self.compaction_ptrs[level].is_empty()
296 || self
297 .cmp
298 .cmp(&f.borrow().largest, &self.compaction_ptrs[level])
299 == Ordering::Greater
300 {
301 c.add_input(0, f.clone());
302 break;
303 }
304 }
305
306 if c.num_inputs(0) == 0 {
307 c.add_input(0, current.files[level][0].clone());
309 }
310 } else if let Some(ref ftc) = current.file_to_compact {
311 level = current.file_to_compact_lvl;
313 c.add_input(0, ftc.clone());
314 } else {
315 return None;
316 }
317
318 c.level = level;
319 c.input_version.clone_from(&self.current);
320
321 if level == 0 {
322 let (smallest, largest) = get_range(&self.cmp, c.inputs[0].iter());
323 c.inputs[0] = current.overlapping_inputs(0, &smallest, &largest);
325 assert!(!c.inputs[0].is_empty());
326 }
327
328 self.setup_other_inputs(&mut c);
329 Some(c)
330 }
331
332 pub fn compact_range(
333 &mut self,
334 level: usize,
335 from: InternalKey<'_>,
336 to: InternalKey<'_>,
337 ) -> Option<Compaction> {
338 assert!(self.current.is_some());
339 let mut inputs = self
340 .current
341 .as_ref()
342 .unwrap()
343 .borrow()
344 .overlapping_inputs(level, from, to);
345 if inputs.is_empty() {
346 return None;
347 }
348
349 if level > 0 {
350 let mut total = 0;
351 for i in 0..inputs.len() {
352 total += inputs[i].borrow().size;
353 if total > self.opt.max_file_size {
354 inputs.truncate(i + 1);
355 break;
356 }
357 }
358 }
359
360 let mut c = Compaction::new(&self.opt, level, self.current.clone());
361 c.inputs[0] = inputs;
362 c.manual = true;
363 self.setup_other_inputs(&mut c);
364 Some(c)
365 }
366
367 fn setup_other_inputs(&mut self, compaction: &mut Compaction) {
368 assert!(self.current.is_some());
369 let current = self.current.as_ref().unwrap();
370 let current = current.borrow();
371
372 let level = compaction.level;
373 let (mut smallest, mut largest) = get_range(&self.cmp, compaction.inputs[0].iter());
374
375 compaction.inputs[1] = current.overlapping_inputs(level + 1, &smallest, &largest);
377
378 let (mut allstart, mut alllimit) = get_range(
379 &self.cmp,
380 compaction.inputs[0]
381 .iter()
382 .chain(compaction.inputs[1].iter()),
383 );
384
385 if !compaction.inputs[1].is_empty() {
388 let expanded0 = current.overlapping_inputs(level, &allstart, &alllimit);
389 let inputs1_size = total_size(compaction.inputs[1].iter());
390 let expanded0_size = total_size(expanded0.iter());
391 if expanded0.len() > compaction.num_inputs(0)
393 && (inputs1_size + expanded0_size) < 25 * self.opt.max_file_size
394 {
395 let (new_start, new_limit) = get_range(&self.cmp, expanded0.iter());
396 let expanded1 = current.overlapping_inputs(level + 1, &new_start, &new_limit);
397 if expanded1.len() == compaction.num_inputs(1) {
398 log!(
399 self.opt.log,
400 "Expanding inputs@{} {}+{} ({}+{} bytes) to {}+{} ({}+{} bytes)",
401 level,
402 compaction.inputs[0].len(),
403 compaction.inputs[1].len(),
404 total_size(compaction.inputs[0].iter()),
405 total_size(compaction.inputs[1].iter()),
406 expanded0.len(),
407 expanded1.len(),
408 total_size(expanded0.iter()),
409 total_size(expanded1.iter())
410 );
411
412 smallest = new_start;
413 largest = new_limit;
414 compaction.inputs[0] = expanded0;
415 compaction.inputs[1] = expanded1;
416 let (newallstart, newalllimit) = get_range(
417 &self.cmp,
418 compaction.inputs[0]
419 .iter()
420 .chain(compaction.inputs[1].iter()),
421 );
422 allstart = newallstart;
423 alllimit = newalllimit;
424 }
425 }
426 }
427
428 if level + 2 < NUM_LEVELS {
431 let grandparents = self.current.as_ref().unwrap().borrow().overlapping_inputs(
432 level + 2,
433 &allstart,
434 &alllimit,
435 );
436 compaction.grandparents = Some(grandparents);
437 }
438
439 log!(
440 self.opt.log,
441 "Compacting @{} {:?} .. {:?}",
442 level,
443 smallest,
444 largest
445 );
446
447 compaction.edit().set_compact_pointer(level, &largest);
448 self.compaction_ptrs[level] = largest;
449 }
450
451 fn write_snapshot(&mut self) -> Result<usize> {
453 assert!(self.descriptor_log.is_some());
454
455 let mut edit = VersionEdit::new();
456 edit.set_comparator_name(self.opt.cmp.id());
457
458 for level in 0..NUM_LEVELS {
460 if !self.compaction_ptrs[level].is_empty() {
461 edit.set_compact_pointer(level, &self.compaction_ptrs[level]);
462 }
463 }
464
465 let current = self.current.as_ref().unwrap().borrow();
466 for level in 0..NUM_LEVELS {
468 let fs = ¤t.files[level];
469 for f in fs {
470 edit.add_file(level, f.borrow().clone());
471 }
472 }
473 self.descriptor_log
474 .as_mut()
475 .unwrap()
476 .add_record(&edit.encode())
477 }
478
479 pub fn log_and_apply(&mut self, mut edit: VersionEdit) -> Result<()> {
482 assert!(self.current.is_some());
483
484 if edit.log_number.is_none() {
485 edit.set_log_num(self.log_num);
486 } else {
487 assert!(edit.log_number.unwrap() >= self.log_num);
488 assert!(edit.log_number.unwrap() < self.next_file_num);
489 }
490 if edit.prev_log_number.is_none() {
491 edit.set_prev_log_num(self.prev_log_num);
492 }
493 edit.set_next_file(self.next_file_num);
494 edit.set_last_seq(self.last_seq);
495
496 let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone());
497 {
498 let mut builder = Builder::new();
499 builder.apply(&edit, &mut self.compaction_ptrs);
500 builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v);
501 }
502 self.finalize(&mut v);
503
504 if self.descriptor_log.is_none() {
505 let descname = manifest_file_name(&self.dbname, self.manifest_num);
506 edit.set_next_file(self.next_file_num);
507 self.descriptor_log = Some(LogWriter::new(
508 self.opt.env.open_writable_file(Path::new(&descname))?,
509 ));
510 self.write_snapshot()?;
511 }
512
513 let encoded = edit.encode();
514 if let Some(ref mut lw) = self.descriptor_log {
515 lw.add_record(&encoded)?;
516 lw.flush()?;
517 }
518 set_current_file(
519 self.opt.env.as_ref().as_ref(),
520 &self.dbname,
521 self.manifest_num,
522 )?;
523
524 self.add_version(v);
525 self.log_num = edit.log_number.unwrap();
527
528 Ok(())
530 }
531
532 fn finalize(&self, v: &mut Version) {
533 let mut best_lvl = None;
534 let mut best_score = None;
535
536 for l in 0..NUM_LEVELS - 1 {
537 let score = if l == 0 {
538 v.files[l].len() as f64 / 4.0
539 } else {
540 let mut max_bytes = 10.0 * f64::from(1 << 20);
541 for _ in 0..l - 1 {
542 max_bytes *= 10.0;
543 }
544 total_size(v.files[l].iter()) as f64 / max_bytes
545 };
546 if let Some(ref mut b) = best_score {
547 if *b < score {
548 *b = score;
549 best_lvl = Some(l);
550 }
551 } else {
552 best_score = Some(score);
553 best_lvl = Some(l);
554 }
555 }
556 v.compaction_score = best_score;
557 v.compaction_level = best_lvl;
558 }
559
560 pub fn recover(&mut self) -> Result<bool> {
563 assert!(self.current.is_some());
564
565 let mut current = read_current_file(self.opt.env.as_ref().as_ref(), &self.dbname)?;
566 let len = current.len();
567 current.truncate(len - 1);
568 let current = Path::new(¤t);
569
570 let descfilename = self.dbname.join(current);
571 let mut builder = Builder::new();
572 {
573 let mut descfile = self
574 .opt
575 .env
576 .open_sequential_file(Path::new(&descfilename))?;
577 let mut logreader = LogReader::new(
578 &mut descfile,
579 true,
581 );
582
583 let mut log_number = None;
584 let mut prev_log_number = None;
585 let mut next_file_number = None;
586 let mut last_seq = None;
587
588 let mut buf = Vec::new();
589 while let Ok(size) = logreader.read(&mut buf) {
590 if size == 0 {
591 break;
592 }
593 let edit = VersionEdit::decode_from(&buf)?;
594 builder.apply(&edit, &mut self.compaction_ptrs);
595 if let Some(ln) = edit.log_number {
596 log_number = Some(ln);
597 }
598 if let Some(nfn) = edit.next_file_number {
599 next_file_number = Some(nfn);
600 }
601 if let Some(ls) = edit.last_seq {
602 last_seq = Some(ls);
603 }
604 if let Some(pln) = edit.prev_log_number {
605 prev_log_number = Some(pln);
606 }
607 }
608
609 if let Some(ln) = log_number {
610 self.log_num = ln;
611 self.mark_file_number_used(ln);
612 } else {
613 return err(
614 StatusCode::Corruption,
615 "no meta-lognumber entry in descriptor",
616 );
617 }
618 if let Some(nfn) = next_file_number {
619 self.next_file_num = nfn + 1;
620 } else {
621 return err(
622 StatusCode::Corruption,
623 "no meta-next-file entry in descriptor",
624 );
625 }
626 if let Some(ls) = last_seq {
627 self.last_seq = ls;
628 } else {
629 return err(
630 StatusCode::Corruption,
631 "no last-sequence entry in descriptor",
632 );
633 }
634 if let Some(pln) = prev_log_number {
635 self.prev_log_num = pln;
636 self.mark_file_number_used(prev_log_number.unwrap());
637 } else {
638 self.prev_log_num = 0;
639 }
640 }
641
642 let mut v = Version::new(self.cache.clone(), self.opt.cmp.clone());
643 builder.save_to(&self.cmp, self.current.as_ref().unwrap(), &mut v);
644 self.finalize(&mut v);
645 self.add_version(v);
646 self.manifest_num = self.next_file_num - 1;
647 log!(
648 self.opt.log,
649 "Recovered manifest with next_file={} manifest_num={} log_num={} prev_log_num={} \
650 last_seq={}",
651 self.next_file_num,
652 self.manifest_num,
653 self.log_num,
654 self.prev_log_num,
655 self.last_seq
656 );
657
658 Ok(!self.reuse_manifest(&descfilename, current))
660 }
661
662 fn reuse_manifest(
664 &mut self,
665 current_manifest_path: &Path,
666 current_manifest_base: &Path,
667 ) -> bool {
668 if !self.opt.reuse_manifest {
675 return false;
676 }
677 if let Ok((num, typ)) = parse_file_name(current_manifest_base) {
679 if typ != FileType::Descriptor {
680 return false;
681 }
682 if let Ok(size) = self.opt.env.size_of(Path::new(current_manifest_path)) {
683 if size >= self.opt.max_file_size {
684 return false;
685 }
686
687 assert!(self.descriptor_log.is_none());
688 let s = self
689 .opt
690 .env
691 .open_appendable_file(Path::new(current_manifest_path));
692 if let Ok(f) = s {
693 log!(self.opt.log, "reusing manifest {:?}", current_manifest_path);
694 self.descriptor_log = Some(LogWriter::new_with_off(f, size));
695 self.manifest_num = num;
696 return true;
697 } else {
698 log!(self.opt.log, "reuse_manifest: {}", s.err().unwrap());
699 }
700 }
701 }
702 false
703 }
704
705 pub fn make_input_iterator(&self, c: &Compaction) -> Box<dyn LdbIterator> {
707 let cap = if c.level == 0 { c.num_inputs(0) + 1 } else { 2 };
708 let mut iters: Vec<Box<dyn LdbIterator>> = Vec::with_capacity(cap);
709 for i in 0..2 {
710 if c.num_inputs(i) == 0 {
711 continue;
712 }
713 if c.level + i == 0 {
714 for fi in 0..c.num_inputs(i) {
716 let f = &c.inputs[i][fi];
717 let s = self.cache.borrow_mut().get_table(f.borrow().num);
718 if let Ok(tbl) = s {
719 iters.push(Box::new(tbl.iter()));
720 } else {
721 log!(
722 self.opt.log,
723 "error opening table {}: {}",
724 f.borrow().num,
725 s.err().unwrap()
726 );
727 }
728 }
729 } else {
730 iters.push(Box::new(new_version_iter(
732 c.inputs[i].clone(),
733 self.cache.clone(),
734 self.opt.cmp.clone(),
735 )));
736 }
737 }
738 assert!(iters.len() <= cap);
739 let cmp: Rc<Box<dyn Cmp>> = Rc::new(Box::new(self.cmp.clone()));
740 Box::new(MergingIter::new(cmp, iters))
741 }
742}
743
/// Accumulates the file additions and deletions of one or more version edits so they can be
/// merged with a base version by `save_to`.
struct Builder {
    /// Per level: numbers of files marked as deleted.
    deleted: [Vec<FileNum>; NUM_LEVELS],
    /// Per level: files added by the applied edits.
    added: [Vec<FileMetaHandle>; NUM_LEVELS],
}
749
750impl Builder {
    /// Creates a builder with no additions or deletions recorded.
    fn new() -> Builder {
        Builder {
            deleted: Default::default(),
            added: Default::default(),
        }
    }
757
758 fn apply(&mut self, edit: &VersionEdit, compaction_ptrs: &mut [Bytes; NUM_LEVELS]) {
761 for c in edit.compaction_ptrs.iter() {
762 compaction_ptrs[c.level].clone_from(&c.key);
763 }
764 for &(level, num) in edit.deleted.iter() {
765 self.deleted[level].push(num);
766 }
767 for &(level, ref f) in edit.new_files.iter() {
768 let mut f = f.clone();
769 f.allowed_seeks = f.size / 16384;
770 if f.allowed_seeks < 100 {
771 f.allowed_seeks = 100;
772 }
773 self.deleted[level] = self.deleted[level]
775 .iter()
776 .filter_map(|d| if *d != f.num { Some(*d) } else { None })
777 .collect();
778 self.added[level].push(share(f));
779 }
780 }
781
782 fn maybe_add_file(
785 &mut self,
786 cmp: &InternalKeyCmp,
787 v: &mut Version,
788 level: usize,
789 f: FileMetaHandle,
790 ) {
791 if self.deleted[level].iter().any(|d| *d == f.borrow().num) {
793 return;
794 }
795 {
796 let files = &v.files[level];
797 if level > 0 && !files.is_empty() {
798 assert_eq!(
800 cmp.cmp(
801 &files[files.len() - 1].borrow().largest,
802 &f.borrow().smallest
803 ),
804 Ordering::Less
805 );
806 }
807 }
808 v.files[level].push(f);
809 }
810
811 fn save_to(&mut self, cmp: &InternalKeyCmp, base: &Shared<Version>, v: &mut Version) {
814 for level in 0..NUM_LEVELS {
815 sort_files_by_smallest(cmp, &mut self.added[level]);
816 sort_files_by_smallest(cmp, &mut base.borrow_mut().files[level]);
818
819 let added = self.added[level].clone();
820 let basefiles = base.borrow().files[level].clone();
821 v.files[level].reserve(basefiles.len() + self.added[level].len());
822
823 let iadded = added.into_iter();
824 let ibasefiles = basefiles.into_iter();
825 let merged = merge_iters(iadded, ibasefiles, |a, b| {
826 cmp.cmp(&a.borrow().smallest, &b.borrow().smallest)
827 });
828 for m in merged {
829 self.maybe_add_file(cmp, v, level, m);
830 }
831
832 if level == 0 {
834 continue;
835 }
836 for i in 1..v.files[level].len() {
837 let (prev_end, this_begin) = (
838 &v.files[level][i - 1].borrow().largest,
839 &v.files[level][i].borrow().smallest,
840 );
841 assert!(cmp.cmp(prev_end, this_begin) < Ordering::Equal);
842 }
843 }
844 }
845}
846
847fn manifest_name(file_num: FileNum) -> PathBuf {
848 Path::new(&format!("MANIFEST-{:06}", file_num)).to_owned()
849}
850
851pub fn manifest_file_name<P: AsRef<Path>>(dbname: P, file_num: FileNum) -> PathBuf {
852 dbname.as_ref().join(manifest_name(file_num))
853}
854
855fn temp_file_name<P: AsRef<Path>>(dbname: P, file_num: FileNum) -> PathBuf {
856 dbname.as_ref().join(format!("{:06}.dbtmp", file_num))
857}
858
/// Returns the path of the `CURRENT` file inside the database directory `dbname`.
fn current_file_name<P: AsRef<Path>>(dbname: P) -> PathBuf {
    // `join` already returns an owned PathBuf, so no further copy is needed.
    dbname.as_ref().join("CURRENT")
}
862
863pub fn read_current_file(env: &dyn Env, dbname: &Path) -> Result<String> {
864 let mut current = String::new();
865 let mut f = env.open_sequential_file(Path::new(¤t_file_name(dbname)))?;
866 f.read_to_string(&mut current)?;
867 if current.is_empty() || !current.ends_with('\n') {
868 return err(
869 StatusCode::Corruption,
870 "current file is empty or has no newline",
871 );
872 }
873 Ok(current)
874}
875
876pub fn set_current_file<P: AsRef<Path>>(
877 env: &dyn Env,
878 dbname: P,
879 manifest_file_num: FileNum,
880) -> Result<()> {
881 let dbname = dbname.as_ref();
882 let manifest_base = manifest_name(manifest_file_num);
883 let tempfile = temp_file_name(dbname, manifest_file_num);
884 {
885 let mut f = env.open_writable_file(Path::new(&tempfile))?;
886 f.write_all(manifest_base.display().to_string().as_bytes())?;
887 f.write_all(b"\n")?;
888 }
889 let currentfile = current_file_name(dbname);
890 if let Err(e) = env.rename(Path::new(&tempfile), Path::new(¤tfile)) {
891 let _ = env.delete(Path::new(&tempfile));
893 return Err(e);
894 }
895 Ok(())
896}
897
898fn sort_files_by_smallest<C: Cmp>(cmp: &C, files: &mut [FileMetaHandle]) {
900 files.sort_by(|a, b| cmp.cmp(&a.borrow().smallest, &b.borrow().smallest))
901}
902
/// Merges two iterators, each already sorted according to `cmp`, into one sorted vector.
///
/// An element of `iter_a` is taken only if it compares strictly less than the next element of
/// `iter_b`; on ties the element of `iter_b` comes first.
fn merge_iters<
    Item,
    C: Fn(&Item, &Item) -> Ordering,
    I: Iterator<Item = Item>,
    J: Iterator<Item = Item>,
>(
    iter_a: I,
    iter_b: J,
    cmp: C,
) -> Vec<Item> {
    let mut left = iter_a.peekable();
    let mut right = iter_b.peekable();
    let mut merged = Vec::new();

    loop {
        let take_left = match (left.peek(), right.peek()) {
            (Some(l), Some(r)) => cmp(l, r) == Ordering::Less,
            // One side is exhausted; the rest is appended below.
            _ => break,
        };
        if take_left {
            merged.extend(left.next());
        } else {
            merged.extend(right.next());
        }
    }

    merged.extend(left);
    merged.extend(right);
    merged
}
945
946fn get_range<'a, C: Cmp, I: Iterator<Item = &'a FileMetaHandle>>(
949 c: &C,
950 files: I,
951) -> (Bytes, Bytes) {
952 let mut smallest = None;
953 let mut largest = None;
954 for f in files {
955 if smallest.is_none() {
956 smallest = Some(f.borrow().smallest.clone());
957 }
958 if largest.is_none() {
959 largest = Some(f.borrow().largest.clone());
960 }
961 let f = f.borrow();
962 if c.cmp(&f.smallest, smallest.as_ref().unwrap()) == Ordering::Less {
963 smallest = Some(f.smallest.clone());
964 }
965 if c.cmp(&f.largest, largest.as_ref().unwrap()) == Ordering::Greater {
966 largest = Some(f.largest.clone());
967 }
968 }
969 (smallest.unwrap(), largest.unwrap())
970}
971
972#[cfg(test)]
973mod tests {
974 use super::*;
975 use crate::cache::Cache;
976 use crate::cmp::DefaultCmp;
977 use crate::key_types::LookupKey;
978 use crate::test_util::LdbIteratorIter;
979 use crate::types::FileMetaData;
980 use crate::version::testutil::make_version;
981
982 fn example_files() -> Vec<FileMetaHandle> {
983 let f1 = FileMetaData {
984 num: 1,
985 size: 10,
986 smallest: b"f".to_vec().into(),
987 largest: b"g".to_vec().into(),
988 ..Default::default()
989 };
990 let f2 = FileMetaData {
991 num: 2,
992 size: 20,
993 smallest: b"e".to_vec().into(),
994 largest: b"f".to_vec().into(),
995 ..Default::default()
996 };
997 let f3 = FileMetaData {
998 num: 3,
999 size: 30,
1000 smallest: b"a".to_vec().into(),
1001 largest: b"b".to_vec().into(),
1002 ..Default::default()
1003 };
1004 let f4 = FileMetaData {
1005 num: 4,
1006 size: 40,
1007 smallest: b"q".to_vec().into(),
1008 largest: b"z".to_vec().into(),
1009 ..Default::default()
1010 };
1011 vec![f1, f2, f3, f4].into_iter().map(share).collect()
1012 }
1013
1014 #[test]
1015 fn test_version_set_merge_iters() {
1016 let v1 = vec![2, 4, 6, 8, 10];
1017 let v2 = vec![1, 3, 5, 7];
1018 assert_eq!(
1019 vec![1, 2, 3, 4, 5, 6, 7, 8, 10],
1020 merge_iters(v1.into_iter(), v2.into_iter(), |a, b| a.cmp(b))
1021 );
1022 }
1023
1024 #[test]
1025 fn test_version_set_total_size() {
1026 assert_eq!(100, total_size(example_files().iter()));
1027 }
1028
1029 #[test]
1030 fn test_version_set_get_range() {
1031 let cmp = DefaultCmp;
1032 let fs = example_files();
1033 assert_eq!(
1034 (b"a".to_vec().into(), b"z".to_vec().into()),
1035 get_range(&cmp, fs.iter())
1036 );
1037 }
1038
    /// Applies one edit to a `Builder` and checks both the recorded state and the version
    /// produced by `save_to` on top of the `make_version` fixture.
    #[test]
    fn test_version_set_builder() {
        let (v, opt) = make_version();
        let v = share(v);

        let fmd = FileMetaData {
            num: 21,
            size: 123,
            smallest: LookupKey::new(b"klm", 777).internal_key().to_vec().into(),
            largest: LookupKey::new(b"kop", 700).internal_key().to_vec().into(),
            ..Default::default()
        };

        let mut ve = VersionEdit::new();
        ve.add_file(1, fmd);
        // File 21 is added and deleted at the same level by one edit: `apply` lets the
        // addition cancel the deletion.
        ve.delete_file(1, 21);
        ve.delete_file(0, 2);
        ve.set_compact_pointer(2, LookupKey::new(b"xxx", 123).internal_key());

        let mut b = Builder::new();
        let mut ptrs: [Bytes; NUM_LEVELS] = Default::default();
        b.apply(&ve, &mut ptrs);

        // The compaction pointer is stored as the encoded internal key.
        assert_eq!(
            &[120_u8, 120, 120, 1, 123, 0, 0, 0, 0, 0, 0],
            ptrs[2].as_ref()
        );
        assert_eq!(2, b.deleted[0][0]);
        assert_eq!(1, b.added[1].len());

        let mut v2 = Version::new(
            share(TableCache::new(
                "db",
                opt.clone(),
                share(Cache::new(128)),
                100,
            )),
            opt.cmp.clone(),
        );
        b.save_to(&InternalKeyCmp(opt.cmp.clone()), &v, &mut v2);
        // Expected counts depend on the `make_version` fixture: level 0 loses file 2 and
        // level 1 gains file 21 as its last file.
        assert_eq!(1, v2.files[0].len());
        assert_eq!(4, v2.files[1].len());
        assert_eq!(21, v2.files[1][3].borrow().num);
    }
1086
    /// Writes a manifest by hand, recovers a `VersionSet` from it, then applies a further edit
    /// through `log_and_apply` and checks the resulting counters and files.
    #[test]
    fn test_version_set_log_and_apply() {
        let (_, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new(
                "db",
                opt.clone(),
                share(Cache::new(128)),
                100,
            )),
        );

        assert_eq!(2, vs.new_file_number());
        {
            // Write a manifest containing a single edit and point CURRENT at it.
            let mut ve = VersionEdit::new();
            ve.set_comparator_name("leveldb.BytewiseComparator");
            ve.set_log_num(10);
            ve.set_next_file(20);
            ve.set_last_seq(30);

            let manifest = manifest_file_name("db", 19);
            let mffile = opt.env.open_writable_file(Path::new(&manifest)).unwrap();
            let mut lw = LogWriter::new(mffile);
            lw.add_record(&ve.encode()).unwrap();
            lw.flush().unwrap();
            set_current_file(opt.env.as_ref().as_ref(), "db", 19).unwrap();
        }

        {
            // Recover from the manifest written above. `recover` stores the manifest's next
            // file number plus one, hence 21.
            vs.recover().unwrap();
            assert_eq!(10, vs.log_num);
            assert_eq!(21, vs.next_file_num);
            assert_eq!(30, vs.last_seq);
            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[1].len());
            assert_eq!(35, vs.write_snapshot().unwrap());
        }

        {
            // Apply an edit that adds one file to level 1 and advances the log number.
            let mut ve = VersionEdit::new();
            ve.set_log_num(11);
            let fmd = FileMetaData {
                num: 21,
                size: 123,
                smallest: LookupKey::new(b"abc", 777).internal_key().to_vec().into(),
                largest: LookupKey::new(b"def", 700).internal_key().to_vec().into(),
                ..Default::default()
            };
            ve.add_file(1, fmd);
            vs.log_and_apply(ve).unwrap();

            assert!(opt.env.exists(&Path::new("db").join("CURRENT")).unwrap());
            assert!(opt
                .env
                .exists(&Path::new("db").join("MANIFEST-000019"))
                .unwrap());
            assert_eq!(21, vs.new_file_number());
            assert_eq!(22, vs.next_file_num);
            assert_eq!(30, vs.last_seq);
            assert_eq!(11, vs.log_num);

            assert_eq!(0, vs.current.as_ref().unwrap().borrow().files[0].len());
            assert_eq!(1, vs.current.as_ref().unwrap().borrow().files[1].len());
            assert_eq!(63, vs.write_snapshot().unwrap());
        }
    }
1163
    /// Checks `live_files`, the per-level size and count accessors of `Version`, and file
    /// number allocation. All expected values come from the `make_version` fixture.
    #[test]
    fn test_version_set_utils() {
        let (v, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new("db", opt, share(Cache::new(128)), 100)),
        );
        vs.add_version(v);
        assert_eq!(9, vs.live_files().len());
        assert!(vs.live_files().contains(&3));

        let v = vs.current();
        let v = v.borrow();
        assert_eq!(483, v.num_level_bytes(0));
        assert_eq!(651, v.num_level_bytes(1));
        assert_eq!(468, v.num_level_bytes(2));
        assert_eq!(2, v.num_level_files(0));
        assert_eq!(3, v.num_level_files(1));
        assert_eq!(2, v.num_level_files(2));
        // A new version set starts handing out file numbers at 2.
        assert_eq!(2, vs.new_file_number());
        assert_eq!(3, vs.new_file_number());
    }
1191
    /// Exercises both triggers of `pick_compaction`: a compaction score of at least 1.0 and a
    /// file marked in `file_to_compact`. Expected input counts depend on the `make_version`
    /// fixture.
    #[test]
    fn test_version_set_pick_compaction() {
        let (mut v, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new("db", opt, share(Cache::new(128)), 100)),
        );

        v.compaction_score = Some(2.0);
        v.compaction_level = Some(0);
        vs.add_version(v);

        {
            // Score-triggered compaction of level 0.
            let c = vs.pick_compaction().unwrap();
            assert_eq!(2, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(0, c.level);
            assert!(c.input_version.is_some());
        }
        {
            // Clear the score and mark the first file of level 1 for compaction instead.
            let current = vs.current();
            current.borrow_mut().compaction_score = None;
            current.borrow_mut().compaction_level = None;
            current.borrow_mut().file_to_compact_lvl = 1;

            let fmd = current.borrow().files[1][0].clone();
            current.borrow_mut().file_to_compact = Some(fmd);

            let c = vs.pick_compaction().unwrap();
            // One file was seeded; three inputs means `setup_other_inputs` expanded the set.
            assert_eq!(3, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(1, c.level);
            assert!(c.input_version.is_some());
        }
    }
1230
1231 fn iterator_properties<It: LdbIterator>(mut it: It, len: usize, cmp: Rc<Box<dyn Cmp>>) {
1234 let mut wr = LdbIteratorIter::wrap(&mut it);
1235 let first = wr.next().unwrap();
1236 let mut count = 1;
1237 wr.fold(first, |(a, _), (b, c)| {
1238 assert_eq!(Ordering::Less, cmp.cmp(&a, &b));
1239 count += 1;
1240 (b, c)
1241 });
1242 assert_eq!(len, count);
1243 }
1244
    /// End-to-end checks of `approximate_offset`, `compact_range` and the `Compaction` helper
    /// methods. All expected values depend on the `make_version` fixture.
    #[test]
    fn test_version_set_compaction() {
        let (v, opt) = make_version();
        let mut vs = VersionSet::new(
            "db",
            opt.clone(),
            share(TableCache::new("db", opt, share(Cache::new(128)), 100)),
        );
        time_test!();
        vs.add_version(v);

        {
            // approximate_offset()
            let v = vs.current();
            assert_eq!(
                0,
                vs.approximate_offset(&v, LookupKey::new(b"aaa", 9000).internal_key())
            );
            assert_eq!(
                232,
                vs.approximate_offset(&v, LookupKey::new(b"bab", 9000).internal_key())
            );
            assert_eq!(
                1134,
                vs.approximate_offset(&v, LookupKey::new(b"fab", 9000).internal_key())
            );
        }
        {
            time_test!("compaction tests");
            // compact_range() on level 0 with a narrow range.
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"ab", 1010);
            let c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert_eq!(2, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(1, c.grandparents.unwrap().len());

            // compact_range() on level 0 with a range covering every key.
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert_eq!(2, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(1, c.grandparents.as_ref().unwrap().len());
            iterator_properties(
                vs.make_input_iterator(&c),
                12,
                Rc::new(Box::new(vs.cmp.clone())),
            );

            // compact_range() on level 1.
            let from = LookupKey::new(b"dab", 1000);
            let to = LookupKey::new(b"eab", 1010);
            let c = vs
                .compact_range(1, from.internal_key(), to.internal_key())
                .unwrap();
            assert_eq!(3, c.inputs[0].len());
            assert_eq!(1, c.inputs[1].len());
            assert_eq!(0, c.grandparents.as_ref().unwrap().len());
            iterator_properties(
                vs.make_input_iterator(&c),
                12,
                Rc::new(Box::new(vs.cmp.clone())),
            );

            // is_trivial_move(): compact_range() marks the compaction as manual, which would
            // rule out a trivial move, so the flag is cleared first.
            let from = LookupKey::new(b"fab", 1000);
            let to = LookupKey::new(b"fba", 1010);
            let mut c = vs
                .compact_range(2, from.internal_key(), to.internal_key())
                .unwrap();
            c.manual = false;
            assert!(c.is_trivial_move());

            // should_stop_before()
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let mid = LookupKey::new(b"abc", 1010);
            let mut c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert!(!c.should_stop_before(from.internal_key()));
            assert!(!c.should_stop_before(mid.internal_key()));
            assert!(!c.should_stop_before(to.internal_key()));

            // is_base_level_for()
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let mut c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            assert!(c.is_base_level_for(b"aaa"));
            assert!(!c.is_base_level_for(b"hac"));

            // input() positions and add_input_deletions(); tuples are (parent, index, file num).
            let from = LookupKey::new(b"000", 1000);
            let to = LookupKey::new(b"zzz", 1010);
            let mut c = vs
                .compact_range(0, from.internal_key(), to.internal_key())
                .unwrap();
            for inp in &[(0, 0, 1), (0, 1, 2), (1, 0, 3)] {
                let f = &c.inputs[inp.0][inp.1];
                assert_eq!(inp.2, f.borrow().num);
            }
            c.add_input_deletions();
            assert_eq!(23, c.edit().encode().len())
        }
    }
1360}