holt 0.1.0

An adaptive-radix-tree metadata storage engine for path-shaped keys, with per-blob concurrency and crash-safe persistence.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
//! Stage 5c integration tests — `Tree` <-> WAL hookup.
//!
//! Cover:
//! - Persistent put/delete/rename round-trip through reopen with
//!   `wal_sync_on_commit = true` (verifies WAL replay reconstructs
//!   the logical state on a crash-without-checkpoint).
//! - "Default mode without checkpoint loses unflushed" — under
//!   the default config (no per-op fsync) a drop without
//!   `checkpoint()` leaves the disk WAL empty and reopen sees
//!   the pre-mutation state.
//! - `checkpoint()` flushes everything and truncates the WAL.

use std::fs;
use std::path::{Path, PathBuf};

use tempfile::tempdir;

use holt::{Tree, TreeConfig};

fn wal_path(dir: &Path) -> PathBuf {
    dir.join("journal.wal")
}

/// `TreeConfig::new(dir)` plus `wal_sync_on_commit = true` —
/// tests that simulate a crash without checkpoint need every
/// record on disk before the drop.
fn durable_cfg(dir: &std::path::Path) -> TreeConfig {
    let mut cfg = TreeConfig::new(dir);
    cfg.wal_sync_on_commit = true;
    cfg
}

#[test]
fn persistent_put_then_reopen_via_wal_replay() {
    let dir = tempdir().unwrap();
    let cfg = durable_cfg(dir.path());

    // Round 1: open, put, drop without checkpoint. Per-op WAL
    // fsync is on (`wal_sync_on_commit = true`), so every record
    // is on disk before the drop.
    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..50u32 {
            let k = format!("k{i:03}");
            let v = format!("v-{i}");
            tree.put(k.as_bytes(), v.as_bytes()).unwrap();
        }
    } // tree dropped — no explicit checkpoint.

    // The WAL file exists and has bytes past the header.
    let wal_size_after_drop = fs::metadata(wal_path(dir.path())).unwrap().len();
    assert!(wal_size_after_drop > 32, "WAL should hold records");

    // Round 2: reopen. Replay rebuilds every key from the log.
    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..50u32 {
            let k = format!("k{i:03}");
            let v = format!("v-{i}");
            assert_eq!(
                tree.get(k.as_bytes()).unwrap().as_deref(),
                Some(v.as_bytes()),
                "WAL replay should have restored key {k}",
            );
        }
    }
}

#[test]
fn checkpoint_truncates_wal_and_keys_survive_reopen() {
    let dir = tempdir().unwrap();
    // Need per-op fsync so the WAL has bytes on disk before
    // the pre-checkpoint size assertion can trip.
    let cfg = durable_cfg(dir.path());

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..20u32 {
            tree.put(format!("k{i:02}").as_bytes(), format!("v{i}").as_bytes())
                .unwrap();
        }
        let wal_size_before = fs::metadata(wal_path(dir.path())).unwrap().len();
        assert!(wal_size_before > 32);
        tree.checkpoint().unwrap();
        let wal_size_after = fs::metadata(wal_path(dir.path())).unwrap().len();
        assert_eq!(
            wal_size_after, 32,
            "checkpoint should truncate WAL to header-only",
        );
    }

    // Reopen — everything still readable, but via the blob image
    // rather than WAL replay (the WAL is header-only).
    {
        let tree = Tree::open(cfg).unwrap();
        for i in 0..20u32 {
            let k = format!("k{i:02}");
            let v = format!("v{i}");
            assert_eq!(
                tree.get(k.as_bytes()).unwrap().as_deref(),
                Some(v.as_bytes()),
            );
        }
    }
}

#[test]
fn delete_through_wal_replays_correctly() {
    let dir = tempdir().unwrap();
    let cfg = durable_cfg(dir.path());

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..10u32 {
            tree.put(format!("k{i}").as_bytes(), format!("v{i}").as_bytes())
                .unwrap();
        }
        // Delete every even key.
        for i in 0..10u32 {
            if i % 2 == 0 {
                let prev = tree.delete(format!("k{i}").as_bytes()).unwrap();
                assert!(prev.is_some());
            }
        }
    }

    let tree = Tree::open(cfg).unwrap();
    for i in 0..10u32 {
        let got = tree.get(format!("k{i}").as_bytes()).unwrap();
        if i % 2 == 0 {
            assert_eq!(got, None, "k{i} should have been deleted");
        } else {
            assert_eq!(got.as_deref(), Some(format!("v{i}").as_bytes()));
        }
    }
}

