Skip to main content

google_cloud_pubsub/subscriber/
handler.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handlers for acknowledging or rejecting messages.
16//!
17//! To acknowledge (ack) a message, you call [`Handler::ack()`].
18//!
19//! To reject (nack) a message, you [`drop()`][Drop::drop] the handler. The
20//! message will be redelivered.
21//!
22//! # Example
23//!
24//! ```
25//! use google_cloud_pubsub::model::Message;
26//! # use google_cloud_pubsub::subscriber::handler::Handler;
27//! fn on_message(m: Message, h: Handler) {
28//!   match process(m) {
29//!     Ok(_) => h.ack(),
30//!     Err(e) => {
31//!         println!("failed to process message: {e:?}");
32//!         drop(h);
33//!     }
34//!   }
35//! }
36//!
37//! fn process(m: Message) -> anyhow::Result<()> {
38//!   // some business logic here...
39//!   # panic!()
40//! }
41//! ```
42
43use crate::error::AckError;
44use tokio::sync::mpsc::UnboundedSender;
45use tokio::sync::oneshot::Receiver;
46
47/// The action an application does with a message.
48#[derive(Debug, PartialEq)]
49pub(super) enum Action {
50    Ack(String),
51    Nack(String),
52    ExactlyOnceAck(String),
53    ExactlyOnceNack(String),
54}
55
56/// A handler for acknowledging or rejecting messages.
57///
58/// # Example
59///
60/// ```
61/// use google_cloud_pubsub::model::Message;
62/// # use google_cloud_pubsub::subscriber::handler::Handler;
63/// fn on_message(m: Message, h: Handler) {
64///   match process(m) {
65///     Ok(_) => h.ack(),
66///     Err(e) => {
67///         println!("failed to process message: {e:?}");
68///         drop(h);
69///     }
70///   }
71/// }
72///
73/// fn process(m: Message) -> anyhow::Result<()> {
74///   // some business logic here...
75///   # panic!()
76/// }
77/// ```
78///
79/// To acknowledge (ack) a message, you call [`Handler::ack()`].
80///
81/// To reject (nack) a message, you [`drop()`][Drop::drop] the handler. The
82/// service will redeliver the message.
83///
84/// ## Exactly-once delivery
85///
86/// If your subscription has [exactly-once delivery] enabled, you should
87/// destructure this enum into its [`Handler::ExactlyOnce`] branch.
88///
89/// Only when `ExactlyOnce::confirmed_ack()` returns `Ok` can you be certain
90/// that the message will not be redelivered.
91///
92/// [exactly-once delivery]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
93///
94/// ```
95/// use google_cloud_pubsub::model::Message;
96/// # use google_cloud_pubsub::subscriber::handler::Handler;
97/// async fn on_message(m: Message, h: Handler) {
98///   let Handler::ExactlyOnce(h) = h else {
99///     panic!("Oops, my subscription does not have exactly-once delivery enabled.")
100///   };
101///   match h.confirmed_ack().await {
102///     Ok(()) => println!("Confirmed ack for message={m:?}. The message will not be redelivered."),
103///     Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
104///   }
105/// }
106/// ```
107#[derive(Debug)]
108#[non_exhaustive]
109pub enum Handler {
110    AtLeastOnce(AtLeastOnce),
111    ExactlyOnce(ExactlyOnce),
112}
113
114impl Handler {
115    /// Acknowledge the message associated with this handler.
116    ///
117    /// # Example
118    ///
119    /// ```
120    /// use google_cloud_pubsub::model::Message;
121    /// # use google_cloud_pubsub::subscriber::handler::Handler;
122    /// fn on_message(m: Message, h: Handler) {
123    ///   println!("Received message: {m:?}");
124    ///   h.ack();
125    /// }
126    /// ```
127    ///
128    /// Note that the acknowledgement is best effort. The message may still be
129    /// redelivered to this client, or another client, even if exactly-once
130    /// delivery is enabled on the subscription.
131    pub fn ack(self) {
132        match self {
133            Handler::AtLeastOnce(h) => h.ack(),
134            Handler::ExactlyOnce(h) => h.ack(),
135        }
136    }
137
138    #[cfg(test)]
139    pub(crate) fn ack_id(&self) -> &str {
140        match self {
141            Handler::AtLeastOnce(h) => h.ack_id(),
142            Handler::ExactlyOnce(h) => h.ack_id(),
143        }
144    }
145}
146
147#[derive(Debug)]
148struct AtLeastOnceImpl {
149    ack_id: String,
150    ack_tx: UnboundedSender<Action>,
151}
152
153impl AtLeastOnceImpl {
154    fn ack(self) {
155        let _ = self.ack_tx.send(Action::Ack(self.ack_id));
156    }
157
158    fn nack(self) {
159        let _ = self.ack_tx.send(Action::Nack(self.ack_id));
160    }
161}
162
163/// A handler for at-least-once delivery.
164#[derive(Debug)]
165pub struct AtLeastOnce {
166    inner: Option<AtLeastOnceImpl>,
167}
168
169impl AtLeastOnce {
170    pub(super) fn new(ack_id: String, ack_tx: UnboundedSender<Action>) -> Self {
171        Self {
172            inner: Some(AtLeastOnceImpl { ack_id, ack_tx }),
173        }
174    }
175
176    /// Acknowledge the message associated with this handler.
177    ///
178    /// Note that the acknowledgement is best effort. The message may still be
179    /// redelivered to this client, or another client.
180    pub fn ack(mut self) {
181        if let Some(inner) = self.inner.take() {
182            inner.ack();
183        }
184    }
185
186    #[cfg(test)]
187    pub(crate) fn ack_id(&self) -> &str {
188        self.inner
189            .as_ref()
190            .map(|i| i.ack_id.as_str())
191            .unwrap_or_default()
192    }
193}
194
195impl Drop for AtLeastOnce {
196    /// Rejects the message associated with this handler.
197    ///
198    /// The message will be removed from this `Subscriber`'s lease management.
199    /// The service will redeliver this message, possibly to another client.
200    fn drop(&mut self) {
201        if let Some(inner) = self.inner.take() {
202            inner.nack();
203        }
204    }
205}
206
207/// A handler for exactly-once delivery.
208#[derive(Debug)]
209pub struct ExactlyOnce {
210    inner: Option<ExactlyOnceImpl>,
211}
212
213impl ExactlyOnce {
214    pub(super) fn new(
215        ack_id: String,
216        ack_tx: UnboundedSender<Action>,
217        result_rx: Receiver<AckResult>,
218    ) -> Self {
219        Self {
220            inner: Some(ExactlyOnceImpl {
221                ack_id,
222                ack_tx,
223                result_rx,
224            }),
225        }
226    }
227
228    /// Acknowledge the message associated with this handler.
229    ///
230    /// Note that the acknowledgement is best effort. The message may still be
231    /// redelivered to this client, or another client.
232    pub(crate) fn ack(mut self) {
233        if let Some(inner) = self.inner.take() {
234            inner.ack();
235        }
236    }
237
238    /// Strongly acknowledge the message associated with this handler.
239    ///
240    /// ```
241    /// use google_cloud_pubsub::model::Message;
242    /// # use google_cloud_pubsub::subscriber::handler::ExactlyOnce;
243    /// async fn on_message(m: Message, h: ExactlyOnce) {
244    ///   match h.confirmed_ack().await {
245    ///     Ok(()) => println!("Confirmed ack for message={m:?}. The message will not be redelivered."),
246    ///     Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
247    ///   }
248    /// }
249    /// ```
250    ///
251    /// If the result is an `Ok`, the message is guaranteed not to be delivered
252    /// again.
253    ///
254    /// If the result is an `Err`, the message may be redelivered, but this is
255    /// not guaranteed. If no redelivery occurs a sufficient interval after an
256    /// error, the acknowledgement likely succeeded.
257    pub async fn confirmed_ack(mut self) -> std::result::Result<(), AckError> {
258        let inner = self.inner.take().expect("handler impl is always some");
259        inner.confirmed_ack().await
260    }
261
262    #[cfg(test)]
263    pub(crate) fn ack_id(&self) -> &str {
264        self.inner
265            .as_ref()
266            .map(|i| i.ack_id.as_str())
267            .unwrap_or_default()
268    }
269}
270
271impl Drop for ExactlyOnce {
272    /// Rejects the message associated with this handler.
273    ///
274    /// The message will be removed from this `Subscriber`'s lease management.
275    /// The service will redeliver this message, possibly to another client.
276    fn drop(&mut self) {
277        if let Some(inner) = self.inner.take() {
278            inner.nack();
279        }
280    }
281}
282
283#[derive(Debug)]
284struct ExactlyOnceImpl {
285    pub(super) ack_id: String,
286    pub(super) ack_tx: UnboundedSender<Action>,
287    pub(super) result_rx: Receiver<AckResult>,
288}
289
290impl ExactlyOnceImpl {
291    pub fn ack(self) {
292        let _ = self.ack_tx.send(Action::ExactlyOnceAck(self.ack_id));
293    }
294
295    pub fn nack(self) {
296        let _ = self.ack_tx.send(Action::ExactlyOnceNack(self.ack_id));
297    }
298
299    pub async fn confirmed_ack(self) -> AckResult {
300        self.ack_tx
301            .send(Action::ExactlyOnceAck(self.ack_id))
302            .map_err(|_| AckError::ShutdownBeforeAck)?;
303        self.result_rx
304            .await
305            .map_err(|e| AckError::Shutdown(e.into()))?
306    }
307}
308
309/// The result of a confirmed acknowledgement.
310pub(super) type AckResult = std::result::Result<(), AckError>;
311
#[cfg(test)]
mod tests {
    use super::super::lease_state::tests::test_id;
    use super::*;
    use tokio::sync::mpsc::error::TryRecvError;
    use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
    use tokio::sync::oneshot::{Sender, channel};

