pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

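    /// Point-read path: checks the active memtable first, then any sealed
    /// memtables (newest to oldest), and finally the on-disk tables, so the
    /// most recent visible version of the key wins.
    ///
    /// A minimal usage sketch, assuming `tree` and a snapshot `seqno` are in
    /// scope:
    ///
    /// ```ignore
    /// if let Some(entry) = tree.get_internal_entry(b"key", seqno)? {
    ///     // tombstones have already been filtered out here
    ///     println!("{:?}", entry.value);
    /// }
    /// ```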
    #[expect(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

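    /// Seals the active memtable and flushes it to a new level-0 table,
    /// registering the result in the current version.
    ///
    /// Returns `Ok(None)` if the active memtable was empty or the flush
    /// produced no table. A minimal sketch, assuming `tree` is in scope:
    ///
    /// ```ignore
    /// if let Some(table) = tree.flush_active_memtable(0)? {
    ///     println!("flushed table {}", table.id());
    /// }
    /// ```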
    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>> {
        log::debug!("Flushing active memtable");

        let Some((table_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, _)) = self.flush_memtable(table_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_tables(std::slice::from_ref(&table), None, None)?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

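    /// Bulk-loads a batch of key-value pairs directly into new tables,
    /// bypassing the memtable. All items share one sequence number, and the
    /// input iterator must yield keys in strictly ascending order.
    ///
    /// A minimal sketch; `items_sorted_by_key`, `seqno_generator`, and
    /// `visible_seqno` are assumed to exist:
    ///
    /// ```ignore
    /// tree.ingest(
    ///     items_sorted_by_key.into_iter(),
    ///     &seqno_generator,
    ///     &visible_seqno,
    /// )?;
    /// ```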
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

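    /// Drops all items in the given key range by running a targeted
    /// compaction; an empty range is a no-op.
    ///
    /// A minimal sketch, assuming `tree` is in scope:
    ///
    /// ```ignore
    /// // removes every key in ["a", "f")
    /// tree.drop_range("a".."f")?;
    /// ```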
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

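    /// Writes a sealed memtable out as a new table, applying the level-0
    /// block, compression, and filter policies from the tree config.
    ///
    /// A minimal sketch, assuming a previously rotated memtable:
    ///
    /// ```ignore
    /// if let Some((table_id, memtable)) = tree.rotate_memtable() {
    ///     let result = tree.flush_memtable(table_id, &memtable, 0)?;
    /// }
    /// ```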
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::TABLES_FOLDER, table::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);
        let table_file_path = folder.join(table_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing table to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            table_file_path.display(),
        );

        let mut table_writer = Writer::new(table_file_path, table_id, 0)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            table_writer.write(item?)?;
        }

        let result = self.consume_writer(table_writer)?;

        log::debug!("Flushed memtable {table_id:?} in {:?}", start.elapsed());

        Ok(result.map(|table| (table, None)))
    }

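    /// Atomically publishes freshly written tables (and optional blob files)
    /// as a new level-0 run, releasing the sealed memtables they replace.
    ///
    /// A minimal sketch, assuming `table` came out of a flush:
    ///
    /// ```ignore
    /// tree.register_tables(std::slice::from_ref(&table), None, None)?;
    /// ```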
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for table in tables {
                    log::trace!("releasing sealed memtable {}", table.id());
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table.id()));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(id, memtable);
    }

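    /// Runs a compaction with the given strategy. Takes the major compaction
    /// lock as a reader, so regular compactions can run concurrently with
    /// each other, while `major_compact` and `drop_range` (which take the
    /// lock as a writer) exclude them.
    ///
    /// A minimal sketch; `strategy` is an assumed `Arc<dyn CompactionStrategy>`:
    ///
    /// ```ignore
    /// tree.compact(strategy.clone(), seqno_watermark)?;
    /// ```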
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

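    /// Swaps the active memtable for a fresh one and moves the old memtable
    /// into the sealed set, returning it together with its new table ID.
    /// Returns `None` if the active memtable is empty.
    ///
    /// A minimal sketch of the usual rotate-then-flush sequence:
    ///
    /// ```ignore
    /// if let Some((id, sealed)) = tree.rotate_memtable() {
    ///     tree.flush_memtable(id, &sealed, 0)?;
    /// }
    /// ```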
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;
        let tmp_memtable_id = self.get_next_table_id();

        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::default());
        copy.sealed_memtables = Arc::new(
            super_version
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        version_history_lock.append_version(copy);

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    #[expect(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

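    /// Snapshot point read: returns the value visible at the given sequence
    /// number, or `None` if the key is absent or deleted at that snapshot.
    ///
    /// A minimal sketch, assuming reads at sequence number `n` see writes
    /// stamped below `n`:
    ///
    /// ```ignore
    /// tree.insert("key", "value", 0);
    /// let item = tree.get("key", 1)?; // read at a later snapshot
    /// ```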
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

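    /// Converts generic range bounds into owned `Slice` bounds, also
    /// reporting whether the range is trivially empty (lower bound above
    /// upper bound), so callers like `drop_range` can bail out early.
    ///
    /// A minimal sketch of the empty-range check:
    ///
    /// ```ignore
    /// let (_, is_empty) = Tree::range_bounds_to_owned_bounds(&("b".."a"));
    /// assert!(is_empty);
    /// ```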
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        // Iterate newest sealed memtable first, so the most recent write wins
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

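    /// Looks a key up in the on-disk tables, newest level first. The filter
    /// hash is computed once up front and reused for every table that is
    /// probed. Tables within a run are disjoint, so at most one table per
    /// run can contain the key.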
    fn get_internal_entry_from_tables(
        &self,
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                if run.len() >= 4 {
                    // For longer runs, jump directly to the one table whose
                    // key range may contain the key
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // For short runs, a linear scan is cheap enough
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let version = self.get_version_for_snapshot(seqno);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

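    /// Iterates over all key-value pairs that start with the given prefix,
    /// by translating the prefix into an equivalent key range.
    ///
    /// A minimal usage sketch, assuming `tree` and `seqno` are in scope:
    ///
    /// ```ignore
    /// for kv in tree.create_prefix("user:", seqno, None) {
    ///     let (key, value) = kv?;
    /// }
    /// ```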
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

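    /// Recovers an existing tree from disk: decodes the manifest, validates
    /// the format version, rebuilds the current `Version` from the tables on
    /// disk, and seeds the table ID counter past the highest recovered ID.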
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            if manifest.version == FormatVersion::V2 {
                log::error!("It looks like you are trying to open a V2 database - the database needs a manual migration, a tool is available at <TODO: 3.0.0 LINK>.");
            }
            if manifest.version as u8 > 3 {
                log::error!("It looks like you are trying to open a database from the future. Are you a time traveller?");
            }
            return Err(crate::Error::InvalidVersion(manifest.version.into()));
        }

        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_generator: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // macOS Finder metadata
            if file_name == ".DS_Store" {
                continue;
            }

            // macOS resource fork files
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // pin filter blocks in upper levels
                    level_idx <= 2, // pin index blocks in upper levels
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}