1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2024-present, fjall-rs
// Copyright (c) 2026-present, Structured World Foundation
pub mod merge;
pub mod meta;
pub mod multi_writer;
pub mod reader;
pub mod scanner;
pub mod writer;
use crate::path::{Path, PathBuf};
use crate::{
Checksum, GlobalTableId, TreeId, blob_tree::FragmentationMap, deletion_pause::DeletionPause,
file_accessor::FileAccessor, fs::Fs, vlog::BlobFileId,
};
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::sync::atomic::AtomicBool;
pub use meta::Metadata;
/// A blob file is an immutable, sorted, contiguous file that contains large key-value pairs (blobs)
///
/// This is the shared state behind a [`BlobFile`] handle (which wraps it in
/// an `Arc`). Its [`Drop`] impl performs the physical cleanup (removal or
/// tight-space hole punching) once the last handle to this view goes away.
//
// `#[derive(Debug)]` cannot be used because [`Fs`] is not `Debug` (trait
// objects without an explicit `Debug` bound would require boxing through
// `dyn Debug`). A manual impl that prints stable identifiers gives the
// same operational ergonomics as the previous derived `Debug` without
// pulling `Debug` into the `Fs` trait bound (which would cascade through
// every backend).
//
// NOTE: field declaration order is also drop order; keep that in mind
// before reordering fields.
pub struct Inner {
    /// Blob file ID
    pub id: BlobFileId,
    /// ID of the tree this blob file belongs to. Combined with `id` into the
    /// `GlobalTableId` used to evict the file from the descriptor table.
    pub tree_id: TreeId,
    /// File path
    pub path: PathBuf,
    /// Statistics
    pub meta: Metadata,
    /// Whether this blob file is deleted (logically)
    pub is_deleted: AtomicBool,
    /// Tight-space punch-on-drop offset, or [`u64::MAX`] (default) for "no
    /// punch". When tight-space blob relocation rewrites this file's live
    /// entries below an offset into a fresh compact file, the PRIOR view is
    /// marked here with that absolute data-section offset; once every reader
    /// holding it drops, this view's [`Drop`] reclaims the consumed
    /// `[data_start, offset)` data frames via
    /// [`Fs::punch_hole`] and LEAVES the file in
    /// place (the restricted view still serves the suffix). Mirrors
    /// `table::Inner::punch_on_drop`. Distinct from [`Self::is_deleted`].
    #[cfg_attr(
        not(feature = "std"),
        allow(
            dead_code,
            reason = "tight-space punch-on-drop frontier; the punch consumer is std-gated, so unread under no_std"
        )
    )]
    pub(crate) punch_on_drop: portable_atomic::AtomicU64,
    /// Full blob file checksum. Returned by `BlobFile::checksum` and passed
    /// through unchanged when the file is re-opened.
    pub checksum: Checksum,
    /// Open-file access for this blob file. It may pin `FsFile` handles
    /// and/or route through a descriptor table. For a deleted file, [`Drop`]
    /// swaps it for `FileAccessor::Closed` and releases it before unlinking.
    pub(crate) file_accessor: FileAccessor,
    /// Filesystem backend used by [`Drop`] for the physical removal.
    /// Carries the same `Fs` instance the file was opened through so that
    /// in-memory and routed-tier backends behave consistently with the
    /// rest of the tree.
    pub(crate) fs: Arc<dyn Fs>,
    /// Tree-wide file-deletion gate. Installed once by
    /// [`BlobFile::install_deletion_pause`] after the file is registered
    /// with a tree. When `Some` and active, the [`Drop`] impl defers the
    /// underlying `remove_file` so an in-progress checkpoint can hard-link
    /// the file before it disappears.
    // `once_cell::race::OnceBox` — see Table::Inner::deletion_pause
    // for the rationale (no-std-friendly one-shot slot).
    pub(crate) deletion_pause: once_cell::race::OnceBox<Arc<DeletionPause>>,
    /// Tree-wide background file deleter. See
    /// [`Table::install_background_deleter`](crate::Table) for the contract:
    /// when present (and no checkpoint pause is active) the [`Drop`] impl frees
    /// the blob file's blocks synchronously via
    /// [`Fs::truncate_file`] and hands the
    /// directory-entry `unlink` to this deleter, off the foreground path.
    // std-only (the deleter spawns a thread); see Table::Inner for rationale.
    #[cfg(feature = "std")]
    pub(crate) background_deleter: once_cell::race::OnceBox<Arc<crate::BackgroundDeleter>>,
}
impl Inner {
fn global_id(&self) -> GlobalTableId {
GlobalTableId::from((self.tree_id, self.id))
}
}
/// Hand-written `Debug` for the blob file state. It prints the stable
/// identifiers, the current deletion flag and the metadata. The output is
/// marked non-exhaustive because several fields (e.g. the filesystem
/// backend) are intentionally omitted.
impl core::fmt::Debug for Inner {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("blob_file::Inner");
        out.field("id", &self.id)
            .field("tree_id", &self.tree_id)
            .field("path", &self.path);

