mod cache;
mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use cache::MyBlobCache;
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

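/// Resolves a value read from the index tree: inline values are returned
/// as-is, indirect values are fetched from the value log using their handle.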
fn resolve_value_handle(vlog: &ValueLog<MyBlobCache, MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

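/// A key-value-separated log-structured merge tree.
///
/// Large values are stored in a log-structured value log (blob files),
/// while the index tree stores small values inline or value handles
/// pointing into the value log, reducing write amplification for large values.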
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that stores value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    /// Log-structured value log that stores large values
    #[doc(hidden)]
    pub blobs: ValueLog<MyBlobCache, MyCompressor>,

    /// Number of flushed segments that have been written to the value log,
    /// but not yet registered in the index tree
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg =
            value_log::Config::<MyBlobCache, MyCompressor>::new(MyBlobCache(config.cache.clone()))
                .segment_size_bytes(config.blob_file_target_size)
                .compression(MyCompressor(config.blob_compression));

        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

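    /// Scans the index tree, collecting statistics about value log fragmentation.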
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::{Error as IoError, ErrorKind as IoErrorKind};
        use MaybeInlineValue::{Indirect, Inline};

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // Busy-wait until all in-flight segments have been registered in the index tree
        }

        // Lock the active memtable so the scan sees a consistent view
        let _memtable_lock = self.index.read_lock_active_memtable();

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // Busy-wait again now that the memtable is locked
        }

        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        // Tracks the highest seqno seen per blob file
        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::new(
                        IoErrorKind::Other,
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // Reset the stale counters of blob files that still contain items
        // visible at or above the GC watermark, so snapshots cannot lose data
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

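    /// Applies a GC strategy to the value log, then drops blob files that have become fully stale.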
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyBlobCache, MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // Write-lock the active memtable to avoid read skew while blobs are rewritten
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

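    /// Drops all stale blob segment files.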
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

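    /// Rotates the active memtable, flushes it to a segment and registers it in the index tree.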
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
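    // Bulk ingestion: writes pre-sorted key-value pairs directly into segments
    // and blob files (only valid on empty trees, see assert below)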
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

        let mut segment_writer = Ingestion::new(&self.index)?;
        let mut blob_writer = self.blobs.get_writer()?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                // Large value: write a value handle into the segment
                // and the value itself into the value log
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer.write(key.clone(), serialized_indirection.into())?;

                blob_writer.write(&key, value)?;
            } else {
                // Small value: store it inline in the segment
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(key, serialized_direct.into())?;
            }

            count += 1;
        }

        self.blobs.register_writer(blob_writer)?;
        segment_writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,

            // The size of an indirect value is stored in the handle,
            // so the blob does not need to be fetched
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

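    // Flushes a sealed memtable to an LSM segment, separating large values
    // into the value log and writing value handles in their place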
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );

        let mut blob_writer = self.blobs.get_writer()?;

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // Tombstones never carry a blob, so they are written to the segment as-is
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // The value already lives in the value log,
                    // so the indirection is written through unchanged
                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                // Large value: write a value handle into the segment
                // and the value itself into the value log
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                // Small value: store it inline in the segment
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        // The new segment is not yet registered in the index tree,
        // so track it so that GC waits for its registration
        if segment.is_some() {
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

        // Values are always written inline at first; key-value separation
        // happens later, when the memtable is flushed
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve the value handle in the value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}