obj/db.rs
1//! `Db` — public entry point.
2//!
3//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
4//! described in [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
5//! § API and pinned in
6//! [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
7//! § Db public API contract.
8
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::marker::PhantomData;
11use std::ops::Bound;
12use std::path::Path;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15
16use obj_core::btree::BTree;
17use obj_core::pager::page::PageId;
18use obj_core::pager::{lock_path_for, Pager};
19use obj_core::platform::FileHandle;
20use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};
21
22use crate::config::Config;
23use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
24
/// Per-batch refill size for [`IterAll`].
///
/// The iterator yields one document at a time to the caller, but
/// internally fetches the primary B-tree in chunks of
/// `ITER_ALL_BATCH` entries so the per-step pager-lock acquisition
/// cost amortises over many `next` calls.
///
/// Power-of-ten Rule 3: the buffer is fixed-size (constant 256
/// entries — at ~512 bytes/doc that's ~128 KiB peak); the buffer
/// does NOT scale with the collection's total size.
const ITER_ALL_BATCH: usize = 256;
33
/// The embedded document database.
///
/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
/// concurrent reader / single-writer access. See
/// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
/// § "Concurrent readers, one writer".
///
/// At M6 the public `Db` is hard-typed against
/// `obj_core::FileHandle`. A future refactor may make it generic
/// over `F: FileBackend` so fault-injection harnesses can build on
/// the same API; today the test helpers reach for the lower-level
/// `obj-core` building blocks instead.
pub struct Db {
    /// Shared transaction environment. Passed to
    /// `obj_core::WriteTxn::begin` / `obj_core::ReadTxn::begin_with_timeout`
    /// when a transaction starts, and handed out by [`Db::env_arc`].
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// Shared in-memory catalog handle. Cloned into every
    /// [`WriteTxn`] and replaced wholesale when a rolled-back
    /// transaction forces a re-open from the pager.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// Copied from `Config::readonly` at open time. When `true`,
    /// [`Db::transaction`] returns `Error::ReadOnly` without starting
    /// a write transaction.
    pub(crate) readonly: bool,
    /// Copied from `Config::busy_timeout` at open time; forwarded to
    /// the lock acquisition of every read / write transaction.
    pub(crate) busy_timeout: Duration,
    /// Per-process cache of `(collection, version)` keys whose
    /// `Document::indexes()` reconciliation has already run. M7
    /// #57: reconciliation is idempotent but a catalog walk +
    /// declaration cycle is non-trivial — caching ensures only the
    /// first `WriteTxn::collection::<T>()` call per process per
    /// `(collection, version)` pays the cost. #130: the version is part
    /// of the key so a later schema version that ADDS an index still
    /// reconciles (the name-only key skipped it, leaving the new index
    /// never `Active`).
    pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
    /// M11 #93: registry of attached read-only databases keyed by
    /// namespace. Populated by [`Db::attach`] / [`Db::attach_shared`],
    /// emptied by [`Db::detach`] / [`Db::detach_shared`], and read
    /// when a read transaction pins its per-attached snapshots.
    /// `Arc<Mutex<_>>` because every one of those paths funnels
    /// through a `&self` core (the `&mut self` forms are thin
    /// wrappers kept for API stability), so attach, detach and the
    /// snapshot-pinning readers must serialise on the mutex.
    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
    /// #83 (c): published length of `attached`, mutated only while the
    /// `attached` mutex is held. A relaxed load lets the hot read path
    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
    /// no databases are attached (the overwhelmingly common case).
    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
}
75
76impl std::fmt::Debug for Db {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("Db")
79 .field("readonly", &self.readonly)
80 .field("busy_timeout", &self.busy_timeout)
81 .finish_non_exhaustive()
82 }
83}
84
85// ---------- FFI plumbing ------------------------------------------
86//
87// The accessors below expose the shareable internals of `Db` so the
88// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
89// iterator handles without re-implementing the lifecycle. They are
90// marked `#[doc(hidden)]` because user-Rust code should reach for
91// the typed `Db::transaction` / `Db::read_transaction` API; the
92// raw accessors exist so a sibling FFI / native-host crate can
93// build its own self-contained handle types.
94impl Db {
95 /// Shared environment Arc — pager + cross-process lock file.
96 ///
97 /// Used by [`libobj`](../../libobj/index.html) to build owned
98 /// transaction handles whose lifetime extends past a single
99 /// `Db::transaction` closure call.
100 #[doc(hidden)]
101 #[must_use]
102 pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
103 Arc::clone(&self.env)
104 }
105
106 /// Shared catalog Arc.
107 ///
108 /// Used by [`libobj`](../../libobj/index.html) for the same
109 /// reason as [`Self::env_arc`].
110 #[doc(hidden)]
111 #[must_use]
112 pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
113 Arc::clone(&self.catalog)
114 }
115
116 /// Shared per-process reconciliation cache Arc.
117 ///
118 /// Used by [`libobj`](../../libobj/index.html) for the same
119 /// reason as [`Self::env_arc`].
120 #[doc(hidden)]
121 #[must_use]
122 pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<(String, u32)>>> {
123 Arc::clone(&self.reconciled)
124 }
125
126 /// Busy-lock timeout configured at open time.
127 #[doc(hidden)]
128 #[must_use]
129 pub fn busy_timeout(&self) -> Duration {
130 self.busy_timeout
131 }
132}
133
134impl Db {
135 /// Open or create a file-backed database at `path` with default
136 /// configuration.
137 ///
138 /// Creates the file if absent; reopens otherwise. A `Db` is
139 /// `Send + Sync` — share across threads via `Arc<Db>` for the
140 /// concurrent-reader / single-writer workload documented in
141 /// [`docs/concurrency.md`](https://github.com/uname-n/obj/blob/master/docs/concurrency.md).
142 ///
143 /// For an ephemeral database use [`Db::memory`]; for a
144 /// read-only handle that coexists with another process's writer
145 /// use [`Db::open_readonly`]; for custom durability /
146 /// cache / lock knobs use [`Db::open_with`] + [`Config`].
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// # fn main() -> obj::Result<()> {
152 /// use obj::Db;
153 ///
154 /// let dir = tempfile::tempdir()?;
155 ///
156 /// // File-backed. Creates the file if absent; reopens otherwise.
157 /// let _db = Db::open(dir.path().join("app.obj"))?;
158 ///
159 /// // In-memory. No persistence, no file locks. Useful for tests.
160 /// let _mem = Db::memory()?;
161 ///
162 /// // Read-only. Coexists safely with a writer in another process.
163 /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
164 /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
165 /// # Ok(())
166 /// # }
167 /// ```
168 ///
169 /// # Errors
170 ///
171 /// Returns the underlying [`Error`] from
172 /// [`obj_core::pager::Pager::open`] on syscall or format
173 /// failure.
174 #[cfg_attr(
175 feature = "tracing",
176 tracing::instrument(
177 name = "db.open",
178 level = "info",
179 skip_all,
180 fields(path = %path.as_ref().display()),
181 )
182 )]
183 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
184 Self::open_with(path, Config::default())
185 }
186
187 /// Open or create a file-backed database with `config`.
188 ///
189 /// # Errors
190 ///
191 /// As [`Db::open`].
192 // Takes `Config` by value: this is the frozen builder-handoff
193 // signature (the caller's `Config::default().sync_mode(..)...`
194 // chain ends here). `Config` is no longer `Copy` (issue #17), so
195 // clippy now sees the by-value argument as only borrowed in the
196 // body — but the by-value convention is part of the 1.0 surface
197 // and must not change. Power-of-ten Rule 10: documented allow.
198 #[allow(clippy::needless_pass_by_value)]
199 #[cfg_attr(
200 feature = "tracing",
201 tracing::instrument(
202 name = "db.open",
203 level = "info",
204 skip_all,
205 fields(path = %path.as_ref().display()),
206 )
207 )]
208 pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
209 let path_buf = path.as_ref().to_path_buf();
210 // Issue #31: the pager `Config` is no longer `Copy` (its
211 // `encryption_key` zeroizes on drop), so move it out of
212 // `config` rather than copying it — `from_parts` only reads
213 // the remaining scalar fields. `mem::take` leaves a default
214 // (keyless) pager `Config` behind, which is never read again.
215 let pager_config = std::mem::take(&mut config.pager);
216 let pager = Pager::open(&path_buf, pager_config)?;
217 // Issue #1: cross-process locks anchor against a dedicated
218 // `<db>.obj-lock` sidecar file. Keeping the lock byte out
219 // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
220 // Windows once the DB grows past whatever offset the
221 // lock byte would otherwise sit at (`LockFileEx` is
222 // mandatory). The sidecar is left in place across opens;
223 // deleting it would race two openers into two different
224 // inodes and miss each other's exclusion.
225 let lock_file = if config.cross_process_lock {
226 let lock_path = lock_path_for(&path_buf);
227 let handle = FileHandle::open_or_create(&lock_path)?;
228 // Materialise the locked byte range as real file
229 // content. POSIX OFD and Windows `LockFileEx` both
230 // permit locking past EOF, but a non-empty sidecar
231 // is the most conservative choice across kernels
232 // (and lets the same offsets work on every platform).
233 // 128 bytes covers WRITER_LOCK (96) and the full
234 // READER_LOCK_RANGE (97..128).
235 handle.set_len(128)?;
236 Some(Arc::new(handle))
237 } else {
238 None
239 };
240 Self::from_parts(pager, lock_file, &config)
241 }
242
243 /// Open a fresh in-memory database. No persistence, no file
244 /// locks. Useful for unit tests and ephemeral workloads.
245 ///
246 /// # Errors
247 ///
248 /// Returns [`Error::InvalidArgument`] only if `config` has
249 /// zero cache frames.
250 pub fn memory() -> Result<Self> {
251 Self::memory_with(Config::default())
252 }
253
254 /// As [`Db::memory`] with a caller-supplied [`Config`].
255 ///
256 /// # Errors
257 ///
258 /// As [`Db::memory`].
259 // By-value `Config` is the frozen builder-handoff signature; see
260 // the note on [`Db::open_with`]. Power-of-ten Rule 10: documented
261 // allow (issue #17 dropped `Copy`, which made the lint fire).
262 #[allow(clippy::needless_pass_by_value)]
263 pub fn memory_with(mut config: Config) -> Result<Self> {
264 // Issue #31: move the (no-longer-`Copy`) pager `Config` out
265 // rather than copying it; `from_parts` only reads the
266 // remaining scalar fields. See `open_with` for the rationale.
267 let pager_config = std::mem::take(&mut config.pager);
268 let pager = Pager::memory(pager_config)?;
269 Self::from_parts(pager, None, &config)
270 }
271
272 /// Open the database at `path` in read-only mode. The
273 /// resulting `Db` rejects every mutating operation with
274 /// `Err(Error::ReadOnly { ... })`. Coexists safely with a
275 /// writer in another process via the cross-process reader
276 /// lock.
277 ///
278 /// # Errors
279 ///
280 /// As [`Db::open`].
281 pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
282 let config = Config {
283 readonly: true,
284 ..Config::default()
285 };
286 Self::open_with(path, config)
287 }
288
289 fn from_parts(
290 mut pager: Pager<FileHandle>,
291 lock_file: Option<Arc<FileHandle>>,
292 config: &Config,
293 ) -> Result<Self> {
294 // #64: catalog init (and its `tree.insert` → `alloc_page` /
295 // `set_root_catalog` chain) mutates the file header. Header
296 // mutations now ride the WAL via `stage_or_write_header`,
297 // which requires an open Pager txn so the staged page-0 frame
298 // commits atomically with the B-tree pages it depends on.
299 // Wrap the init call in `begin_txn` / `commit` / `end_txn` so
300 // a `Db::open` against a fresh file is durable on return
301 // (matching the pre-#64 contract). `open_or_init` on an
302 // already-initialised database is read-only and the wrapped
303 // `commit()` is a no-op (`Pager::commit` short-circuits on an
304 // empty pending set + clean `header_dirty`).
305 pager.begin_txn();
306 let init = Catalog::open_or_init(&mut pager);
307 let catalog = match init {
308 Ok(c) => {
309 let commit_result = pager.commit();
310 pager.end_txn();
311 commit_result?;
312 c
313 }
314 Err(e) => {
315 pager.end_txn();
316 return Err(e);
317 }
318 };
319 // M11 #91: lightweight open-time integrity check. Runs the
320 // catalog-portion of `integrity_check` and surfaces any
321 // detected failure as the strongest available error so the
322 // caller decides whether to attempt repair or abort. Opt out
323 // via `Config::skip_open_check(true)`.
324 if !config.skip_open_check {
325 let report = obj_core::integrity::quick_check(&mut pager)?;
326 if let Some(err) = first_failure_as_error(&report) {
327 return Err(err);
328 }
329 }
330 Ok(Self {
331 env: Arc::new(TxnEnv::new(pager, lock_file)),
332 catalog: Arc::new(Mutex::new(catalog)),
333 readonly: config.readonly,
334 busy_timeout: config.busy_timeout,
335 reconciled: Arc::new(Mutex::new(HashSet::new())),
336 attached: Arc::new(Mutex::new(HashMap::new())),
337 attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
338 })
339 }
340
341 /// Run a closure inside a write transaction.
342 ///
343 /// Begins a [`WriteTxn`], runs the closure with `&mut tx`. If
344 /// the closure returns `Ok(r)`, the transaction is committed and
345 /// `Ok(r)` is returned. If the closure returns `Err(e)`, the
346 /// transaction is rolled back and `Err(e)` is returned. A
347 /// panic inside the closure unwinds with an implicit rollback
348 /// via the `WriteTxn` `Drop` impl. See
349 /// [`docs/concurrency.md`](https://github.com/uname-n/obj/blob/master/docs/concurrency.md)
350 /// for the lock-acquisition contract.
351 ///
352 /// # Examples
353 ///
354 /// Atomic batch — both inserts commit together or not at all:
355 ///
356 /// ```
357 /// # fn main() -> obj::Result<()> {
358 /// use obj::Db;
359 /// use serde::{Deserialize, Serialize};
360 ///
361 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
362 /// struct Order {
363 /// customer_id: u64,
364 /// total_cents: u64,
365 /// }
366 ///
367 /// let dir = tempfile::tempdir()?;
368 /// let db = Db::open(dir.path().join("txn.obj"))?;
369 ///
370 /// let (a, b) = db.transaction(|tx| {
371 /// let coll = tx.collection::<Order>()?;
372 /// let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
373 /// let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
374 /// Ok((a, b))
375 /// })?;
376 /// assert_ne!(a, b, "freshly-allocated ids are distinct");
377 /// # Ok(())
378 /// # }
379 /// ```
380 ///
381 /// Returning `Err(_)` rolls every staged write back; the `Err`
382 /// the closure returns is the `Err` the caller sees:
383 ///
384 /// ```
385 /// # fn main() -> obj::Result<()> {
386 /// use obj::{Db, Error};
387 /// use serde::{Deserialize, Serialize};
388 ///
389 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
390 /// struct Order { total_cents: u64 }
391 ///
392 /// let dir = tempfile::tempdir()?;
393 /// let db = Db::open(dir.path().join("rollback.obj"))?;
394 /// let id = db.insert(Order { total_cents: 10 })?;
395 ///
396 /// let outcome: obj::Result<()> = db.transaction(|tx| {
397 /// let coll = tx.collection::<Order>()?;
398 /// coll.update(id, |o| { o.total_cents = 99_999; })?;
399 /// Err(Error::InvalidArgument("synthetic abort"))
400 /// });
401 /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
402 ///
403 /// let after: Order = db
404 /// .get::<Order>(id)?
405 /// .ok_or(Error::InvalidArgument("just inserted"))?;
406 /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
407 /// # Ok(())
408 /// # }
409 /// ```
410 ///
411 /// # Errors
412 ///
413 /// - [`Error::ReadOnly`] if the database was opened read-only.
414 /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
415 /// - Any error the closure returns.
416 #[cfg_attr(
417 feature = "tracing",
418 tracing::instrument(name = "db.transaction", level = "info", skip_all)
419 )]
420 pub fn transaction<R, F>(&self, body: F) -> Result<R>
421 where
422 F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
423 {
424 if self.readonly {
425 return Err(Error::ReadOnly {
426 operation: "transaction",
427 });
428 }
429 #[cfg(feature = "tracing")]
430 tracing::debug!("begin");
431 let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
432 let mut tx = WriteTxn::new(
433 inner,
434 Arc::clone(&self.catalog),
435 Arc::clone(&self.reconciled),
436 );
437 match body(&mut tx) {
438 Ok(value) => {
439 tx.commit()?;
440 #[cfg(feature = "tracing")]
441 tracing::debug!("commit");
442 Ok(value)
443 }
444 Err(e) => {
445 // Best-effort rollback; surface the closure's
446 // error. Refresh the in-memory catalog after
447 // rollback so its B-tree root state matches the
448 // file's `root_catalog` header field — catalog
449 // mutations write the header directly (not through
450 // the WAL) so a rollback leaves the header pointing
451 // at the most-recent in-memory root, which is
452 // exactly what re-opening the catalog will pick up.
453 let _ = tx.rollback();
454 let _ = self.refresh_catalog();
455 #[cfg(feature = "tracing")]
456 tracing::debug!("rollback");
457 Err(e)
458 }
459 }
460 }
461
462 /// Re-open the in-memory `Catalog` handle from the pager. Used
463 /// after a transaction rollback to discard the catalog's in-
464 /// memory `next_collection_id` / `tree.root` state that may
465 /// have advanced during the rolled-back closure.
466 fn refresh_catalog(&self) -> Result<()> {
467 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
468 kind: obj_core::LockKind::WriterInProcess,
469 })?;
470 let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
471 let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
472 kind: obj_core::LockKind::WriterInProcess,
473 })?;
474 *existing = fresh;
475 Ok(())
476 }
477
478 /// Run a closure inside a read transaction. See
479 /// [`Self::transaction`] for the closure shape and atomicity
480 /// contract; reads inside `body` observe a consistent
481 /// snapshot of the database.
482 ///
483 /// Every read inside the closure observes a single consistent
484 /// snapshot — the snapshot is pinned at the moment the closure
485 /// begins. Concurrent writers do not affect what the closure
486 /// sees.
487 ///
488 /// # Examples
489 ///
490 /// Two reads inside one `read_transaction` see the same value
491 /// even if a writer commits in between:
492 ///
493 /// ```
494 /// # fn main() -> obj::Result<()> {
495 /// use obj::Db;
496 /// use serde::{Deserialize, Serialize};
497 ///
498 /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
499 /// struct Order { total_cents: u64 }
500 ///
501 /// let dir = tempfile::tempdir()?;
502 /// let db = Db::open(dir.path().join("read.obj"))?;
503 /// let id = db.insert(Order { total_cents: 10 })?;
504 ///
505 /// let (a, b) = db.read_transaction(|tx| {
506 /// let coll = tx.collection::<Order>()?;
507 /// let a = coll.get(id)?;
508 /// let b = coll.get(id)?;
509 /// Ok((a, b))
510 /// })?;
511 /// assert_eq!(a, b);
512 /// # Ok(())
513 /// # }
514 /// ```
515 ///
516 /// # Errors
517 ///
518 /// - [`Error::Busy`] if the reader lock could not be acquired.
519 /// - Any error the closure returns.
520 #[cfg_attr(
521 feature = "tracing",
522 tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
523 )]
524 pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
525 where
526 F: FnOnce(&ReadTxn<'_>) -> Result<R>,
527 {
528 #[cfg(feature = "tracing")]
529 tracing::debug!("begin");
530 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
531 // M11 #93: pin one snapshot per attached database so reads
532 // through the closure observe a consistent view per file.
533 let attached_contexts = self.pin_attached_snapshots()?;
534 let tx = ReadTxn::with_attached(inner, attached_contexts);
535 let result = body(&tx);
536 #[cfg(feature = "tracing")]
537 tracing::debug!("commit");
538 result
539 }
540
541 /// Snapshot every attached database into a per-namespace
542 /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
543 /// pin; dropping the returned map releases every pin.
544 fn pin_attached_snapshots(
545 &self,
546 ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
547 // #83 (c): the common case is no attached databases. A single
548 // relaxed load of `attached_len` lets the empty path skip the
549 // `self.attached` mutex acquire entirely (`with_capacity(0)`
550 // already does not allocate, so the mutex is the only cost).
551 //
552 // Correctness under concurrent detach: the map produced here
553 // feeds only namespaced read dispatch in `open_readonly_named`
554 // (`<ns>.<tail>` → attached snapshot). With zero attachments no
555 // namespace can resolve, so an empty map is observably the same
556 // as the locked build. `attached_len` is mutated only while the
557 // `attached` mutex is held (in `attach` / `detach`), so a `0`
558 // observed here means "no attachment is committed to the map",
559 // matching what a lock-and-read would have seen at that instant.
560 // A concurrent attach that has not yet published its count is
561 // exactly an attach ordered *after* this txn began — invisible
562 // by design, same as the cross-process snapshot semantics.
563 if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
564 return Ok(std::collections::HashMap::new());
565 }
566 let registry = self.attached.lock().map_err(|_| Error::Busy {
567 kind: obj_core::LockKind::WriterInProcess,
568 })?;
569 let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
570 std::collections::HashMap::with_capacity(registry.len());
571 for (namespace, attached) in registry.iter() {
572 let env = Arc::clone(&attached.env);
573 let snapshot = {
574 let mut pager = env.pager().lock().map_err(|_| Error::Busy {
575 kind: obj_core::LockKind::WriterInProcess,
576 })?;
577 pager.reader_snapshot()?
578 };
579 out.insert(
580 namespace.clone(),
581 crate::txn::AttachedReadCtx { env, snapshot },
582 );
583 }
584 Ok(out)
585 }
586
587 /// Pin one [`crate::txn::AttachedReadCtx`] for the single attachment
588 /// registered under `namespace`. Used by [`Self::dump_raw`]'s
589 /// namespaced full-scan path: it needs exactly one attached env +
590 /// pinned snapshot, not the whole per-namespace map
591 /// [`Self::pin_attached_snapshots`] builds.
592 ///
593 /// The snapshot is pinned for the returned context's lifetime; the
594 /// caller (the `DumpIter`) owns the context, so the pin holds for
595 /// the iterator's whole life and a concurrent `detach` cannot
596 /// invalidate the in-flight scan.
597 ///
598 /// # Errors
599 ///
600 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is not
601 /// attached on this handle.
602 /// - [`Error::Busy`] if the registry / pager mutex is poisoned.
603 pub(crate) fn pin_attached_ctx(&self, namespace: &str) -> Result<crate::txn::AttachedReadCtx> {
604 let env = {
605 let registry = self.attached.lock().map_err(|_| Error::Busy {
606 kind: obj_core::LockKind::WriterInProcess,
607 })?;
608 let attached =
609 registry
610 .get(namespace)
611 .ok_or_else(|| Error::CollectionNamespaceUnknown {
612 namespace: namespace.to_owned(),
613 })?;
614 Arc::clone(&attached.env)
615 };
616 let snapshot = {
617 let mut pager = env.pager().lock().map_err(|_| Error::Busy {
618 kind: obj_core::LockKind::WriterInProcess,
619 })?;
620 pager.reader_snapshot()?
621 };
622 Ok(crate::txn::AttachedReadCtx { env, snapshot })
623 }
624
625 /// Attach the database at `path` under `namespace`. The
626 /// attached file is opened read-only; collections in the
627 /// attached file become visible to subsequent
628 /// `Db::collection::<T>()` calls (and the one-shot per-op API
629 /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
630 /// form `<namespace>.<collection_name>`.
631 ///
632 /// Writes against namespaced collections return
633 /// [`Error::AttachedDatabaseIsReadOnly`].
634 ///
635 /// Each attached database gets its own snapshot pinned at
636 /// read-transaction begin; [`Db::detach`] removes the registry
637 /// entry but in-flight reads complete against their pinned
638 /// snapshot.
639 ///
640 /// # Examples
641 ///
642 /// ```
643 /// # fn main() -> obj::Result<()> {
644 /// use obj::Db;
645 /// use serde::{Deserialize, Serialize};
646 ///
647 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
648 /// #[obj(collection = "orders_attach_doc")]
649 /// struct Order { total_cents: u64 }
650 ///
651 /// // Same struct shape, namespaced collection name. Reads
652 /// // against this type route to the attached "archive" db.
653 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
654 /// #[obj(collection = "archive.orders_attach_doc")]
655 /// struct ArchivedOrder { total_cents: u64 }
656 ///
657 /// let dir = tempfile::tempdir()?;
658 /// let live = dir.path().join("live.obj");
659 /// let archive = dir.path().join("archive.obj");
660 ///
661 /// // Seed the archive (writes go through its own un-namespaced type).
662 /// {
663 /// let archive_db = Db::open(&archive)?;
664 /// let _ = archive_db.insert(Order { total_cents: 999 })?;
665 /// }
666 ///
667 /// // Open the live db, attach the archive under "archive".
668 /// let mut db = Db::open(&live)?;
669 /// let _ = db.insert(Order { total_cents: 100 })?;
670 /// db.attach(&archive, "archive")?;
671 ///
672 /// // One read transaction, two collections.
673 /// db.read_transaction(|tx| {
674 /// let live = tx.collection::<Order>()?;
675 /// let arch = tx.collection::<ArchivedOrder>()?;
676 /// assert_eq!(live.all()?.len(), 1);
677 /// assert_eq!(arch.all()?.len(), 1);
678 /// Ok(())
679 /// })?;
680 ///
681 /// db.detach("archive")?;
682 /// # Ok(())
683 /// # }
684 /// ```
685 ///
686 /// # Errors
687 ///
688 /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
689 /// use on this `Db`.
690 /// - [`Error::AttachmentNotReadable`] if `path` cannot be
691 /// opened read-only.
692 pub fn attach<P: AsRef<std::path::Path>>(
693 &mut self,
694 path: P,
695 namespace: impl Into<String>,
696 ) -> Result<()> {
697 // The `&mut self` signature is preserved for API stability,
698 // but the attachment registry is interior-mutable
699 // (`Arc<Mutex<_>>` + `Arc<AtomicUsize>`), so the work is
700 // delegated to the `&self` core shared with
701 // [`Self::attach_shared`].
702 self.attach_inner(path.as_ref(), namespace.into())
703 }
704
705 /// Shared-reference (`&self`) form of [`Self::attach`]. Attaches
706 /// the database at `path` under `namespace` through `&self`, for
707 /// callers that hold a shared handle (e.g. an `Arc<Db>`) and
708 /// cannot obtain `&mut self`.
709 ///
710 /// Behaviour is identical to [`Self::attach`]: the attachment
711 /// registry is interior-mutable, guarded by the same per-`Db`
712 /// mutex, so concurrent `attach_shared` / `detach_shared` calls
713 /// from multiple threads serialise on that mutex. The duplicate-
714 /// namespace re-check after the read-only open closes the race
715 /// between two concurrent attaches of the same namespace.
716 ///
717 /// # Examples
718 ///
719 /// ```
720 /// # fn main() -> obj::Result<()> {
721 /// use std::sync::Arc;
722 /// use obj::Db;
723 ///
724 /// let dir = tempfile::tempdir()?;
725 /// let live = dir.path().join("live.obj");
726 /// let archive = dir.path().join("archive.obj");
727 /// let _ = Db::open(&archive)?;
728 ///
729 /// // A shared handle cannot call `&mut self` `attach`, but can
730 /// // call `attach_shared`.
731 /// let db = Arc::new(Db::open(&live)?);
732 /// db.attach_shared(&archive, "archive")?;
733 /// db.detach_shared("archive")?;
734 /// # Ok(())
735 /// # }
736 /// ```
737 ///
738 /// # Errors
739 ///
740 /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
741 /// use on this `Db`.
742 /// - [`Error::AttachmentNotReadable`] if `path` cannot be
743 /// opened read-only.
744 /// - [`Error::Busy`] if the registry mutex is poisoned.
745 pub fn attach_shared<P: AsRef<std::path::Path>>(
746 &self,
747 path: P,
748 namespace: impl Into<String>,
749 ) -> Result<()> {
750 self.attach_inner(path.as_ref(), namespace.into())
751 }
752
753 /// `&self` core shared by [`Self::attach`] and
754 /// [`Self::attach_shared`]. Both signatures are thin wrappers; the
755 /// mutation flows through the interior-mutable `attached` registry
756 /// regardless of the public method's receiver.
757 fn attach_inner(&self, path: &std::path::Path, namespace: String) -> Result<()> {
758 let path_buf = path.to_path_buf();
759 {
760 let registry = self.attached.lock().map_err(|_| Error::Busy {
761 kind: obj_core::LockKind::WriterInProcess,
762 })?;
763 if registry.contains_key(&namespace) {
764 return Err(Error::AttachmentAlreadyExists { namespace });
765 }
766 }
767 let attached_db =
768 Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
769 path: path_buf.clone(),
770 source: Box::new(source),
771 })?;
772 let env = Arc::clone(&attached_db.env);
773 let mut registry = self.attached.lock().map_err(|_| Error::Busy {
774 kind: obj_core::LockKind::WriterInProcess,
775 })?;
776 // Re-check after acquiring the lock — a sibling caller may
777 // have raced; the `Arc<Mutex<_>>` shape makes the check
778 // necessary on the second acquire.
779 if registry.contains_key(&namespace) {
780 return Err(Error::AttachmentAlreadyExists { namespace });
781 }
782 registry.insert(
783 namespace,
784 AttachedDb {
785 env,
786 _db: attached_db,
787 },
788 );
789 // #83 (c): publish the new length while still holding the
790 // `attached` mutex so `pin_attached_snapshots`' relaxed load is
791 // never stale relative to a committed attachment.
792 self.attached_len
793 .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
794 Ok(())
795 }
796
797 /// Remove the attachment registered under `namespace`. Returns
798 /// [`Error::CollectionNamespaceUnknown`] if the namespace is
799 /// not attached.
800 ///
801 /// In-flight read transactions hold their own snapshot pins on
802 /// the attached env; detach removes the registry entry, but the
803 /// in-flight read may still complete against its pinned
804 /// snapshot.
805 ///
806 /// # Errors
807 ///
808 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
809 /// not attached.
810 /// - [`Error::Busy`] if the registry mutex is poisoned.
811 pub fn detach(&mut self, namespace: &str) -> Result<()> {
812 // `&mut self` preserved for API stability; the registry is
813 // interior-mutable, so the work delegates to the `&self` core
814 // shared with [`Self::detach_shared`].
815 self.detach_inner(namespace)
816 }
817
818 /// Shared-reference (`&self`) form of [`Self::detach`]. Removes the
819 /// attachment registered under `namespace` through `&self`, for
820 /// callers that hold a shared handle (e.g. an `Arc<Db>`).
821 ///
822 /// Behaviour is identical to [`Self::detach`]. In-flight read
823 /// transactions hold their own snapshot pins on the attached env;
824 /// `detach_shared` removes the registry entry, but the in-flight
825 /// read may still complete against its pinned snapshot. Concurrent
826 /// `attach_shared` / `detach_shared` calls serialise on the same
827 /// per-`Db` registry mutex.
828 ///
829 /// # Errors
830 ///
831 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
832 /// not attached.
833 /// - [`Error::Busy`] if the registry mutex is poisoned.
834 pub fn detach_shared(&self, namespace: &str) -> Result<()> {
835 self.detach_inner(namespace)
836 }
837
838 /// `&self` core shared by [`Self::detach`] and
839 /// [`Self::detach_shared`].
840 fn detach_inner(&self, namespace: &str) -> Result<()> {
841 let mut registry = self.attached.lock().map_err(|_| Error::Busy {
842 kind: obj_core::LockKind::WriterInProcess,
843 })?;
844 if registry.remove(namespace).is_none() {
845 return Err(Error::CollectionNamespaceUnknown {
846 namespace: namespace.to_owned(),
847 });
848 }
849 // #83 (c): publish the post-detach length under the lock.
850 self.attached_len
851 .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
852 Ok(())
853 }
854
855 /// Insert `doc` into its collection. One-shot transaction;
856 /// returns the assigned [`Id`].
857 ///
858 /// The one-shot API opens, commits, and closes a private
859 /// transaction per call. Reach for [`Db::transaction`] when
860 /// several mutations must commit or roll back as a single
861 /// atomic unit.
862 ///
863 /// # Examples
864 ///
865 /// One-shot CRUD against a `Document`-derived type:
866 ///
867 /// ```
868 /// # fn main() -> obj::Result<()> {
869 /// use obj::Db;
870 /// use serde::{Deserialize, Serialize};
871 ///
872 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
873 /// struct Order {
874 /// customer_id: u64,
875 /// total_cents: u64,
876 /// status: String,
877 /// }
878 ///
879 /// let dir = tempfile::tempdir()?;
880 /// let db = Db::open(dir.path().join("oneshot.obj"))?;
881 ///
882 /// // insert returns the freshly-allocated Id.
883 /// let id = db.insert(Order {
884 /// customer_id: 1,
885 /// total_cents: 100,
886 /// status: "pending".to_owned(),
887 /// })?;
888 ///
889 /// // get returns Option<T>.
890 /// let _maybe: Option<Order> = db.get::<Order>(id)?;
891 ///
892 /// // update applies a closure in place.
893 /// db.update::<Order, _>(id, |o| {
894 /// o.status = "shipped".to_owned();
895 /// })?;
896 ///
897 /// // upsert at a caller-supplied id (insert or replace).
898 /// let id2 = obj::Id::try_new(42)
899 /// .ok_or(obj::Error::InvalidArgument("non-zero"))?;
900 /// db.upsert::<Order>(id2, Order {
901 /// customer_id: 2,
902 /// total_cents: 999,
903 /// status: "new".to_owned(),
904 /// })?;
905 ///
906 /// // delete returns true if the row existed.
907 /// let existed = db.delete::<Order>(id)?;
908 /// assert!(existed);
909 /// # Ok(())
910 /// # }
911 /// ```
912 ///
913 /// # Errors
914 ///
915 /// As [`Self::transaction`] plus any error from
916 /// [`crate::Collection::insert`].
917 pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
918 self.transaction(|tx| tx.collection::<T>()?.insert(doc))
919 }
920
921 /// Fetch the document at `id`. Returns `Ok(None)` if absent.
922 ///
923 /// # Errors
924 ///
925 /// As [`Self::read_transaction`] plus any error from
926 /// [`crate::Collection::get`].
927 pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
928 // #83 (b): the one-shot point read goes through the fused
929 // single-pager-lock path (descriptor resolve + primary get
930 // under one guard) instead of `tx.collection()?.get()`, which
931 // takes the pager mutex twice. Observably identical for the
932 // one-shot caller (same snapshot, same `CollectionNotFound`
933 // contract). Namespaced reads fall back to the handle path.
934 self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
935 }
936
937 /// Update the document at `id` via the closure.
938 ///
939 /// # Errors
940 ///
941 /// - [`Error::DocumentNotFound`] if `id` does not exist.
942 /// - As [`Self::transaction`].
943 pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
944 where
945 T: Document,
946 F: FnOnce(&mut T),
947 {
948 self.transaction(|tx| tx.collection::<T>()?.update(id, f))
949 }
950
951 /// Delete the document at `id`. Returns `true` if it existed.
952 ///
953 /// # Errors
954 ///
955 /// As [`Self::transaction`].
956 pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
957 self.transaction(|tx| tx.collection::<T>()?.delete(id))
958 }
959
960 /// Insert or replace the document at `id`.
961 ///
962 /// # Errors
963 ///
964 /// As [`Self::transaction`].
965 pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
966 self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
967 }
968
969 /// Convenience wrapper around [`crate::Collection::find_unique`]
970 /// — `db.find_unique::<Customer>("by_email", "ada@example.com")`.
971 /// Runs inside a one-shot read transaction.
972 ///
973 /// `O(log n)`, no collection scan; the lookup walks the named
974 /// index's B-tree directly. Defined only on `Unique` indexes —
975 /// for the other kinds use
976 /// [`Collection::lookup`](crate::Collection::lookup) or
977 /// [`Collection::index_range`](crate::Collection::index_range).
978 ///
979 /// # Examples
980 ///
981 /// ```
982 /// # fn main() -> obj::Result<()> {
983 /// use obj::Db;
984 /// use serde::{Deserialize, Serialize};
985 ///
986 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
987 /// #[obj(collection = "customers_find_unique_doc")]
988 /// struct Customer {
989 /// #[obj(index = unique)]
990 /// email: String,
991 /// }
992 ///
993 /// let dir = tempfile::tempdir()?;
994 /// let db = Db::open(dir.path().join("find-unique.obj"))?;
995 /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
996 /// let by_email: Option<Customer> = db
997 /// .find_unique::<Customer>("email", "ada@example.com")?;
998 /// assert!(by_email.is_some());
999 /// # Ok(())
1000 /// # }
1001 /// ```
1002 ///
1003 /// # Errors
1004 ///
1005 /// As [`crate::Collection::find_unique`].
1006 pub fn find_unique<T: Document>(
1007 &self,
1008 index_name: &str,
1009 key: impl Into<obj_core::codec::Dynamic>,
1010 ) -> Result<Option<T>> {
1011 self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
1012 }
1013
1014 /// Construct a fresh M8 [`crate::Query`] builder rooted at this
1015 /// database. The builder borrows `&self` for the build phase;
1016 /// the borrow ends when [`crate::Query::fetch`] returns.
1017 ///
1018 /// Compose with [`Query::filter`](crate::Query::filter),
1019 /// [`Query::limit`](crate::Query::limit),
1020 /// [`Query::sort_by`](crate::Query::sort_by),
1021 /// [`Query::index_range`](crate::Query::index_range). Terminate
1022 /// with [`Query::fetch`](crate::Query::fetch) (for the
1023 /// documents) or [`Query::count`](crate::Query::count) (for the
1024 /// count alone).
1025 ///
1026 /// Mirrors [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
1027 /// § Querying — see the M8 examples in
1028 /// [`crates/obj/tests/design_md_queries.rs`](https://github.com/uname-n/obj/blob/master/crates/obj/tests/design_md_queries.rs)
1029 /// for the full surface.
1030 ///
1031 /// # Examples
1032 ///
1033 /// Top-N matching documents by an indexed field, ascending:
1034 ///
1035 /// ```
1036 /// # fn main() -> obj::Result<()> {
1037 /// use obj::Db;
1038 /// use obj_core::codec::Dynamic;
1039 /// use serde::{Deserialize, Serialize};
1040 ///
1041 /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1042 /// enum OrderStatus { Pending, Shipped }
1043 ///
1044 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1045 /// #[obj(collection = "orders_query_doc")]
1046 /// struct Order {
1047 /// #[obj(index)]
1048 /// customer_id: u64,
1049 /// status: OrderStatus,
1050 /// #[obj(index)]
1051 /// placed_at: u64,
1052 /// }
1053 ///
1054 /// let dir = tempfile::tempdir()?;
1055 /// let db = Db::open(dir.path().join("queries.obj"))?;
1056 /// for i in 0..20u64 {
1057 /// let _ = db.insert(Order {
1058 /// customer_id: i % 3,
1059 /// status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
1060 /// placed_at: i * 1_000,
1061 /// })?;
1062 /// }
1063 ///
1064 /// let pending: Vec<Order> = db
1065 /// .query::<Order>()
1066 /// .filter(|o| o.status == OrderStatus::Pending)
1067 /// .sort_by(|o| Dynamic::U64(o.placed_at))
1068 /// .limit(5)
1069 /// .fetch()?;
1070 /// assert!(pending.len() <= 5);
1071 /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
1072 /// # Ok(())
1073 /// # }
1074 /// ```
1075 #[must_use]
1076 pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
1077 crate::Query::new(self)
1078 }
1079
1080 /// Open a read-only typed handle to the collection registered
1081 /// under the runtime `name`, instead of the type's compile-time
1082 /// `T::COLLECTION`.
1083 ///
1084 /// This unlocks the
1085 /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
1086 /// § Portability example for
1087 /// attached databases: by passing a namespaced name like
1088 /// `"archive.orders"`, the returned [`crate::Collection`] reads
1089 /// from the database attached under the `"archive"` namespace
1090 /// — see [`Db::attach`].
1091 ///
1092 /// Construction is **infallible**: errors (missing collection,
1093 /// unknown namespace, busy lock) surface at the first method
1094 /// call on the handle, not at the call to `collection(name)`.
1095 /// Each read-only method on the returned handle opens a private
1096 /// [`Db::read_transaction`] and dispatches against the
1097 /// runtime-named collection's catalog row.
1098 ///
1099 /// # Read-only
1100 ///
1101 /// The returned handle rejects every mutating call —
1102 /// [`crate::Collection::insert`], `update`, `delete`, `upsert`
1103 /// all return [`Error::ReadOnly`]. To write into a non-default
1104 /// collection, override [`obj_core::Document::COLLECTION`] on
1105 /// the type itself (compile-time-bound) and use the regular
1106 /// [`Db::transaction`] / [`crate::WriteTxn::collection`] path.
1107 /// Phase 1B (M11 #94) intentionally limits the runtime accessor
1108 /// to reads — the write-through-runtime-name path requires
1109 /// engine plumbing that is deferred to a later milestone.
1110 ///
1111 /// # Example
1112 ///
1113 /// ```
1114 /// use obj::{Db, Document};
1115 /// use serde::{Deserialize, Serialize};
1116 /// use tempfile::tempdir;
1117 ///
1118 /// #[derive(Debug, Clone, Serialize, Deserialize)]
1119 /// struct Order { customer_id: u64, total_cents: u64 }
1120 ///
1121 /// impl Document for Order {
1122 /// const COLLECTION: &'static str = "orders";
1123 /// const VERSION: u32 = 1;
1124 /// }
1125 ///
1126 /// fn run() -> obj::Result<()> {
1127 /// let dir = tempdir()?;
1128 /// let archive_path = dir.path().join("archive.obj");
1129 /// // Populate the archive database first.
1130 /// {
1131 /// let archive_db = Db::open(&archive_path)?;
1132 /// archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
1133 /// }
1134 /// // Attach it under a namespace and read via the runtime
1135 /// // accessor — no need to re-declare `Order` with a
1136 /// // namespaced `COLLECTION`.
1137 /// let main_path = dir.path().join("main.obj");
1138 /// let mut db = Db::open(&main_path)?;
1139 /// db.attach(&archive_path, "archive")?;
1140 /// let archived: Vec<Order> = db
1141 /// .collection::<Order>("archive.orders")
1142 /// .all()?
1143 /// .into_iter()
1144 /// .map(|(_id, doc)| doc)
1145 /// .collect();
1146 /// assert_eq!(archived.len(), 1);
1147 /// Ok(())
1148 /// }
1149 /// # run().unwrap();
1150 /// ```
1151 #[must_use]
1152 pub fn collection<T: Document + Send + 'static>(
1153 &self,
1154 name: impl Into<String>,
1155 ) -> crate::Collection<'_, T> {
1156 crate::Collection::<T>::lazy(self, name.into())
1157 }
1158
1159 /// Convenience shim mirroring
1160 /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md) —
1161 /// `for order in db.all::<Order>()? { ... }`. Returns an owned
1162 /// `Vec<T>` (materialised). One-line shim over [`Db::iter_all`]
1163 /// that drives the streaming iterator to exhaustion and
1164 /// collects; if the collection is large enough that peak
1165 /// memory matters, prefer [`Db::iter_all`] directly.
1166 ///
1167 /// Sorting requires materialisation — the comparator needs
1168 /// every key up front. Use [`Db::query`] +
1169 /// [`Query::sort_by`](crate::Query::sort_by) for the
1170 /// top-N-sorted workload; the iterator side has no streaming
1171 /// sorted shape.
1172 ///
1173 /// # Examples
1174 ///
1175 /// ```
1176 /// # fn main() -> obj::Result<()> {
1177 /// use obj::Db;
1178 /// use serde::{Deserialize, Serialize};
1179 ///
1180 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1181 /// struct Order { total_cents: u64 }
1182 ///
1183 /// let dir = tempfile::tempdir()?;
1184 /// let db = Db::open(dir.path().join("all.obj"))?;
1185 /// for i in 0..5u64 {
1186 /// let _ = db.insert(Order { total_cents: i * 10 })?;
1187 /// }
1188 /// let listed: Vec<Order> = db.all::<Order>()?;
1189 /// assert_eq!(listed.len(), 5);
1190 /// # Ok(())
1191 /// # }
1192 /// ```
1193 ///
1194 /// # Errors
1195 ///
1196 /// As [`Db::iter_all`].
1197 pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
1198 self.iter_all::<T>()?
1199 .map(|step| step.map(|(_id, doc)| doc))
1200 .collect()
1201 }
1202
1203 /// Write a self-contained `.obj` file at `dest` carrying this
1204 /// database's state at the LSN of an internally-taken reader
1205 /// snapshot.
1206 ///
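/// Online backup — the source database stays open and the copy
/// reflects a single pinned reader snapshot. The writer lock is
/// held for the duration of the copy, so other write transactions
/// cannot proceed until the backup finishes. Post-snapshot writes
/// are NOT in the destination.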
1209 ///
1210 /// Algorithm (per
1211 /// [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
1212 /// § Hot backup):
1213 ///
1214 /// 1. Take a `ReaderSnapshot` against the source pager (pins
1215 /// `pinned_lsn`).
1216 /// 2. `OpenOptions::create_new(true)` on `dest`.
1217 /// 3. Copy main-file pages `0..page_count` to `dest`.
1218 /// 4. Overlay every frame in the snapshot's frozen WAL view
1219 /// onto `dest` at the frame's page-id offset.
1220 /// 5. If the snapshot carries a WAL-staged page-0 header,
1221 /// overlay it (so `dest`'s page-0 reflects the catalog
1222 /// root / freelist head / page count the snapshot would
1223 /// have observed).
1224 /// 6. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
1225 /// the header CRC32C.
1226 /// 7. `sync_data(SyncMode::Full)` on `dest`.
1227 /// 8. Drop the snapshot (releases the WAL pin).
1228 ///
1229 /// On any mid-backup error the destination file is removed
1230 /// best-effort so a half-written backup does not linger.
1231 ///
1232 /// # Examples
1233 ///
1234 /// ```
1235 /// # fn main() -> obj::Result<()> {
1236 /// use obj::Db;
1237 /// use serde::{Deserialize, Serialize};
1238 ///
1239 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1240 /// #[obj(collection = "notes_backup_doc")]
1241 /// struct Note { body: String }
1242 ///
1243 /// let dir = tempfile::tempdir()?;
1244 /// let src = dir.path().join("src.obj");
1245 /// let dst = dir.path().join("backup.obj");
1246 ///
1247 /// let db = Db::open(&src)?;
1248 /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
1249 /// db.backup_to(&dst)?;
1250 ///
1251 /// // The backup is itself a fully-formed obj file. Open it and read.
1252 /// let backup = Db::open(&dst)?;
1253 /// let listed: Vec<Note> = backup.all::<Note>()?;
1254 /// assert_eq!(listed.len(), 1);
1255 /// # Ok(())
1256 /// # }
1257 /// ```
1258 ///
1259 /// # Errors
1260 ///
1261 /// - [`Error::BackupDestinationExists`] if `dest` already
1262 /// exists.
1263 /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
1264 /// a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
1265 /// - [`Error::Io`] on syscall failure during the copy.
1266 pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1267 // #76: a hot backup reads the source MAIN FILE page-by-page
1268 // (`backup::copy_main_file`). The in-process pager `Mutex`
1269 // alone excludes only same-process writers; a writer in a
1270 // SEPARATE OS process can checkpoint pages into the main file
1271 // mid-copy and yield a torn backup. Hold the cross-process
1272 // `WRITER_LOCK` for the whole copy so no external writer can
1273 // mutate the main file under us.
1274 //
1275 // Lock-order invariant (MUST match `WriteTxn::begin`, see
1276 // `docs/concurrency.md`): in-process write-serialization FIRST,
1277 // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
1278 // (innermost, acquired and released within the critical
1279 // section). `obj_core::WriteTxn::begin` acquires the first two
1280 // in exactly that order, so routing through it inherits the
1281 // correct ordering and cannot invert against a concurrent
1282 // in-process `WriteTxn`. We make NO writes; the txn is rolled
1283 // back (a no-op restore of the unchanged header) once the copy
1284 // is done. On in-memory / `cross_process_lock = false` envs the
1285 // cross-process guard is absent (`lock_file = None`) and the
1286 // txn degrades to the in-process serialization guard only.
1287 let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1288 let result = self.run_backup_under_guard(dest);
1289 // Release both lock layers in the documented order regardless
1290 // of the copy's outcome. A rollback failure (poisoned pager
1291 // mutex) only surfaces when the copy itself succeeded.
1292 let unlock = guard.rollback();
1293 result.and(unlock)
1294 }
1295
1296 /// #76 helper: take the pager `Mutex` (innermost lock), pin a
1297 /// reader snapshot, and run the backup copy. Factored out of
1298 /// [`Self::backup_to`] so the cross-process lock guard's lifetime
1299 /// is unambiguous and the function stays within the power-of-ten
1300 /// 60-line budget (Rule 4).
1301 fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1302 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1303 kind: obj_core::LockKind::WriterInProcess,
1304 })?;
1305 let snapshot = pager.reader_snapshot()?;
1306 obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
1307 // Snapshot drops at end of scope; the pin is released and
1308 // any deferred checkpoint may proceed.
1309 drop(snapshot);
1310 drop(pager);
1311 Ok(())
1312 }
1313
1314 /// Fold committed WAL pages into the main `.obj` file and reset
1315 /// the WAL back to its 64-byte header.
1316 ///
1317 /// Wraps [`obj_core::pager::Pager::checkpoint`]. Acquires the
1318 /// pager lock the same way [`Self::backup_to`] does, then calls
1319 /// the pager checkpoint. After it returns, the committed records
1320 /// live in the main file rather than the `-wal` sidecar, and the
1321 /// WAL is truncated to header-only.
1322 ///
1323 /// # Deferred / no-op behavior
1324 ///
1325 /// - If a live MVCC reader has pinned an LSN below the end of the
1326 /// WAL, the checkpoint is **deferred** (a safe no-op) so the
1327 /// reader's frames are not reclaimed out from under it.
1328 /// - If there is nothing to fold (no committed WAL frames), the
1329 /// call is a harmless no-op.
1330 ///
1331 /// This is the reusable engine entry point behind the Python
1332 /// `Db.checkpoint()` binding and a future checkpoint-on-close
1333 /// path (issue #5).
1334 ///
1335 /// # Errors
1336 ///
1337 /// - [`Error::ReadOnly`] if the database was opened read-only.
1338 /// - [`Error::Busy`] if the pager lock is poisoned.
1339 /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
1340 /// (e.g. [`Error::Io`] on syscall failure).
1341 #[cfg_attr(
1342 feature = "tracing",
1343 tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
1344 )]
1345 pub fn checkpoint(&self) -> Result<()> {
1346 if self.readonly {
1347 return Err(Error::ReadOnly {
1348 operation: "checkpoint",
1349 });
1350 }
1351 // #76: checkpoint folds committed WAL frames into the main
1352 // file and rotates the WAL salt. Both touch the main file and
1353 // the on-disk header. Taken under only the in-process pager
1354 // `Mutex` (as before), a writer in a SEPARATE process could
1355 // run its own commit/checkpoint concurrently and corrupt the
1356 // salt-rotation handshake or interleave main-file writes. Hold
1357 // the cross-process `WRITER_LOCK` for the duration, in the
1358 // documented lock order (in-process serialization → cross-
1359 // process writer lock → pager mutex; see `WriteTxn::begin` and
1360 // `Self::backup_to`). We make no user writes; the surrounding
1361 // txn is rolled back afterwards. The rollback's
1362 // `restore_header_snapshot` only rewrites `root_catalog` /
1363 // `freelist_head` / `page_count` (unchanged by checkpoint) and
1364 // preserves the freshly-rotated `wal_salt`, so it is a no-op
1365 // with respect to the checkpoint's durable effects.
1366 let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1367 let result = self.run_checkpoint_under_guard();
1368 let unlock = guard.rollback();
1369 result.and(unlock)
1370 }
1371
1372 /// #76 helper: take the pager `Mutex` (innermost lock) and run the
1373 /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
1374 /// cross-process lock guard's lifetime is unambiguous.
1375 fn run_checkpoint_under_guard(&self) -> Result<()> {
1376 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1377 kind: obj_core::LockKind::WriterInProcess,
1378 })?;
1379 pager.checkpoint()?;
1380 drop(pager);
1381 Ok(())
1382 }
1383
1384 /// Streaming iterator over every `(Id, T)` pair in the
1385 /// collection. The returned [`IterAll`] holds a read transaction
1386 /// — and therefore a pinned reader snapshot — for its entire
1387 /// lifetime; the borrow on `self` ends when the iterator is
1388 /// dropped.
1389 ///
1390 /// Each `next` call yields `Result<(Id, T)>`. Per-doc decode
1391 /// errors surface as `Some(Err(_))` rather than ending the
1392 /// iteration; the caller decides whether to propagate or
1393 /// continue.
1394 ///
1395 /// Peak memory does NOT scale with collection size — the
1396 /// iterator's internal buffer is fixed at
1397 /// `ITER_ALL_BATCH = 256` entries (~128 KiB at the
1398 /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
1399 /// ~512 byte/doc estimate). Power-of-ten Rule 3.
1400 ///
1401 /// `Query::fetch` is sort-compatible (sort requires
1402 /// materialisation); `iter_all` is NOT — there is no streaming
1403 /// shape for sort because the comparator needs every key
1404 /// up front. Use `Query::sort_by` + `fetch` for the sorted
1405 /// workload; use `iter_all` for the unsorted large-scan
1406 /// workload.
1407 ///
1408 /// # Examples
1409 ///
1410 /// Streaming a small collection and folding into a running sum.
1411 /// The iterator's peak memory stays bounded regardless of how
1412 /// many documents the collection holds:
1413 ///
1414 /// ```
1415 /// # fn main() -> obj::Result<()> {
1416 /// use obj::Db;
1417 /// use serde::{Deserialize, Serialize};
1418 ///
1419 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1420 /// struct Order { total_cents: u64 }
1421 ///
1422 /// let dir = tempfile::tempdir()?;
1423 /// let db = Db::open(dir.path().join("iter.obj"))?;
1424 /// for i in 0..5u64 {
1425 /// let _ = db.insert(Order { total_cents: i * 10 })?;
1426 /// }
1427 ///
1428 /// let mut total: u64 = 0;
1429 /// for step in db.iter_all::<Order>()? {
1430 /// let (_id, doc) = step?;
1431 /// total = total
1432 /// .checked_add(doc.total_cents)
1433 /// .ok_or(obj::Error::InvalidArgument("overflow"))?;
1434 /// }
1435 /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
1436 /// # Ok(())
1437 /// # }
1438 /// ```
1439 ///
1440 /// # Errors
1441 ///
1442 /// - As [`Db::read_transaction`] (construction-time).
1443 /// - [`Error::CollectionNotFound`] if the collection is not yet
1444 /// registered at the snapshot's pinned LSN.
1445 /// - Per-step iteration may yield `Some(Err(_))` for pager,
1446 /// B-tree, or codec failures.
1447 pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
1448 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
1449 let txn = ReadTxn::new(inner);
1450 // Resolve the descriptor up front so an absent collection
1451 // surfaces at construction (matching `Db::all`'s pre-M8
1452 // contract). The handle's lifetime is bound to `txn`; we
1453 // drop the handle immediately and stash the descriptor.
1454 let descriptor = {
1455 let coll = txn.collection::<T>()?;
1456 coll.descriptor().clone()
1457 };
1458 Ok(IterAll {
1459 txn,
1460 descriptor,
1461 buffer: VecDeque::new(),
1462 last_emitted_key: None,
1463 finished: false,
1464 _phantom: PhantomData,
1465 })
1466 }
1467}
1468
1469/// Streaming iterator returned by [`Db::iter_all`].
1470///
1471/// Holds a [`ReadTxn`] for its lifetime so every yielded document
1472/// is consistent with the snapshot pinned at construction. Yields
1473/// `Result<(Id, T)>` one entry at a time; refills its internal
1474/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
1475/// pager-lock acquisition, so peak memory stays bounded at a small
1476/// constant regardless of the collection's size — power-of-ten
1477/// Rule 3.
1478///
1479/// Construction errors surface at the [`Db::iter_all`] call site;
1480/// per-step errors (pager, B-tree, codec) surface as
1481/// `Some(Err(_))` during iteration and do NOT terminate it — the
1482/// caller decides whether to continue.
1483// Note: `IterAll<T>` is deliberately outside the `serde` surface
1484// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
1485// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
1486// neither of which is serializable in a meaningful way. Users who
1487// need an off-line representation of iteration output should
1488// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
1489// resulting `Vec<T>` themselves.
1490pub struct IterAll<'db, T> {
1491 /// Owns the snapshot pin for the iterator's lifetime.
1492 txn: ReadTxn<'db>,
1493 /// Cached primary-tree root + collection id (`Document::decode`
1494 /// needs the collection id to validate the per-doc header).
1495 descriptor: CollectionDescriptor,
1496 /// Pre-decoded buffer of upcoming entries.
1497 buffer: VecDeque<Result<(Id, T)>>,
1498 /// Resumption marker — `Excluded(last_emitted_key)` is the
1499 /// start bound of the next refill.
1500 last_emitted_key: Option<Vec<u8>>,
1501 /// `true` once the underlying B-tree iterator returned no more
1502 /// entries; subsequent `next` calls return `None`.
1503 finished: bool,
1504 /// Track the `T` parameter without owning a value.
1505 _phantom: PhantomData<fn() -> T>,
1506}
1507
1508impl<T> std::fmt::Debug for IterAll<'_, T> {
1509 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1510 f.debug_struct("IterAll")
1511 .field("collection_id", &self.descriptor.collection_id)
1512 .field("buffer_len", &self.buffer.len())
1513 .field("finished", &self.finished)
1514 .finish_non_exhaustive()
1515 }
1516}
1517
1518impl<T: Document + Send + 'static> IterAll<'_, T> {
1519 /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
1520 /// entries, resuming from `last_emitted_key`. Stores per-doc
1521 /// decode errors as `Err` in the buffer so the caller can
1522 /// observe them via `next()` without aborting iteration.
1523 ///
1524 /// Power-of-ten Rule 7: every fallible operation is either
1525 /// captured into the buffer or returned up-front via `?` (which
1526 /// propagates the lock-acquisition / B-tree-open failures
1527 /// before the buffer is touched).
1528 fn refill(&mut self) -> Result<()> {
1529 // Clone the `Arc<Mutex<Pager>>` so the lock-acquisition does
1530 // NOT keep an immutable borrow of `self.txn` alive across
1531 // the buffer-mutation calls below.
1532 let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
1533 let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
1534 kind: obj_core::LockKind::WriterInProcess,
1535 })?;
1536 let root_pid = PageId::new(self.descriptor.primary_root)
1537 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1538 let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1539 let start = match &self.last_emitted_key {
1540 Some(k) => Bound::Excluded(k.clone()),
1541 None => Bound::Unbounded,
1542 };
1543 let collection_id = self.descriptor.collection_id;
1544 let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
1545 let mut yielded: usize = 0;
1546 let mut last_key: Option<Vec<u8>> = None;
1547 let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
1548 for step in iter {
1549 if yielded >= ITER_ALL_BATCH {
1550 break;
1551 }
1552 yielded = yielded
1553 .checked_add(1)
1554 .ok_or(Error::BTreeInvariantViolated {
1555 reason: "iter_all batch counter overflow",
1556 })?;
1557 buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
1558 }
1559 if yielded < ITER_ALL_BATCH {
1560 self.finished = true;
1561 }
1562 // The lock is held until `pager` drops at function end — but
1563 // by the time we mutate `self.buffer` / `self.last_emitted_key`
1564 // below, the `iter` (which borrowed `pager`) is already
1565 // dropped (its scope ended). Move the staged batch over.
1566 drop(pager);
1567 self.buffer.extend(batch);
1568 if let Some(k) = last_key {
1569 self.last_emitted_key = Some(k);
1570 }
1571 Ok(())
1572 }
1573}
1574
1575/// Process one B-tree iterator step into the staged `batch`. Decode
1576/// errors and corruption errors are captured as `Err` entries so
1577/// they surface via `next()` rather than aborting the refill —
1578/// power-of-ten Rule 7.
1579///
1580/// Free function (rather than `&mut self`) so the caller can keep
1581/// the pager lock open across multiple buffer-pushes without the
1582/// borrow checker tripping on a second `&mut self.buffer` borrow.
1583fn buffer_one_entry<T: Document>(
1584 batch: &mut VecDeque<Result<(Id, T)>>,
1585 last_key: &mut Option<Vec<u8>>,
1586 collection_id: u32,
1587 step: Result<(Vec<u8>, Vec<u8>)>,
1588) {
1589 let (key, value) = match step {
1590 Ok(kv) => kv,
1591 Err(e) => {
1592 batch.push_back(Err(e));
1593 return;
1594 }
1595 };
1596 let Some(id) = Id::from_be_bytes(&key) else {
1597 batch.push_back(Err(Error::InvalidArgument(
1598 "primary B-tree key is not an Id",
1599 )));
1600 return;
1601 };
1602 *last_key = Some(key);
1603 let decoded = obj_core::codec::decode::<T>(&value, collection_id);
1604 batch.push_back(decoded.map(|doc| (id, doc)));
1605}
1606
1607impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
1608 type Item = Result<(Id, T)>;
1609
1610 fn next(&mut self) -> Option<Self::Item> {
1611 if let Some(item) = self.buffer.pop_front() {
1612 return Some(item);
1613 }
1614 if self.finished {
1615 return None;
1616 }
1617 if let Err(e) = self.refill() {
1618 // A lock / open failure surfaces here ONCE and then
1619 // `finished` is flipped so subsequent `next` calls
1620 // terminate cleanly. The error is surfaced to the
1621 // caller via `Some(Err(_))` — power-of-ten Rule 7.
1622 self.finished = true;
1623 return Some(Err(e));
1624 }
1625 self.buffer.pop_front()
1626 }
1627}
1628
1629/// Split a possibly-namespaced collection name into its
1630/// `(Some("ns"), "name")` parts. The split is on the FIRST `.`
1631/// only; downstream collection names may contain further dots.
1632///
1633/// `"users"` → `(None, "users")`.
1634/// `"archive.orders"` → `(Some("archive"), "orders")`.
1635/// `"archive.orders.legacy"` → `(Some("archive"), "orders.legacy")`.
1636#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    // `split_once` cuts at the first `.` only, so any further dots
    // stay in the collection-name half.
    if let Some((namespace, collection)) = name.split_once('.') {
        (Some(namespace), collection)
    } else {
        (None, name)
    }
}
1643
1644/// `Db` is `Send + Sync` so it composes with `Arc<Db>` for
1645/// concurrent reader / single-writer workloads. The thread-safety
1646/// is inherited from the underlying `Arc<TxnEnv>` + `Arc<Mutex<Catalog>>`.
1647const _: () = {
1648 fn assert_send_sync<T: Send + Sync>() {}
1649 let _ = assert_send_sync::<Db>;
1650};
1651
/// Translate the first failure in `report` (if any) into the
/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
/// open-time fast check so the caller sees `Err(Error::Corruption {
/// page_id })` rather than an opaque `IntegrityReport`. The page-id
/// in the returned error is the locus of the failure when one is
/// available; for failures whose locus is a non-page (e.g. an
/// `OrphanIndexEntry`, which `quick_check` does NOT emit), page-id
/// `0` is used as a stand-in. Returns `None` when the report has no
/// failures.
1660fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
1661 let first = report.failures.first()?;
1662 let err = match first {
1663 obj_core::IntegrityFailure::ChecksumMismatch { page_id }
1664 | obj_core::IntegrityFailure::OrphanPage { page_id }
1665 | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
1666 | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
1667 | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
1668 | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
1669 | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
1670 Error::Corruption { page_id: *page_id }
1671 }
1672 obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
1673 Error::BTreeDepthExceeded { limit: *limit }
1674 }
1675 // The remaining variants — OrphanIndexEntry,
1676 // MissingIndexEntry, and any future `#[non_exhaustive]`
1677 // additions — are never emitted by `quick_check` (which
1678 // walks the catalog tree only, not the per-collection
1679 // primary or index trees). The defensive fallback is a
1680 // coarse `Corruption { page_id: 0 }` so the open-time
1681 // check still fails closed if the obj-core layer ever
1682 // grows a new fast-check failure mode.
1683 _ => Error::Corruption { page_id: 0 },
1684 };
1685 Some(err)
1686}