Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::schema::{
8        PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
9        encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
10    },
11    error::InternalError,
12    traits::Storable,
13    types::EntityTag,
14};
15use ic_memory::stable_structures::storable::Bound;
16use ic_memory::stable_structures::{BTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory};
17use std::borrow::Cow;
18
/// Width in bytes of one raw schema key: a big-endian `u64` entity tag
/// followed by a big-endian `u32` schema version.
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
/// The same key width expressed as `u32`, the integer type used by the
/// `Storable` size bound.
const SCHEMA_KEY_BYTES: u32 = 12;
/// Upper bound, in bytes, declared for one encoded schema snapshot payload
/// (512 KiB).
const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;

// The key width is spelled out twice because it is needed in two integer
// types. These compile-time checks keep both spellings equal to each other
// and to the tag + version layout, so a drifting edit fails the build instead
// of panicking in `copy_from_slice` or declaring a wrong storage bound.
const _: () = assert!(
    SCHEMA_KEY_BYTES_USIZE == std::mem::size_of::<u64>() + std::mem::size_of::<u32>(),
    "schema key width must equal entity tag (u64) plus schema version (u32)",
);
const _: () = assert!(
    SCHEMA_KEY_BYTES as usize == SCHEMA_KEY_BYTES_USIZE,
    "SCHEMA_KEY_BYTES and SCHEMA_KEY_BYTES_USIZE must describe the same key width",
);
22
23///
24/// RawSchemaKey
25///
26/// Stable key for one persisted schema snapshot entry.
27/// It combines the entity tag and schema version so reconciliation can load
28/// concrete versions without depending on generated entity names.
29///
30
31#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
32struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
33
34impl RawSchemaKey {
35    /// Build the raw persisted key for one entity schema version.
36    #[must_use]
37    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
38        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
39        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
40        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
41
42        Self(out)
43    }
44
45    /// Return the entity tag encoded in this schema key.
46    #[must_use]
47    fn entity_tag(self) -> EntityTag {
48        let mut bytes = [0u8; size_of::<u64>()];
49        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
50
51        EntityTag::new(u64::from_be_bytes(bytes))
52    }
53
54    /// Return the schema version encoded in this schema key.
55    #[must_use]
56    fn version(self) -> u32 {
57        let mut bytes = [0u8; size_of::<u32>()];
58        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
59
60        u32::from_be_bytes(bytes)
61    }
62}
63
64impl Storable for RawSchemaKey {
65    fn to_bytes(&self) -> Cow<'_, [u8]> {
66        Cow::Borrowed(&self.0)
67    }
68
69    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
70        debug_assert_eq!(
71            bytes.len(),
72            SCHEMA_KEY_BYTES_USIZE,
73            "RawSchemaKey::from_bytes received unexpected byte length",
74        );
75
76        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
77            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
78        }
79
80        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
81        out.copy_from_slice(bytes.as_ref());
82        Self(out)
83    }
84
85    fn into_bytes(self) -> Vec<u8> {
86        self.0.to_vec()
87    }
88
89    const BOUND: Bound = Bound::Bounded {
90        max_size: SCHEMA_KEY_BYTES,
91        is_fixed_size: true,
92    };
93}
94
95///
96/// RawSchemaSnapshot
97///
98/// Raw persisted schema snapshot payload.
99/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
100/// keeping the stable-memory value boundary independent from the typed schema
101/// DTOs used by reconciliation.
102///
103
104#[derive(Clone, Debug, Eq, PartialEq)]
105struct RawSchemaSnapshot(Vec<u8>);
106
107impl RawSchemaSnapshot {
108    /// Encode one typed persisted-schema snapshot into a raw store payload.
109    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
110        validate_typed_schema_snapshot_for_store(snapshot)?;
111
112        encode_persisted_schema_snapshot(snapshot).map(Self)
113    }
114
115    /// Build one raw schema snapshot from already-encoded bytes.
116    #[must_use]
117    #[cfg(test)]
118    const fn from_bytes(bytes: Vec<u8>) -> Self {
119        Self(bytes)
120    }
121
122    /// Borrow the encoded schema snapshot payload.
123    #[must_use]
124    const fn as_bytes(&self) -> &[u8] {
125        self.0.as_slice()
126    }
127
128    /// Consume the snapshot into its encoded payload bytes.
129    #[must_use]
130    #[cfg(test)]
131    fn into_bytes(self) -> Vec<u8> {
132        self.0
133    }
134
135    /// Decode this raw store payload into a typed persisted-schema snapshot.
136    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
137        decode_persisted_schema_snapshot(self.as_bytes())
138    }
139}
140
141impl Storable for RawSchemaSnapshot {
142    fn to_bytes(&self) -> Cow<'_, [u8]> {
143        Cow::Borrowed(self.as_bytes())
144    }
145
146    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
147        Self(bytes.into_owned())
148    }
149
150    fn into_bytes(self) -> Vec<u8> {
151        self.0
152    }
153
154    const BOUND: Bound = Bound::Bounded {
155        max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
156        is_fixed_size: false,
157    };
158}
159
160// Validate typed schema snapshots before they are encoded into the raw schema
161// metadata store. This catches caller-side invariant violations separately from
162// raw persisted-byte corruption handled by the codec decode boundary.
163fn validate_typed_schema_snapshot_for_store(
164    snapshot: &PersistedSchemaSnapshot,
165) -> Result<(), InternalError> {
166    if let Some(detail) = schema_snapshot_integrity_detail(
167        "schema snapshot",
168        snapshot.version(),
169        snapshot.primary_key_field_ids(),
170        snapshot.row_layout(),
171        snapshot.fields(),
172    ) {
173        return Err(InternalError::store_invariant(detail));
174    }
175
176    Ok(())
177}
178
179///
180/// SchemaStoreFootprint
181///
182/// Current raw schema metadata footprint for one entity. Reconciliation uses
183/// this value to report stable-memory pressure without decoding schema payloads
184/// or exposing field-level metadata through metrics.
185///
186
187#[derive(Clone, Copy, Debug, Eq, PartialEq)]
188pub(in crate::db) struct SchemaStoreFootprint {
189    snapshots: u64,
190    encoded_bytes: u64,
191    latest_snapshot_bytes: u64,
192}
193
194impl SchemaStoreFootprint {
195    /// Build one schema-store footprint from already-counted raw payload facts.
196    #[must_use]
197    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
198        Self {
199            snapshots,
200            encoded_bytes,
201            latest_snapshot_bytes,
202        }
203    }
204
205    /// Return the number of raw schema snapshots stored for the entity.
206    #[must_use]
207    pub(in crate::db) const fn snapshots(self) -> u64 {
208        self.snapshots
209    }
210
211    /// Return the total encoded payload bytes stored for the entity.
212    #[must_use]
213    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
214        self.encoded_bytes
215    }
216
217    /// Return the encoded payload bytes for the highest-version snapshot.
218    #[must_use]
219    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
220        self.latest_snapshot_bytes
221    }
222}
223
224///
225/// SchemaStore
226///
227/// Thin persistence wrapper over one stable schema metadata BTreeMap.
228/// Startup reconciliation writes and validates encoded schema snapshots here
229/// before row/index operations proceed.
230///
231
232pub struct SchemaStore {
233    map: BTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
234}
235
236impl SchemaStore {
237    /// Initialize the schema store with the provided backing memory.
238    #[must_use]
239    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
240        Self {
241            map: BTreeMap::init(memory),
242        }
243    }
244
245    /// Insert or replace one typed persisted schema snapshot.
246    pub(in crate::db) fn insert_persisted_snapshot(
247        &mut self,
248        entity: EntityTag,
249        snapshot: &PersistedSchemaSnapshot,
250    ) -> Result<(), InternalError> {
251        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
252        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
253        let _ = self.insert_raw_snapshot(key, raw_snapshot);
254
255        Ok(())
256    }
257
258    /// Load and decode one typed persisted schema snapshot.
259    #[cfg(test)]
260    pub(in crate::db) fn get_persisted_snapshot(
261        &self,
262        entity: EntityTag,
263        version: SchemaVersion,
264    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
265        let key = RawSchemaKey::from_entity_version(entity, version);
266        self.get_raw_snapshot(&key)
267            .map(|snapshot| snapshot.decode_persisted_snapshot())
268            .transpose()
269    }
270
271    /// Load and decode the highest stored schema snapshot version for one entity.
272    pub(in crate::db) fn latest_persisted_snapshot(
273        &self,
274        entity: EntityTag,
275    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
276        let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
277        for entry in self.map.iter() {
278            let (key, snapshot) = entry.into_pair();
279            if key.entity_tag() != entity {
280                continue;
281            }
282
283            let version = SchemaVersion::new(key.version());
284            if latest
285                .as_ref()
286                .is_none_or(|(latest_version, _)| version > *latest_version)
287            {
288                latest = Some((version, snapshot));
289            }
290        }
291
292        latest
293            .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
294            .transpose()
295    }
296
297    /// Return raw schema-store footprint facts for one entity.
298    #[must_use]
299    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
300        let mut snapshots = 0u64;
301        let mut encoded_bytes = 0u64;
302        let mut latest = None::<(SchemaVersion, u64)>;
303
304        for entry in self.map.iter() {
305            let (key, snapshot) = entry.into_pair();
306            if key.entity_tag() != entity {
307                continue;
308            }
309
310            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
311            snapshots = snapshots.saturating_add(1);
312            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
313
314            let version = SchemaVersion::new(key.version());
315            if latest
316                .as_ref()
317                .is_none_or(|(latest_version, _)| version > *latest_version)
318            {
319                latest = Some((version, snapshot_bytes));
320            }
321        }
322
323        SchemaStoreFootprint::new(
324            snapshots,
325            encoded_bytes,
326            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
327        )
328    }
329
330    /// Insert or replace one raw schema snapshot.
331    fn insert_raw_snapshot(
332        &mut self,
333        key: RawSchemaKey,
334        snapshot: RawSchemaSnapshot,
335    ) -> Option<RawSchemaSnapshot> {
336        self.map.insert(key, snapshot)
337    }
338
339    /// Load one raw schema snapshot by key.
340    #[must_use]
341    #[cfg(test)]
342    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
343        self.map.get(key)
344    }
345
346    /// Return whether one schema snapshot key is present.
347    #[must_use]
348    #[cfg(test)]
349    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
350        self.map.contains_key(key)
351    }
352
353    /// Return the number of schema snapshot entries in this store.
354    #[must_use]
355    #[cfg(test)]
356    pub(in crate::db) fn len(&self) -> u64 {
357        self.map.len()
358    }
359
360    /// Return whether this schema store currently has no persisted snapshots.
361    #[must_use]
362    #[cfg(test)]
363    pub(in crate::db) fn is_empty(&self) -> bool {
364        self.map.is_empty()
365    }
366
367    /// Clear all schema metadata entries from the store.
368    #[cfg(test)]
369    pub(in crate::db) fn clear(&mut self) {
370        self.map.clear_new();
371    }
372}
373
374///
375/// TESTS
376///
377
378#[cfg(test)]
379mod tests {
380    use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore};
381    use crate::{
382        db::schema::{
383            FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedNestedLeafSnapshot,
384            PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
385            SchemaVersion, encode_persisted_schema_snapshot,
386        },
387        model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
388        testing::test_memory,
389        traits::Storable,
390        types::EntityTag,
391    };
392    use std::borrow::Cow;
393
394    #[test]
395    fn raw_schema_key_round_trips_entity_and_version() {
396        let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
397            SchemaVersion::initial()
398        });
399        let encoded = key.to_bytes().into_owned();
400        let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
401
402        assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
403        assert_eq!(decoded.version(), SchemaVersion::initial().get());
404    }
405
406    #[test]
407    fn raw_schema_snapshot_round_trips_payload_bytes() {
408        let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
409        let encoded = snapshot.to_bytes().into_owned();
410        let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
411
412        assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
413        assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
414    }
415
416    #[test]
417    fn schema_store_persists_raw_snapshots_by_entity_version_key() {
418        let mut store = SchemaStore::init(test_memory(251));
419        let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
420
421        assert!(store.is_empty());
422        assert!(!store.contains_raw_snapshot(&key));
423
424        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
425
426        assert_eq!(store.len(), 1);
427        assert!(store.contains_raw_snapshot(&key));
428        assert_eq!(
429            store
430                .get_raw_snapshot(&key)
431                .expect("schema snapshot should be present")
432                .as_bytes(),
433            &[9, 4, 6],
434        );
435
436        store.clear();
437        assert!(store.is_empty());
438    }
439
440    #[test]
441    fn schema_store_loads_latest_snapshot_for_entity() {
442        let mut store = SchemaStore::init(test_memory(252));
443        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
444        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
445        let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
446
447        store
448            .insert_persisted_snapshot(EntityTag::new(41), &initial)
449            .expect("initial schema snapshot should encode");
450        store
451            .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
452            .expect("other entity schema snapshot should encode");
453        store
454            .insert_persisted_snapshot(EntityTag::new(41), &newer)
455            .expect("newer schema snapshot should encode");
456
457        let latest = store
458            .latest_persisted_snapshot(EntityTag::new(41))
459            .expect("latest schema snapshot should decode")
460            .expect("schema snapshot should exist");
461
462        assert_eq!(latest.version(), SchemaVersion::new(2));
463        assert_eq!(latest.entity_name(), "Newer");
464    }
465
466    #[test]
467    fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
468        let mut store = SchemaStore::init(test_memory(242));
469        store.insert_raw_snapshot(
470            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
471            RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
472        );
473        store.insert_raw_snapshot(
474            RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
475            RawSchemaSnapshot::from_bytes(vec![5, 8]),
476        );
477        store.insert_raw_snapshot(
478            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
479            RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
480        );
481
482        let footprint = store.entity_footprint(EntityTag::new(71));
483
484        assert_eq!(footprint.snapshots(), 2);
485        assert_eq!(footprint.encoded_bytes(), 7);
486        assert_eq!(footprint.latest_snapshot_bytes(), 4);
487    }
488
489    #[test]
490    fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
491        let mut store = SchemaStore::init(test_memory(253));
492        let invalid = persisted_schema_snapshot_with_layout_version_for_test(
493            SchemaVersion::new(2),
494            SchemaVersion::initial(),
495            "Invalid",
496        );
497
498        let err = store
499            .insert_persisted_snapshot(EntityTag::new(43), &invalid)
500            .expect_err("schema store should reject mismatched snapshot/layout versions");
501
502        assert!(
503            err.message()
504                .contains("schema snapshot row-layout version mismatch"),
505            "schema store should preserve the version mismatch diagnostic"
506        );
507    }
508
509    #[test]
510    fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
511        let mut store = SchemaStore::init(test_memory(254));
512        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
513        let invalid = PersistedSchemaSnapshot::new(
514            base.version(),
515            base.entity_path().to_string(),
516            base.entity_name().to_string(),
517            base.primary_key_field_id(),
518            SchemaRowLayout::new(
519                base.version(),
520                vec![
521                    (FieldId::new(1), SchemaFieldSlot::new(0)),
522                    (FieldId::new(2), SchemaFieldSlot::new(3)),
523                ],
524            ),
525            base.fields().to_vec(),
526        );
527
528        let err = store
529            .insert_persisted_snapshot(EntityTag::new(44), &invalid)
530            .expect_err("schema store should reject divergent field/layout slots");
531
532        assert!(
533            err.message()
534                .contains("schema snapshot field slot mismatch"),
535            "schema store should report the duplicated slot divergence"
536        );
537    }
538
539    #[test]
540    fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
541        let mut store = SchemaStore::init(test_memory(246));
542        let base =
543            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
544        let invalid = PersistedSchemaSnapshot::new(
545            base.version(),
546            base.entity_path().to_string(),
547            base.entity_name().to_string(),
548            base.primary_key_field_id(),
549            SchemaRowLayout::new(
550                base.version(),
551                vec![
552                    (FieldId::new(1), SchemaFieldSlot::new(0)),
553                    (FieldId::new(2), SchemaFieldSlot::new(0)),
554                ],
555            ),
556            base.fields().to_vec(),
557        );
558
559        let err = store
560            .insert_persisted_snapshot(EntityTag::new(49), &invalid)
561            .expect_err("schema store should reject duplicate row-layout slots");
562
563        assert!(
564            err.message()
565                .contains("schema snapshot duplicate row-layout slot"),
566            "schema store should report the row-layout slot ambiguity"
567        );
568    }
569
570    #[test]
571    fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
572        let mut store = SchemaStore::init(test_memory(248));
573        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
574        let invalid = PersistedSchemaSnapshot::new(
575            base.version(),
576            base.entity_path().to_string(),
577            base.entity_name().to_string(),
578            FieldId::new(99),
579            base.row_layout().clone(),
580            base.fields().to_vec(),
581        );
582
583        let err = store
584            .insert_persisted_snapshot(EntityTag::new(47), &invalid)
585            .expect_err("schema store should reject snapshots without the primary-key field");
586
587        assert!(
588            err.message()
589                .contains("schema snapshot primary key field missing from row layout"),
590            "schema store should report the missing primary-key field"
591        );
592    }
593
594    #[test]
595    fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
596        let mut store = SchemaStore::init(test_memory(249));
597        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
598        let corrupt_key =
599            RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
600
601        store
602            .insert_persisted_snapshot(EntityTag::new(45), &initial)
603            .expect("initial schema snapshot should encode");
604        store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
605
606        let err = store
607            .latest_persisted_snapshot(EntityTag::new(45))
608            .expect_err("latest corrupt schema snapshot must fail closed");
609
610        assert!(
611            err.message()
612                .contains("failed to decode persisted schema snapshot"),
613            "latest-version lookup should report the corrupt newest snapshot"
614        );
615    }
616
617    #[test]
618    fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
619        let mut store = SchemaStore::init(test_memory(250));
620        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
621        let invalid = PersistedSchemaSnapshot::new(
622            base.version(),
623            base.entity_path().to_string(),
624            base.entity_name().to_string(),
625            base.primary_key_field_id(),
626            SchemaRowLayout::new(
627                base.version(),
628                vec![
629                    (FieldId::new(1), SchemaFieldSlot::new(0)),
630                    (FieldId::new(2), SchemaFieldSlot::new(3)),
631                ],
632            ),
633            base.fields().to_vec(),
634        );
635        let raw = encode_persisted_schema_snapshot(&invalid)
636            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
637        let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
638
639        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
640
641        let err = store
642            .latest_persisted_snapshot(EntityTag::new(46))
643            .expect_err("raw decode should reject divergent field/layout slots");
644
645        assert!(
646            err.message()
647                .contains("persisted schema snapshot field slot mismatch"),
648            "schema codec should report the raw decoded slot divergence"
649        );
650    }
651
652    #[test]
653    fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
654        let mut store = SchemaStore::init(test_memory(247));
655        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
656        let invalid = PersistedSchemaSnapshot::new(
657            base.version(),
658            base.entity_path().to_string(),
659            base.entity_name().to_string(),
660            FieldId::new(99),
661            base.row_layout().clone(),
662            base.fields().to_vec(),
663        );
664        let raw = encode_persisted_schema_snapshot(&invalid)
665            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
666        let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
667
668        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
669
670        let err = store
671            .latest_persisted_snapshot(EntityTag::new(48))
672            .expect_err("raw decode should reject snapshots without the primary-key field");
673
674        assert!(
675            err.message()
676                .contains("persisted schema snapshot primary key field missing from row layout"),
677            "schema codec should report the raw decoded missing primary-key field"
678        );
679    }
680
681    #[test]
682    fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
683        let mut store = SchemaStore::init(test_memory(245));
684        let base =
685            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
686        let mut fields = base.fields().to_vec();
687        let duplicate = PersistedFieldSnapshot::new(
688            fields[1].id(),
689            fields[0].name().to_string(),
690            fields[1].slot(),
691            fields[1].kind().clone(),
692            fields[1].nested_leaves().to_vec(),
693            fields[1].nullable(),
694            fields[1].default().clone(),
695            fields[1].storage_decode(),
696            fields[1].leaf_codec(),
697        );
698        fields[1] = duplicate;
699        let invalid = PersistedSchemaSnapshot::new(
700            base.version(),
701            base.entity_path().to_string(),
702            base.entity_name().to_string(),
703            base.primary_key_field_id(),
704            base.row_layout().clone(),
705            fields,
706        );
707        let raw = encode_persisted_schema_snapshot(&invalid)
708            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
709        let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
710
711        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
712
713        let err = store
714            .latest_persisted_snapshot(EntityTag::new(50))
715            .expect_err("raw decode should reject duplicate field names");
716
717        assert!(
718            err.message()
719                .contains("persisted schema snapshot duplicate field name"),
720            "schema codec should report the raw decoded field-name ambiguity"
721        );
722    }
723
724    #[test]
725    fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
726        let mut store = SchemaStore::init(test_memory(244));
727        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
728        let mut fields = base.fields().to_vec();
729        let invalid_field = PersistedFieldSnapshot::new(
730            fields[1].id(),
731            fields[1].name().to_string(),
732            fields[1].slot(),
733            fields[1].kind().clone(),
734            vec![PersistedNestedLeafSnapshot::new(
735                Vec::new(),
736                PersistedFieldKind::Blob { max_len: None },
737                false,
738                FieldStorageDecode::ByKind,
739                LeafCodec::Scalar(ScalarCodec::Blob),
740            )],
741            fields[1].nullable(),
742            fields[1].default().clone(),
743            fields[1].storage_decode(),
744            fields[1].leaf_codec(),
745        );
746        fields[1] = invalid_field;
747        let invalid = PersistedSchemaSnapshot::new(
748            base.version(),
749            base.entity_path().to_string(),
750            base.entity_name().to_string(),
751            base.primary_key_field_id(),
752            base.row_layout().clone(),
753            fields,
754        );
755
756        let err = store
757            .insert_persisted_snapshot(EntityTag::new(51), &invalid)
758            .expect_err("schema store should reject empty nested leaf paths");
759
760        assert!(
761            err.message()
762                .contains("schema snapshot empty nested leaf path"),
763            "schema store should report the empty nested leaf path"
764        );
765    }
766
767    #[test]
768    fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
769        let mut store = SchemaStore::init(test_memory(243));
770        let base =
771            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
772        let mut fields = base.fields().to_vec();
773        let duplicate_leaves = vec![
774            PersistedNestedLeafSnapshot::new(
775                vec!["bytes".to_string()],
776                PersistedFieldKind::Blob { max_len: None },
777                false,
778                FieldStorageDecode::ByKind,
779                LeafCodec::Scalar(ScalarCodec::Blob),
780            ),
781            PersistedNestedLeafSnapshot::new(
782                vec!["bytes".to_string()],
783                PersistedFieldKind::Text { max_len: None },
784                false,
785                FieldStorageDecode::ByKind,
786                LeafCodec::Scalar(ScalarCodec::Text),
787            ),
788        ];
789        let invalid_field = PersistedFieldSnapshot::new(
790            fields[1].id(),
791            fields[1].name().to_string(),
792            fields[1].slot(),
793            fields[1].kind().clone(),
794            duplicate_leaves,
795            fields[1].nullable(),
796            fields[1].default().clone(),
797            fields[1].storage_decode(),
798            fields[1].leaf_codec(),
799        );
800        fields[1] = invalid_field;
801        let invalid = PersistedSchemaSnapshot::new(
802            base.version(),
803            base.entity_path().to_string(),
804            base.entity_name().to_string(),
805            base.primary_key_field_id(),
806            base.row_layout().clone(),
807            fields,
808        );
809        let raw = encode_persisted_schema_snapshot(&invalid)
810            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
811        let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
812
813        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
814
815        let err = store
816            .latest_persisted_snapshot(EntityTag::new(52))
817            .expect_err("raw decode should reject duplicate nested leaf paths");
818
819        assert!(
820            err.message()
821                .contains("persisted schema snapshot duplicate nested leaf path"),
822            "schema codec should report the raw decoded nested path ambiguity"
823        );
824    }
825
826    #[test]
827    fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
828        let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
829
830        let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
831            .expect("schema snapshot should encode");
832        let decoded = raw
833            .decode_persisted_snapshot()
834            .expect("schema snapshot should decode");
835
836        assert_eq!(decoded, snapshot);
837    }
838
839    // Build one typed schema snapshot used by schema-store tests. The exact
840    // field contracts are intentionally rich enough to cover nested metadata,
841    // scalar codecs, and structural fallback payloads through the raw store.
842    fn persisted_schema_snapshot_for_test(
843        version: SchemaVersion,
844        entity_name: &str,
845    ) -> PersistedSchemaSnapshot {
846        persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
847    }
848
849    // Build one typed schema snapshot with independently selectable snapshot
850    // and row-layout versions. Production snapshots should keep these aligned;
851    // tests can deliberately break that invariant at the store boundary.
852    fn persisted_schema_snapshot_with_layout_version_for_test(
853        version: SchemaVersion,
854        layout_version: SchemaVersion,
855        entity_name: &str,
856    ) -> PersistedSchemaSnapshot {
857        PersistedSchemaSnapshot::new(
858            version,
859            format!("entities::{entity_name}"),
860            entity_name.to_string(),
861            FieldId::new(1),
862            SchemaRowLayout::new(
863                layout_version,
864                vec![
865                    (FieldId::new(1), SchemaFieldSlot::new(0)),
866                    (FieldId::new(2), SchemaFieldSlot::new(1)),
867                ],
868            ),
869            vec![
870                PersistedFieldSnapshot::new(
871                    FieldId::new(1),
872                    "id".to_string(),
873                    SchemaFieldSlot::new(0),
874                    PersistedFieldKind::Ulid,
875                    Vec::new(),
876                    false,
877                    SchemaFieldDefault::None,
878                    FieldStorageDecode::ByKind,
879                    LeafCodec::Scalar(ScalarCodec::Ulid),
880                ),
881                PersistedFieldSnapshot::new(
882                    FieldId::new(2),
883                    "payload".to_string(),
884                    SchemaFieldSlot::new(1),
885                    PersistedFieldKind::Map {
886                        key: Box::new(PersistedFieldKind::Text { max_len: None }),
887                        value: Box::new(PersistedFieldKind::List(Box::new(
888                            PersistedFieldKind::Nat64,
889                        ))),
890                    },
891                    vec![PersistedNestedLeafSnapshot::new(
892                        vec!["bytes".to_string()],
893                        PersistedFieldKind::Blob { max_len: None },
894                        false,
895                        FieldStorageDecode::ByKind,
896                        LeafCodec::Scalar(ScalarCodec::Blob),
897                    )],
898                    false,
899                    SchemaFieldDefault::None,
900                    FieldStorageDecode::ByKind,
901                    LeafCodec::StructuralFallback,
902                ),
903            ],
904        )
905    }
906}