Skip to main content

oxirs_stream/
schema_registry.rs

1//! # Schema Registry for Stream Events
2//!
3//! Enterprise-grade schema management, validation, and evolution for RDF streaming events.
4//! Provides centralized schema storage, versioning, compatibility checking, and validation.
5//!
6//! Key features:
7//! - Schema registration and versioning
8//! - Forward/backward compatibility checks
9//! - Event validation against schemas
10//! - Schema evolution management
11//! - Integration with external schema registries (Confluent, etc.)
12
13#[cfg(test)]
14use crate::EventMetadata;
15use crate::StreamEvent;
16use anyhow::{anyhow, Result};
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24
25/// Schema format types supported by the registry
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub enum SchemaFormat {
28    /// JSON Schema for JSON events
29    JsonSchema,
30    /// Apache Avro schema
31    Avro,
32    /// Protocol Buffers schema
33    Protobuf,
34    /// RDF/SPARQL schema (custom)
35    RdfSparql,
36    /// Custom schema format
37    Custom { format_name: String },
38}
39
40/// Schema compatibility modes for evolution
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
42pub enum CompatibilityMode {
43    /// No compatibility checking
44    None,
45    /// New schema must be backward compatible
46    Backward,
47    /// New schema must be forward compatible
48    Forward,
49    /// New schema must be both backward and forward compatible
50    Full,
51    /// New schema can break compatibility (major version change)
52    Breaking,
53}
54
55/// Schema definition with metadata
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct SchemaDefinition {
58    /// Unique schema identifier
59    pub id: Uuid,
60    /// Schema subject (topic/event type)
61    pub subject: String,
62    /// Schema version
63    pub version: u32,
64    /// Schema format
65    pub format: SchemaFormat,
66    /// Schema content (JSON, Avro, etc.)
67    pub schema_content: String,
68    /// Schema title/name
69    pub title: Option<String>,
70    /// Schema description
71    pub description: Option<String>,
72    /// Creation timestamp
73    pub created_at: DateTime<Utc>,
74    /// Last updated timestamp
75    pub updated_at: DateTime<Utc>,
76    /// Compatibility mode for this schema
77    pub compatibility: CompatibilityMode,
78    /// Tags for organization
79    pub tags: Vec<String>,
80    /// Schema metadata
81    pub metadata: HashMap<String, String>,
82}
83
84impl SchemaDefinition {
85    pub fn new(
86        subject: String,
87        version: u32,
88        format: SchemaFormat,
89        schema_content: String,
90    ) -> Self {
91        let now = Utc::now();
92        Self {
93            id: Uuid::new_v4(),
94            subject,
95            version,
96            format,
97            schema_content,
98            title: None,
99            description: None,
100            created_at: now,
101            updated_at: now,
102            compatibility: CompatibilityMode::Backward,
103            tags: Vec::new(),
104            metadata: HashMap::new(),
105        }
106    }
107
108    /// Update schema content and metadata
109    pub fn update_content(&mut self, content: String) {
110        self.schema_content = content;
111        self.updated_at = Utc::now();
112    }
113
114    /// Add tag to schema
115    pub fn add_tag(&mut self, tag: String) {
116        if !self.tags.contains(&tag) {
117            self.tags.push(tag);
118        }
119    }
120
121    /// Set schema metadata
122    pub fn set_metadata(&mut self, key: String, value: String) {
123        self.metadata.insert(key, value);
124        self.updated_at = Utc::now();
125    }
126}
127
128/// Schema validation result
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct ValidationResult {
131    /// Whether validation passed
132    pub is_valid: bool,
133    /// Validation errors (if any)
134    pub errors: Vec<String>,
135    /// Validation warnings
136    pub warnings: Vec<String>,
137    /// Schema used for validation
138    pub schema_id: Uuid,
139    /// Schema version used
140    pub schema_version: u32,
141    /// Validation timestamp
142    pub validated_at: DateTime<Utc>,
143}
144
145impl ValidationResult {
146    pub fn success(schema_id: Uuid, schema_version: u32) -> Self {
147        Self {
148            is_valid: true,
149            errors: Vec::new(),
150            warnings: Vec::new(),
151            schema_id,
152            schema_version,
153            validated_at: Utc::now(),
154        }
155    }
156
157    pub fn failure(schema_id: Uuid, schema_version: u32, errors: Vec<String>) -> Self {
158        Self {
159            is_valid: false,
160            errors,
161            warnings: Vec::new(),
162            schema_id,
163            schema_version,
164            validated_at: Utc::now(),
165        }
166    }
167
168    pub fn add_warning(&mut self, warning: String) {
169        self.warnings.push(warning);
170    }
171}
172
173/// Schema registry configuration
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct SchemaRegistryConfig {
176    /// Enable schema validation
177    pub enable_validation: bool,
178    /// Strict validation mode (fail on warnings)
179    pub strict_mode: bool,
180    /// Cache schema definitions in memory
181    pub enable_caching: bool,
182    /// Cache TTL in seconds
183    pub cache_ttl_seconds: u64,
184    /// External registry integration
185    pub external_registry: Option<ExternalRegistryConfig>,
186    /// Default compatibility mode for new schemas
187    pub default_compatibility: CompatibilityMode,
188    /// Maximum number of schema versions to keep
189    pub max_versions_per_subject: u32,
190}
191
192impl Default for SchemaRegistryConfig {
193    fn default() -> Self {
194        Self {
195            enable_validation: true,
196            strict_mode: false,
197            enable_caching: true,
198            cache_ttl_seconds: 3600, // 1 hour
199            external_registry: None,
200            default_compatibility: CompatibilityMode::Backward,
201            max_versions_per_subject: 10,
202        }
203    }
204}
205
206/// External schema registry configuration
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct ExternalRegistryConfig {
209    /// Registry type (confluent, apicurio, etc.)
210    pub registry_type: String,
211    /// Registry URL
212    pub url: String,
213    /// Authentication configuration
214    pub auth: Option<RegistryAuth>,
215    /// Enable synchronization
216    pub enable_sync: bool,
217    /// Sync interval in seconds
218    pub sync_interval_seconds: u64,
219}
220
221/// Registry authentication configuration
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct RegistryAuth {
224    /// Authentication type (basic, bearer, etc.)
225    pub auth_type: String,
226    /// Username (for basic auth)
227    pub username: Option<String>,
228    /// Password (for basic auth)
229    pub password: Option<String>,
230    /// Bearer token
231    pub token: Option<String>,
232}
233
234/// In-memory schema registry implementation
235pub struct SchemaRegistry {
236    /// Registry configuration
237    config: SchemaRegistryConfig,
238    /// Schema definitions by subject and version
239    schemas: Arc<RwLock<HashMap<String, HashMap<u32, SchemaDefinition>>>>,
240    /// Schema cache for fast lookups
241    schema_cache: Arc<RwLock<HashMap<Uuid, SchemaDefinition>>>,
242    /// Latest version per subject
243    latest_versions: Arc<RwLock<HashMap<String, u32>>>,
244    /// Validation statistics
245    validation_stats: Arc<RwLock<ValidationStats>>,
246}
247
248/// Validation statistics
249#[derive(Debug, Default, Clone, Serialize, Deserialize)]
250pub struct ValidationStats {
251    pub total_validations: u64,
252    pub successful_validations: u64,
253    pub failed_validations: u64,
254    pub warnings_count: u64,
255    pub validation_time_ms: f64,
256    pub cache_hits: u64,
257    pub cache_misses: u64,
258}
259
260impl SchemaRegistry {
261    /// Create a new schema registry
262    pub fn new(config: SchemaRegistryConfig) -> Self {
263        Self {
264            config,
265            schemas: Arc::new(RwLock::new(HashMap::new())),
266            schema_cache: Arc::new(RwLock::new(HashMap::new())),
267            latest_versions: Arc::new(RwLock::new(HashMap::new())),
268            validation_stats: Arc::new(RwLock::new(ValidationStats::default())),
269        }
270    }
271
272    /// Register a new schema
273    pub async fn register_schema(
274        &self,
275        subject: String,
276        format: SchemaFormat,
277        schema_content: String,
278        compatibility: Option<CompatibilityMode>,
279    ) -> Result<SchemaDefinition> {
280        let mut schemas = self.schemas.write().await;
281        let mut latest_versions = self.latest_versions.write().await;
282
283        // Get next version number
284        let subject_schemas = schemas.entry(subject.clone()).or_insert_with(HashMap::new);
285        let next_version = latest_versions.get(&subject).map(|v| v + 1).unwrap_or(1);
286
287        // Check compatibility if there are existing versions
288        if next_version > 1 {
289            let latest_version = next_version - 1;
290            if let Some(existing_schema) = subject_schemas.get(&latest_version) {
291                self.check_compatibility(existing_schema, &schema_content, format.clone())
292                    .await?;
293            }
294        }
295
296        // Create new schema definition
297        let mut schema =
298            SchemaDefinition::new(subject.clone(), next_version, format, schema_content);
299
300        if let Some(compat) = compatibility {
301            schema.compatibility = compat;
302        } else {
303            schema.compatibility = self.config.default_compatibility.clone();
304        }
305
306        // Store schema
307        subject_schemas.insert(next_version, schema.clone());
308        latest_versions.insert(subject.clone(), next_version);
309
310        // Update cache
311        if self.config.enable_caching {
312            let mut cache = self.schema_cache.write().await;
313            cache.insert(schema.id, schema.clone());
314        }
315
316        info!(
317            "Registered schema for subject '{}' version {} with ID {}",
318            subject, next_version, schema.id
319        );
320
321        Ok(schema)
322    }
323
324    /// Get schema by subject and version
325    pub async fn get_schema(
326        &self,
327        subject: &str,
328        version: Option<u32>,
329    ) -> Result<Option<SchemaDefinition>> {
330        let schemas = self.schemas.read().await;
331
332        if let Some(subject_schemas) = schemas.get(subject) {
333            let version = if let Some(v) = version {
334                v
335            } else {
336                // Get latest version
337                let latest_versions = self.latest_versions.read().await;
338                *latest_versions
339                    .get(subject)
340                    .ok_or_else(|| anyhow!("No schemas found for subject: {}", subject))?
341            };
342
343            Ok(subject_schemas.get(&version).cloned())
344        } else {
345            Ok(None)
346        }
347    }
348
349    /// Get schema by ID
350    pub async fn get_schema_by_id(&self, schema_id: &Uuid) -> Result<Option<SchemaDefinition>> {
351        // Try cache first
352        if self.config.enable_caching {
353            let cache = self.schema_cache.read().await;
354            if let Some(schema) = cache.get(schema_id) {
355                let mut stats = self.validation_stats.write().await;
356                stats.cache_hits += 1;
357                return Ok(Some(schema.clone()));
358            }
359        }
360
361        // Search through all schemas
362        let schemas = self.schemas.read().await;
363        for subject_schemas in schemas.values() {
364            for schema in subject_schemas.values() {
365                if &schema.id == schema_id {
366                    // Update cache
367                    if self.config.enable_caching {
368                        let mut cache = self.schema_cache.write().await;
369                        cache.insert(*schema_id, schema.clone());
370                    }
371
372                    let mut stats = self.validation_stats.write().await;
373                    stats.cache_misses += 1;
374                    return Ok(Some(schema.clone()));
375                }
376            }
377        }
378
379        Ok(None)
380    }
381
382    /// List all schemas for a subject
383    pub async fn list_schemas(&self, subject: &str) -> Result<Vec<SchemaDefinition>> {
384        let schemas = self.schemas.read().await;
385
386        if let Some(subject_schemas) = schemas.get(subject) {
387            let mut schemas: Vec<SchemaDefinition> = subject_schemas.values().cloned().collect();
388            schemas.sort_by(|a, b| a.version.cmp(&b.version));
389            Ok(schemas)
390        } else {
391            Ok(Vec::new())
392        }
393    }
394
395    /// List all subjects
396    pub async fn list_subjects(&self) -> Result<Vec<String>> {
397        let schemas = self.schemas.read().await;
398        Ok(schemas.keys().cloned().collect())
399    }
400
401    /// Validate event against schema
402    pub async fn validate_event(
403        &self,
404        event: &StreamEvent,
405        subject: Option<&str>,
406    ) -> Result<ValidationResult> {
407        if !self.config.enable_validation {
408            return Ok(ValidationResult::success(Uuid::new_v4(), 1));
409        }
410
411        let start_time = std::time::Instant::now();
412        let mut stats = self.validation_stats.write().await;
413        stats.total_validations += 1;
414        drop(stats);
415
416        // Determine subject from event or parameter
417        let event_subject = subject
418            .map(|s| s.to_string())
419            .or_else(|| self.extract_subject_from_event(event))
420            .ok_or_else(|| anyhow!("Cannot determine subject for validation"))?;
421
422        // Get latest schema for subject
423        let schema = self
424            .get_schema(&event_subject, None)
425            .await?
426            .ok_or_else(|| anyhow!("No schema found for subject: {}", event_subject))?;
427
428        // Perform validation based on schema format
429        let validation_result = match schema.format {
430            SchemaFormat::JsonSchema => self.validate_with_json_schema(event, &schema).await?,
431            SchemaFormat::RdfSparql => self.validate_with_rdf_schema(event, &schema).await?,
432            SchemaFormat::Avro => self.validate_with_avro_schema(event, &schema).await?,
433            _ => {
434                warn!("Validation not implemented for format: {:?}", schema.format);
435                ValidationResult::success(schema.id, schema.version)
436            }
437        };
438
439        // Update statistics
440        let elapsed = start_time.elapsed();
441        let mut stats = self.validation_stats.write().await;
442        stats.validation_time_ms = (stats.validation_time_ms + elapsed.as_millis() as f64) / 2.0;
443
444        if validation_result.is_valid {
445            stats.successful_validations += 1;
446        } else {
447            stats.failed_validations += 1;
448        }
449
450        stats.warnings_count += validation_result.warnings.len() as u64;
451
452        debug!(
453            "Validated event against schema {} ({}ms): {}",
454            schema.id,
455            elapsed.as_millis(),
456            if validation_result.is_valid {
457                "VALID"
458            } else {
459                "INVALID"
460            }
461        );
462
463        Ok(validation_result)
464    }
465
466    /// Extract subject from event metadata or event type
467    fn extract_subject_from_event(&self, event: &StreamEvent) -> Option<String> {
468        // Try to get subject from event metadata
469        match event {
470            StreamEvent::TripleAdded { metadata, .. } => metadata
471                .properties
472                .get("subject")
473                .cloned()
474                .or_else(|| Some("rdf.triple.added".to_string())),
475            StreamEvent::TripleRemoved { metadata, .. } => metadata
476                .properties
477                .get("subject")
478                .cloned()
479                .or_else(|| Some("rdf.triple.removed".to_string())),
480            StreamEvent::SparqlUpdate { metadata, .. } => metadata
481                .properties
482                .get("subject")
483                .cloned()
484                .or_else(|| Some("sparql.update".to_string())),
485            StreamEvent::TransactionBegin { metadata, .. } => metadata
486                .properties
487                .get("subject")
488                .cloned()
489                .or_else(|| Some("transaction.begin".to_string())),
490            StreamEvent::TransactionCommit { metadata, .. } => metadata
491                .properties
492                .get("subject")
493                .cloned()
494                .or_else(|| Some("transaction.commit".to_string())),
495            _ => Some(format!("stream.event.{:?}", std::mem::discriminant(event))),
496        }
497    }
498
499    /// Check compatibility between schemas
500    async fn check_compatibility(
501        &self,
502        existing_schema: &SchemaDefinition,
503        new_schema_content: &str,
504        new_format: SchemaFormat,
505    ) -> Result<()> {
506        if existing_schema.compatibility == CompatibilityMode::None {
507            return Ok(());
508        }
509
510        if existing_schema.format != new_format {
511            return Err(anyhow!(
512                "Schema format changed from {:?} to {:?}",
513                existing_schema.format,
514                new_format
515            ));
516        }
517
518        // Basic compatibility checking (simplified)
519        match new_format {
520            SchemaFormat::JsonSchema => {
521                self.check_json_schema_compatibility(existing_schema, new_schema_content)
522                    .await
523            }
524            SchemaFormat::RdfSparql => {
525                self.check_rdf_schema_compatibility(existing_schema, new_schema_content)
526                    .await
527            }
528            _ => {
529                warn!(
530                    "Compatibility checking not implemented for format: {:?}",
531                    new_format
532                );
533                Ok(())
534            }
535        }
536    }
537
538    /// Validate event with JSON schema
539    async fn validate_with_json_schema(
540        &self,
541        _event: &StreamEvent,
542        schema: &SchemaDefinition,
543    ) -> Result<ValidationResult> {
544        // Simplified JSON schema validation
545        // In a real implementation, you would use a JSON schema library
546        debug!("Validating with JSON schema: {}", schema.id);
547        Ok(ValidationResult::success(schema.id, schema.version))
548    }
549
550    /// Validate event with RDF/SPARQL schema
551    async fn validate_with_rdf_schema(
552        &self,
553        event: &StreamEvent,
554        schema: &SchemaDefinition,
555    ) -> Result<ValidationResult> {
556        // Custom RDF validation logic
557        match event {
558            StreamEvent::TripleAdded {
559                subject,
560                predicate,
561                object: _,
562                ..
563            } => {
564                let mut errors = Vec::new();
565
566                // Basic URI validation
567                if !subject.starts_with("http://") && !subject.starts_with("https://") {
568                    errors.push(format!("Invalid subject URI: {subject}"));
569                }
570
571                if !predicate.starts_with("http://") && !predicate.starts_with("https://") {
572                    errors.push(format!("Invalid predicate URI: {predicate}"));
573                }
574
575                if errors.is_empty() {
576                    Ok(ValidationResult::success(schema.id, schema.version))
577                } else {
578                    Ok(ValidationResult::failure(schema.id, schema.version, errors))
579                }
580            }
581            _ => Ok(ValidationResult::success(schema.id, schema.version)),
582        }
583    }
584
585    /// Validate event with Avro schema
586    async fn validate_with_avro_schema(
587        &self,
588        _event: &StreamEvent,
589        schema: &SchemaDefinition,
590    ) -> Result<ValidationResult> {
591        // Simplified Avro validation
592        // In a real implementation, you would use the Apache Avro library
593        debug!("Validating with Avro schema: {}", schema.id);
594        Ok(ValidationResult::success(schema.id, schema.version))
595    }
596
597    /// Check JSON schema compatibility
598    async fn check_json_schema_compatibility(
599        &self,
600        _existing_schema: &SchemaDefinition,
601        _new_schema_content: &str,
602    ) -> Result<()> {
603        // Simplified compatibility check
604        // Real implementation would parse and compare JSON schemas
605        Ok(())
606    }
607
608    /// Check RDF schema compatibility
609    async fn check_rdf_schema_compatibility(
610        &self,
611        _existing_schema: &SchemaDefinition,
612        _new_schema_content: &str,
613    ) -> Result<()> {
614        // Simplified compatibility check for RDF schemas
615        Ok(())
616    }
617
618    /// Get validation statistics
619    pub async fn get_validation_stats(&self) -> ValidationStats {
620        let stats = self.validation_stats.read().await;
621        (*stats).clone()
622    }
623
624    /// Delete schema
625    pub async fn delete_schema(&self, subject: &str, version: Option<u32>) -> Result<bool> {
626        let mut schemas = self.schemas.write().await;
627        let mut latest_versions = self.latest_versions.write().await;
628
629        if let Some(subject_schemas) = schemas.get_mut(subject) {
630            if let Some(version) = version {
631                // Delete specific version
632                let removed = subject_schemas.remove(&version).is_some();
633
634                // Update latest version if this was the latest
635                if let Some(latest) = latest_versions.get(subject) {
636                    if *latest == version {
637                        let new_latest = subject_schemas.keys().max().cloned();
638                        if let Some(new_latest) = new_latest {
639                            latest_versions.insert(subject.to_string(), new_latest);
640                        } else {
641                            latest_versions.remove(subject);
642                            schemas.remove(subject);
643                        }
644                    }
645                }
646
647                Ok(removed)
648            } else {
649                // Delete all versions for subject
650                schemas.remove(subject);
651                latest_versions.remove(subject);
652                Ok(true)
653            }
654        } else {
655            Ok(false)
656        }
657    }
658}
659
660#[cfg(test)]
661mod tests {
662    use super::*;
663    use chrono::Utc;
664    use std::collections::HashMap;
665
666    #[tokio::test]
667    async fn test_schema_registration() -> Result<()> {
668        let config = SchemaRegistryConfig::default();
669        let registry = SchemaRegistry::new(config);
670
671        let schema_content = r#"
672        {
673            "type": "object",
674            "properties": {
675                "subject": {"type": "string"},
676                "predicate": {"type": "string"},
677                "object": {"type": "string"}
678            },
679            "required": ["subject", "predicate", "object"]
680        }"#;
681
682        let schema = registry
683            .register_schema(
684                "rdf.triple.added".to_string(),
685                SchemaFormat::JsonSchema,
686                schema_content.to_string(),
687                None,
688            )
689            .await?;
690
691        assert_eq!(schema.subject, "rdf.triple.added");
692        assert_eq!(schema.version, 1);
693        assert_eq!(schema.format, SchemaFormat::JsonSchema);
694
695        Ok(())
696    }
697
698    #[tokio::test]
699    async fn test_schema_retrieval() -> Result<()> {
700        let config = SchemaRegistryConfig::default();
701        let registry = SchemaRegistry::new(config);
702
703        let schema_content = r#"{"type": "object"}"#;
704        let registered_schema = registry
705            .register_schema(
706                "test.subject".to_string(),
707                SchemaFormat::JsonSchema,
708                schema_content.to_string(),
709                None,
710            )
711            .await?;
712
713        // Get by subject and version
714        let retrieved = registry
715            .get_schema("test.subject", Some(1))
716            .await?
717            .expect("Schema should exist");
718
719        assert_eq!(retrieved.id, registered_schema.id);
720        assert_eq!(retrieved.version, 1);
721
722        // Get by ID
723        let retrieved_by_id = registry
724            .get_schema_by_id(&registered_schema.id)
725            .await?
726            .expect("Schema should exist");
727
728        assert_eq!(retrieved_by_id.id, registered_schema.id);
729
730        Ok(())
731    }
732
733    #[tokio::test]
734    async fn test_event_validation() -> Result<()> {
735        let config = SchemaRegistryConfig::default();
736        let registry = SchemaRegistry::new(config);
737
738        // Register RDF schema
739        let schema_content = "RDF Triple Schema";
740        registry
741            .register_schema(
742                "rdf.triple.added".to_string(),
743                SchemaFormat::RdfSparql,
744                schema_content.to_string(),
745                None,
746            )
747            .await?;
748
749        // Create test event
750        let event = StreamEvent::TripleAdded {
751            subject: "https://example.org/subject".to_string(),
752            predicate: "https://example.org/predicate".to_string(),
753            object: "\"Test Object\"".to_string(),
754            graph: None,
755            metadata: EventMetadata {
756                event_id: "test_event_1".to_string(),
757                timestamp: Utc::now(),
758                source: "test".to_string(),
759                user: None,
760                context: None,
761                caused_by: None,
762                version: "1.0".to_string(),
763                properties: HashMap::new(),
764                checksum: None,
765            },
766        };
767
768        let validation_result = registry
769            .validate_event(&event, Some("rdf.triple.added"))
770            .await?;
771
772        assert!(validation_result.is_valid);
773        assert!(validation_result.errors.is_empty());
774
775        Ok(())
776    }
777
778    #[tokio::test]
779    async fn test_schema_versioning() -> Result<()> {
780        let config = SchemaRegistryConfig::default();
781        let registry = SchemaRegistry::new(config);
782
783        let subject = "test.versioning".to_string();
784
785        // Register version 1
786        let _v1 = registry
787            .register_schema(
788                subject.clone(),
789                SchemaFormat::JsonSchema,
790                r#"{"type": "object", "properties": {"name": {"type": "string"}}}"#.to_string(),
791                None,
792            )
793            .await?;
794
795        // Register version 2
796        let _v2 = registry
797            .register_schema(
798                subject.clone(),
799                SchemaFormat::JsonSchema,
800                r#"{"type": "object", "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}}"#.to_string(),
801                None,
802            )
803            .await?;
804
805        // List all schemas for subject
806        let schemas = registry.list_schemas(&subject).await?;
807        assert_eq!(schemas.len(), 2);
808        assert_eq!(schemas[0].version, 1);
809        assert_eq!(schemas[1].version, 2);
810
811        // Get latest version
812        let latest = registry.get_schema(&subject, None).await?.unwrap();
813        assert_eq!(latest.version, 2);
814
815        Ok(())
816    }
817}