use std::env;
use std::time::Duration;
use actix_web::web::Data;
use serde_json::Value;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::AppState;
use crate::data::events::post_event;
use crate::data::outbox::{
OutboxEvent, count_pending_outbox_events, count_poisoned_outbox_events,
lease_pending_outbox_events, mark_outbox_failed_attempt, mark_outbox_published,
};
use crate::data::webhooks::cleanup_webhook_state_ledger_older_than;
use crate::webhooks::{GatewayWebhookTrigger, spawn_gateway_webhook_dispatch};
// ---------------------------------------------------------------------------
// Relay worker tuning: each environment variable is listed next to the
// fallback that is used when the variable is unset or cannot be parsed.
// ---------------------------------------------------------------------------

/// Environment variable that switches the outbox relay worker on.
const ENV_OUTBOX_ENABLED: &str = "ATHENA_OUTBOX_ENABLED";

/// Environment variable holding the pause between relay iterations, in milliseconds.
const ENV_OUTBOX_POLL_MS: &str = "ATHENA_OUTBOX_POLL_MS";
/// Fallback pause between relay iterations, in milliseconds.
const DEFAULT_POLL_MS: u64 = 1000;

/// Environment variable holding the batch size passed to the lease query.
const ENV_OUTBOX_BATCH_SIZE: &str = "ATHENA_OUTBOX_BATCH_SIZE";
/// Fallback batch size passed to the lease query.
const DEFAULT_BATCH_SIZE: i32 = 50;

/// Environment variable holding the attempt limit passed to the lease and
/// counting queries.
const ENV_OUTBOX_MAX_ATTEMPTS: &str = "ATHENA_OUTBOX_MAX_ATTEMPTS";
/// Fallback attempt limit.
const DEFAULT_MAX_ATTEMPTS: i32 = 10;

/// Environment variable holding the webhook state ledger retention, in seconds.
/// It has no `DEFAULT_*` constant: the worker falls back to `0`, which skips
/// the cleanup step.
const ENV_WEBHOOK_LEDGER_RETENTION_SECONDS: &str = "ATHENA_WEBHOOK_LEDGER_RETENTION_SECONDS";

/// Environment variable holding the minimum spacing between webhook ledger
/// cleanup runs, in seconds.
const ENV_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS: &str =
    "ATHENA_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS";
/// Fallback spacing between webhook ledger cleanup runs, in seconds.
const DEFAULT_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS: u64 = 300;
/// Returns `true` when `raw` is an affirmative flag value (`1`, `true` or
/// `yes`), ignoring surrounding whitespace and ASCII letter case.
fn is_truthy_flag(raw: &str) -> bool {
    matches!(
        raw.trim().to_ascii_lowercase().as_str(),
        "1" | "true" | "yes"
    )
}

/// Reads the environment variable `name` and parses its trimmed value as `T`.
///
/// Returns `default` when the variable cannot be read. When it is present but
/// cannot be parsed, a warning is logged and `default` is returned, so a
/// typo in the deployment configuration does not go unnoticed.
fn env_parsed_or<T: std::str::FromStr>(name: &str, default: T) -> T {
    let Ok(raw) = env::var(name) else {
        return default;
    };
    match raw.trim().parse::<T>() {
        Ok(value) => value,
        Err(_) => {
            warn!(
                env_var = name,
                value = %raw,
                "Outbox relay worker: ignoring unparsable environment value; using default"
            );
            default
        }
    }
}

