1mod gc;
6pub mod handle;
7pub mod ingest;
8
9#[doc(hidden)]
10pub use gc::{FragmentationEntry, FragmentationMap};
11
12use crate::path::{Path, PathBuf};
13use crate::tree::inner::{FlushGuard, VersionsWriteGuard};
14use crate::{
15 Cache, Config, Memtable, ScanSinceEvent, SeqNo, TableId, TreeId, UserKey, UserValue,
16 abstract_tree::{AbstractTree, RangeItem},
17 coding::Decode,
18 iter_guard::{IterGuard, IterGuardImpl},
19 table::Table,
20 tree::inner::MemtableId,
21 value::InternalValue,
22 version::Version,
23 vlog::{Accessor, BlobFile, BlobFileWriter},
24};
25use alloc::sync::Arc;
26#[cfg(not(feature = "std"))]
27use alloc::{boxed::Box, string::ToString, vec::Vec};
28use core::ops::RangeBounds;
29use handle::BlobIndirection;
30
/// Item yielded by blob-tree range, prefix, seekable and batch scans.
///
/// Carries the raw entry read from the index tree plus the tree handle and the
/// version snapshot it was read from. The value (which may be a blob
/// indirection) is only resolved when the caller asks for it via
/// `into_inner` / `into_inner_if`; `key` and `size` never touch blob files.
pub struct Guard {
    /// Tree the entry came from; supplies the tree id, blob folder, cache and
    /// kv-separation config used when resolving an indirection.
    tree: crate::BlobTree,

    /// Version the entry was read from; its blob files are consulted when
    /// resolving an indirection.
    version: Version,

