Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        direction::Direction,
14        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15        schema::{
16            AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17            PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18            accepted_schema_cache_fingerprint,
19            accepted_schema_cache_fingerprint_for_persisted_snapshot,
20            accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22        },
23    },
24    error::InternalError,
25    traits::Storable,
26    types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
// Width in bytes of one `RawSchemaKey`: a big-endian `u64` entity tag followed
// by a big-endian `u32` schema version (8 + 4).
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
// The same key width typed as `u32`, as required by the `Storable` bound.
const SCHEMA_KEY_BYTES: u32 = 12;
// NOTE(review): presumably the size ceiling for one encoded schema snapshot;
// nothing in this part of the file enforces it — confirm at the use sites.
pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
// Version tags for the catalog, data-allocation, and index-allocation
// fingerprints. NOTE(review): their consumers are outside this view.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
// Marker bytes that open a fingerprint-carrying raw snapshot value.
const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
// Value-format version byte written immediately after the magic.
const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
// Header length: 8 magic bytes + 1 version byte + the accepted schema
// fingerprint. NOTE(review): 25 implies a 16-byte `CommitSchemaFingerprint` —
// confirm against its definition, since the value decoder relies on it.
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49
// Test-only, per-thread counter of `latest_raw_snapshots_by_entity` calls.
// It starts at zero and is read/reset through the `*_for_tests` helpers.
#[cfg(test)]
thread_local! {
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}
54
/// Zero this thread's `latest_raw_snapshots_by_entity` call counter.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.set(0);
}
59
/// Read this thread's `latest_raw_snapshots_by_entity` call counter.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|counter| counter.get())
}
64
///
/// RawSchemaKey
///
/// Stable key for one persisted schema snapshot entry.
/// It combines the entity tag and schema version so reconciliation can load
/// concrete versions without depending on generated entity names.
///
/// Layout: bytes `0..8` hold the entity tag as a big-endian `u64`, bytes
/// `8..12` hold the schema version as a big-endian `u32`. With the derived
/// `Ord` on the byte array, keys therefore sort by entity tag first and by
/// version second.
///

#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
75
76impl RawSchemaKey {
77    /// Build the raw persisted key for one entity schema version.
78    #[must_use]
79    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
80        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
81        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
82        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
83
84        Self(out)
85    }
86
87    /// Return the entity tag encoded in this schema key.
88    #[must_use]
89    fn entity_tag(self) -> EntityTag {
90        let mut bytes = [0u8; size_of::<u64>()];
91        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
92
93        EntityTag::new(u64::from_be_bytes(bytes))
94    }
95
96    /// Return the schema version encoded in this schema key.
97    #[must_use]
98    fn version(self) -> u32 {
99        let mut bytes = [0u8; size_of::<u32>()];
100        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
101
102        u32::from_be_bytes(bytes)
103    }
104
105    fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
106        (
107            RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
108            RangeBound::Included(Self::from_entity_version(
109                entity,
110                SchemaVersion::new(u32::MAX),
111            )),
112        )
113    }
114}
115
116impl Storable for RawSchemaKey {
117    fn to_bytes(&self) -> Cow<'_, [u8]> {
118        Cow::Borrowed(&self.0)
119    }
120
121    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
122        debug_assert_eq!(
123            bytes.len(),
124            SCHEMA_KEY_BYTES_USIZE,
125            "RawSchemaKey::from_bytes received unexpected byte length",
126        );
127
128        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
129            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
130        }
131
132        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
133        out.copy_from_slice(bytes.as_ref());
134        Self(out)
135    }
136
137    fn into_bytes(self) -> Vec<u8> {
138        self.0.to_vec()
139    }
140
141    const BOUND: StorableBound = StorableBound::Bounded {
142        max_size: SCHEMA_KEY_BYTES,
143        is_fixed_size: true,
144    };
145}
146
///
/// RawSchemaSnapshot
///
/// Raw persisted schema snapshot payload.
/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
/// keeping the stable-memory value boundary independent from the typed schema
/// DTOs used by reconciliation.
///

