Skip to main content

resilient_rabbitmq_client/
client_handler.rs

1use amqprs::callbacks::ConnectionCallback;
2use amqprs::channel::BasicConsumeArguments;
3use amqprs::channel::Channel;
4use amqprs::connection::{Connection, OpenConnectionArguments};
5use amqprs::consumer::AsyncConsumer;
6use amqprs::error::Error;
7use amqprs::{AmqpChannelId, Close};
8use async_trait::async_trait;
9use futures::lock::Mutex;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::mpsc::{self, Receiver, Sender};
13use tokio::time::{Duration, sleep};
14#[allow(unused)]
15use tracing::{debug, error, info, trace, warn};
16
17use crate::consumer::ChannelConsumerHandler;
18use crate::publisher::DataToPublish;
19use crate::publisher::PublishChannel;
20
21pub type ConnectionArguments = OpenConnectionArguments;
22
23/// This struct is holding the state of the RabbitMQ client. The whole resiliency revolves around it.
24/// It will handle connection errors and backup the list of consumers (basic.consume) you declare and recreate them when something happens.
25/// If you need to publish to RabbitMQ as well, a PublishChannel will be extracted from it and regenerated if needed.
26pub struct RabbitMqClientHandler<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> {
27    // Globally useful part
28    need_to_reload_sender: Sender<NeedToReload>,
29    need_to_reload_receiver: Receiver<NeedToReload>,
30    open_connection_arguments: OpenConnectionArguments,
31    new_valid_channel_sender: Option<Sender<()>>,
32
33    // Consumer specific part
34    with_consumer: bool,
35    consumer_connection: Connection,
36    expected_consumers: Vec<(BasicConsumerType, BasicConsumeArguments)>,
37    consume_channels: HashMap<AmqpChannelId, (Channel, BasicConsumerType, BasicConsumeArguments)>,
38
39    // Publisher specific part
40    with_publisher: bool,
41    publisher_connection: Option<Connection>,
42    publish_channel: Option<Arc<Mutex<Channel>>>,
43    data_to_publish_sender: Option<Sender<DataToPublish>>
44}
45
46impl<BasicConsumerType: AsyncConsumer + Clone + Send + 'static>
47    RabbitMqClientHandler<BasicConsumerType>
48{
49    pub async fn new(
50        open_connection_arguments: OpenConnectionArguments,
51    ) -> RabbitMqClientHandler<BasicConsumerType> {
52        loop {
53            match Connection::open(&open_connection_arguments).await {
54                Ok(consumer_connection) => {
55                    let (need_to_reload_sender, need_to_reload_receiver) =
56                        mpsc::channel::<NeedToReload>(100);
57
58                    let connection_callbacks =
59                        ConnectionHandler::from(need_to_reload_sender.clone());
60
61                    match consumer_connection.register_callback(connection_callbacks).await {
62                        Ok(_) => {
63                            debug!(
64                                connection_name = consumer_connection.connection_name(),
65                                "RabbitMqClientHandler creation successful",
66                            );
67
68                            return RabbitMqClientHandler {
69                                // Global
70                                need_to_reload_sender,
71                                need_to_reload_receiver,
72                                open_connection_arguments,
73                                new_valid_channel_sender: None,
74                                // Consume
75                                with_consumer: true,
76                                consumer_connection,
77                                expected_consumers: Vec::new(),
78                                consume_channels: HashMap::new(),
79                                // Publish
80                                with_publisher: false,
81                                publisher_connection: None,
82                                publish_channel: None,
83                                data_to_publish_sender: None,
84                            };
85                        }
86                        Err(error_content) => {
87                            warn!("Failure to register callbacks : {:?}", error_content);
88                        }
89                    }
90                }
91                Err(error_content) => {
92                    warn!(
93                        "Failure to create RabbitMqClientHandler : {:?}",
94                        error_content
95                    );
96                }
97            }
98            sleep(Duration::from_secs(2)).await;
99        }
100    }
101
102    pub async fn try_reconnect(&mut self) -> Result<(), String> {
103        match Connection::open(&self.open_connection_arguments).await {
104            Ok(new_consumer_connection) => {
105                let connection_callbacks =
106                    ConnectionHandler::from(self.need_to_reload_sender.clone());
107
108                match new_consumer_connection
109                    .register_callback(connection_callbacks)
110                    .await
111                {
112                    Ok(_) => {
113                        self.consumer_connection = new_consumer_connection;
114
115                        debug!(
116                            connection_name = self.consumer_connection.connection_name(),
117                            "RabbitMqClientHandler creation successful"
118                        );
119
120                        Ok(())
121                    }
122                    Err(error_content) => Err(format!(
123                        "Failure to register callbacks : {:?}",
124                        error_content
125                    )),
126                }
127            }
128            Err(error_content) => Err(format!(
129                "Failure to create RabbitMqClientHandler : {:?}",
130                error_content
131            )),
132        }
133    }
134
135    pub async fn create_publisher(&mut self) -> Result<PublishChannel, String> {
136        debug!(
137            connection_name = self.consumer_connection.connection_name(),
138            "Beginning to create a publisher based on RabbitMqClientHandler",
139        );
140
141        match self.consumer_connection.open_channel(None).await {
142            Ok(publish_channel) => {
143                let (data_to_publish_sender, data_to_publish_receiver) =
144                    mpsc::channel::<DataToPublish>(100);
145                let (new_valid_channel_sender, new_valid_channel_receiver) =
146                    mpsc::channel::<()>(10);
147
148                let publish_channel_handle = Arc::new(Mutex::new(publish_channel));
149
150                debug!("Publisher created and turned into Arc<Mutex>");
151                self.with_publisher = true;
152                self.publish_channel = Some(publish_channel_handle.clone());
153                self.data_to_publish_sender = Some(data_to_publish_sender);
154                self.new_valid_channel_sender = Some(new_valid_channel_sender);
155
156                Ok(PublishChannel::from(
157                    publish_channel_handle,
158                    new_valid_channel_receiver,
159                    self.need_to_reload_sender.clone(),
160                    data_to_publish_receiver,
161                ))
162            }
163            Err(error_content) => Err(format!("Failed to create publisher : {:?}", error_content)),
164        }
165    }
166
167    pub async fn new_consumer(
168        &mut self,
169        basic_consumer: BasicConsumerType,
170        basic_consumer_args: BasicConsumeArguments,
171    ) {
172        self.expected_consumers
173            .push((basic_consumer.clone(), basic_consumer_args.clone()));
174
175        loop {
176            match self
177                .create_consumer(basic_consumer.clone(), basic_consumer_args.clone())
178                .await
179            {
180                Ok(_) => {
181                    break;
182                }
183                Err(error_content) => {
184                    warn!("Failed to regenerate consumer : {:?}", error_content);
185                    sleep(Duration::from_secs(2)).await;
186                }
187            }
188        }
189    }
190
191    pub async fn create_consumer(
192        &mut self,
193        basic_consumer: BasicConsumerType,
194        basic_consumer_args: BasicConsumeArguments,
195    ) -> Result<(), Error> {
196        debug!(
197            connection_name = self.consumer_connection.connection_name(),
198            "create_consumer : trying to create channel on connection"
199        );
200        if !self.consumer_connection.is_open() {
201            return Err(Error::ConnectionCloseError(
202                "Connection is closed.".to_string(),
203            ));
204        }
205
206        match self.consumer_connection.open_channel(None).await {
207            Ok(new_channel) => {
208                debug!(channel_id = new_channel.channel_id(), "new channel created");
209
210                match new_channel
211                    .basic_consume(basic_consumer.clone(), basic_consumer_args.clone())
212                    .await
213                {
214                    Ok(consumer_tag) => {
215                        debug!(
216                            channel_id = new_channel.channel_id(),
217                            consumer_tag, "Consumer created"
218                        );
219
220                        match new_channel
221                            .register_callback(ChannelConsumerHandler::from(
222                                basic_consumer.clone(),
223                                basic_consumer_args.clone(),
224                                self.need_to_reload_sender.clone(),
225                            ))
226                            .await
227                        {
228                            Ok(_) => {
229                                info!(
230                                    channel_id = new_channel.channel_id(),
231                                    "Consumer created with callbacks : {:?}", basic_consumer_args
232                                );
233
234                                self.consume_channels.insert(
235                                    new_channel.channel_id(),
236                                    (new_channel, basic_consumer, basic_consumer_args),
237                                );
238
239                                Ok(())
240                            }
241                            Err(error_content) => Err(error_content),
242                        }
243                    }
244                    Err(error_content) => Err(error_content),
245                }
246            }
247            Err(error_content) => Err(error_content),
248        }
249    }
250
251    pub fn get_publish_sender_clone(&self) -> Result<Sender<DataToPublish>, String> {
252        match &self.data_to_publish_sender {
253            Some(sender) => { Ok(sender.clone()) }
254            None => { Err("Called get_publish_sender_clone() on this RabbitMqClientHandler but no publish channel is initialized".to_string()) }
255        }
256    }
257
258    pub async fn keep_alive(&mut self) {
259        while let Some(need_to_reload_details) = self.need_to_reload_receiver.recv().await {
260            match need_to_reload_details {
261                NeedToReload::All => {
262                    debug!("Need to reload whole connection and channels");
263                    debug!(
264                        "Old channels list : {}",
265                        display_channels_list(&self.consume_channels)
266                    );
267                    self.consume_channels.clear();
268                    'outer: loop {
269                        match self.try_reconnect().await {
270                            Ok(_) => {
271                                for (basic_consumer, basic_consumer_args) in
272                                    self.expected_consumers.clone()
273                                {
274                                    loop {
275                                        match self
276                                            .create_consumer(
277                                                basic_consumer.clone(),
278                                                basic_consumer_args.clone(),
279                                            )
280                                            .await
281                                        {
282                                            Ok(_) => {
283                                                break;
284                                            }
285                                            Err(Error::InternalChannelError(error_details)) => {
286                                                warn!(
287                                                    "Failure to create consumer {:?} : {}",
288                                                    basic_consumer_args, error_details
289                                                );
290                                                sleep(Duration::from_secs(2)).await;
291                                                continue;
292                                            }
293                                            Err(error_content) => {
294                                                debug!(
295                                                    "Failure to create consumer : {:?}",
296                                                    error_content
297                                                );
298                                                continue 'outer;
299                                            }
300                                        }
301                                    }
302                                }
303
304                                info!(
305                                    "Reload connection/channels/consumer ok: {}",
306                                    self.consumer_connection.connection_name()
307                                );
308                                if let Err(error_detail) = self
309                                    .need_to_reload_sender
310                                    .send(NeedToReload::PublishChannel)
311                                    .await
312                                {
313                                    error!(
314                                        "Failed to send NeedToReload signal over channel : {:?}",
315                                        error_detail
316                                    )
317                                }
318
319                                break;
320                            }
321                            Err(error_content) => {
322                                warn!(
323                                    "Failed to reload connection/channel/consumer : {:?}",
324                                    error_content
325                                );
326                                sleep(Duration::from_secs(2)).await;
327                            }
328                        }
329                    }
330                }
331                NeedToReload::ConsumerChannel(channel_id) => {
332                    debug!("Need to reload channel {}", channel_id);
333                    debug!(
334                        "Old channels list : {}",
335                        display_channels_list(&self.consume_channels)
336                    );
337
338                    let (basic_consumer, basic_consumer_args) = match self
339                        .consume_channels
340                        .get(&channel_id)
341                    {
342                        Some((
343                            _problematic_channel,
344                            old_basic_consumer,
345                            old_basic_consumer_args,
346                        )) => (old_basic_consumer.clone(), old_basic_consumer_args.clone()),
347                        None => {
348                            error!(
349                                "NeedToReload::ConsumerChannel({}) received but missing from cache. Reload all.",
350                                channel_id
351                            );
352                            if let Err(error_detail) =
353                                self.need_to_reload_sender.send(NeedToReload::All).await
354                            {
355                                error!(
356                                    "Failed to send NeedToReload signal over channel : {:?}",
357                                    error_detail
358                                );
359                            }
360                            continue;
361                        }
362                    };
363
364                    self.consume_channels.remove(&channel_id);
365
366                    loop {
367                        match &mut self
368                            .create_consumer(basic_consumer.clone(), basic_consumer_args.clone())
369                            .await
370                        {
371                            Ok(_) => {
372                                info!("Reload consumer ok: {:?}", basic_consumer_args);
373                                break;
374                            }
375                            Err(Error::InternalChannelError(error_details)) => {
376                                warn!(
377                                    "Failure to create consumer {:?} : {}",
378                                    basic_consumer_args, error_details
379                                );
380                                sleep(Duration::from_secs(2)).await;
381                                continue;
382                            }
383                            Err(_) => {
384                                debug!("Need to reload whole connection an channels");
385                                debug!(
386                                    "Old channels list : {}",
387                                    display_channels_list(&self.consume_channels)
388                                );
389                                self.consume_channels.clear();
390                                'outer: loop {
391                                    match self.try_reconnect().await {
392                                        Ok(_) => {
393                                            for (basic_consumer, basic_consumer_args) in
394                                                self.expected_consumers.clone()
395                                            {
396                                                loop {
397                                                    match self
398                                                        .create_consumer(
399                                                            basic_consumer.clone(),
400                                                            basic_consumer_args.clone(),
401                                                        )
402                                                        .await
403                                                    {
404                                                        Ok(_) => {
405                                                            break;
406                                                        }
407                                                        Err(Error::InternalChannelError(
408                                                            error_details,
409                                                        )) => {
410                                                            warn!(
411                                                                "Failure to create consumer {:?} : {}",
412                                                                basic_consumer_args, error_details
413                                                            );
414                                                            sleep(Duration::from_secs(2)).await;
415                                                            continue;
416                                                        }
417                                                        Err(error_content) => {
418                                                            debug!(
419                                                                "Failure to create consumer : {:?}",
420                                                                error_content
421                                                            );
422                                                            continue 'outer;
423                                                        }
424                                                    }
425                                                }
426                                            }
427
428                                            info!(
429                                                "Reload connection/channels/consumer ok: {}",
430                                                self.consumer_connection.connection_name()
431                                            );
432                                            break;
433                                        }
434                                        Err(error_content) => {
435                                            warn!(
436                                                "Échec reload connection/channel/consumer : {:?}",
437                                                error_content
438                                            );
439                                            sleep(Duration::from_secs(2)).await;
440                                        }
441                                    }
442                                }
443                            }
444                        }
445                    }
446                }
447                NeedToReload::PublishChannel => {
448                    if self.with_publisher {
449                        loop {
450                            match self.consumer_connection.open_channel(None).await {
451                                Ok(new_publish_channel) => {
452                                    info!(
453                                        "Reload publisher ok on channel {}",
454                                        new_publish_channel.channel_id()
455                                    );
456
457                                    let publish_channel_clone =
458                                        self.publish_channel.clone().unwrap();
459                                    let mut publish_channel_handle =
460                                        publish_channel_clone.lock().await;
461                                    *publish_channel_handle = new_publish_channel;
462
463                                    self.new_valid_channel_sender
464                                        .clone()
465                                        .unwrap()
466                                        .send(())
467                                        .await
468                                        .unwrap();
469
470                                    break;
471                                }
472                                Err(Error::InternalChannelError(error_details)) => {
473                                    warn!("Failure to create PublishChannel : {}", error_details);
474                                    sleep(Duration::from_secs(2)).await;
475                                    continue;
476                                }
477                                Err(error_content) => {
478                                    debug!(
479                                        "Failure to create PublishChannel : {:?}. Need to reload everything.",
480                                        error_content
481                                    );
482                                    if let Err(error_detail) =
483                                        self.need_to_reload_sender.send(NeedToReload::All).await
484                                    {
485                                        error!(
486                                            "Failed to send NeedToReload signal over channel : {:?}",
487                                            error_detail
488                                        )
489                                    }
490                                }
491                            }
492                        }
493                    }
494                }
495            }
496
497            debug!(
498                "New channels list : {}",
499                display_channels_list(&self.consume_channels)
500            );
501        }
502    }
503}
504
505/// The type which implements the ConnectionCallback trait from amqprs
506#[derive(Clone)]
507pub struct ConnectionHandler {
508    need_to_reload_sender: Sender<NeedToReload>,
509}
510
511impl ConnectionHandler {
512    pub fn from(need_to_reload_sender: Sender<NeedToReload>) -> ConnectionHandler {
513        ConnectionHandler {
514            need_to_reload_sender,
515        }
516    }
517}
518
519#[async_trait]
520impl ConnectionCallback for ConnectionHandler {
521    async fn close(&mut self, connection: &Connection, close: Close) -> Result<(), Error> {
522        warn!(
523            "End of connection received from broker : connection {}, cause: {}",
524            connection, close
525        );
526
527        loop {
528            match self.need_to_reload_sender.send(NeedToReload::All).await {
529                Ok(_) => {
530                    sleep(Duration::from_secs(1)).await;
531                    break;
532                }
533                Err(error_content) => {
534                    warn!(
535                        "Failed to send need_new_connection MPSC signal : {:?}",
536                        error_content
537                    );
538                    sleep(Duration::from_secs(5)).await;
539                }
540            }
541        }
542
543        Ok(())
544    }
545
546    async fn blocked(&mut self, connection: &Connection, reason: String) {
547        warn!(
548            "Blocked connection received from broker : connection {}, cause: {}",
549            connection, reason
550        );
551
552        loop {
553            match self.need_to_reload_sender.send(NeedToReload::All).await {
554                Ok(_) => {
555                    sleep(Duration::from_secs(1)).await;
556                    break;
557                }
558                Err(error_content) => {
559                    warn!(
560                        "Failed to send need_new_connection MPSC signal : {:?}",
561                        error_content
562                    );
563                    sleep(Duration::from_secs(5)).await;
564                }
565            }
566        }
567    }
568
569    async fn unblocked(&mut self, connection: &Connection) {
570        warn!(
571            "Unblocked connection received from broker : connection {}",
572            connection
573        );
574    }
575
576    async fn secret_updated(&mut self, connection: &Connection) {
577        warn!("secret_updated : connection {}", connection);
578
579        loop {
580            match self.need_to_reload_sender.send(NeedToReload::All).await {
581                Ok(_) => {
582                    sleep(Duration::from_secs(1)).await;
583                    break;
584                }
585                Err(error_content) => {
586                    warn!(
587                        "Failed to send need_new_connection MPSC signal : {:?}",
588                        error_content
589                    );
590                    sleep(Duration::from_secs(5)).await;
591                }
592            }
593        }
594    }
595}
596
597/// Convenient enum type for this crate's internals. This enum is being sent by the ChannelConsumerHandler instances to the ConnectionHandler whenever something bad is detected.
598#[derive(Debug, Clone)]
599pub enum NeedToReload {
600    All,
601    ConsumerChannel(AmqpChannelId),
602    PublishChannel,
603}
604
605/// Convenient method for debugging purposes. When used in a DEBUG log level context, this crate will display channels lists at specific points (mainly when an item needs to be reloaded).
606fn display_channels_list<BasicConsumerType: AsyncConsumer + Clone + Send + 'static>(
607    channels: &HashMap<AmqpChannelId, (Channel, BasicConsumerType, BasicConsumeArguments)>,
608) -> String {
609    let mut list = String::new();
610    for channel_id in channels.keys() {
611        list.push_str(&format!("{} ", channel_id));
612    }
613    list
614}