    /// Raw index-tree entry, or the error the underlying iterator produced.
    kv: crate::Result<InternalValue>,
}
37
38impl IterGuard for Guard {
39 fn into_inner_if(
40 self,
41 pred: impl Fn(&UserKey) -> bool,
42 ) -> crate::Result<(UserKey, Option<UserValue>)> {
43 let kv = self.kv?;
44
45 if pred(&kv.key.user_key) {
46 resolve_value_handle(
47 self.tree.id(),
48 self.tree.blobs_folder.as_path(),
49 &self.tree.index.config.cache,
50 &self.version,
51 kv,
52 #[cfg(zstd_any)]
53 self.tree
54 .index
55 .config
56 .kv_separation_opts
57 .as_ref()
58 .and_then(|o| o.zstd_dictionary.as_deref()),
59 )
60 .map(|(k, v)| (k, Some(v)))
61 } else {
62 Ok((kv.key.user_key, None))
63 }
64 }
65
66 fn key(self) -> crate::Result<UserKey> {
67 self.kv.map(|kv| kv.key.user_key)
68 }
69
70 fn size(self) -> crate::Result<u32> {
71 let kv = self.kv?;
72
73 if kv.key.value_type.is_indirection() {
74 let mut cursor = crate::io::Cursor::new(kv.value);
75 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
76 } else {
77 #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
78 Ok(kv.value.len() as u32)
79 }
80 }
81
82 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
83 resolve_value_handle(
84 self.tree.id(),
85 self.tree.blobs_folder.as_path(),
86 &self.tree.index.config.cache,
87 &self.version,
88 self.kv?,
89 #[cfg(zstd_any)]
90 self.tree
91 .index
92 .config
93 .kv_separation_opts
94 .as_ref()
95 .and_then(|o| o.zstd_dictionary.as_deref()),
96 )
97 }
98}
99
100fn resolve_value_handle(
101 tree_id: TreeId,
102 blobs_folder: &Path,
103 cache: &Cache,
104 version: &Version,
105 item: InternalValue,
106 #[cfg(zstd_any)] zstd_dictionary: Option<&crate::compression::ZstdDictionary>,
107) -> RangeItem {
108 if item.key.value_type.is_indirection() {
109 let mut cursor = crate::io::Cursor::new(item.value);
110 let vptr = BlobIndirection::decode_from(&mut cursor)?;
111
112 let accessor = {
114 let a = Accessor::new(&version.blob_files);
115 #[cfg(zstd_any)]
116 let a = a.with_dict(zstd_dictionary);
117 a
118 };
119
120 match accessor.get(
121 tree_id,
122 blobs_folder,
123 &item.key.user_key,
124 &vptr.vhandle,
125 cache,
126 ) {
127 Ok(Some(v)) => {
128 let k = item.key.user_key;
129 Ok((k, v))
130 }
131 Ok(None) => {
132 panic!(
133 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
134 item.key.user_key,
135 vptr.vhandle,
136 version.id(),
137 );
138 }
139 Err(e) => Err(e),
140 }
141 } else {
142 let k = item.key.user_key;
143 let v = item.value;
144 Ok((k, v))
145 }
146}
147
/// A key-value separated LSM tree.
///
/// Keys and small values live in the inner index tree; large values are moved
/// into blob files and the index stores a [`BlobIndirection`] pointing at them.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree holding keys, inline values and blob indirections.
    #[doc(hidden)]
    pub index: crate::Tree,

    /// Directory holding this tree's blob files (`<tree path>/BLOBS_FOLDER`,
    /// created when the tree is opened).
    blobs_folder: Arc<PathBuf>,
}
161
162impl BlobTree {
163 fn full_compaction_blob_need(&self, version: &crate::version::Version) -> crate::Result<u64> {
168 let Some(blob_opts) = &self.index.config.kv_separation_opts else {
169 return Ok(0);
170 };
171 let all_tables: crate::HashSet<TableId> = version.iter_tables().map(Table::id).collect();
172 crate::compaction::worker::pick_blob_files_to_rewrite(&all_tables, version, blob_opts)?
173 .iter()
174 .try_fold(0u64, |acc, bf| bf.physical_size().map(|size| acc + size))
175 }
176
177 pub(crate) fn open(config: Config) -> crate::Result<Self> {
178 use crate::file::{BLOBS_FOLDER, fsync_directory};
179
180 let index = crate::Tree::open(config)?;
181
182 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
183 (*index.config.fs).create_dir_all(&blobs_folder)?;
184 fsync_directory(&blobs_folder, &*index.config.fs, index.config.sync_mode)?;
185
186 let blob_file_id_to_continue_with = index
187 .current_version()
188 .blob_files
189 .list_ids()
190 .max()
191 .map(|x| x + 1)
192 .unwrap_or_default();
193
194 index
195 .0
196 .blob_file_id_counter
197 .set(blob_file_id_to_continue_with);
198
199 Ok(Self {
200 index,
201 blobs_folder: Arc::new(blobs_folder),
202 })
203 }
204
205 fn resolve_key(
207 &self,
208 super_version: &crate::version::SuperVersion,
209 key: &[u8],
210 seqno: SeqNo,
211 ) -> crate::Result<Option<UserValue>> {
212 let Some(item) = crate::Tree::get_internal_entry_from_version(
213 super_version,
214 key,
215 seqno,
216 self.index.config.comparator.as_ref(),
217 )?
218 else {
219 return Ok(None);
220 };
221
222 let (_, v) = resolve_value_handle(
223 self.id(),
224 self.blobs_folder.as_path(),
225 &self.index.config.cache,
226 &super_version.version,
227 item,
228 #[cfg(zstd_any)]
229 self.index
230 .config
231 .kv_separation_opts
232 .as_ref()
233 .and_then(|o| o.zstd_dictionary.as_deref()),
234 )?;
235
236 Ok(Some(v))
237 }
238
239 pub fn scan_since_seqno(
258 &self,
259 target_seqno: SeqNo,
260 ) -> crate::Result<impl Iterator<Item = ScanSinceEvent> + use<>> {
261 self.index
262 .scan_since_seqno_with(target_seqno, true, |version, entry| {
263 let seqno = entry.key.seqno;
264 let (key, value) = resolve_value_handle(
265 self.id(),
266 self.blobs_folder.as_path(),
267 &self.index.config.cache,
268 version,
269 entry,
270 #[cfg(zstd_any)]
271 self.index
272 .config
273 .kv_separation_opts
274 .as_ref()
275 .and_then(|o| o.zstd_dictionary.as_deref()),
276 )?;
277 Ok(ScanSinceEvent::Insert { key, value, seqno })
278 })
279 }
280}
281
// Implements the crate-private `sealed::Sealed` marker trait for `BlobTree`
// (presumably a supertrait of `AbstractTree` that restricts implementors to
// this crate — confirm in `abstract_tree`).
impl crate::abstract_tree::sealed::Sealed for BlobTree {}
283
284fn blob_guard(
287 tree: &crate::BlobTree,
288 version: &Version,
289 item: crate::Result<InternalValue>,
290) -> IterGuardImpl {
291 IterGuardImpl::Blob(Guard {
292 tree: tree.clone(),
293 version: version.clone(),
294 kv: item,
295 })
296}
297
/// Seekable blob-tree iterator: wraps the index tree's seekable iterator and
/// turns every raw entry it yields into a blob-aware [`Guard`].
struct BlobSeekable {
    /// Underlying seekable iterator over the index tree.
    inner: crate::range::SeekableTreeIter,
    /// Tree handle cloned into every produced guard.
    tree: crate::BlobTree,
    /// Version associated with `inner`; cloned into every produced guard.
    version: Version,
}
305
306impl Iterator for BlobSeekable {
307 type Item = IterGuardImpl;
308
309 fn next(&mut self) -> Option<Self::Item> {
310 self.inner
311 .next()
312 .map(|item| blob_guard(&self.tree, &self.version, item))
313 }
314}
315
316impl DoubleEndedIterator for BlobSeekable {
317 fn next_back(&mut self) -> Option<Self::Item> {
318 self.inner
319 .next_back()
320 .map(|item| blob_guard(&self.tree, &self.version, item))
321 }
322}
323
// All seek/peek operations delegate straight to the inner index-tree iterator;
// keys are identical in the index and the blob tree, so no blob access is needed.
impl crate::iter_guard::SeekableGuardIter for BlobSeekable {
    /// Repositions the inner iterator via its `seek_to`.
    fn seek_to(&mut self, key: &[u8]) {
        self.inner.seek_to(key);
    }

