obj/txn.rs
1//! Public `WriteTxn` / `ReadTxn` types.
2//!
3//! Thin wrappers over `obj_core::txn::{WriteTxn, ReadTxn}` that
4//! attach a [`Catalog`] reference (the obj-core txn types are
5//! catalog-agnostic; the catalog is the obj crate's responsibility).
6//!
7//! The catalog handle is `Arc<Mutex<Catalog<FileHandle>>>`. Lock
8//! ordering: **always acquire the pager mutex (via the txn env)
9//! BEFORE the catalog mutex**.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::{Arc, Mutex};
13
14use obj_core::btree::BTree;
15use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
16use obj_core::index::EncodedIndexKey;
17use obj_core::pager::checksum::crc32c;
18use obj_core::pager::page::PageId;
19use obj_core::pager::Pager;
20use obj_core::platform::FileHandle;
21use obj_core::{
22 Catalog, CollectionDescriptor, Document, Error, Id, IndexStatus, ReaderSnapshot, Result, TxnEnv,
23};
24
25use crate::collection::Collection;
26
/// Header `type_version` written for documents that arrive through the
/// C ABI raw-bytes path.
///
/// A C caller has no Rust `Document::VERSION` to supply, so these records
/// are stamped with `1`. The value is easy to recognise in dump output,
/// and a Rust-typed reader opening the same collection still runs its
/// usual schema-version-from-future / migration logic against it.
///
/// Changing this value breaks every consumer that already holds
/// raw-bytes data stamped with the old one — keep it at `1` pre-1.0.
pub(crate) const RAW_BYTES_TYPE_VERSION: u32 = 1;
37
38/// Public write transaction.
39///
40/// Acquired by [`crate::Db::transaction`]. Holds the in-process
41/// write-serialization mutex + cross-process `WRITER_LOCK` for its
42/// entire lifetime. `commit` / `rollback` consume `self`; dropping
43/// without explicitly committing rolls back automatically.
44pub struct WriteTxn<'db> {
45 pub(crate) inner: obj_core::WriteTxn<'db, FileHandle>,
46 pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
47 /// Per-process cache of `(collection_name, version)` keys whose
48 /// `T::indexes()` reconciliation has already run. Reconciliation
49 /// is idempotent but expensive (a catalog walk + index
50 /// declarations); caching keeps the first
51 /// `WriteTxn::collection::<T>()` call per-process per
52 /// `(collection, version)` as the only one that pays the cost.
53 ///
54 /// #130 — the key includes the schema `version`: a later version of
55 /// the same collection that ADDS an index reconciles on its first
56 /// write rather than being skipped (the name-only key never let a
57 /// cross-version index addition become `Active`). See
58 /// [`crate::collection::reconcile_specs_once`] for the full
59 /// rationale and the removal-interleaving caveat.
60 ///
61 /// #93 — membership is promoted into this SHARED set ONLY after a
62 /// successful [`Self::commit`]. During a txn the keys live in the
63 /// per-txn [`Self::reconciled_staged`] set instead, so a
64 /// rolled-back first-ever txn never poisons this set into skipping
65 /// reconciliation on a later (committed) txn in the same process.
66 pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
67 /// #93 — per-transaction staged set of `(collection, version)` keys
68 /// whose `T::indexes()` reconciliation has run INSIDE this (not-yet-
69 /// committed) txn. The skip-check in
70 /// [`crate::collection::reconcile_indexes_once`] is `shared ∪
71 /// staged`, so a second handle of the same `(collection, version)`
72 /// in one txn still skips the (idempotent) catalog walk — but the
73 /// keys are only folded into the shared [`Self::reconciled`] set by
74 /// [`Self::commit`] AFTER the WAL commit succeeds. On rollback /
75 /// drop this set is discarded with NO shared-set mutation, so a
76 /// rolled-back lazy-create leaves the shared cache untouched and
77 /// the next txn re-reconciles correctly.
78 ///
79 /// Not behind a mutex: a `WriteTxn` is single-threaded (it holds
80 /// the write-serialization lock for its whole life and is borrowed
81 /// `&mut` by every write), so interior mutability via the staging
82 /// helpers is sufficient.
83 pub(crate) reconciled_staged: HashSet<(String, u32)>,
84 /// #90 — batch-aware catalog flush. Per-transaction cache of the
85 /// LIVE [`CollectionDescriptor`] for every collection touched by
86 /// a write, keyed by collection name. This is the SOLE mid-txn
87 /// source of truth for `next_id`, `primary_root`, and each
88 /// index's `root_page_id`: every write bumps / advances these
89 /// IN-MEMORY rather than rewriting the catalog B-tree per doc.
90 /// [`Self::commit`] flushes each entry back through
91 /// `Catalog::update` exactly ONCE (see
92 /// [`Self::flush_descriptors`]). On rollback / drop the cache is
93 /// discarded with no catalog-tree side effects.
94 ///
95 /// The `Arc<Mutex<_>>` is cloned into each [`Collection`]'s
96 /// `WriteRef` so two handles of the same collection opened in one
97 /// txn share the single entry.
98 pub(crate) descriptors: crate::collection::DescriptorCache,
99}
100
101impl<'db> WriteTxn<'db> {
102 /// Construct a `WriteTxn` directly from its three pieces.
103 /// Public so the FFI layer ([`libobj`](../../libobj/index.html))
104 /// can build an owned write txn whose lifetime extends past a
105 /// single `Db::transaction` closure call.
106 ///
107 /// User-Rust callers should reach for `Db::transaction` — the
108 /// closure shape handles commit / rollback / drop semantics
109 /// correctly without needing direct construction.
110 #[doc(hidden)]
111 #[must_use]
112 pub fn from_parts(
113 inner: obj_core::WriteTxn<'db, FileHandle>,
114 catalog: Arc<Mutex<Catalog<FileHandle>>>,
115 reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
116 ) -> Self {
117 Self {
118 inner,
119 catalog,
120 reconciled,
121 reconciled_staged: HashSet::new(),
122 descriptors: crate::collection::new_descriptor_cache(),
123 }
124 }
125
126 pub(crate) fn new(
127 inner: obj_core::WriteTxn<'db, FileHandle>,
128 catalog: Arc<Mutex<Catalog<FileHandle>>>,
129 reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
130 ) -> Self {
131 Self::from_parts(inner, catalog, reconciled)
132 }
133
134 /// Open a typed handle to the collection `T` lives in.
135 ///
136 /// Lazily creates the catalog row + an empty primary B-tree on
137 /// first call for a given `T` inside the current process. The
138 /// catalog mutation is staged in the same WAL transaction as
139 /// the user's subsequent writes — a rolled-back txn leaves no
140 /// half-created collection.
141 ///
142 /// # Errors
143 ///
144 /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
145 /// - Any error the pager / B-tree / postcard codec returns.
146 pub fn collection<T: Document>(&mut self) -> Result<Collection<'_, T>> {
147 // M11 #93: namespaced collections live in attached
148 // read-only databases. Reject writes eagerly at handle-
149 // open time so the caller does not chase an opaque "no
150 // such collection" error later.
151 if let (Some(namespace), tail) = crate::db::split_namespace(T::COLLECTION) {
152 return Err(Error::AttachedDatabaseIsReadOnly {
153 namespace: namespace.to_owned(),
154 collection: tail.to_owned(),
155 });
156 }
157 Collection::open_or_create(self)
158 }
159
160 /// Commit the transaction.
161 ///
162 /// #90: flushes every cached [`CollectionDescriptor`] back to the
163 /// catalog (one `Catalog::update` per touched collection) BEFORE
164 /// the WAL commit, so the coalesced `next_id` / `primary_root` /
165 /// index-root advances land durably in the same transaction as the
166 /// document + index writes. A flush failure aborts the commit (the
167 /// `?` propagates and `self` drops, rolling the WAL back) rather
168 /// than committing a half-flushed catalog.
169 ///
170 /// #93: AFTER the WAL commit succeeds, fold this txn's staged
171 /// reconciled-collection names into the shared per-process
172 /// `reconciled` set, so the expensive `T::indexes()`
173 /// reconciliation is skipped for those collections on later txns.
174 /// Promotion is deliberately POST-commit: a rolled-back txn never
175 /// reaches here, so it cannot poison the shared cache into skipping
176 /// reconciliation against a catalog whose index rows it just rolled
177 /// back. A poisoned `reconciled` mutex maps to `Error::Busy` (Rule
178 /// 7) but does NOT un-commit the durable WAL state.
179 ///
180 /// # Errors
181 ///
182 /// As [`obj_core::WriteTxn::commit`], plus any catalog / pager /
183 /// postcard error surfaced by the descriptor flush, plus
184 /// [`Error::Busy`] if the shared `reconciled` mutex is poisoned at
185 /// promotion time (after the commit has already landed durably).
186 pub fn commit(self) -> Result<()> {
187 self.flush_descriptors()?;
188 // Move the pieces out before consuming `inner`: `commit`
189 // takes `self` by value, and `self.inner.commit()` moves the
190 // inner txn, so the shared/staged handles must be captured
191 // first.
192 let Self {
193 inner,
194 reconciled,
195 reconciled_staged,
196 ..
197 } = self;
198 inner.commit()?;
199 promote_reconciled(&reconciled, reconciled_staged)
200 }
201
202 /// #90: persist every cached descriptor back to the catalog
203 /// B-tree exactly once. Called by [`Self::commit`] before the WAL
204 /// commit. Iterates the per-txn descriptor cache (one entry per
205 /// touched collection — Rule 2 bound is the touched-collection
206 /// count) and issues one `Catalog::update` apiece, propagating the
207 /// first failure via `?` so a partial flush aborts the commit.
208 ///
209 /// Lock order matches every write path: pager BEFORE catalog (the
210 /// descriptor-cache lock is acquired and released first, so it is
211 /// never held across the pager/catalog locks).
212 fn flush_descriptors(&self) -> Result<()> {
213 let entries: Vec<(String, CollectionDescriptor)> = {
214 let cache = crate::collection::lock_descriptors(&self.descriptors)?;
215 if cache.is_empty() {
216 return Ok(());
217 }
218 cache
219 .iter()
220 .map(|(name, descriptor)| (name.clone(), descriptor.clone()))
221 .collect()
222 };
223 let mut pager = lock_pager(self.inner.env())?;
224 let mut catalog = lock_catalog(&self.catalog)?;
225 for (name, descriptor) in &entries {
226 catalog.update(&mut pager, name, descriptor)?;
227 }
228 Ok(())
229 }
230
231 /// Roll the transaction back.
232 ///
233 /// # Errors
234 ///
235 /// As [`obj_core::WriteTxn::rollback`].
236 pub fn rollback(self) -> Result<()> {
237 self.inner.rollback()
238 }
239
240 /// **FFI shim**: insert a raw-bytes document into `collection`,
241 /// returning the freshly-allocated [`Id`].
242 ///
243 /// The payload is stored as-is; the on-disk record carries the
244 /// standard 16-byte [`DocumentHeader`] with
245 /// `type_version = RAW_BYTES_TYPE_VERSION` and a CRC32C of the
246 /// payload. The collection is lazy-created in the same WAL
247 /// transaction if it does not already exist.
248 ///
249 /// **Index maintenance does NOT run** on the raw-bytes path —
250 /// the C ABI's caller has no schema introspection. Documents
251 /// inserted through this path are invisible to indexes built
252 /// by typed [`Document`] writers, until a Rust-side
253 /// `WriteTxn::collection::<T>()` rewrites them.
254 ///
255 /// Forwards to [`Self::insert_with_version`] with
256 /// `type_version = RAW_BYTES_TYPE_VERSION`. Callers that have
257 /// a meaningful schema version to stamp (e.g. obj-py's typed
258 /// path, which knows `cls.__obj_version__`) should call
259 /// [`Self::insert_with_version`] directly.
260 ///
261 /// # Errors
262 ///
263 /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
264 /// collections (attached dbs are read-only).
265 /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
266 /// B-tree inline cap.
267 /// - Pager / catalog errors propagated.
268 #[doc(hidden)]
269 pub fn insert_raw_bytes(&mut self, collection: &str, payload: &[u8]) -> Result<Id> {
270 self.insert_with_version(collection, payload, RAW_BYTES_TYPE_VERSION)
271 }
272
273 /// **Engine API**: insert a raw-bytes document into `collection`
274 /// with a caller-supplied `type_version` stamped in the per-doc
275 /// header. Returns the freshly-allocated [`Id`].
276 ///
277 /// This is the byte-identity entry point for cross-language
278 /// writers (obj-py's typed `Db.insert(instance)` calls this
279 /// with `cls.__obj_version__`, matching what Rust's
280 /// `#[derive(Document)]` stamps via [`obj_core::codec::encode`]).
281 /// Apart from the version source, the behaviour is identical to
282 /// [`Self::insert_raw_bytes`].
283 ///
284 /// **Index maintenance does NOT run** — same caveat as the
285 /// raw-bytes shim. Use [`Self::collection`] for typed writes
286 /// that need index maintenance.
287 ///
288 /// # Errors
289 ///
290 /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
291 /// collections.
292 /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
293 /// B-tree inline cap.
294 /// - Pager / catalog errors propagated.
295 #[doc(hidden)]
296 pub fn insert_with_version(
297 &mut self,
298 collection: &str,
299 payload: &[u8],
300 type_version: u32,
301 ) -> Result<Id> {
302 reject_namespaced_write(collection)?;
303 let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
304 let mut pager = lock_pager(self.inner.env())?;
305 let catalog = lock_catalog(&self.catalog)?;
306 // #90: the per-txn descriptor cache is the sole mid-txn source
307 // of truth — `next_id` is an in-memory bump, `primary_root`
308 // advances in the cache, and NO per-doc `Catalog::update`
309 // fires (the single flush is deferred to commit). This keeps
310 // the raw-bytes path coherent with the typed path if both
311 // touch the same collection in one transaction.
312 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
313 let descriptor =
314 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
315 let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || collection.to_owned())?;
316 let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
317 let key = id.to_be_bytes();
318 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
319 tree.insert(&mut pager, &key, &bytes)?;
320 descriptor.primary_root = tree.root().get();
321 Ok(id)
322 }
323
324 /// **FFI shim**: fetch the raw payload of the document at `id`
325 /// in `collection`. Returns `Ok(None)` if absent. The returned
326 /// `Vec<u8>` is the payload only (the 16-byte per-doc header
327 /// is stripped).
328 ///
329 /// Forwards to [`Self::get_with_version`] and discards the
330 /// stored version. Use [`Self::get_with_version`] directly when
331 /// the caller needs the header's `type_version` (e.g. obj-py's
332 /// typed read path, which dispatches schema migration on it).
333 ///
334 /// # Errors
335 ///
336 /// - [`Error::CollectionNotFound`] if the collection is unknown.
337 /// - [`Error::Corruption`] if the on-disk record is malformed.
338 /// - Pager / catalog errors propagated.
339 #[doc(hidden)]
340 pub fn get_raw_bytes(&mut self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
341 Ok(self
342 .get_with_version(collection, id)?
343 .map(|(payload, _version)| payload))
344 }
345
346 /// **Engine API**: fetch the raw payload AND stored
347 /// `type_version` of the document at `id` in `collection`.
348 /// Returns `Ok(None)` if absent.
349 ///
350 /// Companion read accessor for the version-aware write path
351 /// ([`Self::insert_with_version`]) — used by obj-py's typed
352 /// read pipeline to dispatch directly on the stored header
353 /// version instead of the historical try-decode-walk heuristic.
354 ///
355 /// # Errors
356 ///
357 /// As [`Self::get_raw_bytes`].
358 #[doc(hidden)]
359 pub fn get_with_version(&mut self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
360 // #90: a write-side raw get must descend the LIVE primary root
361 // — prefer the per-txn cache (which carries any uncommitted
362 // primary-root advance from a prior raw write in this txn) and
363 // fall back to the catalog tree if the collection has not been
364 // written in the txn.
365 let descriptor = self.live_descriptor_required(collection)?;
366 let mut pager = lock_pager(self.inner.env())?;
367 let tree = btree_handle(&pager, descriptor.primary_root)?;
368 let key = id.to_be_bytes();
369 match tree.get(&mut pager, &key)? {
370 Some(bytes) => Ok(Some(strip_raw_payload_with_version(
371 &bytes,
372 descriptor.collection_id,
373 )?)),
374 None => Ok(None),
375 }
376 }
377
378 /// #90: resolve the LIVE descriptor for `collection`, preferring
379 /// the per-txn cache (with its in-memory root advances) over the
380 /// catalog tree. Returns `CollectionNotFound` if neither has it.
381 /// Returns an owned clone so the caller does not hold the cache
382 /// lock across the subsequent pager work.
383 fn live_descriptor_required(&self, collection: &str) -> Result<CollectionDescriptor> {
384 {
385 let cache = crate::collection::lock_descriptors(&self.descriptors)?;
386 if let Some(descriptor) = cache.get(collection) {
387 return Ok(descriptor.clone());
388 }
389 }
390 catalog_get_required(&self.inner, &self.catalog, collection)
391 }
392
393 /// **FFI shim**: update the document at `id` in `collection`
394 /// to `payload` bytes. Returns [`Error::DocumentNotFound`] if
395 /// the id is absent.
396 ///
397 /// Forwards to [`Self::update_with_version`] with
398 /// `type_version = RAW_BYTES_TYPE_VERSION`.
399 ///
400 /// # Errors
401 ///
402 /// - [`Error::DocumentNotFound`] if `id` does not exist.
403 /// - [`Error::AttachedDatabaseIsReadOnly`] / [`Error::DocumentTooLarge`]
404 /// etc as [`Self::insert_raw_bytes`].
405 #[doc(hidden)]
406 pub fn update_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
407 self.update_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
408 }
409
410 /// **Engine API**: update the document at `id` in `collection`
411 /// to `payload` bytes, stamping the per-doc header's
412 /// `type_version` with the caller-supplied value.
413 ///
414 /// Companion to [`Self::insert_with_version`] for the typed
415 /// write path.
416 ///
417 /// # Errors
418 ///
419 /// As [`Self::update_raw_bytes`].
420 #[doc(hidden)]
421 pub fn update_with_version(
422 &mut self,
423 collection: &str,
424 id: Id,
425 payload: &[u8],
426 type_version: u32,
427 ) -> Result<()> {
428 reject_namespaced_write(collection)?;
429 // #90: route the descriptor through the per-txn cache so the
430 // primary-root advance coalesces into the single commit flush.
431 let exists = self.collection_exists(collection)?;
432 if !exists {
433 return Err(Error::CollectionNotFound {
434 name: collection.to_owned(),
435 });
436 }
437 let mut pager = lock_pager(self.inner.env())?;
438 let catalog = lock_catalog(&self.catalog)?;
439 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
440 let descriptor =
441 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
442 let key = id.to_be_bytes();
443 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
444 if tree.get(&mut pager, &key)?.is_none() {
445 // The collection name is owned (`String`); the
446 // DocumentNotFound variant's `collection` is `&'static
447 // str` (designed for typed Document::COLLECTION). Use a
448 // bespoke variant when widening would land — for v0 the
449 // closest match is Corruption with a synthetic page_id,
450 // but that's misleading; use InvalidArgument with the
451 // standard "not found" semantics surfaced by the caller.
452 return Err(Error::CollectionNotFound {
453 name: format!("{collection}#{}", id.get()),
454 });
455 }
456 let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
457 tree.delete(&mut pager, &key)?;
458 tree.insert(&mut pager, &key, &bytes)?;
459 descriptor.primary_root = tree.root().get();
460 Ok(())
461 }
462
463 /// **FFI shim**: delete the document at `id` in `collection`.
464 /// Returns `Ok(true)` if it existed, `Ok(false)` if not.
465 ///
466 /// # Errors
467 ///
468 /// Pager / catalog errors propagated.
469 #[doc(hidden)]
470 pub fn delete_raw_bytes(&mut self, collection: &str, id: Id) -> Result<bool> {
471 reject_namespaced_write(collection)?;
472 if !self.collection_exists(collection)? {
473 return Err(Error::CollectionNotFound {
474 name: collection.to_owned(),
475 });
476 }
477 let mut pager = lock_pager(self.inner.env())?;
478 let catalog = lock_catalog(&self.catalog)?;
479 // #90: advance the primary root in the per-txn cache; the
480 // single catalog flush is deferred to commit.
481 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
482 let descriptor =
483 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
484 let key = id.to_be_bytes();
485 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
486 let removed = tree.delete(&mut pager, &key)?;
487 descriptor.primary_root = tree.root().get();
488 Ok(removed)
489 }
490
491 /// **Engine API**: count every document in `collection` as seen
492 /// inside THIS write transaction — i.e. the count reflects the
493 /// txn's own uncommitted inserts / deletes, consistent with
494 /// [`Self::get_with_version`]'s live-read semantics.
495 ///
496 /// Companion to the snapshot-isolated read-side
497 /// [`ReadTxn::count_all_raw`]: the write side descends the LIVE
498 /// primary B-tree at the per-txn cache's current `primary_root`
499 /// (preferring any in-memory root advance from a prior raw write
500 /// in this txn) rather than a pinned reader snapshot. Used by
501 /// obj-py's `WriteCollection.count_all()` so a typed collection
502 /// handle can count uncommitted state.
503 ///
504 /// The full-tree scan does not decode records; the iteration
505 /// count is bounded by the primary tree's entry count and the
506 /// running total is `checked_add`-guarded (Rule 2 / Rule 7) so a
507 /// `> u64::MAX` count surfaces as [`Error::BTreeInvariantViolated`]
508 /// rather than wrapping.
509 ///
510 /// # Errors
511 ///
512 /// - [`Error::CollectionNotFound`] if `collection` does not exist.
513 /// - Pager / B-tree errors propagated from the descent / scan.
514 #[doc(hidden)]
515 pub fn count_all_raw(&mut self, collection: &str) -> Result<u64> {
516 let descriptor = self.live_descriptor_required(collection)?;
517 let mut pager = lock_pager(self.inner.env())?;
518 let tree = btree_handle(&pager, descriptor.primary_root)?;
519 let mut n: u64 = 0;
520 for step in tree.range(&mut pager, ..)? {
521 let _ = step?;
522 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
523 reason: "primary tree entry count exceeds u64",
524 })?;
525 }
526 Ok(n)
527 }
528
529 /// **FFI shim**: insert-or-replace the document at `id` in
530 /// `collection` to `payload` bytes.
531 ///
532 /// Forwards to [`Self::upsert_with_version`] with
533 /// `type_version = RAW_BYTES_TYPE_VERSION`.
534 ///
535 /// # Errors
536 ///
537 /// As [`Self::insert_raw_bytes`].
538 #[doc(hidden)]
539 pub fn upsert_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
540 self.upsert_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
541 }
542
543 /// **Engine API**: insert-or-replace the document at `id` in
544 /// `collection`, stamping the per-doc header's `type_version`
545 /// with the caller-supplied value. Companion to
546 /// [`Self::insert_with_version`] for the typed upsert path.
547 ///
548 /// # Errors
549 ///
550 /// As [`Self::insert_with_version`].
551 #[doc(hidden)]
552 pub fn upsert_with_version(
553 &mut self,
554 collection: &str,
555 id: Id,
556 payload: &[u8],
557 type_version: u32,
558 ) -> Result<()> {
559 reject_namespaced_write(collection)?;
560 let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
561 let mut pager = lock_pager(self.inner.env())?;
562 let catalog = lock_catalog(&self.catalog)?;
563 // #90: advance the primary root in the per-txn cache; flush
564 // once at commit.
565 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
566 let descriptor =
567 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
568 let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
569 let key = id.to_be_bytes();
570 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
571 let _ = tree.delete(&mut pager, &key)?;
572 tree.insert(&mut pager, &key, &bytes)?;
573 descriptor.primary_root = tree.root().get();
574 Ok(())
575 }
576
577 // ---------- index-maintaining raw-bytes writes (FFI) ----------
578
579 /// **Engine API**: insert a raw-bytes document into `collection`
580 /// AND maintain the named secondary indexes from caller-supplied
581 /// field-encoded keys. Returns the freshly-allocated [`Id`].
582 ///
583 /// Unlike [`Self::insert_raw_bytes`] (primary-only), this is the
584 /// schema-bearing raw write: the C ABI cannot reflect index keys
585 /// out of an opaque payload, so the CALLER supplies one
586 /// `(index_name, field_key)` entry per index value, where
587 /// `field_key` is the order-preserving encoding of the indexed
588 /// field (produced by `obj_core::index::encode_field` —
589 /// `libobj::obj_index_key_encode` wraps it). obj does the
590 /// kind-specific STORAGE-key composition (append the `Id` suffix
591 /// for `Standard` / `Each` / `Composite`; use the field key as-is
592 /// with the `Id` as value + enforce uniqueness for `Unique`),
593 /// matching the typed path byte-for-byte.
594 ///
595 /// Atomicity: the primary insert + every index maintenance lands
596 /// inside the same WAL transaction. An unknown index name
597 /// ([`Error::IndexNotFound`]) or a uniqueness violation
598 /// ([`Error::UniqueConstraintViolation`]) surfaces via `?` and the
599 /// surrounding [`WriteTxn`] rolls back the staged primary write
600 /// atomically — no half-written index.
601 ///
602 /// # Errors
603 ///
604 /// - [`Error::IndexNotFound`] if an entry names an index that does
605 /// not exist or is not `Active` on `collection`.
606 /// - [`Error::UniqueConstraintViolation`] if a `Unique` entry's
607 /// key already maps to a different document.
608 /// - As [`Self::insert_with_version`] (namespaced / too-large /
609 /// pager / catalog).
610 pub fn insert_raw_indexed(
611 &mut self,
612 collection: &str,
613 payload: &[u8],
614 type_version: u32,
615 entries: &[(String, Vec<u8>)],
616 ) -> Result<Id> {
617 let id = self.insert_with_version(collection, payload, type_version)?;
618 self.maintain_raw_indexes(collection, id, &[], entries)?;
619 Ok(id)
620 }
621
622 /// **Engine API**: update the document at `id` in `collection` to
623 /// `payload` AND move its secondary-index entries from the OLD
624 /// caller-supplied field keys to the NEW ones.
625 ///
626 /// obj cannot re-derive the OLD index keys from the stored opaque
627 /// bytes, so the caller MUST supply BOTH the `remove` set (the
628 /// field keys the document indexed under before this update) and
629 /// the `add` set (the field keys it indexes under after). Each is
630 /// one `(index_name, field_key)` entry per index value. The
631 /// kind-specific composition + uniqueness enforcement matches
632 /// [`Self::insert_raw_indexed`].
633 ///
634 /// Atomicity: the primary update + every index removal/insertion
635 /// lands in the same WAL transaction; any error rolls the whole
636 /// thing back.
637 ///
638 /// # Errors
639 ///
640 /// - [`Error::CollectionNotFound`] if `id` does not exist.
641 /// - [`Error::IndexNotFound`] / [`Error::UniqueConstraintViolation`]
642 /// as [`Self::insert_raw_indexed`].
643 /// - As [`Self::update_with_version`].
644 pub fn update_raw_indexed(
645 &mut self,
646 collection: &str,
647 id: Id,
648 payload: &[u8],
649 type_version: u32,
650 remove: &[(String, Vec<u8>)],
651 add: &[(String, Vec<u8>)],
652 ) -> Result<()> {
653 self.update_with_version(collection, id, payload, type_version)?;
654 self.maintain_raw_indexes(collection, id, remove, add)?;
655 Ok(())
656 }
657
658 /// **Engine API**: delete the document at `id` in `collection`
659 /// AND remove its secondary-index entries given the caller-
660 /// supplied OLD field keys. Returns `Ok(true)` if the primary
661 /// record existed, `Ok(false)` if not.
662 ///
663 /// As with [`Self::update_raw_indexed`], obj cannot re-derive the
664 /// index keys from stored bytes, so the caller supplies the
665 /// `remove` set (one `(index_name, field_key)` per indexed value).
666 /// The index removals always run (even on `Ok(false)`) so a caller
667 /// can repair a known-stale index entry; this mirrors the typed
668 /// `Collection::delete` which also diffs against the supplied OLD
669 /// key set regardless of primary presence.
670 ///
671 /// # Errors
672 ///
673 /// - [`Error::IndexNotFound`] if a `remove` entry names an unknown
674 /// / non-`Active` index.
675 /// - As [`Self::delete_raw_bytes`].
676 pub fn delete_raw_indexed(
677 &mut self,
678 collection: &str,
679 id: Id,
680 remove: &[(String, Vec<u8>)],
681 ) -> Result<bool> {
682 let removed = self.delete_raw_bytes(collection, id)?;
683 self.maintain_raw_indexes(collection, id, remove, &[])?;
684 Ok(removed)
685 }
686
687 /// **Engine API**: declare / reconcile a runtime [`obj_core::IndexSpec`] set
688 /// into the catalog for `collection`, making each `Active` BEFORE
689 /// any index-maintaining raw write ([`Self::insert_raw_indexed`] &c.
690 /// require the index already `Active`).
691 ///
692 /// This is the NON-generic equivalent of the `#[derive(Document)]`
693 /// reconcile path (`WriteTxn::collection::<T>()`, which reflects
694 /// `T::indexes()`): a caller that has no Rust `Document` type — the
695 /// obj-py / FFI index-declaration path (#108) — supplies the specs
696 /// directly. Both share ONE body
697 /// (`reconcile_specs_once`) so the cache /
698 /// staging / validation / catalog-walk semantics never diverge.
699 ///
700 /// Lazy-creates the collection's catalog row + empty primary B-tree
701 /// on first call (as the typed path does), then runs the same
702 /// `shared ∪ staged` skip-cache: a SECOND call with the same
703 /// `(collection, version)` is a no-op (the underlying
704 /// `Catalog::reconcile_indexes` is itself idempotent for matching
705 /// `(name, kind, key_paths)`). The catalog mutation is staged in the
706 /// live WAL transaction — a rolled-back txn leaves no half-declared
707 /// index, and the per-process reconciled cache is only promoted on a
708 /// successful [`Self::commit`].
709 ///
710 /// # `version` (#130)
711 ///
712 /// The skip-cache is keyed by `(collection, version)`, not by
713 /// `collection` alone, so a LATER schema `version` of the same
714 /// collection that ADDS an index reconciles on its first call rather
715 /// than being skipped. The caller passes the schema version the
716 /// `specs` belong to (e.g. the typed `Document::VERSION` or the
717 /// obj-py `@document` version). One narrow caveat applies when two
718 /// live versions of one collection declare DIFFERENT (conflicting)
719 /// index sets and their writes interleave in a single process — see
720 /// the `reconcile_specs_once` internal docs. Index ADDITION (the
721 /// common monotonic case) is fully correct.
722 ///
723 /// # Errors
724 ///
725 /// - [`Error::InvalidArgument`] if any spec is malformed (validated
726 /// before any catalog mutation).
727 /// - [`Error::IndexKindMismatch`] / [`Error::IndexKeyPathsMismatch`]
728 /// if a spec re-declares an existing `Active` index with a
729 /// different `(kind, key_paths)`.
730 /// - [`Error::Busy`] on a poisoned pager / catalog mutex.
731 /// - Pager / B-tree / postcard errors propagated.
732 pub fn reconcile_indexes_raw(
733 &mut self,
734 collection: &str,
735 version: u32,
736 specs: &[obj_core::IndexSpec],
737 ) -> Result<()> {
738 // Lazy-create the collection first: `reconcile_indexes` errors
739 // with `CollectionNotFound` if the catalog row is absent, and an
740 // FFI caller declaring indexes on a brand-new collection should
741 // not have to issue a separate create. Mirrors the typed path's
742 // `ensure_collection::<T>` → `reconcile_indexes_once::<T>` order.
743 let _descriptor = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
744 crate::collection::reconcile_specs_once(
745 &self.inner,
746 &self.catalog,
747 &self.reconciled,
748 &mut self.reconciled_staged,
749 collection,
750 version,
751 specs,
752 )
753 }
754
755 /// Apply the per-index removal (`old`) + addition (`new`) churn
756 /// for a raw-bytes write, composing the storage key per index
757 /// kind via the shared non-generic seam
758 /// [`crate::index_maint::maintain_index_from_keys`].
759 ///
760 /// Resolves each `(index_name, field_key)` entry to its `Active`
761 /// [`obj_core::IndexDescriptor`] by name, groups the OLD and NEW
762 /// field keys per index, and maintains every index touched by
763 /// either set. The (possibly COW-advanced) descriptor is persisted
764 /// back to the catalog once. Runs entirely under the pager +
765 /// catalog locks held for the whole call, inside the live WAL
766 /// transaction — so a mid-way error rolls back atomically.
767 fn maintain_raw_indexes(
768 &mut self,
769 collection: &str,
770 id: Id,
771 old: &[(String, Vec<u8>)],
772 new: &[(String, Vec<u8>)],
773 ) -> Result<()> {
774 if old.is_empty() && new.is_empty() {
775 return Ok(());
776 }
777 let mut pager = lock_pager(self.inner.env())?;
778 let catalog = lock_catalog(&self.catalog)?;
779 // #90: maintain against the per-txn cached descriptor (the
780 // sole mid-txn source of truth) so the unique pre-check sees
781 // prior eager index writes in this txn and the index-root
782 // advances accumulate in the cache. NO per-call
783 // `Catalog::update` — the single flush is deferred to commit.
784 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
785 let descriptor =
786 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
787 let touched = touched_index_names(old, new);
788 for index_name in &touched {
789 maintain_one_raw_index(&mut pager, descriptor, collection, index_name, old, new, id)?;
790 }
791 Ok(())
792 }
793
794 /// #90: does `collection` have a catalog row (either already in
795 /// the per-txn descriptor cache, or in the live catalog tree)?
796 /// Used by the raw update / delete paths to preserve their
797 /// `CollectionNotFound` contract before routing the descriptor
798 /// through the cache.
799 fn collection_exists(&self, collection: &str) -> Result<bool> {
800 {
801 let cache = crate::collection::lock_descriptors(&self.descriptors)?;
802 if cache.contains_key(collection) {
803 return Ok(true);
804 }
805 }
806 let mut pager = lock_pager(self.inner.env())?;
807 let catalog = lock_catalog(&self.catalog)?;
808 Ok(catalog.get(&mut pager, collection)?.is_some())
809 }
810}
811
812/// Public read transaction. Acquired by
813/// [`crate::Db::read_transaction`].
814///
815/// Carries only the obj-core read-side handle — the writer's live
816/// `Catalog` is NOT held by a `ReadTxn` because reads consult the
817/// snapshot-pinned catalog root via
818/// [`obj_core::Catalog::lookup_via_snapshot`] (M6 #53), not the
819/// live `Catalog.tree.root`.
820///
821/// M11 #93: a `ReadTxn` MAY also carry one `AttachedReadCtx` per
822/// attached database registered on the calling [`crate::Db`]. The
823/// per-attached snapshots are pinned at txn-begin time and released
824/// when the `ReadTxn` drops; reads against `<namespace>.<collection>`
825/// route through them.
826pub struct ReadTxn<'db> {
827 pub(crate) inner: obj_core::ReadTxn<'db, FileHandle>,
828 /// Per-attached-database read contexts, keyed by namespace.
829 /// Populated by [`crate::Db::read_transaction`] before the
830 /// closure runs; emptied when the txn drops.
831 pub(crate) attached: HashMap<String, AttachedReadCtx>,
832}
833
834impl<'db> ReadTxn<'db> {
835 /// Construct a `ReadTxn` from a bare obj-core handle. Public so
836 /// the FFI layer can build an owned read txn whose lifetime
837 /// extends past a single `Db::read_transaction` closure call.
838 ///
839 /// User-Rust callers should reach for `Db::read_transaction`.
840 #[doc(hidden)]
841 #[must_use]
842 pub fn from_parts(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
843 Self {
844 inner,
845 attached: HashMap::new(),
846 }
847 }
848
849 pub(crate) fn new(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
850 Self::from_parts(inner)
851 }
852
853 pub(crate) fn with_attached(
854 inner: obj_core::ReadTxn<'db, FileHandle>,
855 attached: HashMap<String, AttachedReadCtx>,
856 ) -> Self {
857 Self { inner, attached }
858 }
859
860 /// Resolve a (possibly namespaced) collection name to the
861 /// `(env, snapshot, lookup_name)` the raw-bytes read shims should
862 /// read through. A bare `"collection"` resolves against the
863 /// calling Db's own snapshot exactly as before; a
864 /// `"<ns>.<tail>"` name resolves against the read-only database
865 /// attached under `<ns>` (its pinned snapshot), with the namespace
866 /// prefix stripped for the catalog lookup.
867 ///
868 /// Mirrors the namespace dispatch in
869 /// [`crate::collection::Collection::open_readonly_named`] — the
870 /// only other namespace-aware read path — so both honour the same
871 /// `<ns>.<tail>` → attached-snapshot rule.
872 ///
873 /// # Errors
874 ///
875 /// - [`Error::CollectionNamespaceUnknown`] if `collection`
876 /// carries a namespace prefix that is not attached.
877 fn resolve_read_target<'a>(&'a self, collection: &'a str) -> Result<ReadTarget<'a>> {
878 let (namespace, tail) = crate::db::split_namespace(collection);
879 match namespace {
880 None => Ok(ReadTarget {
881 env: self.inner.env(),
882 snapshot: self.inner.snapshot(),
883 lookup_name: collection,
884 }),
885 Some(ns) => {
886 let ctx =
887 self.attached
888 .get(ns)
889 .ok_or_else(|| Error::CollectionNamespaceUnknown {
890 namespace: ns.to_owned(),
891 })?;
892 Ok(ReadTarget {
893 env: ctx.env.as_ref(),
894 snapshot: &ctx.snapshot,
895 lookup_name: tail,
896 })
897 }
898 }
899 }
900
901 /// **FFI shim**: fetch the raw payload of the document at `id`
902 /// in `collection`, snapshot-consistent against the read txn's
903 /// pinned LSN. Returns `Ok(None)` if absent.
904 ///
905 /// Forwards to [`Self::get_with_version`] and discards the
906 /// stored version.
907 ///
908 /// # Errors
909 ///
910 /// - [`Error::CollectionNotFound`] if the collection is unknown.
911 /// - [`Error::Corruption`] if the on-disk record is malformed.
912 /// - Pager / catalog errors propagated.
913 #[doc(hidden)]
914 pub fn get_raw_bytes(&self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
915 Ok(self
916 .get_with_version(collection, id)?
917 .map(|(payload, _version)| payload))
918 }
919
920 /// **Engine API**: fetch the raw payload AND stored
921 /// `type_version` of the document at `id` in `collection`,
922 /// snapshot-consistent against the read txn's pinned LSN.
923 /// Returns `Ok(None)` if absent.
924 ///
925 /// Companion read accessor for the version-aware write path
926 /// ([`WriteTxn::insert_with_version`]) — used by obj-py's
927 /// typed read pipeline to dispatch on the stored header
928 /// version instead of the historical try-decode-walk heuristic.
929 ///
930 /// # Errors
931 ///
932 /// As [`Self::get_raw_bytes`].
933 #[doc(hidden)]
934 pub fn get_with_version(&self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
935 let target = self.resolve_read_target(collection)?;
936 let descriptor = target.collection_descriptor(collection)?;
937 let pager = lock_pager(target.env)?;
938 let root = PageId::new(descriptor.primary_root)
939 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
940 let key = id.to_be_bytes();
941 let bytes = obj_core::btree::BTree::<FileHandle>::get_via_snapshot(
942 &pager,
943 target.snapshot,
944 root,
945 &key,
946 )?;
947 match bytes {
948 Some(b) => Ok(Some(strip_raw_payload_with_version(
949 &b,
950 descriptor.collection_id,
951 )?)),
952 None => Ok(None),
953 }
954 }
955
956 /// **FFI shim**: look up the descriptor for `collection`
957 /// against the snapshot. Returns `Ok(None)` if absent.
958 ///
959 /// Used by [`libobj`](../../libobj/index.html) for query /
960 /// iteration entry points.
961 ///
962 /// # Errors
963 ///
964 /// - Pager / catalog errors propagated.
965 #[doc(hidden)]
966 pub fn snapshot_descriptor(&self, collection: &str) -> Result<Option<CollectionDescriptor>> {
967 read_descriptor_via_snapshot(self.inner.env(), self.inner.snapshot(), collection)
968 }
969
970 /// **FFI shim**: borrow the wrapped obj-core read txn. Used by
971 /// [`libobj`](../../libobj/index.html) iterators that need
972 /// snapshot-aware B-tree access.
973 #[doc(hidden)]
974 #[must_use]
975 pub fn inner(&self) -> &obj_core::ReadTxn<'db, FileHandle> {
976 &self.inner
977 }
978
979 /// **FFI shim**: resolve an `Active` index descriptor by name
980 /// on `collection`. Used by libobj's range / find_unique /
981 /// count paths.
982 ///
983 /// # Errors
984 ///
985 /// - [`Error::CollectionNotFound`] if the collection is absent.
986 /// - [`Error::IndexNotFound`] if the index is unknown or
987 /// `DroppedPending`.
988 /// - Pager / catalog errors propagated.
989 #[doc(hidden)]
990 pub fn snapshot_index_descriptor(
991 &self,
992 collection: &str,
993 index: &str,
994 ) -> Result<obj_core::IndexDescriptor> {
995 let descriptor =
996 self.snapshot_descriptor(collection)?
997 .ok_or_else(|| Error::CollectionNotFound {
998 name: collection.to_owned(),
999 })?;
1000 let entry = descriptor.indexes.iter().find(|d| d.name == index);
1001 match entry {
1002 Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
1003 _ => Err(Error::IndexNotFound {
1004 collection: collection.to_owned(),
1005 name: index.to_owned(),
1006 }),
1007 }
1008 }
1009
1010 /// **FFI shim**: count every doc in `collection` snapshot-
1011 /// consistently against the read txn's pinned LSN.
1012 ///
1013 /// # Errors
1014 ///
1015 /// As [`Self::snapshot_descriptor`] plus pager / B-tree.
1016 #[doc(hidden)]
1017 pub fn count_all_raw(&self, collection: &str) -> Result<u64> {
1018 let target = self.resolve_read_target(collection)?;
1019 let descriptor = target.collection_descriptor(collection)?;
1020 count_via_btree_range_full(target.env, target.snapshot, descriptor.primary_root)
1021 }
1022
1023 /// **FFI shim**: walk an index B-tree by raw-byte key range
1024 /// and collect the matching `(Id, raw_payload)` pairs. The
1025 /// caller is responsible for encoding `lower` / `upper` per the
1026 /// M7 order-preserving encoding.
1027 ///
1028 /// `lower_bound` / `upper_bound` use Rust's `std::ops::Bound`
1029 /// shape (Included / Excluded / Unbounded).
1030 ///
1031 /// Materialises every result in a `Vec` — the result set is
1032 /// bounded by [`obj_core::btree::MAX_RANGE_NODES`] inherited
1033 /// from `BTree::range`. The libobj iterator yields these one
1034 /// at a time.
1035 ///
1036 /// # Errors
1037 ///
1038 /// - [`Error::IndexNotFound`] / [`Error::CollectionNotFound`].
1039 /// - Pager / B-tree errors propagated.
1040 #[doc(hidden)]
1041 pub fn index_range_raw(
1042 &self,
1043 collection: &str,
1044 index: &str,
1045 lower: std::ops::Bound<Vec<u8>>,
1046 upper: std::ops::Bound<Vec<u8>>,
1047 ) -> Result<Vec<(Id, Vec<u8>)>> {
1048 let target = self.resolve_read_target(collection)?;
1049 let index_descriptor = target.index_descriptor(collection, index)?;
1050 let collection_descriptor = target.collection_descriptor(collection)?;
1051 let (start, end) =
1052 crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
1053 let entries = collect_index_range_entries(
1054 target.env,
1055 target.snapshot,
1056 index_descriptor.root_page_id,
1057 start,
1058 end,
1059 )?;
1060 materialize_id_payload_pairs(
1061 target.env,
1062 target.snapshot,
1063 &collection_descriptor,
1064 &index_descriptor,
1065 entries,
1066 )
1067 }
1068
1069 /// **Engine API**: walk an index B-tree by raw-byte key range and
1070 /// collect the matching `(Id, type_version, raw_payload)` rows.
1071 /// Companion to [`Self::index_range_raw`] that ALSO surfaces each
1072 /// record's stored `type_version` (read from the per-doc record
1073 /// header), so a typed range decode can dispatch schema migration at
1074 /// the version each record was actually written under — exactly as
1075 /// [`Self::find_unique_with_version`] does for the single-key path.
1076 ///
1077 /// `lower` / `upper` follow the same raw-byte convention as
1078 /// [`Self::index_range_raw`]; the result set is bounded identically.
1079 ///
1080 /// # Errors
1081 ///
1082 /// As [`Self::index_range_raw`].
1083 #[doc(hidden)]
1084 pub fn index_range_raw_with_version(
1085 &self,
1086 collection: &str,
1087 index: &str,
1088 lower: std::ops::Bound<Vec<u8>>,
1089 upper: std::ops::Bound<Vec<u8>>,
1090 ) -> Result<Vec<(Id, u32, Vec<u8>)>> {
1091 let target = self.resolve_read_target(collection)?;
1092 let index_descriptor = target.index_descriptor(collection, index)?;
1093 let collection_descriptor = target.collection_descriptor(collection)?;
1094 let (start, end) =
1095 crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
1096 let entries = collect_index_range_entries(
1097 target.env,
1098 target.snapshot,
1099 index_descriptor.root_page_id,
1100 start,
1101 end,
1102 )?;
1103 materialize_id_version_payload_rows(
1104 target.env,
1105 target.snapshot,
1106 &collection_descriptor,
1107 &index_descriptor,
1108 entries,
1109 )
1110 }
1111
1112 /// **FFI shim**: count index B-tree entries inside `range`.
1113 /// `lower` / `upper` follow the same raw-byte convention as
1114 /// [`Self::index_range_raw`].
1115 ///
1116 /// # Errors
1117 ///
1118 /// As [`Self::index_range_raw`].
1119 #[doc(hidden)]
1120 pub fn count_index_range_raw(
1121 &self,
1122 collection: &str,
1123 index: &str,
1124 lower: std::ops::Bound<Vec<u8>>,
1125 upper: std::ops::Bound<Vec<u8>>,
1126 ) -> Result<u64> {
1127 let target = self.resolve_read_target(collection)?;
1128 let index_descriptor = target.index_descriptor(collection, index)?;
1129 let (start, end) =
1130 crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
1131 let entries = collect_index_range_entries(
1132 target.env,
1133 target.snapshot,
1134 index_descriptor.root_page_id,
1135 start,
1136 end,
1137 )?;
1138 u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
1139 reason: "index range entry count exceeds u64",
1140 })
1141 }
1142
1143 /// **FFI shim**: single-key lookup against a `Unique` index.
1144 /// Returns the matched `(Id, payload)` or `Ok(None)`.
1145 ///
1146 /// `key_bytes` is the index key, pre-encoded by the caller per
1147 /// the M7 order-preserving scheme.
1148 ///
1149 /// Forwards to [`Self::find_unique_with_version`] and discards
1150 /// the stored version.
1151 ///
1152 /// # Errors
1153 ///
1154 /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
1155 /// - As [`Self::index_range_raw`].
1156 #[doc(hidden)]
1157 pub fn find_unique_raw(
1158 &self,
1159 collection: &str,
1160 index: &str,
1161 key_bytes: &[u8],
1162 ) -> Result<Option<(Id, Vec<u8>)>> {
1163 Ok(self
1164 .find_unique_with_version(collection, index, key_bytes)?
1165 .map(|(id, payload, _version)| (id, payload)))
1166 }
1167
1168 /// **Engine API**: single-key lookup against a `Unique` index,
1169 /// returning the matched `(Id, payload, type_version)` or
1170 /// `Ok(None)`. Companion to [`Self::get_with_version`] for the
1171 /// typed find path.
1172 ///
1173 /// # Errors
1174 ///
1175 /// As [`Self::find_unique_raw`].
1176 #[doc(hidden)]
1177 pub fn find_unique_with_version(
1178 &self,
1179 collection: &str,
1180 index: &str,
1181 key_bytes: &[u8],
1182 ) -> Result<Option<(Id, Vec<u8>, u32)>> {
1183 let target = self.resolve_read_target(collection)?;
1184 let index_descriptor = target.index_descriptor(collection, index)?;
1185 if index_descriptor.kind != obj_core::IndexKind::Unique {
1186 return Err(Error::IndexNotUnique {
1187 collection: collection.to_owned(),
1188 name: index.to_owned(),
1189 });
1190 }
1191 let id_bytes = {
1192 let pager = lock_pager(target.env)?;
1193 let root = PageId::new(index_descriptor.root_page_id)
1194 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1195 BTree::<FileHandle>::get_via_snapshot(&pager, target.snapshot, root, key_bytes)?
1196 };
1197 match id_bytes {
1198 Some(bytes) => {
1199 let id = Id::from_be_bytes(&bytes).ok_or(Error::Corruption { page_id: 0 })?;
1200 match self.get_with_version(collection, id)? {
1201 Some((payload, version)) => Ok(Some((id, payload, version))),
1202 None => Err(Error::Corruption { page_id: 0 }),
1203 }
1204 }
1205 None => Ok(None),
1206 }
1207 }
1208
1209 /// Open a typed handle to the collection `T` lives in.
1210 ///
1211 /// Read-only: returns [`Error::CollectionNotFound`] if the
1212 /// collection has never been registered AT THE SNAPSHOT'S
1213 /// PINNED LSN.
1214 ///
1215 /// If `T::COLLECTION` is of the form `<namespace>.<name>`, the
1216 /// txn dispatches against the attached database registered under
1217 /// `<namespace>` instead of the calling Db.
1218 ///
1219 /// # Errors
1220 ///
1221 /// - [`Error::CollectionNotFound`] if `T::COLLECTION` is not
1222 /// registered in the catalog as-of the snapshot's pinned LSN.
1223 /// - [`Error::CollectionNamespaceUnknown`] if `T::COLLECTION`
1224 /// carries a namespace prefix that is not attached.
1225 /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
1226 pub fn collection<T: Document>(&self) -> Result<Collection<'_, T>> {
1227 Collection::open_readonly(self)
1228 }
1229}
1230
1231/// The `(env, snapshot, lookup_name)` triple a raw-bytes read shim
1232/// should read through, produced by [`ReadTxn::resolve_read_target`].
1233///
1234/// For a bare collection name the fields point at the calling Db's
1235/// own env / pinned snapshot; for a `<ns>.<tail>` name they point at
1236/// the attached read-only Db registered under `<ns>` and `lookup_name`
1237/// is the namespace-stripped `<tail>`. Every field borrows the
1238/// `ReadTxn` for `'a`, so the descriptor / B-tree reads stay pinned to
1239/// the same snapshot for the whole shim call.
1240struct ReadTarget<'a> {
1241 env: &'a TxnEnv<FileHandle>,
1242 snapshot: &'a ReaderSnapshot<FileHandle>,
1243 lookup_name: &'a str,
1244}
1245
1246impl ReadTarget<'_> {
1247 /// Resolve the collection descriptor for [`Self::lookup_name`]
1248 /// against this target's snapshot, surfacing
1249 /// [`Error::CollectionNotFound`] (under the ORIGINAL,
1250 /// possibly-namespaced name) when the collection is absent.
1251 fn collection_descriptor(&self, original: &str) -> Result<CollectionDescriptor> {
1252 read_descriptor_via_snapshot(self.env, self.snapshot, self.lookup_name)?.ok_or_else(|| {
1253 Error::CollectionNotFound {
1254 name: original.to_owned(),
1255 }
1256 })
1257 }
1258
1259 /// Resolve the `Active` index descriptor named `index` on
1260 /// [`Self::lookup_name`] against this target's snapshot. The
1261 /// `original` (possibly-namespaced) name is reported in
1262 /// [`Error::CollectionNotFound`] / [`Error::IndexNotFound`] so
1263 /// the caller sees the name it asked for.
1264 fn index_descriptor(&self, original: &str, index: &str) -> Result<obj_core::IndexDescriptor> {
1265 let descriptor = self.collection_descriptor(original)?;
1266 let entry = descriptor.indexes.iter().find(|d| d.name == index);
1267 match entry {
1268 Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
1269 _ => Err(Error::IndexNotFound {
1270 collection: original.to_owned(),
1271 name: index.to_owned(),
1272 }),
1273 }
1274 }
1275}
1276
1277/// Per-attached-database read context carried inside a [`ReadTxn`].
1278/// Pins one [`ReaderSnapshot`] against the attached database for the
1279/// duration of the calling Db's read transaction.
1280pub(crate) struct AttachedReadCtx {
1281 /// Calling-side stable reference to the attached env. Cloned
1282 /// from the [`AttachedDb`]'s `env` at txn-begin time so the
1283 /// `ReadTxn` does not retain a borrow on the calling Db's
1284 /// attached-registry mutex.
1285 pub(crate) env: Arc<TxnEnv<FileHandle>>,
1286 /// Snapshot pinned at txn-begin against `env`.
1287 pub(crate) snapshot: ReaderSnapshot<FileHandle>,
1288}
1289
1290/// One attached read-only database registered on the calling
1291/// [`crate::Db`] under a namespace.
1292///
1293/// Created by [`crate::Db::attach`]; stored inside the calling Db's
1294/// `attached: Arc<Mutex<HashMap<String, AttachedDb>>>` registry.
1295/// Removed by [`crate::Db::detach`] (or by the calling Db's drop,
1296/// which transitively drops all attachments).
1297pub(crate) struct AttachedDb {
1298 /// Cloned `Arc<TxnEnv>` of the attached db, so read transactions
1299 /// pin snapshots without re-locking the registry.
1300 pub(crate) env: Arc<TxnEnv<FileHandle>>,
1301 /// Calling-side keepalive for the attached `crate::Db`. The
1302 /// underscore prefix marks it as held-for-side-effect: dropping
1303 /// this `_db` releases file locks and any other resources the
1304 /// attached open acquired.
1305 pub(crate) _db: crate::Db,
1306}
1307
1308/// #93 — fold a committed txn's staged reconciled `(collection,
1309/// version)` keys into the shared per-process `reconciled` set. Called
1310/// by [`WriteTxn::commit`] only AFTER the WAL commit has landed, so the
1311/// shared cache is never poisoned by a rolled-back lazy-create.
1312///
1313/// A no-op (no lock acquired) when nothing was staged — the common
1314/// path for a txn that opened only already-reconciled collections, so
1315/// the fast "already reconciled" path pays nothing extra.
1316///
1317/// A poisoned `reconciled` mutex maps to [`Error::Busy`] (Rule 7); the
1318/// loop is bounded by the staged-key count (one entry per distinct
1319/// `(collection, version)` lazily reconciled in the txn — Rule 2).
1320fn promote_reconciled(
1321 reconciled: &Mutex<HashSet<(String, u32)>>,
1322 staged: HashSet<(String, u32)>,
1323) -> Result<()> {
1324 if staged.is_empty() {
1325 return Ok(());
1326 }
1327 let mut shared = reconciled.lock().map_err(|_| Error::Busy {
1328 kind: obj_core::LockKind::WriterInProcess,
1329 })?;
1330 shared.extend(staged);
1331 Ok(())
1332}
1333
1334/// Acquire the catalog mutex; convert a poison error into a
1335/// `WriterInProcess` Busy. Helper shared by the public txn wrappers
1336/// and the [`Collection`] internals.
1337pub(crate) fn lock_catalog(
1338 catalog: &Mutex<Catalog<FileHandle>>,
1339) -> Result<std::sync::MutexGuard<'_, Catalog<FileHandle>>> {
1340 catalog.lock().map_err(|_| Error::Busy {
1341 kind: obj_core::LockKind::WriterInProcess,
1342 })
1343}
1344
1345// ---------- raw-bytes helpers (FFI) -------------------------------
1346
1347/// Reject a write against a namespaced collection. Attached
1348/// databases are read-only through the calling Db (M11 #93).
1349fn reject_namespaced_write(collection: &str) -> Result<()> {
1350 if let (Some(namespace), tail) = crate::db::split_namespace(collection) {
1351 return Err(Error::AttachedDatabaseIsReadOnly {
1352 namespace: namespace.to_owned(),
1353 collection: tail.to_owned(),
1354 });
1355 }
1356 Ok(())
1357}
1358
1359/// Acquire the env's pager mutex; map poison into Busy.
1360fn lock_pager(env: &TxnEnv<FileHandle>) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1361 env.pager().lock().map_err(|_| Error::Busy {
1362 kind: obj_core::LockKind::WriterInProcess,
1363 })
1364}
1365
1366/// Collect the de-duplicated set of index names touched by either the
1367/// `old` (remove) or `new` (add) entry list, preserving first-seen
1368/// order. Bounded by `old.len() + new.len()` — the caller's supplied
1369/// entry count (Rule 2).
1370fn touched_index_names(old: &[(String, Vec<u8>)], new: &[(String, Vec<u8>)]) -> Vec<String> {
1371 let mut names: Vec<String> = Vec::new();
1372 for (name, _key) in old.iter().chain(new.iter()) {
1373 if !names.iter().any(|n| n == name) {
1374 names.push(name.clone());
1375 }
1376 }
1377 names
1378}
1379
1380/// Gather every field-encoded key in `entries` whose index name
1381/// equals `index_name`, wrapped as [`EncodedIndexKey`] for the
1382/// composition seam. Order follows the caller's entry order.
1383fn keys_for_index(entries: &[(String, Vec<u8>)], index_name: &str) -> Vec<EncodedIndexKey> {
1384 entries
1385 .iter()
1386 .filter(|(name, _key)| name == index_name)
1387 .map(|(_name, key)| EncodedIndexKey::from_bytes(key.clone()))
1388 .collect()
1389}
1390
1391/// Resolve `index_name` to its `Active` descriptor index on
1392/// `descriptor`, then maintain that one index B-tree by diffing the
1393/// `old` field keys against the `new` ones through the shared
1394/// non-generic composition seam
1395/// ([`crate::index_maint::maintain_index_from_keys`]).
1396///
1397/// An unknown or non-`Active` index name is [`Error::IndexNotFound`]
1398/// — the raw write refuses rather than silently dropping the entry.
1399fn maintain_one_raw_index(
1400 pager: &mut Pager<FileHandle>,
1401 descriptor: &mut CollectionDescriptor,
1402 collection: &str,
1403 index_name: &str,
1404 old: &[(String, Vec<u8>)],
1405 new: &[(String, Vec<u8>)],
1406 id: Id,
1407) -> Result<()> {
1408 let idx = descriptor
1409 .indexes
1410 .iter()
1411 .position(|d| d.name == index_name && d.status == IndexStatus::Active)
1412 .ok_or_else(|| Error::IndexNotFound {
1413 collection: collection.to_owned(),
1414 name: index_name.to_owned(),
1415 })?;
1416 let spec = crate::index_maint::descriptor_to_spec(&descriptor.indexes[idx])?;
1417 let old_keys = keys_for_index(old, index_name);
1418 let new_keys = keys_for_index(new, index_name);
1419 crate::index_maint::maintain_index_from_keys(
1420 pager, descriptor, idx, collection, &spec, &old_keys, &new_keys, id,
1421 )
1422}
1423
1424/// Ensure a (raw-bytes) collection exists, lazy-creating an empty
1425/// primary B-tree on first call. Used by the C ABI's insert /
1426/// upsert paths. Distinct from `Collection::open_or_create` because
1427/// raw-bytes writes do NOT participate in the typed-reconciliation
1428/// path — no `T::indexes()` to walk.
1429fn ensure_collection_raw(
1430 inner: &obj_core::WriteTxn<'_, FileHandle>,
1431 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1432 name: &str,
1433) -> Result<CollectionDescriptor> {
1434 let mut pager = inner.lock_pager()?;
1435 let mut catalog_guard = lock_catalog(catalog)?;
1436 if let Some(d) = catalog_guard.get(&mut pager, name)? {
1437 return Ok(d);
1438 }
1439 let tree = BTree::<FileHandle>::empty(&mut pager)?;
1440 let descriptor = CollectionDescriptor::new(0, tree.root().get(), RAW_BYTES_TYPE_VERSION);
1441 let _id = catalog_guard.insert(&mut pager, name, descriptor)?;
1442 catalog_guard
1443 .get(&mut pager, name)?
1444 .ok_or(Error::Corruption { page_id: 0 })
1445}
1446
1447/// Look up the descriptor for `name`, returning
1448/// `Err(CollectionNotFound)` if absent. Used by `update` / `delete`
1449/// where lazy-create would mask the caller's mistake.
1450fn catalog_get_required(
1451 inner: &obj_core::WriteTxn<'_, FileHandle>,
1452 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1453 name: &str,
1454) -> Result<CollectionDescriptor> {
1455 let mut pager = inner.lock_pager()?;
1456 let catalog_guard = lock_catalog(catalog)?;
1457 catalog_guard
1458 .get(&mut pager, name)?
1459 .ok_or_else(|| Error::CollectionNotFound {
1460 name: name.to_owned(),
1461 })
1462}
1463
1464/// Snapshot-aware descriptor lookup on the read side. Returns
1465/// `Ok(None)` when the collection is absent at the snapshot's
1466/// pinned LSN.
1467fn read_descriptor_via_snapshot(
1468 env: &TxnEnv<FileHandle>,
1469 snapshot: &ReaderSnapshot<FileHandle>,
1470 name: &str,
1471) -> Result<Option<CollectionDescriptor>> {
1472 let pager = lock_pager(env)?;
1473 Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1474}
1475
1476/// Open a primary-tree handle from a descriptor's `primary_root`.
1477fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1478 let root_pid =
1479 PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1480 BTree::<FileHandle>::open(pager, root_pid)
1481}
1482
1483/// Wrap a raw payload with the on-disk [`DocumentHeader`] carrying
1484/// the caller-supplied `type_version`. The header stamps
1485/// `collection_id` (so a cross-collection forgery is detectable on
1486/// read), `type_version`, `payload_len`, and a CRC32C of the
1487/// payload.
1488///
1489/// The on-disk bytes produced here are byte-identical to what
1490/// [`obj_core::codec::encode`] emits for the same logical payload +
1491/// `type_version` — this is the key plumbing point that closes the
1492/// cross-language header-level interop gap (#13).
1493///
1494/// Returns [`Error::DocumentTooLarge`] if `payload.len() + 16` would
1495/// not fit inline in a B-tree leaf — mirrors
1496/// [`obj_core::codec::encode`]'s overflow handling.
1497fn wrap_raw_payload_with_version(
1498 collection_id: u32,
1499 payload: &[u8],
1500 type_version: u32,
1501) -> Result<Vec<u8>> {
1502 let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
1503 len: payload.len(),
1504 max: MAX_INLINE_DOC,
1505 })?;
1506 let total = DOC_HEADER_SIZE
1507 .checked_add(payload.len())
1508 .ok_or(Error::DocumentTooLarge {
1509 len: usize::MAX,
1510 max: MAX_INLINE_DOC,
1511 })?;
1512 if total > MAX_INLINE_DOC {
1513 return Err(Error::DocumentTooLarge {
1514 len: total,
1515 max: MAX_INLINE_DOC,
1516 });
1517 }
1518 let header = DocumentHeader {
1519 collection_id,
1520 type_version,
1521 payload_len,
1522 payload_crc32c: crc32c(payload),
1523 };
1524 let mut out = Vec::with_capacity(total);
1525 header.write_to(&mut out);
1526 out.extend_from_slice(payload);
1527 Ok(out)
1528}
1529
1530/// Strip the per-doc header and return `(payload, type_version)`.
1531/// Validates the header's `collection_id`, total length, and
1532/// payload CRC32C — surfaces [`Error::Corruption`] /
1533/// [`Error::CollectionIdMismatch`] on any mismatch. Exposes the
1534/// stored `type_version` so the typed read path can dispatch on
1535/// it directly.
1536fn strip_raw_payload_with_version(
1537 bytes: &[u8],
1538 expected_collection_id: u32,
1539) -> Result<(Vec<u8>, u32)> {
1540 let header = DocumentHeader::read_from(bytes)?;
1541 if header.collection_id != expected_collection_id {
1542 return Err(Error::CollectionIdMismatch {
1543 expected: expected_collection_id,
1544 found: header.collection_id,
1545 });
1546 }
1547 let payload_len =
1548 usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
1549 let total = DOC_HEADER_SIZE
1550 .checked_add(payload_len)
1551 .ok_or(Error::Corruption { page_id: 0 })?;
1552 if bytes.len() != total {
1553 return Err(Error::Corruption { page_id: 0 });
1554 }
1555 let payload = &bytes[DOC_HEADER_SIZE..total];
1556 if crc32c(payload) != header.payload_crc32c {
1557 return Err(Error::Corruption { page_id: 0 });
1558 }
1559 Ok((payload.to_vec(), header.type_version))
1560}
1561
1562/// Count every entry in a primary B-tree without decoding the
1563/// records. Used by the FFI `count_all_raw` path.
1564///
1565/// Snapshot-pinned (M12 #12): the full-tree scan resolves every page
1566/// read as-of the read txn's `snapshot`, so a concurrent writer's
1567/// post-snapshot inserts/deletes cannot perturb the count — it stays
1568/// consistent with the read txn's pinned LSN.
1569fn count_via_btree_range_full(
1570 env: &TxnEnv<FileHandle>,
1571 snapshot: &ReaderSnapshot<FileHandle>,
1572 primary_root: u64,
1573) -> Result<u64> {
1574 let pager = lock_pager(env)?;
1575 let root = PageId::new(primary_root)
1576 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1577 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, ..)?;
1578 let mut n: u64 = 0;
1579 for step in iter {
1580 let _ = step?;
1581 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1582 reason: "primary tree entry count exceeds u64",
1583 })?;
1584 }
1585 Ok(n)
1586}
1587
1588/// Walk an index B-tree by raw-byte key range and return every
1589/// (`full_key`, `value_bytes`) entry inside the range. Used by the
1590/// FFI `index_range_raw` / `count_index_range_raw` paths.
1591///
1592/// Snapshot-pinned (M12 #12): the descent and leaf-scan resolve every
1593/// page read as-of the read txn's `snapshot`, so a concurrent writer's
1594/// post-snapshot index entries do not leak into the range/count — the
1595/// enumeration stays consistent with the read txn's pinned LSN.
1596fn collect_index_range_entries(
1597 env: &TxnEnv<FileHandle>,
1598 snapshot: &ReaderSnapshot<FileHandle>,
1599 index_root: u64,
1600 start: std::ops::Bound<Vec<u8>>,
1601 end: std::ops::Bound<Vec<u8>>,
1602) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1603 let pager = lock_pager(env)?;
1604 let root =
1605 PageId::new(index_root).ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1606 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, (start, end))?;
1607 let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1608 for step in iter {
1609 out.push(step?);
1610 }
1611 Ok(out)
1612}
1613
1614/// Resolve a list of (`full_key`, `value_bytes`) index entries into
1615/// `(Id, raw_payload)` pairs. For `Unique` indexes the id is the
1616/// VALUE; for other kinds it is the trailing 8 bytes of the key
1617/// (see `docs/format.md` § Index key encoding).
1618///
1619/// Snapshot-aware: each primary lookup goes through the read
1620/// txn's pinned snapshot so the result set is consistent across
1621/// the call.
1622fn materialize_id_payload_pairs(
1623 env: &TxnEnv<FileHandle>,
1624 snapshot: &ReaderSnapshot<FileHandle>,
1625 collection: &CollectionDescriptor,
1626 index: &obj_core::IndexDescriptor,
1627 entries: Vec<(Vec<u8>, Vec<u8>)>,
1628) -> Result<Vec<(Id, Vec<u8>)>> {
1629 let rows = materialize_id_version_payload_rows(env, snapshot, collection, index, entries)?;
1630 Ok(rows
1631 .into_iter()
1632 .map(|(id, _version, payload)| (id, payload))
1633 .collect())
1634}
1635
1636/// Resolve a list of (`full_key`, `value_bytes`) index entries into
1637/// `(Id, type_version, raw_payload)` rows — the version-carrying form
1638/// of [`materialize_id_payload_pairs`]. The stored `type_version` comes
1639/// from each record's header (via [`strip_raw_payload_with_version`]),
1640/// letting the typed value-form `index_range` decode dispatch migration
1641/// at the version each record was written under. De-duplication +
1642/// id-resolution semantics are identical to the pairs form.
1643fn materialize_id_version_payload_rows(
1644 env: &TxnEnv<FileHandle>,
1645 snapshot: &ReaderSnapshot<FileHandle>,
1646 collection: &CollectionDescriptor,
1647 index: &obj_core::IndexDescriptor,
1648 entries: Vec<(Vec<u8>, Vec<u8>)>,
1649) -> Result<Vec<(Id, u32, Vec<u8>)>> {
1650 let mut out: Vec<(Id, u32, Vec<u8>)> = Vec::with_capacity(entries.len());
1651 let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
1652 let primary_root = PageId::new(collection.primary_root)
1653 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1654 let pager = lock_pager(env)?;
1655 for (full_key, value) in entries {
1656 let id_u64 = index_entry_id(index.kind, &full_key, &value)?;
1657 if index.kind == obj_core::IndexKind::Each && !emitted.insert(id_u64) {
1658 continue;
1659 }
1660 if index.kind != obj_core::IndexKind::Each {
1661 emitted.insert(id_u64);
1662 }
1663 let id = Id::try_new(id_u64).ok_or(Error::Corruption { page_id: 0 })?;
1664 let primary_bytes = BTree::<FileHandle>::get_via_snapshot(
1665 &pager,
1666 snapshot,
1667 primary_root,
1668 &id.to_be_bytes(),
1669 )?
1670 .ok_or(Error::Corruption { page_id: 0 })?;
1671 let (payload, version) =
1672 strip_raw_payload_with_version(&primary_bytes, collection.collection_id)?;
1673 out.push((id, version, payload));
1674 }
1675 Ok(out)
1676}
1677
1678/// Extract the document `Id` (as `u64`) an index entry points at. For
1679/// `Unique` indexes the id is the entry VALUE; for every other kind it
1680/// is the trailing 8 bytes of the full key (see `docs/format.md`
1681/// § Index key encoding). Shared by both materialisers.
1682fn index_entry_id(kind: obj_core::IndexKind, full_key: &[u8], value: &[u8]) -> Result<u64> {
1683 if kind == obj_core::IndexKind::Unique {
1684 return Ok(Id::from_be_bytes(value)
1685 .ok_or(Error::Corruption { page_id: 0 })?
1686 .get());
1687 }
1688 if full_key.len() < 8 {
1689 return Err(Error::Corruption { page_id: 0 });
1690 }
1691 Ok(Id::from_be_bytes(&full_key[full_key.len() - 8..])
1692 .ok_or(Error::Corruption { page_id: 0 })?
1693 .get())
1694}