Skip to main content

rivven_schema/
types.rs

1//! Schema types and data structures
2//!
3//! This module provides core types for the Schema Registry:
4//! - [`SchemaId`], [`Subject`], [`SchemaVersion`] - Core identifiers
5//! - [`Schema`], [`SubjectVersion`] - Schema data structures
6//! - [`SchemaContext`] - Multi-tenant context/group support
7//! - [`VersionState`] - Version lifecycle management
8//! - [`ValidationRule`] - Custom content validation rules
9//! - [`RegistryMode`](crate::server::RegistryMode), [`CompatibilityLevel`] - Registry configuration
10
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14// Re-export SchemaType from rivven-protocol (single source of truth)
15pub use rivven_protocol::SchemaType;
16
17/// Unique identifier for a schema (global across all subjects)
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub struct SchemaId(pub u32);
20
21impl SchemaId {
22    pub fn new(id: u32) -> Self {
23        Self(id)
24    }
25
26    pub fn as_u32(&self) -> u32 {
27        self.0
28    }
29}
30
31impl std::fmt::Display for SchemaId {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "{}", self.0)
34    }
35}
36
37impl From<u32> for SchemaId {
38    fn from(id: u32) -> Self {
39        Self(id)
40    }
41}
42
43/// Subject (typically topic-name + "-key" or "-value")
44#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45pub struct Subject(pub String);
46
47impl Subject {
48    pub fn new(name: impl Into<String>) -> Self {
49        Self(name.into())
50    }
51
52    /// Create a key subject for a topic
53    pub fn key(topic: &str) -> Self {
54        Self(format!("{}-key", topic))
55    }
56
57    /// Create a value subject for a topic
58    pub fn value(topic: &str) -> Self {
59        Self(format!("{}-value", topic))
60    }
61
62    pub fn as_str(&self) -> &str {
63        &self.0
64    }
65}
66
67impl std::fmt::Display for Subject {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(f, "{}", self.0)
70    }
71}
72
73impl From<&str> for Subject {
74    fn from(s: &str) -> Self {
75        Self(s.to_string())
76    }
77}
78
79impl From<String> for Subject {
80    fn from(s: String) -> Self {
81        Self(s)
82    }
83}
84
85impl AsRef<str> for Subject {
86    fn as_ref(&self) -> &str {
87        &self.0
88    }
89}
90
91// ============================================================================
92// Schema Context (Multi-tenant support)
93// ============================================================================
94
95/// Schema context for multi-tenant isolation
96///
97/// Contexts allow organizing schemas into isolated namespaces, similar to
98/// Confluent's Schema Registry contexts. Each context has its own set of
99/// subjects and schemas.
100///
101/// # Examples
102///
103/// ```rust
104/// use rivven_schema::types::SchemaContext;
105///
106/// // Default context (root)
107/// let default_ctx = SchemaContext::default();
108///
109/// // Tenant-specific context
110/// let tenant_ctx = SchemaContext::new("tenant-123");
111///
112/// // Environment context
113/// let prod_ctx = SchemaContext::new("production");
114/// ```
115#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
116pub struct SchemaContext {
117    /// Context name (empty string = default/root context)
118    name: String,
119    /// Context description
120    #[serde(skip_serializing_if = "Option::is_none")]
121    description: Option<String>,
122    /// Whether this context is active
123    #[serde(default = "default_true")]
124    active: bool,
125}
126
/// Serde default provider for boolean `active` flags.
///
/// Referenced by name from `#[serde(default = "default_true")]` so that a
/// missing field deserializes to `true` rather than `bool::default()`.
fn default_true() -> bool {
    true
}
130
131impl SchemaContext {
132    /// The default (root) context name
133    pub const DEFAULT: &'static str = "";
134
135    /// Create a new context with the given name
136    pub fn new(name: impl Into<String>) -> Self {
137        Self {
138            name: name.into(),
139            description: None,
140            active: true,
141        }
142    }
143
144    /// Create the default (root) context
145    pub fn default_context() -> Self {
146        Self::new("")
147    }
148
149    /// Get the context name
150    pub fn name(&self) -> &str {
151        &self.name
152    }
153
154    /// Check if this is the default context
155    pub fn is_default(&self) -> bool {
156        self.name.is_empty()
157    }
158
159    /// Set the context description
160    pub fn with_description(mut self, description: impl Into<String>) -> Self {
161        self.description = Some(description.into());
162        self
163    }
164
165    /// Get the context description
166    pub fn description(&self) -> Option<&str> {
167        self.description.as_deref()
168    }
169
170    /// Set the active state
171    pub fn with_active(mut self, active: bool) -> Self {
172        self.active = active;
173        self
174    }
175
176    /// Check if the context is active
177    pub fn is_active(&self) -> bool {
178        self.active
179    }
180
181    /// Create a fully qualified subject name with context prefix
182    ///
183    /// Format: `:.{context}:{subject}` (Confluent compatible)
184    pub fn qualify_subject(&self, subject: &Subject) -> String {
185        if self.is_default() {
186            subject.0.clone()
187        } else {
188            format!(":.{}:{}", self.name, subject.0)
189        }
190    }
191
192    /// Parse a qualified subject name into context and subject
193    pub fn parse_qualified(qualified: &str) -> (SchemaContext, Subject) {
194        if let Some(rest) = qualified.strip_prefix(":.") {
195            if let Some(colon_pos) = rest.find(':') {
196                let context_name = &rest[..colon_pos];
197                let subject_name = &rest[colon_pos + 1..];
198                return (SchemaContext::new(context_name), Subject::new(subject_name));
199            }
200        }
201        (SchemaContext::default_context(), Subject::new(qualified))
202    }
203}
204
205impl Default for SchemaContext {
206    fn default() -> Self {
207        Self::default_context()
208    }
209}
210
211impl std::fmt::Display for SchemaContext {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        if self.is_default() {
214            write!(f, "(default)")
215        } else {
216            write!(f, "{}", self.name)
217        }
218    }
219}
220
221// ============================================================================
222// Version State (Lifecycle management)
223// ============================================================================
224
225/// State of a schema version within a subject
226///
227/// Allows managing the lifecycle of schema versions without deletion.
228/// This is important for schema evolution and migration workflows.
229///
230/// # State Transitions
231///
232/// ```text
233///     ┌─────────┐
234///     │ Enabled │ ◄───── Initial state
235///     └────┬────┘
236///          │ deprecate()
237///          ▼
238///   ┌──────────────┐
239///   │  Deprecated  │ ◄───── Warning state
240///   └──────┬───────┘
241///          │ disable()
242///          ▼
243///     ┌──────────┐
244///     │ Disabled │ ◄───── Blocked state
245///     └──────────┘
246/// ```
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
248#[serde(rename_all = "UPPERCASE")]
249pub enum VersionState {
250    /// Version is active and can be used (default)
251    #[default]
252    Enabled,
253    /// Version is deprecated - still usable but clients are warned
254    Deprecated,
255    /// Version is disabled - cannot be used for new registrations
256    Disabled,
257}
258
259impl VersionState {
260    /// Check if the version is usable (enabled or deprecated)
261    pub fn is_usable(&self) -> bool {
262        matches!(self, VersionState::Enabled | VersionState::Deprecated)
263    }
264
265    /// Check if this state requires client warnings
266    pub fn requires_warning(&self) -> bool {
267        matches!(self, VersionState::Deprecated)
268    }
269
270    /// Check if this version is blocked from use
271    pub fn is_blocked(&self) -> bool {
272        matches!(self, VersionState::Disabled)
273    }
274
275    /// Transition to deprecated state
276    pub fn deprecate(&self) -> VersionState {
277        match self {
278            VersionState::Enabled => VersionState::Deprecated,
279            other => *other,
280        }
281    }
282
283    /// Transition to disabled state
284    pub fn disable(&self) -> VersionState {
285        VersionState::Disabled
286    }
287
288    /// Transition back to enabled state
289    pub fn enable(&self) -> VersionState {
290        VersionState::Enabled
291    }
292}
293
294impl std::fmt::Display for VersionState {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        match self {
297            VersionState::Enabled => write!(f, "ENABLED"),
298            VersionState::Deprecated => write!(f, "DEPRECATED"),
299            VersionState::Disabled => write!(f, "DISABLED"),
300        }
301    }
302}
303
304impl std::str::FromStr for VersionState {
305    type Err = String;
306
307    fn from_str(s: &str) -> Result<Self, Self::Err> {
308        match s.to_uppercase().as_str() {
309            "ENABLED" => Ok(VersionState::Enabled),
310            "DEPRECATED" => Ok(VersionState::Deprecated),
311            "DISABLED" => Ok(VersionState::Disabled),
312            _ => Err(format!("Invalid version state: {}", s)),
313        }
314    }
315}
316
317// ============================================================================
318// Content Validation Rules
319// ============================================================================
320
321/// Rule type for content validation
322#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
323#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
324pub enum ValidationRuleType {
325    /// JSON Schema validation rule
326    JsonSchema,
327    /// Regular expression pattern
328    Regex,
329    /// Field presence requirement
330    FieldRequired,
331    /// Field type constraint
332    FieldType,
333    /// Maximum schema size in bytes
334    MaxSize,
335    /// Schema naming convention
336    NamingConvention,
337}
338
339impl std::str::FromStr for ValidationRuleType {
340    type Err = String;
341
342    fn from_str(s: &str) -> Result<Self, Self::Err> {
343        match s.to_uppercase().as_str() {
344            "JSON_SCHEMA" | "JSONSCHEMA" => Ok(ValidationRuleType::JsonSchema),
345            "REGEX" => Ok(ValidationRuleType::Regex),
346            "FIELD_REQUIRED" | "FIELDREQUIRED" => Ok(ValidationRuleType::FieldRequired),
347            "FIELD_TYPE" | "FIELDTYPE" => Ok(ValidationRuleType::FieldType),
348            "MAX_SIZE" | "MAXSIZE" => Ok(ValidationRuleType::MaxSize),
349            "NAMING_CONVENTION" | "NAMINGCONVENTION" => Ok(ValidationRuleType::NamingConvention),
350            _ => Err(format!("Invalid validation rule type: {}", s)),
351        }
352    }
353}
354
355/// Content validation rule for custom schema validation
356///
357/// Validation rules extend beyond compatibility checking to enforce
358/// organizational policies and data quality requirements.
359///
360/// # Examples
361///
362/// ```rust
363/// use rivven_schema::types::{ValidationRule, ValidationRuleType, ValidationLevel};
364///
365/// // Require a "doc" field in all Avro schemas
366/// let doc_required = ValidationRule::new(
367///     "doc-required",
368///     ValidationRuleType::FieldRequired,
369///     r#"{"field": "doc"}"#,
370/// ).with_level(ValidationLevel::Error);
371///
372/// // Enforce naming convention
373/// let naming_rule = ValidationRule::new(
374///     "pascal-case",
375///     ValidationRuleType::NamingConvention,
376///     r#"{"pattern": "^[A-Z][a-zA-Z0-9]*$"}"#,
377/// );
378/// ```
379#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct ValidationRule {
381    /// Unique rule name
382    pub name: String,
383    /// Rule type
384    pub rule_type: ValidationRuleType,
385    /// Rule configuration (JSON)
386    pub config: String,
387    /// Validation level
388    #[serde(default)]
389    pub level: ValidationLevel,
390    /// Description of what the rule checks
391    #[serde(skip_serializing_if = "Option::is_none")]
392    pub description: Option<String>,
393    /// Whether the rule is active
394    #[serde(default = "default_true")]
395    pub active: bool,
396    /// Schema types this rule applies to (empty = all)
397    #[serde(default, skip_serializing_if = "Vec::is_empty")]
398    pub applies_to: Vec<SchemaType>,
399    /// Subjects this rule applies to (regex patterns, empty = all)
400    #[serde(default, skip_serializing_if = "Vec::is_empty")]
401    pub subject_patterns: Vec<String>,
402}
403
404impl ValidationRule {
405    /// Create a new validation rule
406    pub fn new(
407        name: impl Into<String>,
408        rule_type: ValidationRuleType,
409        config: impl Into<String>,
410    ) -> Self {
411        Self {
412            name: name.into(),
413            rule_type,
414            config: config.into(),
415            level: ValidationLevel::Error,
416            description: None,
417            active: true,
418            applies_to: Vec::new(),
419            subject_patterns: Vec::new(),
420        }
421    }
422
423    /// Set the validation level
424    pub fn with_level(mut self, level: ValidationLevel) -> Self {
425        self.level = level;
426        self
427    }
428
429    /// Set the description
430    pub fn with_description(mut self, description: impl Into<String>) -> Self {
431        self.description = Some(description.into());
432        self
433    }
434
435    /// Set the active state
436    pub fn with_active(mut self, active: bool) -> Self {
437        self.active = active;
438        self
439    }
440
441    /// Add schema types this rule applies to
442    pub fn with_schema_types(mut self, types: Vec<SchemaType>) -> Self {
443        self.applies_to = types;
444        self
445    }
446
447    /// Add subject patterns this rule applies to
448    pub fn with_subject_patterns(mut self, patterns: Vec<String>) -> Self {
449        self.subject_patterns = patterns;
450        self
451    }
452
453    /// Alias for with_subject_patterns for convenience
454    pub fn for_subjects(self, patterns: Vec<String>) -> Self {
455        self.with_subject_patterns(patterns)
456    }
457
458    /// Alias for with_schema_types for convenience
459    pub fn for_schema_types(self, types: Vec<SchemaType>) -> Self {
460        self.with_schema_types(types)
461    }
462
463    /// Check if this rule applies to the given schema type and subject
464    pub fn applies(&self, schema_type: SchemaType, subject: &str) -> bool {
465        if !self.active {
466            return false;
467        }
468
469        // Check schema type filter
470        if !self.applies_to.is_empty() && !self.applies_to.contains(&schema_type) {
471            return false;
472        }
473
474        // Check subject pattern filter
475        if !self.subject_patterns.is_empty() {
476            let matches_any = regex::RegexSet::new(&self.subject_patterns)
477                .map(|set| set.is_match(subject))
478                .unwrap_or(false);
479            if !matches_any {
480                return false;
481            }
482        }
483
484        true
485    }
486
487    /// Get the rule name
488    pub fn name(&self) -> &str {
489        &self.name
490    }
491
492    /// Get the rule type
493    pub fn rule_type(&self) -> ValidationRuleType {
494        self.rule_type.clone()
495    }
496
497    /// Get the rule config
498    pub fn config(&self) -> &str {
499        &self.config
500    }
501
502    /// Get the validation level
503    pub fn level(&self) -> ValidationLevel {
504        self.level
505    }
506
507    /// Get the description
508    pub fn description(&self) -> Option<&str> {
509        self.description.as_deref()
510    }
511
512    /// Check if the rule is active
513    pub fn is_active(&self) -> bool {
514        self.active
515    }
516}
517
518/// Validation level for rules
519#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
520#[serde(rename_all = "UPPERCASE")]
521pub enum ValidationLevel {
522    /// Validation failure is an error (blocks registration)
523    #[default]
524    Error,
525    /// Validation failure is a warning (logged but allowed)
526    Warning,
527    /// Validation is informational only
528    Info,
529}
530
531impl std::fmt::Display for ValidationLevel {
532    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533        match self {
534            ValidationLevel::Error => write!(f, "ERROR"),
535            ValidationLevel::Warning => write!(f, "WARNING"),
536            ValidationLevel::Info => write!(f, "INFO"),
537        }
538    }
539}
540
541impl std::str::FromStr for ValidationLevel {
542    type Err = String;
543
544    fn from_str(s: &str) -> Result<Self, Self::Err> {
545        match s.to_uppercase().as_str() {
546            "ERROR" => Ok(ValidationLevel::Error),
547            "WARNING" | "WARN" => Ok(ValidationLevel::Warning),
548            "INFO" => Ok(ValidationLevel::Info),
549            _ => Err(format!("Invalid validation level: {}", s)),
550        }
551    }
552}
553
554/// Result of a validation rule check
555#[derive(Debug, Clone, Serialize, Deserialize)]
556pub struct ValidationResult {
557    /// Rule name that was checked
558    pub rule_name: String,
559    /// Whether validation passed
560    pub passed: bool,
561    /// Validation level
562    pub level: ValidationLevel,
563    /// Message describing the result
564    pub message: String,
565    /// Optional details (JSON)
566    #[serde(skip_serializing_if = "Option::is_none")]
567    pub details: Option<String>,
568}
569
570impl ValidationResult {
571    /// Create a passing result
572    pub fn pass(rule_name: impl Into<String>) -> Self {
573        Self {
574            rule_name: rule_name.into(),
575            passed: true,
576            level: ValidationLevel::Info,
577            message: "Validation passed".to_string(),
578            details: None,
579        }
580    }
581
582    /// Create a failing result
583    pub fn fail(
584        rule_name: impl Into<String>,
585        level: ValidationLevel,
586        message: impl Into<String>,
587    ) -> Self {
588        Self {
589            rule_name: rule_name.into(),
590            passed: false,
591            level,
592            message: message.into(),
593            details: None,
594        }
595    }
596
597    /// Add details to the result
598    pub fn with_details(mut self, details: impl Into<String>) -> Self {
599        self.details = Some(details.into());
600        self
601    }
602}
603
604/// Collection of validation results
605#[derive(Debug, Clone, Default, Serialize, Deserialize)]
606pub struct ValidationReport {
607    /// All validation results
608    pub results: Vec<ValidationResult>,
609    /// Summary counts
610    pub summary: ValidationSummary,
611}
612
613impl ValidationReport {
614    /// Create a new empty report
615    pub fn new() -> Self {
616        Self::default()
617    }
618
619    /// Add a result to the report
620    pub fn add_result(&mut self, result: ValidationResult) {
621        if result.passed {
622            self.summary.passed += 1;
623        } else {
624            match result.level {
625                ValidationLevel::Error => self.summary.errors += 1,
626                ValidationLevel::Warning => self.summary.warnings += 1,
627                ValidationLevel::Info => self.summary.info += 1,
628            }
629        }
630        self.results.push(result);
631    }
632
633    /// Check if all validations passed (no errors)
634    pub fn is_valid(&self) -> bool {
635        self.summary.errors == 0
636    }
637
638    /// Check if there are any warnings
639    pub fn has_warnings(&self) -> bool {
640        self.summary.warnings > 0
641    }
642
643    /// Get all error messages
644    pub fn error_messages(&self) -> Vec<String> {
645        self.results
646            .iter()
647            .filter(|r| !r.passed && r.level == ValidationLevel::Error)
648            .map(|r| r.message.clone())
649            .collect()
650    }
651
652    /// Get all warning messages
653    pub fn warning_messages(&self) -> Vec<String> {
654        self.results
655            .iter()
656            .filter(|r| !r.passed && r.level == ValidationLevel::Warning)
657            .map(|r| r.message.clone())
658            .collect()
659    }
660
661    /// Get all info messages
662    pub fn info_messages(&self) -> Vec<String> {
663        self.results
664            .iter()
665            .filter(|r| !r.passed && r.level == ValidationLevel::Info)
666            .map(|r| r.message.clone())
667            .collect()
668    }
669}
670
671/// Summary counts for validation report
672#[derive(Debug, Clone, Default, Serialize, Deserialize)]
673pub struct ValidationSummary {
674    /// Number of passed validations
675    pub passed: usize,
676    /// Number of error-level failures
677    pub errors: usize,
678    /// Number of warning-level failures
679    pub warnings: usize,
680    /// Number of info-level failures
681    pub info: usize,
682}
683
684/// Version number within a subject
685#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
686pub struct SchemaVersion(pub u32);
687
688impl SchemaVersion {
689    /// The special "latest" version marker
690    pub const LATEST: u32 = u32::MAX;
691
692    pub fn new(version: u32) -> Self {
693        Self(version)
694    }
695
696    /// Create a version that represents "latest"
697    pub fn latest() -> Self {
698        Self(Self::LATEST)
699    }
700
701    /// Check if this represents the "latest" version
702    pub fn is_latest(&self) -> bool {
703        self.0 == Self::LATEST
704    }
705
706    pub fn as_u32(&self) -> u32 {
707        self.0
708    }
709}
710
711impl std::fmt::Display for SchemaVersion {
712    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
713        if self.is_latest() {
714            write!(f, "latest")
715        } else {
716            write!(f, "{}", self.0)
717        }
718    }
719}
720
721impl From<u32> for SchemaVersion {
722    fn from(v: u32) -> Self {
723        Self(v)
724    }
725}
726
727/// Schema metadata for organization and discovery
728#[derive(Debug, Clone, Default, Serialize, Deserialize)]
729pub struct SchemaMetadata {
730    /// Human-readable name
731    #[serde(skip_serializing_if = "Option::is_none")]
732    pub name: Option<String>,
733    /// Description of the schema
734    #[serde(skip_serializing_if = "Option::is_none")]
735    pub description: Option<String>,
736    /// Labels/tags for categorization (key-value pairs)
737    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
738    pub labels: HashMap<String, String>,
739    /// Owner/team responsible for the schema
740    #[serde(skip_serializing_if = "Option::is_none")]
741    pub owner: Option<String>,
742    /// Creation timestamp (RFC 3339)
743    #[serde(skip_serializing_if = "Option::is_none")]
744    pub created_at: Option<String>,
745    /// Last modified timestamp (RFC 3339)
746    #[serde(skip_serializing_if = "Option::is_none")]
747    pub modified_at: Option<String>,
748    /// Schema context for multi-tenant isolation
749    #[serde(skip_serializing_if = "Option::is_none")]
750    pub context: Option<String>,
751}
752
753impl SchemaMetadata {
754    pub fn new() -> Self {
755        Self::default()
756    }
757
758    pub fn with_name(mut self, name: impl Into<String>) -> Self {
759        self.name = Some(name.into());
760        self
761    }
762
763    pub fn with_description(mut self, description: impl Into<String>) -> Self {
764        self.description = Some(description.into());
765        self
766    }
767
768    pub fn with_owner(mut self, owner: impl Into<String>) -> Self {
769        self.owner = Some(owner.into());
770        self
771    }
772
773    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
774        self.labels.insert(key.into(), value.into());
775        self
776    }
777
778    /// Set the context
779    pub fn with_context(mut self, context: impl Into<String>) -> Self {
780        self.context = Some(context.into());
781        self
782    }
783}
784
785/// A registered schema with its metadata
786#[derive(Debug, Clone, Serialize, Deserialize)]
787pub struct Schema {
788    /// Unique schema ID (global)
789    pub id: SchemaId,
790    /// Schema type/format
791    pub schema_type: SchemaType,
792    /// The schema definition (JSON string)
793    pub schema: String,
794    /// MD5 fingerprint for deduplication
795    #[serde(skip_serializing_if = "Option::is_none")]
796    pub fingerprint: Option<String>,
797    /// Schema references (for nested schemas)
798    #[serde(default, skip_serializing_if = "Vec::is_empty")]
799    pub references: Vec<SchemaReference>,
800    /// Schema metadata (name, description, labels)
801    #[serde(default, skip_serializing_if = "Option::is_none")]
802    pub metadata: Option<SchemaMetadata>,
803}
804
805impl Schema {
806    pub fn new(id: SchemaId, schema_type: SchemaType, schema: String) -> Self {
807        Self {
808            id,
809            schema_type,
810            schema,
811            fingerprint: None,
812            references: Vec::new(),
813            metadata: None,
814        }
815    }
816
817    pub fn with_fingerprint(mut self, fingerprint: String) -> Self {
818        self.fingerprint = Some(fingerprint);
819        self
820    }
821
822    pub fn with_references(mut self, references: Vec<SchemaReference>) -> Self {
823        self.references = references;
824        self
825    }
826
827    pub fn with_metadata(mut self, metadata: SchemaMetadata) -> Self {
828        self.metadata = Some(metadata);
829        self
830    }
831}
832
833/// Reference to another schema (for composition)
834#[derive(Debug, Clone, Serialize, Deserialize)]
835pub struct SchemaReference {
836    /// Reference name (used in the schema)
837    pub name: String,
838    /// Subject containing the referenced schema
839    pub subject: String,
840    /// Version of the referenced schema
841    pub version: u32,
842}
843
844/// A subject version combines subject, version, and schema
845#[derive(Debug, Clone, Serialize, Deserialize)]
846pub struct SubjectVersion {
847    /// Subject name
848    pub subject: Subject,
849    /// Version number
850    pub version: SchemaVersion,
851    /// Schema ID
852    pub id: SchemaId,
853    /// Schema type
854    pub schema_type: SchemaType,
855    /// The schema definition
856    pub schema: String,
857    /// Version state (enabled/deprecated/disabled)
858    #[serde(default)]
859    pub state: VersionState,
860}
861
862impl SubjectVersion {
863    pub fn new(
864        subject: Subject,
865        version: SchemaVersion,
866        id: SchemaId,
867        schema_type: SchemaType,
868        schema: String,
869    ) -> Self {
870        Self {
871            subject,
872            version,
873            id,
874            schema_type,
875            schema,
876            state: VersionState::Enabled,
877        }
878    }
879
880    /// Create with a specific state
881    pub fn with_state(mut self, state: VersionState) -> Self {
882        self.state = state;
883        self
884    }
885
886    /// Check if this version is usable
887    pub fn is_usable(&self) -> bool {
888        self.state.is_usable()
889    }
890}
891
892/// Compatibility level for schema evolution
893///
894/// | Level | Description | Can Read |
895/// |-------|-------------|----------|
896/// | BACKWARD | New schema can read old data | Old → New ✓ |
897/// | FORWARD | Old schema can read new data | New → Old ✓ |
898/// | FULL | Both directions | Both ✓ |
899/// | NONE | No checking | - |
900#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
901#[serde(rename_all = "UPPERCASE")]
902pub enum CompatibilityLevel {
903    /// New schema can read data written by old schema (default)
904    /// Safe for consumers to upgrade first
905    #[default]
906    Backward,
907
908    /// New schema can read data written by old schema (transitive)
909    BackwardTransitive,
910
911    /// Old schema can read data written by new schema
912    /// Safe for producers to upgrade first
913    Forward,
914
915    /// Old schema can read data written by new schema (transitive)
916    ForwardTransitive,
917
918    /// Both backward and forward compatible
919    Full,
920
921    /// Both backward and forward compatible (transitive)
922    FullTransitive,
923
924    /// No compatibility checking
925    None,
926}
927
928impl CompatibilityLevel {
929    /// Check if this level requires backward compatibility
930    pub fn is_backward(&self) -> bool {
931        matches!(
932            self,
933            Self::Backward | Self::BackwardTransitive | Self::Full | Self::FullTransitive
934        )
935    }
936
937    /// Check if this level requires forward compatibility
938    pub fn is_forward(&self) -> bool {
939        matches!(
940            self,
941            Self::Forward | Self::ForwardTransitive | Self::Full | Self::FullTransitive
942        )
943    }
944
945    /// Check if this level is transitive (checks all versions)
946    pub fn is_transitive(&self) -> bool {
947        matches!(
948            self,
949            Self::BackwardTransitive | Self::ForwardTransitive | Self::FullTransitive
950        )
951    }
952}
953
954impl std::fmt::Display for CompatibilityLevel {
955    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
956        let s = match self {
957            Self::Backward => "BACKWARD",
958            Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
959            Self::Forward => "FORWARD",
960            Self::ForwardTransitive => "FORWARD_TRANSITIVE",
961            Self::Full => "FULL",
962            Self::FullTransitive => "FULL_TRANSITIVE",
963            Self::None => "NONE",
964        };
965        write!(f, "{}", s)
966    }
967}
968
969impl std::str::FromStr for CompatibilityLevel {
970    type Err = String;
971
972    fn from_str(s: &str) -> Result<Self, Self::Err> {
973        match s.to_uppercase().as_str() {
974            "BACKWARD" => Ok(Self::Backward),
975            "BACKWARD_TRANSITIVE" => Ok(Self::BackwardTransitive),
976            "FORWARD" => Ok(Self::Forward),
977            "FORWARD_TRANSITIVE" => Ok(Self::ForwardTransitive),
978            "FULL" => Ok(Self::Full),
979            "FULL_TRANSITIVE" => Ok(Self::FullTransitive),
980            "NONE" => Ok(Self::None),
981            _ => Err(format!("Unknown compatibility level: {}", s)),
982        }
983    }
984}
985
986#[cfg(test)]
987mod tests {
988    use super::*;
989
990    #[test]
991    fn test_schema_type_parse() {
992        assert_eq!("avro".parse::<SchemaType>().unwrap(), SchemaType::Avro);
993        assert_eq!("AVRO".parse::<SchemaType>().unwrap(), SchemaType::Avro);
994        assert_eq!("json".parse::<SchemaType>().unwrap(), SchemaType::Json);
995        assert_eq!(
996            "protobuf".parse::<SchemaType>().unwrap(),
997            SchemaType::Protobuf
998        );
999    }
1000
1001    #[test]
1002    fn test_subject_naming() {
1003        let key_subject = Subject::key("users");
1004        assert_eq!(key_subject.as_str(), "users-key");
1005
1006        let value_subject = Subject::value("users");
1007        assert_eq!(value_subject.as_str(), "users-value");
1008    }
1009
1010    #[test]
1011    fn test_schema_id() {
1012        let id = SchemaId::new(42);
1013        assert_eq!(id.as_u32(), 42);
1014        assert_eq!(format!("{}", id), "42");
1015    }
1016
1017    #[test]
1018    fn test_compatibility_level_parse() {
1019        assert_eq!(
1020            "backward".parse::<CompatibilityLevel>().unwrap(),
1021            CompatibilityLevel::Backward
1022        );
1023        assert_eq!(
1024            "FULL_TRANSITIVE".parse::<CompatibilityLevel>().unwrap(),
1025            CompatibilityLevel::FullTransitive
1026        );
1027        assert_eq!(
1028            "none".parse::<CompatibilityLevel>().unwrap(),
1029            CompatibilityLevel::None
1030        );
1031    }
1032
1033    #[test]
1034    fn test_compatibility_level_methods() {
1035        assert!(CompatibilityLevel::Backward.is_backward());
1036        assert!(!CompatibilityLevel::Backward.is_forward());
1037        assert!(!CompatibilityLevel::Backward.is_transitive());
1038
1039        assert!(CompatibilityLevel::Full.is_backward());
1040        assert!(CompatibilityLevel::Full.is_forward());
1041        assert!(!CompatibilityLevel::Full.is_transitive());
1042
1043        assert!(CompatibilityLevel::FullTransitive.is_transitive());
1044    }
1045
1046    // ========================================================================
1047    // Schema Context Tests
1048    // ========================================================================
1049
1050    #[test]
1051    fn test_schema_context_default() {
1052        let ctx = SchemaContext::default();
1053        assert!(ctx.is_default());
1054        assert_eq!(ctx.name(), "");
1055        assert!(ctx.is_active());
1056    }
1057
1058    #[test]
1059    fn test_schema_context_new() {
1060        let ctx = SchemaContext::new("tenant-123");
1061        assert!(!ctx.is_default());
1062        assert_eq!(ctx.name(), "tenant-123");
1063        assert!(ctx.is_active());
1064    }
1065
1066    #[test]
1067    fn test_schema_context_with_description() {
1068        let ctx =
1069            SchemaContext::new("production").with_description("Production environment context");
1070
1071        assert_eq!(ctx.description(), Some("Production environment context"));
1072    }
1073
1074    #[test]
1075    fn test_schema_context_inactive() {
1076        let ctx = SchemaContext::new("deprecated").with_active(false);
1077
1078        assert!(!ctx.is_active());
1079    }
1080
1081    #[test]
1082    fn test_schema_context_qualify_subject() {
1083        let default_ctx = SchemaContext::default();
1084        let subject = Subject::new("users-value");
1085
1086        // Default context doesn't add prefix
1087        assert_eq!(default_ctx.qualify_subject(&subject), "users-value");
1088
1089        // Named context adds Confluent-compatible prefix
1090        let tenant_ctx = SchemaContext::new("tenant-123");
1091        assert_eq!(
1092            tenant_ctx.qualify_subject(&subject),
1093            ":.tenant-123:users-value"
1094        );
1095    }
1096
1097    #[test]
1098    fn test_schema_context_parse_qualified() {
1099        // Default context (no prefix)
1100        let (ctx, subject) = SchemaContext::parse_qualified("users-value");
1101        assert!(ctx.is_default());
1102        assert_eq!(subject.as_str(), "users-value");
1103
1104        // Named context with prefix
1105        let (ctx, subject) = SchemaContext::parse_qualified(":.tenant-123:users-value");
1106        assert_eq!(ctx.name(), "tenant-123");
1107        assert_eq!(subject.as_str(), "users-value");
1108    }
1109
1110    #[test]
1111    fn test_schema_context_display() {
1112        let default_ctx = SchemaContext::default();
1113        assert_eq!(format!("{}", default_ctx), "(default)");
1114
1115        let named_ctx = SchemaContext::new("prod");
1116        assert_eq!(format!("{}", named_ctx), "prod");
1117    }
1118
1119    // ========================================================================
1120    // Version State Tests
1121    // ========================================================================
1122
1123    #[test]
1124    fn test_version_state_default() {
1125        let state = VersionState::default();
1126        assert_eq!(state, VersionState::Enabled);
1127    }
1128
1129    #[test]
1130    fn test_version_state_usable() {
1131        assert!(VersionState::Enabled.is_usable());
1132        assert!(VersionState::Deprecated.is_usable());
1133        assert!(!VersionState::Disabled.is_usable());
1134    }
1135
1136    #[test]
1137    fn test_version_state_requires_warning() {
1138        assert!(!VersionState::Enabled.requires_warning());
1139        assert!(VersionState::Deprecated.requires_warning());
1140        assert!(!VersionState::Disabled.requires_warning());
1141    }
1142
1143    #[test]
1144    fn test_version_state_blocked() {
1145        assert!(!VersionState::Enabled.is_blocked());
1146        assert!(!VersionState::Deprecated.is_blocked());
1147        assert!(VersionState::Disabled.is_blocked());
1148    }
1149
1150    #[test]
1151    fn test_version_state_transitions() {
1152        let enabled = VersionState::Enabled;
1153
1154        // Enabled -> Deprecated
1155        let deprecated = enabled.deprecate();
1156        assert_eq!(deprecated, VersionState::Deprecated);
1157
1158        // Deprecated -> Deprecated (no change)
1159        let still_deprecated = deprecated.deprecate();
1160        assert_eq!(still_deprecated, VersionState::Deprecated);
1161
1162        // Deprecated -> Disabled
1163        let disabled = deprecated.disable();
1164        assert_eq!(disabled, VersionState::Disabled);
1165
1166        // Disabled -> Enabled
1167        let re_enabled = disabled.enable();
1168        assert_eq!(re_enabled, VersionState::Enabled);
1169    }
1170
1171    #[test]
1172    fn test_version_state_parse() {
1173        assert_eq!(
1174            "ENABLED".parse::<VersionState>().unwrap(),
1175            VersionState::Enabled
1176        );
1177        assert_eq!(
1178            "deprecated".parse::<VersionState>().unwrap(),
1179            VersionState::Deprecated
1180        );
1181        assert_eq!(
1182            "Disabled".parse::<VersionState>().unwrap(),
1183            VersionState::Disabled
1184        );
1185        assert!("invalid".parse::<VersionState>().is_err());
1186    }
1187
1188    #[test]
1189    fn test_version_state_display() {
1190        assert_eq!(format!("{}", VersionState::Enabled), "ENABLED");
1191        assert_eq!(format!("{}", VersionState::Deprecated), "DEPRECATED");
1192        assert_eq!(format!("{}", VersionState::Disabled), "DISABLED");
1193    }
1194
1195    // ========================================================================
1196    // Validation Rule Tests
1197    // ========================================================================
1198
1199    #[test]
1200    fn test_validation_rule_new() {
1201        let rule = ValidationRule::new(
1202            "doc-required",
1203            ValidationRuleType::FieldRequired,
1204            r#"{"field": "doc"}"#,
1205        );
1206
1207        assert_eq!(rule.name, "doc-required");
1208        assert_eq!(rule.rule_type, ValidationRuleType::FieldRequired);
1209        assert!(rule.active);
1210        assert_eq!(rule.level, ValidationLevel::Error);
1211    }
1212
1213    #[test]
1214    fn test_validation_rule_builder() {
1215        let rule = ValidationRule::new(
1216            "naming-convention",
1217            ValidationRuleType::NamingConvention,
1218            r#"{"pattern": "^[A-Z]"}"#,
1219        )
1220        .with_level(ValidationLevel::Warning)
1221        .with_description("Enforce PascalCase naming")
1222        .with_schema_types(vec![SchemaType::Avro])
1223        .with_subject_patterns(vec![".*-value$".to_string()]);
1224
1225        assert_eq!(rule.level, ValidationLevel::Warning);
1226        assert_eq!(
1227            rule.description,
1228            Some("Enforce PascalCase naming".to_string())
1229        );
1230        assert_eq!(rule.applies_to, vec![SchemaType::Avro]);
1231        assert_eq!(rule.subject_patterns, vec![".*-value$".to_string()]);
1232    }
1233
1234    #[test]
1235    fn test_validation_rule_applies() {
1236        let rule = ValidationRule::new("test", ValidationRuleType::JsonSchema, "{}")
1237            .with_schema_types(vec![SchemaType::Avro])
1238            .with_subject_patterns(vec!["users-.*".to_string()]);
1239
1240        // Matches both type and pattern
1241        assert!(rule.applies(SchemaType::Avro, "users-value"));
1242
1243        // Wrong type
1244        assert!(!rule.applies(SchemaType::Json, "users-value"));
1245
1246        // Wrong pattern
1247        assert!(!rule.applies(SchemaType::Avro, "orders-value"));
1248    }
1249
1250    #[test]
1251    fn test_validation_rule_inactive() {
1252        let rule =
1253            ValidationRule::new("test", ValidationRuleType::MaxSize, "{}").with_active(false);
1254
1255        assert!(!rule.applies(SchemaType::Avro, "any-subject"));
1256    }
1257
1258    #[test]
1259    fn test_validation_rule_no_filters() {
1260        let rule = ValidationRule::new("test", ValidationRuleType::Regex, "{}");
1261
1262        // Without filters, applies to everything
1263        assert!(rule.applies(SchemaType::Avro, "any-subject"));
1264        assert!(rule.applies(SchemaType::Json, "other-subject"));
1265        assert!(rule.applies(SchemaType::Protobuf, "third-subject"));
1266    }
1267
1268    // ========================================================================
1269    // Validation Result Tests
1270    // ========================================================================
1271
1272    #[test]
1273    fn test_validation_result_pass() {
1274        let result = ValidationResult::pass("test-rule");
1275
1276        assert!(result.passed);
1277        assert_eq!(result.rule_name, "test-rule");
1278    }
1279
1280    #[test]
1281    fn test_validation_result_fail() {
1282        let result = ValidationResult::fail(
1283            "naming-rule",
1284            ValidationLevel::Error,
1285            "Schema name must be PascalCase",
1286        )
1287        .with_details(r#"{"name": "invalidName"}"#);
1288
1289        assert!(!result.passed);
1290        assert_eq!(result.level, ValidationLevel::Error);
1291        assert_eq!(result.message, "Schema name must be PascalCase");
1292        assert!(result.details.is_some());
1293    }
1294
1295    // ========================================================================
1296    // Validation Report Tests
1297    // ========================================================================
1298
1299    #[test]
1300    fn test_validation_report_empty() {
1301        let report = ValidationReport::new();
1302
1303        assert!(report.is_valid());
1304        assert!(!report.has_warnings());
1305        assert!(report.results.is_empty());
1306    }
1307
1308    #[test]
1309    fn test_validation_report_with_results() {
1310        let mut report = ValidationReport::new();
1311
1312        report.add_result(ValidationResult::pass("rule1"));
1313        report.add_result(ValidationResult::fail(
1314            "rule2",
1315            ValidationLevel::Warning,
1316            "warning msg",
1317        ));
1318        report.add_result(ValidationResult::fail(
1319            "rule3",
1320            ValidationLevel::Error,
1321            "error msg",
1322        ));
1323
1324        assert!(!report.is_valid()); // Has errors
1325        assert!(report.has_warnings());
1326        assert_eq!(report.summary.passed, 1);
1327        assert_eq!(report.summary.warnings, 1);
1328        assert_eq!(report.summary.errors, 1);
1329    }
1330
1331    #[test]
1332    fn test_validation_report_error_messages() {
1333        let mut report = ValidationReport::new();
1334
1335        report.add_result(ValidationResult::fail(
1336            "rule1",
1337            ValidationLevel::Error,
1338            "Error 1",
1339        ));
1340        report.add_result(ValidationResult::fail(
1341            "rule2",
1342            ValidationLevel::Warning,
1343            "Warning 1",
1344        ));
1345        report.add_result(ValidationResult::fail(
1346            "rule3",
1347            ValidationLevel::Error,
1348            "Error 2",
1349        ));
1350
1351        let errors: Vec<String> = report.error_messages();
1352        assert_eq!(errors.len(), 2);
1353        assert!(errors.contains(&"Error 1".to_string()));
1354        assert!(errors.contains(&"Error 2".to_string()));
1355
1356        let warnings: Vec<String> = report.warning_messages();
1357        assert_eq!(warnings.len(), 1);
1358        assert!(warnings.contains(&"Warning 1".to_string()));
1359    }
1360
1361    // ========================================================================
1362    // Schema Metadata Tests
1363    // ========================================================================
1364
1365    #[test]
1366    fn test_schema_metadata_builder() {
1367        let metadata = SchemaMetadata::new()
1368            .with_name("UserEvent")
1369            .with_description("Schema for user events")
1370            .with_owner("platform-team")
1371            .with_label("domain", "users")
1372            .with_label("version", "v2")
1373            .with_context("production");
1374
1375        assert_eq!(metadata.name, Some("UserEvent".to_string()));
1376        assert_eq!(
1377            metadata.description,
1378            Some("Schema for user events".to_string())
1379        );
1380        assert_eq!(metadata.owner, Some("platform-team".to_string()));
1381        assert_eq!(metadata.labels.get("domain"), Some(&"users".to_string()));
1382        assert_eq!(metadata.labels.get("version"), Some(&"v2".to_string()));
1383        assert_eq!(metadata.context, Some("production".to_string()));
1384    }
1385
1386    // ========================================================================
1387    // SubjectVersion Tests
1388    // ========================================================================
1389
1390    #[test]
1391    fn test_subject_version_with_state() {
1392        let sv = SubjectVersion::new(
1393            Subject::new("users-value"),
1394            SchemaVersion::new(1),
1395            SchemaId::new(1),
1396            SchemaType::Avro,
1397            r#"{"type":"string"}"#.to_string(),
1398        )
1399        .with_state(VersionState::Deprecated);
1400
1401        assert_eq!(sv.state, VersionState::Deprecated);
1402        assert!(sv.is_usable()); // Deprecated is still usable
1403    }
1404
1405    #[test]
1406    fn test_subject_version_disabled() {
1407        let sv = SubjectVersion::new(
1408            Subject::new("users-value"),
1409            SchemaVersion::new(1),
1410            SchemaId::new(1),
1411            SchemaType::Avro,
1412            r#"{"type":"string"}"#.to_string(),
1413        )
1414        .with_state(VersionState::Disabled);
1415
1416        assert!(!sv.is_usable()); // Disabled is not usable
1417    }
1418}