1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/********************************************************************************
 * Copyright (c) 2024 Contributors to the Eclipse Foundation
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information regarding copyright ownership.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Apache License Version 2.0 which is available at
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * SPDX-License-Identifier: Apache-2.0
 ********************************************************************************/

use std::{error::Error, fmt::Display, sync::Arc};

use async_trait::async_trait;
#[cfg(test)]
use mockall::automock;

use crate::communication::RegistrationError;
use crate::core::usubscription::SubscriptionStatus;
use crate::{UListener, UStatus, UUri};

use super::{CallOptions, UPayload};

/// An error indicating a problem with publishing a message to a topic.
///
/// The variants distinguish between a message that has been rejected as being invalid
/// and a failure that has been reported while trying to publish the message.
#[derive(Debug)]
pub enum PubSubError {
    /// Indicates that the given message cannot be sent because it is not a [valid Publish message](crate::PublishValidator).
    ///
    /// The contained string is used verbatim as the error's display text.
    InvalidArgument(String),
    /// Indicates an unspecific error that occurred at the Transport Layer while trying to publish a message.
    ///
    /// The contained status is included in the error's display text.
    PublishError(UStatus),
}

impl Display for PubSubError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PubSubError::InvalidArgument(s) => f.write_str(s.as_str()),
            PubSubError::PublishError(s) => {
                f.write_fmt(format_args!("failed to publish message: {}", s))
            }
        }
    }
}

// The empty body means that only the default method implementations of `Error` are used.
impl Error for PubSubError {}

/// A client for publishing messages to a topic.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait Publisher: Send + Sync {
    /// Publishes a message to a topic.
    ///
    /// # Arguments
    ///
    /// * `resource_id` - The (local) resource ID of the topic to publish to.
    /// * `call_options` - Options to include in the published message.
    /// * `payload` - Payload to include in the published message, if any.
    ///
    /// # Errors
    ///
    /// Returns a [`PubSubError`] if the message could not be published.
    async fn publish(
        &self,
        resource_id: u16,
        call_options: CallOptions,
        payload: Option<UPayload>,
    ) -> Result<(), PubSubError>;
}

/// A handler for changes to the status of a subscription to a topic.
#[cfg_attr(test, automock)]
pub trait SubscriptionChangeHandler: Send + Sync {
    /// Invoked for each update to the subscription status for a given topic.
    ///
    /// Implementations must not block the current thread.
    ///
    /// # Arguments
    ///
    /// * `topic` - The topic for which the subscription status has changed.
    /// * `new_status` - The new status of the subscription.
    fn on_subscription_change(&self, topic: UUri, new_status: SubscriptionStatus);
}

/// A client for subscribing to topics.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait Subscriber: Send + Sync {
    /// Registers a handler to invoke for messages that have been published to a given topic.
    ///
    /// More than one handler can be registered for the same topic.
    /// The same handler can be registered for multiple topics.
    ///
    /// # Arguments
    ///
    /// * `topic` - The topic to subscribe to. The topic must not contain any wildcards.
    /// * `handler` - The handler to invoke for each message that has been published to the topic.
    /// * `subscription_change_handler` - An optional handler to invoke for any subscription state
    ///   changes for the given topic.
    ///
    /// # Errors
    ///
    /// Returns a [`RegistrationError`] if the listener cannot be registered.
    async fn subscribe(
        &self,
        topic: &UUri,
        handler: Arc<dyn UListener>,
        subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
    ) -> Result<(), RegistrationError>;

    /// Unregisters a previously [registered handler](`Self::subscribe`).
    ///
    /// # Arguments
    ///
    /// * `topic` - The topic that the handler had been registered for.
    /// * `handler` - The handler to unregister.
    ///
    /// # Errors
    ///
    /// Returns a [`RegistrationError`] if the listener cannot be unregistered.
    async fn unsubscribe(
        &self,
        topic: &UUri,
        handler: Arc<dyn UListener>,
    ) -> Result<(), RegistrationError>;
}