camel_dataformat_protobuf/
lib.rs1use std::path::Path;
2
3use bytes::BytesMut;
4use camel_api::body::Body;
5use camel_api::data_format::DataFormat;
6use camel_api::error::CamelError;
7use camel_proto_compiler::{ProtoCache, compile_proto};
8use prost::Message;
9use prost_reflect::{DynamicMessage, MessageDescriptor};
10
11pub struct ProtobufDataFormat {
12 descriptor: MessageDescriptor,
13}
14
15impl ProtobufDataFormat {
16 pub fn new<P: AsRef<Path>>(proto_path: P, message_name: &str) -> Result<Self, CamelError> {
17 let pool =
18 compile_proto(proto_path.as_ref(), std::iter::empty::<&Path>()).map_err(|e| {
19 CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
20 })?;
21 let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
22 CamelError::TypeConversionFailed(format!(
23 "message descriptor not found: {message_name}"
24 ))
25 })?;
26 Ok(Self { descriptor })
27 }
28
29 pub fn new_with_cache<P: AsRef<Path>>(
30 proto_path: P,
31 message_name: &str,
32 cache: &ProtoCache,
33 ) -> Result<Self, CamelError> {
34 let pool = cache
35 .get_or_compile(proto_path.as_ref(), std::iter::empty::<&Path>())
36 .map_err(|e| {
37 CamelError::TypeConversionFailed(format!("failed to compile proto: {e}"))
38 })?;
39 let descriptor = pool.get_message_by_name(message_name).ok_or_else(|| {
40 CamelError::TypeConversionFailed(format!(
41 "message descriptor not found: {message_name}"
42 ))
43 })?;
44 Ok(Self { descriptor })
45 }
46
47 pub fn descriptor(&self) -> &MessageDescriptor {
48 &self.descriptor
49 }
50
51 pub fn json_to_dynamic(
52 &self,
53 json_val: serde_json::Value,
54 ) -> Result<DynamicMessage, CamelError> {
55 let json_str = serde_json::to_string(&json_val).map_err(|e| {
56 CamelError::TypeConversionFailed(format!("failed to serialize JSON: {e}"))
57 })?;
58 let mut de = serde_json::Deserializer::from_str(&json_str);
59 DynamicMessage::deserialize(self.descriptor.clone(), &mut de).map_err(|e| {
60 CamelError::TypeConversionFailed(format!("failed to parse JSON into protobuf: {e}"))
61 })
62 }
63
64 pub fn dynamic_to_json(&self, msg: DynamicMessage) -> Result<serde_json::Value, CamelError> {
65 serde_json::to_value(&msg).map_err(|e| {
66 CamelError::TypeConversionFailed(format!("failed to serialize protobuf to JSON: {e}"))
67 })
68 }
69}
70
71impl DataFormat for ProtobufDataFormat {
72 fn name(&self) -> &str {
73 "protobuf"
74 }
75
76 fn marshal(&self, body: Body) -> Result<Body, CamelError> {
77 match body {
78 Body::Json(val) => {
79 let msg = self.json_to_dynamic(val)?;
80 let mut buf = BytesMut::new();
81 msg.encode(&mut buf).map_err(|e| {
82 CamelError::TypeConversionFailed(format!(
83 "failed to encode protobuf message: {e}"
84 ))
85 })?;
86 Ok(Body::Bytes(buf.freeze()))
87 }
88 Body::Text(text) => {
89 let val: serde_json::Value = serde_json::from_str(&text).map_err(|e| {
90 CamelError::TypeConversionFailed(format!(
91 "invalid JSON text for protobuf marshal: {e}"
92 ))
93 })?;
94 self.marshal(Body::Json(val))
95 }
96 Body::Bytes(bytes) => Ok(Body::Bytes(bytes)),
97 Body::Empty => Err(CamelError::TypeConversionFailed(
98 "protobuf marshal does not support empty body".to_string(),
99 )),
100 Body::Stream(_) => Err(CamelError::TypeConversionFailed(
101 "protobuf marshal does not support stream body".to_string(),
102 )),
103 Body::Xml(_) => Err(CamelError::TypeConversionFailed(
104 "protobuf marshal does not support XML body".to_string(),
105 )),
106 }
107 }
108
109 fn unmarshal(&self, body: Body) -> Result<Body, CamelError> {
110 match body {
111 Body::Bytes(bytes) => {
112 let msg = DynamicMessage::decode(self.descriptor.clone(), bytes.as_ref()).map_err(
113 |e| {
114 CamelError::TypeConversionFailed(format!(
115 "failed to decode protobuf bytes: {e}"
116 ))
117 },
118 )?;
119 let json = self.dynamic_to_json(msg)?;
120 Ok(Body::Json(json))
121 }
122 Body::Json(val) => Ok(Body::Json(val)),
123 Body::Text(_) => Err(CamelError::TypeConversionFailed(
124 "protobuf unmarshal does not support text body".to_string(),
125 )),
126 Body::Stream(_) => Err(CamelError::TypeConversionFailed(
127 "protobuf unmarshal does not support stream body".to_string(),
128 )),
129 Body::Empty => Err(CamelError::TypeConversionFailed(
130 "protobuf unmarshal does not support empty body".to_string(),
131 )),
132 Body::Xml(_) => Err(CamelError::TypeConversionFailed(
133 "protobuf unmarshal does not support XML body".to_string(),
134 )),
135 }
136 }
137}
138
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use bytes::Bytes;
    use camel_api::body::Body;
    use camel_api::data_format::DataFormat;
    use serde_json::json;

    use super::ProtobufDataFormat;

    /// Path of the `helloworld.proto` fixture in this crate's `tests` directory.
    fn test_proto_path() -> PathBuf {
        let mut fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        fixture.push("tests");
        fixture.push("helloworld.proto");
        fixture
    }

    /// Data format bound to `helloworld.HelloRequest` from the fixture proto.
    fn data_format() -> ProtobufDataFormat {
        let loaded = ProtobufDataFormat::new(test_proto_path(), "helloworld.HelloRequest");
        loaded.expect("should load descriptor")
    }

    /// Unwraps a `Body::Bytes`, panicking on any other variant.
    fn expect_bytes(body: Body) -> Bytes {
        match body {
            Body::Bytes(b) => b,
            other => panic!("expected bytes, got {other:?}"),
        }
    }

    /// Unwraps a `Body::Json`, panicking on any other variant.
    fn expect_json(body: Body) -> serde_json::Value {
        match body {
            Body::Json(v) => v,
            other => panic!("expected json, got {other:?}"),
        }
    }

    #[test]
    fn test_name() {
        assert_eq!(data_format().name(), "protobuf");
    }

    #[test]
    fn test_marshal_json_to_bytes() {
        let format = data_format();
        let marshalled = format
            .marshal(Body::Json(json!({ "name": "Alice" })))
            .expect("marshal should succeed");
        assert!(!expect_bytes(marshalled).is_empty());
    }

    #[test]
    fn test_unmarshal_bytes_to_json() {
        let format = data_format();
        let wire = expect_bytes(
            format
                .marshal(Body::Json(json!({ "name": "Alice" })))
                .expect("marshal should succeed"),
        );

        let decoded = format
            .unmarshal(Body::Bytes(wire))
            .expect("unmarshal should succeed");
        assert_eq!(expect_json(decoded), json!({ "name": "Alice" }));
    }

    #[test]
    fn test_roundtrip_json_bytes_json() {
        let format = data_format();
        let input = json!({ "name": "Bob" });

        let wire = expect_bytes(
            format
                .marshal(Body::Json(input.clone()))
                .expect("marshal should succeed"),
        );
        let output = expect_json(
            format
                .unmarshal(Body::Bytes(wire))
                .expect("unmarshal should succeed"),
        );

        assert_eq!(output, input);
    }

    #[test]
    fn test_marshal_bytes_passthrough() {
        let format = data_format();
        let raw = Body::Bytes(Bytes::from_static(b"raw"));
        let out = format
            .marshal(raw)
            .expect("marshal should pass through bytes");
        assert_eq!(out, Body::Bytes(Bytes::from_static(b"raw")));
    }

    #[test]
    fn test_unmarshal_json_passthrough() {
        let format = data_format();
        let original = Body::Json(json!({ "name": "Passthrough" }));
        let out = format
            .unmarshal(original.clone())
            .expect("unmarshal should pass through JSON");
        assert_eq!(out, original);
    }

    #[test]
    fn test_marshal_empty_rejected() {
        let failure = data_format()
            .marshal(Body::Empty)
            .expect_err("empty must be rejected");
        assert!(failure.to_string().contains("empty"));
    }

    #[test]
    fn test_unmarshal_empty_rejected() {
        let failure = data_format()
            .unmarshal(Body::Empty)
            .expect_err("empty must be rejected");
        assert!(failure.to_string().contains("empty"));
    }

    #[test]
    fn test_message_not_found_error() {
        // `.err().expect(..)` rather than `expect_err`, which would require
        // the success type to implement `Debug`.
        let failure = ProtobufDataFormat::new(test_proto_path(), "helloworld.DoesNotExist")
            .err()
            .expect("unknown message should fail");
        assert!(failure
            .to_string()
            .contains("message descriptor not found"));
    }
}