up_rust/
umessage.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID};
25
26#[derive(Debug)]
27pub enum UMessageError {
28    AttributesValidationError(UAttributesError),
29    DataSerializationError(protobuf::Error),
30    PayloadError(String),
31}
32
33impl std::fmt::Display for UMessageError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37                "Builder state is not consistent with message type: {}",
38                e
39            )),
40            Self::DataSerializationError(e) => {
41                f.write_fmt(format_args!("Failed to serialize payload: {}", e))
42            }
43            Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
44        }
45    }
46}
47
48impl std::error::Error for UMessageError {}
49
50impl From<UAttributesError> for UMessageError {
51    fn from(value: UAttributesError) -> Self {
52        Self::AttributesValidationError(value)
53    }
54}
55
56impl From<protobuf::Error> for UMessageError {
57    fn from(value: protobuf::Error) -> Self {
58        Self::DataSerializationError(value)
59    }
60}
61
62impl From<String> for UMessageError {
63    fn from(value: String) -> Self {
64        Self::PayloadError(value)
65    }
66}
67
68impl From<&str> for UMessageError {
69    fn from(value: &str) -> Self {
70        Self::from(value.to_string())
71    }
72}
73
74impl UMessage {
75    /// Gets this message's type.
76    pub fn type_(&self) -> Option<UMessageType> {
77        self.attributes
78            .as_ref()
79            .and_then(|attribs| attribs.type_.enum_value().ok())
80    }
81
82    /// Gets this message's type.
83    ///
84    /// # Panics
85    ///
86    /// if the property has no value.
87    pub fn type_unchecked(&self) -> UMessageType {
88        self.type_().expect("message has no type")
89    }
90
91    /// Gets this message's identifier.
92    pub fn id(&self) -> Option<&UUID> {
93        self.attributes
94            .as_ref()
95            .and_then(|attribs| attribs.id.as_ref())
96    }
97
98    /// Gets this message's identifier.
99    ///
100    /// # Panics
101    ///
102    /// if the property has no value.
103    pub fn id_unchecked(&self) -> &UUID {
104        self.id().expect("message has no ID")
105    }
106
107    /// Gets this message's source address.
108    pub fn source(&self) -> Option<&UUri> {
109        self.attributes
110            .as_ref()
111            .and_then(|attribs| attribs.source.as_ref())
112    }
113
114    /// Gets this message's source address.
115    ///
116    /// # Panics
117    ///
118    /// if the property has no value.
119    pub fn source_unchecked(&self) -> &UUri {
120        self.source().expect("message has no source")
121    }
122
123    /// Gets this message's sink address.
124    pub fn sink(&self) -> Option<&UUri> {
125        self.attributes
126            .as_ref()
127            .and_then(|attribs| attribs.sink.as_ref())
128    }
129
130    /// Gets this message's sink address.
131    ///
132    /// # Panics
133    ///
134    /// if the property has no value.
135    pub fn sink_unchecked(&self) -> &UUri {
136        self.sink().expect("message has no sink")
137    }
138
139    /// Gets this message's priority.
140    pub fn priority(&self) -> Option<UPriority> {
141        self.attributes
142            .as_ref()
143            .and_then(|attribs| attribs.priority.enum_value().ok())
144    }
145
146    /// Gets this message's priority.
147    ///
148    /// # Panics
149    ///
150    /// if the property has no value.
151    pub fn priority_unchecked(&self) -> UPriority {
152        self.priority().expect("message has no priority")
153    }
154
155    /// Gets this message's commstatus.
156    pub fn commstatus(&self) -> Option<UCode> {
157        self.attributes
158            .as_ref()
159            .and_then(|attribs| attribs.commstatus)
160            .and_then(|v| v.enum_value().ok())
161    }
162
163    /// Gets this message's commstatus.
164    ///
165    /// # Panics
166    ///
167    /// if the property has no value.
168    pub fn commstatus_unchecked(&self) -> UCode {
169        self.commstatus().expect("message has no commstatus")
170    }
171
172    /// Gets this message's time-to-live.
173    ///
174    /// # Returns
175    ///
176    /// the time-to-live in milliseconds.
177    pub fn ttl(&self) -> Option<u32> {
178        self.attributes.as_ref().and_then(|attribs| attribs.ttl)
179    }
180
181    /// Gets this message's time-to-live.
182    ///
183    /// # Returns
184    ///
185    /// the time-to-live in milliseconds.
186    ///
187    /// # Panics
188    ///
189    /// if the property has no value.
190    pub fn ttl_unchecked(&self) -> u32 {
191        self.ttl().expect("message has no time-to-live")
192    }
193
194    /// Gets this message's permission level.
195    pub fn permission_level(&self) -> Option<u32> {
196        self.attributes
197            .as_ref()
198            .and_then(|attribs| attribs.permission_level)
199    }
200
201    /// Gets this message's token.
202    pub fn token(&self) -> Option<&String> {
203        self.attributes
204            .as_ref()
205            .and_then(|attribs| attribs.token.as_ref())
206    }
207
208    /// Gets this message's traceparent.
209    pub fn traceparent(&self) -> Option<&String> {
210        self.attributes
211            .as_ref()
212            .and_then(|attribs| attribs.traceparent.as_ref())
213    }
214
215    /// Gets this message's request identifier.
216    pub fn request_id(&self) -> Option<&UUID> {
217        self.attributes
218            .as_ref()
219            .and_then(|attribs| attribs.reqid.as_ref())
220    }
221
222    /// Gets this message's request identifier.
223    ///
224    /// # Panics
225    ///
226    /// if the property has no value.
227    pub fn request_id_unchecked(&self) -> &UUID {
228        self.request_id().expect("message has no request ID")
229    }
230
231    /// Gets this message's payload format.
232    pub fn payload_format(&self) -> Option<UPayloadFormat> {
233        self.attributes
234            .as_ref()
235            .and_then(|attribs| attribs.payload_format.enum_value().ok())
236    }
237
238    /// Gets this message's payload format.
239    ///
240    /// # Panics
241    ///
242    /// if the property has no value.
243    pub fn payload_format_unchecked(&self) -> UPayloadFormat {
244        self.payload_format()
245            .expect("message has no payload format")
246    }
247
248    /// Checks if this is a Publish message.
249    ///
250    /// # Examples
251    ///
252    /// ```rust
253    /// use up_rust::{UAttributes, UMessage, UMessageType};
254    ///
255    /// let attribs = UAttributes {
256    ///   type_: UMessageType::UMESSAGE_TYPE_PUBLISH.into(),
257    ///   ..Default::default()
258    /// };
259    /// let msg = UMessage {
260    ///   attributes: Some(attribs).into(),
261    ///   ..Default::default()
262    /// };
263    /// assert!(msg.is_publish());
264    /// ```
265    pub fn is_publish(&self) -> bool {
266        self.attributes
267            .as_ref()
268            .is_some_and(|attribs| attribs.is_publish())
269    }
270
271    /// Checks if this is an RPC Request message.
272    ///
273    /// # Examples
274    ///
275    /// ```rust
276    /// use up_rust::{UAttributes, UMessage, UMessageType};
277    ///
278    /// let attribs = UAttributes {
279    ///   type_: UMessageType::UMESSAGE_TYPE_REQUEST.into(),
280    ///   ..Default::default()
281    /// };
282    /// let msg = UMessage {
283    ///   attributes: Some(attribs).into(),
284    ///   ..Default::default()
285    /// };
286    /// assert!(msg.is_request());
287    /// ```
288    pub fn is_request(&self) -> bool {
289        self.attributes
290            .as_ref()
291            .is_some_and(|attribs| attribs.is_request())
292    }
293
294    /// Checks if this is an RPC Response message.
295    ///
296    /// # Examples
297    ///
298    /// ```rust
299    /// use up_rust::{UAttributes, UMessage, UMessageType};
300    ///
301    /// let attribs = UAttributes {
302    ///   type_: UMessageType::UMESSAGE_TYPE_RESPONSE.into(),
303    ///   ..Default::default()
304    /// };
305    /// let msg = UMessage {
306    ///   attributes: Some(attribs).into(),
307    ///   ..Default::default()
308    /// };
309    /// assert!(msg.is_response());
310    /// ```
311    pub fn is_response(&self) -> bool {
312        self.attributes
313            .as_ref()
314            .is_some_and(|attribs| attribs.is_response())
315    }
316
317    /// Checks if this is a Notification message.
318    ///
319    /// # Examples
320    ///
321    /// ```rust
322    /// use up_rust::{UAttributes, UMessage, UMessageType};
323    ///
324    /// let attribs = UAttributes {
325    ///   type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
326    ///   ..Default::default()
327    /// };
328    /// let msg = UMessage {
329    ///   attributes: Some(attribs).into(),
330    ///   ..Default::default()
331    /// };
332    /// assert!(msg.is_notification());
333    /// ```
334    pub fn is_notification(&self) -> bool {
335        self.attributes
336            .as_ref()
337            .is_some_and(|attribs| attribs.is_notification())
338    }
339
340    /// If `UMessage` payload is available, deserialize it as a protobuf `Message`.
341    ///
342    /// This function is used to extract strongly-typed data from a `UMessage` object,
343    /// taking into account the payload format (will only succeed if payload format is
344    /// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF` or `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`)
345    ///
346    /// # Type Parameters
347    ///
348    /// * `T`: The target type of the data to be unpacked.
349    ///
350    /// # Returns
351    ///
352    /// * `Ok(T)`: The deserialized protobuf message contained in the payload.
353    ///
354    /// # Errors
355    ///
356    /// * Err(`UMessageError`) if the unpacking process fails, for example if the payload could
357    ///   not be deserialized into the target type `T`.
358    pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
359        if let Some(payload) = &self.payload {
360            let payload_format = self.attributes.payload_format.enum_value_or_default();
361            deserialize_protobuf_bytes(payload, &payload_format)
362        } else {
363            Err(UMessageError::PayloadError(
364                "No embedded payload".to_string(),
365            ))
366        }
367    }
368}
369
370/// Deserializes a protobuf message from a byte array.
371///
372/// # Arguments
373///
374/// * `payload` - The payload data.
375/// * `payload_format` - The format/encoding of the data. Must be one of
376///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`
377///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`
378///    - `UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED`
379///
380/// `UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED` is interpreted as
381/// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY` according to the uProtocol
382/// specification.
383///
384/// # Errors
385///
386/// Returns an error if the payload format is unsupported or if the data is can not be deserialized
387/// based on the given format.
388pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
389    payload: &Bytes,
390    payload_format: &UPayloadFormat,
391) -> Result<T, UMessageError> {
392    match payload_format {
393        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
394            T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
395        }
396        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY
397        | UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED => Any::parse_from_tokio_bytes(payload)
398            .map_err(UMessageError::DataSerializationError)
399            .and_then(|any| match any.unpack() {
400                Ok(Some(v)) => Ok(v),
401                Ok(None) => Err(UMessageError::PayloadError(
402                    "cannot deserialize payload, message type mismatch".to_string(),
403                )),
404                Err(e) => Err(UMessageError::DataSerializationError(e)),
405            }),
406        _ => Err(UMessageError::from(format!(
407            "Unknown/invalid/unsupported payload format: {}",
408            payload_format
409                .to_media_type()
410                .unwrap_or("unknown".to_string())
411        ))),
412    }
413}
414
415#[cfg(test)]
416mod test {
417    use std::io;
418
419    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
420    use test_case::test_case;
421
422    use crate::{UAttributes, UStatus};
423
424    use super::*;
425
426    #[test]
427    fn test_deserialize_protobuf_bytes_succeeds() {
428        let mut data = StringValue::new();
429        data.value = "hello world".to_string();
430        let any = Any::pack(&data.clone()).unwrap();
431        let buf: Bytes = any.write_to_bytes().unwrap().into();
432
433        let result = deserialize_protobuf_bytes::<StringValue>(
434            &buf,
435            &UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED,
436        );
437        assert!(result.is_ok_and(|v| v.value == *"hello world"));
438
439        let result = deserialize_protobuf_bytes::<StringValue>(
440            &buf,
441            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
442        );
443        assert!(result.is_ok_and(|v| v.value == *"hello world"));
444
445        let result = deserialize_protobuf_bytes::<StringValue>(
446            &data.write_to_bytes().unwrap().into(),
447            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
448        );
449        assert!(result.is_ok_and(|v| v.value == *"hello world"));
450    }
451
452    #[test]
453    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
454        let mut data = StringValue::new();
455        data.value = "hello world".to_string();
456        let any = Any::pack(&data).unwrap();
457        let buf: Bytes = any.write_to_bytes().unwrap().into();
458        let result = deserialize_protobuf_bytes::<UStatus>(
459            &buf,
460            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
461        );
462        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
463    }
464
465    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
466    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
467    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
468    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
469    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
470    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
471    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
472        let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
473        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
474    }
475
476    #[test]
477    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
478        let any = Any {
479            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
480            value: vec![0x0A],
481            ..Default::default()
482        };
483        let buf = any.write_to_bytes().unwrap();
484        let result = deserialize_protobuf_bytes::<Duration>(
485            &buf.into(),
486            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
487        );
488        assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))))
489    }
490
491    #[test]
492    fn extract_payload_succeeds() {
493        let payload = StringValue {
494            value: "hello".to_string(),
495            ..Default::default()
496        };
497        let buf = Any::pack(&payload)
498            .and_then(|a| a.write_to_bytes())
499            .unwrap();
500        let msg = UMessage {
501            attributes: Some(UAttributes {
502                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
503                ..Default::default()
504            })
505            .into(),
506            payload: Some(buf.into()),
507            ..Default::default()
508        };
509        assert!(msg
510            .extract_protobuf::<StringValue>()
511            .is_ok_and(|v| v.value == *"hello"));
512    }
513
514    #[test]
515    fn extract_payload_fails_for_no_payload() {
516        let msg = UMessage {
517            attributes: Some(UAttributes {
518                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
519                ..Default::default()
520            })
521            .into(),
522            ..Default::default()
523        };
524        assert!(msg
525            .extract_protobuf::<StringValue>()
526            .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
527    }
528
529    #[test]
530    fn test_from_attributes_error() {
531        let attributes_error = UAttributesError::validation_error("failed to validate");
532        let message_error = UMessageError::from(attributes_error);
533        assert!(matches!(
534            message_error,
535            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
536        ));
537    }
538
539    #[test]
540    fn test_from_protobuf_error() {
541        let protobuf_error = protobuf::Error::from(io::Error::last_os_error());
542        let message_error = UMessageError::from(protobuf_error);
543        assert!(matches!(
544            message_error,
545            UMessageError::DataSerializationError(_)
546        ));
547    }
548
549    #[test]
550    fn test_from_error_msg() {
551        let message_error = UMessageError::from("an error occurred");
552        assert!(matches!(message_error, UMessageError::PayloadError(_)));
553    }
554}