1use crate::{
7 db::{
8 codec::{
9 finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10 write_hash_tag_u8, write_hash_u32, write_hash_u64,
11 },
12 commit::CommitSchemaFingerprint,
13 direction::Direction,
14 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15 schema::{
16 AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17 PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18 accepted_schema_cache_fingerprint,
19 accepted_schema_cache_fingerprint_for_persisted_snapshot,
20 accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21 encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22 },
23 },
24 error::InternalError,
25 traits::Storable,
26 types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
/// Width of a raw schema key: an 8-byte entity tag followed by a 4-byte schema version.
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
/// Same key width as `SCHEMA_KEY_BYTES_USIZE`, typed as `u32` for the `Storable` bound.
const SCHEMA_KEY_BYTES: u32 = 12;
/// 512 KiB.
// NOTE(review): not referenced in the visible part of this file; presumably the
// upper bound enforced on one encoded snapshot elsewhere — confirm.
pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
/// Tag hashed first, and reported as the method version, by `derive_schema_catalog_metadata`.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
/// Tag hashed first, and reported as the method version, by `derive_data_allocation_metadata`.
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
/// Tag hashed first, and reported as the method version, by `derive_index_allocation_metadata`.
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
/// Magic prefix written before a stored snapshot value that carries a fingerprint header.
const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
/// Version byte written directly after the magic prefix.
const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
/// Header size: 8 magic bytes + 1 version byte + the fingerprint bytes.
// NOTE(review): 25 implies a 16-byte `CommitSchemaFingerprint` — confirm against its definition.
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49
// Test-only, per-thread counter that `SchemaStore::latest_raw_snapshots_by_entity`
// increments on every call, so tests can assert how often the full scan runs.
#[cfg(test)]
thread_local! {
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}
54
/// Resets this thread's `latest_raw_snapshots_by_entity` call counter to zero.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.set(0);
}
59
/// Returns how many times `latest_raw_snapshots_by_entity` has run on this thread
/// since the counter was last reset.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.get()
}
64
/// Fixed-width store key: the big-endian entity tag (`u64`) followed by the
/// big-endian schema version (`u32`).
///
/// Because both parts are big-endian, the derived byte-wise ordering sorts keys
/// by entity first and by version second.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
75
76impl RawSchemaKey {
77 #[must_use]
79 fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
80 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
81 out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
82 out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
83
84 Self(out)
85 }
86
87 #[must_use]
89 fn entity_tag(self) -> EntityTag {
90 let mut bytes = [0u8; size_of::<u64>()];
91 bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
92
93 EntityTag::new(u64::from_be_bytes(bytes))
94 }
95
96 #[must_use]
98 fn version(self) -> u32 {
99 let mut bytes = [0u8; size_of::<u32>()];
100 bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
101
102 u32::from_be_bytes(bytes)
103 }
104
105 fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
106 (
107 RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
108 RangeBound::Included(Self::from_entity_version(
109 entity,
110 SchemaVersion::new(u32::MAX),
111 )),
112 )
113 }
114}
115
116impl Storable for RawSchemaKey {
117 fn to_bytes(&self) -> Cow<'_, [u8]> {
118 Cow::Borrowed(&self.0)
119 }
120
121 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
122 debug_assert_eq!(
123 bytes.len(),
124 SCHEMA_KEY_BYTES_USIZE,
125 "RawSchemaKey::from_bytes received unexpected byte length",
126 );
127
128 if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
129 return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
130 }
131
132 let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
133 out.copy_from_slice(bytes.as_ref());
134 Self(out)
135 }
136
137 fn into_bytes(self) -> Vec<u8> {
138 self.0.to_vec()
139 }
140
141 const BOUND: StorableBound = StorableBound::Bounded {
142 max_size: SCHEMA_KEY_BYTES,
143 is_fixed_size: true,
144 };
145}
146
/// Stored schema snapshot value: the encoded snapshot payload plus, when known,
/// the accepted-schema fingerprint that is persisted in a header before it.
#[derive(Clone, Debug, Eq, PartialEq)]
struct RawSchemaSnapshot {
    /// Encoded persisted schema snapshot, without the value header.
    payload: Vec<u8>,
    /// Fingerprint carried in the value header; `None` for values that were
    /// read without a recognised header (or built from raw bytes in tests).
    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
}
161
162impl RawSchemaSnapshot {
163 fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
165 validate_typed_schema_snapshot_for_store(snapshot)?;
166
167 let accepted_schema_fingerprint =
168 accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
169 let payload = encode_persisted_schema_snapshot(snapshot)?;
170
171 Ok(Self {
172 payload,
173 accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
174 })
175 }
176
177 #[must_use]
179 #[cfg(test)]
180 const fn from_bytes(payload: Vec<u8>) -> Self {
181 Self {
182 payload,
183 accepted_schema_fingerprint: None,
184 }
185 }
186
187 #[must_use]
189 const fn as_bytes(&self) -> &[u8] {
190 self.payload.as_slice()
191 }
192
193 #[must_use]
195 fn into_bytes(self) -> Vec<u8> {
196 self.payload
197 }
198
199 fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
202 self.accepted_schema_fingerprint
203 .ok_or_else(InternalError::store_corruption)
204 }
205
206 fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
208 decode_persisted_schema_snapshot(self.as_bytes())
209 }
210}
211
/// Identity of the latest accepted schema snapshot of one entity, as observed
/// in the schema store.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogIdentity {
    /// Tag of the entity the snapshot belongs to.
    entity_tag: EntityTag,
    /// Entity path supplied by the caller that built the identity.
    entity_path: &'static str,
    /// Store path supplied by the caller that built the identity.
    store_path: &'static str,
    /// Schema version of the accepted snapshot.
    accepted_schema_version: SchemaVersion,
    /// Fingerprint method version in effect when the identity was constructed.
    fingerprint_method_version: u8,
    /// Fingerprint of the accepted snapshot.
    accepted_schema_fingerprint: CommitSchemaFingerprint,
}
221
impl AcceptedCatalogIdentity {
    /// Builds an identity from its parts. The fingerprint method version is not
    /// a parameter: it is taken from
    /// `accepted_schema_cache_fingerprint_method_version()`.
    #[must_use]
    pub(in crate::db) const fn new(
        entity_tag: EntityTag,
        entity_path: &'static str,
        store_path: &'static str,
        accepted_schema_version: SchemaVersion,
        accepted_schema_fingerprint: CommitSchemaFingerprint,
    ) -> Self {
        Self {
            entity_tag,
            entity_path,
            store_path,
            accepted_schema_version,
            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
            accepted_schema_fingerprint,
        }
    }

