Skip to main content

obj/
collection.rs

1//! `Collection<T>` — typed handle to one collection's primary
2//! B-tree.
3
4use std::borrow::Cow;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::marker::PhantomData;
7use std::ops::Bound;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use obj_core::btree::BTree;
11use obj_core::codec::{decode, encode};
12use obj_core::pager::page::PageId;
13use obj_core::pager::Pager;
14use obj_core::platform::FileHandle;
15use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result};
16
/// Boxed, `Send` iterator of fallible items, returned by
/// [`Collection::lookup`] and [`Collection::index_range`].
///
/// `'a` bounds whatever the boxed iterator borrows. Both of those
/// methods return `IndexIter<'static, _>`: they materialise their
/// results into owned storage before boxing, so the returned iterator
/// does NOT borrow the transaction or the `Collection` handle it came
/// from.
pub type IndexIter<'a, Item> = Box<dyn Iterator<Item = Result<Item>> + Send + 'a>;
22
/// Number of index B-tree entries [`IterIndexRange`] stages per refill.
///
/// The iterator hands out one `(user_key, T)` pair per `next()`, but it
/// pulls index entries in chunks of this size. That spreads the cost of
/// taking the pager lock across many calls.
///
/// Power-of-ten Rule 3: the staging buffer has a fixed size that does
/// NOT grow with the range's total size. Assuming ~8-byte user keys
/// plus the trailing 8-byte id suffix, a full batch is about
/// 256 × 16 B ≈ 4 KiB.
const ITER_INDEX_RANGE_BATCH: usize = 1 << 8;
31
/// Upper bound on the `HashSet<Id>` that
/// [`Collection::count_distinct_ids_in_range`] builds while counting
/// unique document `Id`s under an `Each` index.
///
/// Power-of-ten Rule 3: the distinct set's allocation is capped.
/// Crossing the cap yields [`obj_core::Error::DistinctCountExceeded`]
/// instead of letting memory grow without limit. A caller that hits it
/// can narrow the scan with `.index_range(...)` until it fits the
/// budget.
pub const MAX_DISTINCT_IDS: usize = 100 * 1_000;
40
41use crate::txn::{lock_catalog, ReadTxn, WriteTxn};
42
/// #90 — per-transaction descriptor cache.
///
/// Maps a collection name to that collection's LIVE
/// [`CollectionDescriptor`]. Its `next_id`, `primary_root` and every
/// index `root_page_id` are advanced IN MEMORY as the transaction
/// writes.
///
/// This is the single mid-txn source of truth for those values;
/// [`crate::WriteTxn::commit`] flushes each entry back to the catalog
/// exactly once. The `Arc` is shared by the [`WriteTxn`] and every
/// [`Collection`] handle opened on it, so two handles of the same
/// collection observe the same advancing descriptor. Lock it through
/// [`lock_descriptors`] (poison maps to `Error::Busy`).
pub(crate) type DescriptorCache = Arc<Mutex<HashMap<String, CollectionDescriptor>>>;
52
53/// Construct an empty [`DescriptorCache`].
54#[must_use]
55pub(crate) fn new_descriptor_cache() -> DescriptorCache {
56    Arc::new(Mutex::new(HashMap::new()))
57}
58
59/// Acquire the descriptor-cache mutex; map poison into `Busy` (Rule
60/// 7 — no panic on a poisoned lock).
61pub(crate) fn lock_descriptors(
62    cache: &Mutex<HashMap<String, CollectionDescriptor>>,
63) -> Result<MutexGuard<'_, HashMap<String, CollectionDescriptor>>> {
64    cache.lock().map_err(|_| Error::Busy {
65        kind: obj_core::LockKind::WriterInProcess,
66    })
67}
68
69/// Resolve the live cached descriptor for `name`, lazily loading it
70/// from the catalog B-tree on first touch in this transaction. After
71/// the first load the catalog tree is NEVER re-read mid-txn for this
72/// collection — every subsequent read/advance goes through the cache
73/// entry, so the unique pre-check and every index-tree open observe
74/// the in-memory-advanced roots (#90 load-bearing invariant).
75///
76/// Returns a mutable borrow into the cache so callers bump `next_id`,
77/// advance `primary_root`, and let `apply_doc_change` advance index
78/// roots in place — all without a per-doc `Catalog::update`.
79pub(crate) fn cached_descriptor_mut<'g>(
80    cache: &'g mut HashMap<String, CollectionDescriptor>,
81    pager: &mut Pager<FileHandle>,
82    catalog: &Catalog<FileHandle>,
83    name: &str,
84) -> Result<&'g mut CollectionDescriptor> {
85    if !cache.contains_key(name) {
86        let descriptor = catalog_get_required(pager, catalog, name)?;
87        cache.insert(name.to_owned(), descriptor);
88    }
89    cache.get_mut(name).ok_or(Error::Corruption { page_id: 0 })
90}
91
/// Typed handle to one collection.
///
/// Obtain it in one of three ways:
///
/// - [`crate::WriteTxn::collection`] creates the collection on first use.
/// - [`crate::ReadTxn::collection`] is read-only and errors if the
///   collection is absent.
/// - [`crate::Db::collection`] gives a one-shot read-only handle bound
///   to a runtime collection name (M11 #94 — Phase 1B).
///
/// Every method takes `&self`. Mutable state lives behind mutexes owned
/// by the parent transaction; the handle itself only carries its backing
/// reference, its collection name and the descriptor captured when it
/// was opened.
pub struct Collection<'tx, T: Document> {
    /// Which backing reference the handle holds:
    ///
    /// - `CollectionMode::Write` carries the [`WriteTxn`] state.
    /// - `CollectionMode::Read` carries the [`ReadTxn`] snapshot.
    /// - `CollectionMode::Lazy` carries a `&Db` and opens a private read
    ///   transaction on each method call.
    ///
    /// The handle's `'tx` lifetime is the borrow it was built from.
    mode: CollectionMode<'tx>,
    /// Collection name resolved at construction. For handles built
    /// via [`crate::WriteTxn::collection`] / [`crate::ReadTxn::collection`]
    /// this equals `T::COLLECTION`. For handles built via
    /// [`crate::Db::collection`] this is the caller-supplied runtime
    /// name (which may differ from `T::COLLECTION`, e.g.
    /// `"archive.orders"` against a type whose declared `COLLECTION`
    /// is `"orders"`).
    //
    // `collection_name` rather than the lint-suggested `name`
    // because `name` collides with `IndexDescriptor::name` /
    // `IndexSpec::name` throughout this module's internals.
    //
    // `Cow<'static, str>` (#84): the common `WriteTxn`/`ReadTxn`
    // open path resolves to `T::COLLECTION` — a `&'static str` —
    // so it can be stored as `Cow::Borrowed` with NO `to_owned()`
    // heap allocation per read. The runtime-name path
    // (`Db::collection`, `open_readonly_named` with a caller name)
    // stores `Cow::Owned`.
    #[allow(clippy::struct_field_names)]
    collection_name: Cow<'static, str>,
    /// Descriptor captured when the handle was built. It is never
    /// mutated through the handle, because every method takes `&self`.
    ///
    /// `Write` mode: this is the row re-read after index reconciliation
    /// in `open_or_create`. The mutating methods (`insert`, `update`,
    /// `delete`, `upsert`) do NOT use it. They resolve the live
    /// descriptor through the shared per-txn `DescriptorCache`
    /// (`cached_descriptor_mut`). Write-side reads prefer that cache's
    /// `primary_root` / index roots and fall back to this value only
    /// when the collection has no cache entry yet.
    ///
    /// `Read` mode: the catalog row as of the snapshot's pinned LSN.
    ///
    /// `Lazy` mode: a sentinel (`CollectionDescriptor::new(0, 0, 0)`).
    /// The real descriptor is loaded inside each method's private read
    /// transaction.
    descriptor: CollectionDescriptor,
    // Ties the handle to `T` without storing a `T`; a `fn() -> T`
    // marker does not make the handle's auto traits depend on `T`.
    _phantom: PhantomData<fn() -> T>,
}
143
/// Backing reference held by a [`Collection`]. The variant encodes
/// whether the underlying transaction is writable and where reads are
/// served from.
enum CollectionMode<'tx> {
    /// Borrowed state of a live write transaction. Reads observe the
    /// transaction's own uncommitted writes.
    Write(WriteRef<'tx>),
    /// Borrowed snapshot of a read transaction. Reads observe the
    /// frozen view pinned for that transaction.
    Read(ReadRef<'tx>),
    /// Read-only handle that opens a private read transaction on
    /// each method call. Constructed by [`crate::Db::collection`];
    /// the `'tx` lifetime is the borrow of `&Db`.
    Lazy(LazyRef<'tx>),
}
154
/// Backing for `CollectionMode::Lazy`: only the database borrow.
struct LazyRef<'db> {
    /// Borrowed `Db` — methods open one-shot read transactions on
    /// it. The `'db` lifetime keeps the borrow alive for as long
    /// as the [`Collection`] handle.
    db: &'db crate::Db,
}
161
/// Backing for `CollectionMode::Write`: the parts of a [`WriteTxn`] a
/// handle needs, either borrowed or `Arc`-shared with the transaction.
struct WriteRef<'tx> {
    /// Transaction environment; the pager is reached through it with
    /// `lock_pager`.
    env: &'tx obj_core::TxnEnv<FileHandle>,
    /// Catalog shared with the transaction; mutating methods lock it
    /// with `lock_catalog`.
    catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// #90: shared per-txn descriptor cache (cloned from the owning
    /// [`WriteTxn`]). The single mid-txn source of truth for
    /// `next_id` / `primary_root` / index roots; flushed once at
    /// commit.
    descriptors: DescriptorCache,
}
171
/// Backing for `CollectionMode::Read`.
///
/// For a namespaced (`<ns>.<name>`) collection, both references point
/// at the attached database's env and pinned snapshot, not the calling
/// `Db`'s (see `open_readonly_named`).
struct ReadRef<'tx> {
    /// Snapshot pinned for the read transaction. Primary reads descend
    /// the B-tree through it (`snapshot_get_via_btree`).
    snapshot: &'tx obj_core::ReaderSnapshot<FileHandle>,
    /// Environment paired with `snapshot`.
    env: &'tx obj_core::TxnEnv<FileHandle>,
}
176
177impl<'tx, T: Document> Collection<'tx, T> {
178    /// Open the collection on the write side, lazy-creating the
179    /// catalog row + an empty primary B-tree on first call, and
180    /// reconciling the type's declared `Document::indexes()` against
181    /// the catalog's stored descriptors (M7 #57) on the first call
182    /// per process per collection.
183    pub(crate) fn open_or_create(tx: &'tx mut WriteTxn<'_>) -> Result<Self> {
184        // Stage 1: lazy-create the collection row + an empty primary
185        // B-tree if this is the first call for `T` against this
186        // database. The returned descriptor is the pre-reconcile
187        // shape; we re-read after reconciliation below because the
188        // reconciler may have appended index descriptors.
189        let _initial = ensure_collection::<T>(&tx.inner, &tx.catalog)?;
190        reconcile_indexes_once::<T>(
191            &tx.inner,
192            &tx.catalog,
193            &tx.reconciled,
194            &mut tx.reconciled_staged,
195        )?;
196        // Re-read the descriptor in case the reconciler mutated the
197        // index roster (the reconciler runs through `Catalog::update`
198        // which rewrites the descriptor's `indexes` vector).
199        let descriptor = reread_descriptor::<T>(&tx.inner, &tx.catalog)?;
200        Ok(Self {
201            mode: CollectionMode::Write(WriteRef {
202                env: tx.inner.env(),
203                catalog: Arc::clone(&tx.catalog),
204                descriptors: Arc::clone(&tx.descriptors),
205            }),
206            // `T::COLLECTION` is `&'static str` (#84): borrow, don't
207            // allocate.
208            collection_name: Cow::Borrowed(T::COLLECTION),
209            descriptor,
210            _phantom: PhantomData,
211        })
212    }
213
214    /// Open the collection on the read side.  Errors if the
215    /// collection has not yet been registered AT THE SNAPSHOT'S
216    /// PINNED LSN — a collection created by a concurrent writer
217    /// AFTER the snapshot was pinned is invisible to this txn (M6
218    /// #53). The descriptor is read by walking the catalog B+tree
219    /// rooted at `snapshot.root_catalog()` (the value captured by
220    /// `Pager::reader_snapshot` at pin time) using the snapshot-
221    /// aware [`Catalog::lookup_via_snapshot`] free function — NOT
222    /// via the writer's live `Catalog.tree.root`, which a concurrent
223    /// writer may have COW-advanced past the snapshot.
224    ///
225    /// M11 #93: if `T::COLLECTION` is of the form
226    /// `<namespace>.<name>`, the lookup dispatches against the
227    /// attached database registered under `<namespace>` instead of
228    /// the calling Db. The attached snapshot is the one pinned by
229    /// `Db::read_transaction` at txn-begin; the attached env is
230    /// passed through the same way.
231    pub(crate) fn open_readonly(tx: &'tx ReadTxn<'_>) -> Result<Self> {
232        // `T::COLLECTION` is `&'static str` — store it as
233        // `Cow::Borrowed` (#84), avoiding the per-read `to_owned()`
234        // heap allocation on the common typed-handle path.
235        Self::open_readonly_named(tx, Cow::Borrowed(T::COLLECTION))
236    }
237
238    /// Open the collection on the read side against a caller-supplied
239    /// runtime `name`. Like [`Self::open_readonly`] but the catalog
240    /// lookup uses `name` instead of `T::COLLECTION` — required for
241    /// [`crate::Db::collection`]'s Phase 1B accessor (M11 #94), which
242    /// binds a runtime collection name (e.g. `"archive.orders"`) to
243    /// the typed handle.
244    ///
245    /// Namespace dispatch (`<ns>.<tail>` → attached database) follows
246    /// the same shape as [`Self::open_readonly`].
247    pub(crate) fn open_readonly_named(
248        tx: &'tx ReadTxn<'_>,
249        name: Cow<'static, str>,
250    ) -> Result<Self> {
251        let (namespace, tail) = crate::db::split_namespace(&name);
252        let (env, snapshot, lookup_name): (
253            &'tx obj_core::TxnEnv<FileHandle>,
254            &'tx obj_core::ReaderSnapshot<FileHandle>,
255            &str,
256        ) = match namespace {
257            None => (tx.inner.env(), tx.inner.snapshot(), &name),
258            Some(ns) => {
259                let ctx = tx
260                    .attached
261                    .get(ns)
262                    .ok_or_else(|| Error::CollectionNamespaceUnknown {
263                        namespace: ns.to_owned(),
264                    })?;
265                (ctx.env.as_ref(), &ctx.snapshot, tail)
266            }
267        };
268        let Some(descriptor) = read_descriptor_via_snapshot_named(env, snapshot, lookup_name)?
269        else {
270            return Err(Error::CollectionNotFound {
271                name: name.into_owned(),
272            });
273        };
274        Ok(Self {
275            mode: CollectionMode::Read(ReadRef { snapshot, env }),
276            // `name` is moved in unchanged — `Cow::Borrowed(&'static)`
277            // for the typed-handle path (no alloc), `Cow::Owned` for
278            // runtime-name handles.
279            collection_name: name,
280            descriptor,
281            _phantom: PhantomData,
282        })
283    }
284
285    /// Construct a deferred-lookup, read-only handle bound to a
286    /// runtime collection name. Each method call opens a private
287    /// read transaction on `db` and dispatches through
288    /// [`Self::open_readonly_named`]. Construction is infallible —
289    /// errors (missing collection, unknown namespace, etc.) surface
290    /// at the first method call.
291    ///
292    /// Used by [`crate::Db::collection`] (Phase 1B, M11 #94).
293    pub(crate) fn lazy(db: &'tx crate::Db, name: String) -> Self {
294        Self {
295            mode: CollectionMode::Lazy(LazyRef { db }),
296            collection_name: Cow::Owned(name),
297            // Sentinel descriptor — never read in Lazy mode; the
298            // real descriptor is loaded inside each method's
299            // private read transaction.
300            descriptor: CollectionDescriptor::new(0, 0, 0),
301            _phantom: PhantomData,
302        }
303    }
304
305    /// Cached descriptor (`collection_id`, `primary_root`,
306    /// `type_version`, `next_id` at handle-open time).
307    #[must_use]
308    pub fn descriptor(&self) -> &CollectionDescriptor {
309        &self.descriptor
310    }
311
312    /// #90: the LIVE primary-tree root for a `Write`-mode handle.
313    ///
314    /// Prefers the per-txn descriptor cache (which carries every
315    /// `primary_root` advance from this txn's prior writes) so a
316    /// read-after-write on the same handle inside one transaction
317    /// observes its own uncommitted inserts. Falls back to the
318    /// handle's open-time `primary_root` if this collection has not
319    /// yet been written in the txn (no cache entry).
320    fn write_primary_root(&self, write: &WriteRef<'tx>) -> Result<u64> {
321        let cache = lock_descriptors(&write.descriptors)?;
322        Ok(cache
323            .get(self.collection_name.as_ref())
324            .map_or(self.descriptor.primary_root, |d| d.primary_root))
325    }
326
327    /// #90: the LIVE `root_page_id` for the named `Active` index on a
328    /// `Write`-mode handle. Prefers the per-txn cache so a read after
329    /// an index-mutating write in the same txn descends the advanced
330    /// root; falls back to `fallback` (the handle's open-time
331    /// descriptor entry) when the collection has no cache entry yet.
332    fn write_index_root(
333        &self,
334        write: &WriteRef<'tx>,
335        index_name: &str,
336        fallback: u64,
337    ) -> Result<u64> {
338        let cache = lock_descriptors(&write.descriptors)?;
339        let Some(descriptor) = cache.get(self.collection_name.as_ref()) else {
340            return Ok(fallback);
341        };
342        let live = descriptor
343            .indexes
344            .iter()
345            .find(|d| d.name == index_name && d.status == obj_core::IndexStatus::Active)
346            .map_or(fallback, |d| d.root_page_id);
347        Ok(live)
348    }
349
350    /// Insert `doc`.  Returns the freshly-allocated [`Id`].
351    ///
352    /// # Errors
353    ///
354    /// - [`Error::ReadOnly`] if the handle is read-only.
355    /// - Pager / catalog / codec errors propagated.
356    // `doc: T` is taken by value for caller ergonomics (the user
357    // owns the value and gives it to the database).  The codec's
358    // `encode(&T, ...)` takes a reference, so clippy sees the
359    // function body as "owned but borrows only" — accepted as the
360    // intended public-API shape.
361    #[allow(clippy::needless_pass_by_value)]
362    pub fn insert(&self, doc: T) -> Result<Id> {
363        let write = self.write_or_err("insert")?;
364        let name: &str = self.collection_name.as_ref();
365        let mut pager = lock_pager(write.env)?;
366        let catalog = lock_catalog(&write.catalog)?;
367        // #90: the cached descriptor is the SOLE mid-txn source of
368        // truth. `next_id` is an in-memory bump; `primary_root` and
369        // index roots advance in the cache; NO per-doc
370        // `Catalog::update` — the single flush is deferred to commit.
371        let mut cache = lock_descriptors(&write.descriptors)?;
372        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
373        let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
374        let bytes = encode(&doc, descriptor.collection_id)?;
375        let key = id.to_be_bytes();
376        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
377        tree.insert(&mut pager, &key, &bytes)?;
378        descriptor.primary_root = tree.root().get();
379        // Index maintenance lands inside the same WAL transaction as
380        // the primary write, descending the cache's in-memory index
381        // roots. A `UniqueConstraintViolation` here surfaces via `?`
382        // and the surrounding `WriteTxn` rolls back atomically.
383        crate::index_maint::apply_doc_change::<T>(&mut pager, descriptor, None, Some(&doc), id)?;
384        Ok(id)
385    }
386
387    /// Fetch the document at `id`.
388    ///
389    /// On the write side this consults the pager (sees pending
390    /// writes in the current txn).  On the read side it consults
391    /// the snapshot's frozen view.
392    ///
393    /// # Lazy migration (M10 #84)
394    ///
395    /// If the on-disk record was written by an older
396    /// `Document::VERSION` than the current `T::VERSION`, the codec
397    /// walks the stored bytes through the schema registered for
398    /// that version (see `T::historical_schemas()`) and dispatches
399    /// the resulting structured `Dynamic` through `T::migrate`.
400    /// **The migrated bytes are NOT written back to disk.** The
401    /// next [`Collection::get`] re-reads the same v(n) bytes and
402    /// re-runs migration. Only a subsequent
403    /// [`Collection::update`](Self::update) /
404    /// [`Collection::upsert`](Self::upsert) writes the document
405    /// back, at which point the on-disk header records
406    /// `T::VERSION`.
407    ///
408    /// This contract is what allows mixed-version reads to scale:
409    /// a 10⁹-doc collection does not need to be batch-rewritten on
410    /// schema upgrade. Power-of-ten Rule 7: every "migration ran"
411    /// path returns the migrated `T`; no implicit write-back.
412    ///
413    /// # Errors
414    ///
415    /// Pager / B-tree / codec errors propagated. In particular:
416    ///
417    /// - [`Error::SchemaNotRegistered`](obj_core::Error::SchemaNotRegistered)
418    ///   if the stored record carries a `type_version` for which
419    ///   `T::historical_schemas()` has no entry.
420    /// - [`Error::SchemaMigrationNotImplemented`](obj_core::Error::SchemaMigrationNotImplemented)
421    ///   if the registered `T::migrate` returns the default error.
422    pub fn get(&self, id: Id) -> Result<Option<T>> {
423        if let Some(r) = self.dispatch_lazy(|c| c.get(id)) {
424            return r;
425        }
426        let key = id.to_be_bytes();
427        let Some(bytes) = self.read_or_write_env_for_get(&key)? else {
428            return Ok(None);
429        };
430        Ok(Some(decode::<T>(&bytes, self.descriptor.collection_id)?))
431    }
432
433    /// Read the primary-tree value at `key` against the current
434    /// `Write` / `Read` mode (Lazy mode is dispatched upstream of
435    /// every public caller via [`Self::dispatch_lazy`]).
436    fn read_or_write_env_for_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
437        match &self.mode {
438            CollectionMode::Write(write) => {
439                // #90: descend the LIVE primary root so a get after an
440                // insert/update in the SAME txn sees its own writes.
441                let primary_root = self.write_primary_root(write)?;
442                let mut pager = lock_pager(write.env)?;
443                let tree = btree_handle(&pager, primary_root)?;
444                tree.get(&mut pager, key)
445            }
446            CollectionMode::Read(read) => {
447                snapshot_get_via_btree(read.snapshot, read.env, self.descriptor.primary_root, key)
448            }
449            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
450                operation: "internal: lazy-mode primary read",
451            }),
452        }
453    }
454
455    /// Apply `f` to the document at `id`, writing the mutated value
456    /// back.
457    ///
458    /// # Errors
459    ///
460    /// - [`Error::ReadOnly`] on a read-side handle.
461    /// - [`Error::DocumentNotFound`] if `id` is absent.
462    /// - Pager / catalog / codec errors propagated.
463    pub fn update<F>(&self, id: Id, f: F) -> Result<()>
464    where
465        F: FnOnce(&mut T),
466    {
467        let write = self.write_or_err("update")?;
468        let name: &str = self.collection_name.as_ref();
469        let mut pager = lock_pager(write.env)?;
470        let catalog = lock_catalog(&write.catalog)?;
471        // #90: resolve the live cached descriptor (sole mid-txn source
472        // of truth); advance its roots in place, no per-doc flush.
473        let mut cache = lock_descriptors(&write.descriptors)?;
474        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
475        let key = id.to_be_bytes();
476        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
477        let existing = tree.get(&mut pager, &key)?.ok_or(Error::DocumentNotFound {
478            collection: T::COLLECTION,
479            id: id.get(),
480        })?;
481        let old_value = decode::<T>(&existing, descriptor.collection_id)?;
482        let mut new_value = decode::<T>(&existing, descriptor.collection_id)?;
483        f(&mut new_value);
484        let bytes = encode(&new_value, descriptor.collection_id)?;
485        tree.delete(&mut pager, &key)?;
486        tree.insert(&mut pager, &key, &bytes)?;
487        descriptor.primary_root = tree.root().get();
488        crate::index_maint::apply_doc_change::<T>(
489            &mut pager,
490            descriptor,
491            Some(&old_value),
492            Some(&new_value),
493            id,
494        )?;
495        Ok(())
496    }
497
498    /// Delete the document at `id`.  Returns `true` if it existed.
499    ///
500    /// # Errors
501    ///
502    /// - [`Error::ReadOnly`] on a read-side handle.
503    /// - Pager / catalog errors propagated.
504    pub fn delete(&self, id: Id) -> Result<bool> {
505        let write = self.write_or_err("delete")?;
506        let name: &str = self.collection_name.as_ref();
507        let mut pager = lock_pager(write.env)?;
508        let catalog = lock_catalog(&write.catalog)?;
509        // #90: cached descriptor is the sole mid-txn source of truth.
510        let mut cache = lock_descriptors(&write.descriptors)?;
511        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
512        let key = id.to_be_bytes();
513        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
514        // Materialise the old document so the index-maintenance
515        // path can compute the OLD key set and emit per-index
516        // delete entries. If the row does not exist, we still
517        // return `Ok(false)` for API back-compat (no index work).
518        let old_value = match tree.get(&mut pager, &key)? {
519            Some(bytes) => Some(decode::<T>(&bytes, descriptor.collection_id)?),
520            None => None,
521        };
522        let removed = tree.delete(&mut pager, &key)?;
523        descriptor.primary_root = tree.root().get();
524        crate::index_maint::apply_doc_change::<T>(
525            &mut pager,
526            descriptor,
527            old_value.as_ref(),
528            None,
529            id,
530        )?;
531        Ok(removed)
532    }
533
534    /// Insert-or-replace `doc` at `id`.
535    ///
536    /// # Errors
537    ///
538    /// - [`Error::ReadOnly`] on a read-side handle.
539    /// - Pager / catalog / codec errors propagated.
540    // `doc: T` passed by value for ergonomic ownership transfer;
541    // codec takes `&T` so clippy reports a "borrow only" body.
542    #[allow(clippy::needless_pass_by_value)]
543    pub fn upsert(&self, id: Id, doc: T) -> Result<()> {
544        let write = self.write_or_err("upsert")?;
545        let name: &str = self.collection_name.as_ref();
546        let mut pager = lock_pager(write.env)?;
547        let catalog = lock_catalog(&write.catalog)?;
548        // #90: cached descriptor is the sole mid-txn source of truth.
549        let mut cache = lock_descriptors(&write.descriptors)?;
550        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
551        let bytes = encode(&doc, descriptor.collection_id)?;
552        let key = id.to_be_bytes();
553        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
554        // Materialise the prior value (if any) so the index-
555        // maintenance diff sees the OLD key set.
556        let old_value = match tree.get(&mut pager, &key)? {
557            Some(prior) => Some(decode::<T>(&prior, descriptor.collection_id)?),
558            None => None,
559        };
560        let _ = tree.delete(&mut pager, &key)?;
561        tree.insert(&mut pager, &key, &bytes)?;
562        descriptor.primary_root = tree.root().get();
563        crate::index_maint::apply_doc_change::<T>(
564            &mut pager,
565            descriptor,
566            old_value.as_ref(),
567            Some(&doc),
568            id,
569        )?;
570        Ok(())
571    }
572
573    /// Look up the single document whose `index_name` key matches
574    /// `key` under a `Unique` index.
575    ///
576    /// Errors with [`Error::IndexNotUnique`] if `index_name` resolves
577    /// to a non-unique index — `find_unique` is *only* defined on
578    /// `Unique` indexes. For `Standard` / `Each` / `Composite` use
579    /// [`Self::lookup`] (which returns an iterator).
580    ///
581    /// Snapshot-aware: on a write-side handle the lookup sees the
582    /// current txn's pending writes; on a read-side handle it sees
583    /// the snapshot's frozen view.
584    ///
585    /// # Errors
586    ///
587    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
588    /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
589    /// - Pager / B-tree / codec errors propagated.
590    pub fn find_unique(
591        &self,
592        index_name: &str,
593        key: impl Into<obj_core::codec::Dynamic>,
594    ) -> Result<Option<T>> {
595        let key_dyn = key.into();
596        if let Some(r) = self.dispatch_lazy(|c| c.find_unique(index_name, key_dyn.clone())) {
597            return r;
598        }
599        let descriptor = self.active_index(index_name)?;
600        if descriptor.kind != obj_core::IndexKind::Unique {
601            return Err(Error::IndexNotUnique {
602                collection: self.collection_name.clone().into_owned(),
603                name: index_name.to_owned(),
604            });
605        }
606        let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
607        let id_bytes = self.index_get(descriptor, encoded.as_bytes())?;
608        match id_bytes {
609            Some(bytes) => match Id::from_be_bytes(&bytes) {
610                Some(id) => self.get(id),
611                None => Err(Error::Corruption { page_id: 0 }),
612            },
613            None => Ok(None),
614        }
615    }
616
617    /// Yield every document whose `index_name` key matches `key`.
618    /// Works on `Standard` / `Unique` / `Each` indexes. Returns
619    /// `Err(Error::IndexKindMismatch)`-style guidance for
620    /// `Composite` (use [`Self::index_range`] for tuple-shaped
621    /// keys).
622    ///
623    /// The same document is yielded at most once even if it owns
624    /// multiple matching entries — `Each` indexes can encode the
625    /// same `id` under multiple element keys; we de-dup on emit.
626    ///
627    /// # Errors
628    ///
629    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
630    /// - Pager / B-tree / codec errors propagated.
631    pub fn lookup(
632        &self,
633        index_name: &str,
634        key: impl Into<obj_core::codec::Dynamic>,
635    ) -> Result<IndexIter<'static, T>>
636    where
637        T: Send + 'static,
638    {
639        let key_dyn = key.into();
640        if let Some(r) = self.dispatch_lazy(|c| c.lookup(index_name, key_dyn.clone())) {
641            return r;
642        }
643        let descriptor = self.active_index(index_name)?;
644        let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
645        let ids = match descriptor.kind {
646            obj_core::IndexKind::Unique => self.collect_unique(descriptor, encoded.as_bytes())?,
647            obj_core::IndexKind::Standard
648            | obj_core::IndexKind::Each
649            | obj_core::IndexKind::Composite => {
650                self.collect_nonunique_equal(descriptor, encoded.as_bytes())?
651            }
652            // `IndexKind` is `#[non_exhaustive]`; an unknown kind means
653            // this build predates the on-disk index. Refuse rather than
654            // return a wrong (possibly empty) id set.
655            _ => return Err(Error::InvalidArgument("unsupported index kind")),
656        };
657        let resolved = self.resolve_unique_ids(ids)?;
658        Ok(Box::new(resolved.into_iter().map(Ok)))
659    }
660
661    /// Yield `(user_key, doc)` pairs whose index key falls within
662    /// `range`. The bounds are [`Dynamic`](obj_core::codec::Dynamic)
663    /// values — the same ergonomic type [`crate::Query::index_range`]
664    /// takes — encoded internally through the order-preserving field
665    /// encoder ([`obj_core::index::encode_field`]); callers no longer
666    /// hand-encode index-key bytes.
667    ///
668    /// For non-Unique kinds (`Standard` / `Each` / `Composite`) the
669    /// bounds are widened internally so a user-facing
670    /// `Included(x)..=Included(x)` range matches every entry whose
671    /// user-key equals `x` even though the underlying B-tree key
672    /// carries an `id_be8` suffix (see `docs/format.md` § Index key
673    /// encoding § Range-bound widening (non-Unique kinds)).
674    ///
675    /// # Errors
676    ///
677    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
678    /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
679    ///   carries an embedded NUL byte (the order-preserving encoder
680    ///   rejects those).
681    /// - Pager / B-tree / codec errors propagated.
682    pub fn index_range<R>(
683        &self,
684        index_name: &str,
685        range: R,
686    ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
687    where
688        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
689        T: Send + 'static,
690    {
691        let start = encode_dynamic_bound(range.start_bound())?;
692        let end = encode_dynamic_bound(range.end_bound())?;
693        self.index_range_encoded(index_name, start, end)
694    }
695
696    /// Encoded-bytes variant of [`Self::index_range`]. The bounds are
697    /// already the order-preserving field encoding of the user's
698    /// `Dynamic` value(s); this keeps the signature general for
699    /// `Composite` "starts-with" scans and is the entry point the
700    /// query layer / lazy-dispatch recursion call after they have
701    /// done their own encoding.
702    pub(crate) fn index_range_encoded(
703        &self,
704        index_name: &str,
705        start_bound: std::ops::Bound<Vec<u8>>,
706        end_bound: std::ops::Bound<Vec<u8>>,
707    ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
708    where
709        T: Send + 'static,
710    {
711        if let Some(r) = self.dispatch_lazy(|c| {
712            c.index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
713        }) {
714            return r;
715        }
716        let descriptor = self.active_index(index_name)?;
717        let (start, end) =
718            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
719        let entries = self.collect_range(descriptor, start, end)?;
720        let descriptor_kind = descriptor.kind;
721        let mut out: Vec<Result<(Vec<u8>, T)>> = Vec::with_capacity(entries.len());
722        let mut emitted_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
723        for (full_key, id_bytes_value) in entries {
724            let Some(id) = Id::from_be_bytes(&id_bytes_value) else {
725                out.push(Err(Error::Corruption { page_id: 0 }));
726                continue;
727            };
728            // For Each indexes the same doc may appear multiple
729            // times under different element keys — de-dup on
730            // emission.
731            if descriptor_kind == obj_core::IndexKind::Each && !emitted_ids.insert(id.get()) {
732                continue;
733            }
734            // For non-unique kinds the B-tree key includes the
735            // trailing 8-byte id suffix; strip it so the caller
736            // sees only the user-key bytes.
737            let user_key = strip_id_suffix(&full_key, descriptor_kind);
738            match self.get(id) {
739                Ok(Some(doc)) => out.push(Ok((user_key, doc))),
740                Ok(None) => {
741                    // Orphan index entry — surface as Corruption.
742                    out.push(Err(Error::Corruption { page_id: 0 }));
743                }
744                Err(e) => out.push(Err(e)),
745            }
746        }
747        Ok(Box::new(out.into_iter()))
748    }
749
750    /// Streaming variant of [`Self::index_range`] (Phase 7A perf
751    /// pass, M14 #14). Yields `(user_key, T)` pairs lazily — the
752    /// returned [`IterIndexRange`] decodes one `T` per `next()` call
753    /// rather than building a `Vec<Result<(_, T)>>` of every match
754    /// up front. The iterator borrows `&'a self`, so it must be
755    /// consumed inside the lifetime of the enclosing
756    /// [`crate::WriteTxn`] / [`crate::ReadTxn`] (or the
757    /// [`crate::Db::collection`] handle, in Lazy mode).
758    ///
759    /// # When to prefer `iter_range` over `index_range`
760    ///
761    /// - **Memory.** `index_range` allocates `O(matches × sizeof(T))`
762    ///   upfront; `iter_range` keeps a fixed-size [`VecDeque`] of
763    ///   `(key, id)` pairs (`ITER_INDEX_RANGE_BATCH = 256` entries)
764    ///   and decodes one `T` at a time. For a 100k-row range with
765    ///   ~500-byte documents that's ~50 MB peak vs. a few KiB.
766    /// - **Latency-to-first-row.** `index_range` decodes every
767    ///   matching document before returning the iterator;
768    ///   `iter_range` returns immediately after the first chunk
769    ///   refill, so the first `next()` returns after one index walk
770    ///   + one primary-tree `get` (rather than `N`).
771    ///
772    /// # When `index_range` is still the right answer
773    ///
774    /// `index_range` returns an `IndexIter<'static, _>` — it can
775    /// escape the `read_transaction` / `transaction` closure that
776    /// produced it. `iter_range` is bound to `&self`, so the
777    /// iterator dies when the [`Collection`] handle dies. If you
778    /// need to return the iterator to outer scope, stick with
779    /// `index_range`.
780    ///
781    /// # Per-row `get`-back design choice
782    ///
783    /// Each `next()` yields `(user_key, T)` by calling
784    /// [`Self::get`] under the hood — i.e. a SECOND B+tree descent
785    /// per row (the first is the index range walk; the second is
786    /// the primary-tree `get(id)`). This is intentional and
787    /// inherited from `index_range`: the index leaf stores only
788    /// the document `id` (8 bytes), not the document bytes. A
789    /// future format-minor bump may add value-in-index storage to
790    /// short-circuit the second descent; that work is pinned to
791    /// post-1.0 (tracked as pit issue #16, "value-in-index
792    /// storage to eliminate `index_range` double-decode").
793    ///
794    /// # Errors
795    ///
796    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
797    /// - Pager / B-tree / codec errors propagated at construction
798    ///   and from each `next()` call.
799    pub fn iter_range<'a, R>(&'a self, index_name: &str, range: R) -> Result<IterIndexRange<'a, T>>
800    where
801        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
802        T: Send + 'static,
803    {
804        let start_bound = encode_dynamic_bound(range.start_bound())?;
805        let end_bound = encode_dynamic_bound(range.end_bound())?;
806        self.iter_range_encoded(index_name, start_bound, end_bound)
807    }
808
809    /// Encoded-bytes variant of [`Self::iter_range`]. Bounds are the
810    /// order-preserving field encoding of the user's `Dynamic`
811    /// value(s); used internally by the lazy-mode fallback path.
812    fn iter_range_encoded<'a>(
813        &'a self,
814        index_name: &str,
815        start_bound: Bound<Vec<u8>>,
816        end_bound: Bound<Vec<u8>>,
817    ) -> Result<IterIndexRange<'a, T>>
818    where
819        T: Send + 'static,
820    {
821        // Lazy mode falls back to `index_range`'s eager materialization
822        // path so the index walk + per-row `get` share a single
823        // snapshot. Streaming refills can't preserve that — each
824        // refill would open a fresh txn and observe a different
825        // snapshot. Lazy callers who want true streaming should open
826        // an explicit `Db::read_transaction` and call `iter_range`
827        // on the bound collection there.
828        if matches!(self.mode, CollectionMode::Lazy(_)) {
829            return self.iter_range_lazy_fallback(index_name, start_bound, end_bound);
830        }
831        let descriptor = self.active_index(index_name)?;
832        // #90: in Write mode the iterator must walk the LIVE index
833        // root (the per-txn cache's advanced value) so a streaming
834        // scan opened after an in-txn index write sees its own
835        // entries. In Read mode there is no write cache, so the
836        // open-time descriptor root is authoritative.
837        let index_root = match &self.mode {
838            CollectionMode::Write(w) => {
839                self.write_index_root(w, index_name, descriptor.root_page_id)?
840            }
841            _ => descriptor.root_page_id,
842        };
843        let (start, end) =
844            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
845        // Stash the resumption marker as `Excluded(start)` only on
846        // the first refill; afterwards the iterator overwrites it
847        // with the last full_key it emitted. The initial `start`
848        // bound is honoured by `refill` via the same `last_full_key`
849        // → `Excluded(_)` translation.
850        let initial_resume = match start {
851            Bound::Included(k) => InitialResume::Included(k),
852            Bound::Excluded(k) => InitialResume::Excluded(k),
853            Bound::Unbounded => InitialResume::Unbounded,
854        };
855        Ok(IterIndexRange {
856            coll: self,
857            descriptor_kind: descriptor.kind,
858            index_root,
859            initial_resume: Some(initial_resume),
860            last_full_key: None,
861            end_bound: end,
862            buffer: VecDeque::new(),
863            emitted_ids: HashSet::new(),
864            finished: false,
865        })
866    }
867
868    /// Lazy-mode fallback for [`Self::iter_range`]: delegates to
869    /// [`Self::index_range`] (which itself dispatches through a fresh
870    /// read txn) and rehouses the eagerly-materialised entries into
871    /// the streaming iterator's buffer as
872    /// [`StagedEntry::Resolved`]. Power-of-ten Rule 4: keeping this
873    /// isolated so the streaming path's `iter_range` body stays
874    /// small.
875    fn iter_range_lazy_fallback<'a>(
876        &'a self,
877        index_name: &str,
878        start_bound: Bound<Vec<u8>>,
879        end_bound: Bound<Vec<u8>>,
880    ) -> Result<IterIndexRange<'a, T>>
881    where
882        T: Send + 'static,
883    {
884        let materialized = self.index_range_encoded(index_name, start_bound, end_bound)?;
885        let mut buffer: VecDeque<Result<StagedEntry<T>>> = VecDeque::new();
886        for item in materialized {
887            match item {
888                Ok((key, doc)) => buffer.push_back(Ok(StagedEntry::Resolved(key, doc))),
889                Err(e) => buffer.push_back(Err(e)),
890            }
891        }
892        Ok(IterIndexRange {
893            coll: self,
894            descriptor_kind: obj_core::IndexKind::Standard,
895            index_root: 0,
896            initial_resume: None,
897            last_full_key: None,
898            end_bound: Bound::Unbounded,
899            buffer,
900            emitted_ids: HashSet::new(),
901            finished: true,
902        })
903    }
904
905    /// Look up the `IndexKind` of an active index by name. Used by
906    /// the M8 query layer to dispatch `Query::count` between the
907    /// entry-count and distinct-id-count paths (M8 follow-up #72).
908    ///
909    /// # Errors
910    ///
911    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
912    pub(crate) fn index_kind(&self, index_name: &str) -> Result<obj_core::IndexKind> {
913        Ok(self.active_index(index_name)?.kind)
914    }
915
916    /// Resolve `index_name` to an `Active` `IndexDescriptor` on the
917    /// collection. Errors with [`Error::IndexNotFound`] if absent
918    /// or `DroppedPending`.
919    ///
920    /// Returns a borrow into `self.descriptor.indexes` — no per-lookup
921    /// clone (#84). Every caller uses the descriptor only within the
922    /// enclosing `&self` borrow; `iter_range_encoded` copies the two
923    /// `Copy` fields it needs (`kind`, `root_page_id`) into the
924    /// returned iterator rather than holding the borrow.
925    fn active_index(&self, index_name: &str) -> Result<&obj_core::IndexDescriptor> {
926        let entry = self
927            .descriptor
928            .indexes
929            .iter()
930            .find(|d| d.name == index_name);
931        match entry {
932            Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d),
933            _ => Err(Error::IndexNotFound {
934                collection: self.collection_name.clone().into_owned(),
935                name: index_name.to_owned(),
936            }),
937        }
938    }
939
940    /// Single-key `get` on an index B-tree. Used by `find_unique`
941    /// and by the Unique-kind branch of `lookup`.
942    fn index_get(
943        &self,
944        descriptor: &obj_core::IndexDescriptor,
945        key: &[u8],
946    ) -> Result<Option<Vec<u8>>> {
947        match &self.mode {
948            CollectionMode::Write(write) => {
949                // #90: descend the LIVE index root from the per-txn
950                // cache so an index read after an index-mutating write
951                // in the SAME txn observes its own entries.
952                let root_raw =
953                    self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
954                let root = PageId::new(root_raw)
955                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
956                let mut pager = lock_pager(write.env)?;
957                let tree = BTree::<FileHandle>::open(&pager, root)?;
958                tree.get(&mut pager, key)
959            }
960            CollectionMode::Read(read) => {
961                let root = PageId::new(descriptor.root_page_id)
962                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
963                let pager = lock_pager(read.env)?;
964                BTree::<FileHandle>::get_via_snapshot(&pager, read.snapshot, root, key)
965            }
966            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
967                operation: "internal: lazy-mode index_get",
968            }),
969        }
970    }
971
972    /// Collect every `(full_key, value)` entry from an index B-tree
973    /// whose key starts with `prefix`. For unique kinds the prefix
974    /// is the entire key (one match max); for non-unique kinds we
975    /// match every key whose first `prefix.len()` bytes equal
976    /// `prefix` (the trailing `id_suffix` varies per doc).
977    fn collect_nonunique_equal(
978        &self,
979        descriptor: &obj_core::IndexDescriptor,
980        prefix: &[u8],
981    ) -> Result<Vec<u64>> {
982        let entries = self.collect_range(
983            descriptor,
984            std::ops::Bound::Included(prefix.to_vec()),
985            // Upper bound is `prefix || 0xFF..` — every key whose
986            // user-portion equals `prefix` falls in
987            // `[prefix, prefix || u64::MAX]`.
988            std::ops::Bound::Included(append_max_id(prefix)),
989        )?;
990        let mut ids = Vec::with_capacity(entries.len());
991        let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
992        for (_full_key, value) in entries {
993            let id = Id::from_be_bytes(&value).ok_or(Error::Corruption { page_id: 0 })?;
994            if emitted.insert(id.get()) {
995                ids.push(id.get());
996            }
997        }
998        Ok(ids)
999    }
1000
1001    /// Collect a single id from a Unique index B-tree at `key`.
1002    fn collect_unique(
1003        &self,
1004        descriptor: &obj_core::IndexDescriptor,
1005        key: &[u8],
1006    ) -> Result<Vec<u64>> {
1007        match self.index_get(descriptor, key)? {
1008            Some(bytes) => Id::from_be_bytes(&bytes)
1009                .map(|id| vec![id.get()])
1010                .ok_or(Error::Corruption { page_id: 0 }),
1011            None => Ok(Vec::new()),
1012        }
1013    }
1014
1015    /// Collect every `(full_key, value)` entry from an index B-tree
1016    /// whose key falls within `(start, end)`.
1017    fn collect_range(
1018        &self,
1019        descriptor: &obj_core::IndexDescriptor,
1020        start: std::ops::Bound<Vec<u8>>,
1021        end: std::ops::Bound<Vec<u8>>,
1022    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1023        // Read mode pins every page read to the txn's snapshot (M12
1024        // #12): a concurrent writer's post-snapshot index entries must
1025        // not leak into a read txn's range/count. Write mode keeps the
1026        // live-pager scan so the txn observes its own uncommitted
1027        // index writes (it has no snapshot to pin against).
1028        match &self.mode {
1029            CollectionMode::Read(r) => {
1030                let root = PageId::new(descriptor.root_page_id)
1031                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1032                let pager = lock_pager(r.env)?;
1033                let iter = BTree::<FileHandle>::range_via_snapshot(
1034                    &pager,
1035                    r.snapshot,
1036                    root,
1037                    (start, end),
1038                )?;
1039                let mut out = Vec::new();
1040                for step in iter {
1041                    out.push(step?);
1042                }
1043                Ok(out)
1044            }
1045            CollectionMode::Write(w) => {
1046                // #90: scan the LIVE index root from the per-txn cache
1047                // so an in-txn index scan sees its own uncommitted
1048                // entries (the open-time descriptor root may be stale).
1049                let root_raw =
1050                    self.write_index_root(w, &descriptor.name, descriptor.root_page_id)?;
1051                let root = PageId::new(root_raw)
1052                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1053                let mut pager = lock_pager(w.env)?;
1054                let tree = BTree::<FileHandle>::open(&pager, root)?;
1055                let iter = tree.range(&mut pager, (start, end))?;
1056                let mut out = Vec::new();
1057                for step in iter {
1058                    out.push(step?);
1059                }
1060                Ok(out)
1061            }
1062            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1063                operation: "internal: lazy-mode collect_range",
1064            }),
1065        }
1066    }
1067
1068    /// Resolve a `Vec<u64>` of `Id` integer values into concrete
1069    /// `T` documents via `self.get`. Preserves order; missing rows
1070    /// surface as `Error::Corruption` (orphan index entry).
1071    fn resolve_unique_ids(&self, ids: Vec<u64>) -> Result<Vec<T>> {
1072        let mut out = Vec::with_capacity(ids.len());
1073        for raw in ids {
1074            let id =
1075                Id::from_be_bytes(&raw.to_be_bytes()).ok_or(Error::Corruption { page_id: 0 })?;
1076            let doc = self.get(id)?.ok_or(Error::Corruption { page_id: 0 })?;
1077            out.push(doc);
1078        }
1079        Ok(out)
1080    }
1081
1082    /// Count every entry in the primary tree WITHOUT decoding the
1083    /// documents. Used by the M8 [`crate::Query::count`] no-decode
1084    /// fast path; the iterator visits leaf pages and counts entries
1085    /// rather than running each through postcard.
1086    ///
1087    /// Power-of-ten Rule 2: bounded by the B+tree's `MAX_RANGE_NODES`
1088    /// budget (inherited from `BTree::range`).
1089    ///
1090    /// # Errors
1091    ///
1092    /// Pager / B-tree errors propagated.
1093    pub fn count_all(&self) -> Result<u64> {
1094        // Closure (rather than `Collection::count_all`) so the
1095        // higher-ranked lifetime on `dispatch_lazy`'s
1096        // `for<'a> FnOnce(&Collection<'a, T>) -> _` bound is
1097        // satisfied — `Collection::count_all` resolves to a single
1098        // lifetime fn-item type, which fails the HRTB check.
1099        #[allow(clippy::redundant_closure_for_method_calls)]
1100        if let Some(r) = self.dispatch_lazy(|c| c.count_all()) {
1101            return r;
1102        }
1103        // Read mode pins the full-tree scan to the txn's snapshot
1104        // (M12 #12) so a concurrent writer's post-snapshot inserts
1105        // cannot perturb the count; write mode keeps the live scan so
1106        // the txn sees its own uncommitted writes.
1107        match &self.mode {
1108            CollectionMode::Read(r) => {
1109                let pager = lock_pager(r.env)?;
1110                let pid = PageId::new(self.descriptor.primary_root)
1111                    .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1112                let iter = BTree::<FileHandle>::range_via_snapshot(&pager, r.snapshot, pid, ..)?;
1113                count_range_iter(iter)
1114            }
1115            CollectionMode::Write(w) => {
1116                // #90: count the LIVE primary root so an in-txn count
1117                // reflects this txn's own uncommitted inserts/deletes.
1118                let root = self.write_primary_root(w)?;
1119                let mut pager = lock_pager(w.env)?;
1120                let tree = btree_handle(&pager, root)?;
1121                let iter = tree.range(&mut pager, ..)?;
1122                count_range_iter(iter)
1123            }
1124            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1125                operation: "internal: lazy-mode count_all",
1126            }),
1127        }
1128    }
1129
1130    /// Count every entry whose encoded key falls inside `range` on
1131    /// the named index's B-tree, WITHOUT decoding any document. M8
1132    /// fast path for [`crate::Query::count`] when the source is an
1133    /// `index_range`.
1134    ///
1135    /// Returns the number of index B-tree entries — for an `Each`
1136    /// index that may exceed the document count (one doc emits
1137    /// multiple entries); for other kinds it equals the matching
1138    /// doc count.
1139    ///
1140    /// # Errors
1141    ///
1142    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1143    /// - Pager / B-tree errors propagated.
1144    pub fn count_index_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1145    where
1146        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1147    {
1148        let start = encode_dynamic_bound(range.start_bound())?;
1149        let end = encode_dynamic_bound(range.end_bound())?;
1150        self.count_index_range_encoded(index_name, start, end)
1151    }
1152
1153    /// Encoded-bytes variant of [`Self::count_index_range`]. Bounds
1154    /// are the order-preserving field encoding of the user's
1155    /// `Dynamic` value(s); used by the query-layer count fast path.
1156    pub(crate) fn count_index_range_encoded(
1157        &self,
1158        index_name: &str,
1159        start_bound: std::ops::Bound<Vec<u8>>,
1160        end_bound: std::ops::Bound<Vec<u8>>,
1161    ) -> Result<u64> {
1162        if let Some(r) = self.dispatch_lazy(|c| {
1163            c.count_index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
1164        }) {
1165            return r;
1166        }
1167        let descriptor = self.active_index(index_name)?;
1168        let (start, end) =
1169            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1170        let entries = self.collect_range(descriptor, start, end)?;
1171        // `entries.len()` is a `usize`; promote to `u64` carefully.
1172        // `usize` is at most 64 bits on every supported target.
1173        u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1174            reason: "index range entry count exceeds u64",
1175        })
1176    }
1177
1178    /// Count distinct document `Id`s whose entries fall inside
1179    /// `range` on the named index's B-tree, WITHOUT decoding any
1180    /// document. For `Each` indexes this is the correct shape of
1181    /// the "how many docs match" question — `count_index_range`
1182    /// returns the entry count, which overshoots when a single doc
1183    /// contributes multiple entries.
1184    ///
1185    /// Implementation walks the index B-tree, parses the trailing
1186    /// 8-byte big-endian `Id` suffix from each non-unique key, and
1187    /// tracks the unique set in a bounded [`std::collections::HashSet`]
1188    /// capped at [`MAX_DISTINCT_IDS`]. Exceeding the cap surfaces
1189    /// [`Error::DistinctCountExceeded`] — the caller should narrow
1190    /// the range.
1191    ///
1192    /// # Per-kind semantics
1193    ///
1194    /// - `Standard`, `Composite`: equivalent to `count_index_range`
1195    ///   (one entry per doc by construction; the trailing-id-suffix
1196    ///   walk still produces the same total).
1197    /// - `Unique`: keys carry NO id suffix — the entry value is the
1198    ///   raw 8-byte `Id`; the walk reads the value instead.
1199    /// - `Each`: the dedup is meaningful — one doc may contribute
1200    ///   N entries under N distinct element keys.
1201    ///
1202    /// # Errors
1203    ///
1204    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1205    /// - [`Error::DistinctCountExceeded`] if the distinct set
1206    ///   exceeds [`MAX_DISTINCT_IDS`].
1207    /// - [`Error::Corruption`] if an entry's id suffix / value is
1208    ///   not parseable as an [`obj_core::Id`].
1209    /// - Pager / B-tree errors propagated.
1210    pub fn count_distinct_ids_in_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1211    where
1212        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1213    {
1214        let start = encode_dynamic_bound(range.start_bound())?;
1215        let end = encode_dynamic_bound(range.end_bound())?;
1216        self.count_distinct_ids_in_range_encoded(index_name, start, end)
1217    }
1218
1219    /// Encoded-bytes variant of [`Self::count_distinct_ids_in_range`].
1220    /// Bounds are the order-preserving field encoding of the user's
1221    /// `Dynamic` value(s); used by the query-layer count fast path.
1222    pub(crate) fn count_distinct_ids_in_range_encoded(
1223        &self,
1224        index_name: &str,
1225        start_bound: std::ops::Bound<Vec<u8>>,
1226        end_bound: std::ops::Bound<Vec<u8>>,
1227    ) -> Result<u64> {
1228        if let Some(r) = self.dispatch_lazy(|c| {
1229            c.count_distinct_ids_in_range_encoded(
1230                index_name,
1231                start_bound.clone(),
1232                end_bound.clone(),
1233            )
1234        }) {
1235            return r;
1236        }
1237        let descriptor = self.active_index(index_name)?;
1238        let (start, end) =
1239            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1240        let entries = self.collect_range(descriptor, start, end)?;
1241        let mut distinct: HashSet<u64> = HashSet::new();
1242        for (full_key, value) in entries {
1243            let id = id_from_index_entry(&full_key, &value, descriptor.kind)?;
1244            if distinct.insert(id) && distinct.len() > MAX_DISTINCT_IDS {
1245                return Err(Error::DistinctCountExceeded {
1246                    limit: MAX_DISTINCT_IDS,
1247                });
1248            }
1249        }
1250        // `distinct.len()` is a `usize`; promote to `u64` carefully.
1251        // `usize` is at most 64 bits on every supported target.
1252        u64::try_from(distinct.len()).map_err(|_| Error::BTreeInvariantViolated {
1253            reason: "distinct id count exceeds u64",
1254        })
1255    }
1256
1257    /// Materialise every `(Id, T)` pair in the collection.
1258    ///
1259    /// Implementation note: M6 returns an owned `Vec` rather than a
1260    /// streaming iterator because the B+tree range API borrows the
1261    /// pager, and threading that borrow through the mutex guards
1262    /// in the iterator chain is awkward.  M7+ may convert to a
1263    /// streaming shape once the index API is in place.
1264    ///
1265    /// # Errors
1266    ///
1267    /// Pager / B-tree / codec errors propagated.
1268    pub fn all(&self) -> Result<Vec<(Id, T)>> {
1269        // Closure (rather than `Collection::all`) for the same
1270        // HRTB reason documented on `count_all` above.
1271        #[allow(clippy::redundant_closure_for_method_calls)]
1272        if let Some(r) = self.dispatch_lazy(|c| c.all()) {
1273            return r;
1274        }
1275        match &self.mode {
1276            CollectionMode::Write(write) => {
1277                // #90: scan the LIVE primary root so an in-txn `all()`
1278                // includes this txn's own uncommitted inserts.
1279                let root = self.write_primary_root(write)?;
1280                let mut pager = lock_pager(write.env)?;
1281                scan_all::<T>(&mut pager, root, self.descriptor.collection_id)
1282            }
1283            CollectionMode::Read(read) => snapshot_scan_via_btree::<T>(
1284                read.snapshot,
1285                read.env,
1286                self.descriptor.primary_root,
1287                self.descriptor.collection_id,
1288            ),
1289            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1290                operation: "internal: lazy-mode all",
1291            }),
1292        }
1293    }
1294
1295    fn write_or_err(&self, op: &'static str) -> Result<&WriteRef<'tx>> {
1296        match &self.mode {
1297            CollectionMode::Write(w) => Ok(w),
1298            CollectionMode::Read(_) | CollectionMode::Lazy(_) => {
1299                Err(Error::ReadOnly { operation: op })
1300            }
1301        }
1302    }
1303
1304    /// If this handle is in `Lazy` mode, dispatch `body` through a
1305    /// fresh read transaction on the bound [`crate::Db`] — opening a
1306    /// transient [`Collection`] via [`Self::open_readonly_named`] and
1307    /// invoking the user-supplied closure on it. Returns `Some(_)`
1308    /// when the dispatch fires (the closure ran or the underlying
1309    /// open failed); returns `None` for non-Lazy handles so the
1310    /// caller can fall through to the existing logic.
1311    ///
1312    /// Power-of-ten Rule 4: keeps each public method's body small —
1313    /// the dispatch shim is one match arm instead of a per-method
1314    /// `if let CollectionMode::Lazy(_)` ladder.
1315    fn dispatch_lazy<R, F>(&self, body: F) -> Option<Result<R>>
1316    where
1317        F: FnOnce(&Collection<'_, T>) -> Result<R>,
1318    {
1319        match &self.mode {
1320            CollectionMode::Lazy(LazyRef { db }) => {
1321                // Clone the stored name into an owned `Cow` for the
1322                // transient handle opened inside the private read txn.
1323                // `into_owned` keeps the `'static` Cow bound the
1324                // `open_readonly_named` signature requires regardless
1325                // of whether the stored name was Borrowed or Owned.
1326                let name: Cow<'static, str> = Cow::Owned(self.collection_name.clone().into_owned());
1327                Some(db.read_transaction(move |tx| {
1328                    let coll = Collection::<T>::open_readonly_named(tx, name)?;
1329                    body(&coll)
1330                }))
1331            }
1332            _ => None,
1333        }
1334    }
1335}
1336
1337// ---------- internal helpers --------------------------------------
1338
1339fn lock_pager(
1340    env: &obj_core::TxnEnv<FileHandle>,
1341) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1342    env.pager().lock().map_err(|_| Error::Busy {
1343        kind: obj_core::LockKind::WriterInProcess,
1344    })
1345}
1346
1347/// Ensure `T::COLLECTION` exists in the catalog, lazy-creating an
1348/// empty primary B-tree on first call.  Used on the write side.
1349fn ensure_collection<T: Document>(
1350    inner: &obj_core::WriteTxn<'_, FileHandle>,
1351    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1352) -> Result<CollectionDescriptor> {
1353    let mut pager = inner.lock_pager()?;
1354    let mut catalog_guard = lock_catalog(catalog)?;
1355    if let Some(d) = catalog_guard.get(&mut pager, T::COLLECTION)? {
1356        return Ok(d);
1357    }
1358    let tree = BTree::<FileHandle>::empty(&mut pager)?;
1359    let descriptor = CollectionDescriptor::new(0, tree.root().get(), T::VERSION);
1360    let _id = catalog_guard.insert(&mut pager, T::COLLECTION, descriptor)?;
1361    catalog_guard
1362        .get(&mut pager, T::COLLECTION)?
1363        .ok_or(Error::Corruption { page_id: 0 })
1364}
1365
1366/// Re-read the descriptor for `T::COLLECTION` after any catalog
1367/// mutation. Used by [`Collection::open_or_create`] after the
1368/// reconciler runs.
1369fn reread_descriptor<T: Document>(
1370    inner: &obj_core::WriteTxn<'_, FileHandle>,
1371    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1372) -> Result<CollectionDescriptor> {
1373    let mut pager = inner.lock_pager()?;
1374    let catalog_guard = lock_catalog(catalog)?;
1375    catalog_guard
1376        .get(&mut pager, T::COLLECTION)?
1377        .ok_or(Error::Corruption { page_id: 0 })
1378}
1379
1380/// Reconcile `T::indexes()` against the catalog's stored descriptors
1381/// on the FIRST call per process per `(collection, version)`.
1382/// Subsequent calls for the same `(collection, version)` observe the
1383/// cache hit (in the shared `reconciled` set OR this txn's `staged`
1384/// set) and skip the catalog walk.
1385///
1386/// Reconciliation runs inside the user's WAL transaction so a
1387/// rolled-back txn leaves the catalog clean. If reconciliation
1388/// fails (e.g. `Error::IndexKindMismatch`), neither set is populated
1389/// so the next attempt re-runs the reconciler.
1390///
1391/// #93 — the skip-check is `shared ∪ staged`: a second handle of the
1392/// same `(collection, version)` within ONE txn still skips the
1393/// (idempotent) catalog walk via `staged`, but the key is recorded in
1394/// the per-txn `staged` set, NOT the shared set.
1395/// [`crate::WriteTxn::commit`] promotes the staged keys into the shared
1396/// set only after the WAL commit lands — so a rolled-back lazy-create
1397/// can never poison the shared cache into skipping reconciliation on a
1398/// later txn (the original bug).
1399fn reconcile_indexes_once<T: Document>(
1400    inner: &obj_core::WriteTxn<'_, FileHandle>,
1401    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1402    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1403    staged: &mut HashSet<(String, u32)>,
1404) -> Result<()> {
1405    // The generic `#[derive(Document)]` path is exactly the non-generic
1406    // raw seam ([`reconcile_specs_once`]) with the collection name,
1407    // version, and spec list supplied from the compile-time `T` rather
1408    // than from a caller argument. Both share ONE body so the cache /
1409    // staging / validation / catalog-walk semantics can never drift
1410    // apart (#108).
1411    reconcile_specs_once(
1412        inner,
1413        catalog,
1414        reconciled,
1415        staged,
1416        T::COLLECTION,
1417        T::VERSION,
1418        &T::indexes(),
1419    )
1420}
1421
1422/// Non-generic core of index reconciliation, shared by the generic
1423/// `#[derive(Document)]` path ([`reconcile_indexes_once`]) and the
1424/// public raw seam ([`crate::WriteTxn::reconcile_indexes_raw`], #108).
1425///
1426/// Reconciles `specs` against the catalog's stored descriptors for
1427/// `collection` on the FIRST call per process per `(collection,
1428/// version)`, honoring the same `shared ∪ staged` skip-cache (keyed by
1429/// `(collection, version)` — #130), per-txn staging, and pre-mutation
1430/// validation as the generic path — the only difference is that the
1431/// collection name, version, and spec list are arguments rather than
1432/// `T::COLLECTION` / `T::VERSION` / `T::indexes()`.
1433///
1434/// # Why the cache key includes `version` (#130)
1435///
1436/// `Catalog::reconcile_indexes` is a FULL reconcile: it declares specs
1437/// missing from the catalog AND drops `Active` descriptors absent from
1438/// `specs`. Keying the skip-cache by `collection` ALONE meant that once
1439/// a process reconciled a collection at one schema version, a LATER
1440/// version in the same process that ADDED a new index never reconciled
1441/// — the added index never became `Active` and index-maintaining writes
1442/// failed with `IndexNotFound`. Keying by `(collection, version)`
1443/// reconciles each version exactly once: the common single-version case
1444/// is unchanged (one key, reconciled once), and a cross-version index
1445/// ADD reconciles the new version's specs on its first write.
1446///
1447/// ## Caveat — conflicting index REMOVAL interleaved across versions
1448///
1449/// Because each `(collection, version)` reconciles independently and
1450/// `reconcile_indexes` drops `Active` indexes absent from the version's
1451/// specs, alternating writes between two live versions of the SAME
1452/// collection in ONE process — where the versions declare DIFFERENT
1453/// index sets — can leave the catalog reflecting whichever version
1454/// reconciled most recently (its specs drive the drop set). Index
1455/// ADDITION (the common monotonic schema-evolution case) is fully
1456/// correct. This is strictly better than the prior behavior, which
1457/// never reconciled the second version at all (so its added index was
1458/// simply missing); the removal-interleaving edge is a narrow,
1459/// single-process anti-pattern.
1460///
1461/// The caller MUST have ensured `collection` exists in the catalog
1462/// (the generic path runs [`ensure_collection`] first; the raw seam
1463/// runs [`crate::txn::ensure_collection_raw`]) — `reconcile_indexes`
1464/// errors with `CollectionNotFound` otherwise.
1465pub(crate) fn reconcile_specs_once(
1466    inner: &obj_core::WriteTxn<'_, FileHandle>,
1467    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1468    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1469    staged: &mut HashSet<(String, u32)>,
1470    collection: &str,
1471    version: u32,
1472    specs: &[obj_core::IndexSpec],
1473) -> Result<()> {
1474    // Cache key is `(collection, version)` (#130): each schema version
1475    // reconciles its OWN spec set exactly once per process.
1476    let key = (collection.to_owned(), version);
1477    // Fast path: already reconciled in a prior committed txn (shared)
1478    // or earlier in THIS txn (staged) — skip the catalog walk. Probe
1479    // `staged` first: it needs no lock and covers the repeat-handle
1480    // case; the shared probe takes the mutex only when `staged` misses.
1481    if staged.contains(&key) {
1482        return Ok(());
1483    }
1484    {
1485        let cache = lock_reconciled(reconciled)?;
1486        if cache.contains(&key) {
1487            return Ok(());
1488        }
1489    }
1490    // Validate specs before any catalog mutation so a bad spec
1491    // does not leave a half-reconciled catalog.
1492    for spec in specs {
1493        spec.validate()?;
1494    }
1495    {
1496        let mut pager = inner.lock_pager()?;
1497        let mut catalog_guard = lock_catalog(catalog)?;
1498        let _post = catalog_guard.reconcile_indexes(&mut pager, collection, specs)?;
1499    }
1500    // #93: stage in the PER-TXN set, not the shared one. Promotion to
1501    // the shared set is deferred to a successful `WriteTxn::commit`.
1502    staged.insert(key);
1503    Ok(())
1504}
1505
1506fn lock_reconciled(
1507    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1508) -> Result<std::sync::MutexGuard<'_, HashSet<(String, u32)>>> {
1509    reconciled.lock().map_err(|_| Error::Busy {
1510        kind: obj_core::LockKind::WriterInProcess,
1511    })
1512}
1513
1514/// Read-side descriptor lookup against a caller-supplied
1515/// collection name. M11 #93 introduced this byte-shape so the
1516/// namespace-aware [`Collection::open_readonly`] can perform the
1517/// catalog walk against either the calling Db's snapshot (no
1518/// namespace) or an attached Db's snapshot (with the namespace
1519/// prefix stripped).
1520fn read_descriptor_via_snapshot_named(
1521    env: &obj_core::TxnEnv<FileHandle>,
1522    snapshot: &obj_core::ReaderSnapshot<FileHandle>,
1523    name: &str,
1524) -> Result<Option<CollectionDescriptor>> {
1525    let pager = lock_pager(env)?;
1526    Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1527}
1528
1529/// #83 (b): fused one-shot point read for [`crate::Db::get`].
1530///
1531/// Resolves the collection descriptor and the primary-tree value for
1532/// `id` under a SINGLE pager-mutex acquisition (see
1533/// [`snapshot_resolve_and_get`]), then decodes the bytes against the
1534/// resolved `collection_id`. This collapses the two back-to-back
1535/// pager locks the equivalent handle path pays (one to open the
1536/// handle / resolve the descriptor, one for the `get`).
1537///
1538/// Observably identical to
1539/// `tx.collection::<T>()?.get(id)` for the one-shot caller: the
1540/// descriptor lookup and the value get run on the same `ReadTxn`
1541/// snapshot, and a missing collection still surfaces as
1542/// [`Error::CollectionNotFound`]. Namespaced reads (`<ns>.<tail>`)
1543/// fall back to the handle path so the attached-snapshot dispatch is
1544/// unchanged.
1545pub(crate) fn fused_point_get<T: Document>(tx: &ReadTxn<'_>, id: Id) -> Result<Option<T>> {
1546    let (namespace, _tail) = crate::db::split_namespace(T::COLLECTION);
1547    if namespace.is_some() {
1548        // Namespaced: keep the existing attached-snapshot dispatch.
1549        return Collection::<T>::open_readonly(tx)?.get(id);
1550    }
1551    let key = id.to_be_bytes();
1552    let resolved =
1553        snapshot_resolve_and_get(tx.inner.snapshot(), tx.inner.env(), T::COLLECTION, &key)?;
1554    match resolved {
1555        Some((descriptor, bytes)) => Ok(Some(decode::<T>(&bytes, descriptor.collection_id)?)),
1556        None => Ok(None),
1557    }
1558}
1559
1560/// Re-read the descriptor inside an already-locked pager + catalog
1561/// pair.  Surfaces a missing collection as `Error::Corruption`
1562/// because the caller has already opened a write txn against it.
1563fn catalog_get_required(
1564    pager: &mut Pager<FileHandle>,
1565    catalog: &Catalog<FileHandle>,
1566    name: &str,
1567) -> Result<CollectionDescriptor> {
1568    catalog
1569        .get(pager, name)?
1570        .ok_or(Error::Corruption { page_id: 0 })
1571}
1572
1573fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1574    let root_pid =
1575        PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1576    BTree::<FileHandle>::open(pager, root_pid)
1577}
1578
1579/// Drain a B-tree range iterator, counting entries WITHOUT retaining
1580/// their bytes. Shared by [`Collection::count_all`]'s live (write) and
1581/// snapshot-pinned (read) scan arms. Power-of-ten Rule 2: the
1582/// iterator carries its own `MAX_RANGE_NODES` budget; the `u64`
1583/// overflow check guards the count itself.
1584fn count_range_iter<I>(iter: I) -> Result<u64>
1585where
1586    I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
1587{
1588    let mut n: u64 = 0;
1589    for step in iter {
1590        // Probe-only: drop the bytes the moment they decode.
1591        let _ = step?;
1592        n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1593            reason: "primary tree entry count exceeds u64",
1594        })?;
1595    }
1596    Ok(n)
1597}
1598
1599// `persist_root` removed in M7 #58: every mutating method now
1600// routes through `index_maint::apply_doc_change`, which persists
1601// the descriptor (including the possibly-advanced `primary_root`)
1602// via `Catalog::update` after every per-index B-tree mutation.
1603
1604/// Snapshot-consistent B-tree lookup (M6 #53).
1605///
1606/// Walks the primary B+tree rooted at `primary_root` using
1607/// [`obj_core::btree::BTree::get_via_snapshot`], which descends
1608/// through [`obj_core::ReaderSnapshot::read_page`] rather than the
1609/// live `Pager::read_page`. This bypasses the WAL `state.view` /
1610/// `state.pending` overlays — a concurrent writer's post-snapshot
1611/// COW commits cannot poison the reader's walk.
1612///
1613/// `primary_root` MUST be the descriptor's `primary_root` as-of
1614/// the snapshot's pinned LSN (i.e. the value read via
1615/// [`obj_core::Catalog::lookup_via_snapshot`] in
1616/// [`read_descriptor_via_snapshot`] above). Using the writer's
1617/// live `primary_root` would defeat the snapshot read.
1618fn snapshot_get_via_btree(
1619    snap: &obj_core::ReaderSnapshot<FileHandle>,
1620    env: &obj_core::TxnEnv<FileHandle>,
1621    primary_root: u64,
1622    key: &[u8],
1623) -> Result<Option<Vec<u8>>> {
1624    let pager = lock_pager(env)?;
1625    let root_pid = PageId::new(primary_root)
1626        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1627    obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)
1628}
1629
1630/// #83 (b): fused single-lock read. Resolves the descriptor for
1631/// `name` and performs the primary-tree `get` for `key` under ONE
1632/// pager-mutex acquisition, against the SAME `snapshot` — collapsing
1633/// the descriptor-lookup lock (`read_descriptor_via_snapshot_named`)
1634/// and the value-get lock (`snapshot_get_via_btree`) that the
1635/// two-call handle path pays back-to-back.
1636///
1637/// Returns `(descriptor, value)` so the caller can `decode` against
1638/// the resolved `collection_id`. A missing collection surfaces as
1639/// `Err(CollectionNotFound)` (matching the handle path's open-time
1640/// contract for the one-shot caller); a present collection with no
1641/// entry for `key` surfaces as `Ok(None)`.
1642///
1643/// Power-of-ten: keeps poison → `Error::Busy` (Rule 7, via
1644/// `lock_pager`); ≤ 60 lines (Rule 4); `debug_assert`s that the
1645/// snapshot-resolved `primary_root` is the one fed to the get
1646/// (Rule 5).
1647fn snapshot_resolve_and_get(
1648    snap: &obj_core::ReaderSnapshot<FileHandle>,
1649    env: &obj_core::TxnEnv<FileHandle>,
1650    name: &str,
1651    key: &[u8],
1652) -> Result<Option<(CollectionDescriptor, Vec<u8>)>> {
1653    let pager = lock_pager(env)?;
1654    let Some(descriptor) = Catalog::<FileHandle>::lookup_via_snapshot(&pager, snap, name)? else {
1655        return Err(Error::CollectionNotFound {
1656            name: name.to_owned(),
1657        });
1658    };
1659    let root_pid = PageId::new(descriptor.primary_root)
1660        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1661    // Rule 5: the value-get MUST descend the descriptor's own
1662    // snapshot-time root — never a re-resolved or live root. A live
1663    // primary B-tree root is always page 1+, so a zero `collection_id`
1664    // here would mean we resolved a degenerate catalog row.
1665    debug_assert_eq!(
1666        root_pid.get(),
1667        descriptor.primary_root,
1668        "fused get must descend the snapshot-resolved primary_root",
1669    );
1670    let value =
1671        obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)?;
1672    Ok(value.map(|v| (descriptor, v)))
1673}
1674
1675fn scan_all<T: Document>(
1676    pager: &mut Pager<FileHandle>,
1677    primary_root: u64,
1678    collection_id: u32,
1679) -> Result<Vec<(Id, T)>> {
1680    let tree = btree_handle(pager, primary_root)?;
1681    let iter = tree.range(pager, ..)?;
1682    let mut out = Vec::new();
1683    for entry in iter {
1684        let (key, value) = entry?;
1685        let id = Id::from_be_bytes(&key)
1686            .ok_or(Error::InvalidArgument("primary B-tree key is not an Id"))?;
1687        let doc = decode::<T>(&value, collection_id)?;
1688        out.push((id, doc));
1689    }
1690    Ok(out)
1691}
1692
1693fn snapshot_scan_via_btree<T: Document>(
1694    _snap: &obj_core::ReaderSnapshot<FileHandle>,
1695    env: &obj_core::TxnEnv<FileHandle>,
1696    primary_root: u64,
1697    collection_id: u32,
1698) -> Result<Vec<(Id, T)>> {
1699    let mut pager = lock_pager(env)?;
1700    scan_all::<T>(&mut pager, primary_root, collection_id)
1701}
1702
1703/// Encode the caller-supplied `Dynamic` value(s) into the bytes a
1704/// lookup against `descriptor` would use as a B-tree key. For
1705/// `Unique` indexes the result is the key bytes verbatim; for
1706/// non-unique kinds the lookup helpers extend with the per-doc
1707/// id suffix at scan time.
1708fn index_key_for_lookup(
1709    descriptor: &obj_core::IndexDescriptor,
1710    fields: &[obj_core::codec::Dynamic],
1711) -> Result<obj_core::index::EncodedIndexKey> {
1712    // Ref-based encode (#84): pass the descriptor's `kind` (Copy) and
1713    // `key_paths` BY REFERENCE — no transient `IndexSpec`, no
1714    // `name`/`key_paths` clone, no redundant `IndexSpec::validate` on
1715    // an already-validated on-disk descriptor. The byte output is
1716    // identical to the old `from_parts` + `encode_index_key` path
1717    // (see `encode_index_key_parts` and the byte-identity test).
1718    obj_core::index::encode_index_key_parts(descriptor.kind, &descriptor.key_paths, fields)
1719}
1720
1721/// Encode a `Bound<&Dynamic>` into the index-key `Bound<Vec<u8>>` the
1722/// B-tree scan uses. Shared by the `Dynamic`-taking range methods on
1723/// [`Collection`].
1724///
1725/// A scalar `Dynamic` is encoded with the order-preserving field
1726/// encoder ([`obj_core::index::encode_field`]) — byte-identical to
1727/// what [`crate::Query::index_range`] produces, so a query and a
1728/// direct collection scan over the same scalar bound observe the
1729/// same entries. A [`Dynamic::Seq`](obj_core::codec::Dynamic::Seq)
1730/// bound is encoded as a composite key (the
1731/// [`COMPOSITE_TAG`](obj_core::index::COMPOSITE_TAG)-prefixed
1732/// concatenation of each element's field encoding) so a `Composite`
1733/// index can be range-scanned by a full tuple bound.
1734fn encode_dynamic_bound(
1735    b: std::ops::Bound<&obj_core::codec::Dynamic>,
1736) -> Result<std::ops::Bound<Vec<u8>>> {
1737    match b {
1738        std::ops::Bound::Included(v) => Ok(std::ops::Bound::Included(encode_bound_value(v)?)),
1739        std::ops::Bound::Excluded(v) => Ok(std::ops::Bound::Excluded(encode_bound_value(v)?)),
1740        std::ops::Bound::Unbounded => Ok(std::ops::Bound::Unbounded),
1741    }
1742}
1743
1744/// Encode one `Dynamic` bound value into index-key bytes. Scalars go
1745/// through [`obj_core::index::encode_field`]; a `Seq` is encoded as a
1746/// composite tuple key. Power-of-ten Rule 4: kept separate so
1747/// [`encode_dynamic_bound`] stays a thin three-arm match.
1748fn encode_bound_value(v: &obj_core::codec::Dynamic) -> Result<Vec<u8>> {
1749    match v {
1750        obj_core::codec::Dynamic::Seq(fields) => {
1751            // `COMPOSITE_TAG || encode_field(f0) || encode_field(f1) ..`
1752            // is byte-identical to `encode_index_key`'s composite path
1753            // for the same fields — see `obj_core::index::key`.
1754            let mut out = vec![obj_core::index::COMPOSITE_TAG];
1755            for f in fields {
1756                out.extend_from_slice(obj_core::index::encode_field(f)?.as_bytes());
1757            }
1758            Ok(out)
1759        }
1760        _ => Ok(obj_core::index::encode_field(v)?.into_bytes()),
1761    }
1762}
1763
/// Return `prefix` followed by eight `0xFF` bytes (`u64::MAX`, big-endian).
///
/// Used as the exclusive upper bound of an equality lookup against a
/// non-unique index, whose keys are `user_prefix || id` with the trailing
/// 8 bytes an `Id` (`u64` BE).
///
/// NOTE(review): entries under `prefix` sort `<=` this key. An entry whose
/// id is exactly `u64::MAX` would therefore be excluded by an EXCLUSIVE
/// bound. Confirm that such ids cannot occur, or that callers account
/// for it.
fn append_max_id(prefix: &[u8]) -> Vec<u8> {
    let max_id = u64::MAX.to_be_bytes();
    [prefix, &max_id[..]].concat()
}
1774
1775/// Trim the trailing 8-byte id suffix off a non-unique index key.
1776/// For `Unique` keys the suffix is absent, so the full key is the
1777/// user portion.
1778fn strip_id_suffix(full_key: &[u8], kind: obj_core::IndexKind) -> Vec<u8> {
1779    match kind {
1780        obj_core::IndexKind::Unique => full_key.to_vec(),
1781        _ if full_key.len() >= 8 => full_key[..full_key.len() - 8].to_vec(),
1782        _ => full_key.to_vec(),
1783    }
1784}
1785
1786/// Recover the `Id` (as a `u64`) from one index B-tree entry. For
1787/// non-unique kinds the id is the trailing 8 bytes of the KEY (the
1788/// suffix appended by the maintenance path); for `Unique` keys the
1789/// id is the VALUE. Used by
1790/// [`Collection::count_distinct_ids_in_range`].
1791fn id_from_index_entry(full_key: &[u8], value: &[u8], kind: obj_core::IndexKind) -> Result<u64> {
1792    // `Unique` indexes carry the id in the value; non-unique kinds
1793    // (Standard / Each / Composite) carry it as the trailing 8-byte
1794    // suffix of the key. The slicing here is O(1) — no per-entry
1795    // loop to bound (the outer walk's bound is the distinct-set cap).
1796    let bytes: &[u8] = if kind == obj_core::IndexKind::Unique {
1797        value
1798    } else {
1799        if full_key.len() < 8 {
1800            return Err(Error::Corruption { page_id: 0 });
1801        }
1802        &full_key[full_key.len() - 8..]
1803    };
1804    let id = Id::from_be_bytes(bytes).ok_or(Error::Corruption { page_id: 0 })?;
1805    Ok(id.get())
1806}
1807
1808// =====================================================================
1809// Phase 7A (M14 #14) — streaming index range iterator
1810// =====================================================================
1811
/// Start-bound marker used ONLY by [`IterIndexRange`]'s first refill.
/// `next_start_bound` consumes it on that first call. Every later refill
/// resumes from `Excluded(last_full_key)`, where `last_full_key` is the
/// last full B-tree key read by an `Ok` step of the previous refill
/// (including keys skipped by `Each` dedup). This is the same resumption
/// shape `Db::iter_all` uses for the primary tree.
enum InitialResume {
    /// Start at, and include, this full index key.
    Included(Vec<u8>),
    /// Start strictly after this full index key.
    Excluded(Vec<u8>),
    /// Start at the beginning of the index.
    Unbounded,
}
1821
/// One entry in [`IterIndexRange`]'s pending buffer.
///
/// In Read/Write modes, `refill` stages `Pending(key, id)` and `next()`
/// resolves the `T` lazily. In Lazy mode, per the notes in `refill`,
/// `iter_range` takes an eager path that pre-resolves rows under a single
/// `read_transaction`. This keeps the index walk and the per-row primary
/// `get` snapshot-consistent, and stages `Resolved(key, T)` directly.
enum StagedEntry<T> {
    /// User-key bytes plus the document id still to be fetched.
    Pending(Vec<u8>, Id),
    /// User-key bytes plus the already-decoded document.
    Resolved(Vec<u8>, T),
}
1831
/// Streaming iterator returned by [`Collection::iter_range`]. Yields
/// `Result<(user_key_bytes, T)>` one row at a time. Internally it
/// refills a small staging buffer from at most
/// `ITER_INDEX_RANGE_BATCH = 256` index B-tree steps per refill, so the
/// per-step pager-lock cost amortises. Memory stays bounded at
/// `O(batch × small_bytes + distinct_ids)` regardless of the range's
/// total size; the `distinct_ids` term is the `Each`-index dedup set.
///
/// Held data: a `&'a Collection<'a, T>` borrow (the iterator is bound
/// to the lifetime of `Collection::iter_range`'s `&self` borrow), the
/// index's root page-id, the dedup set for `Each` indexes, the next-
/// chunk resumption marker, and the staged batch.
pub struct IterIndexRange<'a, T: Document> {
    coll: &'a Collection<'a, T>,
    /// Kind of the index being walked; selects id-suffix stripping and
    /// whether `Each` dedup applies.
    descriptor_kind: obj_core::IndexKind,
    /// Root page id of the index B-tree; a zero root is rejected when
    /// the first refill runs.
    index_root: u64,
    /// Start marker for the FIRST refill only — taken (left `None`) by
    /// that refill; subsequent refills use `last_full_key`.
    initial_resume: Option<InitialResume>,
    /// Last full B-tree key read by the most recent refill that saw at
    /// least one `Ok` step. Drives the `Excluded(_)` resumption bound for
    /// the next chunk.
    last_full_key: Option<Vec<u8>>,
    /// User-supplied end bound (already widened per index kind).
    end_bound: Bound<Vec<u8>>,
    /// Pre-staged entries from the most recent refill. `next()`
    /// pops from the front. Each entry is either `Pending(key, id)`
    /// (deferred get-back, the Read/Write streaming path) or
    /// `Resolved(key, T)` (eager get-back inside a single
    /// `read_transaction`, the Lazy-mode fallback).
    buffer: VecDeque<Result<StagedEntry<T>>>,
    /// Persistent de-dup set for `Each` indexes. Power-of-ten Rule
    /// 3: the set is intentionally unbounded — if the caller wants
    /// a hard cap they should use
    /// [`Collection::count_distinct_ids_in_range`] (which caps at
    /// [`MAX_DISTINCT_IDS`]); the iterator's correctness contract
    /// is per-row dedup across the whole range.
    emitted_ids: HashSet<u64>,
    /// Once `true`, no further refills happen; `next()` drains `buffer`
    /// and then returns `None`.
    finished: bool,
}
1865    /// [`MAX_DISTINCT_IDS`]); the iterator's correctness contract
1866    /// is per-row dedup across the whole range.
1867    emitted_ids: HashSet<u64>,
1868    finished: bool,
1869}
1870
1871impl<T: Document> std::fmt::Debug for IterIndexRange<'_, T> {
1872    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1873        f.debug_struct("IterIndexRange")
1874            .field("descriptor_kind", &self.descriptor_kind)
1875            .field("index_root", &self.index_root)
1876            .field("buffer_len", &self.buffer.len())
1877            .field("emitted_ids_len", &self.emitted_ids.len())
1878            .field("finished", &self.finished)
1879            .finish_non_exhaustive()
1880    }
1881}
1882
1883impl<T: Document> IterIndexRange<'_, T> {
1884    /// Refill `self.buffer` with up to [`ITER_INDEX_RANGE_BATCH`]
1885    /// `(user_key, Id)` pairs by walking the index B-tree from the
1886    /// current resumption marker. Sets `self.finished` when the
1887    /// underlying range scan yields fewer than the requested batch
1888    /// (i.e. it ran past the end bound).
1889    ///
1890    /// Power-of-ten Rule 7: per-step decode errors are pushed into
1891    /// the buffer as `Err(_)` so the caller observes them via
1892    /// `next()` rather than aborting iteration.
1893    fn refill(&mut self) -> Result<()> {
1894        let env = match &self.coll.mode {
1895            CollectionMode::Write(w) => w.env,
1896            CollectionMode::Read(r) => r.env,
1897            CollectionMode::Lazy(_) => {
1898                // Lazy mode falls back to the eager `index_range`
1899                // path in `iter_range` itself (see below). Reaching
1900                // refill in Lazy mode indicates an internal logic
1901                // error, NOT a recoverable corruption — surface as
1902                // a typed error rather than `unwrap`.
1903                return Err(Error::ReadOnly {
1904                    operation: "internal: iter_range refill in Lazy mode",
1905                });
1906            }
1907        };
1908        let root_pid = PageId::new(self.index_root)
1909            .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1910        let start = self.next_start_bound();
1911        let end = clone_bound_ref(&self.end_bound);
1912        let mut pager = lock_pager(env)?;
1913        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1914        let iter = tree.range(&mut pager, (start, end))?;
1915        let mut staged: VecDeque<Result<StagedEntry<T>>> =
1916            VecDeque::with_capacity(ITER_INDEX_RANGE_BATCH);
1917        let mut last_full: Option<Vec<u8>> = None;
1918        let mut consumed: usize = 0;
1919        for step in iter {
1920            if consumed >= ITER_INDEX_RANGE_BATCH {
1921                break;
1922            }
1923            consumed = consumed
1924                .checked_add(1)
1925                .ok_or(Error::BTreeInvariantViolated {
1926                    reason: "iter_range batch counter overflow",
1927                })?;
1928            self.stage_one(&mut staged, &mut last_full, step);
1929        }
1930        if consumed < ITER_INDEX_RANGE_BATCH {
1931            self.finished = true;
1932        }
1933        drop(pager);
1934        self.buffer.extend(staged);
1935        if let Some(k) = last_full {
1936            self.last_full_key = Some(k);
1937        }
1938        Ok(())
1939    }
1940
1941    /// Process one B-tree step into the staged batch. Encapsulates
1942    /// the `Each`-dedup, the trailing-id-suffix strip, and the
1943    /// `Id::from_be_bytes` parse. Free helper so the refill body
1944    /// stays under the Rule-4 60-line ceiling.
1945    fn stage_one(
1946        &mut self,
1947        staged: &mut VecDeque<Result<StagedEntry<T>>>,
1948        last_full: &mut Option<Vec<u8>>,
1949        step: Result<(Vec<u8>, Vec<u8>)>,
1950    ) {
1951        let (full_key, id_bytes) = match step {
1952            Ok(kv) => kv,
1953            Err(e) => {
1954                staged.push_back(Err(e));
1955                return;
1956            }
1957        };
1958        *last_full = Some(full_key.clone());
1959        let Some(id) = Id::from_be_bytes(&id_bytes) else {
1960            staged.push_back(Err(Error::Corruption { page_id: 0 }));
1961            return;
1962        };
1963        if self.descriptor_kind == obj_core::IndexKind::Each && !self.emitted_ids.insert(id.get()) {
1964            // Same doc already emitted under a different element
1965            // key — skip without producing an output entry.
1966            return;
1967        }
1968        let user_key = strip_id_suffix(&full_key, self.descriptor_kind);
1969        staged.push_back(Ok(StagedEntry::Pending(user_key, id)));
1970    }
1971
1972    /// Compute the start bound for the next refill: use
1973    /// `initial_resume` on the first call (consuming it), thereafter
1974    /// use `Excluded(last_full_key)`.
1975    fn next_start_bound(&mut self) -> Bound<Vec<u8>> {
1976        if let Some(initial) = self.initial_resume.take() {
1977            return match initial {
1978                InitialResume::Included(k) => Bound::Included(k),
1979                InitialResume::Excluded(k) => Bound::Excluded(k),
1980                InitialResume::Unbounded => Bound::Unbounded,
1981            };
1982        }
1983        match &self.last_full_key {
1984            Some(k) => Bound::Excluded(k.clone()),
1985            None => Bound::Unbounded,
1986        }
1987    }
1988}
1989
1990impl<T: Document> Iterator for IterIndexRange<'_, T> {
1991    type Item = Result<(Vec<u8>, T)>;
1992
1993    fn next(&mut self) -> Option<Self::Item> {
1994        loop {
1995            if let Some(staged) = self.buffer.pop_front() {
1996                return Some(self.resolve_one(staged));
1997            }
1998            if self.finished {
1999                return None;
2000            }
2001            if let Err(e) = self.refill() {
2002                // Latch the iterator shut on a refill failure
2003                // (lock acquisition, B-tree open, etc.). Surface
2004                // the error once, then return None on subsequent
2005                // calls — power-of-ten Rule 7.
2006                self.finished = true;
2007                return Some(Err(e));
2008            }
2009            // refill ran; the loop will pop or notice finished.
2010        }
2011    }
2012}
2013
2014impl<T: Document> IterIndexRange<'_, T> {
2015    /// Resolve one staged entry into a `(user_key, T)` pair. For
2016    /// `Pending(_, id)` entries (the Read/Write streaming path),
2017    /// calls [`Collection::get`] to decode `T` on demand; for
2018    /// `Resolved(_, T)` entries (the Lazy-mode eager path), returns
2019    /// the already-decoded value. Orphan index entries (id missing
2020    /// in the primary tree) surface as [`Error::Corruption`],
2021    /// matching [`Collection::index_range`]'s existing contract.
2022    fn resolve_one(&self, staged: Result<StagedEntry<T>>) -> Result<(Vec<u8>, T)> {
2023        match staged? {
2024            StagedEntry::Pending(user_key, id) => match self.coll.get(id)? {
2025                Some(doc) => Ok((user_key, doc)),
2026                None => Err(Error::Corruption { page_id: 0 }),
2027            },
2028            StagedEntry::Resolved(user_key, doc) => Ok((user_key, doc)),
2029        }
2030    }
2031}
2032
2033/// Clone a `&Bound<Vec<u8>>` into an owned `Bound<Vec<u8>>`. Takes a
2034/// borrowed owned bound (the shape `IterIndexRange::end_bound`
2035/// stores) and hands back an owned copy for the resumption walk.
2036fn clone_bound_ref(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
2037    match b {
2038        Bound::Included(v) => Bound::Included(v.clone()),
2039        Bound::Excluded(v) => Bound::Excluded(v.clone()),
2040        Bound::Unbounded => Bound::Unbounded,
2041    }
2042}