Skip to main content

lsm_tree/vlog/blob_file/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5pub mod merge;
6pub mod meta;
7pub mod multi_writer;
8pub mod reader;
9pub mod scanner;
10pub mod writer;
11
12use crate::path::{Path, PathBuf};
13use crate::{
14    Checksum, GlobalTableId, TreeId, blob_tree::FragmentationMap, deletion_pause::DeletionPause,
15    file_accessor::FileAccessor, fs::Fs, vlog::BlobFileId,
16};
17#[cfg(not(feature = "std"))]
18use alloc::boxed::Box;
19use alloc::sync::Arc;
20use core::sync::atomic::AtomicBool;
21pub use meta::Metadata;
22
/// Shared state of one view of a blob file; [`BlobFile`] is a cheap, clonable
/// `Arc` handle to it.
///
/// A blob file is an immutable, sorted, contiguous file that contains large
/// key-value pairs (blobs). Dropping the last handle runs this type's [`Drop`]
/// impl, which reclaims the file if it was marked deleted, or punches a
/// relocated data prefix if [`Self::punch_on_drop`] was armed.
//
// `#[derive(Debug)]` cannot be used because [`Fs`] is not `Debug` (trait
// objects without an explicit `Debug` bound would require boxing through
// `dyn Debug`). A manual impl that prints stable identifiers gives the
// same operational ergonomics as the previous derived `Debug` without
// pulling `Debug` into the `Fs` trait bound (which would cascade through
// every backend).
pub struct Inner {
    /// Blob file ID. Combined with [`Self::tree_id`] into a
    /// [`GlobalTableId`] that keys this file's descriptor-table entries.
    pub id: BlobFileId,

    /// ID of the tree this blob file belongs to (see [`Self::id`]).
    pub tree_id: TreeId,

    /// On-disk path. Used for removal, truncation, hole punching, size
    /// queries and re-opening.
    pub path: PathBuf,

    /// Statistics (item count, compressed/uncompressed byte totals, ...).
    pub meta: Metadata,

    /// Whether this blob file is deleted (logically). Set with `Release` by
    /// `BlobFile::mark_as_deleted` and read with `Acquire` in [`Drop`] to
    /// decide whether to reclaim the file.
    pub is_deleted: AtomicBool,

