obj/db.rs
1//! `Db` — public entry point.
2//!
3//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
4//! described in `design.md` § API and pinned in `docs/format.md`
5//! § Db public API contract.
6
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::marker::PhantomData;
9use std::ops::Bound;
10use std::path::Path;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use obj_core::btree::BTree;
15use obj_core::pager::page::PageId;
16use obj_core::pager::{lock_path_for, Pager};
17use obj_core::platform::FileHandle;
18use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};
19
20use crate::config::Config;
21use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
22
/// Per-batch refill size for [`IterAll`].
///
/// The iterator yields one document at a time to the caller, but
/// internally fetches the primary B-tree in chunks of this many
/// entries so the per-step pager-lock acquisition cost amortises
/// over many `next` calls.
///
/// Power-of-ten Rule 3: the buffer is fixed-size (constant 256
/// entries — at ~512 bytes/doc that's ~128 KiB peak); it does NOT
/// scale with the collection's total size.
const ITER_ALL_BATCH: usize = 256;
31
/// The embedded document database.
///
/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
/// concurrent reader / single-writer access. See `design.md`
/// § "Concurrent readers, one writer".
///
/// At M6 the public `Db` is hard-typed against
/// `obj_core::FileHandle`. A future refactor may make it generic
/// over `F: FileBackend` so fault-injection harnesses can build on
/// the same API; today the test helpers reach for the lower-level
/// `obj-core` building blocks instead.
pub struct Db {
    /// Shared transaction environment (pager + optional cross-process
    /// lock file) that every read / write transaction begins against.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// In-memory catalog handle, shared with write transactions and
    /// re-opened from the pager after a rollback.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// `true` when the handle was opened read-only; mutating entry
    /// points check this flag and return `Error::ReadOnly`.
    pub(crate) readonly: bool,
    /// Lock-acquisition timeout handed to every transaction begin.
    pub(crate) busy_timeout: Duration,
    /// Per-process cache of `(collection, version)` keys whose
    /// `Document::indexes()` reconciliation has already run. M7
    /// #57: reconciliation is idempotent but a catalog walk +
    /// declaration cycle is non-trivial — caching ensures only the
    /// first `WriteTxn::collection::<T>()` call per process per
    /// `(collection, version)` pays the cost. #130: the version is part
    /// of the key so a later schema version that ADDS an index still
    /// reconciles (the name-only key skipped it, leaving the new index
    /// never `Active`).
    pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
    /// M11 #93: registry of attached read-only databases keyed by
    /// namespace. Populated by [`Db::attach`] /
    /// [`Db::attach_shared`]; consulted by
    /// [`Db::collection_namespace`] and by
    /// [`Db::read_transaction`] when it pins per-attached
    /// snapshots. `Arc<Mutex<_>>` so the registry can be mutated
    /// through a shared handle (`attach_shared` / `detach_shared`
    /// take `&self`; `attach` / `detach` keep `&mut self` only for
    /// API stability) while read transactions consult it; the mutex
    /// coordinates the two.
    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
    /// #83 (c): published length of `attached`, mutated only while the
    /// `attached` mutex is held. A relaxed load lets the hot read path
    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
    /// no databases are attached (the overwhelmingly common case).
    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
}
72
73impl std::fmt::Debug for Db {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("Db")
76 .field("readonly", &self.readonly)
77 .field("busy_timeout", &self.busy_timeout)
78 .finish_non_exhaustive()
79 }
80}
81
82// ---------- FFI plumbing ------------------------------------------
83//
84// The accessors below expose the shareable internals of `Db` so the
85// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
86// iterator handles without re-implementing the lifecycle. They are
87// marked `#[doc(hidden)]` because user-Rust code should reach for
88// the typed `Db::transaction` / `Db::read_transaction` API; the
89// raw accessors exist so a sibling FFI / native-host crate can
90// build its own self-contained handle types.
91impl Db {
92 /// Shared environment Arc — pager + cross-process lock file.
93 ///
94 /// Used by [`libobj`](../../libobj/index.html) to build owned
95 /// transaction handles whose lifetime extends past a single
96 /// `Db::transaction` closure call.
97 #[doc(hidden)]
98 #[must_use]
99 pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
100 Arc::clone(&self.env)
101 }
102
103 /// Shared catalog Arc.
104 ///
105 /// Used by [`libobj`](../../libobj/index.html) for the same
106 /// reason as [`Self::env_arc`].
107 #[doc(hidden)]
108 #[must_use]
109 pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
110 Arc::clone(&self.catalog)
111 }
112
113 /// Shared per-process reconciliation cache Arc.
114 ///
115 /// Used by [`libobj`](../../libobj/index.html) for the same
116 /// reason as [`Self::env_arc`].
117 #[doc(hidden)]
118 #[must_use]
119 pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<(String, u32)>>> {
120 Arc::clone(&self.reconciled)
121 }
122
123 /// Busy-lock timeout configured at open time.
124 #[doc(hidden)]
125 #[must_use]
126 pub fn busy_timeout(&self) -> Duration {
127 self.busy_timeout
128 }
129}
130
131impl Db {
132 /// Open or create a file-backed database at `path` with default
133 /// configuration.
134 ///
135 /// Creates the file if absent; reopens otherwise. A `Db` is
136 /// `Send + Sync` — share across threads via `Arc<Db>` for the
137 /// concurrent-reader / single-writer workload documented in
138 /// `docs/concurrency.md`.
139 ///
140 /// For an ephemeral database use [`Db::memory`]; for a
141 /// read-only handle that coexists with another process's writer
142 /// use [`Db::open_readonly`]; for custom durability /
143 /// cache / lock knobs use [`Db::open_with`] + [`Config`].
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// # fn main() -> obj::Result<()> {
149 /// use obj::Db;
150 ///
151 /// let dir = tempfile::tempdir()?;
152 ///
153 /// // File-backed. Creates the file if absent; reopens otherwise.
154 /// let _db = Db::open(dir.path().join("app.obj"))?;
155 ///
156 /// // In-memory. No persistence, no file locks. Useful for tests.
157 /// let _mem = Db::memory()?;
158 ///
159 /// // Read-only. Coexists safely with a writer in another process.
160 /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
161 /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
162 /// # Ok(())
163 /// # }
164 /// ```
165 ///
166 /// # Errors
167 ///
168 /// Returns the underlying [`Error`] from
169 /// [`obj_core::pager::Pager::open`] on syscall or format
170 /// failure.
171 #[cfg_attr(
172 feature = "tracing",
173 tracing::instrument(
174 name = "db.open",
175 level = "info",
176 skip_all,
177 fields(path = %path.as_ref().display()),
178 )
179 )]
180 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
181 Self::open_with(path, Config::default())
182 }
183
184 /// Open or create a file-backed database with `config`.
185 ///
186 /// # Errors
187 ///
188 /// As [`Db::open`].
189 // Takes `Config` by value: this is the frozen builder-handoff
190 // signature (the caller's `Config::default().sync_mode(..)...`
191 // chain ends here). `Config` is no longer `Copy` (issue #17), so
192 // clippy now sees the by-value argument as only borrowed in the
193 // body — but the by-value convention is part of the 1.0 surface
194 // and must not change. Power-of-ten Rule 10: documented allow.
195 #[allow(clippy::needless_pass_by_value)]
196 #[cfg_attr(
197 feature = "tracing",
198 tracing::instrument(
199 name = "db.open",
200 level = "info",
201 skip_all,
202 fields(path = %path.as_ref().display()),
203 )
204 )]
205 pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
206 let path_buf = path.as_ref().to_path_buf();
207 // Issue #31: the pager `Config` is no longer `Copy` (its
208 // `encryption_key` zeroizes on drop), so move it out of
209 // `config` rather than copying it — `from_parts` only reads
210 // the remaining scalar fields. `mem::take` leaves a default
211 // (keyless) pager `Config` behind, which is never read again.
212 let pager_config = std::mem::take(&mut config.pager);
213 let pager = Pager::open(&path_buf, pager_config)?;
214 // Issue #1: cross-process locks anchor against a dedicated
215 // `<db>.obj-lock` sidecar file. Keeping the lock byte out
216 // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
217 // Windows once the DB grows past whatever offset the
218 // lock byte would otherwise sit at (`LockFileEx` is
219 // mandatory). The sidecar is left in place across opens;
220 // deleting it would race two openers into two different
221 // inodes and miss each other's exclusion.
222 let lock_file = if config.cross_process_lock {
223 let lock_path = lock_path_for(&path_buf);
224 let handle = FileHandle::open_or_create(&lock_path)?;
225 // Materialise the locked byte range as real file
226 // content. POSIX OFD and Windows `LockFileEx` both
227 // permit locking past EOF, but a non-empty sidecar
228 // is the most conservative choice across kernels
229 // (and lets the same offsets work on every platform).
230 // 128 bytes covers WRITER_LOCK (96) and the full
231 // READER_LOCK_RANGE (97..128).
232 handle.set_len(128)?;
233 Some(Arc::new(handle))
234 } else {
235 None
236 };
237 Self::from_parts(pager, lock_file, &config)
238 }
239
240 /// Open a fresh in-memory database. No persistence, no file
241 /// locks. Useful for unit tests and ephemeral workloads.
242 ///
243 /// # Errors
244 ///
245 /// Returns [`Error::InvalidArgument`] only if `config` has
246 /// zero cache frames.
247 pub fn memory() -> Result<Self> {
248 Self::memory_with(Config::default())
249 }
250
251 /// As [`Db::memory`] with a caller-supplied [`Config`].
252 ///
253 /// # Errors
254 ///
255 /// As [`Db::memory`].
256 // By-value `Config` is the frozen builder-handoff signature; see
257 // the note on [`Db::open_with`]. Power-of-ten Rule 10: documented
258 // allow (issue #17 dropped `Copy`, which made the lint fire).
259 #[allow(clippy::needless_pass_by_value)]
260 pub fn memory_with(mut config: Config) -> Result<Self> {
261 // Issue #31: move the (no-longer-`Copy`) pager `Config` out
262 // rather than copying it; `from_parts` only reads the
263 // remaining scalar fields. See `open_with` for the rationale.
264 let pager_config = std::mem::take(&mut config.pager);
265 let pager = Pager::memory(pager_config)?;
266 Self::from_parts(pager, None, &config)
267 }
268
269 /// Open the database at `path` in read-only mode. The
270 /// resulting `Db` rejects every mutating operation with
271 /// `Err(Error::ReadOnly { ... })`. Coexists safely with a
272 /// writer in another process via the cross-process reader
273 /// lock.
274 ///
275 /// # Errors
276 ///
277 /// As [`Db::open`].
278 pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
279 let config = Config {
280 readonly: true,
281 ..Config::default()
282 };
283 Self::open_with(path, config)
284 }
285
286 fn from_parts(
287 mut pager: Pager<FileHandle>,
288 lock_file: Option<Arc<FileHandle>>,
289 config: &Config,
290 ) -> Result<Self> {
291 // #64: catalog init (and its `tree.insert` → `alloc_page` /
292 // `set_root_catalog` chain) mutates the file header. Header
293 // mutations now ride the WAL via `stage_or_write_header`,
294 // which requires an open Pager txn so the staged page-0 frame
295 // commits atomically with the B-tree pages it depends on.
296 // Wrap the init call in `begin_txn` / `commit` / `end_txn` so
297 // a `Db::open` against a fresh file is durable on return
298 // (matching the pre-#64 contract). `open_or_init` on an
299 // already-initialised database is read-only and the wrapped
300 // `commit()` is a no-op (`Pager::commit` short-circuits on an
301 // empty pending set + clean `header_dirty`).
302 pager.begin_txn();
303 let init = Catalog::open_or_init(&mut pager);
304 let catalog = match init {
305 Ok(c) => {
306 let commit_result = pager.commit();
307 pager.end_txn();
308 commit_result?;
309 c
310 }
311 Err(e) => {
312 pager.end_txn();
313 return Err(e);
314 }
315 };
316 // M11 #91: lightweight open-time integrity check. Runs the
317 // catalog-portion of `integrity_check` and surfaces any
318 // detected failure as the strongest available error so the
319 // caller decides whether to attempt repair or abort. Opt out
320 // via `Config::skip_open_check(true)`.
321 if !config.skip_open_check {
322 let report = obj_core::integrity::quick_check(&mut pager)?;
323 if let Some(err) = first_failure_as_error(&report) {
324 return Err(err);
325 }
326 }
327 Ok(Self {
328 env: Arc::new(TxnEnv::new(pager, lock_file)),
329 catalog: Arc::new(Mutex::new(catalog)),
330 readonly: config.readonly,
331 busy_timeout: config.busy_timeout,
332 reconciled: Arc::new(Mutex::new(HashSet::new())),
333 attached: Arc::new(Mutex::new(HashMap::new())),
334 attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
335 })
336 }
337
338 /// Run a closure inside a write transaction.
339 ///
340 /// Begins a [`WriteTxn`], runs the closure with `&mut tx`. If
341 /// the closure returns `Ok(r)`, the transaction is committed and
342 /// `Ok(r)` is returned. If the closure returns `Err(e)`, the
343 /// transaction is rolled back and `Err(e)` is returned. A
344 /// panic inside the closure unwinds with an implicit rollback
345 /// via the `WriteTxn` `Drop` impl. See `docs/concurrency.md`
346 /// for the lock-acquisition contract.
347 ///
348 /// # Examples
349 ///
350 /// Atomic batch — both inserts commit together or not at all:
351 ///
352 /// ```
353 /// # fn main() -> obj::Result<()> {
354 /// use obj::Db;
355 /// use serde::{Deserialize, Serialize};
356 ///
357 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
358 /// struct Order {
359 /// customer_id: u64,
360 /// total_cents: u64,
361 /// }
362 ///
363 /// let dir = tempfile::tempdir()?;
364 /// let db = Db::open(dir.path().join("txn.obj"))?;
365 ///
366 /// let (a, b) = db.transaction(|tx| {
367 /// let coll = tx.collection::<Order>()?;
368 /// let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
369 /// let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
370 /// Ok((a, b))
371 /// })?;
372 /// assert_ne!(a, b, "freshly-allocated ids are distinct");
373 /// # Ok(())
374 /// # }
375 /// ```
376 ///
377 /// Returning `Err(_)` rolls every staged write back; the `Err`
378 /// the closure returns is the `Err` the caller sees:
379 ///
380 /// ```
381 /// # fn main() -> obj::Result<()> {
382 /// use obj::{Db, Error};
383 /// use serde::{Deserialize, Serialize};
384 ///
385 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
386 /// struct Order { total_cents: u64 }
387 ///
388 /// let dir = tempfile::tempdir()?;
389 /// let db = Db::open(dir.path().join("rollback.obj"))?;
390 /// let id = db.insert(Order { total_cents: 10 })?;
391 ///
392 /// let outcome: obj::Result<()> = db.transaction(|tx| {
393 /// let coll = tx.collection::<Order>()?;
394 /// coll.update(id, |o| { o.total_cents = 99_999; })?;
395 /// Err(Error::InvalidArgument("synthetic abort"))
396 /// });
397 /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
398 ///
399 /// let after: Order = db
400 /// .get::<Order>(id)?
401 /// .ok_or(Error::InvalidArgument("just inserted"))?;
402 /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
403 /// # Ok(())
404 /// # }
405 /// ```
406 ///
407 /// # Errors
408 ///
409 /// - [`Error::ReadOnly`] if the database was opened read-only.
410 /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
411 /// - Any error the closure returns.
412 #[cfg_attr(
413 feature = "tracing",
414 tracing::instrument(name = "db.transaction", level = "info", skip_all)
415 )]
416 pub fn transaction<R, F>(&self, body: F) -> Result<R>
417 where
418 F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
419 {
420 if self.readonly {
421 return Err(Error::ReadOnly {
422 operation: "transaction",
423 });
424 }
425 #[cfg(feature = "tracing")]
426 tracing::debug!("begin");
427 let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
428 let mut tx = WriteTxn::new(
429 inner,
430 Arc::clone(&self.catalog),
431 Arc::clone(&self.reconciled),
432 );
433 match body(&mut tx) {
434 Ok(value) => {
435 tx.commit()?;
436 #[cfg(feature = "tracing")]
437 tracing::debug!("commit");
438 Ok(value)
439 }
440 Err(e) => {
441 // Best-effort rollback; surface the closure's
442 // error. Refresh the in-memory catalog after
443 // rollback so its B-tree root state matches the
444 // file's `root_catalog` header field — catalog
445 // mutations write the header directly (not through
446 // the WAL) so a rollback leaves the header pointing
447 // at the most-recent in-memory root, which is
448 // exactly what re-opening the catalog will pick up.
449 let _ = tx.rollback();
450 let _ = self.refresh_catalog();
451 #[cfg(feature = "tracing")]
452 tracing::debug!("rollback");
453 Err(e)
454 }
455 }
456 }
457
458 /// Re-open the in-memory `Catalog` handle from the pager. Used
459 /// after a transaction rollback to discard the catalog's in-
460 /// memory `next_collection_id` / `tree.root` state that may
461 /// have advanced during the rolled-back closure.
462 fn refresh_catalog(&self) -> Result<()> {
463 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
464 kind: obj_core::LockKind::WriterInProcess,
465 })?;
466 let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
467 let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
468 kind: obj_core::LockKind::WriterInProcess,
469 })?;
470 *existing = fresh;
471 Ok(())
472 }
473
474 /// Run a closure inside a read transaction. See
475 /// [`Self::transaction`] for the closure shape and atomicity
476 /// contract; reads inside `body` observe a consistent
477 /// snapshot of the database.
478 ///
479 /// Every read inside the closure observes a single consistent
480 /// snapshot — the snapshot is pinned at the moment the closure
481 /// begins. Concurrent writers do not affect what the closure
482 /// sees.
483 ///
484 /// # Examples
485 ///
486 /// Two reads inside one `read_transaction` see the same value
487 /// even if a writer commits in between:
488 ///
489 /// ```
490 /// # fn main() -> obj::Result<()> {
491 /// use obj::Db;
492 /// use serde::{Deserialize, Serialize};
493 ///
494 /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
495 /// struct Order { total_cents: u64 }
496 ///
497 /// let dir = tempfile::tempdir()?;
498 /// let db = Db::open(dir.path().join("read.obj"))?;
499 /// let id = db.insert(Order { total_cents: 10 })?;
500 ///
501 /// let (a, b) = db.read_transaction(|tx| {
502 /// let coll = tx.collection::<Order>()?;
503 /// let a = coll.get(id)?;
504 /// let b = coll.get(id)?;
505 /// Ok((a, b))
506 /// })?;
507 /// assert_eq!(a, b);
508 /// # Ok(())
509 /// # }
510 /// ```
511 ///
512 /// # Errors
513 ///
514 /// - [`Error::Busy`] if the reader lock could not be acquired.
515 /// - Any error the closure returns.
516 #[cfg_attr(
517 feature = "tracing",
518 tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
519 )]
520 pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
521 where
522 F: FnOnce(&ReadTxn<'_>) -> Result<R>,
523 {
524 #[cfg(feature = "tracing")]
525 tracing::debug!("begin");
526 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
527 // M11 #93: pin one snapshot per attached database so reads
528 // through the closure observe a consistent view per file.
529 let attached_contexts = self.pin_attached_snapshots()?;
530 let tx = ReadTxn::with_attached(inner, attached_contexts);
531 let result = body(&tx);
532 #[cfg(feature = "tracing")]
533 tracing::debug!("commit");
534 result
535 }
536
537 /// Snapshot every attached database into a per-namespace
538 /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
539 /// pin; dropping the returned map releases every pin.
540 fn pin_attached_snapshots(
541 &self,
542 ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
543 // #83 (c): the common case is no attached databases. A single
544 // relaxed load of `attached_len` lets the empty path skip the
545 // `self.attached` mutex acquire entirely (`with_capacity(0)`
546 // already does not allocate, so the mutex is the only cost).
547 //
548 // Correctness under concurrent detach: the map produced here
549 // feeds only namespaced read dispatch in `open_readonly_named`
550 // (`<ns>.<tail>` → attached snapshot). With zero attachments no
551 // namespace can resolve, so an empty map is observably the same
552 // as the locked build. `attached_len` is mutated only while the
553 // `attached` mutex is held (in `attach` / `detach`), so a `0`
554 // observed here means "no attachment is committed to the map",
555 // matching what a lock-and-read would have seen at that instant.
556 // A concurrent attach that has not yet published its count is
557 // exactly an attach ordered *after* this txn began — invisible
558 // by design, same as the cross-process snapshot semantics.
559 if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
560 return Ok(std::collections::HashMap::new());
561 }
562 let registry = self.attached.lock().map_err(|_| Error::Busy {
563 kind: obj_core::LockKind::WriterInProcess,
564 })?;
565 let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
566 std::collections::HashMap::with_capacity(registry.len());
567 for (namespace, attached) in registry.iter() {
568 let env = Arc::clone(&attached.env);
569 let snapshot = {
570 let mut pager = env.pager().lock().map_err(|_| Error::Busy {
571 kind: obj_core::LockKind::WriterInProcess,
572 })?;
573 pager.reader_snapshot()?
574 };
575 out.insert(
576 namespace.clone(),
577 crate::txn::AttachedReadCtx { env, snapshot },
578 );
579 }
580 Ok(out)
581 }
582
583 /// Pin one [`crate::txn::AttachedReadCtx`] for the single attachment
584 /// registered under `namespace`. Used by [`Self::dump_raw`]'s
585 /// namespaced full-scan path: it needs exactly one attached env +
586 /// pinned snapshot, not the whole per-namespace map
587 /// [`Self::pin_attached_snapshots`] builds.
588 ///
589 /// The snapshot is pinned for the returned context's lifetime; the
590 /// caller (the `DumpIter`) owns the context, so the pin holds for
591 /// the iterator's whole life and a concurrent `detach` cannot
592 /// invalidate the in-flight scan.
593 ///
594 /// # Errors
595 ///
596 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is not
597 /// attached on this handle.
598 /// - [`Error::Busy`] if the registry / pager mutex is poisoned.
599 pub(crate) fn pin_attached_ctx(&self, namespace: &str) -> Result<crate::txn::AttachedReadCtx> {
600 let env = {
601 let registry = self.attached.lock().map_err(|_| Error::Busy {
602 kind: obj_core::LockKind::WriterInProcess,
603 })?;
604 let attached =
605 registry
606 .get(namespace)
607 .ok_or_else(|| Error::CollectionNamespaceUnknown {
608 namespace: namespace.to_owned(),
609 })?;
610 Arc::clone(&attached.env)
611 };
612 let snapshot = {
613 let mut pager = env.pager().lock().map_err(|_| Error::Busy {
614 kind: obj_core::LockKind::WriterInProcess,
615 })?;
616 pager.reader_snapshot()?
617 };
618 Ok(crate::txn::AttachedReadCtx { env, snapshot })
619 }
620
621 /// Attach the database at `path` under `namespace`. The
622 /// attached file is opened read-only; collections in the
623 /// attached file become visible to subsequent
624 /// `Db::collection::<T>()` calls (and the one-shot per-op API
625 /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
626 /// form `<namespace>.<collection_name>`.
627 ///
628 /// Writes against namespaced collections return
629 /// [`Error::AttachedDatabaseIsReadOnly`].
630 ///
631 /// Each attached database gets its own snapshot pinned at
632 /// read-transaction begin; [`Db::detach`] removes the registry
633 /// entry but in-flight reads complete against their pinned
634 /// snapshot.
635 ///
636 /// # Examples
637 ///
638 /// ```
639 /// # fn main() -> obj::Result<()> {
640 /// use obj::Db;
641 /// use serde::{Deserialize, Serialize};
642 ///
643 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
644 /// #[obj(collection = "orders_attach_doc")]
645 /// struct Order { total_cents: u64 }
646 ///
647 /// // Same struct shape, namespaced collection name. Reads
648 /// // against this type route to the attached "archive" db.
649 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
650 /// #[obj(collection = "archive.orders_attach_doc")]
651 /// struct ArchivedOrder { total_cents: u64 }
652 ///
653 /// let dir = tempfile::tempdir()?;
654 /// let live = dir.path().join("live.obj");
655 /// let archive = dir.path().join("archive.obj");
656 ///
657 /// // Seed the archive (writes go through its own un-namespaced type).
658 /// {
659 /// let archive_db = Db::open(&archive)?;
660 /// let _ = archive_db.insert(Order { total_cents: 999 })?;
661 /// }
662 ///
663 /// // Open the live db, attach the archive under "archive".
664 /// let mut db = Db::open(&live)?;
665 /// let _ = db.insert(Order { total_cents: 100 })?;
666 /// db.attach(&archive, "archive")?;
667 ///
668 /// // One read transaction, two collections.
669 /// db.read_transaction(|tx| {
670 /// let live = tx.collection::<Order>()?;
671 /// let arch = tx.collection::<ArchivedOrder>()?;
672 /// assert_eq!(live.all()?.len(), 1);
673 /// assert_eq!(arch.all()?.len(), 1);
674 /// Ok(())
675 /// })?;
676 ///
677 /// db.detach("archive")?;
678 /// # Ok(())
679 /// # }
680 /// ```
681 ///
682 /// # Errors
683 ///
    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
    ///   use on this `Db`.
    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
    ///   opened read-only.
    /// - [`Error::Busy`] if the registry mutex is poisoned.
