use std::time::Duration;
use ruststream::codec::JsonCodec;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, Context, HandlerMetadata, HandlerResult, RustStream, typed};
use ruststream::subscriber;
use ruststream::{Buffered, Name};
use serde::Deserialize;
/// Payload type received by every subscriber in this file.
///
/// Derives `Deserialize`; the manual subscription in `app` pairs it with
/// `JsonCodec`.
#[derive(Debug, Deserialize)]
struct Order {
/// Identifier printed by the handlers; `reconcile` requests a retry when it is `0`.
id: u64,
}
/// Subscriber declared on `"orders"` that takes a single [`Order`].
///
/// Prints the order id to stdout and returns [`HandlerResult::Ack`].
#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    let order_id = order.id;
    println!("got order {}", order_id);
    HandlerResult::Ack
}
/// Subscriber declared on `"orders"` that also takes a mutable [`Context`].
///
/// If the context's headers yield a correlation id, it is printed next to the
/// order id. The handler returns [`HandlerResult::Ack`] whether or not a
/// correlation id was present.
#[subscriber("orders")]
async fn with_context(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    let order_id = order.id;

    // The lookup stays in the `if let` scrutinee so that any temporary created
    // by `headers()` remains alive for the whole branch.
    if let Some(id) = ctx.headers().correlation_id() {
        println!("order {} correlates to {id}", order_id);
    }

    HandlerResult::Ack
}
/// Batch subscriber declared on `"orders"`: takes a slice of [`Order`]s.
///
/// Prints how many orders the slice holds and returns a single
/// [`HandlerResult::Ack`] for the call.
#[subscriber(batch("orders"))]
async fn settle(orders: &[Order]) -> HandlerResult {
    let batch_len = orders.len();
    println!("settling {} orders", batch_len);
    HandlerResult::Ack
}
/// Subscriber declared on `"orders"` with the `workers(16)` option.
///
/// Prints the order id and returns [`HandlerResult::Ack`].
// NOTE(review): `workers(16)` presumably runs this handler on 16 concurrent
// workers — confirm against the framework's `subscriber` documentation.
#[subscriber("orders", workers(16))]
async fn fan_out(order: &Order) -> HandlerResult {
    let order_id = order.id;
    println!("processing order {}", order_id);
    HandlerResult::Ack
}
/// Subscriber declared on `"orders"` with the `workers(16, by_key)` option.
///
/// Prints the order id and returns [`HandlerResult::Ack`].
// NOTE(review): `by_key` presumably routes messages sharing a key to the same
// worker so they are handled in order — confirm against the framework's
// `subscriber` documentation.
#[subscriber("orders", workers(16, by_key))]
async fn per_customer(order: &Order) -> HandlerResult {
    let order_id = order.id;
    println!("processing order {}", order_id);
    HandlerResult::Ack
}
/// Batch subscriber declared on `"orders"` that returns one [`HandlerResult`]
/// per order, in the same order as the input slice.
///
/// An order whose `id` is `0` maps to `HandlerResult::retry()`; every other
/// order maps to [`HandlerResult::Ack`].
#[subscriber(batch("orders"))]
async fn reconcile(orders: &[Order]) -> Vec<HandlerResult> {
    orders
        .iter()
        .map(|order| match order.id {
            0 => HandlerResult::retry(),
            _ => HandlerResult::Ack,
        })
        .collect()
}
/// Batch subscriber whose source is a `Buffered::<Name>` over `"orders"`,
/// configured with `max_size(128)` and `max_wait(Duration::from_millis(20))`.
///
/// Prints how many orders the slice holds and returns [`HandlerResult::Ack`].
// NOTE(review): the buffer presumably flushes once 128 messages are collected
// or 20 ms have elapsed, whichever comes first — confirm against the
// framework's `Buffered` documentation.
#[subscriber(batch(
    Buffered::<Name>::new(Name::new("orders"))
        .max_size(128)
        .max_wait(Duration::from_millis(20))
))]
async fn drain(orders: &[Order]) -> HandlerResult {
    let drained = orders.len();
    println!("draining {} orders", drained);
    HandlerResult::Ack
}
/// Builds the [`RustStream`] application for this example.
///
/// The app is created from `AppInfo::new("subscribers", "0.1.0")` and attached
/// to a [`MemoryBroker`]. Inside the broker closure every attribute-declared
/// handler in this file is registered, followed by one subscription built by
/// hand with `typed`.
#[ruststream::app]
fn app() -> RustStream {
    let info = AppInfo::new("subscribers", "0.1.0");

    RustStream::new(info).with_broker(MemoryBroker::new(), |registry| {
        // Handlers declared with `#[subscriber]`: single-message handlers go
        // through `include`, slice-taking handlers through `include_batch`.
        registry.include(handle);
        registry.include(with_context);
        registry.include_batch(settle);
        registry.include_batch(reconcile);
        registry.include_batch(drain);
        registry.include(fan_out);
        registry.include(per_customer);

        // Manual registration without the attribute macro: name, a typed
        // handler closure decoding with `JsonCodec`, and explicit metadata.
        registry.subscribe(
            Name::new("orders"),
            typed(JsonCodec, |_order: &Order, _ctx: &mut Context| async {
                HandlerResult::Ack
            }),
            HandlerMetadata::typed::<Order>("orders"),
        );
    })
}