1mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UPayloadFormat};
25
/// Errors that can occur while building a [`UMessage`] or while reading its payload.
#[derive(Debug)]
pub enum UMessageError {
    /// Wraps a [`UAttributesError`]; displayed with the prefix
    /// "Builder state is not consistent with message type".
    AttributesValidationError(UAttributesError),
    /// Wraps a [`protobuf::Error`] raised by the protobuf runtime, e.g. when payload
    /// bytes cannot be parsed into the requested message type.
    DataSerializationError(protobuf::Error),
    /// A payload-related problem described by a plain text message, e.g. a missing
    /// payload, a message type mismatch or an unsupported payload format.
    PayloadError(String),
}
32
33impl std::fmt::Display for UMessageError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37 "Builder state is not consistent with message type: {}",
38 e
39 )),
40 Self::DataSerializationError(e) => {
41 f.write_fmt(format_args!("Failed to serialize payload: {}", e))
42 }
43 Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
44 }
45 }
46}
47
// Marks `UMessageError` as a standard error type. No trait methods are overridden
// here; the wrapped error's text is included in the `Display` output instead.
impl std::error::Error for UMessageError {}
49
50impl From<UAttributesError> for UMessageError {
51 fn from(value: UAttributesError) -> Self {
52 Self::AttributesValidationError(value)
53 }
54}
55
56impl From<protobuf::Error> for UMessageError {
57 fn from(value: protobuf::Error) -> Self {
58 Self::DataSerializationError(value)
59 }
60}
61
62impl From<String> for UMessageError {
63 fn from(value: String) -> Self {
64 Self::PayloadError(value)
65 }
66}
67
68impl From<&str> for UMessageError {
69 fn from(value: &str) -> Self {
70 Self::from(value.to_string())
71 }
72}
73
74impl UMessage {
75 pub fn is_publish(&self) -> bool {
93 self.attributes
94 .as_ref()
95 .is_some_and(|attribs| attribs.is_publish())
96 }
97
98 pub fn is_request(&self) -> bool {
116 self.attributes
117 .as_ref()
118 .is_some_and(|attribs| attribs.is_request())
119 }
120
121 pub fn is_response(&self) -> bool {
139 self.attributes
140 .as_ref()
141 .is_some_and(|attribs| attribs.is_response())
142 }
143
144 pub fn is_notification(&self) -> bool {
162 self.attributes
163 .as_ref()
164 .is_some_and(|attribs| attribs.is_notification())
165 }
166
167 pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
186 if let Some(payload) = &self.payload {
187 let payload_format = self.attributes.payload_format.enum_value_or_default();
188 deserialize_protobuf_bytes(payload, &payload_format)
189 } else {
190 Err(UMessageError::PayloadError(
191 "No embedded payload".to_string(),
192 ))
193 }
194 }
195}
196
197pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
216 payload: &Bytes,
217 payload_format: &UPayloadFormat,
218) -> Result<T, UMessageError> {
219 match payload_format {
220 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
221 T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
222 }
223 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY
224 | UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED => Any::parse_from_tokio_bytes(payload)
225 .map_err(UMessageError::DataSerializationError)
226 .and_then(|any| match any.unpack() {
227 Ok(Some(v)) => Ok(v),
228 Ok(None) => Err(UMessageError::PayloadError(
229 "cannot deserialize payload, message type mismatch".to_string(),
230 )),
231 Err(e) => Err(UMessageError::DataSerializationError(e)),
232 }),
233 _ => Err(UMessageError::from(format!(
234 "Unknown/invalid/unsupported payload format: {}",
235 payload_format
236 .to_media_type()
237 .unwrap_or("unknown".to_string())
238 ))),
239 }
240}
241
#[cfg(test)]
mod test {
    use std::io;

    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
    use test_case::test_case;

    use crate::{UAttributes, UStatus};

    use super::*;

