up_rust/
cloudevents.rs

1// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10//     https://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//
18// SPDX-License-Identifier: Apache-2.0
19
20// [impl->dsn~cloudevents-umessage-mapping~2]
21
22use crate::{
23    UAttributes, UAttributesError, UAttributesValidators, UCode, UMessage, UMessageError,
24    UMessageType, UPayloadFormat, UPriority, UUri, UUID,
25};
26use bytes::Bytes;
27use protobuf::{well_known_types::any::Any, Enum, EnumOrUnknown, MessageField};
28
29pub use cloudevents::{cloud_event::CloudEventAttributeValue, CloudEvent};
30
31include!(concat!(env!("OUT_DIR"), "/cloudevents/mod.rs"));
32
/// The _official_ content type to use for CloudEvents serialized using the
/// protobuf format.
pub const CONTENT_TYPE_CLOUDEVENTS_PROTOBUF: &str = "application/cloudevents+protobuf";

// The only CloudEvents spec version that is produced and accepted by the
// conversions in this module.
const CLOUDEVENTS_SPEC_VERSION: &str = "1.0";

// Names of the CloudEvent extension attributes that carry those uProtocol
// message attributes which have no counterpart among the standard CloudEvent
// context attributes.
const EXTENSION_NAME_COMMSTATUS: &str = "commstatus";
const EXTENSION_NAME_PERMISSION_LEVEL: &str = "plevel";
const EXTENSION_NAME_PFORMAT: &str = "pformat";
const EXTENSION_NAME_PRIORITY: &str = "priority";
const EXTENSION_NAME_REQUEST_ID: &str = "reqid";
const EXTENSION_NAME_SINK: &str = "sink";
const EXTENSION_NAME_TOKEN: &str = "token";
const EXTENSION_NAME_TRACEPARENT: &str = "traceparent";
const EXTENSION_NAME_TTL: &str = "ttl";
48
49impl CloudEvent {
50    fn get_id(&self) -> Result<UUID, UAttributesError> {
51        self.id
52            .parse::<UUID>()
53            .map_err(|e| UAttributesError::parsing_error(e.to_string()))
54    }
55
56    fn set_id(&mut self, id: &UUID) {
57        self.id = id.to_hyphenated_string();
58    }
59
60    fn get_type(&self) -> Result<UMessageType, UAttributesError> {
61        UMessageType::try_from_cloudevent_type(self.type_.clone())
62    }
63
64    fn set_type(&mut self, type_: UMessageType) {
65        self.type_ = type_.to_cloudevent_type();
66    }
67
68    fn get_source(&self) -> Result<UUri, UAttributesError> {
69        self.source
70            .parse::<UUri>()
71            .map_err(|e| UAttributesError::parsing_error(e.to_string()))
72    }
73
74    fn set_source<T: Into<String>>(&mut self, uri: T) {
75        self.source = uri.into();
76    }
77
78    fn get_sink(&self) -> Result<Option<UUri>, UAttributesError> {
79        self.attributes
80            .get(EXTENSION_NAME_SINK)
81            .map(|v| v.ce_uri_ref())
82            .map_or(Ok(None), |uri| {
83                uri.parse::<UUri>()
84                    .map(Option::Some)
85                    .map_err(|e| UAttributesError::parsing_error(e.to_string()))
86            })
87    }
88
89    fn set_sink<T: Into<String>>(&mut self, uri: T) {
90        let mut val = CloudEventAttributeValue::new();
91        val.set_ce_uri_ref(uri.into());
92        self.attributes.insert(EXTENSION_NAME_SINK.to_string(), val);
93    }
94
95    fn get_priority(&self) -> Result<UPriority, UAttributesError> {
96        self.attributes
97            .get(EXTENSION_NAME_PRIORITY)
98            .map(|v| v.ce_string())
99            .map_or(Ok(UPriority::default()), |v| {
100                UPriority::try_from_priority_code(v)
101            })
102    }
103
104    fn set_priority(&mut self, priority: UPriority) {
105        let mut val = CloudEventAttributeValue::new();
106        val.set_ce_string(priority.to_priority_code());
107        self.attributes
108            .insert(EXTENSION_NAME_PRIORITY.to_string(), val);
109    }
110
111    fn get_ttl(&self) -> Option<u32> {
112        self.attributes
113            .get(EXTENSION_NAME_TTL)
114            .map(|v| v.ce_integer() as u32)
115    }
116
117    fn set_ttl(&mut self, ttl: u32) -> Result<(), UAttributesError> {
118        let v = i32::try_from(ttl).map_err(|e| UAttributesError::parsing_error(e.to_string()))?;
119        let mut val = CloudEventAttributeValue::new();
120        val.set_ce_integer(v);
121        self.attributes.insert(EXTENSION_NAME_TTL.to_string(), val);
122        Ok(())
123    }
124
125    fn get_token(&self) -> Option<String> {
126        self.attributes
127            .get(EXTENSION_NAME_TOKEN)
128            .map(|val| val.ce_string().to_string())
129    }
130
131    fn set_token<T: Into<String>>(&mut self, token: T) {
132        let mut val = CloudEventAttributeValue::new();
133        val.set_ce_string(token.into());
134        self.attributes
135            .insert(EXTENSION_NAME_TOKEN.to_string(), val);
136    }
137
138    fn get_permission_level(&self) -> Option<u32> {
139        self.attributes
140            .get(EXTENSION_NAME_PERMISSION_LEVEL)
141            .map(|v| v.ce_integer() as u32)
142    }
143
144    fn set_permission_level(&mut self, level: u32) -> Result<(), UAttributesError> {
145        let v = i32::try_from(level).map_err(|e| UAttributesError::parsing_error(e.to_string()))?;
146        let mut val = CloudEventAttributeValue::new();
147        val.set_ce_integer(v);
148        self.attributes
149            .insert(EXTENSION_NAME_PERMISSION_LEVEL.to_string(), val);
150        Ok(())
151    }
152
153    fn get_request_id(&self) -> Result<Option<UUID>, UAttributesError> {
154        self.attributes
155            .get(EXTENSION_NAME_REQUEST_ID)
156            .map(|v| v.ce_string().to_owned())
157            .map_or(Ok(None), |v| {
158                v.parse::<UUID>()
159                    .map(Option::Some)
160                    .map_err(|e| UAttributesError::parsing_error(e.to_string()))
161            })
162    }
163
164    fn set_request_id(&mut self, id: &UUID) {
165        let mut val = CloudEventAttributeValue::new();
166        val.set_ce_string(id.to_hyphenated_string());
167        self.attributes
168            .insert(EXTENSION_NAME_REQUEST_ID.to_string(), val);
169    }
170
171    fn get_commstatus(&self) -> Option<UCode> {
172        self.attributes
173            .get(EXTENSION_NAME_COMMSTATUS)
174            .map(|val| val.ce_integer())
175            .and_then(UCode::from_i32)
176    }
177
178    fn set_commstatus(&mut self, status: UCode) {
179        if status != UCode::OK {
180            let mut val = CloudEventAttributeValue::new();
181            val.set_ce_integer(status.value());
182            self.attributes
183                .insert(EXTENSION_NAME_COMMSTATUS.to_string(), val);
184        }
185    }
186
187    fn get_traceparent(&self) -> Option<String> {
188        self.attributes
189            .get(EXTENSION_NAME_TRACEPARENT)
190            .map(|val| val.ce_string().to_string())
191    }
192
193    fn set_traceparent<T: Into<String>>(&mut self, traceparent: T) {
194        let mut val = CloudEventAttributeValue::new();
195        val.set_ce_string(traceparent.into());
196        self.attributes
197            .insert(EXTENSION_NAME_TRACEPARENT.to_string(), val);
198    }
199
200    fn get_payload_format(&self) -> Result<UPayloadFormat, UAttributesError> {
201        self.attributes
202            .get(EXTENSION_NAME_PFORMAT)
203            .map(|val| val.ce_integer())
204            .map_or(Ok(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED), |format| {
205                UPayloadFormat::from_i32(format).ok_or(UAttributesError::ParsingError(
206                    "unsupported payload format".to_string(),
207                ))
208            })
209    }
210
211    fn set_payload_format(&mut self, format: UPayloadFormat) {
212        if format != UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED {
213            let mut val = CloudEventAttributeValue::new();
214            val.set_ce_integer(format.value());
215            self.attributes
216                .insert(EXTENSION_NAME_PFORMAT.to_string(), val);
217        }
218    }
219}
220
221impl TryFrom<UMessage> for CloudEvent {
222    type Error = UMessageError;
223
224    // Converts a uProtocol message into a CloudEvent using the
225    // [Protobuf Event Format](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/formats/protobuf-format.md).
226    //
227    // # Arguments
228    //
229    // * `message` - The message to create the event from.
230    //               Note that the message is not validated against the uProtocol specification before processing.
231    //
232    // # Returns
233    //
234    // Returns a CloudEvent protobuf with all information from the uProtocol message mapped as defined by the
235    // [uProtocol specification]().
236    //
237    // # Errors
238    //
239    // Returns an error if the given message does not contain the necessary information for creating a CloudEvent.
240    fn try_from(message: UMessage) -> Result<Self, Self::Error> {
241        if message.attributes.as_ref().is_none() {
242            return Err(UMessageError::AttributesValidationError(
243                UAttributesError::ValidationError("message has no attributes".to_string()),
244            ));
245        };
246        let mut event = CloudEvent::new();
247        event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
248        if let Some(id) = message.id() {
249            event.set_id(id);
250        } else {
251            return Err(UMessageError::AttributesValidationError(
252                UAttributesError::ValidationError("message has no id".to_string()),
253            ));
254        }
255        if let Some(message_type) = message.type_() {
256            event.set_type(message_type);
257        } else {
258            return Err(UMessageError::AttributesValidationError(
259                UAttributesError::ValidationError("message has no type".to_string()),
260            ));
261        }
262        if let Some(source) = message.source() {
263            event.set_source(source);
264        } else {
265            return Err(UMessageError::AttributesValidationError(
266                UAttributesError::ValidationError("message has no source address".to_string()),
267            ));
268        }
269        if let Some(sink) = message.sink() {
270            event.set_sink(sink);
271        }
272        if let Some(priority) = message.priority() {
273            if priority != UPriority::UPRIORITY_UNSPECIFIED {
274                event.set_priority(priority);
275            }
276        } else {
277            return Err(UMessageError::AttributesValidationError(
278                UAttributesError::ValidationError("message has unsupported priority".to_string()),
279            ));
280        }
281        if let Some(ttl) = message.ttl() {
282            event.set_ttl(ttl)?;
283        }
284        if let Some(token) = message.token() {
285            event.set_token(token);
286        }
287        if let Some(plevel) = message.permission_level() {
288            event.set_permission_level(plevel)?;
289        }
290        if let Some(reqid) = message.request_id() {
291            event.set_request_id(reqid);
292        }
293        if let Some(commstatus) = message.commstatus() {
294            event.set_commstatus(commstatus);
295        }
296        if let Some(traceparent) = message.traceparent() {
297            event.set_traceparent(traceparent);
298        }
299        let payload_format = message.payload_format().unwrap_or_default();
300        if let Some(payload) = message.payload {
301            event.set_payload_format(payload_format);
302            match payload_format {
303                UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF
304                | UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
305                    let data = Any {
306                        value: payload.to_vec(),
307                        ..Default::default()
308                    };
309                    event.set_proto_data(data);
310                }
311                UPayloadFormat::UPAYLOAD_FORMAT_TEXT | UPayloadFormat::UPAYLOAD_FORMAT_JSON => {
312                    let data = String::from_utf8(payload.to_vec())
313                        .map(|v| v.to_string())
314                        .map_err(|_e| {
315                            UMessageError::PayloadError(
316                                "failed to transform payload to string".to_string(),
317                            )
318                        })?;
319                    event.set_text_data(data);
320                }
321                _ => {
322                    event.set_binary_data(payload.into());
323                }
324            }
325        }
326        Ok(event)
327    }
328}
329
330impl TryFrom<CloudEvent> for UMessage {
331    type Error = UMessageError;
332
333    // Converts a CloudEvent to a uProtocol message.
334    //
335    // # Arguments
336    //
337    // * `event` - The CloudEvent to create the message from.
338    //
339    // # Errors
340    //
341    // Returns an error if the given event does not contain the necessary information for creating a uProtocol message.
342    // Also returns an error if the resulting message is not a valid uProtocol message.
343    fn try_from(event: CloudEvent) -> Result<Self, Self::Error> {
344        if !CLOUDEVENTS_SPEC_VERSION.eq(&event.spec_version) {
345            let msg = format!("expected spec version 1.0 but found {}", event.spec_version);
346            return Err(UMessageError::AttributesValidationError(
347                UAttributesError::ValidationError(msg),
348            ));
349        }
350
351        let attributes = UAttributes {
352            commstatus: event.get_commstatus().map(EnumOrUnknown::from),
353            id: MessageField::from_option(Some(event.get_id()?)),
354            type_: EnumOrUnknown::from(event.get_type()?),
355            source: MessageField::from_option(Some(event.get_source()?)),
356            sink: MessageField::from_option(event.get_sink()?),
357            priority: EnumOrUnknown::from(event.get_priority()?),
358            ttl: event.get_ttl(),
359            permission_level: event.get_permission_level(),
360            reqid: MessageField::from_option(event.get_request_id()?),
361            token: event.get_token(),
362            traceparent: event.get_traceparent(),
363            payload_format: event.get_payload_format().map(EnumOrUnknown::from)?,
364            ..Default::default()
365        };
366        UAttributesValidators::get_validator_for_attributes(&attributes).validate(&attributes)?;
367
368        let payload = if event.has_binary_data() {
369            Some(Bytes::copy_from_slice(event.binary_data()))
370        } else if event.has_text_data() {
371            Some(event.text_data().to_owned().into())
372        } else if event.has_proto_data() {
373            Some(event.proto_data().value.to_vec().into())
374        } else {
375            None
376        };
377
378        Ok(UMessage {
379            attributes: Some(attributes).into(),
380            payload,
381            ..Default::default()
382        })
383    }
384}
385
#[cfg(test)]
mod tests {
    use cloudevents::CloudEvent;
    use protobuf::{well_known_types::wrappers::StringValue, Message};

