1mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Enum, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{
25 UAttributes, UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID,
26};
27
28#[derive(Debug)]
29pub enum UMessageError {
30 AttributesValidationError(UAttributesError),
31 DataSerializationError(protobuf::Error),
32 PayloadError(String),
33}
34
35impl std::fmt::Display for UMessageError {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 Self::AttributesValidationError(e) => f.write_fmt(format_args!(
39 "Builder state is not consistent with message type: {e}"
40 )),
41 Self::DataSerializationError(e) => {
42 f.write_fmt(format_args!("Failed to serialize payload: {e}"))
43 }
44 Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {e}")),
45 }
46 }
47}
48
49impl std::error::Error for UMessageError {}
50
51impl From<UAttributesError> for UMessageError {
52 fn from(value: UAttributesError) -> Self {
53 Self::AttributesValidationError(value)
54 }
55}
56
57impl From<protobuf::Error> for UMessageError {
58 fn from(value: protobuf::Error) -> Self {
59 Self::DataSerializationError(value)
60 }
61}
62
63impl From<String> for UMessageError {
64 fn from(value: String) -> Self {
65 Self::PayloadError(value)
66 }
67}
68
69impl From<&str> for UMessageError {
70 fn from(value: &str) -> Self {
71 Self::from(value.to_string())
72 }
73}
74
75impl UMessage {
76 pub fn attributes(&self) -> Option<&UAttributes> {
78 self.attributes.as_ref()
79 }
80 pub fn attributes_unchecked(&self) -> &UAttributes {
82 self.attributes().expect("message has no attributes")
83 }
84
85 pub fn type_(&self) -> Option<UMessageType> {
87 self.attributes().and_then(UAttributes::type_)
88 }
89
90 pub fn type_unchecked(&self) -> UMessageType {
96 self.attributes_unchecked().type_unchecked()
97 }
98
99 pub fn id(&self) -> Option<&UUID> {
101 self.attributes().and_then(UAttributes::id)
102 }
103
104 pub fn id_unchecked(&self) -> &UUID {
110 self.attributes_unchecked().id_unchecked()
111 }
112
113 pub fn source(&self) -> Option<&UUri> {
115 self.attributes().and_then(UAttributes::source)
116 }
117
118 pub fn source_unchecked(&self) -> &UUri {
124 self.attributes_unchecked().source_unchecked()
125 }
126
127 pub fn sink(&self) -> Option<&UUri> {
129 self.attributes().and_then(UAttributes::sink)
130 }
131
132 pub fn sink_unchecked(&self) -> &UUri {
138 self.attributes_unchecked().sink_unchecked()
139 }
140
141 pub fn priority(&self) -> Option<UPriority> {
143 self.attributes().and_then(UAttributes::priority)
144 }
145
146 pub fn priority_unchecked(&self) -> UPriority {
152 self.attributes_unchecked().priority_unchecked()
153 }
154
155 pub fn commstatus(&self) -> Option<UCode> {
157 self.attributes().and_then(UAttributes::commstatus)
158 }
159
160 pub fn commstatus_unchecked(&self) -> UCode {
166 self.attributes_unchecked().commstatus_unchecked()
167 }
168
169 pub fn ttl(&self) -> Option<u32> {
175 self.attributes().and_then(UAttributes::ttl)
176 }
177
178 pub fn ttl_unchecked(&self) -> u32 {
188 self.attributes_unchecked().ttl_unchecked()
189 }
190
191 pub fn permission_level(&self) -> Option<u32> {
193 self.attributes().and_then(UAttributes::permission_level)
194 }
195
196 pub fn token(&self) -> Option<&String> {
198 self.attributes().and_then(UAttributes::token)
199 }
200
201 pub fn traceparent(&self) -> Option<&String> {
203 self.attributes().and_then(UAttributes::traceparent)
204 }
205
206 pub fn request_id(&self) -> Option<&UUID> {
208 self.attributes().and_then(UAttributes::request_id)
209 }
210
211 pub fn request_id_unchecked(&self) -> &UUID {
217 self.attributes_unchecked().request_id_unchecked()
218 }
219
220 pub fn payload_format(&self) -> Option<UPayloadFormat> {
222 self.attributes().and_then(UAttributes::payload_format)
223 }
224
225 pub fn payload_format_unchecked(&self) -> UPayloadFormat {
231 self.attributes_unchecked().payload_format_unchecked()
232 }
233
234 pub fn is_publish(&self) -> bool {
252 self.attributes().is_some_and(UAttributes::is_publish)
253 }
254
255 pub fn is_request(&self) -> bool {
273 self.attributes().is_some_and(UAttributes::is_request)
274 }
275
276 pub fn is_response(&self) -> bool {
294 self.attributes().is_some_and(UAttributes::is_response)
295 }
296
297 pub fn is_notification(&self) -> bool {
315 self.attributes().is_some_and(UAttributes::is_notification)
316 }
317
318 pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
330 if let Some(payload) = self.payload.as_ref() {
331 let payload_format = self.payload_format().unwrap_or_default();
332 deserialize_protobuf_bytes(payload, &payload_format)
333 } else {
334 Err(UMessageError::PayloadError(
335 "Message has no payload".to_string(),
336 ))
337 }
338 }
339}
340
341pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
359 payload: &Bytes,
360 payload_format: &UPayloadFormat,
361) -> Result<T, UMessageError> {
362 match payload_format {
363 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
364 T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
365 }
366 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
367 Any::parse_from_tokio_bytes(payload)
368 .map_err(UMessageError::DataSerializationError)
369 .and_then(|any| match any.unpack() {
370 Ok(Some(v)) => Ok(v),
371 Ok(None) => Err(UMessageError::PayloadError(
372 "cannot deserialize payload, message type mismatch".to_string(),
373 )),
374 Err(e) => Err(UMessageError::DataSerializationError(e)),
375 })
376 }
377 _ => {
378 let detail_msg = payload_format.to_media_type().map_or_else(
379 || format!("Unknown payload format: {}", payload_format.value()),
380 |mt| format!("Invalid/unsupported payload format: {mt}"),
381 );
382 Err(UMessageError::from(detail_msg))
383 }
384 }
385}
386
387#[cfg(test)]
388mod test {
389 use std::io;
390
391 use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
392 use test_case::test_case;
393
394 use crate::{UAttributes, UStatus};
395
396 use super::*;
397
398 #[test]
399 fn test_deserialize_protobuf_bytes_succeeds() {
400 let mut data = StringValue::new();
401 data.value = "hello world".to_string();
402 let any = Any::pack(&data.clone()).unwrap();
403 let buf: Bytes = any.write_to_bytes().unwrap().into();
404
405 let result = deserialize_protobuf_bytes::<StringValue>(
406 &buf,
407 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
408 );
409 assert!(result.is_ok_and(|v| v.value == *"hello world"));
410
411 let result = deserialize_protobuf_bytes::<StringValue>(
412 &data.write_to_bytes().unwrap().into(),
413 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
414 );
415 assert!(result.is_ok_and(|v| v.value == *"hello world"));
416 }
417
418 #[test]
419 fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
420 let mut data = StringValue::new();
421 data.value = "hello world".to_string();
422 let any = Any::pack(&data).unwrap();
423 let buf: Bytes = any.write_to_bytes().unwrap().into();
424 let result = deserialize_protobuf_bytes::<UStatus>(
425 &buf,
426 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
427 );
428 assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
429 }
430
431 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
432 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
433 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
434 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
435 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
436 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
437 #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; "UNSPECIFIED format")]
438 fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
439 let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
440 assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
441 }
442
443 #[test]
444 fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
445 let any = Any {
446 type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
447 value: vec![0x0A],
448 ..Default::default()
449 };
450 let buf = any.write_to_bytes().unwrap();
451 let result = deserialize_protobuf_bytes::<Duration>(
452 &buf.into(),
453 &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
454 );
455 assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))))
456 }
457
458 #[test]
459 fn extract_payload_succeeds() {
460 let payload = StringValue {
461 value: "hello".to_string(),
462 ..Default::default()
463 };
464 let buf = Any::pack(&payload)
465 .and_then(|a| a.write_to_bytes())
466 .unwrap();
467 let msg = UMessage {
468 attributes: Some(UAttributes {
469 payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
470 ..Default::default()
471 })
472 .into(),
473 payload: Some(buf.into()),
474 ..Default::default()
475 };
476 assert!(msg
477 .extract_protobuf::<StringValue>()
478 .is_ok_and(|v| v.value == *"hello"));
479 }
480
481 #[test]
482 fn extract_payload_fails_for_no_payload() {
483 let msg = UMessage {
484 attributes: Some(UAttributes {
485 payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
486 ..Default::default()
487 })
488 .into(),
489 ..Default::default()
490 };
491 assert!(msg
492 .extract_protobuf::<StringValue>()
493 .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
494 }
495
496 #[test]
497 fn test_from_attributes_error() {
498 let attributes_error = UAttributesError::validation_error("failed to validate");
499 let message_error = UMessageError::from(attributes_error);
500 assert!(matches!(
501 message_error,
502 UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
503 ));
504 }
505
506 #[test]
507 fn test_from_protobuf_error() {
508 let protobuf_error = protobuf::Error::from(io::Error::last_os_error());
509 let message_error = UMessageError::from(protobuf_error);
510 assert!(matches!(
511 message_error,
512 UMessageError::DataSerializationError(_)
513 ));
514 }
515
516 #[test]
517 fn test_from_error_msg() {
518 let message_error = UMessageError::from("an error occurred");
519 assert!(matches!(message_error, UMessageError::PayloadError(_)));
520 }
521}