1use super::SchemaId;
7use super::enum_support::EnumVariantInfo;
8use super::field_types::{FieldAnnotation, FieldType};
9use super::schema::TypeSchema;
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::sync::atomic::{AtomicU32, Ordering};
13
/// First id handed out by a fresh registry's per-instance counter
/// (`TypeSchemaRegistry::allocate_id`).
const INITIAL_SCHEMA_ID: SchemaId = 1;
20
/// Registry of type schemas, indexed both by type name and by schema id.
///
/// Named schemas live in `by_name` / `by_id`. "Predeclared" all-`Any` schemas
/// (created through `&self` methods) live in two lock-protected side tables, so they
/// can be added through a shared reference.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TypeSchemaRegistry {
    /// Next id returned by `allocate_id`; each registry has its own counter.
    /// Not serialized: on deserialization it restarts at `INITIAL_SCHEMA_ID`.
    /// NOTE(review): a deserialized registry therefore presumably needs
    /// `ensure_next_id_above(max_schema_id)` before allocating new ids — confirm callers do this.
    #[serde(skip, default = "default_next_id")]
    next_id: AtomicU32,
    /// Primary storage: type name -> schema.
    by_name: HashMap<String, TypeSchema>,
    /// Secondary index: schema id -> type name (a key into `by_name`).
    by_id: HashMap<SchemaId, String>,
    /// Predeclared schemas: field names joined with U+001F -> schema id. Not serialized.
    #[serde(skip, default)]
    predeclared_cache: RwLock<HashMap<String, SchemaId>>,
    /// Predeclared schemas by id; separate from `by_name`/`by_id`. Not serialized.
    #[serde(skip, default)]
    predeclared_by_id: RwLock<HashMap<SchemaId, TypeSchema>>,
}
60
61fn default_next_id() -> AtomicU32 {
62 AtomicU32::new(INITIAL_SCHEMA_ID)
63}
64
65impl Default for TypeSchemaRegistry {
66 fn default() -> Self {
67 Self {
68 next_id: default_next_id(),
69 by_name: HashMap::new(),
70 by_id: HashMap::new(),
71 predeclared_cache: RwLock::new(HashMap::new()),
72 predeclared_by_id: RwLock::new(HashMap::new()),
73 }
74 }
75}
76
77impl Clone for TypeSchemaRegistry {
78 fn clone(&self) -> Self {
79 let predeclared_cache = self
80 .predeclared_cache
81 .read()
82 .map(|g| g.clone())
83 .unwrap_or_default();
84 let predeclared_by_id = self
85 .predeclared_by_id
86 .read()
87 .map(|g| g.clone())
88 .unwrap_or_default();
89 Self {
90 next_id: AtomicU32::new(self.next_id.load(Ordering::SeqCst)),
91 by_name: self.by_name.clone(),
92 by_id: self.by_id.clone(),
93 predeclared_cache: RwLock::new(predeclared_cache),
94 predeclared_by_id: RwLock::new(predeclared_by_id),
95 }
96 }
97}
98
99impl TypeSchemaRegistry {
100 pub fn new() -> Self {
102 Self::default()
103 }
104
105 pub fn allocate_id(&self) -> SchemaId {
112 self.next_id.fetch_add(1, Ordering::SeqCst)
113 }
114
115 pub fn ensure_next_id_above(&self, max_existing_id: SchemaId) {
122 let required_next = max_existing_id.saturating_add(1);
123 let mut current = self.next_id.load(Ordering::SeqCst);
124 while current < required_next {
125 match self.next_id.compare_exchange(
126 current,
127 required_next,
128 Ordering::SeqCst,
129 Ordering::SeqCst,
130 ) {
131 Ok(_) => break,
132 Err(actual) => current = actual,
133 }
134 }
135 }
136
137 #[cfg(test)]
140 pub(crate) fn peek_next_id(&self) -> SchemaId {
141 self.next_id.load(Ordering::SeqCst)
142 }
143
144 pub fn register(&mut self, schema: TypeSchema) {
146 let name = schema.name.clone();
147 let id = schema.id;
148 self.by_id.insert(id, name.clone());
149 self.by_name.insert(name, schema);
150 }
151
152 pub fn register_type(
154 &mut self,
155 name: impl Into<String>,
156 fields: Vec<(String, FieldType)>,
157 ) -> SchemaId {
158 let schema = TypeSchema::new(name, fields);
159 let id = schema.id;
160 self.register(schema);
161 id
162 }
163
164 pub fn register_type_with_annotations(
171 &mut self,
172 name: impl Into<String>,
173 fields: Vec<(String, FieldType)>,
174 field_annotations: Vec<Vec<FieldAnnotation>>,
175 ) -> SchemaId {
176 let mut schema = TypeSchema::new(name, fields);
177 for (i, annotations) in field_annotations.into_iter().enumerate() {
178 if i < schema.fields.len() && !annotations.is_empty() {
179 schema.fields[i].annotations = annotations;
180 }
181 }
182 let id = schema.id;
183 self.register(schema);
184 id
185 }
186
187 pub fn get(&self, name: &str) -> Option<&TypeSchema> {
189 self.by_name.get(name)
190 }
191
192 pub fn get_by_id(&self, id: SchemaId) -> Option<&TypeSchema> {
194 self.by_id.get(&id).and_then(|name| self.by_name.get(name))
195 }
196
197 pub fn max_schema_id(&self) -> Option<SchemaId> {
199 self.by_id.keys().copied().max()
200 }
201
202 pub fn field_offset(&self, type_name: &str, field_name: &str) -> Option<usize> {
204 self.get(type_name)?.field_offset(field_name)
205 }
206
207 pub fn has_type(&self, name: &str) -> bool {
209 self.by_name.contains_key(name)
210 }
211
212 pub fn type_count(&self) -> usize {
214 self.by_name.len()
215 }
216
217 pub fn type_names(&self) -> impl Iterator<Item = &str> {
219 self.by_name.keys().map(|s| s.as_str())
220 }
221
222 pub fn with_stdlib_types() -> Self {
229 let mut registry = Self::new();
230
231 registry.register_type_scoped(
233 "Row",
234 vec![
235 ("timestamp".to_string(), FieldType::Timestamp),
236 ("fields".to_string(), FieldType::Any), ],
238 );
239
240 registry.register_enum_scoped(
242 "Option",
243 vec![
244 EnumVariantInfo::new("Some", 0, 1), EnumVariantInfo::new("None", 1, 0), ],
247 );
248
249 registry.register_enum_scoped(
251 "Result",
252 vec![
253 EnumVariantInfo::new("Ok", 0, 1), EnumVariantInfo::new("Err", 1, 1), ],
256 );
257
258 super::builtin_schemas::register_builtin_schemas(&mut registry);
260
261 registry
265 }
266
267 pub fn with_stdlib_types_and_builtin_ids() -> (Self, super::builtin_schemas::BuiltinSchemaIds) {
273 let mut registry = Self::new();
274
275 registry.register_type_scoped(
277 "Row",
278 vec![
279 ("timestamp".to_string(), FieldType::Timestamp),
280 ("fields".to_string(), FieldType::Any),
281 ],
282 );
283
284 registry.register_enum_scoped(
286 "Option",
287 vec![
288 EnumVariantInfo::new("Some", 0, 1),
289 EnumVariantInfo::new("None", 1, 0),
290 ],
291 );
292 registry.register_enum_scoped(
293 "Result",
294 vec![
295 EnumVariantInfo::new("Ok", 0, 1),
296 EnumVariantInfo::new("Err", 1, 1),
297 ],
298 );
299
300 let ids = super::builtin_schemas::register_builtin_schemas(&mut registry);
302
303 (registry, ids)
304 }
305
306 pub fn register_type_scoped(
312 &mut self,
313 name: impl Into<String>,
314 fields: Vec<(String, FieldType)>,
315 ) -> SchemaId {
316 let id = self.allocate_id();
317 let schema = TypeSchema::with_id(id, name, fields);
318 self.register(schema);
319 id
320 }
321
322 pub fn register_enum_scoped(
325 &mut self,
326 name: impl Into<String>,
327 variants: Vec<EnumVariantInfo>,
328 ) -> SchemaId {
329 let id = self.allocate_id();
330 let schema = TypeSchema::new_enum_with_id(id, name, variants);
331 self.register(schema);
332 id
333 }
334
335 pub fn new_with_stdlib() -> Self {
349 let mut registry = Self::new();
350
351 registry.register_type_scoped(
353 "Row",
354 vec![
355 ("timestamp".to_string(), FieldType::Timestamp),
356 ("fields".to_string(), FieldType::Any),
357 ],
358 );
359
360 registry.register_enum_scoped(
362 "Option",
363 vec![
364 EnumVariantInfo::new("Some", 0, 1),
365 EnumVariantInfo::new("None", 1, 0),
366 ],
367 );
368 registry.register_enum_scoped(
369 "Result",
370 vec![
371 EnumVariantInfo::new("Ok", 0, 1),
372 EnumVariantInfo::new("Err", 1, 1),
373 ],
374 );
375
376 super::builtin_schemas::register_builtin_schemas(&mut registry);
388
389 registry
390 }
391
392 pub fn compute_all_hashes(&mut self) {
394 for schema in self.by_name.values_mut() {
395 schema.content_hash();
396 }
397 }
398
399 pub fn get_by_content_hash(&self, hash: &[u8; 32]) -> Option<&TypeSchema> {
404 self.by_name.values().find(|schema| {
405 let schema_hash = match schema.content_hash {
407 Some(h) => h,
408 None => schema.compute_content_hash(),
409 };
410 &schema_hash == hash
411 })
412 }
413
414 pub fn merge(&mut self, other: TypeSchemaRegistry) {
427 for (name, schema) in other.by_name {
428 if self.by_name.contains_key(&name) {
429 continue;
430 }
431 let id = schema.id;
432 if self.by_id.contains_key(&id) {
433 continue;
439 }
440 self.by_id.insert(id, name.clone());
441 self.by_name.insert(name, schema);
442 }
443 if let (Ok(other_by_id), Ok(mut self_by_id)) = (
445 other.predeclared_by_id.read(),
446 self.predeclared_by_id.write(),
447 ) {
448 for (id, schema) in other_by_id.iter() {
449 self_by_id.entry(*id).or_insert_with(|| schema.clone());
450 }
451 }
452 if let (Ok(other_cache), Ok(mut self_cache)) = (
453 other.predeclared_cache.read(),
454 self.predeclared_cache.write(),
455 ) {
456 for (key, id) in other_cache.iter() {
457 self_cache.entry(key.clone()).or_insert(*id);
458 }
459 }
460 }
461
462 fn predeclared_cache_key(fields: &[&str]) -> String {
467 fields.join("\u{1f}")
468 }
469
470 pub fn register_predeclared_any_schema(&self, fields: &[String]) -> SchemaId {
478 let field_refs: Vec<&str> = fields.iter().map(|s| s.as_str()).collect();
479 let key = Self::predeclared_cache_key(&field_refs);
480
481 if let Ok(cache) = self.predeclared_cache.read() {
482 if let Some(id) = cache.get(&key) {
483 return *id;
484 }
485 }
486
487 let typed_fields: Vec<(String, FieldType)> = fields
488 .iter()
489 .map(|name| (name.clone(), FieldType::Any))
490 .collect();
491
492 let id = self.allocate_id();
493 let schema = TypeSchema::with_id(
494 id,
495 format!("__predecl_{}", fields.join("_")),
496 typed_fields,
497 );
498
499 if let Ok(mut reg) = self.predeclared_by_id.write() {
500 reg.insert(id, schema);
501 }
502 if let Ok(mut cache) = self.predeclared_cache.write() {
503 cache.insert(key, id);
504 }
505 id
506 }
507
508 pub fn lookup_predeclared_by_id(&self, id: SchemaId) -> Option<TypeSchema> {
510 self.predeclared_by_id
511 .read()
512 .ok()
513 .and_then(|reg| reg.get(&id).cloned())
514 }
515
516 pub fn mirror_predeclared_any_schema(&self, fields: &[String], id: SchemaId) {
524 let field_refs: Vec<&str> = fields.iter().map(|s| s.as_str()).collect();
525 let key = Self::predeclared_cache_key(&field_refs);
526
527 if let Ok(cache) = self.predeclared_cache.read() {
528 if cache.get(&key).copied() == Some(id) {
529 return;
530 }
531 }
532
533 let typed_fields: Vec<(String, FieldType)> = fields
534 .iter()
535 .map(|name| (name.clone(), FieldType::Any))
536 .collect();
537
538 let schema = TypeSchema::with_id(
539 id,
540 format!("__predecl_{}", fields.join("_")),
541 typed_fields,
542 );
543
544 if let Ok(mut reg) = self.predeclared_by_id.write() {
545 reg.entry(id).or_insert(schema);
546 }
547 if let Ok(mut cache) = self.predeclared_cache.write() {
548 cache.entry(key).or_insert(id);
549 }
550 }
551
552 pub fn lookup_predeclared_id_by_field_order(&self, fields: &[&str]) -> Option<SchemaId> {
555 let key = Self::predeclared_cache_key(fields);
556 self.predeclared_cache
557 .read()
558 .ok()
559 .and_then(|cache| cache.get(&key).copied())
560 }
561
562 pub fn lookup_predeclared_by_field_set(&self, fields: &[&str]) -> Option<TypeSchema> {
564 let Ok(reg) = self.predeclared_by_id.read() else {
565 return None;
566 };
567 reg.values()
568 .find(|schema| {
569 if schema.fields.len() != fields.len() {
570 return false;
571 }
572 let wanted: std::collections::HashSet<&str> = fields.iter().copied().collect();
573 schema
574 .fields
575 .iter()
576 .all(|f| wanted.contains(f.name.as_str()))
577 })
578 .cloned()
579 }
580}
581
/// Fluent builder for a struct-like `TypeSchema`.
///
/// Each field helper appends to `fields` and `field_meta` together, so the two
/// vectors are index-aligned.
pub struct TypeSchemaBuilder {
    /// Type name passed to `TypeSchema::new` / `TypeSchema::with_id`.
    name: String,
    /// Fields in declaration order: (field name, field type).
    fields: Vec<(String, FieldType)>,
    /// Annotations for the field at the same index in `fields`.
    field_meta: Vec<Vec<FieldAnnotation>>,
}
596
597impl TypeSchemaBuilder {
598 pub fn new(name: impl Into<String>) -> Self {
600 Self {
601 name: name.into(),
602 fields: Vec::new(),
603 field_meta: Vec::new(),
604 }
605 }
606
607 pub fn f64_field(mut self, name: impl Into<String>) -> Self {
609 self.fields.push((name.into(), FieldType::F64));
610 self.field_meta.push(vec![]);
611 self
612 }
613
614 pub fn i64_field(mut self, name: impl Into<String>) -> Self {
616 self.fields.push((name.into(), FieldType::I64));
617 self.field_meta.push(vec![]);
618 self
619 }
620
621 pub fn decimal_field(mut self, name: impl Into<String>) -> Self {
623 self.fields.push((name.into(), FieldType::Decimal));
624 self.field_meta.push(vec![]);
625 self
626 }
627
628 pub fn bool_field(mut self, name: impl Into<String>) -> Self {
630 self.fields.push((name.into(), FieldType::Bool));
631 self.field_meta.push(vec![]);
632 self
633 }
634
635 pub fn string_field(mut self, name: impl Into<String>) -> Self {
637 self.fields.push((name.into(), FieldType::String));
638 self.field_meta.push(vec![]);
639 self
640 }
641
642 pub fn timestamp_field(mut self, name: impl Into<String>) -> Self {
644 self.fields.push((name.into(), FieldType::Timestamp));
645 self.field_meta.push(vec![]);
646 self
647 }
648
649 pub fn object_field(mut self, name: impl Into<String>, type_name: impl Into<String>) -> Self {
651 self.fields
652 .push((name.into(), FieldType::Object(type_name.into())));
653 self.field_meta.push(vec![]);
654 self
655 }
656
657 pub fn array_field(mut self, name: impl Into<String>, element_type: FieldType) -> Self {
659 self.fields
660 .push((name.into(), FieldType::Array(Box::new(element_type))));
661 self.field_meta.push(vec![]);
662 self
663 }
664
665 pub fn any_field(mut self, name: impl Into<String>) -> Self {
667 self.fields.push((name.into(), FieldType::Any));
668 self.field_meta.push(vec![]);
669 self
670 }
671
672 pub fn field_with_meta(
674 mut self,
675 name: impl Into<String>,
676 field_type: FieldType,
677 annotations: Vec<FieldAnnotation>,
678 ) -> Self {
679 self.fields.push((name.into(), field_type));
680 self.field_meta.push(annotations);
681 self
682 }
683
684 pub fn build(self) -> TypeSchema {
686 let mut schema = TypeSchema::new(self.name, self.fields);
687 for (i, annotations) in self.field_meta.into_iter().enumerate() {
689 if i < schema.fields.len() {
690 schema.fields[i].annotations = annotations;
691 }
692 }
693 schema
694 }
695
696 pub fn register(self, registry: &mut TypeSchemaRegistry) -> SchemaId {
705 let id = registry.allocate_id();
706 let mut schema = TypeSchema::with_id(id, self.name, self.fields);
707 for (i, annotations) in self.field_meta.into_iter().enumerate() {
708 if i < schema.fields.len() {
709 schema.fields[i].annotations = annotations;
710 }
711 }
712 registry.register(schema);
713 id
714 }
715}
716
#[cfg(test)]
mod tests {
    use super::*;

