obj_core/catalog.rs
1//! Catalog (L5) — on-disk registry of collections.
2//!
3//! See `docs/format.md` § Catalog for the authoritative on-disk
4//! shape. The catalog is a B+tree (M4 data structure) keyed by
5//! collection name and valued by postcard-encoded
6//! [`CollectionDescriptor`]s. Its root page-id is recorded in the
7//! file header field `root_catalog`; the M2..M4 default of zero
8//! signals "no catalog yet" and [`Catalog::open_or_init`] creates
9//! one on first open.
10//!
11//! # Power-of-ten posture
12//!
13//! - **Rule 1.** Catalog walks go through the M4 B+tree API which
14//! uses an explicit stack — no recursion in this module.
15//! - **Rule 2.** [`Catalog::list_collections`] is bounded by
16//! [`MAX_COLLECTIONS`]; exceeding the bound surfaces as
17//! [`Error::BTreeScanLimitExceeded`].
18//! - **Rule 5.** Reserved-row presence is debug-asserted; the
19//! `try_into` chain in `id_from_bytes` is the runtime boundary.
20//! - **Rule 7.** Every fallible step propagates via `?`; no
21//! `unwrap` on the production path.
22//! - **Rule 9.** No `dyn` — the catalog is generic over `F:
23//! FileBackend` and the B+tree it owns is monomorphised.
24
25#![forbid(unsafe_code)]
26
27use serde::{Deserialize, Serialize};
28
29use crate::btree::node::{decode_node, NodeKind};
30use crate::btree::{choose_child, BTree, MAX_BTREE_DEPTH};
31use crate::error::{Error, Result};
32use crate::id::{bump_next_id, Id};
33use crate::index::{IndexKind, IndexSpec};
34use crate::pager::page::PageId;
35use crate::pager::{Pager, ReaderSnapshot};
36use crate::platform::{FileBackend, FileHandle};
37
38use heapless::Vec as HeaplessVec;
39
/// Upper limit on how many collections one catalog is expected to
/// hold. [`Catalog::list_collections`] uses it as its scan bound
/// (Rule 2). The value is 2^20 = 1 048 576; per the original sizing
/// note, 64-byte descriptor payloads at that count come to roughly
/// 64 MiB.
pub const MAX_COLLECTIONS: usize = 1_048_576;

