Skip to main content

camel_dataformat_protobuf/
lib.rs

1use std::path::Path;
2
3use bytes::BytesMut;
4use camel_api::body::Body;
5use camel_api::data_format::DataFormat;
6use camel_api::error::CamelError;
7use camel_proto_compiler::{ProtoCache, compile_proto};
8use prost::Message;
9use prost_reflect::{DynamicMessage, MessageDescriptor};
10
11pub struct ProtobufDataFormat {
12    descriptor: MessageDescriptor,
13}
14
15impl ProtobufDataFormat {
16    pub fn new<P: AsRef<Path>>(proto_path: P, message_name: &str) -> Result<Self, CamelError> {
17        let pool =
18            compile_proto(proto_path.as_ref(), std::iter::empty::<&Path>()).map_err(|e| {
19                CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
20            })?;
21        let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
22            CamelError::TypeConversionFailed(format!(
23                "message descriptor not found: {message_name}"
24            ))
25        })?;
26        Ok(Self { descriptor })
27    }
28
29    pub fn new_with_cache<P: AsRef<Path>>(
30        proto_path: P,
31        message_name: &str,
32        cache: &ProtoCache,
33    ) -> Result<Self, CamelError> {
34        let pool = cache
35            .get_or_compile(proto_path.as_ref(), std::iter::empty::<&Path>())
36            .map_err(|e| {
37                CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
38            })?;
39        let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
40            CamelError::TypeConversionFailed(format!(
41                "message descriptor not found: {message_name}"
42            ))
43        })?;
44        Ok(Self { descriptor })
45    }
46
47    pub fn descriptor(&self) -> &MessageDescriptor {
48        &self.descriptor
49    }
50
51    pub fn json_to_dynamic(
52        &self,
53        json_val: serde_json::Value,
54    ) -> Result<DynamicMessage, CamelError> {
55        let json_str = serde_json::to_string(&json_val).map_err(|e| {
56            CamelError::TypeConversionFailed(format!("failed to serialize JSON: {e}"))
57        })?;
58        let mut de = serde_json::Deserializer::from_str(&json_str);
59        DynamicMessage::deserialize(self.descriptor.clone(), &mut de).map_err(|e| {
60            CamelError::TypeConversionFailed(format!("failed to parse JSON into protobuf: {e}"))
61        })
62    }
63
64    pub fn dynamic_to_json(&self, msg: DynamicMessage) -> Result<serde_json::Value, CamelError> {
65        serde_json::to_value(&msg).map_err(|e| {
66            CamelError::TypeConversionFailed(format!("failed to serialize protobuf to JSON: {e}"))
67        })
68    }
69}
70
71impl DataFormat for ProtobufDataFormat {
72    fn name(&self) -> &str {
73        "protobuf"
74    }
75
76    fn marshal(&self, body: Body) -> Result<Body, CamelError> {
77        match body {
78            Body::Json(val) => {
79                let msg = self.json_to_dynamic(val)?;
80                let mut buf = BytesMut::new();
81                msg.encode(&mut buf).map_err(|e| {
82                    CamelError::TypeConversionFailed(format!(
83                        "failed to encode protobuf message: {e}"
84                    ))
85                })?;
86                Ok(Body::Bytes(buf.freeze()))
87            }
88            Body::Text(text) => {
89                let val: serde_json::Value = serde_json::from_str(&text).map_err(|e| {
90                    CamelError::TypeConversionFailed(format!(
91                        "invalid JSON text for protobuf marshal: {e}"
92                    ))
93                })?;
94                self.marshal(Body::Json(val))
95            }
96            Body::Bytes(bytes) => Ok(Body::Bytes(bytes)),
97            Body::Empty => Err(CamelError::TypeConversionFailed(
98                "protobuf marshal does not support empty body".to_string(),
99            )),
100            Body::Stream(_) => Err(CamelError::TypeConversionFailed(
101                "protobuf marshal does not support stream body".to_string(),
102            )),
103            Body::Xml(_) => Err(CamelError::TypeConversionFailed(
104                "protobuf marshal does not support XML body".to_string(),
105            )),
106        }
107    }
108
109    fn unmarshal(&self, body: Body) -> Result<Body, CamelError> {
110        match body {
111            Body::Bytes(bytes) => {
112                let msg = DynamicMessage::decode(self.descriptor.clone(), bytes.as_ref()).map_err(
113                    |e| {
114                        CamelError::TypeConversionFailed(format!(
115                            "failed to decode protobuf bytes: {e}"
116                        ))
117                    },
118                )?;
119                let json = self.dynamic_to_json(msg)?;
120                Ok(Body::Json(json))
121            }
122            Body::Json(val) => Ok(Body::Json(val)),
123            Body::Text(_) => Err(CamelError::TypeConversionFailed(
124                "protobuf unmarshal does not support text body".to_string(),
125            )),
126            Body::Stream(_) => Err(CamelError::TypeConversionFailed(
127                "protobuf unmarshal does not support stream body".to_string(),
128            )),
129            Body::Empty => Err(CamelError::TypeConversionFailed(
130                "protobuf unmarshal does not support empty body".to_string(),
131            )),
132            Body::Xml(_) => Err(CamelError::TypeConversionFailed(
133                "protobuf unmarshal does not support XML body".to_string(),
134            )),
135        }
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use std::path::PathBuf;
142
143    use bytes::Bytes;
144    use camel_api::body::Body;
145    use camel_api::data_format::DataFormat;
146    use serde_json::json;
147
148    use super::ProtobufDataFormat;
149
150    fn test_proto_path() -> PathBuf {
151        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
152            .join("tests")
153            .join("helloworld.proto")
154    }
155
156    fn data_format() -> ProtobufDataFormat {
157        ProtobufDataFormat::new(test_proto_path(), "helloworld.HelloRequest")
158            .expect("should load descriptor")
159    }
160
161    #[test]
162    fn test_name() {
163        let df = data_format();
164        assert_eq!(df.name(), "protobuf");
165    }
166
167    #[test]
168    fn test_marshal_json_to_bytes() {
169        let df = data_format();
170        let body = Body::Json(json!({ "name": "Alice" }));
171        let out = df.marshal(body).expect("marshal should succeed");
172        match out {
173            Body::Bytes(b) => assert!(!b.is_empty()),
174            other => panic!("expected bytes, got {other:?}"),
175        }
176    }
177
178    #[test]
179    fn test_unmarshal_bytes_to_json() {
180        let df = data_format();
181        let bytes = match df
182            .marshal(Body::Json(json!({ "name": "Alice" })))
183            .expect("marshal should succeed")
184        {
185            Body::Bytes(b) => b,
186            other => panic!("expected bytes, got {other:?}"),
187        };
188
189        let out = df
190            .unmarshal(Body::Bytes(bytes))
191            .expect("unmarshal should succeed");
192        match out {
193            Body::Json(v) => assert_eq!(v, json!({ "name": "Alice" })),
194            other => panic!("expected json, got {other:?}"),
195        }
196    }
197
198    #[test]
199    fn test_roundtrip_json_bytes_json() {
200        let df = data_format();
201        let input = json!({ "name": "Bob" });
202        let bytes = match df
203            .marshal(Body::Json(input.clone()))
204            .expect("marshal should succeed")
205        {
206            Body::Bytes(b) => b,
207            other => panic!("expected bytes, got {other:?}"),
208        };
209        let output = match df
210            .unmarshal(Body::Bytes(bytes))
211            .expect("unmarshal should succeed")
212        {
213            Body::Json(v) => v,
214            other => panic!("expected json, got {other:?}"),
215        };
216        assert_eq!(output, input);
217    }
218
219    #[test]
220    fn test_marshal_bytes_passthrough() {
221        let df = data_format();
222        let body = Body::Bytes(Bytes::from_static(b"raw"));
223        let out = df.marshal(body).expect("marshal should pass through bytes");
224        assert_eq!(out, Body::Bytes(Bytes::from_static(b"raw")));
225    }
226
227    #[test]
228    fn test_unmarshal_json_passthrough() {
229        let df = data_format();
230        let body = Body::Json(json!({ "name": "Passthrough" }));
231        let out = df
232            .unmarshal(body.clone())
233            .expect("unmarshal should pass through JSON");
234        assert_eq!(out, body);
235    }
236
237    #[test]
238    fn test_marshal_empty_rejected() {
239        let df = data_format();
240        let err = df.marshal(Body::Empty).expect_err("empty must be rejected");
241        assert!(format!("{err}").contains("empty"));
242    }
243
244    #[test]
245    fn test_unmarshal_empty_rejected() {
246        let df = data_format();
247        let err = df
248            .unmarshal(Body::Empty)
249            .expect_err("empty must be rejected");
250        assert!(format!("{err}").contains("empty"));
251    }
252
253    #[test]
254    fn test_message_not_found_error() {
255        let err = ProtobufDataFormat::new(test_proto_path(), "helloworld.DoesNotExist")
256            .err()
257            .expect("unknown message should fail");
258        assert!(format!("{err}").contains("message descriptor not found"));
259    }
260}