    // Registers via `register_type` and checks name lookup, id lookup and field count.
    #[test]
    fn test_registry() {
        let mut registry = TypeSchemaRegistry::new();

        let schema_id = registry.register_type(
            "MyType",
            vec![
                ("x".to_string(), FieldType::F64),
                ("y".to_string(), FieldType::F64),
            ],
        );

        assert!(registry.has_type("MyType"));
        assert!(!registry.has_type("OtherType"));

        let schema = registry.get("MyType").unwrap();
        assert_eq!(schema.id, schema_id);
        assert_eq!(schema.field_count(), 2);

        // The id returned at registration must resolve back to the same named type.
        let schema_by_id = registry.get_by_id(schema_id).unwrap();
        assert_eq!(schema_by_id.name, "MyType");
    }

    // Builder registration: three f64 fields land at consecutive 8-byte offsets.
    #[test]
    fn test_builder() {
        let mut registry = TypeSchemaRegistry::new();

        let schema_id = TypeSchemaBuilder::new("Point")
            .f64_field("x")
            .f64_field("y")
            .f64_field("z")
            .register(&mut registry);

        let schema = registry.get_by_id(schema_id).unwrap();
        assert_eq!(schema.name, "Point");
        assert_eq!(schema.field_count(), 3);
        assert_eq!(schema.field_offset("x"), Some(0));
        assert_eq!(schema.field_offset("y"), Some(8));
        assert_eq!(schema.field_offset("z"), Some(16));
    }

