google_cloud_pubsub/apiv1/
subscriber_client.rs

1use std::sync::Arc;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::create_request;
5use google_cloud_gax::grpc::Status;
6use google_cloud_gax::grpc::{IntoStreamingRequest, Response, Streaming};
7use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
8use google_cloud_googleapis::pubsub::v1::subscriber_client::SubscriberClient as InternalSubscriberClient;
9use google_cloud_googleapis::pubsub::v1::{
10    AcknowledgeRequest, CreateSnapshotRequest, DeleteSnapshotRequest, DeleteSubscriptionRequest, GetSnapshotRequest,
11    GetSubscriptionRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListSubscriptionsRequest,
12    ListSubscriptionsResponse, ModifyAckDeadlineRequest, ModifyPushConfigRequest, PullRequest, PullResponse,
13    SeekRequest, SeekResponse, Snapshot, StreamingPullRequest, StreamingPullResponse, Subscription,
14    UpdateSnapshotRequest, UpdateSubscriptionRequest,
15};
16
17use crate::apiv1::conn_pool::ConnectionManager;
18use crate::apiv1::PUBSUB_MESSAGE_LIMIT;
19
20pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
21    StreamingPullRequest {
22        subscription: "".to_string(),
23        ack_ids: vec![],
24        modify_deadline_seconds: vec![],
25        modify_deadline_ack_ids: vec![],
26        stream_ack_deadline_seconds: 0,
27        client_id: "".to_string(),
28        max_outstanding_messages: 0,
29        max_outstanding_bytes: 0,
30    }
31}
32
33#[derive(Clone, Debug)]
34pub struct SubscriberClient {
35    cm: Arc<ConnectionManager>,
36    streaming_pull_cm: Arc<ConnectionManager>,
37}
38
39#[allow(dead_code)]
40impl SubscriberClient {
41    /// create new Subscriber client
42    pub fn new(cm: ConnectionManager, streaming_pull_cm: ConnectionManager) -> SubscriberClient {
43        SubscriberClient {
44            cm: Arc::new(cm),
45            streaming_pull_cm: Arc::new(streaming_pull_cm),
46        }
47    }
48
49    #[inline]
50    fn client(&self) -> InternalSubscriberClient<Channel> {
51        InternalSubscriberClient::new(self.cm.conn())
52            .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
53            .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
54    }
55
56    #[inline]
57    fn client_for_streaming_pull(&self) -> InternalSubscriberClient<Channel> {
58        InternalSubscriberClient::new(self.streaming_pull_cm.conn())
59            .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
60            .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
61    }
62
63    pub(crate) fn streaming_pool_size(&self) -> usize {
64        self.streaming_pull_cm.num()
65    }
66
67    /// create_subscription creates a subscription to a given topic. See the [resource name rules]
68    /// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)).
69    /// If the subscription already exists, returns ALREADY_EXISTS.
70    /// If the corresponding topic doesn’t exist, returns NOT_FOUND.
71    ///
72    /// If the name is not provided in the request, the server will assign a random
73    /// name for this subscription on the same project as the topic, conforming
74    /// to the [resource name format]
75    /// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)). The generated
76    /// name is populated in the returned Subscription object. Note that for REST
77    /// API requests, you must specify a name in the request.
78    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
79    pub async fn create_subscription(
80        &self,
81        req: Subscription,
82        retry: Option<RetrySetting>,
83    ) -> Result<Response<Subscription>, Status> {
84        let name = &req.name;
85        let action = || async {
86            let mut client = self.client();
87            let request = create_request(format!("name={name}"), req.clone());
88            client.create_subscription(request).await.map_transient_err()
89        };
90        invoke(retry, action).await
91    }
92
93    /// updateSubscription updates an existing subscription. Note that certain properties of a
94    /// subscription, such as its topic, are not modifiable.
95    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
96    pub async fn update_subscription(
97        &self,
98        req: UpdateSubscriptionRequest,
99        retry: Option<RetrySetting>,
100    ) -> Result<Response<Subscription>, Status> {
101        let name = match &req.subscription {
102            Some(s) => s.name.as_str(),
103            None => "",
104        };
105        let action = || async {
106            let mut client = self.client();
107            let request = create_request(format!("subscription.name={name}"), req.clone());
108            client.update_subscription(request).await.map_transient_err()
109        };
110        invoke(retry, action).await
111    }
112
113    /// get_subscription gets the configuration details of a subscription.
114    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
115    pub async fn get_subscription(
116        &self,
117        req: GetSubscriptionRequest,
118        retry: Option<RetrySetting>,
119    ) -> Result<Response<Subscription>, Status> {
120        let subscription = &req.subscription;
121        let action = || async {
122            let mut client = self.client();
123            let request = create_request(format!("subscription={subscription}"), req.clone());
124            client.get_subscription(request).await.map_transient_err()
125        };
126        invoke(retry, action).await
127    }
128
129    /// list_subscriptions lists matching subscriptions.
130    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
131    pub async fn list_subscriptions(
132        &self,
133        mut req: ListSubscriptionsRequest,
134        retry: Option<RetrySetting>,
135    ) -> Result<Vec<Subscription>, Status> {
136        let project = &req.project;
137        let mut all = vec![];
138        //eager loading
139        loop {
140            let action = || async {
141                let mut client = self.client();
142                let request = create_request(format!("project={project}"), req.clone());
143                client
144                    .list_subscriptions(request)
145                    .await
146                    .map(|d| d.into_inner())
147                    .map_transient_err()
148            };
149            let response: ListSubscriptionsResponse = invoke(retry.clone(), action).await?;
150            all.extend(response.subscriptions.into_iter());
151            if response.next_page_token.is_empty() {
152                return Ok(all);
153            }
154            req.page_token = response.next_page_token;
155        }
156    }
157
158    /// delete_subscription deletes an existing subscription. All messages retained in the subscription
159    /// are immediately dropped. Calls to Pull after deletion will return
160    /// NOT_FOUND. After a subscription is deleted, a new one may be created with
161    /// the same name, but the new one has no association with the old
162    /// subscription or its topic unless the same topic is specified.
163    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
164    pub async fn delete_subscription(
165        &self,
166        req: DeleteSubscriptionRequest,
167        retry: Option<RetrySetting>,
168    ) -> Result<Response<()>, Status> {
169        let subscription = &req.subscription;
170        let action = || async {
171            let mut client = self.client();
172            let request = create_request(format!("subscription={subscription}"), req.clone());
173            client.delete_subscription(request).await.map_transient_err()
174        };
175        invoke(retry, action).await
176    }
177
178    /// ModifyAckDeadline modifies the ack deadline for a specific message. This method is useful
179    /// to indicate that more time is needed to process a message by the
180    /// subscriber, or to make the message available for redelivery if the
181    /// processing was interrupted. Note that this does not modify the
182    /// subscription-level ackDeadlineSeconds used for subsequent messages.
183    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
184    pub async fn modify_ack_deadline(
185        &self,
186        req: ModifyAckDeadlineRequest,
187        retry: Option<RetrySetting>,
188    ) -> Result<Response<()>, Status> {
189        let subscription = &req.subscription;
190        let action = || async {
191            let mut client = self.client();
192            let request = create_request(format!("subscription={subscription}"), req.clone());
193            client.modify_ack_deadline(request).await.map_transient_err()
194        };
195        invoke(retry, action).await
196    }
197
198    /// acknowledge acknowledges the messages associated with the ack_ids in the
199    /// AcknowledgeRequest. The Pub/Sub system can remove the relevant messages
200    /// from the subscription.
201    ///
202    /// Acknowledging a message whose ack deadline has expired may succeed,
203    /// but such a message may be redelivered later. Acknowledging a message more
204    /// than once will not result in an error.
205    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
206    pub async fn acknowledge(
207        &self,
208        req: AcknowledgeRequest,
209        retry: Option<RetrySetting>,
210    ) -> Result<Response<()>, Status> {
211        let subscription = &req.subscription;
212        let action = || async {
213            let mut client = self.client();
214            let request = create_request(format!("subscription={subscription}"), req.clone());
215            client.acknowledge(request).await.map_transient_err()
216        };
217        invoke(retry, action).await
218    }
219
220    /// pull pulls messages from the server. The server may return UNAVAILABLE if
221    /// there are too many concurrent pull requests pending for the given
222    /// subscription.
223    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
224    pub async fn pull(&self, req: PullRequest, retry: Option<RetrySetting>) -> Result<Response<PullResponse>, Status> {
225        let subscription = &req.subscription;
226        let action = || async {
227            let mut client = self.client();
228            let request = create_request(format!("subscription={subscription}"), req.clone());
229            client.pull(request).await.map_transient_err()
230        };
231        invoke(retry, action).await
232    }
233
234    /// streaming_pull establishes a stream with the server, which sends messages down to the
235    /// client. The client streams acknowledgements and ack deadline modifications
236    /// back to the server. The server will close the stream and return the status
237    /// on any error. The server may close the stream with status UNAVAILABLE to
238    /// reassign server-side resources, in which case, the client should
239    /// re-establish the stream. Flow control can be achieved by configuring the
240    /// underlying RPC channel.
241    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
242    pub async fn streaming_pull(
243        &self,
244        req: StreamingPullRequest,
245        ping_receiver: async_channel::Receiver<bool>,
246        retry: Option<RetrySetting>,
247    ) -> Result<Response<Streaming<StreamingPullResponse>>, Status> {
248        let action = || async {
249            let mut client = self.client_for_streaming_pull();
250            let base_req = req.clone();
251            let rx = ping_receiver.clone();
252            let request = Box::pin(async_stream::stream! {
253                yield base_req.clone();
254
255                // ping message.
256                // must be empty request
257                while let Ok(_r) = rx.recv().await {
258                   yield create_empty_streaming_pull_request();
259                }
260            });
261            let mut v = request.into_streaming_request();
262            let target = v.metadata_mut();
263            target.append(
264                "x-goog-request-params",
265                format!("subscription={}", req.subscription).parse().unwrap(),
266            );
267            client.streaming_pull(v).await.map_transient_err()
268        };
269        invoke(retry, action).await
270    }
271
272    /// modify_push_config modifies the PushConfig for a specified subscription.
273    ///
274    /// This may be used to change a push subscription to a pull one (signified by
275    /// an empty PushConfig) or vice versa, or change the endpoint URL and other
276    /// attributes of a push subscription. Messages will accumulate for delivery
277    /// continuously through the call regardless of changes to the PushConfig.
278    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
279    pub async fn modify_push_config(
280        &self,
281        req: ModifyPushConfigRequest,
282        retry: Option<RetrySetting>,
283    ) -> Result<Response<()>, Status> {
284        let subscription = &req.subscription;
285        let action = || async {
286            let mut client = self.client();
287            let request = create_request(format!("subscription={subscription}"), req.clone());
288            client.modify_push_config(request).await.map_transient_err()
289        };
290        invoke(retry, action).await
291    }
292
293    /// get_snapshot gets the configuration details of a snapshot. Snapshots are used in
294    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
295    /// operations, which allow you to manage message acknowledgments in bulk. That
296    /// is, you can set the acknowledgment state of messages in an existing
297    /// subscription to the state captured by a snapshot
298    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
299    pub async fn get_snapshot(
300        &self,
301        req: GetSnapshotRequest,
302        retry: Option<RetrySetting>,
303    ) -> Result<Response<Snapshot>, Status> {
304        let snapshot = &req.snapshot;
305        let action = || async {
306            let mut client = self.client();
307            let request = create_request(format!("snapshot={snapshot}"), req.clone());
308            client.get_snapshot(request).await.map_transient_err()
309        };
310        invoke(retry, action).await
311    }
312
313    /// list_snapshots lists the existing snapshots. Snapshots are used in Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations, which
314    /// allow you to manage message acknowledgments in bulk. That is, you can set
315    /// the acknowledgment state of messages in an existing subscription to the
316    /// state captured by a snapshot.
317    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
318    pub async fn list_snapshots(
319        &self,
320        mut req: ListSnapshotsRequest,
321        retry: Option<RetrySetting>,
322    ) -> Result<Vec<Snapshot>, Status> {
323        let project = &req.project;
324        let mut all = vec![];
325        //eager loading
326        loop {
327            let action = || async {
328                let mut client = self.client();
329                let request = create_request(format!("project={project}"), req.clone());
330                client
331                    .list_snapshots(request)
332                    .await
333                    .map(|d| d.into_inner())
334                    .map_transient_err()
335            };
336            let response: ListSnapshotsResponse = invoke(retry.clone(), action).await?;
337            all.extend(response.snapshots.into_iter());
338            if response.next_page_token.is_empty() {
339                return Ok(all);
340            }
341            req.page_token = response.next_page_token;
342        }
343    }
344
345    /// create_snapshot creates a snapshot from the requested subscription. Snapshots are used in
346    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations,
347    /// which allow you to manage message acknowledgments in bulk. That is, you can
348    /// set the acknowledgment state of messages in an existing subscription to the
349    /// state captured by a snapshot.
350    /// If the snapshot already exists, returns ALREADY_EXISTS.
351    /// If the requested subscription doesn’t exist, returns NOT_FOUND.
352    /// If the backlog in the subscription is too old – and the resulting snapshot
353    /// would expire in less than 1 hour – then FAILED_PRECONDITION is returned.
354    /// See also the Snapshot.expire_time field. If the name is not provided in
355    /// the request, the server will assign a random
356    /// name for this snapshot on the same project as the subscription, conforming
357    /// to the [resource name format]
358    /// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)). The
359    /// generated name is populated in the returned Snapshot object. Note that for
360    /// REST API requests, you must specify a name in the request.
361    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
362    pub async fn create_snapshot(
363        &self,
364        req: CreateSnapshotRequest,
365        retry: Option<RetrySetting>,
366    ) -> Result<Response<Snapshot>, Status> {
367        let name = &req.name;
368        let action = || async {
369            let mut client = self.client();
370            let request = create_request(format!("name={name}"), req.clone());
371            client.create_snapshot(request).await.map_transient_err()
372        };
373        invoke(retry, action).await
374    }
375
376    /// update_snapshot updates an existing snapshot. Snapshots are used in
377    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
378    /// operations, which allow
379    /// you to manage message acknowledgments in bulk. That is, you can set the
380    /// acknowledgment state of messages in an existing subscription to the state
381    /// captured by a snapshot.
382    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
383    pub async fn update_snapshot(
384        &self,
385        req: UpdateSnapshotRequest,
386        retry: Option<RetrySetting>,
387    ) -> Result<Response<Snapshot>, Status> {
388        let name = match &req.snapshot {
389            Some(v) => v.name.as_str(),
390            None => "",
391        };
392        let action = || async {
393            let mut client = self.client();
394            let request = create_request(format!("snapshot.name={name}"), req.clone());
395            client.update_snapshot(request).await.map_transient_err()
396        };
397        invoke(retry, action).await
398    }
399
400    /// delete_snapshot removes an existing snapshot. Snapshots are used in [Seek]
401    /// (https://cloud.google.com/pubsub/docs/replay-overview (at https://cloud.google.com/pubsub/docs/replay-overview)) operations, which
402    /// allow you to manage message acknowledgments in bulk. That is, you can set
403    /// the acknowledgment state of messages in an existing subscription to the
404    /// state captured by a snapshot.
405    /// When the snapshot is deleted, all messages retained in the snapshot
406    /// are immediately dropped. After a snapshot is deleted, a new one may be
407    /// created with the same name, but the new one has no association with the old
408    /// snapshot or its subscription, unless the same subscription is specified.
409    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
410    pub async fn delete_snapshot(
411        &self,
412        req: DeleteSnapshotRequest,
413        retry: Option<RetrySetting>,
414    ) -> Result<Response<()>, Status> {
415        let name = &req.snapshot;
416        let action = || async {
417            let mut client = self.client();
418            let request = create_request(format!("snapshot={name}"), req.clone());
419            client.delete_snapshot(request).await.map_transient_err()
420        };
421        invoke(retry, action).await
422    }
423
424    // seek [seeks](https://cloud.google.com/pubsub/docs/replay-overview) a subscription to
425    // a point back in time (with a TimeStamp) or to a saved snapshot.
426    pub async fn seek(&self, req: SeekRequest, retry: Option<RetrySetting>) -> Result<Response<SeekResponse>, Status> {
427        let action = || async {
428            let mut client = self.client();
429            let subscription = req.subscription.clone();
430            let request = create_request(format!("subscription={subscription}"), req.clone());
431            client.seek(request).await.map_transient_err()
432        };
433        invoke(retry, action).await
434    }
435}