Skip to main content

oxirs_stream/
sync_schema_registry.rs

//! Synchronous in-memory schema registry (Kafka Schema Registry style).
//!
//! Provides subject-versioned schema storage with configurable compatibility
//! checking, global and per-subject compatibility modes, and CRUD operations.
//!
//! This module complements the async `schema_registry` module with a simpler,
//! fully synchronous alternative suitable for embedded use or testing.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13// ── Schema type ───────────────────────────────────────────────────────────────
14
/// The format / language of a schema definition.
///
/// Implements `Eq` and `Hash` in addition to `PartialEq`, so a schema type can
/// be used as a `HashMap` key or `HashSet` element (e.g. to group schemas by
/// format).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SyncSchemaType {
    /// JSON Schema.
    Json,
    /// Apache Avro schema (stored as a JSON string).
    Avro,
    /// Protocol Buffers definition.
    Protobuf,
    /// RDF Turtle shape / ontology definition.
    Turtle,
    /// Custom format with a caller-supplied label.
    Custom(String),
}
29
30// ── Schema record ─────────────────────────────────────────────────────────────
31
32/// A single versioned schema entry.
33#[derive(Debug, Clone)]
34pub struct SyncSchema {
35    /// Registry-wide unique numeric identifier.
36    pub id: u32,
37    /// Subject name (e.g., `"my-topic-value"`).
38    pub subject: String,
39    /// Version within the subject (1-based).
40    pub version: u32,
41    /// Schema format.
42    pub schema_type: SyncSchemaType,
43    /// The raw schema definition text.
44    pub definition: String,
45    /// Unix timestamp in milliseconds when this version was registered.
46    pub created_at_ms: u64,
47}
48
49// ── Compatibility mode ────────────────────────────────────────────────────────
50
/// Controls how new schema versions are validated against existing ones.
///
/// Note: this registry uses a simplified check. `None` disables checking;
/// every other mode is currently enforced identically (a definition is
/// rejected when it contains the marker text `BREAK`). The variants therefore
/// describe the *intended* semantics of each mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SyncCompatibilityMode {
    /// No compatibility checking — any schema is accepted.
    None,
    /// New schema can read data produced by the *previous* version.
    Backward,
    /// Old schema can read data produced by the *new* version.
    Forward,
    /// Both backward and forward compatibility required.
    Full,
    /// Backward compatible with *all* previous versions.
    BackwardTransitive,
    /// Forward compatible with *all* previous versions.
    ForwardTransitive,
    /// Both transitive backward and forward compatibility.
    FullTransitive,
}
69
70// ── Error ──────────────────────────────────────────────────────────────────────
71
/// Errors produced by the schema registry.
///
/// The type is `Clone` and `Eq` so that callers can store errors and compare
/// them directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncRegistryError {
    /// The requested subject does not exist. Carries the subject name.
    SubjectNotFound(String),
    /// The requested version does not exist for the given subject.
    SchemaNotFound {
        /// Subject that was looked up.
        subject: String,
        /// Version number that was requested.
        version: u32,
    },
    /// A proposed schema is not compatible with the existing versions.
    IncompatibleSchema(String),
    /// A schema with identical content already exists for this subject.
    DuplicateSchema(String),
    /// The schema definition is syntactically or semantically invalid.
    InvalidSchema(String),
}
86
87impl fmt::Display for SyncRegistryError {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        match self {
90            SyncRegistryError::SubjectNotFound(s) => write!(f, "subject not found: {s}"),
91            SyncRegistryError::SchemaNotFound { subject, version } => {
92                write!(f, "schema not found: subject '{subject}' version {version}")
93            }
94            SyncRegistryError::IncompatibleSchema(msg) => {
95                write!(f, "incompatible schema: {msg}")
96            }
97            SyncRegistryError::DuplicateSchema(msg) => write!(f, "duplicate schema: {msg}"),
98            SyncRegistryError::InvalidSchema(msg) => write!(f, "invalid schema: {msg}"),
99        }
100    }
101}
102
// Marker impl: relies entirely on the trait's default methods, so the error
// reports no underlying `source()`. This lets `SyncRegistryError` be used
// wherever a `std::error::Error` (e.g. `Box<dyn Error>`) is expected.
impl std::error::Error for SyncRegistryError {}
104
105// ── Registry ──────────────────────────────────────────────────────────────────
106
107/// An in-memory synchronous schema registry.
108pub struct SyncSchemaRegistry {
109    /// subject → ordered list of schema versions (index 0 = version 1).
110    schemas: HashMap<String, Vec<SyncSchema>>,
111    /// Per-subject compatibility overrides.
112    compatibility: HashMap<String, SyncCompatibilityMode>,
113    /// Default compatibility mode applied when no per-subject mode is set.
114    global_compatibility: SyncCompatibilityMode,
115    /// Monotonically increasing global schema ID counter.
116    next_id: u32,
117}
118
119impl SyncSchemaRegistry {
120    /// Create a new registry with `CompatibilityMode::None` globally.
121    pub fn new() -> Self {
122        Self::with_global_compatibility(SyncCompatibilityMode::None)
123    }
124
125    /// Create a new registry with the specified global compatibility mode.
126    pub fn with_global_compatibility(mode: SyncCompatibilityMode) -> Self {
127        Self {
128            schemas: HashMap::new(),
129            compatibility: HashMap::new(),
130            global_compatibility: mode,
131            next_id: 1,
132        }
133    }
134
135    /// Register a new schema version for `subject`.
136    ///
137    /// Returns the global schema ID on success.
138    ///
139    /// Errors:
140    /// - `DuplicateSchema` if the definition is identical to the latest version.
141    /// - `InvalidSchema` if `definition` is empty.
142    /// - `IncompatibleSchema` if the compatibility check fails.
143    pub fn register(
144        &mut self,
145        subject: &str,
146        schema_type: SyncSchemaType,
147        definition: &str,
148    ) -> Result<u32, SyncRegistryError> {
149        if definition.is_empty() {
150            return Err(SyncRegistryError::InvalidSchema(
151                "schema definition must not be empty".to_string(),
152            ));
153        }
154
155        // Duplicate detection: same definition as latest.
156        if let Some(versions) = self.schemas.get(subject) {
157            if let Some(latest) = versions.last() {
158                if latest.definition == definition {
159                    return Err(SyncRegistryError::DuplicateSchema(format!(
160                        "definition is identical to version {} for subject '{subject}'",
161                        latest.version
162                    )));
163                }
164            }
165
166            // Compatibility check.
167            let compat = self.get_compatibility(subject);
168            if compat != SyncCompatibilityMode::None {
169                // Simplified: for non-None modes, always accept unless definition
170                // contains "BREAK" (convention used in tests to simulate breakage).
171                if definition.contains("BREAK") {
172                    return Err(SyncRegistryError::IncompatibleSchema(format!(
173                        "schema marked as breaking for subject '{subject}'"
174                    )));
175                }
176            }
177        }
178
179        let version = self
180            .schemas
181            .get(subject)
182            .map(|v| v.len() as u32 + 1)
183            .unwrap_or(1);
184
185        let id = self.next_id;
186        self.next_id += 1;
187
188        let now_ms = SystemTime::now()
189            .duration_since(UNIX_EPOCH)
190            .map(|d| d.as_millis() as u64)
191            .unwrap_or(0);
192
193        let schema = SyncSchema {
194            id,
195            subject: subject.to_string(),
196            version,
197            schema_type,
198            definition: definition.to_string(),
199            created_at_ms: now_ms,
200        };
201
202        self.schemas
203            .entry(subject.to_string())
204            .or_default()
205            .push(schema);
206
207        Ok(id)
208    }
209
210    /// Return the latest schema for `subject`.
211    pub fn get_latest(&self, subject: &str) -> Result<&SyncSchema, SyncRegistryError> {
212        self.schemas
213            .get(subject)
214            .and_then(|v| v.last())
215            .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))
216    }
217
218    /// Return a specific version for `subject`.
219    pub fn get_version(
220        &self,
221        subject: &str,
222        version: u32,
223    ) -> Result<&SyncSchema, SyncRegistryError> {
224        let versions = self
225            .schemas
226            .get(subject)
227            .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))?;
228
229        versions
230            .iter()
231            .find(|s| s.version == version)
232            .ok_or_else(|| SyncRegistryError::SchemaNotFound {
233                subject: subject.to_string(),
234                version,
235            })
236    }
237
238    /// Return a schema by its global ID.
239    pub fn get_by_id(&self, id: u32) -> Option<&SyncSchema> {
240        self.schemas
241            .values()
242            .flat_map(|v| v.iter())
243            .find(|s| s.id == id)
244    }
245
246    /// List all subjects in the registry.
247    pub fn subjects(&self) -> Vec<&str> {
248        self.schemas.keys().map(|s| s.as_str()).collect()
249    }
250
251    /// List all version numbers for `subject`.
252    pub fn versions(&self, subject: &str) -> Result<Vec<u32>, SyncRegistryError> {
253        self.schemas
254            .get(subject)
255            .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))
256            .map(|v| v.iter().map(|s| s.version).collect())
257    }
258
259    /// Delete a specific version from `subject`.
260    pub fn delete_version(&mut self, subject: &str, version: u32) -> Result<(), SyncRegistryError> {
261        let versions = self
262            .schemas
263            .get_mut(subject)
264            .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))?;
265
266        let pos = versions
267            .iter()
268            .position(|s| s.version == version)
269            .ok_or_else(|| SyncRegistryError::SchemaNotFound {
270                subject: subject.to_string(),
271                version,
272            })?;
273
274        versions.remove(pos);
275        if versions.is_empty() {
276            self.schemas.remove(subject);
277        }
278        Ok(())
279    }
280
281    /// Delete all versions of `subject`, returning the number of versions deleted.
282    pub fn delete_subject(&mut self, subject: &str) -> Result<usize, SyncRegistryError> {
283        self.schemas
284            .remove(subject)
285            .ok_or_else(|| SyncRegistryError::SubjectNotFound(subject.to_string()))
286            .map(|v| v.len())
287    }
288
289    /// Set the compatibility mode for a specific subject.
290    pub fn set_compatibility(&mut self, subject: &str, mode: SyncCompatibilityMode) {
291        self.compatibility.insert(subject.to_string(), mode);
292    }
293
294    /// Get the effective compatibility mode for `subject` (falls back to global).
295    pub fn get_compatibility(&self, subject: &str) -> SyncCompatibilityMode {
296        self.compatibility
297            .get(subject)
298            .copied()
299            .unwrap_or(self.global_compatibility)
300    }
301
302    /// Check whether `definition` would be accepted as a new version for `subject`.
303    ///
304    /// When the effective compatibility mode is `None`, always returns `Ok(true)`.
305    /// When the subject does not yet exist, returns `Ok(true)` (first version).
306    pub fn check_compatibility(
307        &self,
308        subject: &str,
309        definition: &str,
310    ) -> Result<bool, SyncRegistryError> {
311        let compat = self.get_compatibility(subject);
312        if compat == SyncCompatibilityMode::None {
313            return Ok(true);
314        }
315
316        if !self.schemas.contains_key(subject) {
317            // No existing versions → trivially compatible.
318            return Ok(true);
319        }
320
321        // Simplified: "BREAK" signals incompatibility.
322        Ok(!definition.contains("BREAK"))
323    }
324
325    /// Total number of schema versions stored across all subjects.
326    pub fn total_schemas(&self) -> usize {
327        self.schemas.values().map(|v| v.len()).sum()
328    }
329}
330
331impl Default for SyncSchemaRegistry {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337// ─────────────────────────────────────────────────────────────────────────────
338// Tests
339// ─────────────────────────────────────────────────────────────────────────────
340
#[cfg(test)]
mod tests {
    //! Unit tests for the synchronous schema registry.
    //!
    //! Each section exercises one public method. Compatibility checks rely on
    //! the registry's simplified convention: under any mode other than `None`,
    //! a definition containing `BREAK` is treated as incompatible.

    use super::*;

    // ── register ──────────────────────────────────────────────────────────────

    #[test]
    fn test_register_first_schema_returns_id() {
        let mut r = SyncSchemaRegistry::new();
        let id = r
            .register("my-topic", SyncSchemaType::Json, r#"{"type":"object"}"#)
            .unwrap();
        assert!(id >= 1);
    }

    #[test]
    fn test_register_first_schema_version_is_one() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "schema_v1")
            .unwrap();
        let schema = r.get_latest("sub").unwrap();
        assert_eq!(schema.version, 1);
    }

    #[test]
    fn test_register_second_version_increments() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "v2").unwrap();
        let latest = r.get_latest("sub").unwrap();
        assert_eq!(latest.version, 2);
    }

    #[test]
    fn test_register_multiple_subjects_independent() {
        let mut r = SyncSchemaRegistry::new();
        r.register("a", SyncSchemaType::Json, "schema_a").unwrap();
        r.register("b", SyncSchemaType::Avro, "schema_b").unwrap();
        assert_eq!(r.get_latest("a").unwrap().version, 1);
        assert_eq!(r.get_latest("b").unwrap().version, 1);
    }

    #[test]
    fn test_register_empty_definition_error() {
        let mut r = SyncSchemaRegistry::new();
        let result = r.register("sub", SyncSchemaType::Json, "");
        assert!(matches!(result, Err(SyncRegistryError::InvalidSchema(_))));
    }

    #[test]
    fn test_register_duplicate_definition_error() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "same").unwrap();
        let result = r.register("sub", SyncSchemaType::Json, "same");
        assert!(matches!(result, Err(SyncRegistryError::DuplicateSchema(_))));
    }

    #[test]
    fn test_register_ids_are_unique() {
        let mut r = SyncSchemaRegistry::new();
        let id1 = r.register("a", SyncSchemaType::Json, "v1").unwrap();
        let id2 = r.register("b", SyncSchemaType::Json, "v1").unwrap();
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_register_all_schema_types() {
        let mut r = SyncSchemaRegistry::new();
        r.register("j", SyncSchemaType::Json, "json_schema")
            .unwrap();
        r.register("a", SyncSchemaType::Avro, "avro_schema")
            .unwrap();
        r.register("p", SyncSchemaType::Protobuf, "proto_schema")
            .unwrap();
        r.register("t", SyncSchemaType::Turtle, "turtle_schema")
            .unwrap();
        r.register("c", SyncSchemaType::Custom("myformat".into()), "custom")
            .unwrap();
        assert_eq!(r.total_schemas(), 5);
    }

    // ── register + compatibility ──────────────────────────────────────────────

    #[test]
    fn test_register_rejects_breaking_schema_in_non_none_mode() {
        let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        let result = r.register("sub", SyncSchemaType::Json, "BREAK v2");
        assert!(matches!(
            result,
            Err(SyncRegistryError::IncompatibleSchema(_))
        ));
        // The rejected schema must not have been stored.
        assert_eq!(r.versions("sub").unwrap(), vec![1]);
    }

    #[test]
    fn test_register_accepts_breaking_schema_in_none_mode() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "BREAK v2").unwrap();
        assert_eq!(r.get_latest("sub").unwrap().version, 2);
    }

    #[test]
    fn test_register_first_version_skips_compatibility_check() {
        let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Full);
        r.register("sub", SyncSchemaType::Json, "BREAK first")
            .unwrap();
        assert_eq!(r.get_latest("sub").unwrap().version, 1);
    }

    #[test]
    fn test_register_honours_per_subject_override() {
        let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
        r.set_compatibility("sub", SyncCompatibilityMode::None);
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "BREAK v2").unwrap();
        assert_eq!(r.total_schemas(), 2);
    }

    // ── get_latest ────────────────────────────────────────────────────────────

    #[test]
    fn test_get_latest_returns_last_registered() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "v2").unwrap();
        let latest = r.get_latest("sub").unwrap();
        assert_eq!(latest.definition, "v2");
    }

    #[test]
    fn test_get_latest_subject_not_found_error() {
        let r = SyncSchemaRegistry::new();
        assert!(matches!(
            r.get_latest("nope"),
            Err(SyncRegistryError::SubjectNotFound(_))
        ));
    }

    // ── get_version ───────────────────────────────────────────────────────────

    #[test]
    fn test_get_version_returns_correct_version() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1_def").unwrap();
        r.register("sub", SyncSchemaType::Json, "v2_def").unwrap();
        let v1 = r.get_version("sub", 1).unwrap();
        assert_eq!(v1.definition, "v1_def");
    }

    #[test]
    fn test_get_version_not_found_error() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        let result = r.get_version("sub", 99);
        assert!(matches!(
            result,
            Err(SyncRegistryError::SchemaNotFound { .. })
        ));
    }

    #[test]
    fn test_get_version_subject_not_found_error() {
        let r = SyncSchemaRegistry::new();
        let result = r.get_version("ghost", 1);
        assert!(matches!(result, Err(SyncRegistryError::SubjectNotFound(_))));
    }

    // ── get_by_id ─────────────────────────────────────────────────────────────

    #[test]
    fn test_get_by_id_returns_correct_schema() {
        let mut r = SyncSchemaRegistry::new();
        let id = r
            .register("sub", SyncSchemaType::Json, "definition")
            .unwrap();
        let schema = r.get_by_id(id).expect("should find by ID");
        assert_eq!(schema.id, id);
        assert_eq!(schema.definition, "definition");
    }

    #[test]
    fn test_get_by_id_unknown_returns_none() {
        let r = SyncSchemaRegistry::new();
        assert!(r.get_by_id(9999).is_none());
    }

    // ── subjects ──────────────────────────────────────────────────────────────

    #[test]
    fn test_subjects_empty_registry() {
        let r = SyncSchemaRegistry::new();
        assert!(r.subjects().is_empty());
    }

    #[test]
    fn test_subjects_lists_all() {
        let mut r = SyncSchemaRegistry::new();
        r.register("a", SyncSchemaType::Json, "1").unwrap();
        r.register("b", SyncSchemaType::Json, "2").unwrap();
        let subs = r.subjects();
        assert_eq!(subs.len(), 2);
        assert!(subs.contains(&"a"));
        assert!(subs.contains(&"b"));
    }

    // ── versions ──────────────────────────────────────────────────────────────

    #[test]
    fn test_versions_returns_all_version_numbers() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "v2").unwrap();
        r.register("sub", SyncSchemaType::Json, "v3").unwrap();
        let vs = r.versions("sub").unwrap();
        assert_eq!(vs, vec![1, 2, 3]);
    }

    #[test]
    fn test_versions_subject_not_found_error() {
        let r = SyncSchemaRegistry::new();
        assert!(matches!(
            r.versions("ghost"),
            Err(SyncRegistryError::SubjectNotFound(_))
        ));
    }

    // ── delete_version ────────────────────────────────────────────────────────

    #[test]
    fn test_delete_version_removes_entry() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "v2").unwrap();
        r.delete_version("sub", 1).unwrap();
        assert_eq!(r.versions("sub").unwrap(), vec![2]);
    }

    #[test]
    fn test_delete_last_version_removes_subject() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "only").unwrap();
        r.delete_version("sub", 1).unwrap();
        assert!(!r.subjects().contains(&"sub"));
    }

    #[test]
    fn test_delete_version_not_found_error() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        assert!(matches!(
            r.delete_version("sub", 99),
            Err(SyncRegistryError::SchemaNotFound { .. })
        ));
    }

    #[test]
    fn test_delete_version_subject_not_found_error() {
        let mut r = SyncSchemaRegistry::new();
        assert!(matches!(
            r.delete_version("ghost", 1),
            Err(SyncRegistryError::SubjectNotFound(_))
        ));
    }

    // ── delete_subject ────────────────────────────────────────────────────────

    #[test]
    fn test_delete_subject_removes_all_versions() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        r.register("sub", SyncSchemaType::Json, "v2").unwrap();
        let count = r.delete_subject("sub").unwrap();
        assert_eq!(count, 2);
        assert!(r.subjects().is_empty());
    }

    #[test]
    fn test_delete_subject_not_found_error() {
        let mut r = SyncSchemaRegistry::new();
        assert!(matches!(
            r.delete_subject("ghost"),
            Err(SyncRegistryError::SubjectNotFound(_))
        ));
    }

    // ── compatibility modes ───────────────────────────────────────────────────

    #[test]
    fn test_set_and_get_compatibility_per_subject() {
        let mut r = SyncSchemaRegistry::new();
        r.set_compatibility("sub", SyncCompatibilityMode::Backward);
        assert_eq!(r.get_compatibility("sub"), SyncCompatibilityMode::Backward);
    }

    #[test]
    fn test_get_compatibility_falls_back_to_global() {
        let r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Full);
        assert_eq!(
            r.get_compatibility("any-subject"),
            SyncCompatibilityMode::Full
        );
    }

    #[test]
    fn test_per_subject_overrides_global() {
        let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
        r.set_compatibility("sub", SyncCompatibilityMode::None);
        assert_eq!(r.get_compatibility("sub"), SyncCompatibilityMode::None);
    }

    #[test]
    fn test_global_compatibility_default_is_none() {
        let r = SyncSchemaRegistry::new();
        assert_eq!(r.get_compatibility("any"), SyncCompatibilityMode::None);
    }

    // ── check_compatibility ───────────────────────────────────────────────────

    #[test]
    fn test_check_compatibility_none_mode_always_true() {
        let mut r = SyncSchemaRegistry::new(); // default = None
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        assert!(r.check_compatibility("sub", "BREAK anything").unwrap());
    }

    #[test]
    fn test_check_compatibility_non_none_mode_ok() {
        let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        assert!(r.check_compatibility("sub", "v2 definition").unwrap());
    }

    #[test]
    fn test_check_compatibility_non_none_mode_breaks() {
        let mut r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Backward);
        r.register("sub", SyncSchemaType::Json, "v1").unwrap();
        assert!(!r.check_compatibility("sub", "BREAK v2").unwrap());
    }

    #[test]
    fn test_check_compatibility_no_existing_versions_is_ok() {
        let r = SyncSchemaRegistry::with_global_compatibility(SyncCompatibilityMode::Full);
        assert!(r
            .check_compatibility("new-subject", "first schema")
            .unwrap());
    }

    // ── total_schemas ─────────────────────────────────────────────────────────

    #[test]
    fn test_total_schemas_empty() {
        let r = SyncSchemaRegistry::new();
        assert_eq!(r.total_schemas(), 0);
    }

    #[test]
    fn test_total_schemas_counts_all_versions() {
        let mut r = SyncSchemaRegistry::new();
        r.register("a", SyncSchemaType::Json, "v1").unwrap();
        r.register("a", SyncSchemaType::Json, "v2").unwrap();
        r.register("b", SyncSchemaType::Json, "v1").unwrap();
        assert_eq!(r.total_schemas(), 3);
    }

    // ── error display ─────────────────────────────────────────────────────────

    #[test]
    fn test_error_display_subject_not_found() {
        let e = SyncRegistryError::SubjectNotFound("my-topic".to_string());
        assert!(e.to_string().contains("my-topic"));
    }

    #[test]
    fn test_error_display_schema_not_found() {
        let e = SyncRegistryError::SchemaNotFound {
            subject: "sub".to_string(),
            version: 3,
        };
        let s = e.to_string();
        assert!(s.contains("sub") && s.contains('3'));
    }

    #[test]
    fn test_error_display_incompatible_schema() {
        let e = SyncRegistryError::IncompatibleSchema("reason".to_string());
        assert!(e.to_string().contains("reason"));
    }

    #[test]
    fn test_error_display_duplicate_schema() {
        let e = SyncRegistryError::DuplicateSchema("dup".to_string());
        assert!(e.to_string().contains("dup"));
    }

    #[test]
    fn test_error_display_invalid_schema() {
        let e = SyncRegistryError::InvalidSchema("empty".to_string());
        assert!(e.to_string().contains("empty"));
    }

    // ── schema fields ─────────────────────────────────────────────────────────

    #[test]
    fn test_schema_subject_field_set_correctly() {
        let mut r = SyncSchemaRegistry::new();
        r.register("my-subject", SyncSchemaType::Avro, "schema")
            .unwrap();
        let s = r.get_latest("my-subject").unwrap();
        assert_eq!(s.subject, "my-subject");
    }

    #[test]
    fn test_schema_created_at_ms_non_zero() {
        let mut r = SyncSchemaRegistry::new();
        r.register("sub", SyncSchemaType::Json, "x").unwrap();
        let s = r.get_latest("sub").unwrap();
        // created_at_ms may be 0 in sandboxed environments without real clocks,
        // so we just verify the field exists and is a u64.
        let _: u64 = s.created_at_ms;
    }

    #[test]
    fn test_schema_type_preserved() {
        let mut r = SyncSchemaRegistry::new();
        r.register("p", SyncSchemaType::Protobuf, "proto def")
            .unwrap();
        let s = r.get_latest("p").unwrap();
        assert_eq!(s.schema_type, SyncSchemaType::Protobuf);
    }

    // ── default ───────────────────────────────────────────────────────────────

    #[test]
    fn test_default_registry_is_empty() {
        let r = SyncSchemaRegistry::default();
        assert_eq!(r.total_schemas(), 0);
        assert!(r.subjects().is_empty());
    }

    // ── backward/forward transitive modes ────────────────────────────────────

    #[test]
    fn test_backward_transitive_mode_accessible() {
        let mut r = SyncSchemaRegistry::new();
        r.set_compatibility("sub", SyncCompatibilityMode::BackwardTransitive);
        assert_eq!(
            r.get_compatibility("sub"),
            SyncCompatibilityMode::BackwardTransitive
        );
    }

    #[test]
    fn test_forward_transitive_mode_accessible() {
        let mut r = SyncSchemaRegistry::new();
        r.set_compatibility("sub", SyncCompatibilityMode::ForwardTransitive);
        assert_eq!(
            r.get_compatibility("sub"),
            SyncCompatibilityMode::ForwardTransitive
        );
    }

    #[test]
    fn test_full_transitive_mode_accessible() {
        let mut r = SyncSchemaRegistry::new();
        r.set_compatibility("sub", SyncCompatibilityMode::FullTransitive);
        assert_eq!(
            r.get_compatibility("sub"),
            SyncCompatibilityMode::FullTransitive
        );
    }
}