Skip to main content

rivven_schema/
types.rs

1//! Schema types and data structures
2//!
3//! This module provides core types for the Schema Registry:
4//! - [`SchemaId`], [`Subject`], [`SchemaVersion`] - Core identifiers
5//! - [`Schema`], [`SubjectVersion`] - Schema data structures
6//! - [`SchemaContext`] - Multi-tenant context/group support
7//! - [`VersionState`] - Version lifecycle management
8//! - [`ValidationRule`] - Custom content validation rules
9//! - [`Mode`], [`CompatibilityLevel`] - Registry configuration
10
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14// Re-export SchemaType from rivven-protocol (single source of truth)
15pub use rivven_protocol::SchemaType;
16
17/// Unique identifier for a schema (global across all subjects)
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub struct SchemaId(pub u32);
20
21impl SchemaId {
22    pub fn new(id: u32) -> Self {
23        Self(id)
24    }
25
26    pub fn as_u32(&self) -> u32 {
27        self.0
28    }
29}
30
31impl std::fmt::Display for SchemaId {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "{}", self.0)
34    }
35}
36
37impl From<u32> for SchemaId {
38    fn from(id: u32) -> Self {
39        Self(id)
40    }
41}
42
43/// Subject (typically topic-name + "-key" or "-value")
44#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45pub struct Subject(pub String);
46
47impl Subject {
48    pub fn new(name: impl Into<String>) -> Self {
49        Self(name.into())
50    }
51
52    /// Create a key subject for a topic
53    pub fn key(topic: &str) -> Self {
54        Self(format!("{}-key", topic))
55    }
56
57    /// Create a value subject for a topic
58    pub fn value(topic: &str) -> Self {
59        Self(format!("{}-value", topic))
60    }
61
62    pub fn as_str(&self) -> &str {
63        &self.0
64    }
65}
66
67impl std::fmt::Display for Subject {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(f, "{}", self.0)
70    }
71}
72
73impl From<&str> for Subject {
74    fn from(s: &str) -> Self {
75        Self(s.to_string())
76    }
77}
78
79impl From<String> for Subject {
80    fn from(s: String) -> Self {
81        Self(s)
82    }
83}
84
85impl AsRef<str> for Subject {
86    fn as_ref(&self) -> &str {
87        &self.0
88    }
89}
90
91// ============================================================================
92// Schema Context (Multi-tenant support)
93// ============================================================================
94
95/// Schema context for multi-tenant isolation
96///
97/// Contexts allow organizing schemas into isolated namespaces, similar to
98/// Confluent's Schema Registry contexts. Each context has its own set of
99/// subjects and schemas.
100///
101/// # Examples
102///
103/// ```rust
104/// use rivven_schema::types::SchemaContext;
105///
106/// // Default context (root)
107/// let default_ctx = SchemaContext::default();
108///
109/// // Tenant-specific context
110/// let tenant_ctx = SchemaContext::new("tenant-123");
111///
112/// // Environment context
113/// let prod_ctx = SchemaContext::new("production");
114/// ```
115#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
116pub struct SchemaContext {
117    /// Context name (empty string = default/root context)
118    name: String,
119    /// Context description
120    #[serde(skip_serializing_if = "Option::is_none")]
121    description: Option<String>,
122    /// Whether this context is active
123    #[serde(default = "default_true")]
124    active: bool,
125}
126
/// Serde default provider for boolean fields that are `true` when omitted.
fn default_true() -> bool {
    true
}
130
131impl SchemaContext {
132    /// The default (root) context name
133    pub const DEFAULT: &'static str = "";
134
135    /// Create a new context with the given name
136    pub fn new(name: impl Into<String>) -> Self {
137        Self {
138            name: name.into(),
139            description: None,
140            active: true,
141        }
142    }
143
144    /// Create the default (root) context
145    pub fn default_context() -> Self {
146        Self::new("")
147    }
148
149    /// Get the context name
150    pub fn name(&self) -> &str {
151        &self.name
152    }
153
154    /// Check if this is the default context
155    pub fn is_default(&self) -> bool {
156        self.name.is_empty()
157    }
158
159    /// Set the context description
160    pub fn with_description(mut self, description: impl Into<String>) -> Self {
161        self.description = Some(description.into());
162        self
163    }
164
165    /// Get the context description
166    pub fn description(&self) -> Option<&str> {
167        self.description.as_deref()
168    }
169
170    /// Set the active state
171    pub fn with_active(mut self, active: bool) -> Self {
172        self.active = active;
173        self
174    }
175
176    /// Check if the context is active
177    pub fn is_active(&self) -> bool {
178        self.active
179    }
180
181    /// Create a fully qualified subject name with context prefix
182    ///
183    /// Format: `:.{context}:{subject}` (Confluent compatible)
184    pub fn qualify_subject(&self, subject: &Subject) -> String {
185        if self.is_default() {
186            subject.0.clone()
187        } else {
188            format!(":.{}:{}", self.name, subject.0)
189        }
190    }
191
192    /// Parse a qualified subject name into context and subject
193    pub fn parse_qualified(qualified: &str) -> (SchemaContext, Subject) {
194        if let Some(rest) = qualified.strip_prefix(":.") {
195            if let Some(colon_pos) = rest.find(':') {
196                let context_name = &rest[..colon_pos];
197                let subject_name = &rest[colon_pos + 1..];
198                return (SchemaContext::new(context_name), Subject::new(subject_name));
199            }
200        }
201        (SchemaContext::default_context(), Subject::new(qualified))
202    }
203}
204
205impl Default for SchemaContext {
206    fn default() -> Self {
207        Self::default_context()
208    }
209}
210
211impl std::fmt::Display for SchemaContext {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        if self.is_default() {
214            write!(f, "(default)")
215        } else {
216            write!(f, "{}", self.name)
217        }
218    }
219}
220
221// ============================================================================
222// Version State (Lifecycle management)
223// ============================================================================
224
225/// State of a schema version within a subject
226///
227/// Allows managing the lifecycle of schema versions without deletion.
228/// This is important for schema evolution and migration workflows.
229///
230/// # State Transitions
231///
232/// ```text
233///     ┌─────────┐
234///     │ Enabled │ ◄───── Initial state
235///     └────┬────┘
236///          │ deprecate()
237///          ▼
238///   ┌──────────────┐
239///   │  Deprecated  │ ◄───── Warning state
240///   └──────┬───────┘
241///          │ disable()
242///          ▼
243///     ┌──────────┐
244///     │ Disabled │ ◄───── Blocked state
245///     └──────────┘
246/// ```
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
248#[serde(rename_all = "UPPERCASE")]
249pub enum VersionState {
250    /// Version is active and can be used (default)
251    #[default]
252    Enabled,
253    /// Version is deprecated - still usable but clients are warned
254    Deprecated,
255    /// Version is disabled - cannot be used for new registrations
256    Disabled,
257}
258
259impl VersionState {
260    /// Check if the version is usable (enabled or deprecated)
261    pub fn is_usable(&self) -> bool {
262        matches!(self, VersionState::Enabled | VersionState::Deprecated)
263    }
264
265    /// Check if this state requires client warnings
266    pub fn requires_warning(&self) -> bool {
267        matches!(self, VersionState::Deprecated)
268    }
269
270    /// Check if this version is blocked from use
271    pub fn is_blocked(&self) -> bool {
272        matches!(self, VersionState::Disabled)
273    }
274
275    /// Transition to deprecated state
276    pub fn deprecate(&self) -> VersionState {
277        match self {
278            VersionState::Enabled => VersionState::Deprecated,
279            other => *other,
280        }
281    }
282
283    /// Transition to disabled state
284    pub fn disable(&self) -> VersionState {
285        VersionState::Disabled
286    }
287
288    /// Transition back to enabled state
289    pub fn enable(&self) -> VersionState {
290        VersionState::Enabled
291    }
292}
293
294impl std::fmt::Display for VersionState {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        match self {
297            VersionState::Enabled => write!(f, "ENABLED"),
298            VersionState::Deprecated => write!(f, "DEPRECATED"),
299            VersionState::Disabled => write!(f, "DISABLED"),
300        }
301    }
302}
303
304impl std::str::FromStr for VersionState {
305    type Err = String;
306
307    fn from_str(s: &str) -> Result<Self, Self::Err> {
308        match s.to_uppercase().as_str() {
309            "ENABLED" => Ok(VersionState::Enabled),
310            "DEPRECATED" => Ok(VersionState::Deprecated),
311            "DISABLED" => Ok(VersionState::Disabled),
312            _ => Err(format!("Invalid version state: {}", s)),
313        }
314    }
315}
316
317// ============================================================================
318// Content Validation Rules
319// ============================================================================
320
321/// Rule type for content validation
322#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
323#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
324pub enum ValidationRuleType {
325    /// JSON Schema validation rule
326    JsonSchema,
327    /// Regular expression pattern
328    Regex,
329    /// Field presence requirement
330    FieldRequired,
331    /// Field type constraint
332    FieldType,
333    /// Custom CEL expression
334    Cel,
335    /// Maximum schema size in bytes
336    MaxSize,
337    /// Schema naming convention
338    NamingConvention,
339}
340
341impl std::str::FromStr for ValidationRuleType {
342    type Err = String;
343
344    fn from_str(s: &str) -> Result<Self, Self::Err> {
345        match s.to_uppercase().as_str() {
346            "JSON_SCHEMA" | "JSONSCHEMA" => Ok(ValidationRuleType::JsonSchema),
347            "REGEX" => Ok(ValidationRuleType::Regex),
348            "FIELD_REQUIRED" | "FIELDREQUIRED" => Ok(ValidationRuleType::FieldRequired),
349            "FIELD_TYPE" | "FIELDTYPE" => Ok(ValidationRuleType::FieldType),
350            "CEL" => Ok(ValidationRuleType::Cel),
351            "MAX_SIZE" | "MAXSIZE" => Ok(ValidationRuleType::MaxSize),
352            "NAMING_CONVENTION" | "NAMINGCONVENTION" => Ok(ValidationRuleType::NamingConvention),
353            _ => Err(format!("Invalid validation rule type: {}", s)),
354        }
355    }
356}
357
358/// Content validation rule for custom schema validation
359///
360/// Validation rules extend beyond compatibility checking to enforce
361/// organizational policies and data quality requirements.
362///
363/// # Examples
364///
365/// ```rust
366/// use rivven_schema::types::{ValidationRule, ValidationRuleType, ValidationLevel};
367///
368/// // Require a "doc" field in all Avro schemas
369/// let doc_required = ValidationRule::new(
370///     "doc-required",
371///     ValidationRuleType::FieldRequired,
372///     r#"{"field": "doc"}"#,
373/// ).with_level(ValidationLevel::Error);
374///
375/// // Enforce naming convention
376/// let naming_rule = ValidationRule::new(
377///     "pascal-case",
378///     ValidationRuleType::NamingConvention,
379///     r#"{"pattern": "^[A-Z][a-zA-Z0-9]*$"}"#,
380/// );
381/// ```
382#[derive(Debug, Clone, Serialize, Deserialize)]
383pub struct ValidationRule {
384    /// Unique rule name
385    pub name: String,
386    /// Rule type
387    pub rule_type: ValidationRuleType,
388    /// Rule configuration (JSON)
389    pub config: String,
390    /// Validation level
391    #[serde(default)]
392    pub level: ValidationLevel,
393    /// Description of what the rule checks
394    #[serde(skip_serializing_if = "Option::is_none")]
395    pub description: Option<String>,
396    /// Whether the rule is active
397    #[serde(default = "default_true")]
398    pub active: bool,
399    /// Schema types this rule applies to (empty = all)
400    #[serde(default, skip_serializing_if = "Vec::is_empty")]
401    pub applies_to: Vec<SchemaType>,
402    /// Subjects this rule applies to (regex patterns, empty = all)
403    #[serde(default, skip_serializing_if = "Vec::is_empty")]
404    pub subject_patterns: Vec<String>,
405}
406
407impl ValidationRule {
408    /// Create a new validation rule
409    pub fn new(
410        name: impl Into<String>,
411        rule_type: ValidationRuleType,
412        config: impl Into<String>,
413    ) -> Self {
414        Self {
415            name: name.into(),
416            rule_type,
417            config: config.into(),
418            level: ValidationLevel::Error,
419            description: None,
420            active: true,
421            applies_to: Vec::new(),
422            subject_patterns: Vec::new(),
423        }
424    }
425
426    /// Set the validation level
427    pub fn with_level(mut self, level: ValidationLevel) -> Self {
428        self.level = level;
429        self
430    }
431
432    /// Set the description
433    pub fn with_description(mut self, description: impl Into<String>) -> Self {
434        self.description = Some(description.into());
435        self
436    }
437
438    /// Set the active state
439    pub fn with_active(mut self, active: bool) -> Self {
440        self.active = active;
441        self
442    }
443
444    /// Add schema types this rule applies to
445    pub fn with_schema_types(mut self, types: Vec<SchemaType>) -> Self {
446        self.applies_to = types;
447        self
448    }
449
450    /// Add subject patterns this rule applies to
451    pub fn with_subject_patterns(mut self, patterns: Vec<String>) -> Self {
452        self.subject_patterns = patterns;
453        self
454    }
455
456    /// Alias for with_subject_patterns for convenience
457    pub fn for_subjects(self, patterns: Vec<String>) -> Self {
458        self.with_subject_patterns(patterns)
459    }
460
461    /// Alias for with_schema_types for convenience
462    pub fn for_schema_types(self, types: Vec<SchemaType>) -> Self {
463        self.with_schema_types(types)
464    }
465
466    /// Check if this rule applies to the given schema type and subject
467    pub fn applies(&self, schema_type: SchemaType, subject: &str) -> bool {
468        if !self.active {
469            return false;
470        }
471
472        // Check schema type filter
473        if !self.applies_to.is_empty() && !self.applies_to.contains(&schema_type) {
474            return false;
475        }
476
477        // Check subject pattern filter
478        if !self.subject_patterns.is_empty() {
479            let matches_any = self.subject_patterns.iter().any(|pattern| {
480                regex::Regex::new(pattern)
481                    .map(|re| re.is_match(subject))
482                    .unwrap_or(false)
483            });
484            if !matches_any {
485                return false;
486            }
487        }
488
489        true
490    }
491
492    /// Get the rule name
493    pub fn name(&self) -> &str {
494        &self.name
495    }
496
497    /// Get the rule type
498    pub fn rule_type(&self) -> ValidationRuleType {
499        self.rule_type.clone()
500    }
501
502    /// Get the rule config
503    pub fn config(&self) -> &str {
504        &self.config
505    }
506
507    /// Get the validation level
508    pub fn level(&self) -> ValidationLevel {
509        self.level
510    }
511
512    /// Get the description
513    pub fn description(&self) -> Option<&str> {
514        self.description.as_deref()
515    }
516
517    /// Check if the rule is active
518    pub fn is_active(&self) -> bool {
519        self.active
520    }
521}
522
523/// Validation level for rules
524#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
525#[serde(rename_all = "UPPERCASE")]
526pub enum ValidationLevel {
527    /// Validation failure is an error (blocks registration)
528    #[default]
529    Error,
530    /// Validation failure is a warning (logged but allowed)
531    Warning,
532    /// Validation is informational only
533    Info,
534}
535
536impl std::fmt::Display for ValidationLevel {
537    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538        match self {
539            ValidationLevel::Error => write!(f, "ERROR"),
540            ValidationLevel::Warning => write!(f, "WARNING"),
541            ValidationLevel::Info => write!(f, "INFO"),
542        }
543    }
544}
545
546impl std::str::FromStr for ValidationLevel {
547    type Err = String;
548
549    fn from_str(s: &str) -> Result<Self, Self::Err> {
550        match s.to_uppercase().as_str() {
551            "ERROR" => Ok(ValidationLevel::Error),
552            "WARNING" | "WARN" => Ok(ValidationLevel::Warning),
553            "INFO" => Ok(ValidationLevel::Info),
554            _ => Err(format!("Invalid validation level: {}", s)),
555        }
556    }
557}
558
559/// Result of a validation rule check
560#[derive(Debug, Clone, Serialize, Deserialize)]
561pub struct ValidationResult {
562    /// Rule name that was checked
563    pub rule_name: String,
564    /// Whether validation passed
565    pub passed: bool,
566    /// Validation level
567    pub level: ValidationLevel,
568    /// Message describing the result
569    pub message: String,
570    /// Optional details (JSON)
571    #[serde(skip_serializing_if = "Option::is_none")]
572    pub details: Option<String>,
573}
574
575impl ValidationResult {
576    /// Create a passing result
577    pub fn pass(rule_name: impl Into<String>) -> Self {
578        Self {
579            rule_name: rule_name.into(),
580            passed: true,
581            level: ValidationLevel::Info,
582            message: "Validation passed".to_string(),
583            details: None,
584        }
585    }
586
587    /// Create a failing result
588    pub fn fail(
589        rule_name: impl Into<String>,
590        level: ValidationLevel,
591        message: impl Into<String>,
592    ) -> Self {
593        Self {
594            rule_name: rule_name.into(),
595            passed: false,
596            level,
597            message: message.into(),
598            details: None,
599        }
600    }
601
602    /// Add details to the result
603    pub fn with_details(mut self, details: impl Into<String>) -> Self {
604        self.details = Some(details.into());
605        self
606    }
607}
608
609/// Collection of validation results
610#[derive(Debug, Clone, Default, Serialize, Deserialize)]
611pub struct ValidationReport {
612    /// All validation results
613    pub results: Vec<ValidationResult>,
614    /// Summary counts
615    pub summary: ValidationSummary,
616}
617
618impl ValidationReport {
619    /// Create a new empty report
620    pub fn new() -> Self {
621        Self::default()
622    }
623
624    /// Add a result to the report
625    pub fn add_result(&mut self, result: ValidationResult) {
626        if result.passed {
627            self.summary.passed += 1;
628        } else {
629            match result.level {
630                ValidationLevel::Error => self.summary.errors += 1,
631                ValidationLevel::Warning => self.summary.warnings += 1,
632                ValidationLevel::Info => self.summary.info += 1,
633            }
634        }
635        self.results.push(result);
636    }
637
638    /// Check if all validations passed (no errors)
639    pub fn is_valid(&self) -> bool {
640        self.summary.errors == 0
641    }
642
643    /// Check if there are any warnings
644    pub fn has_warnings(&self) -> bool {
645        self.summary.warnings > 0
646    }
647
648    /// Get all error messages
649    pub fn error_messages(&self) -> Vec<String> {
650        self.results
651            .iter()
652            .filter(|r| !r.passed && r.level == ValidationLevel::Error)
653            .map(|r| r.message.clone())
654            .collect()
655    }
656
657    /// Get all warning messages
658    pub fn warning_messages(&self) -> Vec<String> {
659        self.results
660            .iter()
661            .filter(|r| !r.passed && r.level == ValidationLevel::Warning)
662            .map(|r| r.message.clone())
663            .collect()
664    }
665
666    /// Get all info messages
667    pub fn info_messages(&self) -> Vec<String> {
668        self.results
669            .iter()
670            .filter(|r| !r.passed && r.level == ValidationLevel::Info)
671            .map(|r| r.message.clone())
672            .collect()
673    }
674}
675
676/// Summary counts for validation report
677#[derive(Debug, Clone, Default, Serialize, Deserialize)]
678pub struct ValidationSummary {
679    /// Number of passed validations
680    pub passed: usize,
681    /// Number of error-level failures
682    pub errors: usize,
683    /// Number of warning-level failures
684    pub warnings: usize,
685    /// Number of info-level failures
686    pub info: usize,
687}
688
689/// Version number within a subject
690#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
691pub struct SchemaVersion(pub u32);
692
693impl SchemaVersion {
694    /// The special "latest" version marker
695    pub const LATEST: u32 = u32::MAX;
696
697    pub fn new(version: u32) -> Self {
698        Self(version)
699    }
700
701    /// Create a version that represents "latest"
702    pub fn latest() -> Self {
703        Self(Self::LATEST)
704    }
705
706    /// Check if this represents the "latest" version
707    pub fn is_latest(&self) -> bool {
708        self.0 == Self::LATEST
709    }
710
711    pub fn as_u32(&self) -> u32 {
712        self.0
713    }
714}
715
716impl std::fmt::Display for SchemaVersion {
717    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718        if self.is_latest() {
719            write!(f, "latest")
720        } else {
721            write!(f, "{}", self.0)
722        }
723    }
724}
725
726impl From<u32> for SchemaVersion {
727    fn from(v: u32) -> Self {
728        Self(v)
729    }
730}
731
732/// Schema metadata for organization and discovery
733#[derive(Debug, Clone, Default, Serialize, Deserialize)]
734pub struct SchemaMetadata {
735    /// Human-readable name
736    #[serde(skip_serializing_if = "Option::is_none")]
737    pub name: Option<String>,
738    /// Description of the schema
739    #[serde(skip_serializing_if = "Option::is_none")]
740    pub description: Option<String>,
741    /// Labels/tags for categorization (key-value pairs)
742    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
743    pub labels: HashMap<String, String>,
744    /// Owner/team responsible for the schema
745    #[serde(skip_serializing_if = "Option::is_none")]
746    pub owner: Option<String>,
747    /// Creation timestamp (RFC 3339)
748    #[serde(skip_serializing_if = "Option::is_none")]
749    pub created_at: Option<String>,
750    /// Last modified timestamp (RFC 3339)
751    #[serde(skip_serializing_if = "Option::is_none")]
752    pub modified_at: Option<String>,
753    /// Schema context for multi-tenant isolation
754    #[serde(skip_serializing_if = "Option::is_none")]
755    pub context: Option<String>,
756}
757
758impl SchemaMetadata {
759    pub fn new() -> Self {
760        Self::default()
761    }
762
763    pub fn with_name(mut self, name: impl Into<String>) -> Self {
764        self.name = Some(name.into());
765        self
766    }
767
768    pub fn with_description(mut self, description: impl Into<String>) -> Self {
769        self.description = Some(description.into());
770        self
771    }
772
773    pub fn with_owner(mut self, owner: impl Into<String>) -> Self {
774        self.owner = Some(owner.into());
775        self
776    }
777
778    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
779        self.labels.insert(key.into(), value.into());
780        self
781    }
782
783    /// Set the context
784    pub fn with_context(mut self, context: impl Into<String>) -> Self {
785        self.context = Some(context.into());
786        self
787    }
788}
789
790/// A registered schema with its metadata
791#[derive(Debug, Clone, Serialize, Deserialize)]
792pub struct Schema {
793    /// Unique schema ID (global)
794    pub id: SchemaId,
795    /// Schema type/format
796    pub schema_type: SchemaType,
797    /// The schema definition (JSON string)
798    pub schema: String,
799    /// MD5 fingerprint for deduplication
800    #[serde(skip_serializing_if = "Option::is_none")]
801    pub fingerprint: Option<String>,
802    /// Schema references (for nested schemas)
803    #[serde(default, skip_serializing_if = "Vec::is_empty")]
804    pub references: Vec<SchemaReference>,
805    /// Schema metadata (name, description, labels)
806    #[serde(default, skip_serializing_if = "Option::is_none")]
807    pub metadata: Option<SchemaMetadata>,
808}
809
810impl Schema {
811    pub fn new(id: SchemaId, schema_type: SchemaType, schema: String) -> Self {
812        Self {
813            id,
814            schema_type,
815            schema,
816            fingerprint: None,
817            references: Vec::new(),
818            metadata: None,
819        }
820    }
821
822    pub fn with_fingerprint(mut self, fingerprint: String) -> Self {
823        self.fingerprint = Some(fingerprint);
824        self
825    }
826
827    pub fn with_references(mut self, references: Vec<SchemaReference>) -> Self {
828        self.references = references;
829        self
830    }
831
832    pub fn with_metadata(mut self, metadata: SchemaMetadata) -> Self {
833        self.metadata = Some(metadata);
834        self
835    }
836}
837
838/// Reference to another schema (for composition)
839#[derive(Debug, Clone, Serialize, Deserialize)]
840pub struct SchemaReference {
841    /// Reference name (used in the schema)
842    pub name: String,
843    /// Subject containing the referenced schema
844    pub subject: String,
845    /// Version of the referenced schema
846    pub version: u32,
847}
848
849/// A subject version combines subject, version, and schema
850#[derive(Debug, Clone, Serialize, Deserialize)]
851pub struct SubjectVersion {
852    /// Subject name
853    pub subject: Subject,
854    /// Version number
855    pub version: SchemaVersion,
856    /// Schema ID
857    pub id: SchemaId,
858    /// Schema type
859    pub schema_type: SchemaType,
860    /// The schema definition
861    pub schema: String,
862    /// Version state (enabled/deprecated/disabled)
863    #[serde(default)]
864    pub state: VersionState,
865}
866
867impl SubjectVersion {
868    pub fn new(
869        subject: Subject,
870        version: SchemaVersion,
871        id: SchemaId,
872        schema_type: SchemaType,
873        schema: String,
874    ) -> Self {
875        Self {
876            subject,
877            version,
878            id,
879            schema_type,
880            schema,
881            state: VersionState::Enabled,
882        }
883    }
884
885    /// Create with a specific state
886    pub fn with_state(mut self, state: VersionState) -> Self {
887        self.state = state;
888        self
889    }
890
891    /// Check if this version is usable
892    pub fn is_usable(&self) -> bool {
893        self.state.is_usable()
894    }
895}
896
897/// Mode for schema registration
898#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
899#[serde(rename_all = "UPPERCASE")]
900pub enum Mode {
901    /// Read-write mode (default)
902    #[default]
903    Readwrite,
904    /// Read-only mode
905    Readonly,
906    /// Import mode (for migrations)
907    Import,
908}
909
910impl std::fmt::Display for Mode {
911    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
912        match self {
913            Mode::Readwrite => write!(f, "READWRITE"),
914            Mode::Readonly => write!(f, "READONLY"),
915            Mode::Import => write!(f, "IMPORT"),
916        }
917    }
918}
919
920/// Compatibility level for schema evolution
921///
922/// | Level | Description | Can Read |
923/// |-------|-------------|----------|
924/// | BACKWARD | New schema can read old data | Old → New ✓ |
925/// | FORWARD | Old schema can read new data | New → Old ✓ |
926/// | FULL | Both directions | Both ✓ |
927/// | NONE | No checking | - |
928#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
929#[serde(rename_all = "UPPERCASE")]
930pub enum CompatibilityLevel {
931    /// New schema can read data written by old schema (default)
932    /// Safe for consumers to upgrade first
933    #[default]
934    Backward,
935
936    /// New schema can read data written by old schema (transitive)
937    BackwardTransitive,
938
939    /// Old schema can read data written by new schema
940    /// Safe for producers to upgrade first
941    Forward,
942
943    /// Old schema can read data written by new schema (transitive)
944    ForwardTransitive,
945
946    /// Both backward and forward compatible
947    Full,
948
949    /// Both backward and forward compatible (transitive)
950    FullTransitive,
951
952    /// No compatibility checking
953    None,
954}
955
956impl CompatibilityLevel {
957    /// Check if this level requires backward compatibility
958    pub fn is_backward(&self) -> bool {
959        matches!(
960            self,
961            Self::Backward | Self::BackwardTransitive | Self::Full | Self::FullTransitive
962        )
963    }
964
965    /// Check if this level requires forward compatibility
966    pub fn is_forward(&self) -> bool {
967        matches!(
968            self,
969            Self::Forward | Self::ForwardTransitive | Self::Full | Self::FullTransitive
970        )
971    }
972
973    /// Check if this level is transitive (checks all versions)
974    pub fn is_transitive(&self) -> bool {
975        matches!(
976            self,
977            Self::BackwardTransitive | Self::ForwardTransitive | Self::FullTransitive
978        )
979    }
980}
981
982impl std::fmt::Display for CompatibilityLevel {
983    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
984        let s = match self {
985            Self::Backward => "BACKWARD",
986            Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
987            Self::Forward => "FORWARD",
988            Self::ForwardTransitive => "FORWARD_TRANSITIVE",
989            Self::Full => "FULL",
990            Self::FullTransitive => "FULL_TRANSITIVE",
991            Self::None => "NONE",
992        };
993        write!(f, "{}", s)
994    }
995}
996
997impl std::str::FromStr for CompatibilityLevel {
998    type Err = String;
999
1000    fn from_str(s: &str) -> Result<Self, Self::Err> {
1001        match s.to_uppercase().as_str() {
1002            "BACKWARD" => Ok(Self::Backward),
1003            "BACKWARD_TRANSITIVE" => Ok(Self::BackwardTransitive),
1004            "FORWARD" => Ok(Self::Forward),
1005            "FORWARD_TRANSITIVE" => Ok(Self::ForwardTransitive),
1006            "FULL" => Ok(Self::Full),
1007            "FULL_TRANSITIVE" => Ok(Self::FullTransitive),
1008            "NONE" => Ok(Self::None),
1009            _ => Err(format!("Unknown compatibility level: {}", s)),
1010        }
1011    }
1012}
1013
/// Unit tests for the schema registry types defined in this file.
///
/// Tests are grouped by the type they exercise; each group is introduced by a
/// banner comment.
#[cfg(test)]
mod tests {
    use super::*;

