// obj / db.rs (path as shown in the source viewer's breadcrumb)

//! `Db` — public entry point.
//!
//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
//! described in `design.md` § API and pinned in `docs/format.md`
//! § Db public API contract.
6
use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use obj_core::btree::BTree;
use obj_core::pager::page::PageId;
use obj_core::pager::{lock_path_for, Pager};
use obj_core::platform::FileHandle;
use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};

use crate::config::Config;
use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
22
/// Per-batch refill size for [`IterAll`]. The iterator yields one
/// document at a time to the caller, but internally fetches the
/// primary B-tree in `ITER_ALL_BATCH`-sized chunks so the per-step
/// pager-lock acquisition cost amortises over many `next` calls.
/// Power-of-ten Rule 3: the buffer is fixed-size (constant 256
/// entries — at ~512 bytes/doc that's ~128 KiB peak); the buffer
/// does NOT scale with the collection's total size.
const ITER_ALL_BATCH: usize = 256;
31
32/// The embedded document database.
33///
34/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
35/// concurrent reader / single-writer access.  See `design.md`
36/// § "Concurrent readers, one writer".
37///
38/// At M6 the public `Db` is hard-typed against
39/// `obj_core::FileHandle`.  A future refactor may make it generic
40/// over `F: FileBackend` so fault-injection harnesses can build on
41/// the same API; today the test helpers reach for the lower-level
42/// `obj-core` building blocks instead.
43pub struct Db {
44    pub(crate) env: Arc<TxnEnv<FileHandle>>,
45    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
46    pub(crate) readonly: bool,
47    pub(crate) busy_timeout: Duration,
48    /// Per-process cache of collection names whose
49    /// `Document::indexes()` reconciliation has already run. M7
50    /// #57: reconciliation is idempotent but a catalog walk +
51    /// declaration cycle is non-trivial — caching ensures only the
52    /// first `WriteTxn::collection::<T>()` call per process pays
53    /// the cost.
54    pub(crate) reconciled: Arc<Mutex<HashSet<String>>>,
55    /// M11 #93: registry of attached read-only databases keyed by
56    /// namespace. Populated by [`Db::attach`]; consulted by
57    /// [`Db::collection_namespace`] and by
58    /// [`Db::read_transaction`] when it pins per-attached
59    /// snapshots. `Arc<Mutex<_>>` because `attach` / `detach` take
60    /// `&mut self` while live read transactions hold borrows into
61    /// the registry; the mutex coordinates the two.
62    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
63    /// #83 (c): published length of `attached`, mutated only while the
64    /// `attached` mutex is held. A relaxed load lets the hot read path
65    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
66    /// no databases are attached (the overwhelmingly common case).
67    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
68}
69
70impl std::fmt::Debug for Db {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("Db")
73            .field("readonly", &self.readonly)
74            .field("busy_timeout", &self.busy_timeout)
75            .finish_non_exhaustive()
76    }
77}
78
79// ---------- FFI plumbing ------------------------------------------
80//
81// The accessors below expose the shareable internals of `Db` so the
82// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
83// iterator handles without re-implementing the lifecycle. They are
84// marked `#[doc(hidden)]` because user-Rust code should reach for
85// the typed `Db::transaction` / `Db::read_transaction` API; the
86// raw accessors exist so a sibling FFI / native-host crate can
87// build its own self-contained handle types.
88impl Db {
89    /// Shared environment Arc — pager + cross-process lock file.
90    ///
91    /// Used by [`libobj`](../../libobj/index.html) to build owned
92    /// transaction handles whose lifetime extends past a single
93    /// `Db::transaction` closure call.
94    #[doc(hidden)]
95    #[must_use]
96    pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
97        Arc::clone(&self.env)
98    }
99
100    /// Shared catalog Arc.
101    ///
102    /// Used by [`libobj`](../../libobj/index.html) for the same
103    /// reason as [`Self::env_arc`].
104    #[doc(hidden)]
105    #[must_use]
106    pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
107        Arc::clone(&self.catalog)
108    }
109
110    /// Shared per-process reconciliation cache Arc.
111    ///
112    /// Used by [`libobj`](../../libobj/index.html) for the same
113    /// reason as [`Self::env_arc`].
114    #[doc(hidden)]
115    #[must_use]
116    pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<String>>> {
117        Arc::clone(&self.reconciled)
118    }
119
120    /// Busy-lock timeout configured at open time.
121    #[doc(hidden)]
122    #[must_use]
123    pub fn busy_timeout(&self) -> Duration {
124        self.busy_timeout
125    }
126}
127
128impl Db {
129    /// Open or create a file-backed database at `path` with default
130    /// configuration.
131    ///
132    /// Creates the file if absent; reopens otherwise. A `Db` is
133    /// `Send + Sync` — share across threads via `Arc<Db>` for the
134    /// concurrent-reader / single-writer workload documented in
135    /// `docs/concurrency.md`.
136    ///
137    /// For an ephemeral database use [`Db::memory`]; for a
138    /// read-only handle that coexists with another process's writer
139    /// use [`Db::open_readonly`]; for custom durability /
140    /// cache / lock knobs use [`Db::open_with`] + [`Config`].
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// # fn main() -> obj::Result<()> {
146    /// use obj::Db;
147    ///
148    /// let dir = tempfile::tempdir()?;
149    ///
150    /// // File-backed. Creates the file if absent; reopens otherwise.
151    /// let _db = Db::open(dir.path().join("app.obj"))?;
152    ///
153    /// // In-memory. No persistence, no file locks. Useful for tests.
154    /// let _mem = Db::memory()?;
155    ///
156    /// // Read-only. Coexists safely with a writer in another process.
157    /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
158    /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    ///
163    /// # Errors
164    ///
165    /// Returns the underlying [`Error`] from
166    /// [`obj_core::pager::Pager::open`] on syscall or format
167    /// failure.
168    #[cfg_attr(
169        feature = "tracing",
170        tracing::instrument(
171            name = "db.open",
172            level = "info",
173            skip_all,
174            fields(path = %path.as_ref().display()),
175        )
176    )]
177    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
178        Self::open_with(path, Config::default())
179    }
180
181    /// Open or create a file-backed database with `config`.
182    ///
183    /// # Errors
184    ///
185    /// As [`Db::open`].
186    // Takes `Config` by value: this is the frozen builder-handoff
187    // signature (the caller's `Config::default().sync_mode(..)...`
188    // chain ends here). `Config` is no longer `Copy` (issue #17), so
189    // clippy now sees the by-value argument as only borrowed in the
190    // body — but the by-value convention is part of the 1.0 surface
191    // and must not change. Power-of-ten Rule 10: documented allow.
192    #[allow(clippy::needless_pass_by_value)]
193    #[cfg_attr(
194        feature = "tracing",
195        tracing::instrument(
196            name = "db.open",
197            level = "info",
198            skip_all,
199            fields(path = %path.as_ref().display()),
200        )
201    )]
202    pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
203        let path_buf = path.as_ref().to_path_buf();
204        // Issue #31: the pager `Config` is no longer `Copy` (its
205        // `encryption_key` zeroizes on drop), so move it out of
206        // `config` rather than copying it — `from_parts` only reads
207        // the remaining scalar fields. `mem::take` leaves a default
208        // (keyless) pager `Config` behind, which is never read again.
209        let pager_config = std::mem::take(&mut config.pager);
210        let pager = Pager::open(&path_buf, pager_config)?;
211        // Issue #1: cross-process locks anchor against a dedicated
212        // `<db>.obj-lock` sidecar file. Keeping the lock byte out
213        // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
214        // Windows once the DB grows past whatever offset the
215        // lock byte would otherwise sit at (`LockFileEx` is
216        // mandatory). The sidecar is left in place across opens;
217        // deleting it would race two openers into two different
218        // inodes and miss each other's exclusion.
219        let lock_file = if config.cross_process_lock {
220            let lock_path = lock_path_for(&path_buf);
221            let handle = FileHandle::open_or_create(&lock_path)?;
222            // Materialise the locked byte range as real file
223            // content. POSIX OFD and Windows `LockFileEx` both
224            // permit locking past EOF, but a non-empty sidecar
225            // is the most conservative choice across kernels
226            // (and lets the same offsets work on every platform).
227            // 128 bytes covers WRITER_LOCK (96) and the full
228            // READER_LOCK_RANGE (97..128).
229            handle.set_len(128)?;
230            Some(Arc::new(handle))
231        } else {
232            None
233        };
234        Self::from_parts(pager, lock_file, &config)
235    }
236
237    /// Open a fresh in-memory database.  No persistence, no file
238    /// locks.  Useful for unit tests and ephemeral workloads.
239    ///
240    /// # Errors
241    ///
242    /// Returns [`Error::InvalidArgument`] only if `config` has
243    /// zero cache frames.
244    pub fn memory() -> Result<Self> {
245        Self::memory_with(Config::default())
246    }
247
248    /// As [`Db::memory`] with a caller-supplied [`Config`].
249    ///
250    /// # Errors
251    ///
252    /// As [`Db::memory`].
253    // By-value `Config` is the frozen builder-handoff signature; see
254    // the note on [`Db::open_with`]. Power-of-ten Rule 10: documented
255    // allow (issue #17 dropped `Copy`, which made the lint fire).
256    #[allow(clippy::needless_pass_by_value)]
257    pub fn memory_with(mut config: Config) -> Result<Self> {
258        // Issue #31: move the (no-longer-`Copy`) pager `Config` out
259        // rather than copying it; `from_parts` only reads the
260        // remaining scalar fields. See `open_with` for the rationale.
261        let pager_config = std::mem::take(&mut config.pager);
262        let pager = Pager::memory(pager_config)?;
263        Self::from_parts(pager, None, &config)
264    }
265
266    /// Open the database at `path` in read-only mode.  The
267    /// resulting `Db` rejects every mutating operation with
268    /// `Err(Error::ReadOnly { ... })`.  Coexists safely with a
269    /// writer in another process via the cross-process reader
270    /// lock.
271    ///
272    /// # Errors
273    ///
274    /// As [`Db::open`].
275    pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
276        let config = Config {
277            readonly: true,
278            ..Config::default()
279        };
280        Self::open_with(path, config)
281    }
282
283    fn from_parts(
284        mut pager: Pager<FileHandle>,
285        lock_file: Option<Arc<FileHandle>>,
286        config: &Config,
287    ) -> Result<Self> {
288        // #64: catalog init (and its `tree.insert` → `alloc_page` /
289        // `set_root_catalog` chain) mutates the file header. Header
290        // mutations now ride the WAL via `stage_or_write_header`,
291        // which requires an open Pager txn so the staged page-0 frame
292        // commits atomically with the B-tree pages it depends on.
293        // Wrap the init call in `begin_txn` / `commit` / `end_txn` so
294        // a `Db::open` against a fresh file is durable on return
295        // (matching the pre-#64 contract). `open_or_init` on an
296        // already-initialised database is read-only and the wrapped
297        // `commit()` is a no-op (`Pager::commit` short-circuits on an
298        // empty pending set + clean `header_dirty`).
299        pager.begin_txn();
300        let init = Catalog::open_or_init(&mut pager);
301        let catalog = match init {
302            Ok(c) => {
303                let commit_result = pager.commit();
304                pager.end_txn();
305                commit_result?;
306                c
307            }
308            Err(e) => {
309                pager.end_txn();
310                return Err(e);
311            }
312        };
313        // M11 #91: lightweight open-time integrity check. Runs the
314        // catalog-portion of `integrity_check` and surfaces any
315        // detected failure as the strongest available error so the
316        // caller decides whether to attempt repair or abort. Opt out
317        // via `Config::skip_open_check(true)`.
318        if !config.skip_open_check {
319            let report = obj_core::integrity::quick_check(&mut pager)?;
320            if let Some(err) = first_failure_as_error(&report) {
321                return Err(err);
322            }
323        }
324        Ok(Self {
325            env: Arc::new(TxnEnv::new(pager, lock_file)),
326            catalog: Arc::new(Mutex::new(catalog)),
327            readonly: config.readonly,
328            busy_timeout: config.busy_timeout,
329            reconciled: Arc::new(Mutex::new(HashSet::new())),
330            attached: Arc::new(Mutex::new(HashMap::new())),
331            attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
332        })
333    }
334
335    /// Run a closure inside a write transaction.
336    ///
337    /// Begins a [`WriteTxn`], runs the closure with `&mut tx`.  If
338    /// the closure returns `Ok(r)`, the transaction is committed and
339    /// `Ok(r)` is returned.  If the closure returns `Err(e)`, the
340    /// transaction is rolled back and `Err(e)` is returned.  A
341    /// panic inside the closure unwinds with an implicit rollback
342    /// via the `WriteTxn` `Drop` impl. See `docs/concurrency.md`
343    /// for the lock-acquisition contract.
344    ///
345    /// # Examples
346    ///
347    /// Atomic batch — both inserts commit together or not at all:
348    ///
349    /// ```
350    /// # fn main() -> obj::Result<()> {
351    /// use obj::Db;
352    /// use serde::{Deserialize, Serialize};
353    ///
354    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
355    /// struct Order {
356    ///     customer_id: u64,
357    ///     total_cents: u64,
358    /// }
359    ///
360    /// let dir = tempfile::tempdir()?;
361    /// let db = Db::open(dir.path().join("txn.obj"))?;
362    ///
363    /// let (a, b) = db.transaction(|tx| {
364    ///     let coll = tx.collection::<Order>()?;
365    ///     let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
366    ///     let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
367    ///     Ok((a, b))
368    /// })?;
369    /// assert_ne!(a, b, "freshly-allocated ids are distinct");
370    /// # Ok(())
371    /// # }
372    /// ```
373    ///
374    /// Returning `Err(_)` rolls every staged write back; the `Err`
375    /// the closure returns is the `Err` the caller sees:
376    ///
377    /// ```
378    /// # fn main() -> obj::Result<()> {
379    /// use obj::{Db, Error};
380    /// use serde::{Deserialize, Serialize};
381    ///
382    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
383    /// struct Order { total_cents: u64 }
384    ///
385    /// let dir = tempfile::tempdir()?;
386    /// let db = Db::open(dir.path().join("rollback.obj"))?;
387    /// let id = db.insert(Order { total_cents: 10 })?;
388    ///
389    /// let outcome: obj::Result<()> = db.transaction(|tx| {
390    ///     let coll = tx.collection::<Order>()?;
391    ///     coll.update(id, |o| { o.total_cents = 99_999; })?;
392    ///     Err(Error::InvalidArgument("synthetic abort"))
393    /// });
394    /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
395    ///
396    /// let after: Order = db
397    ///     .get::<Order>(id)?
398    ///     .ok_or(Error::InvalidArgument("just inserted"))?;
399    /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
400    /// # Ok(())
401    /// # }
402    /// ```
403    ///
404    /// # Errors
405    ///
406    /// - [`Error::ReadOnly`] if the database was opened read-only.
407    /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
408    /// - Any error the closure returns.
409    #[cfg_attr(
410        feature = "tracing",
411        tracing::instrument(name = "db.transaction", level = "info", skip_all)
412    )]
413    pub fn transaction<R, F>(&self, body: F) -> Result<R>
414    where
415        F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
416    {
417        if self.readonly {
418            return Err(Error::ReadOnly {
419                operation: "transaction",
420            });
421        }
422        #[cfg(feature = "tracing")]
423        tracing::debug!("begin");
424        let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
425        let mut tx = WriteTxn::new(
426            inner,
427            Arc::clone(&self.catalog),
428            Arc::clone(&self.reconciled),
429        );
430        match body(&mut tx) {
431            Ok(value) => {
432                tx.commit()?;
433                #[cfg(feature = "tracing")]
434                tracing::debug!("commit");
435                Ok(value)
436            }
437            Err(e) => {
438                // Best-effort rollback; surface the closure's
439                // error.  Refresh the in-memory catalog after
440                // rollback so its B-tree root state matches the
441                // file's `root_catalog` header field — catalog
442                // mutations write the header directly (not through
443                // the WAL) so a rollback leaves the header pointing
444                // at the most-recent in-memory root, which is
445                // exactly what re-opening the catalog will pick up.
446                let _ = tx.rollback();
447                let _ = self.refresh_catalog();
448                #[cfg(feature = "tracing")]
449                tracing::debug!("rollback");
450                Err(e)
451            }
452        }
453    }
454
455    /// Re-open the in-memory `Catalog` handle from the pager.  Used
456    /// after a transaction rollback to discard the catalog's in-
457    /// memory `next_collection_id` / `tree.root` state that may
458    /// have advanced during the rolled-back closure.
459    fn refresh_catalog(&self) -> Result<()> {
460        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
461            kind: obj_core::LockKind::WriterInProcess,
462        })?;
463        let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
464        let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
465            kind: obj_core::LockKind::WriterInProcess,
466        })?;
467        *existing = fresh;
468        Ok(())
469    }
470
471    /// Run a closure inside a read transaction.  See
472    /// [`Self::transaction`] for the closure shape and atomicity
473    /// contract; reads inside `body` observe a consistent
474    /// snapshot of the database.
475    ///
476    /// Every read inside the closure observes a single consistent
477    /// snapshot — the snapshot is pinned at the moment the closure
478    /// begins. Concurrent writers do not affect what the closure
479    /// sees.
480    ///
481    /// # Examples
482    ///
483    /// Two reads inside one `read_transaction` see the same value
484    /// even if a writer commits in between:
485    ///
486    /// ```
487    /// # fn main() -> obj::Result<()> {
488    /// use obj::Db;
489    /// use serde::{Deserialize, Serialize};
490    ///
491    /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
492    /// struct Order { total_cents: u64 }
493    ///
494    /// let dir = tempfile::tempdir()?;
495    /// let db = Db::open(dir.path().join("read.obj"))?;
496    /// let id = db.insert(Order { total_cents: 10 })?;
497    ///
498    /// let (a, b) = db.read_transaction(|tx| {
499    ///     let coll = tx.collection::<Order>()?;
500    ///     let a = coll.get(id)?;
501    ///     let b = coll.get(id)?;
502    ///     Ok((a, b))
503    /// })?;
504    /// assert_eq!(a, b);
505    /// # Ok(())
506    /// # }
507    /// ```
508    ///
509    /// # Errors
510    ///
511    /// - [`Error::Busy`] if the reader lock could not be acquired.
512    /// - Any error the closure returns.
513    #[cfg_attr(
514        feature = "tracing",
515        tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
516    )]
517    pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
518    where
519        F: FnOnce(&ReadTxn<'_>) -> Result<R>,
520    {
521        #[cfg(feature = "tracing")]
522        tracing::debug!("begin");
523        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
524        // M11 #93: pin one snapshot per attached database so reads
525        // through the closure observe a consistent view per file.
526        let attached_contexts = self.pin_attached_snapshots()?;
527        let tx = ReadTxn::with_attached(inner, attached_contexts);
528        let result = body(&tx);
529        #[cfg(feature = "tracing")]
530        tracing::debug!("commit");
531        result
532    }
533
534    /// Snapshot every attached database into a per-namespace
535    /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
536    /// pin; dropping the returned map releases every pin.
537    fn pin_attached_snapshots(
538        &self,
539    ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
540        // #83 (c): the common case is no attached databases. A single
541        // relaxed load of `attached_len` lets the empty path skip the
542        // `self.attached` mutex acquire entirely (`with_capacity(0)`
543        // already does not allocate, so the mutex is the only cost).
544        //
545        // Correctness under concurrent detach: the map produced here
546        // feeds only namespaced read dispatch in `open_readonly_named`
547        // (`<ns>.<tail>` → attached snapshot). With zero attachments no
548        // namespace can resolve, so an empty map is observably the same
549        // as the locked build. `attached_len` is mutated only while the
550        // `attached` mutex is held (in `attach` / `detach`), so a `0`
551        // observed here means "no attachment is committed to the map",
552        // matching what a lock-and-read would have seen at that instant.
553        // A concurrent attach that has not yet published its count is
554        // exactly an attach ordered *after* this txn began — invisible
555        // by design, same as the cross-process snapshot semantics.
556        if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
557            return Ok(std::collections::HashMap::new());
558        }
559        let registry = self.attached.lock().map_err(|_| Error::Busy {
560            kind: obj_core::LockKind::WriterInProcess,
561        })?;
562        let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
563            std::collections::HashMap::with_capacity(registry.len());
564        for (namespace, attached) in registry.iter() {
565            let env = Arc::clone(&attached.env);
566            let snapshot = {
567                let mut pager = env.pager().lock().map_err(|_| Error::Busy {
568                    kind: obj_core::LockKind::WriterInProcess,
569                })?;
570                pager.reader_snapshot()?
571            };
572            out.insert(
573                namespace.clone(),
574                crate::txn::AttachedReadCtx { env, snapshot },
575            );
576        }
577        Ok(out)
578    }
579
580    /// Attach the database at `path` under `namespace`. The
581    /// attached file is opened read-only; collections in the
582    /// attached file become visible to subsequent
583    /// `Db::collection::<T>()` calls (and the one-shot per-op API
584    /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
585    /// form `<namespace>.<collection_name>`.
586    ///
587    /// Writes against namespaced collections return
588    /// [`Error::AttachedDatabaseIsReadOnly`].
589    ///
590    /// Each attached database gets its own snapshot pinned at
591    /// read-transaction begin; [`Db::detach`] removes the registry
592    /// entry but in-flight reads complete against their pinned
593    /// snapshot.
594    ///
595    /// # Examples
596    ///
597    /// ```
598    /// # fn main() -> obj::Result<()> {
599    /// use obj::Db;
600    /// use serde::{Deserialize, Serialize};
601    ///
602    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
603    /// #[obj(collection = "orders_attach_doc")]
604    /// struct Order { total_cents: u64 }
605    ///
606    /// // Same struct shape, namespaced collection name. Reads
607    /// // against this type route to the attached "archive" db.
608    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
609    /// #[obj(collection = "archive.orders_attach_doc")]
610    /// struct ArchivedOrder { total_cents: u64 }
611    ///
612    /// let dir = tempfile::tempdir()?;
613    /// let live = dir.path().join("live.obj");
614    /// let archive = dir.path().join("archive.obj");
615    ///
616    /// // Seed the archive (writes go through its own un-namespaced type).
617    /// {
618    ///     let archive_db = Db::open(&archive)?;
619    ///     let _ = archive_db.insert(Order { total_cents: 999 })?;
620    /// }
621    ///
622    /// // Open the live db, attach the archive under "archive".
623    /// let mut db = Db::open(&live)?;
624    /// let _ = db.insert(Order { total_cents: 100 })?;
625    /// db.attach(&archive, "archive")?;
626    ///
627    /// // One read transaction, two collections.
628    /// db.read_transaction(|tx| {
629    ///     let live = tx.collection::<Order>()?;
630    ///     let arch = tx.collection::<ArchivedOrder>()?;
631    ///     assert_eq!(live.all()?.len(), 1);
632    ///     assert_eq!(arch.all()?.len(), 1);
633    ///     Ok(())
634    /// })?;
635    ///
636    /// db.detach("archive")?;
637    /// # Ok(())
638    /// # }
639    /// ```
640    ///
641    /// # Errors
642    ///
643    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
644    ///   use on this `Db`.
645    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
646    ///   opened read-only.
647    pub fn attach<P: AsRef<std::path::Path>>(
648        &mut self,
649        path: P,
650        namespace: impl Into<String>,
651    ) -> Result<()> {
652        let namespace = namespace.into();
653        let path_buf = path.as_ref().to_path_buf();
654        {
655            let registry = self.attached.lock().map_err(|_| Error::Busy {
656                kind: obj_core::LockKind::WriterInProcess,
657            })?;
658            if registry.contains_key(&namespace) {
659                return Err(Error::AttachmentAlreadyExists { namespace });
660            }
661        }
662        let attached_db =
663            Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
664                path: path_buf.clone(),
665                source: Box::new(source),
666            })?;
667        let env = Arc::clone(&attached_db.env);
668        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
669            kind: obj_core::LockKind::WriterInProcess,
670        })?;
671        // Re-check after acquiring the lock — a sibling caller may
672        // have raced; the `Arc<Mutex<_>>` shape makes the check
673        // necessary on the second acquire.
674        if registry.contains_key(&namespace) {
675            return Err(Error::AttachmentAlreadyExists { namespace });
676        }
677        registry.insert(
678            namespace,
679            AttachedDb {
680                env,
681                _db: attached_db,
682            },
683        );
684        // #83 (c): publish the new length while still holding the
685        // `attached` mutex so `pin_attached_snapshots`' relaxed load is
686        // never stale relative to a committed attachment.
687        self.attached_len
688            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
689        Ok(())
690    }
691
692    /// Remove the attachment registered under `namespace`. Returns
693    /// [`Error::CollectionNamespaceUnknown`] if the namespace is
694    /// not attached.
695    ///
696    /// In-flight read transactions hold their own snapshot pins on
697    /// the attached env; detach removes the registry entry, but the
698    /// in-flight read may still complete against its pinned
699    /// snapshot.
700    ///
701    /// # Errors
702    ///
703    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
704    ///   not attached.
705    /// - [`Error::Busy`] if the registry mutex is poisoned.
706    pub fn detach(&mut self, namespace: &str) -> Result<()> {
707        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
708            kind: obj_core::LockKind::WriterInProcess,
709        })?;
710        if registry.remove(namespace).is_none() {
711            return Err(Error::CollectionNamespaceUnknown {
712                namespace: namespace.to_owned(),
713            });
714        }
715        // #83 (c): publish the post-detach length under the lock.
716        self.attached_len
717            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
718        Ok(())
719    }
720
721    /// Insert `doc` into its collection.  One-shot transaction;
722    /// returns the assigned [`Id`].
723    ///
724    /// The one-shot API opens, commits, and closes a private
725    /// transaction per call. Reach for [`Db::transaction`] when
726    /// several mutations must commit or roll back as a single
727    /// atomic unit.
728    ///
729    /// # Examples
730    ///
731    /// One-shot CRUD against a `Document`-derived type:
732    ///
733    /// ```
734    /// # fn main() -> obj::Result<()> {
735    /// use obj::Db;
736    /// use serde::{Deserialize, Serialize};
737    ///
738    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
739    /// struct Order {
740    ///     customer_id: u64,
741    ///     total_cents: u64,
742    ///     status: String,
743    /// }
744    ///
745    /// let dir = tempfile::tempdir()?;
746    /// let db = Db::open(dir.path().join("oneshot.obj"))?;
747    ///
748    /// // insert returns the freshly-allocated Id.
749    /// let id = db.insert(Order {
750    ///     customer_id: 1,
751    ///     total_cents: 100,
752    ///     status: "pending".to_owned(),
753    /// })?;
754    ///
755    /// // get returns Option<T>.
756    /// let _maybe: Option<Order> = db.get::<Order>(id)?;
757    ///
758    /// // update applies a closure in place.
759    /// db.update::<Order, _>(id, |o| {
760    ///     o.status = "shipped".to_owned();
761    /// })?;
762    ///
763    /// // upsert at a caller-supplied id (insert or replace).
764    /// let id2 = obj::Id::try_new(42)
765    ///     .ok_or(obj::Error::InvalidArgument("non-zero"))?;
766    /// db.upsert::<Order>(id2, Order {
767    ///     customer_id: 2,
768    ///     total_cents: 999,
769    ///     status: "new".to_owned(),
770    /// })?;
771    ///
772    /// // delete returns true if the row existed.
773    /// let existed = db.delete::<Order>(id)?;
774    /// assert!(existed);
775    /// # Ok(())
776    /// # }
777    /// ```
778    ///
779    /// # Errors
780    ///
781    /// As [`Self::transaction`] plus any error from
782    /// [`crate::Collection::insert`].
    pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
        // One-shot write: `transaction` runs the closure, which
        // resolves `T`'s collection handle and inserts `doc` into it.
        // A failed handle lookup short-circuits via `?` before the
        // insert is attempted.
        self.transaction(|tx| tx.collection::<T>()?.insert(doc))
    }
