up_rust/
umessage.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UPayloadFormat};
25
/// Errors reported when working with a [`UMessage`] and its payload.
#[derive(Debug)]
pub enum UMessageError {
    /// Wraps a [`UAttributesError`]; displayed as a builder state that is not
    /// consistent with the message type.
    AttributesValidationError(UAttributesError),
    /// Wraps a [`protobuf::Error`] raised while (de-)serializing payload data.
    DataSerializationError(protobuf::Error),
    /// A payload related problem, described by a free-form message.
    PayloadError(String),
}
32
33impl std::fmt::Display for UMessageError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37                "Builder state is not consistent with message type: {}",
38                e
39            )),
40            Self::DataSerializationError(e) => {
41                f.write_fmt(format_args!("Failed to serialize payload: {}", e))
42            }
43            Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
44        }
45    }
46}
47
/// Marks [`UMessageError`] as a standard error type; all trait methods keep
/// their default implementations.
impl std::error::Error for UMessageError {}
49
50impl From<UAttributesError> for UMessageError {
51    fn from(value: UAttributesError) -> Self {
52        Self::AttributesValidationError(value)
53    }
54}
55
56impl From<protobuf::Error> for UMessageError {
57    fn from(value: protobuf::Error) -> Self {
58        Self::DataSerializationError(value)
59    }
60}
61
62impl From<String> for UMessageError {
63    fn from(value: String) -> Self {
64        Self::PayloadError(value)
65    }
66}
67
68impl From<&str> for UMessageError {
69    fn from(value: &str) -> Self {
70        Self::from(value.to_string())
71    }
72}
73
74impl UMessage {
75    /// Checks if this is a Publish message.
76    ///
77    /// # Examples
78    ///
79    /// ```rust
80    /// use up_rust::{UAttributes, UMessage, UMessageType};
81    ///
82    /// let attribs = UAttributes {
83    ///   type_: UMessageType::UMESSAGE_TYPE_PUBLISH.into(),
84    ///   ..Default::default()
85    /// };
86    /// let msg = UMessage {
87    ///   attributes: Some(attribs).into(),
88    ///   ..Default::default()
89    /// };
90    /// assert!(msg.is_publish());
91    /// ```
92    pub fn is_publish(&self) -> bool {
93        self.attributes
94            .as_ref()
95            .is_some_and(|attribs| attribs.is_publish())
96    }
97
98    /// Checks if this is an RPC Request message.
99    ///
100    /// # Examples
101    ///
102    /// ```rust
103    /// use up_rust::{UAttributes, UMessage, UMessageType};
104    ///
105    /// let attribs = UAttributes {
106    ///   type_: UMessageType::UMESSAGE_TYPE_REQUEST.into(),
107    ///   ..Default::default()
108    /// };
109    /// let msg = UMessage {
110    ///   attributes: Some(attribs).into(),
111    ///   ..Default::default()
112    /// };
113    /// assert!(msg.is_request());
114    /// ```
115    pub fn is_request(&self) -> bool {
116        self.attributes
117            .as_ref()
118            .is_some_and(|attribs| attribs.is_request())
119    }
120
121    /// Checks if this is an RPC Response message.
122    ///
123    /// # Examples
124    ///
125    /// ```rust
126    /// use up_rust::{UAttributes, UMessage, UMessageType};
127    ///
128    /// let attribs = UAttributes {
129    ///   type_: UMessageType::UMESSAGE_TYPE_RESPONSE.into(),
130    ///   ..Default::default()
131    /// };
132    /// let msg = UMessage {
133    ///   attributes: Some(attribs).into(),
134    ///   ..Default::default()
135    /// };
136    /// assert!(msg.is_response());
137    /// ```
138    pub fn is_response(&self) -> bool {
139        self.attributes
140            .as_ref()
141            .is_some_and(|attribs| attribs.is_response())
142    }
143
144    /// Checks if this is a Notification message.
145    ///
146    /// # Examples
147    ///
148    /// ```rust
149    /// use up_rust::{UAttributes, UMessage, UMessageType};
150    ///
151    /// let attribs = UAttributes {
152    ///   type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
153    ///   ..Default::default()
154    /// };
155    /// let msg = UMessage {
156    ///   attributes: Some(attribs).into(),
157    ///   ..Default::default()
158    /// };
159    /// assert!(msg.is_notification());
160    /// ```
161    pub fn is_notification(&self) -> bool {
162        self.attributes
163            .as_ref()
164            .is_some_and(|attribs| attribs.is_notification())
165    }
166
167    /// If `UMessage` payload is available, deserialize it as a protobuf `Message`.
168    ///
169    /// This function is used to extract strongly-typed data from a `UMessage` object,
170    /// taking into account the payload format (will only succeed if payload format is
171    /// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF` or `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`)
172    ///
173    /// # Type Parameters
174    ///
175    /// * `T`: The target type of the data to be unpacked.
176    ///
177    /// # Returns
178    ///
179    /// * `Ok(T)`: The deserialized protobuf message contained in the payload.
180    ///
181    /// # Errors
182    ///
183    /// * Err(`UMessageError`) if the unpacking process fails, for example if the payload could
184    ///   not be deserialized into the target type `T`.
185    pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
186        if let Some(payload) = &self.payload {
187            let payload_format = self.attributes.payload_format.enum_value_or_default();
188            deserialize_protobuf_bytes(payload, &payload_format)
189        } else {
190            Err(UMessageError::PayloadError(
191                "No embedded payload".to_string(),
192            ))
193        }
194    }
195}
196
197/// Deserializes a protobuf message from a byte array.
198///
199/// # Arguments
200///
201/// * `payload` - The payload data.
202/// * `payload_format` - The format/encoding of the data. Must be one of
203///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`
204///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`
205///    - `UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED`
206///
207/// `UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED` is interpreted as
208/// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY` according to the uProtocol
209/// specification.
210///
211/// # Errors
212///
213/// Returns an error if the payload format is unsupported or if the data is can not be deserialized
214/// based on the given format.
215pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
216    payload: &Bytes,
217    payload_format: &UPayloadFormat,
218) -> Result<T, UMessageError> {
219    match payload_format {
220        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
221            T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
222        }
223        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY
224        | UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED => Any::parse_from_tokio_bytes(payload)
225            .map_err(UMessageError::DataSerializationError)
226            .and_then(|any| match any.unpack() {
227                Ok(Some(v)) => Ok(v),
228                Ok(None) => Err(UMessageError::PayloadError(
229                    "cannot deserialize payload, message type mismatch".to_string(),
230                )),
231                Err(e) => Err(UMessageError::DataSerializationError(e)),
232            }),
233        _ => Err(UMessageError::from(format!(
234            "Unknown/invalid/unsupported payload format: {}",
235            payload_format
236                .to_media_type()
237                .unwrap_or("unknown".to_string())
238        ))),
239    }
240}
241
#[cfg(test)]
mod test {
    use std::io;

    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
    use test_case::test_case;

