use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream, Settle};
use ruststream::subscriber;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct Order {
id: u64,
}
#[subscriber("orders")]
async fn handle(order: &Order) -> Settle {
let id = order.id;
HandlerResult::ack().and_after(async move {
println!("order {id} acked; notifying downstream");
})
}
#[subscriber(batch("orders"))]
async fn handle_page(orders: &[Order]) -> Vec<Settle> {
orders
.iter()
.map(|order| {
if order.id == 0 {
HandlerResult::retry().into()
} else {
let id = order.id;
HandlerResult::ack().and_after(async move {
println!("order {id} acked in batch; following up");
})
}
})
.collect()
}
#[ruststream::app]
fn app() -> RustStream {
RustStream::new(AppInfo::new("post_settle", "0.1.0")).with_broker(MemoryBroker::new(), |b| {
b.include(handle);
b.include_batch(handle_page);
})
}