1use crate::{
7 db::{
8 codec::{
9 finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10 write_hash_tag_u8, write_hash_u32, write_hash_u64,
11 },
12 commit::CommitSchemaFingerprint,
13 direction::Direction,
14 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15 schema::{
16 AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17 PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18 accepted_schema_cache_fingerprint,
19 accepted_schema_cache_fingerprint_for_persisted_snapshot,
20 accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22 },
23 },
24 error::InternalError,
25 traits::Storable,
26 types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
/// Width of a `RawSchemaKey` in bytes: an 8-byte entity tag followed by a
/// 4-byte schema version.
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
/// Same width as `SCHEMA_KEY_BYTES_USIZE`, typed as `u32` for the storable bound.
const SCHEMA_KEY_BYTES: u32 = 12;
/// Payload budget used to size the stored snapshot value bound (the raw value
/// header is added on top of this in `RawSchemaSnapshot::BOUND`).
pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
/// Leading tag hashed into the schema catalog fingerprint.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
/// Leading tag hashed into the data allocation fingerprint.
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
/// Leading tag hashed into the index allocation fingerprint.
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
/// Magic prefix marking a stored snapshot value that carries an identity header.
const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
/// Header format version byte written right after the magic prefix.
const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
/// Header length in bytes: magic, version byte, then the accepted schema
/// fingerprint (presumably 8 + 1 + 16 — confirm against `CommitSchemaFingerprint`).
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
/// Same length as `RAW_SCHEMA_SNAPSHOT_HEADER_BYTES`, typed as `u32` for the storable bound.
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32: u32 = 25;
50
// Test-only, per-thread counter that `SchemaStore::latest_raw_snapshots_by_entity`
// increments on every call so tests can observe how often the full scan runs.
#[cfg(test)]
thread_local! {
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}
55
/// Reset the current thread's `latest_raw_snapshots_by_entity` call counter to zero.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.set(0);
}
60
/// Read the current thread's `latest_raw_snapshots_by_entity` call counter.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.get()
}
65
66#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
75struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
76
77impl RawSchemaKey {
78 #[must_use]
80 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
81 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
82 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
83 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
84
85 Self(out)
86 }
87
88 #[must_use]
90 fn entity_tag(self) -> EntityTag {
91 let mut bytes = [0u8; size_of::<u64>()];
92 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
93
94 EntityTag::new(u64::from_be_bytes(bytes))
95 }
96
97 #[must_use]
99 fn version(self) -> u32 {
100 let mut bytes = [0u8; size_of::<u32>()];
101 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
102
103 u32::from_be_bytes(bytes)
104 }
105
106 fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
107 (
108 RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
109 RangeBound::Included(Self::from_entity_version(
110 entity,
111 SchemaVersion::new(u32::MAX),
112 )),
113 )
114 }
115}
116
117impl Storable for RawSchemaKey {
118 fn to_bytes(&self) -> Cow<'_, [u8]> {
119 Cow::Borrowed(&self.0)
120 }
121
122 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
123 debug_assert_eq!(
124 bytes.len(),
125 SCHEMA_KEY_BYTES_USIZE,
126 "RawSchemaKey::from_bytes received unexpected byte length",
127 );
128
129 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
130 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
131 }
132
133 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
134 out.copy_from_slice(bytes.as_ref());
135 Self(out)
136 }
137
138 fn into_bytes(self) -> Vec<u8> {
139 self.0.to_vec()
140 }
141
142 const BOUND: StorableBound = StorableBound::Bounded {
143 max_size: SCHEMA_KEY_BYTES,
144 is_fixed_size: true,
145 };
146}
147
148#[derive(Clone, Debug, Eq, PartialEq)]
158struct RawSchemaSnapshot {
159 payload: Vec<u8>,
160 accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
161}
162
163impl RawSchemaSnapshot {
164 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
166 validate_typed_schema_snapshot_for_store(snapshot)?;
167
168 let accepted_schema_fingerprint =
169 accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
170 let payload = encode_persisted_schema_snapshot(snapshot)?;
171
172 Ok(Self {
173 payload,
174 accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
175 })
176 }
177
178 #[must_use]
180 #[cfg(test)]
181 const fn from_bytes(payload: Vec<u8>) -> Self {
182 Self {
183 payload,
184 accepted_schema_fingerprint: None,
185 }
186 }
187
188 #[must_use]
190 const fn as_bytes(&self) -> &[u8] {
191 self.payload.as_slice()
192 }
193
194 #[must_use]
196 fn into_bytes(self) -> Vec<u8> {
197 self.payload
198 }
199
200 fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
203 self.accepted_schema_fingerprint.ok_or_else(|| {
204 InternalError::store_corruption(
205 "obsolete raw schema snapshot storage format missing accepted schema identity header",
206 )
207 })
208 }
209
210 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
212 decode_persisted_schema_snapshot(self.as_bytes())
213 }
214}
215
216#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub(in crate::db) struct AcceptedCatalogIdentity {
218 entity_tag: EntityTag,
219 entity_path: &'static str,
220 store_path: &'static str,
221 accepted_schema_version: SchemaVersion,
222 fingerprint_method_version: u8,
223 accepted_schema_fingerprint: CommitSchemaFingerprint,
224}
225
226impl AcceptedCatalogIdentity {
227 #[must_use]
228 pub(in crate::db) const fn new(
229 entity_tag: EntityTag,
230 entity_path: &'static str,
231 store_path: &'static str,
232 accepted_schema_version: SchemaVersion,
233 accepted_schema_fingerprint: CommitSchemaFingerprint,
234 ) -> Self {
235 Self {
236 entity_tag,
237 entity_path,
238 store_path,
239 accepted_schema_version,
240 fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
241 accepted_schema_fingerprint,
242 }
243 }
244
245 #[must_use]
246 pub(in crate::db) const fn entity_tag(self) -> EntityTag {
247 self.entity_tag
248 }
249
250 #[must_use]
251 pub(in crate::db) const fn entity_path(self) -> &'static str {
252 self.entity_path
253 }
254
255 #[must_use]
256 pub(in crate::db) const fn store_path(self) -> &'static str {
257 self.store_path
258 }
259
260 #[must_use]
261 pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
262 self.accepted_schema_version
263 }
264
265 #[must_use]
266 pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
267 self.fingerprint_method_version
268 }
269
270 #[must_use]
271 pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
272 self.accepted_schema_fingerprint
273 }
274}
275
276#[derive(Clone, Debug, Eq, PartialEq)]
277pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
278 identity: AcceptedCatalogIdentity,
279 raw_snapshot: Vec<u8>,
280}
281
282impl AcceptedCatalogSnapshotSelection {
283 #[must_use]
284 const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
285 Self {
286 identity,
287 raw_snapshot,
288 }
289 }
290
291 #[must_use]
292 pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
293 self.identity
294 }
295
296 pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
297 let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
298 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
299 let identity = self.identity();
300
301 if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
302 return Err(InternalError::store_invariant(
303 "accepted catalog identity selected a different schema version than the decoded snapshot",
304 ));
305 }
306 if accepted.entity_path() != identity.entity_path() {
307 return Err(InternalError::store_invariant(
308 "accepted catalog identity selected a different entity path than the decoded snapshot",
309 ));
310 }
311
312 let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
313 if decoded_fingerprint != identity.accepted_schema_fingerprint() {
314 return Err(InternalError::store_invariant(
315 "accepted catalog identity fingerprint did not match the decoded snapshot",
316 ));
317 }
318
319 Ok(accepted)
320 }
321}
322
323impl Storable for RawSchemaSnapshot {
324 fn to_bytes(&self) -> Cow<'_, [u8]> {
325 let Some(fingerprint) = self.accepted_schema_fingerprint else {
326 return Cow::Borrowed(self.as_bytes());
327 };
328
329 let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
330 bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
331 bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
332 bytes.extend_from_slice(&fingerprint);
333 bytes.extend_from_slice(self.as_bytes());
334
335 Cow::Owned(bytes)
336 }
337
338 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
339 let bytes = bytes.into_owned();
340 if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
341 && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
342 && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
343 {
344 let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
345 let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
346 let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
347 fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
348
349 return Self {
350 payload: bytes[fingerprint_end..].to_vec(),
351 accepted_schema_fingerprint: Some(fingerprint),
352 };
353 }
354
355 Self {
356 payload: bytes,
357 accepted_schema_fingerprint: None,
358 }
359 }
360
361 fn into_bytes(self) -> Vec<u8> {
362 let Some(fingerprint) = self.accepted_schema_fingerprint else {
363 return self.payload;
364 };
365
366 let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
367 bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
368 bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
369 bytes.extend_from_slice(&fingerprint);
370 bytes.extend_from_slice(&self.payload);
371
372 bytes
373 }
374
375 const BOUND: StorableBound = StorableBound::Bounded {
376 max_size: MAX_SCHEMA_SNAPSHOT_BYTES + RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32,
377 is_fixed_size: false,
378 };
379}
380
381fn validate_typed_schema_snapshot_for_store(
385 snapshot: &PersistedSchemaSnapshot,
386) -> Result<(), InternalError> {
387 if let Some(detail) = schema_snapshot_integrity_detail(
388 "schema snapshot",
389 snapshot.version(),
390 snapshot.primary_key_field_ids(),
391 snapshot.row_layout(),
392 snapshot.fields(),
393 ) {
394 return Err(InternalError::store_invariant(detail));
395 }
396
397 Ok(())
398}
399
/// Storage footprint of one entity's schema snapshots, as computed by
/// `SchemaStore::entity_footprint`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    // Number of stored snapshots for the entity.
    snapshots: u64,
    // Sum of the snapshots' payload byte lengths (saturating).
    encoded_bytes: u64,
    // Payload byte length of the highest-version snapshot, or 0 when there is none.
    latest_snapshot_bytes: u64,
}
414
415#[derive(Clone, Copy, Debug, Eq, PartialEq)]
423pub(in crate::db) struct SchemaStoreCatalogMetadata {
424 schema_version: SchemaVersion,
425 schema_fingerprint_method_version: u8,
426 schema_fingerprint: CommitSchemaFingerprint,
427 entity_count: u64,
428}
429
430impl SchemaStoreCatalogMetadata {
431 #[must_use]
433 const fn new(
434 schema_version: SchemaVersion,
435 schema_fingerprint_method_version: u8,
436 schema_fingerprint: CommitSchemaFingerprint,
437 entity_count: u64,
438 ) -> Self {
439 Self {
440 schema_version,
441 schema_fingerprint_method_version,
442 schema_fingerprint,
443 entity_count,
444 }
445 }
446
447 #[must_use]
449 pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
450 self.schema_version
451 }
452
453 #[must_use]
455 pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
456 self.schema_fingerprint_method_version
457 }
458
459 #[must_use]
462 pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
463 self.schema_fingerprint
464 }
465
466 #[must_use]
468 pub(in crate::db) const fn entity_count(self) -> u64 {
469 self.entity_count
470 }
471}
472
473#[derive(Clone, Copy, Debug, Eq, PartialEq)]
482pub(in crate::db) struct SchemaStoreAllocationMetadata {
483 data: SchemaStoreCatalogMetadata,
484 index: SchemaStoreCatalogMetadata,
485 schema: SchemaStoreCatalogMetadata,
486}
487
488impl SchemaStoreAllocationMetadata {
489 #[must_use]
492 const fn new(
493 data: SchemaStoreCatalogMetadata,
494 index: SchemaStoreCatalogMetadata,
495 schema: SchemaStoreCatalogMetadata,
496 ) -> Self {
497 Self {
498 data,
499 index,
500 schema,
501 }
502 }
503
504 #[must_use]
506 pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
507 self.data
508 }
509
510 #[must_use]
512 pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
513 self.index
514 }
515
516 #[must_use]
519 pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
520 self.schema
521 }
522}
523
524impl SchemaStoreFootprint {
525 #[must_use]
527 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
528 Self {
529 snapshots,
530 encoded_bytes,
531 latest_snapshot_bytes,
532 }
533 }
534
535 #[must_use]
537 pub(in crate::db) const fn snapshots(self) -> u64 {
538 self.snapshots
539 }
540
541 #[must_use]
543 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
544 self.encoded_bytes
545 }
546
547 #[must_use]
549 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
550 self.latest_snapshot_bytes
551 }
552}
553
/// Store of encoded schema snapshots keyed by entity tag and schema version.
///
/// The storage strategy (stable memory, heap, or journaled overlay) is chosen
/// at construction and hidden behind `SchemaStoreBackend`.
pub struct SchemaStore {
    backend: SchemaStoreBackend,
}
565
/// Storage strategies available to `SchemaStore`.
enum SchemaStoreBackend {
    // Snapshots live directly in a stable-structures B-tree map.
    Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
    // Snapshots live in an in-memory standard-library B-tree map.
    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
    // A stable `canonical` map overlaid by an in-memory `live` map; keys in
    // `tombstones` are hidden from reads, and `live` entries take precedence
    // over `canonical` entries on point lookups.
    Journaled {
        canonical:
            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
        tombstones: BTreeSet<RawSchemaKey>,
    },
}
576
/// Signal returned by snapshot visitors to continue or end a traversal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    Continue,
    #[allow(
        dead_code,
        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
    )]
    Stop,
}

