1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use lapin::{Channel, ExchangeKind};
5pub use lapin::options::{ExchangeDeclareOptions, ExchangeDeleteOptions};
6use lapin::types::FieldTable;
7
8pub type DeclareExchange = Pin<Box<dyn Fn(Arc<Channel>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> + Send + Sync>>;
9
10pub fn create_exchange(exchange: String, kind: ExchangeKind, options: Option<ExchangeDeclareOptions>, arguments: Option<FieldTable> ) -> DeclareExchange {
11 Box::pin(move | channel|{
12 let exchange = exchange.clone();
13 let arguments = arguments.clone();
14 let options = options.clone();
15 let kind = kind.clone();
16 Box::pin(async move {
17 let backup_arguments = arguments.clone();
18 let backup_options = options.clone();
19 if let Err(err) = channel.exchange_declare(&exchange, kind.clone(), options.unwrap_or_else(|| ExchangeDeclareOptions::default()), arguments.unwrap_or_else(|| FieldTable::default())).await {
20 log::error!("Failed to declare exchange: {}", err);
21 log::warn!("Deleting the old one and creating a new one");
22
23 match channel.exchange_delete(&exchange, ExchangeDeleteOptions { if_unused: false, nowait: true}).await {
24 Ok(()) => {
25 if let Err(err) = channel.exchange_declare(&exchange, kind.clone(), backup_options.unwrap_or_else(|| ExchangeDeclareOptions::default()), backup_arguments.unwrap_or_else(|| FieldTable::default())).await {
26 log::error!("Failed to declare exchange (Exiting): {}", err);
27 std::process::exit(1);
28 }
29 }
30 Err(err) => {
31 log::error!("Cannot delete the exchange to declare a new one (Exiting): {}", err);
32 std::process::exit(1);
33 }
34 }
35 }
36 Ok(())
37 })
38 })
39}
40
41
42pub fn create_direct_exchange(exchange: String, options: Option<ExchangeDeclareOptions>, arguments: Option<FieldTable> ) -> DeclareExchange {
43 create_exchange(exchange, ExchangeKind::Direct, options, arguments)
44}
45
46pub fn create_topic_exchange(exchange: String, options: Option<ExchangeDeclareOptions>, arguments: Option<FieldTable> ) -> DeclareExchange {
47 create_exchange(exchange, ExchangeKind::Topic, options, arguments)
48}