1mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Enum, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID};
25
26#[derive(Debug)]
27pub enum UMessageError {
28 AttributesValidationError(UAttributesError),
29 DataSerializationError(protobuf::Error),
30 PayloadError(String),
31}
32
33impl std::fmt::Display for UMessageError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37 "Builder state is not consistent with message type: {e}"
38 )),
39 Self::DataSerializationError(e) => {
40 f.write_fmt(format_args!("Failed to serialize payload: {e}"))
41 }
42 Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {e}")),
43 }
44 }
45}
46
47impl std::error::Error for UMessageError {}
48
49impl From<UAttributesError> for UMessageError {
50 fn from(value: UAttributesError) -> Self {
51 Self::AttributesValidationError(value)
52 }
53}
54
55impl From<protobuf::Error> for UMessageError {
56 fn from(value: protobuf::Error) -> Self {
57 Self::DataSerializationError(value)
58 }
59}
60
61impl From<String> for UMessageError {
62 fn from(value: String) -> Self {
63 Self::PayloadError(value)
64 }
65}
66
67impl From<&str> for UMessageError {
68 fn from(value: &str) -> Self {
69 Self::from(value.to_string())
70 }
71}
72
73impl UMessage {
74 pub fn type_(&self) -> Option<UMessageType> {
76 self.attributes
77 .as_ref()
78 .and_then(|attribs| attribs.type_.enum_value().ok())
79 }
80
81 pub fn type_unchecked(&self) -> UMessageType {
87 self.type_().expect("message has no type")
88 }
89
90 pub fn id(&self) -> Option<&UUID> {
92 self.attributes
93 .as_ref()
94 .and_then(|attribs| attribs.id.as_ref())
95 }
96
97 pub fn id_unchecked(&self) -> &UUID {
103 self.id().expect("message has no ID")
104 }
105
106 pub fn source(&self) -> Option<&UUri> {
108 self.attributes
109 .as_ref()
110 .and_then(|attribs| attribs.source.as_ref())
111 }
112
113 pub fn source_unchecked(&self) -> &UUri {
119 self.source().expect("message has no source")
120 }
121
122 pub fn sink(&self) -> Option<&UUri> {
124 self.attributes
125 .as_ref()
126 .and_then(|attribs| attribs.sink.as_ref())
127 }
128
129 pub fn sink_unchecked(&self) -> &UUri {
135 self.sink().expect("message has no sink")
136 }
137
138 pub fn priority(&self) -> Option<UPriority> {
140 self.attributes
141 .as_ref()
142 .and_then(|attribs| attribs.priority.enum_value().ok())
143 .map(|prio| {
144 if prio == UPriority::UPRIORITY_UNSPECIFIED {
145 crate::uattributes::UPRIORITY_DEFAULT
146 } else {
147 prio
148 }
149 })
150 }
151
152 pub fn priority_unchecked(&self) -> UPriority {
158 self.priority().expect("message has no priority")
159 }
160
161 pub fn commstatus(&self) -> Option<UCode> {
163 self.attributes
164 .as_ref()
165 .and_then(|attribs| attribs.commstatus)
166 .and_then(|v| v.enum_value().ok())
167 }
168
169 pub fn commstatus_unchecked(&self) -> UCode {
175 self.commstatus().expect("message has no commstatus")
176 }
177
178 pub fn ttl(&self) -> Option<u32> {
184 self.attributes.as_ref().and_then(|attribs| attribs.ttl)
185 }
186
187 pub fn ttl_unchecked(&self) -> u32 {
197 self.ttl().expect("message has no time-to-live")
198 }
199
200 pub fn permission_level(&self) -> Option<u32> {
202 self.attributes
203 .as_ref()
204 .and_then(|attribs| attribs.permission_level)
205 }
206
207 pub fn token(&self) -> Option<&String> {
209 self.attributes
210 .as_ref()
211 .and_then(|attribs| attribs.token.as_ref())
212 }
213
214 pub fn traceparent(&self) -> Option<&String> {
216 self.attributes
217 .as_ref()
218 .and_then(|attribs| attribs.traceparent.as_ref())
219 }
220
221 pub fn request_id(&self) -> Option<&UUID> {
223 self.attributes
224 .as_ref()
225 .and_then(|attribs| attribs.reqid.as_ref())
226 }
227
228 pub fn request_id_unchecked(&self) -> &UUID {
234 self.request_id().expect("message has no request ID")
235 }
236
237 pub fn payload_format(&self) -> Option<UPayloadFormat> {
239 self.attributes
240 .as_ref()
241 .and_then(|attribs| attribs.payload_format.enum_value().ok())
242 }
243
244 pub fn payload_format_unchecked(&self) -> UPayloadFormat {
250 self.payload_format()
251 .expect("message has no payload format")
252 }
253
254 pub fn is_publish(&self) -> bool {
272 self.attributes
273 .as_ref()
274 .is_some_and(|attribs| attribs.is_publish())
275 }
276
277 pub fn is_request(&self) -> bool {
295 self.attributes
296 .as_ref()
297 .is_some_and(|attribs| attribs.is_request())
298 }
299
300 pub fn is_response(&self) -> bool {
318 self.attributes
319 .as_ref()
320 .is_some_and(|attribs| attribs.is_response())
321 }
322
323 pub fn is_notification(&self) -> bool {
341 self.attributes
342 .as_ref()
343 .is_some_and(|attribs| attribs.is_notification())
344 }
345
346 pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
358 if let Some(payload) = self.payload.as_ref() {
359 let payload_format = self.attributes.payload_format.enum_value_or_default();
360 deserialize_protobuf_bytes(payload, &payload_format)
361 } else {
362 Err(UMessageError::PayloadError(
363 "Message has no payload".to_string(),
364 ))
365 }
366 }
367}
368
369pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
387 payload: &Bytes,
388 payload_format: &UPayloadFormat,
389) -> Result<T, UMessageError> {
390 match payload_format {
391 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
392 T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
393 }
394 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
395 Any::parse_from_tokio_bytes(payload)
396 .map_err(UMessageError::DataSerializationError)
397 .and_then(|any| match any.unpack() {
398 Ok(Some(v)) => Ok(v),
399 Ok(None) => Err(UMessageError::PayloadError(
400 "cannot deserialize payload, message type mismatch".to_string(),
401 )),
402 Err(e) => Err(UMessageError::DataSerializationError(e)),
403 })
404 }
405 _ => {
406 let detail_msg = payload_format.to_media_type().map_or_else(
407 || format!("Unknown payload format: {}", payload_format.value()),
408 |mt| format!("Invalid/unsupported payload format: {mt}"),
409 );
410 Err(UMessageError::from(detail_msg))
411 }
412 }
413}
414
415#[cfg(test)]
416mod test {
417 use std::io;
418
419 use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
420 use test_case::test_case;
421
422 use crate::{UAttributes, UStatus};
423
424 use super::*;
425
426 #[test]
427 fn test_deserialize_protobuf_bytes_succeeds() {
428 let mut data = StringValue::new();
429 data.value = "hello world".to_string();
430 let any = Any::pack(&data.clone()).unwrap();
431 let buf: Bytes = any.write_to_bytes().unwrap().into();
432
433 let result = deserialize_protobuf_bytes::<StringValue>(
434 &buf,
435 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
436 );
437 assert!(result.is_ok_and(|v| v.value == *"hello world"));
438
439 let result = deserialize_protobuf_bytes::<StringValue>(
440 &data.write_to_bytes().unwrap().into(),
441 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
442 );
443 assert!(result.is_ok_and(|v| v.value == *"hello world"));
444 }
445
446 #[test]
447 fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
448 let mut data = StringValue::new();
449 data.value = "hello world".to_string();
450 let any = Any::pack(&data).unwrap();
451 let buf: Bytes = any.write_to_bytes().unwrap().into();
452 let result = deserialize_protobuf_bytes::<UStatus>(
453 &buf,
454 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
455 );
456 assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
457 }
458
459 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
460 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
461 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
462 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
463 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
464 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
465 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; "UNSPECIFIED format")]
466 fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
467 let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
468 assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
469 }
470
471 #[test]
472 fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
473 let any = Any {
474 type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
475 value: vec![0x0A],
476 ..Default::default()
477 };
478 let buf = any.write_to_bytes().unwrap();
479 let result = deserialize_protobuf_bytes::<Duration>(
480 &buf.into(),
481 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
482 );
483 assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))))
484 }
485
486 #[test]
487 fn extract_payload_succeeds() {
488 let payload = StringValue {
489 value: "hello".to_string(),
490 ..Default::default()
491 };
492 let buf = Any::pack(&payload)
493 .and_then(|a| a.write_to_bytes())
494 .unwrap();
495 let msg = UMessage {
496 attributes: Some(UAttributes {
497 payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
498 ..Default::default()
499 })
500 .into(),
501 payload: Some(buf.into()),
502 ..Default::default()
503 };
504 assert!(msg
505 .extract_protobuf::<StringValue>()
506 .is_ok_and(|v| v.value == *"hello"));
507 }
508
509 #[test]
510 fn extract_payload_fails_for_no_payload() {
511 let msg = UMessage {
512 attributes: Some(UAttributes {
513 payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
514 ..Default::default()
515 })
516 .into(),
517 ..Default::default()
518 };
519 assert!(msg
520 .extract_protobuf::<StringValue>()
521 .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
522 }
523
524 #[test]
525 fn test_from_attributes_error() {
526 let attributes_error = UAttributesError::validation_error("failed to validate");
527 let message_error = UMessageError::from(attributes_error);
528 assert!(matches!(
529 message_error,
530 UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
531 ));
532 }
533
534 #[test]
535 fn test_from_protobuf_error() {
536 let protobuf_error = protobuf::Error::from(io::Error::last_os_error());
537 let message_error = UMessageError::from(protobuf_error);
538 assert!(matches!(
539 message_error,
540 UMessageError::DataSerializationError(_)
541 ));
542 }
543
544 #[test]
545 fn test_from_error_msg() {
546 let message_error = UMessageError::from("an error occurred");
547 assert!(matches!(message_error, UMessageError::PayloadError(_)));
548 }
549}