1mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12 coding::Decode,
13 file::{fsync_directory, BLOBS_FOLDER},
14 iter_guard::{IterGuard, IterGuardImpl},
15 r#abstract::{AbstractTree, RangeItem},
16 table::Table,
17 tree::inner::MemtableId,
18 value::InternalValue,
19 version::Version,
20 vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
21 Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId,
22 UserKey, UserValue,
23};
24use handle::BlobIndirection;
25use std::{
26 io::Cursor,
27 ops::RangeBounds,
28 path::PathBuf,
29 sync::{Arc, MutexGuard},
30};
31
/// Iterator guard handed out by blob-tree range/prefix iterators.
///
/// Resolution of blob indirections is deferred: the guard only carries
/// the raw internal KV pair plus everything needed to follow a value
/// handle into a blob file later (tree handle + pinned version).
pub struct Guard {
    // Tree handle, used to reach the cache/descriptor table and blobs folder
    tree: crate::BlobTree,
    // Version snapshot the entry was read from; keeps its blob file set resolvable
    version: Version,
    // Raw KV pair; may be an indirection pointing into a blob file
    kv: crate::Result<InternalValue>,
}
38
39impl IterGuard for Guard {
40 fn into_inner_if(
41 self,
42 pred: impl Fn(&UserKey) -> bool,
43 ) -> crate::Result<(UserKey, Option<UserValue>)> {
44 let kv = self.kv?;
45
46 if pred(&kv.key.user_key) {
47 resolve_value_handle(
48 self.tree.id(),
49 self.tree.blobs_folder.as_path(),
50 &self.tree.index.config.cache,
51 &self.tree.index.config.descriptor_table,
52 &self.version,
53 kv,
54 )
55 .map(|(k, v)| (k, Some(v)))
56 } else {
57 Ok((kv.key.user_key, None))
58 }
59 }
60
61 fn key(self) -> crate::Result<UserKey> {
62 self.kv.map(|kv| kv.key.user_key)
63 }
64
65 fn size(self) -> crate::Result<u32> {
66 let kv = self.kv?;
67
68 if kv.key.value_type.is_indirection() {
69 let mut cursor = Cursor::new(kv.value);
70 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
71 } else {
72 #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
73 Ok(kv.value.len() as u32)
74 }
75 }
76
77 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
78 resolve_value_handle(
79 self.tree.id(),
80 self.tree.blobs_folder.as_path(),
81 &self.tree.index.config.cache,
82 &self.tree.index.config.descriptor_table,
83 &self.version,
84 self.kv?,
85 )
86 }
87}
88
89fn resolve_value_handle(
90 tree_id: TreeId,
91 blobs_folder: &std::path::Path,
92 cache: &Arc<Cache>,
93 descriptor_table: &Arc<DescriptorTable>,
94 version: &Version,
95 item: InternalValue,
96) -> RangeItem {
97 if item.key.value_type.is_indirection() {
98 let mut cursor = Cursor::new(item.value);
99 let vptr = BlobIndirection::decode_from(&mut cursor)?;
100
101 match Accessor::new(&version.blob_files).get(
103 tree_id,
104 blobs_folder,
105 &item.key.user_key,
106 &vptr.vhandle,
107 cache,
108 descriptor_table,
109 ) {
110 Ok(Some(v)) => {
111 let k = item.key.user_key;
112 Ok((k, v))
113 }
114 Ok(None) => {
115 panic!(
116 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
117 item.key.user_key, vptr.vhandle,
118 version.id(),
119 );
120 }
121 Err(e) => Err(e),
122 }
123 } else {
124 let k = item.key.user_key;
125 let v = item.value;
126 Ok((k, v))
127 }
128}
129
/// A key-value-separated LSM-tree.
///
/// Keys (and small values) live in the inner index tree; values at or above
/// the configured separation threshold are written to blob files in a
/// dedicated folder and referenced through [`BlobIndirection`] entries.
#[derive(Clone)]
pub struct BlobTree {
    /// The index tree that stores keys, inline values and blob indirections
    #[doc(hidden)]
    pub index: crate::Tree,

    // Folder containing this tree's blob files (created/fsynced in `open`)
    blobs_folder: Arc<PathBuf>,
}
143
144impl BlobTree {
145 pub(crate) fn open(config: Config) -> crate::Result<Self> {
146 let index = crate::Tree::open(config)?;
147
148 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
149 std::fs::create_dir_all(&blobs_folder)?;
150 fsync_directory(&blobs_folder)?;
151
152 let blob_file_id_to_continue_with = index
153 .current_version()
154 .blob_files
155 .list_ids()
156 .max()
157 .map(|x| x + 1)
158 .unwrap_or_default();
159
160 index
161 .0
162 .blob_file_id_counter
163 .set(blob_file_id_to_continue_with);
164
165 Ok(Self {
166 index,
167 blobs_folder: Arc::new(blobs_folder),
168 })
169 }
170}
171
// Most methods simply delegate to the inner index tree (`self.index`);
// the blob-aware overrides are `prefix`/`range`/`get`/`size_of`,
// `ingest`, `flush_to_tables`, `blob_file_count`, `stale_blob_bytes`
// and `disk_space`.
impl AbstractTree for BlobTree {
    fn table_file_cache_size(&self) -> usize {
        self.index.table_file_cache_size()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.index.get_version_history_lock()
    }

    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