    // The stdlib constructor registers `Row` with a `timestamp` field.
    #[test]
    fn test_stdlib_types() {
        let registry = TypeSchemaRegistry::with_stdlib_types();

        assert!(registry.has_type("Row"));
        let row_schema = registry.get("Row").unwrap();
        assert!(row_schema.has_field("timestamp"));
    }

    // Six 8-byte fields: 48 bytes of data, laid out in declaration order.
    #[test]
    fn test_ohlcv_schema() {
        let mut registry = TypeSchemaRegistry::new();

        TypeSchemaBuilder::new("Candle")
            .timestamp_field("timestamp")
            .f64_field("open")
            .f64_field("high")
            .f64_field("low")
            .f64_field("close")
            .f64_field("volume")
            .register(&mut registry);

        let schema = registry.get("Candle").unwrap();
        assert_eq!(schema.field_count(), 6);
        assert_eq!(schema.data_size, 48);
        assert_eq!(schema.field_offset("timestamp"), Some(0));
        assert_eq!(schema.field_offset("open"), Some(8));
        assert_eq!(schema.field_offset("high"), Some(16));
        assert_eq!(schema.field_offset("low"), Some(24));
        assert_eq!(schema.field_offset("close"), Some(32));
        assert_eq!(schema.field_offset("volume"), Some(40));
    }

