1mod gc;
6pub mod index;
7pub mod value;
8
9use crate::{
10 coding::{Decode, Encode},
11 compaction::stream::CompactionStream,
12 file::{fsync_directory, BLOBS_FOLDER},
13 iter_guard::{IterGuard, IterGuardImpl},
14 r#abstract::{AbstractTree, RangeItem},
15 segment::Segment,
16 tree::inner::MemtableId,
17 value::InternalValue,
18 vlog::{Accessor, BlobFile, BlobFileId, BlobFileWriter, ValueHandle, ValueLog},
19 Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
20};
21use gc::{reader::GcReader, writer::GcWriter};
22use index::IndexTree;
23use std::{
24 collections::BTreeMap,
25 io::Cursor,
26 ops::{RangeBounds, RangeFull},
27 path::PathBuf,
28 sync::{
29 atomic::{AtomicU64, AtomicUsize},
30 Arc,
31 },
32};
33use value::MaybeInlineValue;
34
/// Iterator guard for blob-tree range/prefix iteration.
///
/// Carries everything needed to lazily resolve an item: the tree itself,
/// a snapshot of the value log's blob files taken at iterator creation,
/// and the raw (possibly indirect) KV result from the index tree.
pub struct Guard<'a>(
    // Tree the iterated item belongs to (provides blobs folder, cache, ...)
    &'a BlobTree,
    // Snapshot of the value log (blob file ID -> blob file), so blob
    // lookups stay consistent for the iterator's lifetime
    Arc<BTreeMap<BlobFileId, BlobFile>>,
    // Raw KV pair from the index tree; the value is still a serialized
    // `MaybeInlineValue` (inline bytes or an indirection)
    crate::Result<(UserKey, UserValue)>,
);
40
41impl IterGuard for Guard<'_> {
42 fn key(self) -> crate::Result<UserKey> {
43 self.2.map(|(k, _)| k)
44 }
45
46 fn size(self) -> crate::Result<u32> {
47 use MaybeInlineValue::{Indirect, Inline};
48
49 let value = self.2?.1;
50 let mut cursor = Cursor::new(value);
51
52 Ok(match MaybeInlineValue::decode_from(&mut cursor)? {
53 #[allow(clippy::cast_possible_truncation)]
55 Inline(bytes) => bytes.len() as u32,
56
57 Indirect { size, .. } => size,
59 })
60 }
61
62 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
63 resolve_value_handle(self.0, &self.1, self.2)
64 }
65}
66
67fn resolve_value_handle(
68 tree: &BlobTree,
69 vlog: &BTreeMap<BlobFileId, BlobFile>,
70 item: RangeItem,
71) -> RangeItem {
72 use MaybeInlineValue::{Indirect, Inline};
73
74 match item {
75 Ok((key, value)) => {
76 let mut cursor = Cursor::new(value);
77
78 match MaybeInlineValue::decode_from(&mut cursor)? {
79 Inline(bytes) => Ok((key, bytes)),
80 Indirect { vhandle, .. } => {
81 match Accessor::new(vlog).get(
83 &tree.blobs_folder,
84 &key,
85 &vhandle,
86 &tree.index.config.cache,
87 &tree.index.config.descriptor_table,
88 ) {
89 Ok(Some(bytes)) => Ok((key, bytes)),
90 Err(e) => Err(e),
91 _ => {
92 panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
93 }
94 }
95 }
96 }
97 }
98 Err(e) => Err(e),
99 }
100}
101
/// LSM-tree with key-value separation: large values live in blob files
/// on disk, while the index tree stores keys plus either inline values
/// or indirections (value handles) into those blob files.
#[derive(Clone)]
pub struct BlobTree {
    /// The inner index tree; its values are serialized `MaybeInlineValue`s.
    #[doc(hidden)]
    pub index: IndexTree,

    // Folder where blob files are stored (`<tree path>/<BLOBS_FOLDER>`),
    // created and fsynced in `open`
    blobs_folder: PathBuf,

