Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        direction::Direction,
14        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15        schema::{
16            AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17            PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18            accepted_schema_cache_fingerprint,
19            accepted_schema_cache_fingerprint_for_persisted_snapshot,
20            accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22        },
23    },
24    error::InternalError,
25    traits::Storable,
26    types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
// Width of one `RawSchemaKey` in bytes: 8 entity-tag bytes followed by
// 4 schema-version bytes.
const SCHEMA_KEY_BYTES_USIZE: usize = 12;
// The same key width as a `u32`, used for `RawSchemaKey`'s `Storable::BOUND`.
const SCHEMA_KEY_BYTES: u32 = 12;
// Upper bound (512 KiB) on one encoded schema snapshot payload. The stored
// value bound adds the raw value header on top of this.
pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
// NOTE(review): presumably version/domain tags for the catalog, data-allocation
// and index-allocation fingerprints; the code that consumes them is outside
// this view — confirm before changing.
const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
// Magic prefix that marks a stored value as carrying the raw snapshot header.
const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
// Header format version byte written directly after the magic prefix.
const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
// Total header width: 8 magic bytes + 1 version byte + the accepted schema
// fingerprint bytes that follow them.
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
// The same header width as a `u32`, used for `RawSchemaSnapshot`'s `Storable::BOUND`.
const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32: u32 = 25;
50
// Test-only, per-thread counter that `latest_raw_snapshots_by_entity` bumps on
// every call; the two `*_for_tests` helpers below reset and read it.
#[cfg(test)]
thread_local! {
    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
}
55
/// Reset this thread's `latest_raw_snapshots_by_entity` call counter to zero.
#[cfg(test)]
pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|counter| {
        counter.set(0);
    });
}
60
/// Read this thread's `latest_raw_snapshots_by_entity` call counter.
#[cfg(test)]
pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|counter| counter.get())
}
65
///
/// RawSchemaKey
///
/// Stable key for one persisted schema snapshot entry.
/// It combines the entity tag and schema version so reconciliation can load
/// concrete versions without depending on generated entity names.
///
/// Byte layout: the big-endian entity tag followed by the big-endian schema
/// version. Because both parts are big-endian, the derived byte-wise ordering
/// sorts keys by entity first and by version second.
///

#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
76
77impl RawSchemaKey {
78    /// Build the raw persisted key for one entity schema version.
79    #[must_use]
80    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
81        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
82        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
83        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
84
85        Self(out)
86    }
87
88    /// Return the entity tag encoded in this schema key.
89    #[must_use]
90    fn entity_tag(self) -> EntityTag {
91        let mut bytes = [0u8; size_of::<u64>()];
92        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
93
94        EntityTag::new(u64::from_be_bytes(bytes))
95    }
96
97    /// Return the schema version encoded in this schema key.
98    #[must_use]
99    fn version(self) -> u32 {
100        let mut bytes = [0u8; size_of::<u32>()];
101        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
102
103        u32::from_be_bytes(bytes)
104    }
105
106    fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
107        (
108            RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
109            RangeBound::Included(Self::from_entity_version(
110                entity,
111                SchemaVersion::new(u32::MAX),
112            )),
113        )
114    }
115}
116
117impl Storable for RawSchemaKey {
118    fn to_bytes(&self) -> Cow<'_, [u8]> {
119        Cow::Borrowed(&self.0)
120    }
121
122    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
123        debug_assert_eq!(
124            bytes.len(),
125            SCHEMA_KEY_BYTES_USIZE,
126            "RawSchemaKey::from_bytes received unexpected byte length",
127        );
128
129        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
130            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
131        }
132
133        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
134        out.copy_from_slice(bytes.as_ref());
135        Self(out)
136    }
137
138    fn into_bytes(self) -> Vec<u8> {
139        self.0.to_vec()
140    }
141
142    const BOUND: StorableBound = StorableBound::Bounded {
143        max_size: SCHEMA_KEY_BYTES,
144        is_fixed_size: true,
145    };
146}
147
///
/// RawSchemaSnapshot
///
/// Raw persisted schema snapshot payload.
/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
/// keeping the stable-memory value boundary independent from the typed schema
/// DTOs used by reconciliation.
///

