1use crate::{
7 db::{
8 codec::{
9 finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10 write_hash_tag_u8, write_hash_u32, write_hash_u64,
11 },
12 commit::CommitSchemaFingerprint,
13 schema::{
14 PersistedFieldKind, PersistedIndexKeyItemSnapshot, PersistedIndexKeySnapshot,
15 PersistedSchemaSnapshot, SchemaVersion, decode_persisted_schema_snapshot,
16 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
17 },
18 },
19 error::InternalError,
20 traits::Storable,
21 types::EntityTag,
22};
23use ic_memory::stable_structures::storable::Bound;
24use ic_memory::stable_structures::{BTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory};
25use sha2::Digest;
26use std::borrow::Cow;
27use std::collections::BTreeMap as StdBTreeMap;
28
/// Encoded width of one schema key in bytes: a `u64` entity tag followed by a
/// `u32` schema version.
const SCHEMA_KEY_BYTES_USIZE: usize = size_of::<u64>() + size_of::<u32>();
/// The same key width, typed as the `u32` used for `Bound::Bounded::max_size`.
const SCHEMA_KEY_BYTES: u32 = 12;
/// Largest encoded snapshot size (512 KiB) declared as the value bound.
const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
/// Leading tag byte of the full schema-catalog fingerprint.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
/// Leading tag byte of the data-allocation fingerprint.
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
/// Leading tag byte of the index-allocation fingerprint.
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
35
36#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
45struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
46
47impl RawSchemaKey {
48 #[must_use]
50 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
51 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
52 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
53 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
54
55 Self(out)
56 }
57
58 #[must_use]
60 fn entity_tag(self) -> EntityTag {
61 let mut bytes = [0u8; size_of::<u64>()];
62 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
63
64 EntityTag::new(u64::from_be_bytes(bytes))
65 }
66
67 #[must_use]
69 fn version(self) -> u32 {
70 let mut bytes = [0u8; size_of::<u32>()];
71 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
72
73 u32::from_be_bytes(bytes)
74 }
75}
76
77impl Storable for RawSchemaKey {
78 fn to_bytes(&self) -> Cow<'_, [u8]> {
79 Cow::Borrowed(&self.0)
80 }
81
82 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
83 debug_assert_eq!(
84 bytes.len(),
85 SCHEMA_KEY_BYTES_USIZE,
86 "RawSchemaKey::from_bytes received unexpected byte length",
87 );
88
89 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
90 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
91 }
92
93 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
94 out.copy_from_slice(bytes.as_ref());
95 Self(out)
96 }
97
98 fn into_bytes(self) -> Vec<u8> {
99 self.0.to_vec()
100 }
101
102 const BOUND: Bound = Bound::Bounded {
103 max_size: SCHEMA_KEY_BYTES,
104 is_fixed_size: true,
105 };
106}
107
108#[derive(Clone, Debug, Eq, PartialEq)]
118struct RawSchemaSnapshot(Vec<u8>);
119
120impl RawSchemaSnapshot {
121 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
123 validate_typed_schema_snapshot_for_store(snapshot)?;
124
125 encode_persisted_schema_snapshot(snapshot).map(Self)
126 }
127
128 #[must_use]
130 #[cfg(test)]
131 const fn from_bytes(bytes: Vec<u8>) -> Self {
132 Self(bytes)
133 }
134
135 #[must_use]
137 const fn as_bytes(&self) -> &[u8] {
138 self.0.as_slice()
139 }
140
141 #[must_use]
143 #[cfg(test)]
144 fn into_bytes(self) -> Vec<u8> {
145 self.0
146 }
147
148 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
150 decode_persisted_schema_snapshot(self.as_bytes())
151 }
152}
153
154impl Storable for RawSchemaSnapshot {
155 fn to_bytes(&self) -> Cow<'_, [u8]> {
156 Cow::Borrowed(self.as_bytes())
157 }
158
159 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
160 Self(bytes.into_owned())
161 }
162
163 fn into_bytes(self) -> Vec<u8> {
164 self.0
165 }
166
167 const BOUND: Bound = Bound::Bounded {
168 max_size: MAX_SCHEMA_SNAPSHOT_BYTES,
169 is_fixed_size: false,
170 };
171}
172
173fn validate_typed_schema_snapshot_for_store(
177 snapshot: &PersistedSchemaSnapshot,
178) -> Result<(), InternalError> {
179 if let Some(detail) = schema_snapshot_integrity_detail(
180 "schema snapshot",
181 snapshot.version(),
182 snapshot.primary_key_field_ids(),
183 snapshot.row_layout(),
184 snapshot.fields(),
185 ) {
186 return Err(InternalError::store_invariant(detail));
187 }
188
189 Ok(())
190}
191
/// Size summary of the snapshots stored for a single entity.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    /// Number of snapshots stored for the entity.
    snapshots: u64,
    /// Total encoded byte length of those snapshots.
    encoded_bytes: u64,
    /// Encoded byte length of the highest-version snapshot (0 when there is none).
    latest_snapshot_bytes: u64,
}
206
207#[derive(Clone, Copy, Debug, Eq, PartialEq)]
215pub(in crate::db) struct SchemaStoreCatalogMetadata {
216 schema_version: SchemaVersion,
217 schema_fingerprint: CommitSchemaFingerprint,
218 entity_count: u64,
219}
220
221impl SchemaStoreCatalogMetadata {
222 #[must_use]
224 const fn new(
225 schema_version: SchemaVersion,
226 schema_fingerprint: CommitSchemaFingerprint,
227 entity_count: u64,
228 ) -> Self {
229 Self {
230 schema_version,
231 schema_fingerprint,
232 entity_count,
233 }
234 }
235
236 #[must_use]
238 pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
239 self.schema_version
240 }
241
242 #[must_use]
245 pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
246 self.schema_fingerprint
247 }
248
249 #[must_use]
251 pub(in crate::db) const fn entity_count(self) -> u64 {
252 self.entity_count
253 }
254}
255
256#[derive(Clone, Copy, Debug, Eq, PartialEq)]
265pub(in crate::db) struct SchemaStoreAllocationMetadata {
266 data: SchemaStoreCatalogMetadata,
267 index: SchemaStoreCatalogMetadata,
268 schema: SchemaStoreCatalogMetadata,
269}
270
271impl SchemaStoreAllocationMetadata {
272 #[must_use]
275 const fn new(
276 data: SchemaStoreCatalogMetadata,
277 index: SchemaStoreCatalogMetadata,
278 schema: SchemaStoreCatalogMetadata,
279 ) -> Self {
280 Self {
281 data,
282 index,
283 schema,
284 }
285 }
286
287 #[must_use]
289 pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
290 self.data
291 }
292
293 #[must_use]
295 pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
296 self.index
297 }
298
299 #[must_use]
302 pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
303 self.schema
304 }
305}
306
307impl SchemaStoreFootprint {
308 #[must_use]
310 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
311 Self {
312 snapshots,
313 encoded_bytes,
314 latest_snapshot_bytes,
315 }
316 }
317
318 #[must_use]
320 pub(in crate::db) const fn snapshots(self) -> u64 {
321 self.snapshots
322 }
323
324 #[must_use]
326 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
327 self.encoded_bytes
328 }
329
330 #[must_use]
332 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
333 self.latest_snapshot_bytes
334 }
335}
336
337pub struct SchemaStore {
346 map: BTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
347}
348
349impl SchemaStore {
350 #[must_use]
352 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
353 Self {
354 map: BTreeMap::init(memory),
355 }
356 }
357
358 pub(in crate::db) fn insert_persisted_snapshot(
360 &mut self,
361 entity: EntityTag,
362 snapshot: &PersistedSchemaSnapshot,
363 ) -> Result<(), InternalError> {
364 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
365 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
366 let _ = self.insert_raw_snapshot(key, raw_snapshot);
367
368 Ok(())
369 }
370
371 #[cfg(test)]
373 pub(in crate::db) fn get_persisted_snapshot(
374 &self,
375 entity: EntityTag,
376 version: SchemaVersion,
377 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
378 let key = RawSchemaKey::from_entity_version(entity, version);
379 self.get_raw_snapshot(&key)
380 .map(|snapshot| snapshot.decode_persisted_snapshot())
381 .transpose()
382 }
383
384 pub(in crate::db) fn latest_persisted_snapshot(
386 &self,
387 entity: EntityTag,
388 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
389 let mut latest = None::<(SchemaVersion, RawSchemaSnapshot)>;
390 for entry in self.map.iter() {
391 let (key, snapshot) = entry.into_pair();
392 if key.entity_tag() != entity {
393 continue;
394 }
395
396 let version = SchemaVersion::new(key.version());
397 if latest
398 .as_ref()
399 .is_none_or(|(latest_version, _)| version > *latest_version)
400 {
401 latest = Some((version, snapshot));
402 }
403 }
404
405 latest
406 .map(|(_, snapshot)| snapshot.decode_persisted_snapshot())
407 .transpose()
408 }
409
410 #[must_use]
412 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
413 let mut snapshots = 0u64;
414 let mut encoded_bytes = 0u64;
415 let mut latest = None::<(SchemaVersion, u64)>;
416
417 for entry in self.map.iter() {
418 let (key, snapshot) = entry.into_pair();
419 if key.entity_tag() != entity {
420 continue;
421 }
422
423 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
424 snapshots = snapshots.saturating_add(1);
425 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
426
427 let version = SchemaVersion::new(key.version());
428 if latest
429 .as_ref()
430 .is_none_or(|(latest_version, _)| version > *latest_version)
431 {
432 latest = Some((version, snapshot_bytes));
433 }
434 }
435
436 SchemaStoreFootprint::new(
437 snapshots,
438 encoded_bytes,
439 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
440 )
441 }
442
443 #[cfg(test)]
449 pub(in crate::db) fn catalog_metadata(
450 &self,
451 ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
452 Ok(self
453 .allocation_metadata()?
454 .map(SchemaStoreAllocationMetadata::schema))
455 }
456
457 pub(in crate::db) fn allocation_metadata(
464 &self,
465 ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
466 let latest_by_entity = self.latest_raw_snapshots_by_entity();
467 if latest_by_entity.is_empty() {
468 return Ok(None);
469 }
470
471 Ok(Some(SchemaStoreAllocationMetadata::new(
472 derive_data_allocation_metadata(&latest_by_entity)?,
473 derive_index_allocation_metadata(&latest_by_entity)?,
474 derive_schema_catalog_metadata(&latest_by_entity)?,
475 )))
476 }
477
478 fn insert_raw_snapshot(
480 &mut self,
481 key: RawSchemaKey,
482 snapshot: RawSchemaSnapshot,
483 ) -> Option<RawSchemaSnapshot> {
484 self.map.insert(key, snapshot)
485 }
486
487 #[must_use]
489 #[cfg(test)]
490 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
491 self.map.get(key)
492 }
493
494 #[must_use]
496 #[cfg(test)]
497 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
498 self.map.contains_key(key)
499 }
500
501 #[must_use]
503 #[cfg(test)]
504 pub(in crate::db) fn len(&self) -> u64 {
505 self.map.len()
506 }
507
508 #[must_use]
510 #[cfg(test)]
511 pub(in crate::db) fn is_empty(&self) -> bool {
512 self.map.is_empty()
513 }
514
515 #[cfg(test)]
517 pub(in crate::db) fn clear(&mut self) {
518 self.map.clear_new();
519 }
520
521 fn latest_raw_snapshots_by_entity(
522 &self,
523 ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
524 let mut latest_by_entity =
525 StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
526
527 for entry in self.map.iter() {
528 let (key, snapshot) = entry.into_pair();
529 let version = SchemaVersion::new(key.version());
530 match latest_by_entity.get_mut(&key.entity_tag()) {
531 Some((latest_version, latest_snapshot)) if version > *latest_version => {
532 *latest_version = version;
533 *latest_snapshot = snapshot;
534 }
535 None => {
536 latest_by_entity.insert(key.entity_tag(), (version, snapshot));
537 }
538 Some(_) => {}
539 }
540 }
541
542 latest_by_entity
543 }
544}
545
546fn derive_data_allocation_metadata(
547 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
548) -> Result<SchemaStoreCatalogMetadata, InternalError> {
549 let mut max_version = SchemaVersion::initial();
550 let mut hasher = new_hash_sha256();
551 write_hash_tag_u8(
552 &mut hasher,
553 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
554 );
555
556 for (entity, (_, snapshot)) in latest_by_entity {
557 let persisted = snapshot.decode_persisted_snapshot()?;
558 if persisted.version() > max_version {
559 max_version = persisted.version();
560 }
561
562 let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
563 persisted.version(),
564 persisted.entity_path().to_string(),
565 persisted.entity_name().to_string(),
566 persisted.primary_key_field_ids().to_vec(),
567 persisted.row_layout().clone(),
568 persisted.fields().to_vec(),
569 Vec::new(),
570 );
571 let encoded = encode_persisted_schema_snapshot(&data_projection)?;
572
573 write_hash_u64(&mut hasher, entity.value());
574 write_hash_u32(&mut hasher, persisted.version().get());
575 write_hash_len_u32(&mut hasher, encoded.len());
576 hasher.update(encoded);
577 }
578
579 Ok(finalize_schema_metadata(
580 max_version,
581 hasher,
582 latest_by_entity.len(),
583 ))
584}
585
586fn derive_index_allocation_metadata(
587 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
588) -> Result<SchemaStoreCatalogMetadata, InternalError> {
589 let mut max_version = SchemaVersion::initial();
590 let mut hasher = new_hash_sha256();
591 write_hash_tag_u8(
592 &mut hasher,
593 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
594 );
595
596 for (entity, (_, snapshot)) in latest_by_entity {
597 let persisted = snapshot.decode_persisted_snapshot()?;
598 if persisted.version() > max_version {
599 max_version = persisted.version();
600 }
601
602 write_hash_u64(&mut hasher, entity.value());
603 write_hash_u32(&mut hasher, persisted.version().get());
604 write_hash_len_u32(&mut hasher, persisted.indexes().len());
605 for index in persisted.indexes() {
606 write_hash_u32(&mut hasher, u32::from(index.ordinal()));
607 write_hash_str_u32(&mut hasher, index.name());
608 write_hash_str_u32(&mut hasher, index.store());
609 write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
610 write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
611 match index.predicate_sql() {
612 Some(predicate_sql) => {
613 write_hash_tag_u8(&mut hasher, 1);
614 write_hash_str_u32(&mut hasher, predicate_sql);
615 }
616 None => write_hash_tag_u8(&mut hasher, 0),
617 }
618 hash_persisted_index_key(&mut hasher, index.key());
619 }
620 }
621
622 Ok(finalize_schema_metadata(
623 max_version,
624 hasher,
625 latest_by_entity.len(),
626 ))
627}
628
629fn derive_schema_catalog_metadata(
630 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
631) -> Result<SchemaStoreCatalogMetadata, InternalError> {
632 let mut max_version = SchemaVersion::initial();
633 let mut hasher = new_hash_sha256();
634 write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
635
636 for (entity, (version, snapshot)) in latest_by_entity {
637 let persisted = snapshot.decode_persisted_snapshot()?;
638 if persisted.version() > max_version {
639 max_version = persisted.version();
640 }
641
642 write_hash_u64(&mut hasher, entity.value());
643 write_hash_u32(&mut hasher, version.get());
644 write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
645 hasher.update(snapshot.as_bytes());
646 }
647
648 Ok(finalize_schema_metadata(
649 max_version,
650 hasher,
651 latest_by_entity.len(),
652 ))
653}
654
655fn finalize_schema_metadata(
656 schema_version: SchemaVersion,
657 hasher: sha2::Sha256,
658 entity_count: usize,
659) -> SchemaStoreCatalogMetadata {
660 let digest = finalize_hash_sha256(hasher);
661 let mut schema_fingerprint = [0u8; 16];
662 schema_fingerprint.copy_from_slice(&digest[..16]);
663
664 SchemaStoreCatalogMetadata::new(
665 schema_version,
666 schema_fingerprint,
667 u64::try_from(entity_count).unwrap_or(u64::MAX),
668 )
669}
670
671fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
672 match key {
673 PersistedIndexKeySnapshot::FieldPath(paths) => {
674 write_hash_tag_u8(hasher, 1);
675 write_hash_len_u32(hasher, paths.len());
676 for path in paths {
677 hash_persisted_index_field_path(hasher, path);
678 }
679 }
680 PersistedIndexKeySnapshot::Items(items) => {
681 write_hash_tag_u8(hasher, 2);
682 write_hash_len_u32(hasher, items.len());
683 for item in items {
684 match item {
685 PersistedIndexKeyItemSnapshot::FieldPath(path) => {
686 write_hash_tag_u8(hasher, 1);
687 hash_persisted_index_field_path(hasher, path);
688 }
689 PersistedIndexKeyItemSnapshot::Expression(expression) => {
690 write_hash_tag_u8(hasher, 2);
691 write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
692 hash_persisted_index_field_path(hasher, expression.source());
693 hash_persisted_field_kind(hasher, expression.input_kind());
694 hash_persisted_field_kind(hasher, expression.output_kind());
695 write_hash_str_u32(hasher, expression.canonical_text());
696 }
697 }
698 }
699 }
700 }
701}
702
703fn hash_persisted_index_field_path(
704 hasher: &mut sha2::Sha256,
705 path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
706) {
707 write_hash_u32(hasher, path.field_id().get());
708 write_hash_u32(hasher, u32::from(path.slot().get()));
709 write_hash_len_u32(hasher, path.path().len());
710 for segment in path.path() {
711 write_hash_str_u32(hasher, segment);
712 }
713 hash_persisted_field_kind(hasher, path.kind());
714 write_hash_tag_u8(hasher, u8::from(path.nullable()));
715}
716
717fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
718 match kind {
719 PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
720 PersistedFieldKind::Blob { max_len } => {
721 write_hash_tag_u8(hasher, 2);
722 hash_optional_u32(hasher, *max_len);
723 }
724 PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
725 PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
726 PersistedFieldKind::Decimal { scale } => {
727 write_hash_tag_u8(hasher, 5);
728 write_hash_u32(hasher, *scale);
729 }
730 PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
731 PersistedFieldKind::Enum { path, variants } => {
732 write_hash_tag_u8(hasher, 7);
733 write_hash_str_u32(hasher, path);
734 write_hash_len_u32(hasher, variants.len());
735 for variant in variants {
736 write_hash_str_u32(hasher, variant.ident());
737 match variant.payload_kind() {
738 Some(payload_kind) => {
739 write_hash_tag_u8(hasher, 1);
740 hash_persisted_field_kind(hasher, payload_kind);
741 }
742 None => write_hash_tag_u8(hasher, 0),
743 }
744 write_hash_str_u32(
745 hasher,
746 field_storage_decode_name(variant.payload_storage_decode()),
747 );
748 }
749 }
750 PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
751 PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
752 PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
753 PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
754 PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
755 PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
756 PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
757 PersistedFieldKind::IntBig { max_bytes } => {
758 write_hash_tag_u8(hasher, 15);
759 write_hash_u32(hasher, *max_bytes);
760 }
761 PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
762 PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
763 PersistedFieldKind::Text { max_len } => {
764 write_hash_tag_u8(hasher, 18);
765 hash_optional_u32(hasher, *max_len);
766 }
767 PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
768 PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
769 PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
770 PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
771 PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
772 PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
773 PersistedFieldKind::NatBig { max_bytes } => {
774 write_hash_tag_u8(hasher, 25);
775 write_hash_u32(hasher, *max_bytes);
776 }
777 PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
778 PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
779 PersistedFieldKind::Relation {
780 target_path,
781 target_entity_name,
782 target_entity_tag,
783 target_store_path,
784 key_kind,
785 strength,
786 } => {
787 write_hash_tag_u8(hasher, 28);
788 write_hash_str_u32(hasher, target_path);
789 write_hash_str_u32(hasher, target_entity_name);
790 write_hash_u64(hasher, target_entity_tag.value());
791 write_hash_str_u32(hasher, target_store_path);
792 hash_persisted_field_kind(hasher, key_kind);
793 write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
794 }
795 PersistedFieldKind::List(inner) => {
796 write_hash_tag_u8(hasher, 29);
797 hash_persisted_field_kind(hasher, inner);
798 }
799 PersistedFieldKind::Set(inner) => {
800 write_hash_tag_u8(hasher, 30);
801 hash_persisted_field_kind(hasher, inner);
802 }
803 PersistedFieldKind::Map { key, value } => {
804 write_hash_tag_u8(hasher, 31);
805 hash_persisted_field_kind(hasher, key);
806 hash_persisted_field_kind(hasher, value);
807 }
808 PersistedFieldKind::Structured { queryable } => {
809 write_hash_tag_u8(hasher, 32);
810 write_hash_tag_u8(hasher, u8::from(*queryable));
811 }
812 }
813}
814
815fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
816 match value {
817 Some(value) => {
818 write_hash_tag_u8(hasher, 1);
819 write_hash_u32(hasher, value);
820 }
821 None => write_hash_tag_u8(hasher, 0),
822 }
823}
824
825const fn persisted_index_origin_name(
826 origin: crate::db::schema::PersistedIndexOrigin,
827) -> &'static str {
828 match origin {
829 crate::db::schema::PersistedIndexOrigin::Generated => "generated",
830 crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
831 }
832}
833
834const fn persisted_expression_op_name(
835 op: crate::db::schema::PersistedIndexExpressionOp,
836) -> &'static str {
837 match op {
838 crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
839 crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
840 crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
841 crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
842 crate::db::schema::PersistedIndexExpressionOp::Date => "date",
843 crate::db::schema::PersistedIndexExpressionOp::Year => "year",
844 crate::db::schema::PersistedIndexExpressionOp::Month => "month",
845 crate::db::schema::PersistedIndexExpressionOp::Day => "day",
846 }
847}
848
849const fn persisted_relation_strength_name(
850 strength: crate::db::schema::PersistedRelationStrength,
851) -> &'static str {
852 match strength {
853 crate::db::schema::PersistedRelationStrength::Strong => "strong",
854 crate::db::schema::PersistedRelationStrength::Weak => "weak",
855 }
856}
857
858const fn field_storage_decode_name(
859 decode: crate::model::field::FieldStorageDecode,
860) -> &'static str {
861 match decode {
862 crate::model::field::FieldStorageDecode::ByKind => "by_kind",
863 crate::model::field::FieldStorageDecode::Value => "value",
864 }
865}
866
867#[cfg(test)]
872mod tests {
873 use super::{RawSchemaKey, RawSchemaSnapshot, SchemaStore};
874 use crate::{
875 db::schema::{
876 FieldId, PersistedFieldKind, PersistedFieldSnapshot, PersistedIndexFieldPathSnapshot,
877 PersistedIndexKeySnapshot, PersistedIndexSnapshot, PersistedNestedLeafSnapshot,
878 PersistedSchemaSnapshot, SchemaFieldDefault, SchemaFieldSlot, SchemaRowLayout,
879 SchemaVersion, encode_persisted_schema_snapshot,
880 },
881 model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
882 testing::test_memory,
883 traits::Storable,
884 types::EntityTag,
885 };
886 use std::borrow::Cow;
887
888 #[test]
889 fn raw_schema_key_round_trips_entity_and_version() {
890 let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
891 SchemaVersion::initial()
892 });
893 let encoded = key.to_bytes().into_owned();
894 let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
895
896 assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
897 assert_eq!(decoded.version(), SchemaVersion::initial().get());
898 }
899
900 #[test]
901 fn raw_schema_snapshot_round_trips_payload_bytes() {
902 let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
903 let encoded = snapshot.to_bytes().into_owned();
904 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
905
906 assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
907 assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
908 }
909
910 #[test]
911 fn schema_store_persists_raw_snapshots_by_entity_version_key() {
912 let mut store = SchemaStore::init(test_memory(251));
913 let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
914
915 assert!(store.is_empty());
916 assert!(!store.contains_raw_snapshot(&key));
917
918 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
919
920 assert_eq!(store.len(), 1);
921 assert!(store.contains_raw_snapshot(&key));
922 assert_eq!(
923 store
924 .get_raw_snapshot(&key)
925 .expect("schema snapshot should be present")
926 .as_bytes(),
927 &[9, 4, 6],
928 );
929
930 store.clear();
931 assert!(store.is_empty());
932 }
933
934 #[test]
935 fn schema_store_loads_latest_snapshot_for_entity() {
936 let mut store = SchemaStore::init(test_memory(252));
937 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
938 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
939 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
940
941 store
942 .insert_persisted_snapshot(EntityTag::new(41), &initial)
943 .expect("initial schema snapshot should encode");
944 store
945 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
946 .expect("other entity schema snapshot should encode");
947 store
948 .insert_persisted_snapshot(EntityTag::new(41), &newer)
949 .expect("newer schema snapshot should encode");
950
951 let latest = store
952 .latest_persisted_snapshot(EntityTag::new(41))
953 .expect("latest schema snapshot should decode")
954 .expect("schema snapshot should exist");
955
956 assert_eq!(latest.version(), SchemaVersion::new(2));
957 assert_eq!(latest.entity_name(), "Newer");
958 }
959
960 #[test]
961 fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
962 let mut store = SchemaStore::init(test_memory(242));
963 store.insert_raw_snapshot(
964 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
965 RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
966 );
967 store.insert_raw_snapshot(
968 RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
969 RawSchemaSnapshot::from_bytes(vec![5, 8]),
970 );
971 store.insert_raw_snapshot(
972 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
973 RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
974 );
975
976 let footprint = store.entity_footprint(EntityTag::new(71));
977
978 assert_eq!(footprint.snapshots(), 2);
979 assert_eq!(footprint.encoded_bytes(), 7);
980 assert_eq!(footprint.latest_snapshot_bytes(), 4);
981 }
982
983 #[test]
984 fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
985 let store = SchemaStore::init(test_memory(241));
986
987 assert_eq!(
988 store
989 .catalog_metadata()
990 .expect("empty schema catalog metadata should derive"),
991 None
992 );
993 }
994
995 #[test]
996 fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
997 let mut store = SchemaStore::init(test_memory(240));
998 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
999 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1000 let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1001
1002 store
1003 .insert_persisted_snapshot(EntityTag::new(81), &initial)
1004 .expect("initial schema snapshot should encode");
1005 let initial_metadata = store
1006 .catalog_metadata()
1007 .expect("initial schema catalog metadata should derive")
1008 .expect("initial schema catalog metadata should be present");
1009
1010 store
1011 .insert_persisted_snapshot(EntityTag::new(81), &newer)
1012 .expect("newer schema snapshot should encode");
1013 store
1014 .insert_persisted_snapshot(EntityTag::new(82), &other)
1015 .expect("other schema snapshot should encode");
1016 let updated_metadata = store
1017 .catalog_metadata()
1018 .expect("updated schema catalog metadata should derive")
1019 .expect("updated schema catalog metadata should be present");
1020
1021 assert_eq!(initial_metadata.schema_version(), SchemaVersion::initial());
1022 assert_eq!(initial_metadata.entity_count(), 1);
1023 assert_eq!(updated_metadata.schema_version(), SchemaVersion::new(3));
1024 assert_eq!(updated_metadata.entity_count(), 2);
1025 assert_ne!(
1026 initial_metadata.schema_fingerprint(),
1027 updated_metadata.schema_fingerprint(),
1028 "catalog fingerprint must change when latest accepted schema catalog changes"
1029 );
1030 }
1031
1032 #[test]
1033 fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
1034 let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
1035 let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");
1036
1037 let mut left = SchemaStore::init(test_memory(239));
1038 left.insert_persisted_snapshot(EntityTag::new(91), &first)
1039 .expect("first schema snapshot should encode");
1040 left.insert_persisted_snapshot(EntityTag::new(92), &second)
1041 .expect("second schema snapshot should encode");
1042
1043 let mut right = SchemaStore::init(test_memory(238));
1044 right
1045 .insert_persisted_snapshot(EntityTag::new(92), &second)
1046 .expect("second schema snapshot should encode");
1047 right
1048 .insert_persisted_snapshot(EntityTag::new(91), &first)
1049 .expect("first schema snapshot should encode");
1050
1051 let left_metadata = left
1052 .catalog_metadata()
1053 .expect("left schema catalog metadata should derive");
1054 let right_metadata = right
1055 .catalog_metadata()
1056 .expect("right schema catalog metadata should derive");
1057
1058 assert_eq!(left_metadata, right_metadata);
1059 }
1060
/// Compares allocation metadata derived from two stores that hold the same
/// entity with and without an accepted index, and asserts that the data,
/// index, and schema roles each expose their own fingerprint.
#[test]
fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
    let entity_name = "RoleSpecific";
    let entity_value = 93;
    let version = SchemaVersion::initial();

    let plain_snapshot = persisted_schema_snapshot_for_test(version, entity_name);
    let indexed_snapshot =
        persisted_schema_snapshot_with_index_for_test(version, entity_name, "payload_idx");

    // Store holding the snapshot that declares no index.
    let mut plain_store = SchemaStore::init(test_memory(237));
    plain_store
        .insert_persisted_snapshot(EntityTag::new(entity_value), &plain_snapshot)
        .expect("base schema snapshot should encode");
    let plain = plain_store
        .allocation_metadata()
        .expect("base allocation metadata should derive")
        .expect("base allocation metadata should be present");

    // Store holding the same entity, this time with one index attached.
    let mut indexed_store = SchemaStore::init(test_memory(236));
    indexed_store
        .insert_persisted_snapshot(EntityTag::new(entity_value), &indexed_snapshot)
        .expect("indexed schema snapshot should encode");
    let indexed = indexed_store
        .allocation_metadata()
        .expect("indexed allocation metadata should derive")
        .expect("indexed allocation metadata should be present");

    // Cross-store comparisons: only the index and schema roles react to the index.
    assert_eq!(
        plain.data().schema_fingerprint(),
        indexed.data().schema_fingerprint(),
        "data allocation metadata should ignore accepted index catalog changes"
    );
    assert_ne!(
        plain.index().schema_fingerprint(),
        indexed.index().schema_fingerprint(),
        "index allocation metadata should change when accepted index catalog changes"
    );
    assert_ne!(
        plain.schema().schema_fingerprint(),
        indexed.schema().schema_fingerprint(),
        "schema allocation metadata should change when full accepted catalog changes"
    );

    // Same-store comparisons: the roles must not share a fingerprint.
    assert_ne!(
        indexed.data().schema_fingerprint(),
        indexed.index().schema_fingerprint(),
        "data and index allocation metadata should have distinct role fingerprints"
    );
    assert_ne!(
        indexed.index().schema_fingerprint(),
        indexed.schema().schema_fingerprint(),
        "index and schema allocation metadata should have distinct role fingerprints"
    );
}
1114
/// A typed snapshot whose own version differs from its row-layout version
/// must be refused on insert with the version-mismatch diagnostic.
#[test]
fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
    let mut store = SchemaStore::init(test_memory(253));

    // Snapshot version 2 paired with a row layout that carries the initial version.
    let snapshot_version = SchemaVersion::new(2);
    let layout_version = SchemaVersion::initial();
    let mismatched = persisted_schema_snapshot_with_layout_version_for_test(
        snapshot_version,
        layout_version,
        "Invalid",
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(43), &mismatched)
        .expect_err("schema store should reject mismatched snapshot/layout versions");
    let message = rejection.message();

    assert!(
        message.contains("schema snapshot row-layout version mismatch"),
        "schema store should preserve the version mismatch diagnostic"
    );
}
1134
/// Inserting a typed snapshot whose row layout places a field at a different
/// slot than the field snapshot records must fail with a slot-mismatch error.
#[test]
fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
    let mut store = SchemaStore::init(test_memory(254));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");

    // The layout maps field 2 to slot 3, while the template's field snapshot uses slot 1.
    let layout_entries = vec![
        (FieldId::new(1), SchemaFieldSlot::new(0)),
        (FieldId::new(2), SchemaFieldSlot::new(3)),
    ];
    let divergent_layout = SchemaRowLayout::new(template.version(), layout_entries);
    let divergent = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        divergent_layout,
        template.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(44), &divergent)
        .expect_err("schema store should reject divergent field/layout slots");
    let message = rejection.message();

    assert!(
        message.contains("schema snapshot field slot mismatch"),
        "schema store should report the duplicated slot divergence"
    );
}
1164
/// Inserting a typed snapshot whose row layout assigns one slot to two
/// different fields must fail with the duplicate row-layout slot error.
#[test]
fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
    let mut store = SchemaStore::init(test_memory(246));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");

    // Both field ids are mapped onto slot 0.
    let colliding_entries = vec![
        (FieldId::new(1), SchemaFieldSlot::new(0)),
        (FieldId::new(2), SchemaFieldSlot::new(0)),
    ];
    let colliding_layout = SchemaRowLayout::new(template.version(), colliding_entries);
    let ambiguous = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        colliding_layout,
        template.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(49), &ambiguous)
        .expect_err("schema store should reject duplicate row-layout slots");
    let message = rejection.message();

    assert!(
        message.contains("schema snapshot duplicate row-layout slot"),
        "schema store should report the row-layout slot ambiguity"
    );
}
1195
/// Inserting a typed snapshot whose primary-key field id does not appear in
/// the row layout must fail with the missing-primary-key error.
#[test]
fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
    let mut store = SchemaStore::init(test_memory(248));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");

    // Field id 99 is not one of the ids (1 and 2) used by the template's row layout.
    let absent_primary_key = FieldId::new(99);
    let orphaned = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        absent_primary_key,
        template.row_layout().clone(),
        template.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(47), &orphaned)
        .expect_err("schema store should reject snapshots without the primary-key field");
    let message = rejection.message();

    assert!(
        message.contains("schema snapshot primary key field missing from row layout"),
        "schema store should report the missing primary-key field"
    );
}
1219
/// When undecodable bytes are stored under version 3 of an entity that also
/// has a valid initial snapshot, the latest-snapshot lookup must return a
/// decode error instead of the valid snapshot.
#[test]
fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
    let entity_value = 45;
    let mut store = SchemaStore::init(test_memory(249));

    // Valid snapshot at the initial version.
    let valid = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    store
        .insert_persisted_snapshot(EntityTag::new(entity_value), &valid)
        .expect("initial schema snapshot should encode");

    // Two junk bytes written directly under version 3 of the same entity.
    let junk_bytes: Vec<u8> = vec![0xff, 0x00];
    let junk_key =
        RawSchemaKey::from_entity_version(EntityTag::new(entity_value), SchemaVersion::new(3));
    store.insert_raw_snapshot(junk_key, RawSchemaSnapshot::from_bytes(junk_bytes));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(entity_value))
        .expect_err("latest corrupt schema snapshot must fail closed");
    let message = failure.message();

    assert!(
        message.contains("failed to decode persisted schema snapshot"),
        "latest-version lookup should report the corrupt newest snapshot"
    );
}
1242
/// Encodes a snapshot with a field/layout slot divergence, writes the bytes
/// through `insert_raw_snapshot`, and asserts that reading it back fails with
/// the persisted slot-mismatch error.
#[test]
fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
    let entity_value = 46;
    let mut store = SchemaStore::init(test_memory(250));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");

    // The layout maps field 2 to slot 3, while the template's field snapshot uses slot 1.
    let layout_entries = vec![
        (FieldId::new(1), SchemaFieldSlot::new(0)),
        (FieldId::new(2), SchemaFieldSlot::new(3)),
    ];
    let divergent_layout = SchemaRowLayout::new(template.version(), layout_entries);
    let divergent = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        divergent_layout,
        template.fields().to_vec(),
    );

    // Store the encoded bytes directly so the failure has to come from the read path.
    let encoded = encode_persisted_schema_snapshot(&divergent)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key =
        RawSchemaKey::from_entity_version(EntityTag::new(entity_value), divergent.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(entity_value))
        .expect_err("raw decode should reject divergent field/layout slots");
    let message = failure.message();

    assert!(
        message.contains("persisted schema snapshot field slot mismatch"),
        "schema codec should report the raw decoded slot divergence"
    );
}
1277
/// Encodes a snapshot whose primary-key field id is absent from the row
/// layout, writes the bytes through `insert_raw_snapshot`, and asserts that
/// reading it back fails with the persisted missing-primary-key error.
#[test]
fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
    let entity_value = 48;
    let mut store = SchemaStore::init(test_memory(247));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");

    // Field id 99 is not one of the ids (1 and 2) used by the template's row layout.
    let absent_primary_key = FieldId::new(99);
    let orphaned = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        absent_primary_key,
        template.row_layout().clone(),
        template.fields().to_vec(),
    );

    // Store the encoded bytes directly so the failure has to come from the read path.
    let encoded = encode_persisted_schema_snapshot(&orphaned)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key =
        RawSchemaKey::from_entity_version(EntityTag::new(entity_value), orphaned.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(entity_value))
        .expect_err("raw decode should reject snapshots without the primary-key field");
    let message = failure.message();

    assert!(
        message.contains("persisted schema snapshot primary key field missing from row layout"),
        "schema codec should report the raw decoded missing primary-key field"
    );
}
1306
/// Encodes a snapshot in which the second field reuses the first field's
/// name, writes the bytes through `insert_raw_snapshot`, and asserts that
/// reading it back fails with the persisted duplicate-field-name error.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
    let entity_value = 50;
    let mut store = SchemaStore::init(test_memory(245));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");

    // Rebuild the second field unchanged except for its name, which is taken from the first.
    let mut fields = template.fields().to_vec();
    let renamed = {
        let (first, second) = (&fields[0], &fields[1]);
        PersistedFieldSnapshot::new(
            second.id(),
            first.name().to_string(),
            second.slot(),
            second.kind().clone(),
            second.nested_leaves().to_vec(),
            second.nullable(),
            second.default().clone(),
            second.storage_decode(),
            second.leaf_codec(),
        )
    };
    fields[1] = renamed;

    let clashing = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );

    // Store the encoded bytes directly so the failure has to come from the read path.
    let encoded = encode_persisted_schema_snapshot(&clashing)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key =
        RawSchemaKey::from_entity_version(EntityTag::new(entity_value), clashing.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(entity_value))
        .expect_err("raw decode should reject duplicate field names");
    let message = failure.message();

    assert!(
        message.contains("persisted schema snapshot duplicate field name"),
        "schema codec should report the raw decoded field-name ambiguity"
    );
}
1349
/// Inserting a typed snapshot whose second field carries a nested leaf with
/// an empty path must fail with the empty-nested-leaf-path error.
#[test]
fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
    let mut store = SchemaStore::init(test_memory(244));
    let template = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");

    // A nested leaf whose path has no segments.
    let pathless_leaf = PersistedNestedLeafSnapshot::new(
        Vec::new(),
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );

    // Rebuild the second field unchanged except for its nested leaves.
    let mut fields = template.fields().to_vec();
    let replacement = {
        let second = &fields[1];
        PersistedFieldSnapshot::new(
            second.id(),
            second.name().to_string(),
            second.slot(),
            second.kind().clone(),
            vec![pathless_leaf],
            second.nullable(),
            second.default().clone(),
            second.storage_decode(),
            second.leaf_codec(),
        )
    };
    fields[1] = replacement;

    let malformed = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(51), &malformed)
        .expect_err("schema store should reject empty nested leaf paths");
    let message = rejection.message();

    assert!(
        message.contains("schema snapshot empty nested leaf path"),
        "schema store should report the empty nested leaf path"
    );
}
1392
/// Encodes a snapshot whose second field holds two nested leaves sharing the
/// path `["bytes"]`, writes the bytes through `insert_raw_snapshot`, and
/// asserts that reading it back fails with the duplicate-nested-leaf-path error.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
    let entity_value = 52;
    let mut store = SchemaStore::init(test_memory(243));
    let template =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");

    // Two leaves with the same path but different kinds and codecs.
    let blob_leaf = PersistedNestedLeafSnapshot::new(
        vec!["bytes".to_string()],
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );
    let text_leaf = PersistedNestedLeafSnapshot::new(
        vec!["bytes".to_string()],
        PersistedFieldKind::Text { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Text),
    );

    // Rebuild the second field unchanged except for its nested leaves.
    let mut fields = template.fields().to_vec();
    let replacement = {
        let second = &fields[1];
        PersistedFieldSnapshot::new(
            second.id(),
            second.name().to_string(),
            second.slot(),
            second.kind().clone(),
            vec![blob_leaf, text_leaf],
            second.nullable(),
            second.default().clone(),
            second.storage_decode(),
            second.leaf_codec(),
        )
    };
    fields[1] = replacement;

    let clashing = PersistedSchemaSnapshot::new(
        template.version(),
        template.entity_path().to_string(),
        template.entity_name().to_string(),
        template.first_primary_key_field_id(),
        template.row_layout().clone(),
        fields,
    );

    // Store the encoded bytes directly so the failure has to come from the read path.
    let encoded = encode_persisted_schema_snapshot(&clashing)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key =
        RawSchemaKey::from_entity_version(EntityTag::new(entity_value), clashing.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(entity_value))
        .expect_err("raw decode should reject duplicate nested leaf paths");
    let message = failure.message();

    assert!(
        message.contains("persisted schema snapshot duplicate nested leaf path"),
        "schema codec should report the raw decoded nested path ambiguity"
    );
}
1451
/// Round-trips a typed snapshot through `RawSchemaSnapshot` and asserts that
/// the decoded value equals the value that was encoded.
#[test]
fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
    let original = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");

    let encoded = RawSchemaSnapshot::from_persisted_snapshot(&original)
        .expect("schema snapshot should encode");
    let round_tripped = encoded
        .decode_persisted_snapshot()
        .expect("schema snapshot should decode");

    assert_eq!(round_tripped, original);
}
1464
1465 fn persisted_schema_snapshot_for_test(
1469 version: SchemaVersion,
1470 entity_name: &str,
1471 ) -> PersistedSchemaSnapshot {
1472 persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
1473 }
1474
1475 fn persisted_schema_snapshot_with_index_for_test(
1476 version: SchemaVersion,
1477 entity_name: &str,
1478 index_name: &str,
1479 ) -> PersistedSchemaSnapshot {
1480 let base = persisted_schema_snapshot_for_test(version, entity_name);
1481
1482 PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1483 base.version(),
1484 base.entity_path().to_string(),
1485 base.entity_name().to_string(),
1486 base.primary_key_field_ids().to_vec(),
1487 base.row_layout().clone(),
1488 base.fields().to_vec(),
1489 vec![PersistedIndexSnapshot::new(
1490 0,
1491 index_name.to_string(),
1492 "RoleSpecificStore".to_string(),
1493 false,
1494 PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
1495 FieldId::new(1),
1496 SchemaFieldSlot::new(0),
1497 vec!["id".to_string()],
1498 PersistedFieldKind::Ulid,
1499 false,
1500 )]),
1501 None,
1502 )],
1503 )
1504 }
1505
1506 fn persisted_schema_snapshot_with_layout_version_for_test(
1510 version: SchemaVersion,
1511 layout_version: SchemaVersion,
1512 entity_name: &str,
1513 ) -> PersistedSchemaSnapshot {
1514 PersistedSchemaSnapshot::new(
1515 version,
1516 format!("entities::{entity_name}"),
1517 entity_name.to_string(),
1518 FieldId::new(1),
1519 SchemaRowLayout::new(
1520 layout_version,
1521 vec![
1522 (FieldId::new(1), SchemaFieldSlot::new(0)),
1523 (FieldId::new(2), SchemaFieldSlot::new(1)),
1524 ],
1525 ),
1526 vec![
1527 PersistedFieldSnapshot::new(
1528 FieldId::new(1),
1529 "id".to_string(),
1530 SchemaFieldSlot::new(0),
1531 PersistedFieldKind::Ulid,
1532 Vec::new(),
1533 false,
1534 SchemaFieldDefault::None,
1535 FieldStorageDecode::ByKind,
1536 LeafCodec::Scalar(ScalarCodec::Ulid),
1537 ),
1538 PersistedFieldSnapshot::new(
1539 FieldId::new(2),
1540 "payload".to_string(),
1541 SchemaFieldSlot::new(1),
1542 PersistedFieldKind::Map {
1543 key: Box::new(PersistedFieldKind::Text { max_len: None }),
1544 value: Box::new(PersistedFieldKind::List(Box::new(
1545 PersistedFieldKind::Nat64,
1546 ))),
1547 },
1548 vec![PersistedNestedLeafSnapshot::new(
1549 vec!["bytes".to_string()],
1550 PersistedFieldKind::Blob { max_len: None },
1551 false,
1552 FieldStorageDecode::ByKind,
1553 LeafCodec::Scalar(ScalarCodec::Blob),
1554 )],
1555 false,
1556 SchemaFieldDefault::None,
1557 FieldStorageDecode::ByKind,
1558 LeafCodec::StructuralFallback,
1559 ),
1560 ],
1561 )
1562 }
1563}