Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::schema::{
8        PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
9        encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
10    },
11    error::InternalError,
12    traits::Storable,
13    types::EntityTag,
14};
15use canic_cdk::structures::{BTreeMap, DefaultMemoryImpl, memory::VirtualMemory, storable::Bound};
16use std::borrow::Cow;
17
18const SCHEMA_KEY_BYTES_USIZE: usize = 12;
19const SCHEMA_KEY_BYTES: u32 = 12;
20const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
21
22///
23/// RawSchemaKey
24///
25/// Stable key for one persisted schema snapshot entry.
26/// It combines the entity tag and schema version so reconciliation can load
27/// concrete versions without depending on generated entity names.
28///
29
30#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
31struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
32
33impl RawSchemaKey {
34    /// Build the raw persisted key for one entity schema version.
35    #[must_use]
36    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
37        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
38        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
39        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
40
41        Self(out)
42    }
43
44    /// Return the entity tag encoded in this schema key.
45    #[must_use]
46    fn entity_tag(self) -> EntityTag {
47        let mut bytes = [0u8; size_of::<u64>()];
48        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
49
50        EntityTag::new(u64::from_be_bytes(bytes))
51    }
52
53    /// Return the schema version encoded in this schema key.
54    #[must_use]
55    fn version(self) -> u32 {
56        let mut bytes = [0u8; size_of::<u32>()];
57        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
58
59        u32::from_be_bytes(bytes)
60    }
61}
62
63impl Storable for RawSchemaKey {
64    fn to_bytes(&self) -> Cow<'_, [u8]> {
65        Cow::Borrowed(&self.0)
66    }
67
68    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
69        debug_assert_eq!(
70            bytes.len(),
71            SCHEMA_KEY_BYTES_USIZE,
72            "RawSchemaKey::from_bytes received unexpected byte length",
73        );
74
75        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
76            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
77        }
78
79        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
80        out.copy_from_slice(bytes.as_ref());
81        Self(out)
82    }
83
84    fn into_bytes(self) -> Vec<u8> {
85        self.0.to_vec()
86    }
87
88    const BOUND: Bound = Bound::Bounded {
89        max_size: SCHEMA_KEY_BYTES,
90        is_fixed_size: true,
91    };
92}
93
94///
95/// RawSchemaSnapshot
96///
97/// Raw persisted schema snapshot payload.
98/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
99/// keeping the stable-memory value boundary independent from the typed schema
100/// DTOs used by reconciliation.
101///
102
103#[derive(Clone, Debug, Eq, PartialEq)]
104struct RawSchemaSnapshot(Vec<u8>);
105
106impl RawSchemaSnapshot {
107    /// Encode one typed persisted-schema snapshot into a raw store payload.
108    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
109        validate_typed_schema_snapshot_for_store(snapshot)?;
110
111        encode_persisted_schema_snapshot(snapshot).map(Self)
112    }
113
114    /// Build one raw schema snapshot from already-encoded bytes.
115    #[must_use]
116    #[cfg(test)]
117    const fn from_bytes(bytes: Vec<u8>) -> Self {
118        Self(bytes)
119    }
120
121    /// Borrow the encoded schema snapshot payload.
122    #[must_use]
123    const fn as_bytes(&self) -> &[u8] {
124        self.0.as_slice()
125    }
126
127    /// Consume the snapshot into its encoded payload bytes.
128    #[must_use]
129    #[cfg(test)]
130    fn into_bytes(self) -> Vec<u8> {
131        self.0
132    }
133
134    /// Decode this raw store payload into a typed persisted-schema snapshot.
135    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
136        decode_persisted_schema_snapshot(self.as_bytes())
137    }
138}
139
140impl Storable for RawSchemaSnapshot {
141    fn to_bytes(&self) -> Cow<'_, [u8]> {
142        Cow::Borrowed(self.as_bytes())
143    }
144
145    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
146        Self(bytes.into_owned())
147    }
148
149    fn into_bytes(self) -> Vec<u8> {
150        self.0
151    }
152
153    const BOUND: Bound = Bound::Bounded {
154        max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
155        is_fixed_size: false,
156    };
157}
158
159// Validate typed schema snapshots before they are encoded into the raw schema
160// metadata store. This catches caller-side invariant violations separately from
161// raw persisted-byte corruption handled by the codec decode boundary.
162fn validate_typed_schema_snapshot_for_store(
163    snapshot: &PersistedSchemaSnapshot,
164) -> Result<(), InternalError> {
165    if let Some(detail) = schema_snapshot_integrity_detail(
166        "schema snapshot",
167        snapshot.version(),
168        snapshot.primary_key_field_id(),
169        snapshot.row_layout(),
170        snapshot.fields(),
171    ) {
172        return Err(InternalError::store_invariant(detail));
173    }
174
175    Ok(())
176}
177
178///
179/// SchemaStoreFootprint
180///
181/// Current raw schema metadata footprint for one entity. Reconciliation uses
182/// this value to report stable-memory pressure without decoding schema payloads
183/// or exposing field-level metadata through metrics.
184///
185
186#[derive(Clone, Copy, Debug, Eq, PartialEq)]
187pub(in crate::db) struct SchemaStoreFootprint {
188    snapshots: u64,
189    encoded_bytes: u64,
190    latest_snapshot_bytes: u64,
191}
192
193impl SchemaStoreFootprint {
194    /// Build one schema-store footprint from already-counted raw payload facts.
195    #[must_use]
196    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
197        Self {
198            snapshots,
199            encoded_bytes,
200            latest_snapshot_bytes,
201        }
202    }
203
204    /// Return the number of raw schema snapshots stored for the entity.
205    #[must_use]
206    pub(in crate::db) const fn snapshots(self) -> u64 {
207        self.snapshots
208    }
209
210    /// Return the total encoded payload bytes stored for the entity.
211    #[must_use]
212    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
213        self.encoded_bytes
214    }
215
216    /// Return the encoded payload bytes for the highest-version snapshot.
217    #[must_use]
218    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
219        self.latest_snapshot_bytes
220    }
221}
222
223///
224/// SchemaStore
225///
226/// Thin persistence wrapper over one stable schema metadata BTreeMap.
227/// Startup reconciliation writes and validates encoded schema snapshots here
228/// before row/index operations proceed.
229///
230
231pub struct SchemaStore {
232    map: BTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
233}
234
235impl SchemaStore {
236    /// Initialize the schema store with the provided backing memory.
237    #[must_use]
238    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
239        Self {
240            map: BTreeMap::init(memory),
241        }
242    }
243
244    /// Insert or replace one typed persisted schema snapshot.
245    pub(in crate::db) fn insert_persisted_snapshot(
246        &mut self,
247        entity: EntityTag,
248        snapshot: &PersistedSchemaSnapshot,
249    ) -> Result<(), InternalError> {
250        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
251        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
252        let _ = self.insert_raw_snapshot(key, raw_snapshot);
253
254        Ok(())
255    }
256
257    /// Load and decode one typed persisted schema snapshot.
258    #[cfg(test)]
259    pub(in crate::db) fn get_persisted_snapshot(
260        &self,
261        entity: EntityTag,
262        version: SchemaVersion,
263    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
264        let key = RawSchemaKey::from_entity_version(entity, version);
265        self.get_raw_snapshot(&key)
266            .map(|snapshot| snapshot.decode_persisted_snapshot())
267            .transpose()
268    }
269
270    /// Load and decode the highest stored schema snapshot version for one entity.
271    pub(in crate::db) fn latest_persisted_snapshot(
272        &self,
273        entity: EntityTag,
274    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
275        let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
276        for entry in self.map.iter() {
277            let (key, snapshot) = entry.into_pair();
278            if key.entity_tag() != entity {
279                continue;
280            }
281
282            let version = SchemaVersion::new(key.version());
283            if latest
284                .as_ref()
285                .is_none_or(|(latest_version, _)| version > *latest_version)
286            {
287                latest = Some((version, snapshot));
288            }
289        }
290
291        latest
292            .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
293            .transpose()
294    }
295
296    /// Return raw schema-store footprint facts for one entity.
297    #[must_use]
298    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
299        let mut snapshots = 0u64;
300        let mut encoded_bytes = 0u64;
301        let mut latest = None::<(SchemaVersion, u64)>;
302
303        for entry in self.map.iter() {
304            let (key, snapshot) = entry.into_pair();
305            if key.entity_tag() != entity {
306                continue;
307            }
308
309            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
310            snapshots = snapshots.saturating_add(1);
311            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
312
313            let version = SchemaVersion::new(key.version());
314            if latest
315                .as_ref()
316                .is_none_or(|(latest_version, _)| version > *latest_version)
317            {
318                latest = Some((version, snapshot_bytes));
319            }
320        }
321
322        SchemaStoreFootprint::new(
323            snapshots,
324            encoded_bytes,
325            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
326        )
327    }
328
329    /// Insert or replace one raw schema snapshot.
330    fn insert_raw_snapshot(
331        &mut self,
332        key: RawSchemaKey,
333        snapshot: RawSchemaSnapshot,
334    ) -> Option<RawSchemaSnapshot> {
335        self.map.insert(key, snapshot)
336    }
337
338    /// Load one raw schema snapshot by key.
339    #[must_use]
340    #[cfg(test)]
341    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
342        self.map.get(key)
343    }
344
345    /// Return whether one schema snapshot key is present.
346    #[must_use]
347    #[cfg(test)]
348    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
349        self.map.contains_key(key)
350    }
351
352    /// Return the number of schema snapshot entries in this store.
353    #[must_use]
354    #[cfg(test)]
355    pub(in crate::db) fn len(&self) -> u64 {
356        self.map.len()
357    }
358
359    /// Return whether this schema store currently has no persisted snapshots.
360    #[must_use]
361    #[cfg(test)]
362    pub(in crate::db) fn is_empty(&self) -> bool {
363        self.map.is_empty()
364    }
365
366    /// Clear all schema metadata entries from the store.
367    #[cfg(test)]
368    pub(in crate::db) fn clear(&mut self) {
369        self.map.clear();
370    }
371}
372
373///
374/// TESTS
375///
376
377#[cfg(test)]
378mod tests {
379    use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore};
380    use crate::{
381        db::schema::{
382            FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedNestedLeafSnapshot,
383            PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
384            SchemaVersion, encode_persisted_schema_snapshot,
385        },
386        model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
387        testing::test_memory,
388        traits::Storable,
389        types::EntityTag,
390    };
391    use std::borrow::Cow;
392
393    #[test]
394    fn raw_schema_key_round_trips_entity_and_version() {
395        let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
396            SchemaVersion::initial()
397        });
398        let encoded = key.to_bytes().into_owned();
399        let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
400
401        assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
402        assert_eq!(decoded.version(), SchemaVersion::initial().get());
403    }
404
405    #[test]
406    fn raw_schema_snapshot_round_trips_payload_bytes() {
407        let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
408        let encoded = snapshot.to_bytes().into_owned();
409        let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
410
411        assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
412        assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
413    }
414
415    #[test]
416    fn schema_store_persists_raw_snapshots_by_entity_version_key() {
417        let mut store = SchemaStore::init(test_memory(251));
418        let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
419
420        assert!(store.is_empty());
421        assert!(!store.contains_raw_snapshot(&key));
422
423        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
424
425        assert_eq!(store.len(), 1);
426        assert!(store.contains_raw_snapshot(&key));
427        assert_eq!(
428            store
429                .get_raw_snapshot(&key)
430                .expect("schema snapshot should be present")
431                .as_bytes(),
432            &[9, 4, 6],
433        );
434
435        store.clear();
436        assert!(store.is_empty());
437    }
438
439    #[test]
440    fn schema_store_loads_latest_snapshot_for_entity() {
441        let mut store = SchemaStore::init(test_memory(252));
442        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
443        let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
444        let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
445
446        store
447            .insert_persisted_snapshot(EntityTag::new(41), &initial)
448            .expect("initial schema snapshot should encode");
449        store
450            .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
451            .expect("other entity schema snapshot should encode");
452        store
453            .insert_persisted_snapshot(EntityTag::new(41), &newer)
454            .expect("newer schema snapshot should encode");
455
456        let latest = store
457            .latest_persisted_snapshot(EntityTag::new(41))
458            .expect("latest schema snapshot should decode")
459            .expect("schema snapshot should exist");
460
461        assert_eq!(latest.version(), SchemaVersion::new(2));
462        assert_eq!(latest.entity_name(), "Newer");
463    }
464
465    #[test]
466    fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
467        let mut store = SchemaStore::init(test_memory(242));
468        store.insert_raw_snapshot(
469            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
470            RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
471        );
472        store.insert_raw_snapshot(
473            RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
474            RawSchemaSnapshot::from_bytes(vec![5, 8]),
475        );
476        store.insert_raw_snapshot(
477            RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
478            RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
479        );
480
481        let footprint = store.entity_footprint(EntityTag::new(71));
482
483        assert_eq!(footprint.snapshots(), 2);
484        assert_eq!(footprint.encoded_bytes(), 7);
485        assert_eq!(footprint.latest_snapshot_bytes(), 4);
486    }
487
488    #[test]
489    fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
490        let mut store = SchemaStore::init(test_memory(253));
491        let invalid = persisted_schema_snapshot_with_layout_version_for_test(
492            SchemaVersion::new(2),
493            SchemaVersion::initial(),
494            "Invalid",
495        );
496
497        let err = store
498            .insert_persisted_snapshot(EntityTag::new(43), &invalid)
499            .expect_err("schema store should reject mismatched snapshot/layout versions");
500
501        assert!(
502            err.message()
503                .contains("schema snapshot row-layout version mismatch"),
504            "schema store should preserve the version mismatch diagnostic"
505        );
506    }
507
508    #[test]
509    fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
510        let mut store = SchemaStore::init(test_memory(254));
511        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
512        let invalid = PersistedSchemaSnapshot::new(
513            base.version(),
514            base.entity_path().to_string(),
515            base.entity_name().to_string(),
516            base.primary_key_field_id(),
517            SchemaRowLayout::new(
518                base.version(),
519                vec![
520                    (FieldId::new(1), SchemaFieldSlot::new(0)),
521                    (FieldId::new(2), SchemaFieldSlot::new(3)),
522                ],
523            ),
524            base.fields().to_vec(),
525        );
526
527        let err = store
528            .insert_persisted_snapshot(EntityTag::new(44), &invalid)
529            .expect_err("schema store should reject divergent field/layout slots");
530
531        assert!(
532            err.message()
533                .contains("schema snapshot field slot mismatch"),
534            "schema store should report the duplicated slot divergence"
535        );
536    }
537
538    #[test]
539    fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
540        let mut store = SchemaStore::init(test_memory(246));
541        let base =
542            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
543        let invalid = PersistedSchemaSnapshot::new(
544            base.version(),
545            base.entity_path().to_string(),
546            base.entity_name().to_string(),
547            base.primary_key_field_id(),
548            SchemaRowLayout::new(
549                base.version(),
550                vec![
551                    (FieldId::new(1), SchemaFieldSlot::new(0)),
552                    (FieldId::new(2), SchemaFieldSlot::new(0)),
553                ],
554            ),
555            base.fields().to_vec(),
556        );
557
558        let err = store
559            .insert_persisted_snapshot(EntityTag::new(49), &invalid)
560            .expect_err("schema store should reject duplicate row-layout slots");
561
562        assert!(
563            err.message()
564                .contains("schema snapshot duplicate row-layout slot"),
565            "schema store should report the row-layout slot ambiguity"
566        );
567    }
568
569    #[test]
570    fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
571        let mut store = SchemaStore::init(test_memory(248));
572        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
573        let invalid = PersistedSchemaSnapshot::new(
574            base.version(),
575            base.entity_path().to_string(),
576            base.entity_name().to_string(),
577            FieldId::new(99),
578            base.row_layout().clone(),
579            base.fields().to_vec(),
580        );
581
582        let err = store
583            .insert_persisted_snapshot(EntityTag::new(47), &invalid)
584            .expect_err("schema store should reject snapshots without the primary-key field");
585
586        assert!(
587            err.message()
588                .contains("schema snapshot primary key field missing from row layout"),
589            "schema store should report the missing primary-key field"
590        );
591    }
592
593    #[test]
594    fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
595        let mut store = SchemaStore::init(test_memory(249));
596        let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
597        let corrupt_key =
598            RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
599
600        store
601            .insert_persisted_snapshot(EntityTag::new(45), &initial)
602            .expect("initial schema snapshot should encode");
603        store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
604
605        let err = store
606            .latest_persisted_snapshot(EntityTag::new(45))
607            .expect_err("latest corrupt schema snapshot must fail closed");
608
609        assert!(
610            err.message()
611                .contains("failed to decode persisted schema snapshot"),
612            "latest-version lookup should report the corrupt newest snapshot"
613        );
614    }
615
616    #[test]
617    fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
618        let mut store = SchemaStore::init(test_memory(250));
619        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
620        let invalid = PersistedSchemaSnapshot::new(
621            base.version(),
622            base.entity_path().to_string(),
623            base.entity_name().to_string(),
624            base.primary_key_field_id(),
625            SchemaRowLayout::new(
626                base.version(),
627                vec![
628                    (FieldId::new(1), SchemaFieldSlot::new(0)),
629                    (FieldId::new(2), SchemaFieldSlot::new(3)),
630                ],
631            ),
632            base.fields().to_vec(),
633        );
634        let raw = encode_persisted_schema_snapshot(&invalid)
635            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
636        let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
637
638        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
639
640        let err = store
641            .latest_persisted_snapshot(EntityTag::new(46))
642            .expect_err("raw decode should reject divergent field/layout slots");
643
644        assert!(
645            err.message()
646                .contains("persisted schema snapshot field slot mismatch"),
647            "schema codec should report the raw decoded slot divergence"
648        );
649    }
650
651    #[test]
652    fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
653        let mut store = SchemaStore::init(test_memory(247));
654        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
655        let invalid = PersistedSchemaSnapshot::new(
656            base.version(),
657            base.entity_path().to_string(),
658            base.entity_name().to_string(),
659            FieldId::new(99),
660            base.row_layout().clone(),
661            base.fields().to_vec(),
662        );
663        let raw = encode_persisted_schema_snapshot(&invalid)
664            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
665        let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
666
667        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
668
669        let err = store
670            .latest_persisted_snapshot(EntityTag::new(48))
671            .expect_err("raw decode should reject snapshots without the primary-key field");
672
673        assert!(
674            err.message()
675                .contains("persisted schema snapshot primary key field missing from row layout"),
676            "schema codec should report the raw decoded missing primary-key field"
677        );
678    }
679
680    #[test]
681    fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
682        let mut store = SchemaStore::init(test_memory(245));
683        let base =
684            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
685        let mut fields = base.fields().to_vec();
686        let duplicate = PersistedFieldSnapshot::new(
687            fields[1].id(),
688            fields[0].name().to_string(),
689            fields[1].slot(),
690            fields[1].kind().clone(),
691            fields[1].nested_leaves().to_vec(),
692            fields[1].nullable(),
693            fields[1].default(),
694            fields[1].storage_decode(),
695            fields[1].leaf_codec(),
696        );
697        fields[1] = duplicate;
698        let invalid = PersistedSchemaSnapshot::new(
699            base.version(),
700            base.entity_path().to_string(),
701            base.entity_name().to_string(),
702            base.primary_key_field_id(),
703            base.row_layout().clone(),
704            fields,
705        );
706        let raw = encode_persisted_schema_snapshot(&invalid)
707            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
708        let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
709
710        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
711
712        let err = store
713            .latest_persisted_snapshot(EntityTag::new(50))
714            .expect_err("raw decode should reject duplicate field names");
715
716        assert!(
717            err.message()
718                .contains("persisted schema snapshot duplicate field name"),
719            "schema codec should report the raw decoded field-name ambiguity"
720        );
721    }
722
723    #[test]
724    fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
725        let mut store = SchemaStore::init(test_memory(244));
726        let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
727        let mut fields = base.fields().to_vec();
728        let invalid_field = PersistedFieldSnapshot::new(
729            fields[1].id(),
730            fields[1].name().to_string(),
731            fields[1].slot(),
732            fields[1].kind().clone(),
733            vec![PersistedNestedLeafSnapshot::new(
734                Vec::new(),
735                PersistedFieldKind::Blob { max_len: None },
736                false,
737                FieldStorageDecode::ByKind,
738                LeafCodec::Scalar(ScalarCodec::Blob),
739            )],
740            fields[1].nullable(),
741            fields[1].default(),
742            fields[1].storage_decode(),
743            fields[1].leaf_codec(),
744        );
745        fields[1] = invalid_field;
746        let invalid = PersistedSchemaSnapshot::new(
747            base.version(),
748            base.entity_path().to_string(),
749            base.entity_name().to_string(),
750            base.primary_key_field_id(),
751            base.row_layout().clone(),
752            fields,
753        );
754
755        let err = store
756            .insert_persisted_snapshot(EntityTag::new(51), &invalid)
757            .expect_err("schema store should reject empty nested leaf paths");
758
759        assert!(
760            err.message()
761                .contains("schema snapshot empty nested leaf path"),
762            "schema store should report the empty nested leaf path"
763        );
764    }
765
766    #[test]
767    fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
768        let mut store = SchemaStore::init(test_memory(243));
769        let base =
770            persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
771        let mut fields = base.fields().to_vec();
772        let duplicate_leaves = vec![
773            PersistedNestedLeafSnapshot::new(
774                vec!["bytes".to_string()],
775                PersistedFieldKind::Blob { max_len: None },
776                false,
777                FieldStorageDecode::ByKind,
778                LeafCodec::Scalar(ScalarCodec::Blob),
779            ),
780            PersistedNestedLeafSnapshot::new(
781                vec!["bytes".to_string()],
782                PersistedFieldKind::Text { max_len: None },
783                false,
784                FieldStorageDecode::ByKind,
785                LeafCodec::Scalar(ScalarCodec::Text),
786            ),
787        ];
788        let invalid_field = PersistedFieldSnapshot::new(
789            fields[1].id(),
790            fields[1].name().to_string(),
791            fields[1].slot(),
792            fields[1].kind().clone(),
793            duplicate_leaves,
794            fields[1].nullable(),
795            fields[1].default(),
796            fields[1].storage_decode(),
797            fields[1].leaf_codec(),
798        );
799        fields[1] = invalid_field;
800        let invalid = PersistedSchemaSnapshot::new(
801            base.version(),
802            base.entity_path().to_string(),
803            base.entity_name().to_string(),
804            base.primary_key_field_id(),
805            base.row_layout().clone(),
806            fields,
807        );
808        let raw = encode_persisted_schema_snapshot(&invalid)
809            .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
810        let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
811
812        store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
813
814        let err = store
815            .latest_persisted_snapshot(EntityTag::new(52))
816            .expect_err("raw decode should reject duplicate nested leaf paths");
817
818        assert!(
819            err.message()
820                .contains("persisted schema snapshot duplicate nested leaf path"),
821            "schema codec should report the raw decoded nested path ambiguity"
822        );
823    }
824
825    #[test]
826    fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
827        let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
828
829        let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
830            .expect("schema snapshot should encode");
831        let decoded = raw
832            .decode_persisted_snapshot()
833            .expect("schema snapshot should decode");
834
835        assert_eq!(decoded, snapshot);
836    }
837
838    // Build one typed schema snapshot used by schema-store tests. The exact
839    // field contracts are intentionally rich enough to cover nested metadata,
840    // scalar codecs, and structural fallback payloads through the raw store.
841    fn persisted_schema_snapshot_for_test(
842        version: SchemaVersion,
843        entity_name: &str,
844    ) -> PersistedSchemaSnapshot {
845        persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
846    }
847
848    // Build one typed schema snapshot with independently selectable snapshot
849    // and row-layout versions. Production snapshots should keep these aligned;
850    // tests can deliberately break that invariant at the store boundary.
851    fn persisted_schema_snapshot_with_layout_version_for_test(
852        version: SchemaVersion,
853        layout_version: SchemaVersion,
854        entity_name: &str,
855    ) -> PersistedSchemaSnapshot {
856        PersistedSchemaSnapshot::new(
857            version,
858            format!("entities::{entity_name}"),
859            entity_name.to_string(),
860            FieldId::new(1),
861            SchemaRowLayout::new(
862                layout_version,
863                vec![
864                    (FieldId::new(1), SchemaFieldSlot::new(0)),
865                    (FieldId::new(2), SchemaFieldSlot::new(1)),
866                ],
867            ),
868            vec![
869                PersistedFieldSnapshot::new(
870                    FieldId::new(1),
871                    "id".to_string(),
872                    SchemaFieldSlot::new(0),
873                    PersistedFieldKind::Ulid,
874                    Vec::new(),
875                    false,
876                    SchemaFieldDefault::None,
877                    FieldStorageDecode::ByKind,
878                    LeafCodec::Scalar(ScalarCodec::Ulid),
879                ),
880                PersistedFieldSnapshot::new(
881                    FieldId::new(2),
882                    "payload".to_string(),
883                    SchemaFieldSlot::new(1),
884                    PersistedFieldKind::Map {
885                        key: Box::new(PersistedFieldKind::Text { max_len: None }),
886                        value: Box::new(PersistedFieldKind::List(Box::new(
887                            PersistedFieldKind::Uint,
888                        ))),
889                    },
890                    vec![PersistedNestedLeafSnapshot::new(
891                        vec!["bytes".to_string()],
892                        PersistedFieldKind::Blob { max_len: None },
893                        false,
894                        FieldStorageDecode::ByKind,
895                        LeafCodec::Scalar(ScalarCodec::Blob),
896                    )],
897                    false,
898                    SchemaFieldDefault::None,
899                    FieldStorageDecode::ByKind,
900                    LeafCodec::StructuralFallback,
901                ),
902            ],
903        )
904    }
905}