    // ========================================================================
    // Core Identifier and Compatibility Level Tests
    // ========================================================================

    #[test]
    fn test_schema_type_parse() {
        assert_eq!("avro".parse::<SchemaType>().unwrap(), SchemaType::Avro);
        assert_eq!("AVRO".parse::<SchemaType>().unwrap(), SchemaType::Avro);
        assert_eq!("json".parse::<SchemaType>().unwrap(), SchemaType::Json);
        assert_eq!(
            "protobuf".parse::<SchemaType>().unwrap(),
            SchemaType::Protobuf
        );
    }

    #[test]
    fn test_subject_naming() {
        let key_subject = Subject::key("users");
        assert_eq!(key_subject.as_str(), "users-key");

        let value_subject = Subject::value("users");
        assert_eq!(value_subject.as_str(), "users-value");
    }

    #[test]
    fn test_schema_id() {
        let id = SchemaId::new(42);
        assert_eq!(id.as_u32(), 42);
        assert_eq!(format!("{}", id), "42");
    }

    #[test]
    fn test_compatibility_level_parse() {
        assert_eq!(
            "backward".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::Backward
        );
        assert_eq!(
            "FULL_TRANSITIVE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::FullTransitive
        );
        assert_eq!(
            "none".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::None
        );

        // Unknown names are rejected and the error echoes the original input.
        assert_eq!(
            "invalid".parse::<CompatibilityLevel>().unwrap_err(),
            "Unknown compatibility level: invalid"
        );
        assert!("".parse::<CompatibilityLevel>().is_err());
    }

