Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        schema::{
14            PersistedFieldKind, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
15            PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
16            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
17        },
18    },
19    error::InternalError,
20    traits::Storable,
21    types::EntityTag,
22};
23use ic_memory::stable_structures::storable::Bound;
24use ic_memory::stable_structures::{
25    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
26};
27use sha2::Digest;
28use std::borrow::Cow;
29use std::collections::BTreeMap as StdBTreeMap;
30
31const SCHEMA_KEY_BYTES_USIZE: usize = 12;
32const SCHEMA_KEY_BYTES: u32 = 12;
33const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
34const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
35const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
36const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
37
38///
39/// RawSchemaKey
40///
41/// Stable key for one persisted schema snapshot entry.
42/// It combines the entity tag and schema version so reconciliation can load
43/// concrete versions without depending on generated entity names.
44///
45
46#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
47struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
48
49impl RawSchemaKey {
50    /// Build the raw persisted key for one entity schema version.
51    #[must_use]
52    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
53        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
54        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
55        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
56
57        Self(out)
58    }
59
60    /// Return the entity tag encoded in this schema key.
61    #[must_use]
62    fn entity_tag(self) -> EntityTag {
63        let mut bytes = [0u8; size_of::<u64>()];
64        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
65
66        EntityTag::new(u64::from_be_bytes(bytes))
67    }
68
69    /// Return the schema version encoded in this schema key.
70    #[must_use]
71    fn version(self) -> u32 {
72        let mut bytes = [0u8; size_of::<u32>()];
73        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
74
75        u32::from_be_bytes(bytes)
76    }
77}
78
79impl Storable for RawSchemaKey {
80    fn to_bytes(&self) -> Cow<'_, [u8]> {
81        Cow::Borrowed(&self.0)
82    }
83
84    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
85        debug_assert_eq!(
86            bytes.len(),
87            SCHEMA_KEY_BYTES_USIZE,
88            "RawSchemaKey::from_bytes received unexpected byte length",
89        );
90
91        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
92            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
93        }
94
95        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
96        out.copy_from_slice(bytes.as_ref());
97        Self(out)
98    }
99
100    fn into_bytes(self) -> Vec<u8> {
101        self.0.to_vec()
102    }
103
104    const BOUND: Bound = Bound::Bounded {
105        max_size: SCHEMA_KEY_BYTES,
106        is_fixed_size: true,
107    };
108}
109
110///
111/// RawSchemaSnapshot
112///
113/// Raw persisted schema snapshot payload.
114/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
115/// keeping the stable-memory value boundary independent from the typed schema
116/// DTOs used by reconciliation.
117///
118
119#[derive(Clone, Debug, Eq, PartialEq)]
120struct RawSchemaSnapshot(Vec<u8>);
121
122impl RawSchemaSnapshot {
123    /// Encode one typed persisted-schema snapshot into a raw store payload.
124    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
125        validate_typed_schema_snapshot_for_store(snapshot)?;
126
127        encode_persisted_schema_snapshot(snapshot).map(Self)
128    }
129
130    /// Build one raw schema snapshot from already-encoded bytes.
131    #[must_use]
132    #[cfg(test)]
133    const fn from_bytes(bytes: Vec<u8>) -> Self {
134        Self(bytes)
135    }
136
137    /// Borrow the encoded schema snapshot payload.
138    #[must_use]
139    const fn as_bytes(&self) -> &[u8] {
140        self.0.as_slice()
141    }
142
143    /// Consume the snapshot into its encoded payload bytes.
144    #[must_use]
145    #[cfg(test)]
146    fn into_bytes(self) -> Vec<u8> {
147        self.0
148    }
149
150    /// Decode this raw store payload into a typed persisted-schema snapshot.
151    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
152        decode_persisted_schema_snapshot(self.as_bytes())
153    }
154}
155
156impl Storable for RawSchemaSnapshot {
157    fn to_bytes(&self) -> Cow<'_, [u8]> {
158        Cow::Borrowed(self.as_bytes())
159    }
160
161    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
162        Self(bytes.into_owned())
163    }
164
165    fn into_bytes(self) -> Vec<u8> {
166        self.0
167    }
168
169    const BOUND: Bound = Bound::Bounded {
170        max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
171        is_fixed_size: false,
172    };
173}
174
175// Validate typed schema snapshots before they are encoded into the raw schema
176// metadata store. This catches caller-side invariant violations separately from
177// raw persisted-byte corruption handled by the codec decode boundary.
178fn validate_typed_schema_snapshot_for_store(
179    snapshot: &PersistedSchemaSnapshot,
180) -> Result<(), InternalError> {
181    if let Some(detail) = schema_snapshot_integrity_detail(
182        "schema snapshot",
183        snapshot.version(),
184        snapshot.primary_key_field_ids(),
185        snapshot.row_layout(),
186        snapshot.fields(),
187    ) {
188        return Err(InternalError::store_invariant(detail));
189    }
190
191    Ok(())
192}
193
194///
195/// SchemaStoreFootprint
196///
197/// Current raw schema metadata footprint for one entity. Reconciliation uses
198/// this value to report stable-memory pressure without decoding schema payloads
199/// or exposing field-level metadata through metrics.
200///
201
202#[derive(Clone, Copy, Debug, Eq, PartialEq)]
203pub(in crate::db) struct SchemaStoreFootprint {
204    snapshots: u64,
205    encoded_bytes: u64,
206    latest_snapshot_bytes: u64,
207}
208
209///
210/// SchemaStoreCatalogMetadata
211///
212/// Accepted schema-store catalog metadata derived from latest persisted
213/// snapshots. This is diagnostic allocation metadata, not allocation identity.
214///
215
216#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub(in crate::db) struct SchemaStoreCatalogMetadata {
218    schema_version: SchemaVersion,
219    schema_fingerprint: CommitSchemaFingerprint,
220    entity_count: u64,
221}
222
223impl SchemaStoreCatalogMetadata {
224    /// Build catalog metadata from already-derived accepted schema facts.
225    #[must_use]
226    const fn new(
227        schema_version: SchemaVersion,
228        schema_fingerprint: CommitSchemaFingerprint,
229        entity_count: u64,
230    ) -> Self {
231        Self {
232            schema_version,
233            schema_fingerprint,
234            entity_count,
235        }
236    }
237
238    /// Return the maximum latest schema version represented in the catalog.
239    #[must_use]
240    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
241        self.schema_version
242    }
243
244    /// Return the deterministic catalog fingerprint for latest accepted
245    /// snapshots.
246    #[must_use]
247    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
248        self.schema_fingerprint
249    }
250
251    /// Return number of entity schemas represented in this catalog metadata.
252    #[must_use]
253    pub(in crate::db) const fn entity_count(self) -> u64 {
254        self.entity_count
255    }
256}
257
258///
259/// SchemaStoreAllocationMetadata
260///
261/// Role-specific allocation metadata derived from latest accepted schema-store
262/// snapshots. These fingerprints describe the accepted contract that owns each
263/// allocation role; they are diagnostics, not allocation identity.
264///
265
266#[derive(Clone, Copy, Debug, Eq, PartialEq)]
267pub(in crate::db) struct SchemaStoreAllocationMetadata {
268    data: SchemaStoreCatalogMetadata,
269    index: SchemaStoreCatalogMetadata,
270    schema: SchemaStoreCatalogMetadata,
271}
272
273impl SchemaStoreAllocationMetadata {
274    /// Build one role-specific metadata set from already-derived accepted
275    /// schema facts.
276    #[must_use]
277    const fn new(
278        data: SchemaStoreCatalogMetadata,
279        index: SchemaStoreCatalogMetadata,
280        schema: SchemaStoreCatalogMetadata,
281    ) -> Self {
282        Self {
283            data,
284            index,
285            schema,
286        }
287    }
288
289    /// Return accepted row-layout allocation metadata for data memory.
290    #[must_use]
291    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
292        self.data
293    }
294
295    /// Return accepted index-catalog allocation metadata for index memory.
296    #[must_use]
297    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
298        self.index
299    }
300
301    /// Return accepted full schema-catalog allocation metadata for schema
302    /// memory.
303    #[must_use]
304    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
305        self.schema
306    }
307}
308
309impl SchemaStoreFootprint {
310    /// Build one schema-store footprint from already-counted raw payload facts.
311    #[must_use]
312    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
313        Self {
314            snapshots,
315            encoded_bytes,
316            latest_snapshot_bytes,
317        }
318    }
319
320    /// Return the number of raw schema snapshots stored for the entity.
321    #[must_use]
322    pub(in crate::db) const fn snapshots(self) -> u64 {
323        self.snapshots
324    }
325
326    /// Return the total encoded payload bytes stored for the entity.
327    #[must_use]
328    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
329        self.encoded_bytes
330    }
331
332    /// Return the encoded payload bytes for the highest-version snapshot.
333    #[must_use]
334    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
335        self.latest_snapshot_bytes
336    }
337}
338
339///
340/// SchemaStore
341///
342/// Thin persistence wrapper over one stable or heap schema metadata BTreeMap.
343/// Startup reconciliation writes and validates encoded schema snapshots here
344/// before row/index operations proceed.
345///
346
347pub struct SchemaStore {
348    backend: SchemaStoreBackend,
349}
350
351enum SchemaStoreBackend {
352    Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
353    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
354}
355
356/// Control-flow result for schema-store traversal visitors.
357#[derive(Clone, Copy, Debug, Eq, PartialEq)]
358enum SchemaStoreVisit {
359    Continue,
360    #[allow(
361        dead_code,
362        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
363    )]
364    Stop,
365}
366
367impl SchemaStoreVisit {
368    const fn should_stop(self) -> bool {
369        matches!(self, Self::Stop)
370    }
371}
372
373impl SchemaStore {
374    /// Initialize the schema store with the provided backing memory.
375    #[must_use]
376    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
377        Self {
378            backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
379        }
380    }
381
382    /// Initialize a volatile heap-backed schema store.
383    #[must_use]
384    pub const fn init_heap() -> Self {
385        Self {
386            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
387        }
388    }
389
390    /// Insert or replace one typed persisted schema snapshot.
391    pub(in crate::db) fn insert_persisted_snapshot(
392        &mut self,
393        entity: EntityTag,
394        snapshot: &PersistedSchemaSnapshot,
395    ) -> Result<(), InternalError> {
396        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
397        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
398        let _ = self.insert_raw_snapshot(key, raw_snapshot);
399
400        Ok(())
401    }
402
403    /// Load and decode one typed persisted schema snapshot.
404    #[cfg(test)]
405    pub(in crate::db) fn get_persisted_snapshot(
406        &self,
407        entity: EntityTag,
408        version: SchemaVersion,
409    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
410        let key = RawSchemaKey::from_entity_version(entity, version);
411        self.get_raw_snapshot(&key)
412            .map(|snapshot| snapshot.decode_persisted_snapshot())
413            .transpose()
414    }
415
416    /// Load and decode the highest stored schema snapshot version for one entity.
417    pub(in crate::db) fn latest_persisted_snapshot(
418        &self,
419        entity: EntityTag,
420    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
421        let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
422        let _: Result<(), InternalError> = self.visit_raw_snapshots(|key, snapshot| {
423            if key.entity_tag() != entity {
424                return Ok(SchemaStoreVisit::Continue);
425            }
426
427            let version = SchemaVersion::new(key.version());
428            if latest
429                .as_ref()
430                .is_none_or(|(latest_version, _)| version > *latest_version)
431            {
432                latest = Some((version, snapshot.clone()));
433            }
434            Ok(SchemaStoreVisit::Continue)
435        });
436
437        latest
438            .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
439            .transpose()
440    }
441
442    /// Return raw schema-store footprint facts for one entity.
443    #[must_use]
444    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
445        let mut snapshots = 0u64;
446        let mut encoded_bytes = 0u64;
447        let mut latest = None::<(SchemaVersion, u64)>;
448
449        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
450            if key.entity_tag() != entity {
451                return Ok(SchemaStoreVisit::Continue);
452            }
453
454            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
455            snapshots = snapshots.saturating_add(1);
456            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
457
458            let version = SchemaVersion::new(key.version());
459            if latest
460                .as_ref()
461                .is_none_or(|(latest_version, _)| version > *latest_version)
462            {
463                latest = Some((version, snapshot_bytes));
464            }
465            Ok(SchemaStoreVisit::Continue)
466        });
467
468        SchemaStoreFootprint::new(
469            snapshots,
470            encoded_bytes,
471            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
472        )
473    }
474
475    /// Derive accepted catalog metadata from latest persisted schema snapshots.
476    ///
477    /// This function intentionally reads only the persisted schema store. It
478    /// does not reconstruct metadata from generated models when the store has
479    /// no accepted snapshots.
480    #[cfg(test)]
481    pub(in crate::db) fn catalog_metadata(
482        &self,
483    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
484        Ok(self
485            .allocation_metadata()?
486            .map(SchemaStoreAllocationMetadata::schema))
487    }
488
489    /// Derive role-specific allocation metadata from latest persisted schema
490    /// snapshots.
491    ///
492    /// This function intentionally reads only accepted schema-store payloads.
493    /// It never reconstructs metadata from generated models when the store has
494    /// no accepted snapshots.
495    pub(in crate::db) fn allocation_metadata(
496        &self,
497    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
498        let latest_by_entity = self.latest_raw_snapshots_by_entity();
499        if latest_by_entity.is_empty() {
500            return Ok(None);
501        }
502
503        Ok(Some(SchemaStoreAllocationMetadata::new(
504            derive_data_allocation_metadata(&latest_by_entity)?,
505            derive_index_allocation_metadata(&latest_by_entity)?,
506            derive_schema_catalog_metadata(&latest_by_entity)?,
507        )))
508    }
509
510    /// Insert or replace one raw schema snapshot.
511    fn insert_raw_snapshot(
512        &mut self,
513        key: RawSchemaKey,
514        snapshot: RawSchemaSnapshot,
515    ) -> Option<RawSchemaSnapshot> {
516        match &mut self.backend {
517            SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
518            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
519        }
520    }
521
522    /// Load one raw schema snapshot by key.
523    #[must_use]
524    #[cfg(test)]
525    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
526        match &self.backend {
527            SchemaStoreBackend::Stable(map) => map.get(key),
528            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
529        }
530    }
531
532    /// Return whether one schema snapshot key is present.
533    #[must_use]
534    #[cfg(test)]
535    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
536        match &self.backend {
537            SchemaStoreBackend::Stable(map) => map.contains_key(key),
538            SchemaStoreBackend::Heap(map) => map.contains_key(key),
539        }
540    }
541
542    /// Return the number of schema snapshot entries in this store.
543    #[must_use]
544    #[cfg(test)]
545    pub(in crate::db) fn len(&self) -> u64 {
546        match &self.backend {
547            SchemaStoreBackend::Stable(map) => map.len(),
548            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
549        }
550    }
551
552    /// Return whether this schema store currently has no persisted snapshots.
553    #[must_use]
554    #[cfg(test)]
555    pub(in crate::db) fn is_empty(&self) -> bool {
556        match &self.backend {
557            SchemaStoreBackend::Stable(map) => map.is_empty(),
558            SchemaStoreBackend::Heap(map) => map.is_empty(),
559        }
560    }
561
562    /// Clear all schema metadata entries from the store.
563    #[cfg(test)]
564    pub(in crate::db) fn clear(&mut self) {
565        match &mut self.backend {
566            SchemaStoreBackend::Stable(map) => map.clear_new(),
567            SchemaStoreBackend::Heap(map) => map.clear(),
568        }
569    }
570
571    fn latest_raw_snapshots_by_entity(
572        &self,
573    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
574        let mut latest_by_entity =
575            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
576
577        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
578            let version = SchemaVersion::new(key.version());
579            match latest_by_entity.get_mut(&key.entity_tag()) {
580                Some((latest_version, latest_snapshot)) if version > *latest_version => {
581                    *latest_version = version;
582                    *latest_snapshot = snapshot.clone();
583                }
584                None => {
585                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
586                }
587                Some(_) => {}
588            }
589            Ok(SchemaStoreVisit::Continue)
590        });
591
592        latest_by_entity
593    }
594
595    /// Visit raw schema snapshots in canonical store order without exposing
596    /// the backing stable-map iterator.
597    fn visit_raw_snapshots<E>(
598        &self,
599        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
600    ) -> Result<(), E> {
601        match &self.backend {
602            SchemaStoreBackend::Stable(map) => {
603                for entry in map.iter() {
604                    if visitor(entry.key(), &entry.value())?.should_stop() {
605                        break;
606                    }
607                }
608            }
609            SchemaStoreBackend::Heap(map) => {
610                for (key, snapshot) in map {
611                    if visitor(key, snapshot)?.should_stop() {
612                        break;
613                    }
614                }
615            }
616        }
617
618        Ok(())
619    }
620}
621
622fn derive_data_allocation_metadata(
623    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
624) -> Result<SchemaStoreCatalogMetadata, InternalError> {
625    let mut max_version = SchemaVersion::initial();
626    let mut hasher = new_hash_sha256();
627    write_hash_tag_u8(
628        &mut hasher,
629        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
630    );
631
632    for (entity, (_, snapshot)) in latest_by_entity {
633        let persisted = snapshot.decode_persisted_snapshot()?;
634        if persisted.version() > max_version {
635            max_version = persisted.version();
636        }
637
638        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
639            persisted.version(),
640            persisted.entity_path().to_string(),
641            persisted.entity_name().to_string(),
642            persisted.primary_key_field_ids().to_vec(),
643            persisted.row_layout().clone(),
644            persisted.fields().to_vec(),
645            Vec::new(),
646        );
647        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
648
649        write_hash_u64(&mut hasher, entity.value());
650        write_hash_u32(&mut hasher, persisted.version().get());
651        write_hash_len_u32(&mut hasher, encoded.len());
652        hasher.update(encoded);
653    }
654
655    Ok(finalize_schema_metadata(
656        max_version,
657        hasher,
658        latest_by_entity.len(),
659    ))
660}
661
662fn derive_index_allocation_metadata(
663    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
664) -> Result<SchemaStoreCatalogMetadata, InternalError> {
665    let mut max_version = SchemaVersion::initial();
666    let mut hasher = new_hash_sha256();
667    write_hash_tag_u8(
668        &mut hasher,
669        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
670    );
671
672    for (entity, (_, snapshot)) in latest_by_entity {
673        let persisted = snapshot.decode_persisted_snapshot()?;
674        if persisted.version() > max_version {
675            max_version = persisted.version();
676        }
677
678        write_hash_u64(&mut hasher, entity.value());
679        write_hash_u32(&mut hasher, persisted.version().get());
680        write_hash_len_u32(&mut hasher, persisted.indexes().len());
681        for index in persisted.indexes() {
682            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
683            write_hash_str_u32(&mut hasher, index.name());
684            write_hash_str_u32(&mut hasher, index.store());
685            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
686            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
687            match index.predicate_sql() {
688                Some(predicate_sql) => {
689                    write_hash_tag_u8(&mut hasher, 1);
690                    write_hash_str_u32(&mut hasher, predicate_sql);
691                }
692                None => write_hash_tag_u8(&mut hasher, 0),
693            }
694            hash_persisted_index_key(&mut hasher, index.key());
695        }
696    }
697
698    Ok(finalize_schema_metadata(
699        max_version,
700        hasher,
701        latest_by_entity.len(),
702    ))
703}
704
705fn derive_schema_catalog_metadata(
706    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
707) -> Result<SchemaStoreCatalogMetadata, InternalError> {
708    let mut max_version = SchemaVersion::initial();
709    let mut hasher = new_hash_sha256();
710    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
711
712    for (entity, (version, snapshot)) in latest_by_entity {
713        let persisted = snapshot.decode_persisted_snapshot()?;
714        if persisted.version() > max_version {
715            max_version = persisted.version();
716        }
717
718        write_hash_u64(&mut hasher, entity.value());
719        write_hash_u32(&mut hasher, version.get());
720        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
721        hasher.update(snapshot.as_bytes());
722    }
723
724    Ok(finalize_schema_metadata(
725        max_version,
726        hasher,
727        latest_by_entity.len(),
728    ))
729}
730
731fn finalize_schema_metadata(
732    schema_version: SchemaVersion,
733    hasher: sha2::Sha256,
734    entity_count: usize,
735) -> SchemaStoreCatalogMetadata {
736    let digest = finalize_hash_sha256(hasher);
737    let mut schema_fingerprint = [0u8; 16];
738    schema_fingerprint.copy_from_slice(&digest[..16]);
739
740    SchemaStoreCatalogMetadata::new(
741        schema_version,
742        schema_fingerprint,
743        u64::try_from(entity_count).unwrap_or(u64::MAX),
744    )
745}
746
747fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
748    match key {
749        PersistedIndexKeySnapshot::FieldPath(paths) => {
750            write_hash_tag_u8(hasher, 1);
751            write_hash_len_u32(hasher, paths.len());
752            for path in paths {
753                hash_persisted_index_field_path(hasher, path);
754            }
755        }
756        PersistedIndexKeySnapshot::Items(items) => {
757            write_hash_tag_u8(hasher, 2);
758            write_hash_len_u32(hasher, items.len());
759            for item in items {
760                match item {
761                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
762                        write_hash_tag_u8(hasher, 1);
763                        hash_persisted_index_field_path(hasher, path);
764                    }
765                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
766                        write_hash_tag_u8(hasher, 2);
767                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
768                        hash_persisted_index_field_path(hasher, expression.source());
769                        hash_persisted_field_kind(hasher, expression.input_kind());
770                        hash_persisted_field_kind(hasher, expression.output_kind());
771                        write_hash_str_u32(hasher, expression.canonical_text());
772                    }
773                }
774            }
775        }
776    }
777}
778
779fn hash_persisted_index_field_path(
780    hasher: &mut sha2::Sha256,
781    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
782) {
783    write_hash_u32(hasher, path.field_id().get());
784    write_hash_u32(hasher, u32::from(path.slot().get()));
785    write_hash_len_u32(hasher, path.path().len());
786    for segment in path.path() {
787        write_hash_str_u32(hasher, segment);
788    }
789    hash_persisted_field_kind(hasher, path.kind());
790    write_hash_tag_u8(hasher, u8::from(path.nullable()));
791}
792
793fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
794    match kind {
795        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
796        PersistedFieldKind::Blob { max_len } => {
797            write_hash_tag_u8(hasher, 2);
798            hash_optional_u32(hasher, *max_len);
799        }
800        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
801        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
802        PersistedFieldKind::Decimal { scale } => {
803            write_hash_tag_u8(hasher, 5);
804            write_hash_u32(hasher, *scale);
805        }
806        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
807        PersistedFieldKind::Enum { path, variants } => {
808            write_hash_tag_u8(hasher, 7);
809            write_hash_str_u32(hasher, path);
810            write_hash_len_u32(hasher, variants.len());
811            for variant in variants {
812                write_hash_str_u32(hasher, variant.ident());
813                match variant.payload_kind() {
814                    Some(payload_kind) => {
815                        write_hash_tag_u8(hasher, 1);
816                        hash_persisted_field_kind(hasher, payload_kind);
817                    }
818                    None => write_hash_tag_u8(hasher, 0),
819                }
820                write_hash_str_u32(
821                    hasher,
822                    field_storage_decode_name(variant.payload_storage_decode()),
823                );
824            }
825        }
826        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
827        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
828        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
829        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
830        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
831        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
832        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
833        PersistedFieldKind::IntBig { max_bytes } => {
834            write_hash_tag_u8(hasher, 15);
835            write_hash_u32(hasher, *max_bytes);
836        }
837        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
838        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
839        PersistedFieldKind::Text { max_len } => {
840            write_hash_tag_u8(hasher, 18);
841            hash_optional_u32(hasher, *max_len);
842        }
843        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
844        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
845        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
846        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
847        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
848        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
849        PersistedFieldKind::NatBig { max_bytes } => {
850            write_hash_tag_u8(hasher, 25);
851            write_hash_u32(hasher, *max_bytes);
852        }
853        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
854        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
855        PersistedFieldKind::Relation {
856            target_path,
857            target_entity_name,
858            target_entity_tag,
859            target_store_path,
860            key_kind,
861            strength,
862        } => {
863            write_hash_tag_u8(hasher, 28);
864            write_hash_str_u32(hasher, target_path);
865            write_hash_str_u32(hasher, target_entity_name);
866            write_hash_u64(hasher, target_entity_tag.value());
867            write_hash_str_u32(hasher, target_store_path);
868            hash_persisted_field_kind(hasher, key_kind);
869            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
870        }
871        PersistedFieldKind::List(inner) => {
872            write_hash_tag_u8(hasher, 29);
873            hash_persisted_field_kind(hasher, inner);
874        }
875        PersistedFieldKind::Set(inner) => {
876            write_hash_tag_u8(hasher, 30);
877            hash_persisted_field_kind(hasher, inner);
878        }
879        PersistedFieldKind::Map { key, value } => {
880            write_hash_tag_u8(hasher, 31);
881            hash_persisted_field_kind(hasher, key);
882            hash_persisted_field_kind(hasher, value);
883        }
884        PersistedFieldKind::Structured { queryable } => {
885            write_hash_tag_u8(hasher, 32);
886            write_hash_tag_u8(hasher, u8::from(*queryable));
887        }
888    }
889}
890
891fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
892    match value {
893        Some(value) => {
894            write_hash_tag_u8(hasher, 1);
895            write_hash_u32(hasher, value);
896        }
897        None => write_hash_tag_u8(hasher, 0),
898    }
899}
900
901const fn persisted_index_origin_name(
902    origin: crate::db::schema::PersistedIndexOrigin,
903) -> &'static str {
904    match origin {
905        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
906        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
907    }
908}
909
910const fn persisted_expression_op_name(
911    op: crate::db::schema::PersistedIndexExpressionOp,
912) -> &'static str {
913    match op {
914        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
915        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
916        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
917        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
918        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
919        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
920        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
921        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
922    }
923}
924
925const fn persisted_relation_strength_name(
926    strength: crate::db::schema::PersistedRelationStrength,
927) -> &'static str {
928    match strength {
929        crate::db::schema::PersistedRelationStrength::Strong => "strong",
930        crate::db::schema::PersistedRelationStrength::Weak => "weak",
931    }
932}
933
934const fn field_storage_decode_name(
935    decode: crate::model::field::FieldStorageDecode,
936) -> &'static str {
937    match decode {
938        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
939        crate::model::field::FieldStorageDecode::Value => "value",
940    }
941}
942
943///
944/// TESTS
945///
946
947#[cfg(test)]
948mod tests {
949    use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore, SchemaStoreVisit};
950    use crate::{
951        db::schema::{
952            FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedIndexFieldPathSnapshot,
953            PersistedIndexKeySnapshot, PersistedIndexSnapshot, PersistedNestedLeafSnapshot,
954            PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
955            SchemaVersion, encode_persisted_schema_snapshot,
956        },
957        model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
958        testing::test_memory,
959        traits::Storable,
960        types::EntityTag,
961    };
962    use std::borrow::Cow;
963    use std::convert::Infallible;
964
965    #[test]
966    fn raw_schema_key_round_trips_entity_and_version() {
967        let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
968            SchemaVersion::initial()
969        });
970        let encoded = key.to_bytes().into_owned();
971        let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
972
973        assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
974        assert_eq!(decoded.version(), SchemaVersion::initial().get());
975    }
976
977    #[test]
978    fn raw_schema_snapshot_round_trips_payload_bytes() {
979        let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
980        let encoded = snapshot.to_bytes().into_owned();
981        let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
982
983        assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
984        assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
985    }
986
987    #[test]
988    fn schema_store_persists_raw_snapshots_by_entity_version_key() {
989        let mut store = SchemaStore::init(test_memory(251));
990        let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
991
992        assert!(store.is_empty());
993        assert!(!store.contains_raw_snapshot(&key));
994
995        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
996
997        assert_eq!(store.len(), 1);
998        assert!(store.contains_raw_snapshot(&key));
999        assert_eq!(
1000            store
1001                .get_raw_snapshot(&key)
1002                .expect("schema snapshot should be present")
1003                .as_bytes(),
1004            &[9, 4, 6],
1005        );
1006
1007        store.clear();
1008        assert!(store.is_empty());
1009    }
1010
1011    #[test]
1012    fn schema_store_loads_latest_snapshot_for_entity() {
1013        let mut store = SchemaStore::init(test_memory(252));
1014        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1015        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1016        let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1017
1018        store
1019            .insert_persisted_snapshot(EntityTag::new(41), &initial)
1020            .expect("initial schema snapshot should encode");
1021        store
1022            .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1023            .expect("other entity schema snapshot should encode");
1024        store
1025            .insert_persisted_snapshot(EntityTag::new(41), &newer)
1026            .expect("newer schema snapshot should encode");
1027
1028        let latest = store
1029            .latest_persisted_snapshot(EntityTag::new(41))
1030            .expect("latest schema snapshot should decode")
1031            .expect("schema snapshot should exist");
1032
1033        assert_eq!(latest.version(), SchemaVersion::new(2));
1034        assert_eq!(latest.entity_name(), "Newer");
1035    }
1036
1037    #[test]
1038    fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
1039        let mut store = SchemaStore::init(test_memory(242));
1040        store.insert_raw_snapshot(
1041            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
1042            RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
1043        );
1044        store.insert_raw_snapshot(
1045            RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
1046            RawSchemaSnapshot::from_bytes(vec![5, 8]),
1047        );
1048        store.insert_raw_snapshot(
1049            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
1050            RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
1051        );
1052
1053        let footprint = store.entity_footprint(EntityTag::new(71));
1054
1055        assert_eq!(footprint.snapshots(), 2);
1056        assert_eq!(footprint.encoded_bytes(), 7);
1057        assert_eq!(footprint.latest_snapshot_bytes(), 4);
1058    }
1059
1060    #[test]
1061    fn schema_store_visit_raw_snapshots_preserves_key_order() {
1062        let mut store = SchemaStore::init(test_memory(235));
1063        store.insert_raw_snapshot(
1064            RawSchemaKey::from_entity_version(EntityTag::new(3), SchemaVersion::new(2)),
1065            RawSchemaSnapshot::from_bytes(vec![32]),
1066        );
1067        store.insert_raw_snapshot(
1068            RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(3)),
1069            RawSchemaSnapshot::from_bytes(vec![13]),
1070        );
1071        store.insert_raw_snapshot(
1072            RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(1)),
1073            RawSchemaSnapshot::from_bytes(vec![11]),
1074        );
1075
1076        let mut visited = Vec::new();
1077        let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1078            visited.push((
1079                key.entity_tag().value(),
1080                key.version(),
1081                snapshot.as_bytes()[0],
1082            ));
1083            Ok(SchemaStoreVisit::Continue)
1084        });
1085
1086        assert_eq!(visited, vec![(1, 1, 11), (1, 3, 13), (3, 2, 32)]);
1087    }
1088
1089    #[test]
1090    fn schema_store_visit_raw_snapshots_can_stop_without_error() {
1091        let mut store = SchemaStore::init(test_memory(234));
1092        store.insert_raw_snapshot(
1093            RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(1)),
1094            RawSchemaSnapshot::from_bytes(vec![21]),
1095        );
1096        store.insert_raw_snapshot(
1097            RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(2)),
1098            RawSchemaSnapshot::from_bytes(vec![22]),
1099        );
1100
1101        let mut visited = Vec::new();
1102        let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, _| {
1103            visited.push(key.version());
1104            Ok(SchemaStoreVisit::Stop)
1105        });
1106
1107        assert_eq!(visited, vec![1]);
1108    }
1109
1110    #[test]
1111    fn heap_schema_store_preserves_order_latest_snapshot_and_early_stop() {
1112        let mut store = SchemaStore::init_heap();
1113        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1114        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1115        let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1116
1117        store
1118            .insert_persisted_snapshot(EntityTag::new(41), &initial)
1119            .expect("initial heap schema snapshot should encode");
1120        store
1121            .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1122            .expect("other heap schema snapshot should encode");
1123        store
1124            .insert_persisted_snapshot(EntityTag::new(41), &newer)
1125            .expect("newer heap schema snapshot should encode");
1126
1127        let latest = store
1128            .latest_persisted_snapshot(EntityTag::new(41))
1129            .expect("latest heap schema snapshot should decode")
1130            .expect("heap schema snapshot should exist");
1131        assert_eq!(latest.version(), SchemaVersion::new(2));
1132        assert_eq!(latest.entity_name(), "Newer");
1133
1134        let mut visited = Vec::new();
1135        let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1136            visited.push((
1137                key.entity_tag().value(),
1138                key.version(),
1139                snapshot.as_bytes().len(),
1140            ));
1141            Ok(if visited.len() == 2 {
1142                SchemaStoreVisit::Stop
1143            } else {
1144                SchemaStoreVisit::Continue
1145            })
1146        });
1147        assert_eq!(
1148            visited
1149                .iter()
1150                .map(|(entity, version, _)| (*entity, *version))
1151                .collect::<Vec<_>>(),
1152            vec![(41, 1), (41, 2)]
1153        );
1154    }
1155
1156    #[test]
1157    fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
1158        let store = SchemaStore::init(test_memory(241));
1159
1160        assert_eq!(
1161            store
1162                .catalog_metadata()
1163                .expect("empty schema catalog metadata should derive"),
1164            None
1165        );
1166    }
1167
1168    #[test]
1169    fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
1170        let mut store = SchemaStore::init(test_memory(240));
1171        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1172        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1173        let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1174
1175        store
1176            .insert_persisted_snapshot(EntityTag::new(81), &initial)
1177            .expect("initial schema snapshot should encode");
1178        let initial_metadata = store
1179            .catalog_metadata()
1180            .expect("initial schema catalog metadata should derive")
1181            .expect("initial schema catalog metadata should be present");
1182
1183        store
1184            .insert_persisted_snapshot(EntityTag::new(81), &newer)
1185            .expect("newer schema snapshot should encode");
1186        store
1187            .insert_persisted_snapshot(EntityTag::new(82), &other)
1188            .expect("other schema snapshot should encode");
1189        let updated_metadata = store
1190            .catalog_metadata()
1191            .expect("updated schema catalog metadata should derive")
1192            .expect("updated schema catalog metadata should be present");
1193
1194        assert_eq!(initial_metadata.schema_version(), SchemaVersion::initial());
1195        assert_eq!(initial_metadata.entity_count(), 1);
1196        assert_eq!(updated_metadata.schema_version(), SchemaVersion::new(3));
1197        assert_eq!(updated_metadata.entity_count(), 2);
1198        assert_ne!(
1199            initial_metadata.schema_fingerprint(),
1200            updated_metadata.schema_fingerprint(),
1201            "catalog fingerprint must change when latest accepted schema catalog changes"
1202        );
1203    }
1204
1205    #[test]
1206    fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
1207        let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
1208        let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");
1209
1210        let mut left = SchemaStore::init(test_memory(239));
1211        left.insert_persisted_snapshot(EntityTag::new(91), &first)
1212            .expect("first schema snapshot should encode");
1213        left.insert_persisted_snapshot(EntityTag::new(92), &second)
1214            .expect("second schema snapshot should encode");
1215
1216        let mut right = SchemaStore::init(test_memory(238));
1217        right
1218            .insert_persisted_snapshot(EntityTag::new(92), &second)
1219            .expect("second schema snapshot should encode");
1220        right
1221            .insert_persisted_snapshot(EntityTag::new(91), &first)
1222            .expect("first schema snapshot should encode");
1223
1224        let left_metadata = left
1225            .catalog_metadata()
1226            .expect("left schema catalog metadata should derive");
1227        let right_metadata = right
1228            .catalog_metadata()
1229            .expect("right schema catalog metadata should derive");
1230
1231        assert_eq!(left_metadata, right_metadata);
1232    }
1233
1234    #[test]
1235    fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
1236        let without_index =
1237            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
1238        let with_index = persisted_schema_snapshot_with_index_for_test(
1239            SchemaVersion::initial(),
1240            "RoleSpecific",
1241            "payload_idx",
1242        );
1243
1244        let mut base = SchemaStore::init(test_memory(237));
1245        base.insert_persisted_snapshot(EntityTag::new(93), &without_index)
1246            .expect("base schema snapshot should encode");
1247        let base_metadata = base
1248            .allocation_metadata()
1249            .expect("base allocation metadata should derive")
1250            .expect("base allocation metadata should be present");
1251
1252        let mut indexed = SchemaStore::init(test_memory(236));
1253        indexed
1254            .insert_persisted_snapshot(EntityTag::new(93), &with_index)
1255            .expect("indexed schema snapshot should encode");
1256        let indexed_metadata = indexed
1257            .allocation_metadata()
1258            .expect("indexed allocation metadata should derive")
1259            .expect("indexed allocation metadata should be present");
1260
1261        assert_eq!(
1262            base_metadata.data().schema_fingerprint(),
1263            indexed_metadata.data().schema_fingerprint(),
1264            "data allocation metadata should ignore accepted index catalog changes"
1265        );
1266        assert_ne!(
1267            base_metadata.index().schema_fingerprint(),
1268            indexed_metadata.index().schema_fingerprint(),
1269            "index allocation metadata should change when accepted index catalog changes"
1270        );
1271        assert_ne!(
1272            base_metadata.schema().schema_fingerprint(),
1273            indexed_metadata.schema().schema_fingerprint(),
1274            "schema allocation metadata should change when full accepted catalog changes"
1275        );
1276        assert_ne!(
1277            indexed_metadata.data().schema_fingerprint(),
1278            indexed_metadata.index().schema_fingerprint(),
1279            "data and index allocation metadata should have distinct role fingerprints"
1280        );
1281        assert_ne!(
1282            indexed_metadata.index().schema_fingerprint(),
1283            indexed_metadata.schema().schema_fingerprint(),
1284            "index and schema allocation metadata should have distinct role fingerprints"
1285        );
1286    }
1287
1288    #[test]
1289    fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
1290        let mut store = SchemaStore::init(test_memory(253));
1291        let invalid = persisted_schema_snapshot_with_layout_version_for_test(
1292            SchemaVersion::new(2),
1293            SchemaVersion::initial(),
1294            "Invalid",
1295        );
1296
1297        let err = store
1298            .insert_persisted_snapshot(EntityTag::new(43), &invalid)
1299            .expect_err("schema store should reject mismatched snapshot/layout versions");
1300
1301        assert!(
1302            err.message()
1303                .contains("schema snapshot row-layout version mismatch"),
1304            "schema store should preserve the version mismatch diagnostic"
1305        );
1306    }
1307
1308    #[test]
1309    fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
1310        let mut store = SchemaStore::init(test_memory(254));
1311        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
1312        let invalid = PersistedSchemaSnapshot::new(
1313            base.version(),
1314            base.entity_path().to_string(),
1315            base.entity_name().to_string(),
1316            base.first_primary_key_field_id(),
1317            SchemaRowLayout::new(
1318                base.version(),
1319                vec![
1320                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1321                    (FieldId::new(2), SchemaFieldSlot::new(3)),
1322                ],
1323            ),
1324            base.fields().to_vec(),
1325        );
1326
1327        let err = store
1328            .insert_persisted_snapshot(EntityTag::new(44), &invalid)
1329            .expect_err("schema store should reject divergent field/layout slots");
1330
1331        assert!(
1332            err.message()
1333                .contains("schema snapshot field slot mismatch"),
1334            "schema store should report the duplicated slot divergence"
1335        );
1336    }
1337
1338    #[test]
1339    fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
1340        let mut store = SchemaStore::init(test_memory(246));
1341        let base =
1342            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
1343        let invalid = PersistedSchemaSnapshot::new(
1344            base.version(),
1345            base.entity_path().to_string(),
1346            base.entity_name().to_string(),
1347            base.first_primary_key_field_id(),
1348            SchemaRowLayout::new(
1349                base.version(),
1350                vec![
1351                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1352                    (FieldId::new(2), SchemaFieldSlot::new(0)),
1353                ],
1354            ),
1355            base.fields().to_vec(),
1356        );
1357
1358        let err = store
1359            .insert_persisted_snapshot(EntityTag::new(49), &invalid)
1360            .expect_err("schema store should reject duplicate row-layout slots");
1361
1362        assert!(
1363            err.message()
1364                .contains("schema snapshot duplicate row-layout slot"),
1365            "schema store should report the row-layout slot ambiguity"
1366        );
1367    }
1368
1369    #[test]
1370    fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
1371        let mut store = SchemaStore::init(test_memory(248));
1372        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
1373        let invalid = PersistedSchemaSnapshot::new(
1374            base.version(),
1375            base.entity_path().to_string(),
1376            base.entity_name().to_string(),
1377            FieldId::new(99),
1378            base.row_layout().clone(),
1379            base.fields().to_vec(),
1380        );
1381
1382        let err = store
1383            .insert_persisted_snapshot(EntityTag::new(47), &invalid)
1384            .expect_err("schema store should reject snapshots without the primary-key field");
1385
1386        assert!(
1387            err.message()
1388                .contains("schema snapshot primary key field missing from row layout"),
1389            "schema store should report the missing primary-key field"
1390        );
1391    }
1392
1393    #[test]
1394    fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
1395        let mut store = SchemaStore::init(test_memory(249));
1396        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1397        let corrupt_key =
1398            RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
1399
1400        store
1401            .insert_persisted_snapshot(EntityTag::new(45), &initial)
1402            .expect("initial schema snapshot should encode");
1403        store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
1404
1405        let err = store
1406            .latest_persisted_snapshot(EntityTag::new(45))
1407            .expect_err("latest corrupt schema snapshot must fail closed");
1408
1409        assert!(
1410            err.message()
1411                .contains("failed to decode persisted schema snapshot"),
1412            "latest-version lookup should report the corrupt newest snapshot"
1413        );
1414    }
1415
1416    #[test]
1417    fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
1418        let mut store = SchemaStore::init(test_memory(250));
1419        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
1420        let invalid = PersistedSchemaSnapshot::new(
1421            base.version(),
1422            base.entity_path().to_string(),
1423            base.entity_name().to_string(),
1424            base.first_primary_key_field_id(),
1425            SchemaRowLayout::new(
1426                base.version(),
1427                vec![
1428                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1429                    (FieldId::new(2), SchemaFieldSlot::new(3)),
1430                ],
1431            ),
1432            base.fields().to_vec(),
1433        );
1434        let raw = encode_persisted_schema_snapshot(&invalid)
1435            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1436        let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
1437
1438        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1439
1440        let err = store
1441            .latest_persisted_snapshot(EntityTag::new(46))
1442            .expect_err("raw decode should reject divergent field/layout slots");
1443
1444        assert!(
1445            err.message()
1446                .contains("persisted schema snapshot field slot mismatch"),
1447            "schema codec should report the raw decoded slot divergence"
1448        );
1449    }
1450
1451    #[test]
1452    fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
1453        let mut store = SchemaStore::init(test_memory(247));
1454        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
1455        let invalid = PersistedSchemaSnapshot::new(
1456            base.version(),
1457            base.entity_path().to_string(),
1458            base.entity_name().to_string(),
1459            FieldId::new(99),
1460            base.row_layout().clone(),
1461            base.fields().to_vec(),
1462        );
1463        let raw = encode_persisted_schema_snapshot(&invalid)
1464            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1465        let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
1466
1467        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1468
1469        let err = store
1470            .latest_persisted_snapshot(EntityTag::new(48))
1471            .expect_err("raw decode should reject snapshots without the primary-key field");
1472
1473        assert!(
1474            err.message()
1475                .contains("persisted schema snapshot primary key field missing from row layout"),
1476            "schema codec should report the raw decoded missing primary-key field"
1477        );
1478    }
1479
1480    #[test]
1481    fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
1482        let mut store = SchemaStore::init(test_memory(245));
1483        let base =
1484            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
1485        let mut fields = base.fields().to_vec();
1486        let duplicate = PersistedFieldSnapshot::new(
1487            fields[1].id(),
1488            fields[0].name().to_string(),
1489            fields[1].slot(),
1490            fields[1].kind().clone(),
1491            fields[1].nested_leaves().to_vec(),
1492            fields[1].nullable(),
1493            fields[1].default().clone(),
1494            fields[1].storage_decode(),
1495            fields[1].leaf_codec(),
1496        );
1497        fields[1] = duplicate;
1498        let invalid = PersistedSchemaSnapshot::new(
1499            base.version(),
1500            base.entity_path().to_string(),
1501            base.entity_name().to_string(),
1502            base.first_primary_key_field_id(),
1503            base.row_layout().clone(),
1504            fields,
1505        );
1506        let raw = encode_persisted_schema_snapshot(&invalid)
1507            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1508        let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
1509
1510        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1511
1512        let err = store
1513            .latest_persisted_snapshot(EntityTag::new(50))
1514            .expect_err("raw decode should reject duplicate field names");
1515
1516        assert!(
1517            err.message()
1518                .contains("persisted schema snapshot duplicate field name"),
1519            "schema codec should report the raw decoded field-name ambiguity"
1520        );
1521    }
1522
1523    #[test]
1524    fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
1525        let mut store = SchemaStore::init(test_memory(244));
1526        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
1527        let mut fields = base.fields().to_vec();
1528        let invalid_field = PersistedFieldSnapshot::new(
1529            fields[1].id(),
1530            fields[1].name().to_string(),
1531            fields[1].slot(),
1532            fields[1].kind().clone(),
1533            vec![PersistedNestedLeafSnapshot::new(
1534                Vec::new(),
1535                PersistedFieldKind::Blob { max_len: None },
1536                false,
1537                FieldStorageDecode::ByKind,
1538                LeafCodec::Scalar(ScalarCodec::Blob),
1539            )],
1540            fields[1].nullable(),
1541            fields[1].default().clone(),
1542            fields[1].storage_decode(),
1543            fields[1].leaf_codec(),
1544        );
1545        fields[1] = invalid_field;
1546        let invalid = PersistedSchemaSnapshot::new(
1547            base.version(),
1548            base.entity_path().to_string(),
1549            base.entity_name().to_string(),
1550            base.first_primary_key_field_id(),
1551            base.row_layout().clone(),
1552            fields,
1553        );
1554
1555        let err = store
1556            .insert_persisted_snapshot(EntityTag::new(51), &invalid)
1557            .expect_err("schema store should reject empty nested leaf paths");
1558
1559        assert!(
1560            err.message()
1561                .contains("schema snapshot empty nested leaf path"),
1562            "schema store should report the empty nested leaf path"
1563        );
1564    }
1565
1566    #[test]
1567    fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
1568        let mut store = SchemaStore::init(test_memory(243));
1569        let base =
1570            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
1571        let mut fields = base.fields().to_vec();
1572        let duplicate_leaves = vec![
1573            PersistedNestedLeafSnapshot::new(
1574                vec!["bytes".to_string()],
1575                PersistedFieldKind::Blob { max_len: None },
1576                false,
1577                FieldStorageDecode::ByKind,
1578                LeafCodec::Scalar(ScalarCodec::Blob),
1579            ),
1580            PersistedNestedLeafSnapshot::new(
1581                vec!["bytes".to_string()],
1582                PersistedFieldKind::Text { max_len: None },
1583                false,
1584                FieldStorageDecode::ByKind,
1585                LeafCodec::Scalar(ScalarCodec::Text),
1586            ),
1587        ];
1588        let invalid_field = PersistedFieldSnapshot::new(
1589            fields[1].id(),
1590            fields[1].name().to_string(),
1591            fields[1].slot(),
1592            fields[1].kind().clone(),
1593            duplicate_leaves,
1594            fields[1].nullable(),
1595            fields[1].default().clone(),
1596            fields[1].storage_decode(),
1597            fields[1].leaf_codec(),
1598        );
1599        fields[1] = invalid_field;
1600        let invalid = PersistedSchemaSnapshot::new(
1601            base.version(),
1602            base.entity_path().to_string(),
1603            base.entity_name().to_string(),
1604            base.first_primary_key_field_id(),
1605            base.row_layout().clone(),
1606            fields,
1607        );
1608        let raw = encode_persisted_schema_snapshot(&invalid)
1609            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1610        let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
1611
1612        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1613
1614        let err = store
1615            .latest_persisted_snapshot(EntityTag::new(52))
1616            .expect_err("raw decode should reject duplicate nested leaf paths");
1617
1618        assert!(
1619            err.message()
1620                .contains("persisted schema snapshot duplicate nested leaf path"),
1621            "schema codec should report the raw decoded nested path ambiguity"
1622        );
1623    }
1624
1625    #[test]
1626    fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
1627        let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
1628
1629        let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1630            .expect("schema snapshot should encode");
1631        let decoded = raw
1632            .decode_persisted_snapshot()
1633            .expect("schema snapshot should decode");
1634
1635        assert_eq!(decoded, snapshot);
1636    }
1637
1638    // Build one typed schema snapshot used by schema-store tests. The exact
1639    // field contracts are intentionally rich enough to cover nested metadata,
1640    // scalar codecs, and structural fallback payloads through the raw store.
1641    fn persisted_schema_snapshot_for_test(
1642        version: SchemaVersion,
1643        entity_name: &str,
1644    ) -> PersistedSchemaSnapshot {
1645        persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
1646    }
1647
1648    fn persisted_schema_snapshot_with_index_for_test(
1649        version: SchemaVersion,
1650        entity_name: &str,
1651        index_name: &str,
1652    ) -> PersistedSchemaSnapshot {
1653        let base = persisted_schema_snapshot_for_test(version, entity_name);
1654
1655        PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1656            base.version(),
1657            base.entity_path().to_string(),
1658            base.entity_name().to_string(),
1659            base.primary_key_field_ids().to_vec(),
1660            base.row_layout().clone(),
1661            base.fields().to_vec(),
1662            vec![PersistedIndexSnapshot::new(
1663                0,
1664                index_name.to_string(),
1665                "RoleSpecificStore".to_string(),
1666                false,
1667                PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
1668                    FieldId::new(1),
1669                    SchemaFieldSlot::new(0),
1670                    vec!["id".to_string()],
1671                    PersistedFieldKind::Ulid,
1672                    false,
1673                )]),
1674                None,
1675            )],
1676        )
1677    }
1678
1679    // Build one typed schema snapshot with independently selectable snapshot
1680    // and row-layout versions. Production snapshots should keep these aligned;
1681    // tests can deliberately break that invariant at the store boundary.
1682    fn persisted_schema_snapshot_with_layout_version_for_test(
1683        version: SchemaVersion,
1684        layout_version: SchemaVersion,
1685        entity_name: &str,
1686    ) -> PersistedSchemaSnapshot {
1687        PersistedSchemaSnapshot::new(
1688            version,
1689            format!("entities::{entity_name}"),
1690            entity_name.to_string(),
1691            FieldId::new(1),
1692            SchemaRowLayout::new(
1693                layout_version,
1694                vec![
1695                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1696                    (FieldId::new(2), SchemaFieldSlot::new(1)),
1697                ],
1698            ),
1699            vec![
1700                PersistedFieldSnapshot::new(
1701                    FieldId::new(1),
1702                    "id".to_string(),
1703                    SchemaFieldSlot::new(0),
1704                    PersistedFieldKind::Ulid,
1705                    Vec::new(),
1706                    false,
1707                    SchemaFieldDefault::None,
1708                    FieldStorageDecode::ByKind,
1709                    LeafCodec::Scalar(ScalarCodec::Ulid),
1710                ),
1711                PersistedFieldSnapshot::new(
1712                    FieldId::new(2),
1713                    "payload".to_string(),
1714                    SchemaFieldSlot::new(1),
1715                    PersistedFieldKind::Map {
1716                        key: Box::new(PersistedFieldKind::Text { max_len: None }),
1717                        value: Box::new(PersistedFieldKind::List(Box::new(
1718                            PersistedFieldKind::Nat64,
1719                        ))),
1720                    },
1721                    vec![PersistedNestedLeafSnapshot::new(
1722                        vec!["bytes".to_string()],
1723                        PersistedFieldKind::Blob { max_len: None },
1724                        false,
1725                        FieldStorageDecode::ByKind,
1726                        LeafCodec::Scalar(ScalarCodec::Blob),
1727                    )],
1728                    false,
1729                    SchemaFieldDefault::None,
1730                    FieldStorageDecode::ByKind,
1731                    LeafCodec::StructuralFallback,
1732                ),
1733            ],
1734        )
1735    }
1736}