#![cfg(all(feature = "macros", feature = "memory", feature = "json"))]
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, LazyLock, Mutex};
use std::time::Duration;
use ruststream::memory::{MemoryBroker, MemoryMessage};
use ruststream::runtime::{
AppInfo, Outgoing, PublishContext, PublishDynLayer, PublishDynNext, PublishDynStack,
PublishLayer, PublishNext, PublishPipeline, PublishTransform, RustStream, TypedPublisher,
for_batch,
};
use ruststream::{
BuildContext, Field, Headers, IncomingMessage, OutgoingMessage, Publisher, subscriber,
};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
#[derive(Serialize, Deserialize)]
struct Req {
n: u32,
}
#[derive(Serialize, Deserialize)]
struct Resp {
n: u32,
}
#[derive(Default)]
struct TraceCtx {
correlation: Option<String>,
}
impl BuildContext<MemoryMessage> for TraceCtx {
fn build(msg: &MemoryMessage) -> Self {
Self {
correlation: msg.headers().correlation_id().map(str::to_owned),
}
}
}
#[derive(Clone, Copy)]
struct Correlation;
impl Field<TraceCtx> for Correlation {
type Value<'a> = Option<&'a str>;
fn get(self, c: &TraceCtx) -> Option<&str> {
c.correlation.as_deref()
}
}
struct PropagateCorrelation;
impl PublishTransform<TraceCtx> for PropagateCorrelation {
fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, TraceCtx>) {
if let Some(id) = cx.context(Correlation) {
out.headers_mut()
.insert("correlation-id", id.as_bytes().to_vec());
}
}
}
#[subscriber("in", publish("out"))]
async fn echo(req: &Req, _ctx: &mut Context<'_, TraceCtx>) -> Resp {
Resp { n: req.n }
}
static CAPTURED: LazyLock<Mutex<Option<String>>> = LazyLock::new(|| Mutex::new(None));
static GOT: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber("out")]
async fn capture(_resp: &Resp, ctx: &mut Context<'_>) {
*CAPTURED.lock().expect("poisoned") = ctx.headers().correlation_id().map(str::to_owned);
GOT.notify_one();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn delivery_context_propagates_to_the_reply() {
let ingress = MemoryBroker::new();
let egress = MemoryBroker::new();
let ingress_pub = ingress.publisher();
let egress_pub = TypedPublisher::new(egress.publisher()).transform(PropagateCorrelation);
let app = RustStream::new(AppInfo::new("svc", "0.1.0"))
.with_broker(ingress, |b| {
b.include_publishing(echo, egress_pub);
})
.with_broker(egress, |b| {
b.include(capture);
});
let shutdown = Arc::new(Notify::new());
let signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { signal.notified().await }));
let payload = serde_json::to_vec(&Req { n: 7 }).expect("encode");
let captured = tokio::time::timeout(Duration::from_secs(2), async {
let notified = GOT.notified();
tokio::pin!(notified);
loop {
let mut headers = Headers::new();
headers.insert("correlation-id", "trace-abc");
ingress_pub
.publish(OutgoingMessage::new("in", &payload).with_headers(headers))
.await
.expect("publish");
tokio::select! {
() = &mut notified => break,
() = tokio::time::sleep(Duration::from_millis(10)) => {}
}
}
})
.await;
assert!(captured.is_ok(), "reply never captured");
assert_eq!(
CAPTURED.lock().expect("poisoned").as_deref(),
Some("trace-abc"),
"the reply should carry the delivery's correlation id, stamped by the publish layer"
);
shutdown.notify_one();
run.await.expect("join").expect("run");
}
struct MarkBatched;
impl<C> PublishTransform<C> for MarkBatched {
fn apply(&self, out: &mut Outgoing<'_>, _cx: &PublishContext<'_, C>) {
out.headers_mut().insert("x-batched", b"1".to_vec());
}
}
#[subscriber(batch("batch-in"), publish("batch-out"))]
async fn batch_echo(reqs: &[Req]) -> Vec<Resp> {
reqs.iter().map(|r| Resp { n: r.n }).collect()
}
static BATCHED: LazyLock<Mutex<Option<bool>>> = LazyLock::new(|| Mutex::new(None));
static BATCH_GOT: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber("batch-out")]
async fn batch_capture(_resp: &Resp, ctx: &mut Context<'_>) {
*BATCHED.lock().expect("poisoned") = Some(ctx.headers().get("x-batched").is_some());
BATCH_GOT.notify_one();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_layer_runs_only_on_batched_replies() {
let broker = MemoryBroker::new();
let ingress_pub = broker.publisher();
let reply_pub = TypedPublisher::new(broker.publisher()).batch_transform(for_batch(MarkBatched));
let app = RustStream::new(AppInfo::new("svc", "0.1.0")).with_broker(broker, |b| {
b.include_batch_publishing(batch_echo, reply_pub);
b.include(batch_capture);
});
let shutdown = Arc::new(Notify::new());
let signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { signal.notified().await }));
let payload = serde_json::to_vec(&Req { n: 1 }).expect("encode");
let captured = tokio::time::timeout(Duration::from_secs(2), async {
let notified = BATCH_GOT.notified();
tokio::pin!(notified);
loop {
ingress_pub
.publish(OutgoingMessage::new("batch-in", &payload))
.await
.expect("publish");
tokio::select! {
() = &mut notified => break,
() = tokio::time::sleep(Duration::from_millis(10)) => {}
}
}
})
.await;
assert!(captured.is_ok(), "batched reply never captured");
assert_eq!(
*BATCHED.lock().expect("poisoned"),
Some(true),
"the batch layer should stamp every batched reply"
);
shutdown.notify_one();
run.await.expect("join").expect("run");
}
struct StampDyn;
impl PublishDynLayer for StampDyn {
fn on_publish<'a>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishDynNext<'a>,
) -> Pin<
Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
> {
Box::pin(async move {
out.headers_mut().insert("x-dyn", b"1".to_vec());
next.run(out).await
})
}
}
#[subscriber("dyn-in", publish("dyn-out"))]
async fn dyn_echo(req: &Req) -> Resp {
Resp { n: req.n }
}
static DYN_SEEN: LazyLock<Mutex<Option<bool>>> = LazyLock::new(|| Mutex::new(None));
static DYN_GOT: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber("dyn-out")]
async fn dyn_capture(_resp: &Resp, ctx: &mut Context<'_>) {
*DYN_SEEN.lock().expect("poisoned") = Some(ctx.headers().get("x-dyn").is_some());
DYN_GOT.notify_one();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dyn_stack_runs_a_runtime_built_middleware() {
let broker = MemoryBroker::new();
let ingress_pub = broker.publisher();
let stack = PublishDynStack::new([Arc::new(StampDyn) as Arc<dyn PublishDynLayer>]);
let app = RustStream::new(AppInfo::new("svc", "0.1.0"))
.publish_layer(stack)
.with_broker(broker, |b| {
let reply_pub = TypedPublisher::new(b.broker().publisher());
b.include_publishing(dyn_echo, reply_pub);
b.include(dyn_capture);
});
let shutdown = Arc::new(Notify::new());
let signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { signal.notified().await }));
let payload = serde_json::to_vec(&Req { n: 3 }).expect("encode");
let captured = tokio::time::timeout(Duration::from_secs(2), async {
let notified = DYN_GOT.notified();
tokio::pin!(notified);
loop {
ingress_pub
.publish(OutgoingMessage::new("dyn-in", &payload))
.await
.expect("publish");
tokio::select! {
() = &mut notified => break,
() = tokio::time::sleep(Duration::from_millis(10)) => {}
}
}
})
.await;
assert!(captured.is_ok(), "reply never captured");
assert_eq!(
*DYN_SEEN.lock().expect("poisoned"),
Some(true),
"the dynamic stack middleware should run and stamp the reply"
);
shutdown.notify_one();
run.await.expect("join").expect("run");
}
type PubFut<'a> =
Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>>;
fn append_order(out: &mut Outgoing<'_>, letter: &str) {
let mut order = out
.headers()
.get_str("order")
.unwrap_or_default()
.to_owned();
order.push_str(letter);
out.headers_mut().insert("order", order.into_bytes());
}
#[derive(Clone)]
struct AppendA;
impl PublishLayer for AppendA {
fn on_publish<'a, N: PublishPipeline>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishNext<'a, N>,
) -> PubFut<'a> {
append_order(out, "A");
next.run(out)
}
}
#[derive(Clone)]
struct AppendB;
impl PublishLayer for AppendB {
fn on_publish<'a, N: PublishPipeline>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishNext<'a, N>,
) -> PubFut<'a> {
append_order(out, "B");
next.run(out)
}
}
#[subscriber("ord-in", publish("ord-out"))]
async fn ord_echo(req: &Req) -> Resp {
Resp { n: req.n }
}
static ORDER: LazyLock<Mutex<Option<String>>> = LazyLock::new(|| Mutex::new(None));
static ORDER_GOT: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber("ord-out")]
async fn ord_capture(_resp: &Resp, ctx: &mut Context<'_>) {
*ORDER.lock().expect("poisoned") = ctx.headers().get_str("order").map(str::to_owned);
ORDER_GOT.notify_one();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_layer_last_added_runs_outermost() {
let broker = MemoryBroker::new();
let ingress_pub = broker.publisher();
let app = RustStream::new(AppInfo::new("svc", "0.1.0"))
.publish_layer(AppendA)
.publish_layer(AppendB)
.with_broker(broker, |b| {
b.include_publishing(ord_echo, TypedPublisher::new(b.broker().publisher()));
b.include(ord_capture);
});
let shutdown = Arc::new(Notify::new());
let signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { signal.notified().await }));
let payload = serde_json::to_vec(&Req { n: 1 }).expect("encode");
let got = tokio::time::timeout(Duration::from_secs(2), async {
let notified = ORDER_GOT.notified();
tokio::pin!(notified);
loop {
ingress_pub
.publish(OutgoingMessage::new("ord-in", &payload))
.await
.expect("publish");
tokio::select! {
() = &mut notified => break,
() = tokio::time::sleep(Duration::from_millis(10)) => {}
}
}
})
.await;
assert!(got.is_ok(), "reply never captured");
assert_eq!(
ORDER.lock().expect("poisoned").as_deref(),
Some("BA"),
"the last publish_layer added must run outermost"
);
shutdown.notify_one();
run.await.expect("join").expect("run");
}