1use lapin::message::Delivery;
6use lapin::options::{BasicAckOptions, BasicNackOptions, BasicRejectOptions};
7use futures::{StreamExt, Stream};
8use anyhow::Result;
9use lapin::acker::Acker;
10
11enum Action {
12 Ack(Option<BasicAckOptions>),
13 Nack(Option<BasicNackOptions>),
14 Reject(Option<BasicRejectOptions>),
15}
16pub struct AutoAck {
19 acker: Option<Acker>,
20 action: Action,
21}
22
23impl AutoAck {
24 pub fn new(acker: Acker) -> Self {
29 Self {
30 acker: Some(acker),
31 action: Action::Ack(None),
32 }
33 }
34
35 pub fn new_ack(acker: Acker, options: BasicAckOptions) -> Self {
37 Self {
38 acker: Some(acker),
39 action: Action::Ack(Some(options)),
40 }
41 }
42
43 pub fn new_nack(acker: Acker, options: BasicNackOptions) -> Self {
44 Self {
45 acker: Some(acker),
46 action: Action::Nack(Some(options)),
47 }
48 }
49
50 pub fn new_reject(acker: Acker, options: BasicRejectOptions) -> Self {
51 Self {
52 acker: Some(acker),
53 action: Action::Reject(Some(options)),
54 }
55 }
56
57 pub fn change_to_nack(&mut self, options: Option<BasicNackOptions>) {
59 self.action = Action::Nack(options);
60 }
61
62 pub fn change_to_reject(&mut self, options: Option<BasicRejectOptions>) {
63 self.action = Action::Reject(options);
64 }
65
66 pub fn release(&mut self) -> Option<Acker> {
68 self.acker.take()
69 }
70
71 pub async fn execute(&mut self) -> Result<()> {
73 if let Some(acker) = self.acker.take() {
74 match self.action {
75 Action::Ack(ref mut options) => {
76 Self::do_ack(acker, options.take()).await
77 }
78 Action::Nack(ref mut options) => {
79 Self::do_nack(acker, options.take()).await
80 }
81 Action::Reject(ref mut option) => {
82 Self::do_reject(acker, option.take()).await
83 }
84 }
85 } else {
86 Ok(())
87 }
88 }
89
90
91
92
93 async fn do_ack(acker: Acker, options: Option<BasicAckOptions>) -> Result<()> {
95 acker.ack( options.unwrap_or_else(|| BasicAckOptions::default())).await?;
96 Ok(())
97 }
98 async fn do_nack(acker: Acker, options: Option<BasicNackOptions>) -> Result<()> {
100 acker.nack( options.unwrap_or_else(|| BasicNackOptions::default())).await?;
101 Ok(())
102 }
103
104 async fn do_reject(acker: Acker, options: Option<BasicRejectOptions>) -> Result<()> {
105 acker.reject(options.unwrap_or_else(|| BasicRejectOptions::default())).await?;
106 Ok(())
107 }
108}
109
110impl Drop for AutoAck {
111 fn drop(&mut self) {
112 if let Some(acker) = self.acker.take() {
113 match self.action {
114 Action::Ack(ref mut options) => {
115 #[cfg(feature = "tokio_runtime")]
116 tokio::spawn(Self::do_ack(acker, options.take()));
117 #[cfg(feature = "async_std_runtime")]
118 async_std::task::spawn(Self::do_ack(acker, options.take()));
119 }
120 Action::Nack(ref mut options) => {
121 #[cfg(feature = "tokio_runtime")]
122 tokio::spawn(Self::do_nack(acker, options.take()));
123 #[cfg(feature = "async_std_runtime")]
124 async_std::task::spawn(Self::do_nack(acker, options.take()));
125 }
126 Action::Reject(ref mut options) => {
127 #[cfg(feature = "tokio_runtime")]
128 tokio::spawn(Self::do_reject(acker, options.take()));
129 #[cfg(feature = "async_std_runtime")]
130 async_std::task::spawn(Self::do_reject(acker, options.take()));
131 }
132 }
133 }
134 }
135}
136
137pub fn auto_ack<S: StreamExt + Stream<Item = Delivery>>(stream: S) -> impl Stream<Item = (AutoAck, Vec<u8>)> {
140 stream.map(|delivery| (AutoAck::new(delivery.acker), delivery.data))
141}