1use super::SchemaId;
7use super::enum_support::EnumVariantInfo;
8use super::field_types::{FieldAnnotation, FieldType};
9use super::schema::TypeSchema;
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::sync::atomic::{AtomicU32, Ordering};
13
/// Starting value of every registry's id counter: `default_next_id` seeds
/// `next_id` with it, so it is the first id `allocate_id` hands out.
const INITIAL_SCHEMA_ID: SchemaId = 1;
20
/// Registry of type schemas, addressable by type name and by numeric id.
///
/// Only `by_name` and `by_id` take part in (de)serialization. The id counter
/// and both predeclared maps are marked `#[serde(skip)]` and are rebuilt from
/// their defaults when a registry is deserialized.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TypeSchemaRegistry {
    /// Next id `allocate_id` will return. Skipped by serde, so a deserialized
    /// registry restarts at `INITIAL_SCHEMA_ID`; `ensure_next_id_above` can be
    /// used to move it past ids that are already taken.
    #[serde(skip, default = "default_next_id")]
    next_id: AtomicU32,
    /// Registered schemas keyed by type name.
    by_name: HashMap<String, TypeSchema>,
    /// Schema id -> type name; lookups by id are resolved through `by_name`.
    by_id: HashMap<SchemaId, String>,
    /// Ordered field-name key (see `predeclared_cache_key`) -> predeclared
    /// schema id. Behind a lock so it can be filled through `&self`.
    #[serde(skip, default)]
    predeclared_cache: RwLock<HashMap<String, SchemaId>>,
    /// Predeclared schemas keyed by id, stored separately from
    /// `by_name`/`by_id`.
    #[serde(skip, default)]
    predeclared_by_id: RwLock<HashMap<SchemaId, TypeSchema>>,
}
60
61fn default_next_id() -> AtomicU32 {
62 AtomicU32::new(INITIAL_SCHEMA_ID)
63}
64
65impl Default for TypeSchemaRegistry {
66 fn default() -> Self {
67 Self {
68 next_id: default_next_id(),
69 by_name: HashMap::new(),
70 by_id: HashMap::new(),
71 predeclared_cache: RwLock::new(HashMap::new()),
72 predeclared_by_id: RwLock::new(HashMap::new()),
73 }
74 }
75}
76
77impl Clone for TypeSchemaRegistry {
78 fn clone(&self) -> Self {
79 let predeclared_cache = self
80 .predeclared_cache
81 .read()
82 .map(|g| g.clone())
83 .unwrap_or_default();
84 let predeclared_by_id = self
85 .predeclared_by_id
86 .read()
87 .map(|g| g.clone())
88 .unwrap_or_default();
89 Self {
90 next_id: AtomicU32::new(self.next_id.load(Ordering::SeqCst)),
91 by_name: self.by_name.clone(),
92 by_id: self.by_id.clone(),
93 predeclared_cache: RwLock::new(predeclared_cache),
94 predeclared_by_id: RwLock::new(predeclared_by_id),
95 }
96 }
97}
98
99impl TypeSchemaRegistry {
100 pub fn new() -> Self {
102 Self::default()
103 }
104
105 pub fn allocate_id(&self) -> SchemaId {
112 self.next_id.fetch_add(1, Ordering::SeqCst)
113 }
114
115 pub fn ensure_next_id_above(&self, max_existing_id: SchemaId) {
122 let required_next = max_existing_id.saturating_add(1);
123 let mut current = self.next_id.load(Ordering::SeqCst);
124 while current < required_next {
125 match self.next_id.compare_exchange(
126 current,
127 required_next,
128 Ordering::SeqCst,
129 Ordering::SeqCst,
130 ) {
131 Ok(_) => break,
132 Err(actual) => current = actual,
133 }
134 }
135 }
136
137 #[cfg(test)]
140 pub(crate) fn peek_next_id(&self) -> SchemaId {
141 self.next_id.load(Ordering::SeqCst)
142 }
143
144 pub fn register(&mut self, schema: TypeSchema) {
146 let name = schema.name.clone();
147 let id = schema.id;
148 self.by_id.insert(id, name.clone());
149 self.by_name.insert(name, schema);
150 }
151
152 pub fn register_type(
154 &mut self,
155 name: impl Into<String>,
156 fields: Vec<(String, FieldType)>,
157 ) -> SchemaId {
158 let schema = TypeSchema::new(name, fields);
159 let id = schema.id;
160 self.register(schema);
161 id
162 }
163
164 pub fn register_type_with_annotations(
171 &mut self,
172 name: impl Into<String>,
173 fields: Vec<(String, FieldType)>,
174 field_annotations: Vec<Vec<FieldAnnotation>>,
175 ) -> SchemaId {
176 let mut schema = TypeSchema::new(name, fields);
177 for (i, annotations) in field_annotations.into_iter().enumerate() {
178 if i < schema.fields.len() && !annotations.is_empty() {
179 schema.fields[i].annotations = annotations;
180 }
181 }
182 let id = schema.id;
183 self.register(schema);
184 id
185 }
186
187 pub fn get(&self, name: &str) -> Option<&TypeSchema> {
189 self.by_name.get(name)
190 }
191
192 pub fn get_by_id(&self, id: SchemaId) -> Option<&TypeSchema> {
194 self.by_id.get(&id).and_then(|name| self.by_name.get(name))
195 }
196
197 pub fn max_schema_id(&self) -> Option<SchemaId> {
199 self.by_id.keys().copied().max()
200 }
201
202 pub fn field_offset(&self, type_name: &str, field_name: &str) -> Option<usize> {
204 self.get(type_name)?.field_offset(field_name)
205 }
206
207 pub fn has_type(&self, name: &str) -> bool {
209 self.by_name.contains_key(name)
210 }
211
212 pub fn type_count(&self) -> usize {
214 self.by_name.len()
215 }
216
217 pub fn type_names(&self) -> impl Iterator<Item = &str> {
219 self.by_name.keys().map(|s| s.as_str())
220 }
221
222 pub fn with_stdlib_types() -> Self {
229 let mut registry = Self::new();
230
231 registry.register_type_scoped(
233 "Row",
234 vec![
235 ("timestamp".to_string(), FieldType::Timestamp),
236 ("fields".to_string(), FieldType::Any), ],
238 );
239
240 registry.register_enum_scoped(
242 "Option",
243 vec![
244 EnumVariantInfo::new("Some", 0, 1), EnumVariantInfo::new("None", 1, 0), ],
247 );
248
249 registry.register_enum_scoped(
251 "Result",
252 vec![
253 EnumVariantInfo::new("Ok", 0, 1), EnumVariantInfo::new("Err", 1, 1), ],
256 );
257
258 super::builtin_schemas::register_builtin_schemas(&mut registry);
260
261 registry
265 }
266
267 pub fn with_stdlib_types_and_builtin_ids() -> (Self, super::builtin_schemas::BuiltinSchemaIds) {
273 let mut registry = Self::new();
274
275 registry.register_type_scoped(
277 "Row",
278 vec![
279 ("timestamp".to_string(), FieldType::Timestamp),
280 ("fields".to_string(), FieldType::Any),
281 ],
282 );
283
284 registry.register_enum_scoped(
286 "Option",
287 vec![
288 EnumVariantInfo::new("Some", 0, 1),
289 EnumVariantInfo::new("None", 1, 0),
290 ],
291 );
292 registry.register_enum_scoped(
293 "Result",
294 vec![
295 EnumVariantInfo::new("Ok", 0, 1),
296 EnumVariantInfo::new("Err", 1, 1),
297 ],
298 );
299
300 let ids = super::builtin_schemas::register_builtin_schemas(&mut registry);
302
303 (registry, ids)
304 }
305
306 pub fn register_type_scoped(
312 &mut self,
313 name: impl Into<String>,
314 fields: Vec<(String, FieldType)>,
315 ) -> SchemaId {
316 let id = self.allocate_id();
317 let schema = TypeSchema::with_id(id, name, fields);
318 self.register(schema);
319 id
320 }
321
322 pub fn register_enum_scoped(
325 &mut self,
326 name: impl Into<String>,
327 variants: Vec<EnumVariantInfo>,
328 ) -> SchemaId {
329 let id = self.allocate_id();
330 let schema = TypeSchema::new_enum_with_id(id, name, variants);
331 self.register(schema);
332 id
333 }
334
335 pub fn new_with_stdlib() -> Self {
349 let mut registry = Self::new();
350
351 registry.register_type_scoped(
353 "Row",
354 vec![
355 ("timestamp".to_string(), FieldType::Timestamp),
356 ("fields".to_string(), FieldType::Any),
357 ],
358 );
359
360 registry.register_enum_scoped(
362 "Option",
363 vec![
364 EnumVariantInfo::new("Some", 0, 1),
365 EnumVariantInfo::new("None", 1, 0),
366 ],
367 );
368 registry.register_enum_scoped(
369 "Result",
370 vec![
371 EnumVariantInfo::new("Ok", 0, 1),
372 EnumVariantInfo::new("Err", 1, 1),
373 ],
374 );
375
376 super::builtin_schemas::register_builtin_schemas(&mut registry);
388
389 registry
390 }
391
392 pub fn compute_all_hashes(&mut self) {
394 for schema in self.by_name.values_mut() {
395 schema.content_hash();
396 }
397 }
398
399 pub fn get_by_content_hash(&self, hash: &[u8; 32]) -> Option<&TypeSchema> {
404 self.by_name.values().find(|schema| {
405 let schema_hash = match schema.content_hash {
407 Some(h) => h,
408 None => schema.compute_content_hash(),
409 };
410 &schema_hash == hash
411 })
412 }
413
414 pub fn merge(&mut self, other: TypeSchemaRegistry) {
427 for (name, schema) in other.by_name {
428 if self.by_name.contains_key(&name) {
429 continue;
430 }
431 let id = schema.id;
432 if self.by_id.contains_key(&id) {
433 continue;
439 }
440 self.by_id.insert(id, name.clone());
441 self.by_name.insert(name, schema);
442 }
443 if let (Ok(other_by_id), Ok(mut self_by_id)) = (
445 other.predeclared_by_id.read(),
446 self.predeclared_by_id.write(),
447 ) {
448 for (id, schema) in other_by_id.iter() {
449 self_by_id.entry(*id).or_insert_with(|| schema.clone());
450 }
451 }
452 if let (Ok(other_cache), Ok(mut self_cache)) = (
453 other.predeclared_cache.read(),
454 self.predeclared_cache.write(),
455 ) {
456 for (key, id) in other_cache.iter() {
457 self_cache.entry(key.clone()).or_insert(*id);
458 }
459 }
460 }
461
462 fn predeclared_cache_key(fields: &[&str]) -> String {
467 fields.join("\u{1f}")
468 }
469
470 pub fn register_predeclared_any_schema(&self, fields: &[String]) -> SchemaId {
478 let field_refs: Vec<&str> = fields.iter().map(|s| s.as_str()).collect();
479 let key = Self::predeclared_cache_key(&field_refs);
480
481 if let Ok(cache) = self.predeclared_cache.read() {
482 if let Some(id) = cache.get(&key) {
483 return *id;
484 }
485 }
486
487 let typed_fields: Vec<(String, FieldType)> = fields
488 .iter()
489 .map(|name| (name.clone(), FieldType::Any))
490 .collect();
491
492 let id = self.allocate_id();
493 let schema = TypeSchema::with_id(
494 id,
495 format!("__predecl_{}", fields.join("_")),
496 typed_fields,
497 );
498
499 if let Ok(mut reg) = self.predeclared_by_id.write() {
500 reg.insert(id, schema);
501 }
502 if let Ok(mut cache) = self.predeclared_cache.write() {
503 cache.insert(key, id);
504 }
505 id
506 }
507
508 pub fn lookup_predeclared_by_id(&self, id: SchemaId) -> Option<TypeSchema> {
510 self.predeclared_by_id
511 .read()
512 .ok()
513 .and_then(|reg| reg.get(&id).cloned())
514 }
515
516 pub fn mirror_predeclared_any_schema(&self, fields: &[String], id: SchemaId) {
524 let field_refs: Vec<&str> = fields.iter().map(|s| s.as_str()).collect();
525 let key = Self::predeclared_cache_key(&field_refs);
526
527 if let Ok(cache) = self.predeclared_cache.read() {
528 if cache.get(&key).copied() == Some(id) {
529 return;
530 }
531 }
532
533 let typed_fields: Vec<(String, FieldType)> = fields
534 .iter()
535 .map(|name| (name.clone(), FieldType::Any))
536 .collect();
537
538 let schema = TypeSchema::with_id(
539 id,
540 format!("__predecl_{}", fields.join("_")),
541 typed_fields,
542 );
543
544 if let Ok(mut reg) = self.predeclared_by_id.write() {
545 reg.entry(id).or_insert(schema);
546 }
547 if let Ok(mut cache) = self.predeclared_cache.write() {
548 cache.entry(key).or_insert(id);
549 }
550 }
551
552 pub fn lookup_predeclared_id_by_field_order(&self, fields: &[&str]) -> Option<SchemaId> {
555 let key = Self::predeclared_cache_key(fields);
556 self.predeclared_cache
557 .read()
558 .ok()
559 .and_then(|cache| cache.get(&key).copied())
560 }
561
562 pub fn lookup_predeclared_by_field_set(&self, fields: &[&str]) -> Option<TypeSchema> {
564 let Ok(reg) = self.predeclared_by_id.read() else {
565 return None;
566 };
567 reg.values()
568 .find(|schema| {
569 if schema.fields.len() != fields.len() {
570 return false;
571 }
572 let wanted: std::collections::HashSet<&str> = fields.iter().copied().collect();
573 schema
574 .fields
575 .iter()
576 .all(|f| wanted.contains(f.name.as_str()))
577 })
578 .cloned()
579 }
580}
581
/// Fluent builder that accumulates field definitions and then produces a
/// `TypeSchema` (`build`) or registers one directly (`register`).
pub struct TypeSchemaBuilder {
    /// Name given to the schema being built.
    name: String,
    /// `(field name, field type)` pairs in the order they were added.
    fields: Vec<(String, FieldType)>,
    /// Annotations per field; every builder method pushes one entry here for
    /// each entry pushed to `fields`, keeping the two index-aligned.
    field_meta: Vec<Vec<FieldAnnotation>>,
}
596
597impl TypeSchemaBuilder {
598 pub fn new(name: impl Into<String>) -> Self {
600 Self {
601 name: name.into(),
602 fields: Vec::new(),
603 field_meta: Vec::new(),
604 }
605 }
606
607 pub fn f64_field(mut self, name: impl Into<String>) -> Self {
609 self.fields.push((name.into(), FieldType::F64));
610 self.field_meta.push(vec![]);
611 self
612 }
613
614 pub fn i64_field(mut self, name: impl Into<String>) -> Self {
616 self.fields.push((name.into(), FieldType::I64));
617 self.field_meta.push(vec![]);
618 self
619 }
620
621 pub fn decimal_field(mut self, name: impl Into<String>) -> Self {
623 self.fields.push((name.into(), FieldType::Decimal));
624 self.field_meta.push(vec![]);
625 self
626 }
627
628 pub fn bool_field(mut self, name: impl Into<String>) -> Self {
630 self.fields.push((name.into(), FieldType::Bool));
631 self.field_meta.push(vec![]);
632 self
633 }
634
635 pub fn string_field(mut self, name: impl Into<String>) -> Self {
637 self.fields.push((name.into(), FieldType::String));
638 self.field_meta.push(vec![]);
639 self
640 }
641
642 pub fn timestamp_field(mut self, name: impl Into<String>) -> Self {
644 self.fields.push((name.into(), FieldType::Timestamp));
645 self.field_meta.push(vec![]);
646 self
647 }
648
649 pub fn object_field(mut self, name: impl Into<String>, type_name: impl Into<String>) -> Self {
651 self.fields
652 .push((name.into(), FieldType::Object(type_name.into())));
653 self.field_meta.push(vec![]);
654 self
655 }
656
657 pub fn array_field(mut self, name: impl Into<String>, element_type: FieldType) -> Self {
659 self.fields
660 .push((name.into(), FieldType::Array(Box::new(element_type))));
661 self.field_meta.push(vec![]);
662 self
663 }
664
665 pub fn hashmap_field(
670 mut self,
671 name: impl Into<String>,
672 key_type: FieldType,
673 value_type: FieldType,
674 ) -> Self {
675 self.fields.push((
676 name.into(),
677 FieldType::HashMap {
678 key: Box::new(key_type),
679 value: Box::new(value_type),
680 },
681 ));
682 self.field_meta.push(vec![]);
683 self
684 }
685
686 pub fn set_field(mut self, name: impl Into<String>, element_type: FieldType) -> Self {
691 self.fields
692 .push((name.into(), FieldType::Set(Box::new(element_type))));
693 self.field_meta.push(vec![]);
694 self
695 }
696
697 pub fn any_field(mut self, name: impl Into<String>) -> Self {
699 self.fields.push((name.into(), FieldType::Any));
700 self.field_meta.push(vec![]);
701 self
702 }
703
704 pub fn field_with_meta(
706 mut self,
707 name: impl Into<String>,
708 field_type: FieldType,
709 annotations: Vec<FieldAnnotation>,
710 ) -> Self {
711 self.fields.push((name.into(), field_type));
712 self.field_meta.push(annotations);
713 self
714 }
715
716 pub fn build(self) -> TypeSchema {
718 let mut schema = TypeSchema::new(self.name, self.fields);
719 for (i, annotations) in self.field_meta.into_iter().enumerate() {
721 if i < schema.fields.len() {
722 schema.fields[i].annotations = annotations;
723 }
724 }
725 schema
726 }
727
728 pub fn register(self, registry: &mut TypeSchemaRegistry) -> SchemaId {
737 let id = registry.allocate_id();
738 let mut schema = TypeSchema::with_id(id, self.name, self.fields);
739 for (i, annotations) in self.field_meta.into_iter().enumerate() {
740 if i < schema.fields.len() {
741 schema.fields[i].annotations = annotations;
742 }
743 }
744 registry.register(schema);
745 id
746 }
747}
748
// Unit tests for the registry and the builder. Several of them pin the exact
// order in which ids are allocated, so statement order inside a test matters.
#[cfg(test)]
mod tests {
    use super::*;

