ya_gcp/pubsub/mod.rs
1//! An API for interacting with the [Pub/Sub](https://cloud.google.com/pubsub) message service.
2//!
3//! Publishing and topic management is done through the [`PublisherClient`], while reading data and
4//! subscription management is done through the [`SubscriberClient`].
5
6use crate::grpc::{Body, BoxBody, Bytes, DefaultGrpcImpl, GrpcService, StdError};
7use crate::retry_policy::RetryPredicate;
8use std::fmt::Display;
9use tracing::debug_span;
10
11// alias Status as this module's error type
12pub use tonic::Status as Error;
13
14pub use client_builder::{BuildError, MakeConnection, PubSubConfig, Uri};
15pub use publish_sink::{PublishConfig, PublishError, PublishTopicSink, SinkError};
16pub use streaming_subscription::{
17 AcknowledgeError, AcknowledgeToken, ModifyAcknowledgeError, StreamSubscription,
18 StreamSubscriptionConfig,
19};
20
21pub(crate) mod client_builder;
22mod publish_sink;
23mod streaming_subscription;
24
25#[cfg(feature = "emulators")]
26#[cfg_attr(docsrs, doc(cfg(feature = "emulators")))]
27pub mod emulator;
28
29/// Types and functions generated from PubSub's gRPC schema
30#[allow(
31 rustdoc::broken_intra_doc_links,
32 rustdoc::bare_urls,
33 missing_docs,
34 unreachable_pub
35)]
36pub mod api {
37 include!("../generated/google.pubsub.v1.rs");
38
39 // re-exports of prost types used within the generated code for convenience
40 pub use prost_types::{Duration, FieldMask, Timestamp};
41}
42
43/// A client through which pubsub messages are sent, and topics are managed. Created
44/// from the [`build_pubsub_publisher`](crate::builder::ClientBuilder::build_pubsub_publisher)
45/// function.
46///
47/// This builds on top of the raw [gRPC publisher API](api::publisher_client::PublisherClient)
48/// to provide more ergonomic functionality
49#[derive(Debug, Clone)]
50pub struct PublisherClient<S = DefaultGrpcImpl> {
51 inner: api::publisher_client::PublisherClient<S>,
52}
53
54impl<S> PublisherClient<S> {
55 /// Manually construct a new client.
56 ///
57 /// There are limited circumstances in which this is useful; consider instead using the builder
58 /// function [crate::builder::ClientBuilder::build_pubsub_publisher]
59 pub fn from_raw_api(client: api::publisher_client::PublisherClient<S>) -> Self {
60 PublisherClient { inner: client }
61 }
62
63 /// Access the underlying grpc api
64 pub fn raw_api(&self) -> &api::publisher_client::PublisherClient<S> {
65 &self.inner
66 }
67
68 /// Mutably access the underlying grpc api
69 pub fn raw_api_mut(&mut self) -> &mut api::publisher_client::PublisherClient<S> {
70 &mut self.inner
71 }
72}
73
74impl<S> PublisherClient<S>
75where
76 S: GrpcService<BoxBody> + Clone,
77 S::Error: Into<StdError>,
78 S::ResponseBody: Body<Data = Bytes> + Send + 'static,
79 <S::ResponseBody as Body>::Error: Into<StdError> + Send,
80{
81 /// Create a sink which will publish [messages](api::PubsubMessage) to the given topic.
82 ///
83 /// See the type's [documentation](PublishTopicSink) for more details.
84 pub fn publish_topic_sink(
85 &mut self,
86 topic: ProjectTopicName,
87 config: PublishConfig,
88 ) -> PublishTopicSink<S> {
89 PublishTopicSink::new(self.inner.clone(), topic, config)
90 }
91}
92
93/// A client through which pubsub messages are consumed, and subscriptions are managed. Created
94/// from the [`build_pubsub_subscriber`](crate::builder::ClientBuilder::build_pubsub_subscriber)
95/// function.
96///
97/// This is an interface built on top of the raw [gRPC subscriber
98/// API](api::subscriber_client::SubscriberClient) which provides more ergonomic functionality
99#[derive(Debug, Clone)]
100pub struct SubscriberClient<S = DefaultGrpcImpl> {
101 inner: api::subscriber_client::SubscriberClient<S>,
102}
103
104impl<S> SubscriberClient<S> {
105 /// Manually construct a new client.
106 ///
107 /// There are limited circumstances in which this is useful; consider instead using the builder
108 /// function [crate::builder::ClientBuilder::build_pubsub_subscriber]
109 pub fn from_raw_api(client: api::subscriber_client::SubscriberClient<S>) -> Self {
110 Self { inner: client }
111 }
112
113 /// Access the underlying grpc api
114 pub fn raw_api(&self) -> &api::subscriber_client::SubscriberClient<S> {
115 &self.inner
116 }
117
118 /// Mutably access the underlying grpc api
119 pub fn raw_api_mut(&mut self) -> &mut api::subscriber_client::SubscriberClient<S> {
120 &mut self.inner
121 }
122}
123
124impl<S> SubscriberClient<S>
125where
126 S: GrpcService<BoxBody> + Clone,
127 S::Error: Into<StdError>,
128 S::ResponseBody: Body<Data = Bytes> + Send + 'static,
129 <S::ResponseBody as Body>::Error: Into<StdError> + Send,
130{
131 /// Start a streaming subscription with the pubsub service.
132 ///
133 /// The returned stream will yield [messages](api::PubsubMessage) along with corresponding
134 /// [`AcknowledgeTokens`](AcknowledgeToken), used to control message re-delivery.
135 pub fn stream_subscription(
136 &mut self,
137 subscription: ProjectSubscriptionName,
138 config: StreamSubscriptionConfig,
139 ) -> StreamSubscription<S> {
140 let sub_name: String = subscription.clone().into();
141 let span = debug_span!("create_subscription", topic = sub_name);
142 let _guard = span.enter();
143 StreamSubscription::new(
144 // As of the ack handler changes, the streaming implementation needs more than one
145 // client. The obvious approach would be to clone the client as necessary. However
146 // adding a Clone bound is a major semver change, and for downstream-simplification
147 // reasons we'd like to avoid that.
148 //
149 // Fortunately, there already *was* a clone bound on this `stream_subscription`
150 // function, which is the only public way to construct the stream. Also fortunately we
151 // only need a static number of clients and not arbitrary clones, so we can clone here
152 // and pass an array down there. This isn't *pretty*, but it works
153 //
154 // TODO(0.12.0) just add the clone bound
155 [
156 self.inner.clone(),
157 self.inner.clone(),
158 self.inner.clone(),
159 self.inner.clone(),
160 ],
161 subscription.into(),
162 config,
163 )
164 }
165}
166
167/// A project and subscription name combined in a format expected by API calls,
168///
169/// ```
170/// use ya_gcp::pubsub::ProjectSubscriptionName;
171///
172/// assert_eq!(
173/// String::from(ProjectSubscriptionName::new(
174/// "my-project",
175/// "my-subscription"
176/// )),
177/// "projects/my-project/subscriptions/my-subscription".to_string(),
178/// );
179/// ```
180#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
181pub struct ProjectSubscriptionName(String);
182
183impl ProjectSubscriptionName {
184 /// Create a new `ProjectSubscriptionName` from the given project and subscription names
185 pub fn new(project_name: impl Display, subscription_name: impl Display) -> Self {
186 Self(format!(
187 "projects/{project}/subscriptions/{subscription}",
188 project = project_name,
189 subscription = subscription_name
190 ))
191 }
192}
193
194impl From<ProjectSubscriptionName> for String {
195 fn from(from: ProjectSubscriptionName) -> String {
196 from.0
197 }
198}
199
200impl std::fmt::Display for ProjectSubscriptionName {
201 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
202 std::fmt::Display::fmt(&self.0, f)
203 }
204}
205
206/// A project and topic name combined in a format expected by API calls
207///
208/// ```
209/// use ya_gcp::pubsub::ProjectTopicName;
210///
211/// assert_eq!(
212/// String::from(ProjectTopicName::new("my-project", "my-topic")),
213/// "projects/my-project/topics/my-topic".to_string(),
214/// );
215/// ```
216#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
217pub struct ProjectTopicName(String);
218
219impl ProjectTopicName {
220 /// Create a new `ProjectTopicName` from the given project and topic names
221 pub fn new(project_name: impl Display, topic_name: impl Display) -> Self {
222 Self(format!(
223 "projects/{project}/topics/{topic}",
224 project = project_name,
225 topic = topic_name,
226 ))
227 }
228}
229
230impl From<ProjectTopicName> for String {
231 fn from(from: ProjectTopicName) -> String {
232 from.0
233 }
234}
235
236impl std::fmt::Display for ProjectTopicName {
237 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
238 std::fmt::Display::fmt(&self.0, f)
239 }
240}
241
242/// The default [`RetryPredicate`] used for errors from PubSub operations
243#[derive(Debug, Default, Clone)]
244pub struct PubSubRetryCheck {
245 _priv: (),
246}
247
248impl PubSubRetryCheck {
249 /// Create a new instance with default settings
250 pub fn new() -> Self {
251 Self { _priv: () }
252 }
253}
254
255impl RetryPredicate<Error> for PubSubRetryCheck {
256 fn is_retriable(&self, error: &Error) -> bool {
257 use tonic::Code;
258
259 // this error code check is based on the ones used in the Java and Go pubsub client libs:
260 // https://github.com/googleapis/java-pubsub/blob/d969e8925edc3401e6eb534699ce0351a5f0b20b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java#L33
261 // https://github.com/googleapis/google-cloud-go/blob/ac9924157f35a00ff9d1e6ece9a7e0f12fc60226/pubsub/service.go#L51
262 // Go doesn't retry on cancelled, but Java does; Java always retries Unknown, but Go only
263 // does when the message implies "goaway". This takes a broad approach and retries on both
264 //
265 // This may need adjustment based on lower layers from the rust ecosystem, for example if
266 // tonic interprets h2 errors and forwards as Internal/Unknown for particular cases. For
267 // example, we could inspect the h2::Reason to discern NO_ERROR/GOAWAY from other Unknown
268 // errors. For now, this is left as permissive for simplicity
269
270 match error.code() {
271 Code::DeadlineExceeded
272 | Code::Internal
273 | Code::Cancelled
274 | Code::ResourceExhausted
275 | Code::Aborted
276 | Code::Unknown => true,
277 Code::Unavailable => {
278 let is_shutdown = error.message().contains("Server shutdownNow invoked");
279 !is_shutdown
280 }
281 _ => false,
282 }
283 }
284}