obj-core 1.1.2

Storage engine internals for the obj embedded document database (pager, WAL, B-tree, codec, catalog).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
//! Pager fault-injection integration tests (issue #20).
//!
//! Each test drives `Pager<FaultyFileHandle>` through a hand-crafted
//! crash scenario and verifies the recovery contract from #18 holds:
//! the pager re-opens cleanly, committed pages are present, and
//! uncommitted pages either appear with their committed bytes or do
//! not appear at all.
//!
//! These complement the seed-range cycle test in
//! `tests/crash_cycles.rs`. The cycle test gives coverage; these
//! tests pin specific R1 paths the salt-rotation logic (#16)
//! introduced so a regression is caught at the right layer.

#![cfg(test)]

use std::collections::HashMap;
use std::panic::AssertUnwindSafe;

use tempfile::TempDir;

use crate::pager::page::{Page, PageId};
use crate::pager::{wal_path_for, Config, Pager};
use crate::platform::fault::{FaultPlan, FaultyFileHandle, FAULT_CRASH_MARKER};
use crate::platform::FileHandle;

/// Open a [`Pager`] whose main file and WAL are both wrapped in
/// [`FaultyFileHandle`]s driven by the supplied fault plans.
///
/// The WAL path is derived from `main_path` with `wal_path_for`. Both
/// files are opened (or created) before the pager is constructed.
///
/// The returned pager has already had `begin_txn` called on it, so the
/// tests can call `alloc_page` / `write_page` straight away.
///
/// # Errors
///
/// Propagates any error from opening either file or from
/// `Pager::open_with_backends`.
fn open_faulty(
    main_path: &std::path::Path,
    main_plan: FaultPlan,
    wal_plan: FaultPlan,
    config: Config,
) -> crate::Result<Pager<FaultyFileHandle>> {
    let main_backend = FileHandle::open_or_create(main_path)
        .map(|handle| FaultyFileHandle::new(handle, main_plan))?;
    let wal_path = wal_path_for(main_path);
    let wal_backend = FileHandle::open_or_create(&wal_path)
        .map(|handle| FaultyFileHandle::new(handle, wal_plan))?;
    let mut pager =
        Pager::<FaultyFileHandle>::open_with_backends(main_backend, wal_backend, wal_path, config)?;
    // #64: `alloc_page` / `free_page` debug-assert `in_txn`. Entering a
    // Pager txn here spares every fault test below from having to call
    // `begin_txn` itself.
    pager.begin_txn();
    Ok(pager)
}

/// Write a recognisable page to `id` and commit it.
///
/// The page is all zeroes except byte 0, which holds `marker`, and byte
/// 1024, which holds `marker.wrapping_mul(3)`. Tests check both offsets
/// after recovery.
///
/// # Errors
///
/// Propagates any error from `write_page` or `commit`.
fn write_committed(p: &mut Pager<FaultyFileHandle>, id: PageId, marker: u8) -> crate::Result<()> {
    let mut stamped = Page::zeroed();
    {
        let bytes = stamped.as_bytes_mut();
        bytes[0] = marker;
        bytes[1024] = marker.wrapping_mul(3);
    }
    p.write_page(id, &stamped)?;
    // The value `commit` returns is not needed by these tests.
    let _ = p.commit()?;
    Ok(())
}

/// Build a [`PageId`] from a raw page number.
///
/// # Panics
///
/// Panics with the message `non-zero` if `PageId::new` rejects `n`
/// (presumably when `n == 0`; confirm against `PageId::new`).
fn id(n: u64) -> PageId {
    let page_id = PageId::new(n);
    page_id.expect("non-zero")
}

/// Report whether a caught panic payload is the harness's deliberate
/// crash, i.e. its message contains [`FAULT_CRASH_MARKER`].
///
/// Panic payloads are usually a `String` (formatted `panic!`) or a
/// `&'static str` (literal `panic!`). Any other payload type is treated
/// as an empty message, so it only matches if the marker is itself
/// empty.
///
/// The parameter stays `&Box<dyn Any + Send>` (what `catch_unwind`
/// yields) so the `downcast_ref` calls resolve against the boxed payload
/// rather than the `Box` itself.
fn panic_carries_marker(payload: &Box<dyn std::any::Any + Send>) -> bool {
    let message: &str = if let Some(owned) = payload.downcast_ref::<String>() {
        owned.as_str()
    } else if let Some(literal) = payload.downcast_ref::<&'static str>() {
        *literal
    } else {
        ""
    };
    message.contains(FAULT_CRASH_MARKER)
}

