legend-saga 0.0.5

A Rust library for working with RabbitMQ and asynchronous operations
Documentation
use crate::emitter::Emitter;
use crate::events::{ MicroserviceEvent};
use crate::queue_consumer_props::{Exchange, QueueConsumerProps};
use crate::saga::{CommandHandler, StepCommand};
use tracing::error;
use crate::connection::{RabbitMQClient, RabbitMQError};
use crate::events_consume::EventHandler;
pub(crate) type EventEmitter = Emitter<EventHandler, MicroserviceEvent>;
pub(crate) type SagaEmitter = Emitter<CommandHandler, StepCommand>;

impl RabbitMQClient {
    pub async fn connect_to_events(
        &self,
    ) -> Result<EventEmitter, RabbitMQError> {
        let queue_name = self.events_queue_name.clone();
        self.create_header_consumers(&queue_name, self.events)
            .await?;
        let emitter = self.start_consuming_events().await;

        Ok(emitter)
    }

    pub(crate) async fn start_consuming_events(&self) -> EventEmitter {
        let mut emitter_guard = self.event_emitter.lock().await;
        let emitter = emitter_guard.get_or_insert_with(Emitter::new).clone();

        tokio::spawn({
            let client = self.clone();
            let queue_name = self.events_queue_name.clone();
            let emitter = emitter.clone();

            async move {
                if let Err(e) = client.consume_events(&queue_name, emitter).await {
                    error!("Error consuming messages: {:?}", e);
                }
            }
        });

        emitter
    }

    pub async fn connect_to_saga_commands(
        &self,
    ) -> Result<SagaEmitter, RabbitMQError> {
        let queue_name = self.saga_queue_name.clone();

        self.create_consumers(vec![QueueConsumerProps {
            queue_name,
            exchange: Exchange::COMMANDS,
        }])
        .await?;

        let emitter = self.start_consuming_saga_commands().await;

        Ok(emitter)
    }

    pub(crate) async fn start_consuming_saga_commands(&self) -> SagaEmitter {
        let mut emitter_guard = self.saga_emitter.lock().await;
        let emitter = emitter_guard.get_or_insert_with(Emitter::new).clone();

        tokio::spawn({
            let client = self.clone();
            let queue_name = self.saga_queue_name.clone();
            let emitter = emitter.clone();

            async move {
                if let Err(e) = client.consume_saga_steps(&queue_name, emitter).await {
                    error!("Error consuming messages: {:?}", e);
                }
            }
        });

        emitter
    }
}