Skip to main content

obj/
txn.rs

//! Public `WriteTxn` / `ReadTxn` types.
//!
//! Thin wrappers over `obj_core::txn::{WriteTxn, ReadTxn}`. The
//! write-side wrapper attaches a [`Catalog`] reference (the obj-core
//! txn types are catalog-agnostic; the catalog is the obj crate's
//! responsibility).
//!
//! The catalog handle is `Arc<Mutex<Catalog<FileHandle>>>`.  Lock
//! ordering: **always acquire the pager mutex (via the txn env)
//! BEFORE the catalog mutex**.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::{Arc, Mutex};
13
14use obj_core::btree::BTree;
15use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
16use obj_core::index::EncodedIndexKey;
17use obj_core::pager::checksum::crc32c;
18use obj_core::pager::page::PageId;
19use obj_core::pager::Pager;
20use obj_core::platform::FileHandle;
21use obj_core::{
22    Catalog, CollectionDescriptor, Document, Error, Id, IndexStatus, ReaderSnapshot, Result, TxnEnv,
23};
24
25use crate::collection::Collection;
26
/// `type_version` stamped on documents written via the C ABI raw-
/// bytes path. The C caller has no Rust `Document::VERSION`; we
/// stamp 1 so the value is recognisable in dump output and the
/// existing schema-version-from-future / migration logic still
/// applies when a Rust-typed reader opens the same collection.
///
/// Used as the default by the `*_raw_bytes` shims, which forward to
/// their `*_with_version` counterparts with this value.
///
/// Bumping this constant is breaking for any consumer that has
/// written raw-bytes data with the old value, so leave it at 1
/// pre-1.0.
pub(crate) const RAW_BYTES_TYPE_VERSION: u32 = 1;
37
/// Public write transaction.
///
/// Acquired by [`crate::Db::transaction`].  Holds the in-process
/// write-serialization mutex + cross-process `WRITER_LOCK` for its
/// entire lifetime.  `commit` / `rollback` consume `self`; dropping
/// without explicitly committing rolls back automatically.
pub struct WriteTxn<'db> {
    /// The underlying catalog-agnostic obj-core write transaction.
    pub(crate) inner: obj_core::WriteTxn<'db, FileHandle>,
    /// Shared handle to the live catalog. Lock ordering: the pager
    /// mutex (via the txn env) is always taken BEFORE this mutex.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// Per-process cache of collection names whose
    /// `T::indexes()` reconciliation has already run. Reconciliation
    /// is idempotent but expensive (a catalog walk + index
    /// declarations); caching keeps the first
    /// `WriteTxn::collection::<T>()` call per-process per-collection
    /// as the only one that pays the cost.
    ///
    /// #93 — membership is promoted into this SHARED set ONLY after a
    /// successful [`Self::commit`]. During a txn the names live in the
    /// per-txn [`Self::reconciled_staged`] set instead, so a
    /// rolled-back first-ever txn never poisons this set into skipping
    /// reconciliation on a later (committed) txn in the same process.
    pub(crate) reconciled: Arc<Mutex<HashSet<String>>>,
    /// #93 — per-transaction staged set of collection names whose
    /// `T::indexes()` reconciliation has run INSIDE this (not-yet-
    /// committed) txn. The skip-check in
    /// [`crate::collection::reconcile_indexes_once`] is `shared ∪
    /// staged`, so a second handle of the same collection in one txn
    /// still skips the (idempotent) catalog walk — but the names are
    /// only folded into the shared [`Self::reconciled`] set by
    /// [`Self::commit`] AFTER the WAL commit succeeds. On rollback /
    /// drop this set is discarded with NO shared-set mutation, so a
    /// rolled-back lazy-create leaves the shared cache untouched and
    /// the next txn re-reconciles correctly.
    ///
    /// Not behind a mutex: a `WriteTxn` is single-threaded (it holds
    /// the write-serialization lock for its whole life and is borrowed
    /// `&mut` by every write), so a plain `HashSet` mutated through
    /// that `&mut` borrow by the staging helpers is sufficient.
    pub(crate) reconciled_staged: HashSet<String>,
    /// #90 — batch-aware catalog flush. Per-transaction cache of the
    /// LIVE [`CollectionDescriptor`] for every collection touched by
    /// a write, keyed by collection name. This is the SOLE mid-txn
    /// source of truth for `next_id`, `primary_root`, and each
    /// index's `root_page_id`: every write bumps / advances these
    /// IN-MEMORY rather than rewriting the catalog B-tree per doc.
    /// [`Self::commit`] flushes each entry back through
    /// `Catalog::update` exactly ONCE (see
    /// [`Self::flush_descriptors`]). On rollback / drop the cache is
    /// discarded with no catalog-tree side effects.
    ///
    /// The `Arc<Mutex<_>>` is cloned into each [`Collection`]'s
    /// `WriteRef` so two handles of the same collection opened in one
    /// txn share the single entry.
    pub(crate) descriptors: crate::collection::DescriptorCache,
}
93
94impl<'db> WriteTxn<'db> {
95    /// Construct a `WriteTxn` directly from its three pieces.
96    /// Public so the FFI layer ([`libobj`](../../libobj/index.html))
97    /// can build an owned write txn whose lifetime extends past a
98    /// single `Db::transaction` closure call.
99    ///
100    /// User-Rust callers should reach for `Db::transaction` — the
101    /// closure shape handles commit / rollback / drop semantics
102    /// correctly without needing direct construction.
103    #[doc(hidden)]
104    #[must_use]
105    pub fn from_parts(
106        inner: obj_core::WriteTxn<'db, FileHandle>,
107        catalog: Arc<Mutex<Catalog<FileHandle>>>,
108        reconciled: Arc<Mutex<HashSet<String>>>,
109    ) -> Self {
110        Self {
111            inner,
112            catalog,
113            reconciled,
114            reconciled_staged: HashSet::new(),
115            descriptors: crate::collection::new_descriptor_cache(),
116        }
117    }
118
    /// Crate-internal constructor. Takes the same three pieces as
    /// [`Self::from_parts`] and simply forwards to it.
    pub(crate) fn new(
        inner: obj_core::WriteTxn<'db, FileHandle>,
        catalog: Arc<Mutex<Catalog<FileHandle>>>,
        reconciled: Arc<Mutex<HashSet<String>>>,
    ) -> Self {
        Self::from_parts(inner, catalog, reconciled)
    }
126
127    /// Open a typed handle to the collection `T` lives in.
128    ///
129    /// Lazily creates the catalog row + an empty primary B-tree on
130    /// first call for a given `T` inside the current process.  The
131    /// catalog mutation is staged in the same WAL transaction as
132    /// the user's subsequent writes — a rolled-back txn leaves no
133    /// half-created collection.
134    ///
135    /// # Errors
136    ///
137    /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
138    /// - Any error the pager / B-tree / postcard codec returns.
139    pub fn collection<T: Document>(&mut self) -> Result<Collection<'_, T>> {
140        // M11 #93: namespaced collections live in attached
141        // read-only databases. Reject writes eagerly at handle-
142        // open time so the caller does not chase an opaque "no
143        // such collection" error later.
144        if let (Some(namespace), tail) = crate::db::split_namespace(T::COLLECTION) {
145            return Err(Error::AttachedDatabaseIsReadOnly {
146                namespace: namespace.to_owned(),
147                collection: tail.to_owned(),
148            });
149        }
150        Collection::open_or_create(self)
151    }
152
    /// Commit the transaction.
    ///
    /// Runs three steps in a fixed order:
    ///
    /// 1. #90: flush every cached [`CollectionDescriptor`] back to
    ///    the catalog (one `Catalog::update` per touched collection)
    ///    BEFORE the WAL commit, so the coalesced `next_id` /
    ///    `primary_root` / index-root advances land durably in the
    ///    same transaction as the document + index writes. A flush
    ///    failure aborts the commit (the `?` propagates and `self`
    ///    drops, rolling the WAL back) rather than committing a
    ///    half-flushed catalog.
    /// 2. Commit the inner obj-core transaction.
    /// 3. #93: AFTER the WAL commit succeeds, fold this txn's staged
    ///    reconciled-collection names into the shared per-process
    ///    [`Self::reconciled`] set, so the expensive `T::indexes()`
    ///    reconciliation is skipped for those collections on later
    ///    txns. Promotion is deliberately POST-commit: a rolled-back
    ///    txn never reaches here, so it cannot poison the shared cache
    ///    into skipping reconciliation against a catalog whose index
    ///    rows it just rolled back. A poisoned `reconciled` mutex maps
    ///    to `Error::Busy` (Rule 7) but does NOT un-commit the durable
    ///    WAL state.
    ///
    /// # Errors
    ///
    /// As [`obj_core::WriteTxn::commit`], plus any catalog / pager /
    /// postcard error surfaced by the descriptor flush, plus
    /// [`Error::Busy`] if the shared `reconciled` mutex is poisoned at
    /// promotion time (after the commit has already landed durably).
    pub fn commit(self) -> Result<()> {
        // Step 1: must run while `self` is still whole (borrows `&self`).
        self.flush_descriptors()?;
        // Move the pieces out before consuming `inner`: `commit`
        // takes `self` by value, and `self.inner.commit()` moves the
        // inner txn, so the shared/staged handles must be captured
        // first.
        let Self {
            inner,
            reconciled,
            reconciled_staged,
            ..
        } = self;
        // Step 2: an error here returns early, so step 3 never runs
        // for a transaction that did not commit.
        inner.commit()?;
        // Step 3: the result of the promotion is the result of `commit`.
        promote_reconciled(&reconciled, reconciled_staged)
    }
