1use std::collections::HashMap;
10use std::fmt;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone, PartialEq)]
17pub enum SyncSchemaType {
18 Json,
20 Avro,
22 Protobuf,
24 Turtle,
26 Custom(String),
28}
29
30#[derive(Debug, Clone)]
34pub struct SyncSchema {
35 pub id: u32,
37 pub subject: String,
39 pub version: u32,
41 pub schema_type: SyncSchemaType,
43 pub definition: String,
45 pub created_at_ms: u64,
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum SyncCompatibilityMode {
54 None,
56 Backward,
58 Forward,
60 Full,
62 BackwardTransitive,
64 ForwardTransitive,
66 FullTransitive,
68}
69
70#[derive(Debug)]
74pub enum SyncRegistryError {
75 SubjectNotFound(String),
77 SchemaNotFound { subject: String, version: u32 },
79 IncompatibleSchema(String),
81 DuplicateSchema(String),
83 InvalidSchema(String),
85}
86
87impl fmt::Display for SyncRegistryError {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 match self {
90 SyncRegistryError::SubjectNotFound(s) => write!(f, "subject not found: {s}"),
91 SyncRegistryError::SchemaNotFound { subject, version } => {
92 write!(f, "schema not found: subject '{subject}' version {version}")
93 }
94 SyncRegistryError::IncompatibleSchema(msg) => {
95 write!(f, "incompatible schema: {msg}")
96 }
97 SyncRegistryError::DuplicateSchema(msg) => write!(f, "duplicate schema: {msg}"),
98 SyncRegistryError::InvalidSchema(msg) => write!(f, "invalid schema: {msg}"),
99 }
100 }
101}
102
103impl std::error::Error for SyncRegistryError {}
104
105pub struct SyncSchemaRegistry {
109 schemas: HashMap<String, Vec<SyncSchema>>,
111 compatibility: HashMap<String, SyncCompatibilityMode>,
113 global_compatibility: SyncCompatibilityMode,
115 next_id: u32,
117}
118
119impl SyncSchemaRegistry {
120 pub fn new() -> Self {
122 Self::with_global_compatibility(SyncCompatibilityMode::None)
123 }
124
125 pub fn with_global_compatibility(mode: SyncCompatibilityMode) -> Self {
127 Self {
128 schemas: HashMap::new(),
129 compatibility: HashMap::new(),
130 global_compatibility: mode,
131 next_id: 1,
132 }
133 }
134
135 pub fn register(
144 &mut self,
145 subject: &str,
146 schema_type: SyncSchemaType,
147 definition: &str,
148 ) -> Result<u32, SyncRegistryError> {
149 if definition.is_empty() {
150 return Err(SyncRegistryError::InvalidSchema(
151 "schema definition must not be empty".to_string(),
152 ));
153 }
154
155 if let Some(versions) = self.schemas.get(subject) {
157 if let Some(latest) = versions.last() {
158 if latest.definition == definition {
159 return Err(SyncRegistryError::DuplicateSchema(format!(
160 "definition is identical to version {} for subject '{subject}'",
161 latest.version
162 )));
163 }
164 }
165
166 let compat = self.get_compatibility(subject);
168 if compat != SyncCompatibilityMode::None {
169 if definition.contains("BREAK") {
172 return Err(SyncRegistryError::IncompatibleSchema(format!(
173 "schema marked as breaking for subject '{subject}'"
174 )));
175 }
176 }
177 }
178
179 let version = self
180 .schemas
181 .get(subject)
182 .map(|v| v.len() as u32 + 1)
183 .unwrap_or(1);
184
185 let id = self.next_id;
186 self.next_id += 1;
187
188 let now_ms = SystemTime::now()
189 .duration_since(UNIX_EPOCH)
190 .map(|d| d.as_millis() as u64)
191 .unwrap_or(0);
192
193 let schema = SyncSchema {
194 id,
195 subject: subject.to_string(),
196 version,
197 schema_type,
198 definition: definition.to_string(),
199 created_at_ms: now_ms,
200 };
201
202 self.schemas
203 .entry(subject.to_string())
204 .or_default()
205 .push(schema);
206
207 Ok(id)
208 }
209
210 pub fn get_latest(&self, subject: &str) -> Result<&SyncSchema, SyncRegistryError> {
212 self.schemas
213 .get(subject)
214 .and_then(|v| v.last())
215 .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))
216 }
217
218 pub fn get_version(
220 &self,
221 subject: &str,
222 version: u32,
223 ) -> Result<&SyncSchema, SyncRegistryError> {
224 let versions = self
225 .schemas
226 .get(subject)
227 .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))?;
228
229 versions
230 .iter()
231 .find(|s| s.version == version)
232 .ok_or_else(|| SyncRegistryError::SchemaNotFound {
233 subject: subject.to_string(),
234 version,
235 })
236 }
237
238 pub fn get_by_id(&self, id: u32) -> Option<&SyncSchema> {
240 self.schemas
241 .values()
242 .flat_map(|v| v.iter())
243 .find(|s| s.id == id)
244 }
245
246 pub fn subjects(&self) -> Vec<&str> {
248 self.schemas.keys().map(|s| s.as_str()).collect()
249 }
250
251 pub fn versions(&self, subject: &str) -> Result<Vec<u32>, SyncRegistryError> {
253 self.schemas
254 .get(subject)
255 .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))
256 .map(|v| v.iter().map(|s| s.version).collect())
257 }
258
259 pub fn delete_version(&mut self, subject: &str, version: u32) -> Result<(), SyncRegistryError> {
261 let versions = self
262 .schemas
263 .get_mut(subject)
264 .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))?;
265
266 let pos = versions
267 .iter()
268 .position(|s| s.version == version)
269 .ok_or_else(|| SyncRegistryError::SchemaNotFound {
270 subject: subject.to_string(),
271 version,
272 })?;
273
274 versions.remove(pos);
275 if versions.is_empty() {
276 self.schemas.remove(subject);
277 }
278 Ok(())
279 }
280
281 pub fn delete_subject(&mut self, subject: &str) -> Result<usize, SyncRegistryError> {
283 self.schemas
284 .remove(subject)
285 .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))
286 .map(|v| v.len())
287 }
288
289 pub fn set_compatibility(&mut self, subject: &str, mode: SyncCompatibilityMode) {
291 self.compatibility.insert(subject.to_string(), mode);
292 }
293
294 pub fn get_compatibility(&self, subject: &str) -> SyncCompatibilityMode {
296 self.compatibility
297 .get(subject)
298 .copied()
299 .unwrap_or(self.global_compatibility)
300 }
301
302 pub fn check_compatibility(
307 &self,
308 subject: &str,
309 definition: &str,
310 ) -> Result<bool, SyncRegistryError> {
311 let compat = self.get_compatibility(subject);
312 if compat == SyncCompatibilityMode::None {
313 return Ok(true);
314 }
315
316 if !self.schemas.contains_key(subject) {
317 return Ok(true);
319 }
320
321 Ok(!definition.contains("BREAK"))
323 }
324
325 pub fn total_schemas(&self) -> usize {
327 self.schemas.values().map(|v| v.len()).sum()
328 }
329}
330
331impl Default for SyncSchemaRegistry {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337#[cfg(test)]
342mod tests {
343 use super::*;
344
345 #[test]
348 fn test_register_first_schema_returns_id() {
349 let mut r = SyncSchemaRegistry::new();
350 let id = r
351 .register("my-topic", SyncSchemaType::Json, r#"{"type":"object"}"#)
352 .unwrap();
353 assert!(id >= 1);
354 }
355
356 #[test]
357 fn test_register_first_schema_version_is_one() {
358 let mut r = SyncSchemaRegistry::new();
359 r.register("sub", SyncSchemaType::Json, "schema_v1")
360 .unwrap();
361 let schema = r.get_latest("sub").unwrap();
362 assert_eq!(schema.version, 1);
363 }
364
365 #[test]
366 fn test_register_second_version_increments() {
367 let mut r = SyncSchemaRegistry::new();
368 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
369 r.register("sub", SyncSchemaType::Json, "v2").unwrap();
370 let latest = r.get_latest("sub").unwrap();
371 assert_eq!(latest.version, 2);
372 }
373
374 #[test]
375 fn test_register_multiple_subjects_independent() {
376 let mut r = SyncSchemaRegistry::new();
377 r.register("a", SyncSchemaType::Json, "schema_a").unwrap();
378 r.register("b", SyncSchemaType::Avro, "schema_b").unwrap();
379 assert_eq!(r.get_latest("a").unwrap().version, 1);
380 assert_eq!(r.get_latest("b").unwrap().version, 1);
381 }
382
383 #[test]
384 fn test_register_empty_definition_error() {
385 let mut r = SyncSchemaRegistry::new();
386 let result = r.register("sub", SyncSchemaType::Json, "");
387 assert!(matches!(result, Err(SyncRegistryError::InvalidSchema(_))));
388 }
389
390 #[test]
391 fn test_register_duplicate_definition_error() {
392 let mut r = SyncSchemaRegistry::new();
393 r.register("sub", SyncSchemaType::Json, "same").unwrap();
394 let result = r.register("sub", SyncSchemaType::Json, "same");
395 assert!(matches!(result, Err(SyncRegistryError::DuplicateSchema(_))));
396 }
397
398 #[test]
399 fn test_register_ids_are_unique() {
400 let mut r = SyncSchemaRegistry::new();
401 let id1 = r.register("a", SyncSchemaType::Json, "v1").unwrap();
402 let id2 = r.register("b", SyncSchemaType::Json, "v1").unwrap();
403 assert_ne!(id1, id2);
404 }
405
406 #[test]
407 fn test_register_all_schema_types() {
408 let mut r = SyncSchemaRegistry::new();
409 r.register("j", SyncSchemaType::Json, "json_schema")
410 .unwrap();
411 r.register("a", SyncSchemaType::Avro, "avro_schema")
412 .unwrap();
413 r.register("p", SyncSchemaType::Protobuf, "proto_schema")
414 .unwrap();
415 r.register("t", SyncSchemaType::Turtle, "turtle_schema")
416 .unwrap();
417 r.register("c", SyncSchemaType::Custom("myformat".into()), "custom")
418 .unwrap();
419 assert_eq!(r.total_schemas(), 5);
420 }
421
422 #[test]
425 fn test_get_latest_returns_last_registered() {
426 let mut r = SyncSchemaRegistry::new();
427 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
428 r.register("sub", SyncSchemaType::Json, "v2").unwrap();
429 let latest = r.get_latest("sub").unwrap();
430 assert_eq!(latest.definition, "v2");
431 }
432
433 #[test]
434 fn test_get_latest_subject_not_found_error() {
435 let r = SyncSchemaRegistry::new();
436 assert!(matches!(
437 r.get_latest("nope"),
438 Err(SyncRegistryError::SubjectNotFound(_))
439 ));
440 }
441
442 #[test]
445 fn test_get_version_returns_correct_version() {
446 let mut r = SyncSchemaRegistry::new();
447 r.register("sub", SyncSchemaType::Json, "v1_def").unwrap();
448 r.register("sub", SyncSchemaType::Json, "v2_def").unwrap();
449 let v1 = r.get_version("sub", 1).unwrap();
450 assert_eq!(v1.definition, "v1_def");
451 }
452
453 #[test]
454 fn test_get_version_not_found_error() {
455 let mut r = SyncSchemaRegistry::new();
456 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
457 let result = r.get_version("sub", 99);
458 assert!(matches!(
459 result,
460 Err(SyncRegistryError::SchemaNotFound { .. })
461 ));
462 }
463
464 #[test]
465 fn test_get_version_subject_not_found_error() {
466 let r = SyncSchemaRegistry::new();
467 let result = r.get_version("ghost", 1);
468 assert!(matches!(result, Err(SyncRegistryError::SubjectNotFound(_))));
469 }
470
471 #[test]
474 fn test_get_by_id_returns_correct_schema() {
475 let mut r = SyncSchemaRegistry::new();
476 let id = r
477 .register("sub", SyncSchemaType::Json, "definition")
478 .unwrap();
479 let schema = r.get_by_id(id).expect("should find by ID");
480 assert_eq!(schema.id, id);
481 assert_eq!(schema.definition, "definition");
482 }
483
484 #[test]
485 fn test_get_by_id_unknown_returns_none() {
486 let r = SyncSchemaRegistry::new();
487 assert!(r.get_by_id(9999).is_none());
488 }
489
490 #[test]
493 fn test_subjects_empty_registry() {
494 let r = SyncSchemaRegistry::new();
495 assert!(r.subjects().is_empty());
496 }
497
498 #[test]
499 fn test_subjects_lists_all() {
500 let mut r = SyncSchemaRegistry::new();
501 r.register("a", SyncSchemaType::Json, "1").unwrap();
502 r.register("b", SyncSchemaType::Json, "2").unwrap();
503 let subs = r.subjects();
504 assert_eq!(subs.len(), 2);
505 assert!(subs.contains(&"a"));
506 assert!(subs.contains(&"b"));
507 }
508
509 #[test]
512 fn test_versions_returns_all_version_numbers() {
513 let mut r = SyncSchemaRegistry::new();
514 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
515 r.register("sub", SyncSchemaType::Json, "v2").unwrap();
516 r.register("sub", SyncSchemaType::Json, "v3").unwrap();
517 let vs = r.versions("sub").unwrap();
518 assert_eq!(vs, vec![1, 2, 3]);
519 }
520
521 #[test]
522 fn test_versions_subject_not_found_error() {
523 let r = SyncSchemaRegistry::new();
524 assert!(matches!(
525 r.versions("ghost"),
526 Err(SyncRegistryError::SubjectNotFound(_))
527 ));
528 }
529
530 #[test]
533 fn test_delete_version_removes_entry() {
534 let mut r = SyncSchemaRegistry::new();
535 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
536 r.register("sub", SyncSchemaType::Json, "v2").unwrap();
537 r.delete_version("sub", 1).unwrap();
538 assert_eq!(r.versions("sub").unwrap(), vec![2]);
539 }
540
541 #[test]
542 fn test_delete_last_version_removes_subject() {
543 let mut r = SyncSchemaRegistry::new();
544 r.register("sub", SyncSchemaType::Json, "only").unwrap();
545 r.delete_version("sub", 1).unwrap();
546 assert!(!r.subjects().contains(&"sub"));
547 }
548
549 #[test]
550 fn test_delete_version_not_found_error() {
551 let mut r = SyncSchemaRegistry::new();
552 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
553 assert!(matches!(
554 r.delete_version("sub", 99),
555 Err(SyncRegistryError::SchemaNotFound { .. })
556 ));
557 }
558
559 #[test]
560 fn test_delete_version_subject_not_found_error() {
561 let mut r = SyncSchemaRegistry::new();
562 assert!(matches!(
563 r.delete_version("ghost", 1),
564 Err(SyncRegistryError::SubjectNotFound(_))
565 ));
566 }
567
568 #[test]
571 fn test_delete_subject_removes_all_versions() {
572 let mut r = SyncSchemaRegistry::new();
573 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
574 r.register("sub", SyncSchemaType::Json, "v2").unwrap();
575 let count = r.delete_subject("sub").unwrap();
576 assert_eq!(count, 2);
577 assert!(r.subjects().is_empty());
578 }
579
580 #[test]
581 fn test_delete_subject_not_found_error() {
582 let mut r = SyncSchemaRegistry::new();
583 assert!(matches!(
584 r.delete_subject("ghost"),
585 Err(SyncRegistryError::SubjectNotFound(_))
586 ));
587 }
588
589 #[test]
592 fn test_set_and_get_compatibility_per_subject() {
593 let mut r = SyncSchemaRegistry::new();
594 r.set_compatibility("sub", SyncCompatibilityMode::Backward);
595 assert_eq!(r.get_compatibility("sub"), SyncCompatibilityMode::Backward);
596 }
597
598 #[test]
599 fn test_get_compatibility_falls_back_to_global() {
600 let r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Full);
601 assert_eq!(
602 r.get_compatibility("any-subject"),
603 SyncCompatibilityMode::Full
604 );
605 }
606
607 #[test]
608 fn test_per_subject_overrides_global() {
609 let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
610 r.set_compatibility("sub", SyncCompatibilityMode::None);
611 assert_eq!(r.get_compatibility("sub"), SyncCompatibilityMode::None);
612 }
613
614 #[test]
615 fn test_global_compatibility_default_is_none() {
616 let r = SyncSchemaRegistry::new();
617 assert_eq!(r.get_compatibility("any"), SyncCompatibilityMode::None);
618 }
619
620 #[test]
623 fn test_check_compatibility_none_mode_always_true() {
624 let mut r = SyncSchemaRegistry::new(); r.register("sub", SyncSchemaType::Json, "v1").unwrap();
626 assert!(r.check_compatibility("sub", "BREAK anything").unwrap());
627 }
628
629 #[test]
630 fn test_check_compatibility_non_none_mode_ok() {
631 let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
632 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
633 assert!(r.check_compatibility("sub", "v2 definition").unwrap());
634 }
635
636 #[test]
637 fn test_check_compatibility_non_none_mode_breaks() {
638 let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
639 r.register("sub", SyncSchemaType::Json, "v1").unwrap();
640 assert!(!r.check_compatibility("sub", "BREAK v2").unwrap());
641 }
642
643 #[test]
644 fn test_check_compatibility_no_existing_versions_is_ok() {
645 let r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Full);
646 assert!(r
647 .check_compatibility("new-subject", "first schema")
648 .unwrap());
649 }
650
651 #[test]
654 fn test_total_schemas_empty() {
655 let r = SyncSchemaRegistry::new();
656 assert_eq!(r.total_schemas(), 0);
657 }
658
659 #[test]
660 fn test_total_schemas_counts_all_versions() {
661 let mut r = SyncSchemaRegistry::new();
662 r.register("a", SyncSchemaType::Json, "v1").unwrap();
663 r.register("a", SyncSchemaType::Json, "v2").unwrap();
664 r.register("b", SyncSchemaType::Json, "v1").unwrap();
665 assert_eq!(r.total_schemas(), 3);
666 }
667
668 #[test]
671 fn test_error_display_subject_not_found() {
672 let e = SyncRegistryError::SubjectNotFound("my-topic".to_string());
673 assert!(e.to_string().contains("my-topic"));
674 }
675
676 #[test]
677 fn test_error_display_schema_not_found() {
678 let e = SyncRegistryError::SchemaNotFound {
679 subject: "sub".to_string(),
680 version: 3,
681 };
682 let s = e.to_string();
683 assert!(s.contains("sub") && s.contains('3'));
684 }
685
686 #[test]
687 fn test_error_display_incompatible_schema() {
688 let e = SyncRegistryError::IncompatibleSchema("reason".to_string());
689 assert!(e.to_string().contains("reason"));
690 }
691
692 #[test]
693 fn test_error_display_duplicate_schema() {
694 let e = SyncRegistryError::DuplicateSchema("dup".to_string());
695 assert!(e.to_string().contains("dup"));
696 }
697
698 #[test]
699 fn test_error_display_invalid_schema() {
700 let e = SyncRegistryError::InvalidSchema("empty".to_string());
701 assert!(e.to_string().contains("empty"));
702 }
703
704 #[test]
707 fn test_schema_subject_field_set_correctly() {
708 let mut r = SyncSchemaRegistry::new();
709 r.register("my-subject", SyncSchemaType::Avro, "schema")
710 .unwrap();
711 let s = r.get_latest("my-subject").unwrap();
712 assert_eq!(s.subject, "my-subject");
713 }
714
715 #[test]
716 fn test_schema_created_at_ms_non_zero() {
717 let mut r = SyncSchemaRegistry::new();
718 r.register("sub", SyncSchemaType::Json, "x").unwrap();
719 let s = r.get_latest("sub").unwrap();
720 let _: u64 = s.created_at_ms;
723 }
724
725 #[test]
726 fn test_schema_type_preserved() {
727 let mut r = SyncSchemaRegistry::new();
728 r.register("p", SyncSchemaType::Protobuf, "proto def")
729 .unwrap();
730 let s = r.get_latest("p").unwrap();
731 assert_eq!(s.schema_type, SyncSchemaType::Protobuf);
732 }
733
734 #[test]
737 fn test_default_registry_is_empty() {
738 let r = SyncSchemaRegistry::default();
739 assert_eq!(r.total_schemas(), 0);
740 assert!(r.subjects().is_empty());
741 }
742
743 #[test]
746 fn test_backward_transitive_mode_accessible() {
747 let mut r = SyncSchemaRegistry::new();
748 r.set_compatibility("sub", SyncCompatibilityMode::BackwardTransitive);
749 assert_eq!(
750 r.get_compatibility("sub"),
751 SyncCompatibilityMode::BackwardTransitive
752 );
753 }
754
755 #[test]
756 fn test_forward_transitive_mode_accessible() {
757 let mut r = SyncSchemaRegistry::new();
758 r.set_compatibility("sub", SyncCompatibilityMode::ForwardTransitive);
759 assert_eq!(
760 r.get_compatibility("sub"),
761 SyncCompatibilityMode::ForwardTransitive
762 );
763 }
764
765 #[test]
766 fn test_full_transitive_mode_accessible() {
767 let mut r = SyncSchemaRegistry::new();
768 r.set_compatibility("sub", SyncCompatibilityMode::FullTransitive);
769 assert_eq!(
770 r.get_compatibility("sub"),
771 SyncCompatibilityMode::FullTransitive
772 );
773 }
774}