// Source: google_cloud_pubsub/subscriber/handler.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handlers for acknowledging or rejecting messages.
16//!
17//! To acknowledge (ack) a message, you call [`Handler::ack()`].
18//!
19//! To reject (nack) a message, you call [`Handler::nack()`]. The
20//! message will be redelivered.
21//!
22//! # Example
23//!
24//! ```
25//! use google_cloud_pubsub::model::Message;
26//! # use google_cloud_pubsub::subscriber::handler::Handler;
27//! fn on_message(m: Message, h: Handler) {
28//!   match process(m) {
29//!     Ok(_) => h.ack(),
30//!     Err(e) => {
31//!         println!("failed to process message: {e:?}");
32//!         h.nack();
33//!     }
34//!   }
35//! }
36//!
37//! fn process(m: Message) -> anyhow::Result<()> {
38//!   // some business logic here...
39//!   # panic!()
40//! }
41//! ```
42
43use crate::error::AckError;
44use crate::subscriber::lease_state::NACK_SHUTDOWN_ERROR;
45use tokio::sync::mpsc::UnboundedSender;
46use tokio::sync::oneshot::Receiver;
47
48/// The action an application does with a message.
49#[derive(Debug, PartialEq)]
50pub(super) enum Action {
51    Ack(String),
52    Nack(String),
53    ExactlyOnceAck(String),
54    ExactlyOnceNack(String),
55}
56
57/// A handler for acknowledging or rejecting messages.
58///
59/// # Example
60///
61/// ```
62/// use google_cloud_pubsub::model::Message;
63/// # use google_cloud_pubsub::subscriber::handler::Handler;
64/// fn on_message(m: Message, h: Handler) {
65///   match process(m) {
66///     Ok(_) => h.ack(),
67///     Err(e) => {
68///         println!("failed to process message: {e:?}");
69///         h.nack();
70///     }
71///   }
72/// }
73///
74/// fn process(m: Message) -> anyhow::Result<()> {
75///   // some business logic here...
76///   # panic!()
77/// }
78/// ```
79///
80/// To acknowledge (ack) a message, you call [`Handler::ack()`].
81///
82/// To reject (nack) a message, you call [`Handler::nack()`]. The
83/// service will redeliver the message.
84///
85/// ## Exactly-once delivery
86///
87/// If your subscription has [exactly-once delivery] enabled, you should
88/// destructure this enum into its [`Handler::ExactlyOnce`] branch.
89///
90/// Only when `ExactlyOnce::confirmed_ack()` returns `Ok` can you be certain
91/// that the message will not be redelivered.
92///
93/// [exactly-once delivery]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
94///
95/// ```
96/// use google_cloud_pubsub::model::Message;
97/// # use google_cloud_pubsub::subscriber::handler::Handler;
98/// async fn on_message(m: Message, h: Handler) {
99///   let Handler::ExactlyOnce(h) = h else {
100///     panic!("Oops, my subscription does not have exactly-once delivery enabled.")
101///   };
102///   match h.confirmed_ack().await {
103///     Ok(()) => println!("Confirmed ack for message={m:?}. The message will not be redelivered."),
104///     Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
105///   }
106/// }
107/// ```
108#[derive(Debug)]
109#[non_exhaustive]
110pub enum Handler {
111    AtLeastOnce(AtLeastOnce),
112    ExactlyOnce(ExactlyOnce),
113}
114
115impl Handler {
116    /// Acknowledge the message associated with this handler.
117    ///
118    /// # Example
119    ///
120    /// ```
121    /// use google_cloud_pubsub::model::Message;
122    /// # use google_cloud_pubsub::subscriber::handler::Handler;
123    /// fn on_message(m: Message, h: Handler) {
124    ///   println!("Received message: {m:?}");
125    ///   h.ack();
126    /// }
127    /// ```
128    ///
129    /// Note that the acknowledgement is best effort. The message may still be
130    /// redelivered to this client, or another client, even if exactly-once
131    /// delivery is enabled on the subscription.
132    pub fn ack(self) {
133        match self {
134            Handler::AtLeastOnce(h) => h.ack(),
135            Handler::ExactlyOnce(h) => h.ack(),
136        }
137    }
138
139    /// Rejects the message associated with this handler.
140    ///
141    /// # Example
142    ///
143    /// ```
144    /// use google_cloud_pubsub::model::Message;
145    /// # use google_cloud_pubsub::subscriber::handler::Handler;
146    /// fn on_message(m: Message, h: Handler) {
147    ///   println!("Received message: {m:?}");
148    ///   h.nack();
149    /// }
150    /// ```
151    ///
152    /// The message will be removed from this `Subscriber`'s lease management.
153    /// The service will redeliver this message, possibly to another client.
154    pub fn nack(self) {
155        match self {
156            Handler::AtLeastOnce(h) => h.nack(),
157            Handler::ExactlyOnce(h) => h.nack(),
158        }
159    }
160
161    #[cfg(test)]
162    pub(crate) fn ack_id(&self) -> &str {
163        match self {
164            Handler::AtLeastOnce(h) => h.ack_id(),
165            Handler::ExactlyOnce(h) => h.ack_id(),
166        }
167    }
168}
169
170#[derive(Debug)]
171struct AtLeastOnceImpl {
172    ack_id: String,
173    ack_tx: UnboundedSender<Action>,
174}
175
176impl AtLeastOnceImpl {
177    fn ack(self) {
178        let _ = self.ack_tx.send(Action::Ack(self.ack_id));
179    }
180
181    fn nack(self) {
182        let _ = self.ack_tx.send(Action::Nack(self.ack_id));
183    }
184}
185
186/// A handler for at-least-once delivery.
187#[derive(Debug)]
188pub struct AtLeastOnce {
189    inner: Option<AtLeastOnceImpl>,
190}
191
192impl AtLeastOnce {
193    pub(super) fn new(ack_id: String, ack_tx: UnboundedSender<Action>) -> Self {
194        Self {
195            inner: Some(AtLeastOnceImpl { ack_id, ack_tx }),
196        }
197    }
198
199    /// Acknowledge the message associated with this handler.
200    ///
201    /// Note that the acknowledgement is best effort. The message may still be
202    /// redelivered to this client, or another client.
203    pub fn ack(mut self) {
204        if let Some(inner) = self.inner.take() {
205            inner.ack();
206        }
207    }
208
209    /// Rejects the message associated with this handler.
210    ///
211    /// # Example
212    ///
213    /// ```
214    /// use google_cloud_pubsub::model::Message;
215    /// # use google_cloud_pubsub::subscriber::handler::AtLeastOnce;
216    /// fn on_message(m: Message, h: AtLeastOnce) {
217    ///   println!("Received message: {m:?}");
218    ///   h.nack();
219    /// }
220    /// ```
221    ///
222    /// The message will be removed from this `Subscriber`'s lease management.
223    /// The service will redeliver this message, possibly to another client.
224    pub fn nack(mut self) {
225        if let Some(inner) = self.inner.take() {
226            inner.nack();
227        }
228    }
229
230    #[cfg(test)]
231    pub(crate) fn ack_id(&self) -> &str {
232        self.inner
233            .as_ref()
234            .map(|i| i.ack_id.as_str())
235            .unwrap_or_default()
236    }
237}
238
239impl Drop for AtLeastOnce {
240    /// Rejects the message associated with this handler.
241    ///
242    /// The message will be removed from this `Subscriber`'s lease management.
243    /// The service will redeliver this message, possibly to another client.
244    fn drop(&mut self) {
245        if let Some(inner) = self.inner.take() {
246            inner.nack();
247        }
248    }
249}
250
251/// A handler for exactly-once delivery.
252#[derive(Debug)]
253pub struct ExactlyOnce {
254    inner: Option<ExactlyOnceImpl>,
255}
256
257impl ExactlyOnce {
258    pub(super) fn new(
259        ack_id: String,
260        ack_tx: UnboundedSender<Action>,
261        result_rx: Receiver<AckResult>,
262    ) -> Self {
263        Self {
264            inner: Some(ExactlyOnceImpl {
265                ack_id,
266                ack_tx,
267                result_rx,
268            }),
269        }
270    }
271
272    /// Acknowledge the message associated with this handler.
273    ///
274    /// Note that the acknowledgement is best effort. The message may still be
275    /// redelivered to this client, or another client.
276    pub(crate) fn ack(mut self) {
277        if let Some(inner) = self.inner.take() {
278            inner.ack();
279        }
280    }
281
282    pub(crate) fn nack(mut self) {
283        if let Some(inner) = self.inner.take() {
284            inner.nack();
285        }
286    }
287
288    /// Strongly acknowledge the message associated with this handler.
289    ///
290    /// ```
291    /// use google_cloud_pubsub::model::Message;
292    /// # use google_cloud_pubsub::subscriber::handler::ExactlyOnce;
293    /// async fn on_message(m: Message, h: ExactlyOnce) {
294    ///   match h.confirmed_ack().await {
295    ///     Ok(()) => println!("Confirmed ack for message={m:?}. The message will not be redelivered."),
296    ///     Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
297    ///   }
298    /// }
299    /// ```
300    ///
301    /// If the result is an `Ok`, the message is guaranteed not to be delivered
302    /// again.
303    ///
304    /// If the result is an `Err`, the message may be redelivered, but this is
305    /// not guaranteed. If no redelivery occurs a sufficient interval after an
306    /// error, the acknowledgement likely succeeded.
307    pub async fn confirmed_ack(mut self) -> std::result::Result<(), AckError> {
308        let inner = self.inner.take().expect("handler impl is always some");
309        inner.confirmed_ack().await
310    }
311
312    /// Rejects the message associated with this handler and waits for
313    /// confirmation.
314    ///
315    /// ```
316    /// use google_cloud_pubsub::model::Message;
317    /// # use google_cloud_pubsub::subscriber::handler::ExactlyOnce;
318    /// async fn on_message(m: Message, h: ExactlyOnce) {
319    ///   match h.confirmed_nack().await {
320    ///     Ok(()) => println!("Confirmed nack for message={m:?}. The message will be redelivered."),
321    ///     Err(e) => println!("Failed to confirm nack for message={m:?} with error={e:?}"),
322    ///   }
323    /// }
324    /// ```
325    ///
326    /// If the result is an `Ok`, the message is guaranteed to be immediately
327    /// considered for redelivery. If an error occurs, the message will still
328    /// be redelivered, but it may be held for the remainder of its
329    /// `max_lease_extension`.
330    pub async fn confirmed_nack(mut self) -> std::result::Result<(), AckError> {
331        let inner = self.inner.take().expect("handler impl is always some");
332        inner.confirmed_nack().await
333    }
334
335    #[cfg(test)]
336    pub(crate) fn ack_id(&self) -> &str {
337        self.inner
338            .as_ref()
339            .map(|i| i.ack_id.as_str())
340            .unwrap_or_default()
341    }
342}
343
344impl Drop for ExactlyOnce {
345    /// Rejects the message associated with this handler.
346    ///
347    /// The message will be removed from this `Subscriber`'s lease management.
348    /// The service will redeliver this message, possibly to another client.
349    fn drop(&mut self) {
350        if let Some(inner) = self.inner.take() {
351            inner.nack();
352        }
353    }
354}
355
356#[derive(Debug)]
357struct ExactlyOnceImpl {
358    pub(super) ack_id: String,
359    pub(super) ack_tx: UnboundedSender<Action>,
360    pub(super) result_rx: Receiver<AckResult>,
361}
362
363impl ExactlyOnceImpl {
364    pub fn ack(self) {
365        let _ = self.ack_tx.send(Action::ExactlyOnceAck(self.ack_id));
366    }
367
368    pub fn nack(self) {
369        let _ = self.ack_tx.send(Action::ExactlyOnceNack(self.ack_id));
370    }
371
372    pub async fn confirmed_ack(self) -> AckResult {
373        self.ack_tx
374            .send(Action::ExactlyOnceAck(self.ack_id))
375            .map_err(|_| AckError::ShutdownBeforeAck)?;
376        self.result_rx
377            .await
378            .map_err(|e| AckError::Shutdown(e.into()))?
379    }
380
381    pub async fn confirmed_nack(self) -> AckResult {
382        self.ack_tx
383            .send(Action::ExactlyOnceNack(self.ack_id))
384            .map_err(|_| AckError::Shutdown(NACK_SHUTDOWN_ERROR.into()))?;
385        self.result_rx
386            .await
387            .map_err(|e| AckError::Shutdown(e.into()))?
388    }
389}
390
391/// The result of a confirmed acknowledgement.
392pub(super) type AckResult = std::result::Result<(), AckError>;
393
#[cfg(test)]
mod tests {
    use std::error::Error;

