1pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10 compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11 config::Config,
12 file::CURRENT_VERSION_FILE,
13 format_version::FormatVersion,
14 iter_guard::{IterGuard, IterGuardImpl},
15 key::InternalKey,
16 manifest::Manifest,
17 memtable::Memtable,
18 slice::Slice,
19 table::Table,
20 value::InternalValue,
21 version::{recovery::recover, SuperVersion, SuperVersions, Version},
22 vlog::BlobFile,
23 AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
24 ValueType,
25};
26use inner::{TreeId, TreeInner};
27use std::{
28 ops::{Bound, RangeBounds},
29 path::Path,
30 sync::{Arc, Mutex, RwLock},
31};
32
33#[cfg(feature = "metrics")]
34use crate::metrics::Metrics;
35
/// Iterator guard for a standard tree, wrapping a fallible key-value pair.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
38
39impl IterGuard for Guard {
40 fn into_inner_if(
41 self,
42 pred: impl Fn(&UserKey) -> bool,
43 ) -> crate::Result<(UserKey, Option<UserValue>)> {
44 let (k, v) = self.0?;
45
46 if pred(&k) {
47 Ok((k, Some(v)))
48 } else {
49 Ok((k, None))
50 }
51 }
52
53 fn key(self) -> crate::Result<UserKey> {
54 self.0.map(|(k, _)| k)
55 }
56
57 fn size(self) -> crate::Result<u32> {
58 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
59 self.into_inner().map(|(_, v)| v.len() as u32)
60 }
61
62 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
63 self.0
64 }
65}
66
67fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
68 if item.is_tombstone() {
69 None
70 } else {
71 Some(item)
72 }
73}
74
/// Handle to a standard LSM-tree; cheap to clone (shared via `Arc`).
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
78
// Allow transparent access to the shared inner tree state.
impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
86
// Seals `AbstractTree` so it cannot be implemented outside this crate.
impl crate::abstract_tree::sealed::Sealed for Tree {}
88
89impl AbstractTree for Tree {
90 fn table_file_cache_size(&self) -> usize {
91 self.config
92 .descriptor_table
93 .as_ref()
94 .map_or(0, |dt| dt.len())
95 }
96
    /// Acquires the version history write lock.
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history.write().expect("lock is poisoned")
    }
103
104 fn next_table_id(&self) -> TableId {
105 self.0.table_id_counter.get()
106 }
107
108 fn id(&self) -> TreeId {
109 self.id
110 }
111
    /// A standard tree has no value log, so there are never any blob files.
    fn blob_file_count(&self) -> usize {
        0
    }
115
    /// Logs every stored version of `key` (active memtable, sealed memtables,
    /// then disk tables) for debugging MVCC/visibility issues.
    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let key = Slice::from(key);

        // Seek to the newest (SeqNo::MAX) version of `key` in the active memtable
        for kv in super_version.active_memtable.range_internal((
            Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
            Bound::Unbounded,
        )) {
            log::info!("[Active] {kv:?}");
        }

        // Sealed memtables, newest first
        for mt in super_version.sealed_memtables.iter().rev() {
            for kv in mt.range_internal((
                Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
                Bound::Unbounded,
            )) {
                log::info!("[Sealed #{}] {kv:?}", mt.id());
            }
        }

        // Disk tables whose run may contain the key
        for table in super_version
            .version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key_cmp(&key, self.config.comparator.as_ref()))
        {
            // NOTE(review): this scans from the table's first entry and stops
            // at the first non-matching key, so it only traces tables whose
            // very first key equals `key` — presumably a seek to `key` was
            // intended; confirm against `Table::range` semantics.
            for kv in table.range(..) {
                let kv = kv?;

                if kv.key.user_key != key {
                    break;
                }

                log::info!("[Table #{}] {kv:?}", table.id());
            }
        }

        Ok(())
    }
161
162 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
163 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
164 let super_version = self
165 .version_history
166 .read()
167 .expect("lock is poisoned")
168 .get_version_for_snapshot(seqno);
169
170 Self::get_internal_entry_from_version(
171 &super_version,
172 key,
173 seqno,
174 self.config.comparator.as_ref(),
175 )
176 }
177
178 fn current_version(&self) -> Version {
179 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
180 self.version_history
181 .read()
182 .expect("poisoned")
183 .latest_version()
184 .version
185 }
186
    /// Acquires the flush mutex, serializing memtable flushes.
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.flush_lock.lock().expect("lock is poisoned")
    }
