mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

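/// Resolves a value read from the index tree: inline values are returned as-is,
/// indirect values are looked up in the value log using their value handle.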
fn resolve_value_handle(vlog: &ValueLog<MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    // Resolve the value handle against the value log
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

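/// A key-value-separated log-structured merge tree
///
/// The tree consists of an index tree (LSM-tree) and a log-structured value log.
/// Large values are stored in the value log, while the index tree only stores
/// their value handles (or small values inline), reducing write amplification.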
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree (LSM-tree) that stores value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    /// Log-structured value log that stores large values
    #[doc(hidden)]
    pub blobs: ValueLog<MyCompressor>,

    /// Number of segments that have been flushed but not yet registered;
    /// garbage collection waits until this drops to zero
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg = value_log::Config::<MyCompressor>::default()
            .blob_cache(config.blob_cache.clone())
            .segment_size_bytes(config.blob_file_target_size)
            .compression(MyCompressor(config.blob_compression));

        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

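    /// Scans the index tree, collecting statistics about value log fragmentation.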
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::{Error as IoError, ErrorKind as IoErrorKind};
        use MaybeInlineValue::{Indirect, Inline};

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait until all in-flight segments are registered in the tree
        }

        // Lock the active memtable so its contents cannot change while we scan
        let _memtable_lock = self.index.read_lock_active_memtable();

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Wait again, in case a flush registered a segment in the meantime
        }

        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        // Tracks the highest seqno that references each blob file
        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::new(
                        IoErrorKind::Other,
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // If a blob file is still referenced by a seqno at or above the GC watermark,
        // reset its stale statistics so it is not considered for garbage collection
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

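    /// Applies a garbage collection strategy to the value log,
    /// rewriting fragmented blob files and dropping stale ones.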
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // IMPORTANT: Write-lock the active memtable so no writes can race with blob relocation
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

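    /// Drops all stale blob segment files.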
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        // IMPORTANT: Write-lock the active memtable to avoid races with writers
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

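    /// Rotates the active memtable and flushes it to disk, registering the
    /// resulting segment (if any).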
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

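// Most operations are delegated to the inner index tree; point reads and range
// reads additionally resolve value handles against the value log.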
impl AbstractTree for BlobTree {
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        // NOTE: The value log never needs to be read here: the index tree
        // stores inline values directly and the size of indirect values
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,

            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );

        let mut blob_writer = self.blobs.get_writer()?;

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Tombstones are written to the index tree only;
                // there is no blob to write to the value log
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // NOTE: This is already an indirection (e.g. written by a GC rewrite),
                    // so pass it through to the index tree unchanged
                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                // Large value: write the value to the value log and only store
                // an indirection (value handle + size) in the index tree
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                // Small value: keep it inline in the index tree
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        if segment.is_some() {
            // NOTE: Increment the pending segment count, so there cannot be a
            // GC run until the segment is registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        // NOTE: Only the index tree needs to be consulted; the blob itself is irrelevant here
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        // NOTE: Only the index tree needs to be consulted; values are never resolved
        self.index.len(seqno, index)
    }

    #[must_use]
    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

        // NOTE: Values are always inserted inline into the memtable;
        // key-value separation only happens when the memtable is flushed
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve the indirection by fetching the blob from the value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}