Skip to main content

obj/
db.rs

1//! `Db` — public entry point.
2//!
3//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
4//! described in [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
5//! § API and pinned in
6//! [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
7//! § Db public API contract.
8
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::marker::PhantomData;
11use std::ops::Bound;
12use std::path::Path;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15
16use obj_core::btree::BTree;
17use obj_core::pager::page::PageId;
18use obj_core::pager::{lock_path_for, Pager};
19use obj_core::platform::FileHandle;
20use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};
21
22use crate::config::Config;
23use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
24
/// Number of primary-B-tree entries [`IterAll`] fetches per refill.
///
/// Callers still receive one document per `next` call; the batching
/// is internal and exists so the pager-lock acquisition paid on each
/// refill is amortised across many `next` calls. Power-of-ten Rule 3:
/// the buffer is a fixed 256 entries (about 128 KiB peak at roughly
/// 512 bytes per document) and does NOT grow with the total size of
/// the collection.
const ITER_ALL_BATCH: usize = 256;
33
34/// The embedded document database.
35///
36/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
37/// concurrent reader / single-writer access.  See
38/// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
39/// § "Concurrent readers, one writer".
40///
41/// At M6 the public `Db` is hard-typed against
42/// `obj_core::FileHandle`.  A future refactor may make it generic
43/// over `F: FileBackend` so fault-injection harnesses can build on
44/// the same API; today the test helpers reach for the lower-level
45/// `obj-core` building blocks instead.
46pub struct Db {
47    pub(crate) env: Arc<TxnEnv<FileHandle>>,
48    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
49    pub(crate) readonly: bool,
50    pub(crate) busy_timeout: Duration,
51    /// Per-process cache of `(collection, version)` keys whose
52    /// `Document::indexes()` reconciliation has already run. M7
53    /// #57: reconciliation is idempotent but a catalog walk +
54    /// declaration cycle is non-trivial — caching ensures only the
55    /// first `WriteTxn::collection::<T>()` call per process per
56    /// `(collection, version)` pays the cost. #130: the version is part
57    /// of the key so a later schema version that ADDS an index still
58    /// reconciles (the name-only key skipped it, leaving the new index
59    /// never `Active`).
60    pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
61    /// M11 #93: registry of attached read-only databases keyed by
62    /// namespace. Populated by [`Db::attach`]; consulted by
63    /// [`Db::collection_namespace`] and by
64    /// [`Db::read_transaction`] when it pins per-attached
65    /// snapshots. `Arc<Mutex<_>>` because `attach` / `detach` take
66    /// `&mut self` while live read transactions hold borrows into
67    /// the registry; the mutex coordinates the two.
68    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
69    /// #83 (c): published length of `attached`, mutated only while the
70    /// `attached` mutex is held. A relaxed load lets the hot read path
71    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
72    /// no databases are attached (the overwhelmingly common case).
73    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
74}
75
76impl std::fmt::Debug for Db {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("Db")
79            .field("readonly", &self.readonly)
80            .field("busy_timeout", &self.busy_timeout)
81            .finish_non_exhaustive()
82    }
83}
84
85// ---------- FFI plumbing ------------------------------------------
86//
87// The accessors below expose the shareable internals of `Db` so the
88// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
89// iterator handles without re-implementing the lifecycle. They are
90// marked `#[doc(hidden)]` because user-Rust code should reach for
91// the typed `Db::transaction` / `Db::read_transaction` API; the
92// raw accessors exist so a sibling FFI / native-host crate can
93// build its own self-contained handle types.
94impl Db {
95    /// Shared environment Arc — pager + cross-process lock file.
96    ///
97    /// Used by [`libobj`](../../libobj/index.html) to build owned
98    /// transaction handles whose lifetime extends past a single
99    /// `Db::transaction` closure call.
100    #[doc(hidden)]
101    #[must_use]
102    pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
103        Arc::clone(&self.env)
104    }
105
106    /// Shared catalog Arc.
107    ///
108    /// Used by [`libobj`](../../libobj/index.html) for the same
109    /// reason as [`Self::env_arc`].
110    #[doc(hidden)]
111    #[must_use]
112    pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
113        Arc::clone(&self.catalog)
114    }
115
116    /// Shared per-process reconciliation cache Arc.
117    ///
118    /// Used by [`libobj`](../../libobj/index.html) for the same
119    /// reason as [`Self::env_arc`].
120    #[doc(hidden)]
121    #[must_use]
122    pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<(String, u32)>>> {
123        Arc::clone(&self.reconciled)
124    }
125
126    /// Busy-lock timeout configured at open time.
127    #[doc(hidden)]
128    #[must_use]
129    pub fn busy_timeout(&self) -> Duration {
130        self.busy_timeout
131    }
132}
133
134impl Db {
135    /// Open or create a file-backed database at `path` with default
136    /// configuration.
137    ///
138    /// Creates the file if absent; reopens otherwise. A `Db` is
139    /// `Send + Sync` — share across threads via `Arc<Db>` for the
140    /// concurrent-reader / single-writer workload documented in
141    /// [`docs/concurrency.md`](https://github.com/uname-n/obj/blob/master/docs/concurrency.md).
142    ///
143    /// For an ephemeral database use [`Db::memory`]; for a
144    /// read-only handle that coexists with another process's writer
145    /// use [`Db::open_readonly`]; for custom durability /
146    /// cache / lock knobs use [`Db::open_with`] + [`Config`].
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// # fn main() -> obj::Result<()> {
152    /// use obj::Db;
153    ///
154    /// let dir = tempfile::tempdir()?;
155    ///
156    /// // File-backed. Creates the file if absent; reopens otherwise.
157    /// let _db = Db::open(dir.path().join("app.obj"))?;
158    ///
159    /// // In-memory. No persistence, no file locks. Useful for tests.
160    /// let _mem = Db::memory()?;
161    ///
162    /// // Read-only. Coexists safely with a writer in another process.
163    /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
164    /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
165    /// # Ok(())
166    /// # }
167    /// ```
168    ///
169    /// # Errors
170    ///
171    /// Returns the underlying [`Error`] from
172    /// [`obj_core::pager::Pager::open`] on syscall or format
173    /// failure.
174    #[cfg_attr(
175        feature = "tracing",
176        tracing::instrument(
177            name = "db.open",
178            level = "info",
179            skip_all,
180            fields(path = %path.as_ref().display()),
181        )
182    )]
183    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
184        Self::open_with(path, Config::default())
185    }
186
187    /// Open or create a file-backed database with `config`.
188    ///
189    /// # Errors
190    ///
191    /// As [`Db::open`].
192    // Takes `Config` by value: this is the frozen builder-handoff
193    // signature (the caller's `Config::default().sync_mode(..)...`
194    // chain ends here). `Config` is no longer `Copy` (issue #17), so
195    // clippy now sees the by-value argument as only borrowed in the
196    // body — but the by-value convention is part of the 1.0 surface
197    // and must not change. Power-of-ten Rule 10: documented allow.
198    #[allow(clippy::needless_pass_by_value)]
199    #[cfg_attr(
200        feature = "tracing",
201        tracing::instrument(
202            name = "db.open",
203            level = "info",
204            skip_all,
205            fields(path = %path.as_ref().display()),
206        )
207    )]
208    pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
209        let path_buf = path.as_ref().to_path_buf();
210        // Issue #31: the pager `Config` is no longer `Copy` (its
211        // `encryption_key` zeroizes on drop), so move it out of
212        // `config` rather than copying it — `from_parts` only reads
213        // the remaining scalar fields. `mem::take` leaves a default
214        // (keyless) pager `Config` behind, which is never read again.
215        let pager_config = std::mem::take(&mut config.pager);
216        let pager = Pager::open(&path_buf, pager_config)?;
217        // Issue #1: cross-process locks anchor against a dedicated
218        // `<db>.obj-lock` sidecar file. Keeping the lock byte out
219        // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
220        // Windows once the DB grows past whatever offset the
221        // lock byte would otherwise sit at (`LockFileEx` is
222        // mandatory). The sidecar is left in place across opens;
223        // deleting it would race two openers into two different
224        // inodes and miss each other's exclusion.
225        let lock_file = if config.cross_process_lock {
226            let lock_path = lock_path_for(&path_buf);
227            let handle = FileHandle::open_or_create(&lock_path)?;
228            // Materialise the locked byte range as real file
229            // content. POSIX OFD and Windows `LockFileEx` both
230            // permit locking past EOF, but a non-empty sidecar
231            // is the most conservative choice across kernels
232            // (and lets the same offsets work on every platform).
233            // 128 bytes covers WRITER_LOCK (96) and the full
234            // READER_LOCK_RANGE (97..128).
235            handle.set_len(128)?;
236            Some(Arc::new(handle))
237        } else {
238            None
239        };
240        Self::from_parts(pager, lock_file, &config)
241    }
242
243    /// Open a fresh in-memory database.  No persistence, no file
244    /// locks.  Useful for unit tests and ephemeral workloads.
245    ///
246    /// # Errors
247    ///
248    /// Returns [`Error::InvalidArgument`] only if `config` has
249    /// zero cache frames.
250    pub fn memory() -> Result<Self> {
251        Self::memory_with(Config::default())
252    }
253
254    /// As [`Db::memory`] with a caller-supplied [`Config`].
255    ///
256    /// # Errors
257    ///
258    /// As [`Db::memory`].
259    // By-value `Config` is the frozen builder-handoff signature; see
260    // the note on [`Db::open_with`]. Power-of-ten Rule 10: documented
261    // allow (issue #17 dropped `Copy`, which made the lint fire).
262    #[allow(clippy::needless_pass_by_value)]
263    pub fn memory_with(mut config: Config) -> Result<Self> {
264        // Issue #31: move the (no-longer-`Copy`) pager `Config` out
265        // rather than copying it; `from_parts` only reads the
266        // remaining scalar fields. See `open_with` for the rationale.
267        let pager_config = std::mem::take(&mut config.pager);
268        let pager = Pager::memory(pager_config)?;
269        Self::from_parts(pager, None, &config)
270    }
271
272    /// Open the database at `path` in read-only mode.  The
273    /// resulting `Db` rejects every mutating operation with
274    /// `Err(Error::ReadOnly { ... })`.  Coexists safely with a
275    /// writer in another process via the cross-process reader
276    /// lock.
277    ///
278    /// # Errors
279    ///
280    /// As [`Db::open`].
281    pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
282        let config = Config {
283            readonly: true,
284            ..Config::default()
285        };
286        Self::open_with(path, config)
287    }
288
289    fn from_parts(
290        mut pager: Pager<FileHandle>,
291        lock_file: Option<Arc<FileHandle>>,
292        config: &Config,
293    ) -> Result<Self> {
294        // #64: catalog init (and its `tree.insert` → `alloc_page` /
295        // `set_root_catalog` chain) mutates the file header. Header
296        // mutations now ride the WAL via `stage_or_write_header`,
297        // which requires an open Pager txn so the staged page-0 frame
298        // commits atomically with the B-tree pages it depends on.
299        // Wrap the init call in `begin_txn` / `commit` / `end_txn` so
300        // a `Db::open` against a fresh file is durable on return
301        // (matching the pre-#64 contract). `open_or_init` on an
302        // already-initialised database is read-only and the wrapped
303        // `commit()` is a no-op (`Pager::commit` short-circuits on an
304        // empty pending set + clean `header_dirty`).
305        pager.begin_txn();
306        let init = Catalog::open_or_init(&mut pager);
307        let catalog = match init {
308            Ok(c) => {
309                let commit_result = pager.commit();
310                pager.end_txn();
311                commit_result?;
312                c
313            }
314            Err(e) => {
315                pager.end_txn();
316                return Err(e);
317            }
318        };
319        // M11 #91: lightweight open-time integrity check. Runs the
320        // catalog-portion of `integrity_check` and surfaces any
321        // detected failure as the strongest available error so the
322        // caller decides whether to attempt repair or abort. Opt out
323        // via `Config::skip_open_check(true)`.
324        if !config.skip_open_check {
325            let report = obj_core::integrity::quick_check(&mut pager)?;
326            if let Some(err) = first_failure_as_error(&report) {
327                return Err(err);
328            }
329        }
330        Ok(Self {
331            env: Arc::new(TxnEnv::new(pager, lock_file)),
332            catalog: Arc::new(Mutex::new(catalog)),
333            readonly: config.readonly,
334            busy_timeout: config.busy_timeout,
335            reconciled: Arc::new(Mutex::new(HashSet::new())),
336            attached: Arc::new(Mutex::new(HashMap::new())),
337            attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
338        })
339    }
340
341    /// Run a closure inside a write transaction.
342    ///
343    /// Begins a [`WriteTxn`], runs the closure with `&mut tx`.  If
344    /// the closure returns `Ok(r)`, the transaction is committed and
345    /// `Ok(r)` is returned.  If the closure returns `Err(e)`, the
346    /// transaction is rolled back and `Err(e)` is returned.  A
347    /// panic inside the closure unwinds with an implicit rollback
348    /// via the `WriteTxn` `Drop` impl. See
349    /// [`docs/concurrency.md`](https://github.com/uname-n/obj/blob/master/docs/concurrency.md)
350    /// for the lock-acquisition contract.
351    ///
352    /// # Examples
353    ///
354    /// Atomic batch — both inserts commit together or not at all:
355    ///
356    /// ```
357    /// # fn main() -> obj::Result<()> {
358    /// use obj::Db;
359    /// use serde::{Deserialize, Serialize};
360    ///
361    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
362    /// struct Order {
363    ///     customer_id: u64,
364    ///     total_cents: u64,
365    /// }
366    ///
367    /// let dir = tempfile::tempdir()?;
368    /// let db = Db::open(dir.path().join("txn.obj"))?;
369    ///
370    /// let (a, b) = db.transaction(|tx| {
371    ///     let coll = tx.collection::<Order>()?;
372    ///     let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
373    ///     let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
374    ///     Ok((a, b))
375    /// })?;
376    /// assert_ne!(a, b, "freshly-allocated ids are distinct");
377    /// # Ok(())
378    /// # }
379    /// ```
380    ///
381    /// Returning `Err(_)` rolls every staged write back; the `Err`
382    /// the closure returns is the `Err` the caller sees:
383    ///
384    /// ```
385    /// # fn main() -> obj::Result<()> {
386    /// use obj::{Db, Error};
387    /// use serde::{Deserialize, Serialize};
388    ///
389    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
390    /// struct Order { total_cents: u64 }
391    ///
392    /// let dir = tempfile::tempdir()?;
393    /// let db = Db::open(dir.path().join("rollback.obj"))?;
394    /// let id = db.insert(Order { total_cents: 10 })?;
395    ///
396    /// let outcome: obj::Result<()> = db.transaction(|tx| {
397    ///     let coll = tx.collection::<Order>()?;
398    ///     coll.update(id, |o| { o.total_cents = 99_999; })?;
399    ///     Err(Error::InvalidArgument("synthetic abort"))
400    /// });
401    /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
402    ///
403    /// let after: Order = db
404    ///     .get::<Order>(id)?
405    ///     .ok_or(Error::InvalidArgument("just inserted"))?;
406    /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
407    /// # Ok(())
408    /// # }
409    /// ```
410    ///
411    /// # Errors
412    ///
413    /// - [`Error::ReadOnly`] if the database was opened read-only.
414    /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
415    /// - Any error the closure returns.
416    #[cfg_attr(
417        feature = "tracing",
418        tracing::instrument(name = "db.transaction", level = "info", skip_all)
419    )]
420    pub fn transaction<R, F>(&self, body: F) -> Result<R>
421    where
422        F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
423    {
424        if self.readonly {
425            return Err(Error::ReadOnly {
426                operation: "transaction",
427            });
428        }
429        #[cfg(feature = "tracing")]
430        tracing::debug!("begin");
431        let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
432        let mut tx = WriteTxn::new(
433            inner,
434            Arc::clone(&self.catalog),
435            Arc::clone(&self.reconciled),
436        );
437        match body(&mut tx) {
438            Ok(value) => {
439                tx.commit()?;
440                #[cfg(feature = "tracing")]
441                tracing::debug!("commit");
442                Ok(value)
443            }
444            Err(e) => {
445                // Best-effort rollback; surface the closure's
446                // error.  Refresh the in-memory catalog after
447                // rollback so its B-tree root state matches the
448                // file's `root_catalog` header field — catalog
449                // mutations write the header directly (not through
450                // the WAL) so a rollback leaves the header pointing
451                // at the most-recent in-memory root, which is
452                // exactly what re-opening the catalog will pick up.
453                let _ = tx.rollback();
454                let _ = self.refresh_catalog();
455                #[cfg(feature = "tracing")]
456                tracing::debug!("rollback");
457                Err(e)
458            }
459        }
460    }
461
462    /// Re-open the in-memory `Catalog` handle from the pager.  Used
463    /// after a transaction rollback to discard the catalog's in-
464    /// memory `next_collection_id` / `tree.root` state that may
465    /// have advanced during the rolled-back closure.
466    fn refresh_catalog(&self) -> Result<()> {
467        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
468            kind: obj_core::LockKind::WriterInProcess,
469        })?;
470        let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
471        let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
472            kind: obj_core::LockKind::WriterInProcess,
473        })?;
474        *existing = fresh;
475        Ok(())
476    }
477
478    /// Run a closure inside a read transaction.  See
479    /// [`Self::transaction`] for the closure shape and atomicity
480    /// contract; reads inside `body` observe a consistent
481    /// snapshot of the database.
482    ///
483    /// Every read inside the closure observes a single consistent
484    /// snapshot — the snapshot is pinned at the moment the closure
485    /// begins. Concurrent writers do not affect what the closure
486    /// sees.
487    ///
488    /// # Examples
489    ///
490    /// Two reads inside one `read_transaction` see the same value
491    /// even if a writer commits in between:
492    ///
493    /// ```
494    /// # fn main() -> obj::Result<()> {
495    /// use obj::Db;
496    /// use serde::{Deserialize, Serialize};
497    ///
498    /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
499    /// struct Order { total_cents: u64 }
500    ///
501    /// let dir = tempfile::tempdir()?;
502    /// let db = Db::open(dir.path().join("read.obj"))?;
503    /// let id = db.insert(Order { total_cents: 10 })?;
504    ///
505    /// let (a, b) = db.read_transaction(|tx| {
506    ///     let coll = tx.collection::<Order>()?;
507    ///     let a = coll.get(id)?;
508    ///     let b = coll.get(id)?;
509    ///     Ok((a, b))
510    /// })?;
511    /// assert_eq!(a, b);
512    /// # Ok(())
513    /// # }
514    /// ```
515    ///
516    /// # Errors
517    ///
518    /// - [`Error::Busy`] if the reader lock could not be acquired.
519    /// - Any error the closure returns.
520    #[cfg_attr(
521        feature = "tracing",
522        tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
523    )]
524    pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
525    where
526        F: FnOnce(&ReadTxn<'_>) -> Result<R>,
527    {
528        #[cfg(feature = "tracing")]
529        tracing::debug!("begin");
530        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
531        // M11 #93: pin one snapshot per attached database so reads
532        // through the closure observe a consistent view per file.
533        let attached_contexts = self.pin_attached_snapshots()?;
534        let tx = ReadTxn::with_attached(inner, attached_contexts);
535        let result = body(&tx);
536        #[cfg(feature = "tracing")]
537        tracing::debug!("commit");
538        result
539    }
540
541    /// Snapshot every attached database into a per-namespace
542    /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
543    /// pin; dropping the returned map releases every pin.
544    fn pin_attached_snapshots(
545        &self,
546    ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
547        // #83 (c): the common case is no attached databases. A single
548        // relaxed load of `attached_len` lets the empty path skip the
549        // `self.attached` mutex acquire entirely (`with_capacity(0)`
550        // already does not allocate, so the mutex is the only cost).
551        //
552        // Correctness under concurrent detach: the map produced here
553        // feeds only namespaced read dispatch in `open_readonly_named`
554        // (`<ns>.<tail>` → attached snapshot). With zero attachments no
555        // namespace can resolve, so an empty map is observably the same
556        // as the locked build. `attached_len` is mutated only while the
557        // `attached` mutex is held (in `attach` / `detach`), so a `0`
558        // observed here means "no attachment is committed to the map",
559        // matching what a lock-and-read would have seen at that instant.
560        // A concurrent attach that has not yet published its count is
561        // exactly an attach ordered *after* this txn began — invisible
562        // by design, same as the cross-process snapshot semantics.
563        if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
564            return Ok(std::collections::HashMap::new());
565        }
566        let registry = self.attached.lock().map_err(|_| Error::Busy {
567            kind: obj_core::LockKind::WriterInProcess,
568        })?;
569        let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
570            std::collections::HashMap::with_capacity(registry.len());
571        for (namespace, attached) in registry.iter() {
572            let env = Arc::clone(&attached.env);
573            let snapshot = {
574                let mut pager = env.pager().lock().map_err(|_| Error::Busy {
575                    kind: obj_core::LockKind::WriterInProcess,
576                })?;
577                pager.reader_snapshot()?
578            };
579            out.insert(
580                namespace.clone(),
581                crate::txn::AttachedReadCtx { env, snapshot },
582            );
583        }
584        Ok(out)
585    }
586
587    /// Pin one [`crate::txn::AttachedReadCtx`] for the single attachment
588    /// registered under `namespace`. Used by [`Self::dump_raw`]'s
589    /// namespaced full-scan path: it needs exactly one attached env +
590    /// pinned snapshot, not the whole per-namespace map
591    /// [`Self::pin_attached_snapshots`] builds.
592    ///
593    /// The snapshot is pinned for the returned context's lifetime; the
594    /// caller (the `DumpIter`) owns the context, so the pin holds for
595    /// the iterator's whole life and a concurrent `detach` cannot
596    /// invalidate the in-flight scan.
597    ///
598    /// # Errors
599    ///
600    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is not
601    ///   attached on this handle.
602    /// - [`Error::Busy`] if the registry / pager mutex is poisoned.
603    pub(crate) fn pin_attached_ctx(&self, namespace: &str) -> Result<crate::txn::AttachedReadCtx> {
604        let env = {
605            let registry = self.attached.lock().map_err(|_| Error::Busy {
606                kind: obj_core::LockKind::WriterInProcess,
607            })?;
608            let attached =
609                registry
610                    .get(namespace)
611                    .ok_or_else(|| Error::CollectionNamespaceUnknown {
612                        namespace: namespace.to_owned(),
613                    })?;
614            Arc::clone(&attached.env)
615        };
616        let snapshot = {
617            let mut pager = env.pager().lock().map_err(|_| Error::Busy {
618                kind: obj_core::LockKind::WriterInProcess,
619            })?;
620            pager.reader_snapshot()?
621        };
622        Ok(crate::txn::AttachedReadCtx { env, snapshot })
623    }
624
625    /// Attach the database at `path` under `namespace`. The
626    /// attached file is opened read-only; collections in the
627    /// attached file become visible to subsequent
628    /// `Db::collection::<T>()` calls (and the one-shot per-op API
629    /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
630    /// form `<namespace>.<collection_name>`.
631    ///
632    /// Writes against namespaced collections return
633    /// [`Error::AttachedDatabaseIsReadOnly`].
634    ///
635    /// Each attached database gets its own snapshot pinned at
636    /// read-transaction begin; [`Db::detach`] removes the registry
637    /// entry but in-flight reads complete against their pinned
638    /// snapshot.
639    ///
640    /// # Examples
641    ///
642    /// ```
643    /// # fn main() -> obj::Result<()> {
644    /// use obj::Db;
645    /// use serde::{Deserialize, Serialize};
646    ///
647    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
648    /// #[obj(collection = "orders_attach_doc")]
649    /// struct Order { total_cents: u64 }
650    ///
651    /// // Same struct shape, namespaced collection name. Reads
652    /// // against this type route to the attached "archive" db.
653    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
654    /// #[obj(collection = "archive.orders_attach_doc")]
655    /// struct ArchivedOrder { total_cents: u64 }
656    ///
657    /// let dir = tempfile::tempdir()?;
658    /// let live = dir.path().join("live.obj");
659    /// let archive = dir.path().join("archive.obj");
660    ///
661    /// // Seed the archive (writes go through its own un-namespaced type).
662    /// {
663    ///     let archive_db = Db::open(&archive)?;
664    ///     let _ = archive_db.insert(Order { total_cents: 999 })?;
665    /// }
666    ///
667    /// // Open the live db, attach the archive under "archive".
668    /// let mut db = Db::open(&live)?;
669    /// let _ = db.insert(Order { total_cents: 100 })?;
670    /// db.attach(&archive, "archive")?;
671    ///
672    /// // One read transaction, two collections.
673    /// db.read_transaction(|tx| {
674    ///     let live = tx.collection::<Order>()?;
675    ///     let arch = tx.collection::<ArchivedOrder>()?;
676    ///     assert_eq!(live.all()?.len(), 1);
677    ///     assert_eq!(arch.all()?.len(), 1);
678    ///     Ok(())
679    /// })?;
680    ///
681    /// db.detach("archive")?;
682    /// # Ok(())
683    /// # }
684    /// ```
685    ///
686    /// # Errors
687    ///
688    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
689    ///   use on this `Db`.
690    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
691    ///   opened read-only.
692    pub fn attach<P: AsRef<std::path::Path>>(
693        &mut self,
694        path: P,
695        namespace: impl Into<String>,
696    ) -> Result<()> {
697        // The `&mut self` signature is preserved for API stability,
698        // but the attachment registry is interior-mutable
699        // (`Arc<Mutex<_>>` + `Arc<AtomicUsize>`), so the work is
700        // delegated to the `&self` core shared with
701        // [`Self::attach_shared`].
702        self.attach_inner(path.as_ref(), namespace.into())
703    }
704
705    /// Shared-reference (`&self`) form of [`Self::attach`]. Attaches
706    /// the database at `path` under `namespace` through `&self`, for
707    /// callers that hold a shared handle (e.g. an `Arc<Db>`) and
708    /// cannot obtain `&mut self`.
709    ///
710    /// Behaviour is identical to [`Self::attach`]: the attachment
711    /// registry is interior-mutable, guarded by the same per-`Db`
712    /// mutex, so concurrent `attach_shared` / `detach_shared` calls
713    /// from multiple threads serialise on that mutex. The duplicate-
714    /// namespace re-check after the read-only open closes the race
715    /// between two concurrent attaches of the same namespace.
716    ///
717    /// # Examples
718    ///
719    /// ```
720    /// # fn main() -> obj::Result<()> {
721    /// use std::sync::Arc;
722    /// use obj::Db;
723    ///
724    /// let dir = tempfile::tempdir()?;
725    /// let live = dir.path().join("live.obj");
726    /// let archive = dir.path().join("archive.obj");
727    /// let _ = Db::open(&archive)?;
728    ///
729    /// // A shared handle cannot call `&mut self` `attach`, but can
730    /// // call `attach_shared`.
731    /// let db = Arc::new(Db::open(&live)?);
732    /// db.attach_shared(&archive, "archive")?;
733    /// db.detach_shared("archive")?;
734    /// # Ok(())
735    /// # }
736    /// ```
737    ///
738    /// # Errors
739    ///
740    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
741    ///   use on this `Db`.
742    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
743    ///   opened read-only.
744    /// - [`Error::Busy`] if the registry mutex is poisoned.
    pub fn attach_shared<P: AsRef<std::path::Path>>(
        &self,
        path: P,
        namespace: impl Into<String>,
    ) -> Result<()> {
        // Erase the generic arguments here (`P` -> `&Path`,
        // `impl Into<String>` -> `String`) and delegate to the
        // non-generic `&self` core, which does the registry checks,
        // the read-only open, and the insert.
        self.attach_inner(path.as_ref(), namespace.into())
    }
