Skip to main content

obj/
txn.rs

1//! Public `WriteTxn` / `ReadTxn` types.
2//!
3//! Thin wrappers over `obj_core::txn::{WriteTxn, ReadTxn}` that
4//! attach a [`Catalog`] reference (the obj-core txn types are
5//! catalog-agnostic; the catalog is the obj crate's responsibility).
6//!
7//! The catalog handle is `Arc<Mutex<Catalog<FileHandle>>>`.  Lock
8//! ordering: **always acquire the pager mutex (via the txn env)
9//! BEFORE the catalog mutex**.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::{Arc, Mutex};
13
14use obj_core::btree::BTree;
15use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
16use obj_core::index::EncodedIndexKey;
17use obj_core::pager::checksum::crc32c;
18use obj_core::pager::page::PageId;
19use obj_core::pager::Pager;
20use obj_core::platform::FileHandle;
21use obj_core::{
22    Catalog, CollectionDescriptor, Document, Error, Id, IndexStatus, ReaderSnapshot, Result, TxnEnv,
23};
24
25use crate::collection::Collection;
26
/// Header `type_version` written for documents inserted through the C ABI
/// raw-bytes path.
///
/// A C caller has no Rust `Document::VERSION` to supply, so the value `1`
/// is used instead. It is easy to recognise in dump output, and a
/// Rust-typed reader that later opens the same collection still goes
/// through the usual schema-version-from-future / migration checks.
///
/// Keep this at `1` before 1.0: changing it breaks any consumer that has
/// already written raw-bytes data stamped with the previous value.
pub(crate) const RAW_BYTES_TYPE_VERSION: u32 = 1;
37
38/// Public write transaction.
39///
40/// Acquired by [`crate::Db::transaction`].  Holds the in-process
41/// write-serialization mutex + cross-process `WRITER_LOCK` for its
42/// entire lifetime.  `commit` / `rollback` consume `self`; dropping
43/// without explicitly committing rolls back automatically.
44pub struct WriteTxn<'db> {
45    pub(crate) inner: obj_core::WriteTxn<'db, FileHandle>,
46    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
47    /// Per-process cache of `(collection_name, version)` keys whose
48    /// `T::indexes()` reconciliation has already run. Reconciliation
49    /// is idempotent but expensive (a catalog walk + index
50    /// declarations); caching keeps the first
51    /// `WriteTxn::collection::<T>()` call per-process per
52    /// `(collection, version)` as the only one that pays the cost.
53    ///
54    /// #130 — the key includes the schema `version`: a later version of
55    /// the same collection that ADDS an index reconciles on its first
56    /// write rather than being skipped (the name-only key never let a
57    /// cross-version index addition become `Active`). See
58    /// [`crate::collection::reconcile_specs_once`] for the full
59    /// rationale and the removal-interleaving caveat.
60    ///
61    /// #93 — membership is promoted into this SHARED set ONLY after a
62    /// successful [`Self::commit`]. During a txn the keys live in the
63    /// per-txn [`Self::reconciled_staged`] set instead, so a
64    /// rolled-back first-ever txn never poisons this set into skipping
65    /// reconciliation on a later (committed) txn in the same process.
66    pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
67    /// #93 — per-transaction staged set of `(collection, version)` keys
68    /// whose `T::indexes()` reconciliation has run INSIDE this (not-yet-
69    /// committed) txn. The skip-check in
70    /// [`crate::collection::reconcile_indexes_once`] is `shared ∪
71    /// staged`, so a second handle of the same `(collection, version)`
72    /// in one txn still skips the (idempotent) catalog walk — but the
73    /// keys are only folded into the shared [`Self::reconciled`] set by
74    /// [`Self::commit`] AFTER the WAL commit succeeds. On rollback /
75    /// drop this set is discarded with NO shared-set mutation, so a
76    /// rolled-back lazy-create leaves the shared cache untouched and
77    /// the next txn re-reconciles correctly.
78    ///
79    /// Not behind a mutex: a `WriteTxn` is single-threaded (it holds
80    /// the write-serialization lock for its whole life and is borrowed
81    /// `&mut` by every write), so interior mutability via the staging
82    /// helpers is sufficient.
83    pub(crate) reconciled_staged: HashSet<(String, u32)>,
84    /// #90 — batch-aware catalog flush. Per-transaction cache of the
85    /// LIVE [`CollectionDescriptor`] for every collection touched by
86    /// a write, keyed by collection name. This is the SOLE mid-txn
87    /// source of truth for `next_id`, `primary_root`, and each
88    /// index's `root_page_id`: every write bumps / advances these
89    /// IN-MEMORY rather than rewriting the catalog B-tree per doc.
90    /// [`Self::commit`] flushes each entry back through
91    /// `Catalog::update` exactly ONCE (see
92    /// [`Self::flush_descriptors`]). On rollback / drop the cache is
93    /// discarded with no catalog-tree side effects.
94    ///
95    /// The `Arc<Mutex<_>>` is cloned into each [`Collection`]'s
96    /// `WriteRef` so two handles of the same collection opened in one
97    /// txn share the single entry.
98    pub(crate) descriptors: crate::collection::DescriptorCache,
99}
100
101impl<'db> WriteTxn<'db> {
102    /// Construct a `WriteTxn` directly from its three pieces.
103    /// Public so the FFI layer ([`libobj`](../../libobj/index.html))
104    /// can build an owned write txn whose lifetime extends past a
105    /// single `Db::transaction` closure call.
106    ///
107    /// User-Rust callers should reach for `Db::transaction` — the
108    /// closure shape handles commit / rollback / drop semantics
109    /// correctly without needing direct construction.
110    #[doc(hidden)]
111    #[must_use]
112    pub fn from_parts(
113        inner: obj_core::WriteTxn<'db, FileHandle>,
114        catalog: Arc<Mutex<Catalog<FileHandle>>>,
115        reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
116    ) -> Self {
117        Self {
118            inner,
119            catalog,
120            reconciled,
121            reconciled_staged: HashSet::new(),
122            descriptors: crate::collection::new_descriptor_cache(),
123        }
124    }
125
126    pub(crate) fn new(
127        inner: obj_core::WriteTxn<'db, FileHandle>,
128        catalog: Arc<Mutex<Catalog<FileHandle>>>,
129        reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
130    ) -> Self {
131        Self::from_parts(inner, catalog, reconciled)
132    }
133
134    /// Open a typed handle to the collection `T` lives in.
135    ///
136    /// Lazily creates the catalog row + an empty primary B-tree on
137    /// first call for a given `T` inside the current process.  The
138    /// catalog mutation is staged in the same WAL transaction as
139    /// the user's subsequent writes — a rolled-back txn leaves no
140    /// half-created collection.
141    ///
142    /// # Errors
143    ///
144    /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
145    /// - Any error the pager / B-tree / postcard codec returns.
146    pub fn collection<T: Document>(&mut self) -> Result<Collection<'_, T>> {
147        // M11 #93: namespaced collections live in attached
148        // read-only databases. Reject writes eagerly at handle-
149        // open time so the caller does not chase an opaque "no
150        // such collection" error later.
151        if let (Some(namespace), tail) = crate::db::split_namespace(T::COLLECTION) {
152            return Err(Error::AttachedDatabaseIsReadOnly {
153                namespace: namespace.to_owned(),
154                collection: tail.to_owned(),
155            });
156        }
157        Collection::open_or_create(self)
158    }
159
160    /// Commit the transaction.
161    ///
162    /// #90: flushes every cached [`CollectionDescriptor`] back to the
163    /// catalog (one `Catalog::update` per touched collection) BEFORE
164    /// the WAL commit, so the coalesced `next_id` / `primary_root` /
165    /// index-root advances land durably in the same transaction as the
166    /// document + index writes. A flush failure aborts the commit (the
167    /// `?` propagates and `self` drops, rolling the WAL back) rather
168    /// than committing a half-flushed catalog.
169    ///
170    /// #93: AFTER the WAL commit succeeds, fold this txn's staged
171    /// reconciled-collection names into the shared per-process
172    /// `reconciled` set, so the expensive `T::indexes()`
173    /// reconciliation is skipped for those collections on later txns.
174    /// Promotion is deliberately POST-commit: a rolled-back txn never
175    /// reaches here, so it cannot poison the shared cache into skipping
176    /// reconciliation against a catalog whose index rows it just rolled
177    /// back. A poisoned `reconciled` mutex maps to `Error::Busy` (Rule
178    /// 7) but does NOT un-commit the durable WAL state.
179    ///
180    /// # Errors
181    ///
182    /// As [`obj_core::WriteTxn::commit`], plus any catalog / pager /
183    /// postcard error surfaced by the descriptor flush, plus
184    /// [`Error::Busy`] if the shared `reconciled` mutex is poisoned at
185    /// promotion time (after the commit has already landed durably).
186    pub fn commit(self) -> Result<()> {
187        self.flush_descriptors()?;
188        // Move the pieces out before consuming `inner`: `commit`
189        // takes `self` by value, and `self.inner.commit()` moves the
190        // inner txn, so the shared/staged handles must be captured
191        // first.
192        let Self {
193            inner,
194            reconciled,
195            reconciled_staged,
196            ..
197        } = self;
198        inner.commit()?;
199        promote_reconciled(&reconciled, reconciled_staged)
200    }
201
202    /// #90: persist every cached descriptor back to the catalog
203    /// B-tree exactly once. Called by [`Self::commit`] before the WAL
204    /// commit. Iterates the per-txn descriptor cache (one entry per
205    /// touched collection — Rule 2 bound is the touched-collection
206    /// count) and issues one `Catalog::update` apiece, propagating the
207    /// first failure via `?` so a partial flush aborts the commit.
208    ///
209    /// Lock order matches every write path: pager BEFORE catalog (the
210    /// descriptor-cache lock is acquired and released first, so it is
211    /// never held across the pager/catalog locks).
212    fn flush_descriptors(&self) -> Result<()> {
213        let entries: Vec<(String, CollectionDescriptor)> = {
214            let cache = crate::collection::lock_descriptors(&self.descriptors)?;
215            if cache.is_empty() {
216                return Ok(());
217            }
218            cache
219                .iter()
220                .map(|(name, descriptor)| (name.clone(), descriptor.clone()))
221                .collect()
222        };
223        let mut pager = lock_pager(self.inner.env())?;
224        let mut catalog = lock_catalog(&self.catalog)?;
225        for (name, descriptor) in &entries {
226            catalog.update(&mut pager, name, descriptor)?;
227        }
228        Ok(())
229    }
230
231    /// Roll the transaction back.
232    ///
233    /// # Errors
234    ///
235    /// As [`obj_core::WriteTxn::rollback`].
236    pub fn rollback(self) -> Result<()> {
237        self.inner.rollback()
238    }
239
240    /// **FFI shim**: insert a raw-bytes document into `collection`,
241    /// returning the freshly-allocated [`Id`].
242    ///
243    /// The payload is stored as-is; the on-disk record carries the
244    /// standard 16-byte [`DocumentHeader`] with
245    /// `type_version = RAW_BYTES_TYPE_VERSION` and a CRC32C of the
246    /// payload. The collection is lazy-created in the same WAL
247    /// transaction if it does not already exist.
248    ///
249    /// **Index maintenance does NOT run** on the raw-bytes path —
250    /// the C ABI's caller has no schema introspection. Documents
251    /// inserted through this path are invisible to indexes built
252    /// by typed [`Document`] writers, until a Rust-side
253    /// `WriteTxn::collection::<T>()` rewrites them.
254    ///
255    /// Forwards to [`Self::insert_with_version`] with
256    /// `type_version = RAW_BYTES_TYPE_VERSION`. Callers that have
257    /// a meaningful schema version to stamp (e.g. obj-py's typed
258    /// path, which knows `cls.__obj_version__`) should call
259    /// [`Self::insert_with_version`] directly.
260    ///
261    /// # Errors
262    ///
263    /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
264    ///   collections (attached dbs are read-only).
265    /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
266    ///   B-tree inline cap.
267    /// - Pager / catalog errors propagated.
268    #[doc(hidden)]
269    pub fn insert_raw_bytes(&mut self, collection: &str, payload: &[u8]) -> Result<Id> {
270        self.insert_with_version(collection, payload, RAW_BYTES_TYPE_VERSION)
271    }
272
273    /// **Engine API**: insert a raw-bytes document into `collection`
274    /// with a caller-supplied `type_version` stamped in the per-doc
275    /// header. Returns the freshly-allocated [`Id`].
276    ///
277    /// This is the byte-identity entry point for cross-language
278    /// writers (obj-py's typed `Db.insert(instance)` calls this
279    /// with `cls.__obj_version__`, matching what Rust's
280    /// `#[derive(Document)]` stamps via [`obj_core::codec::encode`]).
281    /// Apart from the version source, the behaviour is identical to
282    /// [`Self::insert_raw_bytes`].
283    ///
284    /// **Index maintenance does NOT run** — same caveat as the
285    /// raw-bytes shim. Use [`Self::collection`] for typed writes
286    /// that need index maintenance.
287    ///
288    /// # Errors
289    ///
290    /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
291    ///   collections.
292    /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
293    ///   B-tree inline cap.
294    /// - Pager / catalog errors propagated.
295    #[doc(hidden)]
296    pub fn insert_with_version(
297        &mut self,
298        collection: &str,
299        payload: &[u8],
300        type_version: u32,
301    ) -> Result<Id> {
302        reject_namespaced_write(collection)?;
303        let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
304        let mut pager = lock_pager(self.inner.env())?;
305        let catalog = lock_catalog(&self.catalog)?;
306        // #90: the per-txn descriptor cache is the sole mid-txn source
307        // of truth — `next_id` is an in-memory bump, `primary_root`
308        // advances in the cache, and NO per-doc `Catalog::update`
309        // fires (the single flush is deferred to commit). This keeps
310        // the raw-bytes path coherent with the typed path if both
311        // touch the same collection in one transaction.
312        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
313        let descriptor =
314            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
315        let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || collection.to_owned())?;
316        let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
317        let key = id.to_be_bytes();
318        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
319        tree.insert(&mut pager, &key, &bytes)?;
320        descriptor.primary_root = tree.root().get();
321        Ok(id)
322    }
323
324    /// **FFI shim**: fetch the raw payload of the document at `id`
325    /// in `collection`. Returns `Ok(None)` if absent. The returned
326    /// `Vec<u8>` is the payload only (the 16-byte per-doc header
327    /// is stripped).
328    ///
329    /// Forwards to [`Self::get_with_version`] and discards the
330    /// stored version. Use [`Self::get_with_version`] directly when
331    /// the caller needs the header's `type_version` (e.g. obj-py's
332    /// typed read path, which dispatches schema migration on it).
333    ///
334    /// # Errors
335    ///
336    /// - [`Error::CollectionNotFound`] if the collection is unknown.
337    /// - [`Error::Corruption`] if the on-disk record is malformed.
338    /// - Pager / catalog errors propagated.
339    #[doc(hidden)]
340    pub fn get_raw_bytes(&mut self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
341        Ok(self
342            .get_with_version(collection, id)?
343            .map(|(payload, _version)| payload))
344    }
345
346    /// **Engine API**: fetch the raw payload AND stored
347    /// `type_version` of the document at `id` in `collection`.
348    /// Returns `Ok(None)` if absent.
349    ///
350    /// Companion read accessor for the version-aware write path
351    /// ([`Self::insert_with_version`]) — used by obj-py's typed
352    /// read pipeline to dispatch directly on the stored header
353    /// version instead of the historical try-decode-walk heuristic.
354    ///
355    /// # Errors
356    ///
357    /// As [`Self::get_raw_bytes`].
358    #[doc(hidden)]
359    pub fn get_with_version(&mut self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
360        // #90: a write-side raw get must descend the LIVE primary root
361        // — prefer the per-txn cache (which carries any uncommitted
362        // primary-root advance from a prior raw write in this txn) and
363        // fall back to the catalog tree if the collection has not been
364        // written in the txn.
365        let descriptor = self.live_descriptor_required(collection)?;
366        let mut pager = lock_pager(self.inner.env())?;
367        let tree = btree_handle(&pager, descriptor.primary_root)?;
368        let key = id.to_be_bytes();
369        match tree.get(&mut pager, &key)? {
370            Some(bytes) => Ok(Some(strip_raw_payload_with_version(
371                &bytes,
372                descriptor.collection_id,
373            )?)),
374            None => Ok(None),
375        }
376    }
377
378    /// #90: resolve the LIVE descriptor for `collection`, preferring
379    /// the per-txn cache (with its in-memory root advances) over the
380    /// catalog tree. Returns `CollectionNotFound` if neither has it.
381    /// Returns an owned clone so the caller does not hold the cache
382    /// lock across the subsequent pager work.
383    fn live_descriptor_required(&self, collection: &str) -> Result<CollectionDescriptor> {
384        {
385            let cache = crate::collection::lock_descriptors(&self.descriptors)?;
386            if let Some(descriptor) = cache.get(collection) {
387                return Ok(descriptor.clone());
388            }
389        }
390        catalog_get_required(&self.inner, &self.catalog, collection)
391    }
392
393    /// **FFI shim**: update the document at `id` in `collection`
394    /// to `payload` bytes. Returns [`Error::DocumentNotFound`] if
395    /// the id is absent.
396    ///
397    /// Forwards to [`Self::update_with_version`] with
398    /// `type_version = RAW_BYTES_TYPE_VERSION`.
399    ///
400    /// # Errors
401    ///
402    /// - [`Error::DocumentNotFound`] if `id` does not exist.
403    /// - [`Error::AttachedDatabaseIsReadOnly`] / [`Error::DocumentTooLarge`]
404    ///   etc as [`Self::insert_raw_bytes`].
405    #[doc(hidden)]
406    pub fn update_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
407        self.update_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
408    }
409
410    /// **Engine API**: update the document at `id` in `collection`
411    /// to `payload` bytes, stamping the per-doc header's
412    /// `type_version` with the caller-supplied value.
413    ///
414    /// Companion to [`Self::insert_with_version`] for the typed
415    /// write path.
416    ///
417    /// # Errors
418    ///
419    /// As [`Self::update_raw_bytes`].
420    #[doc(hidden)]
421    pub fn update_with_version(
422        &mut self,
423        collection: &str,
424        id: Id,
425        payload: &[u8],
426        type_version: u32,
427    ) -> Result<()> {
428        reject_namespaced_write(collection)?;
429        // #90: route the descriptor through the per-txn cache so the
430        // primary-root advance coalesces into the single commit flush.
431        let exists = self.collection_exists(collection)?;
432        if !exists {
433            return Err(Error::CollectionNotFound {
434                name: collection.to_owned(),
435            });
436        }
437        let mut pager = lock_pager(self.inner.env())?;
438        let catalog = lock_catalog(&self.catalog)?;
439        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
440        let descriptor =
441            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
442        let key = id.to_be_bytes();
443        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
444        if tree.get(&mut pager, &key)?.is_none() {
445            // The collection name is owned (`String`); the
446            // DocumentNotFound variant's `collection` is `&'static
447            // str` (designed for typed Document::COLLECTION). Use a
448            // bespoke variant when widening would land — for v0 the
449            // closest match is Corruption with a synthetic page_id,
450            // but that's misleading; use InvalidArgument with the
451            // standard "not found" semantics surfaced by the caller.
452            return Err(Error::CollectionNotFound {
453                name: format!("{collection}#{}", id.get()),
454            });
455        }
456        let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
457        tree.delete(&mut pager, &key)?;
458        tree.insert(&mut pager, &key, &bytes)?;
459        descriptor.primary_root = tree.root().get();
460        Ok(())
461    }
462
463    /// **FFI shim**: delete the document at `id` in `collection`.
464    /// Returns `Ok(true)` if it existed, `Ok(false)` if not.
465    ///
466    /// # Errors
467    ///
468    /// Pager / catalog errors propagated.
469    #[doc(hidden)]
470    pub fn delete_raw_bytes(&mut self, collection: &str, id: Id) -> Result<bool> {
471        reject_namespaced_write(collection)?;
472        if !self.collection_exists(collection)? {
473            return Err(Error::CollectionNotFound {
474                name: collection.to_owned(),
475            });
476        }
477        let mut pager = lock_pager(self.inner.env())?;
478        let catalog = lock_catalog(&self.catalog)?;
479        // #90: advance the primary root in the per-txn cache; the
480        // single catalog flush is deferred to commit.
481        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
482        let descriptor =
483            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
484        let key = id.to_be_bytes();
485        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
486        let removed = tree.delete(&mut pager, &key)?;
487        descriptor.primary_root = tree.root().get();
488        Ok(removed)
489    }
490
491    /// **Engine API**: count every document in `collection` as seen
492    /// inside THIS write transaction — i.e. the count reflects the
493    /// txn's own uncommitted inserts / deletes, consistent with
494    /// [`Self::get_with_version`]'s live-read semantics.
495    ///
496    /// Companion to the snapshot-isolated read-side
497    /// [`ReadTxn::count_all_raw`]: the write side descends the LIVE
498    /// primary B-tree at the per-txn cache's current `primary_root`
499    /// (preferring any in-memory root advance from a prior raw write
500    /// in this txn) rather than a pinned reader snapshot. Used by
501    /// obj-py's `WriteCollection.count_all()` so a typed collection
502    /// handle can count uncommitted state.
503    ///
504    /// The full-tree scan does not decode records; the iteration
505    /// count is bounded by the primary tree's entry count and the
506    /// running total is `checked_add`-guarded (Rule 2 / Rule 7) so a
507    /// `> u64::MAX` count surfaces as [`Error::BTreeInvariantViolated`]
508    /// rather than wrapping.
509    ///
510    /// # Errors
511    ///
512    /// - [`Error::CollectionNotFound`] if `collection` does not exist.
513    /// - Pager / B-tree errors propagated from the descent / scan.
514    #[doc(hidden)]
515    pub fn count_all_raw(&mut self, collection: &str) -> Result<u64> {
516        let descriptor = self.live_descriptor_required(collection)?;
517        let mut pager = lock_pager(self.inner.env())?;
518        let tree = btree_handle(&pager, descriptor.primary_root)?;
519        let mut n: u64 = 0;
520        for step in tree.range(&mut pager, ..)? {
521            let _ = step?;
522            n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
523                reason: "primary tree entry count exceeds u64",
524            })?;
525        }
526        Ok(n)
527    }
528
529    /// **FFI shim**: insert-or-replace the document at `id` in
530    /// `collection` to `payload` bytes.
531    ///
532    /// Forwards to [`Self::upsert_with_version`] with
533    /// `type_version = RAW_BYTES_TYPE_VERSION`.
534    ///
535    /// # Errors
536    ///
537    /// As [`Self::insert_raw_bytes`].
538    #[doc(hidden)]
539    pub fn upsert_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
540        self.upsert_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
541    }
542
543    /// **Engine API**: insert-or-replace the document at `id` in
544    /// `collection`, stamping the per-doc header's `type_version`
545    /// with the caller-supplied value. Companion to
546    /// [`Self::insert_with_version`] for the typed upsert path.
547    ///
548    /// # Errors
549    ///
550    /// As [`Self::insert_with_version`].
551    #[doc(hidden)]
552    pub fn upsert_with_version(
553        &mut self,
554        collection: &str,
555        id: Id,
556        payload: &[u8],
557        type_version: u32,
558    ) -> Result<()> {
559        reject_namespaced_write(collection)?;
560        let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
561        let mut pager = lock_pager(self.inner.env())?;
562        let catalog = lock_catalog(&self.catalog)?;
563        // #90: advance the primary root in the per-txn cache; flush
564        // once at commit.
565        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
566        let descriptor =
567            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
568        let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
569        let key = id.to_be_bytes();
570        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
571        let _ = tree.delete(&mut pager, &key)?;
572        tree.insert(&mut pager, &key, &bytes)?;
573        descriptor.primary_root = tree.root().get();
574        Ok(())
575    }
576
577    // ---------- index-maintaining raw-bytes writes (FFI) ----------
578
579    /// **Engine API**: insert a raw-bytes document into `collection`
580    /// AND maintain the named secondary indexes from caller-supplied
581    /// field-encoded keys. Returns the freshly-allocated [`Id`].
582    ///
583    /// Unlike [`Self::insert_raw_bytes`] (primary-only), this is the
584    /// schema-bearing raw write: the C ABI cannot reflect index keys
585    /// out of an opaque payload, so the CALLER supplies one
586    /// `(index_name, field_key)` entry per index value, where
587    /// `field_key` is the order-preserving encoding of the indexed
588    /// field (produced by `obj_core::index::encode_field` —
589    /// `libobj::obj_index_key_encode` wraps it). obj does the
590    /// kind-specific STORAGE-key composition (append the `Id` suffix
591    /// for `Standard` / `Each` / `Composite`; use the field key as-is
592    /// with the `Id` as value + enforce uniqueness for `Unique`),
593    /// matching the typed path byte-for-byte.
594    ///
595    /// Atomicity: the primary insert + every index maintenance lands
596    /// inside the same WAL transaction. An unknown index name
597    /// ([`Error::IndexNotFound`]) or a uniqueness violation
598    /// ([`Error::UniqueConstraintViolation`]) surfaces via `?` and the
599    /// surrounding [`WriteTxn`] rolls back the staged primary write
600    /// atomically — no half-written index.
601    ///
602    /// # Errors
603    ///
604    /// - [`Error::IndexNotFound`] if an entry names an index that does
605    ///   not exist or is not `Active` on `collection`.
606    /// - [`Error::UniqueConstraintViolation`] if a `Unique` entry's
607    ///   key already maps to a different document.
608    /// - As [`Self::insert_with_version`] (namespaced / too-large /
609    ///   pager / catalog).
610    pub fn insert_raw_indexed(
611        &mut self,
612        collection: &str,
613        payload: &[u8],
614        type_version: u32,
615        entries: &[(String, Vec<u8>)],
616    ) -> Result<Id> {
617        let id = self.insert_with_version(collection, payload, type_version)?;
618        self.maintain_raw_indexes(collection, id, &[], entries)?;
619        Ok(id)
620    }
621
622    /// **Engine API**: update the document at `id` in `collection` to
623    /// `payload` AND move its secondary-index entries from the OLD
624    /// caller-supplied field keys to the NEW ones.
625    ///
626    /// obj cannot re-derive the OLD index keys from the stored opaque
627    /// bytes, so the caller MUST supply BOTH the `remove` set (the
628    /// field keys the document indexed under before this update) and
629    /// the `add` set (the field keys it indexes under after). Each is
630    /// one `(index_name, field_key)` entry per index value. The
631    /// kind-specific composition + uniqueness enforcement matches
632    /// [`Self::insert_raw_indexed`].
633    ///
634    /// Atomicity: the primary update + every index removal/insertion
635    /// lands in the same WAL transaction; any error rolls the whole
636    /// thing back.
637    ///
638    /// # Errors
639    ///
640    /// - [`Error::CollectionNotFound`] if `id` does not exist.
641    /// - [`Error::IndexNotFound`] / [`Error::UniqueConstraintViolation`]
642    ///   as [`Self::insert_raw_indexed`].
643    /// - As [`Self::update_with_version`].
644    pub fn update_raw_indexed(
645        &mut self,
646        collection: &str,
647        id: Id,
648        payload: &[u8],
649        type_version: u32,
650        remove: &[(String, Vec<u8>)],
651        add: &[(String, Vec<u8>)],
652    ) -> Result<()> {
653        self.update_with_version(collection, id, payload, type_version)?;
654        self.maintain_raw_indexes(collection, id, remove, add)?;
655        Ok(())
656    }
657
658    /// **Engine API**: delete the document at `id` in `collection`
659    /// AND remove its secondary-index entries given the caller-
660    /// supplied OLD field keys. Returns `Ok(true)` if the primary
661    /// record existed, `Ok(false)` if not.
662    ///
663    /// As with [`Self::update_raw_indexed`], obj cannot re-derive the
664    /// index keys from stored bytes, so the caller supplies the
665    /// `remove` set (one `(index_name, field_key)` per indexed value).
666    /// The index removals always run (even on `Ok(false)`) so a caller
667    /// can repair a known-stale index entry; this mirrors the typed
668    /// `Collection::delete` which also diffs against the supplied OLD
669    /// key set regardless of primary presence.
670    ///
671    /// # Errors
672    ///
673    /// - [`Error::IndexNotFound`] if a `remove` entry names an unknown
674    ///   / non-`Active` index.
675    /// - As [`Self::delete_raw_bytes`].
676    pub fn delete_raw_indexed(
677        &mut self,
678        collection: &str,
679        id: Id,
680        remove: &[(String, Vec<u8>)],
681    ) -> Result<bool> {
682        let removed = self.delete_raw_bytes(collection, id)?;
683        self.maintain_raw_indexes(collection, id, remove, &[])?;
684        Ok(removed)
685    }
686
687    /// **Engine API**: declare / reconcile a runtime [`obj_core::IndexSpec`] set
688    /// into the catalog for `collection`, making each `Active` BEFORE
689    /// any index-maintaining raw write ([`Self::insert_raw_indexed`] &c.
690    /// require the index already `Active`).
691    ///
692    /// This is the NON-generic equivalent of the `#[derive(Document)]`
693    /// reconcile path (`WriteTxn::collection::<T>()`, which reflects
694    /// `T::indexes()`): a caller that has no Rust `Document` type — the
695    /// obj-py / FFI index-declaration path (#108) — supplies the specs
696    /// directly. Both share ONE body
697    /// (`reconcile_specs_once`) so the cache /
698    /// staging / validation / catalog-walk semantics never diverge.
699    ///
700    /// Lazy-creates the collection's catalog row + empty primary B-tree
701    /// on first call (as the typed path does), then runs the same
702    /// `shared ∪ staged` skip-cache: a SECOND call with the same
703    /// `(collection, version)` is a no-op (the underlying
704    /// `Catalog::reconcile_indexes` is itself idempotent for matching
705    /// `(name, kind, key_paths)`). The catalog mutation is staged in the
706    /// live WAL transaction — a rolled-back txn leaves no half-declared
707    /// index, and the per-process reconciled cache is only promoted on a
708    /// successful [`Self::commit`].
709    ///
710    /// # `version` (#130)
711    ///
712    /// The skip-cache is keyed by `(collection, version)`, not by
713    /// `collection` alone, so a LATER schema `version` of the same
714    /// collection that ADDS an index reconciles on its first call rather
715    /// than being skipped. The caller passes the schema version the
716    /// `specs` belong to (e.g. the typed `Document::VERSION` or the
717    /// obj-py `@document` version). One narrow caveat applies when two
718    /// live versions of one collection declare DIFFERENT (conflicting)
719    /// index sets and their writes interleave in a single process — see
720    /// the `reconcile_specs_once` internal docs. Index ADDITION (the
721    /// common monotonic case) is fully correct.
722    ///
723    /// # Errors
724    ///
725    /// - [`Error::InvalidArgument`] if any spec is malformed (validated
726    ///   before any catalog mutation).
727    /// - [`Error::IndexKindMismatch`] / [`Error::IndexKeyPathsMismatch`]
728    ///   if a spec re-declares an existing `Active` index with a
729    ///   different `(kind, key_paths)`.
730    /// - [`Error::Busy`] on a poisoned pager / catalog mutex.
731    /// - Pager / B-tree / postcard errors propagated.
732    pub fn reconcile_indexes_raw(
733        &mut self,
734        collection: &str,
735        version: u32,
736        specs: &[obj_core::IndexSpec],
737    ) -> Result<()> {
738        // Lazy-create the collection first: `reconcile_indexes` errors
739        // with `CollectionNotFound` if the catalog row is absent, and an
740        // FFI caller declaring indexes on a brand-new collection should
741        // not have to issue a separate create. Mirrors the typed path's
742        // `ensure_collection::<T>` → `reconcile_indexes_once::<T>` order.
743        let _descriptor = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
744        crate::collection::reconcile_specs_once(
745            &self.inner,
746            &self.catalog,
747            &self.reconciled,
748            &mut self.reconciled_staged,
749            collection,
750            version,
751            specs,
752        )
753    }
754
755    /// Apply the per-index removal (`old`) + addition (`new`) churn
756    /// for a raw-bytes write, composing the storage key per index
757    /// kind via the shared non-generic seam
758    /// [`crate::index_maint::maintain_index_from_keys`].
759    ///
760    /// Resolves each `(index_name, field_key)` entry to its `Active`
761    /// [`obj_core::IndexDescriptor`] by name, groups the OLD and NEW
762    /// field keys per index, and maintains every index touched by
763    /// either set. The (possibly COW-advanced) descriptor is persisted
764    /// back to the catalog once. Runs entirely under the pager +
765    /// catalog locks held for the whole call, inside the live WAL
766    /// transaction — so a mid-way error rolls back atomically.
767    fn maintain_raw_indexes(
768        &mut self,
769        collection: &str,
770        id: Id,
771        old: &[(String, Vec<u8>)],
772        new: &[(String, Vec<u8>)],
773    ) -> Result<()> {
774        if old.is_empty() && new.is_empty() {
775            return Ok(());
776        }
777        let mut pager = lock_pager(self.inner.env())?;
778        let catalog = lock_catalog(&self.catalog)?;
779        // #90: maintain against the per-txn cached descriptor (the
780        // sole mid-txn source of truth) so the unique pre-check sees
781        // prior eager index writes in this txn and the index-root
782        // advances accumulate in the cache. NO per-call
783        // `Catalog::update` — the single flush is deferred to commit.
784        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
785        let descriptor =
786            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
787        let touched = touched_index_names(old, new);
788        for index_name in &touched {
789            maintain_one_raw_index(&mut pager, descriptor, collection, index_name, old, new, id)?;
790        }
791        Ok(())
792    }
793
794    /// #90: does `collection` have a catalog row (either already in
795    /// the per-txn descriptor cache, or in the live catalog tree)?
796    /// Used by the raw update / delete paths to preserve their
797    /// `CollectionNotFound` contract before routing the descriptor
798    /// through the cache.
799    fn collection_exists(&self, collection: &str) -> Result<bool> {
800        {
801            let cache = crate::collection::lock_descriptors(&self.descriptors)?;
802            if cache.contains_key(collection) {
803                return Ok(true);
804            }
805        }
806        let mut pager = lock_pager(self.inner.env())?;
807        let catalog = lock_catalog(&self.catalog)?;
808        Ok(catalog.get(&mut pager, collection)?.is_some())
809    }
810}
811
/// Public read transaction.  Acquired by
/// [`crate::Db::read_transaction`].
///
/// Wraps the obj-core read-side handle. The writer's live `Catalog` is
/// NOT held by a `ReadTxn`, because reads consult the snapshot-pinned
/// catalog root via
/// [`obj_core::Catalog::lookup_via_snapshot`] (M6 #53), not the
/// live `Catalog.tree.root`.
///
/// M11 #93: in addition to that handle, a `ReadTxn` MAY carry one
/// `AttachedReadCtx` per attached database registered on the calling
/// [`crate::Db`]. The per-attached snapshots are pinned at txn-begin
/// time and released when the `ReadTxn` drops. Reads against
/// `<namespace>.<collection>` route through them.
pub struct ReadTxn<'db> {
    /// The calling Db's own obj-core read txn (its env + pinned snapshot).
    pub(crate) inner: obj_core::ReadTxn<'db, FileHandle>,
    /// Per-attached-database read contexts, keyed by namespace.
    /// Populated by [`crate::Db::read_transaction`] before the
    /// closure runs; dropped together with the txn.
    pub(crate) attached: HashMap<String, AttachedReadCtx>,
}
833
834impl<'db> ReadTxn<'db> {
835    /// Construct a `ReadTxn` from a bare obj-core handle. Public so
836    /// the FFI layer can build an owned read txn whose lifetime
837    /// extends past a single `Db::read_transaction` closure call.
838    ///
839    /// User-Rust callers should reach for `Db::read_transaction`.
840    #[doc(hidden)]
841    #[must_use]
842    pub fn from_parts(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
843        Self {
844            inner,
845            attached: HashMap::new(),
846        }
847    }
848
849    pub(crate) fn new(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
850        Self::from_parts(inner)
851    }
852
853    pub(crate) fn with_attached(
854        inner: obj_core::ReadTxn<'db, FileHandle>,
855        attached: HashMap<String, AttachedReadCtx>,
856    ) -> Self {
857        Self { inner, attached }
858    }
859
860    /// Resolve a (possibly namespaced) collection name to the
861    /// `(env, snapshot, lookup_name)` the raw-bytes read shims should
862    /// read through. A bare `"collection"` resolves against the
863    /// calling Db's own snapshot exactly as before; a
864    /// `"<ns>.<tail>"` name resolves against the read-only database
865    /// attached under `<ns>` (its pinned snapshot), with the namespace
866    /// prefix stripped for the catalog lookup.
867    ///
868    /// Mirrors the namespace dispatch in
869    /// [`crate::collection::Collection::open_readonly_named`] — the
870    /// only other namespace-aware read path — so both honour the same
871    /// `<ns>.<tail>` → attached-snapshot rule.
872    ///
873    /// # Errors
874    ///
875    /// - [`Error::CollectionNamespaceUnknown`] if `collection`
876    ///   carries a namespace prefix that is not attached.
877    fn resolve_read_target<'a>(&'a self, collection: &'a str) -> Result<ReadTarget<'a>> {
878        let (namespace, tail) = crate::db::split_namespace(collection);
879        match namespace {
880            None => Ok(ReadTarget {
881                env: self.inner.env(),
882                snapshot: self.inner.snapshot(),
883                lookup_name: collection,
884            }),
885            Some(ns) => {
886                let ctx =
887                    self.attached
888                        .get(ns)
889                        .ok_or_else(|| Error::CollectionNamespaceUnknown {
890                            namespace: ns.to_owned(),
891                        })?;
892                Ok(ReadTarget {
893                    env: ctx.env.as_ref(),
894                    snapshot: &ctx.snapshot,
895                    lookup_name: tail,
896                })
897            }
898        }
899    }
900
901    /// **FFI shim**: fetch the raw payload of the document at `id`
902    /// in `collection`, snapshot-consistent against the read txn's
903    /// pinned LSN. Returns `Ok(None)` if absent.
904    ///
905    /// Forwards to [`Self::get_with_version`] and discards the
906    /// stored version.
907    ///
908    /// # Errors
909    ///
910    /// - [`Error::CollectionNotFound`] if the collection is unknown.
911    /// - [`Error::Corruption`] if the on-disk record is malformed.
912    /// - Pager / catalog errors propagated.
913    #[doc(hidden)]
914    pub fn get_raw_bytes(&self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
915        Ok(self
916            .get_with_version(collection, id)?
917            .map(|(payload, _version)| payload))
918    }
919
920    /// **Engine API**: fetch the raw payload AND stored
921    /// `type_version` of the document at `id` in `collection`,
922    /// snapshot-consistent against the read txn's pinned LSN.
923    /// Returns `Ok(None)` if absent.
924    ///
925    /// Companion read accessor for the version-aware write path
926    /// ([`WriteTxn::insert_with_version`]) — used by obj-py's
927    /// typed read pipeline to dispatch on the stored header
928    /// version instead of the historical try-decode-walk heuristic.
929    ///
930    /// # Errors
931    ///
932    /// As [`Self::get_raw_bytes`].
933    #[doc(hidden)]
934    pub fn get_with_version(&self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
935        let target = self.resolve_read_target(collection)?;
936        let descriptor = target.collection_descriptor(collection)?;
937        let pager = lock_pager(target.env)?;
938        let root = PageId::new(descriptor.primary_root)
939            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
940        let key = id.to_be_bytes();
941        let bytes = obj_core::btree::BTree::<FileHandle>::get_via_snapshot(
942            &pager,
943            target.snapshot,
944            root,
945            &key,
946        )?;
947        match bytes {
948            Some(b) => Ok(Some(strip_raw_payload_with_version(
949                &b,
950                descriptor.collection_id,
951            )?)),
952            None => Ok(None),
953        }
954    }
955
956    /// **FFI shim**: look up the descriptor for `collection`
957    /// against the snapshot. Returns `Ok(None)` if absent.
958    ///
959    /// Used by [`libobj`](../../libobj/index.html) for query /
960    /// iteration entry points.
961    ///
962    /// # Errors
963    ///
964    /// - Pager / catalog errors propagated.
965    #[doc(hidden)]
966    pub fn snapshot_descriptor(&self, collection: &str) -> Result<Option<CollectionDescriptor>> {
967        read_descriptor_via_snapshot(self.inner.env(), self.inner.snapshot(), collection)
968    }
969
970    /// **FFI shim**: borrow the wrapped obj-core read txn. Used by
971    /// [`libobj`](../../libobj/index.html) iterators that need
972    /// snapshot-aware B-tree access.
973    #[doc(hidden)]
974    #[must_use]
975    pub fn inner(&self) -> &obj_core::ReadTxn<'db, FileHandle> {
976        &self.inner
977    }
978
979    /// **FFI shim**: resolve an `Active` index descriptor by name
980    /// on `collection`. Used by libobj's range / find_unique /
981    /// count paths.
982    ///
983    /// # Errors
984    ///
985    /// - [`Error::CollectionNotFound`] if the collection is absent.
986    /// - [`Error::IndexNotFound`] if the index is unknown or
987    ///   `DroppedPending`.
988    /// - Pager / catalog errors propagated.
989    #[doc(hidden)]
990    pub fn snapshot_index_descriptor(
991        &self,
992        collection: &str,
993        index: &str,
994    ) -> Result<obj_core::IndexDescriptor> {
995        let descriptor =
996            self.snapshot_descriptor(collection)?
997                .ok_or_else(|| Error::CollectionNotFound {
998                    name: collection.to_owned(),
999                })?;
1000        let entry = descriptor.indexes.iter().find(|d| d.name == index);
1001        match entry {
1002            Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
1003            _ => Err(Error::IndexNotFound {
1004                collection: collection.to_owned(),
1005                name: index.to_owned(),
1006            }),
1007        }
1008    }
1009
1010    /// **FFI shim**: count every doc in `collection` snapshot-
1011    /// consistently against the read txn's pinned LSN.
1012    ///
1013    /// # Errors
1014    ///
1015    /// As [`Self::snapshot_descriptor`] plus pager / B-tree.
1016    #[doc(hidden)]
1017    pub fn count_all_raw(&self, collection: &str) -> Result<u64> {
1018        let target = self.resolve_read_target(collection)?;
1019        let descriptor = target.collection_descriptor(collection)?;
1020        count_via_btree_range_full(target.env, target.snapshot, descriptor.primary_root)
1021    }
1022
1023    /// **FFI shim**: walk an index B-tree by raw-byte key range
1024    /// and collect the matching `(Id, raw_payload)` pairs. The
1025    /// caller is responsible for encoding `lower` / `upper` per the
1026    /// M7 order-preserving encoding.
1027    ///
1028    /// `lower_bound` / `upper_bound` use Rust's `std::ops::Bound`
1029    /// shape (Included / Excluded / Unbounded).
1030    ///
1031    /// Materialises every result in a `Vec` — the result set is
1032    /// bounded by [`obj_core::btree::MAX_RANGE_NODES`] inherited
1033    /// from `BTree::range`. The libobj iterator yields these one
1034    /// at a time.
1035    ///
1036    /// # Errors
1037    ///
1038    /// - [`Error::IndexNotFound`] / [`Error::CollectionNotFound`].
1039    /// - Pager / B-tree errors propagated.
1040    #[doc(hidden)]
1041    pub fn index_range_raw(
1042        &self,
1043        collection: &str,
1044        index: &str,
1045        lower: std::ops::Bound<Vec<u8>>,
1046        upper: std::ops::Bound<Vec<u8>>,
1047    ) -> Result<Vec<(Id, Vec<u8>)>> {
1048        let target = self.resolve_read_target(collection)?;
1049        let index_descriptor = target.index_descriptor(collection, index)?;
1050        let collection_descriptor = target.collection_descriptor(collection)?;
1051        let (start, end) =
1052            crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
1053        let entries = collect_index_range_entries(
1054            target.env,
1055            target.snapshot,
1056            index_descriptor.root_page_id,
1057            start,
1058            end,
1059        )?;
1060        materialize_id_payload_pairs(
1061            target.env,
1062            target.snapshot,
1063            &collection_descriptor,
1064            &index_descriptor,
1065            entries,
1066        )
1067    }
1068
1069    /// **Engine API**: walk an index B-tree by raw-byte key range and
1070    /// collect the matching `(Id, type_version, raw_payload)` rows.
1071    /// Companion to [`Self::index_range_raw`] that ALSO surfaces each
1072    /// record's stored `type_version` (read from the per-doc record
1073    /// header), so a typed range decode can dispatch schema migration at
1074    /// the version each record was actually written under — exactly as
1075    /// [`Self::find_unique_with_version`] does for the single-key path.
1076    ///
1077    /// `lower` / `upper` follow the same raw-byte convention as
1078    /// [`Self::index_range_raw`]; the result set is bounded identically.
1079    ///
1080    /// # Errors
1081    ///
1082    /// As [`Self::index_range_raw`].
1083    #[doc(hidden)]
1084    pub fn index_range_raw_with_version(
1085        &self,
1086        collection: &str,
1087        index: &str,
1088        lower: std::ops::Bound<Vec<u8>>,
1089        upper: std::ops::Bound<Vec<u8>>,
1090    ) -> Result<Vec<(Id, u32, Vec<u8>)>> {
1091        let target = self.resolve_read_target(collection)?;
1092        let index_descriptor = target.index_descriptor(collection, index)?;
1093        let collection_descriptor = target.collection_descriptor(collection)?;
1094        let (start, end) =
1095            crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
1096        let entries = collect_index_range_entries(
1097            target.env,
1098            target.snapshot,
1099            index_descriptor.root_page_id,
1100            start,
1101            end,
1102        )?;
1103        materialize_id_version_payload_rows(
1104            target.env,
1105            target.snapshot,
1106            &collection_descriptor,
1107            &index_descriptor,
1108            entries,
1109        )
1110    }
1111
1112    /// **FFI shim**: count index B-tree entries inside `range`.
1113    /// `lower` / `upper` follow the same raw-byte convention as
1114    /// [`Self::index_range_raw`].
1115    ///
1116    /// # Errors
1117    ///
1118    /// As [`Self::index_range_raw`].
1119    #[doc(hidden)]
1120    pub fn count_index_range_raw(
1121        &self,
1122        collection: &str,
1123        index: &str,
1124        lower: std::ops::Bound<Vec<u8>>,
1125        upper: std::ops::Bound<Vec<u8>>,
1126    ) -> Result<u64> {
1127        let target = self.resolve_read_target(collection)?;
1128        let index_descriptor = target.index_descriptor(collection, index)?;
1129        let (start, end) =
1130            crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
1131        let entries = collect_index_range_entries(
1132            target.env,
1133            target.snapshot,
1134            index_descriptor.root_page_id,
1135            start,
1136            end,
1137        )?;
1138        u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1139            reason: "index range entry count exceeds u64",
1140        })
1141    }
1142
1143    /// **FFI shim**: single-key lookup against a `Unique` index.
1144    /// Returns the matched `(Id, payload)` or `Ok(None)`.
1145    ///
1146    /// `key_bytes` is the index key, pre-encoded by the caller per
1147    /// the M7 order-preserving scheme.
1148    ///
1149    /// Forwards to [`Self::find_unique_with_version`] and discards
1150    /// the stored version.
1151    ///
1152    /// # Errors
1153    ///
1154    /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
1155    /// - As [`Self::index_range_raw`].
1156    #[doc(hidden)]
1157    pub fn find_unique_raw(
1158        &self,
1159        collection: &str,
1160        index: &str,
1161        key_bytes: &[u8],
1162    ) -> Result<Option<(Id, Vec<u8>)>> {
1163        Ok(self
1164            .find_unique_with_version(collection, index, key_bytes)?
1165            .map(|(id, payload, _version)| (id, payload)))
1166    }
1167
1168    /// **Engine API**: single-key lookup against a `Unique` index,
1169    /// returning the matched `(Id, payload, type_version)` or
1170    /// `Ok(None)`. Companion to [`Self::get_with_version`] for the
1171    /// typed find path.
1172    ///
1173    /// # Errors
1174    ///
1175    /// As [`Self::find_unique_raw`].
1176    #[doc(hidden)]
1177    pub fn find_unique_with_version(
1178        &self,
1179        collection: &str,
1180        index: &str,
1181        key_bytes: &[u8],
1182    ) -> Result<Option<(Id, Vec<u8>, u32)>> {
1183        let target = self.resolve_read_target(collection)?;
1184        let index_descriptor = target.index_descriptor(collection, index)?;
1185        if index_descriptor.kind != obj_core::IndexKind::Unique {
1186            return Err(Error::IndexNotUnique {
1187                collection: collection.to_owned(),
1188                name: index.to_owned(),
1189            });
1190        }
1191        let id_bytes = {
1192            let pager = lock_pager(target.env)?;
1193            let root = PageId::new(index_descriptor.root_page_id)
1194                .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1195            BTree::<FileHandle>::get_via_snapshot(&pager, target.snapshot, root, key_bytes)?
1196        };
1197        match id_bytes {
1198            Some(bytes) => {
1199                let id = Id::from_be_bytes(&bytes).ok_or(Error::Corruption { page_id: 0 })?;
1200                match self.get_with_version(collection, id)? {
1201                    Some((payload, version)) => Ok(Some((id, payload, version))),
1202                    None => Err(Error::Corruption { page_id: 0 }),
1203                }
1204            }
1205            None => Ok(None),
1206        }
1207    }
1208
1209    /// Open a typed handle to the collection `T` lives in.
1210    ///
1211    /// Read-only: returns [`Error::CollectionNotFound`] if the
1212    /// collection has never been registered AT THE SNAPSHOT'S
1213    /// PINNED LSN.
1214    ///
1215    /// If `T::COLLECTION` is of the form `<namespace>.<name>`, the
1216    /// txn dispatches against the attached database registered under
1217    /// `<namespace>` instead of the calling Db.
1218    ///
1219    /// # Errors
1220    ///
1221    /// - [`Error::CollectionNotFound`] if `T::COLLECTION` is not
1222    ///   registered in the catalog as-of the snapshot's pinned LSN.
1223    /// - [`Error::CollectionNamespaceUnknown`] if `T::COLLECTION`
1224    ///   carries a namespace prefix that is not attached.
1225    /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
1226    pub fn collection<T: Document>(&self) -> Result<Collection<'_, T>> {
1227        Collection::open_readonly(self)
1228    }
1229}
1230
/// The `(env, snapshot, lookup_name)` triple a raw-bytes read shim
/// should read through, produced by [`ReadTxn::resolve_read_target`].
///
/// For a bare collection name, the fields point at the calling Db's
/// own env and pinned snapshot. For a `<ns>.<tail>` name, they point at
/// the attached read-only Db registered under `<ns>`, and `lookup_name`
/// is the namespace-stripped `<tail>`. Every field borrows the
/// `ReadTxn` for `'a`, so the descriptor / B-tree reads stay pinned to
/// the same snapshot for the whole shim call.
struct ReadTarget<'a> {
    /// Env whose pager the shim locks for B-tree access.
    env: &'a TxnEnv<FileHandle>,
    /// Snapshot every descriptor / B-tree read is pinned to.
    snapshot: &'a ReaderSnapshot<FileHandle>,
    /// Catalog key to look up (namespace prefix already stripped).
    lookup_name: &'a str,
}
1245
1246impl ReadTarget<'_> {
1247    /// Resolve the collection descriptor for [`Self::lookup_name`]
1248    /// against this target's snapshot, surfacing
1249    /// [`Error::CollectionNotFound`] (under the ORIGINAL,
1250    /// possibly-namespaced name) when the collection is absent.
1251    fn collection_descriptor(&self, original: &str) -> Result<CollectionDescriptor> {
1252        read_descriptor_via_snapshot(self.env, self.snapshot, self.lookup_name)?.ok_or_else(|| {
1253            Error::CollectionNotFound {
1254                name: original.to_owned(),
1255            }
1256        })
1257    }
1258
1259    /// Resolve the `Active` index descriptor named `index` on
1260    /// [`Self::lookup_name`] against this target's snapshot. The
1261    /// `original` (possibly-namespaced) name is reported in
1262    /// [`Error::CollectionNotFound`] / [`Error::IndexNotFound`] so
1263    /// the caller sees the name it asked for.
1264    fn index_descriptor(&self, original: &str, index: &str) -> Result<obj_core::IndexDescriptor> {
1265        let descriptor = self.collection_descriptor(original)?;
1266        let entry = descriptor.indexes.iter().find(|d| d.name == index);
1267        match entry {
1268            Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
1269            _ => Err(Error::IndexNotFound {
1270                collection: original.to_owned(),
1271                name: index.to_owned(),
1272            }),
1273        }
1274    }
1275}
1276
/// Per-attached-database read context carried inside a [`ReadTxn`].
/// Pins one [`ReaderSnapshot`] against the attached database for the
/// duration of the calling Db's read transaction.
///
/// NOTE(review): struct fields drop in declaration order, so `env` is
/// released before `snapshot`. Confirm that releasing a `ReaderSnapshot`
/// does not need this `env` reference to still be alive (e.g. after the
/// attachment was detached mid-txn).
pub(crate) struct AttachedReadCtx {
    /// Calling-side stable reference to the attached env. Cloned
    /// from the [`AttachedDb`]'s `env` at txn-begin time so the
    /// `ReadTxn` does not retain a borrow on the calling Db's
    /// attached-registry mutex.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// Snapshot pinned at txn-begin against `env`; all namespaced
    /// reads in the owning `ReadTxn` go through it.
    pub(crate) snapshot: ReaderSnapshot<FileHandle>,
}
1289
/// One attached read-only database registered on the calling
/// [`crate::Db`] under a namespace.
///
/// Created by [`crate::Db::attach`]; stored inside the calling Db's
/// `attached: Arc<Mutex<HashMap<String, AttachedDb>>>` registry.
/// Removed by [`crate::Db::detach`], or by the calling Db's drop,
/// which transitively drops all attachments.
///
/// Fields drop in declaration order: this registry's `env` clone is
/// released before `_db`. Read transactions that cloned `env` into an
/// `AttachedReadCtx` keep their own `Arc` alive independently.
pub(crate) struct AttachedDb {
    /// Cloned `Arc<TxnEnv>` of the attached db, so read transactions
    /// pin snapshots without re-locking the registry.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// Calling-side keepalive for the attached `crate::Db`. The
    /// underscore prefix marks it as held-for-side-effect: dropping
    /// this `_db` releases file locks and any other resources the
    /// attached open acquired.
    pub(crate) _db: crate::Db,
}
1307
1308/// #93 — fold a committed txn's staged reconciled `(collection,
1309/// version)` keys into the shared per-process `reconciled` set. Called
1310/// by [`WriteTxn::commit`] only AFTER the WAL commit has landed, so the
1311/// shared cache is never poisoned by a rolled-back lazy-create.
1312///
1313/// A no-op (no lock acquired) when nothing was staged — the common
1314/// path for a txn that opened only already-reconciled collections, so
1315/// the fast "already reconciled" path pays nothing extra.
1316///
1317/// A poisoned `reconciled` mutex maps to [`Error::Busy`] (Rule 7); the
1318/// loop is bounded by the staged-key count (one entry per distinct
1319/// `(collection, version)` lazily reconciled in the txn — Rule 2).
1320fn promote_reconciled(
1321    reconciled: &Mutex<HashSet<(String, u32)>>,
1322    staged: HashSet<(String, u32)>,
1323) -> Result<()> {
1324    if staged.is_empty() {
1325        return Ok(());
1326    }
1327    let mut shared = reconciled.lock().map_err(|_| Error::Busy {
1328        kind: obj_core::LockKind::WriterInProcess,
1329    })?;
1330    shared.extend(staged);
1331    Ok(())
1332}
1333
1334/// Acquire the catalog mutex; convert a poison error into a
1335/// `WriterInProcess` Busy.  Helper shared by the public txn wrappers
1336/// and the [`Collection`] internals.
1337pub(crate) fn lock_catalog(
1338    catalog: &Mutex<Catalog<FileHandle>>,
1339) -> Result<std::sync::MutexGuard<'_, Catalog<FileHandle>>> {
1340    catalog.lock().map_err(|_| Error::Busy {
1341        kind: obj_core::LockKind::WriterInProcess,
1342    })
1343}
1344
1345// ---------- raw-bytes helpers (FFI) -------------------------------
1346
1347/// Reject a write against a namespaced collection. Attached
1348/// databases are read-only through the calling Db (M11 #93).
1349fn reject_namespaced_write(collection: &str) -> Result<()> {
1350    if let (Some(namespace), tail) = crate::db::split_namespace(collection) {
1351        return Err(Error::AttachedDatabaseIsReadOnly {
1352            namespace: namespace.to_owned(),
1353            collection: tail.to_owned(),
1354        });
1355    }
1356    Ok(())
1357}
1358
1359/// Acquire the env's pager mutex; map poison into Busy.
1360fn lock_pager(env: &TxnEnv<FileHandle>) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1361    env.pager().lock().map_err(|_| Error::Busy {
1362        kind: obj_core::LockKind::WriterInProcess,
1363    })
1364}
1365
/// List every distinct index name appearing in `old` (removals) or
/// `new` (additions), each once, in first-seen order (`old` scanned
/// before `new`).
///
/// Work is bounded by `old.len() + new.len()`, the caller's supplied
/// entry count (Rule 2).
fn touched_index_names(old: &[(String, Vec<u8>)], new: &[(String, Vec<u8>)]) -> Vec<String> {
    // `insert` returns false for a repeat, which filters it out while
    // keeping the first occurrence's position.
    let mut seen: HashSet<&str> = HashSet::new();
    old.iter()
        .chain(new)
        .map(|(name, _field_key)| name.as_str())
        .filter(|name| seen.insert(*name))
        .map(str::to_owned)
        .collect()
}
1379
1380/// Gather every field-encoded key in `entries` whose index name
1381/// equals `index_name`, wrapped as [`EncodedIndexKey`] for the
1382/// composition seam. Order follows the caller's entry order.
1383fn keys_for_index(entries: &[(String, Vec<u8>)], index_name: &str) -> Vec<EncodedIndexKey> {
1384    entries
1385        .iter()
1386        .filter(|(name, _key)| name == index_name)
1387        .map(|(_name, key)| EncodedIndexKey::from_bytes(key.clone()))
1388        .collect()
1389}
1390
1391/// Resolve `index_name` to its `Active` descriptor index on
1392/// `descriptor`, then maintain that one index B-tree by diffing the
1393/// `old` field keys against the `new` ones through the shared
1394/// non-generic composition seam
1395/// ([`crate::index_maint::maintain_index_from_keys`]).
1396///
1397/// An unknown or non-`Active` index name is [`Error::IndexNotFound`]
1398/// — the raw write refuses rather than silently dropping the entry.
1399fn maintain_one_raw_index(
1400    pager: &mut Pager<FileHandle>,
1401    descriptor: &mut CollectionDescriptor,
1402    collection: &str,
1403    index_name: &str,
1404    old: &[(String, Vec<u8>)],
1405    new: &[(String, Vec<u8>)],
1406    id: Id,
1407) -> Result<()> {
1408    let idx = descriptor
1409        .indexes
1410        .iter()
1411        .position(|d| d.name == index_name && d.status == IndexStatus::Active)
1412        .ok_or_else(|| Error::IndexNotFound {
1413            collection: collection.to_owned(),
1414            name: index_name.to_owned(),
1415        })?;
1416    let spec = crate::index_maint::descriptor_to_spec(&descriptor.indexes[idx])?;
1417    let old_keys = keys_for_index(old, index_name);
1418    let new_keys = keys_for_index(new, index_name);
1419    crate::index_maint::maintain_index_from_keys(
1420        pager, descriptor, idx, collection, &spec, &old_keys, &new_keys, id,
1421    )
1422}
1423
1424/// Ensure a (raw-bytes) collection exists, lazy-creating an empty
1425/// primary B-tree on first call. Used by the C ABI's insert /
1426/// upsert paths. Distinct from `Collection::open_or_create` because
1427/// raw-bytes writes do NOT participate in the typed-reconciliation
1428/// path — no `T::indexes()` to walk.
1429fn ensure_collection_raw(
1430    inner: &obj_core::WriteTxn<'_, FileHandle>,
1431    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1432    name: &str,
1433) -> Result<CollectionDescriptor> {
1434    let mut pager = inner.lock_pager()?;
1435    let mut catalog_guard = lock_catalog(catalog)?;
1436    if let Some(d) = catalog_guard.get(&mut pager, name)? {
1437        return Ok(d);
1438    }
1439    let tree = BTree::<FileHandle>::empty(&mut pager)?;
1440    let descriptor = CollectionDescriptor::new(0, tree.root().get(), RAW_BYTES_TYPE_VERSION);
1441    let _id = catalog_guard.insert(&mut pager, name, descriptor)?;
1442    catalog_guard
1443        .get(&mut pager, name)?
1444        .ok_or(Error::Corruption { page_id: 0 })
1445}
1446
1447/// Look up the descriptor for `name`, returning
1448/// `Err(CollectionNotFound)` if absent. Used by `update` / `delete`
1449/// where lazy-create would mask the caller's mistake.
1450fn catalog_get_required(
1451    inner: &obj_core::WriteTxn<'_, FileHandle>,
1452    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1453    name: &str,
1454) -> Result<CollectionDescriptor> {
1455    let mut pager = inner.lock_pager()?;
1456    let catalog_guard = lock_catalog(catalog)?;
1457    catalog_guard
1458        .get(&mut pager, name)?
1459        .ok_or_else(|| Error::CollectionNotFound {
1460            name: name.to_owned(),
1461        })
1462}
1463
1464/// Snapshot-aware descriptor lookup on the read side. Returns
1465/// `Ok(None)` when the collection is absent at the snapshot's
1466/// pinned LSN.
1467fn read_descriptor_via_snapshot(
1468    env: &TxnEnv<FileHandle>,
1469    snapshot: &ReaderSnapshot<FileHandle>,
1470    name: &str,
1471) -> Result<Option<CollectionDescriptor>> {
1472    let pager = lock_pager(env)?;
1473    Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1474}
1475
1476/// Open a primary-tree handle from a descriptor's `primary_root`.
1477fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1478    let root_pid =
1479        PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1480    BTree::<FileHandle>::open(pager, root_pid)
1481}
1482
1483/// Wrap a raw payload with the on-disk [`DocumentHeader`] carrying
1484/// the caller-supplied `type_version`. The header stamps
1485/// `collection_id` (so a cross-collection forgery is detectable on
1486/// read), `type_version`, `payload_len`, and a CRC32C of the
1487/// payload.
1488///
1489/// The on-disk bytes produced here are byte-identical to what
1490/// [`obj_core::codec::encode`] emits for the same logical payload +
1491/// `type_version` — this is the key plumbing point that closes the
1492/// cross-language header-level interop gap (#13).
1493///
1494/// Returns [`Error::DocumentTooLarge`] if `payload.len() + 16` would
1495/// not fit inline in a B-tree leaf — mirrors
1496/// [`obj_core::codec::encode`]'s overflow handling.
1497fn wrap_raw_payload_with_version(
1498    collection_id: u32,
1499    payload: &[u8],
1500    type_version: u32,
1501) -> Result<Vec<u8>> {
1502    let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
1503        len: payload.len(),
1504        max: MAX_INLINE_DOC,
1505    })?;
1506    let total = DOC_HEADER_SIZE
1507        .checked_add(payload.len())
1508        .ok_or(Error::DocumentTooLarge {
1509            len: usize::MAX,
1510            max: MAX_INLINE_DOC,
1511        })?;
1512    if total > MAX_INLINE_DOC {
1513        return Err(Error::DocumentTooLarge {
1514            len: total,
1515            max: MAX_INLINE_DOC,
1516        });
1517    }
1518    let header = DocumentHeader {
1519        collection_id,
1520        type_version,
1521        payload_len,
1522        payload_crc32c: crc32c(payload),
1523    };
1524    let mut out = Vec::with_capacity(total);
1525    header.write_to(&mut out);
1526    out.extend_from_slice(payload);
1527    Ok(out)
1528}
1529
1530/// Strip the per-doc header and return `(payload, type_version)`.
1531/// Validates the header's `collection_id`, total length, and
1532/// payload CRC32C — surfaces [`Error::Corruption`] /
1533/// [`Error::CollectionIdMismatch`] on any mismatch. Exposes the
1534/// stored `type_version` so the typed read path can dispatch on
1535/// it directly.
1536fn strip_raw_payload_with_version(
1537    bytes: &[u8],
1538    expected_collection_id: u32,
1539) -> Result<(Vec<u8>, u32)> {
1540    let header = DocumentHeader::read_from(bytes)?;
1541    if header.collection_id != expected_collection_id {
1542        return Err(Error::CollectionIdMismatch {
1543            expected: expected_collection_id,
1544            found: header.collection_id,
1545        });
1546    }
1547    let payload_len =
1548        usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
1549    let total = DOC_HEADER_SIZE
1550        .checked_add(payload_len)
1551        .ok_or(Error::Corruption { page_id: 0 })?;
1552    if bytes.len() != total {
1553        return Err(Error::Corruption { page_id: 0 });
1554    }
1555    let payload = &bytes[DOC_HEADER_SIZE..total];
1556    if crc32c(payload) != header.payload_crc32c {
1557        return Err(Error::Corruption { page_id: 0 });
1558    }
1559    Ok((payload.to_vec(), header.type_version))
1560}
1561
1562/// Count every entry in a primary B-tree without decoding the
1563/// records. Used by the FFI `count_all_raw` path.
1564///
1565/// Snapshot-pinned (M12 #12): the full-tree scan resolves every page
1566/// read as-of the read txn's `snapshot`, so a concurrent writer's
1567/// post-snapshot inserts/deletes cannot perturb the count — it stays
1568/// consistent with the read txn's pinned LSN.
1569fn count_via_btree_range_full(
1570    env: &TxnEnv<FileHandle>,
1571    snapshot: &ReaderSnapshot<FileHandle>,
1572    primary_root: u64,
1573) -> Result<u64> {
1574    let pager = lock_pager(env)?;
1575    let root = PageId::new(primary_root)
1576        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1577    let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, ..)?;
1578    let mut n: u64 = 0;
1579    for step in iter {
1580        let _ = step?;
1581        n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1582            reason: "primary tree entry count exceeds u64",
1583        })?;
1584    }
1585    Ok(n)
1586}
1587
1588/// Walk an index B-tree by raw-byte key range and return every
1589/// (`full_key`, `value_bytes`) entry inside the range. Used by the
1590/// FFI `index_range_raw` / `count_index_range_raw` paths.
1591///
1592/// Snapshot-pinned (M12 #12): the descent and leaf-scan resolve every
1593/// page read as-of the read txn's `snapshot`, so a concurrent writer's
1594/// post-snapshot index entries do not leak into the range/count — the
1595/// enumeration stays consistent with the read txn's pinned LSN.
1596fn collect_index_range_entries(
1597    env: &TxnEnv<FileHandle>,
1598    snapshot: &ReaderSnapshot<FileHandle>,
1599    index_root: u64,
1600    start: std::ops::Bound<Vec<u8>>,
1601    end: std::ops::Bound<Vec<u8>>,
1602) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1603    let pager = lock_pager(env)?;
1604    let root =
1605        PageId::new(index_root).ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1606    let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, (start, end))?;
1607    let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1608    for step in iter {
1609        out.push(step?);
1610    }
1611    Ok(out)
1612}
1613
1614/// Resolve a list of (`full_key`, `value_bytes`) index entries into
1615/// `(Id, raw_payload)` pairs. For `Unique` indexes the id is the
1616/// VALUE; for other kinds it is the trailing 8 bytes of the key
1617/// (see `docs/format.md` § Index key encoding).
1618///
1619/// Snapshot-aware: each primary lookup goes through the read
1620/// txn's pinned snapshot so the result set is consistent across
1621/// the call.
1622fn materialize_id_payload_pairs(
1623    env: &TxnEnv<FileHandle>,
1624    snapshot: &ReaderSnapshot<FileHandle>,
1625    collection: &CollectionDescriptor,
1626    index: &obj_core::IndexDescriptor,
1627    entries: Vec<(Vec<u8>, Vec<u8>)>,
1628) -> Result<Vec<(Id, Vec<u8>)>> {
1629    let rows = materialize_id_version_payload_rows(env, snapshot, collection, index, entries)?;
1630    Ok(rows
1631        .into_iter()
1632        .map(|(id, _version, payload)| (id, payload))
1633        .collect())
1634}
1635
1636/// Resolve a list of (`full_key`, `value_bytes`) index entries into
1637/// `(Id, type_version, raw_payload)` rows — the version-carrying form
1638/// of [`materialize_id_payload_pairs`]. The stored `type_version` comes
1639/// from each record's header (via [`strip_raw_payload_with_version`]),
1640/// letting the typed value-form `index_range` decode dispatch migration
1641/// at the version each record was written under. De-duplication +
1642/// id-resolution semantics are identical to the pairs form.
1643fn materialize_id_version_payload_rows(
1644    env: &TxnEnv<FileHandle>,
1645    snapshot: &ReaderSnapshot<FileHandle>,
1646    collection: &CollectionDescriptor,
1647    index: &obj_core::IndexDescriptor,
1648    entries: Vec<(Vec<u8>, Vec<u8>)>,
1649) -> Result<Vec<(Id, u32, Vec<u8>)>> {
1650    let mut out: Vec<(Id, u32, Vec<u8>)> = Vec::with_capacity(entries.len());
1651    let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
1652    let primary_root = PageId::new(collection.primary_root)
1653        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1654    let pager = lock_pager(env)?;
1655    for (full_key, value) in entries {
1656        let id_u64 = index_entry_id(index.kind, &full_key, &value)?;
1657        if index.kind == obj_core::IndexKind::Each && !emitted.insert(id_u64) {
1658            continue;
1659        }
1660        if index.kind != obj_core::IndexKind::Each {
1661            emitted.insert(id_u64);
1662        }
1663        let id = Id::try_new(id_u64).ok_or(Error::Corruption { page_id: 0 })?;
1664        let primary_bytes = BTree::<FileHandle>::get_via_snapshot(
1665            &pager,
1666            snapshot,
1667            primary_root,
1668            &id.to_be_bytes(),
1669        )?
1670        .ok_or(Error::Corruption { page_id: 0 })?;
1671        let (payload, version) =
1672            strip_raw_payload_with_version(&primary_bytes, collection.collection_id)?;
1673        out.push((id, version, payload));
1674    }
1675    Ok(out)
1676}
1677
1678/// Extract the document `Id` (as `u64`) an index entry points at. For
1679/// `Unique` indexes the id is the entry VALUE; for every other kind it
1680/// is the trailing 8 bytes of the full key (see `docs/format.md`
1681/// § Index key encoding). Shared by both materialisers.
1682fn index_entry_id(kind: obj_core::IndexKind, full_key: &[u8], value: &[u8]) -> Result<u64> {
1683    if kind == obj_core::IndexKind::Unique {
1684        return Ok(Id::from_be_bytes(value)
1685            .ok_or(Error::Corruption { page_id: 0 })?
1686            .get());
1687    }
1688    if full_key.len() < 8 {
1689        return Err(Error::Corruption { page_id: 0 });
1690    }
1691    Ok(Id::from_be_bytes(&full_key[full_key.len() - 8..])
1692        .ok_or(Error::Corruption { page_id: 0 })?
1693        .get())
1694}