use std::collections::HashMap;
use std::rc::Rc;
use failure::{err_msg, Error};
use serde_json::Value as JsonValue;
use schema::{RecordField, Schema};
/// A dynamically typed value, as produced by the `ToAvro` conversions in this
/// file and checked against a `Schema` by `Value::validate` / `Value::resolve`.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
/// The null value.
Null,
/// A boolean.
Boolean(bool),
/// A 32-bit signed integer.
Int(i32),
/// A 64-bit signed integer.
Long(i64),
/// A single-precision float.
Float(f32),
/// A double-precision float.
Double(f64),
/// A sequence of raw bytes.
Bytes(Vec<u8>),
/// A text string.
String(String),
/// A declared size followed by the bytes themselves. `validate` and
/// `resolve` compare only the declared size with the schema's `size`.
Fixed(usize, Vec<u8>),
/// A symbol's index in the schema's symbol list, followed by the symbol.
Enum(i32, String),
/// Either no value (`None`) or a single boxed inner value.
Union(Option<Box<Value>>),
/// An ordered list of values.
Array(Vec<Value>),
/// String keys mapped to values.
Map(HashMap<String, Value>),
/// `(field name, value)` pairs; `validate` expects them in the same order
/// as the schema's fields.
Record(Vec<(String, Value)>),
}
/// Conversion of a Rust value into a `Value`.
pub trait ToAvro {
/// Consumes `self` and returns its `Value` representation.
fn avro(self) -> Value;
}
/// Generates `impl ToAvro for $source` whose `avro` applies `$wrap` to `self`.
///
/// `$wrap` is any expression callable with the source value; the invocations
/// below pass the matching `Value` variant constructor.
macro_rules! to_avro {
    ($source:ty, $wrap:expr) => {
        impl ToAvro for $source {
            fn avro(self) -> Value {
                $wrap(self)
            }
        }
    };
}

// Primitive types that map one-to-one onto a `Value` variant.
to_avro!(bool, Value::Boolean);
to_avro!(i32, Value::Int);
to_avro!(i64, Value::Long);
to_avro!(f32, Value::Float);
to_avro!(f64, Value::Double);
to_avro!(String, Value::String);
/// The unit type carries no data, so it converts to `Value::Null`.
impl ToAvro for () {
    fn avro(self) -> Value {
        Value::Null
    }
}
/// A `usize` is converted the same way as an `i64`.
///
/// The widening is a plain `as i64` cast, so a value that does not fit in an
/// `i64` is not reported as an error.
impl ToAvro for usize {
    fn avro(self) -> Value {
        let as_long = self as i64;
        as_long.avro()
    }
}
/// A string slice is copied into an owned `Value::String`.
impl<'a> ToAvro for &'a str {
    fn avro(self) -> Value {
        Value::String(String::from(self))
    }
}
/// A byte slice is copied into an owned `Value::Bytes`.
impl<'a> ToAvro for &'a [u8] {
    fn avro(self) -> Value {
        Value::Bytes(self.to_vec())
    }
}
/// An `Option` becomes a `Value::Union`: `None` stays empty, `Some(v)` holds
/// the boxed conversion of `v`.
impl<T> ToAvro for Option<T>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        match self {
            Some(inner) => Value::Union(Some(Box::new(inner.avro()))),
            None => Value::Union(None),
        }
    }
}
/// A map with owned string keys becomes a `Value::Map`; keys are reused and
/// every entry's value is converted with its own `avro`.
impl<T> ToAvro for HashMap<String, T>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        let mut converted = HashMap::with_capacity(self.len());
        for (name, item) in self {
            converted.insert(name, item.avro());
        }
        Value::Map(converted)
    }
}
/// A map with borrowed string keys becomes a `Value::Map`; each key is copied
/// into an owned `String` and each value is converted with its own `avro`.
impl<'a, T> ToAvro for HashMap<&'a str, T>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        let mut converted = HashMap::with_capacity(self.len());
        for (name, item) in self {
            converted.insert(name.to_owned(), item.avro());
        }
        Value::Map(converted)
    }
}
/// A `Value` is already in its target form and is returned unchanged.
impl ToAvro for Value {
    fn avro(self) -> Value {
        self
    }
}
/// A boxed value converts exactly like the value it owns.
impl<T> ToAvro for Box<T>
where
    T: ToAvro,
{
    fn avro(self) -> Value {
        let inner: T = *self;
        inner.avro()
    }
}
/// A record under construction: an ordered list of named values plus the
/// name-to-position index taken from the schema it was created from.
#[derive(Debug, Clone)]
pub struct Record {
/// `(field name, value)` pairs, one per schema field, in schema order.
pub fields: Vec<(String, Value)>,
// Maps a field name to its index in `fields`; this is a clone of the `Rc`
// held by the schema passed to `Record::new`.
schema_lookup: Rc<HashMap<String, usize>>,
}
impl Record {
    /// Creates an empty record shaped after `schema`.
    ///
    /// Returns `None` unless `schema` is a `Schema::Record`. The new record
    /// holds one entry per schema field, in schema order, each initialised to
    /// `Value::Null`.
    pub fn new(schema: &Schema) -> Option<Record> {
        if let Schema::Record {
            fields: ref declared,
            lookup: ref positions,
            ..
        } = *schema
        {
            let blank = declared
                .iter()
                .map(|entry| (entry.name.clone(), Value::Null))
                .collect::<Vec<_>>();
            Some(Record {
                fields: blank,
                schema_lookup: positions.clone(),
            })
        } else {
            None
        }
    }

