Skip to main content

obj_core/
txn.rs

1//! Transaction layer (L7).
2//!
3//! Wraps the [`Pager`] + cross-process file locks + reader snapshots
4//! into a write- / read-transaction abstraction.  Single-writer
5//! model: a [`WriteTxn`] holds (a) the in-process write-
6//! serialization gate on the pager-shared [`TxnEnv`] and (b) the
7//! cross-process `WRITER_LOCK` byte (when the env was constructed
8//! with a lock file).  [`ReadTxn`] holds a shared reader lock byte
9//! and a [`ReaderSnapshot`]; readers do not contend with each other
10//! and do not block writers.
11//!
12//! This module exposes the building blocks; M6 issue #47 wraps the
13//! result as `obj::Db`.
14//!
15//! # In-process write-serialization gate (issue #18)
16//!
17//! The gate is an [`AtomicBool`] behind an `Arc`, NOT a `Mutex<()>`.
18//! An acquired
19//! [`WriteSerialGuard`] OWNS a clone of that `Arc` and, on `Drop`,
20//! `store(false, Release)`s the flag.  Because the guard owns the
21//! `Arc` (rather than borrowing the env), it is `Send + 'static`,
22//! which in turn makes [`WriteTxn`] `Send` — letting the obj-py
23//! binding release the GIL across the blocking lock-acquire.
24//!
25//! **No poisoning (deliberate, and strictly better).**  A `Mutex<()>`
26//! poisons if a thread panics while holding the guard, turning every
27//! subsequent `WriteTxn::begin` into a permanent
28//! `Busy{WriterInProcess}`.  The `AtomicBool` gate has no such state:
29//! if a writer panics mid-transaction, unwinding drops the
30//! [`WriteTxn`] (whose `Drop` rolls back — restoring
31//! `header_at_begin` so the pager is left at consistent committed
32//! state) and then drops the [`WriteSerialGuard`] (which releases the
33//! gate).  The next writer proceeds against that consistent state.
34//! This replaces a permanent-Busy failure mode with a
35//! recover-and-continue one.
36//!
37//! # Power-of-ten posture
38//!
39//! - **Rule 4.** Public methods on `WriteTxn` / `ReadTxn` are short
40//!   delegations to the pager.
//! - **Rule 5.** `WriteTxn::commit` and `WriteTxn::rollback` set an
//!   internal `finished` flag once their pager work has succeeded, so
//!   a subsequent `Drop` on a finished txn cannot roll back; the
//!   `Drop` impl checks the flag and returns early.
45//! - **Rule 7.** No `unwrap` / `expect` in production paths.  A
46//!   poisoned pager mutex surfaces as [`Error::Busy`] with
47//!   `LockKind::WriterInProcess` rather than a panic; the gate itself
48//!   cannot poison.
49//! - **Rule 9.** No `dyn` — `WriteTxn<'db, F: FileBackend>` and
50//!   `ReadTxn<'db, F: FileBackend>` are monomorphised.
51
52#![forbid(unsafe_code)]
53
54use core::sync::atomic::{AtomicBool, Ordering};
55use std::sync::{Arc, Mutex, MutexGuard};
56use std::time::Duration;
57
58use crate::error::{Error, LockKind, Result};
59use crate::pager::page::{Page, PageId};
60use crate::pager::{HeaderSnapshot, Pager, ReaderSnapshot};
61use crate::platform::{FileBackend, FileHandle, ReaderLock, WriterLock};
62
/// Fallback busy timeout, used when a caller does not supply its own
/// deadline (e.g. [`ReadTxn::begin`], which forwards this value to
/// [`ReadTxn::begin_with_timeout`]).
///
/// Five seconds: the same value as `SQLite`'s default and the
/// `Config::busy_timeout` proposal in design.md.
pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
68
69/// Environment shared by every [`WriteTxn`] / [`ReadTxn`] in a
70/// process.  Holds the pager (behind an `Arc<Mutex<_>>`), the in-
71/// process write-serialization mutex, and — for file-backed
72/// databases — an optional [`FileHandle`] used for cross-process
73/// byte-range locking.
74///
75/// `TxnEnv` is `Send + Sync`; construct one and share via `Arc`
76/// across threads (or across an `obj::Db` whose ownership wraps it).
77#[derive(Debug)]
78pub struct TxnEnv<F: FileBackend = FileHandle> {
79    /// The pager.  Behind a Mutex so cache mutations on a cache miss
80    /// stay sound under concurrent reader-snapshot reads.  In M6 the
81    /// scaling is limited by this mutex; lock-free read paths are
82    /// future work (post-M6).
83    pager: Arc<Mutex<Pager<F>>>,
84    /// In-process writer-serialization gate.  `false` = free,
85    /// `true` = held.  A `WriteTxn` holds a [`WriteSerialGuard`] (which
86    /// owns a clone of this `Arc`) for its entire lifetime, so at most
87    /// one `WriteTxn` is alive per env per process at a time.  Behind
88    /// an `Arc` so the guard can OWN it and therefore be `Send`
89    /// (issue #18); an `AtomicBool` rather than a `Mutex<()>` so it
90    /// cannot poison (see the module-level "In-process gate" docs).
91    write_serialization: Arc<AtomicBool>,
92    /// File handle held for cross-process locking.  `None` for in-
93    /// memory envs (no file to lock) or for callers that explicitly
94    /// disable the cross-process lock (e.g. fault-injection tests
95    /// where the file-backend type is a harness rather than the
96    /// production `FileHandle`).  The handle owns its own fd so
97    /// locking calls do not need the pager's mutex.
98    lock_file: Option<Arc<FileHandle>>,
99}
100
101impl<F: FileBackend> TxnEnv<F> {
102    /// Construct an env wrapping the given pager.  `lock_file` is an
103    /// optional dedicated file handle for cross-process locks; pass
104    /// `None` for in-memory or fault-injection environments.
105    #[must_use]
106    pub fn new(pager: Pager<F>, lock_file: Option<Arc<FileHandle>>) -> Self {
107        Self {
108            pager: Arc::new(Mutex::new(pager)),
109            write_serialization: Arc::new(AtomicBool::new(false)),
110            lock_file,
111        }
112    }
113
114    /// Shared access to the pager's `Arc<Mutex<_>>`.  Used by the
115    /// `obj::Db` wrapper (M6 issue #47) to dispatch reads inside a
116    /// `ReadTxn` closure.  Exposed at the txn boundary so the M6
117    /// stress test can sample state without a full `WriteTxn`.
118    #[must_use]
119    pub fn pager(&self) -> &Arc<Mutex<Pager<F>>> {
120        &self.pager
121    }
122}
123
/// Owning guard for the in-process write-serialization gate.
///
/// Holds a clone of the env's `Arc<AtomicBool>` instead of borrowing
/// the env, which makes the guard `Send + 'static`.  Only the
/// crate-internal gate acquire constructs it, after a CAS has set the
/// flag to `true`.  Dropping the guard stores `false` with `Release`
/// ordering, so a release on one thread is visible to a CAS acquire on
/// another.
///
/// The CAS enforces the single-writer invariant: at most one
/// `WriteSerialGuard` exists per env at any time.
#[derive(Debug)]
#[must_use = "the gate is released when the WriteSerialGuard drops"]
pub struct WriteSerialGuard {
    gate: Arc<AtomicBool>,
}