#[test]
fn rename_through_wal_replays_correctly() {
    let dir = tempdir().unwrap();
    let cfg = durable_cfg(dir.path());

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        tree.put(b"a", b"v-a").unwrap();
        tree.put(b"b", b"v-b").unwrap();
        tree.rename(b"a", b"a2", false).unwrap();
    }

    let tree = Tree::open(cfg).unwrap();
    assert_eq!(tree.get(b"a").unwrap(), None);
    assert_eq!(tree.get(b"a2").unwrap().as_deref(), Some(&b"v-a"[..]));
    assert_eq!(tree.get(b"b").unwrap().as_deref(), Some(&b"v-b"[..]));
}

#[test]
fn default_mode_loses_writes_without_checkpoint_or_fsync() {
    // Under the default config (`wal_sync_on_commit = false`),
    // the WAL writer keeps records in its in-memory `Vec` /
    // auto-flushes them only when the buffer crosses 64 KB.
    // A short workload + drop-without-checkpoint = nothing
    // durable — exactly the high-throughput trade-off.
    let dir = tempdir().unwrap();
    let cfg = TreeConfig::new(dir.path());

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..50u32 {
            tree.put(
                format!("transient{i}").as_bytes(),
                format!("v{i}").as_bytes(),
            )
            .unwrap();
        }
        // Drop without `checkpoint()`. 50 records × ~80 B is
        // well below the 64 KB auto-flush threshold, so the WAL
        // file on disk is still header-only.
    }
    let wal_size = fs::metadata(wal_path(dir.path())).unwrap().len();
    assert_eq!(wal_size, 32);

    let tree = Tree::open(cfg).unwrap();
    for i in 0..50u32 {
        assert_eq!(
            tree.get(format!("transient{i}").as_bytes()).unwrap(),
            None,
            "transient{i} should have been lost",
        );
    }
}

#[test]
fn batched_mode_loses_writes_without_checkpoint() {
    let dir = tempdir().unwrap();
    let mut cfg = TreeConfig::new(dir.path());
    cfg.flush_on_write = false;

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..10u32 {
            tree.put(
                format!("transient{i}").as_bytes(),
                format!("v{i}").as_bytes(),
            )
            .unwrap();
        }
        // Drop without `checkpoint()`. Nothing was flushed:
        // - WAL records buffered in memory → lost
        // - BM root blob mutated in memory → lost
    }

    // Reopen — empty WAL, empty blob, no keys readable.
    let tree = Tree::open(cfg).unwrap();
    for i in 0..10u32 {
        assert_eq!(
            tree.get(format!("transient{i}").as_bytes()).unwrap(),
            None,
            "transient{i} should have been lost",
        );
    }
}

#[test]
fn batched_mode_with_checkpoint_persists_everything() {
    let dir = tempdir().unwrap();
    let mut cfg = TreeConfig::new(dir.path());
    cfg.flush_on_write = false;

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..30u32 {
            tree.put(
                format!("batch{i:02}").as_bytes(),
                format!("v{i}").as_bytes(),
            )
            .unwrap();
        }
        tree.checkpoint().unwrap();
        // After checkpoint, WAL is truncated and the blob image
        // is durable. Subsequent drops without further mutation
        // are safe.
    }

    let tree = Tree::open(cfg).unwrap();
    for i in 0..30u32 {
        let v = tree
            .get(format!("batch{i:02}").as_bytes())
            .unwrap()
            .expect("batch key survives via blob image");
        assert_eq!(v, format!("v{i}").into_bytes());
    }
}

#[test]
fn next_seq_resumes_past_replayed_records() {
    let dir = tempdir().unwrap();
    let cfg = durable_cfg(dir.path());

    // Round 1: write 5 keys; each consumes one seq.
    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..5u32 {
            tree.put(format!("k{i}").as_bytes(), b"v").unwrap();
        }
    }

    // Round 2: reopen. The replayed records carried seq 1..=5.
    // The first new `put` must use seq >= 6 — otherwise a leaf
    // built later could overwrite one rebuilt by replay.
    {
        let tree = Tree::open(cfg).unwrap();
        // The exact seq isn't exposed, but the round-trip
        // semantics imply: after a put, the value is readable.
        tree.put(b"after-replay", b"v").unwrap();
        assert_eq!(
            tree.get(b"after-replay").unwrap().as_deref(),
            Some(&b"v"[..])
        );
        // And every earlier key still readable too.
        for i in 0..5u32 {
            assert_eq!(
                tree.get(format!("k{i}").as_bytes()).unwrap().as_deref(),
                Some(&b"v"[..]),
            );
        }
    }
}

