rabbit_auto/
auto_ack.rs

1//! Helper to auto ack deliveries
2
3
4
5use lapin::message::Delivery;
6use lapin::options::{BasicAckOptions, BasicNackOptions, BasicRejectOptions};
7use futures::{StreamExt, Stream};
8use anyhow::Result;
9use lapin::acker::Acker;
10
11enum Action {
12    Ack(Option<BasicAckOptions>),
13    Nack(Option<BasicNackOptions>),
14    Reject(Option<BasicRejectOptions>),
15}
16/// Automatically ack delivery when this struct drops.
17/// There is an option to automatically nack the delivery.
18pub struct AutoAck {
19    acker: Option<Acker>,
20    action: Action,
21}
22
23impl AutoAck {
24    /// Creates a new auto ack
25    /// # Arguments:
26    /// * channel - the channel where this delivery was received on
27    /// * delivery - delivery which was received. This will take its id of the delivery.
28    pub fn new(acker: Acker) -> Self {
29        Self {
30            acker: Some(acker),
31            action: Action::Ack(None),
32        }
33    }
34
35    /// Creates a new auto ack with BasicAckOptions
36    pub fn new_ack(acker: Acker, options: BasicAckOptions) -> Self {
37        Self {
38            acker: Some(acker),
39            action: Action::Ack(Some(options)),
40        }
41    }
42
43    pub fn new_nack(acker: Acker, options: BasicNackOptions) -> Self {
44        Self {
45            acker: Some(acker),
46            action: Action::Nack(Some(options)),
47        }
48    }
49
50    pub fn new_reject(acker: Acker, options: BasicRejectOptions) -> Self {
51        Self {
52            acker: Some(acker),
53            action: Action::Reject(Some(options)),
54        }
55    }
56
57    /// Change the auto ack in to auto nack
58    pub fn change_to_nack(&mut self, options: Option<BasicNackOptions>) {
59        self.action = Action::Nack(options);
60    }
61
62    pub fn change_to_reject(&mut self, options: Option<BasicRejectOptions>) {
63        self.action = Action::Reject(options);
64    }
65
66    /// Release the channel and the tag from this
67    pub fn release(&mut self) -> Option<Acker> {
68        self.acker.take()
69    }
70
71    /// Perform the ack or nack on the channel, if this has not been already done or released.
72    pub async fn execute(&mut self) -> Result<()> {
73        if let Some(acker) = self.acker.take() {
74            match self.action {
75                Action::Ack(ref mut options) => {
76                    Self::do_ack(acker, options.take()).await
77                }
78                Action::Nack(ref mut options) => {
79                    Self::do_nack(acker, options.take()).await
80                }
81                Action::Reject(ref mut option) => {
82                    Self::do_reject(acker, option.take()).await
83                }
84            }
85        } else {
86            Ok(())
87        }
88    }
89
90
91
92
93    /// Ack the delivery on the channel
94    async fn do_ack(acker: Acker, options: Option<BasicAckOptions>) -> Result<()> {
95        acker.ack( options.unwrap_or_else(|| BasicAckOptions::default())).await?;
96        Ok(())
97    }
98    /// Nack the delivery on the channel
99    async fn do_nack(acker: Acker, options: Option<BasicNackOptions>) -> Result<()> {
100        acker.nack( options.unwrap_or_else(|| BasicNackOptions::default())).await?;
101        Ok(())
102    }
103
104    async fn do_reject(acker: Acker, options: Option<BasicRejectOptions>) -> Result<()> {
105        acker.reject(options.unwrap_or_else(|| BasicRejectOptions::default())).await?;
106        Ok(())
107    }
108}
109
110impl Drop for AutoAck {
111    fn drop(&mut self) {
112        if let Some(acker) = self.acker.take() {
113            match self.action {
114                Action::Ack(ref mut options) => {
115                    #[cfg(feature = "tokio_runtime")]
116                    tokio::spawn(Self::do_ack(acker, options.take()));
117                    #[cfg(feature = "async_std_runtime")]
118                    async_std::task::spawn(Self::do_ack(acker, options.take()));
119                }
120                Action::Nack(ref mut options) => {
121                    #[cfg(feature = "tokio_runtime")]
122                    tokio::spawn(Self::do_nack(acker, options.take()));
123                    #[cfg(feature = "async_std_runtime")]
124                    async_std::task::spawn(Self::do_nack(acker, options.take()));
125                }
126                Action::Reject(ref mut options) => {
127                    #[cfg(feature = "tokio_runtime")]
128                    tokio::spawn(Self::do_reject(acker, options.take()));
129                    #[cfg(feature = "async_std_runtime")]
130                    async_std::task::spawn(Self::do_reject(acker, options.take()));
131                }
132            }
133        }
134    }
135}
136
137/// Map consumer stream into automatically ack stream. The AutoAck object can still be used to release the ack,
138/// and manually ack it or not ack it.
139pub fn auto_ack<S: StreamExt + Stream<Item = Delivery>>(stream: S) -> impl Stream<Item = (AutoAck, Vec<u8>)> {
140    stream.map(|delivery| (AutoAck::new(delivery.acker), delivery.data))
141}