holt 0.3.3

An adaptive-radix-tree metadata storage engine for path-shaped keys, with per-blob concurrency and crash-safe persistence.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
//! One checkpoint round — the planner's main work unit, also
//! invoked synchronously by `Checkpointer::Drop` to drain in-flight
//! dirty state before the `Tree` handle disappears.
//!
//! ## Sequence
//!
//! 0. **Merge pass** (optional, controlled by
//!    `CheckpointConfig::auto_merge`) — drains queued parent-merge
//!    candidates and folds mergeable children back into parents.
//!    Merge mutations are staged through the same dirty /
//!    pending-delete sets as foreground writes, then flushed by
//!    this round after the WAL sync.
//! 1. **Snapshot dirty + pending deletes, record the WAL watermark,
//!    and clone the snapshotted bytes** — all under the exclusive
//!    side of the tree's commit-publish gate, so no writer can
//!    mutate a snapshotted blob between the dirty snapshot and the
//!    byte clone.
//! 2. **Flush WAL** up to that watermark through the journal worker,
//!    after releasing the commit-publish gate, so every record that
//!    mirrors a snapshotted seq is durable before any of the cloned
//!    bytes reach the data file.
//! 3. **Enqueue one checkpoint epoch** (the cloned bytes plus the
//!    pending-delete snapshot) to the I/O worker.
//! 4. **Retire completed epochs** on later planner turns in FIFO
//!    order. This is the truncate watermark: a later epoch may not
//!    advance WAL trimming before every older epoch is known to
//!    have landed or been restored.
//! 5. **Truncate WAL** only when the pipeline is empty and
//!    `bm.dirty_count() == 0 && bm.pending_delete_count() == 0`
//!    under the commit-publish gate.
//!
//! A round is started from two places:
//!
//! - The `checkpoint_thread` main loop in the parent module
//!   ([`super`]; background path).
//! - `Checkpointer::Drop` (synchronous final round on the calling
//!   thread, after the planner has joined and writers are
//!   guaranteed to be gone).

use crossbeam_channel::{bounded, Receiver, TryRecvError};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use crate::api::errors::{Error, Result};
use crate::engine;
use crate::layout::BlobGuid;
use crate::store::blob_store::BlobStore;
use crate::store::WriteThroughEntry;

use super::io::{CheckpointEpoch, CheckpointEpochReport, IoTask};
use super::Shared;

/// FIFO of checkpoint epochs that have been handed to the I/O worker
/// but not yet retired.
///
/// Epochs are always retired from the front (`pop_front`), so an older
/// epoch is accounted for before any newer one. WAL truncation is only
/// attempted once this queue is empty (see `maybe_truncate`).
pub(super) struct Pipeline {
    /// Submitted epochs, oldest first.
    in_flight: VecDeque<PendingEpoch>,
    /// Upper bound on `in_flight.len()`; `Pipeline::new` clamps it to at least 1.
    max_in_flight: usize,
}

/// One submitted checkpoint epoch that is still awaiting its
/// completion report.
///
/// The snapshots are retained so `restore_unreported_epoch` can put
/// them back into the buffer manager if the I/O worker drops the
/// completion channel without ever reporting.
struct PendingEpoch {
    /// Receives the epoch's `CheckpointEpochReport` (the receiving end of
    /// the `bounded(1)` channel created in `run_round`).
    rx: Receiver<CheckpointEpochReport>,
    /// Dirty snapshot submitted in this epoch: blob guid → the seq that
    /// was used as each write-through entry's `expected_seq`.
    snap: HashMap<BlobGuid, u64>,
    /// Pending-delete snapshot submitted in this epoch.
    /// NOTE(review): the meaning of the `u64` value is defined by
    /// `snapshot_pending_deletes`, which is not visible here — confirm.
    pending: HashMap<BlobGuid, u64>,
}

impl Pipeline {
    pub(super) fn new(max_in_flight: usize) -> Self {
        Self {
            in_flight: VecDeque::new(),
            max_in_flight: max_in_flight.max(1),
        }
    }

    pub(super) fn has_room(&self) -> bool {
        self.in_flight.len() < self.max_in_flight
    }

    pub(super) fn is_empty(&self) -> bool {
        self.in_flight.is_empty()
    }