    /// Returns the entity tag.
    #[must_use]
    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
        self.entity_tag
    }

    /// Returns the entity path.
    #[must_use]
    pub(in crate::db) const fn entity_path(self) -> &'static str {
        self.entity_path
    }

    /// Returns the store path.
    #[must_use]
    pub(in crate::db) const fn store_path(self) -> &'static str {
        self.store_path
    }

    /// Returns the accepted schema version.
    #[must_use]
    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
        self.accepted_schema_version
    }

    /// Returns the fingerprint method version captured at construction.
    #[must_use]
    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
        self.fingerprint_method_version
    }

    /// Returns the accepted schema fingerprint.
    #[must_use]
    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
        self.accepted_schema_fingerprint
    }
}
271
/// The latest stored snapshot of an entity: its identity together with the
/// still-encoded snapshot payload, so decoding can be deferred.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
    /// Identity derived from the store key and the stored fingerprint.
    identity: AcceptedCatalogIdentity,
    /// Encoded snapshot payload (value header already stripped).
    raw_snapshot: Vec<u8>,
}
277
278impl AcceptedCatalogSnapshotSelection {
279 #[must_use]
280 const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
281 Self {
282 identity,
283 raw_snapshot,
284 }
285 }
286
287 #[must_use]
288 pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
289 self.identity
290 }
291
292 pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
293 let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
294 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
295 let identity = self.identity();
296
297 if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
298 return Err(InternalError::store_invariant());
299 }
300 if accepted.entity_path() != identity.entity_path() {
301 return Err(InternalError::store_invariant());
302 }
303
304 let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
305 if decoded_fingerprint != identity.accepted_schema_fingerprint() {
306 return Err(InternalError::store_invariant());
307 }
308
309 Ok(accepted)
310 }
311}
312
313impl Storable for RawSchemaSnapshot {
314 fn to_bytes(&self) -> Cow<'_, [u8]> {
315 let Some(fingerprint) = self.accepted_schema_fingerprint else {
316 return Cow::Borrowed(self.as_bytes());
317 };
318
319 let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
320 bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
321 bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
322 bytes.extend_from_slice(&fingerprint);
323 bytes.extend_from_slice(self.as_bytes());
324
325 Cow::Owned(bytes)
326 }
327
328 fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
329 let bytes = bytes.into_owned();
330 if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
331 && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
332 && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
333 {
334 let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
335 let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
336 let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
337 fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
338
339 return Self {
340 payload: bytes[fingerprint_end..].to_vec(),
341 accepted_schema_fingerprint: Some(fingerprint),
342 };
343 }
344
345 Self {
346 payload: bytes,
347 accepted_schema_fingerprint: None,
348 }
349 }
350
351 fn into_bytes(self) -> Vec<u8> {
352 let Some(fingerprint) = self.accepted_schema_fingerprint else {
353 return self.payload;
354 };
355
356 let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
357 bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
358 bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
359 bytes.extend_from_slice(&fingerprint);
360 bytes.extend_from_slice(&self.payload);
361
362 bytes
363 }
364
365 const BOUND: StorableBound = StorableBound::Unbounded;
366}
367
368fn validate_typed_schema_snapshot_for_store(
372 snapshot: &PersistedSchemaSnapshot,
373) -> Result<(), InternalError> {
374 if schema_snapshot_integrity_detail(
375 "schema snapshot",
376 snapshot.version(),
377 snapshot.primary_key_field_ids(),
378 snapshot.row_layout(),
379 snapshot.fields(),
380 )
381 .is_some()
382 {
383 return Err(InternalError::store_invariant());
384 }
385
386 Ok(())
387}
388
/// Size summary of the snapshots stored for one entity.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    /// Number of stored snapshots.
    snapshots: u64,
    /// Sum of the snapshot payload sizes in bytes.
    encoded_bytes: u64,
    /// Payload size in bytes of the highest-version snapshot (0 when none).
    latest_snapshot_bytes: u64,
}
403
/// Summary derived from the latest snapshot of every entity in the store.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreCatalogMetadata {
    /// Highest schema version seen across the summarised snapshots.
    schema_version: SchemaVersion,
    /// Version tag of the fingerprint method that produced `schema_fingerprint`.
    schema_fingerprint_method_version: u8,
    /// Fingerprint over the summarised snapshots.
    schema_fingerprint: CommitSchemaFingerprint,
    /// Number of entities that contributed.
    entity_count: u64,
}
418
impl SchemaStoreCatalogMetadata {
    /// Assembles metadata from already-computed parts.
    #[must_use]
    const fn new(
        schema_version: SchemaVersion,
        schema_fingerprint_method_version: u8,
        schema_fingerprint: CommitSchemaFingerprint,
        entity_count: u64,
    ) -> Self {
        Self {
            schema_version,
            schema_fingerprint_method_version,
            schema_fingerprint,
            entity_count,
        }
    }

