mod common;
use common::NatsServer;
use futures::{future, stream::StreamExt};
use rants::Client;
/// Publishes `number_of_messages` numbered payloads on a single subject and
/// checks that a subscription on that subject receives each of them exactly once.
#[tokio::test(threaded_scheduler)]
async fn echo() {
    common::init();
    // Bound to a named (underscore-prefixed) variable so the handle lives until
    // the end of the test instead of being dropped immediately.
    let _nats_server = NatsServer::new(&[]).await;

    let server_address = "127.0.0.1".parse().unwrap();
    let client = Client::new(vec![server_address]);
    let subject = "test".parse().unwrap();

    // Publishing before `connect` has been called must fail with a
    // "not connected" error.
    let early_publish = client.publish(&subject, b"test").await;
    assert!(early_publish.unwrap_err().not_connected());

    // Kept as one statement on purpose: the value returned by
    // `connect_mut().await` is a temporary and is dropped at the end of this
    // statement, before `connect` runs.
    client
        .connect_mut()
        .await
        .verbose(true)
        .pedantic(true)
        .echo(true);
    client.connect().await;

    // The subscription buffer is sized to hold every message we are about to send.
    let number_of_messages = 1024;
    let (_, subscription) = client
        .subscribe(&subject, number_of_messages)
        .await
        .unwrap();

    // One publishing future per message, each owning its own clone of the
    // client; the payload is the decimal message index.
    let publishers = (0..number_of_messages).map(|i| {
        let client = client.clone();
        async move {
            let subject = "test".parse().unwrap();
            let payload = i.to_string();
            client.publish(&subject, payload.as_bytes()).await.unwrap();
        }
    });
    future::join_all(publishers).await;

    // Decode exactly `number_of_messages` payloads back into their indices.
    let mut received = subscription
        .take(number_of_messages)
        .map(|msg| {
            let text = String::from_utf8(msg.into_payload()).unwrap();
            text.parse::<usize>().unwrap()
        })
        .collect::<Vec<usize>>()
        .await;

    // Arrival order is not asserted, only that every index shows up once.
    received.sort_unstable();
    let expected = (0..number_of_messages).collect::<Vec<usize>>();
    assert_eq!(received, expected);

    client.disconnect().await;
}