1use google_cloud_gax::grpc::{Code, Status};
2use google_cloud_gax::retry::RetrySetting;
3use google_cloud_googleapis::pubsub::v1::{
4 AcknowledgeRequest, ModifyAckDeadlineRequest, PubsubMessage, StreamingPullRequest,
5};
6use std::ops::{Deref, DerefMut};
7use std::time::Duration;
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10
11use crate::apiv1::default_retry_setting as base_retry_setting;
12use crate::apiv1::subscriber_client::{create_empty_streaming_pull_request, SubscriberClient};
13
14#[derive(Debug, Clone)]
15pub struct ReceivedMessage {
16 pub message: PubsubMessage,
17 pub ack_id: String,
18 pub subscription: String,
19 pub subscriber_client: SubscriberClient,
20 pub delivery_attempt: Option<usize>,
21}
22
23impl ReceivedMessage {
24 pub(crate) fn new(
25 subscription: String,
26 subc: SubscriberClient,
27 message: PubsubMessage,
28 ack_id: String,
29 delivery_attempt: Option<usize>,
30 ) -> Self {
31 Self {
32 message,
33 ack_id,
34 subscription,
35 subscriber_client: subc,
36 delivery_attempt,
37 }
38 }
39
40 pub fn ack_id(&self) -> &str {
41 self.ack_id.as_str()
42 }
43
44 pub async fn ack(&self) -> Result<(), Status> {
45 ack(
46 &self.subscriber_client,
47 self.subscription.to_string(),
48 vec![self.ack_id.to_string()],
49 )
50 .await
51 }
52
53 pub async fn nack(&self) -> Result<(), Status> {
54 nack(
55 &self.subscriber_client,
56 self.subscription.to_string(),
57 vec![self.ack_id.to_string()],
58 )
59 .await
60 }
61
62 pub async fn modify_ack_deadline(&self, ack_deadline_seconds: i32) -> Result<(), Status> {
63 modify_ack_deadline(
64 &self.subscriber_client,
65 self.subscription.to_string(),
66 vec![self.ack_id.to_string()],
67 ack_deadline_seconds,
68 )
69 .await
70 }
71
72 pub fn delivery_attempt(&self) -> Option<usize> {
80 self.delivery_attempt
81 }
82}
83
84fn default_retry_setting() -> RetrySetting {
85 let mut retry = base_retry_setting();
86 retry.codes.push(Code::Cancelled);
88 retry
89}
90
91#[derive(Debug, Clone)]
92pub struct SubscriberConfig {
93 pub ping_interval: Duration,
95 pub retry_setting: Option<RetrySetting>,
96 pub stream_ack_deadline_seconds: i32,
102 pub max_outstanding_messages: i64,
112 pub max_outstanding_bytes: i64,
113}
114
115impl Default for SubscriberConfig {
116 fn default() -> Self {
117 Self {
118 ping_interval: Duration::from_secs(10),
119 retry_setting: Some(default_retry_setting()),
120 stream_ack_deadline_seconds: 60,
121 max_outstanding_messages: 50,
122 max_outstanding_bytes: 1000 * 1000 * 1000,
123 }
124 }
125}
126
127struct UnprocessedMessages {
128 tx: Option<oneshot::Sender<Option<Vec<String>>>>,
129 ack_ids: Option<Vec<String>>,
130}
131
132impl UnprocessedMessages {
133 fn new(tx: oneshot::Sender<Option<Vec<String>>>) -> Self {
134 Self {
135 tx: Some(tx),
136 ack_ids: Some(vec![]),
137 }
138 }
139}
140
141impl Deref for UnprocessedMessages {
142 type Target = Vec<String>;
143
144 fn deref(&self) -> &Self::Target {
145 self.ack_ids.as_ref().unwrap()
146 }
147}
148
149impl DerefMut for UnprocessedMessages {
150 fn deref_mut(&mut self) -> &mut Vec<String> {
151 self.ack_ids.as_mut().unwrap()
152 }
153}
154
155impl Drop for UnprocessedMessages {
156 fn drop(&mut self) {
157 if let Some(tx) = self.tx.take() {
158 let _ = tx.send(self.ack_ids.take());
159 }
160 }
161}
162
163#[derive(Debug)]
164pub(crate) struct Subscriber {
165 client: SubscriberClient,
166 subscription: String,
167 task_to_receive: Option<JoinHandle<()>>,
168 unprocessed_messages_receiver: Option<oneshot::Receiver<Option<Vec<String>>>>,
170}
171
172impl Drop for Subscriber {
173 fn drop(&mut self) {
174 if let Some(task) = self.task_to_receive.take() {
175 task.abort();
176 }
177 let rx = match self.unprocessed_messages_receiver.take() {
178 None => return,
179 Some(rx) => rx,
180 };
181 let subscription = self.subscription.clone();
182 let client = self.client.clone();
183 tracing::warn!(
184 "Subscriber is not disposed. Call dispose() to properly clean up resources. subscription={}",
185 &subscription
186 );
187 let task = async move {
188 if let Ok(Some(messages)) = rx.await {
189 if messages.is_empty() {
190 return;
191 }
192 tracing::debug!("nack {} unprocessed messages", messages.len());
193 if let Err(err) = nack(&client, subscription, messages).await {
194 tracing::error!("failed to nack message: {:?}", err);
195 }
196 }
197 };
198 let _forget = tokio::spawn(task);
199 }
200}
201
202impl Subscriber {
203 pub fn spawn(
204 subscription: String,
205 client: SubscriberClient,
206 queue: mpsc::Sender<ReceivedMessage>,
207 config: SubscriberConfig,
208 ) -> Self {
209 let subscription_clone = subscription.clone();
210 let client_clone = client.clone();
211
212 let (tx, rx) = oneshot::channel();
214 let task_to_receive = async move {
215 tracing::debug!("start subscriber: {}", subscription);
216
217 let retryable_codes = match &config.retry_setting {
218 Some(v) => v.codes.clone(),
219 None => default_retry_setting().codes,
220 };
221
222 let mut unprocessed_messages = UnprocessedMessages::new(tx);
223 loop {
224 let mut request = create_empty_streaming_pull_request();
225 request.subscription = subscription.to_string();
226 request.stream_ack_deadline_seconds = config.stream_ack_deadline_seconds;
227 request.max_outstanding_messages = config.max_outstanding_messages;
228 request.max_outstanding_bytes = config.max_outstanding_bytes;
229
230 let response = Self::receive(
231 client.clone(),
232 request,
233 config.clone(),
234 queue.clone(),
235 &mut unprocessed_messages,
236 )
237 .await;
238
239 if let Err(e) = response {
240 if retryable_codes.contains(&e.code()) {
241 tracing::trace!("refresh connection: subscriber will reconnect {:?} : {}", e, subscription);
242 continue;
243 } else {
244 tracing::error!("failed to receive message: subscriber will stop {:?} : {}", e, subscription);
245 break;
246 }
247 } else {
248 tracing::debug!("stopped to receive message: {}", subscription);
249 break;
250 }
251 }
252 tracing::debug!("stop subscriber: {}", subscription);
253 };
254
255 Self {
258 client: client_clone,
259 subscription: subscription_clone,
260 task_to_receive: Some(tokio::spawn(task_to_receive)),
261 unprocessed_messages_receiver: Some(rx),
262 }
263 }
264
265 async fn receive(
266 client: SubscriberClient,
267 request: StreamingPullRequest,
268 config: SubscriberConfig,
269 queue: mpsc::Sender<ReceivedMessage>,
270 unprocessed_messages: &mut Vec<String>,
271 ) -> Result<(), Status> {
272 let subscription = request.subscription.to_string();
273
274 let response = client
276 .streaming_pull(request, config.ping_interval, config.retry_setting.clone())
277 .await?;
278 let mut stream = response.into_inner();
279
280 loop {
282 let message = stream.message().await?;
283 let messages = match message {
284 Some(m) => m.received_messages,
285 None => return Ok(()),
286 };
287
288 let mut msgs = Vec::with_capacity(messages.len());
289 for received_message in messages {
290 if let Some(message) = received_message.message {
291 let id = message.message_id.clone();
292 tracing::trace!("message received: msg_id={id}");
293 let msg = ReceivedMessage::new(
294 subscription.clone(),
295 client.clone(),
296 message,
297 received_message.ack_id.clone(),
298 (received_message.delivery_attempt > 0).then_some(received_message.delivery_attempt as usize),
299 );
300 unprocessed_messages.push(msg.ack_id.clone());
301 msgs.push(msg);
302 }
303 }
304
305 for msg in msgs.drain(..) {
306 let ack_id = msg.ack_id.clone();
307 if queue.send(msg).await.is_ok() {
308 unprocessed_messages.retain(|e| *e != ack_id);
309 } else {
310 break;
312 }
313 }
314 }
315 }
316
317 pub async fn dispose(mut self) -> usize {
318 if let Some(task) = self.task_to_receive.take() {
319 task.abort();
320 }
321 let mut count = 0;
322 let rx = match self.unprocessed_messages_receiver.take() {
323 None => return count,
324 Some(rx) => rx,
325 };
326
327 if let Ok(Some(messages)) = rx.await {
328 if messages.is_empty() {
330 return count;
331 }
332 let size = messages.len();
333 tracing::debug!("nack {} unprocessed messages", size);
334 let result = nack(&self.client, self.subscription.clone(), messages).await;
335 match result {
336 Ok(_) => count = size,
337 Err(err) => tracing::error!("failed to nack message: {:?}", err),
338 }
339 }
340 count
341 }
342}
343
344async fn modify_ack_deadline(
345 subscriber_client: &SubscriberClient,
346 subscription: String,
347 ack_ids: Vec<String>,
348 ack_deadline_seconds: i32,
349) -> Result<(), Status> {
350 if ack_ids.is_empty() {
351 return Ok(());
352 }
353 let req = ModifyAckDeadlineRequest {
354 subscription,
355 ack_deadline_seconds,
356 ack_ids,
357 };
358 subscriber_client
359 .modify_ack_deadline(req, None)
360 .await
361 .map(|e| e.into_inner())
362}
363
364pub(crate) async fn nack(
365 subscriber_client: &SubscriberClient,
366 subscription: String,
367 ack_ids: Vec<String>,
368) -> Result<(), Status> {
369 for chunk in ack_ids.chunks(100) {
370 modify_ack_deadline(subscriber_client, subscription.clone(), chunk.to_vec(), 0).await?;
371 }
372 Ok(())
373}
374
375pub(crate) async fn ack(
376 subscriber_client: &SubscriberClient,
377 subscription: String,
378 ack_ids: Vec<String>,
379) -> Result<(), Status> {
380 if ack_ids.is_empty() {
381 return Ok(());
382 }
383 let req = AcknowledgeRequest { subscription, ack_ids };
384 subscriber_client.acknowledge(req, None).await.map(|e| e.into_inner())
385}