    /// Returns the highest schema version that contributed.
    #[must_use]
    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
        self.schema_version
    }

    /// Returns the version tag of the fingerprint method.
    #[must_use]
    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
        self.schema_fingerprint_method_version
    }

    /// Returns the fingerprint.
    #[must_use]
    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
        self.schema_fingerprint
    }

    /// Returns the number of contributing entities.
    #[must_use]
    pub(in crate::db) const fn entity_count(self) -> u64 {
        self.entity_count
    }
}
461
/// The three metadata summaries derived together from one scan of the store.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreAllocationMetadata {
    /// Summary produced by `derive_data_allocation_metadata` (indexes excluded).
    data: SchemaStoreCatalogMetadata,
    /// Summary produced by `derive_index_allocation_metadata` (indexes only).
    index: SchemaStoreCatalogMetadata,
    /// Summary produced by `derive_schema_catalog_metadata` (full payloads).
    schema: SchemaStoreCatalogMetadata,
}
476
impl SchemaStoreAllocationMetadata {
    /// Bundles the data, index and schema summaries.
    #[must_use]
    const fn new(
        data: SchemaStoreCatalogMetadata,
        index: SchemaStoreCatalogMetadata,
        schema: SchemaStoreCatalogMetadata,
    ) -> Self {
        Self {
            data,
            index,
            schema,
        }
    }

    /// Returns the data-allocation summary.
    #[must_use]
    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
        self.data
    }

    /// Returns the index-allocation summary.
    #[must_use]
    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
        self.index
    }

    /// Returns the full schema-catalog summary.
    #[must_use]
    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
        self.schema
    }
}
512
impl SchemaStoreFootprint {
    /// Assembles a footprint from already-computed counters.
    #[must_use]
    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
        Self {
            snapshots,
            encoded_bytes,
            latest_snapshot_bytes,
        }
    }

    /// Returns the number of stored snapshots.
    #[must_use]
    pub(in crate::db) const fn snapshots(self) -> u64 {
        self.snapshots
    }

    /// Returns the summed payload size in bytes.
    #[must_use]
    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
        self.encoded_bytes
    }

    /// Returns the payload size of the highest-version snapshot in bytes.
    #[must_use]
    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
        self.latest_snapshot_bytes
    }
}
542
/// Store of encoded schema snapshots keyed by (entity tag, schema version).
///
/// It is backed either by a plain heap map or by a stable-memory map with an
/// in-memory overlay; see `SchemaStoreBackend`.
pub struct SchemaStore {
    /// Selected storage backend.
    backend: SchemaStoreBackend,
}
554
/// Storage strategies behind `SchemaStore`.
enum SchemaStoreBackend {
    /// Everything lives in an ordinary in-memory B-tree.
    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
    /// Stable-memory map plus an in-memory overlay.
    Journaled {
        /// Stable-memory map; in the visible code it is written only by
        /// `fold_persisted_snapshot`.
        canonical:
            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
        /// Overlay entries written by `insert_raw_snapshot`; reads consult these
        /// before `canonical`.
        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
        /// Keys that reads treat as absent, whichever map holds them.
        tombstones: BTreeSet<RawSchemaKey>,
    },
}
564
/// Verdict a visitor returns after each snapshot to steer the traversal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    /// Keep visiting the remaining snapshots.
    Continue,
    /// End the traversal after the current snapshot.
    Stop,
}

