mod gc;
pub mod handle;

#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};

use crate::{
    coding::Decode,
    file::{fsync_directory, BLOBS_FOLDER},
    iter_guard::{IterGuard, IterGuardImpl},
    r#abstract::{AbstractTree, RangeItem},
    table::Table,
    tree::inner::MemtableId,
    value::InternalValue,
    version::Version,
    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
    Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId,
    UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::PathBuf,
    sync::{Arc, MutexGuard},
};

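/// Iterator guard over a single key-value pair.
///
/// Blob indirections are resolved lazily: the blob is only fetched from the
/// value log when the value is actually requested; `key()` and `size()`
/// never touch a blob file.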
pub struct Guard {
    tree: crate::BlobTree,
    version: Version,
    kv: crate::Result<InternalValue>,
}

impl IterGuard for Guard {
    fn into_inner_if(
        self,
        pred: impl Fn(&UserKey) -> bool,
    ) -> crate::Result<Option<crate::KvPair>> {
        let kv = self.kv?;

        if pred(&kv.key.user_key) {
            resolve_value_handle(
                self.tree.id(),
                self.tree.blobs_folder.as_path(),
                &self.tree.index.config.cache,
                &self.tree.index.config.descriptor_table,
                &self.version,
                kv,
            )
            .map(Some)
        } else {
            Ok(None)
        }
    }

    fn key(self) -> crate::Result<UserKey> {
        self.kv.map(|kv| kv.key.user_key)
    }

    fn size(self) -> crate::Result<u32> {
        let kv = self.kv?;

        if kv.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(kv.value);
            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
            Ok(kv.value.len() as u32)
        }
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        resolve_value_handle(
            self.tree.id(),
            self.tree.blobs_folder.as_path(),
            &self.tree.index.config.cache,
            &self.tree.index.config.descriptor_table,
            &self.version,
            self.kv?,
        )
    }
}

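/// Resolves an entry to its user value.
///
/// If the entry is an indirection, the value handle is decoded and the blob
/// is read from the value log of the given version; inline values are
/// returned as-is. Panics if the handle points to a missing blob, as that
/// indicates a bug.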
fn resolve_value_handle(
    tree_id: TreeId,
    blobs_folder: &std::path::Path,
    cache: &Arc<Cache>,
    descriptor_table: &Arc<DescriptorTable>,
    version: &Version,
    item: InternalValue,
) -> RangeItem {
    if item.key.value_type.is_indirection() {
        let mut cursor = Cursor::new(item.value);
        let vptr = BlobIndirection::decode_from(&mut cursor)?;

        match Accessor::new(&version.blob_files).get(
            tree_id,
            blobs_folder,
            &item.key.user_key,
            &vptr.vhandle,
            cache,
            descriptor_table,
        ) {
            Ok(Some(v)) => {
                let k = item.key.user_key;
                Ok((k, v))
            }
            Ok(None) => {
                panic!(
                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
                    item.key.user_key,
                    vptr.vhandle,
                    version.id(),
                );
            }
            Err(e) => Err(e),
        }
    } else {
        let k = item.key.user_key;
        let v = item.value;
        Ok((k, v))
    }
}

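/// A key-value separated log-structured merge tree.
///
/// Large values are written to blob files (a value log), while the index
/// tree stores small values inline and, for separated values, only an
/// indirection (a value handle plus the value size) pointing into a blob
/// file. This trades an extra read hop for much lower write amplification
/// on large values.
///
/// A minimal usage sketch, assuming the crate's `Config` entry point
/// (constructor and method names may differ):
///
/// ```ignore
/// let tree = Config::new("/tmp/my_db").open_as_blob_tree()?;
///
/// // Small values stay inline in the index tree; values at or above the
/// // configured separation threshold go to blob files.
/// tree.insert("small", "inline value", 0);
/// tree.insert("large", vec![0u8; 1024 * 1024], 0);
/// ```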
#[derive(Clone)]
pub struct BlobTree {
    #[doc(hidden)]
    pub index: crate::Tree,

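    /// Folder that contains this tree's blob files (the value log).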
    blobs_folder: Arc<PathBuf>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let index = crate::Tree::open(config)?;

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        fsync_directory(&blobs_folder)?;

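        // Continue blob file IDs after the highest ID referenced by the
        // current version (or start at 0 if there are none).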
        let blob_file_id_to_continue_with = index
            .current_version()
            .blob_files
            .list_ids()
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        index
            .0
            .blob_file_id_counter
            .set(blob_file_id_to_continue_with);

        Ok(Self {
            index,
            blobs_folder: Arc::new(blobs_folder),
        })
    }
}

impl AbstractTree for BlobTree {
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.index.get_version_history_lock()
    }

    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

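        // Clone the version belonging to this snapshot, so value handles
        // yielded by the iterator are resolved against a consistent view
        // of the blob files.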
        let range = prefix_to_range(prefix.as_ref());
        let version = self.index.get_version_for_snapshot(seqno).version;
        let tree = self.clone();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        let version = self.index.get_version_for_snapshot(seqno).version;
        let tree = self.clone();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

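        // All ingested items share a single sequence number, which is only
        // made visible once ingestion has completed.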
        let seqno = seqno_generator.next();

        let blob_file_size = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .file_target_size;

        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            blob_file_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

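        // Key-value separation: values at or above the separation threshold
        // are written to blob files and only an indirection is stored in the
        // index tree; smaller values are stored inline.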
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&key, seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write_indirection(key, indirection)?;
            } else {
                table_writer.write(key, value)?;
            }

            count += 1;
        }

        let blob_files = blob_writer.finish()?;
        let results = table_writer.writer.finish()?;

        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    self.index
                        .config
                        .path
                        .join(crate::file::TABLES_FOLDER)
                        .join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    false,
                    false,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

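    /// Returns the value size for the given key without reading the blob:
    /// for separated values, the size recorded in the indirection is used.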
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.index.get_flush_lock()
    }

    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{coding::Encode, file::TABLES_FOLDER, table::multi_writer::MultiWriter};

        let start = std::time::Instant::now();

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        let data_block_size = self.index.config.data_block_size_policy.get(0);

        let data_block_restart_interval =
            self.index.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval =
            self.index.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
        let index_block_compression = self.index.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);

        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}");
        log::debug!("=> to table(s) in {}", table_folder.display());
        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());

        let mut table_writer = MultiWriter::new(
            table_folder.clone(),
            self.index.table_id_counter.clone(),
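            // NOTE: presumably the table target size; the 64 MB literal is
            // an assumption read from the value itself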
            64_000_000,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.index.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let kv_opts = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            kv_opts.file_target_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let separation_threshold = kv_opts.separation_threshold;

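        // Same key-value separation policy as in `ingest`: large values go
        // to blob files, small values are inlined into the tables.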
        for item in stream {
            let item = item?;

            if item.is_tombstone() {
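                // The tombstone still needs to be written into the index
                // tree, but it carries no value worth keeping.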
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

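                // Store the encoded indirection in the index tree in place
                // of the actual value.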
                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                table_writer.register_blob(indirection);
            } else {
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        let blob_files = blob_writer.finish()?;

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.index_block_pinning_policy.get(0);

        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    table_folder.join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, Some(blob_files))))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        self.index.register_tables(
            tables,
            blob_files,
            frag_map,
            sealed_memtables_to_delete,
            gc_watermark,
        )
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

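    /// Returns the on-disk size of the tree, including all blob files.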
    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

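        // Resolve a potential indirection against the version pinned for
        // this snapshot.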
        let version = self.index.get_version_for_snapshot(seqno).version;

        let (_, v) = resolve_value_handle(
            self.id(),
            self.blobs_folder.as_path(),
            &self.index.config.cache,
            &self.index.config.descriptor_table,
            &version,
            item,
        )?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}