Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        schema::{
14            PersistedFieldKind, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
15            PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
16            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
17        },
18    },
19    error::InternalError,
20    traits::Storable,
21    types::EntityTag,
22};
23use ic_memory::stable_structures::storable::Bound;
24use ic_memory::stable_structures::{
25    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
26};
27use sha2::Digest;
28use std::borrow::Cow;
29use std::collections::BTreeMap as StdBTreeMap;
30
// Width of one raw schema key: an 8-byte entity tag followed by a 4-byte
// schema version. Used as the backing array length of `RawSchemaKey`.
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
// The same key width as `u32`, the type required by the `Storable` bound.
// Must be kept equal to `SCHEMA_KEY_BYTES_USIZE`.
const SCHEMA_KEY_BYTES: u32 = 12;
// Declared maximum size of one encoded schema snapshot value (512 KiB).
const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
// Tag bytes written as the first hashed byte of the catalog, data-allocation,
// and index-allocation fingerprints respectively.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
37
38///
39/// RawSchemaKey
40///
41/// Stable key for one persisted schema snapshot entry.
42/// It combines the entity tag and schema version so reconciliation can load
43/// concrete versions without depending on generated entity names.
44///
45
46#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
47struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
48
49impl RawSchemaKey {
50    /// Build the raw persisted key for one entity schema version.
51    #[must_use]
52    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
53        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
54        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
55        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
56
57        Self(out)
58    }
59
60    /// Return the entity tag encoded in this schema key.
61    #[must_use]
62    fn entity_tag(self) -> EntityTag {
63        let mut bytes = [0u8; size_of::<u64>()];
64        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
65
66        EntityTag::new(u64::from_be_bytes(bytes))
67    }
68
69    /// Return the schema version encoded in this schema key.
70    #[must_use]
71    fn version(self) -> u32 {
72        let mut bytes = [0u8; size_of::<u32>()];
73        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
74
75        u32::from_be_bytes(bytes)
76    }
77}
78
79impl Storable for RawSchemaKey {
80    fn to_bytes(&self) -> Cow<'_, [u8]> {
81        Cow::Borrowed(&self.0)
82    }
83
84    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
85        debug_assert_eq!(
86            bytes.len(),
87            SCHEMA_KEY_BYTES_USIZE,
88            "RawSchemaKey::from_bytes received unexpected byte length",
89        );
90
91        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
92            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
93        }
94
95        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
96        out.copy_from_slice(bytes.as_ref());
97        Self(out)
98    }
99
100    fn into_bytes(self) -> Vec<u8> {
101        self.0.to_vec()
102    }
103
104    const BOUND: Bound = Bound::Bounded {
105        max_size: SCHEMA_KEY_BYTES,
106        is_fixed_size: true,
107    };
108}
109
110///
111/// RawSchemaSnapshot
112///
113/// Raw persisted schema snapshot payload.
114/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
115/// keeping the stable-memory value boundary independent from the typed schema
116/// DTOs used by reconciliation.
117///
118
119#[derive(Clone, Debug, Eq, PartialEq)]
120struct RawSchemaSnapshot(Vec<u8>);
121
122impl RawSchemaSnapshot {
123    /// Encode one typed persisted-schema snapshot into a raw store payload.
124    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
125        validate_typed_schema_snapshot_for_store(snapshot)?;
126
127        encode_persisted_schema_snapshot(snapshot).map(Self)
128    }
129
130    /// Build one raw schema snapshot from already-encoded bytes.
131    #[must_use]
132    #[cfg(test)]
133    const fn from_bytes(bytes: Vec<u8>) -> Self {
134        Self(bytes)
135    }
136
137    /// Borrow the encoded schema snapshot payload.
138    #[must_use]
139    const fn as_bytes(&self) -> &[u8] {
140        self.0.as_slice()
141    }
142
143    /// Consume the snapshot into its encoded payload bytes.
144    #[must_use]
145    #[cfg(test)]
146    fn into_bytes(self) -> Vec<u8> {
147        self.0
148    }
149
150    /// Decode this raw store payload into a typed persisted-schema snapshot.
151    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
152        decode_persisted_schema_snapshot(self.as_bytes())
153    }
154}
155
156impl Storable for RawSchemaSnapshot {
157    fn to_bytes(&self) -> Cow<'_, [u8]> {
158        Cow::Borrowed(self.as_bytes())
159    }
160
161    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
162        Self(bytes.into_owned())
163    }
164
165    fn into_bytes(self) -> Vec<u8> {
166        self.0
167    }
168
169    const BOUND: Bound = Bound::Bounded {
170        max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
171        is_fixed_size: false,
172    };
173}
174
175// Validate typed schema snapshots before they are encoded into the raw schema
176// metadata store. This catches caller-side invariant violations separately from
177// raw persisted-byte corruption handled by the codec decode boundary.
178fn validate_typed_schema_snapshot_for_store(
179    snapshot: &PersistedSchemaSnapshot,
180) -> Result<(), InternalError> {
181    if let Some(detail) = schema_snapshot_integrity_detail(
182        "schema snapshot",
183        snapshot.version(),
184        snapshot.primary_key_field_ids(),
185        snapshot.row_layout(),
186        snapshot.fields(),
187    ) {
188        return Err(InternalError::store_invariant(detail));
189    }
190
191    Ok(())
192}
193
///
/// SchemaStoreFootprint
///
/// Current raw schema metadata footprint for one entity. Reconciliation uses
/// this value to report stable-memory pressure without decoding schema payloads
/// or exposing field-level metadata through metrics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    /// Number of raw snapshot entries stored for the entity.
    snapshots: u64,
    /// Total encoded payload bytes across those entries.
    encoded_bytes: u64,
    /// Encoded payload bytes of the highest-version entry.
    latest_snapshot_bytes: u64,
}
208
///
/// SchemaStoreCatalogMetadata
///
/// Accepted schema-store catalog metadata derived from latest persisted
/// snapshots. This is diagnostic allocation metadata, not allocation identity.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreCatalogMetadata {
    /// Highest schema version found among the latest snapshots.
    schema_version: SchemaVersion,
    /// Deterministic fingerprint computed over the latest snapshots.
    schema_fingerprint: CommitSchemaFingerprint,
    /// Number of entities that contributed a latest snapshot.
    entity_count: u64,
}

impl SchemaStoreCatalogMetadata {
    /// Build catalog metadata from already-derived accepted schema facts.
    ///
    /// Private on purpose: values are only produced by the derivation helpers
    /// in this module.
    #[must_use]
    const fn new(
        schema_version: SchemaVersion,
        schema_fingerprint: CommitSchemaFingerprint,
        entity_count: u64,
    ) -> Self {
        Self {
            schema_version,
            schema_fingerprint,
            entity_count,
        }
    }

