obj_core/txn.rs
1//! Transaction layer (L7).
2//!
3//! Wraps the [`Pager`] + cross-process file locks + reader snapshots
4//! into a write- / read-transaction abstraction. Single-writer
5//! model: a [`WriteTxn`] holds (a) the in-process write-
6//! serialization gate on the pager-shared [`TxnEnv`] and (b) the
7//! cross-process `WRITER_LOCK` byte (when the env was constructed
8//! with a lock file). [`ReadTxn`] holds a shared reader lock byte
9//! and a [`ReaderSnapshot`]; readers do not contend with each other
10//! and do not block writers.
11//!
12//! This module exposes the building blocks; M6 issue #47 wraps the
13//! result as `obj::Db`.
14//!
15//! # In-process write-serialization gate (issue #18)
16//!
17//! The gate is an [`AtomicBool`] behind an `Arc`, NOT a `Mutex<()>`.
18//! An acquired
19//! [`WriteSerialGuard`] OWNS a clone of that `Arc` and, on `Drop`,
20//! `store(false, Release)`s the flag. Because the guard owns the
21//! `Arc` (rather than borrowing the env), it is `Send + 'static`,
22//! which in turn makes [`WriteTxn`] `Send` — letting the obj-py
23//! binding release the GIL across the blocking lock-acquire.
24//!
25//! **No poisoning (deliberate, and strictly better).** A `Mutex<()>`
26//! poisons if a thread panics while holding the guard, turning every
27//! subsequent `WriteTxn::begin` into a permanent
28//! `Busy{WriterInProcess}`. The `AtomicBool` gate has no such state:
29//! if a writer panics mid-transaction, unwinding drops the
30//! [`WriteTxn`] (whose `Drop` rolls back — restoring
31//! `header_at_begin` so the pager is left at consistent committed
32//! state) and then drops the [`WriteSerialGuard`] (which releases the
33//! gate). The next writer proceeds against that consistent state.
34//! This replaces a permanent-Busy failure mode with a
35//! recover-and-continue one.
36//!
37//! # Power-of-ten posture
38//!
39//! - **Rule 4.** Public methods on `WriteTxn` / `ReadTxn` are short
40//! delegations to the pager.
//! - **Rule 5.** `WriteTxn::commit` and `WriteTxn::rollback` set an
//!   internal `finished` flag once the pager-side work has succeeded,
//!   so a subsequent `Drop` on a finished txn cannot roll back; the
//!   `Drop` impl checks the flag and returns early.
45//! - **Rule 7.** No `unwrap` / `expect` in production paths. A
46//! poisoned pager mutex surfaces as [`Error::Busy`] with
47//! `LockKind::WriterInProcess` rather than a panic; the gate itself
48//! cannot poison.
49//! - **Rule 9.** No `dyn` — `WriteTxn<'db, F: FileBackend>` and
50//! `ReadTxn<'db, F: FileBackend>` are monomorphised.
51
52#![forbid(unsafe_code)]
53
54use core::sync::atomic::{AtomicBool, Ordering};
55use std::sync::{Arc, Mutex, MutexGuard};
56use std::time::Duration;
57
58use crate::error::{Error, LockKind, Result};
59use crate::pager::page::{Page, PageId};
60use crate::pager::{HeaderSnapshot, Pager, ReaderSnapshot};
61use crate::platform::{FileBackend, FileHandle, ReaderLock, WriterLock};
62
/// Busy timeout used when a caller does not supply a per-call
/// deadline (in this file: [`ReadTxn::begin`]). Five seconds; per the
/// original author's note this mirrors `SQLite`'s default and the
/// design.md `Config::busy_timeout` proposal.
pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_millis(5_000);
68
/// Environment shared by every [`WriteTxn`] / [`ReadTxn`] in a
/// process. Holds the pager (behind an `Arc<Mutex<_>>`), the in-
/// process write-serialization gate (an `Arc<AtomicBool>`, not a
/// mutex), and — for file-backed databases — an optional
/// [`FileHandle`] used for cross-process byte-range locking.
///
/// `TxnEnv` is intended to be `Send + Sync`; construct one and share
/// it via `Arc` across threads (or across an `obj::Db` whose
/// ownership wraps it).
#[derive(Debug)]
pub struct TxnEnv<F: FileBackend = FileHandle> {
    /// The pager. Behind a `Mutex` so cache mutations on a cache miss
    /// stay sound under concurrent reader-snapshot reads. In M6 the
    /// scaling is limited by this mutex; lock-free read paths are
    /// future work (post-M6).
    pager: Arc<Mutex<Pager<F>>>,
    /// In-process writer-serialization gate. `false` = free,
    /// `true` = held. A `WriteTxn` holds a [`WriteSerialGuard`] (which
    /// owns a clone of this `Arc`) for its entire lifetime, so at most
    /// one `WriteTxn` is alive per env per process at a time. Behind
    /// an `Arc` so the guard can OWN it and therefore be `Send`
    /// (issue #18); an `AtomicBool` rather than a `Mutex<()>` so it
    /// cannot poison (see the module-level "In-process
    /// write-serialization gate" docs).
    write_serialization: Arc<AtomicBool>,
    /// File handle held for cross-process locking. `None` for in-
    /// memory envs (no file to lock) or for callers that explicitly
    /// disable the cross-process lock (e.g. fault-injection tests
    /// where the file-backend type is a harness rather than the
    /// production `FileHandle`). The handle owns its own fd so
    /// locking calls do not need the pager's mutex.
    lock_file: Option<Arc<FileHandle>>,
}
100
101impl<F: FileBackend> TxnEnv<F> {
102 /// Construct an env wrapping the given pager. `lock_file` is an
103 /// optional dedicated file handle for cross-process locks; pass
104 /// `None` for in-memory or fault-injection environments.
105 #[must_use]
106 pub fn new(pager: Pager<F>, lock_file: Option<Arc<FileHandle>>) -> Self {
107 Self {
108 pager: Arc::new(Mutex::new(pager)),
109 write_serialization: Arc::new(AtomicBool::new(false)),
110 lock_file,
111 }
112 }
113
114 /// Shared access to the pager's `Arc<Mutex<_>>`. Used by the
115 /// `obj::Db` wrapper (M6 issue #47) to dispatch reads inside a
116 /// `ReadTxn` closure. Exposed at the txn boundary so the M6
117 /// stress test can sample state without a full `WriteTxn`.
118 #[must_use]
119 pub fn pager(&self) -> &Arc<Mutex<Pager<F>>> {
120 &self.pager
121 }
122}
123
/// Owning guard for the in-process write-serialization gate.
///
/// The guard stores a clone of the env's `Arc<AtomicBool>` rather than
/// a borrow of the env, which is what lets it be `Send + 'static`. It
/// is built only by the crate-internal acquire routine after that
/// routine has flipped the flag to `true` with a CAS. Dropping the
/// guard stores `false` with `Release` ordering, so a release on one
/// thread is visible to an acquirer that CASes (with `Acquire`) on
/// another.
///
/// The CAS admits one winner at a time, so at most one
/// `WriteSerialGuard` exists per env — the single-writer invariant.
#[derive(Debug)]
#[must_use = "the gate is released when the WriteSerialGuard drops"]
pub struct WriteSerialGuard {
    gate: Arc<AtomicBool>,
}