    pub(super) fn reap_ready(&mut self, shared: &Arc<Shared>) -> Result<()> {
        while let Some(front) = self.in_flight.front() {
            match front.rx.try_recv() {
                Ok(report) => {
                    self.in_flight.pop_front().expect("front exists");
                    finish_epoch(shared, report)?;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    let pending = self.in_flight.pop_front().expect("front exists");
                    restore_unreported_epoch(shared, pending);
                    return Err(Error::Internal(
                        "checkpoint: I/O worker dropped epoch completion",
                    ));
                }
            }
        }
        self.maybe_truncate(shared)
    }

    fn wait_for_room(&mut self, shared: &Arc<Shared>) -> Result<()> {
        if self.has_room() {
            return Ok(());
        }
        self.wait_one(shared)
    }

    pub(super) fn drain(&mut self, shared: &Arc<Shared>) -> Result<()> {
        let mut first_err = None;
        while !self.in_flight.is_empty() {
            if let Err(e) = self.wait_one(shared) {
                first_err.get_or_insert(e);
            }
        }
        if let Some(e) = first_err {
            return Err(e);
        }
        self.maybe_truncate(shared)
    }

    fn wait_one(&mut self, shared: &Arc<Shared>) -> Result<()> {
        let Some(pending) = self.in_flight.pop_front() else {
            return Ok(());
        };
        if let Ok(report) = pending.rx.recv() {
            finish_epoch(shared, report)
        } else {
            restore_unreported_epoch(shared, pending);
            Err(Error::Internal(
                "checkpoint: I/O worker dropped epoch completion",
            ))
        }
    }

    fn push(&mut self, pending: PendingEpoch) {
        debug_assert!(self.has_room());
        self.in_flight.push_back(pending);
    }

    fn maybe_truncate(&self, shared: &Arc<Shared>) -> Result<()> {
        if !self.in_flight.is_empty() {
            return Ok(());
        }
        let Some(journal) = &shared.journal else {
            return Ok(());
        };
        if !journal.needs_checkpoint() {
            return Ok(());
        }
        let _commit = shared.commit_gate.enter_checkpoint();
        if shared.bm.dirty_count() == 0 && shared.bm.pending_delete_count() == 0 {
            journal.truncate()?;
            use std::sync::atomic::Ordering;
            shared.truncates.fetch_add(1, Ordering::Relaxed);
        }
        Ok(())
    }
}

/// Run a single round on a private one-slot pipeline. Then block until
/// the epoch it submitted (if any) has retired and truncation has been
/// attempted, both via `Pipeline::drain`.
///
/// An error from the round itself is returned directly, without draining.
pub(super) fn run_round_sync(shared: &Arc<Shared>) -> Result<()> {
    let mut one_slot = Pipeline::new(1);
    run_round(shared, &mut one_slot).and_then(|()| one_slot.drain(shared))
}

