use tokio::sync::broadcast;
use aa_runtime::approval::ApprovalRequest;
use super::publisher;
use super::webhook::WebhookTarget;
use crate::budget::BudgetAlert;
pub async fn webhook_delivery_loop(
target: WebhookTarget,
mut approval_rx: broadcast::Receiver<ApprovalRequest>,
mut budget_rx: broadcast::Receiver<BudgetAlert>,
) {
tracing::info!(url = %target.url(), "webhook delivery loop started");
loop {
tokio::select! {
result = approval_rx.recv() => {
match result {
Ok(request) => {
let envelope = publisher::approval_to_envelope(&request);
if let Err(e) = target.deliver(&envelope).await {
tracing::warn!(
error = %e,
request_id = %request.request_id,
"failed to deliver approval webhook"
);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
dropped = n,
"approval event receiver lagged, some events were not delivered"
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::info!("approval broadcast channel closed");
break;
}
}
}
result = budget_rx.recv() => {
match result {
Ok(alert) => {
let envelope = publisher::budget_alert_to_envelope(&alert);
if let Err(e) = target.deliver(&envelope).await {
tracing::warn!(
error = %e,
threshold_pct = alert.threshold_pct,
"failed to deliver budget alert webhook"
);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
dropped = n,
"budget alert receiver lagged, some events were not delivered"
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::info!("budget alert broadcast channel closed");
break;
}
}
}
}
}
tracing::info!("webhook delivery loop stopped");
}