/// Key of the catalog's private bookkeeping row: the zero-length
/// byte string, under which the next-collection-id watermark is
/// stored. `validate_name` rejects empty names, so no user-supplied
/// collection name can ever equal this key.
const RESERVED_NEXT_ID_KEY: &[u8] = &[];
52
53/// On-disk description of a collection.
54///
55/// Encoded with `postcard` as the value of a catalog B-tree row.
56/// The exact shape is documented in `docs/format.md` § Catalog.
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct CollectionDescriptor {
59 /// Catalog-assigned numeric id for this collection.
60 pub collection_id: u32,
61 /// Page-id of the collection's primary B-tree root.
62 pub primary_root: u64,
63 /// Current `Document::VERSION` for the collection's type.
64 pub type_version: u32,
65 /// Next-id watermark — the next [`Id`] the allocator will
66 /// hand out for this collection.
67 pub next_id: u64,
68 /// Secondary indexes. Empty in M5; populated in M7.
69 pub indexes: Vec<IndexDescriptor>,
70}
71
72impl CollectionDescriptor {
73 /// Construct a descriptor for a freshly-registered collection.
74 /// `primary_root` is the page-id of the collection's empty
75 /// primary B-tree (allocated by the caller before
76 /// [`Catalog::insert`]); `collection_id` is the value the
77 /// catalog will assign.
78 #[must_use]
79 pub const fn new(collection_id: u32, primary_root: u64, type_version: u32) -> Self {
80 Self {
81 collection_id,
82 primary_root,
83 type_version,
84 next_id: 1,
85 indexes: Vec::new(),
86 }
87 }
88}
89
90/// On-disk descriptor for a secondary index attached to a
91/// collection.
92///
93/// Persisted inside the owning [`CollectionDescriptor::indexes`]
94/// vector as part of the catalog row's postcard payload. The wire
95/// shape is documented in `docs/format.md` § Indexes; format-minor
96/// bumped to 2 in M7.
97#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
98pub struct IndexDescriptor {
99 /// Catalog-assigned numeric id for this index. Stable across
100 /// reopens; **never reused** — a `DroppedPending` descriptor
101 /// retains its `index_id` so the page reclamation pass on the
102 /// next checkpoint does not race with concurrent readers.
103 pub index_id: u32,
104 /// User-visible name. Stable across reopens; the reconciler
105 /// matches a runtime [`IndexSpec`] to a stored descriptor by
106 /// this name.
107 pub name: String,
108 /// Discriminator for the kind of index. See
109 /// [`crate::index::IndexKind`].
110 pub kind: IndexKind,
111 /// Field path(s) the index is keyed by. Single-element for
112 /// `Standard` / `Unique` / `Each`; ≥ 2 for `Composite`.
113 pub key_paths: Vec<String>,
114 /// Page-id of the index B+tree's root.
115 pub root_page_id: u64,
116 /// Lifecycle status — see [`IndexStatus`].
117 pub status: IndexStatus,
118}
119
120/// Lifecycle state of an [`IndexDescriptor`].
121///
122/// `Active` indexes participate in writes (#58) and reads (#60);
123/// `DroppedPending` is a tombstone — the descriptor lingers so the
124/// `index_id` is not reused and the next `Pager::checkpoint` can
125/// reclaim the B+tree pages.
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
127#[repr(u8)]
128pub enum IndexStatus {
129 /// Index is live — every write maintains it, every read may
130 /// consult it.
131 Active = 0,
132 /// Index was dropped by the reconciler; the descriptor remains
133 /// so the `index_id` is not reused. Pages are reclaimed on the
134 /// next `Pager::checkpoint`.
135 DroppedPending = 1,
136}
137
138/// The catalog handle.
139///
140/// Owns the catalog B+tree's root page-id; mutating methods take
141/// `&mut Pager<F>` to advance the underlying B-tree through
142/// copy-on-write.
143#[derive(Debug)]
144pub struct Catalog<F: FileBackend = FileHandle> {
145 tree: BTree<F>,
146 /// Cached watermark for the next `collection_id` to allocate.
147 /// Loaded from the reserved catalog row on open and re-persisted
148 /// on every change.
149 next_collection_id: u32,
150}
151
152impl<F: FileBackend> Catalog<F> {
153 /// Open the catalog, creating it on first call.
154 ///
155 /// Reads the file header's `root_catalog` field via
156 /// [`Pager::root_catalog`]:
157 ///
158 /// - If non-zero, attaches to the existing catalog B-tree and
159 /// loads the next-collection-id watermark from the reserved
160 /// row.
161 /// - If zero, allocates a fresh B+tree root, seeds the reserved
162 /// row with `1`, persists the root via
163 /// [`Pager::set_root_catalog`], and commits.
164 ///
165 /// # Errors
166 ///
167 /// - [`Error::Corruption`] if an existing catalog's reserved
168 /// row is missing or malformed.
169 /// - Pager / B-tree errors propagated as-is.
170 pub fn open_or_init(pager: &mut Pager<F>) -> Result<Self> {
171 let raw = pager.root_catalog();
172 if let Some(existing) = PageId::new(raw) {
173 return Self::open_existing(pager, existing);
174 }
175 Self::init_fresh(pager)
176 }
177
178 fn open_existing(pager: &mut Pager<F>, root: PageId) -> Result<Self> {
179 let tree = BTree::<F>::open(pager, root)?;
180 let watermark = match tree.get(pager, RESERVED_NEXT_ID_KEY)? {
181 Some(bytes) => postcard::from_bytes::<u32>(&bytes).map_err(Error::from)?,
182 None => {
183 // The reserved row MUST exist on every previously-
184 // initialised catalog; absence is corruption.
185 return Err(Error::Corruption {
186 page_id: root.get(),
187 });
188 }
189 };
190 Ok(Self {
191 tree,
192 next_collection_id: watermark,
193 })
194 }
195
196 fn init_fresh(pager: &mut Pager<F>) -> Result<Self> {
197 let mut tree = BTree::<F>::empty(pager)?;
198 // Seed the reserved next-collection-id row with `1` — zero is
199 // reserved as a sentinel "no collection" (mirrors `Id`).
200 let watermark: u32 = 1;
201 let encoded = postcard::to_allocvec(&watermark)?;
202 tree.insert(pager, RESERVED_NEXT_ID_KEY, &encoded)?;
203 pager.set_root_catalog(tree.root().get())?;
204 Ok(Self {
205 tree,
206 next_collection_id: watermark,
207 })
208 }
209
210 /// Get the descriptor for the named collection. Returns
211 /// `Ok(None)` if no such collection exists.
212 ///
213 /// # Errors
214 ///
215 /// - [`Error::InvalidArgument`] if `name` is empty.
216 /// - Pager / B-tree / postcard errors propagated as-is.
217 pub fn get(&self, pager: &mut Pager<F>, name: &str) -> Result<Option<CollectionDescriptor>> {
218 validate_name(name)?;
219 match self.tree.get(pager, name.as_bytes())? {
220 Some(bytes) => {
221 let descriptor: CollectionDescriptor =
222 postcard::from_bytes(&bytes).map_err(Error::from)?;
223 Ok(Some(descriptor))
224 }
225 None => Ok(None),
226 }
227 }
228
229 /// Look up a collection descriptor as-of a [`ReaderSnapshot`]'s
230 /// pinned LSN — i.e. observe the catalog state the reader's
231 /// snapshot pinned, NOT the writer's live `Catalog.tree.root`.
232 ///
233 /// Walks the catalog B+tree rooted at `snapshot.root_catalog()`
234 /// (the value captured by `Pager::reader_snapshot` at pin time;
235 /// see M6 #51). Every page read goes through
236 /// [`ReaderSnapshot::read_page`], which consults the snapshot's
237 /// frozen WAL view first and falls through to the main file —
238 /// bypassing the live WAL overlay that may have been advanced by
239 /// a concurrent writer since pin time. This is the M6 #53 fix:
240 /// without it, a reader's catalog descend can land on a freelist-
241 /// recycled page-id whose `state.view` contents are no longer a
242 /// valid B+tree node, surfacing as `Error::Corruption { page_id:
243 /// 0 }` from the codec.
244 ///
245 /// When `snapshot.root_catalog() == 0` the catalog did not exist
246 /// at the snapshot's pinned LSN; `Ok(None)` is returned and the
247 /// caller should surface that as `Error::CollectionNotFound`.
248 ///
249 /// # Errors
250 ///
251 /// - [`Error::InvalidArgument`] if `name` is empty.
252 /// - [`Error::BTreeDepthExceeded`] if the catalog B+tree exceeds
253 /// `MAX_BTREE_DEPTH` (Rule 1 bound on the descend stack).
254 /// - [`Error::Corruption`] / [`Error::Codec`] propagated from the
255 /// snapshot read and postcard decode.
256 pub fn lookup_via_snapshot(
257 pager: &Pager<F>,
258 snapshot: &ReaderSnapshot<F>,
259 name: &str,
260 ) -> Result<Option<CollectionDescriptor>> {
261 validate_name(name)?;
262 let Some(root) = PageId::new(snapshot.root_catalog()) else {
263 // root_catalog == 0 means the catalog did not exist at
264 // the snapshot's pinned LSN. Nothing to look up.
265 return Ok(None);
266 };
267 let key = name.as_bytes();
268 // Descend root → leaf via the snapshot's read_page.
269 // `heapless::Vec` for the descent stack (Rule 1 + Rule 3).
270 let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
271 let mut current = root;
272 let leaf_node = loop {
273 if path.push(current).is_err() {
274 return Err(Error::BTreeDepthExceeded {
275 limit: MAX_BTREE_DEPTH,
276 });
277 }
278 let page = snapshot.read_page(pager, current)?;
279 let decoded = decode_node(page.as_bytes())?;
280 match decoded.kind {
281 NodeKind::Leaf => break decoded,
282 NodeKind::Internal => {
283 current = choose_child(&decoded, key)?;
284 }
285 }
286 };
287 // Linear scan of the leaf's entries — the leaf is bounded by
288 // the slot-directory capacity, so this loop is statically
289 // bounded by `LEAF_SLOT_CAP` (Rule 2).
290 for entry in &leaf_node.leaves {
291 if entry.key.as_slice() == key {
292 let descriptor: CollectionDescriptor =
293 postcard::from_bytes(&entry.value).map_err(Error::from)?;
294 return Ok(Some(descriptor));
295 }
296 }
297 Ok(None)
298 }
299
300 /// Register a new collection.
301 ///
302 /// Allocates the next `collection_id`, sets it on `descriptor`,
303 /// re-persists the next-collection-id watermark, and inserts
304 /// the descriptor into the catalog B-tree. The descriptor that
305 /// the caller passes in has its `collection_id` field
306 /// **ignored** — the catalog assigns the canonical value.
307 ///
308 /// Call [`Pager::commit`] after this to make the registration
309 /// durable.
310 ///
311 /// # Errors
312 ///
313 /// - [`Error::InvalidArgument`] if `name` is empty.
314 /// - [`Error::CollectionAlreadyExists`] if `name` is already
315 /// registered.
316 /// - [`Error::IdSpaceExhausted`] if the `u32` `collection_id`
317 /// space is exhausted.
318 /// - Pager / B-tree / postcard errors propagated.
319 pub fn insert(
320 &mut self,
321 pager: &mut Pager<F>,
322 name: &str,
323 mut descriptor: CollectionDescriptor,
324 ) -> Result<u32> {
325 // Rule 5 — M6 #51: catalog mutations must run inside a WAL
326 // transaction so the `set_root_catalog` header update and
327 // the B-tree page writes commit atomically. The pre-M6.5
328 // bug was a `set_root_catalog` that wrote direct-to-disk
329 // outside the WAL; this assertion defends against that
330 // class of regression. Memory pagers have no WAL and
331 // satisfy `in_txn` vacuously.
332 debug_assert!(
333 pager.in_txn(),
334 "Catalog::insert must run inside a WAL transaction \
335 (Pager::begin_txn / WriteTxn::begin)",
336 );
337 validate_name(name)?;
338 if self.tree.get(pager, name.as_bytes())?.is_some() {
339 return Err(Error::CollectionAlreadyExists {
340 name: name.to_owned(),
341 });
342 }
343 let assigned = self.next_collection_id;
344 descriptor.collection_id = assigned;
345 let new_watermark =
346 self.next_collection_id
347 .checked_add(1)
348 .ok_or_else(|| Error::IdSpaceExhausted {
349 collection: "<catalog>".to_owned(),
350 })?;
351 let encoded = postcard::to_allocvec(&descriptor)?;
352 self.tree.insert(pager, name.as_bytes(), &encoded)?;
353 // Update the reserved next-collection-id row. B+tree inserts
354 // do not handle "key exists" updates — delete then re-insert.
355 self.persist_next_collection_id(pager, new_watermark)?;
356 // Catalog root may have changed (every B+tree mutation is COW).
357 pager.set_root_catalog(self.tree.root().get())?;
358 self.next_collection_id = new_watermark;
359 Ok(assigned)
360 }
361
362 /// Update an existing collection's descriptor in place.
363 ///
364 /// Used when `next_id` advances, `type_version` is bumped, or
365 /// secondary indexes change. The on-disk `collection_id` is
366 /// preserved across the update; callers should not change it.
367 ///
368 /// # Errors
369 ///
370 /// - [`Error::InvalidArgument`] if `name` is empty.
371 /// - [`Error::Corruption`] if the descriptor's `collection_id`
372 /// disagrees with the catalog's record (defensive check —
373 /// indicates a caller bug).
374 /// - Pager / B-tree / postcard errors propagated.
375 pub fn update(
376 &mut self,
377 pager: &mut Pager<F>,
378 name: &str,
379 descriptor: &CollectionDescriptor,
380 ) -> Result<()> {
381 // Rule 5 — see `Catalog::insert` for the rationale.
382 debug_assert!(
383 pager.in_txn(),
384 "Catalog::update must run inside a WAL transaction",
385 );
386 validate_name(name)?;
387 let existing = self.get(pager, name)?.ok_or(Error::InvalidArgument(
388 "catalog update: collection not registered",
389 ))?;
390 if existing.collection_id != descriptor.collection_id {
391 return Err(Error::Corruption {
392 page_id: self.tree.root().get(),
393 });
394 }
395 let encoded = postcard::to_allocvec(descriptor)?;
396 self.tree.delete(pager, name.as_bytes())?;
397 self.tree.insert(pager, name.as_bytes(), &encoded)?;
398 pager.set_root_catalog(self.tree.root().get())?;
399 Ok(())
400 }
401
402 /// Declare a new secondary index on the named collection.
403 ///
404 /// Allocates a fresh `index_id`, an empty index B+tree (M4),
405 /// and appends a new `IndexDescriptor { status: Active }` to
406 /// the collection's `indexes` vector. The mutation rides inside
407 /// the caller's WAL transaction; on rollback the descriptor +
408 /// the empty B+tree are both discarded.
409 ///
410 /// `spec` is validated before any state mutation; an invalid
411 /// spec surfaces as [`Error::InvalidArgument`] before the
412 /// catalog touches the pager.
413 ///
414 /// # Errors
415 ///
416 /// - [`Error::InvalidArgument`] if `spec.validate()` rejects
417 /// the spec.
418 /// - [`Error::CollectionNotFound`] if `collection` is not
419 /// registered.
420 /// - [`Error::IndexKindMismatch`] if an `Active` descriptor of
421 /// the same name has a different `(kind, key_paths)`.
422 /// - [`Error::IdSpaceExhausted`] on `u32` `index_id` wraparound
423 /// (defense-in-depth — practically unreachable).
424 /// - Pager / B-tree / postcard errors propagated.
425 pub fn declare_index(
426 &mut self,
427 pager: &mut Pager<F>,
428 collection: &str,
429 spec: &IndexSpec,
430 ) -> Result<u32> {
431 debug_assert!(
432 pager.in_txn(),
433 "Catalog::declare_index must run inside a WAL transaction",
434 );
435 spec.validate()?;
436 validate_name(collection)?;
437 let mut descriptor =
438 self.get(pager, collection)?
439 .ok_or_else(|| Error::CollectionNotFound {
440 name: collection.to_owned(),
441 })?;
442 if let Some(existing) = descriptor.indexes.iter().find(|d| d.name == spec.name) {
443 return Self::reconcile_existing_index(existing, spec);
444 }
445 let index_id = next_index_id(&descriptor)?;
446 let root_page_id = BTree::<F>::empty(pager)?.root().get();
447 let new_descriptor = IndexDescriptor {
448 index_id,
449 name: spec.name.clone(),
450 kind: spec.kind,
451 key_paths: spec.key_paths.clone(),
452 root_page_id,
453 status: IndexStatus::Active,
454 };
455 descriptor.indexes.push(new_descriptor);
456 self.update(pager, collection, &descriptor)?;
457 Ok(index_id)
458 }
459
460 /// Reconcile a runtime [`IndexSpec`] against an already-stored
461 /// `IndexDescriptor` of the same name. Returns the existing
462 /// `index_id` if the `(kind, key_paths)` match; errors otherwise.
463 fn reconcile_existing_index(existing: &IndexDescriptor, spec: &IndexSpec) -> Result<u32> {
464 if existing.kind != spec.kind {
465 return Err(Error::IndexKindMismatch {
466 name: spec.name.clone(),
467 expected: spec.kind,
468 found: existing.kind,
469 });
470 }
471 if existing.key_paths != spec.key_paths {
472 return Err(Error::IndexKeyPathsMismatch {
473 name: spec.name.clone(),
474 });
475 }
476 // Re-activating a `DroppedPending` index is a deliberate
477 // re-declare — but M7 leaves the descriptor as-is; the
478 // reconciler will toggle it back to Active in a follow-up
479 // commit if needed. For now: existing match → idempotent.
480 Ok(existing.index_id)
481 }
482
483 /// Reconcile the runtime [`IndexSpec`] set for a collection
484 /// against the catalog's stored descriptors.
485 ///
486 /// - Specs present in `specs` and absent from the descriptor are
487 /// **declared** (new `Active` descriptor + empty B+tree).
488 /// - `Active` descriptors absent from `specs` are flipped to
489 /// `DroppedPending`.
490 /// - Matching `(name, kind, key_paths)` pairs are left alone —
491 /// reconciliation is **idempotent**.
492 ///
493 /// Returns the descriptor's post-reconciliation index roster (a
494 /// `Vec<IndexDescriptor>` clone) so the caller can build its
495 /// maintenance plan without re-querying the catalog.
496 ///
497 /// # Errors
498 ///
499 /// - [`Error::IndexKindMismatch`] /
500 /// [`Error::IndexKeyPathsMismatch`] on per-name structural
501 /// disagreement.
502 /// - [`Error::IdSpaceExhausted`] on `u32` `index_id` wraparound.
503 /// - Pager / B-tree / postcard errors propagated.
504 pub fn reconcile_indexes(
505 &mut self,
506 pager: &mut Pager<F>,
507 collection: &str,
508 specs: &[IndexSpec],
509 ) -> Result<Vec<IndexDescriptor>> {
510 debug_assert!(
511 pager.in_txn(),
512 "Catalog::reconcile_indexes must run inside a WAL transaction",
513 );
514 validate_name(collection)?;
515 for spec in specs {
516 spec.validate()?;
517 }
518 // Stage 1: declare missing (and verify-match-or-error
519 // existing).
520 for spec in specs {
521 let _ = self.declare_index(pager, collection, spec)?;
522 }
523 // Stage 2: drop active descriptors whose name no longer
524 // appears in `specs`.
525 let descriptor = self
526 .get(pager, collection)?
527 .ok_or_else(|| Error::CollectionNotFound {
528 name: collection.to_owned(),
529 })?;
530 let mut to_drop: Vec<String> = Vec::new();
531 for d in &descriptor.indexes {
532 if d.status == IndexStatus::Active && !specs.iter().any(|s| s.name == d.name) {
533 to_drop.push(d.name.clone());
534 }
535 }
536 for name in to_drop {
537 self.drop_index(pager, collection, &name)?;
538 }
539 // Re-read for the post-reconciliation snapshot.
540 let final_descriptor =
541 self.get(pager, collection)?
542 .ok_or_else(|| Error::CollectionNotFound {
543 name: collection.to_owned(),
544 })?;
545 Ok(final_descriptor.indexes)
546 }
547
548 /// Drop the named index from the named collection — flips the
549 /// descriptor's status to [`IndexStatus::DroppedPending`].
550 ///
551 /// The descriptor stays in the catalog so its `index_id` is not
552 /// reused. The index B+tree pages are reclaimed on the next
553 /// [`Pager::checkpoint`] pass (deferred reclamation: a
554 /// concurrent reader's snapshot may still need to walk the
555 /// pages until its pin is released).
556 ///
557 /// # Errors
558 ///
559 /// - [`Error::CollectionNotFound`] if `collection` is not
560 /// registered.
561 /// - [`Error::IndexNotFound`] if `index_name` is not a known
562 /// descriptor on the collection.
563 pub fn drop_index(
564 &mut self,
565 pager: &mut Pager<F>,
566 collection: &str,
567 index_name: &str,
568 ) -> Result<()> {
569 debug_assert!(
570 pager.in_txn(),
571 "Catalog::drop_index must run inside a WAL transaction",
572 );
573 validate_name(collection)?;
574 let mut descriptor =
575 self.get(pager, collection)?
576 .ok_or_else(|| Error::CollectionNotFound {
577 name: collection.to_owned(),
578 })?;
579 let entry = descriptor
580 .indexes
581 .iter_mut()
582 .find(|d| d.name == index_name)
583 .ok_or_else(|| Error::IndexNotFound {
584 collection: collection.to_owned(),
585 name: index_name.to_owned(),
586 })?;
587 if entry.status == IndexStatus::Active {
588 entry.status = IndexStatus::DroppedPending;
589 }
590 self.update(pager, collection, &descriptor)?;
591 Ok(())
592 }
593
594 /// Allocate the next [`Id`] for the named collection,
595 /// persisting the bumped `next_id` watermark inside the
596 /// catalog row.
597 ///
598 /// Re-reads the descriptor, bumps `next_id`, writes the new
599 /// descriptor back via [`Catalog::update`], and returns the
600 /// just-issued id.
601 ///
602 /// The id-bump is staged through the WAL exactly like every
603 /// other catalog mutation; if the caller's surrounding
604 /// transaction is later rolled back (no `Pager::commit`), the
605 /// allocation is rolled back with it — the next open will
606 /// re-issue the same id.
607 ///
608 /// # Errors
609 ///
610 /// - [`Error::InvalidArgument`] if `name` is empty or not
611 /// registered.
612 /// - [`Error::IdSpaceExhausted`] on `u64` wraparound.
613 /// - Pager / B-tree errors propagated.
614 pub fn next_id(&mut self, pager: &mut Pager<F>, name: &str) -> Result<Id> {
615 // Rule 5 — see `Catalog::insert` for the rationale.
616 debug_assert!(
617 pager.in_txn(),
618 "Catalog::next_id must run inside a WAL transaction",
619 );
620 validate_name(name)?;
621 let mut descriptor = self.get(pager, name)?.ok_or(Error::InvalidArgument(
622 "catalog next_id: collection not registered",
623 ))?;
624 // Use the bumper from `id` for the wraparound check. The
625 // closure builds the owned collection name on demand — it is
626 // only invoked on the wraparound / zero-watermark error path.
627 let issued = bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
628 self.update(pager, name, &descriptor)?;
629 Ok(issued)
630 }
631
632 /// List every registered collection.
633 ///
634 /// Scans the full catalog B-tree. Bounded by
635 /// [`MAX_COLLECTIONS`] (Rule 2). The reserved next-collection-id
636 /// row is filtered out.
637 ///
638 /// # Errors
639 ///
640 /// - [`Error::BTreeScanLimitExceeded`] if the catalog has more
641 /// than [`MAX_COLLECTIONS`] entries.
642 /// - Pager / B-tree / postcard errors propagated.
643 pub fn list_collections(
644 &self,
645 pager: &mut Pager<F>,
646 ) -> Result<Vec<(String, CollectionDescriptor)>> {
647 let mut out: Vec<(String, CollectionDescriptor)> = Vec::new();
648 let mut scanned = 0usize;
649 let iter = self.tree.range(pager, ..)?;
650 for entry in iter {
651 scanned += 1;
652 if scanned > MAX_COLLECTIONS {
653 return Err(Error::BTreeScanLimitExceeded {
654 limit: MAX_COLLECTIONS,
655 });
656 }
657 let (key, value) = entry?;
658 if key.as_slice() == RESERVED_NEXT_ID_KEY {
659 continue;
660 }
661 let name = String::from_utf8(key).map_err(|_| Error::Corruption {
662 page_id: self.tree.root().get(),
663 })?;
664 let descriptor: CollectionDescriptor =
665 postcard::from_bytes(&value).map_err(Error::from)?;
666 out.push((name, descriptor));
667 }
668 Ok(out)
669 }
670
671 fn persist_next_collection_id(&mut self, pager: &mut Pager<F>, watermark: u32) -> Result<()> {
672 let encoded = postcard::to_allocvec(&watermark)?;
673 // The reserved row already exists; delete + re-insert (B+tree
674 // inserts reject duplicates).
675 self.tree.delete(pager, RESERVED_NEXT_ID_KEY)?;
676 self.tree.insert(pager, RESERVED_NEXT_ID_KEY, &encoded)?;
677 Ok(())
678 }
679}
680
681/// Maximum collection-name length, in bytes (#43). 255 is a
682/// conservative, widely-compatible cap that comfortably exceeds any
683/// reasonable name yet keeps catalog keys bounded. Loosening this
684/// later is backward-compatible; tightening it would be breaking, so
685/// we pick a generous-but-finite bound at the 1.0 freeze.
686const MAX_COLLECTION_NAME_LEN: usize = 255;
687
688/// Validate a collection name (#43).
689///
690/// Policy:
691/// - Reject the empty string: it would collide with the reserved
692/// next-collection-id row and is a UX hazard (a Document with
693/// `COLLECTION = ""` would be invisible to `list_collections`).
694/// - Reject names longer than [`MAX_COLLECTION_NAME_LEN`] bytes so
695/// catalog keys stay bounded.
696/// - Reject names containing a NUL or any other ASCII/Unicode
697/// control character: such bytes are a portability and
698/// display-safety hazard (terminal injection, truncation at an
699/// embedded NUL on FFI boundaries).
700///
701/// All rejections surface as [`Error::InvalidArgument`].
702fn validate_name(name: &str) -> Result<()> {
703 if name.is_empty() {
704 return Err(Error::InvalidArgument("collection name must be non-empty"));
705 }
706 if name.len() > MAX_COLLECTION_NAME_LEN {
707 return Err(Error::InvalidArgument("collection name exceeds 255 bytes"));
708 }
709 if name.chars().any(char::is_control) {
710 return Err(Error::InvalidArgument(
711 "collection name must not contain NUL or control characters",
712 ));
713 }
714 Ok(())
715}
716
717/// Compute the next `index_id` for the collection. Scans the
718/// descriptor's existing indexes (including `DroppedPending` rows
719/// — their ids are NEVER reused, per `docs/format.md` § Indexes)
720/// and returns one past the max. Wraps with [`Error::IdSpaceExhausted`]
721/// on `u32::MAX`.
722fn next_index_id(descriptor: &CollectionDescriptor) -> Result<u32> {
723 let max = descriptor
724 .indexes
725 .iter()
726 .map(|d| d.index_id)
727 .max()
728 .unwrap_or(0);
729 max.checked_add(1).ok_or_else(|| Error::IdSpaceExhausted {
730 collection: format!("<indexes:{}>", descriptor.collection_id),
731 })
732}
733
#[cfg(test)]
mod tests {
    //! Unit tests for the catalog, all run against an in-memory
    //! pager. No test opens a WAL transaction explicitly; memory
    //! pagers satisfy the `in_txn` debug assertions vacuously.