786
    /// Fetch the document at `id`.  Returns `Ok(None)` if absent.
    ///
    /// One-shot point read: the lookup runs inside a single
    /// [`Self::read_transaction`] call.
    ///
    /// # Errors
    ///
    /// As [`Self::read_transaction`] plus any error from
    /// [`crate::Collection::get`].
    pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
        // #83 (b): the one-shot point read goes through the fused
        // single-pager-lock path (descriptor resolve + primary get
        // under one guard) instead of `tx.collection()?.get()`, which
        // takes the pager mutex twice. Observably identical for the
        // one-shot caller (same snapshot, same `CollectionNotFound`
        // contract). Namespaced reads fall back to the handle path.
        //
        // The closure passes the transaction handle and `id` straight
        // to `fused_point_get`; nothing else happens in this wrapper.
        self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
    }
802
    /// Update the document at `id` via the closure.
    ///
    /// One-shot: the mutation runs inside its own
    /// [`Self::transaction`] call. `f` is handed a `&mut T` and, being
    /// `FnOnce`, is invoked at most once.
    ///
    /// # Errors
    ///
    /// - [`Error::DocumentNotFound`] if `id` does not exist.
    /// - As [`Self::transaction`].
    pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
    where
        T: Document,
        F: FnOnce(&mut T),
    {
        // Resolve `T`'s collection handle inside the transaction, then
        // delegate to the handle's `update`; a failed handle lookup
        // short-circuits via `?` before `f` is passed on.
        self.transaction(|tx| tx.collection::<T>()?.update(id, f))
    }