/// Recovery when a crash separates the WAL salt rotation from the
/// main-header salt stamp.
///
/// After a successful commit + checkpoint the pager performs, in order:
///
/// 1. flush the view to the main file,
/// 2. `sync_data` on the main file,
/// 3. rotate the WAL salt and `sync_data` the WAL,
/// 4. stamp the new salt into the main header,
/// 5. `sync_data` on the main file again.
///
/// A crash after step 3 but before step 5 leaves the NEW salt in the WAL
/// and the OLD salt in the main header. Recovery reads the old salt,
/// sees the mismatch, and treats the WAL as empty. Nothing is lost
/// because step 2 already made the main file authoritative.
///
/// The state is hand-crafted. The data is written through
/// `Pager<FaultyFileHandle>`, so the same production write path as the
/// cycle harness is exercised. The pager is checkpointed cleanly, then
/// the WAL header salt is overwritten on disk before a normal pager
/// reopens the file.
#[test]
fn crash_between_salt_write_and_main_fsync_recovers() {
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write as _};

    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("crash_salt.obj");

    // Commit one marked page and checkpoint it through fault-capable
    // backends that are configured to inject nothing.
    let mut writer = open_faulty(
        &path,
        FaultPlan::noop(101),
        FaultPlan::noop(202),
        Config::default(),
    )
    .expect("open faulty");
    let page_a = writer.alloc_page().expect("alloc");
    write_committed(&mut writer, page_a, 0xAA).expect("write+commit");
    writer.checkpoint().expect("checkpoint");
    drop(writer);
    let a_id = page_a.get();

    // Fake the crash window between step 3 and step 5: give the WAL
    // header a salt the main header cannot match.
    let foreign_salt = 0xDEAD_BEEFu32.to_le_bytes();
    {
        let mut wal_file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(wal_path_for(&path))
            .expect("open wal");
        // The WAL salt lives at bytes 12..16 of the header.
        wal_file.seek(SeekFrom::Start(12)).expect("seek");
        wal_file.write_all(&foreign_salt).expect("write");
        wal_file.sync_all().expect("sync");
    }

    // Reopening must take the "salt mismatch → empty WAL" route and
    // still serve the checkpointed bytes from the main file.
    let mut reopened = Pager::open(&path, Config::default()).expect("reopen");
    let recovered = reopened.read_page(id(a_id)).expect("read");
    let bytes = recovered.as_bytes();
    assert_eq!(bytes[0], 0xAA);
    assert_eq!(bytes[1024], 0xAAu8.wrapping_mul(3));
}

/// A torn WAL frame written mid-commit must never be replayed.
///
/// The fault harness lets only a prefix of the page body reach the WAL
/// before the fsync. After reopening, the torn frame must be invisible
/// and the earlier commits untouched.
///
/// Before #21 the WAL recovery walk truncated at the first bad CRC and
/// so silently dropped the rest of the WAL. Since #21 truncation only
/// happens past the LAST commit marker, so torn-tail bytes after a
/// clean commit are still tolerated.
#[test]
fn torn_write_on_wal_frame_mid_commit_recovers() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("torn_wal.obj");

    // Phase 1: with fault-free plans, commit two pages so the WAL holds
    // known-good content.
    let mut setup = open_faulty(
        &path,
        FaultPlan::noop(11),
        FaultPlan::noop(22),
        Config::default(),
    )
    .expect("open faulty");
    let page_a = setup.alloc_page().expect("alloc a");
    let page_b = setup.alloc_page().expect("alloc b");
    write_committed(&mut setup, page_a, 0xAA).expect("commit a");
    write_committed(&mut setup, page_b, 0xBB).expect("commit b");
    drop(setup);
    let (a_id, b_id) = (page_a.get(), page_b.get());

    // Phase 2: reopen with a WAL plan whose torn-write probability is
    // 1.0, so the very next WAL write is torn, then attempt a third
    // commit that overwrites page a.
    {
        let mut torn = open_faulty(
            &path,
            FaultPlan::noop(33),
            FaultPlan::new(44, 1.0, 0.0, 0.0, 0.0, 0),
            Config::default(),
        )
        .expect("open faulty");
        let mut overwrite = Page::zeroed();
        overwrite.as_bytes_mut()[0] = 0xCC;
        // Both calls may return Ok or Err. Either is acceptable because
        // the invariant is enforced by the reopen below.
        let _ = torn.write_page(id(a_id), &overwrite);
        let _ = torn.commit();
        drop(torn);
    }

    // Phase 3: a normal pager must see the first two commits and must
    // not see the torn third one.
    let mut reopened = Pager::open(&path, Config::default()).expect("reopen");
    let recovered_a = reopened.read_page(id(a_id)).expect("read a");
    assert_eq!(
        recovered_a.as_bytes()[0],
        0xAA,
        "torn frame must not overwrite committed bytes",
    );
    let recovered_b = reopened.read_page(id(b_id)).expect("read b");
    assert_eq!(recovered_b.as_bytes()[0], 0xBB);
}

