1pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::path::Path;
10use crate::{
11 AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
12 ValueType,
13 compaction::{CompactionStrategy, drop_range::OwnedBounds, state::CompactionState},
14 config::Config,
15 format_version::FormatVersion,
16 fs::Fs,
17 iter_guard::{IterGuard, IterGuardImpl},
18 key::InternalKey,
19 manifest::Manifest,
20 memtable::Memtable,
21 range_tombstone::RangeTombstone,
22 scan_since::ScanSinceEvent,
23 slice::Slice,
24 table::Table,
25 value::InternalValue,
26 version::{SuperVersion, SuperVersions, Version, recovery::recover},
27 vlog::BlobFile,
28};
29use alloc::sync::Arc;
30#[cfg(not(feature = "std"))]
31use alloc::{boxed::Box, string::ToString, vec::Vec};
32use core::ops::{Bound, RangeBounds};
33use inner::{FlushGuard, TreeId, TreeInner, VersionsWriteGuard};
34#[cfg(feature = "std")]
37use parking_lot::{Mutex, RwLock};
38#[cfg(not(feature = "std"))]
39use spin::{Mutex, RwLock};
40
41#[cfg(feature = "metrics")]
42use crate::metrics::Metrics;
43
/// Lower bound for reserved headroom, in bytes: exactly 1 MiB.
// NOTE(review): the code that applies this floor is outside this chunk —
// confirm how it combines with any configured headroom.
pub const MIN_RESERVED_HEADROOM: u64 = 1 << 20;
49
/// A time-to-live of one second.
// NOTE(review): going by the name, this bounds how long a cached disk-free
// reading is reused by write admission — confirm against the consumer.
const ADMISSION_DISK_FREE_TTL: core::time::Duration = core::time::Duration::from_millis(1_000);
55
/// Iterator guard wrapping one fallible `(UserKey, UserValue)` item.
///
/// The wrapped result is only unpacked by the `IterGuard` methods, so an
/// error stored here surfaces when the caller asks for the key or value.
56pub struct Guard(crate::Result<(UserKey, UserValue)>);
58
59impl IterGuard for Guard {
60 fn into_inner_if(
61 self,
62 pred: impl Fn(&UserKey) -> bool,
63 ) -> crate::Result<(UserKey, Option<UserValue>)> {
64 let (k, v) = self.0?;
65
66 if pred(&k) {
67 Ok((k, Some(v)))
68 } else {
69 Ok((k, None))
70 }
71 }
72
73 fn key(self) -> crate::Result<UserKey> {
74 self.0.map(|(k, _)| k)
75 }
76
77 fn size(self) -> crate::Result<u32> {
78 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
79 self.into_inner().map(|(_, v)| v.len() as u32)
80 }
81
82 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
83 self.0
84 }
85}
86
/// Abstraction over the result shapes a table point lookup can produce.
///
/// The implementations in this file each forward `lookup` to a different
/// `Table` read method (`get`, `get_with_block`, `get_value`).
87trait TablePointLookup: Sized {
    /// Looks up `key` in `table` as of `seqno`; `key_hash` is passed through
    /// to the table read. Returns `Ok(None)` when the table reports no entry.
93 fn lookup(
94 table: &Table,
95 key: &[u8],
96 seqno: SeqNo,
97 key_hash: u64,
98 ) -> crate::Result<Option<Self>>;
    /// Sequence number of the entry that was found.
99 fn entry_seqno(&self) -> SeqNo;
    /// Maps a tombstone result to `None`; any other result is kept.
100 fn filter_tombstone(self) -> Option<Self>;
101}
102
/// Point-lookup result that carries the whole internal entry.
103type TableEntry = InternalValue;
105
// NOTE(review): the tuple fields are not named here — presumably
// (key index, key hash, resolved entry if any); confirm against the users.
106type CoveredKey = (usize, u64, Option<InternalValue>);
109
// NOTE(review): presumably (unique (key index, key hash) misses, index pairs
// mapping duplicates back to them) — confirm against the users.
110type DedupedMissKeys = (Vec<(usize, u64)>, Vec<(usize, usize)>);
114
/// Result of resolving a set of keys, split into two groups.
// NOTE(review): the producer is outside this chunk — presumably "covered"
// means a table's key range contained the key; confirm.
115struct RunResolve {
    // Keys that were covered, each as a `CoveredKey` tuple.
117 covered: Vec<CoveredKey>,
    // Keys that were not covered, as `(usize, u64)` pairs.
119 not_covered: Vec<(usize, u64)>,
121}
122
/// One unit of block-level work: a block handle in a table's file together
/// with the indices of the keys that need it.
// NOTE(review): the consumer is outside this chunk; the meaning of `special`
// and the index space of `keys` are not visible here — confirm.
123struct BlockTask<'a> {
    // Table the block belongs to (borrowed for the task's lifetime).
128 table: &'a crate::Table,
    // Shared handle to the file the block is read from.
129 file: Arc<dyn crate::fs::FsFile>,
    // Location of the block.
130 handle: crate::table::BlockHandle,
    // Sequence number used for this table's lookups.
131 table_seqno: SeqNo,
132 special: bool,
133 keys: Vec<usize>,
134}
135
136impl TablePointLookup for TableEntry {
137 fn lookup(
138 table: &Table,
139 key: &[u8],
140 seqno: SeqNo,
141 key_hash: u64,
142 ) -> crate::Result<Option<Self>> {
143 table.get(key, seqno, key_hash)
144 }
145
146 fn entry_seqno(&self) -> SeqNo {
147 self.key.seqno
148 }
149
150 fn filter_tombstone(self) -> Option<Self> {
151 ignore_tombstone_value(self)
152 }
153}
154
/// Point-lookup result that pairs the internal entry with a table block, as
/// returned by `Table::get_with_block`.
155type TableEntryWithBlock = (InternalValue, crate::table::Block);
157
158impl TablePointLookup for TableEntryWithBlock {
159 fn lookup(
160 table: &Table,
161 key: &[u8],
162 seqno: SeqNo,
163 key_hash: u64,
164 ) -> crate::Result<Option<Self>> {
165 table.get_with_block(key, seqno, key_hash)
166 }
167
168 fn entry_seqno(&self) -> SeqNo {
169 self.0.key.seqno
170 }
171
172 fn filter_tombstone(self) -> Option<Self> {
173 ignore_tombstone_value(self.0).map(|iv| (iv, self.1))
174 }
175}
176
/// Point-lookup result reduced to `(value type, sequence number, value
/// bytes)`, as returned by `Table::get_value`.
177type TableValue = (ValueType, SeqNo, crate::Slice);
180
181impl TablePointLookup for TableValue {
182 fn lookup(
183 table: &Table,
184 key: &[u8],
185 seqno: SeqNo,
186 key_hash: u64,
187 ) -> crate::Result<Option<Self>> {
188 table.get_value(key, seqno, key_hash)
189 }
190
191 fn entry_seqno(&self) -> SeqNo {
192 self.1
193 }
194
195 fn filter_tombstone(self) -> Option<Self> {
196 if self.0.is_tombstone() {
197 None
198 } else {
199 Some(self)
200 }
201 }
202}
203
204fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
205 if item.is_tombstone() {
206 None
207 } else {
208 Some(item)
209 }
210}
211
/// Tree handle: a clonable wrapper around a shared [`TreeInner`].
///
/// Cloning copies the `Arc`, so all clones refer to the same inner state.
212#[derive(Clone)]
214pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
215
216impl core::ops::Deref for Tree {
217 type Target = TreeInner;
218
219 fn deref(&self) -> &Self::Target {
220 &self.0
221 }
222}
223
// Marker impl: `Tree` implements the `Sealed` trait of `abstract_tree`.
224impl crate::abstract_tree::sealed::Sealed for Tree {}
225
226fn standard_guard(item: crate::Result<InternalValue>) -> IterGuardImpl {
228 IterGuardImpl::Standard(Guard(item.map(|iv| (iv.key.user_key, iv.value))))
229}
230
231#[expect(
233 clippy::redundant_pub_crate,
234 reason = "reached from blob_tree as crate::tree::range_to_user_bounds"
235)]
236pub(crate) fn range_to_user_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
237 range: &R,
238) -> (Bound<UserKey>, Bound<UserKey>) {
239 use core::ops::Bound::{Excluded, Included, Unbounded};
240 let lo = match range.start_bound() {
241 Included(x) => Included(x.as_ref().into()),
242 Excluded(x) => Excluded(x.as_ref().into()),
243 Unbounded => Unbounded,
244 };
245 let hi = match range.end_bound() {
246 Included(x) => Included(x.as_ref().into()),
247 Excluded(x) => Excluded(x.as_ref().into()),
248 Unbounded => Unbounded,
249 };
250 (lo, hi)
251}
252
/// Adapter around a `SeekableTreeIter`; the impls below turn its items into
/// standard `IterGuardImpl` guards and forward the seek operations.
253struct StandardSeekable {
    // The wrapped seekable iterator.
256 inner: crate::range::SeekableTreeIter,
257}
258
259impl Iterator for StandardSeekable {
260 type Item = IterGuardImpl;
261
262 fn next(&mut self) -> Option<Self::Item> {
263 self.inner.next().map(standard_guard)
264 }
265}
266
267impl DoubleEndedIterator for StandardSeekable {
268 fn next_back(&mut self) -> Option<Self::Item> {
269 self.inner.next_back().map(standard_guard)
270 }
271}
272
273impl crate::iter_guard::SeekableGuardIter for StandardSeekable {
274 fn seek_to(&mut self, key: &[u8]) {
275 self.inner.seek_to(key);
276 }
277
278 fn seek_to_for_prev(&mut self, key: &[u8]) {
279 self.inner.seek_to_for_prev(key);
280 }
281
282 fn peek_key(&mut self) -> Option<crate::Result<crate::UserKey>> {
283 self.inner.peek_key()
284 }
285}
286
287impl AbstractTree for Tree {
288 fn table_file_cache_size(&self) -> usize {
289 self.config
290 .descriptor_table
291 .as_ref()
292 .map_or(0, |dt| dt.len())
293 }
294
295 fn get_version_history_lock(&self) -> VersionsWriteGuard<'_> {
296 self.version_history.write()
297 }
298
299 fn next_table_id(&self) -> TableId {
300 self.0.table_id_counter.get()
301 }
302
303 fn id(&self) -> TreeId {
304 self.id
305 }
306
/// Always returns 0 for this tree type.
307 fn blob_file_count(&self) -> usize {
308 0
309 }
310
/// Creates a checkpoint of this tree under `target_path`.
///
/// Source and target use the tree's configured filesystem; blobs are not
/// included. The work itself is done by `checkpoint::run_checkpoint`.
#[cfg(feature = "std")]
fn create_checkpoint(
    &self,
    target_path: &crate::path::Path,
) -> crate::Result<crate::CheckpointInfo> {
    let params = crate::checkpoint::CheckpointParams {
        target_root: target_path,
        target_fs: &self.config.fs,
        src_root: &self.config.path,
        src_fs: &self.config.fs,
        deletion_pause: &self.deletion_pause,
        visible_seqno: &self.config.visible_seqno,
        include_blobs: false,
        runtime_config: self.0.runtime_config.load_full(),
        encryption: self.0.config.encryption.clone(),
    };

    crate::checkpoint::run_checkpoint(self, &params)
}
331
332 fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
333 let super_version = self.version_history.read().latest_version();
334
335 let key = Slice::from(key);
336
337 for kv in super_version.active_memtable.range_internal((
338 Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
339 Bound::Unbounded,
340 )) {
341 log::info!("[Active] {kv:?}");
342 }
343
344 for mt in super_version.sealed_memtables.iter().rev() {
345 for kv in mt.range_internal((
346 Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
347 Bound::Unbounded,
348 )) {
349 log::info!("[Sealed #{}] {kv:?}", mt.id());
350 }
351 }
352
353 for table in super_version
354 .version
355 .iter_levels()
356 .flat_map(|lvl| lvl.iter())
357 .filter_map(|run| run.get_for_key_cmp(&key, self.config.comparator.as_ref()))
358 {
359 for kv in table.range(..) {
360 let kv = kv?;
361
362 if kv.key.user_key != key {
363 break;
364 }
365
366 log::info!("[Table #{}] {kv:?}", table.id());
367 }
368 }
369
370 Ok(())
371 }
372
373 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
374 #[cfg(feature = "std")]
386 {
387 let latest = self.latest_super_version.load();
388 if seqno > latest.seqno {
389 return Self::get_internal_entry_from_version(
390 &latest,
391 key,
392 seqno,
393 self.config.comparator.as_ref(),
394 );
395 }
396 }
397
398 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
401
402 Self::get_internal_entry_from_version(
403 &super_version,
404 key,
405 seqno,
406 self.config.comparator.as_ref(),
407 )
408 }
409
410 fn current_version(&self) -> Version {
411 self.version_history.read().latest_version().version
412 }
413
414 fn storage_stats(&self) -> crate::Result<crate::StorageStats> {
415 let version = self.current_version();
419 let mut stats =
421 crate::storage_stats::compute_storage_stats(&version, self.is_compacting(), true)?;
422 let (capacity, available, compaction_possible) = self.admission_capacity(stats.used_bytes);
425 stats.capacity_bytes = capacity;
426 stats.available_bytes = available;
427 stats.compaction_possible = compaction_possible;
428 if self.storage_admission_enabled()
436 && capacity.is_some()
437 && stats.status == crate::StorageStatus::Healthy
438 {
439 let sst_need = crate::storage_stats::full_compaction_demand_bytes(&version);
447 let sst_dest_level = self.0.config.level_count.saturating_sub(1);
450 let quota_headroom = self.quota_headroom(stats.used_bytes);
451 let full_fits = crate::compaction::worker::space_fits_two_layer(
452 &self.0.config,
453 quota_headroom,
454 sst_need,
455 sst_dest_level,
456 0,
457 );
458 stats.status = if full_fits {
459 crate::StorageStatus::FullCompactionAvailable
460 } else {
461 crate::StorageStatus::TightCompactionAvailable
462 };
463 }
464 if self.is_read_only() {
468 stats.status = crate::StorageStatus::ReadOnlyOutOfSpace;
469 }
470 Ok(stats)
471 }
472
/// Returns the result of `compute_write_admission`, unchanged.
473 fn write_admission(&self) -> crate::Result<()> {
474 self.compute_write_admission()
475 }
476
477 fn write_backpressure(
478 &self,
479 strategy: &dyn crate::compaction::CompactionStrategy,
480 ) -> crate::Backpressure {
481 let thresholds = self.0.runtime_config.load().backpressure;
485 if thresholds.is_off() {
486 return crate::Backpressure::None;
487 }
488 let version = self.current_version();
489 let l0_count = version
493 .iter_levels()
494 .next()
495 .map_or(0, |level| level.table_count());
496 let pending = strategy.pending_compaction_bytes(&version);
497 crate::Backpressure::compute(l0_count, pending, &thresholds)
498 }
499
500 fn get_flush_lock(&self) -> FlushGuard<'_> {
501 self.flush_lock.lock()
502 }
503
/// Returns the tree's shared metrics handle.
#[cfg(feature = "metrics")]
fn metrics(&self) -> &Arc<crate::Metrics> {
    let inner: &TreeInner = &self.0;
    &inner.metrics
}
508
/// Builds cache statistics from the metrics, using the configured cache's
/// current size and capacity.
#[cfg(feature = "metrics")]
fn cache_stats(&self) -> crate::CacheStats {
    let metrics = self.metrics();
    let cache = &self.0.config.cache;
    metrics.cache_stats(cache.size(), cache.capacity())
}
514
515 fn version_free_list_len(&self) -> usize {
516 self.version_history.read().free_list_len()
517 }
518
519 fn prefix<K: AsRef<[u8]>>(
520 &self,
521 prefix: K,
522 seqno: SeqNo,
523 index: Option<(Arc<Memtable>, SeqNo)>,
524 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
525 Box::new(
526 self.create_prefix(&prefix, seqno, index)
527 .map(|kv| IterGuardImpl::Standard(Guard(kv))),
528 )
529 }
530
531 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
532 &self,
533 range: R,
534 seqno: SeqNo,
535 index: Option<(Arc<Memtable>, SeqNo)>,
536 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
537 Box::new(
538 self.create_range(&range, seqno, index)
539 .map(|kv| IterGuardImpl::Standard(Guard(kv))),
540 )
541 }
542
543 fn range_seekable<K: AsRef<[u8]>, R: RangeBounds<K>>(
544 &self,
545 range: R,
546 seqno: SeqNo,
547 index: Option<(Arc<Memtable>, SeqNo)>,
548 ) -> Box<dyn crate::iter_guard::SeekableGuardIter + 'static> {
549 let (lo, hi) = range_to_user_bounds(&range);
550 let inner = self.create_seekable_range_bounds(lo, hi, seqno, index);
551 Box::new(StandardSeekable { inner })
552 }
553
554 fn batch_range_scan<K: AsRef<[u8]>, R: RangeBounds<K> + 'static, I: IntoIterator<Item = R>>(
555 &self,
556 intervals: I,
557 seqno: SeqNo,
558 index: Option<(Arc<Memtable>, SeqNo)>,
559 ) -> Box<dyn Iterator<Item = IterGuardImpl> + Send + 'static>
560 where
561 I::IntoIter: Send + 'static,
562 {
563 let inner =
566 self.create_seekable_range_bounds(Bound::Unbounded, Bound::Unbounded, seqno, index);
567 let intervals = intervals.into_iter().map(|r| range_to_user_bounds(&r));
568 Box::new(crate::range::BatchRangeScan::new(inner, intervals).map(standard_guard))
569 }
570
571 fn tombstone_count(&self) -> u64 {
573 self.current_version()
574 .iter_tables()
575 .map(Table::tombstone_count)
576 .sum()
577 }
578
579 fn weak_tombstone_count(&self) -> u64 {
581 self.current_version()
582 .iter_tables()
583 .map(Table::weak_tombstone_count)
584 .sum()
585 }
586
587 fn weak_tombstone_reclaimable_count(&self) -> u64 {
589 self.current_version()
590 .iter_tables()
591 .map(Table::weak_tombstone_reclaimable)
592 .sum()
593 }
594
595 fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
596 let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);
597
598 if is_empty {
599 return Ok(());
600 }
601
602 let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));
603
604 let _lock = self.0.major_compaction_lock.write();
606
607 log::info!("Starting drop_range compaction");
608 self.inner_compact(strategy, 0)?;
609 Ok(())
610 }
611
612 fn clear(&self) -> crate::Result<()> {
613 let config = self.tree_config();
614 let mut versions = self.get_version_history_lock();
615
616 let prior = versions.latest_version();
619
620 versions.upgrade_version(
621 &config.path,
622 |v| {
623 let mut copy = v.clone();
624 copy.active_memtable = Arc::new(Memtable::new(
625 self.memtable_id_counter.next(),
626 self.config.comparator.clone(),
627 ));
628 copy.sealed_memtables = Arc::default();
629 copy.version = Version::new(v.version.id() + 1, self.tree_type());
630 Ok(copy)
631 },
632 &config.seqno,
633 &config.visible_seqno,
634 &*config.fs,
635 self.0.runtime_config.load_full(),
636 self.0.config.encryption.clone(),
637 )?;
638
639 versions.drain_obsolete_to_latest();
643 drop(versions); for table in prior.version.iter_tables() {
653 table.mark_as_deleted();
654 }
655 for blob_file in prior.version.blob_files.iter() {
656 blob_file.mark_as_deleted();
657 }
658
659 Ok(())
660 }
661
662 #[doc(hidden)]
663 fn major_compact(
664 &self,
665 target_size: u64,
666 seqno_threshold: SeqNo,
667 ) -> crate::Result<crate::compaction::CompactionResult> {
668 let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
669
670 let _lock = self.0.major_compaction_lock.write();
672
673 log::info!("Starting major compaction");
674 self.inner_compact(strategy, seqno_threshold)
675 }
676
677 fn l0_run_count(&self) -> usize {
678 self.current_version()
679 .level(0)
680 .map(|x| x.run_count())
681 .unwrap_or_default()
682 }
683
684 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
685 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
686 Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
687 }
688
689 fn filter_size(&self) -> u64 {
690 self.current_version()
691 .iter_tables()
692 .map(Table::filter_size)
693 .map(u64::from)
694 .sum()
695 }
696
697 fn pinned_filter_size(&self) -> usize {
698 self.current_version()
699 .iter_tables()
700 .map(Table::pinned_filter_size)
701 .sum()
702 }
703
704 fn pinned_block_index_size(&self) -> usize {
705 self.current_version()
706 .iter_tables()
707 .map(Table::pinned_block_index_size)
708 .sum()
709 }
710
711 fn sealed_memtable_count(&self) -> usize {
712 self.version_history
713 .read()
714 .latest_version()
715 .sealed_memtables
716 .len()
717 }
718
/// Writes a stream of internal entries plus range tombstones into new
/// level-0 tables and returns the recovered `Table` handles.
///
/// All per-level policies are read for level 0. The blob-file half of the
/// returned pair is always `None` here.
///
/// # Errors
///
/// Returns an error if the writer cannot be created, if the stream yields an
/// error, if writing or finishing fails, or if a written table cannot be
/// recovered.
719 fn flush_to_tables_with_rt(
720 &self,
721 stream: impl Iterator<Item = crate::Result<InternalValue>>,
722 range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
723 ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
724 use crate::table::multi_writer::MultiWriter;
725 use crate::time::Instant;
726
    // Only used for the elapsed-time debug log below.
727 let start = Instant::now();
728
    // Target folder and filesystem for level 0.
729 let (folder, level_fs) = self.config.tables_folder_for_level(0);
730
731 let data_block_size = self.config.data_block_size_policy.get(0);
732
733 let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
734 let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);
735
736 let data_block_compression = self.config.data_block_compression_policy.get(0);
737 let index_block_compression = self.config.index_block_compression_policy.get(0);
738
739 let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);
740
741 let index_partitioning = self.config.index_block_partitioning_policy.get(0);
742 let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);
743
    // One snapshot of the runtime config is used for every runtime-tunable
    // writer option below.
