1mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12 coding::{Decode, Encode},
13 compaction::stream::CompactionStream,
14 file::{fsync_directory, BLOBS_FOLDER},
15 iter_guard::{IterGuard, IterGuardImpl},
16 r#abstract::{AbstractTree, RangeItem},
17 table::Table,
18 tree::inner::MemtableId,
19 value::InternalValue,
20 version::Version,
21 vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
22 Config, Memtable, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23};
24use handle::BlobIndirection;
25use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};
26
/// Guard over one entry yielded by a blob-tree range/prefix iterator.
///
/// Holds the raw index entry together with the [`Version`] that was current
/// when the iterator was created, so a blob indirection can be resolved
/// lazily (only when the value is actually requested).
pub struct Guard<'a> {
    // Tree used to resolve blob indirections on access.
    blob_tree: &'a BlobTree,
    // Version snapshot captured at iterator creation; its blob file list is
    // consulted when resolving indirections.
    version: Version,
    // Raw index entry (may be an indirection pointing into a blob file),
    // or the error produced while reading it.
    kv: crate::Result<InternalValue>,
}
32
33impl IterGuard for Guard<'_> {
34 fn key(self) -> crate::Result<UserKey> {
35 self.kv.map(|kv| kv.key.user_key)
36 }
37
38 fn size(self) -> crate::Result<u32> {
39 let kv = self.kv?;
40
41 if kv.key.value_type.is_indirection() {
42 let mut cursor = Cursor::new(kv.value);
43 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
44 } else {
45 #[allow(clippy::cast_possible_truncation)]
47 Ok(kv.value.len() as u32)
48 }
49 }
50
51 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
52 resolve_value_handle(self.blob_tree, &self.version, self.kv?)
53 }
54}
55
56fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
57 if item.key.value_type.is_indirection() {
58 let mut cursor = Cursor::new(item.value);
59 let vptr = BlobIndirection::decode_from(&mut cursor)?;
60
61 match Accessor::new(&version.blob_files).get(
63 tree.id(),
64 &tree.blobs_folder,
65 &item.key.user_key,
66 &vptr.vhandle,
67 &tree.index.config.cache,
68 &tree.index.config.descriptor_table,
69 ) {
70 Ok(Some(v)) => {
71 let k = item.key.user_key;
72 Ok((k, v))
73 }
74 Ok(None) => {
75 panic!(
76 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
77 item.key.user_key, vptr.vhandle,
78 version.id(),
79 );
80 }
81 Err(e) => Err(e),
82 }
83 } else {
84 let k = item.key.user_key;
85 let v = item.value;
86 Ok((k, v))
87 }
88}
89
/// An LSM-tree with key-value separation.
///
/// Values at or above the configured separation threshold are written to
/// external blob files; the index tree then stores a [`BlobIndirection`]
/// (key + value handle + size) instead of the value itself. Small values
/// and tombstones are stored inline in the index tree.
#[derive(Clone)]
pub struct BlobTree {
    /// The underlying index LSM-tree holding keys, inline values, and
    /// blob indirections.
    #[doc(hidden)]
    pub index: crate::Tree,