    /// Number of segments that have been flushed but not yet registered;
    /// incremented in `flush_memtable`, decremented in `register_segments`.
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,

    // Monotonic counter handing out IDs for newly created blob files
    blob_file_id_generator: SequenceNumberCounter,
}
121
impl BlobTree {
    /// Recovers (or creates) a blob tree from the given configuration.
    ///
    /// Opens the inner index tree, ensures the blobs folder exists and is
    /// durable, and seeds the blob file ID generator with one past the
    /// highest blob file ID found in the current version's value log.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let index: IndexTree = config.open()?.into();

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        // Fsync the directory entry itself so the folder survives a crash
        fsync_directory(&blobs_folder)?;

        // Continue numbering after the highest existing blob file ID,
        // or start at 0 (unwrap_or_default) if there are none yet
        let blob_file_id_to_continue_with = index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .value_log
            .values()
            .map(BlobFile::id)
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        Ok(Self {
            index,
            blobs_folder,
            pending_segments: Arc::new(AtomicUsize::new(0)),
            blob_file_id_generator: SequenceNumberCounter::new(blob_file_id_to_continue_with),
        })
    }

    /// Returns the space amplification factor (not yet implemented).
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        todo!()
    }

    /// Finalizes a blob file writer, returning the blob files it produced.
    ///
    /// Writers that wrote no items are skipped and their empty on-disk
    /// files are deleted; a failed deletion is only logged, not fatal.
    fn consume_blob_file_writer(writer: BlobFileWriter) -> crate::Result<Vec<BlobFile>> {
        use crate::vlog::blob_file::{GcStats, Inner as BlobFileInner, Metadata};

        let writers = writer.finish()?;

        let mut blob_files = Vec::with_capacity(writers.len());

        for writer in writers {
            // Skip (and clean up) writers that never received any data
            if writer.item_count == 0 {
                log::debug!(
                    "Blob file writer at {} has written no data, deleting empty blob file",
                    writer.path.display(),
                );
                if let Err(e) = std::fs::remove_file(&writer.path) {
                    log::warn!(
                        "Could not delete empty blob file at {}: {e:?}",
                        writer.path.display(),
                    );
                }
                continue;
            }

            let blob_file_id = writer.blob_file_id;

            blob_files.push(BlobFile(Arc::new(BlobFileInner {
                id: blob_file_id,
                path: writer.path,
                meta: Metadata {
                    item_count: writer.item_count,
                    compressed_bytes: writer.written_blob_bytes,
                    total_uncompressed_bytes: writer.uncompressed_bytes,

                    // item_count > 0 was checked above, so first/last key exist
                    #[allow(clippy::expect_used)]
                    key_range: crate::KeyRange::new((
                        writer
                            .first_key
                            .clone()
                            .expect("should have written at least 1 item"),
                        writer
                            .last_key
                            .clone()
                            .expect("should have written at least 1 item"),
                    )),
                },
                gc_stats: GcStats::default(),
            })));

            log::debug!(
                "Created blob file #{blob_file_id:?} ({} items, {} userdata bytes)",
                writer.item_count,
                writer.uncompressed_bytes,
            );
        }

        Ok(blob_files)
    }

    /// Scans to collect GC statistics for the value log (not yet implemented).
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::Error as IoError;
        use MaybeInlineValue::{Indirect, Inline};

        todo!()

    }

    /// Applies the given GC strategy to the value log (not yet implemented).
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl crate::vlog::GcStrategy,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        todo!()

    }

    /// Drops stale blob files (not yet implemented).
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        todo!()

    }

    /// Rotates the active memtable, flushes it to a segment (plus at most
    /// one blob file), and registers the result.
    ///
    /// Returns `Ok(None)` if there was no active memtable to rotate, or
    /// if flushing produced no segment.
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, blob_file)) =
            self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(
            std::slice::from_ref(&segment),
            blob_file.as_ref().map(std::slice::from_ref),
            eviction_seqno,
        )?;

        Ok(Some(segment))
    }
}
374
impl AbstractTree for BlobTree {
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        // Clone the current version so the iterator sees a consistent
        // value-log snapshot for its entire lifetime
        let version = self
            .index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .clone();