#[test]
fn open_with_backend_attaches_no_wal() {
    use holt::{Backend, MemoryBackend, TreeBuilder};
    use std::sync::Arc;

    let dir = tempdir().unwrap();
    let backend: Arc<dyn Backend> = Arc::new(MemoryBackend::new());

    // open_with_backend deliberately bypasses WAL — `dir` here is
    // informational; the backend stores in memory.
    {
        let tree = TreeBuilder::new(dir.path())
            .open_with_backend(backend.clone())
            .unwrap();
        tree.put(b"k", b"v").unwrap();
    }

    // No WAL file should have been created.
    assert!(!wal_path(dir.path()).exists());
}

#[test]
fn many_round_trips_through_checkpoint_boundaries() {
    let dir = tempdir().unwrap();
    // The final batch isn't followed by a checkpoint — it relies
    // on per-op WAL fsync to survive the drop + reopen.
    let cfg = durable_cfg(dir.path());

    // Three rounds, each with a checkpoint mid-stream. Verifies
    // that records past a checkpoint are also durable (the WAL
    // truncate doesn't lose anything we already flushed through
    // the blob).
    {
        let tree = Tree::open(cfg.clone()).unwrap();
        for i in 0..20u32 {
            tree.put(format!("a{i:02}").as_bytes(), b"A").unwrap();
        }
        tree.checkpoint().unwrap();
        for i in 0..20u32 {
            tree.put(format!("b{i:02}").as_bytes(), b"B").unwrap();
        }
        tree.checkpoint().unwrap();
        for i in 0..20u32 {
            tree.put(format!("c{i:02}").as_bytes(), b"C").unwrap();
        }
        // No checkpoint after c-batch — relies on WAL replay.
    }

    let tree = Tree::open(cfg).unwrap();
    for i in 0..20u32 {
        assert_eq!(
            tree.get(format!("a{i:02}").as_bytes()).unwrap().as_deref(),
            Some(&b"A"[..]),
        );
        assert_eq!(
            tree.get(format!("b{i:02}").as_bytes()).unwrap().as_deref(),
            Some(&b"B"[..]),
        );
        assert_eq!(
            tree.get(format!("c{i:02}").as_bytes()).unwrap().as_deref(),
            Some(&b"C"[..]),
        );
    }
}

#[test]
fn batch_persists_through_crash_and_replay() {
    // Tree::txn emits one Batch WAL record; on reopen the replay
    // unpacks it transparently into per-inner callbacks so every
    // op in the batch comes back. `wal_sync_on_commit = true`
    // makes the simulated crash drop right after the batch flush.
    let dir = tempdir().unwrap();
    let cfg = durable_cfg(dir.path());

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        // Seed something to mutate inside the batch.
        tree.put(b"seed", b"S").unwrap();

        tree.txn(|b| {
            b.put(b"batch-a", b"A");
            b.put(b"batch-b", b"B");
            b.delete(b"seed");
            b.rename(b"batch-a", b"batch-aa", false);
        })
        .unwrap();
    } // dropped without checkpoint — disk has only the WAL.

    // Reopen — replay should reconstruct the post-batch state.
    let tree = Tree::open(cfg).unwrap();
    assert!(tree.get(b"seed").unwrap().is_none());
    assert!(tree.get(b"batch-a").unwrap().is_none());
    assert_eq!(tree.get(b"batch-aa").unwrap().as_deref(), Some(&b"A"[..]));
    assert_eq!(tree.get(b"batch-b").unwrap().as_deref(), Some(&b"B"[..]));
}

#[test]
fn batch_crash_before_flush_loses_whole_batch() {
    // Default mode (`wal_sync_on_commit = false`): if we drop
    // without checkpoint, the OS may not have flushed the batch
    // record yet, so the whole batch is rolled back on reopen.
    // We exercise the contract by skipping checkpoint and
    // checking that the unflushed batch isn't visible after
    // reopen.
    let dir = tempdir().unwrap();
    let cfg = TreeConfig::new(dir.path()); // default: wal_sync_on_commit = false

    {
        let tree = Tree::open(cfg.clone()).unwrap();
        tree.put(b"durable", b"D").unwrap();
        tree.checkpoint().unwrap();

        // Batch goes through the BM cache but the WAL flush is
        // deferred; without a checkpoint, the on-disk WAL stays
        // empty for these ops.
        tree.txn(|b| {
            b.put(b"vanish-a", b"VA");
            b.put(b"vanish-b", b"VB");
        })
        .unwrap();
        // Note: we do NOT call tree.checkpoint() — the batch
        // record sits in the WAL's in-memory buffer and dies
        // with the process.
    }

    let tree = Tree::open(cfg).unwrap();
    assert_eq!(tree.get(b"durable").unwrap().as_deref(), Some(&b"D"[..]));
    assert!(tree.get(b"vanish-a").unwrap().is_none());
    assert!(tree.get(b"vanish-b").unwrap().is_none());
}