pub mod amqp_pattern_wrapper {
use amiquip::{Channel, Connection, QueueDeclareOptions, Delivery, ConsumerOptions, ConsumerMessage, Publish};
pub struct ConnectedBroker {
connection: Connection,
channel: Channel,
queue_type: String,
queue_name: String,
init_complete: bool,
}
impl ConnectedBroker {
pub fn connect(addr: String) -> ConnectedBroker {
let mut connection = Connection::insecure_open(&addr.as_str()).expect("connection to receive mq");
let channel = connection.open_channel(None).expect(format!("failed to open channel for connection: {}", addr).as_str());
ConnectedBroker {
connection,
channel,
queue_type: String::new(),
queue_name: String::new(),
init_complete: false,
}
}
pub fn get_queue_name(&self) -> &String {
&self.queue_name
}
pub fn get_channel(&self) -> &Channel {
&self.channel
}
pub fn close(self) {
self.connection.close().expect("Closing Connection");
}
pub fn declare_work_queue(&mut self, queue_name: String) {
let mut options = QueueDeclareOptions::default();
options.durable = true;
let queue = Box::new(self.channel.queue_declare(&queue_name, options).expect(""));
self.queue_name = String::from(queue.name());
self.queue_type = String::from("work_queue");
self.init_complete = true;
}
pub fn consume<F: Fn(Delivery)>(&self, handler: F) {
if self.init_complete != true {
println!("The queue connection is not configured correctly yet");
return;
}
let consumer = self.channel.basic_consume(&self.queue_name, ConsumerOptions::default()).expect("create consumer");
for (_, message) in consumer.receiver().iter().enumerate() {
match message {
ConsumerMessage::Delivery(d) => {
handler(d)
}
other => {
println!("consume ended: {:?}", other)
}
}
}
}
pub fn publish(&self, message: Publish) -> Result<(), amiquip::Error> {
self.channel.basic_publish("", message)
}
}
}