1use google_cloud_gax::conn::Channel;
2use google_cloud_gax::create_request;
3use google_cloud_gax::grpc::Status;
4use google_cloud_gax::grpc::{IntoStreamingRequest, Response, Streaming};
5use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
6use google_cloud_googleapis::pubsub::v1::subscriber_client::SubscriberClient as InternalSubscriberClient;
7use google_cloud_googleapis::pubsub::v1::{
8 AcknowledgeRequest, CreateSnapshotRequest, DeleteSnapshotRequest, DeleteSubscriptionRequest, GetSnapshotRequest,
9 GetSubscriptionRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListSubscriptionsRequest,
10 ListSubscriptionsResponse, ModifyAckDeadlineRequest, ModifyPushConfigRequest, PullRequest, PullResponse,
11 SeekRequest, SeekResponse, Snapshot, StreamingPullRequest, StreamingPullResponse, Subscription,
12 UpdateSnapshotRequest, UpdateSubscriptionRequest,
13};
14use std::sync::Arc;
15use std::time::Duration;
16
17use crate::apiv1::conn_pool::ConnectionManager;
18use crate::apiv1::PUBSUB_MESSAGE_LIMIT;
19
20pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
21 StreamingPullRequest {
22 subscription: "".to_string(),
23 ack_ids: vec![],
24 modify_deadline_seconds: vec![],
25 modify_deadline_ack_ids: vec![],
26 stream_ack_deadline_seconds: 0,
27 client_id: "".to_string(),
28 max_outstanding_messages: 0,
29 max_outstanding_bytes: 0,
30 }
31}
32
33#[derive(Clone, Debug)]
34pub struct SubscriberClient {
35 cm: Arc<ConnectionManager>,
36 streaming_pull_cm: Arc<ConnectionManager>,
37}
38
39#[allow(dead_code)]
40impl SubscriberClient {
41 pub fn new(cm: ConnectionManager, streaming_pull_cm: ConnectionManager) -> SubscriberClient {
43 SubscriberClient {
44 cm: Arc::new(cm),
45 streaming_pull_cm: Arc::new(streaming_pull_cm),
46 }
47 }
48
49 #[inline]
50 fn client(&self) -> InternalSubscriberClient<Channel> {
51 InternalSubscriberClient::new(self.cm.conn())
52 .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
53 .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
54 }
55
56 #[inline]
57 fn client_for_streaming_pull(&self) -> InternalSubscriberClient<Channel> {
58 InternalSubscriberClient::new(self.streaming_pull_cm.conn())
59 .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
60 .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
61 }
62
63 pub(crate) fn streaming_pool_size(&self) -> usize {
64 self.streaming_pull_cm.num()
65 }
66
67 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
79 pub async fn create_subscription(
80 &self,
81 req: Subscription,
82 retry: Option<RetrySetting>,
83 ) -> Result<Response<Subscription>, Status> {
84 let name = &req.name;
85 let action = || async {
86 let mut client = self.client();
87 let request = create_request(format!("name={name}"), req.clone());
88 client.create_subscription(request).await.map_transient_err()
89 };
90 invoke(retry, action).await
91 }
92
93 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
96 pub async fn update_subscription(
97 &self,
98 req: UpdateSubscriptionRequest,
99 retry: Option<RetrySetting>,
100 ) -> Result<Response<Subscription>, Status> {
101 let name = match &req.subscription {
102 Some(s) => s.name.as_str(),
103 None => "",
104 };
105 let action = || async {
106 let mut client = self.client();
107 let request = create_request(format!("subscription.name={name}"), req.clone());
108 client.update_subscription(request).await.map_transient_err()
109 };
110 invoke(retry, action).await
111 }
112
113 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
115 pub async fn get_subscription(
116 &self,
117 req: GetSubscriptionRequest,
118 retry: Option<RetrySetting>,
119 ) -> Result<Response<Subscription>, Status> {
120 let subscription = &req.subscription;
121 let action = || async {
122 let mut client = self.client();
123 let request = create_request(format!("subscription={subscription}"), req.clone());
124 client.get_subscription(request).await.map_transient_err()
125 };
126 invoke(retry, action).await
127 }
128
129 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
131 pub async fn list_subscriptions(
132 &self,
133 mut req: ListSubscriptionsRequest,
134 retry: Option<RetrySetting>,
135 ) -> Result<Vec<Subscription>, Status> {
136 let project = &req.project;
137 let mut all = vec![];
138 loop {
140 let action = || async {
141 let mut client = self.client();
142 let request = create_request(format!("project={project}"), req.clone());
143 client
144 .list_subscriptions(request)
145 .await
146 .map(|d| d.into_inner())
147 .map_transient_err()
148 };
149 let response: ListSubscriptionsResponse = invoke(retry.clone(), action).await?;
150 all.extend(response.subscriptions.into_iter());
151 if response.next_page_token.is_empty() {
152 return Ok(all);
153 }
154 req.page_token = response.next_page_token;
155 }
156 }
157
158 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
164 pub async fn delete_subscription(
165 &self,
166 req: DeleteSubscriptionRequest,
167 retry: Option<RetrySetting>,
168 ) -> Result<Response<()>, Status> {
169 let subscription = &req.subscription;
170 let action = || async {
171 let mut client = self.client();
172 let request = create_request(format!("subscription={subscription}"), req.clone());
173 client.delete_subscription(request).await.map_transient_err()
174 };
175 invoke(retry, action).await
176 }
177
178 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
184 pub async fn modify_ack_deadline(
185 &self,
186 req: ModifyAckDeadlineRequest,
187 retry: Option<RetrySetting>,
188 ) -> Result<Response<()>, Status> {
189 let subscription = &req.subscription;
190 let action = || async {
191 let mut client = self.client();
192 let request = create_request(format!("subscription={subscription}"), req.clone());
193 client.modify_ack_deadline(request).await.map_transient_err()
194 };
195 invoke(retry, action).await
196 }
197
198 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
206 pub async fn acknowledge(
207 &self,
208 req: AcknowledgeRequest,
209 retry: Option<RetrySetting>,
210 ) -> Result<Response<()>, Status> {
211 let subscription = &req.subscription;
212 let action = || async {
213 let mut client = self.client();
214 let request = create_request(format!("subscription={subscription}"), req.clone());
215 client.acknowledge(request).await.map_transient_err()
216 };
217 invoke(retry, action).await
218 }
219
220 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
224 pub async fn pull(&self, req: PullRequest, retry: Option<RetrySetting>) -> Result<Response<PullResponse>, Status> {
225 let subscription = &req.subscription;
226 let action = || async {
227 let mut client = self.client();
228 let request = create_request(format!("subscription={subscription}"), req.clone());
229 client.pull(request).await.map_transient_err()
230 };
231 invoke(retry, action).await
232 }
233
234 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
242 pub async fn streaming_pull(
243 &self,
244 req: StreamingPullRequest,
245 interval: Duration,
246 retry: Option<RetrySetting>,
247 ) -> Result<Response<Streaming<StreamingPullResponse>>, Status> {
248 let action = || async {
249 let mut client = self.client_for_streaming_pull();
250 let base_req = req.clone();
251 let request = Box::pin(async_stream::stream! {
252 yield base_req.clone();
253 loop {
254 tokio::time::sleep(interval).await;
255 yield create_empty_streaming_pull_request();
256 }
257 });
258 let mut v = request.into_streaming_request();
259 let target = v.metadata_mut();
260 target.append(
261 "x-goog-request-params",
262 format!("subscription={}", req.subscription).parse().unwrap(),
263 );
264 client.streaming_pull(v).await.map_transient_err()
265 };
266 invoke(retry, action).await
267 }
268
269 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
276 pub async fn modify_push_config(
277 &self,
278 req: ModifyPushConfigRequest,
279 retry: Option<RetrySetting>,
280 ) -> Result<Response<()>, Status> {
281 let subscription = &req.subscription;
282 let action = || async {
283 let mut client = self.client();
284 let request = create_request(format!("subscription={subscription}"), req.clone());
285 client.modify_push_config(request).await.map_transient_err()
286 };
287 invoke(retry, action).await
288 }
289
290 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
296 pub async fn get_snapshot(
297 &self,
298 req: GetSnapshotRequest,
299 retry: Option<RetrySetting>,
300 ) -> Result<Response<Snapshot>, Status> {
301 let snapshot = &req.snapshot;
302 let action = || async {
303 let mut client = self.client();
304 let request = create_request(format!("snapshot={snapshot}"), req.clone());
305 client.get_snapshot(request).await.map_transient_err()
306 };
307 invoke(retry, action).await
308 }
309
310 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
315 pub async fn list_snapshots(
316 &self,
317 mut req: ListSnapshotsRequest,
318 retry: Option<RetrySetting>,
319 ) -> Result<Vec<Snapshot>, Status> {
320 let project = &req.project;
321 let mut all = vec![];
322 loop {
324 let action = || async {
325 let mut client = self.client();
326 let request = create_request(format!("project={project}"), req.clone());
327 client
328 .list_snapshots(request)
329 .await
330 .map(|d| d.into_inner())
331 .map_transient_err()
332 };
333 let response: ListSnapshotsResponse = invoke(retry.clone(), action).await?;
334 all.extend(response.snapshots.into_iter());
335 if response.next_page_token.is_empty() {
336 return Ok(all);
337 }
338 req.page_token = response.next_page_token;
339 }
340 }
341
342 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
359 pub async fn create_snapshot(
360 &self,
361 req: CreateSnapshotRequest,
362 retry: Option<RetrySetting>,
363 ) -> Result<Response<Snapshot>, Status> {
364 let name = &req.name;
365 let action = || async {
366 let mut client = self.client();
367 let request = create_request(format!("name={name}"), req.clone());
368 client.create_snapshot(request).await.map_transient_err()
369 };
370 invoke(retry, action).await
371 }
372
373 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
380 pub async fn update_snapshot(
381 &self,
382 req: UpdateSnapshotRequest,
383 retry: Option<RetrySetting>,
384 ) -> Result<Response<Snapshot>, Status> {
385 let name = match &req.snapshot {
386 Some(v) => v.name.as_str(),
387 None => "",
388 };
389 let action = || async {
390 let mut client = self.client();
391 let request = create_request(format!("snapshot.name={name}"), req.clone());
392 client.update_snapshot(request).await.map_transient_err()
393 };
394 invoke(retry, action).await
395 }
396
397 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
407 pub async fn delete_snapshot(
408 &self,
409 req: DeleteSnapshotRequest,
410 retry: Option<RetrySetting>,
411 ) -> Result<Response<()>, Status> {
412 let name = &req.snapshot;
413 let action = || async {
414 let mut client = self.client();
415 let request = create_request(format!("snapshot={name}"), req.clone());
416 client.delete_snapshot(request).await.map_transient_err()
417 };
418 invoke(retry, action).await
419 }
420
421 pub async fn seek(&self, req: SeekRequest, retry: Option<RetrySetting>) -> Result<Response<SeekResponse>, Status> {
424 let action = || async {
425 let mut client = self.client();
426 let subscription = req.subscription.clone();
427 let request = create_request(format!("subscription={subscription}"), req.clone());
428 client.seek(request).await.map_transient_err()
429 };
430 invoke(retry, action).await
431 }
432}