752
753    /// `&self` core shared by [`Self::attach`] and
754    /// [`Self::attach_shared`]. Both signatures are thin wrappers; the
755    /// mutation flows through the interior-mutable `attached` registry
756    /// regardless of the public method's receiver.
757    fn attach_inner(&self, path: &std::path::Path, namespace: String) -> Result<()> {
758        let path_buf = path.to_path_buf();
759        {
760            let registry = self.attached.lock().map_err(|_| Error::Busy {
761                kind: obj_core::LockKind::WriterInProcess,
762            })?;
763            if registry.contains_key(&namespace) {
764                return Err(Error::AttachmentAlreadyExists { namespace });
765            }
766        }
767        let attached_db =
768            Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
769                path: path_buf.clone(),
770                source: Box::new(source),
771            })?;
772        let env = Arc::clone(&attached_db.env);
773        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
774            kind: obj_core::LockKind::WriterInProcess,
775        })?;
776        // Re-check after acquiring the lock — a sibling caller may
777        // have raced; the `Arc<Mutex<_>>` shape makes the check
778        // necessary on the second acquire.
779        if registry.contains_key(&namespace) {
780            return Err(Error::AttachmentAlreadyExists { namespace });
781        }
782        registry.insert(
783            namespace,
784            AttachedDb {
785                env,
786                _db: attached_db,
787            },
788        );
789        // #83 (c): publish the new length while still holding the
790        // `attached` mutex so `pin_attached_snapshots`' relaxed load is
791        // never stale relative to a committed attachment.
792        self.attached_len
793            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
794        Ok(())
795    }
796
797    /// Remove the attachment registered under `namespace`. Returns
798    /// [`Error::CollectionNamespaceUnknown`] if the namespace is
799    /// not attached.
800    ///
801    /// In-flight read transactions hold their own snapshot pins on
802    /// the attached env; detach removes the registry entry, but the
803    /// in-flight read may still complete against its pinned
804    /// snapshot.
805    ///
806    /// # Errors
807    ///
808    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
809    ///   not attached.
810    /// - [`Error::Busy`] if the registry mutex is poisoned.
    pub fn detach(&mut self, namespace: &str) -> Result<()> {
        // `&mut self` is kept only for API stability. The registry is
        // interior-mutable, so the actual removal happens in the
        // `&self` core shared with [`Self::detach_shared`]; the
        // exclusive borrow is simply reborrowed as shared here.
        self.detach_inner(namespace)
    }