        // A debug snapshot needs no ordering guarantees, so Relaxed suffices.
        let deleted = self.is_deleted.load(core::sync::atomic::Ordering::Relaxed);
        out.field("is_deleted", &deleted).field("meta", &self.meta);

        out.finish_non_exhaustive()
    }
}
/// Physical cleanup for a blob file view, run when its last handle drops.
///
/// The two branches are mutually exclusive and keyed on `is_deleted`:
///
/// * Deleted: release the file accessor (evicting the descriptor-table entry
///   first), then remove the file by the first applicable route:
///   1. Defer to an active checkpoint's deletion pause.
///   2. Otherwise (std only), truncate when this is the sole hard link and
///      enqueue the unlink on the background deleter.
///   3. Otherwise, call `remove_file` synchronously.
/// * Not deleted: if `punch_on_drop` was armed (std only), hole-punch the
///   relocated `[data_start, offset)` data prefix and keep the file.
///
/// I/O failures are logged via `log::warn!` rather than propagated.
impl Drop for Inner {
    fn drop(&mut self) {
        // Acquire pairs with the Release store in `BlobFile::mark_as_deleted`.
        if self.is_deleted.load(core::sync::atomic::Ordering::Acquire) {
            log::trace!(
                "Cleanup deleted blob file {:?} at {}",
                self.id,
                self.path.display(),
            );
            // Move the accessor out and drop it FIRST so every pinned
            // Arc<dyn FsFile> the file_accessor holds is released before
            // we try to unlink. On Windows (and any other platform where
            // an open handle blocks unlink) a live handle here would
            // make remove_file fail silently, leaking the blob file's
            // disk space — the same hazard already handled in
            // table::Inner::drop. Eviction from the descriptor table
            // happens through the same accessor before the drop.
            let global_id = self.global_id();
            let file_accessor = core::mem::replace(&mut self.file_accessor, FileAccessor::Closed);
            file_accessor
                .as_descriptor_table()
                .inspect(|d| d.remove_for_blob_file(&global_id));
            drop(file_accessor);
            // If a checkpoint is active, defer the physical deletion so the
            // file remains hard-linkable until the checkpoint releases its
            // pause. Short-circuit on the common no-checkpoint path: skip
            // the Arc<dyn Fs> bump and PathBuf clone unless a pause is
            // both installed AND currently active. `try_enqueue` still
            // re-checks `is_active()` under the queue lock to close the
            // publish-then-release race, so the outer check is pure perf.
            // A `false` result (no pause, inactive, or enqueue refused)
            // falls through to the immediate-removal paths below.
            let deferred = match self.deletion_pause.get() {
                Some(pause) if pause.is_active() => {
                    pause.try_enqueue(Arc::clone(&self.fs), self.path.clone())
                }
                _ => false,
            };
            if deferred {
                log::trace!(
                    "Deferred deletion of blob file {:?} at {} (checkpoint active)",
                    self.id,
                    self.path.display(),
                );
                return;
            }
            // Off-foreground reclaim: free the blocks synchronously (accurate
            // footprint scan) and hand the unlink to the background deleter.
            // Falls through to a synchronous remove_file when none installed.
            #[cfg(feature = "std")]
            if let Some(deleter) = self.background_deleter.get() {
                // Truncate only when we own the sole hard link — a checkpoint
                // may have hard-linked this blob file, and truncating the shared
                // inode would zero the checkpoint's copy. Otherwise skip the
                // truncate and just unlink (data survives via the other link).
                // An error from `hard_link_count` also skips the truncate.
                if self.fs.hard_link_count(&self.path).is_ok_and(|n| n <= 1)
                    && let Err(e) = self.fs.truncate_file(&self.path)
                {
                    log::warn!(
                        "Failed to truncate deleted blob file {:?} at {}: {e:?}",
                        self.id,
                        self.path.display(),
                    );
                }
                // The unlink is enqueued even if the truncate failed.
                deleter.enqueue(Arc::clone(&self.fs), self.path.clone());
                return;
            }
            // Last resort: synchronous unlink on the dropping thread.
            if let Err(e) = self.fs.remove_file(&self.path) {
                log::warn!(
                    "Failed to cleanup deleted blob file {:?} at {}: {e:?}",
                    self.id,
                    self.path.display(),
                );
            }
        } else {
            // Not deleted, but possibly marked for tight-space prefix reclaim:
            // this (old) view's last Arc is dropping, so no reader can touch the
            // relocated prefix anymore. Punch the consumed data frames
            // `[data_start, offset)` and LEAVE the file — the restricted view (a
            // distinct Inner) still serves the suffix. A blob file is an SFA
            // archive, so the punch must start at the `data` section (skip the
            // header); the TOC sits at the tail and stays intact. `offset` is an
            // absolute data-section position (a frame boundary from the
            // relocation scanner). Re-read the data-section start from the TOC
            // here rather than carrying it on every blob-file Inner — the punch
            // is a rare, tight-space-only path.
            //
            // Hole punching is a std-only capability (the tight-space relocation
            // loop that arms it is itself `#[cfg(feature = "std")]`), so the punch
            // action is gated. The atomic load is no-std-safe but pointless when
            // nothing can arm it.
            #[cfg(feature = "std")]
            {
                // Acquire pairs with the Release store in
                // `BlobFile::mark_punch_on_drop`; `u64::MAX` means "not armed".
                let off = self
                    .punch_on_drop
                    .load(core::sync::atomic::Ordering::Acquire);
                if off != u64::MAX {
                    match data_section_start(&*self.fs, &self.path) {
                        // The guard `off > data_start` keeps the length
                        // `off - data_start` from underflowing.
                        Ok(data_start) if off > data_start => {
                            if let Err(e) =
                                self.fs.punch_hole(&self.path, data_start, off - data_start)
                            {
                                log::warn!(
                                    "Failed to punch tight-space data [{data_start}, {off}) of blob file {:?} at {}: {e:?}",
                                    self.id,
                                    self.path.display(),
                                );
                            }
                        }
                        Ok(_) => {} // nothing consumed below the data start
                        Err(e) => log::warn!(
                            "Skipping tight-space punch of blob file {:?} at {}: could not read data section: {e:?}",
                            self.id,
                            self.path.display(),
                        ),
                    }
                }
            }
        }
    }
}
/// Returns the absolute byte offset at which the blob file's `data` section
/// starts, as recorded in its SFA table of contents.
///
/// The tight-space punch uses it as the lower bound of the reclaimed range,
/// so the SFA header before the data frames is never punched.
///
/// # Errors
///
/// Fails if the file cannot be opened or its TOC cannot be read, and returns
/// `Error::InvalidHeader("BlobFile")` when the TOC has no `data` section.
#[cfg(feature = "std")]
fn data_section_start(fs: &dyn Fs, path: &Path) -> crate::Result<u64> {
    let mut blob = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let archive = crate::sfa::Reader::from_reader(&mut blob)?;

    let start = archive
        .toc()
        .section(b"data")
        .map(|entry| entry.pos())
        .ok_or(crate::Error::InvalidHeader("BlobFile"))?;

    Ok(start)
}
/// A blob file stores large values and is part of the value log
///
/// This is a cheap, reference-counted handle: clones share one [`Inner`].
/// The physical cleanup in `Inner`'s `Drop` runs only after the last clone
/// is released. Equality and hashing compare the blob file ID only.
#[derive(Clone)]
pub struct BlobFile(pub(crate) Arc<Inner>);
impl Eq for BlobFile {}

