obj/db.rs
1//! `Db` — public entry point.
2//!
3//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
4//! described in `design.md` § API and pinned in `docs/format.md`
5//! § Db public API contract.
6
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::marker::PhantomData;
9use std::ops::Bound;
10use std::path::Path;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use obj_core::btree::BTree;
15use obj_core::pager::page::PageId;
16use obj_core::pager::{lock_path_for, Pager};
17use obj_core::platform::FileHandle;
18use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};
19
20use crate::config::Config;
21use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
22
/// Number of primary-tree entries [`IterAll`] pulls per refill.
///
/// Callers still receive one document per `next` call, but the
/// iterator reads the primary B-tree `ITER_ALL_BATCH` entries at a
/// time, so the pager-lock acquisition is paid once per batch rather
/// than once per document. Power-of-ten Rule 3: the buffer has a fixed
/// capacity (256 entries; at roughly 512 bytes per document that is
/// about 128 KiB at peak) and does NOT grow with the size of the
/// collection being iterated.
const ITER_ALL_BATCH: usize = 1 << 8;
31
/// The embedded document database.
///
/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
/// concurrent reader / single-writer access. See `design.md`
/// § "Concurrent readers, one writer".
///
/// At M6 the public `Db` is hard-typed against
/// `obj_core::platform::FileHandle`. A future refactor may make it
/// generic over `F: FileBackend` so fault-injection harnesses can build
/// on the same API; today the test helpers reach for the lower-level
/// `obj-core` building blocks instead.
// Rust drops struct fields in declaration order. NOTE(review): the
// teardown dependencies between `env`, `catalog` and the attached
// databases are not visible from this file — confirm them before
// reordering any field.
pub struct Db {
    /// Shared transaction environment: the pager plus the optional
    /// cross-process lock sidecar handle built at open time.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// In-memory catalog handle. `Db::transaction` re-opens it from the
    /// pager after a rolled-back write transaction.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// Copied from `Config::readonly` at open time. When set,
    /// `Db::transaction` fails with `Error::ReadOnly`.
    pub(crate) readonly: bool,
    /// Copied from `Config::busy_timeout` at open time and handed to the
    /// core read / write transaction `begin` calls.
    pub(crate) busy_timeout: Duration,
    /// Per-process cache of collection names whose
    /// `Document::indexes()` reconciliation has already run. M7
    /// #57: reconciliation is idempotent but a catalog walk +
    /// declaration cycle is non-trivial — caching ensures only the
    /// first `WriteTxn::collection::<T>()` call per process pays
    /// the cost.
    pub(crate) reconciled: Arc<Mutex<HashSet<String>>>,
    /// M11 #93: registry of attached read-only databases keyed by
    /// namespace. Entries are added by [`Db::attach`] and removed by
    /// [`Db::detach`]; [`Db::read_transaction`] reads the registry when
    /// it pins per-attached snapshots (it is also consulted by
    /// [`Db::collection_namespace`]). `attach` / `detach` take
    /// `&mut self`, but the read path only has `&self` on a `Db` that
    /// may be shared across threads, so the registry sits behind a
    /// mutex.
    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
    /// #83 (c): published length of `attached`, mutated only while the
    /// `attached` mutex is held. A relaxed load lets the hot read path
    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
    /// no databases are attached (the overwhelmingly common case).
    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
}
69
70impl std::fmt::Debug for Db {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("Db")
73 .field("readonly", &self.readonly)
74 .field("busy_timeout", &self.busy_timeout)
75 .finish_non_exhaustive()
76 }
77}
78
79// ---------- FFI plumbing ------------------------------------------
80//
81// The accessors below expose the shareable internals of `Db` so the
82// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
83// iterator handles without re-implementing the lifecycle. They are
84// marked `#[doc(hidden)]` because user-Rust code should reach for
85// the typed `Db::transaction` / `Db::read_transaction` API; the
86// raw accessors exist so a sibling FFI / native-host crate can
87// build its own self-contained handle types.
88impl Db {
89 /// Shared environment Arc — pager + cross-process lock file.
90 ///
91 /// Used by [`libobj`](../../libobj/index.html) to build owned
92 /// transaction handles whose lifetime extends past a single
93 /// `Db::transaction` closure call.
94 #[doc(hidden)]
95 #[must_use]
96 pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
97 Arc::clone(&self.env)
98 }
99
100 /// Shared catalog Arc.
101 ///
102 /// Used by [`libobj`](../../libobj/index.html) for the same
103 /// reason as [`Self::env_arc`].
104 #[doc(hidden)]
105 #[must_use]
106 pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
107 Arc::clone(&self.catalog)
108 }
109
110 /// Shared per-process reconciliation cache Arc.
111 ///
112 /// Used by [`libobj`](../../libobj/index.html) for the same
113 /// reason as [`Self::env_arc`].
114 #[doc(hidden)]
115 #[must_use]
116 pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<String>>> {
117 Arc::clone(&self.reconciled)
118 }
119
120 /// Busy-lock timeout configured at open time.
121 #[doc(hidden)]
122 #[must_use]
123 pub fn busy_timeout(&self) -> Duration {
124 self.busy_timeout
125 }
126}
127
128impl Db {
129 /// Open or create a file-backed database at `path` with default
130 /// configuration.
131 ///
132 /// Creates the file if absent; reopens otherwise. A `Db` is
133 /// `Send + Sync` — share across threads via `Arc<Db>` for the
134 /// concurrent-reader / single-writer workload documented in
135 /// `docs/concurrency.md`.
136 ///
137 /// For an ephemeral database use [`Db::memory`]; for a
138 /// read-only handle that coexists with another process's writer
139 /// use [`Db::open_readonly`]; for custom durability /
140 /// cache / lock knobs use [`Db::open_with`] + [`Config`].
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// # fn main() -> obj::Result<()> {
146 /// use obj::Db;
147 ///
148 /// let dir = tempfile::tempdir()?;
149 ///
150 /// // File-backed. Creates the file if absent; reopens otherwise.
151 /// let _db = Db::open(dir.path().join("app.obj"))?;
152 ///
153 /// // In-memory. No persistence, no file locks. Useful for tests.
154 /// let _mem = Db::memory()?;
155 ///
156 /// // Read-only. Coexists safely with a writer in another process.
157 /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
158 /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// # Errors
164 ///
165 /// Returns the underlying [`Error`] from
166 /// [`obj_core::pager::Pager::open`] on syscall or format
167 /// failure.
168 #[cfg_attr(
169 feature = "tracing",
170 tracing::instrument(
171 name = "db.open",
172 level = "info",
173 skip_all,
174 fields(path = %path.as_ref().display()),
175 )
176 )]
177 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
178 Self::open_with(path, Config::default())
179 }
180
181 /// Open or create a file-backed database with `config`.
182 ///
183 /// # Errors
184 ///
185 /// As [`Db::open`].
186 // Takes `Config` by value: this is the frozen builder-handoff
187 // signature (the caller's `Config::default().sync_mode(..)...`
188 // chain ends here). `Config` is no longer `Copy` (issue #17), so
189 // clippy now sees the by-value argument as only borrowed in the
190 // body — but the by-value convention is part of the 1.0 surface
191 // and must not change. Power-of-ten Rule 10: documented allow.
192 #[allow(clippy::needless_pass_by_value)]
193 #[cfg_attr(
194 feature = "tracing",
195 tracing::instrument(
196 name = "db.open",
197 level = "info",
198 skip_all,
199 fields(path = %path.as_ref().display()),
200 )
201 )]
202 pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
203 let path_buf = path.as_ref().to_path_buf();
204 // Issue #31: the pager `Config` is no longer `Copy` (its
205 // `encryption_key` zeroizes on drop), so move it out of
206 // `config` rather than copying it — `from_parts` only reads
207 // the remaining scalar fields. `mem::take` leaves a default
208 // (keyless) pager `Config` behind, which is never read again.
209 let pager_config = std::mem::take(&mut config.pager);
210 let pager = Pager::open(&path_buf, pager_config)?;
211 // Issue #1: cross-process locks anchor against a dedicated
212 // `<db>.obj-lock` sidecar file. Keeping the lock byte out
213 // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
214 // Windows once the DB grows past whatever offset the
215 // lock byte would otherwise sit at (`LockFileEx` is
216 // mandatory). The sidecar is left in place across opens;
217 // deleting it would race two openers into two different
218 // inodes and miss each other's exclusion.
219 let lock_file = if config.cross_process_lock {
220 let lock_path = lock_path_for(&path_buf);
221 let handle = FileHandle::open_or_create(&lock_path)?;
222 // Materialise the locked byte range as real file
223 // content. POSIX OFD and Windows `LockFileEx` both
224 // permit locking past EOF, but a non-empty sidecar
225 // is the most conservative choice across kernels
226 // (and lets the same offsets work on every platform).
227 // 128 bytes covers WRITER_LOCK (96) and the full
228 // READER_LOCK_RANGE (97..128).
229 handle.set_len(128)?;
230 Some(Arc::new(handle))
231 } else {
232 None
233 };
234 Self::from_parts(pager, lock_file, &config)
235 }
236
237 /// Open a fresh in-memory database. No persistence, no file
238 /// locks. Useful for unit tests and ephemeral workloads.
239 ///
240 /// # Errors
241 ///
242 /// Returns [`Error::InvalidArgument`] only if `config` has
243 /// zero cache frames.
244 pub fn memory() -> Result<Self> {
245 Self::memory_with(Config::default())
246 }
247
248 /// As [`Db::memory`] with a caller-supplied [`Config`].
249 ///
250 /// # Errors
251 ///
252 /// As [`Db::memory`].
253 // By-value `Config` is the frozen builder-handoff signature; see
254 // the note on [`Db::open_with`]. Power-of-ten Rule 10: documented
255 // allow (issue #17 dropped `Copy`, which made the lint fire).
256 #[allow(clippy::needless_pass_by_value)]
257 pub fn memory_with(mut config: Config) -> Result<Self> {
258 // Issue #31: move the (no-longer-`Copy`) pager `Config` out
259 // rather than copying it; `from_parts` only reads the
260 // remaining scalar fields. See `open_with` for the rationale.
261 let pager_config = std::mem::take(&mut config.pager);
262 let pager = Pager::memory(pager_config)?;
263 Self::from_parts(pager, None, &config)
264 }
265
266 /// Open the database at `path` in read-only mode. The
267 /// resulting `Db` rejects every mutating operation with
268 /// `Err(Error::ReadOnly { ... })`. Coexists safely with a
269 /// writer in another process via the cross-process reader
270 /// lock.
271 ///
272 /// # Errors
273 ///
274 /// As [`Db::open`].
275 pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
276 let config = Config {
277 readonly: true,
278 ..Config::default()
279 };
280 Self::open_with(path, config)
281 }
282
283 fn from_parts(
284 mut pager: Pager<FileHandle>,
285 lock_file: Option<Arc<FileHandle>>,
286 config: &Config,
287 ) -> Result<Self> {
288 // #64: catalog init (and its `tree.insert` → `alloc_page` /
289 // `set_root_catalog` chain) mutates the file header. Header
290 // mutations now ride the WAL via `stage_or_write_header`,
291 // which requires an open Pager txn so the staged page-0 frame
292 // commits atomically with the B-tree pages it depends on.
293 // Wrap the init call in `begin_txn` / `commit` / `end_txn` so
294 // a `Db::open` against a fresh file is durable on return
295 // (matching the pre-#64 contract). `open_or_init` on an
296 // already-initialised database is read-only and the wrapped
297 // `commit()` is a no-op (`Pager::commit` short-circuits on an
298 // empty pending set + clean `header_dirty`).
299 pager.begin_txn();
300 let init = Catalog::open_or_init(&mut pager);
301 let catalog = match init {
302 Ok(c) => {
303 let commit_result = pager.commit();
304 pager.end_txn();
305 commit_result?;
306 c
307 }
308 Err(e) => {
309 pager.end_txn();
310 return Err(e);
311 }
312 };
313 // M11 #91: lightweight open-time integrity check. Runs the
314 // catalog-portion of `integrity_check` and surfaces any
315 // detected failure as the strongest available error so the
316 // caller decides whether to attempt repair or abort. Opt out
317 // via `Config::skip_open_check(true)`.
318 if !config.skip_open_check {
319 let report = obj_core::integrity::quick_check(&mut pager)?;
320 if let Some(err) = first_failure_as_error(&report) {
321 return Err(err);
322 }
323 }
324 Ok(Self {
325 env: Arc::new(TxnEnv::new(pager, lock_file)),
326 catalog: Arc::new(Mutex::new(catalog)),
327 readonly: config.readonly,
328 busy_timeout: config.busy_timeout,
329 reconciled: Arc::new(Mutex::new(HashSet::new())),
330 attached: Arc::new(Mutex::new(HashMap::new())),
331 attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
332 })
333 }
334
335 /// Run a closure inside a write transaction.
336 ///
337 /// Begins a [`WriteTxn`], runs the closure with `&mut tx`. If
338 /// the closure returns `Ok(r)`, the transaction is committed and
339 /// `Ok(r)` is returned. If the closure returns `Err(e)`, the
340 /// transaction is rolled back and `Err(e)` is returned. A
341 /// panic inside the closure unwinds with an implicit rollback
342 /// via the `WriteTxn` `Drop` impl. See `docs/concurrency.md`
343 /// for the lock-acquisition contract.
344 ///
345 /// # Examples
346 ///
347 /// Atomic batch — both inserts commit together or not at all:
348 ///
349 /// ```
350 /// # fn main() -> obj::Result<()> {
351 /// use obj::Db;
352 /// use serde::{Deserialize, Serialize};
353 ///
354 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
355 /// struct Order {
356 /// customer_id: u64,
357 /// total_cents: u64,
358 /// }
359 ///
360 /// let dir = tempfile::tempdir()?;
361 /// let db = Db::open(dir.path().join("txn.obj"))?;
362 ///
363 /// let (a, b) = db.transaction(|tx| {
364 /// let coll = tx.collection::<Order>()?;
365 /// let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
366 /// let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
367 /// Ok((a, b))
368 /// })?;
369 /// assert_ne!(a, b, "freshly-allocated ids are distinct");
370 /// # Ok(())
371 /// # }
372 /// ```
373 ///
374 /// Returning `Err(_)` rolls every staged write back; the `Err`
375 /// the closure returns is the `Err` the caller sees:
376 ///
377 /// ```
378 /// # fn main() -> obj::Result<()> {
379 /// use obj::{Db, Error};
380 /// use serde::{Deserialize, Serialize};
381 ///
382 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
383 /// struct Order { total_cents: u64 }
384 ///
385 /// let dir = tempfile::tempdir()?;
386 /// let db = Db::open(dir.path().join("rollback.obj"))?;
387 /// let id = db.insert(Order { total_cents: 10 })?;
388 ///
389 /// let outcome: obj::Result<()> = db.transaction(|tx| {
390 /// let coll = tx.collection::<Order>()?;
391 /// coll.update(id, |o| { o.total_cents = 99_999; })?;
392 /// Err(Error::InvalidArgument("synthetic abort"))
393 /// });
394 /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
395 ///
396 /// let after: Order = db
397 /// .get::<Order>(id)?
398 /// .ok_or(Error::InvalidArgument("just inserted"))?;
399 /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
400 /// # Ok(())
401 /// # }
402 /// ```
403 ///
404 /// # Errors
405 ///
406 /// - [`Error::ReadOnly`] if the database was opened read-only.
407 /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
408 /// - Any error the closure returns.
409 #[cfg_attr(
410 feature = "tracing",
411 tracing::instrument(name = "db.transaction", level = "info", skip_all)
412 )]
413 pub fn transaction<R, F>(&self, body: F) -> Result<R>
414 where
415 F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
416 {
417 if self.readonly {
418 return Err(Error::ReadOnly {
419 operation: "transaction",
420 });
421 }
422 #[cfg(feature = "tracing")]
423 tracing::debug!("begin");
424 let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
425 let mut tx = WriteTxn::new(
426 inner,
427 Arc::clone(&self.catalog),
428 Arc::clone(&self.reconciled),
429 );
430 match body(&mut tx) {
431 Ok(value) => {
432 tx.commit()?;
433 #[cfg(feature = "tracing")]
434 tracing::debug!("commit");
435 Ok(value)
436 }
437 Err(e) => {
438 // Best-effort rollback; surface the closure's
439 // error. Refresh the in-memory catalog after
440 // rollback so its B-tree root state matches the
441 // file's `root_catalog` header field — catalog
442 // mutations write the header directly (not through
443 // the WAL) so a rollback leaves the header pointing
444 // at the most-recent in-memory root, which is
445 // exactly what re-opening the catalog will pick up.
446 let _ = tx.rollback();
447 let _ = self.refresh_catalog();
448 #[cfg(feature = "tracing")]
449 tracing::debug!("rollback");
450 Err(e)
451 }
452 }
453 }
454
455 /// Re-open the in-memory `Catalog` handle from the pager. Used
456 /// after a transaction rollback to discard the catalog's in-
457 /// memory `next_collection_id` / `tree.root` state that may
458 /// have advanced during the rolled-back closure.
459 fn refresh_catalog(&self) -> Result<()> {
460 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
461 kind: obj_core::LockKind::WriterInProcess,
462 })?;
463 let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
464 let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
465 kind: obj_core::LockKind::WriterInProcess,
466 })?;
467 *existing = fresh;
468 Ok(())
469 }
470
471 /// Run a closure inside a read transaction. See
472 /// [`Self::transaction`] for the closure shape and atomicity
473 /// contract; reads inside `body` observe a consistent
474 /// snapshot of the database.
475 ///
476 /// Every read inside the closure observes a single consistent
477 /// snapshot — the snapshot is pinned at the moment the closure
478 /// begins. Concurrent writers do not affect what the closure
479 /// sees.
480 ///
481 /// # Examples
482 ///
483 /// Two reads inside one `read_transaction` see the same value
484 /// even if a writer commits in between:
485 ///
486 /// ```
487 /// # fn main() -> obj::Result<()> {
488 /// use obj::Db;
489 /// use serde::{Deserialize, Serialize};
490 ///
491 /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
492 /// struct Order { total_cents: u64 }
493 ///
494 /// let dir = tempfile::tempdir()?;
495 /// let db = Db::open(dir.path().join("read.obj"))?;
496 /// let id = db.insert(Order { total_cents: 10 })?;
497 ///
498 /// let (a, b) = db.read_transaction(|tx| {
499 /// let coll = tx.collection::<Order>()?;
500 /// let a = coll.get(id)?;
501 /// let b = coll.get(id)?;
502 /// Ok((a, b))
503 /// })?;
504 /// assert_eq!(a, b);
505 /// # Ok(())
506 /// # }
507 /// ```
508 ///
509 /// # Errors
510 ///
511 /// - [`Error::Busy`] if the reader lock could not be acquired.
512 /// - Any error the closure returns.
513 #[cfg_attr(
514 feature = "tracing",
515 tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
516 )]
517 pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
518 where
519 F: FnOnce(&ReadTxn<'_>) -> Result<R>,
520 {
521 #[cfg(feature = "tracing")]
522 tracing::debug!("begin");
523 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
524 // M11 #93: pin one snapshot per attached database so reads
525 // through the closure observe a consistent view per file.
526 let attached_contexts = self.pin_attached_snapshots()?;
527 let tx = ReadTxn::with_attached(inner, attached_contexts);
528 let result = body(&tx);
529 #[cfg(feature = "tracing")]
530 tracing::debug!("commit");
531 result
532 }
533
534 /// Snapshot every attached database into a per-namespace
535 /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
536 /// pin; dropping the returned map releases every pin.
537 fn pin_attached_snapshots(
538 &self,
539 ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
540 // #83 (c): the common case is no attached databases. A single
541 // relaxed load of `attached_len` lets the empty path skip the
542 // `self.attached` mutex acquire entirely (`with_capacity(0)`
543 // already does not allocate, so the mutex is the only cost).
544 //
545 // Correctness under concurrent detach: the map produced here
546 // feeds only namespaced read dispatch in `open_readonly_named`
547 // (`<ns>.<tail>` → attached snapshot). With zero attachments no
548 // namespace can resolve, so an empty map is observably the same
549 // as the locked build. `attached_len` is mutated only while the
550 // `attached` mutex is held (in `attach` / `detach`), so a `0`
551 // observed here means "no attachment is committed to the map",
552 // matching what a lock-and-read would have seen at that instant.
553 // A concurrent attach that has not yet published its count is
554 // exactly an attach ordered *after* this txn began — invisible
555 // by design, same as the cross-process snapshot semantics.
556 if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
557 return Ok(std::collections::HashMap::new());
558 }
559 let registry = self.attached.lock().map_err(|_| Error::Busy {
560 kind: obj_core::LockKind::WriterInProcess,
561 })?;
562 let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
563 std::collections::HashMap::with_capacity(registry.len());
564 for (namespace, attached) in registry.iter() {
565 let env = Arc::clone(&attached.env);
566 let snapshot = {
567 let mut pager = env.pager().lock().map_err(|_| Error::Busy {
568 kind: obj_core::LockKind::WriterInProcess,
569 })?;
570 pager.reader_snapshot()?
571 };
572 out.insert(
573 namespace.clone(),
574 crate::txn::AttachedReadCtx { env, snapshot },
575 );
576 }
577 Ok(out)
578 }
579
580 /// Attach the database at `path` under `namespace`. The
581 /// attached file is opened read-only; collections in the
582 /// attached file become visible to subsequent
583 /// `Db::collection::<T>()` calls (and the one-shot per-op API
584 /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
585 /// form `<namespace>.<collection_name>`.
586 ///
587 /// Writes against namespaced collections return
588 /// [`Error::AttachedDatabaseIsReadOnly`].
589 ///
590 /// Each attached database gets its own snapshot pinned at
591 /// read-transaction begin; [`Db::detach`] removes the registry
592 /// entry but in-flight reads complete against their pinned
593 /// snapshot.
594 ///
595 /// # Examples
596 ///
597 /// ```
598 /// # fn main() -> obj::Result<()> {
599 /// use obj::Db;
600 /// use serde::{Deserialize, Serialize};
601 ///
602 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
603 /// #[obj(collection = "orders_attach_doc")]
604 /// struct Order { total_cents: u64 }
605 ///
606 /// // Same struct shape, namespaced collection name. Reads
607 /// // against this type route to the attached "archive" db.
608 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
609 /// #[obj(collection = "archive.orders_attach_doc")]
610 /// struct ArchivedOrder { total_cents: u64 }
611 ///
612 /// let dir = tempfile::tempdir()?;
613 /// let live = dir.path().join("live.obj");
614 /// let archive = dir.path().join("archive.obj");
615 ///
616 /// // Seed the archive (writes go through its own un-namespaced type).
617 /// {
618 /// let archive_db = Db::open(&archive)?;
619 /// let _ = archive_db.insert(Order { total_cents: 999 })?;
620 /// }
621 ///
622 /// // Open the live db, attach the archive under "archive".
623 /// let mut db = Db::open(&live)?;
624 /// let _ = db.insert(Order { total_cents: 100 })?;
625 /// db.attach(&archive, "archive")?;
626 ///
627 /// // One read transaction, two collections.
628 /// db.read_transaction(|tx| {
629 /// let live = tx.collection::<Order>()?;
630 /// let arch = tx.collection::<ArchivedOrder>()?;
631 /// assert_eq!(live.all()?.len(), 1);
632 /// assert_eq!(arch.all()?.len(), 1);
633 /// Ok(())
634 /// })?;
635 ///
636 /// db.detach("archive")?;
637 /// # Ok(())
638 /// # }
639 /// ```
640 ///
641 /// # Errors
642 ///
643 /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
644 /// use on this `Db`.
645 /// - [`Error::AttachmentNotReadable`] if `path` cannot be
646 /// opened read-only.
647 pub fn attach<P: AsRef<std::path::Path>>(
648 &mut self,
649 path: P,
650 namespace: impl Into<String>,
651 ) -> Result<()> {
652 let namespace = namespace.into();
653 let path_buf = path.as_ref().to_path_buf();
654 {
655 let registry = self.attached.lock().map_err(|_| Error::Busy {
656 kind: obj_core::LockKind::WriterInProcess,
657 })?;
658 if registry.contains_key(&namespace) {
659 return Err(Error::AttachmentAlreadyExists { namespace });
660 }
661 }
662 let attached_db =
663 Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
664 path: path_buf.clone(),
665 source: Box::new(source),
666 })?;
667 let env = Arc::clone(&attached_db.env);
668 let mut registry = self.attached.lock().map_err(|_| Error::Busy {
669 kind: obj_core::LockKind::WriterInProcess,
670 })?;
671 // Re-check after acquiring the lock — a sibling caller may
672 // have raced; the `Arc<Mutex<_>>` shape makes the check
673 // necessary on the second acquire.
674 if registry.contains_key(&namespace) {
675 return Err(Error::AttachmentAlreadyExists { namespace });
676 }
677 registry.insert(
678 namespace,
679 AttachedDb {
680 env,
681 _db: attached_db,
682 },
683 );
684 // #83 (c): publish the new length while still holding the
685 // `attached` mutex so `pin_attached_snapshots`' relaxed load is
686 // never stale relative to a committed attachment.
687 self.attached_len
688 .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
689 Ok(())
690 }
691
692 /// Remove the attachment registered under `namespace`. Returns
693 /// [`Error::CollectionNamespaceUnknown`] if the namespace is
694 /// not attached.
695 ///
696 /// In-flight read transactions hold their own snapshot pins on
697 /// the attached env; detach removes the registry entry, but the
698 /// in-flight read may still complete against its pinned
699 /// snapshot.
700 ///
701 /// # Errors
702 ///
703 /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is
704 /// not attached.
705 /// - [`Error::Busy`] if the registry mutex is poisoned.
706 pub fn detach(&mut self, namespace: &str) -> Result<()> {
707 let mut registry = self.attached.lock().map_err(|_| Error::Busy {
708 kind: obj_core::LockKind::WriterInProcess,
709 })?;
710 if registry.remove(namespace).is_none() {
711 return Err(Error::CollectionNamespaceUnknown {
712 namespace: namespace.to_owned(),
713 });
714 }
715 // #83 (c): publish the post-detach length under the lock.
716 self.attached_len
717 .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
718 Ok(())
719 }
720
721 /// Insert `doc` into its collection. One-shot transaction;
722 /// returns the assigned [`Id`].
723 ///
724 /// The one-shot API opens, commits, and closes a private
725 /// transaction per call. Reach for [`Db::transaction`] when
726 /// several mutations must commit or roll back as a single
727 /// atomic unit.
728 ///
729 /// # Examples
730 ///
731 /// One-shot CRUD against a `Document`-derived type:
732 ///
733 /// ```
734 /// # fn main() -> obj::Result<()> {
735 /// use obj::Db;
736 /// use serde::{Deserialize, Serialize};
737 ///
738 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
739 /// struct Order {
740 /// customer_id: u64,
741 /// total_cents: u64,
742 /// status: String,
743 /// }
744 ///
745 /// let dir = tempfile::tempdir()?;
746 /// let db = Db::open(dir.path().join("oneshot.obj"))?;
747 ///
748 /// // insert returns the freshly-allocated Id.
749 /// let id = db.insert(Order {
750 /// customer_id: 1,
751 /// total_cents: 100,
752 /// status: "pending".to_owned(),
753 /// })?;
754 ///
755 /// // get returns Option<T>.
756 /// let _maybe: Option<Order> = db.get::<Order>(id)?;
757 ///
758 /// // update applies a closure in place.
759 /// db.update::<Order, _>(id, |o| {
760 /// o.status = "shipped".to_owned();
761 /// })?;
762 ///
763 /// // upsert at a caller-supplied id (insert or replace).
764 /// let id2 = obj::Id::try_new(42)
765 /// .ok_or(obj::Error::InvalidArgument("non-zero"))?;
766 /// db.upsert::<Order>(id2, Order {
767 /// customer_id: 2,
768 /// total_cents: 999,
769 /// status: "new".to_owned(),
770 /// })?;
771 ///
772 /// // delete returns true if the row existed.
773 /// let existed = db.delete::<Order>(id)?;
774 /// assert!(existed);
775 /// # Ok(())
776 /// # }
777 /// ```
778 ///
779 /// # Errors
780 ///
781 /// As [`Self::transaction`] plus any error from
782 /// [`crate::Collection::insert`].
783 pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
784 self.transaction(|tx| tx.collection::<T>()?.insert(doc))
785 }
786
787 /// Fetch the document at `id`. Returns `Ok(None)` if absent.
788 ///
789 /// # Errors
790 ///
791 /// As [`Self::read_transaction`] plus any error from
792 /// [`crate::Collection::get`].
793 pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
794 // #83 (b): the one-shot point read goes through the fused
795 // single-pager-lock path (descriptor resolve + primary get
796 // under one guard) instead of `tx.collection()?.get()`, which
797 // takes the pager mutex twice. Observably identical for the
798 // one-shot caller (same snapshot, same `CollectionNotFound`
799 // contract). Namespaced reads fall back to the handle path.
800 self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
801 }
802
803 /// Update the document at `id` via the closure.
804 ///
805 /// # Errors
806 ///
807 /// - [`Error::DocumentNotFound`] if `id` does not exist.
808 /// - As [`Self::transaction`].
809 pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
810 where
811 T: Document,
812 F: FnOnce(&mut T),
813 {
814 self.transaction(|tx| tx.collection::<T>()?.update(id, f))
815 }
816
817 /// Delete the document at `id`. Returns `true` if it existed.
818 ///
819 /// # Errors
820 ///
821 /// As [`Self::transaction`].
822 pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
823 self.transaction(|tx| tx.collection::<T>()?.delete(id))
824 }
825
826 /// Insert or replace the document at `id`.
827 ///
828 /// # Errors
829 ///
830 /// As [`Self::transaction`].
831 pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
832 self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
833 }
834
835 /// Convenience wrapper around [`crate::Collection::find_unique`]
836 /// — `db.find_unique::<Customer>("by_email", "ada@example.com")`.
837 /// Runs inside a one-shot read transaction.
838 ///
839 /// `O(log n)`, no collection scan; the lookup walks the named
840 /// index's B-tree directly. Defined only on `Unique` indexes —
841 /// for the other kinds use
842 /// [`Collection::lookup`](crate::Collection::lookup) or
843 /// [`Collection::index_range`](crate::Collection::index_range).
844 ///
845 /// # Examples
846 ///
847 /// ```
848 /// # fn main() -> obj::Result<()> {
849 /// use obj::Db;
850 /// use serde::{Deserialize, Serialize};
851 ///
852 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
853 /// #[obj(collection = "customers_find_unique_doc")]
854 /// struct Customer {
855 /// #[obj(index = unique)]
856 /// email: String,
857 /// }
858 ///
859 /// let dir = tempfile::tempdir()?;
860 /// let db = Db::open(dir.path().join("find-unique.obj"))?;
861 /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
862 /// let by_email: Option<Customer> = db
863 /// .find_unique::<Customer>("email", "ada@example.com")?;
864 /// assert!(by_email.is_some());
865 /// # Ok(())
866 /// # }
867 /// ```
868 ///
869 /// # Errors
870 ///
871 /// As [`crate::Collection::find_unique`].
872 pub fn find_unique<T: Document>(
873 &self,
874 index_name: &str,
875 key: impl Into<obj_core::codec::Dynamic>,
876 ) -> Result<Option<T>> {
877 self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
878 }
879
880 /// Construct a fresh M8 [`crate::Query`] builder rooted at this
881 /// database. The builder borrows `&self` for the build phase;
882 /// the borrow ends when [`crate::Query::fetch`] returns.
883 ///
884 /// Compose with [`Query::filter`](crate::Query::filter),
885 /// [`Query::limit`](crate::Query::limit),
886 /// [`Query::sort_by`](crate::Query::sort_by),
887 /// [`Query::index_range`](crate::Query::index_range). Terminate
888 /// with [`Query::fetch`](crate::Query::fetch) (for the
889 /// documents) or [`Query::count`](crate::Query::count) (for the
890 /// count alone).
891 ///
892 /// Mirrors `design.md` § Querying — see the M8 examples in
893 /// `crates/obj/tests/design_md_queries.rs` for the full surface.
894 ///
895 /// # Examples
896 ///
897 /// Top-N matching documents by an indexed field, ascending:
898 ///
899 /// ```
900 /// # fn main() -> obj::Result<()> {
901 /// use obj::Db;
902 /// use obj_core::codec::Dynamic;
903 /// use serde::{Deserialize, Serialize};
904 ///
905 /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
906 /// enum OrderStatus { Pending, Shipped }
907 ///
908 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
909 /// #[obj(collection = "orders_query_doc")]
910 /// struct Order {
911 /// #[obj(index)]
912 /// customer_id: u64,
913 /// status: OrderStatus,
914 /// #[obj(index)]
915 /// placed_at: u64,
916 /// }
917 ///
918 /// let dir = tempfile::tempdir()?;
919 /// let db = Db::open(dir.path().join("queries.obj"))?;
920 /// for i in 0..20u64 {
921 /// let _ = db.insert(Order {
922 /// customer_id: i % 3,
923 /// status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
924 /// placed_at: i * 1_000,
925 /// })?;
926 /// }
927 ///
928 /// let pending: Vec<Order> = db
929 /// .query::<Order>()
930 /// .filter(|o| o.status == OrderStatus::Pending)
931 /// .sort_by(|o| Dynamic::U64(o.placed_at))
932 /// .limit(5)
933 /// .fetch()?;
934 /// assert!(pending.len() <= 5);
935 /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
936 /// # Ok(())
937 /// # }
938 /// ```
939 #[must_use]
940 pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
941 crate::Query::new(self)
942 }
943
944 /// Open a read-only typed handle to the collection registered
945 /// under the runtime `name`, instead of the type's compile-time
946 /// `T::COLLECTION`.
947 ///
948 /// This unlocks the `design.md` § Portability example for
949 /// attached databases: by passing a namespaced name like
950 /// `"archive.orders"`, the returned [`crate::Collection`] reads
951 /// from the database attached under the `"archive"` namespace
952 /// — see [`Db::attach`].
953 ///
954 /// Construction is **infallible**: errors (missing collection,
955 /// unknown namespace, busy lock) surface at the first method
956 /// call on the handle, not at the call to `collection(name)`.
957 /// Each read-only method on the returned handle opens a private
958 /// [`Db::read_transaction`] and dispatches against the
959 /// runtime-named collection's catalog row.
960 ///
961 /// # Read-only
962 ///
963 /// The returned handle rejects every mutating call —
964 /// [`crate::Collection::insert`], `update`, `delete`, `upsert`
965 /// all return [`Error::ReadOnly`]. To write into a non-default
966 /// collection, override [`obj_core::Document::COLLECTION`] on
967 /// the type itself (compile-time-bound) and use the regular
968 /// [`Db::transaction`] / [`crate::WriteTxn::collection`] path.
969 /// Phase 1B (M11 #94) intentionally limits the runtime accessor
970 /// to reads — the write-through-runtime-name path requires
971 /// engine plumbing that is deferred to a later milestone.
972 ///
973 /// # Example
974 ///
975 /// ```
976 /// use obj::{Db, Document};
977 /// use serde::{Deserialize, Serialize};
978 /// use tempfile::tempdir;
979 ///
980 /// #[derive(Debug, Clone, Serialize, Deserialize)]
981 /// struct Order { customer_id: u64, total_cents: u64 }
982 ///
983 /// impl Document for Order {
984 /// const COLLECTION: &'static str = "orders";
985 /// const VERSION: u32 = 1;
986 /// }
987 ///
988 /// fn run() -> obj::Result<()> {
989 /// let dir = tempdir()?;
990 /// let archive_path = dir.path().join("archive.obj");
991 /// // Populate the archive database first.
992 /// {
993 /// let archive_db = Db::open(&archive_path)?;
994 /// archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
995 /// }
996 /// // Attach it under a namespace and read via the runtime
997 /// // accessor — no need to re-declare `Order` with a
998 /// // namespaced `COLLECTION`.
999 /// let main_path = dir.path().join("main.obj");
1000 /// let mut db = Db::open(&main_path)?;
1001 /// db.attach(&archive_path, "archive")?;
1002 /// let archived: Vec<Order> = db
1003 /// .collection::<Order>("archive.orders")
1004 /// .all()?
1005 /// .into_iter()
1006 /// .map(|(_id, doc)| doc)
1007 /// .collect();
1008 /// assert_eq!(archived.len(), 1);
1009 /// Ok(())
1010 /// }
1011 /// # run().unwrap();
1012 /// ```
1013 #[must_use]
1014 pub fn collection<T: Document + Send + 'static>(
1015 &self,
1016 name: impl Into<String>,
1017 ) -> crate::Collection<'_, T> {
1018 crate::Collection::<T>::lazy(self, name.into())
1019 }
1020
1021 /// Convenience shim mirroring `design.md` —
1022 /// `for order in db.all::<Order>()? { ... }`. Returns an owned
1023 /// `Vec<T>` (materialised). One-line shim over [`Db::iter_all`]
1024 /// that drives the streaming iterator to exhaustion and
1025 /// collects; if the collection is large enough that peak
1026 /// memory matters, prefer [`Db::iter_all`] directly.
1027 ///
1028 /// Sorting requires materialisation — the comparator needs
1029 /// every key up front. Use [`Db::query`] +
1030 /// [`Query::sort_by`](crate::Query::sort_by) for the
1031 /// top-N-sorted workload; the iterator side has no streaming
1032 /// sorted shape.
1033 ///
1034 /// # Examples
1035 ///
1036 /// ```
1037 /// # fn main() -> obj::Result<()> {
1038 /// use obj::Db;
1039 /// use serde::{Deserialize, Serialize};
1040 ///
1041 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1042 /// struct Order { total_cents: u64 }
1043 ///
1044 /// let dir = tempfile::tempdir()?;
1045 /// let db = Db::open(dir.path().join("all.obj"))?;
1046 /// for i in 0..5u64 {
1047 /// let _ = db.insert(Order { total_cents: i * 10 })?;
1048 /// }
1049 /// let listed: Vec<Order> = db.all::<Order>()?;
1050 /// assert_eq!(listed.len(), 5);
1051 /// # Ok(())
1052 /// # }
1053 /// ```
1054 ///
1055 /// # Errors
1056 ///
1057 /// As [`Db::iter_all`].
1058 pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
1059 self.iter_all::<T>()?
1060 .map(|step| step.map(|(_id, doc)| doc))
1061 .collect()
1062 }
1063
1064 /// Write a self-contained `.obj` file at `dest` carrying this
1065 /// database's state at the LSN of an internally-taken reader
1066 /// snapshot.
1067 ///
1068 /// Hot backup — writers continue uninterrupted against the
1069 /// source. Post-snapshot writes are NOT in the destination.
1070 ///
1071 /// Algorithm (per `docs/format.md` § Hot backup):
1072 ///
1073 /// 1. Take a `ReaderSnapshot` against the source pager (pins
1074 /// `pinned_lsn`).
1075 /// 2. `OpenOptions::create_new(true)` on `dest`.
1076 /// 3. Copy main-file pages `0..page_count` to `dest`.
1077 /// 4. Overlay every frame in the snapshot's frozen WAL view
1078 /// onto `dest` at the frame's page-id offset.
1079 /// 5. If the snapshot carries a WAL-staged page-0 header,
1080 /// overlay it (so `dest`'s page-0 reflects the catalog
1081 /// root / freelist head / page count the snapshot would
1082 /// have observed).
1083 /// 6. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
1084 /// the header CRC32C.
1085 /// 7. `sync_data(SyncMode::Full)` on `dest`.
1086 /// 8. Drop the snapshot (releases the WAL pin).
1087 ///
1088 /// On any mid-backup error the destination file is removed
1089 /// best-effort so a half-written backup does not linger.
1090 ///
1091 /// # Examples
1092 ///
1093 /// ```
1094 /// # fn main() -> obj::Result<()> {
1095 /// use obj::Db;
1096 /// use serde::{Deserialize, Serialize};
1097 ///
1098 /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
1099 /// #[obj(collection = "notes_backup_doc")]
1100 /// struct Note { body: String }
1101 ///
1102 /// let dir = tempfile::tempdir()?;
1103 /// let src = dir.path().join("src.obj");
1104 /// let dst = dir.path().join("backup.obj");
1105 ///
1106 /// let db = Db::open(&src)?;
1107 /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
1108 /// db.backup_to(&dst)?;
1109 ///
1110 /// // The backup is itself a fully-formed obj file. Open it and read.
1111 /// let backup = Db::open(&dst)?;
1112 /// let listed: Vec<Note> = backup.all::<Note>()?;
1113 /// assert_eq!(listed.len(), 1);
1114 /// # Ok(())
1115 /// # }
1116 /// ```
1117 ///
1118 /// # Errors
1119 ///
1120 /// - [`Error::BackupDestinationExists`] if `dest` already
1121 /// exists.
1122 /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
1123 /// a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
1124 /// - [`Error::Io`] on syscall failure during the copy.
1125 pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1126 // #76: a hot backup reads the source MAIN FILE page-by-page
1127 // (`backup::copy_main_file`). The in-process pager `Mutex`
1128 // alone excludes only same-process writers; a writer in a
1129 // SEPARATE OS process can checkpoint pages into the main file
1130 // mid-copy and yield a torn backup. Hold the cross-process
1131 // `WRITER_LOCK` for the whole copy so no external writer can
1132 // mutate the main file under us.
1133 //
1134 // Lock-order invariant (MUST match `WriteTxn::begin`, see
1135 // `docs/concurrency.md`): in-process write-serialization FIRST,
1136 // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
1137 // (innermost, acquired and released within the critical
1138 // section). `obj_core::WriteTxn::begin` acquires the first two
1139 // in exactly that order, so routing through it inherits the
1140 // correct ordering and cannot invert against a concurrent
1141 // in-process `WriteTxn`. We make NO writes; the txn is rolled
1142 // back (a no-op restore of the unchanged header) once the copy
1143 // is done. On in-memory / `cross_process_lock = false` envs the
1144 // cross-process guard is absent (`lock_file = None`) and the
1145 // txn degrades to the in-process serialization guard only.
1146 let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1147 let result = self.run_backup_under_guard(dest);
1148 // Release both lock layers in the documented order regardless
1149 // of the copy's outcome. A rollback failure (poisoned pager
1150 // mutex) only surfaces when the copy itself succeeded.
1151 let unlock = guard.rollback();
1152 result.and(unlock)
1153 }
1154
1155 /// #76 helper: take the pager `Mutex` (innermost lock), pin a
1156 /// reader snapshot, and run the backup copy. Factored out of
1157 /// [`Self::backup_to`] so the cross-process lock guard's lifetime
1158 /// is unambiguous and the function stays within the power-of-ten
1159 /// 60-line budget (Rule 4).
1160 fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
1161 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1162 kind: obj_core::LockKind::WriterInProcess,
1163 })?;
1164 let snapshot = pager.reader_snapshot()?;
1165 obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
1166 // Snapshot drops at end of scope; the pin is released and
1167 // any deferred checkpoint may proceed.
1168 drop(snapshot);
1169 drop(pager);
1170 Ok(())
1171 }
1172
1173 /// Fold committed WAL pages into the main `.obj` file and reset
1174 /// the WAL back to its 64-byte header.
1175 ///
1176 /// Wraps [`obj_core::pager::Pager::checkpoint`]. Acquires the
1177 /// pager lock the same way [`Self::backup_to`] does, then calls
1178 /// the pager checkpoint. After it returns, the committed records
1179 /// live in the main file rather than the `-wal` sidecar, and the
1180 /// WAL is truncated to header-only.
1181 ///
1182 /// # Deferred / no-op behavior
1183 ///
1184 /// - If a live MVCC reader has pinned an LSN below the end of the
1185 /// WAL, the checkpoint is **deferred** (a safe no-op) so the
1186 /// reader's frames are not reclaimed out from under it.
1187 /// - If there is nothing to fold (no committed WAL frames), the
1188 /// call is a harmless no-op.
1189 ///
1190 /// This is the reusable engine entry point behind the Python
1191 /// `Db.checkpoint()` binding and a future checkpoint-on-close
1192 /// path (issue #5).
1193 ///
1194 /// # Errors
1195 ///
1196 /// - [`Error::ReadOnly`] if the database was opened read-only.
1197 /// - [`Error::Busy`] if the pager lock is poisoned.
1198 /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
1199 /// (e.g. [`Error::Io`] on syscall failure).
1200 #[cfg_attr(
1201 feature = "tracing",
1202 tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
1203 )]
1204 pub fn checkpoint(&self) -> Result<()> {
1205 if self.readonly {
1206 return Err(Error::ReadOnly {
1207 operation: "checkpoint",
1208 });
1209 }
1210 // #76: checkpoint folds committed WAL frames into the main
1211 // file and rotates the WAL salt. Both touch the main file and
1212 // the on-disk header. Taken under only the in-process pager
1213 // `Mutex` (as before), a writer in a SEPARATE process could
1214 // run its own commit/checkpoint concurrently and corrupt the
1215 // salt-rotation handshake or interleave main-file writes. Hold
1216 // the cross-process `WRITER_LOCK` for the duration, in the
1217 // documented lock order (in-process serialization → cross-
1218 // process writer lock → pager mutex; see `WriteTxn::begin` and
1219 // `Self::backup_to`). We make no user writes; the surrounding
1220 // txn is rolled back afterwards. The rollback's
1221 // `restore_header_snapshot` only rewrites `root_catalog` /
1222 // `freelist_head` / `page_count` (unchanged by checkpoint) and
1223 // preserves the freshly-rotated `wal_salt`, so it is a no-op
1224 // with respect to the checkpoint's durable effects.
1225 let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
1226 let result = self.run_checkpoint_under_guard();
1227 let unlock = guard.rollback();
1228 result.and(unlock)
1229 }
1230
1231 /// #76 helper: take the pager `Mutex` (innermost lock) and run the
1232 /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
1233 /// cross-process lock guard's lifetime is unambiguous.
1234 fn run_checkpoint_under_guard(&self) -> Result<()> {
1235 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
1236 kind: obj_core::LockKind::WriterInProcess,
1237 })?;
1238 pager.checkpoint()?;
1239 drop(pager);
1240 Ok(())
1241 }
1242
1243 /// Streaming iterator over every `(Id, T)` pair in the
1244 /// collection. The returned [`IterAll`] holds a read transaction
1245 /// — and therefore a pinned reader snapshot — for its entire
1246 /// lifetime; the borrow on `self` ends when the iterator is
1247 /// dropped.
1248 ///
1249 /// Each `next` call yields `Result<(Id, T)>`. Per-doc decode
1250 /// errors surface as `Some(Err(_))` rather than ending the
1251 /// iteration; the caller decides whether to propagate or
1252 /// continue.
1253 ///
1254 /// Peak memory does NOT scale with collection size — the
1255 /// iterator's internal buffer is fixed at
1256 /// `ITER_ALL_BATCH = 256` entries (~128 KiB at the design.md
1257 /// ~512 byte/doc estimate). Power-of-ten Rule 3.
1258 ///
1259 /// `Query::fetch` is sort-compatible (sort requires
1260 /// materialisation); `iter_all` is NOT — there is no streaming
1261 /// shape for sort because the comparator needs every key
1262 /// up front. Use `Query::sort_by` + `fetch` for the sorted
1263 /// workload; use `iter_all` for the unsorted large-scan
1264 /// workload.
1265 ///
1266 /// # Examples
1267 ///
1268 /// Streaming a small collection and folding into a running sum.
1269 /// The iterator's peak memory stays bounded regardless of how
1270 /// many documents the collection holds:
1271 ///
1272 /// ```
1273 /// # fn main() -> obj::Result<()> {
1274 /// use obj::Db;
1275 /// use serde::{Deserialize, Serialize};
1276 ///
1277 /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
1278 /// struct Order { total_cents: u64 }
1279 ///
1280 /// let dir = tempfile::tempdir()?;
1281 /// let db = Db::open(dir.path().join("iter.obj"))?;
1282 /// for i in 0..5u64 {
1283 /// let _ = db.insert(Order { total_cents: i * 10 })?;
1284 /// }
1285 ///
1286 /// let mut total: u64 = 0;
1287 /// for step in db.iter_all::<Order>()? {
1288 /// let (_id, doc) = step?;
1289 /// total = total
1290 /// .checked_add(doc.total_cents)
1291 /// .ok_or(obj::Error::InvalidArgument("overflow"))?;
1292 /// }
1293 /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
1294 /// # Ok(())
1295 /// # }
1296 /// ```
1297 ///
1298 /// # Errors
1299 ///
1300 /// - As [`Db::read_transaction`] (construction-time).
1301 /// - [`Error::CollectionNotFound`] if the collection is not yet
1302 /// registered at the snapshot's pinned LSN.
1303 /// - Per-step iteration may yield `Some(Err(_))` for pager,
1304 /// B-tree, or codec failures.
1305 pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
1306 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
1307 let txn = ReadTxn::new(inner);
1308 // Resolve the descriptor up front so an absent collection
1309 // surfaces at construction (matching `Db::all`'s pre-M8
1310 // contract). The handle's lifetime is bound to `txn`; we
1311 // drop the handle immediately and stash the descriptor.
1312 let descriptor = {
1313 let coll = txn.collection::<T>()?;
1314 coll.descriptor().clone()
1315 };
1316 Ok(IterAll {
1317 txn,
1318 descriptor,
1319 buffer: VecDeque::new(),
1320 last_emitted_key: None,
1321 finished: false,
1322 _phantom: PhantomData,
1323 })
1324 }
1325}
1326
1327/// Streaming iterator returned by [`Db::iter_all`].
1328///
1329/// Holds a [`ReadTxn`] for its lifetime so every yielded document
1330/// is consistent with the snapshot pinned at construction. Yields
1331/// `Result<(Id, T)>` one entry at a time; refills its internal
1332/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
1333/// pager-lock acquisition, so peak memory stays bounded at a small
1334/// constant regardless of the collection's size — power-of-ten
1335/// Rule 3.
1336///
1337/// Construction errors surface at the [`Db::iter_all`] call site;
1338/// per-step errors (pager, B-tree, codec) surface as
1339/// `Some(Err(_))` during iteration and do NOT terminate it — the
1340/// caller decides whether to continue.
1341// Note: `IterAll<T>` is deliberately outside the `serde` surface
1342// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
1343// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
1344// neither of which is serializable in a meaningful way. Users who
1345// need an off-line representation of iteration output should
1346// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
1347// resulting `Vec<T>` themselves.
1348pub struct IterAll<'db, T> {
1349 /// Owns the snapshot pin for the iterator's lifetime.
1350 txn: ReadTxn<'db>,
1351 /// Cached primary-tree root + collection id (`Document::decode`
1352 /// needs the collection id to validate the per-doc header).
1353 descriptor: CollectionDescriptor,
1354 /// Pre-decoded buffer of upcoming entries.
1355 buffer: VecDeque<Result<(Id, T)>>,
1356 /// Resumption marker — `Excluded(last_emitted_key)` is the
1357 /// start bound of the next refill.
1358 last_emitted_key: Option<Vec<u8>>,
1359 /// `true` once the underlying B-tree iterator returned no more
1360 /// entries; subsequent `next` calls return `None`.
1361 finished: bool,
1362 /// Track the `T` parameter without owning a value.
1363 _phantom: PhantomData<fn() -> T>,
1364}
1365
1366impl<T> std::fmt::Debug for IterAll<'_, T> {
1367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1368 f.debug_struct("IterAll")
1369 .field("collection_id", &self.descriptor.collection_id)
1370 .field("buffer_len", &self.buffer.len())
1371 .field("finished", &self.finished)
1372 .finish_non_exhaustive()
1373 }
1374}
1375
1376impl<T: Document + Send + 'static> IterAll<'_, T> {
1377 /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
1378 /// entries, resuming from `last_emitted_key`. Stores per-doc
1379 /// decode errors as `Err` in the buffer so the caller can
1380 /// observe them via `next()` without aborting iteration.
1381 ///
1382 /// Power-of-ten Rule 7: every fallible operation is either
1383 /// captured into the buffer or returned up-front via `?` (which
1384 /// propagates the lock-acquisition / B-tree-open failures
1385 /// before the buffer is touched).
1386 fn refill(&mut self) -> Result<()> {
1387 // Clone the `Arc<Mutex<Pager>>` so the lock-acquisition does
1388 // NOT keep an immutable borrow of `self.txn` alive across
1389 // the buffer-mutation calls below.
1390 let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
1391 let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
1392 kind: obj_core::LockKind::WriterInProcess,
1393 })?;
1394 let root_pid = PageId::new(self.descriptor.primary_root)
1395 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1396 let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1397 let start = match &self.last_emitted_key {
1398 Some(k) => Bound::Excluded(k.clone()),
1399 None => Bound::Unbounded,
1400 };
1401 let collection_id = self.descriptor.collection_id;
1402 let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
1403 let mut yielded: usize = 0;
1404 let mut last_key: Option<Vec<u8>> = None;
1405 let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
1406 for step in iter {
1407 if yielded >= ITER_ALL_BATCH {
1408 break;
1409 }
1410 yielded = yielded
1411 .checked_add(1)
1412 .ok_or(Error::BTreeInvariantViolated {
1413 reason: "iter_all batch counter overflow",
1414 })?;
1415 buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
1416 }
1417 if yielded < ITER_ALL_BATCH {
1418 self.finished = true;
1419 }
1420 // The lock is held until `pager` drops at function end — but
1421 // by the time we mutate `self.buffer` / `self.last_emitted_key`
1422 // below, the `iter` (which borrowed `pager`) is already
1423 // dropped (its scope ended). Move the staged batch over.
1424 drop(pager);
1425 self.buffer.extend(batch);
1426 if let Some(k) = last_key {
1427 self.last_emitted_key = Some(k);
1428 }
1429 Ok(())
1430 }
1431}
1432
1433/// Process one B-tree iterator step into the staged `batch`. Decode
1434/// errors and corruption errors are captured as `Err` entries so
1435/// they surface via `next()` rather than aborting the refill —
1436/// power-of-ten Rule 7.
1437///
1438/// Free function (rather than `&mut self`) so the caller can keep
1439/// the pager lock open across multiple buffer-pushes without the
1440/// borrow checker tripping on a second `&mut self.buffer` borrow.
1441fn buffer_one_entry<T: Document>(
1442 batch: &mut VecDeque<Result<(Id, T)>>,
1443 last_key: &mut Option<Vec<u8>>,
1444 collection_id: u32,
1445 step: Result<(Vec<u8>, Vec<u8>)>,
1446) {
1447 let (key, value) = match step {
1448 Ok(kv) => kv,
1449 Err(e) => {
1450 batch.push_back(Err(e));
1451 return;
1452 }
1453 };
1454 let Some(id) = Id::from_be_bytes(&key) else {
1455 batch.push_back(Err(Error::InvalidArgument(
1456 "primary B-tree key is not an Id",
1457 )));
1458 return;
1459 };
1460 *last_key = Some(key);
1461 let decoded = obj_core::codec::decode::<T>(&value, collection_id);
1462 batch.push_back(decoded.map(|doc| (id, doc)));
1463}
1464
1465impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
1466 type Item = Result<(Id, T)>;
1467
1468 fn next(&mut self) -> Option<Self::Item> {
1469 if let Some(item) = self.buffer.pop_front() {
1470 return Some(item);
1471 }
1472 if self.finished {
1473 return None;
1474 }
1475 if let Err(e) = self.refill() {
1476 // A lock / open failure surfaces here ONCE and then
1477 // `finished` is flipped so subsequent `next` calls
1478 // terminate cleanly. The error is surfaced to the
1479 // caller via `Some(Err(_))` — power-of-ten Rule 7.
1480 self.finished = true;
1481 return Some(Err(e));
1482 }
1483 self.buffer.pop_front()
1484 }
1485}
1486
/// Break a collection name into an optional namespace and the rest.
///
/// Only the FIRST `.` acts as the separator; anything after it,
/// further dots included, belongs to the collection name.
///
/// - `"users"` → `(None, "users")`
/// - `"archive.orders"` → `(Some("archive"), "orders")`
/// - `"archive.orders.legacy"` → `(Some("archive"), "orders.legacy")`
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    name.split_once('.')
        .map_or((None, name), |(namespace, rest)| (Some(namespace), rest))
}
1501
1502/// `Db` is `Send + Sync` so it composes with `Arc<Db>` for
1503/// concurrent reader / single-writer workloads. The thread-safety
1504/// is inherited from the underlying `Arc<TxnEnv>` + `Arc<Mutex<Catalog>>`.
1505const _: () = {
1506 fn assert_send_sync<T: Send + Sync>() {}
1507 let _ = assert_send_sync::<Db>;
1508};
1509
1510/// Translate the first failure in `report` (if any) into the
1511/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
1512/// open-time fast check so the caller sees `Err(Error::Corruption {
1513/// page_id })` rather than an opaque `IntegrityReport`. The page-id
1514/// in the returned error is the locus of the failure when one is
1515/// available; for failures whose locus is a non-page (e.g. an
1516/// `OrphanIndexEntry`, which `quick_check` does NOT emit), the
1517/// catalog root page-id is used as a stand-in.
1518fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
1519 let first = report.failures.first()?;
1520 let err = match first {
1521 obj_core::IntegrityFailure::ChecksumMismatch { page_id }
1522 | obj_core::IntegrityFailure::OrphanPage { page_id }
1523 | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
1524 | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
1525 | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
1526 | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
1527 | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
1528 Error::Corruption { page_id: *page_id }
1529 }
1530 obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
1531 Error::BTreeDepthExceeded { limit: *limit }
1532 }
1533 // The remaining variants — OrphanIndexEntry,
1534 // MissingIndexEntry, and any future `#[non_exhaustive]`
1535 // additions — are never emitted by `quick_check` (which
1536 // walks the catalog tree only, not the per-collection
1537 // primary or index trees). The defensive fallback is a
1538 // coarse `Corruption { page_id: 0 }` so the open-time
1539 // check still fails closed if the obj-core layer ever
1540 // grows a new fast-check failure mode.
1541 _ => Error::Corruption { page_id: 0 },
1542 };
1543 Some(err)
1544}