google_cloud_pubsub/
subscriber.rs

1use std::time::Duration;
2
3use tokio::select;
4use tokio::task::JoinHandle;
5use tokio::time::sleep;
6use tokio_util::sync::CancellationToken;
7
8use google_cloud_gax::grpc::{Code, Status, Streaming};
9use google_cloud_gax::retry::RetrySetting;
10use google_cloud_googleapis::pubsub::v1::{
11    AcknowledgeRequest, ModifyAckDeadlineRequest, PubsubMessage, ReceivedMessage as InternalReceivedMessage,
12    StreamingPullResponse,
13};
14
15use crate::apiv1::default_retry_setting;
16use crate::apiv1::subscriber_client::{create_empty_streaming_pull_request, SubscriberClient};
17
18#[derive(Debug)]
19pub struct ReceivedMessage {
20    pub message: PubsubMessage,
21    ack_id: String,
22    subscription: String,
23    subscriber_client: SubscriberClient,
24    delivery_attempt: Option<usize>,
25}
26
27impl ReceivedMessage {
28    pub(crate) fn new(
29        subscription: String,
30        subc: SubscriberClient,
31        message: PubsubMessage,
32        ack_id: String,
33        delivery_attempt: Option<usize>,
34    ) -> Self {
35        Self {
36            message,
37            ack_id,
38            subscription,
39            subscriber_client: subc,
40            delivery_attempt,
41        }
42    }
43
44    pub fn ack_id(&self) -> &str {
45        self.ack_id.as_str()
46    }
47
48    pub async fn ack(&self) -> Result<(), Status> {
49        ack(
50            &self.subscriber_client,
51            self.subscription.to_string(),
52            vec![self.ack_id.to_string()],
53        )
54        .await
55    }
56
57    pub async fn nack(&self) -> Result<(), Status> {
58        nack(
59            &self.subscriber_client,
60            self.subscription.to_string(),
61            vec![self.ack_id.to_string()],
62        )
63        .await
64    }
65
66    pub async fn modify_ack_deadline(&self, ack_deadline_seconds: i32) -> Result<(), Status> {
67        modify_ack_deadline(
68            &self.subscriber_client,
69            self.subscription.to_string(),
70            vec![self.ack_id.to_string()],
71            ack_deadline_seconds,
72        )
73        .await
74    }
75
76    /// The approximate number of times that Cloud Pub/Sub has attempted to deliver
77    /// the associated message to a subscriber.
78    ///
79    /// The returned value, if present, will be greater than zero.
80    ///
81    /// For more information refer to the
82    /// [protobuf definition](https://github.com/googleapis/googleapis/blob/3c7c76fb63d0f511cdb8c3c1cbc157315f6fbfd3/google/pubsub/v1/pubsub.proto#L1099-L1115).
83    pub fn delivery_attempt(&self) -> Option<usize> {
84        self.delivery_attempt
85    }
86}
87
88#[derive(Debug, Clone)]
89pub struct SubscriberConfig {
90    /// ping interval for Bi Directional Streaming
91    pub ping_interval: Duration,
92    pub retry_setting: Option<RetrySetting>,
93    /// It is important for exactly_once_delivery
94    /// The ack deadline to use for the stream. This must be provided in
95    /// the first request on the stream, but it can also be updated on subsequent
96    /// requests from client to server. The minimum deadline you can specify is 10
97    /// seconds. The maximum deadline you can specify is 600 seconds (10 minutes).
98    pub stream_ack_deadline_seconds: i32,
99    /// Flow control settings for the maximum number of outstanding messages. When
100    /// there are `max_outstanding_messages` or more currently sent to the
101    /// streaming pull client that have not yet been acked or nacked, the server
102    /// stops sending more messages. The sending of messages resumes once the
103    /// number of outstanding messages is less than this value. If the value is
104    /// <= 0, there is no limit to the number of outstanding messages. This
105    /// property can only be set on the initial StreamingPullRequest. If it is set
106    /// on a subsequent request, the stream will be aborted with status
107    /// `INVALID_ARGUMENT`.
108    pub max_outstanding_messages: i64,
109    pub max_outstanding_bytes: i64,
110}
111
112impl Default for SubscriberConfig {
113    fn default() -> Self {
114        Self {
115            ping_interval: std::time::Duration::from_secs(10),
116            retry_setting: Some(default_retry_setting()),
117            stream_ack_deadline_seconds: 60,
118            max_outstanding_messages: 50,
119            max_outstanding_bytes: 1000 * 1000 * 1000,
120        }
121    }
122}
123
124#[derive(Debug)]
125pub(crate) struct Subscriber {
126    pinger: Option<JoinHandle<()>>,
127    inner: Option<JoinHandle<()>>,
128}
129
130impl Subscriber {
131    pub fn start(
132        ctx: CancellationToken,
133        subscription: String,
134        client: SubscriberClient,
135        queue: async_channel::Sender<ReceivedMessage>,
136        config: SubscriberConfig,
137    ) -> Self {
138        let (ping_sender, ping_receiver) = async_channel::unbounded();
139
140        // ping request
141        let subscription_clone = subscription.to_string();
142
143        let cancel_receiver = ctx.clone();
144        let pinger = tokio::spawn(async move {
145            loop {
146                select! {
147                    _ = ctx.cancelled() => {
148                        ping_sender.close();
149                        break;
150                    }
151                    _ = sleep(config.ping_interval) => {
152                        let _ = ping_sender.send(true).await;
153                    }
154                }
155            }
156            tracing::trace!("stop pinger : {}", subscription_clone);
157        });
158
159        let inner = tokio::spawn(async move {
160            tracing::trace!("start subscriber: {}", subscription);
161            let retryable_codes = match &config.retry_setting {
162                Some(v) => v.codes.clone(),
163                None => default_retry_setting().codes,
164            };
165            loop {
166                let mut request = create_empty_streaming_pull_request();
167                request.subscription = subscription.to_string();
168                request.stream_ack_deadline_seconds = config.stream_ack_deadline_seconds;
169                request.max_outstanding_messages = config.max_outstanding_messages;
170                request.max_outstanding_bytes = config.max_outstanding_bytes;
171
172                let response = client
173                    .streaming_pull(request, ping_receiver.clone(), config.retry_setting.clone())
174                    .await;
175
176                let stream = match response {
177                    Ok(r) => r.into_inner(),
178                    Err(e) => {
179                        if e.code() == Code::Cancelled {
180                            tracing::trace!("stop subscriber : {}", subscription);
181                            break;
182                        } else if retryable_codes.contains(&e.code()) {
183                            tracing::warn!("failed to start streaming: will reconnect {:?} : {}", e, subscription);
184                            continue;
185                        } else {
186                            tracing::error!("failed to start streaming: will stop {:?} : {}", e, subscription);
187                            break;
188                        }
189                    }
190                };
191                match Self::recv(
192                    client.clone(),
193                    stream,
194                    subscription.as_str(),
195                    cancel_receiver.clone(),
196                    queue.clone(),
197                )
198                .await
199                {
200                    Ok(_) => break,
201                    Err(e) => {
202                        if retryable_codes.contains(&e.code()) {
203                            tracing::trace!("reconnect - '{:?}' : {} ", e, subscription);
204                            continue;
205                        } else {
206                            tracing::error!("terminated subscriber streaming with error {:?} : {}", e, subscription);
207                            break;
208                        }
209                    }
210                }
211            }
212            // streaming request is closed when the ping_sender closed.
213            tracing::trace!("stop subscriber in streaming: {}", subscription);
214        });
215        Self {
216            pinger: Some(pinger),
217            inner: Some(inner),
218        }
219    }
220
221    async fn recv(
222        client: SubscriberClient,
223        mut stream: Streaming<StreamingPullResponse>,
224        subscription: &str,
225        cancel: CancellationToken,
226        queue: async_channel::Sender<ReceivedMessage>,
227    ) -> Result<(), Status> {
228        tracing::trace!("start streaming: {}", subscription);
229        loop {
230            select! {
231                _ = cancel.cancelled() => {
232                    queue.close();
233                    return Ok(());
234                }
235                maybe = stream.message() => {
236                    let message = maybe?;
237                    let message = match message {
238                        Some(m) => m,
239                        None => return Ok(())
240                    };
241                    let _ = handle_message(&cancel, &queue, &client, subscription, message.received_messages).await;
242                }
243            }
244        }
245    }
246
247    pub async fn done(&mut self) {
248        if let Some(v) = self.pinger.take() {
249            let _ = v.await;
250        }
251        if let Some(v) = self.inner.take() {
252            let _ = v.await;
253        }
254    }
255}
256
257async fn handle_message(
258    cancel: &CancellationToken,
259    queue: &async_channel::Sender<ReceivedMessage>,
260    client: &SubscriberClient,
261    subscription: &str,
262    messages: Vec<InternalReceivedMessage>,
263) -> usize {
264    let mut nack_targets = vec![];
265    for received_message in messages {
266        if let Some(message) = received_message.message {
267            let id = message.message_id.clone();
268            tracing::debug!("message received: msg_id={id}");
269            let msg = ReceivedMessage::new(
270                subscription.to_string(),
271                client.clone(),
272                message,
273                received_message.ack_id.clone(),
274                (received_message.delivery_attempt > 0).then_some(received_message.delivery_attempt as usize),
275            );
276            let should_nack = select! {
277                result = queue.send(msg) => result.is_err(),
278                _ = cancel.cancelled() => true
279            };
280            if should_nack {
281                tracing::info!("cancelled -> so nack immediately : msg_id={id}");
282                nack_targets.push(received_message.ack_id);
283            }
284        }
285    }
286    let size = nack_targets.len();
287    if size > 0 {
288        // Nack immediately although the queue is closed only when the cancellation token is closed.
289        if let Err(err) = nack(client, subscription.to_string(), nack_targets).await {
290            tracing::error!(
291                "failed to nack immediately {err}. The messages will be redelivered after the ack deadline."
292            );
293        }
294    }
295    size
296}
297
298async fn modify_ack_deadline(
299    subscriber_client: &SubscriberClient,
300    subscription: String,
301    ack_ids: Vec<String>,
302    ack_deadline_seconds: i32,
303) -> Result<(), Status> {
304    if ack_ids.is_empty() {
305        return Ok(());
306    }
307    let req = ModifyAckDeadlineRequest {
308        subscription,
309        ack_deadline_seconds,
310        ack_ids,
311    };
312    subscriber_client
313        .modify_ack_deadline(req, None)
314        .await
315        .map(|e| e.into_inner())
316}
317
318async fn nack(subscriber_client: &SubscriberClient, subscription: String, ack_ids: Vec<String>) -> Result<(), Status> {
319    modify_ack_deadline(subscriber_client, subscription, ack_ids, 0).await
320}
321
322pub(crate) async fn ack(
323    subscriber_client: &SubscriberClient,
324    subscription: String,
325    ack_ids: Vec<String>,
326) -> Result<(), Status> {
327    if ack_ids.is_empty() {
328        return Ok(());
329    }
330    let req = AcknowledgeRequest { subscription, ack_ids };
331    subscriber_client.acknowledge(req, None).await.map(|e| e.into_inner())
332}
333
#[cfg(test)]
mod tests {
    use serial_test::serial;
    use tokio_util::sync::CancellationToken;