194
195    /// #90: persist every cached descriptor back to the catalog
196    /// B-tree exactly once. Called by [`Self::commit`] before the WAL
197    /// commit. Iterates the per-txn descriptor cache (one entry per
198    /// touched collection — Rule 2 bound is the touched-collection
199    /// count) and issues one `Catalog::update` apiece, propagating the
200    /// first failure via `?` so a partial flush aborts the commit.
201    ///
202    /// Lock order matches every write path: pager BEFORE catalog (the
203    /// descriptor-cache lock is acquired and released first, so it is
204    /// never held across the pager/catalog locks).
205    fn flush_descriptors(&self) -> Result<()> {
206        let entries: Vec<(String, CollectionDescriptor)> = {
207            let cache = crate::collection::lock_descriptors(&self.descriptors)?;
208            if cache.is_empty() {
209                return Ok(());
210            }
211            cache
212                .iter()
213                .map(|(name, descriptor)| (name.clone(), descriptor.clone()))
214                .collect()
215        };
216        let mut pager = lock_pager(self.inner.env())?;
217        let mut catalog = lock_catalog(&self.catalog)?;
218        for (name, descriptor) in &entries {
219            catalog.update(&mut pager, name, descriptor)?;
220        }
221        Ok(())
222    }
223
    /// Roll the transaction back.
    ///
    /// Consumes `self` and forwards to the inner obj-core
    /// transaction's `rollback`. The per-txn state
    /// (`reconciled_staged`, `descriptors`) is dropped along with
    /// `self`; this method does not flush descriptors or touch the
    /// shared `reconciled` set.
    ///
    /// # Errors
    ///
    /// As [`obj_core::WriteTxn::rollback`].
    pub fn rollback(self) -> Result<()> {
        self.inner.rollback()
    }
232
    /// **FFI shim**: insert a raw-bytes document into `collection`,
    /// returning the freshly-allocated [`Id`].
    ///
    /// The payload is stored as-is; the on-disk record carries the
    /// standard 16-byte [`DocumentHeader`] with
    /// `type_version = RAW_BYTES_TYPE_VERSION` and a CRC32C of the
    /// payload. The collection is lazy-created in the same WAL
    /// transaction if it does not already exist.
    ///
    /// **Index maintenance does NOT run** on the raw-bytes path —
    /// the C ABI's caller has no schema introspection. Documents
    /// inserted through this path are invisible to indexes built
    /// by typed [`Document`] writers, until a Rust-side
    /// `WriteTxn::collection::<T>()` rewrites them.
    ///
    /// Forwards to [`Self::insert_with_version`] with
    /// `type_version = RAW_BYTES_TYPE_VERSION`. Callers that have
    /// a meaningful schema version to stamp (e.g. obj-py's typed
    /// path, which knows `cls.__obj_version__`) should call
    /// [`Self::insert_with_version`] directly.
    ///
    /// # Errors
    ///
    /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
    ///   collections (attached dbs are read-only).
    /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
    ///   B-tree inline cap.
    /// - Pager / catalog errors propagated.
    #[doc(hidden)]
    pub fn insert_raw_bytes(&mut self, collection: &str, payload: &[u8]) -> Result<Id> {
        // Same path as the versioned insert, with the fixed raw-bytes version.
        self.insert_with_version(collection, payload, RAW_BYTES_TYPE_VERSION)
    }
