Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        direction::Direction,
14        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15        schema::{
16            AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17            PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18            accepted_schema_cache_fingerprint,
19            accepted_schema_cache_fingerprint_for_persisted_snapshot,
20            accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22        },
23    },
24    error::InternalError,
25    traits::Storable,
26    types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
40const SCHEMA_KEY_BYTES_USIZE: usize = 12;
41const SCHEMA_KEY_BYTES: u32 = 12;
42pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
43const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
44const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
45const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
46const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
47const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
48const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32: u32 = 25;
50
#[cfg(test)]
thread_local! {
    // Per-thread tally bumped by `SchemaStore::latest_raw_snapshots_by_entity`.
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}

/// Zero the current thread's `latest_raw_snapshots_by_entity` call tally.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.set(0);
}

/// Read the current thread's `latest_raw_snapshots_by_entity` call tally.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.get()
}
65
66///
67/// RawSchemaKey
68///
69/// Stable key for one persisted schema snapshot entry.
70/// It combines the entity tag and schema version so reconciliation can load
71/// concrete versions without depending on generated entity names.
72///
73
74#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
75struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
76
77impl RawSchemaKey {
78    /// Build the raw persisted key for one entity schema version.
79    #[must_use]
80    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
81        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
82        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
83        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
84
85        Self(out)
86    }
87
88    /// Return the entity tag encoded in this schema key.
89    #[must_use]
90    fn entity_tag(self) -> EntityTag {
91        let mut bytes = [0u8; size_of::<u64>()];
92        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
93
94        EntityTag::new(u64::from_be_bytes(bytes))
95    }
96
97    /// Return the schema version encoded in this schema key.
98    #[must_use]
99    fn version(self) -> u32 {
100        let mut bytes = [0u8; size_of::<u32>()];
101        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
102
103        u32::from_be_bytes(bytes)
104    }
105
106    fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
107        (
108            RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
109            RangeBound::Included(Self::from_entity_version(
110                entity,
111                SchemaVersion::new(u32::MAX),
112            )),
113        )
114    }
115}
116
117impl Storable for RawSchemaKey {
118    fn to_bytes(&self) -> Cow<'_, [u8]> {
119        Cow::Borrowed(&self.0)
120    }
121
122    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
123        debug_assert_eq!(
124            bytes.len(),
125            SCHEMA_KEY_BYTES_USIZE,
126            "RawSchemaKey::from_bytes received unexpected byte length",
127        );
128
129        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
130            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
131        }
132
133        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
134        out.copy_from_slice(bytes.as_ref());
135        Self(out)
136    }
137
138    fn into_bytes(self) -> Vec<u8> {
139        self.0.to_vec()
140    }
141
142    const BOUND: StorableBound = StorableBound::Bounded {
143        max_size: SCHEMA_KEY_BYTES,
144        is_fixed_size: true,
145    };
146}
147
148///
149/// RawSchemaSnapshot
150///
151/// Raw persisted schema snapshot payload.
152/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
153/// keeping the stable-memory value boundary independent from the typed schema
154/// DTOs used by reconciliation.
155///
156
157#[derive(Clone, Debug, Eq, PartialEq)]
158struct RawSchemaSnapshot {
159    payload: Vec<u8>,
160    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
161}
162
163impl RawSchemaSnapshot {
164    /// Encode one typed persisted-schema snapshot into a raw store payload.
165    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
166        validate_typed_schema_snapshot_for_store(snapshot)?;
167
168        let accepted_schema_fingerprint =
169            accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
170        let payload = encode_persisted_schema_snapshot(snapshot)?;
171
172        Ok(Self {
173            payload,
174            accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
175        })
176    }
177
178    /// Build one raw schema snapshot from already-encoded bytes.
179    #[must_use]
180    #[cfg(test)]
181    const fn from_bytes(payload: Vec<u8>) -> Self {
182        Self {
183            payload,
184            accepted_schema_fingerprint: None,
185        }
186    }
187
188    /// Borrow the encoded schema snapshot payload.
189    #[must_use]
190    const fn as_bytes(&self) -> &[u8] {
191        self.payload.as_slice()
192    }
193
194    /// Consume the snapshot into its encoded payload bytes.
195    #[must_use]
196    fn into_bytes(self) -> Vec<u8> {
197        self.payload
198    }
199
200    /// Return the accepted schema identity fingerprint stored beside the raw
201    /// payload, without decoding the persisted snapshot.
202    fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
203        self.accepted_schema_fingerprint.ok_or_else(|| {
204            InternalError::store_corruption(
205                "obsolete raw schema snapshot storage format missing accepted schema identity header",
206            )
207        })
208    }
209
210    /// Decode this raw store payload into a typed persisted-schema snapshot.
211    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
212        decode_persisted_schema_snapshot(self.as_bytes())
213    }
214}
215
216#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub(in crate::db) struct AcceptedCatalogIdentity {
218    entity_tag: EntityTag,
219    entity_path: &'static str,
220    store_path: &'static str,
221    accepted_schema_version: SchemaVersion,
222    fingerprint_method_version: u8,
223    accepted_schema_fingerprint: CommitSchemaFingerprint,
224}
225
226impl AcceptedCatalogIdentity {
227    #[must_use]
228    pub(in crate::db) const fn new(
229        entity_tag: EntityTag,
230        entity_path: &'static str,
231        store_path: &'static str,
232        accepted_schema_version: SchemaVersion,
233        accepted_schema_fingerprint: CommitSchemaFingerprint,
234    ) -> Self {
235        Self {
236            entity_tag,
237            entity_path,
238            store_path,
239            accepted_schema_version,
240            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
241            accepted_schema_fingerprint,
242        }
243    }
244
245    #[must_use]
246    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
247        self.entity_tag
248    }
249
250    #[must_use]
251    pub(in crate::db) const fn entity_path(self) -> &'static str {
252        self.entity_path
253    }
254
255    #[must_use]
256    pub(in crate::db) const fn store_path(self) -> &'static str {
257        self.store_path
258    }
259
260    #[must_use]
261    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
262        self.accepted_schema_version
263    }
264
265    #[must_use]
266    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
267        self.fingerprint_method_version
268    }
269
270    #[must_use]
271    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
272        self.accepted_schema_fingerprint
273    }
274}
275
276#[derive(Clone, Debug, Eq, PartialEq)]
277pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
278    identity: AcceptedCatalogIdentity,
279    raw_snapshot: Vec<u8>,
280}
281
282impl AcceptedCatalogSnapshotSelection {
283    #[must_use]
284    const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
285        Self {
286            identity,
287            raw_snapshot,
288        }
289    }
290
291    #[must_use]
292    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
293        self.identity
294    }
295
296    pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
297        let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
298        let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
299        let identity = self.identity();
300
301        if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
302            return Err(InternalError::store_invariant(
303                "accepted catalog identity selected a different schema version than the decoded snapshot",
304            ));
305        }
306        if accepted.entity_path() != identity.entity_path() {
307            return Err(InternalError::store_invariant(
308                "accepted catalog identity selected a different entity path than the decoded snapshot",
309            ));
310        }
311
312        let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
313        if decoded_fingerprint != identity.accepted_schema_fingerprint() {
314            return Err(InternalError::store_invariant(
315                "accepted catalog identity fingerprint did not match the decoded snapshot",
316            ));
317        }
318
319        Ok(accepted)
320    }
321}
322
323impl Storable for RawSchemaSnapshot {
324    fn to_bytes(&self) -> Cow<'_, [u8]> {
325        let Some(fingerprint) = self.accepted_schema_fingerprint else {
326            return Cow::Borrowed(self.as_bytes());
327        };
328
329        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
330        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
331        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
332        bytes.extend_from_slice(&fingerprint);
333        bytes.extend_from_slice(self.as_bytes());
334
335        Cow::Owned(bytes)
336    }
337
338    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
339        let bytes = bytes.into_owned();
340        if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
341            && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
342            && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
343        {
344            let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
345            let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
346            let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
347            fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
348
349            return Self {
350                payload: bytes[fingerprint_end..].to_vec(),
351                accepted_schema_fingerprint: Some(fingerprint),
352            };
353        }
354
355        Self {
356            payload: bytes,
357            accepted_schema_fingerprint: None,
358        }
359    }
360
361    fn into_bytes(self) -> Vec<u8> {
362        let Some(fingerprint) = self.accepted_schema_fingerprint else {
363            return self.payload;
364        };
365
366        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
367        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
368        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
369        bytes.extend_from_slice(&fingerprint);
370        bytes.extend_from_slice(&self.payload);
371
372        bytes
373    }
374
375    const BOUND: StorableBound = StorableBound::Bounded {
376        max_size: MAX_SCHEMA_SNAPSHOT_BYTES + RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32,
377        is_fixed_size: false,
378    };
379}
380
381// Validate typed schema snapshots before they are encoded into the raw schema
382// metadata store. This catches caller-side invariant violations separately from
383// raw persisted-byte corruption handled by the codec decode boundary.
384fn validate_typed_schema_snapshot_for_store(
385    snapshot: &PersistedSchemaSnapshot,
386) -> Result<(), InternalError> {
387    if let Some(detail) = schema_snapshot_integrity_detail(
388        "schema snapshot",
389        snapshot.version(),
390        snapshot.primary_key_field_ids(),
391        snapshot.row_layout(),
392        snapshot.fields(),
393    ) {
394        return Err(InternalError::store_invariant(detail));
395    }
396
397    Ok(())
398}
399
400///
401/// SchemaStoreFootprint
402///
403/// Current raw schema metadata footprint for one entity. Reconciliation uses
404/// this value to report stable-memory pressure without decoding schema payloads
405/// or exposing field-level metadata through metrics.
406///
407
408#[derive(Clone, Copy, Debug, Eq, PartialEq)]
409pub(in crate::db) struct SchemaStoreFootprint {
410    snapshots: u64,
411    encoded_bytes: u64,
412    latest_snapshot_bytes: u64,
413}
414
415///
416/// SchemaStoreCatalogMetadata
417///
418/// Accepted schema-store catalog metadata derived from latest persisted
419/// snapshots. This is diagnostic allocation metadata, not allocation identity.
420///
421
422#[derive(Clone, Copy, Debug, Eq, PartialEq)]
423pub(in crate::db) struct SchemaStoreCatalogMetadata {
424    schema_version: SchemaVersion,
425    schema_fingerprint_method_version: u8,
426    schema_fingerprint: CommitSchemaFingerprint,
427    entity_count: u64,
428}
429
430impl SchemaStoreCatalogMetadata {
431    /// Build catalog metadata from already-derived accepted schema facts.
432    #[must_use]
433    const fn new(
434        schema_version: SchemaVersion,
435        schema_fingerprint_method_version: u8,
436        schema_fingerprint: CommitSchemaFingerprint,
437        entity_count: u64,
438    ) -> Self {
439        Self {
440            schema_version,
441            schema_fingerprint_method_version,
442            schema_fingerprint,
443            entity_count,
444        }
445    }
446
447    /// Return the maximum latest schema version represented in the catalog.
448    #[must_use]
449    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
450        self.schema_version
451    }
452
453    /// Return the fingerprint method version for this diagnostic metadata row.
454    #[must_use]
455    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
456        self.schema_fingerprint_method_version
457    }
458
459    /// Return the deterministic catalog fingerprint for latest accepted
460    /// snapshots.
461    #[must_use]
462    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
463        self.schema_fingerprint
464    }
465
466    /// Return number of entity schemas represented in this catalog metadata.
467    #[must_use]
468    pub(in crate::db) const fn entity_count(self) -> u64 {
469        self.entity_count
470    }
471}
472
473///
474/// SchemaStoreAllocationMetadata
475///
476/// Role-specific allocation metadata derived from latest accepted schema-store
477/// snapshots. These fingerprints describe the accepted contract that owns each
478/// allocation role; they are diagnostics, not allocation identity.
479///
480
481#[derive(Clone, Copy, Debug, Eq, PartialEq)]
482pub(in crate::db) struct SchemaStoreAllocationMetadata {
483    data: SchemaStoreCatalogMetadata,
484    index: SchemaStoreCatalogMetadata,
485    schema: SchemaStoreCatalogMetadata,
486}
487
488impl SchemaStoreAllocationMetadata {
489    /// Build one role-specific metadata set from already-derived accepted
490    /// schema facts.
491    #[must_use]
492    const fn new(
493        data: SchemaStoreCatalogMetadata,
494        index: SchemaStoreCatalogMetadata,
495        schema: SchemaStoreCatalogMetadata,
496    ) -> Self {
497        Self {
498            data,
499            index,
500            schema,
501        }
502    }
503
504    /// Return accepted row-layout allocation metadata for data memory.
505    #[must_use]
506    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
507        self.data
508    }
509
510    /// Return accepted index-catalog allocation metadata for index memory.
511    #[must_use]
512    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
513        self.index
514    }
515
516    /// Return accepted full schema-catalog allocation metadata for schema
517    /// memory.
518    #[must_use]
519    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
520        self.schema
521    }
522}
523
524impl SchemaStoreFootprint {
525    /// Build one schema-store footprint from already-counted raw payload facts.
526    #[must_use]
527    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
528        Self {
529            snapshots,
530            encoded_bytes,
531            latest_snapshot_bytes,
532        }
533    }
534
535    /// Return the number of raw schema snapshots stored for the entity.
536    #[must_use]
537    pub(in crate::db) const fn snapshots(self) -> u64 {
538        self.snapshots
539    }
540
541    /// Return the total encoded payload bytes stored for the entity.
542    #[must_use]
543    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
544        self.encoded_bytes
545    }
546
547    /// Return the encoded payload bytes for the highest-version snapshot.
548    #[must_use]
549    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
550        self.latest_snapshot_bytes
551    }
552}
553
554///
555/// SchemaStore
556///
557/// Thin persistence wrapper over one stable or heap schema metadata BTreeMap.
558/// Startup reconciliation writes and validates encoded schema snapshots here
559/// before row/index operations proceed.
560///
561
562pub struct SchemaStore {
563    backend: SchemaStoreBackend,
564}
565
566enum SchemaStoreBackend {
567    Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
568    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
569    Journaled {
570        canonical:
571            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
572        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
573        tombstones: BTreeSet<RawSchemaKey>,
574    },
575}
576
/// Signal returned by schema-store traversal visitors to keep going or halt.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    Continue,
    #[allow(
        dead_code,
        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
    )]
    Stop,
}