    #[test]
    fn test_compatibility_level_display_round_trip() {
        let expected = [
            (CompatibilityLevel::Backward, "BACKWARD"),
            (CompatibilityLevel::BackwardTransitive, "BACKWARD_TRANSITIVE"),
            (CompatibilityLevel::Forward, "FORWARD"),
            (CompatibilityLevel::ForwardTransitive, "FORWARD_TRANSITIVE"),
            (CompatibilityLevel::Full, "FULL"),
            (CompatibilityLevel::FullTransitive, "FULL_TRANSITIVE"),
            (CompatibilityLevel::None, "NONE"),
        ];

        for (level, name) in &expected {
            // Display emits the canonical upper-case name...
            assert_eq!(level.to_string(), *name);

            // ...and parsing that name (in either case) yields the same level.
            assert_eq!(&name.parse::<CompatibilityLevel>().unwrap(), level);
            assert_eq!(
                &name.to_lowercase().parse::<CompatibilityLevel>().unwrap(),
                level
            );
        }
    }

    #[test]
    fn test_compatibility_level_methods() {
        assert!(CompatibilityLevel::Backward.is_backward());
        assert!(!CompatibilityLevel::Backward.is_forward());
        assert!(!CompatibilityLevel::Backward.is_transitive());

        assert!(!CompatibilityLevel::Forward.is_backward());
        assert!(CompatibilityLevel::Forward.is_forward());
        assert!(!CompatibilityLevel::Forward.is_transitive());

        assert!(CompatibilityLevel::Full.is_backward());
        assert!(CompatibilityLevel::Full.is_forward());
        assert!(!CompatibilityLevel::Full.is_transitive());

        assert!(CompatibilityLevel::BackwardTransitive.is_backward());
        assert!(!CompatibilityLevel::BackwardTransitive.is_forward());
        assert!(CompatibilityLevel::BackwardTransitive.is_transitive());

        assert!(!CompatibilityLevel::ForwardTransitive.is_backward());
        assert!(CompatibilityLevel::ForwardTransitive.is_forward());
        assert!(CompatibilityLevel::ForwardTransitive.is_transitive());

        assert!(CompatibilityLevel::FullTransitive.is_backward());
        assert!(CompatibilityLevel::FullTransitive.is_forward());
        assert!(CompatibilityLevel::FullTransitive.is_transitive());

        // `None` imposes no compatibility requirement at all.
        assert!(!CompatibilityLevel::None.is_backward());
        assert!(!CompatibilityLevel::None.is_forward());
        assert!(!CompatibilityLevel::None.is_transitive());
    }

