up_rust/
umessage.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14mod umessagebuilder;
15mod umessagetype;
16
17use bytes::Bytes;
18use protobuf::{well_known_types::any::Any, Enum, Message, MessageFull};
19
20pub use umessagebuilder::*;
21
22pub use crate::up_core_api::umessage::UMessage;
23
24use crate::{UAttributesError, UCode, UMessageType, UPayloadFormat, UPriority, UUri, UUID};
25
/// Errors that can occur while building a [`UMessage`] or while accessing its payload.
#[derive(Debug)]
pub enum UMessageError {
    /// The message's attributes failed validation; wraps the underlying attributes error.
    AttributesValidationError(UAttributesError),
    /// A protobuf (de-)serialization operation failed; wraps the underlying protobuf error.
    DataSerializationError(protobuf::Error),
    /// The payload is missing, has an unsupported format or does not match the
    /// expected type; carries a human readable description.
    PayloadError(String),
}
32
33impl std::fmt::Display for UMessageError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::AttributesValidationError(e) => f.write_fmt(format_args!(
37                "Builder state is not consistent with message type: {}",
38                e
39            )),
40            Self::DataSerializationError(e) => {
41                f.write_fmt(format_args!("Failed to serialize payload: {}", e))
42            }
43            Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
44        }
45    }
46}
47
// Marks `UMessageError` as a standard error type. Only the trait's default
// methods are used, i.e. wrapped errors are surfaced via `Display` only.
impl std::error::Error for UMessageError {}
49
50impl From<UAttributesError> for UMessageError {
51    fn from(value: UAttributesError) -> Self {
52        Self::AttributesValidationError(value)
53    }
54}
55
56impl From<protobuf::Error> for UMessageError {
57    fn from(value: protobuf::Error) -> Self {
58        Self::DataSerializationError(value)
59    }
60}
61
62impl From<String> for UMessageError {
63    fn from(value: String) -> Self {
64        Self::PayloadError(value)
65    }
66}
67
68impl From<&str> for UMessageError {
69    fn from(value: &str) -> Self {
70        Self::from(value.to_string())
71    }
72}
73
74impl UMessage {
75    /// Gets this message's type.
76    pub fn type_(&self) -> Option<UMessageType> {
77        self.attributes
78            .as_ref()
79            .and_then(|attribs| attribs.type_.enum_value().ok())
80    }
81
82    /// Gets this message's type.
83    ///
84    /// # Panics
85    ///
86    /// if the property has no value.
87    pub fn type_unchecked(&self) -> UMessageType {
88        self.type_().expect("message has no type")
89    }
90
91    /// Gets this message's identifier.
92    pub fn id(&self) -> Option<&UUID> {
93        self.attributes
94            .as_ref()
95            .and_then(|attribs| attribs.id.as_ref())
96    }
97
98    /// Gets this message's identifier.
99    ///
100    /// # Panics
101    ///
102    /// if the property has no value.
103    pub fn id_unchecked(&self) -> &UUID {
104        self.id().expect("message has no ID")
105    }
106
107    /// Gets this message's source address.
108    pub fn source(&self) -> Option<&UUri> {
109        self.attributes
110            .as_ref()
111            .and_then(|attribs| attribs.source.as_ref())
112    }
113
114    /// Gets this message's source address.
115    ///
116    /// # Panics
117    ///
118    /// if the property has no value.
119    pub fn source_unchecked(&self) -> &UUri {
120        self.source().expect("message has no source")
121    }
122
123    /// Gets this message's sink address.
124    pub fn sink(&self) -> Option<&UUri> {
125        self.attributes
126            .as_ref()
127            .and_then(|attribs| attribs.sink.as_ref())
128    }
129
130    /// Gets this message's sink address.
131    ///
132    /// # Panics
133    ///
134    /// if the property has no value.
135    pub fn sink_unchecked(&self) -> &UUri {
136        self.sink().expect("message has no sink")
137    }
138
139    /// Gets this message's priority.
140    pub fn priority(&self) -> Option<UPriority> {
141        self.attributes
142            .as_ref()
143            .and_then(|attribs| attribs.priority.enum_value().ok())
144            .map(|prio| {
145                if prio == UPriority::UPRIORITY_UNSPECIFIED {
146                    crate::uattributes::UPRIORITY_DEFAULT
147                } else {
148                    prio
149                }
150            })
151    }
152
153    /// Gets this message's priority.
154    ///
155    /// # Panics
156    ///
157    /// if the property has no value.
158    pub fn priority_unchecked(&self) -> UPriority {
159        self.priority().expect("message has no priority")
160    }
161
162    /// Gets this message's commstatus.
163    pub fn commstatus(&self) -> Option<UCode> {
164        self.attributes
165            .as_ref()
166            .and_then(|attribs| attribs.commstatus)
167            .and_then(|v| v.enum_value().ok())
168    }
169
170    /// Gets this message's commstatus.
171    ///
172    /// # Panics
173    ///
174    /// if the property has no value.
175    pub fn commstatus_unchecked(&self) -> UCode {
176        self.commstatus().expect("message has no commstatus")
177    }
178
179    /// Gets this message's time-to-live.
180    ///
181    /// # Returns
182    ///
183    /// the time-to-live in milliseconds.
184    pub fn ttl(&self) -> Option<u32> {
185        self.attributes.as_ref().and_then(|attribs| attribs.ttl)
186    }
187
188    /// Gets this message's time-to-live.
189    ///
190    /// # Returns
191    ///
192    /// the time-to-live in milliseconds.
193    ///
194    /// # Panics
195    ///
196    /// if the property has no value.
197    pub fn ttl_unchecked(&self) -> u32 {
198        self.ttl().expect("message has no time-to-live")
199    }
200
201    /// Gets this message's permission level.
202    pub fn permission_level(&self) -> Option<u32> {
203        self.attributes
204            .as_ref()
205            .and_then(|attribs| attribs.permission_level)
206    }
207
208    /// Gets this message's token.
209    pub fn token(&self) -> Option<&String> {
210        self.attributes
211            .as_ref()
212            .and_then(|attribs| attribs.token.as_ref())
213    }
214
215    /// Gets this message's traceparent.
216    pub fn traceparent(&self) -> Option<&String> {
217        self.attributes
218            .as_ref()
219            .and_then(|attribs| attribs.traceparent.as_ref())
220    }
221
222    /// Gets this message's request identifier.
223    pub fn request_id(&self) -> Option<&UUID> {
224        self.attributes
225            .as_ref()
226            .and_then(|attribs| attribs.reqid.as_ref())
227    }
228
229    /// Gets this message's request identifier.
230    ///
231    /// # Panics
232    ///
233    /// if the property has no value.
234    pub fn request_id_unchecked(&self) -> &UUID {
235        self.request_id().expect("message has no request ID")
236    }
237
238    /// Gets this message's payload format.
239    pub fn payload_format(&self) -> Option<UPayloadFormat> {
240        self.attributes
241            .as_ref()
242            .and_then(|attribs| attribs.payload_format.enum_value().ok())
243    }
244
245    /// Gets this message's payload format.
246    ///
247    /// # Panics
248    ///
249    /// if the property has no value.
250    pub fn payload_format_unchecked(&self) -> UPayloadFormat {
251        self.payload_format()
252            .expect("message has no payload format")
253    }
254
255    /// Checks if this is a Publish message.
256    ///
257    /// # Examples
258    ///
259    /// ```rust
260    /// use up_rust::{UAttributes, UMessage, UMessageType};
261    ///
262    /// let attribs = UAttributes {
263    ///   type_: UMessageType::UMESSAGE_TYPE_PUBLISH.into(),
264    ///   ..Default::default()
265    /// };
266    /// let msg = UMessage {
267    ///   attributes: Some(attribs).into(),
268    ///   ..Default::default()
269    /// };
270    /// assert!(msg.is_publish());
271    /// ```
272    pub fn is_publish(&self) -> bool {
273        self.attributes
274            .as_ref()
275            .is_some_and(|attribs| attribs.is_publish())
276    }
277
278    /// Checks if this is an RPC Request message.
279    ///
280    /// # Examples
281    ///
282    /// ```rust
283    /// use up_rust::{UAttributes, UMessage, UMessageType};
284    ///
285    /// let attribs = UAttributes {
286    ///   type_: UMessageType::UMESSAGE_TYPE_REQUEST.into(),
287    ///   ..Default::default()
288    /// };
289    /// let msg = UMessage {
290    ///   attributes: Some(attribs).into(),
291    ///   ..Default::default()
292    /// };
293    /// assert!(msg.is_request());
294    /// ```
295    pub fn is_request(&self) -> bool {
296        self.attributes
297            .as_ref()
298            .is_some_and(|attribs| attribs.is_request())
299    }
300
301    /// Checks if this is an RPC Response message.
302    ///
303    /// # Examples
304    ///
305    /// ```rust
306    /// use up_rust::{UAttributes, UMessage, UMessageType};
307    ///
308    /// let attribs = UAttributes {
309    ///   type_: UMessageType::UMESSAGE_TYPE_RESPONSE.into(),
310    ///   ..Default::default()
311    /// };
312    /// let msg = UMessage {
313    ///   attributes: Some(attribs).into(),
314    ///   ..Default::default()
315    /// };
316    /// assert!(msg.is_response());
317    /// ```
318    pub fn is_response(&self) -> bool {
319        self.attributes
320            .as_ref()
321            .is_some_and(|attribs| attribs.is_response())
322    }
323
324    /// Checks if this is a Notification message.
325    ///
326    /// # Examples
327    ///
328    /// ```rust
329    /// use up_rust::{UAttributes, UMessage, UMessageType};
330    ///
331    /// let attribs = UAttributes {
332    ///   type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
333    ///   ..Default::default()
334    /// };
335    /// let msg = UMessage {
336    ///   attributes: Some(attribs).into(),
337    ///   ..Default::default()
338    /// };
339    /// assert!(msg.is_notification());
340    /// ```
341    pub fn is_notification(&self) -> bool {
342        self.attributes
343            .as_ref()
344            .is_some_and(|attribs| attribs.is_notification())
345    }
346
347    /// Deserializes this message's protobuf payload into a type.
348    ///
349    /// # Type Parameters
350    ///
351    /// * `T`: The target type of the data to be unpacked.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if the message payload format is neither [UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF]
356    /// nor [UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY] or if the bytes in the
357    /// payload cannot be deserialized into the target type.
358    pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
359        if let Some(payload) = self.payload.as_ref() {
360            let payload_format = self.attributes.payload_format.enum_value_or_default();
361            deserialize_protobuf_bytes(payload, &payload_format)
362        } else {
363            Err(UMessageError::PayloadError(
364                "Message has no payload".to_string(),
365            ))
366        }
367    }
368}
369
370/// Deserializes a protobuf message from a byte array.
371///
372/// # Type Parameters
373///
374/// * `T`: The target type of the data to be unpacked.
375///
376/// # Arguments
377///
378/// * `payload` - The payload data.
379/// * `payload_format` - The format/encoding of the data. Must be one of
380///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`
381///    - `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`
382///
383/// # Errors
384///
385/// Returns an error if the payload format is unsupported or if the data can not be deserialized
386/// into the target type based on the given format.
387pub(crate) fn deserialize_protobuf_bytes<T: MessageFull + Default>(
388    payload: &Bytes,
389    payload_format: &UPayloadFormat,
390) -> Result<T, UMessageError> {
391    match payload_format {
392        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => {
393            T::parse_from_tokio_bytes(payload).map_err(UMessageError::DataSerializationError)
394        }
395        UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
396            Any::parse_from_tokio_bytes(payload)
397                .map_err(UMessageError::DataSerializationError)
398                .and_then(|any| match any.unpack() {
399                    Ok(Some(v)) => Ok(v),
400                    Ok(None) => Err(UMessageError::PayloadError(
401                        "cannot deserialize payload, message type mismatch".to_string(),
402                    )),
403                    Err(e) => Err(UMessageError::DataSerializationError(e)),
404                })
405        }
406        _ => {
407            let detail_msg = payload_format.to_media_type().map_or_else(
408                || format!("Unknown payload format: {}", payload_format.value()),
409                |mt| format!("Invalid/unsupported payload format: {mt}"),
410            );
411            Err(UMessageError::from(detail_msg))
412        }
413    }
414}
415
416#[cfg(test)]
417mod test {
418    use std::io;
419
420    use protobuf::well_known_types::{any::Any, duration::Duration, wrappers::StringValue};
421    use test_case::test_case;
422
423    use crate::{UAttributes, UStatus};
424
425    use super::*;
426
427    #[test]
428    fn test_deserialize_protobuf_bytes_succeeds() {
429        let mut data = StringValue::new();
430        data.value = "hello world".to_string();
431        let any = Any::pack(&data.clone()).unwrap();
432        let buf: Bytes = any.write_to_bytes().unwrap().into();
433
434        let result = deserialize_protobuf_bytes::<StringValue>(
435            &buf,
436            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
437        );
438        assert!(result.is_ok_and(|v| v.value == *"hello world"));
439
440        let result = deserialize_protobuf_bytes::<StringValue>(
441            &data.write_to_bytes().unwrap().into(),
442            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
443        );
444        assert!(result.is_ok_and(|v| v.value == *"hello world"));
445    }
446
447    #[test]
448    fn test_deserialize_protobuf_bytes_fails_for_payload_type_mismatch() {
449        let mut data = StringValue::new();
450        data.value = "hello world".to_string();
451        let any = Any::pack(&data).unwrap();
452        let buf: Bytes = any.write_to_bytes().unwrap().into();
453        let result = deserialize_protobuf_bytes::<UStatus>(
454            &buf,
455            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
456        );
457        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
458    }
459
460    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_JSON; "JSON format")]
461    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_RAW; "RAW format")]
462    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SHM; "SHM format")]
463    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP; "SOMEIP format")]
464    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV; "SOMEIP TLV format")]
465    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_TEXT; "TEXT format")]
466    #[test_case(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; "UNSPECIFIED format")]
467    fn test_deserialize_protobuf_bytes_fails_for_(format: UPayloadFormat) {
468        let result = deserialize_protobuf_bytes::<UStatus>(&"hello".into(), &format);
469        assert!(result.is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
470    }
471
472    #[test]
473    fn test_deserialize_protobuf_bytes_fails_for_invalid_encoding() {
474        let any = Any {
475            type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
476            value: vec![0x0A],
477            ..Default::default()
478        };
479        let buf = any.write_to_bytes().unwrap();
480        let result = deserialize_protobuf_bytes::<Duration>(
481            &buf.into(),
482            &UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
483        );
484        assert!(result.is_err_and(|e| matches!(e, UMessageError::DataSerializationError(_))))
485    }
486
487    #[test]
488    fn extract_payload_succeeds() {
489        let payload = StringValue {
490            value: "hello".to_string(),
491            ..Default::default()
492        };
493        let buf = Any::pack(&payload)
494            .and_then(|a| a.write_to_bytes())
495            .unwrap();
496        let msg = UMessage {
497            attributes: Some(UAttributes {
498                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
499                ..Default::default()
500            })
501            .into(),
502            payload: Some(buf.into()),
503            ..Default::default()
504        };
505        assert!(msg
506            .extract_protobuf::<StringValue>()
507            .is_ok_and(|v| v.value == *"hello"));
508    }
509
510    #[test]
511    fn extract_payload_fails_for_no_payload() {
512        let msg = UMessage {
513            attributes: Some(UAttributes {
514                payload_format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.into(),
515                ..Default::default()
516            })
517            .into(),
518            ..Default::default()
519        };
520        assert!(msg
521            .extract_protobuf::<StringValue>()
522            .is_err_and(|e| matches!(e, UMessageError::PayloadError(_))));
523    }
524
525    #[test]
526    fn test_from_attributes_error() {
527        let attributes_error = UAttributesError::validation_error("failed to validate");
528        let message_error = UMessageError::from(attributes_error);
529        assert!(matches!(
530            message_error,
531            UMessageError::AttributesValidationError(UAttributesError::ValidationError(_))
532        ));
533    }
534
535    #[test]
536    fn test_from_protobuf_error() {
537        let protobuf_error = protobuf::Error::from(io::Error::last_os_error());
538        let message_error = UMessageError::from(protobuf_error);
539        assert!(matches!(
540            message_error,
541            UMessageError::DataSerializationError(_)
542        ));
543    }
544
545    #[test]
546    fn test_from_error_msg() {
547        let message_error = UMessageError::from("an error occurred");
548        assert!(matches!(message_error, UMessageError::PayloadError(_)));
549    }
550}