    /// Tight-space punch-on-drop offset, or [`u64::MAX`] (default) for "no
    /// punch". When tight-space blob relocation rewrites this file's live
    /// entries below an offset into a fresh compact file, the PRIOR view is
    /// marked here with that absolute data-section offset; once every reader
    /// holding it drops, this view's [`Drop`] reclaims the consumed
    /// `[data_start, offset)` data frames via
    /// [`Fs::punch_hole`] and LEAVES the file in
    /// place (the restricted view still serves the suffix). Mirrors
    /// `table::Inner::punch_on_drop`. Distinct from [`Self::is_deleted`].
    #[cfg_attr(
        not(feature = "std"),
        allow(
            dead_code,
            reason = "tight-space punch-on-drop frontier; the punch consumer is std-gated, so unread under no_std"
        )
    )]
    pub(crate) punch_on_drop: portable_atomic::AtomicU64,

    /// Full-file checksum. Passed through unchanged when the file is
    /// re-opened as a new view.
    pub checksum: Checksum,

    /// How reads reach the open file. [`Drop`] swaps it for
    /// `FileAccessor::Closed` (evicting any descriptor-table entry first) so
    /// that no handle stays pinned when the file is unlinked.
    pub(crate) file_accessor: FileAccessor,

    /// Filesystem backend the file was opened through. [`Drop`] uses it for
    /// physical removal, truncation and hole punching; it is also used for
    /// size queries and re-opening. Carrying the same `Fs` instance keeps
    /// in-memory and routed-tier backends consistent with the rest of the
    /// tree.
    pub(crate) fs: Arc<dyn Fs>,

    /// Tree-wide file-deletion gate. Installed once by
    /// [`BlobFile::install_deletion_pause`] after the file is registered
    /// with a tree. When `Some` and active, the [`Drop`] impl defers the
    /// underlying `remove_file` so an in-progress checkpoint can hard-link
    /// the file before it disappears.
    // `once_cell::race::OnceBox` — see Table::Inner::deletion_pause
    // for the rationale (no-std-friendly one-shot slot).
    pub(crate) deletion_pause: once_cell::race::OnceBox<Arc<DeletionPause>>,

    /// Tree-wide background file deleter. See
    /// [`Table::install_background_deleter`](crate::Table) for the contract:
    /// when present (and no checkpoint pause is active) the [`Drop`] impl frees
    /// the blob file's blocks synchronously via
    /// [`Fs::truncate_file`] and hands the
    /// directory-entry `unlink` to this deleter, off the foreground path.
    // std-only (the deleter spawns a thread); see Table::Inner for rationale.
    #[cfg(feature = "std")]
    pub(crate) background_deleter: once_cell::race::OnceBox<Arc<crate::BackgroundDeleter>>,
}
93
94impl Inner {
95    fn global_id(&self) -> GlobalTableId {
96        GlobalTableId::from((self.tree_id, self.id))
97    }
98}
99
100impl core::fmt::Debug for Inner {
101    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102        f.debug_struct("blob_file::Inner")
103            .field("id", &self.id)
104            .field("tree_id", &self.tree_id)
105            .field("path", &self.path)
106            .field(
107                "is_deleted",
108                &self.is_deleted.load(core::sync::atomic::Ordering::Relaxed),
109            )
110            .field("meta", &self.meta)
111            .finish_non_exhaustive()
112    }
113}
114
/// Reclaims on-disk space when the last handle to this view goes away.
///
/// There are two mutually exclusive paths, selected by [`Inner::is_deleted`]:
///
/// * **Deleted**: evict and close this view's file handles, then do one of:
///   - defer removal to an active checkpoint [`DeletionPause`];
///   - (std only) truncate the file when this is its sole hard link and hand
///     the unlink to the background deleter;
///   - call [`Fs::remove_file`] synchronously.
/// * **Not deleted** (std only): if [`Inner::punch_on_drop`] is armed, punch
///   the consumed `[data_start, offset)` prefix of the data section and keep
///   the file in place.
///
/// A file marked deleted is reclaimed whole, so an armed punch on it is
/// ignored. I/O failures are logged with `log::warn!` rather than propagated.
impl Drop for Inner {
    fn drop(&mut self) {
        if self.is_deleted.load(core::sync::atomic::Ordering::Acquire) {
            log::trace!(
                "Cleanup deleted blob file {:?} at {}",
                self.id,
                self.path.display(),
            );

            // Move the accessor out and drop it FIRST so every pinned
            // Arc<dyn FsFile> the file_accessor holds is released before
            // we try to unlink. On Windows (and any other platform where
            // an open handle blocks unlink) a live handle here would
            // make the unlink fail, leaking the blob file's disk space.
            // table::Inner::drop already handles the same hazard.
            // The descriptor-table entry for this (tree_id, id) pair is
            // evicted through the same accessor before it is dropped.
            let global_id = self.global_id();
            let file_accessor = core::mem::replace(&mut self.file_accessor, FileAccessor::Closed);
            file_accessor
                .as_descriptor_table()
                .inspect(|d| d.remove_for_blob_file(&global_id));
            drop(file_accessor);

            // If a checkpoint is active, defer the physical deletion so the
            // file remains hard-linkable until the checkpoint releases its
            // pause. Short-circuit on the common no-checkpoint path: skip
            // the Arc<dyn Fs> bump and PathBuf clone unless a pause is
            // both installed AND currently active. `try_enqueue` still
            // re-checks `is_active()` under the queue lock to close the
            // publish-then-release race, so the outer check is pure perf.
            // If `try_enqueue` declines (returns `false`), deletion proceeds
            // immediately below.
            let deferred = match self.deletion_pause.get() {
                Some(pause) if pause.is_active() => {
                    pause.try_enqueue(Arc::clone(&self.fs), self.path.clone())
                }
                _ => false,
            };

            if deferred {
                log::trace!(
                    "Deferred deletion of blob file {:?} at {} (checkpoint active)",
                    self.id,
                    self.path.display(),
                );
                return;
            }

            // Off-foreground reclaim: free the blocks synchronously (so a
            // footprint scan sees accurate usage) and hand the unlink to the
            // background deleter. Without the `std` feature, or when no
            // deleter is installed, this falls through to the synchronous
            // remove_file below.
            #[cfg(feature = "std")]
            if let Some(deleter) = self.background_deleter.get() {
                // Truncate only when we own the sole hard link — a checkpoint
                // may have hard-linked this blob file, and truncating the shared
                // inode would zero the checkpoint's copy. Otherwise skip the
                // truncate and just unlink (data survives via the other link).
                // If the link count cannot be read, the truncate is skipped too.
                if self.fs.hard_link_count(&self.path).is_ok_and(|n| n <= 1)
                    && let Err(e) = self.fs.truncate_file(&self.path)
                {
                    log::warn!(
                        "Failed to truncate deleted blob file {:?} at {}: {e:?}",
                        self.id,
                        self.path.display(),
                    );
                }
                deleter.enqueue(Arc::clone(&self.fs), self.path.clone());
                return;
            }

            // Synchronous fallback: unlink now; a failure is only logged.
            if let Err(e) = self.fs.remove_file(&self.path) {
                log::warn!(
                    "Failed to cleanup deleted blob file {:?} at {}: {e:?}",
                    self.id,
                    self.path.display(),
                );
            }
        } else {
            // Not deleted, but possibly marked for tight-space prefix reclaim.
            // This (old) view's last Arc is dropping, so no reader can touch the
            // relocated prefix anymore. Punch the consumed data frames
            // `[data_start, offset)` and LEAVE the file in place: the restricted
            // view (a distinct Inner) still serves the suffix.
            //
            // A blob file is an SFA archive, so the punch must start at the
            // `data` section and skip the header; the TOC sits at the tail and
            // stays intact. `offset` is an absolute data-section position (a
            // frame boundary from the relocation scanner). The data-section
            // start is re-read from the TOC here, via a fresh handle opened
            // through `self.fs`, rather than carried on every blob-file Inner,
            // because the punch is a rare, tight-space-only path.
            //
            // Hole punching is a std-only capability (the tight-space relocation
            // loop that arms it is itself `#[cfg(feature = "std")]`), so the punch
            // action is gated. The atomic load is no-std-safe but pointless when
            // nothing can arm it.
            #[cfg(feature = "std")]
            {
                let off = self
                    .punch_on_drop
                    .load(core::sync::atomic::Ordering::Acquire);
                // `u64::MAX` is the "not armed" sentinel.
                if off != u64::MAX {
                    match data_section_start(&*self.fs, &self.path) {
                        // `off > data_start` also guarantees that
                        // `off - data_start` cannot underflow.
                        Ok(data_start) if off > data_start => {
                            if let Err(e) =
                                self.fs.punch_hole(&self.path, data_start, off - data_start)
                            {
                                log::warn!(
                                    "Failed to punch tight-space data [{data_start}, {off}) of blob file {:?} at {}: {e:?}",
                                    self.id,
                                    self.path.display(),
                                );
                            }
                        }
                        Ok(_) => {} // nothing consumed below the data start
                        Err(e) => log::warn!(
                            "Skipping tight-space punch of blob file {:?} at {}: could not read data section: {e:?}",
                            self.id,
                            self.path.display(),
                        ),
                    }
                }
            }
        }
    }
}
238
/// Returns the absolute byte offset at which the blob file's SFA `data`
/// section begins, as recorded in the archive's table of contents.
///
/// The tight-space punch uses this as its lower bound, so it reclaims only
/// data frames and never the SFA header that precedes them.
///
/// # Errors
///
/// Fails if the file cannot be opened through `fs` or the SFA reader cannot
/// be built from it. Returns [`crate::Error::InvalidHeader`] if the TOC has no
/// `data` section.
#[cfg(feature = "std")]
fn data_section_start(fs: &dyn Fs, path: &Path) -> crate::Result<u64> {
    let mut handle = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let archive = crate::sfa::Reader::from_reader(&mut handle)?;

    archive
        .toc()
        .section(b"data")
        .map(|section| section.pos())
        .ok_or(crate::Error::InvalidHeader("BlobFile"))
}
252
253/// A blob file stores large values and is part of the value log
254#[derive(Clone)]
255pub struct BlobFile(pub(crate) Arc<Inner>);
256
257impl Eq for BlobFile {}
258
259impl PartialEq for BlobFile {
260    fn eq(&self, other: &Self) -> bool {
261        self.id().eq(&other.id())
262    }
263}
264
265impl core::hash::Hash for BlobFile {
266    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
267        self.id().hash(state);
268    }
269}
270
271impl BlobFile {
272    pub(crate) fn mark_as_deleted(&self) {
273        self.0
274            .is_deleted
275            .store(true, core::sync::atomic::Ordering::Release);
276    }
277
278    /// Marks this view to punch the consumed `[data_start, offset)` data frames
279    /// when its last `Arc` drops (see [`Inner::punch_on_drop`]). `offset` is an
280    /// absolute data-section position. Set on the PRIOR view once a tight-space
281    /// relocation slice has moved its `[data_start, offset)` live entries into a
282    /// fresh compact file and that move is durably installed.
283    #[cfg(feature = "std")]
284    pub(crate) fn mark_punch_on_drop(&self, offset: u64) {
285        self.0
286            .punch_on_drop
287            .store(offset, core::sync::atomic::Ordering::Release);
288    }
289
290    /// Re-opens this blob file as a DISTINCT [`Inner`] (its own file handle and a
291    /// fresh punch-on-drop atomic) over the same physical file. The tight-space
292    /// relocation loop installs the re-opened view in the new version and arms
293    /// the prior view to punch its consumed prefix once its readers drain, so a
294    /// stale blob file is reclaimed in place while the suffix keeps serving the
295    /// not-yet-relocated entries — the blob analog of [`Table::reopen_restricted`](crate::Table::reopen_restricted).
296    ///
297    /// # Errors
298    ///
299    /// Propagates any error from re-opening the file.
300    #[cfg(feature = "std")]
301    pub(crate) fn reopen(&self) -> crate::Result<Self> {
302        super::recover_blob_file(
303            &self.0.path,
304            self.0.id,
305            self.0.checksum,
306            self.0.tree_id,
307            &self.0.fs,
308        )
309    }
310
311    /// Installs the tree-wide deletion pause used by checkpoints.
312    /// Idempotent: a second call is a no-op.
313    pub(crate) fn install_deletion_pause(&self, pause: Arc<DeletionPause>) {
314        let _ = self.0.deletion_pause.set(Box::new(pause));
315    }
316
317    /// Installs the tree-wide background file deleter. Idempotent.
318    #[cfg(feature = "std")]
319    pub(crate) fn install_background_deleter(&self, deleter: Arc<crate::BackgroundDeleter>) {
320        let _ = self.0.background_deleter.set(Box::new(deleter));
321    }
322
323    /// Returns the blob file ID.
324    #[must_use]
325    pub fn id(&self) -> BlobFileId {
326        self.0.id
327    }
328
329    /// Returns the full blob file checksum.
330    #[must_use]
331    pub fn checksum(&self) -> Checksum {
332        self.0.checksum
333    }
334
335    /// Returns the blob file path.
336    #[must_use]
337    pub fn path(&self) -> &Path {
338        &self.0.path
339    }
340
341    /// Returns the blob file accessor.
342    #[must_use]
343    pub(crate) fn file_accessor(&self) -> &FileAccessor {
344        &self.0.file_accessor
345    }
346
347    /// Returns the number of items in the blob file.
348    #[must_use]
349    #[expect(clippy::len_without_is_empty)]
350    pub fn len(&self) -> u64 {
351        self.0.meta.item_count
352    }
353
354    /// Physical on-disk file size in bytes, including the per-entry framing
355    /// (V4 header + key) and the metadata block / trailer — not just the
356    /// compressed payload (`meta.total_compressed_bytes`). Used as a
357    /// conservative upper bound on the transient output of a blob relocation:
358    /// the rewritten file re-emits the same framing, so the source file's
359    /// physical size bounds the output (and includes the dead blobs a relocation
360    /// drops, making it strictly conservative).
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the blob file's size cannot be stat-ed.
365    pub(crate) fn physical_size(&self) -> crate::Result<u64> {
366        Ok(self.0.fs.metadata(&self.0.path)?.len)
367    }
368
369    /// Returns `true` if the blob file is stale (based on the given staleness threshold).
370    pub(crate) fn is_stale(&self, frag_map: &FragmentationMap, threshold: f32) -> bool {
371        frag_map.get(&self.id()).is_some_and(|x| {
372            #[expect(
373                clippy::cast_precision_loss,
374                reason = "ok to lose precision as this is an approximate calculation"
375            )]
376            let stale_bytes = x.bytes as f32;
377            #[expect(
378                clippy::cast_precision_loss,
379                reason = "ok to lose precision as this is an approximate calculation"
380            )]
381            let all_bytes = self.0.meta.total_uncompressed_bytes as f32;
382            let ratio = stale_bytes / all_bytes;
383            ratio >= threshold
384        })
385    }
386
387    /// Returns `true` if the blob file has no more incoming references, and can be safely removed from a Version.
388    pub(crate) fn is_dead(&self, frag_map: &FragmentationMap) -> bool {
389        frag_map.get(&self.id()).is_some_and(|x| {
390            let stale_bytes = x.bytes;
391            let all_bytes = self.0.meta.total_uncompressed_bytes;
392            stale_bytes == all_bytes
393        })
394    }
395}