744 let rc = self.0.runtime_config.load_full();
751
752 log::debug!(
753 "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
754 folder.display(),
755 );
756
    // The third argument is 64 MiB.
    // NOTE(review): presumably the target size per output table — confirm
    // against `MultiWriter::new`.
757 let mut table_writer = MultiWriter::new(
758 folder.clone(),
759 self.table_id_counter.clone(),
760 64 * 1_024 * 1_024,
761 0,
762 level_fs.clone(),
763 )?
764 .set_comparator(self.config.comparator.clone())
765 .use_data_block_restart_interval(data_block_restart_interval)
766 .use_index_block_restart_interval(index_block_restart_interval)
767 .use_data_block_compression(data_block_compression)
768 .use_index_block_compression(index_block_compression)
769 .use_data_block_size(data_block_size)
770 .use_data_block_hash_ratio(data_block_hash_ratio)
771 .use_bloom_policy({
772 use crate::config::FilterPolicyEntry::{Bloom, None};
773 use crate::table::filter::BloomConstructionPolicy;
774
    // A `None` filter policy is expressed as a bloom policy with 0 bits per
    // key.
775 match self.config.filter_policy.get(0) {
776 Bloom(policy) => policy,
777 None => BloomConstructionPolicy::BitsPerKey(0.0),
778 }
779 });
780
781 if index_partitioning {
782 table_writer = table_writer.use_adaptive_index(rc.index_partition_spill_threshold);
788 }
789 if filter_partitioning {
790 table_writer = table_writer.use_partitioned_filter();
791 }
792
793 table_writer = table_writer.use_prefix_extractor(self.config.prefix_extractor.clone());
794 table_writer = table_writer.use_encryption(self.config.encryption.clone());
795 table_writer = table_writer.use_page_ecc(self.config.page_ecc, rc.ecc_scheme);
799 table_writer = table_writer.use_sync_mode(self.config.sync_mode);
800
801 table_writer = table_writer.use_seqno_in_index(rc.seqno_in_index);
802 table_writer = table_writer.use_zone_map(rc.zone_map);
803 table_writer = table_writer.use_columnar(rc.columnar);
804 table_writer = table_writer.use_disable_cow_on_sst(rc.disable_cow_on_sst_files);
805 table_writer = table_writer.use_kv_checksums(rc.kv_checksums, rc.kv_checksum_algo);
810 table_writer = table_writer.use_locator(self.config.locator_policy.get(0));
812
813 #[cfg(zstd_any)]
814 {
815 table_writer = table_writer.use_zstd_dictionary(self.config.zstd_dictionary.clone());
816 }
817
    // Range tombstones are handed to the writer before any entry is written.
818 table_writer.set_range_tombstones(range_tombstones);
822
    // The first error from the stream or the writer aborts the flush.
823 for item in stream {
824 table_writer.write(item?)?;
825 }
826
    // `finish` yields one `(table_id, checksum)` pair per written table.
827 let result = table_writer.finish()?;
828
829 log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
830
831 let pin_filter = self.config.filter_block_pinning_policy.get(0);
832 let pin_index = self.config.index_block_pinning_policy.get(0);
833
    // Re-open every written table from `folder/<table_id>`; the first
    // recovery error is returned.
