google_cloud_pubsub/subscriber/
handler.rs1use tokio::sync::mpsc::UnboundedSender;
16
17#[derive(Debug, PartialEq)]
19pub(super) enum AckResult {
20 Ack(String),
21 Nack(String),
22 }
24
25#[derive(Debug)]
27#[non_exhaustive]
28pub enum Handler {
29 AtLeastOnce(AtLeastOnce),
30 }
32
33impl Handler {
34 pub fn ack(self) {
39 match self {
40 Handler::AtLeastOnce(h) => h.ack(),
41 }
42 }
43
44 pub fn nack(self) {
49 match self {
50 Handler::AtLeastOnce(h) => h.nack(),
51 }
52 }
53}
54
55#[derive(Debug)]
57pub struct AtLeastOnce {
58 pub(super) ack_id: String,
59 pub(super) ack_tx: UnboundedSender<AckResult>,
60}
61
62impl AtLeastOnce {
63 pub fn ack(self) {
68 let _ = self.ack_tx.send(AckResult::Ack(self.ack_id));
69 }
70
71 pub fn nack(self) {
76 let _ = self.ack_tx.send(AckResult::Nack(self.ack_id));
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::super::lease_state::tests::test_id;
    use super::*;
    use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel};

    /// Acking through the `Handler` enum sends exactly one `Ack` for the ID.
    #[test]
    fn handler_ack() -> anyhow::Result<()> {
        let (tx, mut rx) = unbounded_channel();
        let handler = Handler::AtLeastOnce(AtLeastOnce {
            ack_id: test_id(1),
            ack_tx: tx,
        });
        // Building the handler must not send anything by itself.
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

        handler.ack();
        assert_eq!(rx.try_recv()?, AckResult::Ack(test_id(1)));

        Ok(())
    }

    /// Nacking through the `Handler` enum sends exactly one `Nack` for the ID.
    #[test]
    fn handler_nack() -> anyhow::Result<()> {
        let (tx, mut rx) = unbounded_channel();
        let handler = Handler::AtLeastOnce(AtLeastOnce {
            ack_id: test_id(1),
            ack_tx: tx,
        });
        // Building the handler must not send anything by itself.
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

        handler.nack();
        assert_eq!(rx.try_recv()?, AckResult::Nack(test_id(1)));

        Ok(())
    }

    /// Acking an `AtLeastOnce` directly sends an `Ack` for the ID.
    #[test]
    fn at_least_once_ack() -> anyhow::Result<()> {
        let (tx, mut rx) = unbounded_channel();
        let handler = AtLeastOnce {
            ack_id: test_id(1),
            ack_tx: tx,
        };
        // Building the handler must not send anything by itself.
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

        handler.ack();
        assert_eq!(rx.try_recv()?, AckResult::Ack(test_id(1)));

        Ok(())
    }

    /// Nacking an `AtLeastOnce` directly sends a `Nack` for the ID.
    #[test]
    fn at_least_once_nack() -> anyhow::Result<()> {
        let (tx, mut rx) = unbounded_channel();
        let handler = AtLeastOnce {
            ack_id: test_id(1),
            ack_tx: tx,
        };
        // Building the handler must not send anything by itself.
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

        handler.nack();
        assert_eq!(rx.try_recv()?, AckResult::Nack(test_id(1)));

        Ok(())
    }
}