    /// Return the maximum latest schema version represented in the catalog.
    #[must_use]
    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
        self.schema_version
    }

    /// Return the deterministic catalog fingerprint for latest accepted
    /// snapshots.
    #[must_use]
    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
        self.schema_fingerprint
    }

    /// Return number of entity schemas represented in this catalog metadata.
    #[must_use]
    pub(in crate::db) const fn entity_count(self) -> u64 {
        self.entity_count
    }
}
257
///
/// SchemaStoreAllocationMetadata
///
/// Role-specific allocation metadata derived from latest accepted schema-store
/// snapshots. These fingerprints describe the accepted contract that owns each
/// allocation role; they are diagnostics, not allocation identity.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreAllocationMetadata {
    /// Metadata for the data (row-layout) allocation role.
    data: SchemaStoreCatalogMetadata,
    /// Metadata for the index-catalog allocation role.
    index: SchemaStoreCatalogMetadata,
    /// Metadata for the full schema-catalog allocation role.
    schema: SchemaStoreCatalogMetadata,
}

impl SchemaStoreAllocationMetadata {
    /// Build one role-specific metadata set from already-derived accepted
    /// schema facts.
    ///
    /// Argument order is data, index, schema; all three share one type, so a
    /// swapped argument would not be caught by the compiler.
    #[must_use]
    const fn new(
        data: SchemaStoreCatalogMetadata,
        index: SchemaStoreCatalogMetadata,
        schema: SchemaStoreCatalogMetadata,
    ) -> Self {
        Self {
            data,
            index,
            schema,
        }
    }

    /// Return accepted row-layout allocation metadata for data memory.
    #[must_use]
    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
        self.data
    }

    /// Return accepted index-catalog allocation metadata for index memory.
    #[must_use]
    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
        self.index
    }

    /// Return accepted full schema-catalog allocation metadata for schema
    /// memory.
    #[must_use]
    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
        self.schema
    }
}
308
impl SchemaStoreFootprint {
    /// Build one schema-store footprint from already-counted raw payload facts.
    ///
    /// No validation is performed here; the caller supplies consistent
    /// counts.
    #[must_use]
    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
        Self {
            snapshots,
            encoded_bytes,
            latest_snapshot_bytes,
        }
    }

    /// Return the number of raw schema snapshots stored for the entity.
    #[must_use]
    pub(in crate::db) const fn snapshots(self) -> u64 {
        self.snapshots
    }

    /// Return the total encoded payload bytes stored for the entity.
    #[must_use]
    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
        self.encoded_bytes
    }

    /// Return the encoded payload bytes for the highest-version snapshot.
    #[must_use]
    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
        self.latest_snapshot_bytes
    }
}
338
///
/// SchemaStore
///
/// Thin persistence wrapper over one stable or heap schema metadata BTreeMap.
/// Startup reconciliation writes and validates encoded schema snapshots here
/// before row/index operations proceed.
///

pub struct SchemaStore {
    /// Backing map; selected once by `init` (stable) or `init_heap` (heap).
    backend: SchemaStoreBackend,
}
350
/// Storage backing one `SchemaStore`. Both variants map raw schema keys to
/// raw encoded snapshots; they differ only in where the map lives.
enum SchemaStoreBackend {
    /// Stable-structures B-tree map over a virtual memory region.
    Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
    /// Standard-library B-tree map held on the heap.
    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
}
355
/// Signal returned by a schema-store traversal visitor after each entry:
/// keep walking, or end the traversal early.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    Continue,
    #[allow(
        dead_code,
        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
    )]
    Stop,
}

