1use futures::Future;
2use lapin::options::BasicQosOptions;
3use lapin::Channel;
4use log::debug;
5use log::info;
6use log::warn;
7use serde::de::DeserializeOwned;
8use serde_json;
9
10use super::rmq_primitive::constant::*;
11use super::rmq_primitive::create_channel;
12use super::rmq_primitive::unreliable_ack_or_reject;
13use super::rmq_primitive::AckType;
14use super::rmq_primitive::Responsibility;
15use crate::Result;
16
17pub async fn consume_forever<InputMsg, HandlerState, HandlerResult>(
42 rmq_uri: &str,
43 input_queue: &'static str,
44 handler: fn(HandlerState, Channel, InputMsg) -> HandlerResult,
45 handler_state: HandlerState,
46 prefetch_count: u16,
47) where
48 InputMsg: DeserializeOwned + Send + 'static,
49 HandlerState: Clone + Send + 'static,
50 HandlerResult: Future<Output = Result<Responsibility>> + Send + 'static,
51{
52 loop {
53 match consume_queue(
55 rmq_uri,
56 &input_queue,
57 handler,
58 handler_state.clone(),
59 prefetch_count,
60 )
61 .await
62 {
63 Ok(()) => (),
64 Err(e) => {
65 warn!(
66 "error happened when consuming queue {}, will retry: {}",
67 &input_queue, e
68 );
69 }
70 }
71
72 let duration = std::time::Duration::from_millis(2000);
74 info!(
75 "sleep for {} seconds before reconnecting to queue {}",
76 &duration.as_secs(),
77 &input_queue
78 );
79 tokio::time::sleep(duration).await
80 }
81}
82
83async fn consume_queue<InputMsg, HandlerState, HandlerResult>(
84 rmq_uri: &str,
85 input_queue: &'static str,
86 handler: fn(HandlerState, Channel, InputMsg) -> HandlerResult,
87 handler_state: HandlerState,
88 prefetch_count: u16,
89) -> Result<()>
90where
91 InputMsg: DeserializeOwned + Send + 'static,
92 HandlerState: Clone + Send + 'static,
93 HandlerResult: Future<Output = Result<Responsibility>> + Send + 'static,
94{
95 info!("creating channel for consuming queue {}", input_queue);
97 let channel = create_channel(rmq_uri).await?;
98 info!("setting prefetch to be {}", prefetch_count);
99 channel
100 .basic_qos(prefetch_count, BasicQosOptions { global: false })
101 .await?;
102
103 let consumer_tag = format!("dgec-{}", input_queue);
107 info!("creating consumer {}", &consumer_tag);
108 let consumer = channel
109 .basic_consume(
110 input_queue,
111 &consumer_tag,
112 RMQ_BASIC_CONSUME_OPTIONS,
113 lapin::types::FieldTable::default(),
114 )
115 .await?;
116
117 info!("entering consuming loop for queue {}", input_queue);
119 for delivery in consumer {
120 let (channel, msg) = delivery?;
121 tokio::spawn(handle_one_delivery(
122 channel,
123 msg,
124 handler,
125 handler_state.clone(),
126 ));
127 }
128
129 Ok(())
130}
131
132async fn handle_one_delivery<InputMsg, HandlerState, HandlerResult>(
133 channel: Channel,
134 delivery: lapin::message::Delivery,
135 handle: fn(HandlerState, Channel, InputMsg) -> HandlerResult,
136 handler_state: HandlerState,
137) where
138 InputMsg: DeserializeOwned + Send + 'static,
139 HandlerState: Clone + Send + 'static,
140 HandlerResult: Future<Output = Result<Responsibility>> + Send + 'static,
141{
142 debug!("processing message of tag: {}", delivery.delivery_tag);
143 match serde_json::from_slice::<InputMsg>(&delivery.data) {
144 Err(e) => {
145 warn!(
150 "failed to parse json when processing delivery: {}, msg will be dropped, error is: {}, data is: {:?}",
151 &delivery.delivery_tag, e, &delivery.data
152 );
153 unreliable_ack_or_reject(channel, AckType::Ack, delivery.delivery_tag).await
154 }
155 Ok(msg) => match handle(handler_state, channel.clone(), msg).await {
156 Err(e) => {
157 warn!(
158 "an error occurred while handling message {}, will requeue it, error is: {}",
159 &delivery.delivery_tag, e
160 );
161 unreliable_ack_or_reject(channel, AckType::Reject, delivery.delivery_tag).await
162 }
163 Ok(Responsibility::Reject) => {
164 debug!("explicitly rejecting message {}", &delivery.delivery_tag);
165 unreliable_ack_or_reject(channel, AckType::Reject, delivery.delivery_tag).await
166 }
167
168 Ok(Responsibility::Accept) => {
169 debug!("accepting message {}", &delivery.delivery_tag);
170 unreliable_ack_or_reject(channel, AckType::Ack, delivery.delivery_tag).await
171 }
172 },
173 }
174}