impl Drop for WriteSerialGuard {
    fn drop(&mut self) {
        // Free the gate. The `Release` store pairs with the `Acquire`
        // success ordering of the acquiring CAS, so the next writer
        // observes everything this writer did (including a
        // `Drop`-time rollback). Unwinding from a panic runs this too,
        // which is why the gate cannot get stuck "poisoned".
        let Self { gate } = self;
        gate.store(false, Ordering::Release);
    }
}
151
152/// A `Send` token holding BOTH blocking-acquired write locks, with no
153/// borrow of the env.
154///
155/// Returned by [`WriteTxn::acquire`] and consumed by
156/// [`WriteTxn::from_acquire`]. Splitting the blocking acquisition
157/// (which yields this `Send` token) from the cheap, non-blocking txn
158/// assembly lets a caller (obj-py) run the acquire on a worker thread
159/// with the Python GIL released, then build the `!'static` `WriteTxn`
160/// after re-taking the GIL.
161///
162/// Holds the in-process [`WriteSerialGuard`] and the optional
163/// cross-process [`WriterLock`]; both are `Send`, so this token is
164/// `Send`. Generic over `F` only to thread the same backend type
165/// through to [`WriteTxn::from_acquire`]; the token itself stores no
166/// `F` value, so it carries a `PhantomData<fn() -> F>` to stay
167/// covariant-free and `Send` regardless of `F`.
168#[derive(Debug)]
169#[must_use = "a WriteAcquire holds the write locks until consumed by WriteTxn::from_acquire"]
170pub struct WriteAcquire<F: FileBackend> {
171 write_guard: WriteSerialGuard,
172 writer_lock: Option<WriterLock>,
173 _backend: core::marker::PhantomData<fn() -> F>,
174}
175
176/// A write transaction.
177///
178/// Construct via [`WriteTxn::begin`] (or, to release a host lock
179/// across the blocking acquire, [`WriteTxn::acquire`] +
180/// [`WriteTxn::from_acquire`]). Holds, for its entire lifetime:
181/// 1. A [`WriteSerialGuard`] on the env's in-process write-
182/// serialization gate — ensures at most one `WriteTxn` per env per
183/// process.
184/// 2. An optional cross-process `WRITER_LOCK` byte — ensures at most
185/// one `WriteTxn` across the cluster of processes that have
186/// opened the same file.
187///
188/// `WriteTxn::commit` finalises the transaction through the pager's
189/// WAL; `WriteTxn::rollback` discards pending writes. Dropping an
190/// uncommitted `WriteTxn` rolls back automatically (and logs a
191/// `tracing` debug event, gated on the `tracing` feature, so the
192/// caller learns about the silent rollback).
193///
194/// `Send` (issue #18): every field is `Send` — `&TxnEnv` is
195/// `Send + Sync`, [`WriteSerialGuard`] owns an `Arc<AtomicBool>`, and
196/// [`WriterLock`] / [`HeaderSnapshot`] are `Send`. Soundness rests on
197/// the single-writer invariant: at most one `WriteTxn` exists per env
198/// at a time (the gate enforces it), and every pager access re-locks
199/// the pager `Mutex` per-op, so there is no thread-affine state to
200/// violate when the handle moves between threads.
201///
202/// Generic over `F: FileBackend` (Rule 9: no `dyn`).
203#[derive(Debug)]
204pub struct WriteTxn<'db, F: FileBackend> {
205 env: &'db TxnEnv<F>,
206 /// In-process write-serialization guard. Owns an `Arc<AtomicBool>`
207 /// (the env's gate), so it is `Send` — this is what makes the
208 /// whole `WriteTxn` `Send` (issue #18). `Option` because `commit`
209 /// and `rollback` consume it before the txn is dropped.
210 write_guard: Option<WriteSerialGuard>,
211 /// Cross-process `WRITER_LOCK` guard. `None` for envs without a
212 /// `lock_file`. Released on drop or on explicit
213 /// `commit`/`rollback`.
214 writer_lock: Option<WriterLock>,
215 /// Snapshot of header fields the M5 catalog + freelist code
216 /// writes direct-to-disk, plus a clone of the WAL committed
217 /// view at txn begin. Restored on rollback so a rolled-back
218 /// txn that mutated the catalog (via the M6 `obj::Db` public
219 /// API) does not leak a header that points at unwritten page
220 /// bodies, and does not leave the WAL view missing pages that
221 /// `free_page` removed. `Option` because `commit`/`rollback`
222 /// take ownership of the snapshot. See
223 /// `Pager::header_snapshot` for the rationale.
224 header_at_begin: Option<HeaderSnapshot>,
225 /// `true` once `commit` or `rollback` has run. A `Drop` on a
226 /// committed/rolled-back txn is a no-op.
227 finished: bool,
228}
229
230impl<'db, F: FileBackend> WriteTxn<'db, F> {
231 /// Begin a new write transaction against `env`.
232 ///
233 /// Equivalent to [`Self::from_acquire`]`(env, `[`Self::acquire`]
234 /// `(env, timeout)?)`: it performs the two blocking lock acquires
235 /// (in-process gate, then cross-process `WRITER_LOCK`) and then
236 /// assembles the txn. Callers that need to release a host runtime
237 /// lock (e.g. the Python GIL) across the blocking wait should call
238 /// [`Self::acquire`] / [`Self::from_acquire`] directly so the
239 /// blocking step runs without the host lock held.
240 ///
241 /// # Errors
242 ///
243 /// - [`Error::Busy`] with `LockKind::Writer` if the cross-
244 /// process lock did not become available within `timeout`.
245 /// - [`Error::Busy`] with `LockKind::WriterInProcess` if another
246 /// thread in the same process is mid-write.
247 /// - [`Error::Io`] on lock syscall failure.
248 pub fn begin(env: &'db TxnEnv<F>, timeout: Duration) -> Result<Self> {
249 Self::from_acquire(env, Self::acquire(env, timeout)?)
250 }
251
252 /// Perform BOTH blocking write-lock acquires and return a `Send`
253 /// token, without touching the pager or borrowing the env's
254 /// lifetime beyond the call.
255 ///
256 /// Acquires the in-process serialization gate FIRST (bounded busy-
257 /// poll against `timeout`), THEN the cross-process `WRITER_LOCK`
258 /// (if `env.lock_file` is `Some`). This order is load-bearing:
259 /// the cross-process OFD lock is per-fd and the whole process
260 /// shares one lock-file fd, so two same-process threads would BOTH
261 /// pass the cross-process lock — the in-process gate is the
262 /// authoritative same-process serializer and must win first. On a
263 /// cross-process failure the in-process guard is dropped (releasing
264 /// the gate) before the error returns, exactly as `begin` did.
265 ///
266 /// The returned [`WriteAcquire`] owns both guards and is `Send`,
267 /// so the caller may move it across threads (e.g. acquire on a
268 /// worker thread with the Python GIL released).
269 ///
270 /// # Errors
271 ///
272 /// As [`Self::begin`].
273 pub fn acquire(env: &'db TxnEnv<F>, timeout: Duration) -> Result<WriteAcquire<F>> {
274 // In-process gate first (see method docs for why the order is
275 // load-bearing).
276 let write_guard = acquire_write_serialization(&env.write_serialization, timeout)?;
277 // Then the cross-process lock. Use the full original timeout:
278 // the in-process acquire was already bounded by it, and a few
279 // ms of slack at worst is acceptable.
280 let writer_lock = match env.lock_file.as_ref() {
281 Some(handle) => match handle.lock_writer(timeout) {
282 Ok(g) => Some(g),
283 Err(e) => {
284 // Drop the gate guard before returning so a failed
285 // cross-process acquire does not leave the in-
286 // process gate held — identical to the old `begin`.
287 drop(write_guard);
288 return Err(e);
289 }
290 },
291 None => None,
292 };
293 Ok(WriteAcquire {
294 write_guard,
295 writer_lock,
296 _backend: core::marker::PhantomData,
297 })
298 }
299
300 /// Assemble a `WriteTxn` from a previously-[`acquire`](Self::acquire)d
301 /// token. This is the cheap, NON-blocking half of `begin`: it
302 /// briefly locks the pager to flip the txn-depth gauge and take the
303 /// header snapshot, then takes ownership of the token's guards.
304 ///
305 /// Holding the pager mutex here cannot deadlock: the caller already
306 /// owns the in-process gate (carried in `acq`), so no other
307 /// `WriteTxn` is alive, and the snapshot is cheap.
308 ///
309 /// # Errors
310 ///
311 /// - [`Error::Busy`] with `LockKind::WriterInProcess` if the pager
312 /// mutex is poisoned.
313 pub fn from_acquire(env: &'db TxnEnv<F>, acq: WriteAcquire<F>) -> Result<Self> {
314 // M6 #51: flip the pager's txn-depth gauge so the Catalog's
315 // debug-assert at its mutation boundaries sees a non-zero
316 // depth; snapshot the header fields the catalog / freelist
317 // write direct-to-disk so rollback can restore them.
318 let header_at_begin = {
319 let mut pager = env.pager.lock().map_err(|_| Error::Busy {
320 kind: LockKind::WriterInProcess,
321 })?;
322 pager.begin_txn();
323 pager.header_snapshot()
324 };
325 Ok(Self {
326 env,
327 write_guard: Some(acq.write_guard),
328 writer_lock: acq.writer_lock,
329 header_at_begin: Some(header_at_begin),
330 finished: false,
331 })
332 }
333
334 /// Write `page` at `id` through the pager.
335 ///
336 /// # Errors
337 ///
338 /// - [`Error::InvalidArgument`] if `id` is out of range.
339 /// - [`Error::Io`] on syscall failure.
340 pub fn write_page(&self, id: PageId, page: &Page) -> Result<()> {
341 let mut pager = self.lock_pager()?;
342 pager.write_page(id, page)
343 }
344
345 /// Read `id` through the pager (sees pending + committed +
346 /// main). Used inside a write txn that needs to read-modify-
347 /// write a page. Returns an owned [`Page`] because the borrow
348 /// chain through the pager's mutex would otherwise tie the
349 /// returned reference to the guard.
350 ///
351 /// # Errors
352 ///
353 /// As [`Pager::read_page`].
354 pub fn read_page(&self, id: PageId) -> Result<Page> {
355 let mut pager = self.lock_pager()?;
356 let page_ref = pager.read_page(id)?;
357 Ok(page_ref.to_owned_page())
358 }
359
360 /// Allocate a new page through the pager.
361 ///
362 /// # Errors
363 ///
364 /// As [`Pager::alloc_page`].
365 pub fn alloc_page(&self) -> Result<PageId> {
366 let mut pager = self.lock_pager()?;
367 pager.alloc_page()
368 }
369
370 /// Acquire the pager mutex. Bubble a poisoned mutex up as
371 /// `WriterInProcess` — every txn method that takes the pager
372 /// goes through here so the failure mode is uniform.
373 ///
374 /// # Errors
375 ///
376 /// Returns [`Error::Busy`] with `LockKind::WriterInProcess` if
377 /// the mutex is poisoned by a previous panic.
378 pub fn lock_pager(&self) -> Result<MutexGuard<'_, Pager<F>>> {
379 self.env.pager.lock().map_err(|_| Error::Busy {
380 kind: LockKind::WriterInProcess,
381 })
382 }
383
384 /// Access the underlying env. Used by callers (e.g. `obj`
385 /// crate) that compose typed handles over the raw txn.
386 #[must_use]
387 pub fn env(&self) -> &'db TxnEnv<F> {
388 self.env
389 }
390
391 /// Commit the transaction. Calls `Pager::commit` to make the
392 /// staged writes durable, then releases both lock layers.
393 ///
394 /// # Errors
395 ///
396 /// Returns whatever [`Pager::commit`] returns.
397 pub fn commit(mut self) -> Result<()> {
398 {
399 let mut pager = self.lock_pager()?;
400 let _lsn = pager.commit()?;
401 // M6 #51: pair with `begin_txn` from `WriteTxn::begin`.
402 pager.end_txn();
403 }
404 self.finished = true;
405 // Drop the in-process guard explicitly so the cross-process
406 // lock's release order is deterministic.
407 self.write_guard.take();
408 // Snapshot drops here — no rollback needed on commit.
409 self.header_at_begin.take();
410 // `writer_lock`'s `Drop` releases the OFD lock; nothing
411 // forces us to call `release()` here. We do call it so a
412 // release error can be surfaced (rather than swallowed by
413 // `Drop`).
414 if let Some(lock) = self.writer_lock.take() {
415 lock.release()?;
416 }
417 Ok(())
418 }
419
420 /// Roll the transaction back, dropping all pending writes.
421 ///
422 /// # Errors
423 ///
424 /// Returns [`Error::Busy`] only if the in-process pager mutex
425 /// is poisoned; otherwise infallible.
426 pub fn rollback(mut self) -> Result<()> {
427 let snap = self.header_at_begin.take();
428 {
429 let mut pager = self.lock_pager()?;
430 rollback_pending(&mut pager);
431 // Restore header fields that the catalog / freelist
432 // code wrote direct-to-disk during the rolled-back
433 // closure.
434 if let Some(s) = snap {
435 pager.restore_header_snapshot(s)?;
436 }
437 // M6 #51: pair with `begin_txn` from `WriteTxn::begin`.
438 pager.end_txn();
439 }
440 self.finished = true;
441 self.write_guard.take();
442 if let Some(lock) = self.writer_lock.take() {
443 lock.release()?;
444 }
445 Ok(())
446 }
447}
448
449impl<F: FileBackend> Drop for WriteTxn<'_, F> {
450 fn drop(&mut self) {
451 if self.finished {
452 return;
453 }
454 // Uncommitted txn — roll back. We can't return errors from
455 // `Drop`; best-effort drop the pending buffer + restore the
456 // header snapshot. #55: a library must not write to stderr, so
457 // the implicit-rollback diagnostic is routed to the `tracing`
458 // facade (gated on the `tracing` feature) instead of `eprintln!`.
459 // The early-return rollback pattern is legitimate, so this is a
460 // debug-level event, not a warning.
461 let snap = self.header_at_begin.take();
462 if let Ok(mut pager) = self.env.pager.lock() {
463 rollback_pending(&mut pager);
464 if let Some(s) = snap {
465 let _ = pager.restore_header_snapshot(s);
466 }
467 // M6 #51: pair with `begin_txn` from `WriteTxn::begin`.
468 pager.end_txn();
469 }
470 #[cfg(feature = "tracing")]
471 tracing::debug!("WriteTxn dropped without commit/rollback; pending writes discarded");
472 // Both `write_guard` and `writer_lock` release on drop.
473 }
474}
475
476/// Discard the pager's pending-transaction buffer. Idempotent.
477///
478/// The pager exposes `commit` / `checkpoint` but no explicit
479/// rollback — pending writes simply never make it into the WAL.
480/// This helper drains the in-memory pending map so a re-entered
481/// `WriteTxn` on the same pager observes a clean slate.
482fn rollback_pending<F: FileBackend>(pager: &mut Pager<F>) {
483 pager.rollback_pending_writes();
484}
485
486/// Busy-poll a CAS on the in-process gate (`AtomicBool`) until it is
487/// acquired or `timeout` elapses, returning an owning
488/// [`WriteSerialGuard`]. Power-of-ten Rule 2: the loop is bounded by
489/// the deadline AND a defensive iter counter; a return of
490/// `Err(Error::Busy{WriterInProcess})` is the surfaced alternative to
491/// blocking forever.
492///
493/// The backoff schedule mirrors the prior `Mutex<()>` polyfill and
494/// `platform::lock`'s file-lock retry exactly: 1 ms initial, doubled
495/// per miss, capped at 100 ms. Average overhead under uncontended
496/// load is one `compare_exchange_weak`.
497///
498/// Memory ordering: success uses `Acquire` so the winner observes the
499/// previous holder's writes (paired with the `Release` store in
500/// [`WriteSerialGuard::drop`]); failure uses `Relaxed` (a failed CAS
501/// synchronizes nothing). `compare_exchange_weak` is used because the
502/// retry loop tolerates spurious failures and it is cheaper on some
503/// targets.
504///
505/// Unlike the old `Mutex<()>`, the gate cannot poison — there is no
506/// `Poisoned` arm; a panicking writer's guard `Drop` simply frees the
507/// gate (see the module-level "In-process gate" docs).
508fn acquire_write_serialization(
509 gate: &Arc<AtomicBool>,
510 timeout: Duration,
511) -> Result<WriteSerialGuard> {
512 let start = std::time::Instant::now();
513 let mut backoff = Duration::from_millis(1);
514 let max_backoff = Duration::from_millis(100);
515 // Rule 2 upper bound: the deadline check is the canonical exit;
516 // the iter counter is defensive.
517 let timeout_millis = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
518 let mut iters: u64 = 0;
519 let max_iters = timeout_millis.saturating_add(64);
520 loop {
521 iters = iters.saturating_add(1);
522 if iters > max_iters {
523 return Err(Error::Busy {
524 kind: LockKind::WriterInProcess,
525 });
526 }
527 if gate
528 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
529 .is_ok()
530 {
531 return Ok(WriteSerialGuard {
532 gate: Arc::clone(gate),
533 });
534 }
535 if start.elapsed() >= timeout {
536 return Err(Error::Busy {
537 kind: LockKind::WriterInProcess,
538 });
539 }
540 std::thread::sleep(backoff);
541 backoff = (backoff * 2).min(max_backoff);
542 }
543}
544
/// A read transaction.
///
/// Construct via [`ReadTxn::begin`] or
/// [`ReadTxn::begin_with_timeout`]. Holds:
/// 1. A [`ReaderSnapshot`] taken from the pager at construction (per
///    the original notes, pinning the WAL end-LSN).
/// 2. An optional cross-process `READER_LOCK` byte (shared with
///    other readers but not exclusive with writers — see
///    `docs/format.md` § File locking).
///
/// Reads inside a `ReadTxn` see a consistent snapshot of the
/// database; pending writes from a concurrent `WriteTxn` and frames
/// committed AFTER `ReadTxn::begin` are invisible.
#[derive(Debug)]
pub struct ReadTxn<'db, F: FileBackend> {
    /// Env the txn was begun on; used to reach the pager mutex.
    env: &'db TxnEnv<F>,
    /// Snapshot every read in this txn is served against.
    snapshot: ReaderSnapshot<F>,
    /// Cross-process reader lock, held only for its lifetime: it is
    /// never read, and is released when the txn drops. `None` for envs
    /// without a lock file.
    _reader_lock: Option<ReaderLock>,
}
562
563impl<'db, F: FileBackend> ReadTxn<'db, F> {
564 /// Begin a new read transaction.
565 ///
566 /// Acquires a cross-process reader-lock slot (if `env.lock_file`
567 /// is `Some`) with the env's default busy timeout, then takes a
568 /// [`ReaderSnapshot`] from the pager. Readers do not contend
569 /// with each other (31 shared slots) and do not contend with
570 /// writers on the byte-range layer.
571 ///
572 /// # Errors
573 ///
574 /// - [`Error::Busy`] with `LockKind::Reader` on lock timeout.
575 /// - [`Error::Io`] on lock or pager syscall failure.
576 pub fn begin(env: &'db TxnEnv<F>) -> Result<Self> {
577 Self::begin_with_timeout(env, DEFAULT_BUSY_TIMEOUT)
578 }
579
580 /// As [`begin`](Self::begin) with a caller-supplied timeout.
581 ///
582 /// # Errors
583 ///
584 /// See [`begin`](Self::begin).
585 pub fn begin_with_timeout(env: &'db TxnEnv<F>, timeout: Duration) -> Result<Self> {
586 let reader_lock = match env.lock_file.as_ref() {
587 Some(handle) => Some(handle.lock_reader(timeout)?),
588 None => None,
589 };
590 let snapshot = {
591 let mut pager = env.pager.lock().map_err(|_| Error::Busy {
592 kind: LockKind::WriterInProcess,
593 })?;
594 pager.reader_snapshot()?
595 };
596 Ok(Self {
597 env,
598 snapshot,
599 _reader_lock: reader_lock,
600 })
601 }
602
603 /// LSN this txn's snapshot pinned. Diagnostic-only.
604 #[must_use]
605 pub fn pinned_lsn(&self) -> crate::wal::Lsn {
606 self.snapshot.pinned_lsn()
607 }
608
609 /// Read page `id` consistent with the txn's snapshot.
610 ///
611 /// # Errors
612 ///
613 /// As [`ReaderSnapshot::read_page`].
614 pub fn read_page(&self, id: PageId) -> Result<Page> {
615 let pager = self.env.pager.lock().map_err(|_| Error::Busy {
616 kind: LockKind::WriterInProcess,
617 })?;
618 // #81: this public API returns an OWNED `Page`, and the pager
619 // lock guard cannot outlive this call, so we must materialise
620 // an owned page here. `into_page` clones the body only on the
621 // `Shared` arm (and only at this ownership boundary); the hot
622 // decode paths (`get_via_snapshot`, `lookup_via_snapshot`,
623 // range scans) keep the handle and call `as_bytes` instead.
624 Ok(self.snapshot.read_page(&pager, id)?.into_page())
625 }
626
627 /// Access the underlying snapshot. Used by `obj::Db` (M6 issue
628 /// #47) to dispatch typed reads on a read txn.
629 #[must_use]
630 pub fn snapshot(&self) -> &ReaderSnapshot<F> {
631 &self.snapshot
632 }
633
634 /// Access the env this txn lives in. Used by `obj::Db` to
635 /// access the pager mutex.
636 #[must_use]
637 pub fn env(&self) -> &TxnEnv<F> {
638 self.env
639 }
640
641 /// Drop the txn explicitly (releases the snapshot pin and the
642 /// reader lock).
643 pub fn end(self) {
644 drop(self);
645 }
646}
647
648#[cfg(test)]
649mod tests {
650 use super::*;
651 use crate::pager::page::Page;
652 use crate::pager::Config;
653 use crate::platform::FileHandle;
654 use std::sync::Arc;
655 use std::thread;
656 use tempfile::TempDir;
657
658 fn build_env(dir: &TempDir) -> (TxnEnv<FileHandle>, PageId) {
659 let path = dir.path().join("txn.obj");
660 let mut pager = Pager::open(&path, Config::default()).expect("pager");
661 // #64: enter a Pager txn so alloc_page's debug_assert passes.
662 // commit() inside the txn flushes; end_txn balances the
663 // begin_txn so subsequent WriteTxn::begin sees txn_depth=0.
664 pager.begin_txn();
665 let a = pager.alloc_page().expect("alloc");
666 let mut page = Page::zeroed();
667 page.as_bytes_mut()[0] = 0;
668 pager.write_page(a, &page).expect("write");
669 let _ = pager.commit().expect("commit");
670 pager.end_txn();
671 // Issue #1: lock byte lives in the `<db>.obj-lock`
672 // sidecar, not the main file. Mirror the production
673 // wiring so this test exercises the same path layout.
674 let lock_path = crate::pager::lock_path_for(&path);
675 let lock_file = Arc::new(FileHandle::open_or_create(&lock_path).expect("lock file"));
676 lock_file.set_len(128).expect("lock sidecar len");
677 (TxnEnv::new(pager, Some(lock_file)), a)
678 }
679
/// A committed write must be visible to a read txn begun afterwards.
#[test]
fn write_txn_commit_makes_writes_visible() {
    let tmp = TempDir::new().expect("tmp");
    let (env, page_id) = build_env(&tmp);

    let mut staged = Page::zeroed();
    staged.as_bytes_mut()[0] = 0x77;

    let writer = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
    writer.write_page(page_id, &staged).expect("write");
    writer.commit().expect("commit");

    let reader = ReadTxn::begin(&env).expect("read");
    let first_byte = reader.read_page(page_id).expect("read").as_bytes()[0];
    assert_eq!(first_byte, 0x77);
}
694
/// An explicit rollback must discard the staged write.
#[test]
fn write_txn_rollback_drops_writes() {
    let tmp = TempDir::new().expect("tmp");
    let (env, page_id) = build_env(&tmp);

    let mut staged = Page::zeroed();
    staged.as_bytes_mut()[0] = 0x99;

    let writer = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
    writer.write_page(page_id, &staged).expect("write");
    writer.rollback().expect("rollback");

    let reader = ReadTxn::begin(&env).expect("read");
    let first_byte = reader.read_page(page_id).expect("read").as_bytes()[0];
    assert_eq!(first_byte, 0, "rollback must discard writes");
}
709
/// While one write txn is alive, a second `begin` on the same env
/// must time out with `Busy{WriterInProcess}`; once the first commits,
/// a new writer gets in.
#[test]
fn in_process_writers_serialize() {
    let tmp = TempDir::new().expect("tmp");
    let (env, _page_id) = build_env(&tmp);

    let first = WriteTxn::begin(&env, Duration::from_millis(50)).expect("tx1");

    let second_attempt = WriteTxn::begin(&env, Duration::from_millis(10));
    let err = second_attempt.expect_err("tx2 busy");
    assert!(matches!(
        err,
        Error::Busy {
            kind: LockKind::WriterInProcess
        }
    ));

    first.commit().expect("commit");
    // With the first txn committed, the gate is free again.
    let _third = WriteTxn::begin(&env, Duration::from_millis(50)).expect("tx3");
}
726
/// Issue #18: `WriteTxn`, `WriteAcquire` and `WriteSerialGuard` must
/// be `Send` so obj-py can move the blocking acquire across an
/// `allow_threads` boundary. This is a compile-time check: each call
/// below only type-checks if the named type satisfies the `Send`
/// bound on the helper's type parameter.
#[test]
fn write_txn_is_send() {
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<WriteTxn<'_, FileHandle>>();
    require_send::<WriteAcquire<FileHandle>>();
    require_send::<WriteSerialGuard>();
    // ReadTxn is already Send; assert it stays so.
    require_send::<ReadTxn<'_, FileHandle>>();
}
741
/// Issue #18: the gate does not poison. A writer that panics
/// mid-transaction unwinds through `WriteTxn`'s `Drop` (rollback) and
/// the `WriteSerialGuard`'s `Drop` (gate release), so the NEXT writer
/// gets in and sees the rolled-back, consistent state. A `Mutex<()>`
/// gate would instead have poisoned and made every later `begin` fail
/// with `Busy{WriterInProcess}`.
#[test]
fn panic_in_writer_releases_gate_and_rolls_back() {
    let tmp = TempDir::new().expect("tmp");
    let (env, page_id) = build_env(&tmp);
    let env = Arc::new(env);

    // A writer that stages a write and then panics while still
    // holding its WriteTxn.
    let env_for_panic = Arc::clone(&env);
    let crashing_writer = || {
        let tx = WriteTxn::begin(&env_for_panic, Duration::from_millis(50)).expect("begin");
        let mut staged = Page::zeroed();
        staged.as_bytes_mut()[0] = 0xEE;
        tx.write_page(page_id, &staged).expect("write");
        panic!("simulated mid-write crash");
    };
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(crashing_writer));
    assert!(outcome.is_err(), "the closure must have panicked");

    // The gate must be free again: a fresh writer begins and commits.
    let follow_up = WriteTxn::begin(&env, Duration::from_millis(50))
        .expect("gate must be free after a panicking writer");
    follow_up.commit().expect("commit");

    // And the crashed writer's staged byte (0xEE) must be gone: the
    // page still reads as its pre-panic value, 0.
    let reader = ReadTxn::begin(&env).expect("read");
    let first_byte = reader.read_page(page_id).expect("read").as_bytes()[0];
    assert_eq!(
        first_byte,
        0,
        "panicking writer's staged write must have been rolled back",
    );
}
778
/// 4 writer threads × 250 iterations each — N writers serialize via
/// the in-process write-serialization gate + the cross-process
/// `WRITER_LOCK`. Every txn must succeed (no deadlock; no aborted
/// txn).
#[test]
fn n_writers_serialize_with_no_deadlock() {
    let dir = TempDir::new().expect("tmp");
    // Same fixture as the other tests: one committed page whose first
    // byte is 0, plus the lock sidecar (issue #1).
    let (env, a) = build_env(&dir);
    let env = Arc::new(env);

    let n_writers = 4usize;
    let iters_per_writer = 250u32;
    thread::scope(|scope| {
        let mut handles = Vec::with_capacity(n_writers);
        for w in 0..n_writers {
            let env = Arc::clone(&env);
            handles.push(scope.spawn(move || {
                for i in 0..iters_per_writer {
                    // Generous timeout because the writer needs
                    // to block on the other writers in the worst
                    // case.
                    let tx = WriteTxn::begin(&env, Duration::from_secs(30))
                        .expect("begin under load");
                    let mut p = Page::zeroed();
                    // Always in 1..=250, i.e. never the initial 0.
                    p.as_bytes_mut()[0] =
                        u8::try_from((w * 1000 + i as usize) % 250 + 1).expect("byte fits");
                    tx.write_page(a, &p).expect("write");
                    tx.commit().expect("commit");
                }
            }));
        }
        for h in handles {
            h.join().expect("join");
        }
    });

    // Read back from a fresh read txn — must see SOME writer's
    // last value (every written value is non-zero).
    let rx = ReadTxn::begin(&env).expect("read");
    let p = rx.read_page(a).expect("read");
    assert_ne!(p.as_bytes()[0], 0, "some writer's value must be visible");
}
837
/// Dropping a `WriteTxn` that was neither committed nor rolled back
/// must discard its staged write.
#[test]
fn drop_without_commit_warns_and_rolls_back() {
    let tmp = TempDir::new().expect("tmp");
    let (env, page_id) = build_env(&tmp);

    let writer = WriteTxn::begin(&env, Duration::from_millis(50)).expect("begin");
    let mut staged = Page::zeroed();
    staged.as_bytes_mut()[0] = 0xAB;
    writer.write_page(page_id, &staged).expect("write");
    // No commit, no rollback — the drop itself must roll back.
    drop(writer);

    let reader = ReadTxn::begin(&env).expect("read");
    let first_byte = reader.read_page(page_id).expect("read").as_bytes()[0];
    assert_eq!(
        first_byte,
        0,
        "drop-without-commit must roll back",
    );
}
857
/// A read txn begun before a commit must keep seeing the
/// pre-commit page contents.
#[test]
fn read_txn_sees_consistent_snapshot() {
    let tmp = TempDir::new().expect("tmp");
    let (env, page_id) = build_env(&tmp);

    // Snapshot is taken here, while the page's first byte is 0.
    let reader = ReadTxn::begin(&env).expect("read");

    // A writer then commits a new version of the same page.
    let mut newer = Page::zeroed();
    newer.as_bytes_mut()[0] = 0x55;
    let writer = WriteTxn::begin(&env, Duration::from_millis(50)).expect("write");
    writer.write_page(page_id, &newer).expect("write");
    writer.commit().expect("commit");

    // The earlier snapshot must still serve the old byte.
    let first_byte = reader.read_page(page_id).expect("read").as_bytes()[0];
    assert_eq!(
        first_byte,
        0,
        "snapshot must isolate reader from concurrent commits",
    );
}
883}