use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt;
use digest::Digest;
use failure::{Error, Fail};
use log::{debug, warn};
use regex::Regex;
use serde::{
ser::{SerializeMap, SerializeSeq},
Serialize, Serializer,
};
use serde_json::{self, Map, Value};
use crate::reader::SchemaResolver;
use crate::types;
use crate::util::MapHelper;
use failure::_core::fmt::Formatter;
use itertools::Itertools;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::convert::TryFrom;
use std::fmt::Display;
use std::rc::Rc;
use types::{DecimalValue, Value as AvroValue};
/// Resolves `writer_schema` against `reader_schema`, producing one combined schema.
///
/// Named types that exist under the same full name in both schemas are paired up first. The
/// resulting index maps are handed to a `SchemaResolver`, which then resolves the two top-level
/// nodes against each other.
///
/// # Errors
///
/// Propagates any error returned by `SchemaResolver::resolve`.
///
/// # Panics
///
/// Panics if the resolver leaves an entry of its `named` table unfilled (`None`).
pub fn resolve_schemas(writer_schema: &Schema, reader_schema: &Schema) -> Result<Schema, Error> {
    // Borrow the reader's name table instead of cloning it; it is only read here.
    let reader_indices = &reader_schema.indices;
    let (reader_to_writer_names, writer_to_reader_names): (HashMap<_, _>, HashMap<_, _>) =
        writer_schema
            .indices
            .iter()
            .filter_map(|(name, widx)| {
                // Only names known to both schemas take part in the mapping.
                reader_indices
                    .get(name)
                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
            })
            .unzip();
    // Reverse lookup: reader index -> full name.
    let reader_fullnames = reader_indices
        .iter()
        .map(|(fullname, idx)| (*idx, fullname))
        .collect::<HashMap<_, _>>();
    let mut resolver = SchemaResolver {
        named: Default::default(),
        indices: Default::default(),
        writer_to_reader_names,
        reader_to_writer_names,
        reader_to_resolved_names: Default::default(),
        reader_fullnames,
        reader_schema,
    };
    let writer_node = writer_schema.top_node_or_named();
    let reader_node = reader_schema.top_node_or_named();
    let top = resolver.resolve(writer_node, reader_node)?;
    Ok(Schema {
        named: resolver.named.into_iter().map(Option::unwrap).collect(),
        indices: resolver.indices,
        top,
    })
}
/// Error produced when a schema document cannot be parsed; wraps a human-readable message.
#[derive(Fail, Debug)]
#[fail(display = "Failed to parse schema: {}", _0)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new<S: Into<String>>(msg: S) -> ParseSchemaError {
        Self(msg.into())
    }
}
/// Raw digest bytes identifying a schema (see `Schema::fingerprint`).
pub struct SchemaFingerprint {
    /// The digest output bytes.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Writes the fingerprint as lowercase hexadecimal, two digits per byte.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter rather than building a
        // `Vec<String>` and a joined `String` first.
        for byte in &self.bytes {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}
/// Either an inline schema piece or a reference, by index, to a named piece.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPieceOrNamed {
    /// A schema piece stored inline.
    Piece(SchemaPiece),
    /// Index of a named piece; resolved through `Schema::lookup`.
    Named(usize),
}
impl SchemaPieceOrNamed {
    /// Returns a printable description: the full name for a named piece, the debug rendering
    /// for an inline one.
    pub fn get_human_name(&self, root: &Schema) -> String {
        match self {
            Self::Named(idx) => root.lookup(*idx).name.to_string(),
            Self::Piece(piece) => format!("{:?}", piece),
        }
    }

    /// Returns the underlying piece, plus its full name when this is a named reference into
    /// `root`.
    #[inline(always)]
    pub fn get_piece_and_name<'a>(
        &'a self,
        root: &'a Schema,
    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
        match self {
            Self::Piece(piece) => (piece, None),
            Self::Named(idx) => {
                let named = root.lookup(*idx);
                (&named.piece, Some(&named.name))
            }
        }
    }

    /// Borrowing view of this value: inline pieces by reference, named ones by index.
    #[inline(always)]
    pub fn as_ref(&self) -> SchemaPieceRefOrNamed {
        match self {
            Self::Named(index) => SchemaPieceRefOrNamed::Named(*index),
            Self::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
        }
    }
}
impl From<SchemaPiece> for SchemaPieceOrNamed {
#[inline(always)]
fn from(piece: SchemaPiece) -> Self {
Self::Piece(piece)
}
}
/// One node of a schema tree.
///
/// The plain variants are produced by the parser in this file. The `Resolve*` variants are not
/// produced by the parser; NOTE(review): judging by their names and by `resolve_schemas`, they
/// describe how writer data maps onto a reader schema — confirm against `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// The `null` primitive.
    Null,
    /// The `boolean` primitive.
    Boolean,
    /// The `int` primitive.
    Int,
    /// The `long` primitive.
    Long,
    /// The `float` primitive.
    Float,
    /// The `double` primitive.
    Double,
    /// An `int` annotated as a date (via `logicalType` or `connect.name`).
    Date,
    /// A `long` annotated as a millisecond timestamp.
    TimestampMilli,
    /// A `long` annotated as a microsecond timestamp.
    TimestampMicro,
    /// A `bytes` or `fixed` type annotated with the `decimal` logical type.
    Decimal {
        precision: usize,
        scale: usize,
        /// `Some(size)` when backed by a `fixed` type, `None` when backed by `bytes`.
        fixed_size: Option<usize>,
    },
    /// The `bytes` primitive.
    Bytes,
    /// The `string` primitive.
    String,
    /// A `string` whose `connect.name` marks it as JSON.
    Json,
    /// An array; the payload is the schema parsed from `items`.
    Array(Box<SchemaPieceOrNamed>),
    /// A map; the payload is the schema parsed from `values`.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of variants.
    Union(UnionSchema),
    // Resolution markers for scalar types. NOTE(review): names suggest
    // "<writer type><reader type>" — confirm against the resolver.
    ResolveIntTsMilli,
    ResolveIntTsMicro,
    ResolveDateTimestamp,
    ResolveIntLong,
    ResolveIntFloat,
    ResolveIntDouble,
    ResolveLongFloat,
    ResolveLongDouble,
    ResolveFloatDouble,
    /// NOTE(review): presumably a non-union writer value read into a reader union — confirm.
    ResolveConcreteUnion {
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
        n_reader_variants: usize,
        reader_null_variant: Option<usize>,
    },
    /// NOTE(review): presumably a writer union read into a reader union; each entry is either
    /// a resolved `(index, schema)` pair or an error message — confirm.
    ResolveUnionUnion {
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), String>>,
        n_reader_variants: usize,
        reader_null_variant: Option<usize>,
    },
    /// NOTE(review): presumably a writer union read into a non-union reader type — confirm.
    ResolveUnionConcrete {
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A record type.
    Record {
        doc: Documentation,
        fields: Vec<RecordField>,
        /// Maps each field name to that field's `position`.
        lookup: HashMap<String, usize>,
    },
    /// An enum type.
    Enum {
        doc: Documentation,
        symbols: Vec<String>,
        /// Index into `symbols` of the declared default, if any.
        default_idx: Option<usize>,
    },
    /// A fixed-size type; `size` is taken from the schema's `size` attribute.
    Fixed { size: usize },
    /// NOTE(review): presumably the result of resolving a writer record against a reader
    /// record — confirm field semantics against the resolver.
    ResolveRecord {
        defaults: Vec<ResolvedDefaultValueField>,
        fields: Vec<ResolvedRecordField>,
        n_reader_fields: usize,
    },
    /// NOTE(review): presumably the result of resolving a writer enum against a reader enum;
    /// each symbol is either a resolved `(index, name)` pair or an error message — confirm.
    ResolveEnum {
        doc: Documentation,
        symbols: Vec<Result<(usize, String), String>>,
        default: Option<(usize, String)>,
    },
}
impl SchemaPiece {
pub fn is_underlying_int(&self) -> bool {
match self {
SchemaPiece::Int | SchemaPiece::Date => true,
_ => false,
}
}
pub fn is_underlying_long(&self) -> bool {
match self {
SchemaPiece::Long | SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => true,
_ => false,
}
}
}
/// A complete schema: a table of named pieces plus the top-level node.
#[derive(Clone, Debug, PartialEq)]
pub struct Schema {
    /// Named pieces, addressed by the index stored in `SchemaPieceOrNamed::Named`.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its index in `named`.
    pub(crate) indices: HashMap<FullName, usize>,
    /// The top-level node of the schema.
    pub top: SchemaPieceOrNamed,
}
impl Schema {
    /// Returns the top-level node with any named reference already looked up.
    pub fn top_node(&self) -> SchemaNode {
        self.top_node_or_named().lookup()
    }

    /// Returns the top-level node without following a named reference.
    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed {
        let inner = self.top.as_ref();
        SchemaNodeOrNamed { root: self, inner }
    }

