pub mod ingest;
pub mod inner;
mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    tree::inner::SuperVersion,
    value::InternalValue,
    version::{recovery::recover, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
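/// Wraps the result of a single key-value lookup, so that the key, the value
/// size, or the whole pair can be extracted on demand.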
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[allow(clippy::cast_possible_truncation)]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

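/// Maps tombstones to `None`, so that deleted keys read as absent.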
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

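/// A log-structured merge tree (LSM-tree).
///
/// Cloning is cheap; all clones share the same inner state.
///
/// A minimal usage sketch, assuming this crate is imported as `lsm_tree` and
/// that a `Tree` is opened through the `Config` builder (the exact builder
/// API may differ):
///
/// ```ignore
/// use lsm_tree::{AbstractTree, Config};
///
/// let tree = Config::new("/tmp/my_tree").open()?;
/// tree.insert("key", "value", /* seqno */ 0);
/// assert!(tree.get("key", /* read snapshot */ 1)?.is_some());
/// ```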
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> TableId {
        self.0
            .table_id_counter
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn id(&self) -> TreeId {
        self.id
    }

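    // Point reads consult the active memtable first, then the sealed
    // memtables (newest first), and finally the on-disk tables, so the most
    // recent visible version of a key always wins.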
    #[allow(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_lock = self.super_version.read().expect("lock is poisoned");

        if let Some(entry) = version_lock.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&version_lock, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&version_lock, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .version
            .clone()
    }

    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>> {
        log::debug!("Flushing active memtable");

        let Some((table_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, _)) = self.flush_memtable(table_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_tables(std::slice::from_ref(&table), None, None, seqno_threshold)?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.compaction_state
            .lock()
            .expect("lock is poisoned")
            .version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

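    // Bulk-loads a pre-sorted stream of key-value pairs straight into new
    // tables, bypassing the memtable. All items share one freshly allocated
    // seqno, which only becomes visible once the write has finished.
    //
    // A usage sketch (counter names are illustrative):
    //
    //   let seqno = SequenceNumberCounter::default();
    //   let visible = SequenceNumberCounter::default();
    //   tree.ingest(sorted_pairs.into_iter(), &seqno, &visible)?;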
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        #[allow(clippy::explicit_counter_loop)]
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

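    // Drops an entire key range (e.g. everything under a key prefix) by
    // scheduling a dedicated compaction over the affected tables.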
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[allow(clippy::cast_possible_truncation)]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .sealed_memtables
            .len()
    }

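    // Writes a sealed memtable out as a new table. Entries are pulled
    // through a compaction stream so that versions shadowed at or below
    // `seqno_threshold` can already be dropped during the flush. All block
    // policies are evaluated at level 0, since flushed tables enter L0.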
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::TABLES_FOLDER, table::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);
        let table_file_path = folder.join(table_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);
        let index_block_size = self.config.index_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing table to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, index_block_size={index_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            table_file_path.display(),
        );

        let mut table_writer = Writer::new(table_file_path, table_id)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_index_block_size(index_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            table_writer.write(item?)?;
        }

        let result = self.consume_writer(table_writer)?;

        log::debug!("Flushed memtable {table_id:?} in {:?}", start.elapsed());

        Ok(result.map(|table| (table, None)))
    }

    #[allow(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut super_version = self.super_version.write().expect("lock is poisoned");

        compaction_state.upgrade_version(
            &mut super_version,
            |version| {
                Ok(version.with_new_l0_run(tables, blob_files, frag_map.filter(|x| !x.is_empty())))
            },
            seqno_threshold,
        )?;

        for table in tables {
            log::trace!("releasing sealed memtable {}", table.id());

            super_version.sealed_memtables =
                Arc::new(super_version.sealed_memtables.remove(table.id()));
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.super_version
            .write()
            .expect("lock is poisoned")
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable));
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

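    // Seals the current active memtable and swaps in a fresh, empty one.
    // The sealed memtable stays readable until it has been flushed to a
    // table and released by `register_tables`.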
    #[allow(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");

        if version_lock.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut version_lock.active_memtable);

        let tmp_memtable_id = self.get_next_table_id();

        version_lock.sealed_memtables = Arc::new(
            version_lock
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let version = self.super_version.read().expect("lock is poisoned");

        // Use the version from the guard we already hold instead of calling
        // `current_version()`, which would re-acquire the same lock
        let tables_item_count = version
            .version
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = version.active_memtable.len() as u64;
        let sealed_count = version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[allow(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self.super_version.read().expect("lock is poisoned");

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
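    // Converts generic range bounds into owned `Slice` bounds, and reports
    // whether the range is trivially empty because its lower bound lies above
    // its upper bound (e.g. `"b".."a"`), so callers can skip work entirely.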
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Older (v1) trees are marked by a `version` file, which we cannot read
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        // Iterate from newest to oldest sealed memtable
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

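    // Searches the on-disk tables level by level. Tables within a run are
    // disjoint, so at most one table per run can contain the key: larger runs
    // are probed via `get_for_key`, which locates that single candidate
    // directly, while very small runs are simply scanned, which is cheaper.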
    fn get_internal_entry_from_tables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in super_version.version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        // A read lock is enough: we only clone the current state to get a
        // consistent snapshot for the iterator
        let super_version = self.super_version.read().expect("lock is poisoned");

        let iter_state = {
            let active = &super_version.active_memtable;
            let sealed = &super_version.sealed_memtables;

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: super_version.version.clone(),
            }
        };

        TreeIter::create_range(iter_state, bounds, seqno, &super_version.version)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

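    // Translates a prefix into the equivalent key range and iterates that,
    // e.g. the prefix `abc` becomes (conceptually) the half-open range
    // `abc..abd`.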
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let path = config.path.clone();

        let inner = TreeInner {
            id: tree_id,
            table_id_counter: Arc::new(AtomicU64::new(highest_table_id + 1)),
            blob_file_id_generator: SequenceNumberCounter::default(),
            super_version: Arc::new(RwLock::new(SuperVersion {
                active_memtable: Arc::default(),
                sealed_memtables: Arc::default(),
                version,
            })),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::new(path))),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        // Persist the new directory entries so they survive a crash
        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

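    // Rebuilds the in-memory `Version` from the recovered manifest data:
    // figures out which table belongs to which level, re-opens every table
    // file, deletes unreferenced (orphaned) table and blob files, and removes
    // stale version manifests.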
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum)> = crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[allow(clippy::expect_used)]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS metadata files
            if file_name == ".DS_Store" {
                continue;
            }

            // Skip macOS resource forks (AppleDouble files)
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // pin filter blocks in shallow levels
                    level_idx <= 2, // pin index blocks in shallow levels
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

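    // Removes version manifest files (named `v{id}`) other than the current
    // one; they describe superseded versions and are no longer needed.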
    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}