    // Directory containing this tree's blob files (`<path>/BLOBS_FOLDER`).
    blobs_folder: PathBuf,
}
103
104impl BlobTree {
105 pub(crate) fn open(config: Config) -> crate::Result<Self> {
106 let index = crate::Tree::open(config)?;
107
108 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
109 std::fs::create_dir_all(&blobs_folder)?;
110 fsync_directory(&blobs_folder)?;
111
112 let blob_file_id_to_continue_with = index
113 .current_version()
114 .blob_files
115 .list_ids()
116 .max()
117 .map(|x| x + 1)
118 .unwrap_or_default();
119
120 index
121 .0
122 .blob_file_id_generator
123 .set(blob_file_id_to_continue_with);
124
125 Ok(Self {
126 index,
127 blobs_folder,
128 })
129 }
130}
131
132impl AbstractTree for BlobTree {
133 fn next_table_id(&self) -> TableId {
134 self.index.next_table_id()
135 }
136
137 fn id(&self) -> crate::TreeId {
138 self.index.id()
139 }
140
141 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
142 self.index.get_internal_entry(key, seqno)
143 }
144
145 fn current_version(&self) -> Version {
146 self.index.current_version()
147 }
148
149 fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Table>> {
150 let Some((table_id, yanked_memtable)) = self.index.rotate_memtable() else {
151 return Ok(None);
152 };
153
154 let Some((table, blob_file)) =
155 self.flush_memtable(table_id, &yanked_memtable, eviction_seqno)?
156 else {
157 return Ok(None);
158 };
159 self.register_tables(
160 std::slice::from_ref(&table),
161 blob_file.as_ref().map(std::slice::from_ref),
162 None,
163 eviction_seqno,
164 )?;
165
166 Ok(Some(table))
167 }
168
169 #[cfg(feature = "metrics")]
170 fn metrics(&self) -> &Arc<crate::Metrics> {
171 self.index.metrics()
172 }
173
174 fn version_free_list_len(&self) -> usize {
175 self.index.version_free_list_len()
176 }
177
178 fn prefix<K: AsRef<[u8]>>(
179 &self,
180 prefix: K,
181 seqno: SeqNo,
182 index: Option<Arc<Memtable>>,
183 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
184 use crate::range::prefix_to_range;
185
186 let range = prefix_to_range(prefix.as_ref());
187
188 let version = self.current_version();
189
190 Box::new(
191 self.index
192 .create_internal_range(&range, seqno, index)
193 .map(move |kv| {
194 IterGuardImpl::Blob(Guard {
195 blob_tree: self,
196 version: version.clone(), kv,
198 })
199 }),
200 )
201 }
202
203 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
204 &self,
205 range: R,
206 seqno: SeqNo,
207 index: Option<Arc<Memtable>>,
208 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
209 let version = self.current_version();
210
211 Box::new(
213 self.index
214 .create_internal_range(&range, seqno, index)
215 .map(move |kv| {
216 IterGuardImpl::Blob(Guard {
217 blob_tree: self,
218 version: version.clone(), kv,
220 })
221 }),
222 )
223 }
224
225 fn tombstone_count(&self) -> u64 {
226 self.index.tombstone_count()
227 }
228
229 fn weak_tombstone_count(&self) -> u64 {
230 self.index.weak_tombstone_count()
231 }
232
233 fn weak_tombstone_reclaimable_count(&self) -> u64 {
234 self.index.weak_tombstone_reclaimable_count()
235 }
236
237 fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
238 self.index.drop_range(range)
239 }
240
241 fn ingest(
242 &self,
243 iter: impl Iterator<Item = (UserKey, UserValue)>,
244 seqno_generator: &SequenceNumberCounter,
245 visible_seqno: &SequenceNumberCounter,
246 ) -> crate::Result<()> {
247 use crate::tree::ingest::Ingestion;
248 use std::time::Instant;
249
250 todo!();
254
255 Ok(())
315 }
316
317 fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
318 self.index.major_compact(target_size, seqno_threshold)
319 }
320
321 fn clear_active_memtable(&self) {
322 self.index.clear_active_memtable();
323 }
324
325 fn l0_run_count(&self) -> usize {
326 self.index.l0_run_count()
327 }
328
329 fn blob_file_count(&self) -> usize {
330 self.current_version().blob_file_count()
331 }
332
333 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
336 let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
337 return Ok(None);
338 };
339
340 Ok(Some(if item.key.value_type.is_indirection() {
341 let mut cursor = Cursor::new(item.value);
342 let vptr = BlobIndirection::decode_from(&mut cursor)?;
343 vptr.size
344 } else {
345 #[allow(clippy::cast_possible_truncation)]
347 {
348 item.value.len() as u32
349 }
350 }))
351 }
352
353 fn stale_blob_bytes(&self) -> u64 {
354 self.current_version().gc_stats().stale_bytes()
355 }
356
357 fn filter_size(&self) -> usize {
358 self.index.filter_size()
359 }
360
361 fn pinned_filter_size(&self) -> usize {
362 self.index.pinned_filter_size()
363 }
364
365 fn pinned_block_index_size(&self) -> usize {
366 self.index.pinned_block_index_size()
367 }
368
369 fn sealed_memtable_count(&self) -> usize {
370 self.index.sealed_memtable_count()
371 }
372
373 fn flush_memtable(
374 &self,
375 table_id: TableId,
376 memtable: &Arc<Memtable>,
377 eviction_seqno: SeqNo,
378 ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
379 use crate::{file::TABLES_FOLDER, table::Writer as TableWriter};
380
381 let table_folder = self.index.config.path.join(TABLES_FOLDER);
382
383 log::debug!("Flushing memtable & performing key-value separation");
384 log::debug!("=> to table in {}", table_folder.display());
385 log::debug!("=> to blob file at {}", self.blobs_folder.display());
386
387 let mut table_writer = TableWriter::new(table_folder.join(table_id.to_string()), table_id)?
388 .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
390 .use_bloom_policy({
391 use crate::config::FilterPolicyEntry::{Bloom, None};
392 use crate::table::filter::BloomConstructionPolicy;
393
394 match self.index.config.filter_policy.get(0) {
395 Bloom(policy) => policy,
396 None => BloomConstructionPolicy::BitsPerKey(0.0),
397 }
398 });
399
400 let mut blob_writer = BlobFileWriter::new(
401 self.index.0.blob_file_id_generator.clone(),
402 u64::MAX, self.index.config.path.join(BLOBS_FOLDER),
404 )?
405 .use_compression(
406 self.index
407 .config
408 .kv_separation_opts
409 .as_ref()
410 .expect("blob options should exist")
411 .compression,
412 );
413
414 let iter = memtable.iter().map(Ok);
415 let compaction_stream = CompactionStream::new(iter, eviction_seqno);
416
417 let mut blob_bytes_referenced = 0;
418 let mut blob_on_disk_bytes_referenced = 0;
419 let mut blobs_referenced_count = 0;
420
421 let separation_threshold = self
422 .index
423 .config
424 .kv_separation_opts
425 .as_ref()
426 .expect("kv separation options should exist")
427 .separation_threshold;
428
429 for item in compaction_stream {
430 let item = item?;
431
432 if item.is_tombstone() {
433 table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
436 continue;
437 }
438
439 let value = item.value;
440
441 #[allow(clippy::cast_possible_truncation)]
443 let value_size = value.len() as u32;
444
445 if value_size >= separation_threshold {
446 let offset = blob_writer.offset();
447 let blob_file_id = blob_writer.blob_file_id();
448 let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;
449
450 let indirection = BlobIndirection {
451 vhandle: ValueHandle {
452 blob_file_id,
453 offset,
454 on_disk_size,
455 },
456 size: value_size,
457 };
458
459 table_writer.write({
460 let mut vptr =
461 InternalValue::new(item.key.clone(), indirection.encode_into_vec());
462 vptr.key.value_type = crate::ValueType::Indirection;
463 vptr
464 })?;
465
466 blob_bytes_referenced += u64::from(value_size);
467 blob_on_disk_bytes_referenced += u64::from(on_disk_size);
468 blobs_referenced_count += 1;
469 } else {
470 table_writer.write(InternalValue::new(item.key, value))?;
471 }
472 }
473
474 log::trace!("Creating blob file");
475 let blob_files = blob_writer.finish()?;
476 assert!(blob_files.len() <= 1);
477 let blob_file = blob_files.into_iter().next();
478
479 log::trace!("Creating LSM-tree table {table_id}");
480
481 if blob_bytes_referenced > 0 {
482 if let Some(blob_file) = &blob_file {
483 table_writer.link_blob_file(
484 blob_file.id(),
485 blobs_referenced_count,
486 blob_bytes_referenced,
487 blob_on_disk_bytes_referenced,
488 );
489 }
490 }
491
492 let table = self.index.consume_writer(table_writer)?;
493
494 Ok(table.map(|table| (table, blob_file)))
495 }
496
497 fn register_tables(
498 &self,
499 tables: &[Table],
500 blob_files: Option<&[BlobFile]>,
501 frag_map: Option<FragmentationMap>,
502 seqno_threshold: SeqNo,
503 ) -> crate::Result<()> {
504 self.index
505 .register_tables(tables, blob_files, frag_map, seqno_threshold)
506 }
507
508 fn set_active_memtable(&self, memtable: Memtable) {
509 self.index.set_active_memtable(memtable);
510 }
511
512 fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
513 self.index.add_sealed_memtable(id, memtable);
514 }
515
516 fn compact(
517 &self,
518 strategy: Arc<dyn crate::compaction::CompactionStrategy>,
519 seqno_threshold: SeqNo,
520 ) -> crate::Result<()> {
521 self.index.compact(strategy, seqno_threshold)
522 }
523
524 fn get_next_table_id(&self) -> TableId {
525 self.index.get_next_table_id()
526 }
527
528 fn tree_config(&self) -> &Config {
529 &self.index.config
530 }
531
532 fn get_highest_seqno(&self) -> Option<SeqNo> {
533 self.index.get_highest_seqno()
534 }
535
536 fn active_memtable_size(&self) -> u64 {
537 self.index.active_memtable_size()
538 }
539
540 fn tree_type(&self) -> crate::TreeType {
541 crate::TreeType::Blob
542 }
543
544 fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
545 self.index.rotate_memtable()
546 }
547
548 fn table_count(&self) -> usize {
549 self.index.table_count()
550 }
551
552 fn level_table_count(&self, idx: usize) -> Option<usize> {
553 self.index.level_table_count(idx)
554 }
555
556 fn approximate_len(&self) -> usize {
557 self.index.approximate_len()
558 }
559
560 fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
563 self.index.is_empty(seqno, index)
564 }
565
566 fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
569 self.index.contains_key(key, seqno)
570 }
571
572 fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
575 self.index.len(seqno, index)
576 }
577
578 fn disk_space(&self) -> u64 {
579 let version = self.current_version();
580 self.index.disk_space() + version.blob_files.on_disk_size()
581 }
582
583 fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
584 self.index.get_highest_memtable_seqno()
585 }
586
587 fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
588 self.index.get_highest_persisted_seqno()
589 }
590
591 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
592 &self,
593 key: K,
594 value: V,
595 seqno: SeqNo,
596 ) -> (u64, u64) {
597 self.index.insert(key, value.into(), seqno)
598 }
599
600 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
601 let key = key.as_ref();
602
603 let Some(item) = self.index.get_internal_entry(key, seqno)? else {
607 return Ok(None);
608 };
609
610 let version = self.current_version();
611 let (_, v) = resolve_value_handle(self, &version, item)?;
612
613 Ok(Some(v))
614 }
615
616 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
617 self.index.remove(key, seqno)
618 }
619
620 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
621 self.index.remove_weak(key, seqno)
622 }
623}