mod cache;
mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use cache::MyBlobCache;
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

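/// Resolves a range item by following its value handle: inline values are
/// returned as-is, indirect values are looked up in the value log.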
fn resolve_value_handle(vlog: &ValueLog<MyBlobCache, MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

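/// A key-value-separated LSM-tree
///
/// Large values are stored in a separate value log ("blob files") to reduce
/// write amplification; the index tree stores small values inline and keeps
/// value handles for everything that lives in the value log.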
#[derive(Clone)]
pub struct BlobTree {
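    /// Index tree (LSM-tree) storing inline values or handles into the value log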
    #[doc(hidden)]
    pub index: IndexTree,

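    /// Log-structured value log storing large values ("blobs")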
    #[doc(hidden)]
    pub blobs: ValueLog<MyBlobCache, MyCompressor>,

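    /// Number of flushed segments that are not registered in the index tree yet;
    /// GC scans are held back while this is non-zero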
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
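    /// Opens the index tree and its associated value log using the given configuration.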
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg =
            value_log::Config::<MyBlobCache, MyCompressor>::new(MyBlobCache(config.cache.clone()))
                .segment_size_bytes(config.blob_file_target_size)
                .compression(match config.blob_compression {
                    crate::CompressionType::None => None,

                    #[cfg(any(feature = "lz4", feature = "miniz"))]
                    c => Some(MyCompressor(c)),
                });

        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

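    /// Scans the index tree and collects fragmentation statistics for the value log.
    ///
    /// Returns a report that can be used to decide which blob files to garbage-collect.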
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::{Error as IoError, ErrorKind as IoErrorKind};
        use MaybeInlineValue::{Indirect, Inline};

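        // Wait until all in-flight segments are registered, take a read lock on the
        // active memtable, then wait again, so the scan below sees a consistent view
        // of index tree and value log.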
        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {}

        let _memtable_lock = self.index.read_lock_active_memtable();

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {}

        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

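        // Feed every value handle referenced by the index tree into the value log's
        // stats scan, remembering the highest seqno that references each blob file.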
        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::new(
                        IoErrorKind::Other,
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // Blob files that are still referenced by an item at or above the GC watermark
        // get their stale stats reset, so they are treated as fully live.
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

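    /// Applies a garbage collection strategy to the value log, then drops blob files
    /// that have become stale.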
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyBlobCache, MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // Write-lock the active memtable so no new values are written
        // while the value log is being rewritten
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

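    /// Drops stale blob files from the value log.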
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        // Write-lock the active memtable to prevent new writes while blob files are dropped
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    /// Rotates the active memtable, flushes it to a segment and registers the segment.
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

        let mut segment_writer = Ingestion::new(&self.index)?;
        let mut blob_writer = self.blobs.get_writer()?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > *last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

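            // Values at or above the separation threshold go into the value log;
            // the index tree only stores a handle pointing at the blob.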
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer.write(key.clone(), serialized_indirection.into())?;

                blob_writer.write(&key, value)?;
            } else {
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(key, serialized_direct.into())?;
            }

            count += 1;
        }

        self.blobs.register_writer(blob_writer)?;
        segment_writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );

        let mut blob_writer = self.blobs.get_writer()?;

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

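            // Tombstones carry no value, so they go straight into the LSM segment
            // and never touch the value log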
            if item.is_tombstone() {
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // The value is already an indirection (e.g. written back by GC),
                    // so keep the existing handle and only write it to the index segment
                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

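            // Large values are written to the value log and only a handle goes into
            // the LSM segment; small values are stored inline in the segment itself.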
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        if segment.is_some() {
            // Hold back GC scans until this segment has been registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        // The segments are registered now, so GC scans may proceed again
        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

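        // Values are always written inline into the memtable; key-value separation
        // happens later, when the memtable is flushed.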
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve the indirection through the value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}