use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use ruststream::memory::{MemoryBroker, MemoryMessage};
use ruststream::runtime::{
AppInfo, Context, DynMiddleware, DynStack, Handler, HandlerResult, Identity, Layer, Next,
RustStream, Settle, Stack,
};
use ruststream::subscriber;
use serde::Deserialize;
/// Payload type accepted by the `handle` and `returns` subscribers.
///
/// Derives `Deserialize`; presumably the framework decodes incoming messages
/// into this type before invoking a subscriber — TODO confirm against the
/// framework documentation.
#[derive(Debug, Deserialize)]
struct Order {
/// Numeric order identifier; the subscribers print it.
id: u64,
}
/// Subscriber registered with `#[subscriber("orders")]`.
///
/// Prints `got order <id>` for the received [`Order`] and always returns
/// `HandlerResult::Ack`.
#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    let id = order.id;
    println!("got order {id}");
    HandlerResult::Ack
}
/// Subscriber registered with `#[subscriber("returns")]`.
///
/// Prints `got return for order <id>` for the received [`Order`] and always
/// returns `HandlerResult::Ack`.
#[subscriber("returns")]
async fn returns(order: &Order) -> HandlerResult {
    let id = order.id;
    println!("got return for order {id}");
    HandlerResult::Ack
}
/// Stateless layer whose `Layer` impl wraps a handler in [`Logged`].
#[derive(Clone)]
struct LogLayer;
/// Handler wrapper produced by [`LogLayer`].
///
/// Its `Handler` impl prints the context name before and after delegating to
/// the wrapped handler stored in field `0`.
struct Logged<H>(H);
/// Lets [`LogLayer`] wrap any handler type `H`, producing a `Logged<H>`.
impl<H> Layer<H> for LogLayer {
    type Handler = Logged<H>;

    /// Wraps `inner` in [`Logged`]; the layer itself carries no state.
    fn layer(&self, inner: H) -> Self::Handler {
        Logged(inner)
    }
}
/// Logging decorator: forwards every message to the wrapped handler and
/// prints the context name on the way in and on the way out.
impl<M, H> Handler<M> for Logged<H>
where
    M: Send + Sync,
    H: Handler<M>,
{
    /// Prints `-> <name>`, awaits the wrapped handler, prints `<- <name>`,
    /// and returns the wrapped handler's `Settle` unchanged.
    ///
    /// `ctx.name()` is read again after the inner call rather than cached.
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
        let wrapped = &self.0;
        println!("-> {}", ctx.name());
        let outcome = wrapped.handle(msg, ctx).await;
        println!("<- {}", ctx.name());
        outcome
    }
}
/// Middleware state for audit logging; its `DynMiddleware` impl prints one
/// line per message before passing the message on.
struct Audit {
/// Label printed in square brackets at the start of each audit line.
service: String,
}
/// Audit middleware, usable with any `Send + Sync` input type.
impl<I> DynMiddleware<I> for Audit
where
    I: Send + Sync,
{
    /// Prints `[<service>] handling <name>` and then passes `input` and `ctx`
    /// to `next`, returning whatever `next.run` settles to.
    ///
    /// The async block is boxed and pinned because the signature returns a
    /// `dyn Future` trait object.
    fn handle<'a>(
        &'a self,
        input: &'a I,
        ctx: &'a mut Context<'_>,
        next: Next<'a, I>,
    ) -> Pin<Box<dyn Future<Output = Settle> + Send + 'a>> {
        let Self { service } = self;
        let audited = async move {
            println!("[{service}] handling {}", ctx.name());
            next.run(input, ctx).await
        };
        Box::pin(audited)
    }
}
/// Assembles the application.
///
/// Adds [`LogLayer`] first and a `DynStack` of runtime-selected middleware
/// second, then attaches a `MemoryBroker` that includes the `handle` and
/// `returns` subscribers.
///
/// The dynamic stack contains an [`Audit`] middleware only when
/// `std::env::var("AUDIT")` succeeds, i.e. the `AUDIT` environment variable
/// is set to a valid Unicode value; otherwise the stack is empty.
#[ruststream::app]
fn app() -> RustStream<Stack<DynStack<MemoryMessage>, Stack<LogLayer, Identity>>> {
    let audit_requested = std::env::var("AUDIT").is_ok();
    let info = AppInfo::new("middleware", "0.1.0");

    // Decided at runtime, hence the type-erased `Arc<dyn DynMiddleware<_>>` elements.
    let mut dynamic: Vec<Arc<dyn DynMiddleware<MemoryMessage>>> = Vec::new();
    if audit_requested {
        let audit = Audit {
            service: String::from("orders"),
        };
        dynamic.push(Arc::new(audit));
    }
    let stack = DynStack::new(dynamic);

    let layered = RustStream::new(info).layer(LogLayer).layer(stack);
    layered.with_broker(MemoryBroker::new(), |broker| {
        broker.include(handle);
        broker.include(returns);
    })
}