my_awesome_rabbitmq_lib/
start.rs

1use crate::emitter::Emitter;
2use crate::events::{ MicroserviceEvent};
3use crate::queue_consumer_props::{Exchange, QueueConsumerProps};
4use crate::saga::{CommandHandler, StepCommand};
5use tracing::error;
6use crate::connection::{RabbitMQClient, RabbitMQError};
7use crate::events_consume::EventHandler;
8pub(crate) type EventEmitter = Emitter<EventHandler, MicroserviceEvent>;
9pub(crate) type SagaEmitter = Emitter<CommandHandler, StepCommand>;
10
11impl RabbitMQClient {
12    pub async fn connect_to_events(
13        &self,
14    ) -> Result<EventEmitter, RabbitMQError> {
15        let queue_name = self.events_queue_name.clone();
16        self.create_header_consumers(&queue_name, self.events)
17            .await?;
18        let emitter = self.start_consuming_events().await;
19
20        Ok(emitter)
21    }
22
23    pub(crate) async fn start_consuming_events(&self) -> EventEmitter {
24        let mut emitter_guard = self.event_emitter.lock().await;
25        let emitter = emitter_guard.get_or_insert_with(Emitter::new).clone();
26
27        tokio::spawn({
28            let client = self.clone();
29            let queue_name = self.events_queue_name.clone();
30            let emitter = emitter.clone();
31
32            async move {
33                if let Err(e) = client.consume_events(&queue_name, emitter).await {
34                    error!("Error consuming messages: {:?}", e);
35                }
36            }
37        });
38
39        emitter
40    }
41
42    pub async fn connect_to_saga_commands(
43        &self,
44    ) -> Result<SagaEmitter, RabbitMQError> {
45        let queue_name = self.saga_queue_name.clone();
46
47        self.create_consumers(vec![QueueConsumerProps {
48            queue_name,
49            exchange: Exchange::COMMANDS,
50        }])
51        .await?;
52
53        let emitter = self.start_consuming_saga_commands().await;
54
55        Ok(emitter)
56    }
57
58    pub(crate) async fn start_consuming_saga_commands(&self) -> SagaEmitter {
59        let mut emitter_guard = self.saga_emitter.lock().await;
60        let emitter = emitter_guard.get_or_insert_with(Emitter::new).clone();
61
62        tokio::spawn({
63            let client = self.clone();
64            let queue_name = self.saga_queue_name.clone();
65            let emitter = emitter.clone();
66
67            async move {
68                if let Err(e) = client.consume_saga_steps(&queue_name, emitter).await {
69                    error!("Error consuming messages: {:?}", e);
70                }
71            }
72        });
73
74        emitter
75    }
76}