    /// Returns the named piece stored at `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for the table of named pieces.
    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
        let table = &self.named;
        &table[idx]
    }
}
/// Coarse classification of a `SchemaPiece`; used as the key of
/// `UnionSchema::anon_variant_index`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum SchemaKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    /// Assigned to `SchemaPiece::ResolveUnionConcrete` by the `From<&SchemaPiece>` conversion.
    Unknown,
}
impl<'a> From<&'a SchemaPiece> for SchemaKind {
    /// Classifies a piece by kind. Logical types and `Resolve*` markers map to the kind of
    /// the value they ultimately stand for; arms are grouped by resulting kind.
    #[inline(always)]
    fn from(piece: &'a SchemaPiece) -> SchemaKind {
        match piece {
            SchemaPiece::Null => SchemaKind::Null,
            SchemaPiece::Boolean => SchemaKind::Boolean,
            SchemaPiece::Int | SchemaPiece::Date => SchemaKind::Int,
            SchemaPiece::Long
            | SchemaPiece::TimestampMilli
            | SchemaPiece::TimestampMicro
            | SchemaPiece::ResolveIntTsMilli
            | SchemaPiece::ResolveIntTsMicro
            | SchemaPiece::ResolveDateTimestamp
            | SchemaPiece::ResolveIntLong => SchemaKind::Long,
            SchemaPiece::Float
            | SchemaPiece::ResolveIntFloat
            | SchemaPiece::ResolveLongFloat => SchemaKind::Float,
            SchemaPiece::Double
            | SchemaPiece::ResolveIntDouble
            | SchemaPiece::ResolveLongDouble
            | SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
            // A decimal takes the kind of whichever type backs it.
            SchemaPiece::Bytes
            | SchemaPiece::Decimal {
                fixed_size: None, ..
            } => SchemaKind::Bytes,
            SchemaPiece::Fixed { .. }
            | SchemaPiece::Decimal {
                fixed_size: Some(_),
                ..
            } => SchemaKind::Fixed,
            SchemaPiece::String | SchemaPiece::Json => SchemaKind::String,
            SchemaPiece::Array(_) => SchemaKind::Array,
            SchemaPiece::Map(_) => SchemaKind::Map,
            SchemaPiece::Union(_)
            | SchemaPiece::ResolveUnionUnion { .. }
            | SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
            SchemaPiece::ResolveUnionConcrete { .. } => SchemaKind::Unknown,
            SchemaPiece::Record { .. } | SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
            SchemaPiece::Enum { .. } | SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
        }
    }
}
impl<'a> From<SchemaNode<'a>> for SchemaKind {
#[inline(always)]
fn from(schema: SchemaNode<'a>) -> SchemaKind {
SchemaKind::from(schema.inner)
}
}
impl<'a> From<&'a Schema> for SchemaKind {
#[inline(always)]
fn from(schema: &'a Schema) -> SchemaKind {
Self::from(schema.top_node())
}
}
/// The naming attributes of a named schema, as parsed from its JSON object.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The bare name; `Name::parse` strips any dotted namespace prefix.
    pub name: String,
    /// The namespace, if one was given explicitly or embedded in a dotted name.
    pub namespace: Option<String>,
    /// The `aliases` attribute, when present as an array consisting only of strings.
    pub aliases: Option<Vec<String>>,
}
/// Fully qualified name of a named schema: a bare name together with its namespace.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name from its parts.
    ///
    /// When `namespace` is given, `name` is used verbatim. Otherwise `name` is split at its
    /// last `.`: the tail becomes the name and the head the namespace. A name without any dot
    /// falls back to `default_namespace`.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (bare, ns) = match namespace {
            Some(explicit) => (name, explicit),
            None => match name.rfind('.') {
                Some(dot) => (&name[dot + 1..], &name[..dot]),
                None => (name, default_namespace),
            },
        };
        FullName {
            name: bare.to_owned(),
            namespace: ns.to_owned(),
        }
    }
}

impl Display for FullName {
    /// Renders as `namespace.name`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let FullName { name, namespace } = self;
        write!(f, "{}.{}", namespace, name)
    }
}
/// Optional documentation string attached to a schema or record field.
pub type Documentation = Option<String>;
impl Name {
    /// Creates a name with no namespace and no aliases.
    pub fn new(name: &str) -> Name {
        Name {
            name: String::from(name),
            namespace: None,
            aliases: None,
        }
    }

    /// Extracts the naming attributes from the JSON object of a named schema.
    ///
    /// A dotted `name` is split at its last dot into namespace and bare name; the embedded
    /// namespace wins over a `namespace` attribute, and a warning is logged when the two
    /// differ. The bare name must match `[A-Za-z_][A-Za-z0-9_]*`.
    ///
    /// # Errors
    ///
    /// Fails when `name` is missing, empty, or not a valid identifier.
    fn parse(complex: &Map<String, Value>) -> Result<Self, Error> {
        let raw_name = match complex.name() {
            Some(found) => found,
            None => return Err(ParseSchemaError::new("No `name` field").into()),
        };
        if raw_name.is_empty() {
            let msg = format!("Name cannot be the empty string: {:?}", complex);
            return Err(ParseSchemaError::new(msg).into());
        }

        let (namespace, name) = match raw_name.rfind('.') {
            Some(dot) => {
                let embedded_namespace = raw_name[..dot].to_owned();
                let bare_name = raw_name[dot + 1..].to_owned();
                if let Some(declared_namespace) = complex.string("namespace") {
                    if declared_namespace != embedded_namespace {
                        warn!(
                            "Found dots in name {}, updating to namespace {} and name {}",
                            raw_name, embedded_namespace, bare_name
                        );
                    }
                }
                (Some(embedded_namespace), bare_name)
            }
            None => (complex.string("namespace"), raw_name),
        };

        let identifier = Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap();
        if !identifier.is_match(&name) {
            let msg = format!(
                "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
                name
            );
            return Err(ParseSchemaError::new(msg).into());
        }

        // `aliases` is kept only when it is an array made up entirely of strings.
        let aliases: Option<Vec<String>> = match complex.get("aliases").and_then(|v| v.as_array()) {
            Some(entries) => entries
                .iter()
                .map(|entry| entry.as_str().map(|s| s.to_string()))
                .collect::<Option<_>>(),
            None => None,
        };

        Ok(Name {
            name,
            namespace,
            aliases,
        })
    }

    /// Returns the fully qualified name, using `default_namespace` when this name has none.
    pub fn fullname(&self, default_namespace: &str) -> FullName {
        let namespace = self.namespace.as_deref();
        FullName::from_parts(&self.name, namespace, default_namespace)
    }
}
/// A record field carried with an already-decoded default value; stored in
/// `SchemaPiece::ResolveRecord::defaults`.
///
/// NOTE(review): presumably a reader field missing from the writer, filled from its default —
/// confirm against the resolver.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    pub name: String,
    pub doc: Documentation,
    pub default: types::Value,
    pub order: RecordFieldOrder,
    pub position: usize,
}
/// One entry of `SchemaPiece::ResolveRecord::fields`.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// Carries a standalone schema. NOTE(review): presumably the writer's schema for a field
    /// the reader does not have — confirm against the resolver.
    Absent(Schema),
    /// Carries an ordinary record field.
    Present(RecordField),
}
/// A single field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// The field's name.
    pub name: String,
    /// The field's documentation, if any.
    pub doc: Documentation,
    /// The raw JSON `default` attribute, if present.
    pub default: Option<Value>,
    /// The field's schema, parsed from its `type` attribute.
    pub schema: SchemaPieceOrNamed,
    /// Sort order; the parser falls back to `Ascending` when `order` is absent or unrecognized.
    pub order: RecordFieldOrder,
    /// Index of the field among the record's object-valued `fields` entries.
    pub position: usize,
}
/// Sort order of a record field, parsed from the field's `order` attribute.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    /// `"ascending"`; also the parser's fallback.
    Ascending,
    /// `"descending"`.
    Descending,
    /// `"ignore"`.
    Ignore,
}
// `RecordField` has no inherent methods; this impl block is empty.
impl RecordField {}
/// A union schema together with lookup tables for locating a variant.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,
    /// For inline variants: kind of the variant -> its position in `schemas`.
    anon_variant_index: HashMap<SchemaKind, usize>,
    /// For named variants: index of the named piece -> its position in `schemas`.
    named_variant_index: HashMap<usize, usize>,
}
impl UnionSchema {
    /// Builds a union from its variants and indexes them for lookup.
    ///
    /// # Errors
    ///
    /// Fails when a variant is itself an inline union, when two inline variants share a
    /// `SchemaKind`, or when the same named piece appears twice.
    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, Error> {
        let mut by_kind = HashMap::new();
        let mut by_named = HashMap::new();
        for (position, variant) in schemas.iter().enumerate() {
            let duplicate = match variant {
                SchemaPieceOrNamed::Piece(SchemaPiece::Union(_)) => {
                    return Err(
                        ParseSchemaError::new("Unions may not directly contain a union").into(),
                    );
                }
                SchemaPieceOrNamed::Piece(piece) => {
                    by_kind.insert(SchemaKind::from(piece), position).is_some()
                }
                SchemaPieceOrNamed::Named(named_idx) => {
                    by_named.insert(*named_idx, position).is_some()
                }
            };
            if duplicate {
                return Err(ParseSchemaError::new("Unions cannot contain duplicate types").into());
            }
        }
        Ok(UnionSchema {
            schemas,
            anon_variant_index: by_kind,
            named_variant_index: by_named,
        })
    }

