Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        schema::{
14            PersistedFieldKind, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
15            PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
16            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
17        },
18    },
19    error::InternalError,
20    traits::Storable,
21    types::EntityTag,
22};
23use ic_memory::stable_structures::storable::Bound;
24use ic_memory::stable_structures::{BTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory};
25use sha2::Digest;
26use std::borrow::Cow;
27use std::collections::BTreeMap as StdBTreeMap;
28
29const SCHEMA_KEY_BYTES_USIZE: usize = 12;
30const SCHEMA_KEY_BYTES: u32 = 12;
31const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
32const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
33const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
34const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
35
36///
37/// RawSchemaKey
38///
39/// Stable key for one persisted schema snapshot entry.
40/// It combines the entity tag and schema version so reconciliation can load
41/// concrete versions without depending on generated entity names.
42///
43
44#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
45struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
46
47impl RawSchemaKey {
48    /// Build the raw persisted key for one entity schema version.
49    #[must_use]
50    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
51        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
52        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
53        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
54
55        Self(out)
56    }
57
58    /// Return the entity tag encoded in this schema key.
59    #[must_use]
60    fn entity_tag(self) -> EntityTag {
61        let mut bytes = [0u8; size_of::<u64>()];
62        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
63
64        EntityTag::new(u64::from_be_bytes(bytes))
65    }
66
67    /// Return the schema version encoded in this schema key.
68    #[must_use]
69    fn version(self) -> u32 {
70        let mut bytes = [0u8; size_of::<u32>()];
71        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
72
73        u32::from_be_bytes(bytes)
74    }
75}
76
77impl Storable for RawSchemaKey {
78    fn to_bytes(&self) -> Cow<'_, [u8]> {
79        Cow::Borrowed(&self.0)
80    }
81
82    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
83        debug_assert_eq!(
84            bytes.len(),
85            SCHEMA_KEY_BYTES_USIZE,
86            "RawSchemaKey::from_bytes received unexpected byte length",
87        );
88
89        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
90            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
91        }
92
93        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
94        out.copy_from_slice(bytes.as_ref());
95        Self(out)
96    }
97
98    fn into_bytes(self) -> Vec<u8> {
99        self.0.to_vec()
100    }
101
102    const BOUND: Bound = Bound::Bounded {
103        max_size: SCHEMA_KEY_BYTES,
104        is_fixed_size: true,
105    };
106}
107
108///
109/// RawSchemaSnapshot
110///
111/// Raw persisted schema snapshot payload.
112/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
113/// keeping the stable-memory value boundary independent from the typed schema
114/// DTOs used by reconciliation.
115///
116
117#[derive(Clone, Debug, Eq, PartialEq)]
118struct RawSchemaSnapshot(Vec<u8>);
119
120impl RawSchemaSnapshot {
121    /// Encode one typed persisted-schema snapshot into a raw store payload.
122    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
123        validate_typed_schema_snapshot_for_store(snapshot)?;
124
125        encode_persisted_schema_snapshot(snapshot).map(Self)
126    }
127
128    /// Build one raw schema snapshot from already-encoded bytes.
129    #[must_use]
130    #[cfg(test)]
131    const fn from_bytes(bytes: Vec<u8>) -> Self {
132        Self(bytes)
133    }
134
135    /// Borrow the encoded schema snapshot payload.
136    #[must_use]
137    const fn as_bytes(&self) -> &[u8] {
138        self.0.as_slice()
139    }
140
141    /// Consume the snapshot into its encoded payload bytes.
142    #[must_use]
143    #[cfg(test)]
144    fn into_bytes(self) -> Vec<u8> {
145        self.0
146    }
147
148    /// Decode this raw store payload into a typed persisted-schema snapshot.
149    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
150        decode_persisted_schema_snapshot(self.as_bytes())
151    }
152}
153
154impl Storable for RawSchemaSnapshot {
155    fn to_bytes(&self) -> Cow<'_, [u8]> {
156        Cow::Borrowed(self.as_bytes())
157    }
158
159    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
160        Self(bytes.into_owned())
161    }
162
163    fn into_bytes(self) -> Vec<u8> {
164        self.0
165    }
166
167    const BOUND: Bound = Bound::Bounded {
168        max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
169        is_fixed_size: false,
170    };
171}
172
173// Validate typed schema snapshots before they are encoded into the raw schema
174// metadata store. This catches caller-side invariant violations separately from
175// raw persisted-byte corruption handled by the codec decode boundary.
176fn validate_typed_schema_snapshot_for_store(
177    snapshot: &PersistedSchemaSnapshot,
178) -> Result<(), InternalError> {
179    if let Some(detail) = schema_snapshot_integrity_detail(
180        "schema snapshot",
181        snapshot.version(),
182        snapshot.primary_key_field_ids(),
183        snapshot.row_layout(),
184        snapshot.fields(),
185    ) {
186        return Err(InternalError::store_invariant(detail));
187    }
188
189    Ok(())
190}
191
192///
193/// SchemaStoreFootprint
194///
195/// Current raw schema metadata footprint for one entity. Reconciliation uses
196/// this value to report stable-memory pressure without decoding schema payloads
197/// or exposing field-level metadata through metrics.
198///
199
200#[derive(Clone, Copy, Debug, Eq, PartialEq)]
201pub(in crate::db) struct SchemaStoreFootprint {
202    snapshots: u64,
203    encoded_bytes: u64,
204    latest_snapshot_bytes: u64,
205}
206
207///
208/// SchemaStoreCatalogMetadata
209///
210/// Accepted schema-store catalog metadata derived from latest persisted
211/// snapshots. This is diagnostic allocation metadata, not allocation identity.
212///
213
214#[derive(Clone, Copy, Debug, Eq, PartialEq)]
215pub(in crate::db) struct SchemaStoreCatalogMetadata {
216    schema_version: SchemaVersion,
217    schema_fingerprint: CommitSchemaFingerprint,
218    entity_count: u64,
219}
220
221impl SchemaStoreCatalogMetadata {
222    /// Build catalog metadata from already-derived accepted schema facts.
223    #[must_use]
224    const fn new(
225        schema_version: SchemaVersion,
226        schema_fingerprint: CommitSchemaFingerprint,
227        entity_count: u64,
228    ) -> Self {
229        Self {
230            schema_version,
231            schema_fingerprint,
232            entity_count,
233        }
234    }
235
236    /// Return the maximum latest schema version represented in the catalog.
237    #[must_use]
238    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
239        self.schema_version
240    }
241
242    /// Return the deterministic catalog fingerprint for latest accepted
243    /// snapshots.
244    #[must_use]
245    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
246        self.schema_fingerprint
247    }
248
249    /// Return number of entity schemas represented in this catalog metadata.
250    #[must_use]
251    pub(in crate::db) const fn entity_count(self) -> u64 {
252        self.entity_count
253    }
254}
255
256///
257/// SchemaStoreAllocationMetadata
258///
259/// Role-specific allocation metadata derived from latest accepted schema-store
260/// snapshots. These fingerprints describe the accepted contract that owns each
261/// allocation role; they are diagnostics, not allocation identity.
262///
263
264#[derive(Clone, Copy, Debug, Eq, PartialEq)]
265pub(in crate::db) struct SchemaStoreAllocationMetadata {
266    data: SchemaStoreCatalogMetadata,
267    index: SchemaStoreCatalogMetadata,
268    schema: SchemaStoreCatalogMetadata,
269}
270
271impl SchemaStoreAllocationMetadata {
272    /// Build one role-specific metadata set from already-derived accepted
273    /// schema facts.
274    #[must_use]
275    const fn new(
276        data: SchemaStoreCatalogMetadata,
277        index: SchemaStoreCatalogMetadata,
278        schema: SchemaStoreCatalogMetadata,
279    ) -> Self {
280        Self {
281            data,
282            index,
283            schema,
284        }
285    }
286
287    /// Return accepted row-layout allocation metadata for data memory.
288    #[must_use]
289    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
290        self.data
291    }
292
293    /// Return accepted index-catalog allocation metadata for index memory.
294    #[must_use]
295    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
296        self.index
297    }
298
299    /// Return accepted full schema-catalog allocation metadata for schema
300    /// memory.
301    #[must_use]
302    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
303        self.schema
304    }
305}
306
307impl SchemaStoreFootprint {
308    /// Build one schema-store footprint from already-counted raw payload facts.
309    #[must_use]
310    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
311        Self {
312            snapshots,
313            encoded_bytes,
314            latest_snapshot_bytes,
315        }
316    }
317
318    /// Return the number of raw schema snapshots stored for the entity.
319    #[must_use]
320    pub(in crate::db) const fn snapshots(self) -> u64 {
321        self.snapshots
322    }
323
324    /// Return the total encoded payload bytes stored for the entity.
325    #[must_use]
326    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
327        self.encoded_bytes
328    }
329
330    /// Return the encoded payload bytes for the highest-version snapshot.
331    #[must_use]
332    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
333        self.latest_snapshot_bytes
334    }
335}
336
337///
338/// SchemaStore
339///
340/// Thin persistence wrapper over one stable schema metadata BTreeMap.
341/// Startup reconciliation writes and validates encoded schema snapshots here
342/// before row/index operations proceed.
343///
344
345pub struct SchemaStore {
346    map: BTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
347}
348
349impl SchemaStore {
350    /// Initialize the schema store with the provided backing memory.
351    #[must_use]
352    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
353        Self {
354            map: BTreeMap::init(memory),
355        }
356    }
357
358    /// Insert or replace one typed persisted schema snapshot.
359    pub(in crate::db) fn insert_persisted_snapshot(
360        &mut self,
361        entity: EntityTag,
362        snapshot: &PersistedSchemaSnapshot,
363    ) -> Result<(), InternalError> {
364        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
365        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
366        let _ = self.insert_raw_snapshot(key, raw_snapshot);
367
368        Ok(())
369    }
370
371    /// Load and decode one typed persisted schema snapshot.
372    #[cfg(test)]
373    pub(in crate::db) fn get_persisted_snapshot(
374        &self,
375        entity: EntityTag,
376        version: SchemaVersion,
377    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
378        let key = RawSchemaKey::from_entity_version(entity, version);
379        self.get_raw_snapshot(&key)
380            .map(|snapshot| snapshot.decode_persisted_snapshot())
381            .transpose()
382    }
383
384    /// Load and decode the highest stored schema snapshot version for one entity.
385    pub(in crate::db) fn latest_persisted_snapshot(
386        &self,
387        entity: EntityTag,
388    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
389        let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
390        for entry in self.map.iter() {
391            let (key, snapshot) = entry.into_pair();
392            if key.entity_tag() != entity {
393                continue;
394            }
395
396            let version = SchemaVersion::new(key.version());
397            if latest
398                .as_ref()
399                .is_none_or(|(latest_version, _)| version > *latest_version)
400            {
401                latest = Some((version, snapshot));
402            }
403        }
404
405        latest
406            .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
407            .transpose()
408    }
409
410    /// Return raw schema-store footprint facts for one entity.
411    #[must_use]
412    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
413        let mut snapshots = 0u64;
414        let mut encoded_bytes = 0u64;
415        let mut latest = None::<(SchemaVersion, u64)>;
416
417        for entry in self.map.iter() {
418            let (key, snapshot) = entry.into_pair();
419            if key.entity_tag() != entity {
420                continue;
421            }
422
423            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
424            snapshots = snapshots.saturating_add(1);
425            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
426
427            let version = SchemaVersion::new(key.version());
428            if latest
429                .as_ref()
430                .is_none_or(|(latest_version, _)| version > *latest_version)
431            {
432                latest = Some((version, snapshot_bytes));
433            }
434        }
435
436        SchemaStoreFootprint::new(
437            snapshots,
438            encoded_bytes,
439            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
440        )
441    }
442
443    /// Derive accepted catalog metadata from latest persisted schema snapshots.
444    ///
445    /// This function intentionally reads only the persisted schema store. It
446    /// does not reconstruct metadata from generated models when the store has
447    /// no accepted snapshots.
448    #[cfg(test)]
449    pub(in crate::db) fn catalog_metadata(
450        &self,
451    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
452        Ok(self
453            .allocation_metadata()?
454            .map(SchemaStoreAllocationMetadata::schema))
455    }
456
457    /// Derive role-specific allocation metadata from latest persisted schema
458    /// snapshots.
459    ///
460    /// This function intentionally reads only accepted schema-store payloads.
461    /// It never reconstructs metadata from generated models when the store has
462    /// no accepted snapshots.
463    pub(in crate::db) fn allocation_metadata(
464        &self,
465    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
466        let latest_by_entity = self.latest_raw_snapshots_by_entity();
467        if latest_by_entity.is_empty() {
468            return Ok(None);
469        }
470
471        Ok(Some(SchemaStoreAllocationMetadata::new(
472            derive_data_allocation_metadata(&latest_by_entity)?,
473            derive_index_allocation_metadata(&latest_by_entity)?,
474            derive_schema_catalog_metadata(&latest_by_entity)?,
475        )))
476    }
477
478    /// Insert or replace one raw schema snapshot.
479    fn insert_raw_snapshot(
480        &mut self,
481        key: RawSchemaKey,
482        snapshot: RawSchemaSnapshot,
483    ) -> Option<RawSchemaSnapshot> {
484        self.map.insert(key, snapshot)
485    }
486
487    /// Load one raw schema snapshot by key.
488    #[must_use]
489    #[cfg(test)]
490    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
491        self.map.get(key)
492    }
493
494    /// Return whether one schema snapshot key is present.
495    #[must_use]
496    #[cfg(test)]
497    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
498        self.map.contains_key(key)
499    }
500
501    /// Return the number of schema snapshot entries in this store.
502    #[must_use]
503    #[cfg(test)]
504    pub(in crate::db) fn len(&self) -> u64 {
505        self.map.len()
506    }
507
508    /// Return whether this schema store currently has no persisted snapshots.
509    #[must_use]
510    #[cfg(test)]
511    pub(in crate::db) fn is_empty(&self) -> bool {
512        self.map.is_empty()
513    }
514
515    /// Clear all schema metadata entries from the store.
516    #[cfg(test)]
517    pub(in crate::db) fn clear(&mut self) {
518        self.map.clear_new();
519    }
520
521    fn latest_raw_snapshots_by_entity(
522        &self,
523    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
524        let mut latest_by_entity =
525            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
526
527        for entry in self.map.iter() {
528            let (key, snapshot) = entry.into_pair();
529            let version = SchemaVersion::new(key.version());
530            match latest_by_entity.get_mut(&key.entity_tag()) {
531                Some((latest_version, latest_snapshot)) if version > *latest_version => {
532                    *latest_version = version;
533                    *latest_snapshot = snapshot;
534                }
535                None => {
536                    latest_by_entity.insert(key.entity_tag(), (version, snapshot));
537                }
538                Some(_) => {}
539            }
540        }
541
542        latest_by_entity
543    }
544}
545
546fn derive_data_allocation_metadata(
547    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
548) -> Result<SchemaStoreCatalogMetadata, InternalError> {
549    let mut max_version = SchemaVersion::initial();
550    let mut hasher = new_hash_sha256();
551    write_hash_tag_u8(
552        &mut hasher,
553        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
554    );
555
556    for (entity, (_, snapshot)) in latest_by_entity {
557        let persisted = snapshot.decode_persisted_snapshot()?;
558        if persisted.version() > max_version {
559            max_version = persisted.version();
560        }
561
562        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
563            persisted.version(),
564            persisted.entity_path().to_string(),
565            persisted.entity_name().to_string(),
566            persisted.primary_key_field_ids().to_vec(),
567            persisted.row_layout().clone(),
568            persisted.fields().to_vec(),
569            Vec::new(),
570        );
571        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
572
573        write_hash_u64(&mut hasher, entity.value());
574        write_hash_u32(&mut hasher, persisted.version().get());
575        write_hash_len_u32(&mut hasher, encoded.len());
576        hasher.update(encoded);
577    }
578
579    Ok(finalize_schema_metadata(
580        max_version,
581        hasher,
582        latest_by_entity.len(),
583    ))
584}
585
586fn derive_index_allocation_metadata(
587    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
588) -> Result<SchemaStoreCatalogMetadata, InternalError> {
589    let mut max_version = SchemaVersion::initial();
590    let mut hasher = new_hash_sha256();
591    write_hash_tag_u8(
592        &mut hasher,
593        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
594    );
595
596    for (entity, (_, snapshot)) in latest_by_entity {
597        let persisted = snapshot.decode_persisted_snapshot()?;
598        if persisted.version() > max_version {
599            max_version = persisted.version();
600        }
601
602        write_hash_u64(&mut hasher, entity.value());
603        write_hash_u32(&mut hasher, persisted.version().get());
604        write_hash_len_u32(&mut hasher, persisted.indexes().len());
605        for index in persisted.indexes() {
606            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
607            write_hash_str_u32(&mut hasher, index.name());
608            write_hash_str_u32(&mut hasher, index.store());
609            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
610            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
611            match index.predicate_sql() {
612                Some(predicate_sql) => {
613                    write_hash_tag_u8(&mut hasher, 1);
614                    write_hash_str_u32(&mut hasher, predicate_sql);
615                }
616                None => write_hash_tag_u8(&mut hasher, 0),
617            }
618            hash_persisted_index_key(&mut hasher, index.key());
619        }
620    }
621
622    Ok(finalize_schema_metadata(
623        max_version,
624        hasher,
625        latest_by_entity.len(),
626    ))
627}
628
629fn derive_schema_catalog_metadata(
630    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
631) -> Result<SchemaStoreCatalogMetadata, InternalError> {
632    let mut max_version = SchemaVersion::initial();
633    let mut hasher = new_hash_sha256();
634    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
635
636    for (entity, (version, snapshot)) in latest_by_entity {
637        let persisted = snapshot.decode_persisted_snapshot()?;
638        if persisted.version() > max_version {
639            max_version = persisted.version();
640        }
641
642        write_hash_u64(&mut hasher, entity.value());
643        write_hash_u32(&mut hasher, version.get());
644        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
645        hasher.update(snapshot.as_bytes());
646    }
647
648    Ok(finalize_schema_metadata(
649        max_version,
650        hasher,
651        latest_by_entity.len(),
652    ))
653}
654
655fn finalize_schema_metadata(
656    schema_version: SchemaVersion,
657    hasher: sha2::Sha256,
658    entity_count: usize,
659) -> SchemaStoreCatalogMetadata {
660    let digest = finalize_hash_sha256(hasher);
661    let mut schema_fingerprint = [0u8; 16];
662    schema_fingerprint.copy_from_slice(&digest[..16]);
663
664    SchemaStoreCatalogMetadata::new(
665        schema_version,
666        schema_fingerprint,
667        u64::try_from(entity_count).unwrap_or(u64::MAX),
668    )
669}
670
671fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
672    match key {
673        PersistedIndexKeySnapshot::FieldPath(paths) => {
674            write_hash_tag_u8(hasher, 1);
675            write_hash_len_u32(hasher, paths.len());
676            for path in paths {
677                hash_persisted_index_field_path(hasher, path);
678            }
679        }
680        PersistedIndexKeySnapshot::Items(items) => {
681            write_hash_tag_u8(hasher, 2);
682            write_hash_len_u32(hasher, items.len());
683            for item in items {
684                match item {
685                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
686                        write_hash_tag_u8(hasher, 1);
687                        hash_persisted_index_field_path(hasher, path);
688                    }
689                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
690                        write_hash_tag_u8(hasher, 2);
691                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
692                        hash_persisted_index_field_path(hasher, expression.source());
693                        hash_persisted_field_kind(hasher, expression.input_kind());
694                        hash_persisted_field_kind(hasher, expression.output_kind());
695                        write_hash_str_u32(hasher, expression.canonical_text());
696                    }
697                }
698            }
699        }
700    }
701}
702
703fn hash_persisted_index_field_path(
704    hasher: &mut sha2::Sha256,
705    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
706) {
707    write_hash_u32(hasher, path.field_id().get());
708    write_hash_u32(hasher, u32::from(path.slot().get()));
709    write_hash_len_u32(hasher, path.path().len());
710    for segment in path.path() {
711        write_hash_str_u32(hasher, segment);
712    }
713    hash_persisted_field_kind(hasher, path.kind());
714    write_hash_tag_u8(hasher, u8::from(path.nullable()));
715}
716
717fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
718    match kind {
719        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
720        PersistedFieldKind::Blob { max_len } => {
721            write_hash_tag_u8(hasher, 2);
722            hash_optional_u32(hasher, *max_len);
723        }
724        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
725        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
726        PersistedFieldKind::Decimal { scale } => {
727            write_hash_tag_u8(hasher, 5);
728            write_hash_u32(hasher, *scale);
729        }
730        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
731        PersistedFieldKind::Enum { path, variants } => {
732            write_hash_tag_u8(hasher, 7);
733            write_hash_str_u32(hasher, path);
734            write_hash_len_u32(hasher, variants.len());
735            for variant in variants {
736                write_hash_str_u32(hasher, variant.ident());
737                match variant.payload_kind() {
738                    Some(payload_kind) => {
739                        write_hash_tag_u8(hasher, 1);
740                        hash_persisted_field_kind(hasher, payload_kind);
741                    }
742                    None => write_hash_tag_u8(hasher, 0),
743                }
744                write_hash_str_u32(
745                    hasher,
746                    field_storage_decode_name(variant.payload_storage_decode()),
747                );
748            }
749        }
750        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
751        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
752        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
753        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
754        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
755        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
756        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
757        PersistedFieldKind::IntBig { max_bytes } => {
758            write_hash_tag_u8(hasher, 15);
759            write_hash_u32(hasher, *max_bytes);
760        }
761        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
762        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
763        PersistedFieldKind::Text { max_len } => {
764            write_hash_tag_u8(hasher, 18);
765            hash_optional_u32(hasher, *max_len);
766        }
767        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
768        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
769        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
770        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
771        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
772        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
773        PersistedFieldKind::NatBig { max_bytes } => {
774            write_hash_tag_u8(hasher, 25);
775            write_hash_u32(hasher, *max_bytes);
776        }
777        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
778        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
779        PersistedFieldKind::Relation {
780            target_path,
781            target_entity_name,
782            target_entity_tag,
783            target_store_path,
784            key_kind,
785            strength,
786        } => {
787            write_hash_tag_u8(hasher, 28);
788            write_hash_str_u32(hasher, target_path);
789            write_hash_str_u32(hasher, target_entity_name);
790            write_hash_u64(hasher, target_entity_tag.value());
791            write_hash_str_u32(hasher, target_store_path);
792            hash_persisted_field_kind(hasher, key_kind);
793            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
794        }
795        PersistedFieldKind::List(inner) => {
796            write_hash_tag_u8(hasher, 29);
797            hash_persisted_field_kind(hasher, inner);
798        }
799        PersistedFieldKind::Set(inner) => {
800            write_hash_tag_u8(hasher, 30);
801            hash_persisted_field_kind(hasher, inner);
802        }
803        PersistedFieldKind::Map { key, value } => {
804            write_hash_tag_u8(hasher, 31);
805            hash_persisted_field_kind(hasher, key);
806            hash_persisted_field_kind(hasher, value);
807        }
808        PersistedFieldKind::Structured { queryable } => {
809            write_hash_tag_u8(hasher, 32);
810            write_hash_tag_u8(hasher, u8::from(*queryable));
811        }
812    }
813}
814
815fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
816    match value {
817        Some(value) => {
818            write_hash_tag_u8(hasher, 1);
819            write_hash_u32(hasher, value);
820        }
821        None => write_hash_tag_u8(hasher, 0),
822    }
823}
824
825const fn persisted_index_origin_name(
826    origin: crate::db::schema::PersistedIndexOrigin,
827) -> &'static str {
828    match origin {
829        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
830        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
831    }
832}
833
834const fn persisted_expression_op_name(
835    op: crate::db::schema::PersistedIndexExpressionOp,
836) -> &'static str {
837    match op {
838        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
839        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
840        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
841        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
842        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
843        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
844        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
845        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
846    }
847}
848
849const fn persisted_relation_strength_name(
850    strength: crate::db::schema::PersistedRelationStrength,
851) -> &'static str {
852    match strength {
853        crate::db::schema::PersistedRelationStrength::Strong => "strong",
854        crate::db::schema::PersistedRelationStrength::Weak => "weak",
855    }
856}
857
858const fn field_storage_decode_name(
859    decode: crate::model::field::FieldStorageDecode,
860) -> &'static str {
861    match decode {
862        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
863        crate::model::field::FieldStorageDecode::Value => "value",
864    }
865}
866
867///
868/// TESTS
869///
870
871#[cfg(test)]
872mod tests {
873    use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore};
874    use crate::{
875        db::schema::{
876            FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedIndexFieldPathSnapshot,
877            PersistedIndexKeySnapshot, PersistedIndexSnapshot, PersistedNestedLeafSnapshot,
878            PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
879            SchemaVersion, encode_persisted_schema_snapshot,
880        },
881        model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
882        testing::test_memory,
883        traits::Storable,
884        types::EntityTag,
885    };
886    use std::borrow::Cow;
887
888    #[test]
889    fn raw_schema_key_round_trips_entity_and_version() {
890        let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
891            SchemaVersion::initial()
892        });
893        let encoded = key.to_bytes().into_owned();
894        let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
895
896        assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
897        assert_eq!(decoded.version(), SchemaVersion::initial().get());
898    }
899
900    #[test]
901    fn raw_schema_snapshot_round_trips_payload_bytes() {
902        let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
903        let encoded = snapshot.to_bytes().into_owned();
904        let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
905
906        assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
907        assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
908    }
909
910    #[test]
911    fn schema_store_persists_raw_snapshots_by_entity_version_key() {
912        let mut store = SchemaStore::init(test_memory(251));
913        let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
914
915        assert!(store.is_empty());
916        assert!(!store.contains_raw_snapshot(&key));
917
918        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
919
920        assert_eq!(store.len(), 1);
921        assert!(store.contains_raw_snapshot(&key));
922        assert_eq!(
923            store
924                .get_raw_snapshot(&key)
925                .expect("schema snapshot should be present")
926                .as_bytes(),
927            &[9, 4, 6],
928        );
929
930        store.clear();
931        assert!(store.is_empty());
932    }
933
934    #[test]
935    fn schema_store_loads_latest_snapshot_for_entity() {
936        let mut store = SchemaStore::init(test_memory(252));
937        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
938        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
939        let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
940
941        store
942            .insert_persisted_snapshot(EntityTag::new(41), &initial)
943            .expect("initial schema snapshot should encode");
944        store
945            .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
946            .expect("other entity schema snapshot should encode");
947        store
948            .insert_persisted_snapshot(EntityTag::new(41), &newer)
949            .expect("newer schema snapshot should encode");
950
951        let latest = store
952            .latest_persisted_snapshot(EntityTag::new(41))
953            .expect("latest schema snapshot should decode")
954            .expect("schema snapshot should exist");
955
956        assert_eq!(latest.version(), SchemaVersion::new(2));
957        assert_eq!(latest.entity_name(), "Newer");
958    }
959
960    #[test]
961    fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
962        let mut store = SchemaStore::init(test_memory(242));
963        store.insert_raw_snapshot(
964            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
965            RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
966        );
967        store.insert_raw_snapshot(
968            RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
969            RawSchemaSnapshot::from_bytes(vec![5, 8]),
970        );
971        store.insert_raw_snapshot(
972            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
973            RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
974        );
975
976        let footprint = store.entity_footprint(EntityTag::new(71));
977
978        assert_eq!(footprint.snapshots(), 2);
979        assert_eq!(footprint.encoded_bytes(), 7);
980        assert_eq!(footprint.latest_snapshot_bytes(), 4);
981    }
982
983    #[test]
984    fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
985        let store = SchemaStore::init(test_memory(241));
986
987        assert_eq!(
988            store
989                .catalog_metadata()
990                .expect("empty schema catalog metadata should derive"),
991            None
992        );
993    }
994
995    #[test]
996    fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
997        let mut store = SchemaStore::init(test_memory(240));
998        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
999        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1000        let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1001
1002        store
1003            .insert_persisted_snapshot(EntityTag::new(81), &initial)
1004            .expect("initial schema snapshot should encode");
1005        let initial_metadata = store
1006            .catalog_metadata()
1007            .expect("initial schema catalog metadata should derive")
1008            .expect("initial schema catalog metadata should be present");
1009
1010        store
1011            .insert_persisted_snapshot(EntityTag::new(81), &newer)
1012            .expect("newer schema snapshot should encode");
1013        store
1014            .insert_persisted_snapshot(EntityTag::new(82), &other)
1015            .expect("other schema snapshot should encode");
1016        let updated_metadata = store
1017            .catalog_metadata()
1018            .expect("updated schema catalog metadata should derive")
1019            .expect("updated schema catalog metadata should be present");
1020
1021        assert_eq!(initial_metadata.schema_version(), SchemaVersion::initial());
1022        assert_eq!(initial_metadata.entity_count(), 1);
1023        assert_eq!(updated_metadata.schema_version(), SchemaVersion::new(3));
1024        assert_eq!(updated_metadata.entity_count(), 2);
1025        assert_ne!(
1026            initial_metadata.schema_fingerprint(),
1027            updated_metadata.schema_fingerprint(),
1028            "catalog fingerprint must change when latest accepted schema catalog changes"
1029        );
1030    }
1031
1032    #[test]
1033    fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
1034        let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
1035        let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");
1036
1037        let mut left = SchemaStore::init(test_memory(239));
1038        left.insert_persisted_snapshot(EntityTag::new(91), &first)
1039            .expect("first schema snapshot should encode");
1040        left.insert_persisted_snapshot(EntityTag::new(92), &second)
1041            .expect("second schema snapshot should encode");
1042
1043        let mut right = SchemaStore::init(test_memory(238));
1044        right
1045            .insert_persisted_snapshot(EntityTag::new(92), &second)
1046            .expect("second schema snapshot should encode");
1047        right
1048            .insert_persisted_snapshot(EntityTag::new(91), &first)
1049            .expect("first schema snapshot should encode");
1050
1051        let left_metadata = left
1052            .catalog_metadata()
1053            .expect("left schema catalog metadata should derive");
1054        let right_metadata = right
1055            .catalog_metadata()
1056            .expect("right schema catalog metadata should derive");
1057
1058        assert_eq!(left_metadata, right_metadata);
1059    }
1060
1061    #[test]
1062    fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
1063        let without_index =
1064            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
1065        let with_index = persisted_schema_snapshot_with_index_for_test(
1066            SchemaVersion::initial(),
1067            "RoleSpecific",
1068            "payload_idx",
1069        );
1070
1071        let mut base = SchemaStore::init(test_memory(237));
1072        base.insert_persisted_snapshot(EntityTag::new(93), &without_index)
1073            .expect("base schema snapshot should encode");
1074        let base_metadata = base
1075            .allocation_metadata()
1076            .expect("base allocation metadata should derive")
1077            .expect("base allocation metadata should be present");
1078
1079        let mut indexed = SchemaStore::init(test_memory(236));
1080        indexed
1081            .insert_persisted_snapshot(EntityTag::new(93), &with_index)
1082            .expect("indexed schema snapshot should encode");
1083        let indexed_metadata = indexed
1084            .allocation_metadata()
1085            .expect("indexed allocation metadata should derive")
1086            .expect("indexed allocation metadata should be present");
1087
1088        assert_eq!(
1089            base_metadata.data().schema_fingerprint(),
1090            indexed_metadata.data().schema_fingerprint(),
1091            "data allocation metadata should ignore accepted index catalog changes"
1092        );
1093        assert_ne!(
1094            base_metadata.index().schema_fingerprint(),
1095            indexed_metadata.index().schema_fingerprint(),
1096            "index allocation metadata should change when accepted index catalog changes"
1097        );
1098        assert_ne!(
1099            base_metadata.schema().schema_fingerprint(),
1100            indexed_metadata.schema().schema_fingerprint(),
1101            "schema allocation metadata should change when full accepted catalog changes"
1102        );
1103        assert_ne!(
1104            indexed_metadata.data().schema_fingerprint(),
1105            indexed_metadata.index().schema_fingerprint(),
1106            "data and index allocation metadata should have distinct role fingerprints"
1107        );
1108        assert_ne!(
1109            indexed_metadata.index().schema_fingerprint(),
1110            indexed_metadata.schema().schema_fingerprint(),
1111            "index and schema allocation metadata should have distinct role fingerprints"
1112        );
1113    }
1114
1115    #[test]
1116    fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
1117        let mut store = SchemaStore::init(test_memory(253));
1118        let invalid = persisted_schema_snapshot_with_layout_version_for_test(
1119            SchemaVersion::new(2),
1120            SchemaVersion::initial(),
1121            "Invalid",
1122        );
1123
1124        let err = store
1125            .insert_persisted_snapshot(EntityTag::new(43), &invalid)
1126            .expect_err("schema store should reject mismatched snapshot/layout versions");
1127
1128        assert!(
1129            err.message()
1130                .contains("schema snapshot row-layout version mismatch"),
1131            "schema store should preserve the version mismatch diagnostic"
1132        );
1133    }
1134
1135    #[test]
1136    fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
1137        let mut store = SchemaStore::init(test_memory(254));
1138        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
1139        let invalid = PersistedSchemaSnapshot::new(
1140            base.version(),
1141            base.entity_path().to_string(),
1142            base.entity_name().to_string(),
1143            base.first_primary_key_field_id(),
1144            SchemaRowLayout::new(
1145                base.version(),
1146                vec![
1147                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1148                    (FieldId::new(2), SchemaFieldSlot::new(3)),
1149                ],
1150            ),
1151            base.fields().to_vec(),
1152        );
1153
1154        let err = store
1155            .insert_persisted_snapshot(EntityTag::new(44), &invalid)
1156            .expect_err("schema store should reject divergent field/layout slots");
1157
1158        assert!(
1159            err.message()
1160                .contains("schema snapshot field slot mismatch"),
1161            "schema store should report the duplicated slot divergence"
1162        );
1163    }
1164
1165    #[test]
1166    fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
1167        let mut store = SchemaStore::init(test_memory(246));
1168        let base =
1169            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
1170        let invalid = PersistedSchemaSnapshot::new(
1171            base.version(),
1172            base.entity_path().to_string(),
1173            base.entity_name().to_string(),
1174            base.first_primary_key_field_id(),
1175            SchemaRowLayout::new(
1176                base.version(),
1177                vec![
1178                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1179                    (FieldId::new(2), SchemaFieldSlot::new(0)),
1180                ],
1181            ),
1182            base.fields().to_vec(),
1183        );
1184
1185        let err = store
1186            .insert_persisted_snapshot(EntityTag::new(49), &invalid)
1187            .expect_err("schema store should reject duplicate row-layout slots");
1188
1189        assert!(
1190            err.message()
1191                .contains("schema snapshot duplicate row-layout slot"),
1192            "schema store should report the row-layout slot ambiguity"
1193        );
1194    }
1195
1196    #[test]
1197    fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
1198        let mut store = SchemaStore::init(test_memory(248));
1199        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
1200        let invalid = PersistedSchemaSnapshot::new(
1201            base.version(),
1202            base.entity_path().to_string(),
1203            base.entity_name().to_string(),
1204            FieldId::new(99),
1205            base.row_layout().clone(),
1206            base.fields().to_vec(),
1207        );
1208
1209        let err = store
1210            .insert_persisted_snapshot(EntityTag::new(47), &invalid)
1211            .expect_err("schema store should reject snapshots without the primary-key field");
1212
1213        assert!(
1214            err.message()
1215                .contains("schema snapshot primary key field missing from row layout"),
1216            "schema store should report the missing primary-key field"
1217        );
1218    }
1219
1220    #[test]
1221    fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
1222        let mut store = SchemaStore::init(test_memory(249));
1223        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1224        let corrupt_key =
1225            RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
1226
1227        store
1228            .insert_persisted_snapshot(EntityTag::new(45), &initial)
1229            .expect("initial schema snapshot should encode");
1230        store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
1231
1232        let err = store
1233            .latest_persisted_snapshot(EntityTag::new(45))
1234            .expect_err("latest corrupt schema snapshot must fail closed");
1235
1236        assert!(
1237            err.message()
1238                .contains("failed to decode persisted schema snapshot"),
1239            "latest-version lookup should report the corrupt newest snapshot"
1240        );
1241    }
1242
1243    #[test]
1244    fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
1245        let mut store = SchemaStore::init(test_memory(250));
1246        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
1247        let invalid = PersistedSchemaSnapshot::new(
1248            base.version(),
1249            base.entity_path().to_string(),
1250            base.entity_name().to_string(),
1251            base.first_primary_key_field_id(),
1252            SchemaRowLayout::new(
1253                base.version(),
1254                vec![
1255                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1256                    (FieldId::new(2), SchemaFieldSlot::new(3)),
1257                ],
1258            ),
1259            base.fields().to_vec(),
1260        );
1261        let raw = encode_persisted_schema_snapshot(&invalid)
1262            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1263        let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
1264
1265        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1266
1267        let err = store
1268            .latest_persisted_snapshot(EntityTag::new(46))
1269            .expect_err("raw decode should reject divergent field/layout slots");
1270
1271        assert!(
1272            err.message()
1273                .contains("persisted schema snapshot field slot mismatch"),
1274            "schema codec should report the raw decoded slot divergence"
1275        );
1276    }
1277
1278    #[test]
1279    fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
1280        let mut store = SchemaStore::init(test_memory(247));
1281        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
1282        let invalid = PersistedSchemaSnapshot::new(
1283            base.version(),
1284            base.entity_path().to_string(),
1285            base.entity_name().to_string(),
1286            FieldId::new(99),
1287            base.row_layout().clone(),
1288            base.fields().to_vec(),
1289        );
1290        let raw = encode_persisted_schema_snapshot(&invalid)
1291            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1292        let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
1293
1294        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1295
1296        let err = store
1297            .latest_persisted_snapshot(EntityTag::new(48))
1298            .expect_err("raw decode should reject snapshots without the primary-key field");
1299
1300        assert!(
1301            err.message()
1302                .contains("persisted schema snapshot primary key field missing from row layout"),
1303            "schema codec should report the raw decoded missing primary-key field"
1304        );
1305    }
1306
1307    #[test]
1308    fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
1309        let mut store = SchemaStore::init(test_memory(245));
1310        let base =
1311            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
1312        let mut fields = base.fields().to_vec();
1313        let duplicate = PersistedFieldSnapshot::new(
1314            fields[1].id(),
1315            fields[0].name().to_string(),
1316            fields[1].slot(),
1317            fields[1].kind().clone(),
1318            fields[1].nested_leaves().to_vec(),
1319            fields[1].nullable(),
1320            fields[1].default().clone(),
1321            fields[1].storage_decode(),
1322            fields[1].leaf_codec(),
1323        );
1324        fields[1] = duplicate;
1325        let invalid = PersistedSchemaSnapshot::new(
1326            base.version(),
1327            base.entity_path().to_string(),
1328            base.entity_name().to_string(),
1329            base.first_primary_key_field_id(),
1330            base.row_layout().clone(),
1331            fields,
1332        );
1333        let raw = encode_persisted_schema_snapshot(&invalid)
1334            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1335        let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
1336
1337        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1338
1339        let err = store
1340            .latest_persisted_snapshot(EntityTag::new(50))
1341            .expect_err("raw decode should reject duplicate field names");
1342
1343        assert!(
1344            err.message()
1345                .contains("persisted schema snapshot duplicate field name"),
1346            "schema codec should report the raw decoded field-name ambiguity"
1347        );
1348    }
1349
1350    #[test]
1351    fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
1352        let mut store = SchemaStore::init(test_memory(244));
1353        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
1354        let mut fields = base.fields().to_vec();
1355        let invalid_field = PersistedFieldSnapshot::new(
1356            fields[1].id(),
1357            fields[1].name().to_string(),
1358            fields[1].slot(),
1359            fields[1].kind().clone(),
1360            vec![PersistedNestedLeafSnapshot::new(
1361                Vec::new(),
1362                PersistedFieldKind::Blob { max_len: None },
1363                false,
1364                FieldStorageDecode::ByKind,
1365                LeafCodec::Scalar(ScalarCodec::Blob),
1366            )],
1367            fields[1].nullable(),
1368            fields[1].default().clone(),
1369            fields[1].storage_decode(),
1370            fields[1].leaf_codec(),
1371        );
1372        fields[1] = invalid_field;
1373        let invalid = PersistedSchemaSnapshot::new(
1374            base.version(),
1375            base.entity_path().to_string(),
1376            base.entity_name().to_string(),
1377            base.first_primary_key_field_id(),
1378            base.row_layout().clone(),
1379            fields,
1380        );
1381
1382        let err = store
1383            .insert_persisted_snapshot(EntityTag::new(51), &invalid)
1384            .expect_err("schema store should reject empty nested leaf paths");
1385
1386        assert!(
1387            err.message()
1388                .contains("schema snapshot empty nested leaf path"),
1389            "schema store should report the empty nested leaf path"
1390        );
1391    }
1392
1393    #[test]
1394    fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
1395        let mut store = SchemaStore::init(test_memory(243));
1396        let base =
1397            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
1398        let mut fields = base.fields().to_vec();
1399        let duplicate_leaves = vec![
1400            PersistedNestedLeafSnapshot::new(
1401                vec!["bytes".to_string()],
1402                PersistedFieldKind::Blob { max_len: None },
1403                false,
1404                FieldStorageDecode::ByKind,
1405                LeafCodec::Scalar(ScalarCodec::Blob),
1406            ),
1407            PersistedNestedLeafSnapshot::new(
1408                vec!["bytes".to_string()],
1409                PersistedFieldKind::Text { max_len: None },
1410                false,
1411                FieldStorageDecode::ByKind,
1412                LeafCodec::Scalar(ScalarCodec::Text),
1413            ),
1414        ];
1415        let invalid_field = PersistedFieldSnapshot::new(
1416            fields[1].id(),
1417            fields[1].name().to_string(),
1418            fields[1].slot(),
1419            fields[1].kind().clone(),
1420            duplicate_leaves,
1421            fields[1].nullable(),
1422            fields[1].default().clone(),
1423            fields[1].storage_decode(),
1424            fields[1].leaf_codec(),
1425        );
1426        fields[1] = invalid_field;
1427        let invalid = PersistedSchemaSnapshot::new(
1428            base.version(),
1429            base.entity_path().to_string(),
1430            base.entity_name().to_string(),
1431            base.first_primary_key_field_id(),
1432            base.row_layout().clone(),
1433            fields,
1434        );
1435        let raw = encode_persisted_schema_snapshot(&invalid)
1436            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1437        let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
1438
1439        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1440
1441        let err = store
1442            .latest_persisted_snapshot(EntityTag::new(52))
1443            .expect_err("raw decode should reject duplicate nested leaf paths");
1444
1445        assert!(
1446            err.message()
1447                .contains("persisted schema snapshot duplicate nested leaf path"),
1448            "schema codec should report the raw decoded nested path ambiguity"
1449        );
1450    }
1451
1452    #[test]
1453    fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
1454        let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
1455
1456        let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1457            .expect("schema snapshot should encode");
1458        let decoded = raw
1459            .decode_persisted_snapshot()
1460            .expect("schema snapshot should decode");
1461
1462        assert_eq!(decoded, snapshot);
1463    }
1464
1465    // Build one typed schema snapshot used by schema-store tests. The exact
1466    // field contracts are intentionally rich enough to cover nested metadata,
1467    // scalar codecs, and structural fallback payloads through the raw store.
1468    fn persisted_schema_snapshot_for_test(
1469        version: SchemaVersion,
1470        entity_name: &str,
1471    ) -> PersistedSchemaSnapshot {
1472        persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
1473    }
1474
1475    fn persisted_schema_snapshot_with_index_for_test(
1476        version: SchemaVersion,
1477        entity_name: &str,
1478        index_name: &str,
1479    ) -> PersistedSchemaSnapshot {
1480        let base = persisted_schema_snapshot_for_test(version, entity_name);
1481
1482        PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1483            base.version(),
1484            base.entity_path().to_string(),
1485            base.entity_name().to_string(),
1486            base.primary_key_field_ids().to_vec(),
1487            base.row_layout().clone(),
1488            base.fields().to_vec(),
1489            vec![PersistedIndexSnapshot::new(
1490                0,
1491                index_name.to_string(),
1492                "RoleSpecificStore".to_string(),
1493                false,
1494                PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
1495                    FieldId::new(1),
1496                    SchemaFieldSlot::new(0),
1497                    vec!["id".to_string()],
1498                    PersistedFieldKind::Ulid,
1499                    false,
1500                )]),
1501                None,
1502            )],
1503        )
1504    }
1505
1506    // Build one typed schema snapshot with independently selectable snapshot
1507    // and row-layout versions. Production snapshots should keep these aligned;
1508    // tests can deliberately break that invariant at the store boundary.
1509    fn persisted_schema_snapshot_with_layout_version_for_test(
1510        version: SchemaVersion,
1511        layout_version: SchemaVersion,
1512        entity_name: &str,
1513    ) -> PersistedSchemaSnapshot {
1514        PersistedSchemaSnapshot::new(
1515            version,
1516            format!("entities::{entity_name}"),
1517            entity_name.to_string(),
1518            FieldId::new(1),
1519            SchemaRowLayout::new(
1520                layout_version,
1521                vec![
1522                    (FieldId::new(1), SchemaFieldSlot::new(0)),
1523                    (FieldId::new(2), SchemaFieldSlot::new(1)),
1524                ],
1525            ),
1526            vec![
1527                PersistedFieldSnapshot::new(
1528                    FieldId::new(1),
1529                    "id".to_string(),
1530                    SchemaFieldSlot::new(0),
1531                    PersistedFieldKind::Ulid,
1532                    Vec::new(),
1533                    false,
1534                    SchemaFieldDefault::None,
1535                    FieldStorageDecode::ByKind,
1536                    LeafCodec::Scalar(ScalarCodec::Ulid),
1537                ),
1538                PersistedFieldSnapshot::new(
1539                    FieldId::new(2),
1540                    "payload".to_string(),
1541                    SchemaFieldSlot::new(1),
1542                    PersistedFieldKind::Map {
1543                        key: Box::new(PersistedFieldKind::Text { max_len: None }),
1544                        value: Box::new(PersistedFieldKind::List(Box::new(
1545                            PersistedFieldKind::Nat64,
1546                        ))),
1547                    },
1548                    vec![PersistedNestedLeafSnapshot::new(
1549                        vec!["bytes".to_string()],
1550                        PersistedFieldKind::Blob { max_len: None },
1551                        false,
1552                        FieldStorageDecode::ByKind,
1553                        LeafCodec::Scalar(ScalarCodec::Blob),
1554                    )],
1555                    false,
1556                    SchemaFieldDefault::None,
1557                    FieldStorageDecode::ByKind,
1558                    LeafCodec::StructuralFallback,
1559                ),
1560            ],
1561        )
1562    }
1563}