up_rust/
umessage.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Enum, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID};
25
26#[derive(Debug)]
27pub enum UMessageError {
28    AttributesValidationError(UAttributesError),
29    DataSerializationError(protobuf::Error),
30    PayloadError(String),
31}
32
33impl std::fmt::Display for UMessageError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37                "Builder state is not consistent with message type: {e}"
38            )),
39            Self::DataSerializationError(e) => {
40                f.write_fmt(format_args!("Failed to serialize payload: {e}"))
41            }
42            Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {e}")),
43        }
44    }
45}
46
47impl std::error::Error for UMessageError {}
48
49impl From<UAttributesError> for UMessageError {
50    fn from(value: UAttributesError) -> Self {
51        Self::AttributesValidationError(value)
52    }
53}
54
55impl From<protobuf::Error> for UMessageError {
56    fn from(value: protobuf::Error) -> Self {
57        Self::DataSerializationError(value)
58    }
59}
60
61impl From<String> for UMessageError {
62    fn from(value: String) -> Self {
63        Self::PayloadError(value)
64    }
65}
66
67impl From<&str> for UMessageError {
68    fn from(value: &str) -> Self {
69        Self::from(value.to_string())
70    }
71}
72
73impl UMessage {
74    /// Gets this message's type.
75    pub fn type_(&self) -> Option<UMessageType> {
76        self.attributes
77            .as_ref()
78            .and_then(|attribs| attribs.type_.enum_value().ok())
79    }
80
81    /// Gets this message's type.
82    ///
83    /// # Panics
84    ///
85    /// if the property has no value.
86    pub fn type_unchecked(&self) -> UMessageType {
87        self.type_().expect("message has no type")
88    }
89
90    /// Gets this message's identifier.
91    pub fn id(&self) -> Option<&UUID> {
92        self.attributes
93            .as_ref()
94            .and_then(|attribs| attribs.id.as_ref())
95    }
96
97    /// Gets this message's identifier.
98    ///
99    /// # Panics
100    ///
101    /// if the property has no value.
102    pub fn id_unchecked(&self) -> &UUID {
103        self.id().expect("message has no ID")
104    }
105
106    /// Gets this message's source address.
107    pub fn source(&self) -> Option<&UUri> {
108        self.attributes
109            .as_ref()
110            .and_then(|attribs| attribs.source.as_ref())
111    }
112
113    /// Gets this message's source address.
114    ///
115    /// # Panics
116    ///
117    /// if the property has no value.
118    pub fn source_unchecked(&self) -> &UUri {
119        self.source().expect("message has no source")
120    }
121
122    /// Gets this message's sink address.
123    pub fn sink(&self) -> Option<&UUri> {
124        self.attributes
125            .as_ref()
126            .and_then(|attribs| attribs.sink.as_ref())
127    }
128
129    /// Gets this message's sink address.
130    ///
131    /// # Panics
132    ///
133    /// if the property has no value.
134    pub fn sink_unchecked(&self) -> &UUri {
135        self.sink().expect("message has no sink")
136    }
137
138    /// Gets this message's priority.
139    pub fn priority(&self) -> Option<UPriority> {
140        self.attributes
141            .as_ref()
142            .and_then(|attribs| attribs.priority.enum_value().ok())
143            .map(|prio| {
144                if prio == UPriority::UPRIORITY_UNSPECIFIED {
145                    crate::uattributes::UPRIORITY_DEFAULT
146                } else {
147                    prio
148                }
149            })
150    }
151
152    /// Gets this message's priority.
153    ///
154    /// # Panics
155    ///
156    /// if the property has no value.
157    pub fn priority_unchecked(&self) -> UPriority {
158        self.priority().expect("message has no priority")
159    }
160
161    /// Gets this message's commstatus.
162    pub fn commstatus(&self) -> Option<UCode> {
163        self.attributes
164            .as_ref()
165            .and_then(|attribs| attribs.commstatus)
166            .and_then(|v| v.enum_value().ok())
167    }
168
169    /// Gets this message's commstatus.
170    ///
171    /// # Panics
172    ///
173    /// if the property has no value.
174    pub fn commstatus_unchecked(&self) -> UCode {
175        self.commstatus().expect("message has no commstatus")
176    }
177
178    /// Gets this message's time-to-live.
179    ///
180    /// # Returns
181    ///
182    /// the time-to-live in milliseconds.
183    pub fn ttl(&self) -> Option<u32> {
184        self.attributes.as_ref().and_then(|attribs| attribs.ttl)
185    }
186
187    /// Gets this message's time-to-live.
188    ///
189    /// # Returns
190    ///
191    /// the time-to-live in milliseconds.
192    ///
193    /// # Panics
194    ///
195    /// if the property has no value.
196    pub fn ttl_unchecked(&self) -> u32 {
197        self.ttl().expect("message has no time-to-live")
198    }
199
200    /// Gets this message's permission level.
201    pub fn permission_level(&self) -> Option<u32> {
202        self.attributes
203            .as_ref()
204            .and_then(|attribs| attribs.permission_level)
205    }
206
207    /// Gets this message's token.
208    pub fn token(&self) -> Option<&String> {
209        self.attributes
210            .as_ref()
211            .and_then(|attribs| attribs.token.as_ref())
212    }
213
214    /// Gets this message's traceparent.
215    pub fn traceparent(&self) -> Option<&String> {
216        self.attributes
217            .as_ref()
218            .and_then(|attribs| attribs.traceparent.as_ref())
219    }
220
221    /// Gets this message's request identifier.
222    pub fn request_id(&self) -> Option<&UUID> {
223        self.attributes
224            .as_ref()
225            .and_then(|attribs| attribs.reqid.as_ref())
226    }
227
228    /// Gets this message's request identifier.
229    ///
230    /// # Panics
231    ///
232    /// if the property has no value.
233    pub fn request_id_unchecked(&self) -> &UUID {
234        self.request_id().expect("message has no request ID")
235    }
236
237    /// Gets this message's payload format.
238    pub fn payload_format(&self) -> Option<UPayloadFormat> {
239        self.attributes
240            .as_ref()
241            .and_then(|attribs| attribs.payload_format.enum_value().ok())
242    }
243
244    /// Gets this message's payload format.
245    ///
246    /// # Panics
247    ///
248    /// if the property has no value.
249    pub fn payload_format_unchecked(&self) -> UPayloadFormat {
250        self.payload_format()
251            .expect("message has no payload format")
252    }
253
254    /// Checks if this is a Publish message.
255    ///
256    /// # Examples
257    ///
258    /// ```rust
259    /// use up_rust::{UAttributes, UMessage, UMessageType};
260    ///
261    /// let attribs = UAttributes {
262    ///   type_: UMessageType::UMESSAGE_TYPE_PUBLISH.into(),
263    ///   ..Default::default()
264    /// };
265    /// let msg = UMessage {
266    ///   attributes: Some(attribs).into(),
267    ///   ..Default::default()
268    /// };
269    /// assert!(msg.is_publish());
270    /// ```
271    pub fn is_publish(&self) -> bool {
272        self.attributes
273            .as_ref()
274            .is_some_and(|attribs| attribs.is_publish())
275    }
276
277    /// Checks if this is an RPC Request message.
278    ///
279    /// # Examples
280    ///
281    /// ```rust
282    /// use up_rust::{UAttributes, UMessage, UMessageType};
283    ///
284    /// let attribs = UAttributes {
285    ///   type_: UMessageType::UMESSAGE_TYPE_REQUEST.into(),
286    ///   ..Default::default()
287    /// };
288    /// let msg = UMessage {
289    ///   attributes: Some(attribs).into(),
290    ///   ..Default::default()
291    /// };
292    /// assert!(msg.is_request());
293    /// ```
294    pub fn is_request(&self) -> bool {
295        self.attributes
296            .as_ref()
297            .is_some_and(|attribs| attribs.is_request())
298    }
299
300    /// Checks if this is an RPC Response message.
301    ///
302    /// # Examples
303    ///
304    /// ```rust
305    /// use up_rust::{UAttributes, UMessage, UMessageType};
306    ///
307    /// let attribs = UAttributes {
308    ///   type_: UMessageType::UMESSAGE_TYPE_RESPONSE.into(),
309    ///   ..Default::default()
310    /// };
311    /// let msg = UMessage {
312    ///   attributes: Some(attribs).into(),
313    ///   ..Default::default()
314    /// };
315    /// assert!(msg.is_response());
316    /// ```
317    pub fn is_response(&self) -> bool {
318        self.attributes
319            .as_ref()
320            .is_some_and(|attribs| attribs.is_response())
321    }
322
323    /// Checks if this is a Notification message.
324    ///
325    /// # Examples
326    ///
327    /// ```rust
328    /// use up_rust::{UAttributes, UMessage, UMessageType};
329    ///
330    /// let attribs = UAttributes {
331    ///   type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
332    ///   ..Default::default()
333    /// };
334    /// let msg = UMessage {
335    ///   attributes: Some(attribs).into(),
336    ///   ..Default::default()
337    /// };
338    /// assert!(msg.is_notification());
339    /// ```
340    pub fn is_notification(&self) -> bool {
341        self.attributes
342            .as_ref()
343            .is_some_and(|attribs| attribs.is_notification())
344    }
345
346    /// Deserializes this message's protobuf payload into a type.
347    ///
348    /// # Type Parameters
349    ///
350    /// * `T`: The target type of the data to be unpacked.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the message payload format is neither [UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF]
355    /// nor [UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY] or if the bytes in the
356    /// payload cannot be deserialized into the target type.
357    pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
358        if let Some(payload) = self.payload.as_ref() {
359            let payload_format = self.attributes.payload_format.enum_value_or_default();
360            deserialize_protobuf_bytes(payload, &payload_format)
361        } else {
362            Err(UMessageError::PayloadError(
363                "Message has no payload".to_string(),
364            ))
365        }
366    }
367}
368
369/// Deserializes a protobuf message from a byte array.
370///
371/// # Type Parameters
372///
373/// * `T`: The target type of the data to be unpacked.
374///
375/// # Arguments
376///
377/// * `payload` - The payload data.
378/// * `payload_format` - The format/encoding of the data. Must be one of
379///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`
380///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`
381///
382/// # Errors
383///
384/// Returns an error if the payload format is unsupported or if the data can not be deserialized
385/// into the target type based on the given format.
386pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
387    payload: &Bytes,
388    payload_format: &UPayloadFormat,
389) -> Result<T, UMessageError> {
390    match payload_format {
391        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
392            T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
393        }
394        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
395            Any::parse_from_tokio_bytes(payload)
396                .map_err(UMessageError::DataSerializationError)
397                .and_then(|any| match any.unpack() {
398                    Ok(Some(v)) => Ok(v),
399                    Ok(None) => Err(UMessageError::PayloadError(
400                        "cannot deserialize payload, message type mismatch".to_string(),
401                    )),
402                    Err(e) => Err(UMessageError::DataSerializationError(e)),
403                })
404        }
405        _ => {
406            let detail_msg = payload_format.to_media_type().map_or_else(
407                || format!("Unknown payload format: {}", payload_format.value()),
408                |mt| format!("Invalid/unsupported payload format: {mt}"),
409            );
410            Err(UMessageError::from(detail_msg))
411        }
412    }
413}
414
415#[cfg(test)]
416mod test {
417    use std::io;
418
419    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
420    use test_case::test_case;
421
422    use crate::{UAttributes, UStatus};
423
424    use super::*;
425
426    #[test]
427    fn test_deserialize_protobuf_bytes_succeeds() {
428        let mut data = StringValue::new();
429        data.value = "hello world".to_string();
430        let any = Any::pack(&data.clone()).unwrap();
431        let buf: Bytes = any.write_to_bytes().unwrap().into();
432
433        let result = deserialize_protobuf_bytes::<StringValue>(
434            &buf,
435            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
436        );
437        assert!(result.is_ok_and(|v| v.value == *"hello world"));
438
439        let result = deserialize_protobuf_bytes::<StringValue>(
440            &data.write_to_bytes().unwrap().into(),
441            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
442        );
443        assert!(result.is_ok_and(|v| v.value == *"hello world"));
444    }
445
446    #[test]
447    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
448        let mut data = StringValue::new();
449        data.value = "hello world".to_string();
450        let any = Any::pack(&data).unwrap();
451        let buf: Bytes = any.write_to_bytes().unwrap().into();
452        let result = deserialize_protobuf_bytes::<UStatus>(
453            &buf,
454            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
455        );
456        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
457    }
458
459    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
460    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
461    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
462    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
463    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
464    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
465    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; "UNSPECIFIED format")]
466    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
467        let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
468        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
469    }
470
471    #[test]
472    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
473        let any = Any {
474            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
475            value: vec![0x0A],
476            ..Default::default()
477        };
478        let buf = any.write_to_bytes().unwrap();
479        let result = deserialize_protobuf_bytes::<Duration>(
480            &buf.into(),
481            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
482        );
483        assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))))
484    }
485
486    #[test]
487    fn extract_payload_succeeds() {
488        let payload = StringValue {
489            value: "hello".to_string(),
490            ..Default::default()
491        };
492        let buf = Any::pack(&payload)
493            .and_then(|a| a.write_to_bytes())
494            .unwrap();
495        let msg = UMessage {
496            attributes: Some(UAttributes {
497                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
498                ..Default::default()
499            })
500            .into(),
501            payload: Some(buf.into()),
502            ..Default::default()
503        };
504        assert!(msg
505            .extract_protobuf::<StringValue>()
506            .is_ok_and(|v| v.value == *"hello"));
507    }
508
509    #[test]
510    fn extract_payload_fails_for_no_payload() {
511        let msg = UMessage {
512            attributes: Some(UAttributes {
513                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
514                ..Default::default()
515            })
516            .into(),
517            ..Default::default()
518        };
519        assert!(msg
520            .extract_protobuf::<StringValue>()
521            .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
522    }
523
524    #[test]
525    fn test_from_attributes_error() {
526        let attributes_error = UAttributesError::validation_error("failed to validate");
527        let message_error = UMessageError::from(attributes_error);
528        assert!(matches!(
529            message_error,
530            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
531        ));
532    }
533
534    #[test]
535    fn test_from_protobuf_error() {
536        let protobuf_error = protobuf::Error::from(io::Error::last_os_error());
537        let message_error = UMessageError::from(protobuf_error);
538        assert!(matches!(
539            message_error,
540            UMessageError::DataSerializationError(_)
541        ));
542    }
543
544    #[test]
545    fn test_from_error_msg() {
546        let message_error = UMessageError::from("an error occurred");
547        assert!(matches!(message_error, UMessageError::PayloadError(_)));
548    }
549}