1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
use eventific::notification::{Listener, NotificationError}; use slog::Logger; use futures::{Future, Stream}; use uuid::Uuid; use lapin_futures::{Client, ConnectionProperties, Consumer, Channel, Queue}; use std::sync::{RwLock, Arc}; use lapin_futures::options::QueueDeclareOptions; use lapin_futures::options::QueueBindOptions; use lapin_futures::options::BasicConsumeOptions; use lapin_futures::types::FieldTable; use lapin_futures::message::Delivery; use std::process; pub struct RabbitMqListener { amqp_address: String, queue_postfix: String, logger: Option<Logger>, client: Option<Client>, queue_name: Option<String> } impl RabbitMqListener { pub fn new(amqp_address: &str, queue_postfix: &str) -> Self { Self { amqp_address: amqp_address.to_owned(), queue_postfix: queue_postfix.to_owned(), logger: None, client: None, queue_name: None } } fn create_channel(logger: &Logger, client: &Client) -> impl Future<Item=(Logger, Channel), Error=NotificationError> { let log = logger.clone(); let err_log = logger.clone(); info!(log, "Establishing a new channel to 🐰 RabbitMq"); client.create_channel() .map_err(move |err| { error!(err_log, "Failed to establish a channel to 🐰 RabbitMQ"; "error" => format!("{}", err)); NotificationError::Unknown(format_err!("{}", err)) }) .map(move |c| (log, c)) } fn create_queue((logger, channel): (Logger, Channel), queue_name: &str) -> impl Future<Item=(Logger, Channel, Queue), Error=NotificationError> { let options = QueueDeclareOptions { durable: false, ..Default::default() }; let err_log = logger.clone(); info!(logger, "Declaring queue {}", queue_name); channel.queue_declare(&queue_name, options, FieldTable::default()) .map_err(move |err| { error!(err_log, "Failed to create queue in 🐰 RabbitMQ"; "error" => format!("{}", err)); NotificationError::Unknown(format_err!("{}", err)) }) .and_then(move |queue| { Ok((logger, channel, queue)) }) } fn consume_queue((logger, channel, queue): (Logger, Channel, Queue)) -> impl Stream<Item=Delivery, Error=NotificationError> { info!(logger, "Starting to tail queue"); let err_log = logger.clone(); let options = BasicConsumeOptions { no_ack: true, no_local: true, nowait: false, exclusive: false }; channel.basic_consume(&queue, "eventific", options, FieldTable::default()) .map_err(move |err| { error!(err_log, "Failed to tail rabbit queue"; "error" => format!("{}", err)); NotificationError::Unknown(format_err!("{}", err)) }) .and_then(move |consumer| { info!(logger, "Successfully started listening to queue"); Ok(consumer.map_err(|err| NotificationError::FailedToListen(format_err!("{}", err)))) }) .into_stream() .flatten() } } impl Listener for RabbitMqListener { fn init(&mut self, logger: &Logger, service_name: &str) -> Box<dyn Future<Item=(), Error=NotificationError> + Send> { self.logger.replace(logger.new(o!("listener" => "rabbitmq"))); self.queue_name.replace(format!("{}-{}", service_name, self.queue_postfix)); let log = self.logger.as_ref().unwrap(); info!(log, "Initializing 🐰 RabbitMQ listener"); match Client::connect(&self.amqp_address, ConnectionProperties::default()).wait() { Ok(client) => { client.on_error(Box::new(|| { eprintln!("Rabbitmq Error"); eprintln!("Shutting down eventific..."); process::exit(1); })); info!(log, "Succesfully initialized 🐰 RabbitMQ listener"); self.client.replace(client); Box::new(futures::finished(())) }, Err(err) => { error!(log, "Failed to initialize 🐰 RabbitMQ listener"); Box::new(futures::failed(NotificationError::Unknown(format_err!("{}", err)))) }, } } fn listen(&self) -> Box<dyn Stream<Item=Uuid, Error=NotificationError> + Send> { let client = self.client.as_ref().expect("The listener has to be initialized"); let logger = self.logger.as_ref().unwrap().clone(); let queue_name = self.queue_name.clone().expect("The listener has to be initialized"); info!(logger, "Starting subscription to rabbit queue"); Box::new( Self::create_channel(&logger, &client) .and_then(move |p| Self::create_queue(p, &queue_name)) .map(Self::consume_queue) .into_stream() .flatten() .and_then(move |delivery| { match Uuid::from_slice(&delivery.data) { Ok(uuid) => { info!(logger, "Successfully parsed uuid"; "uuid" => format!("{}", uuid)); Ok(uuid) }, Err(err) => { warn!(logger, "Failed to parse UUID"; "error" => format!("{}", err)); Ok(Uuid::nil()) } } }) .filter(|uuid| { !uuid.is_nil() }) ) } }