816
    /// Delete the document at `id`.  Returns `true` if it existed.
    ///
    /// One-shot: the deletion runs inside its own
    /// [`Self::transaction`] call.
    ///
    /// # Errors
    ///
    /// As [`Self::transaction`].
    pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
        // The `bool` comes straight from the collection handle's
        // `delete`; this wrapper adds no logic of its own.
        self.transaction(|tx| tx.collection::<T>()?.delete(id))
    }
825
    /// Insert or replace the document at `id`.
    ///
    /// One-shot: the write runs inside its own [`Self::transaction`]
    /// call. Unlike [`Self::insert`], the caller supplies the `id`.
    ///
    /// # Errors
    ///
    /// As [`Self::transaction`].
    pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
        // Resolve `T`'s collection handle inside the transaction, then
        // delegate to the handle's `upsert` with the caller's `id`.
        self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
    }
834
835    /// Convenience wrapper around [`crate::Collection::find_unique`]
836    /// — `db.find_unique::<Customer>("by_email", "ada@example.com")`.
837    /// Runs inside a one-shot read transaction.
838    ///
839    /// `O(log n)`, no collection scan; the lookup walks the named
840    /// index's B-tree directly. Defined only on `Unique` indexes —
841    /// for the other kinds use
842    /// [`Collection::lookup`](crate::Collection::lookup) or
843    /// [`Collection::index_range`](crate::Collection::index_range).
844    ///
845    /// # Examples
846    ///
847    /// ```
848    /// # fn main() -> obj::Result<()> {
849    /// use obj::Db;
850    /// use serde::{Deserialize, Serialize};
851    ///
852    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
853    /// #[obj(collection = "customers_find_unique_doc")]
854    /// struct Customer {
855    ///     #[obj(index = unique)]
856    ///     email: String,
857    /// }
858    ///
859    /// let dir = tempfile::tempdir()?;
860    /// let db = Db::open(dir.path().join("find-unique.obj"))?;
861    /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
862    /// let by_email: Option<Customer> = db
863    ///     .find_unique::<Customer>("email", "ada@example.com")?;
864    /// assert!(by_email.is_some());
865    /// # Ok(())
866    /// # }
867    /// ```
868    ///
869    /// # Errors
870    ///
871    /// As [`crate::Collection::find_unique`].
    pub fn find_unique<T: Document>(
        &self,
        index_name: &str,
        key: impl Into<obj_core::codec::Dynamic>,
    ) -> Result<Option<T>> {
        // `key` is forwarded as-is: no `.into()` call happens in this
        // wrapper, so the `Dynamic` conversion is left to
        // `Collection::find_unique`. A failed handle lookup
        // short-circuits via `?` before the index is consulted.
        self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
    }