817
818    /// Shared-reference (`&self`) form of [`Self::detach`]. Removes the
819    /// attachment registered under `namespace` through `&self`, for
820    /// callers that hold a shared handle (e.g. an `Arc<Db>`).
821    ///
822    /// Behaviour is identical to [`Self::detach`]. In-flight read
823    /// transactions hold their own snapshot pins on the attached env;
824    /// `detach_shared` removes the registry entry, but the in-flight
825    /// read may still complete against its pinned snapshot. Concurrent
826    /// `attach_shared` / `detach_shared` calls serialise on the same
827    /// per-`Db` registry mutex.
828    ///
829    /// # Errors
830    ///
831    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
832    ///   not attached.
833    /// - [`Error::Busy`] if the registry mutex is poisoned.
    pub fn detach_shared(&self, namespace: &str) -> Result<()> {
        // Same `&self` core as `detach`; the registry mutex inside
        // `detach_inner` provides the mutual exclusion.
        self.detach_inner(namespace)
    }
837
838    /// `&self` core shared by [`Self::detach`] and
839    /// [`Self::detach_shared`].
840    fn detach_inner(&self, namespace: &str) -> Result<()> {
841        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
842            kind: obj_core::LockKind::WriterInProcess,
843        })?;
844        if registry.remove(namespace).is_none() {
845            return Err(Error::CollectionNamespaceUnknown {
846                namespace: namespace.to_owned(),
847            });
848        }
849        // #83 (c): publish the post-detach length under the lock.
850        self.attached_len
851            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
852        Ok(())
853    }
854
855    /// Insert `doc` into its collection.  One-shot transaction;
856    /// returns the assigned [`Id`].
857    ///
858    /// The one-shot API opens, commits, and closes a private
859    /// transaction per call. Reach for [`Db::transaction`] when
860    /// several mutations must commit or roll back as a single
861    /// atomic unit.
862    ///
863    /// # Examples
864    ///
865    /// One-shot CRUD against a `Document`-derived type:
866    ///
867    /// ```
868    /// # fn main() -> obj::Result<()> {
869    /// use obj::Db;
870    /// use serde::{Deserialize, Serialize};
871    ///
872    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
873    /// struct Order {
874    ///     customer_id: u64,
875    ///     total_cents: u64,
876    ///     status: String,
877    /// }
878    ///
879    /// let dir = tempfile::tempdir()?;
880    /// let db = Db::open(dir.path().join("oneshot.obj"))?;
881    ///
882    /// // insert returns the freshly-allocated Id.
883    /// let id = db.insert(Order {
884    ///     customer_id: 1,
885    ///     total_cents: 100,
886    ///     status: "pending".to_owned(),
887    /// })?;
888    ///
889    /// // get returns Option<T>.
890    /// let _maybe: Option<Order> = db.get::<Order>(id)?;
891    ///
892    /// // update applies a closure in place.
893    /// db.update::<Order, _>(id, |o| {
894    ///     o.status = "shipped".to_owned();
895    /// })?;
896    ///
897    /// // upsert at a caller-supplied id (insert or replace).
898    /// let id2 = obj::Id::try_new(42)
899    ///     .ok_or(obj::Error::InvalidArgument("non-zero"))?;
900    /// db.upsert::<Order>(id2, Order {
901    ///     customer_id: 2,
902    ///     total_cents: 999,
903    ///     status: "new".to_owned(),
904    /// })?;
905    ///
906    /// // delete returns true if the row existed.
907    /// let existed = db.delete::<Order>(id)?;
908    /// assert!(existed);
909    /// # Ok(())
910    /// # }
911    /// ```
912    ///
913    /// # Errors
914    ///
915    /// As [`Self::transaction`] plus any error from
916    /// [`crate::Collection::insert`].
    pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
        // One private write transaction per call: resolve the typed
        // collection handle for `T` (the `?` surfaces a resolution
        // failure), insert `doc`, and let `transaction` carry the
        // closure's `Result<Id>` back to the caller.
        self.transaction(|tx| tx.collection::<T>()?.insert(doc))
    }
