Skip to main content

google_cloud_pubsub/subscriber/
handler.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handlers for acknowledging or rejecting messages.
16//!
17//! To acknowledge (ack) a message, you call [`Handler::ack()`].
18//!
19//! To reject (nack) a message, you call [`Handler::nack()`]. The
20//! message will be redelivered.
21//!
22//! # Example
23//!
24//! ```
25//! use google_cloud_pubsub::model::Message;
26//! # use google_cloud_pubsub::subscriber::handler::Handler;
27//! fn on_message(m: Message, h: Handler) {
28//!   match process(m) {
29//!     Ok(_) => h.ack(),
30//!     Err(e) => {
31//!         println!("failed to process message: {e:?}");
32//!         h.nack();
33//!     }
34//!   }
35//! }
36//!
37//! fn process(m: Message) -> anyhow::Result<()> {
38//!   // some business logic here...
39//!   # panic!()
40//! }
41//! ```
42
43use crate::error::AckError;
44use crate::subscriber::lease_state::NACK_SHUTDOWN_ERROR;
45use tokio::sync::mpsc::UnboundedSender;
46use tokio::sync::oneshot::Receiver;
47
48/// The action an application does with a message.
49#[derive(Debug, PartialEq)]
50pub(super) enum Action {
51    Ack(String),
52    Nack(String),
53    ExactlyOnceAck(String),
54    ExactlyOnceNack(String),
55}
56
57/// A handler for acknowledging or rejecting messages.
58///
59/// # Example
60///
61/// ```
62/// use google_cloud_pubsub::model::Message;
63/// # use google_cloud_pubsub::subscriber::handler::Handler;
64/// fn on_message(m: Message, h: Handler) {
65///   match process(m) {
66///     Ok(_) => h.ack(),
67///     Err(e) => {
68///         println!("failed to process message: {e:?}");
69///         h.nack();
70///     }
71///   }
72/// }
73///
74/// fn process(m: Message) -> anyhow::Result<()> {
75///   // some business logic here...
76///   # panic!()
77/// }
78/// ```
79///
80/// To acknowledge (ack) a message, you call [`Handler::ack()`].
81///
82/// To reject (nack) a message, you call [`Handler::nack()`]. The
83/// service will redeliver the message.
84///
85/// ## Delivery attempts
86///
87/// If the subscription has a [dead-letter topic] configured, you can retrieve
88/// the delivery attempt count.
89///
90/// [dead-letter topic]: https://cloud.google.com/pubsub/docs/dead-letter-topics
91///
92/// ```
93/// # use google_cloud_pubsub::subscriber::handler::Handler;
94/// fn on_message(h: Handler) {
95///     match h.delivery_attempt() {
96///         Some(i) => println!("Delivery attempt: {i}"),
97///         None => println!("Delivery attempt: unknown"),
98///     }
99/// }
100/// ```
101///
102/// ## Exactly-once delivery
103///
104/// If your subscription has [exactly-once delivery] enabled, you should
105/// destructure this enum into its [`Handler::ExactlyOnce`] branch.
106///
107/// Only when `ExactlyOnce::confirmed_ack()` returns `Ok` can you be certain
108/// that the message will not be redelivered.
109///
110/// [exactly-once delivery]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
111///
112/// ```
113/// use google_cloud_pubsub::model::Message;
114/// # use google_cloud_pubsub::subscriber::handler::Handler;
115/// async fn on_message(m: Message, h: Handler) {
116///   let Handler::ExactlyOnce(h) = h else {
117///     panic!("Oops, my subscription does not have exactly-once delivery enabled.")
118///   };
119///   match h.confirmed_ack().await {
120///     Ok(()) => println!("Confirmed ack for message={m:?}. The message will not be redelivered."),
121///     Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
122///   }
123/// }
124/// ```
125#[derive(Debug)]
126#[non_exhaustive]
127pub enum Handler {
128    /// A handler for at-least-once delivery.
129    ///
130    /// The handler type is determined by the subscription configuration.
131    ///
132    /// ```
133    /// # use google_cloud_pubsub::subscriber::handler::{Handler, AtLeastOnce};
134    /// # fn on_message(h: Handler) {
135    /// if let Handler::AtLeastOnce(h) = h {
136    ///     h.ack();
137    /// }
138    /// # }
139    /// ```
140    AtLeastOnce(AtLeastOnce),
141    /// A handler for exactly-once delivery.
142    ///
143    /// The handler type is determined by the subscription configuration.
144    ///
145    /// ```
146    /// # use google_cloud_pubsub::subscriber::handler::{Handler, ExactlyOnce};
147    /// # async fn on_message(h: Handler) -> Result<(), Box<dyn std::error::Error>> {
148    /// if let Handler::ExactlyOnce(h) = h {
149    ///     h.confirmed_ack().await?;
150    /// }
151    /// # Ok(()) }
152    /// ```
153    ExactlyOnce(ExactlyOnce),
154}
155
156impl Handler {
157    /// Acknowledge the message associated with this handler.
158    ///
159    /// # Example
160    ///
161    /// ```
162    /// use google_cloud_pubsub::model::Message;
163    /// # use google_cloud_pubsub::subscriber::handler::Handler;
164    /// fn on_message(m: Message, h: Handler) {
165    ///   println!("Received message: {m:?}");
166    ///   h.ack();
167    /// }
168    /// ```
169    ///
170    /// Note that the acknowledgement is best effort. The message may still be
171    /// redelivered to this client, or another client, even if exactly-once
172    /// delivery is enabled on the subscription.
173    pub fn ack(self) {
174        match self {
175            Handler::AtLeastOnce(h) => h.ack(),
176            Handler::ExactlyOnce(h) => h.ack(),
177        }
178    }
179
180    /// Rejects the message associated with this handler.
181    ///
182    /// # Example
183    ///
184    /// ```
185    /// use google_cloud_pubsub::model::Message;
186    /// # use google_cloud_pubsub::subscriber::handler::Handler;
187    /// fn on_message(m: Message, h: Handler) {
188    ///   println!("Received message: {m:?}");
189    ///   h.nack();
190    /// }
191    /// ```
192    ///
193    /// The message will be removed from this `Subscriber`'s lease management.
194    /// The service will redeliver this message, possibly to another client.
195    pub fn nack(self) {
196        match self {
197            Handler::AtLeastOnce(h) => h.nack(),
198            Handler::ExactlyOnce(h) => h.nack(),
199        }
200    }
201
202    /// Returns the delivery attempt count for this message, if available.
203    ///
204    /// # Example
205    ///
206    /// ```
207    /// # use google_cloud_pubsub::subscriber::handler::Handler;
208    /// fn on_message(h: Handler) {
209    ///     match h.delivery_attempt() {
210    ///         Some(i) => println!("Delivery attempt: {i}"),
211    ///         None => println!("Delivery attempt: unknown"),
212    ///     }
213    /// }
214    /// ```
215    ///
216    /// This returns `None` if dead-letter topics are not configured on the
217    /// subscription.
218    pub fn delivery_attempt(&self) -> Option<i32> {
219        match self {
220            Handler::AtLeastOnce(h) => h.delivery_attempt(),
221            Handler::ExactlyOnce(h) => h.delivery_attempt(),
222        }
223    }
224}
225
226#[derive(Debug)]
227struct AtLeastOnceImpl {
228    ack_id: String,
229    ack_tx: UnboundedSender<Action>,
230    delivery_attempt: Option<i32>,
231}
232
233impl AtLeastOnceImpl {
234    fn ack(self) {
235        let _ = self.ack_tx.send(Action::Ack(self.ack_id));
236    }
237
238    fn nack(self) {
239        let _ = self.ack_tx.send(Action::Nack(self.ack_id));
240    }
241}
242
243/// A handler for at-least-once delivery.
244#[derive(Debug)]
245pub struct AtLeastOnce {
246    inner: Option<AtLeastOnceImpl>,
247}
248
249impl AtLeastOnce {
250    pub(super) fn new(
251        ack_id: String,
252        ack_tx: UnboundedSender<Action>,
253        delivery_attempt: Option<i32>,
254    ) -> Self {
255        Self {
256            inner: Some(AtLeastOnceImpl {
257                ack_id,
258                ack_tx,
259                delivery_attempt,
260            }),
261        }
262    }
263
264    /// Acknowledge the message associated with this handler.
265    ///
266    /// Note that the acknowledgement is best effort. The message may still be
267    /// redelivered to this client, or another client.
268    pub fn ack(mut self) {
269        if let Some(inner) = self.inner.take() {
270            inner.ack();
271        }
272    }
273
274    /// Rejects the message associated with this handler.
275    ///
276    /// # Example
277    ///
278    /// ```
279    /// use google_cloud_pubsub::model::Message;
280    /// # use google_cloud_pubsub::subscriber::handler::AtLeastOnce;
281    /// fn on_message(m: Message, h: AtLeastOnce) {
282    ///   println!("Received message: {m:?}");
283    ///   h.nack();
284    /// }
285    /// ```
286    ///
287    /// The message will be removed from this `Subscriber`'s lease management.
288    /// The service will redeliver this message, possibly to another client.
289    pub fn nack(mut self) {
290        if let Some(inner) = self.inner.take() {
291            inner.nack();
292        }
293    }
294
295    /// Returns the delivery attempt count for this message, if available.
296    ///
297    /// # Example
298    ///
299    /// ```
300    /// # use google_cloud_pubsub::subscriber::handler::AtLeastOnce;
301    /// fn on_message(h: AtLeastOnce) {
302    ///     match h.delivery_attempt() {
303    ///         Some(i) => println!("Delivery attempt: {i}"),
304    ///         None => println!("Delivery attempt: unknown"),
305    ///     }
306    /// }
307    /// ```
308    ///
309    /// This returns `None` if dead-letter topics are not configured on the
310    /// subscription.
311    pub fn delivery_attempt(&self) -> Option<i32> {
312        self.inner.as_ref().and_then(|i| i.delivery_attempt)
313    }
314}
315
316impl Drop for AtLeastOnce {
317    /// Rejects the message associated with this handler.
318    ///
319    /// The message will be removed from this `Subscriber`'s lease management.
320    /// The service will redeliver this message, possibly to another client.
321    fn drop(&mut self) {
322        if let Some(inner) = self.inner.take() {
323            inner.nack();
324        }
325    }
326}
327
328/// A handler for exactly-once delivery.
329#[derive(Debug)]
330pub struct ExactlyOnce {
331    inner: Option<ExactlyOnceImpl>,
332}
333
334impl ExactlyOnce {
335    pub(super) fn new(
336        ack_id: String,
337        ack_tx: UnboundedSender<Action>,
338        result_rx: Receiver<AckResult>,
339        delivery_attempt: Option<i32>,
340    ) -> Self {
341        Self {
342            inner: Some(ExactlyOnceImpl {
343                ack_id,
344                ack_tx,
345                result_rx,
346                delivery_attempt,
347            }),
348        }
349    }
350
351    /// Acknowledge the message associated with this handler.
352    ///
353    /// Note that the acknowledgement is best effort. The message may still be
354    /// redelivered to this client, or another client.
355    pub(crate) fn ack(mut self) {
356        if let Some(inner) = self.inner.take() {
357            inner.ack();
358        }
359    }
360
361    pub(crate) fn nack(mut self) {
362        if let Some(inner) = self.inner.take() {
363            inner.nack();
364        }
365    }
366
367    /// Strongly acknowledge the message associated with this handler.
368    ///
369    /// ```
370    /// use google_cloud_pubsub::model::Message;
371    /// # use google_cloud_pubsub::subscriber::handler::ExactlyOnce;
372    /// async fn on_message(m: Message, h: ExactlyOnce) {
373    ///   match h.confirmed_ack().await {
374    ///     Ok(()) => println!("Confirmed ack for message={m:?}. The message will not be redelivered."),
375    ///     Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
376    ///   }
377    /// }
378    /// ```
379    ///
380    /// If the result is an `Ok`, the message is guaranteed not to be delivered
381    /// again.
382    ///
383    /// If the result is an `Err`, the message may be redelivered, but this is
384    /// not guaranteed. If no redelivery occurs a sufficient interval after an
385    /// error, the acknowledgement likely succeeded.
386    pub async fn confirmed_ack(mut self) -> std::result::Result<(), AckError> {
387        let inner = self.inner.take().expect("handler impl is always some");
388        inner.confirmed_ack().await
389    }
390
391    /// Rejects the message associated with this handler and waits for
392    /// confirmation.
393    ///
394    /// ```
395    /// use google_cloud_pubsub::model::Message;
396    /// # use google_cloud_pubsub::subscriber::handler::ExactlyOnce;
397    /// async fn on_message(m: Message, h: ExactlyOnce) {
398    ///   match h.confirmed_nack().await {
399    ///     Ok(()) => println!("Confirmed nack for message={m:?}. The message will be redelivered."),
400    ///     Err(e) => println!("Failed to confirm nack for message={m:?} with error={e:?}"),
401    ///   }
402    /// }
403    /// ```
404    ///
405    /// If the result is an `Ok`, the message is guaranteed to be immediately
406    /// considered for redelivery. If an error occurs, the message will still
407    /// be redelivered, but it may be held for the remainder of its
408    /// `max_lease_extension`.
409    pub async fn confirmed_nack(mut self) -> std::result::Result<(), AckError> {
410        let inner = self.inner.take().expect("handler impl is always some");
411        inner.confirmed_nack().await
412    }
413
414    /// Returns the delivery attempt count for this message, if available.
415    ///
416    /// # Example
417    ///
418    /// ```
419    /// # use google_cloud_pubsub::subscriber::handler::ExactlyOnce;
420    /// fn on_message(h: ExactlyOnce) {
421    ///     match h.delivery_attempt() {
422    ///         Some(i) => println!("Delivery attempt: {i}"),
423    ///         None => println!("Delivery attempt: unknown"),
424    ///     }
425    /// }
426    /// ```
427    ///
428    /// This returns `None` if dead-letter topics are not configured on the
429    /// subscription.
430    pub fn delivery_attempt(&self) -> Option<i32> {
431        self.inner.as_ref().and_then(|i| i.delivery_attempt)
432    }
433}
434
435impl Drop for ExactlyOnce {
436    /// Rejects the message associated with this handler.
437    ///
438    /// The message will be removed from this `Subscriber`'s lease management.
439    /// The service will redeliver this message, possibly to another client.
440    fn drop(&mut self) {
441        if let Some(inner) = self.inner.take() {
442            inner.nack();
443        }
444    }
445}
446
447#[derive(Debug)]
448struct ExactlyOnceImpl {
449    pub(super) ack_id: String,
450    pub(super) ack_tx: UnboundedSender<Action>,
451    pub(super) result_rx: Receiver<AckResult>,
452    pub(super) delivery_attempt: Option<i32>,
453}
454
455impl ExactlyOnceImpl {
456    pub fn ack(self) {
457        let _ = self.ack_tx.send(Action::ExactlyOnceAck(self.ack_id));
458    }
459
460    pub fn nack(self) {
461        let _ = self.ack_tx.send(Action::ExactlyOnceNack(self.ack_id));
462    }
463
464    pub async fn confirmed_ack(self) -> AckResult {
465        self.ack_tx
466            .send(Action::ExactlyOnceAck(self.ack_id))
467            .map_err(|_| AckError::ShutdownBeforeAck)?;
468        self.result_rx
469            .await
470            .map_err(|e| AckError::Shutdown(e.into()))?
471    }
472
473    pub async fn confirmed_nack(self) -> AckResult {
474        self.ack_tx
475            .send(Action::ExactlyOnceNack(self.ack_id))
476            .map_err(|_| AckError::Shutdown(NACK_SHUTDOWN_ERROR.into()))?;
477        self.result_rx
478            .await
479            .map_err(|e| AckError::Shutdown(e.into()))?
480    }
481}
482
483/// The result of a confirmed acknowledgement.
484pub(super) type AckResult = std::result::Result<(), AckError>;
485
486#[cfg(test)]
487mod tests {
488    use std::error::Error;
489
490    use super::super::lease_state::tests::test_id;
491    use super::*;
492    use tokio::sync::mpsc::error::TryRecvError;
493    use tokio::sync::mpsc::unbounded_channel;
494    use tokio::sync::oneshot::channel;
495
496    impl Handler {
497        pub(crate) fn ack_id(&self) -> &str {
498            match self {
499                Handler::AtLeastOnce(h) => h.ack_id(),
500                Handler::ExactlyOnce(h) => h.ack_id(),
501            }
502        }
503    }
504
505    impl AtLeastOnce {
506        pub(crate) fn ack_id(&self) -> &str {
507            self.inner
508                .as_ref()
509                .map(|i| i.ack_id.as_str())
510                .unwrap_or_default()
511        }
512    }
513
514    impl ExactlyOnce {
515        pub(crate) fn ack_id(&self) -> &str {
516            self.inner
517                .as_ref()
518                .map(|i| i.ack_id.as_str())
519                .unwrap_or_default()
520        }
521    }
522
523    #[test]
524    fn handler_delivery_attempt() -> anyhow::Result<()> {
525        let (ack_tx, _) = unbounded_channel();
526        let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, Some(5)));
527        assert_eq!(h.delivery_attempt(), Some(5));
528        Ok(())
529    }
530
531    #[test]
532    fn at_least_once_delivery_attempt() -> anyhow::Result<()> {
533        let (ack_tx, _) = unbounded_channel();
534        let h = AtLeastOnce::new(test_id(1), ack_tx, Some(5));
535        assert_eq!(h.delivery_attempt(), Some(5));
536        Ok(())
537    }
538
539    #[test]
540    fn at_least_once_delivery_attempt_none() -> anyhow::Result<()> {
541        let (ack_tx, _) = unbounded_channel();
542        let h = AtLeastOnce::new(test_id(1), ack_tx, None);
543        assert_eq!(h.delivery_attempt(), None);
544        Ok(())
545    }
546
547    #[test]
548    fn exactly_once_delivery_attempt() -> anyhow::Result<()> {
549        let (ack_tx, _) = unbounded_channel();
550        let (_result_tx, result_rx) = channel();
551        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, Some(5));
552        assert_eq!(h.delivery_attempt(), Some(5));
553        Ok(())
554    }
555
556    #[test]
557    fn exactly_once_delivery_attempt_none() -> anyhow::Result<()> {
558        let (ack_tx, _) = unbounded_channel();
559        let (_result_tx, result_rx) = channel();
560        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
561        assert_eq!(h.delivery_attempt(), None);
562        Ok(())
563    }
564
565    #[test]
566    fn handler_at_least_once_ack() -> anyhow::Result<()> {
567        let (ack_tx, mut ack_rx) = unbounded_channel();
568        let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, None));
569        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
570
571        h.ack();
572        let ack = ack_rx.try_recv()?;
573        assert_eq!(ack, Action::Ack(test_id(1)));
574
575        Ok(())
576    }
577
578    #[test]
579    fn handler_at_least_once_nack() -> anyhow::Result<()> {
580        let (ack_tx, mut ack_rx) = unbounded_channel();
581        let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, None));
582        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
583
584        h.nack();
585        let ack = ack_rx.try_recv()?;
586        assert_eq!(ack, Action::Nack(test_id(1)));
587
588        Ok(())
589    }
590
591    #[test]
592    fn handler_exactly_once_ack() -> anyhow::Result<()> {
593        let (ack_tx, mut ack_rx) = unbounded_channel();
594        let (_result_tx, result_rx) = channel();
595        let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx, None));
596        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
597
598        h.ack();
599        let ack = ack_rx.try_recv()?;
600        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
601
602        Ok(())
603    }
604
605    #[test]
606    fn handler_exactly_once_nack() -> anyhow::Result<()> {
607        let (ack_tx, mut ack_rx) = unbounded_channel();
608        let (_result_tx, result_rx) = channel();
609        let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx, None));
610        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
611
612        h.nack();
613        let ack = ack_rx.try_recv()?;
614        assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
615
616        Ok(())
617    }
618
619    #[test]
620    fn at_least_once_ack() -> anyhow::Result<()> {
621        let (ack_tx, mut ack_rx) = unbounded_channel();
622        let h = AtLeastOnce::new(test_id(1), ack_tx, None);
623        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
624
625        h.ack();
626        let ack = ack_rx.try_recv()?;
627        assert_eq!(ack, Action::Ack(test_id(1)));
628
629        Ok(())
630    }
631
632    #[test]
633    fn at_least_once_nack() -> anyhow::Result<()> {
634        let (ack_tx, mut ack_rx) = unbounded_channel();
635        let h = AtLeastOnce::new(test_id(1), ack_tx, None);
636        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
637
638        h.nack();
639        let ack = ack_rx.try_recv()?;
640        assert_eq!(ack, Action::Nack(test_id(1)));
641
642        Ok(())
643    }
644
645    #[test]
646    fn exactly_once_ack() -> anyhow::Result<()> {
647        let (ack_tx, mut ack_rx) = unbounded_channel();
648        let (_result_tx, result_rx) = channel();
649        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
650        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
651
652        h.ack();
653        let ack = ack_rx.try_recv()?;
654        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
655
656        Ok(())
657    }
658
659    #[tokio::test]
660    async fn exactly_once_success() -> anyhow::Result<()> {
661        let (ack_tx, mut ack_rx) = unbounded_channel();
662        let (result_tx, result_rx) = channel();
663        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
664        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
665
666        let task = tokio::task::spawn(async move { h.confirmed_ack().await });
667
668        let ack = ack_rx.recv().await.expect("ack should be sent");
669        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
670
671        result_tx
672            .send(Ok(()))
673            .expect("sending on a channel succeeds");
674        task.await??;
675
676        Ok(())
677    }
678
679    #[tokio::test]
680    async fn exactly_once_nack_success() -> anyhow::Result<()> {
681        let (ack_tx, mut ack_rx) = unbounded_channel();
682        let (result_tx, result_rx) = channel();
683        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
684        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
685
686        let task = tokio::task::spawn(async move { h.confirmed_nack().await });
687
688        let nack = ack_rx.recv().await.expect("ack should be sent");
689        assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));
690
691        result_tx
692            .send(Ok(()))
693            .expect("sending on a channel succeeds");
694        task.await??;
695
696        Ok(())
697    }
698
699    #[tokio::test]
700    async fn exactly_once_error() -> anyhow::Result<()> {
701        let (ack_tx, mut ack_rx) = unbounded_channel();
702        let (result_tx, result_rx) = channel();
703        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
704        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
705
706        let task = tokio::task::spawn(async move { h.confirmed_ack().await });
707
708        let ack = ack_rx.recv().await.expect("ack should be sent");
709        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
710
711        result_tx
712            .send(Err(AckError::LeaseExpired))
713            .expect("sending on a channel succeeds");
714        let err = task.await?.expect_err("ack should fail");
715        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");
716
717        Ok(())
718    }
719
720    #[tokio::test]
721    async fn exactly_once_nack_error() -> anyhow::Result<()> {
722        let (ack_tx, mut ack_rx) = unbounded_channel();
723        let (result_tx, result_rx) = channel();
724        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
725        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
726
727        let task = tokio::task::spawn(async move { h.confirmed_nack().await });
728
729        let nack = ack_rx.recv().await.expect("ack should be sent");
730        assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));
731
732        result_tx
733            .send(Err(AckError::LeaseExpired))
734            .expect("sending on a channel succeeds");
735        let err = task.await?.expect_err("ack should fail");
736        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");
737
738        Ok(())
739    }
740
741    #[tokio::test]
742    async fn exactly_once_action_channel_closed() -> anyhow::Result<()> {
743        let (ack_tx, mut ack_rx) = unbounded_channel();
744        let (_result_tx, result_rx) = channel();
745        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
746        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
747        drop(ack_rx);
748
749        let err = h.confirmed_ack().await.expect_err("ack should fail");
750        assert!(matches!(err, AckError::ShutdownBeforeAck), "{err:?}");
751
752        Ok(())
753    }
754
755    #[tokio::test]
756    async fn exactly_once_nack_action_channel_closed() -> anyhow::Result<()> {
757        let (ack_tx, mut ack_rx) = unbounded_channel();
758        let (_result_tx, result_rx) = channel();
759        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
760        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
761        drop(ack_rx);
762
763        let err = h.confirmed_nack().await.expect_err("nack should fail");
764        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
765        assert_eq!(
766            err.source()
767                .expect("shutdown errors have a source")
768                .to_string(),
769            NACK_SHUTDOWN_ERROR
770        );
771
772        Ok(())
773    }
774
775    #[tokio::test]
776    async fn exactly_once_result_channel_closed() -> anyhow::Result<()> {
777        let (ack_tx, mut ack_rx) = unbounded_channel();
778        let (result_tx, result_rx) = channel();
779        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
780        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
781
782        let task = tokio::task::spawn(async move { h.confirmed_ack().await });
783
784        let ack = ack_rx.recv().await.expect("ack should be sent");
785        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
786
787        drop(result_tx);
788        let err = task.await?.expect_err("ack should fail");
789        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
790
791        Ok(())
792    }
793
794    #[test]
795    fn exactly_once_nack() -> anyhow::Result<()> {
796        let (ack_tx, mut ack_rx) = unbounded_channel();
797        let (_result_tx, result_rx) = channel();
798        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
799        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
800
801        h.nack();
802        let ack = ack_rx.try_recv()?;
803        assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
804
805        Ok(())
806    }
807
808    #[test]
809    fn handler_at_least_once_nack_on_drop() -> anyhow::Result<()> {
810        let (ack_tx, mut ack_rx) = unbounded_channel();
811        let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, None));
812        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
813
814        drop(h);
815        let ack = ack_rx.try_recv()?;
816        assert_eq!(ack, Action::Nack(test_id(1)));
817
818        Ok(())
819    }
820
821    #[test]
822    fn handler_exactly_once_nack_on_drop() -> anyhow::Result<()> {
823        let (ack_tx, mut ack_rx) = unbounded_channel();
824        let (_result_tx, result_rx) = channel();
825        let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx, None));
826        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
827
828        drop(h);
829        let ack = ack_rx.try_recv()?;
830        assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
831
832        Ok(())
833    }
834
835    #[test]
836    fn at_least_once_nack_on_drop() -> anyhow::Result<()> {
837        let (ack_tx, mut ack_rx) = unbounded_channel();
838        let h = AtLeastOnce::new(test_id(1), ack_tx, None);
839        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
840
841        drop(h);
842        let ack = ack_rx.try_recv()?;
843        assert_eq!(ack, Action::Nack(test_id(1)));
844
845        Ok(())
846    }
847
848    #[test]
849    fn exactly_once_nack_on_drop() -> anyhow::Result<()> {
850        let (ack_tx, mut ack_rx) = unbounded_channel();
851        let (_result_tx, result_rx) = channel();
852        let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
853        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
854
855        drop(h);
856        let ack = ack_rx.try_recv()?;
857        assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
858
859        Ok(())
860    }
861}