// The round is intentionally a single linear submission function:
// it maps "what is durable enough to enqueue" without hiding the
// WAL watermark / dirty snapshot / byte clone interlock.
#[allow(clippy::too_many_lines)]
pub(super) fn run_round(shared: &Arc<Shared>, pipeline: &mut Pipeline) -> Result<()> {
    use std::sync::atomic::Ordering;

    // Retire any epochs that already finished, then block until the
    // pipeline has a free slot for this round's epoch.
    pipeline.reap_ready(shared)?;
    pipeline.wait_for_room(shared)?;

    shared.rounds_attempted.fetch_add(1, Ordering::Relaxed);

    // 0. Optional candidate-driven merge pass. A failed pass is logged
    // and counted as zero merges; it does not abort the round.
    let merged = if shared.cfg.auto_merge {
        match run_merge_pass(shared) {
            Ok(n) => n,
            Err(e) => {
                eprintln!("holt: checkpoint merge pass failed: {e}");
                0
            }
        }
    } else {
        0
    };
    shared.merges_total.fetch_add(merged, Ordering::Relaxed);

    #[cfg(feature = "tracing")]
    let round_start = std::time::Instant::now();

    // 1. Snapshot dirty + pending-deletes + WAL watermark + cloned
    // bytes under the same commit-publish gate used by foreground
    // persistent writers. Holding the gate through byte cloning is
    // load-bearing: a writer must not mutate a blob between our dirty
    // snapshot and `snapshot_bytes`, otherwise the store flush could
    // include bytes whose WAL record was not part of the checkpoint
    // snapshot.
    //
    // If `snapshot_pending_deletes` were taken outside this
    // commit-publish block, the following could happen. A writer
    // (a) enters its mutation, (b) calls walker.erase, which hits
    // `SubtreeGone` (and that calls `mark_for_delete`), (c) submits
    // the erase record, and (d) leaves the gate, all before we
    // snapshot pending. We would then execute `store.delete_blob`
    // and re-Sync the manifest while the writer's WAL record was
    // still only in the writer's buffer. A crash at that point
    // would leave the manifest ahead of the WAL — exactly the W2D
    // violation deferred-delete was designed to prevent.
    //
    // No-WAL trees (memory mode, user-supplied store) take neither the
    // commit-publish gate nor a WAL watermark, matching
    // `run_merge_pass`, which likewise only enters the gate when a
    // journal exists. They still clone immediately after draining.
    // Both cases share one code path so the snapshot/restore protocol
    // cannot drift between them.
    let (snap, pending, snap_bytes, wal_up_to) = {
        let _commit = shared
            .journal
            .as_ref()
            .map(|_| shared.commit_gate.enter_checkpoint());
        let snap = shared.bm.snapshot_dirty();
        let pending = shared.bm.snapshot_pending_deletes();
        let wal_up_to = shared.journal.as_ref().map(|journal| journal.wal_work());
        let mut snap_bytes = Vec::with_capacity(snap.len());
        for (guid, seq) in &snap {
            let Some(bytes) = shared.bm.snapshot_bytes(*guid) else {
                // Hand everything back before failing so the next
                // round sees the same dirty / pending-delete state.
                shared.bm.restore_pending_deletes(pending);
                shared.bm.restore_dirty(snap.clone());
                return Err(Error::Internal(
                    "checkpoint: dirty entry lost cache image — invariant I1 violated",
                ));
            };
            snap_bytes.push((*guid, *seq, bytes));
        }
        (snap, pending, snap_bytes, wal_up_to)
    };

    // 2. Force the WAL watermark before data-file writes, but do
    // not hold `commit_gate` across the fsync. Later writers may
    // append more WAL records while this flush runs; that is safe
    // because this epoch only writes the cloned dirty snapshot and
    // write-through retirement keeps newer dirty entries alive.
    if let (Some(journal), Some(up_to)) = (&shared.journal, wal_up_to) {
        if let Err(e) = journal.flush_up_to(up_to) {
            shared.bm.restore_pending_deletes(pending);
            shared.bm.restore_dirty(snap);
            return Err(e);
        }
    }
    let snap_count = snap.len();
    shared.last_dirty_count.store(snap_count, Ordering::Relaxed);

    // Early-skip only when nothing at all needs attention.
    // - A pending deferred-delete from a previous round was already
    //   drained above; this covers one whose `store.delete_blob` or
    //   trailing Sync failed and got restored. So check the
    //   snapshot's length, so we don't bail out on something we just
    //   picked up.
    // - `needs_flush` covers the other recovery edge. A prior round
    //   may have retired dirty entries after a successful
    //   write-through but failed the following store Sync. There is
    //   then still durable work even when dirty and pending are both
    //   empty.
    // - A WAL-only round can skip the store Sync but must still retry
    //   truncation.
    let needs_store_flush = pipeline.in_flight.is_empty() && shared.bm.needs_flush();
    if snap.is_empty() && merged == 0 && pending.is_empty() && !needs_store_flush {
        pipeline.maybe_truncate(shared)?;
        shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
        #[cfg(feature = "tracing")]
        tracing::trace!(target: "holt::checkpoint", "round skipped — nothing dirty");
        return Ok(());
    }

    // 3. Hand the whole epoch to the I/O worker. The planner has
    // already snapshotted durable intent under the commit-publish
    // gate. The worker can now drive data writes, store sync, pending
    // manifest deletes and the trailing sync, without holding up
    // writers or future snapshot rounds.
    let entries: Vec<_> = snap_bytes
        .into_iter()
        .map(|(guid, seq, bytes)| WriteThroughEntry {
            guid,
            bytes,
            expected_seq: seq,
        })
        .collect();
    // Kept so the snapshot can be restored if the worker never reports.
    let pending_for_recovery = pending.clone();
    let (tx, rx) = bounded(1);
    let epoch = CheckpointEpoch { entries, pending };
    if shared
        .io_tx
        .send(IoTask::CommitEpoch { epoch, on_done: tx })
        .is_err()
    {
        shared.bm.restore_pending_deletes(pending_for_recovery);
        shared.bm.restore_dirty(snap);
        return Err(Error::Internal(
            "checkpoint: I/O worker channel closed mid-round",
        ));
    }
    pipeline.push(PendingEpoch {
        rx,
        snap,
        pending: pending_for_recovery,
    });

    #[cfg(feature = "tracing")]
    {
        let elapsed = round_start.elapsed();
        tracing::info!(
            target: "holt::checkpoint",
            dirty_snapshot = snap_count,
            merged = merged,
            in_flight = pipeline.in_flight.len(),
            // Saturate rather than silently truncating the u128.
            elapsed_us = u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX),
            "round submitted",
        );
    }

    Ok(())
}

