use amqp_client_rust::api::utils::{ContentEncoding};
use amqp_client_rust::{
api::eventbus::AsyncEventbusRabbitMQ,
domain::config::QoSConfig
};
use tokio::{self, sync::Mutex};
use uuid::Uuid;
use std::{sync::Arc};
use std::time::Duration;
mod base;
use base::create_test_config;
#[tokio::test]
async fn test_publish_and_subscribe() {
let config = create_test_config();
let eventbus = AsyncEventbusRabbitMQ::new(config, QoSConfig::default());
let exchange_name = "test_exchange";
let routing_key = format!("test_routing_key_{}", Uuid::new_v4());
let test_message = "Hello, RabbitMQ!".as_bytes();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let tx = Arc::new(Mutex::new(tx));
eventbus.subscribe(
exchange_name,
routing_key.as_str(),
move |message| {
let tx = Arc::clone(&tx);
Box::pin(async move {
let _ = tx.lock().await.send(message).await;
Ok(())
})
},
None, Some(Duration::from_secs(5)),
).await.expect("Failed to subscribe");
eventbus.publish(
exchange_name,
routing_key.as_str(),
test_message,
Some("text/plain"),
ContentEncoding::None,
Some(Duration::from_secs(5)),
None,
None,
).await.expect("Failed to publish message");
let received_message = tokio::time::timeout(Duration::from_secs(10), rx.recv())
.await
.expect("Timed out waiting for message")
.expect("Failed to receive message");
assert_eq!(received_message.body, test_message.into(), "Received message does not match sent message");
assert!(eventbus.dispose().await.is_ok());
}
/// RPC round trip: registers a resource handler on a unique routing key, calls
/// it through `rpc_client` on the exchange named by
/// `config.options.rpc_exchange_name`, and checks the reply bytes.
///
/// The handler decodes the request body as UTF-8 (lossily) and answers with
/// `"Processed: <body>"` as `text/plain`.
///
/// NOTE(review): presumably needs a live broker reachable through the settings
/// returned by `create_test_config()` — confirm in `base`.
#[tokio::test]
async fn test_rpc_client_and_server() {
    let config = create_test_config();
    let eventbus = AsyncEventbusRabbitMQ::new(config.clone(), QoSConfig::default());
    // A fresh UUID keeps this run's routing key distinct from any other run.
    let routing_key = format!("test_rpc_routing_key_{}", Uuid::new_v4());
    let test_message = "RPC request".as_bytes().to_vec();

    // Fail immediately if the handler cannot be registered. Previously the
    // result was discarded, so a registration failure would only show up later
    // as an unexplained RPC error.
    eventbus
        .provide_resource(
            routing_key.as_str(),
            |request| {
                Box::pin(async move {
                    println!("Get request: {:?}", request);
                    let response =
                        format!("Processed: {}", String::from_utf8_lossy(&request.body));
                    println!("Send response: {:?}, {}", response.as_bytes(), response);
                    Ok(amqp_client_rust::api::utils::Message {
                        body: response.as_bytes().into(),
                        content_type: Some("text/plain".to_string()),
                    })
                })
            },
            Some(Duration::from_secs(5)),
            Some(Duration::from_secs(10)),
        )
        .await
        .expect("Failed to provide resource");

    let rpc_result = eventbus
        .rpc_client(
            config.options.rpc_exchange_name.as_str(),
            routing_key.as_str(),
            test_message,
            "text/plain",
            ContentEncoding::None,
            5000,
            Some(Duration::from_secs(5)),
            None,
            None,
        )
        .await;

    assert!(rpc_result.is_ok(), "RPC call failed: {:?}", rpc_result.err());
    let rpc_result = rpc_result.unwrap();

    let expected_response = "Processed: RPC request".as_bytes().to_vec();
    assert_eq!(
        rpc_result, expected_response,
        "RPC response does not match expected result"
    );

    assert!(eventbus.dispose().await.is_ok());
}