#[derive(Clone, Debug, Eq, PartialEq)]
struct RawSchemaSnapshot {
    // Encoded `PersistedSchemaSnapshot` bytes, without the raw value header.
    payload: Vec<u8>,
    // Fingerprint stored beside the payload. `Some` when built from a typed
    // snapshot or decoded from header-framed bytes; `None` when the stored
    // bytes carried no recognized header.
    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
}
162
163impl RawSchemaSnapshot {
164    /// Encode one typed persisted-schema snapshot into a raw store payload.
165    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
166        validate_typed_schema_snapshot_for_store(snapshot)?;
167
168        let accepted_schema_fingerprint =
169            accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
170        let payload = encode_persisted_schema_snapshot(snapshot)?;
171
172        Ok(Self {
173            payload,
174            accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
175        })
176    }
177
178    /// Build one raw schema snapshot from already-encoded bytes.
179    #[must_use]
180    #[cfg(test)]
181    const fn from_bytes(payload: Vec<u8>) -> Self {
182        Self {
183            payload,
184            accepted_schema_fingerprint: None,
185        }
186    }
187
188    /// Borrow the encoded schema snapshot payload.
189    #[must_use]
190    const fn as_bytes(&self) -> &[u8] {
191        self.payload.as_slice()
192    }
193
194    /// Consume the snapshot into its encoded payload bytes.
195    #[must_use]
196    fn into_bytes(self) -> Vec<u8> {
197        self.payload
198    }
199
200    /// Return the accepted schema identity fingerprint stored beside the raw
201    /// payload, without decoding the persisted snapshot.
202    fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
203        self.accepted_schema_fingerprint
204            .ok_or_else(InternalError::store_corruption)
205    }
206
207    /// Decode this raw store payload into a typed persisted-schema snapshot.
208    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
209        decode_persisted_schema_snapshot(self.as_bytes())
210    }
211}
212
/// Identity facts for the accepted schema of one entity.
///
/// `SchemaStore::insert_persisted_snapshot_if_latest_identity` compares two of
/// these for equality to detect a concurrent schema publication.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogIdentity {
    // Tag of the entity this identity describes.
    entity_tag: EntityTag,
    // Entity path supplied by the caller; `decode_verified` checks it against
    // the decoded snapshot.
    entity_path: &'static str,
    // Store path supplied by the caller.
    store_path: &'static str,
    // Schema version of the accepted snapshot.
    accepted_schema_version: SchemaVersion,
    // Fingerprint method version captured at construction time.
    fingerprint_method_version: u8,
    // Fingerprint of the accepted schema.
    accepted_schema_fingerprint: CommitSchemaFingerprint,
}
222
223impl AcceptedCatalogIdentity {
224    #[must_use]
225    pub(in crate::db) const fn new(
226        entity_tag: EntityTag,
227        entity_path: &'static str,
228        store_path: &'static str,
229        accepted_schema_version: SchemaVersion,
230        accepted_schema_fingerprint: CommitSchemaFingerprint,
231    ) -> Self {
232        Self {
233            entity_tag,
234            entity_path,
235            store_path,
236            accepted_schema_version,
237            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
238            accepted_schema_fingerprint,
239        }
240    }
241
242    #[must_use]
243    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
244        self.entity_tag
245    }
246
247    #[must_use]
248    pub(in crate::db) const fn entity_path(self) -> &'static str {
249        self.entity_path
250    }
251
252    #[must_use]
253    pub(in crate::db) const fn store_path(self) -> &'static str {
254        self.store_path
255    }
256
257    #[must_use]
258    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
259        self.accepted_schema_version
260    }
261
262    #[must_use]
263    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
264        self.fingerprint_method_version
265    }
266
267    #[must_use]
268    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
269        self.accepted_schema_fingerprint
270    }
271}
272
/// One selected accepted snapshot: its identity plus the still-encoded
/// snapshot payload, which `decode_verified` decodes and cross-checks.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
    // Identity derived from the store entry without decoding the payload.
    identity: AcceptedCatalogIdentity,
    // Encoded persisted schema snapshot bytes.
    raw_snapshot: Vec<u8>,
}
278
279impl AcceptedCatalogSnapshotSelection {
280    #[must_use]
281    const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
282        Self {
283            identity,
284            raw_snapshot,
285        }
286    }
287
288    #[must_use]
289    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
290        self.identity
291    }
292
293    pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
294        let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
295        let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
296        let identity = self.identity();
297
298        if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
299            return Err(InternalError::store_invariant());
300        }
301        if accepted.entity_path() != identity.entity_path() {
302            return Err(InternalError::store_invariant());
303        }
304
305        let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
306        if decoded_fingerprint != identity.accepted_schema_fingerprint() {
307            return Err(InternalError::store_invariant());
308        }
309
310        Ok(accepted)
311    }
312}
313
314impl Storable for RawSchemaSnapshot {
315    fn to_bytes(&self) -> Cow<'_, [u8]> {
316        let Some(fingerprint) = self.accepted_schema_fingerprint else {
317            return Cow::Borrowed(self.as_bytes());
318        };
319
320        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
321        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
322        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
323        bytes.extend_from_slice(&fingerprint);
324        bytes.extend_from_slice(self.as_bytes());
325
326        Cow::Owned(bytes)
327    }
328
329    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
330        let bytes = bytes.into_owned();
331        if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
332            && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
333            && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
334        {
335            let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
336            let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
337            let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
338            fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
339
340            return Self {
341                payload: bytes[fingerprint_end..].to_vec(),
342                accepted_schema_fingerprint: Some(fingerprint),
343            };
344        }
345
346        Self {
347            payload: bytes,
348            accepted_schema_fingerprint: None,
349        }
350    }
351
352    fn into_bytes(self) -> Vec<u8> {
353        let Some(fingerprint) = self.accepted_schema_fingerprint else {
354            return self.payload;
355        };
356
357        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
358        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
359        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
360        bytes.extend_from_slice(&fingerprint);
361        bytes.extend_from_slice(&self.payload);
362
363        bytes
364    }
365
366    const BOUND: StorableBound = StorableBound::Bounded {
367        max_size: MAX_SCHEMA_SNAPSHOT_BYTES + RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32,
368        is_fixed_size: false,
369    };
370}
371
372// Validate typed schema snapshots before they are encoded into the raw schema
373// metadata store. This catches caller-side invariant violations separately from
374// raw persisted-byte corruption handled by the codec decode boundary.
375fn validate_typed_schema_snapshot_for_store(
376    snapshot: &PersistedSchemaSnapshot,
377) -> Result<(), InternalError> {
378    if schema_snapshot_integrity_detail(
379        "schema snapshot",
380        snapshot.version(),
381        snapshot.primary_key_field_ids(),
382        snapshot.row_layout(),
383        snapshot.fields(),
384    )
385    .is_some()
386    {
387        return Err(InternalError::store_invariant());
388    }
389
390    Ok(())
391}
392
///
/// SchemaStoreFootprint
///
/// Current raw schema metadata footprint for one entity. Reconciliation uses
/// this value to report stable-memory pressure without decoding schema payloads
/// or exposing field-level metadata through metrics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreFootprint {
    // Number of raw schema snapshots stored for the entity.
    snapshots: u64,
    // Total encoded payload bytes across those snapshots.
    encoded_bytes: u64,
    // Encoded payload bytes of the highest-version snapshot.
    latest_snapshot_bytes: u64,
}
407
///
/// SchemaStoreCatalogMetadata
///
/// Accepted schema-store catalog metadata derived from latest persisted
/// snapshots. This is diagnostic allocation metadata, not allocation identity.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreCatalogMetadata {
    // Maximum latest schema version represented in the catalog.
    schema_version: SchemaVersion,
    // Fingerprint method version for this diagnostic metadata row.
    schema_fingerprint_method_version: u8,
    // Deterministic catalog fingerprint for the latest accepted snapshots.
    schema_fingerprint: CommitSchemaFingerprint,
    // Number of entity schemas represented in this metadata.
    entity_count: u64,
}
422
423impl SchemaStoreCatalogMetadata {
424    /// Build catalog metadata from already-derived accepted schema facts.
425    #[must_use]
426    const fn new(
427        schema_version: SchemaVersion,
428        schema_fingerprint_method_version: u8,
429        schema_fingerprint: CommitSchemaFingerprint,
430        entity_count: u64,
431    ) -> Self {
432        Self {
433            schema_version,
434            schema_fingerprint_method_version,
435            schema_fingerprint,
436            entity_count,
437        }
438    }
439
440    /// Return the maximum latest schema version represented in the catalog.
441    #[must_use]
442    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
443        self.schema_version
444    }
445
446    /// Return the fingerprint method version for this diagnostic metadata row.
447    #[must_use]
448    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
449        self.schema_fingerprint_method_version
450    }
451
452    /// Return the deterministic catalog fingerprint for latest accepted
453    /// snapshots.
454    #[must_use]
455    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
456        self.schema_fingerprint
457    }
458
459    /// Return number of entity schemas represented in this catalog metadata.
460    #[must_use]
461    pub(in crate::db) const fn entity_count(self) -> u64 {
462        self.entity_count
463    }
464}
465
///
/// SchemaStoreAllocationMetadata
///
/// Role-specific allocation metadata derived from latest accepted schema-store
/// snapshots. These fingerprints describe the accepted contract that owns each
/// allocation role; they are diagnostics, not allocation identity.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaStoreAllocationMetadata {
    // Accepted row-layout allocation metadata for data memory.
    data: SchemaStoreCatalogMetadata,
    // Accepted index-catalog allocation metadata for index memory.
    index: SchemaStoreCatalogMetadata,
    // Accepted full schema-catalog allocation metadata for schema memory.
    schema: SchemaStoreCatalogMetadata,
}
480
481impl SchemaStoreAllocationMetadata {
482    /// Build one role-specific metadata set from already-derived accepted
483    /// schema facts.
484    #[must_use]
485    const fn new(
486        data: SchemaStoreCatalogMetadata,
487        index: SchemaStoreCatalogMetadata,
488        schema: SchemaStoreCatalogMetadata,
489    ) -> Self {
490        Self {
491            data,
492            index,
493            schema,
494        }
495    }
496
497    /// Return accepted row-layout allocation metadata for data memory.
498    #[must_use]
499    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
500        self.data
501    }
502
503    /// Return accepted index-catalog allocation metadata for index memory.
504    #[must_use]
505    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
506        self.index
507    }
508
509    /// Return accepted full schema-catalog allocation metadata for schema
510    /// memory.
511    #[must_use]
512    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
513        self.schema
514    }
515}
516
517impl SchemaStoreFootprint {
518    /// Build one schema-store footprint from already-counted raw payload facts.
519    #[must_use]
520    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
521        Self {
522            snapshots,
523            encoded_bytes,
524            latest_snapshot_bytes,
525        }
526    }
527
528    /// Return the number of raw schema snapshots stored for the entity.
529    #[must_use]
530    pub(in crate::db) const fn snapshots(self) -> u64 {
531        self.snapshots
532    }
533
534    /// Return the total encoded payload bytes stored for the entity.
535    #[must_use]
536    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
537        self.encoded_bytes
538    }
539
540    /// Return the encoded payload bytes for the highest-version snapshot.
541    #[must_use]
542    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
543        self.latest_snapshot_bytes
544    }
545}
546
///
/// SchemaStore
///
/// Thin persistence wrapper over one stable or heap schema metadata BTreeMap.
/// Startup reconciliation writes and validates encoded schema snapshots here
/// before row/index operations proceed.
///

