// amqppatternwrapper/lib.rs

1pub mod amqp_pattern_wrapper {
2    use amiquip::{Channel, Connection, QueueDeclareOptions, Delivery, ConsumerOptions, ConsumerMessage, Publish};
3
4    pub struct ConnectedBroker {
5        connection:  Connection,
6        channel: Channel,
7        queue_type: String,
8        queue_name: String,
9        init_complete: bool,
10    }
11
12    impl ConnectedBroker {
13        pub fn connect(addr: String) -> ConnectedBroker {
14            let mut connection = Connection::insecure_open(&addr.as_str()).expect("connection to receive mq");
15            let channel = connection.open_channel(None).expect(format!("failed to open channel for connection: {}", addr).as_str());
16
17            ConnectedBroker {
18                connection,
19                channel,
20                queue_type: String::new(),
21                queue_name: String::new(),
22                init_complete: false,
23            }
24        }
25
26        pub fn get_queue_name(&self) -> &String {
27            &self.queue_name
28        }
29
30        pub fn get_channel(&self) -> &Channel {
31            &self.channel
32        }
33        
34        pub fn close(self) {
35            self.connection.close().expect("Closing Connection");
36        }
37
38        pub fn declare_work_queue(&mut self, queue_name: String) {
39            let mut options = QueueDeclareOptions::default();
40            options.durable = true;
41            let queue = Box::new(self.channel.queue_declare(&queue_name, options).expect(""));
42
43            self.queue_name = String::from(queue.name());
44            self.queue_type = String::from("work_queue");
45            self.init_complete = true;
46        }
47
48        pub fn consume<F: Fn(Delivery)>(&self, handler: F) {
49            if self.init_complete != true {
50                println!("The queue connection is not configured correctly yet");
51                return;
52            }
53            let consumer = self.channel.basic_consume(&self.queue_name, ConsumerOptions::default()).expect("create consumer");
54
55            for (_, message) in consumer.receiver().iter().enumerate() {
56                match message {
57                    ConsumerMessage::Delivery(d) => {
58                        handler(d)
59                    }
60                    other => {
61                        println!("consume ended: {:?}", other)
62                    }
63                }
64            }
65        }
66
67        pub fn publish(&self, message: Publish) -> Result<(), amiquip::Error> {
68            self.channel.basic_publish("", message)
69        }
70    }
71}