impl SchemaStoreVisit {
    /// Report whether the traversal should end after the current entry.
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
372
373impl SchemaStore {
374    /// Initialize the schema store with the provided backing memory.
375    #[must_use]
376    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
377        Self {
378            backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
379        }
380    }
381
382    /// Initialize a volatile heap-backed schema store.
383    #[must_use]
384    pub const fn init_heap() -> Self {
385        Self {
386            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
387        }
388    }
389
390    /// Return whether this schema store is heap-backed and volatile.
391    #[must_use]
392    pub(in crate::db) const fn is_heap_storage(&self) -> bool {
393        matches!(self.backend, SchemaStoreBackend::Heap(_))
394    }
395
396    /// Insert or replace one typed persisted schema snapshot.
397    pub(in crate::db) fn insert_persisted_snapshot(
398        &mut self,
399        entity: EntityTag,
400        snapshot: &PersistedSchemaSnapshot,
401    ) -> Result<(), InternalError> {
402        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
403        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
404        let _ = self.insert_raw_snapshot(key, raw_snapshot);
405
406        Ok(())
407    }
408
409    /// Load and decode one typed persisted schema snapshot.
410    #[cfg(test)]
411    pub(in crate::db) fn get_persisted_snapshot(
412        &self,
413        entity: EntityTag,
414        version: SchemaVersion,
415    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
416        let key = RawSchemaKey::from_entity_version(entity, version);
417        self.get_raw_snapshot(&key)
418            .map(|snapshot| snapshot.decode_persisted_snapshot())
419            .transpose()
420    }
421
422    /// Load and decode the highest stored schema snapshot version for one entity.
423    pub(in crate::db) fn latest_persisted_snapshot(
424        &self,
425        entity: EntityTag,
426    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
427        let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
428        let _: Result<(), InternalError> = self.visit_raw_snapshots(|key, snapshot| {
429            if key.entity_tag() != entity {
430                return Ok(SchemaStoreVisit::Continue);
431            }
432
433            let version = SchemaVersion::new(key.version());
434            if latest
435                .as_ref()
436                .is_none_or(|(latest_version, _)| version > *latest_version)
437            {
438                latest = Some((version, snapshot.clone()));
439            }
440            Ok(SchemaStoreVisit::Continue)
441        });
442
443        latest
444            .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
445            .transpose()
446    }
447
448    /// Return raw schema-store footprint facts for one entity.
449    #[must_use]
450    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
451        let mut snapshots = 0u64;
452        let mut encoded_bytes = 0u64;
453        let mut latest = None::<(SchemaVersion, u64)>;
454
455        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
456            if key.entity_tag() != entity {
457                return Ok(SchemaStoreVisit::Continue);
458            }
459
460            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
461            snapshots = snapshots.saturating_add(1);
462            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
463
464            let version = SchemaVersion::new(key.version());
465            if latest
466                .as_ref()
467                .is_none_or(|(latest_version, _)| version > *latest_version)
468            {
469                latest = Some((version, snapshot_bytes));
470            }
471            Ok(SchemaStoreVisit::Continue)
472        });
473
474        SchemaStoreFootprint::new(
475            snapshots,
476            encoded_bytes,
477            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
478        )
479    }
480
481    /// Derive accepted catalog metadata from latest persisted schema snapshots.
482    ///
483    /// This function intentionally reads only the persisted schema store. It
484    /// does not reconstruct metadata from generated models when the store has
485    /// no accepted snapshots.
486    #[cfg(test)]
487    pub(in crate::db) fn catalog_metadata(
488        &self,
489    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
490        Ok(self
491            .allocation_metadata()?
492            .map(SchemaStoreAllocationMetadata::schema))
493    }
494
495    /// Derive role-specific allocation metadata from latest persisted schema
496    /// snapshots.
497    ///
498    /// This function intentionally reads only accepted schema-store payloads.
499    /// It never reconstructs metadata from generated models when the store has
500    /// no accepted snapshots.
501    pub(in crate::db) fn allocation_metadata(
502        &self,
503    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
504        let latest_by_entity = self.latest_raw_snapshots_by_entity();
505        if latest_by_entity.is_empty() {
506            return Ok(None);
507        }
508
509        Ok(Some(SchemaStoreAllocationMetadata::new(
510            derive_data_allocation_metadata(&latest_by_entity)?,
511            derive_index_allocation_metadata(&latest_by_entity)?,
512            derive_schema_catalog_metadata(&latest_by_entity)?,
513        )))
514    }
515
516    /// Insert or replace one raw schema snapshot.
517    fn insert_raw_snapshot(
518        &mut self,
519        key: RawSchemaKey,
520        snapshot: RawSchemaSnapshot,
521    ) -> Option<RawSchemaSnapshot> {
522        match &mut self.backend {
523            SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
524            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
525        }
526    }
527
528    /// Load one raw schema snapshot by key.
529    #[must_use]
530    #[cfg(test)]
531    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
532        match &self.backend {
533            SchemaStoreBackend::Stable(map) => map.get(key),
534            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
535        }
536    }
537
538    /// Return whether one schema snapshot key is present.
539    #[must_use]
540    #[cfg(test)]
541    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
542        match &self.backend {
543            SchemaStoreBackend::Stable(map) => map.contains_key(key),
544            SchemaStoreBackend::Heap(map) => map.contains_key(key),
545        }
546    }
547
548    /// Return the number of schema snapshot entries in this store.
549    #[must_use]
550    #[cfg(test)]
551    pub(in crate::db) fn len(&self) -> u64 {
552        match &self.backend {
553            SchemaStoreBackend::Stable(map) => map.len(),
554            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
555        }
556    }
557
558    /// Return whether this schema store currently has no persisted snapshots.
559    #[must_use]
560    #[cfg(test)]
561    pub(in crate::db) fn is_empty(&self) -> bool {
562        match &self.backend {
563            SchemaStoreBackend::Stable(map) => map.is_empty(),
564            SchemaStoreBackend::Heap(map) => map.is_empty(),
565        }
566    }
567
568    /// Clear all schema metadata entries from the store.
569    #[cfg(test)]
570    pub(in crate::db) fn clear(&mut self) {
571        match &mut self.backend {
572            SchemaStoreBackend::Stable(map) => map.clear_new(),
573            SchemaStoreBackend::Heap(map) => map.clear(),
574        }
575    }
576
577    fn latest_raw_snapshots_by_entity(
578        &self,
579    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
580        let mut latest_by_entity =
581            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
582
583        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
584            let version = SchemaVersion::new(key.version());
585            match latest_by_entity.get_mut(&key.entity_tag()) {
586                Some((latest_version, latest_snapshot)) if version > *latest_version => {
587                    *latest_version = version;
588                    *latest_snapshot = snapshot.clone();
589                }
590                None => {
591                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
592                }
593                Some(_) => {}
594            }
595            Ok(SchemaStoreVisit::Continue)
596        });
597
598        latest_by_entity
599    }
600
601    /// Visit raw schema snapshots in canonical store order without exposing
602    /// the backing stable-map iterator.
603    fn visit_raw_snapshots<E>(
604        &self,
605        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
606    ) -> Result<(), E> {
607        match &self.backend {
608            SchemaStoreBackend::Stable(map) => {
609                for entry in map.iter() {
610                    if visitor(entry.key(), &entry.value())?.should_stop() {
611                        break;
612                    }
613                }
614            }
615            SchemaStoreBackend::Heap(map) => {
616                for (key, snapshot) in map {
617                    if visitor(key, snapshot)?.should_stop() {
618                        break;
619                    }
620                }
621            }
622        }
623
624        Ok(())
625    }
626}
627
628fn derive_data_allocation_metadata(
629    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
630) -> Result<SchemaStoreCatalogMetadata, InternalError> {
631    let mut max_version = SchemaVersion::initial();
632    let mut hasher = new_hash_sha256();
633    write_hash_tag_u8(
634        &mut hasher,
635        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
636    );
637
638    for (entity, (_, snapshot)) in latest_by_entity {
639        let persisted = snapshot.decode_persisted_snapshot()?;
640        if persisted.version() > max_version {
641            max_version = persisted.version();
642        }
643
644        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
645            persisted.version(),
646            persisted.entity_path().to_string(),
647            persisted.entity_name().to_string(),
648            persisted.primary_key_field_ids().to_vec(),
649            persisted.row_layout().clone(),
650            persisted.fields().to_vec(),
651            Vec::new(),
652        );
653        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
654
655        write_hash_u64(&mut hasher, entity.value());
656        write_hash_u32(&mut hasher, persisted.version().get());
657        write_hash_len_u32(&mut hasher, encoded.len());
658        hasher.update(encoded);
659    }
660
661    Ok(finalize_schema_metadata(
662        max_version,
663        hasher,
664        latest_by_entity.len(),
665    ))
666}
667
668fn derive_index_allocation_metadata(
669    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
670) -> Result<SchemaStoreCatalogMetadata, InternalError> {
671    let mut max_version = SchemaVersion::initial();
672    let mut hasher = new_hash_sha256();
673    write_hash_tag_u8(
674        &mut hasher,
675        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
676    );
677
678    for (entity, (_, snapshot)) in latest_by_entity {
679        let persisted = snapshot.decode_persisted_snapshot()?;
680        if persisted.version() > max_version {
681            max_version = persisted.version();
682        }
683
684        write_hash_u64(&mut hasher, entity.value());
685        write_hash_u32(&mut hasher, persisted.version().get());
686        write_hash_len_u32(&mut hasher, persisted.indexes().len());
687        for index in persisted.indexes() {
688            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
689            write_hash_str_u32(&mut hasher, index.name());
690            write_hash_str_u32(&mut hasher, index.store());
691            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
692            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
693            match index.predicate_sql() {
694                Some(predicate_sql) => {
695                    write_hash_tag_u8(&mut hasher, 1);
696                    write_hash_str_u32(&mut hasher, predicate_sql);
697                }
698                None => write_hash_tag_u8(&mut hasher, 0),
699            }
700            hash_persisted_index_key(&mut hasher, index.key());
701        }
702    }
703
704    Ok(finalize_schema_metadata(
705        max_version,
706        hasher,
707        latest_by_entity.len(),
708    ))
709}
710
711fn derive_schema_catalog_metadata(
712    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
713) -> Result<SchemaStoreCatalogMetadata, InternalError> {
714    let mut max_version = SchemaVersion::initial();
715    let mut hasher = new_hash_sha256();
716    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
717
718    for (entity, (version, snapshot)) in latest_by_entity {
719        let persisted = snapshot.decode_persisted_snapshot()?;
720        if persisted.version() > max_version {
721            max_version = persisted.version();
722        }
723
724        write_hash_u64(&mut hasher, entity.value());
725        write_hash_u32(&mut hasher, version.get());
726        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
727        hasher.update(snapshot.as_bytes());
728    }
729
730    Ok(finalize_schema_metadata(
731        max_version,
732        hasher,
733        latest_by_entity.len(),
734    ))
735}
736
737fn finalize_schema_metadata(
738    schema_version: SchemaVersion,
739    hasher: sha2::Sha256,
740    entity_count: usize,
741) -> SchemaStoreCatalogMetadata {
742    let digest = finalize_hash_sha256(hasher);
743    let mut schema_fingerprint = [0u8; 16];
744    schema_fingerprint.copy_from_slice(&digest[..16]);
745
746    SchemaStoreCatalogMetadata::new(
747        schema_version,
748        schema_fingerprint,
749        u64::try_from(entity_count).unwrap_or(u64::MAX),
750    )
751}
752
753fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
754    match key {
755        PersistedIndexKeySnapshot::FieldPath(paths) => {
756            write_hash_tag_u8(hasher, 1);
757            write_hash_len_u32(hasher, paths.len());
758            for path in paths {
759                hash_persisted_index_field_path(hasher, path);
760            }
761        }
762        PersistedIndexKeySnapshot::Items(items) => {
763            write_hash_tag_u8(hasher, 2);
764            write_hash_len_u32(hasher, items.len());
765            for item in items {
766                match item {
767                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
768                        write_hash_tag_u8(hasher, 1);
769                        hash_persisted_index_field_path(hasher, path);
770                    }
771                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
772                        write_hash_tag_u8(hasher, 2);
773                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
774                        hash_persisted_index_field_path(hasher, expression.source());
775                        hash_persisted_field_kind(hasher, expression.input_kind());
776                        hash_persisted_field_kind(hasher, expression.output_kind());
777                        write_hash_str_u32(hasher, expression.canonical_text());
778                    }
779                }
780            }
781        }
782    }
783}
784
785fn hash_persisted_index_field_path(
786    hasher: &mut sha2::Sha256,
787    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
788) {
789    write_hash_u32(hasher, path.field_id().get());
790    write_hash_u32(hasher, u32::from(path.slot().get()));
791    write_hash_len_u32(hasher, path.path().len());
792    for segment in path.path() {
793        write_hash_str_u32(hasher, segment);
794    }
795    hash_persisted_field_kind(hasher, path.kind());
796    write_hash_tag_u8(hasher, u8::from(path.nullable()));
797}
798
// Feed one persisted field kind into the hasher. Every variant writes its own
// tag byte (1 through 32) first, followed by that variant's payload, if any.
// Variants that wrap other kinds recurse into this function.
// NOTE(review): the tag values and write order feed the allocation
// fingerprints, so they presumably must stay stable once fingerprints are
// compared across versions — confirm before renumbering or reordering.
fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
    match kind {
        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
        PersistedFieldKind::Blob { max_len } => {
            write_hash_tag_u8(hasher, 2);
            hash_optional_u32(hasher, *max_len);
        }
        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
        PersistedFieldKind::Decimal { scale } => {
            write_hash_tag_u8(hasher, 5);
            write_hash_u32(hasher, *scale);
        }
        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
        PersistedFieldKind::Enum { path, variants } => {
            write_hash_tag_u8(hasher, 7);
            write_hash_str_u32(hasher, path);
            write_hash_len_u32(hasher, variants.len());
            // Per variant: identifier, optional payload kind behind a
            // presence tag, then the payload storage-decode name.
            for variant in variants {
                write_hash_str_u32(hasher, variant.ident());
                match variant.payload_kind() {
                    Some(payload_kind) => {
                        write_hash_tag_u8(hasher, 1);
                        hash_persisted_field_kind(hasher, payload_kind);
                    }
                    None => write_hash_tag_u8(hasher, 0),
                }
                write_hash_str_u32(
                    hasher,
                    field_storage_decode_name(variant.payload_storage_decode()),
                );
            }
        }
        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
        PersistedFieldKind::IntBig { max_bytes } => {
            write_hash_tag_u8(hasher, 15);
            write_hash_u32(hasher, *max_bytes);
        }
        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
        PersistedFieldKind::Text { max_len } => {
            write_hash_tag_u8(hasher, 18);
            hash_optional_u32(hasher, *max_len);
        }
        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
        PersistedFieldKind::NatBig { max_bytes } => {
            write_hash_tag_u8(hasher, 25);
            write_hash_u32(hasher, *max_bytes);
        }
        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
        PersistedFieldKind::Relation {
            target_path,
            target_entity_name,
            target_entity_tag,
            target_store_path,
            key_kind,
            strength,
        } => {
            write_hash_tag_u8(hasher, 28);
            write_hash_str_u32(hasher, target_path);
            write_hash_str_u32(hasher, target_entity_name);
            write_hash_u64(hasher, target_entity_tag.value());
            write_hash_str_u32(hasher, target_store_path);
            // The relation's key kind is itself a field kind: recurse.
            hash_persisted_field_kind(hasher, key_kind);
            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
        }
        // Collection kinds hash their element kind(s) recursively.
        PersistedFieldKind::List(inner) => {
            write_hash_tag_u8(hasher, 29);
            hash_persisted_field_kind(hasher, inner);
        }
        PersistedFieldKind::Set(inner) => {
            write_hash_tag_u8(hasher, 30);
            hash_persisted_field_kind(hasher, inner);
        }
        PersistedFieldKind::Map { key, value } => {
            write_hash_tag_u8(hasher, 31);
            hash_persisted_field_kind(hasher, key);
            hash_persisted_field_kind(hasher, value);
        }
        PersistedFieldKind::Structured { queryable } => {
            write_hash_tag_u8(hasher, 32);
            write_hash_tag_u8(hasher, u8::from(*queryable));
        }
    }
}
896
897fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
898    match value {
899        Some(value) => {
900            write_hash_tag_u8(hasher, 1);
901            write_hash_u32(hasher, value);
902        }
903        None => write_hash_tag_u8(hasher, 0),
904    }
905}
906
907const fn persisted_index_origin_name(
908    origin: crate::db::schema::PersistedIndexOrigin,
909) -> &'static str {
910    match origin {
911        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
912        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
913    }
914}
915
916const fn persisted_expression_op_name(
917    op: crate::db::schema::PersistedIndexExpressionOp,
918) -> &'static str {
919    match op {
920        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
921        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
922        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
923        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
924        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
925        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
926        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
927        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
928    }
929}
930
931const fn persisted_relation_strength_name(
932    strength: crate::db::schema::PersistedRelationStrength,
933) -> &'static str {
934    match strength {
935        crate::db::schema::PersistedRelationStrength::Strong => "strong",
936        crate::db::schema::PersistedRelationStrength::Weak => "weak",
937    }
938}
939
940const fn field_storage_decode_name(
941    decode: crate::model::field::FieldStorageDecode,
942) -> &'static str {
943    match decode {
944        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
945        crate::model::field::FieldStorageDecode::Value => "value",
946    }
947}
948
949///
950/// TESTS
951///
952
953#[cfg(test)]
954mod tests {
955    use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore, SchemaStoreVisit};
956    use crate::{
957        db::schema::{
958            FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedIndexFieldPathSnapshot,
959            PersistedIndexKeySnapshot, PersistedIndexSnapshot, PersistedNestedLeafSnapshot,
960            PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
961            SchemaVersion, encode_persisted_schema_snapshot,
962        },
963        model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
964        testing::test_memory,
965        traits::Storable,
966        types::EntityTag,
967    };
968    use std::borrow::Cow;
969    use std::convert::Infallible;
970
// A key serialized through `to_bytes` and rebuilt through `from_bytes` must
// report the entity tag and schema version it was constructed from.
#[test]
fn raw_schema_key_round_trips_entity_and_version() {
    // Factories keep the expected values in one place without assuming the
    // tag or version types are `Copy`.
    let tag = || EntityTag::new(0x0102_0304_0506_0708);
    let version = || SchemaVersion::initial();

    let wire = RawSchemaKey::from_entity_version(tag(), version())
        .to_bytes()
        .into_owned();
    let restored = RawSchemaKey::from_bytes(Cow::Owned(wire));

    assert_eq!(restored.entity_tag(), tag());
    assert_eq!(restored.version(), version().get());
}
982
// Raw snapshot payload bytes must survive a `Storable` encode/decode cycle
// unchanged, both when borrowed and when taken by value.
#[test]
fn raw_schema_snapshot_round_trips_payload_bytes() {
    const PAYLOAD: [u8; 5] = [1, 2, 3, 5, 8];

    let original = RawSchemaSnapshot::from_bytes(PAYLOAD.to_vec());
    let wire = original.to_bytes().into_owned();
    let restored = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(wire));

    assert_eq!(restored.as_bytes(), &PAYLOAD);
    assert_eq!(restored.into_bytes(), PAYLOAD.to_vec());
}
992
// Walks one raw snapshot through the store: absent, inserted, readable, and
// gone again after `clear`.
#[test]
fn schema_store_persists_raw_snapshots_by_entity_version_key() {
    let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
    let mut store = SchemaStore::init(test_memory(251));

    // A freshly initialized store holds nothing under the key.
    assert!(store.is_empty());
    assert!(!store.contains_raw_snapshot(&key));

    let payload = RawSchemaSnapshot::from_bytes(vec![9, 4, 6]);
    store.insert_raw_snapshot(key, payload);

    assert_eq!(store.len(), 1);
    assert!(store.contains_raw_snapshot(&key));
    let stored = store
        .get_raw_snapshot(&key)
        .expect("schema snapshot should be present");
    assert_eq!(stored.as_bytes(), &[9, 4, 6]);

    store.clear();
    assert!(store.is_empty());
}
1016
// With two versions stored for entity 41 and one for entity 42, the latest
// lookup for entity 41 must return its version-2 snapshot.
#[test]
fn schema_store_loads_latest_snapshot_for_entity() {
    let mut store = SchemaStore::init(test_memory(252));
    let first = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let second = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
    let unrelated = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");

    // Insert order is deliberate: the other entity lands between the two
    // versions of the entity under test.
    let inserts = [
        (41, &first, "initial schema snapshot should encode"),
        (42, &unrelated, "other entity schema snapshot should encode"),
        (41, &second, "newer schema snapshot should encode"),
    ];
    for (tag, snapshot, failure) in inserts {
        store
            .insert_persisted_snapshot(EntityTag::new(tag), snapshot)
            .expect(failure);
    }

    let found = store
        .latest_persisted_snapshot(EntityTag::new(41))
        .expect("latest schema snapshot should decode")
        .expect("schema snapshot should exist");

    assert_eq!(found.version(), SchemaVersion::new(2));
    assert_eq!(found.entity_name(), "Newer");
}
1042
// The footprint for entity 71 must count only its own raw snapshots: two
// entries, 3 + 4 payload bytes, with the version-2 payload (4 bytes) latest.
#[test]
fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
    let mut store = SchemaStore::init(test_memory(242));

    // The payloads are arbitrary bytes, not encoded schema snapshots.
    let entries = [
        (71, SchemaVersion::initial(), vec![1, 2, 3]),
        (72, SchemaVersion::new(3), vec![5, 8]),
        (71, SchemaVersion::new(2), vec![13, 21, 34, 55]),
    ];
    for (tag, version, payload) in entries {
        store.insert_raw_snapshot(
            RawSchemaKey::from_entity_version(EntityTag::new(tag), version),
            RawSchemaSnapshot::from_bytes(payload),
        );
    }

    let measured = store.entity_footprint(EntityTag::new(71));

    assert_eq!(measured.snapshots(), 2);
    assert_eq!(measured.encoded_bytes(), 7);
    assert_eq!(measured.latest_snapshot_bytes(), 4);
}
1065
// Snapshots inserted out of order must be visited sorted by entity tag and
// then by version.
#[test]
fn schema_store_visit_raw_snapshots_preserves_key_order() {
    let mut store = SchemaStore::init(test_memory(235));

    // (entity, version, single payload byte), deliberately unsorted.
    for (tag, version, marker) in [(3, 2, 32), (1, 3, 13), (1, 1, 11)] {
        store.insert_raw_snapshot(
            RawSchemaKey::from_entity_version(EntityTag::new(tag), SchemaVersion::new(version)),
            RawSchemaSnapshot::from_bytes(vec![marker]),
        );
    }

    let mut seen = Vec::new();
    let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
        let marker = snapshot.as_bytes()[0];
        seen.push((key.entity_tag().value(), key.version(), marker));
        Ok(SchemaStoreVisit::Continue)
    });

    assert_eq!(seen, vec![(1, 1, 11), (1, 3, 13), (3, 2, 32)]);
}
1094
// A visitor that returns `Stop` on its first call must see exactly one entry.
#[test]
fn schema_store_visit_raw_snapshots_can_stop_without_error() {
    let mut store = SchemaStore::init(test_memory(234));

    // Two versions of the same entity, each with a one-byte payload.
    for (version, marker) in [(1, 21), (2, 22)] {
        store.insert_raw_snapshot(
            RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(version)),
            RawSchemaSnapshot::from_bytes(vec![marker]),
        );
    }

    let mut seen_versions = Vec::new();
    let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, _| {
        seen_versions.push(key.version());
        Ok(SchemaStoreVisit::Stop)
    });

    assert_eq!(seen_versions, vec![1]);
}
1115
// Exercises a store built with `init_heap`: latest-snapshot lookup, key-ordered
// visiting, and stopping the visit after two entries.
#[test]
fn heap_schema_store_preserves_order_latest_snapshot_and_early_stop() {
    let mut store = SchemaStore::init_heap();
    let first = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let second = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
    let unrelated = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");

    let inserts = [
        (41, &first, "initial heap schema snapshot should encode"),
        (42, &unrelated, "other heap schema snapshot should encode"),
        (41, &second, "newer heap schema snapshot should encode"),
    ];
    for (tag, snapshot, failure) in inserts {
        store
            .insert_persisted_snapshot(EntityTag::new(tag), snapshot)
            .expect(failure);
    }

    let found = store
        .latest_persisted_snapshot(EntityTag::new(41))
        .expect("latest heap schema snapshot should decode")
        .expect("heap schema snapshot should exist");
    assert_eq!(found.version(), SchemaVersion::new(2));
    assert_eq!(found.entity_name(), "Newer");

    // Stop after the second entry; entity 42 must never be reached.
    let mut seen = Vec::new();
    let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
        let payload_len = snapshot.as_bytes().len();
        seen.push((key.entity_tag().value(), key.version(), payload_len));
        if seen.len() == 2 {
            Ok(SchemaStoreVisit::Stop)
        } else {
            Ok(SchemaStoreVisit::Continue)
        }
    });

    let seen_keys: Vec<_> = seen
        .iter()
        .map(|&(entity, version, _)| (entity, version))
        .collect();
    assert_eq!(seen_keys, vec![(41, 1), (41, 2)]);
}
1161
// A store with no snapshots must derive catalog metadata successfully and
// report it as `None`.
#[test]
fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
    let store = SchemaStore::init(test_memory(241));

    let derived = store
        .catalog_metadata()
        .expect("empty schema catalog metadata should derive");

    assert_eq!(derived, None);
}
1173
// Catalog metadata must follow the store contents: version, entity count, and
// fingerprint all change once a newer version and a second entity are added.
#[test]
fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
    let mut store = SchemaStore::init(test_memory(240));
    let first = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let second = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
    let second_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");

    // Stage 1: a single entity at its initial version.
    store
        .insert_persisted_snapshot(EntityTag::new(81), &first)
        .expect("initial schema snapshot should encode");
    let before = store
        .catalog_metadata()
        .expect("initial schema catalog metadata should derive")
        .expect("initial schema catalog metadata should be present");

    // Stage 2: the first entity gains version 2 and a second entity appears.
    store
        .insert_persisted_snapshot(EntityTag::new(81), &second)
        .expect("newer schema snapshot should encode");
    store
        .insert_persisted_snapshot(EntityTag::new(82), &second_entity)
        .expect("other schema snapshot should encode");
    let after = store
        .catalog_metadata()
        .expect("updated schema catalog metadata should derive")
        .expect("updated schema catalog metadata should be present");

    assert_eq!(before.schema_version(), SchemaVersion::initial());
    assert_eq!(before.entity_count(), 1);
    assert_eq!(after.schema_version(), SchemaVersion::new(3));
    assert_eq!(after.entity_count(), 2);
    assert_ne!(
        before.schema_fingerprint(),
        after.schema_fingerprint(),
        "catalog fingerprint must change when latest accepted schema catalog changes"
    );
}
1210
// Two stores that receive the same snapshots in opposite order must derive
// equal catalog metadata.
#[test]
fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
    let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
    let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");

    let mut left = SchemaStore::init(test_memory(239));
    left.insert_persisted_snapshot(EntityTag::new(91), &first)
        .expect("first schema snapshot should encode");
    left.insert_persisted_snapshot(EntityTag::new(92), &second)
        .expect("second schema snapshot should encode");

    let mut right = SchemaStore::init(test_memory(238));
    right
        .insert_persisted_snapshot(EntityTag::new(92), &second)
        .expect("second schema snapshot should encode");
    right
        .insert_persisted_snapshot(EntityTag::new(91), &first)
        .expect("first schema snapshot should encode");

    // Unwrap the `Option` on both sides: comparing the options directly would
    // also pass if both stores reported no metadata at all.
    let left_metadata = left
        .catalog_metadata()
        .expect("left schema catalog metadata should derive")
        .expect("left schema catalog metadata should be present");
    let right_metadata = right
        .catalog_metadata()
        .expect("right schema catalog metadata should derive")
        .expect("right schema catalog metadata should be present");

    assert_eq!(left_metadata, right_metadata);
}
1239
// Compares allocation metadata for the same entity with and without an index:
// the data fingerprint must match, the index and schema fingerprints must
// differ, and the per-role fingerprints must be distinct from each other.
#[test]
fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
    let plain_snapshot =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
    let indexed_snapshot = persisted_schema_snapshot_with_index_for_test(
        SchemaVersion::initial(),
        "RoleSpecific",
        "payload_idx",
    );

    // Store holding the snapshot without an index.
    let mut plain_store = SchemaStore::init(test_memory(237));
    plain_store
        .insert_persisted_snapshot(EntityTag::new(93), &plain_snapshot)
        .expect("base schema snapshot should encode");
    let plain = plain_store
        .allocation_metadata()
        .expect("base allocation metadata should derive")
        .expect("base allocation metadata should be present");

    // Store holding the same entity with the extra index.
    let mut indexed_store = SchemaStore::init(test_memory(236));
    indexed_store
        .insert_persisted_snapshot(EntityTag::new(93), &indexed_snapshot)
        .expect("indexed schema snapshot should encode");
    let indexed = indexed_store
        .allocation_metadata()
        .expect("indexed allocation metadata should derive")
        .expect("indexed allocation metadata should be present");

    // Across the two stores.
    assert_eq!(
        plain.data().schema_fingerprint(),
        indexed.data().schema_fingerprint(),
        "data allocation metadata should ignore accepted index catalog changes"
    );
    assert_ne!(
        plain.index().schema_fingerprint(),
        indexed.index().schema_fingerprint(),
        "index allocation metadata should change when accepted index catalog changes"
    );
    assert_ne!(
        plain.schema().schema_fingerprint(),
        indexed.schema().schema_fingerprint(),
        "schema allocation metadata should change when full accepted catalog changes"
    );

    // Across roles within the indexed store.
    assert_ne!(
        indexed.data().schema_fingerprint(),
        indexed.index().schema_fingerprint(),
        "data and index allocation metadata should have distinct role fingerprints"
    );
    assert_ne!(
        indexed.index().schema_fingerprint(),
        indexed.schema().schema_fingerprint(),
        "index and schema allocation metadata should have distinct role fingerprints"
    );
}
1293
// Inserting a snapshot built with version 2 and a layout at the initial
// version must fail with the row-layout version mismatch message.
#[test]
fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
    let mut store = SchemaStore::init(test_memory(253));
    let mismatched = persisted_schema_snapshot_with_layout_version_for_test(
        SchemaVersion::new(2),
        SchemaVersion::initial(),
        "Invalid",
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(43), &mismatched)
        .expect_err("schema store should reject mismatched snapshot/layout versions");

    let message = rejection.message();
    assert!(
        message.contains("schema snapshot row-layout version mismatch"),
        "schema store should preserve the version mismatch diagnostic"
    );
}
1313
// Inserting a typed snapshot whose row layout disagrees with its field slots
// must fail with the field slot mismatch message.
#[test]
fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
    let mut store = SchemaStore::init(test_memory(254));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");

    // The layout puts field 2 in slot 3; presumably the template's field
    // snapshot records a different slot (fixture helper not shown here).
    let divergent_layout = SchemaRowLayout::new(
        template.version(),
        vec![
            (FieldId::new(1), SchemaFieldSlot::new(0)),
            (FieldId::new(2), SchemaFieldSlot::new(3)),
        ],
    );
    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        divergent_layout,
        template.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(44), &snapshot)
        .expect_err("schema store should reject divergent field/layout slots");

    let message = rejection.message();
    assert!(
        message.contains("schema snapshot field slot mismatch"),
        "schema store should report the duplicated slot divergence"
    );
}
1343
// Inserting a typed snapshot whose row layout assigns slot 0 to two fields
// must fail with the duplicate row-layout slot message.
#[test]
fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
    let mut store = SchemaStore::init(test_memory(246));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");

    // Both field ids claim slot 0.
    let colliding_layout = SchemaRowLayout::new(
        template.version(),
        vec![
            (FieldId::new(1), SchemaFieldSlot::new(0)),
            (FieldId::new(2), SchemaFieldSlot::new(0)),
        ],
    );
    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        colliding_layout,
        template.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(49), &snapshot)
        .expect_err("schema store should reject duplicate row-layout slots");

    let message = rejection.message();
    assert!(
        message.contains("schema snapshot duplicate row-layout slot"),
        "schema store should report the row-layout slot ambiguity"
    );
}
1374
// Inserting a typed snapshot whose primary-key field id (99) replaces the
// template's own must fail with the missing primary-key field message.
#[test]
fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
    let mut store = SchemaStore::init(test_memory(248));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");

    // Everything is copied from the template except the primary-key field id.
    let unknown_primary_key = FieldId::new(99);
    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        unknown_primary_key,
        template.row_layout().clone(),
        template.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(47), &snapshot)
        .expect_err("schema store should reject snapshots without the primary-key field");

    let message = rejection.message();
    assert!(
        message.contains("schema snapshot primary key field missing from row layout"),
        "schema store should report the missing primary-key field"
    );
}
1398
// With a valid initial snapshot and undecodable bytes stored at version 3,
// the latest lookup must return the decode error rather than the older entry.
#[test]
fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
    let mut store = SchemaStore::init(test_memory(249));
    let valid = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");

    store
        .insert_persisted_snapshot(EntityTag::new(45), &valid)
        .expect("initial schema snapshot should encode");

    // Raw insert bypasses encoding, so arbitrary bytes can sit at version 3.
    let newest_key = RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
    let garbage = RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]);
    store.insert_raw_snapshot(newest_key, garbage);

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(45))
        .expect_err("latest corrupt schema snapshot must fail closed");

    let message = failure.message();
    assert!(
        message.contains("failed to decode persisted schema snapshot"),
        "latest-version lookup should report the corrupt newest snapshot"
    );
}
1421
// Encodes a snapshot with a divergent row layout, stores the raw bytes, and
// expects the latest lookup to fail with the field slot mismatch message.
#[test]
fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
    let mut store = SchemaStore::init(test_memory(250));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");

    let divergent_layout = SchemaRowLayout::new(
        template.version(),
        vec![
            (FieldId::new(1), SchemaFieldSlot::new(0)),
            (FieldId::new(2), SchemaFieldSlot::new(3)),
        ],
    );
    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        divergent_layout,
        template.fields().to_vec(),
    );

    // Store the encoded bytes directly so the failure comes from the lookup.
    let encoded = encode_persisted_schema_snapshot(&snapshot)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(46), snapshot.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(46))
        .expect_err("raw decode should reject divergent field/layout slots");

    let message = failure.message();
    assert!(
        message.contains("persisted schema snapshot field slot mismatch"),
        "schema codec should report the raw decoded slot divergence"
    );
}
1456
// Encodes a snapshot whose primary-key field id is 99, stores the raw bytes,
// and expects the latest lookup to fail with the missing primary-key message.
#[test]
fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
    let mut store = SchemaStore::init(test_memory(247));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");

    let unknown_primary_key = FieldId::new(99);
    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        unknown_primary_key,
        template.row_layout().clone(),
        template.fields().to_vec(),
    );

    // Store the encoded bytes directly so the failure comes from the lookup.
    let encoded = encode_persisted_schema_snapshot(&snapshot)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(48), snapshot.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(48))
        .expect_err("raw decode should reject snapshots without the primary-key field");

    let message = failure.message();
    assert!(
        message.contains("persisted schema snapshot primary key field missing from row layout"),
        "schema codec should report the raw decoded missing primary-key field"
    );
}
1485
// Encodes a snapshot whose second field reuses the first field's name, stores
// the raw bytes, and expects the latest lookup to fail with the duplicate
// field name message.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
    let mut store = SchemaStore::init(test_memory(245));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");

    // Rebuild field 1 unchanged except for its name, taken from field 0.
    let mut fields = template.fields().to_vec();
    let renamed = {
        let (first, second) = (&fields[0], &fields[1]);
        PersistedFieldSnapshot::new(
            second.id(),
            first.name().to_string(),
            second.slot(),
            second.kind().clone(),
            second.nested_leaves().to_vec(),
            second.nullable(),
            second.default().clone(),
            second.storage_decode(),
            second.leaf_codec(),
        )
    };
    fields[1] = renamed;

    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );
    let encoded = encode_persisted_schema_snapshot(&snapshot)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(50), snapshot.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(50))
        .expect_err("raw decode should reject duplicate field names");

    let message = failure.message();
    assert!(
        message.contains("persisted schema snapshot duplicate field name"),
        "schema codec should report the raw decoded field-name ambiguity"
    );
}
1528
// Inserting a typed snapshot whose second field carries a nested leaf with an
// empty path must fail with the empty nested leaf path message.
#[test]
fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
    let mut store = SchemaStore::init(test_memory(244));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");

    // The only invalid part of this leaf is its empty path.
    let pathless_leaf = PersistedNestedLeafSnapshot::new(
        Vec::new(),
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );

    // Rebuild field 1 unchanged except for its nested leaves.
    let mut fields = template.fields().to_vec();
    let replacement = {
        let source = &fields[1];
        PersistedFieldSnapshot::new(
            source.id(),
            source.name().to_string(),
            source.slot(),
            source.kind().clone(),
            vec![pathless_leaf],
            source.nullable(),
            source.default().clone(),
            source.storage_decode(),
            source.leaf_codec(),
        )
    };
    fields[1] = replacement;

    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(51), &snapshot)
        .expect_err("schema store should reject empty nested leaf paths");

    let message = rejection.message();
    assert!(
        message.contains("schema snapshot empty nested leaf path"),
        "schema store should report the empty nested leaf path"
    );
}
1571
// Encodes a snapshot whose second field has two nested leaves sharing the
// path `["bytes"]`, stores the raw bytes, and expects the latest lookup to
// fail with the duplicate nested leaf path message.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
    let mut store = SchemaStore::init(test_memory(243));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");

    // Same path on both leaves; they differ only in kind and codec.
    let blob_leaf = PersistedNestedLeafSnapshot::new(
        vec!["bytes".to_string()],
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );
    let text_leaf = PersistedNestedLeafSnapshot::new(
        vec!["bytes".to_string()],
        PersistedFieldKind::Text { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Text),
    );

    // Rebuild field 1 unchanged except for its nested leaves.
    let mut fields = template.fields().to_vec();
    let replacement = {
        let source = &fields[1];
        PersistedFieldSnapshot::new(
            source.id(),
            source.name().to_string(),
            source.slot(),
            source.kind().clone(),
            vec![blob_leaf, text_leaf],
            source.nullable(),
            source.default().clone(),
            source.storage_decode(),
            source.leaf_codec(),
        )
    };
    fields[1] = replacement;

    let snapshot = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );
    let encoded = encode_persisted_schema_snapshot(&snapshot)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(52), snapshot.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(52))
        .expect_err("raw decode should reject duplicate nested leaf paths");

    let message = failure.message();
    assert!(
        message.contains("persisted schema snapshot duplicate nested leaf path"),
        "schema codec should report the raw decoded nested path ambiguity"
    );
}
1630
1631    #[test]
1632    fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
1633        let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
1634
1635        let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1636            .expect("schema snapshot should encode");
1637        let decoded = raw
1638            .decode_persisted_snapshot()
1639            .expect("schema snapshot should decode");
1640
1641        assert_eq!(decoded, snapshot);
1642    }
1643
1644    // Build one typed schema snapshot used by schema-store tests. The exact
1645    // field contracts are intentionally rich enough to cover nested metadata,
1646    // scalar codecs, and structural fallback payloads through the raw store.
1647    fn persisted_schema_snapshot_for_test(
1648        version: SchemaVersion,
1649        entity_name: &str,
1650    ) -> PersistedSchemaSnapshot {
1651        persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
1652    }
1653
1654    fn persisted_schema_snapshot_with_index_for_test(
1655        version: SchemaVersion,
1656        entity_name: &str,
1657        index_name: &str,
1658    ) -> PersistedSchemaSnapshot {
1659        let base = persisted_schema_snapshot_for_test(version, entity_name);
1660
1661        PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1662            base.version(),
1663            base.entity_path().to_string(),
1664            base.entity_name().to_string(),
1665            base.primary_key_field_ids().to_vec(),
1666            base.row_layout().clone(),
1667            base.fields().to_vec(),
1668            vec![PersistedIndexSnapshot::new(
1669                0,
1670                index_name.to_string(),
1671                "RoleSpecificStore".to_string(),
1672                false,
1673                PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
1674                    FieldId::new(1),
1675                    SchemaFieldSlot::new(0),
1676                    vec!["id".to_string()],
1677                    PersistedFieldKind::Ulid,
1678                    false,
1679                )]),
1680                None,
1681            )],
1682        )
1683    }
1684
1685    // Build one typed schema snapshot with independently selectable snapshot
1686    // and row-layout versions. Production snapshots should keep these aligned;
1687    // tests can deliberately break that invariant at the store boundary.
1688    fn persisted_schema_snapshot_with_layout_version_for_test(
1689        version: SchemaVersion,
1690        layout_version: SchemaVersion,
1691        entity_name: &str,
1692    ) -> PersistedSchemaSnapshot {
1693        PersistedSchemaSnapshot::new(
1694            version,
1695            format!("entities::{entity_name}"),
1696            entity_name.to_string(),
1697            FieldId::new(1),
1698            SchemaRowLayout::new(
1699                layout_version,
1700                vec![
1701                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1702                    (FieldId::new(2), SchemaFieldSlot::new(1)),
1703                ],
1704            ),
1705            vec![
1706                PersistedFieldSnapshot::new(
1707                    FieldId::new(1),
1708                    "id".to_string(),
1709                    SchemaFieldSlot::new(0),
1710                    PersistedFieldKind::Ulid,
1711                    Vec::new(),
1712                    false,
1713                    SchemaFieldDefault::None,
1714                    FieldStorageDecode::ByKind,
1715                    LeafCodec::Scalar(ScalarCodec::Ulid),
1716                ),
1717                PersistedFieldSnapshot::new(
1718                    FieldId::new(2),
1719                    "payload".to_string(),
1720                    SchemaFieldSlot::new(1),
1721                    PersistedFieldKind::Map {
1722                        key: Box::new(PersistedFieldKind::Text { max_len: None }),
1723                        value: Box::new(PersistedFieldKind::List(Box::new(
1724                            PersistedFieldKind::Nat64,
1725                        ))),
1726                    },
1727                    vec![PersistedNestedLeafSnapshot::new(
1728                        vec!["bytes".to_string()],
1729                        PersistedFieldKind::Blob { max_len: None },
1730                        false,
1731                        FieldStorageDecode::ByKind,
1732                        LeafCodec::Scalar(ScalarCodec::Blob),
1733                    )],
1734                    false,
1735                    SchemaFieldDefault::None,
1736                    FieldStorageDecode::ByKind,
1737                    LeafCodec::StructuralFallback,
1738                ),
1739            ],
1740        )
1741    }
1742}