    // ========================================================================
    // Schema Context Tests
    // ========================================================================

    #[test]
    fn test_schema_context_default() {
        let ctx = SchemaContext::default();
        assert!(ctx.is_default());
        assert_eq!(ctx.name(), "");
        assert!(ctx.is_active());
    }

    #[test]
    fn test_schema_context_new() {
        let ctx = SchemaContext::new("tenant-123");
        assert!(!ctx.is_default());
        assert_eq!(ctx.name(), "tenant-123");
        assert!(ctx.is_active());
    }

    #[test]
    fn test_schema_context_with_description() {
        let ctx =
            SchemaContext::new("production").with_description("Production environment context");

        assert_eq!(ctx.description(), Some("Production environment context"));
    }

    #[test]
    fn test_schema_context_inactive() {
        let ctx = SchemaContext::new("deprecated").with_active(false);

        assert!(!ctx.is_active());
    }

    #[test]
    fn test_schema_context_qualify_subject() {
        let default_ctx = SchemaContext::default();
        let subject = Subject::new("users-value");

        // Default context doesn't add prefix
        assert_eq!(default_ctx.qualify_subject(&subject), "users-value");

        // Named context adds Confluent-compatible prefix
        let tenant_ctx = SchemaContext::new("tenant-123");
        assert_eq!(
            tenant_ctx.qualify_subject(&subject),
            ":.tenant-123:users-value"
        );
    }