879
880    /// Construct a fresh M8 [`crate::Query`] builder rooted at this
881    /// database. The builder borrows `&self` for the build phase;
882    /// the borrow ends when [`crate::Query::fetch`] returns.
883    ///
884    /// Compose with [`Query::filter`](crate::Query::filter),
885    /// [`Query::limit`](crate::Query::limit),
886    /// [`Query::sort_by`](crate::Query::sort_by),
887    /// [`Query::index_range`](crate::Query::index_range). Terminate
888    /// with [`Query::fetch`](crate::Query::fetch) (for the
889    /// documents) or [`Query::count`](crate::Query::count) (for the
890    /// count alone).
891    ///
892    /// Mirrors `design.md` § Querying — see the M8 examples in
893    /// `crates/obj/tests/design_md_queries.rs` for the full surface.
894    ///
895    /// # Examples
896    ///
897    /// Top-N matching documents by an indexed field, ascending:
898    ///
899    /// ```
900    /// # fn main() -> obj::Result<()> {
901    /// use obj::Db;
902    /// use obj_core::codec::Dynamic;
903    /// use serde::{Deserialize, Serialize};
904    ///
905    /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
906    /// enum OrderStatus { Pending, Shipped }
907    ///
908    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
909    /// #[obj(collection = "orders_query_doc")]
910    /// struct Order {
911    ///     #[obj(index)]
912    ///     customer_id: u64,
913    ///     status: OrderStatus,
914    ///     #[obj(index)]
915    ///     placed_at: u64,
916    /// }
917    ///
918    /// let dir = tempfile::tempdir()?;
919    /// let db = Db::open(dir.path().join("queries.obj"))?;
920    /// for i in 0..20u64 {
921    ///     let _ = db.insert(Order {
922    ///         customer_id: i % 3,
923    ///         status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
924    ///         placed_at: i * 1_000,
925    ///     })?;
926    /// }
927    ///
928    /// let pending: Vec<Order> = db
929    ///     .query::<Order>()
930    ///     .filter(|o| o.status == OrderStatus::Pending)
931    ///     .sort_by(|o| Dynamic::U64(o.placed_at))
932    ///     .limit(5)
933    ///     .fetch()?;
934    /// assert!(pending.len() <= 5);
935    /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
936    /// # Ok(())
937    /// # }
938    /// ```
    #[must_use]
    pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
        // This method only calls `Query::new`; the `'_` lifetime ties
        // the returned builder to this `&self` borrow.
        crate::Query::new(self)
    }