/// A dropped fsync on the main file after checkpoint must still leave a
/// recoverable database. The salt rotation (#16) was designed for
/// exactly this case.
///
/// Scenario: pages a and b are committed and the pager checkpoints, but
/// every main-file `sync_data` is silently dropped. The data lingers in
/// the kernel page cache and would be lost on power loss. The
/// checkpoint's writes, salt rotation included, still land byte-wise.
/// On reopen, recovery must either (a) re-apply the pages via a WAL
/// salt match or (b) accept the bytes already on disk.
///
/// # Coverage limitation (issue #53)
///
/// This test covers the *control flow* of the dropped-fsync path: no
/// real `sync_data` is issued, and recovery must still reopen cleanly.
/// It does NOT cover the *durability* consequence of a power loss. The
/// limitation is structural. Bytes written before the dropped fsync have
/// already reached the kernel page cache, and an in-process harness
/// cannot evict them, so the reopen below simply reads them back from
/// the same live kernel. The "unsynced bytes vanish on power loss" half
/// of the scenario is therefore unreachable here (see
/// [`crate::platform::fault::FaultPlan::dropped_fsync_prob`]).
/// Power-loss durability at the coarser commit-boundary granularity is
/// covered by the crash-cycle process-kill model in
/// `obj-core/tests/crash_cycles.rs`. That model treats an injected panic
/// as a crash between two consistent commit points and asserts the
/// reopen invariant across 10 000 randomized seeds.
#[test]
fn dropped_fsync_on_checkpointed_main_file_recovers_via_wal_salt_match() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("dropped_fsync.obj");

    // Phase 1: the MAIN handle drops fsync with probability 1.0. Writes
    // succeed, but no `sync_data` is really issued on the main file.
    let mut lossy = open_faulty(
        &path,
        FaultPlan::new(55, 0.0, 1.0, 0.0, 0.0, 0),
        FaultPlan::noop(66),
        Config::default(),
    )
    .expect("open faulty");
    let page_a = lossy.alloc_page().expect("alloc a");
    let page_b = lossy.alloc_page().expect("alloc b");
    write_committed(&mut lossy, page_a, 0xAA).expect("commit a");
    write_committed(&mut lossy, page_b, 0xBB).expect("commit b");
    // The explicit checkpoint is where the dropped fsync matters: the
    // salt rotation completes byte-wise although nothing was synced.
    lossy.checkpoint().expect("checkpoint");
    drop(lossy);
    let (a_id, b_id) = (page_a.get(), page_b.get());

    // Phase 2: reopen with a normal pager. There are two possible
    // routes. The bytes may still be readable because the dropped fsync
    // left them in the kernel page cache, which this harness cannot
    // evict (see issue #53). That is the expected route in this
    // in-process test. Otherwise recovery re-applies them from the WAL.
    // Both routes must produce the same observable result.
    let mut reopened = Pager::open(&path, Config::default()).expect("reopen");
    let recovered_a = reopened.read_page(id(a_id)).expect("read a");
    assert_eq!(recovered_a.as_bytes()[0], 0xAA);
    let recovered_b = reopened.read_page(id(b_id)).expect("read b");
    assert_eq!(recovered_b.as_bytes()[0], 0xBB);
}

