use std::collections::HashMap;
use std::hash::BuildHasher;
use std::u8;
use failure::{Error, Fail};
use serde_json::Value as JsonValue;
use crate::schema::{RecordField, Schema, SchemaKind, UnionSchema};
#[derive(Fail, Debug)]
#[fail(display = "Schema resoulution error: {}", _0)]
pub struct SchemaResolutionError(pub String);
impl SchemaResolutionError {
pub fn new<S>(msg: S) -> SchemaResolutionError
where
S: Into<String>,
{
SchemaResolutionError(msg.into())
}
}
/// A dynamically-typed Avro datum.
///
/// Each variant corresponds to an Avro schema type. `Value::validate` checks a value
/// against a `Schema`, and `Value::resolve` converts a value so that it fits one.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
/// Avro `null`.
Null,
/// Avro `boolean`.
Boolean(bool),
/// Avro `int` (32-bit signed integer).
Int(i32),
/// Avro `long` (64-bit signed integer).
Long(i64),
/// Avro `float` (32-bit IEEE 754).
Float(f32),
/// Avro `double` (64-bit IEEE 754).
Double(f64),
/// Avro `bytes`.
Bytes(Vec<u8>),
/// Avro `string`.
String(String),
/// Avro `fixed`: the declared size followed by the bytes. `validate` only compares the
/// declared size with the schema's `size`.
Fixed(usize, Vec<u8>),
/// Avro `enum`: the symbol's index in the schema's symbol list, followed by the symbol.
Enum(i32, String),
/// A value of an Avro `union`, wrapping the value of the selected branch.
Union(Box<Value>),
/// Avro `array`.
Array(Vec<Value>),
/// Avro `map`; keys are always strings.
Map(HashMap<String, Value>),
/// Avro `record` as `(field name, value)` pairs. `validate` expects them in schema order.
Record(Vec<(String, Value)>),
}
/// Conversion of a Rust value into an Avro [`Value`].
pub trait ToAvro {
/// Consumes `self` and returns its Avro representation.
fn avro(self) -> Value;
}
/// Implements [`ToAvro`] for a primitive type by wrapping it in a single-field
/// `Value` variant constructor.
///
/// Usage: `to_avro!(rust_type, Value::Variant);`
macro_rules! to_avro {
    ($source:ty, $variant:expr) => {
        impl ToAvro for $source {
            fn avro(self) -> Value {
                $variant(self)
            }
        }
    };
}

// Boolean.
to_avro!(bool, Value::Boolean);
// Integers.
to_avro!(i32, Value::Int);
to_avro!(i64, Value::Long);
// Floating point.
to_avro!(f32, Value::Float);
to_avro!(f64, Value::Double);
// Owned text.
to_avro!(String, Value::String);
/// The unit type carries no data and maps to Avro `null`.
impl ToAvro for () {
    fn avro(self) -> Value {
        Value::Null
    }
}

/// `usize` is stored as an Avro `long`.
///
/// NOTE(review): this is a plain `as` cast, so values above `i64::MAX` wrap to negative
/// longs.
impl ToAvro for usize {
    fn avro(self) -> Value {
        let widened = self as i64;
        Value::Long(widened)
    }
}

/// A borrowed string slice is copied into an owned Avro `string`.
impl<'a> ToAvro for &'a str {
    fn avro(self) -> Value {
        Value::String(String::from(self))
    }
}

/// A borrowed byte slice is copied into owned Avro `bytes`.
impl<'a> ToAvro for &'a [u8] {
    fn avro(self) -> Value {
        Value::Bytes(self.to_vec())
    }
}
/// An `Option` is encoded as a union value: `Some(v)` wraps `v.avro()` and `None` wraps
/// `Value::Null`.
impl<T> ToAvro for Option<T>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        let inner = self.map(T::avro).unwrap_or(Value::Null);
        Value::Union(Box::new(inner))
    }
}

/// A string-keyed map becomes an Avro `map`, converting every value with [`ToAvro`].
impl<T, S: BuildHasher> ToAvro for HashMap<String, T, S>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        let mut converted = HashMap::with_capacity(self.len());
        for (key, value) in self {
            converted.insert(key, value.avro());
        }
        Value::Map(converted)
    }
}