    use crate::UMessageBuilder;

    use super::*;

    const MESSAGE_ID: &str = "00000000-0001-7000-8010-101010101a1a";
    const TOPIC: &str = "//my-vehicle/A81B/1/A9BA";
    const METHOD: &str = "//my-vehicle/A000/2/1";
    const REPLY_TO: &str = "//my-vehicle/A81B/1/0";
    const DESTINATION: &str = "//my-vehicle/A000/2/0";
    const PERMISSION_LEVEL: u32 = 5;
    const PRIORITY: UPriority = UPriority::UPRIORITY_CS4;
    const TTL: u32 = 15_000;
    const TRACEPARENT: &str = "traceparent";
    const DATA: [u8; 4] = [0x00, 0x01, 0x02, 0x03];

    //
    // fixtures shared by both conversion directions
    //

    // The well-known message ID used by all tests.
    fn message_id() -> UUID {
        MESSAGE_ID
            .parse::<UUID>()
            .expect("failed to parse message ID")
    }

    // Parses a uProtocol URI, panicking with the given message on failure.
    fn uuri(uri: &str, failure_message: &str) -> UUri {
        uri.parse::<UUri>().expect(failure_message)
    }

    // A small protobuf message to use as payload.
    fn hello_payload() -> StringValue {
        let mut payload = StringValue::new();
        payload.value = "Hello".into();
        payload
    }