191
    /// Returns the shared metrics collector (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }
196
    /// Number of obsolete versions waiting to be garbage-collected.
    fn version_free_list_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }
204
205 fn prefix<K: AsRef<[u8]>>(
206 &self,
207 prefix: K,
208 seqno: SeqNo,
209 index: Option<(Arc<Memtable>, SeqNo)>,
210 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
211 Box::new(
212 self.create_prefix(&prefix, seqno, index)
213 .map(|kv| IterGuardImpl::Standard(Guard(kv))),
214 )
215 }
216
217 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
218 &self,
219 range: R,
220 seqno: SeqNo,
221 index: Option<(Arc<Memtable>, SeqNo)>,
222 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
223 Box::new(
224 self.create_range(&range, seqno, index)
225 .map(|kv| IterGuardImpl::Standard(Guard(kv))),
226 )
227 }
228
229 fn tombstone_count(&self) -> u64 {
231 self.current_version()
232 .iter_tables()
233 .map(Table::tombstone_count)
234 .sum()
235 }
236
237 fn weak_tombstone_count(&self) -> u64 {
239 self.current_version()
240 .iter_tables()
241 .map(Table::weak_tombstone_count)
242 .sum()
243 }
244
245 fn weak_tombstone_reclaimable_count(&self) -> u64 {
247 self.current_version()
248 .iter_tables()
249 .map(Table::weak_tombstone_reclaimable)
250 .sum()
251 }
252
    /// Drops all items in the given key range by running a dedicated
    /// compaction; takes the major compaction lock exclusively.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // An empty range cannot contain anything -> no-op
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // Exclude any concurrent compaction while dropping
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)?;
        Ok(())
    }
274
    /// Removes all data: installs a fresh empty memtable, drops all sealed
    /// memtables, and replaces the table version with an empty successor.
    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(
                    self.memtable_id_counter.next(),
                    self.config.comparator.clone(),
                ));
                copy.sealed_memtables = Arc::default();
                // New, empty version with the next version ID
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
            &*self.config.fs,
        )
    }
295
    /// Runs a major compaction with the given target table size; takes the
    /// major compaction lock exclusively so no other compaction can run.
    #[doc(hidden)]
    fn major_compact(
        &self,
        target_size: u64,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }
315
316 fn l0_run_count(&self) -> usize {
317 self.current_version()
318 .level(0)
319 .map(|x| x.run_count())
320 .unwrap_or_default()
321 }
322
    /// Returns the value size in bytes for `key`, if present at `seqno`.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }
327
328 fn filter_size(&self) -> u64 {
329 self.current_version()
330 .iter_tables()
331 .map(Table::filter_size)
332 .map(u64::from)
333 .sum()
334 }
335
336 fn pinned_filter_size(&self) -> usize {
337 self.current_version()
338 .iter_tables()
339 .map(Table::pinned_filter_size)
340 .sum()
341 }
342
343 fn pinned_block_index_size(&self) -> usize {
344 self.current_version()
345 .iter_tables()
346 .map(Table::pinned_block_index_size)
347 .sum()
348 }
349
    /// Number of sealed (frozen, not yet flushed) memtables.
    fn sealed_memtable_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }
359
    /// Writes a stream of internal values (plus range tombstones) into new
    /// tables on disk, then recovers them into `Table` handles.
    ///
    /// Returns `Ok(Some((tables, None)))` — a standard tree never produces
    /// blob files.
    ///
    /// # Errors
    ///
    /// Returns an error if writing or recovering a table fails, or if the
    /// input stream yields an error.
    fn flush_to_tables_with_rt(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // Flushed tables land in level 0, so all policies are sampled at 0
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
            self.config.fs.clone(),
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                // 0 bits per key effectively disables the bloom filter
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        table_writer = table_writer.use_prefix_extractor(self.config.prefix_extractor.clone());
        table_writer = table_writer.use_encryption(self.config.encryption.clone());

        #[cfg(feature = "zstd")]
        {
            table_writer = table_writer.use_zstd_dictionary(self.config.zstd_dictionary.clone());
        }

        table_writer.set_range_tombstones(range_tombstones);

        // Drain the (already merge-sorted) stream into the writer
        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Re-open every written table so it can be registered in the version
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    self.config.encryption.clone(),
                    #[cfg(feature = "zstd")]
                    self.config.zstd_dictionary.clone(),
                    self.config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }
472
    /// Atomically installs freshly flushed tables as a new L0 run and
    /// releases the sealed memtables they were built from.
    ///
    /// Locks are taken in a fixed order (compaction state, then version
    /// history) — keep this order to avoid deadlocks with compactions.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                let ctx = crate::version::TransformContext::new(self.config.comparator.as_ref());
                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                    &ctx,
                );

                // The flushed memtables are now durable, so they can be dropped
                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
            &*self.config.fs,
        )?;

        // Best-effort cleanup of versions no snapshot can see anymore
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }
524
525 fn clear_active_memtable(&self) {
526 use crate::tree::sealed::SealedMemtables;
527
528 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
529 let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
530 let super_version = version_history_lock.latest_version();
531
532 if super_version.active_memtable.is_empty() {
533 return;
534 }
535
536 let mut copy = version_history_lock.latest_version();
537 copy.active_memtable = Arc::new(Memtable::new(
538 self.memtable_id_counter.next(),
539 self.config.comparator.clone(),
540 ));
541 copy.sealed_memtables = Arc::new(SealedMemtables::default());
542
543 copy.seqno = super_version.seqno;
545
546 version_history_lock.replace_latest_version(copy);
547
548 log::trace!("cleared active memtable");
549 }
550
    /// Runs a compaction using `strategy`.
    ///
    /// Takes the major compaction lock in *shared* (read) mode, so regular
    /// compactions may run concurrently, while `major_compact`/`drop_range`
    /// exclude everything via the write lock.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }
568
    /// Gets the next table ID from the inner tree.
    // NOTE(review): unlike `next_table_id` (which reads the counter via
    // `.get()`), this delegates to `TreeInner` — presumably advancing the
    // counter; confirm before relying on either.
    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }
572
    /// Returns a reference to the tree's configuration.
    fn tree_config(&self) -> &Config {
        &self.config
    }
576
    /// Returns a handle to the currently active memtable.
    fn active_memtable(&self) -> Arc<Memtable> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }
585
    /// This is a standard (non-blob) tree.
    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }
589
    /// Seals the active memtable: replaces it with a fresh empty one and
    /// appends the old one to the sealed list.
    ///
    /// Returns the sealed memtable, or `None` if the active memtable was
    /// empty (nothing to rotate).
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(
            self.memtable_id_counter.next(),
            self.config.comparator.clone(),
        ));
        // The yanked memtable becomes the newest sealed memtable
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }
622
623 fn table_count(&self) -> usize {
624 self.current_version().table_count()
625 }
626
627 fn level_table_count(&self, idx: usize) -> Option<usize> {
628 self.current_version().level(idx).map(|x| x.table_count())
629 }
630
    /// Approximates the item count by summing memtable lengths and per-table
    /// item counts; shadowed versions and tombstones are not deduplicated,
    /// so this may overcount.
    fn approximate_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        #[expect(clippy::expect_used, reason = "result should fit into usize")]
        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("approximate_len too large for usize")
    }
