obj/txn.rs
1//! Public `WriteTxn` / `ReadTxn` types.
2//!
3//! Thin wrappers over `obj_core::txn::{WriteTxn, ReadTxn}` that
4//! attach a [`Catalog`] reference (the obj-core txn types are
5//! catalog-agnostic; the catalog is the obj crate's responsibility).
6//!
7//! The catalog handle is `Arc<Mutex<Catalog<FileHandle>>>`. Lock
8//! ordering: **always acquire the pager mutex (via the txn env)
9//! BEFORE the catalog mutex**.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::{Arc, Mutex};
13
14use obj_core::btree::BTree;
15use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
16use obj_core::index::EncodedIndexKey;
17use obj_core::pager::checksum::crc32c;
18use obj_core::pager::page::PageId;
19use obj_core::pager::Pager;
20use obj_core::platform::FileHandle;
21use obj_core::{
22 Catalog, CollectionDescriptor, Document, Error, Id, IndexStatus, ReaderSnapshot, Result, TxnEnv,
23};
24
25use crate::collection::Collection;
26
/// `type_version` stamped on documents written via the C ABI
/// raw-bytes path.
///
/// The C caller has no Rust `Document::VERSION`, so a fixed `1` is
/// stamped instead: the value is recognisable in dump output, and the
/// existing schema-version-from-future / migration logic still
/// applies when a Rust-typed reader opens the same collection.
///
/// Bumping this constant is breaking for any consumer that has
/// already written raw-bytes data with the old value, so it stays at
/// 1 pre-1.0.
pub(crate) const RAW_BYTES_TYPE_VERSION: u32 = 1;
37
/// Public write transaction.
///
/// Acquired by [`crate::Db::transaction`]. Holds the in-process
/// write-serialization mutex + cross-process `WRITER_LOCK` for its
/// entire lifetime. `commit` / `rollback` consume `self`; dropping
/// without explicitly committing rolls back automatically.
pub struct WriteTxn<'db> {
    /// The wrapped, catalog-agnostic obj-core write transaction.
    pub(crate) inner: obj_core::WriteTxn<'db, FileHandle>,
    /// Shared handle to the live catalog. Lock ordering: acquire the
    /// pager mutex (via the txn env) BEFORE this mutex.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// Per-process cache of collection names whose `T::indexes()`
    /// reconciliation has already run. Reconciliation is idempotent
    /// but expensive (a catalog walk + index declarations); caching
    /// keeps the first `WriteTxn::collection::<T>()` call
    /// per-process per-collection as the only one that pays the cost.
    ///
    /// #93 — membership is promoted into this SHARED set ONLY after a
    /// successful [`Self::commit`]. During a txn the names live in the
    /// per-txn [`Self::reconciled_staged`] set instead, so a
    /// rolled-back first-ever txn never poisons this set into skipping
    /// reconciliation on a later (committed) txn in the same process.
    pub(crate) reconciled: Arc<Mutex<HashSet<String>>>,
    /// #93 — per-transaction staged set of collection names whose
    /// `T::indexes()` reconciliation has run INSIDE this (not-yet-
    /// committed) txn. The skip-check in
    /// [`crate::collection::reconcile_indexes_once`] is `shared ∪
    /// staged`, so a second handle of the same collection in one txn
    /// still skips the (idempotent) catalog walk — but the names are
    /// only folded into the shared [`Self::reconciled`] set by
    /// [`Self::commit`] AFTER the WAL commit succeeds. On rollback /
    /// drop this set is discarded with NO shared-set mutation, so a
    /// rolled-back lazy-create leaves the shared cache untouched and
    /// the next txn re-reconciles correctly.
    ///
    /// Not behind a mutex: a `WriteTxn` is single-threaded (it holds
    /// the write-serialization lock for its whole life and is borrowed
    /// `&mut` by every write), so plain mutation through the staging
    /// helpers is sufficient.
    pub(crate) reconciled_staged: HashSet<String>,
    /// #90 — batch-aware catalog flush. Per-transaction cache of the
    /// LIVE [`CollectionDescriptor`] for every collection touched by
    /// a write, keyed by collection name. This is the SOLE mid-txn
    /// source of truth for `next_id`, `primary_root`, and each
    /// index's `root_page_id`: every write bumps / advances these
    /// IN-MEMORY rather than rewriting the catalog B-tree per doc.
    /// [`Self::commit`] flushes each entry back through
    /// `Catalog::update` exactly ONCE (see
    /// [`Self::flush_descriptors`]). On rollback / drop the cache is
    /// discarded with no catalog-tree side effects.
    ///
    /// The `Arc<Mutex<_>>` is cloned into each [`Collection`]'s
    /// `WriteRef` so two handles of the same collection opened in one
    /// txn share the single entry.
    pub(crate) descriptors: crate::collection::DescriptorCache,
}
93
94impl<'db> WriteTxn<'db> {
95 /// Construct a `WriteTxn` directly from its three pieces.
96 /// Public so the FFI layer ([`libobj`](../../libobj/index.html))
97 /// can build an owned write txn whose lifetime extends past a
98 /// single `Db::transaction` closure call.
99 ///
100 /// User-Rust callers should reach for `Db::transaction` — the
101 /// closure shape handles commit / rollback / drop semantics
102 /// correctly without needing direct construction.
103 #[doc(hidden)]
104 #[must_use]
105 pub fn from_parts(
106 inner: obj_core::WriteTxn<'db, FileHandle>,
107 catalog: Arc<Mutex<Catalog<FileHandle>>>,
108 reconciled: Arc<Mutex<HashSet<String>>>,
109 ) -> Self {
110 Self {
111 inner,
112 catalog,
113 reconciled,
114 reconciled_staged: HashSet::new(),
115 descriptors: crate::collection::new_descriptor_cache(),
116 }
117 }
118
119 pub(crate) fn new(
120 inner: obj_core::WriteTxn<'db, FileHandle>,
121 catalog: Arc<Mutex<Catalog<FileHandle>>>,
122 reconciled: Arc<Mutex<HashSet<String>>>,
123 ) -> Self {
124 Self::from_parts(inner, catalog, reconciled)
125 }
126
127 /// Open a typed handle to the collection `T` lives in.
128 ///
129 /// Lazily creates the catalog row + an empty primary B-tree on
130 /// first call for a given `T` inside the current process. The
131 /// catalog mutation is staged in the same WAL transaction as
132 /// the user's subsequent writes — a rolled-back txn leaves no
133 /// half-created collection.
134 ///
135 /// # Errors
136 ///
137 /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
138 /// - Any error the pager / B-tree / postcard codec returns.
139 pub fn collection<T: Document>(&mut self) -> Result<Collection<'_, T>> {
140 // M11 #93: namespaced collections live in attached
141 // read-only databases. Reject writes eagerly at handle-
142 // open time so the caller does not chase an opaque "no
143 // such collection" error later.
144 if let (Some(namespace), tail) = crate::db::split_namespace(T::COLLECTION) {
145 return Err(Error::AttachedDatabaseIsReadOnly {
146 namespace: namespace.to_owned(),
147 collection: tail.to_owned(),
148 });
149 }
150 Collection::open_or_create(self)
151 }
152
153 /// Commit the transaction.
154 ///
155 /// #90: flushes every cached [`CollectionDescriptor`] back to the
156 /// catalog (one `Catalog::update` per touched collection) BEFORE
157 /// the WAL commit, so the coalesced `next_id` / `primary_root` /
158 /// index-root advances land durably in the same transaction as the
159 /// document + index writes. A flush failure aborts the commit (the
160 /// `?` propagates and `self` drops, rolling the WAL back) rather
161 /// than committing a half-flushed catalog.
162 ///
163 /// #93: AFTER the WAL commit succeeds, fold this txn's staged
164 /// reconciled-collection names into the shared per-process
165 /// [`Self::reconciled`] set, so the expensive `T::indexes()`
166 /// reconciliation is skipped for those collections on later txns.
167 /// Promotion is deliberately POST-commit: a rolled-back txn never
168 /// reaches here, so it cannot poison the shared cache into skipping
169 /// reconciliation against a catalog whose index rows it just rolled
170 /// back. A poisoned `reconciled` mutex maps to `Error::Busy` (Rule
171 /// 7) but does NOT un-commit the durable WAL state.
172 ///
173 /// # Errors
174 ///
175 /// As [`obj_core::WriteTxn::commit`], plus any catalog / pager /
176 /// postcard error surfaced by the descriptor flush, plus
177 /// [`Error::Busy`] if the shared `reconciled` mutex is poisoned at
178 /// promotion time (after the commit has already landed durably).
179 pub fn commit(self) -> Result<()> {
180 self.flush_descriptors()?;
181 // Move the pieces out before consuming `inner`: `commit`
182 // takes `self` by value, and `self.inner.commit()` moves the
183 // inner txn, so the shared/staged handles must be captured
184 // first.
185 let Self {
186 inner,
187 reconciled,
188 reconciled_staged,
189 ..
190 } = self;
191 inner.commit()?;
192 promote_reconciled(&reconciled, reconciled_staged)
193 }
194
195 /// #90: persist every cached descriptor back to the catalog
196 /// B-tree exactly once. Called by [`Self::commit`] before the WAL
197 /// commit. Iterates the per-txn descriptor cache (one entry per
198 /// touched collection — Rule 2 bound is the touched-collection
199 /// count) and issues one `Catalog::update` apiece, propagating the
200 /// first failure via `?` so a partial flush aborts the commit.
201 ///
202 /// Lock order matches every write path: pager BEFORE catalog (the
203 /// descriptor-cache lock is acquired and released first, so it is
204 /// never held across the pager/catalog locks).
205 fn flush_descriptors(&self) -> Result<()> {
206 let entries: Vec<(String, CollectionDescriptor)> = {
207 let cache = crate::collection::lock_descriptors(&self.descriptors)?;
208 if cache.is_empty() {
209 return Ok(());
210 }
211 cache
212 .iter()
213 .map(|(name, descriptor)| (name.clone(), descriptor.clone()))
214 .collect()
215 };
216 let mut pager = lock_pager(self.inner.env())?;
217 let mut catalog = lock_catalog(&self.catalog)?;
218 for (name, descriptor) in &entries {
219 catalog.update(&mut pager, name, descriptor)?;
220 }
221 Ok(())
222 }
223
224 /// Roll the transaction back.
225 ///
226 /// # Errors
227 ///
228 /// As [`obj_core::WriteTxn::rollback`].
229 pub fn rollback(self) -> Result<()> {
230 self.inner.rollback()
231 }
232
233 /// **FFI shim**: insert a raw-bytes document into `collection`,
234 /// returning the freshly-allocated [`Id`].
235 ///
236 /// The payload is stored as-is; the on-disk record carries the
237 /// standard 16-byte [`DocumentHeader`] with
238 /// `type_version = RAW_BYTES_TYPE_VERSION` and a CRC32C of the
239 /// payload. The collection is lazy-created in the same WAL
240 /// transaction if it does not already exist.
241 ///
242 /// **Index maintenance does NOT run** on the raw-bytes path —
243 /// the C ABI's caller has no schema introspection. Documents
244 /// inserted through this path are invisible to indexes built
245 /// by typed [`Document`] writers, until a Rust-side
246 /// `WriteTxn::collection::<T>()` rewrites them.
247 ///
248 /// Forwards to [`Self::insert_with_version`] with
249 /// `type_version = RAW_BYTES_TYPE_VERSION`. Callers that have
250 /// a meaningful schema version to stamp (e.g. obj-py's typed
251 /// path, which knows `cls.__obj_version__`) should call
252 /// [`Self::insert_with_version`] directly.
253 ///
254 /// # Errors
255 ///
256 /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
257 /// collections (attached dbs are read-only).
258 /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
259 /// B-tree inline cap.
260 /// - Pager / catalog errors propagated.
261 #[doc(hidden)]
262 pub fn insert_raw_bytes(&mut self, collection: &str, payload: &[u8]) -> Result<Id> {
263 self.insert_with_version(collection, payload, RAW_BYTES_TYPE_VERSION)
264 }
265
266 /// **Engine API**: insert a raw-bytes document into `collection`
267 /// with a caller-supplied `type_version` stamped in the per-doc
268 /// header. Returns the freshly-allocated [`Id`].
269 ///
270 /// This is the byte-identity entry point for cross-language
271 /// writers (obj-py's typed `Db.insert(instance)` calls this
272 /// with `cls.__obj_version__`, matching what Rust's
273 /// `#[derive(Document)]` stamps via [`obj_core::codec::encode`]).
274 /// Apart from the version source, the behaviour is identical to
275 /// [`Self::insert_raw_bytes`].
276 ///
277 /// **Index maintenance does NOT run** — same caveat as the
278 /// raw-bytes shim. Use [`Self::collection`] for typed writes
279 /// that need index maintenance.
280 ///
281 /// # Errors
282 ///
283 /// - [`Error::AttachedDatabaseIsReadOnly`] for namespaced
284 /// collections.
285 /// - [`Error::DocumentTooLarge`] if `payload.len() + 16` > the
286 /// B-tree inline cap.
287 /// - Pager / catalog errors propagated.
288 #[doc(hidden)]
289 pub fn insert_with_version(
290 &mut self,
291 collection: &str,
292 payload: &[u8],
293 type_version: u32,
294 ) -> Result<Id> {
295 reject_namespaced_write(collection)?;
296 let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
297 let mut pager = lock_pager(self.inner.env())?;
298 let catalog = lock_catalog(&self.catalog)?;
299 // #90: the per-txn descriptor cache is the sole mid-txn source
300 // of truth — `next_id` is an in-memory bump, `primary_root`
301 // advances in the cache, and NO per-doc `Catalog::update`
302 // fires (the single flush is deferred to commit). This keeps
303 // the raw-bytes path coherent with the typed path if both
304 // touch the same collection in one transaction.
305 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
306 let descriptor =
307 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
308 let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || collection.to_owned())?;
309 let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
310 let key = id.to_be_bytes();
311 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
312 tree.insert(&mut pager, &key, &bytes)?;
313 descriptor.primary_root = tree.root().get();
314 Ok(id)
315 }
316
317 /// **FFI shim**: fetch the raw payload of the document at `id`
318 /// in `collection`. Returns `Ok(None)` if absent. The returned
319 /// `Vec<u8>` is the payload only (the 16-byte per-doc header
320 /// is stripped).
321 ///
322 /// Forwards to [`Self::get_with_version`] and discards the
323 /// stored version. Use [`Self::get_with_version`] directly when
324 /// the caller needs the header's `type_version` (e.g. obj-py's
325 /// typed read path, which dispatches schema migration on it).
326 ///
327 /// # Errors
328 ///
329 /// - [`Error::CollectionNotFound`] if the collection is unknown.
330 /// - [`Error::Corruption`] if the on-disk record is malformed.
331 /// - Pager / catalog errors propagated.
332 #[doc(hidden)]
333 pub fn get_raw_bytes(&mut self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
334 Ok(self
335 .get_with_version(collection, id)?
336 .map(|(payload, _version)| payload))
337 }
338
339 /// **Engine API**: fetch the raw payload AND stored
340 /// `type_version` of the document at `id` in `collection`.
341 /// Returns `Ok(None)` if absent.
342 ///
343 /// Companion read accessor for the version-aware write path
344 /// ([`Self::insert_with_version`]) — used by obj-py's typed
345 /// read pipeline to dispatch directly on the stored header
346 /// version instead of the historical try-decode-walk heuristic.
347 ///
348 /// # Errors
349 ///
350 /// As [`Self::get_raw_bytes`].
351 #[doc(hidden)]
352 pub fn get_with_version(&mut self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
353 // #90: a write-side raw get must descend the LIVE primary root
354 // — prefer the per-txn cache (which carries any uncommitted
355 // primary-root advance from a prior raw write in this txn) and
356 // fall back to the catalog tree if the collection has not been
357 // written in the txn.
358 let descriptor = self.live_descriptor_required(collection)?;
359 let mut pager = lock_pager(self.inner.env())?;
360 let tree = btree_handle(&pager, descriptor.primary_root)?;
361 let key = id.to_be_bytes();
362 match tree.get(&mut pager, &key)? {
363 Some(bytes) => Ok(Some(strip_raw_payload_with_version(
364 &bytes,
365 descriptor.collection_id,
366 )?)),
367 None => Ok(None),
368 }
369 }
370
371 /// #90: resolve the LIVE descriptor for `collection`, preferring
372 /// the per-txn cache (with its in-memory root advances) over the
373 /// catalog tree. Returns `CollectionNotFound` if neither has it.
374 /// Returns an owned clone so the caller does not hold the cache
375 /// lock across the subsequent pager work.
376 fn live_descriptor_required(&self, collection: &str) -> Result<CollectionDescriptor> {
377 {
378 let cache = crate::collection::lock_descriptors(&self.descriptors)?;
379 if let Some(descriptor) = cache.get(collection) {
380 return Ok(descriptor.clone());
381 }
382 }
383 catalog_get_required(&self.inner, &self.catalog, collection)
384 }
385
386 /// **FFI shim**: update the document at `id` in `collection`
387 /// to `payload` bytes. Returns [`Error::DocumentNotFound`] if
388 /// the id is absent.
389 ///
390 /// Forwards to [`Self::update_with_version`] with
391 /// `type_version = RAW_BYTES_TYPE_VERSION`.
392 ///
393 /// # Errors
394 ///
395 /// - [`Error::DocumentNotFound`] if `id` does not exist.
396 /// - [`Error::AttachedDatabaseIsReadOnly`] / [`Error::DocumentTooLarge`]
397 /// etc as [`Self::insert_raw_bytes`].
398 #[doc(hidden)]
399 pub fn update_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
400 self.update_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
401 }
402
403 /// **Engine API**: update the document at `id` in `collection`
404 /// to `payload` bytes, stamping the per-doc header's
405 /// `type_version` with the caller-supplied value.
406 ///
407 /// Companion to [`Self::insert_with_version`] for the typed
408 /// write path.
409 ///
410 /// # Errors
411 ///
412 /// As [`Self::update_raw_bytes`].
413 #[doc(hidden)]
414 pub fn update_with_version(
415 &mut self,
416 collection: &str,
417 id: Id,
418 payload: &[u8],
419 type_version: u32,
420 ) -> Result<()> {
421 reject_namespaced_write(collection)?;
422 // #90: route the descriptor through the per-txn cache so the
423 // primary-root advance coalesces into the single commit flush.
424 let exists = self.collection_exists(collection)?;
425 if !exists {
426 return Err(Error::CollectionNotFound {
427 name: collection.to_owned(),
428 });
429 }
430 let mut pager = lock_pager(self.inner.env())?;
431 let catalog = lock_catalog(&self.catalog)?;
432 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
433 let descriptor =
434 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
435 let key = id.to_be_bytes();
436 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
437 if tree.get(&mut pager, &key)?.is_none() {
438 // The collection name is owned (`String`); the
439 // DocumentNotFound variant's `collection` is `&'static
440 // str` (designed for typed Document::COLLECTION). Use a
441 // bespoke variant when widening would land — for v0 the
442 // closest match is Corruption with a synthetic page_id,
443 // but that's misleading; use InvalidArgument with the
444 // standard "not found" semantics surfaced by the caller.
445 return Err(Error::CollectionNotFound {
446 name: format!("{collection}#{}", id.get()),
447 });
448 }
449 let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
450 tree.delete(&mut pager, &key)?;
451 tree.insert(&mut pager, &key, &bytes)?;
452 descriptor.primary_root = tree.root().get();
453 Ok(())
454 }
455
456 /// **FFI shim**: delete the document at `id` in `collection`.
457 /// Returns `Ok(true)` if it existed, `Ok(false)` if not.
458 ///
459 /// # Errors
460 ///
461 /// Pager / catalog errors propagated.
462 #[doc(hidden)]
463 pub fn delete_raw_bytes(&mut self, collection: &str, id: Id) -> Result<bool> {
464 reject_namespaced_write(collection)?;
465 if !self.collection_exists(collection)? {
466 return Err(Error::CollectionNotFound {
467 name: collection.to_owned(),
468 });
469 }
470 let mut pager = lock_pager(self.inner.env())?;
471 let catalog = lock_catalog(&self.catalog)?;
472 // #90: advance the primary root in the per-txn cache; the
473 // single catalog flush is deferred to commit.
474 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
475 let descriptor =
476 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
477 let key = id.to_be_bytes();
478 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
479 let removed = tree.delete(&mut pager, &key)?;
480 descriptor.primary_root = tree.root().get();
481 Ok(removed)
482 }
483
484 /// **FFI shim**: insert-or-replace the document at `id` in
485 /// `collection` to `payload` bytes.
486 ///
487 /// Forwards to [`Self::upsert_with_version`] with
488 /// `type_version = RAW_BYTES_TYPE_VERSION`.
489 ///
490 /// # Errors
491 ///
492 /// As [`Self::insert_raw_bytes`].
493 #[doc(hidden)]
494 pub fn upsert_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
495 self.upsert_with_version(collection, id, payload, RAW_BYTES_TYPE_VERSION)
496 }
497
498 /// **Engine API**: insert-or-replace the document at `id` in
499 /// `collection`, stamping the per-doc header's `type_version`
500 /// with the caller-supplied value. Companion to
501 /// [`Self::insert_with_version`] for the typed upsert path.
502 ///
503 /// # Errors
504 ///
505 /// As [`Self::insert_with_version`].
506 #[doc(hidden)]
507 pub fn upsert_with_version(
508 &mut self,
509 collection: &str,
510 id: Id,
511 payload: &[u8],
512 type_version: u32,
513 ) -> Result<()> {
514 reject_namespaced_write(collection)?;
515 let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
516 let mut pager = lock_pager(self.inner.env())?;
517 let catalog = lock_catalog(&self.catalog)?;
518 // #90: advance the primary root in the per-txn cache; flush
519 // once at commit.
520 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
521 let descriptor =
522 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
523 let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
524 let key = id.to_be_bytes();
525 let mut tree = btree_handle(&pager, descriptor.primary_root)?;
526 let _ = tree.delete(&mut pager, &key)?;
527 tree.insert(&mut pager, &key, &bytes)?;
528 descriptor.primary_root = tree.root().get();
529 Ok(())
530 }
531
532 // ---------- index-maintaining raw-bytes writes (FFI) ----------
533
534 /// **Engine API**: insert a raw-bytes document into `collection`
535 /// AND maintain the named secondary indexes from caller-supplied
536 /// field-encoded keys. Returns the freshly-allocated [`Id`].
537 ///
538 /// Unlike [`Self::insert_raw_bytes`] (primary-only), this is the
539 /// schema-bearing raw write: the C ABI cannot reflect index keys
540 /// out of an opaque payload, so the CALLER supplies one
541 /// `(index_name, field_key)` entry per index value, where
542 /// `field_key` is the order-preserving encoding of the indexed
543 /// field (produced by `obj_core::index::encode_field` —
544 /// `libobj::obj_index_key_encode` wraps it). obj does the
545 /// kind-specific STORAGE-key composition (append the `Id` suffix
546 /// for `Standard` / `Each` / `Composite`; use the field key as-is
547 /// with the `Id` as value + enforce uniqueness for `Unique`),
548 /// matching the typed path byte-for-byte.
549 ///
550 /// Atomicity: the primary insert + every index maintenance lands
551 /// inside the same WAL transaction. An unknown index name
552 /// ([`Error::IndexNotFound`]) or a uniqueness violation
553 /// ([`Error::UniqueConstraintViolation`]) surfaces via `?` and the
554 /// surrounding [`WriteTxn`] rolls back the staged primary write
555 /// atomically — no half-written index.
556 ///
557 /// # Errors
558 ///
559 /// - [`Error::IndexNotFound`] if an entry names an index that does
560 /// not exist or is not `Active` on `collection`.
561 /// - [`Error::UniqueConstraintViolation`] if a `Unique` entry's
562 /// key already maps to a different document.
563 /// - As [`Self::insert_with_version`] (namespaced / too-large /
564 /// pager / catalog).
565 pub fn insert_raw_indexed(
566 &mut self,
567 collection: &str,
568 payload: &[u8],
569 type_version: u32,
570 entries: &[(String, Vec<u8>)],
571 ) -> Result<Id> {
572 let id = self.insert_with_version(collection, payload, type_version)?;
573 self.maintain_raw_indexes(collection, id, &[], entries)?;
574 Ok(id)
575 }
576
577 /// **Engine API**: update the document at `id` in `collection` to
578 /// `payload` AND move its secondary-index entries from the OLD
579 /// caller-supplied field keys to the NEW ones.
580 ///
581 /// obj cannot re-derive the OLD index keys from the stored opaque
582 /// bytes, so the caller MUST supply BOTH the `remove` set (the
583 /// field keys the document indexed under before this update) and
584 /// the `add` set (the field keys it indexes under after). Each is
585 /// one `(index_name, field_key)` entry per index value. The
586 /// kind-specific composition + uniqueness enforcement matches
587 /// [`Self::insert_raw_indexed`].
588 ///
589 /// Atomicity: the primary update + every index removal/insertion
590 /// lands in the same WAL transaction; any error rolls the whole
591 /// thing back.
592 ///
593 /// # Errors
594 ///
595 /// - [`Error::CollectionNotFound`] if `id` does not exist.
596 /// - [`Error::IndexNotFound`] / [`Error::UniqueConstraintViolation`]
597 /// as [`Self::insert_raw_indexed`].
598 /// - As [`Self::update_with_version`].
599 pub fn update_raw_indexed(
600 &mut self,
601 collection: &str,
602 id: Id,
603 payload: &[u8],
604 type_version: u32,
605 remove: &[(String, Vec<u8>)],
606 add: &[(String, Vec<u8>)],
607 ) -> Result<()> {
608 self.update_with_version(collection, id, payload, type_version)?;
609 self.maintain_raw_indexes(collection, id, remove, add)?;
610 Ok(())
611 }
612
613 /// **Engine API**: delete the document at `id` in `collection`
614 /// AND remove its secondary-index entries given the caller-
615 /// supplied OLD field keys. Returns `Ok(true)` if the primary
616 /// record existed, `Ok(false)` if not.
617 ///
618 /// As with [`Self::update_raw_indexed`], obj cannot re-derive the
619 /// index keys from stored bytes, so the caller supplies the
620 /// `remove` set (one `(index_name, field_key)` per indexed value).
621 /// The index removals always run (even on `Ok(false)`) so a caller
622 /// can repair a known-stale index entry; this mirrors the typed
623 /// `Collection::delete` which also diffs against the supplied OLD
624 /// key set regardless of primary presence.
625 ///
626 /// # Errors
627 ///
628 /// - [`Error::IndexNotFound`] if a `remove` entry names an unknown
629 /// / non-`Active` index.
630 /// - As [`Self::delete_raw_bytes`].
631 pub fn delete_raw_indexed(
632 &mut self,
633 collection: &str,
634 id: Id,
635 remove: &[(String, Vec<u8>)],
636 ) -> Result<bool> {
637 let removed = self.delete_raw_bytes(collection, id)?;
638 self.maintain_raw_indexes(collection, id, remove, &[])?;
639 Ok(removed)
640 }
641
642 /// Apply the per-index removal (`old`) + addition (`new`) churn
643 /// for a raw-bytes write, composing the storage key per index
644 /// kind via the shared non-generic seam
645 /// [`crate::index_maint::maintain_index_from_keys`].
646 ///
647 /// Resolves each `(index_name, field_key)` entry to its `Active`
648 /// [`obj_core::IndexDescriptor`] by name, groups the OLD and NEW
649 /// field keys per index, and maintains every index touched by
650 /// either set. The (possibly COW-advanced) descriptor is persisted
651 /// back to the catalog once. Runs entirely under the pager +
652 /// catalog locks held for the whole call, inside the live WAL
653 /// transaction — so a mid-way error rolls back atomically.
654 fn maintain_raw_indexes(
655 &mut self,
656 collection: &str,
657 id: Id,
658 old: &[(String, Vec<u8>)],
659 new: &[(String, Vec<u8>)],
660 ) -> Result<()> {
661 if old.is_empty() && new.is_empty() {
662 return Ok(());
663 }
664 let mut pager = lock_pager(self.inner.env())?;
665 let catalog = lock_catalog(&self.catalog)?;
666 // #90: maintain against the per-txn cached descriptor (the
667 // sole mid-txn source of truth) so the unique pre-check sees
668 // prior eager index writes in this txn and the index-root
669 // advances accumulate in the cache. NO per-call
670 // `Catalog::update` — the single flush is deferred to commit.
671 let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
672 let descriptor =
673 crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
674 let touched = touched_index_names(old, new);
675 for index_name in &touched {
676 maintain_one_raw_index(&mut pager, descriptor, collection, index_name, old, new, id)?;
677 }
678 Ok(())
679 }
680
681 /// #90: does `collection` have a catalog row (either already in
682 /// the per-txn descriptor cache, or in the live catalog tree)?
683 /// Used by the raw update / delete paths to preserve their
684 /// `CollectionNotFound` contract before routing the descriptor
685 /// through the cache.
686 fn collection_exists(&self, collection: &str) -> Result<bool> {
687 {
688 let cache = crate::collection::lock_descriptors(&self.descriptors)?;
689 if cache.contains_key(collection) {
690 return Ok(true);
691 }
692 }
693 let mut pager = lock_pager(self.inner.env())?;
694 let catalog = lock_catalog(&self.catalog)?;
695 Ok(catalog.get(&mut pager, collection)?.is_some())
696 }
697}
698
/// Public read transaction. Acquired by
/// [`crate::Db::read_transaction`].
///
/// Carries only the obj-core read-side handle — the writer's live
/// `Catalog` is NOT held by a `ReadTxn` because reads consult the
/// snapshot-pinned catalog root via
/// [`obj_core::Catalog::lookup_via_snapshot`] (M6 #53), not the
/// live `Catalog.tree.root`.
///
/// M11 #93: a `ReadTxn` MAY also carry one `AttachedReadCtx` per
/// attached database registered on the calling [`crate::Db`]. The
/// per-attached snapshots are pinned at txn-begin time and released
/// when the `ReadTxn` drops; reads against `<namespace>.<collection>`
/// route through them.
pub struct ReadTxn<'db> {
    /// The wrapped obj-core read transaction; supplies the env and
    /// the pinned snapshot used by the raw read accessors.
    pub(crate) inner: obj_core::ReadTxn<'db, FileHandle>,
    /// Per-attached-database read contexts, keyed by namespace.
    /// Populated by [`crate::Db::read_transaction`] before the
    /// closure runs; emptied when the txn drops.
    pub(crate) attached: HashMap<String, AttachedReadCtx>,
}
720
721impl<'db> ReadTxn<'db> {
722 /// Construct a `ReadTxn` from a bare obj-core handle. Public so
723 /// the FFI layer can build an owned read txn whose lifetime
724 /// extends past a single `Db::read_transaction` closure call.
725 ///
726 /// User-Rust callers should reach for `Db::read_transaction`.
727 #[doc(hidden)]
728 #[must_use]
729 pub fn from_parts(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
730 Self {
731 inner,
732 attached: HashMap::new(),
733 }
734 }
735
736 pub(crate) fn new(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
737 Self::from_parts(inner)
738 }
739
740 pub(crate) fn with_attached(
741 inner: obj_core::ReadTxn<'db, FileHandle>,
742 attached: HashMap<String, AttachedReadCtx>,
743 ) -> Self {
744 Self { inner, attached }
745 }
746
747 /// **FFI shim**: fetch the raw payload of the document at `id`
748 /// in `collection`, snapshot-consistent against the read txn's
749 /// pinned LSN. Returns `Ok(None)` if absent.
750 ///
751 /// Forwards to [`Self::get_with_version`] and discards the
752 /// stored version.
753 ///
754 /// # Errors
755 ///
756 /// - [`Error::CollectionNotFound`] if the collection is unknown.
757 /// - [`Error::Corruption`] if the on-disk record is malformed.
758 /// - Pager / catalog errors propagated.
759 #[doc(hidden)]
760 pub fn get_raw_bytes(&self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
761 Ok(self
762 .get_with_version(collection, id)?
763 .map(|(payload, _version)| payload))
764 }
765
766 /// **Engine API**: fetch the raw payload AND stored
767 /// `type_version` of the document at `id` in `collection`,
768 /// snapshot-consistent against the read txn's pinned LSN.
769 /// Returns `Ok(None)` if absent.
770 ///
771 /// Companion read accessor for the version-aware write path
772 /// ([`WriteTxn::insert_with_version`]) — used by obj-py's
773 /// typed read pipeline to dispatch on the stored header
774 /// version instead of the historical try-decode-walk heuristic.
775 ///
776 /// # Errors
777 ///
778 /// As [`Self::get_raw_bytes`].
779 #[doc(hidden)]
780 pub fn get_with_version(&self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
781 let descriptor =
782 read_descriptor_via_snapshot(self.inner.env(), self.inner.snapshot(), collection)?
783 .ok_or_else(|| Error::CollectionNotFound {
784 name: collection.to_owned(),
785 })?;
786 let pager = lock_pager(self.inner.env())?;
787 let root = PageId::new(descriptor.primary_root)
788 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
789 let key = id.to_be_bytes();
790 let bytes = obj_core::btree::BTree::<FileHandle>::get_via_snapshot(
791 &pager,
792 self.inner.snapshot(),
793 root,
794 &key,
795 )?;
796 match bytes {
797 Some(b) => Ok(Some(strip_raw_payload_with_version(
798 &b,
799 descriptor.collection_id,
800 )?)),
801 None => Ok(None),
802 }
803 }
804
805 /// **FFI shim**: look up the descriptor for `collection`
806 /// against the snapshot. Returns `Ok(None)` if absent.
807 ///
808 /// Used by [`libobj`](../../libobj/index.html) for query /
809 /// iteration entry points.
810 ///
811 /// # Errors
812 ///
813 /// - Pager / catalog errors propagated.
814 #[doc(hidden)]
815 pub fn snapshot_descriptor(&self, collection: &str) -> Result<Option<CollectionDescriptor>> {
816 read_descriptor_via_snapshot(self.inner.env(), self.inner.snapshot(), collection)
817 }
818
819 /// **FFI shim**: borrow the wrapped obj-core read txn. Used by
820 /// [`libobj`](../../libobj/index.html) iterators that need
821 /// snapshot-aware B-tree access.
822 #[doc(hidden)]
823 #[must_use]
824 pub fn inner(&self) -> &obj_core::ReadTxn<'db, FileHandle> {
825 &self.inner
826 }
827
828 /// **FFI shim**: resolve an `Active` index descriptor by name
829 /// on `collection`. Used by libobj's range / find_unique /
830 /// count paths.
831 ///
832 /// # Errors
833 ///
834 /// - [`Error::CollectionNotFound`] if the collection is absent.
835 /// - [`Error::IndexNotFound`] if the index is unknown or
836 /// `DroppedPending`.
837 /// - Pager / catalog errors propagated.
838 #[doc(hidden)]
839 pub fn snapshot_index_descriptor(
840 &self,
841 collection: &str,
842 index: &str,
843 ) -> Result<obj_core::IndexDescriptor> {
844 let descriptor =
845 self.snapshot_descriptor(collection)?
846 .ok_or_else(|| Error::CollectionNotFound {
847 name: collection.to_owned(),
848 })?;
849 let entry = descriptor.indexes.iter().find(|d| d.name == index);
850 match entry {
851 Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
852 _ => Err(Error::IndexNotFound {
853 collection: collection.to_owned(),
854 name: index.to_owned(),
855 }),
856 }
857 }
858
859 /// **FFI shim**: count every doc in `collection` snapshot-
860 /// consistently against the read txn's pinned LSN.
861 ///
862 /// # Errors
863 ///
864 /// As [`Self::snapshot_descriptor`] plus pager / B-tree.
865 #[doc(hidden)]
866 pub fn count_all_raw(&self, collection: &str) -> Result<u64> {
867 let descriptor =
868 self.snapshot_descriptor(collection)?
869 .ok_or_else(|| Error::CollectionNotFound {
870 name: collection.to_owned(),
871 })?;
872 count_via_btree_range_full(
873 self.inner.env(),
874 self.inner.snapshot(),
875 descriptor.primary_root,
876 )
877 }
878
879 /// **FFI shim**: walk an index B-tree by raw-byte key range
880 /// and collect the matching `(Id, raw_payload)` pairs. The
881 /// caller is responsible for encoding `lower` / `upper` per the
882 /// M7 order-preserving encoding.
883 ///
884 /// `lower_bound` / `upper_bound` use Rust's `std::ops::Bound`
885 /// shape (Included / Excluded / Unbounded).
886 ///
887 /// Materialises every result in a `Vec` — the result set is
888 /// bounded by [`obj_core::btree::MAX_RANGE_NODES`] inherited
889 /// from `BTree::range`. The libobj iterator yields these one
890 /// at a time.
891 ///
892 /// # Errors
893 ///
894 /// - [`Error::IndexNotFound`] / [`Error::CollectionNotFound`].
895 /// - Pager / B-tree errors propagated.
896 #[doc(hidden)]
897 pub fn index_range_raw(
898 &self,
899 collection: &str,
900 index: &str,
901 lower: std::ops::Bound<Vec<u8>>,
902 upper: std::ops::Bound<Vec<u8>>,
903 ) -> Result<Vec<(Id, Vec<u8>)>> {
904 let index_descriptor = self.snapshot_index_descriptor(collection, index)?;
905 let collection_descriptor =
906 self.snapshot_descriptor(collection)?
907 .ok_or_else(|| Error::CollectionNotFound {
908 name: collection.to_owned(),
909 })?;
910 let (start, end) =
911 crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
912 let entries = collect_index_range_entries(
913 self.inner.env(),
914 self.inner.snapshot(),
915 index_descriptor.root_page_id,
916 start,
917 end,
918 )?;
919 materialize_id_payload_pairs(
920 self.inner.env(),
921 self.inner.snapshot(),
922 &collection_descriptor,
923 &index_descriptor,
924 entries,
925 )
926 }
927
928 /// **FFI shim**: count index B-tree entries inside `range`.
929 /// `lower` / `upper` follow the same raw-byte convention as
930 /// [`Self::index_range_raw`].
931 ///
932 /// # Errors
933 ///
934 /// As [`Self::index_range_raw`].
935 #[doc(hidden)]
936 pub fn count_index_range_raw(
937 &self,
938 collection: &str,
939 index: &str,
940 lower: std::ops::Bound<Vec<u8>>,
941 upper: std::ops::Bound<Vec<u8>>,
942 ) -> Result<u64> {
943 let index_descriptor = self.snapshot_index_descriptor(collection, index)?;
944 let (start, end) =
945 crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
946 let entries = collect_index_range_entries(
947 self.inner.env(),
948 self.inner.snapshot(),
949 index_descriptor.root_page_id,
950 start,
951 end,
952 )?;
953 u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
954 reason: "index range entry count exceeds u64",
955 })
956 }
957
958 /// **FFI shim**: single-key lookup against a `Unique` index.
959 /// Returns the matched `(Id, payload)` or `Ok(None)`.
960 ///
961 /// `key_bytes` is the index key, pre-encoded by the caller per
962 /// the M7 order-preserving scheme.
963 ///
964 /// Forwards to [`Self::find_unique_with_version`] and discards
965 /// the stored version.
966 ///
967 /// # Errors
968 ///
969 /// - [`Error::IndexNotUnique`] if the index is not `Unique`.
970 /// - As [`Self::index_range_raw`].
971 #[doc(hidden)]
972 pub fn find_unique_raw(
973 &self,
974 collection: &str,
975 index: &str,
976 key_bytes: &[u8],
977 ) -> Result<Option<(Id, Vec<u8>)>> {
978 Ok(self
979 .find_unique_with_version(collection, index, key_bytes)?
980 .map(|(id, payload, _version)| (id, payload)))
981 }
982
983 /// **Engine API**: single-key lookup against a `Unique` index,
984 /// returning the matched `(Id, payload, type_version)` or
985 /// `Ok(None)`. Companion to [`Self::get_with_version`] for the
986 /// typed find path.
987 ///
988 /// # Errors
989 ///
990 /// As [`Self::find_unique_raw`].
991 #[doc(hidden)]
992 pub fn find_unique_with_version(
993 &self,
994 collection: &str,
995 index: &str,
996 key_bytes: &[u8],
997 ) -> Result<Option<(Id, Vec<u8>, u32)>> {
998 let index_descriptor = self.snapshot_index_descriptor(collection, index)?;
999 if index_descriptor.kind != obj_core::IndexKind::Unique {
1000 return Err(Error::IndexNotUnique {
1001 collection: collection.to_owned(),
1002 name: index.to_owned(),
1003 });
1004 }
1005 let id_bytes = {
1006 let pager = lock_pager(self.inner.env())?;
1007 let root = PageId::new(index_descriptor.root_page_id)
1008 .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1009 BTree::<FileHandle>::get_via_snapshot(&pager, self.inner.snapshot(), root, key_bytes)?
1010 };
1011 match id_bytes {
1012 Some(bytes) => {
1013 let id = Id::from_be_bytes(&bytes).ok_or(Error::Corruption { page_id: 0 })?;
1014 match self.get_with_version(collection, id)? {
1015 Some((payload, version)) => Ok(Some((id, payload, version))),
1016 None => Err(Error::Corruption { page_id: 0 }),
1017 }
1018 }
1019 None => Ok(None),
1020 }
1021 }
1022
1023 /// Open a typed handle to the collection `T` lives in.
1024 ///
1025 /// Read-only: returns [`Error::CollectionNotFound`] if the
1026 /// collection has never been registered AT THE SNAPSHOT'S
1027 /// PINNED LSN.
1028 ///
1029 /// If `T::COLLECTION` is of the form `<namespace>.<name>`, the
1030 /// txn dispatches against the attached database registered under
1031 /// `<namespace>` instead of the calling Db.
1032 ///
1033 /// # Errors
1034 ///
1035 /// - [`Error::CollectionNotFound`] if `T::COLLECTION` is not
1036 /// registered in the catalog as-of the snapshot's pinned LSN.
1037 /// - [`Error::CollectionNamespaceUnknown`] if `T::COLLECTION`
1038 /// carries a namespace prefix that is not attached.
1039 /// - [`Error::Busy`] if the pager / catalog mutex is poisoned.
1040 pub fn collection<T: Document>(&self) -> Result<Collection<'_, T>> {
1041 Collection::open_readonly(self)
1042 }
1043}
1044
1045/// Per-attached-database read context carried inside a [`ReadTxn`].
1046/// Pins one [`ReaderSnapshot`] against the attached database for the
1047/// duration of the calling Db's read transaction.
1048pub(crate) struct AttachedReadCtx {
1049 /// Calling-side stable reference to the attached env. Cloned
1050 /// from the [`AttachedDb`]'s `env` at txn-begin time so the
1051 /// `ReadTxn` does not retain a borrow on the calling Db's
1052 /// attached-registry mutex.
1053 pub(crate) env: Arc<TxnEnv<FileHandle>>,
1054 /// Snapshot pinned at txn-begin against `env`.
1055 pub(crate) snapshot: ReaderSnapshot<FileHandle>,
1056}
1057
1058/// One attached read-only database registered on the calling
1059/// [`crate::Db`] under a namespace.
1060///
1061/// Created by [`crate::Db::attach`]; stored inside the calling Db's
1062/// `attached: Arc<Mutex<HashMap<String, AttachedDb>>>` registry.
1063/// Removed by [`crate::Db::detach`] (or by the calling Db's drop,
1064/// which transitively drops all attachments).
1065pub(crate) struct AttachedDb {
1066 /// Cloned `Arc<TxnEnv>` of the attached db, so read transactions
1067 /// pin snapshots without re-locking the registry.
1068 pub(crate) env: Arc<TxnEnv<FileHandle>>,
1069 /// Calling-side keepalive for the attached `crate::Db`. The
1070 /// underscore prefix marks it as held-for-side-effect: dropping
1071 /// this `_db` releases file locks and any other resources the
1072 /// attached open acquired.
1073 pub(crate) _db: crate::Db,
1074}
1075
1076/// #93 — fold a committed txn's staged reconciled-collection names
1077/// into the shared per-process `reconciled` set. Called by
1078/// [`WriteTxn::commit`] only AFTER the WAL commit has landed, so the
1079/// shared cache is never poisoned by a rolled-back lazy-create.
1080///
1081/// A no-op (no lock acquired) when nothing was staged — the common
1082/// path for a txn that opened only already-reconciled collections, so
1083/// the fast "already reconciled" path pays nothing extra.
1084///
1085/// A poisoned `reconciled` mutex maps to [`Error::Busy`] (Rule 7); the
1086/// loop is bounded by the staged-name count (one entry per distinct
1087/// collection lazily reconciled in the txn — Rule 2).
1088fn promote_reconciled(reconciled: &Mutex<HashSet<String>>, staged: HashSet<String>) -> Result<()> {
1089 if staged.is_empty() {
1090 return Ok(());
1091 }
1092 let mut shared = reconciled.lock().map_err(|_| Error::Busy {
1093 kind: obj_core::LockKind::WriterInProcess,
1094 })?;
1095 shared.extend(staged);
1096 Ok(())
1097}
1098
1099/// Acquire the catalog mutex; convert a poison error into a
1100/// `WriterInProcess` Busy. Helper shared by the public txn wrappers
1101/// and the [`Collection`] internals.
1102pub(crate) fn lock_catalog(
1103 catalog: &Mutex<Catalog<FileHandle>>,
1104) -> Result<std::sync::MutexGuard<'_, Catalog<FileHandle>>> {
1105 catalog.lock().map_err(|_| Error::Busy {
1106 kind: obj_core::LockKind::WriterInProcess,
1107 })
1108}
1109
1110// ---------- raw-bytes helpers (FFI) -------------------------------
1111
1112/// Reject a write against a namespaced collection. Attached
1113/// databases are read-only through the calling Db (M11 #93).
1114fn reject_namespaced_write(collection: &str) -> Result<()> {
1115 if let (Some(namespace), tail) = crate::db::split_namespace(collection) {
1116 return Err(Error::AttachedDatabaseIsReadOnly {
1117 namespace: namespace.to_owned(),
1118 collection: tail.to_owned(),
1119 });
1120 }
1121 Ok(())
1122}
1123
1124/// Acquire the env's pager mutex; map poison into Busy.
1125fn lock_pager(env: &TxnEnv<FileHandle>) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
1126 env.pager().lock().map_err(|_| Error::Busy {
1127 kind: obj_core::LockKind::WriterInProcess,
1128 })
1129}
1130
/// Return each distinct index name that appears in the `old`
/// (remove) or `new` (add) entry list, in first-seen order (`old`
/// is scanned before `new`). The work is bounded by
/// `old.len() + new.len()` — the caller's supplied entry count
/// (Rule 2).
fn touched_index_names(old: &[(String, Vec<u8>)], new: &[(String, Vec<u8>)]) -> Vec<String> {
    let mut seen: HashSet<&str> = HashSet::new();
    old.iter()
        .chain(new)
        .map(|(name, _key)| name.as_str())
        // `insert` returns false for a name that was already recorded,
        // which drops every repeat after the first occurrence.
        .filter(|name| seen.insert(*name))
        .map(str::to_owned)
        .collect()
}
1144
1145/// Gather every field-encoded key in `entries` whose index name
1146/// equals `index_name`, wrapped as [`EncodedIndexKey`] for the
1147/// composition seam. Order follows the caller's entry order.
1148fn keys_for_index(entries: &[(String, Vec<u8>)], index_name: &str) -> Vec<EncodedIndexKey> {
1149 entries
1150 .iter()
1151 .filter(|(name, _key)| name == index_name)
1152 .map(|(_name, key)| EncodedIndexKey::from_bytes(key.clone()))
1153 .collect()
1154}
1155
1156/// Resolve `index_name` to its `Active` descriptor index on
1157/// `descriptor`, then maintain that one index B-tree by diffing the
1158/// `old` field keys against the `new` ones through the shared
1159/// non-generic composition seam
1160/// ([`crate::index_maint::maintain_index_from_keys`]).
1161///
1162/// An unknown or non-`Active` index name is [`Error::IndexNotFound`]
1163/// — the raw write refuses rather than silently dropping the entry.
1164fn maintain_one_raw_index(
1165 pager: &mut Pager<FileHandle>,
1166 descriptor: &mut CollectionDescriptor,
1167 collection: &str,
1168 index_name: &str,
1169 old: &[(String, Vec<u8>)],
1170 new: &[(String, Vec<u8>)],
1171 id: Id,
1172) -> Result<()> {
1173 let idx = descriptor
1174 .indexes
1175 .iter()
1176 .position(|d| d.name == index_name && d.status == IndexStatus::Active)
1177 .ok_or_else(|| Error::IndexNotFound {
1178 collection: collection.to_owned(),
1179 name: index_name.to_owned(),
1180 })?;
1181 let spec = crate::index_maint::descriptor_to_spec(&descriptor.indexes[idx])?;
1182 let old_keys = keys_for_index(old, index_name);
1183 let new_keys = keys_for_index(new, index_name);
1184 crate::index_maint::maintain_index_from_keys(
1185 pager, descriptor, idx, collection, &spec, &old_keys, &new_keys, id,
1186 )
1187}
1188
1189/// Ensure a (raw-bytes) collection exists, lazy-creating an empty
1190/// primary B-tree on first call. Used by the C ABI's insert /
1191/// upsert paths. Distinct from `Collection::open_or_create` because
1192/// raw-bytes writes do NOT participate in the typed-reconciliation
1193/// path — no `T::indexes()` to walk.
1194fn ensure_collection_raw(
1195 inner: &obj_core::WriteTxn<'_, FileHandle>,
1196 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1197 name: &str,
1198) -> Result<CollectionDescriptor> {
1199 let mut pager = inner.lock_pager()?;
1200 let mut catalog_guard = lock_catalog(catalog)?;
1201 if let Some(d) = catalog_guard.get(&mut pager, name)? {
1202 return Ok(d);
1203 }
1204 let tree = BTree::<FileHandle>::empty(&mut pager)?;
1205 let descriptor = CollectionDescriptor::new(0, tree.root().get(), RAW_BYTES_TYPE_VERSION);
1206 let _id = catalog_guard.insert(&mut pager, name, descriptor)?;
1207 catalog_guard
1208 .get(&mut pager, name)?
1209 .ok_or(Error::Corruption { page_id: 0 })
1210}
1211
1212/// Look up the descriptor for `name`, returning
1213/// `Err(CollectionNotFound)` if absent. Used by `update` / `delete`
1214/// where lazy-create would mask the caller's mistake.
1215fn catalog_get_required(
1216 inner: &obj_core::WriteTxn<'_, FileHandle>,
1217 catalog: &Arc<Mutex<Catalog<FileHandle>>>,
1218 name: &str,
1219) -> Result<CollectionDescriptor> {
1220 let mut pager = inner.lock_pager()?;
1221 let catalog_guard = lock_catalog(catalog)?;
1222 catalog_guard
1223 .get(&mut pager, name)?
1224 .ok_or_else(|| Error::CollectionNotFound {
1225 name: name.to_owned(),
1226 })
1227}
1228
1229/// Snapshot-aware descriptor lookup on the read side. Returns
1230/// `Ok(None)` when the collection is absent at the snapshot's
1231/// pinned LSN.
1232fn read_descriptor_via_snapshot(
1233 env: &TxnEnv<FileHandle>,
1234 snapshot: &ReaderSnapshot<FileHandle>,
1235 name: &str,
1236) -> Result<Option<CollectionDescriptor>> {
1237 let pager = lock_pager(env)?;
1238 Catalog::<FileHandle>::lookup_via_snapshot(&pager, snapshot, name)
1239}
1240
1241/// Open a primary-tree handle from a descriptor's `primary_root`.
1242fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
1243 let root_pid =
1244 PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1245 BTree::<FileHandle>::open(pager, root_pid)
1246}
1247
1248/// Wrap a raw payload with the on-disk [`DocumentHeader`] carrying
1249/// the caller-supplied `type_version`. The header stamps
1250/// `collection_id` (so a cross-collection forgery is detectable on
1251/// read), `type_version`, `payload_len`, and a CRC32C of the
1252/// payload.
1253///
1254/// The on-disk bytes produced here are byte-identical to what
1255/// [`obj_core::codec::encode`] emits for the same logical payload +
1256/// `type_version` — this is the key plumbing point that closes the
1257/// cross-language header-level interop gap (#13).
1258///
1259/// Returns [`Error::DocumentTooLarge`] if `payload.len() + 16` would
1260/// not fit inline in a B-tree leaf — mirrors
1261/// [`obj_core::codec::encode`]'s overflow handling.
1262fn wrap_raw_payload_with_version(
1263 collection_id: u32,
1264 payload: &[u8],
1265 type_version: u32,
1266) -> Result<Vec<u8>> {
1267 let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
1268 len: payload.len(),
1269 max: MAX_INLINE_DOC,
1270 })?;
1271 let total = DOC_HEADER_SIZE
1272 .checked_add(payload.len())
1273 .ok_or(Error::DocumentTooLarge {
1274 len: usize::MAX,
1275 max: MAX_INLINE_DOC,
1276 })?;
1277 if total > MAX_INLINE_DOC {
1278 return Err(Error::DocumentTooLarge {
1279 len: total,
1280 max: MAX_INLINE_DOC,
1281 });
1282 }
1283 let header = DocumentHeader {
1284 collection_id,
1285 type_version,
1286 payload_len,
1287 payload_crc32c: crc32c(payload),
1288 };
1289 let mut out = Vec::with_capacity(total);
1290 header.write_to(&mut out);
1291 out.extend_from_slice(payload);
1292 Ok(out)
1293}
1294
1295/// Strip the per-doc header and return `(payload, type_version)`.
1296/// Validates the header's `collection_id`, total length, and
1297/// payload CRC32C — surfaces [`Error::Corruption`] /
1298/// [`Error::CollectionIdMismatch`] on any mismatch. Exposes the
1299/// stored `type_version` so the typed read path can dispatch on
1300/// it directly.
1301fn strip_raw_payload_with_version(
1302 bytes: &[u8],
1303 expected_collection_id: u32,
1304) -> Result<(Vec<u8>, u32)> {
1305 let header = DocumentHeader::read_from(bytes)?;
1306 if header.collection_id != expected_collection_id {
1307 return Err(Error::CollectionIdMismatch {
1308 expected: expected_collection_id,
1309 found: header.collection_id,
1310 });
1311 }
1312 let payload_len =
1313 usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
1314 let total = DOC_HEADER_SIZE
1315 .checked_add(payload_len)
1316 .ok_or(Error::Corruption { page_id: 0 })?;
1317 if bytes.len() != total {
1318 return Err(Error::Corruption { page_id: 0 });
1319 }
1320 let payload = &bytes[DOC_HEADER_SIZE..total];
1321 if crc32c(payload) != header.payload_crc32c {
1322 return Err(Error::Corruption { page_id: 0 });
1323 }
1324 Ok((payload.to_vec(), header.type_version))
1325}
1326
1327/// Count every entry in a primary B-tree without decoding the
1328/// records. Used by the FFI `count_all_raw` path.
1329///
1330/// Snapshot-pinned (M12 #12): the full-tree scan resolves every page
1331/// read as-of the read txn's `snapshot`, so a concurrent writer's
1332/// post-snapshot inserts/deletes cannot perturb the count — it stays
1333/// consistent with the read txn's pinned LSN.
1334fn count_via_btree_range_full(
1335 env: &TxnEnv<FileHandle>,
1336 snapshot: &ReaderSnapshot<FileHandle>,
1337 primary_root: u64,
1338) -> Result<u64> {
1339 let pager = lock_pager(env)?;
1340 let root = PageId::new(primary_root)
1341 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1342 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, ..)?;
1343 let mut n: u64 = 0;
1344 for step in iter {
1345 let _ = step?;
1346 n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
1347 reason: "primary tree entry count exceeds u64",
1348 })?;
1349 }
1350 Ok(n)
1351}
1352
1353/// Walk an index B-tree by raw-byte key range and return every
1354/// (`full_key`, `value_bytes`) entry inside the range. Used by the
1355/// FFI `index_range_raw` / `count_index_range_raw` paths.
1356///
1357/// Snapshot-pinned (M12 #12): the descent and leaf-scan resolve every
1358/// page read as-of the read txn's `snapshot`, so a concurrent writer's
1359/// post-snapshot index entries do not leak into the range/count — the
1360/// enumeration stays consistent with the read txn's pinned LSN.
1361fn collect_index_range_entries(
1362 env: &TxnEnv<FileHandle>,
1363 snapshot: &ReaderSnapshot<FileHandle>,
1364 index_root: u64,
1365 start: std::ops::Bound<Vec<u8>>,
1366 end: std::ops::Bound<Vec<u8>>,
1367) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1368 let pager = lock_pager(env)?;
1369 let root =
1370 PageId::new(index_root).ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
1371 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, (start, end))?;
1372 let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1373 for step in iter {
1374 out.push(step?);
1375 }
1376 Ok(out)
1377}
1378
1379/// Resolve a list of (`full_key`, `value_bytes`) index entries into
1380/// `(Id, raw_payload)` pairs. For `Unique` indexes the id is the
1381/// VALUE; for other kinds it is the trailing 8 bytes of the key
1382/// (see `docs/format.md` § Index key encoding).
1383///
1384/// Snapshot-aware: each primary lookup goes through the read
1385/// txn's pinned snapshot so the result set is consistent across
1386/// the call.
1387fn materialize_id_payload_pairs(
1388 env: &TxnEnv<FileHandle>,
1389 snapshot: &ReaderSnapshot<FileHandle>,
1390 collection: &CollectionDescriptor,
1391 index: &obj_core::IndexDescriptor,
1392 entries: Vec<(Vec<u8>, Vec<u8>)>,
1393) -> Result<Vec<(Id, Vec<u8>)>> {
1394 let mut out: Vec<(Id, Vec<u8>)> = Vec::with_capacity(entries.len());
1395 let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
1396 let primary_root = PageId::new(collection.primary_root)
1397 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
1398 let pager = lock_pager(env)?;
1399 for (full_key, value) in entries {
1400 let id_u64 = if index.kind == obj_core::IndexKind::Unique {
1401 Id::from_be_bytes(&value)
1402 .ok_or(Error::Corruption { page_id: 0 })?
1403 .get()
1404 } else {
1405 if full_key.len() < 8 {
1406 return Err(Error::Corruption { page_id: 0 });
1407 }
1408 Id::from_be_bytes(&full_key[full_key.len() - 8..])
1409 .ok_or(Error::Corruption { page_id: 0 })?
1410 .get()
1411 };
1412 if index.kind == obj_core::IndexKind::Each && !emitted.insert(id_u64) {
1413 continue;
1414 }
1415 if index.kind != obj_core::IndexKind::Each {
1416 emitted.insert(id_u64);
1417 }
1418 let id = Id::try_new(id_u64).ok_or(Error::Corruption { page_id: 0 })?;
1419 let primary_bytes = BTree::<FileHandle>::get_via_snapshot(
1420 &pager,
1421 snapshot,
1422 primary_root,
1423 &id.to_be_bytes(),
1424 )?
1425 .ok_or(Error::Corruption { page_id: 0 })?;
1426 let (payload, _version) =
1427 strip_raw_payload_with_version(&primary_bytes, collection.collection_id)?;
1428 out.push((id, payload));
1429 }
1430 Ok(out)
1431}