up_rust/
communication.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use bytes::Bytes;
15use protobuf::{well_known_types::any::Any, Message, MessageFull};
16use std::{error::Error, fmt::Display};
17
18pub use default_notifier::SimpleNotifier;
19#[cfg(feature = "usubscription")]
20pub use default_pubsub::{InMemorySubscriber, SimplePublisher};
21pub use in_memory_rpc_client::InMemoryRpcClient;
22pub use in_memory_rpc_server::InMemoryRpcServer;
23#[cfg(any(test, feature = "test-util"))]
24pub use notification::MockNotifier;
25pub use notification::{NotificationError, Notifier};
26#[cfg(any(test, feature = "test-util"))]
27pub use pubsub::MockSubscriptionChangeHandler;
28#[cfg(feature = "usubscription")]
29pub use pubsub::{PubSubError, Publisher, Subscriber};
30#[cfg(any(test, feature = "test-util"))]
31pub use rpc::{MockRequestHandler, MockRpcClient, MockRpcServerImpl};
32pub use rpc::{RequestHandler, RpcClient, RpcServer, ServiceInvocationError};
33#[cfg(feature = "udiscovery")]
34pub use udiscovery_client::RpcClientUDiscovery;
35#[cfg(feature = "usubscription")]
36pub use usubscription_client::RpcClientUSubscription;
37
38use crate::{
39    umessage::{self, UMessageError},
40    UCode, UMessage, UMessageBuilder, UPayloadFormat, UPriority, UStatus, UUID,
41};
42
43mod default_notifier;
44mod default_pubsub;
45mod in_memory_rpc_client;
46mod in_memory_rpc_server;
47mod notification;
48#[cfg(feature = "usubscription")]
49mod pubsub;
50mod rpc;
51#[cfg(feature = "udiscovery")]
52mod udiscovery_client;
53#[cfg(feature = "usubscription")]
54mod usubscription_client;
55
56/// An error indicating a problem with registering or unregistering a message listener.
57#[derive(Clone, Debug)]
58pub enum RegistrationError {
59    /// Indicates that a listener for a given address already exists.
60    AlreadyExists,
61    /// Indicates that the maximum number of listeners supported by the Transport Layer implementation
62    /// has already been registered.
63    MaxListenersExceeded,
64    /// Indicates that no listener is registered for given pattern URIs.
65    NoSuchListener,
66    /// Indicates that the underlying Transport Layer implementation does not support registration and
67    /// notification of message handlers.
68    PushDeliveryMethodNotSupported,
69    /// Indicates that some of the given filters are inappropriate in this context.
70    InvalidFilter(String),
71    /// Indicates a generic error.
72    Unknown(UStatus),
73}
74
75impl Display for RegistrationError {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        match self {
78            RegistrationError::AlreadyExists => {
79                f.write_str("a listener for the given filter criteria already exists")
80            }
81            RegistrationError::MaxListenersExceeded => {
82                f.write_str("maximum number of listeners has been reached")
83            }
84            RegistrationError::NoSuchListener => {
85                f.write_str("no listener registered for given pattern")
86            }
87            RegistrationError::PushDeliveryMethodNotSupported => f.write_str(
88                "the underlying transport implementation does not support the push delivery method",
89            ),
90            RegistrationError::InvalidFilter(msg) => {
91                f.write_fmt(format_args!("invalid filter(s): {}", msg))
92            }
93            RegistrationError::Unknown(status) => f.write_fmt(format_args!(
94                "error un-/registering listener: {}",
95                status.get_message()
96            )),
97        }
98    }
99}
100
101impl Error for RegistrationError {}
102
103impl From<UStatus> for RegistrationError {
104    fn from(value: UStatus) -> Self {
105        match value.code.enum_value() {
106            Ok(UCode::ALREADY_EXISTS) => RegistrationError::AlreadyExists,
107            Ok(UCode::NOT_FOUND) => RegistrationError::NoSuchListener,
108            Ok(UCode::RESOURCE_EXHAUSTED) => RegistrationError::MaxListenersExceeded,
109            Ok(UCode::UNIMPLEMENTED) => RegistrationError::PushDeliveryMethodNotSupported,
110            Ok(UCode::INVALID_ARGUMENT) => RegistrationError::InvalidFilter(value.get_message()),
111            _ => RegistrationError::Unknown(value),
112        }
113    }
114}
115
116/// General options that clients might want to specify when sending a uProtocol message.
117#[derive(Clone, Debug, PartialEq)]
118pub struct CallOptions {
119    ttl: u32,
120    message_id: Option<UUID>,
121    token: Option<String>,
122    priority: Option<UPriority>,
123}
124
125impl CallOptions {
126    /// Creates new call options for an RPC Request.
127    ///
128    /// # Arguments
129    ///
130    /// * `ttl` - The message's time-to-live in milliseconds.
131    /// * `message_id` - The identifier to use for the message or `None` to use a generated identifier.
132    /// * `token` - The token to use for authenticating to infrastructure and service endpoints.
133    /// * `priority` - The message's priority or `None` to use the default priority for RPC Requests.
134    ///
135    /// # Returns
136    ///
137    /// Options suitable for invoking an RPC method.
138    ///
139    /// # Examples
140    ///
141    /// ```rust
142    /// use up_rust::{UPriority, UUID, communication::CallOptions};
143    ///
144    /// let uuid = UUID::new();
145    /// let options = CallOptions::for_rpc_request(15_000, Some(uuid.clone()), Some("token".to_string()), Some(UPriority::UPRIORITY_CS6));
146    /// assert_eq!(options.ttl(), 15_000);
147    /// assert_eq!(options.message_id(), Some(uuid));
148    /// assert_eq!(options.token(), Some("token".to_string()));
149    /// assert_eq!(options.priority(), Some(UPriority::UPRIORITY_CS6));
150    /// ```
151    pub fn for_rpc_request(
152        ttl: u32,
153        message_id: Option<UUID>,
154        token: Option<String>,
155        priority: Option<UPriority>,
156    ) -> Self {
157        CallOptions {
158            ttl,
159            message_id,
160            token,
161            priority,
162        }
163    }
164
165    /// Creates new call options for a Notification message.
166    ///
167    /// # Arguments
168    ///
169    /// * `ttl` - The message's time-to-live in milliseconds.
170    /// * `message_id` - The identifier to use for the message or `None` to use a generated identifier.
171    /// * `priority` - The message's priority or `None` to use the default priority for Notifications.
172    ///
173    /// # Returns
174    ///
175    /// Options suitable for sending a Notification.
176    ///
177    /// # Examples
178    ///
179    /// ```rust
180    /// use up_rust::{UPriority, UUID, communication::CallOptions};
181    ///
182    /// let uuid = UUID::new();
183    /// let options = CallOptions::for_notification(Some(15_000), Some(uuid.clone()), Some(UPriority::UPRIORITY_CS2));
184    /// assert_eq!(options.ttl(), 15_000);
185    /// assert_eq!(options.message_id(), Some(uuid));
186    /// assert_eq!(options.priority(), Some(UPriority::UPRIORITY_CS2));
187    /// ```
188    pub fn for_notification(
189        ttl: Option<u32>,
190        message_id: Option<UUID>,
191        priority: Option<UPriority>,
192    ) -> Self {
193        CallOptions {
194            ttl: ttl.unwrap_or(0),
195            message_id,
196            token: None,
197            priority,
198        }
199    }
200
201    /// Creates new call options for a Publish message.
202    ///
203    /// # Arguments
204    ///
205    /// * `ttl` - The message's time-to-live in milliseconds or `None` if the message should not expire at all.
206    /// * `message_id` - The identifier to use for the message or `None` to use a generated identifier.
207    /// * `priority` - The message's priority or `None` to use the default priority for Publish messages.
208    ///
209    /// # Returns
210    ///
211    /// Options suitable for sending a Publish message.
212    ///
213    /// # Examples
214    ///
215    /// ```rust
216    /// use up_rust::{UPriority, UUID, communication::CallOptions};
217    ///
218    /// let uuid = UUID::new();
219    /// let options = CallOptions::for_publish(Some(15_000), Some(uuid.clone()), Some(UPriority::UPRIORITY_CS2));
220    /// assert_eq!(options.ttl(), 15_000);
221    /// assert_eq!(options.message_id(), Some(uuid));
222    /// assert_eq!(options.priority(), Some(UPriority::UPRIORITY_CS2));
223    /// ```
224    pub fn for_publish(
225        ttl: Option<u32>,
226        message_id: Option<UUID>,
227        priority: Option<UPriority>,
228    ) -> Self {
229        CallOptions {
230            ttl: ttl.unwrap_or(0),
231            message_id,
232            token: None,
233            priority,
234        }
235    }
236
237    /// Gets the message's time-to-live in milliseconds.
238    pub fn ttl(&self) -> u32 {
239        self.ttl
240    }
241
242    /// Gets the identifier to use for the message.
243    pub fn message_id(&self) -> Option<UUID> {
244        self.message_id.clone()
245    }
246
247    /// Gets the token to use for authenticating to infrastructure and service endpoints.
248    pub fn token(&self) -> Option<String> {
249        self.token.clone()
250    }
251
252    /// Gets the message's priority.
253    pub fn priority(&self) -> Option<UPriority> {
254        self.priority
255    }
256}
257
258/// A wrapper around (raw) message payload data and the corresponding payload format.
259#[derive(Clone, Debug, PartialEq)]
260pub struct UPayload {
261    payload_format: UPayloadFormat,
262    payload: Bytes,
263}
264
265impl UPayload {
266    /// Creates a new payload for some data.
267    ///
268    /// # Examples
269    ///
270    /// ```rust
271    /// use up_rust::UPayloadFormat;
272    /// use up_rust::communication::UPayload;
273    ///
274    /// let data: Vec<u8> = vec![0x00_u8, 0x01_u8, 0x02_u8];
275    /// let payload = UPayload::new(data, UPayloadFormat::UPAYLOAD_FORMAT_RAW);
276    /// assert_eq!(payload.payload_format(), UPayloadFormat::UPAYLOAD_FORMAT_RAW);
277    /// assert_eq!(payload.payload().len(), 3);
278    /// ```
279    pub fn new<T: Into<Bytes>>(payload: T, payload_format: UPayloadFormat) -> Self {
280        UPayload {
281            payload_format,
282            payload: payload.into(),
283        }
284    }
285
286    /// Creates a new UPayload from a protobuf message.
287    ///
288    /// The resulting payload will have `UPayloadType::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the given message cannot be serialized to bytes.
293    ///
294    /// # Examples
295    ///
296    /// ```rust
297    /// use up_rust::{communication::UPayload, UPayloadFormat};
298    /// use protobuf::{well_known_types::wrappers::StringValue};
299    ///
300    /// let mut data = StringValue::new();
301    /// data.value = "hello world".to_string();
302    /// assert!(UPayload::try_from_protobuf(data).is_ok_and(|pl|
303    ///     pl.payload_format() == UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY
304    ///         && pl.payload().len() > 0));
305    /// ```
306    pub fn try_from_protobuf<M>(message: M) -> Result<Self, UMessageError>
307    where
308        M: MessageFull,
309    {
310        Any::pack(&message)
311            .and_then(|any| any.write_to_bytes())
312            .map(|buf| UPayload::new(buf, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY))
313            .map_err(UMessageError::DataSerializationError)
314    }
315
316    /// Gets the payload format.
317    ///
318    /// # Returns
319    ///
320    /// payload value of `UPayload`.
321    pub fn payload_format(&self) -> UPayloadFormat {
322        self.payload_format
323    }
324
325    /// Gets the payload data.
326    ///
327    /// Note that this consumes the payload.
328    pub fn payload(self) -> Bytes {
329        self.payload
330    }
331
332    /// Extracts the protobuf `Message` contained in payload.
333    ///
334    /// This function is used to extract strongly-typed data from a `UPayload` object,
335    /// taking into account the payload format (will only succeed if payload format is
336    /// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF` or `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`)
337    ///
338    /// # Type Parameters
339    ///
340    /// * `T`: The target type of the data to be unpacked.
341    ///
342    /// # Returns
343    ///
344    /// * `Ok(T)`: The deserialized protobuf `Message` contained in the payload.
345    ///
346    /// # Errors
347    ///
348    /// * Err(`UMessageError`) if the unpacking process fails, for example if the payload could
349    ///   not be deserialized into the target type `T`.
350    ///
351    ///
352    /// # Examples
353    ///
354    /// ```rust
355    /// use up_rust::{communication::UPayload, UPayloadFormat};
356    /// use protobuf::{well_known_types::wrappers::StringValue};
357    ///
358    /// let mut data = StringValue::new();
359    /// data.value = "hello world".to_string();
360    /// let payload = UPayload::try_from_protobuf(data).expect("should be able to create UPayload from StringValue");
361    ///
362    /// let string_value: StringValue = payload.extract_protobuf().expect("should be able to extract StringValue from UPayload");
363    /// assert_eq!(string_value.value, *"hello world");
364    /// ```
365    pub fn extract_protobuf<T: MessageFull + Default>(&self) -> Result<T, UMessageError> {
366        umessage::deserialize_protobuf_bytes(&self.payload, &self.payload_format)
367    }
368}
369
370/// Moves all common call options into the given message builder.
371///
372/// In particular, the following options are moved:
373/// * ttl
374/// * message ID
375/// * priority
376pub(crate) fn apply_common_options(
377    call_options: CallOptions,
378    message_builder: &mut UMessageBuilder,
379) {
380    message_builder.with_ttl(call_options.ttl);
381    if let Some(v) = call_options.message_id {
382        message_builder.with_message_id(v);
383    }
384    if let Some(v) = call_options.priority {
385        message_builder.with_priority(v);
386    }
387}
388
389/// Creates a message with given payload from a builder.
390pub(crate) fn build_message(
391    message_builder: &mut UMessageBuilder,
392    payload: Option<UPayload>,
393) -> Result<UMessage, UMessageError> {
394    if let Some(pl) = payload {
395        let format = pl.payload_format();
396        message_builder.build_with_payload(pl.payload, format)
397    } else {
398        message_builder.build()
399    }
400}