code0_flow/flow_queue/
service.rs

1use std::{sync::Arc, time::Duration};
2
3use futures_lite::StreamExt;
4use lapin::{
5    Channel,
6    options::{BasicConsumeOptions, QueueDeclareOptions},
7    types::FieldTable,
8};
9use log::debug;
10use serde::{Deserialize, Serialize};
11use tokio::sync::Mutex;
12
13use super::connection::build_connection;
14
15#[derive(Serialize, Deserialize)]
16pub enum MessageType {
17    ExecuteFlow,
18    TestExecuteFlow,
19}
20
21#[derive(Serialize, Deserialize)]
22pub struct Sender {
23    pub name: String,
24    pub protocol: String,
25    pub version: String,
26}
27
28#[derive(Serialize, Deserialize)]
29pub struct Message {
30    pub message_type: MessageType,
31    pub sender: Sender,
32    pub timestamp: i64,
33    pub message_id: String,
34    pub body: String,
35}
36
/// Client that publishes to and consumes from RabbitMQ queues through a
/// single lapin [`Channel`].
pub struct RabbitmqClient {
    /// The AMQP channel, shared behind an async mutex; the client's methods
    /// lock it before issuing a channel operation.
    pub channel: Arc<Mutex<Channel>>,
}
40
/// Errors reported by [`RabbitmqClient`] operations.
#[derive(Debug)]
pub enum RabbitMqError {
    /// An error reported by the lapin AMQP library.
    LapinError(lapin::Error),
    /// A connection-level failure, described as text (e.g. converted from a `std::io::Error`).
    ConnectionError(String),
    /// The operation did not finish within the allowed time.
    TimeoutError,
    /// A received payload could not be decoded into a [`Message`].
    DeserializationError,
    /// A [`Message`] could not be encoded to JSON.
    SerializationError,
}
49
50impl From<lapin::Error> for RabbitMqError {
51    fn from(error: lapin::Error) -> Self {
52        RabbitMqError::LapinError(error)
53    }
54}
55
56impl From<std::io::Error> for RabbitMqError {
57    fn from(error: std::io::Error) -> Self {
58        RabbitMqError::ConnectionError(error.to_string())
59    }
60}
61
62impl std::fmt::Display for RabbitMqError {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        match self {
65            RabbitMqError::LapinError(err) => write!(f, "RabbitMQ error: {}", err),
66            RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
67            RabbitMqError::TimeoutError => write!(f, "Operation timed out"),
68            RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"),
69            RabbitMqError::SerializationError => write!(f, "Failed to serialize message"),
70        }
71    }
72}
73
74impl RabbitmqClient {
75    // Create a new RabbitMQ client with channel
76    pub async fn new(rabbitmq_url: &str) -> Self {
77        let connection = build_connection(rabbitmq_url).await;
78        let channel = connection.create_channel().await.unwrap();
79
80        match channel
81            .queue_declare(
82                "send_queue",
83                QueueDeclareOptions::default(),
84                FieldTable::default(),
85            )
86            .await
87        {
88            Ok(_) => {
89                log::info!("Successfully declared send_queue");
90            }
91            Err(err) => log::error!("Failed to declare send_queue: {:?}", err),
92        }
93
94        match channel
95            .queue_declare(
96                "recieve_queue",
97                QueueDeclareOptions::default(),
98                FieldTable::default(),
99            )
100            .await
101        {
102            Ok(_) => {
103                log::info!("Successfully declared recieve_queue");
104            }
105            Err(err) => log::error!("Failed to declare recieve_queue: {:?}", err),
106        }
107
108        RabbitmqClient {
109            channel: Arc::new(Mutex::new(channel)),
110        }
111    }
112
113    // Send message to the queue
114    pub async fn send_message(
115        &self,
116        message_json: String,
117        queue_name: &str,
118    ) -> Result<(), RabbitMqError> {
119        let channel = self.channel.lock().await;
120
121        match channel
122            .basic_publish(
123                "",         // exchange
124                queue_name, // routing key (queue name)
125                lapin::options::BasicPublishOptions::default(),
126                message_json.as_bytes(),
127                lapin::BasicProperties::default(),
128            )
129            .await
130        {
131            Err(err) => {
132                log::error!("Failed to publish message: {:?}", err);
133                Err(RabbitMqError::LapinError(err))
134            }
135            Ok(_) => Ok(()),
136        }
137    }
138
139    // Receive messages from a queue with no timeout
140    pub async fn await_message_no_timeout(
141        &self,
142        queue_name: &str,
143        message_id: String,
144        ack_on_success: bool,
145    ) -> Result<Message, RabbitMqError> {
146        let mut consumer = {
147            let channel = self.channel.lock().await;
148
149            let consumer_res = channel
150                .basic_consume(
151                    queue_name,
152                    "consumer",
153                    lapin::options::BasicConsumeOptions::default(),
154                    FieldTable::default(),
155                )
156                .await;
157
158            match consumer_res {
159                Ok(consumer) => {
160                    log::info!("Established queue connection to {}", queue_name);
161                    consumer
162                }
163                Err(err) => {
164                    log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
165                    return Err(RabbitMqError::LapinError(err));
166                }
167            }
168        };
169
170        debug!("Starting to consume from {}", queue_name);
171
172        while let Some(delivery_result) = consumer.next().await {
173            let delivery = match delivery_result {
174                Ok(del) => del,
175                Err(_) => return Err(RabbitMqError::DeserializationError),
176            };
177            let data = &delivery.data;
178            let message_str = match std::str::from_utf8(&data) {
179                Ok(str) => str,
180                Err(_) => {
181                    return Err(RabbitMqError::DeserializationError);
182                }
183            };
184
185            debug!("Received message: {}", message_str);
186
187            // Parse the message
188            let message = match serde_json::from_str::<Message>(message_str) {
189                Ok(m) => m,
190                Err(e) => {
191                    log::error!("Failed to parse message: {:?}", e);
192                    return Err(RabbitMqError::DeserializationError);
193                }
194            };
195
196            if message.message_id == message_id {
197                if ack_on_success {
198                    if let Err(delivery_error) = delivery
199                        .ack(lapin::options::BasicAckOptions::default())
200                        .await
201                    {
202                        log::error!("Failed to acknowledge message: {:?}", delivery_error);
203                    }
204                }
205
206                return Ok(message);
207            }
208        }
209        Err(RabbitMqError::DeserializationError)
210    }
211
212    // Function intended to get used by the runtime
213    pub async fn receive_messages(
214        &self,
215        queue_name: &str,
216        handle_message: fn(Message) -> Result<Message, lapin::Error>,
217    ) -> Result<(), RabbitMqError> {
218        let mut consumer = {
219            let channel = self.channel.lock().await;
220
221            let consumer_res = channel
222                .basic_consume(
223                    queue_name,
224                    "consumer",
225                    BasicConsumeOptions::default(),
226                    FieldTable::default(),
227                )
228                .await;
229
230            match consumer_res {
231                Ok(consumer) => {
232                    log::info!("Established queue connection to {}", queue_name);
233                    consumer
234                }
235                Err(err) => {
236                    log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
237                    return Err(RabbitMqError::LapinError(err));
238                }
239            }
240        };
241
242        debug!("Starting to consume from {}", queue_name);
243
244        while let Some(delivery) = consumer.next().await {
245            let delivery = match delivery {
246                Ok(del) => del,
247                Err(err) => {
248                    log::error!("Error receiving message: {:?}", err);
249                    return Err(RabbitMqError::LapinError(err));
250                }
251            };
252
253            let data = &delivery.data;
254            let message_str = match std::str::from_utf8(&data) {
255                Ok(str) => {
256                    log::info!("Received message: {}", str);
257                    str
258                }
259                Err(err) => {
260                    log::error!("Error decoding message: {:?}", err);
261                    return Err(RabbitMqError::DeserializationError);
262                }
263            };
264            // Parse the message
265            let inc_message = match serde_json::from_str::<Message>(message_str) {
266                Ok(mess) => mess,
267                Err(err) => {
268                    log::error!("Error parsing message: {:?}", err);
269                    return Err(RabbitMqError::DeserializationError);
270                }
271            };
272
273            let message = match handle_message(inc_message) {
274                Ok(mess) => mess,
275                Err(err) => {
276                    log::error!("Error handling message: {:?}", err);
277                    return Err(RabbitMqError::DeserializationError);
278                }
279            };
280
281            let message_json = match serde_json::to_string(&message) {
282                Ok(json) => json,
283                Err(err) => {
284                    log::error!("Error serializing message: {:?}", err);
285                    return Err(RabbitMqError::SerializationError);
286                }
287            };
288
289            {
290                let _ = self.send_message(message_json, "recieve_queue").await;
291            }
292
293            // Acknowledge the message
294            if let Err(delivery_error) = delivery
295                .ack(lapin::options::BasicAckOptions::default())
296                .await
297            {
298                log::error!("Failed to acknowledge message: {:?}", delivery_error);
299            }
300        }
301
302        Ok(())
303    }
304
305    // Receive messages from a queue with timeout
306    pub async fn await_message(
307        &self,
308        queue_name: &str,
309        message_id: String,
310        timeout: Duration,
311        ack_on_success: bool,
312    ) -> Result<Message, RabbitMqError> {
313        // Set a timeout
314        match tokio::time::timeout(
315            timeout,
316            self.await_message_no_timeout(queue_name, message_id, ack_on_success),
317        )
318        .await
319        {
320            Ok(result) => result,
321            Err(_) => {
322                debug!(
323                    "Timeout waiting for message after {} seconds",
324                    timeout.as_secs()
325                );
326                Err(RabbitMqError::TimeoutError)
327            }
328        }
329    }
330}