google_cloud_pubsub/subscriber/
handler.rs1use crate::error::AckError;
44use tokio::sync::mpsc::UnboundedSender;
45use tokio::sync::oneshot::Receiver;
46
47#[derive(Debug, PartialEq)]
49pub(super) enum Action {
50 Ack(String),
51 Nack(String),
52 ExactlyOnceAck(String),
53 ExactlyOnceNack(String),
54}
55
56#[derive(Debug)]
108#[non_exhaustive]
109pub enum Handler {
110 AtLeastOnce(AtLeastOnce),
111 ExactlyOnce(ExactlyOnce),
112}
113
114impl Handler {
115 pub fn ack(self) {
132 match self {
133 Handler::AtLeastOnce(h) => h.ack(),
134 Handler::ExactlyOnce(h) => h.ack(),
135 }
136 }
137
138 #[cfg(test)]
139 pub(crate) fn ack_id(&self) -> &str {
140 match self {
141 Handler::AtLeastOnce(h) => h.ack_id(),
142 Handler::ExactlyOnce(h) => h.ack_id(),
143 }
144 }
145}
146
147#[derive(Debug)]
148struct AtLeastOnceImpl {
149 ack_id: String,
150 ack_tx: UnboundedSender<Action>,
151}
152
153impl AtLeastOnceImpl {
154 fn ack(self) {
155 let _ = self.ack_tx.send(Action::Ack(self.ack_id));
156 }
157
158 fn nack(self) {
159 let _ = self.ack_tx.send(Action::Nack(self.ack_id));
160 }
161}
162
163#[derive(Debug)]
165pub struct AtLeastOnce {
166 inner: Option<AtLeastOnceImpl>,
167}
168
169impl AtLeastOnce {
170 pub(super) fn new(ack_id: String, ack_tx: UnboundedSender<Action>) -> Self {
171 Self {
172 inner: Some(AtLeastOnceImpl { ack_id, ack_tx }),
173 }
174 }
175
176 pub fn ack(mut self) {
181 if let Some(inner) = self.inner.take() {
182 inner.ack();
183 }
184 }
185
186 #[cfg(test)]
187 pub(crate) fn ack_id(&self) -> &str {
188 self.inner
189 .as_ref()
190 .map(|i| i.ack_id.as_str())
191 .unwrap_or_default()
192 }
193}
194
195impl Drop for AtLeastOnce {
196 fn drop(&mut self) {
201 if let Some(inner) = self.inner.take() {
202 inner.nack();
203 }
204 }
205}
206
207#[derive(Debug)]
209pub struct ExactlyOnce {
210 inner: Option<ExactlyOnceImpl>,
211}
212
213impl ExactlyOnce {
214 pub(super) fn new(
215 ack_id: String,
216 ack_tx: UnboundedSender<Action>,
217 result_rx: Receiver<AckResult>,
218 ) -> Self {
219 Self {
220 inner: Some(ExactlyOnceImpl {
221 ack_id,
222 ack_tx,
223 result_rx,
224 }),
225 }
226 }
227
228 pub(crate) fn ack(mut self) {
233 if let Some(inner) = self.inner.take() {
234 inner.ack();
235 }
236 }
237
238 pub async fn confirmed_ack(mut self) -> std::result::Result<(), AckError> {
258 let inner = self.inner.take().expect("handler impl is always some");
259 inner.confirmed_ack().await
260 }
261
262 #[cfg(test)]
263 pub(crate) fn ack_id(&self) -> &str {
264 self.inner
265 .as_ref()
266 .map(|i| i.ack_id.as_str())
267 .unwrap_or_default()
268 }
269}
270
271impl Drop for ExactlyOnce {
272 fn drop(&mut self) {
277 if let Some(inner) = self.inner.take() {
278 inner.nack();
279 }
280 }
281}
282
283#[derive(Debug)]
284struct ExactlyOnceImpl {
285 pub(super) ack_id: String,
286 pub(super) ack_tx: UnboundedSender<Action>,
287 pub(super) result_rx: Receiver<AckResult>,
288}
289
290impl ExactlyOnceImpl {
291 pub fn ack(self) {
292 let _ = self.ack_tx.send(Action::ExactlyOnceAck(self.ack_id));
293 }
294
295 pub fn nack(self) {
296 let _ = self.ack_tx.send(Action::ExactlyOnceNack(self.ack_id));
297 }
298
299 pub async fn confirmed_ack(self) -> AckResult {
300 self.ack_tx
301 .send(Action::ExactlyOnceAck(self.ack_id))
302 .map_err(|_| AckError::ShutdownBeforeAck)?;
303 self.result_rx
304 .await
305 .map_err(|e| AckError::Shutdown(e.into()))?
306 }
307}
308
309pub(super) type AckResult = std::result::Result<(), AckError>;
311
312#[cfg(test)]
313mod tests {
314 use super::super::lease_state::tests::test_id;
315 use super::*;
316 use tokio::sync::mpsc::error::TryRecvError;
317 use tokio::sync::mpsc::unbounded_channel;
318 use tokio::sync::oneshot::channel;
319
320 #[test]
321 fn handler_at_least_once_ack() -> anyhow::Result<()> {
322 let (ack_tx, mut ack_rx) = unbounded_channel();
323 let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx));
324 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
325
326 h.ack();
327 let ack = ack_rx.try_recv()?;
328 assert_eq!(ack, Action::Ack(test_id(1)));
329
330 Ok(())
331 }
332
333 #[test]
334 fn handler_at_least_once_nack() -> anyhow::Result<()> {
335 let (ack_tx, mut ack_rx) = unbounded_channel();
336 let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx));
337 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
338
339 drop(h);
340 let ack = ack_rx.try_recv()?;
341 assert_eq!(ack, Action::Nack(test_id(1)));
342
343 Ok(())
344 }
345
346 #[test]
347 fn handler_exactly_once_ack() -> anyhow::Result<()> {
348 let (ack_tx, mut ack_rx) = unbounded_channel();
349 let (_result_tx, result_rx) = channel();
350 let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx));
351 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
352
353 h.ack();
354 let ack = ack_rx.try_recv()?;
355 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
356
357 Ok(())
358 }
359
360 #[test]
361 fn handler_exactly_once_nack() -> anyhow::Result<()> {
362 let (ack_tx, mut ack_rx) = unbounded_channel();
363 let (_result_tx, result_rx) = channel();
364 let h = Handler::ExactlyOnce(ExactlyOnce::new(test_id(1), ack_tx, result_rx));
365 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
366
367 drop(h);
368 let ack = ack_rx.try_recv()?;
369 assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
370
371 Ok(())
372 }
373
374 #[test]
375 fn at_least_once_ack() -> anyhow::Result<()> {
376 let (ack_tx, mut ack_rx) = unbounded_channel();
377 let h = AtLeastOnce::new(test_id(1), ack_tx);
378 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
379
380 h.ack();
381 let ack = ack_rx.try_recv()?;
382 assert_eq!(ack, Action::Ack(test_id(1)));
383
384 Ok(())
385 }
386
387 #[test]
388 fn at_least_once_nack() -> anyhow::Result<()> {
389 let (ack_tx, mut ack_rx) = unbounded_channel();
390 let h = AtLeastOnce::new(test_id(1), ack_tx);
391 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
392
393 drop(h);
394 let ack = ack_rx.try_recv()?;
395 assert_eq!(ack, Action::Nack(test_id(1)));
396
397 Ok(())
398 }
399
400 #[test]
401 fn exactly_once_ack() -> anyhow::Result<()> {
402 let (ack_tx, mut ack_rx) = unbounded_channel();
403 let (_result_tx, result_rx) = channel();
404 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
405 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
406
407 h.ack();
408 let ack = ack_rx.try_recv()?;
409 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
410
411 Ok(())
412 }
413
414 #[tokio::test]
415 async fn exactly_once_success() -> anyhow::Result<()> {
416 let (ack_tx, mut ack_rx) = unbounded_channel();
417 let (result_tx, result_rx) = channel();
418 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
419 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
420
421 let task = tokio::task::spawn(async move { h.confirmed_ack().await });
422
423 let ack = ack_rx.recv().await.expect("ack should be sent");
424 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
425
426 result_tx
427 .send(Ok(()))
428 .expect("sending on a channel succeeds");
429 task.await??;
430
431 Ok(())
432 }
433
434 #[tokio::test]
435 async fn exactly_once_error() -> anyhow::Result<()> {
436 let (ack_tx, mut ack_rx) = unbounded_channel();
437 let (result_tx, result_rx) = channel();
438 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
439 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
440
441 let task = tokio::task::spawn(async move { h.confirmed_ack().await });
442
443 let ack = ack_rx.recv().await.expect("ack should be sent");
444 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
445
446 result_tx
447 .send(Err(AckError::LeaseExpired))
448 .expect("sending on a channel succeeds");
449 let err = task.await?.expect_err("ack should fail");
450 assert!(matches!(err, AckError::LeaseExpired), "{err:?}");
451
452 Ok(())
453 }
454
455 #[tokio::test]
456 async fn exactly_once_action_channel_closed() -> anyhow::Result<()> {
457 let (ack_tx, mut ack_rx) = unbounded_channel();
458 let (_result_tx, result_rx) = channel();
459 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
460 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
461 drop(ack_rx);
462
463 let err = h.confirmed_ack().await.expect_err("ack should fail");
464 assert!(matches!(err, AckError::ShutdownBeforeAck), "{err:?}");
465
466 Ok(())
467 }
468
469 #[tokio::test]
470 async fn exactly_once_result_channel_closed() -> anyhow::Result<()> {
471 let (ack_tx, mut ack_rx) = unbounded_channel();
472 let (result_tx, result_rx) = channel();
473 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
474 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
475
476 let task = tokio::task::spawn(async move { h.confirmed_ack().await });
477
478 let ack = ack_rx.recv().await.expect("ack should be sent");
479 assert_eq!(ack, Action::ExactlyOnceAck(test_id(1)));
480
481 drop(result_tx);
482 let err = task.await?.expect_err("ack should fail");
483 assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
484
485 Ok(())
486 }
487
488 #[test]
489 fn exactly_once_nack() -> anyhow::Result<()> {
490 let (ack_tx, mut ack_rx) = unbounded_channel();
491 let (_result_tx, result_rx) = channel();
492 let h = ExactlyOnce::new(test_id(1), ack_tx, result_rx);
493 assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
494
495 drop(h);
496 let ack = ack_rx.try_recv()?;
497 assert_eq!(ack, Action::ExactlyOnceNack(test_id(1)));
498
499 Ok(())
500 }
501}