    #[test]
    fn test_schema_context_parse_qualified() {
        // Default context (no prefix)
        let (ctx, subject) = SchemaContext::parse_qualified("users-value");
        assert!(ctx.is_default());
        assert_eq!(subject.as_str(), "users-value");

        // Named context with prefix
        let (ctx, subject) = SchemaContext::parse_qualified(":.tenant-123:users-value");
        assert_eq!(ctx.name(), "tenant-123");
        assert_eq!(subject.as_str(), "users-value");
    }

    #[test]
    fn test_schema_context_display() {
        let default_ctx = SchemaContext::default();
        assert_eq!(format!("{}", default_ctx), "(default)");

        let named_ctx = SchemaContext::new("prod");
        assert_eq!(format!("{}", named_ctx), "prod");
    }

    // ========================================================================
    // Version State Tests
    // ========================================================================

    #[test]
    fn test_version_state_default() {
        let state = VersionState::default();
        assert_eq!(state, VersionState::Enabled);
    }

    #[test]
    fn test_version_state_usable() {
        assert!(VersionState::Enabled.is_usable());
        assert!(VersionState::Deprecated.is_usable());
        assert!(!VersionState::Disabled.is_usable());
    }

    #[test]
    fn test_version_state_requires_warning() {
        assert!(!VersionState::Enabled.requires_warning());
        assert!(VersionState::Deprecated.requires_warning());
        assert!(!VersionState::Disabled.requires_warning());
    }

