Skip to main content

force_pubsub/
codec.rs

//! Avro codec utilities for encoding and decoding Salesforce Pub/Sub events.
//!
//! Salesforce Pub/Sub events are encoded as Avro binary (not JSON).
//! Each event carries a `schema_id` in its header that identifies the Avro
//! schema required for decoding.
6
7use apache_avro::{Schema, from_value, to_avro_datum};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::error::{PubSubError, Result};
12
13fn avro_value_from_bytes(schema: &Schema, bytes: &[u8]) -> Result<apache_avro::types::Value> {
14    apache_avro::from_avro_datum(schema, &mut std::io::Cursor::new(bytes), None)
15        .map_err(|e| PubSubError::Avro(e.to_string()))
16}
17
18/// Decode Avro-binary bytes into a [`serde_json::Value`] using the given schema.
19///
20/// The schema must match the `schema_id` on the event header.
21///
22/// # Errors
23///
24/// Returns [`PubSubError::Avro`] if the bytes cannot be decoded with the
25/// provided schema.
26pub fn decode_avro(schema: &Schema, bytes: &[u8]) -> Result<Value> {
27    let value = avro_value_from_bytes(schema, bytes)?;
28    from_value::<Value>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
29}
30
31/// Decode Avro-binary bytes directly into a typed struct `T` using the given schema.
32///
33/// This is a **single-step** Avro → `T` decode. It is a public utility for callers
34/// who already hold a schema and bytes and want to avoid the two-step
35/// (`decode_avro` → `serde_json::from_value`) path used internally by the typed
36/// subscribe stream. Both produce identical results; this variant is marginally
37/// cheaper because it skips the intermediate [`serde_json::Value`] allocation.
38///
39/// The typed subscribe stream (`subscribe_typed` / `subscribe_typed_dynamic`) does
40/// not use this function because it is built on top of the dynamic stream
41/// (which decodes to `Value` first) so that both variants share one subscribe
42/// loop implementation. Callers who own raw event bytes and want zero-overhead
43/// typed decoding should prefer this function.
44///
45/// # Errors
46///
47/// Returns [`PubSubError::Avro`] if the bytes cannot be decoded or deserialized
48/// into `T`.
49pub fn decode_avro_typed<T: for<'de> Deserialize<'de>>(schema: &Schema, bytes: &[u8]) -> Result<T> {
50    let value = avro_value_from_bytes(schema, bytes)?;
51    from_value::<T>(&value).map_err(|e| PubSubError::Avro(e.to_string()))
52}
53
54/// Encode a serializable value to Avro binary using the given schema.
55///
56/// Used when publishing events to the Salesforce Pub/Sub API.
57///
58/// # Errors
59///
60/// Returns [`PubSubError::Avro`] if the value cannot be serialized or resolved
61/// against the provided schema.
62pub fn encode_avro<T: Serialize>(schema: &Schema, value: &T) -> Result<Vec<u8>> {
63    let avro_value = apache_avro::to_value(value).map_err(|e| PubSubError::Avro(e.to_string()))?;
64    let resolved = avro_value
65        .resolve(schema)
66        .map_err(|e| PubSubError::Avro(e.to_string()))?;
67    to_avro_datum(schema, resolved).map_err(|e| PubSubError::Avro(e.to_string()))
68}
69
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use apache_avro::Schema;

    /// Two-field record schema (`id: string`, `amount: double`) shared by all tests.
    const SIMPLE_SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "TestEvent",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"}
        ]
    }
    "#;

    /// Parse [`SIMPLE_SCHEMA`], panicking if the fixture itself is malformed.
    fn simple_schema() -> Schema {
        Schema::parse_str(SIMPLE_SCHEMA).expect("valid schema")
    }

    /// A JSON payload survives an encode → `decode_avro` round trip.
    #[test]
    fn test_encode_decode_roundtrip_dynamic() {
        let schema = simple_schema();
        let payload = serde_json::json!({
            "id": "event-001",
            "amount": 99.5
        });

        let wire = encode_avro(&schema, &payload).expect("encode succeeds");
        assert!(!wire.is_empty());

        let round_tripped: Value = decode_avro(&schema, &wire).expect("decode succeeds");
        assert_eq!(round_tripped["id"], "event-001");

        let amount = round_tripped["amount"].as_f64().unwrap();
        assert!((amount - 99.5).abs() < f64::EPSILON);
    }

    /// A typed struct survives an encode → `decode_avro_typed` round trip.
    #[test]
    fn test_encode_decode_roundtrip_typed() {
        #[derive(Serialize, Deserialize, Debug, PartialEq)]
        struct TestEvent {
            id: String,
            amount: f64,
        }

        let schema = simple_schema();
        let original = TestEvent {
            id: "event-002".to_string(),
            amount: 42.0,
        };

        let wire = encode_avro(&schema, &original).expect("encode succeeds");
        let round_tripped: TestEvent =
            decode_avro_typed(&schema, &wire).expect("decode succeeds");

        assert_eq!(round_tripped.id, "event-002");
        assert!((round_tripped.amount - 42.0).abs() < f64::EPSILON);
    }

    /// Bytes that are not valid Avro for the schema surface as `PubSubError::Avro`.
    #[test]
    fn test_decode_invalid_bytes_returns_error() {
        let schema = simple_schema();
        let garbage = vec![0xFF, 0xFE, 0xFD];

        match decode_avro(&schema, &garbage) {
            Ok(_) => panic!("Expected an error"),
            Err(err) => assert!(matches!(err, PubSubError::Avro(_))),
        }
    }
}