// up_rust/communication/pubsub.rs

/********************************************************************************
 * Copyright (c) 2024 Contributors to the Eclipse Foundation
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information regarding copyright ownership.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Apache License Version 2.0 which is available at
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * SPDX-License-Identifier: Apache-2.0
 ********************************************************************************/
13
14use std::{error::Error, fmt::Display, sync::Arc};
15
16use async_trait::async_trait;
17
18use crate::communication::RegistrationError;
19use crate::core::usubscription::SubscriptionStatus;
20use crate::{UListener, UStatus, UUri};
21
22use super::{CallOptions, UPayload};
23
24/// An error indicating a problem with publishing a message to a topic.
25// [impl->dsn~communication-layer-api-declaration~1]
26#[derive(Debug)]
27pub enum PubSubError {
28    /// Indicates that the given message cannot be sent because it is not a [valid Publish message](crate::PublishValidator).
29    InvalidArgument(String),
30    /// Indicates an unspecific error that occurred at the Transport Layer while trying to publish a message.
31    PublishError(UStatus),
32}
33
34#[cfg(not(tarpaulin_include))]
35impl Display for PubSubError {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            PubSubError::InvalidArgument(s) => f.write_str(s.as_str()),
39            PubSubError::PublishError(s) => {
40                f.write_fmt(format_args!("failed to publish message: {s}"))
41            }
42        }
43    }
44}
45
46impl Error for PubSubError {}
47
48/// A client for publishing messages to a topic.
49///
50/// Please refer to the
51/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
52// [impl->dsn~communication-layer-api-declaration~1]
53#[async_trait]
54pub trait Publisher: Send + Sync {
55    /// Publishes a message to a topic.
56    ///
57    /// # Arguments
58    ///
59    /// * `resource_id` - The (local) resource ID of the topic to publish to.
60    /// * `call_options` - Options to include in the published message.
61    /// * `payload` - Payload to include in the published message.
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if the message could not be published.
66    async fn publish(
67        &self,
68        resource_id: u16,
69        call_options: CallOptions,
70        payload: Option<UPayload>,
71    ) -> Result<(), PubSubError>;
72}
73
74// [impl->dsn~communication-layer-api-declaration~1]
75#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
76pub trait SubscriptionChangeHandler: Send + Sync {
77    /// Invoked for each update to the subscription status for a given topic.
78    ///
79    /// Implementations must not block the current thread.
80    ///
81    /// # Arguments
82    ///
83    /// * `topic` - The topic for which the subscription status has changed.
84    /// * `status` - The new status of the subscription.
85    fn on_subscription_change(&self, topic: UUri, new_status: SubscriptionStatus);
86}
87
88/// A client for subscribing to topics.
89///
90/// Please refer to the
91/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
92// [impl->dsn~communication-layer-api-declaration~1]
93#[async_trait]
94pub trait Subscriber: Send + Sync {
95    /// Registers a handler to invoke for messages that have been published to a given topic.
96    ///
97    /// More than one handler can be registered for the same topic.
98    /// The same handler can be registered for multiple topics.
99    ///
100    /// # Arguments
101    ///
102    /// * `topic` - The topic to subscribe to. The topic must not contain any wildcards.
103    /// * `handler` - The handler to invoke for each message that has been published to the topic.
104    /// * `subscription_change_handler` - A handler to invoke for any subscription state changes for
105    ///                                   the given topic.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the listener cannot be registered.
110    async fn subscribe(
111        &self,
112        topic: &UUri,
113        handler: Arc<dyn UListener>,
114        subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
115    ) -> Result<(), RegistrationError>;
116
117    /// Deregisters a previously [registered handler](`Self::subscribe`).
118    ///
119    /// # Arguments
120    ///
121    /// * `topic` - The topic that the handler had been registered for.
122    /// * `handler` - The handler to unregister.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if the listener cannot be unregistered.
127    async fn unsubscribe(
128        &self,
129        topic: &UUri,
130        handler: Arc<dyn UListener>,
131    ) -> Result<(), RegistrationError>;
132}