Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        direction::Direction,
14        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15        schema::{
16            AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17            PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18            accepted_schema_cache_fingerprint,
19            accepted_schema_cache_fingerprint_for_persisted_snapshot,
20            accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22        },
23    },
24    error::InternalError,
25    traits::Storable,
26    types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
/// Byte width of one `RawSchemaKey`: an 8-byte big-endian entity tag followed
/// by a 4-byte big-endian schema version.
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
/// Same key width expressed as `u32` for the `Storable` size bound.
const SCHEMA_KEY_BYTES: u32 = 12;
/// Upper limit, in bytes, for one schema snapshot (512 KiB).
// NOTE(review): the enforcement site is not visible in this section — confirm
// where this limit is checked.
pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
// NOTE(review): the three fingerprint version constants below are presumably
// consumed by the catalog/data/index metadata derivation helpers defined
// elsewhere in this file — confirm against those helpers.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
/// Magic prefix written in front of fingerprint-bearing stored snapshot values.
const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
/// Value-format version byte written immediately after the magic prefix.
const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
/// Header length of a fingerprint-bearing stored value: magic, version byte,
/// then the fingerprint.
// NOTE(review): 25 = 8 + 1 + 16 assumes `CommitSchemaFingerprint` is 16 bytes
// wide — TODO confirm.
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49
// Test-only, per-thread counter of `latest_raw_snapshots_by_entity` calls.
// Tests reset and read it through the two helper functions below.
#[cfg(test)]
thread_local! {
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}
54
/// Zero the current thread's `latest_raw_snapshots_by_entity` call counter.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.set(0);
}
59
/// Read the current thread's `latest_raw_snapshots_by_entity` call counter.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.get()
}
64
65///
66/// RawSchemaKey
67///
68/// Stable key for one persisted schema snapshot entry.
69/// It combines the entity tag and schema version so reconciliation can load
70/// concrete versions without depending on generated entity names.
71///
72
73#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
74struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
75
76impl RawSchemaKey {
77    /// Build the raw persisted key for one entity schema version.
78    #[must_use]
79    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
80        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
81        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
82        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
83
84        Self(out)
85    }
86
87    /// Return the entity tag encoded in this schema key.
88    #[must_use]
89    fn entity_tag(self) -> EntityTag {
90        let mut bytes = [0u8; size_of::<u64>()];
91        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
92
93        EntityTag::new(u64::from_be_bytes(bytes))
94    }
95
96    /// Return the schema version encoded in this schema key.
97    #[must_use]
98    fn version(self) -> u32 {
99        let mut bytes = [0u8; size_of::<u32>()];
100        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
101
102        u32::from_be_bytes(bytes)
103    }
104
105    fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
106        (
107            RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
108            RangeBound::Included(Self::from_entity_version(
109                entity,
110                SchemaVersion::new(u32::MAX),
111            )),
112        )
113    }
114}
115
116impl Storable for RawSchemaKey {
117    fn to_bytes(&self) -> Cow<'_, [u8]> {
118        Cow::Borrowed(&self.0)
119    }
120
121    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
122        debug_assert_eq!(
123            bytes.len(),
124            SCHEMA_KEY_BYTES_USIZE,
125            "RawSchemaKey::from_bytes received unexpected byte length",
126        );
127
128        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
129            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
130        }
131
132        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
133        out.copy_from_slice(bytes.as_ref());
134        Self(out)
135    }
136
137    fn into_bytes(self) -> Vec<u8> {
138        self.0.to_vec()
139    }
140
141    const BOUND: StorableBound = StorableBound::Bounded {
142        max_size: SCHEMA_KEY_BYTES,
143        is_fixed_size: true,
144    };
145}
146
147///
148/// RawSchemaSnapshot
149///
150/// Raw persisted schema snapshot payload.
151/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
152/// keeping the stable-memory value boundary independent from the typed schema
153/// DTOs used by reconciliation.
154///
155
156#[derive(Clone, Debug, Eq, PartialEq)]
157struct RawSchemaSnapshot {
158    payload: Vec<u8>,
159    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
160}
161
162impl RawSchemaSnapshot {
163    /// Encode one typed persisted-schema snapshot into a raw store payload.
164    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
165        validate_typed_schema_snapshot_for_store(snapshot)?;
166
167        let accepted_schema_fingerprint =
168            accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
169        let payload = encode_persisted_schema_snapshot(snapshot)?;
170
171        Ok(Self {
172            payload,
173            accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
174        })
175    }
176
177    /// Build one raw schema snapshot from already-encoded bytes.
178    #[must_use]
179    #[cfg(test)]
180    const fn from_bytes(payload: Vec<u8>) -> Self {
181        Self {
182            payload,
183            accepted_schema_fingerprint: None,
184        }
185    }
186
187    /// Borrow the encoded schema snapshot payload.
188    #[must_use]
189    const fn as_bytes(&self) -> &[u8] {
190        self.payload.as_slice()
191    }
192
193    /// Consume the snapshot into its encoded payload bytes.
194    #[must_use]
195    fn into_bytes(self) -> Vec<u8> {
196        self.payload
197    }
198
199    /// Return the accepted schema identity fingerprint stored beside the raw
200    /// payload, without decoding the persisted snapshot.
201    fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
202        self.accepted_schema_fingerprint
203            .ok_or_else(InternalError::store_corruption)
204    }
205
206    /// Decode this raw store payload into a typed persisted-schema snapshot.
207    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
208        decode_persisted_schema_snapshot(self.as_bytes())
209    }
210}
211
212#[derive(Clone, Copy, Debug, Eq, PartialEq)]
213pub(in crate::db) struct AcceptedCatalogIdentity {
214    entity_tag: EntityTag,
215    entity_path: &'static str,
216    store_path: &'static str,
217    accepted_schema_version: SchemaVersion,
218    fingerprint_method_version: u8,
219    accepted_schema_fingerprint: CommitSchemaFingerprint,
220}
221
222impl AcceptedCatalogIdentity {
223    #[must_use]
224    pub(in crate::db) const fn new(
225        entity_tag: EntityTag,
226        entity_path: &'static str,
227        store_path: &'static str,
228        accepted_schema_version: SchemaVersion,
229        accepted_schema_fingerprint: CommitSchemaFingerprint,
230    ) -> Self {
231        Self {
232            entity_tag,
233            entity_path,
234            store_path,
235            accepted_schema_version,
236            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
237            accepted_schema_fingerprint,
238        }
239    }
240
241    #[must_use]
242    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
243        self.entity_tag
244    }
245
246    #[must_use]
247    pub(in crate::db) const fn entity_path(self) -> &'static str {
248        self.entity_path
249    }
250
251    #[must_use]
252    pub(in crate::db) const fn store_path(self) -> &'static str {
253        self.store_path
254    }
255
256    #[must_use]
257    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
258        self.accepted_schema_version
259    }
260
261    #[must_use]
262    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
263        self.fingerprint_method_version
264    }
265
266    #[must_use]
267    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
268        self.accepted_schema_fingerprint
269    }
270}
271
272#[derive(Clone, Debug, Eq, PartialEq)]
273pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
274    identity: AcceptedCatalogIdentity,
275    raw_snapshot: Vec<u8>,
276}
277
278impl AcceptedCatalogSnapshotSelection {
279    #[must_use]
280    const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
281        Self {
282            identity,
283            raw_snapshot,
284        }
285    }
286
287    #[must_use]
288    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
289        self.identity
290    }
291
292    pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
293        let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
294        let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
295        let identity = self.identity();
296
297        if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
298            return Err(InternalError::store_invariant());
299        }
300        if accepted.entity_path() != identity.entity_path() {
301            return Err(InternalError::store_invariant());
302        }
303
304        let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
305        if decoded_fingerprint != identity.accepted_schema_fingerprint() {
306            return Err(InternalError::store_invariant());
307        }
308
309        Ok(accepted)
310    }
311}
312
313impl Storable for RawSchemaSnapshot {
314    fn to_bytes(&self) -> Cow<'_, [u8]> {
315        let Some(fingerprint) = self.accepted_schema_fingerprint else {
316            return Cow::Borrowed(self.as_bytes());
317        };
318
319        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
320        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
321        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
322        bytes.extend_from_slice(&fingerprint);
323        bytes.extend_from_slice(self.as_bytes());
324
325        Cow::Owned(bytes)
326    }
327
328    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
329        let bytes = bytes.into_owned();
330        if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
331            && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
332            && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
333        {
334            let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
335            let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
336            let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
337            fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
338
339            return Self {
340                payload: bytes[fingerprint_end..].to_vec(),
341                accepted_schema_fingerprint: Some(fingerprint),
342            };
343        }
344
345        Self {
346            payload: bytes,
347            accepted_schema_fingerprint: None,
348        }
349    }
350
351    fn into_bytes(self) -> Vec<u8> {
352        let Some(fingerprint) = self.accepted_schema_fingerprint else {
353            return self.payload;
354        };
355
356        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
357        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
358        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
359        bytes.extend_from_slice(&fingerprint);
360        bytes.extend_from_slice(&self.payload);
361
362        bytes
363    }
364
365    const BOUND: StorableBound = StorableBound::Unbounded;
366}
367
368// Validate typed schema snapshots before they are encoded into the raw schema
369// metadata store. This catches caller-side invariant violations separately from
370// raw persisted-byte corruption handled by the codec decode boundary.
371fn validate_typed_schema_snapshot_for_store(
372    snapshot: &PersistedSchemaSnapshot,
373) -> Result<(), InternalError> {
374    if schema_snapshot_integrity_detail(
375        "schema snapshot",
376        snapshot.version(),
377        snapshot.primary_key_field_ids(),
378        snapshot.row_layout(),
379        snapshot.fields(),
380    )
381    .is_some()
382    {
383        return Err(InternalError::store_invariant());
384    }
385
386    Ok(())
387}
388
///
/// SchemaStoreFootprint
///
/// Current raw schema metadata footprint for one entity. Reconciliation uses
/// this value to report stable-memory pressure without decoding schema payloads
/// or exposing field-level metadata through metrics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    // Number of raw schema snapshots stored for the entity.
    snapshots: u64,
    // Total encoded payload bytes across those snapshots.
    encoded_bytes: u64,
    // Encoded payload bytes of the highest-version snapshot.
    latest_snapshot_bytes: u64,
}
403
404///
405/// SchemaStoreCatalogMetadata
406///
407/// Accepted schema-store catalog metadata derived from latest persisted
408/// snapshots. This is diagnostic allocation metadata, not allocation identity.
409///
410
411#[derive(Clone, Copy, Debug, Eq, PartialEq)]
412pub(in crate::db) struct SchemaStoreCatalogMetadata {
413    schema_version: SchemaVersion,
414    schema_fingerprint_method_version: u8,
415    schema_fingerprint: CommitSchemaFingerprint,
416    entity_count: u64,
417}
418
419impl SchemaStoreCatalogMetadata {
420    /// Build catalog metadata from already-derived accepted schema facts.
421    #[must_use]
422    const fn new(
423        schema_version: SchemaVersion,
424        schema_fingerprint_method_version: u8,
425        schema_fingerprint: CommitSchemaFingerprint,
426        entity_count: u64,
427    ) -> Self {
428        Self {
429            schema_version,
430            schema_fingerprint_method_version,
431            schema_fingerprint,
432            entity_count,
433        }
434    }
435
436    /// Return the maximum latest schema version represented in the catalog.
437    #[must_use]
438    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
439        self.schema_version
440    }
441
442    /// Return the fingerprint method version for this diagnostic metadata row.
443    #[must_use]
444    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
445        self.schema_fingerprint_method_version
446    }
447
448    /// Return the deterministic catalog fingerprint for latest accepted
449    /// snapshots.
450    #[must_use]
451    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
452        self.schema_fingerprint
453    }
454
455    /// Return number of entity schemas represented in this catalog metadata.
456    #[must_use]
457    pub(in crate::db) const fn entity_count(self) -> u64 {
458        self.entity_count
459    }
460}
461
462///
463/// SchemaStoreAllocationMetadata
464///
465/// Role-specific allocation metadata derived from latest accepted schema-store
466/// snapshots. These fingerprints describe the accepted contract that owns each
467/// allocation role; they are diagnostics, not allocation identity.
468///
469
470#[derive(Clone, Copy, Debug, Eq, PartialEq)]
471pub(in crate::db) struct SchemaStoreAllocationMetadata {
472    data: SchemaStoreCatalogMetadata,
473    index: SchemaStoreCatalogMetadata,
474    schema: SchemaStoreCatalogMetadata,
475}
476
477impl SchemaStoreAllocationMetadata {
478    /// Build one role-specific metadata set from already-derived accepted
479    /// schema facts.
480    #[must_use]
481    const fn new(
482        data: SchemaStoreCatalogMetadata,
483        index: SchemaStoreCatalogMetadata,
484        schema: SchemaStoreCatalogMetadata,
485    ) -> Self {
486        Self {
487            data,
488            index,
489            schema,
490        }
491    }
492
493    /// Return accepted row-layout allocation metadata for data memory.
494    #[must_use]
495    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
496        self.data
497    }
498
499    /// Return accepted index-catalog allocation metadata for index memory.
500    #[must_use]
501    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
502        self.index
503    }
504
505    /// Return accepted full schema-catalog allocation metadata for schema
506    /// memory.
507    #[must_use]
508    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
509        self.schema
510    }
511}
512
513impl SchemaStoreFootprint {
514    /// Build one schema-store footprint from already-counted raw payload facts.
515    #[must_use]
516    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
517        Self {
518            snapshots,
519            encoded_bytes,
520            latest_snapshot_bytes,
521        }
522    }
523
524    /// Return the number of raw schema snapshots stored for the entity.
525    #[must_use]
526    pub(in crate::db) const fn snapshots(self) -> u64 {
527        self.snapshots
528    }
529
530    /// Return the total encoded payload bytes stored for the entity.
531    #[must_use]
532    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
533        self.encoded_bytes
534    }
535
536    /// Return the encoded payload bytes for the highest-version snapshot.
537    #[must_use]
538    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
539        self.latest_snapshot_bytes
540    }
541}
542
///
/// SchemaStore
///
/// Thin persistence wrapper over one journaled or heap schema metadata BTreeMap.
/// Startup reconciliation writes and validates encoded schema snapshots here
/// before row/index operations proceed.
///

