amqppatternwrapper/
lib.rs1pub mod amqp_pattern_wrapper {
2 use amiquip::{Channel, Connection, QueueDeclareOptions, Delivery, ConsumerOptions, ConsumerMessage, Publish};
3
4 pub struct ConnectedBroker {
5 connection: Connection,
6 channel: Channel,
7 queue_type: String,
8 queue_name: String,
9 init_complete: bool,
10 }
11
12 impl ConnectedBroker {
13 pub fn connect(addr: String) -> ConnectedBroker {
14 let mut connection = Connection::insecure_open(&addr.as_str()).expect("connection to receive mq");
15 let channel = connection.open_channel(None).expect(format!("failed to open channel for connection: {}", addr).as_str());
16
17 ConnectedBroker {
18 connection,
19 channel,
20 queue_type: String::new(),
21 queue_name: String::new(),
22 init_complete: false,
23 }
24 }
25
26 pub fn get_queue_name(&self) -> &String {
27 &self.queue_name
28 }
29
30 pub fn get_channel(&self) -> &Channel {
31 &self.channel
32 }
33
34 pub fn close(self) {
35 self.connection.close().expect("Closing Connection");
36 }
37
38 pub fn declare_work_queue(&mut self, queue_name: String) {
39 let mut options = QueueDeclareOptions::default();
40 options.durable = true;
41 let queue = Box::new(self.channel.queue_declare(&queue_name, options).expect(""));
42
43 self.queue_name = String::from(queue.name());
44 self.queue_type = String::from("work_queue");
45 self.init_complete = true;
46 }
47
48 pub fn consume<F: Fn(Delivery)>(&self, handler: F) {
49 if self.init_complete != true {
50 println!("The queue connection is not configured correctly yet");
51 return;
52 }
53 let consumer = self.channel.basic_consume(&self.queue_name, ConsumerOptions::default()).expect("create consumer");
54
55 for (_, message) in consumer.receiver().iter().enumerate() {
56 match message {
57 ConsumerMessage::Delivery(d) => {
58 handler(d)
59 }
60 other => {
61 println!("consume ended: {:?}", other)
62 }
63 }
64 }
65 }
66
67 pub fn publish(&self, message: Publish) -> Result<(), amiquip::Error> {
68 self.channel.basic_publish("", message)
69 }
70 }
71}