1use crate::{
23 UAttributes, UAttributesError, UAttributesValidators, UCode, UMessage, UMessageError,
24 UMessageType, UPayloadFormat, UPriority, UUri, UUID,
25};
26use bytes::Bytes;
27use protobuf::{well_known_types::any::Any, Enum, EnumOrUnknown, MessageField};
28
29pub use cloudevents::{cloud_event::CloudEventAttributeValue, CloudEvent};
30
31include!(concat!(env!("OUT_DIR"), "/cloudevents/mod.rs"));
32
33pub const CONTENT_TYPE_CLOUDEVENTS_PROTOBUF: &str = "application/cloudevents+protobuf";
36
37const CLOUDEVENTS_SPEC_VERSION: &str = "1.0";
38
39const EXTENSION_NAME_COMMSTATUS: &str = "commstatus";
40const EXTENSION_NAME_PERMISSION_LEVEL: &str = "plevel";
41const EXTENSION_NAME_PFORMAT: &str = "pformat";
42const EXTENSION_NAME_PRIORITY: &str = "priority";
43const EXTENSION_NAME_REQUEST_ID: &str = "reqid";
44const EXTENSION_NAME_SINK: &str = "sink";
45const EXTENSION_NAME_TOKEN: &str = "token";
46const EXTENSION_NAME_TRACEPARENT: &str = "traceparent";
47const EXTENSION_NAME_TTL: &str = "ttl";
48
49impl CloudEvent {
50 fn get_id(&self) -> Result<UUID, UAttributesError> {
51 self.id
52 .parse::<UUID>()
53 .map_err(|e| UAttributesError::parsing_error(e.to_string()))
54 }
55
56 fn set_id(&mut self, id: &UUID) {
57 self.id = id.to_hyphenated_string();
58 }
59
60 fn get_type(&self) -> Result<UMessageType, UAttributesError> {
61 UMessageType::try_from_cloudevent_type(self.type_.clone())
62 }
63
64 fn set_type(&mut self, type_: UMessageType) {
65 self.type_ = type_.to_cloudevent_type();
66 }
67
68 fn get_source(&self) -> Result<UUri, UAttributesError> {
69 self.source
70 .parse::<UUri>()
71 .map_err(|e| UAttributesError::parsing_error(e.to_string()))
72 }
73
74 fn set_source<T: Into<String>>(&mut self, uri: T) {
75 self.source = uri.into();
76 }
77
78 fn get_sink(&self) -> Result<Option<UUri>, UAttributesError> {
79 self.attributes
80 .get(EXTENSION_NAME_SINK)
81 .map(|v| v.ce_uri_ref())
82 .map_or(Ok(None), |uri| {
83 uri.parse::<UUri>()
84 .map(Option::Some)
85 .map_err(|e| UAttributesError::parsing_error(e.to_string()))
86 })
87 }
88
89 fn set_sink<T: Into<String>>(&mut self, uri: T) {
90 let mut val = CloudEventAttributeValue::new();
91 val.set_ce_uri_ref(uri.into());
92 self.attributes.insert(EXTENSION_NAME_SINK.to_string(), val);
93 }
94
95 fn get_priority(&self) -> Result<UPriority, UAttributesError> {
96 self.attributes
97 .get(EXTENSION_NAME_PRIORITY)
98 .map(|v| v.ce_string())
99 .map_or(Ok(UPriority::default()), |v| {
100 UPriority::try_from_priority_code(v)
101 })
102 }
103
104 fn set_priority(&mut self, priority: UPriority) {
105 let mut val = CloudEventAttributeValue::new();
106 val.set_ce_string(priority.to_priority_code());
107 self.attributes
108 .insert(EXTENSION_NAME_PRIORITY.to_string(), val);
109 }
110
111 fn get_ttl(&self) -> Option<u32> {
112 self.attributes
113 .get(EXTENSION_NAME_TTL)
114 .map(|v| v.ce_integer() as u32)
115 }
116
117 fn set_ttl(&mut self, ttl: u32) -> Result<(), UAttributesError> {
118 let v = i32::try_from(ttl).map_err(|e| UAttributesError::parsing_error(e.to_string()))?;
119 let mut val = CloudEventAttributeValue::new();
120 val.set_ce_integer(v);
121 self.attributes.insert(EXTENSION_NAME_TTL.to_string(), val);
122 Ok(())
123 }
124
125 fn get_token(&self) -> Option<String> {
126 self.attributes
127 .get(EXTENSION_NAME_TOKEN)
128 .map(|val| val.ce_string().to_string())
129 }
130
131 fn set_token<T: Into<String>>(&mut self, token: T) {
132 let mut val = CloudEventAttributeValue::new();
133 val.set_ce_string(token.into());
134 self.attributes
135 .insert(EXTENSION_NAME_TOKEN.to_string(), val);
136 }
137
138 fn get_permission_level(&self) -> Option<u32> {
139 self.attributes
140 .get(EXTENSION_NAME_PERMISSION_LEVEL)
141 .map(|v| v.ce_integer() as u32)
142 }
143
144 fn set_permission_level(&mut self, level: u32) -> Result<(), UAttributesError> {
145 let v = i32::try_from(level).map_err(|e| UAttributesError::parsing_error(e.to_string()))?;
146 let mut val = CloudEventAttributeValue::new();
147 val.set_ce_integer(v);
148 self.attributes
149 .insert(EXTENSION_NAME_PERMISSION_LEVEL.to_string(), val);
150 Ok(())
151 }
152
153 fn get_request_id(&self) -> Result<Option<UUID>, UAttributesError> {
154 self.attributes
155 .get(EXTENSION_NAME_REQUEST_ID)
156 .map(|v| v.ce_string().to_owned())
157 .map_or(Ok(None), |v| {
158 v.parse::<UUID>()
159 .map(Option::Some)
160 .map_err(|e| UAttributesError::parsing_error(e.to_string()))
161 })
162 }
163
164 fn set_request_id(&mut self, id: &UUID) {
165 let mut val = CloudEventAttributeValue::new();
166 val.set_ce_string(id.to_hyphenated_string());
167 self.attributes
168 .insert(EXTENSION_NAME_REQUEST_ID.to_string(), val);
169 }
170
171 fn get_commstatus(&self) -> Option<UCode> {
172 self.attributes
173 .get(EXTENSION_NAME_COMMSTATUS)
174 .map(|val| val.ce_integer())
175 .and_then(UCode::from_i32)
176 }
177
178 fn set_commstatus(&mut self, status: UCode) {
179 if status != UCode::OK {
180 let mut val = CloudEventAttributeValue::new();
181 val.set_ce_integer(status.value());
182 self.attributes
183 .insert(EXTENSION_NAME_COMMSTATUS.to_string(), val);
184 }
185 }
186
187 fn get_traceparent(&self) -> Option<String> {
188 self.attributes
189 .get(EXTENSION_NAME_TRACEPARENT)
190 .map(|val| val.ce_string().to_string())
191 }
192
193 fn set_traceparent<T: Into<String>>(&mut self, traceparent: T) {
194 let mut val = CloudEventAttributeValue::new();
195 val.set_ce_string(traceparent.into());
196 self.attributes
197 .insert(EXTENSION_NAME_TRACEPARENT.to_string(), val);
198 }
199
200 fn get_payload_format(&self) -> Result<UPayloadFormat, UAttributesError> {
201 self.attributes
202 .get(EXTENSION_NAME_PFORMAT)
203 .map(|val| val.ce_integer())
204 .map_or(Ok(UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED), |format| {
205 UPayloadFormat::from_i32(format).ok_or(UAttributesError::ParsingError(
206 "unsupported payload format".to_string(),
207 ))
208 })
209 }
210
211 fn set_payload_format(&mut self, format: UPayloadFormat) {
212 if format != UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED {
213 let mut val = CloudEventAttributeValue::new();
214 val.set_ce_integer(format.value());
215 self.attributes
216 .insert(EXTENSION_NAME_PFORMAT.to_string(), val);
217 }
218 }
219}
220
221impl TryFrom<UMessage> for CloudEvent {
222 type Error = UMessageError;
223
224 fn try_from(message: UMessage) -> Result<Self, Self::Error> {
241 if message.attributes.as_ref().is_none() {
242 return Err(UMessageError::AttributesValidationError(
243 UAttributesError::ValidationError("message has no attributes".to_string()),
244 ));
245 };
246 let mut event = CloudEvent::new();
247 event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
248 if let Some(id) = message.id() {
249 event.set_id(id);
250 } else {
251 return Err(UMessageError::AttributesValidationError(
252 UAttributesError::ValidationError("message has no id".to_string()),
253 ));
254 }
255 if let Some(message_type) = message.type_() {
256 event.set_type(message_type);
257 } else {
258 return Err(UMessageError::AttributesValidationError(
259 UAttributesError::ValidationError("message has no type".to_string()),
260 ));
261 }
262 if let Some(source) = message.source() {
263 event.set_source(source);
264 } else {
265 return Err(UMessageError::AttributesValidationError(
266 UAttributesError::ValidationError("message has no source address".to_string()),
267 ));
268 }
269 if let Some(sink) = message.sink() {
270 event.set_sink(sink);
271 }
272 if let Some(priority) = message.priority() {
273 if priority != UPriority::UPRIORITY_UNSPECIFIED {
274 event.set_priority(priority);
275 }
276 } else {
277 return Err(UMessageError::AttributesValidationError(
278 UAttributesError::ValidationError("message has unsupported priority".to_string()),
279 ));
280 }
281 if let Some(ttl) = message.ttl() {
282 event.set_ttl(ttl)?;
283 }
284 if let Some(token) = message.token() {
285 event.set_token(token);
286 }
287 if let Some(plevel) = message.permission_level() {
288 event.set_permission_level(plevel)?;
289 }
290 if let Some(reqid) = message.request_id() {
291 event.set_request_id(reqid);
292 }
293 if let Some(commstatus) = message.commstatus() {
294 event.set_commstatus(commstatus);
295 }
296 if let Some(traceparent) = message.traceparent() {
297 event.set_traceparent(traceparent);
298 }
299 let payload_format = message.payload_format().unwrap_or_default();
300 if let Some(payload) = message.payload {
301 event.set_payload_format(payload_format);
302 match payload_format {
303 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF
304 | UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => {
305 let data = Any {
306 value: payload.to_vec(),
307 ..Default::default()
308 };
309 event.set_proto_data(data);
310 }
311 UPayloadFormat::UPAYLOAD_FORMAT_TEXT | UPayloadFormat::UPAYLOAD_FORMAT_JSON => {
312 let data = String::from_utf8(payload.to_vec())
313 .map(|v| v.to_string())
314 .map_err(|_e| {
315 UMessageError::PayloadError(
316 "failed to transform payload to string".to_string(),
317 )
318 })?;
319 event.set_text_data(data);
320 }
321 _ => {
322 event.set_binary_data(payload.into());
323 }
324 }
325 }
326 Ok(event)
327 }
328}
329
330impl TryFrom<CloudEvent> for UMessage {
331 type Error = UMessageError;
332
333 fn try_from(event: CloudEvent) -> Result<Self, Self::Error> {
344 if !CLOUDEVENTS_SPEC_VERSION.eq(&event.spec_version) {
345 let msg = format!("expected spec version 1.0 but found {}", event.spec_version);
346 return Err(UMessageError::AttributesValidationError(
347 UAttributesError::ValidationError(msg),
348 ));
349 }
350
351 let attributes = UAttributes {
352 commstatus: event.get_commstatus().map(EnumOrUnknown::from),
353 id: MessageField::from_option(Some(event.get_id()?)),
354 type_: EnumOrUnknown::from(event.get_type()?),
355 source: MessageField::from_option(Some(event.get_source()?)),
356 sink: MessageField::from_option(event.get_sink()?),
357 priority: EnumOrUnknown::from(event.get_priority()?),
358 ttl: event.get_ttl(),
359 permission_level: event.get_permission_level(),
360 reqid: MessageField::from_option(event.get_request_id()?),
361 token: event.get_token(),
362 traceparent: event.get_traceparent(),
363 payload_format: event.get_payload_format().map(EnumOrUnknown::from)?,
364 ..Default::default()
365 };
366 UAttributesValidators::get_validator_for_attributes(&attributes).validate(&attributes)?;
367
368 let payload = if event.has_binary_data() {
369 Some(Bytes::copy_from_slice(event.binary_data()))
370 } else if event.has_text_data() {
371 Some(event.text_data().to_owned().into())
372 } else if event.has_proto_data() {
373 Some(event.proto_data().value.to_vec().into())
374 } else {
375 None
376 };
377
378 Ok(UMessage {
379 attributes: Some(attributes).into(),
380 payload,
381 ..Default::default()
382 })
383 }
384}
385
386#[cfg(test)]
387mod tests {
388 use std::str::FromStr;
389
390 use cloudevents::CloudEvent;
391 use protobuf::{well_known_types::wrappers::StringValue, Message};
392
393 use crate::UMessageBuilder;
394
395 use super::*;
396
397 const MESSAGE_ID: &str = "00000000-0001-7000-8010-101010101a1a";
398 const TOPIC: &str = "//my-vehicle/A81B/1/A9BA";
399 const METHOD: &str = "//my-vehicle/A000/2/1";
400 const REPLY_TO: &str = "//my-vehicle/A81B/1/0";
401 const DESTINATION: &str = "//my-vehicle/A000/2/0";
402 const PERMISSION_LEVEL: u32 = 5;
403 const PRIORITY: UPriority = UPriority::UPRIORITY_CS4;
404 const TTL: u32 = 15_000;
405 const TRACEPARENT: &str = "traceparent";
406 const DATA: [u8; 4] = [0x00, 0x01, 0x02, 0x03];
407
408 fn assert_standard_cloudevent_attributes(
414 event: &CloudEvent,
415 message_type: &str,
416 source: &str,
417 sink: Option<String>,
418 ) {
419 assert_eq!(event.spec_version, CLOUDEVENTS_SPEC_VERSION);
420 assert_eq!(event.type_, message_type);
421 assert_eq!(event.id, MESSAGE_ID);
422 assert_eq!(event.source.as_str(), source);
423 assert_eq!(
424 event
425 .attributes
426 .get(EXTENSION_NAME_SINK)
427 .map(|v| v.ce_uri_ref().to_owned()),
428 sink
429 );
430 assert_eq!(
431 event
432 .attributes
433 .get(EXTENSION_NAME_PRIORITY)
434 .map(|v| v.ce_string().to_owned()),
435 Some(PRIORITY.to_priority_code())
436 );
437 assert_eq!(
438 event
439 .attributes
440 .get(EXTENSION_NAME_TTL)
441 .map(|v| v.ce_integer() as u32),
442 Some(TTL),
443 "unexpected TTL"
444 );
445 assert_eq!(
446 event
447 .attributes
448 .get(EXTENSION_NAME_TRACEPARENT)
449 .map(|v| v.ce_string()),
450 Some(TRACEPARENT)
451 );
452 }
453
454 #[test]
455 fn test_try_from_publish_message_succeeds() {
456 let message_id = MESSAGE_ID
457 .parse::<UUID>()
458 .expect("failed to parse message ID");
459 let message =
460 UMessageBuilder::publish(UUri::from_str(TOPIC).expect("failed to create topic URI"))
461 .with_message_id(message_id)
462 .with_priority(PRIORITY)
463 .with_ttl(TTL)
464 .with_traceparent(TRACEPARENT)
465 .build_with_payload("test".as_bytes(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
466 .expect("failed to create message");
467
468 let event =
469 CloudEvent::try_from(message).expect("failed to create CloudEvent from UMessage");
470 assert_standard_cloudevent_attributes(&event, "up-pub.v1", TOPIC, None);
471 assert_eq!(
472 event
473 .attributes
474 .get(EXTENSION_NAME_PFORMAT)
475 .map(|v| v.ce_integer()),
476 Some(UPayloadFormat::UPAYLOAD_FORMAT_TEXT.value())
477 );
478 assert_eq!(event.text_data(), "test");
479 }
480
481 #[test]
482 fn test_try_from_notification_message_succeeds() {
483 let message_id = MESSAGE_ID
484 .parse::<UUID>()
485 .expect("failed to parse message ID");
486 let message = UMessageBuilder::notification(
487 UUri::from_str(TOPIC).expect("failed to create source URI"),
488 UUri::from_str(DESTINATION).expect("failed to create sink URI"),
489 )
490 .with_message_id(message_id)
491 .with_priority(PRIORITY)
492 .with_ttl(TTL)
493 .with_traceparent(TRACEPARENT)
494 .build_with_payload(
495 "{\"count\": 5}".as_bytes(),
496 UPayloadFormat::UPAYLOAD_FORMAT_JSON,
497 )
498 .expect("failed to create message");
499
500 let event =
501 CloudEvent::try_from(message).expect("failed to create CloudEvent from UMessage");
502 assert_standard_cloudevent_attributes(
503 &event,
504 "up-not.v1",
505 TOPIC,
506 Some(DESTINATION.to_string()),
507 );
508 assert_eq!(
509 event
510 .attributes
511 .get(EXTENSION_NAME_PFORMAT)
512 .map(|v| v.ce_integer()),
513 Some(UPayloadFormat::UPAYLOAD_FORMAT_JSON.value())
514 );
515 assert_eq!(event.text_data(), "{\"count\": 5}");
516 }
517
518 #[test]
519 fn test_try_from_request_message_succeeds() {
520 let mut payload = StringValue::new();
521 payload.value = "Hello".into();
522
523 let message_id = MESSAGE_ID
524 .parse::<UUID>()
525 .expect("failed to parse message ID");
526 let token = "my-token";
527 let message = UMessageBuilder::request(
528 UUri::from_str(METHOD).expect("failed to create sink URI"),
529 UUri::from_str(REPLY_TO).expect("failed to create source URI"),
530 TTL,
531 )
532 .with_message_id(message_id)
533 .with_priority(PRIORITY)
534 .with_permission_level(PERMISSION_LEVEL)
535 .with_traceparent(TRACEPARENT)
536 .with_token(token)
537 .build_with_wrapped_protobuf_payload(&payload)
538 .expect("failed to create message");
539 let event =
540 CloudEvent::try_from(message).expect("failed to create CloudEvent from UMessage");
541 assert_standard_cloudevent_attributes(
542 &event,
543 "up-req.v1",
544 REPLY_TO,
545 Some(METHOD.to_string()),
546 );
547 assert_eq!(
548 event
549 .attributes
550 .get(EXTENSION_NAME_TOKEN)
551 .map(|v| v.ce_string()),
552 Some(token)
553 );
554 assert_eq!(
555 event
556 .attributes
557 .get(EXTENSION_NAME_PERMISSION_LEVEL)
558 .map(|v| v.ce_integer()),
559 Some(PERMISSION_LEVEL as i32)
560 );
561 assert_eq!(
562 event
563 .attributes
564 .get(EXTENSION_NAME_PFORMAT)
565 .map(|v| v.ce_integer()),
566 Some(UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY.value())
567 );
568 assert!(!event.has_binary_data());
569 assert!(!event.has_text_data());
570 let payload_wrapped_in_any = Any::pack(&payload).expect("failed to wrap payload in Any");
571 assert_eq!(
572 event.proto_data().value,
573 Any::pack(&payload_wrapped_in_any)
574 .expect("failed to pack payload into Any")
575 .value
576 );
577 }
578
579 #[test]
580 fn test_try_from_response_message_succeeds() {
581 let mut payload = StringValue::new();
582 payload.value = "Hello".into();
583
584 let message_id = MESSAGE_ID
585 .parse::<UUID>()
586 .expect("failed to parse message ID");
587 let request_id = UUID::build();
588
589 let message = UMessageBuilder::response(
590 UUri::from_str(REPLY_TO).expect("failed to create sink URI"),
591 request_id.clone(),
592 UUri::from_str(METHOD).expect("failed to create source URI"),
593 )
594 .with_message_id(message_id)
595 .with_ttl(TTL)
596 .with_priority(PRIORITY)
597 .with_comm_status(UCode::OK)
598 .with_traceparent(TRACEPARENT)
599 .build_with_protobuf_payload(&payload)
600 .expect("failed to create message");
601
602 let event =
603 CloudEvent::try_from(message).expect("failed to create CloudEvent from UMessage");
604 assert_standard_cloudevent_attributes(
605 &event,
606 "up-res.v1",
607 METHOD,
608 Some(REPLY_TO.to_string()),
609 );
610 assert_eq!(
611 event
612 .attributes
613 .get(EXTENSION_NAME_COMMSTATUS)
614 .map(|v| v.ce_integer()),
615 None
616 );
617 assert_eq!(
618 event
619 .attributes
620 .get(EXTENSION_NAME_PFORMAT)
621 .map(|v| v.ce_integer()),
622 Some(UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF.value())
623 );
624 assert!(!event.has_binary_data());
625 assert!(!event.has_text_data());
626 assert_eq!(
627 event.proto_data().value,
628 Any::pack(&payload)
629 .expect("failed to pack payload into Any")
630 .value
631 );
632 }
633
634 fn assert_standard_umessage_attributes(
640 attribs: &UAttributes,
641 message_type: UMessageType,
642 source: &str,
643 sink: Option<String>,
644 ) {
645 assert_eq!(attribs.type_.enum_value_or_default(), message_type);
646 assert_eq!(
647 attribs.id.get_or_default().to_hyphenated_string(),
648 MESSAGE_ID
649 );
650 assert_eq!(attribs.source.get_or_default().to_uri(false), source);
651 assert_eq!(attribs.sink.as_ref().map(|uuri| uuri.to_uri(false)), sink);
652 assert_eq!(
653 attribs.priority.enum_value_or_default(),
654 UPriority::UPRIORITY_CS4
655 );
656 assert_eq!(attribs.ttl, Some(TTL));
657 assert_eq!(attribs.traceparent, Some(TRACEPARENT.to_string()));
658 }
659
660 #[test]
661 fn test_try_from_cloudevent_without_sink_fails() {
662 let mut event = CloudEvent::new();
663 event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
664 event.type_ = UMessageType::UMESSAGE_TYPE_NOTIFICATION.to_cloudevent_type();
665 event.id = MESSAGE_ID.into();
666 event.source = TOPIC.into();
667
668 assert!(UMessage::try_from(event).is_err());
669 }
670
671 #[test]
672 fn test_try_from_publish_cloudevent_succeeds() {
673 let mut event = CloudEvent::new();
674 event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
675 event.set_type(UMessageType::UMESSAGE_TYPE_PUBLISH);
676 event.id = MESSAGE_ID.into();
677 event.source = TOPIC.into();
678 event.set_priority(UPriority::UPRIORITY_CS4);
679 event.set_ttl(TTL).expect("failed to set TTL on message");
680 event.set_traceparent(TRACEPARENT);
681 event.set_payload_format(UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
682 event.set_text_data("test".to_string());
683
684 let umessage =
685 UMessage::try_from(event).expect("failed to create UMessage from CloudEvent");
686 let attribs = umessage.attributes.get_or_default();
687 assert_standard_umessage_attributes(
688 attribs,
689 UMessageType::UMESSAGE_TYPE_PUBLISH,
690 TOPIC,
691 None,
692 );
693 assert_eq!(
694 attribs.payload_format.enum_value_or_default(),
695 UPayloadFormat::UPAYLOAD_FORMAT_TEXT
696 );
697 assert_eq!(umessage.payload, Some("test".as_bytes().to_vec().into()))
698 }
699
700 #[test]
701 fn test_try_from_notification_cloudevent_succeeds() {
702 let mut event = CloudEvent::new();
703 event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
704 event.set_type(UMessageType::UMESSAGE_TYPE_NOTIFICATION);
705 event.id = MESSAGE_ID.into();
706 event.source = TOPIC.into();
707 event.set_sink(DESTINATION);
708 event.set_priority(UPriority::UPRIORITY_CS4);
709 event.set_ttl(TTL).expect("failed to set TTL on message");
710 event.set_traceparent(TRACEPARENT);
711 event.set_payload_format(UPayloadFormat::UPAYLOAD_FORMAT_JSON);
712 event.set_text_data("{\"count\": 5}".to_string());
713
714 let umessage =
715 UMessage::try_from(event).expect("failed to create UMessage from CloudEvent");
716 let attribs = umessage.attributes.get_or_default();
717 assert_standard_umessage_attributes(
718 attribs,
719 UMessageType::UMESSAGE_TYPE_NOTIFICATION,
720 TOPIC,
721 Some(DESTINATION.to_string()),
722 );
723 assert_eq!(
724 attribs.payload_format.enum_value_or_default(),
725 UPayloadFormat::UPAYLOAD_FORMAT_JSON
726 );
727 assert_eq!(
728 umessage.payload,
729 Some("{\"count\": 5}".as_bytes().to_vec().into())
730 )
731 }
732
733 #[test]
734 fn test_try_from_request_cloudevent_succeeds() {
735 let mut event = CloudEvent::new();
736 event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
737 event.set_type(UMessageType::UMESSAGE_TYPE_REQUEST);
738 event.id = MESSAGE_ID.into();
739 event.source = REPLY_TO.into();
740 event.set_sink(METHOD);
741 event.set_priority(UPriority::UPRIORITY_CS4);
742 event.set_ttl(TTL).expect("failed to set TTL on message");
743 event.set_traceparent(TRACEPARENT);
744 event
745 .set_permission_level(PERMISSION_LEVEL)
746 .expect("failed to set permission level on message");
747 event.set_token("my-token");
748
749 let mut payload = StringValue::new();
750 payload.value = "Hello".into();
751 let payload_wrapped_in_any = Any::pack(&payload).expect("failed to wrap payload in Any");
752 let serialized_payload = payload_wrapped_in_any
753 .write_to_bytes()
754 .expect("failed to serialize payload");
755 event.set_proto_data(Any {
756 value: serialized_payload.clone(),
757 ..Default::default()
758 });
759
760 let umessage =
761 UMessage::try_from(event).expect("failed to create UMessage from CloudEvent");
762 let attribs = umessage.attributes.get_or_default();
763 assert_standard_umessage_attributes(
764 attribs,
765 UMessageType::UMESSAGE_TYPE_REQUEST,
766 REPLY_TO,
767 Some(METHOD.to_string()),
768 );
769 assert_eq!(attribs.permission_level, Some(PERMISSION_LEVEL));
770 assert_eq!(attribs.token, Some("my-token".to_string()));
771 assert_eq!(
772 attribs.payload_format.enum_value_or_default(),
773 UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED
774 );
775 assert_eq!(umessage.payload, Some(serialized_payload.into()));
776 }
777
778 #[test]
779 fn test_try_from_response_cloudevent_succeeds() {
780 let request_id = UUID::build();
781 let mut event = CloudEvent::new();
782 event.spec_version = CLOUDEVENTS_SPEC_VERSION.into();
783 event.set_type(UMessageType::UMESSAGE_TYPE_RESPONSE);
784 event.id = MESSAGE_ID.into();
785 event.source = METHOD.into();
786 event.set_sink(REPLY_TO);
787 event.set_priority(UPriority::UPRIORITY_CS4);
788 event.set_ttl(TTL).expect("failed to set TTL on message");
789 event.set_traceparent(TRACEPARENT);
790 event.set_request_id(&request_id);
791 event.set_commstatus(UCode::OK);
792 event.set_payload_format(UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF);
793 event.set_proto_data(Any {
794 value: DATA.to_vec(),
795 ..Default::default()
796 });
797
798 let umessage =
799 UMessage::try_from(event).expect("failed to create UMessage from CloudEvent");
800 let attribs = umessage.attributes.get_or_default();
801 assert_standard_umessage_attributes(
802 attribs,
803 UMessageType::UMESSAGE_TYPE_RESPONSE,
804 METHOD,
805 Some(REPLY_TO.to_string()),
806 );
807 assert_eq!(attribs.commstatus, None);
808 assert_eq!(attribs.reqid, Some(request_id).into());
809 assert_eq!(
810 attribs.payload_format.enum_value_or_default(),
811 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF
812 );
813 assert_eq!(umessage.payload, Some(DATA.to_vec().into()))
814 }
815}