Skip to main content

gcloud_pubsub/apiv1/
subscriber_client.rs

1use google_cloud_gax::conn::Channel;
2use google_cloud_gax::create_request;
3use google_cloud_gax::grpc::Status;
4use google_cloud_gax::grpc::{IntoStreamingRequest, Response, Streaming};
5use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
6use google_cloud_googleapis::pubsub::v1::subscriber_client::SubscriberClient as InternalSubscriberClient;
7use google_cloud_googleapis::pubsub::v1::{
8    AcknowledgeRequest, CreateSnapshotRequest, DeleteSnapshotRequest, DeleteSubscriptionRequest, GetSnapshotRequest,
9    GetSubscriptionRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListSubscriptionsRequest,
10    ListSubscriptionsResponse, ModifyAckDeadlineRequest, ModifyPushConfigRequest, PullRequest, PullResponse,
11    SeekRequest, SeekResponse, Snapshot, StreamingPullRequest, StreamingPullResponse, Subscription,
12    UpdateSnapshotRequest, UpdateSubscriptionRequest,
13};
14use std::sync::Arc;
15use std::time::Duration;
16
17use crate::apiv1::conn_pool::ConnectionManager;
18use crate::apiv1::PUBSUB_MESSAGE_LIMIT;
19
20pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
21    StreamingPullRequest {
22        subscription: "".to_string(),
23        ack_ids: vec![],
24        modify_deadline_seconds: vec![],
25        modify_deadline_ack_ids: vec![],
26        stream_ack_deadline_seconds: 0,
27        client_id: "".to_string(),
28        max_outstanding_messages: 0,
29        max_outstanding_bytes: 0,
30    }
31}
32
33#[derive(Clone, Debug)]
34pub struct SubscriberClient {
35    cm: Arc<ConnectionManager>,
36    streaming_pull_cm: Arc<ConnectionManager>,
37}
38
39#[allow(dead_code)]
40impl SubscriberClient {
41    /// create new Subscriber client
42    pub fn new(cm: ConnectionManager, streaming_pull_cm: ConnectionManager) -> SubscriberClient {
43        SubscriberClient {
44            cm: Arc::new(cm),
45            streaming_pull_cm: Arc::new(streaming_pull_cm),
46        }
47    }
48
49    #[inline]
50    fn client(&self) -> InternalSubscriberClient<Channel> {
51        InternalSubscriberClient::new(self.cm.conn())
52            .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
53            .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
54    }
55
56    #[inline]
57    fn client_for_streaming_pull(&self) -> InternalSubscriberClient<Channel> {
58        InternalSubscriberClient::new(self.streaming_pull_cm.conn())
59            .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
60            .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
61    }
62
63    pub(crate) fn streaming_pool_size(&self) -> usize {
64        self.streaming_pull_cm.num()
65    }
66
67    /// create_subscription creates a subscription to a given topic. See the [resource name rules]
68    /// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)).
69    /// If the subscription already exists, returns ALREADY_EXISTS.
70    /// If the corresponding topic doesn’t exist, returns NOT_FOUND.
71    ///
72    /// If the name is not provided in the request, the server will assign a random
73    /// name for this subscription on the same project as the topic, conforming
74    /// to the [resource name format]
75    /// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)). The generated
76    /// name is populated in the returned Subscription object. Note that for REST
77    /// API requests, you must specify a name in the request.
78    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
79    pub async fn create_subscription(
80        &self,
81        req: Subscription,
82        retry: Option<RetrySetting>,
83    ) -> Result<Response<Subscription>, Status> {
84        let name = &req.name;
85        let action = || async {
86            let mut client = self.client();
87            let request = create_request(format!("name={name}"), req.clone());
88            client.create_subscription(request).await.map_transient_err()
89        };
90        invoke(retry, action).await
91    }
92
93    /// updateSubscription updates an existing subscription. Note that certain properties of a
94    /// subscription, such as its topic, are not modifiable.
95    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
96    pub async fn update_subscription(
97        &self,
98        req: UpdateSubscriptionRequest,
99        retry: Option<RetrySetting>,
100    ) -> Result<Response<Subscription>, Status> {
101        let name = match &req.subscription {
102            Some(s) => s.name.as_str(),
103            None => "",
104        };
105        let action = || async {
106            let mut client = self.client();
107            let request = create_request(format!("subscription.name={name}"), req.clone());
108            client.update_subscription(request).await.map_transient_err()
109        };
110        invoke(retry, action).await
111    }
112
113    /// get_subscription gets the configuration details of a subscription.
114    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
115    pub async fn get_subscription(
116        &self,
117        req: GetSubscriptionRequest,
118        retry: Option<RetrySetting>,
119    ) -> Result<Response<Subscription>, Status> {
120        let subscription = &req.subscription;
121        let action = || async {
122            let mut client = self.client();
123            let request = create_request(format!("subscription={subscription}"), req.clone());
124            client.get_subscription(request).await.map_transient_err()
125        };
126        invoke(retry, action).await
127    }
128
129    /// list_subscriptions lists matching subscriptions.
130    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
131    pub async fn list_subscriptions(
132        &self,
133        mut req: ListSubscriptionsRequest,
134        retry: Option<RetrySetting>,
135    ) -> Result<Vec<Subscription>, Status> {
136        let project = &req.project;
137        let mut all = vec![];
138        //eager loading
139        loop {
140            let action = || async {
141                let mut client = self.client();
142                let request = create_request(format!("project={project}"), req.clone());
143                client
144                    .list_subscriptions(request)
145                    .await
146                    .map(|d| d.into_inner())
147                    .map_transient_err()
148            };
149            let response: ListSubscriptionsResponse = invoke(retry.clone(), action).await?;
150            all.extend(response.subscriptions.into_iter());
151            if response.next_page_token.is_empty() {
152                return Ok(all);
153            }
154            req.page_token = response.next_page_token;
155        }
156    }
157
158    /// delete_subscription deletes an existing subscription. All messages retained in the subscription
159    /// are immediately dropped. Calls to Pull after deletion will return
160    /// NOT_FOUND. After a subscription is deleted, a new one may be created with
161    /// the same name, but the new one has no association with the old
162    /// subscription or its topic unless the same topic is specified.
163    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
164    pub async fn delete_subscription(
165        &self,
166        req: DeleteSubscriptionRequest,
167        retry: Option<RetrySetting>,
168    ) -> Result<Response<()>, Status> {
169        let subscription = &req.subscription;
170        let action = || async {
171            let mut client = self.client();
172            let request = create_request(format!("subscription={subscription}"), req.clone());
173            client.delete_subscription(request).await.map_transient_err()
174        };
175        invoke(retry, action).await
176    }
177
178    /// ModifyAckDeadline modifies the ack deadline for a specific message. This method is useful
179    /// to indicate that more time is needed to process a message by the
180    /// subscriber, or to make the message available for redelivery if the
181    /// processing was interrupted. Note that this does not modify the
182    /// subscription-level ackDeadlineSeconds used for subsequent messages.
183    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
184    pub async fn modify_ack_deadline(
185        &self,
186        req: ModifyAckDeadlineRequest,
187        retry: Option<RetrySetting>,
188    ) -> Result<Response<()>, Status> {
189        let subscription = &req.subscription;
190        let action = || async {
191            let mut client = self.client();
192            let request = create_request(format!("subscription={subscription}"), req.clone());
193            client.modify_ack_deadline(request).await.map_transient_err()
194        };
195        invoke(retry, action).await
196    }
197
198    /// acknowledge acknowledges the messages associated with the ack_ids in the
199    /// AcknowledgeRequest. The Pub/Sub system can remove the relevant messages
200    /// from the subscription.
201    ///
202    /// Acknowledging a message whose ack deadline has expired may succeed,
203    /// but such a message may be redelivered later. Acknowledging a message more
204    /// than once will not result in an error.
205    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
206    pub async fn acknowledge(
207        &self,
208        req: AcknowledgeRequest,
209        retry: Option<RetrySetting>,
210    ) -> Result<Response<()>, Status> {
211        let subscription = &req.subscription;
212        let action = || async {
213            let mut client = self.client();
214            let request = create_request(format!("subscription={subscription}"), req.clone());
215            client.acknowledge(request).await.map_transient_err()
216        };
217        invoke(retry, action).await
218    }
219
220    /// pull pulls messages from the server. The server may return UNAVAILABLE if
221    /// there are too many concurrent pull requests pending for the given
222    /// subscription.
223    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
224    pub async fn pull(&self, req: PullRequest, retry: Option<RetrySetting>) -> Result<Response<PullResponse>, Status> {
225        let subscription = &req.subscription;
226        let action = || async {
227            let mut client = self.client();
228            let request = create_request(format!("subscription={subscription}"), req.clone());
229            client.pull(request).await.map_transient_err()
230        };
231        invoke(retry, action).await
232    }
233
234    /// streaming_pull establishes a stream with the server, which sends messages down to the
235    /// client. The client streams acknowledgements and ack deadline modifications
236    /// back to the server. The server will close the stream and return the status
237    /// on any error. The server may close the stream with status UNAVAILABLE to
238    /// reassign server-side resources, in which case, the client should
239    /// re-establish the stream. Flow control can be achieved by configuring the
240    /// underlying RPC channel.
241    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
242    pub async fn streaming_pull(
243        &self,
244        req: StreamingPullRequest,
245        interval: Duration,
246        retry: Option<RetrySetting>,
247    ) -> Result<Response<Streaming<StreamingPullResponse>>, Status> {
248        let action = || async {
249            let mut client = self.client_for_streaming_pull();
250            let base_req = req.clone();
251            let request = Box::pin(async_stream::stream! {
252                yield base_req.clone();
253                loop {
254                    tokio::time::sleep(interval).await;
255                    yield create_empty_streaming_pull_request();
256                }
257            });
258            let mut v = request.into_streaming_request();
259            let target = v.metadata_mut();
260            target.append(
261                "x-goog-request-params",
262                format!("subscription={}", req.subscription).parse().unwrap(),
263            );
264            client.streaming_pull(v).await.map_transient_err()
265        };
266        invoke(retry, action).await
267    }
268
269    /// modify_push_config modifies the PushConfig for a specified subscription.
270    ///
271    /// This may be used to change a push subscription to a pull one (signified by
272    /// an empty PushConfig) or vice versa, or change the endpoint URL and other
273    /// attributes of a push subscription. Messages will accumulate for delivery
274    /// continuously through the call regardless of changes to the PushConfig.
275    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
276    pub async fn modify_push_config(
277        &self,
278        req: ModifyPushConfigRequest,
279        retry: Option<RetrySetting>,
280    ) -> Result<Response<()>, Status> {
281        let subscription = &req.subscription;
282        let action = || async {
283            let mut client = self.client();
284            let request = create_request(format!("subscription={subscription}"), req.clone());
285            client.modify_push_config(request).await.map_transient_err()
286        };
287        invoke(retry, action).await
288    }
289
290    /// get_snapshot gets the configuration details of a snapshot. Snapshots are used in
291    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
292    /// operations, which allow you to manage message acknowledgments in bulk. That
293    /// is, you can set the acknowledgment state of messages in an existing
294    /// subscription to the state captured by a snapshot
295    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
296    pub async fn get_snapshot(
297        &self,
298        req: GetSnapshotRequest,
299        retry: Option<RetrySetting>,
300    ) -> Result<Response<Snapshot>, Status> {
301        let snapshot = &req.snapshot;
302        let action = || async {
303            let mut client = self.client();
304            let request = create_request(format!("snapshot={snapshot}"), req.clone());
305            client.get_snapshot(request).await.map_transient_err()
306        };
307        invoke(retry, action).await
308    }
309
310    /// list_snapshots lists the existing snapshots. Snapshots are used in Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations, which
311    /// allow you to manage message acknowledgments in bulk. That is, you can set
312    /// the acknowledgment state of messages in an existing subscription to the
313    /// state captured by a snapshot.
314    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
315    pub async fn list_snapshots(
316        &self,
317        mut req: ListSnapshotsRequest,
318        retry: Option<RetrySetting>,
319    ) -> Result<Vec<Snapshot>, Status> {
320        let project = &req.project;
321        let mut all = vec![];
322        //eager loading
323        loop {
324            let action = || async {
325                let mut client = self.client();
326                let request = create_request(format!("project={project}"), req.clone());
327                client
328                    .list_snapshots(request)
329                    .await
330                    .map(|d| d.into_inner())
331                    .map_transient_err()
332            };
333            let response: ListSnapshotsResponse = invoke(retry.clone(), action).await?;
334            all.extend(response.snapshots.into_iter());
335            if response.next_page_token.is_empty() {
336                return Ok(all);
337            }
338            req.page_token = response.next_page_token;
339        }
340    }
341
342    /// create_snapshot creates a snapshot from the requested subscription. Snapshots are used in
343    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations,
344    /// which allow you to manage message acknowledgments in bulk. That is, you can
345    /// set the acknowledgment state of messages in an existing subscription to the
346    /// state captured by a snapshot.
347    /// If the snapshot already exists, returns ALREADY_EXISTS.
348    /// If the requested subscription doesn’t exist, returns NOT_FOUND.
349    /// If the backlog in the subscription is too old – and the resulting snapshot
350    /// would expire in less than 1 hour – then FAILED_PRECONDITION is returned.
351    /// See also the Snapshot.expire_time field. If the name is not provided in
352    /// the request, the server will assign a random
353    /// name for this snapshot on the same project as the subscription, conforming
354    /// to the [resource name format]
355    /// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)). The
356    /// generated name is populated in the returned Snapshot object. Note that for
357    /// REST API requests, you must specify a name in the request.
358    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
359    pub async fn create_snapshot(
360        &self,
361        req: CreateSnapshotRequest,
362        retry: Option<RetrySetting>,
363    ) -> Result<Response<Snapshot>, Status> {
364        let name = &req.name;
365        let action = || async {
366            let mut client = self.client();
367            let request = create_request(format!("name={name}"), req.clone());
368            client.create_snapshot(request).await.map_transient_err()
369        };
370        invoke(retry, action).await
371    }
372
373    /// update_snapshot updates an existing snapshot. Snapshots are used in
374    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
375    /// operations, which allow
376    /// you to manage message acknowledgments in bulk. That is, you can set the
377    /// acknowledgment state of messages in an existing subscription to the state
378    /// captured by a snapshot.
379    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
380    pub async fn update_snapshot(
381        &self,
382        req: UpdateSnapshotRequest,
383        retry: Option<RetrySetting>,
384    ) -> Result<Response<Snapshot>, Status> {
385        let name = match &req.snapshot {
386            Some(v) => v.name.as_str(),
387            None => "",
388        };
389        let action = || async {
390            let mut client = self.client();
391            let request = create_request(format!("snapshot.name={name}"), req.clone());
392            client.update_snapshot(request).await.map_transient_err()
393        };
394        invoke(retry, action).await
395    }
396
397    /// delete_snapshot removes an existing snapshot. Snapshots are used in [Seek]
398    /// (https://cloud.google.com/pubsub/docs/replay-overview (at https://cloud.google.com/pubsub/docs/replay-overview)) operations, which
399    /// allow you to manage message acknowledgments in bulk. That is, you can set
400    /// the acknowledgment state of messages in an existing subscription to the
401    /// state captured by a snapshot.
402    /// When the snapshot is deleted, all messages retained in the snapshot
403    /// are immediately dropped. After a snapshot is deleted, a new one may be
404    /// created with the same name, but the new one has no association with the old
405    /// snapshot or its subscription, unless the same subscription is specified.
406    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
407    pub async fn delete_snapshot(
408        &self,
409        req: DeleteSnapshotRequest,
410        retry: Option<RetrySetting>,
411    ) -> Result<Response<()>, Status> {
412        let name = &req.snapshot;
413        let action = || async {
414            let mut client = self.client();
415            let request = create_request(format!("snapshot={name}"), req.clone());
416            client.delete_snapshot(request).await.map_transient_err()
417        };
418        invoke(retry, action).await
419    }
420
421    // seek [seeks](https://cloud.google.com/pubsub/docs/replay-overview) a subscription to
422    // a point back in time (with a TimeStamp) or to a saved snapshot.
423    pub async fn seek(&self, req: SeekRequest, retry: Option<RetrySetting>) -> Result<Response<SeekResponse>, Status> {
424        let action = || async {
425            let mut client = self.client();
426            let subscription = req.subscription.clone();
427            let request = create_request(format!("subscription={subscription}"), req.clone());
428            client.seek(request).await.map_transient_err()
429        };
430        invoke(retry, action).await
431    }
432}