up_rust/
umessage.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Enum, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{
25    UAttributes, UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID,
26};
27
28#[derive(Debug)]
29pub enum UMessageError {
30    AttributesValidationError(UAttributesError),
31    DataSerializationError(protobuf::Error),
32    PayloadError(String),
33}
34
35impl std::fmt::Display for UMessageError {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Self::AttributesValidationError(e) => f.write_fmt(format_args!(
39                "Builder state is not consistent with message type: {e}"
40            )),
41            Self::DataSerializationError(e) => {
42                f.write_fmt(format_args!("Failed to serialize payload: {e}"))
43            }
44            Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {e}")),
45        }
46    }
47}
48
49impl std::error::Error for UMessageError {}
50
51impl From<UAttributesError> for UMessageError {
52    fn from(value: UAttributesError) -> Self {
53        Self::AttributesValidationError(value)
54    }
55}
56
57impl From<protobuf::Error> for UMessageError {
58    fn from(value: protobuf::Error) -> Self {
59        Self::DataSerializationError(value)
60    }
61}
62
63impl From<String> for UMessageError {
64    fn from(value: String) -> Self {
65        Self::PayloadError(value)
66    }
67}
68
69impl From<&str> for UMessageError {
70    fn from(value: &str) -> Self {
71        Self::from(value.to_string())
72    }
73}
74
75impl UMessage {
76    /// Get this message's attributes.
77    pub fn attributes(&self) -> Option<&UAttributes> {
78        self.attributes.as_ref()
79    }
80    /// Gets this message's attributes.
81    pub fn attributes_unchecked(&self) -> &UAttributes {
82        self.attributes().expect("message has no attributes")
83    }
84
85    /// Gets this message's type.
86    pub fn type_(&self) -> Option<UMessageType> {
87        self.attributes().and_then(UAttributes::type_)
88    }
89
90    /// Gets this message's type.
91    ///
92    /// # Panics
93    ///
94    /// if the property has no value.
95    pub fn type_unchecked(&self) -> UMessageType {
96        self.attributes_unchecked().type_unchecked()
97    }
98
99    /// Gets this message's identifier.
100    pub fn id(&self) -> Option<&UUID> {
101        self.attributes().and_then(UAttributes::id)
102    }
103
104    /// Gets this message's identifier.
105    ///
106    /// # Panics
107    ///
108    /// if the property has no value.
109    pub fn id_unchecked(&self) -> &UUID {
110        self.attributes_unchecked().id_unchecked()
111    }
112
113    /// Gets this message's source address.
114    pub fn source(&self) -> Option<&UUri> {
115        self.attributes().and_then(UAttributes::source)
116    }
117
118    /// Gets this message's source address.
119    ///
120    /// # Panics
121    ///
122    /// if the property has no value.
123    pub fn source_unchecked(&self) -> &UUri {
124        self.attributes_unchecked().source_unchecked()
125    }
126
127    /// Gets this message's sink address.
128    pub fn sink(&self) -> Option<&UUri> {
129        self.attributes().and_then(UAttributes::sink)
130    }
131
132    /// Gets this message's sink address.
133    ///
134    /// # Panics
135    ///
136    /// if the property has no value.
137    pub fn sink_unchecked(&self) -> &UUri {
138        self.attributes_unchecked().sink_unchecked()
139    }
140
141    /// Gets this message's priority.
142    pub fn priority(&self) -> Option<UPriority> {
143        self.attributes().and_then(UAttributes::priority)
144    }
145
146    /// Gets this message's priority.
147    ///
148    /// # Panics
149    ///
150    /// if the property has no value.
151    pub fn priority_unchecked(&self) -> UPriority {
152        self.attributes_unchecked().priority_unchecked()
153    }
154
155    /// Gets this message's commstatus.
156    pub fn commstatus(&self) -> Option<UCode> {
157        self.attributes().and_then(UAttributes::commstatus)
158    }
159
160    /// Gets this message's commstatus.
161    ///
162    /// # Panics
163    ///
164    /// if the property has no value.
165    pub fn commstatus_unchecked(&self) -> UCode {
166        self.attributes_unchecked().commstatus_unchecked()
167    }
168
169    /// Gets this message's time-to-live.
170    ///
171    /// # Returns
172    ///
173    /// the time-to-live in milliseconds.
174    pub fn ttl(&self) -> Option<u32> {
175        self.attributes().and_then(UAttributes::ttl)
176    }
177
178    /// Gets this message's time-to-live.
179    ///
180    /// # Returns
181    ///
182    /// the time-to-live in milliseconds.
183    ///
184    /// # Panics
185    ///
186    /// if the property has no value.
187    pub fn ttl_unchecked(&self) -> u32 {
188        self.attributes_unchecked().ttl_unchecked()
189    }
190
191    /// Gets this message's permission level.
192    pub fn permission_level(&self) -> Option<u32> {
193        self.attributes().and_then(UAttributes::permission_level)
194    }
195
196    /// Gets this message's token.
197    pub fn token(&self) -> Option<&String> {
198        self.attributes().and_then(UAttributes::token)
199    }
200
201    /// Gets this message's traceparent.
202    pub fn traceparent(&self) -> Option<&String> {
203        self.attributes().and_then(UAttributes::traceparent)
204    }
205
206    /// Gets this message's request identifier.
207    pub fn request_id(&self) -> Option<&UUID> {
208        self.attributes().and_then(UAttributes::request_id)
209    }
210
211    /// Gets this message's request identifier.
212    ///
213    /// # Panics
214    ///
215    /// if the property has no value.
216    pub fn request_id_unchecked(&self) -> &UUID {
217        self.attributes_unchecked().request_id_unchecked()
218    }
219
220    /// Gets this message's payload format.
221    pub fn payload_format(&self) -> Option<UPayloadFormat> {
222        self.attributes().and_then(UAttributes::payload_format)
223    }
224
225    /// Gets this message's payload format.
226    ///
227    /// # Panics
228    ///
229    /// if the property has no value.
230    pub fn payload_format_unchecked(&self) -> UPayloadFormat {
231        self.attributes_unchecked().payload_format_unchecked()
232    }
233
234    /// Checks if this is a Publish message.
235    ///
236    /// # Examples
237    ///
238    /// ```rust
239    /// use up_rust::{UAttributes, UMessage, UMessageType};
240    ///
241    /// let attribs = UAttributes {
242    ///   type_: UMessageType::UMESSAGE_TYPE_PUBLISH.into(),
243    ///   ..Default::default()
244    /// };
245    /// let msg = UMessage {
246    ///   attributes: Some(attribs).into(),
247    ///   ..Default::default()
248    /// };
249    /// assert!(msg.is_publish());
250    /// ```
251    pub fn is_publish(&self) -> bool {
252        self.attributes().is_some_and(UAttributes::is_publish)
253    }
254
255    /// Checks if this is an RPC Request message.
256    ///
257    /// # Examples
258    ///
259    /// ```rust
260    /// use up_rust::{UAttributes, UMessage, UMessageType};
261    ///
262    /// let attribs = UAttributes {
263    ///   type_: UMessageType::UMESSAGE_TYPE_REQUEST.into(),
264    ///   ..Default::default()
265    /// };
266    /// let msg = UMessage {
267    ///   attributes: Some(attribs).into(),
268    ///   ..Default::default()
269    /// };
270    /// assert!(msg.is_request());
271    /// ```
272    pub fn is_request(&self) -> bool {
273        self.attributes().is_some_and(UAttributes::is_request)
274    }
275
276    /// Checks if this is an RPC Response message.
277    ///
278    /// # Examples
279    ///
280    /// ```rust
281    /// use up_rust::{UAttributes, UMessage, UMessageType};
282    ///
283    /// let attribs = UAttributes {
284    ///   type_: UMessageType::UMESSAGE_TYPE_RESPONSE.into(),
285    ///   ..Default::default()
286    /// };
287    /// let msg = UMessage {
288    ///   attributes: Some(attribs).into(),
289    ///   ..Default::default()
290    /// };
291    /// assert!(msg.is_response());
292    /// ```
293    pub fn is_response(&self) -> bool {
294        self.attributes().is_some_and(UAttributes::is_response)
295    }
296
297    /// Checks if this is a Notification message.
298    ///
299    /// # Examples
300    ///
301    /// ```rust
302    /// use up_rust::{UAttributes, UMessage, UMessageType};
303    ///
304    /// let attribs = UAttributes {
305    ///   type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
306    ///   ..Default::default()
307    /// };
308    /// let msg = UMessage {
309    ///   attributes: Some(attribs).into(),
310    ///   ..Default::default()
311    /// };
312    /// assert!(msg.is_notification());
313    /// ```
314    pub fn is_notification(&self) -> bool {
315        self.attributes().is_some_and(UAttributes::is_notification)
316    }
317
318    /// Deserializes this message's protobuf payload into a type.
319    ///
320    /// # Type Parameters
321    ///
322    /// * `T`: The target type of the data to be unpacked.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the message payload format is neither [UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF]
327    /// nor [UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY] or if the bytes in the
328    /// payload cannot be deserialized into the target type.
329    pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
330        if let Some(payload) = self.payload.as_ref() {
331            let payload_format = self.payload_format().unwrap_or_default();
332            deserialize_protobuf_bytes(payload, &payload_format)
333        } else {
334            Err(UMessageError::PayloadError(
335                "Message has no payload".to_string(),
336            ))
337        }
338    }
339}
340
341/// Deserializes a protobuf message from a byte array.
342///
343/// # Type Parameters
344///
345/// * `T`: The target type of the data to be unpacked.
346///
347/// # Arguments
348///
349/// * `payload` - The payload data.
350/// * `payload_format` - The format/encoding of the data. Must be one of
351///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`
352///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`
353///
354/// # Errors
355///
356/// Returns an error if the payload format is unsupported or if the data can not be deserialized
357/// into the target type based on the given format.
358pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
359    payload: &Bytes,
360    payload_format: &UPayloadFormat,
361) -> Result<T, UMessageError> {
362    match payload_format {
363        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
364            T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
365        }
366        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
367            Any::parse_from_tokio_bytes(payload)
368                .map_err(UMessageError::DataSerializationError)
369                .and_then(|any| match any.unpack() {
370                    Ok(Some(v)) => Ok(v),
371                    Ok(None) => Err(UMessageError::PayloadError(
372                        "cannot deserialize payload, message type mismatch".to_string(),
373                    )),
374                    Err(e) => Err(UMessageError::DataSerializationError(e)),
375                })
376        }
377        _ => {
378            let detail_msg = payload_format.to_media_type().map_or_else(
379                || format!("Unknown payload format: {}", payload_format.value()),
380                |mt| format!("Invalid/unsupported payload format: {mt}"),
381            );
382            Err(UMessageError::from(detail_msg))
383        }
384    }
385}
386
387#[cfg(test)]
388mod test {
389    use std::io;
390
391    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
392    use test_case::test_case;
393
394    use crate::{UAttributes, UStatus};
395
396    use super::*;
397
398    #[test]
399    fn test_deserialize_protobuf_bytes_succeeds() {
400        let mut data = StringValue::new();
401        data.value = "hello world".to_string();
402        let any = Any::pack(&data.clone()).unwrap();
403        let buf: Bytes = any.write_to_bytes().unwrap().into();
404
405        let result = deserialize_protobuf_bytes::<StringValue>(
406            &buf,
407            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
408        );
409        assert!(result.is_ok_and(|v| v.value == *"hello world"));
410
411        let result = deserialize_protobuf_bytes::<StringValue>(
412            &data.write_to_bytes().unwrap().into(),
413            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
414        );
415        assert!(result.is_ok_and(|v| v.value == *"hello world"));
416    }
417
418    #[test]
419    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
420        let mut data = StringValue::new();
421        data.value = "hello world".to_string();
422        let any = Any::pack(&data).unwrap();
423        let buf: Bytes = any.write_to_bytes().unwrap().into();
424        let result = deserialize_protobuf_bytes::<UStatus>(
425            &buf,
426            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
427        );
428        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
429    }
430
431    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
432    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
433    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
434    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
435    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
436    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
437    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; "UNSPECIFIED format")]
438    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
439        let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
440        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
441    }
442
443    #[test]
444    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
445        let any = Any {
446            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
447            value: vec![0x0A],
448            ..Default::default()
449        };
450        let buf = any.write_to_bytes().unwrap();
451        let result = deserialize_protobuf_bytes::<Duration>(
452            &buf.into(),
453            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
454        );
455        assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))))
456    }
457
458    #[test]
459    fn extract_payload_succeeds() {
460        let payload = StringValue {
461            value: "hello".to_string(),
462            ..Default::default()
463        };
464        let buf = Any::pack(&payload)
465            .and_then(|a| a.write_to_bytes())
466            .unwrap();
467        let msg = UMessage {
468            attributes: Some(UAttributes {
469                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
470                ..Default::default()
471            })
472            .into(),
473            payload: Some(buf.into()),
474            ..Default::default()
475        };
476        assert!(msg
477            .extract_protobuf::<StringValue>()
478            .is_ok_and(|v| v.value == *"hello"));
479    }
480
481    #[test]
482    fn extract_payload_fails_for_no_payload() {
483        let msg = UMessage {
484            attributes: Some(UAttributes {
485                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
486                ..Default::default()
487            })
488            .into(),
489            ..Default::default()
490        };
491        assert!(msg
492            .extract_protobuf::<StringValue>()
493            .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
494    }
495
496    #[test]
497    fn test_from_attributes_error() {
498        let attributes_error = UAttributesError::validation_error("failed to validate");
499        let message_error = UMessageError::from(attributes_error);
500        assert!(matches!(
501            message_error,
502            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
503        ));
504    }
505
506    #[test]
507    fn test_from_protobuf_error() {
508        let protobuf_error = protobuf::Error::from(io::Error::last_os_error());
509        let message_error = UMessageError::from(protobuf_error);
510        assert!(matches!(
511            message_error,
512            UMessageError::DataSerializationError(_)
513        ));
514    }
515
516    #[test]
517    fn test_from_error_msg() {
518        let message_error = UMessageError::from("an error occurred");
519        assert!(matches!(message_error, UMessageError::PayloadError(_)));
520    }
521}