265
266    /// **Engine API**: insert a raw-bytes document into `collection`
267    /// with a caller-supplied `type_version` stamped in the per-doc
268    /// header. Returns the freshly-allocated [`Id`].
269    ///
270    /// This is the byte-identity entry point for cross-language
271    /// writers (obj-py's typed `Db.insert(instance)` calls this
272    /// with `cls.__obj_version__`, matching what Rust's
273    /// `#[derive(Document)]` stamps via [`obj_core::codec::encode`]).
274    /// Apart from the version source, the behaviour is identical to
275    /// [`Self::insert_raw_bytes`].
276    ///
277    /// **Index maintenance does NOT run** — same caveat as the
278    /// raw-bytes shim. Use [`Self::collection`] for typed writes
279    /// that need index maintenance.
280    ///
281    /// # Errors
282    ///
283    /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
284    ///   collections.
285    /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
286    ///   B-tree inline cap.
287    /// - Pager / catalog errors propagated.
288    #[doc(hidden)]
289    pub fn insert_with_version(
290        &mut self,
291        collection: &str,
292        payload: &[u8],
293        type_version: u32,
294    ) -> Result<Id> {
295        reject_namespaced_write(collection)?;
296        let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
297        let mut pager = lock_pager(self.inner.env())?;
298        let catalog = lock_catalog(&self.catalog)?;
299        // #90: the per-txn descriptor cache is the sole mid-txn source
300        // of truth — `next_id` is an in-memory bump, `primary_root`
301        // advances in the cache, and NO per-doc `Catalog::update`
302        // fires (the single flush is deferred to commit). This keeps
303        // the raw-bytes path coherent with the typed path if both
304        // touch the same collection in one transaction.
305        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
306        let descriptor =
307            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
308        let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || collection.to_owned())?;
309        let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
310        let key = id.to_be_bytes();
311        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
312        tree.insert(&mut pager, &key, &bytes)?;
313        descriptor.primary_root = tree.root().get();
314        Ok(id)
315    }
316
317    /// **FFI shim**: fetch the raw payload of the document at `id`
318    /// in `collection`. Returns `Ok(None)` if absent. The returned
319    /// `Vec<u8>` is the payload only (the 16-byte per-doc header
320    /// is stripped).
321    ///
322    /// Forwards to [`Self::get_with_version`] and discards the
323    /// stored version. Use [`Self::get_with_version`] directly when
324    /// the caller needs the header's `type_version` (e.g. obj-py's
325    /// typed read path, which dispatches schema migration on it).
326    ///
327    /// # Errors
328    ///
329    /// - [`Error::CollectionNotFound`] if the collection is unknown.
330    /// - [`Error::Corruption`] if the on-disk record is malformed.
331    /// - Pager / catalog errors propagated.
332    #[doc(hidden)]
333    pub fn get_raw_bytes(&mut self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
334        Ok(self
335            .get_with_version(collection, id)?
336            .map(|(payload, _version)| payload))
337    }
338
339    /// **Engine API**: fetch the raw payload AND stored
340    /// `type_version` of the document at `id` in `collection`.
341    /// Returns `Ok(None)` if absent.
342    ///
343    /// Companion read accessor for the version-aware write path
344    /// ([`Self::insert_with_version`]) — used by obj-py's typed
345    /// read pipeline to dispatch directly on the stored header
346    /// version instead of the historical try-decode-walk heuristic.
347    ///
348    /// # Errors
349    ///
350    /// As [`Self::get_raw_bytes`].
351    #[doc(hidden)]
352    pub fn get_with_version(&mut self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
353        // #90: a write-side raw get must descend the LIVE primary root
354        // — prefer the per-txn cache (which carries any uncommitted
355        // primary-root advance from a prior raw write in this txn) and
356        // fall back to the catalog tree if the collection has not been
357        // written in the txn.
358        let descriptor = self.live_descriptor_required(collection)?;
359        let mut pager = lock_pager(self.inner.env())?;
360        let tree = btree_handle(&pager, descriptor.primary_root)?;
361        let key = id.to_be_bytes();
362        match tree.get(&mut pager, &key)? {
363            Some(bytes) => Ok(Some(strip_raw_payload_with_version(
364                &bytes,
365                descriptor.collection_id,
366            )?)),
367            None => Ok(None),
368        }
369    }
370
371    /// #90: resolve the LIVE descriptor for `collection`, preferring
372    /// the per-txn cache (with its in-memory root advances) over the
373    /// catalog tree. Returns `CollectionNotFound` if neither has it.
374    /// Returns an owned clone so the caller does not hold the cache
375    /// lock across the subsequent pager work.
376    fn live_descriptor_required(&self, collection: &str) -> Result<CollectionDescriptor> {
377        {
378            let cache = crate::collection::lock_descriptors(&self.descriptors)?;
379            if let Some(descriptor) = cache.get(collection) {
380                return Ok(descriptor.clone());
381            }
382        }
383        catalog_get_required(&self.inner, &self.catalog, collection)
384    }
385
    /// **FFI shim**: update the document at `id` in `collection`
    /// to `payload` bytes.
    ///
    /// Forwards to [`Self::update_with_version`] with
    /// `type_version = RAW_BYTES_TYPE_VERSION`.
    ///
    /// # Errors
    ///
    /// - [`Error::CollectionNotFound`] if the collection is unknown
    ///   (`name` is the collection name) or if `id` does not exist
    ///   in it (`name` is then `"<collection>#<id>"`). Note that the
    ///   absent-id case is NOT reported as `Error::DocumentNotFound`.
    /// - [`Error::AttachedDatabaseIsReadOnly`] / [`Error::DocumentTooLarge`]
    ///   etc as [`Self::insert_raw_bytes`].
    #[doc(hidden)]
    pub fn update_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
        self.update_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
    }
402
403    /// **Engine API**: update the document at `id` in `collection`
404    /// to `payload` bytes, stamping the per-doc header's
405    /// `type_version` with the caller-supplied value.
406    ///
407    /// Companion to [`Self::insert_with_version`] for the typed
408    /// write path.
409    ///
410    /// # Errors
411    ///
412    /// As [`Self::update_raw_bytes`].
413    #[doc(hidden)]
414    pub fn update_with_version(
415        &mut self,
416        collection: &str,
417        id: Id,
418        payload: &[u8],
419        type_version: u32,
420    ) -> Result<()> {
421        reject_namespaced_write(collection)?;
422        // #90: route the descriptor through the per-txn cache so the
423        // primary-root advance coalesces into the single commit flush.
424        let exists = self.collection_exists(collection)?;
425        if !exists {
426            return Err(Error::CollectionNotFound {
427                name: collection.to_owned(),
428            });
429        }
430        let mut pager = lock_pager(self.inner.env())?;
431        let catalog = lock_catalog(&self.catalog)?;
432        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
433        let descriptor =
434            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
435        let key = id.to_be_bytes();
436        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
437        if tree.get(&mut pager, &key)?.is_none() {
438            // The collection name is owned (`String`); the
439            // DocumentNotFound variant's `collection` is `&'static
440            // str` (designed for typed Document::COLLECTION). Use a
441            // bespoke variant when widening would land — for v0 the
442            // closest match is Corruption with a synthetic page_id,
443            // but that's misleading; use InvalidArgument with the
444            // standard "not found" semantics surfaced by the caller.
445            return Err(Error::CollectionNotFound {
446                name: format!("{collection}#{}", id.get()),
447            });
448        }
449        let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
450        tree.delete(&mut pager, &key)?;
451        tree.insert(&mut pager, &key, &bytes)?;
452        descriptor.primary_root = tree.root().get();
453        Ok(())
454    }
455
456    /// **FFI shim**: delete the document at `id` in `collection`.
457    /// Returns `Ok(true)` if it existed, `Ok(false)` if not.
458    ///
459    /// # Errors
460    ///
461    /// Pager / catalog errors propagated.
462    #[doc(hidden)]
463    pub fn delete_raw_bytes(&mut self, collection: &str, id: Id) -> Result<bool> {
464        reject_namespaced_write(collection)?;
465        if !self.collection_exists(collection)? {
466            return Err(Error::CollectionNotFound {
467                name: collection.to_owned(),
468            });
469        }
470        let mut pager = lock_pager(self.inner.env())?;
471        let catalog = lock_catalog(&self.catalog)?;
472        // #90: advance the primary root in the per-txn cache; the
473        // single catalog flush is deferred to commit.
474        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
475        let descriptor =
476            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
477        let key = id.to_be_bytes();
478        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
479        let removed = tree.delete(&mut pager, &key)?;
480        descriptor.primary_root = tree.root().get();
481        Ok(removed)
482    }
483
    /// **FFI shim**: insert-or-replace the document at `id` in
    /// `collection` to `payload` bytes.
    ///
    /// Forwards to [`Self::upsert_with_version`] with
    /// `type_version = RAW_BYTES_TYPE_VERSION`. Unlike
    /// [`Self::insert_raw_bytes`], the caller chooses the `id`; no new
    /// id is allocated.
    ///
    /// # Errors
    ///
    /// As [`Self::insert_raw_bytes`].
    #[doc(hidden)]
    pub fn upsert_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
        self.upsert_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
    }