impl SchemaStoreVisit {
    /// Returns `true` only for `Stop`.
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
577
578impl SchemaStore {
579 #[must_use]
581 pub const fn init_heap() -> Self {
582 Self {
583 backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
584 }
585 }
586
587 #[must_use]
592 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
593 Self {
594 backend: SchemaStoreBackend::Journaled {
595 canonical: StableBTreeMap::init(memory),
596 live: StdBTreeMap::new(),
597 tombstones: BTreeSet::new(),
598 },
599 }
600 }
601
602 pub(in crate::db) fn insert_persisted_snapshot(
604 &mut self,
605 entity: EntityTag,
606 snapshot: &PersistedSchemaSnapshot,
607 ) -> Result<(), InternalError> {
608 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
609 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
610 let _ = self.insert_raw_snapshot(key, raw_snapshot);
611
612 Ok(())
613 }
614
615 pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
619 &mut self,
620 expected: AcceptedCatalogIdentity,
621 snapshot: &PersistedSchemaSnapshot,
622 ) -> Result<(), InternalError> {
623 let live = self.latest_catalog_identity(
624 expected.entity_tag(),
625 expected.entity_path(),
626 expected.store_path(),
627 )?;
628 if live
629 .as_ref()
630 .map(AcceptedCatalogSnapshotSelection::identity)
631 != Some(expected)
632 {
633 return Err(InternalError::schema_ddl_publication_race_lost(
634 expected.entity_path(),
635 ));
636 }
637
638 self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
639 }
640
641 pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
644 let SchemaStoreBackend::Journaled {
645 live, tombstones, ..
646 } = &mut self.backend
647 else {
648 return Err(InternalError::store_invariant());
649 };
650
651 live.clear();
652 tombstones.clear();
653
654 Ok(())
655 }
656
657 pub(in crate::db) fn fold_persisted_snapshot(
659 &mut self,
660 entity: EntityTag,
661 snapshot: &PersistedSchemaSnapshot,
662 ) -> Result<(), InternalError> {
663 let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
664 return Err(InternalError::store_invariant());
665 };
666
667 let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
668 let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
669 canonical.insert(key, raw_snapshot);
670
671 Ok(())
672 }
673
674 #[cfg(test)]
676 pub(in crate::db) fn get_persisted_snapshot(
677 &self,
678 entity: EntityTag,
679 version: SchemaVersion,
680 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
681 let key = RawSchemaKey::from_entity_version(entity, version);
682 self.get_raw_snapshot(&key)
683 .map(|snapshot| snapshot.decode_persisted_snapshot())
684 .transpose()
685 }
686
687 pub(in crate::db) fn latest_persisted_snapshot(
689 &self,
690 entity: EntityTag,
691 ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
692 self.latest_raw_snapshot(entity)
693 .map(|snapshot| snapshot.decode_persisted_snapshot())
694 .transpose()
695 }
696
697 pub(in crate::db) fn latest_catalog_identity(
700 &self,
701 entity: EntityTag,
702 entity_path: &'static str,
703 store_path: &'static str,
704 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
705 let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
706 return Ok(None);
707 };
708 let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
709 let identity =
710 AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
711
712 Ok(Some(AcceptedCatalogSnapshotSelection::new(
713 identity,
714 raw_snapshot.into_bytes(),
715 )))
716 }
717
718 #[must_use]
720 pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
721 let mut snapshots = 0u64;
722 let mut encoded_bytes = 0u64;
723 let mut latest = None::<(SchemaVersion, u64)>;
724
725 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
726 if key.entity_tag() != entity {
727 return Ok(SchemaStoreVisit::Continue);
728 }
729
730 let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
731 snapshots = snapshots.saturating_add(1);
732 encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
733
734 let version = SchemaVersion::new(key.version());
735 if latest
736 .as_ref()
737 .is_none_or(|(latest_version, _)| version > *latest_version)
738 {
739 latest = Some((version, snapshot_bytes));
740 }
741 Ok(SchemaStoreVisit::Continue)
742 });
743
744 SchemaStoreFootprint::new(
745 snapshots,
746 encoded_bytes,
747 latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
748 )
749 }
750
751 #[cfg(test)]
757 pub(in crate::db) fn catalog_metadata(
758 &self,
759 ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
760 Ok(self
761 .allocation_metadata()?
762 .map(SchemaStoreAllocationMetadata::schema))
763 }
764
765 pub(in crate::db) fn allocation_metadata(
772 &self,
773 ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
774 let latest_by_entity = self.latest_raw_snapshots_by_entity();
775 if latest_by_entity.is_empty() {
776 return Ok(None);
777 }
778
779 Ok(Some(SchemaStoreAllocationMetadata::new(
780 derive_data_allocation_metadata(&latest_by_entity)?,
781 derive_index_allocation_metadata(&latest_by_entity)?,
782 derive_schema_catalog_metadata(&latest_by_entity)?,
783 )))
784 }
785
786 fn insert_raw_snapshot(
788 &mut self,
789 key: RawSchemaKey,
790 snapshot: RawSchemaSnapshot,
791 ) -> Option<RawSchemaSnapshot> {
792 let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
793 self.get_raw_snapshot_for_backend(&key)
794 } else {
795 None
796 };
797 match &mut self.backend {
798 SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
799 SchemaStoreBackend::Journaled {
800 live, tombstones, ..
801 } => {
802 tombstones.remove(&key);
803 live.insert(key, snapshot);
804 previous_journaled
805 }
806 }
807 }
808
809 #[must_use]
811 #[cfg(test)]
812 fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
813 match &self.backend {
814 SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
815 SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
816 }
817 }
818
819 #[must_use]
821 #[cfg(test)]
822 fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
823 match &self.backend {
824 SchemaStoreBackend::Heap(map) => map.contains_key(key),
825 SchemaStoreBackend::Journaled { .. } => {
826 self.get_raw_snapshot_for_backend(key).is_some()
827 }
828 }
829 }
830
831 #[must_use]
833 #[cfg(test)]
834 pub(in crate::db) fn len(&self) -> u64 {
835 match &self.backend {
836 SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
837 SchemaStoreBackend::Journaled { .. } => {
838 let mut count = 0_u64;
839 let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
840 count = count.saturating_add(1);
841 Ok(SchemaStoreVisit::Continue)
842 });
843 count
844 }
845 }
846 }
847
848 #[must_use]
850 #[cfg(test)]
851 pub(in crate::db) fn is_empty(&self) -> bool {
852 match &self.backend {
853 SchemaStoreBackend::Heap(map) => map.is_empty(),
854 SchemaStoreBackend::Journaled { .. } => {
855 let mut empty = true;
856 let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
857 empty = false;
858 Ok(SchemaStoreVisit::Stop)
859 });
860 empty
861 }
862 }
863 }
864
865 #[cfg(test)]
867 pub(in crate::db) fn clear(&mut self) {
868 match &mut self.backend {
869 SchemaStoreBackend::Heap(map) => map.clear(),
870 SchemaStoreBackend::Journaled {
871 canonical,
872 live,
873 tombstones,
874 } => {
875 live.clear();
876 tombstones.clear();
877 for entry in canonical.iter() {
878 tombstones.insert(*entry.key());
879 }
880 }
881 }
882 }
883
884 fn latest_raw_snapshots_by_entity(
885 &self,
886 ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
887 #[cfg(test)]
888 LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
889
890 let mut latest_by_entity =
891 StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
892
893 let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
894 let version = SchemaVersion::new(key.version());
895 match latest_by_entity.get_mut(&key.entity_tag()) {
896 Some((latest_version, latest_snapshot)) if version > *latest_version => {
897 *latest_version = version;
898 *latest_snapshot = snapshot.clone();
899 }
900 None => {
901 latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
902 }
903 Some(_) => {}
904 }
905 Ok(SchemaStoreVisit::Continue)
906 });
907
908 latest_by_entity
909 }
910
911 fn visit_raw_snapshots<E>(
914 &self,
915 visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
916 ) -> Result<(), E> {
917 match &self.backend {
918 SchemaStoreBackend::Heap(map) => {
919 let mut visitor = visitor;
920 for (key, snapshot) in map {
921 if visitor(key, snapshot)?.should_stop() {
922 break;
923 }
924 }
925 }
926 SchemaStoreBackend::Journaled {
927 canonical,
928 live,
929 tombstones,
930 } => Self::visit_journaled_raw_snapshot_range(
931 canonical,
932 live,
933 tombstones,
934 (RangeBound::Unbounded, RangeBound::Unbounded),
935 Direction::Asc,
936 visitor,
937 )?,
938 }
939
940 Ok(())
941 }
942
943 #[cfg(test)]
944 #[must_use]
945 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
946 match &self.backend {
947 SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
948 SchemaStoreBackend::Heap(_) => 0,
949 }
950 }
951
952 fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
953 let SchemaStoreBackend::Journaled {
954 canonical,
955 live,
956 tombstones,
957 } = &self.backend
958 else {
959 return None;
960 };
961
962 if tombstones.contains(key) {
963 return None;
964 }
965 live.get(key).cloned().or_else(|| canonical.get(key))
966 }
967
968 fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
969 self.latest_raw_snapshot_entry(entity)
970 .map(|(_, snapshot)| snapshot)
971 }
972
973 fn latest_raw_snapshot_entry(
974 &self,
975 entity: EntityTag,
976 ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
977 let bounds = RawSchemaKey::entity_range_bounds(entity);
978 match &self.backend {
979 SchemaStoreBackend::Heap(map) => map
980 .range((bounds.0, bounds.1))
981 .next_back()
982 .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
983 SchemaStoreBackend::Journaled {
984 canonical,
985 live,
986 tombstones,
987 } => {
988 let mut latest = None;
989 let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
990 canonical,
991 live,
992 tombstones,
993 bounds,
994 Direction::Desc,
995 |key, snapshot| {
996 latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
997 Ok(SchemaStoreVisit::Stop)
998 },
999 );
1000 latest
1001 }
1002 }
1003 }
1004
    /// Visits the journaled view of the store within `bounds`, in `direction`.
    ///
    /// The canonical and live ranges are handed to `visit_ordered_overlay`
    /// together with a key comparator and two filters that drop tombstoned keys
    /// from either side. Each surviving entry is passed to `visitor`; a
    /// `SchemaStoreVisit::Stop` verdict is translated to `OrderedOverlayVisit::Stop`.
    ///
    /// The two arms differ only in `.rev()` on both iterators and in the
    /// direction passed along; they are spelled out separately because the
    /// reversed iterators have different concrete types.
    ///
    /// NOTE(review): how equal keys present in both maps are resolved is decided
    /// inside `visit_ordered_overlay`; presumably the live entry shadows the
    /// canonical one — confirm there.
    ///
    /// # Errors
    /// Returns the first error produced by `visitor`.
    fn visit_journaled_raw_snapshot_range<E>(
        canonical: &StableBTreeMap<
            RawSchemaKey,
            RawSchemaSnapshot,
            VirtualMemory<DefaultMemoryImpl>,
        >,
        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
        tombstones: &BTreeSet<RawSchemaKey>,
        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
        direction: Direction,
        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
    ) -> Result<(), E> {
        match direction {
            Direction::Asc => visit_ordered_overlay(
                canonical.range((bounds.0, bounds.1)),
                live.range((bounds.0, bounds.1)),
                Direction::Asc,
                // Orders a canonical entry against a live entry by key.
                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                // Keep only entries whose key is not tombstoned.
                |canonical_entry| !tombstones.contains(canonical_entry.key()),
                |live_entry| !tombstones.contains(live_entry.0),
                |entry| {
                    let visit = match entry {
                        OrderedOverlayEntry::Canonical(canonical_entry) => {
                            visitor(canonical_entry.key(), &canonical_entry.value())?
                        }
                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
                    };
                    Ok(if visit.should_stop() {
                        OrderedOverlayVisit::Stop
                    } else {
                        OrderedOverlayVisit::Continue
                    })
                },
            ),
            Direction::Desc => visit_ordered_overlay(
                canonical.range((bounds.0, bounds.1)).rev(),
                live.range((bounds.0, bounds.1)).rev(),
                Direction::Desc,
                // Same comparator and filters as the ascending arm.
                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                |canonical_entry| !tombstones.contains(canonical_entry.key()),
                |live_entry| !tombstones.contains(live_entry.0),
                |entry| {
                    let visit = match entry {
                        OrderedOverlayEntry::Canonical(canonical_entry) => {
                            visitor(canonical_entry.key(), &canonical_entry.value())?
                        }
                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
                    };
                    Ok(if visit.should_stop() {
                        OrderedOverlayVisit::Stop
                    } else {
                        OrderedOverlayVisit::Continue
                    })
                },
            ),
        }
    }