impl SchemaStoreVisit {
    /// `true` only for `Stop`.
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
593
594impl SchemaStore {
595    /// Initialize the schema store with the provided backing memory.
596    #[must_use]
597    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
598        Self {
599            backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
600        }
601    }
602
603    /// Initialize a volatile heap-backed schema store.
604    #[must_use]
605    pub const fn init_heap() -> Self {
606        Self {
607            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
608        }
609    }
610
611    /// Initialize a journaled cached-stable schema store.
612    ///
613    /// Normal schema publication writes only the live projection. Canonical
614    /// stable schema history is updated by future journal fold/recovery paths.
615    #[must_use]
616    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
617        Self {
618            backend: SchemaStoreBackend::Journaled {
619                canonical: StableBTreeMap::init(memory),
620                live: StdBTreeMap::new(),
621                tombstones: BTreeSet::new(),
622            },
623        }
624    }
625
626    /// Insert or replace one typed persisted schema snapshot.
627    pub(in crate::db) fn insert_persisted_snapshot(
628        &mut self,
629        entity: EntityTag,
630        snapshot: &PersistedSchemaSnapshot,
631    ) -> Result<(), InternalError> {
632        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
633        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
634        let _ = self.insert_raw_snapshot(key, raw_snapshot);
635
636        Ok(())
637    }
638
639    /// Insert one typed persisted schema snapshot only if the current live
640    /// accepted catalog identity still matches the identity captured before
641    /// schema mutation planning.
642    pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
643        &mut self,
644        expected: AcceptedCatalogIdentity,
645        snapshot: &PersistedSchemaSnapshot,
646    ) -> Result<(), InternalError> {
647        let live = self.latest_catalog_identity(
648            expected.entity_tag(),
649            expected.entity_path(),
650            expected.store_path(),
651        )?;
652        if live
653            .as_ref()
654            .map(AcceptedCatalogSnapshotSelection::identity)
655            != Some(expected)
656        {
657            return Err(InternalError::schema_ddl_publication_race_lost(
658                expected.entity_path(),
659            ));
660        }
661
662        self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
663    }
664
665    /// Reset the volatile projection for journaled recovery without mutating
666    /// the canonical stable schema base.
667    pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
668        let SchemaStoreBackend::Journaled {
669            live, tombstones, ..
670        } = &mut self.backend
671        else {
672            return Err(InternalError::store_invariant(
673                "journaled live projection reset requires a journaled schema store",
674            ));
675        };
676
677        live.clear();
678        tombstones.clear();
679
680        Ok(())
681    }
682
683    /// Apply one folded journal schema snapshot into the canonical stable base.
684    pub(in crate::db) fn fold_persisted_snapshot(
685        &mut self,
686        entity: EntityTag,
687        snapshot: &PersistedSchemaSnapshot,
688    ) -> Result<(), InternalError> {
689        let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
690            return Err(InternalError::store_invariant(
691                "journal schema fold requires a journaled schema store",
692            ));
693        };
694
695        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
696        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
697        canonical.insert(key, raw_snapshot);
698
699        Ok(())
700    }
701
702    /// Load and decode one typed persisted schema snapshot.
703    #[cfg(test)]
704    pub(in crate::db) fn get_persisted_snapshot(
705        &self,
706        entity: EntityTag,
707        version: SchemaVersion,
708    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
709        let key = RawSchemaKey::from_entity_version(entity, version);
710        self.get_raw_snapshot(&key)
711            .map(|snapshot| snapshot.decode_persisted_snapshot())
712            .transpose()
713    }
714
715    /// Load and decode the highest stored schema snapshot version for one entity.
716    pub(in crate::db) fn latest_persisted_snapshot(
717        &self,
718        entity: EntityTag,
719    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
720        self.latest_raw_snapshot(entity)
721            .map(|snapshot| snapshot.decode_persisted_snapshot())
722            .transpose()
723    }
724
725    /// Return the latest accepted catalog identity for one entity without
726    /// decoding the selected schema snapshot.
727    pub(in crate::db) fn latest_catalog_identity(
728        &self,
729        entity: EntityTag,
730        entity_path: &'static str,
731        store_path: &'static str,
732    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
733        let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
734            return Ok(None);
735        };
736        let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
737        let identity =
738            AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
739
740        Ok(Some(AcceptedCatalogSnapshotSelection::new(
741            identity,
742            raw_snapshot.into_bytes(),
743        )))
744    }
745
746    /// Return raw schema-store footprint facts for one entity.
747    #[must_use]
748    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
749        let mut snapshots = 0u64;
750        let mut encoded_bytes = 0u64;
751        let mut latest = None::<(SchemaVersion, u64)>;
752
753        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
754            if key.entity_tag() != entity {
755                return Ok(SchemaStoreVisit::Continue);
756            }
757
758            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
759            snapshots = snapshots.saturating_add(1);
760            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
761
762            let version = SchemaVersion::new(key.version());
763            if latest
764                .as_ref()
765                .is_none_or(|(latest_version, _)| version > *latest_version)
766            {
767                latest = Some((version, snapshot_bytes));
768            }
769            Ok(SchemaStoreVisit::Continue)
770        });
771
772        SchemaStoreFootprint::new(
773            snapshots,
774            encoded_bytes,
775            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
776        )
777    }
778
779    /// Derive accepted catalog metadata from latest persisted schema snapshots.
780    ///
781    /// This function intentionally reads only the persisted schema store. It
782    /// does not reconstruct metadata from generated models when the store has
783    /// no accepted snapshots.
784    #[cfg(test)]
785    pub(in crate::db) fn catalog_metadata(
786        &self,
787    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
788        Ok(self
789            .allocation_metadata()?
790            .map(SchemaStoreAllocationMetadata::schema))
791    }
792
793    /// Derive role-specific allocation metadata from latest persisted schema
794    /// snapshots.
795    ///
796    /// This function intentionally reads only accepted schema-store payloads.
797    /// It never reconstructs metadata from generated models when the store has
798    /// no accepted snapshots.
799    pub(in crate::db) fn allocation_metadata(
800        &self,
801    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
802        let latest_by_entity = self.latest_raw_snapshots_by_entity();
803        if latest_by_entity.is_empty() {
804            return Ok(None);
805        }
806
807        Ok(Some(SchemaStoreAllocationMetadata::new(
808            derive_data_allocation_metadata(&latest_by_entity)?,
809            derive_index_allocation_metadata(&latest_by_entity)?,
810            derive_schema_catalog_metadata(&latest_by_entity)?,
811        )))
812    }
813
814    /// Insert or replace one raw schema snapshot.
815    fn insert_raw_snapshot(
816        &mut self,
817        key: RawSchemaKey,
818        snapshot: RawSchemaSnapshot,
819    ) -> Option<RawSchemaSnapshot> {
820        let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
821            self.get_raw_snapshot_for_backend(&key)
822        } else {
823            None
824        };
825        match &mut self.backend {
826            SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
827            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
828            SchemaStoreBackend::Journaled {
829                live, tombstones, ..
830            } => {
831                tombstones.remove(&key);
832                live.insert(key, snapshot);
833                previous_journaled
834            }
835        }
836    }
837
838    /// Load one raw schema snapshot by key.
839    #[must_use]
840    #[cfg(test)]
841    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
842        match &self.backend {
843            SchemaStoreBackend::Stable(map) => map.get(key),
844            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
845            SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
846        }
847    }
848
849    /// Return whether one schema snapshot key is present.
850    #[must_use]
851    #[cfg(test)]
852    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
853        match &self.backend {
854            SchemaStoreBackend::Stable(map) => map.contains_key(key),
855            SchemaStoreBackend::Heap(map) => map.contains_key(key),
856            SchemaStoreBackend::Journaled { .. } => {
857                self.get_raw_snapshot_for_backend(key).is_some()
858            }
859        }
860    }
861
862    /// Return the number of schema snapshot entries in this store.
863    #[must_use]
864    #[cfg(test)]
865    pub(in crate::db) fn len(&self) -> u64 {
866        match &self.backend {
867            SchemaStoreBackend::Stable(map) => map.len(),
868            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
869            SchemaStoreBackend::Journaled { .. } => {
870                let mut count = 0_u64;
871                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
872                    count = count.saturating_add(1);
873                    Ok(SchemaStoreVisit::Continue)
874                });
875                count
876            }
877        }
878    }
879
880    /// Return whether this schema store currently has no persisted snapshots.
881    #[must_use]
882    #[cfg(test)]
883    pub(in crate::db) fn is_empty(&self) -> bool {
884        match &self.backend {
885            SchemaStoreBackend::Stable(map) => map.is_empty(),
886            SchemaStoreBackend::Heap(map) => map.is_empty(),
887            SchemaStoreBackend::Journaled { .. } => {
888                let mut empty = true;
889                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
890                    empty = false;
891                    Ok(SchemaStoreVisit::Stop)
892                });
893                empty
894            }
895        }
896    }
897
898    /// Clear all schema metadata entries from the store.
899    #[cfg(test)]
900    pub(in crate::db) fn clear(&mut self) {
901        match &mut self.backend {
902            SchemaStoreBackend::Stable(map) => map.clear_new(),
903            SchemaStoreBackend::Heap(map) => map.clear(),
904            SchemaStoreBackend::Journaled {
905                canonical,
906                live,
907                tombstones,
908            } => {
909                live.clear();
910                tombstones.clear();
911                for entry in canonical.iter() {
912                    tombstones.insert(*entry.key());
913                }
914            }
915        }
916    }
917
918    fn latest_raw_snapshots_by_entity(
919        &self,
920    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
921        #[cfg(test)]
922        LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
923
924        let mut latest_by_entity =
925            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
926
927        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
928            let version = SchemaVersion::new(key.version());
929            match latest_by_entity.get_mut(&key.entity_tag()) {
930                Some((latest_version, latest_snapshot)) if version > *latest_version => {
931                    *latest_version = version;
932                    *latest_snapshot = snapshot.clone();
933                }
934                None => {
935                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
936                }
937                Some(_) => {}
938            }
939            Ok(SchemaStoreVisit::Continue)
940        });
941
942        latest_by_entity
943    }
944
945    /// Visit raw schema snapshots in canonical store order without exposing
946    /// the backing stable-map iterator.
947    fn visit_raw_snapshots<E>(
948        &self,
949        visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
950    ) -> Result<(), E> {
951        match &self.backend {
952            SchemaStoreBackend::Stable(map) => {
953                let mut visitor = visitor;
954                for entry in map.iter() {
955                    if visitor(entry.key(), &entry.value())?.should_stop() {
956                        break;
957                    }
958                }
959            }
960            SchemaStoreBackend::Heap(map) => {
961                let mut visitor = visitor;
962                for (key, snapshot) in map {
963                    if visitor(key, snapshot)?.should_stop() {
964                        break;
965                    }
966                }
967            }
968            SchemaStoreBackend::Journaled {
969                canonical,
970                live,
971                tombstones,
972            } => Self::visit_journaled_raw_snapshot_range(
973                canonical,
974                live,
975                tombstones,
976                (RangeBound::Unbounded, RangeBound::Unbounded),
977                Direction::Asc,
978                visitor,
979            )?,
980        }
981
982        Ok(())
983    }
984
985    #[cfg(test)]
986    #[must_use]
987    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
988        match &self.backend {
989            SchemaStoreBackend::Stable(map)
990            | SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
991            SchemaStoreBackend::Heap(_) => 0,
992        }
993    }
994
995    fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
996        let SchemaStoreBackend::Journaled {
997            canonical,
998            live,
999            tombstones,
1000        } = &self.backend
1001        else {
1002            return None;
1003        };
1004
1005        if tombstones.contains(key) {
1006            return None;
1007        }
1008        live.get(key).cloned().or_else(|| canonical.get(key))
1009    }
1010
1011    fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
1012        self.latest_raw_snapshot_entry(entity)
1013            .map(|(_, snapshot)| snapshot)
1014    }
1015
1016    fn latest_raw_snapshot_entry(
1017        &self,
1018        entity: EntityTag,
1019    ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
1020        let bounds = RawSchemaKey::entity_range_bounds(entity);
1021        match &self.backend {
1022            SchemaStoreBackend::Stable(map) => map
1023                .range((bounds.0, bounds.1))
1024                .next_back()
1025                .map(|entry| (SchemaVersion::new(entry.key().version()), entry.value())),
1026            SchemaStoreBackend::Heap(map) => map
1027                .range((bounds.0, bounds.1))
1028                .next_back()
1029                .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
1030            SchemaStoreBackend::Journaled {
1031                canonical,
1032                live,
1033                tombstones,
1034            } => {
1035                let mut latest = None;
1036                let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
1037                    canonical,
1038                    live,
1039                    tombstones,
1040                    bounds,
1041                    Direction::Desc,
1042                    |key, snapshot| {
1043                        latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1044                        Ok(SchemaStoreVisit::Stop)
1045                    },
1046                );
1047                latest
1048            }
1049        }
1050    }
1051
1052    fn visit_journaled_raw_snapshot_range<E>(
1053        canonical: &StableBTreeMap<
1054            RawSchemaKey,
1055            RawSchemaSnapshot,
1056            VirtualMemory<DefaultMemoryImpl>,
1057        >,
1058        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1059        tombstones: &BTreeSet<RawSchemaKey>,
1060        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1061        direction: Direction,
1062        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1063    ) -> Result<(), E> {
1064        match direction {
1065            Direction::Asc => visit_ordered_overlay(
1066                canonical.range((bounds.0, bounds.1)),
1067                live.range((bounds.0, bounds.1)),
1068                Direction::Asc,
1069                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1070                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1071                |live_entry| !tombstones.contains(live_entry.0),
1072                |entry| {
1073                    let visit = match entry {
1074                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1075                            visitor(canonical_entry.key(), &canonical_entry.value())?
1076                        }
1077                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1078                    };
1079                    Ok(if visit.should_stop() {
1080                        OrderedOverlayVisit::Stop
1081                    } else {
1082                        OrderedOverlayVisit::Continue
1083                    })
1084                },
1085            ),
1086            Direction::Desc => visit_ordered_overlay(
1087                canonical.range((bounds.0, bounds.1)).rev(),
1088                live.range((bounds.0, bounds.1)).rev(),
1089                Direction::Desc,
1090                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1091                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1092                |live_entry| !tombstones.contains(live_entry.0),
1093                |entry| {
1094                    let visit = match entry {
1095                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1096                            visitor(canonical_entry.key(), &canonical_entry.value())?
1097                        }
1098                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1099                    };
1100                    Ok(if visit.should_stop() {
1101                        OrderedOverlayVisit::Stop
1102                    } else {
1103                        OrderedOverlayVisit::Continue
1104                    })
1105                },
1106            ),
1107        }
1108    }
1109}
1110
1111fn derive_data_allocation_metadata(
1112    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1113) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1114    let mut max_version = SchemaVersion::initial();
1115    let mut hasher = new_hash_sha256();
1116    write_hash_tag_u8(
1117        &mut hasher,
1118        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1119    );
1120
1121    for (entity, (_, snapshot)) in latest_by_entity {
1122        let persisted = snapshot.decode_persisted_snapshot()?;
1123        if persisted.version() > max_version {
1124            max_version = persisted.version();
1125        }
1126
1127        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1128            persisted.version(),
1129            persisted.entity_path().to_string(),
1130            persisted.entity_name().to_string(),
1131            persisted.primary_key_field_ids().to_vec(),
1132            persisted.row_layout().clone(),
1133            persisted.fields().to_vec(),
1134            Vec::new(),
1135        );
1136        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1137
1138        write_hash_u64(&mut hasher, entity.value());
1139        write_hash_u32(&mut hasher, persisted.version().get());
1140        write_hash_len_u32(&mut hasher, encoded.len());
1141        hasher.update(encoded);
1142    }
1143
1144    Ok(finalize_schema_metadata(
1145        max_version,
1146        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1147        hasher,
1148        latest_by_entity.len(),
1149    ))
1150}
1151
1152fn derive_index_allocation_metadata(
1153    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1154) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1155    let mut max_version = SchemaVersion::initial();
1156    let mut hasher = new_hash_sha256();
1157    write_hash_tag_u8(
1158        &mut hasher,
1159        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1160    );
1161
1162    for (entity, (_, snapshot)) in latest_by_entity {
1163        let persisted = snapshot.decode_persisted_snapshot()?;
1164        if persisted.version() > max_version {
1165            max_version = persisted.version();
1166        }
1167
1168        write_hash_u64(&mut hasher, entity.value());
1169        write_hash_u32(&mut hasher, persisted.version().get());
1170        write_hash_len_u32(&mut hasher, persisted.indexes().len());
1171        for index in persisted.indexes() {
1172            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1173            write_hash_str_u32(&mut hasher, index.name());
1174            write_hash_str_u32(&mut hasher, index.store());
1175            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1176            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1177            match index.predicate_sql() {
1178                Some(predicate_sql) => {
1179                    write_hash_tag_u8(&mut hasher, 1);
1180                    write_hash_str_u32(&mut hasher, predicate_sql);
1181                }
1182                None => write_hash_tag_u8(&mut hasher, 0),
1183            }
1184            hash_persisted_index_key(&mut hasher, index.key());
1185        }
1186    }
1187
1188    Ok(finalize_schema_metadata(
1189        max_version,
1190        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1191        hasher,
1192        latest_by_entity.len(),
1193    ))
1194}
1195
1196fn derive_schema_catalog_metadata(
1197    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1198) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1199    let mut max_version = SchemaVersion::initial();
1200    let mut hasher = new_hash_sha256();
1201    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1202
1203    for (entity, (version, snapshot)) in latest_by_entity {
1204        let persisted = snapshot.decode_persisted_snapshot()?;
1205        if persisted.version() > max_version {
1206            max_version = persisted.version();
1207        }
1208
1209        write_hash_u64(&mut hasher, entity.value());
1210        write_hash_u32(&mut hasher, version.get());
1211        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1212        hasher.update(snapshot.as_bytes());
1213    }
1214
1215    Ok(finalize_schema_metadata(
1216        max_version,
1217        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1218        hasher,
1219        latest_by_entity.len(),
1220    ))
1221}
1222
1223fn finalize_schema_metadata(
1224    schema_version: SchemaVersion,
1225    schema_fingerprint_method_version: u8,
1226    hasher: sha2::Sha256,
1227    entity_count: usize,
1228) -> SchemaStoreCatalogMetadata {
1229    let digest = finalize_hash_sha256(hasher);
1230    let mut schema_fingerprint = [0u8; 16];
1231    schema_fingerprint.copy_from_slice(&digest[..16]);
1232
1233    SchemaStoreCatalogMetadata::new(
1234        schema_version,
1235        schema_fingerprint_method_version,
1236        schema_fingerprint,
1237        u64::try_from(entity_count).unwrap_or(u64::MAX),
1238    )
1239}
1240
1241fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1242    match key {
1243        PersistedIndexKeySnapshot::FieldPath(paths) => {
1244            write_hash_tag_u8(hasher, 1);
1245            write_hash_len_u32(hasher, paths.len());
1246            for path in paths {
1247                hash_persisted_index_field_path(hasher, path);
1248            }
1249        }
1250        PersistedIndexKeySnapshot::Items(items) => {
1251            write_hash_tag_u8(hasher, 2);
1252            write_hash_len_u32(hasher, items.len());
1253            for item in items {
1254                match item {
1255                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1256                        write_hash_tag_u8(hasher, 1);
1257                        hash_persisted_index_field_path(hasher, path);
1258                    }
1259                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
1260                        write_hash_tag_u8(hasher, 2);
1261                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1262                        hash_persisted_index_field_path(hasher, expression.source());
1263                        hash_persisted_field_kind(hasher, expression.input_kind());
1264                        hash_persisted_field_kind(hasher, expression.output_kind());
1265                        write_hash_str_u32(hasher, expression.canonical_text());
1266                    }
1267                }
1268            }
1269        }
1270    }
1271}
1272
1273fn hash_persisted_index_field_path(
1274    hasher: &mut sha2::Sha256,
1275    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1276) {
1277    write_hash_u32(hasher, path.field_id().get());
1278    write_hash_u32(hasher, u32::from(path.slot().get()));
1279    write_hash_len_u32(hasher, path.path().len());
1280    for segment in path.path() {
1281        write_hash_str_u32(hasher, segment);
1282    }
1283    hash_persisted_field_kind(hasher, path.kind());
1284    write_hash_tag_u8(hasher, u8::from(path.nullable()));
1285}
1286
1287fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1288    match kind {
1289        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1290        PersistedFieldKind::Blob { max_len } => {
1291            write_hash_tag_u8(hasher, 2);
1292            hash_optional_u32(hasher, *max_len);
1293        }
1294        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1295        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1296        PersistedFieldKind::Decimal { scale } => {
1297            write_hash_tag_u8(hasher, 5);
1298            write_hash_u32(hasher, *scale);
1299        }
1300        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1301        PersistedFieldKind::Enum { path, variants } => {
1302            write_hash_tag_u8(hasher, 7);
1303            write_hash_str_u32(hasher, path);
1304            write_hash_len_u32(hasher, variants.len());
1305            for variant in variants {
1306                write_hash_str_u32(hasher, variant.ident());
1307                match variant.payload_kind() {
1308                    Some(payload_kind) => {
1309                        write_hash_tag_u8(hasher, 1);
1310                        hash_persisted_field_kind(hasher, payload_kind);
1311                    }
1312                    None => write_hash_tag_u8(hasher, 0),
1313                }
1314                write_hash_str_u32(
1315                    hasher,
1316                    field_storage_decode_name(variant.payload_storage_decode()),
1317                );
1318            }
1319        }
1320        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1321        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1322        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1323        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1324        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1325        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1326        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1327        PersistedFieldKind::IntBig { max_bytes } => {
1328            write_hash_tag_u8(hasher, 15);
1329            write_hash_u32(hasher, *max_bytes);
1330        }
1331        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1332        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1333        PersistedFieldKind::Text { max_len } => {
1334            write_hash_tag_u8(hasher, 18);
1335            hash_optional_u32(hasher, *max_len);
1336        }
1337        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1338        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1339        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1340        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1341        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1342        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1343        PersistedFieldKind::NatBig { max_bytes } => {
1344            write_hash_tag_u8(hasher, 25);
1345            write_hash_u32(hasher, *max_bytes);
1346        }
1347        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1348        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1349        PersistedFieldKind::Relation {
1350            target_path,
1351            target_entity_name,
1352            target_entity_tag,
1353            target_store_path,
1354            key_kind,
1355            strength,
1356        } => {
1357            write_hash_tag_u8(hasher, 28);
1358            write_hash_str_u32(hasher, target_path);
1359            write_hash_str_u32(hasher, target_entity_name);
1360            write_hash_u64(hasher, target_entity_tag.value());
1361            write_hash_str_u32(hasher, target_store_path);
1362            hash_persisted_field_kind(hasher, key_kind);
1363            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1364        }
1365        PersistedFieldKind::List(inner) => {
1366            write_hash_tag_u8(hasher, 29);
1367            hash_persisted_field_kind(hasher, inner);
1368        }
1369        PersistedFieldKind::Set(inner) => {
1370            write_hash_tag_u8(hasher, 30);
1371            hash_persisted_field_kind(hasher, inner);
1372        }
1373        PersistedFieldKind::Map { key, value } => {
1374            write_hash_tag_u8(hasher, 31);
1375            hash_persisted_field_kind(hasher, key);
1376            hash_persisted_field_kind(hasher, value);
1377        }
1378        PersistedFieldKind::Structured { queryable } => {
1379            write_hash_tag_u8(hasher, 32);
1380            write_hash_tag_u8(hasher, u8::from(*queryable));
1381        }
1382    }
1383}
1384
1385fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1386    match value {
1387        Some(value) => {
1388            write_hash_tag_u8(hasher, 1);
1389            write_hash_u32(hasher, value);
1390        }
1391        None => write_hash_tag_u8(hasher, 0),
1392    }
1393}
1394
1395const fn persisted_index_origin_name(
1396    origin: crate::db::schema::PersistedIndexOrigin,
1397) -> &'static str {
1398    match origin {
1399        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1400        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1401    }
1402}
1403
1404const fn persisted_expression_op_name(
1405    op: crate::db::schema::PersistedIndexExpressionOp,
1406) -> &'static str {
1407    match op {
1408        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1409        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1410        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1411        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1412        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1413        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1414        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1415        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1416    }
1417}
1418
1419const fn persisted_relation_strength_name(
1420    strength: crate::db::schema::PersistedRelationStrength,
1421) -> &'static str {
1422    match strength {
1423        crate::db::schema::PersistedRelationStrength::Strong => "strong",
1424        crate::db::schema::PersistedRelationStrength::Weak => "weak",
1425    }
1426}
1427
1428const fn field_storage_decode_name(
1429    decode: crate::model::field::FieldStorageDecode,
1430) -> &'static str {
1431    match decode {
1432        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1433        crate::model::field::FieldStorageDecode::Value => "value",
1434    }
1435}
1436
1437///
1438/// TESTS
1439///
1440
1441#[cfg(test)]
1442mod tests {
1443    use super::{
1444        RawSchemaKey, RawSchemaSnapshot, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1445        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1446        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION, SchemaStore, SchemaStoreBackend,
1447        SchemaStoreVisit,
1448    };
1449    use crate::{
1450        db::{
1451            direction::Direction,
1452            schema::{
1453                AcceptedSchemaSnapshot, FieldId, PersistedFieldKind, PersistedFieldSnapshot,
1454                PersistedIndexFieldPathSnapshot, PersistedIndexKeySnapshot, PersistedIndexSnapshot,
1455                PersistedNestedLeafSnapshot, PersistedSchemaSnapshot, SchemaFieldDefault,
1456                SchemaFieldSlot, SchemaRowLayout, SchemaVersion, accepted_schema_cache_fingerprint,
1457                encode_persisted_schema_snapshot, persisted_schema_snapshot_decode_count_for_tests,
1458                reset_persisted_schema_snapshot_decode_count_for_tests,
1459            },
1460        },
1461        model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
1462        testing::test_memory,
1463        traits::Storable,
1464        types::EntityTag,
1465    };
1466    use std::borrow::Cow;
1467    use std::convert::Infallible;
1468
1469    #[test]
1470    fn raw_schema_key_round_trips_entity_and_version() {
1471        let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
1472            SchemaVersion::initial()
1473        });
1474        let encoded = key.to_bytes().into_owned();
1475        let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
1476
1477        assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
1478        assert_eq!(decoded.version(), SchemaVersion::initial().get());
1479    }
1480
1481    #[test]
1482    fn raw_schema_snapshot_round_trips_payload_bytes() {
1483        let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
1484        let encoded = snapshot.to_bytes().into_owned();
1485        let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
1486
1487        assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
1488        assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
1489    }
1490
1491    #[test]
1492    fn raw_schema_snapshot_round_trips_identity_header_for_typed_snapshot() {
1493        let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Header");
1494        let accepted = AcceptedSchemaSnapshot::try_new(snapshot.clone())
1495            .expect("typed schema snapshot should be accepted");
1496        let expected_fingerprint = accepted_schema_cache_fingerprint(&accepted)
1497            .expect("accepted schema fingerprint should derive");
1498        let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1499            .expect("typed schema snapshot should encode");
1500        let encoded = raw.to_bytes().into_owned();
1501        let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
1502        let owned_encoded = <RawSchemaSnapshot as Storable>::into_bytes(raw.clone());
1503        let owned_decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(owned_encoded));
1504
1505        assert_eq!(decoded.as_bytes(), raw.as_bytes());
1506        assert_eq!(
1507            decoded
1508                .accepted_schema_fingerprint()
1509                .expect("identity header should decode"),
1510            expected_fingerprint
1511        );
1512        assert_eq!(
1513            owned_decoded
1514                .accepted_schema_fingerprint()
1515                .expect("owned identity header should decode"),
1516            expected_fingerprint
1517        );
1518    }
1519
1520    #[test]
1521    fn schema_store_persists_raw_snapshots_by_entity_version_key() {
1522        let mut store = SchemaStore::init(test_memory(251));
1523        let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
1524
1525        assert!(store.is_empty());
1526        assert!(!store.contains_raw_snapshot(&key));
1527
1528        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
1529
1530        assert_eq!(store.len(), 1);
1531        assert!(store.contains_raw_snapshot(&key));
1532        assert_eq!(
1533            store
1534                .get_raw_snapshot(&key)
1535                .expect("schema snapshot should be present")
1536                .as_bytes(),
1537            &[9, 4, 6],
1538        );
1539
1540        store.clear();
1541        assert!(store.is_empty());
1542    }
1543
1544    #[test]
1545    fn schema_store_loads_latest_snapshot_for_entity() {
1546        let mut store = SchemaStore::init(test_memory(252));
1547        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1548        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1549        let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1550
1551        store
1552            .insert_persisted_snapshot(EntityTag::new(41), &initial)
1553            .expect("initial schema snapshot should encode");
1554        store
1555            .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1556            .expect("other entity schema snapshot should encode");
1557        store
1558            .insert_persisted_snapshot(EntityTag::new(41), &newer)
1559            .expect("newer schema snapshot should encode");
1560
1561        let latest = store
1562            .latest_persisted_snapshot(EntityTag::new(41))
1563            .expect("latest schema snapshot should decode")
1564            .expect("schema snapshot should exist");
1565
1566        assert_eq!(latest.version(), SchemaVersion::new(2));
1567        assert_eq!(latest.entity_name(), "Newer");
1568    }
1569
1570    #[test]
1571    fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
1572        let mut store = SchemaStore::init(test_memory(242));
1573        store.insert_raw_snapshot(
1574            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
1575            RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
1576        );
1577        store.insert_raw_snapshot(
1578            RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
1579            RawSchemaSnapshot::from_bytes(vec![5, 8]),
1580        );
1581        store.insert_raw_snapshot(
1582            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
1583            RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
1584        );
1585
1586        let footprint = store.entity_footprint(EntityTag::new(71));
1587
1588        assert_eq!(footprint.snapshots(), 2);
1589        assert_eq!(footprint.encoded_bytes(), 7);
1590        assert_eq!(footprint.latest_snapshot_bytes(), 4);
1591    }
1592
/// Raw snapshots inserted in shuffled order must be visited sorted by
/// entity tag first and schema version second.
#[test]
fn schema_store_visit_raw_snapshots_preserves_key_order() {
    let mut schema_store = SchemaStore::init(test_memory(235));

    // Insertion order deliberately disagrees with the expected visit order.
    for (tag, version, marker) in [(3, 2, 32), (1, 3, 13), (1, 1, 11)] {
        schema_store.insert_raw_snapshot(
            RawSchemaKey::from_entity_version(EntityTag::new(tag), SchemaVersion::new(version)),
            RawSchemaSnapshot::from_bytes(vec![marker]),
        );
    }

    let mut seen = Vec::new();
    let _: Result<(), Infallible> = schema_store.visit_raw_snapshots(|key, snapshot| {
        let tag = key.entity_tag().value();
        let version = key.version();
        // Each payload is a single marker byte identifying the insert.
        let marker = snapshot.as_bytes()[0];
        seen.push((tag, version, marker));
        Ok(SchemaStoreVisit::Continue)
    });

    assert_eq!(seen, vec![(1, 1, 11), (1, 3, 13), (3, 2, 32)]);
}
1621
/// A visitor that answers `Stop` on the first entry must see exactly one
/// snapshot even though two are stored.
#[test]
fn schema_store_visit_raw_snapshots_can_stop_without_error() {
    let mut schema_store = SchemaStore::init(test_memory(234));

    for (version, marker) in [(1, 21), (2, 22)] {
        schema_store.insert_raw_snapshot(
            RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(version)),
            RawSchemaSnapshot::from_bytes(vec![marker]),
        );
    }

    let mut seen_versions = Vec::new();
    let _: Result<(), Infallible> = schema_store.visit_raw_snapshots(|key, _| {
        let version = key.version();
        seen_versions.push(version);
        // Halt immediately; the second entry must never reach the visitor.
        Ok(SchemaStoreVisit::Stop)
    });

    assert_eq!(seen_versions, vec![1]);
}
1642
/// Heap-backed store: the latest-version lookup returns the newest snapshot
/// of an entity, and visiting follows key order and honours an early `Stop`.
#[test]
fn heap_schema_store_preserves_order_latest_snapshot_and_early_stop() {
    let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
    let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");

    let mut heap_store = SchemaStore::init_heap();

    // Entity 42 is inserted between the two versions of entity 41.
    let insertions = [
        (41, &initial, "initial heap schema snapshot should encode"),
        (42, &other_entity, "other heap schema snapshot should encode"),
        (41, &newer, "newer heap schema snapshot should encode"),
    ];
    for (tag, snapshot, failure) in insertions {
        heap_store
            .insert_persisted_snapshot(EntityTag::new(tag), snapshot)
            .expect(failure);
    }

    let newest = heap_store
        .latest_persisted_snapshot(EntityTag::new(41))
        .expect("latest heap schema snapshot should decode")
        .expect("heap schema snapshot should exist");
    assert_eq!(newest.version(), SchemaVersion::new(2));
    assert_eq!(newest.entity_name(), "Newer");

    let mut seen = Vec::new();
    let _: Result<(), Infallible> = heap_store.visit_raw_snapshots(|key, snapshot| {
        let tag = key.entity_tag().value();
        let version = key.version();
        let encoded_len = snapshot.as_bytes().len();
        seen.push((tag, version, encoded_len));

        // Stop after the second entry so entity 42 is never visited.
        let verdict = if seen.len() == 2 {
            SchemaStoreVisit::Stop
        } else {
            SchemaStoreVisit::Continue
        };
        Ok(verdict)
    });

    let seen_keys = seen
        .iter()
        .map(|&(tag, version, _)| (tag, version))
        .collect::<Vec<_>>();
    assert_eq!(seen_keys, vec![(41, 1), (41, 2)]);
}
1688
/// Journaled store: a live insert at the same key as a folded (canonical)
/// snapshot wins for latest-version lookup, length counting, and visiting;
/// `clear` then leaves the store empty.
#[test]
fn journaled_schema_store_streams_overlay_latest_snapshot_and_early_stop() {
    let primary = EntityTag::new(61);
    let secondary = EntityTag::new(62);
    let mut journal = SchemaStore::init_journaled(test_memory(233));

    let canonical_initial =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let canonical_replaced =
        persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Canonical");
    let live_replacement = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Live");
    let live_newer = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "LiveNewer");
    let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Other");

    // Folded snapshots first ...
    journal
        .fold_persisted_snapshot(primary, &canonical_initial)
        .expect("initial canonical schema snapshot should encode");
    journal
        .fold_persisted_snapshot(primary, &canonical_replaced)
        .expect("canonical schema snapshot should encode");
    journal
        .fold_persisted_snapshot(secondary, &other_entity)
        .expect("other canonical schema snapshot should encode");

    // ... then live inserts: one reusing version 2, one adding version 3.
    journal
        .insert_persisted_snapshot(primary, &live_replacement)
        .expect("live replacement schema snapshot should encode");
    journal
        .insert_persisted_snapshot(primary, &live_newer)
        .expect("live newer schema snapshot should encode");

    let newest = journal
        .latest_persisted_snapshot(primary)
        .expect("latest journaled schema snapshot should decode")
        .expect("journaled schema snapshot should exist");
    assert_eq!(newest.version(), SchemaVersion::new(3));
    assert_eq!(newest.entity_name(), "LiveNewer");
    // Five writes, but the duplicated version-2 key is counted once.
    assert_eq!(journal.len(), 4);

    let mut seen = Vec::new();
    let _: Result<(), Infallible> = journal.visit_raw_snapshots(|key, snapshot| {
        let name = snapshot
            .decode_persisted_snapshot()
            .expect("visited schema snapshot should decode")
            .entity_name()
            .to_string();
        seen.push((key.entity_tag().value(), key.version(), name));

        // Three entries cover entity 61; stop before entity 62.
        let verdict = if seen.len() == 3 {
            SchemaStoreVisit::Stop
        } else {
            SchemaStoreVisit::Continue
        };
        Ok(verdict)
    });
    let expected = vec![
        (61, 1, String::from("Initial")),
        (61, 2, String::from("Live")),
        (61, 3, String::from("LiveNewer")),
    ];
    assert_eq!(seen, expected);

    journal.clear();
    assert!(journal.is_empty());
    let after_clear = journal
        .latest_persisted_snapshot(primary)
        .expect("cleared journaled latest snapshot lookup should decode");
    assert!(after_clear.is_none());
}
1758
/// Latest-version lookup on a journaled store must work when the snapshot
/// was only folded, only inserted live, or present through both paths (in
/// which case the live insert is the one reported).
#[test]
fn journaled_schema_store_latest_snapshot_reads_each_overlay_source() {
    let entity = EntityTag::new(71);

    // Scenario 1: the snapshot exists only through `fold_persisted_snapshot`.
    {
        let folded =
            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "CanonicalOnly");
        let mut canonical_only = SchemaStore::init_journaled(test_memory(231));
        canonical_only
            .fold_persisted_snapshot(entity, &folded)
            .expect("canonical-only schema snapshot should encode");
        assert_latest_schema(
            &canonical_only,
            entity,
            SchemaVersion::initial(),
            "CanonicalOnly",
        );
    }

    // Scenario 2: the snapshot exists only through `insert_persisted_snapshot`.
    {
        let inserted = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "LiveOnly");
        let mut live_only = SchemaStore::init_journaled(test_memory(230));
        live_only
            .insert_persisted_snapshot(entity, &inserted)
            .expect("live-only schema snapshot should encode");
        assert_latest_schema(&live_only, entity, SchemaVersion::new(2), "LiveOnly");
    }

    // Scenario 3: both paths hold version 3; the live insert must be reported.
    {
        let folded_duplicate =
            persisted_schema_snapshot_for_test(SchemaVersion::new(3), "CanonicalDuplicate");
        let inserted_duplicate =
            persisted_schema_snapshot_for_test(SchemaVersion::new(3), "LiveDuplicate");
        let mut live_override = SchemaStore::init_journaled(test_memory(229));
        live_override
            .fold_persisted_snapshot(entity, &folded_duplicate)
            .expect("canonical duplicate schema snapshot should encode");
        live_override
            .insert_persisted_snapshot(entity, &inserted_duplicate)
            .expect("live duplicate schema snapshot should encode");
        assert_latest_schema(
            &live_override,
            entity,
            SchemaVersion::new(3),
            "LiveDuplicate",
        );
    }
}
1801
/// A descending range visit over one entity must yield versions 3, 2, 1,
/// report the live insert for the duplicated version 2, and leave the
/// neighbouring entities (71 and 73) out of the result.
#[test]
fn journaled_schema_store_descending_range_orders_live_between_canonical_versions() {
    let lower_entity = EntityTag::new(71);
    let entity = EntityTag::new(72);
    let higher_entity = EntityTag::new(73);
    let mut journal = SchemaStore::init_journaled(test_memory(228));

    let canonical_v1 =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "CanonicalV1");
    let canonical_v2 =
        persisted_schema_snapshot_for_test(SchemaVersion::new(2), "CanonicalV2");
    let live_v2 = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "LiveV2");
    let canonical_v3 =
        persisted_schema_snapshot_for_test(SchemaVersion::new(3), "CanonicalV3");
    let lower_neighbour =
        persisted_schema_snapshot_for_test(SchemaVersion::new(9), "UnrelatedLower");

    journal
        .fold_persisted_snapshot(entity, &canonical_v1)
        .expect("canonical v1 schema snapshot should encode");
    journal
        .fold_persisted_snapshot(entity, &canonical_v2)
        .expect("canonical v2 schema snapshot should encode");
    journal
        .fold_persisted_snapshot(entity, &canonical_v3)
        .expect("canonical v3 schema snapshot should encode");
    journal
        .fold_persisted_snapshot(lower_entity, &lower_neighbour)
        .expect("lower unrelated schema snapshot should encode");
    journal
        .insert_persisted_snapshot(entity, &live_v2)
        .expect("live v2 schema snapshot should encode");

    // The higher neighbour holds a single 0xff byte rather than an encoded
    // snapshot; the range visit below is expected never to surface it.
    let higher_key = RawSchemaKey::from_entity_version(higher_entity, SchemaVersion::new(1));
    journal.insert_raw_snapshot(higher_key, RawSchemaSnapshot::from_bytes(vec![0xff]));

    let full_walk = visit_journaled_schema_range(&journal, entity, Direction::Desc, usize::MAX);
    let expected_walk = vec![
        (3, String::from("CanonicalV3")),
        (2, String::from("LiveV2")),
        (1, String::from("CanonicalV1")),
    ];
    assert_eq!(full_walk, expected_walk);

    let single_step = visit_journaled_schema_range(&journal, entity, Direction::Desc, 1);
    assert_eq!(single_step, vec![(3, String::from("CanonicalV3"))]);
}
1851
/// After the newest version is tombstoned, latest-version lookup must fall
/// back to version 1 and a direct lookup of version 2 must return `None`.
/// Checked once with a folded version 2 and once with a live-inserted one.
#[test]
fn journaled_schema_store_latest_snapshot_skips_tombstoned_latest_version() {
    let entity = EntityTag::new(74);
    let canonical_v1 =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "CanonicalV1");

    // Case A: version 2 was folded before being tombstoned.
    let mut folded_case = SchemaStore::init_journaled(test_memory(227));
    let canonical_v2 =
        persisted_schema_snapshot_for_test(SchemaVersion::new(2), "CanonicalV2");
    folded_case
        .fold_persisted_snapshot(entity, &canonical_v1)
        .expect("canonical v1 schema snapshot should encode");
    folded_case
        .fold_persisted_snapshot(entity, &canonical_v2)
        .expect("canonical v2 schema snapshot should encode");
    tombstone_journaled_raw_snapshot(&mut folded_case, entity, SchemaVersion::new(2));

    assert_latest_schema(
        &folded_case,
        entity,
        SchemaVersion::initial(),
        "CanonicalV1",
    );
    let folded_lookup = folded_case
        .get_persisted_snapshot(entity, SchemaVersion::new(2))
        .expect("tombstoned canonical snapshot lookup should not decode");
    assert!(folded_lookup.is_none());

    // Case B: version 2 was inserted live before being tombstoned.
    let mut live_case = SchemaStore::init_journaled(test_memory(226));
    let live_v2 = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "LiveV2");
    live_case
        .fold_persisted_snapshot(entity, &canonical_v1)
        .expect("canonical v1 schema snapshot should encode");
    live_case
        .insert_persisted_snapshot(entity, &live_v2)
        .expect("live v2 schema snapshot should encode");
    tombstone_journaled_raw_snapshot(&mut live_case, entity, SchemaVersion::new(2));

    assert_latest_schema(
        &live_case,
        entity,
        SchemaVersion::initial(),
        "CanonicalV1",
    );
    let live_lookup = live_case
        .get_persisted_snapshot(entity, SchemaVersion::new(2))
        .expect("tombstoned live snapshot lookup should not decode");
    assert!(live_lookup.is_none());
}
1913
/// A freshly initialised store with no snapshots must report no catalog
/// metadata (`None`) rather than an error.
#[test]
fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
    let empty_store = SchemaStore::init(test_memory(241));

    let metadata = empty_store
        .catalog_metadata()
        .expect("empty schema catalog metadata should derive");

    assert_eq!(metadata, None);
}
1925
/// `latest_catalog_identity` must report the newest schema version without
/// bumping the snapshot decode counter, and its fingerprint must equal the
/// one derived from the version-1 snapshot of the same entity.
#[test]
fn schema_store_latest_catalog_identity_uses_version_neutral_header_without_decoding() {
    let entity = EntityTag::new(80);
    let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Versioned");
    let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Versioned");

    // Fingerprint is taken from version 1 on purpose: the assertion at the
    // end only holds if the fingerprint ignores the schema version.
    let accepted_initial = AcceptedSchemaSnapshot::try_new(initial.clone())
        .expect("initial schema snapshot should be accepted");
    let expected_fingerprint = accepted_schema_cache_fingerprint(&accepted_initial)
        .expect("accepted schema fingerprint should derive");

    let mut schema_store = SchemaStore::init(test_memory(239));
    schema_store
        .insert_persisted_snapshot(entity, &initial)
        .expect("initial schema snapshot should encode");
    schema_store
        .insert_persisted_snapshot(entity, &newer)
        .expect("newer schema snapshot should encode");

    // Zero the counter right before the call under test.
    reset_persisted_schema_snapshot_decode_count_for_tests();
    let selection = schema_store
        .latest_catalog_identity(entity, "entities::Versioned", "schema_store_test")
        .expect("latest catalog identity should derive from header")
        .expect("latest catalog identity should exist");
    let decodes = persisted_schema_snapshot_decode_count_for_tests();

    assert_eq!(decodes, 0);
    assert_eq!(
        selection.identity().accepted_schema_version(),
        SchemaVersion::new(2)
    );
    assert_eq!(
        selection.identity().accepted_schema_fingerprint(),
        expected_fingerprint,
        "accepted catalog identity fingerprint must exclude schema_version",
    );
}
1962
/// Catalog metadata must track the stored snapshots: version, entity count,
/// and fingerprint all change once a newer version and a second entity are
/// added.
#[test]
fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
    let first_entity = EntityTag::new(81);
    let second_entity = EntityTag::new(82);
    let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
    let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
    let mut schema_store = SchemaStore::init(test_memory(240));

    // Metadata with a single entity at its initial version.
    schema_store
        .insert_persisted_snapshot(first_entity, &initial)
        .expect("initial schema snapshot should encode");
    let before = schema_store
        .catalog_metadata()
        .expect("initial schema catalog metadata should derive")
        .expect("initial schema catalog metadata should be present");

    // Metadata after upgrading the first entity and adding a second one.
    schema_store
        .insert_persisted_snapshot(first_entity, &newer)
        .expect("newer schema snapshot should encode");
    schema_store
        .insert_persisted_snapshot(second_entity, &other)
        .expect("other schema snapshot should encode");
    let after = schema_store
        .catalog_metadata()
        .expect("updated schema catalog metadata should derive")
        .expect("updated schema catalog metadata should be present");

    assert_eq!(before.schema_version(), SchemaVersion::initial());
    assert_eq!(
        before.schema_fingerprint_method_version(),
        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION
    );
    assert_eq!(before.entity_count(), 1);

    assert_eq!(after.schema_version(), SchemaVersion::new(3));
    assert_eq!(
        after.schema_fingerprint_method_version(),
        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION
    );
    assert_eq!(after.entity_count(), 2);

    assert_ne!(
        before.schema_fingerprint(),
        after.schema_fingerprint(),
        "catalog fingerprint must change when latest accepted schema catalog changes"
    );
}
2007
/// Two stores holding the same snapshots, inserted in opposite order, must
/// derive identical catalog metadata.
///
/// Both metadata values are unwrapped before comparison so the test cannot
/// pass vacuously when both stores report `None`.
#[test]
fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
    let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
    let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");

    let mut left = SchemaStore::init(test_memory(239));
    left.insert_persisted_snapshot(EntityTag::new(91), &first)
        .expect("first schema snapshot should encode");
    left.insert_persisted_snapshot(EntityTag::new(92), &second)
        .expect("second schema snapshot should encode");

    // Same content as `left`, inserted in the reverse order.
    let mut right = SchemaStore::init(test_memory(238));
    right
        .insert_persisted_snapshot(EntityTag::new(92), &second)
        .expect("second schema snapshot should encode");
    right
        .insert_persisted_snapshot(EntityTag::new(91), &first)
        .expect("first schema snapshot should encode");

    let left_metadata = left
        .catalog_metadata()
        .expect("left schema catalog metadata should derive")
        .expect("left schema catalog metadata should be present");
    let right_metadata = right
        .catalog_metadata()
        .expect("right schema catalog metadata should derive")
        .expect("right schema catalog metadata should be present");

    assert_eq!(left_metadata, right_metadata);
}
2036
/// Allocation metadata exposes separate data, index, and schema roles.
/// Adding an index to an otherwise identical snapshot must leave the data
/// fingerprint untouched, change the index and schema fingerprints, and the
/// three roles must not share a fingerprint.
#[test]
fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
    let entity = EntityTag::new(93);
    let plain = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
    let indexed_snapshot = persisted_schema_snapshot_with_index_for_test(
        SchemaVersion::initial(),
        "RoleSpecific",
        "payload_idx",
    );

    // Baseline store: snapshot without any index.
    let mut plain_store = SchemaStore::init(test_memory(237));
    plain_store
        .insert_persisted_snapshot(entity, &plain)
        .expect("base schema snapshot should encode");
    let plain_meta = plain_store
        .allocation_metadata()
        .expect("base allocation metadata should derive")
        .expect("base allocation metadata should be present");

    // Comparison store: same entity, snapshot carrying one index.
    let mut indexed_store = SchemaStore::init(test_memory(236));
    indexed_store
        .insert_persisted_snapshot(entity, &indexed_snapshot)
        .expect("indexed schema snapshot should encode");
    let indexed_meta = indexed_store
        .allocation_metadata()
        .expect("indexed allocation metadata should derive")
        .expect("indexed allocation metadata should be present");

    assert_eq!(
        plain_meta.data().schema_fingerprint(),
        indexed_meta.data().schema_fingerprint(),
        "data allocation metadata should ignore accepted index catalog changes"
    );

    // Each role reports its own fingerprint method version.
    assert_eq!(
        indexed_meta.data().schema_fingerprint_method_version(),
        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION
    );
    assert_eq!(
        indexed_meta.index().schema_fingerprint_method_version(),
        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION
    );
    assert_eq!(
        indexed_meta.schema().schema_fingerprint_method_version(),
        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION
    );

    assert_ne!(
        plain_meta.index().schema_fingerprint(),
        indexed_meta.index().schema_fingerprint(),
        "index allocation metadata should change when accepted index catalog changes"
    );
    assert_ne!(
        plain_meta.schema().schema_fingerprint(),
        indexed_meta.schema().schema_fingerprint(),
        "schema allocation metadata should change when full accepted catalog changes"
    );
    assert_ne!(
        indexed_meta.data().schema_fingerprint(),
        indexed_meta.index().schema_fingerprint(),
        "data and index allocation metadata should have distinct role fingerprints"
    );
    assert_ne!(
        indexed_meta.index().schema_fingerprint(),
        indexed_meta.schema().schema_fingerprint(),
        "index and schema allocation metadata should have distinct role fingerprints"
    );
}
2104
/// Inserting a snapshot whose schema version (2) differs from its row-layout
/// version (initial) must fail with the version-mismatch diagnostic.
#[test]
fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
    let mismatched = persisted_schema_snapshot_with_layout_version_for_test(
        SchemaVersion::new(2),
        SchemaVersion::initial(),
        "Invalid",
    );
    let mut schema_store = SchemaStore::init(test_memory(253));

    let rejection = schema_store
        .insert_persisted_snapshot(EntityTag::new(43), &mismatched)
        .expect_err("schema store should reject mismatched snapshot/layout versions");

    let reported = rejection
        .message()
        .contains("schema snapshot row-layout version mismatch");
    assert!(
        reported,
        "schema store should preserve the version mismatch diagnostic"
    );
}
2124
/// A typed snapshot built with schema version 0 must be rejected on insert
/// with the "must be positive" diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_zero_schema_version() {
    let zero_versioned =
        persisted_schema_snapshot_for_test(SchemaVersion::new(0), "ZeroSchemaVersion");
    let mut schema_store = SchemaStore::init(test_memory(254));

    let rejection = schema_store
        .insert_persisted_snapshot(EntityTag::new(44), &zero_versioned)
        .expect_err("schema store should reject non-positive schema versions");

    let reported = rejection
        .message()
        .contains("schema snapshot schema_version must be positive");
    assert!(
        reported,
        "schema store should hard-cut non-positive persisted schema versions"
    );
}
2141
/// A typed snapshot whose row layout maps field 2 to slot 3, while the field
/// list is taken unchanged from the valid base snapshot, must be rejected on
/// insert with the field-slot-mismatch diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");

    // Layout replaced wholesale; the fields below still come from `base`.
    let divergent_slots = vec![
        (FieldId::new(1), SchemaFieldSlot::new(0)),
        (FieldId::new(2), SchemaFieldSlot::new(3)),
    ];
    let divergent_layout = SchemaRowLayout::new(base.version(), divergent_slots);
    let invalid = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        divergent_layout,
        base.fields().to_vec(),
    );

    let mut schema_store = SchemaStore::init(test_memory(232));
    let rejection = schema_store
        .insert_persisted_snapshot(EntityTag::new(44), &invalid)
        .expect_err("schema store should reject divergent field/layout slots");

    let reported = rejection
        .message()
        .contains("schema snapshot field slot mismatch");
    assert!(
        reported,
        "schema store should report the duplicated slot divergence"
    );
}
2171
/// A typed snapshot whose row layout assigns slot 0 to both field 1 and
/// field 2 must be rejected on insert with the duplicate-slot diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
    let base =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");

    // Both fields point at slot 0.
    let colliding_slots = vec![
        (FieldId::new(1), SchemaFieldSlot::new(0)),
        (FieldId::new(2), SchemaFieldSlot::new(0)),
    ];
    let colliding_layout = SchemaRowLayout::new(base.version(), colliding_slots);
    let invalid = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        colliding_layout,
        base.fields().to_vec(),
    );

    let mut schema_store = SchemaStore::init(test_memory(246));
    let rejection = schema_store
        .insert_persisted_snapshot(EntityTag::new(49), &invalid)
        .expect_err("schema store should reject duplicate row-layout slots");

    let reported = rejection
        .message()
        .contains("schema snapshot duplicate row-layout slot");
    assert!(
        reported,
        "schema store should report the row-layout slot ambiguity"
    );
}
2202
/// A typed snapshot naming field id 99 as its primary key, with layout and
/// fields copied from the valid base snapshot, must be rejected on insert
/// with the missing-primary-key diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");

    // Only the primary-key id is swapped out.
    let absent_primary_key = FieldId::new(99);
    let invalid = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        absent_primary_key,
        base.row_layout().clone(),
        base.fields().to_vec(),
    );

    let mut schema_store = SchemaStore::init(test_memory(248));
    let rejection = schema_store
        .insert_persisted_snapshot(EntityTag::new(47), &invalid)
        .expect_err("schema store should reject snapshots without the primary-key field");

    let reported = rejection
        .message()
        .contains("schema snapshot primary key field missing from row layout");
    assert!(
        reported,
        "schema store should report the missing primary-key field"
    );
}
2226
/// When the highest stored version of an entity holds undecodable bytes,
/// latest-version lookup must return the decode error instead of the older,
/// valid snapshot.
#[test]
fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
    let entity = EntityTag::new(45);
    let valid_initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let mut schema_store = SchemaStore::init(test_memory(249));

    schema_store
        .insert_persisted_snapshot(entity, &valid_initial)
        .expect("initial schema snapshot should encode");

    // Version 3 sorts after the valid snapshot and carries two junk bytes.
    let corrupt_key = RawSchemaKey::from_entity_version(entity, SchemaVersion::new(3));
    let corrupt_payload = RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]);
    schema_store.insert_raw_snapshot(corrupt_key, corrupt_payload);

    let failure = schema_store
        .latest_persisted_snapshot(entity)
        .expect_err("latest corrupt schema snapshot must fail closed");

    let reported = failure
        .message()
        .contains("failed to decode persisted schema snapshot");
    assert!(
        reported,
        "latest-version lookup should report the corrupt newest snapshot"
    );
}
2249
/// A snapshot with a field/layout slot mismatch is encoded directly and
/// stored as raw bytes; reading it back through latest-version lookup must
/// fail with the codec's field-slot-mismatch diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
    let entity = EntityTag::new(46);
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");

    // Field 2 is mapped to slot 3; the field list is still the base one.
    let divergent_slots = vec![
        (FieldId::new(1), SchemaFieldSlot::new(0)),
        (FieldId::new(2), SchemaFieldSlot::new(3)),
    ];
    let divergent_layout = SchemaRowLayout::new(base.version(), divergent_slots);
    let invalid = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        divergent_layout,
        base.fields().to_vec(),
    );

    // Encode and store as raw bytes instead of using the typed insert.
    let encoded = encode_persisted_schema_snapshot(&invalid)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(entity, invalid.version());
    let mut schema_store = SchemaStore::init(test_memory(250));
    schema_store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = schema_store
        .latest_persisted_snapshot(entity)
        .expect_err("raw decode should reject divergent field/layout slots");

    let reported = failure
        .message()
        .contains("persisted schema snapshot field slot mismatch");
    assert!(
        reported,
        "schema codec should report the raw decoded slot divergence"
    );
}
2284
/// A snapshot naming field id 99 as primary key is encoded directly and
/// stored as raw bytes; reading it back through latest-version lookup must
/// fail with the codec's missing-primary-key diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
    let entity = EntityTag::new(48);
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");

    // Only the primary-key id differs from the valid base snapshot.
    let absent_primary_key = FieldId::new(99);
    let invalid = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        absent_primary_key,
        base.row_layout().clone(),
        base.fields().to_vec(),
    );

    // Encode and store as raw bytes instead of using the typed insert.
    let encoded = encode_persisted_schema_snapshot(&invalid)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(entity, invalid.version());
    let mut schema_store = SchemaStore::init(test_memory(247));
    schema_store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = schema_store
        .latest_persisted_snapshot(entity)
        .expect_err("raw decode should reject snapshots without the primary-key field");

    let reported = failure
        .message()
        .contains("persisted schema snapshot primary key field missing from row layout");
    assert!(
        reported,
        "schema codec should report the raw decoded missing primary-key field"
    );
}
2313
/// The second field is rebuilt with the first field's name, the snapshot is
/// encoded directly and stored as raw bytes; reading it back through
/// latest-version lookup must fail with the duplicate-field-name diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
    let entity = EntityTag::new(50);
    let base =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");

    // Copy every attribute of field 1 except the name, which is borrowed
    // from field 0 to create the collision.
    let mut fields = base.fields().to_vec();
    let renamed = {
        let template = &fields[1];
        let clashing_name = fields[0].name().to_string();
        PersistedFieldSnapshot::new(
            template.id(),
            clashing_name,
            template.slot(),
            template.kind().clone(),
            template.nested_leaves().to_vec(),
            template.nullable(),
            template.default().clone(),
            template.storage_decode(),
            template.leaf_codec(),
        )
    };
    fields[1] = renamed;

    let invalid = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        base.row_layout().clone(),
        fields,
    );

    // Encode and store as raw bytes instead of using the typed insert.
    let encoded = encode_persisted_schema_snapshot(&invalid)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(entity, invalid.version());
    let mut schema_store = SchemaStore::init(test_memory(245));
    schema_store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = schema_store
        .latest_persisted_snapshot(entity)
        .expect_err("raw decode should reject duplicate field names");

    let reported = failure
        .message()
        .contains("persisted schema snapshot duplicate field name");
    assert!(
        reported,
        "schema codec should report the raw decoded field-name ambiguity"
    );
}
2356
#[test]
fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");

    // A nested leaf whose path has no segments; everything else about the leaf
    // mirrors the blob leaf used by the shared test snapshot builder.
    let pathless_leaf = PersistedNestedLeafSnapshot::new(
        Vec::new(),
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );

    // Swap the second field for a copy that carries only the pathless leaf.
    let mut fields = template.fields().to_vec();
    let second = &fields[1];
    let replacement = PersistedFieldSnapshot::new(
        second.id(),
        second.name().to_string(),
        second.slot(),
        second.kind().clone(),
        vec![pathless_leaf],
        second.nullable(),
        second.default().clone(),
        second.storage_decode(),
        second.leaf_codec(),
    );
    fields[1] = replacement;

    let broken = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );

    // This case goes through the typed insert path, which must refuse the
    // snapshot itself rather than deferring to a later decode.
    let mut store = SchemaStore::init(test_memory(244));
    let failure = store
        .insert_persisted_snapshot(EntityTag::new(51), &broken)
        .expect_err("schema store should reject empty nested leaf paths");
    let message = failure.message();

    assert!(
        message.contains("schema snapshot empty nested leaf path"),
        "schema store should report the empty nested leaf path"
    );
}
2399
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");

    // Both leaves use the single-segment path `bytes`; only the kind and the
    // scalar codec differ between them.
    let bytes_leaf = |kind: PersistedFieldKind, codec: ScalarCodec| {
        PersistedNestedLeafSnapshot::new(
            vec!["bytes".to_string()],
            kind,
            false,
            FieldStorageDecode::ByKind,
            LeafCodec::Scalar(codec),
        )
    };
    let clashing_leaves = vec![
        bytes_leaf(PersistedFieldKind::Blob { max_len: None }, ScalarCodec::Blob),
        bytes_leaf(PersistedFieldKind::Text { max_len: None }, ScalarCodec::Text),
    ];

    // Swap the second field for a copy that carries the clashing leaves.
    let mut fields = template.fields().to_vec();
    let second = &fields[1];
    let replacement = PersistedFieldSnapshot::new(
        second.id(),
        second.name().to_string(),
        second.slot(),
        second.kind().clone(),
        clashing_leaves,
        second.nullable(),
        second.default().clone(),
        second.storage_decode(),
        second.leaf_codec(),
    );
    fields[1] = replacement;

    let broken = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );

    // Store the encoded bytes through the raw path; the rejection is expected
    // when the store decodes them on read.
    let bytes = encode_persisted_schema_snapshot(&broken)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(52), broken.version());
    let mut store = SchemaStore::init(test_memory(243));
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(bytes));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(52))
        .expect_err("raw decode should reject duplicate nested leaf paths");
    let message = failure.message();

    assert!(
        message.contains("persisted schema snapshot duplicate nested leaf path"),
        "schema codec should report the raw decoded nested path ambiguity"
    );
}
2458
#[test]
fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
    // Round-trip the shared test snapshot through its raw encoding and check
    // that decoding yields a value equal to the input.
    let original = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");

    let round_tripped = RawSchemaSnapshot::from_persisted_snapshot(&original)
        .expect("schema snapshot should encode")
        .decode_persisted_snapshot()
        .expect("schema snapshot should decode");

    assert_eq!(round_tripped, original);
}
2471
2472    // Build one typed schema snapshot used by schema-store tests. The exact
2473    // field contracts are intentionally rich enough to cover nested metadata,
2474    // scalar codecs, and structural fallback payloads through the raw store.
2475    fn assert_latest_schema(
2476        store: &SchemaStore,
2477        entity: EntityTag,
2478        version: SchemaVersion,
2479        entity_name: &str,
2480    ) {
2481        let latest = store
2482            .latest_persisted_snapshot(entity)
2483            .expect("latest schema snapshot should decode")
2484            .expect("latest schema snapshot should exist");
2485
2486        assert_eq!(latest.version(), version);
2487        assert_eq!(latest.entity_name(), entity_name);
2488    }
2489
2490    fn tombstone_journaled_raw_snapshot(
2491        store: &mut SchemaStore,
2492        entity: EntityTag,
2493        version: SchemaVersion,
2494    ) {
2495        let key = RawSchemaKey::from_entity_version(entity, version);
2496        let SchemaStoreBackend::Journaled { tombstones, .. } = &mut store.backend else {
2497            panic!("schema tombstone test helper requires a journaled store");
2498        };
2499
2500        tombstones.insert(key);
2501    }
2502
2503    fn visit_journaled_schema_range(
2504        store: &SchemaStore,
2505        entity: EntityTag,
2506        direction: Direction,
2507        stop_after: usize,
2508    ) -> Vec<(u32, String)> {
2509        let SchemaStoreBackend::Journaled {
2510            canonical,
2511            live,
2512            tombstones,
2513        } = &store.backend
2514        else {
2515            panic!("schema range test helper requires a journaled store");
2516        };
2517
2518        let mut visited = Vec::new();
2519        let _: Result<(), Infallible> = SchemaStore::visit_journaled_raw_snapshot_range(
2520            canonical,
2521            live,
2522            tombstones,
2523            RawSchemaKey::entity_range_bounds(entity),
2524            direction,
2525            |key, snapshot| {
2526                let decoded = snapshot
2527                    .decode_persisted_snapshot()
2528                    .expect("visited schema snapshot should decode");
2529                visited.push((key.version(), decoded.entity_name().to_string()));
2530                Ok(if visited.len() >= stop_after {
2531                    SchemaStoreVisit::Stop
2532                } else {
2533                    SchemaStoreVisit::Continue
2534                })
2535            },
2536        );
2537
2538        visited
2539    }
2540
2541    fn persisted_schema_snapshot_for_test(
2542        version: SchemaVersion,
2543        entity_name: &str,
2544    ) -> PersistedSchemaSnapshot {
2545        persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
2546    }
2547
2548    fn persisted_schema_snapshot_with_index_for_test(
2549        version: SchemaVersion,
2550        entity_name: &str,
2551        index_name: &str,
2552    ) -> PersistedSchemaSnapshot {
2553        let base = persisted_schema_snapshot_for_test(version, entity_name);
2554
2555        PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
2556            base.version(),
2557            base.entity_path().to_string(),
2558            base.entity_name().to_string(),
2559            base.primary_key_field_ids().to_vec(),
2560            base.row_layout().clone(),
2561            base.fields().to_vec(),
2562            vec![PersistedIndexSnapshot::new(
2563                0,
2564                index_name.to_string(),
2565                "RoleSpecificStore".to_string(),
2566                false,
2567                PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
2568                    FieldId::new(1),
2569                    SchemaFieldSlot::new(0),
2570                    vec!["id".to_string()],
2571                    PersistedFieldKind::Ulid,
2572                    false,
2573                )]),
2574                None,
2575            )],
2576        )
2577    }
2578
2579    // Build one typed schema snapshot with independently selectable snapshot
2580    // and row-layout versions. Production snapshots should keep these aligned;
2581    // tests can deliberately break that invariant at the store boundary.
2582    fn persisted_schema_snapshot_with_layout_version_for_test(
2583        version: SchemaVersion,
2584        layout_version: SchemaVersion,
2585        entity_name: &str,
2586    ) -> PersistedSchemaSnapshot {
2587        PersistedSchemaSnapshot::new(
2588            version,
2589            format!("entities::{entity_name}"),
2590            entity_name.to_string(),
2591            FieldId::new(1),
2592            SchemaRowLayout::new(
2593                layout_version,
2594                vec![
2595                    (FieldId::new(1), SchemaFieldSlot::new(0)),
2596                    (FieldId::new(2), SchemaFieldSlot::new(1)),
2597                ],
2598            ),
2599            vec![
2600                PersistedFieldSnapshot::new(
2601                    FieldId::new(1),
2602                    "id".to_string(),
2603                    SchemaFieldSlot::new(0),
2604                    PersistedFieldKind::Ulid,
2605                    Vec::new(),
2606                    false,
2607                    SchemaFieldDefault::None,
2608                    FieldStorageDecode::ByKind,
2609                    LeafCodec::Scalar(ScalarCodec::Ulid),
2610                ),
2611                PersistedFieldSnapshot::new(
2612                    FieldId::new(2),
2613                    "payload".to_string(),
2614                    SchemaFieldSlot::new(1),
2615                    PersistedFieldKind::Map {
2616                        key: Box::new(PersistedFieldKind::Text { max_len: None }),
2617                        value: Box::new(PersistedFieldKind::List(Box::new(
2618                            PersistedFieldKind::Nat64,
2619                        ))),
2620                    },
2621                    vec![PersistedNestedLeafSnapshot::new(
2622                        vec!["bytes".to_string()],
2623                        PersistedFieldKind::Blob { max_len: None },
2624                        false,
2625                        FieldStorageDecode::ByKind,
2626                        LeafCodec::Scalar(ScalarCodec::Blob),
2627                    )],
2628                    false,
2629                    SchemaFieldDefault::None,
2630                    FieldStorageDecode::ByKind,
2631                    LeafCodec::StructuralFallback,
2632                ),
2633            ],
2634        )
2635    }
2636}