mod gc;
pub mod handle;

#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::{fsync_directory, BLOBS_FOLDER},
    iter_guard::{IterGuard, IterGuardImpl},
    r#abstract::{AbstractTree, RangeItem},
    segment::Segment,
    tree::inner::MemtableId,
    value::InternalValue,
    version::Version,
    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
    Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};

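/// Guard for an item yielded by a [`BlobTree`] iterator.
///
/// Value resolution is deferred: `key()` and `size()` only inspect the index
/// entry, while `into_inner()` follows a blob indirection into the value log
/// using the version captured when the iterator was created.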
pub struct Guard<'a> {
    blob_tree: &'a BlobTree,
    version: Version,
    kv: crate::Result<InternalValue>,
}

impl IterGuard for Guard<'_> {
    fn key(self) -> crate::Result<UserKey> {
        self.kv.map(|kv| kv.key.user_key)
    }

    fn size(self) -> crate::Result<u32> {
        let kv = self.kv?;

        if kv.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(kv.value);
            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
        } else {
            #[allow(clippy::cast_possible_truncation)]
            Ok(kv.value.len() as u32)
        }
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        resolve_value_handle(self.blob_tree, &self.version, self.kv?)
    }
}

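// Resolves an index entry to its user-visible key-value pair.
//
// If the entry is a blob indirection, its value handle is followed into the
// value log; otherwise the value is stored inline and returned as-is.
// A handle that matches no blob means the index and the value log have
// diverged, which is a bug, so we panic.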
fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
    if item.key.value_type.is_indirection() {
        let mut cursor = Cursor::new(item.value);
        let vptr = BlobIndirection::decode_from(&mut cursor)?;

        match Accessor::new(&version.value_log).get(
            tree.id(),
            &tree.blobs_folder,
            &item.key.user_key,
            &vptr.vhandle,
            &tree.index.config.cache,
            &tree.index.config.descriptor_table,
        ) {
            Ok(Some(v)) => {
                let k = item.key.user_key;
                Ok((k, v))
            }
            Ok(None) => {
                panic!(
                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
                    item.key.user_key,
                    vptr.vhandle,
                    version.id(),
                );
            }
            Err(e) => Err(e),
        }
    } else {
        let k = item.key.user_key;
        let v = item.value;
        Ok((k, v))
    }
}

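/// An LSM-tree with key-value separation.
///
/// Small values are stored inline in the index tree; values at or above the
/// configured separation threshold are written to blob files and referenced
/// through a [`BlobIndirection`].
///
/// A minimal sketch of opening a blob tree (hypothetical call site; assumes
/// `Config::new` builds a default configuration for the given path):
///
/// ```ignore
/// let tree = BlobTree::open(Config::new("/tmp/my-blob-tree"))?;
/// assert_eq!(0, tree.segment_count());
/// ```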
#[derive(Clone)]
pub struct BlobTree {
    #[doc(hidden)]
    pub index: crate::Tree,

    blobs_folder: PathBuf,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let index = crate::Tree::open(config)?;

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        fsync_directory(&blobs_folder)?;

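        // Continue with the next unused blob file ID: one past the highest
        // ID found in the current version, or 0 if there are no blob files yet.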
        let blob_file_id_to_continue_with = index
            .current_version()
            .value_log
            .values()
            .map(BlobFile::id)
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        index
            .0
            .blob_file_id_generator
            .set(blob_file_id_to_continue_with);

        Ok(Self {
            index,
            blobs_folder,
        })
    }
}

impl AbstractTree for BlobTree {
    fn next_table_id(&self) -> SegmentId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, blob_file)) =
            self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };

        self.register_segments(
            std::slice::from_ref(&segment),
            blob_file.as_ref().map(std::slice::from_ref),
            None,
            eviction_seqno,
        )?;

        Ok(Some(segment))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());

        let version = self.current_version();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self.current_version();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        todo!()
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

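        // An indirection stores the original value size inline,
        // so no blob file read is needed to answer a size query.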
        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[allow(clippy::cast_possible_truncation)]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{file::SEGMENTS_FOLDER, segment::Writer as SegmentWriter};

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to LSM table in {}", lsm_segment_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        let mut segment_writer =
            SegmentWriter::new(lsm_segment_folder.join(segment_id.to_string()), segment_id)?
                .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
                .use_bloom_policy({
                    use crate::config::FilterPolicyEntry::{Bloom, None};
                    use crate::segment::filter::BloomConstructionPolicy;

                    match self.index.config.filter_policy.get(0) {
                        Bloom(policy) => policy,
                        None => BloomConstructionPolicy::BitsPerKey(0.0),
                    }
                });

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

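        // Stream the memtable through the compaction filter so that
        // versions hidden below `eviction_seqno` are dropped during flush.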
        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        let mut blob_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

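        // Key-value separation: tombstones and small values stay in the
        // LSM-tree segment, large values are redirected into the blob file.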
        for item in compaction_filter {
            let item = item?;

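            // Tombstones carry no value, so write them with an empty value
            // and skip key-value separation entirely.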
            if item.is_tombstone() {
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

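            // Large value: write it to the blob file and store an indirection
            // (value handle plus original size) in the segment instead.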
            if value_size >= separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                segment_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blobs_referenced_count += 1;
            } else {
                segment_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
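        // A single memtable flush writes at most one blob file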
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree segment {segment_id}");

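        // Link the blob file to the new segment, recording how many bytes and
        // blobs the segment references; this feeds fragmentation tracking for GC.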
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                segment_writer.link_blob_file(
                    blob_file.id(),
                    blob_bytes_referenced,
                    blobs_referenced_count,
                );
            }
        }

        let segment = self.index.consume_writer(segment_writer)?;

        Ok(segment.map(|segment| (segment, blob_file)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_segments(segments, blob_files, frag_map, seqno_threshold)
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        let vlog = crate::vlog::Accessor::new(&version.value_log);
        self.index.disk_space() + vlog.disk_space()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

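        // Resolve a potential blob indirection through the current version
        // before handing the value back to the caller.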
        let version = self.current_version();
        let (_, v) = resolve_value_handle(self, &version, item)?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}