camel_dataformat_protobuf/
lib.rs1use std::path::Path;
12
13use bytes::BytesMut;
14use camel_api::body::Body;
15use camel_api::data_format::DataFormat;
16use camel_api::error::CamelError;
17use camel_proto_compiler::{ProtoCache, compile_proto};
18use prost::Message;
19use prost_reflect::{DynamicMessage, MessageDescriptor};
20
/// User-facing options for the protobuf data format.
///
/// Every option is optional; [`Default`] leaves all of them unset.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProtobufConfig {
    /// Optional content-type format setting.
    ///
    /// NOTE(review): not read by any code visible in this file; presumably it
    /// mirrors Apache Camel's `contentTypeFormat` option — confirm.
    pub content_type_format: Option<String>,
    /// Optional message class name.
    ///
    /// When set, `validate` requires it to contain at least one
    /// non-whitespace character.
    pub instance_class: Option<String>,
}
29
30impl ProtobufConfig {
31 pub fn validate(&self) -> Result<(), CamelError> {
32 if let Some(instance_class) = &self.instance_class
33 && instance_class.trim().is_empty()
34 {
35 return Err(CamelError::TypeConversionFailed(
36 "instance_class must be non-empty when set".to_string(),
37 ));
38 }
39
40 Ok(())
41 }
42}
43
44pub struct ProtobufDataFormat {
45 descriptor: MessageDescriptor,
46}
47
48impl ProtobufDataFormat {
49 pub fn new<P: AsRef<Path>>(proto_path: P, message_name: &str) -> Result<Self, CamelError> {
50 let pool =
51 compile_proto(proto_path.as_ref(), std::iter::empty::<&Path>()).map_err(|e| {
52 CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
53 })?;
54 let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
55 CamelError::Config(format!("message descriptor not found: {message_name}"))
56 })?;
57 Ok(Self { descriptor })
58 }
59
60 pub fn new_with_cache<P: AsRef<Path>>(
61 proto_path: P,
62 message_name: &str,
63 cache: &ProtoCache,
64 ) -> Result<Self, CamelError> {
65 let pool = cache
66 .get_or_compile(proto_path.as_ref(), std::iter::empty::<&Path>())
67 .map_err(|e| {
68 CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
69 })?;
70 let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
71 CamelError::Config(format!("message descriptor not found: {message_name}"))
72 })?;
73 Ok(Self { descriptor })
74 }
75
76 pub fn descriptor(&self) -> &MessageDescriptor {
77 &self.descriptor
78 }
79
80 pub fn json_to_dynamic(
81 &self,
82 json_val: serde_json::Value,
83 ) -> Result<DynamicMessage, CamelError> {
84 let json_str = serde_json::to_string(&json_val).map_err(|e| {
85 CamelError::TypeConversionFailed(format!("failed to serialize JSON: {e}"))
86 })?;
87 let mut de = serde_json::Deserializer::from_str(&json_str);
88 DynamicMessage::deserialize(self.descriptor.clone(), &mut de).map_err(|e| {
89 CamelError::TypeConversionFailed(format!("failed to parse JSON into protobuf: {e}"))
90 })
91 }
92
93 pub fn dynamic_to_json(&self, msg: DynamicMessage) -> Result<serde_json::Value, CamelError> {
94 serde_json::to_value(&msg).map_err(|e| {
95 CamelError::TypeConversionFailed(format!("failed to serialize protobuf to JSON: {e}"))
96 })
97 }
98}
99
100impl DataFormat for ProtobufDataFormat {
101 fn name(&self) -> &str {
102 "protobuf"
103 }
104
105 fn marshal(&self, body: Body) -> Result<Body, CamelError> {
106 match body {
107 Body::Json(val) => {
108 let msg = self.json_to_dynamic(val)?;
109 let mut buf = BytesMut::new();
110 msg.encode(&mut buf).map_err(|e| {
111 CamelError::TypeConversionFailed(format!(
112 "failed to encode protobuf message: {e}"
113 ))
114 })?;
115 Ok(Body::Bytes(buf.freeze()))
116 }
117 Body::Text(text) => {
118 let val: serde_json::Value = serde_json::from_str(&text).map_err(|e| {
119 CamelError::TypeConversionFailed(format!(
120 "invalid JSON text for protobuf marshal: {e}"
121 ))
122 })?;
123 self.marshal(Body::Json(val))
124 }
125 Body::Bytes(bytes) => {
126 DynamicMessage::decode(self.descriptor.clone(), bytes.as_ref()).map_err(|e| {
127 CamelError::ProcessorError(format!(
128 "protobuf marshal: invalid bytes for type {}: {e}",
129 self.descriptor.full_name()
130 ))
131 })?;
132 Ok(Body::Bytes(bytes))
133 }
134 Body::Empty => Err(CamelError::TypeConversionFailed(
135 "protobuf marshal does not support empty body".to_string(),
136 )),
137 Body::Stream(_) => Err(CamelError::TypeConversionFailed(
138 "protobuf marshal does not support stream body".to_string(),
139 )),
140 Body::Xml(_) => Err(CamelError::TypeConversionFailed(
141 "protobuf marshal does not support XML body".to_string(),
142 )),
143 }
144 }
145
146 fn unmarshal(&self, body: Body) -> Result<Body, CamelError> {
147 match body {
148 Body::Bytes(bytes) => {
149 let msg = DynamicMessage::decode(self.descriptor.clone(), bytes.as_ref()).map_err(
150 |e| {
151 CamelError::TypeConversionFailed(format!(
152 "failed to decode protobuf bytes: {e}"
153 ))
154 },
155 )?;
156 let json = self.dynamic_to_json(msg)?;
157 Ok(Body::Json(json))
158 }
159 Body::Json(val) => Ok(Body::Json(val)),
160 Body::Text(_) => Err(CamelError::TypeConversionFailed(
161 "protobuf unmarshal does not support text body".to_string(),
162 )),
163 Body::Stream(_) => Err(CamelError::TypeConversionFailed(
164 "protobuf unmarshal does not support stream body".to_string(),
165 )),
166 Body::Empty => Err(CamelError::TypeConversionFailed(
167 "protobuf unmarshal does not support empty body".to_string(),
168 )),
169 Body::Xml(_) => Err(CamelError::TypeConversionFailed(
170 "protobuf unmarshal does not support XML body".to_string(),
171 )),
172 }
173 }
174}
175
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use bytes::Bytes;
    use camel_api::body::Body;
    use camel_api::data_format::DataFormat;
    use camel_api::error::CamelError;
    use serde_json::json;

    use super::{ProtobufConfig, ProtobufDataFormat};

    /// Path of the `helloworld.proto` fixture under this crate's `tests` directory.
    fn test_proto_path() -> PathBuf {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("tests");
        path.push("helloworld.proto");
        path
    }

    /// Data format bound to `helloworld.HelloRequest` from the fixture proto.
    fn data_format() -> ProtobufDataFormat {
        let loaded = ProtobufDataFormat::new(test_proto_path(), "helloworld.HelloRequest");
        loaded.expect("should load descriptor")
    }

    /// Marshals a JSON value and returns the resulting bytes, panicking if
    /// marshalling fails or yields any other body variant.
    #[track_caller]
    fn marshal_json(df: &ProtobufDataFormat, value: serde_json::Value) -> Bytes {
        match df
            .marshal(Body::Json(value))
            .expect("marshal should succeed")
        {
            Body::Bytes(b) => b,
            other => panic!("expected bytes, got {other:?}"),
        }
    }

    /// Unmarshals bytes and returns the resulting JSON value, panicking if
    /// unmarshalling fails or yields any other body variant.
    #[track_caller]
    fn unmarshal_bytes(df: &ProtobufDataFormat, bytes: Bytes) -> serde_json::Value {
        match df
            .unmarshal(Body::Bytes(bytes))
            .expect("unmarshal should succeed")
        {
            Body::Json(v) => v,
            other => panic!("expected json, got {other:?}"),
        }
    }

    #[test]
    fn test_name() {
        assert_eq!(data_format().name(), "protobuf");
    }

    #[test]
    fn test_marshal_json_to_bytes() {
        let df = data_format();
        let encoded = marshal_json(&df, json!({ "name": "Alice" }));
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_unmarshal_bytes_to_json() {
        let df = data_format();
        let encoded = marshal_json(&df, json!({ "name": "Alice" }));
        let decoded = unmarshal_bytes(&df, encoded);
        assert_eq!(decoded, json!({ "name": "Alice" }));
    }

    #[test]
    fn test_roundtrip_json_bytes_json() {
        let df = data_format();
        let input = json!({ "name": "Bob" });
        let encoded = marshal_json(&df, input.clone());
        let output = unmarshal_bytes(&df, encoded);
        assert_eq!(output, input);
    }

    #[test]
    fn test_marshal_bytes_passthrough() {
        let df = data_format();
        // Arbitrary bytes that are not a valid encoding of the bound message.
        let raw = Body::Bytes(Bytes::from_static(b"raw"));
        let err = df
            .marshal(raw)
            .expect_err("invalid bytes should be rejected");
        let message = err.to_string();
        assert!(message.contains("protobuf marshal: invalid bytes for type"));
    }

    #[test]
    fn test_marshal_valid_bytes_accepted() {
        let df = data_format();
        let encoded = marshal_json(&df, json!({ "name": "Alice" }));
        let out = df
            .marshal(Body::Bytes(encoded.clone()))
            .expect("valid protobuf bytes should be accepted");
        assert_eq!(out, Body::Bytes(encoded));
    }

    #[test]
    fn test_unmarshal_json_passthrough() {
        let df = data_format();
        let body = Body::Json(json!({ "name": "Passthrough" }));
        let out = df
            .unmarshal(body.clone())
            .expect("unmarshal should pass through JSON");
        assert_eq!(out, body);
    }

    #[test]
    fn test_marshal_empty_rejected() {
        let df = data_format();
        let err = df.marshal(Body::Empty).expect_err("empty must be rejected");
        assert!(err.to_string().contains("empty"));
    }

    #[test]
    fn test_unmarshal_empty_rejected() {
        let df = data_format();
        let err = df
            .unmarshal(Body::Empty)
            .expect_err("empty must be rejected");
        assert!(err.to_string().contains("empty"));
    }

    #[test]
    fn test_message_not_found_error() {
        // `.err().expect(..)` rather than `expect_err`: the Ok type has no Debug impl.
        let result = ProtobufDataFormat::new(test_proto_path(), "helloworld.DoesNotExist");
        let err = result.err().expect("unknown message should fail");
        assert!(matches!(err, CamelError::Config(_)));
    }

    #[test]
    fn test_empty_instance_class_rejected() {
        let config = ProtobufConfig {
            instance_class: Some(String::new()),
            ..Default::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_valid_instance_class_accepted() {
        let config = ProtobufConfig {
            instance_class: Some("com.example.MyMessage".to_string()),
            ..Default::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_no_instance_class_valid() {
        assert!(ProtobufConfig::default().validate().is_ok());
    }

    #[test]
    fn test_encode_decode_roundtrip() {
        let df = data_format();
        let original = json!({ "name": "RoundtripCharlie" });

        // JSON -> protobuf bytes.
        let marshalled = df
            .marshal(Body::Json(original.clone()))
            .expect("marshal should succeed");
        let encoded = match marshalled {
            Body::Bytes(b) => b,
            other => panic!("expected bytes after marshal, got {other:?}"),
        };
        assert!(!encoded.is_empty(), "encoded bytes should not be empty");

        // Protobuf bytes -> JSON.
        let unmarshalled = df
            .unmarshal(Body::Bytes(encoded))
            .expect("unmarshal should succeed");
        let decoded = match unmarshalled {
            Body::Json(v) => v,
            other => panic!("expected json after unmarshal, got {other:?}"),
        };

        assert_eq!(decoded, original, "roundtrip should preserve field values");
    }
}