google_cloud_pubsub/subscriber/
handler.rs1use crate::error::AckError;
44use crate::subscriber::lease_state::NACK_SHUTDOWN_ERROR;
45use tokio::sync::mpsc::UnboundedSender;
46use tokio::sync::oneshot::Receiver;
47
48#[derive(Debug, PartialEq)]
50pub(super) enum Action {
51 Ack(String),
52 Nack(String),
53 ExactlyOnceAck(String),
54 ExactlyOnceNack(String),
55}
56
57#[derive(Debug)]
109#[non_exhaustive]
110pub enum Handler {
111 AtLeastOnce(AtLeastOnce),
112 ExactlyOnce(ExactlyOnce),
113}
114
115impl Handler {
116 pub fn ack(self) {
133 match self {
134 Handler::AtLeastOnce(h) => h.ack(),
135 Handler::ExactlyOnce(h) => h.ack(),
136 }
137 }
138
139 pub fn nack(self) {
155 match self {
156 Handler::AtLeastOnce(h) => h.nack(),
157 Handler::ExactlyOnce(h) => h.nack(),
158 }
159 }
160
161 #[cfg(test)]
162 pub(crate) fn ack_id(&self) -> &str {
163 match self {
164 Handler::AtLeastOnce(h) => h.ack_id(),
165 Handler::ExactlyOnce(h) => h.ack_id(),
166 }
167 }
168}
169
170#[derive(Debug)]
171struct AtLeastOnceImpl {
172 ack_id: String,
173 ack_tx: UnboundedSender<Action>,
174}
175
176impl AtLeastOnceImpl {
177 fn ack(self) {
178 let _ = self.ack_tx.send(Action::Ack(self.ack_id));
179 }
180
181 fn nack(self) {
182 let _ = self.ack_tx.send(Action::Nack(self.ack_id));
183 }
184}
185
186#[derive(Debug)]
188pub struct AtLeastOnce {
189 inner: Option<AtLeastOnceImpl>,
190}
191
192impl AtLeastOnce {
193 pub(super) fn new(ack_id: String, ack_tx: UnboundedSender<Action>) -> Self {
194 Self {
195 inner: Some(AtLeastOnceImpl { ack_id, ack_tx }),
196 }
197 }
198
199 pub fn ack(mut self) {
204 if let Some(inner) = self.inner.take() {
205 inner.ack();
206 }
207 }
208
209 pub fn nack(mut self) {
225 if let Some(inner) = self.inner.take() {
226 inner.nack();
227 }
228 }
229
230 #[cfg(test)]
231 pub(crate) fn ack_id(&self) -> &str {
232 self.inner
233 .as_ref()
234 .map(|i| i.ack_id.as_str())
235 .unwrap_or_default()
236 }
237}
238
239impl Drop for AtLeastOnce {
240 fn drop(&mut self) {
245 if let Some(inner) = self.inner.take() {
246 inner.nack();
247 }
248 }
249}
250
251#[derive(Debug)]
253pub struct ExactlyOnce {
254 inner: Option<ExactlyOnceImpl>,
255}
256
257impl ExactlyOnce {
258 pub(super) fn new(
259 ack_id: String,
260 ack_tx: UnboundedSender<Action>,
261 result_rx: Receiver<AckResult>,
262 ) -> Self {
263 Self {
264 inner: Some(ExactlyOnceImpl {
265 ack_id,
266 ack_tx,
267 result_rx,
268 }),
269 }
270 }
271
272 pub(crate) fn ack(mut self) {
277 if let Some(inner) = self.inner.take() {
278 inner.ack();
279 }
280 }
281
282 pub(crate) fn nack(mut self) {
283 if let Some(inner) = self.inner.take() {
284 inner.nack();
285 }
286 }
287
288 pub async fn confirmed_ack(mut self) -> std::result::Result<(), AckError> {
308 let inner = self.inner.take().expect("handler impl is always some");
309 inner.confirmed_ack().await
310 }
311
312 pub async fn confirmed_nack(mut self) -> std::result::Result<(), AckError> {
331 let inner = self.inner.take().expect("handler impl is always some");
332 inner.confirmed_nack().await
333 }
334
335 #[cfg(test)]
336 pub(crate) fn ack_id(&self) -> &str {
337 self.inner
338 .as_ref()
339 .map(|i| i.ack_id.as_str())
340 .unwrap_or_default()
341 }
342}
343
344impl Drop for ExactlyOnce {
345 fn drop(&mut self) {
350 if let Some(inner) = self.inner.take() {
351 inner.nack();
352 }
353 }
354}
355
356#[derive(Debug)]
357struct ExactlyOnceImpl {
358 pub(super) ack_id: String,
359 pub(super) ack_tx: UnboundedSender<Action>,
360 pub(super) result_rx: Receiver<AckResult>,
361}
362
363impl ExactlyOnceImpl {
364 pub fn ack(self) {
365 let _ = self.ack_tx.send(Action::ExactlyOnceAck(self.ack_id));
366 }
367
368 pub fn nack(self) {
369 let _ = self.ack_tx.send(Action::ExactlyOnceNack(self.ack_id));
370 }
371
372 pub async fn confirmed_ack(self) -> AckResult {
373 self.ack_tx
374 .send(Action::ExactlyOnceAck(self.ack_id))
375 .map_err(|_| AckError::ShutdownBeforeAck)?;
376 self.result_rx
377 .await
378 .map_err(|e| AckError::Shutdown(e.into()))?
379 }
380
381 pub async fn confirmed_nack(self) -> AckResult {
382 self.ack_tx
383 .send(Action::ExactlyOnceNack(self.ack_id))
384 .map_err(|_| AckError::Shutdown(NACK_SHUTDOWN_ERROR.into()))?;
385 self.result_rx
386 .await
387 .map_err(|e| AckError::Shutdown(e.into()))?
388 }
389}
390
391pub(super) type AckResult = std::result::Result<(), AckError>;
393
#[cfg(test)]
mod tests {
    //! Unit tests for the handlers.
    //!
    //! Every test uses the message `test_id(1)` and observes what the handler
    //! pushes onto the action channel.