    // Looks up an extension attribute and reads it as an integer.
    fn integer_extension(event: &CloudEvent, name: &str) -> Option<i32> {
        event.attributes.get(name).map(|v| v.ce_integer())
    }

    // Looks up an extension attribute and reads it as a string.
    fn string_extension<'a>(event: &'a CloudEvent, name: &str) -> Option<&'a str> {
        event.attributes.get(name).map(|v| v.ce_string())
    }

    fn to_cloudevent(message: UMessage) -> CloudEvent {
        CloudEvent::try_from(message).expect("failed to create CloudEvent from UMessage")
    }

    fn to_umessage(event: CloudEvent) -> UMessage {
        UMessage::try_from(event).expect("failed to create UMessage from CloudEvent")
    }

    //
    // tests asserting conversion of UMessage -> CloudEvent
    // [utest->dsn~cloudevents-umessage-mapping~2]
    //

    fn assert_payload_format_extension(event: &CloudEvent, expected_format: UPayloadFormat) {
        assert_eq!(
            integer_extension(event, EXTENSION_NAME_PFORMAT),
            Some(expected_format.value())
        );
    }

    fn assert_standard_cloudevent_attributes(
        event: &CloudEvent,
        message_type: &str,
        source: &str,
        sink: Option<String>,
    ) {
        assert_eq!(event.spec_version, CLOUDEVENTS_SPEC_VERSION);
        assert_eq!(event.type_, message_type);
        assert_eq!(event.id, MESSAGE_ID);
        assert_eq!(event.source.as_str(), source);

        let actual_sink = event
            .attributes
            .get(EXTENSION_NAME_SINK)
            .map(|v| v.ce_uri_ref().to_owned());
        assert_eq!(actual_sink, sink);

        let actual_priority =
            string_extension(event, EXTENSION_NAME_PRIORITY).map(str::to_owned);
        assert_eq!(actual_priority, Some(PRIORITY.to_priority_code()));

        let actual_ttl = integer_extension(event, EXTENSION_NAME_TTL).map(|ttl| ttl as u32);
        assert_eq!(actual_ttl, Some(TTL), "unexpected TTL");

        assert_eq!(
            string_extension(event, EXTENSION_NAME_TRACEPARENT),
            Some(TRACEPARENT)
        );
    }

    #[test]
    fn test_try_from_publish_message_succeeds() {
        let topic = uuri(TOPIC, "failed to create topic URI");
        let message = UMessageBuilder::publish(topic)
            .with_message_id(message_id())
            .with_priority(PRIORITY)
            .with_ttl(TTL)
            .with_traceparent(TRACEPARENT)
            .build_with_payload("test".as_bytes(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
            .expect("failed to create message");

        let event = to_cloudevent(message);

        assert_standard_cloudevent_attributes(&event, "up-pub.v1", TOPIC, None);
        assert_payload_format_extension(&event, UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
        assert_eq!(event.text_data(), "test");
    }

    #[test]
    fn test_try_from_notification_message_succeeds() {
        let source = uuri(TOPIC, "failed to create source URI");
        let sink = uuri(DESTINATION, "failed to create sink URI");
        let message = UMessageBuilder::notification(source, sink)
            .with_message_id(message_id())
            .with_priority(PRIORITY)
            .with_ttl(TTL)
            .with_traceparent(TRACEPARENT)
            .build_with_payload(
                "{\"count\": 5}".as_bytes(),
                UPayloadFormat::UPAYLOAD_FORMAT_JSON,
            )
            .expect("failed to create message");

        let event = to_cloudevent(message);

        assert_standard_cloudevent_attributes(
            &event,
            "up-not.v1",
            TOPIC,
            Some(DESTINATION.to_string()),
        );
        assert_payload_format_extension(&event, UPayloadFormat::UPAYLOAD_FORMAT_JSON);
        assert_eq!(event.text_data(), "{\"count\": 5}");
    }

    #[test]
    fn test_try_from_request_message_succeeds() {
        let payload = hello_payload();
        let token = "my-token";

        let method = uuri(METHOD, "failed to create sink URI");
        let reply_to = uuri(REPLY_TO, "failed to create source URI");
        let message = UMessageBuilder::request(method, reply_to, TTL)
            .with_message_id(message_id())
            .with_priority(PRIORITY)
            .with_permission_level(PERMISSION_LEVEL)
            .with_traceparent(TRACEPARENT)
            .with_token(token)
            .build_with_wrapped_protobuf_payload(&payload)
            .expect("failed to create message");

        let event = to_cloudevent(message);

        assert_standard_cloudevent_attributes(
            &event,
            "up-req.v1",
            REPLY_TO,
            Some(METHOD.to_string()),
        );
        assert_eq!(string_extension(&event, EXTENSION_NAME_TOKEN), Some(token));
        assert_eq!(
            integer_extension(&event, EXTENSION_NAME_PERMISSION_LEVEL),
            Some(PERMISSION_LEVEL as i32)
        );
        assert_payload_format_extension(
            &event,
            UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
        );
        assert!(!event.has_binary_data());
        assert!(!event.has_text_data());

        // the message payload is the serialized Any that wraps the original payload
        let payload_wrapped_in_any = Any::pack(&payload).expect("failed to wrap payload in Any");
        let expected_data = Any::pack(&payload_wrapped_in_any)
            .expect("failed to pack payload into Any")
            .value;
        assert_eq!(event.proto_data().value, expected_data);
    }

    #[test]
    fn test_try_from_response_message_succeeds() {
        let payload = hello_payload();

        let reply_to = uuri(REPLY_TO, "failed to create sink URI");
        let method = uuri(METHOD, "failed to create source URI");
        let message = UMessageBuilder::response(reply_to, UUID::build(), method)
            .with_message_id(message_id())
            .with_ttl(TTL)
            .with_priority(PRIORITY)
            .with_comm_status(UCode::OK)
            .with_traceparent(TRACEPARENT)
            .build_with_protobuf_payload(&payload)
            .expect("failed to create message");

        let event = to_cloudevent(message);

        assert_standard_cloudevent_attributes(
            &event,
            "up-res.v1",
            METHOD,
            Some(REPLY_TO.to_string()),
        );
        // status OK is not written to the event
        assert_eq!(integer_extension(&event, EXTENSION_NAME_COMMSTATUS), None);
        assert_payload_format_extension(&event, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF);
        assert!(!event.has_binary_data());
        assert!(!event.has_text_data());

        let expected_data = Any::pack(&payload)
            .expect("failed to pack payload into Any")
            .value;
        assert_eq!(event.proto_data().value, expected_data);
    }

    //
    // tests asserting conversion of CloudEvent -> UMessage
    // [utest->dsn~cloudevents-umessage-mapping~2]
    //

    // Creates an event that carries the attributes common to all succeeding tests.
    fn standard_event(message_type: UMessageType, source: &str, sink: Option<&str>) -> CloudEvent {
        let mut event = CloudEvent::new();
        event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
        event.set_type(message_type);
        event.id = MESSAGE_ID.into();
        event.source = source.into();
        if let Some(sink) = sink {
            event.set_sink(sink);
        }
        event.set_priority(UPriority::UPRIORITY_CS4);
        event.set_ttl(TTL).expect("failed to set TTL on message");
        event.set_traceparent(TRACEPARENT);
        event
    }

    fn assert_payload_format(attribs: &UAttributes, expected_format: UPayloadFormat) {
        assert_eq!(
            attribs.payload_format.enum_value_or_default(),
            expected_format
        );
    }

    fn assert_standard_umessage_attributes(
        attribs: &UAttributes,
        message_type: UMessageType,
        source: &str,
        sink: Option<String>,
    ) {
        assert_eq!(attribs.type_.enum_value_or_default(), message_type);
        assert_eq!(
            attribs.id.get_or_default().to_hyphenated_string(),
            MESSAGE_ID
        );
        assert_eq!(attribs.source.get_or_default().to_uri(false), source);

        let actual_sink = attribs.sink.as_ref().map(|uuri| uuri.to_uri(false));
        assert_eq!(actual_sink, sink);

        assert_eq!(
            attribs.priority.enum_value_or_default(),
            UPriority::UPRIORITY_CS4
        );
        assert_eq!(attribs.ttl, Some(TTL));
        assert_eq!(attribs.traceparent, Some(TRACEPARENT.to_string()));
    }

    #[test]
    fn test_try_from_cloudevent_without_sink_fails() {
        let mut event = CloudEvent::new();
        event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
        event.type_ = UMessageType::UMESSAGE_TYPE_NOTIFICATION.to_cloudevent_type();
        event.id = MESSAGE_ID.into();
        event.source = TOPIC.into();

        assert!(UMessage::try_from(event).is_err());
    }

    #[test]
    fn test_try_from_publish_cloudevent_succeeds() {
        let mut event = standard_event(UMessageType::UMESSAGE_TYPE_PUBLISH, TOPIC, None);
        event.set_payload_format(UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
        event.set_text_data("test".to_string());

        let umessage = to_umessage(event);

        let attribs = umessage.attributes.get_or_default();
        assert_standard_umessage_attributes(
            attribs,
            UMessageType::UMESSAGE_TYPE_PUBLISH,
            TOPIC,
            None,
        );
        assert_payload_format(attribs, UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
        assert_eq!(umessage.payload, Some("test".as_bytes().to_vec().into()))
    }

    #[test]
    fn test_try_from_notification_cloudevent_succeeds() {
        let mut event = standard_event(
            UMessageType::UMESSAGE_TYPE_NOTIFICATION,
            TOPIC,
            Some(DESTINATION),
        );
        event.set_payload_format(UPayloadFormat::UPAYLOAD_FORMAT_JSON);
        event.set_text_data("{\"count\": 5}".to_string());

        let umessage = to_umessage(event);

        let attribs = umessage.attributes.get_or_default();
        assert_standard_umessage_attributes(
            attribs,
            UMessageType::UMESSAGE_TYPE_NOTIFICATION,
            TOPIC,
            Some(DESTINATION.to_string()),
        );
        assert_payload_format(attribs, UPayloadFormat::UPAYLOAD_FORMAT_JSON);
        assert_eq!(
            umessage.payload,
            Some("{\"count\": 5}".as_bytes().to_vec().into())
        )
    }

    #[test]
    fn test_try_from_request_cloudevent_succeeds() {
        let mut event = standard_event(
            UMessageType::UMESSAGE_TYPE_REQUEST,
            REPLY_TO,
            Some(METHOD),
        );
        event
            .set_permission_level(PERMISSION_LEVEL)
            .expect("failed to set permission level on message");
        event.set_token("my-token");

        let payload_wrapped_in_any =
            Any::pack(&hello_payload()).expect("failed to wrap payload in Any");
        let serialized_payload = payload_wrapped_in_any
            .write_to_bytes()
            .expect("failed to serialize payload");
        event.set_proto_data(Any {
            value: serialized_payload.clone(),
            ..Default::default()
        });

        let umessage = to_umessage(event);

        let attribs = umessage.attributes.get_or_default();
        assert_standard_umessage_attributes(
            attribs,
            UMessageType::UMESSAGE_TYPE_REQUEST,
            REPLY_TO,
            Some(METHOD.to_string()),
        );
        assert_eq!(attribs.permission_level, Some(PERMISSION_LEVEL));
        assert_eq!(attribs.token, Some("my-token".to_string()));
        // the event does not carry a payload format extension
        assert_payload_format(attribs, UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED);
        assert_eq!(umessage.payload, Some(serialized_payload.into()));
    }

    #[test]
    fn test_try_from_response_cloudevent_succeeds() {
        let request_id = UUID::build();
        let mut event = standard_event(
            UMessageType::UMESSAGE_TYPE_RESPONSE,
            METHOD,
            Some(REPLY_TO),
        );
        event.set_request_id(&request_id);
        event.set_commstatus(UCode::OK);
        event.set_payload_format(UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF);
        event.set_proto_data(Any {
            value: DATA.to_vec(),
            ..Default::default()
        });

        let umessage = to_umessage(event);

        let attribs = umessage.attributes.get_or_default();
        assert_standard_umessage_attributes(
            attribs,
            UMessageType::UMESSAGE_TYPE_RESPONSE,
            METHOD,
            Some(REPLY_TO.to_string()),
        );
        assert_eq!(attribs.commstatus, None);
        assert_eq!(attribs.reqid, Some(request_id).into());
        assert_payload_format(attribs, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF);
        assert_eq!(umessage.payload, Some(DATA.to_vec().into()))
    }
}