1mod gc;
6pub mod handle;
7pub mod ingest;
8
9#[doc(hidden)]
10pub use gc::{FragmentationEntry, FragmentationMap};
11
12use crate::{
13 coding::Decode,
14 iter_guard::{IterGuard, IterGuardImpl},
15 r#abstract::{AbstractTree, RangeItem},
16 table::Table,
17 tree::inner::MemtableId,
18 value::InternalValue,
19 version::Version,
20 vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
21 Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue,
22};
23use handle::BlobIndirection;
24use std::{
25 ops::RangeBounds,
26 path::{Path, PathBuf},
27 sync::{Arc, MutexGuard},
28};
29
/// Iterator guard handed out by blob tree range/prefix scans.
///
/// Holds the raw entry together with the [`Version`] snapshot it was read
/// from, so a blob indirection can be resolved lazily — only if the caller
/// actually asks for the value (see [`IterGuard`] impl below).
pub struct Guard {
    // Owning tree; provides tree id, blobs folder and cache for resolution.
    tree: crate::BlobTree,
    // Snapshot the entry came from; its blob file set is used for lookups.
    version: Version,
    // The raw entry, or the error produced while reading it (surfaced on use).
    kv: crate::Result<InternalValue>,
}
36
37impl IterGuard for Guard {
38 fn into_inner_if(
39 self,
40 pred: impl Fn(&UserKey) -> bool,
41 ) -> crate::Result<(UserKey, Option<UserValue>)> {
42 let kv = self.kv?;
43
44 if pred(&kv.key.user_key) {
45 resolve_value_handle(
46 self.tree.id(),
47 self.tree.blobs_folder.as_path(),
48 &self.tree.index.config.cache,
49 &self.version,
50 kv,
51 )
52 .map(|(k, v)| (k, Some(v)))
53 } else {
54 Ok((kv.key.user_key, None))
55 }
56 }
57
58 fn key(self) -> crate::Result<UserKey> {
59 self.kv.map(|kv| kv.key.user_key)
60 }
61
62 fn size(self) -> crate::Result<u32> {
63 let kv = self.kv?;
64
65 if kv.key.value_type.is_indirection() {
66 let mut cursor = std::io::Cursor::new(kv.value);
67 Ok(BlobIndirection::decode_from(&mut cursor)?.size)
68 } else {
69 #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
70 Ok(kv.value.len() as u32)
71 }
72 }
73
74 fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
75 resolve_value_handle(
76 self.tree.id(),
77 self.tree.blobs_folder.as_path(),
78 &self.tree.index.config.cache,
79 &self.version,
80 self.kv?,
81 )
82 }
83}
84
85fn resolve_value_handle(
86 tree_id: TreeId,
87 blobs_folder: &Path,
88 cache: &Cache,
89 version: &Version,
90 item: InternalValue,
91) -> RangeItem {
92 if item.key.value_type.is_indirection() {
93 let mut cursor = std::io::Cursor::new(item.value);
94 let vptr = BlobIndirection::decode_from(&mut cursor)?;
95
96 match Accessor::new(&version.blob_files).get(
98 tree_id,
99 blobs_folder,
100 &item.key.user_key,
101 &vptr.vhandle,
102 cache,
103 ) {
104 Ok(Some(v)) => {
105 let k = item.key.user_key;
106 Ok((k, v))
107 }
108 Ok(None) => {
109 panic!(
110 "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
111 item.key.user_key, vptr.vhandle,
112 version.id(),
113 );
114 }
115 Err(e) => Err(e),
116 }
117 } else {
118 let k = item.key.user_key;
119 let v = item.value;
120 Ok((k, v))
121 }
122}
123
/// An LSM tree with key-value separation.
///
/// Values at or above the configured separation threshold are written to
/// blob files, and the index tree only stores a small [`BlobIndirection`]
/// handle for them (see `flush_to_tables`); smaller values stay inline.
#[derive(Clone)]
pub struct BlobTree {
    /// The index tree; stores keys with either inline values or blob
    /// indirection handles.
    #[doc(hidden)]
    pub index: crate::Tree,