834 let tables = result
836 .into_iter()
837 .map(|(table_id, checksum)| -> crate::Result<Table> {
838 Table::recover(
839 folder.join(table_id.to_string()),
840 checksum,
841 0,
842 self.id,
843 table_id,
844 self.config.cache.clone(),
845 self.config.descriptor_table.clone(),
846 level_fs.clone(),
847 pin_filter,
848 pin_index,
849 self.config.encryption.clone(),
850 #[cfg(zstd_any)]
851 self.config.zstd_dictionary.clone(),
852 self.config.comparator.clone(),
853 #[cfg(feature = "metrics")]
854 self.metrics.clone(),
855 )
856 })
857 .collect::<crate::Result<Vec<_>>>()?;
858
859 Ok(Some((tables, None)))
863 }
864
/// Installs freshly written tables (and optional blob files) as a new L0 run
/// and releases the sealed memtables they replace.
///
/// Steps, in order:
/// 1. every table and blob file gets the tree's deletion pause (plus the
///    background deleter with `std`, and heal hints for tables);
/// 2. the compaction-state lock and then the version-history write lock are
///    taken and held until the function returns;
/// 3. a new version is installed that adds the L0 run and removes the listed
///    sealed memtables;
/// 4. version maintenance runs with `gc_watermark`; a failure there is only
///    logged.
///
/// # Errors
///
/// Returns an error if installing the new version fails.
865 #[expect(clippy::significant_drop_tightening)]
866 fn register_tables(
867 &self,
868 tables: &[Table],
869 blob_files: Option<&[BlobFile]>,
870 frag_map: Option<crate::blob_tree::FragmentationMap>,
871 sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
872 gc_watermark: SeqNo,
873 ) -> crate::Result<()> {
874 log::trace!(
875 "Registering {} tables, {} blob files",
876 tables.len(),
877 blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
878 );
879
    // Hooks are installed before the tables become visible in a version.
880 for table in tables {
884 table.install_deletion_pause(Arc::clone(&self.deletion_pause));
885 #[cfg(feature = "std")]
886 table.install_background_deleter(Arc::clone(&self.background_deleter));
887 table.install_heal_hints(Arc::clone(&self.heal_hints));
888 }
889 if let Some(bfs) = blob_files {
890 for bf in bfs {
891 bf.install_deletion_pause(Arc::clone(&self.deletion_pause));
892 #[cfg(feature = "std")]
893 bf.install_background_deleter(Arc::clone(&self.background_deleter));
894 }
895 }
896
    // Lock order: compaction state first, then version history. The
    // compaction-state guard is never read; it is only held.
897 let mut _compaction_state = self.compaction_state.lock();
898 let mut version_lock = self.version_history.write();
899
900 version_lock.upgrade_version(
901 &self.config.path,
902 |current| {
903 let mut copy = current.clone();
904
905 let ctx = crate::version::TransformContext::new(self.config.comparator.as_ref());
    // An empty fragmentation map is treated the same as no map.
906 copy.version = copy.version.with_new_l0_run(
907 tables,
908 blob_files,
909 frag_map.filter(|x| !x.is_empty()),
910 &ctx,
911 );
912
    // `table_id` here is a memtable ID from `sealed_memtables_to_delete`.
913 for &table_id in sealed_memtables_to_delete {
914 log::trace!("releasing sealed memtable #{table_id}");
915 copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
916 }
917
918 Ok(copy)
919 },
920 &self.config.seqno,
921 &self.config.visible_seqno,
922 &*self.config.fs,
923 self.0.runtime_config.load_full(),
924 self.0.config.encryption.clone(),
925 )?;
926
    // Best effort: a maintenance failure does not fail the registration.
927 if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark, &*self.config.fs)
928 {
929 log::warn!("Version GC failed: {e:?}");
930 }
931
932 Ok(())
933 }
934
935 fn clear_active_memtable(&self) {
936 use crate::tree::sealed::SealedMemtables;
937
938 let mut version_history_lock = self.version_history.write();
939 let super_version = version_history_lock.latest_version();
940
941 if super_version.active_memtable.is_empty() {
942 return;
943 }
944
945 let mut copy = version_history_lock.latest_version();
946 copy.active_memtable = Arc::new(Memtable::new(
947 self.memtable_id_counter.next(),
948 self.config.comparator.clone(),
949 ));
950 copy.sealed_memtables = Arc::new(SealedMemtables::default());
951
952 copy.seqno = super_version.seqno;
954
955 version_history_lock.replace_latest_version(copy);
956
957 log::trace!("cleared active memtable");
958 }
959
960 fn compact(
961 &self,
962 strategy: Arc<dyn CompactionStrategy>,
963 seqno_threshold: SeqNo,
964 ) -> crate::Result<crate::compaction::CompactionResult> {
965 let _lock = self.0.major_compaction_lock.read();
969
970 self.inner_compact(strategy, seqno_threshold)
971 }
972
973 fn get_next_table_id(&self) -> TableId {
974 self.0.get_next_table_id()
975 }
976
977 fn tree_config(&self) -> &Config {
978 &self.config
979 }
980
981 fn active_memtable(&self) -> Arc<Memtable> {
982 self.version_history.read().latest_version().active_memtable
983 }
984
985 #[expect(clippy::significant_drop_tightening)]
986 fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
987 let mut version_history_lock = self.version_history.write();
988 let super_version = version_history_lock.latest_version();
989
990 if super_version.active_memtable.is_empty() {
991 return None;
992 }
993
994 let yanked_memtable = super_version.active_memtable;
995
996 let mut copy = version_history_lock.latest_version();
997 copy.active_memtable = Arc::new(Memtable::new(
998 self.memtable_id_counter.next(),
999 self.config.comparator.clone(),
1000 ));
1001 copy.sealed_memtables =
1002 Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));
1003
1004 copy.seqno = super_version.seqno;
1006
1007 version_history_lock.replace_latest_version(copy);
1008
1009 log::trace!(
1010 "rotate: added memtable id={} to sealed memtables",
1011 yanked_memtable.id,
1012 );
1013
1014 Some(yanked_memtable)
1015 }
1016
1017 fn table_count(&self) -> usize {
1018 self.current_version().table_count()
1019 }
1020
1021 fn level_table_count(&self, idx: usize) -> Option<usize> {
1022 self.current_version().level(idx).map(|x| x.table_count())
1023 }
1024
1025 fn approximate_len(&self) -> usize {
1026 let super_version = self.version_history.read().latest_version();
1027
1028 let tables_item_count = self
1029 .current_version()
1030 .iter_tables()
1031 .map(|x| x.metadata.item_count)
1032 .sum::<u64>();
1033
1034 let memtable_count = super_version.active_memtable.len() as u64;
1035 let sealed_count = super_version
1036 .sealed_memtables
1037 .iter()
1038 .map(|mt| mt.len())
1039 .sum::<usize>() as u64;
1040
1041 #[expect(clippy::expect_used, reason = "result should fit into usize")]
1042 (memtable_count + sealed_count + tables_item_count)
1043 .try_into()
1044 .expect("approximate_len too large for usize")
1045 }
1046
1047 fn disk_space(&self) -> u64 {
1048 self.current_version()
1049 .iter_levels()
1050 .map(super::version::Level::size)
1051 .sum()
1052 }
1053
/// Estimates how many bytes and keys fall into `range` as seen by the
/// snapshot `seqno`, without reading data blocks.
///
/// For each table overlapping the range, the block index is used to map the
/// range to a byte span of the table; blob bytes and the item count are
/// scaled by that span's share of the table. For each memtable, the entries
/// in range visible to the snapshot are counted and the memtable size is
/// scaled by their share. All accumulation saturates.
///
/// # Errors
///
/// Returns an error if a block index entry or the referenced blob byte count
/// cannot be read.
1054 fn approximate_range_stats<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
1055 &self,
1056 range: R,
1057 seqno: SeqNo,
1058 ) -> crate::Result<crate::ApproximateRangeStats> {
1059 use crate::table::block_index::BlockIndex;
1060 use core::ops::Bound;
1061
    // Borrow both bounds as plain byte slices.
1062 let lo: Bound<&[u8]> = match range.start_bound() {
1063 Bound::Included(k) => Bound::Included(k.as_ref()),
1064 Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
1065 Bound::Unbounded => Bound::Unbounded,
1066 };
1067 let hi: Bound<&[u8]> = match range.end_bound() {
1068 Bound::Included(k) => Bound::Included(k.as_ref()),
1069 Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
1070 Bound::Unbounded => Bound::Unbounded,
1071 };
1072 let bounds = (lo, hi);
1073
1074 let mut bytes: u64 = 0;
1075 let mut key_count: u64 = 0;
1076
1077 let comparator = self.config.comparator.as_ref();
1082 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
1083
1084 for table in super_version.version.iter_tables() {
    // Skip tables whose key range does not touch the requested bounds.
1088 if !table
1092 .metadata
1093 .key_range
1094 .overlaps_with_bounds_cmp(&bounds, comparator)
1095 {
1096 continue;
1097 }
    // Skip the table when its global seqno is larger than the snapshot seqno
    // (the subtraction would underflow).
1098 let Some(table_seqno) = seqno.checked_sub(table.global_seqno()) else {
1104 continue;
1105 };
1106
    // `data_end` is the end offset of the last block in the block index; a
    // table without index entries contributes nothing.
1107 let Some(last) = table.block_index.iter().next_back() else {
1109 continue;
1110 };
1111 let last = last?;
1112 let data_end = *last.offset() + u64::from(last.size());
1113 if data_end == 0 {
1114 continue;
1115 }
1116
    // `(start, end)` of the first block the forward reader yields for `key`,
    // with `end` clamped to `data_end`; `None` when there is no such block.
1117 let block_span = |key: &[u8]| -> crate::Result<Option<(u64, u64)>> {
1123 let Some(mut iter) = table.block_index.forward_reader(key, table_seqno) else {
1124 return Ok(None);
1125 };
1126 let Some(handle) = iter.next() else {
1127 return Ok(None);
1128 };
1129 let h = handle?;
1130 let start = *h.offset();
1131 Ok(Some((start, (start + u64::from(h.size())).min(data_end))))
1132 };
    // Lower offset: start of the block for `lo`, or `data_end` when no block
    // is found (which makes the span empty).
1133 let off_lo = match lo {
1134 Bound::Included(k) | Bound::Excluded(k) => {
1135 block_span(k)?.map_or(data_end, |(start, _)| start)
1136 }
1137 Bound::Unbounded => 0,
1138 };
    // A table-level restricted lower bound can only move the lower offset
    // further up.
1139 let off_lo = match table.restrict_lower_bound() {
1144 Some(rb) => {
1145 off_lo.max(block_span(rb.as_ref())?.map_or(data_end, |(start, _)| start))
1146 }
1147 None => off_lo,
1148 };
    // Upper offset: end of the block for `hi`, or `data_end`.
1149 let off_hi = match hi {
1150 Bound::Included(k) | Bound::Excluded(k) => {
1151 block_span(k)?.map_or(data_end, |(_, end)| end)
1152 }
1153 Bound::Unbounded => data_end,
1154 };
1155 let idx_bytes = off_hi.saturating_sub(off_lo);
1156 if idx_bytes == 0 {
1157 continue;
1158 }
1159
    // Scale blob bytes and item count by `idx_bytes / data_end`, computed in
    // u128 so the multiplication cannot overflow.
1160 let num = u128::from(idx_bytes);
1168 let den = u128::from(data_end);
1169 let blob_bytes = table.referenced_blob_bytes()?;
1170 let sst_blob = u64::try_from(u128::from(blob_bytes) * num / den).unwrap_or(u64::MAX);
    // A non-empty span counts as at least one entry.
1171 let in_range_entries = u64::try_from(u128::from(table.metadata.item_count) * num / den)
1175 .unwrap_or(u64::MAX)
1176 .max(1);
1177 bytes = bytes.saturating_add(idx_bytes).saturating_add(sst_blob);
1178 key_count = key_count.saturating_add(in_range_entries);
1179 }
1180
    // Translate the user-key bounds into internal-key bounds for the
    // memtable scans.
    // NOTE(review): the seqno / value-type choices presumably place each
    // bound before or after all versions of its key in internal-key order —
    // confirm against `InternalKey`'s ordering.
1181 let mt_range = (
1186 match lo {
1187 Bound::Included(k) => {
1188 Bound::Included(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Tombstone))
1189 }
1190 Bound::Excluded(k) => {
1191 Bound::Excluded(InternalKey::new(k, 0, crate::ValueType::Tombstone))
1192 }
1193 Bound::Unbounded => Bound::Unbounded,
1194 },
1195 match hi {
1196 Bound::Included(k) => {
1197 Bound::Included(InternalKey::new(k, 0, crate::ValueType::Value))
1198 }
1199 Bound::Excluded(k) => {
1200 Bound::Excluded(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Value))
1201 }
1202 Bound::Unbounded => Bound::Unbounded,
1203 },
1204 );
    // Returns `(bytes, count)` for one memtable: `count` is the number of
    // in-range entries with a seqno below the snapshot, `bytes` is the
    // memtable size scaled by `count / total`.
1205 let estimate = |mt: &crate::Memtable| -> (u64, u64) {
1206 let total = mt.len() as u64;
1207 if total == 0 {
1208 return (0, 0);
1209 }
1210 let count = mt
1213 .range_internal(mt_range.clone())
1214 .filter(|kv| kv.key.seqno < seqno)
1215 .count() as u64;
1216 if count == 0 {
1217 return (0, 0);
1218 }
1219 let mt_bytes =
1220 u64::try_from(u128::from(mt.size()) * u128::from(count) / u128::from(total))
1221 .unwrap_or(u64::MAX);
1222 (mt_bytes, count)
1223 };
1224 let (b, c) = estimate(&super_version.active_memtable);
1225 bytes = bytes.saturating_add(b);
1226 key_count = key_count.saturating_add(c);
1227 for mt in super_version.sealed_memtables.iter() {
1228 let (b, c) = estimate(mt);
1229 bytes = bytes.saturating_add(b);
1230 key_count = key_count.saturating_add(c);
1231 }
1232
1233 Ok(crate::ApproximateRangeStats { bytes, key_count })
1234 }
1235
    /// Estimates how many rows fall inside `range` as of snapshot `seqno`.
    ///
    /// On-disk tables contribute either the summed zone-map `row_count` of the
    /// blocks visited between the bounds, or — when a table has no zone map —
    /// a share of `metadata.item_count` proportional to the byte span that the
    /// block index selects for the range. Memtables (active and sealed) are
    /// counted by iterating the range; every entry whose seqno is below
    /// `seqno` counts as one row.
    ///
    /// The returned `selectivity` is `min(rows, total_rows) / total_rows`, or
    /// `0.0` when no rows exist at all.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded while iterating block index handles.
    fn approximate_range_cardinality<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
    ) -> crate::Result<crate::RangeCardinality> {
        use crate::table::block_index::BlockIndex;
        use core::cmp::Ordering;
        use core::ops::Bound;

        // Re-express both bounds over plain byte slices.
        let lo: Bound<&[u8]> = match range.start_bound() {
            Bound::Included(k) => Bound::Included(k.as_ref()),
            Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
            Bound::Unbounded => Bound::Unbounded,
        };
        let hi: Bound<&[u8]> = match range.end_bound() {
            Bound::Included(k) => Bound::Included(k.as_ref()),
            Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
            Bound::Unbounded => Bound::Unbounded,
        };
        let bounds = (lo, hi);
        let comparator = self.config.comparator.as_ref();
        let super_version = self.version_history.read().get_version_for_snapshot(seqno);

        // `rows` is the in-range estimate, `total_rows` the denominator for selectivity.
        let mut rows: u64 = 0;
        let mut total_rows: u64 = 0;

        for table in super_version.version.iter_tables() {
            // Every table counts towards the total, even when it is skipped below.
            total_rows = total_rows.saturating_add(table.metadata.item_count);
            if !table
                .metadata
                .key_range
                .overlaps_with_bounds_cmp(&bounds, comparator)
            {
                continue;
            }
            // Skip tables whose global seqno is above the snapshot seqno
            // (the subtraction would underflow).
            let Some(table_seqno) = seqno.checked_sub(table.global_seqno()) else {
                continue;
            };
            // NOTE(review): presumably the tighter of the query's lower bound and
            // the table's restricted lower bound — confirm in `effective_lower_bound`.
            let eff_lo = effective_lower_bound(
                lo,
                table.restrict_lower_bound().map(AsRef::as_ref),
                comparator,
            );
            let zone_map = &table.zone_map;
            if !zone_map.is_empty() {
                // Zone-map path: walk block handles from the lower bound and sum
                // the per-block row counts.
                let reader = match eff_lo {
                    Bound::Included(k) | Bound::Excluded(k) => {
                        table.block_index.forward_reader(k, table_seqno)
                    }
                    Bound::Unbounded => Some(table.block_index.iter()),
                };
                if let Some(reader) = reader {
                    for handle in reader {
                        let handle = handle?;
                        // Blocks without a zone-map entry (or without any column)
                        // contribute nothing.
                        let Some(col) = zone_map
                            .columns_for(*handle.offset())
                            .and_then(|c| c.first())
                        else {
                            continue;
                        };
                        // Stop at the first block whose minimum lies past the upper bound.
                        let above_hi = match hi {
                            Bound::Included(hk) => {
                                comparator.compare(&col.min, hk) == Ordering::Greater
                            }
                            Bound::Excluded(hk) => {
                                comparator.compare(&col.min, hk) != Ordering::Less
                            }
                            Bound::Unbounded => false,
                        };
                        if above_hi {
                            break;
                        }
                        rows = rows.saturating_add(u64::from(col.row_count));
                    }
                }
            } else if let Some(last) = table.block_index.iter().next_back() {
                // Fallback path: scale `item_count` by the fraction of bytes that
                // the range covers according to the block index.
                let last = last?;
                // End offset of the last indexed block.
                let data_end = *last.offset() + u64::from(last.size());
                if data_end > 0 {
                    // Maps a key to a byte offset via the first handle the block index
                    // yields for it: the block's end (clamped to `data_end`) when `end`
                    // is set, otherwise its start. No reader or no handle -> `data_end`.
                    let off = |key: &[u8], end: bool| -> crate::Result<u64> {
                        match table.block_index.forward_reader(key, table_seqno) {
                            Some(mut it) => match it.next() {
                                Some(h) => {
                                    let h = h?;
                                    Ok(if end {
                                        (*h.offset() + u64::from(h.size())).min(data_end)
                                    } else {
                                        *h.offset()
                                    })
                                }
                                None => Ok(data_end),
                            },
                            None => Ok(data_end),
                        }
                    };
                    let off_lo = match eff_lo {
                        Bound::Included(k) | Bound::Excluded(k) => off(k, false)?,
                        Bound::Unbounded => 0,
                    };
                    let off_hi = match hi {
                        Bound::Included(k) | Bound::Excluded(k) => off(k, true)?,
                        Bound::Unbounded => data_end,
                    };
                    let idx_bytes = off_hi.saturating_sub(off_lo);
                    if idx_bytes > 0 {
                        // Proportional estimate computed in u128 so the product cannot
                        // overflow; a non-empty span always counts as at least one row.
                        let est = u64::try_from(
                            u128::from(table.metadata.item_count) * u128::from(idx_bytes)
                                / u128::from(data_end),
                        )
                        .unwrap_or(u64::MAX)
                        .max(1);
                        rows = rows.saturating_add(est);
                    }
                }
            }
        }

        // Internal-key bounds for the memtable scan.
        // NOTE(review): the seqno/value-type choices presumably rely on internal keys
        // sorting by user key and then by descending seqno — confirm against
        // `InternalKey`'s ordering.
        let mt_range = (
            match lo {
                Bound::Included(k) => {
                    Bound::Included(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Tombstone))
                }
                Bound::Excluded(k) => {
                    Bound::Excluded(InternalKey::new(k, 0, crate::ValueType::Tombstone))
                }
                Bound::Unbounded => Bound::Unbounded,
            },
            match hi {
                Bound::Included(k) => {
                    Bound::Included(InternalKey::new(k, 0, crate::ValueType::Value))
                }
                Bound::Excluded(k) => {
                    Bound::Excluded(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Value))
                }
                Bound::Unbounded => Bound::Unbounded,
            },
        );
        // Adds one memtable: its full length to the total, and the number of
        // in-range entries with a seqno below the snapshot to the estimate.
        let mut add_memtable = |mt: &crate::Memtable| {
            total_rows = total_rows.saturating_add(mt.len() as u64);
            let in_range = mt
                .range_internal(mt_range.clone())
                .filter(|kv| kv.key.seqno < seqno)
                .count() as u64;
            rows = rows.saturating_add(in_range);
        };
        add_memtable(&super_version.active_memtable);
        for mt in super_version.sealed_memtables.iter() {
            add_memtable(mt);
        }

        #[expect(
            clippy::cast_precision_loss,
            reason = "row counts never approach 2^52; the ratio is approximate anyway"
        )]
        // `rows` is clamped to `total_rows` so the ratio never exceeds 1.0.
        let selectivity = if total_rows == 0 {
            0.0
        } else {
            (rows.min(total_rows) as f64) / (total_rows as f64)
        };
        Ok(crate::RangeCardinality { rows, selectivity })
    }