657
658 fn disk_space(&self) -> u64 {
659 self.current_version()
660 .iter_levels()
661 .map(super::version::Level::size)
662 .sum()
663 }
664
665 fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
666 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
667 let version = self
668 .version_history
669 .read()
670 .expect("lock is poisoned")
671 .latest_version();
672
673 let active = version.active_memtable.get_highest_seqno();
674
675 let sealed = version
676 .sealed_memtables
677 .iter()
678 .map(|mt| mt.get_highest_seqno())
679 .max()
680 .flatten();
681
682 active.max(sealed)
683 }
684
685 fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
686 self.current_version()
687 .iter_tables()
688 .map(Table::get_highest_seqno)
689 .max()
690 }
691
692 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
693 let key = key.as_ref();
694
695 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
696 let super_version = self
697 .version_history
698 .read()
699 .expect("lock is poisoned")
700 .get_version_for_snapshot(seqno);
701
702 Self::resolve_or_passthrough(
703 &super_version,
704 key,
705 seqno,
706 self.config.merge_operator.as_ref(),
707 self.config.comparator.as_ref(),
708 )
709 }
710
711 fn multi_get<K: AsRef<[u8]>>(
712 &self,
713 keys: impl IntoIterator<Item = K>,
714 seqno: SeqNo,
715 ) -> crate::Result<Vec<Option<UserValue>>> {
716 let super_version = self.get_version_for_snapshot(seqno);
717
718 keys.into_iter()
719 .map(|key| {
720 Self::resolve_or_passthrough(
721 &super_version,
722 key.as_ref(),
723 seqno,
724 self.config.merge_operator.as_ref(),
725 self.config.comparator.as_ref(),
726 )
727 })
728 .collect()
729 }
730
    /// Inserts a key-value pair at `seqno`.
    ///
    /// Returns the `(u64, u64)` pair produced by `append_entry` —
    /// NOTE(review): exact tuple semantics are defined there; confirm.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }
740
    /// Appends a merge operand for `key` at `seqno`; operands are folded at
    /// read time by the configured merge operator.
    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        operand: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::new_merge_operand(key, operand, seqno);
        self.append_entry(value)
    }
750
    /// Inserts a tombstone for `key` at `seqno`.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }
755
    /// Inserts a weak tombstone for `key` at `seqno`.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
760
    /// Inserts a range tombstone for `start..end` at `seqno` into the active
    /// memtable; returns the value reported by `insert_range_tombstone`.
    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let memtable = Arc::clone(
            &self
                .version_history
                .read()
                .expect("lock is poisoned")
                .latest_version()
                .active_memtable,
        );

        memtable.insert_range_tombstone(start.into(), end.into(), seqno)
    }