    use super::*;
    use crate::pager::{Config, Pager};
    use crate::platform::FileHandle;

    /// A fresh in-memory pager with the default configuration.
    fn fresh_pager() -> Pager<FileHandle> {
        Pager::<FileHandle>::memory(Config::default()).expect("pager")
    }

    #[test]
    fn validate_name_policy() {
        // Accepted: a normal name and one exactly at the byte cap.
        assert!(validate_name("users").is_ok());
        let at_cap = "a".repeat(MAX_COLLECTION_NAME_LEN);
        assert!(validate_name(&at_cap).is_ok());

        // Rejected: empty.
        assert!(matches!(validate_name(""), Err(Error::InvalidArgument(_))));
        // Rejected: one byte over the cap.
        let too_long = "a".repeat(MAX_COLLECTION_NAME_LEN + 1);
        assert!(matches!(
            validate_name(&too_long),
            Err(Error::InvalidArgument(_))
        ));
        // Rejected: embedded NUL.
        assert!(matches!(
            validate_name("ab\0cd"),
            Err(Error::InvalidArgument(_))
        ));
        // Rejected: other control characters (newline, tab, DEL).
        for bad in ["line\nbreak", "tab\tname", "del\u{7f}name"] {
            assert!(
                matches!(validate_name(bad), Err(Error::InvalidArgument(_))),
                "expected rejection for {bad:?}"
            );
        }
    }

