#![cfg(feature = "macros")]
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use ruststream::codec::JsonCodec;
use ruststream::memory::{MemoryBroker, MemorySubscriber};
use ruststream::runtime::{
AppInfo, HandlerResult, Outgoing, PublishLayer, PublishMiddleware, PublishNext, RustStream,
TypedPublisher,
};
use ruststream::{Message, OutgoingMessage, Publisher, SubscriptionSource, subscriber};
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Notify;
/// JSON payload used by the order handlers in this file.
///
/// Tests serialize it with `serde_json::to_vec` and the handlers receive it
/// as `&Order`.
#[derive(Debug, Serialize, Deserialize)]
struct Order {
/// Added to the per-handler counter each time a handler runs.
id: u32,
/// Carried in the payload but not read by any handler in this file.
total: f64,
}
/// Running sum of the `Order::id` values seen by `handle`; polled by
/// `macro_subscriber_dispatches` to detect delivery.
static HANDLED: AtomicU32 = AtomicU32::new(0);
// Subscriber declared on "orders": adds the order's id to `HANDLED` and
// acknowledges the message.
#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    let increment = order.id;
    HANDLED.fetch_add(increment, Ordering::SeqCst);
    HandlerResult::Ack
}
/// Subscription source identified by an owned stream name.
struct StreamSource {
    /// Name of the stream this source refers to.
    name: String,
}

impl StreamSource {
    /// Builds a source for the stream called `name`, copying it into an
    /// owned `String`.
    fn new(name: &str) -> Self {
        let name = String::from(name);
        Self { name }
    }
}
/// Lets a `StreamSource` be mounted on a `MemoryBroker`.
impl SubscriptionSource<MemoryBroker> for StreamSource {
    type Subscriber = MemorySubscriber;

