Skip to main content

lapin/
publisher_confirm.rs

1use crate::{Promise, Result, message::BasicReturnMessage, returned_messages::ReturnedMessages};
2use std::{
3    fmt,
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tracing::trace;
9
/// Future resolving to the [`Confirmation`] carried by an underlying promise.
///
/// If this value is dropped while it still holds its promise (i.e. it was never polled to
/// completion), the `Drop` impl hands that promise to `returned_messages` instead of
/// discarding it.
pub struct PublisherConfirm {
    /// Pending promise; becomes `None` once `poll` has returned `Ready` or after `drop` moved
    /// it out.
    inner: Option<Promise<Confirmation>>,
    /// Handle that receives the still-pending promise when this value is dropped unresolved.
    returned_messages: ReturnedMessages,
}
14
15#[derive(Debug, PartialEq)]
16pub enum Confirmation {
17    Ack(Option<BasicReturnMessage>),
18    Nack(Option<BasicReturnMessage>),
19    NotRequested,
20}
21
22impl Confirmation {
23    pub fn take_message(self) -> Option<BasicReturnMessage> {
24        if let Confirmation::Ack(msg) | Confirmation::Nack(msg) = self {
25            msg
26        } else {
27            None
28        }
29    }
30
31    pub fn is_ack(&self) -> bool {
32        matches!(self, Confirmation::Ack(_))
33    }
34
35    pub fn is_nack(&self) -> bool {
36        matches!(self, Confirmation::Nack(_))
37    }
38}
39
40impl PublisherConfirm {
41    pub(crate) fn new(inner: Promise<Confirmation>, returned_messages: ReturnedMessages) -> Self {
42        Self {
43            inner: Some(inner),
44            returned_messages,
45        }
46    }
47
48    pub(crate) fn not_requested(returned_messages: ReturnedMessages) -> Self {
49        Self {
50            inner: Some(Promise::new_with_data(
51                "publisher-confirms.not-requested",
52                Ok(Confirmation::NotRequested),
53            )),
54            returned_messages,
55        }
56    }
57}
58
59impl fmt::Debug for PublisherConfirm {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.debug_struct("PublisherConfirm").finish()
62    }
63}
64
65impl Future for PublisherConfirm {
66    type Output = Result<Confirmation>;
67
68    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69        let mut this = self.as_mut();
70        let res = Pin::new(
71            &mut this
72                .inner
73                .as_mut()
74                .expect("PublisherConfirm polled after completion"),
75        )
76        .poll(cx);
77        if res.is_ready() {
78            this.inner.take();
79        }
80        res
81    }
82}
83
84impl Drop for PublisherConfirm {
85    fn drop(&mut self) {
86        if let Some(promise) = self.inner.take() {
87            trace!("PublisherConfirm dropped without use, registering it for wait_for_confirms");
88            self.returned_messages.register_dropped_confirm(promise);
89        }
90    }
91}