ya_gcp/pubsub/
mod.rs

1//! An API for interacting with the [Pub/Sub](https://cloud.google.com/pubsub) message service.
2//!
3//! Publishing and topic management is done through the [`PublisherClient`], while reading data and
4//! subscription management is done through the [`SubscriberClient`].
5
6use crate::grpc::{Body, BoxBody, Bytes, DefaultGrpcImpl, GrpcService, StdError};
7use crate::retry_policy::RetryPredicate;
8use std::fmt::Display;
9use tracing::debug_span;
10
11// alias Status as this module's error type
12pub use tonic::Status as Error;
13
14pub use client_builder::{BuildError, MakeConnection, PubSubConfig, Uri};
15pub use publish_sink::{PublishConfig, PublishError, PublishTopicSink, SinkError};
16pub use streaming_subscription::{
17    AcknowledgeError, AcknowledgeToken, ModifyAcknowledgeError, StreamSubscription,
18    StreamSubscriptionConfig,
19};
20
21pub(crate) mod client_builder;
22mod publish_sink;
23mod streaming_subscription;
24
25#[cfg(feature = "emulators")]
26#[cfg_attr(docsrs, doc(cfg(feature = "emulators")))]
27pub mod emulator;
28
29/// Types and functions generated from PubSub's gRPC schema
30#[allow(
31    rustdoc::broken_intra_doc_links,
32    rustdoc::bare_urls,
33    missing_docs,
34    unreachable_pub
35)]
36pub mod api {
37    include!("../generated/google.pubsub.v1.rs");
38
39    // re-exports of prost types used within the generated code for convenience
40    pub use prost_types::{Duration, FieldMask, Timestamp};
41}
42
43/// A client through which pubsub messages are sent, and topics are managed. Created
44/// from the [`build_pubsub_publisher`](crate::builder::ClientBuilder::build_pubsub_publisher)
45/// function.
46///
47/// This builds on top of the raw [gRPC publisher API](api::publisher_client::PublisherClient)
48/// to provide more ergonomic functionality
49#[derive(Debug, Clone)]
50pub struct PublisherClient<S = DefaultGrpcImpl> {
51    inner: api::publisher_client::PublisherClient<S>,
52}
53
54impl<S> PublisherClient<S> {
55    /// Manually construct a new client.
56    ///
57    /// There are limited circumstances in which this is useful; consider instead using the builder
58    /// function [crate::builder::ClientBuilder::build_pubsub_publisher]
59    pub fn from_raw_api(client: api::publisher_client::PublisherClient<S>) -> Self {
60        PublisherClient { inner: client }
61    }
62
63    /// Access the underlying grpc api
64    pub fn raw_api(&self) -> &api::publisher_client::PublisherClient<S> {
65        &self.inner
66    }
67
68    /// Mutably access the underlying grpc api
69    pub fn raw_api_mut(&mut self) -> &mut api::publisher_client::PublisherClient<S> {
70        &mut self.inner
71    }
72}
73
74impl<S> PublisherClient<S>
75where
76    S: GrpcService<BoxBody> + Clone,
77    S::Error: Into<StdError>,
78    S::ResponseBody: Body<Data = Bytes> + Send + 'static,
79    <S::ResponseBody as Body>::Error: Into<StdError> + Send,
80{
81    /// Create a sink which will publish [messages](api::PubsubMessage) to the given topic.
82    ///
83    /// See the type's [documentation](PublishTopicSink) for more details.
84    pub fn publish_topic_sink(
85        &mut self,
86        topic: ProjectTopicName,
87        config: PublishConfig,
88    ) -> PublishTopicSink<S> {
89        PublishTopicSink::new(self.inner.clone(), topic, config)
90    }
91}
92
93/// A client through which pubsub messages are consumed, and subscriptions are managed. Created
94/// from the [`build_pubsub_subscriber`](crate::builder::ClientBuilder::build_pubsub_subscriber)
95/// function.
96///
97/// This is an interface built on top of the raw [gRPC subscriber
98/// API](api::subscriber_client::SubscriberClient) which provides more ergonomic functionality
99#[derive(Debug, Clone)]
100pub struct SubscriberClient<S = DefaultGrpcImpl> {
101    inner: api::subscriber_client::SubscriberClient<S>,
102}
103
104impl<S> SubscriberClient<S> {
105    /// Manually construct a new client.
106    ///
107    /// There are limited circumstances in which this is useful; consider instead using the builder
108    /// function [crate::builder::ClientBuilder::build_pubsub_subscriber]
109    pub fn from_raw_api(client: api::subscriber_client::SubscriberClient<S>) -> Self {
110        Self { inner: client }
111    }
112
113    /// Access the underlying grpc api
114    pub fn raw_api(&self) -> &api::subscriber_client::SubscriberClient<S> {
115        &self.inner
116    }
117
118    /// Mutably access the underlying grpc api
119    pub fn raw_api_mut(&mut self) -> &mut api::subscriber_client::SubscriberClient<S> {
120        &mut self.inner
121    }
122}
123
124impl<S> SubscriberClient<S>
125where
126    S: GrpcService<BoxBody> + Clone,
127    S::Error: Into<StdError>,
128    S::ResponseBody: Body<Data = Bytes> + Send + 'static,
129    <S::ResponseBody as Body>::Error: Into<StdError> + Send,
130{
131    /// Start a streaming subscription with the pubsub service.
132    ///
133    /// The returned stream will yield [messages](api::PubsubMessage) along with corresponding
134    /// [`AcknowledgeTokens`](AcknowledgeToken), used to control message re-delivery.
135    pub fn stream_subscription(
136        &mut self,
137        subscription: ProjectSubscriptionName,
138        config: StreamSubscriptionConfig,
139    ) -> StreamSubscription<S> {
140        let sub_name: String = subscription.clone().into();
141        let span = debug_span!("create_subscription", topic = sub_name);
142        let _guard = span.enter();
143        StreamSubscription::new(
144            // As of the ack handler changes, the streaming implementation needs more than one
145            // client. The obvious approach would be to clone the client as necessary. However
146            // adding a Clone bound is a major semver change, and for downstream-simplification
147            // reasons we'd like to avoid that.
148            //
149            // Fortunately, there already *was* a clone bound on this `stream_subscription`
150            // function, which is the only public way to construct the stream. Also fortunately we
151            // only need a static number of clients and not arbitrary clones, so we can clone here
152            // and pass an array down there. This isn't *pretty*, but it works
153            //
154            // TODO(0.12.0) just add the clone bound
155            [
156                self.inner.clone(),
157                self.inner.clone(),
158                self.inner.clone(),
159                self.inner.clone(),
160            ],
161            subscription.into(),
162            config,
163        )
164    }
165}
166
/// A project and subscription name combined in the format expected by API calls.
///
/// ```
/// use ya_gcp::pubsub::ProjectSubscriptionName;
///
/// assert_eq!(
///     String::from(ProjectSubscriptionName::new(
///         "my-project",
///         "my-subscription"
///     )),
///     "projects/my-project/subscriptions/my-subscription".to_string(),
/// );
/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProjectSubscriptionName(String);

