1use crate::{
7 db::{
8 codec::{
9 finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10 write_hash_tag_u8, write_hash_u32, write_hash_u64,
11 },
12 commit::CommitSchemaFingerprint,
13 direction::Direction,
14 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15 schema::{
16 AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17 PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18 accepted_schema_cache_fingerprint,
19 accepted_schema_cache_fingerprint_for_persisted_snapshot,
20 accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22 },
23 },
24 error::InternalError,
25 traits::Storable,
26 types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
40const SCHEMA_KEY_BYTES_USIZE: usize = 12;
41const SCHEMA_KEY_BYTES: u32 = 12;
42pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
43const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
44const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
45const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
46const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
47const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
48const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49
#[cfg(test)]
thread_local! {
    /// Per-thread tally of `latest_raw_snapshots_by_entity` invocations (test builds only).
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}

/// Sets the per-thread invocation tally back to zero.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.set(0);
}

/// Reads the current per-thread invocation tally.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.get()
}
64
65#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
74struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
75
76impl RawSchemaKey {
77 #[must_use]
79 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
80 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
81 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
82 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
83
84 Self(out)
85 }
86
87 #[must_use]
89 fn entity_tag(self) -> EntityTag {
90 let mut bytes = [0u8; size_of::<u64>()];
91 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
92
93 EntityTag::new(u64::from_be_bytes(bytes))
94 }
95
96 #[must_use]
98 fn version(self) -> u32 {
99 let mut bytes = [0u8; size_of::<u32>()];
100 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
101
102 u32::from_be_bytes(bytes)
103 }
104
105 fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
106 (
107 RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
108 RangeBound::Included(Self::from_entity_version(
109 entity,
110 SchemaVersion::new(u32::MAX),
111 )),
112 )
113 }
114}
115
116impl Storable for RawSchemaKey {
117 fn to_bytes(&self) -> Cow<'_, [u8]> {
118 Cow::Borrowed(&self.0)
119 }
120
121 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
122 debug_assert_eq!(
123 bytes.len(),
124 SCHEMA_KEY_BYTES_USIZE,
125 "RawSchemaKey::from_bytes received unexpected byte length",
126 );
127
128 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
129 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
130 }
131
132 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
133 out.copy_from_slice(bytes.as_ref());
134 Self(out)
135 }
136
137 fn into_bytes(self) -> Vec<u8> {
138 self.0.to_vec()
139 }
140
141 const BOUND: StorableBound = StorableBound::Bounded {
142 max_size: SCHEMA_KEY_BYTES,
143 is_fixed_size: true,
144 };
145}
146
147#[derive(Clone, Debug, Eq, PartialEq)]
157struct RawSchemaSnapshot {
158 payload: Vec<u8>,
159 accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
160}
161
162impl RawSchemaSnapshot {
163 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
165 validate_typed_schema_snapshot_for_store(snapshot)?;
166
167 let accepted_schema_fingerprint =
168 accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
169 let payload = encode_persisted_schema_snapshot(snapshot)?;
170
171 Ok(Self {
172 payload,
173 accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
174 })
175 }
176
177 #[must_use]
179 #[cfg(test)]
180 const fn from_bytes(payload: Vec<u8>) -> Self {
181 Self {
182 payload,
183 accepted_schema_fingerprint: None,
184 }
185 }
186
187 #[must_use]
189 const fn as_bytes(&self) -> &[u8] {
190 self.payload.as_slice()
191 }
192
193 #[must_use]
195 fn into_bytes(self) -> Vec<u8> {
196 self.payload
197 }
198
199 fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
202 self.accepted_schema_fingerprint
203 .ok_or_else(InternalError::store_corruption)
204 }
205
206 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
208 decode_persisted_schema_snapshot(self.as_bytes())
209 }
210}
211
212#[derive(Clone, Copy, Debug, Eq, PartialEq)]
213pub(in crate::db) struct AcceptedCatalogIdentity {
214 entity_tag: EntityTag,
215 entity_path: &'static str,
216 store_path: &'static str,
217 accepted_schema_version: SchemaVersion,
218 fingerprint_method_version: u8,
219 accepted_schema_fingerprint: CommitSchemaFingerprint,
220}
221
222impl AcceptedCatalogIdentity {
223 #[must_use]
224 pub(in crate::db) const fn new(
225 entity_tag: EntityTag,
226 entity_path: &'static str,
227 store_path: &'static str,
228 accepted_schema_version: SchemaVersion,
229 accepted_schema_fingerprint: CommitSchemaFingerprint,
230 ) -> Self {
231 Self {
232 entity_tag,
233 entity_path,
234 store_path,
235 accepted_schema_version,
236 fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
237 accepted_schema_fingerprint,
238 }
239 }
240
241 #[must_use]
242 pub(in crate::db) const fn entity_tag(self) -> EntityTag {
243 self.entity_tag
244 }
245
246 #[must_use]
247 pub(in crate::db) const fn entity_path(self) -> &'static str {
248 self.entity_path
249 }
250
251 #[must_use]
252 pub(in crate::db) const fn store_path(self) -> &'static str {
253 self.store_path
254 }
255
256 #[must_use]
257 pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
258 self.accepted_schema_version
259 }
260
261 #[must_use]
262 pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
263 self.fingerprint_method_version
264 }
265
266 #[must_use]
267 pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
268 self.accepted_schema_fingerprint
269 }
270}
271
272#[derive(Clone, Debug, Eq, PartialEq)]
273pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
274 identity: AcceptedCatalogIdentity,
275 raw_snapshot: Vec<u8>,
276}
277
278impl AcceptedCatalogSnapshotSelection {
279 #[must_use]
280 const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
281 Self {
282 identity,
283 raw_snapshot,
284 }
285 }
286
287 #[must_use]
288 pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
289 self.identity
290 }
291
292 pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
293 let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
294 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
295 let identity = self.identity();
296
297 if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
298 return Err(InternalError::store_invariant());
299 }
300 if accepted.entity_path() != identity.entity_path() {
301 return Err(InternalError::store_invariant());
302 }
303
304 let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
305 if decoded_fingerprint != identity.accepted_schema_fingerprint() {
306 return Err(InternalError::store_invariant());
307 }
308
309 Ok(accepted)
310 }
311}
312
313impl Storable for RawSchemaSnapshot {
314 fn to_bytes(&self) -> Cow<'_, [u8]> {
315 let Some(fingerprint) = self.accepted_schema_fingerprint else {
316 return Cow::Borrowed(self.as_bytes());
317 };
318
319 let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
320 bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
321 bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
322 bytes.extend_from_slice(&fingerprint);
323 bytes.extend_from_slice(self.as_bytes());
324
325 Cow::Owned(bytes)
326 }
327
328 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
329 let bytes = bytes.into_owned();
330 if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
331 && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
332 && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
333 {
334 let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
335 let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
336 let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
337 fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
338
339 return Self {
340 payload: bytes[fingerprint_end..].to_vec(),
341 accepted_schema_fingerprint: Some(fingerprint),
342 };
343 }
344
345 Self {
346 payload: bytes,
347 accepted_schema_fingerprint: None,
348 }
349 }
350
351 fn into_bytes(self) -> Vec<u8> {
352 let Some(fingerprint) = self.accepted_schema_fingerprint else {
353 return self.payload;
354 };
355
356 let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
357 bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
358 bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
359 bytes.extend_from_slice(&fingerprint);
360 bytes.extend_from_slice(&self.payload);
361
362 bytes
363 }
364
365 const BOUND: StorableBound = StorableBound::Unbounded;
366}
367
368fn validate_typed_schema_snapshot_for_store(
372 snapshot: &PersistedSchemaSnapshot,
373) -> Result<(), InternalError> {
374 if schema_snapshot_integrity_detail(
375 "schema snapshot",
376 snapshot.version(),
377 snapshot.primary_key_field_ids(),
378 snapshot.row_layout(),
379 snapshot.fields(),
380 )
381 .is_some()
382 {
383 return Err(InternalError::store_invariant());
384 }
385
386 Ok(())
387}
388
389#[derive(Clone, Copy, Debug, Eq, PartialEq)]
398pub(in crate::db) struct SchemaStoreFootprint {
399 snapshots: u64,
400 encoded_bytes: u64,
401 latest_snapshot_bytes: u64,
402}
403
404#[derive(Clone, Copy, Debug, Eq, PartialEq)]
412pub(in crate::db) struct SchemaStoreCatalogMetadata {
413 schema_version: SchemaVersion,
414 schema_fingerprint_method_version: u8,
415 schema_fingerprint: CommitSchemaFingerprint,
416 entity_count: u64,
417}
418
419impl SchemaStoreCatalogMetadata {
420 #[must_use]
422 const fn new(
423 schema_version: SchemaVersion,
424 schema_fingerprint_method_version: u8,
425 schema_fingerprint: CommitSchemaFingerprint,
426 entity_count: u64,
427 ) -> Self {
428 Self {
429 schema_version,
430 schema_fingerprint_method_version,
431 schema_fingerprint,
432 entity_count,
433 }
434 }
435
436 #[must_use]
438 pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
439 self.schema_version
440 }
441
442 #[must_use]
444 pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
445 self.schema_fingerprint_method_version
446 }
447
448 #[must_use]
451 pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
452 self.schema_fingerprint
453 }
454
455 #[must_use]
457 pub(in crate::db) const fn entity_count(self) -> u64 {
458 self.entity_count
459 }
460}
461
462#[derive(Clone, Copy, Debug, Eq, PartialEq)]
471pub(in crate::db) struct SchemaStoreAllocationMetadata {
472 data: SchemaStoreCatalogMetadata,
473 index: SchemaStoreCatalogMetadata,
474 schema: SchemaStoreCatalogMetadata,
475}
476
477impl SchemaStoreAllocationMetadata {
478 #[must_use]
481 const fn new(
482 data: SchemaStoreCatalogMetadata,
483 index: SchemaStoreCatalogMetadata,
484 schema: SchemaStoreCatalogMetadata,
485 ) -> Self {
486 Self {
487 data,
488 index,
489 schema,
490 }
491 }
492
493 #[must_use]
495 pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
496 self.data
497 }
498
499 #[must_use]
501 pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
502 self.index
503 }
504
505 #[must_use]
508 pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
509 self.schema
510 }
511}
512
513impl SchemaStoreFootprint {
514 #[must_use]
516 const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
517 Self {
518 snapshots,
519 encoded_bytes,
520 latest_snapshot_bytes,
521 }
522 }
523
524 #[must_use]
526 pub(in crate::db) const fn snapshots(self) -> u64 {
527 self.snapshots
528 }
529
530 #[must_use]
532 pub(in crate::db) const fn encoded_bytes(self) -> u64 {
533 self.encoded_bytes
534 }
535
536 #[must_use]
538 pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
539 self.latest_snapshot_bytes
540 }
541}
542
543pub struct SchemaStore {
552 backend: SchemaStoreBackend,
553}
554
555enum SchemaStoreBackend {
556 Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
557 Journaled {
558 canonical:
559 StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
560 live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
561 tombstones: BTreeSet<RawSchemaKey>,
562 },
563}
564
/// Decision a visitor returns after each visited snapshot.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    /// Keep walking.
    Continue,
    /// End the walk after the current entry.
    #[allow(
        dead_code,
        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
    )]
    Stop,
}

