1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
pub mod amqp_pattern_wrapper { use amiquip::{Queue, Channel, Connection, QueueDeclareOptions, Delivery, ConsumerOptions, ConsumerMessage}; use std::borrow::Borrow; pub struct ConnectedBroker { connection: Box<Connection>, channel: Box<Channel>, queue_type: String, queue_name: String, init_complete: bool, } impl ConnectedBroker { pub fn connect(addr: String) -> ConnectedBroker { let mut connection = Box::new(Connection::insecure_open(&addr.as_str()).expect("connection to receive mq")); let channel = Box::new(connection.open_channel(None).expect(format!("failed to open channel for connection: {}", addr).as_str())); ConnectedBroker { connection, channel, queue_type: String::new(), queue_name: String::new(), init_complete: false, } } pub fn close (mut self) { self.connection.close(); } pub fn declare_work_queue(&mut self, queue_name: String) { let mut options = QueueDeclareOptions::default(); options.durable = true; let queue = Box::new(self.channel.queue_declare(&queue_name, options).expect("")); self.queue_name = String::from(queue.name()); } pub fn consume<F: Fn(Delivery)>(&self, handler: F) { let consumer = self.channel.basic_consume(&self.queue_name, ConsumerOptions::default()).expect("create consumer"); for (_, message) in consumer.receiver().iter().enumerate() { match message { ConsumerMessage::Delivery(d) => { handler(d) } other => { println!("consume ended: {:?}", other) } } } } } }