up_rust/umessage/umessagebuilder.rs
1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use bytes::Bytes;
15use protobuf::{well_known_types::any::Any, Enum, EnumOrUnknown, Message, MessageFull};
16
17use crate::uattributes::NotificationValidator;
18use crate::{
19 PublishValidator, RequestValidator, ResponseValidator, UAttributes, UAttributesValidator,
20 UCode, UMessage, UMessageError, UMessageType, UPayloadFormat, UPriority, UUri, UUID,
21};
22
23const PRIORITY_DEFAULT: UPriority = UPriority::UPRIORITY_CS1;
24
25/// A builder for creating [`UMessage`]s.
26///
27/// Messages are being used by a uEntity to inform other entities about the occurrence of events
28/// and/or to invoke service operations provided by other entities.
29pub struct UMessageBuilder {
30 comm_status: Option<EnumOrUnknown<UCode>>,
31 message_id: Option<UUID>,
32 message_type: UMessageType,
33 payload: Option<Bytes>,
34 payload_format: UPayloadFormat,
35 permission_level: Option<u32>,
36 priority: UPriority,
37 request_id: Option<UUID>,
38 sink: Option<UUri>,
39 source: Option<UUri>,
40 token: Option<String>,
41 traceparent: Option<String>,
42 ttl: Option<u32>,
43 validator: Box<dyn UAttributesValidator>,
44}
45
46impl Default for UMessageBuilder {
47 fn default() -> Self {
48 UMessageBuilder {
49 comm_status: None,
50 message_id: None,
51 message_type: UMessageType::UMESSAGE_TYPE_UNSPECIFIED,
52 payload: None,
53 payload_format: UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED,
54 permission_level: None,
55 priority: UPriority::UPRIORITY_UNSPECIFIED,
56 request_id: None,
57 sink: None,
58 source: None,
59 token: None,
60 traceparent: None,
61 ttl: None,
62 validator: Box::new(PublishValidator),
63 }
64 }
65}
66
67impl UMessageBuilder {
68 /// Gets a builder for creating *publish* messages.
69 ///
70 /// A publish message is used to notify all interested consumers of an event that has occurred.
71 /// Consumers usually indicate their interest by *subscribing* to a particular topic.
72 ///
73 /// # Arguments
74 ///
75 /// * `topic` - The topic to publish the message to.
76 ///
77 /// # Examples
78 ///
79 /// ```rust
80 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
81 ///
82 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
83 /// let topic = UUri::try_from("//my-vehicle/4210/1/B24D")?;
84 /// let message = UMessageBuilder::publish(topic.clone())
85 /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
86 /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_PUBLISH.into());
87 /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_UNSPECIFIED.into());
88 /// assert_eq!(message.attributes.source, Some(topic).into());
89 /// # Ok(())
90 /// # }
91 /// ```
92 pub fn publish(topic: UUri) -> UMessageBuilder {
93 UMessageBuilder {
94 validator: Box::new(PublishValidator),
95 message_type: UMessageType::UMESSAGE_TYPE_PUBLISH,
96 source: Some(topic),
97 ..Default::default()
98 }
99 }
100
101 /// Gets a builder for creating *notification* messages.
102 ///
103 /// A notification is used to inform a specific consumer about an event that has occurred.
104 ///
105 /// # Arguments
106 ///
107 /// * `origin` - The component that the notification originates from.
108 /// * `destination` - The URI identifying the destination to send the notification to.
109 ///
110 /// # Examples
111 ///
112 /// ```rust
113 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
114 ///
115 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
116 /// let origin = UUri::try_from("//my-vehicle/4210/5/F20B")?;
117 /// let destination = UUri::try_from("//my-cloud/CCDD/2/0")?;
118 /// let message = UMessageBuilder::notification(origin.clone(), destination.clone())
119 /// .build_with_payload("unexpected movement", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
120 /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_NOTIFICATION.into());
121 /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_UNSPECIFIED.into());
122 /// assert_eq!(message.attributes.source, Some(origin).into());
123 /// assert_eq!(message.attributes.sink, Some(destination).into());
124 /// # Ok(())
125 /// # }
126 /// ```
127 pub fn notification(origin: UUri, destination: UUri) -> UMessageBuilder {
128 UMessageBuilder {
129 validator: Box::new(NotificationValidator),
130 message_type: UMessageType::UMESSAGE_TYPE_NOTIFICATION,
131 source: Some(origin),
132 sink: Some(destination),
133 ..Default::default()
134 }
135 }
136
137 /// Gets a builder for creating RPC *request* messages.
138 ///
139 /// A request message is used to invoke a service's method with some input data, expecting
140 /// the service to reply with a response message which is correlated by means of the `request_id`.
141 ///
142 /// The builder will be initialized with [`UPriority::UPRIORITY_CS4`].
143 ///
144 /// # Arguments
145 ///
146 /// * `method_to_invoke` - The URI identifying the method to invoke.
147 /// * `reply_to_address` - The URI that the sender of the request expects the response message at.
148 /// * `ttl` - The number of milliseconds after which the request should no longer be processed
149 /// by the target service. The value is capped at [`i32::MAX`].
150 ///
151 /// # Examples
152 ///
153 /// ```rust
154 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
155 ///
156 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
157 /// let method_to_invoke = UUri::try_from("//my-vehicle/4210/5/64AB")?;
158 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
159 /// let message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000)
160 /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
161 /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_REQUEST.into());
162 /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS4.into());
163 /// assert_eq!(message.attributes.source, Some(reply_to_address).into());
164 /// assert_eq!(message.attributes.sink, Some(method_to_invoke).into());
165 /// assert_eq!(message.attributes.ttl, Some(5000));
166 /// # Ok(())
167 /// # }
168 /// ```
169 pub fn request(method_to_invoke: UUri, reply_to_address: UUri, ttl: u32) -> UMessageBuilder {
170 UMessageBuilder {
171 validator: Box::new(RequestValidator),
172 message_type: UMessageType::UMESSAGE_TYPE_REQUEST,
173 source: Some(reply_to_address),
174 sink: Some(method_to_invoke),
175 ttl: Some(ttl),
176 priority: UPriority::UPRIORITY_CS4,
177 ..Default::default()
178 }
179 }
180
181 /// Gets a builder for creating RPC *response* messages.
182 ///
183 /// A response message is used to send the outcome of processing a request message
184 /// to the original sender of the request message.
185 ///
186 /// The builder will be initialized with [`UPriority::UPRIORITY_CS4`].
187 ///
188 /// # Arguments
189 ///
190 /// * `reply_to_address` - The URI that the sender of the request expects to receive the response message at.
191 /// * `request_id` - The identifier of the request that this is the response to.
192 /// * `invoked_method` - The URI identifying the method that has been invoked and which the created message is
193 /// the outcome of.
194 ///
195 /// # Examples
196 ///
197 /// ```rust
198 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUID, UUri};
199 ///
200 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
201 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
202 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
203 /// let request_id = UUID::build();
204 /// // a service implementation would normally use
205 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
206 /// let message = UMessageBuilder::response(reply_to_address.clone(), request_id.clone(), invoked_method.clone())
207 /// .build()?;
208 /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_RESPONSE.into());
209 /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS4.into());
210 /// assert_eq!(message.attributes.source, Some(invoked_method).into());
211 /// assert_eq!(message.attributes.sink, Some(reply_to_address).into());
212 /// assert_eq!(message.attributes.reqid, Some(request_id).into());
213 /// # Ok(())
214 /// # }
215 /// ```
216 pub fn response(
217 reply_to_address: UUri,
218 request_id: UUID,
219 invoked_method: UUri,
220 ) -> UMessageBuilder {
221 UMessageBuilder {
222 validator: Box::new(ResponseValidator),
223 message_type: UMessageType::UMESSAGE_TYPE_RESPONSE,
224 source: Some(invoked_method),
225 sink: Some(reply_to_address),
226 request_id: Some(request_id),
227 priority: UPriority::UPRIORITY_CS4,
228 ..Default::default()
229 }
230 }
231
232 /// Gets a builder for creating RPC *response* messages in reply to a *request*.
233 ///
234 /// A response message is used to send the outcome of processing a request message
235 /// to the original sender of the request message.
236 ///
237 /// The builder will be initialized with values from the given request attributes.
238 ///
239 /// # Arguments
240 ///
241 /// * `request_attributes` - The attributes from the request message. The response message builder will be initialized
242 /// with the corresponding attribute values.
243 ///
244 /// # Examples
245 ///
246 /// ```rust
247 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUID, UUri};
248 ///
249 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
250 /// let method_to_invoke = UUri::try_from("//my-vehicle/4210/5/64AB")?;
251 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
252 /// let request_message_id = UUID::build();
253 /// let request_message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000)
254 /// .with_message_id(request_message_id.clone()) // normally not needed, used only for asserts below
255 /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
256 ///
257 /// let response_message = UMessageBuilder::response_for_request(&request_message.attributes)
258 /// .with_priority(UPriority::UPRIORITY_CS5)
259 /// .build()?;
260 /// assert_eq!(response_message.attributes.type_, UMessageType::UMESSAGE_TYPE_RESPONSE.into());
261 /// assert_eq!(response_message.attributes.priority, UPriority::UPRIORITY_CS5.into());
262 /// assert_eq!(response_message.attributes.source, Some(method_to_invoke).into());
263 /// assert_eq!(response_message.attributes.sink, Some(reply_to_address).into());
264 /// assert_eq!(response_message.attributes.reqid, Some(request_message_id).into());
265 /// # Ok(())
266 /// # }
267 /// ```
268 pub fn response_for_request(request_attributes: &UAttributes) -> UMessageBuilder {
269 UMessageBuilder {
270 validator: Box::new(ResponseValidator),
271 message_type: UMessageType::UMESSAGE_TYPE_RESPONSE,
272 source: request_attributes.sink.as_ref().cloned(),
273 sink: request_attributes.source.as_ref().cloned(),
274 request_id: request_attributes.id.as_ref().cloned(),
275 priority: request_attributes
276 .priority
277 .enum_value_or(UPriority::UPRIORITY_CS4),
278 ..Default::default()
279 }
280 }
281
282 /// Sets the message's identifier.
283 ///
284 /// Every message must have an identifier. If this function is not used, an identifier will be
285 /// generated and set on the message when one of the `build` functions is called on the
286 /// `UMessageBuilder`.
287 ///
288 /// It's more typical to _not_ use this function, but could have edge case uses.
289 ///
290 /// # Arguments
291 ///
292 /// * `message_id` - The identifier to use.
293 ///
294 /// # Returns
295 ///
296 /// The builder.
297 ///
298 /// # Panics
299 ///
300 /// Panics if the given UUID is not a [valid uProtocol UUID](`UUID::is_uprotocol_uuid`).
301 ///
302 /// # Examples
303 ///
304 /// ```rust
305 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUID, UUri};
306 ///
307 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
308 /// let topic = UUri::try_from("//my-vehicle/4210/1/B24D")?;
309 /// let mut builder = UMessageBuilder::publish(topic);
310 /// builder.with_priority(UPriority::UPRIORITY_CS2);
311 /// let message_one = builder
312 /// .with_message_id(UUID::build())
313 /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
314 /// let message_two = builder
315 /// // use new message ID but retain all other attributes
316 /// .with_message_id(UUID::build())
317 /// .build_with_payload("open", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
318 /// assert_ne!(message_one.attributes.id, message_two.attributes.id);
319 /// assert_eq!(message_one.attributes.source, message_two.attributes.source);
320 /// assert_eq!(message_one.attributes.priority, UPriority::UPRIORITY_CS2.into());
321 /// assert_eq!(message_two.attributes.priority, UPriority::UPRIORITY_CS2.into());
322 /// # Ok(())
323 /// # }
324 /// ```
325 pub fn with_message_id(&mut self, message_id: UUID) -> &mut UMessageBuilder {
326 assert!(
327 message_id.is_uprotocol_uuid(),
328 "Message ID must be a valid uProtocol UUID"
329 );
330 self.message_id = Some(message_id);
331 self
332 }
333
334 /// Sets the message's priority.
335 ///
336 /// If not set explicitly, the default priority as defined in the
337 /// [uProtocol specification](https://github.com/eclipse-uprotocol/up-spec/blob/main/basics/qos.adoc)
338 /// is used.
339 ///
340 /// # Arguments
341 ///
342 /// * `priority` - The priority to be used for sending the message.
343 ///
344 /// # Returns
345 ///
346 /// The builder.
347 ///
348 /// # Panics
349 ///
350 /// if the builder is used for creating an RPC message but the given priority is less than CS4.
351 ///
352 /// # Examples
353 ///
354 /// ```rust
355 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
356 ///
357 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
358 /// let topic = UUri::try_from("//my-vehicle/4210/1/B24D")?;
359 /// let message = UMessageBuilder::publish(topic)
360 /// .with_priority(UPriority::UPRIORITY_CS5)
361 /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
362 /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS5.into());
363 /// # Ok(())
364 /// # }
365 /// ```
366 pub fn with_priority(&mut self, priority: UPriority) -> &mut UMessageBuilder {
367 if self.message_type == UMessageType::UMESSAGE_TYPE_REQUEST
368 || self.message_type == UMessageType::UMESSAGE_TYPE_RESPONSE
369 {
370 assert!(priority.value() >= UPriority::UPRIORITY_CS4.value())
371 }
372 if priority != PRIORITY_DEFAULT {
373 // only set priority explicitly if it differs from the default priority
374 self.priority = priority;
375 } else {
376 // in all other cases set to UNSPECIFIED which will result in the
377 // priority not being included in the serialized protobuf
378 self.priority = UPriority::UPRIORITY_UNSPECIFIED;
379 }
380 self
381 }
382
383 /// Sets the message's time-to-live.
384 ///
385 /// # Arguments
386 ///
387 /// * `ttl` - The time-to-live in milliseconds. The value is capped at [`i32::MAX`].
388 ///
389 /// # Returns
390 ///
391 /// The builder.
392 ///
393 /// # Examples
394 ///
395 /// ```rust
396 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUID, UUri};
397 ///
398 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
399 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
400 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
401 /// let request_msg_id = UUID::build();
402 /// // a service implementation would normally use
403 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
404 /// let message = UMessageBuilder::response(reply_to_address, request_msg_id, invoked_method)
405 /// .with_ttl(2000)
406 /// .build()?;
407 /// assert_eq!(message.attributes.ttl, Some(2000));
408 /// # Ok(())
409 /// # }
410 /// ```
411 pub fn with_ttl(&mut self, ttl: u32) -> &mut UMessageBuilder {
412 self.ttl = Some(ttl);
413 self
414 }
415
416 /// Sets the message's authorization token used for TAP.
417 ///
418 /// # Arguments
419 ///
420 /// * `token` - The token.
421 ///
422 /// # Returns
423 ///
424 /// The builder.
425 ///
426 /// # Panics
427 ///
428 /// * if the message is not an RPC request message
429 ///
430 /// # Examples
431 ///
432 /// ```rust
433 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
434 ///
435 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
436 /// let method_to_invoke = UUri::try_from("//my-vehicle/4210/5/64AB")?;
437 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
438 /// let token = String::from("this-is-my-token");
439 /// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000)
440 /// .with_token(token.clone())
441 /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
442 /// assert_eq!(message.attributes.token, Some(token));
443 /// # Ok(())
444 /// # }
445 /// ```
446 pub fn with_token<T: Into<String>>(&mut self, token: T) -> &mut UMessageBuilder {
447 assert!(self.message_type == UMessageType::UMESSAGE_TYPE_REQUEST);
448 self.token = Some(token.into());
449 self
450 }
451
452 /// Sets the message's permission level.
453 ///
454 /// # Arguments
455 ///
456 /// * `level` - The level.
457 ///
458 /// # Returns
459 ///
460 /// The builder.
461 ///
462 /// # Panics
463 ///
464 /// * if the given level is greater than [`i32::MAX`]
465 /// * if the message is not an RPC request message
466 ///
467 /// # Examples
468 ///
469 /// ```rust
470 /// use up_rust::{UCode, UMessageBuilder, UPayloadFormat, UPriority, UUri};
471 ///
472 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
473 /// let method_to_invoke = UUri::try_from("//my-vehicle/4210/5/64AB")?;
474 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
475 /// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000)
476 /// .with_permission_level(12)
477 /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
478 /// assert_eq!(message.attributes.permission_level, Some(12));
479 /// # Ok(())
480 /// # }
481 /// ```
482 pub fn with_permission_level(&mut self, level: u32) -> &mut UMessageBuilder {
483 assert!(self.message_type == UMessageType::UMESSAGE_TYPE_REQUEST);
484 self.permission_level = Some(level);
485 self
486 }
487
488 /// Sets the message's communication status.
489 ///
490 /// # Arguments
491 ///
492 /// * `comm_status` - The status.
493 ///
494 /// # Returns
495 ///
496 /// The builder.
497 ///
498 /// # Panics
499 ///
500 /// * if the message is not an RPC response message
501 ///
502 /// # Examples
503 ///
504 /// ```rust
505 /// use up_rust::{UCode, UMessageBuilder, UPriority, UUID, UUri};
506 ///
507 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
508 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
509 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
510 /// let request_msg_id = UUID::build();
511 /// // a service implementation would normally use
512 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
513 /// let message = UMessageBuilder::response(reply_to_address, request_msg_id, invoked_method)
514 /// .with_comm_status(UCode::OK)
515 /// .build()?;
516 /// assert_eq!(message.attributes.commstatus, Some(UCode::OK.into()));
517 /// # Ok(())
518 /// # }
519 /// ```
520 pub fn with_comm_status(&mut self, comm_status: UCode) -> &mut UMessageBuilder {
521 assert!(self.message_type == UMessageType::UMESSAGE_TYPE_RESPONSE);
522 self.comm_status = Some(comm_status.into());
523 self
524 }
525
526 /// Sets the identifier of the W3C Trace Context to convey in the message.
527 ///
528 /// # Arguments
529 ///
530 /// * `traceparent` - The identifier.
531 ///
532 /// # Returns
533 ///
534 /// The builder.
535 ///
536 /// # Examples
537 ///
538 /// ```rust
539 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
540 ///
541 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
542 /// let topic = UUri::try_from("//my-vehicle/4210/1/B24D")?;
543 /// let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
544 /// let message = UMessageBuilder::publish(topic.clone())
545 /// .with_traceparent(traceparent)
546 /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
547 /// assert_eq!(message.attributes.traceparent, Some(traceparent.to_string()));
548 /// # Ok(())
549 /// # }
550 pub fn with_traceparent<T: Into<String>>(&mut self, traceparent: T) -> &mut UMessageBuilder {
551 self.traceparent = Some(traceparent.into());
552 self
553 }
554
555 /// Creates the message based on the builder's state.
556 ///
557 /// # Returns
558 ///
559 /// A message ready to be sent using [`crate::UTransport::send`].
560 ///
561 /// # Errors
562 ///
563 /// If the properties set on the builder do not represent a consistent set of [`UAttributes`],
564 /// a [`UMessageError::AttributesValidationError`] is returned.
565 ///
566 /// # Examples
567 ///
568 /// ## Not setting `id` explicitly with [`UMessageBuilder::with_message_id()']
569 ///
570 /// The recommended way to use the `UMessageBuilder`.
571 ///
572 /// ```rust
573 /// use up_rust::{UAttributes, UAttributesValidators, UMessageBuilder, UMessageError, UMessageType, UPriority, UUID, UUri};
574 ///
575 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
576 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
577 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
578 /// // a service implementation would normally use
579 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
580 /// let result = UMessageBuilder::response(reply_to_address, UUID::build(), invoked_method)
581 /// .build();
582 /// assert!(result.is_ok());
583 /// # Ok(())
584 /// # }
585 /// ```
586 ///
587 /// ## Setting `id` explicitly with [`UMessageBuilder::with_message_id()']
588 ///
589 /// Note that explicitly using [`UMessageBuilder::with_message_id()'] is not required as shown
590 /// above.
591 ///
592 /// ```rust
593 /// use up_rust::{UMessageBuilder, UMessageType, UPriority, UUID, UUri};
594 ///
595 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
596 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
597 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
598 /// let message_id = UUID::build();
599 /// // a service implementation would normally use
600 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
601 /// let message = UMessageBuilder::response(reply_to_address, UUID::build(), invoked_method)
602 /// .with_message_id(message_id.clone())
603 /// .build()?;
604 /// assert_eq!(message.attributes.id, Some(message_id).into());
605 /// # Ok(())
606 /// # }
607 /// ```
608 pub fn build(&self) -> Result<UMessage, UMessageError> {
609 let message_id = self
610 .message_id
611 .clone()
612 .map_or_else(|| Some(UUID::build()), Some);
613 let attributes = UAttributes {
614 commstatus: self.comm_status,
615 id: message_id.into(),
616 payload_format: self.payload_format.into(),
617 permission_level: self.permission_level,
618 priority: self.priority.into(),
619 reqid: self.request_id.clone().into(),
620 sink: self.sink.clone().into(),
621 source: self.source.clone().into(),
622 token: self.token.clone(),
623 traceparent: self.traceparent.clone(),
624 ttl: self.ttl,
625 type_: self.message_type.into(),
626 ..Default::default()
627 };
628 self.validator
629 .validate(&attributes)
630 .map_err(UMessageError::from)
631 .map(|_| UMessage {
632 attributes: Some(attributes).into(),
633 payload: self.payload.to_owned(),
634 ..Default::default()
635 })
636 }
637
638 /// Creates the message based on the builder's state and some payload.
639 ///
640 /// # Arguments
641 ///
642 /// * `payload` - The data to set as payload.
643 /// * `format` - The payload format.
644 ///
645 /// # Returns
646 ///
647 /// A message ready to be sent using [`crate::UTransport::send`].
648 ///
649 /// # Errors
650 ///
651 /// If the properties set on the builder do not represent a consistent set of [`UAttributes`],
652 /// a [`UMessageError::AttributesValidationError`] is returned.
653 ///
654 /// # Examples
655 ///
656 /// ```rust
657 /// use up_rust::{UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UUri};
658 ///
659 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
660 /// let topic = UUri::try_from("//my-vehicle/4210/1/B24D")?;
661 /// let message = UMessageBuilder::publish(topic)
662 /// .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
663 /// assert!(message.payload.is_some());
664 /// # Ok(())
665 /// # }
666 /// ```
667 pub fn build_with_payload<T: Into<Bytes>>(
668 &mut self,
669 payload: T,
670 format: UPayloadFormat,
671 ) -> Result<UMessage, UMessageError> {
672 self.payload = Some(payload.into());
673 self.payload_format = format;
674
675 self.build()
676 }
677
678 /// Creates the message based on the builder's state and some payload.
679 ///
680 /// # Arguments
681 ///
682 /// * `payload` - The data to set as payload.
683 ///
684 /// # Returns
685 ///
686 /// A message ready to be sent using [`crate::UTransport::send`]. The message will have
687 /// [`UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`] set as its payload format.
688 ///
689 /// # Errors
690 ///
691 /// If the given payload cannot be serialized into a protobuf byte array, a [`UMessageError::DataSerializationError`] is returned.
692 /// If the properties set on the builder do not represent a consistent set of [`UAttributes`],
693 /// a [`UMessageError::AttributesValidationError`] is returned.
694 ///
695 /// # Examples
696 ///
697 /// ```rust
698 /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UStatus, UUID, UUri};
699 ///
700 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
701 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
702 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
703 /// let request_id = UUID::build();
704 /// // a service implementation would normally use
705 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
706 /// let message = UMessageBuilder::response(reply_to_address, request_id, invoked_method)
707 /// .with_comm_status(UCode::INVALID_ARGUMENT)
708 /// .build_with_protobuf_payload(&UStatus::fail("failed to parse request"))?;
709 /// assert!(message.payload.is_some());
710 /// assert_eq!(message.attributes.payload_format.enum_value().unwrap(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF);
711 /// # Ok(())
712 /// # }
713 /// ```
714 pub fn build_with_protobuf_payload<T: Message>(
715 &mut self,
716 payload: &T,
717 ) -> Result<UMessage, UMessageError> {
718 payload
719 .write_to_bytes()
720 .map_err(UMessageError::from)
721 .and_then(|serialized_payload| {
722 self.build_with_payload(
723 serialized_payload,
724 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
725 )
726 })
727 }
728
729 /// Creates the message based on the builder's state and some payload.
730 ///
731 /// # Arguments
732 ///
733 /// * `payload` - The data to set as payload.
734 ///
735 /// # Returns
736 ///
737 /// A message ready to be sent using [`crate::UTransport::send`]. The message will have
738 /// [`UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`] set as its payload format.
739 ///
740 /// # Errors
741 ///
742 /// If the given payload cannot be serialized into a protobuf byte array, a [`UMessageError::DataSerializationError`] is returned.
743 /// If the properties set on the builder do not represent a consistent set of [`UAttributes`],
744 /// a [`UMessageError::AttributesValidationError`] is returned.
745 ///
746 /// # Examples
747 ///
748 /// ```rust
749 /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UStatus, UUID, UUri};
750 ///
751 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
752 /// let invoked_method = UUri::try_from("//my-vehicle/4210/5/64AB")?;
753 /// let reply_to_address = UUri::try_from("//my-cloud/BA4C/1/0")?;
754 /// let request_id = UUID::build();
755 /// // a service implementation would normally use
756 /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead
757 /// let message = UMessageBuilder::response(reply_to_address, request_id, invoked_method)
758 /// .with_comm_status(UCode::INVALID_ARGUMENT)
759 /// .build_with_wrapped_protobuf_payload(&UStatus::fail("failed to parse request"))?;
760 /// assert!(message.payload.is_some());
761 /// assert_eq!(message.attributes.payload_format.enum_value().unwrap(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
762 /// # Ok(())
763 /// # }
764 /// ```
765 pub fn build_with_wrapped_protobuf_payload<T: MessageFull>(
766 &mut self,
767 payload: &T,
768 ) -> Result<UMessage, UMessageError> {
769 Any::pack(payload)
770 .map_err(UMessageError::DataSerializationError)
771 .and_then(|any| any.write_to_bytes().map_err(UMessageError::from))
772 .and_then(|serialized_payload| {
773 self.build_with_payload(
774 serialized_payload,
775 UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
776 )
777 })
778 }
779}
780
781#[cfg(test)]
782mod tests {
783 use crate::UCode;
784
785 use super::*;
786
787 use test_case::test_case;
788
789 const METHOD_TO_INVOKE: &str = "//my-vehicle/4D123/2/6FA3";
790 const REPLY_TO_ADDRESS: &str = "//my-cloud/9CB3/1/0";
791 const TOPIC: &str = "//my-vehicle/4210/1/B24D";
792
793 #[test]
794 #[should_panic]
795 fn test_with_message_id_panics_for_invalid_uuid() {
796 let invalid_message_id = UUID {
797 msb: 0x00000000000000ab_u64,
798 lsb: 0x0000000000018000_u64,
799 ..Default::default()
800 };
801 let topic = UUri::try_from(TOPIC).expect("should have been able to create UUri");
802 UMessageBuilder::publish(topic).with_message_id(invalid_message_id);
803 }
804
805 #[test_case(Some(5), None, None; "with permission level")]
806 #[test_case(None, Some(UCode::NOT_FOUND), None; "with commstatus")]
807 #[test_case(None, None, Some(String::from("my-token")); "with token")]
808 #[should_panic]
809 fn test_publish_message_builder_panics(
810 perm_level: Option<u32>,
811 comm_status: Option<UCode>,
812 token: Option<String>,
813 ) {
814 let topic = UUri::try_from(TOPIC).expect("should have been able to create UUri");
815 let mut builder = UMessageBuilder::publish(topic);
816 if let Some(level) = perm_level {
817 builder.with_permission_level(level);
818 } else if let Some(status_code) = comm_status {
819 builder.with_comm_status(status_code);
820 } else if let Some(t) = token {
821 builder.with_token(t);
822 }
823 }
824
825 #[test_case(Some(5), None; "with permission level")]
826 #[test_case(None, Some(String::from("my-token")); "with token")]
827 #[should_panic]
828 fn test_response_message_builder_panics(perm_level: Option<u32>, token: Option<String>) {
829 let request_id = UUID::build();
830 let method_to_invoke = UUri::try_from(METHOD_TO_INVOKE)
831 .expect("should have been able to create destination UUri");
832 let reply_to_address = UUri::try_from(REPLY_TO_ADDRESS)
833 .expect("should have been able to create reply-to UUri");
834 let mut builder = UMessageBuilder::response(reply_to_address, request_id, method_to_invoke);
835
836 if let Some(level) = perm_level {
837 builder.with_permission_level(level);
838 } else if let Some(t) = token {
839 builder.with_token(t);
840 }
841 }
842
843 #[test_case(Some(UCode::NOT_FOUND), None; "for comm status")]
844 #[should_panic]
845 fn test_request_message_builder_panics(comm_status: Option<UCode>, perm_level: Option<u32>) {
846 let method_to_invoke = UUri::try_from(METHOD_TO_INVOKE)
847 .expect("should have been able to create destination UUri");
848 let reply_to_address = UUri::try_from(REPLY_TO_ADDRESS)
849 .expect("should have been able to create reply-to UUri");
850 let mut builder = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000);
851
852 if let Some(status) = comm_status {
853 builder.with_comm_status(status);
854 } else if let Some(level) = perm_level {
855 builder.with_permission_level(level);
856 }
857 }
858
859 #[test]
860 fn test_build_supports_repeated_invocation() {
861 let topic = UUri::try_from(TOPIC).expect("should have been able to create UUri");
862 let mut builder = UMessageBuilder::publish(topic);
863 let message_one = builder
864 .with_message_id(UUID::build())
865 .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
866 .expect("should have been able to create message");
867 let message_two = builder
868 .with_message_id(UUID::build())
869 .build_with_payload("unlocked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
870 .expect("should have been able to create message");
871 assert_eq!(message_one.attributes.type_, message_two.attributes.type_);
872 assert_ne!(message_one.attributes.id, message_two.attributes.id);
873 assert_eq!(message_one.attributes.source, message_two.attributes.source);
874 assert_ne!(message_one.payload, message_two.payload);
875 }
876
877 #[test]
878 fn test_build_retains_all_publish_attributes() {
879 let message_id = UUID::build();
880 let topic = UUri::try_from(TOPIC).expect("should have been able to create UUri");
881 let message = UMessageBuilder::publish(topic.clone())
882 .with_message_id(message_id.clone())
883 .with_priority(UPriority::UPRIORITY_CS2)
884 .with_ttl(5000)
885 .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
886 .expect("should have been able to create message");
887 assert_eq!(message.attributes.id, Some(message_id).into());
888 assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS2.into());
889 assert_eq!(message.attributes.source, Some(topic).into());
890 assert_eq!(message.attributes.ttl, Some(5000));
891 assert_eq!(
892 message.attributes.type_,
893 UMessageType::UMESSAGE_TYPE_PUBLISH.into()
894 );
895 }
896
897 #[test]
898 fn test_build_retains_all_request_attributes() {
899 let message_id = UUID::build();
900 let token = String::from("token");
901 let method_to_invoke = UUri::try_from(METHOD_TO_INVOKE)
902 .expect("should have been able to create destination UUri");
903 let reply_to_address = UUri::try_from(REPLY_TO_ADDRESS)
904 .expect("should have been able to create reply-to UUri");
905 let message =
906 UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000)
907 .with_message_id(message_id.clone())
908 .with_permission_level(5)
909 .with_priority(UPriority::UPRIORITY_CS4)
910 .with_token(token.clone())
911 .build_with_payload("unlock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
912 .expect("should have been able to create message");
913
914 assert_eq!(message.attributes.id, Some(message_id).into());
915 assert_eq!(message.attributes.permission_level, Some(5));
916 assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS4.into());
917 assert_eq!(message.attributes.sink, Some(method_to_invoke).into());
918 assert_eq!(message.attributes.source, Some(reply_to_address).into());
919 assert_eq!(message.attributes.token, Some(token));
920 assert_eq!(message.attributes.ttl, Some(5000));
921 assert_eq!(
922 message.attributes.type_,
923 UMessageType::UMESSAGE_TYPE_REQUEST.into()
924 );
925 }
926
927 #[test]
928 fn test_builder_copies_request_attributes() {
929 let request_message_id = UUID::build();
930 let response_message_id = UUID::build();
931 let method_to_invoke = UUri::try_from(METHOD_TO_INVOKE)
932 .expect("should have been able to create destination UUri");
933 let reply_to_address = UUri::try_from(REPLY_TO_ADDRESS)
934 .expect("should have been able to create reply-to UUri");
935 let request_message =
936 UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000)
937 .with_message_id(request_message_id.clone())
938 .with_priority(UPriority::UPRIORITY_CS5)
939 .build()
940 .expect("should have been able to create message");
941 let message = UMessageBuilder::response_for_request(&request_message.attributes)
942 .with_message_id(response_message_id.clone())
943 .with_comm_status(UCode::DEADLINE_EXCEEDED)
944 .with_ttl(4000)
945 .build()
946 .expect("should have been able to create message");
947 assert_eq!(message.attributes.id, Some(response_message_id).into());
948 assert_eq!(
949 message.attributes.commstatus,
950 Some(EnumOrUnknown::from(UCode::DEADLINE_EXCEEDED))
951 );
952 assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS5.into());
953 assert_eq!(message.attributes.reqid, Some(request_message_id).into());
954 assert_eq!(message.attributes.sink, Some(reply_to_address).into());
955 assert_eq!(message.attributes.source, Some(method_to_invoke).into());
956 assert_eq!(message.attributes.ttl, Some(4000));
957 assert_eq!(
958 message.attributes.type_,
959 UMessageType::UMESSAGE_TYPE_RESPONSE.into()
960 );
961 }
962
963 #[test]
964 fn test_build_retains_all_response_attributes() {
965 let message_id = UUID::build();
966 let request_id = UUID::build();
967 let method_to_invoke = UUri::try_from(METHOD_TO_INVOKE)
968 .expect("should have been able to create destination UUri");
969 let reply_to_address = UUri::try_from(REPLY_TO_ADDRESS)
970 .expect("should have been able to create reply-to UUri");
971 let message = UMessageBuilder::response(
972 reply_to_address.clone(),
973 request_id.clone(),
974 method_to_invoke.clone(),
975 )
976 .with_message_id(message_id.clone())
977 .with_comm_status(UCode::DEADLINE_EXCEEDED)
978 .with_priority(UPriority::UPRIORITY_CS5)
979 .with_ttl(0)
980 .build()
981 .expect("should have been able to create message");
982 assert_eq!(message.attributes.id, Some(message_id).into());
983 assert_eq!(
984 message.attributes.commstatus,
985 Some(EnumOrUnknown::from(UCode::DEADLINE_EXCEEDED))
986 );
987 assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS5.into());
988 assert_eq!(message.attributes.reqid, Some(request_id).into());
989 assert_eq!(message.attributes.sink, Some(reply_to_address).into());
990 assert_eq!(message.attributes.source, Some(method_to_invoke).into());
991 assert_eq!(message.attributes.ttl, Some(0));
992 assert_eq!(
993 message.attributes.type_,
994 UMessageType::UMESSAGE_TYPE_RESPONSE.into()
995 );
996 }
997}