obj/collection.rs
1//! `Collection<T>` — typed handle to one collection's primary
2//! B-tree.
3
4use std::borrow::Cow;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::marker::PhantomData;
7use std::ops::Bound;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use obj_core::btree::BTree;
11use obj_core::codec::{decode, encode};
12use obj_core::pager::page::PageId;
13use obj_core::pager::Pager;
14use obj_core::platform::FileHandle;
15use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result};
16
/// Boxed, `Send` iterator over `Result<Item>` values; the return type
/// of [`Collection::lookup`] and [`Collection::index_range`].
///
/// The `'a` parameter bounds how long the boxed iterator may live.
/// Both methods above return `IndexIter<'static, _>`: they collect
/// their results before returning, so the iterator owns its items and
/// does not borrow from the handle or the enclosing transaction.
pub type IndexIter<'a, Item> = Box<dyn Iterator<Item = Result<Item>> + Send + 'a>;
22
/// Per-batch refill size for [`IterIndexRange`]. The iterator yields
/// one `(user_key, T)` pair at a time but pulls index B-tree entries
/// in fixed-size chunks so the per-step pager-lock acquisition cost
/// amortises across many `next()` calls.
///
/// Power-of-ten Rule 3: the buffer is fixed-size. At ~8 bytes/key
/// plus the trailing 8-byte id suffix, 256 staged entries come to
/// roughly 256 × 16 B = 4 KiB at peak. The buffer does NOT scale
/// with the range's total size.
const ITER_INDEX_RANGE_BATCH: usize = 256;
31
/// Per-call cap on the bounded `HashSet<Id>` used by
/// [`Collection::count_distinct_ids_in_range`] to count unique
/// document `Id`s under an `Each` index.
///
/// Power-of-ten Rule 3: the distinct set is allocation-bounded;
/// exceeding the cap surfaces
/// [`obj_core::Error::DistinctCountExceeded`] instead of consuming
/// an unbounded amount of memory. Callers can narrow the range via
/// `.index_range(...)` to fit inside the budget.
pub const MAX_DISTINCT_IDS: usize = 100_000;
40
41use crate::txn::{lock_catalog, ReadTxn, WriteTxn};
42
/// #90 — per-transaction descriptor cache. Maps a collection name to
/// its LIVE [`CollectionDescriptor`], i.e. the copy whose `next_id`,
/// `primary_root`, and index `root_page_id`s are advanced IN-MEMORY
/// as the transaction writes. This is the single mid-txn source of
/// truth for those roots; [`crate::WriteTxn::commit`] flushes each
/// entry back to the catalog exactly once.
///
/// The map sits behind `Arc<Mutex<..>>` so the [`WriteTxn`] and every
/// [`Collection`] handle opened on it share one instance: two handles
/// of the same collection observe the same advancing descriptor.
pub(crate) type DescriptorCache = Arc<Mutex<HashMap<String, CollectionDescriptor>>>;
52
53/// Construct an empty [`DescriptorCache`].
54#[must_use]
55pub(crate) fn new_descriptor_cache() -> DescriptorCache {
56 Arc::new(Mutex::new(HashMap::new()))
57}
58
59/// Acquire the descriptor-cache mutex; map poison into `Busy` (Rule
60/// 7 — no panic on a poisoned lock).
61pub(crate) fn lock_descriptors(
62 cache: &Mutex<HashMap<String, CollectionDescriptor>>,
63) -> Result<MutexGuard<'_, HashMap<String, CollectionDescriptor>>> {
64 cache.lock().map_err(|_| Error::Busy {
65 kind: obj_core::LockKind::WriterInProcess,
66 })
67}
68
69/// Resolve the live cached descriptor for `name`, lazily loading it
70/// from the catalog B-tree on first touch in this transaction. After
71/// the first load the catalog tree is NEVER re-read mid-txn for this
72/// collection — every subsequent read/advance goes through the cache
73/// entry, so the unique pre-check and every index-tree open observe
74/// the in-memory-advanced roots (#90 load-bearing invariant).
75///
76/// Returns a mutable borrow into the cache so callers bump `next_id`,
77/// advance `primary_root`, and let `apply_doc_change` advance index
78/// roots in place — all without a per-doc `Catalog::update`.
79pub(crate) fn cached_descriptor_mut<'g>(
80 cache: &'g mut HashMap<String, CollectionDescriptor>,
81 pager: &mut Pager<FileHandle>,
82 catalog: &Catalog<FileHandle>,
83 name: &str,
84) -> Result<&'g mut CollectionDescriptor> {
85 if !cache.contains_key(name) {
86 let descriptor = catalog_get_required(pager, catalog, name)?;
87 cache.insert(name.to_owned(), descriptor);
88 }
89 cache.get_mut(name).ok_or(Error::Corruption { page_id: 0 })
90}
91
/// Typed handle to a collection.
///
/// Construct via [`crate::WriteTxn::collection`] (lazy-create) or
/// [`crate::ReadTxn::collection`] (read-only; errors if absent), or
/// via [`crate::Db::collection`] for a one-shot read-only handle
/// bound to a runtime collection name (M11 #94 — Phase 1B).
///
/// All methods take `&self` because the underlying state lives
/// behind mutexes on the parent transaction; the handle itself is
/// stateless beyond the descriptor it caches.
pub struct Collection<'tx, T: Document> {
    /// `Mode::Write` carries the [`WriteTxn`] reference;
    /// `Mode::Read` the [`ReadTxn`] reference; `Mode::Lazy` carries
    /// a `&Db` and opens a private read transaction on each method
    /// call. The [`Collection`]'s lifetime is bound to whichever
    /// it was constructed from.
    mode: CollectionMode<'tx>,
    /// Collection name resolved at construction. For handles built
    /// via [`crate::WriteTxn::collection`] / [`crate::ReadTxn::collection`]
    /// this equals `T::COLLECTION`. For handles built via
    /// [`crate::Db::collection`] this is the caller-supplied runtime
    /// name (which may differ from `T::COLLECTION`, e.g.
    /// `"archive.orders"` against a type whose declared `COLLECTION`
    /// is `"orders"`).
    //
    // `collection_name` rather than the lint-suggested `name`
    // because `name` collides with `IndexDescriptor::name` /
    // `IndexSpec::name` throughout this module's internals.
    //
    // `Cow<'static, str>` (#84): the common `WriteTxn`/`ReadTxn`
    // open path resolves to `T::COLLECTION` — a `&'static str` —
    // so it can be stored as `Cow::Borrowed` with NO `to_owned()`
    // heap allocation per read. The runtime-name path
    // (`Db::collection`, `open_readonly_named` with a caller name)
    // stores `Cow::Owned`.
    #[allow(clippy::struct_field_names)]
    collection_name: Cow<'static, str>,
    /// Descriptor captured when the handle was opened. Populated at
    /// construction for `Write` / `Read` mode and never updated in
    /// place — it reflects the catalog row that existed at open time.
    ///
    /// On a `Write`-mode handle the mutating methods (`insert`,
    /// `update`, `delete`, `upsert`) do NOT read `next_id` or
    /// `primary_root` from this copy: they resolve the live
    /// descriptor through the shared per-txn [`DescriptorCache`]
    /// (#90), which is where a prior call's advances (e.g. an
    /// `insert` that bumped `next_id`) are recorded. The write-side
    /// read helpers prefer that cache as well and fall back to this
    /// copy only when the collection has no cache entry yet.
    ///
    /// For `Lazy` mode the descriptor is a sentinel — the real
    /// descriptor is loaded inside each method's private read
    /// transaction.
    descriptor: CollectionDescriptor,
    /// Ties the handle to `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
