mod client {
use async_nats::client::Request;
use async_nats::connection::State;
use async_nats::header::HeaderValue;
use async_nats::{
ConnectErrorKind, ConnectOptions, Event, RequestErrorKind, ServerAddr, Subject,
};
use bytes::Bytes;
use futures_util::future::join_all;
use futures_util::stream::StreamExt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
/// Verifies that `force_reconnect` cycles the connection: the event callback must report
/// connected, disconnected, then connected again within five seconds, and a subscription
/// created before the reconnect must still receive a message published afterwards.
#[tokio::test]
async fn force_reconnect() {
    let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel(1);
    let (connect_tx, mut connect_rx) = tokio::sync::mpsc::channel(1);
    let server = nats_server::run_basic_server();

    let options = async_nats::ConnectOptions::new().event_callback(move |event| {
        let disconnect_tx = disconnect_tx.clone();
        let connect_tx = connect_tx.clone();
        async move {
            match event {
                Event::Connected => connect_tx.send(()).await.unwrap(),
                Event::Disconnected => disconnect_tx.send(()).await.unwrap(),
                _ => (),
            }
        }
    });
    let client = options.connect(server.client_url()).await.unwrap();

    let mut sub = client.subscribe("foo").await.unwrap();
    client.publish("test", "data".into()).await.unwrap();
    client.force_reconnect().await.unwrap();

    // Initial connect, then the forced disconnect, then the reconnect.
    let expected_events = async {
        connect_rx.recv().await.unwrap();
        disconnect_rx.recv().await.unwrap();
        connect_rx.recv().await.unwrap();
    };
    tokio::time::timeout(Duration::from_secs(5), expected_events)
        .await
        .unwrap();

    client.publish("foo", "data".into()).await.unwrap();
    let delivered = tokio::time::timeout(Duration::from_secs(5), sub.next())
        .await
        .unwrap();
    delivered.unwrap();
}
#[tokio::test]
async fn basic_pub_sub() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();
let mut subscriber = client.subscribe("foo").await.unwrap();
for _ in 0..10 {
client.publish("foo", "data".into()).await.unwrap()
}
client.flush().await.unwrap();
let mut i = 0;
while tokio::time::timeout(tokio::time::Duration::from_millis(500), subscriber.next())
.await
.unwrap()
.is_some()
{
i += 1;
if i >= 10 {
break;
}
}
assert_eq!(i, 10);
}
#[tokio::test]
async fn queue_sub() {
let server = nats_server::run_basic_server();
const NUM_SUBSCRIBERS: usize = 3;
const NUM_ITEMS: usize = 20;
let mut subscribers = Vec::new();
let client = async_nats::connect(server.client_url()).await.unwrap();
for _i in 0..NUM_SUBSCRIBERS {
subscribers.push(
client
.queue_subscribe("qfoo", "group".into())
.await
.unwrap(),
);
}
for _ in 0..NUM_ITEMS {
client.publish("qfoo", "data".into()).await.unwrap();
}
client.flush().await.unwrap();
let mut results = Vec::new();
for mut subscriber in subscribers.into_iter() {
results.push(tokio::spawn(async move {
let mut count = 0u32;
while let Ok(Some(_)) = tokio::time::timeout(
tokio::time::Duration::from_millis(1000),
subscriber.next(),
)
.await
{
count += 1;
}
count
}));
}
let counts = join_all(results.iter_mut())
.await
.into_iter()
.filter_map(|n| n.ok())
.collect::<Vec<u32>>();
let total: u32 = counts.iter().sum();
assert_eq!(total, NUM_ITEMS as u32, "all items received");
let num_receivers = counts.into_iter().filter(|n| *n > 0u32).count();
assert!(num_receivers > 1, "should not all go to single subscriber");
}
#[tokio::test]
async fn cloned_client() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();
let mut subscriber = client.clone().subscribe("foo").await.unwrap();
let cloned_client = client.clone();
for _ in 0..10 {
cloned_client.publish("foo", "data".into()).await.unwrap();
}
let mut i = 0;
while tokio::time::timeout(tokio::time::Duration::from_millis(500), subscriber.next())
.await
.unwrap()
.is_some()
{
i += 1;
if i >= 10 {
break;
}
}
assert_eq!(i, 10);
}
/// Publishes messages carrying headers and checks the subscriber sees an equal header map,
/// first with a single value and then with a second value appended under the same key.
#[tokio::test]
async fn publish_with_headers() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();
    let mut inbound = nc.subscribe("test").await.unwrap();

    // Single header value with an empty payload.
    let mut single = async_nats::HeaderMap::new();
    single.insert("X-Test", HeaderValue::from_str("Test").unwrap());
    nc.publish_with_headers("test", single.clone(), b"".as_ref().into())
        .await
        .unwrap();
    nc.flush().await.unwrap();
    let received = inbound.next().await.unwrap();
    assert_eq!(received.headers.unwrap(), single);

    // Two values under the same key.
    let mut repeated = async_nats::HeaderMap::new();
    repeated.insert("X-Test", HeaderValue::from_str("Test").unwrap());
    repeated.append("X-Test", "Second");
    nc.publish_with_headers("test", repeated.clone(), "test".into())
        .await
        .unwrap();
    let received = inbound.next().await.unwrap();
    assert_eq!(received.headers.unwrap(), repeated);
}
/// Hand-rolled request/reply: publishes with an explicit reply inbox and checks that the
/// response sent by a background responder arrives on a subscription to that inbox.
#[tokio::test]
async fn publish_request() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    // Responder: answers the first message on "test" via its reply subject.
    let mut requests = nc.subscribe("test").await.unwrap();
    let responder = nc.clone();
    tokio::spawn(async move {
        let incoming = requests.next().await.unwrap();
        responder
            .publish(incoming.reply.unwrap(), "resp".into())
            .await
            .unwrap();
    });

    let inbox = nc.new_inbox();
    let mut replies = nc.subscribe(inbox.clone()).await.unwrap();
    nc.publish_with_reply("test", inbox, "data".into())
        .await
        .unwrap();

    let reply = replies.next().await;
    assert!(reply.is_some());
}
#[tokio::test]
async fn request() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();
let mut sub = client.subscribe("test").await.unwrap();
tokio::spawn({
let client = client.clone();
async move {
let msg = sub.next().await.unwrap();
client
.publish(msg.reply.unwrap(), "reply".into())
.await
.unwrap();
}
});
let resp = tokio::time::timeout(
tokio::time::Duration::from_millis(500),
client.request("test", "request".into()),
)
.await
.unwrap();
assert_eq!(resp.unwrap().payload, Bytes::from("reply"));
}
/// A request to a subject whose only subscriber never replies must fail with
/// `RequestErrorKind::TimedOut`.
#[tokio::test]
async fn request_timeout() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    // Held for the whole test but never read from, so no reply is ever sent.
    let _silent_service = nc.subscribe("service").await.unwrap();
    nc.flush().await.unwrap();

    let outcome = nc.request("service", "payload".into()).await;
    let err = outcome.unwrap_err();
    assert_eq!(err.kind(), RequestErrorKind::TimedOut);
}
#[tokio::test]
async fn request_no_responders() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();
let err = tokio::time::timeout(
tokio::time::Duration::from_millis(300),
client.request("test", "request".into()),
)
.await
.unwrap()
.unwrap_err();
assert_eq!(RequestErrorKind::NoResponders, err.kind());
}
/// Sends a request built with `Request::new().inbox(..)` and checks, on the responder side,
/// that the reply subject of the received message equals the custom inbox.
#[tokio::test]
async fn request_builder() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();
    let custom_inbox: Subject = "CUSTOMIZED".into();

    let mut service = nc.subscribe("service").await.unwrap();
    let responder = nc.clone();
    let expected_reply = custom_inbox.clone();
    tokio::task::spawn(async move {
        let incoming = service.next().await.unwrap();
        let reply_to = incoming.reply.unwrap();
        assert_eq!(reply_to, expected_reply);
        responder.publish(reply_to, "ok".into()).await.unwrap();
        responder.flush().await.unwrap();
    });

    let outgoing = Request::new().inbox(custom_inbox.to_string());
    nc.send_request("service", outgoing).await.unwrap();
}
/// Unsubscribes a subscription after it has received a message, then checks that the client
/// can still create a new subscription and deliver to it.
///
/// The result of `unsubscribe` is only printed, not asserted.
#[tokio::test]
async fn unsubscribe() {
    use std::error::Error;

    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let mut first = nc.subscribe("test").await.unwrap();
    nc.publish("test", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
    assert!(first.next().await.is_some());

    if let Err(err) = first.unsubscribe().await {
        println!("error: {err}");
        println!("source: {:?}", err.source())
    } else {
        println!("ok");
    }

    // The client must remain usable after the unsubscribe.
    let mut second = nc.subscribe("test2").await.unwrap();
    nc.publish("test2", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
    assert!(second.next().await.is_some());
}
/// With `unsubscribe_after(3)` issued after two messages were already published and one more
/// published afterwards, the subscription must yield exactly three messages and then end.
#[tokio::test]
async fn unsubscribe_after() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();
    let mut sub = nc.subscribe("test").await.unwrap();

    nc.publish("test", "data".into()).await.unwrap();
    nc.publish("test", "data".into()).await.unwrap();

    sub.unsubscribe_after(3).await.unwrap();
    nc.publish("test", "data".into()).await.unwrap();
    nc.flush().await.unwrap();

    let mut delivered = 0;
    while delivered < 3 {
        assert!(sub.next().await.is_some());
        delivered += 1;
    }
    assert!(sub.next().await.is_none());
}
/// With two messages already published, `unsubscribe_after(1)` must let exactly one message
/// through before the subscription ends.
#[tokio::test]
async fn unsubscribe_after_immediate() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();
    let mut sub = nc.subscribe("test").await.unwrap();

    for _ in 0..2 {
        nc.publish("test", "data".into()).await.unwrap();
    }
    sub.unsubscribe_after(1).await.unwrap();
    nc.flush().await.unwrap();

    let first = sub.next().await;
    assert!(first.is_some());
    let second = sub.next().await;
    assert!(second.is_none());
}
/// Connecting to `localhost:1111` must return an error.
#[tokio::test]
async fn connect_invalid() {
    let outcome = async_nats::connect("localhost:1111").await;
    assert!(outcome.is_err());
}
/// Connecting by domain name (`demo.nats.io`) must succeed.
#[tokio::test]
async fn connect_domain() {
    let outcome = async_nats::connect("demo.nats.io").await;
    assert!(outcome.is_ok());
}
/// Requiring TLS against the basic test server must make the connection attempt fail.
#[tokio::test]
async fn connect_invalid_tls_over_ip() {
    let server = nats_server::run_basic_server();
    let options = async_nats::ConnectOptions::new().require_tls(true);
    let outcome = options.connect(server.client_url()).await;
    assert!(outcome.is_err());
}
#[cfg(not(target_os = "windows"))]
#[tokio::test]
async fn reconnect_fallback() {
use async_nats::ServerAddr;
let mut servers = vec![
nats_server::run_basic_server(),
nats_server::run_basic_server(),
nats_server::run_basic_server(),
];
let client = async_nats::ConnectOptions::new()
.connect(
servers
.iter()
.map(|server| server.client_url().parse::<ServerAddr>().unwrap())
.collect::<Vec<ServerAddr>>()
.as_slice(),
)
.await
.unwrap();
let mut subscriber = client.subscribe("test").await.unwrap();
while !servers.is_empty() {
assert_eq!(State::Connected, client.connection_state());
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();
assert!(subscriber.next().await.is_some());
drop(servers.remove(0));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
/// Connects with a token to a server started from `tests/configs/token.conf` and checks a
/// publish/subscribe round trip works.
#[tokio::test]
async fn token_auth() {
    let server = nats_server::run_server("tests/configs/token.conf");
    let options = async_nats::ConnectOptions::with_token("s3cr3t".into());
    let nc = options.connect(server.client_url()).await.unwrap();

    let mut inbound = nc.subscribe("test").await.unwrap();
    nc.publish("test", "test".into()).await.unwrap();
    nc.flush().await.unwrap();

    let delivered = inbound.next().await;
    assert!(delivered.is_some());
}
/// Connects with user name and password to a server started from
/// `tests/configs/user_pass.conf` and checks a publish/subscribe round trip works.
#[tokio::test]
async fn user_pass_auth() {
    let server = nats_server::run_server("tests/configs/user_pass.conf");
    let options =
        async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t".into());
    let nc = options.connect(server.client_url()).await.unwrap();

    let mut inbound = nc.subscribe("test").await.unwrap();
    nc.publish("test", "test".into()).await.unwrap();
    nc.flush().await.unwrap();

    let delivered = inbound.next().await;
    assert!(delivered.is_some());
}
/// Connecting without credentials to a server started from `tests/configs/user_pass.conf`
/// must fail with `ConnectErrorKind::AuthorizationViolation`.
#[tokio::test]
async fn required_auth_not_provided() {
    let server = nats_server::run_server("tests/configs/user_pass.conf");
    let outcome = async_nats::ConnectOptions::new()
        .connect(server.client_url())
        .await;
    let kind = outcome.unwrap_err().kind();
    assert_eq!(ConnectErrorKind::AuthorizationViolation, kind);
}
/// Connecting with a wrong password to a server started from `tests/configs/user_pass.conf`
/// must fail with `ConnectErrorKind::AuthorizationViolation`.
#[tokio::test]
async fn user_pass_auth_wrong_pass() {
    let server = nats_server::run_server("tests/configs/user_pass.conf");
    let options =
        async_nats::ConnectOptions::with_user_and_password("derek".into(), "bad_password".into());
    let outcome = options.connect(server.client_url()).await;
    let failure = outcome.unwrap_err();
    assert_eq!(ConnectErrorKind::AuthorizationViolation, failure.kind());
}
/// Checks that the event callback reports a disconnect and a subsequent reconnect when the
/// server is stopped and a new one is started on the same port.
///
/// The initial connection also produces `Event::Connected`. That first event is consumed
/// before the server is stopped, so the final wait on the "connected" channel can only be
/// satisfied by the reconnect and not by the event left over from the initial connect.
#[tokio::test]
async fn connection_callbacks() {
    let server = nats_server::run_basic_server();
    let port = server.client_port().to_string();

    let (tx, mut rx) = tokio::sync::mpsc::channel(128);
    let (dc_tx, mut dc_rx) = tokio::sync::mpsc::channel(128);
    let client = async_nats::ConnectOptions::new()
        .event_callback(move |event| {
            let tx = tx.clone();
            let dc_tx = dc_tx.clone();
            async move {
                if let Event::Connected = event {
                    println!("reconnection callback fired");
                    tx.send(()).await.unwrap();
                }
                if let Event::Disconnected = event {
                    println!("disconnect callback fired");
                    dc_tx.send(()).await.unwrap();
                }
            }
        })
        .connect(server.client_url())
        .await
        .unwrap();
    println!("connected");

    // Drain the `Connected` event of the initial connection so it cannot be mistaken for the
    // reconnect below.
    tokio::time::timeout(Duration::from_secs(5), rx.recv())
        .await
        .unwrap()
        .unwrap();

    client.subscribe("test").await.unwrap();
    client.flush().await.unwrap();

    println!("dropped server {:?}", server.client_url());
    drop(server);
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Bring a server back on the same port so the client has something to reconnect to.
    let _server = nats_server::run_server_with_port("", Some(port.as_str()));

    tokio::time::timeout(Duration::from_secs(15), dc_rx.recv())
        .await
        .unwrap()
        .unwrap();
    tokio::time::timeout(Duration::from_secs(15), rx.recv())
        .await
        .unwrap()
        .unwrap();
}
/// After the server is put into lame duck mode, the event callback must report
/// `Event::LameDuckMode` within ten seconds. Ignored on Windows.
#[tokio::test]
#[cfg_attr(target_os = "windows", ignore)]
async fn lame_duck_callback() {
    let server = nats_server::run_basic_server();
    let (lame_duck_tx, mut lame_duck_rx) = tokio::sync::mpsc::channel(128);

    let options = ConnectOptions::new().event_callback(move |event| {
        let lame_duck_tx = lame_duck_tx.clone();
        async move {
            if matches!(event, Event::LameDuckMode) {
                lame_duck_tx.send(()).await.unwrap();
            }
        }
    });
    let nc = options.connect(server.client_url()).await.unwrap();

    // Complete one publish/subscribe round trip before triggering lame duck mode.
    let mut inbound = nc.subscribe("data").await.unwrap();
    nc.publish("data", "data".into()).await.unwrap();
    inbound.next().await.unwrap();

    nats_server::set_lame_duck_mode(&server);
    let notified = tokio::time::timeout(Duration::from_secs(10), lame_duck_rx.recv())
        .await
        .unwrap();
    notified.unwrap();
}
/// With a subscription capacity of one and a subscription that is never read, publishing
/// three messages must produce two `Event::SlowConsumer` notifications.
#[tokio::test]
async fn slow_consumers() {
    let server = nats_server::run_basic_server();
    let (slow_tx, mut slow_rx) = tokio::sync::mpsc::channel(128);

    let options = ConnectOptions::new()
        .subscription_capacity(1)
        .event_callback(move |event| {
            let slow_tx = slow_tx.clone();
            async move {
                if matches!(event, Event::SlowConsumer(_)) {
                    slow_tx.send(()).await.unwrap()
                }
            }
        });
    let nc = options.connect(server.client_url()).await.unwrap();

    // Kept alive but never polled, so messages pile up in the subscription.
    let _unread = nc.subscribe("data").await.unwrap();

    nc.publish("data", "data".into()).await.unwrap();
    nc.publish("data", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
    nc.publish("data", "data".into()).await.unwrap();
    nc.flush().await.unwrap();

    tokio::time::sleep(Duration::from_secs(1)).await;

    for _ in 0..2 {
        tokio::time::timeout(Duration::from_secs(5), slow_rx.recv())
            .await
            .unwrap()
            .unwrap();
    }
}
/// Compares default behaviour with `no_echo`: by default a client's own subscription yields
/// within 500 ms of its publish; with `no_echo` nothing arrives within 50 ms.
#[tokio::test]
async fn no_echo() {
    // Default options.
    let echo_server = nats_server::run_basic_server();
    let echo_client = ConnectOptions::new()
        .connect(echo_server.client_url())
        .await
        .unwrap();
    let mut echoed = echo_client.subscribe("echo").await.unwrap();
    echo_client.publish("echo", "data".into()).await.unwrap();
    tokio::time::timeout(Duration::from_millis(500), echoed.next())
        .await
        .unwrap();

    // Same scenario against a second server with echo disabled.
    let quiet_server = nats_server::run_basic_server();
    let quiet_client = ConnectOptions::new()
        .no_echo()
        .connect(quiet_server.client_url())
        .await
        .unwrap();
    let mut silent = quiet_client.subscribe("echo").await.unwrap();
    quiet_client.publish("echo", "data".into()).await.unwrap();
    tokio::time::timeout(Duration::from_millis(50), silent.next())
        .await
        .expect_err("should timeout");
}
/// Stops the server and waits until the event callback has fired four times. Event contents
/// are forwarded as strings but not inspected.
#[tokio::test]
async fn reconnect_failures() {
    let server = nats_server::run_basic_server();
    let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();

    let options = ConnectOptions::new().event_callback(move |err| {
        let event_tx = event_tx.clone();
        async move {
            event_tx.send(err.to_string()).unwrap();
        }
    });
    let _client = options.connect(server.client_url()).await.unwrap();

    drop(server);
    for _ in 0..4 {
        event_rx.recv().await;
    }
}
/// Checks that a custom `reconnect_delay_callback` is called with consecutive attempt
/// numbers and that the delays it computes follow `min((attempt - 1) * 500, 1500)` ms.
///
/// The client is bound to `_client` so it lives until the end of the test. The earlier
/// `let _ = ...` dropped it as soon as `connect` returned, before the reconnect attempts
/// being observed here.
#[tokio::test]
async fn reconnect_delay_callback_custom() {
    let server = nats_server::run_basic_server();
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    let _client = ConnectOptions::new()
        .retry_on_initial_connect()
        .reconnect_delay_callback(move |attempts| {
            let tx = tx.clone();
            let duration = std::time::Duration::from_millis(std::cmp::min(
                ((attempts - 1) * 500) as u64,
                1500,
            ));
            // Report every invocation so the test can assert on the sequence.
            tx.send((attempts, duration)).unwrap();
            duration
        })
        .connect(server.client_url())
        .await
        .unwrap();
    drop(server);

    // (attempt number, expected delay in milliseconds); the delay is capped at 1500 ms.
    let expected = [(1, 0), (2, 500), (3, 1000), (4, 1500), (5, 1500)];
    for (expected_attempt, expected_millis) in expected {
        let (attempt, duration) = rx.recv().await.unwrap();
        assert_eq!(attempt, expected_attempt);
        assert_eq!(duration.as_millis(), expected_millis);
    }
}
/// Checks that `connection_timeout` makes `connect` fail with `ConnectErrorKind::TimedOut`
/// when the target is a local TCP socket on 127.0.0.1:4848 that listens but whose owning
/// task never calls `accept`.
#[tokio::test]
async fn connect_timeout() {
let startup_listener = std::sync::Arc::new(tokio::sync::Notify::new());
let startup_signal = startup_listener.clone();
// Created before the task is spawned. NOTE(review): presumably so the `notify_waiters()`
// call in the task cannot be missed — confirm against the `Notify` documentation.
let startup_notified = startup_listener.notified();
tokio::spawn(async move {
let socket = tokio::net::TcpSocket::new_v4()?;
socket.set_reuseaddr(true)?;
socket.bind("127.0.0.1:4848".parse().unwrap())?;
// Listen backlog is 1 on macOS and 0 elsewhere.
// NOTE(review): presumably the smallest backlog each platform accepts — confirm.
let _listener = if cfg!(target_os = "macos") {
socket.listen(1)?
} else {
socket.listen(0)?
};
// Tell the test body the socket is listening, then wait for the final `notify_one()`
// so `_listener` stays alive until the test is done.
startup_signal.notify_waiters();
startup_signal.notified().await;
Ok::<(), std::io::Error>(())
});
startup_notified.await;
// A plain TCP connection opened first and held for the rest of the test.
// NOTE(review): presumably it occupies the accept backlog so the client's own connection
// attempt stalls — confirm.
let _hanger = tokio::net::TcpStream::connect("127.0.0.1:4848")
.await
.unwrap();
let timeout_result = ConnectOptions::new()
.connection_timeout(tokio::time::Duration::from_millis(200))
.connect("nats://127.0.0.1:4848")
.await;
assert_eq!(
timeout_result.unwrap_err().kind(),
ConnectErrorKind::TimedOut
);
// Release the listener task waiting on `notified()` above.
startup_listener.notify_one();
}
/// With `custom_inbox_prefix("BOB")`, the reply to a request must be observable on a
/// wildcard subscription to `BOB.>`.
#[tokio::test]
async fn inbox_prefix() {
    let server = nats_server::run_basic_server();
    let options = ConnectOptions::new().custom_inbox_prefix("BOB");
    let nc = options.connect(server.client_url()).await.unwrap();

    let mut prefixed_inboxes = nc.subscribe("BOB.>").await.unwrap();

    // Responder that answers the first request via its reply subject.
    let mut requests = nc.subscribe("request").await.unwrap();
    let responder = nc.clone();
    tokio::task::spawn(async move {
        let incoming = requests.next().await.unwrap();
        responder
            .publish(incoming.reply.unwrap(), "prefix workers".into())
            .await
            .unwrap();
    });

    nc.request("request", "data".into()).await.unwrap();
    prefixed_inboxes.next().await.unwrap();
}
/// `connection_state` must report `Connected` after connecting and `Disconnected` one second
/// after the server has been stopped.
#[tokio::test]
async fn connection_state() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let before = nc.connection_state();
    assert_eq!(State::Connected, before);

    drop(server);
    tokio::time::sleep(Duration::from_secs(1)).await;

    let after = nc.connection_state();
    assert_eq!(State::Disconnected, after);
}
/// Compile-time check: the error type returned by `publish` can be written out as
/// `async_nats::PublishError`. The result itself is not inspected.
#[tokio::test]
async fn publish_error_should_be_nameable() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let outcome = nc.publish("foo", "data".into()).await;
    let _named: Result<(), async_nats::PublishError> = outcome;
}
/// Without `retry_on_initial_connect`, connecting to a port with no server must fail. With
/// it, `connect` must succeed, and a message published before the server exists must reach
/// the subscriber once a server is started on that port two seconds later.
#[tokio::test]
async fn retry_on_initial_connect() {
    // No server on port 7779 yet: a plain connect fails.
    let _failure = ConnectOptions::new()
        .connect("localhost:7779")
        .await
        .expect_err("should fail to connect");

    let options = ConnectOptions::new()
        .event_callback(|ev| async move {
            println!("event: {ev}");
        })
        .retry_on_initial_connect();
    let nc = options.connect("localhost:7779").await.unwrap();

    let mut inbound = nc.subscribe("DATA").await.unwrap();
    nc.publish("DATA", "payload".into()).await.unwrap();

    tokio::time::sleep(Duration::from_secs(2)).await;
    let _server = nats_server::run_server_with_port("", Some("7779"));

    inbound.next().await.unwrap();
}
#[tokio::test]
async fn retained_servers_order() {
let mut servers = vec![
nats_server::run_basic_server(),
nats_server::run_basic_server(),
];
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let _ = ConnectOptions::with_user_and_password("js".into(), "js".into())
.event_callback(move |event| {
let tx = tx.clone();
async move {
if let Event::Disconnected = event {
tx.send(()).unwrap();
}
}
})
.retain_servers_order()
.connect(
servers
.iter()
.map(|s| s.client_url().parse::<ServerAddr>().unwrap())
.collect::<Vec<ServerAddr>>(),
)
.await
.unwrap();
drop(servers.remove(0));
rx.recv().await;
}
#[tokio::test]
async fn multiple_auth_methods() {
use async_nats::ServerAddr;
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let mut servers = vec![
nats_server::run_basic_server(),
nats_server::run_server("tests/configs/jwt.conf"),
nats_server::run_server("tests/configs/token.conf"),
];
let client = async_nats::ConnectOptions::new()
.user_and_password("js".into(), "js".into())
.token("s3cr3t".into())
.credentials_file(path.join("tests/configs/TestUser.creds"))
.await
.unwrap()
.connect(
servers
.iter()
.map(|server| server.client_url().parse::<ServerAddr>().unwrap())
.collect::<Vec<ServerAddr>>()
.as_slice(),
)
.await
.unwrap();
let mut subscriber = client.subscribe("test").await.unwrap();
while !servers.is_empty() {
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();
assert!(subscriber.next().await.is_some());
drop(servers.remove(0));
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
}
/// Connects to a server started from `tests/configs/user_pass.conf` using an auth callback
/// that fills in user name and password.
#[tokio::test]
async fn custom_auth_callback() {
    let server = nats_server::run_server("tests/configs/user_pass.conf");

    let options = ConnectOptions::with_auth_callback(move |_| async move {
        let mut credentials = async_nats::Auth::new();
        credentials.username = Some(String::from("derek"));
        credentials.password = Some(String::from("s3cr3t"));
        Ok(credentials)
    });
    options.connect(server.client_url()).await.unwrap();
}
/// Connects to a server started from `tests/configs/jwt.conf` using an auth callback that
/// supplies a hard-coded JWT and a signature over the callback argument (named `nonce`),
/// produced with a key pair built from a hard-coded seed.
#[tokio::test]
async fn custom_auth_callback_jwt() {
let server = nats_server::run_server("tests/configs/jwt.conf");
ConnectOptions::with_auth_callback(move |nonce| async move {
let mut auth = async_nats::Auth::new();
// The JWT is one string literal split over several lines for readability.
auth.jwt = Some("eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.".to_owned() +
"eyJqdGkiOiJMN1dBT1hJU0tPSUZNM1QyNEhMQ09ENzJRT1czQkNVWEdETjRKVU1SSUtHTlQ3RzdZVFRRIiwiaWF0IjoxNjUxNzkwOTgyLCJpc3MiOiJBRFRRUzdaQ0ZWSk5XNTcyNkdPWVhXNVRTQ1pGTklRU0hLMlpHWVVCQ0Q1RDc3T1ROTE9PS1pPWiIsIm5hbWUiOiJUZXN0V" +
"XNlciIsInN1YiI6IlVBRkhHNkZVRDJVVTRTREZWQUZVTDVMREZPMlhNNFdZTTc2VU5YVFBKWUpLN0VFTVlSQkhUMlZFIiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjJ9fQ." +
"bp2-Jsy33l4ayF7Ku1MNdJby4WiMKUrG-rSVYGBusAtV3xP4EdCa-zhSNUaBVIL3uYPPCQYCEoM1pCUdOnoJBg");
let key_pair = nkeys::KeyPair::from_seed("SUACH75SWCM5D2JMJM6EKLR2WDARVGZT4QC6LX3AGHSWOMVAKERABBBRWM").unwrap();
// A signing failure is converted into `AuthError` and returned from the callback.
let sign = key_pair.sign(&nonce).map_err(async_nats::AuthError::new)?;
auth.signature = Some(sign);
Ok(auth)
})
.connect(server.client_url())
.await
.unwrap();
}
/// With `max_reconnects(5)` and no server on port 7778, the event callback must report five
/// `ClientError::Other` events followed by `ClientError::MaxReconnects`.
#[tokio::test]
async fn max_reconnects() {
    let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();

    let options = ConnectOptions::new()
        .max_reconnects(5)
        .retry_on_initial_connect()
        .event_callback(move |event| {
            let event_tx = event_tx.clone();
            async move {
                println!("event: {event}");
                event_tx.send(event).unwrap();
            }
        });
    let _client = options.connect("localhost:7778").await.unwrap();

    for _ in 0..5 {
        let other = event_rx.recv().await.unwrap();
        if !matches!(other, Event::ClientError(async_nats::ClientError::Other(_))) {
            panic!("unexpected event: {other:?}");
        }
    }

    let last = event_rx.recv().await.unwrap();
    assert_eq!(
        last,
        Event::ClientError(async_nats::ClientError::MaxReconnects)
    );
}
/// Against a server started from `tests/configs/max_payload.conf`, publishing 1 MiB must be
/// rejected while a tiny payload and a 128 KiB payload must be accepted.
#[tokio::test]
async fn publish_payload_size() {
    let server = nats_server::run_server("tests/configs/max_payload.conf");
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let too_big = vec![0u8; 1024 * 1024];
    nc.publish("big", too_big.into()).await.unwrap_err();

    nc.publish("small", "data".into()).await.unwrap();

    let acceptable = vec![0u8; 1024 * 128];
    nc.publish("just_ok", acceptable.into()).await.unwrap();
}
/// Checks the counters exposed by `Client::statistics`: right after the initial connect no
/// messages are counted but bytes have moved and `connects` is 1; after one request/reply,
/// two published-and-received messages and a forced reconnect, four messages are counted in
/// each direction and `connects` is 2.
#[tokio::test]
async fn client_statistics() {
    let server = nats_server::run_basic_server();
    let (connected_tx, mut connected_rx) = tokio::sync::mpsc::channel(1);

    let options = async_nats::ConnectOptions::new().event_callback(move |event| {
        let connected_tx = connected_tx.clone();
        async move {
            if matches!(event, Event::Connected) {
                connected_tx.send(()).await.unwrap();
            }
        }
    });
    let nc = options.connect(server.client_url()).await.unwrap();

    // Wait for the `Connected` event of the initial connection.
    tokio::time::timeout(Duration::from_secs(5), connected_rx.recv())
        .await
        .unwrap()
        .unwrap();

    let stats = nc.statistics();
    assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0);
    assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0);
    assert_ne!(stats.in_bytes.load(Ordering::Relaxed), 0);
    assert_ne!(stats.out_bytes.load(Ordering::Relaxed), 0);
    assert_eq!(stats.connects.load(Ordering::Relaxed), 1);

    // One request/reply exchange.
    let mut requests = nc.subscribe("request").await.unwrap();
    let responder = nc.clone();
    tokio::task::spawn(async move {
        let incoming = requests.next().await.unwrap();
        responder
            .publish(incoming.reply.unwrap(), "response".into())
            .await
            .unwrap();
    });
    nc.request("request", "data".into()).await.unwrap();

    // Two plain messages, both received.
    let mut inbound = nc.subscribe("test").await.unwrap();
    for _ in 0..2 {
        nc.publish("test", "data".into()).await.unwrap();
    }
    for _ in 0..2 {
        inbound.next().await.unwrap();
    }
    nc.flush().await.unwrap();

    nc.force_reconnect().await.unwrap();
    tokio::time::timeout(Duration::from_secs(5), connected_rx.recv())
        .await
        .unwrap()
        .unwrap();

    assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4);
    assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4);
    assert_ne!(stats.in_bytes.load(Ordering::Relaxed), 0);
    assert_ne!(stats.out_bytes.load(Ordering::Relaxed), 0);
    assert_eq!(stats.connects.load(Ordering::Relaxed), 2);
}
/// `Client::timeout` must report ten seconds by default, and otherwise whatever was passed
/// to `request_timeout` (thirty seconds, or `None`).
#[tokio::test]
async fn client_timeout() {
    let server = nats_server::run_basic_server();

    let default_client = async_nats::connect(server.client_url()).await.unwrap();
    assert_eq!(default_client.timeout(), Some(Duration::from_secs(10)));

    let thirty_seconds = Some(Duration::from_secs(30));
    let custom_client = async_nats::ConnectOptions::new()
        .request_timeout(thirty_seconds)
        .connect(server.client_url())
        .await
        .unwrap();
    assert_eq!(custom_client.timeout(), Some(Duration::from_secs(30)));

    let unlimited_client = async_nats::ConnectOptions::new()
        .request_timeout(None)
        .connect(server.client_url())
        .await
        .unwrap();
    assert_eq!(unlimited_client.timeout(), None);
}
/// Drains a subscription after it has received a message: the subscription must then end,
/// and the client must still be able to create and use a new subscription.
///
/// The result of `drain` is only printed, not asserted.
#[tokio::test]
async fn drain_subscription_basic() {
    use std::error::Error;

    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let mut first = nc.subscribe("test").await.unwrap();
    nc.publish("test", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
    assert!(first.next().await.is_some());

    if let Err(err) = first.drain().await {
        println!("error: {err}");
        println!("source: {:?}", err.source())
    } else {
        println!("ok");
    }
    assert!(first.next().await.is_none());

    // The client must remain usable after the drain.
    let mut second = nc.subscribe("test2").await.unwrap();
    nc.publish("test2", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
    assert!(second.next().await.is_some());
}
/// Draining a subscription that has a pending `unsubscribe_after(120)` must still deliver
/// the two messages already published and then end the stream.
#[tokio::test]
async fn drain_subscription_unsub_after() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();
    let mut sub = nc.subscribe("test").await.unwrap();

    sub.unsubscribe_after(120)
        .await
        .expect("Expected to send unsub_after");

    for _ in 0..2 {
        nc.publish("test", "data".into()).await.unwrap();
    }
    nc.flush().await.unwrap();

    sub.drain().await.expect("Expected to drain the sub");

    for _ in 0..2 {
        assert!(sub.next().await.is_some());
    }
    assert!(sub.next().await.is_none());
}
/// Drains a subscription while a background task keeps publishing to its subject: the
/// drained stream must end within ten seconds, the publisher must still be running, and a
/// fresh subscription on the same subject must receive messages.
#[tokio::test]
async fn drain_subscription_active() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    // Publisher that never stops on its own.
    let publisher = nc.clone();
    let constant_writer = tokio::spawn(async move {
        loop {
            publisher.publish("test", "data".into()).await.unwrap();
            publisher.flush().await.unwrap();
        }
    });

    let mut draining = nc.subscribe("test").await.unwrap();
    assert!(draining.next().await.is_some());
    draining.drain().await.unwrap();
    tokio::time::sleep(Duration::from_millis(1)).await;

    let until_closed = async move {
        while draining.next().await.is_some() {}
    };
    tokio::time::timeout(Duration::from_secs(10), until_closed)
        .await
        .expect("Expected stream to drain within 10s");

    assert!(!constant_writer.is_finished());

    let mut fresh = nc.subscribe("test").await.unwrap();
    assert!(fresh.next().await.is_some());
}
/// After `Client::drain`, existing subscriptions must end and further subscribe/publish
/// calls on that client must fail, while a brand-new client can still connect to the server.
#[tokio::test]
async fn drain_client_basic() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let mut inbound = nc.subscribe("test").await.unwrap();
    nc.publish("test", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
    assert!(inbound.next().await.is_some());

    nc.drain().await.unwrap();
    assert!(inbound.next().await.is_none());

    let subscribe_attempt = nc.subscribe("test2").await;
    subscribe_attempt.expect_err("Expected client to be drained");

    let publish_attempt = nc.publish("test", "data".into()).await;
    publish_attempt.expect_err("Expected client to be drained");

    let _replacement = async_nats::connect(server.client_url())
        .await
        .expect("Expected to be able to create a new client");
}
/// Subjects containing a space must be rejected by `publish`, `subscribe`,
/// `publish_with_reply` (for the reply subject) and `request`; the request error must be
/// neither a timeout nor a no-responders error.
#[tokio::test]
async fn subject_validation_rejects_bad_subjects() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let published = nc.publish("bad subject", "data".into()).await;
    published.expect_err("publish should reject subject with spaces");

    let subscribed = nc.subscribe("bad subject").await;
    subscribed.expect_err("subscribe should reject subject with spaces");

    let published_with_reply = nc
        .publish_with_reply("valid", "bad reply", "data".into())
        .await;
    published_with_reply.expect_err("publish_with_reply should reject reply subject with spaces");

    let err = nc
        .request("bad subject", "data".into())
        .await
        .expect_err("request should reject subject with spaces");
    let kind = err.kind();
    assert_ne!(
        kind,
        RequestErrorKind::TimedOut,
        "expected a subject validation error, got timeout: {err:?}"
    );
    assert_ne!(
        kind,
        RequestErrorKind::NoResponders,
        "expected a subject validation error, got no-responders: {err:?}"
    );
}
/// `request` and `request_with_headers` must both reject a subject containing a space with
/// `RequestErrorKind::InvalidSubject`.
#[tokio::test]
async fn request_validates_subject() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    let plain = nc.request("bad subject", "data".into()).await;
    let plain_err = plain.expect_err("request should reject subject with spaces");
    assert_eq!(plain_err.kind(), RequestErrorKind::InvalidSubject);

    let with_headers = nc
        .request_with_headers("bad subject", async_nats::HeaderMap::new(), "data".into())
        .await;
    let with_headers_err =
        with_headers.expect_err("request_with_headers should reject subject with spaces");
    assert_eq!(with_headers_err.kind(), RequestErrorKind::InvalidSubject);
}
/// `queue_subscribe` must reject queue group names that contain a space, CRLF or a tab, or
/// that are empty, and must accept a plain name.
#[tokio::test]
async fn queue_subscribe_validates_queue_group() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();

    // (queue group, failure message if it is wrongly accepted)
    let rejected = [
        (
            "bad group",
            "queue_subscribe should reject queue group with spaces",
        ),
        (
            "bad\r\ngroup",
            "queue_subscribe should reject queue group with CRLF",
        ),
        (
            "bad\tgroup",
            "queue_subscribe should reject queue group with tab",
        ),
        ("", "queue_subscribe should reject empty queue group"),
    ];
    for (group, message) in rejected {
        nc.queue_subscribe("events", group.into())
            .await
            .expect_err(message);
    }

    nc.queue_subscribe("events", "workers".into())
        .await
        .expect("queue_subscribe should accept valid queue group");
}
/// With `skip_subject_validation(true)`, publishing to a subject with a double dot must be
/// allowed.
#[tokio::test]
async fn skip_subject_validation_allows_bad_publish_subjects() {
    let server = nats_server::run_basic_server();
    let options = async_nats::ConnectOptions::new().skip_subject_validation(true);
    let nc = options.connect(server.client_url()).await.unwrap();

    let published = nc.publish("foo..bar", "data".into()).await;
    published.expect("publish should allow double dots when validation is skipped");
}
/// Even with `skip_subject_validation(true)`, subscribing to a subject with a double dot
/// must be rejected.
#[tokio::test]
async fn skip_subject_validation_still_validates_subscribe() {
    let server = nats_server::run_basic_server();
    let options = async_nats::ConnectOptions::new().skip_subject_validation(true);
    let nc = options.connect(server.client_url()).await.unwrap();

    let subscribed = nc.subscribe("foo..bar").await;
    subscribed.expect_err("subscribe should reject double dots even when skip is enabled");
}
/// Regression check: draining a subscription that has been idle for five seconds, and then
/// polling it once, must complete in under five seconds.
#[tokio::test]
async fn drain_subscription_deadlock() {
    let server = nats_server::run_basic_server();
    let nc = async_nats::connect(server.client_url()).await.unwrap();
    let mut idle = nc.subscribe("test").await.unwrap();
    nc.flush().await.unwrap();

    // Leave the subscription idle before draining it.
    tokio::time::sleep(Duration::from_secs(5)).await;

    let started = Instant::now();
    idle.drain().await.unwrap();
    idle.next().await;
    let elapsed = started.elapsed();

    assert!(
        elapsed.as_secs() < 5,
        "drain took too long: {:?} - bug likely present",
        elapsed
    );
}
/// Connecting with `local_address` set to `127.0.0.1:0` must succeed and allow publishing.
#[tokio::test]
async fn local_address() {
    let server = nats_server::run_basic_server();
    let bind_to: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap();

    let options = ConnectOptions::new().local_address(bind_to);
    let nc = options.connect(server.client_url()).await.unwrap();

    nc.publish("test", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
}
/// Connecting with `local_address` set to the fixed address `127.0.0.1:19898` must succeed
/// and allow publishing.
#[tokio::test]
async fn local_address_with_port() {
    let server = nats_server::run_basic_server();
    let bind_to: std::net::SocketAddr = "127.0.0.1:19898".parse().unwrap();

    let options = ConnectOptions::new().local_address(bind_to);
    let nc = options.connect(server.client_url()).await.unwrap();

    nc.publish("test", "data".into()).await.unwrap();
    nc.flush().await.unwrap();
}
/// A peer that accepts the TCP connection but never sends anything must make `connect` fail
/// with `ConnectErrorKind::TimedOut` when `connection_timeout` is 500 ms, and the attempt
/// must take less than five seconds.
#[tokio::test]
async fn handshake_timeout_no_info() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();

    // Fake server: accept one connection and hold it open without writing to it.
    let silent_peer = tokio::spawn(async move {
        let (_stream, _peer) = listener.accept().await.unwrap();
        tokio::time::sleep(Duration::from_secs(30)).await;
    });

    let options = ConnectOptions::new().connection_timeout(Duration::from_millis(500));
    let started = Instant::now();
    let outcome = options.connect(format!("nats://127.0.0.1:{}", port)).await;
    let elapsed = started.elapsed();

    assert_eq!(
        outcome.unwrap_err().kind(),
        ConnectErrorKind::TimedOut,
        "should time out when server never sends INFO"
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "timeout should fire near 500ms, but took {:?}",
        elapsed,
    );
    silent_peer.abort();
}
/// A peer that sends an `INFO` line and then goes silent must make `connect` fail with
/// `ConnectErrorKind::TimedOut` when `connection_timeout` is 500 ms, and the attempt must
/// take less than five seconds.
#[tokio::test]
async fn handshake_timeout_no_pong() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    // Fake server: accept one connection, write the INFO line, then hold it open in silence.
    let half_handshake = tokio::spawn(async move {
        use tokio::io::AsyncWriteExt;

        let (mut stream, _peer) = listener.accept().await.unwrap();
        let info = format!("INFO {{\"server_id\":\"test\",\"server_name\":\"test\",\"version\":\"2.10.0\",\"proto\":1,\"host\":\"127.0.0.1\",\"port\":{},\"max_payload\":1048576}}\r\n", addr.port());
        stream.write_all(info.as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_secs(30)).await;
    });

    let options = ConnectOptions::new().connection_timeout(Duration::from_millis(500));
    let started = Instant::now();
    let outcome = options
        .connect(format!("nats://127.0.0.1:{}", addr.port()))
        .await;
    let elapsed = started.elapsed();

    assert_eq!(
        outcome.unwrap_err().kind(),
        ConnectErrorKind::TimedOut,
        "should time out when server never sends PONG"
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "timeout should fire near 500ms, but took {:?}",
        elapsed,
    );
    half_handshake.abort();
}
}