    // `Option` and `Result` are enums whose variant ids follow declaration order.
    #[test]
    fn test_stdlib_enum_types() {
        let registry = TypeSchemaRegistry::with_stdlib_types();

        assert!(registry.has_type("Option"));
        let option_schema = registry.get("Option").unwrap();
        assert!(option_schema.is_enum());
        assert_eq!(option_schema.variant_id("Some"), Some(0));
        assert_eq!(option_schema.variant_id("None"), Some(1));

        assert!(registry.has_type("Result"));
        let result_schema = registry.get("Result").unwrap();
        assert!(result_schema.is_enum());
        assert_eq!(result_schema.variant_id("Ok"), Some(0));
        assert_eq!(result_schema.variant_id("Err"), Some(1));
    }

    // `max_schema_id` reports the largest registered id.
    #[test]
    fn test_max_schema_id() {
        let mut registry = TypeSchemaRegistry::new();
        let a = registry.register_type("A", vec![("x".to_string(), FieldType::F64)]);
        let b = registry.register_type("B", vec![("y".to_string(), FieldType::F64)]);
        assert_eq!(registry.max_schema_id(), Some(a.max(b)));
    }

    // Id counters are per registry: allocating in r1 does not advance r2.
    #[test]
    fn b1_1_registry_allocate_id_is_per_instance() {
        let r1 = TypeSchemaRegistry::new();
        let r2 = TypeSchemaRegistry::new();

        // Fresh registries start at the same counter value.
        assert_eq!(r1.peek_next_id(), r2.peek_next_id());

        let id1a = r1.allocate_id();
        let id1b = r1.allocate_id();
        assert_eq!(id1b, id1a + 1);
        // r2 is unaffected by r1's two allocations.
        assert_eq!(r2.peek_next_id(), id1a);

        // r2 therefore hands out the same first id r1 did.
        let id2a = r2.allocate_id();
        assert_eq!(id2a, id1a);
    }

