1pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10 compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11 config::Config,
12 file::CURRENT_VERSION_FILE,
13 format_version::FormatVersion,
14 iter_guard::{IterGuard, IterGuardImpl},
15 key::InternalKey,
16 manifest::Manifest,
17 memtable::Memtable,
18 slice::Slice,
19 table::Table,
20 value::InternalValue,
21 version::{recovery::recover, SuperVersion, SuperVersions, Version},
22 vlog::BlobFile,
23 AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
24 ValueType,
25};
26use inner::{TreeId, TreeInner};
27use std::{
28 ops::{Bound, RangeBounds},
29 path::Path,
30 sync::{Arc, Mutex, RwLock},
31};
32
33#[cfg(feature = "metrics")]
34use crate::metrics::Metrics;
35
/// Guard over a raw key-value read result, allowing callers to defer
/// extraction of the key/value (see [`IterGuard`] impl below).
pub struct Guard(crate::Result<(UserKey, UserValue)>);
38
39impl IterGuard for Guard {
40 fn into_inner_if(
41 self,
42 pred: impl Fn(&UserKey) -> bool,
43 ) -> crate::Result<(UserKey, Option<UserValue>)> {
44 let (k, v) = self.0?;
45
46 if pred(&k) {
47 Ok((k, Some(v)))
48 } else {
49 Ok((k, None))
50 }
51 }
52
53 fn key(self) -> crate::Result<UserKey> {
54 self.0.map(|(k, _)| k)
55 }
56
57 fn size(self) -> crate::Result<u32> {
58 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
59 self.into_inner().map(|(_, v)| v.len() as u32)
60 }
61
62 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
63 self.0
64 }
65}
66
67fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
68 if item.is_tombstone() {
69 None
70 } else {
71 Some(item)
72 }
73}
74
/// An LSM-tree instance.
///
/// Cloning is cheap: all clones share the same [`TreeInner`] through an
/// `Arc` (the derived `Clone` only bumps the reference count).
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
78
79impl std::ops::Deref for Tree {
80 type Target = TreeInner;
81
82 fn deref(&self) -> &Self::Target {
83 &self.0
84 }
85}
86
impl AbstractTree for Tree {
    // Number of entries in the (optional) descriptor table, which caches
    // open table file handles; 0 when no descriptor table is configured.
    fn table_file_cache_size(&self) -> usize {
        self.config
            .descriptor_table
            .as_ref()
            .map_or(0, |dt| dt.len())
    }

    // Acquires the write lock over the tree's version history.
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history.write().expect("lock is poisoned")
    }

    // Reads the current value of the table ID counter without advancing it
    // (contrast with `get_next_table_id` below).
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    // A standard tree owns no blob files; only blob trees do.
    fn blob_file_count(&self) -> usize {
        0
    }

    // Debugging helper: logs every stored version of `key` found in the
    // active memtable, the sealed memtables (newest first) and the on-disk
    // tables.
    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let key = Slice::from(key);

        // Seeking with SeqNo::MAX starts at the newest version of `key`.
        for kv in super_version
            .active_memtable
            .range(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)..)
        {
            log::info!("[Active] {kv:?}");
        }

        for mt in super_version.sealed_memtables.iter().rev() {
            for kv in mt.range(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)..) {
                log::info!("[Sealed #{}] {kv:?}", mt.id());
            }
        }

        for table in super_version
            .version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key(&key))
        {
            for kv in table.range(..) {
                let kv = kv?;

                // Stop once we scanned past the requested user key.
                if kv.key.user_key != key {
                    break;
                }

                log::info!("[Table #{}] {kv:?}", table.id());
            }
        }

        Ok(())
    }

    // Point read of the internal entry visible at snapshot `seqno`.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    // Returns the on-disk `Version` of the latest super version.
    fn current_version(&self) -> Version {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    // Serializes flushes; callers hold this while flushing memtables.
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    // Number of versions awaiting garbage collection in the free list.
    fn version_free_list_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    // Prefix scan at snapshot `seqno`; `index` is an optional ephemeral
    // memtable overlaid on the iteration.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    // Range scan at snapshot `seqno`; `index` is an optional ephemeral
    // memtable overlaid on the iteration.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    // Sum of tombstones across all tables of the current version.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    // Sum of weak tombstones across all tables of the current version.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    // Sum of reclaimable weak tombstones across all tables.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    // Drops all data within `range` via a dedicated compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // Empty range: nothing could possibly be dropped.
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // Exclusive lock: drop_range must not run concurrently with any
        // other (major or minor) compaction.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    // Atomically replaces the super version with an empty one (fresh active
    // memtable, no sealed memtables, new empty on-disk version).
    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
                copy.sealed_memtables = Arc::default();
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // Exclusive lock: a major compaction must not overlap other
        // compactions (minor compactions take this lock shared, see
        // `compact` below).
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    // Number of runs in level 0 (0 if the level does not exist).
    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    // Length in bytes of the value stored under `key`, if any.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    // Total filter (bloom) size across all tables, in bytes.
    fn filter_size(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    // Total size of filters pinned in memory across all tables.
    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    // Total size of block indexes pinned in memory across all tables.
    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    // Writes `stream` out to new level-0 tables using the tree's level-0
    // block/filter/compression policies. Returns the recovered `Table`
    // handles (blob files are always `None` for a standard tree), or
    // whatever `MultiWriter::finish` yields if the stream was empty.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // All policies are queried for level 0, the flush target.
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
            folder.display(),
        );

        // 64 MiB target table size; the multi-writer rolls over to a new
        // table file when that is exceeded.
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // BitsPerKey(0.0) effectively disables the bloom filter.
            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Re-open each freshly written table to obtain a full handle.
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    // Publishes freshly flushed tables (and, for blob trees, blob files) as
    // a new L0 run, releasing the sealed memtables they came from.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // NOTE: lock order matters — compaction state is taken before the
        // version history lock and both are held for the whole update.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )?;

        // GC of old versions is best-effort; a failure only delays cleanup.
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    // Discards the active memtable (and all sealed memtables) without
    // flushing them. No-op when the active memtable is empty.
    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        // Carry the sequence number over so snapshot visibility is kept.
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

    // Runs a (minor) compaction. Takes the major compaction lock *shared*,
    // so minor compactions may run concurrently with each other but not
    // with a major or drop_range compaction.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    // Seals the current active memtable: moves it into the sealed set and
    // installs a fresh empty active memtable. Returns the sealed memtable,
    // or `None` if the active memtable was empty.
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        // Carry the sequence number over so snapshot visibility is kept.
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    // Approximate item count: sums item counts of the active memtable, all
    // sealed memtables and all tables. Approximate because the same key may
    // be counted once per structure that stores a version of it.
    fn approximate_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        #[expect(clippy::expect_used, reason = "result should fit into usize")]
        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("approximate_len too large for usize")
    }

    // On-disk size across all levels, in bytes.
    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    // Highest sequence number present in any (active or sealed) memtable.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    // Highest sequence number present in any on-disk table.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    // Point read of the user value visible at snapshot `seqno`.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    // Inserts a key-value pair; returns the (size, count) pair produced by
    // `append_entry`.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    // Deletes a key by writing a (strong) tombstone.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    // Deletes a key by writing a weak tombstone.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}
