pub mod ingest;
pub mod inner;
mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    coding::{Decode, Encode},
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    segment::Segment,
    slice::Slice,
    tree::inner::SuperVersion,
    value::InternalValue,
    version::{recovery::recover_ids, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

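/// Guard around a single key-value result of a range or prefix iterator,
/// allowing just the key, the value size, or the full pair to be extracted.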
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[allow(clippy::cast_possible_truncation)]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

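/// Filters out tombstones so point reads return `None` for deleted keys.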
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

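/// A log-structured merge tree (LSM-tree).
///
/// Cloning is cheap: a `Tree` is a shared handle around an `Arc<TreeInner>`.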
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> SegmentId {
        self.0
            .segment_id_counter
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        #[allow(clippy::significant_drop_tightening)]
        let version_lock = self.super_version.read().expect("lock is poisoned");

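        // Read path, newest to oldest: active memtable, then sealed memtables,
        // then the on-disk segments of the current version.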
        if let Some(entry) = version_lock.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            self.get_internal_entry_from_sealed_memtables(&version_lock, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(&version_lock, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.super_version.read().expect("poisoned").version.clone()
    }

    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, _)) =
            self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(std::slice::from_ref(&segment), None, None, seqno_threshold)?;

        Ok(Some(segment))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.compaction_state
            .lock()
            .expect("lock is poisoned")
            .version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_segments()
            .map(Segment::tombstone_count)
            .sum()
    }

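    /// Bulk-ingests an iterator of key-value pairs.
    ///
    /// The input must be sorted in strictly ascending key order. The whole batch
    /// is stamped with a single sequence number, which is only made visible once
    /// ingestion has finished.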
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        #[allow(clippy::explicit_counter_loop)]
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[allow(clippy::cast_possible_truncation)]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_segments()
            .map(Segment::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_segments()
            .map(Segment::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_segments()
            .map(Segment::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .sealed_memtables
            .len()
    }

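    /// Writes a sealed memtable to a new segment file on disk.
    ///
    /// Block sizes, compression and filter settings are taken from the level-0
    /// policies; entries shadowed below `seqno_threshold` may be dropped by the
    /// compaction stream. Returns `None` if nothing was written.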
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::SEGMENTS_FOLDER, segment::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        let segment_file_path = folder.join(segment_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);
        let index_block_size = self.config.index_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        log::debug!(
            "Flushing segment to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, index_block_size={index_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            segment_file_path.display(),
        );

        let mut segment_writer = Writer::new(segment_file_path, segment_id)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_index_block_size(index_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::segment::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result.map(|segment| (segment, None)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} segments, {} blob files",
            segments.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut super_version = self.super_version.write().expect("lock is poisoned");

        compaction_state.upgrade_version(
            &mut super_version,
            |version| {
                Ok(version.with_new_l0_run(
                    segments,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                ))
            },
            seqno_threshold,
        )?;

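        // The new segments are now part of the current version, so the sealed
        // memtables they were flushed from can be released.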
        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());

            super_version.sealed_memtables =
                Arc::new(super_version.sealed_memtables.remove(segment.id()));
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.super_version
            .write()
            .expect("lock is poisoned")
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable));
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

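    /// Seals the active memtable and replaces it with a fresh, empty one.
    ///
    /// The sealed memtable is registered under the next segment ID and returned
    /// so it can be flushed; returns `None` if the active memtable is empty.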
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");

        if version_lock.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut version_lock.active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();

        version_lock.sealed_memtables = Arc::new(
            version_lock
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

499
500 fn segment_count(&self) -> usize {
501 self.current_version().segment_count()
502 }
503
504 fn level_segment_count(&self, idx: usize) -> Option<usize> {
505 self.current_version().level(idx).map(|x| x.segment_count())
506 }
507
508 #[allow(clippy::significant_drop_tightening)]
509 fn approximate_len(&self) -> usize {
510 let version = self.super_version.read().expect("lock is poisoned");
511
512 let segments_item_count = self
513 .current_version()
514 .iter_segments()
515 .map(|x| x.metadata.item_count)
516 .sum::<u64>();
517
518 let memtable_count = version.active_memtable.len() as u64;
519 let sealed_count = version
520 .sealed_memtables
521 .iter()
522 .map(|(_, mt)| mt.len())
523 .sum::<usize>() as u64;
524
525 (memtable_count + sealed_count + segments_item_count)
526 .try_into()
527 .expect("should not be too large")
528 }
529
530 fn disk_space(&self) -> u64 {
531 self.current_version()
532 .iter_levels()
533 .map(super::version::Level::size)
534 .sum()
535 }
536
537 fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
538 let version = self.super_version.read().expect("lock is poisoned");
539
540 let active = version.active_memtable.get_highest_seqno();
541
542 let sealed = version
543 .sealed_memtables
544 .iter()
545 .map(|(_, table)| table.get_highest_seqno())
546 .max()
547 .flatten();
548
549 active.max(sealed)
550 }
551
552 fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
553 self.current_version()
554 .iter_segments()
555 .map(Segment::get_highest_seqno)
556 .max()
557 }
558
559 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
560 Ok(self
561 .get_internal_entry(key.as_ref(), seqno)?
562 .map(|x| x.value))
563 }
564
565 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
566 &self,
567 key: K,
568 value: V,
569 seqno: SeqNo,
570 ) -> (u64, u64) {
571 let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
572 self.append_entry(value)
573 }
574
575 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
576 let value = InternalValue::new_tombstone(key, seqno);
577 self.append_entry(value)
578 }
579
580 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
581 let value = InternalValue::new_weak_tombstone(key, seqno);
582 self.append_entry(value)
583 }
584}
585
586impl Tree {
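    /// Converts generic `RangeBounds` into owned bounds, also reporting whether
    /// the resulting range is trivially empty (lower bound above upper bound).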
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

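    /// Opens an LSM-tree at the configured path.
    ///
    /// The tree is recovered if a manifest file already exists, otherwise a new
    /// tree is created; a leftover V1 `version` file is rejected as an invalid
    /// format version.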
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

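    /// Finalizes a segment writer and loads the resulting segment.
    ///
    /// Returns `None` if the writer had nothing to write.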
    pub(crate) fn consume_writer(
        &self,
        writer: crate::segment::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_file_path = writer.path.clone();

        let Some(_) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {}", segment_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_segment = Segment::recover(
            segment_file_path,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed segment to {:?}", created_segment.path);

        Ok(Some(created_segment))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_segments(
        &self,
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::segment::filter::standard_bloom::Builder::get_hash(key);

        for level in super_version.version.iter_levels() {
            for run in level.iter() {
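                // Segments within a run are sorted and disjoint, so for larger
                // runs a key-range lookup finds the single candidate segment;
                // tiny runs are simply scanned segment by segment.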
                if run.len() >= 4 {
                    if let Some(segment) = run.get_for_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    for segment in run.iter() {
                        if !segment.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let super_version = self.super_version.write().expect("lock is poisoned");

        let iter_state = {
            let active = &super_version.active_memtable;
            let sealed = &super_version.sealed_memtables;

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: super_version.version.clone(),
            }
        };

        TreeIter::create_range(iter_state, bounds, seqno, &super_version.version)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_segment_id = version
            .iter_segments()
            .map(Segment::id)
            .max()
            .unwrap_or_default();

        let path = config.path.clone();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            blob_file_id_generator: SequenceNumberCounter::default(),
            super_version: Arc::new(RwLock::new(SuperVersion {
                active_memtable: Arc::default(),
                sealed_memtables: Arc::default(),
                version,
            })),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::new(path))),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        let mut file = File::create_new(manifest_path)?;
        Manifest {
            version: FormatVersion::V3,
            level_count: config.level_count,
            tree_type: if config.kv_separation_opts.is_some() {
                TreeType::Blob
            } else {
                TreeType::Standard
            },
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::SEGMENTS_FOLDER, SegmentId};

        let tree_path = tree_path.as_ref();

        let recovery = recover_ids(tree_path)?;

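        // Map each table (segment) ID referenced by the recovered version to the
        // level it belongs to.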
        let table_id_map = {
            let mut result: crate::HashMap<SegmentId, u8> = crate::HashMap::default();

            for (level_idx, table_ids) in recovery.segment_ids.iter().enumerate() {
                for run in table_ids {
                    for table_id in run {
                        #[allow(clippy::expect_used)]
                        result.insert(
                            *table_id,
                            level_idx
                                .try_into()
                                .expect("there are less than 256 levels"),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_id_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = table_id_map.get(&table_id) {
                let table = Segment::recover(
                    table_file_path,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1,
                    level_idx <= 2,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let blob_files = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        Ok(version)
    }

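    /// Removes any leftover `v{id}` version files other than the current one.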
    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}