use std::time::Duration;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use serde::Deserialize;
/// A payment message as consumed by the handlers in this file.
#[derive(Debug, Deserialize)]
struct Payment {
    /// Identifier of the payment; printed once the payment is settled.
    id: u64,
    /// Whether the payment has settled. Handlers acknowledge settled
    /// payments and request a retry for the rest.
    settled: bool,
}
/// Handles a single payment from the `"payments"` subscription.
///
/// A settled payment is logged to stdout and acknowledged with
/// `HandlerResult::Ack`. An unsettled payment yields
/// `HandlerResult::retry_after` with a 5-second duration.
#[subscriber("payments")]
async fn reconcile(payment: &Payment) -> HandlerResult {
    if payment.settled {
        println!("payment {} settled", payment.id);
        HandlerResult::Ack
    } else {
        // Not settled yet: hand back a retry request instead of acking.
        HandlerResult::retry_after(Duration::from_secs(5))
    }
}
/// Handles a batch of payments from the `"payments"` subscription.
///
/// Returns one `HandlerResult` per input payment, in input order:
/// `HandlerResult::Ack` for settled payments and
/// `HandlerResult::retry_after` with a 30-second duration otherwise.
#[subscriber(batch("payments"))]
async fn reconcile_page(payments: &[Payment]) -> Vec<HandlerResult> {
    let mut outcomes = Vec::with_capacity(payments.len());
    for entry in payments {
        let outcome = if entry.settled {
            HandlerResult::Ack
        } else {
            HandlerResult::retry_after(Duration::from_secs(30))
        };
        outcomes.push(outcome);
    }
    outcomes
}
/// Builds the application: named `"retry"`, version `"0.1.0"`, attached to a
/// `MemoryBroker` with both payment handlers registered.
#[ruststream::app]
fn app() -> RustStream {
    let info = AppInfo::new("retry", "0.1.0");
    RustStream::new(info).with_broker(MemoryBroker::new(), |broker| {
        // Single-message handler first, then the batch handler.
        broker.include(reconcile);
        broker.include_batch(reconcile_page);
    })
}