1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
pub mod amqp_pattern_wrapper { use amiquip::{Channel, Connection, QueueDeclareOptions, Delivery, ConsumerOptions, ConsumerMessage, Publish}; pub struct ConnectedBroker { connection: Connection, channel: Channel, queue_type: String, queue_name: String, init_complete: bool, } impl ConnectedBroker { pub fn connect(addr: String) -> ConnectedBroker { let mut connection = Connection::insecure_open(&addr.as_str()).expect("connection to receive mq"); let channel = connection.open_channel(None).expect(format!("failed to open channel for connection: {}", addr).as_str()); ConnectedBroker { connection, channel, queue_type: String::new(), queue_name: String::new(), init_complete: false, } } pub fn get_queue_name(&self) -> &String { &self.queue_name } pub fn get_channel(&self) -> &Channel { &self.channel } pub fn close(self) { self.connection.close().expect("Closing Connection"); } pub fn declare_work_queue(&mut self, queue_name: String) { let mut options = QueueDeclareOptions::default(); options.durable = true; let queue = Box::new(self.channel.queue_declare(&queue_name, options).expect("")); self.queue_name = String::from(queue.name()); self.queue_type = String::from("work_queue"); self.init_complete = true; } pub fn consume<F: Fn(Delivery)>(&self, handler: F) { if self.init_complete != true { println!("The queue connection is not configured correctly yet"); return; } let consumer = self.channel.basic_consume(&self.queue_name, ConsumerOptions::default()).expect("create consumer"); for (_, message) in consumer.receiver().iter().enumerate() { match message { ConsumerMessage::Delivery(d) => { handler(d) } other => { println!("consume ended: {:?}", other) } } } } pub fn publish(&self, message: Publish) -> Result<(), amiquip::Error> { self.channel.basic_publish("", message) } } }