lsm_tree/vlog/blob_file/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5pub mod merge;
6pub mod meta;
7pub mod multi_writer;
8pub mod reader;
9pub mod scanner;
10pub mod writer;
11
12use crate::path::{Path, PathBuf};
13use crate::{
14 Checksum, GlobalTableId, TreeId, blob_tree::FragmentationMap, deletion_pause::DeletionPause,
15 file_accessor::FileAccessor, fs::Fs, vlog::BlobFileId,
16};
17#[cfg(not(feature = "std"))]
18use alloc::boxed::Box;
19use alloc::sync::Arc;
20use core::sync::atomic::AtomicBool;
21pub use meta::Metadata;
22
/// A blob file is an immutable, sorted, contiguous file that contains large key-value pairs (blobs)
///
/// This is the shared state behind a [`BlobFile`] handle. Its [`Drop`] impl
/// performs the physical cleanup: removal of the file when it was marked as
/// deleted, or (std only) a tight-space prefix punch when one was armed.
//
// `#[derive(Debug)]` cannot be used because [`Fs`] is not `Debug` (trait
// objects without an explicit `Debug` bound would require boxing through
// `dyn Debug`). A manual impl that prints stable identifiers gives the
// same operational ergonomics as the previous derived `Debug` without
// pulling `Debug` into the `Fs` trait bound (which would cascade through
// every backend).
pub struct Inner {
    /// Blob file ID
    pub id: BlobFileId,

    /// Tree ID; paired with [`Self::id`] to build the [`GlobalTableId`] used
    /// when evicting this file from the descriptor table on drop.
    pub tree_id: TreeId,

    /// File path
    pub path: PathBuf,

    /// Statistics
    pub meta: Metadata,

    /// Whether this blob file is deleted (logically)
    pub is_deleted: AtomicBool,

