#![cfg(feature = "macros")]
mod common;
use std::{
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use common::handler_signal;
use ruststream::codec::JsonCodec;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream, TypedPublisher};
use ruststream::{Name, OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
/// Inbound test payload. Both tests in this file serialize `Order { id: 1 }` with
/// `serde_json` and publish the bytes to every input topic.
#[derive(Debug, Serialize, Deserialize)]
struct Order {
// Identifier that the relay handlers copy into the `Receipt` they return.
id: u32,
}
/// Reply payload returned by the relay handlers and received by the `*_check`
/// subscribers in this file.
#[derive(Debug, Serialize, Deserialize)]
struct Receipt {
// Copied from the `id` of the `Order` that produced this receipt.
id: u32,
}
// Dispatch counters read by `scope_codec_include_family_dispatches`; the test passes
// once every one of them is at least 1.
// Incremented by 1 in `plain_on`.
static PLAIN_ON: AtomicUsize = AtomicUsize::new(0);
// Grows by the batch length in `batch`.
static BATCH: AtomicUsize = AtomicUsize::new(0);
// Grows by the batch length in `batch_on`.
static BATCH_ON: AtomicUsize = AtomicUsize::new(0);
// Incremented by 1 in `pout_check` (receipts seen on `sc-pout`).
static POUT: AtomicUsize = AtomicUsize::new(0);
// Incremented by 1 in `pout_on_check` (receipts seen on `sc-pout-on`).
static POUT_ON: AtomicUsize = AtomicUsize::new(0);
// Incremented by 1 in `bpout_check` (receipts seen on `sc-bpout`).
static BPOUT: AtomicUsize = AtomicUsize::new(0);
// Incremented by 1 in `bpout_on_check` (receipts seen on `sc-bpout-on`).
static BPOUT_ON: AtomicUsize = AtomicUsize::new(0);
// Wake-up signal: every counting handler of this test calls `notify_one` on it, and the
// test hands it to `handler_signal` between publish rounds.
static NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
// Counting handler for the `include_on` variant. The test registers it with
// `include_on(Name::new("sc-plain-on"), plain_on)` and publishes to `sc-plain-on`.
#[subscriber("sc-plain-on")]
async fn plain_on(_o: &Order) -> HandlerResult {
// Count first, signal second: the counter is already updated by the time a
// waiter on `NOTIFY` gets to read it.
PLAIN_ON.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
// Counting handler for the `include_batch` variant. The test registers it without a
// name override and publishes to `sc-batch`, the name declared in the attribute.
#[subscriber(batch("sc-batch"))]
async fn batch(orders: &[Order]) -> HandlerResult {
// Adds the batch length, so the counter tracks messages rather than batches.
// The update happens before the wake-up so a waiter on `NOTIFY` sees it.
BATCH.fetch_add(orders.len(), Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
// Counting handler for the `include_batch_on` variant. The name declared here
// (`sc-batch-ignored`) is never published to; the test registers the handler with
// `include_batch_on(Name::new("sc-batch-on"), batch_on)` and publishes to `sc-batch-on`.
#[subscriber(batch("sc-batch-ignored"))]
async fn batch_on(orders: &[Order]) -> HandlerResult {
// Adds the batch length, so the counter tracks messages rather than batches.
// The update happens before the wake-up so a waiter on `NOTIFY` sees it.
BATCH_ON.fetch_add(orders.len(), Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
// Relay handler for the `include_publishing` variant: turns each `Order` into a
// `Receipt` with the same id. The attribute names `sc-pout` as the publish target,
// which is the topic `pout_check` subscribes to.
#[subscriber("sc-pin", publish("sc-pout"))]
async fn relay(o: &Order) -> Receipt {
    let id = o.id;
    Receipt { id }
}
// Relay handler for the `include_publishing_on` variant: turns each `Order` into a
// `Receipt` with the same id. The declared input name (`sc-pin-ignored`) is never
// published to; the test registers the handler under `sc-pin-on` instead. The
// attribute names `sc-pout-on` as the publish target, which `pout_on_check` observes.
#[subscriber("sc-pin-ignored", publish("sc-pout-on"))]
async fn relay_on(o: &Order) -> Receipt {
    let id = o.id;
    Receipt { id }
}
// Batch relay handler for the `include_batch_publishing` variant: returns one
// `Receipt` per `Order` in the batch, in the same order and with the same ids. The
// attribute names `sc-bpout` as the publish target, which `bpout_check` observes.
#[subscriber(batch("sc-bpin"), publish("sc-bpout"))]
async fn batch_relay(orders: &[Order]) -> Vec<Receipt> {
    let mut receipts = Vec::with_capacity(orders.len());
    for order in orders {
        receipts.push(Receipt { id: order.id });
    }
    receipts
}
// Batch relay handler for the `include_batch_publishing_on` variant: returns one
// `Receipt` per `Order` in the batch, in the same order and with the same ids. The
// declared input name (`sc-bpin-ignored`) is never published to; the test registers
// the handler under `sc-bpin-on` instead. The attribute names `sc-bpout-on` as the
// publish target, which `bpout_on_check` observes.
#[subscriber(batch("sc-bpin-ignored"), publish("sc-bpout-on"))]
async fn batch_relay_on(orders: &[Order]) -> Vec<Receipt> {
    let mut receipts = Vec::with_capacity(orders.len());
    for order in orders {
        receipts.push(Receipt { id: order.id });
    }
    receipts
}
// Observer for `relay`'s publish target: counts every `Receipt` delivered on `sc-pout`,
// which is how the test detects that the `include_publishing` variant dispatched.
#[subscriber("sc-pout")]
async fn pout_check(_r: &Receipt) -> HandlerResult {
// Count first, signal second, so a waiter on `NOTIFY` sees the updated counter.
POUT.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
// Observer for `relay_on`'s publish target: counts every `Receipt` delivered on
// `sc-pout-on`, which is how the test detects that `include_publishing_on` dispatched.
#[subscriber("sc-pout-on")]
async fn pout_on_check(_r: &Receipt) -> HandlerResult {
// Count first, signal second, so a waiter on `NOTIFY` sees the updated counter.
POUT_ON.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
// Observer for `batch_relay`'s publish target: counts every `Receipt` delivered on
// `sc-bpout`, which is how the test detects that `include_batch_publishing` dispatched.
#[subscriber("sc-bpout")]
async fn bpout_check(_r: &Receipt) -> HandlerResult {
// Count first, signal second, so a waiter on `NOTIFY` sees the updated counter.
BPOUT.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
// Observer for `batch_relay_on`'s publish target: counts every `Receipt` delivered on
// `sc-bpout-on`, which is how the test detects that `include_batch_publishing_on`
// dispatched.
#[subscriber("sc-bpout-on")]
async fn bpout_on_check(_r: &Receipt) -> HandlerResult {
// Count first, signal second, so a waiter on `NOTIFY` sees the updated counter.
BPOUT_ON.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn scope_codec_include_family_dispatches() {
let broker = MemoryBroker::new();
let driver = broker.clone().publisher();
let reply_broker = broker.clone();
let app =
RustStream::new(AppInfo::new("sc", "0.1.0")).with_broker_codec(broker, JsonCodec, |b| {
b.include_on(Name::new("sc-plain-on"), plain_on);
b.include_batch(batch);
b.include_batch_on(Name::new("sc-batch-on"), batch_on);
b.include_publishing(relay, TypedPublisher::new(reply_broker.publisher()));
b.include_publishing_on(
Name::new("sc-pin-on"),
relay_on,
TypedPublisher::new(reply_broker.publisher()),
);
b.include_batch_publishing(batch_relay, TypedPublisher::new(reply_broker.publisher()));
b.include_batch_publishing_on(
Name::new("sc-bpin-on"),
batch_relay_on,
TypedPublisher::new(reply_broker.publisher()),
);
b.include(pout_check);
b.include(pout_on_check);
b.include(bpout_check);
b.include(bpout_on_check);
});
let shutdown = Arc::new(Notify::new());
let shutdown_signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
let payload = serde_json::to_vec(&Order { id: 1 }).unwrap();
let topics = [
"sc-plain-on",
"sc-batch",
"sc-batch-on",
"sc-pin",
"sc-pin-on",
"sc-bpin",
"sc-bpin-on",
];
let counters: [&AtomicUsize; 7] = [
&PLAIN_ON, &BATCH, &BATCH_ON, &POUT, &POUT_ON, &BPOUT, &BPOUT_ON,
];
let outcome = tokio::time::timeout(Duration::from_secs(5), async {
loop {
for topic in topics {
let _ = driver.publish(OutgoingMessage::new(topic, &payload)).await;
}
handler_signal(&NOTIFY).await;
if counters.iter().all(|c| c.load(Ordering::SeqCst) >= 1) {
break;
}
}
})
.await;
assert!(
outcome.is_ok(),
"a scope-codec include variant never dispatched"
);
shutdown.notify_one();
run.await.unwrap().unwrap();
}
// Dispatch counters read by `default_codec_include_on_family_dispatches`; the test
// passes once every one of them is at least 1.
// Incremented by 1 in `d_plain_on`.
static D_PLAIN_ON: AtomicUsize = AtomicUsize::new(0);
// Grows by the batch length in `d_batch_on`.
static D_BATCH_ON: AtomicUsize = AtomicUsize::new(0);
// Incremented by 1 in `d_pout_on_check` (receipts seen on `d-pout-on`).
static D_POUT_ON: AtomicUsize = AtomicUsize::new(0);
// Incremented by 1 in `d_bpout_on_check` (receipts seen on `d-bpout-on`).
static D_BPOUT_ON: AtomicUsize = AtomicUsize::new(0);
// Wake-up signal: every `d_*` counting handler calls `notify_one` on it, and the test
// hands it to `handler_signal` between publish rounds.
static D_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
// Counting handler for the `include_on` variant of the default-codec test, which
// registers it with `include_on(Name::new("d-plain-on"), d_plain_on)`.
#[subscriber("d-plain-on")]
async fn d_plain_on(_o: &Order) -> HandlerResult {
// Count first, signal second, so a waiter on `D_NOTIFY` sees the updated counter.
D_PLAIN_ON.fetch_add(1, Ordering::SeqCst);
D_NOTIFY.notify_one();
HandlerResult::Ack
}
// Counting handler for the `include_batch_on` variant of the default-codec test. The
// name declared here (`d-batch-ignored`) is never published to; the test registers the
// handler with `include_batch_on(Name::new("d-batch-on"), d_batch_on)`.
#[subscriber(batch("d-batch-ignored"))]
async fn d_batch_on(orders: &[Order]) -> HandlerResult {
// Adds the batch length, so the counter tracks messages rather than batches.
// The update happens before the wake-up so a waiter on `D_NOTIFY` sees it.
D_BATCH_ON.fetch_add(orders.len(), Ordering::SeqCst);
D_NOTIFY.notify_one();
HandlerResult::Ack
}
// Relay handler for the `include_publishing_on` variant of the default-codec test:
// turns each `Order` into a `Receipt` with the same id. The declared input name
// (`d-pin-ignored`) is never published to; the test registers the handler under
// `d-pin-on` instead. The attribute names `d-pout-on` as the publish target, which
// `d_pout_on_check` observes.
#[subscriber("d-pin-ignored", publish("d-pout-on"))]
async fn d_relay_on(o: &Order) -> Receipt {
    let id = o.id;
    Receipt { id }
}
// Batch relay handler for the `include_batch_publishing_on` variant of the
// default-codec test: returns one `Receipt` per `Order` in the batch, in the same
// order and with the same ids. The declared input name (`d-bpin-ignored`) is never
// published to; the test registers the handler under `d-bpin-on` instead. The
// attribute names `d-bpout-on` as the publish target, which `d_bpout_on_check`
// observes.
#[subscriber(batch("d-bpin-ignored"), publish("d-bpout-on"))]
async fn d_batch_relay_on(orders: &[Order]) -> Vec<Receipt> {
    let mut receipts = Vec::with_capacity(orders.len());
    for order in orders {
        receipts.push(Receipt { id: order.id });
    }
    receipts
}
// Observer for `d_relay_on`'s publish target: counts every `Receipt` delivered on
// `d-pout-on`, which is how the test detects that `include_publishing_on` dispatched.
#[subscriber("d-pout-on")]
async fn d_pout_on_check(_r: &Receipt) -> HandlerResult {
// Count first, signal second, so a waiter on `D_NOTIFY` sees the updated counter.
D_POUT_ON.fetch_add(1, Ordering::SeqCst);
D_NOTIFY.notify_one();
HandlerResult::Ack
}
// Observer for `d_batch_relay_on`'s publish target: counts every `Receipt` delivered
// on `d-bpout-on`, which is how the test detects that `include_batch_publishing_on`
// dispatched.
#[subscriber("d-bpout-on")]
async fn d_bpout_on_check(_r: &Receipt) -> HandlerResult {
// Count first, signal second, so a waiter on `D_NOTIFY` sees the updated counter.
D_BPOUT_ON.fetch_add(1, Ordering::SeqCst);
D_NOTIFY.notify_one();
HandlerResult::Ack
}
/// Checks that the four `include*_on` registration variants dispatch to their handlers
/// inside a plain `with_broker` scope (no codec passed explicitly).
///
/// One JSON-encoded `Order` is published to each input topic, round after round, until
/// every counter has moved or the five-second budget is exhausted. The two publishing
/// variants are observed indirectly, through the `d_*_check` subscribers attached to
/// their publish topics.
///
/// # Panics
///
/// Panics when at least one counter is still zero after five seconds (the message lists
/// the input topics concerned), or when awaiting the spawned application yields a
/// failure.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn default_codec_include_on_family_dispatches() {
    let broker = MemoryBroker::new();
    let driver = broker.clone().publisher();
    let reply_broker = broker.clone();
    let app = RustStream::new(AppInfo::new("dsc", "0.1.0")).with_broker(broker, |b| {
        // Every registration passes a name that differs from the (never published)
        // `*-ignored` name in the handler's attribute, except `d_plain_on`, whose
        // attribute already says `d-plain-on`.
        b.include_on(Name::new("d-plain-on"), d_plain_on);
        b.include_batch_on(Name::new("d-batch-on"), d_batch_on);
        b.include_publishing_on(
            Name::new("d-pin-on"),
            d_relay_on,
            TypedPublisher::new(reply_broker.publisher()),
        );
        b.include_batch_publishing_on(
            Name::new("d-bpin-on"),
            d_batch_relay_on,
            TypedPublisher::new(reply_broker.publisher()),
        );
        // Observers for the relay outputs; they own the D_POUT_ON/D_BPOUT_ON counters.
        b.include(d_pout_on_check);
        b.include(d_bpout_on_check);
    });
    // The application runs in the background until `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let shutdown_signal = Arc::clone(&shutdown);
    let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
    let payload = serde_json::to_vec(&Order { id: 1 }).unwrap();
    // `topics[i]` is the input whose dispatch is observed through `counters[i]`;
    // keep the two arrays index-aligned.
    let topics = ["d-plain-on", "d-batch-on", "d-pin-on", "d-bpin-on"];
    let counters: [&AtomicUsize; 4] = [&D_PLAIN_ON, &D_BATCH_ON, &D_POUT_ON, &D_BPOUT_ON];
    let outcome = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            for topic in topics {
                // Publish results are deliberately discarded; every round republishes.
                // NOTE(review): presumably early publishes can fail or be dropped
                // while the app is still starting up — confirm.
                let _ = driver.publish(OutgoingMessage::new(topic, &payload)).await;
            }
            // `handler_signal` lives in `common`; presumably it waits (bounded) for a
            // handler to signal `D_NOTIFY` — confirm there.
            handler_signal(&D_NOTIFY).await;
            if counters.iter().all(|c| c.load(Ordering::SeqCst) >= 1) {
                break;
            }
        }
    })
    .await;
    // Name the inputs whose counter never moved, so that a failure points at the
    // broken variant instead of only saying that one of the four is broken.
    let missing: Vec<&str> = topics
        .iter()
        .zip(counters.iter())
        .filter(|(_, counter)| counter.load(Ordering::SeqCst) == 0)
        .map(|(topic, _)| *topic)
        .collect();
    assert!(
        outcome.is_ok(),
        "a default-codec include_on variant never dispatched; input topics without an observed dispatch: {missing:?}"
    );
    // Complete the future given to `run_until`, then surface any failure of the task.
    shutdown.notify_one();
    run.await.unwrap().unwrap();
}