    /// A payload can be deserialized both when it is wrapped in an `Any`
    /// (explicitly, or implicitly via the unspecified format) and when it is a
    /// plain serialized protobuf message.
    #[test]
    fn test_deserialize_protobuf_bytes_succeeds() {
        let mut data = StringValue::new();
        data.value = "hello world".to_string();
        // `pack` only needs a reference, so `data` can be reused further down
        let any = Any::pack(&data).unwrap();
        let buf: Bytes = any.write_to_bytes().unwrap().into();

        let result = deserialize_protobuf_bytes::<StringValue>(
            &buf,
            &UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED,
        );
        assert!(result.is_ok_and(|v| v.value == *"hello world"));

        let result = deserialize_protobuf_bytes::<StringValue>(
            &buf,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(result.is_ok_and(|v| v.value == *"hello world"));

        let result = deserialize_protobuf_bytes::<StringValue>(
            &data.write_to_bytes().unwrap().into(),
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
        );
        assert!(result.is_ok_and(|v| v.value == *"hello world"));
    }

    /// Unpacking an `Any` into a type other than the one it contains is
    /// reported as a payload error.
    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
        let mut data = StringValue::new();
        data.value = "hello world".to_string();
        let any = Any::pack(&data).unwrap();
        let buf: Bytes = any.write_to_bytes().unwrap().into();
        let result = deserialize_protobuf_bytes::<UStatus>(
            &buf,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
    }

    /// Payload formats that do not denote protobuf content are rejected.
    // The trailing underscore is intentional: `test_case` appends the case name.
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
        let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
    }

    /// An `Any` with a matching type URL but a malformed value is reported as
    /// a serialization error.
    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
        let any = Any {
            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
            value: vec![0x0A],
            ..Default::default()
        };
        let buf = any.write_to_bytes().unwrap();
        let result = deserialize_protobuf_bytes::<Duration>(
            &buf.into(),
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))));
    }

    /// A message carrying an `Any`-wrapped payload yields the original value.
    #[test]
    fn extract_payload_succeeds() {
        let payload = StringValue {
            value: "hello".to_string(),
            ..Default::default()
        };
        let buf = Any::pack(&payload)
            .and_then(|a| a.write_to_bytes())
            .unwrap();
        let msg = UMessage {
            attributes: Some(UAttributes {
                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
                ..Default::default()
            })
            .into(),
            payload: Some(buf.into()),
            ..Default::default()
        };
        assert!(msg
            .extract_protobuf::<StringValue>()
            .is_ok_and(|v| v.value == *"hello"));
    }

    /// Extracting from a message without payload is reported as a payload error.
    #[test]
    fn extract_payload_fails_for_no_payload() {
        let msg = UMessage {
            attributes: Some(UAttributes {
                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
                ..Default::default()
            })
            .into(),
            ..Default::default()
        };
        assert!(msg
            .extract_protobuf::<StringValue>()
            .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
    }

    /// `UAttributesError` converts into the attributes validation variant.
    #[test]
    fn test_from_attributes_error() {
        let attributes_error = UAttributesError::validation_error("failed to validate");
        let message_error = UMessageError::from(attributes_error);
        assert!(matches!(
            message_error,
            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
        ));
    }

    /// `protobuf::Error` converts into the data serialization variant.
    #[test]
    fn test_from_protobuf_error() {
        // Build the I/O error explicitly instead of reading the ambient OS error
        // state, so that the test input does not depend on the environment.
        let io_error = io::Error::new(io::ErrorKind::UnexpectedEof, "simulated I/O failure");
        let protobuf_error = protobuf::Error::from(io_error);
        let message_error = UMessageError::from(protobuf_error);
        assert!(matches!(
            message_error,
            UMessageError::DataSerializationError(_)
        ));
    }

    /// A plain string slice converts into the payload error variant.
    #[test]
    fn test_from_error_msg() {
        let message_error = UMessageError::from("an error occurred");
        assert!(matches!(message_error, UMessageError::PayloadError(_)));
    }
}