#![cfg(all(
feature = "testing",
feature = "macros",
feature = "memory",
feature = "json"
))]
use ruststream::subscriber;
use serde::{Deserialize, Serialize};
/// Incoming message payload used by this test on the `"orders"` topic.
#[derive(Debug, PartialEq, Deserialize, Serialize)]
struct Order {
    /// Identifier copied verbatim into the resulting `Confirmation`.
    id: u64,
    /// Number of units requested; `confirm` accepts the order when this is non-zero.
    quantity: u32,
}
/// Outgoing message payload that this test expects on the `"confirmations"` topic.
#[derive(Debug, PartialEq, Deserialize, Serialize)]
struct Confirmation {
    /// Identifier of the order this confirmation refers to.
    id: u64,
    /// `true` when the order was accepted by the handler.
    accepted: bool,
}
// Handler under test: registered for "orders" with "confirmations" as its
// publish target (see the attribute below). It echoes the order id and marks
// the order as accepted only when at least one unit was requested.
#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Confirmation {
    // `quantity` is unsigned, so "non-zero" and "greater than zero" coincide.
    let accepted = order.quantity != 0;
    Confirmation {
        id: order.id,
        accepted,
    }
}
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream, TypedPublisher};
use ruststream::testing::TestApp;
// End-to-end check of `confirm` against the in-memory broker: publish one
// order, then verify that the subscriber saw it exactly once and settled with
// `Ack`, and that exactly one matching confirmation was published.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn confirms_valid_orders() {
    // Fixtures: the message we send and the reply we expect back.
    let order = Order { id: 1, quantity: 2 };
    let expected = Confirmation {
        id: 1,
        accepted: true,
    };

    // Wire the handler to a typed publisher obtained from the same broker.
    let info = AppInfo::new("orders-test", "0.0.0");
    let app = RustStream::new(info).with_broker(MemoryBroker::new(), |builder| {
        let confirmations_out = TypedPublisher::new(builder.broker().publisher());
        builder.include_publishing(confirm, confirmations_out);
    });

    let tb = TestApp::start(app).await.expect("start harness");

    // Act: deliver a single order.
    tb.broker::<MemoryBroker>()
        .publish("orders", &order)
        .await
        .expect("publish");

    // Assert: the handler consumed exactly that order and acknowledged it.
    tb.broker::<MemoryBroker>()
        .subscriber("orders")
        .assert_called_once()
        .with(&order)
        .settled(HandlerResult::Ack);

    // Assert: exactly one confirmation with the expected contents went out.
    tb.broker::<MemoryBroker>()
        .published::<Confirmation>("confirmations")
        .assert_called_once()
        .with(&expected);
}