920
921    /// Fetch the document at `id`.  Returns `Ok(None)` if absent.
922    ///
923    /// # Errors
924    ///
925    /// As [`Self::read_transaction`] plus any error from
926    /// [`crate::Collection::get`].
    pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
        // #83 (b): the one-shot point read goes through the fused
        // single-pager-lock path (descriptor resolve + primary get
        // under one guard) instead of `tx.collection()?.get()`, which
        // takes the pager mutex twice. Observably identical for the
        // one-shot caller (same snapshot, same `CollectionNotFound`
        // contract). Namespaced reads fall back to the handle path.
        //
        // The read transaction is private to this call; the closure's
        // `Result<Option<T>>` is what the caller receives.
        self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
    }
936
937    /// Update the document at `id` via the closure.
938    ///
939    /// # Errors
940    ///
941    /// - [`Error::DocumentNotFound`] if `id` does not exist.
942    /// - As [`Self::transaction`].
    pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
    where
        T: Document,
        F: FnOnce(&mut T),
    {
        // One private write transaction per call. `f` is `FnOnce`: it
        // is moved into the collection's `update` and handed a
        // `&mut T` to edit in place.
        self.transaction(|tx| tx.collection::<T>()?.update(id, f))
    }
950
951    /// Delete the document at `id`.  Returns `true` if it existed.
952    ///
953    /// # Errors
954    ///
955    /// As [`Self::transaction`].
    pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
        // One private write transaction per call; the `bool` comes
        // straight from the collection's `delete`.
        self.transaction(|tx| tx.collection::<T>()?.delete(id))
    }
