pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::{multi_writer::MultiWriter, Table},
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, MutexGuard, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

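/// Iterator item guard for the standard tree, wrapped as `IterGuardImpl::Standard`.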
35
36pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39 fn into_inner_if(self, pred: impl Fn(&UserKey) -> bool) -> crate::Result<Option<KvPair>> {
40 let (k, v) = self.0?;
41
42 if pred(&k) {
43 Ok(Some((k, v)))
44 } else {
45 Ok(None)
46 }
47 }
48
49 fn key(self) -> crate::Result<UserKey> {
50 self.0.map(|(k, _)| k)
51 }
52
53 fn size(self) -> crate::Result<u32> {
54 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
55 self.into_inner().map(|(_, v)| v.len() as u32)
56 }
57
58 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
59 self.0
60 }
61}
62
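/// Maps a tombstone to `None`, so deleted keys read as absent.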
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

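/// A log-structured merge tree (LSM-tree)
///
/// Cloning is cheap: a `Tree` is a shared handle over its reference-counted
/// inner state.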
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        0
    }

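    // Point reads go through the layers newest-to-oldest: active memtable,
    // then sealed memtables, then on-disk tables; the first hit wins.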
    #[expect(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

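    /// Bulk ingestion: streams an already-sorted run of key-value pairs
    /// straight into new tables (see the `ingest` module). Keys must be
    /// strictly ascending, which is asserted below.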
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

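    /// Drops all items in the given key range by running a dedicated
    /// drop-range compaction over it.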
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

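    // Flushing drains a memtable stream into brand-new tables on disk,
    // using the level-0 policies from the tree config.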
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::file::TABLES_FOLDER;
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

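    // Registering tables publishes them atomically: the new tables become a
    // fresh L0 run in the next version, and the sealed memtables they came
    // from are released in the same step.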
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

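    // Rotation seals the active memtable: the current one is handed out for
    // flushing while a fresh, empty memtable takes its place in a new version.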
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;
        let memtable_id = self.0.memtable_id_counter.next();

        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::default());
        copy.sealed_memtables = Arc::new(
            super_version
                .sealed_memtables
                .add(memtable_id, yanked_memtable.clone()),
        );

        version_history_lock.append_version(copy);

        log::trace!("rotate: added memtable id={memtable_id} to sealed memtables");

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    #[expect(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

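    /// Converts generic `RangeBounds` into owned bounds, additionally
    /// reporting whether the range is trivially empty (start key past the
    /// end key).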
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

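    /// Opens an LSM-tree in the given folder, recovering it if a manifest
    /// already exists, and creating a fresh tree otherwise.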
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

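    // Table lookups hash the key once up front so each table's filter check
    // can reuse it. For runs of four or more tables, the single candidate
    // table is located by key range; smaller runs are scanned directly.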
    fn get_internal_entry_from_tables(
        &self,
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let version = self.get_version_for_snapshot(seqno);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

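    /// Recovers the tree from an existing manifest, rejecting on-disk formats
    /// this version cannot read, then rebuilding the in-memory state from the
    /// tables on disk.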
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            if manifest.version == FormatVersion::V2 {
                log::error!("It looks like you are trying to open a V2 database - the database needs a manual migration, a tool is available at <TODO: 3.0.0 LINK>.");
            }
            if manifest.version as u8 > 3 {
                log::error!("It looks like you are trying to open a database from the future. Are you a time traveller?");
            }
            return Err(crate::Error::InvalidVersion(manifest.version.into()));
        }

        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

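    /// Creates a new tree in the given folder, writing the initial manifest
    /// and fsyncing the directories before the tree is handed out.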
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

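    /// Recovers the tree's tables from disk, matching data files against the
    /// recovered version and deleting any files the version does not know.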
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum)> = crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1,
                    level_idx <= 2,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

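    /// Deletes all version files in the tree folder except the current one.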
    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}