    #[test]
    fn open_or_init_on_fresh_pager_creates_catalog() {
        let mut pager = fresh_pager();
        assert_eq!(pager.root_catalog(), 0);
        let _catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init catalog");
        assert_ne!(pager.root_catalog(), 0, "catalog root must be installed");
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let primary_root = BTree::<FileHandle>::empty(&mut pager)
            .expect("primary tree")
            .root();
        let descriptor = CollectionDescriptor::new(0, primary_root.get(), 1);
        // The descriptor is not used again, so it is moved rather
        // than cloned.
        let assigned = catalog
            .insert(&mut pager, "users", descriptor)
            .expect("insert users");
        assert_eq!(assigned, 1, "first collection gets id 1");

        let back = catalog
            .get(&mut pager, "users")
            .expect("get")
            .expect("present");
        assert_eq!(back.collection_id, assigned);
        assert_eq!(back.primary_root, primary_root.get());
        assert_eq!(back.type_version, 1);
        assert_eq!(back.next_id, 1);
        assert!(back.indexes.is_empty());
    }

    #[test]
    fn duplicate_insert_errors() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let primary_root = BTree::<FileHandle>::empty(&mut pager)
            .expect("primary tree")
            .root();
        let descriptor = CollectionDescriptor::new(0, primary_root.get(), 1);
        catalog
            .insert(&mut pager, "users", descriptor.clone())
            .expect("first insert");
        let err = catalog
            .insert(&mut pager, "users", descriptor)
            .expect_err("dup");
        assert!(matches!(err, Error::CollectionAlreadyExists { ref name } if name == "users"));
    }

    #[test]
    fn next_id_advances_and_persists() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let primary_root = BTree::<FileHandle>::empty(&mut pager)
            .expect("primary tree")
            .root();
        let _id = catalog
            .insert(
                &mut pager,
                "users",
                CollectionDescriptor::new(0, primary_root.get(), 1),
            )
            .expect("insert");
        let id1 = catalog.next_id(&mut pager, "users").expect("next 1");
        let id2 = catalog.next_id(&mut pager, "users").expect("next 2");
        assert_eq!(id1.get(), 1);
        assert_eq!(id2.get(), 2);
        let descriptor = catalog
            .get(&mut pager, "users")
            .expect("get")
            .expect("present");
        assert_eq!(descriptor.next_id, 3, "next_id watermark advanced");
    }

    #[test]
    fn cross_collection_id_isolation() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
        let p2 = BTree::<FileHandle>::empty(&mut pager).expect("p2").root();
        catalog
            .insert(&mut pager, "a", CollectionDescriptor::new(0, p1.get(), 1))
            .expect("a");
        catalog
            .insert(&mut pager, "b", CollectionDescriptor::new(0, p2.get(), 1))
            .expect("b");
        // Allocate 3 ids in "a"; "b" should still report next_id = 1.
        let _ = catalog.next_id(&mut pager, "a").expect("a1");
        let _ = catalog.next_id(&mut pager, "a").expect("a2");
        let _ = catalog.next_id(&mut pager, "a").expect("a3");
        let a = catalog
            .get(&mut pager, "a")
            .expect("get a")
            .expect("present");
        let b = catalog
            .get(&mut pager, "b")
            .expect("get b")
            .expect("present");
        assert_eq!(a.next_id, 4);
        assert_eq!(b.next_id, 1, "b's next_id unchanged");
    }

    #[test]
    fn list_collections_excludes_reserved_row() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
        let p2 = BTree::<FileHandle>::empty(&mut pager).expect("p2").root();
        catalog
            .insert(
                &mut pager,
                "alpha",
                CollectionDescriptor::new(0, p1.get(), 1),
            )
            .expect("alpha");
        catalog
            .insert(
                &mut pager,
                "beta",
                CollectionDescriptor::new(0, p2.get(), 1),
            )
            .expect("beta");
        let listed = catalog.list_collections(&mut pager).expect("list");
        assert_eq!(listed.len(), 2);
        let names: Vec<&str> = listed.iter().map(|(n, _)| n.as_str()).collect();
        assert!(names.contains(&"alpha"));
        assert!(names.contains(&"beta"));
    }

    #[test]
    fn reopen_preserves_watermark() {
        // Open catalog A, register two collections, allocate ids.
        // Bypass commit (memory pager has none) — for memory pagers
        // every catalog operation is immediately reflected in the
        // pager state. Reopening means: drop the catalog handle and
        // open a new one on the same pager.
        let mut pager = fresh_pager();
        let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
        let p2 = BTree::<FileHandle>::empty(&mut pager).expect("p2").root();
        {
            let mut catalog =
                Catalog::<FileHandle>::open_or_init(&mut pager).expect("init catalog");
            catalog
                .insert(
                    &mut pager,
                    "users",
                    CollectionDescriptor::new(0, p1.get(), 1),
                )
                .expect("users");
            catalog
                .insert(
                    &mut pager,
                    "posts",
                    CollectionDescriptor::new(0, p2.get(), 1),
                )
                .expect("posts");
            let _ = catalog.next_id(&mut pager, "users").expect("u1");
            let _ = catalog.next_id(&mut pager, "users").expect("u2");
        }
        // Re-open: the new catalog handle reads the watermark and the
        // descriptors back from disk (= the in-memory pager's
        // committed state for memory backends).
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("reopen");
        let listed = catalog.list_collections(&mut pager).expect("list");
        assert_eq!(listed.len(), 2);
        let users = listed
            .iter()
            .find(|(n, _)| n == "users")
            .expect("users present");
        assert_eq!(users.1.next_id, 3, "users next_id survived reopen");
        let posts = listed
            .iter()
            .find(|(n, _)| n == "posts")
            .expect("posts present");
        assert_eq!(posts.1.next_id, 1);

        // The next-collection-id watermark itself must also have
        // been reloaded from the reserved row: ids 1 and 2 are
        // taken, so a collection registered through the reopened
        // handle gets 3 rather than restarting from 1.
        let p3 = BTree::<FileHandle>::empty(&mut pager).expect("p3").root();
        let third = catalog
            .insert(
                &mut pager,
                "comments",
                CollectionDescriptor::new(0, p3.get(), 1),
            )
            .expect("comments");
        assert_eq!(third, 3, "collection-id watermark survived reopen");
    }

    #[test]
    fn empty_name_rejected() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
        let err = catalog
            .insert(&mut pager, "", CollectionDescriptor::new(0, p1.get(), 1))
            .expect_err("empty name rejected");
        assert!(matches!(err, Error::InvalidArgument(_)));
    }
}