pub(crate) mod ingest;
pub mod inner;

use crate::{
    cache::Cache,
    coding::{Decode, Encode},
    compaction::CompactionStrategy,
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    segment::{
        block_index::{full_index::FullBlockIndex, BlockIndexImpl},
        meta::TableType,
        Segment, SegmentInner,
    },
    value::InternalValue,
    version::Version,
    AbstractTree, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::atomic::AtomicBool,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

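/// Maps a tombstone (deleted entry) to `None` so it is hidden from read results.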
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

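/// A log-structured merge-tree (LSM-tree).
///
/// This is a cheaply cloneable handle: it wraps an `Arc<TreeInner>`, so clones share
/// the same underlying tree.
///
/// A minimal usage sketch, marked `ignore` because it assumes the crate's public
/// `Config::new(..).open()` entry point and `&str` key/value conversions:
///
/// ```ignore
/// use lsm_tree::{AbstractTree, Config};
///
/// fn demo() -> lsm_tree::Result<()> {
///     // Open (or create) a tree in the given folder.
///     let tree = Config::new("/tmp/demo_tree").open()?;
///
///     // Writes go to the active memtable; `0` is the write's sequence number.
///     tree.insert("hello", "world", 0);
///
///     // Reads consult the active memtable, then sealed memtables, then segments.
///     assert!(tree.get("hello", None)?.is_some());
///     Ok(())
/// }
/// ```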
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

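        // Bulk ingestion is only allowed on an empty tree; the active memtable write
        // lock is held for the whole ingestion so no concurrent writes slip in.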
        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

        let mut writer = Ingestion::new(self)?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        let lock = self.levels.read().expect("lock is poisoned");

        let first_level = lock
            .levels
            .first()
            .expect("first level should always exist");

        if first_level.is_disjoint {
            1
        } else {
            first_level.segments.len()
        }
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(super::segment::Segment::bloom_filter_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn verify(&self) -> crate::Result<usize> {
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(k, _)| k)))
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            compaction::stream::CompactionStream,
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");

        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);

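        // Choose a bloom filter policy: a fixed false-positive rate when bloom
        // filters are enabled, and effectively none (0 bits per key) otherwise.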
        {
            use crate::segment::writer::BloomConstructionPolicy;

            if self.config.bloom_bits_per_key >= 0 {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::FpRate(0.00001));
            } else {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_id, segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        log::trace!("register: Acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");
        log::trace!("register: Acquired levels manifest write lock");

        log::trace!("register: Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        log::trace!("register: Acquired sealed memtables write lock");

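        // First insert the new segments into the first level, then release the sealed
        // memtables they were flushed from, so the data stays readable throughout.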
        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn clear_active_memtable(&self) {
        *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

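        // Swap in a fresh, empty memtable and register the old one as sealed
        // under a newly allocated (segment) ID.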
        let yanked_memtable = std::mem::take(&mut *active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .get(idx)
            .map(|x| x.len())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels
            .iter()
            .map(super::segment::Segment::get_highest_seqno)
            .max()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;

        Snapshot::new(Standard(self.clone()), seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, seqno, index))
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, seqno, index))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {:?}", config.path);

        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Arc<Memtable>> {
        self.active_memtable.read().expect("lock is poisoned")
    }

    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());

        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {segment_folder:?}");

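        // Load the freshly written segment's full block index into memory and
        // assemble the Segment handle from the writer's trailer metadata.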
        let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            path: segment_file_path.to_path_buf(),

            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            cache: self.config.cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,

            is_deleted: AtomicBool::default(),
        }
        .into();

        self.config
            .descriptor_table
            .insert(segment_file_path, created_segment.global_id());

        log::debug!("Flushed segment to {segment_folder:?}");

        Ok(Some(created_segment))
    }

    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

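        // Rotate the active memtable out, write it to a new segment, then
        // register that segment in the level manifest.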
        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }

    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    pub(crate) fn get_internal_entry_with_memtable(
        &self,
        memtable_lock: &Memtable,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_segments(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
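        // Hash the key once so every segment's bloom filter probe can reuse it.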
        let key_hash = crate::bloom::BloomFilter::get_hash(key);

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
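            // For larger disjoint levels, jump straight to the single segment whose
            // key range can contain the key instead of scanning every segment.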
            if level.len() >= 4 {
                if let Some(level) = level.as_disjoint() {
                    if let Some(segment) = level.get_segment_containing_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }

                    continue;
                }
            }

            for segment in &level.segments {
                if !segment.is_key_in_key_range(key) {
                    continue;
                }

                if let Some(item) = segment.get(key, seqno, key_hash)? {
                    return Ok(ignore_tombstone_value(item));
                }
            }
        }

        Ok(None)
    }

    #[doc(hidden)]
    pub fn get_internal_entry(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
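        // Read path: newest data first - active memtable, then sealed memtables,
        // then the on-disk segments.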
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        drop(memtable_lock);

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

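        // Take owned (Arc-based) read guards so the returned iterator can keep the
        // memtables and level manifest alive for its whole ('static) lifetime.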
        log::trace!("range read: acquiring levels manifest read lock");
        let level_manifest =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");
        log::trace!("range read: acquired level manifest read lock");

        log::trace!("range read: acquiring active memtable read lock");
        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired active memtable read lock");

        log::trace!("range read: acquiring sealed memtable read lock");
        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired sealed memtable read lock");

        let iter_state = IterState {
            active: active.clone(),
            sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
            ephemeral,
            levels: level_manifest.levels.clone(),
        };

        TreeIter::create_range(iter_state, bounds, seqno, level_manifest)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {:?}", config.path);

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

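        // The persisted manifest is authoritative for these settings; carry them
        // over into the runtime config.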
        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
        )?;
        levels.update_metadata();

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::fsync_directory,
            file::{LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };

        let tree_path = tree_path.as_ref();

        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        log::info!("Recovering manifest at {level_manifest_path:?}");

        let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?;
        let cnt = segment_id_map.len();

        log::debug!("Recovering {cnt} disk segments from {tree_path:?}");

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;

            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            log::debug!("Recovering segment from {segment_file_path:?}");

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

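            // Only keep segments that the level manifest references; anything else is
            // a leftover from an interrupted flush or compaction and gets deleted below.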
            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx == 0 || level_idx == 1,
                )?;

                descriptor_table.insert(&segment_file_path, segment.global_id());

                segments.push(segment);
                log::debug!("Recovered segment from {segment_file_path:?}");

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        LevelManifest::recover(&level_manifest_path, segments)
    }
}