impl SchemaStoreVisit {
    /// `true` when the traversal should end after the current entry.
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
593
594impl SchemaStore {
595 #[must_use]
597 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
598 Self {
599 backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
600 }
601 }
602
603 #[must_use]
605 pub const fn init_heap() -> Self {
606 Self {
607 backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
608 }
609 }
610
611 #[must_use]
616 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
617 Self {
618 backend: SchemaStoreBackend::Journaled {
619 canonical: StableBTreeMap::init(memory),
620 live: StdBTreeMap::new(),
621 tombstones: BTreeSet::new(),
622 },
623 }
624 }
625
626 pub(in crate::db) fn insert_persisted_snapshot(
628 &mut self,
629 entity: EntityTag,
630 snapshot: &PersistedSchemaSnapshot,
631 ) -> Result<(), InternalError> {
632 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
633 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
634 let _ = self.insert_raw_snapshot(key, raw_snapshot);
635
636 Ok(())
637 }
638
639 pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
643 &mut self,
644 expected: AcceptedCatalogIdentity,
645 snapshot: &PersistedSchemaSnapshot,
646 ) -> Result<(), InternalError> {
647 let live = self.latest_catalog_identity(
648 expected.entity_tag(),
649 expected.entity_path(),
650 expected.store_path(),
651 )?;
652 if live
653 .as_ref()
654 .map(AcceptedCatalogSnapshotSelection::identity)
655 != Some(expected)
656 {
657 return Err(InternalError::schema_ddl_publication_race_lost(
658 expected.entity_path(),
659 ));
660 }
661
662 self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
663 }
664
665 pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
668 let SchemaStoreBackend::Journaled {
669 live, tombstones, ..
670 } = &mut self.backend
671 else {
672 return Err(InternalError::store_invariant(
673 "journaled live projection reset requires a journaled schema store",
674 ));
675 };
676
677 live.clear();
678 tombstones.clear();
679
680 Ok(())
681 }
682
683 pub(in crate::db) fn fold_persisted_snapshot(
685 &mut self,
686 entity: EntityTag,
687 snapshot: &PersistedSchemaSnapshot,
688 ) -> Result<(), InternalError> {
689 let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
690 return Err(InternalError::store_invariant(
691 "journal schema fold requires a journaled schema store",
692 ));
693 };
694
695 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
696 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
697 canonical.insert(key, raw_snapshot);
698
699 Ok(())
700 }
701
702 #[cfg(test)]
704 pub(in crate::db) fn get_persisted_snapshot(
705 &self,
706 entity: EntityTag,
707 version: SchemaVersion,
708 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
709 let key = RawSchemaKey::from_entity_version(entity, version);
710 self.get_raw_snapshot(&key)
711 .map(|snapshot| snapshot.decode_persisted_snapshot())
712 .transpose()
713 }
714
715 pub(in crate::db) fn latest_persisted_snapshot(
717 &self,
718 entity: EntityTag,
719 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
720 self.latest_raw_snapshot(entity)
721 .map(|snapshot| snapshot.decode_persisted_snapshot())
722 .transpose()
723 }
724
725 pub(in crate::db) fn latest_catalog_identity(
728 &self,
729 entity: EntityTag,
730 entity_path: &'static str,
731 store_path: &'static str,
732 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
733 let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
734 return Ok(None);
735 };
736 let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
737 let identity =
738 AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
739
740 Ok(Some(AcceptedCatalogSnapshotSelection::new(
741 identity,
742 raw_snapshot.into_bytes(),
743 )))
744 }
745
746 #[must_use]
748 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
749 let mut snapshots = 0u64;
750 let mut encoded_bytes = 0u64;
751 let mut latest = None::<(SchemaVersion, u64)>;
752
753 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
754 if key.entity_tag() != entity {
755 return Ok(SchemaStoreVisit::Continue);
756 }
757
758 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
759 snapshots = snapshots.saturating_add(1);
760 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
761
762 let version = SchemaVersion::new(key.version());
763 if latest
764 .as_ref()
765 .is_none_or(|(latest_version, _)| version > *latest_version)
766 {
767 latest = Some((version, snapshot_bytes));
768 }
769 Ok(SchemaStoreVisit::Continue)
770 });
771
772 SchemaStoreFootprint::new(
773 snapshots,
774 encoded_bytes,
775 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
776 )
777 }
778
779 #[cfg(test)]
785 pub(in crate::db) fn catalog_metadata(
786 &self,
787 ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
788 Ok(self
789 .allocation_metadata()?
790 .map(SchemaStoreAllocationMetadata::schema))
791 }
792
793 pub(in crate::db) fn allocation_metadata(
800 &self,
801 ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
802 let latest_by_entity = self.latest_raw_snapshots_by_entity();
803 if latest_by_entity.is_empty() {
804 return Ok(None);
805 }
806
807 Ok(Some(SchemaStoreAllocationMetadata::new(
808 derive_data_allocation_metadata(&latest_by_entity)?,
809 derive_index_allocation_metadata(&latest_by_entity)?,
810 derive_schema_catalog_metadata(&latest_by_entity)?,
811 )))
812 }
813
814 fn insert_raw_snapshot(
816 &mut self,
817 key: RawSchemaKey,
818 snapshot: RawSchemaSnapshot,
819 ) -> Option<RawSchemaSnapshot> {
820 let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
821 self.get_raw_snapshot_for_backend(&key)
822 } else {
823 None
824 };
825 match &mut self.backend {
826 SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
827 SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
828 SchemaStoreBackend::Journaled {
829 live, tombstones, ..
830 } => {
831 tombstones.remove(&key);
832 live.insert(key, snapshot);
833 previous_journaled
834 }
835 }
836 }
837
838 #[must_use]
840 #[cfg(test)]
841 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
842 match &self.backend {
843 SchemaStoreBackend::Stable(map) => map.get(key),
844 SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
845 SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
846 }
847 }
848
849 #[must_use]
851 #[cfg(test)]
852 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
853 match &self.backend {
854 SchemaStoreBackend::Stable(map) => map.contains_key(key),
855 SchemaStoreBackend::Heap(map) => map.contains_key(key),
856 SchemaStoreBackend::Journaled { .. } => {
857 self.get_raw_snapshot_for_backend(key).is_some()
858 }
859 }
860 }
861
862 #[must_use]
864 #[cfg(test)]
865 pub(in crate::db) fn len(&self) -> u64 {
866 match &self.backend {
867 SchemaStoreBackend::Stable(map) => map.len(),
868 SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
869 SchemaStoreBackend::Journaled { .. } => {
870 let mut count = 0_u64;
871 let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
872 count = count.saturating_add(1);
873 Ok(SchemaStoreVisit::Continue)
874 });
875 count
876 }
877 }
878 }
879
880 #[must_use]
882 #[cfg(test)]
883 pub(in crate::db) fn is_empty(&self) -> bool {
884 match &self.backend {
885 SchemaStoreBackend::Stable(map) => map.is_empty(),
886 SchemaStoreBackend::Heap(map) => map.is_empty(),
887 SchemaStoreBackend::Journaled { .. } => {
888 let mut empty = true;
889 let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
890 empty = false;
891 Ok(SchemaStoreVisit::Stop)
892 });
893 empty
894 }
895 }
896 }
897
898 #[cfg(test)]
900 pub(in crate::db) fn clear(&mut self) {
901 match &mut self.backend {
902 SchemaStoreBackend::Stable(map) => map.clear_new(),
903 SchemaStoreBackend::Heap(map) => map.clear(),
904 SchemaStoreBackend::Journaled {
905 canonical,
906 live,
907 tombstones,
908 } => {
909 live.clear();
910 tombstones.clear();
911 for entry in canonical.iter() {
912 tombstones.insert(*entry.key());
913 }
914 }
915 }
916 }
917
918 fn latest_raw_snapshots_by_entity(
919 &self,
920 ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
921 #[cfg(test)]
922 LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
923
924 let mut latest_by_entity =
925 StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
926
927 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
928 let version = SchemaVersion::new(key.version());
929 match latest_by_entity.get_mut(&key.entity_tag()) {
930 Some((latest_version, latest_snapshot)) if version > *latest_version => {
931 *latest_version = version;
932 *latest_snapshot = snapshot.clone();
933 }
934 None => {
935 latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
936 }
937 Some(_) => {}
938 }
939 Ok(SchemaStoreVisit::Continue)
940 });
941
942 latest_by_entity
943 }
944
945 fn visit_raw_snapshots<E>(
948 &self,
949 visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
950 ) -> Result<(), E> {
951 match &self.backend {
952 SchemaStoreBackend::Stable(map) => {
953 let mut visitor = visitor;
954 for entry in map.iter() {
955 if visitor(entry.key(), &entry.value())?.should_stop() {
956 break;
957 }
958 }
959 }
960 SchemaStoreBackend::Heap(map) => {
961 let mut visitor = visitor;
962 for (key, snapshot) in map {
963 if visitor(key, snapshot)?.should_stop() {
964 break;
965 }
966 }
967 }
968 SchemaStoreBackend::Journaled {
969 canonical,
970 live,
971 tombstones,
972 } => Self::visit_journaled_raw_snapshot_range(
973 canonical,
974 live,
975 tombstones,
976 (RangeBound::Unbounded, RangeBound::Unbounded),
977 Direction::Asc,
978 visitor,
979 )?,
980 }
981
982 Ok(())
983 }
984
985 #[cfg(test)]
986 #[must_use]
987 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
988 match &self.backend {
989 SchemaStoreBackend::Stable(map)
990 | SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
991 SchemaStoreBackend::Heap(_) => 0,
992 }
993 }
994
995 fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
996 let SchemaStoreBackend::Journaled {
997 canonical,
998 live,
999 tombstones,
1000 } = &self.backend
1001 else {
1002 return None;
1003 };
1004
1005 if tombstones.contains(key) {
1006 return None;
1007 }
1008 live.get(key).cloned().or_else(|| canonical.get(key))
1009 }
1010
1011 fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
1012 self.latest_raw_snapshot_entry(entity)
1013 .map(|(_, snapshot)| snapshot)
1014 }
1015
1016 fn latest_raw_snapshot_entry(
1017 &self,
1018 entity: EntityTag,
1019 ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
1020 let bounds = RawSchemaKey::entity_range_bounds(entity);
1021 match &self.backend {
1022 SchemaStoreBackend::Stable(map) => map
1023 .range((bounds.0, bounds.1))
1024 .next_back()
1025 .map(|entry| (SchemaVersion::new(entry.key().version()), entry.value())),
1026 SchemaStoreBackend::Heap(map) => map
1027 .range((bounds.0, bounds.1))
1028 .next_back()
1029 .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
1030 SchemaStoreBackend::Journaled {
1031 canonical,
1032 live,
1033 tombstones,
1034 } => {
1035 let mut latest = None;
1036 let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
1037 canonical,
1038 live,
1039 tombstones,
1040 bounds,
1041 Direction::Desc,
1042 |key, snapshot| {
1043 latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1044 Ok(SchemaStoreVisit::Stop)
1045 },
1046 );
1047 latest
1048 }
1049 }
1050 }
1051
1052 fn visit_journaled_raw_snapshot_range<E>(
1053 canonical: &StableBTreeMap<
1054 RawSchemaKey,
1055 RawSchemaSnapshot,
1056 VirtualMemory<DefaultMemoryImpl>,
1057 >,
1058 live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1059 tombstones: &BTreeSet<RawSchemaKey>,
1060 bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1061 direction: Direction,
1062 mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1063 ) -> Result<(), E> {
1064 match direction {
1065 Direction::Asc => visit_ordered_overlay(
1066 canonical.range((bounds.0, bounds.1)),
1067 live.range((bounds.0, bounds.1)),
1068 Direction::Asc,
1069 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1070 |canonical_entry| !tombstones.contains(canonical_entry.key()),
1071 |live_entry| !tombstones.contains(live_entry.0),
1072 |entry| {
1073 let visit = match entry {
1074 OrderedOverlayEntry::Canonical(canonical_entry) => {
1075 visitor(canonical_entry.key(), &canonical_entry.value())?
1076 }
1077 OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1078 };
1079 Ok(if visit.should_stop() {
1080 OrderedOverlayVisit::Stop
1081 } else {
1082 OrderedOverlayVisit::Continue
1083 })
1084 },
1085 ),
1086 Direction::Desc => visit_ordered_overlay(
1087 canonical.range((bounds.0, bounds.1)).rev(),
1088 live.range((bounds.0, bounds.1)).rev(),
1089 Direction::Desc,
1090 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1091 |canonical_entry| !tombstones.contains(canonical_entry.key()),
1092 |live_entry| !tombstones.contains(live_entry.0),
1093 |entry| {
1094 let visit = match entry {
1095 OrderedOverlayEntry::Canonical(canonical_entry) => {
1096 visitor(canonical_entry.key(), &canonical_entry.value())?
1097 }
1098 OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1099 };
1100 Ok(if visit.should_stop() {
1101 OrderedOverlayVisit::Stop
1102 } else {
1103 OrderedOverlayVisit::Continue
1104 })
1105 },
1106 ),
1107 }
1108 }
1109}
1110
1111fn derive_data_allocation_metadata(
1112 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1113) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1114 let mut max_version = SchemaVersion::initial();
1115 let mut hasher = new_hash_sha256();
1116 write_hash_tag_u8(
1117 &mut hasher,
1118 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1119 );
1120
1121 for (entity, (_, snapshot)) in latest_by_entity {
1122 let persisted = snapshot.decode_persisted_snapshot()?;
1123 if persisted.version() > max_version {
1124 max_version = persisted.version();
1125 }
1126
1127 let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1128 persisted.version(),
1129 persisted.entity_path().to_string(),
1130 persisted.entity_name().to_string(),
1131 persisted.primary_key_field_ids().to_vec(),
1132 persisted.row_layout().clone(),
1133 persisted.fields().to_vec(),
1134 Vec::new(),
1135 );
1136 let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1137
1138 write_hash_u64(&mut hasher, entity.value());
1139 write_hash_u32(&mut hasher, persisted.version().get());
1140 write_hash_len_u32(&mut hasher, encoded.len());
1141 hasher.update(encoded);
1142 }
1143
1144 Ok(finalize_schema_metadata(
1145 max_version,
1146 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1147 hasher,
1148 latest_by_entity.len(),
1149 ))
1150}
1151
1152fn derive_index_allocation_metadata(
1153 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1154) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1155 let mut max_version = SchemaVersion::initial();
1156 let mut hasher = new_hash_sha256();
1157 write_hash_tag_u8(
1158 &mut hasher,
1159 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1160 );
1161
1162 for (entity, (_, snapshot)) in latest_by_entity {
1163 let persisted = snapshot.decode_persisted_snapshot()?;
1164 if persisted.version() > max_version {
1165 max_version = persisted.version();
1166 }
1167
1168 write_hash_u64(&mut hasher, entity.value());
1169 write_hash_u32(&mut hasher, persisted.version().get());
1170 write_hash_len_u32(&mut hasher, persisted.indexes().len());
1171 for index in persisted.indexes() {
1172 write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1173 write_hash_str_u32(&mut hasher, index.name());
1174 write_hash_str_u32(&mut hasher, index.store());
1175 write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1176 write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1177 match index.predicate_sql() {
1178 Some(predicate_sql) => {
1179 write_hash_tag_u8(&mut hasher, 1);
1180 write_hash_str_u32(&mut hasher, predicate_sql);
1181 }
1182 None => write_hash_tag_u8(&mut hasher, 0),
1183 }
1184 hash_persisted_index_key(&mut hasher, index.key());
1185 }
1186 }
1187
1188 Ok(finalize_schema_metadata(
1189 max_version,
1190 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1191 hasher,
1192 latest_by_entity.len(),
1193 ))
1194}
1195
1196fn derive_schema_catalog_metadata(
1197 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1198) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1199 let mut max_version = SchemaVersion::initial();
1200 let mut hasher = new_hash_sha256();
1201 write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1202
1203 for (entity, (version, snapshot)) in latest_by_entity {
1204 let persisted = snapshot.decode_persisted_snapshot()?;
1205 if persisted.version() > max_version {
1206 max_version = persisted.version();
1207 }
1208
1209 write_hash_u64(&mut hasher, entity.value());
1210 write_hash_u32(&mut hasher, version.get());
1211 write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1212 hasher.update(snapshot.as_bytes());
1213 }
1214
1215 Ok(finalize_schema_metadata(
1216 max_version,
1217 SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1218 hasher,
1219 latest_by_entity.len(),
1220 ))
1221}
1222
1223fn finalize_schema_metadata(
1224 schema_version: SchemaVersion,
1225 schema_fingerprint_method_version: u8,
1226 hasher: sha2::Sha256,
1227 entity_count: usize,
1228) -> SchemaStoreCatalogMetadata {
1229 let digest = finalize_hash_sha256(hasher);
1230 let mut schema_fingerprint = [0u8; 16];
1231 schema_fingerprint.copy_from_slice(&digest[..16]);
1232
1233 SchemaStoreCatalogMetadata::new(
1234 schema_version,
1235 schema_fingerprint_method_version,
1236 schema_fingerprint,
1237 u64::try_from(entity_count).unwrap_or(u64::MAX),
1238 )
1239}
1240
1241fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1242 match key {
1243 PersistedIndexKeySnapshot::FieldPath(paths) => {
1244 write_hash_tag_u8(hasher, 1);
1245 write_hash_len_u32(hasher, paths.len());
1246 for path in paths {
1247 hash_persisted_index_field_path(hasher, path);
1248 }
1249 }
1250 PersistedIndexKeySnapshot::Items(items) => {
1251 write_hash_tag_u8(hasher, 2);
1252 write_hash_len_u32(hasher, items.len());
1253 for item in items {
1254 match item {
1255 PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1256 write_hash_tag_u8(hasher, 1);
1257 hash_persisted_index_field_path(hasher, path);
1258 }
1259 PersistedIndexKeyItemSnapshot::Expression(expression) => {
1260 write_hash_tag_u8(hasher, 2);
1261 write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1262 hash_persisted_index_field_path(hasher, expression.source());
1263 hash_persisted_field_kind(hasher, expression.input_kind());
1264 hash_persisted_field_kind(hasher, expression.output_kind());
1265 write_hash_str_u32(hasher, expression.canonical_text());
1266 }
1267 }
1268 }
1269 }
1270 }
1271}
1272
1273fn hash_persisted_index_field_path(
1274 hasher: &mut sha2::Sha256,
1275 path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1276) {
1277 write_hash_u32(hasher, path.field_id().get());
1278 write_hash_u32(hasher, u32::from(path.slot().get()));
1279 write_hash_len_u32(hasher, path.path().len());
1280 for segment in path.path() {
1281 write_hash_str_u32(hasher, segment);
1282 }
1283 hash_persisted_field_kind(hasher, path.kind());
1284 write_hash_tag_u8(hasher, u8::from(path.nullable()));
1285}
1286
1287fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1288 match kind {
1289 PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1290 PersistedFieldKind::Blob { max_len } => {
1291 write_hash_tag_u8(hasher, 2);
1292 hash_optional_u32(hasher, *max_len);
1293 }
1294 PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1295 PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1296 PersistedFieldKind::Decimal { scale } => {
1297 write_hash_tag_u8(hasher, 5);
1298 write_hash_u32(hasher, *scale);
1299 }
1300 PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1301 PersistedFieldKind::Enum { path, variants } => {
1302 write_hash_tag_u8(hasher, 7);
1303 write_hash_str_u32(hasher, path);
1304 write_hash_len_u32(hasher, variants.len());
1305 for variant in variants {
1306 write_hash_str_u32(hasher, variant.ident());
1307 match variant.payload_kind() {
1308 Some(payload_kind) => {
1309 write_hash_tag_u8(hasher, 1);
1310 hash_persisted_field_kind(hasher, payload_kind);
1311 }
1312 None => write_hash_tag_u8(hasher, 0),
1313 }
1314 write_hash_str_u32(
1315 hasher,
1316 field_storage_decode_name(variant.payload_storage_decode()),
1317 );
1318 }
1319 }
1320 PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1321 PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1322 PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1323 PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1324 PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1325 PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1326 PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1327 PersistedFieldKind::IntBig { max_bytes } => {
1328 write_hash_tag_u8(hasher, 15);
1329 write_hash_u32(hasher, *max_bytes);
1330 }
1331 PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1332 PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1333 PersistedFieldKind::Text { max_len } => {
1334 write_hash_tag_u8(hasher, 18);
1335 hash_optional_u32(hasher, *max_len);
1336 }
1337 PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1338 PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1339 PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1340 PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1341 PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1342 PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1343 PersistedFieldKind::NatBig { max_bytes } => {
1344 write_hash_tag_u8(hasher, 25);
1345 write_hash_u32(hasher, *max_bytes);
1346 }
1347 PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1348 PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1349 PersistedFieldKind::Relation {
1350 target_path,
1351 target_entity_name,
1352 target_entity_tag,
1353 target_store_path,
1354 key_kind,
1355 strength,
1356 } => {
1357 write_hash_tag_u8(hasher, 28);
1358 write_hash_str_u32(hasher, target_path);
1359 write_hash_str_u32(hasher, target_entity_name);
1360 write_hash_u64(hasher, target_entity_tag.value());
1361 write_hash_str_u32(hasher, target_store_path);
1362 hash_persisted_field_kind(hasher, key_kind);
1363 write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1364 }
1365 PersistedFieldKind::List(inner) => {
1366 write_hash_tag_u8(hasher, 29);
1367 hash_persisted_field_kind(hasher, inner);
1368 }
1369 PersistedFieldKind::Set(inner) => {
1370 write_hash_tag_u8(hasher, 30);
1371 hash_persisted_field_kind(hasher, inner);
1372 }
1373 PersistedFieldKind::Map { key, value } => {
1374 write_hash_tag_u8(hasher, 31);
1375 hash_persisted_field_kind(hasher, key);
1376 hash_persisted_field_kind(hasher, value);
1377 }
1378 PersistedFieldKind::Structured { queryable } => {
1379 write_hash_tag_u8(hasher, 32);
1380 write_hash_tag_u8(hasher, u8::from(*queryable));
1381 }
1382 }
1383}
1384
1385fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1386 match value {
1387 Some(value) => {
1388 write_hash_tag_u8(hasher, 1);
1389 write_hash_u32(hasher, value);
1390 }
1391 None => write_hash_tag_u8(hasher, 0),
1392 }
1393}
1394
1395const fn persisted_index_origin_name(
1396 origin: crate::db::schema::PersistedIndexOrigin,
1397) -> &'static str {
1398 match origin {
1399 crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1400 crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1401 }
1402}
1403
1404const fn persisted_expression_op_name(
1405 op: crate::db::schema::PersistedIndexExpressionOp,
1406) -> &'static str {
1407 match op {
1408 crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1409 crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1410 crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1411 crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1412 crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1413 crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1414 crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1415 crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1416 }
1417}
1418
1419const fn persisted_relation_strength_name(
1420 strength: crate::db::schema::PersistedRelationStrength,
1421) -> &'static str {
1422 match strength {
1423 crate::db::schema::PersistedRelationStrength::Strong => "strong",
1424 crate::db::schema::PersistedRelationStrength::Weak => "weak",
1425 }
1426}
1427
1428const fn field_storage_decode_name(
1429 decode: crate::model::field::FieldStorageDecode,
1430) -> &'static str {
1431 match decode {
1432 crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1433 crate::model::field::FieldStorageDecode::Value => "value",
1434 }
1435}
1436
1437#[cfg(test)]
1442mod tests {
1443 use super::{
1444 RawSchemaKey, RawSchemaSnapshot, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1445 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1446 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION, SchemaStore, SchemaStoreBackend,
1447 SchemaStoreVisit,
1448 };
1449 use crate::{
1450 db::{
1451 direction::Direction,
1452 schema::{
1453 AcceptedSchemaSnapshot, FieldId, PersistedFieldKind, PersistedFieldSnapshot,
1454 PersistedIndexFieldPathSnapshot, PersistedIndexKeySnapshot, PersistedIndexSnapshot,
1455 PersistedNestedLeafSnapshot, PersistedSchemaSnapshot, SchemaFieldDefault,
1456 SchemaFieldSlot, SchemaRowLayout, SchemaVersion, accepted_schema_cache_fingerprint,
1457 encode_persisted_schema_snapshot, persisted_schema_snapshot_decode_count_for_tests,
1458 reset_persisted_schema_snapshot_decode_count_for_tests,
1459 },
1460 },
1461 model::field::{FieldStorageDecode, LeafCodec, ScalarCodec},
1462 testing::test_memory,
1463 traits::Storable,
1464 types::EntityTag,
1465 };
1466 use std::borrow::Cow;
1467 use std::convert::Infallible;
1468
1469 #[test]
1470 fn raw_schema_key_round_trips_entity_and_version() {
1471 let key = RawSchemaKey::from_entity_version(EntityTag::new(0x0102_0304_0506_0708), {
1472 SchemaVersion::initial()
1473 });
1474 let encoded = key.to_bytes().into_owned();
1475 let decoded = RawSchemaKey::from_bytes(Cow::Owned(encoded));
1476
1477 assert_eq!(decoded.entity_tag(), EntityTag::new(0x0102_0304_0506_0708));
1478 assert_eq!(decoded.version(), SchemaVersion::initial().get());
1479 }
1480
1481 #[test]
1482 fn raw_schema_snapshot_round_trips_payload_bytes() {
1483 let snapshot = RawSchemaSnapshot::from_bytes(vec![1, 2, 3, 5, 8]);
1484 let encoded = snapshot.to_bytes().into_owned();
1485 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
1486
1487 assert_eq!(decoded.as_bytes(), &[1, 2, 3, 5, 8]);
1488 assert_eq!(decoded.into_bytes(), vec![1, 2, 3, 5, 8]);
1489 }
1490
1491 #[test]
1492 fn raw_schema_snapshot_round_trips_identity_header_for_typed_snapshot() {
1493 let snapshot = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Header");
1494 let accepted = AcceptedSchemaSnapshot::try_new(snapshot.clone())
1495 .expect("typed schema snapshot should be accepted");
1496 let expected_fingerprint = accepted_schema_cache_fingerprint(&accepted)
1497 .expect("accepted schema fingerprint should derive");
1498 let raw = RawSchemaSnapshot::from_persisted_snapshot(&snapshot)
1499 .expect("typed schema snapshot should encode");
1500 let encoded = raw.to_bytes().into_owned();
1501 let decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(encoded));
1502 let owned_encoded = <RawSchemaSnapshot as Storable>::into_bytes(raw.clone());
1503 let owned_decoded = <RawSchemaSnapshot as Storable>::from_bytes(Cow::Owned(owned_encoded));
1504
1505 assert_eq!(decoded.as_bytes(), raw.as_bytes());
1506 assert_eq!(
1507 decoded
1508 .accepted_schema_fingerprint()
1509 .expect("identity header should decode"),
1510 expected_fingerprint
1511 );
1512 assert_eq!(
1513 owned_decoded
1514 .accepted_schema_fingerprint()
1515 .expect("owned identity header should decode"),
1516 expected_fingerprint
1517 );
1518 }
1519
1520 #[test]
1521 fn schema_store_persists_raw_snapshots_by_entity_version_key() {
1522 let mut store = SchemaStore::init(test_memory(251));
1523 let key = RawSchemaKey::from_entity_version(EntityTag::new(17), SchemaVersion::initial());
1524
1525 assert!(store.is_empty());
1526 assert!(!store.contains_raw_snapshot(&key));
1527
1528 store.insert_raw_snapshot(key, RawSchemaSnapshot::from_bytes(vec![9, 4, 6]));
1529
1530 assert_eq!(store.len(), 1);
1531 assert!(store.contains_raw_snapshot(&key));
1532 assert_eq!(
1533 store
1534 .get_raw_snapshot(&key)
1535 .expect("schema snapshot should be present")
1536 .as_bytes(),
1537 &[9, 4, 6],
1538 );
1539
1540 store.clear();
1541 assert!(store.is_empty());
1542 }
1543
1544 #[test]
1545 fn schema_store_loads_latest_snapshot_for_entity() {
1546 let mut store = SchemaStore::init(test_memory(252));
1547 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1548 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1549 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1550
1551 store
1552 .insert_persisted_snapshot(EntityTag::new(41), &initial)
1553 .expect("initial schema snapshot should encode");
1554 store
1555 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1556 .expect("other entity schema snapshot should encode");
1557 store
1558 .insert_persisted_snapshot(EntityTag::new(41), &newer)
1559 .expect("newer schema snapshot should encode");
1560
1561 let latest = store
1562 .latest_persisted_snapshot(EntityTag::new(41))
1563 .expect("latest schema snapshot should decode")
1564 .expect("schema snapshot should exist");
1565
1566 assert_eq!(latest.version(), SchemaVersion::new(2));
1567 assert_eq!(latest.entity_name(), "Newer");
1568 }
1569
1570 #[test]
1571 fn schema_store_entity_footprint_counts_raw_snapshots_without_decoding() {
1572 let mut store = SchemaStore::init(test_memory(242));
1573 store.insert_raw_snapshot(
1574 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::initial()),
1575 RawSchemaSnapshot::from_bytes(vec![1, 2, 3]),
1576 );
1577 store.insert_raw_snapshot(
1578 RawSchemaKey::from_entity_version(EntityTag::new(72), SchemaVersion::new(3)),
1579 RawSchemaSnapshot::from_bytes(vec![5, 8]),
1580 );
1581 store.insert_raw_snapshot(
1582 RawSchemaKey::from_entity_version(EntityTag::new(71), SchemaVersion::new(2)),
1583 RawSchemaSnapshot::from_bytes(vec![13, 21, 34, 55]),
1584 );
1585
1586 let footprint = store.entity_footprint(EntityTag::new(71));
1587
1588 assert_eq!(footprint.snapshots(), 2);
1589 assert_eq!(footprint.encoded_bytes(), 7);
1590 assert_eq!(footprint.latest_snapshot_bytes(), 4);
1591 }
1592
1593 #[test]
1594 fn schema_store_visit_raw_snapshots_preserves_key_order() {
1595 let mut store = SchemaStore::init(test_memory(235));
1596 store.insert_raw_snapshot(
1597 RawSchemaKey::from_entity_version(EntityTag::new(3), SchemaVersion::new(2)),
1598 RawSchemaSnapshot::from_bytes(vec![32]),
1599 );
1600 store.insert_raw_snapshot(
1601 RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(3)),
1602 RawSchemaSnapshot::from_bytes(vec![13]),
1603 );
1604 store.insert_raw_snapshot(
1605 RawSchemaKey::from_entity_version(EntityTag::new(1), SchemaVersion::new(1)),
1606 RawSchemaSnapshot::from_bytes(vec![11]),
1607 );
1608
1609 let mut visited = Vec::new();
1610 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1611 visited.push((
1612 key.entity_tag().value(),
1613 key.version(),
1614 snapshot.as_bytes()[0],
1615 ));
1616 Ok(SchemaStoreVisit::Continue)
1617 });
1618
1619 assert_eq!(visited, vec![(1, 1, 11), (1, 3, 13), (3, 2, 32)]);
1620 }
1621
1622 #[test]
1623 fn schema_store_visit_raw_snapshots_can_stop_without_error() {
1624 let mut store = SchemaStore::init(test_memory(234));
1625 store.insert_raw_snapshot(
1626 RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(1)),
1627 RawSchemaSnapshot::from_bytes(vec![21]),
1628 );
1629 store.insert_raw_snapshot(
1630 RawSchemaKey::from_entity_version(EntityTag::new(2), SchemaVersion::new(2)),
1631 RawSchemaSnapshot::from_bytes(vec![22]),
1632 );
1633
1634 let mut visited = Vec::new();
1635 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, _| {
1636 visited.push(key.version());
1637 Ok(SchemaStoreVisit::Stop)
1638 });
1639
1640 assert_eq!(visited, vec![1]);
1641 }
1642
1643 #[test]
1644 fn heap_schema_store_preserves_order_latest_snapshot_and_early_stop() {
1645 let mut store = SchemaStore::init_heap();
1646 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1647 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1648 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1649
1650 store
1651 .insert_persisted_snapshot(EntityTag::new(41), &initial)
1652 .expect("initial heap schema snapshot should encode");
1653 store
1654 .insert_persisted_snapshot(EntityTag::new(42), &other_entity)
1655 .expect("other heap schema snapshot should encode");
1656 store
1657 .insert_persisted_snapshot(EntityTag::new(41), &newer)
1658 .expect("newer heap schema snapshot should encode");
1659
1660 let latest = store
1661 .latest_persisted_snapshot(EntityTag::new(41))
1662 .expect("latest heap schema snapshot should decode")
1663 .expect("heap schema snapshot should exist");
1664 assert_eq!(latest.version(), SchemaVersion::new(2));
1665 assert_eq!(latest.entity_name(), "Newer");
1666
1667 let mut visited = Vec::new();
1668 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1669 visited.push((
1670 key.entity_tag().value(),
1671 key.version(),
1672 snapshot.as_bytes().len(),
1673 ));
1674 Ok(if visited.len() == 2 {
1675 SchemaStoreVisit::Stop
1676 } else {
1677 SchemaStoreVisit::Continue
1678 })
1679 });
1680 assert_eq!(
1681 visited
1682 .iter()
1683 .map(|(entity, version, _)| (*entity, *version))
1684 .collect::<Vec<_>>(),
1685 vec![(41, 1), (41, 2)]
1686 );
1687 }
1688
1689 #[test]
1690 fn journaled_schema_store_streams_overlay_latest_snapshot_and_early_stop() {
1691 let mut store = SchemaStore::init_journaled(test_memory(233));
1692 let canonical_initial =
1693 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1694 let canonical_replaced =
1695 persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Canonical");
1696 let live_replacement = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Live");
1697 let live_newer = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "LiveNewer");
1698 let other_entity = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Other");
1699
1700 store
1701 .fold_persisted_snapshot(EntityTag::new(61), &canonical_initial)
1702 .expect("initial canonical schema snapshot should encode");
1703 store
1704 .fold_persisted_snapshot(EntityTag::new(61), &canonical_replaced)
1705 .expect("canonical schema snapshot should encode");
1706 store
1707 .fold_persisted_snapshot(EntityTag::new(62), &other_entity)
1708 .expect("other canonical schema snapshot should encode");
1709 store
1710 .insert_persisted_snapshot(EntityTag::new(61), &live_replacement)
1711 .expect("live replacement schema snapshot should encode");
1712 store
1713 .insert_persisted_snapshot(EntityTag::new(61), &live_newer)
1714 .expect("live newer schema snapshot should encode");
1715
1716 let latest = store
1717 .latest_persisted_snapshot(EntityTag::new(61))
1718 .expect("latest journaled schema snapshot should decode")
1719 .expect("journaled schema snapshot should exist");
1720 assert_eq!(latest.version(), SchemaVersion::new(3));
1721 assert_eq!(latest.entity_name(), "LiveNewer");
1722 assert_eq!(store.len(), 4);
1723
1724 let mut visited = Vec::new();
1725 let _: Result<(), Infallible> = store.visit_raw_snapshots(|key, snapshot| {
1726 let decoded = snapshot
1727 .decode_persisted_snapshot()
1728 .expect("visited schema snapshot should decode");
1729 visited.push((
1730 key.entity_tag().value(),
1731 key.version(),
1732 decoded.entity_name().to_string(),
1733 ));
1734 Ok(if visited.len() == 3 {
1735 SchemaStoreVisit::Stop
1736 } else {
1737 SchemaStoreVisit::Continue
1738 })
1739 });
1740 assert_eq!(
1741 visited,
1742 vec![
1743 (61, 1, "Initial".to_string()),
1744 (61, 2, "Live".to_string()),
1745 (61, 3, "LiveNewer".to_string()),
1746 ],
1747 );
1748
1749 store.clear();
1750 assert!(store.is_empty());
1751 assert!(
1752 store
1753 .latest_persisted_snapshot(EntityTag::new(61))
1754 .expect("cleared journaled latest snapshot lookup should decode")
1755 .is_none(),
1756 );
1757 }
1758
1759 #[test]
1760 fn journaled_schema_store_latest_snapshot_reads_each_overlay_source() {
1761 let entity = EntityTag::new(71);
1762
1763 let mut canonical_only = SchemaStore::init_journaled(test_memory(231));
1764 let canonical =
1765 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "CanonicalOnly");
1766 canonical_only
1767 .fold_persisted_snapshot(entity, &canonical)
1768 .expect("canonical-only schema snapshot should encode");
1769 assert_latest_schema(
1770 &canonical_only,
1771 entity,
1772 SchemaVersion::initial(),
1773 "CanonicalOnly",
1774 );
1775
1776 let mut live_only = SchemaStore::init_journaled(test_memory(230));
1777 let live = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "LiveOnly");
1778 live_only
1779 .insert_persisted_snapshot(entity, &live)
1780 .expect("live-only schema snapshot should encode");
1781 assert_latest_schema(&live_only, entity, SchemaVersion::new(2), "LiveOnly");
1782
1783 let mut live_override = SchemaStore::init_journaled(test_memory(229));
1784 let canonical_duplicate =
1785 persisted_schema_snapshot_for_test(SchemaVersion::new(3), "CanonicalDuplicate");
1786 let live_duplicate =
1787 persisted_schema_snapshot_for_test(SchemaVersion::new(3), "LiveDuplicate");
1788 live_override
1789 .fold_persisted_snapshot(entity, &canonical_duplicate)
1790 .expect("canonical duplicate schema snapshot should encode");
1791 live_override
1792 .insert_persisted_snapshot(entity, &live_duplicate)
1793 .expect("live duplicate schema snapshot should encode");
1794 assert_latest_schema(
1795 &live_override,
1796 entity,
1797 SchemaVersion::new(3),
1798 "LiveDuplicate",
1799 );
1800 }
1801
1802 #[test]
1803 fn journaled_schema_store_descending_range_orders_live_between_canonical_versions() {
1804 let mut store = SchemaStore::init_journaled(test_memory(228));
1805 let entity = EntityTag::new(72);
1806 let lower_entity = EntityTag::new(71);
1807 let higher_entity = EntityTag::new(73);
1808 let canonical_initial =
1809 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "CanonicalV1");
1810 let canonical_duplicate =
1811 persisted_schema_snapshot_for_test(SchemaVersion::new(2), "CanonicalV2");
1812 let live_duplicate = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "LiveV2");
1813 let canonical_latest =
1814 persisted_schema_snapshot_for_test(SchemaVersion::new(3), "CanonicalV3");
1815 let unrelated_lower =
1816 persisted_schema_snapshot_for_test(SchemaVersion::new(9), "UnrelatedLower");
1817
1818 store
1819 .fold_persisted_snapshot(entity, &canonical_initial)
1820 .expect("canonical v1 schema snapshot should encode");
1821 store
1822 .fold_persisted_snapshot(entity, &canonical_duplicate)
1823 .expect("canonical v2 schema snapshot should encode");
1824 store
1825 .fold_persisted_snapshot(entity, &canonical_latest)
1826 .expect("canonical v3 schema snapshot should encode");
1827 store
1828 .fold_persisted_snapshot(lower_entity, &unrelated_lower)
1829 .expect("lower unrelated schema snapshot should encode");
1830 store
1831 .insert_persisted_snapshot(entity, &live_duplicate)
1832 .expect("live v2 schema snapshot should encode");
1833 store.insert_raw_snapshot(
1834 RawSchemaKey::from_entity_version(higher_entity, SchemaVersion::new(1)),
1835 RawSchemaSnapshot::from_bytes(vec![0xff]),
1836 );
1837
1838 let visited = visit_journaled_schema_range(&store, entity, Direction::Desc, usize::MAX);
1839 assert_eq!(
1840 visited,
1841 vec![
1842 (3, "CanonicalV3".to_string()),
1843 (2, "LiveV2".to_string()),
1844 (1, "CanonicalV1".to_string()),
1845 ],
1846 );
1847
1848 let early_stop = visit_journaled_schema_range(&store, entity, Direction::Desc, 1);
1849 assert_eq!(early_stop, vec![(3, "CanonicalV3".to_string())]);
1850 }
1851
1852 #[test]
1853 fn journaled_schema_store_latest_snapshot_skips_tombstoned_latest_version() {
1854 let entity = EntityTag::new(74);
1855
1856 let mut canonical_latest_tombstoned = SchemaStore::init_journaled(test_memory(227));
1857 let canonical_initial =
1858 persisted_schema_snapshot_for_test(SchemaVersion::initial(), "CanonicalV1");
1859 let canonical_latest =
1860 persisted_schema_snapshot_for_test(SchemaVersion::new(2), "CanonicalV2");
1861 canonical_latest_tombstoned
1862 .fold_persisted_snapshot(entity, &canonical_initial)
1863 .expect("canonical v1 schema snapshot should encode");
1864 canonical_latest_tombstoned
1865 .fold_persisted_snapshot(entity, &canonical_latest)
1866 .expect("canonical v2 schema snapshot should encode");
1867 tombstone_journaled_raw_snapshot(
1868 &mut canonical_latest_tombstoned,
1869 entity,
1870 SchemaVersion::new(2),
1871 );
1872
1873 assert_latest_schema(
1874 &canonical_latest_tombstoned,
1875 entity,
1876 SchemaVersion::initial(),
1877 "CanonicalV1",
1878 );
1879 assert!(
1880 canonical_latest_tombstoned
1881 .get_persisted_snapshot(entity, SchemaVersion::new(2))
1882 .expect("tombstoned canonical snapshot lookup should not decode")
1883 .is_none(),
1884 );
1885
1886 let mut live_latest_tombstoned = SchemaStore::init_journaled(test_memory(226));
1887 let live_latest = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "LiveV2");
1888 live_latest_tombstoned
1889 .fold_persisted_snapshot(entity, &canonical_initial)
1890 .expect("canonical v1 schema snapshot should encode");
1891 live_latest_tombstoned
1892 .insert_persisted_snapshot(entity, &live_latest)
1893 .expect("live v2 schema snapshot should encode");
1894 tombstone_journaled_raw_snapshot(
1895 &mut live_latest_tombstoned,
1896 entity,
1897 SchemaVersion::new(2),
1898 );
1899
1900 assert_latest_schema(
1901 &live_latest_tombstoned,
1902 entity,
1903 SchemaVersion::initial(),
1904 "CanonicalV1",
1905 );
1906 assert!(
1907 live_latest_tombstoned
1908 .get_persisted_snapshot(entity, SchemaVersion::new(2))
1909 .expect("tombstoned live snapshot lookup should not decode")
1910 .is_none(),
1911 );
1912 }
1913
1914 #[test]
1915 fn schema_store_catalog_metadata_is_absent_without_accepted_snapshots() {
1916 let store = SchemaStore::init(test_memory(241));
1917
1918 assert_eq!(
1919 store
1920 .catalog_metadata()
1921 .expect("empty schema catalog metadata should derive"),
1922 None
1923 );
1924 }
1925
1926 #[test]
1927 fn schema_store_latest_catalog_identity_uses_version_neutral_header_without_decoding() {
1928 let mut store = SchemaStore::init(test_memory(239));
1929 let entity = EntityTag::new(80);
1930 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Versioned");
1931 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Versioned");
1932 let expected_fingerprint = accepted_schema_cache_fingerprint(
1933 &AcceptedSchemaSnapshot::try_new(initial.clone())
1934 .expect("initial schema snapshot should be accepted"),
1935 )
1936 .expect("accepted schema fingerprint should derive");
1937
1938 store
1939 .insert_persisted_snapshot(entity, &initial)
1940 .expect("initial schema snapshot should encode");
1941 store
1942 .insert_persisted_snapshot(entity, &newer)
1943 .expect("newer schema snapshot should encode");
1944
1945 reset_persisted_schema_snapshot_decode_count_for_tests();
1946 let selection = store
1947 .latest_catalog_identity(entity, "entities::Versioned", "schema_store_test")
1948 .expect("latest catalog identity should derive from header")
1949 .expect("latest catalog identity should exist");
1950
1951 assert_eq!(persisted_schema_snapshot_decode_count_for_tests(), 0);
1952 assert_eq!(
1953 selection.identity().accepted_schema_version(),
1954 SchemaVersion::new(2)
1955 );
1956 assert_eq!(
1957 selection.identity().accepted_schema_fingerprint(),
1958 expected_fingerprint,
1959 "accepted catalog identity fingerprint must exclude schema_version",
1960 );
1961 }
1962
1963 #[test]
1964 fn schema_store_catalog_metadata_uses_latest_persisted_snapshots() {
1965 let mut store = SchemaStore::init(test_memory(240));
1966 let initial = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
1967 let newer = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "Newer");
1968 let other = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Other");
1969
1970 store
1971 .insert_persisted_snapshot(EntityTag::new(81), &initial)
1972 .expect("initial schema snapshot should encode");
1973 let initial_metadata = store
1974 .catalog_metadata()
1975 .expect("initial schema catalog metadata should derive")
1976 .expect("initial schema catalog metadata should be present");
1977
1978 store
1979 .insert_persisted_snapshot(EntityTag::new(81), &newer)
1980 .expect("newer schema snapshot should encode");
1981 store
1982 .insert_persisted_snapshot(EntityTag::new(82), &other)
1983 .expect("other schema snapshot should encode");
1984 let updated_metadata = store
1985 .catalog_metadata()
1986 .expect("updated schema catalog metadata should derive")
1987 .expect("updated schema catalog metadata should be present");
1988
1989 assert_eq!(initial_metadata.schema_version(), SchemaVersion::initial());
1990 assert_eq!(
1991 initial_metadata.schema_fingerprint_method_version(),
1992 SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION
1993 );
1994 assert_eq!(initial_metadata.entity_count(), 1);
1995 assert_eq!(updated_metadata.schema_version(), SchemaVersion::new(3));
1996 assert_eq!(
1997 updated_metadata.schema_fingerprint_method_version(),
1998 SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION
1999 );
2000 assert_eq!(updated_metadata.entity_count(), 2);
2001 assert_ne!(
2002 initial_metadata.schema_fingerprint(),
2003 updated_metadata.schema_fingerprint(),
2004 "catalog fingerprint must change when latest accepted schema catalog changes"
2005 );
2006 }
2007
2008 #[test]
2009 fn schema_store_catalog_metadata_is_independent_of_insertion_order() {
2010 let first = persisted_schema_snapshot_for_test(SchemaVersion::new(2), "First");
2011 let second = persisted_schema_snapshot_for_test(SchemaVersion::new(3), "Second");
2012
2013 let mut left = SchemaStore::init(test_memory(239));
2014 left.insert_persisted_snapshot(EntityTag::new(91), &first)
2015 .expect("first schema snapshot should encode");
2016 left.insert_persisted_snapshot(EntityTag::new(92), &second)
2017 .expect("second schema snapshot should encode");
2018
2019 let mut right = SchemaStore::init(test_memory(238));
2020 right
2021 .insert_persisted_snapshot(EntityTag::new(92), &second)
2022 .expect("second schema snapshot should encode");
2023 right
2024 .insert_persisted_snapshot(EntityTag::new(91), &first)
2025 .expect("first schema snapshot should encode");
2026
2027 let left_metadata = left
2028 .catalog_metadata()
2029 .expect("left schema catalog metadata should derive");
2030 let right_metadata = right
2031 .catalog_metadata()
2032 .expect("right schema catalog metadata should derive");
2033
2034 assert_eq!(left_metadata, right_metadata);
2035 }
2036
// Compares allocation metadata derived from two stores that hold the same
// entity with and without an accepted index, checking each role's fingerprint
// and fingerprint method version.
#[test]
fn schema_store_allocation_metadata_uses_role_specific_fingerprints() {
    let plain_snapshot =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RoleSpecific");
    let indexed_snapshot = persisted_schema_snapshot_with_index_for_test(
        SchemaVersion::initial(),
        "RoleSpecific",
        "payload_idx",
    );

    // Store holding the snapshot without any index.
    let mut plain_store = SchemaStore::init(test_memory(237));
    plain_store
        .insert_persisted_snapshot(EntityTag::new(93), &plain_snapshot)
        .expect("base schema snapshot should encode");
    let plain = plain_store
        .allocation_metadata()
        .expect("base allocation metadata should derive")
        .expect("base allocation metadata should be present");

    // Store holding the same entity with one index attached.
    let mut indexed_store = SchemaStore::init(test_memory(236));
    indexed_store
        .insert_persisted_snapshot(EntityTag::new(93), &indexed_snapshot)
        .expect("indexed schema snapshot should encode");
    let indexed = indexed_store
        .allocation_metadata()
        .expect("indexed allocation metadata should derive")
        .expect("indexed allocation metadata should be present");

    // Per-role views, fetched once and reused by the assertions below.
    let plain_data = plain.data();
    let plain_index = plain.index();
    let plain_schema = plain.schema();
    let indexed_data = indexed.data();
    let indexed_index = indexed.index();
    let indexed_schema = indexed.schema();

    assert_eq!(
        plain_data.schema_fingerprint(),
        indexed_data.schema_fingerprint(),
        "data allocation metadata should ignore accepted index catalog changes"
    );
    assert_eq!(
        indexed_data.schema_fingerprint_method_version(),
        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION
    );
    assert_eq!(
        indexed_index.schema_fingerprint_method_version(),
        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION
    );
    assert_eq!(
        indexed_schema.schema_fingerprint_method_version(),
        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION
    );
    assert_ne!(
        plain_index.schema_fingerprint(),
        indexed_index.schema_fingerprint(),
        "index allocation metadata should change when accepted index catalog changes"
    );
    assert_ne!(
        plain_schema.schema_fingerprint(),
        indexed_schema.schema_fingerprint(),
        "schema allocation metadata should change when full accepted catalog changes"
    );
    assert_ne!(
        indexed_data.schema_fingerprint(),
        indexed_index.schema_fingerprint(),
        "data and index allocation metadata should have distinct role fingerprints"
    );
    assert_ne!(
        indexed_index.schema_fingerprint(),
        indexed_schema.schema_fingerprint(),
        "index and schema allocation metadata should have distinct role fingerprints"
    );
}
2104
// A snapshot built with schema version 2 but an initial-version row layout
// must be refused on insert with the row-layout version mismatch diagnostic.
#[test]
fn schema_store_rejects_mismatched_snapshot_and_layout_versions() {
    let mut store = SchemaStore::init(test_memory(253));
    let snapshot_version = SchemaVersion::new(2);
    let layout_version = SchemaVersion::initial();
    let mismatched = persisted_schema_snapshot_with_layout_version_for_test(
        snapshot_version,
        layout_version,
        "Invalid",
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(43), &mismatched)
        .expect_err("schema store should reject mismatched snapshot/layout versions");
    let diagnostic = rejection.message();

    assert!(
        diagnostic.contains("schema snapshot row-layout version mismatch"),
        "schema store should preserve the version mismatch diagnostic"
    );
}
2124
// Inserting a typed snapshot whose schema version is zero must fail with the
// "must be positive" diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_zero_schema_version() {
    let mut store = SchemaStore::init(test_memory(254));
    let zero_version = SchemaVersion::new(0);
    let zero_versioned = persisted_schema_snapshot_for_test(zero_version, "ZeroSchemaVersion");

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(44), &zero_versioned)
        .expect_err("schema store should reject non-positive schema versions");
    let diagnostic = rejection.message();

    assert!(
        diagnostic.contains("schema snapshot schema_version must be positive"),
        "schema store should hard-cut non-positive persisted schema versions"
    );
}
2141
// The row layout places field 2 in slot 3 while the reused field list comes
// unchanged from the base fixture; the typed insert path must report a field
// slot mismatch.
#[test]
fn schema_store_rejects_typed_snapshot_with_divergent_field_slots() {
    let mut store = SchemaStore::init(test_memory(232));
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "InvalidSlots");

    // Layout that maps field 2 to slot 3.
    let divergent_layout = SchemaRowLayout::new(
        base.version(),
        vec![
            (FieldId::new(1), SchemaFieldSlot::new(0)),
            (FieldId::new(2), SchemaFieldSlot::new(3)),
        ],
    );
    let divergent = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        divergent_layout,
        base.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(44), &divergent)
        .expect_err("schema store should reject divergent field/layout slots");
    let diagnostic = rejection.message();

    assert!(
        diagnostic.contains("schema snapshot field slot mismatch"),
        "schema store should report the duplicated slot divergence"
    );
}
2171
// A row layout that assigns slot 0 to both field 1 and field 2 must be
// refused by the typed insert path with the duplicate row-layout slot
// diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_duplicate_row_layout_slot() {
    let mut store = SchemaStore::init(test_memory(246));
    let base =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateLayoutSlot");

    // Both fields share slot 0.
    let colliding_layout = SchemaRowLayout::new(
        base.version(),
        vec![
            (FieldId::new(1), SchemaFieldSlot::new(0)),
            (FieldId::new(2), SchemaFieldSlot::new(0)),
        ],
    );
    let colliding = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        colliding_layout,
        base.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(49), &colliding)
        .expect_err("schema store should reject duplicate row-layout slots");
    let diagnostic = rejection.message();

    assert!(
        diagnostic.contains("schema snapshot duplicate row-layout slot"),
        "schema store should report the row-layout slot ambiguity"
    );
}
2202
// The snapshot names field id 99 as its primary key while reusing the base
// fixture's row layout and fields; the typed insert path must report the
// missing primary-key field.
#[test]
fn schema_store_rejects_typed_snapshot_with_missing_primary_key_field() {
    let mut store = SchemaStore::init(test_memory(248));
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "MissingPk");

    let unknown_primary_key = FieldId::new(99);
    let orphaned = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        unknown_primary_key,
        base.row_layout().clone(),
        base.fields().to_vec(),
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(47), &orphaned)
        .expect_err("schema store should reject snapshots without the primary-key field");
    let diagnostic = rejection.message();

    assert!(
        diagnostic.contains("schema snapshot primary key field missing from row layout"),
        "schema store should report the missing primary-key field"
    );
}
2226
// A valid initial snapshot is stored, then undecodable bytes are written
// under version 3 of the same entity. The latest-snapshot lookup must return
// a decode error.
#[test]
fn schema_store_does_not_fallback_when_latest_snapshot_is_corrupt() {
    let mut store = SchemaStore::init(test_memory(249));
    let valid = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Initial");
    let newest_key = RawSchemaKey::from_entity_version(EntityTag::new(45), SchemaVersion::new(3));
    let garbage: Vec<u8> = vec![0xff, 0x00];

    store
        .insert_persisted_snapshot(EntityTag::new(45), &valid)
        .expect("initial schema snapshot should encode");
    store.insert_raw_snapshot(newest_key, RawSchemaSnapshot::from_bytes(garbage));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(45))
        .expect_err("latest corrupt schema snapshot must fail closed");
    let diagnostic = failure.message();

    assert!(
        diagnostic.contains("failed to decode persisted schema snapshot"),
        "latest-version lookup should report the corrupt newest snapshot"
    );
}
2249
// Encodes a snapshot whose row layout maps field 2 to slot 3, writes the raw
// bytes directly into the store, and expects the latest-snapshot lookup to
// fail with the persisted field slot mismatch diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_divergent_field_slots() {
    let mut store = SchemaStore::init(test_memory(250));
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawInvalidSlots");

    // Layout that maps field 2 to slot 3.
    let divergent_layout = SchemaRowLayout::new(
        base.version(),
        vec![
            (FieldId::new(1), SchemaFieldSlot::new(0)),
            (FieldId::new(2), SchemaFieldSlot::new(3)),
        ],
    );
    let divergent = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        divergent_layout,
        base.fields().to_vec(),
    );

    // Write the encoded bytes through the raw insert path.
    let encoded = encode_persisted_schema_snapshot(&divergent)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(46), divergent.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(46))
        .expect_err("raw decode should reject divergent field/layout slots");
    let diagnostic = failure.message();

    assert!(
        diagnostic.contains("persisted schema snapshot field slot mismatch"),
        "schema codec should report the raw decoded slot divergence"
    );
}
2284
// Encodes a snapshot that names field id 99 as its primary key, writes the
// raw bytes directly into the store, and expects the latest-snapshot lookup
// to fail with the persisted missing primary-key diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_missing_primary_key_field() {
    let mut store = SchemaStore::init(test_memory(247));
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "RawMissingPk");

    let unknown_primary_key = FieldId::new(99);
    let orphaned = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        unknown_primary_key,
        base.row_layout().clone(),
        base.fields().to_vec(),
    );

    // Write the encoded bytes through the raw insert path.
    let encoded = encode_persisted_schema_snapshot(&orphaned)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(48), orphaned.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(48))
        .expect_err("raw decode should reject snapshots without the primary-key field");
    let diagnostic = failure.message();

    assert!(
        diagnostic.contains("persisted schema snapshot primary key field missing from row layout"),
        "schema codec should report the raw decoded missing primary-key field"
    );
}
2313
// Rebuilds the second field so that it carries the first field's name,
// encodes the resulting snapshot, writes it through the raw path, and expects
// the latest-snapshot lookup to fail with the duplicate field name diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_field_name() {
    let mut store = SchemaStore::init(test_memory(245));
    let base =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateFieldName");

    // Copy of the second field with only its name swapped for the first's.
    let original_fields = base.fields();
    let template = &original_fields[1];
    let renamed = PersistedFieldSnapshot::new(
        template.id(),
        original_fields[0].name().to_string(),
        template.slot(),
        template.kind().clone(),
        template.nested_leaves().to_vec(),
        template.nullable(),
        template.default().clone(),
        template.storage_decode(),
        template.leaf_codec(),
    );
    let mut fields = original_fields.to_vec();
    fields[1] = renamed;

    let ambiguous = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        base.row_layout().clone(),
        fields,
    );

    // Write the encoded bytes through the raw insert path.
    let encoded = encode_persisted_schema_snapshot(&ambiguous)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(50), ambiguous.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(50))
        .expect_err("raw decode should reject duplicate field names");
    let diagnostic = failure.message();

    assert!(
        diagnostic.contains("persisted schema snapshot duplicate field name"),
        "schema codec should report the raw decoded field-name ambiguity"
    );
}
2356
// Replaces the second field's nested leaves with a single leaf whose path is
// empty; the typed insert path must fail with the empty nested leaf path
// diagnostic.
#[test]
fn schema_store_rejects_typed_snapshot_with_empty_nested_leaf_path() {
    let mut store = SchemaStore::init(test_memory(244));
    let base = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "EmptyNestedLeaf");

    // Nested leaf constructed with an empty path vector.
    let pathless_leaf = PersistedNestedLeafSnapshot::new(
        Vec::new(),
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );

    // Copy of the second field with only its nested leaves replaced.
    let original_fields = base.fields();
    let template = &original_fields[1];
    let replacement = PersistedFieldSnapshot::new(
        template.id(),
        template.name().to_string(),
        template.slot(),
        template.kind().clone(),
        vec![pathless_leaf],
        template.nullable(),
        template.default().clone(),
        template.storage_decode(),
        template.leaf_codec(),
    );
    let mut fields = original_fields.to_vec();
    fields[1] = replacement;

    let pathless = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        base.row_layout().clone(),
        fields,
    );

    let rejection = store
        .insert_persisted_snapshot(EntityTag::new(51), &pathless)
        .expect_err("schema store should reject empty nested leaf paths");
    let diagnostic = rejection.message();

    assert!(
        diagnostic.contains("schema snapshot empty nested leaf path"),
        "schema store should report the empty nested leaf path"
    );
}
2399
// Gives the second field two nested leaves that share the path ["bytes"],
// encodes the snapshot, writes it through the raw path, and expects the
// latest-snapshot lookup to fail with the duplicate nested leaf path
// diagnostic.
#[test]
fn schema_store_rejects_raw_snapshot_with_duplicate_nested_leaf_path() {
    let mut store = SchemaStore::init(test_memory(243));
    let base =
        persisted_schema_snapshot_for_test(SchemaVersion::initial(), "DuplicateNestedLeaf");

    // Two leaves on the same path, differing in kind and codec.
    let blob_leaf = PersistedNestedLeafSnapshot::new(
        vec!["bytes".to_string()],
        PersistedFieldKind::Blob { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Blob),
    );
    let text_leaf = PersistedNestedLeafSnapshot::new(
        vec!["bytes".to_string()],
        PersistedFieldKind::Text { max_len: None },
        false,
        FieldStorageDecode::ByKind,
        LeafCodec::Scalar(ScalarCodec::Text),
    );

    // Copy of the second field with only its nested leaves replaced.
    let original_fields = base.fields();
    let template = &original_fields[1];
    let replacement = PersistedFieldSnapshot::new(
        template.id(),
        template.name().to_string(),
        template.slot(),
        template.kind().clone(),
        vec![blob_leaf, text_leaf],
        template.nullable(),
        template.default().clone(),
        template.storage_decode(),
        template.leaf_codec(),
    );
    let mut fields = original_fields.to_vec();
    fields[1] = replacement;

    let ambiguous = PersistedSchemaSnapshot::new(
        base.version(),
        base.entity_path().to_string(),
        base.entity_name().to_string(),
        base.first_primary_key_field_id(),
        base.row_layout().clone(),
        fields,
    );

    // Write the encoded bytes through the raw insert path.
    let encoded = encode_persisted_schema_snapshot(&ambiguous)
        .expect("invalid raw schema snapshot should encode for decode-boundary coverage");
    let raw_key = RawSchemaKey::from_entity_version(EntityTag::new(52), ambiguous.version());
    store.insert_raw_snapshot(raw_key, RawSchemaSnapshot::from_bytes(encoded));

    let failure = store
        .latest_persisted_snapshot(EntityTag::new(52))
        .expect_err("raw decode should reject duplicate nested leaf paths");
    let diagnostic = failure.message();

    assert!(
        diagnostic.contains("persisted schema snapshot duplicate nested leaf path"),
        "schema codec should report the raw decoded nested path ambiguity"
    );
}
2458
// Round trip: a typed snapshot converted to its raw form and decoded again
// must equal the original.
#[test]
fn raw_schema_snapshot_encodes_and_decodes_typed_snapshot() {
    let original = persisted_schema_snapshot_for_test(SchemaVersion::initial(), "Encoded");

    let encoded = RawSchemaSnapshot::from_persisted_snapshot(&original)
        .expect("schema snapshot should encode");
    let round_tripped = encoded
        .decode_persisted_snapshot()
        .expect("schema snapshot should decode");

    assert_eq!(round_tripped, original);
}
2471
2472 fn assert_latest_schema(
2476 store: &SchemaStore,
2477 entity: EntityTag,
2478 version: SchemaVersion,
2479 entity_name: &str,
2480 ) {
2481 let latest = store
2482 .latest_persisted_snapshot(entity)
2483 .expect("latest schema snapshot should decode")
2484 .expect("latest schema snapshot should exist");
2485
2486 assert_eq!(latest.version(), version);
2487 assert_eq!(latest.entity_name(), entity_name);
2488 }
2489
2490 fn tombstone_journaled_raw_snapshot(
2491 store: &mut SchemaStore,
2492 entity: EntityTag,
2493 version: SchemaVersion,
2494 ) {
2495 let key = RawSchemaKey::from_entity_version(entity, version);
2496 let SchemaStoreBackend::Journaled { tombstones, .. } = &mut store.backend else {
2497 panic!("schema tombstone test helper requires a journaled store");
2498 };
2499
2500 tombstones.insert(key);
2501 }
2502
2503 fn visit_journaled_schema_range(
2504 store: &SchemaStore,
2505 entity: EntityTag,
2506 direction: Direction,
2507 stop_after: usize,
2508 ) -> Vec<(u32, String)> {
2509 let SchemaStoreBackend::Journaled {
2510 canonical,
2511 live,
2512 tombstones,
2513 } = &store.backend
2514 else {
2515 panic!("schema range test helper requires a journaled store");
2516 };
2517
2518 let mut visited = Vec::new();
2519 let _: Result<(), Infallible> = SchemaStore::visit_journaled_raw_snapshot_range(
2520 canonical,
2521 live,
2522 tombstones,
2523 RawSchemaKey::entity_range_bounds(entity),
2524 direction,
2525 |key, snapshot| {
2526 let decoded = snapshot
2527 .decode_persisted_snapshot()
2528 .expect("visited schema snapshot should decode");
2529 visited.push((key.version(), decoded.entity_name().to_string()));
2530 Ok(if visited.len() >= stop_after {
2531 SchemaStoreVisit::Stop
2532 } else {
2533 SchemaStoreVisit::Continue
2534 })
2535 },
2536 );
2537
2538 visited
2539 }
2540
2541 fn persisted_schema_snapshot_for_test(
2542 version: SchemaVersion,
2543 entity_name: &str,
2544 ) -> PersistedSchemaSnapshot {
2545 persisted_schema_snapshot_with_layout_version_for_test(version, version, entity_name)
2546 }
2547
2548 fn persisted_schema_snapshot_with_index_for_test(
2549 version: SchemaVersion,
2550 entity_name: &str,
2551 index_name: &str,
2552 ) -> PersistedSchemaSnapshot {
2553 let base = persisted_schema_snapshot_for_test(version, entity_name);
2554
2555 PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
2556 base.version(),
2557 base.entity_path().to_string(),
2558 base.entity_name().to_string(),
2559 base.primary_key_field_ids().to_vec(),
2560 base.row_layout().clone(),
2561 base.fields().to_vec(),
2562 vec![PersistedIndexSnapshot::new(
2563 0,
2564 index_name.to_string(),
2565 "RoleSpecificStore".to_string(),
2566 false,
2567 PersistedIndexKeySnapshot::FieldPath(vec![PersistedIndexFieldPathSnapshot::new(
2568 FieldId::new(1),
2569 SchemaFieldSlot::new(0),
2570 vec!["id".to_string()],
2571 PersistedFieldKind::Ulid,
2572 false,
2573 )]),
2574 None,
2575 )],
2576 )
2577 }
2578
2579 fn persisted_schema_snapshot_with_layout_version_for_test(
2583 version: SchemaVersion,
2584 layout_version: SchemaVersion,
2585 entity_name: &str,
2586 ) -> PersistedSchemaSnapshot {
2587 PersistedSchemaSnapshot::new(
2588 version,
2589 format!("entities::{entity_name}"),
2590 entity_name.to_string(),
2591 FieldId::new(1),
2592 SchemaRowLayout::new(
2593 layout_version,
2594 vec![
2595 (FieldId::new(1), SchemaFieldSlot::new(0)),
2596 (FieldId::new(2), SchemaFieldSlot::new(1)),
2597 ],
2598 ),
2599 vec![
2600 PersistedFieldSnapshot::new(
2601 FieldId::new(1),
2602 "id".to_string(),
2603 SchemaFieldSlot::new(0),
2604 PersistedFieldKind::Ulid,
2605 Vec::new(),
2606 false,
2607 SchemaFieldDefault::None,
2608 FieldStorageDecode::ByKind,
2609 LeafCodec::Scalar(ScalarCodec::Ulid),
2610 ),
2611 PersistedFieldSnapshot::new(
2612 FieldId::new(2),
2613 "payload".to_string(),
2614 SchemaFieldSlot::new(1),
2615 PersistedFieldKind::Map {
2616 key: Box::new(PersistedFieldKind::Text { max_len: None }),
2617 value: Box::new(PersistedFieldKind::List(Box::new(
2618 PersistedFieldKind::Nat64,
2619 ))),
2620 },
2621 vec![PersistedNestedLeafSnapshot::new(
2622 vec!["bytes".to_string()],
2623 PersistedFieldKind::Blob { max_len: None },
2624 false,
2625 FieldStorageDecode::ByKind,
2626 LeafCodec::Scalar(ScalarCodec::Blob),
2627 )],
2628 false,
2629 SchemaFieldDefault::None,
2630 FieldStorageDecode::ByKind,
2631 LeafCodec::StructuralFallback,
2632 ),
2633 ],
2634 )
2635 }
2636}