959
960    /// Insert or replace the document at `id`.
961    ///
962    /// # Errors
963    ///
964    /// As [`Self::transaction`].
    pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
        // One private write transaction per call; `doc` is moved into
        // the collection's `upsert` at the caller-supplied `id`.
        self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
    }
968
969    /// Convenience wrapper around [`crate::Collection::find_unique`]
970    /// — `db.find_unique::<Customer>("by_email", "ada@example.com")`.
971    /// Runs inside a one-shot read transaction.
972    ///
973    /// `O(log n)`, no collection scan; the lookup walks the named
974    /// index's B-tree directly. Defined only on `Unique` indexes —
975    /// for the other kinds use
976    /// [`Collection::lookup`](crate::Collection::lookup) or
977    /// [`Collection::index_range`](crate::Collection::index_range).
978    ///
979    /// # Examples
980    ///
981    /// ```
982    /// # fn main() -> obj::Result<()> {
983    /// use obj::Db;
984    /// use serde::{Deserialize, Serialize};
985    ///
986    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
987    /// #[obj(collection = "customers_find_unique_doc")]
988    /// struct Customer {
989    ///     #[obj(index = unique)]
990    ///     email: String,
991    /// }
992    ///
993    /// let dir = tempfile::tempdir()?;
994    /// let db = Db::open(dir.path().join("find-unique.obj"))?;
995    /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
996    /// let by_email: Option<Customer> = db
997    ///     .find_unique::<Customer>("email", "ada@example.com")?;
998    /// assert!(by_email.is_some());
999    /// # Ok(())
1000    /// # }
1001    /// ```
1002    ///
1003    /// # Errors
1004    ///
1005    /// As [`crate::Collection::find_unique`].
    pub fn find_unique<T: Document>(
        &self,
        index_name: &str,
        key: impl Into<obj_core::codec::Dynamic>,
    ) -> Result<Option<T>> {
        // One-shot read transaction. `key` is forwarded unconverted;
        // the `Into<Dynamic>` conversion is left to the collection's
        // `find_unique`.
        self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
    }
1013
1014    /// Construct a fresh M8 [`crate::Query`] builder rooted at this
1015    /// database. The builder borrows `&self` for the build phase;
1016    /// the borrow ends when [`crate::Query::fetch`] returns.
1017    ///
1018    /// Compose with [`Query::filter`](crate::Query::filter),
1019    /// [`Query::limit`](crate::Query::limit),
1020    /// [`Query::sort_by`](crate::Query::sort_by),
1021    /// [`Query::index_range`](crate::Query::index_range). Terminate
1022    /// with [`Query::fetch`](crate::Query::fetch) (for the
1023    /// documents) or [`Query::count`](crate::Query::count) (for the
1024    /// count alone).
1025    ///
1026    /// Mirrors [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
1027    /// § Querying — see the M8 examples in
1028    /// [`crates/obj/tests/design_md_queries.rs`](https://github.com/uname-n/obj/blob/master/crates/obj/tests/design_md_queries.rs)
1029    /// for the full surface.
1030    ///
1031    /// # Examples
1032    ///
1033    /// Top-N matching documents by an indexed field, ascending:
1034    ///
1035    /// ```
1036    /// # fn main() -> obj::Result<()> {
1037    /// use obj::Db;
1038    /// use obj_core::codec::Dynamic;
1039    /// use serde::{Deserialize, Serialize};
1040    ///
1041    /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1042    /// enum OrderStatus { Pending, Shipped }
1043    ///
1044    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1045    /// #[obj(collection = "orders_query_doc")]
1046    /// struct Order {
1047    ///     #[obj(index)]
1048    ///     customer_id: u64,
1049    ///     status: OrderStatus,
1050    ///     #[obj(index)]
1051    ///     placed_at: u64,
1052    /// }
1053    ///
1054    /// let dir = tempfile::tempdir()?;
1055    /// let db = Db::open(dir.path().join("queries.obj"))?;
1056    /// for i in 0..20u64 {
1057    ///     let _ = db.insert(Order {
1058    ///         customer_id: i % 3,
1059    ///         status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
1060    ///         placed_at: i * 1_000,
1061    ///     })?;
1062    /// }
1063    ///
1064    /// let pending: Vec<Order> = db
1065    ///     .query::<Order>()
1066    ///     .filter(|o| o.status == OrderStatus::Pending)
1067    ///     .sort_by(|o| Dynamic::U64(o.placed_at))
1068    ///     .limit(5)
1069    ///     .fetch()?;
1070    /// assert!(pending.len() <= 5);
1071    /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
1072    /// # Ok(())
1073    /// # }
1074    /// ```
1075    #[must_use]
    pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
        // Hands the builder a shared borrow of this `Db`; the `'_` in
        // the return type ties the builder's lifetime to `&self`.
        crate::Query::new(self)
    }
