use std::borrow::Cow;
use std::collections::HashMap;
use failure::Error;
use serde::ser::{Serialize, SerializeMap, SerializeSeq, Serializer};
use serde_json::{self, Map, Value};
use types;
use util::MapHelper;
/// Error produced when a JSON document cannot be turned into a `Schema`.
///
/// Wraps a human-readable message; when displayed the message is appended to
/// the `Failed to parse schema: ` prefix.
#[derive(Fail, Debug)]
#[fail(display = "Failed to parse schema: {}", _0)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Builds the error from anything convertible into a `String`.
    pub fn new<S: Into<String>>(msg: S) -> ParseSchemaError {
        let message: String = msg.into();
        ParseSchemaError(message)
    }
}
/// In-memory representation of a parsed schema.
///
/// The first eight variants are the primitive types recognised by
/// `Schema::parse_primitive`; the remaining variants are the complex types
/// built by the `Schema::parse_*` helpers.
#[derive(Clone, Debug, PartialEq)]
pub enum Schema {
/// Parsed from the type name `"null"`.
Null,
/// Parsed from the type name `"boolean"`.
Boolean,
/// Parsed from the type name `"int"`.
Int,
/// Parsed from the type name `"long"`.
Long,
/// Parsed from the type name `"float"`.
Float,
/// Parsed from the type name `"double"`.
Double,
/// Parsed from the type name `"bytes"`.
Bytes,
/// Parsed from the type name `"string"`.
String,
/// Array type; the boxed schema is parsed from the `items` entry.
Array(Box<Schema>),
/// Map type; the boxed schema is parsed from the `values` entry.
Map(Box<Schema>),
/// Union type, parsed from a JSON array of schemas.
Union(UnionSchema),
/// Record type, parsed from `"type": "record"`.
Record {
/// Name, namespace and aliases of the record.
name: Name,
/// Documentation attached to the record, if any.
doc: Documentation,
/// Parsed fields, in the order they appear in the `fields` array.
fields: Vec<RecordField>,
/// Maps each field name to that field's `position`.
lookup: HashMap<String, usize>,
},
/// Enum type, parsed from `"type": "enum"`.
Enum {
/// Name, namespace and aliases of the enum.
name: Name,
/// Documentation attached to the enum, if any.
doc: Documentation,
/// Symbols, in the order they appear in the `symbols` array.
symbols: Vec<String>,
},
/// Fixed type, parsed from `"type": "fixed"`; `size` comes from the `size` entry.
Fixed { name: Name, size: usize },
}
/// Payload-free tag with one variant per `Schema` variant.
///
/// Unlike `Schema` it derives `Eq` and `Hash`, and it is the key type of
/// `UnionSchema::variant_index`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum SchemaKind {
Null,
Boolean,
Int,
Long,
Float,
Double,
Bytes,
String,
Array,
Map,
Union,
Record,
Enum,
Fixed,
}
/// Reduces a borrowed `Schema` to the tag of its variant, discarding any payload.
impl<'a> From<&'a Schema> for SchemaKind {
    #[inline(always)]
    fn from(schema: &'a Schema) -> SchemaKind {
        match *schema {
            // Primitive types.
            Schema::Null => SchemaKind::Null,
            Schema::Boolean => SchemaKind::Boolean,
            Schema::Int => SchemaKind::Int,
            Schema::Long => SchemaKind::Long,
            Schema::Float => SchemaKind::Float,
            Schema::Double => SchemaKind::Double,
            Schema::Bytes => SchemaKind::Bytes,
            Schema::String => SchemaKind::String,
            // Complex types: only the variant matters, never the payload.
            Schema::Array(..) => SchemaKind::Array,
            Schema::Map(..) => SchemaKind::Map,
            Schema::Union(..) => SchemaKind::Union,
            Schema::Record { .. } => SchemaKind::Record,
            Schema::Enum { .. } => SchemaKind::Enum,
            Schema::Fixed { .. } => SchemaKind::Fixed,
        }
    }
}
impl<'a> From<&'a types::Value> for SchemaKind {
#[inline(always)]
fn from(value: &'a types::Value) -> SchemaKind {
match value {
types::Value::Null => SchemaKind::Null,
types::Value::Boolean(_) => SchemaKind::Boolean,
types::Value::Int(_) => SchemaKind::Int,
types::Value::Long(_) => SchemaKind::Long,
types::Value::Float(_) => SchemaKind::Float,
types::Value::Double(_) => SchemaKind::Double,
types::Value::Bytes(_) => SchemaKind::Bytes,
types::Value::String(_) => SchemaKind::String,
types::Value::Array(_) => SchemaKind::Array,
types::Value::Map(_) => SchemaKind::Map,
types::Value::Union(_) => SchemaKind::Union,
types::Value::Record(_) => SchemaKind::Record,
types::Value::Enum(_, _) => SchemaKind::Enum,
types::Value::Fixed(_, _) => SchemaKind::Fixed,
}
}
}
/// Naming information carried by the `Record`, `Enum` and `Fixed` schemas.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
/// The name itself; `fullname` treats a name containing `.` as already qualified.
pub name: String,
/// Namespace read from the `namespace` entry, if any.
pub namespace: Option<String>,
/// Entries of the `aliases` array; `None` when the entry is absent, is not
/// an array, or contains a non-string element.
pub aliases: Option<Vec<String>>,
}
/// Optional documentation text, used by the `doc` members of `Schema::Record`,
/// `Schema::Enum` and `RecordField`.
pub type Documentation = Option<String>;
impl Name {
pub fn new(name: &str) -> Name {
Name {
name: name.to_owned(),
namespace: None,
aliases: None,
}
}
fn parse(complex: &Map<String, Value>) -> Result<Self, Error> {
let name = complex
.name()
.ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
let namespace = complex.string("namespace");
let aliases: Option<Vec<String>> = complex
.get("aliases")
.and_then(|aliases| aliases.as_array())
.and_then(|aliases| {
aliases
.iter()
.map(|alias| alias.as_str())
.map(|alias| alias.map(|a| a.to_string()))
.collect::<Option<_>>()
});
Ok(Name {
name,
namespace,
aliases,
})
}
pub fn fullname(&self, default_namespace: Option<&str>) -> String {
if self.name.contains('.') {
self.name.clone()
} else {
let namespace = self
.namespace
.as_ref()
.map(|s| s.as_ref())
.or(default_namespace);
match namespace {
Some(ref namespace) => format!("{}.{}", namespace, self.name),
None => self.name.clone(),
}
}
}
}
/// A single field of a `Schema::Record`.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
/// Name of the field.
pub name: String,
/// Documentation attached to the field, if any.
pub doc: Documentation,
/// Copy of the JSON `default` entry, if the field declares one.
pub default: Option<Value>,
/// Schema parsed from the field's `type` entry.
pub schema: Schema,
/// Sort order parsed from the `order` entry; `Ascending` when the entry is
/// missing or not one of the recognised strings.
pub order: RecordFieldOrder,
/// Index handed to `RecordField::parse` by the caller.
pub position: usize,
}
/// Sort order declared by a record field's `order` entry.
#[derive(Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
/// Parsed from `"ascending"`; also the fallback used by `RecordField::parse`.
Ascending,
/// Parsed from `"descending"`.
Descending,
/// Parsed from `"ignore"`.
Ignore,
}
impl RecordField {
fn parse(field: &Map<String, Value>, position: usize) -> Result<Self, Error> {
let name = field
.name()
.ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
let schema = field
.get("type")
.ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
.and_then(|type_| Schema::parse(type_))?;
let default = field.get("default").cloned();
let order = field
.get("order")
.and_then(|order| order.as_str())
.and_then(|order| match order {
"ascending" => Some(RecordFieldOrder::Ascending),
"descending" => Some(RecordFieldOrder::Descending),
"ignore" => Some(RecordFieldOrder::Ignore),
_ => None,
}).unwrap_or_else(|| RecordFieldOrder::Ascending);
Ok(RecordField {
name,
doc: field.doc(),
default,
schema,
order,
position,
})
}
}
/// Variants of a union together with an index from variant kind to position.
#[derive(Debug, Clone)]
pub struct UnionSchema {
// Variants in the order they were given to `UnionSchema::new`.
schemas: Vec<Schema>,
// For each variant, its `SchemaKind` mapped to its index in `schemas`.
variant_index: HashMap<SchemaKind, usize>,
}
impl UnionSchema {
    /// Builds a union from its variants, indexing each variant by its kind.
    ///
    /// Fails with `ParseSchemaError` when a variant is itself a union or when
    /// two variants share the same `SchemaKind`.
    pub(crate) fn new(schemas: Vec<Schema>) -> Result<Self, Error> {
        let mut variant_index = HashMap::new();
        for (position, variant) in schemas.iter().enumerate() {
            if let Schema::Union(_) = *variant {
                return Err(
                    ParseSchemaError::new("Unions may not directly contain a union").into(),
                );
            }
            // `insert` hands back the previous entry when the kind was already seen.
            let previous = variant_index.insert(SchemaKind::from(variant), position);
            if previous.is_some() {
                return Err(ParseSchemaError::new("Unions cannot contain duplicate types").into());
            }
        }
        Ok(UnionSchema {
            schemas,
            variant_index,
        })
    }

