Skip to main content

multiple_kinds_of_consumers/
multiple_kinds_of_consumers.rs

1use resilient_rabbitmq_client::prelude::*;
2
3// First we declare all of our consumer types, with their respective implementations of the AsyncConsumer trait.
4#[derive(Clone)]
5pub struct FirstKindOfRabbitMQClient;
6
7#[async_trait]
8impl AsyncConsumer for FirstKindOfRabbitMQClient {
9    #[allow(unused_variables)]
10    async fn consume(
11        &mut self,
12        channel: &Channel,
13        deliver: Deliver,
14        basic_properties: BasicProperties,
15        content: Vec<u8>,
16    ) {
17        // Let's say who we are and print the data, then acknowledge the message.
18        println!(
19            "FirstKindOfRabbitMQClient -> Message received : {}",
20            String::from_utf8_lossy(&content).to_string()
21        );
22
23        let args = BasicAckArguments::new(deliver.delivery_tag(), false);
24        channel.basic_ack(args).await.unwrap();
25    }
26}
27
28#[derive(Clone)]
29pub struct SecondKindOfRabbitMQClient;
30
31#[async_trait]
32impl AsyncConsumer for SecondKindOfRabbitMQClient {
33    #[allow(unused_variables)]
34    async fn consume(
35        &mut self,
36        channel: &Channel,
37        deliver: Deliver,
38        basic_properties: BasicProperties,
39        content: Vec<u8>,
40    ) {
41        // Let's do almost the same thing as the first kind of consumer : print then ack
42        println!(
43            "SecondKindOfRabbitMQClient -> Message received : {}",
44            String::from_utf8_lossy(&content).to_string()
45        );
46
47        let args = BasicAckArguments::new(deliver.delivery_tag(), false);
48        channel.basic_ack(args).await.unwrap();
49    }
50}
51
52// Then we proceed with some polymorphism :
53//  - declare an enum which consists of one or the other kind of consumer
54//  - have it implement the AsyncConsumer trait
55//  - pass this type to the RabbitMqClientHandler
56#[derive(Clone)]
57enum RabbitMQClient {
58    FirstKind(FirstKindOfRabbitMQClient),
59    SecondKind(SecondKindOfRabbitMQClient),
60}
61
62#[async_trait]
63impl AsyncConsumer for RabbitMQClient {
64    async fn consume(
65        &mut self,
66        channel: &Channel,
67        deliver: Deliver,
68        basic_properties: BasicProperties,
69        content: Vec<u8>,
70    ) {
71        match self {
72            RabbitMQClient::FirstKind(first_kind_rabbitmq_client) => {
73                first_kind_rabbitmq_client
74                    .consume(channel, deliver, basic_properties, content)
75                    .await
76            }
77            RabbitMQClient::SecondKind(second_kind_rabbitmq_client) => {
78                second_kind_rabbitmq_client
79                    .consume(channel, deliver, basic_properties, content)
80                    .await
81            }
82        }
83    }
84}
85
86// We can finally create our main function, declare our consumers and keep all of them alive at once.
87#[tokio::main]
88async fn main() {
89    // Declare a RabbitMqClientHandler
90    let rabbitmq_connection_arguments =
91        ConnectionArguments::new("localhost", 5672, "guest", "guest");
92    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95    // First kind of client : consumes from a specific queue a given data format
96    let basic_consumer_first_kind_args =
97        ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98    let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99    resilient_rabbitmq_connection
100        .new_consumer(
101            RabbitMQClient::FirstKind(basic_consumer_first_kind),
102            basic_consumer_first_kind_args,
103        )
104        .await;
105
106    // Second kind of client : consumes from another queue another kind of data
107    let basic_consumer_second_kind_args =
108        ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109    let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110    resilient_rabbitmq_connection
111        .new_consumer(
112            RabbitMQClient::SecondKind(basic_consumer_second_kind),
113            basic_consumer_second_kind_args,
114        )
115        .await;
116
117    // Keep alive the RabbitMqClientHandler
118    resilient_rabbitmq_connection.keep_alive().await;
119}