943
944    /// Open a read-only typed handle to the collection registered
945    /// under the runtime `name`, instead of the type's compile-time
946    /// `T::COLLECTION`.
947    ///
948    /// This unlocks the `design.md` § Portability example for
949    /// attached databases: by passing a namespaced name like
950    /// `"archive.orders"`, the returned [`crate::Collection`] reads
951    /// from the database attached under the `"archive"` namespace
952    /// — see [`Db::attach`].
953    ///
954    /// Construction is **infallible**: errors (missing collection,
955    /// unknown namespace, busy lock) surface at the first method
956    /// call on the handle, not at the call to `collection(name)`.
957    /// Each read-only method on the returned handle opens a private
958    /// [`Db::read_transaction`] and dispatches against the
959    /// runtime-named collection's catalog row.
960    ///
961    /// # Read-only
962    ///
963    /// The returned handle rejects every mutating call —
964    /// [`crate::Collection::insert`], `update`, `delete`, `upsert`
965    /// all return [`Error::ReadOnly`]. To write into a non-default
966    /// collection, override [`obj_core::Document::COLLECTION`] on
967    /// the type itself (compile-time-bound) and use the regular
968    /// [`Db::transaction`] / [`crate::WriteTxn::collection`] path.
969    /// Phase 1B (M11 #94) intentionally limits the runtime accessor
970    /// to reads — the write-through-runtime-name path requires
971    /// engine plumbing that is deferred to a later milestone.
972    ///
973    /// # Example
974    ///
975    /// ```
976    /// use obj::{Db, Document};
977    /// use serde::{Deserialize, Serialize};
978    /// use tempfile::tempdir;
979    ///
980    /// #[derive(Debug, Clone, Serialize, Deserialize)]
981    /// struct Order { customer_id: u64, total_cents: u64 }
982    ///
983    /// impl Document for Order {
984    ///     const COLLECTION: &'static str = "orders";
985    ///     const VERSION: u32 = 1;
986    /// }
987    ///
988    /// fn run() -> obj::Result<()> {
989    ///     let dir = tempdir()?;
990    ///     let archive_path = dir.path().join("archive.obj");
991    ///     // Populate the archive database first.
992    ///     {
993    ///         let archive_db = Db::open(&archive_path)?;
994    ///         archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
995    ///     }
996    ///     // Attach it under a namespace and read via the runtime
997    ///     // accessor — no need to re-declare `Order` with a
998    ///     // namespaced `COLLECTION`.
999    ///     let main_path = dir.path().join("main.obj");
1000    ///     let mut db = Db::open(&main_path)?;
1001    ///     db.attach(&archive_path, "archive")?;
1002    ///     let archived: Vec<Order> = db
1003    ///         .collection::<Order>("archive.orders")
1004    ///         .all()?
1005    ///         .into_iter()
1006    ///         .map(|(_id, doc)| doc)
1007    ///         .collect();
1008    ///     assert_eq!(archived.len(), 1);
1009    ///     Ok(())
1010    /// }
1011    /// # run().unwrap();
1012    /// ```
    #[must_use]
    pub fn collection<T: Document + Send + 'static>(
        &self,
        name: impl Into<String>,
    ) -> crate::Collection<'_, T> {
        // `name` is converted to an owned `String` here and handed to
        // the `lazy` constructor together with `&self`. The return
        // type carries no `Result`, so no error can be reported from
        // this call site.
        crate::Collection::<T>::lazy(self, name.into())
    }