1415
1416 fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
1417 let version = self.version_history.read().latest_version();
1418
1419 let active = version.active_memtable.get_highest_seqno();
1420
1421 let sealed = version
1422 .sealed_memtables
1423 .iter()
1424 .map(|mt| mt.get_highest_seqno())
1425 .max()
1426 .flatten();
1427
1428 active.max(sealed)
1429 }
1430
1431 fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
1432 self.current_version()
1433 .iter_tables()
1434 .map(Table::get_highest_seqno)
1435 .max()
1436 }
1437
1438 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
1439 let key = key.as_ref();
1440
1441 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
1442
1443 Self::resolve_or_passthrough(
1444 &super_version,
1445 key,
1446 seqno,
1447 self.config.merge_operator.as_ref(),
1448 self.config.comparator.as_ref(),
1449 )
1450 }
1451
1452 fn get_pinned<K: AsRef<[u8]>>(
1453 &self,
1454 key: K,
1455 seqno: SeqNo,
1456 ) -> crate::Result<Option<crate::PinnableSlice>> {
1457 let key = key.as_ref();
1458
1459 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
1460
1461 Self::resolve_or_passthrough_pinned(
1462 &super_version,
1463 key,
1464 seqno,
1465 self.config.merge_operator.as_ref(),
1466 self.config.comparator.as_ref(),
1467 )
1468 }
1469
1470 #[expect(
1471 clippy::indexing_slicing,
1472 reason = "indices are generated from 0..n range, always in bounds"
1473 )]
1474 fn multi_get<K: AsRef<[u8]>>(
1475 &self,
1476 keys: impl IntoIterator<Item = K>,
1477 seqno: SeqNo,
1478 ) -> crate::Result<Vec<Option<UserValue>>> {
1479 let super_version = self.get_version_for_snapshot(seqno);
1480 let comparator = self.config.comparator.as_ref();
1481 let merge_operator = self.config.merge_operator.as_ref();
1482
1483 let keys: Vec<_> = keys.into_iter().collect();
1485 let n = keys.len();
1486 if n == 0 {
1487 return Ok(Vec::new());
1488 }
1489
1490 if n <= 2 {
1492 return keys
1493 .iter()
1494 .map(|key| {
1495 Self::resolve_or_passthrough(
1496 &super_version,
1497 key.as_ref(),
1498 seqno,
1499 merge_operator,
1500 comparator,
1501 )
1502 })
1503 .collect();
1504 }
1505
1506 let mut internal_entries: Vec<Option<InternalValue>> = vec![None; n];
1510 let mut remaining: Vec<usize> = Vec::with_capacity(n);
1511
1512 for idx in 0..n {
1513 let key = keys[idx].as_ref();
1514
1515 if let Some(entry) = super_version.active_memtable.get(key, seqno) {
1517 internal_entries[idx] = Some(entry);
1518 continue;
1519 }
1520
1521 if let Some(entry) =
1523 Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
1524 {
1525 internal_entries[idx] = Some(entry);
1526 continue;
1527 }
1528
1529 remaining.push(idx);
1530 }
1531
1532 if !remaining.is_empty() {
1535 remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref()));
1536
1537 let (miss_keys, duplicates) =
1541 Self::dedup_sorted_miss_keys(&remaining, &keys, comparator);
1542
1543 Self::batch_get_from_tables(
1544 &super_version.version,
1545 &keys,
1546 miss_keys,
1547 seqno,
1548 comparator,
1549 &*self.config.fs,
1550 &mut internal_entries,
1551 )?;
1552
1553 Self::fan_out_duplicates(&duplicates, &mut internal_entries);
1554 }
1555
1556 let mut results = vec![None; n];
1558 for idx in 0..n {
1559 let entry = internal_entries[idx].take();
1560 results[idx] = Self::resolve_entry(
1561 &super_version,
1562 keys[idx].as_ref(),
1563 entry,
1564 seqno,
1565 merge_operator,
1566 comparator,
1567 )?;
1568 }
1569
1570 Ok(results)
1571 }
1572
1573 fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> {
1574 if batch.is_empty() {
1575 return Ok((0, self.active_memtable().size()));
1576 }
1577 Ok(self.append_batch(batch.materialize(seqno)?))
1578 }
1579
1580 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
1581 &self,
1582 key: K,
1583 value: V,
1584 seqno: SeqNo,
1585 ) -> (u64, u64) {
1586 let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
1587 self.append_entry(value)
1588 }
1589
1590 fn merge<K: Into<UserKey>, V: Into<UserValue>>(
1591 &self,
1592 key: K,
1593 operand: V,
1594 seqno: SeqNo,
1595 ) -> (u64, u64) {
1596 let value = InternalValue::new_merge_operand(key, operand, seqno);
1597 self.append_entry(value)
1598 }
1599
1600 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1601 let value = InternalValue::new_tombstone(key, seqno);
1602 self.append_entry(value)
1603 }
1604
1605 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1606 let value = InternalValue::new_weak_tombstone(key, seqno);
1607 self.append_entry(value)
1608 }
1609
1610 fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
1611 let memtable = Arc::clone(&self.version_history.read().latest_version().active_memtable);
1612
1613 memtable.insert_range_tombstone(start.into(), end.into(), seqno)
1614 }
1615}
1616
1617impl Tree {
1618 fn map_event<F>(
1625 entry: InternalValue,
1626 version: &Version,
1627 resolve_indirection: &F,
1628 ) -> crate::Result<ScanSinceEvent>
1629 where
1630 F: Fn(&Version, InternalValue) -> crate::Result<ScanSinceEvent>,
1631 {
1632 if entry.key.value_type == ValueType::Indirection {
1633 return resolve_indirection(version, entry);
1634 }
1635 let seqno = entry.key.seqno;
1636 let key = entry.key.user_key;
1637 Ok(match entry.key.value_type {
1638 ValueType::Value => ScanSinceEvent::Insert {
1639 key,
1640 value: entry.value,
1641 seqno,
1642 },
1643 ValueType::MergeOperand => ScanSinceEvent::MergeOperand {
1644 key,
1645 operand: entry.value,
1646 seqno,
1647 },
1648 ValueType::Tombstone | ValueType::WeakTombstone => {
1649 ScanSinceEvent::PointTombstone { key, seqno }
1650 }
1651 ValueType::Indirection => unreachable!("Indirection handled above"),
1652 })
1653 }
1654
1655 pub(crate) fn scan_since_seqno_with<F>(
1671 &self,
1672 target_seqno: SeqNo,
1673 block_skip: bool,
1674 resolve_indirection: F,
1675 ) -> crate::Result<alloc::vec::IntoIter<ScanSinceEvent>>
1676 where
1677 F: Fn(&Version, InternalValue) -> crate::Result<ScanSinceEvent>,
1678 {
1679 let super_version = self.version_history.read().latest_version();
1680 let version = &super_version.version;
1681
1682 let end_seqno = {
1691 let active = super_version.active_memtable.get_highest_seqno();
1692 let sealed = super_version
1693 .sealed_memtables
1694 .iter()
1695 .map(|mt| mt.get_highest_seqno())
1696 .max()
1697 .flatten();
1698 let tables = version.iter_tables().map(Table::get_highest_seqno).max();
1699 active.max(sealed).max(tables)
1700 };
1701 let Some(end_seqno) = end_seqno else {
1703 return Ok(Vec::new().into_iter());
1704 };
1705
1706 let mut events: Vec<ScanSinceEvent> = Vec::new();
1707
1708 for entry in super_version.active_memtable.iter() {
1710 if entry.key.seqno >= target_seqno && entry.key.seqno <= end_seqno {
1711 events.push(Self::map_event(entry, version, &resolve_indirection)?);
1712 }
1713 }
1714 for memtable in super_version.sealed_memtables.iter() {
1715 for entry in memtable.iter() {
1716 if entry.key.seqno >= target_seqno && entry.key.seqno <= end_seqno {
1717 events.push(Self::map_event(entry, version, &resolve_indirection)?);
1718 }
1719 }
1720 }
1721 for table in version.iter_tables() {
1722 for entry in
1725 table.scan_seqno_range(target_seqno, end_seqno.saturating_add(1), block_skip)?
1726 {
1727 events.push(Self::map_event(entry, version, &resolve_indirection)?);
1728 }
1729 }
1730
1731 let mut push_range_tombstone = |rt: &RangeTombstone| {
1733 if rt.seqno >= target_seqno && rt.seqno <= end_seqno {
1734 events.push(ScanSinceEvent::RangeTombstone {
1735 start_key: rt.start.clone(),
1736 end_key: rt.end.clone(),
1737 seqno: rt.seqno,
1738 });
1739 }
1740 };
1741 for rt in super_version.active_memtable.range_tombstones_sorted() {
1742 push_range_tombstone(&rt);
1743 }
1744 for memtable in super_version.sealed_memtables.iter() {
1745 for rt in memtable.range_tombstones_sorted() {
1746 push_range_tombstone(&rt);
1747 }
1748 }
1749 for table in version.iter_tables() {
1750 for rt in table.range_tombstones() {
1751 push_range_tombstone(rt);
1752 }
1753 }
1754
1755 events.sort_by_key(ScanSinceEvent::seqno);
1757
1758 Ok(events.into_iter())
1759 }
1760
1761 pub fn scan_since_seqno(
1805 &self,
1806 target_seqno: SeqNo,
1807 ) -> crate::Result<impl Iterator<Item = ScanSinceEvent> + use<>> {
1808 self.scan_since_seqno_with(target_seqno, true, |_version, _entry| {
1812 Err(crate::Error::FeatureUnsupported(
1813 "scan_since_seqno on KV-separated values requires the blob-tree scan path",
1814 ))
1815 })
1816 }
1817
1818 pub fn scan_since_seqno_full_scan(
1844 &self,
1845 target_seqno: SeqNo,
1846 ) -> crate::Result<impl Iterator<Item = ScanSinceEvent> + use<>> {
1847 self.scan_since_seqno_with(target_seqno, false, |_version, _entry| {
1848 Err(crate::Error::FeatureUnsupported(
1849 "scan_since_seqno on KV-separated values requires the blob-tree scan path",
1850 ))
1851 })
1852 }
1853
1854 pub fn update_runtime_config<F>(&self, mutator: F) -> crate::Result<()>
1901 where
1902 F: FnOnce(&mut crate::runtime_config::RuntimeConfig),
1903 {
1904 let mut auto_heal = false;
1917 self.0.runtime_config.try_update(|c| {
1918 mutator(c);
1919 auto_heal = c.auto_heal;
1920 })?;
1921 self.0.heal_hints.set_enabled(auto_heal);
1922 *self.0.admission_used_cache.lock() = None;
1926 Ok(())
1927 }
1928
1929 #[must_use]
1932 pub fn runtime_config(&self) -> Arc<crate::runtime_config::RuntimeConfig> {
1933 self.0.runtime_config.load_full()
1934 }
1935
1936 #[must_use]
1976 pub fn heal_hints(&self) -> Arc<crate::heal_hints::HealHints> {
1977 Arc::clone(&self.0.heal_hints)
1978 }
1979
1980 fn resolve_or_passthrough(
1983 super_version: &SuperVersion,
1984 key: &[u8],
1985 seqno: SeqNo,
1986 merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
1987 comparator: &dyn crate::comparator::UserComparator,
1988 ) -> crate::Result<Option<UserValue>> {
1989 let entry = Self::get_value(super_version, key, seqno, comparator)?;
1990
1991 match entry {
1992 Some((ValueType::MergeOperand, entry_seqno, value)) => {
1993 if let Some(merge_op) = merge_operator {
1994 Self::resolve_merge_via_pipeline(
1998 super_version.clone(),
1999 key,
2000 seqno,
2001 Arc::clone(merge_op),
2002 )
2003 } else if Self::is_suppressed_by_range_tombstones(
2004 super_version,
2005 key,
2006 entry_seqno,
2007 seqno,
2008 comparator,
2009 ) {
2010 Ok(None)
2011 } else {
2012 Ok(Some(value))
2013 }
2014 }
2015 Some((_, _, value)) => Ok(Some(value)),
2016 None => Ok(None),
2017 }
2018 }
2019
2020 fn resolve_pinned_entry(
2024 super_version: &SuperVersion,
2025 key: &[u8],
2026 entry: InternalValue,
2027 seqno: SeqNo,
2028 merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
2029 comparator: &dyn crate::comparator::UserComparator,
2030 wrap: impl FnOnce(UserValue) -> crate::PinnableSlice,
2031 ) -> crate::Result<Option<crate::PinnableSlice>> {
2032 use crate::PinnableSlice;
2033
2034 let Some(entry) = ignore_tombstone_value(entry) else {
2035 return Ok(None);
2036 };
2037 if Self::is_suppressed_by_range_tombstones(
2038 super_version,
2039 key,
2040 entry.key.seqno,
2041 seqno,
2042 comparator,
2043 ) {
2044 return Ok(None);
2045 }
2046 if entry.key.value_type == ValueType::MergeOperand
2047 && let Some(merge_op) = merge_operator
2048 {
2049 return Self::resolve_merge_via_pipeline(
2051 super_version.clone(),
2052 key,
2053 seqno,
2054 Arc::clone(merge_op),
2055 )
2056 .map(|opt| opt.map(PinnableSlice::owned));
2057 }
2058 Ok(Some(wrap(entry.value)))
2059 }
2060
2061 fn resolve_or_passthrough_pinned(
2064 super_version: &SuperVersion,
2065 key: &[u8],
2066 seqno: SeqNo,
2067 merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
2068 comparator: &dyn crate::comparator::UserComparator,
2069 ) -> crate::Result<Option<crate::PinnableSlice>> {
2070 use crate::PinnableSlice;
2071
2072 if let Some(entry) = super_version.active_memtable.get(key, seqno) {
2074 return Self::resolve_pinned_entry(
2075 super_version,
2076 key,
2077 entry,
2078 seqno,
2079 merge_operator,
2080 comparator,
2081 PinnableSlice::owned,
2082 );
2083 }
2084
2085 if let Some(entry) =
2087 Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
2088 {
2089 return Self::resolve_pinned_entry(
2090 super_version,
2091 key,
2092 entry,
2093 seqno,
2094 merge_operator,
2095 comparator,
2096 PinnableSlice::owned,
2097 );
2098 }
2099
2100 let key_hash = crate::hash::hash64(key);
2102
2103 if let Some((entry, block)) = Self::get_internal_entry_with_block_from_tables(
2104 &super_version.version,
2105 key,
2106 seqno,
2107 key_hash,
2108 comparator,
2109 )? {
2110 return Self::resolve_pinned_entry(
2111 super_version,
2112 key,
2113 entry,
2114 seqno,
2115 merge_operator,
2116 comparator,
2117 |value| PinnableSlice::pinned(block, value),
2118 );
2119 }
2120
2121 Ok(None)
2122 }
2123
2124 fn get_internal_entry_with_block_from_tables(
2127 version: &Version,
2128 key: &[u8],
2129 seqno: SeqNo,
2130 key_hash: u64,
2131 comparator: &dyn crate::comparator::UserComparator,
2132 ) -> crate::Result<Option<(InternalValue, crate::table::Block)>> {
2133 Self::find_in_tables::<TableEntryWithBlock>(version, key, seqno, key_hash, comparator)
2134 }
2135
2136 pub(crate) fn resolve_merge_via_pipeline(
2146 version: SuperVersion,
2147 key: &[u8],
2148 seqno: SeqNo,
2149 merge_operator: Arc<dyn crate::merge_operator::MergeOperator>,
2150 ) -> crate::Result<Option<UserValue>> {
2151 use crate::range::{IterState, TreeIter};
2152
2153 let key_hash = crate::hash::hash64(key);
2154 let bloom_key = crate::Slice::from(key);
2158 let comparator = version.active_memtable.comparator.clone();
2159
2160 let iter_state = IterState {
2161 version,
2162 ephemeral: None,
2163 merge_operator: Some(merge_operator),
2164 comparator,
2165 prefix_hash: None,
2166 key_hash: Some(key_hash),
2167 bloom_key: Some(bloom_key),
2168 #[cfg(feature = "metrics")]
2169 metrics: None,
2170 };
2171
2172 let mut iter = TreeIter::create_range_point(iter_state, key, seqno);
2176
2177 match iter.next() {
2178 Some(Ok(entry)) => Ok(Some(entry.value)),
2179 Some(Err(e)) => Err(e),
2180 None => Ok(None),
2181 }
2182 }
2183
2184 #[doc(hidden)]
2185 pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
2186 version: SuperVersion,
2187 range: &'a R,
2188 seqno: SeqNo,
2189 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
2190 merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
2191 comparator: crate::comparator::SharedComparator,
2192 ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
2193 Self::create_internal_range_with_prefix_hash(
2194 version,
2195 range,
2196 seqno,
2197 ephemeral,
2198 merge_operator,
2199 comparator,
2200 None,
2201 )
2202 }
2203
2204 #[doc(hidden)]
2207 pub(crate) fn create_internal_range_with_prefix_hash<
2208 'a,
2209 K: AsRef<[u8]> + 'a,
2210 R: RangeBounds<K> + 'a,
2211 >(
2212 version: SuperVersion,
2213 range: &'a R,
2214 seqno: SeqNo,
2215 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
2216 merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
2217 comparator: crate::comparator::SharedComparator,
2218 prefix_hash: Option<u64>,
2219 ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
2220 use crate::range::{IterState, TreeIter};
2221 use core::ops::Bound::{self, Excluded, Included, Unbounded};
2222
2223 let lo: Bound<UserKey> = match range.start_bound() {
2224 Included(x) => Included(x.as_ref().into()),
2225 Excluded(x) => Excluded(x.as_ref().into()),
2226 Unbounded => Unbounded,
2227 };
2228
2229 let hi: Bound<UserKey> = match range.end_bound() {
2230 Included(x) => Included(x.as_ref().into()),
2231 Excluded(x) => Excluded(x.as_ref().into()),
2232 Unbounded => Unbounded,
2233 };
2234
2235 let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
2236
2237 let iter_state = IterState {
2238 version,
2239 ephemeral,
2240 merge_operator,
2241 comparator,
2242 prefix_hash,
2243 key_hash: None,
2244 bloom_key: None,
2245 #[cfg(feature = "metrics")]
2246 metrics: None,
2247 };
2248
2249 TreeIter::create_range(iter_state, bounds, seqno)
2250 }
2251
2252 pub(crate) fn get_internal_entry_from_version(
2253 super_version: &SuperVersion,
2254 key: &[u8],
2255 seqno: SeqNo,
2256 comparator: &dyn crate::comparator::UserComparator,
2257 ) -> crate::Result<Option<InternalValue>> {
2258 if let Some(entry) = super_version.active_memtable.get(key, seqno) {
2262 let Some(entry) = ignore_tombstone_value(entry) else {
2263 return Ok(None);
2264 };
2265
2266 if Self::is_suppressed_by_range_tombstones(
2268 super_version,
2269 key,
2270 entry.key.seqno,
2271 seqno,
2272 comparator,
2273 ) {
2274 return Ok(None);
2275 }
2276 return Ok(Some(entry));
2277 }
2278
2279 if let Some(entry) =
2281 Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
2282 {
2283 let Some(entry) = ignore_tombstone_value(entry) else {
2284 return Ok(None);
2285 };
2286
2287 if Self::is_suppressed_by_range_tombstones(
2288 super_version,
2289 key,
2290 entry.key.seqno,
2291 seqno,
2292 comparator,
2293 ) {
2294 return Ok(None);
2295 }
2296 return Ok(Some(entry));
2297 }
2298
2299 let entry =
2301 Self::get_internal_entry_from_tables(&super_version.version, key, seqno, comparator)?;
2302
2303 if let Some(entry) = entry {
2304 if Self::is_suppressed_by_range_tombstones(
2305 super_version,
2306 key,
2307 entry.key.seqno,
2308 seqno,
2309 comparator,
2310 ) {
2311 return Ok(None);
2312 }
2313 return Ok(Some(entry));
2314 }
2315
2316 Ok(None)
2317 }
2318
2319 pub(crate) fn get_value(
2328 super_version: &SuperVersion,
2329 key: &[u8],
2330 seqno: SeqNo,
2331 comparator: &dyn crate::comparator::UserComparator,
2332 ) -> crate::Result<Option<(ValueType, SeqNo, crate::Slice)>> {
2333 if let Some(entry) = super_version.active_memtable.get(key, seqno) {
2334 let Some(entry) = ignore_tombstone_value(entry) else {
2335 return Ok(None);
2336 };
2337 if Self::is_suppressed_by_range_tombstones(
2338 super_version,
2339 key,
2340 entry.key.seqno,
2341 seqno,
2342 comparator,
2343 ) {
2344 return Ok(None);
2345 }
2346 return Ok(Some((entry.key.value_type, entry.key.seqno, entry.value)));
2347 }
2348
2349 if let Some(entry) =
2350 Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
2351 {
2352 let Some(entry) = ignore_tombstone_value(entry) else {
2353 return Ok(None);
2354 };
2355 if Self::is_suppressed_by_range_tombstones(
2356 super_version,
2357 key,
2358 entry.key.seqno,
2359 seqno,
2360 comparator,
2361 ) {
2362 return Ok(None);
2363 }
2364 return Ok(Some((entry.key.value_type, entry.key.seqno, entry.value)));
2365 }
2366
2367 let key_hash = crate::hash::hash64(key);
2368 let entry = Self::find_in_tables::<TableValue>(
2369 &super_version.version,
2370 key,
2371 seqno,
2372 key_hash,
2373 comparator,
2374 )?;
2375 if let Some((value_type, entry_seqno, value)) = entry {
2376 if Self::is_suppressed_by_range_tombstones(
2377 super_version,
2378 key,
2379 entry_seqno,
2380 seqno,
2381 comparator,
2382 ) {
2383 return Ok(None);
2384 }
2385 return Ok(Some((value_type, entry_seqno, value)));
2386 }
2387
2388 Ok(None)
2389 }
2390
2391 pub(crate) fn is_suppressed_by_range_tombstones(
2394 super_version: &SuperVersion,
2395 key: &[u8],
2396 key_seqno: SeqNo,
2397 read_seqno: SeqNo,
2398 comparator: &dyn crate::comparator::UserComparator,
2399 ) -> bool {
2400 if super_version
2403 .active_memtable
2404 .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
2405 {
2406 return true;
2407 }
2408
2409 for mt in super_version.sealed_memtables.iter().rev() {
2411 if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
2412 return true;
2413 }
2414 }
2415
2416 for table in super_version
2423 .version
2424 .iter_levels()
2425 .flat_map(|lvl| lvl.iter())
2426 .flat_map(|run| run.iter())
2427 .filter(|t| !t.range_tombstones().is_empty())
2428 .filter(|t| {
2429 let kr = &t.metadata.key_range;
2431 comparator.compare(kr.min(), key) != core::cmp::Ordering::Greater
2432 && comparator.compare(key, kr.max()) != core::cmp::Ordering::Greater
2433 })
2434 {
2435 let rts = table.range_tombstones();
2436
2437 let candidate_end = rts.partition_point(|rt| {
2440 comparator.compare(&rt.start, key) != core::cmp::Ordering::Greater
2441 });
2442
2443 for rt in rts.iter().take(candidate_end) {
2444 if rt.visible_at(read_seqno)
2446 && comparator.compare(&rt.start, key) != core::cmp::Ordering::Greater
2447 && comparator.compare(key, &rt.end) == core::cmp::Ordering::Less
2448 && key_seqno < rt.seqno
2449 {
2450 return true;
2451 }
2452 }
2453 }
2454
2455 false
2456 }
2457
2458 fn resolve_entry(
2464 super_version: &SuperVersion,
2465 key: &[u8],
2466 entry: Option<InternalValue>,
2467 seqno: SeqNo,
2468 merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
2469 comparator: &dyn crate::comparator::UserComparator,
2470 ) -> crate::Result<Option<UserValue>> {
2471 let Some(entry) = entry else {
2472 return Ok(None);
2473 };
2474 Self::resolve_pinned_entry(
2475 super_version,
2476 key,
2477 entry,
2478 seqno,
2479 merge_operator,
2480 comparator,
2481 crate::PinnableSlice::owned,
2482 )
2483 .map(|opt| opt.map(crate::PinnableSlice::into_value))
2484 }
2485
2486 #[expect(
2497 clippy::indexing_slicing,
2498 reason = "remaining/miss_keys carry batch-local key indices < keys.len()"
2499 )]
2500 pub(crate) fn dedup_sorted_miss_keys<K: AsRef<[u8]>>(
2501 remaining: &[usize],
2502 keys: &[K],
2503 comparator: &dyn crate::comparator::UserComparator,
2504 ) -> DedupedMissKeys {
2505 let mut miss_keys: Vec<(usize, u64)> = Vec::with_capacity(remaining.len());
2506 let mut duplicates: Vec<(usize, usize)> = Vec::new();
2507 for &idx in remaining {
2508 let key = keys[idx].as_ref();
2509 match miss_keys.last() {
2510 Some(&(rep_idx, _))
2511 if comparator.compare(keys[rep_idx].as_ref(), key)
2512 == core::cmp::Ordering::Equal =>
2513 {
2514 duplicates.push((idx, rep_idx));
2515 }
2516 _ => miss_keys.push((idx, crate::hash::hash64(key))),
2517 }
2518 }
2519 (miss_keys, duplicates)
2520 }
2521
2522 #[expect(
2527 clippy::indexing_slicing,
2528 reason = "duplicate/representative indices are batch-local key indices < entries.len()"
2529 )]
2530 pub(crate) fn fan_out_duplicates(
2531 duplicates: &[(usize, usize)],
2532 internal_entries: &mut [Option<InternalValue>],
2533 ) {
2534 for &(dup_idx, rep_idx) in duplicates {
2535 let resolved = internal_entries[rep_idx].clone();
2536 internal_entries[dup_idx] = resolved;
2537 }
2538 }
2539
2540 #[expect(
2548 clippy::indexing_slicing,
2549 reason = "miss_keys entries carry batch-local indices; callers must pass a results slice aligned with keys"
2550 )]
2551 pub(crate) fn batch_get_from_tables<K: AsRef<[u8]>>(
2552 version: &Version,
2553 keys: &[K],
2554 miss_keys: Vec<(usize, u64)>,
2555 seqno: SeqNo,
2556 comparator: &dyn crate::comparator::UserComparator,
2557 fs: &dyn crate::fs::Fs,
2558 results: &mut [Option<InternalValue>],
2559 ) -> crate::Result<()> {
2560 debug_assert_eq!(results.len(), keys.len());
2561 debug_assert!(miss_keys.iter().all(|&(i, _)| i < keys.len()));
2562
2563 let mut still_remaining = miss_keys;
2565
2566 for (level_idx, level) in version.iter_levels().enumerate() {
2567 if still_remaining.is_empty() {
2568 break;
2569 }
2570
2571 if Self::prewarm_level_cross_sst(fs, level, &still_remaining, keys, seqno, comparator)
2580 && Self::resolve_level_chunked(
2581 fs,
2582 level,
2583 &mut still_remaining,
2584 keys,
2585 seqno,
2586 comparator,
2587 results,
2588 )?
2589 {
2590 continue;
2591 }
2592
2593 if level_idx == 0 {
2594 let mut at_ceiling = vec![false; keys.len()];
2599
2600 for run in level.iter() {
2601 let resolved = Self::resolve_run_batched(
2605 run,
2606 &still_remaining,
2607 keys,
2608 seqno,
2609 comparator,
2610 |idx| at_ceiling[idx],
2611 )?;
2612 for (idx, _hash, item) in resolved.covered {
2613 let Some(item) = item else { continue };
2614 match &results[idx] {
2615 Some(current) if current.key.seqno >= item.key.seqno => {}
2616 _ => {
2617 if item.key.seqno.checked_add(1) == Some(seqno) {
2618 at_ceiling[idx] = true;
2619 }
2620 results[idx] = Some(item);
2621 }
2622 }
2623 }
2624 }
2627
2628 still_remaining.retain(|&(idx, _)| results[idx].is_none());
2630 } else {
2631 let mut covered_miss: Vec<(usize, u64)> = Vec::new();
2636
2637 for run in level.iter() {
2638 let resolved = Self::resolve_run_batched(
2639 run,
2640 &still_remaining,
2641 keys,
2642 seqno,
2643 comparator,
2644 |_| false,
2645 )?;
2646 for (idx, hash, item) in resolved.covered {
2647 if let Some(item) = item {
2648 results[idx] = Some(item);
2649 } else {
2650 covered_miss.push((idx, hash));
2653 }
2654 }
2655 still_remaining = resolved.not_covered;
2656 }
2657
2658 let needs_sort = !covered_miss.is_empty();
2662 still_remaining.extend(covered_miss);
2663 if needs_sort {
2664 still_remaining.sort_by(|&(a, _), &(b, _)| {
2665 comparator.compare(keys[a].as_ref(), keys[b].as_ref())
2666 });
2667 }
2668 }
2669 }
2670
2671 Ok(())
2672 }
2673
    /// Probes one run for the keys in `remaining` and splits them into
    /// `covered` (some table of the run covers the key; carries the lookup
    /// result, which may be `None`) and `not_covered` (no table of the run
    /// covers the key).
    ///
    /// `remaining` holds `(index into keys, key hash)` pairs. Consecutive keys
    /// that map to the same table are sent to that table in a single
    /// `batch_get` call. Keys for which `skip` returns `true` are left out of
    /// both outputs.
    ///
    /// # Errors
    ///
    /// Propagates errors from `Table::batch_get`.
    #[expect(
        clippy::indexing_slicing,
        reason = "i < remaining.len() is loop-checked; idx values are valid key indices (caller's keys/results are aligned, same as batch_get_from_tables)"
    )]
    fn resolve_run_batched<K: AsRef<[u8]>>(
        run: &crate::version::Run<crate::Table>,
        remaining: &[(usize, u64)],
        keys: &[K],
        seqno: SeqNo,
        comparator: &dyn crate::comparator::UserComparator,
        skip: impl Fn(usize) -> bool,
    ) -> crate::Result<RunResolve> {
        let mut covered: Vec<CoveredKey> = Vec::new();
        let mut not_covered: Vec<(usize, u64)> = Vec::new();

        // `i` is a cursor over `remaining`, advanced by both loops below.
        let mut i = 0;
        while i < remaining.len() {
            let (idx, hash) = remaining[i];
            if skip(idx) {
                i += 1;
                continue;
            }
            let key = keys[idx].as_ref();
            let Some(table) = run.get_for_key_cmp(key, comparator) else {
                not_covered.push((idx, hash));
                i += 1;
                continue;
            };

            // Gather the maximal stretch of keys (starting at `i`) that map to
            // this same table; `batch` feeds the table, `batch_keys` remembers
            // which key index each position belongs to.
            let table_id = table.id();
            let mut batch: Vec<(&[u8], u64)> = Vec::new();
            let mut batch_keys: Vec<(usize, u64)> = Vec::new();
            while i < remaining.len() {
                let (jdx, jhash) = remaining[i];
                if skip(jdx) {
                    i += 1;
                    continue;
                }
                let jkey = keys[jdx].as_ref();
                match run.get_for_key_cmp(jkey, comparator) {
                    Some(t) if t.id() == table_id => {
                        batch.push((jkey, jhash));
                        batch_keys.push((jdx, jhash));
                        i += 1;
                    }
                    // Different table or no table: leave `i` for the outer loop.
                    _ => break,
                }
            }

            // Results are paired with keys by position.
            // NOTE(review): assumes `batch_get` yields one item per input key,
            // in input order — confirm against `Table::batch_get`.
            for ((kidx, khash), item) in batch_keys.into_iter().zip(table.batch_get(&batch, seqno)?)
            {
                covered.push((kidx, khash, item));
            }
        }

        Ok(RunResolve {
            covered,
            not_covered,
        })
    }
