use super::types::{
FieldDescriptor, FieldType, FieldValue, MessageDescriptor, SchemaRegistryError,
SchemaRegistryResult,
};
use std::collections::HashSet;
// NOTE(review): these limits appear to mirror the Protocol Buffers field-number rules
// (maximum of 2^29 - 1, with 19000..=19999 reserved) — confirm that is the intent.
/// Largest field number accepted by `validate_field_number` (equal to 2^29 - 1).
pub const MAX_FIELD_NUMBER: u32 = 536_870_911;
/// First field number of the inclusive range that `validate_field_number` rejects as reserved.
pub const RESERVED_RANGE_START: u32 = 19_000;
/// Last field number of the inclusive range that `validate_field_number` rejects as reserved.
pub const RESERVED_RANGE_END: u32 = 19_999;
/// Checks that a message descriptor is structurally sound.
///
/// The message name must be non-empty. Fields are then examined in declaration order;
/// for each field, in this order:
///
/// 1. its number must pass `validate_field_number`,
/// 2. its number must not repeat one seen on an earlier field,
/// 3. its name must not repeat one seen on an earlier field,
/// 4. its name must pass `validate_field_name`.
///
/// # Errors
///
/// Returns [`SchemaRegistryError::Validation`] describing the first violation encountered.
pub fn validate_descriptor(desc: &MessageDescriptor) -> SchemaRegistryResult<()> {
    if desc.name.is_empty() {
        let reason = "message name must not be empty".to_string();
        return Err(SchemaRegistryError::Validation(reason));
    }

    let mut used_numbers: HashSet<u32> = HashSet::new();
    let mut used_names: HashSet<&str> = HashSet::new();

    // `try_for_each` stops at the first field whose checks fail and returns that error.
    desc.fields
        .iter()
        .try_for_each(|candidate| -> SchemaRegistryResult<()> {
            validate_field_number(candidate)?;

            // `insert` returns false when the value was already present.
            let number_is_fresh = used_numbers.insert(candidate.field_number);
            if !number_is_fresh {
                let reason = format!(
                    "duplicate field number {} in message '{}'",
                    candidate.field_number, desc.name
                );
                return Err(SchemaRegistryError::Validation(reason));
            }

            let name_is_fresh = used_names.insert(candidate.name.as_str());
            if !name_is_fresh {
                let reason = format!(
                    "duplicate field name '{}' in message '{}'",
                    candidate.name, desc.name
                );
                return Err(SchemaRegistryError::Validation(reason));
            }

            validate_field_name(candidate)
        })
}
/// Checks that a field's number lies in an allowed range.
///
/// A number is rejected when it is `0`, when it is greater than [`MAX_FIELD_NUMBER`], or
/// when it falls inside the inclusive range
/// [`RESERVED_RANGE_START`]..=[`RESERVED_RANGE_END`]. The checks are applied in that order.
///
/// # Errors
///
/// Returns [`SchemaRegistryError::Validation`] naming the field and the rule it broke.
fn validate_field_number(field: &FieldDescriptor) -> SchemaRegistryResult<()> {
    let number = field.field_number;

    // Arms are tried top to bottom, preserving the zero → too-large → reserved precedence.
    let problem = match number {
        0 => format!(
            "field '{}': field number 0 is not allowed (must be ≥ 1)",
            field.name
        ),
        n if n > MAX_FIELD_NUMBER => format!(
            "field '{}': field number {} exceeds the maximum of {}",
            field.name, n, MAX_FIELD_NUMBER
        ),
        RESERVED_RANGE_START..=RESERVED_RANGE_END => format!(
            "field '{}': field number {} is in the reserved range [{}, {}]",
            field.name, number, RESERVED_RANGE_START, RESERVED_RANGE_END
        ),
        _ => return Ok(()),
    };

    Err(SchemaRegistryError::Validation(problem))
}
/// Checks that a field has a non-empty name.
///
/// # Errors
///
/// Returns [`SchemaRegistryError::Validation`], identifying the field by its number, when the
/// name is the empty string.
fn validate_field_name(field: &FieldDescriptor) -> SchemaRegistryResult<()> {
    if !field.name.is_empty() {
        return Ok(());
    }

    let reason = format!(
        "field number {}: field name must not be empty",
        field.field_number
    );
    Err(SchemaRegistryError::Validation(reason))
}
/// Reports whether `value` is acceptable for the type declared on `field`.
///
/// Returns `true` when the field's declared type and the value's variant form a pairing
/// accepted by `type_compatible`, and `false` otherwise.
pub fn validate_field_value(field: &FieldDescriptor, value: &FieldValue) -> bool {
    let declared = &field.field_type;
    type_compatible(declared, value)
}
/// Reports whether a value of the given variant may be used for a field declared as `ft`.
///
/// Three groups of pairings are accepted; every other combination yields `false`:
///
/// * exact matches (e.g. an `Int32` value for an `Int32` field),
/// * widening matches: `Int32` into `Int64`, `UInt32` into `UInt64`, `Float` into `Double`,
/// * a `Bytes` value for any `Repeated` field.
fn type_compatible(ft: &FieldType, value: &FieldValue) -> bool {
    let pairing = (ft, value);

    let exact = matches!(
        pairing,
        (FieldType::Int32, FieldValue::Int32(_))
            | (FieldType::Int64, FieldValue::Int64(_))
            | (FieldType::UInt32, FieldValue::UInt32(_))
            | (FieldType::UInt64, FieldValue::UInt64(_))
            | (FieldType::Float, FieldValue::Float(_))
            | (FieldType::Double, FieldValue::Double(_))
            | (FieldType::Bool, FieldValue::Bool(_))
            | (FieldType::String, FieldValue::Str(_))
            | (FieldType::Bytes, FieldValue::Bytes(_))
            | (FieldType::Message(_), FieldValue::Message(_))
    );

    // A narrower value is accepted where the declared type is the wider sibling.
    let widened = matches!(
        pairing,
        (FieldType::Int64, FieldValue::Int32(_))
            | (FieldType::UInt64, FieldValue::UInt32(_))
            | (FieldType::Double, FieldValue::Float(_))
    );

    // NOTE(review): presumably the bytes carry an encoded form of the repeated elements —
    // confirm against the encoder.
    let repeated_as_bytes = matches!(pairing, (FieldType::Repeated(_), FieldValue::Bytes(_)));

    exact || widened || repeated_as_bytes
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema_registry::types::{FieldDescriptor, FieldType, MessageDescriptor};

    /// Wraps `fields` in a descriptor named "TestMsg" in package "test".
    fn make_descriptor(fields: Vec<FieldDescriptor>) -> MessageDescriptor {
        let name = "TestMsg".to_string();
        let package = "test".to_string();
        MessageDescriptor {
            name,
            package,
            fields,
        }
    }

    /// Builds a descriptor whose only field is an optional field with the given attributes.
    fn single_field(number: u32, name: &'static str, field_type: FieldType) -> MessageDescriptor {
        make_descriptor(vec![FieldDescriptor::optional(number, name, field_type)])
    }

    /// Asserts that validation fails with a `Validation` error whose text contains `needle`.
    fn assert_validation_error_mentions(desc: &MessageDescriptor, needle: &str) {
        let err = validate_descriptor(desc).unwrap_err();
        assert!(matches!(err, SchemaRegistryError::Validation(_)));
        assert!(err.to_string().contains(needle));
    }

    #[test]
    fn test_valid_descriptor_passes() {
        let fields = vec![
            FieldDescriptor::optional(1, "id", FieldType::Int64),
            FieldDescriptor::optional(2, "name", FieldType::String),
        ];
        let outcome = validate_descriptor(&make_descriptor(fields));
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_duplicate_field_number_rejected() {
        // Two distinct names sharing field number 1.
        let clashing = make_descriptor(vec![
            FieldDescriptor::optional(1, "a", FieldType::Int32),
            FieldDescriptor::optional(1, "b", FieldType::Int32),
        ]);
        assert_validation_error_mentions(&clashing, "duplicate field number");
    }

    #[test]
    fn test_duplicate_field_name_rejected() {
        // Two distinct numbers sharing the name "value".
        let clashing = make_descriptor(vec![
            FieldDescriptor::optional(1, "value", FieldType::Int32),
            FieldDescriptor::optional(2, "value", FieldType::Int64),
        ]);
        assert_validation_error_mentions(&clashing, "duplicate field name");
    }

    #[test]
    fn test_field_number_zero_rejected() {
        let desc = single_field(0, "bad", FieldType::Bool);
        assert!(validate_descriptor(&desc).is_err());
    }

    #[test]
    fn test_field_number_exceeds_max_rejected() {
        let desc = single_field(MAX_FIELD_NUMBER + 1, "big", FieldType::Bool);
        assert!(validate_descriptor(&desc).is_err());
    }

    #[test]
    fn test_reserved_range_rejected() {
        // Both ends of the reserved range plus one interior value.
        for n in [19_000u32, 19_500, 19_999] {
            let desc = single_field(n, "reserved", FieldType::Bool);
            let message = validate_descriptor(&desc).unwrap_err().to_string();
            assert!(message.contains("reserved range"), "field_number={n}");
        }
    }

    #[test]
    fn test_boundary_just_outside_reserved_range_ok() {
        // The numbers immediately below and above the reserved range must be accepted.
        for (number, label) in [(18_999u32, "below"), (20_000, "above")] {
            let desc = single_field(number, label, FieldType::Bool);
            assert!(validate_descriptor(&desc).is_ok());
        }
    }

    #[test]
    fn test_empty_message_name_rejected() {
        let nameless = MessageDescriptor {
            name: std::string::String::new(),
            package: "pkg".to_string(),
            fields: vec![],
        };
        let outcome = validate_descriptor(&nameless);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_validate_field_value_exact_types() {
        let fd = FieldDescriptor::optional(1, "x", FieldType::Int32);
        let accepts = |value: FieldValue| validate_field_value(&fd, &value);
        assert!(accepts(FieldValue::Int32(42)));
        // A wider value must not be accepted by a narrower field.
        assert!(!accepts(FieldValue::Int64(42)));
    }

    #[test]
    fn test_validate_field_value_widening_allowed() {
        let fd = FieldDescriptor::optional(1, "x", FieldType::Int64);
        let narrower = FieldValue::Int32(42);
        assert!(validate_field_value(&fd, &narrower));
    }

    #[test]
    fn test_validate_field_value_repeated_bytes() {
        let element_type = Box::new(FieldType::Int32);
        let fd = FieldDescriptor::optional(1, "items", FieldType::Repeated(element_type));
        let payload = FieldValue::Bytes(vec![0x01, 0x02]);
        assert!(validate_field_value(&fd, &payload));
    }
}