/// A map keyed by borrowed strings becomes an Avro `map` with owned keys.
impl<'a, T, S: BuildHasher> ToAvro for HashMap<&'a str, T, S>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        let converted: HashMap<String, Value> = self
            .into_iter()
            .map(|(name, item)| (name.to_string(), item.avro()))
            .collect();
        Value::Map(converted)
    }
}

/// A `Value` is already in Avro form and is returned unchanged.
impl ToAvro for Value {
    fn avro(self) -> Value {
        self
    }
}
/// A record value under construction, laid out according to a `Schema::Record`.
///
/// Every field starts as `Value::Null`, in the order the schema lists them. Fields are
/// filled in by name with [`Record::put`], and the finished record is turned into a
/// `Value::Record` via [`ToAvro::avro`].
#[derive(Debug, Clone)]
pub struct Record<'a> {
    /// `(field name, value)` pairs, in the order the schema's fields are listed.
    pub fields: Vec<(String, Value)>,
    /// Field name -> index into `fields`, borrowed from the schema's `lookup` table.
    schema_lookup: &'a HashMap<String, usize>,
}

impl<'a> Record<'a> {
    /// Builds a record for `schema` with every field initialised to `Value::Null`.
    ///
    /// Returns `None` when `schema` is not a `Schema::Record`.
    pub fn new(schema: &Schema) -> Option<Record> {
        if let Schema::Record {
            fields: ref schema_fields,
            lookup: ref schema_lookup,
            ..
        } = *schema
        {
            let fields = schema_fields
                .iter()
                .map(|schema_field| (schema_field.name.clone(), Value::Null))
                .collect::<Vec<_>>();
            Some(Record {
                fields,
                schema_lookup,
            })
        } else {
            None
        }
    }

    /// Sets the field called `field` to `value.avro()`.
    ///
    /// Names that are not in the schema's lookup table are silently ignored.
    /// NOTE(review): indexes `fields` directly with the lookup position, so this assumes the
    /// schema's lookup table is consistent with its field list.
    pub fn put<V>(&mut self, field: &str, value: V)
    where
        V: ToAvro,
    {
        let position = match self.schema_lookup.get(field) {
            Some(&position) => position,
            None => return,
        };
        self.fields[position].1 = value.avro();
    }
}

