google_cloud_pubsub/apiv1/
subscriber_client.rs1use std::sync::Arc;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::create_request;
5use google_cloud_gax::grpc::Status;
6use google_cloud_gax::grpc::{IntoStreamingRequest, Response, Streaming};
7use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
8use google_cloud_googleapis::pubsub::v1::subscriber_client::SubscriberClient as InternalSubscriberClient;
9use google_cloud_googleapis::pubsub::v1::{
10 AcknowledgeRequest, CreateSnapshotRequest, DeleteSnapshotRequest, DeleteSubscriptionRequest, GetSnapshotRequest,
11 GetSubscriptionRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListSubscriptionsRequest,
12 ListSubscriptionsResponse, ModifyAckDeadlineRequest, ModifyPushConfigRequest, PullRequest, PullResponse,
13 SeekRequest, SeekResponse, Snapshot, StreamingPullRequest, StreamingPullResponse, Subscription,
14 UpdateSnapshotRequest, UpdateSubscriptionRequest,
15};
16
17use crate::apiv1::conn_pool::ConnectionManager;
18use crate::apiv1::PUBSUB_MESSAGE_LIMIT;
19
20pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
21 StreamingPullRequest {
22 subscription: "".to_string(),
23 ack_ids: vec![],
24 modify_deadline_seconds: vec![],
25 modify_deadline_ack_ids: vec![],
26 stream_ack_deadline_seconds: 0,
27 client_id: "".to_string(),
28 max_outstanding_messages: 0,
29 max_outstanding_bytes: 0,
30 }
31}
32
33#[derive(Clone, Debug)]
34pub struct SubscriberClient {
35 cm: Arc<ConnectionManager>,
36 streaming_pull_cm: Arc<ConnectionManager>,
37}
38
39#[allow(dead_code)]
40impl SubscriberClient {
41 pub fn new(cm: ConnectionManager, streaming_pull_cm: ConnectionManager) -> SubscriberClient {
43 SubscriberClient {
44 cm: Arc::new(cm),
45 streaming_pull_cm: Arc::new(streaming_pull_cm),
46 }
47 }
48
49 #[inline]
50 fn client(&self) -> InternalSubscriberClient<Channel> {
51 InternalSubscriberClient::new(self.cm.conn())
52 .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
53 .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
54 }
55
56 #[inline]
57 fn client_for_streaming_pull(&self) -> InternalSubscriberClient<Channel> {
58 InternalSubscriberClient::new(self.streaming_pull_cm.conn())
59 .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
60 .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
61 }
62
63 pub(crate) fn streaming_pool_size(&self) -> usize {
64 self.streaming_pull_cm.num()
65 }
66
67 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
79 pub async fn create_subscription(
80 &self,
81 req: Subscription,
82 retry: Option<RetrySetting>,
83 ) -> Result<Response<Subscription>, Status> {
84 let name = &req.name;
85 let action = || async {
86 let mut client = self.client();
87 let request = create_request(format!("name={name}"), req.clone());
88 client.create_subscription(request).await.map_transient_err()
89 };
90 invoke(retry, action).await
91 }
92
93 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
96 pub async fn update_subscription(
97 &self,
98 req: UpdateSubscriptionRequest,
99 retry: Option<RetrySetting>,
100 ) -> Result<Response<Subscription>, Status> {
101 let name = match &req.subscription {
102 Some(s) => s.name.as_str(),
103 None => "",
104 };
105 let action = || async {
106 let mut client = self.client();
107 let request = create_request(format!("subscription.name={name}"), req.clone());
108 client.update_subscription(request).await.map_transient_err()
109 };
110 invoke(retry, action).await
111 }
112
113 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
115 pub async fn get_subscription(
116 &self,
117 req: GetSubscriptionRequest,
118 retry: Option<RetrySetting>,
119 ) -> Result<Response<Subscription>, Status> {
120 let subscription = &req.subscription;
121 let action = || async {
122 let mut client = self.client();
123 let request = create_request(format!("subscription={subscription}"), req.clone());
124 client.get_subscription(request).await.map_transient_err()
125 };
126 invoke(retry, action).await
127 }
128
129 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
131 pub async fn list_subscriptions(
132 &self,
133 mut req: ListSubscriptionsRequest,
134 retry: Option<RetrySetting>,
135 ) -> Result<Vec<Subscription>, Status> {
136 let project = &req.project;
137 let mut all = vec![];
138 loop {
140 let action = || async {
141 let mut client = self.client();
142 let request = create_request(format!("project={project}"), req.clone());
143 client
144 .list_subscriptions(request)
145 .await
146 .map(|d| d.into_inner())
147 .map_transient_err()
148 };
149 let response: ListSubscriptionsResponse = invoke(retry.clone(), action).await?;
150 all.extend(response.subscriptions.into_iter());
151 if response.next_page_token.is_empty() {
152 return Ok(all);
153 }
154 req.page_token = response.next_page_token;
155 }
156 }
157
158 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
164 pub async fn delete_subscription(
165 &self,
166 req: DeleteSubscriptionRequest,
167 retry: Option<RetrySetting>,
168 ) -> Result<Response<()>, Status> {
169 let subscription = &req.subscription;
170 let action = || async {
171 let mut client = self.client();
172 let request = create_request(format!("subscription={subscription}"), req.clone());
173 client.delete_subscription(request).await.map_transient_err()
174 };
175 invoke(retry, action).await
176 }
177
178 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
184 pub async fn modify_ack_deadline(
185 &self,
186 req: ModifyAckDeadlineRequest,
187 retry: Option<RetrySetting>,
188 ) -> Result<Response<()>, Status> {
189 let subscription = &req.subscription;
190 let action = || async {
191 let mut client = self.client();
192 let request = create_request(format!("subscription={subscription}"), req.clone());
193 client.modify_ack_deadline(request).await.map_transient_err()
194 };
195 invoke(retry, action).await
196 }
197
198 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
206 pub async fn acknowledge(
207 &self,
208 req: AcknowledgeRequest,
209 retry: Option<RetrySetting>,
210 ) -> Result<Response<()>, Status> {
211 let subscription = &req.subscription;
212 let action = || async {
213 let mut client = self.client();
214 let request = create_request(format!("subscription={subscription}"), req.clone());
215 client.acknowledge(request).await.map_transient_err()
216 };
217 invoke(retry, action).await
218 }
219
220 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
224 pub async fn pull(&self, req: PullRequest, retry: Option<RetrySetting>) -> Result<Response<PullResponse>, Status> {
225 let subscription = &req.subscription;
226 let action = || async {
227 let mut client = self.client();
228 let request = create_request(format!("subscription={subscription}"), req.clone());
229 client.pull(request).await.map_transient_err()
230 };
231 invoke(retry, action).await
232 }
233
234 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
242 pub async fn streaming_pull(
243 &self,
244 req: StreamingPullRequest,
245 ping_receiver: async_channel::Receiver<bool>,
246 retry: Option<RetrySetting>,
247 ) -> Result<Response<Streaming<StreamingPullResponse>>, Status> {
248 let action = || async {
249 let mut client = self.client_for_streaming_pull();
250 let base_req = req.clone();
251 let rx = ping_receiver.clone();
252 let request = Box::pin(async_stream::stream! {
253 yield base_req.clone();
254
255 while let Ok(_r) = rx.recv().await {
258 yield create_empty_streaming_pull_request();
259 }
260 });
261 let mut v = request.into_streaming_request();
262 let target = v.metadata_mut();
263 target.append(
264 "x-goog-request-params",
265 format!("subscription={}", req.subscription).parse().unwrap(),
266 );
267 client.streaming_pull(v).await.map_transient_err()
268 };
269 invoke(retry, action).await
270 }
271
272 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
279 pub async fn modify_push_config(
280 &self,
281 req: ModifyPushConfigRequest,
282 retry: Option<RetrySetting>,
283 ) -> Result<Response<()>, Status> {
284 let subscription = &req.subscription;
285 let action = || async {
286 let mut client = self.client();
287 let request = create_request(format!("subscription={subscription}"), req.clone());
288 client.modify_push_config(request).await.map_transient_err()
289 };
290 invoke(retry, action).await
291 }
292
293 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
299 pub async fn get_snapshot(
300 &self,
301 req: GetSnapshotRequest,
302 retry: Option<RetrySetting>,
303 ) -> Result<Response<Snapshot>, Status> {
304 let snapshot = &req.snapshot;
305 let action = || async {
306 let mut client = self.client();
307 let request = create_request(format!("snapshot={snapshot}"), req.clone());
308 client.get_snapshot(request).await.map_transient_err()
309 };
310 invoke(retry, action).await
311 }
312
313 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
318 pub async fn list_snapshots(
319 &self,
320 mut req: ListSnapshotsRequest,
321 retry: Option<RetrySetting>,
322 ) -> Result<Vec<Snapshot>, Status> {
323 let project = &req.project;
324 let mut all = vec![];
325 loop {
327 let action = || async {
328 let mut client = self.client();
329 let request = create_request(format!("project={project}"), req.clone());
330 client
331 .list_snapshots(request)
332 .await
333 .map(|d| d.into_inner())
334 .map_transient_err()
335 };
336 let response: ListSnapshotsResponse = invoke(retry.clone(), action).await?;
337 all.extend(response.snapshots.into_iter());
338 if response.next_page_token.is_empty() {
339 return Ok(all);
340 }
341 req.page_token = response.next_page_token;
342 }
343 }
344
345 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
362 pub async fn create_snapshot(
363 &self,
364 req: CreateSnapshotRequest,
365 retry: Option<RetrySetting>,
366 ) -> Result<Response<Snapshot>, Status> {
367 let name = &req.name;
368 let action = || async {
369 let mut client = self.client();
370 let request = create_request(format!("name={name}"), req.clone());
371 client.create_snapshot(request).await.map_transient_err()
372 };
373 invoke(retry, action).await
374 }
375
376 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
383 pub async fn update_snapshot(
384 &self,
385 req: UpdateSnapshotRequest,
386 retry: Option<RetrySetting>,
387 ) -> Result<Response<Snapshot>, Status> {
388 let name = match &req.snapshot {
389 Some(v) => v.name.as_str(),
390 None => "",
391 };
392 let action = || async {
393 let mut client = self.client();
394 let request = create_request(format!("snapshot.name={name}"), req.clone());
395 client.update_snapshot(request).await.map_transient_err()
396 };
397 invoke(retry, action).await
398 }
399
400 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
410 pub async fn delete_snapshot(
411 &self,
412 req: DeleteSnapshotRequest,
413 retry: Option<RetrySetting>,
414 ) -> Result<Response<()>, Status> {
415 let name = &req.snapshot;
416 let action = || async {
417 let mut client = self.client();
418 let request = create_request(format!("snapshot={name}"), req.clone());
419 client.delete_snapshot(request).await.map_transient_err()
420 };
421 invoke(retry, action).await
422 }
423
424 pub async fn seek(&self, req: SeekRequest, retry: Option<RetrySetting>) -> Result<Response<SeekResponse>, Status> {
427 let action = || async {
428 let mut client = self.client();
429 let subscription = req.subscription.clone();
430 let request = create_request(format!("subscription={subscription}"), req.clone());
431 client.seek(request).await.map_transient_err()
432 };
433 invoke(retry, action).await
434 }
435}