/// Two handles compare equal when they carry the same blob file ID.
impl PartialEq for BlobFile {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}
/// Feeds only the blob file ID into the hasher, consistent with `PartialEq`.
impl core::hash::Hash for BlobFile {
    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
        let key = self.id();
        core::hash::Hash::hash(&key, state);
    }
}
impl BlobFile {
    /// Flags this blob file as logically deleted.
    ///
    /// Nothing is removed here. The physical cleanup happens in `Inner`'s
    /// `Drop` once the last clone of this handle is released.
    pub(crate) fn mark_as_deleted(&self) {
        let deleted = &self.0.is_deleted;
        deleted.store(true, core::sync::atomic::Ordering::Release);
    }

    /// Arms this view so that, when its last `Arc` drops, the consumed data
    /// frames `[data_start, offset)` are hole-punched while the file itself
    /// stays in place (see [`Inner::punch_on_drop`]).
    ///
    /// `offset` is an absolute data-section position. Call this on the PRIOR
    /// view only after a tight-space relocation slice has moved that prefix's
    /// live entries into a fresh compact file and the move is durably
    /// installed.
    #[cfg(feature = "std")]
    pub(crate) fn mark_punch_on_drop(&self, offset: u64) {
        let frontier = &self.0.punch_on_drop;
        frontier.store(offset, core::sync::atomic::Ordering::Release);
    }

