Skip to main content

obj/
db.rs

1//! `Db` — public entry point.
2//!
3//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
4//! described in `design.md` § API and pinned in `docs/format.md`
5//! § Db public API contract.
6
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::marker::PhantomData;
9use std::ops::Bound;
10use std::path::Path;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use obj_core::btree::BTree;
15use obj_core::pager::page::PageId;
16use obj_core::pager::{lock_path_for, Pager};
17use obj_core::platform::FileHandle;
18use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};
19
20use crate::config::Config;
21use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
22
/// Per-batch refill size for [`IterAll`]. The iterator yields one
/// document at a time to the caller, but internally reads the
/// primary B-tree in chunks of `ITER_ALL_BATCH` entries so the
/// per-step pager-lock acquisition cost amortises over many `next`
/// calls. Power-of-ten Rule 3: the buffer is fixed-size (constant
/// 256 entries — at ~512 bytes/doc that's ~128 KiB peak); the
/// buffer does NOT scale with the collection's total size.
const ITER_ALL_BATCH: usize = 256;
31
32/// The embedded document database.
33///
34/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
35/// concurrent reader / single-writer access.  See `design.md`
36/// § "Concurrent readers, one writer".
37///
38/// At M6 the public `Db` is hard-typed against
39/// `obj_core::FileHandle`.  A future refactor may make it generic
40/// over `F: FileBackend` so fault-injection harnesses can build on
41/// the same API; today the test helpers reach for the lower-level
42/// `obj-core` building blocks instead.
pub struct Db {
    /// Shared transaction environment, built in `from_parts` from the
    /// pager and the optional cross-process lock file. Every
    /// transaction begin borrows it.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// In-memory catalog handle. Shared with each `WriteTxn` and
    /// replaced wholesale by `refresh_catalog` after a rollback.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// Copied from `Config::readonly` at open time. When `true`,
    /// [`Db::transaction`] returns `Error::ReadOnly` before it begins
    /// a write transaction.
    pub(crate) readonly: bool,
    /// Copied from `Config::busy_timeout` at open time and passed to
    /// every read / write transaction begin.
    pub(crate) busy_timeout: Duration,
    /// Per-process cache of `(collection, version)` keys whose
    /// `Document::indexes()` reconciliation has already run. M7
    /// #57: reconciliation is idempotent but a catalog walk +
    /// declaration cycle is non-trivial — caching ensures only the
    /// first `WriteTxn::collection::<T>()` call per process per
    /// `(collection, version)` pays the cost. #130: the version is part
    /// of the key so a later schema version that ADDS an index still
    /// reconciles (the name-only key skipped it, leaving the new index
    /// never `Active`).
    pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
    /// M11 #93: registry of attached read-only databases keyed by
    /// namespace. Populated by [`Db::attach`] / [`Db::attach_shared`];
    /// consulted by [`Db::collection_namespace`] and by
    /// [`Db::read_transaction`] when it pins per-attached
    /// snapshots. `Arc<Mutex<_>>` because the registry is mutated
    /// through a shared `&self` core (the `&mut self` `attach` is only
    /// a wrapper around it) while live read transactions hold borrows
    /// into the registry; the mutex coordinates the two.
    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
    /// #83 (c): published length of `attached`, mutated only while the
    /// `attached` mutex is held. A relaxed load lets the hot read path
    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
    /// no databases are attached (the overwhelmingly common case).
    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
}
72
73impl std::fmt::Debug for Db {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("Db")
76            .field("readonly", &self.readonly)
77            .field("busy_timeout", &self.busy_timeout)
78            .finish_non_exhaustive()
79    }
80}
81
82// ---------- FFI plumbing ------------------------------------------
83//
84// The accessors below expose the shareable internals of `Db` so the
85// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
86// iterator handles without re-implementing the lifecycle. They are
87// marked `#[doc(hidden)]` because user-Rust code should reach for
88// the typed `Db::transaction` / `Db::read_transaction` API; the
89// raw accessors exist so a sibling FFI / native-host crate can
90// build its own self-contained handle types.
91impl Db {
92    /// Shared environment Arc — pager + cross-process lock file.
93    ///
94    /// Used by [`libobj`](../../libobj/index.html) to build owned
95    /// transaction handles whose lifetime extends past a single
96    /// `Db::transaction` closure call.
97    #[doc(hidden)]
98    #[must_use]
99    pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
100        Arc::clone(&self.env)
101    }
102
103    /// Shared catalog Arc.
104    ///
105    /// Used by [`libobj`](../../libobj/index.html) for the same
106    /// reason as [`Self::env_arc`].
107    #[doc(hidden)]
108    #[must_use]
109    pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
110        Arc::clone(&self.catalog)
111    }
112
113    /// Shared per-process reconciliation cache Arc.
114    ///
115    /// Used by [`libobj`](../../libobj/index.html) for the same
116    /// reason as [`Self::env_arc`].
117    #[doc(hidden)]
118    #[must_use]
119    pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<(String, u32)>>> {
120        Arc::clone(&self.reconciled)
121    }
122
123    /// Busy-lock timeout configured at open time.
124    #[doc(hidden)]
125    #[must_use]
126    pub fn busy_timeout(&self) -> Duration {
127        self.busy_timeout
128    }
129}
130
131impl Db {
132    /// Open or create a file-backed database at `path` with default
133    /// configuration.
134    ///
135    /// Creates the file if absent; reopens otherwise. A `Db` is
136    /// `Send + Sync` — share across threads via `Arc<Db>` for the
137    /// concurrent-reader / single-writer workload documented in
138    /// `docs/concurrency.md`.
139    ///
140    /// For an ephemeral database use [`Db::memory`]; for a
141    /// read-only handle that coexists with another process's writer
142    /// use [`Db::open_readonly`]; for custom durability /
143    /// cache / lock knobs use [`Db::open_with`] + [`Config`].
144    ///
145    /// # Examples
146    ///
147    /// ```
148    /// # fn main() -> obj::Result<()> {
149    /// use obj::Db;
150    ///
151    /// let dir = tempfile::tempdir()?;
152    ///
153    /// // File-backed. Creates the file if absent; reopens otherwise.
154    /// let _db = Db::open(dir.path().join("app.obj"))?;
155    ///
156    /// // In-memory. No persistence, no file locks. Useful for tests.
157    /// let _mem = Db::memory()?;
158    ///
159    /// // Read-only. Coexists safely with a writer in another process.
160    /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
161    /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
162    /// # Ok(())
163    /// # }
164    /// ```
165    ///
166    /// # Errors
167    ///
168    /// Returns the underlying [`Error`] from
169    /// [`obj_core::pager::Pager::open`] on syscall or format
170    /// failure.
171    #[cfg_attr(
172        feature = "tracing",
173        tracing::instrument(
174            name = "db.open",
175            level = "info",
176            skip_all,
177            fields(path = %path.as_ref().display()),
178        )
179    )]
180    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
181        Self::open_with(path, Config::default())
182    }
183
184    /// Open or create a file-backed database with `config`.
185    ///
186    /// # Errors
187    ///
188    /// As [`Db::open`].
189    // Takes `Config` by value: this is the frozen builder-handoff
190    // signature (the caller's `Config::default().sync_mode(..)...`
191    // chain ends here). `Config` is no longer `Copy` (issue #17), so
192    // clippy now sees the by-value argument as only borrowed in the
193    // body — but the by-value convention is part of the 1.0 surface
194    // and must not change. Power-of-ten Rule 10: documented allow.
195    #[allow(clippy::needless_pass_by_value)]
196    #[cfg_attr(
197        feature = "tracing",
198        tracing::instrument(
199            name = "db.open",
200            level = "info",
201            skip_all,
202            fields(path = %path.as_ref().display()),
203        )
204    )]
205    pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
206        let path_buf = path.as_ref().to_path_buf();
207        // Issue #31: the pager `Config` is no longer `Copy` (its
208        // `encryption_key` zeroizes on drop), so move it out of
209        // `config` rather than copying it — `from_parts` only reads
210        // the remaining scalar fields. `mem::take` leaves a default
211        // (keyless) pager `Config` behind, which is never read again.
212        let pager_config = std::mem::take(&mut config.pager);
213        let pager = Pager::open(&path_buf, pager_config)?;
214        // Issue #1: cross-process locks anchor against a dedicated
215        // `<db>.obj-lock` sidecar file. Keeping the lock byte out
216        // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
217        // Windows once the DB grows past whatever offset the
218        // lock byte would otherwise sit at (`LockFileEx` is
219        // mandatory). The sidecar is left in place across opens;
220        // deleting it would race two openers into two different
221        // inodes and miss each other's exclusion.
222        let lock_file = if config.cross_process_lock {
223            let lock_path = lock_path_for(&path_buf);
224            let handle = FileHandle::open_or_create(&lock_path)?;
225            // Materialise the locked byte range as real file
226            // content. POSIX OFD and Windows `LockFileEx` both
227            // permit locking past EOF, but a non-empty sidecar
228            // is the most conservative choice across kernels
229            // (and lets the same offsets work on every platform).
230            // 128 bytes covers WRITER_LOCK (96) and the full
231            // READER_LOCK_RANGE (97..128).
232            handle.set_len(128)?;
233            Some(Arc::new(handle))
234        } else {
235            None
236        };
237        Self::from_parts(pager, lock_file, &config)
238    }
239
240    /// Open a fresh in-memory database.  No persistence, no file
241    /// locks.  Useful for unit tests and ephemeral workloads.
242    ///
243    /// # Errors
244    ///
245    /// Returns [`Error::InvalidArgument`] only if `config` has
246    /// zero cache frames.
247    pub fn memory() -> Result<Self> {
248        Self::memory_with(Config::default())
249    }
250
251    /// As [`Db::memory`] with a caller-supplied [`Config`].
252    ///
253    /// # Errors
254    ///
255    /// As [`Db::memory`].
256    // By-value `Config` is the frozen builder-handoff signature; see
257    // the note on [`Db::open_with`]. Power-of-ten Rule 10: documented
258    // allow (issue #17 dropped `Copy`, which made the lint fire).
259    #[allow(clippy::needless_pass_by_value)]
260    pub fn memory_with(mut config: Config) -> Result<Self> {
261        // Issue #31: move the (no-longer-`Copy`) pager `Config` out
262        // rather than copying it; `from_parts` only reads the
263        // remaining scalar fields. See `open_with` for the rationale.
264        let pager_config = std::mem::take(&mut config.pager);
265        let pager = Pager::memory(pager_config)?;
266        Self::from_parts(pager, None, &config)
267    }
268
269    /// Open the database at `path` in read-only mode.  The
270    /// resulting `Db` rejects every mutating operation with
271    /// `Err(Error::ReadOnly { ... })`.  Coexists safely with a
272    /// writer in another process via the cross-process reader
273    /// lock.
274    ///
275    /// # Errors
276    ///
277    /// As [`Db::open`].
278    pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
279        let config = Config {
280            readonly: true,
281            ..Config::default()
282        };
283        Self::open_with(path, config)
284    }
285
286    fn from_parts(
287        mut pager: Pager<FileHandle>,
288        lock_file: Option<Arc<FileHandle>>,
289        config: &Config,
290    ) -> Result<Self> {
291        // #64: catalog init (and its `tree.insert` → `alloc_page` /
292        // `set_root_catalog` chain) mutates the file header. Header
293        // mutations now ride the WAL via `stage_or_write_header`,
294        // which requires an open Pager txn so the staged page-0 frame
295        // commits atomically with the B-tree pages it depends on.
296        // Wrap the init call in `begin_txn` / `commit` / `end_txn` so
297        // a `Db::open` against a fresh file is durable on return
298        // (matching the pre-#64 contract). `open_or_init` on an
299        // already-initialised database is read-only and the wrapped
300        // `commit()` is a no-op (`Pager::commit` short-circuits on an
301        // empty pending set + clean `header_dirty`).
302        pager.begin_txn();
303        let init = Catalog::open_or_init(&mut pager);
304        let catalog = match init {
305            Ok(c) => {
306                let commit_result = pager.commit();
307                pager.end_txn();
308                commit_result?;
309                c
310            }
311            Err(e) => {
312                pager.end_txn();
313                return Err(e);
314            }
315        };
316        // M11 #91: lightweight open-time integrity check. Runs the
317        // catalog-portion of `integrity_check` and surfaces any
318        // detected failure as the strongest available error so the
319        // caller decides whether to attempt repair or abort. Opt out
320        // via `Config::skip_open_check(true)`.
321        if !config.skip_open_check {
322            let report = obj_core::integrity::quick_check(&mut pager)?;
323            if let Some(err) = first_failure_as_error(&report) {
324                return Err(err);
325            }
326        }
327        Ok(Self {
328            env: Arc::new(TxnEnv::new(pager, lock_file)),
329            catalog: Arc::new(Mutex::new(catalog)),
330            readonly: config.readonly,
331            busy_timeout: config.busy_timeout,
332            reconciled: Arc::new(Mutex::new(HashSet::new())),
333            attached: Arc::new(Mutex::new(HashMap::new())),
334            attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
335        })
336    }
337
338    /// Run a closure inside a write transaction.
339    ///
340    /// Begins a [`WriteTxn`], runs the closure with `&mut tx`.  If
341    /// the closure returns `Ok(r)`, the transaction is committed and
342    /// `Ok(r)` is returned.  If the closure returns `Err(e)`, the
343    /// transaction is rolled back and `Err(e)` is returned.  A
344    /// panic inside the closure unwinds with an implicit rollback
345    /// via the `WriteTxn` `Drop` impl. See `docs/concurrency.md`
346    /// for the lock-acquisition contract.
347    ///
348    /// # Examples
349    ///
350    /// Atomic batch — both inserts commit together or not at all:
351    ///
352    /// ```
353    /// # fn main() -> obj::Result<()> {
354    /// use obj::Db;
355    /// use serde::{Deserialize, Serialize};
356    ///
357    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
358    /// struct Order {
359    ///     customer_id: u64,
360    ///     total_cents: u64,
361    /// }
362    ///
363    /// let dir = tempfile::tempdir()?;
364    /// let db = Db::open(dir.path().join("txn.obj"))?;
365    ///
366    /// let (a, b) = db.transaction(|tx| {
367    ///     let coll = tx.collection::<Order>()?;
368    ///     let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
369    ///     let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
370    ///     Ok((a, b))
371    /// })?;
372    /// assert_ne!(a, b, "freshly-allocated ids are distinct");
373    /// # Ok(())
374    /// # }
375    /// ```
376    ///
377    /// Returning `Err(_)` rolls every staged write back; the `Err`
378    /// the closure returns is the `Err` the caller sees:
379    ///
380    /// ```
381    /// # fn main() -> obj::Result<()> {
382    /// use obj::{Db, Error};
383    /// use serde::{Deserialize, Serialize};
384    ///
385    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
386    /// struct Order { total_cents: u64 }
387    ///
388    /// let dir = tempfile::tempdir()?;
389    /// let db = Db::open(dir.path().join("rollback.obj"))?;
390    /// let id = db.insert(Order { total_cents: 10 })?;
391    ///
392    /// let outcome: obj::Result<()> = db.transaction(|tx| {
393    ///     let coll = tx.collection::<Order>()?;
394    ///     coll.update(id, |o| { o.total_cents = 99_999; })?;
395    ///     Err(Error::InvalidArgument("synthetic abort"))
396    /// });
397    /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
398    ///
399    /// let after: Order = db
400    ///     .get::<Order>(id)?
401    ///     .ok_or(Error::InvalidArgument("just inserted"))?;
402    /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
403    /// # Ok(())
404    /// # }
405    /// ```
406    ///
407    /// # Errors
408    ///
409    /// - [`Error::ReadOnly`] if the database was opened read-only.
410    /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
411    /// - Any error the closure returns.
412    #[cfg_attr(
413        feature = "tracing",
414        tracing::instrument(name = "db.transaction", level = "info", skip_all)
415    )]
416    pub fn transaction<R, F>(&self, body: F) -> Result<R>
417    where
418        F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
419    {
420        if self.readonly {
421            return Err(Error::ReadOnly {
422                operation: "transaction",
423            });
424        }
425        #[cfg(feature = "tracing")]
426        tracing::debug!("begin");
427        let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
428        let mut tx = WriteTxn::new(
429            inner,
430            Arc::clone(&self.catalog),
431            Arc::clone(&self.reconciled),
432        );
433        match body(&mut tx) {
434            Ok(value) => {
435                tx.commit()?;
436                #[cfg(feature = "tracing")]
437                tracing::debug!("commit");
438                Ok(value)
439            }
440            Err(e) => {
441                // Best-effort rollback; surface the closure's
442                // error.  Refresh the in-memory catalog after
443                // rollback so its B-tree root state matches the
444                // file's `root_catalog` header field — catalog
445                // mutations write the header directly (not through
446                // the WAL) so a rollback leaves the header pointing
447                // at the most-recent in-memory root, which is
448                // exactly what re-opening the catalog will pick up.
449                let _ = tx.rollback();
450                let _ = self.refresh_catalog();
451                #[cfg(feature = "tracing")]
452                tracing::debug!("rollback");
453                Err(e)
454            }
455        }
456    }
457
458    /// Re-open the in-memory `Catalog` handle from the pager.  Used
459    /// after a transaction rollback to discard the catalog's in-
460    /// memory `next_collection_id` / `tree.root` state that may
461    /// have advanced during the rolled-back closure.
462    fn refresh_catalog(&self) -> Result<()> {
463        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
464            kind: obj_core::LockKind::WriterInProcess,
465        })?;
466        let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
467        let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
468            kind: obj_core::LockKind::WriterInProcess,
469        })?;
470        *existing = fresh;
471        Ok(())
472    }
473
474    /// Run a closure inside a read transaction.  See
475    /// [`Self::transaction`] for the closure shape and atomicity
476    /// contract; reads inside `body` observe a consistent
477    /// snapshot of the database.
478    ///
479    /// Every read inside the closure observes a single consistent
480    /// snapshot — the snapshot is pinned at the moment the closure
481    /// begins. Concurrent writers do not affect what the closure
482    /// sees.
483    ///
484    /// # Examples
485    ///
486    /// Two reads inside one `read_transaction` see the same value
487    /// even if a writer commits in between:
488    ///
489    /// ```
490    /// # fn main() -> obj::Result<()> {
491    /// use obj::Db;
492    /// use serde::{Deserialize, Serialize};
493    ///
494    /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
495    /// struct Order { total_cents: u64 }
496    ///
497    /// let dir = tempfile::tempdir()?;
498    /// let db = Db::open(dir.path().join("read.obj"))?;
499    /// let id = db.insert(Order { total_cents: 10 })?;
500    ///
501    /// let (a, b) = db.read_transaction(|tx| {
502    ///     let coll = tx.collection::<Order>()?;
503    ///     let a = coll.get(id)?;
504    ///     let b = coll.get(id)?;
505    ///     Ok((a, b))
506    /// })?;
507    /// assert_eq!(a, b);
508    /// # Ok(())
509    /// # }
510    /// ```
511    ///
512    /// # Errors
513    ///
514    /// - [`Error::Busy`] if the reader lock could not be acquired.
515    /// - Any error the closure returns.
516    #[cfg_attr(
517        feature = "tracing",
518        tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
519    )]
520    pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
521    where
522        F: FnOnce(&ReadTxn<'_>) -> Result<R>,
523    {
524        #[cfg(feature = "tracing")]
525        tracing::debug!("begin");
526        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
527        // M11 #93: pin one snapshot per attached database so reads
528        // through the closure observe a consistent view per file.
529        let attached_contexts = self.pin_attached_snapshots()?;
530        let tx = ReadTxn::with_attached(inner, attached_contexts);
531        let result = body(&tx);
532        #[cfg(feature = "tracing")]
533        tracing::debug!("commit");
534        result
535    }
536
537    /// Snapshot every attached database into a per-namespace
538    /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
539    /// pin; dropping the returned map releases every pin.
540    fn pin_attached_snapshots(
541        &self,
542    ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
543        // #83 (c): the common case is no attached databases. A single
544        // relaxed load of `attached_len` lets the empty path skip the
545        // `self.attached` mutex acquire entirely (`with_capacity(0)`
546        // already does not allocate, so the mutex is the only cost).
547        //
548        // Correctness under concurrent detach: the map produced here
549        // feeds only namespaced read dispatch in `open_readonly_named`
550        // (`<ns>.<tail>` → attached snapshot). With zero attachments no
551        // namespace can resolve, so an empty map is observably the same
552        // as the locked build. `attached_len` is mutated only while the
553        // `attached` mutex is held (in `attach` / `detach`), so a `0`
554        // observed here means "no attachment is committed to the map",
555        // matching what a lock-and-read would have seen at that instant.
556        // A concurrent attach that has not yet published its count is
557        // exactly an attach ordered *after* this txn began — invisible
558        // by design, same as the cross-process snapshot semantics.
559        if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
560            return Ok(std::collections::HashMap::new());
561        }
562        let registry = self.attached.lock().map_err(|_| Error::Busy {
563            kind: obj_core::LockKind::WriterInProcess,
564        })?;
565        let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
566            std::collections::HashMap::with_capacity(registry.len());
567        for (namespace, attached) in registry.iter() {
568            let env = Arc::clone(&attached.env);
569            let snapshot = {
570                let mut pager = env.pager().lock().map_err(|_| Error::Busy {
571                    kind: obj_core::LockKind::WriterInProcess,
572                })?;
573                pager.reader_snapshot()?
574            };
575            out.insert(
576                namespace.clone(),
577                crate::txn::AttachedReadCtx { env, snapshot },
578            );
579        }
580        Ok(out)
581    }
582
583    /// Pin one [`crate::txn::AttachedReadCtx`] for the single attachment
584    /// registered under `namespace`. Used by [`Self::dump_raw`]'s
585    /// namespaced full-scan path: it needs exactly one attached env +
586    /// pinned snapshot, not the whole per-namespace map
587    /// [`Self::pin_attached_snapshots`] builds.
588    ///
589    /// The snapshot is pinned for the returned context's lifetime; the
590    /// caller (the `DumpIter`) owns the context, so the pin holds for
591    /// the iterator's whole life and a concurrent `detach` cannot
592    /// invalidate the in-flight scan.
593    ///
594    /// # Errors
595    ///
596    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is not
597    ///   attached on this handle.
598    /// - [`Error::Busy`] if the registry / pager mutex is poisoned.
599    pub(crate) fn pin_attached_ctx(&self, namespace: &str) -> Result<crate::txn::AttachedReadCtx> {
600        let env = {
601            let registry = self.attached.lock().map_err(|_| Error::Busy {
602                kind: obj_core::LockKind::WriterInProcess,
603            })?;
604            let attached =
605                registry
606                    .get(namespace)
607                    .ok_or_else(|| Error::CollectionNamespaceUnknown {
608                        namespace: namespace.to_owned(),
609                    })?;
610            Arc::clone(&attached.env)
611        };
612        let snapshot = {
613            let mut pager = env.pager().lock().map_err(|_| Error::Busy {
614                kind: obj_core::LockKind::WriterInProcess,
615            })?;
616            pager.reader_snapshot()?
617        };
618        Ok(crate::txn::AttachedReadCtx { env, snapshot })
619    }
620
621    /// Attach the database at `path` under `namespace`. The
622    /// attached file is opened read-only; collections in the
623    /// attached file become visible to subsequent
624    /// `Db::collection::<T>()` calls (and the one-shot per-op API
625    /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
626    /// form `<namespace>.<collection_name>`.
627    ///
628    /// Writes against namespaced collections return
629    /// [`Error::AttachedDatabaseIsReadOnly`].
630    ///
631    /// Each attached database gets its own snapshot pinned at
632    /// read-transaction begin; [`Db::detach`] removes the registry
633    /// entry but in-flight reads complete against their pinned
634    /// snapshot.
635    ///
636    /// # Examples
637    ///
638    /// ```
639    /// # fn main() -> obj::Result<()> {
640    /// use obj::Db;
641    /// use serde::{Deserialize, Serialize};
642    ///
643    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
644    /// #[obj(collection = "orders_attach_doc")]
645    /// struct Order { total_cents: u64 }
646    ///
647    /// // Same struct shape, namespaced collection name. Reads
648    /// // against this type route to the attached "archive" db.
649    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
650    /// #[obj(collection = "archive.orders_attach_doc")]
651    /// struct ArchivedOrder { total_cents: u64 }
652    ///
653    /// let dir = tempfile::tempdir()?;
654    /// let live = dir.path().join("live.obj");
655    /// let archive = dir.path().join("archive.obj");
656    ///
657    /// // Seed the archive (writes go through its own un-namespaced type).
658    /// {
659    ///     let archive_db = Db::open(&archive)?;
660    ///     let _ = archive_db.insert(Order { total_cents: 999 })?;
661    /// }
662    ///
663    /// // Open the live db, attach the archive under "archive".
664    /// let mut db = Db::open(&live)?;
665    /// let _ = db.insert(Order { total_cents: 100 })?;
666    /// db.attach(&archive, "archive")?;
667    ///
668    /// // One read transaction, two collections.
669    /// db.read_transaction(|tx| {
670    ///     let live = tx.collection::<Order>()?;
671    ///     let arch = tx.collection::<ArchivedOrder>()?;
672    ///     assert_eq!(live.all()?.len(), 1);
673    ///     assert_eq!(arch.all()?.len(), 1);
674    ///     Ok(())
675    /// })?;
676    ///
677    /// db.detach("archive")?;
678    /// # Ok(())
679    /// # }
680    /// ```
681    ///
682    /// # Errors
683    ///
684    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
685    ///   use on this `Db`.
686    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
687    ///   opened read-only.
688    pub fn attach<P: AsRef<std::path::Path>>(
689        &mut self,
690        path: P,
691        namespace: impl Into<String>,
692    ) -> Result<()> {
693        // The `&mut self` signature is preserved for API stability,
694        // but the attachment registry is interior-mutable
695        // (`Arc<Mutex<_>>` + `Arc<AtomicUsize>`), so the work is
696        // delegated to the `&self` core shared with
697        // [`Self::attach_shared`].
698        self.attach_inner(path.as_ref(), namespace.into())
699    }
700
701    /// Shared-reference (`&self`) form of [`Self::attach`]. Attaches
702    /// the database at `path` under `namespace` through `&self`, for
703    /// callers that hold a shared handle (e.g. an `Arc<Db>`) and
704    /// cannot obtain `&mut self`.
705    ///
706    /// Behaviour is identical to [`Self::attach`]: the attachment
707    /// registry is interior-mutable, guarded by the same per-`Db`
708    /// mutex, so concurrent `attach_shared` / `detach_shared` calls
709    /// from multiple threads serialise on that mutex. The duplicate-
710    /// namespace re-check after the read-only open closes the race
711    /// between two concurrent attaches of the same namespace.
712    ///
713    /// # Examples
714    ///
715    /// ```
716    /// # fn main() -> obj::Result<()> {
717    /// use std::sync::Arc;
718    /// use obj::Db;
719    ///
720    /// let dir = tempfile::tempdir()?;
721    /// let live = dir.path().join("live.obj");
722    /// let archive = dir.path().join("archive.obj");
723    /// let _ = Db::open(&archive)?;
724    ///
725    /// // A shared handle cannot call `&mut self` `attach`, but can
726    /// // call `attach_shared`.
727    /// let db = Arc::new(Db::open(&live)?);
728    /// db.attach_shared(&archive, "archive")?;
729    /// db.detach_shared("archive")?;
730    /// # Ok(())
731    /// # }
732    /// ```
733    ///
734    /// # Errors
735    ///
736    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
737    ///   use on this `Db`.
738    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
739    ///   opened read-only.
740    /// - [`Error::Busy`] if the registry mutex is poisoned.
741    pub fn attach_shared<P: AsRef<std::path::Path>>(
742        &self,
743        path: P,
744        namespace: impl Into<String>,
745    ) -> Result<()> {
746        self.attach_inner(path.as_ref(), namespace.into())
747    }
748
749    /// `&self` core shared by [`Self::attach`] and
750    /// [`Self::attach_shared`]. Both signatures are thin wrappers; the
751    /// mutation flows through the interior-mutable `attached` registry
752    /// regardless of the public method's receiver.
753    fn attach_inner(&self, path: &std::path::Path, namespace: String) -> Result<()> {
754        let path_buf = path.to_path_buf();
755        {
756            let registry = self.attached.lock().map_err(|_| Error::Busy {
757                kind: obj_core::LockKind::WriterInProcess,
758            })?;
759            if registry.contains_key(&namespace) {
760                return Err(Error::AttachmentAlreadyExists { namespace });
761            }
762        }
763        let attached_db =
764            Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
765                path: path_buf.clone(),
766                source: Box::new(source),
767            })?;
768        let env = Arc::clone(&attached_db.env);
769        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
770            kind: obj_core::LockKind::WriterInProcess,
771        })?;
772        // Re-check after acquiring the lock — a sibling caller may
773        // have raced; the `Arc<Mutex<_>>` shape makes the check
774        // necessary on the second acquire.
775        if registry.contains_key(&namespace) {
776            return Err(Error::AttachmentAlreadyExists { namespace });
777        }
778        registry.insert(
779            namespace,
780            AttachedDb {
781                env,
782                _db: attached_db,
783            },
784        );
785        // #83 (c): publish the new length while still holding the
786        // `attached` mutex so `pin_attached_snapshots`' relaxed load is
787        // never stale relative to a committed attachment.
788        self.attached_len
789            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
790        Ok(())
791    }
792
793    /// Remove the attachment registered under `namespace`. Returns
794    /// [`Error::CollectionNamespaceUnknown`] if the namespace is
795    /// not attached.
796    ///
797    /// In-flight read transactions hold their own snapshot pins on
798    /// the attached env; detach removes the registry entry, but the
799    /// in-flight read may still complete against its pinned
800    /// snapshot.
801    ///
802    /// # Errors
803    ///
804    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
805    ///   not attached.
806    /// - [`Error::Busy`] if the registry mutex is poisoned.
807    pub fn detach(&mut self, namespace: &str) -> Result<()> {
808        // `&mut self` preserved for API stability; the registry is
809        // interior-mutable, so the work delegates to the `&self` core
810        // shared with [`Self::detach_shared`].
811        self.detach_inner(namespace)
812    }
813
814    /// Shared-reference (`&self`) form of [`Self::detach`]. Removes the
815    /// attachment registered under `namespace` through `&self`, for
816    /// callers that hold a shared handle (e.g. an `Arc<Db>`).
817    ///
818    /// Behaviour is identical to [`Self::detach`]. In-flight read
819    /// transactions hold their own snapshot pins on the attached env;
820    /// `detach_shared` removes the registry entry, but the in-flight
821    /// read may still complete against its pinned snapshot. Concurrent
822    /// `attach_shared` / `detach_shared` calls serialise on the same
823    /// per-`Db` registry mutex.
824    ///
825    /// # Errors
826    ///
827    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
828    ///   not attached.
829    /// - [`Error::Busy`] if the registry mutex is poisoned.
830    pub fn detach_shared(&self, namespace: &str) -> Result<()> {
831        self.detach_inner(namespace)
832    }
833
834    /// `&self` core shared by [`Self::detach`] and
835    /// [`Self::detach_shared`].
836    fn detach_inner(&self, namespace: &str) -> Result<()> {
837        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
838            kind: obj_core::LockKind::WriterInProcess,
839        })?;
840        if registry.remove(namespace).is_none() {
841            return Err(Error::CollectionNamespaceUnknown {
842                namespace: namespace.to_owned(),
843            });
844        }
845        // #83 (c): publish the post-detach length under the lock.
846        self.attached_len
847            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
848        Ok(())
849    }
850
851    /// Insert `doc` into its collection.  One-shot transaction;
852    /// returns the assigned [`Id`].
853    ///
854    /// The one-shot API opens, commits, and closes a private
855    /// transaction per call. Reach for [`Db::transaction`] when
856    /// several mutations must commit or roll back as a single
857    /// atomic unit.
858    ///
859    /// # Examples
860    ///
861    /// One-shot CRUD against a `Document`-derived type:
862    ///
863    /// ```
864    /// # fn main() -> obj::Result<()> {
865    /// use obj::Db;
866    /// use serde::{Deserialize, Serialize};
867    ///
868    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
869    /// struct Order {
870    ///     customer_id: u64,
871    ///     total_cents: u64,
872    ///     status: String,
873    /// }
874    ///
875    /// let dir = tempfile::tempdir()?;
876    /// let db = Db::open(dir.path().join("oneshot.obj"))?;
877    ///
878    /// // insert returns the freshly-allocated Id.
879    /// let id = db.insert(Order {
880    ///     customer_id: 1,
881    ///     total_cents: 100,
882    ///     status: "pending".to_owned(),
883    /// })?;
884    ///
885    /// // get returns Option<T>.
886    /// let _maybe: Option<Order> = db.get::<Order>(id)?;
887    ///
888    /// // update applies a closure in place.
889    /// db.update::<Order, _>(id, |o| {
890    ///     o.status = "shipped".to_owned();
891    /// })?;
892    ///
893    /// // upsert at a caller-supplied id (insert or replace).
894    /// let id2 = obj::Id::try_new(42)
895    ///     .ok_or(obj::Error::InvalidArgument("non-zero"))?;
896    /// db.upsert::<Order>(id2, Order {
897    ///     customer_id: 2,
898    ///     total_cents: 999,
899    ///     status: "new".to_owned(),
900    /// })?;
901    ///
902    /// // delete returns true if the row existed.
903    /// let existed = db.delete::<Order>(id)?;
904    /// assert!(existed);
905    /// # Ok(())
906    /// # }
907    /// ```
908    ///
909    /// # Errors
910    ///
911    /// As [`Self::transaction`] plus any error from
912    /// [`crate::Collection::insert`].
913    pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
914        self.transaction(|tx| tx.collection::<T>()?.insert(doc))
915    }
916
917    /// Fetch the document at `id`.  Returns `Ok(None)` if absent.
918    ///
919    /// # Errors
920    ///
921    /// As [`Self::read_transaction`] plus any error from
922    /// [`crate::Collection::get`].
923    pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
924        // #83 (b): the one-shot point read goes through the fused
925        // single-pager-lock path (descriptor resolve + primary get
926        // under one guard) instead of `tx.collection()?.get()`, which
927        // takes the pager mutex twice. Observably identical for the
928        // one-shot caller (same snapshot, same `CollectionNotFound`
929        // contract). Namespaced reads fall back to the handle path.
930        self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
931    }
932
933    /// Update the document at `id` via the closure.
934    ///
935    /// # Errors
936    ///
937    /// - [`Error::DocumentNotFound`] if `id` does not exist.
938    /// - As [`Self::transaction`].
939    pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
940    where
941        T: Document,
942        F: FnOnce(&mut T),
943    {
944        self.transaction(|tx| tx.collection::<T>()?.update(id, f))
945    }
946
947    /// Delete the document at `id`.  Returns `true` if it existed.
948    ///
949    /// # Errors
950    ///
951    /// As [`Self::transaction`].
952    pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
953        self.transaction(|tx| tx.collection::<T>()?.delete(id))
954    }
955
956    /// Insert or replace the document at `id`.
957    ///
958    /// # Errors
959    ///
960    /// As [`Self::transaction`].
961    pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
962        self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
963    }
964
965    /// Convenience wrapper around [`crate::Collection::find_unique`]
966    /// — `db.find_unique::<Customer>("by_email", "ada@example.com")`.
967    /// Runs inside a one-shot read transaction.
968    ///
969    /// `O(log n)`, no collection scan; the lookup walks the named
970    /// index's B-tree directly. Defined only on `Unique` indexes —
971    /// for the other kinds use
972    /// [`Collection::lookup`](crate::Collection::lookup) or
973    /// [`Collection::index_range`](crate::Collection::index_range).
974    ///
975    /// # Examples
976    ///
977    /// ```
978    /// # fn main() -> obj::Result<()> {
979    /// use obj::Db;
980    /// use serde::{Deserialize, Serialize};
981    ///
982    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
983    /// #[obj(collection = "customers_find_unique_doc")]
984    /// struct Customer {
985    ///     #[obj(index = unique)]
986    ///     email: String,
987    /// }
988    ///
989    /// let dir = tempfile::tempdir()?;
990    /// let db = Db::open(dir.path().join("find-unique.obj"))?;
991    /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
992    /// let by_email: Option<Customer> = db
993    ///     .find_unique::<Customer>("email", "ada@example.com")?;
994    /// assert!(by_email.is_some());
995    /// # Ok(())
996    /// # }
997    /// ```
998    ///
999    /// # Errors
1000    ///
1001    /// As [`crate::Collection::find_unique`].
1002    pub fn find_unique<T: Document>(
1003        &self,
1004        index_name: &str,
1005        key: impl Into<obj_core::codec::Dynamic>,
1006    ) -> Result<Option<T>> {
1007        self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
1008    }
1009
1010    /// Construct a fresh M8 [`crate::Query`] builder rooted at this
1011    /// database. The builder borrows `&self` for the build phase;
1012    /// the borrow ends when [`crate::Query::fetch`] returns.
1013    ///
1014    /// Compose with [`Query::filter`](crate::Query::filter),
1015    /// [`Query::limit`](crate::Query::limit),
1016    /// [`Query::sort_by`](crate::Query::sort_by),
1017    /// [`Query::index_range`](crate::Query::index_range). Terminate
1018    /// with [`Query::fetch`](crate::Query::fetch) (for the
1019    /// documents) or [`Query::count`](crate::Query::count) (for the
1020    /// count alone).
1021    ///
1022    /// Mirrors `design.md` § Querying — see the M8 examples in
1023    /// `crates/obj/tests/design_md_queries.rs` for the full surface.
1024    ///
1025    /// # Examples
1026    ///
1027    /// Top-N matching documents by an indexed field, ascending:
1028    ///
1029    /// ```
1030    /// # fn main() -> obj::Result<()> {
1031    /// use obj::Db;
1032    /// use obj_core::codec::Dynamic;
1033    /// use serde::{Deserialize, Serialize};
1034    ///
1035    /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1036    /// enum OrderStatus { Pending, Shipped }
1037    ///
1038    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1039    /// #[obj(collection = "orders_query_doc")]
1040    /// struct Order {
1041    ///     #[obj(index)]
1042    ///     customer_id: u64,
1043    ///     status: OrderStatus,
1044    ///     #[obj(index)]
1045    ///     placed_at: u64,
1046    /// }
1047    ///
1048    /// let dir = tempfile::tempdir()?;
1049    /// let db = Db::open(dir.path().join("queries.obj"))?;
1050    /// for i in 0..20u64 {
1051    ///     let _ = db.insert(Order {
1052    ///         customer_id: i % 3,
1053    ///         status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
1054    ///         placed_at: i * 1_000,
1055    ///     })?;
1056    /// }
1057    ///
1058    /// let pending: Vec<Order> = db
1059    ///     .query::<Order>()
1060    ///     .filter(|o| o.status == OrderStatus::Pending)
1061    ///     .sort_by(|o| Dynamic::U64(o.placed_at))
1062    ///     .limit(5)
1063    ///     .fetch()?;
1064    /// assert!(pending.len() <= 5);
1065    /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
1066    /// # Ok(())
1067    /// # }
1068    /// ```
1069    #[must_use]
1070    pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
1071        crate::Query::new(self)
1072    }
1073
1074    /// Open a read-only typed handle to the collection registered
1075    /// under the runtime `name`, instead of the type's compile-time
1076    /// `T::COLLECTION`.
1077    ///
1078    /// This unlocks the `design.md` § Portability example for
1079    /// attached databases: by passing a namespaced name like
1080    /// `"archive.orders"`, the returned [`crate::Collection`] reads
1081    /// from the database attached under the `"archive"` namespace
1082    /// — see [`Db::attach`].
1083    ///
1084    /// Construction is **infallible**: errors (missing collection,
1085    /// unknown namespace, busy lock) surface at the first method
1086    /// call on the handle, not at the call to `collection(name)`.
1087    /// Each read-only method on the returned handle opens a private
1088    /// [`Db::read_transaction`] and dispatches against the
1089    /// runtime-named collection's catalog row.
1090    ///
1091    /// # Read-only
1092    ///
1093    /// The returned handle rejects every mutating call —
1094    /// [`crate::Collection::insert`], `update`, `delete`, `upsert`
1095    /// all return [`Error::ReadOnly`]. To write into a non-default
1096    /// collection, override [`obj_core::Document::COLLECTION`] on
1097    /// the type itself (compile-time-bound) and use the regular
1098    /// [`Db::transaction`] / [`crate::WriteTxn::collection`] path.
1099    /// Phase 1B (M11 #94) intentionally limits the runtime accessor
1100    /// to reads — the write-through-runtime-name path requires
1101    /// engine plumbing that is deferred to a later milestone.
1102    ///
1103    /// # Example
1104    ///
1105    /// ```
1106    /// use obj::{Db, Document};
1107    /// use serde::{Deserialize, Serialize};
1108    /// use tempfile::tempdir;
1109    ///
1110    /// #[derive(Debug, Clone, Serialize, Deserialize)]
1111    /// struct Order { customer_id: u64, total_cents: u64 }
1112    ///
1113    /// impl Document for Order {
1114    ///     const COLLECTION: &'static str = "orders";
1115    ///     const VERSION: u32 = 1;
1116    /// }
1117    ///
1118    /// fn run() -> obj::Result<()> {
1119    ///     let dir = tempdir()?;
1120    ///     let archive_path = dir.path().join("archive.obj");
1121    ///     // Populate the archive database first.
1122    ///     {
1123    ///         let archive_db = Db::open(&archive_path)?;
1124    ///         archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
1125    ///     }
1126    ///     // Attach it under a namespace and read via the runtime
1127    ///     // accessor — no need to re-declare `Order` with a
1128    ///     // namespaced `COLLECTION`.
1129    ///     let main_path = dir.path().join("main.obj");
1130    ///     let mut db = Db::open(&main_path)?;
1131    ///     db.attach(&archive_path, "archive")?;
1132    ///     let archived: Vec<Order> = db
1133    ///         .collection::<Order>("archive.orders")
1134    ///         .all()?
1135    ///         .into_iter()
1136    ///         .map(|(_id, doc)| doc)
1137    ///         .collect();
1138    ///     assert_eq!(archived.len(), 1);
1139    ///     Ok(())
1140    /// }
1141    /// # run().unwrap();
1142    /// ```
1143    #[must_use]
1144    pub fn collection<T: Document + Send + 'static>(
1145        &self,
1146        name: impl Into<String>,
1147    ) -> crate::Collection<'_, T> {
1148        crate::Collection::<T>::lazy(self, name.into())
1149    }
1150
1151    /// Convenience shim mirroring `design.md` —
1152    /// `for order in db.all::<Order>()? { ... }`. Returns an owned
1153    /// `Vec<T>` (materialised). One-line shim over [`Db::iter_all`]
1154    /// that drives the streaming iterator to exhaustion and
1155    /// collects; if the collection is large enough that peak
1156    /// memory matters, prefer [`Db::iter_all`] directly.
1157    ///
1158    /// Sorting requires materialisation — the comparator needs
1159    /// every key up front. Use [`Db::query`] +
1160    /// [`Query::sort_by`](crate::Query::sort_by) for the
1161    /// top-N-sorted workload; the iterator side has no streaming
1162    /// sorted shape.
1163    ///
1164    /// # Examples
1165    ///
1166    /// ```
1167    /// # fn main() -> obj::Result<()> {
1168    /// use obj::Db;
1169    /// use serde::{Deserialize, Serialize};
1170    ///
1171    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1172    /// struct Order { total_cents: u64 }
1173    ///
1174    /// let dir = tempfile::tempdir()?;
1175    /// let db = Db::open(dir.path().join("all.obj"))?;
1176    /// for i in 0..5u64 {
1177    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
1178    /// }
1179    /// let listed: Vec<Order> = db.all::<Order>()?;
1180    /// assert_eq!(listed.len(), 5);
1181    /// # Ok(())
1182    /// # }
1183    /// ```
1184    ///
1185    /// # Errors
1186    ///
1187    /// As [`Db::iter_all`].
1188    pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
1189        self.iter_all::<T>()?
1190            .map(|step| step.map(|(_id, doc)| doc))
1191            .collect()
1192    }
1193
1194    /// Write a self-contained `.obj` file at `dest` carrying this
1195    /// database's state at the LSN of an internally-taken reader
1196    /// snapshot.
1197    ///
1198    /// Hot backup — writers continue uninterrupted against the
1199    /// source. Post-snapshot writes are NOT in the destination.
1200    ///
1201    /// Algorithm (per `docs/format.md` § Hot backup):
1202    ///
1203    /// 1. Take a `ReaderSnapshot` against the source pager (pins
1204    ///    `pinned_lsn`).
1205    /// 2. `OpenOptions::create_new(true)` on `dest`.
1206    /// 3. Copy main-file pages `0..page_count` to `dest`.
1207    /// 4. Overlay every frame in the snapshot's frozen WAL view
1208    ///    onto `dest` at the frame's page-id offset.
1209    /// 5. If the snapshot carries a WAL-staged page-0 header,
1210    ///    overlay it (so `dest`'s page-0 reflects the catalog
1211    ///    root / freelist head / page count the snapshot would
1212    ///    have observed).
1213    /// 6. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
1214    ///    the header CRC32C.
1215    /// 7. `sync_data(SyncMode::Full)` on `dest`.
1216    /// 8. Drop the snapshot (releases the WAL pin).
1217    ///
1218    /// On any mid-backup error the destination file is removed
1219    /// best-effort so a half-written backup does not linger.
1220    ///
1221    /// # Examples
1222    ///
1223    /// ```
1224    /// # fn main() -> obj::Result<()> {
1225    /// use obj::Db;
1226    /// use serde::{Deserialize, Serialize};
1227    ///
1228    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1229    /// #[obj(collection = "notes_backup_doc")]
1230    /// struct Note { body: String }
1231    ///
1232    /// let dir = tempfile::tempdir()?;
1233    /// let src = dir.path().join("src.obj");
1234    /// let dst = dir.path().join("backup.obj");
1235    ///
1236    /// let db = Db::open(&src)?;
1237    /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
1238    /// db.backup_to(&dst)?;
1239    ///
1240    /// // The backup is itself a fully-formed obj file. Open it and read.
1241    /// let backup = Db::open(&dst)?;
1242    /// let listed: Vec<Note> = backup.all::<Note>()?;
1243    /// assert_eq!(listed.len(), 1);
1244    /// # Ok(())
1245    /// # }
1246    /// ```
1247    ///
1248    /// # Errors
1249    ///
1250    /// - [`Error::BackupDestinationExists`] if `dest` already
1251    ///   exists.
1252    /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
1253    ///   a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
1254    /// - [`Error::Io`] on syscall failure during the copy.
1255    pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1256        // #76: a hot backup reads the source MAIN FILE page-by-page
1257        // (`backup::copy_main_file`). The in-process pager `Mutex`
1258        // alone excludes only same-process writers; a writer in a
1259        // SEPARATE OS process can checkpoint pages into the main file
1260        // mid-copy and yield a torn backup. Hold the cross-process
1261        // `WRITER_LOCK` for the whole copy so no external writer can
1262        // mutate the main file under us.
1263        //
1264        // Lock-order invariant (MUST match `WriteTxn::begin`, see
1265        // `docs/concurrency.md`): in-process write-serialization FIRST,
1266        // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
1267        // (innermost, acquired and released within the critical
1268        // section). `obj_core::WriteTxn::begin` acquires the first two
1269        // in exactly that order, so routing through it inherits the
1270        // correct ordering and cannot invert against a concurrent
1271        // in-process `WriteTxn`. We make NO writes; the txn is rolled
1272        // back (a no-op restore of the unchanged header) once the copy
1273        // is done. On in-memory / `cross_process_lock = false` envs the
1274        // cross-process guard is absent (`lock_file = None`) and the
1275        // txn degrades to the in-process serialization guard only.
1276        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1277        let result = self.run_backup_under_guard(dest);
1278        // Release both lock layers in the documented order regardless
1279        // of the copy's outcome. A rollback failure (poisoned pager
1280        // mutex) only surfaces when the copy itself succeeded.
1281        let unlock = guard.rollback();
1282        result.and(unlock)
1283    }
1284
1285    /// #76 helper: take the pager `Mutex` (innermost lock), pin a
1286    /// reader snapshot, and run the backup copy. Factored out of
1287    /// [`Self::backup_to`] so the cross-process lock guard's lifetime
1288    /// is unambiguous and the function stays within the power-of-ten
1289    /// 60-line budget (Rule 4).
1290    fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1291        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1292            kind: obj_core::LockKind::WriterInProcess,
1293        })?;
1294        let snapshot = pager.reader_snapshot()?;
1295        obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
1296        // Snapshot drops at end of scope; the pin is released and
1297        // any deferred checkpoint may proceed.
1298        drop(snapshot);
1299        drop(pager);
1300        Ok(())
1301    }
1302
1303    /// Fold committed WAL pages into the main `.obj` file and reset
1304    /// the WAL back to its 64-byte header.
1305    ///
1306    /// Wraps [`obj_core::pager::Pager::checkpoint`]. Acquires the
1307    /// pager lock the same way [`Self::backup_to`] does, then calls
1308    /// the pager checkpoint. After it returns, the committed records
1309    /// live in the main file rather than the `-wal` sidecar, and the
1310    /// WAL is truncated to header-only.
1311    ///
1312    /// # Deferred / no-op behavior
1313    ///
1314    /// - If a live MVCC reader has pinned an LSN below the end of the
1315    ///   WAL, the checkpoint is **deferred** (a safe no-op) so the
1316    ///   reader's frames are not reclaimed out from under it.
1317    /// - If there is nothing to fold (no committed WAL frames), the
1318    ///   call is a harmless no-op.
1319    ///
1320    /// This is the reusable engine entry point behind the Python
1321    /// `Db.checkpoint()` binding and a future checkpoint-on-close
1322    /// path (issue #5).
1323    ///
1324    /// # Errors
1325    ///
1326    /// - [`Error::ReadOnly`] if the database was opened read-only.
1327    /// - [`Error::Busy`] if the pager lock is poisoned.
1328    /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
1329    ///   (e.g. [`Error::Io`] on syscall failure).
1330    #[cfg_attr(
1331        feature = "tracing",
1332        tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
1333    )]
1334    pub fn checkpoint(&self) -> Result<()> {
1335        if self.readonly {
1336            return Err(Error::ReadOnly {
1337                operation: "checkpoint",
1338            });
1339        }
1340        // #76: checkpoint folds committed WAL frames into the main
1341        // file and rotates the WAL salt. Both touch the main file and
1342        // the on-disk header. Taken under only the in-process pager
1343        // `Mutex` (as before), a writer in a SEPARATE process could
1344        // run its own commit/checkpoint concurrently and corrupt the
1345        // salt-rotation handshake or interleave main-file writes. Hold
1346        // the cross-process `WRITER_LOCK` for the duration, in the
1347        // documented lock order (in-process serialization → cross-
1348        // process writer lock → pager mutex; see `WriteTxn::begin` and
1349        // `Self::backup_to`). We make no user writes; the surrounding
1350        // txn is rolled back afterwards. The rollback's
1351        // `restore_header_snapshot` only rewrites `root_catalog` /
1352        // `freelist_head` / `page_count` (unchanged by checkpoint) and
1353        // preserves the freshly-rotated `wal_salt`, so it is a no-op
1354        // with respect to the checkpoint's durable effects.
1355        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1356        let result = self.run_checkpoint_under_guard();
1357        let unlock = guard.rollback();
1358        result.and(unlock)
1359    }
1360
1361    /// #76 helper: take the pager `Mutex` (innermost lock) and run the
1362    /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
1363    /// cross-process lock guard's lifetime is unambiguous.
1364    fn run_checkpoint_under_guard(&self) -> Result<()> {
1365        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1366            kind: obj_core::LockKind::WriterInProcess,
1367        })?;
1368        pager.checkpoint()?;
1369        drop(pager);
1370        Ok(())
1371    }
1372
1373    /// Streaming iterator over every `(Id, T)` pair in the
1374    /// collection. The returned [`IterAll`] holds a read transaction
1375    /// — and therefore a pinned reader snapshot — for its entire
1376    /// lifetime; the borrow on `self` ends when the iterator is
1377    /// dropped.
1378    ///
1379    /// Each `next` call yields `Result<(Id, T)>`. Per-doc decode
1380    /// errors surface as `Some(Err(_))` rather than ending the
1381    /// iteration; the caller decides whether to propagate or
1382    /// continue.
1383    ///
1384    /// Peak memory does NOT scale with collection size — the
1385    /// iterator's internal buffer is fixed at
1386    /// `ITER_ALL_BATCH = 256` entries (~128 KiB at the design.md
1387    /// ~512 byte/doc estimate). Power-of-ten Rule 3.
1388    ///
1389    /// `Query::fetch` is sort-compatible (sort requires
1390    /// materialisation); `iter_all` is NOT — there is no streaming
1391    /// shape for sort because the comparator needs every key
1392    /// up front. Use `Query::sort_by` + `fetch` for the sorted
1393    /// workload; use `iter_all` for the unsorted large-scan
1394    /// workload.
1395    ///
1396    /// # Examples
1397    ///
1398    /// Streaming a small collection and folding into a running sum.
1399    /// The iterator's peak memory stays bounded regardless of how
1400    /// many documents the collection holds:
1401    ///
1402    /// ```
1403    /// # fn main() -> obj::Result<()> {
1404    /// use obj::Db;
1405    /// use serde::{Deserialize, Serialize};
1406    ///
1407    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1408    /// struct Order { total_cents: u64 }
1409    ///
1410    /// let dir = tempfile::tempdir()?;
1411    /// let db = Db::open(dir.path().join("iter.obj"))?;
1412    /// for i in 0..5u64 {
1413    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
1414    /// }
1415    ///
1416    /// let mut total: u64 = 0;
1417    /// for step in db.iter_all::<Order>()? {
1418    ///     let (_id, doc) = step?;
1419    ///     total = total
1420    ///         .checked_add(doc.total_cents)
1421    ///         .ok_or(obj::Error::InvalidArgument("overflow"))?;
1422    /// }
1423    /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
1424    /// # Ok(())
1425    /// # }
1426    /// ```
1427    ///
1428    /// # Errors
1429    ///
1430    /// - As [`Db::read_transaction`] (construction-time).
1431    /// - [`Error::CollectionNotFound`] if the collection is not yet
1432    ///   registered at the snapshot's pinned LSN.
1433    /// - Per-step iteration may yield `Some(Err(_))` for pager,
1434    ///   B-tree, or codec failures.
1435    pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
1436        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
1437        let txn = ReadTxn::new(inner);
1438        // Resolve the descriptor up front so an absent collection
1439        // surfaces at construction (matching `Db::all`'s pre-M8
1440        // contract). The handle's lifetime is bound to `txn`; we
1441        // drop the handle immediately and stash the descriptor.
1442        let descriptor = {
1443            let coll = txn.collection::<T>()?;
1444            coll.descriptor().clone()
1445        };
1446        Ok(IterAll {
1447            txn,
1448            descriptor,
1449            buffer: VecDeque::new(),
1450            last_emitted_key: None,
1451            finished: false,
1452            _phantom: PhantomData,
1453        })
1454    }
1455}
1456
1457/// Streaming iterator returned by [`Db::iter_all`].
1458///
1459/// Holds a [`ReadTxn`] for its lifetime so every yielded document
1460/// is consistent with the snapshot pinned at construction. Yields
1461/// `Result<(Id, T)>` one entry at a time; refills its internal
1462/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
1463/// pager-lock acquisition, so peak memory stays bounded at a small
1464/// constant regardless of the collection's size — power-of-ten
1465/// Rule 3.
1466///
1467/// Construction errors surface at the [`Db::iter_all`] call site;
1468/// per-step errors (pager, B-tree, codec) surface as
1469/// `Some(Err(_))` during iteration and do NOT terminate it — the
1470/// caller decides whether to continue.
1471// Note: `IterAll<T>` is deliberately outside the `serde` surface
1472// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
1473// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
1474// neither of which is serializable in a meaningful way. Users who
1475// need an off-line representation of iteration output should
1476// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
1477// resulting `Vec<T>` themselves.
pub struct IterAll<'db, T> {
    /// Read transaction that owns the snapshot pin for the iterator's
    /// whole lifetime; `refill` reaches the pager through it.
    txn: ReadTxn<'db>,
    /// Descriptor resolved once in [`Db::iter_all`] and cached here:
    /// primary-tree root + collection id (`Document::decode` needs the
    /// collection id to validate the per-doc header).
    descriptor: CollectionDescriptor,
    /// Pre-decoded buffer of upcoming entries. Each slot is a
    /// `Result`, so a per-document failure is stored alongside the
    /// successfully decoded entries.
    buffer: VecDeque<Result<(Id, T)>>,
    /// Resumption marker — `Excluded(last_emitted_key)` is the start
    /// bound of the next refill. `None` until a key has been recorded
    /// (the iterator is constructed with `None`).
    last_emitted_key: Option<Vec<u8>>,
    /// `true` once the underlying B-tree iterator returned no more
    /// entries; subsequent `next` calls return `None`.
    finished: bool,
    /// Ties the struct to the `T` parameter without storing a `T`
    /// value.
    _phantom: PhantomData<fn() -> T>,
}
1495
1496impl<T> std::fmt::Debug for IterAll<'_, T> {
1497    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1498        f.debug_struct("IterAll")
1499            .field("collection_id", &self.descriptor.collection_id)
1500            .field("buffer_len", &self.buffer.len())
1501            .field("finished", &self.finished)
1502            .finish_non_exhaustive()
1503    }
1504}
1505
1506impl<T: Document + Send + 'static> IterAll<'_, T> {
1507    /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
1508    /// entries, resuming from `last_emitted_key`. Stores per-doc
1509    /// decode errors as `Err` in the buffer so the caller can
1510    /// observe them via `next()` without aborting iteration.
1511    ///
1512    /// Power-of-ten Rule 7: every fallible operation is either
1513    /// captured into the buffer or returned up-front via `?` (which
1514    /// propagates the lock-acquisition / B-tree-open failures
1515    /// before the buffer is touched).
1516    fn refill(&mut self) -> Result<()> {
1517        // Clone the `Arc<Mutex<Pager>>` so the lock-acquisition does
1518        // NOT keep an immutable borrow of `self.txn` alive across
1519        // the buffer-mutation calls below.
1520        let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
1521        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
1522            kind: obj_core::LockKind::WriterInProcess,
1523        })?;
1524        let root_pid = PageId::new(self.descriptor.primary_root)
1525            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1526        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1527        let start = match &self.last_emitted_key {
1528            Some(k) => Bound::Excluded(k.clone()),
1529            None => Bound::Unbounded,
1530        };
1531        let collection_id = self.descriptor.collection_id;
1532        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
1533        let mut yielded: usize = 0;
1534        let mut last_key: Option<Vec<u8>> = None;
1535        let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
1536        for step in iter {
1537            if yielded >= ITER_ALL_BATCH {
1538                break;
1539            }
1540            yielded = yielded
1541                .checked_add(1)
1542                .ok_or(Error::BTreeInvariantViolated {
1543                    reason: "iter_all batch counter overflow",
1544                })?;
1545            buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
1546        }
1547        if yielded < ITER_ALL_BATCH {
1548            self.finished = true;
1549        }
1550        // The lock is held until `pager` drops at function end — but
1551        // by the time we mutate `self.buffer` / `self.last_emitted_key`
1552        // below, the `iter` (which borrowed `pager`) is already
1553        // dropped (its scope ended). Move the staged batch over.
1554        drop(pager);
1555        self.buffer.extend(batch);
1556        if let Some(k) = last_key {
1557            self.last_emitted_key = Some(k);
1558        }
1559        Ok(())
1560    }
1561}
1562
1563/// Process one B-tree iterator step into the staged `batch`. Decode
1564/// errors and corruption errors are captured as `Err` entries so
1565/// they surface via `next()` rather than aborting the refill —
1566/// power-of-ten Rule 7.
1567///
1568/// Free function (rather than `&mut self`) so the caller can keep
1569/// the pager lock open across multiple buffer-pushes without the
1570/// borrow checker tripping on a second `&mut self.buffer` borrow.
1571fn buffer_one_entry<T: Document>(
1572    batch: &mut VecDeque<Result<(Id, T)>>,
1573    last_key: &mut Option<Vec<u8>>,
1574    collection_id: u32,
1575    step: Result<(Vec<u8>, Vec<u8>)>,
1576) {
1577    let (key, value) = match step {
1578        Ok(kv) => kv,
1579        Err(e) => {
1580            batch.push_back(Err(e));
1581            return;
1582        }
1583    };
1584    let Some(id) = Id::from_be_bytes(&key) else {
1585        batch.push_back(Err(Error::InvalidArgument(
1586            "primary B-tree key is not an Id",
1587        )));
1588        return;
1589    };
1590    *last_key = Some(key);
1591    let decoded = obj_core::codec::decode::<T>(&value, collection_id);
1592    batch.push_back(decoded.map(|doc| (id, doc)));
1593}
1594
1595impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
1596    type Item = Result<(Id, T)>;
1597
1598    fn next(&mut self) -> Option<Self::Item> {
1599        if let Some(item) = self.buffer.pop_front() {
1600            return Some(item);
1601        }
1602        if self.finished {
1603            return None;
1604        }
1605        if let Err(e) = self.refill() {
1606            // A lock / open failure surfaces here ONCE and then
1607            // `finished` is flipped so subsequent `next` calls
1608            // terminate cleanly. The error is surfaced to the
1609            // caller via `Some(Err(_))` — power-of-ten Rule 7.
1610            self.finished = true;
1611            return Some(Err(e));
1612        }
1613        self.buffer.pop_front()
1614    }
1615}
1616
/// Break a collection name into an optional namespace prefix and
/// the remaining collection name.
///
/// Only the FIRST `.` acts as the separator, so the collection part
/// may itself contain dots. A name without any `.` has no
/// namespace and is returned unchanged.
///
/// `"users"` → `(None, "users")`.
/// `"archive.orders"` → `(Some("archive"), "orders")`.
/// `"archive.orders.legacy"` → `(Some("archive"), "orders.legacy")`.
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    name.split_once('.')
        .map_or((None, name), |(namespace, rest)| (Some(namespace), rest))
}
1631
1632/// `Db` is `Send + Sync` so it composes with `Arc<Db>` for
1633/// concurrent reader / single-writer workloads.  The thread-safety
1634/// is inherited from the underlying `Arc<TxnEnv>` + `Arc<Mutex<Catalog>>`.
1635const _: () = {
1636    fn assert_send_sync<T: Send + Sync>() {}
1637    let _ = assert_send_sync::<Db>;
1638};
1639
1640/// Translate the first failure in `report` (if any) into the
1641/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
1642/// open-time fast check so the caller sees `Err(Error::Corruption {
1643/// page_id })` rather than an opaque `IntegrityReport`. The page-id
1644/// in the returned error is the locus of the failure when one is
1645/// available; for failures whose locus is a non-page (e.g. an
1646/// `OrphanIndexEntry`, which `quick_check` does NOT emit), the
1647/// catalog root page-id is used as a stand-in.
1648fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
1649    let first = report.failures.first()?;
1650    let err = match first {
1651        obj_core::IntegrityFailure::ChecksumMismatch { page_id }
1652        | obj_core::IntegrityFailure::OrphanPage { page_id }
1653        | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
1654        | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
1655        | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
1656        | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
1657        | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
1658            Error::Corruption { page_id: *page_id }
1659        }
1660        obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
1661            Error::BTreeDepthExceeded { limit: *limit }
1662        }
1663        // The remaining variants — OrphanIndexEntry,
1664        // MissingIndexEntry, and any future `#[non_exhaustive]`
1665        // additions — are never emitted by `quick_check` (which
1666        // walks the catalog tree only, not the per-collection
1667        // primary or index trees). The defensive fallback is a
1668        // coarse `Corruption { page_id: 0 }` so the open-time
1669        // check still fails closed if the obj-core layer ever
1670        // grows a new fast-check failure mode.
1671        _ => Error::Corruption { page_id: 0 },
1672    };
1673    Some(err)
1674}