1mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12 coding::{Decode, Encode},
13 compaction::stream::CompactionStream,
14 file::{fsync_directory, BLOBS_FOLDER},
15 iter_guard::{IterGuard, IterGuardImpl},
16 r#abstract::{AbstractTree, RangeItem},
17 table::Table,
18 tree::inner::MemtableId,
19 value::InternalValue,
20 version::Version,
21 vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
22 Config, Memtable, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23};
24use handle::BlobIndirection;
25use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};
26
/// Iterator guard handed out by [`BlobTree`] range/prefix iterators.
///
/// Carries a reference to the owning tree plus a cloned [`Version`] snapshot,
/// so indirection entries can be resolved against the blob files of the
/// version that was current when iteration started.
pub struct Guard<'a> {
    blob_tree: &'a BlobTree,
    // Version snapshot used to resolve value handles into blob files.
    version: Version,
    // Raw index entry; may be an inline value or a blob indirection.
    kv: crate::Result<InternalValue>,
}
32
33impl IterGuard for Guard<'_> {
34 fn key(self) -> crate::Result<UserKey> {
35 self.kv.map(|kv| kv.key.user_key)
36 }
37
38 fn size(self) -> crate::Result<u32> {
39 let kv = self.kv?;
40
41 if kv.key.value_type.is_indirection() {
42 let mut cursor = Cursor::new(kv.value);
43 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
44 } else {
45 #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
46 Ok(kv.value.len() as u32)
47 }
48 }
49
50 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
51 resolve_value_handle(self.blob_tree, &self.version, self.kv?)
52 }
53}
54
55fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
56 if item.key.value_type.is_indirection() {
57 let mut cursor = Cursor::new(item.value);
58 let vptr = BlobIndirection::decode_from(&mut cursor)?;
59
60 match Accessor::new(&version.blob_files).get(
62 tree.id(),
63 &tree.blobs_folder,
64 &item.key.user_key,
65 &vptr.vhandle,
66 &tree.index.config.cache,
67 &tree.index.config.descriptor_table,
68 ) {
69 Ok(Some(v)) => {
70 let k = item.key.user_key;
71 Ok((k, v))
72 }
73 Ok(None) => {
74 panic!(
75 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
76 item.key.user_key, vptr.vhandle,
77 version.id(),
78 );
79 }
80 Err(e) => Err(e),
81 }
82 } else {
83 let k = item.key.user_key;
84 let v = item.value;
85 Ok((k, v))
86 }
87}
88
/// An LSM-tree that performs key-value separation.
///
/// Small values live inline in the index tree; large values are written to
/// blob files, with the index storing [`BlobIndirection`] entries pointing
/// into them (see `flush_memtable`).
#[derive(Clone)]
pub struct BlobTree {
    // The underlying index tree holding inline values and indirections.
    #[doc(hidden)]
    pub index: crate::Tree,

