1use google_cloud_gax::grpc::{Code, Status};
2use google_cloud_gax::retry::RetrySetting;
3use google_cloud_googleapis::pubsub::v1::{
4 AcknowledgeRequest, ModifyAckDeadlineRequest, PubsubMessage, StreamingPullRequest,
5};
6use std::ops::{Deref, DerefMut};
7use std::time::Duration;
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10
11use crate::apiv1::default_retry_setting;
12use crate::apiv1::subscriber_client::{create_empty_streaming_pull_request, SubscriberClient};
13
14#[derive(Debug, Clone)]
15pub struct ReceivedMessage {
16 pub message: PubsubMessage,
17 pub ack_id: String,
18 pub subscription: String,
19 pub subscriber_client: SubscriberClient,
20 pub delivery_attempt: Option<usize>,
21}
22
23impl ReceivedMessage {
24 pub(crate) fn new(
25 subscription: String,
26 subc: SubscriberClient,
27 message: PubsubMessage,
28 ack_id: String,
29 delivery_attempt: Option<usize>,
30 ) -> Self {
31 Self {
32 message,
33 ack_id,
34 subscription,
35 subscriber_client: subc,
36 delivery_attempt,
37 }
38 }
39
40 pub fn ack_id(&self) -> &str {
41 self.ack_id.as_str()
42 }
43
44 pub async fn ack(&self) -> Result<(), Status> {
45 ack(
46 &self.subscriber_client,
47 self.subscription.to_string(),
48 vec![self.ack_id.to_string()],
49 )
50 .await
51 }
52
53 pub async fn nack(&self) -> Result<(), Status> {
54 nack(
55 &self.subscriber_client,
56 self.subscription.to_string(),
57 vec![self.ack_id.to_string()],
58 )
59 .await
60 }
61
62 pub async fn modify_ack_deadline(&self, ack_deadline_seconds: i32) -> Result<(), Status> {
63 modify_ack_deadline(
64 &self.subscriber_client,
65 self.subscription.to_string(),
66 vec![self.ack_id.to_string()],
67 ack_deadline_seconds,
68 )
69 .await
70 }
71
72 pub fn delivery_attempt(&self) -> Option<usize> {
80 self.delivery_attempt
81 }
82}
83
84#[derive(Debug, Clone)]
85pub struct SubscriberConfig {
86 pub ping_interval: Duration,
88 pub retry_setting: Option<RetrySetting>,
89 pub stream_ack_deadline_seconds: i32,
95 pub max_outstanding_messages: i64,
105 pub max_outstanding_bytes: i64,
106}
107
108impl Default for SubscriberConfig {
109 fn default() -> Self {
110 let mut retry = default_retry_setting();
112 retry.codes.push(Code::Cancelled);
113
114 Self {
115 ping_interval: Duration::from_secs(10),
116 retry_setting: Some(retry),
117 stream_ack_deadline_seconds: 60,
118 max_outstanding_messages: 50,
119 max_outstanding_bytes: 1000 * 1000 * 1000,
120 }
121 }
122}
123
124struct UnprocessedMessages {
125 tx: Option<oneshot::Sender<Option<Vec<String>>>>,
126 ack_ids: Option<Vec<String>>,
127}
128
129impl UnprocessedMessages {
130 fn new(tx: oneshot::Sender<Option<Vec<String>>>) -> Self {
131 Self {
132 tx: Some(tx),
133 ack_ids: Some(vec![]),
134 }
135 }
136}
137
138impl Deref for UnprocessedMessages {
139 type Target = Vec<String>;
140
141 fn deref(&self) -> &Self::Target {
142 self.ack_ids.as_ref().unwrap()
143 }
144}
145
146impl DerefMut for UnprocessedMessages {
147 fn deref_mut(&mut self) -> &mut Vec<String> {
148 self.ack_ids.as_mut().unwrap()
149 }
150}
151
152impl Drop for UnprocessedMessages {
153 fn drop(&mut self) {
154 if let Some(tx) = self.tx.take() {
155 let _ = tx.send(self.ack_ids.take());
156 }
157 }
158}
159
160#[derive(Debug)]
161pub(crate) struct Subscriber {
162 client: SubscriberClient,
163 subscription: String,
164 task_to_receive: Option<JoinHandle<()>>,
165 unprocessed_messages_receiver: Option<oneshot::Receiver<Option<Vec<String>>>>,
167}
168
169impl Drop for Subscriber {
170 fn drop(&mut self) {
171 if let Some(task) = self.task_to_receive.take() {
172 task.abort();
173 }
174 let rx = match self.unprocessed_messages_receiver.take() {
175 None => return,
176 Some(rx) => rx,
177 };
178 let subscription = self.subscription.clone();
179 let client = self.client.clone();
180 tracing::warn!(
181 "Subscriber is not disposed. Call dispose() to properly clean up resources. subscription={}",
182 &subscription
183 );
184 let task = async move {
185 if let Ok(Some(messages)) = rx.await {
186 if messages.is_empty() {
187 return;
188 }
189 tracing::debug!("nack {} unprocessed messages", messages.len());
190 if let Err(err) = nack(&client, subscription, messages).await {
191 tracing::error!("failed to nack message: {:?}", err);
192 }
193 }
194 };
195 let _forget = tokio::spawn(task);
196 }
197}
198
199impl Subscriber {
200 pub fn spawn(
201 subscription: String,
202 client: SubscriberClient,
203 queue: mpsc::Sender<ReceivedMessage>,
204 config: SubscriberConfig,
205 ) -> Self {
206 let subscription_clone = subscription.clone();
207 let client_clone = client.clone();
208
209 let (tx, rx) = oneshot::channel();
211 let task_to_receive = async move {
212 tracing::debug!("start subscriber: {}", subscription);
213
214 let retryable_codes = match &config.retry_setting {
215 Some(v) => v.codes.clone(),
216 None => default_retry_setting().codes,
217 };
218
219 let mut unprocessed_messages = UnprocessedMessages::new(tx);
220 loop {
221 let mut request = create_empty_streaming_pull_request();
222 request.subscription = subscription.to_string();
223 request.stream_ack_deadline_seconds = config.stream_ack_deadline_seconds;
224 request.max_outstanding_messages = config.max_outstanding_messages;
225 request.max_outstanding_bytes = config.max_outstanding_bytes;
226
227 let response = Self::receive(
228 client.clone(),
229 request,
230 config.clone(),
231 queue.clone(),
232 &mut unprocessed_messages,
233 )
234 .await;
235
236 if let Err(e) = response {
237 if retryable_codes.contains(&e.code()) {
238 tracing::trace!("refresh connection: subscriber will reconnect {:?} : {}", e, subscription);
239 continue;
240 } else {
241 tracing::error!("failed to receive message: subscriber will stop {:?} : {}", e, subscription);
242 break;
243 }
244 } else {
245 tracing::debug!("stopped to receive message: {}", subscription);
246 break;
247 }
248 }
249 tracing::debug!("stop subscriber: {}", subscription);
250 };
251
252 Self {
255 client: client_clone,
256 subscription: subscription_clone,
257 task_to_receive: Some(tokio::spawn(task_to_receive)),
258 unprocessed_messages_receiver: Some(rx),
259 }
260 }
261
262 async fn receive(
263 client: SubscriberClient,
264 request: StreamingPullRequest,
265 config: SubscriberConfig,
266 queue: mpsc::Sender<ReceivedMessage>,
267 unprocessed_messages: &mut Vec<String>,
268 ) -> Result<(), Status> {
269 let subscription = request.subscription.to_string();
270
271 let response = client
273 .streaming_pull(request, config.ping_interval, config.retry_setting.clone())
274 .await?;
275 let mut stream = response.into_inner();
276
277 loop {
279 let message = stream.message().await?;
280 let messages = match message {
281 Some(m) => m.received_messages,
282 None => return Ok(()),
283 };
284
285 let mut msgs = Vec::with_capacity(messages.len());
286 for received_message in messages {
287 if let Some(message) = received_message.message {
288 let id = message.message_id.clone();
289 tracing::trace!("message received: msg_id={id}");
290 let msg = ReceivedMessage::new(
291 subscription.clone(),
292 client.clone(),
293 message,
294 received_message.ack_id.clone(),
295 (received_message.delivery_attempt > 0).then_some(received_message.delivery_attempt as usize),
296 );
297 unprocessed_messages.push(msg.ack_id.clone());
298 msgs.push(msg);
299 }
300 }
301
302 for msg in msgs.drain(..) {
303 let ack_id = msg.ack_id.clone();
304 if queue.send(msg).await.is_ok() {
305 unprocessed_messages.retain(|e| *e != ack_id);
306 } else {
307 break;
309 }
310 }
311 }
312 }
313
314 pub async fn dispose(mut self) -> usize {
315 if let Some(task) = self.task_to_receive.take() {
316 task.abort();
317 }
318 let mut count = 0;
319 let rx = match self.unprocessed_messages_receiver.take() {
320 None => return count,
321 Some(rx) => rx,
322 };
323
324 if let Ok(Some(messages)) = rx.await {
325 if messages.is_empty() {
327 return count;
328 }
329 let size = messages.len();
330 tracing::debug!("nack {} unprocessed messages", size);
331 let result = nack(&self.client, self.subscription.clone(), messages).await;
332 match result {
333 Ok(_) => count = size,
334 Err(err) => tracing::error!("failed to nack message: {:?}", err),
335 }
336 }
337 count
338 }
339}
340
341async fn modify_ack_deadline(
342 subscriber_client: &SubscriberClient,
343 subscription: String,
344 ack_ids: Vec<String>,
345 ack_deadline_seconds: i32,
346) -> Result<(), Status> {
347 if ack_ids.is_empty() {
348 return Ok(());
349 }
350 let req = ModifyAckDeadlineRequest {
351 subscription,
352 ack_deadline_seconds,
353 ack_ids,
354 };
355 subscriber_client
356 .modify_ack_deadline(req, None)
357 .await
358 .map(|e| e.into_inner())
359}
360
361pub(crate) async fn nack(
362 subscriber_client: &SubscriberClient,
363 subscription: String,
364 ack_ids: Vec<String>,
365) -> Result<(), Status> {
366 for chunk in ack_ids.chunks(100) {
367 modify_ack_deadline(subscriber_client, subscription.clone(), chunk.to_vec(), 0).await?;
368 }
369 Ok(())
370}
371
372pub(crate) async fn ack(
373 subscriber_client: &SubscriberClient,
374 subscription: String,
375 ack_ids: Vec<String>,
376) -> Result<(), Status> {
377 if ack_ids.is_empty() {
378 return Ok(());
379 }
380 let req = AcknowledgeRequest { subscription, ack_ids };
381 subscriber_client.acknowledge(req, None).await.map(|e| e.into_inner())
382}