pub mod ingest;
pub mod inner;

use crate::{
    coding::{Decode, Encode},
    compaction::CompactionStrategy,
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    segment::Segment,
    value::InternalValue,
    vlog::BlobFile,
    AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter, UserKey,
    UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

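/// Iterator guard wrapping a single key-value lookup result.
///
/// The inner result is only unwrapped when the caller asks for the key,
/// the value size, or the whole pair.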
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[allow(clippy::cast_possible_truncation)]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

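/// Maps tombstones to `None` so deleted keys are not surfaced by point reads.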
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

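/// A log-structured merge tree (LSM-tree).
///
/// Cloning is cheap, as it only bumps the reference count of the shared inner state.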
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .version_free_list
            .len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::tombstone_count)
            .sum()
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let memtable_lock = self.lock_active_memtable();

        let seqno = seqno_generator.next();

        assert!(
            memtable_lock.is_empty(),
            "can only perform bulk ingestion with empty memtable",
        );

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(key_range));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[allow(clippy::cast_possible_truncation)]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::SEGMENTS_FOLDER, segment::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        let segment_file_path = folder.join(segment_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);
        let index_block_size = self.config.index_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        log::debug!(
            "Flushing segment to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, index_block_size={index_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            segment_file_path.display(),
        );

        let mut segment_writer = Writer::new(segment_file_path, segment_id)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_index_block_size(index_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::segment::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result.map(|segment| (segment, None)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} segments, {} blob files",
            segments.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

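        // Lock order: levels manifest first, then sealed memtables, so the new
        // segments become visible before their memtables are released.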
        log::trace!("register: Acquiring levels manifest write lock");
        let mut manifest = self.manifest.write().expect("lock is poisoned");
        log::trace!("register: Acquired levels manifest write lock");

        log::trace!("register: Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        log::trace!("register: Acquired sealed memtables write lock");

        manifest.atomic_swap(
            |version| version.with_new_l0_run(segments, blob_files),
            seqno_threshold,
        )?;

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn clear_active_memtable(&self) {
        *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

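        // Swap the active memtable for a fresh one; the old one is registered
        // as sealed under the ID of the segment it will eventually be flushed to.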
        let yanked_memtable = std::mem::take(&mut *active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .level(idx)
            .map(|x| x.segment_count())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let manifest = self.manifest.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = manifest
            .current_version()
            .iter_segments()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Arc<Memtable>> {
        self.active_memtable.read().expect("lock is poisoned")
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::segment::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_file_path = writer.path.clone();

        let Some(_) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {}", segment_file_path.display());

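        // A freshly flushed segment joins level 0, so pinning follows the
        // level-0 entries of the pinning policies.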
        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_segment = Segment::recover(
            segment_file_path,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed segment to {:?}", created_segment.path);

        Ok(Some(created_segment))
    }

    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, _)) =
            self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(std::slice::from_ref(&segment), None, seqno_threshold)?;

        Ok(Some(segment))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .is_compacting()
    }

    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    pub(crate) fn get_internal_entry_with_memtable(
        &self,
        memtable_lock: &Memtable,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

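    /// Point read through the persisted segments, walking the levels of the current version.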
    fn get_internal_entry_from_segments(
        &self,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::segment::filter::standard_bloom::Builder::get_hash(key);

        let manifest = self.manifest.read().expect("lock is poisoned");

        for level in manifest.current_version().iter_levels() {
            for run in level.iter() {
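                // A key maps to at most one segment per run, so longer runs are
                // probed via `get_for_key`; short runs are scanned linearly,
                // skipping segments whose key range cannot contain the key.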
                if run.len() >= 4 {
                    if let Some(segment) = run.get_for_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    for segment in run.iter() {
                        if !segment.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    #[doc(hidden)]
    pub fn get_internal_entry(
        &self,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        drop(memtable_lock);

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        log::trace!("range read: acquiring read locks");

        let manifest = self.manifest.read().expect("lock is poisoned");

        let iter_state = {
            let active = self.active_memtable.read().expect("lock is poisoned");
            let sealed = &self.sealed_memtables.read().expect("lock is poisoned");

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: manifest.current_version().clone(),
            }
        };

        TreeIter::create_range(iter_state, bounds, seqno, &manifest)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        config.level_count = manifest.level_count;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            manifest: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        let mut file = File::create_new(manifest_path)?;
        Manifest {
            version: FormatVersion::V3,
            level_count: config.level_count,
            tree_type: config.tree_type,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<LevelManifest> {
        use crate::{file::fsync_directory, file::SEGMENTS_FOLDER, SegmentId};

        let tree_path = tree_path.as_ref();

        let recovery = LevelManifest::recover_ids(tree_path)?;

        let segment_id_map = {
            let mut result: crate::HashMap<SegmentId, u8> = crate::HashMap::default();

            for (level_idx, segment_ids) in recovery.segment_ids.iter().enumerate() {
                for run in segment_ids {
                    for segment_id in run {
                        #[allow(clippy::expect_used)]
                        result.insert(
                            *segment_id,
                            level_idx
                                .try_into()
                                .expect("there are less than 256 levels"),
                        );
                    }
                }
            }

            result
        };

        let cnt = segment_id_map.len();

        log::debug!(
            "Recovering {cnt} disk segments from {}",
            tree_path.display(),
        );

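        // Log recovery progress more sparsely as the segment count grows.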
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

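            // Skip macOS metadata files (.DS_Store, AppleDouble "._*" files)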
            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            log::debug!("Recovering segment from {}", segment_file_path.display());

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    segment_file_path,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1,
                    level_idx <= 2,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered segment from {:?}", segment.path);

                segments.push(segment);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!(
                    "Deleting unfinished segment: {}",
                    segment_file_path.display(),
                );
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        let blob_files = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        LevelManifest::recover(tree_path, &recovery, &segments, &blob_files)
    }
}