    /// Returns the stream name this source was built with.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Consumes the source and subscribes on `broker` under the stored name.
    /// This implementation never returns an error.
    async fn subscribe(self, broker: &MemoryBroker) -> Result<MemorySubscriber, Infallible> {
        let subscriber = broker.subscribe(&self.name);
        Ok(subscriber)
    }
}
/// Running sum of the `Order::id` values seen by `on_stream`; polled by
/// `macro_def_mounts_on_arbitrary_source`.
static HANDLED_ON_STREAM: AtomicU32 = AtomicU32::new(0);
// Handler mounted through `include_on` in
// `macro_def_mounts_on_arbitrary_source`, which publishes to "events.stream"
// rather than to the name given in the decorator below.
#[subscriber("ignored-on-the-include_on-path")]
async fn on_stream(order: &Order) -> HandlerResult {
    let increment = order.id;
    HANDLED_ON_STREAM.fetch_add(increment, Ordering::SeqCst);
    HandlerResult::Ack
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn macro_def_mounts_on_arbitrary_source() {
let broker = MemoryBroker::new();
let publisher = broker.publisher();
let app = RustStream::new(AppInfo::new("svc", "0.1.0")).with_broker(broker, |b| {
b.include_on(
StreamSource {
name: "events.stream".to_owned(),
},
on_stream,
JsonCodec,
);
});
let shutdown = Arc::new(Notify::new());
let shutdown_signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
let payload = serde_json::to_vec(&Order { id: 4, total: 1.0 }).unwrap();
let result = tokio::time::timeout(Duration::from_secs(1), async {
loop {
let _ = publisher
.publish(OutgoingMessage::new("events.stream", &payload))
.await;
tokio::time::sleep(Duration::from_millis(10)).await;
if HANDLED_ON_STREAM.load(Ordering::SeqCst) >= 4 {
break;
}
}
})
.await;
assert!(result.is_ok(), "include_on handler did not run");
shutdown.notify_one();
run.await.unwrap().unwrap();
}
/// Running sum of the `Order::id` values seen by `on_ctor`; polled by
/// `macro_descriptor_in_decorator`.
static HANDLED_CTOR: AtomicU32 = AtomicU32::new(0);
// Subscriber whose decorator argument is a `StreamSource` constructor
// expression instead of a string literal.
#[subscriber(StreamSource::new("ctor.stream"))]
async fn on_ctor(order: &Order) -> HandlerResult {
    let increment = order.id;
    HANDLED_CTOR.fetch_add(increment, Ordering::SeqCst);
    HandlerResult::Ack
}
/// Includes `on_ctor`, whose source is given as an expression in the
/// decorator, and checks that an order published to "ctor.stream" reaches it
/// within one second.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn macro_descriptor_in_decorator() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("svc", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| b.include(on_ctor));

    // Run the app in the background until the test signals shutdown.
    let stop = Arc::new(Notify::new());
    let stop_waiter = Arc::clone(&stop);
    let until_stopped = async move { stop_waiter.notified().await };
    let app_task = tokio::spawn(app.run_until(until_stopped));

    let payload = serde_json::to_vec(&Order { id: 6, total: 1.0 }).unwrap();
    let poll_interval = Duration::from_millis(10);
    let deadline = Duration::from_secs(1);

    // Keep republishing: publish errors are ignored and the counter is
    // re-checked after each short sleep.
    let delivery = async {
        let mut delivered = false;
        while !delivered {
            let _ = publisher
                .publish(OutgoingMessage::new("ctor.stream", &payload))
                .await;
            tokio::time::sleep(poll_interval).await;
            delivered = HANDLED_CTOR.load(Ordering::SeqCst) >= 6;
        }
    };
    let outcome = tokio::time::timeout(deadline, delivery).await;
    assert!(
        outcome.is_ok(),
        "descriptor-in-decorator handler did not run"
    );

    stop.notify_one();
    let joined = app_task.await;
    joined.unwrap().unwrap();
}
// NOTE(review): the one-line doc comment below is load-bearing.
// `derive_message_metadata` asserts that `DescribedOrder::DESCRIPTION` equals
// exactly this sentence, and nothing else on the type supplies it, so the
// derive presumably reads it from the doc comment — confirm against the
// `Message` derive before rewording.
/// An order placed by a customer.
#[derive(Message)]
// Only the derived metadata constants are exercised; `id` is never read.
#[allow(dead_code)]
struct DescribedOrder {
    id: u32,
}
/// Checks the `NAME` and `DESCRIPTION` constants that `#[derive(Message)]`
/// produces for `DescribedOrder`.
#[test]
fn derive_message_metadata() {
    let expected_name = "DescribedOrder";
    let expected_description = Some("An order placed by a customer.");

    assert_eq!(DescribedOrder::NAME, expected_name);
    assert_eq!(DescribedOrder::DESCRIPTION, expected_description);
}
/// Includes the macro-generated `handle` subscriber and checks that an order
/// published to "orders" reaches it within one second.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn macro_subscriber_dispatches() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("svc", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| b.include(handle));

    // Run the app in the background until the test signals shutdown.
    let stop = Arc::new(Notify::new());
    let stop_waiter = Arc::clone(&stop);
    let until_stopped = async move { stop_waiter.notified().await };
    let app_task = tokio::spawn(app.run_until(until_stopped));

    let payload = serde_json::to_vec(&Order { id: 5, total: 1.0 }).unwrap();
    let poll_interval = Duration::from_millis(10);
    let deadline = Duration::from_secs(1);

    // Keep republishing: publish errors are ignored and the counter is
    // re-checked after each short sleep.
    let delivery = async {
        let mut delivered = false;
        while !delivered {
            let _ = publisher
                .publish(OutgoingMessage::new("orders", &payload))
                .await;
            tokio::time::sleep(poll_interval).await;
            delivered = HANDLED.load(Ordering::SeqCst) >= 5;
        }
    };
    let outcome = tokio::time::timeout(deadline, delivery).await;
    assert!(outcome.is_ok(), "macro handler did not run");

    stop.notify_one();
    let joined = app_task.await;
    joined.unwrap().unwrap();
}
/// Running sum of the `Order::id` values seen by `handle_default`; polled by
/// `scope_default_codec_drops_per_call_codec`.
static HANDLED_DEFAULT: AtomicU32 = AtomicU32::new(0);
// Subscriber declared on "orders-default"; the test registers it under a
// scope created with `with_broker_codec`.
#[subscriber("orders-default")]
async fn handle_default(order: &Order) -> HandlerResult {
    let increment = order.id;
    HANDLED_DEFAULT.fetch_add(increment, Ordering::SeqCst);
    HandlerResult::Ack
}
/// Passes `JsonCodec` once to `with_broker_codec`, includes `handle_default`
/// without a per-call codec, and checks that an order published to
/// "orders-default" reaches the handler within one second.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn scope_default_codec_drops_per_call_codec() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("svc", "0.1.0");
    let app = RustStream::new(info)
        .with_broker_codec(broker, JsonCodec, |b| b.include(handle_default));

    // Run the app in the background until the test signals shutdown.
    let stop = Arc::new(Notify::new());
    let stop_waiter = Arc::clone(&stop);
    let until_stopped = async move { stop_waiter.notified().await };
    let app_task = tokio::spawn(app.run_until(until_stopped));

    let payload = serde_json::to_vec(&Order { id: 9, total: 1.0 }).unwrap();
    let poll_interval = Duration::from_millis(10);
    let deadline = Duration::from_secs(1);

    // Keep republishing: publish errors are ignored and the counter is
    // re-checked after each short sleep.
    let delivery = async {
        let mut delivered = false;
        while !delivered {
            let _ = publisher
                .publish(OutgoingMessage::new("orders-default", &payload))
                .await;
            tokio::time::sleep(poll_interval).await;
            delivered = HANDLED_DEFAULT.load(Ordering::SeqCst) >= 9;
        }
    };
    let outcome = tokio::time::timeout(deadline, delivery).await;
    assert!(outcome.is_ok(), "scope-default-codec handler did not run");

    stop.notify_one();
    let joined = app_task.await;
    joined.unwrap().unwrap();
}
/// Publish layer that stamps every outgoing message with an `x-static`
/// header whose value is the single byte `1`.
struct StaticEnvelope;

