1use crate::error::AckError;
44use crate::subscriber::lease_state::NACK_SHUTDOWN_ERROR;
45use tokio::sync::mpsc::UnboundedSender;
46use tokio::sync::oneshot::Receiver;
47
48#[derive(Debug, PartialEq)]
50pub(super) enum Action {
51 Ack(String),
52 Nack(String),
53 ExactlyOnceAck(String),
54 ExactlyOnceNack(String),
55}
56
57#[derive(Debug)]
126#[non_exhaustive]
127pub enum Handler {
128 AtLeastOnce(AtLeastOnce),
141 ExactlyOnce(ExactlyOnce),
154}
155
156impl Handler {
157 pub fn ack(self) {
174 match self {
175 Handler::AtLeastOnce(h) => h.ack(),
176 Handler::ExactlyOnce(h) => h.ack(),
177 }
178 }
179
180 pub fn nack(self) {
196 match self {
197 Handler::AtLeastOnce(h) => h.nack(),
198 Handler::ExactlyOnce(h) => h.nack(),
199 }
200 }
201
202 pub fn delivery_attempt(&self) -> Option<i32> {
219 match self {
220 Handler::AtLeastOnce(h) => h.delivery_attempt(),
221 Handler::ExactlyOnce(h) => h.delivery_attempt(),
222 }
223 }
224}
225
226#[derive(Debug)]
227struct AtLeastOnceImpl {
228 ack_id: String,
229 ack_tx: UnboundedSender<Action>,
230 delivery_attempt: Option<i32>,
231}
232
233impl AtLeastOnceImpl {
234 fn ack(self) {
235 let _ = self.ack_tx.send(Action::Ack(self.ack_id));
236 }
237
238 fn nack(self) {
239 let _ = self.ack_tx.send(Action::Nack(self.ack_id));
240 }
241}
242
243#[derive(Debug)]
245pub struct AtLeastOnce {
246 inner: Option<AtLeastOnceImpl>,
247}
248
249impl AtLeastOnce {
250 pub(super) fn new(
251 ack_id: String,
252 ack_tx: UnboundedSender<Action>,
253 delivery_attempt: Option<i32>,
254 ) -> Self {
255 Self {
256 inner: Some(AtLeastOnceImpl {
257 ack_id,
258 ack_tx,
259 delivery_attempt,
260 }),
261 }
262 }
263
264 pub fn ack(mut self) {
269 if let Some(inner) = self.inner.take() {
270 inner.ack();
271 }
272 }
273
274 pub fn nack(mut self) {
290 if let Some(inner) = self.inner.take() {
291 inner.nack();
292 }
293 }
294
295 pub fn delivery_attempt(&self) -> Option<i32> {
312 self.inner.as_ref().and_then(|i| i.delivery_attempt)
313 }
314}
315
316impl Drop for AtLeastOnce {
317 fn drop(&mut self) {
322 if let Some(inner) = self.inner.take() {
323 inner.nack();
324 }
325 }
326}
327
328#[derive(Debug)]
330pub struct ExactlyOnce {
331 inner: Option<ExactlyOnceImpl>,
332}
333
334impl ExactlyOnce {
335 pub(super) fn new(
336 ack_id: String,
337 ack_tx: UnboundedSender<Action>,
338 result_rx: Receiver<AckResult>,
339 delivery_attempt: Option<i32>,
340 ) -> Self {
341 Self {
342 inner: Some(ExactlyOnceImpl {
343 ack_id,
344 ack_tx,
345 result_rx,
346 delivery_attempt,
347 }),
348 }
349 }
350
351 pub(crate) fn ack(mut self) {
356 if let Some(inner) = self.inner.take() {
357 inner.ack();
358 }
359 }
360
361 pub(crate) fn nack(mut self) {
362 if let Some(inner) = self.inner.take() {
363 inner.nack();
364 }
365 }
366
367 pub async fn confirmed_ack(mut self) -> std::result::Result<(), AckError> {
387 let inner = self.inner.take().expect("handler impl is always some");
388 inner.confirmed_ack().await
389 }
390
391 pub async fn confirmed_nack(mut self) -> std::result::Result<(), AckError> {
410 let inner = self.inner.take().expect("handler impl is always some");
411 inner.confirmed_nack().await
412 }
413
414 pub fn delivery_attempt(&self) -> Option<i32> {
431 self.inner.as_ref().and_then(|i| i.delivery_attempt)
432 }
433}
434
435impl Drop for ExactlyOnce {
436 fn drop(&mut self) {
441 if let Some(inner) = self.inner.take() {
442 inner.nack();
443 }
444 }
445}
446
447#[derive(Debug)]
448struct ExactlyOnceImpl {
449 pub(super) ack_id: String,
450 pub(super) ack_tx: UnboundedSender<Action>,
451 pub(super) result_rx: Receiver<AckResult>,
452 pub(super) delivery_attempt: Option<i32>,
453}
454
455impl ExactlyOnceImpl {
456 pub fn ack(self) {
457 let _ = self.ack_tx.send(Action::ExactlyOnceAck(self.ack_id));
458 }
459
460 pub fn nack(self) {
461 let _ = self.ack_tx.send(Action::ExactlyOnceNack(self.ack_id));
462 }
463
464 pub async fn confirmed_ack(self) -> AckResult {
465 self.ack_tx
466 .send(Action::ExactlyOnceAck(self.ack_id))
467 .map_err(|_| AckError::ShutdownBeforeAck)?;
468 self.result_rx
469 .await
470 .map_err(|e| AckError::Shutdown(e.into()))?
471 }
472
473 pub async fn confirmed_nack(self) -> AckResult {
474 self.ack_tx
475 .send(Action::ExactlyOnceNack(self.ack_id))
476 .map_err(|_| AckError::Shutdown(NACK_SHUTDOWN_ERROR.into()))?;
477 self.result_rx
478 .await
479 .map_err(|e| AckError::Shutdown(e.into()))?
480 }
481}
482
483pub(super) type AckResult = std::result::Result<(), AckError>;
485
486#[cfg(test)]
487mod tests {
488 use std::error::Error;
489
490 use super::super::lease_state::tests::test_id;
491 use super::*;
492 use tokio::sync::mpsc::error::TryRecvError;
493 use tokio::sync::mpsc::unbounded_channel;
494 use tokio::sync::oneshot::channel;
495
496 impl Handler {
497 pub(crate) fn ack_id(&self) -> &str {
498 match self {
499 Handler::AtLeastOnce(h) => h.ack_id(),
500 Handler::ExactlyOnce(h) => h.ack_id(),
501 }
502 }
503 }
504
505 impl AtLeastOnce {
506 pub(crate) fn ack_id(&self) -> &str {
507 self.inner
508 .as_ref()
509 .map(|i| i.ack_id.as_str())
510 .unwrap_or_default()
511 }
512 }
513
514 impl ExactlyOnce {
515 pub(crate) fn ack_id(&self) -> &str {
516 self.inner
517 .as_ref()
518 .map(|i| i.ack_id.as_str())
519 .unwrap_or_default()
520 }
521 }
522
523 #[test]
524 fn handler_delivery_attempt() -> anyhow::Result<()> {
525 let (ack_tx, _) = unbounded_channel();
526 let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, Some(5)));
527 assert_eq!(h.delivery_attempt(), Some(5));
528 Ok(())
529 }
530
531 #[test]
532 fn at_least_once_delivery_attempt() -> anyhow::Result<()> {
533 let (ack_tx, _) = unbounded_channel();
534 let h = AtLeastOnce::new(test_id(1), ack_tx, Some(5));
535 assert_eq!(h.delivery_attempt(), Some(5));
536 Ok(())
537 }
538
539 #[test]
540 fn at_least_once_delivery_attempt_none() -> anyhow::Result<()> {
541 let (ack_tx, _) = unbounded_channel();
542 let h = AtLeastOnce::new(test_id(1), ack_tx, None);
543 assert_eq!(h.delivery_attempt(), None);
544 Ok(())
545 }
546
547 #[test]
548 fn exactly_once_delivery_attempt() -> anyhow::Result<()> {
549 let (ack_tx, _) = unbounded_channel();
550 let (_result_tx, result_rx) = channel();
551 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, Some(5));
552 assert_eq!(h.delivery_attempt(), Some(5));
553 Ok(())
554 }
555
556 #[test]
557 fn exactly_once_delivery_attempt_none() -> anyhow::Result<()> {
558 let (ack_tx, _) = unbounded_channel();
559 let (_result_tx, result_rx) = channel();
560 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
561 assert_eq!(h.delivery_attempt(), None);
562 Ok(())
563 }
564
565 #[test]
566 fn handler_at_least_once_ack() -> anyhow::Result<()> {
567 let (ack_tx, mut ack_rx) = unbounded_channel();
568 let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, None));
569 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
570
571 h.ack();
572 let ack = ack_rx.try_recv()?;
573 assert_eq!(ack, Action::Ack(test_id(1)));
574
575 Ok(())
576 }
577
578 #[test]
579 fn handler_at_least_once_nack() -> anyhow::Result<()> {
580 let (ack_tx, mut ack_rx) = unbounded_channel();
581 let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, None));
582 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
583
584 h.nack();
585 let ack = ack_rx.try_recv()?;
586 assert_eq!(ack, Action::Nack(test_id(1)));
587
588 Ok(())
589 }
590
591 #[test]
592 fn handler_exactly_once_ack() -> anyhow::Result<()> {
593 let (ack_tx, mut ack_rx) = unbounded_channel();
594 let (_result_tx, result_rx) = channel();
595 let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx, None));
596 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
597
598 h.ack();
599 let ack = ack_rx.try_recv()?;
600 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
601
602 Ok(())
603 }
604
605 #[test]
606 fn handler_exactly_once_nack() -> anyhow::Result<()> {
607 let (ack_tx, mut ack_rx) = unbounded_channel();
608 let (_result_tx, result_rx) = channel();
609 let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx, None));
610 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
611
612 h.nack();
613 let ack = ack_rx.try_recv()?;
614 assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
615
616 Ok(())
617 }
618
619 #[test]
620 fn at_least_once_ack() -> anyhow::Result<()> {
621 let (ack_tx, mut ack_rx) = unbounded_channel();
622 let h = AtLeastOnce::new(test_id(1), ack_tx, None);
623 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
624
625 h.ack();
626 let ack = ack_rx.try_recv()?;
627 assert_eq!(ack, Action::Ack(test_id(1)));
628
629 Ok(())
630 }
631
632 #[test]
633 fn at_least_once_nack() -> anyhow::Result<()> {
634 let (ack_tx, mut ack_rx) = unbounded_channel();
635 let h = AtLeastOnce::new(test_id(1), ack_tx, None);
636 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
637
638 h.nack();
639 let ack = ack_rx.try_recv()?;
640 assert_eq!(ack, Action::Nack(test_id(1)));
641
642 Ok(())
643 }
644
645 #[test]
646 fn exactly_once_ack() -> anyhow::Result<()> {
647 let (ack_tx, mut ack_rx) = unbounded_channel();
648 let (_result_tx, result_rx) = channel();
649 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
650 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
651
652 h.ack();
653 let ack = ack_rx.try_recv()?;
654 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
655
656 Ok(())
657 }
658
659 #[tokio::test]
660 async fn exactly_once_success() -> anyhow::Result<()> {
661 let (ack_tx, mut ack_rx) = unbounded_channel();
662 let (result_tx, result_rx) = channel();
663 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
664 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
665
666 let task = tokio::task::spawn(async move { h.confirmed_ack().await });
667
668 let ack = ack_rx.recv().await.expect("ack should be sent");
669 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
670
671 result_tx
672 .send(Ok(()))
673 .expect("sending on a channel succeeds");
674 task.await??;
675
676 Ok(())
677 }
678
679 #[tokio::test]
680 async fn exactly_once_nack_success() -> anyhow::Result<()> {
681 let (ack_tx, mut ack_rx) = unbounded_channel();
682 let (result_tx, result_rx) = channel();
683 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
684 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
685
686 let task = tokio::task::spawn(async move { h.confirmed_nack().await });
687
688 let nack = ack_rx.recv().await.expect("ack should be sent");
689 assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));
690
691 result_tx
692 .send(Ok(()))
693 .expect("sending on a channel succeeds");
694 task.await??;
695
696 Ok(())
697 }
698
699 #[tokio::test]
700 async fn exactly_once_error() -> anyhow::Result<()> {
701 let (ack_tx, mut ack_rx) = unbounded_channel();
702 let (result_tx, result_rx) = channel();
703 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
704 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
705
706 let task = tokio::task::spawn(async move { h.confirmed_ack().await });
707
708 let ack = ack_rx.recv().await.expect("ack should be sent");
709 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
710
711 result_tx
712 .send(Err(AckError::LeaseExpired))
713 .expect("sending on a channel succeeds");
714 let err = task.await?.expect_err("ack should fail");
715 assert!(matches!(err, AckError::LeaseExpired), "{err:?}");
716
717 Ok(())
718 }
719
720 #[tokio::test]
721 async fn exactly_once_nack_error() -> anyhow::Result<()> {
722 let (ack_tx, mut ack_rx) = unbounded_channel();
723 let (result_tx, result_rx) = channel();
724 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
725 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
726
727 let task = tokio::task::spawn(async move { h.confirmed_nack().await });
728
729 let nack = ack_rx.recv().await.expect("ack should be sent");
730 assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));
731
732 result_tx
733 .send(Err(AckError::LeaseExpired))
734 .expect("sending on a channel succeeds");
735 let err = task.await?.expect_err("ack should fail");
736 assert!(matches!(err, AckError::LeaseExpired), "{err:?}");
737
738 Ok(())
739 }
740
741 #[tokio::test]
742 async fn exactly_once_action_channel_closed() -> anyhow::Result<()> {
743 let (ack_tx, mut ack_rx) = unbounded_channel();
744 let (_result_tx, result_rx) = channel();
745 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
746 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
747 drop(ack_rx);
748
749 let err = h.confirmed_ack().await.expect_err("ack should fail");
750 assert!(matches!(err, AckError::ShutdownBeforeAck), "{err:?}");
751
752 Ok(())
753 }
754
755 #[tokio::test]
756 async fn exactly_once_nack_action_channel_closed() -> anyhow::Result<()> {
757 let (ack_tx, mut ack_rx) = unbounded_channel();
758 let (_result_tx, result_rx) = channel();
759 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
760 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
761 drop(ack_rx);
762
763 let err = h.confirmed_nack().await.expect_err("nack should fail");
764 assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
765 assert_eq!(
766 err.source()
767 .expect("shutdown errors have a source")
768 .to_string(),
769 NACK_SHUTDOWN_ERROR
770 );
771
772 Ok(())
773 }
774
775 #[tokio::test]
776 async fn exactly_once_result_channel_closed() -> anyhow::Result<()> {
777 let (ack_tx, mut ack_rx) = unbounded_channel();
778 let (result_tx, result_rx) = channel();
779 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
780 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
781
782 let task = tokio::task::spawn(async move { h.confirmed_ack().await });
783
784 let ack = ack_rx.recv().await.expect("ack should be sent");
785 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
786
787 drop(result_tx);
788 let err = task.await?.expect_err("ack should fail");
789 assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
790
791 Ok(())
792 }
793
794 #[test]
795 fn exactly_once_nack() -> anyhow::Result<()> {
796 let (ack_tx, mut ack_rx) = unbounded_channel();
797 let (_result_tx, result_rx) = channel();
798 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
799 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
800
801 h.nack();
802 let ack = ack_rx.try_recv()?;
803 assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
804
805 Ok(())
806 }
807
808 #[test]
809 fn handler_at_least_once_nack_on_drop() -> anyhow::Result<()> {
810 let (ack_tx, mut ack_rx) = unbounded_channel();
811 let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx, None));
812 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
813
814 drop(h);
815 let ack = ack_rx.try_recv()?;
816 assert_eq!(ack, Action::Nack(test_id(1)));
817
818 Ok(())
819 }
820
821 #[test]
822 fn handler_exactly_once_nack_on_drop() -> anyhow::Result<()> {
823 let (ack_tx, mut ack_rx) = unbounded_channel();
824 let (_result_tx, result_rx) = channel();
825 let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx, None));
826 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
827
828 drop(h);
829 let ack = ack_rx.try_recv()?;
830 assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
831
832 Ok(())
833 }
834
835 #[test]
836 fn at_least_once_nack_on_drop() -> anyhow::Result<()> {
837 let (ack_tx, mut ack_rx) = unbounded_channel();
838 let h = AtLeastOnce::new(test_id(1), ack_tx, None);
839 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
840
841 drop(h);
842 let ack = ack_rx.try_recv()?;
843 assert_eq!(ack, Action::Nack(test_id(1)));
844
845 Ok(())
846 }
847
848 #[test]
849 fn exactly_once_nack_on_drop() -> anyhow::Result<()> {
850 let (ack_tx, mut ack_rx) = unbounded_channel();
851 let (_result_tx, result_rx) = channel();
852 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx, None);
853 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
854
855 drop(h);
856 let ack = ack_rx.try_recv()?;
857 assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
858
859 Ok(())
860 }
861}