1use crate::model::{
5 CommandDefinition, EnumDefinition, EventDefinition, FieldDefinition, FieldType,
6 PayloadFieldDefinition, PkBuilder, PkBytes, RecordDefinition, RecordKind, Version, read_bool,
7 read_i32_le, read_i64_le, read_u8, read_u16_le, read_u32_le, read_u64_le,
8};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11
/// A schema fingerprint: an opaque 16-byte value.
///
/// In this file fingerprints are produced by `Fingerprinter::finish`.
pub type SchemaFingerprint = [u8; 16];
18
/// Copyable, serializable summary of a schema: its version plus one
/// fingerprint per definition family.
///
/// `SchemaRegistry::identity` builds this from the registry's stored values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaIdentity {
    /// Version of the schema.
    pub schema_version: Version,
    /// Fingerprint for the enum ("types") definitions.
    pub types_schema_fingerprint: SchemaFingerprint,
    /// Fingerprint for the record definitions.
    pub record_schema_fingerprint: SchemaFingerprint,
    /// Fingerprint for the command definitions.
    pub command_schema_fingerprint: SchemaFingerprint,
    /// Fingerprint for the event definitions.
    pub event_schema_fingerprint: SchemaFingerprint,
}
33
/// Registry of record, command, event and enum definitions, together with the
/// schema version and one fingerprint per definition family.
#[derive(Debug, Clone)]
pub struct SchemaRegistry {
    // Version reported by `schema_version()` and `identity()`.
    schema_version: Version,
    // Record definitions keyed by record kind.
    record_defs: BTreeMap<RecordKind, RecordDefinition>,
    // Command definitions keyed by their `u8` kind.
    command_defs: BTreeMap<u8, CommandDefinition>,
    // Event definitions keyed by their `u8` kind.
    event_defs: BTreeMap<u8, EventDefinition>,
    // Enum definitions keyed by enum name.
    enum_defs: BTreeMap<&'static str, EnumDefinition>,
    // Fingerprints: derived from the maps above by `new`, or supplied by the
    // caller through `new_with_fingerprints`.
    record_schema_fingerprint: SchemaFingerprint,
    command_schema_fingerprint: SchemaFingerprint,
    event_schema_fingerprint: SchemaFingerprint,
    types_schema_fingerprint: SchemaFingerprint,
}
47
48impl SchemaRegistry {
49 pub fn new(
51 schema_version: Version,
52 record_defs: &[RecordDefinition],
53 command_defs: &[CommandDefinition],
54 event_defs: &[EventDefinition],
55 enum_defs: &[EnumDefinition],
56 ) -> Self {
57 let record_defs = build_record_map(record_defs);
58 let command_defs = build_command_map(command_defs);
59 let event_defs = build_event_map(event_defs);
60 let enum_defs = build_enum_map(enum_defs);
61
62 let record_schema_fingerprint = fingerprint_record_defs(record_defs.values().copied());
63 let command_schema_fingerprint = fingerprint_command_defs(command_defs.values().copied());
64 let event_schema_fingerprint = fingerprint_event_defs(event_defs.values().copied());
65 let types_schema_fingerprint = fingerprint_enum_defs(enum_defs.values().copied());
66
67 Self {
68 schema_version,
69 record_defs,
70 command_defs,
71 event_defs,
72 enum_defs,
73 record_schema_fingerprint,
74 command_schema_fingerprint,
75 event_schema_fingerprint,
76 types_schema_fingerprint,
77 }
78 }
79
80 pub fn with_records(schema_version: Version, record_defs: &[RecordDefinition]) -> Self {
82 Self::new(schema_version, record_defs, &[], &[], &[])
83 }
84
85 #[allow(clippy::too_many_arguments)]
86 pub(crate) fn new_with_fingerprints(
87 schema_version: Version,
88 record_defs: &[RecordDefinition],
89 command_defs: &[CommandDefinition],
90 event_defs: &[EventDefinition],
91 enum_defs: &[EnumDefinition],
92 record_schema_fingerprint: SchemaFingerprint,
93 command_schema_fingerprint: SchemaFingerprint,
94 event_schema_fingerprint: SchemaFingerprint,
95 types_schema_fingerprint: SchemaFingerprint,
96 ) -> Self {
97 let mut registry = Self::new(
98 schema_version,
99 record_defs,
100 command_defs,
101 event_defs,
102 enum_defs,
103 );
104 registry.record_schema_fingerprint = record_schema_fingerprint;
105 registry.command_schema_fingerprint = command_schema_fingerprint;
106 registry.event_schema_fingerprint = event_schema_fingerprint;
107 registry.types_schema_fingerprint = types_schema_fingerprint;
108 registry
109 }
110
111 #[inline(always)]
113 pub fn schema_version(&self) -> Version {
114 self.schema_version
115 }
116
117 #[inline(always)]
119 pub fn record_schema_fingerprint(&self) -> SchemaFingerprint {
120 self.record_schema_fingerprint
121 }
122
123 #[inline(always)]
125 pub fn command_schema_fingerprint(&self) -> SchemaFingerprint {
126 self.command_schema_fingerprint
127 }
128
129 #[inline(always)]
131 pub fn event_schema_fingerprint(&self) -> SchemaFingerprint {
132 self.event_schema_fingerprint
133 }
134
135 #[inline(always)]
137 pub fn types_schema_fingerprint(&self) -> SchemaFingerprint {
138 self.types_schema_fingerprint
139 }
140
141 #[inline(always)]
143 pub fn identity(&self) -> SchemaIdentity {
144 SchemaIdentity {
145 schema_version: self.schema_version,
146 types_schema_fingerprint: self.types_schema_fingerprint,
147 record_schema_fingerprint: self.record_schema_fingerprint,
148 command_schema_fingerprint: self.command_schema_fingerprint,
149 event_schema_fingerprint: self.event_schema_fingerprint,
150 }
151 }
152
153 pub fn try_get(&self, kind: RecordKind) -> Option<&RecordDefinition> {
155 self.record_defs.get(&kind)
156 }
157
158 #[inline]
160 pub fn record_defs(&self) -> impl Iterator<Item = &RecordDefinition> + '_ {
161 self.record_defs.values()
162 }
163
164 pub fn try_get_command(&self, kind: u8) -> Option<&CommandDefinition> {
166 self.command_defs.get(&kind)
167 }
168
169 #[inline]
171 pub fn command_defs(&self) -> impl Iterator<Item = &CommandDefinition> + '_ {
172 self.command_defs.values()
173 }
174
175 pub fn try_get_event(&self, kind: u8) -> Option<&EventDefinition> {
177 self.event_defs.get(&kind)
178 }
179
180 #[inline]
182 pub fn event_defs(&self) -> impl Iterator<Item = &EventDefinition> + '_ {
183 self.event_defs.values()
184 }
185
186 #[inline]
188 pub fn enum_defs(&self) -> impl Iterator<Item = &EnumDefinition> + '_ {
189 self.enum_defs.values()
190 }
191
192 #[inline]
194 pub fn encode_pk(&self, kind: RecordKind, data: &[u8]) -> Option<PkBytes> {
195 let def = self.try_get(kind)?;
196 if let Some(encode) = def.pk_encode {
197 return Some(encode(data));
198 }
199 encode_pk_generic(def, data)
200 }
201
202 #[inline]
204 pub fn supports_pk_encoding(&self, kind: RecordKind) -> bool {
205 let Some(def) = self.try_get(kind) else {
206 return false;
207 };
208 def.pk_encode.is_some() || can_encode_pk_generic(def)
209 }
210}
211
212fn encode_pk_generic(def: &RecordDefinition, data: &[u8]) -> Option<PkBytes> {
213 if !can_encode_pk_generic(def) {
214 return None;
215 }
216
217 let mut pk = PkBuilder::new();
218 for field_name in def.pk_fields {
219 let field = def.field_by_name(field_name)?;
220 push_pk_field_bytes(&mut pk, field, data)?;
221 }
222 Some(pk.finish())
223}
224
225fn can_encode_pk_generic(def: &RecordDefinition) -> bool {
226 if !def.is_pk_idx || def.pk_fields.is_empty() {
227 return false;
228 }
229
230 for field_name in def.pk_fields {
231 let Some(field) = def.field_by_name(field_name) else {
232 return false;
233 };
234 match field.ty {
235 FieldType::Bool
236 | FieldType::U8
237 | FieldType::EnumU8
238 | FieldType::U16
239 | FieldType::U32
240 | FieldType::U64
241 | FieldType::I32
242 | FieldType::I64 => {}
243 FieldType::FixedBytes => {
244 if field.len < 2 {
245 return false;
246 }
247 }
248 FieldType::U128 | FieldType::VarBytes => return false,
249 }
250 }
251 true
252}
253
254fn push_pk_field_bytes(
255 builder: &mut PkBuilder,
256 field: &FieldDefinition,
257 data: &[u8],
258) -> Option<()> {
259 let offset = field.offset as usize;
260 match field.ty {
261 FieldType::Bool => builder.push_u8(u8::from(read_bool(data, offset).ok()?)),
262 FieldType::U8 | FieldType::EnumU8 => builder.push_u8(read_u8(data, offset).ok()?),
263 FieldType::U16 => builder.push_u16(read_u16_le(data, offset).ok()?),
264 FieldType::U32 => builder.push_u32(read_u32_le(data, offset).ok()?),
265 FieldType::U64 => builder.push_u64(read_u64_le(data, offset).ok()?),
266 FieldType::I32 => builder.push_i32(read_i32_le(data, offset).ok()?),
267 FieldType::I64 => builder.push_i64(read_i64_le(data, offset).ok()?),
268 FieldType::FixedBytes => {
269 let padded_len = usize::try_from(field.len).ok()?;
270 if padded_len < 2 {
271 return None;
272 }
273 let start = offset.checked_add(2)?;
274 let end = offset.checked_add(padded_len)?;
275 let bytes = data.get(start..end)?;
276 builder.push_bytes(bytes);
277 }
278 FieldType::U128 | FieldType::VarBytes => return None,
279 }
280 Some(())
281}
282
283fn build_record_map(defs: &[RecordDefinition]) -> BTreeMap<RecordKind, RecordDefinition> {
284 let mut map = BTreeMap::new();
285 for def in defs {
286 assert_ne!(
287 def.kind, 0,
288 "record kind 0 is reserved and cannot be registered: {}",
289 def.name
290 );
291 assert!(
292 map.insert(def.kind, *def).is_none(),
293 "duplicate record kind registered: kind={}, name={}",
294 def.kind,
295 def.name
296 );
297 }
298 map
299}
300
301fn build_command_map(defs: &[CommandDefinition]) -> BTreeMap<u8, CommandDefinition> {
302 let mut map = BTreeMap::new();
303 for def in defs {
304 assert!(
305 map.insert(def.kind, *def).is_none(),
306 "duplicate command kind registered: kind={}, name={}",
307 def.kind,
308 def.name
309 );
310 }
311 map
312}
313
314fn build_event_map(defs: &[EventDefinition]) -> BTreeMap<u8, EventDefinition> {
315 let mut map = BTreeMap::new();
316 for def in defs {
317 assert!(
318 map.insert(def.kind, *def).is_none(),
319 "duplicate event kind registered: kind={}, name={}",
320 def.kind,
321 def.name
322 );
323 }
324 map
325}
326
327fn build_enum_map(defs: &[EnumDefinition]) -> BTreeMap<&'static str, EnumDefinition> {
328 let mut map = BTreeMap::new();
329 for def in defs {
330 assert!(
331 map.insert(def.name, *def).is_none(),
332 "duplicate enum definition registered: name={}",
333 def.name
334 );
335 }
336 map
337}
338
339#[derive(Clone, Copy)]
340struct Fingerprinter {
341 lo: u64,
342 hi: u64,
343}
344
345impl Fingerprinter {
346 const LO_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
347 const HI_OFFSET: u64 = 0x8422_2325_cbf2_9ce4;
348 const PRIME: u64 = 0x0000_0100_0000_01b3;
349
350 fn new() -> Self {
351 Self {
352 lo: Self::LO_OFFSET,
353 hi: Self::HI_OFFSET,
354 }
355 }
356
357 fn write_u8(&mut self, value: u8) {
358 self.lo ^= value as u64;
359 self.lo = self.lo.wrapping_mul(Self::PRIME);
360 self.hi ^= (value as u64).wrapping_add(0x9e37_79b9);
361 self.hi = self.hi.wrapping_mul(Self::PRIME);
362 }
363
364 fn write_bool(&mut self, value: bool) {
365 self.write_u8(u8::from(value));
366 }
367
368 fn write_u16(&mut self, value: u16) {
369 self.write_bytes(&value.to_le_bytes());
370 }
371
372 fn write_u32(&mut self, value: u32) {
373 self.write_bytes(&value.to_le_bytes());
374 }
375
376 fn write_str(&mut self, value: &str) {
377 self.write_u32(value.len() as u32);
378 self.write_bytes(value.as_bytes());
379 }
380
381 fn write_opt_str(&mut self, value: Option<&str>) {
382 match value {
383 Some(value) => {
384 self.write_u8(1);
385 self.write_str(value);
386 }
387 None => self.write_u8(0),
388 }
389 }
390
391 fn write_opt_u32(&mut self, value: Option<u32>) {
392 match value {
393 Some(value) => {
394 self.write_u8(1);
395 self.write_u32(value);
396 }
397 None => self.write_u8(0),
398 }
399 }
400
401 fn write_bytes(&mut self, bytes: &[u8]) {
402 for &byte in bytes {
403 self.write_u8(byte);
404 }
405 }
406
407 fn finish(self) -> SchemaFingerprint {
408 let mut out = [0u8; 16];
409 out[..8].copy_from_slice(&self.lo.to_le_bytes());
410 out[8..].copy_from_slice(&self.hi.to_le_bytes());
411 out
412 }
413}
414
415fn fingerprint_record_defs(defs: impl IntoIterator<Item = RecordDefinition>) -> SchemaFingerprint {
416 let mut fp = Fingerprinter::new();
417 for def in defs {
418 fp.write_u8(def.kind);
419 fp.write_str(def.name);
420 fp.write_bool(def.is_pk_idx);
421 fp.write_bool(def.support_range_scan);
422 fp.write_u32(def.data_size);
423 fp.write_u16(def.version);
424 fp.write_bool(def.pk_encode.is_some());
425 fp.write_u32(def.fields.len() as u32);
426 for field in def.fields {
427 write_record_field(&mut fp, field);
428 }
429 fp.write_u32(def.reserved_fields.len() as u32);
430 for field in def.reserved_fields {
431 write_record_field(&mut fp, field);
432 }
433 fp.write_u32(def.pk_fields.len() as u32);
434 for pk_field in def.pk_fields {
435 fp.write_str(pk_field);
436 }
437 }
438 fp.finish()
439}
440
441fn fingerprint_command_defs(
442 defs: impl IntoIterator<Item = CommandDefinition>,
443) -> SchemaFingerprint {
444 let mut fp = Fingerprinter::new();
445 for def in defs {
446 fp.write_u8(def.kind);
447 fp.write_str(def.name);
448 fp.write_u16(def.version);
449 fp.write_u32(def.fields.len() as u32);
450 for field in def.fields {
451 write_payload_field(&mut fp, field);
452 }
453 }
454 fp.finish()
455}
456
457fn fingerprint_event_defs(defs: impl IntoIterator<Item = EventDefinition>) -> SchemaFingerprint {
458 let mut fp = Fingerprinter::new();
459 for def in defs {
460 fp.write_u8(def.kind);
461 fp.write_str(def.name);
462 fp.write_u16(def.version);
463 fp.write_u32(def.fields.len() as u32);
464 for field in def.fields {
465 write_payload_field(&mut fp, field);
466 }
467 }
468 fp.finish()
469}
470
471fn fingerprint_enum_defs(defs: impl IntoIterator<Item = EnumDefinition>) -> SchemaFingerprint {
472 let mut fp = Fingerprinter::new();
473 for def in defs {
474 fp.write_str(def.name);
475 fp.write_u32(def.variants.len() as u32);
476 for variant in def.variants {
477 fp.write_str(variant.name);
478 fp.write_u8(variant.discriminant);
479 }
480 }
481 fp.finish()
482}
483
484fn write_record_field(fp: &mut Fingerprinter, field: &FieldDefinition) {
485 fp.write_str(field.name);
486 fp.write_u32(field.field_index);
487 fp.write_u32(field.offset);
488 fp.write_u8(field_type_tag(field.ty));
489 fp.write_u32(field.len);
490 fp.write_str(field.rust_type_name);
491 fp.write_opt_str(field.enum_type_name);
492 fp.write_bool(field.immutable);
493}
494
495fn write_payload_field(fp: &mut Fingerprinter, field: &PayloadFieldDefinition) {
496 fp.write_str(field.name);
497 fp.write_u32(field.field_index);
498 fp.write_u8(field_type_tag(field.ty));
499 fp.write_str(field.rust_type_name);
500 fp.write_opt_str(field.enum_type_name);
501 fp.write_opt_u32(field.fixed_size);
502}
503
504fn field_type_tag(ty: FieldType) -> u8 {
505 match ty {
506 FieldType::Bool => 1,
507 FieldType::U8 => 2,
508 FieldType::U16 => 3,
509 FieldType::U32 => 4,
510 FieldType::U64 => 5,
511 FieldType::I32 => 6,
512 FieldType::I64 => 7,
513 FieldType::U128 => 8,
514 FieldType::FixedBytes => 9,
515 FieldType::VarBytes => 10,
516 FieldType::EnumU8 => 11,
517 }
518}