    // A registered type can be found both by name and by the returned id.
    #[test]
    fn test_registry() {
        let mut registry = TypeSchemaRegistry::new();

        let schema_id = registry.register_type(
            "MyType",
            vec![
                ("x".to_string(), FieldType::F64),
                ("y".to_string(), FieldType::F64),
            ],
        );

        assert!(registry.has_type("MyType"));
        assert!(!registry.has_type("OtherType"));

        let schema = registry.get("MyType").unwrap();
        assert_eq!(schema.id, schema_id);
        assert_eq!(schema.field_count(), 2);

        let schema_by_id = registry.get_by_id(schema_id).unwrap();
        assert_eq!(schema_by_id.name, "MyType");
    }

    // The builder registers a schema whose three f64 fields are expected at
    // offsets 0, 8 and 16.
    #[test]
    fn test_builder() {
        let mut registry = TypeSchemaRegistry::new();

        let schema_id = TypeSchemaBuilder::new("Point")
            .f64_field("x")
            .f64_field("y")
            .f64_field("z")
            .register(&mut registry);

        let schema = registry.get_by_id(schema_id).unwrap();
        assert_eq!(schema.name, "Point");
        assert_eq!(schema.field_count(), 3);
        assert_eq!(schema.field_offset("x"), Some(0));
        assert_eq!(schema.field_offset("y"), Some(8));
        assert_eq!(schema.field_offset("z"), Some(16));
    }

