use std::sync::atomic::{AtomicU64, Ordering};
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{
AppInfo, Context, Handler, HandlerResult, Identity, Layer, RustStream, Settle, Stack,
};
use ruststream::subscriber;
use serde::Deserialize;
/// Message payload accepted by the `handle` subscriber.
#[derive(Debug, Deserialize)]
struct Order {
    /// Order identifier; `handle` compares it against `0` when deciding whether to drop.
    id: u64,
}
/// Application-wide settings.
///
/// `app` places one instance into the application state with `insert_state`, and
/// `handle` reads it back through `ctx.state().get::<AppConfig>()`.
#[derive(Debug)]
struct AppConfig {
    /// When `true`, `handle` settles orders whose `id` is `0` with `HandlerResult::drop()`.
    reject_zero_ids: bool,
}
/// Handles a single [`Order`]; registered through `#[subscriber("orders")]`.
///
/// Steps, in order:
/// 1. Prints `ctx.name()` and, when the header is present, the `x-request-id` value
///    (lossily decoded as UTF-8).
/// 2. Reads [`AppConfig`] from `ctx.state()`. If `reject_zero_ids` is set and the order id
///    is `0`, returns `HandlerResult::drop()` without doing anything else.
/// 3. Otherwise registers a future with `ctx.after_ack`, stores the order id in the
///    context, prints it back if it can be read, and returns `HandlerResult::Ack`.
///
/// # Panics
///
/// Panics when no [`AppConfig`] is found in the state.
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    println!("received on {}", ctx.name());

    if let Some(request_id) = ctx.headers().get("x-request-id") {
        println!("request {}", String::from_utf8_lossy(request_id));
    }

    // Copy the flag out so the borrow of the state ends before `ctx` is used mutably below.
    let reject_zero_ids = ctx
        .state()
        .get::<AppConfig>()
        .expect("config inserted at build time")
        .reject_zero_ids;

    let id = order.id;
    if reject_zero_ids && id == 0 {
        return HandlerResult::drop();
    }

    // `id` is `Copy`, so the future gets its own value and `id` stays usable afterwards.
    // NOTE(review): judging by the method name, the future presumably runs once the
    // message has been acknowledged — confirm against the runtime docs.
    ctx.after_ack(async move {
        println!("order {id} acked; sending the confirmation");
    });

    // Store the id in the context, then read it back by type.
    ctx.insert(id);
    if let Some(seen) = ctx.get::<u64>() {
        println!("processing order {seen}");
    }

    HandlerResult::Ack
}
/// Layer marker type: its `Layer` impl wraps an inner handler in [`WithRequestId`].
#[derive(Clone)]
struct RequestId;
/// Handler wrapper built by [`RequestId`]'s `Layer` impl; field `0` holds the wrapped inner handler.
struct WithRequestId<H>(H);
/// Lets [`RequestId`] act as a layer over any inner handler type `H`.
impl<H> Layer<H> for RequestId {
    type Handler = WithRequestId<H>;

    /// Wraps `inner` in a [`WithRequestId`]; `self` carries no state and is not consulted.
    fn layer(&self, inner: H) -> Self::Handler {
        let wrapped = WithRequestId(inner);
        wrapped
    }
}
/// Counter, starting at 1, that numbers generated `req-N` request ids; it is advanced atomically with `fetch_add`.
static NEXT_REQUEST: AtomicU64 = AtomicU64::new(1);
/// Ensures an `x-request-id` header exists before the wrapped handler runs.
impl<M, H> Handler<M> for WithRequestId<H>
where
    M: Send + Sync,
    H: Handler<M>,
{
    /// Adds a generated `req-N` value under `x-request-id` when the header is absent, then
    /// delegates to the inner handler and returns its result unchanged.
    ///
    /// An `x-request-id` header that is already present is left untouched.
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
        let already_tagged = ctx.headers().get("x-request-id").is_some();

        if !already_tagged {
            // The counter is only taken when an id is actually generated.
            let sequence = NEXT_REQUEST.fetch_add(1, Ordering::Relaxed);
            let generated = format!("req-{}", sequence);
            ctx.headers_mut().insert("x-request-id", generated.into_bytes());
        }

        self.0.handle(msg, ctx).await
    }
}
/// Builds the application, annotated with `#[ruststream::app]`.
///
/// Construction order:
/// 1. creates the app with name `"context"` and version `"0.1.0"`,
/// 2. inserts an [`AppConfig`] with `reject_zero_ids` enabled into the state,
/// 3. adds the [`RequestId`] layer,
/// 4. attaches a `MemoryBroker` and includes the `handle` subscriber on it.
#[ruststream::app]
fn app() -> RustStream<Stack<RequestId, Identity>> {
    let info = AppInfo::new("context", "0.1.0");
    let config = AppConfig {
        reject_zero_ids: true,
    };

    RustStream::new(info)
        .insert_state(config)
        .layer(RequestId)
        .with_broker(MemoryBroker::new(), |broker| broker.include(handle))
}