pub mod ingest;
pub mod inner;
mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    tree::inner::SuperVersion,
    value::InternalValue,
    version::{recovery::recover, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

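/// Iterator guard over a key-value pair.
///
/// Lets a call site take just the key, just the value's size, or the whole
/// pair, without destructuring the underlying result itself.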
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

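/// An LSM-tree.
///
/// The tree is a cheaply cloneable handle around reference-counted inner
/// state, so it can be shared freely across threads.
///
/// A minimal usage sketch (illustrative only: it assumes a `tree` opened
/// through the crate's public API and a caller-managed
/// `SequenceNumberCounter` named `seqnos`, neither of which is defined in
/// this module):
///
/// ```ignore
/// tree.insert("hello", "world", seqnos.next());
/// assert!(tree.get("hello", SeqNo::MAX)?.is_some());
/// tree.remove("hello", seqnos.next());
/// ```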
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> TableId {
        self.0
            .table_id_counter
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn id(&self) -> TreeId {
        self.id
    }

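    // Point-read path: check the active memtable first, then the sealed
    // memtables, and only then the on-disk tables.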
    #[expect(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_lock = self.super_version.read().expect("lock is poisoned");

        if let Some(entry) = version_lock.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&version_lock, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&version_lock, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.super_version.read().expect("poisoned").version.clone()
    }

    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>> {
        log::debug!("Flushing active memtable");

        let Some((table_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, _)) = self.flush_memtable(table_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_tables(std::slice::from_ref(&table), None, None, seqno_threshold)?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.compaction_state
            .lock()
            .expect("lock is poisoned")
            .version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

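    // Bulk ingestion: writes pre-sorted key-value pairs directly into new
    // tables, bypassing the memtable. All items share a single freshly
    // allocated seqno, which is only published to `visible_seqno` once the
    // write has finished. Illustrative call, assuming caller-managed
    // counters `seqnos` and `visible`:
    //
    //     tree.ingest(pairs.into_iter(), &seqnos, &visible)?;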
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        #[expect(clippy::explicit_counter_loop)]
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

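    // Drops all entries whose keys fall into the given range by running a
    // dedicated compaction over the affected tables.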
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // Take the major compaction lock exclusively so we do not conflict
        // with other running compactions
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // Take the major compaction lock exclusively so we do not conflict
        // with other running compactions
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .sealed_memtables
            .len()
    }

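    // Flushes a sealed memtable into a new table on disk. While writing,
    // the compaction stream drops obsolete MVCC versions below
    // `seqno_threshold`.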
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::TABLES_FOLDER, table::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);
        let table_file_path = folder.join(table_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing table to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            table_file_path.display(),
        );

        let mut table_writer = Writer::new(table_file_path, table_id, 0)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            table_writer.write(item?)?;
        }

        let result = self.consume_writer(table_writer)?;

        log::debug!("Flushed memtable {table_id:?} in {:?}", start.elapsed());

        Ok(result.map(|table| (table, None)))
    }

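    // Atomically installs freshly written tables as a new L0 run and
    // releases the sealed memtables they originated from.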
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut super_version = self.super_version.write().expect("lock is poisoned");

        compaction_state.upgrade_version(
            &mut super_version,
            |version| {
                Ok(version.with_new_l0_run(tables, blob_files, frag_map.filter(|x| !x.is_empty())))
            },
            seqno_threshold,
        )?;

        for table in tables {
            log::trace!("releasing sealed memtable {}", table.id());

            super_version.sealed_memtables =
                Arc::new(super_version.sealed_memtables.remove(table.id()));
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.super_version
            .write()
            .expect("lock is poisoned")
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable));
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // Take the major compaction lock in shared mode: normal compactions
        // may run concurrently, but a major compaction excludes them
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

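    // Seals the current active memtable: it is swapped for a fresh, empty
    // one and registered in the sealed set under the next table ID, ready
    // to be flushed.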
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");

        if version_lock.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut version_lock.active_memtable);

        let tmp_memtable_id = self.get_next_table_id();

        version_lock.sealed_memtables = Arc::new(
            version_lock
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

499
500 fn table_count(&self) -> usize {
501 self.current_version().table_count()
502 }
503
504 fn level_table_count(&self, idx: usize) -> Option<usize> {
505 self.current_version().level(idx).map(|x| x.table_count())
506 }
507
508 #[expect(clippy::significant_drop_tightening)]
509 fn approximate_len(&self) -> usize {
510 let version = self.super_version.read().expect("lock is poisoned");
511
512 let tables_item_count = self
513 .current_version()
514 .iter_tables()
515 .map(|x| x.metadata.item_count)
516 .sum::<u64>();
517
518 let memtable_count = version.active_memtable.len() as u64;
519 let sealed_count = version
520 .sealed_memtables
521 .iter()
522 .map(|(_, mt)| mt.len())
523 .sum::<usize>() as u64;
524
525 (memtable_count + sealed_count + tables_item_count)
526 .try_into()
527 .expect("should not be too large")
528 }
529
530 fn disk_space(&self) -> u64 {
531 self.current_version()
532 .iter_levels()
533 .map(super::version::Level::size)
534 .sum()
535 }
536
537 #[expect(clippy::significant_drop_tightening)]
538 fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
539 let version = self.super_version.read().expect("lock is poisoned");
540
541 let active = version.active_memtable.get_highest_seqno();
542
543 let sealed = version
544 .sealed_memtables
545 .iter()
546 .map(|(_, table)| table.get_highest_seqno())
547 .max()
548 .flatten();
549
550 active.max(sealed)
551 }
552
553 fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
554 self.current_version()
555 .iter_tables()
556 .map(Table::get_highest_seqno)
557 .max()
558 }
559
560 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
561 Ok(self
562 .get_internal_entry(key.as_ref(), seqno)?
563 .map(|x| x.value))
564 }
565
566 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
567 &self,
568 key: K,
569 value: V,
570 seqno: SeqNo,
571 ) -> (u64, u64) {
572 let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
573 self.append_entry(value)
574 }
575
576 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
577 let value = InternalValue::new_tombstone(key, seqno);
578 self.append_entry(value)
579 }
580
581 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
582 let value = InternalValue::new_weak_tombstone(key, seqno);
583 self.append_entry(value)
584 }
585}
586
587impl Tree {
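    // Converts generic, borrowed range bounds into owned bounds, and reports
    // whether the range is trivially empty (lower bound above upper bound).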
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

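    /// Opens an LSM-tree in the given directory, recovering it if a
    /// manifest already exists, and creating a fresh tree otherwise.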
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // A file named "version" only exists in trees using the old V1 format
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    /// Returns `true` if the tree is currently compacting
    /// (i.e. its hidden set is non-empty).
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        // Iterate through sealed memtables, newest first
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

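    // Point-reads the on-disk tables, level by level. The filter hash is
    // computed once and reused for every table. Runs of at least 4 tables
    // are searched by key range using `get_for_key`; shorter runs are
    // simply scanned linearly.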
    fn get_internal_entry_from_tables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in super_version.version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

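    // Builds an iterator over a consistent snapshot: the active memtable,
    // sealed memtables, and current version are all captured under a single
    // lock acquisition, so the iterator cannot observe a half-applied
    // update.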
777 #[doc(hidden)]
778 pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
779 &'a self,
780 range: &'a R,
781 seqno: SeqNo,
782 ephemeral: Option<Arc<Memtable>>,
783 ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
784 use crate::range::{IterState, TreeIter};
785 use std::ops::Bound::{self, Excluded, Included, Unbounded};
786
787 let lo: Bound<UserKey> = match range.start_bound() {
788 Included(x) => Included(x.as_ref().into()),
789 Excluded(x) => Excluded(x.as_ref().into()),
790 Unbounded => Unbounded,
791 };
792
793 let hi: Bound<UserKey> = match range.end_bound() {
794 Included(x) => Included(x.as_ref().into()),
795 Excluded(x) => Excluded(x.as_ref().into()),
796 Unbounded => Unbounded,
797 };
798
799 let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
800
801 let super_version = self.super_version.write().expect("lock is poisoned");
802
803 let iter_state = {
804 let active = &super_version.active_memtable;
805 let sealed = &super_version.sealed_memtables;
806
807 IterState {
808 active: active.clone(),
809 sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
810 ephemeral,
811 version: super_version.version.clone(),
812 }
813 };
814
815 TreeIter::create_range(iter_state, bounds, seqno, &super_version.version)
816 }
817
    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

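    /// Appends an entry to the active memtable.
    ///
    /// Returns the size counters reported by `Memtable::insert`.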
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .insert(value)
    }

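    // Recovers the tree state from disk: decode and validate the manifest,
    // recover the current version and its tables, then rebuild the
    // in-memory structures around them.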
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let path = config.path.clone();

        let inner = TreeInner {
            id: tree_id,
            table_id_counter: Arc::new(AtomicU64::new(highest_table_id + 1)),
            blob_file_id_generator: SequenceNumberCounter::default(),
            super_version: Arc::new(RwLock::new(SuperVersion {
                active_memtable: Arc::default(),
                sealed_memtables: Arc::default(),
                version,
            })),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::new(path))),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

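    // Creates a brand-new tree: writes an initial manifest, creates the
    // table folder, and fsyncs both folders before handing out the handle.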
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

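    // Recovers the levels and tables from disk: reads the recovery data of
    // the latest version, recovers each table file it references, deletes
    // orphaned files, and assembles the result into a `Version`.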
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum)> = crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS Finder metadata
            if file_name == ".DS_Store" {
                continue;
            }

            // Skip macOS resource fork files
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // pin filter blocks in L0 and L1
                    level_idx <= 2, // pin index blocks in L0 through L2
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

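    // Removes all version manifest files (named `v<ID>`) except the one
    // belonging to the version that was just recovered.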
    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}