688 pub fn attach<P: AsRef<std::path::Path>>(
689 &mut self,
690 path: P,
691 namespace: impl Into<String>,
692 ) -> Result<()> {
693 // The `&mut self` signature is preserved for API stability,
694 // but the attachment registry is interior-mutable
695 // (`Arc<Mutex<_>>` + `Arc<AtomicUsize>`), so the work is
696 // delegated to the `&self` core shared with
697 // [`Self::attach_shared`].
698 self.attach_inner(path.as_ref(), namespace.into())
699 }
700
701 /// Shared-reference (`&self`) form of [`Self::attach`]. Attaches
702 /// the database at `path` under `namespace` through `&self`, for
703 /// callers that hold a shared handle (e.g. an `Arc<Db>`) and
704 /// cannot obtain `&mut self`.
705 ///
706 /// Behaviour is identical to [`Self::attach`]: the attachment
707 /// registry is interior-mutable, guarded by the same per-`Db`
708 /// mutex, so concurrent `attach_shared` / `detach_shared` calls
709 /// from multiple threads serialise on that mutex. The duplicate-
710 /// namespace re-check after the read-only open closes the race
711 /// between two concurrent attaches of the same namespace.
712 ///
713 /// # Examples
714 ///
715 /// ```
716 /// # fn main() -> obj::Result<()> {
717 /// use std::sync::Arc;
718 /// use obj::Db;
719 ///
720 /// let dir = tempfile::tempdir()?;
721 /// let live = dir.path().join("live.obj");
722 /// let archive = dir.path().join("archive.obj");
723 /// let _ = Db::open(&archive)?;
724 ///
725 /// // A shared handle cannot call `&mut self` `attach`, but can
726 /// // call `attach_shared`.
727 /// let db = Arc::new(Db::open(&live)?);
728 /// db.attach_shared(&archive, "archive")?;
729 /// db.detach_shared("archive")?;
730 /// # Ok(())
731 /// # }
732 /// ```
733 ///
734 /// # Errors
735 ///
736 /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
737 /// use on this `Db`.
738 /// - [`Error::AttachmentNotReadable`] if `path` cannot be
739 /// opened read-only.
740 /// - [`Error::Busy`] if the registry mutex is poisoned.
741 pub fn attach_shared<P: AsRef<std::path::Path>>(
742 &self,
743 path: P,
744 namespace: impl Into<String>,
745 ) -> Result<()> {
746 self.attach_inner(path.as_ref(), namespace.into())
747 }
748
749 /// `&self` core shared by [`Self::attach`] and
750 /// [`Self::attach_shared`]. Both signatures are thin wrappers; the
751 /// mutation flows through the interior-mutable `attached` registry
752 /// regardless of the public method's receiver.
753 fn attach_inner(&self, path: &std::path::Path, namespace: String) -> Result<()> {
754 let path_buf = path.to_path_buf();
755 {
756 let registry = self.attached.lock().map_err(|_| Error::Busy {
757 kind: obj_core::LockKind::WriterInProcess,
758 })?;
759 if registry.contains_key(&namespace) {
760 return Err(Error::AttachmentAlreadyExists { namespace });
761 }
762 }
763 let attached_db =
764 Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
765 path: path_buf.clone(),
766 source: Box::new(source),
767 })?;
768 let env = Arc::clone(&attached_db.env);
769 let mut registry = self.attached.lock().map_err(|_| Error::Busy {
770 kind: obj_core::LockKind::WriterInProcess,
771 })?;
772 // Re-check after acquiring the lock — a sibling caller may
773 // have raced; the `Arc<Mutex<_>>` shape makes the check
774 // necessary on the second acquire.
775 if registry.contains_key(&namespace) {
776 return Err(Error::AttachmentAlreadyExists { namespace });
777 }
778 registry.insert(
779 namespace,
780 AttachedDb {
781 env,
782 _db: attached_db,
783 },
784 );
785 // #83 (c): publish the new length while still holding the
786 // `attached` mutex so `pin_attached_snapshots`' relaxed load is
787 // never stale relative to a committed attachment.
788 self.attached_len
789 .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
790 Ok(())
791 }
792
793 /// Remove the attachment registered under `namespace`. Returns
794 /// [`Error::CollectionNamespaceUnknown`] if the namespace is
795 /// not attached.
796 ///
797 /// In-flight read transactions hold their own snapshot pins on
798 /// the attached env; detach removes the registry entry, but the
799 /// in-flight read may still complete against its pinned
800 /// snapshot.
801 ///
802 /// # Errors
803 ///
804 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
805 /// not attached.
806 /// - [`Error::Busy`] if the registry mutex is poisoned.
807 pub fn detach(&mut self, namespace: &str) -> Result<()> {
808 // `&mut self` preserved for API stability; the registry is
809 // interior-mutable, so the work delegates to the `&self` core
810 // shared with [`Self::detach_shared`].
811 self.detach_inner(namespace)
812 }
813
814 /// Shared-reference (`&self`) form of [`Self::detach`]. Removes the
815 /// attachment registered under `namespace` through `&self`, for
816 /// callers that hold a shared handle (e.g. an `Arc<Db>`).
817 ///
818 /// Behaviour is identical to [`Self::detach`]. In-flight read
819 /// transactions hold their own snapshot pins on the attached env;
820 /// `detach_shared` removes the registry entry, but the in-flight
821 /// read may still complete against its pinned snapshot. Concurrent
822 /// `attach_shared` / `detach_shared` calls serialise on the same
823 /// per-`Db` registry mutex.
824 ///
825 /// # Errors
826 ///
827 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
828 /// not attached.
829 /// - [`Error::Busy`] if the registry mutex is poisoned.
830 pub fn detach_shared(&self, namespace: &str) -> Result<()> {
831 self.detach_inner(namespace)
832 }
833
834 /// `&self` core shared by [`Self::detach`] and
835 /// [`Self::detach_shared`].
836 fn detach_inner(&self, namespace: &str) -> Result<()> {
837 let mut registry = self.attached.lock().map_err(|_| Error::Busy {
838 kind: obj_core::LockKind::WriterInProcess,
839 })?;
840 if registry.remove(namespace).is_none() {
841 return Err(Error::CollectionNamespaceUnknown {
842 namespace: namespace.to_owned(),
843 });
844 }
845 // #83 (c): publish the post-detach length under the lock.
846 self.attached_len
847 .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
848 Ok(())
849 }
850
851 /// Insert `doc` into its collection. One-shot transaction;
852 /// returns the assigned [`Id`].
853 ///
854 /// The one-shot API opens, commits, and closes a private
855 /// transaction per call. Reach for [`Db::transaction`] when
856 /// several mutations must commit or roll back as a single
857 /// atomic unit.
858 ///
859 /// # Examples
860 ///
861 /// One-shot CRUD against a `Document`-derived type:
862 ///
863 /// ```
864 /// # fn main() -> obj::Result<()> {
865 /// use obj::Db;
866 /// use serde::{Deserialize, Serialize};
867 ///
868 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
869 /// struct Order {
870 /// customer_id: u64,
871 /// total_cents: u64,
872 /// status: String,
873 /// }
874 ///
875 /// let dir = tempfile::tempdir()?;
876 /// let db = Db::open(dir.path().join("oneshot.obj"))?;
877 ///
878 /// // insert returns the freshly-allocated Id.
879 /// let id = db.insert(Order {
880 /// customer_id: 1,
881 /// total_cents: 100,
882 /// status: "pending".to_owned(),
883 /// })?;
884 ///
885 /// // get returns Option<T>.
886 /// let _maybe: Option<Order> = db.get::<Order>(id)?;
887 ///
888 /// // update applies a closure in place.
889 /// db.update::<Order, _>(id, |o| {
890 /// o.status = "shipped".to_owned();
891 /// })?;
892 ///
893 /// // upsert at a caller-supplied id (insert or replace).
894 /// let id2 = obj::Id::try_new(42)
895 /// .ok_or(obj::Error::InvalidArgument("non-zero"))?;
896 /// db.upsert::<Order>(id2, Order {
897 /// customer_id: 2,
898 /// total_cents: 999,
899 /// status: "new".to_owned(),
900 /// })?;
901 ///
902 /// // delete returns true if the row existed.
903 /// let existed = db.delete::<Order>(id)?;
904 /// assert!(existed);
905 /// # Ok(())
906 /// # }
907 /// ```
908 ///
909 /// # Errors
910 ///
911 /// As [`Self::transaction`] plus any error from
912 /// [`crate::Collection::insert`].
913 pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
914 self.transaction(|tx| tx.collection::<T>()?.insert(doc))
915 }
916
917 /// Fetch the document at `id`. Returns `Ok(None)` if absent.
918 ///
919 /// # Errors
920 ///
921 /// As [`Self::read_transaction`] plus any error from
922 /// [`crate::Collection::get`].
923 pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
924 // #83 (b): the one-shot point read goes through the fused
925 // single-pager-lock path (descriptor resolve + primary get
926 // under one guard) instead of `tx.collection()?.get()`, which
927 // takes the pager mutex twice. Observably identical for the
928 // one-shot caller (same snapshot, same `CollectionNotFound`
929 // contract). Namespaced reads fall back to the handle path.
930 self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
931 }
932
933 /// Update the document at `id` via the closure.
934 ///
935 /// # Errors
936 ///
937 /// - [`Error::DocumentNotFound`] if `id` does not exist.
938 /// - As [`Self::transaction`].
939 pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
940 where
941 T: Document,
942 F: FnOnce(&mut T),
943 {
944 self.transaction(|tx| tx.collection::<T>()?.update(id, f))
945 }
946
947 /// Delete the document at `id`. Returns `true` if it existed.
948 ///
949 /// # Errors
950 ///
951 /// As [`Self::transaction`].
952 pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
953 self.transaction(|tx| tx.collection::<T>()?.delete(id))
954 }
955
956 /// Insert or replace the document at `id`.
957 ///
958 /// # Errors
959 ///
960 /// As [`Self::transaction`].
961 pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
962 self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
963 }
964
965 /// Convenience wrapper around [`crate::Collection::find_unique`]
966 /// — `db.find_unique::<Customer>("by_email", "ada@example.com")`.
967 /// Runs inside a one-shot read transaction.
968 ///
969 /// `O(log n)`, no collection scan; the lookup walks the named
970 /// index's B-tree directly. Defined only on `Unique` indexes —
971 /// for the other kinds use
972 /// [`Collection::lookup`](crate::Collection::lookup) or
973 /// [`Collection::index_range`](crate::Collection::index_range).
974 ///
975 /// # Examples
976 ///
977 /// ```
978 /// # fn main() -> obj::Result<()> {
979 /// use obj::Db;
980 /// use serde::{Deserialize, Serialize};
981 ///
982 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
983 /// #[obj(collection = "customers_find_unique_doc")]
984 /// struct Customer {
985 /// #[obj(index = unique)]
986 /// email: String,
987 /// }
988 ///
989 /// let dir = tempfile::tempdir()?;
990 /// let db = Db::open(dir.path().join("find-unique.obj"))?;
991 /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
992 /// let by_email: Option<Customer> = db
993 /// .find_unique::<Customer>("email", "ada@example.com")?;
994 /// assert!(by_email.is_some());
995 /// # Ok(())
996 /// # }
997 /// ```
998 ///
999 /// # Errors
1000 ///
1001 /// As [`crate::Collection::find_unique`].
1002 pub fn find_unique<T: Document>(
1003 &self,
1004 index_name: &str,
1005 key: impl Into<obj_core::codec::Dynamic>,
1006 ) -> Result<Option<T>> {
1007 self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
1008 }
1009
1010 /// Construct a fresh M8 [`crate::Query`] builder rooted at this
1011 /// database. The builder borrows `&self` for the build phase;
1012 /// the borrow ends when [`crate::Query::fetch`] returns.
1013 ///
1014 /// Compose with [`Query::filter`](crate::Query::filter),
1015 /// [`Query::limit`](crate::Query::limit),
1016 /// [`Query::sort_by`](crate::Query::sort_by),
1017 /// [`Query::index_range`](crate::Query::index_range). Terminate
1018 /// with [`Query::fetch`](crate::Query::fetch) (for the
1019 /// documents) or [`Query::count`](crate::Query::count) (for the
1020 /// count alone).
1021 ///
1022 /// Mirrors `design.md` § Querying — see the M8 examples in
1023 /// `crates/obj/tests/design_md_queries.rs` for the full surface.
1024 ///
1025 /// # Examples
1026 ///
1027 /// Top-N matching documents by an indexed field, ascending:
1028 ///
1029 /// ```
1030 /// # fn main() -> obj::Result<()> {
1031 /// use obj::Db;
1032 /// use obj_core::codec::Dynamic;
1033 /// use serde::{Deserialize, Serialize};
1034 ///
1035 /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1036 /// enum OrderStatus { Pending, Shipped }
1037 ///
1038 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1039 /// #[obj(collection = "orders_query_doc")]
1040 /// struct Order {
1041 /// #[obj(index)]
1042 /// customer_id: u64,
1043 /// status: OrderStatus,
1044 /// #[obj(index)]
1045 /// placed_at: u64,
1046 /// }
1047 ///
1048 /// let dir = tempfile::tempdir()?;
1049 /// let db = Db::open(dir.path().join("queries.obj"))?;
1050 /// for i in 0..20u64 {
1051 /// let _ = db.insert(Order {
1052 /// customer_id: i % 3,
1053 /// status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
1054 /// placed_at: i * 1_000,
1055 /// })?;
1056 /// }
1057 ///
1058 /// let pending: Vec<Order> = db
1059 /// .query::<Order>()
1060 /// .filter(|o| o.status == OrderStatus::Pending)
1061 /// .sort_by(|o| Dynamic::U64(o.placed_at))
1062 /// .limit(5)
1063 /// .fetch()?;
1064 /// assert!(pending.len() <= 5);
1065 /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
1066 /// # Ok(())
1067 /// # }
1068 /// ```
1069 #[must_use]
1070 pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
1071 crate::Query::new(self)
1072 }
1073
1074 /// Open a read-only typed handle to the collection registered
1075 /// under the runtime `name`, instead of the type's compile-time
1076 /// `T::COLLECTION`.
1077 ///
1078 /// This unlocks the `design.md` § Portability example for
1079 /// attached databases: by passing a namespaced name like
1080 /// `"archive.orders"`, the returned [`crate::Collection`] reads
1081 /// from the database attached under the `"archive"` namespace
1082 /// — see [`Db::attach`].
1083 ///
1084 /// Construction is **infallible**: errors (missing collection,
1085 /// unknown namespace, busy lock) surface at the first method
1086 /// call on the handle, not at the call to `collection(name)`.
1087 /// Each read-only method on the returned handle opens a private
1088 /// [`Db::read_transaction`] and dispatches against the
1089 /// runtime-named collection's catalog row.
1090 ///
1091 /// # Read-only
1092 ///
1093 /// The returned handle rejects every mutating call —
1094 /// [`crate::Collection::insert`], `update`, `delete`, `upsert`
1095 /// all return [`Error::ReadOnly`]. To write into a non-default
1096 /// collection, override [`obj_core::Document::COLLECTION`] on
1097 /// the type itself (compile-time-bound) and use the regular
1098 /// [`Db::transaction`] / [`crate::WriteTxn::collection`] path.
1099 /// Phase 1B (M11 #94) intentionally limits the runtime accessor
1100 /// to reads — the write-through-runtime-name path requires
1101 /// engine plumbing that is deferred to a later milestone.
1102 ///
1103 /// # Example
1104 ///
1105 /// ```
1106 /// use obj::{Db, Document};
1107 /// use serde::{Deserialize, Serialize};
1108 /// use tempfile::tempdir;
1109 ///
1110 /// #[derive(Debug, Clone, Serialize, Deserialize)]
1111 /// struct Order { customer_id: u64, total_cents: u64 }
1112 ///
1113 /// impl Document for Order {
1114 /// const COLLECTION: &'static str = "orders";
1115 /// const VERSION: u32 = 1;
1116 /// }
1117 ///
1118 /// fn run() -> obj::Result<()> {
1119 /// let dir = tempdir()?;
1120 /// let archive_path = dir.path().join("archive.obj");
1121 /// // Populate the archive database first.
1122 /// {
1123 /// let archive_db = Db::open(&archive_path)?;
1124 /// archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
1125 /// }
1126 /// // Attach it under a namespace and read via the runtime
1127 /// // accessor — no need to re-declare `Order` with a
1128 /// // namespaced `COLLECTION`.
1129 /// let main_path = dir.path().join("main.obj");
1130 /// let mut db = Db::open(&main_path)?;
1131 /// db.attach(&archive_path, "archive")?;
1132 /// let archived: Vec<Order> = db
1133 /// .collection::<Order>("archive.orders")
1134 /// .all()?
1135 /// .into_iter()
1136 /// .map(|(_id, doc)| doc)
1137 /// .collect();
1138 /// assert_eq!(archived.len(), 1);
1139 /// Ok(())
1140 /// }
1141 /// # run().unwrap();
1142 /// ```
1143 #[must_use]
1144 pub fn collection<T: Document + Send + 'static>(
1145 &self,
1146 name: impl Into<String>,
1147 ) -> crate::Collection<'_, T> {
1148 crate::Collection::<T>::lazy(self, name.into())
1149 }
1150
1151 /// Convenience shim mirroring `design.md` —
1152 /// `for order in db.all::<Order>()? { ... }`. Returns an owned
1153 /// `Vec<T>` (materialised). One-line shim over [`Db::iter_all`]
1154 /// that drives the streaming iterator to exhaustion and
1155 /// collects; if the collection is large enough that peak
1156 /// memory matters, prefer [`Db::iter_all`] directly.
1157 ///
1158 /// Sorting requires materialisation — the comparator needs
1159 /// every key up front. Use [`Db::query`] +
1160 /// [`Query::sort_by`](crate::Query::sort_by) for the
1161 /// top-N-sorted workload; the iterator side has no streaming
1162 /// sorted shape.
1163 ///
1164 /// # Examples
1165 ///
1166 /// ```
1167 /// # fn main() -> obj::Result<()> {
1168 /// use obj::Db;
1169 /// use serde::{Deserialize, Serialize};
1170 ///
1171 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1172 /// struct Order { total_cents: u64 }
1173 ///
1174 /// let dir = tempfile::tempdir()?;
1175 /// let db = Db::open(dir.path().join("all.obj"))?;
1176 /// for i in 0..5u64 {
1177 /// let _ = db.insert(Order { total_cents: i * 10 })?;
1178 /// }
1179 /// let listed: Vec<Order> = db.all::<Order>()?;
1180 /// assert_eq!(listed.len(), 5);
1181 /// # Ok(())
1182 /// # }
1183 /// ```
1184 ///
1185 /// # Errors
1186 ///
1187 /// As [`Db::iter_all`].
1188 pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
1189 self.iter_all::<T>()?
1190 .map(|step| step.map(|(_id, doc)| doc))
1191 .collect()
1192 }
1193
1194 /// Write a self-contained `.obj` file at `dest` carrying this
1195 /// database's state at the LSN of an internally-taken reader
1196 /// snapshot.
1197 ///
1198 /// Hot backup — writers continue uninterrupted against the
1199 /// source. Post-snapshot writes are NOT in the destination.
1200 ///
1201 /// Algorithm (per `docs/format.md` § Hot backup):
1202 ///
1203 /// 1. Take a `ReaderSnapshot` against the source pager (pins
1204 /// `pinned_lsn`).
1205 /// 2. `OpenOptions::create_new(true)` on `dest`.
1206 /// 3. Copy main-file pages `0..page_count` to `dest`.
1207 /// 4. Overlay every frame in the snapshot's frozen WAL view
1208 /// onto `dest` at the frame's page-id offset.
1209 /// 5. If the snapshot carries a WAL-staged page-0 header,
1210 /// overlay it (so `dest`'s page-0 reflects the catalog
1211 /// root / freelist head / page count the snapshot would
1212 /// have observed).
1213 /// 6. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
1214 /// the header CRC32C.
1215 /// 7. `sync_data(SyncMode::Full)` on `dest`.
1216 /// 8. Drop the snapshot (releases the WAL pin).
1217 ///
1218 /// On any mid-backup error the destination file is removed
1219 /// best-effort so a half-written backup does not linger.
1220 ///
1221 /// # Examples
1222 ///
1223 /// ```
1224 /// # fn main() -> obj::Result<()> {
1225 /// use obj::Db;
1226 /// use serde::{Deserialize, Serialize};
1227 ///
1228 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1229 /// #[obj(collection = "notes_backup_doc")]
1230 /// struct Note { body: String }
1231 ///
1232 /// let dir = tempfile::tempdir()?;
1233 /// let src = dir.path().join("src.obj");
1234 /// let dst = dir.path().join("backup.obj");
1235 ///
1236 /// let db = Db::open(&src)?;
1237 /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
1238 /// db.backup_to(&dst)?;
1239 ///
1240 /// // The backup is itself a fully-formed obj file. Open it and read.
1241 /// let backup = Db::open(&dst)?;
1242 /// let listed: Vec<Note> = backup.all::<Note>()?;
1243 /// assert_eq!(listed.len(), 1);
1244 /// # Ok(())
1245 /// # }
1246 /// ```
1247 ///
1248 /// # Errors
1249 ///
1250 /// - [`Error::BackupDestinationExists`] if `dest` already
1251 /// exists.
1252 /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
1253 /// a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
1254 /// - [`Error::Io`] on syscall failure during the copy.
1255 pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1256 // #76: a hot backup reads the source MAIN FILE page-by-page
1257 // (`backup::copy_main_file`). The in-process pager `Mutex`
1258 // alone excludes only same-process writers; a writer in a
1259 // SEPARATE OS process can checkpoint pages into the main file
1260 // mid-copy and yield a torn backup. Hold the cross-process
1261 // `WRITER_LOCK` for the whole copy so no external writer can
1262 // mutate the main file under us.
1263 //
1264 // Lock-order invariant (MUST match `WriteTxn::begin`, see
1265 // `docs/concurrency.md`): in-process write-serialization FIRST,
1266 // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
1267 // (innermost, acquired and released within the critical
1268 // section). `obj_core::WriteTxn::begin` acquires the first two
1269 // in exactly that order, so routing through it inherits the
1270 // correct ordering and cannot invert against a concurrent
1271 // in-process `WriteTxn`. We make NO writes; the txn is rolled
1272 // back (a no-op restore of the unchanged header) once the copy
1273 // is done. On in-memory / `cross_process_lock = false` envs the
1274 // cross-process guard is absent (`lock_file = None`) and the
1275 // txn degrades to the in-process serialization guard only.
1276 let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1277 let result = self.run_backup_under_guard(dest);
1278 // Release both lock layers in the documented order regardless
1279 // of the copy's outcome. A rollback failure (poisoned pager
1280 // mutex) only surfaces when the copy itself succeeded.
1281 let unlock = guard.rollback();
1282 result.and(unlock)
1283 }
1284
1285 /// #76 helper: take the pager `Mutex` (innermost lock), pin a
1286 /// reader snapshot, and run the backup copy. Factored out of
1287 /// [`Self::backup_to`] so the cross-process lock guard's lifetime
1288 /// is unambiguous and the function stays within the power-of-ten
1289 /// 60-line budget (Rule 4).
1290 fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1291 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1292 kind: obj_core::LockKind::WriterInProcess,
1293 })?;
1294 let snapshot = pager.reader_snapshot()?;
1295 obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
1296 // Snapshot drops at end of scope; the pin is released and
1297 // any deferred checkpoint may proceed.
1298 drop(snapshot);
1299 drop(pager);
1300 Ok(())
1301 }
1302
1303 /// Fold committed WAL pages into the main `.obj` file and reset
1304 /// the WAL back to its 64-byte header.
1305 ///
1306 /// Wraps [`obj_core::pager::Pager::checkpoint`]. Acquires the
1307 /// pager lock the same way [`Self::backup_to`] does, then calls
1308 /// the pager checkpoint. After it returns, the committed records
1309 /// live in the main file rather than the `-wal` sidecar, and the
1310 /// WAL is truncated to header-only.
1311 ///
1312 /// # Deferred / no-op behavior
1313 ///
1314 /// - If a live MVCC reader has pinned an LSN below the end of the
1315 /// WAL, the checkpoint is **deferred** (a safe no-op) so the
1316 /// reader's frames are not reclaimed out from under it.
1317 /// - If there is nothing to fold (no committed WAL frames), the
1318 /// call is a harmless no-op.
1319 ///
1320 /// This is the reusable engine entry point behind the Python
1321 /// `Db.checkpoint()` binding and a future checkpoint-on-close
1322 /// path (issue #5).
1323 ///
1324 /// # Errors
1325 ///
1326 /// - [`Error::ReadOnly`] if the database was opened read-only.
1327 /// - [`Error::Busy`] if the pager lock is poisoned.
1328 /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
1329 /// (e.g. [`Error::Io`] on syscall failure).
1330 #[cfg_attr(
1331 feature = "tracing",
1332 tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
1333 )]
1334 pub fn checkpoint(&self) -> Result<()> {
1335 if self.readonly {
1336 return Err(Error::ReadOnly {
1337 operation: "checkpoint",
1338 });
1339 }
1340 // #76: checkpoint folds committed WAL frames into the main
1341 // file and rotates the WAL salt. Both touch the main file and
1342 // the on-disk header. Taken under only the in-process pager
1343 // `Mutex` (as before), a writer in a SEPARATE process could
1344 // run its own commit/checkpoint concurrently and corrupt the
1345 // salt-rotation handshake or interleave main-file writes. Hold
1346 // the cross-process `WRITER_LOCK` for the duration, in the
1347 // documented lock order (in-process serialization → cross-
1348 // process writer lock → pager mutex; see `WriteTxn::begin` and
1349 // `Self::backup_to`). We make no user writes; the surrounding
1350 // txn is rolled back afterwards. The rollback's
1351 // `restore_header_snapshot` only rewrites `root_catalog` /
1352 // `freelist_head` / `page_count` (unchanged by checkpoint) and
1353 // preserves the freshly-rotated `wal_salt`, so it is a no-op
1354 // with respect to the checkpoint's durable effects.
1355 let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1356 let result = self.run_checkpoint_under_guard();
1357 let unlock = guard.rollback();
1358 result.and(unlock)
1359 }
1360
1361 /// #76 helper: take the pager `Mutex` (innermost lock) and run the
1362 /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
1363 /// cross-process lock guard's lifetime is unambiguous.
1364 fn run_checkpoint_under_guard(&self) -> Result<()> {
1365 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1366 kind: obj_core::LockKind::WriterInProcess,
1367 })?;
1368 pager.checkpoint()?;
1369 drop(pager);
1370 Ok(())
1371 }
1372
1373 /// Streaming iterator over every `(Id, T)` pair in the
1374 /// collection. The returned [`IterAll`] holds a read transaction
1375 /// — and therefore a pinned reader snapshot — for its entire
1376 /// lifetime; the borrow on `self` ends when the iterator is
1377 /// dropped.
1378 ///
1379 /// Each `next` call yields `Result<(Id, T)>`. Per-doc decode
1380 /// errors surface as `Some(Err(_))` rather than ending the
1381 /// iteration; the caller decides whether to propagate or
1382 /// continue.
1383 ///
1384 /// Peak memory does NOT scale with collection size — the
1385 /// iterator's internal buffer is fixed at
1386 /// `ITER_ALL_BATCH = 256` entries (~128 KiB at the design.md
1387 /// ~512 byte/doc estimate). Power-of-ten Rule 3.
1388 ///
1389 /// `Query::fetch` is sort-compatible (sort requires
1390 /// materialisation); `iter_all` is NOT — there is no streaming
1391 /// shape for sort because the comparator needs every key
1392 /// up front. Use `Query::sort_by` + `fetch` for the sorted
1393 /// workload; use `iter_all` for the unsorted large-scan
1394 /// workload.
1395 ///
1396 /// # Examples
1397 ///
1398 /// Streaming a small collection and folding into a running sum.
1399 /// The iterator's peak memory stays bounded regardless of how
1400 /// many documents the collection holds:
1401 ///
1402 /// ```
1403 /// # fn main() -> obj::Result<()> {
1404 /// use obj::Db;
1405 /// use serde::{Deserialize, Serialize};
1406 ///
1407 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1408 /// struct Order { total_cents: u64 }
1409 ///
1410 /// let dir = tempfile::tempdir()?;
1411 /// let db = Db::open(dir.path().join("iter.obj"))?;
1412 /// for i in 0..5u64 {
1413 /// let _ = db.insert(Order { total_cents: i * 10 })?;
1414 /// }
1415 ///
1416 /// let mut total: u64 = 0;
1417 /// for step in db.iter_all::<Order>()? {
1418 /// let (_id, doc) = step?;
1419 /// total = total
1420 /// .checked_add(doc.total_cents)
1421 /// .ok_or(obj::Error::InvalidArgument("overflow"))?;
1422 /// }
1423 /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
1424 /// # Ok(())
1425 /// # }
1426 /// ```
1427 ///
1428 /// # Errors
1429 ///
1430 /// - As [`Db::read_transaction`] (construction-time).
1431 /// - [`Error::CollectionNotFound`] if the collection is not yet
1432 /// registered at the snapshot's pinned LSN.
1433 /// - Per-step iteration may yield `Some(Err(_))` for pager,
1434 /// B-tree, or codec failures.
1435 pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
1436 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
1437 let txn = ReadTxn::new(inner);
1438 // Resolve the descriptor up front so an absent collection
1439 // surfaces at construction (matching `Db::all`'s pre-M8
1440 // contract). The handle's lifetime is bound to `txn`; we
1441 // drop the handle immediately and stash the descriptor.
1442 let descriptor = {
1443 let coll = txn.collection::<T>()?;
1444 coll.descriptor().clone()
1445 };
1446 Ok(IterAll {
1447 txn,
1448 descriptor,
1449 buffer: VecDeque::new(),
1450 last_emitted_key: None,
1451 finished: false,
1452 _phantom: PhantomData,
1453 })
1454 }
1455}
1456
/// Streaming iterator returned by [`Db::iter_all`].
///
/// Holds a [`ReadTxn`] for its lifetime so every yielded document
/// is consistent with the snapshot pinned at construction. Yields
/// `Result<(Id, T)>` one entry at a time; refills its internal
/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
/// pager-lock acquisition, so peak memory stays bounded at a small
/// constant regardless of the collection's size — power-of-ten
/// Rule 3.
///
/// Construction errors surface at the [`Db::iter_all`] call site.
/// Per-entry errors (a failed B-tree step, a key that is not an
/// `Id`, a codec failure) surface as `Some(Err(_))` during iteration
/// and do NOT terminate it — the caller decides whether to continue.
/// A failure of a whole refill (poisoned pager lock, B-tree open /
/// range failure) is also yielded as `Some(Err(_))`, but only once:
/// the iterator is then marked finished and returns `None` afterwards.
// Note: `IterAll<T>` is deliberately outside the `serde` surface
// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
// neither of which is serializable in a meaningful way. Users who
// need an off-line representation of iteration output should
// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
// resulting `Vec<T>` themselves.
pub struct IterAll<'db, T> {
    /// Owns the snapshot pin for the iterator's lifetime.
    txn: ReadTxn<'db>,
    /// Cached primary-tree root + collection id (`Document::decode`
    /// needs the collection id to validate the per-doc header).
    descriptor: CollectionDescriptor,
    /// Pre-decoded buffer of upcoming entries, drained front-first by
    /// `next`. Holds `Err` entries as well as decoded documents.
    buffer: VecDeque<Result<(Id, T)>>,
    /// Resumption marker — `Excluded(last_emitted_key)` is the
    /// start bound of the next refill. `None` until the first refill
    /// has recorded a key, in which case the scan starts unbounded.
    last_emitted_key: Option<Vec<u8>>,
    /// `true` once the underlying B-tree iterator returned no more
    /// entries (or a refill failed); subsequent `next` calls return
    /// `None` after the buffer is drained.
    finished: bool,
    /// Track the `T` parameter without owning a value.
    _phantom: PhantomData<fn() -> T>,
}
1495
1496impl<T> std::fmt::Debug for IterAll<'_, T> {
1497 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1498 f.debug_struct("IterAll")
1499 .field("collection_id", &self.descriptor.collection_id)
1500 .field("buffer_len", &self.buffer.len())
1501 .field("finished", &self.finished)
1502 .finish_non_exhaustive()
1503 }
1504}
1505
1506impl<T: Document + Send + 'static> IterAll<'_, T> {
1507 /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
1508 /// entries, resuming from `last_emitted_key`. Stores per-doc
1509 /// decode errors as `Err` in the buffer so the caller can
1510 /// observe them via `next()` without aborting iteration.
1511 ///
1512 /// Power-of-ten Rule 7: every fallible operation is either
1513 /// captured into the buffer or returned up-front via `?` (which
1514 /// propagates the lock-acquisition / B-tree-open failures
1515 /// before the buffer is touched).
1516 fn refill(&mut self) -> Result<()> {
1517 // Clone the `Arc<Mutex<Pager>>` so the lock-acquisition does
1518 // NOT keep an immutable borrow of `self.txn` alive across
1519 // the buffer-mutation calls below.
1520 let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
1521 let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
1522 kind: obj_core::LockKind::WriterInProcess,
1523 })?;
1524 let root_pid = PageId::new(self.descriptor.primary_root)
1525 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1526 let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1527 let start = match &self.last_emitted_key {
1528 Some(k) => Bound::Excluded(k.clone()),
1529 None => Bound::Unbounded,
1530 };
1531 let collection_id = self.descriptor.collection_id;
1532 let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
1533 let mut yielded: usize = 0;
1534 let mut last_key: Option<Vec<u8>> = None;
1535 let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
1536 for step in iter {
1537 if yielded >= ITER_ALL_BATCH {
1538 break;
1539 }
1540 yielded = yielded
1541 .checked_add(1)
1542 .ok_or(Error::BTreeInvariantViolated {
1543 reason: "iter_all batch counter overflow",
1544 })?;
1545 buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
1546 }
1547 if yielded < ITER_ALL_BATCH {
1548 self.finished = true;
1549 }
1550 // The lock is held until `pager` drops at function end — but
1551 // by the time we mutate `self.buffer` / `self.last_emitted_key`
1552 // below, the `iter` (which borrowed `pager`) is already
1553 // dropped (its scope ended). Move the staged batch over.
1554 drop(pager);
1555 self.buffer.extend(batch);
1556 if let Some(k) = last_key {
1557 self.last_emitted_key = Some(k);
1558 }
1559 Ok(())
1560 }
1561}
1562
1563/// Process one B-tree iterator step into the staged `batch`. Decode
1564/// errors and corruption errors are captured as `Err` entries so
1565/// they surface via `next()` rather than aborting the refill —
1566/// power-of-ten Rule 7.
1567///
1568/// Free function (rather than `&mut self`) so the caller can keep
1569/// the pager lock open across multiple buffer-pushes without the
1570/// borrow checker tripping on a second `&mut self.buffer` borrow.
1571fn buffer_one_entry<T: Document>(
1572 batch: &mut VecDeque<Result<(Id, T)>>,
1573 last_key: &mut Option<Vec<u8>>,
1574 collection_id: u32,
1575 step: Result<(Vec<u8>, Vec<u8>)>,
1576) {
1577 let (key, value) = match step {
1578 Ok(kv) => kv,
1579 Err(e) => {
1580 batch.push_back(Err(e));
1581 return;
1582 }
1583 };
1584 let Some(id) = Id::from_be_bytes(&key) else {
1585 batch.push_back(Err(Error::InvalidArgument(
1586 "primary B-tree key is not an Id",
1587 )));
1588 return;
1589 };
1590 *last_key = Some(key);
1591 let decoded = obj_core::codec::decode::<T>(&value, collection_id);
1592 batch.push_back(decoded.map(|doc| (id, doc)));
1593}
1594
1595impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
1596 type Item = Result<(Id, T)>;
1597
1598 fn next(&mut self) -> Option<Self::Item> {
1599 if let Some(item) = self.buffer.pop_front() {
1600 return Some(item);
1601 }
1602 if self.finished {
1603 return None;
1604 }
1605 if let Err(e) = self.refill() {
1606 // A lock / open failure surfaces here ONCE and then
1607 // `finished` is flipped so subsequent `next` calls
1608 // terminate cleanly. The error is surfaced to the
1609 // caller via `Some(Err(_))` — power-of-ten Rule 7.
1610 self.finished = true;
1611 return Some(Err(e));
1612 }
1613 self.buffer.pop_front()
1614 }
1615}
1616
/// Split a possibly-namespaced collection name into its
/// `(Some("ns"), "name")` parts. The split is on the FIRST `.`
/// only; downstream collection names may contain further dots.
/// A name without any `.` has no namespace and is returned whole.
///
/// `"users"` → `(None, "users")`.
/// `"archive.orders"` → `(Some("archive"), "orders")`.
/// `"archive.orders.legacy"` → `(Some("archive"), "orders.legacy")`.
///
/// No validation is performed: an empty part on either side of the
/// dot is returned as an empty string (`".orders"` →
/// `(Some(""), "orders")`).
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    // `split_once` cuts at the first delimiter and drops it, replacing
    // the manual `find` + byte-offset slicing.
    name.split_once('.')
        .map_or((None, name), |(namespace, rest)| (Some(namespace), rest))
}
1631
1632/// `Db` is `Send + Sync` so it composes with `Arc<Db>` for
1633/// concurrent reader / single-writer workloads. The thread-safety
1634/// is inherited from the underlying `Arc<TxnEnv>` + `Arc<Mutex<Catalog>>`.
1635const _: () = {
1636 fn assert_send_sync<T: Send + Sync>() {}
1637 let _ = assert_send_sync::<Db>;
1638};
1639
1640/// Translate the first failure in `report` (if any) into the
1641/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
1642/// open-time fast check so the caller sees `Err(Error::Corruption {
1643/// page_id })` rather than an opaque `IntegrityReport`. The page-id
1644/// in the returned error is the locus of the failure when one is
1645/// available; for failures whose locus is a non-page (e.g. an
1646/// `OrphanIndexEntry`, which `quick_check` does NOT emit), the
1647/// catalog root page-id is used as a stand-in.
1648fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
1649 let first = report.failures.first()?;
1650 let err = match first {
1651 obj_core::IntegrityFailure::ChecksumMismatch { page_id }
1652 | obj_core::IntegrityFailure::OrphanPage { page_id }
1653 | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
1654 | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
1655 | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
1656 | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
1657 | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
1658 Error::Corruption { page_id: *page_id }
1659 }
1660 obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
1661 Error::BTreeDepthExceeded { limit: *limit }
1662 }
1663 // The remaining variants — OrphanIndexEntry,
1664 // MissingIndexEntry, and any future `#[non_exhaustive]`
1665 // additions — are never emitted by `quick_check` (which
1666 // walks the catalog tree only, not the per-collection
1667 // primary or index trees). The defensive fallback is a
1668 // coarse `Corruption { page_id: 0 }` so the open-time
1669 // check still fails closed if the obj-core layer ever
1670 // grows a new fast-check failure mode.
1671 _ => Error::Corruption { page_id: 0 },
1672 };
1673 Some(err)
1674}