use apache_avro::{Schema, from_value, to_avro_datum};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::error::{PubSubError, Result};
fn avro_value_from_bytes(schema: &Schema, bytes: &[u8]) -> Result<apache_avro::types::Value> {
apache_avro::from_avro_datum(schema, &mut std::io::Cursor::new(bytes), None)
.map_err(|e| PubSubError::Avro(e.to_string()))
}
pub fn decode_avro(schema: &Schema, bytes: &[u8]) -> Result<Value> {
let value = avro_value_from_bytes(schema, bytes)?;
from_value::<Value>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
}
pub fn decode_avro_typed<T: for<'de> Deserialize<'de>>(schema: &Schema, bytes: &[u8]) -> Result<T> {
let value = avro_value_from_bytes(schema, bytes)?;
from_value::<T>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
}
pub fn encode_avro<T: Serialize>(schema: &Schema, value: &T) -> Result<Vec<u8>> {
let avro_value = apache_avro::to_value(value).map_err(|e| PubSubError::Avro(e.to_string()))?;
let resolved = avro_value
.resolve(schema)
.map_err(|e| PubSubError::Avro(e.to_string()))?;
to_avro_datum(schema, resolved).map_err(|e| PubSubError::Avro(e.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
use apache_avro::Schema;
const SIMPLE_SCHEMA: &str = r#"
{
"type": "record",
"name": "TestEvent",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
"#;
#[test]
#[allow(clippy::items_after_statements)]
fn test_encode_decode_roundtrip_dynamic() {
let Ok(schema) = Schema::parse_str(SIMPLE_SCHEMA) else {
panic!("valid schema")
};
#[derive(Serialize)]
struct Payload<'a> {
id: &'a str,
amount: f64,
}
let payload = Payload {
id: "event-001",
amount: 99.5,
};
let encoded = match encode_avro(&schema, &payload) {
Ok(enc) => enc,
Err(e) => panic!("encode failed with: {e:?}"),
};
assert!(!encoded.is_empty());
let decoded = match decode_avro(&schema, &encoded) {
Ok(dec) => dec,
Err(e) => panic!("decode failed with: {e:?}"),
};
assert_eq!(decoded["id"], "event-001");
let Some(amount) = decoded["amount"].as_f64() else {
panic!("amount is not a valid f64")
};
assert!((amount - 99.5).abs() < f64::EPSILON);
}
#[test]
fn test_encode_decode_roundtrip_typed() {
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct TestEvent {
id: String,
amount: f64,
}
let Ok(schema) = Schema::parse_str(SIMPLE_SCHEMA) else {
panic!("valid schema")
};
let event = TestEvent {
id: "event-002".to_string(),
amount: 42.0,
};
let Ok(encoded) = encode_avro(&schema, &event) else {
panic!("encode succeeds")
};
let Ok(decoded) = decode_avro_typed::<TestEvent>(&schema, &encoded) else {
panic!("decode succeeds")
};
assert_eq!(decoded.id, "event-002");
assert!((decoded.amount - 42.0).abs() < f64::EPSILON);
}
#[test]
fn test_decode_invalid_bytes_returns_error() {
let Ok(schema) = Schema::parse_str(SIMPLE_SCHEMA) else {
panic!("valid schema")
};
let garbage = vec![0xFF, 0xFE, 0xFD];
let result = decode_avro(&schema, &garbage);
let Err(err) = result else {
panic!("Expected an error");
};
assert!(matches!(err, PubSubError::Avro(_)));
}
}