    /// Opens the same physical file again as a separate [`Inner`], with its
    /// own file handle and a fresh punch-on-drop atomic.
    ///
    /// The tight-space relocation loop installs the re-opened view in the new
    /// version and arms the prior view to punch its consumed prefix once its
    /// readers drain. The stale prefix is therefore reclaimed in place while
    /// the suffix keeps serving the not-yet-relocated entries. This is the
    /// blob analog of [`Table::reopen_restricted`](crate::Table::reopen_restricted).
    ///
    /// # Errors
    ///
    /// Propagates any error from re-opening the file.
    #[cfg(feature = "std")]
    pub(crate) fn reopen(&self) -> crate::Result<Self> {
        let inner = &*self.0;
        super::recover_blob_file(&inner.path, inner.id, inner.checksum, inner.tree_id, &inner.fs)
    }

    /// Attaches the tree-wide checkpoint deletion pause.
    ///
    /// Only the first call takes effect; later calls are silently ignored.
    pub(crate) fn install_deletion_pause(&self, pause: Arc<DeletionPause>) {
        let slot_value = Box::new(pause);
        // `set` fails when a pause is already installed; that is the no-op case.
        let _ = self.0.deletion_pause.set(slot_value);
    }

    /// Attaches the tree-wide background file deleter.
    ///
    /// Only the first call takes effect; later calls are silently ignored.
    #[cfg(feature = "std")]
    pub(crate) fn install_background_deleter(&self, deleter: Arc<crate::BackgroundDeleter>) {
        let slot_value = Box::new(deleter);
        let _ = self.0.background_deleter.set(slot_value);
    }

    /// The blob file's ID.
    #[must_use]
    pub fn id(&self) -> BlobFileId {
        self.0.id
    }

    /// The checksum covering the whole blob file.
    #[must_use]
    pub fn checksum(&self) -> Checksum {
        self.0.checksum
    }

    /// Location of the blob file on the backing filesystem.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.0.path
    }

    /// Accessor used to obtain open handles to this blob file.
    #[must_use]
    pub(crate) fn file_accessor(&self) -> &FileAccessor {
        &self.0.file_accessor
    }

    /// Number of entries stored in this blob file, taken from its metadata.
    #[must_use]
    #[expect(clippy::len_without_is_empty)]
    pub fn len(&self) -> u64 {
        self.0.meta.item_count
    }

    /// On-disk size of the blob file in bytes, as reported by the filesystem.
    ///
    /// Unlike `meta.total_compressed_bytes`, this includes the per-entry
    /// framing (V4 header + key) and the metadata block / trailer. Blob
    /// relocation uses it as a conservative upper bound on its transient
    /// output: the rewrite re-emits the same framing but drops dead blobs, so
    /// the source file's size always covers it.
    ///
    /// # Errors
    ///
    /// Returns an error if the blob file's size cannot be stat-ed.
    pub(crate) fn physical_size(&self) -> crate::Result<u64> {
        let stat = self.0.fs.metadata(&self.0.path)?;
        Ok(stat.len)
    }

    /// Whether the stale fraction of this blob file reaches `threshold`.
    ///
    /// The fraction is the stale byte count in `frag_map` divided by the
    /// file's total uncompressed bytes. A file without a `frag_map` entry is
    /// never stale.
    pub(crate) fn is_stale(&self, frag_map: &FragmentationMap, threshold: f32) -> bool {
        let Some(stale) = frag_map.get(&self.id()) else {
            return false;
        };

        #[expect(
            clippy::cast_precision_loss,
            reason = "ok to lose precision as this is an approximate calculation"
        )]
        let ratio = stale.bytes as f32 / self.0.meta.total_uncompressed_bytes as f32;

        ratio >= threshold
    }

    /// Whether every byte of this blob file is stale, i.e. nothing references
    /// it anymore and it can be dropped from a Version.
    ///
    /// A file without a `frag_map` entry is never dead.
    pub(crate) fn is_dead(&self, frag_map: &FragmentationMap) -> bool {
        let total = self.0.meta.total_uncompressed_bytes;
        frag_map
            .get(&self.id())
            .is_some_and(|stale| stale.bytes == total)
    }
}