use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use crate::error::{Result, SchemaRegError};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "confluent", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "confluent", serde(transparent))]
pub struct SchemaId(u32);
impl SchemaId {
#[inline]
pub fn new(id: u32) -> Self {
Self(id)
}
#[inline]
pub fn as_u32(self) -> u32 {
self.0
}
}
impl From<u32> for SchemaId {
#[inline]
fn from(v: u32) -> Self {
Self(v)
}
}
impl From<SchemaId> for u32 {
#[inline]
fn from(v: SchemaId) -> Self {
v.0
}
}
impl PartialEq<u32> for SchemaId {
fn eq(&self, other: &u32) -> bool {
self.0 == *other
}
}
impl PartialEq<SchemaId> for u32 {
fn eq(&self, other: &SchemaId) -> bool {
*self == other.0
}
}
impl fmt::Display for SchemaId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "confluent", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "confluent", serde(transparent))]
pub struct SchemaVersion(i32);
impl SchemaVersion {
#[inline]
pub fn new(v: i32) -> Self {
Self(v)
}
#[inline]
pub fn as_i32(self) -> i32 {
self.0
}
}
impl From<i32> for SchemaVersion {
#[inline]
fn from(v: i32) -> Self {
Self(v)
}
}
impl From<SchemaVersion> for i32 {
#[inline]
fn from(v: SchemaVersion) -> Self {
v.0
}
}
impl PartialEq<i32> for SchemaVersion {
fn eq(&self, other: &i32) -> bool {
self.0 == *other
}
}
impl PartialEq<SchemaVersion> for i32 {
fn eq(&self, other: &SchemaVersion) -> bool {
*self == other.0
}
}
impl fmt::Display for SchemaVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SchemaType {
Avro,
Protobuf,
Json,
}
impl SchemaType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Avro => "AVRO",
Self::Protobuf => "PROTOBUF",
Self::Json => "JSON",
}
}
}
impl fmt::Display for SchemaType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl FromStr for SchemaType {
type Err = SchemaRegError;
fn from_str(s: &str) -> Result<Self> {
if s.eq_ignore_ascii_case("AVRO") {
Ok(Self::Avro)
} else if s.eq_ignore_ascii_case("PROTOBUF") {
Ok(Self::Protobuf)
} else if s.eq_ignore_ascii_case("JSON") {
Ok(Self::Json)
} else {
Err(SchemaRegError::invalid_state(format!(
"unknown schema type: '{s}'"
)))
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaReference {
pub name: String,
pub subject: String,
pub version: SchemaVersion,
}
impl SchemaReference {
pub fn new(
name: impl Into<String>,
subject: impl Into<String>,
version: impl Into<SchemaVersion>,
) -> Self {
Self {
name: name.into(),
subject: subject.into(),
version: version.into(),
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schema {
pub id: SchemaId,
pub schema_type: SchemaType,
pub schema: Arc<str>,
pub version: Option<SchemaVersion>,
pub subject: Option<Arc<str>>,
pub references: Vec<SchemaReference>,
}
impl Schema {
pub fn new(
id: impl Into<SchemaId>,
schema_type: SchemaType,
schema: impl Into<Arc<str>>,
) -> Self {
Self {
id: id.into(),
schema_type,
schema: schema.into(),
version: None,
subject: None,
references: Vec::new(),
}
}
pub fn with_subject(
mut self,
subject: impl Into<Arc<str>>,
version: impl Into<SchemaVersion>,
) -> Self {
self.subject = Some(subject.into());
self.version = Some(version.into());
self
}
pub fn with_references(mut self, references: Vec<SchemaReference>) -> Self {
self.references = references;
self
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompatibilityLevel {
Backward,
BackwardTransitive,
Forward,
ForwardTransitive,
Full,
FullTransitive,
None,
}
impl CompatibilityLevel {
pub fn as_str(self) -> &'static str {
match self {
Self::Backward => "BACKWARD",
Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
Self::Forward => "FORWARD",
Self::ForwardTransitive => "FORWARD_TRANSITIVE",
Self::Full => "FULL",
Self::FullTransitive => "FULL_TRANSITIVE",
Self::None => "NONE",
}
}
}
impl fmt::Display for CompatibilityLevel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl FromStr for CompatibilityLevel {
type Err = SchemaRegError;
fn from_str(s: &str) -> Result<Self> {
match s {
"BACKWARD" => Ok(Self::Backward),
"BACKWARD_TRANSITIVE" => Ok(Self::BackwardTransitive),
"FORWARD" => Ok(Self::Forward),
"FORWARD_TRANSITIVE" => Ok(Self::ForwardTransitive),
"FULL" => Ok(Self::Full),
"FULL_TRANSITIVE" => Ok(Self::FullTransitive),
"NONE" => Ok(Self::None),
_ => Err(SchemaRegError::invalid_state(format!(
"unknown compatibility level: '{s}'"
))),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum EncodeTarget {
Key,
#[default]
Value,
}
impl EncodeTarget {
#[inline]
pub fn is_key(self) -> bool {
matches!(self, Self::Key)
}
}
impl std::fmt::Display for EncodeTarget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Key => f.write_str("key"),
Self::Value => f.write_str("value"),
}
}
}
const _: () = {
fn assert_send_sync<T: Send + Sync + 'static>() {}
fn check() {
assert_send_sync::<Schema>();
assert_send_sync::<SchemaReference>();
assert_send_sync::<ArtifactId>();
assert_send_sync::<CompatibilityLevel>();
}
let _ = check;
};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ArtifactId {
pub group: String,
pub artifact: String,
}
impl ArtifactId {
pub fn new(group: impl Into<String>, artifact: impl Into<String>) -> Self {
Self {
group: group.into(),
artifact: artifact.into(),
}
}
pub fn default_group(artifact: impl Into<String>) -> Self {
Self::new("default", artifact)
}
pub fn from_subject(subject: &str) -> Self {
match subject.split_once('/') {
Some((group, artifact)) => Self::new(group, artifact),
None => Self::default_group(subject),
}
}
pub fn to_subject(&self) -> String {
format!("{}/{}", self.group, self.artifact)
}
}
impl fmt::Display for ArtifactId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.group, self.artifact)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
#[test]
fn test_schema_type_display() {
assert_eq!(SchemaType::Avro.to_string(), "AVRO");
assert_eq!(SchemaType::Protobuf.to_string(), "PROTOBUF");
assert_eq!(SchemaType::Json.to_string(), "JSON");
}
#[test]
fn test_schema_type_from_str() {
assert_eq!("AVRO".parse::<SchemaType>().unwrap(), SchemaType::Avro);
assert_eq!(
"PROTOBUF".parse::<SchemaType>().unwrap(),
SchemaType::Protobuf
);
assert_eq!("JSON".parse::<SchemaType>().unwrap(), SchemaType::Json);
}
#[test]
fn test_schema_type_from_str_case_insensitive() {
assert_eq!("avro".parse::<SchemaType>().unwrap(), SchemaType::Avro);
}
#[test]
fn test_schema_type_from_str_unknown() {
let result = "XML".parse::<SchemaType>();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("XML"));
}
#[test]
fn test_schema_new() {
let s = Schema::new(1u32, SchemaType::Avro, r#"{"type":"string"}"#);
assert_eq!(s.id, SchemaId::new(1));
assert_eq!(s.schema_type, SchemaType::Avro);
assert_eq!(s.schema, Arc::from(r#"{"type":"string"}"#));
assert_eq!(s.version, None);
assert_eq!(s.subject, None);
assert!(s.references.is_empty());
}
#[test]
fn test_schema_with_subject() {
let s = Schema::new(1u32, SchemaType::Avro, "{}").with_subject("my-topic-value", 3i32);
assert_eq!(s.subject.as_deref(), Some("my-topic-value"));
assert_eq!(s.version, Some(SchemaVersion::new(3)));
}
#[test]
fn test_schema_with_references() {
let refs = vec![SchemaReference::new("Ref", "ref-subject", 1i32)];
let s = Schema::new(1u32, SchemaType::Avro, "{}").with_references(refs.clone());
assert_eq!(s.references, refs);
}
#[test]
fn test_schema_reference_new() {
let r = SchemaReference::new("com.example.Address", "address-value", 2i32);
assert_eq!(r.name, "com.example.Address");
assert_eq!(r.subject, "address-value");
assert_eq!(r.version, SchemaVersion::new(2));
}
#[test]
fn test_schema_id_newtype() {
let id: SchemaId = 42u32.into();
assert_eq!(id.as_u32(), 42);
assert_eq!(u32::from(id), 42);
assert_eq!(id.to_string(), "42");
}
#[test]
fn test_schema_version_newtype() {
let v: SchemaVersion = 3i32.into();
assert_eq!(v.as_i32(), 3);
assert_eq!(i32::from(v), 3);
assert_eq!(v.to_string(), "3");
}
}