obj/collection.rs
1//! `Collection<T>` — typed handle to one collection's primary
2//! B-tree.
3
4use std::borrow::Cow;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::marker::PhantomData;
7use std::ops::Bound;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use obj_core::btree::BTree;
11use obj_core::codec::{decode, encode};
12use obj_core::pager::page::PageId;
13use obj_core::pager::Pager;
14use obj_core::platform::FileHandle;
15use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result};
16
17/// Boxed iterator alias used by [`Collection::lookup`] and
18/// [`Collection::index_range`]. The iterator borrows from the
19/// enclosing transaction; the `'tx` lifetime is bound on the
20/// `Collection<T>` it was obtained from.
21pub type IndexIter<'a, Item> = Box<dyn Iterator<Item = Result<Item>> + Send + 'a>;
22
/// Number of index B-tree entries [`IterIndexRange`] stages per
/// refill.
///
/// The iterator still hands out a single `(user_key, T)` pair per
/// `next()`, but it fetches index entries in chunks of this size so
/// the cost of taking the pager lock is spread over many `next()`
/// calls instead of being paid on every step.
///
/// Power-of-ten Rule 3: the staging buffer is fixed-size. At roughly
/// 8 bytes of key plus the trailing 8-byte id suffix per entry, a
/// full batch peaks at about 4 KiB — independent of how many entries
/// the scanned range holds in total.
const ITER_INDEX_RANGE_BATCH: usize = 256;
31
/// Upper bound, per call, on the `HashSet<Id>` that
/// [`Collection::count_distinct_ids_in_range`] builds while counting
/// unique document `Id`s under an `Each` index.
///
/// Power-of-ten Rule 3: the distinct set is allocation-bounded. Going
/// past the cap surfaces
/// [`obj_core::Error::DistinctCountExceeded`] instead of consuming an
/// arbitrary amount of memory; callers can shrink the scanned range
/// with `.index_range(...)` to stay within the budget.
pub const MAX_DISTINCT_IDS: usize = 100_000;
40
41use crate::txn::{lock_catalog, ReadTxn, WriteTxn};
42
43/// #90 — per-transaction descriptor cache. Maps collection name to
44/// its LIVE [`CollectionDescriptor`] (with `next_id`, `primary_root`,
45/// and every index `root_page_id` advanced IN-MEMORY across the
46/// transaction). This is the single mid-txn source of truth for those
47/// roots; [`crate::WriteTxn::commit`] flushes each entry back to the
48/// catalog exactly once. Shared (via `Arc`) between the [`WriteTxn`]
49/// and every [`Collection`] handle opened on it, so two handles of the
50/// same collection observe the same advancing descriptor.
51pub(crate) type DescriptorCache = Arc<Mutex<HashMap<String, CollectionDescriptor>>>;
52
53/// Construct an empty [`DescriptorCache`].
54#[must_use]
55pub(crate) fn new_descriptor_cache() -> DescriptorCache {
56 Arc::new(Mutex::new(HashMap::new()))
57}
58
59/// Acquire the descriptor-cache mutex; map poison into `Busy` (Rule
60/// 7 — no panic on a poisoned lock).
61pub(crate) fn lock_descriptors(
62 cache: &Mutex<HashMap<String, CollectionDescriptor>>,
63) -> Result<MutexGuard<'_, HashMap<String, CollectionDescriptor>>> {
64 cache.lock().map_err(|_| Error::Busy {
65 kind: obj_core::LockKind::WriterInProcess,
66 })
67}
68
69/// Resolve the live cached descriptor for `name`, lazily loading it
70/// from the catalog B-tree on first touch in this transaction. After
71/// the first load the catalog tree is NEVER re-read mid-txn for this
72/// collection — every subsequent read/advance goes through the cache
73/// entry, so the unique pre-check and every index-tree open observe
74/// the in-memory-advanced roots (#90 load-bearing invariant).
75///
76/// Returns a mutable borrow into the cache so callers bump `next_id`,
77/// advance `primary_root`, and let `apply_doc_change` advance index
78/// roots in place — all without a per-doc `Catalog::update`.
79pub(crate) fn cached_descriptor_mut<'g>(
80 cache: &'g mut HashMap<String, CollectionDescriptor>,
81 pager: &mut Pager<FileHandle>,
82 catalog: &Catalog<FileHandle>,
83 name: &str,
84) -> Result<&'g mut CollectionDescriptor> {
85 if !cache.contains_key(name) {
86 let descriptor = catalog_get_required(pager, catalog, name)?;
87 cache.insert(name.to_owned(), descriptor);
88 }
89 cache.get_mut(name).ok_or(Error::Corruption { page_id: 0 })
90}
91
92/// Typed handle to a collection.
93///
94/// Construct via [`crate::WriteTxn::collection`] (lazy-create) or
95/// [`crate::ReadTxn::collection`] (read-only; errors if absent), or
96/// via [`crate::Db::collection`] for a one-shot read-only handle
97/// bound to a runtime collection name (M11 #94 — Phase 1B).
98///
99/// All methods take `&self` because the underlying state lives
100/// behind mutexes on the parent transaction; the handle itself is
101/// stateless beyond the descriptor it caches.
102pub struct Collection<'tx, T: Document> {
103 /// `Mode::Write` carries the [`WriteTxn`] reference;
104 /// `Mode::Read` the [`ReadTxn`] reference; `Mode::Lazy` carries
105 /// a `&Db` and opens a private read transaction on each method
106 /// call. The [`Collection`]'s lifetime is bound to whichever
107 /// it was constructed from.
108 mode: CollectionMode<'tx>,
109 /// Collection name resolved at construction. For handles built
110 /// via [`crate::WriteTxn::collection`] / [`crate::ReadTxn::collection`]
111 /// this equals `T::COLLECTION`. For handles built via
112 /// [`crate::Db::collection`] this is the caller-supplied runtime
113 /// name (which may differ from `T::COLLECTION`, e.g.
114 /// `"archive.orders"` against a type whose declared `COLLECTION`
115 /// is `"orders"`).
116 //
117 // `collection_name` rather than the lint-suggested `name`
118 // because `name` collides with `IndexDescriptor::name` /
119 // `IndexSpec::name` throughout this module's internals.
120 //
121 // `Cow<'static, str>` (#84): the common `WriteTxn`/`ReadTxn`
122 // open path resolves to `T::COLLECTION` — a `&'static str` —
123 // so it can be stored as `Cow::Borrowed` with NO `to_owned()`
124 // heap allocation per read. The runtime-name path
125 // (`Db::collection`, `open_readonly_named` with a caller name)
126 // stores `Cow::Owned`.
127 #[allow(clippy::struct_field_names)]
128 collection_name: Cow<'static, str>,
129 /// Cached descriptor. Populated at construction for `Write` /
130 /// `Read` mode; never updated in place — a `Collection<T>`
131 /// reflects the catalog row that existed when the handle was
132 /// opened. `update` / `delete` inside the same txn re-read the
133 /// descriptor through the catalog lock to capture mutations
134 /// from prior calls in the same txn (e.g. an `insert` that
135 /// advanced `next_id`).
136 ///
137 /// For `Lazy` mode the descriptor is a sentinel — the real
138 /// descriptor is loaded inside each method's private read
139 /// transaction.
140 descriptor: CollectionDescriptor,
141 _phantom: PhantomData<fn() -> T>,
142}
143
144/// Backing-reference inside a [`Collection`]. Encodes whether
145/// the txn is writable.
146enum CollectionMode<'tx> {
147 Write(WriteRef<'tx>),
148 Read(ReadRef<'tx>),
149 /// Read-only handle that opens a private read transaction on
150 /// each method call. Constructed by [`crate::Db::collection`];
151 /// the `'tx` lifetime is the borrow of `&Db`.
152 Lazy(LazyRef<'tx>),
153}
154
155struct LazyRef<'db> {
156 /// Borrowed `Db` — methods open one-shot read transactions on
157 /// it. The `'db` lifetime keeps the borrow alive for as long
158 /// as the [`Collection`] handle.
159 db: &'db crate::Db,
160}
161
162struct WriteRef<'tx> {
163 env: &'tx obj_core::TxnEnv<FileHandle>,
164 catalog: Arc<Mutex<Catalog<FileHandle>>>,
165 /// #90: shared per-txn descriptor cache (cloned from the owning
166 /// [`WriteTxn`]). The single mid-txn source of truth for
167 /// `next_id` / `primary_root` / index roots; flushed once at
168 /// commit.
169 descriptors: DescriptorCache,
170}
171
172struct ReadRef<'tx> {
173 snapshot: &'tx obj_core::ReaderSnapshot<FileHandle>,
174 env: &'tx obj_core::TxnEnv<FileHandle>,
175}
176
177impl<'tx, T: Document> Collection<'tx, T> {
178 /// Open the collection on the write side, lazy-creating the
179 /// catalog row + an empty primary B-tree on first call, and
180 /// reconciling the type's declared `Document::indexes()` against
181 /// the catalog's stored descriptors (M7 #57) on the first call
182 /// per process per collection.
183 pub(crate) fn open_or_create(tx: &'tx mut WriteTxn<'_>) -> Result<Self> {
184 // Stage 1: lazy-create the collection row + an empty primary
185 // B-tree if this is the first call for `T` against this
186 // database. The returned descriptor is the pre-reconcile
187 // shape; we re-read after reconciliation below because the
188 // reconciler may have appended index descriptors.
189 let _initial = ensure_collection::<T>(&tx.inner, &tx.catalog)?;
190 reconcile_indexes_once::<T>(
191 &tx.inner,
192 &tx.catalog,
193 &tx.reconciled,
194 &mut tx.reconciled_staged,
195 )?;
196 // Re-read the descriptor in case the reconciler mutated the
197 // index roster (the reconciler runs through `Catalog::update`
198 // which rewrites the descriptor's `indexes` vector).
199 let descriptor = reread_descriptor::<T>(&tx.inner, &tx.catalog)?;
200 Ok(Self {
201 mode: CollectionMode::Write(WriteRef {
202 env: tx.inner.env(),
203 catalog: Arc::clone(&tx.catalog),
204 descriptors: Arc::clone(&tx.descriptors),
205 }),
206 // `T::COLLECTION` is `&'static str` (#84): borrow, don't
207 // allocate.
208 collection_name: Cow::Borrowed(T::COLLECTION),
209 descriptor,
210 _phantom: PhantomData,
211 })
212 }
213
214 /// Open the collection on the read side. Errors if the
215 /// collection has not yet been registered AT THE SNAPSHOT'S
216 /// PINNED LSN — a collection created by a concurrent writer
217 /// AFTER the snapshot was pinned is invisible to this txn (M6
218 /// #53). The descriptor is read by walking the catalog B+tree
219 /// rooted at `snapshot.root_catalog()` (the value captured by
220 /// `Pager::reader_snapshot` at pin time) using the snapshot-
221 /// aware [`Catalog::lookup_via_snapshot`] free function — NOT
222 /// via the writer's live `Catalog.tree.root`, which a concurrent
223 /// writer may have COW-advanced past the snapshot.
224 ///
225 /// M11 #93: if `T::COLLECTION` is of the form
226 /// `<namespace>.<name>`, the lookup dispatches against the
227 /// attached database registered under `<namespace>` instead of
228 /// the calling Db. The attached snapshot is the one pinned by
229 /// `Db::read_transaction` at txn-begin; the attached env is
230 /// passed through the same way.
231 pub(crate) fn open_readonly(tx: &'tx ReadTxn<'_>) -> Result<Self> {
232 // `T::COLLECTION` is `&'static str` — store it as
233 // `Cow::Borrowed` (#84), avoiding the per-read `to_owned()`
234 // heap allocation on the common typed-handle path.
235 Self::open_readonly_named(tx, Cow::Borrowed(T::COLLECTION))
236 }
237
238 /// Open the collection on the read side against a caller-supplied
239 /// runtime `name`. Like [`Self::open_readonly`] but the catalog
240 /// lookup uses `name` instead of `T::COLLECTION` — required for
241 /// [`crate::Db::collection`]'s Phase 1B accessor (M11 #94), which
242 /// binds a runtime collection name (e.g. `"archive.orders"`) to
243 /// the typed handle.
244 ///
245 /// Namespace dispatch (`<ns>.<tail>` → attached database) follows
246 /// the same shape as [`Self::open_readonly`].
247 pub(crate) fn open_readonly_named(
248 tx: &'tx ReadTxn<'_>,
249 name: Cow<'static, str>,
250 ) -> Result<Self> {
251 let (namespace, tail) = crate::db::split_namespace(&name);
252 let (env, snapshot, lookup_name): (
253 &'tx obj_core::TxnEnv<FileHandle>,
254 &'tx obj_core::ReaderSnapshot<FileHandle>,
255 &str,
256 ) = match namespace {
257 None => (tx.inner.env(), tx.inner.snapshot(), &name),
258 Some(ns) => {
259 let ctx = tx
260 .attached
261 .get(ns)
262 .ok_or_else(|| Error::CollectionNamespaceUnknown {
263 namespace: ns.to_owned(),
264 })?;
265 (ctx.env.as_ref(), &ctx.snapshot, tail)
266 }
267 };
268 let Some(descriptor) = read_descriptor_via_snapshot_named(env, snapshot, lookup_name)?
269 else {
270 return Err(Error::CollectionNotFound {
271 name: name.into_owned(),
272 });
273 };
274 Ok(Self {
275 mode: CollectionMode::Read(ReadRef { snapshot, env }),
276 // `name` is moved in unchanged — `Cow::Borrowed(&'static)`
277 // for the typed-handle path (no alloc), `Cow::Owned` for
278 // runtime-name handles.
279 collection_name: name,
280 descriptor,
281 _phantom: PhantomData,
282 })
283 }
284
285 /// Construct a deferred-lookup, read-only handle bound to a
286 /// runtime collection name. Each method call opens a private
287 /// read transaction on `db` and dispatches through
288 /// [`Self::open_readonly_named`]. Construction is infallible —
289 /// errors (missing collection, unknown namespace, etc.) surface
290 /// at the first method call.
291 ///
292 /// Used by [`crate::Db::collection`] (Phase 1B, M11 #94).
293 pub(crate) fn lazy(db: &'tx crate::Db, name: String) -> Self {
294 Self {
295 mode: CollectionMode::Lazy(LazyRef { db }),
296 collection_name: Cow::Owned(name),
297 // Sentinel descriptor — never read in Lazy mode; the
298 // real descriptor is loaded inside each method's
299 // private read transaction.
300 descriptor: CollectionDescriptor::new(0, 0, 0),
301 _phantom: PhantomData,
302 }
303 }
304
305 /// Cached descriptor (`collection_id`, `primary_root`,
306 /// `type_version`, `next_id` at handle-open time).
307 #[must_use]
308 pub fn descriptor(&self) -> &CollectionDescriptor {
309 &self.descriptor
310 }
311
312 /// #90: the LIVE primary-tree root for a `Write`-mode handle.
313 ///
314 /// Prefers the per-txn descriptor cache (which carries every
315 /// `primary_root` advance from this txn's prior writes) so a
316 /// read-after-write on the same handle inside one transaction
317 /// observes its own uncommitted inserts. Falls back to the
318 /// handle's open-time `primary_root` if this collection has not
319 /// yet been written in the txn (no cache entry).
320 fn write_primary_root(&self, write: &WriteRef<'tx>) -> Result<u64> {
321 let cache = lock_descriptors(&write.descriptors)?;
322 Ok(cache
323 .get(self.collection_name.as_ref())
324 .map_or(self.descriptor.primary_root, |d| d.primary_root))
325 }
326
327 /// #90: the LIVE `root_page_id` for the named `Active` index on a
328 /// `Write`-mode handle. Prefers the per-txn cache so a read after
329 /// an index-mutating write in the same txn descends the advanced
330 /// root; falls back to `fallback` (the handle's open-time
331 /// descriptor entry) when the collection has no cache entry yet.
332 fn write_index_root(
333 &self,
334 write: &WriteRef<'tx>,
335 index_name: &str,
336 fallback: u64,
337 ) -> Result<u64> {
338 let cache = lock_descriptors(&write.descriptors)?;
339 let Some(descriptor) = cache.get(self.collection_name.as_ref()) else {
340 return Ok(fallback);
341 };
342 let live = descriptor
343 .indexes
344 .iter()
345 .find(|d| d.name == index_name && d.status == obj_core::IndexStatus::Active)
346 .map_or(fallback, |d| d.root_page_id);
347 Ok(live)
348 }
349
350 /// Insert `doc`. Returns the freshly-allocated [`Id`].
351 ///
352 /// # Errors
353 ///
354 /// - [`Error::ReadOnly`] if the handle is read-only.
355 /// - Pager / catalog / codec errors propagated.
356 // `doc: T` is taken by value for caller ergonomics (the user
357 // owns the value and gives it to the database). The codec's
358 // `encode(&T, ...)` takes a reference, so clippy sees the
359 // function body as "owned but borrows only" — accepted as the
360 // intended public-API shape.
361 #[allow(clippy::needless_pass_by_value)]
362 pub fn insert(&self, doc: T) -> Result<Id> {
363 let write = self.write_or_err("insert")?;
364 let name: &str = self.collection_name.as_ref();
365 let mut pager = lock_pager(write.env)?;
366 let catalog = lock_catalog(&write.catalog)?;
367 // #90: the cached descriptor is the SOLE mid-txn source of
368 // truth. `next_id` is an in-memory bump; `primary_root` and
369 // index roots advance in the cache; NO per-doc
370 // `Catalog::update` — the single flush is deferred to commit.
371 let mut cache = lock_descriptors(&write.descriptors)?;
372 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
373 let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
374 let bytes = encode(&doc, descriptor.collection_id)?;
375 let key = id.to_be_bytes();
376 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
377 tree.insert(&mut pager, &key, &bytes)?;
378 descriptor.primary_root = tree.root().get();
379 // Index maintenance lands inside the same WAL transaction as
380 // the primary write, descending the cache's in-memory index
381 // roots. A `UniqueConstraintViolation` here surfaces via `?`
382 // and the surrounding `WriteTxn` rolls back atomically.
383 crate::index_maint::apply_doc_change::<T>(&mut pager, descriptor, None, Some(&doc), id)?;
384 Ok(id)
385 }
386
387 /// Fetch the document at `id`.
388 ///
389 /// On the write side this consults the pager (sees pending
390 /// writes in the current txn). On the read side it consults
391 /// the snapshot's frozen view.
392 ///
393 /// # Lazy migration (M10 #84)
394 ///
395 /// If the on-disk record was written by an older
396 /// `Document::VERSION` than the current `T::VERSION`, the codec
397 /// walks the stored bytes through the schema registered for
398 /// that version (see `T::historical_schemas()`) and dispatches
399 /// the resulting structured `Dynamic` through `T::migrate`.
400 /// **The migrated bytes are NOT written back to disk.** The
401 /// next [`Collection::get`] re-reads the same v(n) bytes and
402 /// re-runs migration. Only a subsequent
403 /// [`Collection::update`](Self::update) /
404 /// [`Collection::upsert`](Self::upsert) writes the document
405 /// back, at which point the on-disk header records
406 /// `T::VERSION`.
407 ///
408 /// This contract is what allows mixed-version reads to scale:
409 /// a 10⁹-doc collection does not need to be batch-rewritten on
410 /// schema upgrade. Power-of-ten Rule 7: every "migration ran"
411 /// path returns the migrated `T`; no implicit write-back.
412 ///
413 /// # Errors
414 ///
415 /// Pager / B-tree / codec errors propagated. In particular:
416 ///
417 /// - [`Error::SchemaNotRegistered`](obj_core::Error::SchemaNotRegistered)
418 /// if the stored record carries a `type_version` for which
419 /// `T::historical_schemas()` has no entry.
420 /// - [`Error::SchemaMigrationNotImplemented`](obj_core::Error::SchemaMigrationNotImplemented)
421 /// if the registered `T::migrate` returns the default error.
422 pub fn get(&self, id: Id) -> Result<Option<T>> {
423 if let Some(r) = self.dispatch_lazy(|c| c.get(id)) {
424 return r;
425 }
426 let key = id.to_be_bytes();
427 let Some(bytes) = self.read_or_write_env_for_get(&key)? else {
428 return Ok(None);
429 };
430 Ok(Some(decode::<T>(&bytes, self.descriptor.collection_id)?))
431 }
432
433 /// Read the primary-tree value at `key` against the current
434 /// `Write` / `Read` mode (Lazy mode is dispatched upstream of
435 /// every public caller via [`Self::dispatch_lazy`]).
436 fn read_or_write_env_for_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
437 match &self.mode {
438 CollectionMode::Write(write) => {
439 // #90: descend the LIVE primary root so a get after an
440 // insert/update in the SAME txn sees its own writes.
441 let primary_root = self.write_primary_root(write)?;
442 let mut pager = lock_pager(write.env)?;
443 let tree = btree_handle(&pager, primary_root)?;
444 tree.get(&mut pager, key)
445 }
446 CollectionMode::Read(read) => {
447 snapshot_get_via_btree(read.snapshot, read.env, self.descriptor.primary_root, key)
448 }
449 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
450 operation: "internal: lazy-mode primary read",
451 }),
452 }
453 }
454
455 /// Apply `f` to the document at `id`, writing the mutated value
456 /// back.
457 ///
458 /// # Errors
459 ///
460 /// - [`Error::ReadOnly`] on a read-side handle.
461 /// - [`Error::DocumentNotFound`] if `id` is absent.
462 /// - Pager / catalog / codec errors propagated.
463 pub fn update<F>(&self, id: Id, f: F) -> Result<()>
464 where
465 F: FnOnce(&mut T),
466 {
467 let write = self.write_or_err("update")?;
468 let name: &str = self.collection_name.as_ref();
469 let mut pager = lock_pager(write.env)?;
470 let catalog = lock_catalog(&write.catalog)?;
471 // #90: resolve the live cached descriptor (sole mid-txn source
472 // of truth); advance its roots in place, no per-doc flush.
473 let mut cache = lock_descriptors(&write.descriptors)?;
474 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
475 let key = id.to_be_bytes();
476 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
477 let existing = tree.get(&mut pager, &key)?.ok_or(Error::DocumentNotFound {
478 collection: T::COLLECTION,
479 id: id.get(),
480 })?;
481 let old_value = decode::<T>(&existing, descriptor.collection_id)?;
482 let mut new_value = decode::<T>(&existing, descriptor.collection_id)?;
483 f(&mut new_value);
484 let bytes = encode(&new_value, descriptor.collection_id)?;
485 tree.delete(&mut pager, &key)?;
486 tree.insert(&mut pager, &key, &bytes)?;
487 descriptor.primary_root = tree.root().get();
488 crate::index_maint::apply_doc_change::<T>(
489 &mut pager,
490 descriptor,
491 Some(&old_value),
492 Some(&new_value),
493 id,
494 )?;
495 Ok(())
496 }
497
498 /// Delete the document at `id`. Returns `true` if it existed.
499 ///
500 /// # Errors
501 ///
502 /// - [`Error::ReadOnly`] on a read-side handle.
503 /// - Pager / catalog errors propagated.
504 pub fn delete(&self, id: Id) -> Result<bool> {
505 let write = self.write_or_err("delete")?;
506 let name: &str = self.collection_name.as_ref();
507 let mut pager = lock_pager(write.env)?;
508 let catalog = lock_catalog(&write.catalog)?;
509 // #90: cached descriptor is the sole mid-txn source of truth.
510 let mut cache = lock_descriptors(&write.descriptors)?;
511 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
512 let key = id.to_be_bytes();
513 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
514 // Materialise the old document so the index-maintenance
515 // path can compute the OLD key set and emit per-index
516 // delete entries. If the row does not exist, we still
517 // return `Ok(false)` for API back-compat (no index work).
518 let old_value = match tree.get(&mut pager, &key)? {
519 Some(bytes) => Some(decode::<T>(&bytes, descriptor.collection_id)?),
520 None => None,
521 };
522 let removed = tree.delete(&mut pager, &key)?;
523 descriptor.primary_root = tree.root().get();
524 crate::index_maint::apply_doc_change::<T>(
525 &mut pager,
526 descriptor,
527 old_value.as_ref(),
528 None,
529 id,
530 )?;
531 Ok(removed)
532 }
533
534 /// Insert-or-replace `doc` at `id`.
535 ///
536 /// # Errors
537 ///
538 /// - [`Error::ReadOnly`] on a read-side handle.
539 /// - Pager / catalog / codec errors propagated.
540 // `doc: T` passed by value for ergonomic ownership transfer;
541 // codec takes `&T` so clippy reports a "borrow only" body.
542 #[allow(clippy::needless_pass_by_value)]
543 pub fn upsert(&self, id: Id, doc: T) -> Result<()> {
544 let write = self.write_or_err("upsert")?;
545 let name: &str = self.collection_name.as_ref();
546 let mut pager = lock_pager(write.env)?;
547 let catalog = lock_catalog(&write.catalog)?;
548 // #90: cached descriptor is the sole mid-txn source of truth.
549 let mut cache = lock_descriptors(&write.descriptors)?;
550 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
551 let bytes = encode(&doc, descriptor.collection_id)?;
552 let key = id.to_be_bytes();
553 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
554 // Materialise the prior value (if any) so the index-
555 // maintenance diff sees the OLD key set.
556 let old_value = match tree.get(&mut pager, &key)? {
557 Some(prior) => Some(decode::<T>(&prior, descriptor.collection_id)?),
558 None => None,
559 };
560 let _ = tree.delete(&mut pager, &key)?;
561 tree.insert(&mut pager, &key, &bytes)?;
562 descriptor.primary_root = tree.root().get();
563 crate::index_maint::apply_doc_change::<T>(
564 &mut pager,
565 descriptor,
566 old_value.as_ref(),
567 Some(&doc),
568 id,
569 )?;
570 Ok(())
571 }
572
573 /// Look up the single document whose `index_name` key matches
574 /// `key` under a `Unique` index.
575 ///
576 /// Errors with [`Error::IndexNotUnique`] if `index_name` resolves
577 /// to a non-unique index — `find_unique` is *only* defined on
578 /// `Unique` indexes. For `Standard` / `Each` / `Composite` use
579 /// [`Self::lookup`] (which returns an iterator).
580 ///
581 /// Snapshot-aware: on a write-side handle the lookup sees the
582 /// current txn's pending writes; on a read-side handle it sees
583 /// the snapshot's frozen view.
584 ///
585 /// # Errors
586 ///
587 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
588 /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
589 /// - Pager / B-tree / codec errors propagated.
590 pub fn find_unique(
591 &self,
592 index_name: &str,
593 key: impl Into<obj_core::codec::Dynamic>,
594 ) -> Result<Option<T>> {
595 let key_dyn = key.into();
596 if let Some(r) = self.dispatch_lazy(|c| c.find_unique(index_name, key_dyn.clone())) {
597 return r;
598 }
599 let descriptor = self.active_index(index_name)?;
600 if descriptor.kind != obj_core::IndexKind::Unique {
601 return Err(Error::IndexNotUnique {
602 collection: self.collection_name.clone().into_owned(),
603 name: index_name.to_owned(),
604 });
605 }
606 let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
607 let id_bytes = self.index_get(descriptor, encoded.as_bytes())?;
608 match id_bytes {
609 Some(bytes) => match Id::from_be_bytes(&bytes) {
610 Some(id) => self.get(id),
611 None => Err(Error::Corruption { page_id: 0 }),
612 },
613 None => Ok(None),
614 }
615 }
616
617 /// Yield every document whose `index_name` key matches `key`.
618 /// Works on `Standard` / `Unique` / `Each` indexes. Returns
619 /// `Err(Error::IndexKindMismatch)`-style guidance for
620 /// `Composite` (use [`Self::index_range`] for tuple-shaped
621 /// keys).
622 ///
623 /// The same document is yielded at most once even if it owns
624 /// multiple matching entries — `Each` indexes can encode the
625 /// same `id` under multiple element keys; we de-dup on emit.
626 ///
627 /// # Errors
628 ///
629 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
630 /// - Pager / B-tree / codec errors propagated.
631 pub fn lookup(
632 &self,
633 index_name: &str,
634 key: impl Into<obj_core::codec::Dynamic>,
635 ) -> Result<IndexIter<'static, T>>
636 where
637 T: Send + 'static,
638 {
639 let key_dyn = key.into();
640 if let Some(r) = self.dispatch_lazy(|c| c.lookup(index_name, key_dyn.clone())) {
641 return r;
642 }
643 let descriptor = self.active_index(index_name)?;
644 let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
645 let ids = match descriptor.kind {
646 obj_core::IndexKind::Unique => self.collect_unique(descriptor, encoded.as_bytes())?,
647 obj_core::IndexKind::Standard
648 | obj_core::IndexKind::Each
649 | obj_core::IndexKind::Composite => {
650 self.collect_nonunique_equal(descriptor, encoded.as_bytes())?
651 }
652 // `IndexKind` is `#[non_exhaustive]`; an unknown kind means
653 // this build predates the on-disk index. Refuse rather than
654 // return a wrong (possibly empty) id set.
655 _ => return Err(Error::InvalidArgument("unsupported index kind")),
656 };
657 let resolved = self.resolve_unique_ids(ids)?;
658 Ok(Box::new(resolved.into_iter().map(Ok)))
659 }
660
661 /// Yield `(user_key, doc)` pairs whose index key falls within
662 /// `range`. The bounds are [`Dynamic`](obj_core::codec::Dynamic)
663 /// values — the same ergonomic type [`crate::Query::index_range`]
664 /// takes — encoded internally through the order-preserving field
665 /// encoder ([`obj_core::index::encode_field`]); callers no longer
666 /// hand-encode index-key bytes.
667 ///
668 /// For non-Unique kinds (`Standard` / `Each` / `Composite`) the
669 /// bounds are widened internally so a user-facing
670 /// `Included(x)..=Included(x)` range matches every entry whose
671 /// user-key equals `x` even though the underlying B-tree key
672 /// carries an `id_be8` suffix (see
673 /// [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
674 /// § Index key
675 /// encoding § Range-bound widening (non-Unique kinds)).
676 ///
677 /// # Errors
678 ///
679 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
680 /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
681 /// carries an embedded NUL byte (the order-preserving encoder
682 /// rejects those).
683 /// - Pager / B-tree / codec errors propagated.
684 pub fn index_range<R>(
685 &self,
686 index_name: &str,
687 range: R,
688 ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
689 where
690 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
691 T: Send + 'static,
692 {
693 let start = encode_dynamic_bound(range.start_bound())?;
694 let end = encode_dynamic_bound(range.end_bound())?;
695 self.index_range_encoded(index_name, start, end)
696 }
697
698 /// Encoded-bytes variant of [`Self::index_range`]. The bounds are
699 /// already the order-preserving field encoding of the user's
700 /// `Dynamic` value(s); this keeps the signature general for
701 /// `Composite` "starts-with" scans and is the entry point the
702 /// query layer / lazy-dispatch recursion call after they have
703 /// done their own encoding.
704 pub(crate) fn index_range_encoded(
705 &self,
706 index_name: &str,
707 start_bound: std::ops::Bound<Vec<u8>>,
708 end_bound: std::ops::Bound<Vec<u8>>,
709 ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
710 where
711 T: Send + 'static,
712 {
713 if let Some(r) = self.dispatch_lazy(|c| {
714 c.index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
715 }) {
716 return r;
717 }
718 let descriptor = self.active_index(index_name)?;
719 let (start, end) =
720 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
721 let entries = self.collect_range(descriptor, start, end)?;
722 let descriptor_kind = descriptor.kind;
723 let mut out: Vec<Result<(Vec<u8>, T)>> = Vec::with_capacity(entries.len());
724 let mut emitted_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
725 for (full_key, id_bytes_value) in entries {
726 let Some(id) = Id::from_be_bytes(&id_bytes_value) else {
727 out.push(Err(Error::Corruption { page_id: 0 }));
728 continue;
729 };
730 // For Each indexes the same doc may appear multiple
731 // times under different element keys — de-dup on
732 // emission.
733 if descriptor_kind == obj_core::IndexKind::Each && !emitted_ids.insert(id.get()) {
734 continue;
735 }
736 // For non-unique kinds the B-tree key includes the
737 // trailing 8-byte id suffix; strip it so the caller
738 // sees only the user-key bytes.
739 let user_key = strip_id_suffix(&full_key, descriptor_kind);
740 match self.get(id) {
741 Ok(Some(doc)) => out.push(Ok((user_key, doc))),
742 Ok(None) => {
743 // Orphan index entry — surface as Corruption.
744 out.push(Err(Error::Corruption { page_id: 0 }));
745 }
746 Err(e) => out.push(Err(e)),
747 }
748 }
749 Ok(Box::new(out.into_iter()))
750 }
751
752 /// Streaming variant of [`Self::index_range`] (Phase 7A perf
753 /// pass, M14 #14). Yields `(user_key, T)` pairs lazily — the
754 /// returned [`IterIndexRange`] decodes one `T` per `next()` call
755 /// rather than building a `Vec<Result<(_, T)>>` of every match
756 /// up front. The iterator borrows `&'a self`, so it must be
757 /// consumed inside the lifetime of the enclosing
758 /// [`crate::WriteTxn`] / [`crate::ReadTxn`] (or the
759 /// [`crate::Db::collection`] handle, in Lazy mode).
760 ///
761 /// # When to prefer `iter_range` over `index_range`
762 ///
763 /// - **Memory.** `index_range` allocates `O(matches × sizeof(T))`
764 /// upfront; `iter_range` keeps a fixed-size [`VecDeque`] of
765 /// `(key, id)` pairs (`ITER_INDEX_RANGE_BATCH = 256` entries)
766 /// and decodes one `T` at a time. For a 100k-row range with
767 /// ~500-byte documents that's ~50 MB peak vs. a few KiB.
768 /// - **Latency-to-first-row.** `index_range` decodes every
769 /// matching document before returning the iterator;
770 /// `iter_range` returns immediately after the first chunk
771 /// refill, so the first `next()` returns after one index walk
772 /// + one primary-tree `get` (rather than `N`).
773 ///
774 /// # When `index_range` is still the right answer
775 ///
776 /// `index_range` returns an `IndexIter<'static, _>` — it can
777 /// escape the `read_transaction` / `transaction` closure that
778 /// produced it. `iter_range` is bound to `&self`, so the
779 /// iterator dies when the [`Collection`] handle dies. If you
780 /// need to return the iterator to outer scope, stick with
781 /// `index_range`.
782 ///
783 /// # Per-row `get`-back design choice
784 ///
785 /// Each `next()` yields `(user_key, T)` by calling
786 /// [`Self::get`] under the hood — i.e. a SECOND B+tree descent
787 /// per row (the first is the index range walk; the second is
788 /// the primary-tree `get(id)`). This is intentional and
789 /// inherited from `index_range`: the index leaf stores only
790 /// the document `id` (8 bytes), not the document bytes. A
791 /// future format-minor bump may add value-in-index storage to
792 /// short-circuit the second descent; that work is pinned to
793 /// post-1.0 (tracked as pit issue #16, "value-in-index
794 /// storage to eliminate `index_range` double-decode").
795 ///
796 /// # Errors
797 ///
798 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
799 /// - Pager / B-tree / codec errors propagated at construction
800 /// and from each `next()` call.
801 pub fn iter_range<'a, R>(&'a self, index_name: &str, range: R) -> Result<IterIndexRange<'a, T>>
802 where
803 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
804 T: Send + 'static,
805 {
806 let start_bound = encode_dynamic_bound(range.start_bound())?;
807 let end_bound = encode_dynamic_bound(range.end_bound())?;
808 self.iter_range_encoded(index_name, start_bound, end_bound)
809 }
810
811 /// Encoded-bytes variant of [`Self::iter_range`]. Bounds are the
812 /// order-preserving field encoding of the user's `Dynamic`
813 /// value(s); used internally by the lazy-mode fallback path.
814 fn iter_range_encoded<'a>(
815 &'a self,
816 index_name: &str,
817 start_bound: Bound<Vec<u8>>,
818 end_bound: Bound<Vec<u8>>,
819 ) -> Result<IterIndexRange<'a, T>>
820 where
821 T: Send + 'static,
822 {
823 // Lazy mode falls back to `index_range`'s eager materialization
824 // path so the index walk + per-row `get` share a single
825 // snapshot. Streaming refills can't preserve that — each
826 // refill would open a fresh txn and observe a different
827 // snapshot. Lazy callers who want true streaming should open
828 // an explicit `Db::read_transaction` and call `iter_range`
829 // on the bound collection there.
830 if matches!(self.mode, CollectionMode::Lazy(_)) {
831 return self.iter_range_lazy_fallback(index_name, start_bound, end_bound);
832 }
833 let descriptor = self.active_index(index_name)?;
834 // #90: in Write mode the iterator must walk the LIVE index
835 // root (the per-txn cache's advanced value) so a streaming
836 // scan opened after an in-txn index write sees its own
837 // entries. In Read mode there is no write cache, so the
838 // open-time descriptor root is authoritative.
839 let index_root = match &self.mode {
840 CollectionMode::Write(w) => {
841 self.write_index_root(w, index_name, descriptor.root_page_id)?
842 }
843 _ => descriptor.root_page_id,
844 };
845 let (start, end) =
846 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
847 // Stash the resumption marker as `Excluded(start)` only on
848 // the first refill; afterwards the iterator overwrites it
849 // with the last full_key it emitted. The initial `start`
850 // bound is honoured by `refill` via the same `last_full_key`
851 // → `Excluded(_)` translation.
852 let initial_resume = match start {
853 Bound::Included(k) => InitialResume::Included(k),
854 Bound::Excluded(k) => InitialResume::Excluded(k),
855 Bound::Unbounded => InitialResume::Unbounded,
856 };
857 Ok(IterIndexRange {
858 coll: self,
859 descriptor_kind: descriptor.kind,
860 index_root,
861 initial_resume: Some(initial_resume),
862 last_full_key: None,
863 end_bound: end,
864 buffer: VecDeque::new(),
865 emitted_ids: HashSet::new(),
866 finished: false,
867 })
868 }
869
870 /// Lazy-mode fallback for [`Self::iter_range`]: delegates to
871 /// [`Self::index_range`] (which itself dispatches through a fresh
872 /// read txn) and rehouses the eagerly-materialised entries into
873 /// the streaming iterator's buffer as
874 /// [`StagedEntry::Resolved`]. Power-of-ten Rule 4: keeping this
875 /// isolated so the streaming path's `iter_range` body stays
876 /// small.
877 fn iter_range_lazy_fallback<'a>(
878 &'a self,
879 index_name: &str,
880 start_bound: Bound<Vec<u8>>,
881 end_bound: Bound<Vec<u8>>,
882 ) -> Result<IterIndexRange<'a, T>>
883 where
884 T: Send + 'static,
885 {
886 let materialized = self.index_range_encoded(index_name, start_bound, end_bound)?;
887 let mut buffer: VecDeque<Result<StagedEntry<T>>> = VecDeque::new();
888 for item in materialized {
889 match item {
890 Ok((key, doc)) => buffer.push_back(Ok(StagedEntry::Resolved(key, doc))),
891 Err(e) => buffer.push_back(Err(e)),
892 }
893 }
894 Ok(IterIndexRange {
895 coll: self,
896 descriptor_kind: obj_core::IndexKind::Standard,
897 index_root: 0,
898 initial_resume: None,
899 last_full_key: None,
900 end_bound: Bound::Unbounded,
901 buffer,
902 emitted_ids: HashSet::new(),
903 finished: true,
904 })
905 }
906
907 /// Look up the `IndexKind` of an active index by name. Used by
908 /// the M8 query layer to dispatch `Query::count` between the
909 /// entry-count and distinct-id-count paths (M8 follow-up #72).
910 ///
911 /// # Errors
912 ///
913 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
914 pub(crate) fn index_kind(&self, index_name: &str) -> Result<obj_core::IndexKind> {
915 Ok(self.active_index(index_name)?.kind)
916 }
917
918 /// Resolve `index_name` to an `Active` `IndexDescriptor` on the
919 /// collection. Errors with [`Error::IndexNotFound`] if absent
920 /// or `DroppedPending`.
921 ///
922 /// Returns a borrow into `self.descriptor.indexes` — no per-lookup
923 /// clone (#84). Every caller uses the descriptor only within the
924 /// enclosing `&self` borrow; `iter_range_encoded` copies the two
925 /// `Copy` fields it needs (`kind`, `root_page_id`) into the
926 /// returned iterator rather than holding the borrow.
927 fn active_index(&self, index_name: &str) -> Result<&obj_core::IndexDescriptor> {
928 let entry = self
929 .descriptor
930 .indexes
931 .iter()
932 .find(|d| d.name == index_name);
933 match entry {
934 Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d),
935 _ => Err(Error::IndexNotFound {
936 collection: self.collection_name.clone().into_owned(),
937 name: index_name.to_owned(),
938 }),
939 }
940 }
941
942 /// Single-key `get` on an index B-tree. Used by `find_unique`
943 /// and by the Unique-kind branch of `lookup`.
944 fn index_get(
945 &self,
946 descriptor: &obj_core::IndexDescriptor,
947 key: &[u8],
948 ) -> Result<Option<Vec<u8>>> {
949 match &self.mode {
950 CollectionMode::Write(write) => {
951 // #90: descend the LIVE index root from the per-txn
952 // cache so an index read after an index-mutating write
953 // in the SAME txn observes its own entries.
954 let root_raw =
955 self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
956 let root = PageId::new(root_raw)
957 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
958 let mut pager = lock_pager(write.env)?;
959 let tree = BTree::<FileHandle>::open(&pager, root)?;
960 tree.get(&mut pager, key)
961 }
962 CollectionMode::Read(read) => {
963 let root = PageId::new(descriptor.root_page_id)
964 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
965 let pager = lock_pager(read.env)?;
966 BTree::<FileHandle>::get_via_snapshot(&pager, read.snapshot, root, key)
967 }
968 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
969 operation: "internal: lazy-mode index_get",
970 }),
971 }
972 }
973
974 /// Collect every `(full_key, value)` entry from an index B-tree
975 /// whose key starts with `prefix`. For unique kinds the prefix
976 /// is the entire key (one match max); for non-unique kinds we
977 /// match every key whose first `prefix.len()` bytes equal
978 /// `prefix` (the trailing `id_suffix` varies per doc).
979 fn collect_nonunique_equal(
980 &self,
981 descriptor: &obj_core::IndexDescriptor,
982 prefix: &[u8],
983 ) -> Result<Vec<u64>> {
984 let entries = self.collect_range(
985 descriptor,
986 std::ops::Bound::Included(prefix.to_vec()),
987 // Upper bound is `prefix || 0xFF..` — every key whose
988 // user-portion equals `prefix` falls in
989 // `[prefix, prefix || u64::MAX]`.
990 std::ops::Bound::Included(append_max_id(prefix)),
991 )?;
992 let mut ids = Vec::with_capacity(entries.len());
993 let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
994 for (_full_key, value) in entries {
995 let id = Id::from_be_bytes(&value).ok_or(Error::Corruption { page_id: 0 })?;
996 if emitted.insert(id.get()) {
997 ids.push(id.get());
998 }
999 }
1000 Ok(ids)
1001 }
1002
1003 /// Collect a single id from a Unique index B-tree at `key`.
1004 fn collect_unique(
1005 &self,
1006 descriptor: &obj_core::IndexDescriptor,
1007 key: &[u8],
1008 ) -> Result<Vec<u64>> {
1009 match self.index_get(descriptor, key)? {
1010 Some(bytes) => Id::from_be_bytes(&bytes)
1011 .map(|id| vec![id.get()])
1012 .ok_or(Error::Corruption { page_id: 0 }),
1013 None => Ok(Vec::new()),
1014 }
1015 }
1016
1017 /// Collect every `(full_key, value)` entry from an index B-tree
1018 /// whose key falls within `(start, end)`.
1019 fn collect_range(
1020 &self,
1021 descriptor: &obj_core::IndexDescriptor,
1022 start: std::ops::Bound<Vec<u8>>,
1023 end: std::ops::Bound<Vec<u8>>,
1024 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1025 // Read mode pins every page read to the txn's snapshot (M12
1026 // #12): a concurrent writer's post-snapshot index entries must
1027 // not leak into a read txn's range/count. Write mode keeps the
1028 // live-pager scan so the txn observes its own uncommitted
1029 // index writes (it has no snapshot to pin against).
1030 match &self.mode {
1031 CollectionMode::Read(r) => {
1032 let root = PageId::new(descriptor.root_page_id)
1033 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1034 let pager = lock_pager(r.env)?;
1035 let iter = BTree::<FileHandle>::range_via_snapshot(
1036 &pager,
1037 r.snapshot,
1038 root,
1039 (start, end),
1040 )?;
1041 let mut out = Vec::new();
1042 for step in iter {
1043 out.push(step?);
1044 }
1045 Ok(out)
1046 }
1047 CollectionMode::Write(w) => {
1048 // #90: scan the LIVE index root from the per-txn cache
1049 // so an in-txn index scan sees its own uncommitted
1050 // entries (the open-time descriptor root may be stale).
1051 let root_raw =
1052 self.write_index_root(w, &descriptor.name, descriptor.root_page_id)?;
1053 let root = PageId::new(root_raw)
1054 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1055 let mut pager = lock_pager(w.env)?;
1056 let tree = BTree::<FileHandle>::open(&pager, root)?;
1057 let iter = tree.range(&mut pager, (start, end))?;
1058 let mut out = Vec::new();
1059 for step in iter {
1060 out.push(step?);
1061 }
1062 Ok(out)
1063 }
1064 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1065 operation: "internal: lazy-mode collect_range",
1066 }),
1067 }
1068 }
1069
1070 /// Resolve a `Vec<u64>` of `Id` integer values into concrete
1071 /// `T` documents via `self.get`. Preserves order; missing rows
1072 /// surface as `Error::Corruption` (orphan index entry).
1073 fn resolve_unique_ids(&self, ids: Vec<u64>) -> Result<Vec<T>> {
1074 let mut out = Vec::with_capacity(ids.len());
1075 for raw in ids {
1076 let id =
1077 Id::from_be_bytes(&raw.to_be_bytes()).ok_or(Error::Corruption { page_id: 0 })?;
1078 let doc = self.get(id)?.ok_or(Error::Corruption { page_id: 0 })?;
1079 out.push(doc);
1080 }
1081 Ok(out)
1082 }
1083
1084 /// Count every entry in the primary tree WITHOUT decoding the
1085 /// documents. Used by the M8 [`crate::Query::count`] no-decode
1086 /// fast path; the iterator visits leaf pages and counts entries
1087 /// rather than running each through postcard.
1088 ///
1089 /// Power-of-ten Rule 2: bounded by the B+tree's `MAX_RANGE_NODES`
1090 /// budget (inherited from `BTree::range`).
1091 ///
1092 /// # Errors
1093 ///
1094 /// Pager / B-tree errors propagated.
1095 pub fn count_all(&self) -> Result<u64> {
1096 // Closure (rather than `Collection::count_all`) so the
1097 // higher-ranked lifetime on `dispatch_lazy`'s
1098 // `for<'a> FnOnce(&Collection<'a, T>) -> _` bound is
1099 // satisfied — `Collection::count_all` resolves to a single
1100 // lifetime fn-item type, which fails the HRTB check.
1101 #[allow(clippy::redundant_closure_for_method_calls)]
1102 if let Some(r) = self.dispatch_lazy(|c| c.count_all()) {
1103 return r;
1104 }
1105 // Read mode pins the full-tree scan to the txn's snapshot
1106 // (M12 #12) so a concurrent writer's post-snapshot inserts
1107 // cannot perturb the count; write mode keeps the live scan so
1108 // the txn sees its own uncommitted writes.
1109 match &self.mode {
1110 CollectionMode::Read(r) => {
1111 let pager = lock_pager(r.env)?;
1112 let pid = PageId::new(self.descriptor.primary_root)
1113 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1114 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, r.snapshot, pid, ..)?;
1115 count_range_iter(iter)
1116 }
1117 CollectionMode::Write(w) => {
1118 // #90: count the LIVE primary root so an in-txn count
1119 // reflects this txn's own uncommitted inserts/deletes.
1120 let root = self.write_primary_root(w)?;
1121 let mut pager = lock_pager(w.env)?;
1122 let tree = btree_handle(&pager, root)?;
1123 let iter = tree.range(&mut pager, ..)?;
1124 count_range_iter(iter)
1125 }
1126 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1127 operation: "internal: lazy-mode count_all",
1128 }),
1129 }
1130 }
1131
1132 /// Count every entry whose encoded key falls inside `range` on
1133 /// the named index's B-tree, WITHOUT decoding any document. M8
1134 /// fast path for [`crate::Query::count`] when the source is an
1135 /// `index_range`.
1136 ///
1137 /// Returns the number of index B-tree entries — for an `Each`
1138 /// index that may exceed the document count (one doc emits
1139 /// multiple entries); for other kinds it equals the matching
1140 /// doc count.
1141 ///
1142 /// # Errors
1143 ///
1144 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1145 /// - Pager / B-tree errors propagated.
1146 pub fn count_index_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1147 where
1148 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1149 {
1150 let start = encode_dynamic_bound(range.start_bound())?;
1151 let end = encode_dynamic_bound(range.end_bound())?;
1152 self.count_index_range_encoded(index_name, start, end)
1153 }
1154
1155 /// Encoded-bytes variant of [`Self::count_index_range`]. Bounds
1156 /// are the order-preserving field encoding of the user's
1157 /// `Dynamic` value(s); used by the query-layer count fast path.
1158 pub(crate) fn count_index_range_encoded(
1159 &self,
1160 index_name: &str,
1161 start_bound: std::ops::Bound<Vec<u8>>,
1162 end_bound: std::ops::Bound<Vec<u8>>,
1163 ) -> Result<u64> {
1164 if let Some(r) = self.dispatch_lazy(|c| {
1165 c.count_index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
1166 }) {
1167 return r;
1168 }
1169 let descriptor = self.active_index(index_name)?;
1170 let (start, end) =
1171 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1172 let entries = self.collect_range(descriptor, start, end)?;
1173 // `entries.len()` is a `usize`; promote to `u64` carefully.
1174 // `usize` is at most 64 bits on every supported target.
1175 u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1176 reason: "index range entry count exceeds u64",
1177 })
1178 }
1179
1180 /// Count distinct document `Id`s whose entries fall inside
1181 /// `range` on the named index's B-tree, WITHOUT decoding any
1182 /// document. For `Each` indexes this is the correct shape of
1183 /// the "how many docs match" question — `count_index_range`
1184 /// returns the entry count, which overshoots when a single doc
1185 /// contributes multiple entries.
1186 ///
1187 /// Implementation walks the index B-tree, parses the trailing
1188 /// 8-byte big-endian `Id` suffix from each non-unique key, and
1189 /// tracks the unique set in a bounded [`std::collections::HashSet`]
1190 /// capped at [`MAX_DISTINCT_IDS`]. Exceeding the cap surfaces
1191 /// [`Error::DistinctCountExceeded`] — the caller should narrow
1192 /// the range.
1193 ///
1194 /// # Per-kind semantics
1195 ///
1196 /// - `Standard`, `Composite`: equivalent to `count_index_range`
1197 /// (one entry per doc by construction; the trailing-id-suffix
1198 /// walk still produces the same total).
1199 /// - `Unique`: keys carry NO id suffix — the entry value is the
1200 /// raw 8-byte `Id`; the walk reads the value instead.
1201 /// - `Each`: the dedup is meaningful — one doc may contribute
1202 /// N entries under N distinct element keys.
1203 ///
1204 /// # Errors
1205 ///
1206 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1207 /// - [`Error::DistinctCountExceeded`] if the distinct set
1208 /// exceeds [`MAX_DISTINCT_IDS`].
1209 /// - [`Error::Corruption`] if an entry's id suffix / value is
1210 /// not parseable as an [`obj_core::Id`].
1211 /// - Pager / B-tree errors propagated.
1212 pub fn count_distinct_ids_in_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1213 where
1214 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1215 {
1216 let start = encode_dynamic_bound(range.start_bound())?;
1217 let end = encode_dynamic_bound(range.end_bound())?;
1218 self.count_distinct_ids_in_range_encoded(index_name, start, end)
1219 }
1220
1221 /// Encoded-bytes variant of [`Self::count_distinct_ids_in_range`].
1222 /// Bounds are the order-preserving field encoding of the user's
1223 /// `Dynamic` value(s); used by the query-layer count fast path.
1224 pub(crate) fn count_distinct_ids_in_range_encoded(
1225 &self,
1226 index_name: &str,
1227 start_bound: std::ops::Bound<Vec<u8>>,
1228 end_bound: std::ops::Bound<Vec<u8>>,
1229 ) -> Result<u64> {
1230 if let Some(r) = self.dispatch_lazy(|c| {
1231 c.count_distinct_ids_in_range_encoded(
1232 index_name,
1233 start_bound.clone(),
1234 end_bound.clone(),
1235 )
1236 }) {
1237 return r;
1238 }
1239 let descriptor = self.active_index(index_name)?;
1240 let (start, end) =
1241 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1242 let entries = self.collect_range(descriptor, start, end)?;
1243 let mut distinct: HashSet<u64> = HashSet::new();
1244 for (full_key, value) in entries {
1245 let id = id_from_index_entry(&full_key, &value, descriptor.kind)?;
1246 if distinct.insert(id) && distinct.len() > MAX_DISTINCT_IDS {
1247 return Err(Error::DistinctCountExceeded {
1248 limit: MAX_DISTINCT_IDS,
1249 });
1250 }
1251 }
1252 // `distinct.len()` is a `usize`; promote to `u64` carefully.
1253 // `usize` is at most 64 bits on every supported target.
1254 u64::try_from(distinct.len()).map_err(|_| Error::BTreeInvariantViolated {
1255 reason: "distinct id count exceeds u64",
1256 })
1257 }
1258
1259 /// Materialise every `(Id, T)` pair in the collection.
1260 ///
1261 /// Implementation note: M6 returns an owned `Vec` rather than a
1262 /// streaming iterator because the B+tree range API borrows the
1263 /// pager, and threading that borrow through the mutex guards
1264 /// in the iterator chain is awkward. M7+ may convert to a
1265 /// streaming shape once the index API is in place.
1266 ///
1267 /// # Errors
1268 ///
1269 /// Pager / B-tree / codec errors propagated.
1270 pub fn all(&self) -> Result<Vec<(Id, T)>> {
1271 // Closure (rather than `Collection::all`) for the same
1272 // HRTB reason documented on `count_all` above.
1273 #[allow(clippy::redundant_closure_for_method_calls)]
1274 if let Some(r) = self.dispatch_lazy(|c| c.all()) {
1275 return r;
1276 }
1277 match &self.mode {
1278 CollectionMode::Write(write) => {
1279 // #90: scan the LIVE primary root so an in-txn `all()`
1280 // includes this txn's own uncommitted inserts.
1281 let root = self.write_primary_root(write)?;
1282 let mut pager = lock_pager(write.env)?;
1283 scan_all::<T>(&mut pager, root, self.descriptor.collection_id)
1284 }
1285 CollectionMode::Read(read) => snapshot_scan_via_btree::<T>(
1286 read.snapshot,
1287 read.env,
1288 self.descriptor.primary_root,
1289 self.descriptor.collection_id,
1290 ),
1291 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1292 operation: "internal: lazy-mode all",
1293 }),
1294 }
1295 }
1296
1297 fn write_or_err(&self, op: &'static str) -> Result<&WriteRef<'tx>> {
1298 match &self.mode {
1299 CollectionMode::Write(w) => Ok(w),
1300 CollectionMode::Read(_) | CollectionMode::Lazy(_) => {
1301 Err(Error::ReadOnly { operation: op })
1302 }
1303 }
1304 }
1305
1306 /// If this handle is in `Lazy` mode, dispatch `body` through a
1307 /// fresh read transaction on the bound [`crate::Db`] — opening a
1308 /// transient [`Collection`] via [`Self::open_readonly_named`] and
1309 /// invoking the user-supplied closure on it. Returns `Some(_)`
1310 /// when the dispatch fires (the closure ran or the underlying
1311 /// open failed); returns `None` for non-Lazy handles so the
1312 /// caller can fall through to the existing logic.
1313 ///
1314 /// Power-of-ten Rule 4: keeps each public method's body small —
1315 /// the dispatch shim is one match arm instead of a per-method
1316 /// `if let CollectionMode::Lazy(_)` ladder.
1317 fn dispatch_lazy<R, F>(&self, body: F) -> Option<Result<R>>
1318 where
1319 F: FnOnce(&Collection<'_, T>) -> Result<R>,
1320 {
1321 match &self.mode {
1322 CollectionMode::Lazy(LazyRef { db }) => {
1323 // Clone the stored name into an owned `Cow` for the
1324 // transient handle opened inside the private read txn.
1325 // `into_owned` keeps the `'static` Cow bound the
1326 // `open_readonly_named` signature requires regardless
1327 // of whether the stored name was Borrowed or Owned.
1328 let name: Cow<'static, str> = Cow::Owned(self.collection_name.clone().into_owned());
1329 Some(db.read_transaction(move |tx| {
1330 let coll = Collection::<T>::open_readonly_named(tx, name)?;
1331 body(&coll)
1332 }))
1333 }
1334 _ => None,
1335 }
1336 }
1337}
1338
1339// ---------- internal helpers --------------------------------------
1340
1341fn lock_pager(
1342 env: &obj_core::TxnEnv<FileHandle>,
1343) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1344 env.pager().lock().map_err(|_| Error::Busy {
1345 kind: obj_core::LockKind::WriterInProcess,
1346 })
1347}
1348
1349/// Ensure `T::COLLECTION` exists in the catalog, lazy-creating an
1350/// empty primary B-tree on first call. Used on the write side.
1351fn ensure_collection<T: Document>(
1352 inner: &obj_core::WriteTxn<'_, FileHandle>,
1353 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1354) -> Result<CollectionDescriptor> {
1355 let mut pager = inner.lock_pager()?;
1356 let mut catalog_guard = lock_catalog(catalog)?;
1357 if let Some(d) = catalog_guard.get(&mut pager, T::COLLECTION)? {
1358 return Ok(d);
1359 }
1360 let tree = BTree::<FileHandle>::empty(&mut pager)?;
1361 let descriptor = CollectionDescriptor::new(0, tree.root().get(), T::VERSION);
1362 let _id = catalog_guard.insert(&mut pager, T::COLLECTION, descriptor)?;
1363 catalog_guard
1364 .get(&mut pager, T::COLLECTION)?
1365 .ok_or(Error::Corruption { page_id: 0 })
1366}
1367
1368/// Re-read the descriptor for `T::COLLECTION` after any catalog
1369/// mutation. Used by [`Collection::open_or_create`] after the
1370/// reconciler runs.
1371fn reread_descriptor<T: Document>(
1372 inner: &obj_core::WriteTxn<'_, FileHandle>,
1373 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1374) -> Result<CollectionDescriptor> {
1375 let mut pager = inner.lock_pager()?;
1376 let catalog_guard = lock_catalog(catalog)?;
1377 catalog_guard
1378 .get(&mut pager, T::COLLECTION)?
1379 .ok_or(Error::Corruption { page_id: 0 })
1380}
1381
1382/// Reconcile `T::indexes()` against the catalog's stored descriptors
1383/// on the FIRST call per process per `(collection, version)`.
1384/// Subsequent calls for the same `(collection, version)` observe the
1385/// cache hit (in the shared `reconciled` set OR this txn's `staged`
1386/// set) and skip the catalog walk.
1387///
1388/// Reconciliation runs inside the user's WAL transaction so a
1389/// rolled-back txn leaves the catalog clean. If reconciliation
1390/// fails (e.g. `Error::IndexKindMismatch`), neither set is populated
1391/// so the next attempt re-runs the reconciler.
1392///
1393/// #93 — the skip-check is `shared ∪ staged`: a second handle of the
1394/// same `(collection, version)` within ONE txn still skips the
1395/// (idempotent) catalog walk via `staged`, but the key is recorded in
1396/// the per-txn `staged` set, NOT the shared set.
1397/// [`crate::WriteTxn::commit`] promotes the staged keys into the shared
1398/// set only after the WAL commit lands — so a rolled-back lazy-create
1399/// can never poison the shared cache into skipping reconciliation on a
1400/// later txn (the original bug).
1401fn reconcile_indexes_once<T: Document>(
1402 inner: &obj_core::WriteTxn<'_, FileHandle>,
1403 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1404 reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1405 staged: &mut HashSet<(String, u32)>,
1406) -> Result<()> {
1407 // The generic `#[derive(Document)]` path is exactly the non-generic
1408 // raw seam ([`reconcile_specs_once`]) with the collection name,
1409 // version, and spec list supplied from the compile-time `T` rather
1410 // than from a caller argument. Both share ONE body so the cache /
1411 // staging / validation / catalog-walk semantics can never drift
1412 // apart (#108).
1413 reconcile_specs_once(
1414 inner,
1415 catalog,
1416 reconciled,
1417 staged,
1418 T::COLLECTION,
1419 T::VERSION,
1420 &T::indexes(),
1421 )
1422}
1423
1424/// Non-generic core of index reconciliation, shared by the generic
1425/// `#[derive(Document)]` path ([`reconcile_indexes_once`]) and the
1426/// public raw seam ([`crate::WriteTxn::reconcile_indexes_raw`], #108).
1427///
1428/// Reconciles `specs` against the catalog's stored descriptors for
1429/// `collection` on the FIRST call per process per `(collection,
1430/// version)`, honoring the same `shared ∪ staged` skip-cache (keyed by
1431/// `(collection, version)` — #130), per-txn staging, and pre-mutation
1432/// validation as the generic path — the only difference is that the
1433/// collection name, version, and spec list are arguments rather than
1434/// `T::COLLECTION` / `T::VERSION` / `T::indexes()`.
1435///
1436/// # Why the cache key includes `version` (#130)
1437///
1438/// `Catalog::reconcile_indexes` is a FULL reconcile: it declares specs
1439/// missing from the catalog AND drops `Active` descriptors absent from
1440/// `specs`. Keying the skip-cache by `collection` ALONE meant that once
1441/// a process reconciled a collection at one schema version, a LATER
1442/// version in the same process that ADDED a new index never reconciled
1443/// — the added index never became `Active` and index-maintaining writes
1444/// failed with `IndexNotFound`. Keying by `(collection, version)`
1445/// reconciles each version exactly once: the common single-version case
1446/// is unchanged (one key, reconciled once), and a cross-version index
1447/// ADD reconciles the new version's specs on its first write.
1448///
1449/// ## Caveat — conflicting index REMOVAL interleaved across versions
1450///
1451/// Because each `(collection, version)` reconciles independently and
1452/// `reconcile_indexes` drops `Active` indexes absent from the version's
1453/// specs, alternating writes between two live versions of the SAME
1454/// collection in ONE process — where the versions declare DIFFERENT
1455/// index sets — can leave the catalog reflecting whichever version
1456/// reconciled most recently (its specs drive the drop set). Index
1457/// ADDITION (the common monotonic schema-evolution case) is fully
1458/// correct. This is strictly better than the prior behavior, which
1459/// never reconciled the second version at all (so its added index was
1460/// simply missing); the removal-interleaving edge is a narrow,
1461/// single-process anti-pattern.
1462///
1463/// The caller MUST have ensured `collection` exists in the catalog
1464/// (the generic path runs [`ensure_collection`] first; the raw seam
1465/// runs [`crate::txn::ensure_collection_raw`]) — `reconcile_indexes`
1466/// errors with `CollectionNotFound` otherwise.
1467pub(crate) fn reconcile_specs_once(
1468 inner: &obj_core::WriteTxn<'_, FileHandle>,
1469 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1470 reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1471 staged: &mut HashSet<(String, u32)>,
1472 collection: &str,
1473 version: u32,
1474 specs: &[obj_core::IndexSpec],
1475) -> Result<()> {
1476 // Cache key is `(collection, version)` (#130): each schema version
1477 // reconciles its OWN spec set exactly once per process.
1478 let key = (collection.to_owned(), version);
1479 // Fast path: already reconciled in a prior committed txn (shared)
1480 // or earlier in THIS txn (staged) — skip the catalog walk. Probe
1481 // `staged` first: it needs no lock and covers the repeat-handle
1482 // case; the shared probe takes the mutex only when `staged` misses.
1483 if staged.contains(&key) {
1484 return Ok(());
1485 }
1486 {
1487 let cache = lock_reconciled(reconciled)?;
1488 if cache.contains(&key) {
1489 return Ok(());
1490 }
1491 }
1492 // Validate specs before any catalog mutation so a bad spec
1493 // does not leave a half-reconciled catalog.
1494 for spec in specs {
1495 spec.validate()?;
1496 }
1497 {
1498 let mut pager = inner.lock_pager()?;
1499 let mut catalog_guard = lock_catalog(catalog)?;
1500 let _post = catalog_guard.reconcile_indexes(&mut pager, collection, specs)?;
1501 }
1502 // #93: stage in the PER-TXN set, not the shared one. Promotion to
1503 // the shared set is deferred to a successful `WriteTxn::commit`.
1504 staged.insert(key);
1505 Ok(())
1506}
1507
1508fn lock_reconciled(
1509 reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1510) -> Result<std::sync::MutexGuard<'_, HashSet<(String, u32)>>> {
1511 reconciled.lock().map_err(|_| Error::Busy {
1512 kind: obj_core::LockKind::WriterInProcess,
1513 })
1514}
1515
1516/// Read-side descriptor lookup against a caller-supplied
1517/// collection name. M11 #93 introduced this byte-shape so the
1518/// namespace-aware [`Collection::open_readonly`] can perform the
1519/// catalog walk against either the calling Db's snapshot (no
1520/// namespace) or an attached Db's snapshot (with the namespace
1521/// prefix stripped).
1522fn read_descriptor_via_snapshot_named(
1523 env: &obj_core::TxnEnv<FileHandle>,
1524 snapshot: &obj_core::ReaderSnapshot<FileHandle>,
1525 name: &str,
1526) -> Result<Option<CollectionDescriptor>> {
1527 let pager = lock_pager(env)?;
1528 Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1529}
1530
1531/// #83 (b): fused one-shot point read for [`crate::Db::get`].
1532///
1533/// Resolves the collection descriptor and the primary-tree value for
1534/// `id` under a SINGLE pager-mutex acquisition (see
1535/// [`snapshot_resolve_and_get`]), then decodes the bytes against the
1536/// resolved `collection_id`. This collapses the two back-to-back
1537/// pager locks the equivalent handle path pays (one to open the
1538/// handle / resolve the descriptor, one for the `get`).
1539///
1540/// Observably identical to
1541/// `tx.collection::<T>()?.get(id)` for the one-shot caller: the
1542/// descriptor lookup and the value get run on the same `ReadTxn`
1543/// snapshot, and a missing collection still surfaces as
1544/// [`Error::CollectionNotFound`]. Namespaced reads (`<ns>.<tail>`)
1545/// fall back to the handle path so the attached-snapshot dispatch is
1546/// unchanged.
1547pub(crate) fn fused_point_get<T: Document>(tx: &ReadTxn<'_>, id: Id) -> Result<Option<T>> {
1548 let (namespace, _tail) = crate::db::split_namespace(T::COLLECTION);
1549 if namespace.is_some() {
1550 // Namespaced: keep the existing attached-snapshot dispatch.
1551 return Collection::<T>::open_readonly(tx)?.get(id);
1552 }
1553 let key = id.to_be_bytes();
1554 let resolved =
1555 snapshot_resolve_and_get(tx.inner.snapshot(), tx.inner.env(), T::COLLECTION, &key)?;
1556 match resolved {
1557 Some((descriptor, bytes)) => Ok(Some(decode::<T>(&bytes, descriptor.collection_id)?)),
1558 None => Ok(None),
1559 }
1560}
1561
1562/// Re-read the descriptor inside an already-locked pager + catalog
1563/// pair. Surfaces a missing collection as `Error::Corruption`
1564/// because the caller has already opened a write txn against it.
1565fn catalog_get_required(
1566 pager: &mut Pager<FileHandle>,
1567 catalog: &Catalog<FileHandle>,
1568 name: &str,
1569) -> Result<CollectionDescriptor> {
1570 catalog
1571 .get(pager, name)?
1572 .ok_or(Error::Corruption { page_id: 0 })
1573}
1574
1575fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1576 let root_pid =
1577 PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1578 BTree::<FileHandle>::open(pager, root_pid)
1579}
1580
1581/// Drain a B-tree range iterator, counting entries WITHOUT retaining
1582/// their bytes. Shared by [`Collection::count_all`]'s live (write) and
1583/// snapshot-pinned (read) scan arms. Power-of-ten Rule 2: the
1584/// iterator carries its own `MAX_RANGE_NODES` budget; the `u64`
1585/// overflow check guards the count itself.
1586fn count_range_iter<I>(iter: I) -> Result<u64>
1587where
1588 I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
1589{
1590 let mut n: u64 = 0;
1591 for step in iter {
1592 // Probe-only: drop the bytes the moment they decode.
1593 let _ = step?;
1594 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1595 reason: "primary tree entry count exceeds u64",
1596 })?;
1597 }
1598 Ok(n)
1599}
1600
1601// `persist_root` removed in M7 #58: every mutating method now
1602// routes through `index_maint::apply_doc_change`, which persists
1603// the descriptor (including the possibly-advanced `primary_root`)
1604// via `Catalog::update` after every per-index B-tree mutation.
1605
1606/// Snapshot-consistent B-tree lookup (M6 #53).
1607///
1608/// Walks the primary B+tree rooted at `primary_root` using
1609/// [`obj_core::btree::BTree::get_via_snapshot`], which descends
1610/// through [`obj_core::ReaderSnapshot::read_page`] rather than the
1611/// live `Pager::read_page`. This bypasses the WAL `state.view` /
1612/// `state.pending` overlays — a concurrent writer's post-snapshot
1613/// COW commits cannot poison the reader's walk.
1614///
1615/// `primary_root` MUST be the descriptor's `primary_root` as-of
1616/// the snapshot's pinned LSN (i.e. the value read via
1617/// [`obj_core::Catalog::lookup_via_snapshot`] in
1618/// [`read_descriptor_via_snapshot`] above). Using the writer's
1619/// live `primary_root` would defeat the snapshot read.
1620fn snapshot_get_via_btree(
1621 snap: &obj_core::ReaderSnapshot<FileHandle>,
1622 env: &obj_core::TxnEnv<FileHandle>,
1623 primary_root: u64,
1624 key: &[u8],
1625) -> Result<Option<Vec<u8>>> {
1626 let pager = lock_pager(env)?;
1627 let root_pid = PageId::new(primary_root)
1628 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1629 obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)
1630}
1631
1632/// #83 (b): fused single-lock read. Resolves the descriptor for
1633/// `name` and performs the primary-tree `get` for `key` under ONE
1634/// pager-mutex acquisition, against the SAME `snapshot` — collapsing
1635/// the descriptor-lookup lock (`read_descriptor_via_snapshot_named`)
1636/// and the value-get lock (`snapshot_get_via_btree`) that the
1637/// two-call handle path pays back-to-back.
1638///
1639/// Returns `(descriptor, value)` so the caller can `decode` against
1640/// the resolved `collection_id`. A missing collection surfaces as
1641/// `Err(CollectionNotFound)` (matching the handle path's open-time
1642/// contract for the one-shot caller); a present collection with no
1643/// entry for `key` surfaces as `Ok(None)`.
1644///
1645/// Power-of-ten: keeps poison → `Error::Busy` (Rule 7, via
1646/// `lock_pager`); ≤ 60 lines (Rule 4); `debug_assert`s that the
1647/// snapshot-resolved `primary_root` is the one fed to the get
1648/// (Rule 5).
1649fn snapshot_resolve_and_get(
1650 snap: &obj_core::ReaderSnapshot<FileHandle>,
1651 env: &obj_core::TxnEnv<FileHandle>,
1652 name: &str,
1653 key: &[u8],
1654) -> Result<Option<(CollectionDescriptor, Vec<u8>)>> {
1655 let pager = lock_pager(env)?;
1656 let Some(descriptor) = Catalog::<FileHandle>::lookup_via_snapshot(&pager, snap, name)? else {
1657 return Err(Error::CollectionNotFound {
1658 name: name.to_owned(),
1659 });
1660 };
1661 let root_pid = PageId::new(descriptor.primary_root)
1662 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1663 // Rule 5: the value-get MUST descend the descriptor's own
1664 // snapshot-time root — never a re-resolved or live root. A live
1665 // primary B-tree root is always page 1+, so a zero `collection_id`
1666 // here would mean we resolved a degenerate catalog row.
1667 debug_assert_eq!(
1668 root_pid.get(),
1669 descriptor.primary_root,
1670 "fused get must descend the snapshot-resolved primary_root",
1671 );
1672 let value =
1673 obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)?;
1674 Ok(value.map(|v| (descriptor, v)))
1675}
1676
1677fn scan_all<T: Document>(
1678 pager: &mut Pager<FileHandle>,
1679 primary_root: u64,
1680 collection_id: u32,
1681) -> Result<Vec<(Id, T)>> {
1682 let tree = btree_handle(pager, primary_root)?;
1683 let iter = tree.range(pager, ..)?;
1684 let mut out = Vec::new();
1685 for entry in iter {
1686 let (key, value) = entry?;
1687 let id = Id::from_be_bytes(&key)
1688 .ok_or(Error::InvalidArgument("primary B-tree key is not an Id"))?;
1689 let doc = decode::<T>(&value, collection_id)?;
1690 out.push((id, doc));
1691 }
1692 Ok(out)
1693}
1694
1695fn snapshot_scan_via_btree<T: Document>(
1696 _snap: &obj_core::ReaderSnapshot<FileHandle>,
1697 env: &obj_core::TxnEnv<FileHandle>,
1698 primary_root: u64,
1699 collection_id: u32,
1700) -> Result<Vec<(Id, T)>> {
1701 let mut pager = lock_pager(env)?;
1702 scan_all::<T>(&mut pager, primary_root, collection_id)
1703}
1704
1705/// Encode the caller-supplied `Dynamic` value(s) into the bytes a
1706/// lookup against `descriptor` would use as a B-tree key. For
1707/// `Unique` indexes the result is the key bytes verbatim; for
1708/// non-unique kinds the lookup helpers extend with the per-doc
1709/// id suffix at scan time.
1710fn index_key_for_lookup(
1711 descriptor: &obj_core::IndexDescriptor,
1712 fields: &[obj_core::codec::Dynamic],
1713) -> Result<obj_core::index::EncodedIndexKey> {
1714 // Ref-based encode (#84): pass the descriptor's `kind` (Copy) and
1715 // `key_paths` BY REFERENCE — no transient `IndexSpec`, no
1716 // `name`/`key_paths` clone, no redundant `IndexSpec::validate` on
1717 // an already-validated on-disk descriptor. The byte output is
1718 // identical to the old `from_parts` + `encode_index_key` path
1719 // (see `encode_index_key_parts` and the byte-identity test).
1720 obj_core::index::encode_index_key_parts(descriptor.kind, &descriptor.key_paths, fields)
1721}
1722
1723/// Encode a `Bound<&Dynamic>` into the index-key `Bound<Vec<u8>>` the
1724/// B-tree scan uses. Shared by the `Dynamic`-taking range methods on
1725/// [`Collection`].
1726///
1727/// A scalar `Dynamic` is encoded with the order-preserving field
1728/// encoder ([`obj_core::index::encode_field`]) — byte-identical to
1729/// what [`crate::Query::index_range`] produces, so a query and a
1730/// direct collection scan over the same scalar bound observe the
1731/// same entries. A [`Dynamic::Seq`](obj_core::codec::Dynamic::Seq)
1732/// bound is encoded as a composite key (the
1733/// [`COMPOSITE_TAG`](obj_core::index::COMPOSITE_TAG)-prefixed
1734/// concatenation of each element's field encoding) so a `Composite`
1735/// index can be range-scanned by a full tuple bound.
1736fn encode_dynamic_bound(
1737 b: std::ops::Bound<&obj_core::codec::Dynamic>,
1738) -> Result<std::ops::Bound<Vec<u8>>> {
1739 match b {
1740 std::ops::Bound::Included(v) => Ok(std::ops::Bound::Included(encode_bound_value(v)?)),
1741 std::ops::Bound::Excluded(v) => Ok(std::ops::Bound::Excluded(encode_bound_value(v)?)),
1742 std::ops::Bound::Unbounded => Ok(std::ops::Bound::Unbounded),
1743 }
1744}
1745
1746/// Encode one `Dynamic` bound value into index-key bytes. Scalars go
1747/// through [`obj_core::index::encode_field`]; a `Seq` is encoded as a
1748/// composite tuple key. Power-of-ten Rule 4: kept separate so
1749/// [`encode_dynamic_bound`] stays a thin three-arm match.
1750fn encode_bound_value(v: &obj_core::codec::Dynamic) -> Result<Vec<u8>> {
1751 match v {
1752 obj_core::codec::Dynamic::Seq(fields) => {
1753 // `COMPOSITE_TAG || encode_field(f0) || encode_field(f1) ..`
1754 // is byte-identical to `encode_index_key`'s composite path
1755 // for the same fields — see `obj_core::index::key`.
1756 let mut out = vec![obj_core::index::COMPOSITE_TAG];
1757 for f in fields {
1758 out.extend_from_slice(obj_core::index::encode_field(f)?.as_bytes());
1759 }
1760 Ok(out)
1761 }
1762 _ => Ok(obj_core::index::encode_field(v)?.into_bytes()),
1763 }
1764}
1765
/// Return `prefix` followed by eight `0xFF` bytes.
///
/// Serves as the exclusive upper bound of an equality lookup against
/// a non-unique index: the trailing 8 bytes of such a key are an `Id`
/// (`u64` big-endian), so every key sharing the same user prefix is
/// ≤ `prefix || 0xFF..`.
fn append_max_id(prefix: &[u8]) -> Vec<u8> {
    let max_id = u64::MAX.to_be_bytes();
    [prefix, &max_id[..]].concat()
}
1776
1777/// Trim the trailing 8-byte id suffix off a non-unique index key.
1778/// For `Unique` keys the suffix is absent, so the full key is the
1779/// user portion.
1780fn strip_id_suffix(full_key: &[u8], kind: obj_core::IndexKind) -> Vec<u8> {
1781 match kind {
1782 obj_core::IndexKind::Unique => full_key.to_vec(),
1783 _ if full_key.len() >= 8 => full_key[..full_key.len() - 8].to_vec(),
1784 _ => full_key.to_vec(),
1785 }
1786}
1787
1788/// Recover the `Id` (as a `u64`) from one index B-tree entry. For
1789/// non-unique kinds the id is the trailing 8 bytes of the KEY (the
1790/// suffix appended by the maintenance path); for `Unique` keys the
1791/// id is the VALUE. Used by
1792/// [`Collection::count_distinct_ids_in_range`].
1793fn id_from_index_entry(full_key: &[u8], value: &[u8], kind: obj_core::IndexKind) -> Result<u64> {
1794 // `Unique` indexes carry the id in the value; non-unique kinds
1795 // (Standard / Each / Composite) carry it as the trailing 8-byte
1796 // suffix of the key. The slicing here is O(1) — no per-entry
1797 // loop to bound (the outer walk's bound is the distinct-set cap).
1798 let bytes: &[u8] = if kind == obj_core::IndexKind::Unique {
1799 value
1800 } else {
1801 if full_key.len() < 8 {
1802 return Err(Error::Corruption { page_id: 0 });
1803 }
1804 &full_key[full_key.len() - 8..]
1805 };
1806 let id = Id::from_be_bytes(bytes).ok_or(Error::Corruption { page_id: 0 })?;
1807 Ok(id.get())
1808}
1809
1810// =====================================================================
1811// Phase 7A (M14 #14) — streaming index range iterator
1812// =====================================================================
1813
/// Resumption marker for [`IterIndexRange`]'s first refill. After the
/// first batch the iterator switches to `Excluded(last_emitted_full_key)`
/// for subsequent refills (the same shape `Db::iter_all` uses for the
/// primary tree).
///
/// The variants mirror [`std::ops::Bound`] over owned key bytes;
/// `IterIndexRange::next_start_bound` maps each one onto the `Bound`
/// variant of the same name.
enum InitialResume {
    /// Start the first walk at this key, inclusive.
    Included(Vec<u8>),
    /// Start the first walk strictly after this key.
    Excluded(Vec<u8>),
    /// Start the first walk with no lower bound.
    Unbounded,
}
1823
/// One entry in [`IterIndexRange`]'s pending buffer. Read/Write
/// modes stage `Pending(key, id)` and resolve the `T` lazily on
/// `next()`; Lazy mode pre-resolves under a single `read_transaction`
/// (to preserve snapshot consistency across the index walk + the
/// per-row primary `get`) and stages `Resolved(key, T)` directly.
enum StagedEntry<T> {
    /// `(user_key, id)`: the document is fetched from the collection
    /// only when the entry is popped.
    Pending(Vec<u8>, Id),
    /// `(user_key, document)`: the document is already decoded.
    Resolved(Vec<u8>, T),
}
1833
/// Streaming iterator returned by [`Collection::iter_range`]. Yields
/// `Result<(user_key_bytes, T)>` one row at a time; internally
/// refills a fixed-size `(user_key, Id)` buffer in batches of
/// `ITER_INDEX_RANGE_BATCH = 256` so the per-step pager-lock cost
/// amortises. Memory stays bounded at `O(batch × small_bytes +
/// distinct_ids)` regardless of the range's total size.
///
/// Held data: a `&'a Collection<'_, T>` borrow (the iterator is bound
/// to the lifetime of `Collection::iter_range`'s `&self` borrow), the
/// index's root page-id, the dedup set for `Each` indexes, the next-
/// chunk resumption marker, and the staged batch.
pub struct IterIndexRange<'a, T: Document> {
    /// Collection the iterator reads through: its `mode` supplies the
    /// env whose pager is locked during a refill, and its `get`
    /// resolves `Pending` entries into documents.
    coll: &'a Collection<'a, T>,
    /// Kind of the index being walked. Controls whether `Each` dedup
    /// applies and whether an id suffix is stripped from each key.
    descriptor_kind: obj_core::IndexKind,
    /// Root page id of the index B-tree; a value of zero makes
    /// `refill` fail with `Error::InvalidArgument`.
    index_root: u64,
    /// First-refill marker — `None` after the iterator has emitted
    /// at least one chunk; subsequent refills use `last_full_key`.
    initial_resume: Option<InitialResume>,
    /// Last full B-tree key emitted by the most recent refill. Drives
    /// the `Excluded(_)` resumption bound for the next chunk.
    last_full_key: Option<Vec<u8>>,
    /// User-supplied end bound (already widened per index kind).
    end_bound: Bound<Vec<u8>>,
    /// Pre-staged entries from the most recent refill. `next()`
    /// pops from the front. Each entry is either `Pending(key, id)`
    /// (deferred get-back, the Read/Write streaming path) or
    /// `Resolved(key, T)` (eager get-back inside a single
    /// `read_transaction`, the Lazy-mode fallback).
    buffer: VecDeque<Result<StagedEntry<T>>>,
    /// Persistent de-dup set for `Each` indexes. Power-of-ten Rule
    /// 3: the set is intentionally unbounded — if the caller wants
    /// a hard cap they should use
    /// [`Collection::count_distinct_ids_in_range`] (which caps at
    /// [`MAX_DISTINCT_IDS`]); the iterator's correctness contract
    /// is per-row dedup across the whole range.
    emitted_ids: HashSet<u64>,
    /// Latched once a refill consumed fewer than a full batch or a
    /// refill failed; with an empty buffer, `next()` then returns
    /// `None`.
    finished: bool,
}
1872
1873impl<T: Document> std::fmt::Debug for IterIndexRange<'_, T> {
1874 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1875 f.debug_struct("IterIndexRange")
1876 .field("descriptor_kind", &self.descriptor_kind)
1877 .field("index_root", &self.index_root)
1878 .field("buffer_len", &self.buffer.len())
1879 .field("emitted_ids_len", &self.emitted_ids.len())
1880 .field("finished", &self.finished)
1881 .finish_non_exhaustive()
1882 }
1883}
1884
1885impl<T: Document> IterIndexRange<'_, T> {
1886 /// Refill `self.buffer` with up to [`ITER_INDEX_RANGE_BATCH`]
1887 /// `(user_key, Id)` pairs by walking the index B-tree from the
1888 /// current resumption marker. Sets `self.finished` when the
1889 /// underlying range scan yields fewer than the requested batch
1890 /// (i.e. it ran past the end bound).
1891 ///
1892 /// Power-of-ten Rule 7: per-step decode errors are pushed into
1893 /// the buffer as `Err(_)` so the caller observes them via
1894 /// `next()` rather than aborting iteration.
1895 fn refill(&mut self) -> Result<()> {
1896 let env = match &self.coll.mode {
1897 CollectionMode::Write(w) => w.env,
1898 CollectionMode::Read(r) => r.env,
1899 CollectionMode::Lazy(_) => {
1900 // Lazy mode falls back to the eager `index_range`
1901 // path in `iter_range` itself (see below). Reaching
1902 // refill in Lazy mode indicates an internal logic
1903 // error, NOT a recoverable corruption — surface as
1904 // a typed error rather than `unwrap`.
1905 return Err(Error::ReadOnly {
1906 operation: "internal: iter_range refill in Lazy mode",
1907 });
1908 }
1909 };
1910 let root_pid = PageId::new(self.index_root)
1911 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1912 let start = self.next_start_bound();
1913 let end = clone_bound_ref(&self.end_bound);
1914 let mut pager = lock_pager(env)?;
1915 let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1916 let iter = tree.range(&mut pager, (start, end))?;
1917 let mut staged: VecDeque<Result<StagedEntry<T>>> =
1918 VecDeque::with_capacity(ITER_INDEX_RANGE_BATCH);
1919 let mut last_full: Option<Vec<u8>> = None;
1920 let mut consumed: usize = 0;
1921 for step in iter {
1922 if consumed >= ITER_INDEX_RANGE_BATCH {
1923 break;
1924 }
1925 consumed = consumed
1926 .checked_add(1)
1927 .ok_or(Error::BTreeInvariantViolated {
1928 reason: "iter_range batch counter overflow",
1929 })?;
1930 self.stage_one(&mut staged, &mut last_full, step);
1931 }
1932 if consumed < ITER_INDEX_RANGE_BATCH {
1933 self.finished = true;
1934 }
1935 drop(pager);
1936 self.buffer.extend(staged);
1937 if let Some(k) = last_full {
1938 self.last_full_key = Some(k);
1939 }
1940 Ok(())
1941 }
1942
1943 /// Process one B-tree step into the staged batch. Encapsulates
1944 /// the `Each`-dedup, the trailing-id-suffix strip, and the
1945 /// `Id::from_be_bytes` parse. Free helper so the refill body
1946 /// stays under the Rule-4 60-line ceiling.
1947 fn stage_one(
1948 &mut self,
1949 staged: &mut VecDeque<Result<StagedEntry<T>>>,
1950 last_full: &mut Option<Vec<u8>>,
1951 step: Result<(Vec<u8>, Vec<u8>)>,
1952 ) {
1953 let (full_key, id_bytes) = match step {
1954 Ok(kv) => kv,
1955 Err(e) => {
1956 staged.push_back(Err(e));
1957 return;
1958 }
1959 };
1960 *last_full = Some(full_key.clone());
1961 let Some(id) = Id::from_be_bytes(&id_bytes) else {
1962 staged.push_back(Err(Error::Corruption { page_id: 0 }));
1963 return;
1964 };
1965 if self.descriptor_kind == obj_core::IndexKind::Each && !self.emitted_ids.insert(id.get()) {
1966 // Same doc already emitted under a different element
1967 // key — skip without producing an output entry.
1968 return;
1969 }
1970 let user_key = strip_id_suffix(&full_key, self.descriptor_kind);
1971 staged.push_back(Ok(StagedEntry::Pending(user_key, id)));
1972 }
1973
1974 /// Compute the start bound for the next refill: use
1975 /// `initial_resume` on the first call (consuming it), thereafter
1976 /// use `Excluded(last_full_key)`.
1977 fn next_start_bound(&mut self) -> Bound<Vec<u8>> {
1978 if let Some(initial) = self.initial_resume.take() {
1979 return match initial {
1980 InitialResume::Included(k) => Bound::Included(k),
1981 InitialResume::Excluded(k) => Bound::Excluded(k),
1982 InitialResume::Unbounded => Bound::Unbounded,
1983 };
1984 }
1985 match &self.last_full_key {
1986 Some(k) => Bound::Excluded(k.clone()),
1987 None => Bound::Unbounded,
1988 }
1989 }
1990}
1991
1992impl<T: Document> Iterator for IterIndexRange<'_, T> {
1993 type Item = Result<(Vec<u8>, T)>;
1994
1995 fn next(&mut self) -> Option<Self::Item> {
1996 loop {
1997 if let Some(staged) = self.buffer.pop_front() {
1998 return Some(self.resolve_one(staged));
1999 }
2000 if self.finished {
2001 return None;
2002 }
2003 if let Err(e) = self.refill() {
2004 // Latch the iterator shut on a refill failure
2005 // (lock acquisition, B-tree open, etc.). Surface
2006 // the error once, then return None on subsequent
2007 // calls — power-of-ten Rule 7.
2008 self.finished = true;
2009 return Some(Err(e));
2010 }
2011 // refill ran; the loop will pop or notice finished.
2012 }
2013 }
2014}
2015
2016impl<T: Document> IterIndexRange<'_, T> {
2017 /// Resolve one staged entry into a `(user_key, T)` pair. For
2018 /// `Pending(_, id)` entries (the Read/Write streaming path),
2019 /// calls [`Collection::get`] to decode `T` on demand; for
2020 /// `Resolved(_, T)` entries (the Lazy-mode eager path), returns
2021 /// the already-decoded value. Orphan index entries (id missing
2022 /// in the primary tree) surface as [`Error::Corruption`],
2023 /// matching [`Collection::index_range`]'s existing contract.
2024 fn resolve_one(&self, staged: Result<StagedEntry<T>>) -> Result<(Vec<u8>, T)> {
2025 match staged? {
2026 StagedEntry::Pending(user_key, id) => match self.coll.get(id)? {
2027 Some(doc) => Ok((user_key, doc)),
2028 None => Err(Error::Corruption { page_id: 0 }),
2029 },
2030 StagedEntry::Resolved(user_key, doc) => Ok((user_key, doc)),
2031 }
2032 }
2033}
2034
/// Produce an owned copy of a borrowed `Bound<Vec<u8>>`.
///
/// `IterIndexRange::end_bound` stores its bound by value; each
/// resumption walk needs its own owned copy, which this helper
/// supplies. The variant is preserved and any key bytes are cloned.
fn clone_bound_ref(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    b.clone()
}