pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::CURRENT_VERSION_FILE,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version},
    vlog::BlobFile,
    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
    ValueType,
};
use inner::{TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

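/// Guard around a raw key-value lookup result.
///
/// Implements [`IterGuard`] so iterator consumers can extract only the key,
/// only the value size, or the whole pair from an item without unwrapping
/// the underlying `Result` themselves.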
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn into_inner_if(
        self,
        pred: impl Fn(&UserKey) -> bool,
    ) -> crate::Result<(UserKey, Option<UserValue>)> {
        let (k, v) = self.0?;

        if pred(&k) {
            Ok((k, Some(v)))
        } else {
            Ok((k, None))
        }
    }

    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

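/// Maps a tombstone to `None`, so deleted keys drop out of point reads.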
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

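/// A log-structured merge tree (LSM-tree).
///
/// Cloning is cheap: the handle is a reference-counted pointer to the shared
/// tree state.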
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        0
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

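    /// Writes a stream of memtable entries out as on-disk tables.
    ///
    /// Block sizes, restart intervals, compression, partitioning, and filter
    /// settings are taken from the level-0 entries of the config policies,
    /// since flushed tables always enter level 0.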
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

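    /// Atomically publishes freshly written tables (and blob files) as a new
    /// level-0 run, while releasing the sealed memtables they were flushed
    /// from. The compaction state lock is held across the version upgrade so
    /// concurrent compactions observe a consistent view.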
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        copy.seqno = super_version.seqno;

        version_history_lock.append_version(copy);

        log::trace!("cleared active memtable");
    }

    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

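    /// Seals the active memtable and swaps in a fresh one, returning the
    /// sealed memtable so the caller can flush it. Returns `None` if the
    /// active memtable is empty and there is nothing to rotate.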
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        copy.seqno = super_version.seqno;

        version_history_lock.append_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
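    /// Builds a merged, snapshot-filtered iterator over the given key range,
    /// spanning the active memtable, sealed memtables, and on-disk tables of
    /// the provided super-version.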
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

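    /// Point lookup against a pinned super-version: the active memtable is
    /// checked first, then the sealed memtables (newest first), and finally
    /// the on-disk tables, so the most recent visible write always wins.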
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

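    /// Searches the on-disk tables for a key. The key hash is computed once
    /// up front so each candidate table's Bloom filter can be probed without
    /// rehashing the key.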
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
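                // Tables within a run are sorted and have disjoint key
                // ranges, so for longer runs we can look up the single
                // candidate table directly; for short runs a linear scan
                // is cheaper.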
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

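    /// Scans the sealed memtables from newest to oldest and returns the first
    /// visible entry for the key.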
    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for mt in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = mt.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

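    /// Converts borrowed range bounds into owned `Slice` bounds, also
    /// reporting whether the range is trivially empty (its lower bound lies
    /// strictly above its upper bound), in which case callers can skip the
    /// operation entirely.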
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

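    /// Opens an LSM-tree: recovers the existing tree if a current version
    /// file is present, otherwise creates a new one.
    ///
    /// A minimal usage sketch (assuming a `Config::new(path)` constructor;
    /// adjust to the actual `Config` builder):
    ///
    /// ```ignore
    /// let tree = Tree::open(Config::new("/tmp/my-tree"))?;
    /// assert_eq!(0, tree.table_count());
    /// ```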
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database; V1 requires a manual migration, but no migration tool is provided, as V1 is long outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

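    /// Finalizes a table writer and re-opens the written file as a `Table`
    /// ready to be registered. Returns `None` if the writer produced no data.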
    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            0,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

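    /// Runs one compaction with the given strategy; stale MVCC versions below
    /// `mvcc_gc_watermark` become eligible for garbage collection during the
    /// rewrite.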
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
            Ok(kv) => Ok((kv.key.user_key, kv.value)),
            Err(e) => Err(e),
        })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

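    /// Appends an entry to the current active memtable, returning the size
    /// statistics reported by the memtable insert.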
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

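    /// Recovers an existing tree: loads the latest version from disk,
    /// validates the manifest's format version and tree type against the
    /// requested configuration, and rebuilds the in-memory tree state.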
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            config.level_count = manifest.level_count;
        }

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

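    /// Creates a fresh tree on disk; the table folder and the tree root are
    /// fsynced so the new directory entries survive a crash.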
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

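    /// Recovers all tables referenced by the latest version: each table file
    /// on disk is matched against the recovered version metadata and
    /// re-opened with its recorded checksum, global seqno, and its level's
    /// pinning policies; table and blob files not referenced by the version
    /// (orphans) are deleted afterwards.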
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always fewer than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are fewer than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

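    /// Removes all version files other than `v{latest_version_id}` from the
    /// tree directory, leaving only the version that was just recovered.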
    fn cleanup_orphaned_version(
        path: &Path,
        latest_version_id: crate::version::VersionId,
    ) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}
1181}