Skip to main content

lsm_tree/vlog/blob_file/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod merge;
6pub mod meta;
7pub mod multi_writer;
8pub mod reader;
9pub mod scanner;
10pub mod writer;
11
12use crate::{
13    blob_tree::FragmentationMap, file_accessor::FileAccessor, vlog::BlobFileId, Checksum,
14    GlobalTableId, TreeId,
15};
16pub use meta::Metadata;
17use std::{
18    path::{Path, PathBuf},
19    sync::{atomic::AtomicBool, Arc},
20};
21
/// A blob file is an immutable, sorted, contiguous file that contains large key-value pairs (blobs)
///
/// This is the shared state behind a [`BlobFile`] handle. When it is dropped
/// while `is_deleted` is set, the file at `path` is removed from disk.
#[derive(Debug)]
pub struct Inner {
    /// Blob file ID
    pub id: BlobFileId,

    /// Tree ID; combined with `id` to form the [`GlobalTableId`] of this blob file
    pub tree_id: TreeId,

    /// File path
    pub path: PathBuf,

    /// Statistics
    pub meta: Metadata,

    /// Whether this blob file is deleted (logically)
    ///
    /// The file itself is only removed once this value is dropped.
    pub is_deleted: AtomicBool,

    /// Full blob file checksum (exposed through [`BlobFile::checksum`])
    pub checksum: Checksum,

    /// File accessor of this blob file
    ///
    /// If it provides a descriptor table, the entry for this blob file is
    /// removed from that table when a deleted blob file is dropped.
    pub(crate) file_accessor: FileAccessor,
}
43
44impl Inner {
45    fn global_id(&self) -> GlobalTableId {
46        GlobalTableId::from((self.tree_id, self.id))
47    }
48}
49
50impl Drop for Inner {
51    fn drop(&mut self) {
52        if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) {
53            log::trace!(
54                "Cleanup deleted blob file {:?} at {}",
55                self.id,
56                self.path.display(),
57            );
58
59            if let Err(e) = std::fs::remove_file(&*self.path) {
60                log::warn!(
61                    "Failed to cleanup deleted blob file {:?} at {}: {e:?}",
62                    self.id,
63                    self.path.display(),
64                );
65            }
66
67            self.file_accessor
68                .as_descriptor_table()
69                .inspect(|d| d.remove_for_blob_file(&self.global_id()));
70        }
71    }
72}
73
/// A blob file stores large values and is part of the value log
///
/// This is a cloneable handle; clones share the same [`Inner`] through an `Arc`.
/// Equality and hashing only take the blob file ID into account.
#[derive(Clone)]
pub struct BlobFile(pub(crate) Arc<Inner>);
77
78impl Eq for BlobFile {}
79
80impl PartialEq for BlobFile {
81    fn eq(&self, other: &Self) -> bool {
82        self.id().eq(&other.id())
83    }
84}
85
86impl std::hash::Hash for BlobFile {
87    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
88        self.id().hash(state);
89    }
90}
91
92impl BlobFile {
93    pub(crate) fn mark_as_deleted(&self) {
94        self.0
95            .is_deleted
96            .store(true, std::sync::atomic::Ordering::Release);
97    }
98
99    /// Returns the blob file ID.
100    #[must_use]
101    pub fn id(&self) -> BlobFileId {
102        self.0.id
103    }
104
105    /// Returns the full blob file checksum.
106    #[must_use]
107    pub fn checksum(&self) -> Checksum {
108        self.0.checksum
109    }
110
111    /// Returns the blob file path.
112    #[must_use]
113    pub fn path(&self) -> &Path {
114        &self.0.path
115    }
116
117    /// Returns the blob file accessor.
118    #[must_use]
119    pub(crate) fn file_accessor(&self) -> &FileAccessor {
120        &self.0.file_accessor
121    }
122
123    /// Returns the number of items in the blob file.
124    #[must_use]
125    #[expect(clippy::len_without_is_empty)]
126    pub fn len(&self) -> u64 {
127        self.0.meta.item_count
128    }
129
130    /// Returns `true` if the blob file is stale (based on the given staleness threshold).
131    pub(crate) fn is_stale(&self, frag_map: &FragmentationMap, threshold: f32) -> bool {
132        frag_map.get(&self.id()).is_some_and(|x| {
133            #[expect(
134                clippy::cast_precision_loss,
135                reason = "ok to lose precision as this is an approximate calculation"
136            )]
137            let stale_bytes = x.bytes as f32;
138            #[expect(
139                clippy::cast_precision_loss,
140                reason = "ok to lose precision as this is an approximate calculation"
141            )]
142            let all_bytes = self.0.meta.total_uncompressed_bytes as f32;
143            let ratio = stale_bytes / all_bytes;
144            ratio >= threshold
145        })
146    }
147
148    /// Returns `true` if the blob file has no more incoming references, and can be safely removed from a Version.
149    pub(crate) fn is_dead(&self, frag_map: &FragmentationMap) -> bool {
150        frag_map.get(&self.id()).is_some_and(|x| {
151            let stale_bytes = x.bytes;
152            let all_bytes = self.0.meta.total_uncompressed_bytes;
153            stale_bytes == all_bytes
154        })
155    }
156}