pub mod inner;

use crate::{
    coding::{Decode, Encode},
    compaction::{stream::CompactionStream, CompactionStrategy},
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    range::{prefix_to_range, MemtableLockGuard, TreeIter},
    segment::{
        block_index::{full_index::FullBlockIndex, BlockIndexImpl},
        meta::TableType,
        Segment, SegmentInner,
    },
    stop_signal::StopSignal,
    value::InternalValue,
    version::Version,
    AbstractTree, BlockCache, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

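/// Maps a tombstone to `None`, so deleted items are hidden from read paths.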
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

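/// A log-structured merge tree (LSM-tree).
///
/// A `Tree` is a cheaply cloneable handle to shared [`TreeInner`] state.
///
/// A minimal usage sketch (illustrative only; assumes the crate's `Config`
/// builder, which opens a tree through this type):
///
/// ```ignore
/// let tree = Config::new("/tmp/my_tree").open()?;
/// tree.insert("hello", "world", /* seqno */ 0);
/// assert!(tree.get("hello", None)?.is_some());
/// ```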
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|x| x.bloom_filter_size())
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn is_first_level_disjoint(&self) -> bool {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .first()
            .expect("first level should exist")
            .is_disjoint
    }

    fn verify(&self) -> crate::Result<usize> {
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(k, _)| k)))
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

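    /// Writes a (sealed) memtable to disk as a new segment.
    ///
    /// Returns `Ok(None)` if no items survived the write
    /// (e.g. everything was dropped by the compaction stream).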
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };

        let start = std::time::Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");

        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);

        {
            use crate::segment::writer::BloomConstructionPolicy;

            if self.config.bloom_bits_per_key >= 0 {
                // Flushed memtables become L0 segments, which are read often,
                // so optimize their filters for point reads
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::FpRate(0.00001));
            } else {
                // A negative bits-per-key setting disables bloom filters
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_id, segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result)
    }

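    /// Atomically registers flushed segments in the level manifest (level 0),
    /// then frees the sealed memtables they were created from.
    ///
    /// The ordering matters: the data must be visible in the manifest before
    /// its memtables are released, so readers never observe a gap.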
    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        log::trace!("Acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");

        log::trace!("Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");

        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Memtable> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = memtable;
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;
        do_compaction(&opts)?;

        log::debug!("lsm-tree: compaction run over");

        Ok(())
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

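    /// Seals the active memtable and swaps in a fresh, empty one.
    ///
    /// Returns the sealed memtable's (temporary) ID and a handle to it,
    /// or `None` if the active memtable was empty.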
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut *active_memtable);
        let yanked_memtable = Arc::new(yanked_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }

    fn first_level_segment_count(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .first_level_segment_count()
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels
            .iter()
            .map(super::segment::Segment::get_highest_seqno)
            .max()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;

        Snapshot::new(Standard(self.clone()), seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self.get_internal_entry(key, seqno)?.map(|x| x.value))
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, seqno, index))
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, seqno, index))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Opens an LSM-tree in the given directory.
    ///
    /// Recovers the previous state if a manifest already exists,
    /// otherwise creates a new tree.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {:?}", config.path);

        // A `version` marker file means the folder contains a tree
        // in the unsupported V1 format
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Memtable> {
        self.active_memtable.read().expect("lock is poisoned")
    }

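    /// Performs major compaction, blocking the caller until it is done.
    ///
    /// A sketch (illustrative only): passing `u64::MAX` as the target segment
    /// size asks the strategy to merge as much as possible into one segment.
    ///
    /// ```ignore
    /// tree.major_compact(u64::MAX, /* seqno watermark */ 0)?;
    /// ```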
    #[doc(hidden)]
    pub fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        log::info!("Starting major compaction");
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
        self.compact(strategy, seqno_threshold)
    }

    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());

        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {segment_folder:?}");

        let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            block_cache: self.config.block_cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,
        }
        .into();

        self.config
            .descriptor_table
            .insert(segment_file_path, created_segment.global_id());

        log::debug!("Flushed segment to {segment_folder:?}");

        Ok(Some(created_segment))
    }

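    /// Flushes the active memtable to a disk segment and registers it.
    ///
    /// Returns the created segment, or `None` if the memtable was empty
    /// or nothing survived the flush. A sketch of a typical call site
    /// (illustrative only):
    ///
    /// ```ignore
    /// // persist in-memory writes before dropping the tree
    /// tree.flush_active_memtable(/* seqno watermark */ 0)?;
    /// ```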
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }

    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    pub(crate) fn get_internal_entry_with_lock<K: AsRef<[u8]>>(
        &self,
        memtable_lock: &RwLockWriteGuard<'_, Memtable>,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(&key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_segments<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // Hash the key once, so every segment's bloom filter check can reuse it
        let key_hash = crate::bloom::BloomFilter::get_hash(key.as_ref());

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            // For larger disjoint levels, jump directly to the single segment
            // whose key range may contain the key; for small levels, a linear
            // scan is cheap enough
            if level.len() >= 4 {
                if let Some(level) = level.as_disjoint() {
                    if let Some(segment) = level.get_segment_containing_key(&key) {
                        let maybe_item = segment.get(&key, seqno, key_hash)?;

                        if let Some(item) = maybe_item {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }

                    continue;
                }
            }

            for segment in &level.segments {
                let maybe_item = segment.get(&key, seqno, key_hash)?;

                if let Some(item) = maybe_item {
                    return Ok(ignore_tombstone_value(item));
                }
            }
        }

        Ok(None)
    }

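    /// Point read: checks the active memtable, then sealed memtables
    /// (newest to oldest), then disk segments, returning the first hit.
    /// Tombstones are resolved to `None`.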
    #[doc(hidden)]
    pub fn get_internal_entry<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        drop(memtable_lock);

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        // Convert the borrowed bounds into owned keys, so the iterator
        // can outlive the caller's range
        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        // Take owned (Arc-based) read guards, so the iterator can hold the
        // locks for its entire lifetime
        let level_manifest_lock =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");

        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");

        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");

        TreeIter::create_range(
            MemtableLockGuard {
                active,
                sealed,
                ephemeral,
            },
            bounds,
            seqno,
            level_manifest_lock,
        )
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

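    /// Returns an iterator over all KV-pairs whose key starts with `prefix`.
    ///
    /// The prefix is converted into an equivalent key range by
    /// `prefix_to_range` (conceptually, prefix `"abc"` scans from `"abc"`
    /// up to, but excluding, `"abd"`).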
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

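    /// Appends an item to the active memtable, returning the item's size
    /// and the memtable's new approximate size in bytes.
    ///
    /// A read lock suffices here: the memtable is internally synchronized
    /// (`insert` takes `&self`), so writers only need to exclude rotation.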
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

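    /// Tries to recover an existing tree from its on-disk manifest,
    /// rejecting any manifest version other than V2.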
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {:?}", config.path);

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // The manifest's persisted settings take precedence over the given config
        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.block_cache,
            &config.descriptor_table,
        )?;
        levels.update_metadata();

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        // Fsync the directories, so the newly created files are durable
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

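    /// Recovers the level manifest, loading all referenced segments from disk.
    ///
    /// Segments that are not referenced by the manifest (e.g. leftovers
    /// from an interrupted flush or compaction) are deleted.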
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        block_cache: &Arc<BlockCache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::{fsync_directory, LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };

        let tree_path = tree_path.as_ref();

        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        log::info!("Recovering manifest at {level_manifest_path:?}");

        let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?;
        let cnt = segment_id_map.len();

        log::debug!("Recovering {cnt} disk segments from {tree_path:?}");

        // Log progress every N segments, depending on how many there are
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;

            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            if segment_file_name.starts_with("tmp_") {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
                continue;
            }

            log::debug!("Recovering segment from {segment_file_path:?}");

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    block_cache.clone(),
                    descriptor_table.clone(),
                    level_idx == 0 || level_idx == 1,
                )?;

                descriptor_table.insert(&segment_file_path, segment.global_id());

                segments.push(segment);
                log::debug!("Recovered segment from {segment_file_path:?}");

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!("Deleting orphaned segment (not in manifest): {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        LevelManifest::recover(&level_manifest_path, segments)
    }
}