Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        direction::Direction,
14        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15        schema::{
16            AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17            PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18            accepted_schema_cache_fingerprint,
19            accepted_schema_cache_fingerprint_for_persisted_snapshot,
20            accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22        },
23    },
24    error::InternalError,
25    traits::Storable,
26    types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
40const SCHEMA_KEY_BYTES_USIZE: usize = 12;
41const SCHEMA_KEY_BYTES: u32 = 12;
42pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
43const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
44const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
45const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
46const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
47const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
48const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32: u32 = 25;
50
51#[cfg(test)]
52thread_local! {
53    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
54}
55
56#[cfg(test)]
57pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
58    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(0));
59}
60
61#[cfg(test)]
62pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
63    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(Cell::get)
64}
65
66///
67/// RawSchemaKey
68///
69/// Stable key for one persisted schema snapshot entry.
70/// It combines the entity tag and schema version so reconciliation can load
71/// concrete versions without depending on generated entity names.
72///
73
74#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
75struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
76
77impl RawSchemaKey {
78    /// Build the raw persisted key for one entity schema version.
79    #[must_use]
80    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
81        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
82        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
83        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
84
85        Self(out)
86    }
87
88    /// Return the entity tag encoded in this schema key.
89    #[must_use]
90    fn entity_tag(self) -> EntityTag {
91        let mut bytes = [0u8; size_of::<u64>()];
92        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
93
94        EntityTag::new(u64::from_be_bytes(bytes))
95    }
96
97    /// Return the schema version encoded in this schema key.
98    #[must_use]
99    fn version(self) -> u32 {
100        let mut bytes = [0u8; size_of::<u32>()];
101        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
102
103        u32::from_be_bytes(bytes)
104    }
105
106    fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
107        (
108            RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
109            RangeBound::Included(Self::from_entity_version(
110                entity,
111                SchemaVersion::new(u32::MAX),
112            )),
113        )
114    }
115}
116
117impl Storable for RawSchemaKey {
118    fn to_bytes(&self) -> Cow<'_, [u8]> {
119        Cow::Borrowed(&self.0)
120    }
121
122    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
123        debug_assert_eq!(
124            bytes.len(),
125            SCHEMA_KEY_BYTES_USIZE,
126            "RawSchemaKey::from_bytes received unexpected byte length",
127        );
128
129        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
130            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
131        }
132
133        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
134        out.copy_from_slice(bytes.as_ref());
135        Self(out)
136    }
137
138    fn into_bytes(self) -> Vec<u8> {
139        self.0.to_vec()
140    }
141
142    const BOUND: StorableBound = StorableBound::Bounded {
143        max_size: SCHEMA_KEY_BYTES,
144        is_fixed_size: true,
145    };
146}
147
148///
149/// RawSchemaSnapshot
150///
151/// Raw persisted schema snapshot payload.
152/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
153/// keeping the stable-memory value boundary independent from the typed schema
154/// DTOs used by reconciliation.
155///
156
157#[derive(Clone, Debug, Eq, PartialEq)]
158struct RawSchemaSnapshot {
159    payload: Vec<u8>,
160    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
161}
162
163impl RawSchemaSnapshot {
164    /// Encode one typed persisted-schema snapshot into a raw store payload.
165    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
166        validate_typed_schema_snapshot_for_store(snapshot)?;
167
168        let accepted_schema_fingerprint =
169            accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
170        let payload = encode_persisted_schema_snapshot(snapshot)?;
171
172        Ok(Self {
173            payload,
174            accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
175        })
176    }
177
178    /// Build one raw schema snapshot from already-encoded bytes.
179    #[must_use]
180    #[cfg(test)]
181    const fn from_bytes(payload: Vec<u8>) -> Self {
182        Self {
183            payload,
184            accepted_schema_fingerprint: None,
185        }
186    }
187
188    /// Borrow the encoded schema snapshot payload.
189    #[must_use]
190    const fn as_bytes(&self) -> &[u8] {
191        self.payload.as_slice()
192    }
193
194    /// Consume the snapshot into its encoded payload bytes.
195    #[must_use]
196    fn into_bytes(self) -> Vec<u8> {
197        self.payload
198    }
199
200    /// Return the accepted schema identity fingerprint stored beside the raw
201    /// payload, without decoding the persisted snapshot.
202    fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
203        self.accepted_schema_fingerprint
204            .ok_or_else(InternalError::store_corruption)
205    }
206
207    /// Decode this raw store payload into a typed persisted-schema snapshot.
208    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
209        decode_persisted_schema_snapshot(self.as_bytes())
210    }
211}
212
213#[derive(Clone, Copy, Debug, Eq, PartialEq)]
214pub(in crate::db) struct AcceptedCatalogIdentity {
215    entity_tag: EntityTag,
216    entity_path: &'static str,
217    store_path: &'static str,
218    accepted_schema_version: SchemaVersion,
219    fingerprint_method_version: u8,
220    accepted_schema_fingerprint: CommitSchemaFingerprint,
221}
222
223impl AcceptedCatalogIdentity {
224    #[must_use]
225    pub(in crate::db) const fn new(
226        entity_tag: EntityTag,
227        entity_path: &'static str,
228        store_path: &'static str,
229        accepted_schema_version: SchemaVersion,
230        accepted_schema_fingerprint: CommitSchemaFingerprint,
231    ) -> Self {
232        Self {
233            entity_tag,
234            entity_path,
235            store_path,
236            accepted_schema_version,
237            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
238            accepted_schema_fingerprint,
239        }
240    }
241
242    #[must_use]
243    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
244        self.entity_tag
245    }
246
247    #[must_use]
248    pub(in crate::db) const fn entity_path(self) -> &'static str {
249        self.entity_path
250    }
251
252    #[must_use]
253    pub(in crate::db) const fn store_path(self) -> &'static str {
254        self.store_path
255    }
256
257    #[must_use]
258    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
259        self.accepted_schema_version
260    }
261
262    #[must_use]
263    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
264        self.fingerprint_method_version
265    }
266
267    #[must_use]
268    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
269        self.accepted_schema_fingerprint
270    }
271}
272
273#[derive(Clone, Debug, Eq, PartialEq)]
274pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
275    identity: AcceptedCatalogIdentity,
276    raw_snapshot: Vec<u8>,
277}
278
279impl AcceptedCatalogSnapshotSelection {
280    #[must_use]
281    const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
282        Self {
283            identity,
284            raw_snapshot,
285        }
286    }
287
288    #[must_use]
289    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
290        self.identity
291    }
292
293    pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
294        let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
295        let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
296        let identity = self.identity();
297
298        if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
299            return Err(InternalError::store_invariant());
300        }
301        if accepted.entity_path() != identity.entity_path() {
302            return Err(InternalError::store_invariant());
303        }
304
305        let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
306        if decoded_fingerprint != identity.accepted_schema_fingerprint() {
307            return Err(InternalError::store_invariant());
308        }
309
310        Ok(accepted)
311    }
312}
313
314impl Storable for RawSchemaSnapshot {
315    fn to_bytes(&self) -> Cow<'_, [u8]> {
316        let Some(fingerprint) = self.accepted_schema_fingerprint else {
317            return Cow::Borrowed(self.as_bytes());
318        };
319
320        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
321        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
322        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
323        bytes.extend_from_slice(&fingerprint);
324        bytes.extend_from_slice(self.as_bytes());
325
326        Cow::Owned(bytes)
327    }
328
329    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
330        let bytes = bytes.into_owned();
331        if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
332            && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
333            && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
334        {
335            let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
336            let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
337            let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
338            fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
339
340            return Self {
341                payload: bytes[fingerprint_end..].to_vec(),
342                accepted_schema_fingerprint: Some(fingerprint),
343            };
344        }
345
346        Self {
347            payload: bytes,
348            accepted_schema_fingerprint: None,
349        }
350    }
351
352    fn into_bytes(self) -> Vec<u8> {
353        let Some(fingerprint) = self.accepted_schema_fingerprint else {
354            return self.payload;
355        };
356
357        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
358        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
359        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
360        bytes.extend_from_slice(&fingerprint);
361        bytes.extend_from_slice(&self.payload);
362
363        bytes
364    }
365
366    const BOUND: StorableBound = StorableBound::Bounded {
367        max_size: MAX_SCHEMA_SNAPSHOT_BYTES + RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32,
368        is_fixed_size: false,
369    };
370}
371
372// Validate typed schema snapshots before they are encoded into the raw schema
373// metadata store. This catches caller-side invariant violations separately from
374// raw persisted-byte corruption handled by the codec decode boundary.
375fn validate_typed_schema_snapshot_for_store(
376    snapshot: &PersistedSchemaSnapshot,
377) -> Result<(), InternalError> {
378    if schema_snapshot_integrity_detail(
379        "schema snapshot",
380        snapshot.version(),
381        snapshot.primary_key_field_ids(),
382        snapshot.row_layout(),
383        snapshot.fields(),
384    )
385    .is_some()
386    {
387        return Err(InternalError::store_invariant());
388    }
389
390    Ok(())
391}
392
393///
394/// SchemaStoreFootprint
395///
396/// Current raw schema metadata footprint for one entity. Reconciliation uses
397/// this value to report stable-memory pressure without decoding schema payloads
398/// or exposing field-level metadata through metrics.
399///
400
401#[derive(Clone, Copy, Debug, Eq, PartialEq)]
402pub(in crate::db) struct SchemaStoreFootprint {
403    snapshots: u64,
404    encoded_bytes: u64,
405    latest_snapshot_bytes: u64,
406}
407
408///
409/// SchemaStoreCatalogMetadata
410///
411/// Accepted schema-store catalog metadata derived from latest persisted
412/// snapshots. This is diagnostic allocation metadata, not allocation identity.
413///
414
415#[derive(Clone, Copy, Debug, Eq, PartialEq)]
416pub(in crate::db) struct SchemaStoreCatalogMetadata {
417    schema_version: SchemaVersion,
418    schema_fingerprint_method_version: u8,
419    schema_fingerprint: CommitSchemaFingerprint,
420    entity_count: u64,
421}
422
423impl SchemaStoreCatalogMetadata {
424    /// Build catalog metadata from already-derived accepted schema facts.
425    #[must_use]
426    const fn new(
427        schema_version: SchemaVersion,
428        schema_fingerprint_method_version: u8,
429        schema_fingerprint: CommitSchemaFingerprint,
430        entity_count: u64,
431    ) -> Self {
432        Self {
433            schema_version,
434            schema_fingerprint_method_version,
435            schema_fingerprint,
436            entity_count,
437        }
438    }
439
440    /// Return the maximum latest schema version represented in the catalog.
441    #[must_use]
442    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
443        self.schema_version
444    }
445
446    /// Return the fingerprint method version for this diagnostic metadata row.
447    #[must_use]
448    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
449        self.schema_fingerprint_method_version
450    }
451
452    /// Return the deterministic catalog fingerprint for latest accepted
453    /// snapshots.
454    #[must_use]
455    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
456        self.schema_fingerprint
457    }
458
459    /// Return number of entity schemas represented in this catalog metadata.
460    #[must_use]
461    pub(in crate::db) const fn entity_count(self) -> u64 {
462        self.entity_count
463    }
464}
465
466///
467/// SchemaStoreAllocationMetadata
468///
469/// Role-specific allocation metadata derived from latest accepted schema-store
470/// snapshots. These fingerprints describe the accepted contract that owns each
471/// allocation role; they are diagnostics, not allocation identity.
472///
473
474#[derive(Clone, Copy, Debug, Eq, PartialEq)]
475pub(in crate::db) struct SchemaStoreAllocationMetadata {
476    data: SchemaStoreCatalogMetadata,
477    index: SchemaStoreCatalogMetadata,
478    schema: SchemaStoreCatalogMetadata,
479}
480
481impl SchemaStoreAllocationMetadata {
482    /// Build one role-specific metadata set from already-derived accepted
483    /// schema facts.
484    #[must_use]
485    const fn new(
486        data: SchemaStoreCatalogMetadata,
487        index: SchemaStoreCatalogMetadata,
488        schema: SchemaStoreCatalogMetadata,
489    ) -> Self {
490        Self {
491            data,
492            index,
493            schema,
494        }
495    }
496
497    /// Return accepted row-layout allocation metadata for data memory.
498    #[must_use]
499    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
500        self.data
501    }
502
503    /// Return accepted index-catalog allocation metadata for index memory.
504    #[must_use]
505    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
506        self.index
507    }
508
509    /// Return accepted full schema-catalog allocation metadata for schema
510    /// memory.
511    #[must_use]
512    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
513        self.schema
514    }
515}
516
517impl SchemaStoreFootprint {
518    /// Build one schema-store footprint from already-counted raw payload facts.
519    #[must_use]
520    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
521        Self {
522            snapshots,
523            encoded_bytes,
524            latest_snapshot_bytes,
525        }
526    }
527
528    /// Return the number of raw schema snapshots stored for the entity.
529    #[must_use]
530    pub(in crate::db) const fn snapshots(self) -> u64 {
531        self.snapshots
532    }
533
534    /// Return the total encoded payload bytes stored for the entity.
535    #[must_use]
536    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
537        self.encoded_bytes
538    }
539
540    /// Return the encoded payload bytes for the highest-version snapshot.
541    #[must_use]
542    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
543        self.latest_snapshot_bytes
544    }
545}
546
547///
548/// SchemaStore
549///
550/// Thin persistence wrapper over one journaled or heap schema metadata BTreeMap.
551/// Startup reconciliation writes and validates encoded schema snapshots here
552/// before row/index operations proceed.
553///
554
555pub struct SchemaStore {
556    backend: SchemaStoreBackend,
557}
558
559enum SchemaStoreBackend {
560    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
561    Journaled {
562        canonical:
563            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
564        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
565        tombstones: BTreeSet<RawSchemaKey>,
566    },
567}
568
569/// Control-flow result for schema-store traversal visitors.
570#[derive(Clone, Copy, Debug, Eq, PartialEq)]
571enum SchemaStoreVisit {
572    Continue,
573    #[allow(
574        dead_code,
575        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
576    )]
577    Stop,
578}
579
580impl SchemaStoreVisit {
581    const fn should_stop(self) -> bool {
582        matches!(self, Self::Stop)
583    }
584}
585
586impl SchemaStore {
587    /// Initialize a volatile heap-backed schema store.
588    #[must_use]
589    pub const fn init_heap() -> Self {
590        Self {
591            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
592        }
593    }
594
595    /// Initialize a journaled cached-stable schema store.
596    ///
597    /// Normal schema publication writes only the live projection. Canonical
598    /// stable schema history is updated by future journal fold/recovery paths.
599    #[must_use]
600    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
601        Self {
602            backend: SchemaStoreBackend::Journaled {
603                canonical: StableBTreeMap::init(memory),
604                live: StdBTreeMap::new(),
605                tombstones: BTreeSet::new(),
606            },
607        }
608    }
609
610    /// Insert or replace one typed persisted schema snapshot.
611    pub(in crate::db) fn insert_persisted_snapshot(
612        &mut self,
613        entity: EntityTag,
614        snapshot: &PersistedSchemaSnapshot,
615    ) -> Result<(), InternalError> {
616        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
617        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
618        let _ = self.insert_raw_snapshot(key, raw_snapshot);
619
620        Ok(())
621    }
622
623    /// Insert one typed persisted schema snapshot only if the current live
624    /// accepted catalog identity still matches the identity captured before
625    /// schema mutation planning.
626    pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
627        &mut self,
628        expected: AcceptedCatalogIdentity,
629        snapshot: &PersistedSchemaSnapshot,
630    ) -> Result<(), InternalError> {
631        let live = self.latest_catalog_identity(
632            expected.entity_tag(),
633            expected.entity_path(),
634            expected.store_path(),
635        )?;
636        if live
637            .as_ref()
638            .map(AcceptedCatalogSnapshotSelection::identity)
639            != Some(expected)
640        {
641            return Err(InternalError::schema_ddl_publication_race_lost(
642                expected.entity_path(),
643            ));
644        }
645
646        self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
647    }
648
649    /// Reset the volatile projection for journaled recovery without mutating
650    /// the canonical stable schema base.
651    pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
652        let SchemaStoreBackend::Journaled {
653            live, tombstones, ..
654        } = &mut self.backend
655        else {
656            return Err(InternalError::store_invariant());
657        };
658
659        live.clear();
660        tombstones.clear();
661
662        Ok(())
663    }
664
665    /// Apply one folded journal schema snapshot into the canonical stable base.
666    pub(in crate::db) fn fold_persisted_snapshot(
667        &mut self,
668        entity: EntityTag,
669        snapshot: &PersistedSchemaSnapshot,
670    ) -> Result<(), InternalError> {
671        let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
672            return Err(InternalError::store_invariant());
673        };
674
675        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
676        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
677        canonical.insert(key, raw_snapshot);
678
679        Ok(())
680    }
681
682    /// Load and decode one typed persisted schema snapshot.
683    #[cfg(test)]
684    pub(in crate::db) fn get_persisted_snapshot(
685        &self,
686        entity: EntityTag,
687        version: SchemaVersion,
688    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
689        let key = RawSchemaKey::from_entity_version(entity, version);
690        self.get_raw_snapshot(&key)
691            .map(|snapshot| snapshot.decode_persisted_snapshot())
692            .transpose()
693    }
694
695    /// Load and decode the highest stored schema snapshot version for one entity.
696    pub(in crate::db) fn latest_persisted_snapshot(
697        &self,
698        entity: EntityTag,
699    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
700        self.latest_raw_snapshot(entity)
701            .map(|snapshot| snapshot.decode_persisted_snapshot())
702            .transpose()
703    }
704
705    /// Return the latest accepted catalog identity for one entity without
706    /// decoding the selected schema snapshot.
707    pub(in crate::db) fn latest_catalog_identity(
708        &self,
709        entity: EntityTag,
710        entity_path: &'static str,
711        store_path: &'static str,
712    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
713        let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
714            return Ok(None);
715        };
716        let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
717        let identity =
718            AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
719
720        Ok(Some(AcceptedCatalogSnapshotSelection::new(
721            identity,
722            raw_snapshot.into_bytes(),
723        )))
724    }
725
726    /// Return raw schema-store footprint facts for one entity.
727    #[must_use]
728    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
729        let mut snapshots = 0u64;
730        let mut encoded_bytes = 0u64;
731        let mut latest = None::<(SchemaVersion, u64)>;
732
733        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
734            if key.entity_tag() != entity {
735                return Ok(SchemaStoreVisit::Continue);
736            }
737
738            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
739            snapshots = snapshots.saturating_add(1);
740            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
741
742            let version = SchemaVersion::new(key.version());
743            if latest
744                .as_ref()
745                .is_none_or(|(latest_version, _)| version > *latest_version)
746            {
747                latest = Some((version, snapshot_bytes));
748            }
749            Ok(SchemaStoreVisit::Continue)
750        });
751
752        SchemaStoreFootprint::new(
753            snapshots,
754            encoded_bytes,
755            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
756        )
757    }
758
759    /// Derive accepted catalog metadata from latest persisted schema snapshots.
760    ///
761    /// This function intentionally reads only the persisted schema store. It
762    /// does not reconstruct metadata from generated models when the store has
763    /// no accepted snapshots.
764    #[cfg(test)]
765    pub(in crate::db) fn catalog_metadata(
766        &self,
767    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
768        Ok(self
769            .allocation_metadata()?
770            .map(SchemaStoreAllocationMetadata::schema))
771    }
772
773    /// Derive role-specific allocation metadata from latest persisted schema
774    /// snapshots.
775    ///
776    /// This function intentionally reads only accepted schema-store payloads.
777    /// It never reconstructs metadata from generated models when the store has
778    /// no accepted snapshots.
779    pub(in crate::db) fn allocation_metadata(
780        &self,
781    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
782        let latest_by_entity = self.latest_raw_snapshots_by_entity();
783        if latest_by_entity.is_empty() {
784            return Ok(None);
785        }
786
787        Ok(Some(SchemaStoreAllocationMetadata::new(
788            derive_data_allocation_metadata(&latest_by_entity)?,
789            derive_index_allocation_metadata(&latest_by_entity)?,
790            derive_schema_catalog_metadata(&latest_by_entity)?,
791        )))
792    }
793
794    /// Insert or replace one raw schema snapshot.
795    fn insert_raw_snapshot(
796        &mut self,
797        key: RawSchemaKey,
798        snapshot: RawSchemaSnapshot,
799    ) -> Option<RawSchemaSnapshot> {
800        let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
801            self.get_raw_snapshot_for_backend(&key)
802        } else {
803            None
804        };
805        match &mut self.backend {
806            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
807            SchemaStoreBackend::Journaled {
808                live, tombstones, ..
809            } => {
810                tombstones.remove(&key);
811                live.insert(key, snapshot);
812                previous_journaled
813            }
814        }
815    }
816
817    /// Load one raw schema snapshot by key.
818    #[must_use]
819    #[cfg(test)]
820    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
821        match &self.backend {
822            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
823            SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
824        }
825    }
826
827    /// Return whether one schema snapshot key is present.
828    #[must_use]
829    #[cfg(test)]
830    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
831        match &self.backend {
832            SchemaStoreBackend::Heap(map) => map.contains_key(key),
833            SchemaStoreBackend::Journaled { .. } => {
834                self.get_raw_snapshot_for_backend(key).is_some()
835            }
836        }
837    }
838
839    /// Return the number of schema snapshot entries in this store.
840    #[must_use]
841    #[cfg(test)]
842    pub(in crate::db) fn len(&self) -> u64 {
843        match &self.backend {
844            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
845            SchemaStoreBackend::Journaled { .. } => {
846                let mut count = 0_u64;
847                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
848                    count = count.saturating_add(1);
849                    Ok(SchemaStoreVisit::Continue)
850                });
851                count
852            }
853        }
854    }
855
856    /// Return whether this schema store currently has no persisted snapshots.
857    #[must_use]
858    #[cfg(test)]
859    pub(in crate::db) fn is_empty(&self) -> bool {
860        match &self.backend {
861            SchemaStoreBackend::Heap(map) => map.is_empty(),
862            SchemaStoreBackend::Journaled { .. } => {
863                let mut empty = true;
864                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
865                    empty = false;
866                    Ok(SchemaStoreVisit::Stop)
867                });
868                empty
869            }
870        }
871    }
872
873    /// Clear all schema metadata entries from the store.
874    #[cfg(test)]
875    pub(in crate::db) fn clear(&mut self) {
876        match &mut self.backend {
877            SchemaStoreBackend::Heap(map) => map.clear(),
878            SchemaStoreBackend::Journaled {
879                canonical,
880                live,
881                tombstones,
882            } => {
883                live.clear();
884                tombstones.clear();
885                for entry in canonical.iter() {
886                    tombstones.insert(*entry.key());
887                }
888            }
889        }
890    }
891
892    fn latest_raw_snapshots_by_entity(
893        &self,
894    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
895        #[cfg(test)]
896        LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
897
898        let mut latest_by_entity =
899            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
900
901        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
902            let version = SchemaVersion::new(key.version());
903            match latest_by_entity.get_mut(&key.entity_tag()) {
904                Some((latest_version, latest_snapshot)) if version > *latest_version => {
905                    *latest_version = version;
906                    *latest_snapshot = snapshot.clone();
907                }
908                None => {
909                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
910                }
911                Some(_) => {}
912            }
913            Ok(SchemaStoreVisit::Continue)
914        });
915
916        latest_by_entity
917    }
918
919    /// Visit raw schema snapshots in canonical store order without exposing
920    /// the backing stable-map iterator.
921    fn visit_raw_snapshots<E>(
922        &self,
923        visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
924    ) -> Result<(), E> {
925        match &self.backend {
926            SchemaStoreBackend::Heap(map) => {
927                let mut visitor = visitor;
928                for (key, snapshot) in map {
929                    if visitor(key, snapshot)?.should_stop() {
930                        break;
931                    }
932                }
933            }
934            SchemaStoreBackend::Journaled {
935                canonical,
936                live,
937                tombstones,
938            } => Self::visit_journaled_raw_snapshot_range(
939                canonical,
940                live,
941                tombstones,
942                (RangeBound::Unbounded, RangeBound::Unbounded),
943                Direction::Asc,
944                visitor,
945            )?,
946        }
947
948        Ok(())
949    }
950
951    #[cfg(test)]
952    #[must_use]
953    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
954        match &self.backend {
955            SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
956            SchemaStoreBackend::Heap(_) => 0,
957        }
958    }
959
960    fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
961        let SchemaStoreBackend::Journaled {
962            canonical,
963            live,
964            tombstones,
965        } = &self.backend
966        else {
967            return None;
968        };
969
970        if tombstones.contains(key) {
971            return None;
972        }
973        live.get(key).cloned().or_else(|| canonical.get(key))
974    }
975
976    fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
977        self.latest_raw_snapshot_entry(entity)
978            .map(|(_, snapshot)| snapshot)
979    }
980
981    fn latest_raw_snapshot_entry(
982        &self,
983        entity: EntityTag,
984    ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
985        let bounds = RawSchemaKey::entity_range_bounds(entity);
986        match &self.backend {
987            SchemaStoreBackend::Heap(map) => map
988                .range((bounds.0, bounds.1))
989                .next_back()
990                .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
991            SchemaStoreBackend::Journaled {
992                canonical,
993                live,
994                tombstones,
995            } => {
996                let mut latest = None;
997                let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
998                    canonical,
999                    live,
1000                    tombstones,
1001                    bounds,
1002                    Direction::Desc,
1003                    |key, snapshot| {
1004                        latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1005                        Ok(SchemaStoreVisit::Stop)
1006                    },
1007                );
1008                latest
1009            }
1010        }
1011    }
1012
1013    fn visit_journaled_raw_snapshot_range<E>(
1014        canonical: &StableBTreeMap<
1015            RawSchemaKey,
1016            RawSchemaSnapshot,
1017            VirtualMemory<DefaultMemoryImpl>,
1018        >,
1019        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1020        tombstones: &BTreeSet<RawSchemaKey>,
1021        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1022        direction: Direction,
1023        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1024    ) -> Result<(), E> {
1025        match direction {
1026            Direction::Asc => visit_ordered_overlay(
1027                canonical.range((bounds.0, bounds.1)),
1028                live.range((bounds.0, bounds.1)),
1029                Direction::Asc,
1030                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1031                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1032                |live_entry| !tombstones.contains(live_entry.0),
1033                |entry| {
1034                    let visit = match entry {
1035                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1036                            visitor(canonical_entry.key(), &canonical_entry.value())?
1037                        }
1038                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1039                    };
1040                    Ok(if visit.should_stop() {
1041                        OrderedOverlayVisit::Stop
1042                    } else {
1043                        OrderedOverlayVisit::Continue
1044                    })
1045                },
1046            ),
1047            Direction::Desc => visit_ordered_overlay(
1048                canonical.range((bounds.0, bounds.1)).rev(),
1049                live.range((bounds.0, bounds.1)).rev(),
1050                Direction::Desc,
1051                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1052                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1053                |live_entry| !tombstones.contains(live_entry.0),
1054                |entry| {
1055                    let visit = match entry {
1056                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1057                            visitor(canonical_entry.key(), &canonical_entry.value())?
1058                        }
1059                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1060                    };
1061                    Ok(if visit.should_stop() {
1062                        OrderedOverlayVisit::Stop
1063                    } else {
1064                        OrderedOverlayVisit::Continue
1065                    })
1066                },
1067            ),
1068        }
1069    }
1070}
1071
1072fn derive_data_allocation_metadata(
1073    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1074) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1075    let mut max_version = SchemaVersion::initial();
1076    let mut hasher = new_hash_sha256();
1077    write_hash_tag_u8(
1078        &mut hasher,
1079        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1080    );
1081
1082    for (entity, (_, snapshot)) in latest_by_entity {
1083        let persisted = snapshot.decode_persisted_snapshot()?;
1084        if persisted.version() > max_version {
1085            max_version = persisted.version();
1086        }
1087
1088        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1089            persisted.version(),
1090            persisted.entity_path().to_string(),
1091            persisted.entity_name().to_string(),
1092            persisted.primary_key_field_ids().to_vec(),
1093            persisted.row_layout().clone(),
1094            persisted.fields().to_vec(),
1095            Vec::new(),
1096        );
1097        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1098
1099        write_hash_u64(&mut hasher, entity.value());
1100        write_hash_u32(&mut hasher, persisted.version().get());
1101        write_hash_len_u32(&mut hasher, encoded.len());
1102        hasher.update(encoded);
1103    }
1104
1105    Ok(finalize_schema_metadata(
1106        max_version,
1107        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1108        hasher,
1109        latest_by_entity.len(),
1110    ))
1111}
1112
1113fn derive_index_allocation_metadata(
1114    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1115) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1116    let mut max_version = SchemaVersion::initial();
1117    let mut hasher = new_hash_sha256();
1118    write_hash_tag_u8(
1119        &mut hasher,
1120        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1121    );
1122
1123    for (entity, (_, snapshot)) in latest_by_entity {
1124        let persisted = snapshot.decode_persisted_snapshot()?;
1125        if persisted.version() > max_version {
1126            max_version = persisted.version();
1127        }
1128
1129        write_hash_u64(&mut hasher, entity.value());
1130        write_hash_u32(&mut hasher, persisted.version().get());
1131        write_hash_len_u32(&mut hasher, persisted.indexes().len());
1132        for index in persisted.indexes() {
1133            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1134            write_hash_str_u32(&mut hasher, index.name());
1135            write_hash_str_u32(&mut hasher, index.store());
1136            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1137            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1138            match index.predicate_sql() {
1139                Some(predicate_sql) => {
1140                    write_hash_tag_u8(&mut hasher, 1);
1141                    write_hash_str_u32(&mut hasher, predicate_sql);
1142                }
1143                None => write_hash_tag_u8(&mut hasher, 0),
1144            }
1145            hash_persisted_index_key(&mut hasher, index.key());
1146        }
1147    }
1148
1149    Ok(finalize_schema_metadata(
1150        max_version,
1151        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1152        hasher,
1153        latest_by_entity.len(),
1154    ))
1155}
1156
1157fn derive_schema_catalog_metadata(
1158    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1159) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1160    let mut max_version = SchemaVersion::initial();
1161    let mut hasher = new_hash_sha256();
1162    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1163
1164    for (entity, (version, snapshot)) in latest_by_entity {
1165        let persisted = snapshot.decode_persisted_snapshot()?;
1166        if persisted.version() > max_version {
1167            max_version = persisted.version();
1168        }
1169
1170        write_hash_u64(&mut hasher, entity.value());
1171        write_hash_u32(&mut hasher, version.get());
1172        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1173        hasher.update(snapshot.as_bytes());
1174    }
1175
1176    Ok(finalize_schema_metadata(
1177        max_version,
1178        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1179        hasher,
1180        latest_by_entity.len(),
1181    ))
1182}
1183
1184fn finalize_schema_metadata(
1185    schema_version: SchemaVersion,
1186    schema_fingerprint_method_version: u8,
1187    hasher: sha2::Sha256,
1188    entity_count: usize,
1189) -> SchemaStoreCatalogMetadata {
1190    let digest = finalize_hash_sha256(hasher);
1191    let mut schema_fingerprint = [0u8; 16];
1192    schema_fingerprint.copy_from_slice(&digest[..16]);
1193
1194    SchemaStoreCatalogMetadata::new(
1195        schema_version,
1196        schema_fingerprint_method_version,
1197        schema_fingerprint,
1198        u64::try_from(entity_count).unwrap_or(u64::MAX),
1199    )
1200}
1201
1202fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1203    match key {
1204        PersistedIndexKeySnapshot::FieldPath(paths) => {
1205            write_hash_tag_u8(hasher, 1);
1206            write_hash_len_u32(hasher, paths.len());
1207            for path in paths {
1208                hash_persisted_index_field_path(hasher, path);
1209            }
1210        }
1211        PersistedIndexKeySnapshot::Items(items) => {
1212            write_hash_tag_u8(hasher, 2);
1213            write_hash_len_u32(hasher, items.len());
1214            for item in items {
1215                match item {
1216                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1217                        write_hash_tag_u8(hasher, 1);
1218                        hash_persisted_index_field_path(hasher, path);
1219                    }
1220                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
1221                        write_hash_tag_u8(hasher, 2);
1222                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1223                        hash_persisted_index_field_path(hasher, expression.source());
1224                        hash_persisted_field_kind(hasher, expression.input_kind());
1225                        hash_persisted_field_kind(hasher, expression.output_kind());
1226                        write_hash_str_u32(hasher, expression.canonical_text());
1227                    }
1228                }
1229            }
1230        }
1231    }
1232}
1233
1234fn hash_persisted_index_field_path(
1235    hasher: &mut sha2::Sha256,
1236    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1237) {
1238    write_hash_u32(hasher, path.field_id().get());
1239    write_hash_u32(hasher, u32::from(path.slot().get()));
1240    write_hash_len_u32(hasher, path.path().len());
1241    for segment in path.path() {
1242        write_hash_str_u32(hasher, segment);
1243    }
1244    hash_persisted_field_kind(hasher, path.kind());
1245    write_hash_tag_u8(hasher, u8::from(path.nullable()));
1246}
1247
1248fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1249    match kind {
1250        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1251        PersistedFieldKind::Blob { max_len } => {
1252            write_hash_tag_u8(hasher, 2);
1253            hash_optional_u32(hasher, *max_len);
1254        }
1255        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1256        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1257        PersistedFieldKind::Decimal { scale } => {
1258            write_hash_tag_u8(hasher, 5);
1259            write_hash_u32(hasher, *scale);
1260        }
1261        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1262        PersistedFieldKind::Enum { path, variants } => {
1263            write_hash_tag_u8(hasher, 7);
1264            write_hash_str_u32(hasher, path);
1265            write_hash_len_u32(hasher, variants.len());
1266            for variant in variants {
1267                write_hash_str_u32(hasher, variant.ident());
1268                match variant.payload_kind() {
1269                    Some(payload_kind) => {
1270                        write_hash_tag_u8(hasher, 1);
1271                        hash_persisted_field_kind(hasher, payload_kind);
1272                    }
1273                    None => write_hash_tag_u8(hasher, 0),
1274                }
1275                write_hash_str_u32(
1276                    hasher,
1277                    field_storage_decode_name(variant.payload_storage_decode()),
1278                );
1279            }
1280        }
1281        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1282        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1283        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1284        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1285        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1286        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1287        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1288        PersistedFieldKind::IntBig { max_bytes } => {
1289            write_hash_tag_u8(hasher, 15);
1290            write_hash_u32(hasher, *max_bytes);
1291        }
1292        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1293        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1294        PersistedFieldKind::Text { max_len } => {
1295            write_hash_tag_u8(hasher, 18);
1296            hash_optional_u32(hasher, *max_len);
1297        }
1298        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1299        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1300        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1301        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1302        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1303        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1304        PersistedFieldKind::NatBig { max_bytes } => {
1305            write_hash_tag_u8(hasher, 25);
1306            write_hash_u32(hasher, *max_bytes);
1307        }
1308        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1309        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1310        PersistedFieldKind::Relation {
1311            target_path,
1312            target_entity_name,
1313            target_entity_tag,
1314            target_store_path,
1315            key_kind,
1316            strength,
1317        } => {
1318            write_hash_tag_u8(hasher, 28);
1319            write_hash_str_u32(hasher, target_path);
1320            write_hash_str_u32(hasher, target_entity_name);
1321            write_hash_u64(hasher, target_entity_tag.value());
1322            write_hash_str_u32(hasher, target_store_path);
1323            hash_persisted_field_kind(hasher, key_kind);
1324            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1325        }
1326        PersistedFieldKind::List(inner) => {
1327            write_hash_tag_u8(hasher, 29);
1328            hash_persisted_field_kind(hasher, inner);
1329        }
1330        PersistedFieldKind::Set(inner) => {
1331            write_hash_tag_u8(hasher, 30);
1332            hash_persisted_field_kind(hasher, inner);
1333        }
1334        PersistedFieldKind::Map { key, value } => {
1335            write_hash_tag_u8(hasher, 31);
1336            hash_persisted_field_kind(hasher, key);
1337            hash_persisted_field_kind(hasher, value);
1338        }
1339        PersistedFieldKind::Structured { queryable } => {
1340            write_hash_tag_u8(hasher, 32);
1341            write_hash_tag_u8(hasher, u8::from(*queryable));
1342        }
1343    }
1344}
1345
1346fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1347    match value {
1348        Some(value) => {
1349            write_hash_tag_u8(hasher, 1);
1350            write_hash_u32(hasher, value);
1351        }
1352        None => write_hash_tag_u8(hasher, 0),
1353    }
1354}
1355
1356const fn persisted_index_origin_name(
1357    origin: crate::db::schema::PersistedIndexOrigin,
1358) -> &'static str {
1359    match origin {
1360        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1361        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1362    }
1363}
1364
1365const fn persisted_expression_op_name(
1366    op: crate::db::schema::PersistedIndexExpressionOp,
1367) -> &'static str {
1368    match op {
1369        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1370        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1371        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1372        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1373        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1374        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1375        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1376        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1377    }
1378}
1379
1380const fn persisted_relation_strength_name(
1381    strength: crate::db::schema::PersistedRelationStrength,
1382) -> &'static str {
1383    match strength {
1384        crate::db::schema::PersistedRelationStrength::Strong => "strong",
1385        crate::db::schema::PersistedRelationStrength::Weak => "weak",
1386    }
1387}
1388
1389const fn field_storage_decode_name(
1390    decode: crate::model::field::FieldStorageDecode,
1391) -> &'static str {
1392    match decode {
1393        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1394        crate::model::field::FieldStorageDecode::Value => "value",
1395    }
1396}
1397
1398///
1399/// TESTS
1400///
1401
1402#[cfg(test)]
1403mod tests;