/// Cycle invariant: a deliberate crash raised by the fault harness
/// inside its `crash_after_ops` window, while the pager is running on a
/// faulty backend, is trapped by `catch_unwind`.
///
/// This is the precondition for the cycle-test variant in
/// `tests/crash_cycles.rs` to operate.
#[test]
fn deliberate_crash_in_pager_is_caught_by_catch_unwind() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("crash.obj");

    // The main-file plan's last argument (2) is its crash-after-ops
    // count. The WAL plan injects nothing.
    let crashing_workload = || {
        let mut p = open_faulty(
            &path,
            FaultPlan::new(0, 0.0, 0.0, 0.0, 0.0, 2),
            FaultPlan::noop(1),
            Config::default(),
        )
        .expect("open faulty");
        let _ = p.alloc_page();
        let _ = p.commit();
    };
    let outcome = std::panic::catch_unwind(AssertUnwindSafe(crashing_workload));

    // A clean return is tolerated. The crash may not fire if the op
    // count is mis-tuned, and the invariant pinned here is only
    // panic-detection. The cycle test exercises the wiring directly, so
    // a miss here does no harm.
    if let Err(payload) = outcome {
        assert!(
            panic_carries_marker(&payload),
            "panic must carry the deliberate-crash marker",
        );
    }

    // The on-disk state must be recoverable after the trap: a normal
    // pager has to reopen it.
    let reopened = Pager::open(&path, Config::default()).expect("reopen after crash");
    // The DB exists; whether the allocation completed is unspecified.
    let _ = reopened.page_count();
}

/// Test (#52 / #91): a committed fresh allocation is durable through the
/// WAL alone.
///
/// The fresh page's body and the advanced `page_count` ride the SAME WAL
/// group-commit. Recovery replays that frame, restoring `page_count =
/// N+1` and the page body into the committed view. `read_page(N)` then
/// succeeds even though the main file was never extended at alloc time.
///
/// #91 replaced the earlier design, in which `alloc_fresh` wrote the
/// extension and a blank body DIRECTLY to the main file and a pre-commit
/// barrier fsync'd them ahead of the WAL `page_count`. Now the ONLY
/// durable record of a committed-but-not-checkpointed fresh page is its
/// WAL frame, and the main file grows lazily at the next checkpoint.
/// That is strictly stronger. One atomically-ordered WAL group-commit
/// replaces two ordered fsyncs, and there is no window in which the
/// header references a page the file is too short to hold (the past-EOF
/// #52 hazard is removed at the root).
#[test]
fn committed_alloc_page_recovers_via_wal_before_checkpoint() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("alloc_durable.obj");

    // Phase 1: allocate a page, give it known content and commit, so
    // the body and page_count share one WAL group-commit. Deliberately
    // no checkpoint.
    let mut writer = open_faulty(
        &path,
        FaultPlan::noop(11),
        FaultPlan::noop(22),
        Config::default(),
    )
    .expect("open faulty");
    let fresh = writer.alloc_page().expect("alloc a");
    write_committed(&mut writer, fresh, 0xCD).expect("write+commit");
    drop(writer);
    let a_id = fresh.get();

    // Phase 2: under #91 the main file is not extended at alloc, so it
    // still holds only the header page plus whatever the open-time
    // fresh-WAL initialisation wrote. The WAL must therefore exist: it
    // carries the only durable copy of the committed alloc.
    assert!(
        wal_path_for(&path).exists(),
        "the committed alloc's durability lives in the WAL before checkpoint",
    );

    // Phase 3: reopen with the WAL present (the normal post-crash path).
    // Replaying the committed frame must advance page_count and restore
    // the body. This shows the WAL group-commit was the durability
    // boundary, with no main-file barrier involved.
    let mut reopened = Pager::open(&path, Config::default()).expect("reopen");
    let recovered = reopened
        .read_page(id(a_id))
        .expect("read recovered alloc");
    assert_eq!(
        recovered.as_bytes()[0],
        0xCD,
        "committed alloc body recovered"
    );
    assert_eq!(recovered.as_bytes()[1024], 0xCDu8.wrapping_mul(3));

    // The recovered header must be self-consistent: every page it
    // claims, `[1, page_count)`, has to be readable.
    let pc = reopened.page_count();
    for pid in 1..pc {
        reopened
            .read_page(id(pid))
            .expect("header-claimed page readable");
    }
}