#[derive(Clone, Debug, Eq, PartialEq)]
struct RawSchemaSnapshot {
    // Encoded `PersistedSchemaSnapshot` bytes, without the value header.
    payload: Vec<u8>,
    // Fingerprint stored beside the payload; `None` when the value was built
    // from, or decoded as, bare payload bytes with no header.
    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
}
161
162impl RawSchemaSnapshot {
163    /// Encode one typed persisted-schema snapshot into a raw store payload.
164    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
165        validate_typed_schema_snapshot_for_store(snapshot)?;
166
167        let accepted_schema_fingerprint =
168            accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
169        let payload = encode_persisted_schema_snapshot(snapshot)?;
170
171        Ok(Self {
172            payload,
173            accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
174        })
175    }
176
177    /// Build one raw schema snapshot from already-encoded bytes.
178    #[must_use]
179    #[cfg(test)]
180    const fn from_bytes(payload: Vec<u8>) -> Self {
181        Self {
182            payload,
183            accepted_schema_fingerprint: None,
184        }
185    }
186
187    /// Borrow the encoded schema snapshot payload.
188    #[must_use]
189    const fn as_bytes(&self) -> &[u8] {
190        self.payload.as_slice()
191    }
192
193    /// Consume the snapshot into its encoded payload bytes.
194    #[must_use]
195    fn into_bytes(self) -> Vec<u8> {
196        self.payload
197    }
198
199    /// Return the accepted schema identity fingerprint stored beside the raw
200    /// payload, without decoding the persisted snapshot.
201    fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
202        self.accepted_schema_fingerprint
203            .ok_or_else(InternalError::store_corruption)
204    }
205
206    /// Decode this raw store payload into a typed persisted-schema snapshot.
207    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
208        decode_persisted_schema_snapshot(self.as_bytes())
209    }
210}
211
/// Copyable identity facts for one entity's accepted schema snapshot: entity
/// tag and paths, the accepted schema version, and the accepted schema
/// fingerprint together with the method version that produced it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogIdentity {
    entity_tag: EntityTag,
    entity_path: &'static str,
    store_path: &'static str,
    accepted_schema_version: SchemaVersion,
    // Filled in by `new` from `accepted_schema_cache_fingerprint_method_version()`.
    fingerprint_method_version: u8,
    accepted_schema_fingerprint: CommitSchemaFingerprint,
}
221
222impl AcceptedCatalogIdentity {
223    #[must_use]
224    pub(in crate::db) const fn new(
225        entity_tag: EntityTag,
226        entity_path: &'static str,
227        store_path: &'static str,
228        accepted_schema_version: SchemaVersion,
229        accepted_schema_fingerprint: CommitSchemaFingerprint,
230    ) -> Self {
231        Self {
232            entity_tag,
233            entity_path,
234            store_path,
235            accepted_schema_version,
236            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
237            accepted_schema_fingerprint,
238        }
239    }
240
241    #[must_use]
242    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
243        self.entity_tag
244    }
245
246    #[must_use]
247    pub(in crate::db) const fn entity_path(self) -> &'static str {
248        self.entity_path
249    }
250
251    #[must_use]
252    pub(in crate::db) const fn store_path(self) -> &'static str {
253        self.store_path
254    }
255
256    #[must_use]
257    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
258        self.accepted_schema_version
259    }
260
261    #[must_use]
262    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
263        self.fingerprint_method_version
264    }
265
266    #[must_use]
267    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
268        self.accepted_schema_fingerprint
269    }
270}
271
/// One selected accepted snapshot: its catalog identity plus the still-encoded
/// snapshot payload, so the payload is only decoded when a caller asks for it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
    identity: AcceptedCatalogIdentity,
    // Encoded `PersistedSchemaSnapshot` payload bytes (no value header).
    raw_snapshot: Vec<u8>,
}
277
278impl AcceptedCatalogSnapshotSelection {
279    #[must_use]
280    const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
281        Self {
282            identity,
283            raw_snapshot,
284        }
285    }
286
287    #[must_use]
288    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
289        self.identity
290    }
291
292    pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
293        let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
294        let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
295        let identity = self.identity();
296
297        if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
298            return Err(InternalError::store_invariant());
299        }
300        if accepted.entity_path() != identity.entity_path() {
301            return Err(InternalError::store_invariant());
302        }
303
304        let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
305        if decoded_fingerprint != identity.accepted_schema_fingerprint() {
306            return Err(InternalError::store_invariant());
307        }
308
309        Ok(accepted)
310    }
311}
312
313impl Storable for RawSchemaSnapshot {
314    fn to_bytes(&self) -> Cow<'_, [u8]> {
315        let Some(fingerprint) = self.accepted_schema_fingerprint else {
316            return Cow::Borrowed(self.as_bytes());
317        };
318
319        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
320        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
321        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
322        bytes.extend_from_slice(&fingerprint);
323        bytes.extend_from_slice(self.as_bytes());
324
325        Cow::Owned(bytes)
326    }
327
328    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
329        let bytes = bytes.into_owned();
330        if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
331            && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
332            && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
333        {
334            let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
335            let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
336            let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
337            fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
338
339            return Self {
340                payload: bytes[fingerprint_end..].to_vec(),
341                accepted_schema_fingerprint: Some(fingerprint),
342            };
343        }
344
345        Self {
346            payload: bytes,
347            accepted_schema_fingerprint: None,
348        }
349    }
350
351    fn into_bytes(self) -> Vec<u8> {
352        let Some(fingerprint) = self.accepted_schema_fingerprint else {
353            return self.payload;
354        };
355
356        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
357        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
358        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
359        bytes.extend_from_slice(&fingerprint);
360        bytes.extend_from_slice(&self.payload);
361
362        bytes
363    }
364
365    const BOUND: StorableBound = StorableBound::Unbounded;
366}
367
368// Validate typed schema snapshots before they are encoded into the raw schema
369// metadata store. This catches caller-side invariant violations separately from
370// raw persisted-byte corruption handled by the codec decode boundary.
371fn validate_typed_schema_snapshot_for_store(
372    snapshot: &PersistedSchemaSnapshot,
373) -> Result<(), InternalError> {
374    if schema_snapshot_integrity_detail(
375        "schema snapshot",
376        snapshot.version(),
377        snapshot.primary_key_field_ids(),
378        snapshot.row_layout(),
379        snapshot.fields(),
380    )
381    .is_some()
382    {
383        return Err(InternalError::store_invariant());
384    }
385
386    Ok(())
387}
388
///
/// SchemaStoreFootprint
///
/// Current raw schema metadata footprint for one entity. Reconciliation uses
/// this value to report stable-memory pressure without decoding schema payloads
/// or exposing field-level metadata through metrics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    // Number of stored snapshots for the entity.
    snapshots: u64,
    // Sum of the encoded payload lengths of those snapshots.
    encoded_bytes: u64,
    // Encoded payload length of the highest-version snapshot (0 when none).
    latest_snapshot_bytes: u64,
}
403
///
/// SchemaStoreCatalogMetadata
///
/// Accepted schema-store catalog metadata derived from latest persisted
/// snapshots. This is diagnostic allocation metadata, not allocation identity.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreCatalogMetadata {
    // Maximum latest schema version represented in the catalog.
    schema_version: SchemaVersion,
    // Method version of the fingerprint below.
    schema_fingerprint_method_version: u8,
    // Deterministic fingerprint over the latest accepted snapshots.
    schema_fingerprint: CommitSchemaFingerprint,
    // Number of entity schemas represented.
    entity_count: u64,
}
418
419impl SchemaStoreCatalogMetadata {
420    /// Build catalog metadata from already-derived accepted schema facts.
421    #[must_use]
422    const fn new(
423        schema_version: SchemaVersion,
424        schema_fingerprint_method_version: u8,
425        schema_fingerprint: CommitSchemaFingerprint,
426        entity_count: u64,
427    ) -> Self {
428        Self {
429            schema_version,
430            schema_fingerprint_method_version,
431            schema_fingerprint,
432            entity_count,
433        }
434    }
435
436    /// Return the maximum latest schema version represented in the catalog.
437    #[must_use]
438    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
439        self.schema_version
440    }
441
442    /// Return the fingerprint method version for this diagnostic metadata row.
443    #[must_use]
444    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
445        self.schema_fingerprint_method_version
446    }
447
448    /// Return the deterministic catalog fingerprint for latest accepted
449    /// snapshots.
450    #[must_use]
451    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
452        self.schema_fingerprint
453    }
454
455    /// Return number of entity schemas represented in this catalog metadata.
456    #[must_use]
457    pub(in crate::db) const fn entity_count(self) -> u64 {
458        self.entity_count
459    }
460}
461
///
/// SchemaStoreAllocationMetadata
///
/// Role-specific allocation metadata derived from latest accepted schema-store
/// snapshots. These fingerprints describe the accepted contract that owns each
/// allocation role; they are diagnostics, not allocation identity.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreAllocationMetadata {
    // Accepted row-layout metadata for data memory.
    data: SchemaStoreCatalogMetadata,
    // Accepted index-catalog metadata for index memory.
    index: SchemaStoreCatalogMetadata,
    // Accepted full schema-catalog metadata for schema memory.
    schema: SchemaStoreCatalogMetadata,
}
476
477impl SchemaStoreAllocationMetadata {
478    /// Build one role-specific metadata set from already-derived accepted
479    /// schema facts.
480    #[must_use]
481    const fn new(
482        data: SchemaStoreCatalogMetadata,
483        index: SchemaStoreCatalogMetadata,
484        schema: SchemaStoreCatalogMetadata,
485    ) -> Self {
486        Self {
487            data,
488            index,
489            schema,
490        }
491    }
492
493    /// Return accepted row-layout allocation metadata for data memory.
494    #[must_use]
495    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
496        self.data
497    }
498
499    /// Return accepted index-catalog allocation metadata for index memory.
500    #[must_use]
501    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
502        self.index
503    }
504
505    /// Return accepted full schema-catalog allocation metadata for schema
506    /// memory.
507    #[must_use]
508    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
509        self.schema
510    }
511}
512
513impl SchemaStoreFootprint {
514    /// Build one schema-store footprint from already-counted raw payload facts.
515    #[must_use]
516    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
517        Self {
518            snapshots,
519            encoded_bytes,
520            latest_snapshot_bytes,
521        }
522    }
523
524    /// Return the number of raw schema snapshots stored for the entity.
525    #[must_use]
526    pub(in crate::db) const fn snapshots(self) -> u64 {
527        self.snapshots
528    }
529
530    /// Return the total encoded payload bytes stored for the entity.
531    #[must_use]
532    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
533        self.encoded_bytes
534    }
535
536    /// Return the encoded payload bytes for the highest-version snapshot.
537    #[must_use]
538    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
539        self.latest_snapshot_bytes
540    }
541}
542
///
/// SchemaStore
///
/// Thin persistence wrapper over one journaled or heap schema metadata BTreeMap.
/// Startup reconciliation writes and validates encoded schema snapshots here
/// before row/index operations proceed.
///

