up_rust/communication.rs
1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use bytes::Bytes;
15use protobuf::{well_known_types::any::Any, Message, MessageFull};
16use std::{error::Error, fmt::Display};
17
18pub use default_notifier::SimpleNotifier;
19#[cfg(feature = "usubscription")]
20pub use default_pubsub::{InMemorySubscriber, SimplePublisher};
21pub use in_memory_rpc_client::InMemoryRpcClient;
22pub use in_memory_rpc_server::InMemoryRpcServer;
23#[cfg(any(test, feature = "test-util"))]
24pub use notification::MockNotifier;
25pub use notification::{NotificationError, Notifier};
26#[cfg(any(test, feature = "test-util"))]
27pub use pubsub::MockSubscriptionChangeHandler;
28#[cfg(feature = "usubscription")]
29pub use pubsub::{PubSubError, Publisher, Subscriber};
30#[cfg(any(test, feature = "test-util"))]
31pub use rpc::{MockRequestHandler, MockRpcClient, MockRpcServerImpl};
32pub use rpc::{RequestHandler, RpcClient, RpcServer, ServiceInvocationError};
33#[cfg(feature = "udiscovery")]
34pub use udiscovery_client::RpcClientUDiscovery;
35#[cfg(feature = "usubscription")]
36pub use usubscription_client::RpcClientUSubscription;
37
38use crate::{
39 umessage::{self, UMessageError},
40 UCode, UMessage, UMessageBuilder, UPayloadFormat, UPriority, UStatus, UUID,
41};
42
43mod default_notifier;
44mod default_pubsub;
45mod in_memory_rpc_client;
46mod in_memory_rpc_server;
47mod notification;
48#[cfg(feature = "usubscription")]
49mod pubsub;
50mod rpc;
51#[cfg(feature = "udiscovery")]
52mod udiscovery_client;
53#[cfg(feature = "usubscription")]
54mod usubscription_client;
55
56/// An error indicating a problem with registering or unregistering a message listener.
57#[derive(Clone, Debug)]
58pub enum RegistrationError {
59 /// Indicates that a listener for a given address already exists.
60 AlreadyExists,
61 /// Indicates that the maximum number of listeners supported by the Transport Layer implementation
62 /// has already been registered.
63 MaxListenersExceeded,
64 /// Indicates that no listener is registered for given pattern URIs.
65 NoSuchListener,
66 /// Indicates that the underlying Transport Layer implementation does not support registration and
67 /// notification of message handlers.
68 PushDeliveryMethodNotSupported,
69 /// Indicates that some of the given filters are inappropriate in this context.
70 InvalidFilter(String),
71 /// Indicates a generic error.
72 Unknown(UStatus),
73}
74
75impl Display for RegistrationError {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 RegistrationError::AlreadyExists => {
79 f.write_str("a listener for the given filter criteria already exists")
80 }
81 RegistrationError::MaxListenersExceeded => {
82 f.write_str("maximum number of listeners has been reached")
83 }
84 RegistrationError::NoSuchListener => {
85 f.write_str("no listener registered for given pattern")
86 }
87 RegistrationError::PushDeliveryMethodNotSupported => f.write_str(
88 "the underlying transport implementation does not support the push delivery method",
89 ),
90 RegistrationError::InvalidFilter(msg) => {
91 f.write_fmt(format_args!("invalid filter(s): {}", msg))
92 }
93 RegistrationError::Unknown(status) => f.write_fmt(format_args!(
94 "error un-/registering listener: {}",
95 status.get_message()
96 )),
97 }
98 }
99}
100
101impl Error for RegistrationError {}
102
103impl From<UStatus> for RegistrationError {
104 fn from(value: UStatus) -> Self {
105 match value.code.enum_value() {
106 Ok(UCode::ALREADY_EXISTS) => RegistrationError::AlreadyExists,
107 Ok(UCode::NOT_FOUND) => RegistrationError::NoSuchListener,
108 Ok(UCode::RESOURCE_EXHAUSTED) => RegistrationError::MaxListenersExceeded,
109 Ok(UCode::UNIMPLEMENTED) => RegistrationError::PushDeliveryMethodNotSupported,
110 Ok(UCode::INVALID_ARGUMENT) => RegistrationError::InvalidFilter(value.get_message()),
111 _ => RegistrationError::Unknown(value),
112 }
113 }
114}
115
116/// General options that clients might want to specify when sending a uProtocol message.
117#[derive(Clone, Debug, PartialEq)]
118pub struct CallOptions {
119 ttl: u32,
120 message_id: Option<UUID>,
121 token: Option<String>,
122 priority: Option<UPriority>,
123}
124
125impl CallOptions {
126 /// Creates new call options for an RPC Request.
127 ///
128 /// # Arguments
129 ///
130 /// * `ttl` - The message's time-to-live in milliseconds.
131 /// * `message_id` - The identifier to use for the message or `None` to use a generated identifier.
132 /// * `token` - The token to use for authenticating to infrastructure and service endpoints.
133 /// * `priority` - The message's priority or `None` to use the default priority for RPC Requests.
134 ///
135 /// # Returns
136 ///
137 /// Options suitable for invoking an RPC method.
138 ///
139 /// # Examples
140 ///
141 /// ```rust
142 /// use up_rust::{UPriority, UUID, communication::CallOptions};
143 ///
144 /// let uuid = UUID::new();
145 /// let options = CallOptions::for_rpc_request(15_000, Some(uuid.clone()), Some("token".to_string()), Some(UPriority::UPRIORITY_CS6));
146 /// assert_eq!(options.ttl(), 15_000);
147 /// assert_eq!(options.message_id(), Some(uuid));
148 /// assert_eq!(options.token(), Some("token".to_string()));
149 /// assert_eq!(options.priority(), Some(UPriority::UPRIORITY_CS6));
150 /// ```
151 pub fn for_rpc_request(
152 ttl: u32,
153 message_id: Option<UUID>,
154 token: Option<String>,
155 priority: Option<UPriority>,
156 ) -> Self {
157 CallOptions {
158 ttl,
159 message_id,
160 token,
161 priority,
162 }
163 }
164
165 /// Creates new call options for a Notification message.
166 ///
167 /// # Arguments
168 ///
169 /// * `ttl` - The message's time-to-live in milliseconds.
170 /// * `message_id` - The identifier to use for the message or `None` to use a generated identifier.
171 /// * `priority` - The message's priority or `None` to use the default priority for Notifications.
172 ///
173 /// # Returns
174 ///
175 /// Options suitable for sending a Notification.
176 ///
177 /// # Examples
178 ///
179 /// ```rust
180 /// use up_rust::{UPriority, UUID, communication::CallOptions};
181 ///
182 /// let uuid = UUID::new();
183 /// let options = CallOptions::for_notification(Some(15_000), Some(uuid.clone()), Some(UPriority::UPRIORITY_CS2));
184 /// assert_eq!(options.ttl(), 15_000);
185 /// assert_eq!(options.message_id(), Some(uuid));
186 /// assert_eq!(options.priority(), Some(UPriority::UPRIORITY_CS2));
187 /// ```
188 pub fn for_notification(
189 ttl: Option<u32>,
190 message_id: Option<UUID>,
191 priority: Option<UPriority>,
192 ) -> Self {
193 CallOptions {
194 ttl: ttl.unwrap_or(0),
195 message_id,
196 token: None,
197 priority,
198 }
199 }
200
201 /// Creates new call options for a Publish message.
202 ///
203 /// # Arguments
204 ///
205 /// * `ttl` - The message's time-to-live in milliseconds or `None` if the message should not expire at all.
206 /// * `message_id` - The identifier to use for the message or `None` to use a generated identifier.
207 /// * `priority` - The message's priority or `None` to use the default priority for Publish messages.
208 ///
209 /// # Returns
210 ///
211 /// Options suitable for sending a Publish message.
212 ///
213 /// # Examples
214 ///
215 /// ```rust
216 /// use up_rust::{UPriority, UUID, communication::CallOptions};
217 ///
218 /// let uuid = UUID::new();
219 /// let options = CallOptions::for_publish(Some(15_000), Some(uuid.clone()), Some(UPriority::UPRIORITY_CS2));
220 /// assert_eq!(options.ttl(), 15_000);
221 /// assert_eq!(options.message_id(), Some(uuid));
222 /// assert_eq!(options.priority(), Some(UPriority::UPRIORITY_CS2));
223 /// ```
224 pub fn for_publish(
225 ttl: Option<u32>,
226 message_id: Option<UUID>,
227 priority: Option<UPriority>,
228 ) -> Self {
229 CallOptions {
230 ttl: ttl.unwrap_or(0),
231 message_id,
232 token: None,
233 priority,
234 }
235 }
236
237 /// Gets the message's time-to-live in milliseconds.
238 pub fn ttl(&self) -> u32 {
239 self.ttl
240 }
241
242 /// Gets the identifier to use for the message.
243 pub fn message_id(&self) -> Option<UUID> {
244 self.message_id.clone()
245 }
246
247 /// Gets the token to use for authenticating to infrastructure and service endpoints.
248 pub fn token(&self) -> Option<String> {
249 self.token.clone()
250 }
251
252 /// Gets the message's priority.
253 pub fn priority(&self) -> Option<UPriority> {
254 self.priority
255 }
256}
257
258/// A wrapper around (raw) message payload data and the corresponding payload format.
259#[derive(Clone, Debug, PartialEq)]
260pub struct UPayload {
261 payload_format: UPayloadFormat,
262 payload: Bytes,
263}
264
265impl UPayload {
266 /// Creates a new payload for some data.
267 ///
268 /// # Examples
269 ///
270 /// ```rust
271 /// use up_rust::UPayloadFormat;
272 /// use up_rust::communication::UPayload;
273 ///
274 /// let data: Vec<u8> = vec![0x00_u8, 0x01_u8, 0x02_u8];
275 /// let payload = UPayload::new(data, UPayloadFormat::UPAYLOAD_FORMAT_RAW);
276 /// assert_eq!(payload.payload_format(), UPayloadFormat::UPAYLOAD_FORMAT_RAW);
277 /// assert_eq!(payload.payload().len(), 3);
278 /// ```
279 pub fn new<T: Into<Bytes>>(payload: T, payload_format: UPayloadFormat) -> Self {
280 UPayload {
281 payload_format,
282 payload: payload.into(),
283 }
284 }
285
286 /// Creates a new UPayload from a protobuf message.
287 ///
288 /// The resulting payload will have `UPayloadType::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`.
289 ///
290 /// # Errors
291 ///
292 /// Returns an error if the given message cannot be serialized to bytes.
293 ///
294 /// # Examples
295 ///
296 /// ```rust
297 /// use up_rust::{communication::UPayload, UPayloadFormat};
298 /// use protobuf::{well_known_types::wrappers::StringValue};
299 ///
300 /// let mut data = StringValue::new();
301 /// data.value = "hello world".to_string();
302 /// assert!(UPayload::try_from_protobuf(data).is_ok_and(|pl|
303 /// pl.payload_format() == UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY
304 /// && pl.payload().len() > 0));
305 /// ```
306 pub fn try_from_protobuf<M>(message: M) -> Result<Self, UMessageError>
307 where
308 M: MessageFull,
309 {
310 Any::pack(&message)
311 .and_then(|any| any.write_to_bytes())
312 .map(|buf| UPayload::new(buf, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY))
313 .map_err(UMessageError::DataSerializationError)
314 }
315
316 /// Gets the payload format.
317 ///
318 /// # Returns
319 ///
320 /// payload value of `UPayload`.
321 pub fn payload_format(&self) -> UPayloadFormat {
322 self.payload_format
323 }
324
325 /// Gets the payload data.
326 ///
327 /// Note that this consumes the payload.
328 pub fn payload(self) -> Bytes {
329 self.payload
330 }
331
332 /// Extracts the protobuf `Message` contained in payload.
333 ///
334 /// This function is used to extract strongly-typed data from a `UPayload` object,
335 /// taking into account the payload format (will only succeed if payload format is
336 /// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF` or `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`)
337 ///
338 /// # Type Parameters
339 ///
340 /// * `T`: The target type of the data to be unpacked.
341 ///
342 /// # Returns
343 ///
344 /// * `Ok(T)`: The deserialized protobuf `Message` contained in the payload.
345 ///
346 /// # Errors
347 ///
348 /// * Err(`UMessageError`) if the unpacking process fails, for example if the payload could
349 /// not be deserialized into the target type `T`.
350 ///
351 ///
352 /// # Examples
353 ///
354 /// ```rust
355 /// use up_rust::{communication::UPayload, UPayloadFormat};
356 /// use protobuf::{well_known_types::wrappers::StringValue};
357 ///
358 /// let mut data = StringValue::new();
359 /// data.value = "hello world".to_string();
360 /// let payload = UPayload::try_from_protobuf(data).expect("should be able to create UPayload from StringValue");
361 ///
362 /// let string_value: StringValue = payload.extract_protobuf().expect("should be able to extract StringValue from UPayload");
363 /// assert_eq!(string_value.value, *"hello world");
364 /// ```
365 pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
366 umessage::deserialize_protobuf_bytes(&self.payload, &self.payload_format)
367 }
368}
369
370/// Moves all common call options into the given message builder.
371///
372/// In particular, the following options are moved:
373/// * ttl
374/// * message ID
375/// * priority
376pub(crate) fn apply_common_options(
377 call_options: CallOptions,
378 message_builder: &mut UMessageBuilder,
379) {
380 message_builder.with_ttl(call_options.ttl);
381 if let Some(v) = call_options.message_id {
382 message_builder.with_message_id(v);
383 }
384 if let Some(v) = call_options.priority {
385 message_builder.with_priority(v);
386 }
387}
388
389/// Creates a message with given payload from a builder.
390pub(crate) fn build_message(
391 message_builder: &mut UMessageBuilder,
392 payload: Option<UPayload>,
393) -> Result<UMessage, UMessageError> {
394 if let Some(pl) = payload {
395 let format = pl.payload_format();
396 message_builder.build_with_payload(pl.payload, format)
397 } else {
398 message_builder.build()
399 }
400}