obj/collection.rs
1//! `Collection<T>` — typed handle to one collection's primary
2//! B-tree.
3
4use std::borrow::Cow;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::marker::PhantomData;
7use std::ops::Bound;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use obj_core::btree::BTree;
11use obj_core::codec::{decode, encode};
12use obj_core::pager::page::PageId;
13use obj_core::pager::Pager;
14use obj_core::platform::FileHandle;
15use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result};
16
17/// Boxed iterator alias used by [`Collection::lookup`] and
18/// [`Collection::index_range`]. The iterator borrows from the
19/// enclosing transaction; the `'tx` lifetime is bound on the
20/// `Collection<T>` it was obtained from.
21pub type IndexIter<'a, Item> = Box<dyn Iterator<Item = Result<Item>> + Send + 'a>;
22
/// Number of index B-tree entries [`IterIndexRange`] stages per
/// refill.
///
/// The iterator hands out one `(user_key, T)` pair per `next()` but
/// pulls index entries in fixed-size chunks, so the pager-lock
/// acquisition cost is spread over many `next()` calls. Power-of-ten
/// Rule 3: the staging buffer has a fixed size (roughly 4 KiB at
/// ~8 bytes/key plus the trailing 8-byte id suffix) and does NOT grow
/// with the total size of the range.
const ITER_INDEX_RANGE_BATCH: usize = 256;
31
/// Upper bound, per call, on the `HashSet<Id>` that
/// [`Collection::count_distinct_ids_in_range`] builds while counting
/// unique document `Id`s under an `Each` index.
///
/// Power-of-ten Rule 3: the distinct set is allocation-bounded. Going
/// over the cap yields [`obj_core::Error::DistinctCountExceeded`]
/// instead of consuming unbounded memory; callers can shrink the range
/// via `.index_range(...)` to stay within budget.
pub const MAX_DISTINCT_IDS: usize = 100_000;
40
41use crate::txn::{lock_catalog, ReadTxn, WriteTxn};
42
43/// #90 — per-transaction descriptor cache. Maps collection name to
44/// its LIVE [`CollectionDescriptor`] (with `next_id`, `primary_root`,
45/// and every index `root_page_id` advanced IN-MEMORY across the
46/// transaction). This is the single mid-txn source of truth for those
47/// roots; [`crate::WriteTxn::commit`] flushes each entry back to the
48/// catalog exactly once. Shared (via `Arc`) between the [`WriteTxn`]
49/// and every [`Collection`] handle opened on it, so two handles of the
50/// same collection observe the same advancing descriptor.
51pub(crate) type DescriptorCache = Arc<Mutex<HashMap<String, CollectionDescriptor>>>;
52
53/// Construct an empty [`DescriptorCache`].
54#[must_use]
55pub(crate) fn new_descriptor_cache() -> DescriptorCache {
56 Arc::new(Mutex::new(HashMap::new()))
57}
58
59/// Acquire the descriptor-cache mutex; map poison into `Busy` (Rule
60/// 7 — no panic on a poisoned lock).
61pub(crate) fn lock_descriptors(
62 cache: &Mutex<HashMap<String, CollectionDescriptor>>,
63) -> Result<MutexGuard<'_, HashMap<String, CollectionDescriptor>>> {
64 cache.lock().map_err(|_| Error::Busy {
65 kind: obj_core::LockKind::WriterInProcess,
66 })
67}
68
69/// Resolve the live cached descriptor for `name`, lazily loading it
70/// from the catalog B-tree on first touch in this transaction. After
71/// the first load the catalog tree is NEVER re-read mid-txn for this
72/// collection — every subsequent read/advance goes through the cache
73/// entry, so the unique pre-check and every index-tree open observe
74/// the in-memory-advanced roots (#90 load-bearing invariant).
75///
76/// Returns a mutable borrow into the cache so callers bump `next_id`,
77/// advance `primary_root`, and let `apply_doc_change` advance index
78/// roots in place — all without a per-doc `Catalog::update`.
79pub(crate) fn cached_descriptor_mut<'g>(
80 cache: &'g mut HashMap<String, CollectionDescriptor>,
81 pager: &mut Pager<FileHandle>,
82 catalog: &Catalog<FileHandle>,
83 name: &str,
84) -> Result<&'g mut CollectionDescriptor> {
85 if !cache.contains_key(name) {
86 let descriptor = catalog_get_required(pager, catalog, name)?;
87 cache.insert(name.to_owned(), descriptor);
88 }
89 cache.get_mut(name).ok_or(Error::Corruption { page_id: 0 })
90}
91
92/// Typed handle to a collection.
93///
94/// Construct via [`crate::WriteTxn::collection`] (lazy-create) or
95/// [`crate::ReadTxn::collection`] (read-only; errors if absent), or
96/// via [`crate::Db::collection`] for a one-shot read-only handle
97/// bound to a runtime collection name (M11 #94 — Phase 1B).
98///
99/// All methods take `&self` because the underlying state lives
100/// behind mutexes on the parent transaction; the handle itself is
101/// stateless beyond the descriptor it caches.
102pub struct Collection<'tx, T: Document> {
103 /// `Mode::Write` carries the [`WriteTxn`] reference;
104 /// `Mode::Read` the [`ReadTxn`] reference; `Mode::Lazy` carries
105 /// a `&Db` and opens a private read transaction on each method
106 /// call. The [`Collection`]'s lifetime is bound to whichever
107 /// it was constructed from.
108 mode: CollectionMode<'tx>,
109 /// Collection name resolved at construction. For handles built
110 /// via [`crate::WriteTxn::collection`] / [`crate::ReadTxn::collection`]
111 /// this equals `T::COLLECTION`. For handles built via
112 /// [`crate::Db::collection`] this is the caller-supplied runtime
113 /// name (which may differ from `T::COLLECTION`, e.g.
114 /// `"archive.orders"` against a type whose declared `COLLECTION`
115 /// is `"orders"`).
116 //
117 // `collection_name` rather than the lint-suggested `name`
118 // because `name` collides with `IndexDescriptor::name` /
119 // `IndexSpec::name` throughout this module's internals.
120 //
121 // `Cow<'static, str>` (#84): the common `WriteTxn`/`ReadTxn`
122 // open path resolves to `T::COLLECTION` — a `&'static str` —
123 // so it can be stored as `Cow::Borrowed` with NO `to_owned()`
124 // heap allocation per read. The runtime-name path
125 // (`Db::collection`, `open_readonly_named` with a caller name)
126 // stores `Cow::Owned`.
127 #[allow(clippy::struct_field_names)]
128 collection_name: Cow<'static, str>,
129 /// Cached descriptor. Populated at construction for `Write` /
130 /// `Read` mode; never updated in place — a `Collection<T>`
131 /// reflects the catalog row that existed when the handle was
132 /// opened. `update` / `delete` inside the same txn re-read the
133 /// descriptor through the catalog lock to capture mutations
134 /// from prior calls in the same txn (e.g. an `insert` that
135 /// advanced `next_id`).
136 ///
137 /// For `Lazy` mode the descriptor is a sentinel — the real
138 /// descriptor is loaded inside each method's private read
139 /// transaction.
140 descriptor: CollectionDescriptor,
141 _phantom: PhantomData<fn() -> T>,
142}
143
144/// Backing-reference inside a [`Collection`]. Encodes whether
145/// the txn is writable.
146enum CollectionMode<'tx> {
147 Write(WriteRef<'tx>),
148 Read(ReadRef<'tx>),
149 /// Read-only handle that opens a private read transaction on
150 /// each method call. Constructed by [`crate::Db::collection`];
151 /// the `'tx` lifetime is the borrow of `&Db`.
152 Lazy(LazyRef<'tx>),
153}
154
155struct LazyRef<'db> {
156 /// Borrowed `Db` — methods open one-shot read transactions on
157 /// it. The `'db` lifetime keeps the borrow alive for as long
158 /// as the [`Collection`] handle.
159 db: &'db crate::Db,
160}
161
162struct WriteRef<'tx> {
163 env: &'tx obj_core::TxnEnv<FileHandle>,
164 catalog: Arc<Mutex<Catalog<FileHandle>>>,
165 /// #90: shared per-txn descriptor cache (cloned from the owning
166 /// [`WriteTxn`]). The single mid-txn source of truth for
167 /// `next_id` / `primary_root` / index roots; flushed once at
168 /// commit.
169 descriptors: DescriptorCache,
170}
171
172struct ReadRef<'tx> {
173 snapshot: &'tx obj_core::ReaderSnapshot<FileHandle>,
174 env: &'tx obj_core::TxnEnv<FileHandle>,
175}
176
177impl<'tx, T: Document> Collection<'tx, T> {
178 /// Open the collection on the write side, lazy-creating the
179 /// catalog row + an empty primary B-tree on first call, and
180 /// reconciling the type's declared `Document::indexes()` against
181 /// the catalog's stored descriptors (M7 #57) on the first call
182 /// per process per collection.
183 pub(crate) fn open_or_create(tx: &'tx mut WriteTxn<'_>) -> Result<Self> {
184 // Stage 1: lazy-create the collection row + an empty primary
185 // B-tree if this is the first call for `T` against this
186 // database. The returned descriptor is the pre-reconcile
187 // shape; we re-read after reconciliation below because the
188 // reconciler may have appended index descriptors.
189 let _initial = ensure_collection::<T>(&tx.inner, &tx.catalog)?;
190 reconcile_indexes_once::<T>(
191 &tx.inner,
192 &tx.catalog,
193 &tx.reconciled,
194 &mut tx.reconciled_staged,
195 )?;
196 // Re-read the descriptor in case the reconciler mutated the
197 // index roster (the reconciler runs through `Catalog::update`
198 // which rewrites the descriptor's `indexes` vector).
199 let descriptor = reread_descriptor::<T>(&tx.inner, &tx.catalog)?;
200 Ok(Self {
201 mode: CollectionMode::Write(WriteRef {
202 env: tx.inner.env(),
203 catalog: Arc::clone(&tx.catalog),
204 descriptors: Arc::clone(&tx.descriptors),
205 }),
206 // `T::COLLECTION` is `&'static str` (#84): borrow, don't
207 // allocate.
208 collection_name: Cow::Borrowed(T::COLLECTION),
209 descriptor,
210 _phantom: PhantomData,
211 })
212 }
213
214 /// Open the collection on the read side. Errors if the
215 /// collection has not yet been registered AT THE SNAPSHOT'S
216 /// PINNED LSN — a collection created by a concurrent writer
217 /// AFTER the snapshot was pinned is invisible to this txn (M6
218 /// #53). The descriptor is read by walking the catalog B+tree
219 /// rooted at `snapshot.root_catalog()` (the value captured by
220 /// `Pager::reader_snapshot` at pin time) using the snapshot-
221 /// aware [`Catalog::lookup_via_snapshot`] free function — NOT
222 /// via the writer's live `Catalog.tree.root`, which a concurrent
223 /// writer may have COW-advanced past the snapshot.
224 ///
225 /// M11 #93: if `T::COLLECTION` is of the form
226 /// `<namespace>.<name>`, the lookup dispatches against the
227 /// attached database registered under `<namespace>` instead of
228 /// the calling Db. The attached snapshot is the one pinned by
229 /// `Db::read_transaction` at txn-begin; the attached env is
230 /// passed through the same way.
231 pub(crate) fn open_readonly(tx: &'tx ReadTxn<'_>) -> Result<Self> {
232 // `T::COLLECTION` is `&'static str` — store it as
233 // `Cow::Borrowed` (#84), avoiding the per-read `to_owned()`
234 // heap allocation on the common typed-handle path.
235 Self::open_readonly_named(tx, Cow::Borrowed(T::COLLECTION))
236 }
237
238 /// Open the collection on the read side against a caller-supplied
239 /// runtime `name`. Like [`Self::open_readonly`] but the catalog
240 /// lookup uses `name` instead of `T::COLLECTION` — required for
241 /// [`crate::Db::collection`]'s Phase 1B accessor (M11 #94), which
242 /// binds a runtime collection name (e.g. `"archive.orders"`) to
243 /// the typed handle.
244 ///
245 /// Namespace dispatch (`<ns>.<tail>` → attached database) follows
246 /// the same shape as [`Self::open_readonly`].
247 pub(crate) fn open_readonly_named(
248 tx: &'tx ReadTxn<'_>,
249 name: Cow<'static, str>,
250 ) -> Result<Self> {
251 let (namespace, tail) = crate::db::split_namespace(&name);
252 let (env, snapshot, lookup_name): (
253 &'tx obj_core::TxnEnv<FileHandle>,
254 &'tx obj_core::ReaderSnapshot<FileHandle>,
255 &str,
256 ) = match namespace {
257 None => (tx.inner.env(), tx.inner.snapshot(), &name),
258 Some(ns) => {
259 let ctx = tx
260 .attached
261 .get(ns)
262 .ok_or_else(|| Error::CollectionNamespaceUnknown {
263 namespace: ns.to_owned(),
264 })?;
265 (ctx.env.as_ref(), &ctx.snapshot, tail)
266 }
267 };
268 let Some(descriptor) = read_descriptor_via_snapshot_named(env, snapshot, lookup_name)?
269 else {
270 return Err(Error::CollectionNotFound {
271 name: name.into_owned(),
272 });
273 };
274 Ok(Self {
275 mode: CollectionMode::Read(ReadRef { snapshot, env }),
276 // `name` is moved in unchanged — `Cow::Borrowed(&'static)`
277 // for the typed-handle path (no alloc), `Cow::Owned` for
278 // runtime-name handles.
279 collection_name: name,
280 descriptor,
281 _phantom: PhantomData,
282 })
283 }
284
285 /// Construct a deferred-lookup, read-only handle bound to a
286 /// runtime collection name. Each method call opens a private
287 /// read transaction on `db` and dispatches through
288 /// [`Self::open_readonly_named`]. Construction is infallible —
289 /// errors (missing collection, unknown namespace, etc.) surface
290 /// at the first method call.
291 ///
292 /// Used by [`crate::Db::collection`] (Phase 1B, M11 #94).
293 pub(crate) fn lazy(db: &'tx crate::Db, name: String) -> Self {
294 Self {
295 mode: CollectionMode::Lazy(LazyRef { db }),
296 collection_name: Cow::Owned(name),
297 // Sentinel descriptor — never read in Lazy mode; the
298 // real descriptor is loaded inside each method's
299 // private read transaction.
300 descriptor: CollectionDescriptor::new(0, 0, 0),
301 _phantom: PhantomData,
302 }
303 }
304
305 /// Cached descriptor (`collection_id`, `primary_root`,
306 /// `type_version`, `next_id` at handle-open time).
307 #[must_use]
308 pub fn descriptor(&self) -> &CollectionDescriptor {
309 &self.descriptor
310 }
311
312 /// #90: the LIVE primary-tree root for a `Write`-mode handle.
313 ///
314 /// Prefers the per-txn descriptor cache (which carries every
315 /// `primary_root` advance from this txn's prior writes) so a
316 /// read-after-write on the same handle inside one transaction
317 /// observes its own uncommitted inserts. Falls back to the
318 /// handle's open-time `primary_root` if this collection has not
319 /// yet been written in the txn (no cache entry).
320 fn write_primary_root(&self, write: &WriteRef<'tx>) -> Result<u64> {
321 let cache = lock_descriptors(&write.descriptors)?;
322 Ok(cache
323 .get(self.collection_name.as_ref())
324 .map_or(self.descriptor.primary_root, |d| d.primary_root))
325 }
326
327 /// #90: the LIVE `root_page_id` for the named `Active` index on a
328 /// `Write`-mode handle. Prefers the per-txn cache so a read after
329 /// an index-mutating write in the same txn descends the advanced
330 /// root; falls back to `fallback` (the handle's open-time
331 /// descriptor entry) when the collection has no cache entry yet.
332 fn write_index_root(
333 &self,
334 write: &WriteRef<'tx>,
335 index_name: &str,
336 fallback: u64,
337 ) -> Result<u64> {
338 let cache = lock_descriptors(&write.descriptors)?;
339 let Some(descriptor) = cache.get(self.collection_name.as_ref()) else {
340 return Ok(fallback);
341 };
342 let live = descriptor
343 .indexes
344 .iter()
345 .find(|d| d.name == index_name && d.status == obj_core::IndexStatus::Active)
346 .map_or(fallback, |d| d.root_page_id);
347 Ok(live)
348 }
349
350 /// Insert `doc`. Returns the freshly-allocated [`Id`].
351 ///
352 /// # Errors
353 ///
354 /// - [`Error::ReadOnly`] if the handle is read-only.
355 /// - Pager / catalog / codec errors propagated.
356 // `doc: T` is taken by value for caller ergonomics (the user
357 // owns the value and gives it to the database). The codec's
358 // `encode(&T, ...)` takes a reference, so clippy sees the
359 // function body as "owned but borrows only" — accepted as the
360 // intended public-API shape.
361 #[allow(clippy::needless_pass_by_value)]
362 pub fn insert(&self, doc: T) -> Result<Id> {
363 let write = self.write_or_err("insert")?;
364 let name: &str = self.collection_name.as_ref();
365 let mut pager = lock_pager(write.env)?;
366 let catalog = lock_catalog(&write.catalog)?;
367 // #90: the cached descriptor is the SOLE mid-txn source of
368 // truth. `next_id` is an in-memory bump; `primary_root` and
369 // index roots advance in the cache; NO per-doc
370 // `Catalog::update` — the single flush is deferred to commit.
371 let mut cache = lock_descriptors(&write.descriptors)?;
372 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
373 let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
374 let bytes = encode(&doc, descriptor.collection_id)?;
375 let key = id.to_be_bytes();
376 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
377 tree.insert(&mut pager, &key, &bytes)?;
378 descriptor.primary_root = tree.root().get();
379 // Index maintenance lands inside the same WAL transaction as
380 // the primary write, descending the cache's in-memory index
381 // roots. A `UniqueConstraintViolation` here surfaces via `?`
382 // and the surrounding `WriteTxn` rolls back atomically.
383 crate::index_maint::apply_doc_change::<T>(&mut pager, descriptor, None, Some(&doc), id)?;
384 Ok(id)
385 }
386
387 /// Fetch the document at `id`.
388 ///
389 /// On the write side this consults the pager (sees pending
390 /// writes in the current txn). On the read side it consults
391 /// the snapshot's frozen view.
392 ///
393 /// # Lazy migration (M10 #84)
394 ///
395 /// If the on-disk record was written by an older
396 /// `Document::VERSION` than the current `T::VERSION`, the codec
397 /// walks the stored bytes through the schema registered for
398 /// that version (see `T::historical_schemas()`) and dispatches
399 /// the resulting structured `Dynamic` through `T::migrate`.
400 /// **The migrated bytes are NOT written back to disk.** The
401 /// next [`Collection::get`] re-reads the same v(n) bytes and
402 /// re-runs migration. Only a subsequent
403 /// [`Collection::update`](Self::update) /
404 /// [`Collection::upsert`](Self::upsert) writes the document
405 /// back, at which point the on-disk header records
406 /// `T::VERSION`.
407 ///
408 /// This contract is what allows mixed-version reads to scale:
409 /// a 10⁹-doc collection does not need to be batch-rewritten on
410 /// schema upgrade. Power-of-ten Rule 7: every "migration ran"
411 /// path returns the migrated `T`; no implicit write-back.
412 ///
413 /// # Errors
414 ///
415 /// Pager / B-tree / codec errors propagated. In particular:
416 ///
417 /// - [`Error::SchemaNotRegistered`](obj_core::Error::SchemaNotRegistered)
418 /// if the stored record carries a `type_version` for which
419 /// `T::historical_schemas()` has no entry.
420 /// - [`Error::SchemaMigrationNotImplemented`](obj_core::Error::SchemaMigrationNotImplemented)
421 /// if the registered `T::migrate` returns the default error.
422 pub fn get(&self, id: Id) -> Result<Option<T>> {
423 if let Some(r) = self.dispatch_lazy(|c| c.get(id)) {
424 return r;
425 }
426 let key = id.to_be_bytes();
427 let Some(bytes) = self.read_or_write_env_for_get(&key)? else {
428 return Ok(None);
429 };
430 Ok(Some(decode::<T>(&bytes, self.descriptor.collection_id)?))
431 }
432
433 /// Read the primary-tree value at `key` against the current
434 /// `Write` / `Read` mode (Lazy mode is dispatched upstream of
435 /// every public caller via [`Self::dispatch_lazy`]).
436 fn read_or_write_env_for_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
437 match &self.mode {
438 CollectionMode::Write(write) => {
439 // #90: descend the LIVE primary root so a get after an
440 // insert/update in the SAME txn sees its own writes.
441 let primary_root = self.write_primary_root(write)?;
442 let mut pager = lock_pager(write.env)?;
443 let tree = btree_handle(&pager, primary_root)?;
444 tree.get(&mut pager, key)
445 }
446 CollectionMode::Read(read) => {
447 snapshot_get_via_btree(read.snapshot, read.env, self.descriptor.primary_root, key)
448 }
449 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
450 operation: "internal: lazy-mode primary read",
451 }),
452 }
453 }
454
455 /// Apply `f` to the document at `id`, writing the mutated value
456 /// back.
457 ///
458 /// # Errors
459 ///
460 /// - [`Error::ReadOnly`] on a read-side handle.
461 /// - [`Error::DocumentNotFound`] if `id` is absent.
462 /// - Pager / catalog / codec errors propagated.
463 pub fn update<F>(&self, id: Id, f: F) -> Result<()>
464 where
465 F: FnOnce(&mut T),
466 {
467 let write = self.write_or_err("update")?;
468 let name: &str = self.collection_name.as_ref();
469 let mut pager = lock_pager(write.env)?;
470 let catalog = lock_catalog(&write.catalog)?;
471 // #90: resolve the live cached descriptor (sole mid-txn source
472 // of truth); advance its roots in place, no per-doc flush.
473 let mut cache = lock_descriptors(&write.descriptors)?;
474 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
475 let key = id.to_be_bytes();
476 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
477 let existing = tree.get(&mut pager, &key)?.ok_or(Error::DocumentNotFound {
478 collection: T::COLLECTION,
479 id: id.get(),
480 })?;
481 let old_value = decode::<T>(&existing, descriptor.collection_id)?;
482 let mut new_value = decode::<T>(&existing, descriptor.collection_id)?;
483 f(&mut new_value);
484 let bytes = encode(&new_value, descriptor.collection_id)?;
485 tree.delete(&mut pager, &key)?;
486 tree.insert(&mut pager, &key, &bytes)?;
487 descriptor.primary_root = tree.root().get();
488 crate::index_maint::apply_doc_change::<T>(
489 &mut pager,
490 descriptor,
491 Some(&old_value),
492 Some(&new_value),
493 id,
494 )?;
495 Ok(())
496 }
497
498 /// Delete the document at `id`. Returns `true` if it existed.
499 ///
500 /// # Errors
501 ///
502 /// - [`Error::ReadOnly`] on a read-side handle.
503 /// - Pager / catalog errors propagated.
504 pub fn delete(&self, id: Id) -> Result<bool> {
505 let write = self.write_or_err("delete")?;
506 let name: &str = self.collection_name.as_ref();
507 let mut pager = lock_pager(write.env)?;
508 let catalog = lock_catalog(&write.catalog)?;
509 // #90: cached descriptor is the sole mid-txn source of truth.
510 let mut cache = lock_descriptors(&write.descriptors)?;
511 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
512 let key = id.to_be_bytes();
513 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
514 // Materialise the old document so the index-maintenance
515 // path can compute the OLD key set and emit per-index
516 // delete entries. If the row does not exist, we still
517 // return `Ok(false)` for API back-compat (no index work).
518 let old_value = match tree.get(&mut pager, &key)? {
519 Some(bytes) => Some(decode::<T>(&bytes, descriptor.collection_id)?),
520 None => None,
521 };
522 let removed = tree.delete(&mut pager, &key)?;
523 descriptor.primary_root = tree.root().get();
524 crate::index_maint::apply_doc_change::<T>(
525 &mut pager,
526 descriptor,
527 old_value.as_ref(),
528 None,
529 id,
530 )?;
531 Ok(removed)
532 }
533
534 /// Insert-or-replace `doc` at `id`.
535 ///
536 /// # Errors
537 ///
538 /// - [`Error::ReadOnly`] on a read-side handle.
539 /// - Pager / catalog / codec errors propagated.
540 // `doc: T` passed by value for ergonomic ownership transfer;
541 // codec takes `&T` so clippy reports a "borrow only" body.
542 #[allow(clippy::needless_pass_by_value)]
543 pub fn upsert(&self, id: Id, doc: T) -> Result<()> {
544 let write = self.write_or_err("upsert")?;
545 let name: &str = self.collection_name.as_ref();
546 let mut pager = lock_pager(write.env)?;
547 let catalog = lock_catalog(&write.catalog)?;
548 // #90: cached descriptor is the sole mid-txn source of truth.
549 let mut cache = lock_descriptors(&write.descriptors)?;
550 let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
551 let bytes = encode(&doc, descriptor.collection_id)?;
552 let key = id.to_be_bytes();
553 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
554 // Materialise the prior value (if any) so the index-
555 // maintenance diff sees the OLD key set.
556 let old_value = match tree.get(&mut pager, &key)? {
557 Some(prior) => Some(decode::<T>(&prior, descriptor.collection_id)?),
558 None => None,
559 };
560 let _ = tree.delete(&mut pager, &key)?;
561 tree.insert(&mut pager, &key, &bytes)?;
562 descriptor.primary_root = tree.root().get();
563 crate::index_maint::apply_doc_change::<T>(
564 &mut pager,
565 descriptor,
566 old_value.as_ref(),
567 Some(&doc),
568 id,
569 )?;
570 Ok(())
571 }
572
573 /// Look up the single document whose `index_name` key matches
574 /// `key` under a `Unique` index.
575 ///
576 /// Errors with [`Error::IndexNotUnique`] if `index_name` resolves
577 /// to a non-unique index — `find_unique` is *only* defined on
578 /// `Unique` indexes. For `Standard` / `Each` / `Composite` use
579 /// [`Self::lookup`] (which returns an iterator).
580 ///
581 /// Snapshot-aware: on a write-side handle the lookup sees the
582 /// current txn's pending writes; on a read-side handle it sees
583 /// the snapshot's frozen view.
584 ///
585 /// # Errors
586 ///
587 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
588 /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
589 /// - Pager / B-tree / codec errors propagated.
590 pub fn find_unique(
591 &self,
592 index_name: &str,
593 key: impl Into<obj_core::codec::Dynamic>,
594 ) -> Result<Option<T>> {
595 let key_dyn = key.into();
596 if let Some(r) = self.dispatch_lazy(|c| c.find_unique(index_name, key_dyn.clone())) {
597 return r;
598 }
599 let descriptor = self.active_index(index_name)?;
600 if descriptor.kind != obj_core::IndexKind::Unique {
601 return Err(Error::IndexNotUnique {
602 collection: self.collection_name.clone().into_owned(),
603 name: index_name.to_owned(),
604 });
605 }
606 let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
607 let id_bytes = self.index_get(descriptor, encoded.as_bytes())?;
608 match id_bytes {
609 Some(bytes) => match Id::from_be_bytes(&bytes) {
610 Some(id) => self.get(id),
611 None => Err(Error::Corruption { page_id: 0 }),
612 },
613 None => Ok(None),
614 }
615 }
616
617 /// Yield every document whose `index_name` key matches `key`.
618 /// Works on `Standard` / `Unique` / `Each` indexes. Returns
619 /// `Err(Error::IndexKindMismatch)`-style guidance for
620 /// `Composite` (use [`Self::index_range`] for tuple-shaped
621 /// keys).
622 ///
623 /// The same document is yielded at most once even if it owns
624 /// multiple matching entries — `Each` indexes can encode the
625 /// same `id` under multiple element keys; we de-dup on emit.
626 ///
627 /// # Errors
628 ///
629 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
630 /// - Pager / B-tree / codec errors propagated.
631 pub fn lookup(
632 &self,
633 index_name: &str,
634 key: impl Into<obj_core::codec::Dynamic>,
635 ) -> Result<IndexIter<'static, T>>
636 where
637 T: Send + 'static,
638 {
639 let key_dyn = key.into();
640 if let Some(r) = self.dispatch_lazy(|c| c.lookup(index_name, key_dyn.clone())) {
641 return r;
642 }
643 let descriptor = self.active_index(index_name)?;
644 let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
645 let ids = match descriptor.kind {
646 obj_core::IndexKind::Unique => self.collect_unique(descriptor, encoded.as_bytes())?,
647 obj_core::IndexKind::Standard
648 | obj_core::IndexKind::Each
649 | obj_core::IndexKind::Composite => {
650 self.collect_nonunique_equal(descriptor, encoded.as_bytes())?
651 }
652 // `IndexKind` is `#[non_exhaustive]`; an unknown kind means
653 // this build predates the on-disk index. Refuse rather than
654 // return a wrong (possibly empty) id set.
655 _ => return Err(Error::InvalidArgument("unsupported index kind")),
656 };
657 let resolved = self.resolve_unique_ids(ids)?;
658 Ok(Box::new(resolved.into_iter().map(Ok)))
659 }
660
661 /// Yield `(user_key, doc)` pairs whose index key falls within
662 /// `range`. The bounds are [`Dynamic`](obj_core::codec::Dynamic)
663 /// values — the same ergonomic type [`crate::Query::index_range`]
664 /// takes — encoded internally through the order-preserving field
665 /// encoder ([`obj_core::index::encode_field`]); callers no longer
666 /// hand-encode index-key bytes.
667 ///
668 /// For non-Unique kinds (`Standard` / `Each` / `Composite`) the
669 /// bounds are widened internally so a user-facing
670 /// `Included(x)..=Included(x)` range matches every entry whose
671 /// user-key equals `x` even though the underlying B-tree key
672 /// carries an `id_be8` suffix (see `docs/format.md` § Index key
673 /// encoding § Range-bound widening (non-Unique kinds)).
674 ///
675 /// # Errors
676 ///
677 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
678 /// - [`obj_core::Error::Codec`] if a `Dynamic::String` bound
679 /// carries an embedded NUL byte (the order-preserving encoder
680 /// rejects those).
681 /// - Pager / B-tree / codec errors propagated.
682 pub fn index_range<R>(
683 &self,
684 index_name: &str,
685 range: R,
686 ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
687 where
688 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
689 T: Send + 'static,
690 {
691 let start = encode_dynamic_bound(range.start_bound())?;
692 let end = encode_dynamic_bound(range.end_bound())?;
693 self.index_range_encoded(index_name, start, end)
694 }
695
696 /// Encoded-bytes variant of [`Self::index_range`]. The bounds are
697 /// already the order-preserving field encoding of the user's
698 /// `Dynamic` value(s); this keeps the signature general for
699 /// `Composite` "starts-with" scans and is the entry point the
700 /// query layer / lazy-dispatch recursion call after they have
701 /// done their own encoding.
702 pub(crate) fn index_range_encoded(
703 &self,
704 index_name: &str,
705 start_bound: std::ops::Bound<Vec<u8>>,
706 end_bound: std::ops::Bound<Vec<u8>>,
707 ) -> Result<IndexIter<'static, (Vec<u8>, T)>>
708 where
709 T: Send + 'static,
710 {
711 if let Some(r) = self.dispatch_lazy(|c| {
712 c.index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
713 }) {
714 return r;
715 }
716 let descriptor = self.active_index(index_name)?;
717 let (start, end) =
718 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
719 let entries = self.collect_range(descriptor, start, end)?;
720 let descriptor_kind = descriptor.kind;
721 let mut out: Vec<Result<(Vec<u8>, T)>> = Vec::with_capacity(entries.len());
722 let mut emitted_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
723 for (full_key, id_bytes_value) in entries {
724 let Some(id) = Id::from_be_bytes(&id_bytes_value) else {
725 out.push(Err(Error::Corruption { page_id: 0 }));
726 continue;
727 };
728 // For Each indexes the same doc may appear multiple
729 // times under different element keys — de-dup on
730 // emission.
731 if descriptor_kind == obj_core::IndexKind::Each && !emitted_ids.insert(id.get()) {
732 continue;
733 }
734 // For non-unique kinds the B-tree key includes the
735 // trailing 8-byte id suffix; strip it so the caller
736 // sees only the user-key bytes.
737 let user_key = strip_id_suffix(&full_key, descriptor_kind);
738 match self.get(id) {
739 Ok(Some(doc)) => out.push(Ok((user_key, doc))),
740 Ok(None) => {
741 // Orphan index entry — surface as Corruption.
742 out.push(Err(Error::Corruption { page_id: 0 }));
743 }
744 Err(e) => out.push(Err(e)),
745 }
746 }
747 Ok(Box::new(out.into_iter()))
748 }
749
750 /// Streaming variant of [`Self::index_range`] (Phase 7A perf
751 /// pass, M14 #14). Yields `(user_key, T)` pairs lazily — the
752 /// returned [`IterIndexRange`] decodes one `T` per `next()` call
753 /// rather than building a `Vec<Result<(_, T)>>` of every match
754 /// up front. The iterator borrows `&'a self`, so it must be
755 /// consumed inside the lifetime of the enclosing
756 /// [`crate::WriteTxn`] / [`crate::ReadTxn`] (or the
757 /// [`crate::Db::collection`] handle, in Lazy mode).
758 ///
759 /// # When to prefer `iter_range` over `index_range`
760 ///
761 /// - **Memory.** `index_range` allocates `O(matches × sizeof(T))`
762 /// upfront; `iter_range` keeps a fixed-size [`VecDeque`] of
763 /// `(key, id)` pairs (`ITER_INDEX_RANGE_BATCH = 256` entries)
764 /// and decodes one `T` at a time. For a 100k-row range with
765 /// ~500-byte documents that's ~50 MB peak vs. a few KiB.
766 /// - **Latency-to-first-row.** `index_range` decodes every
767 /// matching document before returning the iterator;
768 /// `iter_range` returns immediately after the first chunk
769 /// refill, so the first `next()` returns after one index walk
770 /// + one primary-tree `get` (rather than `N`).
771 ///
772 /// # When `index_range` is still the right answer
773 ///
774 /// `index_range` returns an `IndexIter<'static, _>` — it can
775 /// escape the `read_transaction` / `transaction` closure that
776 /// produced it. `iter_range` is bound to `&self`, so the
777 /// iterator dies when the [`Collection`] handle dies. If you
778 /// need to return the iterator to outer scope, stick with
779 /// `index_range`.
780 ///
781 /// # Per-row `get`-back design choice
782 ///
783 /// Each `next()` yields `(user_key, T)` by calling
784 /// [`Self::get`] under the hood — i.e. a SECOND B+tree descent
785 /// per row (the first is the index range walk; the second is
786 /// the primary-tree `get(id)`). This is intentional and
787 /// inherited from `index_range`: the index leaf stores only
788 /// the document `id` (8 bytes), not the document bytes. A
789 /// future format-minor bump may add value-in-index storage to
790 /// short-circuit the second descent; that work is pinned to
791 /// post-1.0 (tracked as pit issue #16, "value-in-index
792 /// storage to eliminate `index_range` double-decode").
793 ///
794 /// # Errors
795 ///
796 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
797 /// - Pager / B-tree / codec errors propagated at construction
798 /// and from each `next()` call.
799 pub fn iter_range<'a, R>(&'a self, index_name: &str, range: R) -> Result<IterIndexRange<'a, T>>
800 where
801 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
802 T: Send + 'static,
803 {
804 let start_bound = encode_dynamic_bound(range.start_bound())?;
805 let end_bound = encode_dynamic_bound(range.end_bound())?;
806 self.iter_range_encoded(index_name, start_bound, end_bound)
807 }
808
809 /// Encoded-bytes variant of [`Self::iter_range`]. Bounds are the
810 /// order-preserving field encoding of the user's `Dynamic`
811 /// value(s); used internally by the lazy-mode fallback path.
812 fn iter_range_encoded<'a>(
813 &'a self,
814 index_name: &str,
815 start_bound: Bound<Vec<u8>>,
816 end_bound: Bound<Vec<u8>>,
817 ) -> Result<IterIndexRange<'a, T>>
818 where
819 T: Send + 'static,
820 {
821 // Lazy mode falls back to `index_range`'s eager materialization
822 // path so the index walk + per-row `get` share a single
823 // snapshot. Streaming refills can't preserve that — each
824 // refill would open a fresh txn and observe a different
825 // snapshot. Lazy callers who want true streaming should open
826 // an explicit `Db::read_transaction` and call `iter_range`
827 // on the bound collection there.
828 if matches!(self.mode, CollectionMode::Lazy(_)) {
829 return self.iter_range_lazy_fallback(index_name, start_bound, end_bound);
830 }
831 let descriptor = self.active_index(index_name)?;
832 // #90: in Write mode the iterator must walk the LIVE index
833 // root (the per-txn cache's advanced value) so a streaming
834 // scan opened after an in-txn index write sees its own
835 // entries. In Read mode there is no write cache, so the
836 // open-time descriptor root is authoritative.
837 let index_root = match &self.mode {
838 CollectionMode::Write(w) => {
839 self.write_index_root(w, index_name, descriptor.root_page_id)?
840 }
841 _ => descriptor.root_page_id,
842 };
843 let (start, end) =
844 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
845 // Stash the resumption marker as `Excluded(start)` only on
846 // the first refill; afterwards the iterator overwrites it
847 // with the last full_key it emitted. The initial `start`
848 // bound is honoured by `refill` via the same `last_full_key`
849 // → `Excluded(_)` translation.
850 let initial_resume = match start {
851 Bound::Included(k) => InitialResume::Included(k),
852 Bound::Excluded(k) => InitialResume::Excluded(k),
853 Bound::Unbounded => InitialResume::Unbounded,
854 };
855 Ok(IterIndexRange {
856 coll: self,
857 descriptor_kind: descriptor.kind,
858 index_root,
859 initial_resume: Some(initial_resume),
860 last_full_key: None,
861 end_bound: end,
862 buffer: VecDeque::new(),
863 emitted_ids: HashSet::new(),
864 finished: false,
865 })
866 }
867
868 /// Lazy-mode fallback for [`Self::iter_range`]: delegates to
869 /// [`Self::index_range`] (which itself dispatches through a fresh
870 /// read txn) and rehouses the eagerly-materialised entries into
871 /// the streaming iterator's buffer as
872 /// [`StagedEntry::Resolved`]. Power-of-ten Rule 4: keeping this
873 /// isolated so the streaming path's `iter_range` body stays
874 /// small.
875 fn iter_range_lazy_fallback<'a>(
876 &'a self,
877 index_name: &str,
878 start_bound: Bound<Vec<u8>>,
879 end_bound: Bound<Vec<u8>>,
880 ) -> Result<IterIndexRange<'a, T>>
881 where
882 T: Send + 'static,
883 {
884 let materialized = self.index_range_encoded(index_name, start_bound, end_bound)?;
885 let mut buffer: VecDeque<Result<StagedEntry<T>>> = VecDeque::new();
886 for item in materialized {
887 match item {
888 Ok((key, doc)) => buffer.push_back(Ok(StagedEntry::Resolved(key, doc))),
889 Err(e) => buffer.push_back(Err(e)),
890 }
891 }
892 Ok(IterIndexRange {
893 coll: self,
894 descriptor_kind: obj_core::IndexKind::Standard,
895 index_root: 0,
896 initial_resume: None,
897 last_full_key: None,
898 end_bound: Bound::Unbounded,
899 buffer,
900 emitted_ids: HashSet::new(),
901 finished: true,
902 })
903 }
904
905 /// Look up the `IndexKind` of an active index by name. Used by
906 /// the M8 query layer to dispatch `Query::count` between the
907 /// entry-count and distinct-id-count paths (M8 follow-up #72).
908 ///
909 /// # Errors
910 ///
911 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
912 pub(crate) fn index_kind(&self, index_name: &str) -> Result<obj_core::IndexKind> {
913 Ok(self.active_index(index_name)?.kind)
914 }
915
916 /// Resolve `index_name` to an `Active` `IndexDescriptor` on the
917 /// collection. Errors with [`Error::IndexNotFound`] if absent
918 /// or `DroppedPending`.
919 ///
920 /// Returns a borrow into `self.descriptor.indexes` — no per-lookup
921 /// clone (#84). Every caller uses the descriptor only within the
922 /// enclosing `&self` borrow; `iter_range_encoded` copies the two
923 /// `Copy` fields it needs (`kind`, `root_page_id`) into the
924 /// returned iterator rather than holding the borrow.
925 fn active_index(&self, index_name: &str) -> Result<&obj_core::IndexDescriptor> {
926 let entry = self
927 .descriptor
928 .indexes
929 .iter()
930 .find(|d| d.name == index_name);
931 match entry {
932 Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d),
933 _ => Err(Error::IndexNotFound {
934 collection: self.collection_name.clone().into_owned(),
935 name: index_name.to_owned(),
936 }),
937 }
938 }
939
940 /// Single-key `get` on an index B-tree. Used by `find_unique`
941 /// and by the Unique-kind branch of `lookup`.
942 fn index_get(
943 &self,
944 descriptor: &obj_core::IndexDescriptor,
945 key: &[u8],
946 ) -> Result<Option<Vec<u8>>> {
947 match &self.mode {
948 CollectionMode::Write(write) => {
949 // #90: descend the LIVE index root from the per-txn
950 // cache so an index read after an index-mutating write
951 // in the SAME txn observes its own entries.
952 let root_raw =
953 self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
954 let root = PageId::new(root_raw)
955 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
956 let mut pager = lock_pager(write.env)?;
957 let tree = BTree::<FileHandle>::open(&pager, root)?;
958 tree.get(&mut pager, key)
959 }
960 CollectionMode::Read(read) => {
961 let root = PageId::new(descriptor.root_page_id)
962 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
963 let pager = lock_pager(read.env)?;
964 BTree::<FileHandle>::get_via_snapshot(&pager, read.snapshot, root, key)
965 }
966 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
967 operation: "internal: lazy-mode index_get",
968 }),
969 }
970 }
971
972 /// Collect every `(full_key, value)` entry from an index B-tree
973 /// whose key starts with `prefix`. For unique kinds the prefix
974 /// is the entire key (one match max); for non-unique kinds we
975 /// match every key whose first `prefix.len()` bytes equal
976 /// `prefix` (the trailing `id_suffix` varies per doc).
977 fn collect_nonunique_equal(
978 &self,
979 descriptor: &obj_core::IndexDescriptor,
980 prefix: &[u8],
981 ) -> Result<Vec<u64>> {
982 let entries = self.collect_range(
983 descriptor,
984 std::ops::Bound::Included(prefix.to_vec()),
985 // Upper bound is `prefix || 0xFF..` — every key whose
986 // user-portion equals `prefix` falls in
987 // `[prefix, prefix || u64::MAX]`.
988 std::ops::Bound::Included(append_max_id(prefix)),
989 )?;
990 let mut ids = Vec::with_capacity(entries.len());
991 let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
992 for (_full_key, value) in entries {
993 let id = Id::from_be_bytes(&value).ok_or(Error::Corruption { page_id: 0 })?;
994 if emitted.insert(id.get()) {
995 ids.push(id.get());
996 }
997 }
998 Ok(ids)
999 }
1000
1001 /// Collect a single id from a Unique index B-tree at `key`.
1002 fn collect_unique(
1003 &self,
1004 descriptor: &obj_core::IndexDescriptor,
1005 key: &[u8],
1006 ) -> Result<Vec<u64>> {
1007 match self.index_get(descriptor, key)? {
1008 Some(bytes) => Id::from_be_bytes(&bytes)
1009 .map(|id| vec![id.get()])
1010 .ok_or(Error::Corruption { page_id: 0 }),
1011 None => Ok(Vec::new()),
1012 }
1013 }
1014
1015 /// Collect every `(full_key, value)` entry from an index B-tree
1016 /// whose key falls within `(start, end)`.
1017 fn collect_range(
1018 &self,
1019 descriptor: &obj_core::IndexDescriptor,
1020 start: std::ops::Bound<Vec<u8>>,
1021 end: std::ops::Bound<Vec<u8>>,
1022 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1023 // Read mode pins every page read to the txn's snapshot (M12
1024 // #12): a concurrent writer's post-snapshot index entries must
1025 // not leak into a read txn's range/count. Write mode keeps the
1026 // live-pager scan so the txn observes its own uncommitted
1027 // index writes (it has no snapshot to pin against).
1028 match &self.mode {
1029 CollectionMode::Read(r) => {
1030 let root = PageId::new(descriptor.root_page_id)
1031 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1032 let pager = lock_pager(r.env)?;
1033 let iter = BTree::<FileHandle>::range_via_snapshot(
1034 &pager,
1035 r.snapshot,
1036 root,
1037 (start, end),
1038 )?;
1039 let mut out = Vec::new();
1040 for step in iter {
1041 out.push(step?);
1042 }
1043 Ok(out)
1044 }
1045 CollectionMode::Write(w) => {
1046 // #90: scan the LIVE index root from the per-txn cache
1047 // so an in-txn index scan sees its own uncommitted
1048 // entries (the open-time descriptor root may be stale).
1049 let root_raw =
1050 self.write_index_root(w, &descriptor.name, descriptor.root_page_id)?;
1051 let root = PageId::new(root_raw)
1052 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1053 let mut pager = lock_pager(w.env)?;
1054 let tree = BTree::<FileHandle>::open(&pager, root)?;
1055 let iter = tree.range(&mut pager, (start, end))?;
1056 let mut out = Vec::new();
1057 for step in iter {
1058 out.push(step?);
1059 }
1060 Ok(out)
1061 }
1062 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1063 operation: "internal: lazy-mode collect_range",
1064 }),
1065 }
1066 }
1067
1068 /// Resolve a `Vec<u64>` of `Id` integer values into concrete
1069 /// `T` documents via `self.get`. Preserves order; missing rows
1070 /// surface as `Error::Corruption` (orphan index entry).
1071 fn resolve_unique_ids(&self, ids: Vec<u64>) -> Result<Vec<T>> {
1072 let mut out = Vec::with_capacity(ids.len());
1073 for raw in ids {
1074 let id =
1075 Id::from_be_bytes(&raw.to_be_bytes()).ok_or(Error::Corruption { page_id: 0 })?;
1076 let doc = self.get(id)?.ok_or(Error::Corruption { page_id: 0 })?;
1077 out.push(doc);
1078 }
1079 Ok(out)
1080 }
1081
1082 /// Count every entry in the primary tree WITHOUT decoding the
1083 /// documents. Used by the M8 [`crate::Query::count`] no-decode
1084 /// fast path; the iterator visits leaf pages and counts entries
1085 /// rather than running each through postcard.
1086 ///
1087 /// Power-of-ten Rule 2: bounded by the B+tree's `MAX_RANGE_NODES`
1088 /// budget (inherited from `BTree::range`).
1089 ///
1090 /// # Errors
1091 ///
1092 /// Pager / B-tree errors propagated.
1093 pub fn count_all(&self) -> Result<u64> {
1094 // Closure (rather than `Collection::count_all`) so the
1095 // higher-ranked lifetime on `dispatch_lazy`'s
1096 // `for<'a> FnOnce(&Collection<'a, T>) -> _` bound is
1097 // satisfied — `Collection::count_all` resolves to a single
1098 // lifetime fn-item type, which fails the HRTB check.
1099 #[allow(clippy::redundant_closure_for_method_calls)]
1100 if let Some(r) = self.dispatch_lazy(|c| c.count_all()) {
1101 return r;
1102 }
1103 // Read mode pins the full-tree scan to the txn's snapshot
1104 // (M12 #12) so a concurrent writer's post-snapshot inserts
1105 // cannot perturb the count; write mode keeps the live scan so
1106 // the txn sees its own uncommitted writes.
1107 match &self.mode {
1108 CollectionMode::Read(r) => {
1109 let pager = lock_pager(r.env)?;
1110 let pid = PageId::new(self.descriptor.primary_root)
1111 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1112 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, r.snapshot, pid, ..)?;
1113 count_range_iter(iter)
1114 }
1115 CollectionMode::Write(w) => {
1116 // #90: count the LIVE primary root so an in-txn count
1117 // reflects this txn's own uncommitted inserts/deletes.
1118 let root = self.write_primary_root(w)?;
1119 let mut pager = lock_pager(w.env)?;
1120 let tree = btree_handle(&pager, root)?;
1121 let iter = tree.range(&mut pager, ..)?;
1122 count_range_iter(iter)
1123 }
1124 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1125 operation: "internal: lazy-mode count_all",
1126 }),
1127 }
1128 }
1129
1130 /// Count every entry whose encoded key falls inside `range` on
1131 /// the named index's B-tree, WITHOUT decoding any document. M8
1132 /// fast path for [`crate::Query::count`] when the source is an
1133 /// `index_range`.
1134 ///
1135 /// Returns the number of index B-tree entries — for an `Each`
1136 /// index that may exceed the document count (one doc emits
1137 /// multiple entries); for other kinds it equals the matching
1138 /// doc count.
1139 ///
1140 /// # Errors
1141 ///
1142 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1143 /// - Pager / B-tree errors propagated.
1144 pub fn count_index_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1145 where
1146 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1147 {
1148 let start = encode_dynamic_bound(range.start_bound())?;
1149 let end = encode_dynamic_bound(range.end_bound())?;
1150 self.count_index_range_encoded(index_name, start, end)
1151 }
1152
1153 /// Encoded-bytes variant of [`Self::count_index_range`]. Bounds
1154 /// are the order-preserving field encoding of the user's
1155 /// `Dynamic` value(s); used by the query-layer count fast path.
1156 pub(crate) fn count_index_range_encoded(
1157 &self,
1158 index_name: &str,
1159 start_bound: std::ops::Bound<Vec<u8>>,
1160 end_bound: std::ops::Bound<Vec<u8>>,
1161 ) -> Result<u64> {
1162 if let Some(r) = self.dispatch_lazy(|c| {
1163 c.count_index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
1164 }) {
1165 return r;
1166 }
1167 let descriptor = self.active_index(index_name)?;
1168 let (start, end) =
1169 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1170 let entries = self.collect_range(descriptor, start, end)?;
1171 // `entries.len()` is a `usize`; promote to `u64` carefully.
1172 // `usize` is at most 64 bits on every supported target.
1173 u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1174 reason: "index range entry count exceeds u64",
1175 })
1176 }
1177
1178 /// Count distinct document `Id`s whose entries fall inside
1179 /// `range` on the named index's B-tree, WITHOUT decoding any
1180 /// document. For `Each` indexes this is the correct shape of
1181 /// the "how many docs match" question — `count_index_range`
1182 /// returns the entry count, which overshoots when a single doc
1183 /// contributes multiple entries.
1184 ///
1185 /// Implementation walks the index B-tree, parses the trailing
1186 /// 8-byte big-endian `Id` suffix from each non-unique key, and
1187 /// tracks the unique set in a bounded [`std::collections::HashSet`]
1188 /// capped at [`MAX_DISTINCT_IDS`]. Exceeding the cap surfaces
1189 /// [`Error::DistinctCountExceeded`] — the caller should narrow
1190 /// the range.
1191 ///
1192 /// # Per-kind semantics
1193 ///
1194 /// - `Standard`, `Composite`: equivalent to `count_index_range`
1195 /// (one entry per doc by construction; the trailing-id-suffix
1196 /// walk still produces the same total).
1197 /// - `Unique`: keys carry NO id suffix — the entry value is the
1198 /// raw 8-byte `Id`; the walk reads the value instead.
1199 /// - `Each`: the dedup is meaningful — one doc may contribute
1200 /// N entries under N distinct element keys.
1201 ///
1202 /// # Errors
1203 ///
1204 /// - [`Error::IndexNotFound`] if `index_name` is unknown / dropped.
1205 /// - [`Error::DistinctCountExceeded`] if the distinct set
1206 /// exceeds [`MAX_DISTINCT_IDS`].
1207 /// - [`Error::Corruption`] if an entry's id suffix / value is
1208 /// not parseable as an [`obj_core::Id`].
1209 /// - Pager / B-tree errors propagated.
1210 pub fn count_distinct_ids_in_range<R>(&self, index_name: &str, range: R) -> Result<u64>
1211 where
1212 R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
1213 {
1214 let start = encode_dynamic_bound(range.start_bound())?;
1215 let end = encode_dynamic_bound(range.end_bound())?;
1216 self.count_distinct_ids_in_range_encoded(index_name, start, end)
1217 }
1218
1219 /// Encoded-bytes variant of [`Self::count_distinct_ids_in_range`].
1220 /// Bounds are the order-preserving field encoding of the user's
1221 /// `Dynamic` value(s); used by the query-layer count fast path.
1222 pub(crate) fn count_distinct_ids_in_range_encoded(
1223 &self,
1224 index_name: &str,
1225 start_bound: std::ops::Bound<Vec<u8>>,
1226 end_bound: std::ops::Bound<Vec<u8>>,
1227 ) -> Result<u64> {
1228 if let Some(r) = self.dispatch_lazy(|c| {
1229 c.count_distinct_ids_in_range_encoded(
1230 index_name,
1231 start_bound.clone(),
1232 end_bound.clone(),
1233 )
1234 }) {
1235 return r;
1236 }
1237 let descriptor = self.active_index(index_name)?;
1238 let (start, end) =
1239 crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
1240 let entries = self.collect_range(descriptor, start, end)?;
1241 let mut distinct: HashSet<u64> = HashSet::new();
1242 for (full_key, value) in entries {
1243 let id = id_from_index_entry(&full_key, &value, descriptor.kind)?;
1244 if distinct.insert(id) && distinct.len() > MAX_DISTINCT_IDS {
1245 return Err(Error::DistinctCountExceeded {
1246 limit: MAX_DISTINCT_IDS,
1247 });
1248 }
1249 }
1250 // `distinct.len()` is a `usize`; promote to `u64` carefully.
1251 // `usize` is at most 64 bits on every supported target.
1252 u64::try_from(distinct.len()).map_err(|_| Error::BTreeInvariantViolated {
1253 reason: "distinct id count exceeds u64",
1254 })
1255 }
1256
1257 /// Materialise every `(Id, T)` pair in the collection.
1258 ///
1259 /// Implementation note: M6 returns an owned `Vec` rather than a
1260 /// streaming iterator because the B+tree range API borrows the
1261 /// pager, and threading that borrow through the mutex guards
1262 /// in the iterator chain is awkward. M7+ may convert to a
1263 /// streaming shape once the index API is in place.
1264 ///
1265 /// # Errors
1266 ///
1267 /// Pager / B-tree / codec errors propagated.
1268 pub fn all(&self) -> Result<Vec<(Id, T)>> {
1269 // Closure (rather than `Collection::all`) for the same
1270 // HRTB reason documented on `count_all` above.
1271 #[allow(clippy::redundant_closure_for_method_calls)]
1272 if let Some(r) = self.dispatch_lazy(|c| c.all()) {
1273 return r;
1274 }
1275 match &self.mode {
1276 CollectionMode::Write(write) => {
1277 // #90: scan the LIVE primary root so an in-txn `all()`
1278 // includes this txn's own uncommitted inserts.
1279 let root = self.write_primary_root(write)?;
1280 let mut pager = lock_pager(write.env)?;
1281 scan_all::<T>(&mut pager, root, self.descriptor.collection_id)
1282 }
1283 CollectionMode::Read(read) => snapshot_scan_via_btree::<T>(
1284 read.snapshot,
1285 read.env,
1286 self.descriptor.primary_root,
1287 self.descriptor.collection_id,
1288 ),
1289 CollectionMode::Lazy(_) => Err(Error::ReadOnly {
1290 operation: "internal: lazy-mode all",
1291 }),
1292 }
1293 }
1294
1295 fn write_or_err(&self, op: &'static str) -> Result<&WriteRef<'tx>> {
1296 match &self.mode {
1297 CollectionMode::Write(w) => Ok(w),
1298 CollectionMode::Read(_) | CollectionMode::Lazy(_) => {
1299 Err(Error::ReadOnly { operation: op })
1300 }
1301 }
1302 }
1303
1304 /// If this handle is in `Lazy` mode, dispatch `body` through a
1305 /// fresh read transaction on the bound [`crate::Db`] — opening a
1306 /// transient [`Collection`] via [`Self::open_readonly_named`] and
1307 /// invoking the user-supplied closure on it. Returns `Some(_)`
1308 /// when the dispatch fires (the closure ran or the underlying
1309 /// open failed); returns `None` for non-Lazy handles so the
1310 /// caller can fall through to the existing logic.
1311 ///
1312 /// Power-of-ten Rule 4: keeps each public method's body small —
1313 /// the dispatch shim is one match arm instead of a per-method
1314 /// `if let CollectionMode::Lazy(_)` ladder.
1315 fn dispatch_lazy<R, F>(&self, body: F) -> Option<Result<R>>
1316 where
1317 F: FnOnce(&Collection<'_, T>) -> Result<R>,
1318 {
1319 match &self.mode {
1320 CollectionMode::Lazy(LazyRef { db }) => {
1321 // Clone the stored name into an owned `Cow` for the
1322 // transient handle opened inside the private read txn.
1323 // `into_owned` keeps the `'static` Cow bound the
1324 // `open_readonly_named` signature requires regardless
1325 // of whether the stored name was Borrowed or Owned.
1326 let name: Cow<'static, str> = Cow::Owned(self.collection_name.clone().into_owned());
1327 Some(db.read_transaction(move |tx| {
1328 let coll = Collection::<T>::open_readonly_named(tx, name)?;
1329 body(&coll)
1330 }))
1331 }
1332 _ => None,
1333 }
1334 }
1335}
1336
1337// ---------- internal helpers --------------------------------------
1338
1339fn lock_pager(
1340 env: &obj_core::TxnEnv<FileHandle>,
1341) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1342 env.pager().lock().map_err(|_| Error::Busy {
1343 kind: obj_core::LockKind::WriterInProcess,
1344 })
1345}
1346
1347/// Ensure `T::COLLECTION` exists in the catalog, lazy-creating an
1348/// empty primary B-tree on first call. Used on the write side.
1349fn ensure_collection<T: Document>(
1350 inner: &obj_core::WriteTxn<'_, FileHandle>,
1351 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1352) -> Result<CollectionDescriptor> {
1353 let mut pager = inner.lock_pager()?;
1354 let mut catalog_guard = lock_catalog(catalog)?;
1355 if let Some(d) = catalog_guard.get(&mut pager, T::COLLECTION)? {
1356 return Ok(d);
1357 }
1358 let tree = BTree::<FileHandle>::empty(&mut pager)?;
1359 let descriptor = CollectionDescriptor::new(0, tree.root().get(), T::VERSION);
1360 let _id = catalog_guard.insert(&mut pager, T::COLLECTION, descriptor)?;
1361 catalog_guard
1362 .get(&mut pager, T::COLLECTION)?
1363 .ok_or(Error::Corruption { page_id: 0 })
1364}
1365
1366/// Re-read the descriptor for `T::COLLECTION` after any catalog
1367/// mutation. Used by [`Collection::open_or_create`] after the
1368/// reconciler runs.
1369fn reread_descriptor<T: Document>(
1370 inner: &obj_core::WriteTxn<'_, FileHandle>,
1371 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1372) -> Result<CollectionDescriptor> {
1373 let mut pager = inner.lock_pager()?;
1374 let catalog_guard = lock_catalog(catalog)?;
1375 catalog_guard
1376 .get(&mut pager, T::COLLECTION)?
1377 .ok_or(Error::Corruption { page_id: 0 })
1378}
1379
1380/// Reconcile `T::indexes()` against the catalog's stored descriptors
1381/// on the FIRST call per process per `(collection, version)`.
1382/// Subsequent calls for the same `(collection, version)` observe the
1383/// cache hit (in the shared `reconciled` set OR this txn's `staged`
1384/// set) and skip the catalog walk.
1385///
1386/// Reconciliation runs inside the user's WAL transaction so a
1387/// rolled-back txn leaves the catalog clean. If reconciliation
1388/// fails (e.g. `Error::IndexKindMismatch`), neither set is populated
1389/// so the next attempt re-runs the reconciler.
1390///
1391/// #93 — the skip-check is `shared ∪ staged`: a second handle of the
1392/// same `(collection, version)` within ONE txn still skips the
1393/// (idempotent) catalog walk via `staged`, but the key is recorded in
1394/// the per-txn `staged` set, NOT the shared set.
1395/// [`crate::WriteTxn::commit`] promotes the staged keys into the shared
1396/// set only after the WAL commit lands — so a rolled-back lazy-create
1397/// can never poison the shared cache into skipping reconciliation on a
1398/// later txn (the original bug).
1399fn reconcile_indexes_once<T: Document>(
1400 inner: &obj_core::WriteTxn<'_, FileHandle>,
1401 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1402 reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1403 staged: &mut HashSet<(String, u32)>,
1404) -> Result<()> {
1405 // The generic `#[derive(Document)]` path is exactly the non-generic
1406 // raw seam ([`reconcile_specs_once`]) with the collection name,
1407 // version, and spec list supplied from the compile-time `T` rather
1408 // than from a caller argument. Both share ONE body so the cache /
1409 // staging / validation / catalog-walk semantics can never drift
1410 // apart (#108).
1411 reconcile_specs_once(
1412 inner,
1413 catalog,
1414 reconciled,
1415 staged,
1416 T::COLLECTION,
1417 T::VERSION,
1418 &T::indexes(),
1419 )
1420}
1421
1422/// Non-generic core of index reconciliation, shared by the generic
1423/// `#[derive(Document)]` path ([`reconcile_indexes_once`]) and the
1424/// public raw seam ([`crate::WriteTxn::reconcile_indexes_raw`], #108).
1425///
1426/// Reconciles `specs` against the catalog's stored descriptors for
1427/// `collection` on the FIRST call per process per `(collection,
1428/// version)`, honoring the same `shared ∪ staged` skip-cache (keyed by
1429/// `(collection, version)` — #130), per-txn staging, and pre-mutation
1430/// validation as the generic path — the only difference is that the
1431/// collection name, version, and spec list are arguments rather than
1432/// `T::COLLECTION` / `T::VERSION` / `T::indexes()`.
1433///
1434/// # Why the cache key includes `version` (#130)
1435///
1436/// `Catalog::reconcile_indexes` is a FULL reconcile: it declares specs
1437/// missing from the catalog AND drops `Active` descriptors absent from
1438/// `specs`. Keying the skip-cache by `collection` ALONE meant that once
1439/// a process reconciled a collection at one schema version, a LATER
1440/// version in the same process that ADDED a new index never reconciled
1441/// — the added index never became `Active` and index-maintaining writes
1442/// failed with `IndexNotFound`. Keying by `(collection, version)`
1443/// reconciles each version exactly once: the common single-version case
1444/// is unchanged (one key, reconciled once), and a cross-version index
1445/// ADD reconciles the new version's specs on its first write.
1446///
1447/// ## Caveat — conflicting index REMOVAL interleaved across versions
1448///
1449/// Because each `(collection, version)` reconciles independently and
1450/// `reconcile_indexes` drops `Active` indexes absent from the version's
1451/// specs, alternating writes between two live versions of the SAME
1452/// collection in ONE process — where the versions declare DIFFERENT
1453/// index sets — can leave the catalog reflecting whichever version
1454/// reconciled most recently (its specs drive the drop set). Index
1455/// ADDITION (the common monotonic schema-evolution case) is fully
1456/// correct. This is strictly better than the prior behavior, which
1457/// never reconciled the second version at all (so its added index was
1458/// simply missing); the removal-interleaving edge is a narrow,
1459/// single-process anti-pattern.
1460///
1461/// The caller MUST have ensured `collection` exists in the catalog
1462/// (the generic path runs [`ensure_collection`] first; the raw seam
1463/// runs [`crate::txn::ensure_collection_raw`]) — `reconcile_indexes`
1464/// errors with `CollectionNotFound` otherwise.
1465pub(crate) fn reconcile_specs_once(
1466 inner: &obj_core::WriteTxn<'_, FileHandle>,
1467 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1468 reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1469 staged: &mut HashSet<(String, u32)>,
1470 collection: &str,
1471 version: u32,
1472 specs: &[obj_core::IndexSpec],
1473) -> Result<()> {
1474 // Cache key is `(collection, version)` (#130): each schema version
1475 // reconciles its OWN spec set exactly once per process.
1476 let key = (collection.to_owned(), version);
1477 // Fast path: already reconciled in a prior committed txn (shared)
1478 // or earlier in THIS txn (staged) — skip the catalog walk. Probe
1479 // `staged` first: it needs no lock and covers the repeat-handle
1480 // case; the shared probe takes the mutex only when `staged` misses.
1481 if staged.contains(&key) {
1482 return Ok(());
1483 }
1484 {
1485 let cache = lock_reconciled(reconciled)?;
1486 if cache.contains(&key) {
1487 return Ok(());
1488 }
1489 }
1490 // Validate specs before any catalog mutation so a bad spec
1491 // does not leave a half-reconciled catalog.
1492 for spec in specs {
1493 spec.validate()?;
1494 }
1495 {
1496 let mut pager = inner.lock_pager()?;
1497 let mut catalog_guard = lock_catalog(catalog)?;
1498 let _post = catalog_guard.reconcile_indexes(&mut pager, collection, specs)?;
1499 }
1500 // #93: stage in the PER-TXN set, not the shared one. Promotion to
1501 // the shared set is deferred to a successful `WriteTxn::commit`.
1502 staged.insert(key);
1503 Ok(())
1504}
1505
1506fn lock_reconciled(
1507 reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
1508) -> Result<std::sync::MutexGuard<'_, HashSet<(String, u32)>>> {
1509 reconciled.lock().map_err(|_| Error::Busy {
1510 kind: obj_core::LockKind::WriterInProcess,
1511 })
1512}
1513
1514/// Read-side descriptor lookup against a caller-supplied
1515/// collection name. M11 #93 introduced this byte-shape so the
1516/// namespace-aware [`Collection::open_readonly`] can perform the
1517/// catalog walk against either the calling Db's snapshot (no
1518/// namespace) or an attached Db's snapshot (with the namespace
1519/// prefix stripped).
1520fn read_descriptor_via_snapshot_named(
1521 env: &obj_core::TxnEnv<FileHandle>,
1522 snapshot: &obj_core::ReaderSnapshot<FileHandle>,
1523 name: &str,
1524) -> Result<Option<CollectionDescriptor>> {
1525 let pager = lock_pager(env)?;
1526 Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1527}
1528
1529/// #83 (b): fused one-shot point read for [`crate::Db::get`].
1530///
1531/// Resolves the collection descriptor and the primary-tree value for
1532/// `id` under a SINGLE pager-mutex acquisition (see
1533/// [`snapshot_resolve_and_get`]), then decodes the bytes against the
1534/// resolved `collection_id`. This collapses the two back-to-back
1535/// pager locks the equivalent handle path pays (one to open the
1536/// handle / resolve the descriptor, one for the `get`).
1537///
1538/// Observably identical to
1539/// `tx.collection::<T>()?.get(id)` for the one-shot caller: the
1540/// descriptor lookup and the value get run on the same `ReadTxn`
1541/// snapshot, and a missing collection still surfaces as
1542/// [`Error::CollectionNotFound`]. Namespaced reads (`<ns>.<tail>`)
1543/// fall back to the handle path so the attached-snapshot dispatch is
1544/// unchanged.
1545pub(crate) fn fused_point_get<T: Document>(tx: &ReadTxn<'_>, id: Id) -> Result<Option<T>> {
1546 let (namespace, _tail) = crate::db::split_namespace(T::COLLECTION);
1547 if namespace.is_some() {
1548 // Namespaced: keep the existing attached-snapshot dispatch.
1549 return Collection::<T>::open_readonly(tx)?.get(id);
1550 }
1551 let key = id.to_be_bytes();
1552 let resolved =
1553 snapshot_resolve_and_get(tx.inner.snapshot(), tx.inner.env(), T::COLLECTION, &key)?;
1554 match resolved {
1555 Some((descriptor, bytes)) => Ok(Some(decode::<T>(&bytes, descriptor.collection_id)?)),
1556 None => Ok(None),
1557 }
1558}
1559
1560/// Re-read the descriptor inside an already-locked pager + catalog
1561/// pair. Surfaces a missing collection as `Error::Corruption`
1562/// because the caller has already opened a write txn against it.
1563fn catalog_get_required(
1564 pager: &mut Pager<FileHandle>,
1565 catalog: &Catalog<FileHandle>,
1566 name: &str,
1567) -> Result<CollectionDescriptor> {
1568 catalog
1569 .get(pager, name)?
1570 .ok_or(Error::Corruption { page_id: 0 })
1571}
1572
1573fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1574 let root_pid =
1575 PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1576 BTree::<FileHandle>::open(pager, root_pid)
1577}
1578
1579/// Drain a B-tree range iterator, counting entries WITHOUT retaining
1580/// their bytes. Shared by [`Collection::count_all`]'s live (write) and
1581/// snapshot-pinned (read) scan arms. Power-of-ten Rule 2: the
1582/// iterator carries its own `MAX_RANGE_NODES` budget; the `u64`
1583/// overflow check guards the count itself.
1584fn count_range_iter<I>(iter: I) -> Result<u64>
1585where
1586 I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
1587{
1588 let mut n: u64 = 0;
1589 for step in iter {
1590 // Probe-only: drop the bytes the moment they decode.
1591 let _ = step?;
1592 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1593 reason: "primary tree entry count exceeds u64",
1594 })?;
1595 }
1596 Ok(n)
1597}
1598
1599// `persist_root` removed in M7 #58: every mutating method now
1600// routes through `index_maint::apply_doc_change`, which persists
1601// the descriptor (including the possibly-advanced `primary_root`)
1602// via `Catalog::update` after every per-index B-tree mutation.
1603
1604/// Snapshot-consistent B-tree lookup (M6 #53).
1605///
1606/// Walks the primary B+tree rooted at `primary_root` using
1607/// [`obj_core::btree::BTree::get_via_snapshot`], which descends
1608/// through [`obj_core::ReaderSnapshot::read_page`] rather than the
1609/// live `Pager::read_page`. This bypasses the WAL `state.view` /
1610/// `state.pending` overlays — a concurrent writer's post-snapshot
1611/// COW commits cannot poison the reader's walk.
1612///
1613/// `primary_root` MUST be the descriptor's `primary_root` as-of
1614/// the snapshot's pinned LSN (i.e. the value read via
1615/// [`obj_core::Catalog::lookup_via_snapshot`] in
1616/// [`read_descriptor_via_snapshot`] above). Using the writer's
1617/// live `primary_root` would defeat the snapshot read.
1618fn snapshot_get_via_btree(
1619 snap: &obj_core::ReaderSnapshot<FileHandle>,
1620 env: &obj_core::TxnEnv<FileHandle>,
1621 primary_root: u64,
1622 key: &[u8],
1623) -> Result<Option<Vec<u8>>> {
1624 let pager = lock_pager(env)?;
1625 let root_pid = PageId::new(primary_root)
1626 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1627 obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)
1628}
1629
1630/// #83 (b): fused single-lock read. Resolves the descriptor for
1631/// `name` and performs the primary-tree `get` for `key` under ONE
1632/// pager-mutex acquisition, against the SAME `snapshot` — collapsing
1633/// the descriptor-lookup lock (`read_descriptor_via_snapshot_named`)
1634/// and the value-get lock (`snapshot_get_via_btree`) that the
1635/// two-call handle path pays back-to-back.
1636///
1637/// Returns `(descriptor, value)` so the caller can `decode` against
1638/// the resolved `collection_id`. A missing collection surfaces as
1639/// `Err(CollectionNotFound)` (matching the handle path's open-time
1640/// contract for the one-shot caller); a present collection with no
1641/// entry for `key` surfaces as `Ok(None)`.
1642///
1643/// Power-of-ten: keeps poison → `Error::Busy` (Rule 7, via
1644/// `lock_pager`); ≤ 60 lines (Rule 4); `debug_assert`s that the
1645/// snapshot-resolved `primary_root` is the one fed to the get
1646/// (Rule 5).
1647fn snapshot_resolve_and_get(
1648 snap: &obj_core::ReaderSnapshot<FileHandle>,
1649 env: &obj_core::TxnEnv<FileHandle>,
1650 name: &str,
1651 key: &[u8],
1652) -> Result<Option<(CollectionDescriptor, Vec<u8>)>> {
1653 let pager = lock_pager(env)?;
1654 let Some(descriptor) = Catalog::<FileHandle>::lookup_via_snapshot(&pager, snap, name)? else {
1655 return Err(Error::CollectionNotFound {
1656 name: name.to_owned(),
1657 });
1658 };
1659 let root_pid = PageId::new(descriptor.primary_root)
1660 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1661 // Rule 5: the value-get MUST descend the descriptor's own
1662 // snapshot-time root — never a re-resolved or live root. A live
1663 // primary B-tree root is always page 1+, so a zero `collection_id`
1664 // here would mean we resolved a degenerate catalog row.
1665 debug_assert_eq!(
1666 root_pid.get(),
1667 descriptor.primary_root,
1668 "fused get must descend the snapshot-resolved primary_root",
1669 );
1670 let value =
1671 obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)?;
1672 Ok(value.map(|v| (descriptor, v)))
1673}
1674
1675fn scan_all<T: Document>(
1676 pager: &mut Pager<FileHandle>,
1677 primary_root: u64,
1678 collection_id: u32,
1679) -> Result<Vec<(Id, T)>> {
1680 let tree = btree_handle(pager, primary_root)?;
1681 let iter = tree.range(pager, ..)?;
1682 let mut out = Vec::new();
1683 for entry in iter {
1684 let (key, value) = entry?;
1685 let id = Id::from_be_bytes(&key)
1686 .ok_or(Error::InvalidArgument("primary B-tree key is not an Id"))?;
1687 let doc = decode::<T>(&value, collection_id)?;
1688 out.push((id, doc));
1689 }
1690 Ok(out)
1691}
1692
/// Read-side full scan of a collection's primary tree: locks the
/// pager obtained from `env` and delegates to [`scan_all`] with the
/// given `primary_root` and `collection_id`.
///
/// NOTE(review): `_snap` is accepted but never read. The walk goes
/// through `scan_all`, i.e. `tree.range` on the locked pager, so
/// unlike `snapshot_get_via_btree` this does not appear to be pinned
/// to the reader snapshot. Confirm whether that is intentional or
/// whether a snapshot-based range walk should be used here.
fn snapshot_scan_via_btree<T: Document>(
    _snap: &obj_core::ReaderSnapshot<FileHandle>,
    env: &obj_core::TxnEnv<FileHandle>,
    primary_root: u64,
    collection_id: u32,
) -> Result<Vec<(Id, T)>> {
    // The guard is held for the whole scan and released on return.
    let mut pager = lock_pager(env)?;
    scan_all::<T>(&mut pager, primary_root, collection_id)
}
1702
1703/// Encode the caller-supplied `Dynamic` value(s) into the bytes a
1704/// lookup against `descriptor` would use as a B-tree key. For
1705/// `Unique` indexes the result is the key bytes verbatim; for
1706/// non-unique kinds the lookup helpers extend with the per-doc
1707/// id suffix at scan time.
1708fn index_key_for_lookup(
1709 descriptor: &obj_core::IndexDescriptor,
1710 fields: &[obj_core::codec::Dynamic],
1711) -> Result<obj_core::index::EncodedIndexKey> {
1712 // Ref-based encode (#84): pass the descriptor's `kind` (Copy) and
1713 // `key_paths` BY REFERENCE — no transient `IndexSpec`, no
1714 // `name`/`key_paths` clone, no redundant `IndexSpec::validate` on
1715 // an already-validated on-disk descriptor. The byte output is
1716 // identical to the old `from_parts` + `encode_index_key` path
1717 // (see `encode_index_key_parts` and the byte-identity test).
1718 obj_core::index::encode_index_key_parts(descriptor.kind, &descriptor.key_paths, fields)
1719}
1720
1721/// Encode a `Bound<&Dynamic>` into the index-key `Bound<Vec<u8>>` the
1722/// B-tree scan uses. Shared by the `Dynamic`-taking range methods on
1723/// [`Collection`].
1724///
1725/// A scalar `Dynamic` is encoded with the order-preserving field
1726/// encoder ([`obj_core::index::encode_field`]) — byte-identical to
1727/// what [`crate::Query::index_range`] produces, so a query and a
1728/// direct collection scan over the same scalar bound observe the
1729/// same entries. A [`Dynamic::Seq`](obj_core::codec::Dynamic::Seq)
1730/// bound is encoded as a composite key (the
1731/// [`COMPOSITE_TAG`](obj_core::index::COMPOSITE_TAG)-prefixed
1732/// concatenation of each element's field encoding) so a `Composite`
1733/// index can be range-scanned by a full tuple bound.
1734fn encode_dynamic_bound(
1735 b: std::ops::Bound<&obj_core::codec::Dynamic>,
1736) -> Result<std::ops::Bound<Vec<u8>>> {
1737 match b {
1738 std::ops::Bound::Included(v) => Ok(std::ops::Bound::Included(encode_bound_value(v)?)),
1739 std::ops::Bound::Excluded(v) => Ok(std::ops::Bound::Excluded(encode_bound_value(v)?)),
1740 std::ops::Bound::Unbounded => Ok(std::ops::Bound::Unbounded),
1741 }
1742}
1743
1744/// Encode one `Dynamic` bound value into index-key bytes. Scalars go
1745/// through [`obj_core::index::encode_field`]; a `Seq` is encoded as a
1746/// composite tuple key. Power-of-ten Rule 4: kept separate so
1747/// [`encode_dynamic_bound`] stays a thin three-arm match.
1748fn encode_bound_value(v: &obj_core::codec::Dynamic) -> Result<Vec<u8>> {
1749 match v {
1750 obj_core::codec::Dynamic::Seq(fields) => {
1751 // `COMPOSITE_TAG || encode_field(f0) || encode_field(f1) ..`
1752 // is byte-identical to `encode_index_key`'s composite path
1753 // for the same fields — see `obj_core::index::key`.
1754 let mut out = vec![obj_core::index::COMPOSITE_TAG];
1755 for f in fields {
1756 out.extend_from_slice(obj_core::index::encode_field(f)?.as_bytes());
1757 }
1758 Ok(out)
1759 }
1760 _ => Ok(obj_core::index::encode_field(v)?.into_bytes()),
1761 }
1762}
1763
/// Return `prefix` followed by eight `0xFF` bytes (`u64::MAX` in
/// big-endian). Serves as the upper bound of an equality lookup
/// against a non-unique index: the trailing 8 bytes of such a key are
/// an `Id` (`u64` BE), so no key sharing `prefix` sorts above
/// `prefix || 0xFF..`.
fn append_max_id(prefix: &[u8]) -> Vec<u8> {
    const MAX_ID_SUFFIX: [u8; 8] = u64::MAX.to_be_bytes();
    [prefix, MAX_ID_SUFFIX.as_slice()].concat()
}
1774
1775/// Trim the trailing 8-byte id suffix off a non-unique index key.
1776/// For `Unique` keys the suffix is absent, so the full key is the
1777/// user portion.
1778fn strip_id_suffix(full_key: &[u8], kind: obj_core::IndexKind) -> Vec<u8> {
1779 match kind {
1780 obj_core::IndexKind::Unique => full_key.to_vec(),
1781 _ if full_key.len() >= 8 => full_key[..full_key.len() - 8].to_vec(),
1782 _ => full_key.to_vec(),
1783 }
1784}
1785
1786/// Recover the `Id` (as a `u64`) from one index B-tree entry. For
1787/// non-unique kinds the id is the trailing 8 bytes of the KEY (the
1788/// suffix appended by the maintenance path); for `Unique` keys the
1789/// id is the VALUE. Used by
1790/// [`Collection::count_distinct_ids_in_range`].
1791fn id_from_index_entry(full_key: &[u8], value: &[u8], kind: obj_core::IndexKind) -> Result<u64> {
1792 // `Unique` indexes carry the id in the value; non-unique kinds
1793 // (Standard / Each / Composite) carry it as the trailing 8-byte
1794 // suffix of the key. The slicing here is O(1) — no per-entry
1795 // loop to bound (the outer walk's bound is the distinct-set cap).
1796 let bytes: &[u8] = if kind == obj_core::IndexKind::Unique {
1797 value
1798 } else {
1799 if full_key.len() < 8 {
1800 return Err(Error::Corruption { page_id: 0 });
1801 }
1802 &full_key[full_key.len() - 8..]
1803 };
1804 let id = Id::from_be_bytes(bytes).ok_or(Error::Corruption { page_id: 0 })?;
1805 Ok(id.get())
1806}
1807
1808// =====================================================================
1809// Phase 7A (M14 #14) — streaming index range iterator
1810// =====================================================================
1811
/// Resumption marker for [`IterIndexRange`]'s first refill. After the
/// first batch the iterator switches to `Excluded(last_emitted_full_key)`
/// for subsequent refills (the same shape `Db::iter_all` uses for the
/// primary tree).
///
/// `IterIndexRange::next_start_bound` consumes the marker on its
/// first call and maps each variant onto the `Bound` variant of the
/// same name.
enum InitialResume {
    /// Start the first scan at these key bytes, inclusive.
    Included(Vec<u8>),
    /// Start the first scan strictly after these key bytes.
    Excluded(Vec<u8>),
    /// No lower bound on the first scan.
    Unbounded,
}
1821
/// One entry in [`IterIndexRange`]'s pending buffer. Read/Write
/// modes stage `Pending(key, id)` and resolve the `T` lazily on
/// `next()`; Lazy mode pre-resolves under a single `read_transaction`
/// (to preserve snapshot consistency across the index walk + the
/// per-row primary `get`) and stages `Resolved(key, T)` directly.
enum StagedEntry<T> {
    /// `(user_key, id)`: the document is fetched later, when
    /// `IterIndexRange::resolve_one` calls `Collection::get(id)`.
    Pending(Vec<u8>, Id),
    /// `(user_key, doc)`: the document is already decoded and is
    /// returned unchanged by `IterIndexRange::resolve_one`.
    Resolved(Vec<u8>, T),
}
1831
/// Streaming iterator returned by [`Collection::iter_range`]. Yields
/// `Result<(user_key_bytes, T)>` one row at a time; internally
/// refills a fixed-size `(user_key, Id)` buffer in batches of
/// `ITER_INDEX_RANGE_BATCH = 256` so the per-step pager-lock cost
/// amortises. Memory stays bounded at `O(batch × small_bytes +
/// distinct_ids)` regardless of the range's total size.
///
/// Held data: a `&'a Collection<'_, T>` borrow (the iterator is bound
/// to the lifetime of `Collection::iter_range`'s `&self` borrow), the
/// index's root page-id, the dedup set for `Each` indexes, the next-
/// chunk resumption marker, and the staged batch.
pub struct IterIndexRange<'a, T: Document> {
    /// Collection the iterator was obtained from. Its `mode` selects
    /// the env whose pager is locked on refill, and its `get` resolves
    /// `Pending` entries into documents.
    coll: &'a Collection<'a, T>,
    /// Kind of the index being walked. Decides whether `Each`
    /// de-duplication applies and whether the 8-byte id suffix is
    /// stripped from emitted keys.
    descriptor_kind: obj_core::IndexKind,
    /// Root page number of the index B-tree; converted to a `PageId`
    /// on every refill (zero is rejected there).
    index_root: u64,
    /// First-refill marker — `None` after the iterator has emitted
    /// at least one chunk; subsequent refills use `last_full_key`.
    initial_resume: Option<InitialResume>,
    /// Last full B-tree key emitted by the most recent refill. Drives
    /// the `Excluded(_)` resumption bound for the next chunk.
    last_full_key: Option<Vec<u8>>,
    /// User-supplied end bound (already widened per index kind).
    end_bound: Bound<Vec<u8>>,
    /// Pre-staged entries from the most recent refill. `next()`
    /// pops from the front. Each entry is either `Pending(key, id)`
    /// (deferred get-back, the Read/Write streaming path) or
    /// `Resolved(key, T)` (eager get-back inside a single
    /// `read_transaction`, the Lazy-mode fallback).
    buffer: VecDeque<Result<StagedEntry<T>>>,
    /// Persistent de-dup set for `Each` indexes. Power-of-ten Rule
    /// 3: the set is intentionally unbounded — if the caller wants
    /// a hard cap they should use
    /// [`Collection::count_distinct_ids_in_range`] (which caps at
    /// [`MAX_DISTINCT_IDS`]); the iterator's correctness contract
    /// is per-row dedup across the whole range.
    emitted_ids: HashSet<u64>,
    /// Set once no further refill should run: either a refill consumed
    /// fewer than a full batch, or a refill failed and `next()`
    /// latched the iterator shut. Entries still in `buffer` are
    /// drained before `next()` returns `None`.
    finished: bool,
}
1870
1871impl<T: Document> std::fmt::Debug for IterIndexRange<'_, T> {
1872 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1873 f.debug_struct("IterIndexRange")
1874 .field("descriptor_kind", &self.descriptor_kind)
1875 .field("index_root", &self.index_root)
1876 .field("buffer_len", &self.buffer.len())
1877 .field("emitted_ids_len", &self.emitted_ids.len())
1878 .field("finished", &self.finished)
1879 .finish_non_exhaustive()
1880 }
1881}
1882
1883impl<T: Document> IterIndexRange<'_, T> {
1884 /// Refill `self.buffer` with up to [`ITER_INDEX_RANGE_BATCH`]
1885 /// `(user_key, Id)` pairs by walking the index B-tree from the
1886 /// current resumption marker. Sets `self.finished` when the
1887 /// underlying range scan yields fewer than the requested batch
1888 /// (i.e. it ran past the end bound).
1889 ///
1890 /// Power-of-ten Rule 7: per-step decode errors are pushed into
1891 /// the buffer as `Err(_)` so the caller observes them via
1892 /// `next()` rather than aborting iteration.
1893 fn refill(&mut self) -> Result<()> {
1894 let env = match &self.coll.mode {
1895 CollectionMode::Write(w) => w.env,
1896 CollectionMode::Read(r) => r.env,
1897 CollectionMode::Lazy(_) => {
1898 // Lazy mode falls back to the eager `index_range`
1899 // path in `iter_range` itself (see below). Reaching
1900 // refill in Lazy mode indicates an internal logic
1901 // error, NOT a recoverable corruption — surface as
1902 // a typed error rather than `unwrap`.
1903 return Err(Error::ReadOnly {
1904 operation: "internal: iter_range refill in Lazy mode",
1905 });
1906 }
1907 };
1908 let root_pid = PageId::new(self.index_root)
1909 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1910 let start = self.next_start_bound();
1911 let end = clone_bound_ref(&self.end_bound);
1912 let mut pager = lock_pager(env)?;
1913 let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
1914 let iter = tree.range(&mut pager, (start, end))?;
1915 let mut staged: VecDeque<Result<StagedEntry<T>>> =
1916 VecDeque::with_capacity(ITER_INDEX_RANGE_BATCH);
1917 let mut last_full: Option<Vec<u8>> = None;
1918 let mut consumed: usize = 0;
1919 for step in iter {
1920 if consumed >= ITER_INDEX_RANGE_BATCH {
1921 break;
1922 }
1923 consumed = consumed
1924 .checked_add(1)
1925 .ok_or(Error::BTreeInvariantViolated {
1926 reason: "iter_range batch counter overflow",
1927 })?;
1928 self.stage_one(&mut staged, &mut last_full, step);
1929 }
1930 if consumed < ITER_INDEX_RANGE_BATCH {
1931 self.finished = true;
1932 }
1933 drop(pager);
1934 self.buffer.extend(staged);
1935 if let Some(k) = last_full {
1936 self.last_full_key = Some(k);
1937 }
1938 Ok(())
1939 }
1940
1941 /// Process one B-tree step into the staged batch. Encapsulates
1942 /// the `Each`-dedup, the trailing-id-suffix strip, and the
1943 /// `Id::from_be_bytes` parse. Free helper so the refill body
1944 /// stays under the Rule-4 60-line ceiling.
1945 fn stage_one(
1946 &mut self,
1947 staged: &mut VecDeque<Result<StagedEntry<T>>>,
1948 last_full: &mut Option<Vec<u8>>,
1949 step: Result<(Vec<u8>, Vec<u8>)>,
1950 ) {
1951 let (full_key, id_bytes) = match step {
1952 Ok(kv) => kv,
1953 Err(e) => {
1954 staged.push_back(Err(e));
1955 return;
1956 }
1957 };
1958 *last_full = Some(full_key.clone());
1959 let Some(id) = Id::from_be_bytes(&id_bytes) else {
1960 staged.push_back(Err(Error::Corruption { page_id: 0 }));
1961 return;
1962 };
1963 if self.descriptor_kind == obj_core::IndexKind::Each && !self.emitted_ids.insert(id.get()) {
1964 // Same doc already emitted under a different element
1965 // key — skip without producing an output entry.
1966 return;
1967 }
1968 let user_key = strip_id_suffix(&full_key, self.descriptor_kind);
1969 staged.push_back(Ok(StagedEntry::Pending(user_key, id)));
1970 }
1971
1972 /// Compute the start bound for the next refill: use
1973 /// `initial_resume` on the first call (consuming it), thereafter
1974 /// use `Excluded(last_full_key)`.
1975 fn next_start_bound(&mut self) -> Bound<Vec<u8>> {
1976 if let Some(initial) = self.initial_resume.take() {
1977 return match initial {
1978 InitialResume::Included(k) => Bound::Included(k),
1979 InitialResume::Excluded(k) => Bound::Excluded(k),
1980 InitialResume::Unbounded => Bound::Unbounded,
1981 };
1982 }
1983 match &self.last_full_key {
1984 Some(k) => Bound::Excluded(k.clone()),
1985 None => Bound::Unbounded,
1986 }
1987 }
1988}
1989
1990impl<T: Document> Iterator for IterIndexRange<'_, T> {
1991 type Item = Result<(Vec<u8>, T)>;
1992
1993 fn next(&mut self) -> Option<Self::Item> {
1994 loop {
1995 if let Some(staged) = self.buffer.pop_front() {
1996 return Some(self.resolve_one(staged));
1997 }
1998 if self.finished {
1999 return None;
2000 }
2001 if let Err(e) = self.refill() {
2002 // Latch the iterator shut on a refill failure
2003 // (lock acquisition, B-tree open, etc.). Surface
2004 // the error once, then return None on subsequent
2005 // calls — power-of-ten Rule 7.
2006 self.finished = true;
2007 return Some(Err(e));
2008 }
2009 // refill ran; the loop will pop or notice finished.
2010 }
2011 }
2012}
2013
2014impl<T: Document> IterIndexRange<'_, T> {
2015 /// Resolve one staged entry into a `(user_key, T)` pair. For
2016 /// `Pending(_, id)` entries (the Read/Write streaming path),
2017 /// calls [`Collection::get`] to decode `T` on demand; for
2018 /// `Resolved(_, T)` entries (the Lazy-mode eager path), returns
2019 /// the already-decoded value. Orphan index entries (id missing
2020 /// in the primary tree) surface as [`Error::Corruption`],
2021 /// matching [`Collection::index_range`]'s existing contract.
2022 fn resolve_one(&self, staged: Result<StagedEntry<T>>) -> Result<(Vec<u8>, T)> {
2023 match staged? {
2024 StagedEntry::Pending(user_key, id) => match self.coll.get(id)? {
2025 Some(doc) => Ok((user_key, doc)),
2026 None => Err(Error::Corruption { page_id: 0 }),
2027 },
2028 StagedEntry::Resolved(user_key, doc) => Ok((user_key, doc)),
2029 }
2030 }
2031}
2032
/// Produce an owned copy of a borrowed `Bound<Vec<u8>>` (the shape
/// `IterIndexRange::end_bound` stores) for the resumption walk. The
/// variant is preserved and any key bytes are cloned.
fn clone_bound_ref(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    b.clone()
}