obj-db 1.1.2

Embedded document database. Stable file format, full ACID, single-file portability.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
//! M11 #95 — Hot-backup integrity under concurrent writes
//! (M11 exit gate, correctness side).
//!
//! Per the M11 exit criterion: "Backup of a live, actively-written
//! DB produces a file that itself passes integrity check."
//!
//! Workload: `std::thread::scope` with two threads —
//!
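//! 1. A **writer** that continuously commits a 60% insert / 25%
//!    update / 15% delete mix (the M6 stress distribution): inserts
//!    via `Db::transaction`, updates via `Db::update`, deletes via
//!    `Db::delete`. It draws from a deterministically seeded
//!    `ChaCha8Rng`, so a failing seed can be replayed via
//!    `OBJ_BACKUP_SEED=<N>` (thread timing against the backup still
//!    varies between runs).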
//! 2. A **backup driver** that waits a brief warm-up period (so
//!    the writer commits some data), then calls
//!    `Db::backup_to(backup_path)`. After backup completes, the
//!    driver flips `stop = true` so the writer winds down.
//!
//! After the threads join, the test:
//!
//! - Opens `Db::open(backup_path)` and runs `integrity_check` —
//!   asserts `report.is_ok()`. If this ever fails for any seed,
//!   the M11 exit gate has surfaced a categorical bug.
//! - Runs `integrity_check` on the source DB at end-of-workload —
//!   also asserts `report.is_ok()`.
//! - Records doc counts from both source and backup as
//!   diagnostics. Power-of-ten Rule 5: we do NOT assert a tight
//!   doc-count bound because the snapshot LSN's view is internal
//!   to `backup_to`; the contract the test enforces is the
//!   stronger "the backup passes `integrity_check`," which implies
//!   any present doc carries a self-consistent index / primary
//!   pair regardless of the snapshot's exact pin point.
//!
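//! The workload's upper time bound is parameterised via
//! `OBJ_BACKUP_DURATION_SECS` (default 10 s for local runs; CI runs the
//! 30 s gate). A run ends earlier if the backup completes first,
//! because the backup driver then raises `stop`.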
//!
//! Heartbeat watchdog: the writer publishes its iteration count to an
//! `AtomicU64` heartbeat every 100 loop iterations. A watchdog running
//! on the test thread panics if the heartbeat fails to advance for 5 s.
//!
//! On failure the run prints `OBJ_BACKUP_SEED=<N>` to stderr and
//! writes the captured op log to
//! `target/backup_concurrent/seed-<N>.log`.
//!
//! # Power-of-ten posture
//!
//! - **Rule 2.** Every loop in this test is bounded by a deadline
//!   plus an explicit `stop` flag; the watchdog itself loops
//!   against `start.elapsed() < duration`.
//! - **Rule 4.** Per-op helpers live in their own functions.
//! - **Rule 5.** The cross-thread shared state goes through
//!   `Arc<Mutex<>>` / atomics — no `static mut`. The integrity-
//!   check assertion is the gate's type-level invariant.
//! - **Rule 7.** Errors from `backup_to` / `integrity_check`
//!   surface via `Result`; the harness's `expect` calls are
//!   confined to setup steps (`tempdir`, `Db::open`) whose
//!   failure is itself a test failure.

#![forbid(unsafe_code)]

use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use obj::{Config, Db, Document, Id};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;

/// Fallback run length, in seconds, used when `OBJ_BACKUP_DURATION_SECS`
/// is not set. The CI gate uses 30 s; release-validation soaks may run
/// longer.
const DEFAULT_DURATION_SECS: u64 = 10;
/// Number of writer loop iterations between heartbeat publications.
const HEARTBEAT_OPS_GRANULARITY: u64 = 100;
/// Seconds the heartbeat may stay unchanged before the watchdog
/// declares the writer stalled (the issue's "doesn't advance for 5 s"
/// spec).
const HEARTBEAT_STALL_SECS: u64 = 5;
/// Seed used when `OBJ_BACKUP_SEED` is not set.
const DEFAULT_SEED: u64 = 0xBACC_BACC_DEAD_BEEF;
/// How long the backup driver waits before calling `backup_to`. The
/// delay lets the writer commit data, so the backup does not copy an
/// empty pager view.
const BACKUP_WARMUP: Duration = Duration::new(0, 500_000_000);

/// Document type written by this file's tests. In the concurrent
/// writer, `id_echo` mirrors the document's own `Id`. This gives
/// `integrity_check`'s primary-vs-index cross-walk a field to validate
/// before the M7 index suite is exercised here.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct BackupDoc {
    /// The concurrent writer sets this to the document's `Id` value in
    /// the same transaction as the insert. The cross-process tests
    /// store fixed values instead.
    id_echo: u64,
    /// `1` at insert. Each update by the concurrent writer overwrites
    /// it with a random value in `2..u32::MAX`, so it is NOT monotonic.
    version: u32,
    /// Random (or fixed, in the cross-process tests) byte payload.
    payload: Vec<u8>,
}

impl Document for BackupDoc {
    const COLLECTION: &'static str = "backup_docs";
    const VERSION: u32 = 1;
}

/// Kind of mutation the writer performs on one loop iteration.
#[derive(Clone, Copy, Debug)]
enum Op {
    /// Create a new document.
    Insert,
    /// Rewrite an existing document's payload and version.
    Update,
    /// Remove an existing document.
    Delete,
}

#[ignore = "M11 exit gate: concurrent-backup integrity test; run via --ignored"]
#[test]
fn backup_concurrent() {
    /// Read `name` as a decimal `u64`. Falls back to `default` when the
    /// variable is unset or does not parse.
    fn env_u64(name: &str, default: u64) -> u64 {
        match env::var(name).map(|raw| raw.parse::<u64>()) {
            Ok(Ok(value)) => value,
            _ => default,
        }
    }

    let duration_secs = env_u64("OBJ_BACKUP_DURATION_SECS", DEFAULT_DURATION_SECS);
    let seed = env_u64("OBJ_BACKUP_SEED", DEFAULT_SEED);
    eprintln!("OBJ_BACKUP_SEED={seed} OBJ_BACKUP_DURATION_SECS={duration_secs}");

    // The temp dir must outlive `report`, which may panic after
    // inspecting the run.
    let scratch = TempDir::new().expect("tempdir");
    let (src_path, backup_path) = (
        scratch.path().join("src.obj"),
        scratch.path().join("backup.obj"),
    );
    report(
        &run_backup_workload(&src_path, &backup_path, seed, duration_secs),
        seed,
    );
}

/// Everything `report` needs from one concurrent-backup run.
struct Outcome {
    /// First failure observed (`Some(message)`), or `None` when the run
    /// passed every check.
    failure: Option<String>,
    /// Writer operations that returned `Ok`.
    writer_ops: u64,
    /// Writer operations that returned `Error::Busy`.
    writer_busy: u64,
    /// `BackupDoc` count in the source DB after the workload (0 if
    /// listing failed).
    source_count: usize,
    /// `BackupDoc` count in the reopened backup (0 if it could not be
    /// opened or listed).
    backup_count: usize,
    /// Lines the writer recorded when it hit an unexpected error.
    op_log: Vec<String>,
}

fn report(outcome: &Outcome, seed: u64) {
    eprintln!(
        "backup_concurrent run complete: writer_ops={} writer_busy={} \
         source_count={} backup_count={}",
        outcome.writer_ops, outcome.writer_busy, outcome.source_count, outcome.backup_count,
    );
    if let Some(msg) = outcome.failure.as_ref() {
        let log_dir = PathBuf::from("target").join("backup_concurrent");
        let _ = fs::create_dir_all(&log_dir);
        let log_path = log_dir.join(format!("seed-{seed}.log"));
        if let Ok(mut f) = fs::File::create(&log_path) {
            for line in &outcome.op_log {
                let _ = writeln!(f, "{line}");
            }
        }
        panic!(
            "OBJ_BACKUP_SEED={seed} FAIL: {msg}\nop log: {}",
            log_path.display(),
        );
    }
}

/// Run the writer and backup driver on scoped threads, with the
/// watchdog on the calling thread. Then integrity-check both the source
/// and the backup via `finalize_outcome`.
///
/// The source DB is opened with cross-process locking disabled, because
/// every thread shares this one handle (same fd). This matches the M6
/// #49 stress harness. `thread::scope` joins both workers before it
/// returns, so they borrow the handle, the paths and the shared state
/// directly; no per-thread `Arc` clones or owned path copies are needed.
///
/// # Panics
///
/// Panics if the source DB cannot be opened, if the watchdog detects a
/// stalled writer, or if either scoped thread panics.
fn run_backup_workload(
    src_path: &Path,
    backup_path: &Path,
    seed: u64,
    duration_secs: u64,
) -> Outcome {
    let config = Config::default()
        .cross_process_lock(false)
        .busy_timeout(Duration::from_mins(2));
    let db = Arc::new(Db::open_with(src_path, config).expect("open source"));
    let duration = Duration::from_secs(duration_secs);
    let shared = SharedState::new();
    let backup_outcome = Arc::new(Mutex::new(None));

    thread::scope(|scope| {
        scope.spawn(|| writer_loop(&db, seed, duration, &shared));

        scope.spawn(|| {
            let res = backup_driver(&db, backup_path, &shared);
            if let Ok(mut slot) = backup_outcome.lock() {
                *slot = Some(res);
            }
            // The backup has finished, successfully or not: tell the
            // writer and the watchdog to wind down.
            shared.stop.store(true, Ordering::SeqCst);
        });

        run_watchdog(&shared, duration);
    });

    finalize_outcome(&db, backup_path, shared, &backup_outcome)
}

/// Cross-thread state for one run. It lives in its own type so
/// `run_backup_workload` stays within power-of-ten Rule 4. Every field is
/// an `Arc`, so `clone()` yields another handle to the same state.
#[derive(Clone, Default)]
struct SharedState {
    /// Last writer iteration count published for the watchdog.
    heartbeat: Arc<AtomicU64>,
    /// Cooperative shutdown flag shared by writer, driver and watchdog.
    stop: Arc<AtomicBool>,
    /// The writer's record of which ids are present or deleted.
    expected: Arc<Mutex<HashMap<u64, ExpectedState>>>,
    /// Diagnostic lines recorded by the writer on unexpected errors.
    op_log: Arc<Mutex<Vec<String>>>,
    /// Writer ops that returned `Ok`.
    writer_ops: Arc<AtomicU64>,
    /// Writer ops that returned `Error::Busy`.
    writer_busy: Arc<AtomicU64>,
    /// First unexpected writer error, if any.
    writer_failure: Arc<Mutex<Option<String>>>,
}

impl SharedState {
    /// Fresh state: counters at zero, `stop` clear, collections empty.
    fn new() -> Self {
        Self::default()
    }
}

/// The writer's record of an id's last committed state. This is the
/// same contract as the M6 stress test. `pick_existing_id` only
/// selects ids recorded as `Present`.
#[derive(Clone, Copy, Debug)]
enum ExpectedState {
    /// The last committed op left the document in place.
    Present,
    /// The last committed op removed the document.
    Deleted,
}

/// Writer thread body. Applies seeded random ops until `stop` is set or
/// `duration` elapses, publishing a heartbeat every
/// `HEARTBEAT_OPS_GRANULARITY` iterations and once more on a clean exit.
///
/// `Ok` and `Error::Busy` results are tallied. Any other error is
/// recorded in both `op_log` and `writer_failure`, raises `stop`, and
/// ends the loop immediately (skipping the final heartbeat).
fn writer_loop(db: &Arc<Db>, seed: u64, duration: Duration, shared: &SharedState) {
    let mut rng = ChaCha8Rng::seed_from_u64(seed);
    let began = Instant::now();
    let mut iteration: u64 = 0;
    loop {
        if shared.stop.load(Ordering::Relaxed) || began.elapsed() >= duration {
            break;
        }
        iteration = iteration.saturating_add(1);
        if iteration.is_multiple_of(HEARTBEAT_OPS_GRANULARITY) {
            shared.heartbeat.store(iteration, Ordering::Relaxed);
        }
        let op = choose_op(&mut rng);
        let err = match perform_writer_op(db, op, &mut rng, &shared.expected) {
            Ok(()) => {
                shared.writer_ops.fetch_add(1, Ordering::Relaxed);
                continue;
            }
            Err(obj::Error::Busy { .. }) => {
                shared.writer_busy.fetch_add(1, Ordering::Relaxed);
                continue;
            }
            Err(other) => other,
        };
        let line = format!("writer iter {iteration}: op {op:?} err: {err:?}");
        if let Ok(mut log) = shared.op_log.lock() {
            log.push(line.clone());
        }
        if let Ok(mut slot) = shared.writer_failure.lock() {
            *slot = Some(line);
        }
        shared.stop.store(true, Ordering::SeqCst);
        return;
    }
    shared.heartbeat.store(iteration, Ordering::Relaxed);
}

/// Draw the next writer op: 60% `Insert`, 25% `Update`, 15% `Delete`.
/// This is the M6 stress-test distribution, sampled from a single draw
/// in `0..100`.
fn choose_op(rng: &mut ChaCha8Rng) -> Op {
    let roll: u32 = rng.random_range(0..100);
    if roll < 60 {
        Op::Insert
    } else if roll < 85 {
        Op::Update
    } else {
        Op::Delete
    }
}

fn perform_writer_op(
    db: &Db,
    op: Op,
    rng: &mut ChaCha8Rng,
    expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
    match op {
        Op::Insert => writer_insert(db, rng, expected),
        Op::Update => writer_update(db, rng, expected),
        Op::Delete => writer_delete(db, rng, expected),
    }
}

/// Insert one document with a random payload, then set its `id_echo`
/// to the assigned id inside the same transaction. Once the transaction
/// succeeds, the id is recorded as `Present` in `expected`.
///
/// # Errors
///
/// Propagates any error from `Db::transaction` (including `Busy`). In
/// that case `expected` is left untouched.
fn writer_insert(
    db: &Db,
    rng: &mut ChaCha8Rng,
    expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
    let doc = BackupDoc {
        id_echo: 0,
        version: 1,
        payload: random_payload(rng),
    };
    let new_id = db.transaction(|tx| {
        let coll = tx.collection::<BackupDoc>()?;
        let id = coll.insert(doc)?;
        coll.update(id, |stored: &mut BackupDoc| stored.id_echo = id.get())?;
        Ok(id)
    })?;
    if let Ok(mut map) = expected.lock() {
        map.insert(new_id.get(), ExpectedState::Present);
    }
    Ok(())
}

fn writer_update(
    db: &Db,
    rng: &mut ChaCha8Rng,
    expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
    let Some((id, new_version)) = pick_existing_id(rng, expected) else {
        return Ok(());
    };
    let payload = random_payload(rng);
    let r = db.update::<BackupDoc, _>(
        Id::try_new(id).expect("nonzero by construction"),
        |d: &mut BackupDoc| {
            d.payload.clone_from(&payload);
            d.version = new_version;
        },
    );
    match r {
        Ok(()) => {
            if let Ok(mut map) = expected.lock() {
                map.insert(id, ExpectedState::Present);
            }
            Ok(())
        }
        Err(obj::Error::DocumentNotFound { .. }) => Ok(()),
        Err(e) => Err(e),
    }
}

/// Delete a randomly chosen `Present` document and record it as
/// `Deleted` in `expected`.
///
/// Does nothing when no id is currently `Present`. After `Db::delete`
/// returns `Ok`, the document is absent whatever its boolean result was
/// (presumably whether the document existed beforehand). The id is
/// therefore marked `Deleted` in both cases. Leaving a `false` result as
/// `Present` would keep a missing id in the candidate pool forever.
///
/// # Errors
///
/// Propagates any error from `Db::delete` (including `Busy`). In that
/// case `expected` is left untouched.
fn writer_delete(
    db: &Db,
    rng: &mut ChaCha8Rng,
    expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
    let Some((id, _)) = pick_existing_id(rng, expected) else {
        return Ok(());
    };
    let _existed = db.delete::<BackupDoc>(Id::try_new(id).expect("nonzero"))?;
    if let Ok(mut map) = expected.lock() {
        map.insert(id, ExpectedState::Deleted);
    }
    Ok(())
}

/// Random payload of 8..256 bytes. The length is drawn first, then the
/// bytes are filled from the same generator.
fn random_payload(rng: &mut ChaCha8Rng) -> Vec<u8> {
    let mut bytes = vec![0u8; rng.random_range(8..256usize)];
    rng.fill(&mut bytes[..]);
    bytes
}

/// Choose a uniformly random id currently recorded as `Present`, plus a
/// random next `version` in `2..u32::MAX`.
///
/// Returns `None` when no id is `Present` or the `expected` mutex is
/// poisoned; in that case no random draws are consumed.
///
/// The candidates are sorted before indexing. `HashMap` uses a randomly
/// seeded hasher (`RandomState`), so its iteration order differs between
/// runs. Indexing the raw iteration order would therefore pick a
/// different id for the same seed on each run, an avoidable source of
/// non-reproducibility for `OBJ_BACKUP_SEED`.
fn pick_existing_id(
    rng: &mut ChaCha8Rng,
    expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> Option<(u64, u32)> {
    let Ok(map) = expected.lock() else {
        return None;
    };
    let mut present: Vec<u64> = map
        .iter()
        .filter_map(|(k, v)| matches!(v, ExpectedState::Present).then_some(*k))
        .collect();
    // The snapshot is taken; the lock is not needed for the draws.
    drop(map);
    if present.is_empty() {
        return None;
    }
    present.sort_unstable();
    let id = present[rng.random_range(0..present.len())];
    let next_version: u32 = rng.random_range(2..u32::MAX);
    Some((id, next_version))
}

/// Backup-driver thread body. Sleeps in 50 ms steps until
/// `BACKUP_WARMUP` has elapsed (or until `stop` is raised early), then
/// calls `Db::backup_to` once. The result is returned unchanged so the
/// caller can report a failed backup as a hard failure.
fn backup_driver(db: &Arc<Db>, backup_path: &Path, shared: &SharedState) -> obj::Result<()> {
    let began = Instant::now();
    loop {
        let warmed_up = began.elapsed() >= BACKUP_WARMUP;
        if warmed_up || shared.stop.load(Ordering::Relaxed) {
            break;
        }
        thread::sleep(Duration::from_millis(50));
    }
    db.backup_to(backup_path)
}

/// Watchdog, run on the calling thread. Every 250 ms it compares the
/// writer's heartbeat with the last value seen. It returns once `stop`
/// is set or `duration` elapses, raising `stop` on the way out.
///
/// # Panics
///
/// Panics (after raising `stop`) if the heartbeat has not changed for
/// more than `HEARTBEAT_STALL_SECS`. When this runs inside
/// `thread::scope`, the scope joins its spawned threads before
/// propagating the panic. A writer that never returns therefore still
/// hangs the run instead of failing it.
fn run_watchdog(shared: &SharedState, duration: Duration) {
    let stall_limit = Duration::from_secs(HEARTBEAT_STALL_SECS);
    let began = Instant::now();
    let mut seen = shared.heartbeat.load(Ordering::Relaxed);
    let mut seen_at = Instant::now();
    while !shared.stop.load(Ordering::Relaxed) && began.elapsed() < duration {
        thread::sleep(Duration::from_millis(250));
        let current = shared.heartbeat.load(Ordering::Relaxed);
        if current == seen {
            if seen_at.elapsed() > stall_limit {
                shared.stop.store(true, Ordering::SeqCst);
                panic!(
                    "watchdog: writer heartbeat stalled at {current} for \
                     {HEARTBEAT_STALL_SECS}s; rerun with OBJ_BACKUP_SEED=<seed> \
                     to repro",
                );
            }
            continue;
        }
        seen = current;
        seen_at = Instant::now();
    }
    shared.stop.store(true, Ordering::SeqCst);
}

/// After threads join: open the backup, integrity-check it, count
/// docs in both source + backup, integrity-check the source.
fn finalize_outcome(
    source_db: &Arc<Db>,
    backup_path: &Path,
    shared: SharedState,
    backup_outcome: &Arc<Mutex<Option<obj::Result<()>>>>,
) -> Outcome {
    let writer_ops = shared.writer_ops.load(Ordering::SeqCst);
    let writer_busy = shared.writer_busy.load(Ordering::SeqCst);
    let op_log: Vec<String> = Arc::try_unwrap(shared.op_log)
        .ok()
        .and_then(|m| m.into_inner().ok())
        .unwrap_or_default();
    let mut failure: Option<String> = Arc::try_unwrap(shared.writer_failure)
        .ok()
        .and_then(|m| m.into_inner().ok())
        .flatten();

    // Surface a backup-driver error as a hard failure if the
    // writer didn't already record one.
    let backup_result: Option<obj::Result<()>> =
        backup_outcome.lock().ok().and_then(|mut g| g.take());
    if failure.is_none() {
        match &backup_result {
            Some(Ok(())) => {}
            Some(Err(e)) => failure = Some(format!("backup_to: {e:?}")),
            None => failure = Some("backup driver did not run".to_owned()),
        }
    }

    let (source_count, source_check_err) = check_source(source_db);
    if failure.is_none() {
        failure = source_check_err;
    }

    let (backup_count, backup_check_err) = check_backup(backup_path);
    if failure.is_none() {
        failure = backup_check_err;
    }

    Outcome {
        failure,
        writer_ops,
        writer_busy,
        source_count,
        backup_count,
        op_log,
    }
}

/// Count the source DB's `BackupDoc`s (0 if listing fails) and run
/// `integrity_check` on it. Returns the count plus `Some(message)` when
/// the check errors or reports failures, or `None` when it is clean.
fn check_source(db: &Arc<Db>) -> (usize, Option<String>) {
    let count = db.all::<BackupDoc>().map_or(0, |docs| docs.len());
    let problem = match db.integrity_check() {
        Err(e) => Some(format!("source integrity_check: {e:?}")),
        Ok(report) if report.is_ok() => None,
        Ok(report) => Some(format!(
            "source DB failed integrity_check at end-of-workload: \
             failures = {:?}",
            report.failures,
        )),
    };
    (count, problem)
}

/// Reopen the backup file with `Db::open`, count its `BackupDoc`s (0 if
/// listing fails) and run `integrity_check`. This is the M11 exit gate's
/// strongest invariant: any reported failure is a categorical bug.
///
/// Returns `(count, Some(message))` on a problem and `(count, None)`
/// when clean. An open failure reports a count of 0.
fn check_backup(backup_path: &Path) -> (usize, Option<String>) {
    let backup = match Db::open(backup_path) {
        Ok(handle) => handle,
        Err(e) => return (0, Some(format!("Db::open(backup): {e:?}"))),
    };
    let count = backup.all::<BackupDoc>().map_or(0, |docs| docs.len());
    let problem = match backup.integrity_check() {
        Err(e) => Some(format!("backup integrity_check: {e:?}")),
        Ok(report) if report.is_ok() => None,
        Ok(report) => Some(format!(
            "M11 EXIT GATE FAIL: backup did NOT pass integrity_check; \
             failures = {:?}",
            report.failures,
        )),
    };
    (count, problem)
}

/// Compile-time proof that `SharedState` is `Send + Sync`, so handles to
/// it may be shared with the writer and backup-driver threads.
const _: fn() = || {
    fn require_send_sync<T: Send + Sync>() {}
    require_send_sync::<SharedState>();
};

// ---------------------------------------------------------------------
// #76: cross-process backup must hold the cross-process WRITER_LOCK.
//
// The `backup_concurrent` gate above runs with `cross_process_lock(false)`
// and shares ONE `Db` handle across threads — it proves in-process
// backup integrity but, by construction, cannot exercise an
// EXTERNAL-process writer racing the copy. The bug in #76 was that
// `backup_to` / `checkpoint` took only the in-process pager `Mutex`,
// so a writer in a separate OS process was not excluded and could tear
// the backup.
//
// We model two processes with two independent `Db` handles (two OFD
// fds) to the same path — the same technique `cross_process_lock.rs`
// uses. With cross-process locking ENABLED, the post-#76 contract is:
//
//   * while a writer holds the cross-process WRITER_LOCK (an open
//     `db_writer.transaction`), a concurrent `db_backup.backup_to`
//     MUST be refused with `Error::Busy` rather than copy a main file
//     the writer is mutating; and
//   * once the writer releases, the backup succeeds and the resulting
//     file passes `integrity_check`.
//
// This is a deterministic, fast test (no soak loop), so it runs by
// default rather than behind `--ignored`.
// ---------------------------------------------------------------------

/// #76: a backup that races an external-process writer must be
/// excluded by the cross-process `WRITER_LOCK` (pre-#76 it took only
/// the in-process pager mutex and could tear).
///
/// Steps:
///
/// 1. Seed one document through `db_writer`.
/// 2. While `db_writer` holds an open transaction, assert that
///    `db_backup.backup_to` fails with `Error::Busy` and leaves no
///    destination file.
/// 3. After that transaction commits, assert that the backup succeeds
///    and the reopened copy passes `integrity_check`.
///
/// Statement order is load-bearing: both handles are opened before the
/// seed insert, and the refused `backup_to` must run inside the writer's
/// transaction closure.
#[test]
fn backup_blocked_by_cross_process_writer_lock() {
    let dir = TempDir::new().expect("tmp");
    let path = dir.path().join("xproc_backup.obj");
    let backup_path = dir.path().join("xproc_backup_out.obj");

    // Two independent handles == two processes for OFD-lock purposes.
    // Short busy timeout so the "blocked" assertion resolves fast.
    // NOTE(review): relies on `Config::default()` leaving cross-process
    // locking enabled; confirm if that default ever changes.
    let cfg = || Config::default().busy_timeout(Duration::from_millis(300));
    let db_writer = Db::open_with(&path, cfg()).expect("open writer");
    let db_backup = Db::open_with(&path, cfg()).expect("open backup handle");

    // Seed a collection so the backup copies a non-trivial main file.
    db_writer
        .insert(BackupDoc {
            id_echo: 1,
            version: 1,
            payload: vec![0xAB; 64],
        })
        .expect("seed insert");

    // While db_writer holds an open write txn (cross-process
    // WRITER_LOCK held), db_backup.backup_to must report Busy.
    db_writer
        .transaction(|_tx| {
            // Runs on the same OS thread while the writer's transaction is
            // still open; the 300 ms busy timeout bounds how long the
            // refused backup waits.
            let err = db_backup
                .backup_to(&backup_path)
                .expect_err("backup must be refused while a writer holds the lock");
            assert!(
                matches!(err, obj::Error::Busy { .. }),
                "expected Error::Busy while external writer holds WRITER_LOCK, got {err:?}",
            );
            // The refused backup must not have left a partial file.
            assert!(
                !backup_path.exists(),
                "a refused backup must not leave a destination file behind",
            );
            Ok::<(), obj::Error>(())
        })
        .expect("writer txn commits");

    // Writer released: the backup now succeeds and is itself a valid,
    // integrity-clean obj file.
    //
    // NOTE: we assert structural validity (`integrity_check`) but NOT
    // a doc count. `db_backup` is a SEPARATE handle that opened before
    // `db_writer` created the collection; its in-memory pager has not
    // replayed `db_writer`'s WAL-resident commit. Cross-process *data
    // freshness* of un-checkpointed WAL frames is a distinct concern
    // from #76, which is specifically about the backup copy being
    // torn by a concurrent external writer. The lock-exclusion
    // assertions above are the #76 contract; the integrity check here
    // proves the copy taken under the lock is self-consistent.
    db_backup
        .backup_to(&backup_path)
        .expect("backup succeeds once the writer lock is free");
    // Reopen the copy with a fresh, default-config handle and validate it.
    let restored = Db::open(&backup_path).expect("reopen backup");
    let report = restored.integrity_check().expect("integrity check runs");
    assert!(
        report.is_ok(),
        "cross-process backup must pass integrity_check; failures = {:?}",
        report.failures,
    );
}

/// #76: `checkpoint` must likewise hold the cross-process
/// `WRITER_LOCK`, so a checkpoint racing an external-process writer is
/// serialized rather than interleaving main-file writes / salt
/// rotation.
///
/// This mirrors the backup test. A second handle's `checkpoint` must
/// fail with `Error::Busy` while the first handle's transaction is open,
/// and must succeed once that transaction has committed. Only lock
/// exclusion is asserted; the file's contents are not re-checked here.
#[test]
fn checkpoint_blocked_by_cross_process_writer_lock() {
    let dir = TempDir::new().expect("tmp");
    let path = dir.path().join("xproc_ckpt.obj");

    // Two independent handles stand in for two processes. The short busy
    // timeout keeps the refused checkpoint fast.
    let cfg = || Config::default().busy_timeout(Duration::from_millis(300));
    let db_writer = Db::open_with(&path, cfg()).expect("open writer");
    let db_ckpt = Db::open_with(&path, cfg()).expect("open checkpoint handle");

    // Seed one document before the race.
    db_writer
        .insert(BackupDoc {
            id_echo: 2,
            version: 1,
            payload: vec![0xCD; 32],
        })
        .expect("seed insert");

    // While db_writer's transaction is open, db_ckpt.checkpoint must be
    // refused with Busy.
    db_writer
        .transaction(|_tx| {
            let err = db_ckpt
                .checkpoint()
                .expect_err("checkpoint must be refused while a writer holds the lock");
            assert!(
                matches!(err, obj::Error::Busy { .. }),
                "expected Error::Busy while external writer holds WRITER_LOCK, got {err:?}",
            );
            Ok::<(), obj::Error>(())
        })
        .expect("writer txn commits");

    // Once the writer releases, the checkpoint succeeds.
    db_ckpt
        .checkpoint()
        .expect("checkpoint succeeds once the writer lock is free");
}