hedwig/backends/googlepubsub/
mod.rs

1//! Adapters for using GCP's PubSub as a message service for hedwig
2
3#![macro_use]
4
5use std::{borrow::Cow, fmt::Display};
6
7pub use ya_gcp::{
8    self as gcp,
9    grpc::StatusCodeSet,
10    pubsub::{
11        AcknowledgeError, AcknowledgeToken, BuildError, Error as PubSubError,
12        ModifyAcknowledgeError, PubSubConfig, PubSubRetryCheck, SinkError,
13        StreamSubscriptionConfig, Uri,
14    },
15    retry_policy, AuthFlow, ClientBuilderConfig, CreateBuilderError, ServiceAccountAuth,
16};
17
18type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
19
/// Declare a public struct whose field names are checked against another struct.
///
/// An invocation names a target struct path, then `=>`, then a `pub struct` definition. Inside
/// the braces come the new struct's public fields, followed by an `@except:` list naming target
/// fields which are deliberately left out of the new struct.
///
/// This is used to create a narrowed-down API type, with irrelevant fields removed and other
/// fields replaced with richer types.
///
/// The expansion emits the struct as written (with generated docs when `cfg(docsrs)` is set) and
/// an associated `_MATCH_CHECK` constant which destructures the target using every listed and
/// excepted field name, so naming a field which the target does not have fails to compile.
// NOTE(review): the generated pattern ends in `..`, so target fields which are neither listed nor
// excepted are accepted silently -- confirm whether that leniency is intended
macro_rules! match_fields {
    (
        $mirrored:path =>

        $(#[$outer_attr:meta])*
        pub struct $name:ident $(<$generics:tt>)? {
            $(
                $(#[$member_attr:meta])*
                pub $member:ident : $member_ty:ty,
            )*$(,)?

            // names of fields which the mirrored struct has but the new struct omits.
            // listing them keeps the set of known target fields explicit
            @except:
            $(
                $omitted:ident,
            )*$(,)?
        }
    ) => {
        $(#[$outer_attr])*
        // the doc = EXPR syntax is hidden behind a nested cfg_attr so that older compilers
        // never have to parse it
        #[cfg_attr(docsrs, cfg_attr(docsrs,
            doc = "", // newline
            doc = concat!("This is a more ergonomic wrapper over [`", stringify!($mirrored), "`]")
        ))]
        #[cfg_attr(not(docsrs), allow(missing_docs))]
        pub struct $name $(<$generics>)? {
            $(
                #[cfg_attr(docsrs, cfg_attr(docsrs, doc = concat!(
                    "See [`", stringify!($member), "`]",
                    "(", stringify!($mirrored), "::", stringify!($member), ")"
                )))]
                $(#[$member_attr])*
                pub $member : $member_ty,
            )*
        }

        impl$(<$generics>)? $name $(<$generics>)? {
            // the value is just `()`; the constant exists so that the pattern below is
            // checked against the fields of the mirrored struct
            const _MATCH_CHECK: () = {
                match None {
                    Some($mirrored {
                        $(
                            $member: _,
                        )*
                        $(
                            $omitted: _,
                        )*
                        ..
                    }) => {},
                    None => {}
                };
            };
        }
    };
}
79
80mod consumer;
81mod publisher;
82
83pub use consumer::*;
84pub use publisher::*;
85
/// The short name of a PubSub topic, without any project qualification.
///
/// The name is held exactly as given, either borrowed or owned. For API calls it is prefixed
/// with `hedwig-` and combined with a project name, to internally construct the expected
/// `projects/{project}/topics/hedwig-{topic}` format.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TopicName<'s>(Cow<'s, str>);
92
93impl<'s> TopicName<'s> {
94    /// Create a new `TopicName`
95    pub fn new(name: impl Into<Cow<'s, str>>) -> Self {
96        Self(name.into())
97    }
98
99    /// Construct a full project and topic name with this name
100    fn into_project_topic_name(
101        self,
102        project_name: impl Display,
103    ) -> ya_gcp::pubsub::ProjectTopicName {
104        ya_gcp::pubsub::ProjectTopicName::new(
105            project_name,
106            std::format_args!("hedwig-{topic}", topic = self.0),
107        )
108    }
109}
110
111/// A builder used to create [`ConsumerClient`] and [`PublisherClient`] instances
112///
113/// Note that the builder is not consumed when creating clients, and many clients can be built
114/// using the same builder. This may allow some resource re-use across the clients
115pub struct ClientBuilder {
116    inner: ya_gcp::ClientBuilder,
117    pubsub_config: PubSubConfig,
118}
119
120impl ClientBuilder {
121    /// Create a new client builder using the default HTTPS connector based on the crate's
122    /// enabled features
123    pub async fn new(
124        config: ClientBuilderConfig,
125        pubsub_config: PubSubConfig,
126    ) -> Result<Self, CreateBuilderError> {
127        Ok(ClientBuilder {
128            inner: ya_gcp::ClientBuilder::new(config).await?,
129            pubsub_config,
130        })
131    }
132}
133
134impl ClientBuilder {
135    /// Create a new [`ConsumerClient`] for consuming messages from PubSub subscriptions within the
136    /// given project, identified by the given queue name.
137    pub async fn build_consumer(
138        &self,
139        project: impl Into<String>,
140        queue: impl Into<String>,
141    ) -> Result<ConsumerClient, BuildError> {
142        Ok(ConsumerClient::from_client(
143            self.inner
144                .build_pubsub_subscriber(self.pubsub_config.clone())
145                .await?,
146            project.into(),
147            queue.into(),
148        ))
149    }
150
151    /// Create a new [`PublisherClient`] for publishing messages to PubSub topics within the given
152    /// project.
153    ///
154    /// Each published message will have an attribute labelling the publisher with the given
155    /// identifier.
156    pub async fn build_publisher(
157        &self,
158        project: impl Into<String>,
159        publisher_id: impl Into<String>,
160    ) -> Result<PublisherClient, BuildError> {
161        Ok(PublisherClient::from_client(
162            self.inner
163                .build_pubsub_publisher(self.pubsub_config.clone())
164                .await?,
165            project.into(),
166            publisher_id.into(),
167        ))
168    }
169}