#[cfg(feature = "websockets")]
mod websockets {
use std::path::PathBuf;
use futures_util::StreamExt;
#[tokio::test]
async fn core() {
let _server = nats_server::run_server("tests/configs/ws.conf");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let client = async_nats::ConnectOptions::new()
.retry_on_initial_connect()
.connect("ws://localhost:8444")
.await
.unwrap();
let mut sub = client.subscribe("foo").await.unwrap();
client.publish("foo", "hello".into()).await.unwrap();
assert_eq!(sub.next().await.unwrap().payload, "hello");
let payload = bytes::Bytes::from(vec![22; 1024 * 1024]);
let mut sub = client.subscribe("foo").await.unwrap().take(10);
for _ in 0..10 {
client.publish("foo", payload.clone()).await.unwrap();
}
while let Some(msg) = sub.next().await {
assert_eq!(msg.payload, payload);
}
let mut requests = client.subscribe("foo").await.unwrap();
tokio::task::spawn({
let client = client.clone();
async move {
let request = requests.next().await.unwrap();
client
.publish(request.reply.unwrap(), request.payload)
.await
.unwrap();
}
});
let response = client.request("foo", "hello".into()).await.unwrap();
assert_eq!(response.payload, "hello");
}
#[tokio::test]
async fn tls() {
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let _server = nats_server::run_server("tests/configs/ws_tls.conf");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let client = async_nats::ConnectOptions::new()
.user_and_password("derek".into(), "porkchop".into())
.add_root_certificates(path.join("tests/configs/certs/rootCA.pem"))
.connect("wss://localhost:8445")
.await
.unwrap();
let mut sub = client.subscribe("foo").await.unwrap();
client.publish("foo", "hello".into()).await.unwrap();
assert_eq!(sub.next().await.unwrap().payload, "hello");
}
}