Skip to main content

camel_dataformat_protobuf/
lib.rs

//! camel-dataformat-protobuf — Protobuf DataFormat for Apache Camel Rust.
//!
//! Provides marshal/unmarshal support for Protocol Buffers using dynamic message
//! descriptors compiled at runtime via `prost-reflect`. JSON ↔ binary protobuf
//! round-tripping is supported out of the box.
//!
//! TODO(PROTO-005): Schema registry integration (e.g. Confluent Schema Registry)
//! is not yet implemented. When available, this will allow automatic schema
//! lookup/registration by subject and version during marshal/unmarshal.
10
11use std::path::Path;
12
13use bytes::BytesMut;
14use camel_api::body::Body;
15use camel_api::data_format::DataFormat;
16use camel_api::error::CamelError;
17use camel_proto_compiler::{ProtoCache, compile_proto};
18use prost::Message;
19use prost_reflect::{DynamicMessage, MessageDescriptor};
20
/// User-facing options for the protobuf data format.
///
/// Every field is optional; `ProtobufConfig::default()` leaves all of them
/// unset.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProtobufConfig {
    /// Content type format, such as `application/protobuf` or
    /// `application/json`. Binary protobuf is assumed when this is `None`.
    pub content_type_format: Option<String>,
    /// Fully-qualified class/type name used for deserialization, if any.
    pub instance_class: Option<String>,
}
29
30impl ProtobufConfig {
31    pub fn validate(&self) -> Result<(), CamelError> {
32        if let Some(instance_class) = &self.instance_class
33            && instance_class.trim().is_empty()
34        {
35            return Err(CamelError::TypeConversionFailed(
36                "instance_class must be non-empty when set".to_string(),
37            ));
38        }
39
40        Ok(())
41    }
42}
43
44pub struct ProtobufDataFormat {
45    descriptor: MessageDescriptor,
46}
47
48impl ProtobufDataFormat {
49    pub fn new<P: AsRef<Path>>(proto_path: P, message_name: &str) -> Result<Self, CamelError> {
50        let pool =
51            compile_proto(proto_path.as_ref(), std::iter::empty::<&Path>()).map_err(|e| {
52                CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
53            })?;
54        let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
55            CamelError::Config(format!("message descriptor not found: {message_name}"))
56        })?;
57        Ok(Self { descriptor })
58    }
59
60    pub fn new_with_cache<P: AsRef<Path>>(
61        proto_path: P,
62        message_name: &str,
63        cache: &ProtoCache,
64    ) -> Result<Self, CamelError> {
65        let pool = cache
66            .get_or_compile(proto_path.as_ref(), std::iter::empty::<&Path>())
67            .map_err(|e| {
68                CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
69            })?;
70        let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
71            CamelError::Config(format!("message descriptor not found: {message_name}"))
72        })?;
73        Ok(Self { descriptor })
74    }
75
76    pub fn descriptor(&self) -> &MessageDescriptor {
77        &self.descriptor
78    }
79
80    pub fn json_to_dynamic(
81        &self,
82        json_val: serde_json::Value,
83    ) -> Result<DynamicMessage, CamelError> {
84        let json_str = serde_json::to_string(&json_val).map_err(|e| {
85            CamelError::TypeConversionFailed(format!("failed to serialize JSON: {e}"))
86        })?;
87        let mut de = serde_json::Deserializer::from_str(&json_str);
88        DynamicMessage::deserialize(self.descriptor.clone(), &mut de).map_err(|e| {
89            CamelError::TypeConversionFailed(format!("failed to parse JSON into protobuf: {e}"))
90        })
91    }
92
93    pub fn dynamic_to_json(&self, msg: DynamicMessage) -> Result<serde_json::Value, CamelError> {
94        serde_json::to_value(&msg).map_err(|e| {
95            CamelError::TypeConversionFailed(format!("failed to serialize protobuf to JSON: {e}"))
96        })
97    }
98}
99
100impl DataFormat for ProtobufDataFormat {
101    fn name(&self) -> &str {
102        "protobuf"
103    }
104
105    fn marshal(&self, body: Body) -> Result<Body, CamelError> {
106        match body {
107            Body::Json(val) => {
108                let msg = self.json_to_dynamic(val)?;
109                let mut buf = BytesMut::new();
110                msg.encode(&mut buf).map_err(|e| {
111                    CamelError::TypeConversionFailed(format!(
112                        "failed to encode protobuf message: {e}"
113                    ))
114                })?;
115                Ok(Body::Bytes(buf.freeze()))
116            }
117            Body::Text(text) => {
118                let val: serde_json::Value = serde_json::from_str(&text).map_err(|e| {
119                    CamelError::TypeConversionFailed(format!(
120                        "invalid JSON text for protobuf marshal: {e}"
121                    ))
122                })?;
123                self.marshal(Body::Json(val))
124            }
125            Body::Bytes(bytes) => {
126                DynamicMessage::decode(self.descriptor.clone(), bytes.as_ref()).map_err(|e| {
127                    CamelError::ProcessorError(format!(
128                        "protobuf marshal: invalid bytes for type {}: {e}",
129                        self.descriptor.full_name()
130                    ))
131                })?;
132                Ok(Body::Bytes(bytes))
133            }
134            Body::Empty => Err(CamelError::TypeConversionFailed(
135                "protobuf marshal does not support empty body".to_string(),
136            )),
137            Body::Stream(_) => Err(CamelError::TypeConversionFailed(
138                "protobuf marshal does not support stream body".to_string(),
139            )),
140            Body::Xml(_) => Err(CamelError::TypeConversionFailed(
141                "protobuf marshal does not support XML body".to_string(),
142            )),
143        }
144    }
145
146    fn unmarshal(&self, body: Body) -> Result<Body, CamelError> {
147        match body {
148            Body::Bytes(bytes) => {
149                let msg = DynamicMessage::decode(self.descriptor.clone(), bytes.as_ref()).map_err(
150                    |e| {
151                        CamelError::TypeConversionFailed(format!(
152                            "failed to decode protobuf bytes: {e}"
153                        ))
154                    },
155                )?;
156                let json = self.dynamic_to_json(msg)?;
157                Ok(Body::Json(json))
158            }
159            Body::Json(val) => Ok(Body::Json(val)),
160            Body::Text(_) => Err(CamelError::TypeConversionFailed(
161                "protobuf unmarshal does not support text body".to_string(),
162            )),
163            Body::Stream(_) => Err(CamelError::TypeConversionFailed(
164                "protobuf unmarshal does not support stream body".to_string(),
165            )),
166            Body::Empty => Err(CamelError::TypeConversionFailed(
167                "protobuf unmarshal does not support empty body".to_string(),
168            )),
169            Body::Xml(_) => Err(CamelError::TypeConversionFailed(
170                "protobuf unmarshal does not support XML body".to_string(),
171            )),
172        }
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use bytes::Bytes;
    use camel_api::body::Body;
    use camel_api::data_format::DataFormat;
    use camel_api::error::CamelError;
    use serde_json::json;

    use super::{ProtobufConfig, ProtobufDataFormat};

    /// Location of the proto fixture shipped with the crate's tests.
    fn test_proto_path() -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests")
            .join("helloworld.proto")
    }

    /// Data format bound to `helloworld.HelloRequest` from the fixture.
    fn data_format() -> ProtobufDataFormat {
        ProtobufDataFormat::new(test_proto_path(), "helloworld.HelloRequest")
            .expect("should load descriptor")
    }

    /// Marshals `body` and returns the resulting bytes, panicking on any
    /// other outcome.
    fn marshal_to_bytes(df: &ProtobufDataFormat, body: Body) -> Bytes {
        match df.marshal(body).expect("marshal should succeed") {
            Body::Bytes(b) => b,
            other => panic!("expected bytes, got {other:?}"),
        }
    }

    /// Unmarshals `body` and returns the resulting JSON, panicking on any
    /// other outcome.
    fn unmarshal_to_json(df: &ProtobufDataFormat, body: Body) -> serde_json::Value {
        match df.unmarshal(body).expect("unmarshal should succeed") {
            Body::Json(v) => v,
            other => panic!("expected json, got {other:?}"),
        }
    }

    #[test]
    fn test_name() {
        let df = data_format();
        assert_eq!(df.name(), "protobuf");
    }

    #[test]
    fn test_marshal_json_to_bytes() {
        let df = data_format();
        let bytes = marshal_to_bytes(&df, Body::Json(json!({ "name": "Alice" })));
        assert!(!bytes.is_empty());
    }

    #[test]
    fn test_marshal_text_matches_json() {
        let df = data_format();
        let from_json = marshal_to_bytes(&df, Body::Json(json!({ "name": "Alice" })));
        let from_text = marshal_to_bytes(&df, Body::Text(r#"{"name":"Alice"}"#.into()));
        assert_eq!(from_text, from_json);
    }

    #[test]
    fn test_marshal_invalid_text_rejected() {
        let df = data_format();
        let err = df
            .marshal(Body::Text("not json".into()))
            .expect_err("non-JSON text must be rejected");
        assert!(
            err.to_string()
                .contains("invalid JSON text for protobuf marshal")
        );
    }

    #[test]
    fn test_unmarshal_bytes_to_json() {
        let df = data_format();
        let bytes = marshal_to_bytes(&df, Body::Json(json!({ "name": "Alice" })));
        let out = unmarshal_to_json(&df, Body::Bytes(bytes));
        assert_eq!(out, json!({ "name": "Alice" }));
    }

    #[test]
    fn test_roundtrip_json_bytes_json() {
        let df = data_format();
        let input = json!({ "name": "Bob" });
        let bytes = marshal_to_bytes(&df, Body::Json(input.clone()));
        let output = unmarshal_to_json(&df, Body::Bytes(bytes));
        assert_eq!(output, input);
    }

    /// Byte bodies are validated against the descriptor during marshal, so
    /// bytes that do not decode must be rejected rather than passed through.
    #[test]
    fn test_marshal_invalid_bytes_rejected() {
        let df = data_format();
        let body = Body::Bytes(Bytes::from_static(b"raw"));
        let err = df
            .marshal(body)
            .expect_err("invalid bytes should be rejected");
        assert!(
            err.to_string()
                .contains("protobuf marshal: invalid bytes for type")
        );
    }

    #[test]
    fn test_marshal_valid_bytes_accepted() {
        let df = data_format();
        let bytes = marshal_to_bytes(&df, Body::Json(json!({ "name": "Alice" })));
        let out = df
            .marshal(Body::Bytes(bytes.clone()))
            .expect("valid protobuf bytes should be accepted");
        assert_eq!(out, Body::Bytes(bytes));
    }

    #[test]
    fn test_unmarshal_invalid_bytes_rejected() {
        let df = data_format();
        let err = df
            .unmarshal(Body::Bytes(Bytes::from_static(b"raw")))
            .expect_err("invalid bytes should be rejected");
        assert!(err.to_string().contains("failed to decode protobuf bytes"));
    }

    #[test]
    fn test_unmarshal_json_passthrough() {
        let df = data_format();
        let body = Body::Json(json!({ "name": "Passthrough" }));
        let out = df
            .unmarshal(body.clone())
            .expect("unmarshal should pass through JSON");
        assert_eq!(out, body);
    }

    #[test]
    fn test_unmarshal_text_rejected() {
        let df = data_format();
        let err = df
            .unmarshal(Body::Text(r#"{"name":"Alice"}"#.into()))
            .expect_err("text must be rejected");
        assert!(format!("{err}").contains("text"));
    }

    #[test]
    fn test_marshal_empty_rejected() {
        let df = data_format();
        let err = df.marshal(Body::Empty).expect_err("empty must be rejected");
        assert!(format!("{err}").contains("empty"));
    }

    #[test]
    fn test_unmarshal_empty_rejected() {
        let df = data_format();
        let err = df
            .unmarshal(Body::Empty)
            .expect_err("empty must be rejected");
        assert!(format!("{err}").contains("empty"));
    }

    #[test]
    fn test_message_not_found_error() {
        let err = ProtobufDataFormat::new(test_proto_path(), "helloworld.DoesNotExist")
            .err()
            .expect("unknown message should fail");
        assert!(matches!(err, CamelError::Config(_)));
    }

    #[test]
    fn test_empty_instance_class_rejected() {
        let config = ProtobufConfig {
            instance_class: Some("".into()),
            ..Default::default()
        };
        assert!(config.validate().is_err());
    }

    /// `validate` trims before checking, so whitespace-only names are blank too.
    #[test]
    fn test_whitespace_instance_class_rejected() {
        let config = ProtobufConfig {
            instance_class: Some("   ".into()),
            ..Default::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_valid_instance_class_accepted() {
        let config = ProtobufConfig {
            instance_class: Some("com.example.MyMessage".into()),
            ..Default::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_no_instance_class_valid() {
        let config = ProtobufConfig::default();
        assert!(config.validate().is_ok());
    }

    /// PROTO-003: Encode/decode roundtrip — marshal JSON to bytes, then unmarshal
    /// those bytes back to JSON and verify the values match the original input.
    #[test]
    fn test_encode_decode_roundtrip() {
        let df = data_format();
        let original = json!({ "name": "RoundtripCharlie" });

        // Encode: JSON → binary protobuf bytes
        let encoded = marshal_to_bytes(&df, Body::Json(original.clone()));
        assert!(!encoded.is_empty(), "encoded bytes should not be empty");

        // Decode: binary protobuf bytes → JSON
        let decoded = unmarshal_to_json(&df, Body::Bytes(encoded));

        assert_eq!(decoded, original, "roundtrip should preserve field values");
    }
}