1use crate::{
7 db::{
8 codec::{
9 finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10 write_hash_tag_u8, write_hash_u32, write_hash_u64,
11 },
12 commit::CommitSchemaFingerprint,
13 schema::{
14 PersistedFieldKind, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
15 PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
16 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
17 },
18 },
19 error::InternalError,
20 traits::Storable,
21 types::EntityTag,
22};
23use ic_memory::stable_structures::storable::Bound;
24use ic_memory::stable_structures::{
25 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
26};
27use sha2::Digest;
28use std::borrow::Cow;
29use std::collections::BTreeMap as StdBTreeMap;
30
31const SCHEMA_KEY_BYTES_USIZE: usize = 12;
32const SCHEMA_KEY_BYTES: u32 = 12;
33const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
34const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
35const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
36const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
37
38#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
47struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
48
49impl RawSchemaKey {
50 #[must_use]
52 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
53 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
54 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
55 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
56
57 Self(out)
58 }
59
60 #[must_use]
62 fn entity_tag(self) -> EntityTag {
63 let mut bytes = [0u8; size_of::<u64>()];
64 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
65
66 EntityTag::new(u64::from_be_bytes(bytes))
67 }
68
69 #[must_use]
71 fn version(self) -> u32 {
72 let mut bytes = [0u8; size_of::<u32>()];
73 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
74
75 u32::from_be_bytes(bytes)
76 }
77}
78
79impl Storable for RawSchemaKey {
80 fn to_bytes(&self) -> Cow<'_, [u8]> {
81 Cow::Borrowed(&self.0)
82 }
83
84 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
85 debug_assert_eq!(
86 bytes.len(),
87 SCHEMA_KEY_BYTES_USIZE,
88 "RawSchemaKey::from_bytes received unexpected byte length",
89 );
90
91 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
92 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
93 }
94
95 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
96 out.copy_from_slice(bytes.as_ref());
97 Self(out)
98 }
99
100 fn into_bytes(self) -> Vec<u8> {
101 self.0.to_vec()
102 }
103
104 const BOUND: Bound = Bound::Bounded {
105 max_size: SCHEMA_KEY_BYTES,
106 is_fixed_size: true,
107 };
108}
109
110#[derive(Clone, Debug, Eq, PartialEq)]
120struct RawSchemaSnapshot(Vec<u8>);
121
122impl RawSchemaSnapshot {
123 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
125 validate_typed_schema_snapshot_for_store(snapshot)?;
126
127 encode_persisted_schema_snapshot(snapshot).map(Self)
128 }
129
130 #[must_use]
132 #[cfg(test)]
133 const fn from_bytes(bytes: Vec<u8>) -> Self {
134 Self(bytes)
135 }
136
137 #[must_use]
139 const fn as_bytes(&self) -> &[u8] {
140 self.0.as_slice()
141 }
142
143 #[must_use]
145 #[cfg(test)]
146 fn into_bytes(self) -> Vec<u8> {
147 self.0
148 }
149
150 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
152 decode_persisted_schema_snapshot(self.as_bytes())
153 }
154}
155
156impl Storable for RawSchemaSnapshot {
157 fn to_bytes(&self) -> Cow<'_, [u8]> {
158 Cow::Borrowed(self.as_bytes())
159 }
160
161 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
162 Self(bytes.into_owned())
163 }
164
165 fn into_bytes(self) -> Vec<u8> {
166 self.0
167 }
168
169 const BOUND: Bound = Bound::Bounded {
170 max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
171 is_fixed_size: false,
172 };
173}
174
175fn validate_typed_schema_snapshot_for_store(
179 snapshot: &PersistedSchemaSnapshot,
180) -> Result<(), InternalError> {
181 if let Some(detail) = schema_snapshot_integrity_detail(
182 "schema snapshot",
183 snapshot.version(),
184 snapshot.primary_key_field_ids(),
185 snapshot.row_layout(),
186 snapshot.fields(),
187 ) {
188 return Err(InternalError::store_invariant(detail));
189 }
190
191 Ok(())
192}
193
194#[derive(Clone, Copy, Debug, Eq, PartialEq)]
203pub(in crate::db) struct SchemaStoreFootprint {
204 snapshots: u64,
205 encoded_bytes: u64,
206 latest_snapshot_bytes: u64,
207}
208
209#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub(in crate::db) struct SchemaStoreCatalogMetadata {
218 schema_version: SchemaVersion,
219 schema_fingerprint: CommitSchemaFingerprint,
220 entity_count: u64,
221}
222
223impl SchemaStoreCatalogMetadata {
224 #[must_use]
226 const fn new(
227 schema_version: SchemaVersion,
228 schema_fingerprint: CommitSchemaFingerprint,
229 entity_count: u64,
230 ) -> Self {
231 Self {
232 schema_version,
233 schema_fingerprint,
234 entity_count,
235 }
236 }
237
238 #[must_use]
240 pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
241 self.schema_version
242 }
243
244 #[must_use]
247 pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
248 self.schema_fingerprint
249 }
250
251 #[must_use]
253 pub(in crate::db) const fn entity_count(self) -> u64 {
254 self.entity_count
255 }
256}
257
258#[derive(Clone, Copy, Debug, Eq, PartialEq)]
267pub(in crate::db) struct SchemaStoreAllocationMetadata {
268 data: SchemaStoreCatalogMetadata,
269 index: SchemaStoreCatalogMetadata,
270 schema: SchemaStoreCatalogMetadata,
271}
272
273impl SchemaStoreAllocationMetadata {
274 #[must_use]
277 const fn new(
278 data: SchemaStoreCatalogMetadata,
279 index: SchemaStoreCatalogMetadata,
280 schema: SchemaStoreCatalogMetadata,
281 ) -> Self {
282 Self {
283 data,
284 index,
285 schema,
286 }
287 }
288
289 #[must_use]
291 pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
292 self.data
293 }
294
295 #[must_use]
297 pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
298 self.index
299 }
300
301 #[must_use]
304 pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
305 self.schema
306 }
307}
308
309impl SchemaStoreFootprint {
310 #[must_use]
312 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
313 Self {
314 snapshots,
315 encoded_bytes,
316 latest_snapshot_bytes,
317 }
318 }
319
320 #[must_use]
322 pub(in crate::db) const fn snapshots(self) -> u64 {
323 self.snapshots
324 }
325
326 #[must_use]
328 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
329 self.encoded_bytes
330 }
331
332 #[must_use]
334 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
335 self.latest_snapshot_bytes
336 }
337}
338
339pub struct SchemaStore {
348 backend: SchemaStoreBackend,
349}
350
351enum SchemaStoreBackend {
352 Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
353 Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
354}
355
356#[derive(Clone, Copy, Debug, Eq, PartialEq)]
358enum SchemaStoreVisit {
359 Continue,
360 #[allow(
361 dead_code,
362 reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
363 )]
364 Stop,
365}
366
367impl SchemaStoreVisit {
368 const fn should_stop(self) -> bool {
369 matches!(self, Self::Stop)
370 }
371}
372
373impl SchemaStore {
374 #[must_use]
376 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
377 Self {
378 backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
379 }
380 }
381
382 #[must_use]
384 pub const fn init_heap() -> Self {
385 Self {
386 backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
387 }
388 }
389
390 #[must_use]
392 pub(in crate::db) const fn is_heap_storage(&self) -> bool {
393 matches!(self.backend, SchemaStoreBackend::Heap(_))
394 }
395
396 pub(in crate::db) fn insert_persisted_snapshot(
398 &mut self,
399 entity: EntityTag,
400 snapshot: &PersistedSchemaSnapshot,
401 ) -> Result<(), InternalError> {
402 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
403 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
404 let _ = self.insert_raw_snapshot(key, raw_snapshot);
405
406 Ok(())
407 }
408
409 #[cfg(test)]
411 pub(in crate::db) fn get_persisted_snapshot(
412 &self,
413 entity: EntityTag,
414 version: SchemaVersion,
415 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
416 let key = RawSchemaKey::from_entity_version(entity, version);
417 self.get_raw_snapshot(&key)
418 .map(|snapshot| snapshot.decode_persisted_snapshot())
419 .transpose()
420 }
421
422 pub(in crate::db) fn latest_persisted_snapshot(
424 &self,
425 entity: EntityTag,
426 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
427 let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
428 let _: Result<(), InternalError> = self.visit_raw_snapshots(|key, snapshot| {
429 if key.entity_tag() != entity {
430 return Ok(SchemaStoreVisit::Continue);
431 }
432
433 let version = SchemaVersion::new(key.version());
434 if latest
435 .as_ref()
436 .is_none_or(|(latest_version, _)| version > *latest_version)
437 {
438 latest = Some((version, snapshot.clone()));
439 }
440 Ok(SchemaStoreVisit::Continue)
441 });
442
443 latest
444 .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
445 .transpose()
446 }
447
448 #[must_use]
450 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
451 let mut snapshots = 0u64;
452 let mut encoded_bytes = 0u64;
453 let mut latest = None::<(SchemaVersion, u64)>;
454
455 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
456 if key.entity_tag() != entity {
457 return Ok(SchemaStoreVisit::Continue);
458 }
459
460 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
461 snapshots = snapshots.saturating_add(1);
462 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
463
464 let version = SchemaVersion::new(key.version());
465 if latest
466 .as_ref()
467 .is_none_or(|(latest_version, _)| version > *latest_version)
468 {
469 latest = Some((version, snapshot_bytes));
470 }
471 Ok(SchemaStoreVisit::Continue)
472 });
473
474 SchemaStoreFootprint::new(
475 snapshots,
476 encoded_bytes,
477 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
478 )
479 }
480
481 #[cfg(test)]
487 pub(in crate::db) fn catalog_metadata(
488 &self,
489 ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
490 Ok(self
491 .allocation_metadata()?
492 .map(SchemaStoreAllocationMetadata::schema))
493 }
494
495 pub(in crate::db) fn allocation_metadata(
502 &self,
503 ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
504 let latest_by_entity = self.latest_raw_snapshots_by_entity();
505 if latest_by_entity.is_empty() {
506 return Ok(None);
507 }
508
509 Ok(Some(SchemaStoreAllocationMetadata::new(
510 derive_data_allocation_metadata(&latest_by_entity)?,
511 derive_index_allocation_metadata(&latest_by_entity)?,
512 derive_schema_catalog_metadata(&latest_by_entity)?,
513 )))
514 }
515
516 fn insert_raw_snapshot(
518 &mut self,
519 key: RawSchemaKey,
520 snapshot: RawSchemaSnapshot,
521 ) -> Option<RawSchemaSnapshot> {
522 match &mut self.backend {
523 SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
524 SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
525 }
526 }
527
528 #[must_use]
530 #[cfg(test)]
531 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
532 match &self.backend {
533 SchemaStoreBackend::Stable(map) => map.get(key),
534 SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
535 }
536 }
537
538 #[must_use]
540 #[cfg(test)]
541 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
542 match &self.backend {
543 SchemaStoreBackend::Stable(map) => map.contains_key(key),
544 SchemaStoreBackend::Heap(map) => map.contains_key(key),
545 }
546 }
547
548 #[must_use]
550 #[cfg(test)]
551 pub(in crate::db) fn len(&self) -> u64 {
552 match &self.backend {
553 SchemaStoreBackend::Stable(map) => map.len(),
554 SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
555 }
556 }
557
558 #[must_use]
560 #[cfg(test)]
561 pub(in crate::db) fn is_empty(&self) -> bool {
562 match &self.backend {
563 SchemaStoreBackend::Stable(map) => map.is_empty(),
564 SchemaStoreBackend::Heap(map) => map.is_empty(),
565 }
566 }
567
568 #[cfg(test)]
570 pub(in crate::db) fn clear(&mut self) {
571 match &mut self.backend {
572 SchemaStoreBackend::Stable(map) => map.clear_new(),
573 SchemaStoreBackend::Heap(map) => map.clear(),
574 }
575 }
576
577 fn latest_raw_snapshots_by_entity(
578 &self,
579 ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
580 let mut latest_by_entity =
581 StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
582
583 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
584 let version = SchemaVersion::new(key.version());
585 match latest_by_entity.get_mut(&key.entity_tag()) {
586 Some((latest_version, latest_snapshot)) if version > *latest_version => {
587 *latest_version = version;
588 *latest_snapshot = snapshot.clone();
589 }
590 None => {
591 latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
592 }
593 Some(_) => {}
594 }
595 Ok(SchemaStoreVisit::Continue)
596 });
597
598 latest_by_entity
599 }
600
601 fn visit_raw_snapshots<E>(
604 &self,
605 mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
606 ) -> Result<(), E> {
607 match &self.backend {
608 SchemaStoreBackend::Stable(map) => {
609 for entry in map.iter() {
610 if visitor(entry.key(), &entry.value())?.should_stop() {
611 break;
612 }
613 }
614 }
615 SchemaStoreBackend::Heap(map) => {
616 for (key, snapshot) in map {
617 if visitor(key, snapshot)?.should_stop() {
618 break;
619 }
620 }
621 }
622 }
623
624 Ok(())
625 }
626}
627
628fn derive_data_allocation_metadata(
629 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
630) -> Result<SchemaStoreCatalogMetadata, InternalError> {
631 let mut max_version = SchemaVersion::initial();
632 let mut hasher = new_hash_sha256();
633 write_hash_tag_u8(
634 &mut hasher,
635 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
636 );
637
638 for (entity, (_, snapshot)) in latest_by_entity {
639 let persisted = snapshot.decode_persisted_snapshot()?;
640 if persisted.version() > max_version {
641 max_version = persisted.version();
642 }
643
644 let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
645 persisted.version(),
646 persisted.entity_path().to_string(),
647 persisted.entity_name().to_string(),
648 persisted.primary_key_field_ids().to_vec(),
649 persisted.row_layout().clone(),
650 persisted.fields().to_vec(),
651 Vec::new(),
652 );
653 let encoded = encode_persisted_schema_snapshot(&data_projection)?;
654
655 write_hash_u64(&mut hasher, entity.value());
656 write_hash_u32(&mut hasher, persisted.version().get());
657 write_hash_len_u32(&mut hasher, encoded.len());
658 hasher.update(encoded);
659 }
660
661 Ok(finalize_schema_metadata(
662 max_version,
663 hasher,
664 latest_by_entity.len(),
665 ))
666}
667
668fn derive_index_allocation_metadata(
669 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
670) -> Result<SchemaStoreCatalogMetadata, InternalError> {
671 let mut max_version = SchemaVersion::initial();
672 let mut hasher = new_hash_sha256();
673 write_hash_tag_u8(
674 &mut hasher,
675 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
676 );
677
678 for (entity, (_, snapshot)) in latest_by_entity {
679 let persisted = snapshot.decode_persisted_snapshot()?;
680 if persisted.version() > max_version {
681 max_version = persisted.version();
682 }
683
684 write_hash_u64(&mut hasher, entity.value());
685 write_hash_u32(&mut hasher, persisted.version().get());
686 write_hash_len_u32(&mut hasher, persisted.indexes().len());
687 for index in persisted.indexes() {
688 write_hash_u32(&mut hasher, u32::from(index.ordinal()));
689 write_hash_str_u32(&mut hasher, index.name());
690 write_hash_str_u32(&mut hasher, index.store());
691 write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
692 write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
693 match index.predicate_sql() {
694 Some(predicate_sql) => {
695 write_hash_tag_u8(&mut hasher, 1);
696 write_hash_str_u32(&mut hasher, predicate_sql);
697 }
698 None => write_hash_tag_u8(&mut hasher, 0),
699 }
700 hash_persisted_index_key(&mut hasher, index.key());
701 }
702 }
703
704 Ok(finalize_schema_metadata(
705 max_version,
706 hasher,
707 latest_by_entity.len(),
708 ))
709}
710
711fn derive_schema_catalog_metadata(
712 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
713) -> Result<SchemaStoreCatalogMetadata, InternalError> {
714 let mut max_version = SchemaVersion::initial();
715 let mut hasher = new_hash_sha256();
716 write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
717
718 for (entity, (version, snapshot)) in latest_by_entity {
719 let persisted = snapshot.decode_persisted_snapshot()?;
720 if persisted.version() > max_version {
721 max_version = persisted.version();
722 }
723
724 write_hash_u64(&mut hasher, entity.value());
725 write_hash_u32(&mut hasher, version.get());
726 write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
727 hasher.update(snapshot.as_bytes());
728 }
729
730 Ok(finalize_schema_metadata(
731 max_version,
732 hasher,
733 latest_by_entity.len(),
734 ))
735}
736
737fn finalize_schema_metadata(
738 schema_version: SchemaVersion,
739 hasher: sha2::Sha256,
740 entity_count: usize,
741) -> SchemaStoreCatalogMetadata {
742 let digest = finalize_hash_sha256(hasher);
743 let mut schema_fingerprint = [0u8; 16];
744 schema_fingerprint.copy_from_slice(&digest[..16]);
745
746 SchemaStoreCatalogMetadata::new(
747 schema_version,
748 schema_fingerprint,
749 u64::try_from(entity_count).unwrap_or(u64::MAX),
750 )
751}
752
753fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
754 match key {
755 PersistedIndexKeySnapshot::FieldPath(paths) => {
756 write_hash_tag_u8(hasher, 1);
757 write_hash_len_u32(hasher, paths.len());
758 for path in paths {
759 hash_persisted_index_field_path(hasher, path);
760 }
761 }
762 PersistedIndexKeySnapshot::Items(items) => {
763 write_hash_tag_u8(hasher, 2);
764 write_hash_len_u32(hasher, items.len());
765 for item in items {
766 match item {
767 PersistedIndexKeyItemSnapshot::FieldPath(path) => {
768 write_hash_tag_u8(hasher, 1);
769 hash_persisted_index_field_path(hasher, path);
770 }
771 PersistedIndexKeyItemSnapshot::Expression(expression) => {
772 write_hash_tag_u8(hasher, 2);
773 write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
774 hash_persisted_index_field_path(hasher, expression.source());
775 hash_persisted_field_kind(hasher, expression.input_kind());
776 hash_persisted_field_kind(hasher, expression.output_kind());
777 write_hash_str_u32(hasher, expression.canonical_text());
778 }
779 }
780 }
781 }
782 }
783}
784
785fn hash_persisted_index_field_path(
786 hasher: &mut sha2::Sha256,
787 path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
788) {
789 write_hash_u32(hasher, path.field_id().get());
790 write_hash_u32(hasher, u32::from(path.slot().get()));
791 write_hash_len_u32(hasher, path.path().len());
792 for segment in path.path() {
793 write_hash_str_u32(hasher, segment);
794 }
795 hash_persisted_field_kind(hasher, path.kind());
796 write_hash_tag_u8(hasher, u8::from(path.nullable()));
797}
798
799fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
800 match kind {
801 PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
802 PersistedFieldKind::Blob { max_len } => {
803 write_hash_tag_u8(hasher, 2);
804 hash_optional_u32(hasher, *max_len);
805 }
806 PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
807 PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
808 PersistedFieldKind::Decimal { scale } => {
809 write_hash_tag_u8(hasher, 5);
810 write_hash_u32(hasher, *scale);
811 }
812 PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
813 PersistedFieldKind::Enum { path, variants } => {
814 write_hash_tag_u8(hasher, 7);
815 write_hash_str_u32(hasher, path);
816 write_hash_len_u32(hasher, variants.len());
817 for variant in variants {
818 write_hash_str_u32(hasher, variant.ident());
819 match variant.payload_kind() {
820 Some(payload_kind) => {
821 write_hash_tag_u8(hasher, 1);
822 hash_persisted_field_kind(hasher, payload_kind);
823 }
824 None => write_hash_tag_u8(hasher, 0),
825 }
826 write_hash_str_u32(
827 hasher,
828 field_storage_decode_name(variant.payload_storage_decode()),
829 );
830 }
831 }
832 PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
833 PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
834 PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
835 PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
836 PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
837 PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
838 PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
839 PersistedFieldKind::IntBig { max_bytes } => {
840 write_hash_tag_u8(hasher, 15);
841 write_hash_u32(hasher, *max_bytes);
842 }
843 PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
844 PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
845 PersistedFieldKind::Text { max_len } => {
846 write_hash_tag_u8(hasher, 18);
847 hash_optional_u32(hasher, *max_len);
848 }
849 PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
850 PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
851 PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
852 PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
853 PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
854 PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
855 PersistedFieldKind::NatBig { max_bytes } => {
856 write_hash_tag_u8(hasher, 25);
857 write_hash_u32(hasher, *max_bytes);
858 }
859 PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
860 PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
861 PersistedFieldKind::Relation {
862 target_path,
863 target_entity_name,
864 target_entity_tag,
865 target_store_path,
866 key_kind,
867 strength,
868 } => {
869 write_hash_tag_u8(hasher, 28);
870 write_hash_str_u32(hasher, target_path);
871 write_hash_str_u32(hasher, target_entity_name);
872 write_hash_u64(hasher, target_entity_tag.value());
873 write_hash_str_u32(hasher, target_store_path);
874 hash_persisted_field_kind(hasher, key_kind);
875 write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
876 }
877 PersistedFieldKind::List(inner) => {
878 write_hash_tag_u8(hasher, 29);
879 hash_persisted_field_kind(hasher, inner);
880 }
881 PersistedFieldKind::Set(inner) => {
882 write_hash_tag_u8(hasher, 30);
883 hash_persisted_field_kind(hasher, inner);
884 }
885 PersistedFieldKind::Map { key, value } => {
886 write_hash_tag_u8(hasher, 31);
887 hash_persisted_field_kind(hasher, key);
888 hash_persisted_field_kind(hasher, value);
889 }
890 PersistedFieldKind::Structured { queryable } => {
891 write_hash_tag_u8(hasher, 32);
892 write_hash_tag_u8(hasher, u8::from(*queryable));
893 }
894 }
895}
896
897fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
898 match value {
899 Some(value) => {
900 write_hash_tag_u8(hasher, 1);
901 write_hash_u32(hasher, value);
902 }
903 None => write_hash_tag_u8(hasher, 0),
904 }
905}
906
907const fn persisted_index_origin_name(
908 origin: crate::db::schema::PersistedIndexOrigin,
909) -> &'static str {
910 match origin {
911 crate::db::schema::PersistedIndexOrigin::Generated => "generated",
912 crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
913 }
914}
915
916const fn persisted_expression_op_name(
917 op: crate::db::schema::PersistedIndexExpressionOp,
918) -> &'static str {
919 match op {
920 crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
921 crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
922 crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
923 crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
924 crate::db::schema::PersistedIndexExpressionOp::Date => "date",
925 crate::db::schema::PersistedIndexExpressionOp::Year => "year",
926 crate::db::schema::PersistedIndexExpressionOp::Month => "month",
927 crate::db::schema::PersistedIndexExpressionOp::Day => "day",
928 }
929}
930
931const fn persisted_relation_strength_name(
932 strength: crate::db::schema::PersistedRelationStrength,
933) -> &'static str {
934 match strength {
935 crate::db::schema::PersistedRelationStrength::Strong => "strong",
936 crate::db::schema::PersistedRelationStrength::Weak => "weak",
937 }
938}
939
940const fn field_storage_decode_name(
941 decode: crate::model::field::FieldStorageDecode,
942) -> &'static str {
943 match decode {
944 crate::model::field::FieldStorageDecode::ByKind => "by_kind",
945 crate::model::field::FieldStorageDecode::Value => "value",
946 }
947}
948
949#[cfg(test)]
954mod tests {
955 use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore, SchemaStoreVisit};
956 use crate::{
957 db::schema::{
958 FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedIndexFieldPathSnapshot,
959 PersistedIndexKeySnapshot, PersistedIndexSnapshot, PersistedNestedLeafSnapshot,
960 PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
961 SchemaVersion, encode_persisted_schema_snapshot,
962 },
963 model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
964 testing::test_memory,
965 traits::Storable,
966 types::EntityTag,
967 };
968 use std::borrow::Cow;
969 use std::convert::Infallible;
970
971 #[test]
972 fn raw_schema_key_round_trips_entity_and_version() {
973 let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
974 SchemaVersion::initial()
975 });
976 let encoded = key.to_bytes().into_owned();
977 let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
978
979 assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
980 assert_eq!(decoded.version(), SchemaVersion::initial().get());
981 }
982
983 #[test]
984 fn raw_schema_snapshot_round_trips_payload_bytes() {
985 let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
986 let encoded = snapshot.to_bytes().into_owned();
987 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
988
989 assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
990 assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
991 }
992
993 #[test]
994 fn schema_store_persists_raw_snapshots_by_entity_version_key() {
995 let mut store = SchemaStore::init(test_memory(251));
996 let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
997
998 assert!(store.is_empty());
999 assert!(!store.contains_raw_snapshot(&key));
1000
1001 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
1002
1003 assert_eq!(store.len(), 1);
1004 assert!(store.contains_raw_snapshot(&key));
1005 assert_eq!(
1006 store
1007 .get_raw_snapshot(&key)
1008 .expect("schema snapshot should be present")
1009 .as_bytes(),
1010 &[9, 4, 6],
1011 );
1012
1013 store.clear();
1014 assert!(store.is_empty());
1015 }
1016
1017 #[test]
1018 fn schema_store_loads_latest_snapshot_for_entity() {
1019 let mut store = SchemaStore::init(test_memory(252));
1020 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1021 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1022 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1023
1024 store
1025 .insert_persisted_snapshot(EntityTag::new(41), &initial)
1026 .expect("initial schema snapshot should encode");
1027 store
1028 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1029 .expect("other entity schema snapshot should encode");
1030 store
1031 .insert_persisted_snapshot(EntityTag::new(41), &newer)
1032 .expect("newer schema snapshot should encode");
1033
1034 let latest = store
1035 .latest_persisted_snapshot(EntityTag::new(41))
1036 .expect("latest schema snapshot should decode")
1037 .expect("schema snapshot should exist");
1038
1039 assert_eq!(latest.version(), SchemaVersion::new(2));
1040 assert_eq!(latest.entity_name(), "Newer");
1041 }
1042
1043 #[test]
1044 fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
1045 let mut store = SchemaStore::init(test_memory(242));
1046 store.insert_raw_snapshot(
1047 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
1048 RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
1049 );
1050 store.insert_raw_snapshot(
1051 RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
1052 RawSchemaSnapshot::from_bytes(vec![5, 8]),
1053 );
1054 store.insert_raw_snapshot(
1055 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
1056 RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
1057 );
1058
1059 let footprint = store.entity_footprint(EntityTag::new(71));
1060
1061 assert_eq!(footprint.snapshots(), 2);
1062 assert_eq!(footprint.encoded_bytes(), 7);
1063 assert_eq!(footprint.latest_snapshot_bytes(), 4);
1064 }
1065
1066 #[test]
1067 fn schema_store_visit_raw_snapshots_preserves_key_order() {
1068 let mut store = SchemaStore::init(test_memory(235));
1069 store.insert_raw_snapshot(
1070 RawSchemaKey::from_entity_version(EntityTag::new(3), SchemaVersion::new(2)),
1071 RawSchemaSnapshot::from_bytes(vec![32]),
1072 );
1073 store.insert_raw_snapshot(
1074 RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(3)),
1075 RawSchemaSnapshot::from_bytes(vec![13]),
1076 );
1077 store.insert_raw_snapshot(
1078 RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(1)),
1079 RawSchemaSnapshot::from_bytes(vec![11]),
1080 );
1081
1082 let mut visited = Vec::new();
1083 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1084 visited.push((
1085 key.entity_tag().value(),
1086 key.version(),
1087 snapshot.as_bytes()[0],
1088 ));
1089 Ok(SchemaStoreVisit::Continue)
1090 });
1091
1092 assert_eq!(visited, vec![(1, 1, 11), (1, 3, 13), (3, 2, 32)]);
1093 }
1094
1095 #[test]
1096 fn schema_store_visit_raw_snapshots_can_stop_without_error() {
1097 let mut store = SchemaStore::init(test_memory(234));
1098 store.insert_raw_snapshot(
1099 RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(1)),
1100 RawSchemaSnapshot::from_bytes(vec![21]),
1101 );
1102 store.insert_raw_snapshot(
1103 RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(2)),
1104 RawSchemaSnapshot::from_bytes(vec![22]),
1105 );
1106
1107 let mut visited = Vec::new();
1108 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, _| {
1109 visited.push(key.version());
1110 Ok(SchemaStoreVisit::Stop)
1111 });
1112
1113 assert_eq!(visited, vec![1]);
1114 }
1115
1116 #[test]
1117 fn heap_schema_store_preserves_order_latest_snapshot_and_early_stop() {
1118 let mut store = SchemaStore::init_heap();
1119 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1120 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1121 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1122
1123 store
1124 .insert_persisted_snapshot(EntityTag::new(41), &initial)
1125 .expect("initial heap schema snapshot should encode");
1126 store
1127 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1128 .expect("other heap schema snapshot should encode");
1129 store
1130 .insert_persisted_snapshot(EntityTag::new(41), &newer)
1131 .expect("newer heap schema snapshot should encode");
1132
1133 let latest = store
1134 .latest_persisted_snapshot(EntityTag::new(41))
1135 .expect("latest heap schema snapshot should decode")
1136 .expect("heap schema snapshot should exist");
1137 assert_eq!(latest.version(), SchemaVersion::new(2));
1138 assert_eq!(latest.entity_name(), "Newer");
1139
1140 let mut visited = Vec::new();
1141 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1142 visited.push((
1143 key.entity_tag().value(),
1144 key.version(),
1145 snapshot.as_bytes().len(),
1146 ));
1147 Ok(if visited.len() == 2 {
1148 SchemaStoreVisit::Stop
1149 } else {
1150 SchemaStoreVisit::Continue
1151 })
1152 });
1153 assert_eq!(
1154 visited
1155 .iter()
1156 .map(|(entity, version, _)| (*entity, *version))
1157 .collect::<Vec<_>>(),
1158 vec![(41, 1), (41, 2)]
1159 );
1160 }
1161
1162 #[test]
1163 fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
1164 let store = SchemaStore::init(test_memory(241));
1165
1166 assert_eq!(
1167 store
1168 .catalog_metadata()
1169 .expect("empty schema catalog metadata should derive"),
1170 None
1171 );
1172 }
1173
1174 #[test]
1175 fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
1176 let mut store = SchemaStore::init(test_memory(240));
1177 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1178 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1179 let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1180
1181 store
1182 .insert_persisted_snapshot(EntityTag::new(81), &initial)
1183 .expect("initial schema snapshot should encode");
1184 let initial_metadata = store
1185 .catalog_metadata()
1186 .expect("initial schema catalog metadata should derive")
1187 .expect("initial schema catalog metadata should be present");
1188
1189 store
1190 .insert_persisted_snapshot(EntityTag::new(81), &newer)
1191 .expect("newer schema snapshot should encode");
1192 store
1193 .insert_persisted_snapshot(EntityTag::new(82), &other)
1194 .expect("other schema snapshot should encode");
1195 let updated_metadata = store
1196 .catalog_metadata()
1197 .expect("updated schema catalog metadata should derive")
1198 .expect("updated schema catalog metadata should be present");
1199
1200 assert_eq!(initial_metadata.schema_version(), SchemaVersion::initial());
1201 assert_eq!(initial_metadata.entity_count(), 1);
1202 assert_eq!(updated_metadata.schema_version(), SchemaVersion::new(3));
1203 assert_eq!(updated_metadata.entity_count(), 2);
1204 assert_ne!(
1205 initial_metadata.schema_fingerprint(),
1206 updated_metadata.schema_fingerprint(),
1207 "catalog fingerprint must change when latest accepted schema catalog changes"
1208 );
1209 }
1210
1211 #[test]
1212 fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
1213 let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
1214 let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");
1215
1216 let mut left = SchemaStore::init(test_memory(239));
1217 left.insert_persisted_snapshot(EntityTag::new(91), &first)
1218 .expect("first schema snapshot should encode");
1219 left.insert_persisted_snapshot(EntityTag::new(92), &second)
1220 .expect("second schema snapshot should encode");
1221
1222 let mut right = SchemaStore::init(test_memory(238));
1223 right
1224 .insert_persisted_snapshot(EntityTag::new(92), &second)
1225 .expect("second schema snapshot should encode");
1226 right
1227 .insert_persisted_snapshot(EntityTag::new(91), &first)
1228 .expect("first schema snapshot should encode");
1229
1230 let left_metadata = left
1231 .catalog_metadata()
1232 .expect("left schema catalog metadata should derive");
1233 let right_metadata = right
1234 .catalog_metadata()
1235 .expect("right schema catalog metadata should derive");
1236
1237 assert_eq!(left_metadata, right_metadata);
1238 }
1239
1240 #[test]
1241 fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
1242 let without_index =
1243 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
1244 let with_index = persisted_schema_snapshot_with_index_for_test(
1245 SchemaVersion::initial(),
1246 "RoleSpecific",
1247 "payload_idx",
1248 );
1249
1250 let mut base = SchemaStore::init(test_memory(237));
1251 base.insert_persisted_snapshot(EntityTag::new(93), &without_index)
1252 .expect("base schema snapshot should encode");
1253 let base_metadata = base
1254 .allocation_metadata()
1255 .expect("base allocation metadata should derive")
1256 .expect("base allocation metadata should be present");
1257
1258 let mut indexed = SchemaStore::init(test_memory(236));
1259 indexed
1260 .insert_persisted_snapshot(EntityTag::new(93), &with_index)
1261 .expect("indexed schema snapshot should encode");
1262 let indexed_metadata = indexed
1263 .allocation_metadata()
1264 .expect("indexed allocation metadata should derive")
1265 .expect("indexed allocation metadata should be present");
1266
1267 assert_eq!(
1268 base_metadata.data().schema_fingerprint(),
1269 indexed_metadata.data().schema_fingerprint(),
1270 "data allocation metadata should ignore accepted index catalog changes"
1271 );
1272 assert_ne!(
1273 base_metadata.index().schema_fingerprint(),
1274 indexed_metadata.index().schema_fingerprint(),
1275 "index allocation metadata should change when accepted index catalog changes"
1276 );
1277 assert_ne!(
1278 base_metadata.schema().schema_fingerprint(),
1279 indexed_metadata.schema().schema_fingerprint(),
1280 "schema allocation metadata should change when full accepted catalog changes"
1281 );
1282 assert_ne!(
1283 indexed_metadata.data().schema_fingerprint(),
1284 indexed_metadata.index().schema_fingerprint(),
1285 "data and index allocation metadata should have distinct role fingerprints"
1286 );
1287 assert_ne!(
1288 indexed_metadata.index().schema_fingerprint(),
1289 indexed_metadata.schema().schema_fingerprint(),
1290 "index and schema allocation metadata should have distinct role fingerprints"
1291 );
1292 }
1293
1294 #[test]
1295 fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
1296 let mut store = SchemaStore::init(test_memory(253));
1297 let invalid = persisted_schema_snapshot_with_layout_version_for_test(
1298 SchemaVersion::new(2),
1299 SchemaVersion::initial(),
1300 "Invalid",
1301 );
1302
1303 let err = store
1304 .insert_persisted_snapshot(EntityTag::new(43), &invalid)
1305 .expect_err("schema store should reject mismatched snapshot/layout versions");
1306
1307 assert!(
1308 err.message()
1309 .contains("schema snapshot row-layout version mismatch"),
1310 "schema store should preserve the version mismatch diagnostic"
1311 );
1312 }
1313
1314 #[test]
1315 fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
1316 let mut store = SchemaStore::init(test_memory(254));
1317 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
1318 let invalid = PersistedSchemaSnapshot::new(
1319 base.version(),
1320 base.entity_path().to_string(),
1321 base.entity_name().to_string(),
1322 base.first_primary_key_field_id(),
1323 SchemaRowLayout::new(
1324 base.version(),
1325 vec![
1326 (FieldId::new(1), SchemaFieldSlot::new(0)),
1327 (FieldId::new(2), SchemaFieldSlot::new(3)),
1328 ],
1329 ),
1330 base.fields().to_vec(),
1331 );
1332
1333 let err = store
1334 .insert_persisted_snapshot(EntityTag::new(44), &invalid)
1335 .expect_err("schema store should reject divergent field/layout slots");
1336
1337 assert!(
1338 err.message()
1339 .contains("schema snapshot field slot mismatch"),
1340 "schema store should report the duplicated slot divergence"
1341 );
1342 }
1343
1344 #[test]
1345 fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
1346 let mut store = SchemaStore::init(test_memory(246));
1347 let base =
1348 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
1349 let invalid = PersistedSchemaSnapshot::new(
1350 base.version(),
1351 base.entity_path().to_string(),
1352 base.entity_name().to_string(),
1353 base.first_primary_key_field_id(),
1354 SchemaRowLayout::new(
1355 base.version(),
1356 vec![
1357 (FieldId::new(1), SchemaFieldSlot::new(0)),
1358 (FieldId::new(2), SchemaFieldSlot::new(0)),
1359 ],
1360 ),
1361 base.fields().to_vec(),
1362 );
1363
1364 let err = store
1365 .insert_persisted_snapshot(EntityTag::new(49), &invalid)
1366 .expect_err("schema store should reject duplicate row-layout slots");
1367
1368 assert!(
1369 err.message()
1370 .contains("schema snapshot duplicate row-layout slot"),
1371 "schema store should report the row-layout slot ambiguity"
1372 );
1373 }
1374
1375 #[test]
1376 fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
1377 let mut store = SchemaStore::init(test_memory(248));
1378 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
1379 let invalid = PersistedSchemaSnapshot::new(
1380 base.version(),
1381 base.entity_path().to_string(),
1382 base.entity_name().to_string(),
1383 FieldId::new(99),
1384 base.row_layout().clone(),
1385 base.fields().to_vec(),
1386 );
1387
1388 let err = store
1389 .insert_persisted_snapshot(EntityTag::new(47), &invalid)
1390 .expect_err("schema store should reject snapshots without the primary-key field");
1391
1392 assert!(
1393 err.message()
1394 .contains("schema snapshot primary key field missing from row layout"),
1395 "schema store should report the missing primary-key field"
1396 );
1397 }
1398
1399 #[test]
1400 fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
1401 let mut store = SchemaStore::init(test_memory(249));
1402 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1403 let corrupt_key =
1404 RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
1405
1406 store
1407 .insert_persisted_snapshot(EntityTag::new(45), &initial)
1408 .expect("initial schema snapshot should encode");
1409 store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
1410
1411 let err = store
1412 .latest_persisted_snapshot(EntityTag::new(45))
1413 .expect_err("latest corrupt schema snapshot must fail closed");
1414
1415 assert!(
1416 err.message()
1417 .contains("failed to decode persisted schema snapshot"),
1418 "latest-version lookup should report the corrupt newest snapshot"
1419 );
1420 }
1421
1422 #[test]
1423 fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
1424 let mut store = SchemaStore::init(test_memory(250));
1425 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
1426 let invalid = PersistedSchemaSnapshot::new(
1427 base.version(),
1428 base.entity_path().to_string(),
1429 base.entity_name().to_string(),
1430 base.first_primary_key_field_id(),
1431 SchemaRowLayout::new(
1432 base.version(),
1433 vec![
1434 (FieldId::new(1), SchemaFieldSlot::new(0)),
1435 (FieldId::new(2), SchemaFieldSlot::new(3)),
1436 ],
1437 ),
1438 base.fields().to_vec(),
1439 );
1440 let raw = encode_persisted_schema_snapshot(&invalid)
1441 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1442 let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
1443
1444 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1445
1446 let err = store
1447 .latest_persisted_snapshot(EntityTag::new(46))
1448 .expect_err("raw decode should reject divergent field/layout slots");
1449
1450 assert!(
1451 err.message()
1452 .contains("persisted schema snapshot field slot mismatch"),
1453 "schema codec should report the raw decoded slot divergence"
1454 );
1455 }
1456
1457 #[test]
1458 fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
1459 let mut store = SchemaStore::init(test_memory(247));
1460 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
1461 let invalid = PersistedSchemaSnapshot::new(
1462 base.version(),
1463 base.entity_path().to_string(),
1464 base.entity_name().to_string(),
1465 FieldId::new(99),
1466 base.row_layout().clone(),
1467 base.fields().to_vec(),
1468 );
1469 let raw = encode_persisted_schema_snapshot(&invalid)
1470 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1471 let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
1472
1473 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1474
1475 let err = store
1476 .latest_persisted_snapshot(EntityTag::new(48))
1477 .expect_err("raw decode should reject snapshots without the primary-key field");
1478
1479 assert!(
1480 err.message()
1481 .contains("persisted schema snapshot primary key field missing from row layout"),
1482 "schema codec should report the raw decoded missing primary-key field"
1483 );
1484 }
1485
1486 #[test]
1487 fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
1488 let mut store = SchemaStore::init(test_memory(245));
1489 let base =
1490 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
1491 let mut fields = base.fields().to_vec();
1492 let duplicate = PersistedFieldSnapshot::new(
1493 fields[1].id(),
1494 fields[0].name().to_string(),
1495 fields[1].slot(),
1496 fields[1].kind().clone(),
1497 fields[1].nested_leaves().to_vec(),
1498 fields[1].nullable(),
1499 fields[1].default().clone(),
1500 fields[1].storage_decode(),
1501 fields[1].leaf_codec(),
1502 );
1503 fields[1] = duplicate;
1504 let invalid = PersistedSchemaSnapshot::new(
1505 base.version(),
1506 base.entity_path().to_string(),
1507 base.entity_name().to_string(),
1508 base.first_primary_key_field_id(),
1509 base.row_layout().clone(),
1510 fields,
1511 );
1512 let raw = encode_persisted_schema_snapshot(&invalid)
1513 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1514 let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
1515
1516 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1517
1518 let err = store
1519 .latest_persisted_snapshot(EntityTag::new(50))
1520 .expect_err("raw decode should reject duplicate field names");
1521
1522 assert!(
1523 err.message()
1524 .contains("persisted schema snapshot duplicate field name"),
1525 "schema codec should report the raw decoded field-name ambiguity"
1526 );
1527 }
1528
1529 #[test]
1530 fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
1531 let mut store = SchemaStore::init(test_memory(244));
1532 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
1533 let mut fields = base.fields().to_vec();
1534 let invalid_field = PersistedFieldSnapshot::new(
1535 fields[1].id(),
1536 fields[1].name().to_string(),
1537 fields[1].slot(),
1538 fields[1].kind().clone(),
1539 vec![PersistedNestedLeafSnapshot::new(
1540 Vec::new(),
1541 PersistedFieldKind::Blob { max_len: None },
1542 false,
1543 FieldStorageDecode::ByKind,
1544 LeafCodec::Scalar(ScalarCodec::Blob),
1545 )],
1546 fields[1].nullable(),
1547 fields[1].default().clone(),
1548 fields[1].storage_decode(),
1549 fields[1].leaf_codec(),
1550 );
1551 fields[1] = invalid_field;
1552 let invalid = PersistedSchemaSnapshot::new(
1553 base.version(),
1554 base.entity_path().to_string(),
1555 base.entity_name().to_string(),
1556 base.first_primary_key_field_id(),
1557 base.row_layout().clone(),
1558 fields,
1559 );
1560
1561 let err = store
1562 .insert_persisted_snapshot(EntityTag::new(51), &invalid)
1563 .expect_err("schema store should reject empty nested leaf paths");
1564
1565 assert!(
1566 err.message()
1567 .contains("schema snapshot empty nested leaf path"),
1568 "schema store should report the empty nested leaf path"
1569 );
1570 }
1571
1572 #[test]
1573 fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
1574 let mut store = SchemaStore::init(test_memory(243));
1575 let base =
1576 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
1577 let mut fields = base.fields().to_vec();
1578 let duplicate_leaves = vec![
1579 PersistedNestedLeafSnapshot::new(
1580 vec!["bytes".to_string()],
1581 PersistedFieldKind::Blob { max_len: None },
1582 false,
1583 FieldStorageDecode::ByKind,
1584 LeafCodec::Scalar(ScalarCodec::Blob),
1585 ),
1586 PersistedNestedLeafSnapshot::new(
1587 vec!["bytes".to_string()],
1588 PersistedFieldKind::Text { max_len: None },
1589 false,
1590 FieldStorageDecode::ByKind,
1591 LeafCodec::Scalar(ScalarCodec::Text),
1592 ),
1593 ];
1594 let invalid_field = PersistedFieldSnapshot::new(
1595 fields[1].id(),
1596 fields[1].name().to_string(),
1597 fields[1].slot(),
1598 fields[1].kind().clone(),
1599 duplicate_leaves,
1600 fields[1].nullable(),
1601 fields[1].default().clone(),
1602 fields[1].storage_decode(),
1603 fields[1].leaf_codec(),
1604 );
1605 fields[1] = invalid_field;
1606 let invalid = PersistedSchemaSnapshot::new(
1607 base.version(),
1608 base.entity_path().to_string(),
1609 base.entity_name().to_string(),
1610 base.first_primary_key_field_id(),
1611 base.row_layout().clone(),
1612 fields,
1613 );
1614 let raw = encode_persisted_schema_snapshot(&invalid)
1615 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1616 let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
1617
1618 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1619
1620 let err = store
1621 .latest_persisted_snapshot(EntityTag::new(52))
1622 .expect_err("raw decode should reject duplicate nested leaf paths");
1623
1624 assert!(
1625 err.message()
1626 .contains("persisted schema snapshot duplicate nested leaf path"),
1627 "schema codec should report the raw decoded nested path ambiguity"
1628 );
1629 }
1630
1631 #[test]
1632 fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
1633 let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
1634
1635 let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1636 .expect("schema snapshot should encode");
1637 let decoded = raw
1638 .decode_persisted_snapshot()
1639 .expect("schema snapshot should decode");
1640
1641 assert_eq!(decoded, snapshot);
1642 }
1643
1644 fn persisted_schema_snapshot_for_test(
1648 version: SchemaVersion,
1649 entity_name: &str,
1650 ) -> PersistedSchemaSnapshot {
1651 persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
1652 }
1653
1654 fn persisted_schema_snapshot_with_index_for_test(
1655 version: SchemaVersion,
1656 entity_name: &str,
1657 index_name: &str,
1658 ) -> PersistedSchemaSnapshot {
1659 let base = persisted_schema_snapshot_for_test(version, entity_name);
1660
1661 PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1662 base.version(),
1663 base.entity_path().to_string(),
1664 base.entity_name().to_string(),
1665 base.primary_key_field_ids().to_vec(),
1666 base.row_layout().clone(),
1667 base.fields().to_vec(),
1668 vec![PersistedIndexSnapshot::new(
1669 0,
1670 index_name.to_string(),
1671 "RoleSpecificStore".to_string(),
1672 false,
1673 PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
1674 FieldId::new(1),
1675 SchemaFieldSlot::new(0),
1676 vec!["id".to_string()],
1677 PersistedFieldKind::Ulid,
1678 false,
1679 )]),
1680 None,
1681 )],
1682 )
1683 }
1684
1685 fn persisted_schema_snapshot_with_layout_version_for_test(
1689 version: SchemaVersion,
1690 layout_version: SchemaVersion,
1691 entity_name: &str,
1692 ) -> PersistedSchemaSnapshot {
1693 PersistedSchemaSnapshot::new(
1694 version,
1695 format!("entities::{entity_name}"),
1696 entity_name.to_string(),
1697 FieldId::new(1),
1698 SchemaRowLayout::new(
1699 layout_version,
1700 vec![
1701 (FieldId::new(1), SchemaFieldSlot::new(0)),
1702 (FieldId::new(2), SchemaFieldSlot::new(1)),
1703 ],
1704 ),
1705 vec![
1706 PersistedFieldSnapshot::new(
1707 FieldId::new(1),
1708 "id".to_string(),
1709 SchemaFieldSlot::new(0),
1710 PersistedFieldKind::Ulid,
1711 Vec::new(),
1712 false,
1713 SchemaFieldDefault::None,
1714 FieldStorageDecode::ByKind,
1715 LeafCodec::Scalar(ScalarCodec::Ulid),
1716 ),
1717 PersistedFieldSnapshot::new(
1718 FieldId::new(2),
1719 "payload".to_string(),
1720 SchemaFieldSlot::new(1),
1721 PersistedFieldKind::Map {
1722 key: Box::new(PersistedFieldKind::Text { max_len: None }),
1723 value: Box::new(PersistedFieldKind::List(Box::new(
1724 PersistedFieldKind::Nat64,
1725 ))),
1726 },
1727 vec![PersistedNestedLeafSnapshot::new(
1728 vec!["bytes".to_string()],
1729 PersistedFieldKind::Blob { max_len: None },
1730 false,
1731 FieldStorageDecode::ByKind,
1732 LeafCodec::Scalar(ScalarCodec::Blob),
1733 )],
1734 false,
1735 SchemaFieldDefault::None,
1736 FieldStorageDecode::ByKind,
1737 LeafCodec::StructuralFallback,
1738 ),
1739 ],
1740 )
1741 }
1742}