use ruststream::memory::MemorySource;
use ruststream::runtime::HandlerResult;
use ruststream::subscriber;
use crate::domain::{Cancellation, Confirmation, Order, Repository};
#[subscriber(MemorySource::new("orders"), publish("confirmations"))]
pub(crate) async fn confirm(
order: &Order,
ctx: &mut Context<'_>,
) -> Result<Confirmation, HandlerResult> {
let repo = ctx
.state()
.get::<Repository>()
.expect("repository set in on_startup");
tracing::debug!(
order = order.id,
customer = %order.customer,
item = %order.item,
"confirming order"
);
match repo.record_order(order.id).await {
Ok(()) => Ok(Confirmation {
order_id: order.id,
accepted: order.quantity > 0,
}),
Err(e) if e.is_transient() => {
tracing::warn!(order = order.id, "store busy, asking for redelivery");
Err(HandlerResult::retry())
}
Err(e) => {
tracing::error!(order = order.id, error = %e, "dropping order");
Err(HandlerResult::drop())
}
}
}
#[subscriber("cancellations")]
pub(crate) async fn on_cancel(cancel: &Cancellation, ctx: &mut Context<'_>) -> HandlerResult {
let repo = ctx
.state()
.get::<Repository>()
.expect("repository set in on_startup");
match repo.cancel(cancel.order_id).await {
Err(e) if e.is_transient() => HandlerResult::retry(),
_ => HandlerResult::Ack,
}
}