    /// Tight-space punch-on-drop offset, or [`u64::MAX`] (default) for "no
    /// punch". When tight-space blob relocation rewrites this file's live
    /// entries below an offset into a fresh compact file, the PRIOR view is
    /// marked here with that absolute data-section offset; once every reader
    /// holding it drops, this view's [`Drop`] reclaims the consumed
    /// `[data_start, offset)` data frames via
    /// [`Fs::punch_hole`] and LEAVES the file in
    /// place (the restricted view still serves the suffix). Mirrors
    /// `table::Inner::punch_on_drop`. Distinct from [`Self::is_deleted`].
    #[cfg_attr(
        not(feature = "std"),
        allow(
            dead_code,
            reason = "tight-space punch-on-drop frontier; the punch consumer is std-gated, so unread under no_std"
        )
    )]
    pub(crate) punch_on_drop: portable_atomic::AtomicU64,

    /// Full blob file checksum (returned by [`BlobFile::checksum`]).
    pub checksum: Checksum,

    /// Accessor for the blob file's handle(s). On a deleted file, [`Drop`]
    /// swaps it for [`FileAccessor::Closed`] and drops it BEFORE the physical
    /// removal so that no pinned handle blocks the unlink.
    pub(crate) file_accessor: FileAccessor,

    /// Filesystem backend used by [`Drop`] for the physical removal.
    /// Carries the same `Fs` instance the file was opened through so that
    /// in-memory and routed-tier backends behave consistently with the
    /// rest of the tree.
    pub(crate) fs: Arc<dyn Fs>,

    /// Tree-wide file-deletion gate. Installed once by
    /// [`BlobFile::install_deletion_pause`] after the file is registered
    /// with a tree. When `Some` and active, the [`Drop`] impl defers the
    /// underlying `remove_file` so an in-progress checkpoint can hard-link
    /// the file before it disappears.
    // `once_cell::race::OnceBox` — see Table::Inner::deletion_pause
    // for the rationale (no-std-friendly one-shot slot).
    pub(crate) deletion_pause: once_cell::race::OnceBox<Arc<DeletionPause>>,

    /// Tree-wide background file deleter. See
    /// [`Table::install_background_deleter`](crate::Table) for the contract:
    /// when present (and no checkpoint pause is active) the [`Drop`] impl frees
    /// the blob file's blocks synchronously via
    /// [`Fs::truncate_file`] and hands the
    /// directory-entry `unlink` to this deleter, off the foreground path.
    // std-only (the deleter spawns a thread); see Table::Inner for rationale.
    #[cfg(feature = "std")]
    pub(crate) background_deleter: once_cell::race::OnceBox<Arc<crate::BackgroundDeleter>>,
}
93
94impl Inner {
95 fn global_id(&self) -> GlobalTableId {
96 GlobalTableId::from((self.tree_id, self.id))
97 }
98}
99
100impl core::fmt::Debug for Inner {
101 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102 f.debug_struct("blob_file::Inner")
103 .field("id", &self.id)
104 .field("tree_id", &self.tree_id)
105 .field("path", &self.path)
106 .field(
107 "is_deleted",
108 &self.is_deleted.load(core::sync::atomic::Ordering::Relaxed),
109 )
110 .field("meta", &self.meta)
111 .finish_non_exhaustive()
112 }
113}
114
/// Physical cleanup when the last reference to this view goes away.
///
/// Two mutually exclusive paths:
///
/// - The file was marked deleted: release our handles, then either defer the
///   removal to an active checkpoint pause, hand it to the background deleter
///   (std only), or remove the file synchronously.
/// - The file was NOT marked deleted: under `std`, punch the consumed
///   `[data_start, offset)` prefix if one was armed; otherwise do nothing.
///
/// Failures are logged and never propagated — `drop` cannot return an error.
impl Drop for Inner {
    fn drop(&mut self) {
        // `Acquire` pairs with the `Release` store in `BlobFile::mark_as_deleted`.
        if self.is_deleted.load(core::sync::atomic::Ordering::Acquire) {
            log::trace!(
                "Cleanup deleted blob file {:?} at {}",
                self.id,
                self.path.display(),
            );

            // Move the accessor out and drop it FIRST so every pinned
            // Arc<dyn FsFile> the file_accessor holds is released before
            // we try to unlink. On Windows (and any other platform where
            // an open handle blocks unlink) a live handle here would
            // make remove_file fail silently, leaking the blob file's
            // disk space — the same hazard already handled in
            // table::Inner::drop. Eviction from the descriptor table
            // happens through the same accessor before the drop.
            let global_id = self.global_id();
            let file_accessor = core::mem::replace(&mut self.file_accessor, FileAccessor::Closed);
            file_accessor
                .as_descriptor_table()
                .inspect(|d| d.remove_for_blob_file(&global_id));
            drop(file_accessor);

            // If a checkpoint is active, defer the physical deletion so the
            // file remains hard-linkable until the checkpoint releases its
            // pause. Short-circuit on the common no-checkpoint path: skip
            // the Arc<dyn Fs> bump and PathBuf clone unless a pause is
            // both installed AND currently active. `try_enqueue` still
            // re-checks `is_active()` under the queue lock to close the
            // publish-then-release race, so the outer check is pure perf.
            let deferred = match self.deletion_pause.get() {
                Some(pause) if pause.is_active() => {
                    pause.try_enqueue(Arc::clone(&self.fs), self.path.clone())
                }
                _ => false,
            };

            if deferred {
                log::trace!(
                    "Deferred deletion of blob file {:?} at {} (checkpoint active)",
                    self.id,
                    self.path.display(),
                );
                // The pause now owns the removal; nothing left to do here.
                return;
            }

            // Off-foreground reclaim: free the blocks synchronously (accurate
            // footprint scan) and hand the unlink to the background deleter.
            // Falls through to a synchronous remove_file when none installed.
            #[cfg(feature = "std")]
            if let Some(deleter) = self.background_deleter.get() {
                // Truncate only when we own the sole hard link — a checkpoint
                // may have hard-linked this blob file, and truncating the shared
                // inode would zero the checkpoint's copy. Otherwise skip the
                // truncate and just unlink (data survives via the other link).
                // A failed link-count query also skips the truncate.
                if self.fs.hard_link_count(&self.path).is_ok_and(|n| n <= 1)
                    && let Err(e) = self.fs.truncate_file(&self.path)
                {
                    log::warn!(
                        "Failed to truncate deleted blob file {:?} at {}: {e:?}",
                        self.id,
                        self.path.display(),
                    );
                }
                // The unlink is enqueued even when the truncate failed or was skipped.
                deleter.enqueue(Arc::clone(&self.fs), self.path.clone());
                return;
            }

            // Synchronous fallback: no pause deferral and no background deleter.
            if let Err(e) = self.fs.remove_file(&self.path) {
                log::warn!(
                    "Failed to cleanup deleted blob file {:?} at {}: {e:?}",
                    self.id,
                    self.path.display(),
                );
            }
        } else {
            // Not deleted, but possibly marked for tight-space prefix reclaim:
            // this (old) view's last Arc is dropping, so no reader can touch the
            // relocated prefix anymore. Punch the consumed data frames
            // `[data_start, offset)` and LEAVE the file — the restricted view (a
            // distinct Inner) still serves the suffix. A blob file is an SFA
            // archive, so the punch must start at the `data` section (skip the
            // header); the TOC sits at the tail and stays intact. `offset` is an
            // absolute data-section position (a frame boundary from the
            // relocation scanner). Re-read the data-section start from the TOC
            // here rather than carrying it on every blob-file Inner — the punch
            // is a rare, tight-space-only path.
            //
            // Hole punching is a std-only capability (the tight-space relocation
            // loop that arms it is itself `#[cfg(feature = "std")]`), so the punch
            // action is gated. The atomic load is no-std-safe but pointless when
            // nothing can arm it.
            #[cfg(feature = "std")]
            {
                // `Acquire` pairs with the `Release` store in
                // `BlobFile::mark_punch_on_drop`; `u64::MAX` means "not armed".
                let off = self
                    .punch_on_drop
                    .load(core::sync::atomic::Ordering::Acquire);
                if off != u64::MAX {
                    match data_section_start(&*self.fs, &self.path) {
                        // Only punch a non-empty range; the guard also keeps
                        // `off - data_start` from underflowing.
                        Ok(data_start) if off > data_start => {
                            if let Err(e) =
                                self.fs.punch_hole(&self.path, data_start, off - data_start)
                            {
                                log::warn!(
                                    "Failed to punch tight-space data [{data_start}, {off}) of blob file {:?} at {}: {e:?}",
                                    self.id,
                                    self.path.display(),
                                );
                            }
                        }
                        Ok(_) => {} // nothing consumed below the data start
                        Err(e) => log::warn!(
                            "Skipping tight-space punch of blob file {:?} at {}: could not read data section: {e:?}",
                            self.id,
                            self.path.display(),
                        ),
                    }
                }
            }
        }
    }
}
238
/// Looks up the absolute byte offset of the `data` section in a blob file's
/// SFA table of contents.
///
/// The tight-space punch starts at this offset so that it reclaims data
/// frames only and never touches the SFA header in front of them.
///
/// # Errors
///
/// Fails if the file cannot be opened for reading, if the SFA archive cannot
/// be parsed, or with `InvalidHeader("BlobFile")` when the TOC has no `data`
/// section.
#[cfg(feature = "std")]
fn data_section_start(fs: &dyn Fs, path: &Path) -> crate::Result<u64> {
    let mut handle = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let archive = crate::sfa::Reader::from_reader(&mut handle)?;

    let Some(section) = archive.toc().section(b"data") else {
        return Err(crate::Error::InvalidHeader("BlobFile"));
    };

    Ok(section.pos())
}
252
/// A blob file stores large values and is part of the value log
///
/// This is a handle around a shared [`Inner`]: cloning it only clones the
/// [`Arc`], so all clones refer to the same underlying state.
#[derive(Clone)]
pub struct BlobFile(pub(crate) Arc<Inner>);
256
257impl Eq for BlobFile {}
258
259impl PartialEq for BlobFile {
260 fn eq(&self, other: &Self) -> bool {
261 self.id().eq(&other.id())
262 }
263}
264
265impl core::hash::Hash for BlobFile {
266 fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
267 self.id().hash(state);
268 }
269}
270
271impl BlobFile {
272 pub(crate) fn mark_as_deleted(&self) {
273 self.0
274 .is_deleted
275 .store(true, core::sync::atomic::Ordering::Release);
276 }
277
278 /// Marks this view to punch the consumed `[data_start, offset)` data frames
279 /// when its last `Arc` drops (see [`Inner::punch_on_drop`]). `offset` is an
280 /// absolute data-section position. Set on the PRIOR view once a tight-space
281 /// relocation slice has moved its `[data_start, offset)` live entries into a
282 /// fresh compact file and that move is durably installed.
283 #[cfg(feature = "std")]
284 pub(crate) fn mark_punch_on_drop(&self, offset: u64) {
285 self.0
286 .punch_on_drop
287 .store(offset, core::sync::atomic::Ordering::Release);
288 }
289
290 /// Re-opens this blob file as a DISTINCT [`Inner`] (its own file handle and a
291 /// fresh punch-on-drop atomic) over the same physical file. The tight-space
292 /// relocation loop installs the re-opened view in the new version and arms
293 /// the prior view to punch its consumed prefix once its readers drain, so a
294 /// stale blob file is reclaimed in place while the suffix keeps serving the
295 /// not-yet-relocated entries — the blob analog of [`Table::reopen_restricted`](crate::Table::reopen_restricted).
296 ///
297 /// # Errors
298 ///
299 /// Propagates any error from re-opening the file.
300 #[cfg(feature = "std")]
301 pub(crate) fn reopen(&self) -> crate::Result<Self> {
302 super::recover_blob_file(
303 &self.0.path,
304 self.0.id,
305 self.0.checksum,
306 self.0.tree_id,
307 &self.0.fs,
308 )
309 }
310
311 /// Installs the tree-wide deletion pause used by checkpoints.
312 /// Idempotent: a second call is a no-op.
313 pub(crate) fn install_deletion_pause(&self, pause: Arc<DeletionPause>) {
314 let _ = self.0.deletion_pause.set(Box::new(pause));
315 }
316
317 /// Installs the tree-wide background file deleter. Idempotent.
318 #[cfg(feature = "std")]
319 pub(crate) fn install_background_deleter(&self, deleter: Arc<crate::BackgroundDeleter>) {
320 let _ = self.0.background_deleter.set(Box::new(deleter));
321 }
322
323 /// Returns the blob file ID.
324 #[must_use]
325 pub fn id(&self) -> BlobFileId {
326 self.0.id
327 }
328
329 /// Returns the full blob file checksum.
330 #[must_use]
331 pub fn checksum(&self) -> Checksum {
332 self.0.checksum
333 }
334
335 /// Returns the blob file path.
336 #[must_use]
337 pub fn path(&self) -> &Path {
338 &self.0.path
339 }
340
341 /// Returns the blob file accessor.
342 #[must_use]
343 pub(crate) fn file_accessor(&self) -> &FileAccessor {
344 &self.0.file_accessor
345 }
346
347 /// Returns the number of items in the blob file.
348 #[must_use]
349 #[expect(clippy::len_without_is_empty)]
350 pub fn len(&self) -> u64 {
351 self.0.meta.item_count
352 }
353
354 /// Physical on-disk file size in bytes, including the per-entry framing
355 /// (V4 header + key) and the metadata block / trailer — not just the
356 /// compressed payload (`meta.total_compressed_bytes`). Used as a
357 /// conservative upper bound on the transient output of a blob relocation:
358 /// the rewritten file re-emits the same framing, so the source file's
359 /// physical size bounds the output (and includes the dead blobs a relocation
360 /// drops, making it strictly conservative).
361 ///
362 /// # Errors
363 ///
364 /// Returns an error if the blob file's size cannot be stat-ed.
365 pub(crate) fn physical_size(&self) -> crate::Result<u64> {
366 Ok(self.0.fs.metadata(&self.0.path)?.len)
367 }
368
369 /// Returns `true` if the blob file is stale (based on the given staleness threshold).
370 pub(crate) fn is_stale(&self, frag_map: &FragmentationMap, threshold: f32) -> bool {
371 frag_map.get(&self.id()).is_some_and(|x| {
372 #[expect(
373 clippy::cast_precision_loss,
374 reason = "ok to lose precision as this is an approximate calculation"
375 )]
376 let stale_bytes = x.bytes as f32;
377 #[expect(
378 clippy::cast_precision_loss,
379 reason = "ok to lose precision as this is an approximate calculation"
380 )]
381 let all_bytes = self.0.meta.total_uncompressed_bytes as f32;
382 let ratio = stale_bytes / all_bytes;
383 ratio >= threshold
384 })
385 }
386
387 /// Returns `true` if the blob file has no more incoming references, and can be safely removed from a Version.
388 pub(crate) fn is_dead(&self, frag_map: &FragmentationMap) -> bool {
389 frag_map.get(&self.id()).is_some_and(|x| {
390 let stale_bytes = x.bytes;
391 let all_bytes = self.0.meta.total_uncompressed_bytes;
392 stale_bytes == all_bytes
393 })
394 }
395}