    #[test]
    fn test_version_state_blocked() {
        assert!(!VersionState::Enabled.is_blocked());
        assert!(!VersionState::Deprecated.is_blocked());
        assert!(VersionState::Disabled.is_blocked());
    }

    #[test]
    fn test_version_state_transitions() {
        let enabled = VersionState::Enabled;

        // Enabled -> Deprecated
        let deprecated = enabled.deprecate();
        assert_eq!(deprecated, VersionState::Deprecated);

        // Deprecated -> Deprecated (no change)
        let still_deprecated = deprecated.deprecate();
        assert_eq!(still_deprecated, VersionState::Deprecated);

        // Deprecated -> Disabled
        let disabled = deprecated.disable();
        assert_eq!(disabled, VersionState::Disabled);

        // Disabled -> Enabled
        let re_enabled = disabled.enable();
        assert_eq!(re_enabled, VersionState::Enabled);
    }

    #[test]
    fn test_version_state_parse() {
        assert_eq!(
            "ENABLED".parse::<VersionState>().unwrap(),
            VersionState::Enabled
        );
        assert_eq!(
            "deprecated".parse::<VersionState>().unwrap(),
            VersionState::Deprecated
        );
        assert_eq!(
            "Disabled".parse::<VersionState>().unwrap(),
            VersionState::Disabled
        );
        assert!("invalid".parse::<VersionState>().is_err());
    }

