1use apache_avro::{Schema, from_value, to_avro_datum};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::error::{PubSubError, Result};
12
13fn avro_value_from_bytes(schema: &Schema, bytes: &[u8]) -> Result<apache_avro::types::Value> {
14 apache_avro::from_avro_datum(schema, &mut std::io::Cursor::new(bytes), None)
15 .map_err(|e| PubSubError::Avro(e.to_string()))
16}
17
18pub fn decode_avro(schema: &Schema, bytes: &[u8]) -> Result<Value> {
27 let value = avro_value_from_bytes(schema, bytes)?;
28 from_value::<Value>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
29}
30
31pub fn decode_avro_typed<T: for<'de> Deserialize<'de>>(schema: &Schema, bytes: &[u8]) -> Result<T> {
50 let value = avro_value_from_bytes(schema, bytes)?;
51 from_value::<T>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
52}
53
54pub fn encode_avro<T: Serialize>(schema: &Schema, value: &T) -> Result<Vec<u8>> {
63 let avro_value = apache_avro::to_value(value).map_err(|e| PubSubError::Avro(e.to_string()))?;
64 let resolved = avro_value
65 .resolve(schema)
66 .map_err(|e| PubSubError::Avro(e.to_string()))?;
67 to_avro_datum(schema, resolved).map_err(|e| PubSubError::Avro(e.to_string()))
68}
69
70#[cfg(test)]
71#[allow(clippy::unwrap_used, clippy::expect_used)]
72mod tests {
73 use super::*;
74 use apache_avro::Schema;
75
76 const SIMPLE_SCHEMA: &str = r#"
77 {
78 "type": "record",
79 "name": "TestEvent",
80 "fields": [
81 {"name": "id", "type": "string"},
82 {"name": "amount", "type": "double"}
83 ]
84 }
85 "#;
86
87 #[test]
88 fn test_encode_decode_roundtrip_dynamic() {
89 let schema = Schema::parse_str(SIMPLE_SCHEMA).expect("valid schema");
90 let payload = serde_json::json!({
91 "id": "event-001",
92 "amount": 99.5
93 });
94
95 let encoded = encode_avro(&schema, &payload).expect("encode succeeds");
96 assert!(!encoded.is_empty());
97
98 let decoded: Value = decode_avro(&schema, &encoded).expect("decode succeeds");
99 assert_eq!(decoded["id"], "event-001");
100 assert!((decoded["amount"].as_f64().unwrap() - 99.5).abs() < f64::EPSILON);
101 }
102
103 #[test]
104 fn test_encode_decode_roundtrip_typed() {
105 #[derive(Serialize, Deserialize, Debug, PartialEq)]
106 struct TestEvent {
107 id: String,
108 amount: f64,
109 }
110
111 let schema = Schema::parse_str(SIMPLE_SCHEMA).expect("valid schema");
112 let event = TestEvent {
113 id: "event-002".to_string(),
114 amount: 42.0,
115 };
116
117 let encoded = encode_avro(&schema, &event).expect("encode succeeds");
118 let decoded: TestEvent = decode_avro_typed(&schema, &encoded).expect("decode succeeds");
119
120 assert_eq!(decoded.id, "event-002");
121 assert!((decoded.amount - 42.0).abs() < f64::EPSILON);
122 }
123
124 #[test]
125 fn test_decode_invalid_bytes_returns_error() {
126 let schema = Schema::parse_str(SIMPLE_SCHEMA).expect("valid schema");
127 let garbage = vec![0xFF, 0xFE, 0xFD];
128 let result = decode_avro(&schema, &garbage);
129 let Err(err) = result else {
130 panic!("Expected an error");
131 };
132 assert!(matches!(err, PubSubError::Avro(_)));
133 }
134}