Skip to main content

force_pubsub/
codec.rs

1//! Avro codec utilities for encoding and decoding Salesforce Pub/Sub events.
2//!
3//! Salesforce Pub/Sub events are encoded as Avro binary (not JSON).
4//! Each event carries a `schema_id` in its header that identifies the Avro
5//! schema required for decoding.
6
7use apache_avro::{Schema, from_value, to_avro_datum};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::error::{PubSubError, Result};
12
13fn avro_value_from_bytes(schema: &Schema, bytes: &[u8]) -> Result<apache_avro::types::Value> {
14    apache_avro::from_avro_datum(schema, &mut std::io::Cursor::new(bytes), None)
15        .map_err(|e| PubSubError::Avro(e.to_string()))
16}
17
18/// Decode Avro-binary bytes into a [`serde_json::Value`] using the given schema.
19///
20/// The schema must match the `schema_id` on the event header.
21///
22/// # Errors
23///
24/// Returns [`PubSubError::Avro`] if the bytes cannot be decoded with the
25/// provided schema.
26pub fn decode_avro(schema: &Schema, bytes: &[u8]) -> Result<Value> {
27    let value = avro_value_from_bytes(schema, bytes)?;
28    from_value::<Value>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
29}
30
31/// Decode Avro-binary bytes directly into a typed struct `T` using the given schema.
32///
33/// This is a **single-step** Avro → `T` decode. It is a public utility for callers
34/// who already hold a schema and bytes and want to avoid the two-step
35/// (`decode_avro` → `serde_json::from_value`) path used internally by the typed
36/// subscribe stream. Both produce identical results; this variant is marginally
37/// cheaper because it skips the intermediate [`serde_json::Value`] allocation.
38///
39/// The typed subscribe stream (`subscribe_typed` / `subscribe_typed_dynamic`) does
40/// not use this function because it is built on top of the dynamic stream
41/// (which decodes to `Value` first) so that both variants share one subscribe
42/// loop implementation. Callers who own raw event bytes and want zero-overhead
43/// typed decoding should prefer this function.
44///
45/// # Errors
46///
47/// Returns [`PubSubError::Avro`] if the bytes cannot be decoded or deserialized
48/// into `T`.
49pub fn decode_avro_typed<T: for<'de> Deserialize<'de>>(schema: &Schema, bytes: &[u8]) -> Result<T> {
50    let value = avro_value_from_bytes(schema, bytes)?;
51    from_value::<T>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
52}
53
54/// Encode a serializable value to Avro binary using the given schema.
55///
56/// Used when publishing events to the Salesforce Pub/Sub API.
57///
58/// # Errors
59///
60/// Returns [`PubSubError::Avro`] if the value cannot be serialized or resolved
61/// against the provided schema.
62pub fn encode_avro<T: Serialize>(schema: &Schema, value: &T) -> Result<Vec<u8>> {
63    let avro_value = apache_avro::to_value(value).map_err(|e| PubSubError::Avro(e.to_string()))?;
64    let resolved = avro_value
65        .resolve(schema)
66        .map_err(|e| PubSubError::Avro(e.to_string()))?;
67    to_avro_datum(schema, resolved).map_err(|e| PubSubError::Avro(e.to_string()))
68}
69
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-field record schema shared by every test in this module.
    const SIMPLE_SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "TestEvent",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"}
        ]
    }
    "#;

    /// Parse [`SIMPLE_SCHEMA`], failing the test with the parser's error if it is rejected.
    fn simple_schema() -> Schema {
        match Schema::parse_str(SIMPLE_SCHEMA) {
            Ok(schema) => schema,
            Err(e) => panic!("SIMPLE_SCHEMA failed to parse: {e:?}"),
        }
    }

    /// Typed mirror of [`SIMPLE_SCHEMA`] used by the typed round-trip tests.
    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct TestEvent {
        id: String,
        amount: f64,
    }

    #[test]
    fn test_encode_decode_roundtrip_dynamic() {
        // apache-avro 0.18 fails to encode serde_json::Value with arbitrary_precision enabled,
        // so we must use a typed struct to test the encode functionality, mimicking dynamic behavior.
        // Declared before any statement, so no `items_after_statements` allowance is needed.
        #[derive(Serialize)]
        struct Payload<'a> {
            id: &'a str,
            amount: f64,
        }

        let schema = simple_schema();
        let payload = Payload {
            id: "event-001",
            amount: 99.5,
        };

        let encoded = match encode_avro(&schema, &payload) {
            Ok(enc) => enc,
            Err(e) => panic!("encode failed with: {e:?}"),
        };
        assert!(!encoded.is_empty());

        let decoded = match decode_avro(&schema, &encoded) {
            Ok(dec) => dec,
            Err(e) => panic!("decode failed with: {e:?}"),
        };
        assert_eq!(decoded["id"], "event-001");
        let Some(amount) = decoded["amount"].as_f64() else {
            panic!("amount is not a valid f64")
        };
        assert!((amount - 99.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_encode_decode_roundtrip_typed() {
        let schema = simple_schema();
        let event = TestEvent {
            id: "event-002".to_string(),
            amount: 42.0,
        };

        let encoded = match encode_avro(&schema, &event) {
            Ok(enc) => enc,
            Err(e) => panic!("encode failed with: {e:?}"),
        };
        let decoded = match decode_avro_typed::<TestEvent>(&schema, &encoded) {
            Ok(dec) => dec,
            Err(e) => panic!("typed decode failed with: {e:?}"),
        };

        assert_eq!(decoded.id, "event-002");
        assert!((decoded.amount - 42.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_typed_and_dynamic_decode_agree() {
        let schema = simple_schema();
        let event = TestEvent {
            id: "event-004".to_string(),
            amount: 7.25,
        };

        let encoded = match encode_avro(&schema, &event) {
            Ok(enc) => enc,
            Err(e) => panic!("encode failed with: {e:?}"),
        };
        let direct = match decode_avro_typed::<TestEvent>(&schema, &encoded) {
            Ok(dec) => dec,
            Err(e) => panic!("typed decode failed with: {e:?}"),
        };
        let dynamic = match decode_avro(&schema, &encoded) {
            Ok(dec) => dec,
            Err(e) => panic!("dynamic decode failed with: {e:?}"),
        };
        let via_json: TestEvent = match serde_json::from_value(dynamic) {
            Ok(ev) => ev,
            Err(e) => panic!("JSON -> TestEvent failed with: {e:?}"),
        };

        assert_eq!(direct.id, via_json.id);
        assert!((direct.amount - via_json.amount).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_invalid_bytes_returns_error() {
        let schema = simple_schema();
        let garbage = vec![0xFF, 0xFE, 0xFD];
        let result = decode_avro(&schema, &garbage);
        let Err(err) = result else {
            panic!("Expected an error");
        };
        assert!(matches!(err, PubSubError::Avro(_)));
    }

    #[test]
    fn test_encode_missing_field_returns_error() {
        // `amount` has no default in SIMPLE_SCHEMA, so this record cannot satisfy it.
        #[derive(Serialize)]
        struct MissingAmount<'a> {
            id: &'a str,
        }

        let schema = simple_schema();
        let result = encode_avro(&schema, &MissingAmount { id: "event-003" });
        let Err(err) = result else {
            panic!("expected encoding a record without `amount` to fail");
        };
        assert!(matches!(err, PubSubError::Avro(_)));
    }
}