1mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Enum, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID};
25
26#[derive(Debug)]
27pub enum UMessageError {
28 AttributesValidationError(UAttributesError),
29 DataSerializationError(protobuf::Error),
30 PayloadError(String),
31}
32
33impl std::fmt::Display for UMessageError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37 "Builder state is not consistent with message type: {}",
38 e
39 )),
40 Self::DataSerializationError(e) => {
41 f.write_fmt(format_args!("Failed to serialize payload: {}", e))
42 }
43 Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
44 }
45 }
46}
47
48impl std::error::Error for UMessageError {}
49
50impl From<UAttributesError> for UMessageError {
51 fn from(value: UAttributesError) -> Self {
52 Self::AttributesValidationError(value)
53 }
54}
55
56impl From<protobuf::Error> for UMessageError {
57 fn from(value: protobuf::Error) -> Self {
58 Self::DataSerializationError(value)
59 }
60}
61
62impl From<String> for UMessageError {
63 fn from(value: String) -> Self {
64 Self::PayloadError(value)
65 }
66}
67
68impl From<&str> for UMessageError {
69 fn from(value: &str) -> Self {
70 Self::from(value.to_string())
71 }
72}
73
74impl UMessage {
75 pub fn type_(&self) -> Option<UMessageType> {
77 self.attributes
78 .as_ref()
79 .and_then(|attribs| attribs.type_.enum_value().ok())
80 }
81
82 pub fn type_unchecked(&self) -> UMessageType {
88 self.type_().expect("message has no type")
89 }
90
91 pub fn id(&self) -> Option<&UUID> {
93 self.attributes
94 .as_ref()
95 .and_then(|attribs| attribs.id.as_ref())
96 }
97
98 pub fn id_unchecked(&self) -> &UUID {
104 self.id().expect("message has no ID")
105 }
106
107 pub fn source(&self) -> Option<&UUri> {
109 self.attributes
110 .as_ref()
111 .and_then(|attribs| attribs.source.as_ref())
112 }
113
114 pub fn source_unchecked(&self) -> &UUri {
120 self.source().expect("message has no source")
121 }
122
123 pub fn sink(&self) -> Option<&UUri> {
125 self.attributes
126 .as_ref()
127 .and_then(|attribs| attribs.sink.as_ref())
128 }
129
130 pub fn sink_unchecked(&self) -> &UUri {
136 self.sink().expect("message has no sink")
137 }
138
139 pub fn priority(&self) -> Option<UPriority> {
141 self.attributes
142 .as_ref()
143 .and_then(|attribs| attribs.priority.enum_value().ok())
144 .map(|prio| {
145 if prio == UPriority::UPRIORITY_UNSPECIFIED {
146 crate::uattributes::UPRIORITY_DEFAULT
147 } else {
148 prio
149 }
150 })
151 }
152
153 pub fn priority_unchecked(&self) -> UPriority {
159 self.priority().expect("message has no priority")
160 }
161
162 pub fn commstatus(&self) -> Option<UCode> {
164 self.attributes
165 .as_ref()
166 .and_then(|attribs| attribs.commstatus)
167 .and_then(|v| v.enum_value().ok())
168 }
169
170 pub fn commstatus_unchecked(&self) -> UCode {
176 self.commstatus().expect("message has no commstatus")
177 }
178
179 pub fn ttl(&self) -> Option<u32> {
185 self.attributes.as_ref().and_then(|attribs| attribs.ttl)
186 }
187
188 pub fn ttl_unchecked(&self) -> u32 {
198 self.ttl().expect("message has no time-to-live")
199 }
200
201 pub fn permission_level(&self) -> Option<u32> {
203 self.attributes
204 .as_ref()
205 .and_then(|attribs| attribs.permission_level)
206 }
207
208 pub fn token(&self) -> Option<&String> {
210 self.attributes
211 .as_ref()
212 .and_then(|attribs| attribs.token.as_ref())
213 }
214
215 pub fn traceparent(&self) -> Option<&String> {
217 self.attributes
218 .as_ref()
219 .and_then(|attribs| attribs.traceparent.as_ref())
220 }
221
222 pub fn request_id(&self) -> Option<&UUID> {
224 self.attributes
225 .as_ref()
226 .and_then(|attribs| attribs.reqid.as_ref())
227 }
228
229 pub fn request_id_unchecked(&self) -> &UUID {
235 self.request_id().expect("message has no request ID")
236 }
237
238 pub fn payload_format(&self) -> Option<UPayloadFormat> {
240 self.attributes
241 .as_ref()
242 .and_then(|attribs| attribs.payload_format.enum_value().ok())
243 }
244
245 pub fn payload_format_unchecked(&self) -> UPayloadFormat {
251 self.payload_format()
252 .expect("message has no payload format")
253 }
254
255 pub fn is_publish(&self) -> bool {
273 self.attributes
274 .as_ref()
275 .is_some_and(|attribs| attribs.is_publish())
276 }
277
278 pub fn is_request(&self) -> bool {
296 self.attributes
297 .as_ref()
298 .is_some_and(|attribs| attribs.is_request())
299 }
300
301 pub fn is_response(&self) -> bool {
319 self.attributes
320 .as_ref()
321 .is_some_and(|attribs| attribs.is_response())
322 }
323
324 pub fn is_notification(&self) -> bool {
342 self.attributes
343 .as_ref()
344 .is_some_and(|attribs| attribs.is_notification())
345 }
346
347 pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
359 if let Some(payload) = self.payload.as_ref() {
360 let payload_format = self.attributes.payload_format.enum_value_or_default();
361 deserialize_protobuf_bytes(payload, &payload_format)
362 } else {
363 Err(UMessageError::PayloadError(
364 "Message has no payload".to_string(),
365 ))
366 }
367 }
368}
369
370pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
388 payload: &Bytes,
389 payload_format: &UPayloadFormat,
390) -> Result<T, UMessageError> {
391 match payload_format {
392 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
393 T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
394 }
395 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
396 Any::parse_from_tokio_bytes(payload)
397 .map_err(UMessageError::DataSerializationError)
398 .and_then(|any| match any.unpack() {
399 Ok(Some(v)) => Ok(v),
400 Ok(None) => Err(UMessageError::PayloadError(
401 "cannot deserialize payload, message type mismatch".to_string(),
402 )),
403 Err(e) => Err(UMessageError::DataSerializationError(e)),
404 })
405 }
406 _ => {
407 let detail_msg = payload_format.to_media_type().map_or_else(
408 || format!("Unknown payload format: {}", payload_format.value()),
409 |mt| format!("Invalid/unsupported payload format: {mt}"),
410 );
411 Err(UMessageError::from(detail_msg))
412 }
413 }
414}
415
#[cfg(test)]
mod test {
    use std::io;

    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
    use test_case::test_case;