/// Candidate-driven merge pass — fold mergeable `BlobNode`
/// children back into their parents. Stages the mutations via the
/// unified `mark_dirty` + `mark_for_delete` protocol so the round's
/// later checkpoint epoch (WAL flush → data writes → store sync →
/// pending deletes → re-Sync → truncate) handles persistence under W2D.
/// Takes the exclusive maintenance gate around one parent at a
/// time so no foreground writer is lock-coupling through the child
/// edge being folded and queued for delete. Foreground spillovers
/// enqueue parent blobs. Candidates that inspect only too-large
/// children are consumed; future spillovers or manual maintenance
/// seeding will requeue the parent when there is fresh shape debt.
///
/// Returns the cumulative count of children folded.
///
/// An inline `bm.commit(parent)` + `bm.delete_blob(child)` would
/// be wrong here — both happen pre-Sync, pre-WAL. `bm.commit`
/// would push cache bytes (potentially including user mutations
/// whose WAL records aren't yet durable) directly to store, and
/// `bm.delete_blob` would mutate the manifest in-memory which a
/// later `store.flush` could persist while the corresponding
/// user WAL records still hadn't reached disk. Staging through
/// dirty / pending-delete avoids both: the only flush path is the
/// round's checkpoint epoch, which runs strictly after step 2's
/// WAL flush.
fn run_merge_pass(shared: &Arc<Shared>) -> Result<u64> {
    use crate::store::STRUCTURAL_SEQ;

    let parents = shared.bm.pop_merge_candidates(256);
    let mut merged_total = 0u64;
    for guid in parents {
        let _maintenance = shared.maintenance_gate.enter_exclusive();
        if !shared.bm.has_blob(guid)? {
            continue;
        }
        let _commit = shared
            .journal
            .as_ref()
            .map(|_| shared.commit_gate.enter_writer());
        let pin = shared.bm.pin(guid)?;
        let (stats, has_children) = {
            let mut guard = pin.write();
            let mut frame = guard.frame();
            let stats = engine::try_merge_children(shared.bm.as_ref(), &mut frame, STRUCTURAL_SEQ)?;
            (stats, frame.header().num_ext_blobs != 0)
        };
        if stats.merged > 0 {
            // Keep the parent pin alive until after dirty
            // publication; otherwise eviction can drop the updated
            // cache image before this round snapshots it.
            shared.bm.mark_dirty(guid, STRUCTURAL_SEQ);
            merged_total += u64::from(stats.merged);
            if has_children {
                shared.bm.note_merge_candidate(guid);
            }
        }
        drop(pin);
    }
    shared.bm.note_merges(merged_total);
    Ok(merged_total)
}

fn finish_epoch(shared: &Arc<Shared>, report: CheckpointEpochReport) -> Result<()> {
    use std::sync::atomic::Ordering;

    shared
        .blobs_flushed
        .fetch_add(report.dirty_flushed as u64, Ordering::Relaxed);
    let dirty_total = report.dirty_total;
    let dirty_flushed = report.dirty_flushed;
    let pending_total = report.pending_total;
    let applied_deletes = report.applied_deletes;
    if let Err(e) = report.result {
        eprintln!(
            "holt: checkpoint epoch failed (dirty={dirty_flushed}/{dirty_total}, pending deleted={applied_deletes}/{pending_total}): {e}",
        );
        return Err(e);
    }
    shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
    Ok(())
}

fn restore_unreported_epoch(shared: &Arc<Shared>, pending: PendingEpoch) {
    shared.bm.restore_pending_deletes(pending.pending);
    shared.bm.restore_dirty(pending.snap);
}