pub mod inner;

use crate::{
    coding::{Decode, Encode},
    compaction::CompactionStrategy,
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    segment::{
        block_index::{full_index::FullBlockIndex, BlockIndexImpl},
        meta::TableType,
        Segment, SegmentInner,
    },
    value::InternalValue,
    version::Version,
    AbstractTree, BlockCache, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{
        atomic::{AtomicBool, AtomicU64},
        Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
    },
};
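// Maps a tombstone to `None` so that deleted keys read as "not found";
// any other value is passed through unchanged.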
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}
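/// A log-structured merge tree (LSM-tree).
///
/// `Tree` is a cheaply cloneable handle: it wraps the shared [`TreeInner`] state in an
/// [`Arc`] and dereferences to it via the `Deref` impl below.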
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl AbstractTree for Tree {
    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // Taking the write half of the major compaction lock makes this the only
        // compaction running; regular compactions only take the read half (see `compact`).
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }
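    // A disjoint first level counts as a single run; otherwise every L0 segment
    // is its own run.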
    fn l0_run_count(&self) -> usize {
        let lock = self.levels.read().expect("lock is poisoned");

        let first_level = lock
            .levels
            .first()
            .expect("first level should always exist");

        if first_level.is_disjoint {
            1
        } else {
            first_level.segments.len()
        }
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(super::segment::Segment::bloom_filter_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn verify(&self) -> crate::Result<usize> {
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(k, _)| k)))
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }
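    // Writes the given (sealed) memtable to disk as a new segment, returning `None`
    // if the writer produced no output (e.g. everything was filtered out).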
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            compaction::stream::CompactionStream,
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };

        let start = std::time::Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");

        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);

        {
            use crate::segment::writer::BloomConstructionPolicy;

            // NOTE: A non-negative `bloom_bits_per_key` enables bloom filters, in which
            // case flushed segments target a fixed false positive rate; a negative value
            // builds an (effectively empty) 0-bits-per-key filter.
            if self.config.bloom_bits_per_key >= 0 {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::FpRate(0.00001));
            } else {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        // Route the memtable through the compaction stream so that versions made
        // obsolete below `seqno_threshold` can be dropped during the write.
        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_id, segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result)
    }
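    // Atomically inserts the freshly written segments into the first level and then
    // drops the sealed memtables they were flushed from.
    // Note the lock order: levels manifest first, then sealed memtables.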
    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        log::trace!("register: Acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");
        log::trace!("register: Acquired levels manifest write lock");

        log::trace!("register: Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        log::trace!("register: Acquired sealed memtables write lock");

        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn clear_active_memtable(&self) {
        *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }
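    // Seals the current active memtable (if it is not empty), replaces it with a fresh
    // one, and returns the sealed memtable together with its assigned ID.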
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut *active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }
    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .get(idx)
            .map(|x| x.len())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels
            .iter()
            .map(super::segment::Segment::get_highest_seqno)
            .max()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;

        Snapshot::new(Standard(self.clone()), seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, seqno, index))
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, seqno, index))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}
impl Tree {
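    // Opens an LSM-tree in the configured directory: recovers the existing tree if a
    // manifest file is present, otherwise creates a new one. Trees in the old V1 format
    // (marked by a `version` file) are rejected.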
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {:?}", config.path);

        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Arc<Memtable>> {
        self.active_memtable.read().expect("lock is poisoned")
    }
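    // Finalizes a segment writer: loads the block index and bloom filter from the
    // written file, registers the new file in the descriptor table, and returns the
    // resulting `Segment` (or `None` if the writer produced no data).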
    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());

        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {segment_folder:?}");

        let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            path: segment_file_path.to_path_buf(),

            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            block_cache: self.config.block_cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,

            is_deleted: AtomicBool::default(),
        }
        .into();

        self.config
            .descriptor_table
            .insert(segment_file_path, created_segment.global_id());

        log::debug!("Flushed segment to {segment_folder:?}");

        Ok(Some(created_segment))
    }
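    // Convenience wrapper: rotates the active memtable, flushes it to a segment and
    // registers the result, returning the new segment (or `None` if there was nothing
    // to flush).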
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }

    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }
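    // Point lookup that checks a caller-supplied memtable first, then the sealed
    // memtables, then the disk segments.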
    pub(crate) fn get_internal_entry_with_memtable(
        &self,
        memtable_lock: &Memtable,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }
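    // Point lookup across the disk segments. For sufficiently large disjoint levels,
    // the segment containing the key can be located directly; otherwise every segment
    // whose key range covers the key is probed, reusing the precomputed bloom filter hash.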
    fn get_internal_entry_from_segments(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::bloom::BloomFilter::get_hash(key);

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            if level.len() >= 4 {
                if let Some(level) = level.as_disjoint() {
                    if let Some(segment) = level.get_segment_containing_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }

                    continue;
                }
            }

            for segment in &level.segments {
                if !segment.is_key_in_key_range(key) {
                    continue;
                }

                if let Some(item) = segment.get(key, seqno, key_hash)? {
                    return Ok(ignore_tombstone_value(item));
                }
            }
        }

        Ok(None)
    }
    #[doc(hidden)]
    pub fn get_internal_entry(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        drop(memtable_lock);

        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
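    // Builds a double-ended iterator over the internal entries in `range`. The level
    // manifest read guard is handed to the iterator so the set of segments cannot change
    // underneath it; the memtable contents are captured as `Arc` clones up front.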
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        log::trace!("range read: acquiring levels manifest read lock");
        let level_manifest =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");
        log::trace!("range read: acquired level manifest read lock");

        log::trace!("range read: acquiring active memtable read lock");
        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired active memtable read lock");

        log::trace!("range read: acquiring sealed memtable read lock");
        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired sealed memtable read lock");

        let iter_state = IterState {
            active: active.clone(),
            sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
            ephemeral,
            levels: level_manifest.levels.clone(),
        };

        TreeIter::create_range(iter_state, bounds, seqno, level_manifest)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }
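    // Appends an item to the active memtable, forwarding the size information
    // returned by `Memtable::insert`.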
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }
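    // Recovers an existing tree: validates the manifest version, adopts the persisted
    // configuration values, and rebuilds the level manifest from the segment files on disk.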
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {:?}", config.path);

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.block_cache,
            &config.descriptor_table,
        )?;
        levels.update_metadata();

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
        };

        Ok(Self(Arc::new(inner)))
    }
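    // Creates a fresh tree: writes the manifest, creates the segment folder, and fsyncs
    // both the new file and the containing directories before constructing the inner state.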
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }
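    // Scans the segment folder and recovers every segment listed in the levels manifest,
    // deleting temporary or orphaned segment files left behind by interrupted writes.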
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        block_cache: &Arc<BlockCache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::{fsync_directory, LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };

        let tree_path = tree_path.as_ref();

        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        log::info!("Recovering manifest at {level_manifest_path:?}");

        let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?;
        let cnt = segment_id_map.len();

        log::debug!("Recovering {cnt} disk segments from {tree_path:?}");

        // Log progress more sparsely for larger segment counts
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;

            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            if segment_file_name.starts_with("tmp_") {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
                continue;
            }

            log::debug!("Recovering segment from {segment_file_path:?}");

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    block_cache.clone(),
                    descriptor_table.clone(),
                    level_idx == 0 || level_idx == 1,
                )?;

                descriptor_table.insert(&segment_file_path, segment.global_id());

                segments.push(segment);
                log::debug!("Recovered segment from {segment_file_path:?}");

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        LevelManifest::recover(&level_manifest_path, segments)
    }
}