143
/// Backing reference inside a [`Collection`]. The variant encodes
/// whether the handle may write and where its reads are served from.
enum CollectionMode<'tx> {
    /// Handle opened on a write transaction; the only mode in which
    /// the mutating methods are permitted.
    Write(WriteRef<'tx>),
    /// Handle opened on a read transaction; reads go through the
    /// snapshot stored in the [`ReadRef`].
    Read(ReadRef<'tx>),
    /// Read-only handle that opens a private read transaction on
    /// each method call. Constructed by [`crate::Db::collection`];
    /// the `'tx` lifetime is the borrow of `&Db`.
    Lazy(LazyRef<'tx>),
}
154
/// Payload of [`CollectionMode::Lazy`]: just the database borrow the
/// handle was created from.
struct LazyRef<'db> {
    /// Borrowed `Db` — methods open one-shot read transactions on
    /// it. The `'db` lifetime keeps the borrow alive for as long
    /// as the [`Collection`] handle.
    db: &'db crate::Db,
}
161
/// Payload of [`CollectionMode::Write`]: everything a write-side
/// method needs to reach the transaction's shared state.
struct WriteRef<'tx> {
    /// Transaction environment borrowed from the owning [`WriteTxn`];
    /// the write-side methods obtain the pager lock from it.
    env: &'tx obj_core::TxnEnv<FileHandle>,
    /// Shared catalog handle, cloned from the owning [`WriteTxn`]
    /// when the collection handle is opened.
    catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// #90: shared per-txn descriptor cache (cloned from the owning
    /// [`WriteTxn`]). The single mid-txn source of truth for
    /// `next_id` / `primary_root` / index roots; flushed once at
    /// commit.
    descriptors: DescriptorCache,
}
171
/// Payload of [`CollectionMode::Read`]: the pinned view a read-side
/// handle serves its lookups from.
struct ReadRef<'tx> {
    /// Reader snapshot the handle reads through — the calling
    /// database's snapshot, or an attached database's snapshot when
    /// the collection name carries a namespace.
    snapshot: &'tx obj_core::ReaderSnapshot<FileHandle>,
    /// Transaction environment that belongs with `snapshot`.
    env: &'tx obj_core::TxnEnv<FileHandle>,
}
176
177impl<'tx, T: Document> Collection<'tx, T> {
178 /// Open the collection on the write side, lazy-creating the
179 /// catalog row + an empty primary B-tree on first call, and
180 /// reconciling the type's declared `Document::indexes()` against
181 /// the catalog's stored descriptors (M7 #57) on the first call
182 /// per process per collection.
183 pub(crate) fn open_or_create(tx: &'tx mut WriteTxn<'_>) -> Result<Self> {
184 // Stage 1: lazy-create the collection row + an empty primary
185 // B-tree if this is the first call for `T` against this
186 // database. The returned descriptor is the pre-reconcile
187 // shape; we re-read after reconciliation below because the
188 // reconciler may have appended index descriptors.
189 let _initial = ensure_collection::<T>(&tx.inner, &tx.catalog)?;
190 reconcile_indexes_once::<T>(
191 &tx.inner,
192 &tx.catalog,
193 &tx.reconciled,
194 &mut tx.reconciled_staged,
195 )?;
196 // Re-read the descriptor in case the reconciler mutated the
197 // index roster (the reconciler runs through `Catalog::update`
198 // which rewrites the descriptor's `indexes` vector).
199 let descriptor = reread_descriptor::<T>(&tx.inner, &tx.catalog)?;
200 Ok(Self {
201 mode: CollectionMode::Write(WriteRef {
202 env: tx.inner.env(),
203 catalog: Arc::clone(&tx.catalog),
204 descriptors: Arc::clone(&tx.descriptors),
205 }),
206 // `T::COLLECTION` is `&'static str` (#84): borrow, don't
207 // allocate.
208 collection_name: Cow::Borrowed(T::COLLECTION),
209 descriptor,
210 _phantom: PhantomData,
211 })
212 }
213
214 /// Open the collection on the read side. Errors if the
215 /// collection has not yet been registered AT THE SNAPSHOT'S
216 /// PINNED LSN — a collection created by a concurrent writer
217 /// AFTER the snapshot was pinned is invisible to this txn (M6
218 /// #53). The descriptor is read by walking the catalog B+tree
219 /// rooted at `snapshot.root_catalog()` (the value captured by
220 /// `Pager::reader_snapshot` at pin time) using the snapshot-
221 /// aware [`Catalog::lookup_via_snapshot`] free function — NOT
222 /// via the writer's live `Catalog.tree.root`, which a concurrent
223 /// writer may have COW-advanced past the snapshot.
224 ///
225 /// M11 #93: if `T::COLLECTION` is of the form
226 /// `<namespace>.<name>`, the lookup dispatches against the
227 /// attached database registered under `<namespace>` instead of
228 /// the calling Db. The attached snapshot is the one pinned by
229 /// `Db::read_transaction` at txn-begin; the attached env is
230 /// passed through the same way.
231 pub(crate) fn open_readonly(tx: &'tx ReadTxn<'_>) -> Result<Self> {
232 // `T::COLLECTION` is `&'static str` — store it as
233 // `Cow::Borrowed` (#84), avoiding the per-read `to_owned()`
234 // heap allocation on the common typed-handle path.
235 Self::open_readonly_named(tx, Cow::Borrowed(T::COLLECTION))
236 }
237
238 /// Open the collection on the read side against a caller-supplied
239 /// runtime `name`. Like [`Self::open_readonly`] but the catalog
240 /// lookup uses `name` instead of `T::COLLECTION` — required for
241 /// [`crate::Db::collection`]'s Phase 1B accessor (M11 #94), which
242 /// binds a runtime collection name (e.g. `"archive.orders"`) to
243 /// the typed handle.
244 ///
245 /// Namespace dispatch (`<ns>.<tail>` → attached database) follows
246 /// the same shape as [`Self::open_readonly`].
247 pub(crate) fn open_readonly_named(
248 tx: &'tx ReadTxn<'_>,
249 name: Cow<'static, str>,
250 ) -> Result<Self> {
251 let (namespace, tail) = crate::db::split_namespace(&name);
252 let (env, snapshot, lookup_name): (
253 &'tx obj_core::TxnEnv<FileHandle>,
254 &'tx obj_core::ReaderSnapshot<FileHandle>,
255 &str,
256 ) = match namespace {
257 None => (tx.inner.env(), tx.inner.snapshot(), &name),
258 Some(ns) => {
259 let ctx = tx
260 .attached
261 .get(ns)
262 .ok_or_else(|| Error::CollectionNamespaceUnknown {
263 namespace: ns.to_owned(),
264 })?;
265 (ctx.env.as_ref(), &ctx.snapshot, tail)
266 }
267 };
268 let Some(descriptor) = read_descriptor_via_snapshot_named(env, snapshot, lookup_name)?
269 else {
270 return Err(Error::CollectionNotFound {
271 name: name.into_owned(),
272 });
273 };
274 Ok(Self {
275 mode: CollectionMode::Read(ReadRef { snapshot, env }),
276 // `name` is moved in unchanged — `Cow::Borrowed(&'static)`
277 // for the typed-handle path (no alloc), `Cow::Owned` for
278 // runtime-name handles.
279 collection_name: name,
280 descriptor,
281 _phantom: PhantomData,
282 })
283 }
284
285 /// Construct a deferred-lookup, read-only handle bound to a
286 /// runtime collection name. Each method call opens a private
287 /// read transaction on `db` and dispatches through
288 /// [`Self::open_readonly_named`]. Construction is infallible —
289 /// errors (missing collection, unknown namespace, etc.) surface
290 /// at the first method call.
291 ///
292 /// Used by [`crate::Db::collection`] (Phase 1B, M11 #94).
293 pub(crate) fn lazy(db: &'tx crate::Db, name: String) -> Self {
294 Self {
295 mode: CollectionMode::Lazy(LazyRef { db }),
296 collection_name: Cow::Owned(name),
297 // Sentinel descriptor — never read in Lazy mode; the
298 // real descriptor is loaded inside each method's
299 // private read transaction.
300 descriptor: CollectionDescriptor::new(0, 0, 0),
301 _phantom: PhantomData,
302 }
303 }
304
305 /// Cached descriptor (`collection_id`, `primary_root`,
306 /// `type_version`, `next_id` at handle-open time).
307 #[must_use]
308 pub fn descriptor(&self) -> &CollectionDescriptor {
309 &self.descriptor
310 }
311
312 /// #90: the LIVE primary-tree root for a `Write`-mode handle.
313 ///
314 /// Prefers the per-txn descriptor cache (which carries every
315 /// `primary_root` advance from this txn's prior writes) so a
316 /// read-after-write on the same handle inside one transaction
317 /// observes its own uncommitted inserts. Falls back to the
318 /// handle's open-time `primary_root` if this collection has not
319 /// yet been written in the txn (no cache entry).
320 fn write_primary_root(&self, write: &WriteRef<'tx>) -> Result<u64> {
321 let cache = lock_descriptors(&write.descriptors)?;
322 Ok(cache
323 .get(self.collection_name.as_ref())
324 .map_or(self.descriptor.primary_root, |d| d.primary_root))
325 }
326
327 /// #90: the LIVE `root_page_id` for the named `Active` index on a
328 /// `Write`-mode handle. Prefers the per-txn cache so a read after
329 /// an index-mutating write in the same txn descends the advanced
330 /// root; falls back to `fallback` (the handle's open-time
331 /// descriptor entry) when the collection has no cache entry yet.
332 fn write_index_root(
333 &self,
334 write: &WriteRef<'tx>,
335 index_name: &str,
336 fallback: u64,
337 ) -> Result<u64> {
338 let cache = lock_descriptors(&write.descriptors)?;
339 let Some(descriptor) = cache.get(self.collection_name.as_ref()) else {
340 return Ok(fallback);
341 };
342 let live = descriptor
343 .indexes
344 .iter()
345 .find(|d| d.name == index_name && d.status == obj_core::IndexStatus::Active)
346 .map_or(fallback, |d| d.root_page_id);
347 Ok(live)
348 }
349
350 /// Insert `doc`. Returns the freshly-allocated [`Id`].
351 ///
352 /// # Errors
353 ///
354 /// - [`Error::ReadOnly`] if the handle is read-only.
355 /// - Pager / catalog / codec errors propagated.
356 // `doc: T` is taken by value for caller ergonomics (the user
357 // owns the value and gives it to the database). The codec's
358 // `encode(&T, ...)` takes a reference, so clippy sees the
359 // function body as "owned but borrows only" — accepted as the
360 // intended public-API shape.
361 #[allow(clippy::needless_pass_by_value)]
362 pub fn insert(&self, doc: T) -> Result<Id> {
363 let write = self.write_or_err("insert")?;
364 let name: &str = self.collection_name.as_ref();
365 let mut pager = lock_pager(write.env)?;
366 let catalog = lock_catalog(&write.catalog)?;
367 // #90: the cached descriptor is the SOLE mid-txn source of
368 // truth. `next_id` is an in-memory bump; `primary_root` and
369 // index roots advance in the cache; NO per-doc
370 // `Catalog::update` — the single flush is deferred to commit.
371 let mut cache = lock_descriptors(&write.descriptors)?;
372 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
373 let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
374 let bytes = encode(&doc, descriptor.collection_id)?;
375 let key = id.to_be_bytes();
376 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
377 tree.insert(&mut pager, &key, &bytes)?;
378 descriptor.primary_root = tree.root().get();
379 // Index maintenance lands inside the same WAL transaction as
380 // the primary write, descending the cache's in-memory index
381 // roots. A `UniqueConstraintViolation` here surfaces via `?`
382 // and the surrounding `WriteTxn` rolls back atomically.
383 crate::index_maint::apply_doc_change::<T>(&mut pager, descriptor, None, Some(&doc), id)?;
384 Ok(id)
385 }
386
387 /// Fetch the document at `id`.
388 ///
389 /// On the write side this consults the pager (sees pending
390 /// writes in the current txn). On the read side it consults
391 /// the snapshot's frozen view.
392 ///
393 /// # Lazy migration (M10 #84)
394 ///
395 /// If the on-disk record was written by an older
396 /// `Document::VERSION` than the current `T::VERSION`, the codec
397 /// walks the stored bytes through the schema registered for
398 /// that version (see `T::historical_schemas()`) and dispatches
399 /// the resulting structured `Dynamic` through `T::migrate`.
400 /// **The migrated bytes are NOT written back to disk.** The
401 /// next [`Collection::get`] re-reads the same v(n) bytes and
402 /// re-runs migration. Only a subsequent
403 /// [`Collection::update`](Self::update) /
404 /// [`Collection::upsert`](Self::upsert) writes the document
405 /// back, at which point the on-disk header records
406 /// `T::VERSION`.
407 ///
408 /// This contract is what allows mixed-version reads to scale:
409 /// a 10⁹-doc collection does not need to be batch-rewritten on
410 /// schema upgrade. Power-of-ten Rule 7: every "migration ran"
411 /// path returns the migrated `T`; no implicit write-back.
412 ///
413 /// # Errors
414 ///
415 /// Pager / B-tree / codec errors propagated. In particular:
416 ///
417 /// - [`Error::SchemaNotRegistered`](obj_core::Error::SchemaNotRegistered)
418 /// if the stored record carries a `type_version` for which
419 /// `T::historical_schemas()` has no entry.
420 /// - [`Error::SchemaMigrationNotImplemented`](obj_core::Error::SchemaMigrationNotImplemented)
421 /// if the registered `T::migrate` returns the default error.
422 pub fn get(&self, id: Id) -> Result<Option<T>> {
423 if let Some(r) = self.dispatch_lazy(|c| c.get(id)) {
424 return r;
425 }
426 let key = id.to_be_bytes();
427 let Some(bytes) = self.read_or_write_env_for_get(&key)? else {
428 return Ok(None);
429 };
430 Ok(Some(decode::<T>(&bytes, self.descriptor.collection_id)?))
431 }
432
433 /// Read the primary-tree value at `key` against the current
434 /// `Write` / `Read` mode (Lazy mode is dispatched upstream of
435 /// every public caller via [`Self::dispatch_lazy`]).
436 fn read_or_write_env_for_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
437 match &self.mode {
438 CollectionMode::Write(write) => {
439 // #90: descend the LIVE primary root so a get after an
440 // insert/update in the SAME txn sees its own writes.
441 let primary_root = self.write_primary_root(write)?;
442 let mut pager = lock_pager(write.env)?;
443 let tree = btree_handle(&pager, primary_root)?;
444 tree.get(&mut pager, key)
445 }
446 CollectionMode::Read(read) => {
447 snapshot_get_via_btree(read.snapshot, read.env, self.descriptor.primary_root, key)
448 }
449 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
450 operation: "internal: lazy-mode primary read",
451 }),
452 }
453 }
454
455 /// Apply `f` to the document at `id`, writing the mutated value
456 /// back.
457 ///
458 /// # Errors
459 ///
460 /// - [`Error::ReadOnly`] on a read-side handle.
461 /// - [`Error::DocumentNotFound`] if `id` is absent.
462 /// - Pager / catalog / codec errors propagated.
463 pub fn update<F>(&self, id: Id, f: F) -> Result<()>
464 where
465 F: FnOnce(&mut T),
466 {
467 let write = self.write_or_err("update")?;
468 let name: &str = self.collection_name.as_ref();
469 let mut pager = lock_pager(write.env)?;
470 let catalog = lock_catalog(&write.catalog)?;
471 // #90: resolve the live cached descriptor (sole mid-txn source
472 // of truth); advance its roots in place, no per-doc flush.
473 let mut cache = lock_descriptors(&write.descriptors)?;
474 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
475 let key = id.to_be_bytes();
476 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
477 let existing = tree.get(&mut pager, &key)?.ok_or(Error::DocumentNotFound {
478 collection: T::COLLECTION,
479 id: id.get(),
480 })?;
481 let old_value = decode::<T>(&existing, descriptor.collection_id)?;
482 let mut new_value = decode::<T>(&existing, descriptor.collection_id)?;
483 f(&mut new_value);
484 let bytes = encode(&new_value, descriptor.collection_id)?;
485 tree.delete(&mut pager, &key)?;
486 tree.insert(&mut pager, &key, &bytes)?;
487 descriptor.primary_root = tree.root().get();
488 crate::index_maint::apply_doc_change::<T>(
489 &mut pager,
490 descriptor,
491 Some(&old_value),
492 Some(&new_value),
493 id,
494 )?;
495 Ok(())
496 }
497
498 /// Delete the document at `id`. Returns `true` if it existed.
499 ///
500 /// # Errors
501 ///
502 /// - [`Error::ReadOnly`] on a read-side handle.
503 /// - Pager / catalog errors propagated.
504 pub fn delete(&self, id: Id) -> Result<bool> {
505 let write = self.write_or_err("delete")?;
506 let name: &str = self.collection_name.as_ref();
507 let mut pager = lock_pager(write.env)?;
508 let catalog = lock_catalog(&write.catalog)?;
509 // #90: cached descriptor is the sole mid-txn source of truth.
510 let mut cache = lock_descriptors(&write.descriptors)?;
511 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
512 let key = id.to_be_bytes();
513 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
514 // Materialise the old document so the index-maintenance
515 // path can compute the OLD key set and emit per-index
516 // delete entries. If the row does not exist, we still
517 // return `Ok(false)` for API back-compat (no index work).
518 let old_value = match tree.get(&mut pager, &key)? {
519 Some(bytes) => Some(decode::<T>(&bytes, descriptor.collection_id)?),
520 None => None,
521 };
522 let removed = tree.delete(&mut pager, &key)?;
523 descriptor.primary_root = tree.root().get();
524 crate::index_maint::apply_doc_change::<T>(
525 &mut pager,
526 descriptor,
527 old_value.as_ref(),
528 None,
529 id,
530 )?;
531 Ok(removed)
532 }
533
534 /// Insert-or-replace `doc` at `id`.
535 ///
536 /// # Errors
537 ///
538 /// - [`Error::ReadOnly`] on a read-side handle.
539 /// - Pager / catalog / codec errors propagated.
540 // `doc: T` passed by value for ergonomic ownership transfer;
541 // codec takes `&T` so clippy reports a "borrow only" body.
542 #[allow(clippy::needless_pass_by_value)]
543 pub fn upsert(&self, id: Id, doc: T) -> Result<()> {
544 let write = self.write_or_err("upsert")?;
545 let name: &str = self.collection_name.as_ref();
546 let mut pager = lock_pager(write.env)?;
547 let catalog = lock_catalog(&write.catalog)?;
548 // #90: cached descriptor is the sole mid-txn source of truth.
549 let mut cache = lock_descriptors(&write.descriptors)?;
550 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
551 let bytes = encode(&doc, descriptor.collection_id)?;
552 let key = id.to_be_bytes();
553 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
554 // Materialise the prior value (if any) so the index-
555 // maintenance diff sees the OLD key set.
556 let old_value = match tree.get(&mut pager, &key)? {
557 Some(prior) => Some(decode::<T>(&prior, descriptor.collection_id)?),
558 None => None,
559 };
560 let _ = tree.delete(&mut pager, &key)?;
561 tree.insert(&mut pager, &key, &bytes)?;
562 descriptor.primary_root = tree.root().get();
563 crate::index_maint::apply_doc_change::<T>(
564 &mut pager,
565 descriptor,
566 old_value.as_ref(),
567 Some(&doc),
568 id,
569 )?;
570 Ok(())
571 }
572
573 /// Look up the single document whose `index_name` key matches
574 /// `key` under a `Unique` index.
575 ///
576 /// Errors with [`Error::IndexNotUnique`] if `index_name` resolves
577 /// to a non-unique index — `find_unique` is *only* defined on
578 /// `Unique` indexes. For `Standard` / `Each` / `Composite` use
579 /// [`Self::lookup`] (which returns an iterator).
580 ///
581 /// Snapshot-aware: on a write-side handle the lookup sees the
582 /// current txn's pending writes; on a read-side handle it sees
583 /// the snapshot's frozen view.
584 ///
585 /// # Errors
586 ///
587 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
588 /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
589 /// - Pager / B-tree / codec errors propagated.
590 pub fn find_unique(
591 &self,
592 index_name: &str,
593 key: impl Into<obj_core::codec::Dynamic>,
594 ) -> Result<Option<T>> {
595 let key_dyn = key.into();
596 if let Some(r) = self.dispatch_lazy(|c| c.find_unique(index_name, key_dyn.clone())) {
597 return r;
598 }
599 let descriptor = self.active_index(index_name)?;
600 if descriptor.kind != obj_core::IndexKind::Unique {
601 return Err(Error::IndexNotUnique {
602 collection: self.collection_name.clone().into_owned(),
603 name: index_name.to_owned(),
604 });
605 }
606 let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
607 let id_bytes = self.index_get(descriptor, encoded.as_bytes())?;
608 match id_bytes {
609 Some(bytes) => match Id::from_be_bytes(&bytes) {
610 Some(id) => self.get(id),
611 None => Err(Error::Corruption { page_id: 0 }),
612 },
613 None => Ok(None),
614 }
615 }
616
617 /// Yield every document whose `index_name` key matches `key`.
618 /// Works on `Standard` / `Unique` / `Each` indexes. Returns
619 /// `Err(Error::IndexKindMismatch)`-style guidance for
620 /// `Composite` (use [`Self::index_range`] for tuple-shaped
621 /// keys).
622 ///
623 /// The same document is yielded at most once even if it owns
624 /// multiple matching entries — `Each` indexes can encode the
625 /// same `id` under multiple element keys; we de-dup on emit.
626 ///
627 /// # Errors
628 ///
629 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
630 /// - Pager / B-tree / codec errors propagated.
631 pub fn lookup(
632 &self,
633 index_name: &str,
634 key: impl Into<obj_core::codec::Dynamic>,
635 ) -> Result<IndexIter<'static, T>>
636 where
637 T: Send + 'static,
638 {
639 let key_dyn = key.into();
640 if let Some(r) = self.dispatch_lazy(|c| c.lookup(index_name, key_dyn.clone())) {
641 return r;
642 }
643 let descriptor = self.active_index(index_name)?;
644 let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
645 let ids = match descriptor.kind {
646 obj_core::IndexKind::Unique => self.collect_unique(descriptor, encoded.as_bytes())?,
647 obj_core::IndexKind::Standard
648 | obj_core::IndexKind::Each
649 | obj_core::IndexKind::Composite => {
650 self.collect_nonunique_equal(descriptor, encoded.as_bytes())?
651 }
652 // `IndexKind` is `#[non_exhaustive]`; an unknown kind means
653 // this build predates the on-disk index. Refuse rather than
654 // return a wrong (possibly empty) id set.
655 _ => return Err(Error::InvalidArgument("unsupported index kind")),
656 };
657 let resolved = self.resolve_unique_ids(ids)?;
658 Ok(Box::new(resolved.into_iter().map(Ok)))
659 }
660
661 /// Yield `(user_key, doc)` pairs whose index key falls within
662 /// `range`. The bounds are [`Dynamic`](obj_core::codec::Dynamic)
663 /// values — the same ergonomic type [`crate::Query::index_range`]
664 /// takes — encoded internally through the order-preserving field
665 /// encoder ([`obj_core::index::encode_field`]); callers no longer
666 /// hand-encode index-key bytes.
667 ///
668 /// For non-Unique kinds (`Standard` / `Each` / `Composite`) the
669 /// bounds are widened internally so a user-facing
670 /// `Included(x)..=Included(x)` range matches every entry whose
671 /// user-key equals `x` even though the underlying B-tree key
672 /// carries an `id_be8` suffix (see `docs/format.md` § Index key
673 /// encoding § Range-bound widening (non-Unique kinds)).
674 ///
675 /// # Errors
676 ///
677 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
678 /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
679 /// carries an embedded NUL byte (the order-preserving encoder
680 /// rejects those).
681 /// - Pager / B-tree / codec errors propagated.
682 pub fn index_range<R>(
683 &self,
684 index_name: &str,
685 range: R,
686 ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
687 where
688 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
689 T: Send + 'static,
690 {
691 let start = encode_dynamic_bound(range.start_bound())?;
692 let end = encode_dynamic_bound(range.end_bound())?;
693 self.index_range_encoded(index_name, start, end)
694 }
695
696 /// Encoded-bytes variant of [`Self::index_range`]. The bounds are
697 /// already the order-preserving field encoding of the user's
698 /// `Dynamic` value(s); this keeps the signature general for
699 /// `Composite` "starts-with" scans and is the entry point the
700 /// query layer / lazy-dispatch recursion call after they have
701 /// done their own encoding.
702 pub(crate) fn index_range_encoded(
703 &self,
704 index_name: &str,
705 start_bound: std::ops::Bound<Vec<u8>>,
706 end_bound: std::ops::Bound<Vec<u8>>,
707 ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
708 where
709 T: Send + 'static,
710 {
711 if let Some(r) = self.dispatch_lazy(|c| {
712 c.index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
713 }) {
714 return r;
715 }
716 let descriptor = self.active_index(index_name)?;
717 let (start, end) =
718 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
719 let entries = self.collect_range(descriptor, start, end)?;
720 let descriptor_kind = descriptor.kind;
721 let mut out: Vec<Result<(Vec<u8>, T)>> = Vec::with_capacity(entries.len());
722 let mut emitted_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
723 for (full_key, id_bytes_value) in entries {
724 let Some(id) = Id::from_be_bytes(&id_bytes_value) else {
725 out.push(Err(Error::Corruption { page_id: 0 }));
726 continue;
727 };
728 // For Each indexes the same doc may appear multiple
729 // times under different element keys — de-dup on
730 // emission.
731 if descriptor_kind == obj_core::IndexKind::Each && !emitted_ids.insert(id.get()) {
732 continue;
733 }
734 // For non-unique kinds the B-tree key includes the
735 // trailing 8-byte id suffix; strip it so the caller
736 // sees only the user-key bytes.
737 let user_key = strip_id_suffix(&full_key, descriptor_kind);
738 match self.get(id) {
739 Ok(Some(doc)) => out.push(Ok((user_key, doc))),
740 Ok(None) => {
741 // Orphan index entry — surface as Corruption.
742 out.push(Err(Error::Corruption { page_id: 0 }));
743 }
744 Err(e) => out.push(Err(e)),
745 }
746 }
747 Ok(Box::new(out.into_iter()))
748 }
749
750 /// Streaming variant of [`Self::index_range`] (Phase 7A perf
751 /// pass, M14 #14). Yields `(user_key, T)` pairs lazily — the
752 /// returned [`IterIndexRange`] decodes one `T` per `next()` call
753 /// rather than building a `Vec<Result<(_, T)>>` of every match
754 /// up front. The iterator borrows `&'a self`, so it must be
755 /// consumed inside the lifetime of the enclosing
756 /// [`crate::WriteTxn`] / [`crate::ReadTxn`] (or the
757 /// [`crate::Db::collection`] handle, in Lazy mode).
758 ///
759 /// # When to prefer `iter_range` over `index_range`
760 ///
761 /// - **Memory.** `index_range` allocates `O(matches × sizeof(T))`
762 /// upfront; `iter_range` keeps a fixed-size [`VecDeque`] of
763 /// `(key, id)` pairs (`ITER_INDEX_RANGE_BATCH = 256` entries)
764 /// and decodes one `T` at a time. For a 100k-row range with
765 /// ~500-byte documents that's ~50 MB peak vs. a few KiB.
766 /// - **Latency-to-first-row.** `index_range` decodes every
767 /// matching document before returning the iterator;
768 /// `iter_range` returns immediately after the first chunk
769 /// refill, so the first `next()` returns after one index walk
770 /// + one primary-tree `get` (rather than `N`).
771 ///
772 /// # When `index_range` is still the right answer
773 ///
774 /// `index_range` returns an `IndexIter<'static, _>` — it can
775 /// escape the `read_transaction` / `transaction` closure that
776 /// produced it. `iter_range` is bound to `&self`, so the
777 /// iterator dies when the [`Collection`] handle dies. If you
778 /// need to return the iterator to outer scope, stick with
779 /// `index_range`.
780 ///
781 /// # Per-row `get`-back design choice
782 ///
783 /// Each `next()` yields `(user_key, T)` by calling
784 /// [`Self::get`] under the hood — i.e. a SECOND B+tree descent
785 /// per row (the first is the index range walk; the second is
786 /// the primary-tree `get(id)`). This is intentional and
787 /// inherited from `index_range`: the index leaf stores only
788 /// the document `id` (8 bytes), not the document bytes. A
789 /// future format-minor bump may add value-in-index storage to
790 /// short-circuit the second descent; that work is pinned to
791 /// post-1.0 (tracked as pit issue #16, "value-in-index
792 /// storage to eliminate `index_range` double-decode").
793 ///
794 /// # Errors
795 ///
796 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
797 /// - Pager / B-tree / codec errors propagated at construction
798 /// and from each `next()` call.
799 pub fn iter_range<'a, R>(&'a self, index_name: &str, range: R) -> Result<IterIndexRange<'a, T>>
800 where
801 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
802 T: Send + 'static,
803 {
804 let start_bound = encode_dynamic_bound(range.start_bound())?;
805 let end_bound = encode_dynamic_bound(range.end_bound())?;
806 self.iter_range_encoded(index_name, start_bound, end_bound)
807 }
808
809 /// Encoded-bytes variant of [`Self::iter_range`]. Bounds are the
810 /// order-preserving field encoding of the user's `Dynamic`
811 /// value(s); used internally by the lazy-mode fallback path.
812 fn iter_range_encoded<'a>(
813 &'a self,
814 index_name: &str,
815 start_bound: Bound<Vec<u8>>,
816 end_bound: Bound<Vec<u8>>,
817 ) -> Result<IterIndexRange<'a, T>>
818 where
819 T: Send + 'static,
820 {
821 // Lazy mode falls back to `index_range`'s eager materialization
822 // path so the index walk + per-row `get` share a single
823 // snapshot. Streaming refills can't preserve that — each
824 // refill would open a fresh txn and observe a different
825 // snapshot. Lazy callers who want true streaming should open
826 // an explicit `Db::read_transaction` and call `iter_range`
827 // on the bound collection there.
828 if matches!(self.mode, CollectionMode::Lazy(_)) {
829 return self.iter_range_lazy_fallback(index_name, start_bound, end_bound);
830 }
831 let descriptor = self.active_index(index_name)?;
832 // #90: in Write mode the iterator must walk the LIVE index
833 // root (the per-txn cache's advanced value) so a streaming
834 // scan opened after an in-txn index write sees its own
835 // entries. In Read mode there is no write cache, so the
836 // open-time descriptor root is authoritative.
837 let index_root = match &self.mode {
838 CollectionMode::Write(w) => {
839 self.write_index_root(w, index_name, descriptor.root_page_id)?
840 }
841 _ => descriptor.root_page_id,
842 };
843 let (start, end) =
844 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
845 // Stash the resumption marker as `Excluded(start)` only on
846 // the first refill; afterwards the iterator overwrites it
847 // with the last full_key it emitted. The initial `start`
848 // bound is honoured by `refill` via the same `last_full_key`
849 // → `Excluded(_)` translation.
850 let initial_resume = match start {
851 Bound::Included(k) => InitialResume::Included(k),
852 Bound::Excluded(k) => InitialResume::Excluded(k),
853 Bound::Unbounded => InitialResume::Unbounded,
854 };
855 Ok(IterIndexRange {
856 coll: self,
857 descriptor_kind: descriptor.kind,
858 index_root,
859 initial_resume: Some(initial_resume),
860 last_full_key: None,
861 end_bound: end,
862 buffer: VecDeque::new(),
863 emitted_ids: HashSet::new(),
864 finished: false,
865 })
866 }
867
868 /// Lazy-mode fallback for [`Self::iter_range`]: delegates to
869 /// [`Self::index_range`] (which itself dispatches through a fresh
870 /// read txn) and rehouses the eagerly-materialised entries into
871 /// the streaming iterator's buffer as
872 /// [`StagedEntry::Resolved`]. Power-of-ten Rule 4: keeping this
873 /// isolated so the streaming path's `iter_range` body stays
874 /// small.
875 fn iter_range_lazy_fallback<'a>(
876 &'a self,
877 index_name: &str,
878 start_bound: Bound<Vec<u8>>,
879 end_bound: Bound<Vec<u8>>,
880 ) -> Result<IterIndexRange<'a, T>>
881 where
882 T: Send + 'static,
883 {
884 let materialized = self.index_range_encoded(index_name, start_bound, end_bound)?;
885 let mut buffer: VecDeque<Result<StagedEntry<T>>> = VecDeque::new();
886 for item in materialized {
887 match item {
888 Ok((key, doc)) => buffer.push_back(Ok(StagedEntry::Resolved(key, doc))),
889 Err(e) => buffer.push_back(Err(e)),
890 }
891 }
892 Ok(IterIndexRange {
893 coll: self,
894 descriptor_kind: obj_core::IndexKind::Standard,
895 index_root: 0,
896 initial_resume: None,
897 last_full_key: None,
898 end_bound: Bound::Unbounded,
899 buffer,
900 emitted_ids: HashSet::new(),
901 finished: true,
902 })
903 }
904
905 /// Look up the `IndexKind` of an active index by name. Used by
906 /// the M8 query layer to dispatch `Query::count` between the
907 /// entry-count and distinct-id-count paths (M8 follow-up #72).
908 ///
909 /// # Errors
910 ///
911 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
912 pub(crate) fn index_kind(&self, index_name: &str) -> Result<obj_core::IndexKind> {
913 Ok(self.active_index(index_name)?.kind)
914 }
915
916 /// Resolve `index_name` to an `Active` `IndexDescriptor` on the
917 /// collection. Errors with [`Error::IndexNotFound`] if absent
918 /// or `DroppedPending`.
919 ///
920 /// Returns a borrow into `self.descriptor.indexes` — no per-lookup
921 /// clone (#84). Every caller uses the descriptor only within the
922 /// enclosing `&self` borrow; `iter_range_encoded` copies the two
923 /// `Copy` fields it needs (`kind`, `root_page_id`) into the
924 /// returned iterator rather than holding the borrow.
925 fn active_index(&self, index_name: &str) -> Result<&obj_core::IndexDescriptor> {
926 let entry = self
927 .descriptor
928 .indexes
929 .iter()
930 .find(|d| d.name == index_name);
931 match entry {
932 Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d),
933 _ => Err(Error::IndexNotFound {
934 collection: self.collection_name.clone().into_owned(),
935 name: index_name.to_owned(),
936 }),
937 }
938 }
939
940 /// Single-key `get` on an index B-tree. Used by `find_unique`
941 /// and by the Unique-kind branch of `lookup`.
942 fn index_get(
943 &self,
944 descriptor: &obj_core::IndexDescriptor,
945 key: &[u8],
946 ) -> Result<Option<Vec<u8>>> {
947 match &self.mode {
948 CollectionMode::Write(write) => {
949 // #90: descend the LIVE index root from the per-txn
950 // cache so an index read after an index-mutating write
951 // in the SAME txn observes its own entries.
952 let root_raw =
953 self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
954 let root = PageId::new(root_raw)
955 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
956 let mut pager = lock_pager(write.env)?;
957 let tree = BTree::<FileHandle>::open(&pager, root)?;
958 tree.get(&mut pager, key)
959 }
960 CollectionMode::Read(read) => {
961 let root = PageId::new(descriptor.root_page_id)
962 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
963 let pager = lock_pager(read.env)?;
964 BTree::<FileHandle>::get_via_snapshot(&pager, read.snapshot, root, key)
965 }
966 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
967 operation: "internal: lazy-mode index_get",
968 }),
969 }
970 }
971
972 /// Collect every `(full_key, value)` entry from an index B-tree
973 /// whose key starts with `prefix`. For unique kinds the prefix
974 /// is the entire key (one match max); for non-unique kinds we
975 /// match every key whose first `prefix.len()` bytes equal
976 /// `prefix` (the trailing `id_suffix` varies per doc).
977 fn collect_nonunique_equal(
978 &self,
979 descriptor: &obj_core::IndexDescriptor,
980 prefix: &[u8],
981 ) -> Result<Vec<u64>> {
982 let entries = self.collect_range(
983 descriptor,
984 std::ops::Bound::Included(prefix.to_vec()),
985 // Upper bound is `prefix || 0xFF..` — every key whose
986 // user-portion equals `prefix` falls in
987 // `[prefix, prefix || u64::MAX]`.
988 std::ops::Bound::Included(append_max_id(prefix)),
989 )?;
990 let mut ids = Vec::with_capacity(entries.len());
991 let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
992 for (_full_key, value) in entries {
993 let id = Id::from_be_bytes(&value).ok_or(Error::Corruption { page_id: 0 })?;
994 if emitted.insert(id.get()) {
995 ids.push(id.get());
996 }
997 }
998 Ok(ids)
999 }
1000
1001 /// Collect a single id from a Unique index B-tree at `key`.
1002 fn collect_unique(
1003 &self,
1004 descriptor: &obj_core::IndexDescriptor,
1005 key: &[u8],
1006 ) -> Result<Vec<u64>> {
1007 match self.index_get(descriptor, key)? {
1008 Some(bytes) => Id::from_be_bytes(&bytes)
1009 .map(|id| vec![id.get()])
1010 .ok_or(Error::Corruption { page_id: 0 }),
1011 None => Ok(Vec::new()),
1012 }
1013 }
1014
1015 /// Collect every `(full_key, value)` entry from an index B-tree
1016 /// whose key falls within `(start, end)`.
1017 fn collect_range(
1018 &self,
1019 descriptor: &obj_core::IndexDescriptor,
1020 start: std::ops::Bound<Vec<u8>>,
1021 end: std::ops::Bound<Vec<u8>>,
1022 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1023 // Read mode pins every page read to the txn's snapshot (M12
1024 // #12): a concurrent writer's post-snapshot index entries must
1025 // not leak into a read txn's range/count. Write mode keeps the
1026 // live-pager scan so the txn observes its own uncommitted
1027 // index writes (it has no snapshot to pin against).
1028 match &self.mode {
1029 CollectionMode::Read(r) => {
1030 let root = PageId::new(descriptor.root_page_id)
1031 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1032 let pager = lock_pager(r.env)?;
1033 let iter = BTree::<FileHandle>::range_via_snapshot(
1034 &pager,
1035 r.snapshot,
1036 root,
1037 (start, end),
1038 )?;
1039 let mut out = Vec::new();
1040 for step in iter {
1041 out.push(step?);
1042 }
1043 Ok(out)
1044 }
1045 CollectionMode::Write(w) => {
1046 // #90: scan the LIVE index root from the per-txn cache
1047 // so an in-txn index scan sees its own uncommitted
1048 // entries (the open-time descriptor root may be stale).
1049 let root_raw =
1050 self.write_index_root(w, &descriptor.name, descriptor.root_page_id)?;
1051 let root = PageId::new(root_raw)
1052 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1053 let mut pager = lock_pager(w.env)?;
1054 let tree = BTree::<FileHandle>::open(&pager, root)?;
1055 let iter = tree.range(&mut pager, (start, end))?;
1056 let mut out = Vec::new();
1057 for step in iter {
1058 out.push(step?);
1059 }
1060 Ok(out)
1061 }
1062 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1063 operation: "internal: lazy-mode collect_range",
1064 }),
1065 }
1066 }
1067
1068 /// Resolve a `Vec<u64>` of `Id` integer values into concrete
1069 /// `T` documents via `self.get`. Preserves order; missing rows
1070 /// surface as `Error::Corruption` (orphan index entry).
1071 fn resolve_unique_ids(&self, ids: Vec<u64>) -> Result<Vec<T>> {
1072 let mut out = Vec::with_capacity(ids.len());
1073 for raw in ids {
1074 let id =
1075 Id::from_be_bytes(&raw.to_be_bytes()).ok_or(Error::Corruption { page_id: 0 })?;
1076 let doc = self.get(id)?.ok_or(Error::Corruption { page_id: 0 })?;
1077 out.push(doc);
1078 }
1079 Ok(out)
1080 }
1081
1082 /// Count every entry in the primary tree WITHOUT decoding the
1083 /// documents. Used by the M8 [`crate::Query::count`] no-decode
1084 /// fast path; the iterator visits leaf pages and counts entries
1085 /// rather than running each through postcard.
1086 ///
1087 /// Power-of-ten Rule 2: bounded by the B+tree's `MAX_RANGE_NODES`
1088 /// budget (inherited from `BTree::range`).
1089 ///
1090 /// # Errors
1091 ///
1092 /// Pager / B-tree errors propagated.
1093 pub fn count_all(&self) -> Result<u64> {
1094 // Closure (rather than `Collection::count_all`) so the
1095 // higher-ranked lifetime on `dispatch_lazy`'s
1096 // `for<'a> FnOnce(&Collection<'a, T>) -> _` bound is
1097 // satisfied — `Collection::count_all` resolves to a single
1098 // lifetime fn-item type, which fails the HRTB check.
1099 #[allow(clippy::redundant_closure_for_method_calls)]
1100 if let Some(r) = self.dispatch_lazy(|c| c.count_all()) {
1101 return r;
1102 }
1103 // Read mode pins the full-tree scan to the txn's snapshot
1104 // (M12 #12) so a concurrent writer's post-snapshot inserts
1105 // cannot perturb the count; write mode keeps the live scan so
1106 // the txn sees its own uncommitted writes.
1107 match &self.mode {
1108 CollectionMode::Read(r) => {
1109 let pager = lock_pager(r.env)?;
1110 let pid = PageId::new(self.descriptor.primary_root)
1111 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1112 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, r.snapshot, pid, ..)?;
1113 count_range_iter(iter)
1114 }
1115 CollectionMode::Write(w) => {
1116 // #90: count the LIVE primary root so an in-txn count
1117 // reflects this txn's own uncommitted inserts/deletes.
1118 let root = self.write_primary_root(w)?;
1119 let mut pager = lock_pager(w.env)?;
1120 let tree = btree_handle(&pager, root)?;
1121 let iter = tree.range(&mut pager, ..)?;
1122 count_range_iter(iter)
1123 }
1124 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1125 operation: "internal: lazy-mode count_all",
1126 }),
1127 }
1128 }
1129
1130 /// Count every entry whose encoded key falls inside `range` on
1131 /// the named index's B-tree, WITHOUT decoding any document. M8
1132 /// fast path for [`crate::Query::count`] when the source is an
1133 /// `index_range`.
1134 ///
1135 /// Returns the number of index B-tree entries — for an `Each`
1136 /// index that may exceed the document count (one doc emits
1137 /// multiple entries); for other kinds it equals the matching
1138 /// doc count.
1139 ///
1140 /// # Errors
1141 ///
1142 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1143 /// - Pager / B-tree errors propagated.
1144 pub fn count_index_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1145 where
1146 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1147 {
1148 let start = encode_dynamic_bound(range.start_bound())?;
1149 let end = encode_dynamic_bound(range.end_bound())?;
1150 self.count_index_range_encoded(index_name, start, end)
1151 }
1152
1153 /// Encoded-bytes variant of [`Self::count_index_range`]. Bounds
1154 /// are the order-preserving field encoding of the user's
1155 /// `Dynamic` value(s); used by the query-layer count fast path.
1156 pub(crate) fn count_index_range_encoded(
1157 &self,
1158 index_name: &str,
1159 start_bound: std::ops::Bound<Vec<u8>>,
1160 end_bound: std::ops::Bound<Vec<u8>>,
1161 ) -> Result<u64> {
1162 if let Some(r) = self.dispatch_lazy(|c| {
1163 c.count_index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
1164 }) {
1165 return r;
1166 }
1167 let descriptor = self.active_index(index_name)?;
1168 let (start, end) =
1169 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1170 let entries = self.collect_range(descriptor, start, end)?;
1171 // `entries.len()` is a `usize`; promote to `u64` carefully.
1172 // `usize` is at most 64 bits on every supported target.
1173 u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1174 reason: "index range entry count exceeds u64",
1175 })
1176 }
1177
1178 /// Count distinct document `Id`s whose entries fall inside
1179 /// `range` on the named index's B-tree, WITHOUT decoding any
1180 /// document. For `Each` indexes this is the correct shape of
1181 /// the "how many docs match" question — `count_index_range`
1182 /// returns the entry count, which overshoots when a single doc
1183 /// contributes multiple entries.
1184 ///
1185 /// Implementation walks the index B-tree, parses the trailing
1186 /// 8-byte big-endian `Id` suffix from each non-unique key, and
1187 /// tracks the unique set in a bounded [`std::collections::HashSet`]
1188 /// capped at [`MAX_DISTINCT_IDS`]. Exceeding the cap surfaces
1189 /// [`Error::DistinctCountExceeded`] — the caller should narrow
1190 /// the range.
1191 ///
1192 /// # Per-kind semantics
1193 ///
1194 /// - `Standard`, `Composite`: equivalent to `count_index_range`
1195 /// (one entry per doc by construction; the trailing-id-suffix
1196 /// walk still produces the same total).
1197 /// - `Unique`: keys carry NO id suffix — the entry value is the
1198 /// raw 8-byte `Id`; the walk reads the value instead.
1199 /// - `Each`: the dedup is meaningful — one doc may contribute
1200 /// N entries under N distinct element keys.
1201 ///
1202 /// # Errors
1203 ///
1204 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1205 /// - [`Error::DistinctCountExceeded`] if the distinct set
1206 /// exceeds [`MAX_DISTINCT_IDS`].
1207 /// - [`Error::Corruption`] if an entry's id suffix / value is
1208 /// not parseable as an [`obj_core::Id`].
1209 /// - Pager / B-tree errors propagated.
1210 pub fn count_distinct_ids_in_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1211 where
1212 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1213 {
1214 let start = encode_dynamic_bound(range.start_bound())?;
1215 let end = encode_dynamic_bound(range.end_bound())?;
1216 self.count_distinct_ids_in_range_encoded(index_name, start, end)
1217 }
1218
1219 /// Encoded-bytes variant of [`Self::count_distinct_ids_in_range`].
1220 /// Bounds are the order-preserving field encoding of the user's
1221 /// `Dynamic` value(s); used by the query-layer count fast path.
1222 pub(crate) fn count_distinct_ids_in_range_encoded(
1223 &self,
1224 index_name: &str,
1225 start_bound: std::ops::Bound<Vec<u8>>,
1226 end_bound: std::ops::Bound<Vec<u8>>,
1227 ) -> Result<u64> {
1228 if let Some(r) = self.dispatch_lazy(|c| {
1229 c.count_distinct_ids_in_range_encoded(
1230 index_name,
1231 start_bound.clone(),
1232 end_bound.clone(),
1233 )
1234 }) {
1235 return r;
1236 }
1237 let descriptor = self.active_index(index_name)?;
1238 let (start, end) =
1239 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1240 let entries = self.collect_range(descriptor, start, end)?;
1241 let mut distinct: HashSet<u64> = HashSet::new();
1242 for (full_key, value) in entries {
1243 let id = id_from_index_entry(&full_key, &value, descriptor.kind)?;
1244 if distinct.insert(id) && distinct.len() > MAX_DISTINCT_IDS {
1245 return Err(Error::DistinctCountExceeded {
1246 limit: MAX_DISTINCT_IDS,
1247 });
1248 }
1249 }
1250 // `distinct.len()` is a `usize`; promote to `u64` carefully.
1251 // `usize` is at most 64 bits on every supported target.
1252 u64::try_from(distinct.len()).map_err(|_| Error::BTreeInvariantViolated {
1253 reason: "distinct id count exceeds u64",
1254 })
1255 }
1256
1257 /// Materialise every `(Id, T)` pair in the collection.
1258 ///
1259 /// Implementation note: M6 returns an owned `Vec` rather than a
1260 /// streaming iterator because the B+tree range API borrows the
1261 /// pager, and threading that borrow through the mutex guards
1262 /// in the iterator chain is awkward. M7+ may convert to a
1263 /// streaming shape once the index API is in place.
1264 ///
1265 /// # Errors
1266 ///
1267 /// Pager / B-tree / codec errors propagated.
1268 pub fn all(&self) -> Result<Vec<(Id, T)>> {
1269 // Closure (rather than `Collection::all`) for the same
1270 // HRTB reason documented on `count_all` above.
1271 #[allow(clippy::redundant_closure_for_method_calls)]
1272 if let Some(r) = self.dispatch_lazy(|c| c.all()) {
1273 return r;
1274 }
1275 match &self.mode {
1276 CollectionMode::Write(write) => {
1277 // #90: scan the LIVE primary root so an in-txn `all()`
1278 // includes this txn's own uncommitted inserts.
1279 let root = self.write_primary_root(write)?;
1280 let mut pager = lock_pager(write.env)?;
1281 scan_all::<T>(&mut pager, root, self.descriptor.collection_id)
1282 }
1283 CollectionMode::Read(read) => snapshot_scan_via_btree::<T>(
1284 read.snapshot,
1285 read.env,
1286 self.descriptor.primary_root,
1287 self.descriptor.collection_id,
1288 ),
1289 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1290 operation: "internal: lazy-mode all",
1291 }),
1292 }
1293 }
1294
1295 fn write_or_err(&self, op: &'static str) -> Result<&WriteRef<'tx>> {
1296 match &self.mode {
1297 CollectionMode::Write(w) => Ok(w),
1298 CollectionMode::Read(_) | CollectionMode::Lazy(_) => {
1299 Err(Error::ReadOnly { operation: op })
1300 }
1301 }
1302 }
1303
1304 /// If this handle is in `Lazy` mode, dispatch `body` through a
1305 /// fresh read transaction on the bound [`crate::Db`] — opening a
1306 /// transient [`Collection`] via [`Self::open_readonly_named`] and
1307 /// invoking the user-supplied closure on it. Returns `Some(_)`
1308 /// when the dispatch fires (the closure ran or the underlying
1309 /// open failed); returns `None` for non-Lazy handles so the
1310 /// caller can fall through to the existing logic.
1311 ///
1312 /// Power-of-ten Rule 4: keeps each public method's body small —
1313 /// the dispatch shim is one match arm instead of a per-method
1314 /// `if let CollectionMode::Lazy(_)` ladder.
1315 fn dispatch_lazy<R, F>(&self, body: F) -> Option<Result<R>>
1316 where
1317 F: FnOnce(&Collection<'_, T>) -> Result<R>,
1318 {
1319 match &self.mode {
1320 CollectionMode::Lazy(LazyRef { db }) => {
1321 // Clone the stored name into an owned `Cow` for the
1322 // transient handle opened inside the private read txn.
1323 // `into_owned` keeps the `'static` Cow bound the
1324 // `open_readonly_named` signature requires regardless
1325 // of whether the stored name was Borrowed or Owned.
1326 let name: Cow<'static, str> = Cow::Owned(self.collection_name.clone().into_owned());
1327 Some(db.read_transaction(move |tx| {
1328 let coll = Collection::<T>::open_readonly_named(tx, name)?;
1329 body(&coll)
1330 }))
1331 }
1332 _ => None,
1333 }
1334 }
1335}
1336
1337// ---------- internal helpers --------------------------------------
1338
1339fn lock_pager(
1340 env: &obj_core::TxnEnv<FileHandle>,
1341) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1342 env.pager().lock().map_err(|_| Error::Busy {
1343 kind: obj_core::LockKind::WriterInProcess,
1344 })
1345}
1346
1347/// Ensure `T::COLLECTION` exists in the catalog, lazy-creating an
1348/// empty primary B-tree on first call. Used on the write side.
1349fn ensure_collection<T: Document>(
1350 inner: &obj_core::WriteTxn<'_, FileHandle>,
1351 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1352) -> Result<CollectionDescriptor> {
1353 let mut pager = inner.lock_pager()?;
1354 let mut catalog_guard = lock_catalog(catalog)?;
1355 if let Some(d) = catalog_guard.get(&mut pager, T::COLLECTION)? {
1356 return Ok(d);
1357 }
1358 let tree = BTree::<FileHandle>::empty(&mut pager)?;
1359 let descriptor = CollectionDescriptor::new(0, tree.root().get(), T::VERSION);
1360 let _id = catalog_guard.insert(&mut pager, T::COLLECTION, descriptor)?;
1361 catalog_guard
1362 .get(&mut pager, T::COLLECTION)?
1363 .ok_or(Error::Corruption { page_id: 0 })
1364}
1365
1366/// Re-read the descriptor for `T::COLLECTION` after any catalog
1367/// mutation. Used by [`Collection::open_or_create`] after the
1368/// reconciler runs.
1369fn reread_descriptor<T: Document>(
1370 inner: &obj_core::WriteTxn<'_, FileHandle>,
1371 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1372) -> Result<CollectionDescriptor> {
1373 let mut pager = inner.lock_pager()?;
1374 let catalog_guard = lock_catalog(catalog)?;
1375 catalog_guard
1376 .get(&mut pager, T::COLLECTION)?
1377 .ok_or(Error::Corruption { page_id: 0 })
1378}
1379
1380/// Reconcile `T::indexes()` against the catalog's stored descriptors
1381/// on the FIRST call per process per collection. Subsequent calls
1382/// observe the cache hit (in the shared `reconciled` set OR this txn's
1383/// `staged` set) and skip the catalog walk.
1384///
1385/// Reconciliation runs inside the user's WAL transaction so a
1386/// rolled-back txn leaves the catalog clean. If reconciliation
1387/// fails (e.g. `Error::IndexKindMismatch`), neither set is populated
1388/// so the next attempt re-runs the reconciler.
1389///
1390/// #93 — the skip-check is `shared ∪ staged`: a second handle of the
1391/// same collection within ONE txn still skips the (idempotent) catalog
1392/// walk via `staged`, but the name is recorded in the per-txn `staged`
1393/// set, NOT the shared set. [`crate::WriteTxn::commit`] promotes the
1394/// staged names into the shared set only after the WAL commit lands —
1395/// so a rolled-back lazy-create can never poison the shared cache into
1396/// skipping reconciliation on a later txn (the original bug).
1397fn reconcile_indexes_once<T: Document>(
1398 inner: &obj_core::WriteTxn<'_, FileHandle>,
1399 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1400 reconciled: &Arc<Mutex<HashSet<String>>>,
1401 staged: &mut HashSet<String>,
1402) -> Result<()> {
1403 // Fast path: already reconciled in a prior committed txn (shared)
1404 // or earlier in THIS txn (staged) — skip the catalog walk. Probe
1405 // `staged` first: it needs no lock and covers the repeat-handle
1406 // case; the shared probe takes the mutex only when `staged` misses.
1407 if staged.contains(T::COLLECTION) {
1408 return Ok(());
1409 }
1410 {
1411 let cache = lock_reconciled(reconciled)?;
1412 if cache.contains(T::COLLECTION) {
1413 return Ok(());
1414 }
1415 }
1416 let specs = T::indexes();
1417 // Validate specs before any catalog mutation so a bad spec
1418 // does not leave a half-reconciled catalog.
1419 for spec in &specs {
1420 spec.validate()?;
1421 }
1422 {
1423 let mut pager = inner.lock_pager()?;
1424 let mut catalog_guard = lock_catalog(catalog)?;
1425 let _post = catalog_guard.reconcile_indexes(&mut pager, T::COLLECTION, &specs)?;
1426 }
1427 // #93: stage in the PER-TXN set, not the shared one. Promotion to
1428 // the shared set is deferred to a successful `WriteTxn::commit`.
1429 staged.insert(T::COLLECTION.to_owned());
1430 Ok(())
1431}
1432
1433fn lock_reconciled(
1434 reconciled: &Arc<Mutex<HashSet<String>>>,
1435) -> Result<std::sync::MutexGuard<'_, HashSet<String>>> {
1436 reconciled.lock().map_err(|_| Error::Busy {
1437 kind: obj_core::LockKind::WriterInProcess,
1438 })
1439}
1440
1441/// Read-side descriptor lookup against a caller-supplied
1442/// collection name. M11 #93 introduced this byte-shape so the
1443/// namespace-aware [`Collection::open_readonly`] can perform the
1444/// catalog walk against either the calling Db's snapshot (no
1445/// namespace) or an attached Db's snapshot (with the namespace
1446/// prefix stripped).
1447fn read_descriptor_via_snapshot_named(
1448 env: &obj_core::TxnEnv<FileHandle>,
1449 snapshot: &obj_core::ReaderSnapshot<FileHandle>,
1450 name: &str,
1451) -> Result<Option<CollectionDescriptor>> {
1452 let pager = lock_pager(env)?;
1453 Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1454}
1455
1456/// #83 (b): fused one-shot point read for [`crate::Db::get`].
1457///
1458/// Resolves the collection descriptor and the primary-tree value for
1459/// `id` under a SINGLE pager-mutex acquisition (see
1460/// [`snapshot_resolve_and_get`]), then decodes the bytes against the
1461/// resolved `collection_id`. This collapses the two back-to-back
1462/// pager locks the equivalent handle path pays (one to open the
1463/// handle / resolve the descriptor, one for the `get`).
1464///
1465/// Observably identical to
1466/// `tx.collection::<T>()?.get(id)` for the one-shot caller: the
1467/// descriptor lookup and the value get run on the same `ReadTxn`
1468/// snapshot, and a missing collection still surfaces as
1469/// [`Error::CollectionNotFound`]. Namespaced reads (`<ns>.<tail>`)
1470/// fall back to the handle path so the attached-snapshot dispatch is
1471/// unchanged.
1472pub(crate) fn fused_point_get<T: Document>(tx: &ReadTxn<'_>, id: Id) -> Result<Option<T>> {
1473 let (namespace, _tail) = crate::db::split_namespace(T::COLLECTION);
1474 if namespace.is_some() {
1475 // Namespaced: keep the existing attached-snapshot dispatch.
1476 return Collection::<T>::open_readonly(tx)?.get(id);
1477 }
1478 let key = id.to_be_bytes();
1479 let resolved =
1480 snapshot_resolve_and_get(tx.inner.snapshot(), tx.inner.env(), T::COLLECTION, &key)?;
1481 match resolved {
1482 Some((descriptor, bytes)) => Ok(Some(decode::<T>(&bytes, descriptor.collection_id)?)),
1483 None => Ok(None),
1484 }
1485}
1486
1487/// Re-read the descriptor inside an already-locked pager + catalog
1488/// pair. Surfaces a missing collection as `Error::Corruption`
1489/// because the caller has already opened a write txn against it.
1490fn catalog_get_required(
1491 pager: &mut Pager<FileHandle>,
1492 catalog: &Catalog<FileHandle>,
1493 name: &str,
1494) -> Result<CollectionDescriptor> {
1495 catalog
1496 .get(pager, name)?
1497 .ok_or(Error::Corruption { page_id: 0 })
1498}
1499
1500fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1501 let root_pid =
1502 PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1503 BTree::<FileHandle>::open(pager, root_pid)
1504}
1505
1506/// Drain a B-tree range iterator, counting entries WITHOUT retaining
1507/// their bytes. Shared by [`Collection::count_all`]'s live (write) and
1508/// snapshot-pinned (read) scan arms. Power-of-ten Rule 2: the
1509/// iterator carries its own `MAX_RANGE_NODES` budget; the `u64`
1510/// overflow check guards the count itself.
1511fn count_range_iter<I>(iter: I) -> Result<u64>
1512where
1513 I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
1514{
1515 let mut n: u64 = 0;
1516 for step in iter {
1517 // Probe-only: drop the bytes the moment they decode.
1518 let _ = step?;
1519 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1520 reason: "primary tree entry count exceeds u64",
1521 })?;
1522 }
1523 Ok(n)
1524}
1525
1526// `persist_root` removed in M7 #58: every mutating method now
1527// routes through `index_maint::apply_doc_change`, which persists
1528// the descriptor (including the possibly-advanced `primary_root`)
1529// via `Catalog::update` after every per-index B-tree mutation.
1530
1531/// Snapshot-consistent B-tree lookup (M6 #53).
1532///
1533/// Walks the primary B+tree rooted at `primary_root` using
1534/// [`obj_core::btree::BTree::get_via_snapshot`], which descends
1535/// through [`obj_core::ReaderSnapshot::read_page`] rather than the
1536/// live `Pager::read_page`. This bypasses the WAL `state.view` /
1537/// `state.pending` overlays — a concurrent writer's post-snapshot
1538/// COW commits cannot poison the reader's walk.
1539///
1540/// `primary_root` MUST be the descriptor's `primary_root` as-of
1541/// the snapshot's pinned LSN (i.e. the value read via
1542/// [`obj_core::Catalog::lookup_via_snapshot`] in
1543/// [`read_descriptor_via_snapshot`] above). Using the writer's
1544/// live `primary_root` would defeat the snapshot read.
1545fn snapshot_get_via_btree(
1546 snap: &obj_core::ReaderSnapshot<FileHandle>,
1547 env: &obj_core::TxnEnv<FileHandle>,
1548 primary_root: u64,
1549 key: &[u8],
1550) -> Result<Option<Vec<u8>>> {
1551 let pager = lock_pager(env)?;
1552 let root_pid = PageId::new(primary_root)
1553 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1554 obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)
1555}
1556
1557/// #83 (b): fused single-lock read. Resolves the descriptor for
1558/// `name` and performs the primary-tree `get` for `key` under ONE
1559/// pager-mutex acquisition, against the SAME `snapshot` — collapsing
1560/// the descriptor-lookup lock (`read_descriptor_via_snapshot_named`)
1561/// and the value-get lock (`snapshot_get_via_btree`) that the
1562/// two-call handle path pays back-to-back.
1563///
1564/// Returns `(descriptor, value)` so the caller can `decode` against
1565/// the resolved `collection_id`. A missing collection surfaces as
1566/// `Err(CollectionNotFound)` (matching the handle path's open-time
1567/// contract for the one-shot caller); a present collection with no
1568/// entry for `key` surfaces as `Ok(None)`.
1569///
1570/// Power-of-ten: keeps poison → `Error::Busy` (Rule 7, via
1571/// `lock_pager`); ≤ 60 lines (Rule 4); `debug_assert`s that the
1572/// snapshot-resolved `primary_root` is the one fed to the get
1573/// (Rule 5).
1574fn snapshot_resolve_and_get(
1575 snap: &obj_core::ReaderSnapshot<FileHandle>,
1576 env: &obj_core::TxnEnv<FileHandle>,
1577 name: &str,
1578 key: &[u8],
1579) -> Result<Option<(CollectionDescriptor, Vec<u8>)>> {
1580 let pager = lock_pager(env)?;
1581 let Some(descriptor) = Catalog::<FileHandle>::lookup_via_snapshot(&pager, snap, name)? else {
1582 return Err(Error::CollectionNotFound {
1583 name: name.to_owned(),
1584 });
1585 };
1586 let root_pid = PageId::new(descriptor.primary_root)
1587 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1588 // Rule 5: the value-get MUST descend the descriptor's own
1589 // snapshot-time root — never a re-resolved or live root. A live
1590 // primary B-tree root is always page 1+, so a zero `collection_id`
1591 // here would mean we resolved a degenerate catalog row.
1592 debug_assert_eq!(
1593 root_pid.get(),
1594 descriptor.primary_root,
1595 "fused get must descend the snapshot-resolved primary_root",
1596 );
1597 let value =
1598 obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)?;
1599 Ok(value.map(|v| (descriptor, v)))
1600}
1601
1602fn scan_all<T: Document>(
1603 pager: &mut Pager<FileHandle>,
1604 primary_root: u64,
1605 collection_id: u32,
1606) -> Result<Vec<(Id, T)>> {
1607 let tree = btree_handle(pager, primary_root)?;
1608 let iter = tree.range(pager, ..)?;
1609 let mut out = Vec::new();
1610 for entry in iter {
1611 let (key, value) = entry?;
1612 let id = Id::from_be_bytes(&key)
1613 .ok_or(Error::InvalidArgument("primary B-tree key is not an Id"))?;
1614 let doc = decode::<T>(&value, collection_id)?;
1615 out.push((id, doc));
1616 }
1617 Ok(out)
1618}
1619
/// Full-collection scan used by the snapshot read path: takes the
/// pager lock and delegates to [`scan_all`] for the tree rooted at
/// `primary_root`, decoding each value against `collection_id`.
///
/// NOTE(review): `_snap` is accepted but never used. The walk goes
/// through `scan_all` on the locked pager, not through a
/// snapshot-aware read like the one `snapshot_get_via_btree` uses.
/// Confirm whether this scan is meant to be pinned to the snapshot.
fn snapshot_scan_via_btree<T: Document>(
    _snap: &obj_core::ReaderSnapshot<FileHandle>,
    env: &obj_core::TxnEnv<FileHandle>,
    primary_root: u64,
    collection_id: u32,
) -> Result<Vec<(Id, T)>> {
    // The guard is held for the whole scan and released on return.
    let mut pager = lock_pager(env)?;
    scan_all::<T>(&mut pager, primary_root, collection_id)
}
1629
/// Encode the caller-supplied `Dynamic` value(s) into the bytes a
/// lookup against `descriptor` would use as a B-tree key. For
/// `Unique` indexes the result is the key bytes verbatim; for
/// non-unique kinds the lookup helpers extend with the per-doc
/// id suffix at scan time.
///
/// Thin wrapper: forwards `descriptor.kind`, `descriptor.key_paths`
/// and `fields` to [`obj_core::index::encode_index_key_parts`] and
/// returns its result (including any error) unchanged.
fn index_key_for_lookup(
    descriptor: &obj_core::IndexDescriptor,
    fields: &[obj_core::codec::Dynamic],
) -> Result<obj_core::index::EncodedIndexKey> {
    // Ref-based encode (#84): pass the descriptor's `kind` (Copy) and
    // `key_paths` BY REFERENCE — no transient `IndexSpec`, no
    // `name`/`key_paths` clone, no redundant `IndexSpec::validate` on
    // an already-validated on-disk descriptor. The byte output is
    // identical to the old `from_parts` + `encode_index_key` path
    // (see `encode_index_key_parts` and the byte-identity test).
    obj_core::index::encode_index_key_parts(descriptor.kind, &descriptor.key_paths, fields)
}
1647
1648/// Encode a `Bound<&Dynamic>` into the index-key `Bound<Vec<u8>>` the
1649/// B-tree scan uses. Shared by the `Dynamic`-taking range methods on
1650/// [`Collection`].
1651///
1652/// A scalar `Dynamic` is encoded with the order-preserving field
1653/// encoder ([`obj_core::index::encode_field`]) — byte-identical to
1654/// what [`crate::Query::index_range`] produces, so a query and a
1655/// direct collection scan over the same scalar bound observe the
1656/// same entries. A [`Dynamic::Seq`](obj_core::codec::Dynamic::Seq)
1657/// bound is encoded as a composite key (the
1658/// [`COMPOSITE_TAG`](obj_core::index::COMPOSITE_TAG)-prefixed
1659/// concatenation of each element's field encoding) so a `Composite`
1660/// index can be range-scanned by a full tuple bound.
1661fn encode_dynamic_bound(
1662 b: std::ops::Bound<&obj_core::codec::Dynamic>,
1663) -> Result<std::ops::Bound<Vec<u8>>> {
1664 match b {
1665 std::ops::Bound::Included(v) => Ok(std::ops::Bound::Included(encode_bound_value(v)?)),
1666 std::ops::Bound::Excluded(v) => Ok(std::ops::Bound::Excluded(encode_bound_value(v)?)),
1667 std::ops::Bound::Unbounded => Ok(std::ops::Bound::Unbounded),
1668 }
1669}
1670
1671/// Encode one `Dynamic` bound value into index-key bytes. Scalars go
1672/// through [`obj_core::index::encode_field`]; a `Seq` is encoded as a
1673/// composite tuple key. Power-of-ten Rule 4: kept separate so
1674/// [`encode_dynamic_bound`] stays a thin three-arm match.
1675fn encode_bound_value(v: &obj_core::codec::Dynamic) -> Result<Vec<u8>> {
1676 match v {
1677 obj_core::codec::Dynamic::Seq(fields) => {
1678 // `COMPOSITE_TAG || encode_field(f0) || encode_field(f1) ..`
1679 // is byte-identical to `encode_index_key`'s composite path
1680 // for the same fields — see `obj_core::index::key`.
1681 let mut out = vec![obj_core::index::COMPOSITE_TAG];
1682 for f in fields {
1683 out.extend_from_slice(obj_core::index::encode_field(f)?.as_bytes());
1684 }
1685 Ok(out)
1686 }
1687 _ => Ok(obj_core::index::encode_field(v)?.into_bytes()),
1688 }
1689}
1690
/// Return `prefix` followed by eight `0xFF` bytes (the big-endian
/// encoding of `u64::MAX`).
///
/// Used as the upper bound of an equality lookup against a
/// non-unique index: the trailing 8 bytes of such a key are an `Id`
/// (`u64` BE), so every key sharing the same user-prefix sorts at or
/// below `prefix || 0xFF..`.
fn append_max_id(prefix: &[u8]) -> Vec<u8> {
    const MAX_ID_SUFFIX: [u8; 8] = u64::MAX.to_be_bytes();
    [prefix, &MAX_ID_SUFFIX[..]].concat()
}
1701
1702/// Trim the trailing 8-byte id suffix off a non-unique index key.
1703/// For `Unique` keys the suffix is absent, so the full key is the
1704/// user portion.
1705fn strip_id_suffix(full_key: &[u8], kind: obj_core::IndexKind) -> Vec<u8> {
1706 match kind {
1707 obj_core::IndexKind::Unique => full_key.to_vec(),
1708 _ if full_key.len() >= 8 => full_key[..full_key.len() - 8].to_vec(),
1709 _ => full_key.to_vec(),
1710 }
1711}
1712
1713/// Recover the `Id` (as a `u64`) from one index B-tree entry. For
1714/// non-unique kinds the id is the trailing 8 bytes of the KEY (the
1715/// suffix appended by the maintenance path); for `Unique` keys the
1716/// id is the VALUE. Used by
1717/// [`Collection::count_distinct_ids_in_range`].
1718fn id_from_index_entry(full_key: &[u8], value: &[u8], kind: obj_core::IndexKind) -> Result<u64> {
1719 // `Unique` indexes carry the id in the value; non-unique kinds
1720 // (Standard / Each / Composite) carry it as the trailing 8-byte
1721 // suffix of the key. The slicing here is O(1) — no per-entry
1722 // loop to bound (the outer walk's bound is the distinct-set cap).
1723 let bytes: &[u8] = if kind == obj_core::IndexKind::Unique {
1724 value
1725 } else {
1726 if full_key.len() < 8 {
1727 return Err(Error::Corruption { page_id: 0 });
1728 }
1729 &full_key[full_key.len() - 8..]
1730 };
1731 let id = Id::from_be_bytes(bytes).ok_or(Error::Corruption { page_id: 0 })?;
1732 Ok(id.get())
1733}
1734
1735// =====================================================================
1736// Phase 7A (M14 #14) — streaming index range iterator
1737// =====================================================================
1738
/// Resumption marker for [`IterIndexRange`]'s first refill. After the
/// first batch the iterator switches to `Excluded(last_emitted_full_key)`
/// for subsequent refills (the same shape `Db::iter_all` uses for the
/// primary tree).
///
/// Each variant is turned into the matching [`Bound`] variant by
/// `IterIndexRange::next_start_bound`, which consumes the marker.
enum InitialResume {
    /// Start the first scan at this key, inclusive.
    Included(Vec<u8>),
    /// Start the first scan strictly after this key.
    Excluded(Vec<u8>),
    /// Start the first scan with no lower bound.
    Unbounded,
}
1748
/// One entry in [`IterIndexRange`]'s pending buffer. Read/Write
/// modes stage `Pending(key, id)` and resolve the `T` lazily on
/// `next()`; Lazy mode pre-resolves under a single `read_transaction`
/// (to preserve snapshot consistency across the index walk + the
/// per-row primary `get`) and stages `Resolved(key, T)` directly.
enum StagedEntry<T> {
    /// User key (id suffix already stripped) plus the document id
    /// still to be fetched from the primary tree.
    Pending(Vec<u8>, Id),
    /// User key plus the already-decoded document.
    Resolved(Vec<u8>, T),
}
1758
/// Streaming iterator returned by [`Collection::iter_range`]. Yields
/// `Result<(user_key_bytes, T)>` one row at a time; internally
/// refills a fixed-size `(user_key, Id)` buffer in batches of
/// `ITER_INDEX_RANGE_BATCH = 256` so the per-step pager-lock cost
/// amortises. Memory stays bounded at `O(batch × small_bytes +
/// distinct_ids)` regardless of the range's total size.
///
/// Held data: a `&'a Collection<'a, T>` borrow (the iterator is bound
/// to the lifetime of `Collection::iter_range`'s `&self` borrow), the
/// index's root page-id, the dedup set for `Each` indexes, the next-
/// chunk resumption marker, and the staged batch.
pub struct IterIndexRange<'a, T: Document> {
    /// Collection the iterator was obtained from. Supplies the
    /// transaction environment for refills and the primary-tree
    /// `get` used to resolve staged ids.
    coll: &'a Collection<'a, T>,
    /// Kind of the index being walked. Decides whether ids are
    /// de-duplicated (`Each`) and whether the 8-byte id suffix is
    /// stripped from keys (every kind except `Unique`).
    descriptor_kind: obj_core::IndexKind,
    /// Root page id of the index B-tree; a zero value is rejected
    /// with `Error::InvalidArgument` on refill.
    index_root: u64,
    /// First-refill marker — `None` after the iterator has emitted
    /// at least one chunk; subsequent refills use `last_full_key`.
    initial_resume: Option<InitialResume>,
    /// Last full B-tree key read by the most recent refill that
    /// produced at least one `Ok` step — recorded even when the entry
    /// itself was skipped as an `Each` duplicate. Drives the
    /// `Excluded(_)` resumption bound for the next chunk.
    last_full_key: Option<Vec<u8>>,
    /// User-supplied end bound (already widened per index kind).
    end_bound: Bound<Vec<u8>>,
    /// Pre-staged entries from the most recent refill. `next()`
    /// pops from the front. Each entry is either `Pending(key, id)`
    /// (deferred get-back, the Read/Write streaming path) or
    /// `Resolved(key, T)` (eager get-back inside a single
    /// `read_transaction`, the Lazy-mode fallback).
    buffer: VecDeque<Result<StagedEntry<T>>>,
    /// Persistent de-dup set for `Each` indexes. Power-of-ten Rule
    /// 3: the set is intentionally unbounded — if the caller wants
    /// a hard cap they should use
    /// [`Collection::count_distinct_ids_in_range`] (which caps at
    /// [`MAX_DISTINCT_IDS`]); the iterator's correctness contract
    /// is per-row dedup across the whole range.
    emitted_ids: HashSet<u64>,
    /// Latched once a refill consumed fewer than a full batch or a
    /// refill failed; no further refills are attempted after that.
    finished: bool,
}
1797
1798impl<T: Document> std::fmt::Debug for IterIndexRange<'_, T> {
1799 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1800 f.debug_struct("IterIndexRange")
1801 .field("descriptor_kind", &self.descriptor_kind)
1802 .field("index_root", &self.index_root)
1803 .field("buffer_len", &self.buffer.len())
1804 .field("emitted_ids_len", &self.emitted_ids.len())
1805 .field("finished", &self.finished)
1806 .finish_non_exhaustive()
1807 }
1808}
1809
1810impl<T: Document> IterIndexRange<'_, T> {
1811 /// Refill `self.buffer` with up to [`ITER_INDEX_RANGE_BATCH`]
1812 /// `(user_key, Id)` pairs by walking the index B-tree from the
1813 /// current resumption marker. Sets `self.finished` when the
1814 /// underlying range scan yields fewer than the requested batch
1815 /// (i.e. it ran past the end bound).
1816 ///
1817 /// Power-of-ten Rule 7: per-step decode errors are pushed into
1818 /// the buffer as `Err(_)` so the caller observes them via
1819 /// `next()` rather than aborting iteration.
1820 fn refill(&mut self) -> Result<()> {
1821 let env = match &self.coll.mode {
1822 CollectionMode::Write(w) => w.env,
1823 CollectionMode::Read(r) => r.env,
1824 CollectionMode::Lazy(_) => {
1825 // Lazy mode falls back to the eager `index_range`
1826 // path in `iter_range` itself (see below). Reaching
1827 // refill in Lazy mode indicates an internal logic
1828 // error, NOT a recoverable corruption — surface as
1829 // a typed error rather than `unwrap`.
1830 return Err(Error::ReadOnly {
1831 operation: "internal: iter_range refill in Lazy mode",
1832 });
1833 }
1834 };
1835 let root_pid = PageId::new(self.index_root)
1836 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1837 let start = self.next_start_bound();
1838 let end = clone_bound_ref(&self.end_bound);
1839 let mut pager = lock_pager(env)?;
1840 let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1841 let iter = tree.range(&mut pager, (start, end))?;
1842 let mut staged: VecDeque<Result<StagedEntry<T>>> =
1843 VecDeque::with_capacity(ITER_INDEX_RANGE_BATCH);
1844 let mut last_full: Option<Vec<u8>> = None;
1845 let mut consumed: usize = 0;
1846 for step in iter {
1847 if consumed >= ITER_INDEX_RANGE_BATCH {
1848 break;
1849 }
1850 consumed = consumed
1851 .checked_add(1)
1852 .ok_or(Error::BTreeInvariantViolated {
1853 reason: "iter_range batch counter overflow",
1854 })?;
1855 self.stage_one(&mut staged, &mut last_full, step);
1856 }
1857 if consumed < ITER_INDEX_RANGE_BATCH {
1858 self.finished = true;
1859 }
1860 drop(pager);
1861 self.buffer.extend(staged);
1862 if let Some(k) = last_full {
1863 self.last_full_key = Some(k);
1864 }
1865 Ok(())
1866 }
1867
1868 /// Process one B-tree step into the staged batch. Encapsulates
1869 /// the `Each`-dedup, the trailing-id-suffix strip, and the
1870 /// `Id::from_be_bytes` parse. Free helper so the refill body
1871 /// stays under the Rule-4 60-line ceiling.
1872 fn stage_one(
1873 &mut self,
1874 staged: &mut VecDeque<Result<StagedEntry<T>>>,
1875 last_full: &mut Option<Vec<u8>>,
1876 step: Result<(Vec<u8>, Vec<u8>)>,
1877 ) {
1878 let (full_key, id_bytes) = match step {
1879 Ok(kv) => kv,
1880 Err(e) => {
1881 staged.push_back(Err(e));
1882 return;
1883 }
1884 };
1885 *last_full = Some(full_key.clone());
1886 let Some(id) = Id::from_be_bytes(&id_bytes) else {
1887 staged.push_back(Err(Error::Corruption { page_id: 0 }));
1888 return;
1889 };
1890 if self.descriptor_kind == obj_core::IndexKind::Each && !self.emitted_ids.insert(id.get()) {
1891 // Same doc already emitted under a different element
1892 // key — skip without producing an output entry.
1893 return;
1894 }
1895 let user_key = strip_id_suffix(&full_key, self.descriptor_kind);
1896 staged.push_back(Ok(StagedEntry::Pending(user_key, id)));
1897 }
1898
1899 /// Compute the start bound for the next refill: use
1900 /// `initial_resume` on the first call (consuming it), thereafter
1901 /// use `Excluded(last_full_key)`.
1902 fn next_start_bound(&mut self) -> Bound<Vec<u8>> {
1903 if let Some(initial) = self.initial_resume.take() {
1904 return match initial {
1905 InitialResume::Included(k) => Bound::Included(k),
1906 InitialResume::Excluded(k) => Bound::Excluded(k),
1907 InitialResume::Unbounded => Bound::Unbounded,
1908 };
1909 }
1910 match &self.last_full_key {
1911 Some(k) => Bound::Excluded(k.clone()),
1912 None => Bound::Unbounded,
1913 }
1914 }
1915}
1916
1917impl<T: Document> Iterator for IterIndexRange<'_, T> {
1918 type Item = Result<(Vec<u8>, T)>;
1919
1920 fn next(&mut self) -> Option<Self::Item> {
1921 loop {
1922 if let Some(staged) = self.buffer.pop_front() {
1923 return Some(self.resolve_one(staged));
1924 }
1925 if self.finished {
1926 return None;
1927 }
1928 if let Err(e) = self.refill() {
1929 // Latch the iterator shut on a refill failure
1930 // (lock acquisition, B-tree open, etc.). Surface
1931 // the error once, then return None on subsequent
1932 // calls — power-of-ten Rule 7.
1933 self.finished = true;
1934 return Some(Err(e));
1935 }
1936 // refill ran; the loop will pop or notice finished.
1937 }
1938 }
1939}
1940
1941impl<T: Document> IterIndexRange<'_, T> {
1942 /// Resolve one staged entry into a `(user_key, T)` pair. For
1943 /// `Pending(_, id)` entries (the Read/Write streaming path),
1944 /// calls [`Collection::get`] to decode `T` on demand; for
1945 /// `Resolved(_, T)` entries (the Lazy-mode eager path), returns
1946 /// the already-decoded value. Orphan index entries (id missing
1947 /// in the primary tree) surface as [`Error::Corruption`],
1948 /// matching [`Collection::index_range`]'s existing contract.
1949 fn resolve_one(&self, staged: Result<StagedEntry<T>>) -> Result<(Vec<u8>, T)> {
1950 match staged? {
1951 StagedEntry::Pending(user_key, id) => match self.coll.get(id)? {
1952 Some(doc) => Ok((user_key, doc)),
1953 None => Err(Error::Corruption { page_id: 0 }),
1954 },
1955 StagedEntry::Resolved(user_key, doc) => Ok((user_key, doc)),
1956 }
1957 }
1958}
1959
/// Produce an owned copy of a borrowed `Bound<Vec<u8>>` — the shape
/// `IterIndexRange::end_bound` is stored in — so each resumption
/// walk gets its own end bound. The variant is preserved and the
/// key bytes, if any, are cloned.
fn clone_bound_ref(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    // `Bound<T>` is `Clone` whenever `T` is, variant by variant.
    b.clone()
}