use crate::events::PayloadEvent;
use crate::queue_consumer_props::Exchange;
use lapin::{
options::BasicPublishOptions, options::ExchangeDeclareOptions, types::AMQPValue,
types::FieldTable, BasicProperties, ExchangeKind,
};
use serde::Serialize;
use crate::connection::{get_or_init_publish_channel, RabbitMQClient, RabbitMQError};
impl RabbitMQClient {
pub async fn publish_event<T: PayloadEvent + Serialize>(
payload: T,
) -> Result<(), RabbitMQError> {
let channel_arc = get_or_init_publish_channel().await?;
let channel = channel_arc.lock().await;
let event_type = payload.event_type();
let mut header_event = FieldTable::default();
header_event.insert(
event_type.as_ref().to_uppercase().into(),
AMQPValue::LongString(event_type.as_ref().into()),
);
header_event.insert("all-micro".into(), AMQPValue::LongString("yes".into()));
channel
.exchange_declare(
Exchange::MATCHING,
ExchangeKind::Headers,
ExchangeDeclareOptions {
durable: true,
..ExchangeDeclareOptions::default()
},
FieldTable::default(),
)
.await?;
let body = serde_json::to_vec(&payload)?;
channel
.basic_publish(
Exchange::MATCHING,
"",
BasicPublishOptions::default(),
&body,
BasicProperties::default()
.with_headers(header_event)
.with_content_type("application/json".into())
.with_delivery_mode(2), )
.await?;
Ok(())
}
}
#[cfg(test)]
mod test_publish_event {
use crate::events::AuthDeletedUserPayload;
use crate::events::MicroserviceEvent::AuthDeletedUser;
use crate::test::setup::{Config, TestSetup};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::Barrier;
use crate::connection::AvailableMicroservices::Auth;
use crate::connection::RabbitMQClient;
#[test]
fn test_publish_event() {
let setup = TestSetup::new(Some(Config {
events: &[AuthDeletedUser],
microservice: Auth,
}));
setup.rt.block_on(async {
let auth_deleted_user_payload = AuthDeletedUserPayload {
user_id: "user1233".to_string(),
};
let barrier = Arc::new(Barrier::new(2));
let b_clone = barrier.clone();
let e = setup
.client
.connect_to_events()
.await
.expect("TODO: panic message");
e.on_with_async_handler(AuthDeletedUser, move |handler| {
let barrier = barrier.clone();
async move {
println!("{:?} {:?}", AuthDeletedUser, handler.get_payload());
let p: AuthDeletedUserPayload =
handler.parse_payload().expect("Error parsing payload");
assert_eq!(p.user_id, "user1233");
handler.ack().await.expect("Error acking message");
barrier.wait().await;
}
})
.await;
RabbitMQClient::publish_event(auth_deleted_user_payload)
.await
.expect("Error publishing AuthDeletedUserPayload event");
b_clone.wait().await;
});
}
#[test]
fn test_publish_with_reconnection_event() {
let setup = TestSetup::new(Some(Config {
events: &[AuthDeletedUser],
microservice: Auth,
}));
setup.rt.block_on(async {
let auth_deleted_user_payload = AuthDeletedUserPayload {
user_id: "user1233".to_string(),
};
let e = setup
.client
.connect_to_events()
.await
.expect("TODO: panic message");
let atomic_counter = Arc::new(AtomicUsize::new(0));
let first_barrier = Arc::new(Barrier::new(2));
let first_barrier_clone = first_barrier.clone();
let last_barrier = Arc::new(Barrier::new(2));
let last_barrier_clone = last_barrier.clone();
e.on_with_async_handler(AuthDeletedUser, move |handler| {
let first_barrier = first_barrier.clone();
let last_barrier = last_barrier.clone();
let a_counter = atomic_counter.clone();
async move {
println!("{:?} {:?}", AuthDeletedUser, handler.get_payload());
let p: AuthDeletedUserPayload =
handler.parse_payload().expect("Error parsing payload");
assert_eq!(p.user_id, "user1233");
let atomic_value = a_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
handler.ack().await.expect("Error acking message");
if atomic_value == 0 {
first_barrier.wait().await;
}
if atomic_value == 1 {
last_barrier.wait().await;
}
}
})
.await;
RabbitMQClient::publish_event(auth_deleted_user_payload.clone())
.await
.expect("Error publishing AuthDeletedUserPayload event");
first_barrier_clone.wait().await;
{
let conn = setup.client.current_connection().await.expect("No connection found").write().await;
conn.close(0, "Test disconnect")
.await
.expect("Failed to close connection");
}
setup
.client
.reconnect()
.await
.expect("Reconnection should succeed");
RabbitMQClient::publish_event(auth_deleted_user_payload)
.await
.expect("Error publishing AuthDeletedUserPayload event");
last_barrier_clone.wait().await;
});
}
}