    // The stdlib registry contains `Row` with a `timestamp` field.
    #[test]
    fn test_stdlib_types() {
        let registry = TypeSchemaRegistry::with_stdlib_types();

        assert!(registry.has_type("Row"));
        let row_schema = registry.get("Row").unwrap();
        assert!(row_schema.has_field("timestamp"));
    }

    // A six-field candle schema is expected to occupy 48 bytes, with one field
    // every 8 bytes.
    #[test]
    fn test_ohlcv_schema() {
        let mut registry = TypeSchemaRegistry::new();

        TypeSchemaBuilder::new("Candle")
            .timestamp_field("timestamp")
            .f64_field("open")
            .f64_field("high")
            .f64_field("low")
            .f64_field("close")
            .f64_field("volume")
            .register(&mut registry);

        let schema = registry.get("Candle").unwrap();
        assert_eq!(schema.field_count(), 6);
        assert_eq!(schema.data_size, 48);
        assert_eq!(schema.field_offset("timestamp"), Some(0));
        assert_eq!(schema.field_offset("open"), Some(8));
        assert_eq!(schema.field_offset("high"), Some(16));
        assert_eq!(schema.field_offset("low"), Some(24));
        assert_eq!(schema.field_offset("close"), Some(32));
        assert_eq!(schema.field_offset("volume"), Some(40));
    }

