rabbit_auto/
exchanges.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use lapin::{Channel, ExchangeKind};
5pub use lapin::options::{ExchangeDeclareOptions, ExchangeDeleteOptions};
6use lapin::types::FieldTable;
7
8pub type DeclareExchange = Pin<Box<dyn Fn(Arc<Channel>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> + Send + Sync>>;
9
10pub fn create_exchange(exchange: String, kind: ExchangeKind, options: Option<ExchangeDeclareOptions>, arguments: Option<FieldTable> ) -> DeclareExchange {
11      Box::pin(move | channel|{
12        let exchange = exchange.clone();
13        let arguments = arguments.clone();
14        let options = options.clone();
15          let kind = kind.clone();
16        Box::pin(async move {
17            let backup_arguments = arguments.clone();
18            let backup_options = options.clone();
19            if let Err(err) = channel.exchange_declare(&exchange, kind.clone(), options.unwrap_or_else(|| ExchangeDeclareOptions::default()), arguments.unwrap_or_else(|| FieldTable::default())).await {
20                log::error!("Failed to declare exchange: {}", err);
21                log::warn!("Deleting the old one and creating a new one");
22
23                match channel.exchange_delete(&exchange, ExchangeDeleteOptions { if_unused: false, nowait: true}).await {
24                    Ok(()) => {
25                        if let Err(err) = channel.exchange_declare(&exchange, kind.clone(), backup_options.unwrap_or_else(|| ExchangeDeclareOptions::default()), backup_arguments.unwrap_or_else(|| FieldTable::default())).await {
26                            log::error!("Failed to declare exchange (Exiting): {}", err);
27                            std::process::exit(1);
28                        }
29                    }
30                    Err(err) => {
31                        log::error!("Cannot delete the exchange to declare a new one (Exiting): {}", err);
32                        std::process::exit(1);
33                    }
34                }
35            }
36            Ok(())
37        })
38    })
39}
40
41
42pub fn create_direct_exchange(exchange: String, options: Option<ExchangeDeclareOptions>, arguments: Option<FieldTable> ) -> DeclareExchange {
43    create_exchange(exchange, ExchangeKind::Direct, options, arguments)
44}
45
46pub fn create_topic_exchange(exchange: String, options: Option<ExchangeDeclareOptions>, arguments: Option<FieldTable> ) -> DeclareExchange {
47    create_exchange(exchange, ExchangeKind::Topic, options, arguments)
48}