    use crate::{UAttributes, UStatus};

    use super::*;

    /// Creates a message whose attributes declare the *protobuf wrapped in Any*
    /// payload format and which carries the given (optional) payload.
    fn any_wrapped_message(payload: Option<Bytes>) -> UMessage {
        UMessage {
            attributes: Some(UAttributes {
                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
                ..Default::default()
            })
            .into(),
            payload,
            ..Default::default()
        }
    }

    // both supported payload formats can be deserialized
    #[test]
    fn test_deserialize_protobuf_bytes_succeeds() {
        let data = StringValue {
            value: "hello world".to_string(),
            ..Default::default()
        };

        let wrapped = Bytes::from(Any::pack(&data).unwrap().write_to_bytes().unwrap());
        let from_any = deserialize_protobuf_bytes::<StringValue>(
            &wrapped,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(matches!(from_any, Ok(v) if v.value == "hello world"));

        let plain = Bytes::from(data.write_to_bytes().unwrap());
        let from_plain = deserialize_protobuf_bytes::<StringValue>(
            &plain,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
        );
        assert!(matches!(from_plain, Ok(v) if v.value == "hello world"));
    }

    // an Any wrapping a StringValue cannot be unpacked into a UStatus
    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
        let data = StringValue {
            value: "hello world".to_string(),
            ..Default::default()
        };
        let wrapped = Bytes::from(Any::pack(&data).unwrap().write_to_bytes().unwrap());

        let outcome = deserialize_protobuf_bytes::<UStatus>(
            &wrapped,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(matches!(outcome, Err(UMessageError::PayloadError(_))));
    }

    // all non-protobuf payload formats are rejected
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; "UNSPECIFIED format")]
    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
        let payload = Bytes::from("hello");
        let outcome = deserialize_protobuf_bytes::<UStatus>(&payload, &format);
        assert!(matches!(outcome, Err(UMessageError::PayloadError(_))));
    }

    // the Any's value is not a valid encoding of the type named by its type URL
    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
        let any = Any {
            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
            value: vec![0x0A],
            ..Default::default()
        };
        let payload = Bytes::from(any.write_to_bytes().unwrap());

        let outcome = deserialize_protobuf_bytes::<Duration>(
            &payload,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(matches!(
            outcome,
            Err(UMessageError::DataSerializationError(_))
        ));
    }

    #[test]
    fn extract_payload_succeeds() {
        let payload = StringValue {
            value: "hello".to_string(),
            ..Default::default()
        };
        let wrapped = Any::pack(&payload).unwrap();
        let buf = wrapped.write_to_bytes().unwrap();
        let msg = any_wrapped_message(Some(buf.into()));

        let extracted = msg.extract_protobuf::<StringValue>();
        assert!(matches!(extracted, Ok(v) if v.value == "hello"));
    }

    #[test]
    fn extract_payload_fails_for_no_payload() {
        let msg = any_wrapped_message(None);

        let extracted = msg.extract_protobuf::<StringValue>();
        assert!(matches!(extracted, Err(UMessageError::PayloadError(_))));
    }

    #[test]
    fn test_from_attributes_error() {
        let message_error: UMessageError =
            UAttributesError::validation_error("failed to validate").into();
        assert!(matches!(
            message_error,
            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
        ));
    }

    #[test]
    fn test_from_protobuf_error() {
        let message_error: UMessageError =
            protobuf::Error::from(io::Error::last_os_error()).into();
        assert!(matches!(
            message_error,
            UMessageError::DataSerializationError(_)
        ));
    }

    #[test]
    fn test_from_error_msg() {
        let message_error: UMessageError = "an error occurred".into();
        assert!(matches!(message_error, UMessageError::PayloadError(_)));
    }
}