    /// Repositions the inner iterator via its `seek_to_for_prev`.
    fn seek_to_for_prev(&mut self, key: &[u8]) {
        self.inner.seek_to_for_prev(key);
    }

    /// Peeks the next key of the inner iterator without wrapping it in a guard.
    fn peek_key(&mut self) -> Option<crate::Result<crate::UserKey>> {
        self.inner.peek_key()
    }
}
339
340impl AbstractTree for BlobTree {
341 #[cfg(feature = "std")]
342 fn create_checkpoint(
343 &self,
344 target_path: &crate::path::Path,
345 ) -> crate::Result<crate::CheckpointInfo> {
346 crate::checkpoint::run_checkpoint(
347 self,
348 &crate::checkpoint::CheckpointParams {
349 target_root: target_path,
350 target_fs: &self.index.config.fs,
351 src_root: &self.index.config.path,
352 src_fs: &self.index.config.fs,
353 deletion_pause: &self.index.deletion_pause,
354 visible_seqno: &self.index.config.visible_seqno,
355 include_blobs: true,
356 runtime_config: self.index.0.runtime_config.load_full(),
357 encryption: self.index.0.config.encryption.clone(),
358 },
359 )
360 }
361
362 fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
363 self.index.print_trace(key)
364 }
365
366 fn table_file_cache_size(&self) -> usize {
367 self.index.table_file_cache_size()
368 }
369
370 fn get_version_history_lock(&self) -> VersionsWriteGuard<'_> {
371 self.index.get_version_history_lock()
372 }
373
374 fn next_table_id(&self) -> TableId {
375 self.index.next_table_id()
376 }
377
378 fn id(&self) -> crate::TreeId {
379 self.index.id()
380 }
381
382 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
383 self.index.get_internal_entry(key, seqno)
384 }
385
386 fn current_version(&self) -> Version {
387 self.index.current_version()
388 }
389
390 fn storage_stats(&self) -> crate::Result<crate::StorageStats> {
391 let version = self.current_version();
398 let mut stats = crate::storage_stats::compute_storage_stats(
399 &version,
400 self.index.is_compacting(),
401 false,
402 )?;
403 let (capacity, available, compaction_possible) =
408 self.index.admission_capacity(stats.used_bytes);
409 stats.capacity_bytes = capacity;
410 stats.available_bytes = available;
411 stats.compaction_possible = compaction_possible;
412 let blob_need = self.full_compaction_blob_need(&version)?;
419 stats.full_compaction_bytes += blob_need;
420 if self.index.storage_admission_enabled()
424 && capacity.is_some()
425 && stats.status == crate::StorageStatus::Healthy
426 {
427 let sst_need = crate::storage_stats::full_compaction_demand_bytes(&version);
435 let sst_dest_level = self.index.config.level_count.saturating_sub(1);
438 let quota_headroom = self.index.quota_headroom(stats.used_bytes);
439 let full_fits = crate::compaction::worker::space_fits_two_layer(
440 &self.index.config,
441 quota_headroom,
442 sst_need,
443 sst_dest_level,
444 blob_need,
445 );
446 stats.status = if full_fits {
447 crate::StorageStatus::FullCompactionAvailable
448 } else {
449 crate::StorageStatus::TightCompactionAvailable
450 };
451 }
452 if self.index.is_read_only() {
456 stats.status = crate::StorageStatus::ReadOnlyOutOfSpace;
457 }
458 Ok(stats)
459 }
460
461 fn write_admission(&self) -> crate::Result<()> {
462 self.index.write_admission()
468 }
469
470 fn write_backpressure(
471 &self,
472 strategy: &dyn crate::compaction::CompactionStrategy,
473 ) -> crate::Backpressure {
474 self.index.write_backpressure(strategy)
478 }
479
480 #[cfg(feature = "metrics")]
481 fn cache_stats(&self) -> crate::CacheStats {
482 self.index.cache_stats()
483 }
484
485 #[cfg(feature = "metrics")]
486 fn metrics(&self) -> &Arc<crate::Metrics> {
487 self.index.metrics()
488 }
489
490 fn version_free_list_len(&self) -> usize {
491 self.index.version_free_list_len()
492 }
493
494 fn prefix<K: AsRef<[u8]>>(
495 &self,
496 prefix: K,
497 seqno: SeqNo,
498 index: Option<(Arc<Memtable>, SeqNo)>,
499 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
500 use crate::prefix::compute_prefix_hash;
501 use crate::range::prefix_to_range;
502
503 let prefix_bytes = prefix.as_ref();
504
505 let prefix_hash =
506 compute_prefix_hash(self.index.config.prefix_extractor.as_ref(), prefix_bytes);
507
508 let super_version = self.index.get_version_for_snapshot(seqno);
509 let tree = self.clone();
510
511 let range = prefix_to_range(prefix_bytes);
512
513 Box::new(
514 crate::Tree::create_internal_range_with_prefix_hash(
515 super_version.clone(),
516 &range,
517 seqno,
518 index,
519 None, self.index.config.comparator.clone(),
521 prefix_hash,
522 )
523 .map(move |kv| {
524 IterGuardImpl::Blob(Guard {
525 tree: tree.clone(),
526 version: super_version.version.clone(),
527 kv,
528 })
529 }),
530 )
531 }
532
533 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
534 &self,
535 range: R,
536 seqno: SeqNo,
537 index: Option<(Arc<Memtable>, SeqNo)>,
538 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
539 let super_version = self.index.get_version_for_snapshot(seqno);
540 let tree = self.clone();
541
542 Box::new(
543 crate::Tree::create_internal_range(
544 super_version.clone(),
545 &range,
546 seqno,
547 index,
548 None,
549 self.index.config.comparator.clone(),
550 )
551 .map(move |kv| {
552 IterGuardImpl::Blob(Guard {
553 tree: tree.clone(),
554 version: super_version.version.clone(),
555 kv,
556 })
557 }),
558 )
559 }
560
561 fn range_seekable<K: AsRef<[u8]>, R: RangeBounds<K>>(
562 &self,
563 range: R,
564 seqno: SeqNo,
565 index: Option<(Arc<Memtable>, SeqNo)>,
566 ) -> Box<dyn crate::iter_guard::SeekableGuardIter + 'static> {
567 let (lo, hi) = crate::tree::range_to_user_bounds(&range);
568 let inner = self
569 .index
570 .create_seekable_range_bounds(lo, hi, seqno, index);
571 let version = inner.version();
572 Box::new(BlobSeekable {
573 inner,
574 tree: self.clone(),
575 version,
576 })
577 }
578
579 fn batch_range_scan<K: AsRef<[u8]>, R: RangeBounds<K> + 'static, I: IntoIterator<Item = R>>(
580 &self,
581 intervals: I,
582 seqno: SeqNo,
583 index: Option<(Arc<Memtable>, SeqNo)>,
584 ) -> Box<dyn Iterator<Item = IterGuardImpl> + Send + 'static>
585 where
586 I::IntoIter: Send + 'static,
587 {
588 let inner = self.index.create_seekable_range_bounds(
589 core::ops::Bound::Unbounded,
590 core::ops::Bound::Unbounded,
591 seqno,
592 index,
593 );
594 let version = inner.version();
595 let tree = self.clone();
596 let intervals = intervals
597 .into_iter()
598 .map(|r| crate::tree::range_to_user_bounds(&r));
599 Box::new(
600 crate::range::BatchRangeScan::new(inner, intervals)
601 .map(move |item| blob_guard(&tree, &version, item)),
602 )
603 }
604
605 fn tombstone_count(&self) -> u64 {
606 self.index.tombstone_count()
607 }
608
609 fn weak_tombstone_count(&self) -> u64 {
610 self.index.weak_tombstone_count()
611 }
612
613 fn weak_tombstone_reclaimable_count(&self) -> u64 {
614 self.index.weak_tombstone_reclaimable_count()
615 }
616
617 fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
618 self.index.drop_range(range)
619 }
620
621 fn clear(&self) -> crate::Result<()> {
622 let config = self.tree_config();
623 let mut versions = self.get_version_history_lock();
624
625 let prior = versions.latest_version();
628
629 versions.upgrade_version(
630 &config.path,
631 |v| {
632 let mut copy = v.clone();
633 copy.active_memtable = Arc::new(Memtable::new(
634 self.index.memtable_id_counter.next(),
635 config.comparator.clone(),
636 ));
637 copy.sealed_memtables = Arc::default();
638 copy.version = Version::new(v.version.id() + 1, self.tree_type());
639 Ok(copy)
640 },
641 &config.seqno,
642 &config.visible_seqno,
643 &*config.fs,
644 self.index.0.runtime_config.load_full(),
645 self.index.0.config.encryption.clone(),
646 )?;
647
648 versions.drain_obsolete_to_latest();
653 drop(versions);
654
655 for table in prior.version.iter_tables() {
656 table.mark_as_deleted();
657 }
658 for blob_file in prior.version.blob_files.iter() {
659 blob_file.mark_as_deleted();
660 }
661
662 Ok(())
663 }
664
665 fn major_compact(
666 &self,
667 target_size: u64,
668 seqno_threshold: SeqNo,
669 ) -> crate::Result<crate::compaction::CompactionResult> {
670 self.index.major_compact(target_size, seqno_threshold)
671 }
672
673 fn clear_active_memtable(&self) {
674 self.index.clear_active_memtable();
675 }
676
677 fn l0_run_count(&self) -> usize {
678 self.index.l0_run_count()
679 }
680
681 fn blob_file_count(&self) -> usize {
682 self.current_version().blob_file_count()
683 }
684
685 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
688 let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
689 return Ok(None);
690 };
691
692 Ok(Some(if item.key.value_type.is_indirection() {
693 let mut cursor = crate::io::Cursor::new(item.value);
694 let vptr = BlobIndirection::decode_from(&mut cursor)?;
695 vptr.size
696 } else {
697 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
698 {
699 item.value.len() as u32
700 }
701 }))
702 }
703
704 fn stale_blob_bytes(&self) -> u64 {
705 self.current_version().gc_stats().stale_bytes()
706 }
707
708 fn filter_size(&self) -> u64 {
709 self.index.filter_size()
710 }
711
712 fn pinned_filter_size(&self) -> usize {
713 self.index.pinned_filter_size()
714 }
715
716 fn pinned_block_index_size(&self) -> usize {
717 self.index.pinned_block_index_size()
718 }
719
720 fn sealed_memtable_count(&self) -> usize {
721 self.index.sealed_memtable_count()
722 }
723
724 fn get_flush_lock(&self) -> FlushGuard<'_> {
725 self.index.get_flush_lock()
726 }
727
728 #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
729 fn flush_to_tables_with_rt(
730 &self,
731 stream: impl Iterator<Item = crate::Result<InternalValue>>,
732 range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
733 ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
734 use crate::{coding::Encode, file::BLOBS_FOLDER, table::multi_writer::MultiWriter};
735
736 let start = crate::time::Instant::now();
737
738 let (table_folder, level_fs) = self.index.config.tables_folder_for_level(0);
739
740 let data_block_size = self.index.config.data_block_size_policy.get(0);
741
742 let data_block_restart_interval =
743 self.index.config.data_block_restart_interval_policy.get(0);
744 let index_block_restart_interval =
745 self.index.config.index_block_restart_interval_policy.get(0);
746
747 let data_block_compression = self.index.config.data_block_compression_policy.get(0);
748 let index_block_compression = self.index.config.index_block_compression_policy.get(0);
749
750 let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);
751
752 let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
753 let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);
754
755 log::debug!(
756 "Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}"
757 );
758 log::debug!("=> to table(s) in {}", table_folder.display());
759 log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());
760
761 let mut table_writer = MultiWriter::new(
762 table_folder.clone(),
763 self.index.table_id_counter.clone(),
764 64 * 1_024 * 1_024,
765 0,
766 level_fs.clone(),
767 )?
768 .set_comparator(self.index.config.comparator.clone())
769 .use_data_block_restart_interval(data_block_restart_interval)
770 .use_index_block_restart_interval(index_block_restart_interval)
771 .use_data_block_compression(data_block_compression)
772 .use_index_block_compression(index_block_compression)
773 .use_data_block_size(data_block_size)
774 .use_data_block_hash_ratio(data_block_hash_ratio)
775 .use_bloom_policy({
776 use crate::config::FilterPolicyEntry::{Bloom, None};
777 use crate::table::filter::BloomConstructionPolicy;
778
779 match self.index.config.filter_policy.get(0) {
780 Bloom(policy) => policy,
781 None => BloomConstructionPolicy::BitsPerKey(0.0),
782 }
783 });
784
785 let rc = self.index.0.runtime_config.load_full();
789
790 if index_partitioning {
791 table_writer = table_writer.use_adaptive_index(rc.index_partition_spill_threshold);
794 }
795 if filter_partitioning {
796 table_writer = table_writer.use_partitioned_filter();
797 }
798
799 table_writer =
800 table_writer.use_prefix_extractor(self.index.config.prefix_extractor.clone());
801 table_writer = table_writer.use_encryption(self.index.config.encryption.clone());
802 table_writer = table_writer.use_page_ecc(self.index.config.page_ecc, rc.ecc_scheme);
806 table_writer = table_writer.use_sync_mode(self.index.config.sync_mode);
807 table_writer = table_writer.use_seqno_in_index(rc.seqno_in_index);
808 table_writer = table_writer.use_zone_map(rc.zone_map);
809 table_writer = table_writer.use_columnar(rc.columnar);
810 table_writer = table_writer.use_disable_cow_on_sst(rc.disable_cow_on_sst_files);
811 table_writer = table_writer.use_kv_checksums(rc.kv_checksums, rc.kv_checksum_algo);
812 table_writer = table_writer.use_locator(self.index.config.locator_policy.get(0));
813
814 #[cfg(zstd_any)]
815 {
816 table_writer =
817 table_writer.use_zstd_dictionary(self.index.config.zstd_dictionary.clone());
818 }
819
820 #[expect(
821 clippy::expect_used,
822 reason = "cannot create blob tree without defining kv separation options"
823 )]
824 let kv_opts = self
825 .index
826 .config
827 .kv_separation_opts
828 .as_ref()
829 .expect("kv separation options should exist");
830
831 let mut blob_writer = {
832 let w = BlobFileWriter::new(
833 self.index.0.blob_file_id_counter.clone(),
834 self.index.config.path.join(BLOBS_FOLDER),
835 self.id(),
836 self.index.config.descriptor_table.clone(),
837 self.index.config.fs.clone(),
838 )?
839 .use_target_size(kv_opts.file_target_size)
840 .use_compression(kv_opts.compression)
841 .use_sync_mode(self.index.config.sync_mode);
842 #[cfg(zstd_any)]
843 let w = w.use_zstd_dictionary(kv_opts.zstd_dictionary.clone());
844 w
845 };
846
847 let separation_threshold = kv_opts.separation_threshold;
848
849 table_writer.set_range_tombstones(range_tombstones);
853
854 for item in stream {
855 let item = item?;
856
857 if item.is_tombstone() {
858 table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
861 continue;
862 }
863
864 let value = item.value;
865
866 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
867 let value_size = value.len() as u32;
868
869 if value_size >= separation_threshold {
870 let vhandle = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;
871
872 let indirection = BlobIndirection {
873 vhandle,
874 size: value_size,
875 };
876
877 table_writer.write({
878 let mut vptr =
879 InternalValue::new(item.key.clone(), indirection.encode_into_vec());
880 vptr.key.value_type = crate::ValueType::Indirection;
881 vptr
882 })?;
883
884 table_writer.register_blob(indirection);
885 } else {
886 table_writer.write(InternalValue::new(item.key, value))?;
887 }
888 }
889
890 let blob_files = blob_writer.finish()?;
891
892 let result = table_writer.finish()?;
893
894 log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
895
896 let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
897 let pin_index = self.index.config.index_block_pinning_policy.get(0);
898
899 let tables = result
901 .into_iter()
902 .map(|(table_id, checksum)| -> crate::Result<Table> {
903 Table::recover(
904 table_folder.join(table_id.to_string()),
905 checksum,
906 0,
907 self.index.id,
908 table_id,
909 self.index.config.cache.clone(),
910 self.index.config.descriptor_table.clone(),
911 level_fs.clone(),
912 pin_filter,
913 pin_index,
914 self.index.config.encryption.clone(),
915 #[cfg(zstd_any)]
916 self.index.config.zstd_dictionary.clone(),
917 self.index.config.comparator.clone(),
918 #[cfg(feature = "metrics")]
919 self.index.metrics.clone(),
920 )
921 })
922 .collect::<crate::Result<Vec<_>>>()?;
923
924 Ok(Some((tables, Some(blob_files))))
928 }
929
930 fn register_tables(
931 &self,
932 tables: &[Table],
933 blob_files: Option<&[BlobFile]>,
934 frag_map: Option<FragmentationMap>,
935 sealed_memtables_to_delete: &[MemtableId],
936 gc_watermark: SeqNo,
937 ) -> crate::Result<()> {
938 self.index.register_tables(
939 tables,
940 blob_files,
941 frag_map,
942 sealed_memtables_to_delete,
943 gc_watermark,
944 )
945 }
946
947 fn compact(
948 &self,
949 strategy: Arc<dyn crate::compaction::CompactionStrategy>,
950 seqno_threshold: SeqNo,
951 ) -> crate::Result<crate::compaction::CompactionResult> {
952 self.index.compact(strategy, seqno_threshold)
953 }
954
955 fn get_next_table_id(&self) -> TableId {
956 self.index.get_next_table_id()
957 }
958
959 fn tree_config(&self) -> &Config {
960 &self.index.config
961 }
962
963 fn get_highest_seqno(&self) -> Option<SeqNo> {
964 self.index.get_highest_seqno()
965 }
966
967 fn active_memtable(&self) -> Arc<Memtable> {
968 self.index.active_memtable()
969 }
970
971 fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
972 self.index.rotate_memtable()
973 }
974
975 fn table_count(&self) -> usize {
976 self.index.table_count()
977 }
978
979 fn level_table_count(&self, idx: usize) -> Option<usize> {
980 self.index.level_table_count(idx)
981 }
982
983 fn approximate_len(&self) -> usize {
984 self.index.approximate_len()
985 }
986
987 fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
990 self.index.is_empty(seqno, index)
991 }
992
993 fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
996 self.index.contains_key(key, seqno)
997 }
998
999 fn contains_prefix<K: AsRef<[u8]>>(
1003 &self,
1004 prefix: K,
1005 seqno: SeqNo,
1006 index: Option<(Arc<Memtable>, SeqNo)>,
1007 ) -> crate::Result<bool> {
1008 self.index.contains_prefix(prefix, seqno, index)
1009 }
1010
1011 fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
1014 self.index.len(seqno, index)
1015 }
1016
1017 fn disk_space(&self) -> u64 {
1018 let version = self.current_version();
1019 self.index.disk_space() + version.blob_files.on_disk_size()
1020 }
1021
1022 fn approximate_range_stats<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
1023 &self,
1024 range: R,
1025 seqno: SeqNo,
1026 ) -> crate::Result<crate::ApproximateRangeStats> {
1027 self.index.approximate_range_stats(range, seqno)
1038 }
1039
1040 fn approximate_range_cardinality<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
1041 &self,
1042 range: R,
1043 seqno: SeqNo,
1044 ) -> crate::Result<crate::RangeCardinality> {
1045 self.index.approximate_range_cardinality(range, seqno)
1049 }
1050
1051 fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
1052 self.index.get_highest_memtable_seqno()
1053 }
1054
1055 fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
1056 self.index.get_highest_persisted_seqno()
1057 }
1058
1059 fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> {
1060 self.index.apply_batch(batch, seqno)
1061 }
1062
1063 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
1064 &self,
1065 key: K,
1066 value: V,
1067 seqno: SeqNo,
1068 ) -> (u64, u64) {
1069 self.index.insert(key, value.into(), seqno)
1070 }
1071
1072 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
1073 let super_version = self.index.get_version_for_snapshot(seqno);
1074 self.resolve_key(&super_version, key.as_ref(), seqno)
1075 }
1076
1077 #[expect(
1078 clippy::indexing_slicing,
1079 reason = "indices are generated from 0..n range, always in bounds"
1080 )]
1081 fn multi_get<K: AsRef<[u8]>>(
1082 &self,
1083 keys: impl IntoIterator<Item = K>,
1084 seqno: SeqNo,
1085 ) -> crate::Result<Vec<Option<crate::UserValue>>> {
1086 let keys: Vec<_> = keys.into_iter().collect();
1087 let n = keys.len();
1088 if n == 0 {
1089 return Ok(Vec::new());
1090 }
1091
1092 let super_version = self.index.get_version_for_snapshot(seqno);
1093 let comparator = self.index.config.comparator.as_ref();
1094
1095 if n <= 2 {
1097 return keys
1098 .iter()
1099 .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
1100 .collect();
1101 }
1102
1103 let mut internal_entries: Vec<Option<crate::value::InternalValue>> = vec![None; n];
1105 let mut remaining: Vec<usize> = Vec::with_capacity(n);
1106
1107 for idx in 0..n {
1108 let key = keys[idx].as_ref();
1109 if let Some(entry) = super_version.active_memtable.get(key, seqno) {
1110 internal_entries[idx] = Some(entry);
1111 continue;
1112 }
1113 if let Some(entry) =
1114 crate::Tree::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
1115 {
1116 internal_entries[idx] = Some(entry);
1117 continue;
1118 }
1119 remaining.push(idx);
1120 }
1121
1122 if !remaining.is_empty() {
1124 remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref()));
1125
1126 let (miss_keys, duplicates) =
1131 crate::Tree::dedup_sorted_miss_keys(&remaining, &keys, comparator);
1132
1133 crate::Tree::batch_get_from_tables(
1134 &super_version.version,
1135 &keys,
1136 miss_keys,
1137 seqno,
1138 comparator,
1139 &*self.index.config.fs,
1140 &mut internal_entries,
1141 )?;
1142
1143 crate::Tree::fan_out_duplicates(&duplicates, &mut internal_entries);
1144 }
1145
1146 let mut results = vec![None; n];
1148 for idx in 0..n {
1149 if let Some(item) = internal_entries[idx].take() {
1150 if item.is_tombstone() {
1151 continue;
1152 }
1153 if crate::Tree::is_suppressed_by_range_tombstones(
1154 &super_version,
1155 keys[idx].as_ref(),
1156 item.key.seqno,
1157 seqno,
1158 comparator,
1159 ) {
1160 continue;
1161 }
1162 if item.key.value_type.is_merge_operand() {
1167 if let Some(merge_op) = &self.index.config.merge_operator {
1168 results[idx] = crate::Tree::resolve_merge_via_pipeline(
1169 super_version.clone(),
1170 keys[idx].as_ref(),
1171 seqno,
1172 Arc::clone(merge_op),
1173 )?;
1174 } else {
1175 results[idx] = Some(item.value);
1176 }
1177 continue;
1178 }
1179 let (_, v) = resolve_value_handle(
1180 self.id(),
1181 self.blobs_folder.as_path(),
1182 &self.index.config.cache,
1183 &super_version.version,
1184 item,
1185 #[cfg(zstd_any)]
1186 self.index
1187 .config
1188 .kv_separation_opts
1189 .as_ref()
1190 .and_then(|o| o.zstd_dictionary.as_deref()),
1191 )?;
1192 results[idx] = Some(v);
1193 }
1194 }
1195
1196 Ok(results)
1197 }
1198
1199 fn merge<K: Into<UserKey>, V: Into<UserValue>>(
1200 &self,
1201 key: K,
1202 operand: V,
1203 seqno: SeqNo,
1204 ) -> (u64, u64) {
1205 self.index.merge(key, operand, seqno)
1206 }
1207
1208 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1209 self.index.remove(key, seqno)
1210 }
1211
1212 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1213 self.index.remove_weak(key, seqno)
1214 }
1215
1216 fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
1217 self.index.remove_range(start, end, seqno)
1218 }
1219}