/// Test (#91): when the SINGLE WAL group-commit fsync of a growing
/// commit is lost, the database rolls back atomically to the last
/// durable state.
///
/// #91 removed the per-commit main-file extension barrier. Fresh pages
/// ride the WAL, so a growing commit issues exactly ONE `F_FULLFSYNC`
/// (the WAL group-commit) and leaves no un-WAL'd main-file extension
/// behind. The failure mode is therefore clean: if the WAL commit fsync
/// never reaches the platter and the un-committed WAL tail is lost on
/// power loss, the alloc simply did not happen. The main file is not
/// over-long, because alloc never touched it. Recovery returns to the
/// durable baseline with no page the file is too short to hold (the
/// past-EOF #52 hazard is structurally impossible now).
///
/// This is the after-commit-before-checkpoint crash point with the WAL
/// fsync dropped. The test asserts that nothing lies past the durable
/// `page_count` and that the recovered header is self-consistent.
#[test]
fn growing_commit_with_dropped_wal_fsync_rolls_back_clean() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("wal_fsync_window.obj");

    // Phase 1: durable baseline. One page is allocated, written and
    // committed on a fault-free pager, then checkpointed and dropped, so
    // the main file is authoritative and the WAL is empty for Phase 2.
    let mut baseline = open_faulty(
        &path,
        FaultPlan::noop(101),
        FaultPlan::noop(202),
        Config::default(),
    )
    .expect("open faulty");
    let base_page = baseline.alloc_page().expect("alloc baseline");
    write_committed(&mut baseline, base_page, 0xAB).expect("commit baseline");
    baseline.checkpoint().expect("checkpoint baseline");
    drop(baseline);
    let base_id = base_page.get();

    // Under #91 a growing commit never extends the main file, so the
    // length recorded here must not change.
    let baseline_len = std::fs::metadata(&path).expect("meta").len();

    // Phase 2: reopen with the WAL's `sync_data` rigged to be dropped,
    // allocate a fresh page and commit. The single WAL group-commit
    // fsync is silently lost, so its commit marker never reaches the
    // platter.
    {
        let mut lossy = open_faulty(
            &path,
            FaultPlan::noop(303),
            FaultPlan::new(404, 0.0, 1.0, 0.0, 0.0, 0),
            Config::default(),
        )
        .expect("reopen faulty");
        let _fresh = lossy.alloc_page().expect("alloc fresh");
        let _ = lossy
            .commit()
            .expect("commit returns Ok with dropped WAL fsync");
        drop(lossy);
    }

    // Guardrail: no past-EOF write and no over-long file. `alloc_fresh`
    // must not have extended the main file.
    let after_len = std::fs::metadata(&path).expect("meta").len();
    assert_eq!(
        after_len, baseline_len,
        "#91: a growing commit must NOT extend the main file before checkpoint",
    );

    // Phase 3: model the power loss discarding the un-fsync'd WAL tail.
    crate::wal::remove_wal(&wal_path_for(&path)).expect("remove wal");

    // Phase 4: reopen. Recovery must land on the durable baseline, with
    // the baseline page intact and every header-claimed page readable.
    let mut reopened = Pager::open(&path, Config::default()).expect("reopen after crash");
    let base = reopened
        .read_page(id(base_id))
        .expect("baseline page readable after crash");
    assert_eq!(base.as_bytes()[0], 0xAB, "baseline page content survived");
    let pc = reopened.page_count();
    for pid in 1..pc {
        reopened
            .read_page(id(pid))
            .expect("header-claimed page must be readable");
    }
}