1020
1021    /// Convenience shim mirroring `design.md` —
1022    /// `for order in db.all::<Order>()? { ... }`. Returns an owned
1023    /// `Vec<T>` (materialised). One-line shim over [`Db::iter_all`]
1024    /// that drives the streaming iterator to exhaustion and
1025    /// collects; if the collection is large enough that peak
1026    /// memory matters, prefer [`Db::iter_all`] directly.
1027    ///
1028    /// Sorting requires materialisation — the comparator needs
1029    /// every key up front. Use [`Db::query`] +
1030    /// [`Query::sort_by`](crate::Query::sort_by) for the
1031    /// top-N-sorted workload; the iterator side has no streaming
1032    /// sorted shape.
1033    ///
1034    /// # Examples
1035    ///
1036    /// ```
1037    /// # fn main() -> obj::Result<()> {
1038    /// use obj::Db;
1039    /// use serde::{Deserialize, Serialize};
1040    ///
1041    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1042    /// struct Order { total_cents: u64 }
1043    ///
1044    /// let dir = tempfile::tempdir()?;
1045    /// let db = Db::open(dir.path().join("all.obj"))?;
1046    /// for i in 0..5u64 {
1047    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
1048    /// }
1049    /// let listed: Vec<Order> = db.all::<Order>()?;
1050    /// assert_eq!(listed.len(), 5);
1051    /// # Ok(())
1052    /// # }
1053    /// ```
1054    ///
1055    /// # Errors
1056    ///
1057    /// As [`Db::iter_all`].
1058    pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
1059        self.iter_all::<T>()?
1060            .map(|step| step.map(|(_id, doc)| doc))
1061            .collect()
1062    }
1063
1064    /// Write a self-contained `.obj` file at `dest` carrying this
1065    /// database's state at the LSN of an internally-taken reader
1066    /// snapshot.
1067    ///
1068    /// Hot backup — writers continue uninterrupted against the
1069    /// source. Post-snapshot writes are NOT in the destination.
1070    ///
1071    /// Algorithm (per `docs/format.md` § Hot backup):
1072    ///
1073    /// 1. Take a `ReaderSnapshot` against the source pager (pins
1074    ///    `pinned_lsn`).
1075    /// 2. `OpenOptions::create_new(true)` on `dest`.
1076    /// 3. Copy main-file pages `0..page_count` to `dest`.
1077    /// 4. Overlay every frame in the snapshot's frozen WAL view
1078    ///    onto `dest` at the frame's page-id offset.
1079    /// 5. If the snapshot carries a WAL-staged page-0 header,
1080    ///    overlay it (so `dest`'s page-0 reflects the catalog
1081    ///    root / freelist head / page count the snapshot would
1082    ///    have observed).
1083    /// 6. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
1084    ///    the header CRC32C.
1085    /// 7. `sync_data(SyncMode::Full)` on `dest`.
1086    /// 8. Drop the snapshot (releases the WAL pin).
1087    ///
1088    /// On any mid-backup error the destination file is removed
1089    /// best-effort so a half-written backup does not linger.
1090    ///
1091    /// # Examples
1092    ///
1093    /// ```
1094    /// # fn main() -> obj::Result<()> {
1095    /// use obj::Db;
1096    /// use serde::{Deserialize, Serialize};
1097    ///
1098    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1099    /// #[obj(collection = "notes_backup_doc")]
1100    /// struct Note { body: String }
1101    ///
1102    /// let dir = tempfile::tempdir()?;
1103    /// let src = dir.path().join("src.obj");
1104    /// let dst = dir.path().join("backup.obj");
1105    ///
1106    /// let db = Db::open(&src)?;
1107    /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
1108    /// db.backup_to(&dst)?;
1109    ///
1110    /// // The backup is itself a fully-formed obj file. Open it and read.
1111    /// let backup = Db::open(&dst)?;
1112    /// let listed: Vec<Note> = backup.all::<Note>()?;
1113    /// assert_eq!(listed.len(), 1);
1114    /// # Ok(())
1115    /// # }
1116    /// ```
1117    ///
1118    /// # Errors
1119    ///
1120    /// - [`Error::BackupDestinationExists`] if `dest` already
1121    ///   exists.
1122    /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
1123    ///   a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
1124    /// - [`Error::Io`] on syscall failure during the copy.
1125    pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1126        // #76: a hot backup reads the source MAIN FILE page-by-page
1127        // (`backup::copy_main_file`). The in-process pager `Mutex`
1128        // alone excludes only same-process writers; a writer in a
1129        // SEPARATE OS process can checkpoint pages into the main file
1130        // mid-copy and yield a torn backup. Hold the cross-process
1131        // `WRITER_LOCK` for the whole copy so no external writer can
1132        // mutate the main file under us.
1133        //
1134        // Lock-order invariant (MUST match `WriteTxn::begin`, see
1135        // `docs/concurrency.md`): in-process write-serialization FIRST,
1136        // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
1137        // (innermost, acquired and released within the critical
1138        // section). `obj_core::WriteTxn::begin` acquires the first two
1139        // in exactly that order, so routing through it inherits the
1140        // correct ordering and cannot invert against a concurrent
1141        // in-process `WriteTxn`. We make NO writes; the txn is rolled
1142        // back (a no-op restore of the unchanged header) once the copy
1143        // is done. On in-memory / `cross_process_lock = false` envs the
1144        // cross-process guard is absent (`lock_file = None`) and the
1145        // txn degrades to the in-process serialization guard only.
1146        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1147        let result = self.run_backup_under_guard(dest);
1148        // Release both lock layers in the documented order regardless
1149        // of the copy's outcome. A rollback failure (poisoned pager
1150        // mutex) only surfaces when the copy itself succeeded.
1151        let unlock = guard.rollback();
1152        result.and(unlock)
1153    }
1154
1155    /// #76 helper: take the pager `Mutex` (innermost lock), pin a
1156    /// reader snapshot, and run the backup copy. Factored out of
1157    /// [`Self::backup_to`] so the cross-process lock guard's lifetime
1158    /// is unambiguous and the function stays within the power-of-ten
1159    /// 60-line budget (Rule 4).
1160    fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1161        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1162            kind: obj_core::LockKind::WriterInProcess,
1163        })?;
1164        let snapshot = pager.reader_snapshot()?;
1165        obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
1166        // Snapshot drops at end of scope; the pin is released and
1167        // any deferred checkpoint may proceed.
1168        drop(snapshot);
1169        drop(pager);
1170        Ok(())
1171    }
1172
1173    /// Fold committed WAL pages into the main `.obj` file and reset
1174    /// the WAL back to its 64-byte header.
1175    ///
1176    /// Wraps [`obj_core::pager::Pager::checkpoint`]. Acquires the
1177    /// pager lock the same way [`Self::backup_to`] does, then calls
1178    /// the pager checkpoint. After it returns, the committed records
1179    /// live in the main file rather than the `-wal` sidecar, and the
1180    /// WAL is truncated to header-only.
1181    ///
1182    /// # Deferred / no-op behavior
1183    ///
1184    /// - If a live MVCC reader has pinned an LSN below the end of the
1185    ///   WAL, the checkpoint is **deferred** (a safe no-op) so the
1186    ///   reader's frames are not reclaimed out from under it.
1187    /// - If there is nothing to fold (no committed WAL frames), the
1188    ///   call is a harmless no-op.
1189    ///
1190    /// This is the reusable engine entry point behind the Python
1191    /// `Db.checkpoint()` binding and a future checkpoint-on-close
1192    /// path (issue #5).
1193    ///
1194    /// # Errors
1195    ///
1196    /// - [`Error::ReadOnly`] if the database was opened read-only.
1197    /// - [`Error::Busy`] if the pager lock is poisoned.
1198    /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
1199    ///   (e.g. [`Error::Io`] on syscall failure).
1200    #[cfg_attr(
1201        feature = "tracing",
1202        tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
1203    )]
1204    pub fn checkpoint(&self) -> Result<()> {
1205        if self.readonly {
1206            return Err(Error::ReadOnly {
1207                operation: "checkpoint",
1208            });
1209        }
1210        // #76: checkpoint folds committed WAL frames into the main
1211        // file and rotates the WAL salt. Both touch the main file and
1212        // the on-disk header. Taken under only the in-process pager
1213        // `Mutex` (as before), a writer in a SEPARATE process could
1214        // run its own commit/checkpoint concurrently and corrupt the
1215        // salt-rotation handshake or interleave main-file writes. Hold
1216        // the cross-process `WRITER_LOCK` for the duration, in the
1217        // documented lock order (in-process serialization → cross-
1218        // process writer lock → pager mutex; see `WriteTxn::begin` and
1219        // `Self::backup_to`). We make no user writes; the surrounding
1220        // txn is rolled back afterwards. The rollback's
1221        // `restore_header_snapshot` only rewrites `root_catalog` /
1222        // `freelist_head` / `page_count` (unchanged by checkpoint) and
1223        // preserves the freshly-rotated `wal_salt`, so it is a no-op
1224        // with respect to the checkpoint's durable effects.
1225        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1226        let result = self.run_checkpoint_under_guard();
1227        let unlock = guard.rollback();
1228        result.and(unlock)
1229    }
1230
1231    /// #76 helper: take the pager `Mutex` (innermost lock) and run the
1232    /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
1233    /// cross-process lock guard's lifetime is unambiguous.
1234    fn run_checkpoint_under_guard(&self) -> Result<()> {
1235        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1236            kind: obj_core::LockKind::WriterInProcess,
1237        })?;
1238        pager.checkpoint()?;
1239        drop(pager);
1240        Ok(())
1241    }
1242
1243    /// Streaming iterator over every `(Id, T)` pair in the
1244    /// collection. The returned [`IterAll`] holds a read transaction
1245    /// — and therefore a pinned reader snapshot — for its entire
1246    /// lifetime; the borrow on `self` ends when the iterator is
1247    /// dropped.
1248    ///
1249    /// Each `next` call yields `Result<(Id, T)>`. Per-doc decode
1250    /// errors surface as `Some(Err(_))` rather than ending the
1251    /// iteration; the caller decides whether to propagate or
1252    /// continue.
1253    ///
1254    /// Peak memory does NOT scale with collection size — the
1255    /// iterator's internal buffer is fixed at
1256    /// `ITER_ALL_BATCH = 256` entries (~128 KiB at the design.md
1257    /// ~512 byte/doc estimate). Power-of-ten Rule 3.
1258    ///
1259    /// `Query::fetch` is sort-compatible (sort requires
1260    /// materialisation); `iter_all` is NOT — there is no streaming
1261    /// shape for sort because the comparator needs every key
1262    /// up front. Use `Query::sort_by` + `fetch` for the sorted
1263    /// workload; use `iter_all` for the unsorted large-scan
1264    /// workload.
1265    ///
1266    /// # Examples
1267    ///
1268    /// Streaming a small collection and folding into a running sum.
1269    /// The iterator's peak memory stays bounded regardless of how
1270    /// many documents the collection holds:
1271    ///
1272    /// ```
1273    /// # fn main() -> obj::Result<()> {
1274    /// use obj::Db;
1275    /// use serde::{Deserialize, Serialize};
1276    ///
1277    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1278    /// struct Order { total_cents: u64 }
1279    ///
1280    /// let dir = tempfile::tempdir()?;
1281    /// let db = Db::open(dir.path().join("iter.obj"))?;
1282    /// for i in 0..5u64 {
1283    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
1284    /// }
1285    ///
1286    /// let mut total: u64 = 0;
1287    /// for step in db.iter_all::<Order>()? {
1288    ///     let (_id, doc) = step?;
1289    ///     total = total
1290    ///         .checked_add(doc.total_cents)
1291    ///         .ok_or(obj::Error::InvalidArgument("overflow"))?;
1292    /// }
1293    /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
1294    /// # Ok(())
1295    /// # }
1296    /// ```
1297    ///
1298    /// # Errors
1299    ///
1300    /// - As [`Db::read_transaction`] (construction-time).
1301    /// - [`Error::CollectionNotFound`] if the collection is not yet
1302    ///   registered at the snapshot's pinned LSN.
1303    /// - Per-step iteration may yield `Some(Err(_))` for pager,
1304    ///   B-tree, or codec failures.
1305    pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
1306        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
1307        let txn = ReadTxn::new(inner);
1308        // Resolve the descriptor up front so an absent collection
1309        // surfaces at construction (matching `Db::all`'s pre-M8
1310        // contract). The handle's lifetime is bound to `txn`; we
1311        // drop the handle immediately and stash the descriptor.
1312        let descriptor = {
1313            let coll = txn.collection::<T>()?;
1314            coll.descriptor().clone()
1315        };
1316        Ok(IterAll {
1317            txn,
1318            descriptor,
1319            buffer: VecDeque::new(),
1320            last_emitted_key: None,
1321            finished: false,
1322            _phantom: PhantomData,
1323        })
1324    }
1325}
1326
/// Streaming iterator returned by [`Db::iter_all`].
///
/// Holds a [`ReadTxn`] for its lifetime so every yielded document
/// is consistent with the snapshot pinned at construction. Yields
/// `Result<(Id, T)>` one entry at a time; refills its internal
/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
/// pager-lock acquisition, so peak memory stays bounded at a small
/// constant regardless of the collection's size — power-of-ten
/// Rule 3.
///
/// Construction errors surface at the [`Db::iter_all`] call site;
/// per-step errors (pager, B-tree, codec) surface as
/// `Some(Err(_))` during iteration and do NOT terminate it — the
/// caller decides whether to continue. The exception is a failure of
/// a buffer refill as a whole (pager lock, B-tree open): it is also
/// reported as `Some(Err(_))`, but only once, after which the
/// iterator is finished and returns `None`.
// Note: `IterAll<T>` is deliberately outside the `serde` surface
// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
// neither of which is serializable in a meaningful way. Users who
// need an off-line representation of iteration output should
// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
// resulting `Vec<T>` themselves.
pub struct IterAll<'db, T> {
    /// Owns the snapshot pin for the iterator's lifetime.
    txn: ReadTxn<'db>,
    /// Cached primary-tree root + collection id (`Document::decode`
    /// needs the collection id to validate the per-doc header).
    /// Cloned once at construction and never refreshed.
    descriptor: CollectionDescriptor,
    /// Pre-decoded buffer of upcoming entries. Appended to by the
    /// refill step and drained from the front by `next`.
    buffer: VecDeque<Result<(Id, T)>>,
    /// Resumption marker — `Excluded(last_emitted_key)` is the
    /// start bound of the next refill. `None` until a refill has
    /// recorded a key, in which case the scan starts `Unbounded`.
    last_emitted_key: Option<Vec<u8>>,
    /// `true` once the underlying B-tree iterator returned no more
    /// entries; subsequent `next` calls return `None`. Also set when
    /// a refill fails as a whole.
    finished: bool,
    /// Track the `T` parameter without owning a value.
    _phantom: PhantomData<fn() -> T>,
}
1365
1366impl<T> std::fmt::Debug for IterAll<'_, T> {
1367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1368        f.debug_struct("IterAll")
1369            .field("collection_id", &self.descriptor.collection_id)
1370            .field("buffer_len", &self.buffer.len())
1371            .field("finished", &self.finished)
1372            .finish_non_exhaustive()
1373    }
1374}
1375
1376impl<T: Document + Send + 'static> IterAll<'_, T> {
1377    /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
1378    /// entries, resuming from `last_emitted_key`. Stores per-doc
1379    /// decode errors as `Err` in the buffer so the caller can
1380    /// observe them via `next()` without aborting iteration.
1381    ///
1382    /// Power-of-ten Rule 7: every fallible operation is either
1383    /// captured into the buffer or returned up-front via `?` (which
1384    /// propagates the lock-acquisition / B-tree-open failures
1385    /// before the buffer is touched).
1386    fn refill(&mut self) -> Result<()> {
1387        // Clone the `Arc<Mutex<Pager>>` so the lock-acquisition does
1388        // NOT keep an immutable borrow of `self.txn` alive across
1389        // the buffer-mutation calls below.
1390        let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
1391        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
1392            kind: obj_core::LockKind::WriterInProcess,
1393        })?;
1394        let root_pid = PageId::new(self.descriptor.primary_root)
1395            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1396        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1397        let start = match &self.last_emitted_key {
1398            Some(k) => Bound::Excluded(k.clone()),
1399            None => Bound::Unbounded,
1400        };
1401        let collection_id = self.descriptor.collection_id;
1402        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
1403        let mut yielded: usize = 0;
1404        let mut last_key: Option<Vec<u8>> = None;
1405        let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
1406        for step in iter {
1407            if yielded >= ITER_ALL_BATCH {
1408                break;
1409            }
1410            yielded = yielded
1411                .checked_add(1)
1412                .ok_or(Error::BTreeInvariantViolated {
1413                    reason: "iter_all batch counter overflow",
1414                })?;
1415            buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
1416        }
1417        if yielded < ITER_ALL_BATCH {
1418            self.finished = true;
1419        }
1420        // The lock is held until `pager` drops at function end — but
1421        // by the time we mutate `self.buffer` / `self.last_emitted_key`
1422        // below, the `iter` (which borrowed `pager`) is already
1423        // dropped (its scope ended). Move the staged batch over.
1424        drop(pager);
1425        self.buffer.extend(batch);
1426        if let Some(k) = last_key {
1427            self.last_emitted_key = Some(k);
1428        }
1429        Ok(())
1430    }
1431}
1432
1433/// Process one B-tree iterator step into the staged `batch`. Decode
1434/// errors and corruption errors are captured as `Err` entries so
1435/// they surface via `next()` rather than aborting the refill —
1436/// power-of-ten Rule 7.
1437///
1438/// Free function (rather than `&mut self`) so the caller can keep
1439/// the pager lock open across multiple buffer-pushes without the
1440/// borrow checker tripping on a second `&mut self.buffer` borrow.
1441fn buffer_one_entry<T: Document>(
1442    batch: &mut VecDeque<Result<(Id, T)>>,
1443    last_key: &mut Option<Vec<u8>>,
1444    collection_id: u32,
1445    step: Result<(Vec<u8>, Vec<u8>)>,
1446) {
1447    let (key, value) = match step {
1448        Ok(kv) => kv,
1449        Err(e) => {
1450            batch.push_back(Err(e));
1451            return;
1452        }
1453    };
1454    let Some(id) = Id::from_be_bytes(&key) else {
1455        batch.push_back(Err(Error::InvalidArgument(
1456            "primary B-tree key is not an Id",
1457        )));
1458        return;
1459    };
1460    *last_key = Some(key);
1461    let decoded = obj_core::codec::decode::<T>(&value, collection_id);
1462    batch.push_back(decoded.map(|doc| (id, doc)));
1463}
1464
1465impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
1466    type Item = Result<(Id, T)>;
1467
1468    fn next(&mut self) -> Option<Self::Item> {
1469        if let Some(item) = self.buffer.pop_front() {
1470            return Some(item);
1471        }
1472        if self.finished {
1473            return None;
1474        }
1475        if let Err(e) = self.refill() {
1476            // A lock / open failure surfaces here ONCE and then
1477            // `finished` is flipped so subsequent `next` calls
1478            // terminate cleanly. The error is surfaced to the
1479            // caller via `Some(Err(_))` — power-of-ten Rule 7.
1480            self.finished = true;
1481            return Some(Err(e));
1482        }
1483        self.buffer.pop_front()
1484    }
1485}
1486
/// Break a collection name into an optional namespace prefix and the
/// remaining collection name. Only the FIRST `.` acts as the
/// separator, so the collection part may itself contain dots. A name
/// without any `.` has no namespace.
///
/// `"users"` → `(None, "users")`.
/// `"archive.orders"` → `(Some("archive"), "orders")`.
/// `"archive.orders.legacy"` → `(Some("archive"), "orders.legacy")`.
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    if let Some((namespace, collection)) = name.split_once('.') {
        (Some(namespace), collection)
    } else {
        (None, name)
    }
}
1501
1502/// `Db` is `Send + Sync` so it composes with `Arc<Db>` for
1503/// concurrent reader / single-writer workloads.  The thread-safety
1504/// is inherited from the underlying `Arc<TxnEnv>` + `Arc<Mutex<Catalog>>`.
1505const _: () = {
1506    fn assert_send_sync<T: Send + Sync>() {}
1507    let _ = assert_send_sync::<Db>;
1508};
1509
1510/// Translate the first failure in `report` (if any) into the
1511/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
1512/// open-time fast check so the caller sees `Err(Error::Corruption {
1513/// page_id })` rather than an opaque `IntegrityReport`. The page-id
1514/// in the returned error is the locus of the failure when one is
1515/// available; for failures whose locus is a non-page (e.g. an
1516/// `OrphanIndexEntry`, which `quick_check` does NOT emit), the
1517/// catalog root page-id is used as a stand-in.
1518fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
1519    let first = report.failures.first()?;
1520    let err = match first {
1521        obj_core::IntegrityFailure::ChecksumMismatch { page_id }
1522        | obj_core::IntegrityFailure::OrphanPage { page_id }
1523        | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
1524        | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
1525        | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
1526        | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
1527        | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
1528            Error::Corruption { page_id: *page_id }
1529        }
1530        obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
1531            Error::BTreeDepthExceeded { limit: *limit }
1532        }
1533        // The remaining variants — OrphanIndexEntry,
1534        // MissingIndexEntry, and any future `#[non_exhaustive]`
1535        // additions — are never emitted by `quick_check` (which
1536        // walks the catalog tree only, not the per-collection
1537        // primary or index trees). The defensive fallback is a
1538        // coarse `Corruption { page_id: 0 }` so the open-time
1539        // check still fails closed if the obj-core layer ever
1540        // grows a new fast-check failure mode.
1541        _ => Error::Corruption { page_id: 0 },
1542    };
1543    Some(err)
1544}