impl PublishLayer for StaticEnvelope {
    /// Inserts the `x-static` header into `out`.
    fn apply(&self, out: &mut Outgoing) {
        let marker: Vec<u8> = vec![b'1'];
        out.headers_mut().insert("x-static", marker);
    }
}
/// JSON payload used by `relay` (input and reply) and `check`.
#[derive(Serialize, Deserialize)]
struct Ping {
/// Value echoed by `relay` and recorded by `check`.
n: u32,
}
/// Holds the `Ping::n` of the last message `check` received with an
/// `x-static` header; stays 0 until such a message arrives.
static STATIC_SEEN: AtomicU32 = AtomicU32::new(0);
// Subscriber on "ping-in" declared with `publish("ping-out")`: returns a
// `Ping` carrying the same `n` it received.
#[subscriber("ping-in", publish("ping-out"))]
async fn relay(p: &Ping) -> Ping {
    let n = p.n;
    Ping { n }
}
// Subscriber on "ping-out": records the ping's `n` in `STATIC_SEEN` only
// when the message carries an `x-static` header, then acknowledges.
#[subscriber("ping-out")]
async fn check(p: &Ping, ctx: &mut Context) -> HandlerResult {
    let layer_applied = ctx.headers().get("x-static").is_some();
    if layer_applied {
        STATIC_SEEN.store(p.n, Ordering::SeqCst);
    }
    HandlerResult::Ack
}
/// Wires `relay` on an ingress broker to publish through a `TypedPublisher`
/// layered with `StaticEnvelope`, and checks that `check` on the egress
/// broker sees the `x-static` header within one second.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn static_publish_layer_transforms_reply() {
    let ingress = MemoryBroker::new();
    let egress = MemoryBroker::new();
    let ingress_pub = ingress.publisher();
    let egress_pub = TypedPublisher::new(egress.publisher()).layer(StaticEnvelope);
    let info = AppInfo::new("svc", "0.1.0");
    let app = RustStream::new(info)
        .with_broker(ingress, |b| {
            b.include_publishing(relay, egress_pub);
        })
        .with_broker(egress, |b| b.include(check));

    // Run the app in the background until the test signals shutdown.
    let stop = Arc::new(Notify::new());
    let stop_waiter = Arc::clone(&stop);
    let until_stopped = async move { stop_waiter.notified().await };
    let app_task = tokio::spawn(app.run_until(until_stopped));

    let payload = serde_json::to_vec(&Ping { n: 7 }).unwrap();
    let poll_interval = Duration::from_millis(10);
    let deadline = Duration::from_secs(1);

    // Keep republishing: publish errors are ignored and the marker is
    // re-checked after each short sleep.
    let delivery = async {
        let mut header_seen = false;
        while !header_seen {
            let _ = ingress_pub
                .publish(OutgoingMessage::new("ping-in", &payload))
                .await;
            tokio::time::sleep(poll_interval).await;
            header_seen = STATIC_SEEN.load(Ordering::SeqCst) == 7;
        }
    };
    let outcome = tokio::time::timeout(deadline, delivery).await;
    assert!(
        outcome.is_ok(),
        "static publish layer header did not reach the consumer",
    );

    stop.notify_one();
    let joined = app_task.await;
    joined.unwrap().unwrap();
}
/// JSON payload consumed by the `reply` handler.
#[derive(Serialize, Deserialize)]
struct Request {
/// Number that `reply` doubles.
n: u32,
}
/// JSON payload returned by `reply` and consumed by `capture`.
#[derive(Serialize, Deserialize)]
struct Response {
/// `Request::n` multiplied by two.
doubled: u32,
}
/// Last `Response::doubled` value stored by `capture`.
static REPLY_DOUBLED: AtomicU32 = AtomicU32::new(0);
/// Set to 1 by `capture` once a response carrying an `x-envelope` header has
/// been seen.
static REPLY_TAGGED: AtomicU32 = AtomicU32::new(0);
/// Publish middleware that adds an `x-envelope` header (value: the single
/// byte `1`) before handing the message to the next stage.
struct Tagger;