    // `Option` and `Result` are registered as enums with the expected variant
    // ids.
    #[test]
    fn test_stdlib_enum_types() {
        let registry = TypeSchemaRegistry::with_stdlib_types();

        assert!(registry.has_type("Option"));
        let option_schema = registry.get("Option").unwrap();
        assert!(option_schema.is_enum());
        assert_eq!(option_schema.variant_id("Some"), Some(0));
        assert_eq!(option_schema.variant_id("None"), Some(1));

        assert!(registry.has_type("Result"));
        let result_schema = registry.get("Result").unwrap();
        assert!(result_schema.is_enum());
        assert_eq!(result_schema.variant_id("Ok"), Some(0));
        assert_eq!(result_schema.variant_id("Err"), Some(1));
    }

    // `max_schema_id` reports the larger of the two registered ids.
    #[test]
    fn test_max_schema_id() {
        let mut registry = TypeSchemaRegistry::new();
        let a = registry.register_type("A", vec![("x".to_string(), FieldType::F64)]);
        let b = registry.register_type("B", vec![("y".to_string(), FieldType::F64)]);
        assert_eq!(registry.max_schema_id(), Some(a.max(b)));
    }

    // Allocating from one registry must not move another registry's counter.
    #[test]
    fn b1_1_registry_allocate_id_is_per_instance() {
        let r1 = TypeSchemaRegistry::new();
        let r2 = TypeSchemaRegistry::new();

        // Fresh registries start from the same counter value.
        assert_eq!(r1.peek_next_id(), r2.peek_next_id());

        let id1a = r1.allocate_id();
        let id1b = r1.allocate_id();
        assert_eq!(id1b, id1a + 1);
        // r2 is still where r1 started.
        assert_eq!(r2.peek_next_id(), id1a);

        let id2a = r2.allocate_id();
        assert_eq!(id2a, id1a);
    }