1079
1080    /// Open a read-only typed handle to the collection registered
1081    /// under the runtime `name`, instead of the type's compile-time
1082    /// `T::COLLECTION`.
1083    ///
1084    /// This unlocks the
1085    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
1086    /// § Portability example for
1087    /// attached databases: by passing a namespaced name like
1088    /// `"archive.orders"`, the returned [`crate::Collection`] reads
1089    /// from the database attached under the `"archive"` namespace
1090    /// — see [`Db::attach`].
1091    ///
1092    /// Construction is **infallible**: errors (missing collection,
1093    /// unknown namespace, busy lock) surface at the first method
1094    /// call on the handle, not at the call to `collection(name)`.
1095    /// Each read-only method on the returned handle opens a private
1096    /// [`Db::read_transaction`] and dispatches against the
1097    /// runtime-named collection's catalog row.
1098    ///
1099    /// # Read-only
1100    ///
1101    /// The returned handle rejects every mutating call —
1102    /// [`crate::Collection::insert`], `update`, `delete`, `upsert`
1103    /// all return [`Error::ReadOnly`]. To write into a non-default
1104    /// collection, override [`obj_core::Document::COLLECTION`] on
1105    /// the type itself (compile-time-bound) and use the regular
1106    /// [`Db::transaction`] / [`crate::WriteTxn::collection`] path.
1107    /// Phase 1B (M11 #94) intentionally limits the runtime accessor
1108    /// to reads — the write-through-runtime-name path requires
1109    /// engine plumbing that is deferred to a later milestone.
1110    ///
1111    /// # Example
1112    ///
1113    /// ```
1114    /// use obj::{Db, Document};
1115    /// use serde::{Deserialize, Serialize};
1116    /// use tempfile::tempdir;
1117    ///
1118    /// #[derive(Debug, Clone, Serialize, Deserialize)]
1119    /// struct Order { customer_id: u64, total_cents: u64 }
1120    ///
1121    /// impl Document for Order {
1122    ///     const COLLECTION: &'static str = "orders";
1123    ///     const VERSION: u32 = 1;
1124    /// }
1125    ///
1126    /// fn run() -> obj::Result<()> {
1127    ///     let dir = tempdir()?;
1128    ///     let archive_path = dir.path().join("archive.obj");
1129    ///     // Populate the archive database first.
1130    ///     {
1131    ///         let archive_db = Db::open(&archive_path)?;
1132    ///         archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
1133    ///     }
1134    ///     // Attach it under a namespace and read via the runtime
1135    ///     // accessor — no need to re-declare `Order` with a
1136    ///     // namespaced `COLLECTION`.
1137    ///     let main_path = dir.path().join("main.obj");
1138    ///     let mut db = Db::open(&main_path)?;
1139    ///     db.attach(&archive_path, "archive")?;
1140    ///     let archived: Vec<Order> = db
1141    ///         .collection::<Order>("archive.orders")
1142    ///         .all()?
1143    ///         .into_iter()
1144    ///         .map(|(_id, doc)| doc)
1145    ///         .collect();
1146    ///     assert_eq!(archived.len(), 1);
1147    ///     Ok(())
1148    /// }
1149    /// # run().unwrap();
1150    /// ```
1151    #[must_use]
    pub fn collection<T: Document + Send + 'static>(
        &self,
        name: impl Into<String>,
    ) -> crate::Collection<'_, T> {
        // Infallible by signature: the handle is built from the `Db`
        // borrow and the owned runtime name only. Per the documented
        // contract, lookup errors surface on the handle's first method
        // call rather than here.
        crate::Collection::<T>::lazy(self, name.into())
    }
1158
1159    /// Convenience shim mirroring
1160    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md) —
1161    /// `for order in db.all::<Order>()? { ... }`. Returns an owned
1162    /// `Vec<T>` (materialised). One-line shim over [`Db::iter_all`]
1163    /// that drives the streaming iterator to exhaustion and
1164    /// collects; if the collection is large enough that peak
1165    /// memory matters, prefer [`Db::iter_all`] directly.
1166    ///
1167    /// Sorting requires materialisation — the comparator needs
1168    /// every key up front. Use [`Db::query`] +
1169    /// [`Query::sort_by`](crate::Query::sort_by) for the
1170    /// top-N-sorted workload; the iterator side has no streaming
1171    /// sorted shape.
1172    ///
1173    /// # Examples
1174    ///
1175    /// ```
1176    /// # fn main() -> obj::Result<()> {
1177    /// use obj::Db;
1178    /// use serde::{Deserialize, Serialize};
1179    ///
1180    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1181    /// struct Order { total_cents: u64 }
1182    ///
1183    /// let dir = tempfile::tempdir()?;
1184    /// let db = Db::open(dir.path().join("all.obj"))?;
1185    /// for i in 0..5u64 {
1186    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
1187    /// }
1188    /// let listed: Vec<Order> = db.all::<Order>()?;
1189    /// assert_eq!(listed.len(), 5);
1190    /// # Ok(())
1191    /// # }
1192    /// ```
1193    ///
1194    /// # Errors
1195    ///
1196    /// As [`Db::iter_all`].
1197    pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
1198        self.iter_all::<T>()?
1199            .map(|step| step.map(|(_id, doc)| doc))
1200            .collect()
1201    }
1202
    /// Write a self-contained `.obj` file at `dest` carrying this
    /// database's state at the LSN of an internally-taken reader
    /// snapshot.
    ///
    /// Hot backup — the database does not need to be closed, but the
    /// copy is NOT concurrent with writers: this call holds the
    /// in-process write-serialization guard and (when enabled) the
    /// cross-process `WRITER_LOCK` for the whole copy (#76), so other
    /// writers are held off until it returns. The pager mutex is also
    /// held while the pages are copied. Writes committed after the
    /// backup are NOT in the destination.
    ///
    /// Algorithm (per
    /// [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
    /// § Hot backup):
    ///
    /// 1. Acquire the writer locks via `obj_core::WriteTxn::begin`
    ///    (bounded by this `Db`'s busy timeout). No writes are made
    ///    through this transaction.
    /// 2. Lock the pager and take a `ReaderSnapshot` against it (pins
    ///    `pinned_lsn`).
    /// 3. `OpenOptions::create_new(true)` on `dest`.
    /// 4. Copy main-file pages `0..page_count` to `dest`.
    /// 5. Overlay every frame in the snapshot's frozen WAL view
    ///    onto `dest` at the frame's page-id offset.
    /// 6. If the snapshot carries a WAL-staged page-0 header,
    ///    overlay it (so `dest`'s page-0 reflects the catalog
    ///    root / freelist head / page count the snapshot would
    ///    have observed).
    /// 7. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
    ///    the header CRC32C.
    /// 8. `sync_data(SyncMode::Full)` on `dest`.
    /// 9. Drop the snapshot (releases the WAL pin) and the pager lock.
    /// 10. Roll back the transaction from step 1 to release the
    ///     writer locks.
    ///
    /// On any mid-backup error the destination file is removed
    /// best-effort so a half-written backup does not linger.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "notes_backup_doc")]
    /// struct Note { body: String }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let src = dir.path().join("src.obj");
    /// let dst = dir.path().join("backup.obj");
    ///
    /// let db = Db::open(&src)?;
    /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
    /// db.backup_to(&dst)?;
    ///
    /// // The backup is itself a fully-formed obj file. Open it and read.
    /// let backup = Db::open(&dst)?;
    /// let listed: Vec<Note> = backup.all::<Note>()?;
    /// assert_eq!(listed.len(), 1);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`Error::BackupDestinationExists`] if `dest` already
    ///   exists.
    /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
    ///   a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
    /// - [`Error::Io`] on syscall failure during the copy.
    /// - [`Error::Busy`] if the pager mutex is poisoned.
    /// - Any error returned by `obj_core::WriteTxn::begin` while
    ///   acquiring the writer locks.
    /// - Any error from the final rollback; it is reported only when
    ///   the copy itself succeeded.
    pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
        // #76: a hot backup reads the source MAIN FILE page-by-page
        // (`backup::copy_main_file`). The in-process pager `Mutex`
        // alone excludes only same-process writers; a writer in a
        // SEPARATE OS process can checkpoint pages into the main file
        // mid-copy and yield a torn backup. Hold the cross-process
        // `WRITER_LOCK` for the whole copy so no external writer can
        // mutate the main file under us.
        //
        // Lock-order invariant (MUST match `WriteTxn::begin`, see
        // `docs/concurrency.md`): in-process write-serialization FIRST,
        // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
        // (innermost, acquired and released within the critical
        // section). `obj_core::WriteTxn::begin` acquires the first two
        // in exactly that order, so routing through it inherits the
        // correct ordering and cannot invert against a concurrent
        // in-process `WriteTxn`. We make NO writes; the txn is rolled
        // back (a no-op restore of the unchanged header) once the copy
        // is done. On in-memory / `cross_process_lock = false` envs the
        // cross-process guard is absent (`lock_file = None`) and the
        // txn degrades to the in-process serialization guard only.
        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
        // The copy's outcome is captured, not `?`-propagated, so the
        // rollback below always runs.
        let result = self.run_backup_under_guard(dest);
        // Release both lock layers in the documented order regardless
        // of the copy's outcome. A rollback failure (poisoned pager
        // mutex) only surfaces when the copy itself succeeded.
        let unlock = guard.rollback();
        // `Result::and`: a copy error wins; otherwise the rollback's
        // result is returned.
        result.and(unlock)
    }