    /// Builds an at-least-once handler for `test_id(1)` together with the
    /// receiving end of its action channel, and verifies that merely creating
    /// the handler sends nothing.
    fn new_at_least_once() -> (AtLeastOnce, UnboundedReceiver<Action>) {
        let (ack_tx, mut ack_rx) = unbounded_channel();
        let handler = AtLeastOnce::new(test_id(1), ack_tx);
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
        (handler, ack_rx)
    }

    /// Builds an exactly-once handler for `test_id(1)` together with the
    /// receiving end of its action channel and the sending end of its result
    /// channel, and verifies that merely creating the handler sends nothing.
    fn new_exactly_once() -> (ExactlyOnce, UnboundedReceiver<Action>, Sender<AckResult>) {
        let (ack_tx, mut ack_rx) = unbounded_channel();
        let (result_tx, result_rx) = channel();
        let handler = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
        (handler, ack_rx, result_tx)
    }

    #[test]
    fn handler_at_least_once_ack() -> anyhow::Result<()> {
        let (inner, mut ack_rx) = new_at_least_once();

        Handler::AtLeastOnce(inner).ack();
        assert_eq!(ack_rx.try_recv()?, Action::Ack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_at_least_once_nack() -> anyhow::Result<()> {
        let (inner, mut ack_rx) = new_at_least_once();

        drop(Handler::AtLeastOnce(inner));
        assert_eq!(ack_rx.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_ack() -> anyhow::Result<()> {
        let (inner, mut ack_rx, _result_tx) = new_exactly_once();

        Handler::ExactlyOnce(inner).ack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceAck(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_nack() -> anyhow::Result<()> {
        let (inner, mut ack_rx, _result_tx) = new_exactly_once();

        drop(Handler::ExactlyOnce(inner));
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_ack() -> anyhow::Result<()> {
        let (handler, mut ack_rx) = new_at_least_once();

        handler.ack();
        assert_eq!(ack_rx.try_recv()?, Action::Ack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_nack() -> anyhow::Result<()> {
        let (handler, mut ack_rx) = new_at_least_once();

        drop(handler);
        assert_eq!(ack_rx.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn exactly_once_ack() -> anyhow::Result<()> {
        let (handler, mut ack_rx, _result_tx) = new_exactly_once();

        handler.ack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceAck(test_id(1)));

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_success() -> anyhow::Result<()> {
        let (handler, mut ack_rx, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(handler.confirmed_ack());

        let sent = ack_rx.recv().await.expect("ack should be sent");
        assert_eq!(sent, Action::ExactlyOnceAck(test_id(1)));

        result_tx
            .send(Ok(()))
            .expect("sending on a channel succeeds");
        pending.await??;

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_error() -> anyhow::Result<()> {
        let (handler, mut ack_rx, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(handler.confirmed_ack());

        let sent = ack_rx.recv().await.expect("ack should be sent");
        assert_eq!(sent, Action::ExactlyOnceAck(test_id(1)));

        result_tx
            .send(Err(AckError::LeaseExpired))
            .expect("sending on a channel succeeds");
        let err = pending.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_action_channel_closed() -> anyhow::Result<()> {
        let (handler, ack_rx, _result_tx) = new_exactly_once();
        // With the receiver gone, the ack cannot even be sent.
        drop(ack_rx);

        let err = handler.confirmed_ack().await.expect_err("ack should fail");
        assert!(matches!(err, AckError::ShutdownBeforeAck), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_result_channel_closed() -> anyhow::Result<()> {
        let (handler, mut ack_rx, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(handler.confirmed_ack());

        let sent = ack_rx.recv().await.expect("ack should be sent");
        assert_eq!(sent, Action::ExactlyOnceAck(test_id(1)));

        // The ack was sent, but no outcome will ever be reported.
        drop(result_tx);
        let err = pending.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");

        Ok(())
    }

    #[test]
    fn exactly_once_nack() -> anyhow::Result<()> {
        let (handler, mut ack_rx, _result_tx) = new_exactly_once();

        drop(handler);
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }
}