use ruststream::memory::MemoryBroker;
use ruststream::runtime::{
AppInfo, BlanketLayer, Context, Handler, HandlerResult, Identity, Layer, Router, RustStream,
Stack,
};
use ruststream::subscriber;
use serde::Deserialize;
/// Message payload shared by every subscriber in this file.
///
/// Deserialized through serde's `Deserialize` derive; the only field the
/// handlers read is the numeric order id.
#[derive(Debug, Deserialize)]
struct Order {
    /// Identifier printed by each handler.
    id: u64,
}
/// Handler registered with `#[subscriber("orders")]`.
///
/// Prints the id of the received [`Order`] to stdout and unconditionally
/// returns [`HandlerResult::Ack`].
// NOTE(review): "orders" is presumably the subject/channel name the broker
// routes on — confirm against the ruststream documentation.
#[subscriber("orders")]
async fn orders(order: &Order) -> HandlerResult {
    let order_id = order.id;
    println!("got order {}", order_id);
    HandlerResult::Ack
}
/// Handler registered with `#[subscriber("shipments")]`.
///
/// The payload is an [`Order`]; its id is printed to stdout and the message
/// is always acknowledged with [`HandlerResult::Ack`].
#[subscriber("shipments")]
async fn shipments(order: &Order) -> HandlerResult {
    let order_id = order.id;
    println!("got shipment for order {}", order_id);
    HandlerResult::Ack
}
/// Handler registered with `#[subscriber("audit")]`.
///
/// Prints an audit line containing the [`Order`] id to stdout and always
/// returns [`HandlerResult::Ack`].
#[subscriber("audit")]
async fn audit(order: &Order) -> HandlerResult {
    let order_id = order.id;
    println!("audited order {}", order_id);
    HandlerResult::Ack
}
/// Stateless logging layer.
///
/// Carries no data of its own; the `Layer` and `BlanketLayer` impls in this
/// file both turn a handler into a [`Logged`] wrapper around it.
#[derive(Clone)]
struct LogLayer;
/// Newtype around a handler `H`.
///
/// Its `Handler` impl prints a line to stdout and then delegates to the
/// wrapped handler stored in field `0`.
struct Logged<H>(H);
/// [`Layer`] implementation: any handler type `H` is wrapped in [`Logged`].
impl<H> Layer<H> for LogLayer {
    /// The handler type produced by this layer.
    type Handler = Logged<H>;

    /// Wraps `inner` in a [`Logged`] newtype; `self` holds no state and is
    /// not consulted.
    fn layer(&self, inner: H) -> Self::Handler {
        Logged(inner)
    }
}
/// [`Handler`] implementation for the logging wrapper: log first, then
/// delegate to the wrapped handler.
impl<M, H> Handler<M> for Logged<H>
where
    M: Send + Sync,
    H: Handler<M>,
{
    /// Prints `app layer -> ` followed by `ctx.name()` to stdout, then
    /// forwards `msg` and `ctx` to the inner handler and returns its result
    /// unchanged.
    // NOTE(review): what `Context::name` identifies (handler, subject, app)
    // is not visible here — confirm against the ruststream documentation.
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
        let Self(inner) = self;
        println!("app layer -> {}", ctx.name());
        inner.handle(msg, ctx).await
    }
}
impl BlanketLayer for LogLayer {
fn apply<M, H>(&self, handler: H) -> impl Handler<M> + 'static
where
M: Send + Sync + 'static,
H: Handler<M> + 'static,
{
Logged(handler)
}
}
/// Builds the application returned to the `#[ruststream::app]` macro.
///
/// Steps, in order:
/// 1. create the `RustStream` from `AppInfo::new("app-scope", "0.1.0")`;
/// 2. add [`LogLayer`] on the application itself;
/// 3. attach a [`MemoryBroker`] and register `orders` and `shipments`
///    directly on it, plus `audit` through a nested [`Router`].
// NOTE(review): the two `AppInfo::new` arguments look like an application
// name and version — confirm against the ruststream documentation.
#[ruststream::app]
fn app() -> RustStream<Stack<LogLayer, Identity>> {
    let info = AppInfo::new("app-scope", "0.1.0");
    RustStream::new(info)
        .layer(LogLayer)
        .with_broker(MemoryBroker::new(), |broker| {
            broker.include(orders);
            broker.include(shipments);
            broker.include_router(Router::new().include(audit));
        })
}