Skip to main content

gcloud_pubsub/
subscriber.rs

1use google_cloud_gax::grpc::{Code, Status};
2use google_cloud_gax::retry::RetrySetting;
3use google_cloud_googleapis::pubsub::v1::{
4    AcknowledgeRequest, ModifyAckDeadlineRequest, PubsubMessage, StreamingPullRequest,
5};
6use std::ops::{Deref, DerefMut};
7use std::time::Duration;
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10
11use crate::apiv1::default_retry_setting as base_retry_setting;
12use crate::apiv1::subscriber_client::{create_empty_streaming_pull_request, SubscriberClient};
13
14#[derive(Debug, Clone)]
15pub struct ReceivedMessage {
16    pub message: PubsubMessage,
17    pub ack_id: String,
18    pub subscription: String,
19    pub subscriber_client: SubscriberClient,
20    pub delivery_attempt: Option<usize>,
21}
22
23impl ReceivedMessage {
24    pub(crate) fn new(
25        subscription: String,
26        subc: SubscriberClient,
27        message: PubsubMessage,
28        ack_id: String,
29        delivery_attempt: Option<usize>,
30    ) -> Self {
31        Self {
32            message,
33            ack_id,
34            subscription,
35            subscriber_client: subc,
36            delivery_attempt,
37        }
38    }
39
40    pub fn ack_id(&self) -> &str {
41        self.ack_id.as_str()
42    }
43
44    pub async fn ack(&self) -> Result<(), Status> {
45        ack(
46            &self.subscriber_client,
47            self.subscription.to_string(),
48            vec![self.ack_id.to_string()],
49        )
50        .await
51    }
52
53    pub async fn nack(&self) -> Result<(), Status> {
54        nack(
55            &self.subscriber_client,
56            self.subscription.to_string(),
57            vec![self.ack_id.to_string()],
58        )
59        .await
60    }
61
62    pub async fn modify_ack_deadline(&self, ack_deadline_seconds: i32) -> Result<(), Status> {
63        modify_ack_deadline(
64            &self.subscriber_client,
65            self.subscription.to_string(),
66            vec![self.ack_id.to_string()],
67            ack_deadline_seconds,
68        )
69        .await
70    }
71
72    /// The approximate number of times that Cloud Pub/Sub has attempted to deliver
73    /// the associated message to a subscriber.
74    ///
75    /// The returned value, if present, will be greater than zero.
76    ///
77    /// For more information refer to the
78    /// [protobuf definition](https://github.com/googleapis/googleapis/blob/3c7c76fb63d0f511cdb8c3c1cbc157315f6fbfd3/google/pubsub/v1/pubsub.proto#L1099-L1115).
79    pub fn delivery_attempt(&self) -> Option<usize> {
80        self.delivery_attempt
81    }
82}
83
84fn default_retry_setting() -> RetrySetting {
85    let mut retry = base_retry_setting();
86    // Default retry setting with Cancelled code
87    retry.codes.push(Code::Cancelled);
88    retry
89}
90
91#[derive(Debug, Clone)]
92pub struct SubscriberConfig {
93    /// ping interval for Bi Directional Streaming
94    pub ping_interval: Duration,
95    pub retry_setting: Option<RetrySetting>,
96    /// It is important for exactly_once_delivery
97    /// The ack deadline to use for the stream. This must be provided in
98    /// the first request on the stream, but it can also be updated on subsequent
99    /// requests from client to server. The minimum deadline you can specify is 10
100    /// seconds. The maximum deadline you can specify is 600 seconds (10 minutes).
101    pub stream_ack_deadline_seconds: i32,
102    /// Flow control settings for the maximum number of outstanding messages. When
103    /// there are `max_outstanding_messages` or more currently sent to the
104    /// streaming pull client that have not yet been acked or nacked, the server
105    /// stops sending more messages. The sending of messages resumes once the
106    /// number of outstanding messages is less than this value. If the value is
107    /// <= 0, there is no limit to the number of outstanding messages. This
108    /// property can only be set on the initial StreamingPullRequest. If it is set
109    /// on a subsequent request, the stream will be aborted with status
110    /// `INVALID_ARGUMENT`.
111    pub max_outstanding_messages: i64,
112    pub max_outstanding_bytes: i64,
113}
114
115impl Default for SubscriberConfig {
116    fn default() -> Self {
117        Self {
118            ping_interval: Duration::from_secs(10),
119            retry_setting: Some(default_retry_setting()),
120            stream_ack_deadline_seconds: 60,
121            max_outstanding_messages: 50,
122            max_outstanding_bytes: 1000 * 1000 * 1000,
123        }
124    }
125}
126
127struct UnprocessedMessages {
128    tx: Option<oneshot::Sender<Option<Vec<String>>>>,
129    ack_ids: Option<Vec<String>>,
130}
131
132impl UnprocessedMessages {
133    fn new(tx: oneshot::Sender<Option<Vec<String>>>) -> Self {
134        Self {
135            tx: Some(tx),
136            ack_ids: Some(vec![]),
137        }
138    }
139}
140
141impl Deref for UnprocessedMessages {
142    type Target = Vec<String>;
143
144    fn deref(&self) -> &Self::Target {
145        self.ack_ids.as_ref().unwrap()
146    }
147}
148
149impl DerefMut for UnprocessedMessages {
150    fn deref_mut(&mut self) -> &mut Vec<String> {
151        self.ack_ids.as_mut().unwrap()
152    }
153}
154
155impl Drop for UnprocessedMessages {
156    fn drop(&mut self) {
157        if let Some(tx) = self.tx.take() {
158            let _ = tx.send(self.ack_ids.take());
159        }
160    }
161}
162
163#[derive(Debug)]
164pub(crate) struct Subscriber {
165    client: SubscriberClient,
166    subscription: String,
167    task_to_receive: Option<JoinHandle<()>>,
168    /// Ack id list of unprocessed messages.
169    unprocessed_messages_receiver: Option<oneshot::Receiver<Option<Vec<String>>>>,
170}
171
172impl Drop for Subscriber {
173    fn drop(&mut self) {
174        if let Some(task) = self.task_to_receive.take() {
175            task.abort();
176        }
177        let rx = match self.unprocessed_messages_receiver.take() {
178            None => return,
179            Some(rx) => rx,
180        };
181        let subscription = self.subscription.clone();
182        let client = self.client.clone();
183        tracing::warn!(
184            "Subscriber is not disposed. Call dispose() to properly clean up resources. subscription={}",
185            &subscription
186        );
187        let task = async move {
188            if let Ok(Some(messages)) = rx.await {
189                if messages.is_empty() {
190                    return;
191                }
192                tracing::debug!("nack {} unprocessed messages", messages.len());
193                if let Err(err) = nack(&client, subscription, messages).await {
194                    tracing::error!("failed to nack message: {:?}", err);
195                }
196            }
197        };
198        let _forget = tokio::spawn(task);
199    }
200}
201
202impl Subscriber {
203    pub fn spawn(
204        subscription: String,
205        client: SubscriberClient,
206        queue: mpsc::Sender<ReceivedMessage>,
207        config: SubscriberConfig,
208    ) -> Self {
209        let subscription_clone = subscription.clone();
210        let client_clone = client.clone();
211
212        // Build task to receive
213        let (tx, rx) = oneshot::channel();
214        let task_to_receive = async move {
215            tracing::debug!("start subscriber: {}", subscription);
216
217            let retryable_codes = match &config.retry_setting {
218                Some(v) => v.codes.clone(),
219                None => default_retry_setting().codes,
220            };
221
222            let mut unprocessed_messages = UnprocessedMessages::new(tx);
223            loop {
224                let mut request = create_empty_streaming_pull_request();
225                request.subscription = subscription.to_string();
226                request.stream_ack_deadline_seconds = config.stream_ack_deadline_seconds;
227                request.max_outstanding_messages = config.max_outstanding_messages;
228                request.max_outstanding_bytes = config.max_outstanding_bytes;
229
230                let response = Self::receive(
231                    client.clone(),
232                    request,
233                    config.clone(),
234                    queue.clone(),
235                    &mut unprocessed_messages,
236                )
237                .await;
238
239                if let Err(e) = response {
240                    if retryable_codes.contains(&e.code()) {
241                        tracing::trace!("refresh connection: subscriber will reconnect {:?} : {}", e, subscription);
242                        continue;
243                    } else {
244                        tracing::error!("failed to receive message: subscriber will stop {:?} : {}", e, subscription);
245                        break;
246                    }
247                } else {
248                    tracing::debug!("stopped to receive message: {}", subscription);
249                    break;
250                }
251            }
252            tracing::debug!("stop subscriber: {}", subscription);
253        };
254
255        // When the all the task stops queue will be closed automatically and closed is detected by receiver.
256
257        Self {
258            client: client_clone,
259            subscription: subscription_clone,
260            task_to_receive: Some(tokio::spawn(task_to_receive)),
261            unprocessed_messages_receiver: Some(rx),
262        }
263    }
264
265    async fn receive(
266        client: SubscriberClient,
267        request: StreamingPullRequest,
268        config: SubscriberConfig,
269        queue: mpsc::Sender<ReceivedMessage>,
270        unprocessed_messages: &mut Vec<String>,
271    ) -> Result<(), Status> {
272        let subscription = request.subscription.to_string();
273
274        // Call the streaming_pull method with the provided request and ping_receiver
275        let response = client
276            .streaming_pull(request, config.ping_interval, config.retry_setting.clone())
277            .await?;
278        let mut stream = response.into_inner();
279
280        // Process the stream
281        loop {
282            let message = stream.message().await?;
283            let messages = match message {
284                Some(m) => m.received_messages,
285                None => return Ok(()),
286            };
287
288            let mut msgs = Vec::with_capacity(messages.len());
289            for received_message in messages {
290                if let Some(message) = received_message.message {
291                    let id = message.message_id.clone();
292                    tracing::trace!("message received: msg_id={id}");
293                    let msg = ReceivedMessage::new(
294                        subscription.clone(),
295                        client.clone(),
296                        message,
297                        received_message.ack_id.clone(),
298                        (received_message.delivery_attempt > 0).then_some(received_message.delivery_attempt as usize),
299                    );
300                    unprocessed_messages.push(msg.ack_id.clone());
301                    msgs.push(msg);
302                }
303            }
304
305            for msg in msgs.drain(..) {
306                let ack_id = msg.ack_id.clone();
307                if queue.send(msg).await.is_ok() {
308                    unprocessed_messages.retain(|e| *e != ack_id);
309                } else {
310                    // Permanently close the stream if the queue is closed.
311                    break;
312                }
313            }
314        }
315    }
316
317    pub async fn dispose(mut self) -> usize {
318        if let Some(task) = self.task_to_receive.take() {
319            task.abort();
320        }
321        let mut count = 0;
322        let rx = match self.unprocessed_messages_receiver.take() {
323            None => return count,
324            Some(rx) => rx,
325        };
326
327        if let Ok(Some(messages)) = rx.await {
328            // Nack all the unprocessed messages
329            if messages.is_empty() {
330                return count;
331            }
332            let size = messages.len();
333            tracing::debug!("nack {} unprocessed messages", size);
334            let result = nack(&self.client, self.subscription.clone(), messages).await;
335            match result {
336                Ok(_) => count = size,
337                Err(err) => tracing::error!("failed to nack message: {:?}", err),
338            }
339        }
340        count
341    }
342}
343
344async fn modify_ack_deadline(
345    subscriber_client: &SubscriberClient,
346    subscription: String,
347    ack_ids: Vec<String>,
348    ack_deadline_seconds: i32,
349) -> Result<(), Status> {
350    if ack_ids.is_empty() {
351        return Ok(());
352    }
353    let req = ModifyAckDeadlineRequest {
354        subscription,
355        ack_deadline_seconds,
356        ack_ids,
357    };
358    subscriber_client
359        .modify_ack_deadline(req, None)
360        .await
361        .map(|e| e.into_inner())
362}
363
364pub(crate) async fn nack(
365    subscriber_client: &SubscriberClient,
366    subscription: String,
367    ack_ids: Vec<String>,
368) -> Result<(), Status> {
369    for chunk in ack_ids.chunks(100) {
370        modify_ack_deadline(subscriber_client, subscription.clone(), chunk.to_vec(), 0).await?;
371    }
372    Ok(())
373}
374
375pub(crate) async fn ack(
376    subscriber_client: &SubscriberClient,
377    subscription: String,
378    ack_ids: Vec<String>,
379) -> Result<(), Status> {
380    if ack_ids.is_empty() {
381        return Ok(());
382    }
383    let req = AcknowledgeRequest { subscription, ack_ids };
384    subscriber_client.acknowledge(req, None).await.map(|e| e.into_inner())
385}