my_awesome_rabbitmq_lib/
start.rs1use crate::emitter::Emitter;
2use crate::events::{ MicroserviceEvent};
3use crate::queue_consumer_props::{Exchange, QueueConsumerProps};
4use crate::saga::{CommandHandler, StepCommand};
5use tracing::error;
6use crate::connection::{RabbitMQClient, RabbitMQError};
7use crate::events_consume::EventHandler;
/// `Emitter` instantiated with `EventHandler` and `MicroserviceEvent`; this is
/// the handle type returned by `connect_to_events`.
pub(crate) type EventEmitter = Emitter<EventHandler, MicroserviceEvent>;
/// `Emitter` instantiated with `CommandHandler` and `StepCommand`; this is the
/// handle type returned by `connect_to_saga_commands`.
pub(crate) type SagaEmitter = Emitter<CommandHandler, StepCommand>;
10
11impl RabbitMQClient {
12 pub async fn connect_to_events(
13 &self,
14 ) -> Result<EventEmitter, RabbitMQError> {
15 let queue_name = self.events_queue_name.clone();
16 self.create_header_consumers(&queue_name, self.events)
17 .await?;
18 let emitter = self.start_consuming_events().await;
19
20 Ok(emitter)
21 }
22
23 pub(crate) async fn start_consuming_events(&self) -> EventEmitter {
24 let mut emitter_guard = self.event_emitter.lock().await;
25 let emitter = emitter_guard.get_or_insert_with(Emitter::new).clone();
26
27 tokio::spawn({
28 let client = self.clone();
29 let queue_name = self.events_queue_name.clone();
30 let emitter = emitter.clone();
31
32 async move {
33 if let Err(e) = client.consume_events(&queue_name, emitter).await {
34 error!("Error consuming messages: {:?}", e);
35 }
36 }
37 });
38
39 emitter
40 }
41
42 pub async fn connect_to_saga_commands(
43 &self,
44 ) -> Result<SagaEmitter, RabbitMQError> {
45 let queue_name = self.saga_queue_name.clone();
46
47 self.create_consumers(vec![QueueConsumerProps {
48 queue_name,
49 exchange: Exchange::COMMANDS,
50 }])
51 .await?;
52
53 let emitter = self.start_consuming_saga_commands().await;
54
55 Ok(emitter)
56 }
57
58 pub(crate) async fn start_consuming_saga_commands(&self) -> SagaEmitter {
59 let mut emitter_guard = self.saga_emitter.lock().await;
60 let emitter = emitter_guard.get_or_insert_with(Emitter::new).clone();
61
62 tokio::spawn({
63 let client = self.clone();
64 let queue_name = self.saga_queue_name.clone();
65 let emitter = emitter.clone();
66
67 async move {
68 if let Err(e) = client.consume_saga_steps(&queue_name, emitter).await {
69 error!("Error consuming messages: {:?}", e);
70 }
71 }
72 });
73
74 emitter
75 }
76}