    use google_cloud_gax::conn::{ConnectionOptions, Environment};
    use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage, PullRequest};

    use crate::apiv1::conn_pool::ConnectionManager;
    use crate::apiv1::publisher_client::PublisherClient;
    use crate::apiv1::subscriber_client::SubscriberClient;
    use crate::subscriber::handle_message;

    /// Topic the test publishes to.
    const TOPIC: &str = "projects/local-project/topics/test-topic1";
    /// Subscription the test pulls from.
    const SUBSCRIPTION: &str = "projects/local-project/subscriptions/test-subscription1";

    /// Installs the tracing subscriber once; a failed install is ignored.
    #[ctor::ctor]
    fn init() {
        let _ = tracing_subscriber::fmt().try_init();
    }

    /// Messages handed to an already-closed queue must be nacked right away.
    ///
    /// Connects to the emulator address `localhost:8681`.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_handle_message_immediately_nack() {
        let new_conn = || async {
            ConnectionManager::new(
                4,
                "",
                &Environment::Emulator("localhost:8681".to_string()),
                &ConnectionOptions::default(),
            )
            .await
            .unwrap()
        };
        let subc = SubscriberClient::new(new_conn().await, new_conn().await);
        let pubc = PublisherClient::new(new_conn().await);

        // Publish one message so that the pull below has something to return.
        let publish_request = PublishRequest {
            topic: TOPIC.to_string(),
            messages: vec![PubsubMessage {
                data: "hoge".into(),
                ..Default::default()
            }],
        };
        pubc.publish(publish_request, None).await.unwrap();

        let subscription = SUBSCRIPTION;
        let pull_request = PullRequest {
            subscription: subscription.to_string(),
            max_messages: 1,
            ..Default::default()
        };
        let pulled = subc.pull(pull_request, None).await.unwrap().into_inner();

        // A closed queue makes every send fail, which forces the nack path.
        let messages = pulled.received_messages;
        let (queue, _) = async_channel::unbounded();
        queue.close();
        let cancel = CancellationToken::new();
        let nack_size = handle_message(&cancel, &queue, &subc, subscription, messages).await;
        assert_eq!(1, nack_size);
    }
}
401}