1mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID};
25
/// Errors that can occur while building a [`UMessage`] or while accessing its payload.
#[derive(Debug)]
pub enum UMessageError {
    /// Wraps a [`UAttributesError`]; rendered as "Builder state is not consistent with
    /// message type" by the `Display` implementation.
    AttributesValidationError(UAttributesError),
    /// Wraps a [`protobuf::Error`] raised while (de-)serializing a protobuf payload.
    DataSerializationError(protobuf::Error),
    /// A payload related problem described by a human readable message, e.g. a missing
    /// payload or an unsupported payload format.
    PayloadError(String),
}
32
33impl std::fmt::Display for UMessageError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37 "Builder state is not consistent with message type: {}",
38 e
39 )),
40 Self::DataSerializationError(e) => {
41 f.write_fmt(format_args!("Failed to serialize payload: {}", e))
42 }
43 Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
44 }
45 }
46}
47
// Marker implementation: only the default `std::error::Error` methods are used, so no
// `source()` is exposed; the wrapped cause is reported via `Display` instead.
impl std::error::Error for UMessageError {}
49
50impl From<UAttributesError> for UMessageError {
51 fn from(value: UAttributesError) -> Self {
52 Self::AttributesValidationError(value)
53 }
54}
55
56impl From<protobuf::Error> for UMessageError {
57 fn from(value: protobuf::Error) -> Self {
58 Self::DataSerializationError(value)
59 }
60}
61
62impl From<String> for UMessageError {
63 fn from(value: String) -> Self {
64 Self::PayloadError(value)
65 }
66}
67
68impl From<&str> for UMessageError {
69 fn from(value: &str) -> Self {
70 Self::from(value.to_string())
71 }
72}
73
74impl UMessage {
75 pub fn type_(&self) -> Option<UMessageType> {
77 self.attributes
78 .as_ref()
79 .and_then(|attribs| attribs.type_.enum_value().ok())
80 }
81
82 pub fn type_unchecked(&self) -> UMessageType {
88 self.type_().expect("message has no type")
89 }
90
91 pub fn id(&self) -> Option<&UUID> {
93 self.attributes
94 .as_ref()
95 .and_then(|attribs| attribs.id.as_ref())
96 }
97
98 pub fn id_unchecked(&self) -> &UUID {
104 self.id().expect("message has no ID")
105 }
106
107 pub fn source(&self) -> Option<&UUri> {
109 self.attributes
110 .as_ref()
111 .and_then(|attribs| attribs.source.as_ref())
112 }
113
114 pub fn source_unchecked(&self) -> &UUri {
120 self.source().expect("message has no source")
121 }
122
123 pub fn sink(&self) -> Option<&UUri> {
125 self.attributes
126 .as_ref()
127 .and_then(|attribs| attribs.sink.as_ref())
128 }
129
130 pub fn sink_unchecked(&self) -> &UUri {
136 self.sink().expect("message has no sink")
137 }
138
139 pub fn priority(&self) -> Option<UPriority> {
141 self.attributes
142 .as_ref()
143 .and_then(|attribs| attribs.priority.enum_value().ok())
144 }
145
146 pub fn priority_unchecked(&self) -> UPriority {
152 self.priority().expect("message has no priority")
153 }
154
155 pub fn commstatus(&self) -> Option<UCode> {
157 self.attributes
158 .as_ref()
159 .and_then(|attribs| attribs.commstatus)
160 .and_then(|v| v.enum_value().ok())
161 }
162
163 pub fn commstatus_unchecked(&self) -> UCode {
169 self.commstatus().expect("message has no commstatus")
170 }
171
172 pub fn ttl(&self) -> Option<u32> {
178 self.attributes.as_ref().and_then(|attribs| attribs.ttl)
179 }
180
181 pub fn ttl_unchecked(&self) -> u32 {
191 self.ttl().expect("message has no time-to-live")
192 }
193
194 pub fn permission_level(&self) -> Option<u32> {
196 self.attributes
197 .as_ref()
198 .and_then(|attribs| attribs.permission_level)
199 }
200
201 pub fn token(&self) -> Option<&String> {
203 self.attributes
204 .as_ref()
205 .and_then(|attribs| attribs.token.as_ref())
206 }
207
208 pub fn traceparent(&self) -> Option<&String> {
210 self.attributes
211 .as_ref()
212 .and_then(|attribs| attribs.traceparent.as_ref())
213 }
214
215 pub fn request_id(&self) -> Option<&UUID> {
217 self.attributes
218 .as_ref()
219 .and_then(|attribs| attribs.reqid.as_ref())
220 }
221
222 pub fn request_id_unchecked(&self) -> &UUID {
228 self.request_id().expect("message has no request ID")
229 }
230
231 pub fn payload_format(&self) -> Option<UPayloadFormat> {
233 self.attributes
234 .as_ref()
235 .and_then(|attribs| attribs.payload_format.enum_value().ok())
236 }
237
238 pub fn payload_format_unchecked(&self) -> UPayloadFormat {
244 self.payload_format()
245 .expect("message has no payload format")
246 }
247
248 pub fn is_publish(&self) -> bool {
266 self.attributes
267 .as_ref()
268 .is_some_and(|attribs| attribs.is_publish())
269 }
270
271 pub fn is_request(&self) -> bool {
289 self.attributes
290 .as_ref()
291 .is_some_and(|attribs| attribs.is_request())
292 }
293
294 pub fn is_response(&self) -> bool {
312 self.attributes
313 .as_ref()
314 .is_some_and(|attribs| attribs.is_response())
315 }
316
317 pub fn is_notification(&self) -> bool {
335 self.attributes
336 .as_ref()
337 .is_some_and(|attribs| attribs.is_notification())
338 }
339
340 pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
359 if let Some(payload) = &self.payload {
360 let payload_format = self.attributes.payload_format.enum_value_or_default();
361 deserialize_protobuf_bytes(payload, &payload_format)
362 } else {
363 Err(UMessageError::PayloadError(
364 "No embedded payload".to_string(),
365 ))
366 }
367 }
368}
369
370pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
389 payload: &Bytes,
390 payload_format: &UPayloadFormat,
391) -> Result<T, UMessageError> {
392 match payload_format {
393 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
394 T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
395 }
396 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY
397 | UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED => Any::parse_from_tokio_bytes(payload)
398 .map_err(UMessageError::DataSerializationError)
399 .and_then(|any| match any.unpack() {
400 Ok(Some(v)) => Ok(v),
401 Ok(None) => Err(UMessageError::PayloadError(
402 "cannot deserialize payload, message type mismatch".to_string(),
403 )),
404 Err(e) => Err(UMessageError::DataSerializationError(e)),
405 }),
406 _ => Err(UMessageError::from(format!(
407 "Unknown/invalid/unsupported payload format: {}",
408 payload_format
409 .to_media_type()
410 .unwrap_or("unknown".to_string())
411 ))),
412 }
413}
414
#[cfg(test)]
mod test {
    use std::io;

    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
    use test_case::test_case;

