1mod gc;
6pub mod handle;
7pub mod ingest;
8
9#[doc(hidden)]
10pub use gc::{FragmentationEntry, FragmentationMap};
11
12use crate::{
13 abstract_tree::{AbstractTree, RangeItem},
14 coding::Decode,
15 iter_guard::{IterGuard, IterGuardImpl},
16 table::Table,
17 tree::inner::MemtableId,
18 value::InternalValue,
19 version::Version,
20 vlog::{Accessor, BlobFile, BlobFileWriter},
21 Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue,
22};
23use handle::BlobIndirection;
24use std::{
25 ops::RangeBounds,
26 path::{Path, PathBuf},
27 sync::{Arc, MutexGuard},
28};
29
/// Guard over a single key-value pair yielded by a blob tree iterator.
///
/// Resolution of blob indirections is deferred until the caller asks for the
/// value, so scans that only look at keys never touch the value log.
pub struct Guard {
    // Cloned handle to the owning tree (used for id, blob folder, cache).
    tree: crate::BlobTree,
    // Version pinned at iterator creation, so referenced blob files stay resolvable.
    version: Version,
    // The (possibly errored) raw entry; may be an inline value or an indirection.
    kv: crate::Result<InternalValue>,
}
36
37impl IterGuard for Guard {
38 fn into_inner_if(
39 self,
40 pred: impl Fn(&UserKey) -> bool,
41 ) -> crate::Result<(UserKey, Option<UserValue>)> {
42 let kv = self.kv?;
43
44 if pred(&kv.key.user_key) {
45 resolve_value_handle(
46 self.tree.id(),
47 self.tree.blobs_folder.as_path(),
48 &self.tree.index.config.cache,
49 &self.version,
50 kv,
51 )
52 .map(|(k, v)| (k, Some(v)))
53 } else {
54 Ok((kv.key.user_key, None))
55 }
56 }
57
58 fn key(self) -> crate::Result<UserKey> {
59 self.kv.map(|kv| kv.key.user_key)
60 }
61
62 fn size(self) -> crate::Result<u32> {
63 let kv = self.kv?;
64
65 if kv.key.value_type.is_indirection() {
66 let mut cursor = std::io::Cursor::new(kv.value);
67 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
68 } else {
69 #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
70 Ok(kv.value.len() as u32)
71 }
72 }
73
74 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
75 resolve_value_handle(
76 self.tree.id(),
77 self.tree.blobs_folder.as_path(),
78 &self.tree.index.config.cache,
79 &self.version,
80 self.kv?,
81 )
82 }
83}
84
85fn resolve_value_handle(
86 tree_id: TreeId,
87 blobs_folder: &Path,
88 cache: &Cache,
89 version: &Version,
90 item: InternalValue,
91) -> RangeItem {
92 if item.key.value_type.is_indirection() {
93 let mut cursor = std::io::Cursor::new(item.value);
94 let vptr = BlobIndirection::decode_from(&mut cursor)?;
95
96 match Accessor::new(&version.blob_files).get(
98 tree_id,
99 blobs_folder,
100 &item.key.user_key,
101 &vptr.vhandle,
102 cache,
103 ) {
104 Ok(Some(v)) => {
105 let k = item.key.user_key;
106 Ok((k, v))
107 }
108 Ok(None) => {
109 panic!(
110 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
111 item.key.user_key, vptr.vhandle,
112 version.id(),
113 );
114 }
115 Err(e) => Err(e),
116 }
117 } else {
118 let k = item.key.user_key;
119 let v = item.value;
120 Ok((k, v))
121 }
122}
123
/// An LSM-tree with key-value separation: values at or above the separation
/// threshold are written to blob files, while the index tree stores a
/// [`BlobIndirection`] handle in their place (see `flush_to_tables_with_rt`).
#[derive(Clone)]
pub struct BlobTree {
    // The index tree: stores keys plus either inline values or indirection handles.
    #[doc(hidden)]
    pub index: crate::Tree,