1295
1296    /// #76 helper: take the pager `Mutex` (innermost lock), pin a
1297    /// reader snapshot, and run the backup copy. Factored out of
1298    /// [`Self::backup_to`] so the cross-process lock guard's lifetime
1299    /// is unambiguous and the function stays within the power-of-ten
1300    /// 60-line budget (Rule 4).
1301    fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1302        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1303            kind: obj_core::LockKind::WriterInProcess,
1304        })?;
1305        let snapshot = pager.reader_snapshot()?;
1306        obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
1307        // Snapshot drops at end of scope; the pin is released and
1308        // any deferred checkpoint may proceed.
1309        drop(snapshot);
1310        drop(pager);
1311        Ok(())
1312    }
1313
1314    /// Fold committed WAL pages into the main `.obj` file and reset
1315    /// the WAL back to its 64-byte header.
1316    ///
1317    /// Wraps [`obj_core::pager::Pager::checkpoint`]. Acquires the
1318    /// pager lock the same way [`Self::backup_to`] does, then calls
1319    /// the pager checkpoint. After it returns, the committed records
1320    /// live in the main file rather than the `-wal` sidecar, and the
1321    /// WAL is truncated to header-only.
1322    ///
1323    /// # Deferred / no-op behavior
1324    ///
1325    /// - If a live MVCC reader has pinned an LSN below the end of the
1326    ///   WAL, the checkpoint is **deferred** (a safe no-op) so the
1327    ///   reader's frames are not reclaimed out from under it.
1328    /// - If there is nothing to fold (no committed WAL frames), the
1329    ///   call is a harmless no-op.
1330    ///
1331    /// This is the reusable engine entry point behind the Python
1332    /// `Db.checkpoint()` binding and a future checkpoint-on-close
1333    /// path (issue #5).
1334    ///
1335    /// # Errors
1336    ///
1337    /// - [`Error::ReadOnly`] if the database was opened read-only.
1338    /// - [`Error::Busy`] if the pager lock is poisoned.
1339    /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
1340    ///   (e.g. [`Error::Io`] on syscall failure).
1341    #[cfg_attr(
1342        feature = "tracing",
1343        tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
1344    )]
1345    pub fn checkpoint(&self) -> Result<()> {
1346        if self.readonly {
1347            return Err(Error::ReadOnly {
1348                operation: "checkpoint",
1349            });
1350        }
1351        // #76: checkpoint folds committed WAL frames into the main
1352        // file and rotates the WAL salt. Both touch the main file and
1353        // the on-disk header. Taken under only the in-process pager
1354        // `Mutex` (as before), a writer in a SEPARATE process could
1355        // run its own commit/checkpoint concurrently and corrupt the
1356        // salt-rotation handshake or interleave main-file writes. Hold
1357        // the cross-process `WRITER_LOCK` for the duration, in the
1358        // documented lock order (in-process serialization → cross-
1359        // process writer lock → pager mutex; see `WriteTxn::begin` and
1360        // `Self::backup_to`). We make no user writes; the surrounding
1361        // txn is rolled back afterwards. The rollback's
1362        // `restore_header_snapshot` only rewrites `root_catalog` /
1363        // `freelist_head` / `page_count` (unchanged by checkpoint) and
1364        // preserves the freshly-rotated `wal_salt`, so it is a no-op
1365        // with respect to the checkpoint's durable effects.
1366        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1367        let result = self.run_checkpoint_under_guard();
1368        let unlock = guard.rollback();
1369        result.and(unlock)
1370    }
1371
1372    /// #76 helper: take the pager `Mutex` (innermost lock) and run the
1373    /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
1374    /// cross-process lock guard's lifetime is unambiguous.
1375    fn run_checkpoint_under_guard(&self) -> Result<()> {
1376        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1377            kind: obj_core::LockKind::WriterInProcess,
1378        })?;
1379        pager.checkpoint()?;
1380        drop(pager);
1381        Ok(())
1382    }
1383
1384    /// Streaming iterator over every `(Id, T)` pair in the
1385    /// collection. The returned [`IterAll`] holds a read transaction
1386    /// — and therefore a pinned reader snapshot — for its entire
1387    /// lifetime; the borrow on `self` ends when the iterator is
1388    /// dropped.
1389    ///
1390    /// Each `next` call yields `Result<(Id, T)>`. Per-doc decode
1391    /// errors surface as `Some(Err(_))` rather than ending the
1392    /// iteration; the caller decides whether to propagate or
1393    /// continue.
1394    ///
1395    /// Peak memory does NOT scale with collection size — the
1396    /// iterator's internal buffer is fixed at
1397    /// `ITER_ALL_BATCH = 256` entries (~128 KiB at the
1398    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
1399    /// ~512 byte/doc estimate). Power-of-ten Rule 3.
1400    ///
1401    /// `Query::fetch` is sort-compatible (sort requires
1402    /// materialisation); `iter_all` is NOT — there is no streaming
1403    /// shape for sort because the comparator needs every key
1404    /// up front. Use `Query::sort_by` + `fetch` for the sorted
1405    /// workload; use `iter_all` for the unsorted large-scan
1406    /// workload.
1407    ///
1408    /// # Examples
1409    ///
1410    /// Streaming a small collection and folding into a running sum.
1411    /// The iterator's peak memory stays bounded regardless of how
1412    /// many documents the collection holds:
1413    ///
1414    /// ```
1415    /// # fn main() -> obj::Result<()> {
1416    /// use obj::Db;
1417    /// use serde::{Deserialize, Serialize};
1418    ///
1419    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1420    /// struct Order { total_cents: u64 }
1421    ///
1422    /// let dir = tempfile::tempdir()?;
1423    /// let db = Db::open(dir.path().join("iter.obj"))?;
1424    /// for i in 0..5u64 {
1425    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
1426    /// }
1427    ///
1428    /// let mut total: u64 = 0;
1429    /// for step in db.iter_all::<Order>()? {
1430    ///     let (_id, doc) = step?;
1431    ///     total = total
1432    ///         .checked_add(doc.total_cents)
1433    ///         .ok_or(obj::Error::InvalidArgument("overflow"))?;
1434    /// }
1435    /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
1436    /// # Ok(())
1437    /// # }
1438    /// ```
1439    ///
1440    /// # Errors
1441    ///
1442    /// - As [`Db::read_transaction`] (construction-time).
1443    /// - [`Error::CollectionNotFound`] if the collection is not yet
1444    ///   registered at the snapshot's pinned LSN.
1445    /// - Per-step iteration may yield `Some(Err(_))` for pager,
1446    ///   B-tree, or codec failures.
1447    pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
1448        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
1449        let txn = ReadTxn::new(inner);
1450        // Resolve the descriptor up front so an absent collection
1451        // surfaces at construction (matching `Db::all`'s pre-M8
1452        // contract). The handle's lifetime is bound to `txn`; we
1453        // drop the handle immediately and stash the descriptor.
1454        let descriptor = {
1455            let coll = txn.collection::<T>()?;
1456            coll.descriptor().clone()
1457        };
1458        Ok(IterAll {
1459            txn,
1460            descriptor,
1461            buffer: VecDeque::new(),
1462            last_emitted_key: None,
1463            finished: false,
1464            _phantom: PhantomData,
1465        })
1466    }
1467}
1468
1469/// Streaming iterator returned by [`Db::iter_all`].
1470///
1471/// Holds a [`ReadTxn`] for its lifetime so every yielded document
1472/// is consistent with the snapshot pinned at construction. Yields
1473/// `Result<(Id, T)>` one entry at a time; refills its internal
1474/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
1475/// pager-lock acquisition, so peak memory stays bounded at a small
1476/// constant regardless of the collection's size — power-of-ten
1477/// Rule 3.
1478///
1479/// Construction errors surface at the [`Db::iter_all`] call site;
1480/// per-step errors (pager, B-tree, codec) surface as
1481/// `Some(Err(_))` during iteration and do NOT terminate it — the
1482/// caller decides whether to continue.
1483// Note: `IterAll<T>` is deliberately outside the `serde` surface
1484// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
1485// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
1486// neither of which is serializable in a meaningful way. Users who
1487// need an off-line representation of iteration output should
1488// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
1489// resulting `Vec<T>` themselves.
1490pub struct IterAll<'db, T> {
1491    /// Owns the snapshot pin for the iterator's lifetime.
1492    txn: ReadTxn<'db>,
1493    /// Cached primary-tree root + collection id (`Document::decode`
1494    /// needs the collection id to validate the per-doc header).
1495    descriptor: CollectionDescriptor,
1496    /// Pre-decoded buffer of upcoming entries.
1497    buffer: VecDeque<Result<(Id, T)>>,
1498    /// Resumption marker — `Excluded(last_emitted_key)` is the
1499    /// start bound of the next refill.
1500    last_emitted_key: Option<Vec<u8>>,
1501    /// `true` once the underlying B-tree iterator returned no more
1502    /// entries; subsequent `next` calls return `None`.
1503    finished: bool,
1504    /// Track the `T` parameter without owning a value.
1505    _phantom: PhantomData<fn() -> T>,
1506}
1507
1508impl<T> std::fmt::Debug for IterAll<'_, T> {
1509    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1510        f.debug_struct("IterAll")
1511            .field("collection_id", &self.descriptor.collection_id)
1512            .field("buffer_len", &self.buffer.len())
1513            .field("finished", &self.finished)
1514            .finish_non_exhaustive()
1515    }
1516}
1517
1518impl<T: Document + Send + 'static> IterAll<'_, T> {
1519    /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
1520    /// entries, resuming from `last_emitted_key`. Stores per-doc
1521    /// decode errors as `Err` in the buffer so the caller can
1522    /// observe them via `next()` without aborting iteration.
1523    ///
1524    /// Power-of-ten Rule 7: every fallible operation is either
1525    /// captured into the buffer or returned up-front via `?` (which
1526    /// propagates the lock-acquisition / B-tree-open failures
1527    /// before the buffer is touched).
1528    fn refill(&mut self) -> Result<()> {
1529        // Clone the `Arc<Mutex<Pager>>` so the lock-acquisition does
1530        // NOT keep an immutable borrow of `self.txn` alive across
1531        // the buffer-mutation calls below.
1532        let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
1533        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
1534            kind: obj_core::LockKind::WriterInProcess,
1535        })?;
1536        let root_pid = PageId::new(self.descriptor.primary_root)
1537            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1538        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1539        let start = match &self.last_emitted_key {
1540            Some(k) => Bound::Excluded(k.clone()),
1541            None => Bound::Unbounded,
1542        };
1543        let collection_id = self.descriptor.collection_id;
1544        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
1545        let mut yielded: usize = 0;
1546        let mut last_key: Option<Vec<u8>> = None;
1547        let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
1548        for step in iter {
1549            if yielded >= ITER_ALL_BATCH {
1550                break;
1551            }
1552            yielded = yielded
1553                .checked_add(1)
1554                .ok_or(Error::BTreeInvariantViolated {
1555                    reason: "iter_all batch counter overflow",
1556                })?;
1557            buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
1558        }
1559        if yielded < ITER_ALL_BATCH {
1560            self.finished = true;
1561        }
1562        // The lock is held until `pager` drops at function end — but
1563        // by the time we mutate `self.buffer` / `self.last_emitted_key`
1564        // below, the `iter` (which borrowed `pager`) is already
1565        // dropped (its scope ended). Move the staged batch over.
1566        drop(pager);
1567        self.buffer.extend(batch);
1568        if let Some(k) = last_key {
1569            self.last_emitted_key = Some(k);
1570        }
1571        Ok(())
1572    }
1573}
1574
1575/// Process one B-tree iterator step into the staged `batch`. Decode
1576/// errors and corruption errors are captured as `Err` entries so
1577/// they surface via `next()` rather than aborting the refill —
1578/// power-of-ten Rule 7.
1579///
1580/// Free function (rather than `&mut self`) so the caller can keep
1581/// the pager lock open across multiple buffer-pushes without the
1582/// borrow checker tripping on a second `&mut self.buffer` borrow.
1583fn buffer_one_entry<T: Document>(
1584    batch: &mut VecDeque<Result<(Id, T)>>,
1585    last_key: &mut Option<Vec<u8>>,
1586    collection_id: u32,
1587    step: Result<(Vec<u8>, Vec<u8>)>,
1588) {
1589    let (key, value) = match step {
1590        Ok(kv) => kv,
1591        Err(e) => {
1592            batch.push_back(Err(e));
1593            return;
1594        }
1595    };
1596    let Some(id) = Id::from_be_bytes(&key) else {
1597        batch.push_back(Err(Error::InvalidArgument(
1598            "primary B-tree key is not an Id",
1599        )));
1600        return;
1601    };
1602    *last_key = Some(key);
1603    let decoded = obj_core::codec::decode::<T>(&value, collection_id);
1604    batch.push_back(decoded.map(|doc| (id, doc)));
1605}
1606
1607impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
1608    type Item = Result<(Id, T)>;
1609
1610    fn next(&mut self) -> Option<Self::Item> {
1611        if let Some(item) = self.buffer.pop_front() {
1612            return Some(item);
1613        }
1614        if self.finished {
1615            return None;
1616        }
1617        if let Err(e) = self.refill() {
1618            // A lock / open failure surfaces here ONCE and then
1619            // `finished` is flipped so subsequent `next` calls
1620            // terminate cleanly. The error is surfaced to the
1621            // caller via `Some(Err(_))` — power-of-ten Rule 7.
1622            self.finished = true;
1623            return Some(Err(e));
1624        }
1625        self.buffer.pop_front()
1626    }
1627}
1628
/// Separate an optional namespace prefix from a collection name.
///
/// Only the FIRST `.` acts as the separator; anything after it —
/// further dots included — belongs to the collection name. A name
/// without any `.` has no namespace.
///
/// | input                     | output                               |
/// |---------------------------|--------------------------------------|
/// | `"users"`                 | `(None, "users")`                    |
/// | `"archive.orders"`        | `(Some("archive"), "orders")`        |
/// | `"archive.orders.legacy"` | `(Some("archive"), "orders.legacy")` |
///
/// Either half may be empty when the `.` sits at the start or end
/// of `name` (e.g. `".x"` → `(Some(""), "x")`).
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    name.split_once('.')
        .map_or((None, name), |(namespace, rest)| (Some(namespace), rest))
}
1643
1644/// `Db` is `Send + Sync` so it composes with `Arc<Db>` for
1645/// concurrent reader / single-writer workloads.  The thread-safety
1646/// is inherited from the underlying `Arc<TxnEnv>` + `Arc<Mutex<Catalog>>`.
1647const _: () = {
1648    fn assert_send_sync<T: Send + Sync>() {}
1649    let _ = assert_send_sync::<Db>;
1650};
1651
1652/// Translate the first failure in `report` (if any) into the
1653/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
1654/// open-time fast check so the caller sees `Err(Error::Corruption {
1655/// page_id })` rather than an opaque `IntegrityReport`. The page-id
1656/// in the returned error is the locus of the failure when one is
1657/// available; for failures whose locus is a non-page (e.g. an
1658/// `OrphanIndexEntry`, which `quick_check` does NOT emit), the
1659/// catalog root page-id is used as a stand-in.
1660fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
1661    let first = report.failures.first()?;
1662    let err = match first {
1663        obj_core::IntegrityFailure::ChecksumMismatch { page_id }
1664        | obj_core::IntegrityFailure::OrphanPage { page_id }
1665        | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
1666        | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
1667        | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
1668        | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
1669        | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
1670            Error::Corruption { page_id: *page_id }
1671        }
1672        obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
1673            Error::BTreeDepthExceeded { limit: *limit }
1674        }
1675        // The remaining variants — OrphanIndexEntry,
1676        // MissingIndexEntry, and any future `#[non_exhaustive]`
1677        // additions — are never emitted by `quick_check` (which
1678        // walks the catalog tree only, not the per-collection
1679        // primary or index trees). The defensive fallback is a
1680        // coarse `Corruption { page_id: 0 }` so the open-time
1681        // check still fails closed if the obj-core layer ever
1682        // grows a new fast-check failure mode.
1683        _ => Error::Corruption { page_id: 0 },
1684    };
1685    Some(err)
1686}