    use std::error::Error;

    use super::super::lease_state::tests::test_id;
    use super::*;
    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::error::TryRecvError;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot::Sender;
    use tokio::sync::oneshot::channel;

    /// Builds an at-least-once handler for `test_id(1)` and returns it with
    /// the receiving end of its action channel.
    ///
    /// Also verifies that merely creating the handler queues nothing.
    fn new_at_least_once() -> (AtLeastOnce, UnboundedReceiver<Action>) {
        let (ack_tx, mut ack_rx) = unbounded_channel();
        let handler = AtLeastOnce::new(test_id(1), ack_tx);
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
        (handler, ack_rx)
    }

    /// Builds an exactly-once handler for `test_id(1)` and returns it with
    /// the receiving end of its action channel and the sending end of its
    /// result channel.
    ///
    /// Also verifies that merely creating the handler queues nothing.
    fn new_exactly_once() -> (
        ExactlyOnce,
        UnboundedReceiver<Action>,
        Sender<AckResult>,
    ) {
        let (ack_tx, mut ack_rx) = unbounded_channel();
        let (result_tx, result_rx) = channel();
        let handler = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
        (handler, ack_rx, result_tx)
    }

    #[test]
    fn handler_at_least_once_ack() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        Handler::AtLeastOnce(h).ack();
        assert_eq!(ack_rx.try_recv()?, Action::Ack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_at_least_once_nack() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        Handler::AtLeastOnce(h).nack();
        assert_eq!(ack_rx.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_ack() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        Handler::ExactlyOnce(h).ack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceAck(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_nack() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        Handler::ExactlyOnce(h).nack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_ack() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        h.ack();
        assert_eq!(ack_rx.try_recv()?, Action::Ack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_nack() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        h.nack();
        assert_eq!(ack_rx.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_ack_sends_nothing_on_drop() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        h.ack();
        assert_eq!(ack_rx.try_recv()?, Action::Ack(test_id(1)));
        // The handler and its sender are gone by now. If `Drop` had queued a
        // nack it would be received here instead of the disconnect.
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Disconnected));

        Ok(())
    }

    #[test]
    fn exactly_once_ack() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        h.ack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceAck(test_id(1)));

        Ok(())
    }

