1use std::{sync::Arc, time::Duration};
2
3use futures_lite::StreamExt;
4use lapin::{
5 Channel,
6 options::{BasicConsumeOptions, QueueDeclareOptions},
7 types::FieldTable,
8};
9use log::debug;
10use serde::{Deserialize, Serialize};
11use tokio::sync::Mutex;
12
13use super::connection::build_connection;
14
/// Discriminates the kind of request a [`Message`] carries.
///
/// Serialized and deserialized with serde as part of the [`Message`] envelope.
#[derive(Serialize, Deserialize)]
pub enum MessageType {
    /// NOTE(review): presumably requests a regular flow execution — confirm
    /// against the service that consumes these messages.
    ExecuteFlow,
    /// NOTE(review): presumably requests a test/dry run of a flow — confirm
    /// against the service that consumes these messages.
    TestExecuteFlow,
}
20
/// Sender metadata embedded in every [`Message`].
///
/// All fields are free-form strings; nothing in this file validates them.
#[derive(Serialize, Deserialize)]
pub struct Sender {
    /// NOTE(review): presumably the name of the producing service — confirm.
    pub name: String,
    /// NOTE(review): presumably the protocol identifier spoken by the sender — confirm.
    pub protocol: String,
    /// NOTE(review): presumably the sender's (or protocol's) version string — confirm.
    pub version: String,
}
27
/// Envelope exchanged over the queues.
///
/// `RabbitmqClient` decodes deliveries into this type with
/// `serde_json::from_str` and encodes handler results with
/// `serde_json::to_string`, so the wire format is JSON.
#[derive(Serialize, Deserialize)]
pub struct Message {
    /// Kind of request carried by this message.
    pub message_type: MessageType,
    /// Metadata describing who produced the message.
    pub sender: Sender,
    /// NOTE(review): unit and epoch are not visible in this file (seconds vs.
    /// milliseconds) — confirm with the producers.
    pub timestamp: i64,
    /// Identifier that `RabbitmqClient::await_message_no_timeout` compares
    /// against the id it was asked to wait for.
    pub message_id: String,
    /// Payload; treated as an opaque string by this module.
    pub body: String,
}
36
/// Handle to a single AMQP channel used for publishing and consuming.
pub struct RabbitmqClient {
    /// The channel, wrapped in `Arc<tokio::sync::Mutex<_>>`; the client's
    /// methods take the lock before issuing publish/consume calls on it.
    pub channel: Arc<Mutex<Channel>>,
}
40
/// Errors returned by [`RabbitmqClient`] operations.
#[derive(Debug)]
pub enum RabbitMqError {
    /// An error reported by the `lapin` AMQP library.
    LapinError(lapin::Error),
    /// A connection-level failure described by a message; `std::io::Error`
    /// values convert into this variant.
    ConnectionError(String),
    /// Returned by `RabbitmqClient::await_message` when the timeout elapses.
    TimeoutError,
    /// A received payload could not be turned into a [`Message`].
    DeserializationError,
    /// A [`Message`] could not be serialized to JSON.
    SerializationError,
}
49
50impl From<lapin::Error> for RabbitMqError {
51 fn from(error: lapin::Error) -> Self {
52 RabbitMqError::LapinError(error)
53 }
54}
55
56impl From<std::io::Error> for RabbitMqError {
57 fn from(error: std::io::Error) -> Self {
58 RabbitMqError::ConnectionError(error.to_string())
59 }
60}
61
62impl std::fmt::Display for RabbitMqError {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 match self {
65 RabbitMqError::LapinError(err) => write!(f, "RabbitMQ error: {}", err),
66 RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
67 RabbitMqError::TimeoutError => write!(f, "Operation timed out"),
68 RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"),
69 RabbitMqError::SerializationError => write!(f, "Failed to serialize message"),
70 }
71 }
72}
73
74impl RabbitmqClient {
75 pub async fn new(rabbitmq_url: &str) -> Self {
77 let connection = build_connection(rabbitmq_url).await;
78 let channel = connection.create_channel().await.unwrap();
79
80 match channel
81 .queue_declare(
82 "send_queue",
83 QueueDeclareOptions::default(),
84 FieldTable::default(),
85 )
86 .await
87 {
88 Ok(_) => {
89 log::info!("Successfully declared send_queue");
90 }
91 Err(err) => log::error!("Failed to declare send_queue: {:?}", err),
92 }
93
94 match channel
95 .queue_declare(
96 "recieve_queue",
97 QueueDeclareOptions::default(),
98 FieldTable::default(),
99 )
100 .await
101 {
102 Ok(_) => {
103 log::info!("Successfully declared recieve_queue");
104 }
105 Err(err) => log::error!("Failed to declare recieve_queue: {:?}", err),
106 }
107
108 RabbitmqClient {
109 channel: Arc::new(Mutex::new(channel)),
110 }
111 }
112
113 pub async fn send_message(
115 &self,
116 message_json: String,
117 queue_name: &str,
118 ) -> Result<(), RabbitMqError> {
119 let channel = self.channel.lock().await;
120
121 match channel
122 .basic_publish(
123 "", queue_name, lapin::options::BasicPublishOptions::default(),
126 message_json.as_bytes(),
127 lapin::BasicProperties::default(),
128 )
129 .await
130 {
131 Err(err) => {
132 log::error!("Failed to publish message: {:?}", err);
133 Err(RabbitMqError::LapinError(err))
134 }
135 Ok(_) => Ok(()),
136 }
137 }
138
139 pub async fn await_message_no_timeout(
141 &self,
142 queue_name: &str,
143 message_id: String,
144 ack_on_success: bool,
145 ) -> Result<Message, RabbitMqError> {
146 let mut consumer = {
147 let channel = self.channel.lock().await;
148
149 let consumer_res = channel
150 .basic_consume(
151 queue_name,
152 "consumer",
153 lapin::options::BasicConsumeOptions::default(),
154 FieldTable::default(),
155 )
156 .await;
157
158 match consumer_res {
159 Ok(consumer) => {
160 log::info!("Established queue connection to {}", queue_name);
161 consumer
162 }
163 Err(err) => {
164 log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
165 return Err(RabbitMqError::LapinError(err));
166 }
167 }
168 };
169
170 debug!("Starting to consume from {}", queue_name);
171
172 while let Some(delivery_result) = consumer.next().await {
173 let delivery = match delivery_result {
174 Ok(del) => del,
175 Err(_) => return Err(RabbitMqError::DeserializationError),
176 };
177 let data = &delivery.data;
178 let message_str = match std::str::from_utf8(&data) {
179 Ok(str) => str,
180 Err(_) => {
181 return Err(RabbitMqError::DeserializationError);
182 }
183 };
184
185 debug!("Received message: {}", message_str);
186
187 let message = match serde_json::from_str::<Message>(message_str) {
189 Ok(m) => m,
190 Err(e) => {
191 log::error!("Failed to parse message: {:?}", e);
192 return Err(RabbitMqError::DeserializationError);
193 }
194 };
195
196 if message.message_id == message_id {
197 if ack_on_success {
198 if let Err(delivery_error) = delivery
199 .ack(lapin::options::BasicAckOptions::default())
200 .await
201 {
202 log::error!("Failed to acknowledge message: {:?}", delivery_error);
203 }
204 }
205
206 return Ok(message);
207 }
208 }
209 Err(RabbitMqError::DeserializationError)
210 }
211
212 pub async fn receive_messages(
214 &self,
215 queue_name: &str,
216 handle_message: fn(Message) -> Result<Message, lapin::Error>,
217 ) -> Result<(), RabbitMqError> {
218 let mut consumer = {
219 let channel = self.channel.lock().await;
220
221 let consumer_res = channel
222 .basic_consume(
223 queue_name,
224 "consumer",
225 BasicConsumeOptions::default(),
226 FieldTable::default(),
227 )
228 .await;
229
230 match consumer_res {
231 Ok(consumer) => {
232 log::info!("Established queue connection to {}", queue_name);
233 consumer
234 }
235 Err(err) => {
236 log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
237 return Err(RabbitMqError::LapinError(err));
238 }
239 }
240 };
241
242 debug!("Starting to consume from {}", queue_name);
243
244 while let Some(delivery) = consumer.next().await {
245 let delivery = match delivery {
246 Ok(del) => del,
247 Err(err) => {
248 log::error!("Error receiving message: {:?}", err);
249 return Err(RabbitMqError::LapinError(err));
250 }
251 };
252
253 let data = &delivery.data;
254 let message_str = match std::str::from_utf8(&data) {
255 Ok(str) => {
256 log::info!("Received message: {}", str);
257 str
258 }
259 Err(err) => {
260 log::error!("Error decoding message: {:?}", err);
261 return Err(RabbitMqError::DeserializationError);
262 }
263 };
264 let inc_message = match serde_json::from_str::<Message>(message_str) {
266 Ok(mess) => mess,
267 Err(err) => {
268 log::error!("Error parsing message: {:?}", err);
269 return Err(RabbitMqError::DeserializationError);
270 }
271 };
272
273 let message = match handle_message(inc_message) {
274 Ok(mess) => mess,
275 Err(err) => {
276 log::error!("Error handling message: {:?}", err);
277 return Err(RabbitMqError::DeserializationError);
278 }
279 };
280
281 let message_json = match serde_json::to_string(&message) {
282 Ok(json) => json,
283 Err(err) => {
284 log::error!("Error serializing message: {:?}", err);
285 return Err(RabbitMqError::SerializationError);
286 }
287 };
288
289 {
290 let _ = self.send_message(message_json, "recieve_queue").await;
291 }
292
293 if let Err(delivery_error) = delivery
295 .ack(lapin::options::BasicAckOptions::default())
296 .await
297 {
298 log::error!("Failed to acknowledge message: {:?}", delivery_error);
299 }
300 }
301
302 Ok(())
303 }
304
305 pub async fn await_message(
307 &self,
308 queue_name: &str,
309 message_id: String,
310 timeout: Duration,
311 ack_on_success: bool,
312 ) -> Result<Message, RabbitMqError> {
313 match tokio::time::timeout(
315 timeout,
316 self.await_message_no_timeout(queue_name, message_id, ack_on_success),
317 )
318 .await
319 {
320 Ok(result) => result,
321 Err(_) => {
322 debug!(
323 "Timeout waiting for message after {} seconds",
324 timeout.as_secs()
325 );
326 Err(RabbitMqError::TimeoutError)
327 }
328 }
329 }
330}