    // Scoped registrations draw consecutive ids from each registry's own counter.
    #[test]
    fn b1_1_new_with_stdlib_uses_registry_counter_for_scoped_types() {
        let mut r1 = TypeSchemaRegistry::new_with_stdlib();
        let mut r2 = TypeSchemaRegistry::new_with_stdlib();

        for name in ["Row", "Option", "Result"] {
            assert!(r1.has_type(name), "r1 missing {name}");
            assert!(r2.has_type(name), "r2 missing {name}");
        }

        let r1_user =
            r1.register_type_scoped("UserA", vec![("x".to_string(), FieldType::F64)]);
        let r2_user =
            r2.register_type_scoped("UserA", vec![("x".to_string(), FieldType::F64)]);

        assert_eq!(r1.get("UserA").unwrap().id, r1_user);
        assert_eq!(r2.get("UserA").unwrap().id, r2_user);

        // The next scoped registration takes the very next id in the same registry.
        let r1_user_b =
            r1.register_type_scoped("UserB", vec![("y".to_string(), FieldType::F64)]);
        assert_eq!(r1_user_b, r1_user + 1);

        let r2_user_b =
            r2.register_type_scoped("UserB", vec![("y".to_string(), FieldType::F64)]);
        assert_eq!(r2_user_b, r2_user + 1);
    }