    #[test]
    fn exactly_once_ack_sends_nothing_on_drop() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        h.ack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceAck(test_id(1)));
        // The handler and its sender are gone by now. If `Drop` had queued a
        // nack it would be received here instead of the disconnect.
        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Disconnected));

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_success() -> anyhow::Result<()> {
        let (h, mut ack_rx, result_tx) = new_exactly_once();

        let task = tokio::task::spawn(async move { h.confirmed_ack().await });

        let ack = ack_rx.recv().await.expect("ack should be sent");
        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));

        result_tx
            .send(Ok(()))
            .expect("sending on a channel succeeds");
        task.await??;

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_success() -> anyhow::Result<()> {
        let (h, mut ack_rx, result_tx) = new_exactly_once();

        let task = tokio::task::spawn(async move { h.confirmed_nack().await });

        let nack = ack_rx.recv().await.expect("nack should be sent");
        assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));

        result_tx
            .send(Ok(()))
            .expect("sending on a channel succeeds");
        task.await??;

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_error() -> anyhow::Result<()> {
        let (h, mut ack_rx, result_tx) = new_exactly_once();

        let task = tokio::task::spawn(async move { h.confirmed_ack().await });

        let ack = ack_rx.recv().await.expect("ack should be sent");
        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));

        result_tx
            .send(Err(AckError::LeaseExpired))
            .expect("sending on a channel succeeds");
        let err = task.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_error() -> anyhow::Result<()> {
        let (h, mut ack_rx, result_tx) = new_exactly_once();

        let task = tokio::task::spawn(async move { h.confirmed_nack().await });

        let nack = ack_rx.recv().await.expect("nack should be sent");
        assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));

        result_tx
            .send(Err(AckError::LeaseExpired))
            .expect("sending on a channel succeeds");
        let err = task.await?.expect_err("nack should fail");
        assert!(matches!(err, AckError::LeaseExpired), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_action_channel_closed() -> anyhow::Result<()> {
        let (h, ack_rx, _result_tx) = new_exactly_once();
        drop(ack_rx);

        let err = h.confirmed_ack().await.expect_err("ack should fail");
        assert!(matches!(err, AckError::ShutdownBeforeAck), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_action_channel_closed() -> anyhow::Result<()> {
        let (h, ack_rx, _result_tx) = new_exactly_once();
        drop(ack_rx);

        let err = h.confirmed_nack().await.expect_err("nack should fail");
        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
        assert_eq!(
            err.source()
                .expect("shutdown errors have a source")
                .to_string(),
            NACK_SHUTDOWN_ERROR
        );

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_result_channel_closed() -> anyhow::Result<()> {
        let (h, mut ack_rx, result_tx) = new_exactly_once();

        let task = tokio::task::spawn(async move { h.confirmed_ack().await });

        let ack = ack_rx.recv().await.expect("ack should be sent");
        assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));

        drop(result_tx);
        let err = task.await?.expect_err("ack should fail");
        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");

        Ok(())
    }

    #[tokio::test]
    async fn exactly_once_nack_result_channel_closed() -> anyhow::Result<()> {
        let (h, mut ack_rx, result_tx) = new_exactly_once();

        let task = tokio::task::spawn(async move { h.confirmed_nack().await });

        let nack = ack_rx.recv().await.expect("nack should be sent");
        assert_eq!(nack, Action::ExactlyOnceNack(test_id(1)));

        drop(result_tx);
        let err = task.await?.expect_err("nack should fail");
        assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");

        Ok(())
    }

    #[test]
    fn exactly_once_nack() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        h.nack();
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_at_least_once_nack_on_drop() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        drop(Handler::AtLeastOnce(h));
        assert_eq!(ack_rx.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn handler_exactly_once_nack_on_drop() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        drop(Handler::ExactlyOnce(h));
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }

    #[test]
    fn at_least_once_nack_on_drop() -> anyhow::Result<()> {
        let (h, mut ack_rx) = new_at_least_once();

        drop(h);
        assert_eq!(ack_rx.try_recv()?, Action::Nack(test_id(1)));

        Ok(())
    }

    #[test]
    fn exactly_once_nack_on_drop() -> anyhow::Result<()> {
        let (h, mut ack_rx, _result_tx) = new_exactly_once();

        drop(h);
        assert_eq!(ack_rx.try_recv()?, Action::ExactlyOnceNack(test_id(1)));

        Ok(())
    }
}