1mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12 coding::{Decode, Encode},
13 compaction::stream::CompactionStream,
14 file::{fsync_directory, BLOBS_FOLDER},
15 iter_guard::{IterGuard, IterGuardImpl},
16 r#abstract::{AbstractTree, RangeItem},
17 table::Table,
18 tree::inner::MemtableId,
19 value::InternalValue,
20 version::Version,
21 vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
22 Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId,
23 UserKey, UserValue,
24};
25use handle::BlobIndirection;
26use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};
27
/// Guard over a single KV pair yielded by a range/prefix iterator.
///
/// Blob indirections are not resolved eagerly; the blob lookup happens
/// only when the consumer calls `into_inner` (see `IterGuard` impl below).
pub struct Guard {
    // Tree handle, used to resolve blob indirections lazily
    tree: crate::BlobTree,
    // Version snapshot whose blob file set is used for resolution
    version: Version,
    // The (possibly still indirect) key-value pair from the index tree
    kv: crate::Result<InternalValue>,
}
33
34impl IterGuard for Guard {
35 fn key(self) -> crate::Result<UserKey> {
36 self.kv.map(|kv| kv.key.user_key)
37 }
38
39 fn size(self) -> crate::Result<u32> {
40 let kv = self.kv?;
41
42 if kv.key.value_type.is_indirection() {
43 let mut cursor = Cursor::new(kv.value);
44 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
45 } else {
46 #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
47 Ok(kv.value.len() as u32)
48 }
49 }
50
51 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
52 resolve_value_handle(
53 self.tree.id(),
54 self.tree.blobs_folder.as_path(),
55 &self.tree.index.config.cache,
56 &self.tree.index.config.descriptor_table,
57 &self.version,
58 self.kv?,
59 )
60 }
61}
62
63fn resolve_value_handle(
64 tree_id: TreeId,
65 blobs_folder: &std::path::Path,
66 cache: &Arc<Cache>,
67 descriptor_table: &Arc<DescriptorTable>,
68 version: &Version,
69 item: InternalValue,
70) -> RangeItem {
71 if item.key.value_type.is_indirection() {
72 let mut cursor = Cursor::new(item.value);
73 let vptr = BlobIndirection::decode_from(&mut cursor)?;
74
75 match Accessor::new(&version.blob_files).get(
77 tree_id,
78 blobs_folder,
79 &item.key.user_key,
80 &vptr.vhandle,
81 cache,
82 descriptor_table,
83 ) {
84 Ok(Some(v)) => {
85 let k = item.key.user_key;
86 Ok((k, v))
87 }
88 Ok(None) => {
89 panic!(
90 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
91 item.key.user_key, vptr.vhandle,
92 version.id(),
93 );
94 }
95 Err(e) => Err(e),
96 }
97 } else {
98 let k = item.key.user_key;
99 let v = item.value;
100 Ok((k, v))
101 }
102}
103
/// A tree with key-value separation: small values live inline in the
/// index LSM-tree, large values are written to blob files and replaced
/// by `BlobIndirection` entries (see `flush_memtable`/`ingest`).
#[derive(Clone)]
pub struct BlobTree {
    /// The index LSM-tree holding keys, inline values and indirections
    #[doc(hidden)]
    pub index: crate::Tree,

    // Folder the blob files live in: `<tree path>/<BLOBS_FOLDER>`
    blobs_folder: Arc<PathBuf>,
}
117
118impl BlobTree {
119 pub(crate) fn open(config: Config) -> crate::Result<Self> {
120 let index = crate::Tree::open(config)?;
121
122 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
123 std::fs::create_dir_all(&blobs_folder)?;
124 fsync_directory(&blobs_folder)?;
125
126 let blob_file_id_to_continue_with = index
127 .current_version()
128 .blob_files
129 .list_ids()
130 .max()
131 .map(|x| x + 1)
132 .unwrap_or_default();
133
134 index
135 .0
136 .blob_file_id_generator
137 .set(blob_file_id_to_continue_with);
138
139 Ok(Self {
140 index,
141 blobs_folder: Arc::new(blobs_folder),
142 })
143 }
144}
145
146impl AbstractTree for BlobTree {
    /// Next table ID, delegated to the index tree.
    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    /// Unique tree ID, delegated to the index tree.
    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    /// Raw point lookup in the index tree; blob indirections are NOT resolved.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    /// Current version (tables + blob files), delegated to the index tree.
    fn current_version(&self) -> Version {
        self.index.current_version()
    }
162
163 fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Table>> {
164 let Some((table_id, yanked_memtable)) = self.index.rotate_memtable() else {
165 return Ok(None);
166 };
167
168 let Some((table, blob_file)) =
169 self.flush_memtable(table_id, &yanked_memtable, eviction_seqno)?
170 else {
171 return Ok(None);
172 };
173 self.register_tables(
174 std::slice::from_ref(&table),
175 blob_file.as_ref().map(std::slice::from_ref),
176 None,
177 )?;
178
179 Ok(Some(table))
180 }
181
    /// Runtime metrics handle (only with the `metrics` feature),
    /// delegated to the index tree.
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    /// Length of the version free list, delegated to the index tree.
    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }
190
    /// Iterates over all KV pairs whose key starts with `prefix` at
    /// snapshot `seqno`.
    ///
    /// Items are yielded as lazy guards; blob indirections are only
    /// resolved when a guard is consumed.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        // NOTE(review): only `.version` is extracted here, dropping the rest of
        // the snapshot handle before iteration, while `range` keeps the whole
        // handle alive inside its closure — confirm which is intended (if the
        // handle pins the snapshot, releasing it here may be premature).
        let version = self.index.get_version_for_snapshot(seqno).version;
        let tree = self.clone();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }
215
    /// Iterates over all KV pairs within `range` at snapshot `seqno`.
    ///
    /// Items are yielded as lazy guards; blob indirections are only
    /// resolved when a guard is consumed.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        let version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        // NOTE(review): this clones the entire snapshot handle per
                        // yielded item just to take `.version`, whereas `prefix`
                        // extracts `.version` once before the closure — confirm
                        // whether the per-item handle clone is intentional.
                        version: version.clone().version,
                        kv,
                    })
                }),
        )
    }
237
    /// Number of tombstones, delegated to the index tree.
    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    /// Number of weak tombstones, delegated to the index tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    /// Number of reclaimable weak tombstones, delegated to the index tree.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    /// Drops all entries in `range`, delegated to the index tree.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }
253
    /// Bulk-loads pre-sorted `(key, value)` pairs with key-value
    /// separation, then moves the created tables down to the last level.
    ///
    /// The entire batch is written under a single sequence number drawn
    /// from `seqno_generator`; `visible_seqno` is bumped at the end so
    /// readers can observe the ingested data.
    ///
    /// # Panics
    ///
    /// Panics if the input is not strictly ascending by key, or if KV
    /// separation options are not configured.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::{compaction::MoveDown, tree::ingest::Ingestion};
        use std::time::Instant;

        // One seqno for the whole batch
        let seqno = seqno_generator.next();

        let blob_file_size = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .file_target_size;

        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            blob_file_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for (key, value) in iter {
            // Ingestion requires strictly ascending keys
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write to blob file, store an indirection in the table
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&key, seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write_indirection(key, indirection)?;
            } else {
                // Small value: kept inline in the table
                table_writer.write(key, value)?;
            }

            count += 1;
        }

        let blob_files = blob_writer.finish()?;
        let results = table_writer.writer.finish()?;

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        // NOTE(review): `pin_index` reads `filter_block_pinning_policy` exactly
        // like `pin_filter` above — looks like a copy-paste; confirm whether it
        // should read an index-block pinning policy instead.
        let pin_index = self.index.config.filter_block_pinning_policy.get(0);

        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    self.index
                        .config
                        .path
                        .join(crate::file::TABLES_FOLDER)
                        .join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        self.register_tables(&created_tables, Some(&blob_files), None)?;

        // Move the freshly ingested tables straight to the last level
        let last_level_idx = self.index.config.level_count - 1;

        self.compact(Arc::new(MoveDown(0, last_level_idx)), 0)?;

        // Make the batch visible to readers
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }
373
    /// Runs a major compaction, delegated to the index tree.
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    /// Clears the active memtable, delegated to the index tree.
    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    /// Number of L0 runs, delegated to the index tree.
    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    /// Number of blob files in the current version.
    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }
389
390 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
393 let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
394 return Ok(None);
395 };
396
397 Ok(Some(if item.key.value_type.is_indirection() {
398 let mut cursor = Cursor::new(item.value);
399 let vptr = BlobIndirection::decode_from(&mut cursor)?;
400 vptr.size
401 } else {
402 #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
403 {
404 item.value.len() as u32
405 }
406 }))
407 }
408
    /// Bytes in blob files that are no longer referenced, per the current
    /// version's GC stats.
    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    /// Total filter size, delegated to the index tree.
    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    /// Size of pinned filter blocks, delegated to the index tree.
    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    /// Size of pinned block index blocks, delegated to the index tree.
    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    /// Number of sealed memtables, delegated to the index tree.
    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }
428
    /// Flushes a sealed memtable into a new table, performing key-value
    /// separation: non-tombstone values of at least `separation_threshold`
    /// bytes go to a blob file and are replaced by an indirection entry.
    ///
    /// Returns `Ok(None)` if the table writer produced no table; the
    /// `BlobFile` is `None` when no value was separated.
    ///
    /// # Panics
    ///
    /// Panics if KV separation options are not configured.
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{file::TABLES_FOLDER, table::Writer as TableWriter};

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to table in {}", table_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        let mut table_writer =
            TableWriter::new(table_folder.join(table_id.to_string()), table_id, 0)?
                .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
                .use_bloom_policy({
                    use crate::config::FilterPolicyEntry::{Bloom, None};
                    use crate::table::filter::BloomConstructionPolicy;

                    // A `None` filter policy maps to a zero-bits-per-key bloom filter
                    match self.index.config.filter_policy.get(0) {
                        Bloom(policy) => policy,
                        None => BloomConstructionPolicy::BitsPerKey(0.0),
                    }
                });

        // u64::MAX target size: a single flush never rolls over into a second
        // blob file (asserted after `finish` below)
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let iter = memtable.iter().map(Ok);
        let compaction_stream = CompactionStream::new(iter, eviction_seqno);

        // Stats accumulated for linking the blob file to the created table
        let mut blob_bytes_referenced = 0;
        let mut blob_on_disk_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for item in compaction_stream {
            let item = item?;

            if item.is_tombstone() {
                // Tombstones carry no payload and are never separated
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write to blob file, store an indirection instead
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blob_on_disk_bytes_referenced += u64::from(on_disk_size);
                blobs_referenced_count += 1;
            } else {
                // Small value: kept inline in the table
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree table {table_id}");

        // Only link the blob file if at least one value was separated
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                table_writer.link_blob_file(
                    blob_file.id(),
                    blobs_referenced_count,
                    blob_bytes_referenced,
                    blob_on_disk_bytes_referenced,
                );
            }
        }

        let table = self.index.consume_writer(table_writer)?;

        Ok(table.map(|table| (table, blob_file)))
    }
552
    /// Registers flushed/ingested tables and blob files (plus optional
    /// fragmentation data), delegated to the index tree.
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
    ) -> crate::Result<()> {
        self.index.register_tables(tables, blob_files, frag_map)
    }

    /// Replaces the active memtable, delegated to the index tree.
    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    /// Adds a sealed memtable, delegated to the index tree.
    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    /// Runs the given compaction strategy, delegated to the index tree.
    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }
577
    /// Allocates the next table ID, delegated to the index tree.
    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    /// The tree's configuration (shared with the index tree).
    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    /// Highest sequence number overall, delegated to the index tree.
    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    /// Size of the active memtable in bytes, delegated to the index tree.
    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    /// Always `Blob` — this is a key-value-separated tree.
    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    /// Seals and returns the active memtable, delegated to the index tree.
    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    /// Number of tables, delegated to the index tree.
    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    /// Number of tables in level `idx`, delegated to the index tree.
    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    /// Approximate item count, delegated to the index tree.
    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    /// Whether the tree is empty at `seqno`, delegated to the index tree.
    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    /// Whether `key` exists at `seqno`, delegated to the index tree.
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    /// Exact item count at `seqno`, delegated to the index tree.
    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    /// Total on-disk space: index tree tables plus blob files.
    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    /// Highest seqno still held in a memtable, delegated to the index tree.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    /// Highest seqno persisted to disk, delegated to the index tree.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    /// Inserts a KV pair into the active memtable.
    ///
    /// KV separation happens later, at flush time (`flush_memtable`),
    /// not on insert.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }
653
654 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
655 let key = key.as_ref();
656
657 let Some(item) = self.index.get_internal_entry(key, seqno)? else {
658 return Ok(None);
659 };
660
661 let version = self.index.get_version_for_snapshot(seqno).version;
662
663 let (_, v) = resolve_value_handle(
664 self.id(),
665 self.blobs_folder.as_path(),
666 &self.index.config.cache,
667 &self.index.config.descriptor_table,
668 &version,
669 item,
670 )?;
671
672 Ok(Some(v))
673 }
674
    /// Inserts a tombstone for `key`, delegated to the index tree.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    /// Inserts a weak tombstone for `key`, delegated to the index tree.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
682}