impl ProjectSubscriptionName {
    /// Create a new `ProjectSubscriptionName` from the given project and subscription names.
    ///
    /// Both arguments are rendered through their `Display` implementations and joined as
    /// `projects/<project>/subscriptions/<subscription>`.
    pub fn new(project_name: impl Display, subscription_name: impl Display) -> Self {
        let path = format!(
            "projects/{}/subscriptions/{}",
            project_name, subscription_name
        );
        Self(path)
    }
}

impl From<ProjectSubscriptionName> for String {
    /// Unwrap the combined name into its owned string form.
    fn from(name: ProjectSubscriptionName) -> Self {
        let ProjectSubscriptionName(path) = name;
        path
    }
}

impl Display for ProjectSubscriptionName {
    /// Write the combined name, deferring to the string's own formatting so that formatter
    /// options such as width and precision are honored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self.0.as_str(), f)
    }
}
205
/// A project and topic name combined in the format expected by API calls.
///
/// ```
/// use ya_gcp::pubsub::ProjectTopicName;
///
/// assert_eq!(
///     String::from(ProjectTopicName::new("my-project", "my-topic")),
///     "projects/my-project/topics/my-topic".to_string(),
/// );
/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProjectTopicName(String);

impl ProjectTopicName {
    /// Create a new `ProjectTopicName` from the given project and topic names.
    ///
    /// Both arguments are rendered through their `Display` implementations and joined as
    /// `projects/<project>/topics/<topic>`.
    pub fn new(project_name: impl Display, topic_name: impl Display) -> Self {
        let path = format!("projects/{}/topics/{}", project_name, topic_name);
        Self(path)
    }
}

impl From<ProjectTopicName> for String {
    /// Unwrap the combined name into its owned string form.
    fn from(name: ProjectTopicName) -> Self {
        let ProjectTopicName(path) = name;
        path
    }
}

impl Display for ProjectTopicName {
    /// Write the combined name, deferring to the string's own formatting so that formatter
    /// options such as width and precision are honored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self.0.as_str(), f)
    }
}
241
/// The default [`RetryPredicate`] used for errors from PubSub operations.
///
/// The only field is private, so outside this module values are obtained through
/// [`PubSubRetryCheck::new`] or `Default` rather than a struct literal.
#[derive(Debug, Default, Clone)]
pub struct PubSubRetryCheck {
    _priv: (),
}

impl PubSubRetryCheck {
    /// Create a new instance with default settings
    pub fn new() -> Self {
        Self::default()
    }
}
254
255impl RetryPredicate<Error> for PubSubRetryCheck {
256    fn is_retriable(&self, error: &Error) -> bool {
257        use tonic::Code;
258
259        // this error code check is based on the ones used in the Java and Go pubsub client libs:
260        // https://github.com/googleapis/java-pubsub/blob/d969e8925edc3401e6eb534699ce0351a5f0b20b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java#L33
261        // https://github.com/googleapis/google-cloud-go/blob/ac9924157f35a00ff9d1e6ece9a7e0f12fc60226/pubsub/service.go#L51
262        // Go doesn't retry on cancelled, but Java does; Java always retries Unknown, but Go only
263        // does when the message implies "goaway". This takes a broad approach and retries on both
264        //
265        // This may need adjustment based on lower layers from the rust ecosystem, for example if
266        // tonic interprets h2 errors and forwards as Internal/Unknown for particular cases. For
267        // example, we could inspect the h2::Reason to discern NO_ERROR/GOAWAY from other Unknown
268        // errors. For now, this is left as permissive for simplicity
269
270        match error.code() {
271            Code::DeadlineExceeded
272            | Code::Internal
273            | Code::Cancelled
274            | Code::ResourceExhausted
275            | Code::Aborted
276            | Code::Unknown => true,
277            Code::Unavailable => {
278                let is_shutdown = error.message().contains("Server shutdownNow invoked");
279                !is_shutdown
280            }
281            _ => false,
282        }
283    }
284}