// statevec_model/registry.rs

1// Copyright 2026 Jumpex Technology.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::model::{
5    CommandDefinition, EnumDefinition, EventDefinition, FieldDefinition, FieldType,
6    PayloadFieldDefinition, PkBuilder, PkBytes, RecordDefinition, RecordKind, Version, read_bool,
7    read_i32_le, read_i64_le, read_u8, read_u16_le, read_u32_le, read_u64_le,
8};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11
/// Fixed-width, 16-byte fingerprint identifying one section of a schema.
///
/// A fingerprint is a deterministic digest of the definitions in that
/// section, so a mismatch between two fingerprints signals schema drift.
/// It is not a cryptographic hash: do not rely on it to resist deliberately
/// constructed collisions.
pub type SchemaFingerprint = [u8; 16];
18
19/// Complete schema identity for compatibility checks.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21pub struct SchemaIdentity {
22    /// Schema module version.
23    pub schema_version: Version,
24    /// Fingerprint of named enum/type definitions.
25    pub types_schema_fingerprint: SchemaFingerprint,
26    /// Fingerprint of record definitions.
27    pub record_schema_fingerprint: SchemaFingerprint,
28    /// Fingerprint of command definitions.
29    pub command_schema_fingerprint: SchemaFingerprint,
30    /// Fingerprint of event definitions.
31    pub event_schema_fingerprint: SchemaFingerprint,
32}
33
34/// Registry of all schema definitions exported by one domain schema module.
35#[derive(Debug, Clone)]
36pub struct SchemaRegistry {
37    schema_version: Version,
38    record_defs: BTreeMap<RecordKind, RecordDefinition>,
39    command_defs: BTreeMap<u8, CommandDefinition>,
40    event_defs: BTreeMap<u8, EventDefinition>,
41    enum_defs: BTreeMap<&'static str, EnumDefinition>,
42    record_schema_fingerprint: SchemaFingerprint,
43    command_schema_fingerprint: SchemaFingerprint,
44    event_schema_fingerprint: SchemaFingerprint,
45    types_schema_fingerprint: SchemaFingerprint,
46}
47
48impl SchemaRegistry {
49    /// Builds a schema registry from static definitions.
50    pub fn new(
51        schema_version: Version,
52        record_defs: &[RecordDefinition],
53        command_defs: &[CommandDefinition],
54        event_defs: &[EventDefinition],
55        enum_defs: &[EnumDefinition],
56    ) -> Self {
57        let record_defs = build_record_map(record_defs);
58        let command_defs = build_command_map(command_defs);
59        let event_defs = build_event_map(event_defs);
60        let enum_defs = build_enum_map(enum_defs);
61
62        let record_schema_fingerprint = fingerprint_record_defs(record_defs.values().copied());
63        let command_schema_fingerprint = fingerprint_command_defs(command_defs.values().copied());
64        let event_schema_fingerprint = fingerprint_event_defs(event_defs.values().copied());
65        let types_schema_fingerprint = fingerprint_enum_defs(enum_defs.values().copied());
66
67        Self {
68            schema_version,
69            record_defs,
70            command_defs,
71            event_defs,
72            enum_defs,
73            record_schema_fingerprint,
74            command_schema_fingerprint,
75            event_schema_fingerprint,
76            types_schema_fingerprint,
77        }
78    }
79
80    /// Builds a schema registry containing only record definitions.
81    pub fn with_records(schema_version: Version, record_defs: &[RecordDefinition]) -> Self {
82        Self::new(schema_version, record_defs, &[], &[], &[])
83    }
84
85    #[allow(clippy::too_many_arguments)]
86    pub(crate) fn new_with_fingerprints(
87        schema_version: Version,
88        record_defs: &[RecordDefinition],
89        command_defs: &[CommandDefinition],
90        event_defs: &[EventDefinition],
91        enum_defs: &[EnumDefinition],
92        record_schema_fingerprint: SchemaFingerprint,
93        command_schema_fingerprint: SchemaFingerprint,
94        event_schema_fingerprint: SchemaFingerprint,
95        types_schema_fingerprint: SchemaFingerprint,
96    ) -> Self {
97        let mut registry = Self::new(
98            schema_version,
99            record_defs,
100            command_defs,
101            event_defs,
102            enum_defs,
103        );
104        registry.record_schema_fingerprint = record_schema_fingerprint;
105        registry.command_schema_fingerprint = command_schema_fingerprint;
106        registry.event_schema_fingerprint = event_schema_fingerprint;
107        registry.types_schema_fingerprint = types_schema_fingerprint;
108        registry
109    }
110
111    /// Returns the schema module version.
112    #[inline(always)]
113    pub fn schema_version(&self) -> Version {
114        self.schema_version
115    }
116
117    /// Returns the record schema fingerprint.
118    #[inline(always)]
119    pub fn record_schema_fingerprint(&self) -> SchemaFingerprint {
120        self.record_schema_fingerprint
121    }
122
123    /// Returns the command schema fingerprint.
124    #[inline(always)]
125    pub fn command_schema_fingerprint(&self) -> SchemaFingerprint {
126        self.command_schema_fingerprint
127    }
128
129    /// Returns the event schema fingerprint.
130    #[inline(always)]
131    pub fn event_schema_fingerprint(&self) -> SchemaFingerprint {
132        self.event_schema_fingerprint
133    }
134
135    /// Returns the enum/type schema fingerprint.
136    #[inline(always)]
137    pub fn types_schema_fingerprint(&self) -> SchemaFingerprint {
138        self.types_schema_fingerprint
139    }
140
141    /// Returns the complete schema identity.
142    #[inline(always)]
143    pub fn identity(&self) -> SchemaIdentity {
144        SchemaIdentity {
145            schema_version: self.schema_version,
146            types_schema_fingerprint: self.types_schema_fingerprint,
147            record_schema_fingerprint: self.record_schema_fingerprint,
148            command_schema_fingerprint: self.command_schema_fingerprint,
149            event_schema_fingerprint: self.event_schema_fingerprint,
150        }
151    }
152
153    /// Returns a record definition by kind.
154    pub fn try_get(&self, kind: RecordKind) -> Option<&RecordDefinition> {
155        self.record_defs.get(&kind)
156    }
157
158    /// Iterates record definitions in stable kind order.
159    #[inline]
160    pub fn record_defs(&self) -> impl Iterator<Item = &RecordDefinition> + '_ {
161        self.record_defs.values()
162    }
163
164    /// Returns a command definition by kind.
165    pub fn try_get_command(&self, kind: u8) -> Option<&CommandDefinition> {
166        self.command_defs.get(&kind)
167    }
168
169    /// Iterates command definitions in stable kind order.
170    #[inline]
171    pub fn command_defs(&self) -> impl Iterator<Item = &CommandDefinition> + '_ {
172        self.command_defs.values()
173    }
174
175    /// Returns an event definition by kind.
176    pub fn try_get_event(&self, kind: u8) -> Option<&EventDefinition> {
177        self.event_defs.get(&kind)
178    }
179
180    /// Iterates event definitions in stable kind order.
181    #[inline]
182    pub fn event_defs(&self) -> impl Iterator<Item = &EventDefinition> + '_ {
183        self.event_defs.values()
184    }
185
186    /// Iterates enum definitions in stable name order.
187    #[inline]
188    pub fn enum_defs(&self) -> impl Iterator<Item = &EnumDefinition> + '_ {
189        self.enum_defs.values()
190    }
191
192    /// Encodes a primary key for a record buffer when the record supports it.
193    #[inline]
194    pub fn encode_pk(&self, kind: RecordKind, data: &[u8]) -> Option<PkBytes> {
195        let def = self.try_get(kind)?;
196        if let Some(encode) = def.pk_encode {
197            return Some(encode(data));
198        }
199        encode_pk_generic(def, data)
200    }
201
202    /// Returns whether the record kind has enough metadata for primary-key encoding.
203    #[inline]
204    pub fn supports_pk_encoding(&self, kind: RecordKind) -> bool {
205        let Some(def) = self.try_get(kind) else {
206            return false;
207        };
208        def.pk_encode.is_some() || can_encode_pk_generic(def)
209    }
210}
211
212fn encode_pk_generic(def: &RecordDefinition, data: &[u8]) -> Option<PkBytes> {
213    if !can_encode_pk_generic(def) {
214        return None;
215    }
216
217    let mut pk = PkBuilder::new();
218    for field_name in def.pk_fields {
219        let field = def.field_by_name(field_name)?;
220        push_pk_field_bytes(&mut pk, field, data)?;
221    }
222    Some(pk.finish())
223}
224
225fn can_encode_pk_generic(def: &RecordDefinition) -> bool {
226    if !def.is_pk_idx || def.pk_fields.is_empty() {
227        return false;
228    }
229
230    for field_name in def.pk_fields {
231        let Some(field) = def.field_by_name(field_name) else {
232            return false;
233        };
234        match field.ty {
235            FieldType::Bool
236            | FieldType::U8
237            | FieldType::EnumU8
238            | FieldType::U16
239            | FieldType::U32
240            | FieldType::U64
241            | FieldType::I32
242            | FieldType::I64 => {}
243            FieldType::FixedBytes => {
244                if field.len < 2 {
245                    return false;
246                }
247            }
248            FieldType::U128 | FieldType::VarBytes => return false,
249        }
250    }
251    true
252}
253
254fn push_pk_field_bytes(
255    builder: &mut PkBuilder,
256    field: &FieldDefinition,
257    data: &[u8],
258) -> Option<()> {
259    let offset = field.offset as usize;
260    match field.ty {
261        FieldType::Bool => builder.push_u8(u8::from(read_bool(data, offset).ok()?)),
262        FieldType::U8 | FieldType::EnumU8 => builder.push_u8(read_u8(data, offset).ok()?),
263        FieldType::U16 => builder.push_u16(read_u16_le(data, offset).ok()?),
264        FieldType::U32 => builder.push_u32(read_u32_le(data, offset).ok()?),
265        FieldType::U64 => builder.push_u64(read_u64_le(data, offset).ok()?),
266        FieldType::I32 => builder.push_i32(read_i32_le(data, offset).ok()?),
267        FieldType::I64 => builder.push_i64(read_i64_le(data, offset).ok()?),
268        FieldType::FixedBytes => {
269            let padded_len = usize::try_from(field.len).ok()?;
270            if padded_len < 2 {
271                return None;
272            }
273            let start = offset.checked_add(2)?;
274            let end = offset.checked_add(padded_len)?;
275            let bytes = data.get(start..end)?;
276            builder.push_bytes(bytes);
277        }
278        FieldType::U128 | FieldType::VarBytes => return None,
279    }
280    Some(())
281}
282
283fn build_record_map(defs: &[RecordDefinition]) -> BTreeMap<RecordKind, RecordDefinition> {
284    let mut map = BTreeMap::new();
285    for def in defs {
286        assert_ne!(
287            def.kind, 0,
288            "record kind 0 is reserved and cannot be registered: {}",
289            def.name
290        );
291        assert!(
292            map.insert(def.kind, *def).is_none(),
293            "duplicate record kind registered: kind={}, name={}",
294            def.kind,
295            def.name
296        );
297    }
298    map
299}
300
301fn build_command_map(defs: &[CommandDefinition]) -> BTreeMap<u8, CommandDefinition> {
302    let mut map = BTreeMap::new();
303    for def in defs {
304        assert!(
305            map.insert(def.kind, *def).is_none(),
306            "duplicate command kind registered: kind={}, name={}",
307            def.kind,
308            def.name
309        );
310    }
311    map
312}
313
314fn build_event_map(defs: &[EventDefinition]) -> BTreeMap<u8, EventDefinition> {
315    let mut map = BTreeMap::new();
316    for def in defs {
317        assert!(
318            map.insert(def.kind, *def).is_none(),
319            "duplicate event kind registered: kind={}, name={}",
320            def.kind,
321            def.name
322        );
323    }
324    map
325}
326
327fn build_enum_map(defs: &[EnumDefinition]) -> BTreeMap<&'static str, EnumDefinition> {
328    let mut map = BTreeMap::new();
329    for def in defs {
330        assert!(
331            map.insert(def.name, *def).is_none(),
332            "duplicate enum definition registered: name={}",
333            def.name
334        );
335    }
336    map
337}
338
339#[derive(Clone, Copy)]
340struct Fingerprinter {
341    lo: u64,
342    hi: u64,
343}
344
345impl Fingerprinter {
346    const LO_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
347    const HI_OFFSET: u64 = 0x8422_2325_cbf2_9ce4;
348    const PRIME: u64 = 0x0000_0100_0000_01b3;
349
350    fn new() -> Self {
351        Self {
352            lo: Self::LO_OFFSET,
353            hi: Self::HI_OFFSET,
354        }
355    }
356
357    fn write_u8(&mut self, value: u8) {
358        self.lo ^= value as u64;
359        self.lo = self.lo.wrapping_mul(Self::PRIME);
360        self.hi ^= (value as u64).wrapping_add(0x9e37_79b9);
361        self.hi = self.hi.wrapping_mul(Self::PRIME);
362    }
363
364    fn write_bool(&mut self, value: bool) {
365        self.write_u8(u8::from(value));
366    }
367
368    fn write_u16(&mut self, value: u16) {
369        self.write_bytes(&value.to_le_bytes());
370    }
371
372    fn write_u32(&mut self, value: u32) {
373        self.write_bytes(&value.to_le_bytes());
374    }
375
376    fn write_str(&mut self, value: &str) {
377        self.write_u32(value.len() as u32);
378        self.write_bytes(value.as_bytes());
379    }
380
381    fn write_opt_str(&mut self, value: Option<&str>) {
382        match value {
383            Some(value) => {
384                self.write_u8(1);
385                self.write_str(value);
386            }
387            None => self.write_u8(0),
388        }
389    }
390
391    fn write_opt_u32(&mut self, value: Option<u32>) {
392        match value {
393            Some(value) => {
394                self.write_u8(1);
395                self.write_u32(value);
396            }
397            None => self.write_u8(0),
398        }
399    }
400
401    fn write_bytes(&mut self, bytes: &[u8]) {
402        for &byte in bytes {
403            self.write_u8(byte);
404        }
405    }
406
407    fn finish(self) -> SchemaFingerprint {
408        let mut out = [0u8; 16];
409        out[..8].copy_from_slice(&self.lo.to_le_bytes());
410        out[8..].copy_from_slice(&self.hi.to_le_bytes());
411        out
412    }
413}
414
415fn fingerprint_record_defs(defs: impl IntoIterator<Item = RecordDefinition>) -> SchemaFingerprint {
416    let mut fp = Fingerprinter::new();
417    for def in defs {
418        fp.write_u8(def.kind);
419        fp.write_str(def.name);
420        fp.write_bool(def.is_pk_idx);
421        fp.write_bool(def.support_range_scan);
422        fp.write_u32(def.data_size);
423        fp.write_u16(def.version);
424        fp.write_bool(def.pk_encode.is_some());
425        fp.write_u32(def.fields.len() as u32);
426        for field in def.fields {
427            write_record_field(&mut fp, field);
428        }
429        fp.write_u32(def.reserved_fields.len() as u32);
430        for field in def.reserved_fields {
431            write_record_field(&mut fp, field);
432        }
433        fp.write_u32(def.pk_fields.len() as u32);
434        for pk_field in def.pk_fields {
435            fp.write_str(pk_field);
436        }
437    }
438    fp.finish()
439}
440
441fn fingerprint_command_defs(
442    defs: impl IntoIterator<Item = CommandDefinition>,
443) -> SchemaFingerprint {
444    let mut fp = Fingerprinter::new();
445    for def in defs {
446        fp.write_u8(def.kind);
447        fp.write_str(def.name);
448        fp.write_u16(def.version);
449        fp.write_u32(def.fields.len() as u32);
450        for field in def.fields {
451            write_payload_field(&mut fp, field);
452        }
453    }
454    fp.finish()
455}
456
457fn fingerprint_event_defs(defs: impl IntoIterator<Item = EventDefinition>) -> SchemaFingerprint {
458    let mut fp = Fingerprinter::new();
459    for def in defs {
460        fp.write_u8(def.kind);
461        fp.write_str(def.name);
462        fp.write_u16(def.version);
463        fp.write_u32(def.fields.len() as u32);
464        for field in def.fields {
465            write_payload_field(&mut fp, field);
466        }
467    }
468    fp.finish()
469}
470
471fn fingerprint_enum_defs(defs: impl IntoIterator<Item = EnumDefinition>) -> SchemaFingerprint {
472    let mut fp = Fingerprinter::new();
473    for def in defs {
474        fp.write_str(def.name);
475        fp.write_u32(def.variants.len() as u32);
476        for variant in def.variants {
477            fp.write_str(variant.name);
478            fp.write_u8(variant.discriminant);
479        }
480    }
481    fp.finish()
482}
483
484fn write_record_field(fp: &mut Fingerprinter, field: &FieldDefinition) {
485    fp.write_str(field.name);
486    fp.write_u32(field.field_index);
487    fp.write_u32(field.offset);
488    fp.write_u8(field_type_tag(field.ty));
489    fp.write_u32(field.len);
490    fp.write_str(field.rust_type_name);
491    fp.write_opt_str(field.enum_type_name);
492    fp.write_bool(field.immutable);
493}
494
495fn write_payload_field(fp: &mut Fingerprinter, field: &PayloadFieldDefinition) {
496    fp.write_str(field.name);
497    fp.write_u32(field.field_index);
498    fp.write_u8(field_type_tag(field.ty));
499    fp.write_str(field.rust_type_name);
500    fp.write_opt_str(field.enum_type_name);
501    fp.write_opt_u32(field.fixed_size);
502}
503
504fn field_type_tag(ty: FieldType) -> u8 {
505    match ty {
506        FieldType::Bool => 1,
507        FieldType::U8 => 2,
508        FieldType::U16 => 3,
509        FieldType::U32 => 4,
510        FieldType::U64 => 5,
511        FieldType::I32 => 6,
512        FieldType::I64 => 7,
513        FieldType::U128 => 8,
514        FieldType::FixedBytes => 9,
515        FieldType::VarBytes => 10,
516        FieldType::EnumU8 => 11,
517    }
518}