pub struct SchemaStore {
    // Storage strategy chosen at construction: volatile heap map or journaled
    // stable map with an in-memory overlay.
    backend: SchemaStoreBackend,
}
554
// Backing storage for a `SchemaStore`.
enum SchemaStoreBackend {
    // Volatile in-memory map; nothing is written to stable memory.
    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
    // Stable-memory base plus an in-memory overlay.
    Journaled {
        // Stable BTreeMap written by the journal fold path.
        canonical:
            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
        // In-memory projection that receives normal snapshot inserts.
        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
        // Keys recorded as removed; the test-only `clear` adds every canonical
        // key here instead of mutating `canonical`.
        // NOTE(review): presumably these keys are hidden from reads — confirm
        // against the overlay read path, which is outside this section.
        tombstones: BTreeSet<RawSchemaKey>,
    },
}
564
/// Signal a traversal visitor returns to keep iterating or to halt early.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    Continue,
    Stop,
}

impl SchemaStoreVisit {
    /// Return `true` only for the `Stop` signal.
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
577
578impl SchemaStore {
579    /// Initialize a volatile heap-backed schema store.
580    #[must_use]
581    pub const fn init_heap() -> Self {
582        Self {
583            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
584        }
585    }
586
587    /// Initialize a journaled cached-stable schema store.
588    ///
589    /// Normal schema publication writes only the live projection. Canonical
590    /// stable schema history is updated by future journal fold/recovery paths.
591    #[must_use]
592    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
593        Self {
594            backend: SchemaStoreBackend::Journaled {
595                canonical: StableBTreeMap::init(memory),
596                live: StdBTreeMap::new(),
597                tombstones: BTreeSet::new(),
598            },
599        }
600    }
601
602    /// Insert or replace one typed persisted schema snapshot.
603    pub(in crate::db) fn insert_persisted_snapshot(
604        &mut self,
605        entity: EntityTag,
606        snapshot: &PersistedSchemaSnapshot,
607    ) -> Result<(), InternalError> {
608        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
609        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
610        let _ = self.insert_raw_snapshot(key, raw_snapshot);
611
612        Ok(())
613    }
614
615    /// Insert one typed persisted schema snapshot only if the current live
616    /// accepted catalog identity still matches the identity captured before
617    /// schema mutation planning.
618    pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
619        &mut self,
620        expected: AcceptedCatalogIdentity,
621        snapshot: &PersistedSchemaSnapshot,
622    ) -> Result<(), InternalError> {
623        let live = self.latest_catalog_identity(
624            expected.entity_tag(),
625            expected.entity_path(),
626            expected.store_path(),
627        )?;
628        if live
629            .as_ref()
630            .map(AcceptedCatalogSnapshotSelection::identity)
631            != Some(expected)
632        {
633            return Err(InternalError::schema_ddl_publication_race_lost(
634                expected.entity_path(),
635            ));
636        }
637
638        self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
639    }
640
641    /// Reset the volatile projection for journaled recovery without mutating
642    /// the canonical stable schema base.
643    pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
644        let SchemaStoreBackend::Journaled {
645            live, tombstones, ..
646        } = &mut self.backend
647        else {
648            return Err(InternalError::store_invariant());
649        };
650
651        live.clear();
652        tombstones.clear();
653
654        Ok(())
655    }
656
657    /// Apply one folded journal schema snapshot into the canonical stable base.
658    pub(in crate::db) fn fold_persisted_snapshot(
659        &mut self,
660        entity: EntityTag,
661        snapshot: &PersistedSchemaSnapshot,
662    ) -> Result<(), InternalError> {
663        let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
664            return Err(InternalError::store_invariant());
665        };
666
667        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
668        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
669        canonical.insert(key, raw_snapshot);
670
671        Ok(())
672    }
673
674    /// Load and decode one typed persisted schema snapshot.
675    #[cfg(test)]
676    pub(in crate::db) fn get_persisted_snapshot(
677        &self,
678        entity: EntityTag,
679        version: SchemaVersion,
680    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
681        let key = RawSchemaKey::from_entity_version(entity, version);
682        self.get_raw_snapshot(&key)
683            .map(|snapshot| snapshot.decode_persisted_snapshot())
684            .transpose()
685    }
686
687    /// Load and decode the highest stored schema snapshot version for one entity.
688    pub(in crate::db) fn latest_persisted_snapshot(
689        &self,
690        entity: EntityTag,
691    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
692        self.latest_raw_snapshot(entity)
693            .map(|snapshot| snapshot.decode_persisted_snapshot())
694            .transpose()
695    }
696
697    /// Return the latest accepted catalog identity for one entity without
698    /// decoding the selected schema snapshot.
699    pub(in crate::db) fn latest_catalog_identity(
700        &self,
701        entity: EntityTag,
702        entity_path: &'static str,
703        store_path: &'static str,
704    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
705        let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
706            return Ok(None);
707        };
708        let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
709        let identity =
710            AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
711
712        Ok(Some(AcceptedCatalogSnapshotSelection::new(
713            identity,
714            raw_snapshot.into_bytes(),
715        )))
716    }
717
718    /// Return raw schema-store footprint facts for one entity.
719    #[must_use]
720    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
721        let mut snapshots = 0u64;
722        let mut encoded_bytes = 0u64;
723        let mut latest = None::<(SchemaVersion, u64)>;
724
725        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
726            if key.entity_tag() != entity {
727                return Ok(SchemaStoreVisit::Continue);
728            }
729
730            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
731            snapshots = snapshots.saturating_add(1);
732            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
733
734            let version = SchemaVersion::new(key.version());
735            if latest
736                .as_ref()
737                .is_none_or(|(latest_version, _)| version > *latest_version)
738            {
739                latest = Some((version, snapshot_bytes));
740            }
741            Ok(SchemaStoreVisit::Continue)
742        });
743
744        SchemaStoreFootprint::new(
745            snapshots,
746            encoded_bytes,
747            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
748        )
749    }
750
751    /// Derive accepted catalog metadata from latest persisted schema snapshots.
752    ///
753    /// This function intentionally reads only the persisted schema store. It
754    /// does not reconstruct metadata from generated models when the store has
755    /// no accepted snapshots.
756    #[cfg(test)]
757    pub(in crate::db) fn catalog_metadata(
758        &self,
759    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
760        Ok(self
761            .allocation_metadata()?
762            .map(SchemaStoreAllocationMetadata::schema))
763    }
764
765    /// Derive role-specific allocation metadata from latest persisted schema
766    /// snapshots.
767    ///
768    /// This function intentionally reads only accepted schema-store payloads.
769    /// It never reconstructs metadata from generated models when the store has
770    /// no accepted snapshots.
771    pub(in crate::db) fn allocation_metadata(
772        &self,
773    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
774        let latest_by_entity = self.latest_raw_snapshots_by_entity();
775        if latest_by_entity.is_empty() {
776            return Ok(None);
777        }
778
779        Ok(Some(SchemaStoreAllocationMetadata::new(
780            derive_data_allocation_metadata(&latest_by_entity)?,
781            derive_index_allocation_metadata(&latest_by_entity)?,
782            derive_schema_catalog_metadata(&latest_by_entity)?,
783        )))
784    }
785
786    /// Insert or replace one raw schema snapshot.
787    fn insert_raw_snapshot(
788        &mut self,
789        key: RawSchemaKey,
790        snapshot: RawSchemaSnapshot,
791    ) -> Option<RawSchemaSnapshot> {
792        let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
793            self.get_raw_snapshot_for_backend(&key)
794        } else {
795            None
796        };
797        match &mut self.backend {
798            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
799            SchemaStoreBackend::Journaled {
800                live, tombstones, ..
801            } => {
802                tombstones.remove(&key);
803                live.insert(key, snapshot);
804                previous_journaled
805            }
806        }
807    }
808
809    /// Load one raw schema snapshot by key.
810    #[must_use]
811    #[cfg(test)]
812    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
813        match &self.backend {
814            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
815            SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
816        }
817    }
818
819    /// Return whether one schema snapshot key is present.
820    #[must_use]
821    #[cfg(test)]
822    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
823        match &self.backend {
824            SchemaStoreBackend::Heap(map) => map.contains_key(key),
825            SchemaStoreBackend::Journaled { .. } => {
826                self.get_raw_snapshot_for_backend(key).is_some()
827            }
828        }
829    }
830
831    /// Return the number of schema snapshot entries in this store.
832    #[must_use]
833    #[cfg(test)]
834    pub(in crate::db) fn len(&self) -> u64 {
835        match &self.backend {
836            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
837            SchemaStoreBackend::Journaled { .. } => {
838                let mut count = 0_u64;
839                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
840                    count = count.saturating_add(1);
841                    Ok(SchemaStoreVisit::Continue)
842                });
843                count
844            }
845        }
846    }
847
848    /// Return whether this schema store currently has no persisted snapshots.
849    #[must_use]
850    #[cfg(test)]
851    pub(in crate::db) fn is_empty(&self) -> bool {
852        match &self.backend {
853            SchemaStoreBackend::Heap(map) => map.is_empty(),
854            SchemaStoreBackend::Journaled { .. } => {
855                let mut empty = true;
856                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
857                    empty = false;
858                    Ok(SchemaStoreVisit::Stop)
859                });
860                empty
861            }
862        }
863    }
864
865    /// Clear all schema metadata entries from the store.
866    #[cfg(test)]
867    pub(in crate::db) fn clear(&mut self) {
868        match &mut self.backend {
869            SchemaStoreBackend::Heap(map) => map.clear(),
870            SchemaStoreBackend::Journaled {
871                canonical,
872                live,
873                tombstones,
874            } => {
875                live.clear();
876                tombstones.clear();
877                for entry in canonical.iter() {
878                    tombstones.insert(*entry.key());
879                }
880            }
881        }
882    }
883
884    fn latest_raw_snapshots_by_entity(
885        &self,
886    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
887        #[cfg(test)]
888        LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
889
890        let mut latest_by_entity =
891            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
892
893        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
894            let version = SchemaVersion::new(key.version());
895            match latest_by_entity.get_mut(&key.entity_tag()) {
896                Some((latest_version, latest_snapshot)) if version > *latest_version => {
897                    *latest_version = version;
898                    *latest_snapshot = snapshot.clone();
899                }
900                None => {
901                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
902                }
903                Some(_) => {}
904            }
905            Ok(SchemaStoreVisit::Continue)
906        });
907
908        latest_by_entity
909    }
910
911    /// Visit raw schema snapshots in canonical store order without exposing
912    /// the backing stable-map iterator.
913    fn visit_raw_snapshots<E>(
914        &self,
915        visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
916    ) -> Result<(), E> {
917        match &self.backend {
918            SchemaStoreBackend::Heap(map) => {
919                let mut visitor = visitor;
920                for (key, snapshot) in map {
921                    if visitor(key, snapshot)?.should_stop() {
922                        break;
923                    }
924                }
925            }
926            SchemaStoreBackend::Journaled {
927                canonical,
928                live,
929                tombstones,
930            } => Self::visit_journaled_raw_snapshot_range(
931                canonical,
932                live,
933                tombstones,
934                (RangeBound::Unbounded, RangeBound::Unbounded),
935                Direction::Asc,
936                visitor,
937            )?,
938        }
939
940        Ok(())
941    }
942
943    #[cfg(test)]
944    #[must_use]
945    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
946        match &self.backend {
947            SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
948            SchemaStoreBackend::Heap(_) => 0,
949        }
950    }
951
952    fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
953        let SchemaStoreBackend::Journaled {
954            canonical,
955            live,
956            tombstones,
957        } = &self.backend
958        else {
959            return None;
960        };
961
962        if tombstones.contains(key) {
963            return None;
964        }
965        live.get(key).cloned().or_else(|| canonical.get(key))
966    }
967
968    fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
969        self.latest_raw_snapshot_entry(entity)
970            .map(|(_, snapshot)| snapshot)
971    }
972
973    fn latest_raw_snapshot_entry(
974        &self,
975        entity: EntityTag,
976    ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
977        let bounds = RawSchemaKey::entity_range_bounds(entity);
978        match &self.backend {
979            SchemaStoreBackend::Heap(map) => map
980                .range((bounds.0, bounds.1))
981                .next_back()
982                .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
983            SchemaStoreBackend::Journaled {
984                canonical,
985                live,
986                tombstones,
987            } => {
988                let mut latest = None;
989                let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
990                    canonical,
991                    live,
992                    tombstones,
993                    bounds,
994                    Direction::Desc,
995                    |key, snapshot| {
996                        latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
997                        Ok(SchemaStoreVisit::Stop)
998                    },
999                );
1000                latest
1001            }
1002        }
1003    }
1004
1005    fn visit_journaled_raw_snapshot_range<E>(
1006        canonical: &StableBTreeMap<
1007            RawSchemaKey,
1008            RawSchemaSnapshot,
1009            VirtualMemory<DefaultMemoryImpl>,
1010        >,
1011        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1012        tombstones: &BTreeSet<RawSchemaKey>,
1013        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1014        direction: Direction,
1015        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1016    ) -> Result<(), E> {
1017        match direction {
1018            Direction::Asc => visit_ordered_overlay(
1019                canonical.range((bounds.0, bounds.1)),
1020                live.range((bounds.0, bounds.1)),
1021                Direction::Asc,
1022                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1023                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1024                |live_entry| !tombstones.contains(live_entry.0),
1025                |entry| {
1026                    let visit = match entry {
1027                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1028                            visitor(canonical_entry.key(), &canonical_entry.value())?
1029                        }
1030                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1031                    };
1032                    Ok(if visit.should_stop() {
1033                        OrderedOverlayVisit::Stop
1034                    } else {
1035                        OrderedOverlayVisit::Continue
1036                    })
1037                },
1038            ),
1039            Direction::Desc => visit_ordered_overlay(
1040                canonical.range((bounds.0, bounds.1)).rev(),
1041                live.range((bounds.0, bounds.1)).rev(),
1042                Direction::Desc,
1043                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1044                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1045                |live_entry| !tombstones.contains(live_entry.0),
1046                |entry| {
1047                    let visit = match entry {
1048                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1049                            visitor(canonical_entry.key(), &canonical_entry.value())?
1050                        }
1051                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1052                    };
1053                    Ok(if visit.should_stop() {
1054                        OrderedOverlayVisit::Stop
1055                    } else {
1056                        OrderedOverlayVisit::Continue
1057                    })
1058                },
1059            ),
1060        }
1061    }
1062}
1063
1064fn derive_data_allocation_metadata(
1065    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1066) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1067    let mut max_version = SchemaVersion::initial();
1068    let mut hasher = new_hash_sha256();
1069    write_hash_tag_u8(
1070        &mut hasher,
1071        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1072    );
1073
1074    for (entity, (_, snapshot)) in latest_by_entity {
1075        let persisted = snapshot.decode_persisted_snapshot()?;
1076        if persisted.version() > max_version {
1077            max_version = persisted.version();
1078        }
1079
1080        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1081            persisted.version(),
1082            persisted.entity_path().to_string(),
1083            persisted.entity_name().to_string(),
1084            persisted.primary_key_field_ids().to_vec(),
1085            persisted.row_layout().clone(),
1086            persisted.fields().to_vec(),
1087            Vec::new(),
1088        );
1089        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1090
1091        write_hash_u64(&mut hasher, entity.value());
1092        write_hash_u32(&mut hasher, persisted.version().get());
1093        write_hash_len_u32(&mut hasher, encoded.len());
1094        hasher.update(encoded);
1095    }
1096
1097    Ok(finalize_schema_metadata(
1098        max_version,
1099        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1100        hasher,
1101        latest_by_entity.len(),
1102    ))
1103}
1104
1105fn derive_index_allocation_metadata(
1106    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1107) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1108    let mut max_version = SchemaVersion::initial();
1109    let mut hasher = new_hash_sha256();
1110    write_hash_tag_u8(
1111        &mut hasher,
1112        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1113    );
1114
1115    for (entity, (_, snapshot)) in latest_by_entity {
1116        let persisted = snapshot.decode_persisted_snapshot()?;
1117        if persisted.version() > max_version {
1118            max_version = persisted.version();
1119        }
1120
1121        write_hash_u64(&mut hasher, entity.value());
1122        write_hash_u32(&mut hasher, persisted.version().get());
1123        write_hash_len_u32(&mut hasher, persisted.indexes().len());
1124        for index in persisted.indexes() {
1125            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1126            write_hash_str_u32(&mut hasher, index.name());
1127            write_hash_str_u32(&mut hasher, index.store());
1128            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1129            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1130            match index.predicate_sql() {
1131                Some(predicate_sql) => {
1132                    write_hash_tag_u8(&mut hasher, 1);
1133                    write_hash_str_u32(&mut hasher, predicate_sql);
1134                }
1135                None => write_hash_tag_u8(&mut hasher, 0),
1136            }
1137            hash_persisted_index_key(&mut hasher, index.key());
1138        }
1139    }
1140
1141    Ok(finalize_schema_metadata(
1142        max_version,
1143        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1144        hasher,
1145        latest_by_entity.len(),
1146    ))
1147}
1148
1149fn derive_schema_catalog_metadata(
1150    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1151) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1152    let mut max_version = SchemaVersion::initial();
1153    let mut hasher = new_hash_sha256();
1154    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1155
1156    for (entity, (version, snapshot)) in latest_by_entity {
1157        let persisted = snapshot.decode_persisted_snapshot()?;
1158        if persisted.version() > max_version {
1159            max_version = persisted.version();
1160        }
1161
1162        write_hash_u64(&mut hasher, entity.value());
1163        write_hash_u32(&mut hasher, version.get());
1164        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1165        hasher.update(snapshot.as_bytes());
1166    }
1167
1168    Ok(finalize_schema_metadata(
1169        max_version,
1170        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1171        hasher,
1172        latest_by_entity.len(),
1173    ))
1174}
1175
1176fn finalize_schema_metadata(
1177    schema_version: SchemaVersion,
1178    schema_fingerprint_method_version: u8,
1179    hasher: sha2::Sha256,
1180    entity_count: usize,
1181) -> SchemaStoreCatalogMetadata {
1182    let digest = finalize_hash_sha256(hasher);
1183    let mut schema_fingerprint = [0u8; 16];
1184    schema_fingerprint.copy_from_slice(&digest[..16]);
1185
1186    SchemaStoreCatalogMetadata::new(
1187        schema_version,
1188        schema_fingerprint_method_version,
1189        schema_fingerprint,
1190        u64::try_from(entity_count).unwrap_or(u64::MAX),
1191    )
1192}
1193
1194fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1195    match key {
1196        PersistedIndexKeySnapshot::FieldPath(paths) => {
1197            write_hash_tag_u8(hasher, 1);
1198            write_hash_len_u32(hasher, paths.len());
1199            for path in paths {
1200                hash_persisted_index_field_path(hasher, path);
1201            }
1202        }
1203        PersistedIndexKeySnapshot::Items(items) => {
1204            write_hash_tag_u8(hasher, 2);
1205            write_hash_len_u32(hasher, items.len());
1206            for item in items {
1207                match item {
1208                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1209                        write_hash_tag_u8(hasher, 1);
1210                        hash_persisted_index_field_path(hasher, path);
1211                    }
1212                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
1213                        write_hash_tag_u8(hasher, 2);
1214                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1215                        hash_persisted_index_field_path(hasher, expression.source());
1216                        hash_persisted_field_kind(hasher, expression.input_kind());
1217                        hash_persisted_field_kind(hasher, expression.output_kind());
1218                        write_hash_str_u32(hasher, expression.canonical_text());
1219                    }
1220                }
1221            }
1222        }
1223    }
1224}
1225
1226fn hash_persisted_index_field_path(
1227    hasher: &mut sha2::Sha256,
1228    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1229) {
1230    write_hash_u32(hasher, path.field_id().get());
1231    write_hash_u32(hasher, u32::from(path.slot().get()));
1232    write_hash_len_u32(hasher, path.path().len());
1233    for segment in path.path() {
1234        write_hash_str_u32(hasher, segment);
1235    }
1236    hash_persisted_field_kind(hasher, path.kind());
1237    write_hash_tag_u8(hasher, u8::from(path.nullable()));
1238}
1239
1240fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1241    match kind {
1242        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1243        PersistedFieldKind::Blob { max_len } => {
1244            write_hash_tag_u8(hasher, 2);
1245            hash_optional_u32(hasher, *max_len);
1246        }
1247        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1248        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1249        PersistedFieldKind::Decimal { scale } => {
1250            write_hash_tag_u8(hasher, 5);
1251            write_hash_u32(hasher, *scale);
1252        }
1253        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1254        PersistedFieldKind::Enum { path, variants } => {
1255            write_hash_tag_u8(hasher, 7);
1256            write_hash_str_u32(hasher, path);
1257            write_hash_len_u32(hasher, variants.len());
1258            for variant in variants {
1259                write_hash_str_u32(hasher, variant.ident());
1260                match variant.payload_kind() {
1261                    Some(payload_kind) => {
1262                        write_hash_tag_u8(hasher, 1);
1263                        hash_persisted_field_kind(hasher, payload_kind);
1264                    }
1265                    None => write_hash_tag_u8(hasher, 0),
1266                }
1267                write_hash_str_u32(
1268                    hasher,
1269                    field_storage_decode_name(variant.payload_storage_decode()),
1270                );
1271            }
1272        }
1273        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1274        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1275        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1276        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1277        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1278        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1279        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1280        PersistedFieldKind::IntBig { max_bytes } => {
1281            write_hash_tag_u8(hasher, 15);
1282            write_hash_u32(hasher, *max_bytes);
1283        }
1284        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1285        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1286        PersistedFieldKind::Text { max_len } => {
1287            write_hash_tag_u8(hasher, 18);
1288            hash_optional_u32(hasher, *max_len);
1289        }
1290        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1291        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1292        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1293        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1294        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1295        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1296        PersistedFieldKind::NatBig { max_bytes } => {
1297            write_hash_tag_u8(hasher, 25);
1298            write_hash_u32(hasher, *max_bytes);
1299        }
1300        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1301        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1302        PersistedFieldKind::Relation {
1303            target_path,
1304            target_entity_name,
1305            target_entity_tag,
1306            target_store_path,
1307            key_kind,
1308            strength,
1309        } => {
1310            write_hash_tag_u8(hasher, 28);
1311            write_hash_str_u32(hasher, target_path);
1312            write_hash_str_u32(hasher, target_entity_name);
1313            write_hash_u64(hasher, target_entity_tag.value());
1314            write_hash_str_u32(hasher, target_store_path);
1315            hash_persisted_field_kind(hasher, key_kind);
1316            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1317        }
1318        PersistedFieldKind::List(inner) => {
1319            write_hash_tag_u8(hasher, 29);
1320            hash_persisted_field_kind(hasher, inner);
1321        }
1322        PersistedFieldKind::Set(inner) => {
1323            write_hash_tag_u8(hasher, 30);
1324            hash_persisted_field_kind(hasher, inner);
1325        }
1326        PersistedFieldKind::Map { key, value } => {
1327            write_hash_tag_u8(hasher, 31);
1328            hash_persisted_field_kind(hasher, key);
1329            hash_persisted_field_kind(hasher, value);
1330        }
1331        PersistedFieldKind::Structured { queryable } => {
1332            write_hash_tag_u8(hasher, 32);
1333            write_hash_tag_u8(hasher, u8::from(*queryable));
1334        }
1335    }
1336}
1337
1338fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1339    match value {
1340        Some(value) => {
1341            write_hash_tag_u8(hasher, 1);
1342            write_hash_u32(hasher, value);
1343        }
1344        None => write_hash_tag_u8(hasher, 0),
1345    }
1346}
1347
1348const fn persisted_index_origin_name(
1349    origin: crate::db::schema::PersistedIndexOrigin,
1350) -> &'static str {
1351    match origin {
1352        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1353        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1354    }
1355}
1356
1357const fn persisted_expression_op_name(
1358    op: crate::db::schema::PersistedIndexExpressionOp,
1359) -> &'static str {
1360    match op {
1361        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1362        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1363        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1364        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1365        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1366        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1367        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1368        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1369    }
1370}
1371
1372const fn persisted_relation_strength_name(
1373    strength: crate::db::schema::PersistedRelationStrength,
1374) -> &'static str {
1375    match strength {
1376        crate::db::schema::PersistedRelationStrength::Strong => "strong",
1377        crate::db::schema::PersistedRelationStrength::Weak => "weak",
1378    }
1379}
1380
1381const fn field_storage_decode_name(
1382    decode: crate::model::field::FieldStorageDecode,
1383) -> &'static str {
1384    match decode {
1385        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1386        crate::model::field::FieldStorageDecode::Value => "value",
1387    }
1388}
1389
1390///
1391/// TESTS
1392///
1393
1394#[cfg(test)]
1395mod tests;