/// #91 crash-matrix sweep.
///
/// A growing transaction's write path issues a deterministic sequence
/// of syscalls:
///
/// 1. per-page WAL `write_all_at`s,
/// 2. one WAL group-commit `sync_data`,
/// 3. at checkpoint, the main-file `set_len` grow,
/// 4. the per-page main `write_all_at`s,
/// 5. the main `sync_data`,
/// 6. the WAL salt-rotation `write` / `sync_data`,
/// 7. the main header `write` / `sync_data`.
///
/// Crashing at EVERY op index in a bounded range sweeps all the named
/// crash points, because each is some op in that sequence:
/// mid-WAL-append, after-commit-before-checkpoint,
/// during-checkpoint-grow (the op right after the un-counted `set_len`),
/// after-grow-before-body-write, and during-salt-rotation.
///
/// After each injected crash the file is reopened with a CLEAN pager and
/// the recovery contract is asserted via [`assert_recovers_clean`]. Open
/// must succeed or surface `WalCorruption`, the legitimate
/// refuse-to-guess outcome. Every page the recovered header claims must
/// read back without `UnexpectedEof` / `Corruption`, the pager-level
/// analogue of `integrity_check`. No page beyond the durable
/// `page_count` may leak.
#[test]
fn growing_txn_crash_matrix_recovers_at_every_op() {
    // 40 ops comfortably spans the whole write/checkpoint/salt-rotation
    // sequence of a 6-page growing transaction (Rule 2: bounded).
    for crash_at in 1u64..=40 {
        // A fresh directory per iteration keeps the crash points isolated.
        let dir = TempDir::new().expect("tempdir");
        let path = dir.path().join("crash_matrix.obj");
        // Phase 1: a clean baseline so recovery has prior durable state.
        {
            let mut p = open_faulty(
                &path,
                FaultPlan::noop(1),
                FaultPlan::noop(2),
                Config::default(),
            )
            .expect("open baseline");
            let a = p.alloc_page().expect("alloc baseline");
            write_committed(&mut p, a, 0x10).expect("commit baseline");
            p.checkpoint().expect("checkpoint baseline");
            drop(p);
        }
        // Phase 2: reopen with BOTH backends rigged to crash at the
        // `crash_at`-th syscall, then drive a growing txn + checkpoint.
        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
            let mut p = open_faulty(
                &path,
                FaultPlan::new(100 + crash_at, 0.0, 0.0, 0.0, 0.0, crash_at),
                FaultPlan::new(200 + crash_at, 0.0, 0.0, 0.0, 0.0, crash_at),
                Config::default(),
            )
            .expect("reopen faulty");
            // The allocated ids are not needed afterwards, because
            // Phase 3 walks `[1, page_count)` from the recovered header.
            for k in 0..6u8 {
                let pid = p.alloc_page()?;
                let mut page = Page::zeroed();
                page.as_bytes_mut()[0] = 0x40 + k;
                p.write_page(pid, &page)?;
            }
            let _ = p.commit()?;
            p.checkpoint()?;
            crate::Result::Ok(())
        }));
        // The panic must be the deliberate crash marker (if it fired);
        // any other panic is a real bug. A non-panic `Err` returned by
        // the closure is not inspected here. Only the reopen contract
        // below is asserted.
        if let Err(payload) = &result {
            assert!(
                panic_carries_marker(payload),
                "crash_at={crash_at}: unexpected panic payload",
            );
        }
        // Phase 3: reopen CLEAN. Recovery must succeed (or refuse with
        // WalCorruption) and every header-claimed page must read cleanly.
        assert_recovers_clean(&path, crash_at);
    }
}

/// Reopen `path` with a normal pager and assert the #91 recovery
/// contract: every page in `[1, page_count)` is readable without
/// `UnexpectedEof` / `Corruption` (the pager-level integrity check).
/// `WalCorruption` at open is the legitimate refuse-to-guess outcome and
/// is accepted.
fn assert_recovers_clean(path: &std::path::Path, crash_at: u64) {
    let mut p = match Pager::open(path, Config::default()) {
        Ok(p) => p,
        Err(crate::Error::WalCorruption { .. }) => return,
        Err(e) => panic!("crash_at={crash_at}: recovery open failed: {e:?}"),
    };
    let pc = p.page_count();
    let mut pid = 1u64;
    while pid < pc {
        let id = PageId::new(pid).expect("non-zero");
        p.read_page(id).unwrap_or_else(|e| {
            panic!("crash_at={crash_at}: header-claimed page {pid} unreadable: {e:?}")
        });
        pid += 1;
    }
}

/// Allocate `n` fresh pages on `p`. For the `k`-th page (0-based), write
/// `base + k` into byte 0 and immediately read the page back.
///
/// Intended for a one-frame-cache pager. There, every read-through
/// forces an eviction of the previously-resident frame. That exercises
/// guardrail 1's "no dirty eviction of a fresh page to a short main
/// file" path.
///
/// # Panics
///
/// Panics if any alloc, write or read fails. `base + k` is plain `u8`
/// addition, so callers must keep `base + (n - 1)` within `u8` range.
fn alloc_write_read_churn(p: &mut Pager<FaultyFileHandle>, n: u8, base: u8) {
    for k in 0..n {
        let pid = p.alloc_page().expect("alloc");
        let mut page = Page::zeroed();
        page.as_bytes_mut()[0] = base + k;
        p.write_page(pid, &page).expect("write");
        let _ = p.read_page(pid).expect("read back");
    }
}

