#![cfg(all(feature = "macros", feature = "cbor", feature = "msgpack"))]
mod common;
use std::{
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use common::handler_signal;
use ruststream::codec::{CborCodec, Codec, MsgpackCodec};
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, Router, RustStream};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
#[derive(Debug, Serialize, Deserialize)]
struct Order {
id: u32,
}
static CBOR_SEEN: AtomicUsize = AtomicUsize::new(0);
static MSGPACK_SEEN: AtomicUsize = AtomicUsize::new(0);
static NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber("orders-cbor")]
async fn cbor_order(order: &Order) -> HandlerResult {
assert_eq!(order.id, 7);
CBOR_SEEN.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
#[subscriber("orders-msgpack")]
async fn msgpack_order(order: &Order) -> HandlerResult {
assert_eq!(order.id, 7);
MSGPACK_SEEN.fetch_add(1, Ordering::SeqCst);
NOTIFY.notify_one();
HandlerResult::Ack
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn non_default_codecs_dispatch_through_the_router() {
let broker = MemoryBroker::new();
let publisher = broker.publisher();
let cbor_router = Router::<MemoryBroker>::new()
.with_codec(CborCodec)
.include(cbor_order);
let msgpack_router = Router::<MemoryBroker>::new()
.with_codec(MsgpackCodec)
.include(msgpack_order);
let app = RustStream::new(AppInfo::new("codecs", "0.1.0")).with_broker(broker, |b| {
b.include_router(cbor_router);
b.include_router(msgpack_router);
});
let shutdown = Arc::new(Notify::new());
let shutdown_signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
let cbor_bytes = CborCodec.encode(&Order { id: 7 }).unwrap();
let msgpack_bytes = MsgpackCodec.encode(&Order { id: 7 }).unwrap();
let driven = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let _ = publisher
.publish(OutgoingMessage::new("orders-cbor", &cbor_bytes))
.await;
let _ = publisher
.publish(OutgoingMessage::new("orders-msgpack", &msgpack_bytes))
.await;
handler_signal(&NOTIFY).await;
if CBOR_SEEN.load(Ordering::SeqCst) >= 1 && MSGPACK_SEEN.load(Ordering::SeqCst) >= 1 {
break;
}
}
})
.await;
assert!(
driven.is_ok(),
"a non-default codec handler never dispatched"
);
shutdown.notify_one();
run.await.unwrap().unwrap();
}