impl Drop for WriteSerialGuard {
    fn drop(&mut self) {
        // Free the gate.  The `Release` store pairs with the `Acquire`
        // side of the acquiring CAS, so the next writer observes all
        // of this writer's memory effects (a `Drop`-time rollback
        // included).  This also runs during unwinding, so a panicking
        // writer still frees the gate — there is nothing to poison.
        let Self { gate } = self;
        gate.store(false, Ordering::Release);
    }
}
151
152/// A `Send` token holding BOTH blocking-acquired write locks, with no
153/// borrow of the env.
154///
155/// Returned by [`WriteTxn::acquire`] and consumed by
156/// [`WriteTxn::from_acquire`].  Splitting the blocking acquisition
157/// (which yields this `Send` token) from the cheap, non-blocking txn
158/// assembly lets a caller (obj-py) run the acquire on a worker thread
159/// with the Python GIL released, then build the `!'static` `WriteTxn`
160/// after re-taking the GIL.
161///
162/// Holds the in-process [`WriteSerialGuard`] and the optional
163/// cross-process [`WriterLock`]; both are `Send`, so this token is
164/// `Send`.  Generic over `F` only to thread the same backend type
165/// through to [`WriteTxn::from_acquire`]; the token itself stores no
166/// `F` value, so it carries a `PhantomData<fn() -> F>` to stay
167/// covariant-free and `Send` regardless of `F`.
168#[derive(Debug)]
169#[must_use = "a WriteAcquire holds the write locks until consumed by WriteTxn::from_acquire"]
170pub struct WriteAcquire<F: FileBackend> {
171    write_guard: WriteSerialGuard,
172    writer_lock: Option<WriterLock>,
173    _backend: core::marker::PhantomData<fn() -> F>,
174}
175
176/// A write transaction.
177///
178/// Construct via [`WriteTxn::begin`] (or, to release a host lock
179/// across the blocking acquire, [`WriteTxn::acquire`] +
180/// [`WriteTxn::from_acquire`]).  Holds, for its entire lifetime:
181/// 1. A [`WriteSerialGuard`] on the env's in-process write-
182///    serialization gate — ensures at most one `WriteTxn` per env per
183///    process.
184/// 2. An optional cross-process `WRITER_LOCK` byte — ensures at most
185///    one `WriteTxn` across the cluster of processes that have
186///    opened the same file.
187///
188/// `WriteTxn::commit` finalises the transaction through the pager's
189/// WAL; `WriteTxn::rollback` discards pending writes.  Dropping an
190/// uncommitted `WriteTxn` rolls back automatically (and logs a
191/// `tracing` debug event, gated on the `tracing` feature, so the
192/// caller learns about the silent rollback).
193///
194/// `Send` (issue #18): every field is `Send` — `&TxnEnv` is
195/// `Send + Sync`, [`WriteSerialGuard`] owns an `Arc<AtomicBool>`, and
196/// [`WriterLock`] / [`HeaderSnapshot`] are `Send`.  Soundness rests on
197/// the single-writer invariant: at most one `WriteTxn` exists per env
198/// at a time (the gate enforces it), and every pager access re-locks
199/// the pager `Mutex` per-op, so there is no thread-affine state to
200/// violate when the handle moves between threads.
201///
202/// Generic over `F: FileBackend` (Rule 9: no `dyn`).
203#[derive(Debug)]
204pub struct WriteTxn<'db, F: FileBackend> {
205    env: &'db TxnEnv<F>,
206    /// In-process write-serialization guard.  Owns an `Arc<AtomicBool>`
207    /// (the env's gate), so it is `Send` — this is what makes the
208    /// whole `WriteTxn` `Send` (issue #18).  `Option` because `commit`
209    /// and `rollback` consume it before the txn is dropped.
210    write_guard: Option<WriteSerialGuard>,
211    /// Cross-process `WRITER_LOCK` guard.  `None` for envs without a
212    /// `lock_file`.  Released on drop or on explicit
213    /// `commit`/`rollback`.
214    writer_lock: Option<WriterLock>,
215    /// Snapshot of header fields the M5 catalog + freelist code
216    /// writes direct-to-disk, plus a clone of the WAL committed
217    /// view at txn begin.  Restored on rollback so a rolled-back
218    /// txn that mutated the catalog (via the M6 `obj::Db` public
219    /// API) does not leak a header that points at unwritten page
220    /// bodies, and does not leave the WAL view missing pages that
221    /// `free_page` removed.  `Option` because `commit`/`rollback`
222    /// take ownership of the snapshot.  See
223    /// `Pager::header_snapshot` for the rationale.
224    header_at_begin: Option<HeaderSnapshot>,
225    /// `true` once `commit` or `rollback` has run.  A `Drop` on a
226    /// committed/rolled-back txn is a no-op.
227    finished: bool,
228}
229
230impl<'db, F: FileBackend> WriteTxn<'db, F> {
231    /// Begin a new write transaction against `env`.
232    ///
233    /// Equivalent to [`Self::from_acquire`]`(env, `[`Self::acquire`]
234    /// `(env, timeout)?)`: it performs the two blocking lock acquires
235    /// (in-process gate, then cross-process `WRITER_LOCK`) and then
236    /// assembles the txn.  Callers that need to release a host runtime
237    /// lock (e.g. the Python GIL) across the blocking wait should call
238    /// [`Self::acquire`] / [`Self::from_acquire`] directly so the
239    /// blocking step runs without the host lock held.
240    ///
241    /// # Errors
242    ///
243    /// - [`Error::Busy`] with `LockKind::Writer` if the cross-
244    ///   process lock did not become available within `timeout`.
245    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if another
246    ///   thread in the same process is mid-write.
247    /// - [`Error::Io`] on lock syscall failure.
248    pub fn begin(env: &'db TxnEnv<F>, timeout: Duration) -> Result<Self> {
249        Self::from_acquire(env, Self::acquire(env, timeout)?)
250    }
251
252    /// Perform BOTH blocking write-lock acquires and return a `Send`
253    /// token, without touching the pager or borrowing the env's
254    /// lifetime beyond the call.
255    ///
256    /// Acquires the in-process serialization gate FIRST (bounded busy-
257    /// poll against `timeout`), THEN the cross-process `WRITER_LOCK`
258    /// (if `env.lock_file` is `Some`).  This order is load-bearing:
259    /// the cross-process OFD lock is per-fd and the whole process
260    /// shares one lock-file fd, so two same-process threads would BOTH
261    /// pass the cross-process lock — the in-process gate is the
262    /// authoritative same-process serializer and must win first.  On a
263    /// cross-process failure the in-process guard is dropped (releasing
264    /// the gate) before the error returns, exactly as `begin` did.
265    ///
266    /// The returned [`WriteAcquire`] owns both guards and is `Send`,
267    /// so the caller may move it across threads (e.g. acquire on a
268    /// worker thread with the Python GIL released).
269    ///
270    /// # Errors
271    ///
272    /// As [`Self::begin`].
273    pub fn acquire(env: &'db TxnEnv<F>, timeout: Duration) -> Result<WriteAcquire<F>> {
274        // In-process gate first (see method docs for why the order is
275        // load-bearing).
276        let write_guard = acquire_write_serialization(&env.write_serialization, timeout)?;
277        // Then the cross-process lock.  Use the full original timeout:
278        // the in-process acquire was already bounded by it, and a few
279        // ms of slack at worst is acceptable.
280        let writer_lock = match env.lock_file.as_ref() {
281            Some(handle) => match handle.lock_writer(timeout) {
282                Ok(g) => Some(g),
283                Err(e) => {
284                    // Drop the gate guard before returning so a failed
285                    // cross-process acquire does not leave the in-
286                    // process gate held — identical to the old `begin`.
287                    drop(write_guard);
288                    return Err(e);
289                }
290            },
291            None => None,
292        };
293        Ok(WriteAcquire {
294            write_guard,
295            writer_lock,
296            _backend: core::marker::PhantomData,
297        })
298    }
299
300    /// Assemble a `WriteTxn` from a previously-[`acquire`](Self::acquire)d
301    /// token.  This is the cheap, NON-blocking half of `begin`: it
302    /// briefly locks the pager to flip the txn-depth gauge and take the
303    /// header snapshot, then takes ownership of the token's guards.
304    ///
305    /// Holding the pager mutex here cannot deadlock: the caller already
306    /// owns the in-process gate (carried in `acq`), so no other
307    /// `WriteTxn` is alive, and the snapshot is cheap.
308    ///
309    /// # Errors
310    ///
311    /// - [`Error::Busy`] with `LockKind::WriterInProcess` if the pager
312    ///   mutex is poisoned.
313    pub fn from_acquire(env: &'db TxnEnv<F>, acq: WriteAcquire<F>) -> Result<Self> {
314        // M6 #51: flip the pager's txn-depth gauge so the Catalog's
315        // debug-assert at its mutation boundaries sees a non-zero
316        // depth; snapshot the header fields the catalog / freelist
317        // write direct-to-disk so rollback can restore them.
318        let header_at_begin = {
319            let mut pager = env.pager.lock().map_err(|_| Error::Busy {
320                kind: LockKind::WriterInProcess,
321            })?;
322            pager.begin_txn();
323            pager.header_snapshot()
324        };
325        Ok(Self {
326            env,
327            write_guard: Some(acq.write_guard),
328            writer_lock: acq.writer_lock,
329            header_at_begin: Some(header_at_begin),
330            finished: false,
331        })
332    }
333
334    /// Write `page` at `id` through the pager.
335    ///
336    /// # Errors
337    ///
338    /// - [`Error::InvalidArgument`] if `id` is out of range.
339    /// - [`Error::Io`] on syscall failure.
340    pub fn write_page(&self, id: PageId, page: &Page) -> Result<()> {
341        let mut pager = self.lock_pager()?;
342        pager.write_page(id, page)
343    }
344
345    /// Read `id` through the pager (sees pending + committed +
346    /// main).  Used inside a write txn that needs to read-modify-
347    /// write a page.  Returns an owned [`Page`] because the borrow
348    /// chain through the pager's mutex would otherwise tie the
349    /// returned reference to the guard.
350    ///
351    /// # Errors
352    ///
353    /// As [`Pager::read_page`].
354    pub fn read_page(&self, id: PageId) -> Result<Page> {
355        let mut pager = self.lock_pager()?;
356        let page_ref = pager.read_page(id)?;
357        Ok(page_ref.to_owned_page())
358    }
359
360    /// Allocate a new page through the pager.
361    ///
362    /// # Errors
363    ///
364    /// As [`Pager::alloc_page`].
365    pub fn alloc_page(&self) -> Result<PageId> {
366        let mut pager = self.lock_pager()?;
367        pager.alloc_page()
368    }
369
370    /// Acquire the pager mutex.  Bubble a poisoned mutex up as
371    /// `WriterInProcess` — every txn method that takes the pager
372    /// goes through here so the failure mode is uniform.
373    ///
374    /// # Errors
375    ///
376    /// Returns [`Error::Busy`] with `LockKind::WriterInProcess` if
377    /// the mutex is poisoned by a previous panic.
378    pub fn lock_pager(&self) -> Result<MutexGuard<'_, Pager<F>>> {
379        self.env.pager.lock().map_err(|_| Error::Busy {
380            kind: LockKind::WriterInProcess,
381        })
382    }
383
384    /// Access the underlying env.  Used by callers (e.g. `obj`
385    /// crate) that compose typed handles over the raw txn.
386    #[must_use]
387    pub fn env(&self) -> &'db TxnEnv<F> {
388        self.env
389    }
390
391    /// Commit the transaction.  Calls `Pager::commit` to make the
392    /// staged writes durable, then releases both lock layers.
393    ///
394    /// # Errors
395    ///
396    /// Returns whatever [`Pager::commit`] returns.
397    pub fn commit(mut self) -> Result<()> {
398        {
399            let mut pager = self.lock_pager()?;
400            let _lsn = pager.commit()?;
401            // M6 #51: pair with `begin_txn` from `WriteTxn::begin`.
402            pager.end_txn();
403        }
404        self.finished = true;
405        // Drop the in-process guard explicitly so the cross-process
406        // lock's release order is deterministic.
407        self.write_guard.take();
408        // Snapshot drops here — no rollback needed on commit.
409        self.header_at_begin.take();
410        // `writer_lock`'s `Drop` releases the OFD lock; nothing
411        // forces us to call `release()` here.  We do call it so a
412        // release error can be surfaced (rather than swallowed by
413        // `Drop`).
414        if let Some(lock) = self.writer_lock.take() {
415            lock.release()?;
416        }
417        Ok(())
418    }
419
420    /// Roll the transaction back, dropping all pending writes.
421    ///
422    /// # Errors
423    ///
424    /// Returns [`Error::Busy`] only if the in-process pager mutex
425    /// is poisoned; otherwise infallible.
426    pub fn rollback(mut self) -> Result<()> {
427        let snap = self.header_at_begin.take();
428        {
429            let mut pager = self.lock_pager()?;
430            rollback_pending(&mut pager);
431            // Restore header fields that the catalog / freelist
432            // code wrote direct-to-disk during the rolled-back
433            // closure.
434            if let Some(s) = snap {
435                pager.restore_header_snapshot(s)?;
436            }
437            // M6 #51: pair with `begin_txn` from `WriteTxn::begin`.
438            pager.end_txn();
439        }
440        self.finished = true;
441        self.write_guard.take();
442        if let Some(lock) = self.writer_lock.take() {
443            lock.release()?;
444        }
445        Ok(())
446    }
447}
448
449impl<F: FileBackend> Drop for WriteTxn<'_, F> {
450    fn drop(&mut self) {
451        if self.finished {
452            return;
453        }
454        // Uncommitted txn — roll back.  We can't return errors from
455        // `Drop`; best-effort drop the pending buffer + restore the
456        // header snapshot. #55: a library must not write to stderr, so
457        // the implicit-rollback diagnostic is routed to the `tracing`
458        // facade (gated on the `tracing` feature) instead of `eprintln!`.
459        // The early-return rollback pattern is legitimate, so this is a
460        // debug-level event, not a warning.
461        let snap = self.header_at_begin.take();
462        if let Ok(mut pager) = self.env.pager.lock() {
463            rollback_pending(&mut pager);
464            if let Some(s) = snap {
465                let _ = pager.restore_header_snapshot(s);
466            }
467            // M6 #51: pair with `begin_txn` from `WriteTxn::begin`.
468            pager.end_txn();
469        }
470        #[cfg(feature = "tracing")]
471        tracing::debug!("WriteTxn dropped without commit/rollback; pending writes discarded");
472        // Both `write_guard` and `writer_lock` release on drop.
473    }
474}
475
476/// Discard the pager's pending-transaction buffer.  Idempotent.
477///
478/// The pager exposes `commit` / `checkpoint` but no explicit
479/// rollback — pending writes simply never make it into the WAL.
480/// This helper drains the in-memory pending map so a re-entered
481/// `WriteTxn` on the same pager observes a clean slate.
482fn rollback_pending<F: FileBackend>(pager: &mut Pager<F>) {
483    pager.rollback_pending_writes();
484}
485
486/// Busy-poll a CAS on the in-process gate (`AtomicBool`) until it is
487/// acquired or `timeout` elapses, returning an owning
488/// [`WriteSerialGuard`].  Power-of-ten Rule 2: the loop is bounded by
489/// the deadline AND a defensive iter counter; a return of
490/// `Err(Error::Busy{WriterInProcess})` is the surfaced alternative to
491/// blocking forever.
492///
493/// The backoff schedule mirrors the prior `Mutex<()>` polyfill and
494/// `platform::lock`'s file-lock retry exactly: 1 ms initial, doubled
495/// per miss, capped at 100 ms.  Average overhead under uncontended
496/// load is one `compare_exchange_weak`.
497///
498/// Memory ordering: success uses `Acquire` so the winner observes the
499/// previous holder's writes (paired with the `Release` store in
500/// [`WriteSerialGuard::drop`]); failure uses `Relaxed` (a failed CAS
501/// synchronizes nothing).  `compare_exchange_weak` is used because the
502/// retry loop tolerates spurious failures and it is cheaper on some
503/// targets.
504///
505/// Unlike the old `Mutex<()>`, the gate cannot poison — there is no
506/// `Poisoned` arm; a panicking writer's guard `Drop` simply frees the
507/// gate (see the module-level "In-process gate" docs).
508fn acquire_write_serialization(
509    gate: &Arc<AtomicBool>,
510    timeout: Duration,
511) -> Result<WriteSerialGuard> {
512    let start = std::time::Instant::now();
513    let mut backoff = Duration::from_millis(1);
514    let max_backoff = Duration::from_millis(100);
515    // Rule 2 upper bound: the deadline check is the canonical exit;
516    // the iter counter is defensive.
517    let timeout_millis = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
518    let mut iters: u64 = 0;
519    let max_iters = timeout_millis.saturating_add(64);
520    loop {
521        iters = iters.saturating_add(1);
522        if iters > max_iters {
523            return Err(Error::Busy {
524                kind: LockKind::WriterInProcess,
525            });
526        }
527        if gate
528            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
529            .is_ok()
530        {
531            return Ok(WriteSerialGuard {
532                gate: Arc::clone(gate),
533            });
534        }
535        if start.elapsed() >= timeout {
536            return Err(Error::Busy {
537                kind: LockKind::WriterInProcess,
538            });
539        }
540        std::thread::sleep(backoff);
541        backoff = (backoff * 2).min(max_backoff);
542    }
543}
544
545/// A read transaction.
546///
547/// Construct via [`ReadTxn::begin`].  Holds:
548/// 1. A [`ReaderSnapshot`] pinning the WAL end-LSN at construction.
549/// 2. An optional cross-process `READER_LOCK` byte (shared with
550///    other readers but not exclusive with writers — see
551///    `docs/format.md` § File locking).
552///
553/// Reads inside a `ReadTxn` see a consistent snapshot of the
554/// database; pending writes from a concurrent `WriteTxn` and frames
555/// committed AFTER `ReadTxn::begin` are invisible.
556#[derive(Debug)]
557pub struct ReadTxn<'db, F: FileBackend> {
558    env: &'db TxnEnv<F>,
559    snapshot: ReaderSnapshot<F>,
560    _reader_lock: Option<ReaderLock>,
561}
562
563impl<'db, F: FileBackend> ReadTxn<'db, F> {
564    /// Begin a new read transaction.
565    ///
566    /// Acquires a cross-process reader-lock slot (if `env.lock_file`
567    /// is `Some`) with the env's default busy timeout, then takes a
568    /// [`ReaderSnapshot`] from the pager.  Readers do not contend
569    /// with each other (31 shared slots) and do not contend with
570    /// writers on the byte-range layer.
571    ///
572    /// # Errors
573    ///
574    /// - [`Error::Busy`] with `LockKind::Reader` on lock timeout.
575    /// - [`Error::Io`] on lock or pager syscall failure.
576    pub fn begin(env: &'db TxnEnv<F>) -> Result<Self> {
577        Self::begin_with_timeout(env, DEFAULT_BUSY_TIMEOUT)
578    }
579
580    /// As [`begin`](Self::begin) with a caller-supplied timeout.
581    ///
582    /// # Errors
583    ///
584    /// See [`begin`](Self::begin).
585    pub fn begin_with_timeout(env: &'db TxnEnv<F>, timeout: Duration) -> Result<Self> {
586        let reader_lock = match env.lock_file.as_ref() {
587            Some(handle) => Some(handle.lock_reader(timeout)?),
588            None => None,
589        };
590        let snapshot = {
591            let mut pager = env.pager.lock().map_err(|_| Error::Busy {
592                kind: LockKind::WriterInProcess,
593            })?;
594            pager.reader_snapshot()?
595        };
596        Ok(Self {
597            env,
598            snapshot,
599            _reader_lock: reader_lock,
600        })
601    }
602
603    /// LSN this txn's snapshot pinned.  Diagnostic-only.
604    #[must_use]
605    pub fn pinned_lsn(&self) -> crate::wal::Lsn {
606        self.snapshot.pinned_lsn()
607    }
608
609    /// Read page `id` consistent with the txn's snapshot.
610    ///
611    /// # Errors
612    ///
613    /// As [`ReaderSnapshot::read_page`].
614    pub fn read_page(&self, id: PageId) -> Result<Page> {
615        let pager = self.env.pager.lock().map_err(|_| Error::Busy {
616            kind: LockKind::WriterInProcess,
617        })?;
618        // #81: this public API returns an OWNED `Page`, and the pager
619        // lock guard cannot outlive this call, so we must materialise
620        // an owned page here. `into_page` clones the body only on the
621        // `Shared` arm (and only at this ownership boundary); the hot
622        // decode paths (`get_via_snapshot`, `lookup_via_snapshot`,
623        // range scans) keep the handle and call `as_bytes` instead.
624        Ok(self.snapshot.read_page(&pager, id)?.into_page())
625    }
626
627    /// Access the underlying snapshot.  Used by `obj::Db` (M6 issue
628    /// #47) to dispatch typed reads on a read txn.
629    #[must_use]
630    pub fn snapshot(&self) -> &ReaderSnapshot<F> {
631        &self.snapshot
632    }
633
634    /// Access the env this txn lives in.  Used by `obj::Db` to
635    /// access the pager mutex.
636    #[must_use]
637    pub fn env(&self) -> &TxnEnv<F> {
638        self.env
639    }
640
641    /// Drop the txn explicitly (releases the snapshot pin and the
642    /// reader lock).
643    pub fn end(self) {
644        drop(self);
645    }
646}
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use crate::pager::page::Page;
652    use crate::pager::Config;
653    use crate::platform::FileHandle;
654    use std::sync::Arc;
655    use std::thread;
656    use tempfile::TempDir;
657
658    fn build_env(dir: &TempDir) -> (TxnEnv<FileHandle>, PageId) {
659        let path = dir.path().join("txn.obj");
660        let mut pager = Pager::open(&path, Config::default()).expect("pager");
661        // #64: enter a Pager txn so alloc_page's debug_assert passes.
662        // commit() inside the txn flushes; end_txn balances the
663        // begin_txn so subsequent WriteTxn::begin sees txn_depth=0.
664        pager.begin_txn();
665        let a = pager.alloc_page().expect("alloc");
666        let mut page = Page::zeroed();
667        page.as_bytes_mut()[0] = 0;
668        pager.write_page(a, &page).expect("write");
669        let _ = pager.commit().expect("commit");
670        pager.end_txn();
671        // Issue #1: lock byte lives in the `<db>.obj-lock`
672        // sidecar, not the main file. Mirror the production
673        // wiring so this test exercises the same path layout.
674        let lock_path = crate::pager::lock_path_for(&path);
675        let lock_file = Arc::new(FileHandle::open_or_create(&lock_path).expect("lock file"));
676        lock_file.set_len(128).expect("lock sidecar len");
677        (TxnEnv::new(pager, Some(lock_file)), a)
678    }
679
680    #[test]
681    fn write_txn_commit_makes_writes_visible() {
682        let dir = TempDir::new().expect("tmp");
683        let (env, a) = build_env(&dir);
684        let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
685        let mut page = Page::zeroed();
686        page.as_bytes_mut()[0] = 0x77;
687        tx.write_page(a, &page).expect("write");
688        tx.commit().expect("commit");
689
690        let rx = ReadTxn::begin(&env).expect("read");
691        let observed = rx.read_page(a).expect("read");
692        assert_eq!(observed.as_bytes()[0], 0x77);
693    }
694
695    #[test]
696    fn write_txn_rollback_drops_writes() {
697        let dir = TempDir::new().expect("tmp");
698        let (env, a) = build_env(&dir);
699        let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
700        let mut page = Page::zeroed();
701        page.as_bytes_mut()[0] = 0x99;
702        tx.write_page(a, &page).expect("write");
703        tx.rollback().expect("rollback");
704
705        let rx = ReadTxn::begin(&env).expect("read");
706        let observed = rx.read_page(a).expect("read");
707        assert_eq!(observed.as_bytes()[0], 0, "rollback must discard writes");
708    }
709
710    #[test]
711    fn in_process_writers_serialize() {
712        let dir = TempDir::new().expect("tmp");
713        let (env, _a) = build_env(&dir);
714        let tx1 = WriteTxn::begin(&env, Duration::from_millis(50)).expect("tx1");
715        let err = WriteTxn::begin(&env, Duration::from_millis(10)).expect_err("tx2 busy");
716        assert!(matches!(
717            err,
718            Error::Busy {
719                kind: LockKind::WriterInProcess
720            }
721        ));
722        tx1.commit().expect("commit");
723        // After commit, the next txn must succeed.
724        let _tx3 = WriteTxn::begin(&env, Duration::from_millis(50)).expect("tx3");
725    }
726
727    /// Issue #18: `WriteTxn` (and the new `WriteAcquire` /
728    /// `WriteSerialGuard`) must be `Send` so obj-py can move the
729    /// blocking acquire across an `allow_threads` boundary.  A
730    /// compile-time assertion via a generic `fn`; the `let _ =` keeps
731    /// it from being dead-code-eliminated.
732    #[test]
733    fn write_txn_is_send() {
734        fn assert_send<T: Send>() {}
735        assert_send::<WriteTxn<'_, FileHandle>>();
736        assert_send::<WriteAcquire<FileHandle>>();
737        assert_send::<WriteSerialGuard>();
738        // ReadTxn is already Send; assert it stays so.
739        assert_send::<ReadTxn<'_, FileHandle>>();
740    }
741
742    /// Issue #18: the gate does not poison.  If a writer panics
743    /// mid-transaction, unwinding drops the `WriteTxn` (rollback) then
744    /// the `WriteSerialGuard` (releases the gate), so the NEXT writer
745    /// proceeds — against the rolled-back, consistent state.  The old
746    /// `Mutex<()>` would have poisoned and turned every later `begin`
747    /// into a permanent `Busy{WriterInProcess}`.
748    #[test]
749    fn panic_in_writer_releases_gate_and_rolls_back() {
750        let dir = TempDir::new().expect("tmp");
751        let (env, a) = build_env(&dir);
752        let env = Arc::new(env);
753        // Panic while holding a WriteTxn that has staged a write.
754        let env_for_panic = Arc::clone(&env);
755        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
756            let tx = WriteTxn::begin(&env_for_panic, Duration::from_millis(50)).expect("begin");
757            let mut page = Page::zeroed();
758            page.as_bytes_mut()[0] = 0xEE;
759            tx.write_page(a, &page).expect("write");
760            panic!("simulated mid-write crash");
761        }));
762        assert!(result.is_err(), "the closure must have panicked");
763        // The gate is free: the next writer acquires immediately (no
764        // permanent Busy from poisoning).
765        let tx2 = WriteTxn::begin(&env, Duration::from_millis(50))
766            .expect("gate must be free after a panicking writer");
767        tx2.commit().expect("commit");
768        // The panicking writer's staged write was rolled back: the
769        // page reads as its pre-panic value (0), not 0xEE.
770        let rx = ReadTxn::begin(&env).expect("read");
771        let observed = rx.read_page(a).expect("read");
772        assert_eq!(
773            observed.as_bytes()[0],
774            0,
775            "panicking writer's staged write must have been rolled back",
776        );
777    }
778
779    /// 4 writer threads × 1000 iterations each — N writers
780    /// serialize via the in-process Mutex + the cross-process
781    /// `WRITER_LOCK`.  Every txn must succeed (no deadlock; no
782    /// aborted txn).
783    #[test]
784    fn n_writers_serialize_with_no_deadlock() {
785        let dir = TempDir::new().expect("tmp");
786        let path = dir.path().join("stress.obj");
787        let mut pager = Pager::open(&path, Config::default()).expect("pager");
788        // #64: wrap alloc_page in a Pager txn so the header-routing
789        // assert passes. end_txn before handing to TxnEnv so writers
790        // get a fresh txn-depth.
791        pager.begin_txn();
792        let a = pager.alloc_page().expect("alloc");
793        let mut page = Page::zeroed();
794        page.as_bytes_mut()[0] = 0;
795        pager.write_page(a, &page).expect("write");
796        let _ = pager.commit().expect("commit");
797        pager.end_txn();
798        // Issue #1: sidecar lock path, see `build_env` above.
799        let lock_path = crate::pager::lock_path_for(&path);
800        let lock_file = Arc::new(FileHandle::open_or_create(&lock_path).expect("lock"));
801        lock_file.set_len(128).expect("lock sidecar len");
802        let env = Arc::new(TxnEnv::new(pager, Some(lock_file)));
803
804        let n_writers = 4usize;
805        let iters_per_writer = 250u32;
806        thread::scope(|scope| {
807            let mut handles = Vec::with_capacity(n_writers);
808            for w in 0..n_writers {
809                let env = Arc::clone(&env);
810                handles.push(scope.spawn(move || {
811                    for i in 0..iters_per_writer {
812                        // Generous timeout because the writer needs
813                        // to block on the other writers in the worst
814                        // case.
815                        let tx = WriteTxn::begin(&env, Duration::from_secs(30))
816                            .expect("begin under load");
817                        let mut p = Page::zeroed();
818                        p.as_bytes_mut()[0] =
819                            u8::try_from((w * 1000 + i as usize) % 250 + 1).expect("byte fits");
820                        tx.write_page(a, &p).expect("write");
821                        tx.commit().expect("commit");
822                    }
823                }));
824            }
825            for h in handles {
826                h.join().expect("join");
827            }
828        });
829
830        // Read back from a fresh read txn — must see SOME writer's
831        // last value; cross-process serialization ensures no torn
832        // page.
833        let rx = ReadTxn::begin(&env).expect("read");
834        let p = rx.read_page(a).expect("read");
835        assert_ne!(p.as_bytes()[0], 0, "some writer's value must be visible");
836    }
837
838    #[test]
839    fn drop_without_commit_warns_and_rolls_back() {
840        let dir = TempDir::new().expect("tmp");
841        let (env, a) = build_env(&dir);
842        {
843            let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
844            let mut page = Page::zeroed();
845            page.as_bytes_mut()[0] = 0xAB;
846            tx.write_page(a, &page).expect("write");
847            // Drop without commit — implicit rollback.
848        }
849        let rx = ReadTxn::begin(&env).expect("read");
850        let observed = rx.read_page(a).expect("read");
851        assert_eq!(
852            observed.as_bytes()[0],
853            0,
854            "drop-without-commit must roll back",
855        );
856    }
857
858    #[test]
859    fn read_txn_sees_consistent_snapshot() {
860        let dir = TempDir::new().expect("tmp");
861        let (env, a) = build_env(&dir);
862
863        // Reader takes its snapshot now (sees byte 0).
864        let rx = ReadTxn::begin(&env).expect("read");
865
866        // Writer commits a new version.
867        {
868            let tx = WriteTxn::begin(&env, Duration::from_millis(50)).expect("write");
869            let mut p = Page::zeroed();
870            p.as_bytes_mut()[0] = 0x55;
871            tx.write_page(a, &p).expect("write");
872            tx.commit().expect("commit");
873        }
874
875        // Reader STILL sees byte 0 from its pinned snapshot.
876        let observed = rx.read_page(a).expect("read");
877        assert_eq!(
878            observed.as_bytes()[0],
879            0,
880            "snapshot must isolate reader from concurrent commits",
881        );
882    }
883}