1use apache_avro::{Schema, from_value, to_avro_datum};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::error::{PubSubError, Result};
12
13fn avro_value_from_bytes(schema: &Schema, bytes: &[u8]) -> Result<apache_avro::types::Value> {
14 apache_avro::from_avro_datum(schema, &mut std::io::Cursor::new(bytes), None)
15 .map_err(|e| PubSubError::Avro(e.to_string()))
16}
17
18pub fn decode_avro(schema: &Schema, bytes: &[u8]) -> Result<Value> {
27 let value = avro_value_from_bytes(schema, bytes)?;
28 from_value::<Value>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
29}
30
31pub fn decode_avro_typed<T: for<'de> Deserialize<'de>>(schema: &Schema, bytes: &[u8]) -> Result<T> {
50 let value = avro_value_from_bytes(schema, bytes)?;
51 from_value::<T>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
52}
53
54pub fn encode_avro<T: Serialize>(schema: &Schema, value: &T) -> Result<Vec<u8>> {
63 let avro_value = apache_avro::to_value(value).map_err(|e| PubSubError::Avro(e.to_string()))?;
64 let resolved = avro_value
65 .resolve(schema)
66 .map_err(|e| PubSubError::Avro(e.to_string()))?;
67 to_avro_datum(schema, resolved).map_err(|e| PubSubError::Avro(e.to_string()))
68}
69
70#[cfg(test)]
71mod tests {
72 use super::*;
73 use apache_avro::Schema;
74
75 const SIMPLE_SCHEMA: &str = r#"
76 {
77 "type": "record",
78 "name": "TestEvent",
79 "fields": [
80 {"name": "id", "type": "string"},
81 {"name": "amount", "type": "double"}
82 ]
83 }
84 "#;
85
86 #[test]
87 #[allow(clippy::items_after_statements)]
88 fn test_encode_decode_roundtrip_dynamic() {
89 let Ok(schema) = Schema::parse_str(SIMPLE_SCHEMA) else {
90 panic!("valid schema")
91 };
92 #[derive(Serialize)]
95 struct Payload<'a> {
96 id: &'a str,
97 amount: f64,
98 }
99
100 let payload = Payload {
101 id: "event-001",
102 amount: 99.5,
103 };
104
105 let encoded = match encode_avro(&schema, &payload) {
106 Ok(enc) => enc,
107 Err(e) => panic!("encode failed with: {e:?}"),
108 };
109 assert!(!encoded.is_empty());
110
111 let decoded = match decode_avro(&schema, &encoded) {
112 Ok(dec) => dec,
113 Err(e) => panic!("decode failed with: {e:?}"),
114 };
115 assert_eq!(decoded["id"], "event-001");
116 let Some(amount) = decoded["amount"].as_f64() else {
117 panic!("amount is not a valid f64")
118 };
119 assert!((amount - 99.5).abs() < f64::EPSILON);
120 }
121
122 #[test]
123 fn test_encode_decode_roundtrip_typed() {
124 #[derive(Serialize, Deserialize, Debug, PartialEq)]
125 struct TestEvent {
126 id: String,
127 amount: f64,
128 }
129
130 let Ok(schema) = Schema::parse_str(SIMPLE_SCHEMA) else {
131 panic!("valid schema")
132 };
133 let event = TestEvent {
134 id: "event-002".to_string(),
135 amount: 42.0,
136 };
137
138 let Ok(encoded) = encode_avro(&schema, &event) else {
139 panic!("encode succeeds")
140 };
141 let Ok(decoded) = decode_avro_typed::<TestEvent>(&schema, &encoded) else {
142 panic!("decode succeeds")
143 };
144
145 assert_eq!(decoded.id, "event-002");
146 assert!((decoded.amount - 42.0).abs() < f64::EPSILON);
147 }
148
149 #[test]
150 fn test_decode_invalid_bytes_returns_error() {
151 let Ok(schema) = Schema::parse_str(SIMPLE_SCHEMA) else {
152 panic!("valid schema")
153 };
154 let garbage = vec![0xFF, 0xFE, 0xFD];
155 let result = decode_avro(&schema, &garbage);
156 let Err(err) = result else {
157 panic!("Expected an error");
158 };
159 assert!(matches!(err, PubSubError::Avro(_)));
160 }
161}