1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
use eventific::notification::{Sender, NotificationError}; use slog::Logger; use futures::Future; use uuid::Uuid; use std::sync::{RwLock, Arc}; use lapin_futures::{Channel, ConnectionProperties, Client, BasicProperties}; use lapin_futures::options::ExchangeDeclareOptions; use lapin_futures::options::BasicPublishOptions; use lapin_futures::types::FieldTable; use std::process; pub struct RabbitMqSender { amqp_address: String, exchange_name: Option<String>, logger: Option<Logger>, client: Option<Client>, } impl RabbitMqSender { pub fn new(amqp_address: &str) -> Self { Self { amqp_address: amqp_address.to_owned(), logger: None, exchange_name: None, client: None } } } impl Sender for RabbitMqSender { fn init(&mut self, logger: &Logger, service_name: &str) -> Box<dyn Future<Item=(), Error=NotificationError> + Send> { self.logger.replace(logger.new(o!("sender" => "rabbitmq"))); let exchange_name = service_name.to_owned(); self.exchange_name.replace(exchange_name.to_owned()); let log = logger.clone(); let log2 = logger.clone(); info!(log, "Initializing new 🐰 RabbitMq Sender!"); match Client::connect(&self.amqp_address, ConnectionProperties::default()).wait() { Ok(client) => { client.on_error(Box::new(|| { eprintln!("Rabbitmq Error"); eprintln!("Shutting down eventific..."); process::exit(1); })); info!(log, "Successfully initialized new 🐰 RabbitMq Sender!"); self.client.replace(client); Box::new(futures::finished(())) }, Err(err) => { error!(log, "Failed to initialize 🐰 RabbitMQ sender"); Box::new(futures::failed(NotificationError::Unknown(format_err!("{}", err)))) }, } } fn send(&self, aggregate_id: Uuid) -> Box<dyn Future<Item=(), Error=NotificationError> + Send> { let client = self.client.as_ref().expect("The listener has to be initialized"); let logger = self.logger.as_ref().unwrap().clone(); let err_logger = logger.clone(); let exchange_name = self.exchange_name.clone().expect("The listener has to be initialized"); info!(logger, "Sending notification to rabbit exchange"; "uuid" => format!("{}", &aggregate_id)); Box::new(client.create_channel() .map_err(move |err| { error!(err_logger, "Failed to open channel to rabbit"; "error" => format!("{}", err)); NotificationError::FailedToSend(format_err!("{}", err)) }) .and_then(move |channel| { let payload = aggregate_id.as_bytes().to_vec(); let options = BasicPublishOptions::default(); let properties = BasicProperties::default(); let err_log = logger.clone(); channel.basic_publish(&exchange_name, "", payload, options, properties) .map_err(move |err| { error!(err_log, "Failed to send message to rabbit exchange"; "error" => format!("{}", err)); NotificationError::FailedToSend(format_err!("{}", err)) }) .map(move |_| { info!(logger, "Successfully sent message to rabbit exchange"); () }) })) } }