1062}
1063
1064fn derive_data_allocation_metadata(
1065 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1066) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1067 let mut max_version = SchemaVersion::initial();
1068 let mut hasher = new_hash_sha256();
1069 write_hash_tag_u8(
1070 &mut hasher,
1071 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1072 );
1073
1074 for (entity, (_, snapshot)) in latest_by_entity {
1075 let persisted = snapshot.decode_persisted_snapshot()?;
1076 if persisted.version() > max_version {
1077 max_version = persisted.version();
1078 }
1079
1080 let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1081 persisted.version(),
1082 persisted.entity_path().to_string(),
1083 persisted.entity_name().to_string(),
1084 persisted.primary_key_field_ids().to_vec(),
1085 persisted.row_layout().clone(),
1086 persisted.fields().to_vec(),
1087 Vec::new(),
1088 );
1089 let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1090
1091 write_hash_u64(&mut hasher, entity.value());
1092 write_hash_u32(&mut hasher, persisted.version().get());
1093 write_hash_len_u32(&mut hasher, encoded.len());
1094 hasher.update(encoded);
1095 }
1096
1097 Ok(finalize_schema_metadata(
1098 max_version,
1099 SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1100 hasher,
1101 latest_by_entity.len(),
1102 ))
1103}
1104
1105fn derive_index_allocation_metadata(
1106 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1107) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1108 let mut max_version = SchemaVersion::initial();
1109 let mut hasher = new_hash_sha256();
1110 write_hash_tag_u8(
1111 &mut hasher,
1112 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1113 );
1114
1115 for (entity, (_, snapshot)) in latest_by_entity {
1116 let persisted = snapshot.decode_persisted_snapshot()?;
1117 if persisted.version() > max_version {
1118 max_version = persisted.version();
1119 }
1120
1121 write_hash_u64(&mut hasher, entity.value());
1122 write_hash_u32(&mut hasher, persisted.version().get());
1123 write_hash_len_u32(&mut hasher, persisted.indexes().len());
1124 for index in persisted.indexes() {
1125 write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1126 write_hash_str_u32(&mut hasher, index.name());
1127 write_hash_str_u32(&mut hasher, index.store());
1128 write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1129 write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1130 match index.predicate_sql() {
1131 Some(predicate_sql) => {
1132 write_hash_tag_u8(&mut hasher, 1);
1133 write_hash_str_u32(&mut hasher, predicate_sql);
1134 }
1135 None => write_hash_tag_u8(&mut hasher, 0),
1136 }
1137 hash_persisted_index_key(&mut hasher, index.key());
1138 }
1139 }
1140
1141 Ok(finalize_schema_metadata(
1142 max_version,
1143 SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1144 hasher,
1145 latest_by_entity.len(),
1146 ))
1147}
1148
1149fn derive_schema_catalog_metadata(
1150 latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1151) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1152 let mut max_version = SchemaVersion::initial();
1153 let mut hasher = new_hash_sha256();
1154 write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1155
1156 for (entity, (version, snapshot)) in latest_by_entity {
1157 let persisted = snapshot.decode_persisted_snapshot()?;
1158 if persisted.version() > max_version {
1159 max_version = persisted.version();
1160 }
1161
1162 write_hash_u64(&mut hasher, entity.value());
1163 write_hash_u32(&mut hasher, version.get());
1164 write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1165 hasher.update(snapshot.as_bytes());
1166 }
1167
1168 Ok(finalize_schema_metadata(
1169 max_version,
1170 SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1171 hasher,
1172 latest_by_entity.len(),
1173 ))
1174}
1175
1176fn finalize_schema_metadata(
1177 schema_version: SchemaVersion,
1178 schema_fingerprint_method_version: u8,
1179 hasher: sha2::Sha256,
1180 entity_count: usize,
1181) -> SchemaStoreCatalogMetadata {
1182 let digest = finalize_hash_sha256(hasher);
1183 let mut schema_fingerprint = [0u8; 16];
1184 schema_fingerprint.copy_from_slice(&digest[..16]);
1185
1186 SchemaStoreCatalogMetadata::new(
1187 schema_version,
1188 schema_fingerprint_method_version,
1189 schema_fingerprint,
1190 u64::try_from(entity_count).unwrap_or(u64::MAX),
1191 )
1192}
1193
1194fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1195 match key {
1196 PersistedIndexKeySnapshot::FieldPath(paths) => {
1197 write_hash_tag_u8(hasher, 1);
1198 write_hash_len_u32(hasher, paths.len());
1199 for path in paths {
1200 hash_persisted_index_field_path(hasher, path);
1201 }
1202 }
1203 PersistedIndexKeySnapshot::Items(items) => {
1204 write_hash_tag_u8(hasher, 2);
1205 write_hash_len_u32(hasher, items.len());
1206 for item in items {
1207 match item {
1208 PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1209 write_hash_tag_u8(hasher, 1);
1210 hash_persisted_index_field_path(hasher, path);
1211 }
1212 PersistedIndexKeyItemSnapshot::Expression(expression) => {
1213 write_hash_tag_u8(hasher, 2);
1214 write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1215 hash_persisted_index_field_path(hasher, expression.source());
1216 hash_persisted_field_kind(hasher, expression.input_kind());
1217 hash_persisted_field_kind(hasher, expression.output_kind());
1218 write_hash_str_u32(hasher, expression.canonical_text());
1219 }
1220 }
1221 }
1222 }
1223 }
1224}
1225
1226fn hash_persisted_index_field_path(
1227 hasher: &mut sha2::Sha256,
1228 path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1229) {
1230 write_hash_u32(hasher, path.field_id().get());
1231 write_hash_u32(hasher, u32::from(path.slot().get()));
1232 write_hash_len_u32(hasher, path.path().len());
1233 for segment in path.path() {
1234 write_hash_str_u32(hasher, segment);
1235 }
1236 hash_persisted_field_kind(hasher, path.kind());
1237 write_hash_tag_u8(hasher, u8::from(path.nullable()));
1238}
1239
/// Feeds a structural encoding of one persisted field kind into `hasher`.
///
/// Every variant writes its own one-byte tag (1 through 32) followed by its payload, if
/// it has one. Kinds that contain other kinds (enum payloads, relation keys, list and set
/// elements, map keys and values) recurse into this function.
///
/// The tag values and the order of the writes are part of the hashed byte stream, so
/// renumbering a tag or reordering writes changes the resulting digest.
fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
    // NOTE(review): presumably digests built from this encoding are compared against
    // previously stored values — confirm before changing any tag or write order.
    match kind {
        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
        PersistedFieldKind::Blob { max_len } => {
            write_hash_tag_u8(hasher, 2);
            hash_optional_u32(hasher, *max_len);
        }
        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
        PersistedFieldKind::Decimal { scale } => {
            write_hash_tag_u8(hasher, 5);
            write_hash_u32(hasher, *scale);
        }
        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
        PersistedFieldKind::Enum { path, variants } => {
            write_hash_tag_u8(hasher, 7);
            write_hash_str_u32(hasher, path);
            write_hash_len_u32(hasher, variants.len());
            // Per variant: identifier, payload presence byte (plus the payload kind when
            // present), then the payload storage decode name.
            for variant in variants {
                write_hash_str_u32(hasher, variant.ident());
                match variant.payload_kind() {
                    Some(payload_kind) => {
                        write_hash_tag_u8(hasher, 1);
                        hash_persisted_field_kind(hasher, payload_kind);
                    }
                    None => write_hash_tag_u8(hasher, 0),
                }
                write_hash_str_u32(
                    hasher,
                    field_storage_decode_name(variant.payload_storage_decode()),
                );
            }
        }
        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
        PersistedFieldKind::IntBig { max_bytes } => {
            write_hash_tag_u8(hasher, 15);
            write_hash_u32(hasher, *max_bytes);
        }
        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
        PersistedFieldKind::Text { max_len } => {
            write_hash_tag_u8(hasher, 18);
            hash_optional_u32(hasher, *max_len);
        }
        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
        PersistedFieldKind::NatBig { max_bytes } => {
            write_hash_tag_u8(hasher, 25);
            write_hash_u32(hasher, *max_bytes);
        }
        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
        PersistedFieldKind::Relation {
            target_path,
            target_entity_name,
            target_entity_tag,
            target_store_path,
            key_kind,
            strength,
        } => {
            write_hash_tag_u8(hasher, 28);
            write_hash_str_u32(hasher, target_path);
            write_hash_str_u32(hasher, target_entity_name);
            write_hash_u64(hasher, target_entity_tag.value());
            write_hash_str_u32(hasher, target_store_path);
            hash_persisted_field_kind(hasher, key_kind);
            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
        }
        // Container kinds: tag first, then the nested kind(s) recursively.
        PersistedFieldKind::List(inner) => {
            write_hash_tag_u8(hasher, 29);
            hash_persisted_field_kind(hasher, inner);
        }
        PersistedFieldKind::Set(inner) => {
            write_hash_tag_u8(hasher, 30);
            hash_persisted_field_kind(hasher, inner);
        }
        PersistedFieldKind::Map { key, value } => {
            write_hash_tag_u8(hasher, 31);
            hash_persisted_field_kind(hasher, key);
            hash_persisted_field_kind(hasher, value);
        }
        PersistedFieldKind::Structured { queryable } => {
            write_hash_tag_u8(hasher, 32);
            write_hash_tag_u8(hasher, u8::from(*queryable));
        }
    }
}
1337
1338fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1339 match value {
1340 Some(value) => {
1341 write_hash_tag_u8(hasher, 1);
1342 write_hash_u32(hasher, value);
1343 }
1344 None => write_hash_tag_u8(hasher, 0),
1345 }
1346}
1347
1348const fn persisted_index_origin_name(
1349 origin: crate::db::schema::PersistedIndexOrigin,
1350) -> &'static str {
1351 match origin {
1352 crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1353 crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1354 }
1355}
1356
1357const fn persisted_expression_op_name(
1358 op: crate::db::schema::PersistedIndexExpressionOp,
1359) -> &'static str {
1360 match op {
1361 crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1362 crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1363 crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1364 crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1365 crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1366 crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1367 crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1368 crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1369 }
1370}
1371
1372const fn persisted_relation_strength_name(
1373 strength: crate::db::schema::PersistedRelationStrength,
1374) -> &'static str {
1375 match strength {
1376 crate::db::schema::PersistedRelationStrength::Strong => "strong",
1377 crate::db::schema::PersistedRelationStrength::Weak => "weak",
1378 }
1379}
1380
1381const fn field_storage_decode_name(
1382 decode: crate::model::field::FieldStorageDecode,
1383) -> &'static str {
1384 match decode {
1385 crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1386 crate::model::field::FieldStorageDecode::Value => "value",
1387 }
1388}
1389
1390#[cfg(test)]
1395mod tests;