scouter-events 0.21.0

Logic for setting up and running Scouter event-driven consumers and producers
#[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
pub use crate::producer::kafka::KafkaProducer;

#[cfg(feature = "rabbitmq")]
pub use crate::producer::rabbitmq::RabbitMQProducer;

#[cfg(feature = "redis_events")]
use crate::producer::redis::RedisProducer;

use crate::error::EventError;
use crate::producer::grpc::GrpcProducer;
pub use crate::producer::http::HttpProducer;
pub use crate::producer::kafka::KafkaConfig;
pub use crate::producer::mock::{MockConfig, MockProducer};
pub use crate::producer::rabbitmq::RabbitMQConfig;
use crate::queue::types::TransportConfig;
use scouter_types::MessageRecord;
use tracing::debug;

#[derive(Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ProducerEnum {
    HTTP(HttpProducer),

    Grpc(GrpcProducer),

    Mock(MockProducer),

    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    Kafka(KafkaProducer),

    #[cfg(feature = "rabbitmq")]
    RabbitMQ(RabbitMQProducer),

    #[cfg(feature = "redis_events")]
    Redis(RedisProducer),
}

impl ProducerEnum {
    pub async fn publish(&self, message: MessageRecord) -> Result<(), EventError> {
        match self {
            ProducerEnum::HTTP(producer) => producer.publish(message).await,
            ProducerEnum::Grpc(producer) => producer.publish(message).await,
            ProducerEnum::Mock(producer) => producer.publish(message).await,
            #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
            ProducerEnum::Kafka(producer) => producer.publish(message).await,
            #[cfg(feature = "rabbitmq")]
            ProducerEnum::RabbitMQ(producer) => producer.publish(message).await,
            #[cfg(feature = "redis_events")]
            ProducerEnum::Redis(producer) => producer.publish(message).await,
        }
    }

    pub async fn flush(&self) -> Result<(), EventError> {
        match self {
            ProducerEnum::HTTP(producer) => producer.flush().await,
            ProducerEnum::Grpc(producer) => producer.flush().await,
            ProducerEnum::Mock(producer) => producer.flush().await,
            #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
            ProducerEnum::Kafka(producer) => producer.flush(),
            #[cfg(feature = "rabbitmq")]
            ProducerEnum::RabbitMQ(producer) => producer.flush().await,
            #[cfg(feature = "redis_events")]
            ProducerEnum::Redis(producer) => producer.flush().await,
        }
    }
}

/// Underlying Enum used with feature queues
#[derive(Clone)]
pub struct RustScouterProducer {
    producer: ProducerEnum,
}

impl RustScouterProducer {
    pub async fn new(config: TransportConfig) -> Result<Self, EventError> {
        let producer = match config {
            TransportConfig::RabbitMQ(_config) => {
                #[cfg(feature = "rabbitmq")]
                {
                    let producer = RabbitMQProducer::new(_config).await?;
                    debug!("Creating RabbitMQ producer");
                    ProducerEnum::RabbitMQ(producer)
                }
                #[cfg(not(feature = "rabbitmq"))]
                {
                    return Err(EventError::RabbitMQFeatureNotEnabledError);
                }
            }
            TransportConfig::Kafka(_config) => {
                #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
                {
                    debug!("Creating Kafka producer");
                    ProducerEnum::Kafka(KafkaProducer::new(_config)?)
                }
                #[cfg(not(any(feature = "kafka", feature = "kafka-vendored")))]
                {
                    return Err(EventError::KafkaFeatureNotEnabledError);
                }
            }
            TransportConfig::Redis(_config) => {
                #[cfg(feature = "redis_events")]
                {
                    let producer = RedisProducer::new(_config).await?;
                    debug!("Creating Redis producer");
                    ProducerEnum::Redis(producer)
                }
                #[cfg(not(feature = "redis_events"))]
                {
                    return Err(EventError::RedisFeatureNotEnabledError);
                }
            }
            TransportConfig::Http(config) => {
                debug!("Creating HTTP producer");
                let producer = HttpProducer::new(config).await?;
                ProducerEnum::HTTP(producer)
            }
            TransportConfig::Grpc(config) => {
                let producer = GrpcProducer::new(config).await?;
                debug!("Creating gRPC producer");
                ProducerEnum::Grpc(producer)
            }
            TransportConfig::Mock(config) => {
                let producer = MockProducer::new(config).await?;
                debug!("Creating Mock producer");
                ProducerEnum::Mock(producer)
            }
        };

        Ok(RustScouterProducer { producer })
    }

    pub async fn publish(&self, message: MessageRecord) -> Result<(), EventError> {
        debug!("message length: {}", message.len());
        self.producer.publish(message).await
    }

    pub async fn flush(&self) -> Result<(), EventError> {
        self.producer.flush().await
    }
}