/// #91 guardrail 1: a forced dirty eviction during a growing transaction
/// must NOT write a fresh page past the (un-extended) main-file EOF.
///
/// The pager is opened with a one-frame cache so EVERY read-through
/// eviction is forced. Many pages are then allocated and written, far
/// more than one cache frame. The "crash" is simulated by dropping the
/// pager, once pre-commit (case A) and once pre-checkpoint (case B).
/// After reopening there must be NO garbage at offsets >= the durable
/// `page_count`. Either the file is short (alloc never touched it) and
/// recovery heals it, or the recovered header is self-consistent.
///
/// Before the fix, `alloc_fresh` seeded a DIRTY cache frame and ran
/// `handle_eviction`, which `write_back_page`d the fresh body straight
/// to a main file that #91 no longer extends. That was a past-EOF write.
#[test]
fn forced_dirty_eviction_during_growing_txn_never_writes_past_eof() {
    let dir = TempDir::new().expect("tempdir");
    // cache_frames = 1 forces a cache eviction on every read-through.
    // The MAX checkpoint threshold keeps commits from auto-checkpointing.
    let cfg = Config::default()
        .with_cache_frames(1)
        .expect("cache_frames=1")
        .with_checkpoint_threshold(u64::MAX);

    // Case A: crash PRE-COMMIT (drop with everything pending, no commit).
    let path_a = dir.path().join("forced_evict_a.obj");
    {
        let mut p = open_faulty(&path_a, FaultPlan::noop(1), FaultPlan::noop(2), cfg.clone())
            .expect("open faulty");
        alloc_write_read_churn(&mut p, 16, 0x50);
        drop(p);
    }
    // Reopen: pre-commit writes were never durable; the main file is
    // self-consistent (nothing past the durable page_count).
    {
        let mut p = Pager::open(&path_a, Config::default()).expect("reopen pre-commit");
        let pc = p.page_count();
        for pid in 1..pc {
            p.read_page(id(pid))
                .expect("page readable after pre-commit crash");
        }
    }

    // Case B: crash PRE-CHECKPOINT (commit, then drop before checkpoint).
    let path_b = dir.path().join("forced_evict_b.obj");
    let committed_pc = {
        let mut p =
            open_faulty(&path_b, FaultPlan::noop(3), FaultPlan::noop(4), cfg).expect("open faulty");
        alloc_write_read_churn(&mut p, 16, 0x60);
        let _ = p.commit().expect("commit");
        let pc = p.page_count();
        drop(p);
        pc
    };
    // Reopen: recovery replays the WAL; committed pages are all readable
    // and page_count matches. No fresh page was ever written past EOF.
    let mut p = Pager::open(&path_b, Config::default()).expect("reopen pre-checkpoint");
    assert_eq!(
        p.page_count(),
        committed_pc,
        "committed page_count recovered"
    );
    for pid in 1..committed_pc {
        let read = p
            .read_page(id(pid))
            .expect("committed page readable after pre-checkpoint crash");
        // The churn helper wrote `0x60 + k` into its k-th allocation.
        // This expectation relies on those allocations having received
        // page ids 1, 2, … in order.
        let expected = 0x60u8 + u8::try_from(pid - 1).expect("fits");
        assert_eq!(read.as_bytes()[0], expected, "page {pid} body recovered");
    }
}

// --- #91 fsync-count proof --------------------------------------------

/// A `FileBackend` wrapper around a real `FileHandle` that tallies every
/// `sync_data` call (the `F_FULLFSYNC` on macOS under `SyncMode::Full`).
///
/// The tally lives in an `Arc<AtomicU64>`, so a test can keep its own
/// clone and read the count while the pager owns the backend.
struct CountingHandle {
    /// The real file every operation is forwarded to.
    inner: FileHandle,
    /// Shared counter, bumped once per `sync_data` call.
    fsyncs: std::sync::Arc<std::sync::atomic::AtomicU64>,
}

impl crate::FileBackend for CountingHandle {
    /// Forwarded unchanged to the wrapped handle.
    fn len(&self) -> crate::Result<u64> {
        self.inner.len()
    }

    /// Forwarded unchanged to the wrapped handle.
    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> crate::Result<()> {
        self.inner.read_exact_at(buf, offset)
    }

    /// Forwarded unchanged to the wrapped handle.
    fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<()> {
        self.inner.write_all_at(buf, offset)
    }

    /// Forwarded unchanged to the wrapped handle.
    fn set_len(&self, new_len: u64) -> crate::Result<()> {
        self.inner.set_len(new_len)
    }