pub struct SchemaStore {
    // Concrete storage strategy, fixed by the `init*` constructor that built
    // this store.
    backend: SchemaStoreBackend,
}
558
// Storage strategy behind a `SchemaStore`; one variant per `SchemaStore`
// constructor (`init`, `init_heap`, `init_journaled`).
enum SchemaStoreBackend {
    // Entries are read from and written to a stable-memory B-tree directly.
    Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
    // Entries live in an ordinary heap B-tree.
    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
    // A stable `canonical` base plus a heap `live` projection that receives
    // normal inserts. `tombstones` holds keys marked as removed: `clear`
    // tombstones every canonical key instead of deleting it, and an insert
    // removes the key's tombstone again.
    // NOTE(review): the read path that merges the three is outside this view.
    Journaled {
        canonical:
            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
        tombstones: BTreeSet<RawSchemaKey>,
    },
}
569
/// Verdict a schema-store traversal visitor returns after each entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SchemaStoreVisit {
    Continue,
    #[allow(
        dead_code,
        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
    )]
    Stop,
}

impl SchemaStoreVisit {
    /// Return `true` only for `Stop`.
    const fn should_stop(self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
586
587impl SchemaStore {
588    /// Initialize the schema store with the provided backing memory.
589    #[must_use]
590    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
591        Self {
592            backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
593        }
594    }
595
596    /// Initialize a volatile heap-backed schema store.
597    #[must_use]
598    pub const fn init_heap() -> Self {
599        Self {
600            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
601        }
602    }
603
604    /// Initialize a journaled cached-stable schema store.
605    ///
606    /// Normal schema publication writes only the live projection. Canonical
607    /// stable schema history is updated by future journal fold/recovery paths.
608    #[must_use]
609    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
610        Self {
611            backend: SchemaStoreBackend::Journaled {
612                canonical: StableBTreeMap::init(memory),
613                live: StdBTreeMap::new(),
614                tombstones: BTreeSet::new(),
615            },
616        }
617    }
618
619    /// Insert or replace one typed persisted schema snapshot.
620    pub(in crate::db) fn insert_persisted_snapshot(
621        &mut self,
622        entity: EntityTag,
623        snapshot: &PersistedSchemaSnapshot,
624    ) -> Result<(), InternalError> {
625        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
626        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
627        let _ = self.insert_raw_snapshot(key, raw_snapshot);
628
629        Ok(())
630    }
631
632    /// Insert one typed persisted schema snapshot only if the current live
633    /// accepted catalog identity still matches the identity captured before
634    /// schema mutation planning.
635    pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
636        &mut self,
637        expected: AcceptedCatalogIdentity,
638        snapshot: &PersistedSchemaSnapshot,
639    ) -> Result<(), InternalError> {
640        let live = self.latest_catalog_identity(
641            expected.entity_tag(),
642            expected.entity_path(),
643            expected.store_path(),
644        )?;
645        if live
646            .as_ref()
647            .map(AcceptedCatalogSnapshotSelection::identity)
648            != Some(expected)
649        {
650            return Err(InternalError::schema_ddl_publication_race_lost(
651                expected.entity_path(),
652            ));
653        }
654
655        self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
656    }
657
658    /// Reset the volatile projection for journaled recovery without mutating
659    /// the canonical stable schema base.
660    pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
661        let SchemaStoreBackend::Journaled {
662            live, tombstones, ..
663        } = &mut self.backend
664        else {
665            return Err(InternalError::store_invariant());
666        };
667
668        live.clear();
669        tombstones.clear();
670
671        Ok(())
672    }
673
674    /// Apply one folded journal schema snapshot into the canonical stable base.
675    pub(in crate::db) fn fold_persisted_snapshot(
676        &mut self,
677        entity: EntityTag,
678        snapshot: &PersistedSchemaSnapshot,
679    ) -> Result<(), InternalError> {
680        let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
681            return Err(InternalError::store_invariant());
682        };
683
684        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
685        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
686        canonical.insert(key, raw_snapshot);
687
688        Ok(())
689    }
690
691    /// Load and decode one typed persisted schema snapshot.
692    #[cfg(test)]
693    pub(in crate::db) fn get_persisted_snapshot(
694        &self,
695        entity: EntityTag,
696        version: SchemaVersion,
697    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
698        let key = RawSchemaKey::from_entity_version(entity, version);
699        self.get_raw_snapshot(&key)
700            .map(|snapshot| snapshot.decode_persisted_snapshot())
701            .transpose()
702    }
703
704    /// Load and decode the highest stored schema snapshot version for one entity.
705    pub(in crate::db) fn latest_persisted_snapshot(
706        &self,
707        entity: EntityTag,
708    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
709        self.latest_raw_snapshot(entity)
710            .map(|snapshot| snapshot.decode_persisted_snapshot())
711            .transpose()
712    }
713
714    /// Return the latest accepted catalog identity for one entity without
715    /// decoding the selected schema snapshot.
716    pub(in crate::db) fn latest_catalog_identity(
717        &self,
718        entity: EntityTag,
719        entity_path: &'static str,
720        store_path: &'static str,
721    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
722        let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
723            return Ok(None);
724        };
725        let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
726        let identity =
727            AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
728
729        Ok(Some(AcceptedCatalogSnapshotSelection::new(
730            identity,
731            raw_snapshot.into_bytes(),
732        )))
733    }
734
735    /// Return raw schema-store footprint facts for one entity.
736    #[must_use]
737    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
738        let mut snapshots = 0u64;
739        let mut encoded_bytes = 0u64;
740        let mut latest = None::<(SchemaVersion, u64)>;
741
742        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
743            if key.entity_tag() != entity {
744                return Ok(SchemaStoreVisit::Continue);
745            }
746
747            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
748            snapshots = snapshots.saturating_add(1);
749            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
750
751            let version = SchemaVersion::new(key.version());
752            if latest
753                .as_ref()
754                .is_none_or(|(latest_version, _)| version > *latest_version)
755            {
756                latest = Some((version, snapshot_bytes));
757            }
758            Ok(SchemaStoreVisit::Continue)
759        });
760
761        SchemaStoreFootprint::new(
762            snapshots,
763            encoded_bytes,
764            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
765        )
766    }
767
768    /// Derive accepted catalog metadata from latest persisted schema snapshots.
769    ///
770    /// This function intentionally reads only the persisted schema store. It
771    /// does not reconstruct metadata from generated models when the store has
772    /// no accepted snapshots.
773    #[cfg(test)]
774    pub(in crate::db) fn catalog_metadata(
775        &self,
776    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
777        Ok(self
778            .allocation_metadata()?
779            .map(SchemaStoreAllocationMetadata::schema))
780    }
781
782    /// Derive role-specific allocation metadata from latest persisted schema
783    /// snapshots.
784    ///
785    /// This function intentionally reads only accepted schema-store payloads.
786    /// It never reconstructs metadata from generated models when the store has
787    /// no accepted snapshots.
788    pub(in crate::db) fn allocation_metadata(
789        &self,
790    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
791        let latest_by_entity = self.latest_raw_snapshots_by_entity();
792        if latest_by_entity.is_empty() {
793            return Ok(None);
794        }
795
796        Ok(Some(SchemaStoreAllocationMetadata::new(
797            derive_data_allocation_metadata(&latest_by_entity)?,
798            derive_index_allocation_metadata(&latest_by_entity)?,
799            derive_schema_catalog_metadata(&latest_by_entity)?,
800        )))
801    }
802
803    /// Insert or replace one raw schema snapshot.
804    fn insert_raw_snapshot(
805        &mut self,
806        key: RawSchemaKey,
807        snapshot: RawSchemaSnapshot,
808    ) -> Option<RawSchemaSnapshot> {
809        let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
810            self.get_raw_snapshot_for_backend(&key)
811        } else {
812            None
813        };
814        match &mut self.backend {
815            SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
816            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
817            SchemaStoreBackend::Journaled {
818                live, tombstones, ..
819            } => {
820                tombstones.remove(&key);
821                live.insert(key, snapshot);
822                previous_journaled
823            }
824        }
825    }
826
827    /// Load one raw schema snapshot by key.
828    #[must_use]
829    #[cfg(test)]
830    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
831        match &self.backend {
832            SchemaStoreBackend::Stable(map) => map.get(key),
833            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
834            SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
835        }
836    }
837
838    /// Return whether one schema snapshot key is present.
839    #[must_use]
840    #[cfg(test)]
841    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
842        match &self.backend {
843            SchemaStoreBackend::Stable(map) => map.contains_key(key),
844            SchemaStoreBackend::Heap(map) => map.contains_key(key),
845            SchemaStoreBackend::Journaled { .. } => {
846                self.get_raw_snapshot_for_backend(key).is_some()
847            }
848        }
849    }
850
851    /// Return the number of schema snapshot entries in this store.
852    #[must_use]
853    #[cfg(test)]
854    pub(in crate::db) fn len(&self) -> u64 {
855        match &self.backend {
856            SchemaStoreBackend::Stable(map) => map.len(),
857            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
858            SchemaStoreBackend::Journaled { .. } => {
859                let mut count = 0_u64;
860                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
861                    count = count.saturating_add(1);
862                    Ok(SchemaStoreVisit::Continue)
863                });
864                count
865            }
866        }
867    }
868
869    /// Return whether this schema store currently has no persisted snapshots.
870    #[must_use]
871    #[cfg(test)]
872    pub(in crate::db) fn is_empty(&self) -> bool {
873        match &self.backend {
874            SchemaStoreBackend::Stable(map) => map.is_empty(),
875            SchemaStoreBackend::Heap(map) => map.is_empty(),
876            SchemaStoreBackend::Journaled { .. } => {
877                let mut empty = true;
878                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
879                    empty = false;
880                    Ok(SchemaStoreVisit::Stop)
881                });
882                empty
883            }
884        }
885    }
886
887    /// Clear all schema metadata entries from the store.
888    #[cfg(test)]
889    pub(in crate::db) fn clear(&mut self) {
890        match &mut self.backend {
891            SchemaStoreBackend::Stable(map) => map.clear_new(),
892            SchemaStoreBackend::Heap(map) => map.clear(),
893            SchemaStoreBackend::Journaled {
894                canonical,
895                live,
896                tombstones,
897            } => {
898                live.clear();
899                tombstones.clear();
900                for entry in canonical.iter() {
901                    tombstones.insert(*entry.key());
902                }
903            }
904        }
905    }
906
907    fn latest_raw_snapshots_by_entity(
908        &self,
909    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
910        #[cfg(test)]
911        LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
912
913        let mut latest_by_entity =
914            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
915
916        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
917            let version = SchemaVersion::new(key.version());
918            match latest_by_entity.get_mut(&key.entity_tag()) {
919                Some((latest_version, latest_snapshot)) if version > *latest_version => {
920                    *latest_version = version;
921                    *latest_snapshot = snapshot.clone();
922                }
923                None => {
924                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
925                }
926                Some(_) => {}
927            }
928            Ok(SchemaStoreVisit::Continue)
929        });
930
931        latest_by_entity
932    }
933
934    /// Visit raw schema snapshots in canonical store order without exposing
935    /// the backing stable-map iterator.
936    fn visit_raw_snapshots<E>(
937        &self,
938        visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
939    ) -> Result<(), E> {
940        match &self.backend {
941            SchemaStoreBackend::Stable(map) => {
942                let mut visitor = visitor;
943                for entry in map.iter() {
944                    if visitor(entry.key(), &entry.value())?.should_stop() {
945                        break;
946                    }
947                }
948            }
949            SchemaStoreBackend::Heap(map) => {
950                let mut visitor = visitor;
951                for (key, snapshot) in map {
952                    if visitor(key, snapshot)?.should_stop() {
953                        break;
954                    }
955                }
956            }
957            SchemaStoreBackend::Journaled {
958                canonical,
959                live,
960                tombstones,
961            } => Self::visit_journaled_raw_snapshot_range(
962                canonical,
963                live,
964                tombstones,
965                (RangeBound::Unbounded, RangeBound::Unbounded),
966                Direction::Asc,
967                visitor,
968            )?,
969        }
970
971        Ok(())
972    }
973
974    #[cfg(test)]
975    #[must_use]
976    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
977        match &self.backend {
978            SchemaStoreBackend::Stable(map)
979            | SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
980            SchemaStoreBackend::Heap(_) => 0,
981        }
982    }
983
984    fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
985        let SchemaStoreBackend::Journaled {
986            canonical,
987            live,
988            tombstones,
989        } = &self.backend
990        else {
991            return None;
992        };
993
994        if tombstones.contains(key) {
995            return None;
996        }
997        live.get(key).cloned().or_else(|| canonical.get(key))
998    }
999
1000    fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
1001        self.latest_raw_snapshot_entry(entity)
1002            .map(|(_, snapshot)| snapshot)
1003    }
1004
1005    fn latest_raw_snapshot_entry(
1006        &self,
1007        entity: EntityTag,
1008    ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
1009        let bounds = RawSchemaKey::entity_range_bounds(entity);
1010        match &self.backend {
1011            SchemaStoreBackend::Stable(map) => map
1012                .range((bounds.0, bounds.1))
1013                .next_back()
1014                .map(|entry| (SchemaVersion::new(entry.key().version()), entry.value())),
1015            SchemaStoreBackend::Heap(map) => map
1016                .range((bounds.0, bounds.1))
1017                .next_back()
1018                .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
1019            SchemaStoreBackend::Journaled {
1020                canonical,
1021                live,
1022                tombstones,
1023            } => {
1024                let mut latest = None;
1025                let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
1026                    canonical,
1027                    live,
1028                    tombstones,
1029                    bounds,
1030                    Direction::Desc,
1031                    |key, snapshot| {
1032                        latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1033                        Ok(SchemaStoreVisit::Stop)
1034                    },
1035                );
1036                latest
1037            }
1038        }
1039    }
1040
1041    fn visit_journaled_raw_snapshot_range<E>(
1042        canonical: &StableBTreeMap<
1043            RawSchemaKey,
1044            RawSchemaSnapshot,
1045            VirtualMemory<DefaultMemoryImpl>,
1046        >,
1047        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1048        tombstones: &BTreeSet<RawSchemaKey>,
1049        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1050        direction: Direction,
1051        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1052    ) -> Result<(), E> {
1053        match direction {
1054            Direction::Asc => visit_ordered_overlay(
1055                canonical.range((bounds.0, bounds.1)),
1056                live.range((bounds.0, bounds.1)),
1057                Direction::Asc,
1058                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1059                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1060                |live_entry| !tombstones.contains(live_entry.0),
1061                |entry| {
1062                    let visit = match entry {
1063                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1064                            visitor(canonical_entry.key(), &canonical_entry.value())?
1065                        }
1066                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1067                    };
1068                    Ok(if visit.should_stop() {
1069                        OrderedOverlayVisit::Stop
1070                    } else {
1071                        OrderedOverlayVisit::Continue
1072                    })
1073                },
1074            ),
1075            Direction::Desc => visit_ordered_overlay(
1076                canonical.range((bounds.0, bounds.1)).rev(),
1077                live.range((bounds.0, bounds.1)).rev(),
1078                Direction::Desc,
1079                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1080                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1081                |live_entry| !tombstones.contains(live_entry.0),
1082                |entry| {
1083                    let visit = match entry {
1084                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1085                            visitor(canonical_entry.key(), &canonical_entry.value())?
1086                        }
1087                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1088                    };
1089                    Ok(if visit.should_stop() {
1090                        OrderedOverlayVisit::Stop
1091                    } else {
1092                        OrderedOverlayVisit::Continue
1093                    })
1094                },
1095            ),
1096        }
1097    }
1098}
1099
1100fn derive_data_allocation_metadata(
1101    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1102) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1103    let mut max_version = SchemaVersion::initial();
1104    let mut hasher = new_hash_sha256();
1105    write_hash_tag_u8(
1106        &mut hasher,
1107        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1108    );
1109
1110    for (entity, (_, snapshot)) in latest_by_entity {
1111        let persisted = snapshot.decode_persisted_snapshot()?;
1112        if persisted.version() > max_version {
1113            max_version = persisted.version();
1114        }
1115
1116        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1117            persisted.version(),
1118            persisted.entity_path().to_string(),
1119            persisted.entity_name().to_string(),
1120            persisted.primary_key_field_ids().to_vec(),
1121            persisted.row_layout().clone(),
1122            persisted.fields().to_vec(),
1123            Vec::new(),
1124        );
1125        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1126
1127        write_hash_u64(&mut hasher, entity.value());
1128        write_hash_u32(&mut hasher, persisted.version().get());
1129        write_hash_len_u32(&mut hasher, encoded.len());
1130        hasher.update(encoded);
1131    }
1132
1133    Ok(finalize_schema_metadata(
1134        max_version,
1135        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1136        hasher,
1137        latest_by_entity.len(),
1138    ))
1139}
1140
1141fn derive_index_allocation_metadata(
1142    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1143) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1144    let mut max_version = SchemaVersion::initial();
1145    let mut hasher = new_hash_sha256();
1146    write_hash_tag_u8(
1147        &mut hasher,
1148        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1149    );
1150
1151    for (entity, (_, snapshot)) in latest_by_entity {
1152        let persisted = snapshot.decode_persisted_snapshot()?;
1153        if persisted.version() > max_version {
1154            max_version = persisted.version();
1155        }
1156
1157        write_hash_u64(&mut hasher, entity.value());
1158        write_hash_u32(&mut hasher, persisted.version().get());
1159        write_hash_len_u32(&mut hasher, persisted.indexes().len());
1160        for index in persisted.indexes() {
1161            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1162            write_hash_str_u32(&mut hasher, index.name());
1163            write_hash_str_u32(&mut hasher, index.store());
1164            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1165            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1166            match index.predicate_sql() {
1167                Some(predicate_sql) => {
1168                    write_hash_tag_u8(&mut hasher, 1);
1169                    write_hash_str_u32(&mut hasher, predicate_sql);
1170                }
1171                None => write_hash_tag_u8(&mut hasher, 0),
1172            }
1173            hash_persisted_index_key(&mut hasher, index.key());
1174        }
1175    }
1176
1177    Ok(finalize_schema_metadata(
1178        max_version,
1179        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1180        hasher,
1181        latest_by_entity.len(),
1182    ))
1183}
1184
1185fn derive_schema_catalog_metadata(
1186    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1187) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1188    let mut max_version = SchemaVersion::initial();
1189    let mut hasher = new_hash_sha256();
1190    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1191
1192    for (entity, (version, snapshot)) in latest_by_entity {
1193        let persisted = snapshot.decode_persisted_snapshot()?;
1194        if persisted.version() > max_version {
1195            max_version = persisted.version();
1196        }
1197
1198        write_hash_u64(&mut hasher, entity.value());
1199        write_hash_u32(&mut hasher, version.get());
1200        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1201        hasher.update(snapshot.as_bytes());
1202    }
1203
1204    Ok(finalize_schema_metadata(
1205        max_version,
1206        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1207        hasher,
1208        latest_by_entity.len(),
1209    ))
1210}
1211
1212fn finalize_schema_metadata(
1213    schema_version: SchemaVersion,
1214    schema_fingerprint_method_version: u8,
1215    hasher: sha2::Sha256,
1216    entity_count: usize,
1217) -> SchemaStoreCatalogMetadata {
1218    let digest = finalize_hash_sha256(hasher);
1219    let mut schema_fingerprint = [0u8; 16];
1220    schema_fingerprint.copy_from_slice(&digest[..16]);
1221
1222    SchemaStoreCatalogMetadata::new(
1223        schema_version,
1224        schema_fingerprint_method_version,
1225        schema_fingerprint,
1226        u64::try_from(entity_count).unwrap_or(u64::MAX),
1227    )
1228}
1229
1230fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1231    match key {
1232        PersistedIndexKeySnapshot::FieldPath(paths) => {
1233            write_hash_tag_u8(hasher, 1);
1234            write_hash_len_u32(hasher, paths.len());
1235            for path in paths {
1236                hash_persisted_index_field_path(hasher, path);
1237            }
1238        }
1239        PersistedIndexKeySnapshot::Items(items) => {
1240            write_hash_tag_u8(hasher, 2);
1241            write_hash_len_u32(hasher, items.len());
1242            for item in items {
1243                match item {
1244                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1245                        write_hash_tag_u8(hasher, 1);
1246                        hash_persisted_index_field_path(hasher, path);
1247                    }
1248                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
1249                        write_hash_tag_u8(hasher, 2);
1250                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1251                        hash_persisted_index_field_path(hasher, expression.source());
1252                        hash_persisted_field_kind(hasher, expression.input_kind());
1253                        hash_persisted_field_kind(hasher, expression.output_kind());
1254                        write_hash_str_u32(hasher, expression.canonical_text());
1255                    }
1256                }
1257            }
1258        }
1259    }
1260}
1261
1262fn hash_persisted_index_field_path(
1263    hasher: &mut sha2::Sha256,
1264    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1265) {
1266    write_hash_u32(hasher, path.field_id().get());
1267    write_hash_u32(hasher, u32::from(path.slot().get()));
1268    write_hash_len_u32(hasher, path.path().len());
1269    for segment in path.path() {
1270        write_hash_str_u32(hasher, segment);
1271    }
1272    hash_persisted_field_kind(hasher, path.kind());
1273    write_hash_tag_u8(hasher, u8::from(path.nullable()));
1274}
1275
1276fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1277    match kind {
1278        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1279        PersistedFieldKind::Blob { max_len } => {
1280            write_hash_tag_u8(hasher, 2);
1281            hash_optional_u32(hasher, *max_len);
1282        }
1283        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1284        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1285        PersistedFieldKind::Decimal { scale } => {
1286            write_hash_tag_u8(hasher, 5);
1287            write_hash_u32(hasher, *scale);
1288        }
1289        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1290        PersistedFieldKind::Enum { path, variants } => {
1291            write_hash_tag_u8(hasher, 7);
1292            write_hash_str_u32(hasher, path);
1293            write_hash_len_u32(hasher, variants.len());
1294            for variant in variants {
1295                write_hash_str_u32(hasher, variant.ident());
1296                match variant.payload_kind() {
1297                    Some(payload_kind) => {
1298                        write_hash_tag_u8(hasher, 1);
1299                        hash_persisted_field_kind(hasher, payload_kind);
1300                    }
1301                    None => write_hash_tag_u8(hasher, 0),
1302                }
1303                write_hash_str_u32(
1304                    hasher,
1305                    field_storage_decode_name(variant.payload_storage_decode()),
1306                );
1307            }
1308        }
1309        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1310        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1311        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1312        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1313        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1314        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1315        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1316        PersistedFieldKind::IntBig { max_bytes } => {
1317            write_hash_tag_u8(hasher, 15);
1318            write_hash_u32(hasher, *max_bytes);
1319        }
1320        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1321        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1322        PersistedFieldKind::Text { max_len } => {
1323            write_hash_tag_u8(hasher, 18);
1324            hash_optional_u32(hasher, *max_len);
1325        }
1326        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1327        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1328        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1329        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1330        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1331        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1332        PersistedFieldKind::NatBig { max_bytes } => {
1333            write_hash_tag_u8(hasher, 25);
1334            write_hash_u32(hasher, *max_bytes);
1335        }
1336        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1337        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1338        PersistedFieldKind::Relation {
1339            target_path,
1340            target_entity_name,
1341            target_entity_tag,
1342            target_store_path,
1343            key_kind,
1344            strength,
1345        } => {
1346            write_hash_tag_u8(hasher, 28);
1347            write_hash_str_u32(hasher, target_path);
1348            write_hash_str_u32(hasher, target_entity_name);
1349            write_hash_u64(hasher, target_entity_tag.value());
1350            write_hash_str_u32(hasher, target_store_path);
1351            hash_persisted_field_kind(hasher, key_kind);
1352            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1353        }
1354        PersistedFieldKind::List(inner) => {
1355            write_hash_tag_u8(hasher, 29);
1356            hash_persisted_field_kind(hasher, inner);
1357        }
1358        PersistedFieldKind::Set(inner) => {
1359            write_hash_tag_u8(hasher, 30);
1360            hash_persisted_field_kind(hasher, inner);
1361        }
1362        PersistedFieldKind::Map { key, value } => {
1363            write_hash_tag_u8(hasher, 31);
1364            hash_persisted_field_kind(hasher, key);
1365            hash_persisted_field_kind(hasher, value);
1366        }
1367        PersistedFieldKind::Structured { queryable } => {
1368            write_hash_tag_u8(hasher, 32);
1369            write_hash_tag_u8(hasher, u8::from(*queryable));
1370        }
1371    }
1372}
1373
1374fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1375    match value {
1376        Some(value) => {
1377            write_hash_tag_u8(hasher, 1);
1378            write_hash_u32(hasher, value);
1379        }
1380        None => write_hash_tag_u8(hasher, 0),
1381    }
1382}
1383
1384const fn persisted_index_origin_name(
1385    origin: crate::db::schema::PersistedIndexOrigin,
1386) -> &'static str {
1387    match origin {
1388        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1389        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1390    }
1391}
1392
1393const fn persisted_expression_op_name(
1394    op: crate::db::schema::PersistedIndexExpressionOp,
1395) -> &'static str {
1396    match op {
1397        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1398        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1399        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1400        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1401        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1402        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1403        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1404        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1405    }
1406}
1407
1408const fn persisted_relation_strength_name(
1409    strength: crate::db::schema::PersistedRelationStrength,
1410) -> &'static str {
1411    match strength {
1412        crate::db::schema::PersistedRelationStrength::Strong => "strong",
1413        crate::db::schema::PersistedRelationStrength::Weak => "weak",
1414    }
1415}
1416
1417const fn field_storage_decode_name(
1418    decode: crate::model::field::FieldStorageDecode,
1419) -> &'static str {
1420    match decode {
1421        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1422        crate::model::field::FieldStorageDecode::Value => "value",
1423    }
1424}
1425
1426///
1427/// TESTS
1428///
1429
1430#[cfg(test)]
1431mod tests;