impl PublishMiddleware for Tagger {
    /// Tags `out` and then runs `next` on it, returning whatever `next`
    /// returns.
    fn on_publish<'a>(
        &'a self,
        out: &'a mut Outgoing,
        next: PublishNext<'a>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
    > {
        let tag_then_forward = async move {
            let marker: Vec<u8> = vec![b'1'];
            out.headers_mut().insert("x-envelope", marker);
            next.run(out).await
        };
        Box::pin(tag_then_forward)
    }
}
// Subscriber on "requests" declared with `publish("responses")`: returns a
// `Response` holding twice the request's `n`.
#[subscriber("requests", publish("responses"))]
async fn reply(req: &Request) -> Response {
    let doubled = req.n * 2;
    Response { doubled }
}
// Subscriber on "responses": records whether the `x-envelope` header was
// present and which `doubled` value arrived.
#[subscriber("responses")]
async fn capture(resp: &Response, ctx: &mut Context) -> HandlerResult {
    let enveloped = ctx.headers().get("x-envelope").is_some();
    if enveloped {
        REPLY_TAGGED.store(1, Ordering::SeqCst);
    }
    // Stored after the tag flag: the test polls REPLY_DOUBLED first and only
    // then asserts on REPLY_TAGGED.
    let doubled = resp.doubled;
    REPLY_DOUBLED.store(doubled, Ordering::SeqCst);
    HandlerResult::Ack
}
/// Registers `Tagger` with `publish_layer`, has `reply` on the ingress broker
/// publish through a `TypedPublisher` on the egress broker, and checks that
/// `capture` receives the doubled value together with the `x-envelope`
/// header.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn macro_publisher_replies_cross_broker() {
    let ingress = MemoryBroker::new();
    let egress = MemoryBroker::new();
    let ingress_pub = ingress.publisher();
    let egress_pub = TypedPublisher::new(egress.publisher());
    let info = AppInfo::new("svc", "0.1.0");
    let app = RustStream::new(info)
        .publish_layer(Tagger)
        .with_broker(ingress, |b| {
            b.include_publishing(reply, egress_pub);
        })
        .with_broker(egress, |b| b.include(capture));

    // Run the app in the background until the test signals shutdown.
    let stop = Arc::new(Notify::new());
    let stop_waiter = Arc::clone(&stop);
    let until_stopped = async move { stop_waiter.notified().await };
    let app_task = tokio::spawn(app.run_until(until_stopped));

    let payload = serde_json::to_vec(&Request { n: 21 }).unwrap();
    let poll_interval = Duration::from_millis(10);
    let deadline = Duration::from_secs(1);

    // Keep republishing: publish errors are ignored and the captured value
    // is re-checked after each short sleep.
    let delivery = async {
        let mut reply_seen = false;
        while !reply_seen {
            let _ = ingress_pub
                .publish(OutgoingMessage::new("requests", &payload))
                .await;
            tokio::time::sleep(poll_interval).await;
            reply_seen = REPLY_DOUBLED.load(Ordering::SeqCst) == 42;
        }
    };
    let outcome = tokio::time::timeout(deadline, delivery).await;
    assert!(outcome.is_ok(), "reply was not published to egress");

    let tagged = REPLY_TAGGED.load(Ordering::SeqCst);
    assert_eq!(
        tagged,
        1,
        "publish middleware header did not reach the consumer",
    );

    stop.notify_one();
    let joined = app_task.await;
    joined.unwrap().unwrap();
}