    /// Count the call, then forward it to the wrapped handle. The
    /// counter is bumped before the sync runs, so attempts that end in
    /// an error are counted too.
    fn sync_data(&self, mode: crate::platform::SyncMode) -> crate::Result<()> {
        use std::sync::atomic::Ordering;

        let _previous = self.fsyncs.fetch_add(1, Ordering::SeqCst);
        self.inner.sync_data(mode)
    }

    /// Forwarded unchanged to the wrapped handle; NOT counted.
    fn sync_all(&self) -> crate::Result<()> {
        self.inner.sync_all()
    }
}

/// #91 acceptance: a growing commit issues EXACTLY ONE `F_FULLFSYNC`,
/// the WAL group-commit.
///
/// The pre-#91 design issued TWO: the per-commit main-file extension
/// barrier plus the WAL group-commit. The test counts `sync_data` calls
/// on BOTH backends across a growing commit that is NOT
/// auto-checkpointed. The WAL backend must see one sync. The main
/// backend must see ZERO, because fresh pages no longer extend or touch
/// the main file before checkpoint.
#[test]
fn growing_commit_issues_exactly_one_fsync() {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("fsync_count.obj");
    let main_fsyncs = Arc::new(AtomicU64::new(0));
    let wal_fsyncs = Arc::new(AtomicU64::new(0));

    // A MAX checkpoint threshold stops commit from auto-checkpointing,
    // so only the commit's own fsync cost is measured.
    let cfg = Config::default().with_checkpoint_threshold(u64::MAX);

    let main_backend = CountingHandle {
        inner: FileHandle::open_or_create(&path).expect("main"),
        fsyncs: Arc::clone(&main_fsyncs),
    };
    let wal_path = wal_path_for(&path);
    let wal_backend = CountingHandle {
        inner: FileHandle::open_or_create(&wal_path).expect("wal"),
        fsyncs: Arc::clone(&wal_fsyncs),
    };
    let mut pager =
        Pager::<CountingHandle>::open_with_backends(main_backend, wal_backend, wal_path, cfg)
            .expect("open");
    pager.begin_txn();

    // Discard whatever syncs the open-time fresh-WAL initialisation
    // issued. Counting starts here.
    for counter in [&main_fsyncs, &wal_fsyncs] {
        counter.store(0, Ordering::SeqCst);
    }

    // The growing transaction: several fresh pages allocated and
    // written, then ONE commit (the `batch_insert_64`-shaped workload).
    for k in 0..8u8 {
        let fresh = pager.alloc_page().expect("alloc");
        let mut body = Page::zeroed();
        body.as_bytes_mut()[0] = 0x70 + k;
        pager.write_page(fresh, &body).expect("write");
    }
    let _ = pager.commit().expect("commit");

    let wal_count = wal_fsyncs.load(Ordering::SeqCst);
    let main_count = main_fsyncs.load(Ordering::SeqCst);
    assert_eq!(
        wal_count, 1,
        "#91: a growing commit issues exactly ONE WAL group-commit fsync \
         (got {wal_count})",
    );
    assert_eq!(
        main_count, 0,
        "#91: a growing commit issues ZERO main-file fsyncs before \
         checkpoint — the pre-#91 main-file extension barrier is gone \
         (got {main_count})",
    );
    // Combined total is 1. Pre-#91 it was 2: barrier + group-commit.
    assert_eq!(
        wal_count + main_count,
        1,
        "#91: total fsyncs per growing commit = 1"
    );
}

/// Helper for the cycle-test fixtures: read every page listed in
/// `allocated` through `p` and return a map from page id to a copy of
/// that page's bytes.
///
/// If `allocated` lists an id more than once, the entry from the last
/// occurrence is the one kept.
///
/// # Errors
///
/// Stops at, and returns, the first `read_page` error.
#[allow(dead_code)] // wired into the seed-range cycle harness in #20.
pub(crate) fn snapshot_expected_pages<F: crate::FileBackend>(
    p: &mut Pager<F>,
    allocated: &[PageId],
) -> crate::Result<HashMap<PageId, Vec<u8>>> {
    allocated
        .iter()
        .map(|&pid| -> crate::Result<(PageId, Vec<u8>)> {
            let page = p.read_page(pid)?;
            Ok((pid, page.as_bytes().to_vec()))
        })
        .collect()
}