/// Spawns the background outbox relay task on the Tokio runtime.
///
/// The function returns without spawning anything when:
/// * `ATHENA_OUTBOX_ENABLED` is unset or not an affirmative value
///   (`1`, `true`, `yes`; case-insensitive),
/// * no logging client name is configured on `app_state`, or
/// * the pool registry has no pool for that logging client.
///
/// Otherwise the spawned task loops forever. Each iteration:
/// 1. leases a batch of pending outbox events and dispatches them one by one,
/// 2. emits pending / poisoned counters through `tracing`,
/// 3. runs the webhook state ledger cleanup when retention is enabled and the
///    cleanup interval has elapsed,
/// 4. sleeps for the poll interval.
///
/// Tunables are read once at startup from the environment and clamped to a
/// floor: poll interval >= 100 ms, batch size >= 1, max attempts >= 1,
/// ledger retention >= 0 s (0 disables cleanup), cleanup interval >= 30 s.
pub fn spawn_outbox_relay_worker(app_state: Data<AppState>) {
    let enabled: bool = env::var(ENV_OUTBOX_ENABLED)
        .map(|raw| is_truthy_flag(&raw))
        .unwrap_or(false);
    if !enabled {
        debug!("Outbox relay worker is disabled (set ATHENA_OUTBOX_ENABLED=true to enable)");
        return;
    }

    let poll_ms: u64 = env_parsed_or(ENV_OUTBOX_POLL_MS, DEFAULT_POLL_MS).max(100);
    let batch_size: i32 = env_parsed_or(ENV_OUTBOX_BATCH_SIZE, DEFAULT_BATCH_SIZE).max(1);
    let max_attempts: i32 = env_parsed_or(ENV_OUTBOX_MAX_ATTEMPTS, DEFAULT_MAX_ATTEMPTS).max(1);

    let Some(logging_client) = app_state.logging_client_name.clone() else {
        warn!(
            "Outbox relay worker enabled but no logging client configured; \
             skipping (set logging_client in config.yaml)"
        );
        return;
    };
    let Some(pool) = app_state.pg_registry.get_pool(&logging_client) else {
        warn!(
            client = %logging_client,
            "Outbox relay worker: logging pool unavailable at startup; skipping"
        );
        return;
    };
    let worker_state: Data<AppState> = app_state.clone();

    info!(
        poll_ms,
        batch_size, max_attempts, "Starting outbox relay worker"
    );

    // A retention of 0 (the fallback) disables the ledger cleanup entirely.
    let webhook_ledger_retention_seconds: i64 =
        env_parsed_or::<i64>(ENV_WEBHOOK_LEDGER_RETENTION_SECONDS, 0).max(0);
    let webhook_ledger_cleanup_every_seconds: u64 = env_parsed_or(
        ENV_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS,
        DEFAULT_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS,
    )
    .max(30);

    tokio::spawn(async move {
        let poll_interval: Duration = Duration::from_millis(poll_ms);
        // Loop-invariant, so it is built once instead of on every iteration.
        let cleanup_interval: Duration =
            Duration::from_secs(webhook_ledger_cleanup_every_seconds);
        // Starts at "now", so the first cleanup runs one full interval after startup.
        let mut last_webhook_cleanup = std::time::Instant::now();

        loop {
            match lease_pending_outbox_events(&pool, batch_size, max_attempts).await {
                Err(err) => {
                    warn!(error = %err, "Outbox relay: lease query failed; will retry");
                }
                Ok(events) => {
                    let count: usize = events.len();
                    if count > 0 {
                        debug!(count, "Outbox relay: dispatching batch");
                    }
                    // Events are dispatched sequentially, in the order the lease returned them.
                    for event in events {
                        dispatch_outbox_event(&worker_state, &pool, event).await;
                    }
                }
            }

            emit_outbox_metrics(&pool, max_attempts).await;

            if webhook_ledger_retention_seconds > 0
                && last_webhook_cleanup.elapsed() >= cleanup_interval
            {
                match cleanup_webhook_state_ledger_older_than(
                    &pool,
                    webhook_ledger_retention_seconds,
                )
                .await
                {
                    Ok(removed) => {
                        if removed > 0 {
                            info!(
                                removed,
                                retention_seconds = webhook_ledger_retention_seconds,
                                "webhook_state_ledger_cleanup_removed"
                            );
                        }
                    }
                    Err(err) => {
                        warn!(error = %err, "webhook_state_ledger_cleanup_failed");
                    }
                }
                // Reset after failures too, so a failing cleanup is retried once per
                // interval rather than on every poll.
                last_webhook_cleanup = std::time::Instant::now();
            }

            tokio::time::sleep(poll_interval).await;
        }
    });
}
/// Routes one leased outbox event to its handler and records the outcome.
///
/// * `mutation.insert` / `mutation.update` / `mutation.delete` go to
///   `dispatch_mutation_event`.
/// * `webhook.trigger` goes to `dispatch_webhook_event`.
/// * Any other event type is logged and treated as a success, so it is marked
///   published instead of being retried.
///
/// A successful dispatch marks the event published; a failed one records a
/// failed attempt together with the error text. Failures of either
/// bookkeeping query are logged and otherwise ignored.
async fn dispatch_outbox_event(
    state: &Data<AppState>,
    pool: &sqlx::postgres::PgPool,
    event: OutboxEvent,
) {
    let id: Uuid = event.event_id;
    let kind: &str = event.event_type.as_str();

    let outcome: Result<(), String> =
        if matches!(kind, "mutation.insert" | "mutation.update" | "mutation.delete") {
            dispatch_mutation_event(state.get_ref(), &event).await
        } else if kind == "webhook.trigger" {
            dispatch_webhook_event(state, &event).await
        } else {
            warn!(
                event_id = %id,
                event_type = %kind,
                "Outbox relay: unknown event_type; marking published to avoid infinite retry"
            );
            Ok(())
        };

    // Failure path first: record the attempt and stop.
    if let Err(reason) = outcome {
        warn!(
            event_id = %id,
            error = %reason,
            attempts = event.attempts,
            "Outbox relay: dispatch failed; recording attempt"
        );
        if let Err(db_err) = mark_outbox_failed_attempt(pool, id, &reason).await {
            error!(
                event_id = %id,
                error = %db_err,
                "Outbox relay: failed to record failed attempt"
            );
        }
        return;
    }

    // Success path: flag the event as published.
    if let Err(db_err) = mark_outbox_published(pool, id).await {
        error!(
            event_id = %id,
            error = %db_err,
            "Outbox relay: failed to mark event published (will re-lease)"
        );
    }
}
/// Forwards a mutation event's payload to `post_event`, keyed by the
/// `company_id` string found in the event headers.
///
/// A missing, non-string or empty `company_id` only produces a debug log.
/// The function returns `Ok(())` in every case; `_state` is currently unused.
async fn dispatch_mutation_event(_state: &AppState, event: &OutboxEvent) -> Result<(), String> {
    let tenant: Option<&str> = event
        .headers
        .get("company_id")
        .and_then(Value::as_str)
        .filter(|id| !id.is_empty());

    match tenant {
        Some(id) => {
            post_event(id.to_string(), event.payload.clone()).await;
        }
        None => {
            debug!(
                event_id = %event.event_id,
                "Outbox relay: mutation event has no company_id in headers; skipping CDC publish"
            );
        }
    }
    Ok(())
}
/// Rebuilds a `GatewayWebhookTrigger` from an outbox event and hands it to
/// `spawn_gateway_webhook_dispatch`.
///
/// Field sources:
/// * event headers: `client_name`, `request_id`
/// * event payload: `route_key`, `table_name`, `request_method`,
///   `request_path`, `request_payload`, `response_payload`, `headers`
///
/// Missing or non-string values fall back to an empty string (`"POST"` for
/// the request method) or `None` for the optional fields. Payload `headers`
/// entries are kept only when both index 0 and index 1 are strings.
///
/// Always returns `Ok(())`; the result of the spawned dispatch is not
/// observed here.
async fn dispatch_webhook_event(state: &Data<AppState>, event: &OutboxEvent) -> Result<(), String> {
    /// Looks up `key` in `obj` and returns it when it is a JSON string.
    fn text<'a>(obj: &'a Value, key: &str) -> Option<&'a str> {
        obj.get(key).and_then(Value::as_str)
    }

    let body: &Value = &event.payload;

    // Collect the [name, value] header pairs, dropping malformed entries.
    let mut forwarded_headers: Vec<(String, String)> = Vec::new();
    if let Some(pairs) = body.get("headers").and_then(Value::as_array) {
        for pair in pairs {
            let name = pair.get(0).and_then(Value::as_str);
            let value = pair.get(1).and_then(Value::as_str);
            if let (Some(name), Some(value)) = (name, value) {
                forwarded_headers.push((name.to_string(), value.to_string()));
            }
        }
    }

    let trigger = GatewayWebhookTrigger {
        client_name: event
            .headers
            .get("client_name")
            .and_then(Value::as_str)
            .unwrap_or("")
            .to_string(),
        route_key: text(body, "route_key").unwrap_or("").to_string(),
        table_name: text(body, "table_name").map(String::from),
        request_id: event
            .headers
            .get("request_id")
            .and_then(Value::as_str)
            .map(String::from),
        request_method: text(body, "request_method").unwrap_or("POST").to_string(),
        request_path: text(body, "request_path").unwrap_or("").to_string(),
        headers: forwarded_headers,
        payload: body.get("request_payload").cloned(),
        response: body.get("response_payload").cloned(),
    };

    spawn_gateway_webhook_dispatch(state.clone(), trigger);
    Ok(())
}
/// Logs the outbox backlog counters through `tracing`.
///
/// The pending count is emitted at debug level. The poisoned count is emitted
/// as a warning, and only when it is greater than zero. A failure of either
/// counting query is logged at debug level and otherwise ignored.
async fn emit_outbox_metrics(pool: &sqlx::postgres::PgPool, max_attempts: i32) {
    let pending_count = count_pending_outbox_events(pool, max_attempts).await;
    match pending_count {
        Err(err) => {
            debug!(error = %err, "Outbox relay: pending count query failed");
        }
        Ok(pending) => {
            debug!(pending, "outbox_pending_total");
        }
    }

    let poisoned_count = count_poisoned_outbox_events(pool, max_attempts).await;
    match poisoned_count {
        Err(err) => {
            debug!(error = %err, "Outbox relay: poisoned count query failed");
        }
        Ok(poisoned) if poisoned > 0 => {
            warn!(
                poisoned,
                max_attempts,
                "outbox_poisoned_total: events have exhausted all retries and require manual inspection"
            );
        }
        // Nothing poisoned: stay quiet.
        Ok(_) => {}
    }
}