pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    compaction::{drop_range::OwnedBounds, CompactionStrategy},
    config::Config,
    file::CURRENT_VERSION_FILE,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey,
    UserValue, ValueType,
};
use inner::{TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

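/// Wraps one fallible key-value pair yielded by a range or prefix iterator,
/// deferring error handling and value materialization to the caller.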
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn into_inner_if(
        self,
        pred: impl Fn(&UserKey) -> bool,
    ) -> crate::Result<(UserKey, Option<UserValue>)> {
        let (k, v) = self.0?;

        if pred(&k) {
            Ok((k, Some(v)))
        } else {
            Ok((k, None))
        }
    }

    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

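/// A log-structured merge-tree (LSM-tree): a cheaply cloneable handle around
/// the shared tree state.
///
/// A minimal usage sketch (hypothetical path and sequence numbers, not a
/// definitive API reference):
///
/// ```ignore
/// let tree = Config::new("/tmp/my_tree").open()?;
/// tree.insert("hello", "world", 0);
/// assert!(tree.get("hello", 1)?.is_some());
/// ```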
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

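    // A standard tree keeps no value log, so there are never any blob files;
    // the blob tree variant supplies its own count.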
    fn blob_file_count(&self) -> usize {
        0
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

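    /// Bulk ingestion: writes a pre-sorted key-value stream straight into new
    /// tables, bypassing the memtable. All items share a single sequence
    /// number, and keys must arrive in strictly ascending order (asserted
    /// below).
    ///
    /// A minimal sketch (hypothetical counters):
    ///
    /// ```ignore
    /// let seqno = SequenceNumberCounter::default();
    /// let visible = SequenceNumberCounter::default();
    /// tree.ingest(
    ///     [(UserKey::from("a"), UserValue::from("1"))].into_iter(),
    ///     &seqno,
    ///     &visible,
    /// )?;
    /// ```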
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

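    /// Drops all items in the given key range by running a dedicated
    /// `drop_range` compaction under the major compaction lock. A sketch of
    /// the intended use (hypothetical keys):
    ///
    /// ```ignore
    /// // Remove every key in ["a", "f"):
    /// tree.drop_range("a".as_bytes().."f".as_bytes())?;
    /// ```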
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

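    // Flushes always produce level-0 tables, so every block/filter policy
    // below is resolved for level 0 via `.get(0)`.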
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

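    /// Seals the active memtable and publishes a new super-version in which a
    /// fresh, empty memtable takes its place. Returns the sealed memtable so
    /// the caller can queue it for flushing, or `None` if there was nothing
    /// to rotate.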
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        version_history_lock.append_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

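    // "Approximate" because raw item counts are summed: overwritten and
    // deleted versions still count until compaction reclaims them.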
    #[expect(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

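    /// Point lookup through a single super-version, in recency order: the
    /// active memtable first, then the sealed memtables (newest to oldest),
    /// and finally the on-disk tables, level by level.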
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    // Larger runs: jump straight to the one table whose key
                    // range can contain the key
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // Small runs: a linear scan is cheap enough
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for mt in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = mt.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

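    /// Converts generic `RangeBounds` into owned bounds and additionally
    /// reports whether the range is trivially empty (lower bound above the
    /// upper bound), e.g. for `b"b".as_slice()..b"a".as_slice()` the second
    /// tuple element would be `true`.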
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

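    /// Opens an LSM-tree at the configured path: recovers the existing tree
    /// if a current version file is present, otherwise creates a new one.
    /// Opening a V1 database is rejected, as the on-disk format is no longer
    /// supported.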
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - it needs a manual migration; however, a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        Self::create_internal_range(super_version, range, seqno, ephemeral)
            .map(|item| item.map(|kv| (kv.key.user_key, kv.value)))
    }

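    /// Prefix iteration is a plain range scan under the hood: the prefix is
    /// converted into an equivalent key range (conceptually, `"abc"` becomes
    /// `"abc".."abd"`) which is then handed to `create_range`.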
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            config.level_count = manifest.level_count;
        }

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(
                crate::compaction::state::CompactionState::default(),
            )),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<crate::DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum)> = crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

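        // Throttle progress logging: log every table for small trees, every
        // 10th up to 100 tables, every 100th beyond that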
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS Finder metadata
            if file_name == ".DS_Store" {
                continue;
            }

            // Skip macOS resource fork files
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // pin filter blocks in L0 and L1
                    level_idx <= 2, // pin index blocks in L0 through L2
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

1162
1163 fn cleanup_orphaned_version(
1164 path: &Path,
1165 latest_version_id: crate::version::VersionId,
1166 ) -> crate::Result<()> {
1167 let version_str = format!("v{latest_version_id}");
1168
1169 for file in std::fs::read_dir(path)? {
1170 let dirent = file?;
1171
1172 if dirent.file_type()?.is_dir() {
1173 continue;
1174 }
1175
1176 let name = dirent.file_name();
1177
1178 if name.to_string_lossy().starts_with('v') && *name != *version_str {
1179 log::trace!("Cleanup orphaned version {}", name.display());
1180 std::fs::remove_file(dirent.path())?;
1181 }
1182 }
1183
1184 Ok(())
1185 }
1186}