    // Folder on disk holding the value log's blob files.
    blobs_folder: Arc<PathBuf>,
}
137
138impl BlobTree {
139 pub(crate) fn open(config: Config) -> crate::Result<Self> {
140 use crate::file::{fsync_directory, BLOBS_FOLDER};
141 use crate::fs::Fs;
142
143 let index = crate::Tree::open(config)?;
144
145 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
146 (*index.config.fs).create_dir_all(&blobs_folder)?;
147 fsync_directory(&blobs_folder, &*index.config.fs)?;
148
149 let blob_file_id_to_continue_with = index
150 .current_version()
151 .blob_files
152 .list_ids()
153 .max()
154 .map(|x| x + 1)
155 .unwrap_or_default();
156
157 index
158 .0
159 .blob_file_id_counter
160 .set(blob_file_id_to_continue_with);
161
162 Ok(Self {
163 index,
164 blobs_folder: Arc::new(blobs_folder),
165 })
166 }
167
168 fn resolve_key(
170 &self,
171 super_version: &crate::version::SuperVersion,
172 key: &[u8],
173 seqno: SeqNo,
174 ) -> crate::Result<Option<UserValue>> {
175 let Some(item) = crate::Tree::get_internal_entry_from_version(
176 super_version,
177 key,
178 seqno,
179 self.index.config.comparator.as_ref(),
180 )?
181 else {
182 return Ok(None);
183 };
184
185 let (_, v) = resolve_value_handle(
186 self.id(),
187 self.blobs_folder.as_path(),
188 &self.index.config.cache,
189 &super_version.version,
190 item,
191 )?;
192
193 Ok(Some(v))
194 }
195}
196
// Marker impl for the crate-private `Sealed` supertrait, allowing `BlobTree`
// to implement the sealed `AbstractTree` trait.
impl crate::abstract_tree::sealed::Sealed for BlobTree {}
198
impl AbstractTree for BlobTree {
    // NOTE: most methods are thin delegations to the inner index tree; only the
    // value-facing methods (get, multi_get, prefix, range, size_of,
    // flush_to_tables_with_rt, disk_space, ...) add blob-specific behavior.

    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        self.index.print_trace(key)
    }

    fn table_file_cache_size(&self) -> usize {
        self.index.table_file_cache_size()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.index.get_version_history_lock()
    }

    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    /// Prefix scan; blob indirections are resolved lazily per item via [`Guard`],
    /// so iterating keys alone never touches the value log.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::prefix::compute_prefix_hash;
        use crate::range::prefix_to_range;

        let prefix_bytes = prefix.as_ref();

        let prefix_hash =
            compute_prefix_hash(self.index.config.prefix_extractor.as_ref(), prefix_bytes);

        // Pin the super version so each yielded Guard can resolve blobs
        // against a consistent snapshot.
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        let range = prefix_to_range(prefix_bytes);

        Box::new(
            crate::Tree::create_internal_range_with_prefix_hash(
                super_version.clone(),
                &range,
                seqno,
                index,
                None, self.index.config.comparator.clone(),
                prefix_hash,
            )
            .map(move |kv| {
                IterGuardImpl::Blob(Guard {
                    tree: tree.clone(),
                    version: super_version.version.clone(),
                    kv,
                })
            }),
        )
    }

    /// Range scan; like [`BlobTree::prefix`], values are resolved lazily via [`Guard`].
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            crate::Tree::create_internal_range(
                super_version.clone(),
                &range,
                seqno,
                index,
                None,
                self.index.config.comparator.clone(),
            )
            .map(move |kv| {
                IterGuardImpl::Blob(Guard {
                    tree: tree.clone(),
                    version: super_version.version.clone(),
                    kv,
                })
            }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn clear(&self) -> crate::Result<()> {
        self.index.clear()
    }

    fn major_compact(
        &self,
        target_size: u64,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    /// Returns the user value's size without fetching it: for indirection
    /// entries the size is read from the decoded [`BlobIndirection`] handle.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = std::io::Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> u64 {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.index.get_flush_lock()
    }

    /// Flushes `stream` into new tables, performing key-value separation:
    /// values whose size is >= the configured separation threshold are written
    /// to blob files and replaced in the table by a [`BlobIndirection`] entry;
    /// smaller values (and tombstones) are written inline.
    #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
    fn flush_to_tables_with_rt(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{
            coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER,
            table::multi_writer::MultiWriter,
        };

        let start = std::time::Instant::now();

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        // All policies are sampled for level 0, since a flush always produces L0 tables.
        let data_block_size = self.index.config.data_block_size_policy.get(0);

        let data_block_restart_interval =
            self.index.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval =
            self.index.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
        let index_block_compression = self.index.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);

        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}");
        log::debug!("=> to table(s) in {}", table_folder.display());
        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());

        let mut table_writer = MultiWriter::new(
            table_folder.clone(),
            self.index.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
            self.index.config.fs.clone(),
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // No filter policy configured => zero bits per key (effectively no filter).
            match self.index.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        table_writer =
            table_writer.use_prefix_extractor(self.index.config.prefix_extractor.clone());
        table_writer = table_writer.use_encryption(self.index.config.encryption.clone());

        #[cfg(feature = "zstd")]
        {
            table_writer =
                table_writer.use_zstd_dictionary(self.index.config.zstd_dictionary.clone());
        }

        #[expect(
            clippy::expect_used,
            reason = "cannot create blob tree without defining kv separation options"
        )]
        let kv_opts = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            self.index.config.path.join(BLOBS_FOLDER),
            self.id(),
            self.index.config.descriptor_table.clone(),
            self.index.config.fs.clone(),
        )?
        .use_target_size(kv_opts.file_target_size)
        .use_compression(kv_opts.compression);

        let separation_threshold = kv_opts.separation_threshold;

        table_writer.set_range_tombstones(range_tombstones);

        for item in stream {
            let item = item?;

            // Tombstones never carry a value, so they are always written inline.
            if item.is_tombstone() {
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write it to the value log and store an
                // indirection entry (key + handle + size) in the table instead.
                let vhandle = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle,
                    size: value_size,
                };

                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                // Track the indirection so the table knows which blobs it references.
                table_writer.register_blob(indirection);
            } else {
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        // Finish the blob files before the tables that reference them.
        let blob_files = blob_writer.finish()?;

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.index_block_pinning_policy.get(0);

        // Re-open every written table so the caller receives fully usable handles.
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    table_folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    self.index.config.encryption.clone(),
                    #[cfg(feature = "zstd")]
                    self.index.config.zstd_dictionary.clone(),
                    self.index.config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, Some(blob_files))))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        self.index.register_tables(
            tables,
            blob_files,
            frag_map,
            sealed_memtables_to_delete,
            gc_watermark,
        )
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        self.index.active_memtable()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn contains_prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> crate::Result<bool> {
        self.index.contains_prefix(prefix, seqno, index)
    }

    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    /// Total on-disk size: index tree plus all blob files of the current version.
    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        // Writes go straight into the index tree's memtable; key-value
        // separation only happens later, at flush time.
        self.index.insert(key, value.into(), seqno)
    }

    /// Point read that resolves any blob indirection to the full value.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let super_version = self.index.get_version_for_snapshot(seqno);
        self.resolve_key(&super_version, key.as_ref(), seqno)
    }

    /// Batched point reads; all keys are resolved against one pinned snapshot.
    fn multi_get<K: AsRef<[u8]>>(
        &self,
        keys: impl IntoIterator<Item = K>,
        seqno: SeqNo,
    ) -> crate::Result<Vec<Option<crate::UserValue>>> {
        let super_version = self.index.get_version_for_snapshot(seqno);

        keys.into_iter()
            .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
            .collect()
    }

    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        operand: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.merge(key, operand, seqno)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }

    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
        self.index.remove_range(start, end, seqno)
    }
}