pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::CURRENT_VERSION_FILE,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version},
    vlog::BlobFile,
    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
    ValueType,
};
use inner::{TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

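/// Wraps the fallible result of reading a KV pair.
///
/// Deferring the `Result` means I/O and checksum errors only surface once the
/// caller actually consumes the entry (e.g. via `into_inner` or `key`).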
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn into_inner_if(
        self,
        pred: impl Fn(&UserKey) -> bool,
    ) -> crate::Result<(UserKey, Option<UserValue>)> {
        let (k, v) = self.0?;

        if pred(&k) {
            Ok((k, Some(v)))
        } else {
            Ok((k, None))
        }
    }

    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

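/// A handle to an LSM-tree.
///
/// The handle is a thin wrapper around an `Arc<TreeInner>`, so cloning is
/// cheap and all clones share the same underlying state (e.g. when handing a
/// handle to background flush or compaction threads).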
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        0
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

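    // Schedules a compaction that drops the given key range. Usage sketch
    // (assuming an open `Tree`; any `RangeBounds` over byte-like keys works):
    //
    //     tree.drop_range("a".."m")?;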
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // Take the write lock so no other compaction can run concurrently.
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
                copy.sealed_memtables = Arc::default();
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // Take the write lock so no other compaction can run concurrently.
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // Freshly flushed tables land in level 0, so all block/filter
        // policies are evaluated at level 0.
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        // Carry over the previous super-version's snapshot seqno.
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // Regular compactions take the read lock, so they may run in
        // parallel with each other, but are excluded while a major
        // compaction or drop_range holds the write lock.
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

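    // Rotation seals the active memtable so it can be flushed. Simplified
    // pipeline sketch (names are from this module; locking, error handling
    // and the actual flush worker are omitted):
    //
    //     if let Some(sealed) = tree.rotate_memtable() {
    //         // ... write the sealed memtable out via `flush_to_tables`,
    //         // then publish the result with `register_tables` ...
    //     }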
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

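    // Point reads consult the newest data first: the active memtable, then
    // the sealed memtables (newest to oldest), and finally the on-disk
    // tables, level by level.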
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    // For longer runs, look up the single table whose key
                    // range may contain the key.
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // For short runs, a linear scan is cheap enough.
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        // Iterate in reverse so the most recently sealed memtable wins.
        for mt in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = mt.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

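    // Converts generic `RangeBounds` into owned bounds and also reports
    // whether the range is trivially empty. For example, a range like
    // `"b".."a"` (start > end) is empty, which lets `drop_range` return
    // early instead of scheduling a no-op compaction.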
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

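    // Opens the tree at the configured path: recovers an existing tree if a
    // current version file exists, otherwise creates a fresh one.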
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database. A manual migration is required, but no migration tool is provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            0,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
            Ok(kv) => Ok((kv.key.user_key, kv.value)),
            Err(e) => Err(e),
        })
    }

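    // `prefix_to_range` maps a prefix to the equivalent half-open key range
    // (conceptually `prefix..successor(prefix)`), so prefix iteration can
    // reuse the plain range machinery above.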
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

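    // All writes funnel through here: `insert`, `remove` and `remove_weak`
    // build an `InternalValue` and append it to the current active memtable.
    // Hypothetical usage sketch (seqnos usually come from a shared
    // `SequenceNumberCounter`):
    //
    //     let seqno = seqno_counter.next();
    //     tree.insert("key", "value", seqno);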
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            config.level_count = manifest.level_count;
        }

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

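    // Rebuilds the in-memory `Version` from the recovered manifest data:
    // every table file referenced by the manifest is re-opened, while table
    // and blob files that are no longer referenced are deleted as orphans.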
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS metadata files.
            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

    fn cleanup_orphaned_version(
        path: &Path,
        latest_version_id: crate::version::VersionId,
    ) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            // Delete any version manifest that is not the latest one.
            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}