    /// Returns the variants in declaration order.
    pub fn variants(&self) -> &[Schema] {
        self.schemas.as_slice()
    }

    /// Returns `true` when the first variant is `Schema::Null`.
    pub fn is_nullable(&self) -> bool {
        match self.schemas.first() {
            Some(first) => *first == Schema::Null,
            None => false,
        }
    }

    /// Looks up the variant whose kind matches the kind of `value`.
    ///
    /// Returns the variant's index together with a reference to it, or `None`
    /// when no variant of that kind exists.
    pub fn find_schema(&self, value: &::types::Value) -> Option<(usize, &Schema)> {
        let kind = SchemaKind::from(value);
        match self.variant_index.get(&kind) {
            Some(&index) => Some((index, &self.schemas[index])),
            None => None,
        }
    }
}
/// Unions compare equal when their variant lists are equal; `variant_index`
/// does not take part in the comparison.
impl PartialEq for UnionSchema {
    fn eq(&self, other: &UnionSchema) -> bool {
        self.schemas == other.schemas
    }
}
impl Schema {
pub fn parse_str(input: &str) -> Result<Self, Error> {
let value = serde_json::from_str(input)?;
Self::parse(&value)
}
pub fn parse(value: &Value) -> Result<Self, Error> {
match *value {
Value::String(ref t) => Schema::parse_primitive(t.as_str()),
Value::Object(ref data) => Schema::parse_complex(data),
Value::Array(ref data) => Schema::parse_union(data),
_ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
}
}
pub fn canonical_form(&self) -> String {
let json = serde_json::to_value(self).unwrap();
parsing_canonical_form(&json)
}
fn parse_primitive(primitive: &str) -> Result<Self, Error> {
match primitive {
"null" => Ok(Schema::Null),
"boolean" => Ok(Schema::Boolean),
"int" => Ok(Schema::Int),
"long" => Ok(Schema::Long),
"double" => Ok(Schema::Double),
"float" => Ok(Schema::Float),
"bytes" => Ok(Schema::Bytes),
"string" => Ok(Schema::String),
other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
}
}
fn parse_complex(complex: &Map<String, Value>) -> Result<Self, Error> {
match complex.get("type") {
Some(&Value::String(ref t)) => match t.as_str() {
"record" => Schema::parse_record(complex),
"enum" => Schema::parse_enum(complex),
"array" => Schema::parse_array(complex),
"map" => Schema::parse_map(complex),
"fixed" => Schema::parse_fixed(complex),
other => Schema::parse_primitive(other),
},
Some(&Value::Object(ref data)) => match data.get("type") {
Some(ref value) => Schema::parse(value),
None => Err(
ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
),
},
_ => Err(ParseSchemaError::new("No `type` in complex type").into()),
}
}
fn parse_record(complex: &Map<String, Value>) -> Result<Self, Error> {
let name = Name::parse(complex)?;
let mut lookup = HashMap::new();
let fields: Vec<RecordField> = complex
.get("fields")
.and_then(|fields| fields.as_array())
.ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
.and_then(|fields| {
fields
.iter()
.filter_map(|field| field.as_object())
.enumerate()
.map(|(position, field)| RecordField::parse(field, position))
.collect::<Result<_, _>>()
})?;
for field in &fields {
lookup.insert(field.name.clone(), field.position);
}
Ok(Schema::Record {
name,
doc: complex.doc(),
fields,
lookup,
})
}
fn parse_enum(complex: &Map<String, Value>) -> Result<Self, Error> {
let name = Name::parse(complex)?;
let symbols = complex
.get("symbols")
.and_then(|v| v.as_array())
.ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
.and_then(|symbols| {
symbols
.iter()
.map(|symbol| symbol.as_str().map(|s| s.to_string()))
.collect::<Option<_>>()
.ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
})?;
Ok(Schema::Enum {
name,
doc: complex.doc(),
symbols,
})
}
fn parse_array(complex: &Map<String, Value>) -> Result<Self, Error> {
complex
.get("items")
.ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
.and_then(|items| Schema::parse(items))
.map(|schema| Schema::Array(Box::new(schema)))
}
fn parse_map(complex: &Map<String, Value>) -> Result<Self, Error> {
complex
.get("values")
.ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
.and_then(|items| Schema::parse(items))
.map(|schema| Schema::Map(Box::new(schema)))
}
fn parse_union(items: &[Value]) -> Result<Self, Error> {
items
.iter()
.map(Schema::parse)
.collect::<Result<Vec<_>, _>>()
.and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?)))
}
fn parse_fixed(complex: &Map<String, Value>) -> Result<Self, Error> {
let name = Name::parse(complex)?;
let size = complex
.get("size")
.and_then(|v| v.as_i64())
.ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
Ok(Schema::Fixed {
name,
size: size as usize,
})
}
}
/// Serialises a schema back into its JSON form.
///
/// Primitive types become their type-name string, unions become a sequence of
/// their variants, and every other variant becomes a map with a `type` entry.
impl Serialize for Schema {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match *self {
            Schema::Null => serializer.serialize_str("null"),
            Schema::Boolean => serializer.serialize_str("boolean"),
            Schema::Int => serializer.serialize_str("int"),
            Schema::Long => serializer.serialize_str("long"),
            Schema::Float => serializer.serialize_str("float"),
            Schema::Double => serializer.serialize_str("double"),
            Schema::Bytes => serializer.serialize_str("bytes"),
            Schema::String => serializer.serialize_str("string"),
            Schema::Array(ref inner) => {
                let mut map = serializer.serialize_map(Some(2))?;
                map.serialize_entry("type", "array")?;
                // Borrow the boxed schema; cloning the whole nested schema
                // just to take a reference to it is wasted work.
                map.serialize_entry("items", &**inner)?;
                map.end()
            },
            Schema::Map(ref inner) => {
                let mut map = serializer.serialize_map(Some(2))?;
                map.serialize_entry("type", "map")?;
                map.serialize_entry("values", &**inner)?;
                map.end()
            },
            Schema::Union(ref inner) => {
                let variants = inner.variants();
                let mut seq = serializer.serialize_seq(Some(variants.len()))?;
                for v in variants {
                    seq.serialize_element(v)?;
                }
                seq.end()
            },
            Schema::Record {
                ref name,
                ref doc,
                ref fields,
                ..
            } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "record")?;
                if let Some(ref n) = name.namespace {
                    map.serialize_entry("namespace", n)?;
                }
                map.serialize_entry("name", &name.name)?;
                if let Some(ref docstr) = doc {
                    map.serialize_entry("doc", docstr)?;
                }
                if let Some(ref aliases) = name.aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.serialize_entry("fields", fields)?;
                map.end()
            },
            Schema::Enum {
                ref name,
                ref symbols,
                ..
            } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "enum")?;
                // Emit the namespace exactly as the record arm does: without
                // it the namespace is lost on a round trip, and `pcf_map`
                // cannot qualify `name` when building the canonical form.
                if let Some(ref n) = name.namespace {
                    map.serialize_entry("namespace", n)?;
                }
                map.serialize_entry("name", &name.name)?;
                map.serialize_entry("symbols", symbols)?;
                map.end()
            },
            Schema::Fixed { ref name, ref size } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "fixed")?;
                // Same reasoning as for the enum arm.
                if let Some(ref n) = name.namespace {
                    map.serialize_entry("namespace", n)?;
                }
                map.serialize_entry("name", &name.name)?;
                map.serialize_entry("size", size)?;
                map.end()
            },
        }
    }
}
/// Serialises a record field as a map with `name`, `type` and, when the field
/// has one, `default`. The `doc`, `order` and `position` members are not
/// written.
impl Serialize for RecordField {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut entries = serializer.serialize_map(None)?;
        entries.serialize_entry("name", &self.name)?;
        entries.serialize_entry("type", &self.schema)?;
        match self.default {
            Some(ref value) => entries.serialize_entry("default", value)?,
            None => {},
        }
        entries.end()
    }
}
/// Renders a serialised schema in canonical form, dispatching on the JSON
/// value kind.
///
/// # Panics
///
/// Hits `unreachable!()` for any JSON value other than a string, an array or
/// an object.
fn parsing_canonical_form(schema: &serde_json::Value) -> String {
    match *schema {
        serde_json::Value::String(ref text) => pcf_string(text),
        serde_json::Value::Array(ref elements) => pcf_array(elements),
        serde_json::Value::Object(ref entries) => pcf_map(entries),
        _ => unreachable!(),
    }
}
/// Renders a JSON object in canonical form.
///
/// * An object whose only entry is a string `type` collapses to that string.
/// * Keys unknown to `field_ordering_position` are dropped.
/// * `name` is prefixed with the object's `namespace` unless it already
///   contains a `.`.
/// * `size` is written as a bare integer, even when given as a string.
/// * The surviving entries are emitted in `field_ordering_position` order.
///
/// # Panics
///
/// Panics when `name` is not a string, or when `size` is neither an integer
/// nor a string holding one.
fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
    // With exactly one entry, a string `type` is the whole story.
    if schema.len() == 1 {
        if let Some(&serde_json::Value::String(ref type_name)) = schema.get("type") {
            return pcf_string(type_name);
        }
    }

    let namespace = schema.get("namespace").and_then(|v| v.as_str());
    let mut ranked: Vec<(usize, String)> = Vec::new();
    for (key, value) in schema {
        let rank = match field_ordering_position(key) {
            Some(rank) => rank,
            // Not part of the canonical form.
            None => continue,
        };
        let rendered = match key.as_str() {
            "name" => {
                let name = value.as_str().unwrap();
                let qualified: Cow<str> = match namespace {
                    Some(prefix) if !name.contains('.') => {
                        Cow::Owned(format!("{}.{}", prefix, name))
                    },
                    _ => Cow::Borrowed(name),
                };
                pcf_string(&qualified)
            },
            "size" => {
                let size = match value.as_str() {
                    Some(text) => text
                        .parse::<i64>()
                        .expect("Only valid schemas are accepted!"),
                    None => value.as_i64().unwrap(),
                };
                size.to_string()
            },
            _ => parsing_canonical_form(value),
        };
        ranked.push((rank, format!("{}:{}", pcf_string(key), rendered)));
    }

    // Ranks are unique per key, so an unstable sort is deterministic here.
    ranked.sort_unstable_by_key(|&(rank, _)| rank);
    let body = ranked
        .into_iter()
        .map(|(_, entry)| entry)
        .collect::<Vec<_>>()
        .join(",");
    format!("{{{}}}", body)
}
fn pcf_array(arr: &[serde_json::Value]) -> String {
let inter = arr
.iter()
.map(parsing_canonical_form)
.collect::<Vec<String>>()
.join(",");
format!("[{}]", inter)
}
/// Wraps `s` in double quotes. The content is copied verbatim; nothing is
/// escaped.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
/// Returns the 1-based rank a key has in canonical output, or `None` for keys
/// that are left out of it.
fn field_ordering_position(field: &str) -> Option<usize> {
    // Keys in the order they are emitted; the rank is the index plus one.
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];
    CANONICAL_ORDER
        .iter()
        .position(|&key| key == field)
        .map(|index| index + 1)
}
// Unit tests for schema parsing: one test per schema shape, plus checks that
// `Schema` is `Send` and `Sync`.
#[cfg(test)]
mod tests {
use super::*;
// Text that is not valid JSON must be rejected.
#[test]
fn test_invalid_schema() {
assert!(Schema::parse_str("invalid").is_err());
}
// A JSON string naming a primitive type parses to the matching variant.
#[test]
fn test_primitive_schema() {
assert_eq!(Schema::Null, Schema::parse_str("\"null\"").unwrap());
assert_eq!(Schema::Int, Schema::parse_str("\"int\"").unwrap());
assert_eq!(Schema::Double, Schema::parse_str("\"double\"").unwrap());
}
// An array definition takes its element schema from `items`.
#[test]
fn test_array_schema() {
let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#).unwrap();
assert_eq!(Schema::Array(Box::new(Schema::String)), schema);
}
// A map definition takes its value schema from `values`.
#[test]
fn test_map_schema() {
let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#).unwrap();
assert_eq!(Schema::Map(Box::new(Schema::Double)), schema);
}
// A JSON array parses to a union of its elements.
#[test]
fn test_union_schema() {
let schema = Schema::parse_str(r#"["null", "int"]"#).unwrap();
assert_eq!(
Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
schema
);
}
// A union directly nested inside a union is rejected.
#[test]
fn test_union_unsupported_schema() {
let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
assert!(schema.is_err());
}
// A union with five distinct kinds keeps all variants in declaration order.
#[test]
fn test_multi_union_schema() {
let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
assert!(schema.is_ok());
let schema = schema.unwrap();
assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
let union_schema = match schema {
Schema::Union(u) => u,
_ => unreachable!(),
};
assert_eq!(union_schema.variants().len(), 5);
let mut variants = union_schema.variants().iter();
assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
assert_eq!(
SchemaKind::from(variants.next().unwrap()),
SchemaKind::Float
);
assert_eq!(
SchemaKind::from(variants.next().unwrap()),
SchemaKind::String
);
assert_eq!(
SchemaKind::from(variants.next().unwrap()),
SchemaKind::Bytes
);
assert_eq!(variants.next(), None);
}
// A record keeps its fields in order, records defaults, and builds the
// name-to-position lookup.
#[test]
fn test_record_schema() {
let schema = Schema::parse_str(
r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#,
).unwrap();
let mut lookup = HashMap::new();
lookup.insert("a".to_owned(), 0);
lookup.insert("b".to_owned(), 1);
let expected = Schema::Record {
name: Name::new("test"),
doc: None,
fields: vec![
RecordField {
name: "a".to_string(),
doc: None,
default: Some(Value::Number(42i64.into())),
schema: Schema::Long,
order: RecordFieldOrder::Ascending,
position: 0,
},
RecordField {
name: "b".to_string(),
doc: None,
default: None,
schema: Schema::String,
order: RecordFieldOrder::Ascending,
position: 1,
},
],
lookup,
};
assert_eq!(expected, schema);
}
// An enum keeps its symbols in declaration order.
#[test]
fn test_enum_schema() {
let schema = Schema::parse_str(
r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
).unwrap();
let expected = Schema::Enum {
name: Name::new("Suit"),
doc: None,
symbols: vec![
"diamonds".to_owned(),
"spades".to_owned(),
"clubs".to_owned(),
"hearts".to_owned(),
],
};
assert_eq!(expected, schema);
}
// A fixed definition records its name and size.
#[test]
fn test_fixed_schema() {
let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#).unwrap();
let expected = Schema::Fixed {
name: Name::new("test"),
size: 16usize,
};
assert_eq!(expected, schema);
}
// Without a `doc` entry the parsed documentation is `None`.
#[test]
fn test_no_documentation() {
let schema =
Schema::parse_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
.unwrap();
let doc = match schema {
Schema::Enum { doc, .. } => doc,
_ => return assert!(false),
};
assert!(doc.is_none());
}
// A `doc` entry is carried over into the parsed schema.
#[test]
fn test_documentation() {
let schema = Schema::parse_str(
r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
).unwrap();
let doc = match schema {
Schema::Enum { doc, .. } => doc,
_ => None,
};
assert_eq!("Some documentation".to_owned(), doc.unwrap());
}
// Compile-time check: `Schema` can be moved to another thread.
#[test]
fn test_schema_is_send() {
fn send<S: Send>(_s: S) {}
let schema = Schema::Null;
send(schema);
}
// Compile-time check: both `&Schema` and `Schema` are `Sync`.
#[test]
fn test_schema_is_sync() {
fn sync<S: Sync>(_s: S) {}
let schema = Schema::Null;
sync(&schema);
sync(schema);
}
}