1pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10 compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11 config::Config,
12 file::CURRENT_VERSION_FILE,
13 format_version::FormatVersion,
14 iter_guard::{IterGuard, IterGuardImpl},
15 manifest::Manifest,
16 memtable::Memtable,
17 slice::Slice,
18 table::Table,
19 value::InternalValue,
20 version::{recovery::recover, SuperVersion, SuperVersions, Version},
21 vlog::BlobFile,
22 AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23 ValueType,
24};
25use inner::{TreeId, TreeInner};
26use std::{
27 ops::{Bound, RangeBounds},
28 path::Path,
29 sync::{Arc, Mutex, RwLock},
30};
31
32#[cfg(feature = "metrics")]
33use crate::metrics::Metrics;
34
/// Guard around a single key-value pair produced by a tree iterator;
/// resolving the pair may fail, hence the wrapped `crate::Result`.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39 fn into_inner_if(
40 self,
41 pred: impl Fn(&UserKey) -> bool,
42 ) -> crate::Result<(UserKey, Option<UserValue>)> {
43 let (k, v) = self.0?;
44
45 if pred(&k) {
46 Ok((k, Some(v)))
47 } else {
48 Ok((k, None))
49 }
50 }
51
52 fn key(self) -> crate::Result<UserKey> {
53 self.0.map(|(k, _)| k)
54 }
55
56 fn size(self) -> crate::Result<u32> {
57 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
58 self.into_inner().map(|(_, v)| v.len() as u32)
59 }
60
61 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
62 self.0
63 }
64}
65
66fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
67 if item.is_tombstone() {
68 None
69 } else {
70 Some(item)
71 }
72}
73
/// A log-structured merge tree.
///
/// Cloning is cheap: all state lives behind a shared `Arc<TreeInner>`.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
77
78impl std::ops::Deref for Tree {
79 type Target = TreeInner;
80
81 fn deref(&self) -> &Self::Target {
82 &self.0
83 }
84}
85
impl AbstractTree for Tree {
    /// Number of table file descriptors held in the shared descriptor table.
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    /// Acquires the version history write lock, blocking all concurrent
    /// version installs/rotations until the guard is dropped.
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    /// Returns the current table ID counter value.
    // NOTE(review): `get` presumably reads without advancing the counter
    // (contrast with `get_next_table_id` below) — confirm against
    // `SequenceNumberCounter`.
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    /// This tree's unique ID.
    fn id(&self) -> TreeId {
        self.id
    }

    /// A standard tree owns no blob files (only blob trees do).
    fn blob_file_count(&self) -> usize {
        0
    }