774}
775
776impl Tree {
    /// Resolves a point read: plain values pass through, while a top-most
    /// merge operand triggers the merge pipeline (if an operator is set).
    fn resolve_or_passthrough(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
        merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> crate::Result<Option<UserValue>> {
        let entry = Self::get_internal_entry_from_version(super_version, key, seqno, comparator)?;

        match entry {
            Some(entry) if entry.key.value_type == ValueType::MergeOperand => {
                if let Some(merge_op) = merge_operator {
                    // Merge operands need the whole history, so run the
                    // (more expensive) iterator-based merge pipeline
                    Self::resolve_merge_via_pipeline(
                        super_version.clone(),
                        key,
                        seqno,
                        Arc::clone(merge_op),
                    )
                } else if Self::is_suppressed_by_range_tombstones(
                    super_version,
                    key,
                    entry.key.seqno,
                    seqno,
                    comparator,
                ) {
                    // Without an operator, the operand acts like a plain
                    // value but may still be shadowed by a range delete
                    Ok(None)
                } else {
                    Ok(Some(entry.value))
                }
            }
            Some(entry) => Ok(Some(entry.value)),
            None => Ok(None),
        }
    }
816
    /// Resolves a merge read by running a point-range iterator over the
    /// whole super version, letting the merge operator fold all operands.
    fn resolve_merge_via_pipeline(
        version: SuperVersion,
        key: &[u8],
        seqno: SeqNo,
        merge_operator: Arc<dyn crate::merge_operator::MergeOperator>,
    ) -> crate::Result<Option<UserValue>> {
        use crate::range::{IterState, TreeIter};

        // Hash for filter-based pruning of tables that cannot contain the key
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
        let bloom_key = crate::Slice::from(key);
        let comparator = version.active_memtable.comparator.clone();

        let iter_state = IterState {
            version,
            ephemeral: None,
            merge_operator: Some(merge_operator),
            comparator,
            prefix_hash: None,
            key_hash: Some(key_hash),
            bloom_key: Some(bloom_key),
            #[cfg(feature = "metrics")]
            metrics: None,
        };

        let mut iter = TreeIter::create_range_point(iter_state, key, seqno);

        // The point iterator yields at most one (already merged) entry
        match iter.next() {
            Some(Ok(entry)) => Ok(Some(entry.value)),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }
864
    /// Creates an internal range iterator without prefix-hash filtering.
    ///
    /// Thin wrapper around `create_internal_range_with_prefix_hash` with
    /// `prefix_hash` set to `None`.
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
        merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
        comparator: crate::comparator::SharedComparator,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        Self::create_internal_range_with_prefix_hash(
            version,
            range,
            seqno,
            ephemeral,
            merge_operator,
            comparator,
            None,
        )
    }
884
885 #[doc(hidden)]
888 pub(crate) fn create_internal_range_with_prefix_hash<
889 'a,
890 K: AsRef<[u8]> + 'a,
891 R: RangeBounds<K> + 'a,
892 >(
893 version: SuperVersion,
894 range: &'a R,
895 seqno: SeqNo,
896 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
897 merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
898 comparator: crate::comparator::SharedComparator,
899 prefix_hash: Option<u64>,
900 ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
901 use crate::range::{IterState, TreeIter};
902 use std::ops::Bound::{self, Excluded, Included, Unbounded};
903
904 let lo: Bound<UserKey> = match range.start_bound() {
905 Included(x) => Included(x.as_ref().into()),
906 Excluded(x) => Excluded(x.as_ref().into()),
907 Unbounded => Unbounded,
908 };
909
910 let hi: Bound<UserKey> = match range.end_bound() {
911 Included(x) => Included(x.as_ref().into()),
912 Excluded(x) => Excluded(x.as_ref().into()),
913 Unbounded => Unbounded,
914 };
915
916 let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
917
918 let iter_state = IterState {
919 version,
920 ephemeral,
921 merge_operator,
922 comparator,
923 prefix_hash,
924 key_hash: None,
925 bloom_key: None,
926 #[cfg(feature = "metrics")]
927 metrics: None,
928 };
929
930 TreeIter::create_range(iter_state, bounds, seqno)
931 }
932
    /// Looks up `key` at snapshot `seqno`, checking the active memtable,
    /// then sealed memtables, then the on-disk tables — newest data first.
    ///
    /// Point tombstones and entries shadowed by a newer range tombstone
    /// resolve to `Ok(None)`.
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> crate::Result<Option<InternalValue>> {
        // 1) Active memtable (newest data)
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            let Some(entry) = ignore_tombstone_value(entry) else {
                return Ok(None);
            };

            if Self::is_suppressed_by_range_tombstones(
                super_version,
                key,
                entry.key.seqno,
                seqno,
                comparator,
            ) {
                return Ok(None);
            }
            return Ok(Some(entry));
        }

        // 2) Sealed memtables (newest first)
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            let Some(entry) = ignore_tombstone_value(entry) else {
                return Ok(None);
            };

            if Self::is_suppressed_by_range_tombstones(
                super_version,
                key,
                entry.key.seqno,
                seqno,
                comparator,
            ) {
                return Ok(None);
            }
            return Ok(Some(entry));
        }

        // 3) On-disk tables
        let entry =
            Self::get_internal_entry_from_tables(&super_version.version, key, seqno, comparator)?;

        if let Some(entry) = entry {
            if Self::is_suppressed_by_range_tombstones(
                super_version,
                key,
                entry.key.seqno,
                seqno,
                comparator,
            ) {
                return Ok(None);
            }
            return Ok(Some(entry));
        }

        Ok(None)
    }