    // Folder containing this tree's blob files; created/fsynced in `open`.
    blobs_folder: PathBuf,
}
102
103impl BlobTree {
104 pub(crate) fn open(config: Config) -> crate::Result<Self> {
105 let index = crate::Tree::open(config)?;
106
107 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
108 std::fs::create_dir_all(&blobs_folder)?;
109 fsync_directory(&blobs_folder)?;
110
111 let blob_file_id_to_continue_with = index
112 .current_version()
113 .blob_files
114 .list_ids()
115 .max()
116 .map(|x| x + 1)
117 .unwrap_or_default();
118
119 index
120 .0
121 .blob_file_id_generator
122 .set(blob_file_id_to_continue_with);
123
124 Ok(Self {
125 index,
126 blobs_folder,
127 })
128 }
129}
130
// [`AbstractTree`] implementation.
//
// Most methods delegate straight to the inner index tree; the blob-specific
// logic lives in `flush_memtable` (key-value separation on flush), `get` /
// `prefix` / `range` (indirection resolution on read), `size_of`, and the
// disk-space/blob accounting methods.
impl AbstractTree for BlobTree {
    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    /// Rotates the active memtable out and flushes it to a table (plus at
    /// most one blob file), registering the results in the version.
    ///
    /// Returns `Ok(None)` if there was no memtable to rotate, or if the
    /// flush produced no table.
    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Table>> {
        let Some((table_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, blob_file)) =
            self.flush_memtable(table_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_tables(
            std::slice::from_ref(&table),
            blob_file.as_ref().map(std::slice::from_ref),
            None,
            eviction_seqno,
        )?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    /// Iterates all entries starting with `prefix`.
    ///
    /// Each yielded guard clones the version snapshot taken here, so blob
    /// lookups stay consistent for the lifetime of the iterator.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());

        let version = self.current_version();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    /// Iterates all entries in `range`; same version-snapshot scheme as
    /// [`Self::prefix`].
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self.current_version();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    // NOTE(review): bulk ingestion is not implemented for blob trees yet —
    // the body is a `todo!()` stub; the imports and trailing `Ok(())` are
    // placeholders for the eventual implementation.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        todo!();

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    // Blob files are tracked per version, not in the index tree.
    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    /// Returns the logical size of the value for `key`, without reading
    /// blob data: indirection entries carry the size in their encoding.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    /// Flushes `memtable` to a new table, performing key-value separation:
    /// values whose size reaches the configured separation threshold are
    /// written to a blob file, and an indirection entry is written to the
    /// table in their place.
    ///
    /// Returns the new table plus the blob file, if any blobs were written.
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{file::TABLES_FOLDER, table::Writer as TableWriter};

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to table in {}", table_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        // Level-0 policies (index 0) apply to freshly flushed tables.
        let mut table_writer =
            TableWriter::new(table_folder.join(table_id.to_string()), table_id, 0)?
                .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
                .use_bloom_policy({
                    use crate::config::FilterPolicyEntry::{Bloom, None};
                    use crate::table::filter::BloomConstructionPolicy;

                    match self.index.config.filter_policy.get(0) {
                        Bloom(policy) => policy,
                        // 0 bits per key effectively disables the filter.
                        None => BloomConstructionPolicy::BitsPerKey(0.0),
                    }
                });

        // u64::MAX target size => the writer never rolls over to a second
        // blob file, hence the `blob_files.len() <= 1` assertion below.
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        // Stream entries through compaction-style filtering (drops versions
        // older than the eviction seqno).
        let iter = memtable.iter().map(Ok);
        let compaction_stream = CompactionStream::new(iter, eviction_seqno);

        // Accounting for linking the blob file to the table afterwards.
        let mut blob_bytes_referenced = 0;
        let mut blob_on_disk_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for item in compaction_stream {
            let item = item?;

            // Tombstones carry no payload; never separated into blobs.
            if item.is_tombstone() {
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Capture the handle coordinates *before* writing the blob.
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                // Store the encoded indirection in the table, marked with
                // the Indirection value type.
                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blob_on_disk_bytes_referenced += u64::from(on_disk_size);
                blobs_referenced_count += 1;
            } else {
                // Small value: store inline in the table.
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree table {table_id}");

        // Link the blob file so the table tracks how many bytes it
        // references (used for fragmentation/GC accounting).
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                table_writer.link_blob_file(
                    blob_file.id(),
                    blobs_referenced_count,
                    blob_bytes_referenced,
                    blob_on_disk_bytes_referenced,
                );
            }
        }

        let table = self.index.consume_writer(table_writer)?;

        Ok(table.map(|table| (table, blob_file)))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_tables(tables, blob_files, frag_map, seqno_threshold)
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    // Index tree space plus the on-disk size of all blob files in the
    // current version.
    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    // Inserts go straight into the index memtable; key-value separation
    // only happens later, at flush time (see `flush_memtable`).
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    /// Point lookup; resolves a blob indirection into the actual value if
    /// needed (see `resolve_value_handle`).
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

        let version = self.current_version();
        let (_, v) = resolve_value_handle(self, &version, item)?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}