    /// Point lookup of the internal (versioned) entry visible at snapshot `seqno`.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Take a snapshot-consistent super version, dropping the read lock
        // before the (possibly I/O-heavy) lookup itself.
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    /// Returns the most recent tree version.
    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    /// Serializes memtable flushes across threads.
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    /// Number of stale versions waiting to be garbage-collected.
    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    /// Prefix scan at snapshot `seqno`; `index` optionally overlays an
    /// ephemeral memtable on top of the tree's own data.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Range scan at snapshot `seqno`; `index` optionally overlays an
    /// ephemeral memtable on top of the tree's own data.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Total tombstone count across all tables of the current version.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Total weak tombstone count across all tables of the current version.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Total reclaimable weak tombstones across all tables of the current version.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    /// Drops all entries within `range` by running a dedicated compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // A provably empty range (lo > hi) is a no-op.
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // Write lock: drop_range is mutually exclusive with every other
        // compaction (regular compactions take this lock as readers).
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    /// Compacts everything into tables of at most `target_size` bytes.
    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // Write lock: major compaction excludes all other compactions.
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    /// Number of runs in level 0 (0 if the level does not exist).
    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    /// Returns the byte length of the value for `key`, if present at `seqno`.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    /// Total filter size across all tables of the current version.
    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    /// Total size of filters pinned in memory.
    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    /// Total size of block indexes pinned in memory.
    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    /// Number of sealed (immutable, not yet flushed) memtables.
    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    /// Writes `stream` out as one or more level-0 tables.
    ///
    /// Returns the recovered `Table` handles (blob files are always `None`
    /// for a standard tree), or `None` if nothing was written.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // Flushes always produce level-0 tables, so every policy is queried at level 0.
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        // 64 MiB target table size, level 0.
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                // No filter configured: 0 bits per key effectively disables the bloom filter.
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Re-open every written table so the returned handles go through the
        // same recovery path as tables found on startup.
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    /// Atomically installs new tables (and optionally blob files) as a new
    /// L0 run, releasing the given sealed memtables in the same version bump.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // Lock order: compaction state first, then version history. Both are
        // held for the whole registration so compaction cannot observe a
        // half-applied version.
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    // Empty fragmentation maps are treated as absent.
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        // Version GC failure is non-fatal; stale versions are retried later.
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    /// Replaces the active memtable with a fresh, empty one.
    // NOTE(review): if `latest_version()` returns a clone by value (as
    // `rotate_memtable` suggests), this assignment would mutate a temporary
    // and be lost — confirm it returns a mutable reference in this context.
    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
    }

    /// Replaces the active memtable with the given one.
    // NOTE(review): same by-value-vs-reference concern as `clear_active_memtable`.
    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    /// Appends a memtable to the set of sealed memtables.
    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }

    /// Runs one compaction with the given strategy.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // Read lock: regular compactions may run concurrently with each
        // other, but are excluded by major/drop_range compactions (writers).
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    /// Allocates and returns the next table ID.
    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    /// This tree's configuration.
    fn tree_config(&self) -> &Config {
        &self.config
    }

    /// Returns a handle to the currently active (mutable) memtable.
    fn active_memtable(&self) -> Arc<Memtable> {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .clone()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    /// Seals the active memtable and installs a fresh empty one.
    ///
    /// Returns `None` if the active memtable is empty (nothing to rotate),
    /// otherwise the now-sealed memtable.
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        // Write lock held across the whole rotation so the check-then-swap is atomic.
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        // Build the successor super version: new seqno, fresh active
        // memtable, and the yanked one appended to the sealed set.
        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        version_history_lock.append_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    /// Total number of tables in the current version.
    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    /// Number of tables in level `idx`, or `None` if the level does not exist.
    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    /// Approximate item count: sums table metadata counts plus live memtable
    /// lengths (overcounts shadowed/duplicate keys).
    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    /// On-disk size: sum of all level sizes in the current version.
    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    /// Highest sequence number held in any (active or sealed) memtable.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    /// Highest sequence number persisted in any table.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    /// Point lookup of the user value for `key` visible at snapshot `seqno`.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    /// Inserts a key-value pair at `seqno`.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    /// Deletes `key` at `seqno` by writing a tombstone.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    /// Deletes `key` at `seqno` by writing a weak tombstone.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}
583
impl Tree {
    /// Builds a merged internal-value iterator over `range` at snapshot `seqno`.
    ///
    /// The iterator owns the given `SuperVersion` snapshot, which is why it
    /// can be `'static` even though the tree keeps evolving.
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        // Convert borrowed bounds into owned `UserKey` bounds so the iterator
        // does not borrow from the caller.
        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = { IterState { version, ephemeral } };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

    /// Point lookup against a given super version: active memtable first,
    /// then sealed memtables, then on-disk tables.
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            // A tombstone hit still terminates the search (key is deleted).
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    /// Point lookup against the on-disk tables of a version.
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // Hash the key once; each table's bloom filter check reuses it.
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        // Levels are iterated top-down, so the first hit is the newest entry.
        for level in version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    // Larger runs: locate the (at most one) candidate table by
                    // key range instead of scanning every table.
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // Small runs: a linear scan is cheaper than the lookup.
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    /// Point lookup across sealed memtables.
    // Iterated in reverse — presumably newest-sealed first, so the most
    // recent entry wins; confirm ordering of `sealed_memtables`.
    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for mt in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = mt.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    /// Returns the super version that a snapshot at `seqno` should read from.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

    /// Converts generic range bounds into owned `OwnedBounds`.
    ///
    /// The second tuple element is `true` when the range is provably empty
    /// (both ends bounded and start key > end key).
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    /// Opens (recovers or creates) an LSM-tree at the configured path.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        // A legacy "version" file marks a V1 database, which cannot be migrated.
        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        // The presence of the current-version file decides recover vs. create.
        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    /// Finalizes a table writer and re-opens the written table.
    ///
    /// Returns `None` if the writer produced no table (nothing was written).
    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            0,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    /// Returns `true` while a compaction currently hides any tables.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    /// Runs a single compaction using `strategy`; entries older than
    /// `mvcc_gc_watermark` are eligible for MVCC garbage collection.
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    /// Full-tree iterator at snapshot `seqno` (unbounded range).
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    /// Range iterator at snapshot `seqno`, yielding user-facing KV pairs.
    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        // The read lock is only held long enough to snapshot a super version;
        // the returned iterator owns that snapshot.
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
            Ok(kv) => Ok((kv.key.user_key, kv.value)),
            Err(e) => Err(e),
        })
    }

    /// Prefix iterator at snapshot `seqno`, implemented as a range scan.
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Writes an entry into the active memtable.
    ///
    /// Returns the two counters reported by `Memtable::insert` — presumably
    /// (item size, memtable size); confirm against `Memtable`.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

    /// Recovers an existing tree from disk.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            // Validate the manifest that belongs to the recovered version.
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            // The on-disk tree type must match what the caller asked for.
            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            // The persisted level count overrides whatever the config says.
            config.level_count = manifest.level_count;
        }

        // Continue table ID allocation after the highest recovered ID.
        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a fresh, empty tree on disk.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        // fsync directories so the new layout survives a crash.
        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the version (levels + tables + blob files) from disk,
    /// deleting any orphaned files not referenced by the recovered version.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Map each expected table ID to its (level, checksum, global seqno).
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Throttle progress logging for large table counts.
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS Finder metadata.
            if file_name == ".DS_Store" {
                continue;
            }

            // Skip macOS AppleDouble resource-fork files.
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            // Table file names are just the numeric table ID.
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the recovered version: delete later.
                orphaned_tables.push(table_file_path);
            }
        }

        // Every table referenced by the version must exist on disk.
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // Only delete files after the version is fully built, so a failure
        // above never removes data we might still need.
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

    /// Deletes all version manifest files (`v*`) except the latest one.
    fn cleanup_orphaned_version(
        path: &Path,
        latest_version_id: crate::version::VersionId,
    ) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}
1174}