1use std::time::Duration;
2
3use tokio::select;
4use tokio::task::JoinHandle;
5use tokio::time::sleep;
6use tokio_util::sync::CancellationToken;
7
8use google_cloud_gax::grpc::{Code, Status, Streaming};
9use google_cloud_gax::retry::RetrySetting;
10use google_cloud_googleapis::pubsub::v1::{
11 AcknowledgeRequest, ModifyAckDeadlineRequest, PubsubMessage, ReceivedMessage as InternalReceivedMessage,
12 StreamingPullResponse,
13};
14
15use crate::apiv1::default_retry_setting;
16use crate::apiv1::subscriber_client::{create_empty_streaming_pull_request, SubscriberClient};
17
18#[derive(Debug)]
19pub struct ReceivedMessage {
20 pub message: PubsubMessage,
21 ack_id: String,
22 subscription: String,
23 subscriber_client: SubscriberClient,
24 delivery_attempt: Option<usize>,
25}
26
27impl ReceivedMessage {
28 pub(crate) fn new(
29 subscription: String,
30 subc: SubscriberClient,
31 message: PubsubMessage,
32 ack_id: String,
33 delivery_attempt: Option<usize>,
34 ) -> Self {
35 Self {
36 message,
37 ack_id,
38 subscription,
39 subscriber_client: subc,
40 delivery_attempt,
41 }
42 }
43
44 pub fn ack_id(&self) -> &str {
45 self.ack_id.as_str()
46 }
47
48 pub async fn ack(&self) -> Result<(), Status> {
49 ack(
50 &self.subscriber_client,
51 self.subscription.to_string(),
52 vec![self.ack_id.to_string()],
53 )
54 .await
55 }
56
57 pub async fn nack(&self) -> Result<(), Status> {
58 nack(
59 &self.subscriber_client,
60 self.subscription.to_string(),
61 vec![self.ack_id.to_string()],
62 )
63 .await
64 }
65
66 pub async fn modify_ack_deadline(&self, ack_deadline_seconds: i32) -> Result<(), Status> {
67 modify_ack_deadline(
68 &self.subscriber_client,
69 self.subscription.to_string(),
70 vec![self.ack_id.to_string()],
71 ack_deadline_seconds,
72 )
73 .await
74 }
75
76 pub fn delivery_attempt(&self) -> Option<usize> {
84 self.delivery_attempt
85 }
86}
87
88#[derive(Debug, Clone)]
89pub struct SubscriberConfig {
90 pub ping_interval: Duration,
92 pub retry_setting: Option<RetrySetting>,
93 pub stream_ack_deadline_seconds: i32,
99 pub max_outstanding_messages: i64,
109 pub max_outstanding_bytes: i64,
110}
111
112impl Default for SubscriberConfig {
113 fn default() -> Self {
114 Self {
115 ping_interval: std::time::Duration::from_secs(10),
116 retry_setting: Some(default_retry_setting()),
117 stream_ack_deadline_seconds: 60,
118 max_outstanding_messages: 50,
119 max_outstanding_bytes: 1000 * 1000 * 1000,
120 }
121 }
122}
123
124#[derive(Debug)]
125pub(crate) struct Subscriber {
126 pinger: Option<JoinHandle<()>>,
127 inner: Option<JoinHandle<()>>,
128}
129
130impl Subscriber {
131 pub fn start(
132 ctx: CancellationToken,
133 subscription: String,
134 client: SubscriberClient,
135 queue: async_channel::Sender<ReceivedMessage>,
136 config: SubscriberConfig,
137 ) -> Self {
138 let (ping_sender, ping_receiver) = async_channel::unbounded();
139
140 let subscription_clone = subscription.to_string();
142
143 let cancel_receiver = ctx.clone();
144 let pinger = tokio::spawn(async move {
145 loop {
146 select! {
147 _ = ctx.cancelled() => {
148 ping_sender.close();
149 break;
150 }
151 _ = sleep(config.ping_interval) => {
152 let _ = ping_sender.send(true).await;
153 }
154 }
155 }
156 tracing::trace!("stop pinger : {}", subscription_clone);
157 });
158
159 let inner = tokio::spawn(async move {
160 tracing::trace!("start subscriber: {}", subscription);
161 let retryable_codes = match &config.retry_setting {
162 Some(v) => v.codes.clone(),
163 None => default_retry_setting().codes,
164 };
165 loop {
166 let mut request = create_empty_streaming_pull_request();
167 request.subscription = subscription.to_string();
168 request.stream_ack_deadline_seconds = config.stream_ack_deadline_seconds;
169 request.max_outstanding_messages = config.max_outstanding_messages;
170 request.max_outstanding_bytes = config.max_outstanding_bytes;
171
172 let response = client
173 .streaming_pull(request, ping_receiver.clone(), config.retry_setting.clone())
174 .await;
175
176 let stream = match response {
177 Ok(r) => r.into_inner(),
178 Err(e) => {
179 if e.code() == Code::Cancelled {
180 tracing::trace!("stop subscriber : {}", subscription);
181 break;
182 } else if retryable_codes.contains(&e.code()) {
183 tracing::warn!("failed to start streaming: will reconnect {:?} : {}", e, subscription);
184 continue;
185 } else {
186 tracing::error!("failed to start streaming: will stop {:?} : {}", e, subscription);
187 break;
188 }
189 }
190 };
191 match Self::recv(
192 client.clone(),
193 stream,
194 subscription.as_str(),
195 cancel_receiver.clone(),
196 queue.clone(),
197 )
198 .await
199 {
200 Ok(_) => break,
201 Err(e) => {
202 if retryable_codes.contains(&e.code()) {
203 tracing::trace!("reconnect - '{:?}' : {} ", e, subscription);
204 continue;
205 } else {
206 tracing::error!("terminated subscriber streaming with error {:?} : {}", e, subscription);
207 break;
208 }
209 }
210 }
211 }
212 tracing::trace!("stop subscriber in streaming: {}", subscription);
214 });
215 Self {
216 pinger: Some(pinger),
217 inner: Some(inner),
218 }
219 }
220
221 async fn recv(
222 client: SubscriberClient,
223 mut stream: Streaming<StreamingPullResponse>,
224 subscription: &str,
225 cancel: CancellationToken,
226 queue: async_channel::Sender<ReceivedMessage>,
227 ) -> Result<(), Status> {
228 tracing::trace!("start streaming: {}", subscription);
229 loop {
230 select! {
231 _ = cancel.cancelled() => {
232 queue.close();
233 return Ok(());
234 }
235 maybe = stream.message() => {
236 let message = maybe?;
237 let message = match message {
238 Some(m) => m,
239 None => return Ok(())
240 };
241 let _ = handle_message(&cancel, &queue, &client, subscription, message.received_messages).await;
242 }
243 }
244 }
245 }
246
247 pub async fn done(&mut self) {
248 if let Some(v) = self.pinger.take() {
249 let _ = v.await;
250 }
251 if let Some(v) = self.inner.take() {
252 let _ = v.await;
253 }
254 }
255}
256
257async fn handle_message(
258 cancel: &CancellationToken,
259 queue: &async_channel::Sender<ReceivedMessage>,
260 client: &SubscriberClient,
261 subscription: &str,
262 messages: Vec<InternalReceivedMessage>,
263) -> usize {
264 let mut nack_targets = vec![];
265 for received_message in messages {
266 if let Some(message) = received_message.message {
267 let id = message.message_id.clone();
268 tracing::debug!("message received: msg_id={id}");
269 let msg = ReceivedMessage::new(
270 subscription.to_string(),
271 client.clone(),
272 message,
273 received_message.ack_id.clone(),
274 (received_message.delivery_attempt > 0).then_some(received_message.delivery_attempt as usize),
275 );
276 let should_nack = select! {
277 result = queue.send(msg) => result.is_err(),
278 _ = cancel.cancelled() => true
279 };
280 if should_nack {
281 tracing::info!("cancelled -> so nack immediately : msg_id={id}");
282 nack_targets.push(received_message.ack_id);
283 }
284 }
285 }
286 let size = nack_targets.len();
287 if size > 0 {
288 if let Err(err) = nack(client, subscription.to_string(), nack_targets).await {
290 tracing::error!(
291 "failed to nack immediately {err}. The messages will be redelivered after the ack deadline."
292 );
293 }
294 }
295 size
296}
297
298async fn modify_ack_deadline(
299 subscriber_client: &SubscriberClient,
300 subscription: String,
301 ack_ids: Vec<String>,
302 ack_deadline_seconds: i32,
303) -> Result<(), Status> {
304 if ack_ids.is_empty() {
305 return Ok(());
306 }
307 let req = ModifyAckDeadlineRequest {
308 subscription,
309 ack_deadline_seconds,
310 ack_ids,
311 };
312 subscriber_client
313 .modify_ack_deadline(req, None)
314 .await
315 .map(|e| e.into_inner())
316}
317
318async fn nack(subscriber_client: &SubscriberClient, subscription: String, ack_ids: Vec<String>) -> Result<(), Status> {
319 modify_ack_deadline(subscriber_client, subscription, ack_ids, 0).await
320}
321
322pub(crate) async fn ack(
323 subscriber_client: &SubscriberClient,
324 subscription: String,
325 ack_ids: Vec<String>,
326) -> Result<(), Status> {
327 if ack_ids.is_empty() {
328 return Ok(());
329 }
330 let req = AcknowledgeRequest { subscription, ack_ids };
331 subscriber_client.acknowledge(req, None).await.map(|e| e.into_inner())
332}
333
334#[cfg(test)]
335mod tests {
336 use serial_test::serial;
337 use tokio_util::sync::CancellationToken;
338
339 use google_cloud_gax::conn::{ConnectionOptions, Environment};
340 use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage, PullRequest};
341
342 use crate::apiv1::conn_pool::ConnectionManager;
343 use crate::apiv1::publisher_client::PublisherClient;
344 use crate::apiv1::subscriber_client::SubscriberClient;
345 use crate::subscriber::handle_message;
346
347 #[ctor::ctor]
348 fn init() {
349 let _ = tracing_subscriber::fmt().try_init();
350 }
351
352 #[tokio::test(flavor = "multi_thread")]
353 #[serial]
354 async fn test_handle_message_immediately_nack() {
355 let cm = || async {
356 ConnectionManager::new(
357 4,
358 "",
359 &Environment::Emulator("localhost:8681".to_string()),
360 &ConnectionOptions::default(),
361 )
362 .await
363 .unwrap()
364 };
365 let subc = SubscriberClient::new(cm().await, cm().await);
366 let pubc = PublisherClient::new(cm().await);
367
368 pubc.publish(
369 PublishRequest {
370 topic: "projects/local-project/topics/test-topic1".to_string(),
371 messages: vec![PubsubMessage {
372 data: "hoge".into(),
373 ..Default::default()
374 }],
375 },
376 None,
377 )
378 .await
379 .unwrap();
380
381 let subscription = "projects/local-project/subscriptions/test-subscription1";
382 let response = subc
383 .pull(
384 PullRequest {
385 subscription: subscription.to_string(),
386 max_messages: 1,
387 ..Default::default()
388 },
389 None,
390 )
391 .await
392 .unwrap()
393 .into_inner();
394
395 let messages = response.received_messages;
396 let (queue, _) = async_channel::unbounded();
397 queue.close();
398 let nack_size = handle_message(&CancellationToken::new(), &queue, &subc, subscription, messages).await;
399 assert_eq!(1, nack_size);
400 }
401}