        Box::new(
            self.index
                .0
                .create_prefix(&prefix, seqno, index)
                .map(move |kv| IterGuardImpl::Blob(Guard(self, version.value_log.clone(), kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        // Same snapshotting strategy as `prefix`
        let version = self
            .index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .clone();

        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |kv| IterGuardImpl::Blob(Guard(self, version.value_log.clone(), kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> {
        self.index.drop_range(key_range)
    }

    /// Bulk-loads items into the tree (not yet implemented).
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        todo!();

        // NOTE: unreachable until the todo!() above is replaced
        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .blob_file_count()
    }

    /// Returns the logical value size for `key` without reading blob data:
    /// inline values report their byte length, indirect values carry the
    /// size in their handle.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            #[allow(clippy::cast_possible_truncation)]
            MaybeInlineValue::Inline(v) => v.len() as u32,

            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    /// Flushes a memtable to a segment, performing key-value separation:
    /// values at least `blob_file_separation_threshold` bytes long are
    /// written to a blob file and replaced by an indirection in the
    /// segment; smaller values are stored inline.
    ///
    /// Produces at most one blob file per flush. On success, increments
    /// `pending_segments` (decremented again by `register_segments`).
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{file::SEGMENTS_FOLDER, segment::Writer as SegmentWriter};
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {}", lsm_segment_folder.display());
        let mut segment_writer = SegmentWriter::new(
            lsm_segment_folder.join(segment_id.to_string()),
            segment_id,
        )?
        .use_data_block_compression(self.index.config.data_block_compression_policy.get(0));
        // u64::MAX size limit => a single blob file per flush (asserted below)
        let mut blob_writer = BlobFileWriter::new(
            self.blob_file_id_generator.clone(),
            u64::MAX,
            self.index.config.path.join(BLOBS_FOLDER),
        )?;
        let iter = memtable.iter().map(Ok);
        // Drop items made obsolete by `eviction_seqno` during the flush
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            // Tombstones carry no payload, so they bypass separation entirely
            if item.is_tombstone() {
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                // Already-indirect values (e.g. moved during GC/compaction)
                // are passed through to the segment unchanged
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                // Capture the handle's coordinates *before* writing the blob
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, value)?;

                let indirection = MaybeInlineValue::Indirect {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;
            } else {
                // Small value: keep it inline in the segment
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = Self::consume_blob_file_writer(blob_writer)?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_writer)?;

        if segment.is_some() {
            // Track the flushed-but-unregistered segment; paired with the
            // fetch_sub in `register_segments`
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment.map(|segment| (segment, blob_file)))
    }

    /// Registers flushed segments (and any blob files) with the index tree,
    /// then retires them from the `pending_segments` counter.
    ///
    /// # Panics
    ///
    /// Panics if more segments are registered than were pending.
    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_segments(segments, blob_files, seqno_threshold)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    /// Total on-disk size: index tree segments plus value-log blob files.
    fn disk_space(&self) -> u64 {
        let lock = self.index.manifest.read().expect("lock is poisoned");
        let version = lock.current_version();
        let vlog = crate::vlog::Accessor::new(&version.value_log);
        self.index.disk_space() + vlog.disk_space()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    /// Inserts a key-value pair; the value is always stored inline here,
    /// and key-value separation is deferred to `flush_memtable`.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        use value::MaybeInlineValue;

        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    /// Point lookup that transparently follows value handles into the
    /// value log.
    ///
    /// # Panics
    ///
    /// Panics if an indirect handle matches no blob (index/value-log
    /// divergence — a bug).
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                let lock = self.index.manifest.read().expect("lock is poisoned");
                let vlog = crate::vlog::Accessor::new(&lock.current_version().value_log);

                match vlog.get(
                    &self.blobs_folder,
                    key,
                    &vhandle,
                    &self.index.config.cache,
                    &self.index.config.descriptor_table,
                )? {
                    Some(v) => Ok(Some(v)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}