// hedwig/backends/googlepubsub/mod.rs
#![macro_use]
4
5use std::{borrow::Cow, fmt::Display};
6
7pub use ya_gcp::{
8 self as gcp,
9 grpc::StatusCodeSet,
10 pubsub::{
11 AcknowledgeError, AcknowledgeToken, BuildError, Error as PubSubError,
12 ModifyAcknowledgeError, PubSubConfig, PubSubRetryCheck, SinkError,
13 StreamSubscriptionConfig, Uri,
14 },
15 retry_policy, AuthFlow, ClientBuilderConfig, CreateBuilderError, ServiceAccountAuth,
16};
17
/// Boxed, thread-safe error trait object used where the concrete error type is erased.
///
/// The object lifetime is left implicit: for `Box<dyn Trait>` it defaults to `'static`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
19
/// Declares a public struct and checks its field names against the struct at `$target`.
///
/// Accepted input:
///
/// ```text
/// path::to::Target =>
/// #[attrs]
/// pub struct Name<G> {
///     #[attrs]
///     pub field: Type,
///     ...
///     @except:
///     other_target_field,
///     ...
/// }
/// ```
///
/// The expansion contains two items:
///
/// * the struct itself, carrying the caller's attributes and every listed `pub` field. Under
///   `cfg(docsrs)` generated doc lines referring to `$target` are attached; otherwise the
///   `missing_docs` lint is allowed on the struct.
/// * an associated `_MATCH_CHECK` constant whose body destructures `$target`, naming every
///   listed field and every `@except` field. Naming a field that `$target` does not have is
///   therefore a compile error. The pattern ends in `..`, so target fields that appear in
///   neither list are accepted.
macro_rules! match_fields {
    (
        $target:path =>

        $(#[$outer_meta:meta])*
        pub struct $wrapper:ident $(<$generics:tt>)? {
            $(
                $(#[$member_meta:meta])*
                pub $member:ident : $member_ty:ty,
            )*$(,)?

            // Fields of the target which the generated struct intentionally leaves out.
            @except:
            $(
                $skipped:ident,
            )*$(,)?
        }
    ) => {
        $(#[$outer_meta])*
        // NOTE(review): the doubled `cfg_attr(docsrs, ...)` presumably keeps toolchains that
        // cannot parse `doc = <macro call>` from ever looking at it -- confirm before flattening.
        #[cfg_attr(docsrs, cfg_attr(docsrs,
            doc = "",
            doc = concat!("This is a more ergonomic wrapper over [`", stringify!($target), "`]")
        ))]
        #[cfg_attr(not(docsrs), allow(missing_docs))]
        pub struct $wrapper $(<$generics>)? {
            $(
                // The generated link is emitted ahead of the caller's own field attributes.
                #[cfg_attr(docsrs, cfg_attr(docsrs, doc = concat!(
                    "See [`", stringify!($member), "`]",
                    "(", stringify!($target), "::", stringify!($member), ")"
                )))]
                $(#[$member_meta])*
                pub $member : $member_ty,
            )*
        }

        impl $(<$generics>)? $wrapper $(<$generics>)? {
            // Never read anywhere: it exists only so that the pattern below is type-checked.
            const _MATCH_CHECK: () = {
                match None {
                    Some($target {
                        $(
                            $member: _,
                        )*
                        $(
                            $skipped: _,
                        )*
                        ..
                    }) => {},
                    None => {}
                };
            };
        }
    };
}
79
80mod consumer;
81mod publisher;
82
83pub use consumer::*;
84pub use publisher::*;
85
/// A topic name, stored as a borrowed-or-owned string.
///
/// Equality, ordering and hashing are all derived from the wrapped string.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TopicName<'s>(Cow<'s, str>);
92
93impl<'s> TopicName<'s> {
94 pub fn new(name: impl Into<Cow<'s, str>>) -> Self {
96 Self(name.into())
97 }
98
99 fn into_project_topic_name(
101 self,
102 project_name: impl Display,
103 ) -> ya_gcp::pubsub::ProjectTopicName {
104 ya_gcp::pubsub::ProjectTopicName::new(
105 project_name,
106 std::format_args!("hedwig-{topic}", topic = self.0),
107 )
108 }
109}
110
111pub struct ClientBuilder {
116 inner: ya_gcp::ClientBuilder,
117 pubsub_config: PubSubConfig,
118}
119
120impl ClientBuilder {
121 pub async fn new(
124 config: ClientBuilderConfig,
125 pubsub_config: PubSubConfig,
126 ) -> Result<Self, CreateBuilderError> {
127 Ok(ClientBuilder {
128 inner: ya_gcp::ClientBuilder::new(config).await?,
129 pubsub_config,
130 })
131 }
132}
133
134impl ClientBuilder {
135 pub async fn build_consumer(
138 &self,
139 project: impl Into<String>,
140 queue: impl Into<String>,
141 ) -> Result<ConsumerClient, BuildError> {
142 Ok(ConsumerClient::from_client(
143 self.inner
144 .build_pubsub_subscriber(self.pubsub_config.clone())
145 .await?,
146 project.into(),
147 queue.into(),
148 ))
149 }
150
151 pub async fn build_publisher(
157 &self,
158 project: impl Into<String>,
159 publisher_id: impl Into<String>,
160 ) -> Result<PublisherClient, BuildError> {
161 Ok(PublisherClient::from_client(
162 self.inner
163 .build_pubsub_publisher(self.pubsub_config.clone())
164 .await?,
165 project.into(),
166 publisher_id.into(),
167 ))
168 }
169}