    /// Stores `value` (converted through `ToAvro`) under the field `field`.
    ///
    /// A name that is not part of the schema is ignored without any error,
    /// and the value is not checked against the field's schema.
    pub fn put<V>(&mut self, field: &str, value: V)
    where
        V: ToAvro,
    {
        if let Some(&slot) = self.schema_lookup.get(field) {
            self.fields[slot].1 = value.avro();
        }
    }
}
impl ToAvro for Record {
fn avro(self) -> Value {
Value::Record(self.fields)
}
}
/// Converts a JSON value into a `Value`.
///
/// * `null`, booleans and strings map to `Null`, `Boolean` and `String`.
/// * A number that fits in an `i64` becomes `Long`; a floating-point number
///   becomes `Double`.
/// * Any other number becomes `Double` as well, so that its magnitude is kept
///   instead of wrapping around into a negative `Long`.
/// * Arrays become `Array` and objects become `Map`, converting every element
///   recursively.
impl ToAvro for JsonValue {
    fn avro(self) -> Value {
        match self {
            JsonValue::Null => Value::Null,
            JsonValue::Bool(b) => Value::Boolean(b),
            JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
            // Only reached by numbers that are neither `i64` nor `f64`, i.e.
            // unsigned integers too large for `i64`. Casting those to `i64`
            // would always wrap to a negative number, so widen to `f64`.
            JsonValue::Number(ref n) => Value::Double(n.as_u64().unwrap() as f64),
            JsonValue::String(s) => Value::String(s),
            JsonValue::Array(items) => {
                Value::Array(items.into_iter().map(|item| item.avro()).collect())
            },
            JsonValue::Object(items) => Value::Map(
                items
                    .into_iter()
                    .map(|(key, value)| (key, value.avro()))
                    .collect(),
            ),
        }
    }
}
impl Value {
pub fn validate(&self, schema: &Schema) -> bool {
match (self, schema) {
(&Value::Null, &Schema::Null) => true,
(&Value::Boolean(_), &Schema::Boolean) => true,
(&Value::Int(_), &Schema::Int) => true,
(&Value::Long(_), &Schema::Long) => true,
(&Value::Float(_), &Schema::Float) => true,
(&Value::Double(_), &Schema::Double) => true,
(&Value::Bytes(_), &Schema::Bytes) => true,
(&Value::String(_), &Schema::String) => true,
(&Value::Fixed(n, _), &Schema::Fixed { size, .. }) => n == size,
(&Value::String(ref s), &Schema::Enum { ref symbols, .. }) => symbols.contains(s),
(&Value::Enum(i, ref s), &Schema::Enum { ref symbols, .. }) => symbols
.get(i as usize)
.map(|ref symbol| symbol == &s)
.unwrap_or(false),
(&Value::Union(None), &Schema::Union(_)) => true,
(&Value::Union(Some(ref value)), &Schema::Union(ref inner)) => value.validate(inner),
(&Value::Array(ref items), &Schema::Array(ref inner)) => {
items.iter().all(|item| item.validate(inner))
},
(&Value::Map(ref items), &Schema::Map(ref inner)) => {
items.iter().all(|(_, value)| value.validate(inner))
},
(&Value::Record(ref record_fields), &Schema::Record { ref fields, .. }) => {
fields.len() == record_fields.len()
&& fields.iter().zip(record_fields.iter()).all(
|(field, &(ref name, ref value))| {
field.name == *name && value.validate(&field.schema)
},
)
},
_ => false,
}
}
pub fn resolve(self, schema: &Schema) -> Result<Self, Error> {
match schema {
&Schema::Null => self.resolve_null(),
&Schema::Boolean => self.resolve_boolean(),
&Schema::Int => self.resolve_int(),
&Schema::Long => self.resolve_long(),
&Schema::Float => self.resolve_float(),
&Schema::Double => self.resolve_double(),
&Schema::Bytes => self.resolve_bytes(),
&Schema::String => self.resolve_string(),
&Schema::Fixed { size, .. } => self.resolve_fixed(size),
&Schema::Union(ref inner) => self.resolve_union(inner),
&Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
&Schema::Array(ref inner) => self.resolve_array(inner),
&Schema::Map(ref inner) => self.resolve_map(inner),
&Schema::Record { ref fields, .. } => self.resolve_record(fields),
}
}
fn resolve_null(self) -> Result<Self, Error> {
match self {
Value::Null => Ok(Value::Null),
other => Err(err_msg(format!("Null expected, got {:?}", other))),
}
}
fn resolve_boolean(self) -> Result<Self, Error> {
match self {
Value::Boolean(b) => Ok(Value::Boolean(b)),
other => Err(err_msg(format!("Boolean expected, got {:?}", other))),
}
}
fn resolve_int(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Int(n)),
Value::Long(n) => Ok(Value::Int(n as i32)),
other => Err(err_msg(format!("Int expected, got {:?}", other))),
}
}
fn resolve_long(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Long(n as i64)),
Value::Long(n) => Ok(Value::Long(n)),
other => Err(err_msg(format!("Long expected, got {:?}", other))),
}
}
fn resolve_float(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Float(n as f32)),
Value::Long(n) => Ok(Value::Float(n as f32)),
Value::Float(x) => Ok(Value::Float(x)),
Value::Double(x) => Ok(Value::Float(x as f32)),
other => Err(err_msg(format!("Float expected, got {:?}", other))),
}
}
fn resolve_double(self) -> Result<Self, Error> {
match self {
Value::Int(n) => Ok(Value::Double(n as f64)),
Value::Long(n) => Ok(Value::Double(n as f64)),
Value::Float(x) => Ok(Value::Double(x as f64)),
Value::Double(x) => Ok(Value::Double(x)),
other => Err(err_msg(format!("Double expected, got {:?}", other))),
}
}
fn resolve_bytes(self) -> Result<Self, Error> {
match self {
Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
other => Err(err_msg(format!("Bytes expected, got {:?}", other))),
}
}
fn resolve_string(self) -> Result<Self, Error> {
match self {
Value::String(s) => Ok(Value::String(s)),
Value::Bytes(bytes) => Ok(Value::String(String::from_utf8(bytes)?)),
other => Err(err_msg(format!("String expected, got {:?}", other))),
}
}
fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
match self {
Value::Fixed(n, bytes) => if n == size {
Ok(Value::Fixed(n, bytes))
} else {
Err(err_msg(format!(
"Fixed size mismatch, {} expected, got {}",
size, n
)))
},
other => Err(err_msg(format!("String expected, got {:?}", other))),
}
}
fn resolve_enum(self, symbols: &Vec<String>) -> Result<Self, Error> {
let validate_symbol = |symbol: String, symbols: &Vec<String>| {
if let Some(index) = symbols.iter().position(|ref item| item == &&symbol) {
Ok(Value::Enum(index as i32, symbol))
} else {
Err(err_msg(format!(
"Enum default {} is not among allowed symbols {:?}",
symbol, symbols,
)))
}
};
match self {
Value::Enum(i, s) => if i > 0 && i < symbols.len() as i32 {
validate_symbol(s, symbols)
} else {
Err(err_msg(format!(
"Enum value {} is out of bound {}",
i,
symbols.len() as i32
)))
},
Value::String(s) => validate_symbol(s, symbols),
other => Err(err_msg(format!(
"Enum({:?}) expected, got {:?}",
symbols, other
))),
}
}
fn resolve_union(self, schema: &Schema) -> Result<Self, Error> {
match self {
Value::Union(None) => Ok(Value::Union(None)),
Value::Union(Some(inner)) => Ok(Value::Union(Some(Box::new(inner.resolve(schema)?)))),
other => Err(err_msg(format!(
"Union({:?}) expected, got {:?}",
schema, other
))),
}
}
fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
match self {
Value::Array(items) => Ok(Value::Array(items
.into_iter()
.map(|item| item.resolve(schema))
.collect::<Result<Vec<_>, _>>()?)),
other => Err(err_msg(format!(
"Array({:?}) expected, got {:?}",
schema, other
))),
}
}
fn resolve_map(self, schema: &Schema) -> Result<Self, Error> {
match self {
Value::Map(items) => Ok(Value::Map(items
.into_iter()
.map(|(key, value)| value.resolve(schema).map(|value| (key, value)))
.collect::<Result<HashMap<_, _>, _>>()?)),
other => Err(err_msg(format!(
"Map({:?}) expected, got {:?}",
schema, other
))),
}
}
fn resolve_record(self, fields: &Vec<RecordField>) -> Result<Self, Error> {
let mut items = match self {
Value::Map(items) => Ok(items),
Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
other => Err(err_msg(format!(
"Record({:?}) expected, got {:?}",
fields, other
))),
}?;
let new_fields = fields
.iter()
.map(|field| {
let value = match items.remove(&field.name) {
Some(value) => value,
None => match field.default {
Some(ref value) => match field.schema {
Schema::Enum { ref symbols, .. } => {
value.clone().avro().resolve_enum(symbols)?
},
_ => value.clone().avro(),
},
_ => return Err(err_msg(format!("missing field {} in record", field.name))),
},
};
value
.resolve(&field.schema)
.map(|value| (field.name.clone(), value))
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Value::Record(new_fields))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::rc::Rc;
    use schema::{Name, RecordField, RecordFieldOrder};

    /// Copies string literals into the owned symbol list an enum schema holds.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| String::from(*name)).collect()
    }

    /// Builds one `(field name, value)` entry of a record value.
    fn entry(name: &str, value: Value) -> (String, Value) {
        (name.to_string(), value)
    }

    /// Primitive, union, array and record values against assorted schemas.
    #[test]
    fn validate() {
        let int_union = || Schema::Union(Rc::new(Schema::Int));
        let long_array = || Schema::Array(Rc::new(Schema::Long));
        let cases = vec![
            (Value::Int(42), Schema::Int, true),
            (Value::Int(42), Schema::Boolean, false),
            (Value::Union(None), int_union(), true),
            (Value::Union(Some(Box::new(Value::Int(42)))), int_union(), true),
            (Value::Union(Some(Box::new(Value::Null))), int_union(), false),
            (Value::Array(vec![Value::Long(42i64)]), long_array(), true),
            (Value::Array(vec![Value::Boolean(true)]), long_array(), false),
            (Value::Record(vec![]), Schema::Null, false),
        ];
        for (value, schema, expected) in cases {
            assert_eq!(expected, value.validate(&schema));
        }
    }

    /// A fixed value is valid only when its declared size equals the schema's.
    #[test]
    fn validate_fixed() {
        let schema = Schema::Fixed {
            size: 4,
            name: Name::new("some_fixed"),
        };
        let matching = Value::Fixed(4, vec![0, 0, 0, 0]);
        let oversized = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
        assert!(matching.validate(&schema));
        assert!(!oversized.validate(&schema));
    }

    /// Enum values must agree with the schema on both index and symbol.
    #[test]
    fn validate_enum() {
        let schema = Schema::Enum {
            name: Name::new("some_enum"),
            doc: None,
            symbols: owned(&["spades", "hearts", "diamonds", "clubs"]),
        };
        let cases = vec![
            (Value::Enum(0, "spades".to_string()), true),
            (Value::String("spades".to_string()), true),
            (Value::Enum(1, "spades".to_string()), false),
            (Value::String("lorem".to_string()), false),
        ];
        for (value, expected) in cases {
            assert_eq!(expected, value.validate(&schema));
        }

        // Same symbols in a different order: index 0 no longer names "spades".
        let other_schema = Schema::Enum {
            name: Name::new("some_other_enum"),
            doc: None,
            symbols: owned(&["hearts", "diamonds", "clubs", "spades"]),
        };
        assert!(!Value::Enum(0, "spades".to_string()).validate(&other_schema));
    }

    /// Records must match the schema's field names, order, types and count.
    #[test]
    fn validate_record() {
        let schema = Schema::Record {
            name: Name::new("some_record"),
            doc: None,
            fields: vec![
                RecordField {
                    name: "a".to_string(),
                    doc: None,
                    default: None,
                    schema: Schema::Long,
                    order: RecordFieldOrder::Ascending,
                    position: 0,
                },
                RecordField {
                    name: "b".to_string(),
                    doc: None,
                    default: None,
                    schema: Schema::String,
                    order: RecordFieldOrder::Ascending,
                    position: 1,
                },
            ],
            lookup: Rc::new(HashMap::new()),
        };
        let foo = || Value::String("foo".to_string());
        let cases = vec![
            // Exactly the schema's fields, in order.
            (
                vec![entry("a", Value::Long(42i64)), entry("b", foo())],
                true,
            ),
            // Right fields, wrong order.
            (
                vec![entry("b", foo()), entry("a", Value::Long(42i64))],
                false,
            ),
            // Wrong type for "a".
            (
                vec![entry("a", Value::Boolean(false)), entry("b", foo())],
                false,
            ),
            // Unknown field name.
            (
                vec![entry("a", Value::Long(42i64)), entry("c", foo())],
                false,
            ),
            // One field too many.
            (
                vec![
                    entry("a", Value::Long(42i64)),
                    entry("b", foo()),
                    entry("c", Value::Null),
                ],
                false,
            ),
        ];
        for (fields, expected) in cases {
            assert_eq!(expected, Value::Record(fields).validate(&schema));
        }
    }
}