    /// Returns the variants in declaration order.
    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
        self.schemas.as_slice()
    }

    /// Returns `true` when the FIRST variant is the inline `null` piece.
    pub fn is_nullable(&self) -> bool {
        match self.schemas.first() {
            Some(first) => *first == SchemaPieceOrNamed::Piece(SchemaPiece::Null),
            None => false,
        }
    }

    /// Finds the inline variant whose kind equals the kind of `sp`, returning its position
    /// and schema.
    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
        let position = *self.anon_variant_index.get(&SchemaKind::from(sp))?;
        Some((position, &self.schemas[position]))
    }

    /// Finds the variant matching `other`.
    ///
    /// Inline pieces are matched by kind. Named references are first translated through
    /// `names_map` and then looked up among this union's named variants.
    pub fn match_ref(
        &self,
        other: SchemaPieceRefOrNamed,
        names_map: &HashMap<usize, usize>,
    ) -> Option<(usize, &SchemaPieceOrNamed)> {
        match other {
            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
            SchemaPieceRefOrNamed::Named(their_idx) => {
                let our_idx = names_map.get(&their_idx)?;
                let position = *self.named_variant_index.get(our_idx)?;
                Some((position, &self.schemas[position]))
            }
        }
    }

    /// Same as `match_ref`, taking an owned-style `SchemaPieceOrNamed` by reference.
    #[inline(always)]
    pub fn match_(
        &self,
        other: &SchemaPieceOrNamed,
        names_map: &HashMap<usize, usize>,
    ) -> Option<(usize, &SchemaPieceOrNamed)> {
        let other_ref = other.as_ref();
        self.match_ref(other_ref, names_map)
    }
}
impl PartialEq for UnionSchema {
    /// Two unions are equal when their variant lists are equal; the lookup tables are ignored.
    fn eq(&self, other: &UnionSchema) -> bool {
        self.schemas == other.schemas
    }
}
/// Mutable state used while parsing a schema from JSON.
#[derive(Default)]
struct SchemaParser {
    /// Named pieces; a slot is `None` from `alloc_name` until `insert` fills it.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps each full name seen so far to its slot in `named`.
    indices: HashMap<FullName, usize>,
}
impl SchemaParser {
/// Parses `value` as a complete schema, consuming the parser.
///
/// The top level is parsed with an empty default namespace.
///
/// # Panics
///
/// Panics if a named slot was allocated but never filled.
fn parse(mut self, value: &Value) -> Result<Schema, Error> {
    let top = self.parse_inner("", value)?;
    let named = self.named.into_iter().map(Option::unwrap).collect();
    Ok(Schema {
        named,
        indices: self.indices,
        top,
    })
}
/// Parses one schema node from JSON.
///
/// A string is either a reference to an already-registered named type (looked up relative
/// to `default_namespace`) or a primitive type name; an object is a complex type; an array
/// is a union. Any other JSON value is rejected.
fn parse_inner(
    &mut self,
    default_namespace: &str,
    value: &Value,
) -> Result<SchemaPieceOrNamed, Error> {
    match value {
        Value::String(type_name) => {
            let name = FullName::from_parts(type_name.as_str(), None, default_namespace);
            match self.indices.get(&name) {
                Some(idx) => Ok(SchemaPieceOrNamed::Named(*idx)),
                None => {
                    let primitive = Schema::parse_primitive(type_name.as_str())?;
                    Ok(SchemaPieceOrNamed::Piece(primitive))
                }
            }
        }
        Value::Object(data) => self.parse_complex(default_namespace, data),
        Value::Array(variants) => {
            let union = self.parse_union(default_namespace, variants)?;
            Ok(SchemaPieceOrNamed::Piece(union))
        }
        _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
    }
}
/// Reserves the next slot in `named` for `fullname` and returns its index.
///
/// The slot stays `None` until `insert` is called for it.
///
/// # Errors
///
/// Fails when `fullname` has already been registered.
fn alloc_name(&mut self, fullname: FullName) -> Result<usize, Error> {
    let next_index = self.named.len();
    match self.indices.entry(fullname) {
        Entry::Occupied(existing) => {
            return Err(ParseSchemaError::new(format!(
                "Sub-schema with name {} encountered multiple times",
                existing.key()
            ))
            .into());
        }
        Entry::Vacant(slot) => {
            slot.insert(next_index);
        }
    }
    self.named.push(None);
    Ok(next_index)
}
/// Fills the slot previously reserved at `index`.
///
/// # Panics
///
/// Panics if the slot is already filled or `index` is out of bounds.
fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
    let slot = &mut self.named[index];
    assert!(slot.is_none());
    *slot = Some(schema);
}
/// Parses a `record`, `enum` or `fixed` definition and registers it under its full name.
///
/// The name slot is allocated BEFORE the body is parsed, so the body may refer to the type
/// being defined. The type's own namespace becomes the default namespace for its contents.
/// Returns the index of the registered piece.
///
/// # Errors
///
/// Fails when the name is invalid, is a primitive type name, is already registered, or when
/// the body fails to parse.
///
/// # Panics
///
/// Panics if `type_name` is not one of `record`, `enum`, `fixed`.
fn parse_named_type(
    &mut self,
    type_name: &str,
    default_namespace: &str,
    complex: &Map<String, Value>,
) -> Result<usize, Error> {
    const PRIMITIVE_NAMES: [&str; 8] = [
        "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    ];
    let name = Name::parse(complex)?;
    if PRIMITIVE_NAMES.contains(&name.name.as_str()) {
        return Err(ParseSchemaError::new(format!(
            "{} may not be used as a custom type name",
            name.name
        ))
        .into());
    }
    let fullname = name.fullname(default_namespace);
    let idx = self.alloc_name(fullname.clone())?;
    let inner_namespace = fullname.namespace.as_str();
    let piece = match type_name {
        "record" => self.parse_record(inner_namespace, complex)?,
        "enum" => self.parse_enum(complex)?,
        "fixed" => self.parse_fixed(inner_namespace, complex)?,
        _ => unreachable!("Unknown named type kind: {}", type_name),
    };
    let named_piece = NamedSchemaPiece {
        name: fullname,
        piece,
    };
    self.insert(idx, named_piece);
    Ok(idx)
}
fn parse_complex(
&mut self,
default_namespace: &str,
complex: &Map<String, Value>,
) -> Result<SchemaPieceOrNamed, Error> {
match complex.get("type") {
Some(&Value::String(ref t)) => Ok(match t.as_str() {
"record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
t,
default_namespace,
complex,
)?),
"array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
"map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
"bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
"int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
"long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
"string" => SchemaPieceOrNamed::Piece(Self::parse_string(complex)),
other => {
let name = FullName {
name: other.into(),
namespace: default_namespace.into(),
};
if let Some(idx) = self.indices.get(&name) {
SchemaPieceOrNamed::Named(*idx)
} else {
SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
}
}
}),
Some(&Value::Object(ref data)) => match data.get("type") {
Some(ref value) => self.parse_inner(default_namespace, value),
None => Err(
ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
),
},
_ => Err(ParseSchemaError::new("No `type` in complex type").into()),
}
}
/// Parses the body of a `record` definition.
///
/// Entries of `fields` that are not JSON objects are skipped; positions are assigned by
/// counting the remaining entries from zero. If two fields share a name, the lookup table
/// keeps the later one.
///
/// # Errors
///
/// Fails when `fields` is missing or not an array, or when any field fails to parse.
fn parse_record(
    &mut self,
    default_namespace: &str,
    complex: &Map<String, Value>,
) -> Result<SchemaPiece, Error> {
    let raw_fields = complex
        .get("fields")
        .and_then(|fields| fields.as_array())
        .ok_or_else(|| ParseSchemaError::new("No `fields` in record"))?;

    let mut fields: Vec<RecordField> = Vec::new();
    let field_objects = raw_fields.iter().filter_map(|field| field.as_object());
    for (position, field) in field_objects.enumerate() {
        fields.push(self.parse_record_field(default_namespace, field, position)?);
    }

    let lookup: HashMap<String, usize> = fields
        .iter()
        .map(|field| (field.name.clone(), field.position))
        .collect();

    Ok(SchemaPiece::Record {
        doc: complex.doc(),
        fields,
        lookup,
    })
}
/// Parses one entry of a record's `fields` array.
///
/// `order` defaults to ascending when it is absent, not a string, or not one of the three
/// recognized values.
///
/// # Errors
///
/// Fails when `name` or `type` is missing, or when the field's type fails to parse.
fn parse_record_field(
    &mut self,
    default_namespace: &str,
    field: &Map<String, Value>,
    position: usize,
) -> Result<RecordField, Error> {
    let name = match field.name() {
        Some(found) => found,
        None => return Err(ParseSchemaError::new("No `name` in record field").into()),
    };
    let type_ = field
        .get("type")
        .ok_or_else(|| ParseSchemaError::new("No `type` in record field"))?;
    let schema = self.parse_inner(default_namespace, type_)?;
    let default = field.get("default").cloned();
    let order = match field.get("order").and_then(|order| order.as_str()) {
        Some("descending") => RecordFieldOrder::Descending,
        Some("ignore") => RecordFieldOrder::Ignore,
        // Covers "ascending" as well as a missing or unrecognized value.
        _ => RecordFieldOrder::Ascending,
    };
    Ok(RecordField {
        name,
        doc: field.doc(),
        default,
        schema,
        order,
        position,
    })
}
/// Parses the body of an `enum` definition.
///
/// # Errors
///
/// Fails when `symbols` is missing, is not an array of strings, or contains duplicates; also
/// when `default` is present but is not a string naming one of the symbols.
fn parse_enum(&mut self, complex: &Map<String, Value>) -> Result<SchemaPiece, Error> {
    let raw_symbols = complex
        .get("symbols")
        .and_then(|v| v.as_array())
        .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))?;

    let mut symbols: Vec<String> = Vec::with_capacity(raw_symbols.len());
    for raw in raw_symbols {
        match raw.as_str() {
            Some(symbol) => symbols.push(symbol.to_string()),
            None => {
                return Err(ParseSchemaError::new("Unable to parse `symbols` in enum").into());
            }
        }
    }

    // `HashSet::insert` returns false when the value was already present.
    let mut seen: HashSet<&String> = HashSet::new();
    for symbol in &symbols {
        if !seen.insert(symbol) {
            return Err(ParseSchemaError::new(format!(
                "Enum symbols must be unique, found multiple: {}",
                symbol
            ))
            .into());
        }
    }

    let default_idx = match complex.get("default") {
        None => None,
        Some(default) => {
            let default_str = match default.as_str() {
                Some(s) => s,
                None => {
                    return Err(ParseSchemaError::new(format!(
                        "Enum default should be a string, got: {:?}",
                        default
                    ))
                    .into());
                }
            };
            match symbols.iter().position(|symbol| symbol == default_str) {
                Some(idx) => Some(idx),
                None => {
                    return Err(ParseSchemaError::new(format!(
                        "Enum default not found in list of symbols: {}",
                        default_str
                    ))
                    .into());
                }
            }
        }
    };

    Ok(SchemaPiece::Enum {
        doc: complex.doc(),
        symbols,
        default_idx,
    })
}
/// Parses an `array` definition; the element schema comes from `items`.
///
/// # Errors
///
/// Fails when `items` is missing or does not parse.
fn parse_array(
    &mut self,
    default_namespace: &str,
    complex: &Map<String, Value>,
) -> Result<SchemaPiece, Error> {
    let items = complex
        .get("items")
        .ok_or_else(|| ParseSchemaError::new("No `items` in array"))?;
    let element = self.parse_inner(default_namespace, items)?;
    Ok(SchemaPiece::Array(Box::new(element)))
}
/// Parses a `map` definition; the value schema comes from `values`.
///
/// # Errors
///
/// Fails when `values` is missing or does not parse.
fn parse_map(
    &mut self,
    default_namespace: &str,
    complex: &Map<String, Value>,
) -> Result<SchemaPiece, Error> {
    let values = complex
        .get("values")
        .ok_or_else(|| ParseSchemaError::new("No `values` in map"))?;
    let value_schema = self.parse_inner(default_namespace, values)?;
    Ok(SchemaPiece::Map(Box::new(value_schema)))
}
/// Parses a JSON array as a union, one variant per element.
///
/// # Errors
///
/// Fails on the first variant that does not parse, or when `UnionSchema::new` rejects the
/// resulting variant list.
fn parse_union(
    &mut self,
    default_namespace: &str,
    items: &[Value],
) -> Result<SchemaPiece, Error> {
    let mut variants = Vec::with_capacity(items.len());
    for item in items {
        variants.push(self.parse_inner(default_namespace, item)?);
    }
    let union = UnionSchema::new(variants)?;
    Ok(SchemaPiece::Union(union))
}
/// Reads the `precision` and `scale` attributes of a `decimal` logical type.
///
/// `scale` defaults to 0 when absent. Returns `(precision, scale)`.
///
/// # Errors
///
/// Fails when `precision` is missing or not an integer, when `precision` is not strictly
/// positive, when `scale` is negative, or when `scale` exceeds `precision`.
fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), Error> {
    let precision = complex
        .get("precision")
        .and_then(|v| v.as_i64())
        .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
    let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
    if scale < 0 {
        // Zero is a valid scale, so the message must not claim otherwise.
        return Err(
            ParseSchemaError::new("Decimal scale must be zero or a positive integer").into(),
        );
    }
    // The Avro specification requires a precision strictly greater than zero; the previous
    // `< 0` check let a precision of 0 through despite the wording of this error.
    if precision <= 0 {
        return Err(
            ParseSchemaError::new("Decimal precision must be greater than zero").into(),
        );
    }
    if scale > precision {
        return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
    }
    // Both values are known to be non-negative at this point.
    Ok((precision as usize, scale as usize))
}
/// Parses a `bytes` object, honoring the `decimal` logical type.
///
/// If the decimal attributes are invalid, a warning is logged and the type is treated as
/// plain `bytes`.
fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, Error> {
    let is_decimal = complex.get("logicalType").and_then(|v| v.as_str()) == Some("decimal");
    if is_decimal {
        match Self::parse_decimal(complex) {
            Err(e) => warn!(
                "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
                complex, e
            ),
            Ok((precision, scale)) => {
                return Ok(SchemaPiece::Decimal {
                    precision,
                    scale,
                    fixed_size: None,
                });
            }
        }
    }
    Ok(SchemaPiece::Bytes)
}
/// Parses an `int` object, recognizing date annotations.
///
/// Yields `Date` when `connect.name` is one of the two known date names or when
/// `logicalType` is `date`; otherwise yields a plain `Int`.
fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, Error> {
    const AVRO_DATE: &str = "date";
    const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
    const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";

    if let Some(name) = complex.get("connect.name") {
        let is_kafka_date = name == KAFKA_DATE;
        if is_kafka_date || name == DEBEZIUM_DATE {
            if is_kafka_date {
                warn!("using deprecated debezium date format");
            }
            return Ok(SchemaPiece::Date);
        }
    }

    let has_avro_date = match complex.get("logicalType") {
        Some(name) => name == AVRO_DATE,
        None => false,
    };
    if has_avro_date {
        return Ok(SchemaPiece::Date);
    }

    if !complex.is_empty() {
        debug!("parsing complex type as regular int: {:?}", complex);
    }
    Ok(SchemaPiece::Int)
}
fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, Error> {
const AVRO_MILLI_TS: &str = "timestamp-millis";
const AVRO_MICRO_TS: &str = "timestamp-micros";
const CONNECT_MILLI_TS: &[&str] = &[
"io.debezium.time.Timestamp",
"org.apache.kafka.connect.data.Timestamp",
];
const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
if CONNECT_MILLI_TS.contains(&&**name) {
return Ok(SchemaPiece::TimestampMilli);
}
if name == CONNECT_MICRO_TS {
return Ok(SchemaPiece::TimestampMicro);
}
}
if let Some(name) = complex.get("logicalType") {
if name == AVRO_MILLI_TS {
return Ok(SchemaPiece::TimestampMilli);
}
if name == AVRO_MICRO_TS {
return Ok(SchemaPiece::TimestampMicro);
}
}
if !complex.is_empty() {
debug!("parsing complex type as regular long: {:?}", complex);
}
Ok(SchemaPiece::Long)
}
fn parse_string(complex: &Map<String, Value>) -> SchemaPiece {
const CONNECT_JSON: &str = "io.debezium.data.Json";
if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
if CONNECT_JSON == name.as_str() {
return SchemaPiece::Json;
}
}
debug!("parsing complex type as regular string: {:?}", complex);
SchemaPiece::String
}
/// Parses the body of a `fixed` definition, honoring the `decimal` logical type.
///
/// When `logicalType` is `decimal` and the declared precision fits in `size` bytes, the
/// result is a `Decimal` backed by the fixed type. If the decimal attributes are invalid or
/// the precision does not fit, a warning is logged and a plain `Fixed` is returned.
/// `_default_namespace` is unused.
///
/// # Errors
///
/// Fails when the name is invalid, or when `size` is missing, not an integer, or not
/// strictly positive.
fn parse_fixed(
    &mut self,
    _default_namespace: &str,
    complex: &Map<String, Value>,
) -> Result<SchemaPiece, Error> {
    // Parsed only for its validation; the value itself is not needed here.
    let _name = Name::parse(complex)?;
    let size = complex
        .get("size")
        .and_then(|v| v.as_i64())
        .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
    if size <= 0 {
        return Err(ParseSchemaError::new(format!(
            "Fixed values require a positive size attribute, found: {}",
            size
        ))
        .into());
    }
    let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
    if let Some("decimal") = logical_type {
        match Self::parse_decimal(complex) {
            Ok((precision, scale)) => {
                // Number of decimal digits that always fit in `size` bytes of two's
                // complement: floor(log10(2^(8*size-1) - 1)), which equals
                // floor((8*size-1) * log10(2)). The closed form avoids computing
                // 2^(8*size-1) as an integer, which overflows `usize` for sizes above
                // 8 bytes.
                let value_bits = 8.0 * size as f64 - 1.0;
                let max = (value_bits * 2_f64.log10()).floor() as usize;
                if precision > max {
                    warn!("Decimal precision {} requires more than {} bytes of space, parsing as fixed", precision, size);
                } else {
                    return Ok(SchemaPiece::Decimal {
                        precision,
                        scale,
                        fixed_size: Some(size as usize),
                    });
                }
            }
            Err(e) => warn!(
                "parsing decimal as fixed due to parse error: {:?}, {:?}",
                complex, e
            ),
        }
    }
    Ok(SchemaPiece::Fixed {
        size: size as usize,
    })
}
}
impl Schema {
pub fn parse_str(input: &str) -> Result<Self, Error> {
let value = serde_json::from_str(input)?;
Self::parse(&value)
}
pub fn parse(value: &Value) -> Result<Self, Error> {
let p = SchemaParser {
named: vec![],
indices: Default::default(),
};
p.parse(value)
}
pub fn canonical_form(&self) -> String {
let json = serde_json::to_value(self).unwrap();
parsing_canonical_form(&json)
}
pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
let mut d = D::new();
d.input(self.canonical_form());
SchemaFingerprint {
bytes: d.result().to_vec(),
}
}
fn parse_primitive(primitive: &str) -> Result<SchemaPiece, Error> {
match primitive {
"null" => Ok(SchemaPiece::Null),
"boolean" => Ok(SchemaPiece::Boolean),
"int" => Ok(SchemaPiece::Int),
"long" => Ok(SchemaPiece::Long),
"double" => Ok(SchemaPiece::Double),
"float" => Ok(SchemaPiece::Float),
"bytes" => Ok(SchemaPiece::Bytes),
"string" => Ok(SchemaPiece::String),
other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
}
}
}
/// A schema piece registered under a full name in `Schema::named`.
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// The full name the piece is registered under.
    pub(crate) name: FullName,
    /// The piece itself.
    pub piece: SchemaPiece,
}
/// A borrowed view of one piece inside a schema, with named references already followed.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    /// The schema this node belongs to; used to follow named references.
    pub root: &'a Schema,
    /// The piece this node points at.
    pub inner: &'a SchemaPiece,
    /// The piece's full name when it was reached through a named reference, else `None`.
    pub name: Option<&'a FullName>,
}
/// Borrowed counterpart of `SchemaPieceOrNamed`.
#[derive(Copy, Clone)]
pub enum SchemaPieceRefOrNamed<'a> {
    /// A reference to an inline piece.
    Piece(&'a SchemaPiece),
    /// Index of a named piece.
    Named(usize),
}
impl<'a> SchemaPieceRefOrNamed<'a> {
#[inline(always)]
pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
match self {
SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
SchemaPieceRefOrNamed::Named(index) => {
let named_piece = root.lookup(index);
(&named_piece.piece, Some(&named_piece.name))
}
}
}
}
/// A borrowed view of a position inside a schema, with named references NOT yet followed.
#[derive(Copy, Clone)]
pub struct SchemaNodeOrNamed<'a> {
    /// The schema this position belongs to.
    pub root: &'a Schema,
    /// The inline piece or named index at this position.
    pub inner: SchemaPieceRefOrNamed<'a>,
}
impl<'a> SchemaNodeOrNamed<'a> {
    /// Follows a named reference (if any) and returns the concrete node.
    #[inline(always)]
    pub fn lookup(self) -> SchemaNode<'a> {
        let root = self.root;
        let (inner, name) = self.inner.get_piece_and_name(root);
        SchemaNode { root, inner, name }
    }

    /// Moves to `next` within the same root schema.
    #[inline(always)]
    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
        let next_ref = next.as_ref();
        self.step_ref(next_ref)
    }

    /// Moves to `next` within the same root schema.
    #[inline(always)]
    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
        Self {
            root: self.root,
            inner: next,
        }
    }

    /// Copies the subtree at this position into a standalone schema.
    ///
    /// Only named pieces reachable from this position are kept; they are renumbered in the
    /// order they are first encountered.
    ///
    /// # Panics
    ///
    /// Panics if the cloner leaves a named slot unfilled.
    pub fn to_schema(self) -> Schema {
        let mut cloner = SchemaSubtreeDeepCloner {
            old_root: self.root,
            old_to_new_names: HashMap::new(),
            named: Vec::new(),
        };
        let top = cloner.clone_piece_or_named(self.inner);

        let mut named: Vec<NamedSchemaPiece> = Vec::with_capacity(cloner.named.len());
        let mut indices: HashMap<FullName, usize> = HashMap::new();
        for (new_index, slot) in cloner.named.into_iter().enumerate() {
            let named_piece = slot.unwrap();
            indices.insert(named_piece.name.clone(), new_index);
            named.push(named_piece);
        }

        Schema {
            named,
            indices,
            top,
        }
    }
}
/// Working state for copying a subtree of a schema into a new, standalone schema.
struct SchemaSubtreeDeepCloner<'a> {
    /// The schema being copied from.
    old_root: &'a Schema,
    /// Maps an index in `old_root`'s named table to the index assigned in `named`.
    old_to_new_names: HashMap<usize, usize>,
    /// Named pieces of the new schema; a slot is `None` while its piece is being cloned.
    named: Vec<Option<NamedSchemaPiece>>,
}
impl<'a> SchemaSubtreeDeepCloner<'a> {
fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
match piece {
SchemaPiece::Null => SchemaPiece::Null,
SchemaPiece::Boolean => SchemaPiece::Boolean,
SchemaPiece::Int => SchemaPiece::Int,
SchemaPiece::Long => SchemaPiece::Long,
SchemaPiece::Float => SchemaPiece::Float,
SchemaPiece::Double => SchemaPiece::Double,
SchemaPiece::Date => SchemaPiece::Date,
SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
SchemaPiece::Json => SchemaPiece::Json,
SchemaPiece::Decimal {
scale,
precision,
fixed_size,
} => SchemaPiece::Decimal {
scale: *scale,
precision: *precision,
fixed_size: *fixed_size,
},
SchemaPiece::Bytes => SchemaPiece::Bytes,
SchemaPiece::String => SchemaPiece::String,
SchemaPiece::Array(inner) => {
SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
}
SchemaPiece::Map(inner) => {
SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
}
SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
schemas: us
.schemas
.iter()
.map(|s| self.clone_piece_or_named(s.as_ref()))
.collect(),
anon_variant_index: us.anon_variant_index.clone(),
named_variant_index: us.named_variant_index.clone(),
}),
SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
SchemaPiece::ResolveConcreteUnion {
index,
inner,
n_reader_variants,
reader_null_variant,
} => SchemaPiece::ResolveConcreteUnion {
index: *index,
inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
n_reader_variants: *n_reader_variants,
reader_null_variant: *reader_null_variant,
},
SchemaPiece::ResolveUnionUnion {
permutation,
n_reader_variants,
reader_null_variant,
} => SchemaPiece::ResolveUnionUnion {
permutation: permutation
.clone()
.into_iter()
.map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
.collect(),
n_reader_variants: *n_reader_variants,
reader_null_variant: *reader_null_variant,
},
SchemaPiece::ResolveUnionConcrete { index, inner } => {
SchemaPiece::ResolveUnionConcrete {
index: *index,
inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
}
}
SchemaPiece::Record {
doc,
fields,
lookup,
} => SchemaPiece::Record {
doc: doc.clone(),
fields: fields
.iter()
.map(|rf| RecordField {
name: rf.name.clone(),
doc: rf.doc.clone(),
default: rf.default.clone(),
schema: self.clone_piece_or_named(rf.schema.as_ref()),
order: rf.order,
position: rf.position,
})
.collect(),
lookup: lookup.clone(),
},
SchemaPiece::Enum {
doc,
symbols,
default_idx,
} => SchemaPiece::Enum {
doc: doc.clone(),
symbols: symbols.clone(),
default_idx: *default_idx,
},
SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
SchemaPiece::ResolveRecord {
defaults,
fields,
n_reader_fields,
} => SchemaPiece::ResolveRecord {
defaults: defaults.clone(),
fields: fields
.iter()
.map(|rf| match rf {
ResolvedRecordField::Present(rf) => {
ResolvedRecordField::Present(RecordField {
name: rf.name.clone(),
doc: rf.doc.clone(),
default: rf.default.clone(),
schema: self.clone_piece_or_named(rf.schema.as_ref()),
order: rf.order,
position: rf.position,
})
}
ResolvedRecordField::Absent(writer_schema) => {
ResolvedRecordField::Absent(writer_schema.clone())
}
})
.collect(),
n_reader_fields: *n_reader_fields,
},
SchemaPiece::ResolveEnum {
doc,
symbols,
default,
} => SchemaPiece::ResolveEnum {
doc: doc.clone(),
symbols: symbols.clone(),
default: default.clone(),
},
}
}
/// Clones an inline piece, or maps a named reference to its index in the new schema.
///
/// The first time a named piece is met, a slot is reserved and recorded BEFORE its contents
/// are cloned, so a reference back to the same name from inside the piece resolves to the
/// reserved slot instead of recursing.
fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
    let old_index = match piece {
        SchemaPieceRefOrNamed::Piece(inline) => return self.clone_piece(inline).into(),
        SchemaPieceRefOrNamed::Named(index) => index,
    };
    if let Some(known) = self.old_to_new_names.get(&old_index) {
        return SchemaPieceOrNamed::Named(*known);
    }
    let new_index = self.named.len();
    self.named.push(None);
    self.old_to_new_names.insert(old_index, new_index);
    let old_named_piece = self.old_root.lookup(old_index);
    let new_named_piece = NamedSchemaPiece {
        name: old_named_piece.name.clone(),
        piece: self.clone_piece(&old_named_piece.piece),
    };
    self.named[new_index] = Some(new_named_piece);
    SchemaPieceOrNamed::Named(new_index)
}
}
impl<'a> SchemaNode<'a> {
/// Moves to `next` within the same root schema, following a named reference if `next` is
/// one.
#[inline(always)]
pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
    let root = self.root;
    let (inner, name) = next.as_ref().get_piece_and_name(root);
    Self { root, inner, name }
}
pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
use serde_json::Value::*;
let val = match (json, self.inner) {
(json, SchemaPiece::Union(us)) => match us.schemas.first() {
Some(variant) => AvroValue::Union {
index: 0,
inner: Box::new(self.step(variant).json_to_value(json)?),
n_variants: us.schemas.len(),
null_variant: us
.schemas
.iter()
.position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
},
None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
},
(Null, SchemaPiece::Null) => AvroValue::Null,
(Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
(Number(n), piece) => {
match piece {
SchemaPiece::Int => {
let i =
n.as_i64()
.and_then(|i| i32::try_from(i).ok())
.ok_or_else(|| {
ParseSchemaError(format!("{} is not a 32-bit integer", n))
})?;
AvroValue::Int(i)
}
SchemaPiece::Long => {
let i = n.as_i64().ok_or_else(|| {
ParseSchemaError(format!("{} is not a 64-bit integer", n))
})?;
AvroValue::Long(i)
}
SchemaPiece::Float => {
AvroValue::Float(n.as_f64().unwrap() as f32)
}
SchemaPiece::Double => AvroValue::Double(n.as_f64().unwrap()),
_ => {
return Err(ParseSchemaError(format!(
"Unexpected number in default: {}",
n
)))
}
}
}
(String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
(
String(s),
SchemaPiece::Decimal {
precision, scale, ..
},
) => AvroValue::Decimal(DecimalValue {
precision: *precision,
scale: *scale,
unscaled: s.clone().into_bytes(),
}),
(String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
(Object(map), SchemaPiece::Record { fields, .. }) => {
let field_values = fields
.iter()
.map(|rf| {
let jval = map.get(&rf.name).ok_or_else(|| {
ParseSchemaError(format!(
"Field not found in default value: {}",
rf.name
))
})?;
let value = self.step(&rf.schema).json_to_value(jval)?;
Ok((rf.name.clone(), value))
})
.collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
AvroValue::Record(field_values)
}
(String(s), SchemaPiece::Enum { symbols, .. }) => {
match symbols.iter().find_position(|sym| s == *sym) {
Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
}
}
(Array(vals), SchemaPiece::Array(inner)) => {
let node = self.step(&**inner);
let vals = vals
.iter()
.map(|val| node.json_to_value(val))
.collect::<Result<Vec<_>, ParseSchemaError>>()?;
AvroValue::Array(vals)
}
(Object(map), SchemaPiece::Map(inner)) => {
let node = self.step(&**inner);
let map = map
.iter()
.map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
.collect::<Result<HashMap<_, _>, ParseSchemaError>>()?;
AvroValue::Map(map)
}
(String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
AvroValue::Fixed(*size, s.clone().into_bytes())
}
_ => {
return Err(ParseSchemaError(format!(
"Json default value {} does not match schema",
json
)))
}
};
Ok(val)
}
}
/// Serialization state for one position in a schema tree.
///
/// The `Serialize` impl for this type writes the JSON form of `node`; the
/// `seen_named` map is shared (via `Rc`) by every context derived from the
/// same root through `step`.
#[derive(Clone)]
struct SchemaSerContext<'a> {
/// The schema position (anonymous piece or named reference) to serialize.
node: SchemaNodeOrNamed<'a>,
/// Named pieces already written during this serialization, keyed by their
/// index in the root schema and mapped to the name that was emitted. A
/// named piece found here is serialized as just that name string.
seen_named: Rc<RefCell<HashMap<usize, String>>>,
}
/// Pairs a record field with the serialization context of its enclosing
/// record so that the field can be serialized on its own.
#[derive(Clone)]
struct RecordFieldSerContext<'a> {
/// Context of the record that owns the field; the field's type is
/// serialized by stepping from this context.
outer: &'a SchemaSerContext<'a>,
/// The field being serialized.
inner: &'a RecordField,
}
impl<'a> SchemaSerContext<'a> {
    /// Builds the context for the child position `next`.
    ///
    /// The child shares this context's `seen_named` map (the `Rc` is cloned,
    /// not the map), so named pieces recorded through either context are
    /// visible to both.
    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
        let node = self.node.step_ref(next);
        let seen_named = Rc::clone(&self.seen_named);
        Self { node, seen_named }
    }
}
impl<'a> Serialize for SchemaSerContext<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self.node.inner {
SchemaPieceRefOrNamed::Piece(piece) => match piece {
SchemaPiece::Null => serializer.serialize_str("null"),
SchemaPiece::Boolean => serializer.serialize_str("boolean"),
SchemaPiece::Int => serializer.serialize_str("int"),
SchemaPiece::Long => serializer.serialize_str("long"),
SchemaPiece::Float => serializer.serialize_str("float"),
SchemaPiece::Double => serializer.serialize_str("double"),
SchemaPiece::Date => {
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("type", "int")?;
map.serialize_entry("logicalType", "date")?;
map.end()
}
SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("type", "long")?;
if piece == &SchemaPiece::TimestampMilli {
map.serialize_entry("logicalType", "timestamp-millis")?;
} else {
map.serialize_entry("logicalType", "timestamp-micros")?;
}
map.end()
}
SchemaPiece::Decimal {
precision,
scale,
fixed_size: None,
} => {
let mut map = serializer.serialize_map(Some(4))?;
map.serialize_entry("type", "bytes")?;
map.serialize_entry("precision", precision)?;
map.serialize_entry("scale", scale)?;
map.serialize_entry("logicalType", "decimal")?;
map.end()
}
SchemaPiece::Bytes => serializer.serialize_str("bytes"),
SchemaPiece::String => serializer.serialize_str("string"),
SchemaPiece::Array(inner) => {
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("type", "array")?;
map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
map.end()
}
SchemaPiece::Map(inner) => {
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("type", "map")?;
map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
map.end()
}
SchemaPiece::Union(inner) => {
let variants = inner.variants();
let mut seq = serializer.serialize_seq(Some(variants.len()))?;
for v in variants {
seq.serialize_element(&self.step(v.as_ref()))?;
}
seq.end()
}
SchemaPiece::Json => {
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("type", "string")?;
map.serialize_entry("connect.name", "io.debezium.data.Json")?;
map.end()
}
SchemaPiece::Record { .. }
| SchemaPiece::Decimal {
fixed_size: Some(_),
..
}
| SchemaPiece::Enum { .. }
| SchemaPiece::Fixed { .. } => {
unreachable!("Unexpected named schema piece in anonymous schema position")
}
SchemaPiece::ResolveIntLong
| SchemaPiece::ResolveDateTimestamp
| SchemaPiece::ResolveIntFloat
| SchemaPiece::ResolveIntDouble
| SchemaPiece::ResolveLongFloat
| SchemaPiece::ResolveLongDouble
| SchemaPiece::ResolveFloatDouble
| SchemaPiece::ResolveConcreteUnion { .. }
| SchemaPiece::ResolveUnionUnion { .. }
| SchemaPiece::ResolveUnionConcrete { .. }
| SchemaPiece::ResolveRecord { .. }
| SchemaPiece::ResolveIntTsMicro
| SchemaPiece::ResolveIntTsMilli
| SchemaPiece::ResolveEnum { .. } => {
panic!("Attempted to serialize resolved schema")
}
},
SchemaPieceRefOrNamed::Named(index) => {
let mut map = self.seen_named.borrow_mut();
let named_piece = match map.get(&index) {
Some(name) => {
return serializer.serialize_str(name.as_str());
}
None => self.node.root.lookup(index),
};
let name = named_piece.name.to_string();
map.insert(index, name.clone());
std::mem::drop(map);
match &named_piece.piece {
SchemaPiece::Record { doc, fields, .. } => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "record")?;
map.serialize_entry("name", &name)?;
if let Some(ref docstr) = doc {
map.serialize_entry("doc", docstr)?;
}
map.serialize_entry(
"fields",
&fields
.iter()
.map(|f| RecordFieldSerContext {
outer: self,
inner: f,
})
.collect::<Vec<_>>(),
)?;
map.end()
}
SchemaPiece::Enum {
symbols,
default_idx,
..
} => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "enum")?;
map.serialize_entry("name", &name)?;
map.serialize_entry("symbols", symbols)?;
if let Some(default_idx) = *default_idx {
assert!(default_idx < symbols.len());
map.serialize_entry("default", &symbols[default_idx])?;
}
map.end()
}
SchemaPiece::Fixed { size } => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "fixed")?;
map.serialize_entry("name", &name)?;
map.serialize_entry("size", size)?;
map.end()
}
SchemaPiece::Decimal {
scale,
precision,
fixed_size: Some(size),
} => {
let mut map = serializer.serialize_map(Some(6))?;
map.serialize_entry("type", "fixed")?;
map.serialize_entry("logicalType", "decimal")?;
map.serialize_entry("name", &name)?;
map.serialize_entry("size", size)?;
map.serialize_entry("precision", precision)?;
map.serialize_entry("scale", scale)?;
map.end()
}
SchemaPiece::Null
| SchemaPiece::Boolean
| SchemaPiece::Int
| SchemaPiece::Long
| SchemaPiece::Float
| SchemaPiece::Double
| SchemaPiece::Date
| SchemaPiece::TimestampMilli
| SchemaPiece::TimestampMicro
| SchemaPiece::Decimal {
fixed_size: None, ..
}
| SchemaPiece::Bytes
| SchemaPiece::String
| SchemaPiece::Array(_)
| SchemaPiece::Map(_)
| SchemaPiece::Union(_)
| SchemaPiece::Json => {
unreachable!("Unexpected anonymous schema piece in named schema position")
}
SchemaPiece::ResolveIntLong
| SchemaPiece::ResolveDateTimestamp
| SchemaPiece::ResolveIntFloat
| SchemaPiece::ResolveIntDouble
| SchemaPiece::ResolveLongFloat
| SchemaPiece::ResolveLongDouble
| SchemaPiece::ResolveFloatDouble
| SchemaPiece::ResolveConcreteUnion { .. }
| SchemaPiece::ResolveUnionUnion { .. }
| SchemaPiece::ResolveUnionConcrete { .. }
| SchemaPiece::ResolveRecord { .. }
| SchemaPiece::ResolveIntTsMilli
| SchemaPiece::ResolveIntTsMicro
| SchemaPiece::ResolveEnum { .. } => {
panic!("Attempted to serialize resolved schema")
}
}
}
}
}
}
impl Serialize for Schema {
    /// Serializes the schema starting from its top node, with a fresh,
    /// empty record of already-emitted named pieces.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let node = SchemaNodeOrNamed {
            root: self,
            inner: self.top.as_ref(),
        };
        let seen_named = Rc::new(RefCell::new(Default::default()));
        SchemaSerContext { node, seen_named }.serialize(serializer)
    }
}
impl<'a> Serialize for RecordFieldSerContext<'a> {
    /// Writes a record field as a map with `name`, `type` and, when the
    /// field has one, `default`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let field = self.inner;
        let mut entry = serializer.serialize_map(None)?;
        entry.serialize_entry("name", &field.name)?;
        // The field's type is serialized from a context stepped off the
        // enclosing record, so it shares the record's seen-names state.
        let field_type = self.outer.step(field.schema.as_ref());
        entry.serialize_entry("type", &field_type)?;
        if let Some(default) = field.default.as_ref() {
            entry.serialize_entry("default", default)?;
        }
        entry.end()
    }
}
/// Renders a JSON schema value in canonical form by dispatching on its JSON
/// shape: strings, arrays and objects are handled by `pcf_string`,
/// `pcf_array` and `pcf_map` respectively.
///
/// # Panics
///
/// Panics (via `unreachable!`) for any other JSON value kind.
fn parsing_canonical_form(schema: &serde_json::Value) -> String {
    if let serde_json::Value::String(text) = schema {
        return pcf_string(text);
    }
    if let serde_json::Value::Array(items) = schema {
        return pcf_array(items);
    }
    if let serde_json::Value::Object(members) = schema {
        return pcf_map(members);
    }
    unreachable!()
}
/// Renders a JSON object in canonical form.
///
/// * An object whose only member is a string-valued `type` collapses to that
///   quoted string.
/// * Members for which `field_ordering_position` returns `None` are dropped.
/// * A `name` without a dot is prefixed with the object's `namespace`, if any.
/// * `size` is written as a bare integer, whether it was given as a JSON
///   number or as a numeric string.
/// * The remaining members are ordered by `field_ordering_position`.
///
/// # Panics
///
/// Panics if `name` is not a string, or if `size` is neither an integer nor
/// a string that parses as one.
fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
    // `{"type": "x"}` is equivalent to the bare string `"x"`.
    if schema.len() == 1 {
        if let Some(serde_json::Value::String(only_type)) = schema.get("type") {
            return pcf_string(only_type);
        }
    }

    let namespace = schema.get("namespace").and_then(|v| v.as_str());
    let mut entries: Vec<(usize, String)> = Vec::new();

    for (key, value) in schema {
        let position = match field_ordering_position(key) {
            Some(position) => position,
            None => continue,
        };
        let rendered = match key.as_str() {
            "name" => {
                let short = value.as_str().unwrap();
                let full = match namespace {
                    Some(ns) if !short.contains('.') => Cow::Owned(format!("{}.{}", ns, short)),
                    _ => Cow::Borrowed(short),
                };
                pcf_string(&full)
            }
            "size" => {
                let size = match value.as_str() {
                    Some(text) => text
                        .parse::<i64>()
                        .expect("Only valid schemas are accepted!"),
                    None => value.as_i64().unwrap(),
                };
                size.to_string()
            }
            _ => parsing_canonical_form(value),
        };
        entries.push((position, format!("{}:{}", pcf_string(key), rendered)));
    }

    // Positions are distinct per key, so an unstable sort is deterministic.
    entries.sort_unstable_by_key(|(position, _)| *position);
    let body = entries
        .into_iter()
        .map(|(_, text)| text)
        .collect::<Vec<_>>()
        .join(",");
    format!("{{{}}}", body)
}
fn pcf_array(arr: &[serde_json::Value]) -> String {
let inter = arr
.iter()
.map(parsing_canonical_form)
.collect::<Vec<String>>()
.join(",");
format!("[{}]", inter)
}
/// Wraps `s` in double quotes. The contents are copied verbatim; no
/// escaping is applied.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
/// Returns the 1-based position of `field` in the canonical member order
/// (`name`, `type`, `fields`, `symbols`, `items`, `values`, `size`), or
/// `None` for any other key.
fn field_ordering_position(field: &str) -> Option<usize> {
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];
    CANONICAL_ORDER
        .iter()
        .position(|known| *known == field)
        .map(|zero_based| zero_based + 1)
}
#[cfg(test)]
mod tests {
use super::*;
/// Asserts that `schema` parses to `expected`, and that it still does after
/// being serialized back to JSON and parsed a second time.
fn check_schema(schema: &str, expected: SchemaPiece) {
    let parsed = Schema::parse_str(schema).unwrap();
    assert_eq!(&expected, parsed.top_node().inner);

    // Round trip through the serializer.
    let reserialized = serde_json::to_string(&parsed).unwrap();
    let reparsed = Schema::parse_str(&reserialized).unwrap();
    assert_eq!(&expected, reparsed.top_node().inner);
}
#[test]
fn test_primitive_schema() {
    // Each primitive is given as a bare JSON string.
    let cases = vec![
        ("\"null\"", SchemaPiece::Null),
        ("\"int\"", SchemaPiece::Int),
        ("\"double\"", SchemaPiece::Double),
    ];
    for (text, piece) in cases {
        check_schema(text, piece);
    }
}
#[test]
fn test_array_schema() {
    // An array of strings.
    let items = SchemaPieceOrNamed::Piece(SchemaPiece::String);
    let expected = SchemaPiece::Array(Box::new(items));
    check_schema(r#"{"type": "array", "items": "string"}"#, expected);
}
#[test]
fn test_map_schema() {
    // A map with double values.
    let values = SchemaPieceOrNamed::Piece(SchemaPiece::Double);
    let expected = SchemaPiece::Map(Box::new(values));
    check_schema(r#"{"type": "map", "values": "double"}"#, expected);
}
#[test]
fn test_union_schema() {
    // A two-variant union of null and int.
    let variants = vec![
        SchemaPieceOrNamed::Piece(SchemaPiece::Null),
        SchemaPieceOrNamed::Piece(SchemaPiece::Int),
    ];
    let expected = SchemaPiece::Union(UnionSchema::new(variants).unwrap());
    check_schema(r#"["null", "int"]"#, expected);
}
#[test]
fn test_multi_union_schema() {
    let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
    assert!(schema.is_ok());
    let schema = schema.unwrap();
    let node = schema.top_node();
    assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);

    let union_schema = match node.inner {
        SchemaPiece::Union(u) => u,
        _ => unreachable!(),
    };
    assert_eq!(union_schema.variants().len(), 5);

    // The variants must come back in declaration order.
    let expected_kinds = vec![
        SchemaKind::Null,
        SchemaKind::Int,
        SchemaKind::Float,
        SchemaKind::String,
        SchemaKind::Bytes,
    ];
    let mut variants = union_schema.variants().iter();
    for expected in expected_kinds {
        assert_eq!(
            SchemaKind::from(node.step(variants.next().unwrap())),
            expected
        );
    }
    assert_eq!(variants.next(), None);
}
#[test]
fn test_record_schema() {
    // A record with one defaulted long field and one string field.
    let schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;

    // Field name -> position.
    let lookup: HashMap<_, _> = vec![("a".to_owned(), 0), ("b".to_owned(), 1)]
        .into_iter()
        .collect();

    let field_a = RecordField {
        name: "a".to_string(),
        doc: None,
        default: Some(Value::Number(42i64.into())),
        schema: SchemaPiece::Long.into(),
        order: RecordFieldOrder::Ascending,
        position: 0,
    };
    let field_b = RecordField {
        name: "b".to_string(),
        doc: None,
        default: None,
        schema: SchemaPiece::String.into(),
        order: RecordFieldOrder::Ascending,
        position: 1,
    };
    let expected = SchemaPiece::Record {
        doc: None,
        fields: vec![field_a, field_b],
        lookup,
    };
    check_schema(schema, expected);
}
#[test]
fn test_enum_schema() {
    // A default naming an existing symbol resolves to that symbol's index.
    let valid = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
    let symbols = ["diamonds", "spades", "jokers", "clubs", "hearts"]
        .iter()
        .map(|symbol| (*symbol).to_owned())
        .collect();
    let expected = SchemaPiece::Enum {
        doc: None,
        symbols,
        default_idx: Some(2),
    };
    check_schema(valid, expected);

    // A default that is not one of the symbols is rejected.
    let invalid = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#;
    assert!(Schema::parse_str(invalid).is_err());
}
#[test]
fn test_fixed_schema() {
    // A 16-byte fixed type.
    check_schema(
        r#"{"type": "fixed", "name": "test", "size": 16}"#,
        SchemaPiece::Fixed { size: 16usize },
    );
}
#[test]
fn test_date_schema() {
    // Three spellings of a date; all must parse to `Date` and serialize to
    // the same `logicalType` form.
    let spellings = [
        r#"{
"type": "int",
"name": "datish",
"logicalType": "date"
}"#,
        r#"{
"type": "int",
"name": "datish",
"connect.name": "io.debezium.time.Date"
}"#,
        r#"{
"type": "int",
"name": "datish",
"connect.name": "org.apache.kafka.connect.data.Date"
}"#,
    ];
    let serialized_form = r#"{"type":"int","logicalType":"date"}"#;