    #[test]
    fn test_version_state_display() {
        assert_eq!(format!("{}", VersionState::Enabled), "ENABLED");
        assert_eq!(format!("{}", VersionState::Deprecated), "DEPRECATED");
        assert_eq!(format!("{}", VersionState::Disabled), "DISABLED");
    }

    // ========================================================================
    // Validation Rule Tests
    // ========================================================================

    #[test]
    fn test_validation_rule_new() {
        let rule = ValidationRule::new(
            "doc-required",
            ValidationRuleType::FieldRequired,
            r#"{"field": "doc"}"#,
        );

        assert_eq!(rule.name, "doc-required");
        assert_eq!(rule.rule_type, ValidationRuleType::FieldRequired);
        assert!(rule.active);
        assert_eq!(rule.level, ValidationLevel::Error);
    }

    #[test]
    fn test_validation_rule_builder() {
        let rule = ValidationRule::new(
            "naming-convention",
            ValidationRuleType::NamingConvention,
            r#"{"pattern": "^[A-Z]"}"#,
        )
        .with_level(ValidationLevel::Warning)
        .with_description("Enforce PascalCase naming")
        .with_schema_types(vec![SchemaType::Avro])
        .with_subject_patterns(vec![".*-value$".to_string()]);

        assert_eq!(rule.level, ValidationLevel::Warning);
        assert_eq!(
            rule.description,
            Some("Enforce PascalCase naming".to_string())
        );
        assert_eq!(rule.applies_to, vec![SchemaType::Avro]);
        assert_eq!(rule.subject_patterns, vec![".*-value$".to_string()]);
    }

