Skip to main content

obj_core/
catalog.rs

1//! Catalog (L5) — on-disk registry of collections.
2//!
3//! See `docs/format.md` § Catalog for the authoritative on-disk
4//! shape. The catalog is a B+tree (M4 data structure) keyed by
5//! collection name and valued by postcard-encoded
6//! [`CollectionDescriptor`]s. Its root page-id is recorded in the
7//! file header field `root_catalog`; the M2..M4 default of zero
8//! signals "no catalog yet" and [`Catalog::open_or_init`] creates
9//! one on first open.
10//!
11//! # Power-of-ten posture
12//!
13//! - **Rule 1.** Catalog walks go through the M4 B+tree API which
14//!   uses an explicit stack — no recursion in this module.
15//! - **Rule 2.** [`Catalog::list_collections`] is bounded by
16//!   [`MAX_COLLECTIONS`]; exceeding the bound surfaces as
17//!   [`Error::BTreeScanLimitExceeded`].
18//! - **Rule 5.** Reserved-row presence is debug-asserted; the
19//!   `try_into` chain in `id_from_bytes` is the runtime boundary.
20//! - **Rule 7.** Every fallible step propagates via `?`; no
21//!   `unwrap` on the production path.
22//! - **Rule 9.** No `dyn` — the catalog is generic over `F:
23//!   FileBackend` and the B+tree it owns is monomorphised.
24
25#![forbid(unsafe_code)]
26
27use serde::{Deserialize, Serialize};
28
29use crate::btree::node::{decode_node, NodeKind};
30use crate::btree::{choose_child, BTree, MAX_BTREE_DEPTH};
31use crate::error::{Error, Result};
32use crate::id::{bump_next_id, Id};
33use crate::index::{IndexKind, IndexSpec};
34use crate::pager::page::PageId;
35use crate::pager::{Pager, ReaderSnapshot};
36use crate::platform::{FileBackend, FileHandle};
37
38use heapless::Vec as HeaplessVec;
39
/// Upper bound on the number of collections one catalog may hold.
///
/// [`Catalog::list_collections`] uses this as its scan limit
/// (Rule 2). The value is 2^20 = 1 048 576; the original sizing
/// note puts a full catalog at roughly 64 MiB for 64-byte
/// descriptor payloads.
///
/// NOTE(review): the `collection_id` allocator in
/// [`Catalog::insert`] does not consult this constant — confirm
/// whether it is meant to.
pub const MAX_COLLECTIONS: usize = 1_048_576;
46
/// Key of the private catalog row that stores the
/// next-collection-id watermark: the empty byte string.
///
/// `validate_name` rejects empty collection names, so no
/// caller-supplied name can ever address this row.
const RESERVED_NEXT_ID_KEY: &[u8] = &[];
52
53/// On-disk description of a collection.
54///
55/// Encoded with `postcard` as the value of a catalog B-tree row.
56/// The exact shape is documented in `docs/format.md` § Catalog.
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct CollectionDescriptor {
59    /// Catalog-assigned numeric id for this collection.
60    pub collection_id: u32,
61    /// Page-id of the collection's primary B-tree root.
62    pub primary_root: u64,
63    /// Current `Document::VERSION` for the collection's type.
64    pub type_version: u32,
65    /// Next-id watermark — the next [`Id`] the allocator will
66    /// hand out for this collection.
67    pub next_id: u64,
68    /// Secondary indexes. Empty in M5; populated in M7.
69    pub indexes: Vec<IndexDescriptor>,
70}
71
72impl CollectionDescriptor {
73    /// Construct a descriptor for a freshly-registered collection.
74    /// `primary_root` is the page-id of the collection's empty
75    /// primary B-tree (allocated by the caller before
76    /// [`Catalog::insert`]); `collection_id` is the value the
77    /// catalog will assign.
78    #[must_use]
79    pub const fn new(collection_id: u32, primary_root: u64, type_version: u32) -> Self {
80        Self {
81            collection_id,
82            primary_root,
83            type_version,
84            next_id: 1,
85            indexes: Vec::new(),
86        }
87    }
88}
89
90/// On-disk descriptor for a secondary index attached to a
91/// collection.
92///
93/// Persisted inside the owning [`CollectionDescriptor::indexes`]
94/// vector as part of the catalog row's postcard payload. The wire
95/// shape is documented in `docs/format.md` § Indexes; format-minor
96/// bumped to 2 in M7.
97#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
98pub struct IndexDescriptor {
99    /// Catalog-assigned numeric id for this index. Stable across
100    /// reopens; **never reused** — a `DroppedPending` descriptor
101    /// retains its `index_id` so the page reclamation pass on the
102    /// next checkpoint does not race with concurrent readers.
103    pub index_id: u32,
104    /// User-visible name. Stable across reopens; the reconciler
105    /// matches a runtime [`IndexSpec`] to a stored descriptor by
106    /// this name.
107    pub name: String,
108    /// Discriminator for the kind of index. See
109    /// [`crate::index::IndexKind`].
110    pub kind: IndexKind,
111    /// Field path(s) the index is keyed by. Single-element for
112    /// `Standard` / `Unique` / `Each`; ≥ 2 for `Composite`.
113    pub key_paths: Vec<String>,
114    /// Page-id of the index B+tree's root.
115    pub root_page_id: u64,
116    /// Lifecycle status — see [`IndexStatus`].
117    pub status: IndexStatus,
118}
119
120/// Lifecycle state of an [`IndexDescriptor`].
121///
122/// `Active` indexes participate in writes (#58) and reads (#60);
123/// `DroppedPending` is a tombstone — the descriptor lingers so the
124/// `index_id` is not reused and the next `Pager::checkpoint` can
125/// reclaim the B+tree pages.
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
127#[repr(u8)]
128pub enum IndexStatus {
129    /// Index is live — every write maintains it, every read may
130    /// consult it.
131    Active = 0,
132    /// Index was dropped by the reconciler; the descriptor remains
133    /// so the `index_id` is not reused. Pages are reclaimed on the
134    /// next `Pager::checkpoint`.
135    DroppedPending = 1,
136}
137
138/// The catalog handle.
139///
140/// Owns the catalog B+tree's root page-id; mutating methods take
141/// `&mut Pager<F>` to advance the underlying B-tree through
142/// copy-on-write.
143#[derive(Debug)]
144pub struct Catalog<F: FileBackend = FileHandle> {
145    tree: BTree<F>,
146    /// Cached watermark for the next `collection_id` to allocate.
147    /// Loaded from the reserved catalog row on open and re-persisted
148    /// on every change.
149    next_collection_id: u32,
150}
151
152impl<F: FileBackend> Catalog<F> {
153    /// Open the catalog, creating it on first call.
154    ///
155    /// Reads the file header's `root_catalog` field via
156    /// [`Pager::root_catalog`]:
157    ///
158    /// - If non-zero, attaches to the existing catalog B-tree and
159    ///   loads the next-collection-id watermark from the reserved
160    ///   row.
161    /// - If zero, allocates a fresh B+tree root, seeds the reserved
162    ///   row with `1`, persists the root via
163    ///   [`Pager::set_root_catalog`], and commits.
164    ///
165    /// # Errors
166    ///
167    /// - [`Error::Corruption`] if an existing catalog's reserved
168    ///   row is missing or malformed.
169    /// - Pager / B-tree errors propagated as-is.
170    pub fn open_or_init(pager: &mut Pager<F>) -> Result<Self> {
171        let raw = pager.root_catalog();
172        if let Some(existing) = PageId::new(raw) {
173            return Self::open_existing(pager, existing);
174        }
175        Self::init_fresh(pager)
176    }
177
178    fn open_existing(pager: &mut Pager<F>, root: PageId) -> Result<Self> {
179        let tree = BTree::<F>::open(pager, root)?;
180        let watermark = match tree.get(pager, RESERVED_NEXT_ID_KEY)? {
181            Some(bytes) => postcard::from_bytes::<u32>(&bytes).map_err(Error::from)?,
182            None => {
183                // The reserved row MUST exist on every previously-
184                // initialised catalog; absence is corruption.
185                return Err(Error::Corruption {
186                    page_id: root.get(),
187                });
188            }
189        };
190        Ok(Self {
191            tree,
192            next_collection_id: watermark,
193        })
194    }
195
196    fn init_fresh(pager: &mut Pager<F>) -> Result<Self> {
197        let mut tree = BTree::<F>::empty(pager)?;
198        // Seed the reserved next-collection-id row with `1` — zero is
199        // reserved as a sentinel "no collection" (mirrors `Id`).
200        let watermark: u32 = 1;
201        let encoded = postcard::to_allocvec(&watermark)?;
202        tree.insert(pager, RESERVED_NEXT_ID_KEY, &encoded)?;
203        pager.set_root_catalog(tree.root().get())?;
204        Ok(Self {
205            tree,
206            next_collection_id: watermark,
207        })
208    }
209
210    /// Get the descriptor for the named collection. Returns
211    /// `Ok(None)` if no such collection exists.
212    ///
213    /// # Errors
214    ///
215    /// - [`Error::InvalidArgument`] if `name` is empty.
216    /// - Pager / B-tree / postcard errors propagated as-is.
217    pub fn get(&self, pager: &mut Pager<F>, name: &str) -> Result<Option<CollectionDescriptor>> {
218        validate_name(name)?;
219        match self.tree.get(pager, name.as_bytes())? {
220            Some(bytes) => {
221                let descriptor: CollectionDescriptor =
222                    postcard::from_bytes(&bytes).map_err(Error::from)?;
223                Ok(Some(descriptor))
224            }
225            None => Ok(None),
226        }
227    }
228
229    /// Look up a collection descriptor as-of a [`ReaderSnapshot`]'s
230    /// pinned LSN — i.e. observe the catalog state the reader's
231    /// snapshot pinned, NOT the writer's live `Catalog.tree.root`.
232    ///
233    /// Walks the catalog B+tree rooted at `snapshot.root_catalog()`
234    /// (the value captured by `Pager::reader_snapshot` at pin time;
235    /// see M6 #51). Every page read goes through
236    /// [`ReaderSnapshot::read_page`], which consults the snapshot's
237    /// frozen WAL view first and falls through to the main file —
238    /// bypassing the live WAL overlay that may have been advanced by
239    /// a concurrent writer since pin time. This is the M6 #53 fix:
240    /// without it, a reader's catalog descend can land on a freelist-
241    /// recycled page-id whose `state.view` contents are no longer a
242    /// valid B+tree node, surfacing as `Error::Corruption { page_id:
243    /// 0 }` from the codec.
244    ///
245    /// When `snapshot.root_catalog() == 0` the catalog did not exist
246    /// at the snapshot's pinned LSN; `Ok(None)` is returned and the
247    /// caller should surface that as `Error::CollectionNotFound`.
248    ///
249    /// # Errors
250    ///
251    /// - [`Error::InvalidArgument`] if `name` is empty.
252    /// - [`Error::BTreeDepthExceeded`] if the catalog B+tree exceeds
253    ///   `MAX_BTREE_DEPTH` (Rule 1 bound on the descend stack).
254    /// - [`Error::Corruption`] / [`Error::Codec`] propagated from the
255    ///   snapshot read and postcard decode.
256    pub fn lookup_via_snapshot(
257        pager: &Pager<F>,
258        snapshot: &ReaderSnapshot<F>,
259        name: &str,
260    ) -> Result<Option<CollectionDescriptor>> {
261        validate_name(name)?;
262        let Some(root) = PageId::new(snapshot.root_catalog()) else {
263            // root_catalog == 0 means the catalog did not exist at
264            // the snapshot's pinned LSN. Nothing to look up.
265            return Ok(None);
266        };
267        let key = name.as_bytes();
268        // Descend root → leaf via the snapshot's read_page.
269        // `heapless::Vec` for the descent stack (Rule 1 + Rule 3).
270        let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
271        let mut current = root;
272        let leaf_node = loop {
273            if path.push(current).is_err() {
274                return Err(Error::BTreeDepthExceeded {
275                    limit: MAX_BTREE_DEPTH,
276                });
277            }
278            let page = snapshot.read_page(pager, current)?;
279            let decoded = decode_node(page.as_bytes())?;
280            match decoded.kind {
281                NodeKind::Leaf => break decoded,
282                NodeKind::Internal => {
283                    current = choose_child(&decoded, key)?;
284                }
285            }
286        };
287        // Linear scan of the leaf's entries — the leaf is bounded by
288        // the slot-directory capacity, so this loop is statically
289        // bounded by `LEAF_SLOT_CAP` (Rule 2).
290        for entry in &leaf_node.leaves {
291            if entry.key.as_slice() == key {
292                let descriptor: CollectionDescriptor =
293                    postcard::from_bytes(&entry.value).map_err(Error::from)?;
294                return Ok(Some(descriptor));
295            }
296        }
297        Ok(None)
298    }
299
300    /// Register a new collection.
301    ///
302    /// Allocates the next `collection_id`, sets it on `descriptor`,
303    /// re-persists the next-collection-id watermark, and inserts
304    /// the descriptor into the catalog B-tree. The descriptor that
305    /// the caller passes in has its `collection_id` field
306    /// **ignored** — the catalog assigns the canonical value.
307    ///
308    /// Call [`Pager::commit`] after this to make the registration
309    /// durable.
310    ///
311    /// # Errors
312    ///
313    /// - [`Error::InvalidArgument`] if `name` is empty.
314    /// - [`Error::CollectionAlreadyExists`] if `name` is already
315    ///   registered.
316    /// - [`Error::IdSpaceExhausted`] if the `u32` `collection_id`
317    ///   space is exhausted.
318    /// - Pager / B-tree / postcard errors propagated.
319    pub fn insert(
320        &mut self,
321        pager: &mut Pager<F>,
322        name: &str,
323        mut descriptor: CollectionDescriptor,
324    ) -> Result<u32> {
325        // Rule 5 — M6 #51: catalog mutations must run inside a WAL
326        // transaction so the `set_root_catalog` header update and
327        // the B-tree page writes commit atomically. The pre-M6.5
328        // bug was a `set_root_catalog` that wrote direct-to-disk
329        // outside the WAL; this assertion defends against that
330        // class of regression. Memory pagers have no WAL and
331        // satisfy `in_txn` vacuously.
332        debug_assert!(
333            pager.in_txn(),
334            "Catalog::insert must run inside a WAL transaction \
335             (Pager::begin_txn / WriteTxn::begin)",
336        );
337        validate_name(name)?;
338        if self.tree.get(pager, name.as_bytes())?.is_some() {
339            return Err(Error::CollectionAlreadyExists {
340                name: name.to_owned(),
341            });
342        }
343        let assigned = self.next_collection_id;
344        descriptor.collection_id = assigned;
345        let new_watermark =
346            self.next_collection_id
347                .checked_add(1)
348                .ok_or_else(|| Error::IdSpaceExhausted {
349                    collection: "<catalog>".to_owned(),
350                })?;
351        let encoded = postcard::to_allocvec(&descriptor)?;
352        self.tree.insert(pager, name.as_bytes(), &encoded)?;
353        // Update the reserved next-collection-id row. B+tree inserts
354        // do not handle "key exists" updates — delete then re-insert.
355        self.persist_next_collection_id(pager, new_watermark)?;
356        // Catalog root may have changed (every B+tree mutation is COW).
357        pager.set_root_catalog(self.tree.root().get())?;
358        self.next_collection_id = new_watermark;
359        Ok(assigned)
360    }
361
362    /// Update an existing collection's descriptor in place.
363    ///
364    /// Used when `next_id` advances, `type_version` is bumped, or
365    /// secondary indexes change. The on-disk `collection_id` is
366    /// preserved across the update; callers should not change it.
367    ///
368    /// # Errors
369    ///
370    /// - [`Error::InvalidArgument`] if `name` is empty.
371    /// - [`Error::Corruption`] if the descriptor's `collection_id`
372    ///   disagrees with the catalog's record (defensive check —
373    ///   indicates a caller bug).
374    /// - Pager / B-tree / postcard errors propagated.
375    pub fn update(
376        &mut self,
377        pager: &mut Pager<F>,
378        name: &str,
379        descriptor: &CollectionDescriptor,
380    ) -> Result<()> {
381        // Rule 5 — see `Catalog::insert` for the rationale.
382        debug_assert!(
383            pager.in_txn(),
384            "Catalog::update must run inside a WAL transaction",
385        );
386        validate_name(name)?;
387        let existing = self.get(pager, name)?.ok_or(Error::InvalidArgument(
388            "catalog update: collection not registered",
389        ))?;
390        if existing.collection_id != descriptor.collection_id {
391            return Err(Error::Corruption {
392                page_id: self.tree.root().get(),
393            });
394        }
395        let encoded = postcard::to_allocvec(descriptor)?;
396        self.tree.delete(pager, name.as_bytes())?;
397        self.tree.insert(pager, name.as_bytes(), &encoded)?;
398        pager.set_root_catalog(self.tree.root().get())?;
399        Ok(())
400    }
401
402    /// Declare a new secondary index on the named collection.
403    ///
404    /// Allocates a fresh `index_id`, an empty index B+tree (M4),
405    /// and appends a new `IndexDescriptor { status: Active }` to
406    /// the collection's `indexes` vector. The mutation rides inside
407    /// the caller's WAL transaction; on rollback the descriptor +
408    /// the empty B+tree are both discarded.
409    ///
410    /// `spec` is validated before any state mutation; an invalid
411    /// spec surfaces as [`Error::InvalidArgument`] before the
412    /// catalog touches the pager.
413    ///
414    /// # Errors
415    ///
416    /// - [`Error::InvalidArgument`] if `spec.validate()` rejects
417    ///   the spec.
418    /// - [`Error::CollectionNotFound`] if `collection` is not
419    ///   registered.
420    /// - [`Error::IndexKindMismatch`] if an `Active` descriptor of
421    ///   the same name has a different `(kind, key_paths)`.
422    /// - [`Error::IdSpaceExhausted`] on `u32` `index_id` wraparound
423    ///   (defense-in-depth — practically unreachable).
424    /// - Pager / B-tree / postcard errors propagated.
425    pub fn declare_index(
426        &mut self,
427        pager: &mut Pager<F>,
428        collection: &str,
429        spec: &IndexSpec,
430    ) -> Result<u32> {
431        debug_assert!(
432            pager.in_txn(),
433            "Catalog::declare_index must run inside a WAL transaction",
434        );
435        spec.validate()?;
436        validate_name(collection)?;
437        let mut descriptor =
438            self.get(pager, collection)?
439                .ok_or_else(|| Error::CollectionNotFound {
440                    name: collection.to_owned(),
441                })?;
442        if let Some(existing) = descriptor.indexes.iter().find(|d| d.name == spec.name) {
443            return Self::reconcile_existing_index(existing, spec);
444        }
445        let index_id = next_index_id(&descriptor)?;
446        let root_page_id = BTree::<F>::empty(pager)?.root().get();
447        let new_descriptor = IndexDescriptor {
448            index_id,
449            name: spec.name.clone(),
450            kind: spec.kind,
451            key_paths: spec.key_paths.clone(),
452            root_page_id,
453            status: IndexStatus::Active,
454        };
455        descriptor.indexes.push(new_descriptor);
456        self.update(pager, collection, &descriptor)?;
457        Ok(index_id)
458    }
459
460    /// Reconcile a runtime [`IndexSpec`] against an already-stored
461    /// `IndexDescriptor` of the same name. Returns the existing
462    /// `index_id` if the `(kind, key_paths)` match; errors otherwise.
463    fn reconcile_existing_index(existing: &IndexDescriptor, spec: &IndexSpec) -> Result<u32> {
464        if existing.kind != spec.kind {
465            return Err(Error::IndexKindMismatch {
466                name: spec.name.clone(),
467                expected: spec.kind,
468                found: existing.kind,
469            });
470        }
471        if existing.key_paths != spec.key_paths {
472            return Err(Error::IndexKeyPathsMismatch {
473                name: spec.name.clone(),
474            });
475        }
476        // Re-activating a `DroppedPending` index is a deliberate
477        // re-declare — but M7 leaves the descriptor as-is; the
478        // reconciler will toggle it back to Active in a follow-up
479        // commit if needed. For now: existing match → idempotent.
480        Ok(existing.index_id)
481    }
482
483    /// Reconcile the runtime [`IndexSpec`] set for a collection
484    /// against the catalog's stored descriptors.
485    ///
486    /// - Specs present in `specs` and absent from the descriptor are
487    ///   **declared** (new `Active` descriptor + empty B+tree).
488    /// - `Active` descriptors absent from `specs` are flipped to
489    ///   `DroppedPending`.
490    /// - Matching `(name, kind, key_paths)` pairs are left alone —
491    ///   reconciliation is **idempotent**.
492    ///
493    /// Returns the descriptor's post-reconciliation index roster (a
494    /// `Vec<IndexDescriptor>` clone) so the caller can build its
495    /// maintenance plan without re-querying the catalog.
496    ///
497    /// # Errors
498    ///
499    /// - [`Error::IndexKindMismatch`] /
500    ///   [`Error::IndexKeyPathsMismatch`] on per-name structural
501    ///   disagreement.
502    /// - [`Error::IdSpaceExhausted`] on `u32` `index_id` wraparound.
503    /// - Pager / B-tree / postcard errors propagated.
504    pub fn reconcile_indexes(
505        &mut self,
506        pager: &mut Pager<F>,
507        collection: &str,
508        specs: &[IndexSpec],
509    ) -> Result<Vec<IndexDescriptor>> {
510        debug_assert!(
511            pager.in_txn(),
512            "Catalog::reconcile_indexes must run inside a WAL transaction",
513        );
514        validate_name(collection)?;
515        for spec in specs {
516            spec.validate()?;
517        }
518        // Stage 1: declare missing (and verify-match-or-error
519        // existing).
520        for spec in specs {
521            let _ = self.declare_index(pager, collection, spec)?;
522        }
523        // Stage 2: drop active descriptors whose name no longer
524        // appears in `specs`.
525        let descriptor = self
526            .get(pager, collection)?
527            .ok_or_else(|| Error::CollectionNotFound {
528                name: collection.to_owned(),
529            })?;
530        let mut to_drop: Vec<String> = Vec::new();
531        for d in &descriptor.indexes {
532            if d.status == IndexStatus::Active && !specs.iter().any(|s| s.name == d.name) {
533                to_drop.push(d.name.clone());
534            }
535        }
536        for name in to_drop {
537            self.drop_index(pager, collection, &name)?;
538        }
539        // Re-read for the post-reconciliation snapshot.
540        let final_descriptor =
541            self.get(pager, collection)?
542                .ok_or_else(|| Error::CollectionNotFound {
543                    name: collection.to_owned(),
544                })?;
545        Ok(final_descriptor.indexes)
546    }
547
548    /// Drop the named index from the named collection — flips the
549    /// descriptor's status to [`IndexStatus::DroppedPending`].
550    ///
551    /// The descriptor stays in the catalog so its `index_id` is not
552    /// reused. The index B+tree pages are reclaimed on the next
553    /// [`Pager::checkpoint`] pass (deferred reclamation: a
554    /// concurrent reader's snapshot may still need to walk the
555    /// pages until its pin is released).
556    ///
557    /// # Errors
558    ///
559    /// - [`Error::CollectionNotFound`] if `collection` is not
560    ///   registered.
561    /// - [`Error::IndexNotFound`] if `index_name` is not a known
562    ///   descriptor on the collection.
563    pub fn drop_index(
564        &mut self,
565        pager: &mut Pager<F>,
566        collection: &str,
567        index_name: &str,
568    ) -> Result<()> {
569        debug_assert!(
570            pager.in_txn(),
571            "Catalog::drop_index must run inside a WAL transaction",
572        );
573        validate_name(collection)?;
574        let mut descriptor =
575            self.get(pager, collection)?
576                .ok_or_else(|| Error::CollectionNotFound {
577                    name: collection.to_owned(),
578                })?;
579        let entry = descriptor
580            .indexes
581            .iter_mut()
582            .find(|d| d.name == index_name)
583            .ok_or_else(|| Error::IndexNotFound {
584                collection: collection.to_owned(),
585                name: index_name.to_owned(),
586            })?;
587        if entry.status == IndexStatus::Active {
588            entry.status = IndexStatus::DroppedPending;
589        }
590        self.update(pager, collection, &descriptor)?;
591        Ok(())
592    }
593
594    /// Allocate the next [`Id`] for the named collection,
595    /// persisting the bumped `next_id` watermark inside the
596    /// catalog row.
597    ///
598    /// Re-reads the descriptor, bumps `next_id`, writes the new
599    /// descriptor back via [`Catalog::update`], and returns the
600    /// just-issued id.
601    ///
602    /// The id-bump is staged through the WAL exactly like every
603    /// other catalog mutation; if the caller's surrounding
604    /// transaction is later rolled back (no `Pager::commit`), the
605    /// allocation is rolled back with it — the next open will
606    /// re-issue the same id.
607    ///
608    /// # Errors
609    ///
610    /// - [`Error::InvalidArgument`] if `name` is empty or not
611    ///   registered.
612    /// - [`Error::IdSpaceExhausted`] on `u64` wraparound.
613    /// - Pager / B-tree errors propagated.
614    pub fn next_id(&mut self, pager: &mut Pager<F>, name: &str) -> Result<Id> {
615        // Rule 5 — see `Catalog::insert` for the rationale.
616        debug_assert!(
617            pager.in_txn(),
618            "Catalog::next_id must run inside a WAL transaction",
619        );
620        validate_name(name)?;
621        let mut descriptor = self.get(pager, name)?.ok_or(Error::InvalidArgument(
622            "catalog next_id: collection not registered",
623        ))?;
624        // Use the bumper from `id` for the wraparound check. The
625        // closure builds the owned collection name on demand — it is
626        // only invoked on the wraparound / zero-watermark error path.
627        let issued = bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
628        self.update(pager, name, &descriptor)?;
629        Ok(issued)
630    }
631
632    /// List every registered collection.
633    ///
634    /// Scans the full catalog B-tree. Bounded by
635    /// [`MAX_COLLECTIONS`] (Rule 2). The reserved next-collection-id
636    /// row is filtered out.
637    ///
638    /// # Errors
639    ///
640    /// - [`Error::BTreeScanLimitExceeded`] if the catalog has more
641    ///   than [`MAX_COLLECTIONS`] entries.
642    /// - Pager / B-tree / postcard errors propagated.
643    pub fn list_collections(
644        &self,
645        pager: &mut Pager<F>,
646    ) -> Result<Vec<(String, CollectionDescriptor)>> {
647        let mut out: Vec<(String, CollectionDescriptor)> = Vec::new();
648        let mut scanned = 0usize;
649        let iter = self.tree.range(pager, ..)?;
650        for entry in iter {
651            scanned += 1;
652            if scanned > MAX_COLLECTIONS {
653                return Err(Error::BTreeScanLimitExceeded {
654                    limit: MAX_COLLECTIONS,
655                });
656            }
657            let (key, value) = entry?;
658            if key.as_slice() == RESERVED_NEXT_ID_KEY {
659                continue;
660            }
661            let name = String::from_utf8(key).map_err(|_| Error::Corruption {
662                page_id: self.tree.root().get(),
663            })?;
664            let descriptor: CollectionDescriptor =
665                postcard::from_bytes(&value).map_err(Error::from)?;
666            out.push((name, descriptor));
667        }
668        Ok(out)
669    }
670
671    fn persist_next_collection_id(&mut self, pager: &mut Pager<F>, watermark: u32) -> Result<()> {
672        let encoded = postcard::to_allocvec(&watermark)?;
673        // The reserved row already exists; delete + re-insert (B+tree
674        // inserts reject duplicates).
675        self.tree.delete(pager, RESERVED_NEXT_ID_KEY)?;
676        self.tree.insert(pager, RESERVED_NEXT_ID_KEY, &encoded)?;
677        Ok(())
678    }
679}
680
/// Longest accepted collection name, in bytes (#43): 255.
///
/// A conservative, widely-compatible cap that comfortably exceeds
/// any reasonable name while keeping catalog keys bounded.
/// Raising it later is backward-compatible; lowering it would be a
/// breaking change, so a generous-but-finite value was chosen at
/// the 1.0 freeze.
const MAX_COLLECTION_NAME_LEN: usize = u8::MAX as usize;
687
688/// Validate a collection name (#43).
689///
690/// Policy:
691/// - Reject the empty string: it would collide with the reserved
692///   next-collection-id row and is a UX hazard (a Document with
693///   `COLLECTION = ""` would be invisible to `list_collections`).
694/// - Reject names longer than [`MAX_COLLECTION_NAME_LEN`] bytes so
695///   catalog keys stay bounded.
696/// - Reject names containing a NUL or any other ASCII/Unicode
697///   control character: such bytes are a portability and
698///   display-safety hazard (terminal injection, truncation at an
699///   embedded NUL on FFI boundaries).
700///
701/// All rejections surface as [`Error::InvalidArgument`].
702fn validate_name(name: &str) -> Result<()> {
703    if name.is_empty() {
704        return Err(Error::InvalidArgument("collection name must be non-empty"));
705    }
706    if name.len() > MAX_COLLECTION_NAME_LEN {
707        return Err(Error::InvalidArgument("collection name exceeds 255 bytes"));
708    }
709    if name.chars().any(char::is_control) {
710        return Err(Error::InvalidArgument(
711            "collection name must not contain NUL or control characters",
712        ));
713    }
714    Ok(())
715}
716
717/// Compute the next `index_id` for the collection. Scans the
718/// descriptor's existing indexes (including `DroppedPending` rows
719/// — their ids are NEVER reused, per `docs/format.md` § Indexes)
720/// and returns one past the max. Wraps with [`Error::IdSpaceExhausted`]
721/// on `u32::MAX`.
722fn next_index_id(descriptor: &CollectionDescriptor) -> Result<u32> {
723    let max = descriptor
724        .indexes
725        .iter()
726        .map(|d| d.index_id)
727        .max()
728        .unwrap_or(0);
729    max.checked_add(1).ok_or_else(|| Error::IdSpaceExhausted {
730        collection: format!("<indexes:{}>", descriptor.collection_id),
731    })
732}
733
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pager::{Config, Pager};
    use crate::platform::FileHandle;

    /// Build an in-memory pager using the default configuration.
    fn fresh_pager() -> Pager<FileHandle> {
        let config = Config::default();
        Pager::<FileHandle>::memory(config).expect("pager")
    }

    /// Allocate an empty primary B+tree on `pager` and return a
    /// descriptor rooted at it (collection id 0, type version 1).
    /// `label` is the panic message used if the tree cannot be created.
    fn descriptor_on_new_tree(pager: &mut Pager<FileHandle>, label: &str) -> CollectionDescriptor {
        let root = BTree::<FileHandle>::empty(pager).expect(label).root();
        CollectionDescriptor::new(0, root.get(), 1)
    }

    #[test]
    fn validate_name_policy() {
        let is_rejected =
            |candidate: &str| matches!(validate_name(candidate), Err(Error::InvalidArgument(_)));

        // Must pass: an ordinary name and one sitting exactly on the byte cap.
        let longest_allowed = "a".repeat(MAX_COLLECTION_NAME_LEN);
        for good in ["users", longest_allowed.as_str()] {
            assert!(validate_name(good).is_ok());
        }

        // Must fail: the empty name.
        assert!(is_rejected(""));
        // Must fail: a single byte past the cap.
        let one_over = "a".repeat(MAX_COLLECTION_NAME_LEN + 1);
        assert!(is_rejected(&one_over));
        // Must fail: a NUL in the middle of the name.
        assert!(is_rejected("ab\0cd"));
        // Must fail: the remaining control characters (newline, tab, DEL).
        for bad in ["line\nbreak", "tab\tname", "del\u{7f}name"] {
            assert!(is_rejected(bad), "expected rejection for {bad:?}");
        }
    }

    #[test]
    fn open_or_init_on_fresh_pager_creates_catalog() {
        let mut pager = fresh_pager();
        // A brand-new pager has no catalog root recorded yet.
        let root_before = pager.root_catalog();
        assert_eq!(root_before, 0);

        let _catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init catalog");

        let root_after = pager.root_catalog();
        assert_ne!(root_after, 0, "catalog root must be installed");
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let root_page = BTree::<FileHandle>::empty(&mut pager)
            .expect("primary tree")
            .root();
        let template = CollectionDescriptor::new(0, root_page.get(), 1);

        let collection_id = catalog
            .insert(&mut pager, "users", template.clone())
            .expect("insert users");
        assert_eq!(collection_id, 1, "first collection gets id 1");

        // Read the row back and compare every field against what was stored.
        let stored = catalog
            .get(&mut pager, "users")
            .expect("get")
            .expect("present");
        assert_eq!(stored.collection_id, collection_id);
        assert_eq!(stored.primary_root, root_page.get());
        assert_eq!(stored.type_version, 1);
        assert_eq!(stored.next_id, 1);
        assert!(stored.indexes.is_empty());
    }

    #[test]
    fn duplicate_insert_errors() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let descriptor = descriptor_on_new_tree(&mut pager, "primary tree");

        catalog
            .insert(&mut pager, "users", descriptor.clone())
            .expect("first insert");

        // Registering the same name a second time must be refused.
        let err = catalog
            .insert(&mut pager, "users", descriptor)
            .expect_err("dup");
        let names_users =
            matches!(&err, Error::CollectionAlreadyExists { name } if name == "users");
        assert!(names_users);
    }

    #[test]
    fn next_id_advances_and_persists() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let descriptor = descriptor_on_new_tree(&mut pager, "primary tree");
        let _id = catalog
            .insert(&mut pager, "users", descriptor)
            .expect("insert");

        let first = catalog.next_id(&mut pager, "users").expect("next 1");
        let second = catalog.next_id(&mut pager, "users").expect("next 2");
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 2);

        // The stored watermark must point one past the last id handed out.
        let stored = catalog
            .get(&mut pager, "users")
            .expect("get")
            .expect("present");
        assert_eq!(stored.next_id, 3, "next_id watermark advanced");
    }

    #[test]
    fn cross_collection_id_isolation() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let descriptor_a = descriptor_on_new_tree(&mut pager, "p1");
        let descriptor_b = descriptor_on_new_tree(&mut pager, "p2");
        catalog.insert(&mut pager, "a", descriptor_a).expect("a");
        catalog.insert(&mut pager, "b", descriptor_b).expect("b");

        // Hand out three ids from "a"; "b" must keep reporting next_id = 1.
        for label in ["a1", "a2", "a3"] {
            let _ = catalog.next_id(&mut pager, "a").expect(label);
        }

        let stored_a = catalog
            .get(&mut pager, "a")
            .expect("get a")
            .expect("present");
        let stored_b = catalog
            .get(&mut pager, "b")
            .expect("get b")
            .expect("present");
        assert_eq!(stored_a.next_id, 4);
        assert_eq!(stored_b.next_id, 1, "b's next_id unchanged");
    }

    #[test]
    fn list_collections_excludes_reserved_row() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let alpha = descriptor_on_new_tree(&mut pager, "p1");
        let beta = descriptor_on_new_tree(&mut pager, "p2");
        catalog.insert(&mut pager, "alpha", alpha).expect("alpha");
        catalog.insert(&mut pager, "beta", beta).expect("beta");

        // Exactly the two user collections come back — nothing else.
        let listed = catalog.list_collections(&mut pager).expect("list");
        assert_eq!(listed.len(), 2);
        for expected in ["alpha", "beta"] {
            assert!(listed.iter().any(|(name, _)| name.as_str() == expected));
        }
    }

    #[test]
    fn reopen_preserves_watermark() {
        // Register two collections through one catalog handle and
        // allocate ids. There is no commit step here (the memory pager
        // has none): each catalog operation lands in the pager state
        // immediately. "Reopening" therefore means dropping the handle
        // and opening a fresh one on the same pager.
        let mut pager = fresh_pager();
        let users_descriptor = descriptor_on_new_tree(&mut pager, "p1");
        let posts_descriptor = descriptor_on_new_tree(&mut pager, "p2");
        {
            let mut catalog =
                Catalog::<FileHandle>::open_or_init(&mut pager).expect("init catalog");
            catalog
                .insert(&mut pager, "users", users_descriptor)
                .expect("users");
            catalog
                .insert(&mut pager, "posts", posts_descriptor)
                .expect("posts");
            for label in ["u1", "u2"] {
                let _ = catalog.next_id(&mut pager, "users").expect(label);
            }
        }

        // The second handle reads the watermark and the descriptors
        // back from disk (for memory backends, that is the in-memory
        // pager's committed state).
        let catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("reopen");
        let listed = catalog.list_collections(&mut pager).expect("list");
        assert_eq!(listed.len(), 2);

        let next_id_of = |wanted: &str, missing: &str| {
            listed
                .iter()
                .find(|(name, _)| name == wanted)
                .expect(missing)
                .1
                .next_id
        };
        assert_eq!(
            next_id_of("users", "users present"),
            3,
            "users next_id survived reopen"
        );
        assert_eq!(next_id_of("posts", "posts present"), 1);
    }

    #[test]
    fn empty_name_rejected() {
        let mut pager = fresh_pager();
        let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
        let descriptor = descriptor_on_new_tree(&mut pager, "p1");

        let err = catalog
            .insert(&mut pager, "", descriptor)
            .expect_err("empty name rejected");
        assert!(matches!(err, Error::InvalidArgument(_)));
    }
}