2749
    /// Plans the blocks that the keys in `remaining` would need across all
    /// tables of `level` and, when the plan is small enough, reads them in one
    /// batched request and hands the buffers to each table for decoding.
    ///
    /// Returns `true` only when the planned bytes exceed half of the first
    /// planned table's `cache_capacity()`; in that case nothing is read. Every
    /// other outcome returns `false`: fewer than two planned blocks, a zero
    /// cache capacity, a failed batched read, or a completed prewarm.
    #[expect(
        clippy::indexing_slicing,
        reason = "planned[ti] and all_buffers[k..end] indices are built from `planned` itself, so they are in range by construction"
    )]
    fn prewarm_level_cross_sst<K: AsRef<[u8]>>(
        fs: &dyn crate::fs::Fs,
        level: &crate::version::Level,
        remaining: &[(usize, u64)],
        keys: &[K],
        seqno: SeqNo,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> bool {
        // One entry per table that produced a plan: the table, the file to read
        // from, and the block handles to fetch.
        let mut planned: Vec<(
            &crate::Table,
            Arc<dyn crate::fs::FsFile>,
            Vec<crate::table::BlockHandle>,
        )> = Vec::new();
        for run in level.iter() {
            let mut i = 0;
            while i < remaining.len() {
                let (idx, _) = remaining[i];
                let key = keys[idx].as_ref();
                let Some(table) = run.get_for_key_cmp(key, comparator) else {
                    i += 1;
                    continue;
                };
                // Group the consecutive keys that map to this same table.
                let table_id = table.id();
                let mut batch: Vec<(&[u8], u64)> = Vec::new();
                while i < remaining.len() {
                    let (jdx, jhash) = remaining[i];
                    let jkey = keys[jdx].as_ref();
                    match run.get_for_key_cmp(jkey, comparator) {
                        Some(t) if t.id() == table_id => {
                            batch.push((jkey, jhash));
                            i += 1;
                        }
                        _ => break,
                    }
                }
                if let Some((file, handles)) = table.plan_prewarm(&batch, seqno) {
                    planned.push((table, file, handles));
                }
            }
        }

        // NOTE(review): "cold" presumably means blocks not currently cached —
        // confirm against `Table::plan_prewarm`.
        let total_cold: usize = planned.iter().map(|(_, _, h)| h.len()).sum();
        if total_cold < 2 {
            return false;
        }
        let Some((first_table, _, _)) = planned.first() else {
            return false;
        };
        let cap = first_table.cache_capacity();
        let total_bytes: u64 = planned
            .iter()
            .flat_map(|(_, _, h)| h.iter().map(|x| u64::from(x.size())))
            .sum();
        if cap == 0 {
            return false;
        }
        // Too large relative to the cache: skip the read and report `true`.
        if total_bytes > cap / 2 {
            return true;
        }

        // One zeroed buffer per planned block, in `planned` order.
        let mut all_buffers: Vec<Vec<u8>> = planned
            .iter()
            .flat_map(|(_, _, handles)| handles.iter().map(|h| vec![0u8; h.size() as usize]))
            .collect();
        // Parallel list of (table position, block offset), in the same order
        // as `all_buffers`.
        let flat: Vec<(usize, u64)> = planned
            .iter()
            .enumerate()
            .flat_map(|(ti, (_, _, handles))| handles.iter().map(move |h| (ti, *h.offset())))
            .collect();

        {
            // Scoped so the mutable borrows of the buffers end before decoding.
            let mut reqs: Vec<crate::fs::BlockRead<'_>> = flat
                .iter()
                .zip(all_buffers.iter_mut())
                .map(|(&(ti, offset), buf)| crate::fs::BlockRead {
                    file: planned[ti].1.as_ref(),
                    offset,
                    buf: buf.as_mut_slice(),
                })
                .collect();
            // A failed batched read is not an error here; it just reports `false`.
            if fs.read_blocks_batched(&mut reqs).is_err() {
                return false;
            }
        }

        // Hand each table its own contiguous slice of buffers.
        let mut k = 0;
        for (table, _, handles) in &planned {
            let end = k + handles.len();
            table.decode_prewarmed(handles, &all_buffers[k..end]);
            k = end;
        }
        false
    }
2872
2873 #[expect(
2882 clippy::indexing_slicing,
2883 reason = "i < remaining.len() loop-checked; idx/jdx are valid key indices; batch_idx[pos] is in range (pos came from this table's own plan)"
2884 )]
2885 fn plan_level_block_tasks<'a, K: AsRef<[u8]>>(
2886 level: &'a crate::version::Level,
2887 remaining: &[(usize, u64)],
2888 keys: &[K],
2889 seqno: SeqNo,
2890 comparator: &dyn crate::comparator::UserComparator,
2891 ) -> crate::Result<Vec<BlockTask<'a>>> {
2892 let mut tasks: Vec<BlockTask<'a>> = Vec::new();
2893 for run in level.iter() {
2894 let mut i = 0;
2895 while i < remaining.len() {
2896 let (idx, _) = remaining[i];
2897 let key = keys[idx].as_ref();
2898 let Some(table) = run.get_for_key_cmp(key, comparator) else {
2899 i += 1;
2900 continue;
2901 };
2902 let table_id = table.id();
2903 let mut batch: Vec<(&[u8], u64)> = Vec::new();
2904 let mut batch_idx: Vec<usize> = Vec::new();
2905 while i < remaining.len() {
2906 let (jdx, jhash) = remaining[i];
2907 let jkey = keys[jdx].as_ref();
2908 match run.get_for_key_cmp(jkey, comparator) {
2909 Some(t) if t.id() == table_id => {
2910 batch.push((jkey, jhash));
2911 batch_idx.push(jdx);
2912 i += 1;
2913 }
2914 _ => break,
2915 }
2916 }
2917 if let Some((file, table_seqno, special, blocks)) =
2918 table.plan_block_tasks(&batch, seqno)?
2919 {
2920 for (handle, positions) in blocks {
2921 let task_keys: Vec<usize> =
2922 positions.iter().map(|&pos| batch_idx[pos]).collect();
2923 tasks.push(BlockTask {
2924 table,
2925 file: Arc::clone(&file),
2926 handle,
2927 table_seqno,
2928 special,
2929 keys: task_keys,
2930 });
2931 }
2932 }
2933 }
2934 }
2935 Ok(tasks)
2936 }
2937
2938 #[expect(
2947 clippy::indexing_slicing,
2948 reason = "start/end stay within tasks by construction"
2949 )]
2950 fn resolve_level_chunked<K: AsRef<[u8]>>(
2951 fs: &dyn crate::fs::Fs,
2952 level: &crate::version::Level,
2953 still_remaining: &mut Vec<(usize, u64)>,
2954 keys: &[K],
2955 seqno: SeqNo,
2956 comparator: &dyn crate::comparator::UserComparator,
2957 results: &mut [Option<InternalValue>],
2958 ) -> crate::Result<bool> {
2959 let tasks = Self::plan_level_block_tasks(level, still_remaining, keys, seqno, comparator)?;
2960 let Some(first) = tasks.first() else {
2961 return Ok(false);
2962 };
2963 if tasks.iter().any(|t| t.special) {
2969 return Ok(false);
2970 }
2971 let budget = (first.table.cache_capacity() / 2).max(1);
2976
2977 let mut start = 0;
2978 while start < tasks.len() {
2979 let mut bytes = 0u64;
2980 let mut end = start;
2981 while end < tasks.len() {
2982 let sz = u64::from(tasks[end].handle.size());
2983 if end > start && bytes + sz > budget {
2984 break;
2985 }
2986 bytes += sz;
2987 end += 1;
2988 }
2989 Self::resolve_block_task_chunk(fs, &tasks[start..end], keys, results)?;
2990 start = end;
2991 }
2992 still_remaining.retain(|&(idx, _)| results[idx].is_none());
2993 Ok(true)
2994 }
2995
2996 #[expect(
3001 clippy::indexing_slicing,
3002 reason = "buffers is built from chunk so indices align; key indices are valid (caller's keys/results aligned)"
3003 )]
3004 fn resolve_block_task_chunk<K: AsRef<[u8]>>(
3005 fs: &dyn crate::fs::Fs,
3006 chunk: &[BlockTask<'_>],
3007 keys: &[K],
3008 results: &mut [Option<InternalValue>],
3009 ) -> crate::Result<()> {
3010 let mut buffers: Vec<Vec<u8>> = chunk
3011 .iter()
3012 .map(|t| vec![0u8; t.handle.size() as usize])
3013 .collect();
3014 {
3015 let mut reqs: Vec<crate::fs::BlockRead<'_>> = chunk
3016 .iter()
3017 .zip(buffers.iter_mut())
3018 .map(|(t, buf)| crate::fs::BlockRead {
3019 file: t.file.as_ref(),
3020 offset: *t.handle.offset(),
3021 buf: buf.as_mut_slice(),
3022 })
3023 .collect();
3024 fs.read_blocks_batched(&mut reqs)?;
3025 }
3026
3027 for (task, buf) in chunk.iter().zip(buffers.iter()) {
3028 if let Some(block) = task.table.decode_data_block_from_bytes(buf)? {
3029 for &kidx in &task.keys {
3030 if let Some(item) = task.table.point_read_translated(
3031 &block,
3032 keys[kidx].as_ref(),
3033 task.table_seqno,
3034 )? {
3035 Self::keep_highest(results, kidx, item);
3036 }
3037 }
3038 }
3039 }
3040 Ok(())
3041 }
3042
3043 #[expect(
3047 clippy::indexing_slicing,
3048 reason = "idx is a valid key index aligned with results"
3049 )]
3050 fn keep_highest(results: &mut [Option<InternalValue>], idx: usize, item: InternalValue) {
3051 match &results[idx] {
3052 Some(current) if current.key.seqno >= item.key.seqno => {}
3053 _ => results[idx] = Some(item),
3054 }
3055 }
3056
3057 fn get_internal_entry_from_tables(
3058 version: &Version,
3059 key: &[u8],
3060 seqno: SeqNo,
3061 comparator: &dyn crate::comparator::UserComparator,
3062 ) -> crate::Result<Option<InternalValue>> {
3063 let key_hash = crate::hash::hash64(key);
3064 Self::find_in_tables::<TableEntry>(version, key, seqno, key_hash, comparator)
3065 }
3066
3067 fn find_in_tables<T: TablePointLookup>(
3073 version: &Version,
3074 key: &[u8],
3075 seqno: SeqNo,
3076 key_hash: u64,
3077 comparator: &dyn crate::comparator::UserComparator,
3078 ) -> crate::Result<Option<T>> {
3079 for (level_idx, level) in version.iter_levels().enumerate() {
3080 if level_idx == 0 {
3081 let mut best: Option<T> = None;
3082
3083 for run in level.iter() {
3084 if let Some(table) = run.get_for_key_cmp(key, comparator)
3085 && let Some(item) = T::lookup(table, key, seqno, key_hash)?
3086 {
3087 match &best {
3088 Some(current) if current.entry_seqno() >= item.entry_seqno() => {}
3089 _ => {
3090 if item.entry_seqno().checked_add(1) == Some(seqno) {
3094 return Ok(item.filter_tombstone());
3095 }
3096 best = Some(item);
3097 }
3098 }
3099 }
3100 }
3101
3102 if let Some(entry) = best {
3103 return Ok(entry.filter_tombstone());
3104 }
3105 } else {
3106 for run in level.iter() {
3110 if let Some(table) = run.get_for_key_cmp(key, comparator) {
3111 if let Some(item) = T::lookup(table, key, seqno, key_hash)? {
3112 return Ok(item.filter_tombstone());
3113 }
3114 break;
3115 }
3116 }
3117 }
3118 }
3119
3120 Ok(None)
3121 }
3122
3123 pub(crate) fn get_internal_entry_from_sealed_memtables(
3124 super_version: &SuperVersion,
3125 key: &[u8],
3126 seqno: SeqNo,
3127 ) -> Option<InternalValue> {
3128 for mt in super_version.sealed_memtables.iter().rev() {
3129 if let Some(entry) = mt.get(key, seqno) {
3130 return Some(entry);
3131 }
3132 }
3133
3134 None
3135 }
3136
3137 pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
3138 self.version_history.read().get_version_for_snapshot(seqno)
3139 }
3140
3141 fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
3152 range: &R,
3153 ) -> (OwnedBounds, bool) {
3154 use Bound::{Excluded, Included, Unbounded};
3155
3156 let start = match range.start_bound() {
3157 Included(key) => Included(Slice::from(key.as_ref())),
3158 Excluded(key) => Excluded(Slice::from(key.as_ref())),
3159 Unbounded => Unbounded,
3160 };
3161
3162 let end = match range.end_bound() {
3163 Included(key) => Included(Slice::from(key.as_ref())),
3164 Excluded(key) => Excluded(Slice::from(key.as_ref())),
3165 Unbounded => Unbounded,
3166 };
3167
3168 let is_empty =
3169 if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
3170 lo.as_ref() > hi.as_ref()
3171 } else {
3172 false
3173 };
3174
3175 (OwnedBounds { start, end }, is_empty)
3176 }
3177
/// Opens the tree stored at `config.path`, recovering it if a `current`
/// pointer can be read and creating a fresh tree otherwise.
///
/// # Errors
///
/// * `Error::PageEccUnsupported` if page ECC is requested (in `config` or its
///   initial runtime config) but the `page_ecc` feature is not compiled in.
/// * `Error::InvalidVersion` if a `version` file exists in the directory
///   (treated as a V1 database).
/// * An `InvalidData` I/O error if `current` is missing while
///   `has_existing_version_state` still finds version artifacts.
/// * Any error from directory locking, recovery, or creation.
pub(crate) fn open(config: Config) -> crate::Result<Self> {
    log::debug!("Opening LSM-tree at {}", config.path.display());

    // With the `parallel` feature, install a Rayon-backed compaction pool when
    // none was supplied and more than one compaction thread is configured.
    #[cfg(feature = "parallel")]
    let config = {
        let mut config = config;
        if config.compaction_pool.is_none() && config.compaction_threads > 1 {
            config.compaction_pool = Some(Arc::new(
                crate::table::writer::RayonSpawner::with_threads(config.compaction_threads)?,
            ));
        }
        config
    };

    // Refuse page ECC requests when the feature is not part of this build.
    if (config.page_ecc || config.initial_runtime_config.page_ecc)
        && !cfg!(feature = "page_ecc")
    {
        return Err(crate::Error::PageEccUnsupported);
    }

    // Make sure the directory exists before trying to lock it.
    #[cfg(feature = "std")]
    let directory_lock = {
        config.fs.create_dir_all(&config.path)?;
        crate::config::acquire_directory_lock(&*config.fs, &config.path, config.directory_lock)?
    };

    // A `version` file is treated as the marker of a V1 database.
    if config.fs.exists(&config.path.join("version"))? {
        log::error!(
            "It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated."
        );
        return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
    }

    // The value of the current-version lookup is not used here; only whether
    // it succeeds decides between recovery and creation.
    let tree = match crate::version::recovery::get_current_version(
        &config.path,
        &*config.fs,
        config.encryption.clone(),
    ) {
        Ok(_) => Self::recover(
            config,
            #[cfg(feature = "std")]
            directory_lock,
        ),
        Err(crate::Error::Io(e)) if e.kind() == crate::io::ErrorKind::NotFound => {
            // No `current` pointer: only create a new tree if the directory
            // does not already hold version artifacts.
            if has_existing_version_state(&config.path, &*config.fs)? {
                let msg = format!(
                    "Tree::open: refusing to recover {} — `current` pointer is missing \
                     but the directory still holds version artifacts (tables/, blobs/, \
                     or vN). This is the on-disk signature of a half-written checkpoint \
                     or interrupted sealing. Remove the partial directory and retry the \
                     checkpoint, or restore `current` from a backup before reopening.",
                    config.path.display(),
                );
                log::error!("{msg}");
                return Err(crate::Error::from(crate::io::Error::new(
                    crate::io::ErrorKind::InvalidData,
                    msg,
                )));
            }
            Self::create_new(
                config,
                #[cfg(feature = "std")]
                directory_lock,
            )
        }
        // Any other failure reading the current version is passed through.
        Err(e) => Err(e),
    }?;

    Ok(tree)
}
3305
3306 #[doc(hidden)]
3308 #[must_use]
3309 pub fn is_compacting(&self) -> bool {
3310 !self.compaction_state.lock().hidden_set().is_empty()
3311 }
3312
3313 fn probe_disk_free(&self) -> u64 {
3329 self.0.config.min_available_space()
3330 }
3331
3332 pub(crate) fn admission_capacity(&self, used: u64) -> (Option<u64>, Option<u64>, bool) {
3343 let quota = self
3344 .0
3345 .runtime_config
3346 .load()
3347 .storage_limit_bytes
3348 .unwrap_or(u64::MAX);
3349 let free = self.probe_disk_free();
3350 let capacity = if free == u64::MAX {
3355 quota
3356 } else {
3357 quota.min(free + used)
3358 };
3359 if capacity == u64::MAX {
3360 return (None, None, true);
3361 }
3362 let available = capacity.saturating_sub(used);
3366 (
3367 Some(capacity),
3368 Some(available),
3369 available >= MIN_RESERVED_HEADROOM,
3370 )
3371 }
3372
3373 pub(crate) fn quota_headroom(&self, used: u64) -> u64 {
3381 self.0
3382 .runtime_config
3383 .load()
3384 .storage_limit_bytes
3385 .map_or(u64::MAX, |limit| limit.saturating_sub(used))
3386 }
3387
3388 pub(crate) fn storage_admission_enabled(&self) -> bool {
3393 self.0.runtime_config.load().storage_admission_check
3394 }
3395
3396 #[expect(
3397 clippy::significant_drop_tightening,
3398 reason = "the admission cache lock intentionally spans the recompute \
3399 (stat + disk-free probe) so concurrent admission checks \
3400 coalesce on a single probe rather than each issuing a syscall"
3401 )]
3402 fn compute_write_admission(&self) -> crate::Result<()> {
3403 let rc = self.0.runtime_config.load();
3404 if !rc.storage_admission_check {
3405 return Ok(());
3406 }
3407
3408 let super_version = self.version_history.read().latest_version();
3415 let vid = super_version.version.id();
3416
3417 let now = crate::time::Instant::now();
3440 let (used, disk_free) = {
3441 let mut cache = self.0.admission_used_cache.lock();
3442 match *cache {
3443 Some((cvid, used, free, at))
3447 if cvid == vid
3448 && cfg!(feature = "std")
3449 && at.elapsed() < ADMISSION_DISK_FREE_TTL =>
3450 {
3451 (used, free)
3452 }
3453 Some((cvid, used, _, _)) if cvid == vid => {
3455 let free = self.probe_disk_free();
3456 *cache = Some((vid, used, free, now));
3457 (used, free)
3458 }
3459 _ => {
3461 let used = crate::storage_stats::compute_used_bytes(&super_version.version)?;
3462 let free = self.probe_disk_free();
3463 *cache = Some((vid, used, free, now));
3464 (used, free)
3465 }
3466 }
3467 };
3468
3469 let quota = rc.storage_limit_bytes.unwrap_or(u64::MAX);
3487 let limit = if disk_free == u64::MAX {
3492 quota
3493 } else {
3494 quota.min(disk_free + used)
3495 };
3496 if limit == u64::MAX {
3498 return Ok(());
3499 }
3500
3501 let pending_memtable_bytes: u64 = super_version.active_memtable.size()
3515 + super_version
3516 .sealed_memtables
3517 .iter()
3518 .map(|m| m.size())
3519 .sum::<u64>();
3520
3521 let reserved =
3522 (pending_memtable_bytes + pending_memtable_bytes / 8).max(MIN_RESERVED_HEADROOM);
3523 match used.checked_add(reserved) {
3527 Some(total) if total <= limit => Ok(()),
3528 _ => Err(crate::Error::StorageFull { used, limit }),
3529 }
3530 }
3531
3532 fn inner_compact(
3533 &self,
3534 strategy: Arc<dyn CompactionStrategy>,
3535 mvcc_gc_watermark: SeqNo,
3536 ) -> crate::Result<crate::compaction::CompactionResult> {
3537 use crate::compaction::worker::{Options, do_compaction};
3538
3539 let mut opts = Options::from_tree(self, strategy);
3540 opts.mvcc_gc_watermark = mvcc_gc_watermark;
3541
3542 let result = do_compaction(&opts)?;
3543
3544 log::debug!("Compaction run over");
3545
3546 Ok(result)
3547 }
3548
3549 #[doc(hidden)]
3550 #[must_use]
3551 pub fn create_iter(
3552 &self,
3553 seqno: SeqNo,
3554 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3555 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
3556 self.create_range::<UserKey, _>(&.., seqno, ephemeral)
3557 }
3558
3559 #[doc(hidden)]
3560 pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
3561 &self,
3562 range: &'a R,
3563 seqno: SeqNo,
3564 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3565 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
3566 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
3567
3568 Self::create_internal_range(
3569 super_version,
3570 range,
3571 seqno,
3572 ephemeral,
3573 self.config.merge_operator.clone(),
3574 self.config.comparator.clone(),
3575 )
3576 .map(|item| match item {
3577 Ok(kv) => Ok((kv.key.user_key, kv.value)),
3578 Err(e) => Err(e),
3579 })
3580 }
3581
3582 #[doc(hidden)]
3585 #[must_use]
3586 pub fn create_seekable_range_bounds(
3587 &self,
3588 lo: Bound<UserKey>,
3589 hi: Bound<UserKey>,
3590 seqno: SeqNo,
3591 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3592 ) -> crate::range::SeekableTreeIter {
3593 use crate::range::{IterState, SeekableTreeIter};
3594
3595 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
3596
3597 let iter_state = IterState {
3598 version: super_version,
3599 ephemeral,
3600 merge_operator: self.config.merge_operator.clone(),
3601 comparator: self.config.comparator.clone(),
3602 prefix_hash: None,
3603 key_hash: None,
3604 bloom_key: None,
3605 #[cfg(feature = "metrics")]
3606 metrics: Some(self.0.metrics.clone()),
3607 };
3608
3609 SeekableTreeIter::create(iter_state, lo, hi, seqno)
3610 }
3611
3612 #[doc(hidden)]
3613 pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
3614 &self,
3615 prefix: K,
3616 seqno: SeqNo,
3617 ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3618 ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
3619 use crate::prefix::compute_prefix_hash;
3620 use crate::range::{IterState, TreeIter, prefix_to_range};
3621
3622 let prefix_bytes = prefix.as_ref();
3623
3624 let prefix_hash = compute_prefix_hash(self.config.prefix_extractor.as_ref(), prefix_bytes);
3625
3626 let range = prefix_to_range(prefix_bytes);
3627
3628 let super_version = self.version_history.read().get_version_for_snapshot(seqno);
3629
3630 let iter_state = IterState {
3631 version: super_version,
3632 ephemeral,
3633 merge_operator: self.config.merge_operator.clone(),
3634 comparator: self.config.comparator.clone(),
3635 prefix_hash,
3636 key_hash: None,
3637 bloom_key: None,
3638 #[cfg(feature = "metrics")]
3639 metrics: Some(self.0.metrics.clone()),
3640 };
3641
3642 TreeIter::create_range(iter_state, range, seqno).map(|item| match item {
3643 Ok(kv) => Ok((kv.key.user_key, kv.value)),
3644 Err(e) => Err(e),
3645 })
3646 }
3647
3648 #[doc(hidden)]
3652 #[must_use]
3653 pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
3654 use crate::runtime_config::{KvChecksumComputePoint, KvChecksumPolicy};
3655
3656 let kv_digest = {
3668 let rc = self.0.runtime_config.load();
3669 if matches!(
3670 rc.kv_checksum_compute_point,
3671 KvChecksumComputePoint::AtInsert
3672 ) && !matches!(rc.kv_checksums, KvChecksumPolicy::Off)
3673 {
3674 crate::table::block::kv_checksum::kv_digest(&value, rc.kv_checksum_algo).map(|d| {
3675 #[expect(
3676 clippy::cast_possible_truncation,
3677 reason = "AtInsert is config-validated to a 4-byte algorithm; the digest fits u32"
3678 )]
3679 let lo = d as u32;
3680 (lo, rc.kv_checksum_algo)
3681 })
3682 } else {
3683 None
3684 }
3685 };
3686
3687 self.version_history
3692 .read()
3693 .latest_version()
3694 .active_memtable
3695 .insert_with_kv_digest(value, kv_digest)
3696 }
3697
3698 #[doc(hidden)]
3705 #[must_use]
3706 pub(crate) fn append_batch(&self, items: Vec<InternalValue>) -> (u64, u64) {
3707 use crate::runtime_config::{KvChecksumComputePoint, KvChecksumPolicy};
3708
3709 let kv_algo = {
3713 let rc = self.0.runtime_config.load();
3714 if matches!(
3715 rc.kv_checksum_compute_point,
3716 KvChecksumComputePoint::AtInsert
3717 ) && !matches!(rc.kv_checksums, KvChecksumPolicy::Off)
3718 {
3719 Some(rc.kv_checksum_algo)
3720 } else {
3721 None
3722 }
3723 };
3724
3725 self.version_history
3729 .read()
3730 .latest_version()
3731 .active_memtable
3732 .insert_batch_with_kv_algo(items, kv_algo)
3733 }
3734
/// Recovers an existing tree from the directory at `config.path`.
///
/// Steps, in order:
/// 1. read the current version id and decode the matching `v{id}` manifest;
/// 2. validate the manifest (format version must be `V5`, comparator name
///    must match the configured comparator) and adopt its `level_count`;
/// 3. recover the tables and blob files via `recover_levels`;
/// 4. check that the recovered tree type matches the requested one;
/// 5. assemble `TreeInner` and install the shared deletion-pause,
///    background-deleter (std only) and heal-hint handles on every recovered
///    table and blob file.
///
/// # Errors
///
/// * `Error::InvalidVersion` if the manifest is not format `V5`.
/// * `Error::ComparatorMismatch` if the stored and supplied comparator names
///   differ.
/// * `Error::Unrecoverable` if the tree type does not match the config.
/// * Any error from reading the manifest or recovering levels.
#[expect(
    clippy::too_many_lines,
    reason = "Tree::recover threads the whole open sequence (CURRENT validation, \
              Manifest decode, encryption + runtime plumbing, version recovery, \
              TreeInner assembly) — splitting it would create helper functions whose \
              only caller is this one site"
)]
fn recover(
    mut config: Config,
    #[cfg(feature = "std")] directory_lock: Option<Box<dyn crate::fs::FsFile>>,
) -> crate::Result<Self> {
    use crate::stop_signal::StopSignal;
    use inner::get_next_tree_id;

    log::info!("Recovering LSM-tree at {}", config.path.display());

    // Resolve which version snapshot is current.
    let snapshot_id = crate::version::recovery::get_current_version(
        &config.path,
        &*config.fs,
        config.encryption.clone(),
    )?;
    {
        // Decode the manifest stored in the `v{id}` archive. The reader is
        // opened with a default runtime config.
        let version_id = snapshot_id;
        let manifest_path = config.path.join(format!("v{version_id}"));
        let mut archive_reader = crate::manifest_blocks::reader::ManifestArchiveReader::open(
            &manifest_path,
            &*config.fs,
            alloc::sync::Arc::new(crate::runtime_config::RuntimeConfig::default()),
            config.encryption.clone(),
        )?;
        let manifest = Manifest::decode_from(&mut archive_reader)?;

        // Only format V5 can be recovered here.
        if !matches!(manifest.version, FormatVersion::V5) {
            return Err(crate::Error::InvalidVersion(manifest.version.into()));
        }

        // The tree must be reopened with the comparator it was created with.
        let supplied_name = config.comparator.name();
        if manifest.comparator_name != supplied_name {
            log::warn!(
                "Comparator mismatch: tree was created with {:?} but opened with {:?}",
                manifest.comparator_name,
                supplied_name,
            );
            return Err(crate::Error::ComparatorMismatch {
                stored: manifest.comparator_name,
                supplied: supplied_name,
            });
        }

        // The persisted level count overrides whatever the caller configured.
        config.level_count = manifest.level_count;
    }

    let tree_id = get_next_tree_id();

    #[cfg(feature = "metrics")]
    let metrics = Arc::new(Metrics::default());

    let version = Self::recover_levels(
        &config.path,
        tree_id,
        &config,
        #[cfg(feature = "metrics")]
        &metrics,
    )?;

    {
        // KV separation options present => the caller asked for a blob tree.
        let requested_tree_type = match config.kv_separation_opts {
            Some(_) => crate::TreeType::Blob,
            None => crate::TreeType::Standard,
        };

        if version.tree_type() != requested_tree_type {
            log::error!(
                "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                version.tree_type(),
            );
            return Err(crate::Error::Unrecoverable);
        }
    }

    // Highest table id on disk (0 when there are no tables); the table id
    // counter below is seeded with this value plus one.
    let highest_table_id = version
        .iter_tables()
        .map(Table::id)
        .max()
        .unwrap_or_default();

    let comparator = config.comparator.clone();

    // Handles shared between the tree and every recovered table/blob file.
    let deletion_pause = crate::deletion_pause::DeletionPause::new_shared();
    #[cfg(feature = "std")]
    let background_deleter = Arc::new(crate::BackgroundDeleter::new(None));
    let heal_hints =
        crate::heal_hints::HealHints::new_shared(config.initial_runtime_config.auto_heal);

    // Copied out before `config` is moved into the `Arc` below.
    let initial_runtime = config.initial_runtime_config.clone();
    let sync_mode = config.sync_mode;
    let super_versions = SuperVersions::new(
        version,
        &comparator,
        sync_mode,
        snapshot_id,
        config.manifest_log_rotate_bytes,
    );
    #[cfg(feature = "std")]
    let latest_super_version = super_versions.latest_handle();
    let inner = TreeInner {
        id: tree_id,
        memtable_id_counter: SequenceNumberCounter::new(1),
        table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
        // NOTE(review): starts at the default even when blob files were
        // recovered — presumably advanced elsewhere; confirm.
        blob_file_id_counter: SequenceNumberCounter::default(),
        version_history: Arc::new(RwLock::new(super_versions)),
        #[cfg(feature = "std")]
        latest_super_version,
        stop_signal: StopSignal::default(),
        config: Arc::new(config),
        major_compaction_lock: RwLock::default(),
        flush_lock: Mutex::default(),
        #[cfg(feature = "std")]
        _directory_lock: directory_lock,
        compaction_state: Arc::new(Mutex::new(CompactionState::default())),
        deletion_pause: Arc::clone(&deletion_pause),
        #[cfg(feature = "std")]
        background_deleter: Arc::clone(&background_deleter),
        heal_hints: Arc::clone(&heal_hints),
        runtime_config: Arc::new(crate::runtime_config::handle::RuntimeConfigHandle::new(
            initial_runtime,
        )),
        // The admission cache starts empty.
        admission_used_cache: Mutex::new(None),

        #[cfg(feature = "metrics")]
        metrics,
    };

    // Collect the recovered tables and blob files into owned vectors before
    // installing the shared handles on them.
    let version = inner.version_history.read().latest_version().version;
    let recovered_tables: Vec<Table> = version.iter_tables().cloned().collect();
    let recovered_blobs: Vec<BlobFile> = version.blob_files.iter().cloned().collect();

    for table in &recovered_tables {
        table.install_deletion_pause(Arc::clone(&deletion_pause));
        #[cfg(feature = "std")]
        table.install_background_deleter(Arc::clone(&background_deleter));
        table.install_heal_hints(Arc::clone(&heal_hints));
    }
    // Blob files receive the deletion pause and deleter, but no heal hints.
    for blob_file in &recovered_blobs {
        blob_file.install_deletion_pause(Arc::clone(&deletion_pause));
        #[cfg(feature = "std")]
        blob_file.install_background_deleter(Arc::clone(&background_deleter));
    }

    Ok(Self(Arc::new(inner)))
}
3923
3924 fn create_new(
3926 config: Config,
3927 #[cfg(feature = "std")] directory_lock: Option<Box<dyn crate::fs::FsFile>>,
3930 ) -> crate::Result<Self> {
3931 use crate::file::fsync_directory;
3932
3933 let path = config.path.clone();
3934 log::trace!("Creating LSM-tree at {}", path.display());
3935
3936 let sync_mode = config.sync_mode;
3937
3938 (*config.fs).create_dir_all(&path)?;
3939
3940 for (table_folder_path, folder_fs) in config.all_tables_folders() {
3945 folder_fs.create_dir_all(&table_folder_path)?;
3946 fsync_directory(&table_folder_path, &*folder_fs, sync_mode)?;
3947 if let Some(parent) = table_folder_path.parent() {
3948 fsync_directory(parent, &*folder_fs, sync_mode)?;
3949 if let Some(grandparent) = parent.parent() {
3950 fsync_directory(grandparent, &*folder_fs, sync_mode)?;
3951 }
3952 }
3953 }
3954
3955 fsync_directory(&path, &*config.fs, sync_mode)?;
3957
3958 let inner = TreeInner::create_new(
3959 config,
3960 #[cfg(feature = "std")]
3961 directory_lock,
3962 )?;
3963 Ok(Self(Arc::new(inner)))
3964 }
3965
/// Recovers the on-disk `Version` (tables and blob files) of the tree at
/// `tree_path`.
///
/// Reads the recovery record, scans every configured tables folder, recovers
/// each file that the record lists, and collects the files it does not list
/// as orphans. After the version is built, stale manifest files, orphaned
/// tables and orphaned blob files are deleted.
///
/// # Errors
///
/// * `Error::Unrecoverable` if a file name in a tables folder is not a valid
///   table id, or if fewer tables were found than the record lists.
/// * `Error::RouteMismatch` if tables are missing and none of the missing
///   tables' levels is covered by the configured `level_routes`.
/// * Any filesystem or decoding error from the steps above.
#[expect(
    clippy::too_many_lines,
    reason = "recovery logic is inherently complex"
)]
fn recover_levels<P: AsRef<Path>>(
    tree_path: P,
    tree_id: TreeId,
    config: &Config,
    #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
) -> crate::Result<Version> {
    use crate::{TableId, file::fsync_directory};

    let tree_path = tree_path.as_ref();

    let recovery = recover(
        tree_path,
        &*config.fs,
        config.manifest_recovery_mode,
        config.encryption.clone(),
    )?;

    // Remembered now because `recovery` is consumed by `Version::from_recovery`.
    let snapshot_id = recovery.snapshot_id;

    // Table id -> (level index, checksum, global seqno) for every table the
    // recovery record lists. Entries are removed as their files are found.
    let mut table_map = {
        let mut result: crate::HashMap<TableId, (u8 /* level index */, Checksum, SeqNo)> =
            crate::HashMap::default();

        for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
            for run in table_ids {
                for table in run {
                    #[expect(
                        clippy::expect_used,
                        reason = "there are always less than 256 levels"
                    )]
                    result.insert(
                        table.id,
                        (
                            level_idx
                                .try_into()
                                .expect("there are less than 256 levels"),
                            table.checksum,
                            table.global_seqno,
                        ),
                    );
                }
            }
        }

        result
    };

    let cnt = table_map.len();

    log::debug!("Recovering {cnt} tables from {}", tree_path.display());

    // Log progress every 1, 10 or 100 tables depending on the total.
    let progress_mod = match cnt {
        _ if cnt <= 20 => 1,
        _ if cnt <= 100 => 10,
        _ => 100,
    };

    let mut tables = vec![];
    // Ids already recovered, so a second file with the same id is recognised
    // as a duplicate rather than an orphan.
    let mut recovered_table_ids: crate::HashSet<TableId> = crate::HashSet::default();
    // Files not listed in the record; deleted only after the version is built.
    let mut orphaned_tables: Vec<(crate::path::PathBuf, Arc<dyn crate::fs::Fs>)> = vec![];

    let all_folders = config.all_tables_folders();

    for (table_base_folder, folder_fs) in &all_folders {
        // Create (and fsync) a missing tables folder.
        if !folder_fs.exists(table_base_folder)? {
            folder_fs.create_dir_all(table_base_folder)?;
            fsync_directory(table_base_folder, &**folder_fs, config.sync_mode)?;
            if let Some(parent) = table_base_folder.parent() {
                fsync_directory(parent, &**folder_fs, config.sync_mode)?;
                if let Some(grandparent) = parent.parent() {
                    fsync_directory(grandparent, &**folder_fs, config.sync_mode)?;
                }
            }
        }

        for dirent in folder_fs.read_dir(table_base_folder)? {
            let crate::fs::FsDirEntry {
                path: table_file_path,
                file_name,
                is_dir,
            } = dirent;

            // Presumably macOS Finder metadata — skipped, not a table.
            if file_name == ".DS_Store" {
                continue;
            }

            // Presumably macOS resource-fork (`._*`) files — skipped as well.
            if file_name.starts_with("._") {
                continue;
            }

            let table_file_name = &file_name;
            if is_dir {
                log::warn!(
                    "Skipping unexpected directory in tables folder: {}",
                    table_file_path.display()
                );
                continue;
            }

            // Every remaining file name must parse as a table id.
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some((level_idx, checksum, global_seqno)) = table_map.remove(&table_id) {
                // Pinning policies are looked up by the table's level.
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    table_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    folder_fs.clone(),
                    pin_filter,
                    pin_index,
                    config.encryption.clone(),
                    #[cfg(zstd_any)]
                    config.zstd_dictionary.clone(),
                    config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);
                recovered_table_ids.insert(table_id);

                if tables.len() % progress_mod == 0 {
                    log::debug!("Recovered {}/{cnt} tables", tables.len());
                }
            } else if recovered_table_ids.contains(&table_id) {
                // Listed id seen again in another location: keep the first.
                log::warn!(
                    "Skipping duplicate sighting of manifest table {table_id} in {}",
                    table_file_path.display(),
                );
            } else {
                orphaned_tables.push((table_file_path, folder_fs.clone()));
            }
        }
    }

    // Anything left in `table_map` is a listed table with no file on disk.
    if tables.len() < cnt {
        if let Some(routes) = &config.level_routes {
            // True when none of the missing tables' levels appears in any
            // configured route.
            let all_missing_uncovered = table_map
                .values()
                .all(|(level, _, _)| !routes.iter().any(|r| r.levels.contains(level)));

            if all_missing_uncovered {
                let found = tables.len();
                let missing_ids: Vec<_> = table_map.keys().collect();

                log::error!(
                    "Route mismatch: expected {cnt} tables but found {found} — \
                     level_routes do not cover all previously used levels. \
                     Missing table IDs: {missing_ids:?}",
                );
                return Err(crate::Error::RouteMismatch {
                    expected: cnt,
                    found,
                });
            }
        }

        log::error!(
            "Recovered less tables than expected: {:?}",
            table_map.keys(),
        );
        return Err(crate::Error::Unrecoverable);
    }

    log::debug!("Successfully recovered {} tables", tables.len());

    let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
        &tree_path.join(crate::file::BLOBS_FOLDER),
        &recovery.blob_file_ids,
        tree_id,
        config.descriptor_table.as_ref(),
        &config.fs,
    )?;

    let version = Version::from_recovery(recovery, &tables, &blob_files)?;

    // Deletions happen only after the version has been built successfully.
    Self::cleanup_orphaned_version(tree_path, snapshot_id, &*config.fs)?;

    for (table_path, orphan_fs) in orphaned_tables {
        log::debug!("Deleting orphaned table {}", table_path.display());
        orphan_fs.remove_file(&table_path)?;
    }

    for blob_file_path in orphaned_blob_files {
        log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
        (*config.fs).remove_file(&blob_file_path)?;
    }

    Ok(version)
}
4209
4210 fn cleanup_orphaned_version(
4226 path: &Path,
4227 snapshot_id: crate::version::VersionId,
4228 fs: &dyn crate::fs::Fs,
4229 ) -> crate::Result<()> {
4230 let snapshot_str = format!("v{snapshot_id}");
4231 let log_str = format!("edits-{snapshot_id}");
4232
4233 for dirent in fs.read_dir(path)? {
4234 if dirent.is_dir {
4235 continue;
4236 }
4237
4238 let name = &dirent.file_name;
4239 let is_orphan_snapshot = name.starts_with('v') && *name != snapshot_str;
4240 let is_orphan_log = name.starts_with("edits-") && *name != log_str;
4241 if is_orphan_snapshot || is_orphan_log {
4242 log::trace!("Cleanup orphaned manifest file {name}");
4243 match fs.remove_file(&dirent.path) {
4244 Ok(()) => {}
4245 Err(e) if e.kind() == crate::io::ErrorKind::NotFound => {}
4246 Err(e) => return Err(e.into()),
4247 }
4248 }
4249 }
4250
4251 Ok(())
4252 }
4253}
4254
4255fn has_existing_version_state(folder: &Path, fs: &dyn Fs) -> crate::Result<bool> {
4266 if fs.exists(&folder.join(crate::file::TABLES_FOLDER))?
4267 || fs.exists(&folder.join(crate::file::BLOBS_FOLDER))?
4268 {
4269 return Ok(true);
4270 }
4271 let entries = match fs.read_dir(folder) {
4272 Ok(entries) => entries,
4273 Err(e) if e.kind() == crate::io::ErrorKind::NotFound => return Ok(false),
4274 Err(e) => return Err(e.into()),
4275 };
4276 for entry in entries {
4277 let name = &entry.file_name;
4278 if name.starts_with('v') && name.len() > 1 && name[1..].bytes().all(|c| c.is_ascii_digit())
4279 {
4280 return Ok(true);
4281 }
4282 }
4283 Ok(false)
4284}
4285
4286fn effective_lower_bound<'a>(
4293 lo: core::ops::Bound<&'a [u8]>,
4294 restriction: Option<&'a [u8]>,
4295 cmp: &dyn crate::comparator::UserComparator,
4296) -> core::ops::Bound<&'a [u8]> {
4297 use core::cmp::Ordering;
4298 use core::ops::Bound;
4299 match (lo, restriction) {
4300 (Bound::Unbounded, Some(rb)) => Bound::Included(rb),
4301 (Bound::Included(k) | Bound::Excluded(k), Some(rb))
4302 if cmp.compare(rb, k) == Ordering::Greater =>
4303 {
4304 Bound::Included(rb)
4305 }
4306 _ => lo,
4307 }
4308}
4309
4310#[cfg(test)]
4311mod cardinality_tests;
4312
4313#[cfg(all(test, feature = "metrics"))]
4314mod cache_stats_tests;