Skip to main content

obj/
collection.rs

1//! `Collection<T>` — typed handle to one collection's primary
2//! B-tree.
3
4use std::borrow::Cow;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::marker::PhantomData;
7use std::ops::Bound;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use obj_core::btree::BTree;
11use obj_core::codec::{decode, encode};
12use obj_core::pager::page::PageId;
13use obj_core::pager::Pager;
14use obj_core::platform::FileHandle;
15use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result};
16
17/// Boxed iterator alias used by [`Collection::lookup`] and
18/// [`Collection::index_range`]. The iterator borrows from the
19/// enclosing transaction; the `'tx` lifetime is bound on the
20/// `Collection<T>` it was obtained from.
21pub type IndexIter<'a, Item> = Box<dyn Iterator<Item = Result<Item>> + Send + 'a>;
22
/// Number of index B-tree entries [`IterIndexRange`] stages per refill.
///
/// The iterator hands out one `(user_key, T)` pair per `next()` call,
/// but it pulls index entries in chunks of this size. The pager lock is
/// therefore acquired once per chunk instead of once per item.
///
/// Power-of-ten Rule 3: the staging buffer has a fixed size that does
/// NOT grow with the total size of the scanned range. Assuming ~8-byte
/// user keys plus the trailing 8-byte id suffix, one full batch is about
/// 256 × 16 B ≈ 4 KiB.
const ITER_INDEX_RANGE_BATCH: usize = 256;
31
/// Upper bound on the number of distinct document `Id`s that
/// [`Collection::count_distinct_ids_in_range`] will track in a single call.
///
/// The method counts unique ids under an `Each` index with a
/// `HashSet<Id>`. Power-of-ten Rule 3 requires that set to be
/// allocation-bounded, so once it would exceed this many entries the
/// call fails with [`obj_core::Error::DistinctCountExceeded`] instead of
/// growing without limit. Callers that hit the cap can narrow the scan
/// with `.index_range(...)` until it fits.
pub const MAX_DISTINCT_IDS: usize = 100_000;
40
41use crate::txn::{lock_catalog, ReadTxn, WriteTxn};
42
43/// #90 — per-transaction descriptor cache. Maps collection name to
44/// its LIVE [`CollectionDescriptor`] (with `next_id`, `primary_root`,
45/// and every index `root_page_id` advanced IN-MEMORY across the
46/// transaction). This is the single mid-txn source of truth for those
47/// roots; [`crate::WriteTxn::commit`] flushes each entry back to the
48/// catalog exactly once. Shared (via `Arc`) between the [`WriteTxn`]
49/// and every [`Collection`] handle opened on it, so two handles of the
50/// same collection observe the same advancing descriptor.
51pub(crate) type DescriptorCache = Arc<Mutex<HashMap<String, CollectionDescriptor>>>;
52
53/// Construct an empty [`DescriptorCache`].
54#[must_use]
55pub(crate) fn new_descriptor_cache() -> DescriptorCache {
56    Arc::new(Mutex::new(HashMap::new()))
57}
58
59/// Acquire the descriptor-cache mutex; map poison into `Busy` (Rule
60/// 7 — no panic on a poisoned lock).
61pub(crate) fn lock_descriptors(
62    cache: &Mutex<HashMap<String, CollectionDescriptor>>,
63) -> Result<MutexGuard<'_, HashMap<String, CollectionDescriptor>>> {
64    cache.lock().map_err(|_| Error::Busy {
65        kind: obj_core::LockKind::WriterInProcess,
66    })
67}
68
69/// Resolve the live cached descriptor for `name`, lazily loading it
70/// from the catalog B-tree on first touch in this transaction. After
71/// the first load the catalog tree is NEVER re-read mid-txn for this
72/// collection — every subsequent read/advance goes through the cache
73/// entry, so the unique pre-check and every index-tree open observe
74/// the in-memory-advanced roots (#90 load-bearing invariant).
75///
76/// Returns a mutable borrow into the cache so callers bump `next_id`,
77/// advance `primary_root`, and let `apply_doc_change` advance index
78/// roots in place — all without a per-doc `Catalog::update`.
79pub(crate) fn cached_descriptor_mut<'g>(
80    cache: &'g mut HashMap<String, CollectionDescriptor>,
81    pager: &mut Pager<FileHandle>,
82    catalog: &Catalog<FileHandle>,
83    name: &str,
84) -> Result<&'g mut CollectionDescriptor> {
85    if !cache.contains_key(name) {
86        let descriptor = catalog_get_required(pager, catalog, name)?;
87        cache.insert(name.to_owned(), descriptor);
88    }
89    cache.get_mut(name).ok_or(Error::Corruption { page_id: 0 })
90}
91
92/// Typed handle to a collection.
93///
94/// Construct via [`crate::WriteTxn::collection`] (lazy-create) or
95/// [`crate::ReadTxn::collection`] (read-only; errors if absent), or
96/// via [`crate::Db::collection`] for a one-shot read-only handle
97/// bound to a runtime collection name (M11 #94 — Phase 1B).
98///
99/// All methods take `&self` because the underlying state lives
100/// behind mutexes on the parent transaction; the handle itself is
101/// stateless beyond the descriptor it caches.
102pub struct Collection<'tx, T: Document> {
103    /// `Mode::Write` carries the [`WriteTxn`] reference;
104    /// `Mode::Read` the [`ReadTxn`] reference; `Mode::Lazy` carries
105    /// a `&Db` and opens a private read transaction on each method
106    /// call.  The [`Collection`]'s lifetime is bound to whichever
107    /// it was constructed from.
108    mode: CollectionMode<'tx>,
109    /// Collection name resolved at construction. For handles built
110    /// via [`crate::WriteTxn::collection`] / [`crate::ReadTxn::collection`]
111    /// this equals `T::COLLECTION`. For handles built via
112    /// [`crate::Db::collection`] this is the caller-supplied runtime
113    /// name (which may differ from `T::COLLECTION`, e.g.
114    /// `"archive.orders"` against a type whose declared `COLLECTION`
115    /// is `"orders"`).
116    //
117    // `collection_name` rather than the lint-suggested `name`
118    // because `name` collides with `IndexDescriptor::name` /
119    // `IndexSpec::name` throughout this module's internals.
120    //
121    // `Cow<'static, str>` (#84): the common `WriteTxn`/`ReadTxn`
122    // open path resolves to `T::COLLECTION` — a `&'static str` —
123    // so it can be stored as `Cow::Borrowed` with NO `to_owned()`
124    // heap allocation per read. The runtime-name path
125    // (`Db::collection`, `open_readonly_named` with a caller name)
126    // stores `Cow::Owned`.
127    #[allow(clippy::struct_field_names)]
128    collection_name: Cow<'static, str>,
129    /// Cached descriptor.  Populated at construction for `Write` /
130    /// `Read` mode; never updated in place — a `Collection<T>`
131    /// reflects the catalog row that existed when the handle was
132    /// opened.  `update` / `delete` inside the same txn re-read the
133    /// descriptor through the catalog lock to capture mutations
134    /// from prior calls in the same txn (e.g. an `insert` that
135    /// advanced `next_id`).
136    ///
137    /// For `Lazy` mode the descriptor is a sentinel — the real
138    /// descriptor is loaded inside each method's private read
139    /// transaction.
140    descriptor: CollectionDescriptor,
141    _phantom: PhantomData<fn() -> T>,
142}
143
144/// Backing-reference inside a [`Collection`].  Encodes whether
145/// the txn is writable.
146enum CollectionMode<'tx> {
147    Write(WriteRef<'tx>),
148    Read(ReadRef<'tx>),
149    /// Read-only handle that opens a private read transaction on
150    /// each method call. Constructed by [`crate::Db::collection`];
151    /// the `'tx` lifetime is the borrow of `&Db`.
152    Lazy(LazyRef<'tx>),
153}
154
155struct LazyRef<'db> {
156    /// Borrowed `Db` — methods open one-shot read transactions on
157    /// it. The `'db` lifetime keeps the borrow alive for as long
158    /// as the [`Collection`] handle.
159    db: &'db crate::Db,
160}
161
162struct WriteRef<'tx> {
163    env: &'tx obj_core::TxnEnv<FileHandle>,
164    catalog: Arc<Mutex<Catalog<FileHandle>>>,
165    /// #90: shared per-txn descriptor cache (cloned from the owning
166    /// [`WriteTxn`]). The single mid-txn source of truth for
167    /// `next_id` / `primary_root` / index roots; flushed once at
168    /// commit.
169    descriptors: DescriptorCache,
170}
171
172struct ReadRef<'tx> {
173    snapshot: &'tx obj_core::ReaderSnapshot<FileHandle>,
174    env: &'tx obj_core::TxnEnv<FileHandle>,
175}
176
177impl<'tx, T: Document> Collection<'tx, T> {
178    /// Open the collection on the write side, lazy-creating the
179    /// catalog row + an empty primary B-tree on first call, and
180    /// reconciling the type's declared `Document::indexes()` against
181    /// the catalog's stored descriptors (M7 #57) on the first call
182    /// per process per collection.
183    pub(crate) fn open_or_create(tx: &'tx mut WriteTxn<'_>) -> Result<Self> {
184        // Stage 1: lazy-create the collection row + an empty primary
185        // B-tree if this is the first call for `T` against this
186        // database. The returned descriptor is the pre-reconcile
187        // shape; we re-read after reconciliation below because the
188        // reconciler may have appended index descriptors.
189        let _initial = ensure_collection::<T>(&tx.inner, &tx.catalog)?;
190        reconcile_indexes_once::<T>(
191            &tx.inner,
192            &tx.catalog,
193            &tx.reconciled,
194            &mut tx.reconciled_staged,
195        )?;
196        // Re-read the descriptor in case the reconciler mutated the
197        // index roster (the reconciler runs through `Catalog::update`
198        // which rewrites the descriptor's `indexes` vector).
199        let descriptor = reread_descriptor::<T>(&tx.inner, &tx.catalog)?;
200        Ok(Self {
201            mode: CollectionMode::Write(WriteRef {
202                env: tx.inner.env(),
203                catalog: Arc::clone(&tx.catalog),
204                descriptors: Arc::clone(&tx.descriptors),
205            }),
206            // `T::COLLECTION` is `&'static str` (#84): borrow, don't
207            // allocate.
208            collection_name: Cow::Borrowed(T::COLLECTION),
209            descriptor,
210            _phantom: PhantomData,
211        })
212    }
213
214    /// Open the collection on the read side.  Errors if the
215    /// collection has not yet been registered AT THE SNAPSHOT'S
216    /// PINNED LSN — a collection created by a concurrent writer
217    /// AFTER the snapshot was pinned is invisible to this txn (M6
218    /// #53). The descriptor is read by walking the catalog B+tree
219    /// rooted at `snapshot.root_catalog()` (the value captured by
220    /// `Pager::reader_snapshot` at pin time) using the snapshot-
221    /// aware [`Catalog::lookup_via_snapshot`] free function — NOT
222    /// via the writer's live `Catalog.tree.root`, which a concurrent
223    /// writer may have COW-advanced past the snapshot.
224    ///
225    /// M11 #93: if `T::COLLECTION` is of the form
226    /// `<namespace>.<name>`, the lookup dispatches against the
227    /// attached database registered under `<namespace>` instead of
228    /// the calling Db. The attached snapshot is the one pinned by
229    /// `Db::read_transaction` at txn-begin; the attached env is
230    /// passed through the same way.
231    pub(crate) fn open_readonly(tx: &'tx ReadTxn<'_>) -> Result<Self> {
232        // `T::COLLECTION` is `&'static str` — store it as
233        // `Cow::Borrowed` (#84), avoiding the per-read `to_owned()`
234        // heap allocation on the common typed-handle path.
235        Self::open_readonly_named(tx, Cow::Borrowed(T::COLLECTION))
236    }
237
238    /// Open the collection on the read side against a caller-supplied
239    /// runtime `name`. Like [`Self::open_readonly`] but the catalog
240    /// lookup uses `name` instead of `T::COLLECTION` — required for
241    /// [`crate::Db::collection`]'s Phase 1B accessor (M11 #94), which
242    /// binds a runtime collection name (e.g. `"archive.orders"`) to
243    /// the typed handle.
244    ///
245    /// Namespace dispatch (`<ns>.<tail>` → attached database) follows
246    /// the same shape as [`Self::open_readonly`].
247    pub(crate) fn open_readonly_named(
248        tx: &'tx ReadTxn<'_>,
249        name: Cow<'static, str>,
250    ) -> Result<Self> {
251        let (namespace, tail) = crate::db::split_namespace(&name);
252        let (env, snapshot, lookup_name): (
253            &'tx obj_core::TxnEnv<FileHandle>,
254            &'tx obj_core::ReaderSnapshot<FileHandle>,
255            &str,
256        ) = match namespace {
257            None => (tx.inner.env(), tx.inner.snapshot(), &name),
258            Some(ns) => {
259                let ctx = tx
260                    .attached
261                    .get(ns)
262                    .ok_or_else(|| Error::CollectionNamespaceUnknown {
263                        namespace: ns.to_owned(),
264                    })?;
265                (ctx.env.as_ref(), &ctx.snapshot, tail)
266            }
267        };
268        let Some(descriptor) = read_descriptor_via_snapshot_named(env, snapshot, lookup_name)?
269        else {
270            return Err(Error::CollectionNotFound {
271                name: name.into_owned(),
272            });
273        };
274        Ok(Self {
275            mode: CollectionMode::Read(ReadRef { snapshot, env }),
276            // `name` is moved in unchanged — `Cow::Borrowed(&'static)`
277            // for the typed-handle path (no alloc), `Cow::Owned` for
278            // runtime-name handles.
279            collection_name: name,
280            descriptor,
281            _phantom: PhantomData,
282        })
283    }
284
285    /// Construct a deferred-lookup, read-only handle bound to a
286    /// runtime collection name. Each method call opens a private
287    /// read transaction on `db` and dispatches through
288    /// [`Self::open_readonly_named`]. Construction is infallible —
289    /// errors (missing collection, unknown namespace, etc.) surface
290    /// at the first method call.
291    ///
292    /// Used by [`crate::Db::collection`] (Phase 1B, M11 #94).
293    pub(crate) fn lazy(db: &'tx crate::Db, name: String) -> Self {
294        Self {
295            mode: CollectionMode::Lazy(LazyRef { db }),
296            collection_name: Cow::Owned(name),
297            // Sentinel descriptor — never read in Lazy mode; the
298            // real descriptor is loaded inside each method's
299            // private read transaction.
300            descriptor: CollectionDescriptor::new(0, 0, 0),
301            _phantom: PhantomData,
302        }
303    }
304
305    /// Cached descriptor (`collection_id`, `primary_root`,
306    /// `type_version`, `next_id` at handle-open time).
307    #[must_use]
308    pub fn descriptor(&self) -> &CollectionDescriptor {
309        &self.descriptor
310    }
311
312    /// #90: the LIVE primary-tree root for a `Write`-mode handle.
313    ///
314    /// Prefers the per-txn descriptor cache (which carries every
315    /// `primary_root` advance from this txn's prior writes) so a
316    /// read-after-write on the same handle inside one transaction
317    /// observes its own uncommitted inserts. Falls back to the
318    /// handle's open-time `primary_root` if this collection has not
319    /// yet been written in the txn (no cache entry).
320    fn write_primary_root(&self, write: &WriteRef<'tx>) -> Result<u64> {
321        let cache = lock_descriptors(&write.descriptors)?;
322        Ok(cache
323            .get(self.collection_name.as_ref())
324            .map_or(self.descriptor.primary_root, |d| d.primary_root))
325    }
326
327    /// #90: the LIVE `root_page_id` for the named `Active` index on a
328    /// `Write`-mode handle. Prefers the per-txn cache so a read after
329    /// an index-mutating write in the same txn descends the advanced
330    /// root; falls back to `fallback` (the handle's open-time
331    /// descriptor entry) when the collection has no cache entry yet.
332    fn write_index_root(
333        &self,
334        write: &WriteRef<'tx>,
335        index_name: &str,
336        fallback: u64,
337    ) -> Result<u64> {
338        let cache = lock_descriptors(&write.descriptors)?;
339        let Some(descriptor) = cache.get(self.collection_name.as_ref()) else {
340            return Ok(fallback);
341        };
342        let live = descriptor
343            .indexes
344            .iter()
345            .find(|d| d.name == index_name && d.status == obj_core::IndexStatus::Active)
346            .map_or(fallback, |d| d.root_page_id);
347        Ok(live)
348    }
349
350    /// Insert `doc`.  Returns the freshly-allocated [`Id`].
351    ///
352    /// # Errors
353    ///
354    /// - [`Error::ReadOnly`] if the handle is read-only.
355    /// - Pager / catalog / codec errors propagated.
356    // `doc: T` is taken by value for caller ergonomics (the user
357    // owns the value and gives it to the database).  The codec's
358    // `encode(&T, ...)` takes a reference, so clippy sees the
359    // function body as "owned but borrows only" — accepted as the
360    // intended public-API shape.
361    #[allow(clippy::needless_pass_by_value)]
362    pub fn insert(&self, doc: T) -> Result<Id> {
363        let write = self.write_or_err("insert")?;
364        let name: &str = self.collection_name.as_ref();
365        let mut pager = lock_pager(write.env)?;
366        let catalog = lock_catalog(&write.catalog)?;
367        // #90: the cached descriptor is the SOLE mid-txn source of
368        // truth. `next_id` is an in-memory bump; `primary_root` and
369        // index roots advance in the cache; NO per-doc
370        // `Catalog::update` — the single flush is deferred to commit.
371        let mut cache = lock_descriptors(&write.descriptors)?;
372        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
373        let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
374        let bytes = encode(&doc, descriptor.collection_id)?;
375        let key = id.to_be_bytes();
376        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
377        tree.insert(&mut pager, &key, &bytes)?;
378        descriptor.primary_root = tree.root().get();
379        // Index maintenance lands inside the same WAL transaction as
380        // the primary write, descending the cache's in-memory index
381        // roots. A `UniqueConstraintViolation` here surfaces via `?`
382        // and the surrounding `WriteTxn` rolls back atomically.
383        crate::index_maint::apply_doc_change::<T>(&mut pager, descriptor, None, Some(&doc), id)?;
384        Ok(id)
385    }
386
387    /// Fetch the document at `id`.
388    ///
389    /// On the write side this consults the pager (sees pending
390    /// writes in the current txn).  On the read side it consults
391    /// the snapshot's frozen view.
392    ///
393    /// # Lazy migration (M10 #84)
394    ///
395    /// If the on-disk record was written by an older
396    /// `Document::VERSION` than the current `T::VERSION`, the codec
397    /// walks the stored bytes through the schema registered for
398    /// that version (see `T::historical_schemas()`) and dispatches
399    /// the resulting structured `Dynamic` through `T::migrate`.
400    /// **The migrated bytes are NOT written back to disk.** The
401    /// next [`Collection::get`] re-reads the same v(n) bytes and
402    /// re-runs migration. Only a subsequent
403    /// [`Collection::update`](Self::update) /
404    /// [`Collection::upsert`](Self::upsert) writes the document
405    /// back, at which point the on-disk header records
406    /// `T::VERSION`.
407    ///
408    /// This contract is what allows mixed-version reads to scale:
409    /// a 10⁹-doc collection does not need to be batch-rewritten on
410    /// schema upgrade. Power-of-ten Rule 7: every "migration ran"
411    /// path returns the migrated `T`; no implicit write-back.
412    ///
413    /// # Errors
414    ///
415    /// Pager / B-tree / codec errors propagated. In particular:
416    ///
417    /// - [`Error::SchemaNotRegistered`](obj_core::Error::SchemaNotRegistered)
418    ///   if the stored record carries a `type_version` for which
419    ///   `T::historical_schemas()` has no entry.
420    /// - [`Error::SchemaMigrationNotImplemented`](obj_core::Error::SchemaMigrationNotImplemented)
421    ///   if the registered `T::migrate` returns the default error.
422    pub fn get(&self, id: Id) -> Result<Option<T>> {
423        if let Some(r) = self.dispatch_lazy(|c| c.get(id)) {
424            return r;
425        }
426        let key = id.to_be_bytes();
427        let Some(bytes) = self.read_or_write_env_for_get(&key)? else {
428            return Ok(None);
429        };
430        Ok(Some(decode::<T>(&bytes, self.descriptor.collection_id)?))
431    }
432
433    /// Read the primary-tree value at `key` against the current
434    /// `Write` / `Read` mode (Lazy mode is dispatched upstream of
435    /// every public caller via [`Self::dispatch_lazy`]).
436    fn read_or_write_env_for_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
437        match &self.mode {
438            CollectionMode::Write(write) => {
439                // #90: descend the LIVE primary root so a get after an
440                // insert/update in the SAME txn sees its own writes.
441                let primary_root = self.write_primary_root(write)?;
442                let mut pager = lock_pager(write.env)?;
443                let tree = btree_handle(&pager, primary_root)?;
444                tree.get(&mut pager, key)
445            }
446            CollectionMode::Read(read) => {
447                snapshot_get_via_btree(read.snapshot, read.env, self.descriptor.primary_root, key)
448            }
449            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
450                operation: "internal: lazy-mode primary read",
451            }),
452        }
453    }
454
455    /// Apply `f` to the document at `id`, writing the mutated value
456    /// back.
457    ///
458    /// # Errors
459    ///
460    /// - [`Error::ReadOnly`] on a read-side handle.
461    /// - [`Error::DocumentNotFound`] if `id` is absent.
462    /// - Pager / catalog / codec errors propagated.
463    pub fn update<F>(&self, id: Id, f: F) -> Result<()>
464    where
465        F: FnOnce(&mut T),
466    {
467        let write = self.write_or_err("update")?;
468        let name: &str = self.collection_name.as_ref();
469        let mut pager = lock_pager(write.env)?;
470        let catalog = lock_catalog(&write.catalog)?;
471        // #90: resolve the live cached descriptor (sole mid-txn source
472        // of truth); advance its roots in place, no per-doc flush.
473        let mut cache = lock_descriptors(&write.descriptors)?;
474        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
475        let key = id.to_be_bytes();
476        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
477        let existing = tree.get(&mut pager, &key)?.ok_or(Error::DocumentNotFound {
478            collection: T::COLLECTION,
479            id: id.get(),
480        })?;
481        let old_value = decode::<T>(&existing, descriptor.collection_id)?;
482        let mut new_value = decode::<T>(&existing, descriptor.collection_id)?;
483        f(&mut new_value);
484        let bytes = encode(&new_value, descriptor.collection_id)?;
485        tree.delete(&mut pager, &key)?;
486        tree.insert(&mut pager, &key, &bytes)?;
487        descriptor.primary_root = tree.root().get();
488        crate::index_maint::apply_doc_change::<T>(
489            &mut pager,
490            descriptor,
491            Some(&old_value),
492            Some(&new_value),
493            id,
494        )?;
495        Ok(())
496    }
497
498    /// Delete the document at `id`.  Returns `true` if it existed.
499    ///
500    /// # Errors
501    ///
502    /// - [`Error::ReadOnly`] on a read-side handle.
503    /// - Pager / catalog errors propagated.
504    pub fn delete(&self, id: Id) -> Result<bool> {
505        let write = self.write_or_err("delete")?;
506        let name: &str = self.collection_name.as_ref();
507        let mut pager = lock_pager(write.env)?;
508        let catalog = lock_catalog(&write.catalog)?;
509        // #90: cached descriptor is the sole mid-txn source of truth.
510        let mut cache = lock_descriptors(&write.descriptors)?;
511        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
512        let key = id.to_be_bytes();
513        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
514        // Materialise the old document so the index-maintenance
515        // path can compute the OLD key set and emit per-index
516        // delete entries. If the row does not exist, we still
517        // return `Ok(false)` for API back-compat (no index work).
518        let old_value = match tree.get(&mut pager, &key)? {
519            Some(bytes) => Some(decode::<T>(&bytes, descriptor.collection_id)?),
520            None => None,
521        };
522        let removed = tree.delete(&mut pager, &key)?;
523        descriptor.primary_root = tree.root().get();
524        crate::index_maint::apply_doc_change::<T>(
525            &mut pager,
526            descriptor,
527            old_value.as_ref(),
528            None,
529            id,
530        )?;
531        Ok(removed)
532    }
533
534    /// Insert-or-replace `doc` at `id`.
535    ///
536    /// # Errors
537    ///
538    /// - [`Error::ReadOnly`] on a read-side handle.
539    /// - Pager / catalog / codec errors propagated.
540    // `doc: T` passed by value for ergonomic ownership transfer;
541    // codec takes `&T` so clippy reports a "borrow only" body.
542    #[allow(clippy::needless_pass_by_value)]
543    pub fn upsert(&self, id: Id, doc: T) -> Result<()> {
544        let write = self.write_or_err("upsert")?;
545        let name: &str = self.collection_name.as_ref();
546        let mut pager = lock_pager(write.env)?;
547        let catalog = lock_catalog(&write.catalog)?;
548        // #90: cached descriptor is the sole mid-txn source of truth.
549        let mut cache = lock_descriptors(&write.descriptors)?;
550        let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
551        let bytes = encode(&doc, descriptor.collection_id)?;
552        let key = id.to_be_bytes();
553        let mut tree = btree_handle(&pager, descriptor.primary_root)?;
554        // Materialise the prior value (if any) so the index-
555        // maintenance diff sees the OLD key set.
556        let old_value = match tree.get(&mut pager, &key)? {
557            Some(prior) => Some(decode::<T>(&prior, descriptor.collection_id)?),
558            None => None,
559        };
560        let _ = tree.delete(&mut pager, &key)?;
561        tree.insert(&mut pager, &key, &bytes)?;
562        descriptor.primary_root = tree.root().get();
563        crate::index_maint::apply_doc_change::<T>(
564            &mut pager,
565            descriptor,
566            old_value.as_ref(),
567            Some(&doc),
568            id,
569        )?;
570        Ok(())
571    }
572
573    /// Look up the single document whose `index_name` key matches
574    /// `key` under a `Unique` index.
575    ///
576    /// Errors with [`Error::IndexNotUnique`] if `index_name` resolves
577    /// to a non-unique index — `find_unique` is *only* defined on
578    /// `Unique` indexes. For `Standard` / `Each` / `Composite` use
579    /// [`Self::lookup`] (which returns an iterator).
580    ///
581    /// Snapshot-aware: on a write-side handle the lookup sees the
582    /// current txn's pending writes; on a read-side handle it sees
583    /// the snapshot's frozen view.
584    ///
585    /// # Errors
586    ///
587    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
588    /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
589    /// - Pager / B-tree / codec errors propagated.
590    pub fn find_unique(
591        &self,
592        index_name: &str,
593        key: impl Into<obj_core::codec::Dynamic>,
594    ) -> Result<Option<T>> {
595        let key_dyn = key.into();
596        if let Some(r) = self.dispatch_lazy(|c| c.find_unique(index_name, key_dyn.clone())) {
597            return r;
598        }
599        let descriptor = self.active_index(index_name)?;
600        if descriptor.kind != obj_core::IndexKind::Unique {
601            return Err(Error::IndexNotUnique {
602                collection: self.collection_name.clone().into_owned(),
603                name: index_name.to_owned(),
604            });
605        }
606        let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
607        let id_bytes = self.index_get(descriptor, encoded.as_bytes())?;
608        match id_bytes {
609            Some(bytes) => match Id::from_be_bytes(&bytes) {
610                Some(id) => self.get(id),
611                None => Err(Error::Corruption { page_id: 0 }),
612            },
613            None => Ok(None),
614        }
615    }
616
617    /// Yield every document whose `index_name` key matches `key`.
618    /// Works on `Standard` / `Unique` / `Each` indexes. Returns
619    /// `Err(Error::IndexKindMismatch)`-style guidance for
620    /// `Composite` (use [`Self::index_range`] for tuple-shaped
621    /// keys).
622    ///
623    /// The same document is yielded at most once even if it owns
624    /// multiple matching entries — `Each` indexes can encode the
625    /// same `id` under multiple element keys; we de-dup on emit.
626    ///
627    /// # Errors
628    ///
629    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
630    /// - Pager / B-tree / codec errors propagated.
631    pub fn lookup(
632        &self,
633        index_name: &str,
634        key: impl Into<obj_core::codec::Dynamic>,
635    ) -> Result<IndexIter<'static, T>>
636    where
637        T: Send + 'static,
638    {
639        let key_dyn = key.into();
640        if let Some(r) = self.dispatch_lazy(|c| c.lookup(index_name, key_dyn.clone())) {
641            return r;
642        }
643        let descriptor = self.active_index(index_name)?;
644        let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
645        let ids = match descriptor.kind {
646            obj_core::IndexKind::Unique => self.collect_unique(descriptor, encoded.as_bytes())?,
647            obj_core::IndexKind::Standard
648            | obj_core::IndexKind::Each
649            | obj_core::IndexKind::Composite => {
650                self.collect_nonunique_equal(descriptor, encoded.as_bytes())?
651            }
652            // `IndexKind` is `#[non_exhaustive]`; an unknown kind means
653            // this build predates the on-disk index. Refuse rather than
654            // return a wrong (possibly empty) id set.
655            _ => return Err(Error::InvalidArgument("unsupported index kind")),
656        };
657        let resolved = self.resolve_unique_ids(ids)?;
658        Ok(Box::new(resolved.into_iter().map(Ok)))
659    }
660
661    /// Yield `(user_key, doc)` pairs whose index key falls within
662    /// `range`. The bounds are [`Dynamic`](obj_core::codec::Dynamic)
663    /// values — the same ergonomic type [`crate::Query::index_range`]
664    /// takes — encoded internally through the order-preserving field
665    /// encoder ([`obj_core::index::encode_field`]); callers no longer
666    /// hand-encode index-key bytes.
667    ///
668    /// For non-Unique kinds (`Standard` / `Each` / `Composite`) the
669    /// bounds are widened internally so a user-facing
670    /// `Included(x)..=Included(x)` range matches every entry whose
671    /// user-key equals `x` even though the underlying B-tree key
672    /// carries an `id_be8` suffix (see
673    /// [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
674    /// § Index key
675    /// encoding § Range-bound widening (non-Unique kinds)).
676    ///
677    /// # Errors
678    ///
679    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
680    /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
681    ///   carries an embedded NUL byte (the order-preserving encoder
682    ///   rejects those).
683    /// - Pager / B-tree / codec errors propagated.
684    pub fn index_range<R>(
685        &self,
686        index_name: &str,
687        range: R,
688    ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
689    where
690        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
691        T: Send + 'static,
692    {
693        let start = encode_dynamic_bound(range.start_bound())?;
694        let end = encode_dynamic_bound(range.end_bound())?;
695        self.index_range_encoded(index_name, start, end)
696    }
697
698    /// Encoded-bytes variant of [`Self::index_range`]. The bounds are
699    /// already the order-preserving field encoding of the user's
700    /// `Dynamic` value(s); this keeps the signature general for
701    /// `Composite` "starts-with" scans and is the entry point the
702    /// query layer / lazy-dispatch recursion call after they have
703    /// done their own encoding.
704    pub(crate) fn index_range_encoded(
705        &self,
706        index_name: &str,
707        start_bound: std::ops::Bound<Vec<u8>>,
708        end_bound: std::ops::Bound<Vec<u8>>,
709    ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
710    where
711        T: Send + 'static,
712    {
713        if let Some(r) = self.dispatch_lazy(|c| {
714            c.index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
715        }) {
716            return r;
717        }
718        let descriptor = self.active_index(index_name)?;
719        let (start, end) =
720            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
721        let entries = self.collect_range(descriptor, start, end)?;
722        let descriptor_kind = descriptor.kind;
723        let mut out: Vec<Result<(Vec<u8>, T)>> = Vec::with_capacity(entries.len());
724        let mut emitted_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
725        for (full_key, id_bytes_value) in entries {
726            let Some(id) = Id::from_be_bytes(&id_bytes_value) else {
727                out.push(Err(Error::Corruption { page_id: 0 }));
728                continue;
729            };
730            // For Each indexes the same doc may appear multiple
731            // times under different element keys — de-dup on
732            // emission.
733            if descriptor_kind == obj_core::IndexKind::Each && !emitted_ids.insert(id.get()) {
734                continue;
735            }
736            // For non-unique kinds the B-tree key includes the
737            // trailing 8-byte id suffix; strip it so the caller
738            // sees only the user-key bytes.
739            let user_key = strip_id_suffix(&full_key, descriptor_kind);
740            match self.get(id) {
741                Ok(Some(doc)) => out.push(Ok((user_key, doc))),
742                Ok(None) => {
743                    // Orphan index entry — surface as Corruption.
744                    out.push(Err(Error::Corruption { page_id: 0 }));
745                }
746                Err(e) => out.push(Err(e)),
747            }
748        }
749        Ok(Box::new(out.into_iter()))
750    }
751
752    /// Streaming variant of [`Self::index_range`] (Phase 7A perf
753    /// pass, M14 #14). Yields `(user_key, T)` pairs lazily — the
754    /// returned [`IterIndexRange`] decodes one `T` per `next()` call
755    /// rather than building a `Vec<Result<(_, T)>>` of every match
756    /// up front. The iterator borrows `&'a self`, so it must be
757    /// consumed inside the lifetime of the enclosing
758    /// [`crate::WriteTxn`] / [`crate::ReadTxn`] (or the
759    /// [`crate::Db::collection`] handle, in Lazy mode).
760    ///
761    /// # When to prefer `iter_range` over `index_range`
762    ///
763    /// - **Memory.** `index_range` allocates `O(matches × sizeof(T))`
764    ///   upfront; `iter_range` keeps a fixed-size [`VecDeque`] of
765    ///   `(key, id)` pairs (`ITER_INDEX_RANGE_BATCH = 256` entries)
766    ///   and decodes one `T` at a time. For a 100k-row range with
767    ///   ~500-byte documents that's ~50 MB peak vs. a few KiB.
768    /// - **Latency-to-first-row.** `index_range` decodes every
769    ///   matching document before returning the iterator;
770    ///   `iter_range` returns immediately after the first chunk
771    ///   refill, so the first `next()` returns after one index walk
772    ///   + one primary-tree `get` (rather than `N`).
773    ///
774    /// # When `index_range` is still the right answer
775    ///
776    /// `index_range` returns an `IndexIter<'static, _>` — it can
777    /// escape the `read_transaction` / `transaction` closure that
778    /// produced it. `iter_range` is bound to `&self`, so the
779    /// iterator dies when the [`Collection`] handle dies. If you
780    /// need to return the iterator to outer scope, stick with
781    /// `index_range`.
782    ///
783    /// # Per-row `get`-back design choice
784    ///
785    /// Each `next()` yields `(user_key, T)` by calling
786    /// [`Self::get`] under the hood — i.e. a SECOND B+tree descent
787    /// per row (the first is the index range walk; the second is
788    /// the primary-tree `get(id)`). This is intentional and
789    /// inherited from `index_range`: the index leaf stores only
790    /// the document `id` (8 bytes), not the document bytes. A
791    /// future format-minor bump may add value-in-index storage to
792    /// short-circuit the second descent; that work is pinned to
793    /// post-1.0 (tracked as pit issue #16, "value-in-index
794    /// storage to eliminate `index_range` double-decode").
795    ///
796    /// # Errors
797    ///
798    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
799    /// - Pager / B-tree / codec errors propagated at construction
800    ///   and from each `next()` call.
801    pub fn iter_range<'a, R>(&'a self, index_name: &str, range: R) -> Result<IterIndexRange<'a, T>>
802    where
803        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
804        T: Send + 'static,
805    {
806        let start_bound = encode_dynamic_bound(range.start_bound())?;
807        let end_bound = encode_dynamic_bound(range.end_bound())?;
808        self.iter_range_encoded(index_name, start_bound, end_bound)
809    }
810
811    /// Encoded-bytes variant of [`Self::iter_range`]. Bounds are the
812    /// order-preserving field encoding of the user's `Dynamic`
813    /// value(s); used internally by the lazy-mode fallback path.
814    fn iter_range_encoded<'a>(
815        &'a self,
816        index_name: &str,
817        start_bound: Bound<Vec<u8>>,
818        end_bound: Bound<Vec<u8>>,
819    ) -> Result<IterIndexRange<'a, T>>
820    where
821        T: Send + 'static,
822    {
823        // Lazy mode falls back to `index_range`'s eager materialization
824        // path so the index walk + per-row `get` share a single
825        // snapshot. Streaming refills can't preserve that — each
826        // refill would open a fresh txn and observe a different
827        // snapshot. Lazy callers who want true streaming should open
828        // an explicit `Db::read_transaction` and call `iter_range`
829        // on the bound collection there.
830        if matches!(self.mode, CollectionMode::Lazy(_)) {
831            return self.iter_range_lazy_fallback(index_name, start_bound, end_bound);
832        }
833        let descriptor = self.active_index(index_name)?;
834        // #90: in Write mode the iterator must walk the LIVE index
835        // root (the per-txn cache's advanced value) so a streaming
836        // scan opened after an in-txn index write sees its own
837        // entries. In Read mode there is no write cache, so the
838        // open-time descriptor root is authoritative.
839        let index_root = match &self.mode {
840            CollectionMode::Write(w) => {
841                self.write_index_root(w, index_name, descriptor.root_page_id)?
842            }
843            _ => descriptor.root_page_id,
844        };
845        let (start, end) =
846            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
847        // Stash the resumption marker as `Excluded(start)` only on
848        // the first refill; afterwards the iterator overwrites it
849        // with the last full_key it emitted. The initial `start`
850        // bound is honoured by `refill` via the same `last_full_key`
851        // → `Excluded(_)` translation.
852        let initial_resume = match start {
853            Bound::Included(k) => InitialResume::Included(k),
854            Bound::Excluded(k) => InitialResume::Excluded(k),
855            Bound::Unbounded => InitialResume::Unbounded,
856        };
857        Ok(IterIndexRange {
858            coll: self,
859            descriptor_kind: descriptor.kind,
860            index_root,
861            initial_resume: Some(initial_resume),
862            last_full_key: None,
863            end_bound: end,
864            buffer: VecDeque::new(),
865            emitted_ids: HashSet::new(),
866            finished: false,
867        })
868    }
869
870    /// Lazy-mode fallback for [`Self::iter_range`]: delegates to
871    /// [`Self::index_range`] (which itself dispatches through a fresh
872    /// read txn) and rehouses the eagerly-materialised entries into
873    /// the streaming iterator's buffer as
874    /// [`StagedEntry::Resolved`]. Power-of-ten Rule 4: keeping this
875    /// isolated so the streaming path's `iter_range` body stays
876    /// small.
877    fn iter_range_lazy_fallback<'a>(
878        &'a self,
879        index_name: &str,
880        start_bound: Bound<Vec<u8>>,
881        end_bound: Bound<Vec<u8>>,
882    ) -> Result<IterIndexRange<'a, T>>
883    where
884        T: Send + 'static,
885    {
886        let materialized = self.index_range_encoded(index_name, start_bound, end_bound)?;
887        let mut buffer: VecDeque<Result<StagedEntry<T>>> = VecDeque::new();
888        for item in materialized {
889            match item {
890                Ok((key, doc)) => buffer.push_back(Ok(StagedEntry::Resolved(key, doc))),
891                Err(e) => buffer.push_back(Err(e)),
892            }
893        }
894        Ok(IterIndexRange {
895            coll: self,
896            descriptor_kind: obj_core::IndexKind::Standard,
897            index_root: 0,
898            initial_resume: None,
899            last_full_key: None,
900            end_bound: Bound::Unbounded,
901            buffer,
902            emitted_ids: HashSet::new(),
903            finished: true,
904        })
905    }
906
907    /// Look up the `IndexKind` of an active index by name. Used by
908    /// the M8 query layer to dispatch `Query::count` between the
909    /// entry-count and distinct-id-count paths (M8 follow-up #72).
910    ///
911    /// # Errors
912    ///
913    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
914    pub(crate) fn index_kind(&self, index_name: &str) -> Result<obj_core::IndexKind> {
915        Ok(self.active_index(index_name)?.kind)
916    }
917
918    /// Resolve `index_name` to an `Active` `IndexDescriptor` on the
919    /// collection. Errors with [`Error::IndexNotFound`] if absent
920    /// or `DroppedPending`.
921    ///
922    /// Returns a borrow into `self.descriptor.indexes` — no per-lookup
923    /// clone (#84). Every caller uses the descriptor only within the
924    /// enclosing `&self` borrow; `iter_range_encoded` copies the two
925    /// `Copy` fields it needs (`kind`, `root_page_id`) into the
926    /// returned iterator rather than holding the borrow.
927    fn active_index(&self, index_name: &str) -> Result<&obj_core::IndexDescriptor> {
928        let entry = self
929            .descriptor
930            .indexes
931            .iter()
932            .find(|d| d.name == index_name);
933        match entry {
934            Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d),
935            _ => Err(Error::IndexNotFound {
936                collection: self.collection_name.clone().into_owned(),
937                name: index_name.to_owned(),
938            }),
939        }
940    }
941
942    /// Single-key `get` on an index B-tree. Used by `find_unique`
943    /// and by the Unique-kind branch of `lookup`.
944    fn index_get(
945        &self,
946        descriptor: &obj_core::IndexDescriptor,
947        key: &[u8],
948    ) -> Result<Option<Vec<u8>>> {
949        match &self.mode {
950            CollectionMode::Write(write) => {
951                // #90: descend the LIVE index root from the per-txn
952                // cache so an index read after an index-mutating write
953                // in the SAME txn observes its own entries.
954                let root_raw =
955                    self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
956                let root = PageId::new(root_raw)
957                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
958                let mut pager = lock_pager(write.env)?;
959                let tree = BTree::<FileHandle>::open(&pager, root)?;
960                tree.get(&mut pager, key)
961            }
962            CollectionMode::Read(read) => {
963                let root = PageId::new(descriptor.root_page_id)
964                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
965                let pager = lock_pager(read.env)?;
966                BTree::<FileHandle>::get_via_snapshot(&pager, read.snapshot, root, key)
967            }
968            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
969                operation: "internal: lazy-mode index_get",
970            }),
971        }
972    }
973
974    /// Collect every `(full_key, value)` entry from an index B-tree
975    /// whose key starts with `prefix`. For unique kinds the prefix
976    /// is the entire key (one match max); for non-unique kinds we
977    /// match every key whose first `prefix.len()` bytes equal
978    /// `prefix` (the trailing `id_suffix` varies per doc).
979    fn collect_nonunique_equal(
980        &self,
981        descriptor: &obj_core::IndexDescriptor,
982        prefix: &[u8],
983    ) -> Result<Vec<u64>> {
984        let entries = self.collect_range(
985            descriptor,
986            std::ops::Bound::Included(prefix.to_vec()),
987            // Upper bound is `prefix || 0xFF..` — every key whose
988            // user-portion equals `prefix` falls in
989            // `[prefix, prefix || u64::MAX]`.
990            std::ops::Bound::Included(append_max_id(prefix)),
991        )?;
992        let mut ids = Vec::with_capacity(entries.len());
993        let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
994        for (_full_key, value) in entries {
995            let id = Id::from_be_bytes(&value).ok_or(Error::Corruption { page_id: 0 })?;
996            if emitted.insert(id.get()) {
997                ids.push(id.get());
998            }
999        }
1000        Ok(ids)
1001    }
1002
1003    /// Collect a single id from a Unique index B-tree at `key`.
1004    fn collect_unique(
1005        &self,
1006        descriptor: &obj_core::IndexDescriptor,
1007        key: &[u8],
1008    ) -> Result<Vec<u64>> {
1009        match self.index_get(descriptor, key)? {
1010            Some(bytes) => Id::from_be_bytes(&bytes)
1011                .map(|id| vec![id.get()])
1012                .ok_or(Error::Corruption { page_id: 0 }),
1013            None => Ok(Vec::new()),
1014        }
1015    }
1016
1017    /// Collect every `(full_key, value)` entry from an index B-tree
1018    /// whose key falls within `(start, end)`.
1019    fn collect_range(
1020        &self,
1021        descriptor: &obj_core::IndexDescriptor,
1022        start: std::ops::Bound<Vec<u8>>,
1023        end: std::ops::Bound<Vec<u8>>,
1024    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1025        // Read mode pins every page read to the txn's snapshot (M12
1026        // #12): a concurrent writer's post-snapshot index entries must
1027        // not leak into a read txn's range/count. Write mode keeps the
1028        // live-pager scan so the txn observes its own uncommitted
1029        // index writes (it has no snapshot to pin against).
1030        match &self.mode {
1031            CollectionMode::Read(r) => {
1032                let root = PageId::new(descriptor.root_page_id)
1033                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1034                let pager = lock_pager(r.env)?;
1035                let iter = BTree::<FileHandle>::range_via_snapshot(
1036                    &pager,
1037                    r.snapshot,
1038                    root,
1039                    (start, end),
1040                )?;
1041                let mut out = Vec::new();
1042                for step in iter {
1043                    out.push(step?);
1044                }
1045                Ok(out)
1046            }
1047            CollectionMode::Write(w) => {
1048                // #90: scan the LIVE index root from the per-txn cache
1049                // so an in-txn index scan sees its own uncommitted
1050                // entries (the open-time descriptor root may be stale).
1051                let root_raw =
1052                    self.write_index_root(w, &descriptor.name, descriptor.root_page_id)?;
1053                let root = PageId::new(root_raw)
1054                    .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1055                let mut pager = lock_pager(w.env)?;
1056                let tree = BTree::<FileHandle>::open(&pager, root)?;
1057                let iter = tree.range(&mut pager, (start, end))?;
1058                let mut out = Vec::new();
1059                for step in iter {
1060                    out.push(step?);
1061                }
1062                Ok(out)
1063            }
1064            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1065                operation: "internal: lazy-mode collect_range",
1066            }),
1067        }
1068    }
1069
1070    /// Resolve a `Vec<u64>` of `Id` integer values into concrete
1071    /// `T` documents via `self.get`. Preserves order; missing rows
1072    /// surface as `Error::Corruption` (orphan index entry).
1073    fn resolve_unique_ids(&self, ids: Vec<u64>) -> Result<Vec<T>> {
1074        let mut out = Vec::with_capacity(ids.len());
1075        for raw in ids {
1076            let id =
1077                Id::from_be_bytes(&raw.to_be_bytes()).ok_or(Error::Corruption { page_id: 0 })?;
1078            let doc = self.get(id)?.ok_or(Error::Corruption { page_id: 0 })?;
1079            out.push(doc);
1080        }
1081        Ok(out)
1082    }
1083
1084    /// Count every entry in the primary tree WITHOUT decoding the
1085    /// documents. Used by the M8 [`crate::Query::count`] no-decode
1086    /// fast path; the iterator visits leaf pages and counts entries
1087    /// rather than running each through postcard.
1088    ///
1089    /// Power-of-ten Rule 2: bounded by the B+tree's `MAX_RANGE_NODES`
1090    /// budget (inherited from `BTree::range`).
1091    ///
1092    /// # Errors
1093    ///
1094    /// Pager / B-tree errors propagated.
1095    pub fn count_all(&self) -> Result<u64> {
1096        // Closure (rather than `Collection::count_all`) so the
1097        // higher-ranked lifetime on `dispatch_lazy`'s
1098        // `for<'a> FnOnce(&Collection<'a, T>) -> _` bound is
1099        // satisfied — `Collection::count_all` resolves to a single
1100        // lifetime fn-item type, which fails the HRTB check.
1101        #[allow(clippy::redundant_closure_for_method_calls)]
1102        if let Some(r) = self.dispatch_lazy(|c| c.count_all()) {
1103            return r;
1104        }
1105        // Read mode pins the full-tree scan to the txn's snapshot
1106        // (M12 #12) so a concurrent writer's post-snapshot inserts
1107        // cannot perturb the count; write mode keeps the live scan so
1108        // the txn sees its own uncommitted writes.
1109        match &self.mode {
1110            CollectionMode::Read(r) => {
1111                let pager = lock_pager(r.env)?;
1112                let pid = PageId::new(self.descriptor.primary_root)
1113                    .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1114                let iter = BTree::<FileHandle>::range_via_snapshot(&pager, r.snapshot, pid, ..)?;
1115                count_range_iter(iter)
1116            }
1117            CollectionMode::Write(w) => {
1118                // #90: count the LIVE primary root so an in-txn count
1119                // reflects this txn's own uncommitted inserts/deletes.
1120                let root = self.write_primary_root(w)?;
1121                let mut pager = lock_pager(w.env)?;
1122                let tree = btree_handle(&pager, root)?;
1123                let iter = tree.range(&mut pager, ..)?;
1124                count_range_iter(iter)
1125            }
1126            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1127                operation: "internal: lazy-mode count_all",
1128            }),
1129        }
1130    }
1131
1132    /// Count every entry whose encoded key falls inside `range` on
1133    /// the named index's B-tree, WITHOUT decoding any document. M8
1134    /// fast path for [`crate::Query::count`] when the source is an
1135    /// `index_range`.
1136    ///
1137    /// Returns the number of index B-tree entries — for an `Each`
1138    /// index that may exceed the document count (one doc emits
1139    /// multiple entries); for other kinds it equals the matching
1140    /// doc count.
1141    ///
1142    /// # Errors
1143    ///
1144    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1145    /// - Pager / B-tree errors propagated.
1146    pub fn count_index_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1147    where
1148        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1149    {
1150        let start = encode_dynamic_bound(range.start_bound())?;
1151        let end = encode_dynamic_bound(range.end_bound())?;
1152        self.count_index_range_encoded(index_name, start, end)
1153    }
1154
1155    /// Encoded-bytes variant of [`Self::count_index_range`]. Bounds
1156    /// are the order-preserving field encoding of the user's
1157    /// `Dynamic` value(s); used by the query-layer count fast path.
1158    pub(crate) fn count_index_range_encoded(
1159        &self,
1160        index_name: &str,
1161        start_bound: std::ops::Bound<Vec<u8>>,
1162        end_bound: std::ops::Bound<Vec<u8>>,
1163    ) -> Result<u64> {
1164        if let Some(r) = self.dispatch_lazy(|c| {
1165            c.count_index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
1166        }) {
1167            return r;
1168        }
1169        let descriptor = self.active_index(index_name)?;
1170        let (start, end) =
1171            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1172        let entries = self.collect_range(descriptor, start, end)?;
1173        // `entries.len()` is a `usize`; promote to `u64` carefully.
1174        // `usize` is at most 64 bits on every supported target.
1175        u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1176            reason: "index range entry count exceeds u64",
1177        })
1178    }
1179
1180    /// Count distinct document `Id`s whose entries fall inside
1181    /// `range` on the named index's B-tree, WITHOUT decoding any
1182    /// document. For `Each` indexes this is the correct shape of
1183    /// the "how many docs match" question — `count_index_range`
1184    /// returns the entry count, which overshoots when a single doc
1185    /// contributes multiple entries.
1186    ///
1187    /// Implementation walks the index B-tree, parses the trailing
1188    /// 8-byte big-endian `Id` suffix from each non-unique key, and
1189    /// tracks the unique set in a bounded [`std::collections::HashSet`]
1190    /// capped at [`MAX_DISTINCT_IDS`]. Exceeding the cap surfaces
1191    /// [`Error::DistinctCountExceeded`] — the caller should narrow
1192    /// the range.
1193    ///
1194    /// # Per-kind semantics
1195    ///
1196    /// - `Standard`, `Composite`: equivalent to `count_index_range`
1197    ///   (one entry per doc by construction; the trailing-id-suffix
1198    ///   walk still produces the same total).
1199    /// - `Unique`: keys carry NO id suffix — the entry value is the
1200    ///   raw 8-byte `Id`; the walk reads the value instead.
1201    /// - `Each`: the dedup is meaningful — one doc may contribute
1202    ///   N entries under N distinct element keys.
1203    ///
1204    /// # Errors
1205    ///
1206    /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1207    /// - [`Error::DistinctCountExceeded`] if the distinct set
1208    ///   exceeds [`MAX_DISTINCT_IDS`].
1209    /// - [`Error::Corruption`] if an entry's id suffix / value is
1210    ///   not parseable as an [`obj_core::Id`].
1211    /// - Pager / B-tree errors propagated.
1212    pub fn count_distinct_ids_in_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1213    where
1214        R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1215    {
1216        let start = encode_dynamic_bound(range.start_bound())?;
1217        let end = encode_dynamic_bound(range.end_bound())?;
1218        self.count_distinct_ids_in_range_encoded(index_name, start, end)
1219    }
1220
1221    /// Encoded-bytes variant of [`Self::count_distinct_ids_in_range`].
1222    /// Bounds are the order-preserving field encoding of the user's
1223    /// `Dynamic` value(s); used by the query-layer count fast path.
1224    pub(crate) fn count_distinct_ids_in_range_encoded(
1225        &self,
1226        index_name: &str,
1227        start_bound: std::ops::Bound<Vec<u8>>,
1228        end_bound: std::ops::Bound<Vec<u8>>,
1229    ) -> Result<u64> {
1230        if let Some(r) = self.dispatch_lazy(|c| {
1231            c.count_distinct_ids_in_range_encoded(
1232                index_name,
1233                start_bound.clone(),
1234                end_bound.clone(),
1235            )
1236        }) {
1237            return r;
1238        }
1239        let descriptor = self.active_index(index_name)?;
1240        let (start, end) =
1241            crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1242        let entries = self.collect_range(descriptor, start, end)?;
1243        let mut distinct: HashSet<u64> = HashSet::new();
1244        for (full_key, value) in entries {
1245            let id = id_from_index_entry(&full_key, &value, descriptor.kind)?;
1246            if distinct.insert(id) && distinct.len() > MAX_DISTINCT_IDS {
1247                return Err(Error::DistinctCountExceeded {
1248                    limit: MAX_DISTINCT_IDS,
1249                });
1250            }
1251        }
1252        // `distinct.len()` is a `usize`; promote to `u64` carefully.
1253        // `usize` is at most 64 bits on every supported target.
1254        u64::try_from(distinct.len()).map_err(|_| Error::BTreeInvariantViolated {
1255            reason: "distinct id count exceeds u64",
1256        })
1257    }
1258
1259    /// Materialise every `(Id, T)` pair in the collection.
1260    ///
1261    /// Implementation note: M6 returns an owned `Vec` rather than a
1262    /// streaming iterator because the B+tree range API borrows the
1263    /// pager, and threading that borrow through the mutex guards
1264    /// in the iterator chain is awkward.  M7+ may convert to a
1265    /// streaming shape once the index API is in place.
1266    ///
1267    /// # Errors
1268    ///
1269    /// Pager / B-tree / codec errors propagated.
1270    pub fn all(&self) -> Result<Vec<(Id, T)>> {
1271        // Closure (rather than `Collection::all`) for the same
1272        // HRTB reason documented on `count_all` above.
1273        #[allow(clippy::redundant_closure_for_method_calls)]
1274        if let Some(r) = self.dispatch_lazy(|c| c.all()) {
1275            return r;
1276        }
1277        match &self.mode {
1278            CollectionMode::Write(write) => {
1279                // #90: scan the LIVE primary root so an in-txn `all()`
1280                // includes this txn's own uncommitted inserts.
1281                let root = self.write_primary_root(write)?;
1282                let mut pager = lock_pager(write.env)?;
1283                scan_all::<T>(&mut pager, root, self.descriptor.collection_id)
1284            }
1285            CollectionMode::Read(read) => snapshot_scan_via_btree::<T>(
1286                read.snapshot,
1287                read.env,
1288                self.descriptor.primary_root,
1289                self.descriptor.collection_id,
1290            ),
1291            CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1292                operation: "internal: lazy-mode all",
1293            }),
1294        }
1295    }
1296
1297    fn write_or_err(&self, op: &'static str) -> Result<&WriteRef<'tx>> {
1298        match &self.mode {
1299            CollectionMode::Write(w) => Ok(w),
1300            CollectionMode::Read(_) | CollectionMode::Lazy(_) => {
1301                Err(Error::ReadOnly { operation: op })
1302            }
1303        }
1304    }
1305
1306    /// If this handle is in `Lazy` mode, dispatch `body` through a
1307    /// fresh read transaction on the bound [`crate::Db`] — opening a
1308    /// transient [`Collection`] via [`Self::open_readonly_named`] and
1309    /// invoking the user-supplied closure on it. Returns `Some(_)`
1310    /// when the dispatch fires (the closure ran or the underlying
1311    /// open failed); returns `None` for non-Lazy handles so the
1312    /// caller can fall through to the existing logic.
1313    ///
1314    /// Power-of-ten Rule 4: keeps each public method's body small —
1315    /// the dispatch shim is one match arm instead of a per-method
1316    /// `if let CollectionMode::Lazy(_)` ladder.
1317    fn dispatch_lazy<R, F>(&self, body: F) -> Option<Result<R>>
1318    where
1319        F: FnOnce(&Collection<'_, T>) -> Result<R>,
1320    {
1321        match &self.mode {
1322            CollectionMode::Lazy(LazyRef { db }) => {
1323                // Clone the stored name into an owned `Cow` for the
1324                // transient handle opened inside the private read txn.
1325                // `into_owned` keeps the `'static` Cow bound the
1326                // `open_readonly_named` signature requires regardless
1327                // of whether the stored name was Borrowed or Owned.
1328                let name: Cow<'static, str> = Cow::Owned(self.collection_name.clone().into_owned());
1329                Some(db.read_transaction(move |tx| {
1330                    let coll = Collection::<T>::open_readonly_named(tx, name)?;
1331                    body(&coll)
1332                }))
1333            }
1334            _ => None,
1335        }
1336    }
1337}
1338
1339// ---------- internal helpers --------------------------------------
1340
1341fn lock_pager(
1342    env: &obj_core::TxnEnv<FileHandle>,
1343) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1344    env.pager().lock().map_err(|_| Error::Busy {
1345        kind: obj_core::LockKind::WriterInProcess,
1346    })
1347}
1348
1349/// Ensure `T::COLLECTION` exists in the catalog, lazy-creating an
1350/// empty primary B-tree on first call.  Used on the write side.
1351fn ensure_collection<T: Document>(
1352    inner: &obj_core::WriteTxn<'_, FileHandle>,
1353    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1354) -> Result<CollectionDescriptor> {
1355    let mut pager = inner.lock_pager()?;
1356    let mut catalog_guard = lock_catalog(catalog)?;
1357    if let Some(d) = catalog_guard.get(&mut pager, T::COLLECTION)? {
1358        return Ok(d);
1359    }
1360    let tree = BTree::<FileHandle>::empty(&mut pager)?;
1361    let descriptor = CollectionDescriptor::new(0, tree.root().get(), T::VERSION);
1362    let _id = catalog_guard.insert(&mut pager, T::COLLECTION, descriptor)?;
1363    catalog_guard
1364        .get(&mut pager, T::COLLECTION)?
1365        .ok_or(Error::Corruption { page_id: 0 })
1366}
1367
1368/// Re-read the descriptor for `T::COLLECTION` after any catalog
1369/// mutation. Used by [`Collection::open_or_create`] after the
1370/// reconciler runs.
1371fn reread_descriptor<T: Document>(
1372    inner: &obj_core::WriteTxn<'_, FileHandle>,
1373    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1374) -> Result<CollectionDescriptor> {
1375    let mut pager = inner.lock_pager()?;
1376    let catalog_guard = lock_catalog(catalog)?;
1377    catalog_guard
1378        .get(&mut pager, T::COLLECTION)?
1379        .ok_or(Error::Corruption { page_id: 0 })
1380}
1381
1382/// Reconcile `T::indexes()` against the catalog's stored descriptors
1383/// on the FIRST call per process per `(collection, version)`.
1384/// Subsequent calls for the same `(collection, version)` observe the
1385/// cache hit (in the shared `reconciled` set OR this txn's `staged`
1386/// set) and skip the catalog walk.
1387///
1388/// Reconciliation runs inside the user's WAL transaction so a
1389/// rolled-back txn leaves the catalog clean. If reconciliation
1390/// fails (e.g. `Error::IndexKindMismatch`), neither set is populated
1391/// so the next attempt re-runs the reconciler.
1392///
1393/// #93 — the skip-check is `shared ∪ staged`: a second handle of the
1394/// same `(collection, version)` within ONE txn still skips the
1395/// (idempotent) catalog walk via `staged`, but the key is recorded in
1396/// the per-txn `staged` set, NOT the shared set.
1397/// [`crate::WriteTxn::commit`] promotes the staged keys into the shared
1398/// set only after the WAL commit lands — so a rolled-back lazy-create
1399/// can never poison the shared cache into skipping reconciliation on a
1400/// later txn (the original bug).
1401fn reconcile_indexes_once<T: Document>(
1402    inner: &obj_core::WriteTxn<'_, FileHandle>,
1403    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1404    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1405    staged: &mut HashSet<(String, u32)>,
1406) -> Result<()> {
1407    // The generic `#[derive(Document)]` path is exactly the non-generic
1408    // raw seam ([`reconcile_specs_once`]) with the collection name,
1409    // version, and spec list supplied from the compile-time `T` rather
1410    // than from a caller argument. Both share ONE body so the cache /
1411    // staging / validation / catalog-walk semantics can never drift
1412    // apart (#108).
1413    reconcile_specs_once(
1414        inner,
1415        catalog,
1416        reconciled,
1417        staged,
1418        T::COLLECTION,
1419        T::VERSION,
1420        &T::indexes(),
1421    )
1422}
1423
1424/// Non-generic core of index reconciliation, shared by the generic
1425/// `#[derive(Document)]` path ([`reconcile_indexes_once`]) and the
1426/// public raw seam ([`crate::WriteTxn::reconcile_indexes_raw`], #108).
1427///
1428/// Reconciles `specs` against the catalog's stored descriptors for
1429/// `collection` on the FIRST call per process per `(collection,
1430/// version)`, honoring the same `shared ∪ staged` skip-cache (keyed by
1431/// `(collection, version)` — #130), per-txn staging, and pre-mutation
1432/// validation as the generic path — the only difference is that the
1433/// collection name, version, and spec list are arguments rather than
1434/// `T::COLLECTION` / `T::VERSION` / `T::indexes()`.
1435///
1436/// # Why the cache key includes `version` (#130)
1437///
1438/// `Catalog::reconcile_indexes` is a FULL reconcile: it declares specs
1439/// missing from the catalog AND drops `Active` descriptors absent from
1440/// `specs`. Keying the skip-cache by `collection` ALONE meant that once
1441/// a process reconciled a collection at one schema version, a LATER
1442/// version in the same process that ADDED a new index never reconciled
1443/// — the added index never became `Active` and index-maintaining writes
1444/// failed with `IndexNotFound`. Keying by `(collection, version)`
1445/// reconciles each version exactly once: the common single-version case
1446/// is unchanged (one key, reconciled once), and a cross-version index
1447/// ADD reconciles the new version's specs on its first write.
1448///
1449/// ## Caveat — conflicting index REMOVAL interleaved across versions
1450///
1451/// Because each `(collection, version)` reconciles independently and
1452/// `reconcile_indexes` drops `Active` indexes absent from the version's
1453/// specs, alternating writes between two live versions of the SAME
1454/// collection in ONE process — where the versions declare DIFFERENT
1455/// index sets — can leave the catalog reflecting whichever version
1456/// reconciled most recently (its specs drive the drop set). Index
1457/// ADDITION (the common monotonic schema-evolution case) is fully
1458/// correct. This is strictly better than the prior behavior, which
1459/// never reconciled the second version at all (so its added index was
1460/// simply missing); the removal-interleaving edge is a narrow,
1461/// single-process anti-pattern.
1462///
1463/// The caller MUST have ensured `collection` exists in the catalog
1464/// (the generic path runs [`ensure_collection`] first; the raw seam
1465/// runs [`crate::txn::ensure_collection_raw`]) — `reconcile_indexes`
1466/// errors with `CollectionNotFound` otherwise.
1467pub(crate) fn reconcile_specs_once(
1468    inner: &obj_core::WriteTxn<'_, FileHandle>,
1469    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1470    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1471    staged: &mut HashSet<(String, u32)>,
1472    collection: &str,
1473    version: u32,
1474    specs: &[obj_core::IndexSpec],
1475) -> Result<()> {
1476    // Cache key is `(collection, version)` (#130): each schema version
1477    // reconciles its OWN spec set exactly once per process.
1478    let key = (collection.to_owned(), version);
1479    // Fast path: already reconciled in a prior committed txn (shared)
1480    // or earlier in THIS txn (staged) — skip the catalog walk. Probe
1481    // `staged` first: it needs no lock and covers the repeat-handle
1482    // case; the shared probe takes the mutex only when `staged` misses.
1483    if staged.contains(&key) {
1484        return Ok(());
1485    }
1486    {
1487        let cache = lock_reconciled(reconciled)?;
1488        if cache.contains(&key) {
1489            return Ok(());
1490        }
1491    }
1492    // Validate specs before any catalog mutation so a bad spec
1493    // does not leave a half-reconciled catalog.
1494    for spec in specs {
1495        spec.validate()?;
1496    }
1497    {
1498        let mut pager = inner.lock_pager()?;
1499        let mut catalog_guard = lock_catalog(catalog)?;
1500        let _post = catalog_guard.reconcile_indexes(&mut pager, collection, specs)?;
1501    }
1502    // #93: stage in the PER-TXN set, not the shared one. Promotion to
1503    // the shared set is deferred to a successful `WriteTxn::commit`.
1504    staged.insert(key);
1505    Ok(())
1506}
1507
1508fn lock_reconciled(
1509    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1510) -> Result<std::sync::MutexGuard<'_, HashSet<(String, u32)>>> {
1511    reconciled.lock().map_err(|_| Error::Busy {
1512        kind: obj_core::LockKind::WriterInProcess,
1513    })
1514}
1515
1516/// Read-side descriptor lookup against a caller-supplied
1517/// collection name. M11 #93 introduced this byte-shape so the
1518/// namespace-aware [`Collection::open_readonly`] can perform the
1519/// catalog walk against either the calling Db's snapshot (no
1520/// namespace) or an attached Db's snapshot (with the namespace
1521/// prefix stripped).
1522fn read_descriptor_via_snapshot_named(
1523    env: &obj_core::TxnEnv<FileHandle>,
1524    snapshot: &obj_core::ReaderSnapshot<FileHandle>,
1525    name: &str,
1526) -> Result<Option<CollectionDescriptor>> {
1527    let pager = lock_pager(env)?;
1528    Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1529}
1530
1531/// #83 (b): fused one-shot point read for [`crate::Db::get`].
1532///
1533/// Resolves the collection descriptor and the primary-tree value for
1534/// `id` under a SINGLE pager-mutex acquisition (see
1535/// [`snapshot_resolve_and_get`]), then decodes the bytes against the
1536/// resolved `collection_id`. This collapses the two back-to-back
1537/// pager locks the equivalent handle path pays (one to open the
1538/// handle / resolve the descriptor, one for the `get`).
1539///
1540/// Observably identical to
1541/// `tx.collection::<T>()?.get(id)` for the one-shot caller: the
1542/// descriptor lookup and the value get run on the same `ReadTxn`
1543/// snapshot, and a missing collection still surfaces as
1544/// [`Error::CollectionNotFound`]. Namespaced reads (`<ns>.<tail>`)
1545/// fall back to the handle path so the attached-snapshot dispatch is
1546/// unchanged.
1547pub(crate) fn fused_point_get<T: Document>(tx: &ReadTxn<'_>, id: Id) -> Result<Option<T>> {
1548    let (namespace, _tail) = crate::db::split_namespace(T::COLLECTION);
1549    if namespace.is_some() {
1550        // Namespaced: keep the existing attached-snapshot dispatch.
1551        return Collection::<T>::open_readonly(tx)?.get(id);
1552    }
1553    let key = id.to_be_bytes();
1554    let resolved =
1555        snapshot_resolve_and_get(tx.inner.snapshot(), tx.inner.env(), T::COLLECTION, &key)?;
1556    match resolved {
1557        Some((descriptor, bytes)) => Ok(Some(decode::<T>(&bytes, descriptor.collection_id)?)),
1558        None => Ok(None),
1559    }
1560}
1561
1562/// Re-read the descriptor inside an already-locked pager + catalog
1563/// pair.  Surfaces a missing collection as `Error::Corruption`
1564/// because the caller has already opened a write txn against it.
1565fn catalog_get_required(
1566    pager: &mut Pager<FileHandle>,
1567    catalog: &Catalog<FileHandle>,
1568    name: &str,
1569) -> Result<CollectionDescriptor> {
1570    catalog
1571        .get(pager, name)?
1572        .ok_or(Error::Corruption { page_id: 0 })
1573}
1574
1575fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1576    let root_pid =
1577        PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1578    BTree::<FileHandle>::open(pager, root_pid)
1579}
1580
1581/// Drain a B-tree range iterator, counting entries WITHOUT retaining
1582/// their bytes. Shared by [`Collection::count_all`]'s live (write) and
1583/// snapshot-pinned (read) scan arms. Power-of-ten Rule 2: the
1584/// iterator carries its own `MAX_RANGE_NODES` budget; the `u64`
1585/// overflow check guards the count itself.
1586fn count_range_iter<I>(iter: I) -> Result<u64>
1587where
1588    I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
1589{
1590    let mut n: u64 = 0;
1591    for step in iter {
1592        // Probe-only: drop the bytes the moment they decode.
1593        let _ = step?;
1594        n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1595            reason: "primary tree entry count exceeds u64",
1596        })?;
1597    }
1598    Ok(n)
1599}
1600
1601// `persist_root` removed in M7 #58: every mutating method now
1602// routes through `index_maint::apply_doc_change`, which persists
1603// the descriptor (including the possibly-advanced `primary_root`)
1604// via `Catalog::update` after every per-index B-tree mutation.
1605
1606/// Snapshot-consistent B-tree lookup (M6 #53).
1607///
1608/// Walks the primary B+tree rooted at `primary_root` using
1609/// [`obj_core::btree::BTree::get_via_snapshot`], which descends
1610/// through [`obj_core::ReaderSnapshot::read_page`] rather than the
1611/// live `Pager::read_page`. This bypasses the WAL `state.view` /
1612/// `state.pending` overlays — a concurrent writer's post-snapshot
1613/// COW commits cannot poison the reader's walk.
1614///
1615/// `primary_root` MUST be the descriptor's `primary_root` as-of
1616/// the snapshot's pinned LSN (i.e. the value read via
1617/// [`obj_core::Catalog::lookup_via_snapshot`] in
1618/// [`read_descriptor_via_snapshot`] above). Using the writer's
1619/// live `primary_root` would defeat the snapshot read.
1620fn snapshot_get_via_btree(
1621    snap: &obj_core::ReaderSnapshot<FileHandle>,
1622    env: &obj_core::TxnEnv<FileHandle>,
1623    primary_root: u64,
1624    key: &[u8],
1625) -> Result<Option<Vec<u8>>> {
1626    let pager = lock_pager(env)?;
1627    let root_pid = PageId::new(primary_root)
1628        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1629    obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)
1630}
1631
1632/// #83 (b): fused single-lock read. Resolves the descriptor for
1633/// `name` and performs the primary-tree `get` for `key` under ONE
1634/// pager-mutex acquisition, against the SAME `snapshot` — collapsing
1635/// the descriptor-lookup lock (`read_descriptor_via_snapshot_named`)
1636/// and the value-get lock (`snapshot_get_via_btree`) that the
1637/// two-call handle path pays back-to-back.
1638///
1639/// Returns `(descriptor, value)` so the caller can `decode` against
1640/// the resolved `collection_id`. A missing collection surfaces as
1641/// `Err(CollectionNotFound)` (matching the handle path's open-time
1642/// contract for the one-shot caller); a present collection with no
1643/// entry for `key` surfaces as `Ok(None)`.
1644///
1645/// Power-of-ten: keeps poison → `Error::Busy` (Rule 7, via
1646/// `lock_pager`); ≤ 60 lines (Rule 4); `debug_assert`s that the
1647/// snapshot-resolved `primary_root` is the one fed to the get
1648/// (Rule 5).
1649fn snapshot_resolve_and_get(
1650    snap: &obj_core::ReaderSnapshot<FileHandle>,
1651    env: &obj_core::TxnEnv<FileHandle>,
1652    name: &str,
1653    key: &[u8],
1654) -> Result<Option<(CollectionDescriptor, Vec<u8>)>> {
1655    let pager = lock_pager(env)?;
1656    let Some(descriptor) = Catalog::<FileHandle>::lookup_via_snapshot(&pager, snap, name)? else {
1657        return Err(Error::CollectionNotFound {
1658            name: name.to_owned(),
1659        });
1660    };
1661    let root_pid = PageId::new(descriptor.primary_root)
1662        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1663    // Rule 5: the value-get MUST descend the descriptor's own
1664    // snapshot-time root — never a re-resolved or live root. A live
1665    // primary B-tree root is always page 1+, so a zero `collection_id`
1666    // here would mean we resolved a degenerate catalog row.
1667    debug_assert_eq!(
1668        root_pid.get(),
1669        descriptor.primary_root,
1670        "fused get must descend the snapshot-resolved primary_root",
1671    );
1672    let value =
1673        obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)?;
1674    Ok(value.map(|v| (descriptor, v)))
1675}
1676
1677fn scan_all<T: Document>(
1678    pager: &mut Pager<FileHandle>,
1679    primary_root: u64,
1680    collection_id: u32,
1681) -> Result<Vec<(Id, T)>> {
1682    let tree = btree_handle(pager, primary_root)?;
1683    let iter = tree.range(pager, ..)?;
1684    let mut out = Vec::new();
1685    for entry in iter {
1686        let (key, value) = entry?;
1687        let id = Id::from_be_bytes(&key)
1688            .ok_or(Error::InvalidArgument("primary B-tree key is not an Id"))?;
1689        let doc = decode::<T>(&value, collection_id)?;
1690        out.push((id, doc));
1691    }
1692    Ok(out)
1693}
1694
1695fn snapshot_scan_via_btree<T: Document>(
1696    _snap: &obj_core::ReaderSnapshot<FileHandle>,
1697    env: &obj_core::TxnEnv<FileHandle>,
1698    primary_root: u64,
1699    collection_id: u32,
1700) -> Result<Vec<(Id, T)>> {
1701    let mut pager = lock_pager(env)?;
1702    scan_all::<T>(&mut pager, primary_root, collection_id)
1703}
1704
1705/// Encode the caller-supplied `Dynamic` value(s) into the bytes a
1706/// lookup against `descriptor` would use as a B-tree key. For
1707/// `Unique` indexes the result is the key bytes verbatim; for
1708/// non-unique kinds the lookup helpers extend with the per-doc
1709/// id suffix at scan time.
1710fn index_key_for_lookup(
1711    descriptor: &obj_core::IndexDescriptor,
1712    fields: &[obj_core::codec::Dynamic],
1713) -> Result<obj_core::index::EncodedIndexKey> {
1714    // Ref-based encode (#84): pass the descriptor's `kind` (Copy) and
1715    // `key_paths` BY REFERENCE — no transient `IndexSpec`, no
1716    // `name`/`key_paths` clone, no redundant `IndexSpec::validate` on
1717    // an already-validated on-disk descriptor. The byte output is
1718    // identical to the old `from_parts` + `encode_index_key` path
1719    // (see `encode_index_key_parts` and the byte-identity test).
1720    obj_core::index::encode_index_key_parts(descriptor.kind, &descriptor.key_paths, fields)
1721}
1722
1723/// Encode a `Bound<&Dynamic>` into the index-key `Bound<Vec<u8>>` the
1724/// B-tree scan uses. Shared by the `Dynamic`-taking range methods on
1725/// [`Collection`].
1726///
1727/// A scalar `Dynamic` is encoded with the order-preserving field
1728/// encoder ([`obj_core::index::encode_field`]) — byte-identical to
1729/// what [`crate::Query::index_range`] produces, so a query and a
1730/// direct collection scan over the same scalar bound observe the
1731/// same entries. A [`Dynamic::Seq`](obj_core::codec::Dynamic::Seq)
1732/// bound is encoded as a composite key (the
1733/// [`COMPOSITE_TAG`](obj_core::index::COMPOSITE_TAG)-prefixed
1734/// concatenation of each element's field encoding) so a `Composite`
1735/// index can be range-scanned by a full tuple bound.
1736fn encode_dynamic_bound(
1737    b: std::ops::Bound<&obj_core::codec::Dynamic>,
1738) -> Result<std::ops::Bound<Vec<u8>>> {
1739    match b {
1740        std::ops::Bound::Included(v) => Ok(std::ops::Bound::Included(encode_bound_value(v)?)),
1741        std::ops::Bound::Excluded(v) => Ok(std::ops::Bound::Excluded(encode_bound_value(v)?)),
1742        std::ops::Bound::Unbounded => Ok(std::ops::Bound::Unbounded),
1743    }
1744}
1745
1746/// Encode one `Dynamic` bound value into index-key bytes. Scalars go
1747/// through [`obj_core::index::encode_field`]; a `Seq` is encoded as a
1748/// composite tuple key. Power-of-ten Rule 4: kept separate so
1749/// [`encode_dynamic_bound`] stays a thin three-arm match.
1750fn encode_bound_value(v: &obj_core::codec::Dynamic) -> Result<Vec<u8>> {
1751    match v {
1752        obj_core::codec::Dynamic::Seq(fields) => {
1753            // `COMPOSITE_TAG || encode_field(f0) || encode_field(f1) ..`
1754            // is byte-identical to `encode_index_key`'s composite path
1755            // for the same fields — see `obj_core::index::key`.
1756            let mut out = vec![obj_core::index::COMPOSITE_TAG];
1757            for f in fields {
1758                out.extend_from_slice(obj_core::index::encode_field(f)?.as_bytes());
1759            }
1760            Ok(out)
1761        }
1762        _ => Ok(obj_core::index::encode_field(v)?.into_bytes()),
1763    }
1764}
1765
/// Return `prefix` followed by eight `0xFF` bytes (`u64::MAX` in
/// big-endian form).
///
/// Used as the upper bound of an equality lookup against a non-unique
/// index. Every key sharing the user prefix sorts at or below
/// `prefix || 0xFF×8`, because its trailing 8 bytes are a big-endian
/// `u64` `Id`.
///
/// NOTE(review): if a caller uses this as an `Excluded` bound, an entry
/// whose id is exactly `u64::MAX` would be skipped. Confirm that id can
/// never be issued.
fn append_max_id(prefix: &[u8]) -> Vec<u8> {
    [prefix, &u64::MAX.to_be_bytes()[..]].concat()
}
1776
1777/// Trim the trailing 8-byte id suffix off a non-unique index key.
1778/// For `Unique` keys the suffix is absent, so the full key is the
1779/// user portion.
1780fn strip_id_suffix(full_key: &[u8], kind: obj_core::IndexKind) -> Vec<u8> {
1781    match kind {
1782        obj_core::IndexKind::Unique => full_key.to_vec(),
1783        _ if full_key.len() >= 8 => full_key[..full_key.len() - 8].to_vec(),
1784        _ => full_key.to_vec(),
1785    }
1786}
1787
1788/// Recover the `Id` (as a `u64`) from one index B-tree entry. For
1789/// non-unique kinds the id is the trailing 8 bytes of the KEY (the
1790/// suffix appended by the maintenance path); for `Unique` keys the
1791/// id is the VALUE. Used by
1792/// [`Collection::count_distinct_ids_in_range`].
1793fn id_from_index_entry(full_key: &[u8], value: &[u8], kind: obj_core::IndexKind) -> Result<u64> {
1794    // `Unique` indexes carry the id in the value; non-unique kinds
1795    // (Standard / Each / Composite) carry it as the trailing 8-byte
1796    // suffix of the key. The slicing here is O(1) — no per-entry
1797    // loop to bound (the outer walk's bound is the distinct-set cap).
1798    let bytes: &[u8] = if kind == obj_core::IndexKind::Unique {
1799        value
1800    } else {
1801        if full_key.len() < 8 {
1802            return Err(Error::Corruption { page_id: 0 });
1803        }
1804        &full_key[full_key.len() - 8..]
1805    };
1806    let id = Id::from_be_bytes(bytes).ok_or(Error::Corruption { page_id: 0 })?;
1807    Ok(id.get())
1808}
1809
1810// =====================================================================
1811// Phase 7A (M14 #14) — streaming index range iterator
1812// =====================================================================
1813
/// Start-bound marker for [`IterIndexRange`]'s first refill.
///
/// The first refill consumes it (`next_start_bound` takes it out of the
/// `Option`). Every later refill resumes from
/// `Excluded(last_full_key)` instead. That key is the last B-tree key
/// the previous refill walked, including entries skipped by the `Each`
/// dedup — the same shape `Db::iter_all` uses for the primary tree.
enum InitialResume {
    /// Start at, and include, this B-tree key (maps to `Bound::Included`).
    Included(Vec<u8>),
    /// Start strictly after this B-tree key (maps to `Bound::Excluded`).
    Excluded(Vec<u8>),
    /// Start at the first entry of the index (maps to `Bound::Unbounded`).
    Unbounded,
}
1823
/// One entry in [`IterIndexRange`]'s pending buffer.
///
/// Read/Write modes stage `Pending(key, id)` and resolve the `T` lazily
/// on `next()`. Lazy mode pre-resolves under a single
/// `read_transaction`, to preserve snapshot consistency across the index
/// walk and the per-row primary `get`, and stages `Resolved(key, T)`
/// directly.
enum StagedEntry<T> {
    /// User key (id suffix already stripped for non-`Unique` kinds) plus
    /// the document `Id`. `resolve_one` fetches the document via
    /// `Collection::get` and turns a missing document into
    /// `Error::Corruption`.
    Pending(Vec<u8>, Id),
    /// User key plus an already-decoded document, returned as-is by
    /// `resolve_one`.
    Resolved(Vec<u8>, T),
}
1833
/// Streaming iterator returned by [`Collection::iter_range`].
///
/// Yields `Result<(user_key_bytes, T)>` one row at a time. Internally it
/// refills a fixed-size staging buffer in batches of
/// `ITER_INDEX_RANGE_BATCH = 256` B-tree steps, so the per-step
/// pager-lock cost amortises. Memory stays bounded at `O(batch ×
/// small_bytes + distinct_ids)` regardless of the range's total size.
/// The `distinct_ids` term only grows for `Each` indexes.
///
/// Held data:
///
/// * a `&'a Collection<'a, T>` borrow (the iterator is bound to the
///   lifetime of `Collection::iter_range`'s `&self` borrow);
/// * the index's root page-id;
/// * the dedup set for `Each` indexes;
/// * the next-chunk resumption marker;
/// * the staged batch.
pub struct IterIndexRange<'a, T: Document> {
    /// The collection handle being walked. Its mode supplies the
    /// `TxnEnv` whose pager `refill` locks. `resolve_one` calls its
    /// `get` to turn each staged `Id` into a `T`.
    coll: &'a Collection<'a, T>,
    /// Kind of the index being walked. `Each` enables the per-document
    /// dedup through `emitted_ids`. For every non-`Unique` kind, the
    /// trailing 8-byte id suffix is stripped from the yielded user key.
    descriptor_kind: obj_core::IndexKind,
    /// Root page id of the index B-tree. `refill` rejects zero with
    /// `Error::InvalidArgument`.
    index_root: u64,
    /// First-refill marker. It is consumed (left `None`) by the first
    /// refill; subsequent refills use `last_full_key`.
    initial_resume: Option<InitialResume>,
    /// Last full B-tree key walked by the most recent refill that saw at
    /// least one `Ok` step. This includes entries skipped by the `Each`
    /// dedup. Drives the `Excluded(_)` resumption bound for the next chunk.
    last_full_key: Option<Vec<u8>>,
    /// User-supplied end bound (already widened per index kind).
    end_bound: Bound<Vec<u8>>,
    /// Pre-staged entries from the most recent refill; `next()` pops
    /// from the front. Each entry is one of:
    /// * `Pending(key, id)` — deferred get-back, the Read/Write
    ///   streaming path;
    /// * `Resolved(key, T)` — eager get-back inside a single
    ///   `read_transaction`, the Lazy-mode fallback;
    /// * an `Err` staged by the refill.
    buffer: VecDeque<Result<StagedEntry<T>>>,
    /// Persistent de-dup set for `Each` indexes; only populated for that
    /// kind.
    ///
    /// Power-of-ten Rule 3: the set is intentionally unbounded. A caller
    /// who wants a hard cap should use
    /// [`Collection::count_distinct_ids_in_range`], which caps at
    /// [`MAX_DISTINCT_IDS`]. The iterator's correctness contract is
    /// per-row dedup across the whole range.
    emitted_ids: HashSet<u64>,
    /// Set once the range is exhausted (a refill walked fewer than a
    /// full batch of steps) or after a refill error has been surfaced.
    /// `next()` returns `None` once this is set and `buffer` is empty.
    finished: bool,
}
1872
1873impl<T: Document> std::fmt::Debug for IterIndexRange<'_, T> {
1874    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1875        f.debug_struct("IterIndexRange")
1876            .field("descriptor_kind", &self.descriptor_kind)
1877            .field("index_root", &self.index_root)
1878            .field("buffer_len", &self.buffer.len())
1879            .field("emitted_ids_len", &self.emitted_ids.len())
1880            .field("finished", &self.finished)
1881            .finish_non_exhaustive()
1882    }
1883}
1884
1885impl<T: Document> IterIndexRange<'_, T> {
1886    /// Refill `self.buffer` with up to [`ITER_INDEX_RANGE_BATCH`]
1887    /// `(user_key, Id)` pairs by walking the index B-tree from the
1888    /// current resumption marker. Sets `self.finished` when the
1889    /// underlying range scan yields fewer than the requested batch
1890    /// (i.e. it ran past the end bound).
1891    ///
1892    /// Power-of-ten Rule 7: per-step decode errors are pushed into
1893    /// the buffer as `Err(_)` so the caller observes them via
1894    /// `next()` rather than aborting iteration.
1895    fn refill(&mut self) -> Result<()> {
1896        let env = match &self.coll.mode {
1897            CollectionMode::Write(w) => w.env,
1898            CollectionMode::Read(r) => r.env,
1899            CollectionMode::Lazy(_) => {
1900                // Lazy mode falls back to the eager `index_range`
1901                // path in `iter_range` itself (see below). Reaching
1902                // refill in Lazy mode indicates an internal logic
1903                // error, NOT a recoverable corruption — surface as
1904                // a typed error rather than `unwrap`.
1905                return Err(Error::ReadOnly {
1906                    operation: "internal: iter_range refill in Lazy mode",
1907                });
1908            }
1909        };
1910        let root_pid = PageId::new(self.index_root)
1911            .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1912        let start = self.next_start_bound();
1913        let end = clone_bound_ref(&self.end_bound);
1914        let mut pager = lock_pager(env)?;
1915        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1916        let iter = tree.range(&mut pager, (start, end))?;
1917        let mut staged: VecDeque<Result<StagedEntry<T>>> =
1918            VecDeque::with_capacity(ITER_INDEX_RANGE_BATCH);
1919        let mut last_full: Option<Vec<u8>> = None;
1920        let mut consumed: usize = 0;
1921        for step in iter {
1922            if consumed >= ITER_INDEX_RANGE_BATCH {
1923                break;
1924            }
1925            consumed = consumed
1926                .checked_add(1)
1927                .ok_or(Error::BTreeInvariantViolated {
1928                    reason: "iter_range batch counter overflow",
1929                })?;
1930            self.stage_one(&mut staged, &mut last_full, step);
1931        }
1932        if consumed < ITER_INDEX_RANGE_BATCH {
1933            self.finished = true;
1934        }
1935        drop(pager);
1936        self.buffer.extend(staged);
1937        if let Some(k) = last_full {
1938            self.last_full_key = Some(k);
1939        }
1940        Ok(())
1941    }
1942
1943    /// Process one B-tree step into the staged batch. Encapsulates
1944    /// the `Each`-dedup, the trailing-id-suffix strip, and the
1945    /// `Id::from_be_bytes` parse. Free helper so the refill body
1946    /// stays under the Rule-4 60-line ceiling.
1947    fn stage_one(
1948        &mut self,
1949        staged: &mut VecDeque<Result<StagedEntry<T>>>,
1950        last_full: &mut Option<Vec<u8>>,
1951        step: Result<(Vec<u8>, Vec<u8>)>,
1952    ) {
1953        let (full_key, id_bytes) = match step {
1954            Ok(kv) => kv,
1955            Err(e) => {
1956                staged.push_back(Err(e));
1957                return;
1958            }
1959        };
1960        *last_full = Some(full_key.clone());
1961        let Some(id) = Id::from_be_bytes(&id_bytes) else {
1962            staged.push_back(Err(Error::Corruption { page_id: 0 }));
1963            return;
1964        };
1965        if self.descriptor_kind == obj_core::IndexKind::Each && !self.emitted_ids.insert(id.get()) {
1966            // Same doc already emitted under a different element
1967            // key — skip without producing an output entry.
1968            return;
1969        }
1970        let user_key = strip_id_suffix(&full_key, self.descriptor_kind);
1971        staged.push_back(Ok(StagedEntry::Pending(user_key, id)));
1972    }
1973
1974    /// Compute the start bound for the next refill: use
1975    /// `initial_resume` on the first call (consuming it), thereafter
1976    /// use `Excluded(last_full_key)`.
1977    fn next_start_bound(&mut self) -> Bound<Vec<u8>> {
1978        if let Some(initial) = self.initial_resume.take() {
1979            return match initial {
1980                InitialResume::Included(k) => Bound::Included(k),
1981                InitialResume::Excluded(k) => Bound::Excluded(k),
1982                InitialResume::Unbounded => Bound::Unbounded,
1983            };
1984        }
1985        match &self.last_full_key {
1986            Some(k) => Bound::Excluded(k.clone()),
1987            None => Bound::Unbounded,
1988        }
1989    }
1990}
1991
1992impl<T: Document> Iterator for IterIndexRange<'_, T> {
1993    type Item = Result<(Vec<u8>, T)>;
1994
1995    fn next(&mut self) -> Option<Self::Item> {
1996        loop {
1997            if let Some(staged) = self.buffer.pop_front() {
1998                return Some(self.resolve_one(staged));
1999            }
2000            if self.finished {
2001                return None;
2002            }
2003            if let Err(e) = self.refill() {
2004                // Latch the iterator shut on a refill failure
2005                // (lock acquisition, B-tree open, etc.). Surface
2006                // the error once, then return None on subsequent
2007                // calls — power-of-ten Rule 7.
2008                self.finished = true;
2009                return Some(Err(e));
2010            }
2011            // refill ran; the loop will pop or notice finished.
2012        }
2013    }
2014}
2015
2016impl<T: Document> IterIndexRange<'_, T> {
2017    /// Resolve one staged entry into a `(user_key, T)` pair. For
2018    /// `Pending(_, id)` entries (the Read/Write streaming path),
2019    /// calls [`Collection::get`] to decode `T` on demand; for
2020    /// `Resolved(_, T)` entries (the Lazy-mode eager path), returns
2021    /// the already-decoded value. Orphan index entries (id missing
2022    /// in the primary tree) surface as [`Error::Corruption`],
2023    /// matching [`Collection::index_range`]'s existing contract.
2024    fn resolve_one(&self, staged: Result<StagedEntry<T>>) -> Result<(Vec<u8>, T)> {
2025        match staged? {
2026            StagedEntry::Pending(user_key, id) => match self.coll.get(id)? {
2027                Some(doc) => Ok((user_key, doc)),
2028                None => Err(Error::Corruption { page_id: 0 }),
2029            },
2030            StagedEntry::Resolved(user_key, doc) => Ok((user_key, doc)),
2031        }
2032    }
2033}
2034
/// Return an owned copy of a borrowed `Bound<Vec<u8>>`, the shape
/// `IterIndexRange::end_bound` stores, for use in a fresh resumption
/// walk.
///
/// `Bound<T>` implements `Clone` whenever `T: Clone`, so this is a plain
/// `clone`. The named helper keeps call sites reading as "copy the end
/// bound for this walk".
fn clone_bound_ref(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    b.clone()
}