1use crate::{
7 db::{
8 codec::{
9 finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10 write_hash_tag_u8, write_hash_u32, write_hash_u64,
11 },
12 commit::CommitSchemaFingerprint,
13 schema::{
14 PersistedFieldKind, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
15 PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
16 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
17 },
18 },
19 error::InternalError,
20 traits::Storable,
21 types::EntityTag,
22};
23use ic_memory::stable_structures::storable::Bound;
24use ic_memory::stable_structures::{
25 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
26};
27use sha2::Digest;
28use std::borrow::Cow;
29use std::collections::BTreeMap as StdBTreeMap;
30
31const SCHEMA_KEY_BYTES_USIZE: usize = 12;
32const SCHEMA_KEY_BYTES: u32 = 12;
33const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
34const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
35const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
36const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
37
38#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
47struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
48
49impl RawSchemaKey {
50 #[must_use]
52 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
53 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
54 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
55 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
56
57 Self(out)
58 }
59
60 #[must_use]
62 fn entity_tag(self) -> EntityTag {
63 let mut bytes = [0u8; size_of::<u64>()];
64 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
65
66 EntityTag::new(u64::from_be_bytes(bytes))
67 }
68
69 #[must_use]
71 fn version(self) -> u32 {
72 let mut bytes = [0u8; size_of::<u32>()];
73 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
74
75 u32::from_be_bytes(bytes)
76 }
77}
78
79impl Storable for RawSchemaKey {
80 fn to_bytes(&self) -> Cow<'_, [u8]> {
81 Cow::Borrowed(&self.0)
82 }
83
84 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
85 debug_assert_eq!(
86 bytes.len(),
87 SCHEMA_KEY_BYTES_USIZE,
88 "RawSchemaKey::from_bytes received unexpected byte length",
89 );
90
91 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
92 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
93 }
94
95 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
96 out.copy_from_slice(bytes.as_ref());
97 Self(out)
98 }
99
100 fn into_bytes(self) -> Vec<u8> {
101 self.0.to_vec()
102 }
103
104 const BOUND: Bound = Bound::Bounded {
105 max_size: SCHEMA_KEY_BYTES,
106 is_fixed_size: true,
107 };
108}
109
110#[derive(Clone, Debug, Eq, PartialEq)]
120struct RawSchemaSnapshot(Vec<u8>);
121
122impl RawSchemaSnapshot {
123 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
125 validate_typed_schema_snapshot_for_store(snapshot)?;
126
127 encode_persisted_schema_snapshot(snapshot).map(Self)
128 }
129
130 #[must_use]
132 #[cfg(test)]
133 const fn from_bytes(bytes: Vec<u8>) -> Self {
134 Self(bytes)
135 }
136
137 #[must_use]
139 const fn as_bytes(&self) -> &[u8] {
140 self.0.as_slice()
141 }
142
143 #[must_use]
145 #[cfg(test)]
146 fn into_bytes(self) -> Vec<u8> {
147 self.0
148 }
149
150 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
152 decode_persisted_schema_snapshot(self.as_bytes())
153 }
154}
155
156impl Storable for RawSchemaSnapshot {
157 fn to_bytes(&self) -> Cow<'_, [u8]> {
158 Cow::Borrowed(self.as_bytes())
159 }
160
161 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
162 Self(bytes.into_owned())
163 }
164
165 fn into_bytes(self) -> Vec<u8> {
166 self.0
167 }
168
169 const BOUND: Bound = Bound::Bounded {
170 max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
171 is_fixed_size: false,
172 };
173}
174
175fn validate_typed_schema_snapshot_for_store(
179 snapshot: &PersistedSchemaSnapshot,
180) -> Result<(), InternalError> {
181 if let Some(detail) = schema_snapshot_integrity_detail(
182 "schema snapshot",
183 snapshot.version(),
184 snapshot.primary_key_field_ids(),
185 snapshot.row_layout(),
186 snapshot.fields(),
187 ) {
188 return Err(InternalError::store_invariant(detail));
189 }
190
191 Ok(())
192}
193
194#[derive(Clone, Copy, Debug, Eq, PartialEq)]
203pub(in crate::db) struct SchemaStoreFootprint {
204 snapshots: u64,
205 encoded_bytes: u64,
206 latest_snapshot_bytes: u64,
207}
208
209#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub(in crate::db) struct SchemaStoreCatalogMetadata {
218 schema_version: SchemaVersion,
219 schema_fingerprint: CommitSchemaFingerprint,
220 entity_count: u64,
221}
222
223impl SchemaStoreCatalogMetadata {
224 #[must_use]
226 const fn new(
227 schema_version: SchemaVersion,
228 schema_fingerprint: CommitSchemaFingerprint,
229 entity_count: u64,
230 ) -> Self {
231 Self {
232 schema_version,
233 schema_fingerprint,
234 entity_count,
235 }
236 }
237
238 #[must_use]
240 pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
241 self.schema_version
242 }
243
244 #[must_use]
247 pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
248 self.schema_fingerprint
249 }
250
251 #[must_use]
253 pub(in crate::db) const fn entity_count(self) -> u64 {
254 self.entity_count
255 }
256}
257
258#[derive(Clone, Copy, Debug, Eq, PartialEq)]
267pub(in crate::db) struct SchemaStoreAllocationMetadata {
268 data: SchemaStoreCatalogMetadata,
269 index: SchemaStoreCatalogMetadata,
270 schema: SchemaStoreCatalogMetadata,
271}
272
273impl SchemaStoreAllocationMetadata {
274 #[must_use]
277 const fn new(
278 data: SchemaStoreCatalogMetadata,
279 index: SchemaStoreCatalogMetadata,
280 schema: SchemaStoreCatalogMetadata,
281 ) -> Self {
282 Self {
283 data,
284 index,
285 schema,
286 }
287 }
288
289 #[must_use]
291 pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
292 self.data
293 }
294
295 #[must_use]
297 pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
298 self.index
299 }
300
301 #[must_use]
304 pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
305 self.schema
306 }
307}
308
309impl SchemaStoreFootprint {
310 #[must_use]
312 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
313 Self {
314 snapshots,
315 encoded_bytes,
316 latest_snapshot_bytes,
317 }
318 }
319
320 #[must_use]
322 pub(in crate::db) const fn snapshots(self) -> u64 {
323 self.snapshots
324 }
325
326 #[must_use]
328 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
329 self.encoded_bytes
330 }
331
332 #[must_use]
334 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
335 self.latest_snapshot_bytes
336 }
337}
338
339pub struct SchemaStore {
348 backend: SchemaStoreBackend,
349}
350
351enum SchemaStoreBackend {
352 Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
353 Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
354}
355
356#[derive(Clone, Copy, Debug, Eq, PartialEq)]
358enum SchemaStoreVisit {
359 Continue,
360 #[allow(
361 dead_code,
362 reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
363 )]
364 Stop,
365}
366
367impl SchemaStoreVisit {
368 const fn should_stop(self) -> bool {
369 matches!(self, Self::Stop)
370 }
371}
372
373impl SchemaStore {
374 #[must_use]
376 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
377 Self {
378 backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
379 }
380 }
381
382 #[must_use]
384 pub const fn init_heap() -> Self {
385 Self {
386 backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
387 }
388 }
389
390 pub(in crate::db) fn insert_persisted_snapshot(
392 &mut self,
393 entity: EntityTag,
394 snapshot: &PersistedSchemaSnapshot,
395 ) -> Result<(), InternalError> {
396 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
397 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
398 let _ = self.insert_raw_snapshot(key, raw_snapshot);
399
400 Ok(())
401 }
402
403 #[cfg(test)]
405 pub(in crate::db) fn get_persisted_snapshot(
406 &self,
407 entity: EntityTag,
408 version: SchemaVersion,
409 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
410 let key = RawSchemaKey::from_entity_version(entity, version);
411 self.get_raw_snapshot(&key)
412 .map(|snapshot| snapshot.decode_persisted_snapshot())
413 .transpose()
414 }
415
416 pub(in crate::db) fn latest_persisted_snapshot(
418 &self,
419 entity: EntityTag,
420 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
421 let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
422 let _: Result<(), InternalError> = self.visit_raw_snapshots(|key, snapshot| {
423 if key.entity_tag() != entity {
424 return Ok(SchemaStoreVisit::Continue);
425 }
426
427 let version = SchemaVersion::new(key.version());
428 if latest
429 .as_ref()
430 .is_none_or(|(latest_version, _)| version > *latest_version)
431 {
432 latest = Some((version, snapshot.clone()));
433 }
434 Ok(SchemaStoreVisit::Continue)
435 });
436
437 latest
438 .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
439 .transpose()
440 }
441
442 #[must_use]
444 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
445 let mut snapshots = 0u64;
446 let mut encoded_bytes = 0u64;
447 let mut latest = None::<(SchemaVersion, u64)>;
448
449 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
450 if key.entity_tag() != entity {
451 return Ok(SchemaStoreVisit::Continue);
452 }
453
454 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
455 snapshots = snapshots.saturating_add(1);
456 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
457
458 let version = SchemaVersion::new(key.version());
459 if latest
460 .as_ref()
461 .is_none_or(|(latest_version, _)| version > *latest_version)
462 {
463 latest = Some((version, snapshot_bytes));
464 }
465 Ok(SchemaStoreVisit::Continue)
466 });
467
468 SchemaStoreFootprint::new(
469 snapshots,
470 encoded_bytes,
471 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
472 )
473 }
474
475 #[cfg(test)]
481 pub(in crate::db) fn catalog_metadata(
482 &self,
483 ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
484 Ok(self
485 .allocation_metadata()?
486 .map(SchemaStoreAllocationMetadata::schema))
487 }
488
489 pub(in crate::db) fn allocation_metadata(
496 &self,
497 ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
498 let latest_by_entity = self.latest_raw_snapshots_by_entity();
499 if latest_by_entity.is_empty() {
500 return Ok(None);
501 }
502
503 Ok(Some(SchemaStoreAllocationMetadata::new(
504 derive_data_allocation_metadata(&latest_by_entity)?,
505 derive_index_allocation_metadata(&latest_by_entity)?,
506 derive_schema_catalog_metadata(&latest_by_entity)?,
507 )))
508 }
509
510 fn insert_raw_snapshot(
512 &mut self,
513 key: RawSchemaKey,
514 snapshot: RawSchemaSnapshot,
515 ) -> Option<RawSchemaSnapshot> {
516 match &mut self.backend {
517 SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
518 SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
519 }
520 }
521
522 #[must_use]
524 #[cfg(test)]
525 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
526 match &self.backend {
527 SchemaStoreBackend::Stable(map) => map.get(key),
528 SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
529 }
530 }
531
532 #[must_use]
534 #[cfg(test)]
535 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
536 match &self.backend {
537 SchemaStoreBackend::Stable(map) => map.contains_key(key),
538 SchemaStoreBackend::Heap(map) => map.contains_key(key),
539 }
540 }
541
542 #[must_use]
544 #[cfg(test)]
545 pub(in crate::db) fn len(&self) -> u64 {
546 match &self.backend {
547 SchemaStoreBackend::Stable(map) => map.len(),
548 SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
549 }
550 }
551
552 #[must_use]
554 #[cfg(test)]
555 pub(in crate::db) fn is_empty(&self) -> bool {
556 match &self.backend {
557 SchemaStoreBackend::Stable(map) => map.is_empty(),
558 SchemaStoreBackend::Heap(map) => map.is_empty(),
559 }
560 }
561
562 #[cfg(test)]
564 pub(in crate::db) fn clear(&mut self) {
565 match &mut self.backend {
566 SchemaStoreBackend::Stable(map) => map.clear_new(),
567 SchemaStoreBackend::Heap(map) => map.clear(),
568 }
569 }
570
571 fn latest_raw_snapshots_by_entity(
572 &self,
573 ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
574 let mut latest_by_entity =
575 StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
576
577 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
578 let version = SchemaVersion::new(key.version());
579 match latest_by_entity.get_mut(&key.entity_tag()) {
580 Some((latest_version, latest_snapshot)) if version > *latest_version => {
581 *latest_version = version;
582 *latest_snapshot = snapshot.clone();
583 }
584 None => {
585 latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
586 }
587 Some(_) => {}
588 }
589 Ok(SchemaStoreVisit::Continue)
590 });
591
592 latest_by_entity
593 }
594
595 fn visit_raw_snapshots<E>(
598 &self,
599 mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
600 ) -> Result<(), E> {
601 match &self.backend {
602 SchemaStoreBackend::Stable(map) => {
603 for entry in map.iter() {
604 if visitor(entry.key(), &entry.value())?.should_stop() {
605 break;
606 }
607 }
608 }
609 SchemaStoreBackend::Heap(map) => {
610 for (key, snapshot) in map {
611 if visitor(key, snapshot)?.should_stop() {
612 break;
613 }
614 }
615 }
616 }
617
618 Ok(())
619 }
620}
621
622fn derive_data_allocation_metadata(
623 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
624) -> Result<SchemaStoreCatalogMetadata, InternalError> {
625 let mut max_version = SchemaVersion::initial();
626 let mut hasher = new_hash_sha256();
627 write_hash_tag_u8(
628 &mut hasher,
629 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
630 );
631
632 for (entity, (_, snapshot)) in latest_by_entity {
633 let persisted = snapshot.decode_persisted_snapshot()?;
634 if persisted.version() > max_version {
635 max_version = persisted.version();
636 }
637
638 let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
639 persisted.version(),
640 persisted.entity_path().to_string(),
641 persisted.entity_name().to_string(),
642 persisted.primary_key_field_ids().to_vec(),
643 persisted.row_layout().clone(),
644 persisted.fields().to_vec(),
645 Vec::new(),
646 );
647 let encoded = encode_persisted_schema_snapshot(&data_projection)?;
648
649 write_hash_u64(&mut hasher, entity.value());
650 write_hash_u32(&mut hasher, persisted.version().get());
651 write_hash_len_u32(&mut hasher, encoded.len());
652 hasher.update(encoded);
653 }
654
655 Ok(finalize_schema_metadata(
656 max_version,
657 hasher,
658 latest_by_entity.len(),
659 ))
660}
661
662fn derive_index_allocation_metadata(
663 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
664) -> Result<SchemaStoreCatalogMetadata, InternalError> {
665 let mut max_version = SchemaVersion::initial();
666 let mut hasher = new_hash_sha256();
667 write_hash_tag_u8(
668 &mut hasher,
669 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
670 );
671
672 for (entity, (_, snapshot)) in latest_by_entity {
673 let persisted = snapshot.decode_persisted_snapshot()?;
674 if persisted.version() > max_version {
675 max_version = persisted.version();
676 }
677
678 write_hash_u64(&mut hasher, entity.value());
679 write_hash_u32(&mut hasher, persisted.version().get());
680 write_hash_len_u32(&mut hasher, persisted.indexes().len());
681 for index in persisted.indexes() {
682 write_hash_u32(&mut hasher, u32::from(index.ordinal()));
683 write_hash_str_u32(&mut hasher, index.name());
684 write_hash_str_u32(&mut hasher, index.store());
685 write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
686 write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
687 match index.predicate_sql() {
688 Some(predicate_sql) => {
689 write_hash_tag_u8(&mut hasher, 1);
690 write_hash_str_u32(&mut hasher, predicate_sql);
691 }
692 None => write_hash_tag_u8(&mut hasher, 0),
693 }
694 hash_persisted_index_key(&mut hasher, index.key());
695 }
696 }
697
698 Ok(finalize_schema_metadata(
699 max_version,
700 hasher,
701 latest_by_entity.len(),
702 ))
703}
704
705fn derive_schema_catalog_metadata(
706 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
707) -> Result<SchemaStoreCatalogMetadata, InternalError> {
708 let mut max_version = SchemaVersion::initial();
709 let mut hasher = new_hash_sha256();
710 write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
711
712 for (entity, (version, snapshot)) in latest_by_entity {
713 let persisted = snapshot.decode_persisted_snapshot()?;
714 if persisted.version() > max_version {
715 max_version = persisted.version();
716 }
717
718 write_hash_u64(&mut hasher, entity.value());
719 write_hash_u32(&mut hasher, version.get());
720 write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
721 hasher.update(snapshot.as_bytes());
722 }
723
724 Ok(finalize_schema_metadata(
725 max_version,
726 hasher,
727 latest_by_entity.len(),
728 ))
729}
730
731fn finalize_schema_metadata(
732 schema_version: SchemaVersion,
733 hasher: sha2::Sha256,
734 entity_count: usize,
735) -> SchemaStoreCatalogMetadata {
736 let digest = finalize_hash_sha256(hasher);
737 let mut schema_fingerprint = [0u8; 16];
738 schema_fingerprint.copy_from_slice(&digest[..16]);
739
740 SchemaStoreCatalogMetadata::new(
741 schema_version,
742 schema_fingerprint,
743 u64::try_from(entity_count).unwrap_or(u64::MAX),
744 )
745}
746
747fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
748 match key {
749 PersistedIndexKeySnapshot::FieldPath(paths) => {
750 write_hash_tag_u8(hasher, 1);
751 write_hash_len_u32(hasher, paths.len());
752 for path in paths {
753 hash_persisted_index_field_path(hasher, path);
754 }
755 }
756 PersistedIndexKeySnapshot::Items(items) => {
757 write_hash_tag_u8(hasher, 2);
758 write_hash_len_u32(hasher, items.len());
759 for item in items {
760 match item {
761 PersistedIndexKeyItemSnapshot::FieldPath(path) => {
762 write_hash_tag_u8(hasher, 1);
763 hash_persisted_index_field_path(hasher, path);
764 }
765 PersistedIndexKeyItemSnapshot::Expression(expression) => {
766 write_hash_tag_u8(hasher, 2);
767 write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
768 hash_persisted_index_field_path(hasher, expression.source());
769 hash_persisted_field_kind(hasher, expression.input_kind());
770 hash_persisted_field_kind(hasher, expression.output_kind());
771 write_hash_str_u32(hasher, expression.canonical_text());
772 }
773 }
774 }
775 }
776 }
777}
778
779fn hash_persisted_index_field_path(
780 hasher: &mut sha2::Sha256,
781 path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
782) {
783 write_hash_u32(hasher, path.field_id().get());
784 write_hash_u32(hasher, u32::from(path.slot().get()));
785 write_hash_len_u32(hasher, path.path().len());
786 for segment in path.path() {
787 write_hash_str_u32(hasher, segment);
788 }
789 hash_persisted_field_kind(hasher, path.kind());
790 write_hash_tag_u8(hasher, u8::from(path.nullable()));
791}
792
793fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
794 match kind {
795 PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
796 PersistedFieldKind::Blob { max_len } => {
797 write_hash_tag_u8(hasher, 2);
798 hash_optional_u32(hasher, *max_len);
799 }
800 PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
801 PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
802 PersistedFieldKind::Decimal { scale } => {
803 write_hash_tag_u8(hasher, 5);
804 write_hash_u32(hasher, *scale);
805 }
806 PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
807 PersistedFieldKind::Enum { path, variants } => {
808 write_hash_tag_u8(hasher, 7);
809 write_hash_str_u32(hasher, path);
810 write_hash_len_u32(hasher, variants.len());
811 for variant in variants {
812 write_hash_str_u32(hasher, variant.ident());
813 match variant.payload_kind() {
814 Some(payload_kind) => {
815 write_hash_tag_u8(hasher, 1);
816 hash_persisted_field_kind(hasher, payload_kind);
817 }
818 None => write_hash_tag_u8(hasher, 0),
819 }
820 write_hash_str_u32(
821 hasher,
822 field_storage_decode_name(variant.payload_storage_decode()),
823 );
824 }
825 }
826 PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
827 PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
828 PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
829 PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
830 PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
831 PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
832 PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
833 PersistedFieldKind::IntBig { max_bytes } => {
834 write_hash_tag_u8(hasher, 15);
835 write_hash_u32(hasher, *max_bytes);
836 }
837 PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
838 PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
839 PersistedFieldKind::Text { max_len } => {
840 write_hash_tag_u8(hasher, 18);
841 hash_optional_u32(hasher, *max_len);
842 }
843 PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
844 PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
845 PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
846 PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
847 PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
848 PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
849 PersistedFieldKind::NatBig { max_bytes } => {
850 write_hash_tag_u8(hasher, 25);
851 write_hash_u32(hasher, *max_bytes);
852 }
853 PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
854 PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
855 PersistedFieldKind::Relation {
856 target_path,
857 target_entity_name,
858 target_entity_tag,
859 target_store_path,
860 key_kind,
861 strength,
862 } => {
863 write_hash_tag_u8(hasher, 28);
864 write_hash_str_u32(hasher, target_path);
865 write_hash_str_u32(hasher, target_entity_name);
866 write_hash_u64(hasher, target_entity_tag.value());
867 write_hash_str_u32(hasher, target_store_path);
868 hash_persisted_field_kind(hasher, key_kind);
869 write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
870 }
871 PersistedFieldKind::List(inner) => {
872 write_hash_tag_u8(hasher, 29);
873 hash_persisted_field_kind(hasher, inner);
874 }
875 PersistedFieldKind::Set(inner) => {
876 write_hash_tag_u8(hasher, 30);
877 hash_persisted_field_kind(hasher, inner);
878 }
879 PersistedFieldKind::Map { key, value } => {
880 write_hash_tag_u8(hasher, 31);
881 hash_persisted_field_kind(hasher, key);
882 hash_persisted_field_kind(hasher, value);
883 }
884 PersistedFieldKind::Structured { queryable } => {
885 write_hash_tag_u8(hasher, 32);
886 write_hash_tag_u8(hasher, u8::from(*queryable));
887 }
888 }
889}
890
891fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
892 match value {
893 Some(value) => {
894 write_hash_tag_u8(hasher, 1);
895 write_hash_u32(hasher, value);
896 }
897 None => write_hash_tag_u8(hasher, 0),
898 }
899}
900
901const fn persisted_index_origin_name(
902 origin: crate::db::schema::PersistedIndexOrigin,
903) -> &'static str {
904 match origin {
905 crate::db::schema::PersistedIndexOrigin::Generated => "generated",
906 crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
907 }
908}
909
910const fn persisted_expression_op_name(
911 op: crate::db::schema::PersistedIndexExpressionOp,
912) -> &'static str {
913 match op {
914 crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
915 crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
916 crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
917 crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
918 crate::db::schema::PersistedIndexExpressionOp::Date => "date",
919 crate::db::schema::PersistedIndexExpressionOp::Year => "year",
920 crate::db::schema::PersistedIndexExpressionOp::Month => "month",
921 crate::db::schema::PersistedIndexExpressionOp::Day => "day",
922 }
923}
924
925const fn persisted_relation_strength_name(
926 strength: crate::db::schema::PersistedRelationStrength,
927) -> &'static str {
928 match strength {
929 crate::db::schema::PersistedRelationStrength::Strong => "strong",
930 crate::db::schema::PersistedRelationStrength::Weak => "weak",
931 }
932}
933
934const fn field_storage_decode_name(
935 decode: crate::model::field::FieldStorageDecode,
936) -> &'static str {
937 match decode {
938 crate::model::field::FieldStorageDecode::ByKind => "by_kind",
939 crate::model::field::FieldStorageDecode::Value => "value",
940 }
941}
942
943#[cfg(test)]
948mod tests {
949 use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore, SchemaStoreVisit};
950 use crate::{
951 db::schema::{
952 FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedIndexFieldPathSnapshot,
953 PersistedIndexKeySnapshot, PersistedIndexSnapshot, PersistedNestedLeafSnapshot,
954 PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
955 SchemaVersion, encode_persisted_schema_snapshot,
956 },
957 model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
958 testing::test_memory,
959 traits::Storable,
960 types::EntityTag,
961 };
962 use std::borrow::Cow;
963 use std::convert::Infallible;
964
965 #[test]
966 fn raw_schema_key_round_trips_entity_and_version() {
967 let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
968 SchemaVersion::initial()
969 });
970 let encoded = key.to_bytes().into_owned();
971 let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
972
973 assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
974 assert_eq!(decoded.version(), SchemaVersion::initial().get());
975 }
976
977 #[test]
978 fn raw_schema_snapshot_round_trips_payload_bytes() {
979 let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
980 let encoded = snapshot.to_bytes().into_owned();
981 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
982
983 assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
984 assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
985 }
986
987 #[test]
988 fn schema_store_persists_raw_snapshots_by_entity_version_key() {
989 let mut store = SchemaStore::init(test_memory(251));
990 let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
991
992 assert!(store.is_empty());
993 assert!(!store.contains_raw_snapshot(&key));
994
995 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
996
997 assert_eq!(store.len(), 1);
998 assert!(store.contains_raw_snapshot(&key));
999 assert_eq!(
1000 store
1001 .get_raw_snapshot(&key)
1002 .expect("schema snapshot should be present")
1003 .as_bytes(),
1004 &[9, 4, 6],
1005 );
1006
1007 store.clear();
1008 assert!(store.is_empty());
1009 }
1010
1011 #[test]
1012 fn schema_store_loads_latest_snapshot_for_entity() {
1013 let mut store = SchemaStore::init(test_memory(252));
1014 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1015 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1016 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1017
1018 store
1019 .insert_persisted_snapshot(EntityTag::new(41), &initial)
1020 .expect("initial schema snapshot should encode");
1021 store
1022 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1023 .expect("other entity schema snapshot should encode");
1024 store
1025 .insert_persisted_snapshot(EntityTag::new(41), &newer)
1026 .expect("newer schema snapshot should encode");
1027
1028 let latest = store
1029 .latest_persisted_snapshot(EntityTag::new(41))
1030 .expect("latest schema snapshot should decode")
1031 .expect("schema snapshot should exist");
1032
1033 assert_eq!(latest.version(), SchemaVersion::new(2));
1034 assert_eq!(latest.entity_name(), "Newer");
1035 }
1036
1037 #[test]
1038 fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
1039 let mut store = SchemaStore::init(test_memory(242));
1040 store.insert_raw_snapshot(
1041 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
1042 RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
1043 );
1044 store.insert_raw_snapshot(
1045 RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
1046 RawSchemaSnapshot::from_bytes(vec![5, 8]),
1047 );
1048 store.insert_raw_snapshot(
1049 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
1050 RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
1051 );
1052
1053 let footprint = store.entity_footprint(EntityTag::new(71));
1054
1055 assert_eq!(footprint.snapshots(), 2);
1056 assert_eq!(footprint.encoded_bytes(), 7);
1057 assert_eq!(footprint.latest_snapshot_bytes(), 4);
1058 }
1059
1060 #[test]
1061 fn schema_store_visit_raw_snapshots_preserves_key_order() {
1062 let mut store = SchemaStore::init(test_memory(235));
1063 store.insert_raw_snapshot(
1064 RawSchemaKey::from_entity_version(EntityTag::new(3), SchemaVersion::new(2)),
1065 RawSchemaSnapshot::from_bytes(vec![32]),
1066 );
1067 store.insert_raw_snapshot(
1068 RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(3)),
1069 RawSchemaSnapshot::from_bytes(vec![13]),
1070 );
1071 store.insert_raw_snapshot(
1072 RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(1)),
1073 RawSchemaSnapshot::from_bytes(vec![11]),
1074 );
1075
1076 let mut visited = Vec::new();
1077 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1078 visited.push((
1079 key.entity_tag().value(),
1080 key.version(),
1081 snapshot.as_bytes()[0],
1082 ));
1083 Ok(SchemaStoreVisit::Continue)
1084 });
1085
1086 assert_eq!(visited, vec![(1, 1, 11), (1, 3, 13), (3, 2, 32)]);
1087 }
1088
1089 #[test]
1090 fn schema_store_visit_raw_snapshots_can_stop_without_error() {
1091 let mut store = SchemaStore::init(test_memory(234));
1092 store.insert_raw_snapshot(
1093 RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(1)),
1094 RawSchemaSnapshot::from_bytes(vec![21]),
1095 );
1096 store.insert_raw_snapshot(
1097 RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(2)),
1098 RawSchemaSnapshot::from_bytes(vec![22]),
1099 );
1100
1101 let mut visited = Vec::new();
1102 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, _| {
1103 visited.push(key.version());
1104 Ok(SchemaStoreVisit::Stop)
1105 });
1106
1107 assert_eq!(visited, vec![1]);
1108 }
1109
1110 #[test]
1111 fn heap_schema_store_preserves_order_latest_snapshot_and_early_stop() {
1112 let mut store = SchemaStore::init_heap();
1113 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1114 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1115 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1116
1117 store
1118 .insert_persisted_snapshot(EntityTag::new(41), &initial)
1119 .expect("initial heap schema snapshot should encode");
1120 store
1121 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1122 .expect("other heap schema snapshot should encode");
1123 store
1124 .insert_persisted_snapshot(EntityTag::new(41), &newer)
1125 .expect("newer heap schema snapshot should encode");
1126
1127 let latest = store
1128 .latest_persisted_snapshot(EntityTag::new(41))
1129 .expect("latest heap schema snapshot should decode")
1130 .expect("heap schema snapshot should exist");
1131 assert_eq!(latest.version(), SchemaVersion::new(2));
1132 assert_eq!(latest.entity_name(), "Newer");
1133
1134 let mut visited = Vec::new();
1135 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1136 visited.push((
1137 key.entity_tag().value(),
1138 key.version(),
1139 snapshot.as_bytes().len(),
1140 ));
1141 Ok(if visited.len() == 2 {
1142 SchemaStoreVisit::Stop
1143 } else {
1144 SchemaStoreVisit::Continue
1145 })
1146 });
1147 assert_eq!(
1148 visited
1149 .iter()
1150 .map(|(entity, version, _)| (*entity, *version))
1151 .collect::<Vec<_>>(),
1152 vec![(41, 1), (41, 2)]
1153 );
1154 }
1155
1156 #[test]
1157 fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
1158 let store = SchemaStore::init(test_memory(241));
1159
1160 assert_eq!(
1161 store
1162 .catalog_metadata()
1163 .expect("empty schema catalog metadata should derive"),
1164 None
1165 );
1166 }
1167
1168 #[test]
1169 fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
1170 let mut store = SchemaStore::init(test_memory(240));
1171 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1172 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1173 let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1174
1175 store
1176 .insert_persisted_snapshot(EntityTag::new(81), &initial)
1177 .expect("initial schema snapshot should encode");
1178 let initial_metadata = store
1179 .catalog_metadata()
1180 .expect("initial schema catalog metadata should derive")
1181 .expect("initial schema catalog metadata should be present");
1182
1183 store
1184 .insert_persisted_snapshot(EntityTag::new(81), &newer)
1185 .expect("newer schema snapshot should encode");
1186 store
1187 .insert_persisted_snapshot(EntityTag::new(82), &other)
1188 .expect("other schema snapshot should encode");
1189 let updated_metadata = store
1190 .catalog_metadata()
1191 .expect("updated schema catalog metadata should derive")
1192 .expect("updated schema catalog metadata should be present");
1193
1194 assert_eq!(initial_metadata.schema_version(), SchemaVersion::initial());
1195 assert_eq!(initial_metadata.entity_count(), 1);
1196 assert_eq!(updated_metadata.schema_version(), SchemaVersion::new(3));
1197 assert_eq!(updated_metadata.entity_count(), 2);
1198 assert_ne!(
1199 initial_metadata.schema_fingerprint(),
1200 updated_metadata.schema_fingerprint(),
1201 "catalog fingerprint must change when latest accepted schema catalog changes"
1202 );
1203 }
1204
1205 #[test]
1206 fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
1207 let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
1208 let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");
1209
1210 let mut left = SchemaStore::init(test_memory(239));
1211 left.insert_persisted_snapshot(EntityTag::new(91), &first)
1212 .expect("first schema snapshot should encode");
1213 left.insert_persisted_snapshot(EntityTag::new(92), &second)
1214 .expect("second schema snapshot should encode");
1215
1216 let mut right = SchemaStore::init(test_memory(238));
1217 right
1218 .insert_persisted_snapshot(EntityTag::new(92), &second)
1219 .expect("second schema snapshot should encode");
1220 right
1221 .insert_persisted_snapshot(EntityTag::new(91), &first)
1222 .expect("first schema snapshot should encode");
1223
1224 let left_metadata = left
1225 .catalog_metadata()
1226 .expect("left schema catalog metadata should derive");
1227 let right_metadata = right
1228 .catalog_metadata()
1229 .expect("right schema catalog metadata should derive");
1230
1231 assert_eq!(left_metadata, right_metadata);
1232 }
1233
1234 #[test]
1235 fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
1236 let without_index =
1237 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
1238 let with_index = persisted_schema_snapshot_with_index_for_test(
1239 SchemaVersion::initial(),
1240 "RoleSpecific",
1241 "payload_idx",
1242 );
1243
1244 let mut base = SchemaStore::init(test_memory(237));
1245 base.insert_persisted_snapshot(EntityTag::new(93), &without_index)
1246 .expect("base schema snapshot should encode");
1247 let base_metadata = base
1248 .allocation_metadata()
1249 .expect("base allocation metadata should derive")
1250 .expect("base allocation metadata should be present");
1251
1252 let mut indexed = SchemaStore::init(test_memory(236));
1253 indexed
1254 .insert_persisted_snapshot(EntityTag::new(93), &with_index)
1255 .expect("indexed schema snapshot should encode");
1256 let indexed_metadata = indexed
1257 .allocation_metadata()
1258 .expect("indexed allocation metadata should derive")
1259 .expect("indexed allocation metadata should be present");
1260
1261 assert_eq!(
1262 base_metadata.data().schema_fingerprint(),
1263 indexed_metadata.data().schema_fingerprint(),
1264 "data allocation metadata should ignore accepted index catalog changes"
1265 );
1266 assert_ne!(
1267 base_metadata.index().schema_fingerprint(),
1268 indexed_metadata.index().schema_fingerprint(),
1269 "index allocation metadata should change when accepted index catalog changes"
1270 );
1271 assert_ne!(
1272 base_metadata.schema().schema_fingerprint(),
1273 indexed_metadata.schema().schema_fingerprint(),
1274 "schema allocation metadata should change when full accepted catalog changes"
1275 );
1276 assert_ne!(
1277 indexed_metadata.data().schema_fingerprint(),
1278 indexed_metadata.index().schema_fingerprint(),
1279 "data and index allocation metadata should have distinct role fingerprints"
1280 );
1281 assert_ne!(
1282 indexed_metadata.index().schema_fingerprint(),
1283 indexed_metadata.schema().schema_fingerprint(),
1284 "index and schema allocation metadata should have distinct role fingerprints"
1285 );
1286 }
1287
1288 #[test]
1289 fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
1290 let mut store = SchemaStore::init(test_memory(253));
1291 let invalid = persisted_schema_snapshot_with_layout_version_for_test(
1292 SchemaVersion::new(2),
1293 SchemaVersion::initial(),
1294 "Invalid",
1295 );
1296
1297 let err = store
1298 .insert_persisted_snapshot(EntityTag::new(43), &invalid)
1299 .expect_err("schema store should reject mismatched snapshot/layout versions");
1300
1301 assert!(
1302 err.message()
1303 .contains("schema snapshot row-layout version mismatch"),
1304 "schema store should preserve the version mismatch diagnostic"
1305 );
1306 }
1307
1308 #[test]
1309 fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
1310 let mut store = SchemaStore::init(test_memory(254));
1311 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");
1312 let invalid = PersistedSchemaSnapshot::new(
1313 base.version(),
1314 base.entity_path().to_string(),
1315 base.entity_name().to_string(),
1316 base.first_primary_key_field_id(),
1317 SchemaRowLayout::new(
1318 base.version(),
1319 vec![
1320 (FieldId::new(1), SchemaFieldSlot::new(0)),
1321 (FieldId::new(2), SchemaFieldSlot::new(3)),
1322 ],
1323 ),
1324 base.fields().to_vec(),
1325 );
1326
1327 let err = store
1328 .insert_persisted_snapshot(EntityTag::new(44), &invalid)
1329 .expect_err("schema store should reject divergent field/layout slots");
1330
1331 assert!(
1332 err.message()
1333 .contains("schema snapshot field slot mismatch"),
1334 "schema store should report the duplicated slot divergence"
1335 );
1336 }
1337
1338 #[test]
1339 fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
1340 let mut store = SchemaStore::init(test_memory(246));
1341 let base =
1342 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");
1343 let invalid = PersistedSchemaSnapshot::new(
1344 base.version(),
1345 base.entity_path().to_string(),
1346 base.entity_name().to_string(),
1347 base.first_primary_key_field_id(),
1348 SchemaRowLayout::new(
1349 base.version(),
1350 vec![
1351 (FieldId::new(1), SchemaFieldSlot::new(0)),
1352 (FieldId::new(2), SchemaFieldSlot::new(0)),
1353 ],
1354 ),
1355 base.fields().to_vec(),
1356 );
1357
1358 let err = store
1359 .insert_persisted_snapshot(EntityTag::new(49), &invalid)
1360 .expect_err("schema store should reject duplicate row-layout slots");
1361
1362 assert!(
1363 err.message()
1364 .contains("schema snapshot duplicate row-layout slot"),
1365 "schema store should report the row-layout slot ambiguity"
1366 );
1367 }
1368
1369 #[test]
1370 fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
1371 let mut store = SchemaStore::init(test_memory(248));
1372 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");
1373 let invalid = PersistedSchemaSnapshot::new(
1374 base.version(),
1375 base.entity_path().to_string(),
1376 base.entity_name().to_string(),
1377 FieldId::new(99),
1378 base.row_layout().clone(),
1379 base.fields().to_vec(),
1380 );
1381
1382 let err = store
1383 .insert_persisted_snapshot(EntityTag::new(47), &invalid)
1384 .expect_err("schema store should reject snapshots without the primary-key field");
1385
1386 assert!(
1387 err.message()
1388 .contains("schema snapshot primary key field missing from row layout"),
1389 "schema store should report the missing primary-key field"
1390 );
1391 }
1392
1393 #[test]
1394 fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
1395 let mut store = SchemaStore::init(test_memory(249));
1396 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1397 let corrupt_key =
1398 RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
1399
1400 store
1401 .insert_persisted_snapshot(EntityTag::new(45), &initial)
1402 .expect("initial schema snapshot should encode");
1403 store.insert_raw_snapshot(corrupt_key, RawSchemaSnapshot::from_bytes(vec![0xff, 0x00]));
1404
1405 let err = store
1406 .latest_persisted_snapshot(EntityTag::new(45))
1407 .expect_err("latest corrupt schema snapshot must fail closed");
1408
1409 assert!(
1410 err.message()
1411 .contains("failed to decode persisted schema snapshot"),
1412 "latest-version lookup should report the corrupt newest snapshot"
1413 );
1414 }
1415
1416 #[test]
1417 fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
1418 let mut store = SchemaStore::init(test_memory(250));
1419 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");
1420 let invalid = PersistedSchemaSnapshot::new(
1421 base.version(),
1422 base.entity_path().to_string(),
1423 base.entity_name().to_string(),
1424 base.first_primary_key_field_id(),
1425 SchemaRowLayout::new(
1426 base.version(),
1427 vec![
1428 (FieldId::new(1), SchemaFieldSlot::new(0)),
1429 (FieldId::new(2), SchemaFieldSlot::new(3)),
1430 ],
1431 ),
1432 base.fields().to_vec(),
1433 );
1434 let raw = encode_persisted_schema_snapshot(&invalid)
1435 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1436 let key = RawSchemaKey::from_entity_version(EntityTag::new(46), invalid.version());
1437
1438 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1439
1440 let err = store
1441 .latest_persisted_snapshot(EntityTag::new(46))
1442 .expect_err("raw decode should reject divergent field/layout slots");
1443
1444 assert!(
1445 err.message()
1446 .contains("persisted schema snapshot field slot mismatch"),
1447 "schema codec should report the raw decoded slot divergence"
1448 );
1449 }
1450
1451 #[test]
1452 fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
1453 let mut store = SchemaStore::init(test_memory(247));
1454 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");
1455 let invalid = PersistedSchemaSnapshot::new(
1456 base.version(),
1457 base.entity_path().to_string(),
1458 base.entity_name().to_string(),
1459 FieldId::new(99),
1460 base.row_layout().clone(),
1461 base.fields().to_vec(),
1462 );
1463 let raw = encode_persisted_schema_snapshot(&invalid)
1464 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1465 let key = RawSchemaKey::from_entity_version(EntityTag::new(48), invalid.version());
1466
1467 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1468
1469 let err = store
1470 .latest_persisted_snapshot(EntityTag::new(48))
1471 .expect_err("raw decode should reject snapshots without the primary-key field");
1472
1473 assert!(
1474 err.message()
1475 .contains("persisted schema snapshot primary key field missing from row layout"),
1476 "schema codec should report the raw decoded missing primary-key field"
1477 );
1478 }
1479
1480 #[test]
1481 fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
1482 let mut store = SchemaStore::init(test_memory(245));
1483 let base =
1484 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");
1485 let mut fields = base.fields().to_vec();
1486 let duplicate = PersistedFieldSnapshot::new(
1487 fields[1].id(),
1488 fields[0].name().to_string(),
1489 fields[1].slot(),
1490 fields[1].kind().clone(),
1491 fields[1].nested_leaves().to_vec(),
1492 fields[1].nullable(),
1493 fields[1].default().clone(),
1494 fields[1].storage_decode(),
1495 fields[1].leaf_codec(),
1496 );
1497 fields[1] = duplicate;
1498 let invalid = PersistedSchemaSnapshot::new(
1499 base.version(),
1500 base.entity_path().to_string(),
1501 base.entity_name().to_string(),
1502 base.first_primary_key_field_id(),
1503 base.row_layout().clone(),
1504 fields,
1505 );
1506 let raw = encode_persisted_schema_snapshot(&invalid)
1507 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1508 let key = RawSchemaKey::from_entity_version(EntityTag::new(50), invalid.version());
1509
1510 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1511
1512 let err = store
1513 .latest_persisted_snapshot(EntityTag::new(50))
1514 .expect_err("raw decode should reject duplicate field names");
1515
1516 assert!(
1517 err.message()
1518 .contains("persisted schema snapshot duplicate field name"),
1519 "schema codec should report the raw decoded field-name ambiguity"
1520 );
1521 }
1522
1523 #[test]
1524 fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
1525 let mut store = SchemaStore::init(test_memory(244));
1526 let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");
1527 let mut fields = base.fields().to_vec();
1528 let invalid_field = PersistedFieldSnapshot::new(
1529 fields[1].id(),
1530 fields[1].name().to_string(),
1531 fields[1].slot(),
1532 fields[1].kind().clone(),
1533 vec![PersistedNestedLeafSnapshot::new(
1534 Vec::new(),
1535 PersistedFieldKind::Blob { max_len: None },
1536 false,
1537 FieldStorageDecode::ByKind,
1538 LeafCodec::Scalar(ScalarCodec::Blob),
1539 )],
1540 fields[1].nullable(),
1541 fields[1].default().clone(),
1542 fields[1].storage_decode(),
1543 fields[1].leaf_codec(),
1544 );
1545 fields[1] = invalid_field;
1546 let invalid = PersistedSchemaSnapshot::new(
1547 base.version(),
1548 base.entity_path().to_string(),
1549 base.entity_name().to_string(),
1550 base.first_primary_key_field_id(),
1551 base.row_layout().clone(),
1552 fields,
1553 );
1554
1555 let err = store
1556 .insert_persisted_snapshot(EntityTag::new(51), &invalid)
1557 .expect_err("schema store should reject empty nested leaf paths");
1558
1559 assert!(
1560 err.message()
1561 .contains("schema snapshot empty nested leaf path"),
1562 "schema store should report the empty nested leaf path"
1563 );
1564 }
1565
1566 #[test]
1567 fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
1568 let mut store = SchemaStore::init(test_memory(243));
1569 let base =
1570 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");
1571 let mut fields = base.fields().to_vec();
1572 let duplicate_leaves = vec![
1573 PersistedNestedLeafSnapshot::new(
1574 vec!["bytes".to_string()],
1575 PersistedFieldKind::Blob { max_len: None },
1576 false,
1577 FieldStorageDecode::ByKind,
1578 LeafCodec::Scalar(ScalarCodec::Blob),
1579 ),
1580 PersistedNestedLeafSnapshot::new(
1581 vec!["bytes".to_string()],
1582 PersistedFieldKind::Text { max_len: None },
1583 false,
1584 FieldStorageDecode::ByKind,
1585 LeafCodec::Scalar(ScalarCodec::Text),
1586 ),
1587 ];
1588 let invalid_field = PersistedFieldSnapshot::new(
1589 fields[1].id(),
1590 fields[1].name().to_string(),
1591 fields[1].slot(),
1592 fields[1].kind().clone(),
1593 duplicate_leaves,
1594 fields[1].nullable(),
1595 fields[1].default().clone(),
1596 fields[1].storage_decode(),
1597 fields[1].leaf_codec(),
1598 );
1599 fields[1] = invalid_field;
1600 let invalid = PersistedSchemaSnapshot::new(
1601 base.version(),
1602 base.entity_path().to_string(),
1603 base.entity_name().to_string(),
1604 base.first_primary_key_field_id(),
1605 base.row_layout().clone(),
1606 fields,
1607 );
1608 let raw = encode_persisted_schema_snapshot(&invalid)
1609 .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
1610 let key = RawSchemaKey::from_entity_version(EntityTag::new(52), invalid.version());
1611
1612 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(raw));
1613
1614 let err = store
1615 .latest_persisted_snapshot(EntityTag::new(52))
1616 .expect_err("raw decode should reject duplicate nested leaf paths");
1617
1618 assert!(
1619 err.message()
1620 .contains("persisted schema snapshot duplicate nested leaf path"),
1621 "schema codec should report the raw decoded nested path ambiguity"
1622 );
1623 }
1624
1625 #[test]
1626 fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
1627 let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");
1628
1629 let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1630 .expect("schema snapshot should encode");
1631 let decoded = raw
1632 .decode_persisted_snapshot()
1633 .expect("schema snapshot should decode");
1634
1635 assert_eq!(decoded, snapshot);
1636 }
1637
1638 fn persisted_schema_snapshot_for_test(
1642 version: SchemaVersion,
1643 entity_name: &str,
1644 ) -> PersistedSchemaSnapshot {
1645 persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
1646 }
1647
1648 fn persisted_schema_snapshot_with_index_for_test(
1649 version: SchemaVersion,
1650 entity_name: &str,
1651 index_name: &str,
1652 ) -> PersistedSchemaSnapshot {
1653 let base = persisted_schema_snapshot_for_test(version, entity_name);
1654
1655 PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1656 base.version(),
1657 base.entity_path().to_string(),
1658 base.entity_name().to_string(),
1659 base.primary_key_field_ids().to_vec(),
1660 base.row_layout().clone(),
1661 base.fields().to_vec(),
1662 vec![PersistedIndexSnapshot::new(
1663 0,
1664 index_name.to_string(),
1665 "RoleSpecificStore".to_string(),
1666 false,
1667 PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
1668 FieldId::new(1),
1669 SchemaFieldSlot::new(0),
1670 vec!["id".to_string()],
1671 PersistedFieldKind::Ulid,
1672 false,
1673 )]),
1674 None,
1675 )],
1676 )
1677 }
1678
1679 fn persisted_schema_snapshot_with_layout_version_for_test(
1683 version: SchemaVersion,
1684 layout_version: SchemaVersion,
1685 entity_name: &str,
1686 ) -> PersistedSchemaSnapshot {
1687 PersistedSchemaSnapshot::new(
1688 version,
1689 format!("entities::{entity_name}"),
1690 entity_name.to_string(),
1691 FieldId::new(1),
1692 SchemaRowLayout::new(
1693 layout_version,
1694 vec![
1695 (FieldId::new(1), SchemaFieldSlot::new(0)),
1696 (FieldId::new(2), SchemaFieldSlot::new(1)),
1697 ],
1698 ),
1699 vec![
1700 PersistedFieldSnapshot::new(
1701 FieldId::new(1),
1702 "id".to_string(),
1703 SchemaFieldSlot::new(0),
1704 PersistedFieldKind::Ulid,
1705 Vec::new(),
1706 false,
1707 SchemaFieldDefault::None,
1708 FieldStorageDecode::ByKind,
1709 LeafCodec::Scalar(ScalarCodec::Ulid),
1710 ),
1711 PersistedFieldSnapshot::new(
1712 FieldId::new(2),
1713 "payload".to_string(),
1714 SchemaFieldSlot::new(1),
1715 PersistedFieldKind::Map {
1716 key: Box::new(PersistedFieldKind::Text { max_len: None }),
1717 value: Box::new(PersistedFieldKind::List(Box::new(
1718 PersistedFieldKind::Nat64,
1719 ))),
1720 },
1721 vec![PersistedNestedLeafSnapshot::new(
1722 vec!["bytes".to_string()],
1723 PersistedFieldKind::Blob { max_len: None },
1724 false,
1725 FieldStorageDecode::ByKind,
1726 LeafCodec::Scalar(ScalarCodec::Blob),
1727 )],
1728 false,
1729 SchemaFieldDefault::None,
1730 FieldStorageDecode::ByKind,
1731 LeafCodec::StructuralFallback,
1732 ),
1733 ],
1734 )
1735 }
1736}