pub struct SchemaStore {
    // Selected at construction by `init_heap` or `init_journaled`.
    backend: SchemaStoreBackend,
}
554
// Storage backing for a `SchemaStore`.
enum SchemaStoreBackend {
    // Volatile store: every snapshot lives in one in-memory map.
    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
    // Stable base plus volatile projection.
    Journaled {
        // Stable-memory base; written only by `fold_persisted_snapshot`.
        canonical:
            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
        // In-memory entries written by normal inserts.
        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
        // Keys marked as removed; an insert clears the key's tombstone.
        // NOTE(review): presumably these hide matching `canonical` entries on
        // read — the read path is outside this view, confirm there.
        tombstones: BTreeSet<RawSchemaKey>,
    },
}
564
/// Control-flow result for schema-store traversal visitors.
///
/// A visitor returns `Continue` to receive the next entry, or `Stop` to end
/// the traversal early.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    Continue,
    #[allow(
        dead_code,
        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
    )]
    Stop,
}
575
576impl SchemaStoreVisit {
577    const fn should_stop(self) -> bool {
578        matches!(self, Self::Stop)
579    }
580}
581
582impl SchemaStore {
    /// Initialize a volatile heap-backed schema store.
    ///
    /// The store starts empty and keeps every snapshot in an in-memory map.
    #[must_use]
    pub const fn init_heap() -> Self {
        Self {
            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
        }
    }
590
591    /// Initialize a journaled cached-stable schema store.
592    ///
593    /// Normal schema publication writes only the live projection. Canonical
594    /// stable schema history is updated by future journal fold/recovery paths.
595    #[must_use]
596    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
597        Self {
598            backend: SchemaStoreBackend::Journaled {
599                canonical: StableBTreeMap::init(memory),
600                live: StdBTreeMap::new(),
601                tombstones: BTreeSet::new(),
602            },
603        }
604    }
605
606    /// Insert or replace one typed persisted schema snapshot.
607    pub(in crate::db) fn insert_persisted_snapshot(
608        &mut self,
609        entity: EntityTag,
610        snapshot: &PersistedSchemaSnapshot,
611    ) -> Result<(), InternalError> {
612        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
613        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
614        let _ = self.insert_raw_snapshot(key, raw_snapshot);
615
616        Ok(())
617    }
618
619    /// Insert one typed persisted schema snapshot only if the current live
620    /// accepted catalog identity still matches the identity captured before
621    /// schema mutation planning.
622    pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
623        &mut self,
624        expected: AcceptedCatalogIdentity,
625        snapshot: &PersistedSchemaSnapshot,
626    ) -> Result<(), InternalError> {
627        let live = self.latest_catalog_identity(
628            expected.entity_tag(),
629            expected.entity_path(),
630            expected.store_path(),
631        )?;
632        if live
633            .as_ref()
634            .map(AcceptedCatalogSnapshotSelection::identity)
635            != Some(expected)
636        {
637            return Err(InternalError::schema_ddl_publication_race_lost(
638                expected.entity_path(),
639            ));
640        }
641
642        self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
643    }
644
645    /// Reset the volatile projection for journaled recovery without mutating
646    /// the canonical stable schema base.
647    pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
648        let SchemaStoreBackend::Journaled {
649            live, tombstones, ..
650        } = &mut self.backend
651        else {
652            return Err(InternalError::store_invariant());
653        };
654
655        live.clear();
656        tombstones.clear();
657
658        Ok(())
659    }
660
661    /// Apply one folded journal schema snapshot into the canonical stable base.
662    pub(in crate::db) fn fold_persisted_snapshot(
663        &mut self,
664        entity: EntityTag,
665        snapshot: &PersistedSchemaSnapshot,
666    ) -> Result<(), InternalError> {
667        let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
668            return Err(InternalError::store_invariant());
669        };
670
671        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
672        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
673        canonical.insert(key, raw_snapshot);
674
675        Ok(())
676    }
677
678    /// Load and decode one typed persisted schema snapshot.
679    #[cfg(test)]
680    pub(in crate::db) fn get_persisted_snapshot(
681        &self,
682        entity: EntityTag,
683        version: SchemaVersion,
684    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
685        let key = RawSchemaKey::from_entity_version(entity, version);
686        self.get_raw_snapshot(&key)
687            .map(|snapshot| snapshot.decode_persisted_snapshot())
688            .transpose()
689    }
690
691    /// Load and decode the highest stored schema snapshot version for one entity.
692    pub(in crate::db) fn latest_persisted_snapshot(
693        &self,
694        entity: EntityTag,
695    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
696        self.latest_raw_snapshot(entity)
697            .map(|snapshot| snapshot.decode_persisted_snapshot())
698            .transpose()
699    }
700
701    /// Return the latest accepted catalog identity for one entity without
702    /// decoding the selected schema snapshot.
703    pub(in crate::db) fn latest_catalog_identity(
704        &self,
705        entity: EntityTag,
706        entity_path: &'static str,
707        store_path: &'static str,
708    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
709        let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
710            return Ok(None);
711        };
712        let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
713        let identity =
714            AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
715
716        Ok(Some(AcceptedCatalogSnapshotSelection::new(
717            identity,
718            raw_snapshot.into_bytes(),
719        )))
720    }
721
722    /// Return raw schema-store footprint facts for one entity.
723    #[must_use]
724    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
725        let mut snapshots = 0u64;
726        let mut encoded_bytes = 0u64;
727        let mut latest = None::<(SchemaVersion, u64)>;
728
729        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
730            if key.entity_tag() != entity {
731                return Ok(SchemaStoreVisit::Continue);
732            }
733
734            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
735            snapshots = snapshots.saturating_add(1);
736            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
737
738            let version = SchemaVersion::new(key.version());
739            if latest
740                .as_ref()
741                .is_none_or(|(latest_version, _)| version > *latest_version)
742            {
743                latest = Some((version, snapshot_bytes));
744            }
745            Ok(SchemaStoreVisit::Continue)
746        });
747
748        SchemaStoreFootprint::new(
749            snapshots,
750            encoded_bytes,
751            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
752        )
753    }
754
755    /// Derive accepted catalog metadata from latest persisted schema snapshots.
756    ///
757    /// This function intentionally reads only the persisted schema store. It
758    /// does not reconstruct metadata from generated models when the store has
759    /// no accepted snapshots.
760    #[cfg(test)]
761    pub(in crate::db) fn catalog_metadata(
762        &self,
763    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
764        Ok(self
765            .allocation_metadata()?
766            .map(SchemaStoreAllocationMetadata::schema))
767    }
768
769    /// Derive role-specific allocation metadata from latest persisted schema
770    /// snapshots.
771    ///
772    /// This function intentionally reads only accepted schema-store payloads.
773    /// It never reconstructs metadata from generated models when the store has
774    /// no accepted snapshots.
775    pub(in crate::db) fn allocation_metadata(
776        &self,
777    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
778        let latest_by_entity = self.latest_raw_snapshots_by_entity();
779        if latest_by_entity.is_empty() {
780            return Ok(None);
781        }
782
783        Ok(Some(SchemaStoreAllocationMetadata::new(
784            derive_data_allocation_metadata(&latest_by_entity)?,
785            derive_index_allocation_metadata(&latest_by_entity)?,
786            derive_schema_catalog_metadata(&latest_by_entity)?,
787        )))
788    }
789
790    /// Insert or replace one raw schema snapshot.
791    fn insert_raw_snapshot(
792        &mut self,
793        key: RawSchemaKey,
794        snapshot: RawSchemaSnapshot,
795    ) -> Option<RawSchemaSnapshot> {
796        let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
797            self.get_raw_snapshot_for_backend(&key)
798        } else {
799            None
800        };
801        match &mut self.backend {
802            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
803            SchemaStoreBackend::Journaled {
804                live, tombstones, ..
805            } => {
806                tombstones.remove(&key);
807                live.insert(key, snapshot);
808                previous_journaled
809            }
810        }
811    }
812
813    /// Load one raw schema snapshot by key.
814    #[must_use]
815    #[cfg(test)]
816    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
817        match &self.backend {
818            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
819            SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
820        }
821    }
822
823    /// Return whether one schema snapshot key is present.
824    #[must_use]
825    #[cfg(test)]
826    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
827        match &self.backend {
828            SchemaStoreBackend::Heap(map) => map.contains_key(key),
829            SchemaStoreBackend::Journaled { .. } => {
830                self.get_raw_snapshot_for_backend(key).is_some()
831            }
832        }
833    }
834
835    /// Return the number of schema snapshot entries in this store.
836    #[must_use]
837    #[cfg(test)]
838    pub(in crate::db) fn len(&self) -> u64 {
839        match &self.backend {
840            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
841            SchemaStoreBackend::Journaled { .. } => {
842                let mut count = 0_u64;
843                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
844                    count = count.saturating_add(1);
845                    Ok(SchemaStoreVisit::Continue)
846                });
847                count
848            }
849        }
850    }
851
852    /// Return whether this schema store currently has no persisted snapshots.
853    #[must_use]
854    #[cfg(test)]
855    pub(in crate::db) fn is_empty(&self) -> bool {
856        match &self.backend {
857            SchemaStoreBackend::Heap(map) => map.is_empty(),
858            SchemaStoreBackend::Journaled { .. } => {
859                let mut empty = true;
860                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
861                    empty = false;
862                    Ok(SchemaStoreVisit::Stop)
863                });
864                empty
865            }
866        }
867    }
868
869    /// Clear all schema metadata entries from the store.
870    #[cfg(test)]
871    pub(in crate::db) fn clear(&mut self) {
872        match &mut self.backend {
873            SchemaStoreBackend::Heap(map) => map.clear(),
874            SchemaStoreBackend::Journaled {
875                canonical,
876                live,
877                tombstones,
878            } => {
879                live.clear();
880                tombstones.clear();
881                for entry in canonical.iter() {
882                    tombstones.insert(*entry.key());
883                }
884            }
885        }
886    }
887
888    fn latest_raw_snapshots_by_entity(
889        &self,
890    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
891        #[cfg(test)]
892        LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
893
894        let mut latest_by_entity =
895            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
896
897        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
898            let version = SchemaVersion::new(key.version());
899            match latest_by_entity.get_mut(&key.entity_tag()) {
900                Some((latest_version, latest_snapshot)) if version > *latest_version => {
901                    *latest_version = version;
902                    *latest_snapshot = snapshot.clone();
903                }
904                None => {
905                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
906                }
907                Some(_) => {}
908            }
909            Ok(SchemaStoreVisit::Continue)
910        });
911
912        latest_by_entity
913    }
914
915    /// Visit raw schema snapshots in canonical store order without exposing
916    /// the backing stable-map iterator.
917    fn visit_raw_snapshots<E>(
918        &self,
919        visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
920    ) -> Result<(), E> {
921        match &self.backend {
922            SchemaStoreBackend::Heap(map) => {
923                let mut visitor = visitor;
924                for (key, snapshot) in map {
925                    if visitor(key, snapshot)?.should_stop() {
926                        break;
927                    }
928                }
929            }
930            SchemaStoreBackend::Journaled {
931                canonical,
932                live,
933                tombstones,
934            } => Self::visit_journaled_raw_snapshot_range(
935                canonical,
936                live,
937                tombstones,
938                (RangeBound::Unbounded, RangeBound::Unbounded),
939                Direction::Asc,
940                visitor,
941            )?,
942        }
943
944        Ok(())
945    }
946
947    #[cfg(test)]
948    #[must_use]
949    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
950        match &self.backend {
951            SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
952            SchemaStoreBackend::Heap(_) => 0,
953        }
954    }
955
956    fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
957        let SchemaStoreBackend::Journaled {
958            canonical,
959            live,
960            tombstones,
961        } = &self.backend
962        else {
963            return None;
964        };
965
966        if tombstones.contains(key) {
967            return None;
968        }
969        live.get(key).cloned().or_else(|| canonical.get(key))
970    }
971
972    fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
973        self.latest_raw_snapshot_entry(entity)
974            .map(|(_, snapshot)| snapshot)
975    }
976
977    fn latest_raw_snapshot_entry(
978        &self,
979        entity: EntityTag,
980    ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
981        let bounds = RawSchemaKey::entity_range_bounds(entity);
982        match &self.backend {
983            SchemaStoreBackend::Heap(map) => map
984                .range((bounds.0, bounds.1))
985                .next_back()
986                .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
987            SchemaStoreBackend::Journaled {
988                canonical,
989                live,
990                tombstones,
991            } => {
992                let mut latest = None;
993                let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
994                    canonical,
995                    live,
996                    tombstones,
997                    bounds,
998                    Direction::Desc,
999                    |key, snapshot| {
1000                        latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1001                        Ok(SchemaStoreVisit::Stop)
1002                    },
1003                );
1004                latest
1005            }
1006        }
1007    }
1008
1009    fn visit_journaled_raw_snapshot_range<E>(
1010        canonical: &StableBTreeMap<
1011            RawSchemaKey,
1012            RawSchemaSnapshot,
1013            VirtualMemory<DefaultMemoryImpl>,
1014        >,
1015        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1016        tombstones: &BTreeSet<RawSchemaKey>,
1017        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1018        direction: Direction,
1019        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1020    ) -> Result<(), E> {
1021        match direction {
1022            Direction::Asc => visit_ordered_overlay(
1023                canonical.range((bounds.0, bounds.1)),
1024                live.range((bounds.0, bounds.1)),
1025                Direction::Asc,
1026                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1027                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1028                |live_entry| !tombstones.contains(live_entry.0),
1029                |entry| {
1030                    let visit = match entry {
1031                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1032                            visitor(canonical_entry.key(), &canonical_entry.value())?
1033                        }
1034                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1035                    };
1036                    Ok(if visit.should_stop() {
1037                        OrderedOverlayVisit::Stop
1038                    } else {
1039                        OrderedOverlayVisit::Continue
1040                    })
1041                },
1042            ),
1043            Direction::Desc => visit_ordered_overlay(
1044                canonical.range((bounds.0, bounds.1)).rev(),
1045                live.range((bounds.0, bounds.1)).rev(),
1046                Direction::Desc,
1047                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1048                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1049                |live_entry| !tombstones.contains(live_entry.0),
1050                |entry| {
1051                    let visit = match entry {
1052                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1053                            visitor(canonical_entry.key(), &canonical_entry.value())?
1054                        }
1055                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1056                    };
1057                    Ok(if visit.should_stop() {
1058                        OrderedOverlayVisit::Stop
1059                    } else {
1060                        OrderedOverlayVisit::Continue
1061                    })
1062                },
1063            ),
1064        }
1065    }
1066}
1067
1068fn derive_data_allocation_metadata(
1069    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1070) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1071    let mut max_version = SchemaVersion::initial();
1072    let mut hasher = new_hash_sha256();
1073    write_hash_tag_u8(
1074        &mut hasher,
1075        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1076    );
1077
1078    for (entity, (_, snapshot)) in latest_by_entity {
1079        let persisted = snapshot.decode_persisted_snapshot()?;
1080        if persisted.version() > max_version {
1081            max_version = persisted.version();
1082        }
1083
1084        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1085            persisted.version(),
1086            persisted.entity_path().to_string(),
1087            persisted.entity_name().to_string(),
1088            persisted.primary_key_field_ids().to_vec(),
1089            persisted.row_layout().clone(),
1090            persisted.fields().to_vec(),
1091            Vec::new(),
1092        );
1093        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1094
1095        write_hash_u64(&mut hasher, entity.value());
1096        write_hash_u32(&mut hasher, persisted.version().get());
1097        write_hash_len_u32(&mut hasher, encoded.len());
1098        hasher.update(encoded);
1099    }
1100
1101    Ok(finalize_schema_metadata(
1102        max_version,
1103        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1104        hasher,
1105        latest_by_entity.len(),
1106    ))
1107}
1108
1109fn derive_index_allocation_metadata(
1110    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1111) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1112    let mut max_version = SchemaVersion::initial();
1113    let mut hasher = new_hash_sha256();
1114    write_hash_tag_u8(
1115        &mut hasher,
1116        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1117    );
1118
1119    for (entity, (_, snapshot)) in latest_by_entity {
1120        let persisted = snapshot.decode_persisted_snapshot()?;
1121        if persisted.version() > max_version {
1122            max_version = persisted.version();
1123        }
1124
1125        write_hash_u64(&mut hasher, entity.value());
1126        write_hash_u32(&mut hasher, persisted.version().get());
1127        write_hash_len_u32(&mut hasher, persisted.indexes().len());
1128        for index in persisted.indexes() {
1129            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1130            write_hash_str_u32(&mut hasher, index.name());
1131            write_hash_str_u32(&mut hasher, index.store());
1132            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1133            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1134            match index.predicate_sql() {
1135                Some(predicate_sql) => {
1136                    write_hash_tag_u8(&mut hasher, 1);
1137                    write_hash_str_u32(&mut hasher, predicate_sql);
1138                }
1139                None => write_hash_tag_u8(&mut hasher, 0),
1140            }
1141            hash_persisted_index_key(&mut hasher, index.key());
1142        }
1143    }
1144
1145    Ok(finalize_schema_metadata(
1146        max_version,
1147        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1148        hasher,
1149        latest_by_entity.len(),
1150    ))
1151}
1152
1153fn derive_schema_catalog_metadata(
1154    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1155) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1156    let mut max_version = SchemaVersion::initial();
1157    let mut hasher = new_hash_sha256();
1158    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1159
1160    for (entity, (version, snapshot)) in latest_by_entity {
1161        let persisted = snapshot.decode_persisted_snapshot()?;
1162        if persisted.version() > max_version {
1163            max_version = persisted.version();
1164        }
1165
1166        write_hash_u64(&mut hasher, entity.value());
1167        write_hash_u32(&mut hasher, version.get());
1168        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1169        hasher.update(snapshot.as_bytes());
1170    }
1171
1172    Ok(finalize_schema_metadata(
1173        max_version,
1174        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1175        hasher,
1176        latest_by_entity.len(),
1177    ))
1178}
1179
1180fn finalize_schema_metadata(
1181    schema_version: SchemaVersion,
1182    schema_fingerprint_method_version: u8,
1183    hasher: sha2::Sha256,
1184    entity_count: usize,
1185) -> SchemaStoreCatalogMetadata {
1186    let digest = finalize_hash_sha256(hasher);
1187    let mut schema_fingerprint = [0u8; 16];
1188    schema_fingerprint.copy_from_slice(&digest[..16]);
1189
1190    SchemaStoreCatalogMetadata::new(
1191        schema_version,
1192        schema_fingerprint_method_version,
1193        schema_fingerprint,
1194        u64::try_from(entity_count).unwrap_or(u64::MAX),
1195    )
1196}
1197
1198fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1199    match key {
1200        PersistedIndexKeySnapshot::FieldPath(paths) => {
1201            write_hash_tag_u8(hasher, 1);
1202            write_hash_len_u32(hasher, paths.len());
1203            for path in paths {
1204                hash_persisted_index_field_path(hasher, path);
1205            }
1206        }
1207        PersistedIndexKeySnapshot::Items(items) => {
1208            write_hash_tag_u8(hasher, 2);
1209            write_hash_len_u32(hasher, items.len());
1210            for item in items {
1211                match item {
1212                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1213                        write_hash_tag_u8(hasher, 1);
1214                        hash_persisted_index_field_path(hasher, path);
1215                    }
1216                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
1217                        write_hash_tag_u8(hasher, 2);
1218                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1219                        hash_persisted_index_field_path(hasher, expression.source());
1220                        hash_persisted_field_kind(hasher, expression.input_kind());
1221                        hash_persisted_field_kind(hasher, expression.output_kind());
1222                        write_hash_str_u32(hasher, expression.canonical_text());
1223                    }
1224                }
1225            }
1226        }
1227    }
1228}
1229
1230fn hash_persisted_index_field_path(
1231    hasher: &mut sha2::Sha256,
1232    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1233) {
1234    write_hash_u32(hasher, path.field_id().get());
1235    write_hash_u32(hasher, u32::from(path.slot().get()));
1236    write_hash_len_u32(hasher, path.path().len());
1237    for segment in path.path() {
1238        write_hash_str_u32(hasher, segment);
1239    }
1240    hash_persisted_field_kind(hasher, path.kind());
1241    write_hash_tag_u8(hasher, u8::from(path.nullable()));
1242}
1243
1244fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1245    match kind {
1246        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1247        PersistedFieldKind::Blob { max_len } => {
1248            write_hash_tag_u8(hasher, 2);
1249            hash_optional_u32(hasher, *max_len);
1250        }
1251        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1252        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1253        PersistedFieldKind::Decimal { scale } => {
1254            write_hash_tag_u8(hasher, 5);
1255            write_hash_u32(hasher, *scale);
1256        }
1257        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1258        PersistedFieldKind::Enum { path, variants } => {
1259            write_hash_tag_u8(hasher, 7);
1260            write_hash_str_u32(hasher, path);
1261            write_hash_len_u32(hasher, variants.len());
1262            for variant in variants {
1263                write_hash_str_u32(hasher, variant.ident());
1264                match variant.payload_kind() {
1265                    Some(payload_kind) => {
1266                        write_hash_tag_u8(hasher, 1);
1267                        hash_persisted_field_kind(hasher, payload_kind);
1268                    }
1269                    None => write_hash_tag_u8(hasher, 0),
1270                }
1271                write_hash_str_u32(
1272                    hasher,
1273                    field_storage_decode_name(variant.payload_storage_decode()),
1274                );
1275            }
1276        }
1277        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1278        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1279        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1280        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1281        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1282        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1283        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1284        PersistedFieldKind::IntBig { max_bytes } => {
1285            write_hash_tag_u8(hasher, 15);
1286            write_hash_u32(hasher, *max_bytes);
1287        }
1288        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1289        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1290        PersistedFieldKind::Text { max_len } => {
1291            write_hash_tag_u8(hasher, 18);
1292            hash_optional_u32(hasher, *max_len);
1293        }
1294        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1295        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1296        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1297        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1298        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1299        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1300        PersistedFieldKind::NatBig { max_bytes } => {
1301            write_hash_tag_u8(hasher, 25);
1302            write_hash_u32(hasher, *max_bytes);
1303        }
1304        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1305        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1306        PersistedFieldKind::Relation {
1307            target_path,
1308            target_entity_name,
1309            target_entity_tag,
1310            target_store_path,
1311            key_kind,
1312            strength,
1313        } => {
1314            write_hash_tag_u8(hasher, 28);
1315            write_hash_str_u32(hasher, target_path);
1316            write_hash_str_u32(hasher, target_entity_name);
1317            write_hash_u64(hasher, target_entity_tag.value());
1318            write_hash_str_u32(hasher, target_store_path);
1319            hash_persisted_field_kind(hasher, key_kind);
1320            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1321        }
1322        PersistedFieldKind::List(inner) => {
1323            write_hash_tag_u8(hasher, 29);
1324            hash_persisted_field_kind(hasher, inner);
1325        }
1326        PersistedFieldKind::Set(inner) => {
1327            write_hash_tag_u8(hasher, 30);
1328            hash_persisted_field_kind(hasher, inner);
1329        }
1330        PersistedFieldKind::Map { key, value } => {
1331            write_hash_tag_u8(hasher, 31);
1332            hash_persisted_field_kind(hasher, key);
1333            hash_persisted_field_kind(hasher, value);
1334        }
1335        PersistedFieldKind::Structured { queryable } => {
1336            write_hash_tag_u8(hasher, 32);
1337            write_hash_tag_u8(hasher, u8::from(*queryable));
1338        }
1339    }
1340}
1341
1342fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1343    match value {
1344        Some(value) => {
1345            write_hash_tag_u8(hasher, 1);
1346            write_hash_u32(hasher, value);
1347        }
1348        None => write_hash_tag_u8(hasher, 0),
1349    }
1350}
1351
1352const fn persisted_index_origin_name(
1353    origin: crate::db::schema::PersistedIndexOrigin,
1354) -> &'static str {
1355    match origin {
1356        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1357        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1358    }
1359}
1360
1361const fn persisted_expression_op_name(
1362    op: crate::db::schema::PersistedIndexExpressionOp,
1363) -> &'static str {
1364    match op {
1365        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1366        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1367        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1368        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1369        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1370        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1371        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1372        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1373    }
1374}
1375
1376const fn persisted_relation_strength_name(
1377    strength: crate::db::schema::PersistedRelationStrength,
1378) -> &'static str {
1379    match strength {
1380        crate::db::schema::PersistedRelationStrength::Strong => "strong",
1381        crate::db::schema::PersistedRelationStrength::Weak => "weak",
1382    }
1383}
1384
1385const fn field_storage_decode_name(
1386    decode: crate::model::field::FieldStorageDecode,
1387) -> &'static str {
1388    match decode {
1389        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1390        crate::model::field::FieldStorageDecode::Value => "value",
1391    }
1392}
1393
1394///
1395/// TESTS
1396///
1397
1398#[cfg(test)]
1399mod tests;