gcloud_pubsub/
subscriber.rs

1use google_cloud_gax::grpc::{Code, Status};
2use google_cloud_gax::retry::RetrySetting;
3use google_cloud_googleapis::pubsub::v1::{
4    AcknowledgeRequest, ModifyAckDeadlineRequest, PubsubMessage, StreamingPullRequest,
5};
6use std::ops::{Deref, DerefMut};
7use std::time::Duration;
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10
11use crate::apiv1::default_retry_setting;
12use crate::apiv1::subscriber_client::{create_empty_streaming_pull_request, SubscriberClient};
13
14#[derive(Debug, Clone)]
15pub struct ReceivedMessage {
16    pub message: PubsubMessage,
17    pub ack_id: String,
18    pub subscription: String,
19    pub subscriber_client: SubscriberClient,
20    pub delivery_attempt: Option<usize>,
21}
22
23impl ReceivedMessage {
24    pub(crate) fn new(
25        subscription: String,
26        subc: SubscriberClient,
27        message: PubsubMessage,
28        ack_id: String,
29        delivery_attempt: Option<usize>,
30    ) -> Self {
31        Self {
32            message,
33            ack_id,
34            subscription,
35            subscriber_client: subc,
36            delivery_attempt,
37        }
38    }
39
40    pub fn ack_id(&self) -> &str {
41        self.ack_id.as_str()
42    }
43
44    pub async fn ack(&self) -> Result<(), Status> {
45        ack(
46            &self.subscriber_client,
47            self.subscription.to_string(),
48            vec![self.ack_id.to_string()],
49        )
50        .await
51    }
52
53    pub async fn nack(&self) -> Result<(), Status> {
54        nack(
55            &self.subscriber_client,
56            self.subscription.to_string(),
57            vec![self.ack_id.to_string()],
58        )
59        .await
60    }
61
62    pub async fn modify_ack_deadline(&self, ack_deadline_seconds: i32) -> Result<(), Status> {
63        modify_ack_deadline(
64            &self.subscriber_client,
65            self.subscription.to_string(),
66            vec![self.ack_id.to_string()],
67            ack_deadline_seconds,
68        )
69        .await
70    }
71
72    /// The approximate number of times that Cloud Pub/Sub has attempted to deliver
73    /// the associated message to a subscriber.
74    ///
75    /// The returned value, if present, will be greater than zero.
76    ///
77    /// For more information refer to the
78    /// [protobuf definition](https://github.com/googleapis/googleapis/blob/3c7c76fb63d0f511cdb8c3c1cbc157315f6fbfd3/google/pubsub/v1/pubsub.proto#L1099-L1115).
79    pub fn delivery_attempt(&self) -> Option<usize> {
80        self.delivery_attempt
81    }
82}
83
84#[derive(Debug, Clone)]
85pub struct SubscriberConfig {
86    /// ping interval for Bi Directional Streaming
87    pub ping_interval: Duration,
88    pub retry_setting: Option<RetrySetting>,
89    /// It is important for exactly_once_delivery
90    /// The ack deadline to use for the stream. This must be provided in
91    /// the first request on the stream, but it can also be updated on subsequent
92    /// requests from client to server. The minimum deadline you can specify is 10
93    /// seconds. The maximum deadline you can specify is 600 seconds (10 minutes).
94    pub stream_ack_deadline_seconds: i32,
95    /// Flow control settings for the maximum number of outstanding messages. When
96    /// there are `max_outstanding_messages` or more currently sent to the
97    /// streaming pull client that have not yet been acked or nacked, the server
98    /// stops sending more messages. The sending of messages resumes once the
99    /// number of outstanding messages is less than this value. If the value is
100    /// <= 0, there is no limit to the number of outstanding messages. This
101    /// property can only be set on the initial StreamingPullRequest. If it is set
102    /// on a subsequent request, the stream will be aborted with status
103    /// `INVALID_ARGUMENT`.
104    pub max_outstanding_messages: i64,
105    pub max_outstanding_bytes: i64,
106}
107
108impl Default for SubscriberConfig {
109    fn default() -> Self {
110        // Default retry setting with Cancelled code
111        let mut retry = default_retry_setting();
112        retry.codes.push(Code::Cancelled);
113
114        Self {
115            ping_interval: Duration::from_secs(10),
116            retry_setting: Some(retry),
117            stream_ack_deadline_seconds: 60,
118            max_outstanding_messages: 50,
119            max_outstanding_bytes: 1000 * 1000 * 1000,
120        }
121    }
122}
123
124struct UnprocessedMessages {
125    tx: Option<oneshot::Sender<Option<Vec<String>>>>,
126    ack_ids: Option<Vec<String>>,
127}
128
129impl UnprocessedMessages {
130    fn new(tx: oneshot::Sender<Option<Vec<String>>>) -> Self {
131        Self {
132            tx: Some(tx),
133            ack_ids: Some(vec![]),
134        }
135    }
136}
137
138impl Deref for UnprocessedMessages {
139    type Target = Vec<String>;
140
141    fn deref(&self) -> &Self::Target {
142        self.ack_ids.as_ref().unwrap()
143    }
144}
145
146impl DerefMut for UnprocessedMessages {
147    fn deref_mut(&mut self) -> &mut Vec<String> {
148        self.ack_ids.as_mut().unwrap()
149    }
150}
151
152impl Drop for UnprocessedMessages {
153    fn drop(&mut self) {
154        if let Some(tx) = self.tx.take() {
155            let _ = tx.send(self.ack_ids.take());
156        }
157    }
158}
159
160#[derive(Debug)]
161pub(crate) struct Subscriber {
162    client: SubscriberClient,
163    subscription: String,
164    task_to_receive: Option<JoinHandle<()>>,
165    /// Ack id list of unprocessed messages.
166    unprocessed_messages_receiver: Option<oneshot::Receiver<Option<Vec<String>>>>,
167}
168
169impl Drop for Subscriber {
170    fn drop(&mut self) {
171        if let Some(task) = self.task_to_receive.take() {
172            task.abort();
173        }
174        let rx = match self.unprocessed_messages_receiver.take() {
175            None => return,
176            Some(rx) => rx,
177        };
178        let subscription = self.subscription.clone();
179        let client = self.client.clone();
180        tracing::warn!(
181            "Subscriber is not disposed. Call dispose() to properly clean up resources. subscription={}",
182            &subscription
183        );
184        let task = async move {
185            if let Ok(Some(messages)) = rx.await {
186                if messages.is_empty() {
187                    return;
188                }
189                tracing::debug!("nack {} unprocessed messages", messages.len());
190                if let Err(err) = nack(&client, subscription, messages).await {
191                    tracing::error!("failed to nack message: {:?}", err);
192                }
193            }
194        };
195        let _forget = tokio::spawn(task);
196    }
197}
198
199impl Subscriber {
200    pub fn spawn(
201        subscription: String,
202        client: SubscriberClient,
203        queue: mpsc::Sender<ReceivedMessage>,
204        config: SubscriberConfig,
205    ) -> Self {
206        let subscription_clone = subscription.clone();
207        let client_clone = client.clone();
208
209        // Build task to receive
210        let (tx, rx) = oneshot::channel();
211        let task_to_receive = async move {
212            tracing::debug!("start subscriber: {}", subscription);
213
214            let retryable_codes = match &config.retry_setting {
215                Some(v) => v.codes.clone(),
216                None => default_retry_setting().codes,
217            };
218
219            let mut unprocessed_messages = UnprocessedMessages::new(tx);
220            loop {
221                let mut request = create_empty_streaming_pull_request();
222                request.subscription = subscription.to_string();
223                request.stream_ack_deadline_seconds = config.stream_ack_deadline_seconds;
224                request.max_outstanding_messages = config.max_outstanding_messages;
225                request.max_outstanding_bytes = config.max_outstanding_bytes;
226
227                let response = Self::receive(
228                    client.clone(),
229                    request,
230                    config.clone(),
231                    queue.clone(),
232                    &mut unprocessed_messages,
233                )
234                .await;
235
236                if let Err(e) = response {
237                    if retryable_codes.contains(&e.code()) {
238                        tracing::trace!("refresh connection: subscriber will reconnect {:?} : {}", e, subscription);
239                        continue;
240                    } else {
241                        tracing::error!("failed to receive message: subscriber will stop {:?} : {}", e, subscription);
242                        break;
243                    }
244                } else {
245                    tracing::debug!("stopped to receive message: {}", subscription);
246                    break;
247                }
248            }
249            tracing::debug!("stop subscriber: {}", subscription);
250        };
251
252        // When the all the task stops queue will be closed automatically and closed is detected by receiver.
253
254        Self {
255            client: client_clone,
256            subscription: subscription_clone,
257            task_to_receive: Some(tokio::spawn(task_to_receive)),
258            unprocessed_messages_receiver: Some(rx),
259        }
260    }
261
262    async fn receive(
263        client: SubscriberClient,
264        request: StreamingPullRequest,
265        config: SubscriberConfig,
266        queue: mpsc::Sender<ReceivedMessage>,
267        unprocessed_messages: &mut Vec<String>,
268    ) -> Result<(), Status> {
269        let subscription = request.subscription.to_string();
270
271        // Call the streaming_pull method with the provided request and ping_receiver
272        let response = client
273            .streaming_pull(request, config.ping_interval, config.retry_setting.clone())
274            .await?;
275        let mut stream = response.into_inner();
276
277        // Process the stream
278        loop {
279            let message = stream.message().await?;
280            let messages = match message {
281                Some(m) => m.received_messages,
282                None => return Ok(()),
283            };
284
285            let mut msgs = Vec::with_capacity(messages.len());
286            for received_message in messages {
287                if let Some(message) = received_message.message {
288                    let id = message.message_id.clone();
289                    tracing::trace!("message received: msg_id={id}");
290                    let msg = ReceivedMessage::new(
291                        subscription.clone(),
292                        client.clone(),
293                        message,
294                        received_message.ack_id.clone(),
295                        (received_message.delivery_attempt > 0).then_some(received_message.delivery_attempt as usize),
296                    );
297                    unprocessed_messages.push(msg.ack_id.clone());
298                    msgs.push(msg);
299                }
300            }
301
302            for msg in msgs.drain(..) {
303                let ack_id = msg.ack_id.clone();
304                if queue.send(msg).await.is_ok() {
305                    unprocessed_messages.retain(|e| *e != ack_id);
306                } else {
307                    // Permanently close the stream if the queue is closed.
308                    break;
309                }
310            }
311        }
312    }
313
314    pub async fn dispose(mut self) -> usize {
315        if let Some(task) = self.task_to_receive.take() {
316            task.abort();
317        }
318        let mut count = 0;
319        let rx = match self.unprocessed_messages_receiver.take() {
320            None => return count,
321            Some(rx) => rx,
322        };
323
324        if let Ok(Some(messages)) = rx.await {
325            // Nack all the unprocessed messages
326            if messages.is_empty() {
327                return count;
328            }
329            let size = messages.len();
330            tracing::debug!("nack {} unprocessed messages", size);
331            let result = nack(&self.client, self.subscription.clone(), messages).await;
332            match result {
333                Ok(_) => count = size,
334                Err(err) => tracing::error!("failed to nack message: {:?}", err),
335            }
336        }
337        count
338    }
339}
340
341async fn modify_ack_deadline(
342    subscriber_client: &SubscriberClient,
343    subscription: String,
344    ack_ids: Vec<String>,
345    ack_deadline_seconds: i32,
346) -> Result<(), Status> {
347    if ack_ids.is_empty() {
348        return Ok(());
349    }
350    let req = ModifyAckDeadlineRequest {
351        subscription,
352        ack_deadline_seconds,
353        ack_ids,
354    };
355    subscriber_client
356        .modify_ack_deadline(req, None)
357        .await
358        .map(|e| e.into_inner())
359}
360
361pub(crate) async fn nack(
362    subscriber_client: &SubscriberClient,
363    subscription: String,
364    ack_ids: Vec<String>,
365) -> Result<(), Status> {
366    for chunk in ack_ids.chunks(100) {
367        modify_ack_deadline(subscriber_client, subscription.clone(), chunk.to_vec(), 0).await?;
368    }
369    Ok(())
370}
371
372pub(crate) async fn ack(
373    subscriber_client: &SubscriberClient,
374    subscription: String,
375    ack_ids: Vec<String>,
376) -> Result<(), Status> {
377    if ack_ids.is_empty() {
378        return Ok(());
379    }
380    let req = AcknowledgeRequest { subscription, ack_ids };
381    subscriber_client.acknowledge(req, None).await.map(|e| e.into_inner())
382}