1use crate::{
7 db::schema::{
8 PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
9 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
10 },
11 error::InternalError,
12 traits::Storable,
13 types::EntityTag,
14};
15use canic_cdk::structures::{BTreeMap, DefaultMemoryImpl, memory::VirtualMemory, storable::Bound};
16use std::borrow::Cow;
17
18const SCHEMA_KEY_BYTES_USIZE: usize = 12;
19const SCHEMA_KEY_BYTES: u32 = 12;
20const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
21
22#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
31struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
32
33impl RawSchemaKey {
34 #[must_use]
36 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
37 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
38 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
39 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
40
41 Self(out)
42 }
43
44 #[must_use]
46 fn entity_tag(self) -> EntityTag {
47 let mut bytes = [0u8; size_of::<u64>()];
48 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
49
50 EntityTag::new(u64::from_be_bytes(bytes))
51 }
52
53 #[must_use]
55 fn version(self) -> u32 {
56 let mut bytes = [0u8; size_of::<u32>()];
57 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
58
59 u32::from_be_bytes(bytes)
60 }
61}
62
63impl Storable for RawSchemaKey {
64 fn to_bytes(&self) -> Cow<'_, [u8]> {
65 Cow::Borrowed(&self.0)
66 }
67
68 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
69 debug_assert_eq!(
70 bytes.len(),
71 SCHEMA_KEY_BYTES_USIZE,
72 "RawSchemaKey::from_bytes received unexpected byte length",
73 );
74
75 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
76 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
77 }
78
79 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
80 out.copy_from_slice(bytes.as_ref());
81 Self(out)
82 }
83
84 fn into_bytes(self) -> Vec<u8> {
85 self.0.to_vec()
86 }
87
88 const BOUND: Bound = Bound::Bounded {
89 max_size: SCHEMA_KEY_BYTES,
90 is_fixed_size: true,
91 };
92}
93
94#[derive(Clone, Debug, Eq, PartialEq)]
104struct RawSchemaSnapshot(Vec<u8>);
105
106impl RawSchemaSnapshot {
107 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
109 validate_typed_schema_snapshot_for_store(snapshot)?;
110
111 encode_persisted_schema_snapshot(snapshot).map(Self)
112 }
113
114 #[must_use]
116 #[cfg(test)]
117 const fn from_bytes(bytes: Vec<u8>) -> Self {
118 Self(bytes)
119 }
120
121 #[must_use]
123 const fn as_bytes(&self) -> &[u8] {
124 self.0.as_slice()
125 }
126
127 #[must_use]
129 #[cfg(test)]
130 fn into_bytes(self) -> Vec<u8> {
131 self.0
132 }
133
134 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
136 decode_persisted_schema_snapshot(self.as_bytes())
137 }
138}
139
140impl Storable for RawSchemaSnapshot {
141 fn to_bytes(&self) -> Cow<'_, [u8]> {
142 Cow::Borrowed(self.as_bytes())
143 }
144
145 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
146 Self(bytes.into_owned())
147 }
148
149 fn into_bytes(self) -> Vec<u8> {
150 self.0
151 }
152
153 const BOUND: Bound = Bound::Bounded {
154 max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
155 is_fixed_size: false,
156 };
157}
158
159fn validate_typed_schema_snapshot_for_store(
163 snapshot: &PersistedSchemaSnapshot,
164) -> Result<(), InternalError> {
165 if let Some(detail) = schema_snapshot_integrity_detail(
166 "schema snapshot",
167 snapshot.version(),
168 snapshot.primary_key_field_id(),
169 snapshot.row_layout(),
170 snapshot.fields(),
171 ) {
172 return Err(InternalError::store_invariant(detail));
173 }
174
175 Ok(())
176}
177
178#[derive(Clone, Copy, Debug, Eq, PartialEq)]
187pub(in crate::db) struct SchemaStoreFootprint {
188 snapshots: u64,
189 encoded_bytes: u64,
190 latest_snapshot_bytes: u64,
191}
192
193impl SchemaStoreFootprint {
194 #[must_use]
196 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
197 Self {
198 snapshots,
199 encoded_bytes,
200 latest_snapshot_bytes,
201 }
202 }
203
204 #[must_use]
206 pub(in crate::db) const fn snapshots(self) -> u64 {
207 self.snapshots
208 }
209
210 #[must_use]
212 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
213 self.encoded_bytes
214 }
215
216 #[must_use]
218 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
219 self.latest_snapshot_bytes
220 }
221}
222
223pub struct SchemaStore {
232 map: BTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
233}
234
235impl SchemaStore {
236 #[must_use]
238 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
239 Self {
240 map: BTreeMap::init(memory),
241 }
242 }
243
244 pub(in crate::db) fn insert_persisted_snapshot(
246 &mut self,
247 entity: EntityTag,
248 snapshot: &PersistedSchemaSnapshot,
249 ) -> Result<(), InternalError> {
250 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
251 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
252 let _ = self.insert_raw_snapshot(key, raw_snapshot);
253
254 Ok(())
255 }
256
257 #[cfg(test)]
259 pub(in crate::db) fn get_persisted_snapshot(
260 &self,
261 entity: EntityTag,
262 version: SchemaVersion,
263 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
264 let key = RawSchemaKey::from_entity_version(entity, version);
265 self.get_raw_snapshot(&key)
266 .map(|snapshot| snapshot.decode_persisted_snapshot())
267 .transpose()
268 }
269
270 pub(in crate::db) fn latest_persisted_snapshot(
272 &self,
273 entity: EntityTag,
274 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
275 let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
276 for entry in self.map.iter() {
277 let (key, snapshot) = entry.into_pair();
278 if key.entity_tag() != entity {
279 continue;
280 }
281
282 let version = SchemaVersion::new(key.version());
283 if latest
284 .as_ref()
285 .is_none_or(|(latest_version, _)| version > *latest_version)
286 {
287 latest = Some((version, snapshot));
288 }
289 }
290
291 latest
292 .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
293 .transpose()
294 }
295
296 #[must_use]
298 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
299 let mut snapshots = 0u64;
300 let mut encoded_bytes = 0u64;
301 let mut latest = None::<(SchemaVersion, u64)>;
302
303 for entry in self.map.iter() {
304 let (key, snapshot) = entry.into_pair();
305 if key.entity_tag() != entity {
306 continue;
307 }
308
309 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
310 snapshots = snapshots.saturating_add(1);
311 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
312
313 let version = SchemaVersion::new(key.version());
314 if latest
315 .as_ref()
316 .is_none_or(|(latest_version, _)| version > *latest_version)
317 {
318 latest = Some((version, snapshot_bytes));
319 }
320 }
321
322 SchemaStoreFootprint::new(
323 snapshots,
324 encoded_bytes,
325 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
326 )
327 }
328
329 fn insert_raw_snapshot(
331 &mut self,
332 key: RawSchemaKey,
333 snapshot: RawSchemaSnapshot,
334 ) -> Option<RawSchemaSnapshot> {
335 self.map.insert(key, snapshot)
336 }
337
338 #[must_use]
340 #[cfg(test)]
341 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
342 self.map.get(key)
343 }
344
345 #[must_use]
347 #[cfg(test)]
348 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
349 self.map.contains_key(key)
350 }
351
352 #[must_use]
354 #[cfg(test)]
355 pub(in crate::db) fn len(&self) -> u64 {
356 self.map.len()
357 }
358
359 #[must_use]
361 #[cfg(test)]
362 pub(in crate::db) fn is_empty(&self) -> bool {
363 self.map.is_empty()
364 }
365
366 #[cfg(test)]
368 pub(in crate::db) fn clear(&mut self) {
369 self.map.clear();
370 }
371}
372
373#[cfg(test)]
378mod tests {
379 use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore};
380 use crate::{
381 db::schema::{
382 FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedNestedLeafSnapshot,
383 PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
384 SchemaVersion, encode_persisted_schema_snapshot,
385 },
386 model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
387 testing::test_memory,
388 traits::Storable,
389 types::EntityTag,
390 };
391 use std::borrow::Cow;
392
393 #[test]
394 fn raw_schema_key_round_trips_entity_and_version() {
395 let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
396 SchemaVersion::initial()
397 });
398 let encoded = key.to_bytes().into_owned();
399 let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
400
401 assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
402 assert_eq!(decoded.version(), SchemaVersion::initial().get());
403 }
404
405 #[test]
406 fn raw_schema_snapshot_round_trips_payload_bytes() {
407 let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
408 let encoded = snapshot.to_bytes().into_owned();
409 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
410
411 assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
412 assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
413 }
414
415 #[test]
416 fn schema_store_persists_raw_snapshots_by_entity_version_key() {
417 let mut store = SchemaStore::init(test_memory(251));
418 let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
419
420 assert!(store.is_empty());
421 assert!(!store.contains_raw_snapshot(&key));
422
423 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
424
425 assert_eq!(store.len(), 1);
426 assert!(store.contains_raw_snapshot(&key));
427 assert_eq!(
428 store
429 .get_raw_snapshot(&key)
430 .expect("schema snapshot should be present")
431 .as_bytes(),
432 &[9, 4, 6],
433 );
434
435 store.clear();
436 assert!(store.is_empty());
437 }
438
439 #[test]
440 fn schema_store_loads_latest_snapshot_for_entity() {
441 let mut store = SchemaStore::init(test_memory(252));
442 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
443 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
444 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
445
446 store
447 .insert_persisted_snapshot(EntityTag::new(41), &initial)
448 .expect("initial schema snapshot should encode");
449 store
450 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
451 .expect("other entity schema snapshot should encode");
452 store
453 .insert_persisted_snapshot(EntityTag::new(41), &newer)
454 .expect("newer schema snapshot should encode");
455
456 let latest = store
457 .latest_persisted_snapshot(EntityTag::new(41))
458 .expect("latest schema snapshot should decode")
459 .expect("schema snapshot should exist");
460
461 assert_eq!(latest.version(), SchemaVersion::new(2));
462 assert_eq!(latest.entity_name(), "Newer");
463 }
464
465 #[test]
466 fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
467 let mut store = SchemaStore::init(test_memory(242));
468 store.insert_raw_snapshot(
469 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
470 RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
471 );
472 store.insert_raw_snapshot(
473 RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
474 RawSchemaSnapshot::from_bytes(vec![5, 8]),
475 );
476 store.insert_raw_snapshot(
477 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
478 RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
479 );
480
481 let footprint = store.entity_footprint(EntityTag::new(71));
482
483 assert_eq!(footprint.snapshots(), 2);
484 assert_eq!(footprint.encoded_bytes(), 7);
485 assert_eq!(footprint.latest_snapshot_bytes(), 4);
486 }
487
488 #[test]
489 fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
490 let mut store = SchemaStore::init(test_memory(253));
491 let invalid = persisted_schema_snapshot_with_layout_version_for_test(
492 SchemaVersion::new(2),
493 SchemaVersion::initial(),
494 "Invalid",
495 );
496
497 let err = store
498 .insert_persisted_snapshot(EntityTag::new(43), &invalid)
499 .expect_err("schema store should reject mismatched snapshot/layout versions");
500
501 assert!(
502 err.message()
503 .contains("schema snapshot row-layout version mismatch"),
504 "schema store should preserve the version mismatch diagnostic"
505 );
506 }
507
508 #[test]
509 fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
510 let mut store = SchemaStore::init(test_memory(254));
511 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
512 let invalid = PersistedSchemaSnapshot::new(
513 base.version(),
514 base.entity_path().to_string(),
515 base.entity_name().to_string(),
516 base.primary_key_field_id(),
517 SchemaRowLayout::new(
518 base.version(),
519 vec![
520 (FieldId::new(1), SchemaFieldSlot::new(0)),
521 (FieldId::new(2), SchemaFieldSlot::new(3)),
522 ],
523 ),
524 base.fields().to_vec(),
525 );
526
527 let err = store
528 .insert_persisted_snapshot(EntityTag::new(44), &invalid)
529 .expect_err("schema store should reject divergent field/layout slots");
530
531 assert!(
532 err.message()
533 .contains("schema snapshot field slot mismatch"),
534 "schema store should report the duplicated slot divergence"
535 );
536 }
537
538 #[test]
539 fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
540 let mut store = SchemaStore::init(test_memory(246));
541 let base =
542 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
543 let invalid = PersistedSchemaSnapshot::new(
544 base.version(),
545 base.entity_path().to_string(),
546 base.entity_name().to_string(),
547 base.primary_key_field_id(),
548 SchemaRowLayout::new(
549 base.version(),
550 vec![
551 (FieldId::new(1), SchemaFieldSlot::new(0)),
552 (FieldId::new(2), SchemaFieldSlot::new(0)),
553 ],
554 ),
555 base.fields().to_vec(),
556 );
557
558 let err = store
559 .insert_persisted_snapshot(EntityTag::new(49), &invalid)
560 .expect_err("schema store should reject duplicate row-layout slots");
561
562 assert!(
563 err.message()
564 .contains("schema snapshot duplicate row-layout slot"),
565 "schema store should report the row-layout slot ambiguity"
566 );
567 }
568
569 #[test]
570 fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
571 let mut store = SchemaStore::init(test_memory(248));
572 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
573 let invalid = PersistedSchemaSnapshot::new(
574 base.version(),
575 base.entity_path().to_string(),
576 base.entity_name().to_string(),
577 FieldId::new(99),
578 base.row_layout().clone(),
579 base.fields().to_vec(),
580 );
581
582 let err = store
583 .insert_persisted_snapshot(EntityTag::new(47), &invalid)
584 .expect_err("schema store should reject snapshots without the primary-key field");
585
586 assert!(
587 err.message()
588 .contains("schema snapshot primary key field missing from row layout"),
589 "schema store should report the missing primary-key field"
590 );
591 }
592
593 #[test]
594 fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
595 let mut store = SchemaStore::init(test_memory(249));
596 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
597 let corrupt_key =
598 RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
599
600 store
601 .insert_persisted_snapshot(EntityTag::new(45), &initial)
602 .expect("initial schema snapshot should encode");
603 store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
604
605 let err = store
606 .latest_persisted_snapshot(EntityTag::new(45))
607 .expect_err("latest corrupt schema snapshot must fail closed");
608
609 assert!(
610 err.message()
611 .contains("failed to decode persisted schema snapshot"),
612 "latest-version lookup should report the corrupt newest snapshot"
613 );
614 }
615
616 #[test]
617 fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
618 let mut store = SchemaStore::init(test_memory(250));
619 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
620 let invalid = PersistedSchemaSnapshot::new(
621 base.version(),
622 base.entity_path().to_string(),
623 base.entity_name().to_string(),
624 base.primary_key_field_id(),
625 SchemaRowLayout::new(
626 base.version(),
627 vec![
628 (FieldId::new(1), SchemaFieldSlot::new(0)),
629 (FieldId::new(2), SchemaFieldSlot::new(3)),
630 ],
631 ),
632 base.fields().to_vec(),
633 );
634 let raw = encode_persisted_schema_snapshot(&invalid)
635 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
636 let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
637
638 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
639
640 let err = store
641 .latest_persisted_snapshot(EntityTag::new(46))
642 .expect_err("raw decode should reject divergent field/layout slots");
643
644 assert!(
645 err.message()
646 .contains("persisted schema snapshot field slot mismatch"),
647 "schema codec should report the raw decoded slot divergence"
648 );
649 }
650
651 #[test]
652 fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
653 let mut store = SchemaStore::init(test_memory(247));
654 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
655 let invalid = PersistedSchemaSnapshot::new(
656 base.version(),
657 base.entity_path().to_string(),
658 base.entity_name().to_string(),
659 FieldId::new(99),
660 base.row_layout().clone(),
661 base.fields().to_vec(),
662 );
663 let raw = encode_persisted_schema_snapshot(&invalid)
664 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
665 let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
666
667 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
668
669 let err = store
670 .latest_persisted_snapshot(EntityTag::new(48))
671 .expect_err("raw decode should reject snapshots without the primary-key field");
672
673 assert!(
674 err.message()
675 .contains("persisted schema snapshot primary key field missing from row layout"),
676 "schema codec should report the raw decoded missing primary-key field"
677 );
678 }
679
680 #[test]
681 fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
682 let mut store = SchemaStore::init(test_memory(245));
683 let base =
684 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
685 let mut fields = base.fields().to_vec();
686 let duplicate = PersistedFieldSnapshot::new(
687 fields[1].id(),
688 fields[0].name().to_string(),
689 fields[1].slot(),
690 fields[1].kind().clone(),
691 fields[1].nested_leaves().to_vec(),
692 fields[1].nullable(),
693 fields[1].default(),
694 fields[1].storage_decode(),
695 fields[1].leaf_codec(),
696 );
697 fields[1] = duplicate;
698 let invalid = PersistedSchemaSnapshot::new(
699 base.version(),
700 base.entity_path().to_string(),
701 base.entity_name().to_string(),
702 base.primary_key_field_id(),
703 base.row_layout().clone(),
704 fields,
705 );
706 let raw = encode_persisted_schema_snapshot(&invalid)
707 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
708 let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
709
710 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
711
712 let err = store
713 .latest_persisted_snapshot(EntityTag::new(50))
714 .expect_err("raw decode should reject duplicate field names");
715
716 assert!(
717 err.message()
718 .contains("persisted schema snapshot duplicate field name"),
719 "schema codec should report the raw decoded field-name ambiguity"
720 );
721 }
722
723 #[test]
724 fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
725 let mut store = SchemaStore::init(test_memory(244));
726 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
727 let mut fields = base.fields().to_vec();
728 let invalid_field = PersistedFieldSnapshot::new(
729 fields[1].id(),
730 fields[1].name().to_string(),
731 fields[1].slot(),
732 fields[1].kind().clone(),
733 vec![PersistedNestedLeafSnapshot::new(
734 Vec::new(),
735 PersistedFieldKind::Blob { max_len: None },
736 false,
737 FieldStorageDecode::ByKind,
738 LeafCodec::Scalar(ScalarCodec::Blob),
739 )],
740 fields[1].nullable(),
741 fields[1].default(),
742 fields[1].storage_decode(),
743 fields[1].leaf_codec(),
744 );
745 fields[1] = invalid_field;
746 let invalid = PersistedSchemaSnapshot::new(
747 base.version(),
748 base.entity_path().to_string(),
749 base.entity_name().to_string(),
750 base.primary_key_field_id(),
751 base.row_layout().clone(),
752 fields,
753 );
754
755 let err = store
756 .insert_persisted_snapshot(EntityTag::new(51), &invalid)
757 .expect_err("schema store should reject empty nested leaf paths");
758
759 assert!(
760 err.message()
761 .contains("schema snapshot empty nested leaf path"),
762 "schema store should report the empty nested leaf path"
763 );
764 }
765
766 #[test]
767 fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
768 let mut store = SchemaStore::init(test_memory(243));
769 let base =
770 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
771 let mut fields = base.fields().to_vec();
772 let duplicate_leaves = vec![
773 PersistedNestedLeafSnapshot::new(
774 vec!["bytes".to_string()],
775 PersistedFieldKind::Blob { max_len: None },
776 false,
777 FieldStorageDecode::ByKind,
778 LeafCodec::Scalar(ScalarCodec::Blob),
779 ),
780 PersistedNestedLeafSnapshot::new(
781 vec!["bytes".to_string()],
782 PersistedFieldKind::Text { max_len: None },
783 false,
784 FieldStorageDecode::ByKind,
785 LeafCodec::Scalar(ScalarCodec::Text),
786 ),
787 ];
788 let invalid_field = PersistedFieldSnapshot::new(
789 fields[1].id(),
790 fields[1].name().to_string(),
791 fields[1].slot(),
792 fields[1].kind().clone(),
793 duplicate_leaves,
794 fields[1].nullable(),
795 fields[1].default(),
796 fields[1].storage_decode(),
797 fields[1].leaf_codec(),
798 );
799 fields[1] = invalid_field;
800 let invalid = PersistedSchemaSnapshot::new(
801 base.version(),
802 base.entity_path().to_string(),
803 base.entity_name().to_string(),
804 base.primary_key_field_id(),
805 base.row_layout().clone(),
806 fields,
807 );
808 let raw = encode_persisted_schema_snapshot(&invalid)
809 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
810 let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
811
812 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
813
814 let err = store
815 .latest_persisted_snapshot(EntityTag::new(52))
816 .expect_err("raw decode should reject duplicate nested leaf paths");
817
818 assert!(
819 err.message()
820 .contains("persisted schema snapshot duplicate nested leaf path"),
821 "schema codec should report the raw decoded nested path ambiguity"
822 );
823 }
824
825 #[test]
826 fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
827 let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
828
829 let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
830 .expect("schema snapshot should encode");
831 let decoded = raw
832 .decode_persisted_snapshot()
833 .expect("schema snapshot should decode");
834
835 assert_eq!(decoded, snapshot);
836 }
837
838 fn persisted_schema_snapshot_for_test(
842 version: SchemaVersion,
843 entity_name: &str,
844 ) -> PersistedSchemaSnapshot {
845 persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
846 }
847
848 fn persisted_schema_snapshot_with_layout_version_for_test(
852 version: SchemaVersion,
853 layout_version: SchemaVersion,
854 entity_name: &str,
855 ) -> PersistedSchemaSnapshot {
856 PersistedSchemaSnapshot::new(
857 version,
858 format!("entities::{entity_name}"),
859 entity_name.to_string(),
860 FieldId::new(1),
861 SchemaRowLayout::new(
862 layout_version,
863 vec![
864 (FieldId::new(1), SchemaFieldSlot::new(0)),
865 (FieldId::new(2), SchemaFieldSlot::new(1)),
866 ],
867 ),
868 vec![
869 PersistedFieldSnapshot::new(
870 FieldId::new(1),
871 "id".to_string(),
872 SchemaFieldSlot::new(0),
873 PersistedFieldKind::Ulid,
874 Vec::new(),
875 false,
876 SchemaFieldDefault::None,
877 FieldStorageDecode::ByKind,
878 LeafCodec::Scalar(ScalarCodec::Ulid),
879 ),
880 PersistedFieldSnapshot::new(
881 FieldId::new(2),
882 "payload".to_string(),
883 SchemaFieldSlot::new(1),
884 PersistedFieldKind::Map {
885 key: Box::new(PersistedFieldKind::Text { max_len: None }),
886 value: Box::new(PersistedFieldKind::List(Box::new(
887 PersistedFieldKind::Uint,
888 ))),
889 },
890 vec![PersistedNestedLeafSnapshot::new(
891 vec!["bytes".to_string()],
892 PersistedFieldKind::Blob { max_len: None },
893 false,
894 FieldStorageDecode::ByKind,
895 LeafCodec::Scalar(ScalarCodec::Blob),
896 )],
897 false,
898 SchemaFieldDefault::None,
899 FieldStorageDecode::ByKind,
900 LeafCodec::StructuralFallback,
901 ),
902 ],
903 )
904 }
905}