1use crate::{
7 db::schema::{
8 PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
9 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
10 },
11 error::InternalError,
12 traits::Storable,
13 types::EntityTag,
14};
15use ic_memory::stable_structures::storable::Bound;
16use ic_memory::stable_structures::{BTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory};
17use std::borrow::Cow;
18
19const SCHEMA_KEY_BYTES_USIZE: usize = 12;
20const SCHEMA_KEY_BYTES: u32 = 12;
21const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
22
23#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
32struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
33
34impl RawSchemaKey {
35 #[must_use]
37 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
38 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
39 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
40 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
41
42 Self(out)
43 }
44
45 #[must_use]
47 fn entity_tag(self) -> EntityTag {
48 let mut bytes = [0u8; size_of::<u64>()];
49 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
50
51 EntityTag::new(u64::from_be_bytes(bytes))
52 }
53
54 #[must_use]
56 fn version(self) -> u32 {
57 let mut bytes = [0u8; size_of::<u32>()];
58 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
59
60 u32::from_be_bytes(bytes)
61 }
62}
63
64impl Storable for RawSchemaKey {
65 fn to_bytes(&self) -> Cow<'_, [u8]> {
66 Cow::Borrowed(&self.0)
67 }
68
69 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
70 debug_assert_eq!(
71 bytes.len(),
72 SCHEMA_KEY_BYTES_USIZE,
73 "RawSchemaKey::from_bytes received unexpected byte length",
74 );
75
76 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
77 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
78 }
79
80 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
81 out.copy_from_slice(bytes.as_ref());
82 Self(out)
83 }
84
85 fn into_bytes(self) -> Vec<u8> {
86 self.0.to_vec()
87 }
88
89 const BOUND: Bound = Bound::Bounded {
90 max_size: SCHEMA_KEY_BYTES,
91 is_fixed_size: true,
92 };
93}
94
95#[derive(Clone, Debug, Eq, PartialEq)]
105struct RawSchemaSnapshot(Vec<u8>);
106
107impl RawSchemaSnapshot {
108 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
110 validate_typed_schema_snapshot_for_store(snapshot)?;
111
112 encode_persisted_schema_snapshot(snapshot).map(Self)
113 }
114
115 #[must_use]
117 #[cfg(test)]
118 const fn from_bytes(bytes: Vec<u8>) -> Self {
119 Self(bytes)
120 }
121
122 #[must_use]
124 const fn as_bytes(&self) -> &[u8] {
125 self.0.as_slice()
126 }
127
128 #[must_use]
130 #[cfg(test)]
131 fn into_bytes(self) -> Vec<u8> {
132 self.0
133 }
134
135 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
137 decode_persisted_schema_snapshot(self.as_bytes())
138 }
139}
140
141impl Storable for RawSchemaSnapshot {
142 fn to_bytes(&self) -> Cow<'_, [u8]> {
143 Cow::Borrowed(self.as_bytes())
144 }
145
146 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
147 Self(bytes.into_owned())
148 }
149
150 fn into_bytes(self) -> Vec<u8> {
151 self.0
152 }
153
154 const BOUND: Bound = Bound::Bounded {
155 max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
156 is_fixed_size: false,
157 };
158}
159
160fn validate_typed_schema_snapshot_for_store(
164 snapshot: &PersistedSchemaSnapshot,
165) -> Result<(), InternalError> {
166 if let Some(detail) = schema_snapshot_integrity_detail(
167 "schema snapshot",
168 snapshot.version(),
169 snapshot.primary_key_field_ids(),
170 snapshot.row_layout(),
171 snapshot.fields(),
172 ) {
173 return Err(InternalError::store_invariant(detail));
174 }
175
176 Ok(())
177}
178
179#[derive(Clone, Copy, Debug, Eq, PartialEq)]
188pub(in crate::db) struct SchemaStoreFootprint {
189 snapshots: u64,
190 encoded_bytes: u64,
191 latest_snapshot_bytes: u64,
192}
193
194impl SchemaStoreFootprint {
195 #[must_use]
197 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
198 Self {
199 snapshots,
200 encoded_bytes,
201 latest_snapshot_bytes,
202 }
203 }
204
205 #[must_use]
207 pub(in crate::db) const fn snapshots(self) -> u64 {
208 self.snapshots
209 }
210
211 #[must_use]
213 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
214 self.encoded_bytes
215 }
216
217 #[must_use]
219 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
220 self.latest_snapshot_bytes
221 }
222}
223
224pub struct SchemaStore {
233 map: BTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
234}
235
236impl SchemaStore {
237 #[must_use]
239 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
240 Self {
241 map: BTreeMap::init(memory),
242 }
243 }
244
245 pub(in crate::db) fn insert_persisted_snapshot(
247 &mut self,
248 entity: EntityTag,
249 snapshot: &PersistedSchemaSnapshot,
250 ) -> Result<(), InternalError> {
251 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
252 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
253 let _ = self.insert_raw_snapshot(key, raw_snapshot);
254
255 Ok(())
256 }
257
258 #[cfg(test)]
260 pub(in crate::db) fn get_persisted_snapshot(
261 &self,
262 entity: EntityTag,
263 version: SchemaVersion,
264 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
265 let key = RawSchemaKey::from_entity_version(entity, version);
266 self.get_raw_snapshot(&key)
267 .map(|snapshot| snapshot.decode_persisted_snapshot())
268 .transpose()
269 }
270
271 pub(in crate::db) fn latest_persisted_snapshot(
273 &self,
274 entity: EntityTag,
275 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
276 let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
277 for entry in self.map.iter() {
278 let (key, snapshot) = entry.into_pair();
279 if key.entity_tag() != entity {
280 continue;
281 }
282
283 let version = SchemaVersion::new(key.version());
284 if latest
285 .as_ref()
286 .is_none_or(|(latest_version, _)| version > *latest_version)
287 {
288 latest = Some((version, snapshot));
289 }
290 }
291
292 latest
293 .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
294 .transpose()
295 }
296
297 #[must_use]
299 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
300 let mut snapshots = 0u64;
301 let mut encoded_bytes = 0u64;
302 let mut latest = None::<(SchemaVersion, u64)>;
303
304 for entry in self.map.iter() {
305 let (key, snapshot) = entry.into_pair();
306 if key.entity_tag() != entity {
307 continue;
308 }
309
310 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
311 snapshots = snapshots.saturating_add(1);
312 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
313
314 let version = SchemaVersion::new(key.version());
315 if latest
316 .as_ref()
317 .is_none_or(|(latest_version, _)| version > *latest_version)
318 {
319 latest = Some((version, snapshot_bytes));
320 }
321 }
322
323 SchemaStoreFootprint::new(
324 snapshots,
325 encoded_bytes,
326 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
327 )
328 }
329
330 fn insert_raw_snapshot(
332 &mut self,
333 key: RawSchemaKey,
334 snapshot: RawSchemaSnapshot,
335 ) -> Option<RawSchemaSnapshot> {
336 self.map.insert(key, snapshot)
337 }
338
339 #[must_use]
341 #[cfg(test)]
342 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
343 self.map.get(key)
344 }
345
346 #[must_use]
348 #[cfg(test)]
349 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
350 self.map.contains_key(key)
351 }
352
353 #[must_use]
355 #[cfg(test)]
356 pub(in crate::db) fn len(&self) -> u64 {
357 self.map.len()
358 }
359
360 #[must_use]
362 #[cfg(test)]
363 pub(in crate::db) fn is_empty(&self) -> bool {
364 self.map.is_empty()
365 }
366
367 #[cfg(test)]
369 pub(in crate::db) fn clear(&mut self) {
370 self.map.clear_new();
371 }
372}
373
374#[cfg(test)]
379mod tests {
380 use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore};
381 use crate::{
382 db::schema::{
383 FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedNestedLeafSnapshot,
384 PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
385 SchemaVersion, encode_persisted_schema_snapshot,
386 },
387 model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
388 testing::test_memory,
389 traits::Storable,
390 types::EntityTag,
391 };
392 use std::borrow::Cow;
393
394 #[test]
395 fn raw_schema_key_round_trips_entity_and_version() {
396 let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
397 SchemaVersion::initial()
398 });
399 let encoded = key.to_bytes().into_owned();
400 let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
401
402 assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
403 assert_eq!(decoded.version(), SchemaVersion::initial().get());
404 }
405
406 #[test]
407 fn raw_schema_snapshot_round_trips_payload_bytes() {
408 let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
409 let encoded = snapshot.to_bytes().into_owned();
410 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
411
412 assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
413 assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
414 }
415
416 #[test]
417 fn schema_store_persists_raw_snapshots_by_entity_version_key() {
418 let mut store = SchemaStore::init(test_memory(251));
419 let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
420
421 assert!(store.is_empty());
422 assert!(!store.contains_raw_snapshot(&key));
423
424 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
425
426 assert_eq!(store.len(), 1);
427 assert!(store.contains_raw_snapshot(&key));
428 assert_eq!(
429 store
430 .get_raw_snapshot(&key)
431 .expect("schema snapshot should be present")
432 .as_bytes(),
433 &[9, 4, 6],
434 );
435
436 store.clear();
437 assert!(store.is_empty());
438 }
439
440 #[test]
441 fn schema_store_loads_latest_snapshot_for_entity() {
442 let mut store = SchemaStore::init(test_memory(252));
443 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
444 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
445 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
446
447 store
448 .insert_persisted_snapshot(EntityTag::new(41), &initial)
449 .expect("initial schema snapshot should encode");
450 store
451 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
452 .expect("other entity schema snapshot should encode");
453 store
454 .insert_persisted_snapshot(EntityTag::new(41), &newer)
455 .expect("newer schema snapshot should encode");
456
457 let latest = store
458 .latest_persisted_snapshot(EntityTag::new(41))
459 .expect("latest schema snapshot should decode")
460 .expect("schema snapshot should exist");
461
462 assert_eq!(latest.version(), SchemaVersion::new(2));
463 assert_eq!(latest.entity_name(), "Newer");
464 }
465
466 #[test]
467 fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
468 let mut store = SchemaStore::init(test_memory(242));
469 store.insert_raw_snapshot(
470 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
471 RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
472 );
473 store.insert_raw_snapshot(
474 RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
475 RawSchemaSnapshot::from_bytes(vec![5, 8]),
476 );
477 store.insert_raw_snapshot(
478 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
479 RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
480 );
481
482 let footprint = store.entity_footprint(EntityTag::new(71));
483
484 assert_eq!(footprint.snapshots(), 2);
485 assert_eq!(footprint.encoded_bytes(), 7);
486 assert_eq!(footprint.latest_snapshot_bytes(), 4);
487 }
488
489 #[test]
490 fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
491 let mut store = SchemaStore::init(test_memory(253));
492 let invalid = persisted_schema_snapshot_with_layout_version_for_test(
493 SchemaVersion::new(2),
494 SchemaVersion::initial(),
495 "Invalid",
496 );
497
498 let err = store
499 .insert_persisted_snapshot(EntityTag::new(43), &invalid)
500 .expect_err("schema store should reject mismatched snapshot/layout versions");
501
502 assert!(
503 err.message()
504 .contains("schema snapshot row-layout version mismatch"),
505 "schema store should preserve the version mismatch diagnostic"
506 );
507 }
508
509 #[test]
510 fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
511 let mut store = SchemaStore::init(test_memory(254));
512 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
513 let invalid = PersistedSchemaSnapshot::new(
514 base.version(),
515 base.entity_path().to_string(),
516 base.entity_name().to_string(),
517 base.primary_key_field_id(),
518 SchemaRowLayout::new(
519 base.version(),
520 vec![
521 (FieldId::new(1), SchemaFieldSlot::new(0)),
522 (FieldId::new(2), SchemaFieldSlot::new(3)),
523 ],
524 ),
525 base.fields().to_vec(),
526 );
527
528 let err = store
529 .insert_persisted_snapshot(EntityTag::new(44), &invalid)
530 .expect_err("schema store should reject divergent field/layout slots");
531
532 assert!(
533 err.message()
534 .contains("schema snapshot field slot mismatch"),
535 "schema store should report the duplicated slot divergence"
536 );
537 }
538
539 #[test]
540 fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
541 let mut store = SchemaStore::init(test_memory(246));
542 let base =
543 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
544 let invalid = PersistedSchemaSnapshot::new(
545 base.version(),
546 base.entity_path().to_string(),
547 base.entity_name().to_string(),
548 base.primary_key_field_id(),
549 SchemaRowLayout::new(
550 base.version(),
551 vec![
552 (FieldId::new(1), SchemaFieldSlot::new(0)),
553 (FieldId::new(2), SchemaFieldSlot::new(0)),
554 ],
555 ),
556 base.fields().to_vec(),
557 );
558
559 let err = store
560 .insert_persisted_snapshot(EntityTag::new(49), &invalid)
561 .expect_err("schema store should reject duplicate row-layout slots");
562
563 assert!(
564 err.message()
565 .contains("schema snapshot duplicate row-layout slot"),
566 "schema store should report the row-layout slot ambiguity"
567 );
568 }
569
570 #[test]
571 fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
572 let mut store = SchemaStore::init(test_memory(248));
573 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
574 let invalid = PersistedSchemaSnapshot::new(
575 base.version(),
576 base.entity_path().to_string(),
577 base.entity_name().to_string(),
578 FieldId::new(99),
579 base.row_layout().clone(),
580 base.fields().to_vec(),
581 );
582
583 let err = store
584 .insert_persisted_snapshot(EntityTag::new(47), &invalid)
585 .expect_err("schema store should reject snapshots without the primary-key field");
586
587 assert!(
588 err.message()
589 .contains("schema snapshot primary key field missing from row layout"),
590 "schema store should report the missing primary-key field"
591 );
592 }
593
594 #[test]
595 fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
596 let mut store = SchemaStore::init(test_memory(249));
597 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
598 let corrupt_key =
599 RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
600
601 store
602 .insert_persisted_snapshot(EntityTag::new(45), &initial)
603 .expect("initial schema snapshot should encode");
604 store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
605
606 let err = store
607 .latest_persisted_snapshot(EntityTag::new(45))
608 .expect_err("latest corrupt schema snapshot must fail closed");
609
610 assert!(
611 err.message()
612 .contains("failed to decode persisted schema snapshot"),
613 "latest-version lookup should report the corrupt newest snapshot"
614 );
615 }
616
617 #[test]
618 fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
619 let mut store = SchemaStore::init(test_memory(250));
620 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
621 let invalid = PersistedSchemaSnapshot::new(
622 base.version(),
623 base.entity_path().to_string(),
624 base.entity_name().to_string(),
625 base.primary_key_field_id(),
626 SchemaRowLayout::new(
627 base.version(),
628 vec![
629 (FieldId::new(1), SchemaFieldSlot::new(0)),
630 (FieldId::new(2), SchemaFieldSlot::new(3)),
631 ],
632 ),
633 base.fields().to_vec(),
634 );
635 let raw = encode_persisted_schema_snapshot(&invalid)
636 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
637 let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
638
639 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
640
641 let err = store
642 .latest_persisted_snapshot(EntityTag::new(46))
643 .expect_err("raw decode should reject divergent field/layout slots");
644
645 assert!(
646 err.message()
647 .contains("persisted schema snapshot field slot mismatch"),
648 "schema codec should report the raw decoded slot divergence"
649 );
650 }
651
652 #[test]
653 fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
654 let mut store = SchemaStore::init(test_memory(247));
655 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
656 let invalid = PersistedSchemaSnapshot::new(
657 base.version(),
658 base.entity_path().to_string(),
659 base.entity_name().to_string(),
660 FieldId::new(99),
661 base.row_layout().clone(),
662 base.fields().to_vec(),
663 );
664 let raw = encode_persisted_schema_snapshot(&invalid)
665 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
666 let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
667
668 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
669
670 let err = store
671 .latest_persisted_snapshot(EntityTag::new(48))
672 .expect_err("raw decode should reject snapshots without the primary-key field");
673
674 assert!(
675 err.message()
676 .contains("persisted schema snapshot primary key field missing from row layout"),
677 "schema codec should report the raw decoded missing primary-key field"
678 );
679 }
680
681 #[test]
682 fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
683 let mut store = SchemaStore::init(test_memory(245));
684 let base =
685 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
686 let mut fields = base.fields().to_vec();
687 let duplicate = PersistedFieldSnapshot::new(
688 fields[1].id(),
689 fields[0].name().to_string(),
690 fields[1].slot(),
691 fields[1].kind().clone(),
692 fields[1].nested_leaves().to_vec(),
693 fields[1].nullable(),
694 fields[1].default().clone(),
695 fields[1].storage_decode(),
696 fields[1].leaf_codec(),
697 );
698 fields[1] = duplicate;
699 let invalid = PersistedSchemaSnapshot::new(
700 base.version(),
701 base.entity_path().to_string(),
702 base.entity_name().to_string(),
703 base.primary_key_field_id(),
704 base.row_layout().clone(),
705 fields,
706 );
707 let raw = encode_persisted_schema_snapshot(&invalid)
708 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
709 let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
710
711 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
712
713 let err = store
714 .latest_persisted_snapshot(EntityTag::new(50))
715 .expect_err("raw decode should reject duplicate field names");
716
717 assert!(
718 err.message()
719 .contains("persisted schema snapshot duplicate field name"),
720 "schema codec should report the raw decoded field-name ambiguity"
721 );
722 }
723
724 #[test]
725 fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
726 let mut store = SchemaStore::init(test_memory(244));
727 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
728 let mut fields = base.fields().to_vec();
729 let invalid_field = PersistedFieldSnapshot::new(
730 fields[1].id(),
731 fields[1].name().to_string(),
732 fields[1].slot(),
733 fields[1].kind().clone(),
734 vec![PersistedNestedLeafSnapshot::new(
735 Vec::new(),
736 PersistedFieldKind::Blob { max_len: None },
737 false,
738 FieldStorageDecode::ByKind,
739 LeafCodec::Scalar(ScalarCodec::Blob),
740 )],
741 fields[1].nullable(),
742 fields[1].default().clone(),
743 fields[1].storage_decode(),
744 fields[1].leaf_codec(),
745 );
746 fields[1] = invalid_field;
747 let invalid = PersistedSchemaSnapshot::new(
748 base.version(),
749 base.entity_path().to_string(),
750 base.entity_name().to_string(),
751 base.primary_key_field_id(),
752 base.row_layout().clone(),
753 fields,
754 );
755
756 let err = store
757 .insert_persisted_snapshot(EntityTag::new(51), &invalid)
758 .expect_err("schema store should reject empty nested leaf paths");
759
760 assert!(
761 err.message()
762 .contains("schema snapshot empty nested leaf path"),
763 "schema store should report the empty nested leaf path"
764 );
765 }
766
767 #[test]
768 fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
769 let mut store = SchemaStore::init(test_memory(243));
770 let base =
771 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
772 let mut fields = base.fields().to_vec();
773 let duplicate_leaves = vec![
774 PersistedNestedLeafSnapshot::new(
775 vec!["bytes".to_string()],
776 PersistedFieldKind::Blob { max_len: None },
777 false,
778 FieldStorageDecode::ByKind,
779 LeafCodec::Scalar(ScalarCodec::Blob),
780 ),
781 PersistedNestedLeafSnapshot::new(
782 vec!["bytes".to_string()],
783 PersistedFieldKind::Text { max_len: None },
784 false,
785 FieldStorageDecode::ByKind,
786 LeafCodec::Scalar(ScalarCodec::Text),
787 ),
788 ];
789 let invalid_field = PersistedFieldSnapshot::new(
790 fields[1].id(),
791 fields[1].name().to_string(),
792 fields[1].slot(),
793 fields[1].kind().clone(),
794 duplicate_leaves,
795 fields[1].nullable(),
796 fields[1].default().clone(),
797 fields[1].storage_decode(),
798 fields[1].leaf_codec(),
799 );
800 fields[1] = invalid_field;
801 let invalid = PersistedSchemaSnapshot::new(
802 base.version(),
803 base.entity_path().to_string(),
804 base.entity_name().to_string(),
805 base.primary_key_field_id(),
806 base.row_layout().clone(),
807 fields,
808 );
809 let raw = encode_persisted_schema_snapshot(&invalid)
810 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
811 let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
812
813 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
814
815 let err = store
816 .latest_persisted_snapshot(EntityTag::new(52))
817 .expect_err("raw decode should reject duplicate nested leaf paths");
818
819 assert!(
820 err.message()
821 .contains("persisted schema snapshot duplicate nested leaf path"),
822 "schema codec should report the raw decoded nested path ambiguity"
823 );
824 }
825
826 #[test]
827 fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
828 let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
829
830 let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
831 .expect("schema snapshot should encode");
832 let decoded = raw
833 .decode_persisted_snapshot()
834 .expect("schema snapshot should decode");
835
836 assert_eq!(decoded, snapshot);
837 }
838
839 fn persisted_schema_snapshot_for_test(
843 version: SchemaVersion,
844 entity_name: &str,
845 ) -> PersistedSchemaSnapshot {
846 persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
847 }
848
849 fn persisted_schema_snapshot_with_layout_version_for_test(
853 version: SchemaVersion,
854 layout_version: SchemaVersion,
855 entity_name: &str,
856 ) -> PersistedSchemaSnapshot {
857 PersistedSchemaSnapshot::new(
858 version,
859 format!("entities::{entity_name}"),
860 entity_name.to_string(),
861 FieldId::new(1),
862 SchemaRowLayout::new(
863 layout_version,
864 vec![
865 (FieldId::new(1), SchemaFieldSlot::new(0)),
866 (FieldId::new(2), SchemaFieldSlot::new(1)),
867 ],
868 ),
869 vec![
870 PersistedFieldSnapshot::new(
871 FieldId::new(1),
872 "id".to_string(),
873 SchemaFieldSlot::new(0),
874 PersistedFieldKind::Ulid,
875 Vec::new(),
876 false,
877 SchemaFieldDefault::None,
878 FieldStorageDecode::ByKind,
879 LeafCodec::Scalar(ScalarCodec::Ulid),
880 ),
881 PersistedFieldSnapshot::new(
882 FieldId::new(2),
883 "payload".to_string(),
884 SchemaFieldSlot::new(1),
885 PersistedFieldKind::Map {
886 key: Box::new(PersistedFieldKind::Text { max_len: None }),
887 value: Box::new(PersistedFieldKind::List(Box::new(
888 PersistedFieldKind::Nat64,
889 ))),
890 },
891 vec![PersistedNestedLeafSnapshot::new(
892 vec!["bytes".to_string()],
893 PersistedFieldKind::Blob { max_len: None },
894 false,
895 FieldStorageDecode::ByKind,
896 LeafCodec::Scalar(ScalarCodec::Blob),
897 )],
898 false,
899 SchemaFieldDefault::None,
900 FieldStorageDecode::ByKind,
901 LeafCodec::StructuralFallback,
902 ),
903 ],
904 )
905 }
906}