    use crate::{UAttributes, UStatus};

    use super::*;

    /// Creates a `StringValue` message wrapping the given text.
    fn string_value(text: &str) -> StringValue {
        StringValue {
            value: text.to_string(),
            ..Default::default()
        }
    }

    #[test]
    fn test_deserialize_protobuf_bytes_succeeds() {
        let message = string_value("hello world");
        let wrapped: Bytes = Any::pack(&message)
            .unwrap()
            .write_to_bytes()
            .unwrap()
            .into();

        // an unspecified format needs to be handled like a message wrapped in Any
        for format in [
            UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED,
            UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        ] {
            let outcome = deserialize_protobuf_bytes::<StringValue>(&wrapped, &format);
            assert!(outcome.is_ok_and(|v| v.value == "hello world"));
        }

        let plain: Bytes = message.write_to_bytes().unwrap().into();
        let outcome = deserialize_protobuf_bytes::<StringValue>(
            &plain,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
        );
        assert!(outcome.is_ok_and(|v| v.value == "hello world"));
    }

    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
        let wrapped: Bytes = Any::pack(&string_value("hello world"))
            .unwrap()
            .write_to_bytes()
            .unwrap()
            .into();
        let outcome = deserialize_protobuf_bytes::<UStatus>(
            &wrapped,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(matches!(outcome, Err(UMessageError::PayloadError(_))));
    }

    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
        let payload = Bytes::from("hello");
        let outcome = deserialize_protobuf_bytes::<UStatus>(&payload, &format);
        assert!(matches!(outcome, Err(UMessageError::PayloadError(_))));
    }

    #[test]
    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
        // the value is not a valid encoding of the type given in the type URL
        let malformed = Any {
            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
            value: vec![0x0A],
            ..Default::default()
        };
        let encoded: Bytes = malformed.write_to_bytes().unwrap().into();
        let outcome = deserialize_protobuf_bytes::<Duration>(
            &encoded,
            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(matches!(
            outcome,
            Err(UMessageError::DataSerializationError(_))
        ));
    }

    #[test]
    fn extract_payload_succeeds() {
        let wrapped = Any::pack(&string_value("hello")).unwrap();
        let encoded = wrapped.write_to_bytes().unwrap();
        let attributes = UAttributes {
            payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
            ..Default::default()
        };
        let msg = UMessage {
            attributes: Some(attributes).into(),
            payload: Some(encoded.into()),
            ..Default::default()
        };
        let extracted = msg.extract_protobuf::<StringValue>();
        assert!(extracted.is_ok_and(|v| v.value == "hello"));
    }

    #[test]
    fn extract_payload_fails_for_no_payload() {
        let attributes = UAttributes {
            payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
            ..Default::default()
        };
        let msg = UMessage {
            attributes: Some(attributes).into(),
            ..Default::default()
        };
        let extracted = msg.extract_protobuf::<StringValue>();
        assert!(matches!(extracted, Err(UMessageError::PayloadError(_))));
    }

    #[test]
    fn test_from_attributes_error() {
        let message_error =
            UMessageError::from(UAttributesError::validation_error("failed to validate"));
        assert!(matches!(
            message_error,
            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
        ));
    }

    #[test]
    fn test_from_protobuf_error() {
        let message_error: UMessageError =
            protobuf::Error::from(io::Error::last_os_error()).into();
        assert!(matches!(
            message_error,
            UMessageError::DataSerializationError(_)
        ));
    }

    #[test]
    fn test_from_error_msg() {
        let message_error: UMessageError = "an error occurred".into();
        assert!(matches!(message_error, UMessageError::PayloadError(_)));
    }
}