1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
pub mod amqp_pattern_wrapper {
    use amiquip::{Channel, Connection, QueueDeclareOptions, Delivery, ConsumerOptions, ConsumerMessage, Publish};

    pub struct ConnectedBroker {
        connection:  Connection,
        channel: Channel,
        queue_type: String,
        queue_name: String,
        init_complete: bool,
    }

    impl ConnectedBroker {
        pub fn connect(addr: String) -> ConnectedBroker {
            let mut connection = Connection::insecure_open(&addr.as_str()).expect("connection to receive mq");
            let channel = connection.open_channel(None).expect(format!("failed to open channel for connection: {}", addr).as_str());

            ConnectedBroker {
                connection,
                channel,
                queue_type: String::new(),
                queue_name: String::new(),
                init_complete: false,
            }
        }

        pub fn get_queue_name(&self) -> &String {
            &self.queue_name
        }

        pub fn get_channel(&self) -> &Channel {
            &self.channel
        }
        
        pub fn close(self) {
            self.connection.close().expect("Closing Connection");
        }

        pub fn declare_work_queue(&mut self, queue_name: String) {
            let mut options = QueueDeclareOptions::default();
            options.durable = true;
            let queue = Box::new(self.channel.queue_declare(&queue_name, options).expect(""));

            self.queue_name = String::from(queue.name());
            self.queue_type = String::from("work_queue");
            self.init_complete = true;
        }

        pub fn consume<F: Fn(Delivery)>(&self, handler: F) {
            if self.init_complete != true {
                println!("The queue connection is not configured correctly yet");
                return;
            }
            let consumer = self.channel.basic_consume(&self.queue_name, ConsumerOptions::default()).expect("create consumer");

            for (_, message) in consumer.receiver().iter().enumerate() {
                match message {
                    ConsumerMessage::Delivery(d) => {
                        handler(d)
                    }
                    other => {
                        println!("consume ended: {:?}", other)
                    }
                }
            }
        }

        pub fn publish(&self, message: Publish) -> Result<(), amiquip::Error> {
            self.channel.basic_publish("", message)
        }
    }
}