use crate::schema::compatibility::Compatibility;
use crate::schema::version::SchemaVersion;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// A single registered schema version for a topic.
///
/// Instances are created by `SchemaRegistry::register_schema` and handed out as clones by the
/// registry's lookup methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Schema {
/// Number of schemas that were already registered for the topic when this one was added
/// (so the first schema of a topic has id 0). Ids are assigned per topic and are therefore
/// not unique across topics.
pub id: u32,
/// Version assigned at registration, built via `SchemaVersion::new` from the per-topic
/// schema count plus one.
pub version: SchemaVersion,
/// The schema text exactly as it was passed to `register_schema`.
// NOTE(review): the tests register JSON record definitions; nothing here validates the format.
pub definition: String,
}
/// In-memory registry that stores, per topic, every schema accepted so far.
pub struct SchemaRegistry {
/// Topic name -> schemas for that topic, in the order they were registered
/// (the last element is the most recent one). Guarded by an `RwLock` so that
/// registration and lookups can be done through `&self`.
schemas: Arc<RwLock<HashMap<String, Vec<Schema>>>>,
/// Compatibility mode used to check a new schema against the latest schema of its topic.
compatibility: Compatibility,
}
impl SchemaRegistry {
    /// Creates an empty registry that uses `Compatibility::BACKWARD`.
    pub fn new() -> Self {
        SchemaRegistry {
            schemas: Arc::new(RwLock::new(HashMap::new())),
            compatibility: Compatibility::BACKWARD,
        }
    }

    /// Registers `definition` as the next schema for `topic` and returns the stored schema.
    ///
    /// The new schema's `id` is the number of schemas already stored for the topic, and its
    /// version is built from that number plus one. The first schema of a topic is always
    /// accepted; later ones are checked against the most recently registered schema using
    /// the registry's compatibility mode.
    ///
    /// # Errors
    ///
    /// Returns an error message when the compatibility check rejects the schema, or when the
    /// topic already holds so many schemas that the next id/version does not fit in a `u32`.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock was poisoned by a panic in another thread.
    pub fn register_schema(&self, topic: &str, definition: &str) -> Result<Schema, String> {
        let mut schemas = self
            .schemas
            .write()
            .expect("schema registry lock poisoned");
        let topic_schemas = schemas.entry(topic.to_string()).or_default();

        // Checked conversion instead of `as u32`, which would silently wrap the id.
        // The fully qualified path keeps this independent of the crate's edition prelude.
        let id = <u32 as std::convert::TryFrom<usize>>::try_from(topic_schemas.len())
            .map_err(|_| format!("Too many schemas registered for topic '{}'.", topic))?;
        // Versions are 1-based; guard the increment so it cannot overflow.
        let version_number = id
            .checked_add(1)
            .ok_or_else(|| format!("Too many schemas registered for topic '{}'.", topic))?;

        let new_schema = Schema {
            id,
            version: SchemaVersion::new(version_number),
            definition: definition.to_string(),
        };

        if !self.check_compatibility(&new_schema, topic_schemas) {
            return Err("The schema failed the compatibility check.".to_string());
        }

        // One copy is stored, the other is handed back to the caller.
        topic_schemas.push(new_schema.clone());
        Ok(new_schema)
    }

    /// Returns `true` when `new_schema` may be added after `existing_schemas`.
    ///
    /// Only the most recent existing schema is consulted; with no existing schemas there is
    /// nothing to compare against and the new schema is accepted.
    fn check_compatibility(&self, new_schema: &Schema, existing_schemas: &[Schema]) -> bool {
        match existing_schemas.last() {
            Some(latest_schema) => self.compatibility.check(new_schema, latest_schema),
            None => true,
        }
    }

    /// Looks up a schema for `topic`.
    ///
    /// With `Some(v)`, returns the first stored schema whose `version.major` equals `v`;
    /// with `None`, returns the most recently registered schema. Returns `None` when the
    /// topic is unknown or no schema matches.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock was poisoned by a panic in another thread.
    pub fn get_schema(&self, topic: &str, version: Option<u32>) -> Option<Schema> {
        let schemas = self
            .schemas
            .read()
            .expect("schema registry lock poisoned");
        let topic_schemas = schemas.get(topic)?;
        match version {
            Some(v) => topic_schemas.iter().find(|s| s.version.major == v).cloned(),
            None => topic_schemas.last().cloned(),
        }
    }

    /// Replaces the compatibility mode used by subsequent calls to `register_schema`.
    ///
    /// Schemas that are already stored are not re-checked.
    pub fn set_compatibility(&mut self, compatibility: Compatibility) {
        self.compatibility = compatibility;
    }

    /// Returns a copy of every schema stored for `topic`, in registration order, or `None`
    /// when the topic is unknown.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock was poisoned by a panic in another thread.
    pub fn get_all_schemas(&self, topic: &str) -> Option<Vec<Schema>> {
        let schemas = self
            .schemas
            .read()
            .expect("schema registry lock poisoned");
        schemas.get(topic).cloned()
    }
}

impl Default for SchemaRegistry {
    /// Equivalent to `SchemaRegistry::new`.
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The first schema registered for a topic is accepted and gets id 0 and major version 1.
    #[test]
    fn test_schema_registration() {
        let definition =
            r#"{"type":"record","name":"test","fields":[{"name":"id","type":"string"}]}"#;
        let registry = SchemaRegistry::new();

        let outcome = registry.register_schema("test_topic", definition);
        assert!(outcome.is_ok());

        let registered = outcome.unwrap();
        assert_eq!(registered.id, 0);
        assert_eq!(registered.version.major, 1);
    }

    /// Registering a second schema that adds a field to the first one is expected to succeed
    /// with the registry's default compatibility mode.
    #[test]
    fn test_schema_compatibility() {
        let registry = SchemaRegistry::new();
        // Registered in order: the base record, then the same record with an extra field.
        let definitions = [
            r#"{"type":"record","name":"test","fields":[{"name":"id","type":"string"}]}"#,
            r#"{"type":"record","name":"test","fields":[{"name":"id","type":"string"},{"name":"value","type":"string"}]}"#,
        ];

        for definition in definitions.iter() {
            let outcome = registry.register_schema("test_topic", definition);
            assert!(outcome.is_ok());
        }
    }
}