up_rust/communication/pubsub.rs
1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use std::{error::Error, fmt::Display, sync::Arc};
15
16use async_trait::async_trait;
17
18use crate::communication::RegistrationError;
19use crate::core::usubscription::SubscriptionStatus;
20use crate::{UListener, UStatus, UUri};
21
22use super::{CallOptions, UPayload};
23
24/// An error indicating a problem with publishing a message to a topic.
25// [impl->dsn~communication-layer-api-declaration~1]
26#[derive(Debug)]
27pub enum PubSubError {
28 /// Indicates that the given message cannot be sent because it is not a [valid Publish message](crate::PublishValidator).
29 InvalidArgument(String),
30 /// Indicates an unspecific error that occurred at the Transport Layer while trying to publish a message.
31 PublishError(UStatus),
32}
33
34#[cfg(not(tarpaulin_include))]
35impl Display for PubSubError {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 PubSubError::InvalidArgument(s) => f.write_str(s.as_str()),
39 PubSubError::PublishError(s) => {
40 f.write_fmt(format_args!("failed to publish message: {s}"))
41 }
42 }
43 }
44}
45
46impl Error for PubSubError {}
47
48/// A client for publishing messages to a topic.
49///
50/// Please refer to the
51/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
52// [impl->dsn~communication-layer-api-declaration~1]
53#[async_trait]
54pub trait Publisher: Send + Sync {
55 /// Publishes a message to a topic.
56 ///
57 /// # Arguments
58 ///
59 /// * `resource_id` - The (local) resource ID of the topic to publish to.
60 /// * `call_options` - Options to include in the published message.
61 /// * `payload` - Payload to include in the published message.
62 ///
63 /// # Errors
64 ///
65 /// Returns an error if the message could not be published.
66 async fn publish(
67 &self,
68 resource_id: u16,
69 call_options: CallOptions,
70 payload: Option<UPayload>,
71 ) -> Result<(), PubSubError>;
72}
73
74// [impl->dsn~communication-layer-api-declaration~1]
75#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
76pub trait SubscriptionChangeHandler: Send + Sync {
77 /// Invoked for each update to the subscription status for a given topic.
78 ///
79 /// Implementations must not block the current thread.
80 ///
81 /// # Arguments
82 ///
83 /// * `topic` - The topic for which the subscription status has changed.
84 /// * `status` - The new status of the subscription.
85 fn on_subscription_change(&self, topic: UUri, new_status: SubscriptionStatus);
86}
87
88/// A client for subscribing to topics.
89///
90/// Please refer to the
91/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
92// [impl->dsn~communication-layer-api-declaration~1]
93#[async_trait]
94pub trait Subscriber: Send + Sync {
95 /// Registers a handler to invoke for messages that have been published to a given topic.
96 ///
97 /// More than one handler can be registered for the same topic.
98 /// The same handler can be registered for multiple topics.
99 ///
100 /// # Arguments
101 ///
102 /// * `topic` - The topic to subscribe to. The topic must not contain any wildcards.
103 /// * `handler` - The handler to invoke for each message that has been published to the topic.
104 /// * `subscription_change_handler` - A handler to invoke for any subscription state changes for
105 /// the given topic.
106 ///
107 /// # Errors
108 ///
109 /// Returns an error if the listener cannot be registered.
110 async fn subscribe(
111 &self,
112 topic: &UUri,
113 handler: Arc<dyn UListener>,
114 subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
115 ) -> Result<(), RegistrationError>;
116
117 /// Deregisters a previously [registered handler](`Self::subscribe`).
118 ///
119 /// # Arguments
120 ///
121 /// * `topic` - The topic that the handler had been registered for.
122 /// * `handler` - The handler to unregister.
123 ///
124 /// # Errors
125 ///
126 /// Returns an error if the listener cannot be unregistered.
127 async fn unsubscribe(
128 &self,
129 topic: &UUri,
130 handler: Arc<dyn UListener>,
131 ) -> Result<(), RegistrationError>;
132}