497
498    /// **Engine API**: insert-or-replace the document at `id` in
499    /// `collection`, stamping the per-doc header's `type_version`
500    /// with the caller-supplied value. Companion to
501    /// [`Self::insert_with_version`] for the typed upsert path.
502    ///
503    /// # Errors
504    ///
505    /// As [`Self::insert_with_version`].
506    #[doc(hidden)]
507    pub fn upsert_with_version(
508        &mut self,
509        collection: &str,
510        id: Id,
511        payload: &[u8],
512        type_version: u32,
513    ) -> Result<()> {
514        reject_namespaced_write(collection)?;
515        let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
516        let mut pager = lock_pager(self.inner.env())?;
517        let catalog = lock_catalog(&self.catalog)?;
518        // #90: advance the primary root in the per-txn cache; flush
519        // once at commit.
520        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
521        let descriptor =
522            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
523        let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
524        let key = id.to_be_bytes();
525        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
526        let _ = tree.delete(&mut pager, &key)?;
527        tree.insert(&mut pager, &key, &bytes)?;
528        descriptor.primary_root = tree.root().get();
529        Ok(())
530    }
531
532    // ---------- index-maintaining raw-bytes writes (FFI) ----------
533
534    /// **Engine API**: insert a raw-bytes document into `collection`
535    /// AND maintain the named secondary indexes from caller-supplied
536    /// field-encoded keys. Returns the freshly-allocated [`Id`].
537    ///
538    /// Unlike [`Self::insert_raw_bytes`] (primary-only), this is the
539    /// schema-bearing raw write: the C ABI cannot reflect index keys
540    /// out of an opaque payload, so the CALLER supplies one
541    /// `(index_name, field_key)` entry per index value, where
542    /// `field_key` is the order-preserving encoding of the indexed
543    /// field (produced by `obj_core::index::encode_field` —
544    /// `libobj::obj_index_key_encode` wraps it). obj does the
545    /// kind-specific STORAGE-key composition (append the `Id` suffix
546    /// for `Standard` / `Each` / `Composite`; use the field key as-is
547    /// with the `Id` as value + enforce uniqueness for `Unique`),
548    /// matching the typed path byte-for-byte.
549    ///
550    /// Atomicity: the primary insert + every index maintenance lands
551    /// inside the same WAL transaction. An unknown index name
552    /// ([`Error::IndexNotFound`]) or a uniqueness violation
553    /// ([`Error::UniqueConstraintViolation`]) surfaces via `?` and the
554    /// surrounding [`WriteTxn`] rolls back the staged primary write
555    /// atomically — no half-written index.
556    ///
557    /// # Errors
558    ///
559    /// - [`Error::IndexNotFound`] if an entry names an index that does
560    ///   not exist or is not `Active` on `collection`.
561    /// - [`Error::UniqueConstraintViolation`] if a `Unique` entry's
562    ///   key already maps to a different document.
563    /// - As [`Self::insert_with_version`] (namespaced / too-large /
564    ///   pager / catalog).
565    pub fn insert_raw_indexed(
566        &mut self,
567        collection: &str,
568        payload: &[u8],
569        type_version: u32,
570        entries: &[(String, Vec<u8>)],
571    ) -> Result<Id> {
572        let id = self.insert_with_version(collection, payload, type_version)?;
573        self.maintain_raw_indexes(collection, id, &[], entries)?;
574        Ok(id)
575    }
576
577    /// **Engine API**: update the document at `id` in `collection` to
578    /// `payload` AND move its secondary-index entries from the OLD
579    /// caller-supplied field keys to the NEW ones.
580    ///
581    /// obj cannot re-derive the OLD index keys from the stored opaque
582    /// bytes, so the caller MUST supply BOTH the `remove` set (the
583    /// field keys the document indexed under before this update) and
584    /// the `add` set (the field keys it indexes under after). Each is
585    /// one `(index_name, field_key)` entry per index value. The
586    /// kind-specific composition + uniqueness enforcement matches
587    /// [`Self::insert_raw_indexed`].
588    ///
589    /// Atomicity: the primary update + every index removal/insertion
590    /// lands in the same WAL transaction; any error rolls the whole
591    /// thing back.
592    ///
593    /// # Errors
594    ///
595    /// - [`Error::CollectionNotFound`] if `id` does not exist.
596    /// - [`Error::IndexNotFound`] / [`Error::UniqueConstraintViolation`]
597    ///   as [`Self::insert_raw_indexed`].
598    /// - As [`Self::update_with_version`].
599    pub fn update_raw_indexed(
600        &mut self,
601        collection: &str,
602        id: Id,
603        payload: &[u8],
604        type_version: u32,
605        remove: &[(String, Vec<u8>)],
606        add: &[(String, Vec<u8>)],
607    ) -> Result<()> {
608        self.update_with_version(collection, id, payload, type_version)?;
609        self.maintain_raw_indexes(collection, id, remove, add)?;
610        Ok(())
611    }
612
613    /// **Engine API**: delete the document at `id` in `collection`
614    /// AND remove its secondary-index entries given the caller-
615    /// supplied OLD field keys. Returns `Ok(true)` if the primary
616    /// record existed, `Ok(false)` if not.
617    ///
618    /// As with [`Self::update_raw_indexed`], obj cannot re-derive the
619    /// index keys from stored bytes, so the caller supplies the
620    /// `remove` set (one `(index_name, field_key)` per indexed value).
621    /// The index removals always run (even on `Ok(false)`) so a caller
622    /// can repair a known-stale index entry; this mirrors the typed
623    /// `Collection::delete` which also diffs against the supplied OLD
624    /// key set regardless of primary presence.
625    ///
626    /// # Errors
627    ///
628    /// - [`Error::IndexNotFound`] if a `remove` entry names an unknown
629    ///   / non-`Active` index.
630    /// - As [`Self::delete_raw_bytes`].
631    pub fn delete_raw_indexed(
632        &mut self,
633        collection: &str,
634        id: Id,
635        remove: &[(String, Vec<u8>)],
636    ) -> Result<bool> {
637        let removed = self.delete_raw_bytes(collection, id)?;
638        self.maintain_raw_indexes(collection, id, remove, &[])?;
639        Ok(removed)
640    }
641
642    /// Apply the per-index removal (`old`) + addition (`new`) churn
643    /// for a raw-bytes write, composing the storage key per index
644    /// kind via the shared non-generic seam
645    /// [`crate::index_maint::maintain_index_from_keys`].
646    ///
647    /// Resolves each `(index_name, field_key)` entry to its `Active`
648    /// [`obj_core::IndexDescriptor`] by name, groups the OLD and NEW
649    /// field keys per index, and maintains every index touched by
650    /// either set. The (possibly COW-advanced) descriptor is persisted
651    /// back to the catalog once. Runs entirely under the pager +
652    /// catalog locks held for the whole call, inside the live WAL
653    /// transaction — so a mid-way error rolls back atomically.
654    fn maintain_raw_indexes(
655        &mut self,
656        collection: &str,
657        id: Id,
658        old: &[(String, Vec<u8>)],
659        new: &[(String, Vec<u8>)],
660    ) -> Result<()> {
661        if old.is_empty() && new.is_empty() {
662            return Ok(());
663        }
664        let mut pager = lock_pager(self.inner.env())?;
665        let catalog = lock_catalog(&self.catalog)?;
666        // #90: maintain against the per-txn cached descriptor (the
667        // sole mid-txn source of truth) so the unique pre-check sees
668        // prior eager index writes in this txn and the index-root
669        // advances accumulate in the cache. NO per-call
670        // `Catalog::update` — the single flush is deferred to commit.
671        let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
672        let descriptor =
673            crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
674        let touched = touched_index_names(old, new);
675        for index_name in &touched {
676            maintain_one_raw_index(&mut pager, descriptor, collection, index_name, old, new, id)?;
677        }
678        Ok(())
679    }
680
681    /// #90: does `collection` have a catalog row (either already in
682    /// the per-txn descriptor cache, or in the live catalog tree)?
683    /// Used by the raw update / delete paths to preserve their
684    /// `CollectionNotFound` contract before routing the descriptor
685    /// through the cache.
686    fn collection_exists(&self, collection: &str) -> Result<bool> {
687        {
688            let cache = crate::collection::lock_descriptors(&self.descriptors)?;
689            if cache.contains_key(collection) {
690                return Ok(true);
691            }
692        }
693        let mut pager = lock_pager(self.inner.env())?;
694        let catalog = lock_catalog(&self.catalog)?;
695        Ok(catalog.get(&mut pager, collection)?.is_some())
696    }
697}
698
/// Public read transaction.  Acquired by
/// [`crate::Db::read_transaction`].
///
/// Carries only the obj-core read-side handle — the writer's live
/// `Catalog` is NOT held by a `ReadTxn` because reads consult the
/// snapshot-pinned catalog root via
/// [`obj_core::Catalog::lookup_via_snapshot`] (M6 #53), not the
/// live `Catalog.tree.root`.
///
/// M11 #93: a `ReadTxn` MAY also carry one `AttachedReadCtx` per
/// attached database registered on the calling [`crate::Db`]. The
/// per-attached snapshots are pinned at txn-begin time and released
/// when the `ReadTxn` drops; reads against `<namespace>.<collection>`
/// route through them.
pub struct ReadTxn<'db> {
    /// Wrapped obj-core read txn. Every read on this type obtains its
    /// env (`inner.env()`) and pinned snapshot (`inner.snapshot()`)
    /// from here.
    pub(crate) inner: obj_core::ReadTxn<'db, FileHandle>,
    /// Per-attached-database read contexts, keyed by namespace.
    /// Populated by [`crate::Db::read_transaction`] before the
    /// closure runs; emptied when the txn drops. Empty for a txn
    /// built through [`ReadTxn::from_parts`].
    pub(crate) attached: HashMap<String, AttachedReadCtx>,
}
720
721impl<'db> ReadTxn<'db> {
722    /// Construct a `ReadTxn` from a bare obj-core handle. Public so
723    /// the FFI layer can build an owned read txn whose lifetime
724    /// extends past a single `Db::read_transaction` closure call.
725    ///
726    /// User-Rust callers should reach for `Db::read_transaction`.
727    #[doc(hidden)]
728    #[must_use]
729    pub fn from_parts(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
730        Self {
731            inner,
732            attached: HashMap::new(),
733        }
734    }
735
736    pub(crate) fn new(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
737        Self::from_parts(inner)
738    }
739
740    pub(crate) fn with_attached(
741        inner: obj_core::ReadTxn<'db, FileHandle>,
742        attached: HashMap<String, AttachedReadCtx>,
743    ) -> Self {
744        Self { inner, attached }
745    }
746
747    /// **FFI shim**: fetch the raw payload of the document at `id`
748    /// in `collection`, snapshot-consistent against the read txn's
749    /// pinned LSN. Returns `Ok(None)` if absent.
750    ///
751    /// Forwards to [`Self::get_with_version`] and discards the
752    /// stored version.
753    ///
754    /// # Errors
755    ///
756    /// - [`Error::CollectionNotFound`] if the collection is unknown.
757    /// - [`Error::Corruption`] if the on-disk record is malformed.
758    /// - Pager / catalog errors propagated.
759    #[doc(hidden)]
760    pub fn get_raw_bytes(&self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
761        Ok(self
762            .get_with_version(collection, id)?
763            .map(|(payload, _version)| payload))
764    }
765
766    /// **Engine API**: fetch the raw payload AND stored
767    /// `type_version` of the document at `id` in `collection`,
768    /// snapshot-consistent against the read txn's pinned LSN.
769    /// Returns `Ok(None)` if absent.
770    ///
771    /// Companion read accessor for the version-aware write path
772    /// ([`WriteTxn::insert_with_version`]) — used by obj-py's
773    /// typed read pipeline to dispatch on the stored header
774    /// version instead of the historical try-decode-walk heuristic.
775    ///
776    /// # Errors
777    ///
778    /// As [`Self::get_raw_bytes`].
779    #[doc(hidden)]
780    pub fn get_with_version(&self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
781        let descriptor =
782            read_descriptor_via_snapshot(self.inner.env(), self.inner.snapshot(), collection)?
783                .ok_or_else(|| Error::CollectionNotFound {
784                    name: collection.to_owned(),
785                })?;
786        let pager = lock_pager(self.inner.env())?;
787        let root = PageId::new(descriptor.primary_root)
788            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
789        let key = id.to_be_bytes();
790        let bytes = obj_core::btree::BTree::<FileHandle>::get_via_snapshot(
791            &pager,
792            self.inner.snapshot(),
793            root,
794            &key,
795        )?;
796        match bytes {
797            Some(b) => Ok(Some(strip_raw_payload_with_version(
798                &b,
799                descriptor.collection_id,
800            )?)),
801            None => Ok(None),
802        }
803    }
804
805    /// **FFI shim**: look up the descriptor for `collection`
806    /// against the snapshot. Returns `Ok(None)` if absent.
807    ///
808    /// Used by [`libobj`](../../libobj/index.html) for query /
809    /// iteration entry points.
810    ///
811    /// # Errors
812    ///
813    /// - Pager / catalog errors propagated.
814    #[doc(hidden)]
815    pub fn snapshot_descriptor(&self, collection: &str) -> Result<Option<CollectionDescriptor>> {
816        read_descriptor_via_snapshot(self.inner.env(), self.inner.snapshot(), collection)
817    }
818
819    /// **FFI shim**: borrow the wrapped obj-core read txn. Used by
820    /// [`libobj`](../../libobj/index.html) iterators that need
821    /// snapshot-aware B-tree access.
822    #[doc(hidden)]
823    #[must_use]
824    pub fn inner(&self) -> &obj_core::ReadTxn<'db, FileHandle> {
825        &self.inner
826    }
827
828    /// **FFI shim**: resolve an `Active` index descriptor by name
829    /// on `collection`. Used by libobj's range / find_unique /
830    /// count paths.
831    ///
832    /// # Errors
833    ///
834    /// - [`Error::CollectionNotFound`] if the collection is absent.
835    /// - [`Error::IndexNotFound`] if the index is unknown or
836    ///   `DroppedPending`.
837    /// - Pager / catalog errors propagated.
838    #[doc(hidden)]
839    pub fn snapshot_index_descriptor(
840        &self,
841        collection: &str,
842        index: &str,
843    ) -> Result<obj_core::IndexDescriptor> {
844        let descriptor =
845            self.snapshot_descriptor(collection)?
846                .ok_or_else(|| Error::CollectionNotFound {
847                    name: collection.to_owned(),
848                })?;
849        let entry = descriptor.indexes.iter().find(|d| d.name == index);
850        match entry {
851            Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
852            _ => Err(Error::IndexNotFound {
853                collection: collection.to_owned(),
854                name: index.to_owned(),
855            }),
856        }
857    }
858
859    /// **FFI shim**: count every doc in `collection` snapshot-
860    /// consistently against the read txn's pinned LSN.
861    ///
862    /// # Errors
863    ///
864    /// As [`Self::snapshot_descriptor`] plus pager / B-tree.
865    #[doc(hidden)]
866    pub fn count_all_raw(&self, collection: &str) -> Result<u64> {
867        let descriptor =
868            self.snapshot_descriptor(collection)?
869                .ok_or_else(|| Error::CollectionNotFound {
870                    name: collection.to_owned(),
871                })?;
872        count_via_btree_range_full(
873            self.inner.env(),
874            self.inner.snapshot(),
875            descriptor.primary_root,
876        )
877    }
878
879    /// **FFI shim**: walk an index B-tree by raw-byte key range
880    /// and collect the matching `(Id, raw_payload)` pairs. The
881    /// caller is responsible for encoding `lower` / `upper` per the
882    /// M7 order-preserving encoding.
883    ///
884    /// `lower_bound` / `upper_bound` use Rust's `std::ops::Bound`
885    /// shape (Included / Excluded / Unbounded).
886    ///
887    /// Materialises every result in a `Vec` — the result set is
888    /// bounded by [`obj_core::btree::MAX_RANGE_NODES`] inherited
889    /// from `BTree::range`. The libobj iterator yields these one
890    /// at a time.
891    ///
892    /// # Errors
893    ///
894    /// - [`Error::IndexNotFound`] / [`Error::CollectionNotFound`].
895    /// - Pager / B-tree errors propagated.
896    #[doc(hidden)]
897    pub fn index_range_raw(
898        &self,
899        collection: &str,
900        index: &str,
901        lower: std::ops::Bound<Vec<u8>>,
902        upper: std::ops::Bound<Vec<u8>>,
903    ) -> Result<Vec<(Id, Vec<u8>)>> {
904        let index_descriptor = self.snapshot_index_descriptor(collection, index)?;
905        let collection_descriptor =
906            self.snapshot_descriptor(collection)?
907                .ok_or_else(|| Error::CollectionNotFound {
908                    name: collection.to_owned(),
909                })?;
910        let (start, end) =
911            crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
912        let entries = collect_index_range_entries(
913            self.inner.env(),
914            self.inner.snapshot(),
915            index_descriptor.root_page_id,
916            start,
917            end,
918        )?;
919        materialize_id_payload_pairs(
920            self.inner.env(),
921            self.inner.snapshot(),
922            &collection_descriptor,
923            &index_descriptor,
924            entries,
925        )
926    }
927
928    /// **FFI shim**: count index B-tree entries inside `range`.
929    /// `lower` / `upper` follow the same raw-byte convention as
930    /// [`Self::index_range_raw`].
931    ///
932    /// # Errors
933    ///
934    /// As [`Self::index_range_raw`].
935    #[doc(hidden)]
936    pub fn count_index_range_raw(
937        &self,
938        collection: &str,
939        index: &str,
940        lower: std::ops::Bound<Vec<u8>>,
941        upper: std::ops::Bound<Vec<u8>>,
942    ) -> Result<u64> {
943        let index_descriptor = self.snapshot_index_descriptor(collection, index)?;
944        let (start, end) =
945            crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
946        let entries = collect_index_range_entries(
947            self.inner.env(),
948            self.inner.snapshot(),
949            index_descriptor.root_page_id,
950            start,
951            end,
952        )?;
953        u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
954            reason: "index range entry count exceeds u64",
955        })
956    }
957
958    /// **FFI shim**: single-key lookup against a `Unique` index.
959    /// Returns the matched `(Id, payload)` or `Ok(None)`.
960    ///
961    /// `key_bytes` is the index key, pre-encoded by the caller per
962    /// the M7 order-preserving scheme.
963    ///
964    /// Forwards to [`Self::find_unique_with_version`] and discards
965    /// the stored version.
966    ///
967    /// # Errors
968    ///
969    /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
970    /// - As [`Self::index_range_raw`].
971    #[doc(hidden)]
972    pub fn find_unique_raw(
973        &self,
974        collection: &str,
975        index: &str,
976        key_bytes: &[u8],
977    ) -> Result<Option<(Id, Vec<u8>)>> {
978        Ok(self
979            .find_unique_with_version(collection, index, key_bytes)?
980            .map(|(id, payload, _version)| (id, payload)))
981    }
982
983    /// **Engine API**: single-key lookup against a `Unique` index,
984    /// returning the matched `(Id, payload, type_version)` or
985    /// `Ok(None)`. Companion to [`Self::get_with_version`] for the
986    /// typed find path.
987    ///
988    /// # Errors
989    ///
990    /// As [`Self::find_unique_raw`].
991    #[doc(hidden)]
992    pub fn find_unique_with_version(
993        &self,
994        collection: &str,
995        index: &str,
996        key_bytes: &[u8],
997    ) -> Result<Option<(Id, Vec<u8>, u32)>> {
998        let index_descriptor = self.snapshot_index_descriptor(collection, index)?;
999        if index_descriptor.kind != obj_core::IndexKind::Unique {
1000            return Err(Error::IndexNotUnique {
1001                collection: collection.to_owned(),
1002                name: index.to_owned(),
1003            });
1004        }
1005        let id_bytes = {
1006            let pager = lock_pager(self.inner.env())?;
1007            let root = PageId::new(index_descriptor.root_page_id)
1008                .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1009            BTree::<FileHandle>::get_via_snapshot(&pager, self.inner.snapshot(), root, key_bytes)?
1010        };
1011        match id_bytes {
1012            Some(bytes) => {
1013                let id = Id::from_be_bytes(&bytes).ok_or(Error::Corruption { page_id: 0 })?;
1014                match self.get_with_version(collection, id)? {
1015                    Some((payload, version)) => Ok(Some((id, payload, version))),
1016                    None => Err(Error::Corruption { page_id: 0 }),
1017                }
1018            }
1019            None => Ok(None),
1020        }
1021    }
1022
1023    /// Open a typed handle to the collection `T` lives in.
1024    ///
1025    /// Read-only: returns [`Error::CollectionNotFound`] if the
1026    /// collection has never been registered AT THE SNAPSHOT'S
1027    /// PINNED LSN.
1028    ///
1029    /// If `T::COLLECTION` is of the form `<namespace>.<name>`, the
1030    /// txn dispatches against the attached database registered under
1031    /// `<namespace>` instead of the calling Db.
1032    ///
1033    /// # Errors
1034    ///
1035    /// - [`Error::CollectionNotFound`] if `T::COLLECTION` is not
1036    ///   registered in the catalog as-of the snapshot's pinned LSN.
1037    /// - [`Error::CollectionNamespaceUnknown`] if `T::COLLECTION`
1038    ///   carries a namespace prefix that is not attached.
1039    /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
1040    pub fn collection<T: Document>(&self) -> Result<Collection<'_, T>> {
1041        Collection::open_readonly(self)
1042    }
1043}
1044
/// Per-attached-database read context carried inside a [`ReadTxn`].
/// Pins one [`ReaderSnapshot`] against the attached database for the
/// duration of the calling Db's read transaction.
///
/// Stored in `ReadTxn::attached`, keyed by the attachment's namespace.
pub(crate) struct AttachedReadCtx {
    /// Calling-side stable reference to the attached env. Cloned
    /// from the [`AttachedDb`]'s `env` at txn-begin time so the
    /// `ReadTxn` does not retain a borrow on the calling Db's
    /// attached-registry mutex.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// Snapshot pinned at txn-begin against `env`.
    pub(crate) snapshot: ReaderSnapshot<FileHandle>,
}
1057
/// One attached read-only database registered on the calling
/// [`crate::Db`] under a namespace.
///
/// Created by [`crate::Db::attach`]; stored inside the calling Db's
/// `attached: Arc<Mutex<HashMap<String, AttachedDb>>>` registry.
/// Removed by [`crate::Db::detach`] (or by the calling Db's drop,
/// which transitively drops all attachments).
///
/// NOTE(review): fields drop in declaration order (`env`, then `_db`);
/// keep that order unless the attached Db's teardown is confirmed not
/// to depend on it.
pub(crate) struct AttachedDb {
    /// Cloned `Arc<TxnEnv>` of the attached db, so read transactions
    /// pin snapshots without re-locking the registry.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// Calling-side keepalive for the attached `crate::Db`. The
    /// underscore prefix marks it as held-for-side-effect: dropping
    /// this `_db` releases file locks and any other resources the
    /// attached open acquired.
    pub(crate) _db: crate::Db,
}
1075
1076/// #93 — fold a committed txn's staged reconciled-collection names
1077/// into the shared per-process `reconciled` set. Called by
1078/// [`WriteTxn::commit`] only AFTER the WAL commit has landed, so the
1079/// shared cache is never poisoned by a rolled-back lazy-create.
1080///
1081/// A no-op (no lock acquired) when nothing was staged — the common
1082/// path for a txn that opened only already-reconciled collections, so
1083/// the fast "already reconciled" path pays nothing extra.
1084///
1085/// A poisoned `reconciled` mutex maps to [`Error::Busy`] (Rule 7); the
1086/// loop is bounded by the staged-name count (one entry per distinct
1087/// collection lazily reconciled in the txn — Rule 2).
1088fn promote_reconciled(reconciled: &Mutex<HashSet<String>>, staged: HashSet<String>) -> Result<()> {
1089    if staged.is_empty() {
1090        return Ok(());
1091    }
1092    let mut shared = reconciled.lock().map_err(|_| Error::Busy {
1093        kind: obj_core::LockKind::WriterInProcess,
1094    })?;
1095    shared.extend(staged);
1096    Ok(())
1097}
1098
1099/// Acquire the catalog mutex; convert a poison error into a
1100/// `WriterInProcess` Busy.  Helper shared by the public txn wrappers
1101/// and the [`Collection`] internals.
1102pub(crate) fn lock_catalog(
1103    catalog: &Mutex<Catalog<FileHandle>>,
1104) -> Result<std::sync::MutexGuard<'_, Catalog<FileHandle>>> {
1105    catalog.lock().map_err(|_| Error::Busy {
1106        kind: obj_core::LockKind::WriterInProcess,
1107    })
1108}
1109
1110// ---------- raw-bytes helpers (FFI) -------------------------------
1111
1112/// Reject a write against a namespaced collection. Attached
1113/// databases are read-only through the calling Db (M11 #93).
1114fn reject_namespaced_write(collection: &str) -> Result<()> {
1115    if let (Some(namespace), tail) = crate::db::split_namespace(collection) {
1116        return Err(Error::AttachedDatabaseIsReadOnly {
1117            namespace: namespace.to_owned(),
1118            collection: tail.to_owned(),
1119        });
1120    }
1121    Ok(())
1122}
1123
1124/// Acquire the env's pager mutex; map poison into Busy.
1125fn lock_pager(env: &TxnEnv<FileHandle>) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1126    env.pager().lock().map_err(|_| Error::Busy {
1127        kind: obj_core::LockKind::WriterInProcess,
1128    })
1129}
1130
/// Return each distinct index name mentioned by `old` (removals) or
/// `new` (additions), in first-seen order with `old` scanned before
/// `new`. Work is bounded by `old.len() + new.len()` — the caller's
/// supplied entry count (Rule 2).
fn touched_index_names(old: &[(String, Vec<u8>)], new: &[(String, Vec<u8>)]) -> Vec<String> {
    let mut seen: Vec<String> = Vec::new();
    for (index_name, _) in old.iter().chain(new) {
        if seen.contains(index_name) {
            continue;
        }
        seen.push(index_name.clone());
    }
    seen
}
1144
1145/// Gather every field-encoded key in `entries` whose index name
1146/// equals `index_name`, wrapped as [`EncodedIndexKey`] for the
1147/// composition seam. Order follows the caller's entry order.
1148fn keys_for_index(entries: &[(String, Vec<u8>)], index_name: &str) -> Vec<EncodedIndexKey> {
1149    entries
1150        .iter()
1151        .filter(|(name, _key)| name == index_name)
1152        .map(|(_name, key)| EncodedIndexKey::from_bytes(key.clone()))
1153        .collect()
1154}
1155
1156/// Resolve `index_name` to its `Active` descriptor index on
1157/// `descriptor`, then maintain that one index B-tree by diffing the
1158/// `old` field keys against the `new` ones through the shared
1159/// non-generic composition seam
1160/// ([`crate::index_maint::maintain_index_from_keys`]).
1161///
1162/// An unknown or non-`Active` index name is [`Error::IndexNotFound`]
1163/// — the raw write refuses rather than silently dropping the entry.
1164fn maintain_one_raw_index(
1165    pager: &mut Pager<FileHandle>,
1166    descriptor: &mut CollectionDescriptor,
1167    collection: &str,
1168    index_name: &str,
1169    old: &[(String, Vec<u8>)],
1170    new: &[(String, Vec<u8>)],
1171    id: Id,
1172) -> Result<()> {
1173    let idx = descriptor
1174        .indexes
1175        .iter()
1176        .position(|d| d.name == index_name && d.status == IndexStatus::Active)
1177        .ok_or_else(|| Error::IndexNotFound {
1178            collection: collection.to_owned(),
1179            name: index_name.to_owned(),
1180        })?;
1181    let spec = crate::index_maint::descriptor_to_spec(&descriptor.indexes[idx])?;
1182    let old_keys = keys_for_index(old, index_name);
1183    let new_keys = keys_for_index(new, index_name);
1184    crate::index_maint::maintain_index_from_keys(
1185        pager, descriptor, idx, collection, &spec, &old_keys, &new_keys, id,
1186    )
1187}
1188
1189/// Ensure a (raw-bytes) collection exists, lazy-creating an empty
1190/// primary B-tree on first call. Used by the C ABI's insert /
1191/// upsert paths. Distinct from `Collection::open_or_create` because
1192/// raw-bytes writes do NOT participate in the typed-reconciliation
1193/// path — no `T::indexes()` to walk.
1194fn ensure_collection_raw(
1195    inner: &obj_core::WriteTxn<'_, FileHandle>,
1196    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1197    name: &str,
1198) -> Result<CollectionDescriptor> {
1199    let mut pager = inner.lock_pager()?;
1200    let mut catalog_guard = lock_catalog(catalog)?;
1201    if let Some(d) = catalog_guard.get(&mut pager, name)? {
1202        return Ok(d);
1203    }
1204    let tree = BTree::<FileHandle>::empty(&mut pager)?;
1205    let descriptor = CollectionDescriptor::new(0, tree.root().get(), RAW_BYTES_TYPE_VERSION);
1206    let _id = catalog_guard.insert(&mut pager, name, descriptor)?;
1207    catalog_guard
1208        .get(&mut pager, name)?
1209        .ok_or(Error::Corruption { page_id: 0 })
1210}
1211
1212/// Look up the descriptor for `name`, returning
1213/// `Err(CollectionNotFound)` if absent. Used by `update` / `delete`
1214/// where lazy-create would mask the caller's mistake.
1215fn catalog_get_required(
1216    inner: &obj_core::WriteTxn<'_, FileHandle>,
1217    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1218    name: &str,
1219) -> Result<CollectionDescriptor> {
1220    let mut pager = inner.lock_pager()?;
1221    let catalog_guard = lock_catalog(catalog)?;
1222    catalog_guard
1223        .get(&mut pager, name)?
1224        .ok_or_else(|| Error::CollectionNotFound {
1225            name: name.to_owned(),
1226        })
1227}
1228
1229/// Snapshot-aware descriptor lookup on the read side. Returns
1230/// `Ok(None)` when the collection is absent at the snapshot's
1231/// pinned LSN.
1232fn read_descriptor_via_snapshot(
1233    env: &TxnEnv<FileHandle>,
1234    snapshot: &ReaderSnapshot<FileHandle>,
1235    name: &str,
1236) -> Result<Option<CollectionDescriptor>> {
1237    let pager = lock_pager(env)?;
1238    Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1239}
1240
1241/// Open a primary-tree handle from a descriptor's `primary_root`.
1242fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1243    let root_pid =
1244        PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1245    BTree::<FileHandle>::open(pager, root_pid)
1246}
1247
1248/// Wrap a raw payload with the on-disk [`DocumentHeader`] carrying
1249/// the caller-supplied `type_version`. The header stamps
1250/// `collection_id` (so a cross-collection forgery is detectable on
1251/// read), `type_version`, `payload_len`, and a CRC32C of the
1252/// payload.
1253///
1254/// The on-disk bytes produced here are byte-identical to what
1255/// [`obj_core::codec::encode`] emits for the same logical payload +
1256/// `type_version` — this is the key plumbing point that closes the
1257/// cross-language header-level interop gap (#13).
1258///
1259/// Returns [`Error::DocumentTooLarge`] if `payload.len() + 16` would
1260/// not fit inline in a B-tree leaf — mirrors
1261/// [`obj_core::codec::encode`]'s overflow handling.
1262fn wrap_raw_payload_with_version(
1263    collection_id: u32,
1264    payload: &[u8],
1265    type_version: u32,
1266) -> Result<Vec<u8>> {
1267    let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
1268        len: payload.len(),
1269        max: MAX_INLINE_DOC,
1270    })?;
1271    let total = DOC_HEADER_SIZE
1272        .checked_add(payload.len())
1273        .ok_or(Error::DocumentTooLarge {
1274            len: usize::MAX,
1275            max: MAX_INLINE_DOC,
1276        })?;
1277    if total > MAX_INLINE_DOC {
1278        return Err(Error::DocumentTooLarge {
1279            len: total,
1280            max: MAX_INLINE_DOC,
1281        });
1282    }
1283    let header = DocumentHeader {
1284        collection_id,
1285        type_version,
1286        payload_len,
1287        payload_crc32c: crc32c(payload),
1288    };
1289    let mut out = Vec::with_capacity(total);
1290    header.write_to(&mut out);
1291    out.extend_from_slice(payload);
1292    Ok(out)
1293}
1294
1295/// Strip the per-doc header and return `(payload, type_version)`.
1296/// Validates the header's `collection_id`, total length, and
1297/// payload CRC32C — surfaces [`Error::Corruption`] /
1298/// [`Error::CollectionIdMismatch`] on any mismatch. Exposes the
1299/// stored `type_version` so the typed read path can dispatch on
1300/// it directly.
1301fn strip_raw_payload_with_version(
1302    bytes: &[u8],
1303    expected_collection_id: u32,
1304) -> Result<(Vec<u8>, u32)> {
1305    let header = DocumentHeader::read_from(bytes)?;
1306    if header.collection_id != expected_collection_id {
1307        return Err(Error::CollectionIdMismatch {
1308            expected: expected_collection_id,
1309            found: header.collection_id,
1310        });
1311    }
1312    let payload_len =
1313        usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
1314    let total = DOC_HEADER_SIZE
1315        .checked_add(payload_len)
1316        .ok_or(Error::Corruption { page_id: 0 })?;
1317    if bytes.len() != total {
1318        return Err(Error::Corruption { page_id: 0 });
1319    }
1320    let payload = &bytes[DOC_HEADER_SIZE..total];
1321    if crc32c(payload) != header.payload_crc32c {
1322        return Err(Error::Corruption { page_id: 0 });
1323    }
1324    Ok((payload.to_vec(), header.type_version))
1325}
1326
1327/// Count every entry in a primary B-tree without decoding the
1328/// records. Used by the FFI `count_all_raw` path.
1329///
1330/// Snapshot-pinned (M12 #12): the full-tree scan resolves every page
1331/// read as-of the read txn's `snapshot`, so a concurrent writer's
1332/// post-snapshot inserts/deletes cannot perturb the count — it stays
1333/// consistent with the read txn's pinned LSN.
1334fn count_via_btree_range_full(
1335    env: &TxnEnv<FileHandle>,
1336    snapshot: &ReaderSnapshot<FileHandle>,
1337    primary_root: u64,
1338) -> Result<u64> {
1339    let pager = lock_pager(env)?;
1340    let root = PageId::new(primary_root)
1341        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1342    let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, ..)?;
1343    let mut n: u64 = 0;
1344    for step in iter {
1345        let _ = step?;
1346        n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1347            reason: "primary tree entry count exceeds u64",
1348        })?;
1349    }
1350    Ok(n)
1351}
1352
1353/// Walk an index B-tree by raw-byte key range and return every
1354/// (`full_key`, `value_bytes`) entry inside the range. Used by the
1355/// FFI `index_range_raw` / `count_index_range_raw` paths.
1356///
1357/// Snapshot-pinned (M12 #12): the descent and leaf-scan resolve every
1358/// page read as-of the read txn's `snapshot`, so a concurrent writer's
1359/// post-snapshot index entries do not leak into the range/count — the
1360/// enumeration stays consistent with the read txn's pinned LSN.
1361fn collect_index_range_entries(
1362    env: &TxnEnv<FileHandle>,
1363    snapshot: &ReaderSnapshot<FileHandle>,
1364    index_root: u64,
1365    start: std::ops::Bound<Vec<u8>>,
1366    end: std::ops::Bound<Vec<u8>>,
1367) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1368    let pager = lock_pager(env)?;
1369    let root =
1370        PageId::new(index_root).ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1371    let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, (start, end))?;
1372    let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1373    for step in iter {
1374        out.push(step?);
1375    }
1376    Ok(out)
1377}
1378
1379/// Resolve a list of (`full_key`, `value_bytes`) index entries into
1380/// `(Id, raw_payload)` pairs. For `Unique` indexes the id is the
1381/// VALUE; for other kinds it is the trailing 8 bytes of the key
1382/// (see `docs/format.md` § Index key encoding).
1383///
1384/// Snapshot-aware: each primary lookup goes through the read
1385/// txn's pinned snapshot so the result set is consistent across
1386/// the call.
1387fn materialize_id_payload_pairs(
1388    env: &TxnEnv<FileHandle>,
1389    snapshot: &ReaderSnapshot<FileHandle>,
1390    collection: &CollectionDescriptor,
1391    index: &obj_core::IndexDescriptor,
1392    entries: Vec<(Vec<u8>, Vec<u8>)>,
1393) -> Result<Vec<(Id, Vec<u8>)>> {
1394    let mut out: Vec<(Id, Vec<u8>)> = Vec::with_capacity(entries.len());
1395    let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
1396    let primary_root = PageId::new(collection.primary_root)
1397        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1398    let pager = lock_pager(env)?;
1399    for (full_key, value) in entries {
1400        let id_u64 = if index.kind == obj_core::IndexKind::Unique {
1401            Id::from_be_bytes(&value)
1402                .ok_or(Error::Corruption { page_id: 0 })?
1403                .get()
1404        } else {
1405            if full_key.len() < 8 {
1406                return Err(Error::Corruption { page_id: 0 });
1407            }
1408            Id::from_be_bytes(&full_key[full_key.len() - 8..])
1409                .ok_or(Error::Corruption { page_id: 0 })?
1410                .get()
1411        };
1412        if index.kind == obj_core::IndexKind::Each && !emitted.insert(id_u64) {
1413            continue;
1414        }
1415        if index.kind != obj_core::IndexKind::Each {
1416            emitted.insert(id_u64);
1417        }
1418        let id = Id::try_new(id_u64).ok_or(Error::Corruption { page_id: 0 })?;
1419        let primary_bytes = BTree::<FileHandle>::get_via_snapshot(
1420            &pager,
1421            snapshot,
1422            primary_root,
1423            &id.to_be_bytes(),
1424        )?
1425        .ok_or(Error::Corruption { page_id: 0 })?;
1426        let (payload, _version) =
1427            strip_raw_payload_with_version(&primary_bytes, collection.collection_id)?;
1428        out.push((id, payload));
1429    }
1430    Ok(out)
1431}