    // Two stdlib registries each continue numbering user types from their own
    // counter.
    #[test]
    fn b1_1_new_with_stdlib_uses_registry_counter_for_scoped_types() {
        let mut r1 = TypeSchemaRegistry::new_with_stdlib();
        let mut r2 = TypeSchemaRegistry::new_with_stdlib();

        for name in ["Row", "Option", "Result"] {
            assert!(r1.has_type(name), "r1 missing {name}");
            assert!(r2.has_type(name), "r2 missing {name}");
        }

        let r1_user =
            r1.register_type_scoped("UserA", vec![("x".to_string(), FieldType::F64)]);
        let r2_user =
            r2.register_type_scoped("UserA", vec![("x".to_string(), FieldType::F64)]);

        assert_eq!(r1.get("UserA").unwrap().id, r1_user);
        assert_eq!(r2.get("UserA").unwrap().id, r2_user);

        // Consecutive scoped registrations get consecutive ids per registry.
        let r1_user_b =
            r1.register_type_scoped("UserB", vec![("y".to_string(), FieldType::F64)]);
        assert_eq!(r1_user_b, r1_user + 1);

        let r2_user_b =
            r2.register_type_scoped("UserB", vec![("y".to_string(), FieldType::F64)]);
        assert_eq!(r2_user_b, r2_user + 1);
    }