    for spelling in spellings.iter() {
        check_schema(*spelling, SchemaPiece::Date);
        let parsed = Schema::parse_str(*spelling).unwrap();
        assert_eq!(serde_json::to_string(&parsed).unwrap(), serialized_form);
    }
}
#[test]
fn test_decimal_schemas() {
    // Decimal backed by a fixed type.
    let fixed_backed = r#"{
"type": "fixed",
"name": "dec",
"size": 8,
"logicalType": "decimal",
"precision": 12,
"scale": 5
}"#;
    check_schema(
        fixed_backed,
        SchemaPiece::Decimal {
            precision: 12,
            scale: 5,
            fixed_size: Some(8),
        },
    );

    // Decimal backed by bytes.
    let bytes_backed = r#"{
"type": "bytes",
"logicalType": "decimal",
"precision": 12,
"scale": 5
}"#;
    check_schema(
        bytes_backed,
        SchemaPiece::Decimal {
            precision: 12,
            scale: 5,
            fixed_size: None,
        },
    );

    // A union of bytes and a bytes-backed decimal is rejected as a duplicate.
    let duplicate = Schema::parse_str(
        r#"["bytes", {
"type": "bytes",
"logicalType": "decimal",
"precision": 12,
"scale": 5
}]"#,
    );
    assert_eq!(
        duplicate.unwrap_err().to_string(),
        "Failed to parse schema: Unions cannot contain duplicate types"
    );

    // Plain bytes written, decimal read: resolution yields the reader's
    // decimal in the matching union slot.
    let writer = Schema::parse_str(
        r#"["null", {
"type": "bytes"
}]"#,
    )
    .unwrap();
    let reader = Schema::parse_str(
        r#"["null", {
"type": "bytes",
"logicalType": "decimal",
"precision": 12,
"scale": 5
}]"#,
    )
    .unwrap();
    let resolved = resolve_schemas(&writer, &reader).unwrap();

    let decimal_variant = SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
        precision: 12,
        scale: 5,
        fixed_size: None,
    });
    let expected = SchemaPiece::ResolveUnionUnion {
        permutation: vec![
            Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
            Ok((1, decimal_variant)),
        ],
        n_reader_variants: 2,
        reader_null_variant: Some(0),
    };
    assert_eq!(resolved.top_node().inner, &expected);
}
#[test]
fn test_no_documentation() {
    // An enum declared without "doc" must carry no documentation.
    let parsed =
        Schema::parse_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
            .unwrap();
    match parsed.top_node().inner {
        SchemaPiece::Enum { doc, .. } => assert!(doc.is_none()),
        _ => panic!(),
    }
}
#[test]
fn test_documentation() {
    // The "doc" attribute of an enum is preserved by the parser.
    let parsed = Schema::parse_str(
        r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
    ).unwrap();
    let doc = if let SchemaPiece::Enum { doc, .. } = parsed.top_node().inner {
        doc.clone()
    } else {
        None
    };
    assert_eq!("Some documentation".to_owned(), doc.unwrap());
}
#[test]
fn test_namespaces_and_names() {
    let parse = |text: &str| Schema::parse_str(text).unwrap();

    // Explicit namespace plus simple name.
    let schema = parse(r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#);
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "name".into(),
            namespace: "namespace".into()
        }
    );

    // A dotted name carries its own namespace.
    let schema = parse(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#);
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "dots".into(),
            namespace: "name.has".into()
        }
    );

    // A dotted name wins over an explicit namespace.
    let schema = parse(
        r#"{"type": "enum", "namespace": "namespace",
"name": "name.has.dots", "symbols": ["A", "B"]}"#,
    );
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "dots".into(),
            namespace: "name.has".into()
        }
    );

    // No namespace at all.
    let schema = parse(
        r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
"fields": [{"name": "name", "type": "string"}]}"#,
    );
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "TestDoc".into(),
            namespace: "".into()
        }
    );

    // An explicitly empty namespace.
    let schema = parse(
        r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
"fields": [{"name": "name", "type": "string"}]}"#,
    );
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "TestDoc".into(),
            namespace: "".into()
        }
    );

    // Namespace + name and the equivalent dotted name are the same full name.
    let split = parse(
        r#"{"type": "fixed", "namespace": "namespace",
"name": "name", "size": 1}"#,
    );
    let dotted = parse(
        r#"{"type": "fixed", "name": "namespace.name",
"size": 1}"#,
    );
    assert_eq!(split.named[0].name, dotted.named[0].name);

    // Names are case-sensitive.
    let lower_name = parse(
        r#"{"type": "fixed", "namespace": "namespace",
"name": "name", "size": 1}"#,
    );
    let upper_name = parse(
        r#"{"type": "fixed", "name": "namespace.Name",
"size": 1}"#,
    );
    assert_ne!(lower_name.named[0].name, upper_name.named[0].name);

    // Namespaces are case-sensitive.
    let upper_namespace = parse(
        r#"{"type": "fixed", "namespace": "Namespace",
"name": "name", "size": 1}"#,
    );
    let lower_namespace = parse(
        r#"{"type": "fixed", "namespace": "namespace",
"name": "name", "size": 1}"#,
    );
    assert_ne!(upper_namespace.named[0].name, lower_namespace.named[0].name);

    // Malformed names are rejected.
    let rejected = [
        r#"{"type": "record", "name": "99 problems but a name aint one",
"fields": [{"name": "name", "type": "string"}]}"#,
        r#"{"type": "record", "name": "!!!",
