use apache_avro::{Error, Schema, error::Details, from_avro_datum, to_avro_datum, types::Value};
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use std::{io::Cursor, sync::OnceLock};
/// Returns the shared table of `(raw schema JSON, datum)` pairs used by the
/// validation and round-trip tests.
///
/// Every datum is meant to be a valid instance of the schema it is paired
/// with. The table is built on first call and cached in a function-local
/// static, so all callers see the same `Vec`.
fn schemas_to_validate() -> &'static Vec<(&'static str, Value)> {
    /// Builds the table; invoked at most once through the `OnceLock` below.
    fn build_table() -> Vec<(&'static str, Value)> {
        // The composite datums are named up front to keep the table readable.
        let long_array = Value::Array(vec![Value::Long(1), Value::Long(3), Value::Long(2)]);
        let long_map = Value::Map(
            [
                (String::from("a"), Value::Long(1i64)),
                (String::from("b"), Value::Long(3i64)),
                (String::from("c"), Value::Long(2i64)),
            ]
            .into(),
        );
        let record = Value::Record(vec![(String::from("f"), Value::Long(1))]);

        vec![
            // Primitive types.
            (r#""null""#, Value::Null),
            (r#""boolean""#, Value::Boolean(true)),
            (
                r#""string""#,
                Value::String(String::from("adsfasdf09809dsf-=adsf")),
            ),
            (r#""bytes""#, Value::Bytes(b"12345abcd".to_vec())),
            (r#""int""#, Value::Int(1234)),
            (r#""long""#, Value::Long(1234)),
            (r#""float""#, Value::Float(1234.0)),
            (r#""double""#, Value::Double(1234.0)),
            // Named and complex types.
            (
                r#"{"type": "fixed", "name": "Test", "size": 1}"#,
                Value::Fixed(1, vec![b'B']),
            ),
            (
                r#"{"type": "enum", "name": "Test", "symbols": ["A", "B"]}"#,
                Value::Enum(1, String::from("B")),
            ),
            (r#"{"type": "array", "items": "long"}"#, long_array),
            (r#"{"type": "map", "values": "long"}"#, long_map),
            (
                r#"["string", "null", "long"]"#,
                Value::Union(1, Box::new(Value::Null)),
            ),
            (
                r#"{"type": "record", "name": "Test", "fields": [{"name": "f", "type": "long"}]}"#,
                record,
            ),
        ]
    }

    static TABLE: OnceLock<Vec<(&'static str, Value)>> = OnceLock::new();
    TABLE.get_or_init(build_table)
}
/// Returns the shared table of `(number, expected encoded bytes)` pairs used
/// by the int and long binary-encoding tests.
///
/// The pairs cover small positive and negative numbers plus the points where
/// the expected encoding grows from one byte to two and from two to three.
/// The table is built on first call and cached in a function-local static.
fn binary_encodings() -> &'static Vec<(i64, Vec<u8>)> {
    const CASES: [(i64, &[u8]); 9] = [
        (0, &[0x00]),
        (-1, &[0x01]),
        (1, &[0x02]),
        (-2, &[0x03]),
        (2, &[0x04]),
        (-64, &[0x7f]),
        (64, &[0x80, 0x01]),
        (8192, &[0x80, 0x80, 0x01]),
        (-8193, &[0x81, 0x80, 0x01]),
    ];

    static TABLE: OnceLock<Vec<(i64, Vec<u8>)>> = OnceLock::new();
    TABLE.get_or_init(|| {
        CASES
            .iter()
            .map(|&(number, bytes)| (number, bytes.to_vec()))
            .collect()
    })
}
/// Returns the shared table of `(field type JSON, default JSON, expected datum)`
/// triples used by `test_default_value`.
///
/// For each triple, a record field declared with the given type and default
/// is expected to be read back as the given datum when the writer did not
/// supply that field. The table is built on first call and cached in a
/// function-local static.
fn default_value_examples() -> &'static Vec<(&'static str, &'static str, Value)> {
    /// Builds the table; invoked at most once through the `OnceLock` below.
    fn build_table() -> Vec<(&'static str, &'static str, Value)> {
        let int_array = Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
        let int_map = Value::Map(
            [
                (String::from("a"), Value::Int(1)),
                (String::from("b"), Value::Int(2)),
            ]
            .into(),
        );
        let record = Value::Record(vec![(String::from("A"), Value::Int(5))]);

        vec![
            (r#""null""#, "null", Value::Null),
            (r#""boolean""#, "true", Value::Boolean(true)),
            (r#""string""#, r#""foo""#, Value::String(String::from("foo"))),
            // 'a' is ASCII, so the expected default is the single byte 97.
            (r#""bytes""#, r#""a""#, Value::Bytes(vec![97])),
            // U+00FF is expected as the two bytes 195, 191 (its UTF-8 form).
            (r#""bytes""#, r#""\u00FF""#, Value::Bytes(vec![195, 191])),
            (r#""int""#, "5", Value::Int(5)),
            (r#""long""#, "5", Value::Long(5)),
            (r#""float""#, "1.1", Value::Float(1.1)),
            (r#""double""#, "1.1", Value::Double(1.1)),
            // Non-finite floating point defaults are spelled as JSON strings.
            (r#""float""#, r#""INF""#, Value::Float(f32::INFINITY)),
            (r#""double""#, r#""INF""#, Value::Double(f64::INFINITY)),
            (r#""float""#, r#""Infinity""#, Value::Float(f32::INFINITY)),
            (
                r#""float""#,
                r#""-Infinity""#,
                Value::Float(f32::NEG_INFINITY),
            ),
            (r#""double""#, r#""Infinity""#, Value::Double(f64::INFINITY)),
            (
                r#""double""#,
                r#""-Infinity""#,
                Value::Double(f64::NEG_INFINITY),
            ),
            (r#""float""#, r#""NaN""#, Value::Float(f32::NAN)),
            (r#""double""#, r#""NaN""#, Value::Double(f64::NAN)),
            // Same one-byte / two-byte cases as for "bytes" above.
            (
                r#"{"type": "fixed", "name": "F", "size": 2}"#,
                r#""a""#,
                Value::Fixed(1, vec![97]),
            ),
            (
                r#"{"type": "fixed", "name": "F", "size": 2}"#,
                r#""\u00FF""#,
                Value::Fixed(2, vec![195, 191]),
            ),
            (
                r#"{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}"#,
                r#""FOO""#,
                Value::Enum(0, String::from("FOO")),
            ),
            (r#"{"type": "array", "items": "int"}"#, "[1, 2, 3]", int_array),
            (
                r#"{"type": "map", "values": "int"}"#,
                r#"{"a": 1, "b": 2}"#,
                int_map,
            ),
            (
                r#"["int", "null"]"#,
                "5",
                Value::Union(0, Box::new(Value::Int(5))),
            ),
            (
                r#"{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}"#,
                r#"{"A": 5}"#,
                record,
            ),
            (
                r#"["null", "int"]"#,
                "null",
                Value::Union(0, Box::new(Value::Null)),
            ),
        ]
    }

    static TABLE: OnceLock<Vec<(&'static str, &'static str, Value)>> = OnceLock::new();
    TABLE.get_or_init(build_table)
}
/// Returns the parsed writer schema shared by the schema-resolution tests: a
/// record named `Test` with seven `int` fields, `A` through `G`.
///
/// The schema is parsed on first call and cached in a function-local static.
///
/// # Panics
///
/// Panics if the embedded schema JSON fails to parse.
fn long_record_schema() -> &'static Schema {
    // The interior lines are flush-left so the literal's bytes stay unchanged.
    const RAW_SCHEMA: &str = r#"
{
"type": "record",
"name": "Test",
"fields": [
{"name": "A", "type": "int"},
{"name": "B", "type": "int"},
{"name": "C", "type": "int"},
{"name": "D", "type": "int"},
{"name": "E", "type": "int"},
{"name": "F", "type": "int"},
{"name": "G", "type": "int"}
]
}
"#;

    static PARSED: OnceLock<Schema> = OnceLock::new();
    PARSED.get_or_init(|| Schema::parse_str(RAW_SCHEMA).unwrap())
}
/// Returns the record datum that pairs with `long_record_schema()`: fields
/// `A` through `G` holding the ints 1 through 7, in that order.
///
/// The datum is built on first call and cached in a function-local static.
fn long_record_datum() -> &'static Value {
    static DATUM: OnceLock<Value> = OnceLock::new();
    DATUM.get_or_init(|| {
        // 'A'..='G' zipped with 1.. yields ("A", 1), ("B", 2), ..., ("G", 7).
        let fields = ('A'..='G')
            .zip(1..)
            .map(|(letter, number)| (letter.to_string(), Value::Int(number)))
            .collect();
        Value::Record(fields)
    })
}
/// Every datum in `schemas_to_validate()` must validate against the schema it
/// is paired with.
#[test]
fn test_validate() -> TestResult {
    for (raw_schema, value) in schemas_to_validate() {
        let schema = Schema::parse_str(raw_schema)?;
        let is_valid = value.validate(&schema);
        assert!(
            is_valid,
            "value {value:?} does not validate schema: {raw_schema}"
        );
    }

    Ok(())
}
/// Every datum in `schemas_to_validate()` must survive an encode/decode round
/// trip through its own schema unchanged.
#[test]
fn test_round_trip() -> TestResult {
    for (raw_schema, value) in schemas_to_validate().iter() {
        let schema = Schema::parse_str(raw_schema)?;
        // Propagate failures with `?`, like the other tests in this file,
        // instead of unwrapping inside a function that returns `TestResult`.
        let encoded = to_avro_datum(&schema, value.clone())?;
        let decoded = from_avro_datum(&schema, &mut Cursor::new(encoded), None)?;
        assert_eq!(value, &decoded);
    }
    Ok(())
}
/// Encoding each number from `binary_encodings()` as an Avro `int` must
/// produce exactly the expected bytes.
#[test]
fn test_binary_int_encoding() -> TestResult {
    for (number, hex_encoding) in binary_encodings().iter() {
        // The table stores `i64`; convert with a checked conversion so an
        // entry that does not fit in `i32` fails the test instead of being
        // silently truncated by an `as` cast.
        let int_value = i32::try_from(*number)?;
        let encoded = to_avro_datum(&Schema::Int, Value::Int(int_value))?;
        assert_eq!(&encoded, hex_encoding);
    }
    Ok(())
}
/// Encoding each number from `binary_encodings()` as an Avro `long` must
/// produce exactly the expected bytes.
#[test]
fn test_binary_long_encoding() -> TestResult {
    for (number, expected_bytes) in binary_encodings() {
        let datum = Value::Long(*number);
        let actual_bytes = to_avro_datum(&Schema::Long, datum)?;
        assert_eq!(&actual_bytes, expected_bytes);
    }

    Ok(())
}
/// A value written with one numeric schema must be readable with every schema
/// listed after it in the promotion order int, long, float, double.
///
/// Each table entry pairs a schema with the value expected when reading with
/// that schema; the same value is what gets written when that schema acts as
/// the writer.
#[test]
fn test_schema_promotion() -> TestResult {
    let promotion_ladder = [
        (r#""int""#, Value::Int(219)),
        (r#""long""#, Value::Long(219)),
        (r#""float""#, Value::Float(219.0)),
        (r#""double""#, Value::Double(219.0)),
    ];

    for (position, (writer_raw, written)) in promotion_ladder.iter().enumerate() {
        let writer_schema = Schema::parse_str(writer_raw)?;

        // Only the schemas after the writer are valid promotion targets.
        for (reader_raw, expected) in &promotion_ladder[position + 1..] {
            let reader_schema = Schema::parse_str(reader_raw)?;
            let bytes = to_avro_datum(&writer_schema, written.clone())?;
            let mut input = Cursor::new(bytes);
            let decoded = match from_avro_datum(&writer_schema, &mut input, Some(&reader_schema)) {
                Ok(value) => value,
                Err(_) => panic!(
                    "failed to decode {:?} with schema: {:?}",
                    written, reader_raw
                ),
            };
            assert_eq!(decoded, *expected);
        }
    }

    Ok(())
}
/// Reading an enum symbol that exists in the writer schema but not in the
/// reader schema must fail.
#[test]
fn test_unknown_symbol() -> TestResult {
    let writer =
        Schema::parse_str(r#"{"type": "enum", "name": "Test", "symbols": ["FOO", "BAR"]}"#)?;
    let reader =
        Schema::parse_str(r#"{"type": "enum", "name": "Test", "symbols": ["BAR", "BAZ"]}"#)?;

    // "FOO" is only known to the writer.
    let bytes = to_avro_datum(&writer, Value::Enum(0, String::from("FOO")))?;
    let mut input = Cursor::new(bytes);
    let outcome = from_avro_datum(&writer, &mut input, Some(&reader));

    assert!(outcome.is_err());
    Ok(())
}
/// A reader field that is absent from the writer schema must be filled in
/// from its declared default.
///
/// For every entry of `default_value_examples()` a reader record with a single
/// field `H` of the given type and default is built; the long record is then
/// read through it and `H` must come back as the expected datum. NaN cases are
/// checked with `is_nan()` because NaN never compares equal to itself.
#[test]
fn test_default_value() -> TestResult {
    // The written bytes do not depend on the example, so encode them once.
    let encoded = to_avro_datum(long_record_schema(), long_record_datum().clone())?;

    for (field_type, default_json, default_datum) in default_value_examples().iter() {
        let reader_schema = Schema::parse_str(&format!(
            r#"{{
"type": "record",
"name": "Test",
"fields": [
{{"name": "H", "type": {field_type}, "default": {default_json}}}
]
}}"#
        ))?;
        let datum_to_read = Value::Record(vec![("H".to_string(), default_datum.clone())]);
        let datum_read = from_avro_datum(
            long_record_schema(),
            &mut Cursor::new(encoded.as_slice()),
            Some(&reader_schema),
        )?;

        match default_datum {
            Value::Double(f) if f.is_nan() => {
                let Value::Record(fields) = datum_read else {
                    unreachable!("the test always constructs top level as record")
                };
                let Value::Double(f) = fields[0].1 else {
                    panic!("double expected")
                };
                assert!(
                    f.is_nan(),
                    "{field_type} -> {default_json} is parsed as {f} rather than NaN"
                );
            }
            Value::Float(f) if f.is_nan() => {
                let Value::Record(fields) = datum_read else {
                    unreachable!("the test always constructs top level as record")
                };
                // This branch checks a float, so the failure message says so.
                let Value::Float(f) = fields[0].1 else {
                    panic!("float expected")
                };
                assert!(
                    f.is_nan(),
                    "{field_type} -> {default_json} is parsed as {f} rather than NaN"
                );
            }
            _ => {
                assert_eq!(
                    datum_read, datum_to_read,
                    "{} -> {}",
                    *field_type, *default_json
                );
            }
        }
    }
    Ok(())
}
/// Reading must fail when the reader schema has a field (`H`) that the writer
/// schema lacks and the reader declares no default for it.
#[test]
fn test_no_default_value() -> TestResult {
    // The interior lines are flush-left so the literal's bytes stay unchanged.
    const READER_SCHEMA_JSON: &str = r#"{
"type": "record",
"name": "Test",
"fields": [
{"name": "H", "type": "int"}
]
}"#;

    let reader_schema = Schema::parse_str(READER_SCHEMA_JSON)?;
    let bytes = to_avro_datum(long_record_schema(), long_record_datum().clone())?;
    let mut input = Cursor::new(bytes);
    let outcome = from_avro_datum(long_record_schema(), &mut input, Some(&reader_schema));

    assert!(outcome.is_err());
    Ok(())
}
/// A reader schema that lists only a subset of the writer's fields (`E` and
/// `F`) must yield a record holding just those fields.
#[test]
fn test_projection() -> TestResult {
    // The interior lines are flush-left so the literal's bytes stay unchanged.
    const READER_SCHEMA_JSON: &str = r#"
{
"type": "record",
"name": "Test",
"fields": [
{"name": "E", "type": "int"},
{"name": "F", "type": "int"}
]
}
"#;

    let reader_schema = Schema::parse_str(READER_SCHEMA_JSON)?;
    let expected = Value::Record(
        [("E", 5), ("F", 6)]
            .iter()
            .map(|&(name, number)| (name.to_string(), Value::Int(number)))
            .collect(),
    );

    let bytes = to_avro_datum(long_record_schema(), long_record_datum().clone())?;
    let mut input = Cursor::new(bytes);
    let actual = from_avro_datum(long_record_schema(), &mut input, Some(&reader_schema))?;

    assert_eq!(expected, actual);
    Ok(())
}
/// The decoded record must follow the reader schema's field order (`F` before
/// `E`), not the order in which the writer stored the fields.
#[test]
fn test_field_order() -> TestResult {
    // The interior lines are flush-left so the literal's bytes stay unchanged.
    const READER_SCHEMA_JSON: &str = r#"
{
"type": "record",
"name": "Test",
"fields": [
{"name": "F", "type": "int"},
{"name": "E", "type": "int"}
]
}
"#;

    let reader_schema = Schema::parse_str(READER_SCHEMA_JSON)?;
    let expected = Value::Record(
        [("F", 6), ("E", 5)]
            .iter()
            .map(|&(name, number)| (name.to_string(), Value::Int(number)))
            .collect(),
    );

    let bytes = to_avro_datum(long_record_schema(), long_record_datum().clone())?;
    let mut input = Cursor::new(bytes);
    let actual = from_avro_datum(long_record_schema(), &mut input, Some(&reader_schema))?;

    assert_eq!(expected, actual);
    Ok(())
}
/// Writing a record whose field `F` holds a string where the schema demands
/// an `int` must be rejected with a validation error.
///
/// The test reports any other outcome as an `Err(String)` that describes what
/// was observed instead.
#[test]
fn test_type_exception() -> Result<(), String> {
    // The interior lines are flush-left so the literal's bytes stay unchanged.
    const WRITER_SCHEMA_JSON: &str = r#"
{
"type": "record",
"name": "Test",
"fields": [
{"name": "F", "type": "int"},
{"name": "E", "type": "int"}
]
}
"#;

    let writer_schema = Schema::parse_str(WRITER_SCHEMA_JSON).unwrap();
    let datum_to_write = Value::Record(vec![
        (String::from("E"), Value::Int(5)),
        (String::from("F"), Value::String("Bad".to_string())),
    ]);

    match to_avro_datum(&writer_schema, datum_to_write) {
        Ok(_) => Err("Expected ValidationError, got Ok".to_string()),
        Err(error) => match error.into_details() {
            Details::Validation => Ok(()),
            other => Err(format!("Expected ValidationError, got {other:?}")),
        },
    }
}