    // The same enum registered first in two fresh registries gets the same id
    // in both.
    #[test]
    fn b1_1_scoped_enum_ids_are_per_registry() {
        let mut r1 = TypeSchemaRegistry::new();
        let mut r2 = TypeSchemaRegistry::new();

        let e1 = r1.register_enum_scoped(
            "Color",
            vec![
                EnumVariantInfo::new("Red", 0, 0),
                EnumVariantInfo::new("Green", 1, 0),
            ],
        );
        let e2 = r2.register_enum_scoped(
            "Color",
            vec![
                EnumVariantInfo::new("Red", 0, 0),
                EnumVariantInfo::new("Green", 1, 0),
            ],
        );

        assert_eq!(e1, e2);
        assert!(r1.get("Color").unwrap().is_enum());
        assert!(r2.get("Color").unwrap().is_enum());
    }

    // Bumping one registry's counter leaves another registry at the initial
    // id.
    #[test]
    fn b1_1_ensure_next_id_above_is_per_registry() {
        let r1 = TypeSchemaRegistry::new();
        let r2 = TypeSchemaRegistry::new();

        r1.ensure_next_id_above(500);
        assert_eq!(r1.peek_next_id(), 501);

        assert_eq!(r2.peek_next_id(), INITIAL_SCHEMA_ID);
    }
}