resilient_rabbitmq_client/
client_handler.rs1use amqprs::callbacks::ConnectionCallback;
2use amqprs::channel::BasicConsumeArguments;
3use amqprs::channel::Channel;
4use amqprs::connection::{Connection, OpenConnectionArguments};
5use amqprs::consumer::AsyncConsumer;
6use amqprs::error::Error;
7use amqprs::{AmqpChannelId, Close};
8use async_trait::async_trait;
9use futures::lock::Mutex;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::mpsc::{self, Receiver, Sender};
13use tokio::time::{Duration, sleep};
14#[allow(unused)]
15use tracing::{debug, error, info, trace, warn};
16
17use crate::consumer::ChannelConsumerHandler;
18use crate::publisher::DataToPublish;
19use crate::publisher::PublishChannel;
20
21pub type ConnectionArguments = OpenConnectionArguments;
22
23pub struct RabbitMqClientHandler<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> {
27 need_to_reload_sender: Sender<NeedToReload>,
29 need_to_reload_receiver: Receiver<NeedToReload>,
30 open_connection_arguments: OpenConnectionArguments,
31 new_valid_channel_sender: Option<Sender<()>>,
32
33 with_consumer: bool,
35 consumer_connection: Connection,
36 expected_consumers: Vec<(BasicConsumerType, BasicConsumeArguments)>,
37 consume_channels: HashMap<AmqpChannelId, (Channel, BasicConsumerType, BasicConsumeArguments)>,
38
39 with_publisher: bool,
41 publisher_connection: Option<Connection>,
42 publish_channel: Option<Arc<Mutex<Channel>>>,
43 data_to_publish_sender: Option<Sender<DataToPublish>>
44}
45
46impl<BasicConsumerType: AsyncConsumer + Clone + Send + 'static>
47 RabbitMqClientHandler<BasicConsumerType>
48{
49 pub async fn new(
50 open_connection_arguments: OpenConnectionArguments,
51 ) -> RabbitMqClientHandler<BasicConsumerType> {
52 loop {
53 match Connection::open(&open_connection_arguments).await {
54 Ok(consumer_connection) => {
55 let (need_to_reload_sender, need_to_reload_receiver) =
56 mpsc::channel::<NeedToReload>(100);
57
58 let connection_callbacks =
59 ConnectionHandler::from(need_to_reload_sender.clone());
60
61 match consumer_connection.register_callback(connection_callbacks).await {
62 Ok(_) => {
63 debug!(
64 connection_name = consumer_connection.connection_name(),
65 "RabbitMqClientHandler creation successful",
66 );
67
68 return RabbitMqClientHandler {
69 need_to_reload_sender,
71 need_to_reload_receiver,
72 open_connection_arguments,
73 new_valid_channel_sender: None,
74 with_consumer: true,
76 consumer_connection,
77 expected_consumers: Vec::new(),
78 consume_channels: HashMap::new(),
79 with_publisher: false,
81 publisher_connection: None,
82 publish_channel: None,
83 data_to_publish_sender: None,
84 };
85 }
86 Err(error_content) => {
87 warn!("Failure to register callbacks : {:?}", error_content);
88 }
89 }
90 }
91 Err(error_content) => {
92 warn!(
93 "Failure to create RabbitMqClientHandler : {:?}",
94 error_content
95 );
96 }
97 }
98 sleep(Duration::from_secs(2)).await;
99 }
100 }
101
102 pub async fn try_reconnect(&mut self) -> Result<(), String> {
103 match Connection::open(&self.open_connection_arguments).await {
104 Ok(new_consumer_connection) => {
105 let connection_callbacks =
106 ConnectionHandler::from(self.need_to_reload_sender.clone());
107
108 match new_consumer_connection
109 .register_callback(connection_callbacks)
110 .await
111 {
112 Ok(_) => {
113 self.consumer_connection = new_consumer_connection;
114
115 debug!(
116 connection_name = self.consumer_connection.connection_name(),
117 "RabbitMqClientHandler creation successful"
118 );
119
120 Ok(())
121 }
122 Err(error_content) => Err(format!(
123 "Failure to register callbacks : {:?}",
124 error_content
125 )),
126 }
127 }
128 Err(error_content) => Err(format!(
129 "Failure to create RabbitMqClientHandler : {:?}",
130 error_content
131 )),
132 }
133 }
134
135 pub async fn create_publisher(&mut self) -> Result<PublishChannel, String> {
136 debug!(
137 connection_name = self.consumer_connection.connection_name(),
138 "Beginning to create a publisher based on RabbitMqClientHandler",
139 );
140
141 match self.consumer_connection.open_channel(None).await {
142 Ok(publish_channel) => {
143 let (data_to_publish_sender, data_to_publish_receiver) =
144 mpsc::channel::<DataToPublish>(100);
145 let (new_valid_channel_sender, new_valid_channel_receiver) =
146 mpsc::channel::<()>(10);
147
148 let publish_channel_handle = Arc::new(Mutex::new(publish_channel));
149
150 debug!("Publisher created and turned into Arc<Mutex>");
151 self.with_publisher = true;
152 self.publish_channel = Some(publish_channel_handle.clone());
153 self.data_to_publish_sender = Some(data_to_publish_sender);
154 self.new_valid_channel_sender = Some(new_valid_channel_sender);
155
156 Ok(PublishChannel::from(
157 publish_channel_handle,
158 new_valid_channel_receiver,
159 self.need_to_reload_sender.clone(),
160 data_to_publish_receiver,
161 ))
162 }
163 Err(error_content) => Err(format!("Failed to create publisher : {:?}", error_content)),
164 }
165 }
166
167 pub async fn new_consumer(
168 &mut self,
169 basic_consumer: BasicConsumerType,
170 basic_consumer_args: BasicConsumeArguments,
171 ) {
172 self.expected_consumers
173 .push((basic_consumer.clone(), basic_consumer_args.clone()));
174
175 loop {
176 match self
177 .create_consumer(basic_consumer.clone(), basic_consumer_args.clone())
178 .await
179 {
180 Ok(_) => {
181 break;
182 }
183 Err(error_content) => {
184 warn!("Failed to regenerate consumer : {:?}", error_content);
185 sleep(Duration::from_secs(2)).await;
186 }
187 }
188 }
189 }
190
191 pub async fn create_consumer(
192 &mut self,
193 basic_consumer: BasicConsumerType,
194 basic_consumer_args: BasicConsumeArguments,
195 ) -> Result<(), Error> {
196 debug!(
197 connection_name = self.consumer_connection.connection_name(),
198 "create_consumer : trying to create channel on connection"
199 );
200 if !self.consumer_connection.is_open() {
201 return Err(Error::ConnectionCloseError(
202 "Connection is closed.".to_string(),
203 ));
204 }
205
206 match self.consumer_connection.open_channel(None).await {
207 Ok(new_channel) => {
208 debug!(channel_id = new_channel.channel_id(), "new channel created");
209
210 match new_channel
211 .basic_consume(basic_consumer.clone(), basic_consumer_args.clone())
212 .await
213 {
214 Ok(consumer_tag) => {
215 debug!(
216 channel_id = new_channel.channel_id(),
217 consumer_tag, "Consumer created"
218 );
219
220 match new_channel
221 .register_callback(ChannelConsumerHandler::from(
222 basic_consumer.clone(),
223 basic_consumer_args.clone(),
224 self.need_to_reload_sender.clone(),
225 ))
226 .await
227 {
228 Ok(_) => {
229 info!(
230 channel_id = new_channel.channel_id(),
231 "Consumer created with callbacks : {:?}", basic_consumer_args
232 );
233
234 self.consume_channels.insert(
235 new_channel.channel_id(),
236 (new_channel, basic_consumer, basic_consumer_args),
237 );
238
239 Ok(())
240 }
241 Err(error_content) => Err(error_content),
242 }
243 }
244 Err(error_content) => Err(error_content),
245 }
246 }
247 Err(error_content) => Err(error_content),
248 }
249 }
250
251 pub fn get_publish_sender_clone(&self) -> Result<Sender<DataToPublish>, String> {
252 match &self.data_to_publish_sender {
253 Some(sender) => { Ok(sender.clone()) }
254 None => { Err("Called get_publish_sender_clone() on this RabbitMqClientHandler but no publish channel is initialized".to_string()) }
255 }
256 }
257
258 pub async fn keep_alive(&mut self) {
259 while let Some(need_to_reload_details) = self.need_to_reload_receiver.recv().await {
260 match need_to_reload_details {
261 NeedToReload::All => {
262 debug!("Need to reload whole connection and channels");
263 debug!(
264 "Old channels list : {}",
265 display_channels_list(&self.consume_channels)
266 );
267 self.consume_channels.clear();
268 'outer: loop {
269 match self.try_reconnect().await {
270 Ok(_) => {
271 for (basic_consumer, basic_consumer_args) in
272 self.expected_consumers.clone()
273 {
274 loop {
275 match self
276 .create_consumer(
277 basic_consumer.clone(),
278 basic_consumer_args.clone(),
279 )
280 .await
281 {
282 Ok(_) => {
283 break;
284 }
285 Err(Error::InternalChannelError(error_details)) => {
286 warn!(
287 "Failure to create consumer {:?} : {}",
288 basic_consumer_args, error_details
289 );
290 sleep(Duration::from_secs(2)).await;
291 continue;
292 }
293 Err(error_content) => {
294 debug!(
295 "Failure to create consumer : {:?}",
296 error_content
297 );
298 continue 'outer;
299 }
300 }
301 }
302 }
303
304 info!(
305 "Reload connection/channels/consumer ok: {}",
306 self.consumer_connection.connection_name()
307 );
308 if let Err(error_detail) = self
309 .need_to_reload_sender
310 .send(NeedToReload::PublishChannel)
311 .await
312 {
313 error!(
314 "Failed to send NeedToReload signal over channel : {:?}",
315 error_detail
316 )
317 }
318
319 break;
320 }
321 Err(error_content) => {
322 warn!(
323 "Failed to reload connection/channel/consumer : {:?}",
324 error_content
325 );
326 sleep(Duration::from_secs(2)).await;
327 }
328 }
329 }
330 }
331 NeedToReload::ConsumerChannel(channel_id) => {
332 debug!("Need to reload channel {}", channel_id);
333 debug!(
334 "Old channels list : {}",
335 display_channels_list(&self.consume_channels)
336 );
337
338 let (basic_consumer, basic_consumer_args) = match self
339 .consume_channels
340 .get(&channel_id)
341 {
342 Some((
343 _problematic_channel,
344 old_basic_consumer,
345 old_basic_consumer_args,
346 )) => (old_basic_consumer.clone(), old_basic_consumer_args.clone()),
347 None => {
348 error!(
349 "NeedToReload::ConsumerChannel({}) received but missing from cache. Reload all.",
350 channel_id
351 );
352 if let Err(error_detail) =
353 self.need_to_reload_sender.send(NeedToReload::All).await
354 {
355 error!(
356 "Failed to send NeedToReload signal over channel : {:?}",
357 error_detail
358 );
359 }
360 continue;
361 }
362 };
363
364 self.consume_channels.remove(&channel_id);
365
366 loop {
367 match &mut self
368 .create_consumer(basic_consumer.clone(), basic_consumer_args.clone())
369 .await
370 {
371 Ok(_) => {
372 info!("Reload consumer ok: {:?}", basic_consumer_args);
373 break;
374 }
375 Err(Error::InternalChannelError(error_details)) => {
376 warn!(
377 "Failure to create consumer {:?} : {}",
378 basic_consumer_args, error_details
379 );
380 sleep(Duration::from_secs(2)).await;
381 continue;
382 }
383 Err(_) => {
384 debug!("Need to reload whole connection an channels");
385 debug!(
386 "Old channels list : {}",
387 display_channels_list(&self.consume_channels)
388 );
389 self.consume_channels.clear();
390 'outer: loop {
391 match self.try_reconnect().await {
392 Ok(_) => {
393 for (basic_consumer, basic_consumer_args) in
394 self.expected_consumers.clone()
395 {
396 loop {
397 match self
398 .create_consumer(
399 basic_consumer.clone(),
400 basic_consumer_args.clone(),
401 )
402 .await
403 {
404 Ok(_) => {
405 break;
406 }
407 Err(Error::InternalChannelError(
408 error_details,
409 )) => {
410 warn!(
411 "Failure to create consumer {:?} : {}",
412 basic_consumer_args, error_details
413 );
414 sleep(Duration::from_secs(2)).await;
415 continue;
416 }
417 Err(error_content) => {
418 debug!(
419 "Failure to create consumer : {:?}",
420 error_content
421 );
422 continue 'outer;
423 }
424 }
425 }
426 }
427
428 info!(
429 "Reload connection/channels/consumer ok: {}",
430 self.consumer_connection.connection_name()
431 );
432 break;
433 }
434 Err(error_content) => {
435 warn!(
436 "Échec reload connection/channel/consumer : {:?}",
437 error_content
438 );
439 sleep(Duration::from_secs(2)).await;
440 }
441 }
442 }
443 }
444 }
445 }
446 }
447 NeedToReload::PublishChannel => {
448 if self.with_publisher {
449 loop {
450 match self.consumer_connection.open_channel(None).await {
451 Ok(new_publish_channel) => {
452 info!(
453 "Reload publisher ok on channel {}",
454 new_publish_channel.channel_id()
455 );
456
457 let publish_channel_clone =
458 self.publish_channel.clone().unwrap();
459 let mut publish_channel_handle =
460 publish_channel_clone.lock().await;
461 *publish_channel_handle = new_publish_channel;
462
463 self.new_valid_channel_sender
464 .clone()
465 .unwrap()
466 .send(())
467 .await
468 .unwrap();
469
470 break;
471 }
472 Err(Error::InternalChannelError(error_details)) => {
473 warn!("Failure to create PublishChannel : {}", error_details);
474 sleep(Duration::from_secs(2)).await;
475 continue;
476 }
477 Err(error_content) => {
478 debug!(
479 "Failure to create PublishChannel : {:?}. Need to reload everything.",
480 error_content
481 );
482 if let Err(error_detail) =
483 self.need_to_reload_sender.send(NeedToReload::All).await
484 {
485 error!(
486 "Failed to send NeedToReload signal over channel : {:?}",
487 error_detail
488 )
489 }
490 }
491 }
492 }
493 }
494 }
495 }
496
497 debug!(
498 "New channels list : {}",
499 display_channels_list(&self.consume_channels)
500 );
501 }
502 }
503}
504
505#[derive(Clone)]
507pub struct ConnectionHandler {
508 need_to_reload_sender: Sender<NeedToReload>,
509}
510
511impl ConnectionHandler {
512 pub fn from(need_to_reload_sender: Sender<NeedToReload>) -> ConnectionHandler {
513 ConnectionHandler {
514 need_to_reload_sender,
515 }
516 }
517}
518
519#[async_trait]
520impl ConnectionCallback for ConnectionHandler {
521 async fn close(&mut self, connection: &Connection, close: Close) -> Result<(), Error> {
522 warn!(
523 "End of connection received from broker : connection {}, cause: {}",
524 connection, close
525 );
526
527 loop {
528 match self.need_to_reload_sender.send(NeedToReload::All).await {
529 Ok(_) => {
530 sleep(Duration::from_secs(1)).await;
531 break;
532 }
533 Err(error_content) => {
534 warn!(
535 "Failed to send need_new_connection MPSC signal : {:?}",
536 error_content
537 );
538 sleep(Duration::from_secs(5)).await;
539 }
540 }
541 }
542
543 Ok(())
544 }
545
546 async fn blocked(&mut self, connection: &Connection, reason: String) {
547 warn!(
548 "Blocked connection received from broker : connection {}, cause: {}",
549 connection, reason
550 );
551
552 loop {
553 match self.need_to_reload_sender.send(NeedToReload::All).await {
554 Ok(_) => {
555 sleep(Duration::from_secs(1)).await;
556 break;
557 }
558 Err(error_content) => {
559 warn!(
560 "Failed to send need_new_connection MPSC signal : {:?}",
561 error_content
562 );
563 sleep(Duration::from_secs(5)).await;
564 }
565 }
566 }
567 }
568
569 async fn unblocked(&mut self, connection: &Connection) {
570 warn!(
571 "Unblocked connection received from broker : connection {}",
572 connection
573 );
574 }
575
576 async fn secret_updated(&mut self, connection: &Connection) {
577 warn!("secret_updated : connection {}", connection);
578
579 loop {
580 match self.need_to_reload_sender.send(NeedToReload::All).await {
581 Ok(_) => {
582 sleep(Duration::from_secs(1)).await;
583 break;
584 }
585 Err(error_content) => {
586 warn!(
587 "Failed to send need_new_connection MPSC signal : {:?}",
588 error_content
589 );
590 sleep(Duration::from_secs(5)).await;
591 }
592 }
593 }
594 }
595}
596
597#[derive(Debug, Clone)]
599pub enum NeedToReload {
600 All,
601 ConsumerChannel(AmqpChannelId),
602 PublishChannel,
603}
604
605fn display_channels_list<BasicConsumerType: AsyncConsumer + Clone + Send + 'static>(
607 channels: &HashMap<AmqpChannelId, (Channel, BasicConsumerType, BasicConsumeArguments)>,
608) -> String {
609 let mut list = String::new();
610 for channel_id in channels.keys() {
611 list.push_str(&format!("{} ", channel_id));
612 }
613 list
614}