999
    /// Returns `true` if some range tombstone newer than `key_seqno` (and
    /// visible at `read_seqno`) covers `key`, anywhere in the super version.
    fn is_suppressed_by_range_tombstones(
        super_version: &SuperVersion,
        key: &[u8],
        key_seqno: SeqNo,
        read_seqno: SeqNo,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> bool {
        // Active memtable first
        if super_version
            .active_memtable
            .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
        {
            return true;
        }

        // Then sealed memtables, newest first
        for mt in super_version.sealed_memtables.iter().rev() {
            if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
                return true;
            }
        }

        // Finally, tables that carry range tombstones and whose key range
        // contains `key` (min <= key <= max under the user comparator)
        for table in super_version
            .version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .flat_map(|run| run.iter())
            .filter(|t| !t.range_tombstones().is_empty())
            .filter(|t| {
                let kr = &t.metadata.key_range;
                comparator.compare(kr.min(), key) != std::cmp::Ordering::Greater
                    && comparator.compare(key, kr.max()) != std::cmp::Ordering::Greater
            })
        {
            let rts = table.range_tombstones();

            // Only tombstones starting at or before `key` can cover it.
            // NOTE(review): `partition_point` assumes `rts` is sorted by
            // start key — confirm the table writer guarantees this ordering.
            let candidate_end = rts.partition_point(|rt| {
                comparator.compare(&rt.start, key) != std::cmp::Ordering::Greater
            });

            for rt in rts.iter().take(candidate_end) {
                // Covering tombstone: start <= key < end, visible at the
                // read snapshot, and strictly newer than the checked entry
                if rt.visible_at(read_seqno)
                    && comparator.compare(&rt.start, key) != std::cmp::Ordering::Greater
                    && comparator.compare(key, &rt.end) == std::cmp::Ordering::Less
                    && key_seqno < rt.seqno
                {
                    return true;
                }
            }
        }

        false
    }
1066
    /// Looks up `key` in the on-disk tables.
    ///
    /// L0 runs may overlap, so every run is consulted and the entry with the
    /// highest seqno wins; deeper levels are disjoint, so the first hit wins.
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> crate::Result<Option<InternalValue>> {
        // Hash for bloom-filter pruning
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for (level_idx, level) in version.iter_levels().enumerate() {
            if level_idx == 0 {
                let mut best: Option<InternalValue> = None;

                for run in level.iter() {
                    if let Some(table) = run.get_for_key_cmp(key, comparator) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            match &best {
                                // Keep the currently-best (newer) entry
                                Some(current) if current.key.seqno >= item.key.seqno => {}
                                _ => {
                                    // NOTE(review): early exit when the entry's
                                    // seqno equals the read snapshot — presumably
                                    // no visible entry can be newer than this;
                                    // confirm against `Table::get` visibility
                                    // semantics.
                                    if item.key.seqno == seqno {
                                        return Ok(ignore_tombstone_value(item));
                                    }
                                    best = Some(item);
                                }
                            }
                        }
                    }
                }

                if let Some(entry) = best {
                    return Ok(ignore_tombstone_value(entry));
                }
            } else {
                // Levels >= 1 contain disjoint runs: first match is final
                for run in level.iter() {
                    if let Some(table) = run.get_for_key_cmp(key, comparator) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }
1127
1128 fn get_internal_entry_from_sealed_memtables(
1129 super_version: &SuperVersion,
1130 key: &[u8],
1131 seqno: SeqNo,
1132 ) -> Option<InternalValue> {
1133 for mt in super_version.sealed_memtables.iter().rev() {
1134 if let Some(entry) = mt.get(key, seqno) {
1135 return Some(entry);
1136 }
1137 }
1138
1139 None
1140 }
1141
    /// Returns the super version to read from for a snapshot at `seqno`.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }
1149
1150 fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
1161 range: &R,
1162 ) -> (OwnedBounds, bool) {
1163 use Bound::{Excluded, Included, Unbounded};
1164
1165 let start = match range.start_bound() {
1166 Included(key) => Included(Slice::from(key.as_ref())),
1167 Excluded(key) => Excluded(Slice::from(key.as_ref())),
1168 Unbounded => Unbounded,
1169 };
1170
1171 let end = match range.end_bound() {
1172 Included(key) => Included(Slice::from(key.as_ref())),
1173 Excluded(key) => Excluded(Slice::from(key.as_ref())),
1174 Unbounded => Unbounded,
1175 };
1176
1177 let is_empty =
1178 if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
1179 lo.as_ref() > hi.as_ref()
1180 } else {
1181 false
1182 };
1183
1184 (OwnedBounds { start, end }, is_empty)
1185 }
1186
    /// Opens an LSM-tree at the configured path: recovers an existing tree
    /// if a version marker file is present, otherwise creates a new one.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidVersion` when the path contains a (no longer
    /// supported) V1 database, or an I/O error from probing/recovery.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        // A legacy "version" file marks an unsupported V1 database
        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }
1216
1217 #[doc(hidden)]
1219 #[must_use]
1220 pub fn is_compacting(&self) -> bool {
1221 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1222 !self
1223 .compaction_state
1224 .lock()
1225 .expect("lock is poisoned")
1226 .hidden_set()
1227 .is_empty()
1228 }
1229
    /// Runs the compaction worker with the given strategy.
    ///
    /// `mvcc_gc_watermark` is the sequence number passed to the worker as
    /// the MVCC garbage-collection watermark.
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        let result = do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(result)
    }
1246
    /// Creates an iterator over the whole tree at the given snapshot.
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
1256
1257 #[doc(hidden)]
1258 pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
1259 &self,
1260 range: &'a R,
1261 seqno: SeqNo,
1262 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
1263 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
1264 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1265 let super_version = self
1266 .version_history
1267 .read()
1268 .expect("lock is poisoned")
1269 .get_version_for_snapshot(seqno);
1270
1271 Self::create_internal_range(
1272 super_version,
1273 range,
1274 seqno,
1275 ephemeral,
1276 self.config.merge_operator.clone(),
1277 self.config.comparator.clone(),
1278 )
1279 .map(|item| match item {
1280 Ok(kv) => Ok((kv.key.user_key, kv.value)),
1281 Err(e) => Err(e),
1282 })
1283 }
1284
1285 #[doc(hidden)]
1286 pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
1287 &self,
1288 prefix: K,
1289 seqno: SeqNo,
1290 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
1291 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
1292 use crate::prefix::compute_prefix_hash;
1293 use crate::range::{prefix_to_range, IterState, TreeIter};
1294
1295 let prefix_bytes = prefix.as_ref();
1296
1297 let prefix_hash = compute_prefix_hash(self.config.prefix_extractor.as_ref(), prefix_bytes);
1298
1299 let range = prefix_to_range(prefix_bytes);
1300
1301 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1302 let super_version = self
1303 .version_history
1304 .read()
1305 .expect("lock is poisoned")
1306 .get_version_for_snapshot(seqno);
1307
1308 let iter_state = IterState {
1309 version: super_version,
1310 ephemeral,
1311 merge_operator: self.config.merge_operator.clone(),
1312 comparator: self.config.comparator.clone(),
1313 prefix_hash,
1314 key_hash: None,
1315 bloom_key: None,
1316 #[cfg(feature = "metrics")]
1317 metrics: Some(self.0.metrics.clone()),
1318 };
1319
1320 TreeIter::create_range(iter_state, range, seqno).map(|item| match item {
1321 Ok(kv) => Ok((kv.key.user_key, kv.value)),
1322 Err(e) => Err(e),
1323 })
1324 }
1325
1326 #[doc(hidden)]
1330 #[must_use]
1331 pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
1332 #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1333 self.version_history
1334 .read()
1335 .expect("lock is poisoned")
1336 .latest_version()
1337 .active_memtable
1338 .insert(value)
1339 }
1340
    /// Recovers an existing LSM-tree from disk.
    ///
    /// Reads and validates the current version manifest (format version,
    /// comparator name), recovers all levels/tables, checks the tree type,
    /// then rebuilds the in-memory [`TreeInner`] state.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidVersion` for unsupported on-disk formats,
    /// `Error::ComparatorMismatch` if the tree was created with a different
    /// comparator, and `Error::Unrecoverable` on a tree type mismatch.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        // Validate the manifest before doing any heavy recovery work.
        {
            let version_id = crate::version::recovery::get_current_version(&config.path)?;
            let manifest_path = config.path.join(format!("v{version_id}"));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            // Only V3 and V4 on-disk formats can be opened here.
            if !matches!(manifest.version, FormatVersion::V3 | FormatVersion::V4) {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            // Opening with a different comparator would break key ordering,
            // so refuse outright instead of corrupting the tree.
            let supplied_name = config.comparator.name();
            if manifest.comparator_name != supplied_name {
                log::warn!(
                    "Comparator mismatch: tree was created with {:?} but opened with {:?}",
                    manifest.comparator_name,
                    supplied_name,
                );
                return Err(crate::Error::ComparatorMismatch {
                    stored: manifest.comparator_name,
                    supplied: supplied_name,
                });
            }

            // The stored level count wins over whatever the caller configured.
            config.level_count = manifest.level_count;
        }

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        // Rebuild the version (levels, tables, blob files) from disk.
        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        // A KV-separated (blob) tree cannot be reopened as a standard tree,
        // and vice versa.
        {
            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }
        }

        // Seed the table ID counter past the highest recovered table ID so
        // newly written tables never collide with existing files.
        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let comparator = config.comparator.clone();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::new(1),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version, comparator))),
            stop_signal: StopSignal::default(),
            config: Arc::new(config),
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }
1440
1441 fn create_new(config: Config) -> crate::Result<Self> {
1443 use crate::file::{fsync_directory, TABLES_FOLDER};
1444 use crate::fs::Fs;
1445
1446 let path = config.path.clone();
1447 log::trace!("Creating LSM-tree at {}", path.display());
1448
1449 (*config.fs).create_dir_all(&path)?;
1450
1451 let table_folder_path = path.join(TABLES_FOLDER);
1452 (*config.fs).create_dir_all(&table_folder_path)?;
1453
1454 fsync_directory(&table_folder_path, &*config.fs)?;
1456 fsync_directory(&path, &*config.fs)?;
1457
1458 let inner = TreeInner::create_new(config)?;
1459 Ok(Self(Arc::new(inner)))
1460 }
1461
    /// Recovers all tables and blob files referenced by the current version
    /// manifest, returning the reconstructed [`Version`].
    ///
    /// Files on disk that the manifest does not reference are treated as
    /// orphans and deleted; stale version manifests are cleaned up as well.
    ///
    /// # Errors
    ///
    /// Returns `Error::Unrecoverable` when a table file has an unparsable
    /// name or when fewer tables than the manifest lists could be recovered.
    #[expect(
        clippy::too_many_lines,
        reason = "recovery logic is inherently complex"
    )]
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, fs::Fs, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Manifest-derived map: table ID -> (level index, checksum, global
        // seqno). Used below to match on-disk table files against the version.
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 , Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Log progress more sparsely the more tables there are to recover.
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // Ensure the tables folder exists (and is durable) before scanning it.
        if !config.fs.exists(&table_base_folder)? {
            (*config.fs).create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder, &*config.fs)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in config
            .fs
            .read_dir(&table_base_folder)?
            .into_iter()
            .enumerate()
        {
            let crate::fs::FsDirEntry {
                path: table_file_path,
                file_name,
                is_dir,
            } = dirent;

            // Skip macOS Finder metadata.
            if file_name == ".DS_Store" {
                continue;
            }

            // Skip macOS AppleDouble companion files.
            if file_name.starts_with("._") {
                continue;
            }

            let table_file_name = &file_name;
            assert!(!is_dir);

            // Table files are named after their numeric table ID.
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                // Block pinning policies are looked up per level.
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    config.encryption.clone(),
                    #[cfg(feature = "zstd")]
                    config.zstd_dictionary.clone(),
                    config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);

                // NOTE(review): `idx` counts directory entries (including
                // skipped ones), not recovered tables — progress output is
                // approximate.
                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // On disk but not referenced by the manifest → orphan.
                orphaned_tables.push(table_file_path);
            }
        }

        // Every table the manifest references must have been found on disk.
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
            tree_id,
            config.descriptor_table.as_ref(),
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // Remove stale version manifests now that the version was rebuilt.
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        // Orphans are only deleted after recovery succeeded above.
        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1624
1625 fn cleanup_orphaned_version(
1626 path: &Path,
1627 latest_version_id: crate::version::VersionId,
1628 ) -> crate::Result<()> {
1629 let version_str = format!("v{latest_version_id}");
1630
1631 for file in std::fs::read_dir(path)? {
1632 let dirent = file?;
1633
1634 if dirent.file_type()?.is_dir() {
1635 continue;
1636 }
1637
1638 let name = dirent.file_name();
1639
1640 if name.to_string_lossy().starts_with('v') && *name != *version_str {
1641 log::trace!("Cleanup orphaned version {}", name.display());
1642 std::fs::remove_file(dirent.path())?;
1643 }
1644 }
1645
1646 Ok(())
1647 }
1648}