/// Converts the record into a `Value::Record`, keeping field order.
impl<'a> ToAvro for Record<'a> {
    fn avro(self) -> Value {
        let Record { fields, .. } = self;
        Value::Record(fields)
    }
}
impl ToAvro for JsonValue {
fn avro(self) -> Value {
match self {
JsonValue::Null => Value::Null,
JsonValue::Bool(b) => Value::Boolean(b),
JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64),
JsonValue::String(s) => Value::String(s),
JsonValue::Array(items) => {
Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
}
JsonValue::Object(items) => Value::Map(
items
.into_iter()
.map(|(key, value)| (key, value.avro()))
.collect::<_>(),
),
}
}
}
impl Value {
pub fn validate(&self, schema: &Schema) -> bool {
match (self, schema) {
(&Value::Null, &Schema::Null) => true,
(&Value::Boolean(_), &Schema::Boolean) => true,
(&Value::Int(_), &Schema::Int) => true,
(&Value::Long(_), &Schema::Long) => true,
(&Value::Float(_), &Schema::Float) => true,
(&Value::Double(_), &Schema::Double) => true,
(&Value::Bytes(_), &Schema::Bytes) => true,
(&Value::String(_), &Schema::String) => true,
(&Value::Fixed(n, _), &Schema::Fixed { size, .. }) => n == size,
(&Value::String(ref s), &Schema::Enum { ref symbols, .. }) => symbols.contains(s),
(&Value::Enum(i, ref s), &Schema::Enum { ref symbols, .. }) => symbols
.get(i as usize)
.map(|ref symbol| symbol == &s)
.unwrap_or(false),
(&Value::Union(ref value), &Schema::Union(ref inner)) => {
inner.find_schema(value).is_some()
}
(&Value::Array(ref items), &Schema::Array(ref inner)) => {
items.iter().all(|item| item.validate(inner))
}
(&Value::Map(ref items), &Schema::Map(ref inner)) => {
items.iter().all(|(_, value)| value.validate(inner))
}
(&Value::Record(ref record_fields), &Schema::Record { ref fields, .. }) => {
fields.len() == record_fields.len()
&& fields.iter().zip(record_fields.iter()).all(
|(field, &(ref name, ref value))| {
field.name == *name && value.validate(&field.schema)
},
)
}
_ => false,
}
}
pub fn resolve(mut self, schema: &Schema) -> Result<Self, Error> {
if SchemaKind::from(&self) == SchemaKind::Union
&& SchemaKind::from(schema) != SchemaKind::Union
{
let v = match self {
Value::Union(b) => *b,
_ => unreachable!(),
};
self = v;
}
match *schema {
Schema::Null => self.resolve_null(),
Schema::Boolean => self.resolve_boolean(),
Schema::Int => self.resolve_int(),
Schema::Long => self.resolve_long(),
Schema::Float => self.resolve_float(),
Schema::Double => self.resolve_double(),
Schema::Bytes => self.resolve_bytes(),
Schema::String => self.resolve_string(),
Schema::Fixed { size, .. } => self.resolve_fixed(size),
Schema::Union(ref inner) => self.resolve_union(inner),
Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
Schema::Array(ref inner) => self.resolve_array(inner),
Schema::Map(ref inner) => self.resolve_map(inner),
Schema::Record { ref fields, .. } => self.resolve_record(fields),
}
}
fn resolve_null(self) -> Result<Self, Error> {
match self {
Value::Null => Ok(Value::Null),
other => {
Err(SchemaResolutionError::new(format!("Null expected, got {:?}", other)).into())
}
}
}
fn resolve_boolean(self) -> Result<Self, Error> {
match self {
Value::Boolean(b) => Ok(Value::Boolean(b)),
other => {
Err(SchemaResolutionError::new(format!("Boolean expected, got {:?}", other)).into())
}
}
}
fn resolve_int(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Int(n)),
Value::Long(n) => Ok(Value::Int(n as i32)),
other => {
Err(SchemaResolutionError::new(format!("Int expected, got {:?}", other)).into())
}
}
}
fn resolve_long(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Long(i64::from(n))),
Value::Long(n) => Ok(Value::Long(n)),
other => {
Err(SchemaResolutionError::new(format!("Long expected, got {:?}", other)).into())
}
}
}
fn resolve_float(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Float(n as f32)),
Value::Long(n) => Ok(Value::Float(n as f32)),
Value::Float(x) => Ok(Value::Float(x)),
Value::Double(x) => Ok(Value::Float(x as f32)),
other => {
Err(SchemaResolutionError::new(format!("Float expected, got {:?}", other)).into())
}
}
}
fn resolve_double(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Double(f64::from(n))),
Value::Long(n) => Ok(Value::Double(n as f64)),
Value::Float(x) => Ok(Value::Double(f64::from(x))),
Value::Double(x) => Ok(Value::Double(x)),
other => {
Err(SchemaResolutionError::new(format!("Double expected, got {:?}", other)).into())
}
}
}
fn resolve_bytes(self) -> Result<Self, Error> {
match self {
Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
Value::Array(items) => Ok(Value::Bytes(
items
.into_iter()
.map(Value::try_u8)
.collect::<Result<Vec<_>, _>>()?,
)),
other => {
Err(SchemaResolutionError::new(format!("Bytes expected, got {:?}", other)).into())
}
}
}
fn resolve_string(self) -> Result<Self, Error> {
match self {
Value::String(s) => Ok(Value::String(s)),
Value::Bytes(bytes) => Ok(Value::String(String::from_utf8(bytes)?)),
other => {
Err(SchemaResolutionError::new(format!("String expected, got {:?}", other)).into())
}
}
}
fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
match self {
Value::Fixed(n, bytes) => {
if n == size {
Ok(Value::Fixed(n, bytes))
} else {
Err(SchemaResolutionError::new(format!(
"Fixed size mismatch, {} expected, got {}",
size, n
))
.into())
}
}
other => {
Err(SchemaResolutionError::new(format!("String expected, got {:?}", other)).into())
}
}
}
fn resolve_enum(self, symbols: &[String]) -> Result<Self, Error> {
let validate_symbol = |symbol: String, symbols: &[String]| {
if let Some(index) = symbols.iter().position(|ref item| item == &&symbol) {
Ok(Value::Enum(index as i32, symbol))
} else {
Err(SchemaResolutionError::new(format!(
"Enum default {} is not among allowed symbols {:?}",
symbol, symbols,
))
.into())
}
};
match self {
Value::Enum(i, s) => {
if i >= 0 && i < symbols.len() as i32 {
validate_symbol(s, symbols)
} else {
Err(SchemaResolutionError::new(format!(
"Enum value {} is out of bound {}",
i,
symbols.len() as i32
))
.into())
}
}
Value::String(s) => validate_symbol(s, symbols),
other => Err(SchemaResolutionError::new(format!(
"Enum({:?}) expected, got {:?}",
symbols, other
))
.into()),
}
}
fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
let v = match self {
Value::Union(v) => *v,
v => v,
};
let (_, inner) = schema
.find_schema(&v)
.ok_or_else(|| SchemaResolutionError::new("Could not find matching type in union"))?;
Ok(Value::Union(Box::new(v.resolve(inner)?)))
}
fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
match self {
Value::Array(items) => Ok(Value::Array(
items
.into_iter()
.map(|item| item.resolve(schema))
.collect::<Result<Vec<_>, _>>()?,
)),
other => Err(SchemaResolutionError::new(format!(
"Array({:?}) expected, got {:?}",
schema, other
))
.into()),
}
}
fn resolve_map(self, schema: &Schema) -> Result<Self, Error> {
match self {
Value::Map(items) => Ok(Value::Map(
items
.into_iter()
.map(|(key, value)| value.resolve(schema).map(|value| (key, value)))
.collect::<Result<HashMap<_, _>, _>>()?,
)),
other => Err(SchemaResolutionError::new(format!(
"Map({:?}) expected, got {:?}",
schema, other
))
.into()),
}
}
fn resolve_record(self, fields: &[RecordField]) -> Result<Self, Error> {
let mut items = match self {
Value::Map(items) => Ok(items),
Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
other => Err(Error::from(SchemaResolutionError::new(format!(
"Record({:?}) expected, got {:?}",
fields, other
)))),
}?;
let new_fields = fields
.iter()
.map(|field| {
let value = match items.remove(&field.name) {
Some(value) => value,
None => match field.default {
Some(ref value) => match field.schema {
Schema::Enum { ref symbols, .. } => {
value.clone().avro().resolve_enum(symbols)?
}
Schema::Union(ref union_schema) => {
let first = &union_schema.variants()[0];
match first {
Schema::Null => Value::Union(Box::new(Value::Null)),
_ => {
Value::Union(Box::new(value.clone().avro().resolve(first)?))
}
}
}
_ => value.clone().avro(),
},
None => {
return Err(SchemaResolutionError::new(format!(
"missing field {} in record",
field.name
))
.into());
}
},
};
value
.resolve(&field.schema)
.map(|value| (field.name.clone(), value))
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Value::Record(new_fields))
}
fn try_u8(self) -> Result<u8, Error> {
let int = self.resolve(&Schema::Int)?;
if let Value::Int(n) = int {
if n >= 0 && n <= i32::from(u8::MAX) {
return Ok(n as u8);
}
}
Err(SchemaResolutionError::new(format!("Unable to convert to u8, got {:?}", int)).into())
}
}
// Unit tests for `Value::validate` and `Value::resolve`.
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{Name, RecordField, RecordFieldOrder, UnionSchema};
// Table-driven `validate` cases: (value, schema, expected result). The table covers
// primitive mismatches, union membership (including a union missing the value's type),
// array element checks, and a record value against a non-record schema.
#[test]
fn validate() {
let value_schema_valid = vec![
(Value::Int(42), Schema::Int, true),
(Value::Int(42), Schema::Boolean, false),
(
Value::Union(Box::new(Value::Null)),
Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
true,
),
(
Value::Union(Box::new(Value::Int(42))),
Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
true,
),
(
Value::Union(Box::new(Value::Null)),
Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int]).unwrap()),
false,
),
(
Value::Union(Box::new(Value::Int(42))),
Schema::Union(
UnionSchema::new(vec![
Schema::Null,
Schema::Double,
Schema::String,
Schema::Int,
])
.unwrap(),
),
true,
),
(
Value::Array(vec![Value::Long(42i64)]),
Schema::Array(Box::new(Schema::Long)),
true,
),
(
Value::Array(vec![Value::Boolean(true)]),
Schema::Array(Box::new(Schema::Long)),
false,
),
(Value::Record(vec![]), Schema::Null, false),
];
for (value, schema, valid) in value_schema_valid.into_iter() {
assert_eq!(valid, value.validate(&schema));
}
}
// A fixed value validates only when its declared size equals the schema's size.
#[test]
fn validate_fixed() {
let schema = Schema::Fixed {
size: 4,
name: Name::new("some_fixed"),
};
assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
assert!(!Value::Fixed(5, vec![0, 0, 0, 0, 0]).validate(&schema));
}
// Enum validation accepts a matching symbol string, or an (index, symbol) pair whose index
// points at that symbol. The index is interpreted per schema, so the same pair fails
// against a schema with a different symbol order.
#[test]
fn validate_enum() {
let schema = Schema::Enum {
name: Name::new("some_enum"),
doc: None,
symbols: vec![
"spades".to_string(),
"hearts".to_string(),
"diamonds".to_string(),
"clubs".to_string(),
],
};
assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
assert!(Value::String("spades".to_string()).validate(&schema));
assert!(!Value::Enum(1, "spades".to_string()).validate(&schema));
assert!(!Value::String("lorem".to_string()).validate(&schema));
let other_schema = Schema::Enum {
name: Name::new("some_other_enum"),
doc: None,
symbols: vec![
"hearts".to_string(),
"diamonds".to_string(),
"clubs".to_string(),
"spades".to_string(),
],
};
assert!(!Value::Enum(0, "spades".to_string()).validate(&other_schema));
}
// Record validation requires the exact field names, in schema order, with values matching
// each field's schema. Reordered, mistyped, renamed and extra fields are all rejected.
// `lookup` is left empty because `validate` does not consult it.
#[test]
fn validate_record() {
let schema = Schema::Record {
name: Name::new("some_record"),
doc: None,
fields: vec![
RecordField {
name: "a".to_string(),
doc: None,
default: None,
schema: Schema::Long,
order: RecordFieldOrder::Ascending,
position: 0,
},
RecordField {
name: "b".to_string(),
doc: None,
default: None,
schema: Schema::String,
order: RecordFieldOrder::Ascending,
position: 1,
},
],
lookup: HashMap::new(),
};
assert!(Value::Record(vec![
("a".to_string(), Value::Long(42i64)),
("b".to_string(), Value::String("foo".to_string())),
])
.validate(&schema));
assert!(!Value::Record(vec![
("b".to_string(), Value::String("foo".to_string())),
("a".to_string(), Value::Long(42i64)),
])
.validate(&schema));
assert!(!Value::Record(vec![
("a".to_string(), Value::Boolean(false)),
("b".to_string(), Value::String("foo".to_string())),
])
.validate(&schema));
assert!(!Value::Record(vec![
("a".to_string(), Value::Long(42i64)),
("c".to_string(), Value::String("foo".to_string())),
])
.validate(&schema));
assert!(!Value::Record(vec![
("a".to_string(), Value::Long(42i64)),
("b".to_string(), Value::String("foo".to_string())),
("c".to_string(), Value::Null),
])
.validate(&schema));
}
// An array of byte-sized ints resolves into `Bytes`.
#[test]
fn resolve_bytes_ok() {
let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
assert_eq!(
value.resolve(&Schema::Bytes).unwrap(),
Value::Bytes(vec![0u8, 42u8])
);
}
// Ints outside 0..=255 cannot become bytes, so resolution fails.
#[test]
fn resolve_bytes_failure() {
let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
assert!(value.resolve(&Schema::Bytes).is_err());
}
}