        // Pin the version for the snapshot so every yielded guard can later
        // resolve blob indirections against a consistent blob file set.
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        let range = prefix_to_range(prefix.as_ref());

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        // Same scheme as `prefix`: each guard carries a clone of the pinned
        // version so values can be resolved lazily.
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    /// Bulk-loads a pre-sorted stream of KV pairs directly into tables and
    /// blob files, bypassing the memtable.
    ///
    /// All items share one freshly allocated seqno; values at or above the
    /// separation threshold are written to blob files, smaller ones inline.
    ///
    /// # Panics
    ///
    /// Panics if the input iterator is not strictly ascending by key, or if
    /// KV-separation options are missing from the config.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // One seqno for the entire ingestion batch
        let seqno = seqno_generator.next();

        let blob_file_size = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .file_target_size;

        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            blob_file_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for (key, value) in iter {
            // Enforce strictly ascending key order (required for direct
            // table construction)
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write it to the blob file, store only an
                // indirection (blob file ID + offset + size) in the table.
                // NOTE: offset/ID must be captured *before* the write advances them.
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&key, seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write_indirection(key, indirection)?;
            } else {
                // Small value: keep it inline in the index tree
                table_writer.write(key, value)?;
            }

            count += 1;
        }

        let blob_files = blob_writer.finish()?;
        let results = table_writer.writer.finish()?;

        // Reopen the freshly written tables so they can be registered
        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    self.index
                        .config
                        .path
                        .join(crate::file::TABLES_FOLDER)
                        .join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    false,
                    false,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;

        // Publish the batch only after registration succeeded
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    /// Returns the logical size of the value for `key`, without reading blob data.
    ///
    /// For separated values, the size is read from the decoded indirection.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            // Indirection payload embeds the uncompressed value size
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.index.get_flush_lock()
    }

    /// Flushes a memtable stream to disk, performing key-value separation.
    ///
    /// Tombstones and small values stay in the table(s); values at or above
    /// the separation threshold are written to blob files and replaced by
    /// `Indirection` entries. All block policies are sampled at level 0
    /// (`.get(0)`) since flushes always produce L0 tables.
    ///
    /// # Panics
    ///
    /// Panics if KV-separation options are missing from the config.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{coding::Encode, file::TABLES_FOLDER, table::multi_writer::MultiWriter};

        let start = std::time::Instant::now();

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        // All policies sampled for level 0 — flush output lands in L0
        let data_block_size = self.index.config.data_block_size_policy.get(0);

        let data_block_restart_interval =
            self.index.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval =
            self.index.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
        let index_block_compression = self.index.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);

        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}");
        log::debug!("=> to table(s) in {}", table_folder.display());
        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());

        let mut table_writer = MultiWriter::new(
            table_folder.clone(),
            self.index.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // BitsPerKey(0.0) effectively disables the bloom filter
            match self.index.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let kv_opts = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            kv_opts.file_target_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let separation_threshold = kv_opts.separation_threshold;

        for item in stream {
            let item = item?;

            // Tombstones carry no payload; write them with an empty value
            if item.is_tombstone() {
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Capture offset/ID before the write advances the writer
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                // Replace the value with the encoded indirection and mark
                // the entry's value type accordingly
                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                // Track the blob reference for this table (used for GC accounting)
                table_writer.register_blob(indirection);
            } else {
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        let blob_files = blob_writer.finish()?;

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.index_block_pinning_policy.get(0);

        // Reopen the freshly written tables so the caller can register them
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    table_folder.join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, Some(blob_files))))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        self.index.register_tables(
            tables,
            blob_files,
            frag_map,
            sealed_memtables_to_delete,
            gc_watermark,
        )
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        // Index tables plus this version's blob files
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    // Values are written to the memtable as-is; key-value separation only
    // happens later, during flush (`flush_to_tables`) or `ingest`.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    /// Point read: looks up the entry in the pinned snapshot version and,
    /// if it is an indirection, follows it into a blob file.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        // Pin a version so the blob file set stays consistent for resolution
        let super_version = self
            .index
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        let Some(item) = crate::Tree::get_internal_entry_from_version(&super_version, key, seqno)?
        else {
            return Ok(None);
        };

        let (_, v) = resolve_value_handle(
            self.id(),
            self.blobs_folder.as_path(),
            &self.index.config.cache,
            &self.index.config.descriptor_table,
            &super_version.version,
            item,
        )?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}