    use crate::{UAttributes, UStatus};

    use super::*;

    // Verifies that a StringValue can be recovered from all supported payload formats.
    #[test]
    fn test_deserialize_protobuf_bytes_succeeds() {
        let mut data = StringValue::new();
        data.value = "hello world".to_string();
        // `Any::pack` only borrows the message, so there is no need to clone it
        let any = Any::pack(&data).unwrap();
        let buf: Bytes = any.write_to_bytes().unwrap().into();

        let result = deserialize_protobuf_bytes::<StringValue>(
            &buf,
            &UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED,
        );
        assert!(result.is_ok_and(|v| v.value == *"hello world"));

        let result = deserialize_protobuf_bytes::<StringValue>(
            &buf,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(result.is_ok_and(|v| v.value == *"hello world"));

        // plain protobuf format: the message is serialized without the Any wrapper
        let result = deserialize_protobuf_bytes::<StringValue>(
            &data.write_to_bytes().unwrap().into(),
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
        );
        assert!(result.is_ok_and(|v| v.value == *"hello world"));
    }

    // Verifies that unpacking an Any into a type other than the packed one is rejected.
    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
        let mut data = StringValue::new();
        data.value = "hello world".to_string();
        let any = Any::pack(&data).unwrap();
        let buf: Bytes = any.write_to_bytes().unwrap().into();
        let result = deserialize_protobuf_bytes::<UStatus>(
            &buf,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
    }

    // Verifies that all non-protobuf payload formats are rejected.
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
        let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
    }

    // Verifies that a correctly typed Any with a malformed value is reported as a
    // serialization error.
    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
        let any = Any {
            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
            value: vec![0x0A],
            ..Default::default()
        };
        let buf = any.write_to_bytes().unwrap();
        let result = deserialize_protobuf_bytes::<Duration>(
            &buf.into(),
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))));
    }

    // Verifies that a payload wrapped in an Any can be extracted from a message.
    #[test]
    fn extract_payload_succeeds() {
        let payload = StringValue {
            value: "hello".to_string(),
            ..Default::default()
        };
        let buf = Any::pack(&payload)
            .and_then(|a| a.write_to_bytes())
            .unwrap();
        let msg = UMessage {
            attributes: Some(UAttributes {
                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
                ..Default::default()
            })
            .into(),
            payload: Some(buf.into()),
            ..Default::default()
        };
        assert!(msg
            .extract_protobuf::<StringValue>()
            .is_ok_and(|v| v.value == *"hello"));
    }

    // Verifies that extracting from a message without payload yields a PayloadError.
    #[test]
    fn extract_payload_fails_for_no_payload() {
        let msg = UMessage {
            attributes: Some(UAttributes {
                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
                ..Default::default()
            })
            .into(),
            ..Default::default()
        };
        assert!(msg
            .extract_protobuf::<StringValue>()
            .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
    }

    #[test]
    fn test_from_attributes_error() {
        let attributes_error = UAttributesError::validation_error("failed to validate");
        let message_error = UMessageError::from(attributes_error);
        assert!(matches!(
            message_error,
            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
        ));
    }

    #[test]
    fn test_from_protobuf_error() {
        // use a fixed error kind so that the fixture does not depend on whatever OS
        // error happens to be pending in the test process
        let protobuf_error = protobuf::Error::from(io::Error::from(io::ErrorKind::UnexpectedEof));
        let message_error = UMessageError::from(protobuf_error);
        assert!(matches!(
            message_error,
            UMessageError::DataSerializationError(_)
        ));
    }

    #[test]
    fn test_from_error_msg() {
        let message_error = UMessageError::from("an error occurred");
        assert!(matches!(message_error, UMessageError::PayloadError(_)));
    }
}