impl SchemaStoreVisit {
    /// `true` only for [`SchemaStoreVisit::Stop`].
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
581
582impl SchemaStore {
583 #[must_use]
585 pub const fn init_heap() -> Self {
586 Self {
587 backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
588 }
589 }
590
591 #[must_use]
596 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
597 Self {
598 backend: SchemaStoreBackend::Journaled {
599 canonical: StableBTreeMap::init(memory),
600 live: StdBTreeMap::new(),
601 tombstones: BTreeSet::new(),
602 },
603 }
604 }
605
606 pub(in crate::db) fn insert_persisted_snapshot(
608 &mut self,
609 entity: EntityTag,
610 snapshot: &PersistedSchemaSnapshot,
611 ) -> Result<(), InternalError> {
612 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
613 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
614 let _ = self.insert_raw_snapshot(key, raw_snapshot);
615
616 Ok(())
617 }
618
619 pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
623 &mut self,
624 expected: AcceptedCatalogIdentity,
625 snapshot: &PersistedSchemaSnapshot,
626 ) -> Result<(), InternalError> {
627 let live = self.latest_catalog_identity(
628 expected.entity_tag(),
629 expected.entity_path(),
630 expected.store_path(),
631 )?;
632 if live
633 .as_ref()
634 .map(AcceptedCatalogSnapshotSelection::identity)
635 != Some(expected)
636 {
637 return Err(InternalError::schema_ddl_publication_race_lost(
638 expected.entity_path(),
639 ));
640 }
641
642 self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
643 }
644
645 pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
648 let SchemaStoreBackend::Journaled {
649 live, tombstones, ..
650 } = &mut self.backend
651 else {
652 return Err(InternalError::store_invariant());
653 };
654
655 live.clear();
656 tombstones.clear();
657
658 Ok(())
659 }
660
661 pub(in crate::db) fn fold_persisted_snapshot(
663 &mut self,
664 entity: EntityTag,
665 snapshot: &PersistedSchemaSnapshot,
666 ) -> Result<(), InternalError> {
667 let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
668 return Err(InternalError::store_invariant());
669 };
670
671 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
672 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
673 canonical.insert(key, raw_snapshot);
674
675 Ok(())
676 }
677
/// Looks up and decodes the snapshot stored for `(entity, version)` (test builds only).
///
/// Returns `Ok(None)` when no such entry is visible.
#[cfg(test)]
pub(in crate::db) fn get_persisted_snapshot(
    &self,
    entity: EntityTag,
    version: SchemaVersion,
) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
    let slot = RawSchemaKey::from_entity_version(entity, version);

    match self.get_raw_snapshot(&slot) {
        Some(raw) => raw.decode_persisted_snapshot().map(Some),
        None => Ok(None),
    }
}
690
691 pub(in crate::db) fn latest_persisted_snapshot(
693 &self,
694 entity: EntityTag,
695 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
696 self.latest_raw_snapshot(entity)
697 .map(|snapshot| snapshot.decode_persisted_snapshot())
698 .transpose()
699 }
700
701 pub(in crate::db) fn latest_catalog_identity(
704 &self,
705 entity: EntityTag,
706 entity_path: &'static str,
707 store_path: &'static str,
708 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
709 let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
710 return Ok(None);
711 };
712 let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
713 let identity =
714 AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
715
716 Ok(Some(AcceptedCatalogSnapshotSelection::new(
717 identity,
718 raw_snapshot.into_bytes(),
719 )))
720 }
721
722 #[must_use]
724 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
725 let mut snapshots = 0u64;
726 let mut encoded_bytes = 0u64;
727 let mut latest = None::<(SchemaVersion, u64)>;
728
729 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
730 if key.entity_tag() != entity {
731 return Ok(SchemaStoreVisit::Continue);
732 }
733
734 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
735 snapshots = snapshots.saturating_add(1);
736 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
737
738 let version = SchemaVersion::new(key.version());
739 if latest
740 .as_ref()
741 .is_none_or(|(latest_version, _)| version > *latest_version)
742 {
743 latest = Some((version, snapshot_bytes));
744 }
745 Ok(SchemaStoreVisit::Continue)
746 });
747
748 SchemaStoreFootprint::new(
749 snapshots,
750 encoded_bytes,
751 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
752 )
753 }
754
/// Returns only the whole-catalog part of `allocation_metadata` (test builds only).
#[cfg(test)]
pub(in crate::db) fn catalog_metadata(
    &self,
) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
    let allocation = self.allocation_metadata()?;

    Ok(allocation.map(|metadata| metadata.schema()))
}
768
769 pub(in crate::db) fn allocation_metadata(
776 &self,
777 ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
778 let latest_by_entity = self.latest_raw_snapshots_by_entity();
779 if latest_by_entity.is_empty() {
780 return Ok(None);
781 }
782
783 Ok(Some(SchemaStoreAllocationMetadata::new(
784 derive_data_allocation_metadata(&latest_by_entity)?,
785 derive_index_allocation_metadata(&latest_by_entity)?,
786 derive_schema_catalog_metadata(&latest_by_entity)?,
787 )))
788 }
789
790 fn insert_raw_snapshot(
792 &mut self,
793 key: RawSchemaKey,
794 snapshot: RawSchemaSnapshot,
795 ) -> Option<RawSchemaSnapshot> {
796 let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
797 self.get_raw_snapshot_for_backend(&key)
798 } else {
799 None
800 };
801 match &mut self.backend {
802 SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
803 SchemaStoreBackend::Journaled {
804 live, tombstones, ..
805 } => {
806 tombstones.remove(&key);
807 live.insert(key, snapshot);
808 previous_journaled
809 }
810 }
811 }
812
/// Returns a copy of the entry visible under `key`, if any (test builds only).
#[must_use]
#[cfg(test)]
fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
    if let SchemaStoreBackend::Heap(map) = &self.backend {
        return map.get(key).cloned();
    }

    self.get_raw_snapshot_for_backend(key)
}
822
/// Reports whether an entry is visible under `key` (test builds only).
#[must_use]
#[cfg(test)]
fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
    if let SchemaStoreBackend::Heap(map) = &self.backend {
        return map.contains_key(key);
    }

    self.get_raw_snapshot_for_backend(key).is_some()
}
834
/// Counts the visible entries (test builds only).
///
/// A journaled store is counted by walking the merged view.
#[must_use]
#[cfg(test)]
pub(in crate::db) fn len(&self) -> u64 {
    if let SchemaStoreBackend::Heap(map) = &self.backend {
        return u64::try_from(map.len()).unwrap_or(u64::MAX);
    }

    let mut visible = 0_u64;
    let _: Result<(), Infallible> = self.visit_raw_snapshots(|_, _| {
        visible = visible.saturating_add(1);
        Ok(SchemaStoreVisit::Continue)
    });

    visible
}
851
/// Reports whether no entry is visible (test builds only).
///
/// A journaled store stops walking at the first visible entry.
#[must_use]
#[cfg(test)]
pub(in crate::db) fn is_empty(&self) -> bool {
    if let SchemaStoreBackend::Heap(map) = &self.backend {
        return map.is_empty();
    }

    let mut found_any = false;
    let _: Result<(), Infallible> = self.visit_raw_snapshots(|_, _| {
        found_any = true;
        Ok(SchemaStoreVisit::Stop)
    });

    !found_any
}
868
/// Makes the store appear empty (test builds only).
///
/// A heap store is cleared outright. A journaled store drops its live overlay
/// and tombstones every canonical key; the canonical map is not modified.
#[cfg(test)]
pub(in crate::db) fn clear(&mut self) {
    match &mut self.backend {
        SchemaStoreBackend::Heap(map) => map.clear(),
        SchemaStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } => {
            live.clear();
            tombstones.clear();
            tombstones.extend(canonical.iter().map(|entry| *entry.key()));
        }
    }
}
887
888 fn latest_raw_snapshots_by_entity(
889 &self,
890 ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
891 #[cfg(test)]
892 LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
893
894 let mut latest_by_entity =
895 StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
896
897 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
898 let version = SchemaVersion::new(key.version());
899 match latest_by_entity.get_mut(&key.entity_tag()) {
900 Some((latest_version, latest_snapshot)) if version > *latest_version => {
901 *latest_version = version;
902 *latest_snapshot = snapshot.clone();
903 }
904 None => {
905 latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
906 }
907 Some(_) => {}
908 }
909 Ok(SchemaStoreVisit::Continue)
910 });
911
912 latest_by_entity
913 }
914
915 fn visit_raw_snapshots<E>(
918 &self,
919 visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
920 ) -> Result<(), E> {
921 match &self.backend {
922 SchemaStoreBackend::Heap(map) => {
923 let mut visitor = visitor;
924 for (key, snapshot) in map {
925 if visitor(key, snapshot)?.should_stop() {
926 break;
927 }
928 }
929 }
930 SchemaStoreBackend::Journaled {
931 canonical,
932 live,
933 tombstones,
934 } => Self::visit_journaled_raw_snapshot_range(
935 canonical,
936 live,
937 tombstones,
938 (RangeBound::Unbounded, RangeBound::Unbounded),
939 Direction::Asc,
940 visitor,
941 )?,
942 }
943
944 Ok(())
945 }
946
/// Number of entries in the canonical map; zero for a heap store (test builds only).
#[cfg(test)]
#[must_use]
pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
    if let SchemaStoreBackend::Journaled { canonical, .. } = &self.backend {
        canonical.len()
    } else {
        0
    }
}
955
956 fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
957 let SchemaStoreBackend::Journaled {
958 canonical,
959 live,
960 tombstones,
961 } = &self.backend
962 else {
963 return None;
964 };
965
966 if tombstones.contains(key) {
967 return None;
968 }
969 live.get(key).cloned().or_else(|| canonical.get(key))
970 }
971
972 fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
973 self.latest_raw_snapshot_entry(entity)
974 .map(|(_, snapshot)| snapshot)
975 }
976
977 fn latest_raw_snapshot_entry(
978 &self,
979 entity: EntityTag,
980 ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
981 let bounds = RawSchemaKey::entity_range_bounds(entity);
982 match &self.backend {
983 SchemaStoreBackend::Heap(map) => map
984 .range((bounds.0, bounds.1))
985 .next_back()
986 .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
987 SchemaStoreBackend::Journaled {
988 canonical,
989 live,
990 tombstones,
991 } => {
992 let mut latest = None;
993 let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
994 canonical,
995 live,
996 tombstones,
997 bounds,
998 Direction::Desc,
999 |key, snapshot| {
1000 latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1001 Ok(SchemaStoreVisit::Stop)
1002 },
1003 );
1004 latest
1005 }
1006 }
1007 }
1008
1009 fn visit_journaled_raw_snapshot_range<E>(
1010 canonical: &StableBTreeMap<
1011 RawSchemaKey,
1012 RawSchemaSnapshot,
1013 VirtualMemory<DefaultMemoryImpl>,
1014 >,
1015 live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1016 tombstones: &BTreeSet<RawSchemaKey>,
1017 bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1018 direction: Direction,
1019 mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1020 ) -> Result<(), E> {
1021 match direction {
1022 Direction::Asc => visit_ordered_overlay(
1023 canonical.range((bounds.0, bounds.1)),
1024 live.range((bounds.0, bounds.1)),
1025 Direction::Asc,
1026 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1027 |canonical_entry| !tombstones.contains(canonical_entry.key()),
1028 |live_entry| !tombstones.contains(live_entry.0),
1029 |entry| {
1030 let visit = match entry {
1031 OrderedOverlayEntry::Canonical(canonical_entry) => {
1032 visitor(canonical_entry.key(), &canonical_entry.value())?
1033 }
1034 OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1035 };
1036 Ok(if visit.should_stop() {
1037 OrderedOverlayVisit::Stop
1038 } else {
1039 OrderedOverlayVisit::Continue
1040 })
1041 },
1042 ),
1043 Direction::Desc => visit_ordered_overlay(
1044 canonical.range((bounds.0, bounds.1)).rev(),
1045 live.range((bounds.0, bounds.1)).rev(),
1046 Direction::Desc,
1047 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1048 |canonical_entry| !tombstones.contains(canonical_entry.key()),
1049 |live_entry| !tombstones.contains(live_entry.0),
1050 |entry| {
1051 let visit = match entry {
1052 OrderedOverlayEntry::Canonical(canonical_entry) => {
1053 visitor(canonical_entry.key(), &canonical_entry.value())?
1054 }
1055 OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1056 };
1057 Ok(if visit.should_stop() {
1058 OrderedOverlayVisit::Stop
1059 } else {
1060 OrderedOverlayVisit::Continue
1061 })
1062 },
1063 ),
1064 }
1065 }
1066}
1067
1068fn derive_data_allocation_metadata(
1069 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1070) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1071 let mut max_version = SchemaVersion::initial();
1072 let mut hasher = new_hash_sha256();
1073 write_hash_tag_u8(
1074 &mut hasher,
1075 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1076 );
1077
1078 for (entity, (_, snapshot)) in latest_by_entity {
1079 let persisted = snapshot.decode_persisted_snapshot()?;
1080 if persisted.version() > max_version {
1081 max_version = persisted.version();
1082 }
1083
1084 let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1085 persisted.version(),
1086 persisted.entity_path().to_string(),
1087 persisted.entity_name().to_string(),
1088 persisted.primary_key_field_ids().to_vec(),
1089 persisted.row_layout().clone(),
1090 persisted.fields().to_vec(),
1091 Vec::new(),
1092 );
1093 let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1094
1095 write_hash_u64(&mut hasher, entity.value());
1096 write_hash_u32(&mut hasher, persisted.version().get());
1097 write_hash_len_u32(&mut hasher, encoded.len());
1098 hasher.update(encoded);
1099 }
1100
1101 Ok(finalize_schema_metadata(
1102 max_version,
1103 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1104 hasher,
1105 latest_by_entity.len(),
1106 ))
1107}
1108
1109fn derive_index_allocation_metadata(
1110 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1111) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1112 let mut max_version = SchemaVersion::initial();
1113 let mut hasher = new_hash_sha256();
1114 write_hash_tag_u8(
1115 &mut hasher,
1116 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1117 );
1118
1119 for (entity, (_, snapshot)) in latest_by_entity {
1120 let persisted = snapshot.decode_persisted_snapshot()?;
1121 if persisted.version() > max_version {
1122 max_version = persisted.version();
1123 }
1124
1125 write_hash_u64(&mut hasher, entity.value());
1126 write_hash_u32(&mut hasher, persisted.version().get());
1127 write_hash_len_u32(&mut hasher, persisted.indexes().len());
1128 for index in persisted.indexes() {
1129 write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1130 write_hash_str_u32(&mut hasher, index.name());
1131 write_hash_str_u32(&mut hasher, index.store());
1132 write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1133 write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1134 match index.predicate_sql() {
1135 Some(predicate_sql) => {
1136 write_hash_tag_u8(&mut hasher, 1);
1137 write_hash_str_u32(&mut hasher, predicate_sql);
1138 }
1139 None => write_hash_tag_u8(&mut hasher, 0),
1140 }
1141 hash_persisted_index_key(&mut hasher, index.key());
1142 }
1143 }
1144
1145 Ok(finalize_schema_metadata(
1146 max_version,
1147 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1148 hasher,
1149 latest_by_entity.len(),
1150 ))
1151}
1152
1153fn derive_schema_catalog_metadata(
1154 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1155) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1156 let mut max_version = SchemaVersion::initial();
1157 let mut hasher = new_hash_sha256();
1158 write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1159
1160 for (entity, (version, snapshot)) in latest_by_entity {
1161 let persisted = snapshot.decode_persisted_snapshot()?;
1162 if persisted.version() > max_version {
1163 max_version = persisted.version();
1164 }
1165
1166 write_hash_u64(&mut hasher, entity.value());
1167 write_hash_u32(&mut hasher, version.get());
1168 write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1169 hasher.update(snapshot.as_bytes());
1170 }
1171
1172 Ok(finalize_schema_metadata(
1173 max_version,
1174 SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1175 hasher,
1176 latest_by_entity.len(),
1177 ))
1178}
1179
1180fn finalize_schema_metadata(
1181 schema_version: SchemaVersion,
1182 schema_fingerprint_method_version: u8,
1183 hasher: sha2::Sha256,
1184 entity_count: usize,
1185) -> SchemaStoreCatalogMetadata {
1186 let digest = finalize_hash_sha256(hasher);
1187 let mut schema_fingerprint = [0u8; 16];
1188 schema_fingerprint.copy_from_slice(&digest[..16]);
1189
1190 SchemaStoreCatalogMetadata::new(
1191 schema_version,
1192 schema_fingerprint_method_version,
1193 schema_fingerprint,
1194 u64::try_from(entity_count).unwrap_or(u64::MAX),
1195 )
1196}
1197
1198fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1199 match key {
1200 PersistedIndexKeySnapshot::FieldPath(paths) => {
1201 write_hash_tag_u8(hasher, 1);
1202 write_hash_len_u32(hasher, paths.len());
1203 for path in paths {
1204 hash_persisted_index_field_path(hasher, path);
1205 }
1206 }
1207 PersistedIndexKeySnapshot::Items(items) => {
1208 write_hash_tag_u8(hasher, 2);
1209 write_hash_len_u32(hasher, items.len());
1210 for item in items {
1211 match item {
1212 PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1213 write_hash_tag_u8(hasher, 1);
1214 hash_persisted_index_field_path(hasher, path);
1215 }
1216 PersistedIndexKeyItemSnapshot::Expression(expression) => {
1217 write_hash_tag_u8(hasher, 2);
1218 write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1219 hash_persisted_index_field_path(hasher, expression.source());
1220 hash_persisted_field_kind(hasher, expression.input_kind());
1221 hash_persisted_field_kind(hasher, expression.output_kind());
1222 write_hash_str_u32(hasher, expression.canonical_text());
1223 }
1224 }
1225 }
1226 }
1227 }
1228}
1229
1230fn hash_persisted_index_field_path(
1231 hasher: &mut sha2::Sha256,
1232 path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1233) {
1234 write_hash_u32(hasher, path.field_id().get());
1235 write_hash_u32(hasher, u32::from(path.slot().get()));
1236 write_hash_len_u32(hasher, path.path().len());
1237 for segment in path.path() {
1238 write_hash_str_u32(hasher, segment);
1239 }
1240 hash_persisted_field_kind(hasher, path.kind());
1241 write_hash_tag_u8(hasher, u8::from(path.nullable()));
1242}
1243
1244fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1245 match kind {
1246 PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1247 PersistedFieldKind::Blob { max_len } => {
1248 write_hash_tag_u8(hasher, 2);
1249 hash_optional_u32(hasher, *max_len);
1250 }
1251 PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1252 PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1253 PersistedFieldKind::Decimal { scale } => {
1254 write_hash_tag_u8(hasher, 5);
1255 write_hash_u32(hasher, *scale);
1256 }
1257 PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1258 PersistedFieldKind::Enum { path, variants } => {
1259 write_hash_tag_u8(hasher, 7);
1260 write_hash_str_u32(hasher, path);
1261 write_hash_len_u32(hasher, variants.len());
1262 for variant in variants {
1263 write_hash_str_u32(hasher, variant.ident());
1264 match variant.payload_kind() {
1265 Some(payload_kind) => {
1266 write_hash_tag_u8(hasher, 1);
1267 hash_persisted_field_kind(hasher, payload_kind);
1268 }
1269 None => write_hash_tag_u8(hasher, 0),
1270 }
1271 write_hash_str_u32(
1272 hasher,
1273 field_storage_decode_name(variant.payload_storage_decode()),
1274 );
1275 }
1276 }
1277 PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1278 PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1279 PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1280 PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1281 PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1282 PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1283 PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1284 PersistedFieldKind::IntBig { max_bytes } => {
1285 write_hash_tag_u8(hasher, 15);
1286 write_hash_u32(hasher, *max_bytes);
1287 }
1288 PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1289 PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1290 PersistedFieldKind::Text { max_len } => {
1291 write_hash_tag_u8(hasher, 18);
1292 hash_optional_u32(hasher, *max_len);
1293 }
1294 PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1295 PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1296 PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1297 PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1298 PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1299 PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1300 PersistedFieldKind::NatBig { max_bytes } => {
1301 write_hash_tag_u8(hasher, 25);
1302 write_hash_u32(hasher, *max_bytes);
1303 }
1304 PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1305 PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1306 PersistedFieldKind::Relation {
1307 target_path,
1308 target_entity_name,
1309 target_entity_tag,
1310 target_store_path,
1311 key_kind,
1312 strength,
1313 } => {
1314 write_hash_tag_u8(hasher, 28);
1315 write_hash_str_u32(hasher, target_path);
1316 write_hash_str_u32(hasher, target_entity_name);
1317 write_hash_u64(hasher, target_entity_tag.value());
1318 write_hash_str_u32(hasher, target_store_path);
1319 hash_persisted_field_kind(hasher, key_kind);
1320 write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1321 }
1322 PersistedFieldKind::List(inner) => {
1323 write_hash_tag_u8(hasher, 29);
1324 hash_persisted_field_kind(hasher, inner);
1325 }
1326 PersistedFieldKind::Set(inner) => {
1327 write_hash_tag_u8(hasher, 30);
1328 hash_persisted_field_kind(hasher, inner);
1329 }
1330 PersistedFieldKind::Map { key, value } => {
1331 write_hash_tag_u8(hasher, 31);
1332 hash_persisted_field_kind(hasher, key);
1333 hash_persisted_field_kind(hasher, value);
1334 }
1335 PersistedFieldKind::Structured { queryable } => {
1336 write_hash_tag_u8(hasher, 32);
1337 write_hash_tag_u8(hasher, u8::from(*queryable));
1338 }
1339 }
1340}
1341
1342fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1343 match value {
1344 Some(value) => {
1345 write_hash_tag_u8(hasher, 1);
1346 write_hash_u32(hasher, value);
1347 }
1348 None => write_hash_tag_u8(hasher, 0),
1349 }
1350}
1351
1352const fn persisted_index_origin_name(
1353 origin: crate::db::schema::PersistedIndexOrigin,
1354) -> &'static str {
1355 match origin {
1356 crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1357 crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1358 }
1359}
1360
1361const fn persisted_expression_op_name(
1362 op: crate::db::schema::PersistedIndexExpressionOp,
1363) -> &'static str {
1364 match op {
1365 crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1366 crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1367 crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1368 crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1369 crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1370 crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1371 crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1372 crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1373 }
1374}
1375
1376const fn persisted_relation_strength_name(
1377 strength: crate::db::schema::PersistedRelationStrength,
1378) -> &'static str {
1379 match strength {
1380 crate::db::schema::PersistedRelationStrength::Strong => "strong",
1381 crate::db::schema::PersistedRelationStrength::Weak => "weak",
1382 }
1383}
1384
1385const fn field_storage_decode_name(
1386 decode: crate::model::field::FieldStorageDecode,
1387) -> &'static str {
1388 match decode {
1389 crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1390 crate::model::field::FieldStorageDecode::Value => "value",
1391 }
1392}
1393
1394#[cfg(test)]
1399mod tests;