    use super::super::lease_state::tests::test_id;
    use super::*;
    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::error::TryRecvError;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot::Sender;
    use tokio::sync::oneshot::channel;

    /// Builds an at-least-once handler for `test_id(1)` and returns it with
    /// the receiving end of its action channel, after checking that nothing
    /// has been sent yet.
    fn new_at_least_once() -> (AtLeastOnce, UnboundedReceiver<Action>) {
        let (ack_tx, mut ack_rx) = unbounded_channel();
        let handler = AtLeastOnce::new(test_id(1), ack_tx);
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
        (handler, ack_rx)
    }

    /// Builds an exactly-once handler for `test_id(1)` and returns it with
    /// the receiving end of its action channel and the sending end of its
    /// result channel, after checking that nothing has been sent yet.
    fn new_exactly_once() -> (ExactlyOnce, UnboundedReceiver<Action>, Sender<AckResult>) {
        let (ack_tx, mut ack_rx) = unbounded_channel();
        let (result_tx, result_rx) = channel();
        let handler = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
        (handler, ack_rx, result_tx)
    }

    #[test]
    fn handler_at_least_once_ack() -> anyhow::Result<()> {
        let (inner, mut actions) = new_at_least_once();

        Handler::AtLeastOnce(inner).ack();
        assert_eq!(actions.try_recv()?, Action::Ack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_at_least_once_nack() -> anyhow::Result<()> {
        let (inner, mut actions) = new_at_least_once();

        Handler::AtLeastOnce(inner).nack();
        assert_eq!(actions.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_ack() -> anyhow::Result<()> {
        let (inner, mut actions, _result_tx) = new_exactly_once();

        Handler::ExactlyOnce(inner).ack();
        assert_eq!(actions.try_recv()?, Action::ExactlyOnceAck(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_nack() -> anyhow::Result<()> {
        let (inner, mut actions, _result_tx) = new_exactly_once();

        Handler::ExactlyOnce(inner).nack();
        assert_eq!(actions.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_ack() -> anyhow::Result<()> {
        let (handler, mut actions) = new_at_least_once();

        handler.ack();
        assert_eq!(actions.try_recv()?, Action::Ack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_nack() -> anyhow::Result<()> {
        let (handler, mut actions) = new_at_least_once();

        handler.nack();
        assert_eq!(actions.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn exactly_once_ack() -> anyhow::Result<()> {
        let (handler, mut actions, _result_tx) = new_exactly_once();

        handler.ack();
        assert_eq!(actions.try_recv()?, Action::ExactlyOnceAck(test_id(1)));

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_success() -> anyhow::Result<()> {
        let (handler, mut actions, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(async move { handler.confirmed_ack().await });

        let action = actions.recv().await.expect("ack should be sent");
        assert_eq!(action, Action::ExactlyOnceAck(test_id(1)));

        result_tx
            .send(Ok(()))
            .expect("sending on a channel succeeds");
        pending.await??;

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_success() -> anyhow::Result<()> {
        let (handler, mut actions, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(async move { handler.confirmed_nack().await });

        let action = actions.recv().await.expect("ack should be sent");
        assert_eq!(action, Action::ExactlyOnceNack(test_id(1)));

        result_tx
            .send(Ok(()))
            .expect("sending on a channel succeeds");
        pending.await??;

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_error() -> anyhow::Result<()> {
        let (handler, mut actions, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(async move { handler.confirmed_ack().await });

        let action = actions.recv().await.expect("ack should be sent");
        assert_eq!(action, Action::ExactlyOnceAck(test_id(1)));

        result_tx
            .send(Err(AckError::LeaseExpired))
            .expect("sending on a channel succeeds");
        let err = pending.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_error() -> anyhow::Result<()> {
        let (handler, mut actions, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(async move { handler.confirmed_nack().await });

        let action = actions.recv().await.expect("ack should be sent");
        assert_eq!(action, Action::ExactlyOnceNack(test_id(1)));

        result_tx
            .send(Err(AckError::LeaseExpired))
            .expect("sending on a channel succeeds");
        let err = pending.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_action_channel_closed() -> anyhow::Result<()> {
        let (handler, actions, _result_tx) = new_exactly_once();
        // Closing the receiving end makes the handler's send fail.
        drop(actions);

        let err = handler.confirmed_ack().await.expect_err("ack should fail");
        assert!(matches!(err, AckError::ShutdownBeforeAck), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_action_channel_closed() -> anyhow::Result<()> {
        let (handler, actions, _result_tx) = new_exactly_once();
        // Closing the receiving end makes the handler's send fail.
        drop(actions);

        let err = handler
            .confirmed_nack()
            .await
            .expect_err("nack should fail");
        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
        let source = err.source().expect("shutdown errors have a source");
        assert_eq!(source.to_string(), NACK_SHUTDOWN_ERROR);

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_result_channel_closed() -> anyhow::Result<()> {
        let (handler, mut actions, result_tx) = new_exactly_once();

        let pending = tokio::task::spawn(async move { handler.confirmed_ack().await });

        let action = actions.recv().await.expect("ack should be sent");
        assert_eq!(action, Action::ExactlyOnceAck(test_id(1)));

        // Dropping the sender closes the result channel without an outcome.
        drop(result_tx);
        let err = pending.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");

        Ok(())
    }

    #[test]
    fn exactly_once_nack() -> anyhow::Result<()> {
        let (handler, mut actions, _result_tx) = new_exactly_once();

        handler.nack();
        assert_eq!(actions.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_at_least_once_nack_on_drop() -> anyhow::Result<()> {
        let (inner, mut actions) = new_at_least_once();

        drop(Handler::AtLeastOnce(inner));
        assert_eq!(actions.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_nack_on_drop() -> anyhow::Result<()> {
        let (inner, mut actions, _result_tx) = new_exactly_once();

        drop(Handler::ExactlyOnce(inner));
        assert_eq!(actions.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_nack_on_drop() -> anyhow::Result<()> {
        let (handler, mut actions) = new_at_least_once();

        drop(handler);
        assert_eq!(actions.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn exactly_once_nack_on_drop() -> anyhow::Result<()> {
        let (handler, mut actions, _result_tx) = new_exactly_once();

        drop(handler);
        assert_eq!(actions.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }
}