668
impl Tree {
    /// Builds a double-ended iterator over the internal entries of a
    /// [`SuperVersion`] snapshot, restricted to `range` and visible at
    /// `seqno`. `ephemeral` optionally overlays an extra memtable.
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        // Convert the borrowed bounds into owned `UserKey` bounds so the
        // iterator can be 'static.
        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = { IterState { version, ephemeral } };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

    /// Point read against a specific super version. Read path order:
    /// active memtable, then sealed memtables (newest first), then tables.
    /// The first hit wins; tombstones are mapped to `None`.
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    // Point read against the on-disk tables of a version.
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // Hash the key once up front so every table's bloom filter check
        // can reuse it.
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        // `get_for_key` selects the candidate table of each run for this
        // key (if any); levels are visited top-down, so the first hit is
        // the newest.
        for table in version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key(key))
        {
            if let Some(item) = table.get(key, seqno, key_hash)? {
                return Ok(ignore_tombstone_value(item));
            }
        }

        Ok(None)
    }

    // Point read against the sealed memtables, newest first.
    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for mt in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = mt.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    /// Returns the super version that snapshot `seqno` should read from.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

    // Converts generic range bounds into owned bounds, additionally
    // reporting whether the range is trivially empty (lo > hi).
    //
    // NOTE(review): ranges with equal bounds where at least one side is
    // exclusive (e.g. `Excluded(k)..Excluded(k)`) are also empty but are
    // not flagged here — harmless, callers merely skip an early-out.
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    /// Opens the tree at `config.path`, recovering an existing tree if a
    /// current-version file is present, otherwise creating a new one.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        // A bare "version" file marks the obsolete V1 on-disk layout.
        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    /// Whether any compaction currently has tables hidden (i.e. in flight).
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    // Runs one compaction with the given strategy; `mvcc_gc_watermark` is
    // the seqno below which stale MVCC versions may be dropped.
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    /// Full-tree iterator at snapshot `seqno` (unbounded range).
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    /// Range iterator at snapshot `seqno`, yielding user-facing KV pairs.
    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        // Strip the internal key metadata, exposing only (user_key, value).
        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
            Ok(kv) => Ok((kv.key.user_key, kv.value)),
            Err(e) => Err(e),
        })
    }

    /// Prefix iterator at snapshot `seqno`, implemented as a range scan
    /// over the prefix's key range.
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Appends an entry to the active memtable, returning whatever
    /// `Memtable::insert` reports (two u64 counters).
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

    // Recovers an existing tree: rebuilds the version from disk, validates
    // the manifest (format version + tree type) and initializes counters.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            // The manifest lives in the "v{id}" file of the current version.
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            // KV-separation options imply the caller wants a blob tree.
            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            // The persisted level count overrides the configured one.
            config.level_count = manifest.level_count;
        }

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            // Continue numbering tables after the highest recovered ID.
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config: Arc::new(config),
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    // Creates a fresh tree: sets up the directory layout and fsyncs it so
    // the structure survives a crash.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    // Recovers the on-disk tables (and blob files) referenced by the
    // persisted version, deleting any orphaned files that are no longer
    // referenced. Fails if a referenced table file is missing.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Map table ID -> (level index, checksum, global seqno) from the
        // recovered version manifest.
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 , Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Throttle progress logging for large table counts.
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS metadata files.
            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            // Table files are named by their numeric table ID.
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Present on disk but not referenced by the version:
                // scheduled for deletion below.
                orphaned_tables.push(table_file_path);
            }
        }

        // Every referenced table must have been found on disk.
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
            tree_id,
            config.descriptor_table.as_ref(),
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // Only delete files after the version was built successfully.
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

    // Removes stale "v{N}" version manifest files, keeping only the one
    // belonging to `latest_version_id`.
    fn cleanup_orphaned_version(
        path: &Path,
        latest_version_id: crate::version::VersionId,
    ) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}