Skip to main content

icydb_core/db/schema/
store.rs

1//! Module: db::schema::store
2//! Responsibility: stable BTreeMap-backed schema metadata persistence.
3//! Does not own: reconciliation policy, typed snapshot encoding, or generated proposal construction.
4//! Boundary: provides the third per-store stable memory alongside row and index stores.
5
6use crate::{
7    db::{
8        codec::{
9            finalize_hash_sha256, new_hash_sha256, write_hash_len_u32, write_hash_str_u32,
10            write_hash_tag_u8, write_hash_u32, write_hash_u64,
11        },
12        commit::CommitSchemaFingerprint,
13        direction::Direction,
14        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
15        schema::{
16            AcceptedSchemaSnapshot, PersistedFieldKind, PersistedIndexKeyItemSnapshot,
17            PersistedIndexKeySnapshot, PersistedSchemaSnapshot, SchemaVersion,
18            accepted_schema_cache_fingerprint,
19            accepted_schema_cache_fingerprint_for_persisted_snapshot,
20            accepted_schema_cache_fingerprint_method_version, decode_persisted_schema_snapshot,
21            encode_persisted_schema_snapshot, schema_snapshot_integrity_detail,
22        },
23    },
24    error::InternalError,
25    traits::Storable,
26    types::EntityTag,
27};
28use ic_memory::stable_structures::storable::Bound as StorableBound;
29use ic_memory::stable_structures::{
30    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
31};
32use sha2::Digest;
33use std::borrow::Cow;
34#[cfg(test)]
35use std::cell::Cell;
36use std::collections::{BTreeMap as StdBTreeMap, BTreeSet};
37use std::convert::Infallible;
38use std::ops::Bound as RangeBound;
39
40const SCHEMA_KEY_BYTES_USIZE: usize = 12;
41const SCHEMA_KEY_BYTES: u32 = 12;
42pub(in crate::db) const MAX_SCHEMA_SNAPSHOT_BYTES: u32 = 512 * 1024;
43const SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION: u8 = 1;
44const SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION: u8 = 2;
45const SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION: u8 = 3;
46const RAW_SCHEMA_SNAPSHOT_MAGIC: &[u8; 8] = b"ICYDBSCH";
47const RAW_SCHEMA_SNAPSHOT_VALUE_VERSION: u8 = 1;
48const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES: usize = 25;
49const RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32: u32 = 25;
50
51#[cfg(test)]
52thread_local! {
53    static LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS: Cell<u64> = const { Cell::new(0) };
54}
55
56#[cfg(test)]
57pub(in crate::db) fn reset_latest_raw_snapshots_by_entity_call_count_for_tests() {
58    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(0));
59}
60
61#[cfg(test)]
62pub(in crate::db) fn latest_raw_snapshots_by_entity_call_count_for_tests() -> u64 {
63    LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(Cell::get)
64}
65
66///
67/// RawSchemaKey
68///
69/// Stable key for one persisted schema snapshot entry.
70/// It combines the entity tag and schema version so reconciliation can load
71/// concrete versions without depending on generated entity names.
72///
73
74#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
75struct RawSchemaKey([u8; SCHEMA_KEY_BYTES_USIZE]);
76
77impl RawSchemaKey {
78    /// Build the raw persisted key for one entity schema version.
79    #[must_use]
80    fn from_entity_version(entity: EntityTag, version: SchemaVersion) -> Self {
81        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
82        out[..size_of::<u64>()].copy_from_slice(&entity.value().to_be_bytes());
83        out[size_of::<u64>()..].copy_from_slice(&version.get().to_be_bytes());
84
85        Self(out)
86    }
87
88    /// Return the entity tag encoded in this schema key.
89    #[must_use]
90    fn entity_tag(self) -> EntityTag {
91        let mut bytes = [0u8; size_of::<u64>()];
92        bytes.copy_from_slice(&self.0[..size_of::<u64>()]);
93
94        EntityTag::new(u64::from_be_bytes(bytes))
95    }
96
97    /// Return the schema version encoded in this schema key.
98    #[must_use]
99    fn version(self) -> u32 {
100        let mut bytes = [0u8; size_of::<u32>()];
101        bytes.copy_from_slice(&self.0[size_of::<u64>()..]);
102
103        u32::from_be_bytes(bytes)
104    }
105
106    fn entity_range_bounds(entity: EntityTag) -> (RangeBound<Self>, RangeBound<Self>) {
107        (
108            RangeBound::Included(Self::from_entity_version(entity, SchemaVersion::new(0))),
109            RangeBound::Included(Self::from_entity_version(
110                entity,
111                SchemaVersion::new(u32::MAX),
112            )),
113        )
114    }
115}
116
117impl Storable for RawSchemaKey {
118    fn to_bytes(&self) -> Cow<'_, [u8]> {
119        Cow::Borrowed(&self.0)
120    }
121
122    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
123        debug_assert_eq!(
124            bytes.len(),
125            SCHEMA_KEY_BYTES_USIZE,
126            "RawSchemaKey::from_bytes received unexpected byte length",
127        );
128
129        if bytes.len() != SCHEMA_KEY_BYTES_USIZE {
130            return Self([0u8; SCHEMA_KEY_BYTES_USIZE]);
131        }
132
133        let mut out = [0u8; SCHEMA_KEY_BYTES_USIZE];
134        out.copy_from_slice(bytes.as_ref());
135        Self(out)
136    }
137
138    fn into_bytes(self) -> Vec<u8> {
139        self.0.to_vec()
140    }
141
142    const BOUND: StorableBound = StorableBound::Bounded {
143        max_size: SCHEMA_KEY_BYTES,
144        is_fixed_size: true,
145    };
146}
147
148///
149/// RawSchemaSnapshot
150///
151/// Raw persisted schema snapshot payload.
152/// This wrapper stores the encoded `PersistedSchemaSnapshot` payload while
153/// keeping the stable-memory value boundary independent from the typed schema
154/// DTOs used by reconciliation.
155///
156
157#[derive(Clone, Debug, Eq, PartialEq)]
158struct RawSchemaSnapshot {
159    payload: Vec<u8>,
160    accepted_schema_fingerprint: Option<CommitSchemaFingerprint>,
161}
162
163impl RawSchemaSnapshot {
164    /// Encode one typed persisted-schema snapshot into a raw store payload.
165    fn from_persisted_snapshot(snapshot: &PersistedSchemaSnapshot) -> Result<Self, InternalError> {
166        validate_typed_schema_snapshot_for_store(snapshot)?;
167
168        let accepted_schema_fingerprint =
169            accepted_schema_cache_fingerprint_for_persisted_snapshot(snapshot)?;
170        let payload = encode_persisted_schema_snapshot(snapshot)?;
171
172        Ok(Self {
173            payload,
174            accepted_schema_fingerprint: Some(accepted_schema_fingerprint),
175        })
176    }
177
178    /// Build one raw schema snapshot from already-encoded bytes.
179    #[must_use]
180    #[cfg(test)]
181    const fn from_bytes(payload: Vec<u8>) -> Self {
182        Self {
183            payload,
184            accepted_schema_fingerprint: None,
185        }
186    }
187
188    /// Borrow the encoded schema snapshot payload.
189    #[must_use]
190    const fn as_bytes(&self) -> &[u8] {
191        self.payload.as_slice()
192    }
193
194    /// Consume the snapshot into its encoded payload bytes.
195    #[must_use]
196    fn into_bytes(self) -> Vec<u8> {
197        self.payload
198    }
199
200    /// Return the accepted schema identity fingerprint stored beside the raw
201    /// payload, without decoding the persisted snapshot.
202    fn accepted_schema_fingerprint(&self) -> Result<CommitSchemaFingerprint, InternalError> {
203        self.accepted_schema_fingerprint.ok_or_else(|| {
204            InternalError::store_corruption(
205                "obsolete raw schema snapshot storage format missing accepted schema identity header",
206            )
207        })
208    }
209
210    /// Decode this raw store payload into a typed persisted-schema snapshot.
211    fn decode_persisted_snapshot(&self) -> Result<PersistedSchemaSnapshot, InternalError> {
212        decode_persisted_schema_snapshot(self.as_bytes())
213    }
214}
215
216#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub(in crate::db) struct AcceptedCatalogIdentity {
218    entity_tag: EntityTag,
219    entity_path: &'static str,
220    store_path: &'static str,
221    accepted_schema_version: SchemaVersion,
222    fingerprint_method_version: u8,
223    accepted_schema_fingerprint: CommitSchemaFingerprint,
224}
225
226impl AcceptedCatalogIdentity {
227    #[must_use]
228    pub(in crate::db) const fn new(
229        entity_tag: EntityTag,
230        entity_path: &'static str,
231        store_path: &'static str,
232        accepted_schema_version: SchemaVersion,
233        accepted_schema_fingerprint: CommitSchemaFingerprint,
234    ) -> Self {
235        Self {
236            entity_tag,
237            entity_path,
238            store_path,
239            accepted_schema_version,
240            fingerprint_method_version: accepted_schema_cache_fingerprint_method_version(),
241            accepted_schema_fingerprint,
242        }
243    }
244
245    #[must_use]
246    pub(in crate::db) const fn entity_tag(self) -> EntityTag {
247        self.entity_tag
248    }
249
250    #[must_use]
251    pub(in crate::db) const fn entity_path(self) -> &'static str {
252        self.entity_path
253    }
254
255    #[must_use]
256    pub(in crate::db) const fn store_path(self) -> &'static str {
257        self.store_path
258    }
259
260    #[must_use]
261    pub(in crate::db) const fn accepted_schema_version(self) -> SchemaVersion {
262        self.accepted_schema_version
263    }
264
265    #[must_use]
266    pub(in crate::db) const fn fingerprint_method_version(self) -> u8 {
267        self.fingerprint_method_version
268    }
269
270    #[must_use]
271    pub(in crate::db) const fn accepted_schema_fingerprint(self) -> CommitSchemaFingerprint {
272        self.accepted_schema_fingerprint
273    }
274}
275
276#[derive(Clone, Debug, Eq, PartialEq)]
277pub(in crate::db) struct AcceptedCatalogSnapshotSelection {
278    identity: AcceptedCatalogIdentity,
279    raw_snapshot: Vec<u8>,
280}
281
282impl AcceptedCatalogSnapshotSelection {
283    #[must_use]
284    const fn new(identity: AcceptedCatalogIdentity, raw_snapshot: Vec<u8>) -> Self {
285        Self {
286            identity,
287            raw_snapshot,
288        }
289    }
290
291    #[must_use]
292    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
293        self.identity
294    }
295
296    pub(in crate::db) fn decode_verified(&self) -> Result<AcceptedSchemaSnapshot, InternalError> {
297        let snapshot = decode_persisted_schema_snapshot(&self.raw_snapshot)?;
298        let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
299        let identity = self.identity();
300
301        if accepted.persisted_snapshot().version() != identity.accepted_schema_version() {
302            return Err(InternalError::store_invariant(
303                "accepted catalog identity selected a different schema version than the decoded snapshot",
304            ));
305        }
306        if accepted.entity_path() != identity.entity_path() {
307            return Err(InternalError::store_invariant(
308                "accepted catalog identity selected a different entity path than the decoded snapshot",
309            ));
310        }
311
312        let decoded_fingerprint = accepted_schema_cache_fingerprint(&accepted)?;
313        if decoded_fingerprint != identity.accepted_schema_fingerprint() {
314            return Err(InternalError::store_invariant(
315                "accepted catalog identity fingerprint did not match the decoded snapshot",
316            ));
317        }
318
319        Ok(accepted)
320    }
321}
322
323impl Storable for RawSchemaSnapshot {
324    fn to_bytes(&self) -> Cow<'_, [u8]> {
325        let Some(fingerprint) = self.accepted_schema_fingerprint else {
326            return Cow::Borrowed(self.as_bytes());
327        };
328
329        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
330        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
331        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
332        bytes.extend_from_slice(&fingerprint);
333        bytes.extend_from_slice(self.as_bytes());
334
335        Cow::Owned(bytes)
336    }
337
338    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
339        let bytes = bytes.into_owned();
340        if bytes.len() >= RAW_SCHEMA_SNAPSHOT_HEADER_BYTES
341            && &bytes[..RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_MAGIC
342            && bytes[RAW_SCHEMA_SNAPSHOT_MAGIC.len()] == RAW_SCHEMA_SNAPSHOT_VALUE_VERSION
343        {
344            let fingerprint_start = RAW_SCHEMA_SNAPSHOT_MAGIC.len() + size_of::<u8>();
345            let fingerprint_end = fingerprint_start + size_of::<CommitSchemaFingerprint>();
346            let mut fingerprint = [0_u8; size_of::<CommitSchemaFingerprint>()];
347            fingerprint.copy_from_slice(&bytes[fingerprint_start..fingerprint_end]);
348
349            return Self {
350                payload: bytes[fingerprint_end..].to_vec(),
351                accepted_schema_fingerprint: Some(fingerprint),
352            };
353        }
354
355        Self {
356            payload: bytes,
357            accepted_schema_fingerprint: None,
358        }
359    }
360
361    fn into_bytes(self) -> Vec<u8> {
362        let Some(fingerprint) = self.accepted_schema_fingerprint else {
363            return self.payload;
364        };
365
366        let mut bytes = Vec::with_capacity(RAW_SCHEMA_SNAPSHOT_HEADER_BYTES + self.payload.len());
367        bytes.extend_from_slice(RAW_SCHEMA_SNAPSHOT_MAGIC);
368        bytes.push(RAW_SCHEMA_SNAPSHOT_VALUE_VERSION);
369        bytes.extend_from_slice(&fingerprint);
370        bytes.extend_from_slice(&self.payload);
371
372        bytes
373    }
374
375    const BOUND: StorableBound = StorableBound::Bounded {
376        max_size: MAX_SCHEMA_SNAPSHOT_BYTES + RAW_SCHEMA_SNAPSHOT_HEADER_BYTES_U32,
377        is_fixed_size: false,
378    };
379}
380
381// Validate typed schema snapshots before they are encoded into the raw schema
382// metadata store. This catches caller-side invariant violations separately from
383// raw persisted-byte corruption handled by the codec decode boundary.
384fn validate_typed_schema_snapshot_for_store(
385    snapshot: &PersistedSchemaSnapshot,
386) -> Result<(), InternalError> {
387    if let Some(detail) = schema_snapshot_integrity_detail(
388        "schema snapshot",
389        snapshot.version(),
390        snapshot.primary_key_field_ids(),
391        snapshot.row_layout(),
392        snapshot.fields(),
393    ) {
394        return Err(InternalError::store_invariant(detail));
395    }
396
397    Ok(())
398}
399
400///
401/// SchemaStoreFootprint
402///
403/// Current raw schema metadata footprint for one entity. Reconciliation uses
404/// this value to report stable-memory pressure without decoding schema payloads
405/// or exposing field-level metadata through metrics.
406///
407
408#[derive(Clone, Copy, Debug, Eq, PartialEq)]
409pub(in crate::db) struct SchemaStoreFootprint {
410    snapshots: u64,
411    encoded_bytes: u64,
412    latest_snapshot_bytes: u64,
413}
414
415///
416/// SchemaStoreCatalogMetadata
417///
418/// Accepted schema-store catalog metadata derived from latest persisted
419/// snapshots. This is diagnostic allocation metadata, not allocation identity.
420///
421
422#[derive(Clone, Copy, Debug, Eq, PartialEq)]
423pub(in crate::db) struct SchemaStoreCatalogMetadata {
424    schema_version: SchemaVersion,
425    schema_fingerprint_method_version: u8,
426    schema_fingerprint: CommitSchemaFingerprint,
427    entity_count: u64,
428}
429
430impl SchemaStoreCatalogMetadata {
431    /// Build catalog metadata from already-derived accepted schema facts.
432    #[must_use]
433    const fn new(
434        schema_version: SchemaVersion,
435        schema_fingerprint_method_version: u8,
436        schema_fingerprint: CommitSchemaFingerprint,
437        entity_count: u64,
438    ) -> Self {
439        Self {
440            schema_version,
441            schema_fingerprint_method_version,
442            schema_fingerprint,
443            entity_count,
444        }
445    }
446
447    /// Return the maximum latest schema version represented in the catalog.
448    #[must_use]
449    pub(in crate::db) const fn schema_version(self) -> SchemaVersion {
450        self.schema_version
451    }
452
453    /// Return the fingerprint method version for this diagnostic metadata row.
454    #[must_use]
455    pub(in crate::db) const fn schema_fingerprint_method_version(self) -> u8 {
456        self.schema_fingerprint_method_version
457    }
458
459    /// Return the deterministic catalog fingerprint for latest accepted
460    /// snapshots.
461    #[must_use]
462    pub(in crate::db) const fn schema_fingerprint(self) -> CommitSchemaFingerprint {
463        self.schema_fingerprint
464    }
465
466    /// Return number of entity schemas represented in this catalog metadata.
467    #[must_use]
468    pub(in crate::db) const fn entity_count(self) -> u64 {
469        self.entity_count
470    }
471}
472
473///
474/// SchemaStoreAllocationMetadata
475///
476/// Role-specific allocation metadata derived from latest accepted schema-store
477/// snapshots. These fingerprints describe the accepted contract that owns each
478/// allocation role; they are diagnostics, not allocation identity.
479///
480
481#[derive(Clone, Copy, Debug, Eq, PartialEq)]
482pub(in crate::db) struct SchemaStoreAllocationMetadata {
483    data: SchemaStoreCatalogMetadata,
484    index: SchemaStoreCatalogMetadata,
485    schema: SchemaStoreCatalogMetadata,
486}
487
488impl SchemaStoreAllocationMetadata {
489    /// Build one role-specific metadata set from already-derived accepted
490    /// schema facts.
491    #[must_use]
492    const fn new(
493        data: SchemaStoreCatalogMetadata,
494        index: SchemaStoreCatalogMetadata,
495        schema: SchemaStoreCatalogMetadata,
496    ) -> Self {
497        Self {
498            data,
499            index,
500            schema,
501        }
502    }
503
504    /// Return accepted row-layout allocation metadata for data memory.
505    #[must_use]
506    pub(in crate::db) const fn data(self) -> SchemaStoreCatalogMetadata {
507        self.data
508    }
509
510    /// Return accepted index-catalog allocation metadata for index memory.
511    #[must_use]
512    pub(in crate::db) const fn index(self) -> SchemaStoreCatalogMetadata {
513        self.index
514    }
515
516    /// Return accepted full schema-catalog allocation metadata for schema
517    /// memory.
518    #[must_use]
519    pub(in crate::db) const fn schema(self) -> SchemaStoreCatalogMetadata {
520        self.schema
521    }
522}
523
524impl SchemaStoreFootprint {
525    /// Build one schema-store footprint from already-counted raw payload facts.
526    #[must_use]
527    const fn new(snapshots: u64, encoded_bytes: u64, latest_snapshot_bytes: u64) -> Self {
528        Self {
529            snapshots,
530            encoded_bytes,
531            latest_snapshot_bytes,
532        }
533    }
534
535    /// Return the number of raw schema snapshots stored for the entity.
536    #[must_use]
537    pub(in crate::db) const fn snapshots(self) -> u64 {
538        self.snapshots
539    }
540
541    /// Return the total encoded payload bytes stored for the entity.
542    #[must_use]
543    pub(in crate::db) const fn encoded_bytes(self) -> u64 {
544        self.encoded_bytes
545    }
546
547    /// Return the encoded payload bytes for the highest-version snapshot.
548    #[must_use]
549    pub(in crate::db) const fn latest_snapshot_bytes(self) -> u64 {
550        self.latest_snapshot_bytes
551    }
552}
553
554///
555/// SchemaStore
556///
557/// Thin persistence wrapper over one stable or heap schema metadata BTreeMap.
558/// Startup reconciliation writes and validates encoded schema snapshots here
559/// before row/index operations proceed.
560///
561
562pub struct SchemaStore {
563    backend: SchemaStoreBackend,
564}
565
566enum SchemaStoreBackend {
567    Stable(StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>),
568    Heap(StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>),
569    Journaled {
570        canonical:
571            StableBTreeMap<RawSchemaKey, RawSchemaSnapshot, VirtualMemory<DefaultMemoryImpl>>,
572        live: StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
573        tombstones: BTreeSet<RawSchemaKey>,
574    },
575}
576
577/// Control-flow result for schema-store traversal visitors.
578#[derive(Clone, Copy, Debug, Eq, PartialEq)]
579enum SchemaStoreVisit {
580    Continue,
581    #[allow(
582        dead_code,
583        reason = "schema traversal exposes early-stop semantics for bounded future callers; focused tests cover it before live call sites need it"
584    )]
585    Stop,
586}
587
588impl SchemaStoreVisit {
589    const fn should_stop(self) -> bool {
590        matches!(self, Self::Stop)
591    }
592}
593
594impl SchemaStore {
595    /// Initialize the schema store with the provided backing memory.
596    #[must_use]
597    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
598        Self {
599            backend: SchemaStoreBackend::Stable(StableBTreeMap::init(memory)),
600        }
601    }
602
603    /// Initialize a volatile heap-backed schema store.
604    #[must_use]
605    pub const fn init_heap() -> Self {
606        Self {
607            backend: SchemaStoreBackend::Heap(StdBTreeMap::new()),
608        }
609    }
610
611    /// Initialize a journaled cached-stable schema store.
612    ///
613    /// Normal schema publication writes only the live projection. Canonical
614    /// stable schema history is updated by future journal fold/recovery paths.
615    #[must_use]
616    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
617        Self {
618            backend: SchemaStoreBackend::Journaled {
619                canonical: StableBTreeMap::init(memory),
620                live: StdBTreeMap::new(),
621                tombstones: BTreeSet::new(),
622            },
623        }
624    }
625
626    /// Insert or replace one typed persisted schema snapshot.
627    pub(in crate::db) fn insert_persisted_snapshot(
628        &mut self,
629        entity: EntityTag,
630        snapshot: &PersistedSchemaSnapshot,
631    ) -> Result<(), InternalError> {
632        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
633        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
634        let _ = self.insert_raw_snapshot(key, raw_snapshot);
635
636        Ok(())
637    }
638
639    /// Insert one typed persisted schema snapshot only if the current live
640    /// accepted catalog identity still matches the identity captured before
641    /// schema mutation planning.
642    pub(in crate::db) fn insert_persisted_snapshot_if_latest_identity(
643        &mut self,
644        expected: AcceptedCatalogIdentity,
645        snapshot: &PersistedSchemaSnapshot,
646    ) -> Result<(), InternalError> {
647        let live = self.latest_catalog_identity(
648            expected.entity_tag(),
649            expected.entity_path(),
650            expected.store_path(),
651        )?;
652        if live
653            .as_ref()
654            .map(AcceptedCatalogSnapshotSelection::identity)
655            != Some(expected)
656        {
657            return Err(InternalError::schema_ddl_publication_race_lost(
658                expected.entity_path(),
659            ));
660        }
661
662        self.insert_persisted_snapshot(expected.entity_tag(), snapshot)
663    }
664
665    /// Reset the volatile projection for journaled recovery without mutating
666    /// the canonical stable schema base.
667    pub(in crate::db) fn reset_journaled_live_projection(&mut self) -> Result<(), InternalError> {
668        let SchemaStoreBackend::Journaled {
669            live, tombstones, ..
670        } = &mut self.backend
671        else {
672            return Err(InternalError::store_invariant(
673                "journaled live projection reset requires a journaled schema store",
674            ));
675        };
676
677        live.clear();
678        tombstones.clear();
679
680        Ok(())
681    }
682
683    /// Apply one folded journal schema snapshot into the canonical stable base.
684    pub(in crate::db) fn fold_persisted_snapshot(
685        &mut self,
686        entity: EntityTag,
687        snapshot: &PersistedSchemaSnapshot,
688    ) -> Result<(), InternalError> {
689        let SchemaStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
690            return Err(InternalError::store_invariant(
691                "journal schema fold requires a journaled schema store",
692            ));
693        };
694
695        let key = RawSchemaKey::from_entity_version(entity, snapshot.version());
696        let raw_snapshot = RawSchemaSnapshot::from_persisted_snapshot(snapshot)?;
697        canonical.insert(key, raw_snapshot);
698
699        Ok(())
700    }
701
702    /// Load and decode one typed persisted schema snapshot.
703    #[cfg(test)]
704    pub(in crate::db) fn get_persisted_snapshot(
705        &self,
706        entity: EntityTag,
707        version: SchemaVersion,
708    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
709        let key = RawSchemaKey::from_entity_version(entity, version);
710        self.get_raw_snapshot(&key)
711            .map(|snapshot| snapshot.decode_persisted_snapshot())
712            .transpose()
713    }
714
715    /// Load and decode the highest stored schema snapshot version for one entity.
716    pub(in crate::db) fn latest_persisted_snapshot(
717        &self,
718        entity: EntityTag,
719    ) -> Result<Option<PersistedSchemaSnapshot>, InternalError> {
720        self.latest_raw_snapshot(entity)
721            .map(|snapshot| snapshot.decode_persisted_snapshot())
722            .transpose()
723    }
724
725    /// Return the latest accepted catalog identity for one entity without
726    /// decoding the selected schema snapshot.
727    pub(in crate::db) fn latest_catalog_identity(
728        &self,
729        entity: EntityTag,
730        entity_path: &'static str,
731        store_path: &'static str,
732    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError> {
733        let Some((version, raw_snapshot)) = self.latest_raw_snapshot_entry(entity) else {
734            return Ok(None);
735        };
736        let fingerprint = raw_snapshot.accepted_schema_fingerprint()?;
737        let identity =
738            AcceptedCatalogIdentity::new(entity, entity_path, store_path, version, fingerprint);
739
740        Ok(Some(AcceptedCatalogSnapshotSelection::new(
741            identity,
742            raw_snapshot.into_bytes(),
743        )))
744    }
745
746    /// Return raw schema-store footprint facts for one entity.
747    #[must_use]
748    pub(in crate::db) fn entity_footprint(&self, entity: EntityTag) -> SchemaStoreFootprint {
749        let mut snapshots = 0u64;
750        let mut encoded_bytes = 0u64;
751        let mut latest = None::<(SchemaVersion, u64)>;
752
753        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
754            if key.entity_tag() != entity {
755                return Ok(SchemaStoreVisit::Continue);
756            }
757
758            let snapshot_bytes = u64::try_from(snapshot.as_bytes().len()).unwrap_or(u64::MAX);
759            snapshots = snapshots.saturating_add(1);
760            encoded_bytes = encoded_bytes.saturating_add(snapshot_bytes);
761
762            let version = SchemaVersion::new(key.version());
763            if latest
764                .as_ref()
765                .is_none_or(|(latest_version, _)| version > *latest_version)
766            {
767                latest = Some((version, snapshot_bytes));
768            }
769            Ok(SchemaStoreVisit::Continue)
770        });
771
772        SchemaStoreFootprint::new(
773            snapshots,
774            encoded_bytes,
775            latest.map_or(0, |(_, snapshot_bytes)| snapshot_bytes),
776        )
777    }
778
779    /// Derive accepted catalog metadata from latest persisted schema snapshots.
780    ///
781    /// This function intentionally reads only the persisted schema store. It
782    /// does not reconstruct metadata from generated models when the store has
783    /// no accepted snapshots.
784    #[cfg(test)]
785    pub(in crate::db) fn catalog_metadata(
786        &self,
787    ) -> Result<Option<SchemaStoreCatalogMetadata>, InternalError> {
788        Ok(self
789            .allocation_metadata()?
790            .map(SchemaStoreAllocationMetadata::schema))
791    }
792
793    /// Derive role-specific allocation metadata from latest persisted schema
794    /// snapshots.
795    ///
796    /// This function intentionally reads only accepted schema-store payloads.
797    /// It never reconstructs metadata from generated models when the store has
798    /// no accepted snapshots.
799    pub(in crate::db) fn allocation_metadata(
800        &self,
801    ) -> Result<Option<SchemaStoreAllocationMetadata>, InternalError> {
802        let latest_by_entity = self.latest_raw_snapshots_by_entity();
803        if latest_by_entity.is_empty() {
804            return Ok(None);
805        }
806
807        Ok(Some(SchemaStoreAllocationMetadata::new(
808            derive_data_allocation_metadata(&latest_by_entity)?,
809            derive_index_allocation_metadata(&latest_by_entity)?,
810            derive_schema_catalog_metadata(&latest_by_entity)?,
811        )))
812    }
813
814    /// Insert or replace one raw schema snapshot.
815    fn insert_raw_snapshot(
816        &mut self,
817        key: RawSchemaKey,
818        snapshot: RawSchemaSnapshot,
819    ) -> Option<RawSchemaSnapshot> {
820        let previous_journaled = if matches!(self.backend, SchemaStoreBackend::Journaled { .. }) {
821            self.get_raw_snapshot_for_backend(&key)
822        } else {
823            None
824        };
825        match &mut self.backend {
826            SchemaStoreBackend::Stable(map) => map.insert(key, snapshot),
827            SchemaStoreBackend::Heap(map) => map.insert(key, snapshot),
828            SchemaStoreBackend::Journaled {
829                live, tombstones, ..
830            } => {
831                tombstones.remove(&key);
832                live.insert(key, snapshot);
833                previous_journaled
834            }
835        }
836    }
837
838    /// Load one raw schema snapshot by key.
839    #[must_use]
840    #[cfg(test)]
841    fn get_raw_snapshot(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
842        match &self.backend {
843            SchemaStoreBackend::Stable(map) => map.get(key),
844            SchemaStoreBackend::Heap(map) => map.get(key).cloned(),
845            SchemaStoreBackend::Journaled { .. } => self.get_raw_snapshot_for_backend(key),
846        }
847    }
848
849    /// Return whether one schema snapshot key is present.
850    #[must_use]
851    #[cfg(test)]
852    fn contains_raw_snapshot(&self, key: &RawSchemaKey) -> bool {
853        match &self.backend {
854            SchemaStoreBackend::Stable(map) => map.contains_key(key),
855            SchemaStoreBackend::Heap(map) => map.contains_key(key),
856            SchemaStoreBackend::Journaled { .. } => {
857                self.get_raw_snapshot_for_backend(key).is_some()
858            }
859        }
860    }
861
862    /// Return the number of schema snapshot entries in this store.
863    #[must_use]
864    #[cfg(test)]
865    pub(in crate::db) fn len(&self) -> u64 {
866        match &self.backend {
867            SchemaStoreBackend::Stable(map) => map.len(),
868            SchemaStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
869            SchemaStoreBackend::Journaled { .. } => {
870                let mut count = 0_u64;
871                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
872                    count = count.saturating_add(1);
873                    Ok(SchemaStoreVisit::Continue)
874                });
875                count
876            }
877        }
878    }
879
880    /// Return whether this schema store currently has no persisted snapshots.
881    #[must_use]
882    #[cfg(test)]
883    pub(in crate::db) fn is_empty(&self) -> bool {
884        match &self.backend {
885            SchemaStoreBackend::Stable(map) => map.is_empty(),
886            SchemaStoreBackend::Heap(map) => map.is_empty(),
887            SchemaStoreBackend::Journaled { .. } => {
888                let mut empty = true;
889                let _: Result<(), Infallible> = self.visit_raw_snapshots(|_key, _snapshot| {
890                    empty = false;
891                    Ok(SchemaStoreVisit::Stop)
892                });
893                empty
894            }
895        }
896    }
897
898    /// Clear all schema metadata entries from the store.
899    #[cfg(test)]
900    pub(in crate::db) fn clear(&mut self) {
901        match &mut self.backend {
902            SchemaStoreBackend::Stable(map) => map.clear_new(),
903            SchemaStoreBackend::Heap(map) => map.clear(),
904            SchemaStoreBackend::Journaled {
905                canonical,
906                live,
907                tombstones,
908            } => {
909                live.clear();
910                tombstones.clear();
911                for entry in canonical.iter() {
912                    tombstones.insert(*entry.key());
913                }
914            }
915        }
916    }
917
918    fn latest_raw_snapshots_by_entity(
919        &self,
920    ) -> StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)> {
921        #[cfg(test)]
922        LATEST_RAW_SNAPSHOTS_BY_ENTITY_CALLS.with(|calls| calls.set(calls.get().saturating_add(1)));
923
924        let mut latest_by_entity =
925            StdBTreeMap::<EntityTag, (SchemaVersion, RawSchemaSnapshot)>::new();
926
927        let _: Result<(), std::convert::Infallible> = self.visit_raw_snapshots(|key, snapshot| {
928            let version = SchemaVersion::new(key.version());
929            match latest_by_entity.get_mut(&key.entity_tag()) {
930                Some((latest_version, latest_snapshot)) if version > *latest_version => {
931                    *latest_version = version;
932                    *latest_snapshot = snapshot.clone();
933                }
934                None => {
935                    latest_by_entity.insert(key.entity_tag(), (version, snapshot.clone()));
936                }
937                Some(_) => {}
938            }
939            Ok(SchemaStoreVisit::Continue)
940        });
941
942        latest_by_entity
943    }
944
945    /// Visit raw schema snapshots in canonical store order without exposing
946    /// the backing stable-map iterator.
947    fn visit_raw_snapshots<E>(
948        &self,
949        visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
950    ) -> Result<(), E> {
951        match &self.backend {
952            SchemaStoreBackend::Stable(map) => {
953                let mut visitor = visitor;
954                for entry in map.iter() {
955                    if visitor(entry.key(), &entry.value())?.should_stop() {
956                        break;
957                    }
958                }
959            }
960            SchemaStoreBackend::Heap(map) => {
961                let mut visitor = visitor;
962                for (key, snapshot) in map {
963                    if visitor(key, snapshot)?.should_stop() {
964                        break;
965                    }
966                }
967            }
968            SchemaStoreBackend::Journaled {
969                canonical,
970                live,
971                tombstones,
972            } => Self::visit_journaled_raw_snapshot_range(
973                canonical,
974                live,
975                tombstones,
976                (RangeBound::Unbounded, RangeBound::Unbounded),
977                Direction::Asc,
978                visitor,
979            )?,
980        }
981
982        Ok(())
983    }
984
985    #[cfg(test)]
986    #[must_use]
987    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
988        match &self.backend {
989            SchemaStoreBackend::Stable(map)
990            | SchemaStoreBackend::Journaled { canonical: map, .. } => map.len(),
991            SchemaStoreBackend::Heap(_) => 0,
992        }
993    }
994
995    fn get_raw_snapshot_for_backend(&self, key: &RawSchemaKey) -> Option<RawSchemaSnapshot> {
996        let SchemaStoreBackend::Journaled {
997            canonical,
998            live,
999            tombstones,
1000        } = &self.backend
1001        else {
1002            return None;
1003        };
1004
1005        if tombstones.contains(key) {
1006            return None;
1007        }
1008        live.get(key).cloned().or_else(|| canonical.get(key))
1009    }
1010
1011    fn latest_raw_snapshot(&self, entity: EntityTag) -> Option<RawSchemaSnapshot> {
1012        self.latest_raw_snapshot_entry(entity)
1013            .map(|(_, snapshot)| snapshot)
1014    }
1015
1016    fn latest_raw_snapshot_entry(
1017        &self,
1018        entity: EntityTag,
1019    ) -> Option<(SchemaVersion, RawSchemaSnapshot)> {
1020        let bounds = RawSchemaKey::entity_range_bounds(entity);
1021        match &self.backend {
1022            SchemaStoreBackend::Stable(map) => map
1023                .range((bounds.0, bounds.1))
1024                .next_back()
1025                .map(|entry| (SchemaVersion::new(entry.key().version()), entry.value())),
1026            SchemaStoreBackend::Heap(map) => map
1027                .range((bounds.0, bounds.1))
1028                .next_back()
1029                .map(|(key, snapshot)| (SchemaVersion::new(key.version()), snapshot.clone())),
1030            SchemaStoreBackend::Journaled {
1031                canonical,
1032                live,
1033                tombstones,
1034            } => {
1035                let mut latest = None;
1036                let _: Result<(), Infallible> = Self::visit_journaled_raw_snapshot_range(
1037                    canonical,
1038                    live,
1039                    tombstones,
1040                    bounds,
1041                    Direction::Desc,
1042                    |key, snapshot| {
1043                        latest = Some((SchemaVersion::new(key.version()), snapshot.clone()));
1044                        Ok(SchemaStoreVisit::Stop)
1045                    },
1046                );
1047                latest
1048            }
1049        }
1050    }
1051
1052    fn visit_journaled_raw_snapshot_range<E>(
1053        canonical: &StableBTreeMap<
1054            RawSchemaKey,
1055            RawSchemaSnapshot,
1056            VirtualMemory<DefaultMemoryImpl>,
1057        >,
1058        live: &StdBTreeMap<RawSchemaKey, RawSchemaSnapshot>,
1059        tombstones: &BTreeSet<RawSchemaKey>,
1060        bounds: (RangeBound<RawSchemaKey>, RangeBound<RawSchemaKey>),
1061        direction: Direction,
1062        mut visitor: impl FnMut(&RawSchemaKey, &RawSchemaSnapshot) -> Result<SchemaStoreVisit, E>,
1063    ) -> Result<(), E> {
1064        match direction {
1065            Direction::Asc => visit_ordered_overlay(
1066                canonical.range((bounds.0, bounds.1)),
1067                live.range((bounds.0, bounds.1)),
1068                Direction::Asc,
1069                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1070                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1071                |live_entry| !tombstones.contains(live_entry.0),
1072                |entry| {
1073                    let visit = match entry {
1074                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1075                            visitor(canonical_entry.key(), &canonical_entry.value())?
1076                        }
1077                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1078                    };
1079                    Ok(if visit.should_stop() {
1080                        OrderedOverlayVisit::Stop
1081                    } else {
1082                        OrderedOverlayVisit::Continue
1083                    })
1084                },
1085            ),
1086            Direction::Desc => visit_ordered_overlay(
1087                canonical.range((bounds.0, bounds.1)).rev(),
1088                live.range((bounds.0, bounds.1)).rev(),
1089                Direction::Desc,
1090                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
1091                |canonical_entry| !tombstones.contains(canonical_entry.key()),
1092                |live_entry| !tombstones.contains(live_entry.0),
1093                |entry| {
1094                    let visit = match entry {
1095                        OrderedOverlayEntry::Canonical(canonical_entry) => {
1096                            visitor(canonical_entry.key(), &canonical_entry.value())?
1097                        }
1098                        OrderedOverlayEntry::Live((key, snapshot)) => visitor(key, snapshot)?,
1099                    };
1100                    Ok(if visit.should_stop() {
1101                        OrderedOverlayVisit::Stop
1102                    } else {
1103                        OrderedOverlayVisit::Continue
1104                    })
1105                },
1106            ),
1107        }
1108    }
1109}
1110
1111fn derive_data_allocation_metadata(
1112    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1113) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1114    let mut max_version = SchemaVersion::initial();
1115    let mut hasher = new_hash_sha256();
1116    write_hash_tag_u8(
1117        &mut hasher,
1118        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1119    );
1120
1121    for (entity, (_, snapshot)) in latest_by_entity {
1122        let persisted = snapshot.decode_persisted_snapshot()?;
1123        if persisted.version() > max_version {
1124            max_version = persisted.version();
1125        }
1126
1127        let data_projection = PersistedSchemaSnapshot::new_with_primary_key_fields_and_indexes(
1128            persisted.version(),
1129            persisted.entity_path().to_string(),
1130            persisted.entity_name().to_string(),
1131            persisted.primary_key_field_ids().to_vec(),
1132            persisted.row_layout().clone(),
1133            persisted.fields().to_vec(),
1134            Vec::new(),
1135        );
1136        let encoded = encode_persisted_schema_snapshot(&data_projection)?;
1137
1138        write_hash_u64(&mut hasher, entity.value());
1139        write_hash_u32(&mut hasher, persisted.version().get());
1140        write_hash_len_u32(&mut hasher, encoded.len());
1141        hasher.update(encoded);
1142    }
1143
1144    Ok(finalize_schema_metadata(
1145        max_version,
1146        SCHEMA_STORE_DATA_ALLOCATION_FINGERPRINT_VERSION,
1147        hasher,
1148        latest_by_entity.len(),
1149    ))
1150}
1151
1152fn derive_index_allocation_metadata(
1153    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1154) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1155    let mut max_version = SchemaVersion::initial();
1156    let mut hasher = new_hash_sha256();
1157    write_hash_tag_u8(
1158        &mut hasher,
1159        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1160    );
1161
1162    for (entity, (_, snapshot)) in latest_by_entity {
1163        let persisted = snapshot.decode_persisted_snapshot()?;
1164        if persisted.version() > max_version {
1165            max_version = persisted.version();
1166        }
1167
1168        write_hash_u64(&mut hasher, entity.value());
1169        write_hash_u32(&mut hasher, persisted.version().get());
1170        write_hash_len_u32(&mut hasher, persisted.indexes().len());
1171        for index in persisted.indexes() {
1172            write_hash_u32(&mut hasher, u32::from(index.ordinal()));
1173            write_hash_str_u32(&mut hasher, index.name());
1174            write_hash_str_u32(&mut hasher, index.store());
1175            write_hash_tag_u8(&mut hasher, u8::from(index.unique()));
1176            write_hash_str_u32(&mut hasher, persisted_index_origin_name(index.origin()));
1177            match index.predicate_sql() {
1178                Some(predicate_sql) => {
1179                    write_hash_tag_u8(&mut hasher, 1);
1180                    write_hash_str_u32(&mut hasher, predicate_sql);
1181                }
1182                None => write_hash_tag_u8(&mut hasher, 0),
1183            }
1184            hash_persisted_index_key(&mut hasher, index.key());
1185        }
1186    }
1187
1188    Ok(finalize_schema_metadata(
1189        max_version,
1190        SCHEMA_STORE_INDEX_ALLOCATION_FINGERPRINT_VERSION,
1191        hasher,
1192        latest_by_entity.len(),
1193    ))
1194}
1195
1196fn derive_schema_catalog_metadata(
1197    latest_by_entity: &StdBTreeMap<EntityTag, (SchemaVersion, RawSchemaSnapshot)>,
1198) -> Result<SchemaStoreCatalogMetadata, InternalError> {
1199    let mut max_version = SchemaVersion::initial();
1200    let mut hasher = new_hash_sha256();
1201    write_hash_tag_u8(&mut hasher, SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION);
1202
1203    for (entity, (version, snapshot)) in latest_by_entity {
1204        let persisted = snapshot.decode_persisted_snapshot()?;
1205        if persisted.version() > max_version {
1206            max_version = persisted.version();
1207        }
1208
1209        write_hash_u64(&mut hasher, entity.value());
1210        write_hash_u32(&mut hasher, version.get());
1211        write_hash_len_u32(&mut hasher, snapshot.as_bytes().len());
1212        hasher.update(snapshot.as_bytes());
1213    }
1214
1215    Ok(finalize_schema_metadata(
1216        max_version,
1217        SCHEMA_STORE_CATALOG_FINGERPRINT_VERSION,
1218        hasher,
1219        latest_by_entity.len(),
1220    ))
1221}
1222
1223fn finalize_schema_metadata(
1224    schema_version: SchemaVersion,
1225    schema_fingerprint_method_version: u8,
1226    hasher: sha2::Sha256,
1227    entity_count: usize,
1228) -> SchemaStoreCatalogMetadata {
1229    let digest = finalize_hash_sha256(hasher);
1230    let mut schema_fingerprint = [0u8; 16];
1231    schema_fingerprint.copy_from_slice(&digest[..16]);
1232
1233    SchemaStoreCatalogMetadata::new(
1234        schema_version,
1235        schema_fingerprint_method_version,
1236        schema_fingerprint,
1237        u64::try_from(entity_count).unwrap_or(u64::MAX),
1238    )
1239}
1240
1241fn hash_persisted_index_key(hasher: &mut sha2::Sha256, key: &PersistedIndexKeySnapshot) {
1242    match key {
1243        PersistedIndexKeySnapshot::FieldPath(paths) => {
1244            write_hash_tag_u8(hasher, 1);
1245            write_hash_len_u32(hasher, paths.len());
1246            for path in paths {
1247                hash_persisted_index_field_path(hasher, path);
1248            }
1249        }
1250        PersistedIndexKeySnapshot::Items(items) => {
1251            write_hash_tag_u8(hasher, 2);
1252            write_hash_len_u32(hasher, items.len());
1253            for item in items {
1254                match item {
1255                    PersistedIndexKeyItemSnapshot::FieldPath(path) => {
1256                        write_hash_tag_u8(hasher, 1);
1257                        hash_persisted_index_field_path(hasher, path);
1258                    }
1259                    PersistedIndexKeyItemSnapshot::Expression(expression) => {
1260                        write_hash_tag_u8(hasher, 2);
1261                        write_hash_str_u32(hasher, persisted_expression_op_name(expression.op()));
1262                        hash_persisted_index_field_path(hasher, expression.source());
1263                        hash_persisted_field_kind(hasher, expression.input_kind());
1264                        hash_persisted_field_kind(hasher, expression.output_kind());
1265                        write_hash_str_u32(hasher, expression.canonical_text());
1266                    }
1267                }
1268            }
1269        }
1270    }
1271}
1272
1273fn hash_persisted_index_field_path(
1274    hasher: &mut sha2::Sha256,
1275    path: &crate::db::schema::PersistedIndexFieldPathSnapshot,
1276) {
1277    write_hash_u32(hasher, path.field_id().get());
1278    write_hash_u32(hasher, u32::from(path.slot().get()));
1279    write_hash_len_u32(hasher, path.path().len());
1280    for segment in path.path() {
1281        write_hash_str_u32(hasher, segment);
1282    }
1283    hash_persisted_field_kind(hasher, path.kind());
1284    write_hash_tag_u8(hasher, u8::from(path.nullable()));
1285}
1286
1287fn hash_persisted_field_kind(hasher: &mut sha2::Sha256, kind: &PersistedFieldKind) {
1288    match kind {
1289        PersistedFieldKind::Account => write_hash_tag_u8(hasher, 1),
1290        PersistedFieldKind::Blob { max_len } => {
1291            write_hash_tag_u8(hasher, 2);
1292            hash_optional_u32(hasher, *max_len);
1293        }
1294        PersistedFieldKind::Bool => write_hash_tag_u8(hasher, 3),
1295        PersistedFieldKind::Date => write_hash_tag_u8(hasher, 4),
1296        PersistedFieldKind::Decimal { scale } => {
1297            write_hash_tag_u8(hasher, 5);
1298            write_hash_u32(hasher, *scale);
1299        }
1300        PersistedFieldKind::Duration => write_hash_tag_u8(hasher, 6),
1301        PersistedFieldKind::Enum { path, variants } => {
1302            write_hash_tag_u8(hasher, 7);
1303            write_hash_str_u32(hasher, path);
1304            write_hash_len_u32(hasher, variants.len());
1305            for variant in variants {
1306                write_hash_str_u32(hasher, variant.ident());
1307                match variant.payload_kind() {
1308                    Some(payload_kind) => {
1309                        write_hash_tag_u8(hasher, 1);
1310                        hash_persisted_field_kind(hasher, payload_kind);
1311                    }
1312                    None => write_hash_tag_u8(hasher, 0),
1313                }
1314                write_hash_str_u32(
1315                    hasher,
1316                    field_storage_decode_name(variant.payload_storage_decode()),
1317                );
1318            }
1319        }
1320        PersistedFieldKind::Float32 => write_hash_tag_u8(hasher, 8),
1321        PersistedFieldKind::Float64 => write_hash_tag_u8(hasher, 9),
1322        PersistedFieldKind::Int8 => write_hash_tag_u8(hasher, 10),
1323        PersistedFieldKind::Int16 => write_hash_tag_u8(hasher, 11),
1324        PersistedFieldKind::Int32 => write_hash_tag_u8(hasher, 12),
1325        PersistedFieldKind::Int64 => write_hash_tag_u8(hasher, 13),
1326        PersistedFieldKind::Int128 => write_hash_tag_u8(hasher, 14),
1327        PersistedFieldKind::IntBig { max_bytes } => {
1328            write_hash_tag_u8(hasher, 15);
1329            write_hash_u32(hasher, *max_bytes);
1330        }
1331        PersistedFieldKind::Principal => write_hash_tag_u8(hasher, 16),
1332        PersistedFieldKind::Subaccount => write_hash_tag_u8(hasher, 17),
1333        PersistedFieldKind::Text { max_len } => {
1334            write_hash_tag_u8(hasher, 18);
1335            hash_optional_u32(hasher, *max_len);
1336        }
1337        PersistedFieldKind::Timestamp => write_hash_tag_u8(hasher, 19),
1338        PersistedFieldKind::Nat8 => write_hash_tag_u8(hasher, 20),
1339        PersistedFieldKind::Nat16 => write_hash_tag_u8(hasher, 21),
1340        PersistedFieldKind::Nat32 => write_hash_tag_u8(hasher, 22),
1341        PersistedFieldKind::Nat64 => write_hash_tag_u8(hasher, 23),
1342        PersistedFieldKind::Nat128 => write_hash_tag_u8(hasher, 24),
1343        PersistedFieldKind::NatBig { max_bytes } => {
1344            write_hash_tag_u8(hasher, 25);
1345            write_hash_u32(hasher, *max_bytes);
1346        }
1347        PersistedFieldKind::Ulid => write_hash_tag_u8(hasher, 26),
1348        PersistedFieldKind::Unit => write_hash_tag_u8(hasher, 27),
1349        PersistedFieldKind::Relation {
1350            target_path,
1351            target_entity_name,
1352            target_entity_tag,
1353            target_store_path,
1354            key_kind,
1355            strength,
1356        } => {
1357            write_hash_tag_u8(hasher, 28);
1358            write_hash_str_u32(hasher, target_path);
1359            write_hash_str_u32(hasher, target_entity_name);
1360            write_hash_u64(hasher, target_entity_tag.value());
1361            write_hash_str_u32(hasher, target_store_path);
1362            hash_persisted_field_kind(hasher, key_kind);
1363            write_hash_str_u32(hasher, persisted_relation_strength_name(*strength));
1364        }
1365        PersistedFieldKind::List(inner) => {
1366            write_hash_tag_u8(hasher, 29);
1367            hash_persisted_field_kind(hasher, inner);
1368        }
1369        PersistedFieldKind::Set(inner) => {
1370            write_hash_tag_u8(hasher, 30);
1371            hash_persisted_field_kind(hasher, inner);
1372        }
1373        PersistedFieldKind::Map { key, value } => {
1374            write_hash_tag_u8(hasher, 31);
1375            hash_persisted_field_kind(hasher, key);
1376            hash_persisted_field_kind(hasher, value);
1377        }
1378        PersistedFieldKind::Structured { queryable } => {
1379            write_hash_tag_u8(hasher, 32);
1380            write_hash_tag_u8(hasher, u8::from(*queryable));
1381        }
1382    }
1383}
1384
1385fn hash_optional_u32(hasher: &mut sha2::Sha256, value: Option<u32>) {
1386    match value {
1387        Some(value) => {
1388            write_hash_tag_u8(hasher, 1);
1389            write_hash_u32(hasher, value);
1390        }
1391        None => write_hash_tag_u8(hasher, 0),
1392    }
1393}
1394
1395const fn persisted_index_origin_name(
1396    origin: crate::db::schema::PersistedIndexOrigin,
1397) -> &'static str {
1398    match origin {
1399        crate::db::schema::PersistedIndexOrigin::Generated => "generated",
1400        crate::db::schema::PersistedIndexOrigin::SqlDdl => "sql_ddl",
1401    }
1402}
1403
1404const fn persisted_expression_op_name(
1405    op: crate::db::schema::PersistedIndexExpressionOp,
1406) -> &'static str {
1407    match op {
1408        crate::db::schema::PersistedIndexExpressionOp::Lower => "lower",
1409        crate::db::schema::PersistedIndexExpressionOp::Upper => "upper",
1410        crate::db::schema::PersistedIndexExpressionOp::Trim => "trim",
1411        crate::db::schema::PersistedIndexExpressionOp::LowerTrim => "lower_trim",
1412        crate::db::schema::PersistedIndexExpressionOp::Date => "date",
1413        crate::db::schema::PersistedIndexExpressionOp::Year => "year",
1414        crate::db::schema::PersistedIndexExpressionOp::Month => "month",
1415        crate::db::schema::PersistedIndexExpressionOp::Day => "day",
1416    }
1417}
1418
1419const fn persisted_relation_strength_name(
1420    strength: crate::db::schema::PersistedRelationStrength,
1421) -> &'static str {
1422    match strength {
1423        crate::db::schema::PersistedRelationStrength::Strong => "strong",
1424        crate::db::schema::PersistedRelationStrength::Weak => "weak",
1425    }
1426}
1427
1428const fn field_storage_decode_name(
1429    decode: crate::model::field::FieldStorageDecode,
1430) -> &'static str {
1431    match decode {
1432        crate::model::field::FieldStorageDecode::ByKind => "by_kind",
1433        crate::model::field::FieldStorageDecode::Value => "value",
1434    }
1435}
1436
1437///
1438/// TESTS
1439///
1440
1441#[cfg(test)]
1442mod tests;