"fields": [{"name": "name", "type": "string"}]}"#,
        r#"{"type": "record", "name": "_valid_until_©",
"fields": [{"name": "name", "type": "string"}]}"#,
    ];
    for text in rejected.iter() {
        assert!(Schema::parse_str(*text).is_err());
    }
}
#[test]
fn test_schema_is_send() {
    // Compile-time check: only type-checks if `Schema: Send`.
    fn assert_send<T: Send>(_value: T) {}
    assert_send(Schema {
        named: vec![],
        indices: Default::default(),
        top: SchemaPiece::Null.into(),
    });
}
#[test]
fn test_schema_is_sync() {
    // Compile-time check: only type-checks if both `&Schema` and `Schema`
    // are `Sync`.
    fn assert_sync<T: Sync>(_value: T) {}
    let null_schema = Schema {
        named: vec![],
        indices: Default::default(),
        top: SchemaPiece::Null.into(),
    };
    assert_sync(&null_schema);
    assert_sync(null_schema);
}
#[test]
fn test_schema_fingerprint() {
    use md5::Md5;
    use sha2::Sha256;

    let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema).unwrap();

    // Fingerprints are compared through their `Display` rendering.
    let sha256 = parsed.fingerprint::<Sha256>().to_string();
    assert_eq!(
        "5ecb2d1f0eaa647d409e6adbd5d70cd274d85802aa9167f5fe3b73ba70b32c76",
        sha256
    );

    let md5 = parsed.fingerprint::<Md5>().to_string();
    assert_eq!("a2c99a3f40ea2eea32593d63b483e962", md5);
}
}