pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::CURRENT_VERSION_FILE,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version},
    vlog::BlobFile,
    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
    ValueType,
};
use inner::{TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

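/// Wraps the result of a point read so the caller can decide how much of it
/// to materialize (key only, value size only, or the full pair).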
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn into_inner_if(
        self,
        pred: impl Fn(&UserKey) -> bool,
    ) -> crate::Result<(UserKey, Option<UserValue>)> {
        let (k, v) = self.0?;

        if pred(&k) {
            Ok((k, Some(v)))
        } else {
            Ok((k, None))
        }
    }

    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

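/// Filters out tombstones, so deleted entries are not surfaced by reads.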
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

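/// A standard LSM-tree.
///
/// Reads consult the active memtable, then the sealed memtables,
/// then the on-disk tables of the current version.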
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        // A standard tree does not use key-value separation, so it owns no blob files
        0
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    fn current_version(&self) -> Version {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
                copy.sealed_memtables = Arc::default();
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

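    /// Writes the given sorted stream of entries into new level-0 tables,
    /// applying the level-0 block, compression, filter and partitioning
    /// policies from the tree config.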
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

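    /// Atomically publishes freshly written tables (and optionally blob files)
    /// as a new level-0 run, releasing the sealed memtables they were flushed from.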
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

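    /// Runs a compaction with the given strategy.
    ///
    /// Takes the major compaction lock shared, so regular compactions may run
    /// concurrently, while a major compaction has exclusive access.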
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

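    /// Seals the active memtable and replaces it with a fresh one,
    /// returning the sealed memtable (or `None` if it was empty).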
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    fn approximate_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        #[expect(clippy::expect_used, reason = "result should fit into usize")]
        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("approximate_len too large for usize")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

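    /// Point read against a specific super version: checks the active memtable,
    /// then the sealed memtables, then the on-disk tables.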
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

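    /// Point read against the on-disk tables; the key hash is computed once
    /// and reused for every table's filter check.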
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for table in version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key(key))
        {
            if let Some(item) = table.get(key, seqno, key_hash)? {
                return Ok(ignore_tombstone_value(item));
            }
        }

        Ok(None)
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for mt in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = mt.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

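    /// Converts generic range bounds into owned bounds, additionally reporting
    /// whether the range is trivially empty (start bound past the end bound).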
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

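    /// Opens the tree at the configured path, recovering the existing state
    /// if a current version file exists, and creating a new tree otherwise.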
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database; it needs a manual migration, but no migration tool is provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run finished");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
            Ok(kv) => Ok((kv.key.user_key, kv.value)),
            Err(e) => Err(e),
        })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

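    /// Appends a raw entry to the active memtable, returning the size counters
    /// reported by the memtable insert (see [`Memtable::insert`]).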
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            config.level_count = manifest.level_count;
        }

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config: Arc::new(config),
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

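    /// Recovers the current version from disk: reopens every table referenced
    /// by the version manifest and collects unreferenced table and blob files
    /// for deletion.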
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always fewer than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are fewer than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

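    /// Deletes version manifest files (`v<N>`) other than the latest one.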
    fn cleanup_orphaned_version(
        path: &Path,
        latest_version_id: crate::version::VersionId,
    ) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}