    // Folder holding the blob files; `Arc` so clones share the same path.
    blobs_folder: Arc<PathBuf>,
}
137
138impl BlobTree {
139 pub(crate) fn open(config: Config) -> crate::Result<Self> {
140 use crate::file::{fsync_directory, BLOBS_FOLDER};
141
142 let index = crate::Tree::open(config)?;
143
144 let blobs_folder = index.config.path.join(BLOBS_FOLDER);
145 std::fs::create_dir_all(&blobs_folder)?;
146 fsync_directory(&blobs_folder)?;
147
148 let blob_file_id_to_continue_with = index
149 .current_version()
150 .blob_files
151 .list_ids()
152 .max()
153 .map(|x| x + 1)
154 .unwrap_or_default();
155
156 index
157 .0
158 .blob_file_id_counter
159 .set(blob_file_id_to_continue_with);
160
161 Ok(Self {
162 index,
163 blobs_folder: Arc::new(blobs_folder),
164 })
165 }
166}
167
impl AbstractTree for BlobTree {
    // NOTE: Most methods in this impl delegate straight to the inner `index`
    // tree. Blob-specific behavior is concentrated in the read paths
    // (`get`, `prefix`, `range`, `size_of`), the flush path
    // (`flush_to_tables`), and the accounting methods that include blob
    // files (`blob_file_count`, `stale_blob_bytes`, `disk_space`).

    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        self.index.print_trace(key)
    }

    fn table_file_cache_size(&self) -> usize {
        self.index.table_file_cache_size()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.index.get_version_history_lock()
    }

    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // NOTE: returns the raw entry; an indirection is NOT resolved here.
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    /// Iterates over all entries whose key starts with `prefix`.
    ///
    /// Each item is wrapped in a blob-aware [`Guard`] so that indirections
    /// are only resolved if the caller accesses the value.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

        // Pin the version for this snapshot so blob files stay resolvable
        // for the lifetime of the iterator.
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        let range = prefix_to_range(prefix.as_ref());

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    /// Iterates over `range`; same guard-wrapping scheme as [`Self::prefix`].
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn clear(&self) -> crate::Result<()> {
        self.index.clear()
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    /// Number of blob files in the current version.
    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    /// Returns the logical size of the value for `key` without reading blob
    /// data: for separated values the size is decoded from the indirection
    /// handle stored in the index.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = std::io::Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    /// Bytes in blob files that belong to dead (overwritten/deleted) values,
    /// as tracked by the current version's GC stats.
    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> u64 {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.index.get_flush_lock()
    }

    /// Flushes a memtable stream to disk, performing key-value separation:
    /// values of at least `separation_threshold` bytes are written to blob
    /// files and replaced in the table by an encoded [`BlobIndirection`];
    /// smaller values and tombstones are written inline.
    ///
    /// Returns the recovered [`Table`]s plus the finished blob files.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{
            coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER,
            table::multi_writer::MultiWriter,
        };

        let start = std::time::Instant::now();

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        // All policies are resolved at index 0 — presumably level 0, where
        // flushed tables land; TODO confirm against the policy types.
        let data_block_size = self.index.config.data_block_size_policy.get(0);

        let data_block_restart_interval =
            self.index.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval =
            self.index.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
        let index_block_compression = self.index.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);

        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}");
        log::debug!("=> to table(s) in {}", table_folder.display());
        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());

        let mut table_writer = MultiWriter::new(
            table_folder.clone(),
            self.index.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.index.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                // No filter configured: 0.0 bits per key disables the bloom filter.
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        #[expect(
            clippy::expect_used,
            reason = "cannot create blob tree without defining kv separation options"
        )]
        let kv_opts = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            self.index.config.path.join(BLOBS_FOLDER),
            self.id(),
            self.index.config.descriptor_table.clone(),
        )?
        .use_target_size(kv_opts.file_target_size)
        .use_compression(kv_opts.compression);

        let separation_threshold = kv_opts.separation_threshold;

        for item in stream {
            let item = item?;

            // Tombstones carry no payload; write them inline with an empty
            // value and never separate them.
            if item.is_tombstone() {
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Capture the handle coordinates BEFORE writing, since the
                // write advances the blob writer's offset (and may roll over
                // to a new blob file afterwards).
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    // Logical (uncompressed) size, so size queries can avoid
                    // reading the blob.
                    size: value_size,
                };

                // Store the encoded handle in the table, tagged as an
                // indirection so readers know to resolve it.
                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                table_writer.register_blob(indirection);
            } else {
                // Small value: keep it inline in the table.
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        let blob_files = blob_writer.finish()?;

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.index_block_pinning_policy.get(0);

        // Re-open every freshly written table so the returned handles are
        // fully usable (cache, descriptor table, pinning applied).
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    table_folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, Some(blob_files))))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        self.index.register_tables(
            tables,
            blob_files,
            frag_map,
            sealed_memtables_to_delete,
            gc_watermark,
        )
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        self.index.active_memtable()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    /// Total on-disk size: index tree plus the current version's blob files.
    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        // NOTE: writes always go through the index tree inline; separation
        // only happens later, at flush time (see `flush_to_tables`).
        self.index.insert(key, value.into(), seqno)
    }

    /// Point read. Looks up the raw entry in the snapshot's version and
    /// resolves a blob indirection if present.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        // Pin the super version so the blob files referenced by any
        // indirection cannot be dropped mid-read.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .index
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        let Some(item) = crate::Tree::get_internal_entry_from_version(&super_version, key, seqno)?
        else {
            return Ok(None);
        };

        let (_, v) = resolve_value_handle(
            self.id(),
            self.blobs_folder.as_path(),
            &self.index.config.cache,
            &super_version.version,
            item,
        )?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}