#[cfg(test)]
pub(crate) mod setup {
fn get_log_level() -> Level {
let key = "LOG_LEVEL";
if let Ok(value) = env::var(key) {
if !value.is_empty() {
if let Ok(level) = value.trim().to_uppercase().parse() {
return level;
}
}
}
Level::INFO
}
pub fn tracing_subscriber() {
tracing_subscriber::fmt()
.with_max_level(get_log_level())
.init();
}
#[ctor::ctor]
fn init() {
if let Ok(value) = env::var("LOG_LEVEL") {
if !value.is_empty() {
tracing_subscriber()
}
}
}
use crate::events::MicroserviceEvent;
use futures::Stream;
use futures::StreamExt;
use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions,
QueueDeleteOptions,
};
use crate::connection::{AvailableMicroservices, RabbitMQClient, RabbitMQError};
use lapin::types::FieldTable;
use lapin::BasicProperties;
use rand::distr::StandardUniform;
use rand::prelude::Distribution;
use rand::Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::env;
use tokio::runtime::Runtime;
use tracing::{debug, error, info, Level};
pub const TEST_QUEUE: &str = "test_queue";
pub const RABBIT_URI: &str = "amqp://rabbit:1234@localhost:5672";
pub struct TestSetup {
pub rt: Runtime,
pub client: RabbitMQClient,
}
impl RabbitMQClient {
#[cfg(test)]
pub(crate) async fn declare_queue(
&self,
queue_name: &str,
options: QueueDeclareOptions,
arguments: FieldTable,
) -> Result<(), RabbitMQError> {
let channel = self.events_channel.lock().await;
channel
.queue_declare(queue_name, options, arguments)
.await?;
info!("Queue declared: {}", queue_name);
Ok(())
}
#[cfg(test)]
pub(crate) async fn publish_message<T: Serialize>(
&self,
queue_name: &str,
payload: &T,
properties: BasicProperties,
) -> Result<(), RabbitMQError> {
let serialized = serde_json::to_vec(payload)?;
let channel = self.events_channel.lock().await;
channel
.basic_publish(
"",
queue_name,
BasicPublishOptions::default(),
&serialized,
properties,
)
.await?;
info!("Message published to queue: {}", queue_name);
Ok(())
}
#[cfg(test)]
#[allow(dead_code)]
async fn delete_queue(&self, queue_name: &str) -> Result<(), RabbitMQError> {
let channel = self.events_channel.lock().await;
channel
.queue_delete(queue_name, QueueDeleteOptions::default())
.await?;
info!("Queue deleted: {}", queue_name);
Ok(())
}
#[cfg(test)]
pub(crate) async fn consume_messages<T: DeserializeOwned>(
&self,
queue_name: &str,
options: BasicConsumeOptions,
) -> Result<impl Stream<Item = Result<T, RabbitMQError>>, RabbitMQError> {
let channel = self.events_channel.lock().await;
let consumer = channel
.basic_consume(queue_name, "my_consumer", options, FieldTable::default())
.await?;
info!("Started consuming messages from queue: {}", queue_name);
Ok(consumer.map(move |delivery| match delivery {
Ok(delivery) => match serde_json::from_slice(&delivery.data) {
Ok(parsed) => {
tokio::spawn(async move {
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
error!("Failed to acknowledge message: {:?}", e);
}
});
Ok(parsed)
}
Err(e) => {
error!("Failed to deserialize message: {:?}", e);
Err(RabbitMQError::SerializationError(e))
}
},
Err(err) => {
error!("Error receiving message: {:?}", err);
Err(RabbitMQError::from(err))
}
}))
}
}
pub struct Config {
pub events: &'static [MicroserviceEvent],
pub microservice: AvailableMicroservices,
}
impl Distribution<AvailableMicroservices> for StandardUniform {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> AvailableMicroservices {
use strum::IntoEnumIterator;
AvailableMicroservices::iter()
.nth(rng.random_range(0..AvailableMicroservices::iter().count()))
.unwrap()
}
}
pub fn random_microservice() -> AvailableMicroservices {
rand::rng().random()
}
impl TestSetup {
pub fn new(conf: Option<Config>) -> Self {
let conf = conf.unwrap_or_else(|| Config {
events: &[],
microservice: random_microservice(),
});
let rt = Runtime::new().unwrap();
let client = rt.block_on(async {
RabbitMQClient::new(RABBIT_URI, conf.microservice, Some(conf.events))
.await
.expect("Failed to create RabbitMQ client")
});
TestSetup { rt, client }
}
}
impl Drop for TestSetup {
fn drop(&mut self) {
self.rt.block_on(async {
let conn = self
.client
.current_connection()
.await
.expect("Cannot get the connection")
.read()
.await;
if !conn.status().connected() {
unreachable!(
"Connection is always guaranteed in `RabbitMQClient::get_connection`"
);
}
let delete_channel = conn.create_channel().await.unwrap();
let t = conn.topology();
for queue in t.queues {
debug!("DELETING QUEUE: {}", queue.name.to_string());
delete_channel
.queue_delete(&queue.name.to_string(), QueueDeleteOptions::default())
.await
.unwrap();
}
for exchange in t.exchanges {
debug!("DELETING EXCHANGE: {}", exchange.name.to_string());
delete_channel
.exchange_delete(&exchange.name.to_string(), Default::default())
.await
.unwrap();
}
debug!("RESTORED TOPOLOGY");
});
}
}
}