    #[test]
    fn test_validation_rule_applies() {
        let rule = ValidationRule::new("test", ValidationRuleType::JsonSchema, "{}")
            .with_schema_types(vec![SchemaType::Avro])
            .with_subject_patterns(vec!["users-.*".to_string()]);

        // Matches both type and pattern
        assert!(rule.applies(SchemaType::Avro, "users-value"));

        // Wrong type
        assert!(!rule.applies(SchemaType::Json, "users-value"));

        // Wrong pattern
        assert!(!rule.applies(SchemaType::Avro, "orders-value"));
    }

    #[test]
    fn test_validation_rule_inactive() {
        let rule =
            ValidationRule::new("test", ValidationRuleType::MaxSize, "{}").with_active(false);

        assert!(!rule.applies(SchemaType::Avro, "any-subject"));
    }

    #[test]
    fn test_validation_rule_no_filters() {
        let rule = ValidationRule::new("test", ValidationRuleType::Regex, "{}");

        // Without filters, applies to everything
        assert!(rule.applies(SchemaType::Avro, "any-subject"));
        assert!(rule.applies(SchemaType::Json, "other-subject"));
        assert!(rule.applies(SchemaType::Protobuf, "third-subject"));
    }

    // ========================================================================
    // Validation Result Tests
    // ========================================================================

    #[test]
    fn test_validation_result_pass() {
        let result = ValidationResult::pass("test-rule");

        assert!(result.passed);
        assert_eq!(result.rule_name, "test-rule");
    }

    #[test]
    fn test_validation_result_fail() {
        let result = ValidationResult::fail(
            "naming-rule",
            ValidationLevel::Error,
            "Schema name must be PascalCase",
        )
        .with_details(r#"{"name": "invalidName"}"#);

        assert!(!result.passed);
        assert_eq!(result.level, ValidationLevel::Error);
        assert_eq!(result.message, "Schema name must be PascalCase");
        assert!(result.details.is_some());
    }

    // ========================================================================
    // Validation Report Tests
    // ========================================================================

    #[test]
    fn test_validation_report_empty() {
        let report = ValidationReport::new();

        assert!(report.is_valid());
        assert!(!report.has_warnings());
        assert!(report.results.is_empty());
    }

    #[test]
    fn test_validation_report_with_results() {
        let mut report = ValidationReport::new();

        report.add_result(ValidationResult::pass("rule1"));
        report.add_result(ValidationResult::fail(
            "rule2",
            ValidationLevel::Warning,
            "warning msg",
        ));
        report.add_result(ValidationResult::fail(
            "rule3",
            ValidationLevel::Error,
            "error msg",
        ));

        assert!(!report.is_valid()); // Has errors
        assert!(report.has_warnings());
        assert_eq!(report.summary.passed, 1);
        assert_eq!(report.summary.warnings, 1);
        assert_eq!(report.summary.errors, 1);
    }

    #[test]
    fn test_validation_report_error_messages() {
        let mut report = ValidationReport::new();

        report.add_result(ValidationResult::fail(
            "rule1",
            ValidationLevel::Error,
            "Error 1",
        ));
        report.add_result(ValidationResult::fail(
            "rule2",
            ValidationLevel::Warning,
            "Warning 1",
        ));
        report.add_result(ValidationResult::fail(
            "rule3",
            ValidationLevel::Error,
            "Error 2",
        ));

        let errors: Vec<String> = report.error_messages();
        assert_eq!(errors.len(), 2);
        assert!(errors.contains(&"Error 1".to_string()));
        assert!(errors.contains(&"Error 2".to_string()));

        let warnings: Vec<String> = report.warning_messages();
        assert_eq!(warnings.len(), 1);
        assert!(warnings.contains(&"Warning 1".to_string()));
    }

    // ========================================================================
    // Schema Metadata Tests
    // ========================================================================

    #[test]
    fn test_schema_metadata_builder() {
        let metadata = SchemaMetadata::new()
            .with_name("UserEvent")
            .with_description("Schema for user events")
            .with_owner("platform-team")
            .with_label("domain", "users")
            .with_label("version", "v2")
            .with_context("production");

        assert_eq!(metadata.name, Some("UserEvent".to_string()));
        assert_eq!(
            metadata.description,
            Some("Schema for user events".to_string())
        );
        assert_eq!(metadata.owner, Some("platform-team".to_string()));
        assert_eq!(metadata.labels.get("domain"), Some(&"users".to_string()));
        assert_eq!(metadata.labels.get("version"), Some(&"v2".to_string()));
        assert_eq!(metadata.context, Some("production".to_string()));
    }

    // ========================================================================
    // SubjectVersion Tests
    // ========================================================================

    #[test]
    fn test_subject_version_with_state() {
        let sv = SubjectVersion::new(
            Subject::new("users-value"),
            SchemaVersion::new(1),
            SchemaId::new(1),
            SchemaType::Avro,
            r#"{"type":"string"}"#.to_string(),
        )
        .with_state(VersionState::Deprecated);

        assert_eq!(sv.state, VersionState::Deprecated);
        assert!(sv.is_usable()); // Deprecated is still usable
    }

    #[test]
    fn test_subject_version_disabled() {
        let sv = SubjectVersion::new(
            Subject::new("users-value"),
            SchemaVersion::new(1),
            SchemaId::new(1),
            SchemaType::Avro,
            r#"{"type":"string"}"#.to_string(),
        )
        .with_state(VersionState::Disabled);

        assert!(!sv.is_usable()); // Disabled is not usable
    }
}