obj-core 1.1.2

Storage engine internals for the obj embedded document database (pager, WAL, B-tree, codec, catalog).
//! Transaction layer (L7).
//!
//! Wraps the [`Pager`] + cross-process file locks + reader snapshots
//! into a write- / read-transaction abstraction.  Single-writer
//! model: a [`WriteTxn`] holds (a) the in-process write-
//! serialization gate on the pager-shared [`TxnEnv`] and (b) the
//! cross-process `WRITER_LOCK` byte (when the env was constructed
//! with a lock file).  [`ReadTxn`] holds a shared reader lock byte
//! and a [`ReaderSnapshot`]; readers do not contend with each other
//! and do not block writers.
//!
//! This module exposes the building blocks; M6 issue #47 wraps the
//! result as `obj::Db`.
//!
//! # In-process write-serialization gate (issue #18)
//!
//! The gate is an [`AtomicBool`] behind an `Arc`, NOT a `Mutex<()>`.
//! An acquired [`WriteSerialGuard`] OWNS a clone of that `Arc` and,
//! on `Drop`, clears the flag with `store(false, Release)`.  Because
//! the guard owns the `Arc` (rather than borrowing the env), it is
//! `Send + 'static`, which in turn makes [`WriteTxn`] `Send`, letting
//! the obj-py binding release the GIL across the blocking
//! lock-acquire.
//!
//! **No poisoning (deliberate, and strictly better).**  A `Mutex<()>`
//! poisons if a thread panics while holding the guard, turning every
//! subsequent `WriteTxn::begin` into a permanent
//! `Busy{WriterInProcess}`.  The `AtomicBool` gate has no such state:
//! if a writer panics mid-transaction, unwinding drops the
//! [`WriteTxn`] (whose `Drop` rolls back — restoring
//! `header_at_begin` so the pager is left at consistent committed
//! state) and then drops the [`WriteSerialGuard`] (which releases the
//! gate).  The next writer proceeds against that consistent state.
//! This replaces a permanent-Busy failure mode with a
//! recover-and-continue one.
//!
//! # Power-of-ten posture
//!
//! - **Rule 4.** Public methods on `WriteTxn` / `ReadTxn` are short
//!   delegations to the pager.
//! - **Rule 5.** `WriteTxn::commit` and `WriteTxn::rollback` set the
//!   internal `finished` flag as soon as the pager step succeeds, so
//!   the subsequent `Drop` of a finished txn cannot roll back; the
//!   `Drop` impl checks the flag and returns early.
//! - **Rule 7.** No `unwrap` / `expect` in production paths.  A
//!   poisoned pager mutex surfaces as [`Error::Busy`] with
//!   `LockKind::WriterInProcess` rather than a panic; the gate itself
//!   cannot poison.
//! - **Rule 9.** No `dyn` — `WriteTxn<'db, F: FileBackend>` and
//!   `ReadTxn<'db, F: FileBackend>` are monomorphised.
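
/*
Sketch for Rule 7 (illustrative only, not part of this module): one way
to surface a poisoned `std::sync::Mutex` as an error value instead of a
panic.  `SketchError`, `lock_or_busy`, and `poison` are hypothetical
names; the real code maps the failure to `Error::Busy` with
`LockKind::WriterInProcess`.  Assumes the default `panic = "unwind"`.

```rust
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq, Eq)]
enum SketchError {
    BusyWriterInProcess,
}

// Lock `m`, mapping a poisoned mutex to an error instead of panicking.
fn lock_or_busy(m: &Mutex<u32>) -> Result<MutexGuard<'_, u32>, SketchError> {
    m.lock().map_err(|_| SketchError::BusyWriterInProcess)
}

// Poison `m` by panicking on another thread while the guard is held.
fn poison(m: &Arc<Mutex<u32>>) {
    let m2 = Arc::clone(m);
    let _ = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("simulated panic while holding the lock");
    })
    .join();
}

fn main() {
    let shared = Arc::new(Mutex::new(0_u32));
    // Healthy mutex: the lock succeeds.
    assert!(lock_or_busy(&shared).is_ok());
    // After a holder panicked, callers get an error value, not a panic.
    poison(&shared);
    assert_eq!(
        lock_or_busy(&shared).err(),
        Some(SketchError::BusyWriterInProcess)
    );
}
```

The panic message printed by the worker thread is expected.
*/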

#![forbid(unsafe_code)]

use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;

use crate::error::{Error, LockKind, Result};
use crate::pager::page::{Page, PageId};
use crate::pager::{HeaderSnapshot, Pager, ReaderSnapshot};
use crate::platform::{FileBackend, FileHandle, ReaderLock, WriterLock};

/// Default busy timeout for `WriteTxn::begin` and `ReadTxn::begin`
/// when the caller does not pass a per-call deadline.  5 seconds
/// matches the default used by common `SQLite` bindings (Python's
/// `sqlite3`, `rusqlite`) and the design.md `Config::busy_timeout`
/// proposal.
pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);

/// Environment shared by every [`WriteTxn`] / [`ReadTxn`] in a
/// process.  Holds the pager (behind an `Arc<Mutex<_>>`), the
/// in-process write-serialization gate (an `Arc<AtomicBool>`), and,
/// for file-backed databases, an optional [`FileHandle`] used for
/// cross-process byte-range locking.
///
/// `TxnEnv` is `Send + Sync`; construct one and share via `Arc`
/// across threads (or across an `obj::Db` whose ownership wraps it).
#[derive(Debug)]
pub struct TxnEnv<F: FileBackend = FileHandle> {
    /// The pager.  Behind a Mutex so cache mutations on a cache miss
    /// stay sound under concurrent reader-snapshot reads.  In M6 the
    /// scaling is limited by this mutex; lock-free read paths are
    /// future work (post-M6).
    pager: Arc<Mutex<Pager<F>>>,
    /// In-process writer-serialization gate.  `false` = free,
    /// `true` = held.  A `WriteTxn` holds a [`WriteSerialGuard`] (which
    /// owns a clone of this `Arc`) for its entire lifetime, so at most
    /// one `WriteTxn` is alive per env per process at a time.  Behind
    /// an `Arc` so the guard can OWN it and therefore be `Send`
    /// (issue #18); an `AtomicBool` rather than a `Mutex<()>` so it
    /// cannot poison (see the module-level "In-process gate" docs).
    write_serialization: Arc<AtomicBool>,
    /// File handle held for cross-process locking.  `None` for in-
    /// memory envs (no file to lock) or for callers that explicitly
    /// disable the cross-process lock (e.g. fault-injection tests
    /// where the file-backend type is a harness rather than the
    /// production `FileHandle`).  The handle owns its own fd so
    /// locking calls do not need the pager's mutex.
    lock_file: Option<Arc<FileHandle>>,
}

impl<F: FileBackend> TxnEnv<F> {
    /// Construct an env wrapping the given pager.  `lock_file` is an
    /// optional dedicated file handle for cross-process locks; pass
    /// `None` for in-memory or fault-injection environments.
    #[must_use]
    pub fn new(pager: Pager<F>, lock_file: Option<Arc<FileHandle>>) -> Self {
        Self {
            pager: Arc::new(Mutex::new(pager)),
            write_serialization: Arc::new(AtomicBool::new(false)),
            lock_file,
        }
    }

    /// Shared access to the pager's `Arc<Mutex<_>>`.  Used by the
    /// `obj::Db` wrapper (M6 issue #47) to dispatch reads inside a
    /// `ReadTxn` closure.  Exposed at the txn boundary so the M6
    /// stress test can sample state without a full `WriteTxn`.
    #[must_use]
    pub fn pager(&self) -> &Arc<Mutex<Pager<F>>> {
        &self.pager
    }
}

/// RAII guard on the in-process write-serialization gate.
///
/// Owns a clone of the env's `Arc<AtomicBool>` so it is `Send +
/// 'static` (it does NOT borrow the env).  Constructed only by the
/// crate-internal gate acquire, which sets the flag to `true` via a
/// CAS; `Drop` clears it with a `Release` store so a writer that
/// releases on one thread is observed by an acquirer that CASes on
/// another.
///
/// At most one `WriteSerialGuard` exists per env at a time — that is
/// the single-writer invariant the CAS enforces.
#[derive(Debug)]
#[must_use = "the gate is released when the WriteSerialGuard drops"]
pub struct WriteSerialGuard {
    gate: Arc<AtomicBool>,
}

impl Drop for WriteSerialGuard {
    fn drop(&mut self) {
        // Release the gate.  `Release` pairs with the `Acquire` in the
        // acquiring CAS so the next writer sees all of this writer's
        // memory effects (including a `Drop`-time rollback) before it
        // proceeds.  Cannot poison: a panicking writer still runs this
        // on unwind, freeing the gate for the next writer.
        self.gate.store(false, Ordering::Release);
    }
}
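
/*
Sketch of the owning-guard idea described above (illustrative only).
`GateGuard` and `try_acquire` are hypothetical names, and the acquire
here is a single non-blocking attempt rather than the crate's bounded
polling loop.  It shows that a holder which panics still frees the
gate, assuming the default `panic = "unwind"`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical guard: owns a clone of the gate, so it borrows nothing.
struct GateGuard {
    gate: Arc<AtomicBool>,
}

impl Drop for GateGuard {
    fn drop(&mut self) {
        // Runs on normal scope exit and on panic unwinding alike.
        self.gate.store(false, Ordering::Release);
    }
}

// Single non-blocking attempt: flip false -> true, or report "held".
fn try_acquire(gate: &Arc<AtomicBool>) -> Option<GateGuard> {
    gate.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .ok()
        .map(|_| GateGuard { gate: Arc::clone(gate) })
}

fn main() {
    let gate = Arc::new(AtomicBool::new(false));

    let first = try_acquire(&gate).expect("gate starts free");
    assert!(try_acquire(&gate).is_none(), "second acquire must fail");
    drop(first);

    // A holder that panics still releases the gate during unwinding.
    let g = Arc::clone(&gate);
    let joined = thread::spawn(move || {
        let _held = try_acquire(&g).expect("free again");
        panic!("simulated writer panic");
    })
    .join();
    assert!(joined.is_err());
    assert!(try_acquire(&gate).is_some(), "no poisoning: gate is free");
}
```
*/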

/// A `Send` token holding BOTH blocking-acquired write locks, with no
/// borrow of the env.
///
/// Returned by [`WriteTxn::acquire`] and consumed by
/// [`WriteTxn::from_acquire`].  Splitting the blocking acquisition
/// (which yields this `Send` token) from the cheap, non-blocking txn
/// assembly lets a caller (obj-py) run the acquire on a worker thread
/// with the Python GIL released, then build the `!'static` `WriteTxn`
/// after re-taking the GIL.
///
/// Holds the in-process [`WriteSerialGuard`] and the optional
/// cross-process [`WriterLock`]; both are `Send`, so this token is
/// `Send`.  Generic over `F` only to thread the same backend type
/// through to [`WriteTxn::from_acquire`]; the token itself stores no
/// `F` value, so it carries a `PhantomData<fn() -> F>`, which claims
/// no ownership of an `F` and is `Send` regardless of `F`.
#[derive(Debug)]
#[must_use = "a WriteAcquire holds the write locks until consumed by WriteTxn::from_acquire"]
pub struct WriteAcquire<F: FileBackend> {
    write_guard: WriteSerialGuard,
    writer_lock: Option<WriterLock>,
    _backend: core::marker::PhantomData<fn() -> F>,
}
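
/*
Sketch of the acquire-then-assemble split (illustrative only).  `Token`,
`Env`, `Txn`, `LocalBackend`, `acquire`, and `assemble` are hypothetical
stand-ins, and the blocking step is reduced to a single CAS.  The point
is that a token carrying `PhantomData<fn() -> F>` stays `Send` even when
`F` itself is not, so it can be produced on a worker thread and turned
into a borrowing txn afterwards.

```rust
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical backend type that is deliberately NOT `Send`.
#[allow(dead_code)]
struct LocalBackend(Rc<()>);

// Hypothetical token: owns the gate, stores no `F` value.
struct Token<F> {
    gate: Arc<AtomicBool>,
    _backend: PhantomData<fn() -> F>,
}

impl<F> Drop for Token<F> {
    fn drop(&mut self) {
        self.gate.store(false, Ordering::Release);
    }
}

struct Env {
    gate: Arc<AtomicBool>,
}

// The txn borrows the env; the token inside it does not.
struct Txn<'env, F> {
    _env: &'env Env,
    _token: Token<F>,
}

// "Blocking" half (one CAS for brevity).  Needs only the `Arc`.
fn acquire<F>(gate: Arc<AtomicBool>) -> Option<Token<F>> {
    gate.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .ok()?;
    Some(Token { gate, _backend: PhantomData })
}

// Cheap half: tie the token to a borrowed env.
fn assemble<'env, F>(env: &'env Env, token: Token<F>) -> Txn<'env, F> {
    Txn { _env: env, _token: token }
}

fn assert_send<T: Send>() {}

fn main() {
    // Compiles only because `PhantomData<fn() -> F>` is `Send` for any `F`.
    assert_send::<Token<LocalBackend>>();

    let env = Env { gate: Arc::new(AtomicBool::new(false)) };
    let gate = Arc::clone(&env.gate);
    // Acquire on a worker thread, then move the token back.
    let token = thread::spawn(move || acquire::<LocalBackend>(gate))
        .join()
        .expect("worker panicked")
        .expect("gate was free");
    let txn = assemble(&env, token);
    assert!(env.gate.load(Ordering::Acquire));
    drop(txn);
    assert!(!env.gate.load(Ordering::Acquire));
}
```
*/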

/// A write transaction.
///
/// Construct via [`WriteTxn::begin`] (or, to release a host lock
/// across the blocking acquire, [`WriteTxn::acquire`] +
/// [`WriteTxn::from_acquire`]).  Holds, for its entire lifetime:
/// 1. A [`WriteSerialGuard`] on the env's in-process write-
///    serialization gate — ensures at most one `WriteTxn` per env per
///    process.
/// 2. An optional cross-process `WRITER_LOCK` byte: ensures at most
///    one `WriteTxn` across all processes that have opened the same
///    file.
///
/// `WriteTxn::commit` finalises the transaction through the pager's
/// WAL; `WriteTxn::rollback` discards pending writes.  Dropping an
/// uncommitted `WriteTxn` rolls back automatically (and logs a
/// `tracing` debug event, gated on the `tracing` feature, so the
/// caller learns about the silent rollback).
///
/// `Send` (issue #18): every field is `Send` — `&TxnEnv` is
/// `Send + Sync`, [`WriteSerialGuard`] owns an `Arc<AtomicBool>`, and
/// [`WriterLock`] / [`HeaderSnapshot`] are `Send`.  Soundness rests on
/// the single-writer invariant: at most one `WriteTxn` exists per env
/// at a time (the gate enforces it), and every pager access re-locks
/// the pager `Mutex` per-op, so there is no thread-affine state to
/// violate when the handle moves between threads.
///
/// Generic over `F: FileBackend` (Rule 9: no `dyn`).
#[derive(Debug)]
pub struct WriteTxn<'db, F: FileBackend> {
    env: &'db TxnEnv<F>,
    /// In-process write-serialization guard.  Owns an `Arc<AtomicBool>`
    /// (the env's gate), so it is `Send` — this is what makes the
    /// whole `WriteTxn` `Send` (issue #18).  `Option` because `commit`
    /// and `rollback` consume it before the txn is dropped.
    write_guard: Option<WriteSerialGuard>,
    /// Cross-process `WRITER_LOCK` guard.  `None` for envs without a
    /// `lock_file`.  Released on drop or on explicit
    /// `commit`/`rollback`.
    writer_lock: Option<WriterLock>,
    /// Snapshot of header fields the M5 catalog + freelist code
    /// writes direct-to-disk, plus a clone of the WAL committed
    /// view at txn begin.  Restored on rollback so a rolled-back
    /// txn that mutated the catalog (via the M6 `obj::Db` public
    /// API) does not leak a header that points at unwritten page
    /// bodies, and does not leave the WAL view missing pages that
    /// `free_page` removed.  `Option` because `commit`/`rollback`
    /// take ownership of the snapshot.  See
    /// `Pager::header_snapshot` for the rationale.
    header_at_begin: Option<HeaderSnapshot>,
    /// `true` once `commit` or `rollback` has run.  A `Drop` on a
    /// committed/rolled-back txn is a no-op.
    finished: bool,
}

impl<'db, F: FileBackend> WriteTxn<'db, F> {
    /// Begin a new write transaction against `env`.
    ///
    /// Equivalent to [`Self::from_acquire`]`(env, `[`Self::acquire`]
    /// `(env, timeout)?)`: it performs the two blocking lock acquires
    /// (in-process gate, then cross-process `WRITER_LOCK`) and then
    /// assembles the txn.  Callers that need to release a host runtime
    /// lock (e.g. the Python GIL) across the blocking wait should call
    /// [`Self::acquire`] / [`Self::from_acquire`] directly so the
    /// blocking step runs without the host lock held.
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] with `LockKind::Writer` if the cross-
    ///   process lock did not become available within `timeout`.
    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if another
    ///   thread in the same process is mid-write.
    /// - [`Error::Io`] on lock syscall failure.
    pub fn begin(env: &'db TxnEnv<F>, timeout: Duration) -> Result<Self> {
        Self::from_acquire(env, Self::acquire(env, timeout)?)
    }

    /// Perform BOTH blocking write-lock acquires and return a `Send`
    /// token, without touching the pager or borrowing the env's
    /// lifetime beyond the call.
    ///
    /// Acquires the in-process serialization gate FIRST (bounded busy-
    /// poll against `timeout`), THEN the cross-process `WRITER_LOCK`
    /// (if `env.lock_file` is `Some`).  This order is load-bearing:
    /// the cross-process OFD lock is per-fd and the whole process
    /// shares one lock-file fd, so two same-process threads would BOTH
    /// pass the cross-process lock — the in-process gate is the
    /// authoritative same-process serializer and must win first.  On a
    /// cross-process failure the in-process guard is dropped (releasing
    /// the gate) before the error returns, exactly as `begin` did.
    ///
    /// The returned [`WriteAcquire`] owns both guards and is `Send`,
    /// so the caller may move it across threads (e.g. acquire on a
    /// worker thread with the Python GIL released).
    ///
    /// # Errors
    ///
    /// As [`Self::begin`].
    pub fn acquire(env: &'db TxnEnv<F>, timeout: Duration) -> Result<WriteAcquire<F>> {
        // In-process gate first (see method docs for why the order is
        // load-bearing).
        let write_guard = acquire_write_serialization(&env.write_serialization, timeout)?;
        // Then the cross-process lock.  Pass the full original timeout
        // rather than the remainder: the gate acquire is near-instant
        // when uncontended, but under contention the combined wait can
        // approach twice `timeout`.
        let writer_lock = match env.lock_file.as_ref() {
            Some(handle) => match handle.lock_writer(timeout) {
                Ok(g) => Some(g),
                Err(e) => {
                    // Drop the gate guard before returning so a failed
                    // cross-process acquire does not leave the in-
                    // process gate held — identical to the old `begin`.
                    drop(write_guard);
                    return Err(e);
                }
            },
            None => None,
        };
        Ok(WriteAcquire {
            write_guard,
            writer_lock,
            _backend: core::marker::PhantomData,
        })
    }

    /// Assemble a `WriteTxn` from a previously-[`acquire`](Self::acquire)d
    /// token.  This is the cheap, NON-blocking half of `begin`: it
    /// briefly locks the pager to flip the txn-depth gauge and take the
    /// header snapshot, then takes ownership of the token's guards.
    ///
    /// Holding the pager mutex here cannot deadlock: the caller already
    /// owns the in-process gate (carried in `acq`), so no other
    /// `WriteTxn` is alive, and the snapshot is cheap.
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if the pager
    ///   mutex is poisoned.
    pub fn from_acquire(env: &'db TxnEnv<F>, acq: WriteAcquire<F>) -> Result<Self> {
        // M6 #51: flip the pager's txn-depth gauge so the Catalog's
        // debug-assert at its mutation boundaries sees a non-zero
        // depth; snapshot the header fields the catalog / freelist
        // write direct-to-disk so rollback can restore them.
        let header_at_begin = {
            let mut pager = env.pager.lock().map_err(|_| Error::Busy {
                kind: LockKind::WriterInProcess,
            })?;
            pager.begin_txn();
            pager.header_snapshot()
        };
        Ok(Self {
            env,
            write_guard: Some(acq.write_guard),
            writer_lock: acq.writer_lock,
            header_at_begin: Some(header_at_begin),
            finished: false,
        })
    }

    /// Write `page` at `id` through the pager.
    ///
    /// # Errors
    ///
    /// - [`Error::InvalidArgument`] if `id` is out of range.
    /// - [`Error::Io`] on syscall failure.
    pub fn write_page(&self, id: PageId, page: &Page) -> Result<()> {
        let mut pager = self.lock_pager()?;
        pager.write_page(id, page)
    }

    /// Read `id` through the pager (sees pending + committed +
    /// main).  Used inside a write txn that needs to read-modify-
    /// write a page.  Returns an owned [`Page`] because the borrow
    /// chain through the pager's mutex would otherwise tie the
    /// returned reference to the guard.
    ///
    /// # Errors
    ///
    /// As [`Pager::read_page`].
    pub fn read_page(&self, id: PageId) -> Result<Page> {
        let mut pager = self.lock_pager()?;
        let page_ref = pager.read_page(id)?;
        Ok(page_ref.to_owned_page())
    }

    /// Allocate a new page through the pager.
    ///
    /// # Errors
    ///
    /// As [`Pager::alloc_page`].
    pub fn alloc_page(&self) -> Result<PageId> {
        let mut pager = self.lock_pager()?;
        pager.alloc_page()
    }

    /// Acquire the pager mutex.  Bubble a poisoned mutex up as
    /// `WriterInProcess` — every txn method that takes the pager
    /// goes through here so the failure mode is uniform.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Busy`] with `LockKind::WriterInProcess` if
    /// the mutex is poisoned by a previous panic.
    pub fn lock_pager(&self) -> Result<MutexGuard<'_, Pager<F>>> {
        self.env.pager.lock().map_err(|_| Error::Busy {
            kind: LockKind::WriterInProcess,
        })
    }

    /// Access the underlying env.  Used by callers (e.g. `obj`
    /// crate) that compose typed handles over the raw txn.
    #[must_use]
    pub fn env(&self) -> &'db TxnEnv<F> {
        self.env
    }

    /// Commit the transaction.  Calls `Pager::commit` to make the
    /// staged writes durable, then releases both lock layers.
    ///
    /// # Errors
    ///
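    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if the pager
    ///   mutex is poisoned.
    /// - Whatever [`Pager::commit`] returns; the txn is then dropped
    ///   uncommitted and rolls back.
    /// - Any error from releasing the cross-process `WRITER_LOCK`
    ///   (the pager commit has already completed at that point).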
    pub fn commit(mut self) -> Result<()> {
        {
            let mut pager = self.lock_pager()?;
            let _lsn = pager.commit()?;
            // M6 #51: pair with `begin_txn` from `WriteTxn::from_acquire`.
            pager.end_txn();
        }
        self.finished = true;
        // Drop the in-process guard explicitly so the cross-process
        // lock's release order is deterministic.
        self.write_guard.take();
        // Snapshot drops here — no rollback needed on commit.
        self.header_at_begin.take();
        // `writer_lock`'s `Drop` releases the OFD lock; nothing
        // forces us to call `release()` here.  We do call it so a
        // release error can be surfaced (rather than swallowed by
        // `Drop`).
        if let Some(lock) = self.writer_lock.take() {
            lock.release()?;
        }
        Ok(())
    }

    /// Roll the transaction back, dropping all pending writes.
    ///
    /// # Errors
    ///
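    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if the pager
    ///   mutex is poisoned.
    /// - Any error from `Pager::restore_header_snapshot`.
    /// - Any error from releasing the cross-process `WRITER_LOCK`.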
    pub fn rollback(mut self) -> Result<()> {
        let snap = self.header_at_begin.take();
        {
            let mut pager = self.lock_pager()?;
            rollback_pending(&mut pager);
            // Restore header fields that the catalog / freelist
            // code wrote direct-to-disk during the rolled-back
            // closure.
            if let Some(s) = snap {
                pager.restore_header_snapshot(s)?;
            }
            // M6 #51: pair with `begin_txn` from `WriteTxn::from_acquire`.
            pager.end_txn();
        }
        self.finished = true;
        self.write_guard.take();
        if let Some(lock) = self.writer_lock.take() {
            lock.release()?;
        }
        Ok(())
    }
}
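
/*
Sketch of the lock ordering used by `WriteTxn::acquire` (illustrative
only): take the in-process gate first, then a fallible outer step, and
free the gate if the outer step fails.  `GateGuard`, `try_gate`, and
`acquire_both` are hypothetical names, and the `outer` closure merely
stands in for the cross-process lock call.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct GateGuard {
    gate: Arc<AtomicBool>,
}

impl Drop for GateGuard {
    fn drop(&mut self) {
        self.gate.store(false, Ordering::Release);
    }
}

fn try_gate(gate: &Arc<AtomicBool>) -> Result<GateGuard, &'static str> {
    gate.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .map(|_| GateGuard { gate: Arc::clone(gate) })
        .map_err(|_| "busy: writer in process")
}

// Gate first, then `outer`.  `outer` never runs unless the gate is held.
fn acquire_both<L>(
    gate: &Arc<AtomicBool>,
    outer: impl FnOnce() -> Result<L, &'static str>,
) -> Result<(GateGuard, L), &'static str> {
    let guard = try_gate(gate)?;
    match outer() {
        Ok(lock) => Ok((guard, lock)),
        Err(e) => {
            // Explicit for clarity; the guard would also drop on return.
            drop(guard);
            Err(e)
        }
    }
}

fn main() {
    let gate = Arc::new(AtomicBool::new(false));

    // Outer step fails: the gate must be free again afterwards.
    let failed = acquire_both(&gate, || Err::<(), _>("busy: writer"));
    assert_eq!(failed.err(), Some("busy: writer"));
    assert!(!gate.load(Ordering::Acquire));

    // Both steps succeed: the gate stays held while the pair is alive.
    let held = acquire_both(&gate, || Ok::<_, &'static str>("outer lock"));
    assert!(held.is_ok());
    assert!(gate.load(Ordering::Acquire));

    // A second caller stops at the gate and never runs the outer step.
    let mut outer_ran = false;
    let second = acquire_both(&gate, || {
        outer_ran = true;
        Ok::<(), &'static str>(())
    });
    assert!(second.is_err());
    assert!(!outer_ran);
}
```
*/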

impl<F: FileBackend> Drop for WriteTxn<'_, F> {
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        // Uncommitted txn — roll back.  We can't return errors from
        // `Drop`; best-effort drop the pending buffer + restore the
        // header snapshot. #55: a library must not write to stderr, so
        // the implicit-rollback diagnostic is routed to the `tracing`
        // facade (gated on the `tracing` feature) instead of `eprintln!`.
        // The early-return rollback pattern is legitimate, so this is a
        // debug-level event, not a warning.
        let snap = self.header_at_begin.take();
        if let Ok(mut pager) = self.env.pager.lock() {
            rollback_pending(&mut pager);
            if let Some(s) = snap {
                let _ = pager.restore_header_snapshot(s);
            }
            // M6 #51: pair with `begin_txn` from `WriteTxn::from_acquire`.
            pager.end_txn();
        }
        #[cfg(feature = "tracing")]
        tracing::debug!("WriteTxn dropped without commit/rollback; pending writes discarded");
        // Both `write_guard` and `writer_lock` release on drop.
    }
}
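
/*
Sketch of the rollback-on-drop pattern above (illustrative only).
`Store`, `Txn`, `committed`, and `pending_len` are hypothetical, and a
pair of maps stands in for the pager's committed state and pending
buffer.  A `finished` flag turns `Drop` into a no-op after `commit`.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Hypothetical store: committed pages plus a pending (uncommitted) buffer.
#[derive(Default)]
struct Store {
    committed: BTreeMap<u32, u8>,
    pending: BTreeMap<u32, u8>,
}

struct Txn<'s> {
    store: &'s Mutex<Store>,
    finished: bool,
}

impl<'s> Txn<'s> {
    fn begin(store: &'s Mutex<Store>) -> Self {
        Txn { store, finished: false }
    }

    fn write(&self, id: u32, byte: u8) {
        if let Ok(mut s) = self.store.lock() {
            s.pending.insert(id, byte);
        }
    }

    fn commit(mut self) {
        if let Ok(mut s) = self.store.lock() {
            let staged = std::mem::take(&mut s.pending);
            s.committed.extend(staged);
        }
        // `Drop` still runs when `self` goes out of scope, but it
        // sees the flag and returns early.
        self.finished = true;
    }
}

impl Drop for Txn<'_> {
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        // Best effort: errors cannot be returned from `Drop`.
        if let Ok(mut s) = self.store.lock() {
            s.pending.clear();
        }
    }
}

fn committed(store: &Mutex<Store>, id: u32) -> Option<u8> {
    let s = store.lock().ok()?;
    let value = s.committed.get(&id).copied();
    value
}

fn pending_len(store: &Mutex<Store>) -> usize {
    store.lock().map(|s| s.pending.len()).unwrap_or(0)
}

fn main() {
    let store = Mutex::new(Store::default());
    {
        let tx = Txn::begin(&store);
        tx.write(1, 0x99);
    } // dropped without commit: the pending write is discarded
    assert_eq!(committed(&store, 1), None);
    assert_eq!(pending_len(&store), 0);

    let tx = Txn::begin(&store);
    tx.write(1, 0x77);
    tx.commit();
    assert_eq!(committed(&store, 1), Some(0x77));
}
```
*/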

/// Discard the pager's pending-transaction buffer.  Idempotent.
///
/// The pager exposes `commit` / `checkpoint` but no explicit
/// rollback — pending writes simply never make it into the WAL.
/// This helper drains the in-memory pending map so a re-entered
/// `WriteTxn` on the same pager observes a clean slate.
fn rollback_pending<F: FileBackend>(pager: &mut Pager<F>) {
    pager.rollback_pending_writes();
}

/// Busy-poll a CAS on the in-process gate (`AtomicBool`) until it is
/// acquired or `timeout` elapses, returning an owning
/// [`WriteSerialGuard`].  Power-of-ten Rule 2: the loop is bounded by
/// the deadline AND a defensive iter counter; a return of
/// `Err(Error::Busy{WriterInProcess})` is the surfaced alternative to
/// blocking forever.
///
/// The backoff schedule mirrors the prior `Mutex<()>` polyfill and
/// `platform::lock`'s file-lock retry exactly: 1 ms initial, doubled
/// per miss, capped at 100 ms.  Average overhead under uncontended
/// load is one `compare_exchange_weak`.
///
/// Memory ordering: success uses `Acquire` so the winner observes the
/// previous holder's writes (paired with the `Release` store in
/// [`WriteSerialGuard::drop`]); failure uses `Relaxed` (a failed CAS
/// synchronizes nothing).  `compare_exchange_weak` is used because the
/// retry loop tolerates spurious failures and it is cheaper on some
/// targets.
///
/// Unlike the old `Mutex<()>`, the gate cannot poison — there is no
/// `Poisoned` arm; a panicking writer's guard `Drop` simply frees the
/// gate (see the module-level "In-process gate" docs).
fn acquire_write_serialization(
    gate: &Arc<AtomicBool>,
    timeout: Duration,
) -> Result<WriteSerialGuard> {
    let start = std::time::Instant::now();
    let mut backoff = Duration::from_millis(1);
    let max_backoff = Duration::from_millis(100);
    // Rule 2 upper bound: the deadline check is the canonical exit;
    // the iter counter is defensive.
    let timeout_millis = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
    let mut iters: u64 = 0;
    let max_iters = timeout_millis.saturating_add(64);
    loop {
        iters = iters.saturating_add(1);
        if iters > max_iters {
            return Err(Error::Busy {
                kind: LockKind::WriterInProcess,
            });
        }
        if gate
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return Ok(WriteSerialGuard {
                gate: Arc::clone(gate),
            });
        }
        if start.elapsed() >= timeout {
            return Err(Error::Busy {
                kind: LockKind::WriterInProcess,
            });
        }
        std::thread::sleep(backoff);
        backoff = (backoff * 2).min(max_backoff);
    }
}
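
/*
Sketch of the backoff schedule used by the loop above (illustrative
only).  `backoff_schedule` and `poll_acquire` are hypothetical helpers;
`poll_acquire` omits the defensive iteration counter and returns a
plain `bool` instead of a guard.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

// Delays (in ms) slept after the 1st, 2nd, ... miss:
// 1 ms, doubled per miss, capped at 100 ms.
fn backoff_schedule(misses: usize) -> Vec<u64> {
    let mut out = Vec::with_capacity(misses);
    let mut ms: u64 = 1;
    for _ in 0..misses {
        out.push(ms);
        ms = (ms * 2).min(100);
    }
    out
}

// Poll `flag` (false -> true) until success or `timeout`.
// Returns `true` if the flag was acquired.
fn poll_acquire(flag: &AtomicBool, timeout: Duration) -> bool {
    let start = Instant::now();
    let mut backoff = Duration::from_millis(1);
    let max_backoff = Duration::from_millis(100);
    loop {
        if flag
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return true;
        }
        if start.elapsed() >= timeout {
            return false;
        }
        thread::sleep(backoff);
        backoff = (backoff * 2).min(max_backoff);
    }
}

fn main() {
    assert_eq!(backoff_schedule(9), vec![1, 2, 4, 8, 16, 32, 64, 100, 100]);

    let flag = AtomicBool::new(false);
    // Free flag: acquired.
    assert!(poll_acquire(&flag, Duration::from_millis(20)));
    // Held flag: gives up once the deadline passes.
    assert!(!poll_acquire(&flag, Duration::from_millis(20)));
    // Released again: acquired.
    flag.store(false, Ordering::Release);
    assert!(poll_acquire(&flag, Duration::from_millis(20)));
}
```
*/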

/// A read transaction.
///
/// Construct via [`ReadTxn::begin`].  Holds:
/// 1. A [`ReaderSnapshot`] pinning the WAL end-LSN at construction.
/// 2. An optional cross-process `READER_LOCK` byte (shared with
///    other readers but not exclusive with writers — see
///    `docs/format.md` § File locking).
///
/// Reads inside a `ReadTxn` see a consistent snapshot of the
/// database; pending writes from a concurrent `WriteTxn` and frames
/// committed AFTER `ReadTxn::begin` are invisible.
#[derive(Debug)]
pub struct ReadTxn<'db, F: FileBackend> {
    env: &'db TxnEnv<F>,
    snapshot: ReaderSnapshot<F>,
    _reader_lock: Option<ReaderLock>,
}

impl<'db, F: FileBackend> ReadTxn<'db, F> {
    /// Begin a new read transaction.
    ///
    /// Acquires a cross-process reader-lock slot (if `env.lock_file`
    /// is `Some`) with [`DEFAULT_BUSY_TIMEOUT`], then takes a
    /// [`ReaderSnapshot`] from the pager.  Readers do not contend
    /// with each other (31 shared slots) and do not contend with
    /// writers on the byte-range layer.
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] with `LockKind::Reader` on lock timeout.
    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if the pager
    ///   mutex is poisoned.
    /// - [`Error::Io`] on lock or pager syscall failure.
    pub fn begin(env: &'db TxnEnv<F>) -> Result<Self> {
        Self::begin_with_timeout(env, DEFAULT_BUSY_TIMEOUT)
    }

    /// As [`begin`](Self::begin) with a caller-supplied timeout.
    ///
    /// # Errors
    ///
    /// See [`begin`](Self::begin).
    pub fn begin_with_timeout(env: &'db TxnEnv<F>, timeout: Duration) -> Result<Self> {
        let reader_lock = match env.lock_file.as_ref() {
            Some(handle) => Some(handle.lock_reader(timeout)?),
            None => None,
        };
        let snapshot = {
            let mut pager = env.pager.lock().map_err(|_| Error::Busy {
                kind: LockKind::WriterInProcess,
            })?;
            pager.reader_snapshot()?
        };
        Ok(Self {
            env,
            snapshot,
            _reader_lock: reader_lock,
        })
    }

    /// LSN this txn's snapshot pinned.  Diagnostic-only.
    #[must_use]
    pub fn pinned_lsn(&self) -> crate::wal::Lsn {
        self.snapshot.pinned_lsn()
    }

    /// Read page `id` consistent with the txn's snapshot.
    ///
    /// # Errors
    ///
    /// As [`ReaderSnapshot::read_page`], plus [`Error::Busy`] with
    /// `LockKind::WriterInProcess` if the pager mutex is poisoned.
    pub fn read_page(&self, id: PageId) -> Result<Page> {
        let pager = self.env.pager.lock().map_err(|_| Error::Busy {
            kind: LockKind::WriterInProcess,
        })?;
        // #81: this public API returns an OWNED `Page`, and the pager
        // lock guard cannot outlive this call, so we must materialise
        // an owned page here. `into_page` clones the body only on the
        // `Shared` arm (and only at this ownership boundary); the hot
        // decode paths (`get_via_snapshot`, `lookup_via_snapshot`,
        // range scans) keep the handle and call `as_bytes` instead.
        Ok(self.snapshot.read_page(&pager, id)?.into_page())
    }

    /// Access the underlying snapshot.  Used by `obj::Db` (M6 issue
    /// #47) to dispatch typed reads on a read txn.
    #[must_use]
    pub fn snapshot(&self) -> &ReaderSnapshot<F> {
        &self.snapshot
    }

    /// Access the env this txn lives in.  Used by `obj::Db` to
    /// access the pager mutex.
    #[must_use]
    pub fn env(&self) -> &TxnEnv<F> {
        self.env
    }

    /// Drop the txn explicitly (releases the snapshot pin and the
    /// reader lock).
    pub fn end(self) {
        drop(self);
    }
}
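
/*
Sketch of the snapshot rule a `ReadTxn` relies on (illustrative only):
a reader pins the end LSN when it begins and afterwards ignores frames
committed later.  `Log` and `Snapshot` are hypothetical stand-ins for
the WAL and `ReaderSnapshot`, and the frame layout is an assumption
made for brevity.

```rust
// Hypothetical append-only log of (lsn, page_id, byte) frames.
#[derive(Default)]
struct Log {
    frames: Vec<(u64, u32, u8)>,
}

impl Log {
    fn end_lsn(&self) -> u64 {
        self.frames.last().map_or(0, |f| f.0)
    }

    fn commit(&mut self, page: u32, byte: u8) -> u64 {
        let lsn = self.end_lsn() + 1;
        self.frames.push((lsn, page, byte));
        lsn
    }
}

struct Snapshot {
    pinned_lsn: u64,
}

impl Snapshot {
    // Pin the log's end LSN at construction time.
    fn pin(log: &Log) -> Self {
        Snapshot { pinned_lsn: log.end_lsn() }
    }

    // Newest frame for `page` at or before the pinned LSN.
    fn read(&self, log: &Log, page: u32) -> Option<u8> {
        log.frames
            .iter()
            .rev()
            .find(|(lsn, p, _)| *lsn <= self.pinned_lsn && *p == page)
            .map(|f| f.2)
    }
}

fn main() {
    let mut log = Log::default();
    log.commit(7, 0x11);
    let old = Snapshot::pin(&log);
    log.commit(7, 0x22);
    let new = Snapshot::pin(&log);

    // The older snapshot does not see the later commit.
    assert_eq!(old.read(&log, 7), Some(0x11));
    assert_eq!(new.read(&log, 7), Some(0x22));
    assert_eq!(old.read(&log, 8), None);
}
```

As in `ReadTxn::read_page`, the snapshot holds only the pin and is
handed the shared state on each read.
*/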

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pager::page::Page;
    use crate::pager::Config;
    use crate::platform::FileHandle;
    use std::sync::Arc;
    use std::thread;
    use tempfile::TempDir;

    fn build_env(dir: &TempDir) -> (TxnEnv<FileHandle>, PageId) {
        let path = dir.path().join("txn.obj");
        let mut pager = Pager::open(&path, Config::default()).expect("pager");
        // #64: enter a Pager txn so alloc_page's debug_assert passes.
        // commit() inside the txn flushes; end_txn balances the
        // begin_txn so subsequent WriteTxn::begin sees txn_depth=0.
        pager.begin_txn();
        let a = pager.alloc_page().expect("alloc");
        let mut page = Page::zeroed();
        page.as_bytes_mut()[0] = 0;
        pager.write_page(a, &page).expect("write");
        let _ = pager.commit().expect("commit");
        pager.end_txn();
        // Issue #1: lock byte lives in the `<db>.obj-lock`
        // sidecar, not the main file. Mirror the production
        // wiring so this test exercises the same path layout.
        let lock_path = crate::pager::lock_path_for(&path);
        let lock_file = Arc::new(FileHandle::open_or_create(&lock_path).expect("lock file"));
        lock_file.set_len(128).expect("lock sidecar len");
        (TxnEnv::new(pager, Some(lock_file)), a)
    }

    #[test]
    fn write_txn_commit_makes_writes_visible() {
        let dir = TempDir::new().expect("tmp");
        let (env, a) = build_env(&dir);
        let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
        let mut page = Page::zeroed();
        page.as_bytes_mut()[0] = 0x77;
        tx.write_page(a, &page).expect("write");
        tx.commit().expect("commit");

        let rx = ReadTxn::begin(&env).expect("read");
        let observed = rx.read_page(a).expect("read");
        assert_eq!(observed.as_bytes()[0], 0x77);
    }

    #[test]
    fn write_txn_rollback_drops_writes() {
        let dir = TempDir::new().expect("tmp");
        let (env, a) = build_env(&dir);
        let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
        let mut page = Page::zeroed();
        page.as_bytes_mut()[0] = 0x99;
        tx.write_page(a, &page).expect("write");
        tx.rollback().expect("rollback");

        let rx = ReadTxn::begin(&env).expect("read");
        let observed = rx.read_page(a).expect("read");
        assert_eq!(observed.as_bytes()[0], 0, "rollback must discard writes");
    }

    #[test]
    fn in_process_writers_serialize() {
        let dir = TempDir::new().expect("tmp");
        let (env, _a) = build_env(&dir);
        let tx1 = WriteTxn::begin(&env, Duration::from_millis(50)).expect("tx1");
        let err = WriteTxn::begin(&env, Duration::from_millis(10)).expect_err("tx2 busy");
        assert!(matches!(
            err,
            Error::Busy {
                kind: LockKind::WriterInProcess
            }
        ));
        tx1.commit().expect("commit");
        // After commit, the next txn must succeed.
        let _tx3 = WriteTxn::begin(&env, Duration::from_millis(50)).expect("tx3");
    }

    /// Issue #18: `WriteTxn` (and the new `WriteAcquire` /
    /// `WriteSerialGuard`) must be `Send` so obj-py can move the
    /// blocking acquire across an `allow_threads` boundary.  This is a
    /// compile-time assertion via a generic `fn`: if any of these types
    /// stops being `Send`, the test fails to compile.
    #[test]
    fn write_txn_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<WriteTxn<'_, FileHandle>>();
        assert_send::<WriteAcquire<FileHandle>>();
        assert_send::<WriteSerialGuard>();
        // ReadTxn is already Send; assert it stays so.
        assert_send::<ReadTxn<'_, FileHandle>>();
    }

    /// Issue #18: the gate does not poison.  If a writer panics
    /// mid-transaction, unwinding drops the `WriteTxn` (rollback) then
    /// the `WriteSerialGuard` (releases the gate), so the NEXT writer
    /// proceeds — against the rolled-back, consistent state.  The old
    /// `Mutex<()>` would have poisoned and turned every later `begin`
    /// into a permanent `Busy{WriterInProcess}`.
    #[test]
    fn panic_in_writer_releases_gate_and_rolls_back() {
        let dir = TempDir::new().expect("tmp");
        let (env, a) = build_env(&dir);
        let env = Arc::new(env);
        // Panic while holding a WriteTxn that has staged a write.
        let env_for_panic = Arc::clone(&env);
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let tx = WriteTxn::begin(&env_for_panic, Duration::from_millis(50)).expect("begin");
            let mut page = Page::zeroed();
            page.as_bytes_mut()[0] = 0xEE;
            tx.write_page(a, &page).expect("write");
            panic!("simulated mid-write crash");
        }));
        assert!(result.is_err(), "the closure must have panicked");
        // The gate is free: the next writer acquires immediately (no
        // permanent Busy from poisoning).
        let tx2 = WriteTxn::begin(&env, Duration::from_millis(50))
            .expect("gate must be free after a panicking writer");
        tx2.commit().expect("commit");
        // The panicking writer's staged write was rolled back: the
        // page reads as its pre-panic value (0), not 0xEE.
        let rx = ReadTxn::begin(&env).expect("read");
        let observed = rx.read_page(a).expect("read");
        assert_eq!(
            observed.as_bytes()[0],
            0,
            "panicking writer's staged write must have been rolled back",
        );
    }

    /// 4 writer threads × 250 iterations each: N writers
    /// serialize via the in-process writer gate + the cross-process
    /// `WRITER_LOCK`.  Every txn must succeed (no deadlock; no
    /// aborted txn).
    #[test]
    fn n_writers_serialize_with_no_deadlock() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("stress.obj");
        let mut pager = Pager::open(&path, Config::default()).expect("pager");
        // #64: wrap alloc_page in a Pager txn so the header-routing
        // assert passes. end_txn before handing to TxnEnv so writers
        // get a fresh txn-depth.
        pager.begin_txn();
        let a = pager.alloc_page().expect("alloc");
        let mut page = Page::zeroed();
        page.as_bytes_mut()[0] = 0;
        pager.write_page(a, &page).expect("write");
        let _ = pager.commit().expect("commit");
        pager.end_txn();
        // Issue #1: sidecar lock path, see `build_env` above.
        let lock_path = crate::pager::lock_path_for(&path);
        let lock_file = Arc::new(FileHandle::open_or_create(&lock_path).expect("lock"));
        lock_file.set_len(128).expect("lock sidecar len");
        let env = Arc::new(TxnEnv::new(pager, Some(lock_file)));

        let n_writers = 4usize;
        let iters_per_writer = 250u32;
        thread::scope(|scope| {
            let mut handles = Vec::with_capacity(n_writers);
            for w in 0..n_writers {
                let env = Arc::clone(&env);
                handles.push(scope.spawn(move || {
                    for i in 0..iters_per_writer {
                        // Generous timeout because the writer needs
                        // to block on the other writers in the worst
                        // case.
                        let tx = WriteTxn::begin(&env, Duration::from_secs(30))
                            .expect("begin under load");
                        let mut p = Page::zeroed();
                        p.as_bytes_mut()[0] =
                            u8::try_from((w * 1000 + i as usize) % 250 + 1).expect("byte fits");
                        tx.write_page(a, &p).expect("write");
                        tx.commit().expect("commit");
                    }
                }));
            }
            for h in handles {
                h.join().expect("join");
            }
        });

        // Read back from a fresh read txn: it must see SOME writer's
        // last value; writer serialization ensures no torn
        // page.
        let rx = ReadTxn::begin(&env).expect("read");
        let p = rx.read_page(a).expect("read");
        assert_ne!(p.as_bytes()[0], 0, "some writer's value must be visible");
    }

    #[test]
    fn drop_without_commit_warns_and_rolls_back() {
        let dir = TempDir::new().expect("tmp");
        let (env, a) = build_env(&dir);
        {
            let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
            let mut page = Page::zeroed();
            page.as_bytes_mut()[0] = 0xAB;
            tx.write_page(a, &page).expect("write");
            // Drop without commit — implicit rollback.
        }
        let rx = ReadTxn::begin(&env).expect("read");
        let observed = rx.read_page(a).expect("read");
        assert_eq!(
            observed.as_bytes()[0],
            0,
            "drop-without-commit must roll back",
        );
    }

    #[test]
    fn read_txn_sees_consistent_snapshot() {
        let dir = TempDir::new().expect("tmp");
        let (env, a) = build_env(&dir);

        // Reader takes its snapshot now (sees byte 0).
        let rx = ReadTxn::begin(&env).expect("read");

        // Writer commits a new version.
        {
            let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("write");
            let mut p = Page::zeroed();
            p.as_bytes_mut()[0] = 0x55;
            tx.write_page(a, &p).expect("write");
            tx.commit().expect("commit");
        }

        // Reader STILL sees byte 0 from its pinned snapshot.
        let observed = rx.read_page(a).expect("read");
        assert_eq!(
            observed.as_bytes()[0],
            0,
            "snapshot must isolate reader from concurrent commits",
        );
    }
}