use async_nats::service;
use rofr::ClientError;
use rofr::Cluster;
use rofr::Error;
use rofr::Request;
use rofr::RequestContext;
use rofr::Response;
use rofr::StreamContext;
use rofr::futures::StreamExt;
use rofr::service;
use serde::Deserialize;
use serde::Serialize;
use std::time::Duration;
use tokio::time::sleep;
#[derive(Debug, Serialize, Deserialize)]
pub struct ExampleRequest {
input: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ExampleResponse {
output: String,
}
#[service(name = "test_service", version = "0.1.2")]
trait TestService {
type Context;
#[endpoint(subject = "response_only")]
async fn response_only(ctx: RequestContext<Self::Context>) -> Result<Response<()>, Error>;
#[endpoint(subject = "echo")]
async fn echo(
_ctx: RequestContext<Self::Context>,
body: Request<ExampleRequest>,
) -> Result<Response<ExampleResponse>, Error>;
}
#[derive(Debug)]
pub struct TestImpl;
impl TestService for TestImpl {
type Context = ();
async fn response_only(_ctx: RequestContext<Self::Context>) -> Result<Response<()>, Error> {
Ok(Response(()))
}
async fn echo(
_ctx: RequestContext<Self::Context>,
body: Request<ExampleRequest>,
) -> Result<Response<ExampleResponse>, Error> {
Ok(Response(ExampleResponse {
output: body.input.to_owned(),
}))
}
}
#[tokio::test]
async fn test_service_info() {
let server = nats_server::run_server("tests/nats/default.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let mut cluster = Cluster::new(server.client_url()).unwrap();
let test_service = TestImpl::service(());
cluster.register(test_service);
tokio::spawn(async move {
cluster.run().await.unwrap();
});
sleep(Duration::from_millis(100)).await;
let info: service::Info = serde_json::from_slice(
&client
.request("$SRV.INFO", "".into())
.await
.unwrap()
.payload,
)
.unwrap();
assert_eq!(info.version, "0.1.2");
assert_eq!(info.name, "test_service");
assert_eq!(info.endpoints.len(), 2);
}
#[tokio::test]
async fn test_service_echo() {
let server = nats_server::run_server("tests/nats/default.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let client = TestServiceClient::new(client);
let mut cluster = Cluster::new(server.client_url()).unwrap();
let test_service = TestImpl::service(());
cluster.register(test_service);
tokio::spawn(async move {
cluster.run().await.unwrap();
});
sleep(Duration::from_millis(100)).await;
let sample_input = "Example text goes in, example text goes out. You can't explain that.";
let response = client
.echo(ExampleRequest {
input: sample_input.to_owned(),
})
.await
.unwrap();
assert_eq!(response.output, sample_input);
}
#[tokio::test]
async fn test_cluster_no_services() {
let server = nats_server::run_server("tests/nats/default.conf");
let cluster = Cluster::new(server.client_url()).unwrap();
let result = tokio::time::timeout(Duration::from_millis(50), cluster.run()).await;
assert!(
result.is_err(),
"cluster without services exited immediately"
);
}
#[service(name = "test_service_no_endpoints", version = "0.1.2")]
trait TestServiceNoEndpoints {
type Context;
}
#[derive(Debug)]
struct TestServiceNoEndpointsImpl;
impl TestServiceNoEndpoints for TestServiceNoEndpointsImpl {
type Context = ();
}
#[tokio::test]
async fn test_serivce_no_endpoints() {
let server = nats_server::run_server("tests/nats/default.conf");
let mut cluster = Cluster::new(server.client_url()).unwrap();
let test_service = TestServiceNoEndpointsImpl::service(());
cluster.register(test_service);
let result = tokio::time::timeout(Duration::from_millis(50), cluster.run()).await;
assert!(
result.is_err(),
"cluster without services exited immediately"
);
}
#[service(name = "test_service_with_stream", version = "0.1.2")]
trait TestServiceWithStream {
type Context;
#[stream(
name = "TEST_STREAM",
subject = "test_stream_subject",
message = u64,
)]
async fn test_stream(ctx: StreamContext<Self::Context>) -> Result<(), Error>;
}
#[derive(Debug)]
struct TestServiceWithStreamImpl;
impl TestServiceWithStream for TestServiceWithStreamImpl {
type Context = ();
async fn test_stream(ctx: StreamContext<Self::Context>) -> Result<(), Error> {
ctx.send("test_stream_subject", &25)
.await? .await?;
Ok(())
}
}
#[tokio::test]
async fn test_service_stream() {
let server = nats_server::run_server("tests/nats/default.conf");
let mut cluster = Cluster::new(server.client_url()).unwrap();
let test_service = TestServiceWithStreamImpl::service(());
cluster.register(test_service);
tokio::spawn(async move {
cluster.run().await.unwrap();
});
sleep(Duration::from_millis(50)).await;
let client = async_nats::connect(server.client_url()).await.unwrap();
let client = TestServiceWithStreamClient::new(client);
let mut stream = client.test_stream().await.unwrap();
let response = stream.next().await;
assert!(response.is_some());
assert_eq!(response.unwrap().unwrap(), 25);
}
#[tokio::test]
async fn test_service_stream_not_found() {
let server = nats_server::run_server("tests/nats/default.conf");
let client =
TestServiceWithStreamClient::new(async_nats::connect(server.client_url()).await.unwrap());
let result = client.test_stream().await;
assert!(
matches!(result, Err(ClientError::Request(_))),
"expected a request error when the stream does not exist",
);
}
#[derive(Debug)]
struct TestServiceWithStreamMultipleImpl;
impl TestServiceWithStream for TestServiceWithStreamMultipleImpl {
type Context = ();
async fn test_stream(ctx: StreamContext<Self::Context>) -> Result<(), Error> {
for value in [10u64, 20, 30] {
ctx.send("test_stream_subject", &value).await?.await?;
}
Ok(())
}
}
#[tokio::test]
async fn test_service_stream_multiple_messages() {
let server = nats_server::run_server("tests/nats/default.conf");
let mut cluster = Cluster::new(server.client_url()).unwrap();
cluster.register(TestServiceWithStreamMultipleImpl::service(()));
tokio::spawn(async move {
cluster.run().await.unwrap();
});
sleep(Duration::from_millis(50)).await;
let client =
TestServiceWithStreamClient::new(async_nats::connect(server.client_url()).await.unwrap());
let mut stream = client.test_stream().await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), 10);
assert_eq!(stream.next().await.unwrap().unwrap(), 20);
assert_eq!(stream.next().await.unwrap().unwrap(), 30);
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Measurement {
sensor_id: u32,
value: f64,
}
#[service(name = "test_measurement_stream", version = "0.1.0")]
trait TestMeasurementStreamService {
type Context;
#[stream(
name = "MEASUREMENT_STREAM",
subject = "measurement",
message = Measurement,
)]
async fn measurement_stream(ctx: StreamContext<Self::Context>) -> Result<(), Error>;
}
#[derive(Debug)]
struct TestMeasurementStreamImpl;
impl TestMeasurementStreamService for TestMeasurementStreamImpl {
type Context = ();
async fn measurement_stream(ctx: StreamContext<Self::Context>) -> Result<(), Error> {
ctx.send(
"measurement",
&Measurement {
sensor_id: 42,
value: 98.6,
},
)
.await?
.await?;
Ok(())
}
}
#[tokio::test]
async fn test_service_stream_struct_message() {
let server = nats_server::run_server("tests/nats/default.conf");
let mut cluster = Cluster::new(server.client_url()).unwrap();
cluster.register(TestMeasurementStreamImpl::service(()));
tokio::spawn(async move {
cluster.run().await.unwrap();
});
sleep(Duration::from_millis(50)).await;
let client = TestMeasurementStreamServiceClient::new(
async_nats::connect(server.client_url()).await.unwrap(),
);
let mut stream = client.measurement_stream().await.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(msg.sensor_id, 42);
assert_eq!(msg.value, 98.6);
}