use crate::schema::registry::Schema;
use serde_json::Value;
#[derive(Debug, Clone, Copy)]
pub enum Compatibility {
BACKWARD,
FORWARD,
FULL,
NONE,
}
impl Compatibility {
pub fn check(&self, new_schema: &Schema, old_schema: &Schema) -> bool {
match self {
Compatibility::BACKWARD => Self::check_backward(new_schema, old_schema),
Compatibility::FORWARD => Self::check_forward(new_schema, old_schema),
Compatibility::FULL => {
Self::check_backward(new_schema, old_schema)
&& Self::check_forward(new_schema, old_schema)
}
Compatibility::NONE => true,
}
}
fn check_backward(new_schema: &Schema, old_schema: &Schema) -> bool {
if let (Ok(new_json), Ok(old_json)) = (
serde_json::from_str::<Value>(&new_schema.definition),
serde_json::from_str::<Value>(&old_schema.definition),
) {
Self::contains_all_required_fields(&new_json, &old_json)
} else {
false
}
}
fn check_forward(new_schema: &Schema, old_schema: &Schema) -> bool {
if let (Ok(new_json), Ok(old_json)) = (
serde_json::from_str::<Value>(&new_schema.definition),
serde_json::from_str::<Value>(&old_schema.definition),
) {
Self::new_fields_are_optional(&new_json, &old_json)
} else {
false
}
}
fn contains_all_required_fields(new_schema: &Value, old_schema: &Value) -> bool {
if let (Some(new_fields), Some(old_fields)) = (
new_schema.get("fields").and_then(Value::as_array),
old_schema.get("fields").and_then(Value::as_array),
) {
old_fields.iter().all(|old_field| {
let old_name = old_field.get("name").and_then(Value::as_str);
new_fields.iter().any(|new_field| {
let new_name = new_field.get("name").and_then(Value::as_str);
old_name == new_name
})
})
} else {
true }
}
fn new_fields_are_optional(new_schema: &Value, old_schema: &Value) -> bool {
if let (Some(new_fields), Some(old_fields)) = (
new_schema.get("fields").and_then(Value::as_array),
old_schema.get("fields").and_then(Value::as_array),
) {
new_fields.iter().all(|new_field| {
let new_name = new_field.get("name").and_then(Value::as_str);
old_fields.iter().any(|old_field| {
let old_name = old_field.get("name").and_then(Value::as_str);
new_name == old_name || new_field.get("default").is_some()
})
})
} else {
true
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backward_compatibility() {
let old_schema = Schema {
id: 1,
version: crate::schema::version::SchemaVersion::new(1),
definition: r#"{"type":"record","fields":[{"name":"id","type":"string"}]}"#.to_string(),
};
let new_schema = Schema {
id: 2,
version: crate::schema::version::SchemaVersion::new(2),
definition: r#"{"type":"record","fields":[{"name":"id","type":"string"},{"name":"value","type":"string","default":""}]}"#.to_string(),
};
let compatibility = Compatibility::BACKWARD;
assert!(compatibility.check(&new_schema, &old_schema));
}
}