    // The same enum registered first in two fresh registries gets the same id in each.
    #[test]
    fn b1_1_scoped_enum_ids_are_per_registry() {
        let mut r1 = TypeSchemaRegistry::new();
        let mut r2 = TypeSchemaRegistry::new();

        let e1 = r1.register_enum_scoped(
            "Color",
            vec![
                EnumVariantInfo::new("Red", 0, 0),
                EnumVariantInfo::new("Green", 1, 0),
            ],
        );
        let e2 = r2.register_enum_scoped(
            "Color",
            vec![
                EnumVariantInfo::new("Red", 0, 0),
                EnumVariantInfo::new("Green", 1, 0),
            ],
        );

        assert_eq!(e1, e2);
        assert!(r1.get("Color").unwrap().is_enum());
        assert!(r2.get("Color").unwrap().is_enum());
    }

    // Raising one registry's counter leaves another fresh registry at the initial id.
    #[test]
    fn b1_1_ensure_next_id_above_is_per_registry() {
        let r1 = TypeSchemaRegistry::new();
        let r2 = TypeSchemaRegistry::new();

        r1.ensure_next_id_above(500);
        assert_eq!(r1.peek_next_id(), 501);

        assert_eq!(r2.peek_next_id(), INITIAL_SCHEMA_ID);
    }
}