use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use chrono::Utc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio::time::interval;
use tracing::Instrument;
use crate::alert::{AlertEvent, SharedAlerter};
use crate::config::{self, WorkerConfig};
use crate::db::Database;
use crate::metrics::Metrics;
/// Failure threshold passed to every per-handler `CircuitBreaker` built in `Worker::new`.
const DEFAULT_CB_THRESHOLD: u32 = 5;
/// Cooldown, in seconds, passed to every per-handler `CircuitBreaker` built in `Worker::new`.
const DEFAULT_CB_COOLDOWN_SECS: u64 = 60;
/// Period, in seconds, of the maintenance ticker in `Worker::run`.
const MAINTENANCE_INTERVAL_SECS: u64 = 3600;
/// Failure-counting circuit breaker guarding deliveries to one handler.
///
/// The breaker is `Closed` until `threshold` failures have been recorded since
/// the last success. It then stays `Open` for `cooldown`, after which it is
/// `HalfOpen` and lets exactly one probe request through at a time. A success
/// closes it again; a failed probe restarts the cooldown.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Failures recorded since the last success.
    failures: std::sync::atomic::AtomicU32,
    /// Failure count at which the breaker opens.
    threshold: u32,
    /// When the breaker last opened; `None` while closed.
    opened_at: std::sync::Mutex<Option<std::time::Instant>>,
    /// How long the breaker stays fully open before allowing a probe.
    cooldown: Duration,
    /// Set while a half-open probe is outstanding.
    half_open_in_progress: std::sync::atomic::AtomicBool,
}

/// Observable state of a [`CircuitBreaker`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Requests flow normally.
    Closed,
    /// Requests are rejected until the cooldown has elapsed.
    Open,
    /// Cooldown elapsed; a single probe request may be attempted.
    HalfOpen,
}

impl CircuitBreaker {
    /// Creates a closed breaker that opens after `threshold` failures and
    /// allows a probe once `cooldown` has passed.
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        Self {
            failures: std::sync::atomic::AtomicU32::new(0),
            threshold,
            opened_at: std::sync::Mutex::new(None),
            cooldown,
            half_open_in_progress: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Locks `opened_at`, recovering the value if a previous holder panicked.
    fn lock_opened_at(&self) -> std::sync::MutexGuard<'_, Option<std::time::Instant>> {
        self.opened_at.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Returns the current state derived from `opened_at` and the cooldown.
    pub fn state(&self) -> CircuitState {
        match *self.lock_opened_at() {
            None => CircuitState::Closed,
            Some(at) if at.elapsed() >= self.cooldown => CircuitState::HalfOpen,
            Some(_) => CircuitState::Open,
        }
    }

    /// Returns `true` if a request may be sent now.
    ///
    /// In the half-open state only the first caller gets `true`; later callers
    /// are rejected until `record_success` or `record_failure` clears the probe.
    pub fn allow_request(&self) -> bool {
        match self.state() {
            CircuitState::Closed => true,
            CircuitState::Open => false,
            CircuitState::HalfOpen => !self
                .half_open_in_progress
                .swap(true, std::sync::atomic::Ordering::AcqRel),
        }
    }

    /// Records a successful request: resets the failure count, closes the
    /// breaker and clears any outstanding probe.
    pub fn record_success(&self) {
        self.failures.store(0, std::sync::atomic::Ordering::Release);
        *self.lock_opened_at() = None;
        self.half_open_in_progress
            .store(false, std::sync::atomic::Ordering::Release);
    }

    /// Records a failed request.
    ///
    /// Returns `true` only when this call moved the breaker from closed to
    /// open, so callers can emit a single "opened" metric or alert. A failed
    /// half-open probe restarts the cooldown but returns `false`.
    pub fn record_failure(&self) -> bool {
        self.half_open_in_progress
            .store(false, std::sync::atomic::Ordering::Release);
        let count = self
            .failures
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel)
            .saturating_add(1);
        if count < self.threshold {
            return false;
        }
        let mut opened = self.lock_opened_at();
        let previously = *opened;
        match previously {
            // The open transition is decided under the lock. Comparing the
            // counter to the threshold instead would miss it whenever concurrent
            // failures push the counter past the threshold before locking.
            None => {
                *opened = Some(std::time::Instant::now());
                true
            }
            // Failed probe: restart the cooldown without reporting a new open.
            Some(at) if at.elapsed() >= self.cooldown => {
                *opened = Some(std::time::Instant::now());
                false
            }
            // Already open and still cooling down.
            Some(_) => false,
        }
    }
}
/// Background worker that polls the database for jobs and delivers them
/// concurrently (see `Worker::run`).
pub struct Worker {
// Shared database handle used for fetching, locking and updating jobs.
db: Arc<Database>,
// Shared metrics sink.
metrics: Arc<Metrics>,
// Alert sink; used as an `Option` (`if let Some(..)`) wherever alerts are sent.
alerter: SharedAlerter,
// Worker tuning values; `run` reads max_concurrency, batch_size,
// stale_threshold_secs, retention_hours and drain_timeout_secs from it.
worker_config: WorkerConfig,
// Health flag: `run` clears it after three consecutive fetch errors and
// sets it again on the next successful fetch.
db_healthy: Arc<std::sync::atomic::AtomicBool>,
// HTTP client used for deliveries; built in `new` with a 30 second timeout.
http: reqwest::Client,
// Base polling period; `new` sets it to one second.
poll_interval: Duration,
// Shutdown signal; `run` leaves its loop once the watched value is `true`.
shutdown: tokio::sync::watch::Receiver<bool>,
// Rate limiters keyed by handler name; `new` only creates one for handlers
// with a non-zero configured rate.
rate_limiters: HashMap<
String,
Arc<
governor::RateLimiter<
governor::state::NotKeyed,
governor::state::InMemoryState,
governor::clock::DefaultClock,
>,
>,
>,
// Transform template per handler name.
transforms: Arc<HashMap<String, String>>,
// Extra request headers per handler name.
handler_headers: Arc<HashMap<String, HashMap<String, String>>>,
// HTTP method per handler name; `deliver_job` falls back to "POST".
handler_methods: Arc<HashMap<String, String>>,
// One circuit breaker per handler name passed to `new`.
circuit_breakers: HashMap<String, Arc<CircuitBreaker>>,
// Workflow definitions, looked up by the handler segment before the first '/'.
workflows: Arc<HashMap<String, config::WorkflowConfig>>,
// Attempt limit used for workflow steps that have no `retry` section.
default_retry_max: u32,
// Queue definitions keyed by queue name; drives expired-message recovery.
queues: Arc<HashMap<String, config::QueueConfig>>,
// Event TTL in seconds; maintenance skips job expiry when this is 0.
event_ttl_secs: u64,
}
impl Worker {
    /// Builds a worker from shared services and per-handler configuration.
    ///
    /// A rate limiter is created only for handlers whose configured rate is
    /// non-zero, and one circuit breaker is created per entry in
    /// `handler_names` using the default threshold and cooldown.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP client cannot be constructed.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        db: Arc<Database>,
        metrics: Arc<Metrics>,
        alerter: SharedAlerter,
        worker_config: WorkerConfig,
        shutdown: tokio::sync::watch::Receiver<bool>,
        handler_rate_limits: HashMap<String, u32>,
        handler_transforms: HashMap<String, String>,
        handler_headers: HashMap<String, HashMap<String, String>>,
        handler_methods: HashMap<String, String>,
        workflows: HashMap<String, config::WorkflowConfig>,
        default_retry_max: u32,
        handler_names: Vec<String>,
        queues: HashMap<String, config::QueueConfig>,
        db_healthy: Arc<std::sync::atomic::AtomicBool>,
        event_ttl_secs: u64,
    ) -> Self {
        let http = reqwest::Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .expect("Failed to build HTTP client");
        // `NonZeroU32::new` doubles as the "rate > 0" filter.
        let rate_limiters = handler_rate_limits
            .into_iter()
            .filter_map(|(name, rate)| {
                let per_second = std::num::NonZeroU32::new(rate)?;
                let limiter =
                    governor::RateLimiter::direct(governor::Quota::per_second(per_second));
                Some((name, Arc::new(limiter)))
            })
            .collect();
        let circuit_breakers: HashMap<String, Arc<CircuitBreaker>> = handler_names
            .into_iter()
            .map(|name| {
                let breaker = CircuitBreaker::new(
                    DEFAULT_CB_THRESHOLD,
                    Duration::from_secs(DEFAULT_CB_COOLDOWN_SECS),
                );
                (name, Arc::new(breaker))
            })
            .collect();
        Self {
            db,
            metrics,
            alerter,
            worker_config,
            http,
            poll_interval: Duration::from_secs(1),
            shutdown,
            rate_limiters,
            circuit_breakers,
            transforms: Arc::new(handler_transforms),
            handler_headers: Arc::new(handler_headers),
            handler_methods: Arc::new(handler_methods),
            workflows: Arc::new(workflows),
            default_retry_max,
            queues: Arc::new(queues),
            db_healthy,
            event_ttl_secs,
        }
    }

    /// Runs the worker until shutdown is signalled, then drains in-flight
    /// deliveries for at most `drain_timeout_secs`.
    ///
    /// The loop multiplexes job polling, reaping finished delivery tasks,
    /// periodic maintenance and queue-message recovery. Concurrency is bounded
    /// by a semaphore sized from `max_concurrency`.
    pub async fn run(mut self) {
        tracing::info!("Queue worker started");
        run_maintenance(
            &self.db,
            &self.metrics,
            self.worker_config.stale_threshold_secs,
            self.worker_config.retention_hours,
            self.event_ttl_secs,
        )
        .await;
        let semaphore = Arc::new(Semaphore::new(self.worker_config.max_concurrency));
        let mut in_flight = JoinSet::new();
        let mut poll_ticker = interval(self.poll_interval);
        let busy_interval = Duration::from_millis(50);
        // The first tick of an interval completes immediately; consume it so
        // maintenance and queue recovery wait a full period.
        let mut maint_ticker = interval(Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
        maint_ticker.tick().await;
        let mut queue_recovery_ticker = interval(Duration::from_secs(10));
        queue_recovery_ticker.tick().await;
        let mut consecutive_db_errors: u32 = 0;
        loop {
            tokio::select! {
                changed = self.shutdown.changed() => {
                    // `changed()` fails immediately, on every poll, once the
                    // sender is gone. Treat that as shutdown instead of spinning.
                    if changed.is_err() {
                        tracing::warn!("Shutdown channel closed, stopping worker");
                        break;
                    }
                    if *self.shutdown.borrow() {
                        tracing::info!("Worker received shutdown signal");
                        break;
                    }
                }
                _ = poll_ticker.tick() => {
                    let free_slots = semaphore.available_permits();
                    if free_slots == 0 {
                        // Every delivery slot is busy. Fetching now would pull
                        // (and on Postgres, skip the separate lock step for) a
                        // job that cannot be started, so try again shortly.
                        poll_ticker.reset_after(busy_interval);
                        continue;
                    }
                    let is_postgres = self.db.driver == "postgres" && !self.db.is_d1();
                    let batch = self.worker_config.batch_size.min(
                        i32::try_from(free_slots).unwrap_or(i32::MAX)
                    ).max(1);
                    let jobs = match self.db.fetch_available_jobs(batch).await {
                        Ok(j) => {
                            if consecutive_db_errors > 0 {
                                consecutive_db_errors = 0;
                                self.db_healthy.store(true, std::sync::atomic::Ordering::Relaxed);
                                tracing::info!("Database recovered, resuming webhook acceptance");
                            }
                            j
                        }
                        Err(e) => {
                            consecutive_db_errors = consecutive_db_errors.saturating_add(1);
                            self.metrics.inc_db_errors();
                            if consecutive_db_errors >= 3 {
                                self.db_healthy.store(false, std::sync::atomic::Ordering::Relaxed);
                                // Alert once, on the transition to unhealthy.
                                if consecutive_db_errors == 3 {
                                    if let Some(ref a) = self.alerter {
                                        a.send(AlertEvent::DbUnhealthy);
                                    }
                                }
                            }
                            // Exponential backoff: 2, 4, 8, 16, then capped at 30 seconds.
                            let backoff = Duration::from_secs(
                                (1u64 << consecutive_db_errors.min(5)).min(30)
                            );
                            tracing::error!(
                                error = %e,
                                consecutive_errors = consecutive_db_errors,
                                backoff_secs = backoff.as_secs(),
                                "Failed to fetch jobs, backing off"
                            );
                            poll_ticker.reset_after(backoff);
                            continue;
                        }
                    };
                    let found_jobs = !jobs.is_empty();
                    for job in jobs {
                        let permit = match semaphore.clone().try_acquire_owned() {
                            Ok(p) => p,
                            Err(_) => break,
                        };
                        // Non-Postgres backends lock each job explicitly; a
                        // `false` result skips the job.
                        if !is_postgres {
                            match self.db.mark_job_running(&job.id).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    drop(permit);
                                    continue;
                                }
                                Err(e) => {
                                    self.metrics.inc_db_errors();
                                    tracing::error!(job_id = job.id, error = %e, "Failed to lock job");
                                    drop(permit);
                                    continue;
                                }
                            }
                        }
                        let db = self.db.clone();
                        let http = self.http.clone();
                        let metrics = self.metrics.clone();
                        let alerter = self.alerter.clone();
                        let transforms = self.transforms.clone();
                        let handler_headers = self.handler_headers.clone();
                        let handler_methods = self.handler_methods.clone();
                        let workflows = self.workflows.clone();
                        let default_retry_max = self.default_retry_max;
                        let rate_limiter = self.rate_limiters.get(&job.handler).cloned();
                        let cb = self.circuit_breakers.get(&job.handler).cloned();
                        let deliver_span = tracing::info_span!("deliver", job_id = %job.id, event_id = %job.event_id, handler = %job.handler);
                        in_flight.spawn(async move {
                            // Hold the concurrency slot until the task ends on any path.
                            let _permit = permit;
                            if let Some(ref cb) = cb {
                                if !cb.allow_request() {
                                    tracing::warn!(
                                        handler = job.handler,
                                        job_id = job.id,
                                        "Circuit open, rescheduling job"
                                    );
                                    let next_at = (Utc::now() + chrono::Duration::seconds(10)).naive_utc();
                                    if let Err(e) = db.mark_job_retryable(&job.id, next_at, "circuit breaker open").await {
                                        metrics.inc_db_errors();
                                        tracing::error!(job_id = job.id, error = %e, "Failed to reschedule job rejected by circuit breaker");
                                    }
                                    metrics.inc_circuit_rejected(&job.handler);
                                    return;
                                }
                            }
                            if let Some(ref rl) = rate_limiter {
                                rl.until_ready().await;
                            }
                            deliver_job(&db, &http, &metrics, &alerter, &transforms, &handler_headers, &handler_methods, &workflows, default_retry_max, &job, cb.as_deref()).await;
                        }.instrument(deliver_span));
                    }
                    // A non-empty batch suggests more work is waiting: poll again soon.
                    if found_jobs {
                        poll_ticker.reset_after(busy_interval);
                    }
                }
                Some(result) = in_flight.join_next() => {
                    if let Err(e) = result {
                        tracing::error!(error = %e, "Delivery task panicked");
                    }
                }
                _ = maint_ticker.tick() => {
                    run_maintenance(&self.db, &self.metrics, self.worker_config.stale_threshold_secs, self.worker_config.retention_hours, self.event_ttl_secs).await;
                }
                _ = queue_recovery_ticker.tick() => {
                    if !self.queues.is_empty() {
                        recover_expired_queue_messages(&self.db, &self.metrics, &self.queues).await;
                    }
                }
            }
        }
        let remaining = in_flight.len();
        if remaining > 0 {
            let timeout = Duration::from_secs(self.worker_config.drain_timeout_secs);
            tracing::info!(
                count = remaining,
                timeout_secs = timeout.as_secs(),
                "Draining in-flight deliveries"
            );
            let drain = async {
                while let Some(result) = in_flight.join_next().await {
                    if let Err(e) = result {
                        tracing::error!(error = %e, "Delivery task panicked during shutdown");
                    }
                }
            };
            if tokio::time::timeout(timeout, drain).await.is_err() {
                let abandoned = in_flight.len();
                tracing::warn!(
                    count = abandoned,
                    "Drain timeout reached, abandoning remaining deliveries"
                );
            }
        }
        tracing::info!("Worker stopped");
    }
}
/// Runs one maintenance pass: recovers stale running jobs, cleans up old
/// records and, when `event_ttl_secs` is non-zero, expires jobs past the TTL.
///
/// Each step is independent: a failure is counted in the DB-error metric and
/// logged, and the remaining steps still run.
async fn run_maintenance(
    db: &Database,
    metrics: &Metrics,
    stale_secs: i64,
    retention_hours: i64,
    event_ttl_secs: u64,
) {
    match db.recover_stale_jobs(stale_secs).await {
        Ok(0) => {}
        Ok(n) => tracing::info!(count = n, "Recovered stale running jobs"),
        Err(e) => {
            metrics.inc_db_errors();
            tracing::error!(error = %e, "Failed to recover stale jobs");
        }
    }
    match db.cleanup_old_records(retention_hours).await {
        Ok((0, 0)) => {}
        Ok((jobs, attempts)) => tracing::info!(jobs, attempts, "Cleaned up old records"),
        Err(e) => {
            metrics.inc_db_errors();
            tracing::error!(error = %e, "Failed to cleanup old records");
        }
    }
    if event_ttl_secs > 0 {
        // Clamp instead of `as`: a TTL above i64::MAX would wrap to a negative value.
        let ttl_secs = i64::try_from(event_ttl_secs).unwrap_or(i64::MAX);
        match db.expire_old_jobs(ttl_secs).await {
            Ok(0) => {}
            Ok(n) => tracing::info!(count = n, "Expired jobs past event TTL"),
            Err(e) => {
                metrics.inc_db_errors();
                tracing::error!(error = %e, "Failed to expire old jobs");
            }
        }
    }
}
/// For every configured queue with a non-zero visibility timeout, asks the
/// database to recover messages whose timeout has expired and records how
/// many were recovered.
async fn recover_expired_queue_messages(
    db: &Database,
    metrics: &Metrics,
    queues: &HashMap<String, config::QueueConfig>,
) {
    for (name, queue) in queues {
        let timeout_secs = config::parse_duration(&queue.visibility_timeout);
        // A timeout of 0 means there is nothing to recover for this queue.
        if timeout_secs == 0 {
            continue;
        }
        let handler = format!("queue/{}", name);
        let outcome = db
            .recover_expired_queue_messages(&handler, timeout_secs)
            .await;
        let recovered = match outcome {
            Ok(count) => count,
            Err(e) => {
                metrics.inc_db_errors();
                tracing::error!(error = %e, queue = name, "Failed to recover expired queue messages");
                continue;
            }
        };
        if recovered != 0 {
            metrics.inc_queue_messages_expired(name, recovered);
            tracing::info!(queue = name, recovered, "Recovered expired queue messages");
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn deliver_job(
db: &Database,
http: &reqwest::Client,
metrics: &Metrics,
alerter: &SharedAlerter,
transforms: &HashMap<String, String>,
handler_headers: &HashMap<String, HashMap<String, String>>,
handler_methods: &HashMap<String, String>,
workflows: &HashMap<String, config::WorkflowConfig>,
default_retry_max: u32,
job: &crate::db::JobRow,
circuit_breaker: Option<&CircuitBreaker>,
) {
let start = std::time::Instant::now();
let is_outbound = job.handler.starts_with("outbound/");
let is_workflow = !is_outbound && job.handler.contains('/');
if is_workflow {
if let Ok(Some(wf_data)) = db.get_workflow_job_data(&job.id).await {
if let Some(ref run_id) = wf_data.workflow_run_id {
if let Ok(Some(timeout_at)) = db.get_workflow_timeout(run_id).await {
let now = crate::db::format_now();
if now > timeout_at {
let workflow_name = job.handler.split('/').next().unwrap_or("");
tracing::warn!(
workflow = workflow_name,
run_id = run_id.as_str(),
job_id = job.id,
"Workflow timed out, skipping step"
);
let _ = db.mark_job_dead(&job.id, "workflow timeout").await;
let _ = db.fail_workflow_run(run_id).await;
metrics.inc_workflow_failed(workflow_name);
if let Some(a) = alerter {
a.send(AlertEvent::WorkflowFailed {
workflow: workflow_name.to_string(),
run_id: run_id.clone(),
});
}
return;
}
}
}
}
}
if is_workflow {
let is_branch_job = if let Ok(Some(wf_check)) = db.get_workflow_job_data(&job.id).await {
wf_check.branch_name.is_some()
} else {
false
};
let workflow_name = job.handler.split('/').next().unwrap_or("");
let step_name_part = job.handler.split('/').nth(1).unwrap_or("");
if !is_branch_job {
if let Some(wf) = workflows.get(workflow_name) {
if let Some(step) = wf.steps.iter().find(|s| s.name == step_name_part) {
match step.handler_type.as_str() {
"choice" => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark choice job completed");
return;
}
if let Err(e) = handle_choice_step(
db,
metrics,
alerter,
workflows,
default_retry_max,
job,
step,
)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to handle choice step");
}
return;
}
"parallel" => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark parallel job completed");
return;
}
if let Err(e) =
handle_parallel_step(db, metrics, alerter, default_retry_max, job, step)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to handle parallel step");
}
return;
}
"map" => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark map job completed");
return;
}
if let Err(e) =
handle_map_step(db, metrics, alerter, default_retry_max, job, step).await
{
tracing::error!(job_id = job.id, error = %e, "Failed to handle map step");
}
return;
}
"wait" => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark wait job completed");
return;
}
if let Err(e) = handle_wait_step(
db,
metrics,
workflows,
default_retry_max,
job,
step,
)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to handle wait step");
}
return;
}
"callback" => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark callback job completed");
return;
}
if let Err(e) = handle_callback_step(
db,
http,
metrics,
workflows,
default_retry_max,
job,
step,
)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to handle callback step");
}
return;
}
"workflow" => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark sub-workflow job completed");
return;
}
if let Err(e) = handle_subworkflow_step(
db,
metrics,
workflows,
default_retry_max,
job,
step,
)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to handle sub-workflow step");
}
return;
}
_ => {} }
}
}
} }
let (transform, custom_headers, method) = if is_workflow {
let workflow_name = job.handler.split('/').next().unwrap_or("");
let step_name = job.handler.split('/').nth(1).unwrap_or("");
let wf = workflows.get(workflow_name);
let step = wf.and_then(|wf| wf.steps.iter().find(|s| s.name == step_name));
let wf_data = db.get_workflow_job_data(&job.id).await.ok().flatten();
let branch_name = wf_data.as_ref().and_then(|d| d.branch_name.as_deref());
let (headers, method) = if let Some(bn) = branch_name {
let branch = step
.and_then(|s| s.branches.as_ref())
.and_then(|bs| bs.iter().find(|b| b.name == bn));
let h = branch.map(|b| &b.headers).filter(|h| !h.is_empty());
let m = branch.map_or("POST", |b| b.method.as_str());
(h.cloned(), m.to_string())
} else {
let h = step.map(|s| &s.headers).filter(|h| !h.is_empty());
let m = step.map_or("POST", |s| s.method.as_str());
(h.cloned(), m.to_string())
};
(None, headers, method)
} else {
(
transforms.get(&job.handler).map(|s| s.as_str()),
handler_headers.get(&job.handler).cloned(),
handler_methods
.get(&job.handler)
.cloned()
.unwrap_or_else(|| "POST".into()),
)
};
let result = if is_workflow {
deliver_workflow_step(db, http, job, custom_headers.as_ref(), &method).await
} else if is_outbound {
deliver_outbound(db, http, job).await
} else {
deliver(db, http, job, transform, custom_headers.as_ref(), &method).await
};
let duration_ms = i64::try_from(start.elapsed().as_millis()).unwrap_or(i64::MAX);
match result {
Ok(delivery_result) => {
let status_code = delivery_result.status_code;
let attempt_id = ulid::Ulid::new().to_string();
if let Err(e) = db
.insert_attempt(
&attempt_id,
&job.id,
job.attempt + 1,
Some(status_code as i32),
delivery_result.response_body.as_deref(),
None,
duration_ms,
)
.await
{
metrics.inc_db_errors();
tracing::error!(job_id = job.id, error = %e, "Failed to insert attempt");
return;
}
if (200..300).contains(&status_code) {
metrics.inc_delivery_success_for(&job.handler, duration_ms.max(0) as u64);
if let Some(cb) = circuit_breaker {
cb.record_success();
}
if let Err(e) = db.mark_job_completed(&job.id).await {
metrics.inc_db_errors();
tracing::error!(job_id = job.id, error = %e, "Failed to mark completed");
}
if is_workflow {
if let Err(e) = handle_workflow_step_success(
db,
metrics,
alerter,
workflows,
default_retry_max,
job,
delivery_result.response_body.as_deref(),
)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to advance workflow");
}
}
tracing::info!(
job_id = job.id,
handler = job.handler,
status = status_code,
duration_ms,
"Job completed"
);
} else {
metrics.inc_delivery_failure_for(&job.handler, duration_ms.max(0) as u64);
let error_type = if (400..500).contains(&status_code) {
"4xx"
} else {
"5xx"
};
metrics.inc_delivery_error_type(error_type);
let error = format!("HTTP {status_code}");
if is_workflow {
handle_workflow_step_failure(
db,
metrics,
alerter,
workflows,
default_retry_max,
job,
&error,
error_type,
)
.await;
} else {
if let Some(cb) = circuit_breaker {
if cb.record_failure() {
tracing::warn!(handler = job.handler, "Circuit breaker opened");
metrics.inc_circuit_opened(&job.handler);
if let Some(a) = alerter {
a.send(AlertEvent::CircuitOpened {
handler: job.handler.clone(),
});
}
}
}
match handle_failure(db, job, &error, delivery_result.retry_after_secs).await {
Ok(true) => {
metrics.inc_dlq(&job.handler);
if let Some(a) = alerter {
a.send(AlertEvent::Dlq {
job_id: job.id.clone(),
handler: job.handler.clone(),
attempts: job.attempt + 1,
});
}
}
Ok(false) => {}
Err(e) => {
tracing::error!(job_id = job.id, error = %e, "Failed to handle failure")
}
}
}
}
}
Err(e) => {
let error = e.to_string();
let attempt_id = ulid::Ulid::new().to_string();
metrics.inc_delivery_failure_for(&job.handler, duration_ms.max(0) as u64);
let error_type = if e
.downcast_ref::<reqwest::Error>()
.is_some_and(|re| re.is_timeout())
{
"timeout"
} else {
"network"
};
metrics.inc_delivery_error_type(error_type);
let _ = db
.insert_attempt(
&attempt_id,
&job.id,
job.attempt + 1,
None,
None,
Some(&error),
duration_ms,
)
.await;
if is_workflow {
handle_workflow_step_failure(
db,
metrics,
alerter,
workflows,
default_retry_max,
job,
&error,
error_type,
)
.await;
} else {
if let Some(cb) = circuit_breaker {
if cb.record_failure() {
tracing::warn!(handler = job.handler, "Circuit breaker opened");
metrics.inc_circuit_opened(&job.handler);
if let Some(a) = alerter {
a.send(AlertEvent::CircuitOpened {
handler: job.handler.clone(),
});
}
}
}
match handle_failure(db, job, &error, None).await {
Ok(true) => metrics.inc_dlq(&job.handler),
Ok(false) => {}
Err(e) => {
tracing::error!(job_id = job.id, error = %e, "Failed to handle failure")
}
}
}
}
}
}
/// Outcome of one HTTP delivery attempt that received a response.
struct DeliveryResult {
// HTTP status code of the response.
status_code: u16,
// Response body text, when it could be read.
response_body: Option<String>,
// Value parsed from the response's `retry-after` header, if any.
retry_after_secs: Option<i64>,
}
/// Sends the HTTP request for a workflow step and returns the response
/// status, body and parsed `retry-after` value.
///
/// The request body is the step input stored for the job (`"{}"` when there
/// is none); GET requests are sent without a body. `method` values other than
/// GET, PUT, PATCH and DELETE are sent as POST.
///
/// # Errors
///
/// Returns an error if the workflow job data cannot be loaded or the request
/// fails to send.
async fn deliver_workflow_step(
    db: &Database,
    http: &reqwest::Client,
    job: &crate::db::JobRow,
    custom_headers: Option<&HashMap<String, String>>,
    method: &str,
) -> Result<DeliveryResult> {
    let payload = db
        .get_workflow_job_data(&job.id)
        .await?
        .and_then(|d| d.step_input)
        .unwrap_or_else(|| "{}".to_string());

    let http_method = match method {
        "GET" => reqwest::Method::GET,
        "PUT" => reqwest::Method::PUT,
        "PATCH" => reqwest::Method::PATCH,
        "DELETE" => reqwest::Method::DELETE,
        _ => reqwest::Method::POST,
    };
    let sends_body = http_method != reqwest::Method::GET;

    // Delivery ID: first 16 bytes of SHA-256 over "<event_id>:<handler>",
    // hex-encoded. It does not depend on the attempt number.
    let delivery_id = {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(format!("{}:{}", job.event_id, job.handler));
        hex::encode(&hasher.finalize()[..16])
    };

    let mut request = http
        .request(http_method, &job.url)
        .header("Content-Type", "application/json")
        .header("X-Qhook-Job-ID", &job.id)
        .header("X-Qhook-Event-ID", &job.event_id)
        .header("X-Qhook-Handler", &job.handler)
        .header("X-Qhook-Attempt", (job.attempt + 1).to_string())
        .header("X-Qhook-Delivery-ID", &delivery_id);
    for (key, value) in custom_headers.into_iter().flatten() {
        request = request.header(key.as_str(), value.as_str());
    }
    if sends_body {
        request = request.body(payload);
    }

    let response = request.send().await?;
    let status_code = response.status().as_u16();
    let retry_after_secs = response
        .headers()
        .get("retry-after")
        .and_then(|v| v.to_str().ok())
        .and_then(parse_retry_after);
    let response_body = response.text().await.ok();
    Ok(DeliveryResult {
        status_code,
        response_body,
        retry_after_secs,
    })
}
/// Advances a workflow after one of its step jobs succeeded.
///
/// Branch jobs are handed to `handle_branch_completion`. Otherwise the step
/// output is saved and merged with the step input (result path, then the
/// optional output template), and the run timeout is checked. The run is
/// completed when the step is an `end` step or the last one; otherwise a job
/// for the next step is inserted. Jobs without a workflow run are ignored.
///
/// # Errors
///
/// Propagates database errors and errors from resuming a parent workflow.
async fn handle_workflow_step_success(
    db: &Database,
    metrics: &Metrics,
    alerter: &SharedAlerter,
    workflows: &HashMap<String, config::WorkflowConfig>,
    default_retry_max: u32,
    job: &crate::db::JobRow,
    response_body: Option<&str>,
) -> Result<()> {
    let Some(wf_data) = db.get_workflow_job_data(&job.id).await? else {
        return Ok(());
    };
    let Some(run_id) = wf_data.workflow_run_id.as_ref() else {
        return Ok(());
    };
    if wf_data.branch_name.is_some() {
        return handle_branch_completion(
            db,
            metrics,
            workflows,
            default_retry_max,
            job,
            &wf_data,
            response_body,
        )
        .await;
    }

    let step_index = wf_data.step_index.unwrap_or(0);
    if let Some(body) = response_body {
        db.save_step_output(&job.id, body).await?;
    }

    let workflow_name = job.handler.split('/').next().unwrap_or("");
    let Some(workflow) = workflows.get(workflow_name) else {
        tracing::error!(workflow = workflow_name, "Workflow config not found");
        return Ok(());
    };
    let current_step = wf_data
        .step_name
        .as_deref()
        .and_then(|name| workflow.steps.iter().find(|s| s.name == name));

    // Build the payload handed to whatever runs next.
    let raw_response = response_body.unwrap_or("{}");
    let merged_payload = match current_step {
        Some(step) => merge_result_path(
            wf_data.step_input.as_deref().unwrap_or("{}"),
            raw_response,
            step.result_path.as_deref(),
        ),
        None => raw_response.to_string(),
    };
    let output_payload = match current_step.and_then(|s| s.output.as_ref()) {
        Some(output_template) => crate::api::apply_transform(&merged_payload, output_template),
        None => merged_payload,
    };
    metrics.inc_workflow_step_completed(workflow_name);

    // A run that has exceeded its deadline is failed instead of advanced.
    if let Ok(Some(timeout_at)) = db.get_workflow_timeout(run_id).await {
        let now = crate::db::format_now();
        if now > timeout_at {
            db.fail_workflow_run(run_id).await?;
            metrics.inc_workflow_failed(workflow_name);
            if let Some(a) = alerter {
                a.send(AlertEvent::WorkflowFailed {
                    workflow: workflow_name.to_string(),
                    run_id: run_id.to_string(),
                });
            }
            tracing::warn!(workflow = workflow_name, run_id, "Workflow timed out");
            return Ok(());
        }
    }

    // `None` means the run is finished: explicit end step, or no step follows.
    let ends_here = current_step.is_some_and(|s| s.end);
    let following = if ends_here {
        None
    } else {
        Some((step_index + 1).max(0) as usize).filter(|idx| *idx < workflow.steps.len())
    };
    let Some(next_index) = following else {
        db.complete_workflow_run(run_id).await?;
        metrics.inc_workflow_completed(workflow_name);
        if ends_here {
            tracing::info!(
                workflow = workflow_name,
                run_id,
                "Workflow completed (end step)"
            );
        } else {
            tracing::info!(workflow = workflow_name, run_id, "Workflow completed");
        }
        resume_parent_workflow(
            db,
            metrics,
            workflows,
            default_retry_max,
            run_id,
            &job.event_id,
            &output_payload,
        )
        .await?;
        return Ok(());
    };

    let next_step = &workflow.steps[next_index];
    db.update_workflow_run_step(run_id, &next_step.name).await?;
    let next_job_id = ulid::Ulid::new().to_string();
    let next_handler = format!("{}/{}", workflow_name, next_step.name);
    let max_attempts = next_step
        .retry
        .as_ref()
        .map_or(default_retry_max, |r| r.max);
    let next_input = if let Some(template) = &next_step.input {
        crate::api::apply_transform(&output_payload, template)
    } else {
        output_payload
    };
    db.insert_workflow_job(
        &next_job_id,
        &job.event_id,
        &next_handler,
        next_step.url.as_deref().unwrap_or(""),
        max_attempts,
        run_id,
        &next_step.name,
        i32::try_from(next_index).unwrap_or(i32::MAX),
        Some(&next_input),
    )
    .await?;
    metrics.inc_jobs_created();
    tracing::info!(
        workflow = workflow_name,
        run_id,
        job_id = next_job_id,
        step = next_step.name,
        "Next workflow step job created"
    );
    Ok(())
}
/// Combines a step's input with its response according to `result_path`.
///
/// * `None`, `"$"`, or any path not starting with `"$."`: the response
///   replaces the input.
/// * `"null"`: the response is discarded and the input is returned.
/// * `"$.field"`: the response is stored under `field` in the input object.
///   Unparsable input is treated as an empty object, an unparsable response is
///   stored as JSON `null`, and input that is valid JSON but not an object is
///   returned unchanged.
fn merge_result_path(input: &str, response: &str, result_path: Option<&str>) -> String {
    if result_path == Some("null") {
        return input.to_string();
    }
    let Some(field) = result_path.and_then(|path| path.strip_prefix("$.")) else {
        return response.to_string();
    };

    let mut merged = serde_json::from_str::<serde_json::Value>(input)
        .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()));
    let response_value =
        serde_json::from_str::<serde_json::Value>(response).unwrap_or(serde_json::Value::Null);
    if let Some(object) = merged.as_object_mut() {
        object.insert(field.to_string(), response_value);
    }
    serde_json::to_string(&merged).unwrap_or_else(|_| input.to_string())
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn handle_workflow_step_failure(
db: &Database,
metrics: &Metrics,
alerter: &SharedAlerter,
workflows: &HashMap<String, config::WorkflowConfig>,
default_retry_max: u32,
job: &crate::db::JobRow,
error: &str,
error_type: &str,
) {
let workflow_name = job.handler.split('/').next().unwrap_or("");
let step_name = job.handler.split('/').nth(1).unwrap_or("");
let workflow = match workflows.get(workflow_name) {
Some(w) => w,
None => {
let _ = handle_failure(db, job, error, None).await;
return;
}
};
let step = workflow.steps.iter().find(|s| s.name == step_name);
let should_retry = step
.and_then(|s| s.retry.as_ref())
.map(|r| error_type_matches(&r.errors, error_type))
.unwrap_or(true);
let current_attempt = job.attempt + 1;
if should_retry && current_attempt < job.max_attempts {
let base_backoff = 30i64 * (1i64 << current_attempt.min(10));
let jitter = rand::Rng::random_range(&mut rand::rng(), 0.8..1.2);
let backoff_secs = ((base_backoff as f64) * jitter) as i64;
let next_at = Utc::now().naive_utc() + chrono::Duration::seconds(backoff_secs);
if let Err(e) = db.mark_job_retryable(&job.id, next_at, error).await {
tracing::error!(job_id = job.id, error = %e, "Failed to schedule retry");
}
return;
}
if let Some(step) = step
&& let Some(ref catches) = step.catch
{
for c in catches {
if error_type_matches(&c.errors, error_type) {
if let Err(e) = route_to_catch_step(
db,
metrics,
workflows,
default_retry_max,
workflow_name,
&c.goto,
job,
error,
)
.await
{
tracing::error!(
job_id = job.id,
goto = c.goto,
error = %e,
"Failed to route to catch step"
);
}
return;
}
}
}
let on_failure = step
.map(|s| &s.on_failure)
.unwrap_or(&config::OnFailure::Stop);
match on_failure {
config::OnFailure::Continue => {
if let Err(e) = db.mark_job_completed(&job.id).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark completed");
}
let error_payload = serde_json::json!({
"error": error,
"failed_step": step_name,
})
.to_string();
if let Err(e) = handle_workflow_step_success(
db,
metrics,
alerter,
workflows,
default_retry_max,
job,
Some(&error_payload),
)
.await
{
tracing::error!(job_id = job.id, error = %e, "Failed to continue after failure");
}
}
config::OnFailure::Stop => {
if let Err(e) = db.mark_job_dead(&job.id, error).await {
tracing::error!(job_id = job.id, error = %e, "Failed to mark dead");
}
metrics.inc_dlq(&job.handler);
if let Some(a) = alerter {
a.send(AlertEvent::Dlq {
job_id: job.id.clone(),
handler: job.handler.clone(),
attempts: current_attempt,
});
}
if let Ok(Some(wf_data)) = db.get_workflow_job_data(&job.id).await {
if let Some(run_id) = &wf_data.workflow_run_id {
let _ = db.fail_workflow_run(run_id).await;
metrics.inc_workflow_failed(workflow_name);
if let Some(a) = alerter {
a.send(AlertEvent::WorkflowFailed {
workflow: workflow_name.to_string(),
run_id: run_id.clone(),
});
}
tracing::warn!(
workflow = workflow_name,
run_id,
step = step_name,
"Workflow failed"
);
}
}
}
}
}
#[allow(clippy::too_many_arguments)]
async fn route_to_catch_step(
db: &Database,
metrics: &Metrics,
workflows: &HashMap<String, config::WorkflowConfig>,
default_retry_max: u32,
workflow_name: &str,
goto_step_name: &str,
job: &crate::db::JobRow,
error: &str,
) -> Result<()> {
db.mark_job_completed(&job.id).await?;
let workflow = workflows
.get(workflow_name)
.ok_or_else(|| anyhow::anyhow!("Workflow '{}' not found", workflow_name))?;
let (goto_index, goto_step) = workflow
.steps
.iter()
.enumerate()
.find(|(_, s)| s.name == goto_step_name)
.ok_or_else(|| anyhow::anyhow!("Step '{}' not found", goto_step_name))?;
let wf_data = db.get_workflow_job_data(&job.id).await?;
let run_id = wf_data
.as_ref()
.and_then(|d| d.workflow_run_id.as_deref())
.unwrap_or("");
db.update_workflow_run_step(run_id, goto_step_name).await?;
let error_payload = serde_json::json!({
"error": error,
"failed_step": job.handler.split('/').nth(1).unwrap_or(""),
"job_id": job.id,
})
.to_string();
let next_input = match &goto_step.input {
Some(template) => crate::api::apply_transform(&error_payload, template),
None => error_payload,
};
let next_job_id = ulid::Ulid::new().to_string();
let next_handler = format!("{}/{}", workflow_name, goto_step.name);
let max_attempts = goto_step
.retry
.as_ref()
.map(|r| r.max)
.unwrap_or(default_retry_max);
db.insert_workflow_job(
&next_job_id,
&job.event_id,
&next_handler,
goto_step.url.as_deref().unwrap_or(""),
max_attempts,
run_id,
&goto_step.name,
i32::try_from(goto_index).unwrap_or(i32::MAX),
Some(&next_input),
)
.await?;
metrics.inc_jobs_created();
tracing::info!(
workflow = workflow_name,
run_id,
job_id = next_job_id,
step = goto_step.name,
"Routed to catch step"
);
Ok(())
}
/// Evaluates a `choice` step and enqueues a job for the selected target step.
///
/// The first choice whose `when` filter matches the step input wins; the
/// step's `default` is used when none match. The workflow run is failed when
/// no target can be selected or when the selected target does not exist. The
/// caller completes the choice job before calling this function, so in either
/// case nothing else would move the run forward.
///
/// # Errors
///
/// Returns an error if the workflow or the target step is unknown, or if a
/// database operation fails.
async fn handle_choice_step(
    db: &Database,
    metrics: &Metrics,
    alerter: &SharedAlerter,
    workflows: &HashMap<String, config::WorkflowConfig>,
    default_retry_max: u32,
    job: &crate::db::JobRow,
    step: &config::StepConfig,
) -> Result<()> {
    /// Marks the run failed and reports it through metrics and alerts.
    async fn fail_run(
        db: &Database,
        metrics: &Metrics,
        alerter: &SharedAlerter,
        workflow_name: &str,
        run_id: &str,
    ) -> Result<()> {
        db.fail_workflow_run(run_id).await?;
        metrics.inc_workflow_failed(workflow_name);
        if let Some(a) = alerter {
            a.send(AlertEvent::WorkflowFailed {
                workflow: workflow_name.to_string(),
                run_id: run_id.to_string(),
            });
        }
        Ok(())
    }

    let wf_data = db.get_workflow_job_data(&job.id).await?;
    let wf_data = match wf_data {
        Some(d) if d.workflow_run_id.is_some() => d,
        _ => return Ok(()),
    };
    let Some(run_id) = wf_data.workflow_run_id.as_ref() else {
        return Ok(());
    };
    let payload = wf_data.step_input.as_deref().unwrap_or("{}");
    let workflow_name = job.handler.split('/').next().unwrap_or("");
    let workflow = workflows
        .get(workflow_name)
        .ok_or_else(|| anyhow::anyhow!("Workflow '{}' not found", workflow_name))?;

    // First matching choice wins, then the step's default.
    let goto = step
        .choices
        .as_ref()
        .and_then(|choices| {
            choices
                .iter()
                .find(|choice| crate::api::evaluate_filter_pub(payload, &choice.when))
        })
        .map(|choice| choice.goto.as_str())
        .or(step.default.as_deref());
    let Some(goto) = goto else {
        fail_run(db, metrics, alerter, workflow_name, run_id).await?;
        tracing::warn!(
            workflow = workflow_name,
            run_id,
            step = step.name,
            "Choice step: no matching condition and no default"
        );
        return Ok(());
    };

    let Some((goto_index, goto_step)) = workflow
        .steps
        .iter()
        .enumerate()
        .find(|(_, s)| s.name == goto)
    else {
        // A dangling target would otherwise leave the run "running" forever.
        fail_run(db, metrics, alerter, workflow_name, run_id).await?;
        return Err(anyhow::anyhow!("Step '{}' not found", goto));
    };

    db.update_workflow_run_step(run_id, goto).await?;
    let next_input = match &goto_step.input {
        Some(template) => crate::api::apply_transform(payload, template),
        None => payload.to_string(),
    };
    let next_job_id = ulid::Ulid::new().to_string();
    let next_handler = format!("{}/{}", workflow_name, goto_step.name);
    let max_attempts = goto_step
        .retry
        .as_ref()
        .map(|r| r.max)
        .unwrap_or(default_retry_max);
    db.insert_workflow_job(
        &next_job_id,
        &job.event_id,
        &next_handler,
        goto_step.url.as_deref().unwrap_or(""),
        max_attempts,
        run_id,
        &goto_step.name,
        i32::try_from(goto_index).unwrap_or(i32::MAX),
        Some(&next_input),
    )
    .await?;
    metrics.inc_jobs_created();
    tracing::info!(
        workflow = workflow_name,
        run_id,
        job_id = next_job_id,
        step = goto_step.name,
        choice_from = step.name,
        "Choice step routed"
    );
    Ok(())
}
/// Fans a parallel step out into one branch job per configured branch.
///
/// Records the number of branches as the run's parallel state, then inserts a
/// branch job for every entry of `step.branches`. Each branch receives the
/// step input (`"{}"` when absent), passed through `step.input` when that
/// template is set.
///
/// Returns `Ok(())` without side effects when the job has no workflow data,
/// no run id, or the step declares no branches.
///
/// # Errors
///
/// Propagates any database error.
async fn handle_parallel_step(
    db: &Database,
    metrics: &Metrics,
    _alerter: &SharedAlerter,
    default_retry_max: u32,
    job: &crate::db::JobRow,
    step: &config::StepConfig,
) -> Result<()> {
    let Some(wf_data) = db.get_workflow_job_data(&job.id).await? else {
        return Ok(());
    };
    let Some(run_id) = wf_data.workflow_run_id.as_ref() else {
        return Ok(());
    };
    let payload = wf_data.step_input.as_deref().unwrap_or("{}");
    let step_index = wf_data.step_index.unwrap_or(0);
    let workflow_name = job.handler.split('/').next().unwrap_or("");
    let Some(branches) = step.branches.as_ref() else {
        return Ok(());
    };

    // Record how many completions must arrive before the step can finish.
    let branch_total = i32::try_from(branches.len()).unwrap_or(i32::MAX);
    db.set_parallel_state(run_id, &step.name, branch_total)
        .await?;

    let max_attempts = match step.retry.as_ref() {
        Some(retry) => retry.max,
        None => default_retry_max,
    };
    // Every branch job shares the same "<workflow>/<step>" handler.
    let handler = format!("{}/{}", workflow_name, step.name);
    for branch in branches {
        let branch_job_id = ulid::Ulid::new().to_string();
        let branch_input = if let Some(template) = &step.input {
            crate::api::apply_transform(payload, template)
        } else {
            payload.to_string()
        };
        db.insert_branch_job(
            &branch_job_id,
            &job.event_id,
            &handler,
            &branch.url,
            max_attempts,
            run_id,
            &step.name,
            step_index,
            Some(&branch_input),
            &branch.name,
        )
        .await?;
        metrics.inc_jobs_created();
        tracing::info!(
            workflow = workflow_name,
            run_id,
            job_id = branch_job_id,
            step = step.name,
            branch = branch.name,
            "Parallel branch job created"
        );
    }
    Ok(())
}
/// Fans a map step out into one branch job per element of an input array.
///
/// The array is located by resolving `step.items_path` against the step input
/// (`"{}"` when absent). Anything that does not resolve to an array is logged
/// and treated as an empty list. An empty list stores `"[]"` as the step
/// output and hands over to `handle_workflow_step_success`. Otherwise the
/// item count is recorded as the run's parallel state and one branch job is
/// inserted per item, named after the item's index.
///
/// Returns `Ok(())` without side effects when the job has no workflow data,
/// no run id, or the step has no `items_path`.
///
/// # Errors
///
/// Fails when the step input is not valid JSON, an item cannot be serialised,
/// or a database call fails.
async fn handle_map_step(
    db: &Database,
    metrics: &Metrics,
    alerter: &SharedAlerter,
    default_retry_max: u32,
    job: &crate::db::JobRow,
    step: &config::StepConfig,
) -> Result<()> {
    let Some(wf_data) = db.get_workflow_job_data(&job.id).await? else {
        return Ok(());
    };
    let Some(run_id) = wf_data.workflow_run_id.as_ref() else {
        return Ok(());
    };
    let payload = wf_data.step_input.as_deref().unwrap_or("{}");
    let step_index = wf_data.step_index.unwrap_or(0);
    let workflow_name = job.handler.split('/').next().unwrap_or("");
    let Some(items_path) = step.items_path.as_ref() else {
        return Ok(());
    };
    let url = step.url.as_deref().unwrap_or("");

    let json: serde_json::Value = serde_json::from_str(payload)?;
    let items = match crate::api::resolve_path_pub(&json, items_path) {
        Some(serde_json::Value::Array(arr)) => arr.clone(),
        _ => {
            tracing::warn!(
                workflow = workflow_name,
                run_id,
                items_path,
                "Map step: items_path does not resolve to an array"
            );
            vec![]
        }
    };

    if items.is_empty() {
        // Nothing to fan out: the step result is an empty array.
        db.save_step_output(&job.id, "[]").await?;
        // NOTE(review): an empty workflow map is passed here because this
        // function receives none of its own; confirm the callee can still
        // advance the run without it.
        return handle_workflow_step_success(
            db,
            metrics,
            alerter,
            &HashMap::new(),
            default_retry_max,
            job,
            Some("[]"),
        )
        .await;
    }

    let item_total = i32::try_from(items.len()).unwrap_or(i32::MAX);
    db.set_parallel_state(run_id, &step.name, item_total)
        .await?;
    let max_attempts = match step.retry.as_ref() {
        Some(retry) => retry.max,
        None => default_retry_max,
    };
    // Every item job shares the same "<workflow>/<step>" handler.
    let handler = format!("{}/{}", workflow_name, step.name);
    for (position, item) in items.iter().enumerate() {
        let item_job_id = ulid::Ulid::new().to_string();
        let item_payload = serde_json::to_string(item)?;
        // The branch name is the item's position within the array.
        let branch_name = position.to_string();
        db.insert_branch_job(
            &item_job_id,
            &job.event_id,
            &handler,
            url,
            max_attempts,
            run_id,
            &step.name,
            step_index,
            Some(&item_payload),
            &branch_name,
        )
        .await?;
        metrics.inc_jobs_created();
    }
    tracing::info!(
        workflow = workflow_name,
        run_id,
        step = step.name,
        count = items.len(),
        "Map step: created jobs for items"
    );
    Ok(())
}
async fn handle_branch_completion(
db: &Database,
metrics: &Metrics,
workflows: &HashMap<String, config::WorkflowConfig>,
default_retry_max: u32,
job: &crate::db::JobRow,
wf_data: &crate::db::WorkflowJobRow,
response_body: Option<&str>,
) -> Result<()> {
let run_id = wf_data.workflow_run_id.as_ref().unwrap();
let step_name = wf_data.step_name.as_deref().unwrap_or("");
let step_index = wf_data.step_index.unwrap_or(0);
let workflow_name = job.handler.split('/').next().unwrap_or("");
if let Some(body) = response_body {
db.save_step_output(&job.id, body).await?;
}
let (completed, total) = db.increment_parallel_completed(run_id).await?;
tracing::info!(
workflow = workflow_name,
run_id,
branch = wf_data.branch_name.as_deref().unwrap_or("?"),
completed,
total,
"Branch completed"
);
if completed < total {
return Ok(()); }
let branch_outputs = db.get_branch_outputs(run_id, step_name).await?;
let workflow = workflows.get(workflow_name);
let step = workflow.and_then(|w| w.steps.iter().find(|s| s.name == step_name));
let is_map = step.is_some_and(|s| s.handler_type == "map");
let merged = if is_map {
let arr: Vec<serde_json::Value> = branch_outputs
.iter()
.map(|(_, output)| {
output
.as_deref()
.and_then(|s| serde_json::from_str(s).ok())
.unwrap_or(serde_json::Value::Null)
})
.collect();
serde_json::to_string(&arr)?
} else {
let mut obj = serde_json::Map::new();
for (branch_name, output) in &branch_outputs {
let val = output
.as_deref()
.and_then(|s| serde_json::from_str(s).ok())
.unwrap_or(serde_json::Value::Null);
obj.insert(branch_name.clone(), val);
}
serde_json::to_string(&serde_json::Value::Object(obj))?
};
db.clear_parallel_state(run_id).await?;
let original_input = wf_data.step_input.as_deref().unwrap_or("{}");
let result_path = step.and_then(|s| s.result_path.as_deref());
let merged_payload = merge_result_path(original_input, &merged, result_path);
let output_payload = if let Some(step) = step
&& let Some(ref output_template) = step.output
{
crate::api::apply_transform(&merged_payload, output_template)
} else {
merged_payload
};
metrics.inc_workflow_step_completed(workflow_name);
if step.is_some_and(|s| s.end) {
db.complete_workflow_run(run_id).await?;
metrics.inc_workflow_completed(workflow_name);
tracing::info!(
workflow = workflow_name,
run_id,
"Workflow completed (parallel/map end step)"
);
resume_parent_workflow(
db,
metrics,
workflows,
default_retry_max,
run_id,
&job.event_id,
&output_payload,
)
.await?;
return Ok(());
}
let workflow = match workflows.get(workflow_name) {
Some(w) => w,
None => return Ok(()),
};
let next_index = (step_index + 1).max(0) as usize;
if next_index >= workflow.steps.len() {
db.complete_workflow_run(run_id).await?;
metrics.inc_workflow_completed(workflow_name);
tracing::info!(workflow = workflow_name, run_id, "Workflow completed");
resume_parent_workflow(
db,
metrics,
workflows,
default_retry_max,
run_id,
&job.event_id,
&output_payload,
)
.await?;
return Ok(());
}
let next_step = &workflow.steps[next_index];
db.update_workflow_run_step(run_id, &next_step.name).await?;
let next_input = match &next_step.input {
Some(template) => crate::api::apply_transform(&output_payload, template),
None => output_payload,
};
let next_job_id = ulid::Ulid::new().to_string();
let next_handler = format!("{}/{}", workflow_name, next_step.name);
let max_attempts = next_step
.retry
.as_ref()
.map(|r| r.max)
.unwrap_or(default_retry_max);
db.insert_workflow_job(
&next_job_id,
&job.event_id,
&next_handler,
next_step.url.as_deref().unwrap_or(""),
max_attempts,
run_id,
&next_step.name,
i32::try_from(next_index).unwrap_or(i32::MAX),
Some(&next_input),
)
.await?;
metrics.inc_jobs_created();
tracing::info!(
workflow = workflow_name,
run_id,
job_id = next_job_id,
step = next_step.name,
"Next step after parallel/map"
);
Ok(())
}
async fn handle_wait_step(
db: &Database,
metrics: &Metrics,
workflows: &HashMap<String, config::WorkflowConfig>,
default_retry_max: u32,
job: &crate::db::JobRow,
step: &config::StepConfig,
) -> Result<()> {
let wf_data = db.get_workflow_job_data(&job.id).await?;
let wf_data = match wf_data {
Some(d) if d.workflow_run_id.is_some() => d,
_ => return Ok(()),
};
let run_id = wf_data.workflow_run_id.as_ref().unwrap();
let step_index = wf_data.step_index.unwrap_or(0);
let payload = wf_data.step_input.as_deref().unwrap_or("{}");
let workflow_name = job.handler.split('/').next().unwrap_or("");
let workflow = match workflows.get(workflow_name) {
Some(w) => w,
None => return Ok(()),
};
let wait_until = if let Some(seconds) = step.seconds {
Utc::now().naive_utc() + chrono::Duration::seconds(i64::try_from(seconds).unwrap_or(i64::MAX))
} else if let Some(ref ts_path) = step.timestamp_path {
let json: serde_json::Value = serde_json::from_str(payload)?;
let ts_value = crate::api::resolve_path_pub(&json, ts_path);
match ts_value {
Some(serde_json::Value::String(ts)) => {
chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%dT%H:%M:%S%.3f")
.or_else(|_| chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%dT%H:%M:%S"))
.or_else(|_| chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%dT%H:%M:%SZ"))
.or_else(|e| {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&ts) {
Ok(dt.naive_utc())
} else {
Err(e)
}
})
.unwrap_or_else(|_| {
tracing::warn!(
workflow = workflow_name,
run_id,
timestamp = ts,
"Wait step: failed to parse timestamp, using now"
);
Utc::now().naive_utc()
})
}
Some(serde_json::Value::Number(n)) => {
let ts = n.as_i64().unwrap_or(0);
chrono::DateTime::from_timestamp(ts, 0)
.map(|dt| dt.naive_utc())
.unwrap_or_else(|| Utc::now().naive_utc())
}
_ => {
tracing::warn!(
workflow = workflow_name,
run_id,
path = ts_path.as_str(),
"Wait step: timestamp_path not found or invalid"
);
Utc::now().naive_utc()
}
}
} else {
Utc::now().naive_utc()
};
let scheduled_at = crate::db::format_dt(wait_until);
metrics.inc_workflow_step_completed(workflow_name);
tracing::info!(
workflow = workflow_name,
run_id,
step = step.name,
scheduled_at = scheduled_at,
"Wait step completed, scheduling next step"
);
let next_index = (step_index + 1).max(0) as usize;
if next_index >= workflow.steps.len() {
db.complete_workflow_run(run_id).await?;
metrics.inc_workflow_completed(workflow_name);
resume_parent_workflow(
db,
metrics,
workflows,
default_retry_max,
run_id,
&job.event_id,
payload,
)
.await?;
return Ok(());
}
let next_step = &workflow.steps[next_index];
db.update_workflow_run_step(run_id, &next_step.name).await?;
let next_input = match &next_step.input {
Some(template) => crate::api::apply_transform(payload, template),
None => payload.to_string(),
};
let next_job_id = ulid::Ulid::new().to_string();
let next_handler = format!("{}/{}", workflow_name, next_step.name);
let max_attempts = next_step
.retry
.as_ref()
.map(|r| r.max)
.unwrap_or(default_retry_max);
db.insert_workflow_job_delayed(
&next_job_id,
&job.event_id,
&next_handler,
next_step.url.as_deref().unwrap_or(""),
max_attempts,
run_id,
&next_step.name,
i32::try_from(next_index).unwrap_or(i32::MAX),
Some(&next_input),
&scheduled_at,
)
.await?;
metrics.inc_jobs_created();
Ok(())
}
/// Parks a workflow run on a callback step until an external callback arrives.
///
/// Generates a callback token (two concatenated ULIDs), inserts a callback job
/// carrying the step input and an optional timeout, and, when the step has a
/// `url`, POSTs a JSON notification containing the token to it. The
/// notification is best-effort: a non-2xx response or a transport error is
/// logged as a warning and does not fail the step.
///
/// Returns `Ok(())` without side effects when the job has no workflow data or
/// no run id.
///
/// # Errors
///
/// Fails when the workflow is missing from `workflows`, when
/// `step.callback_timeout` is too large to be represented as a deadline, or
/// when inserting the callback job fails.
async fn handle_callback_step(
    db: &Database,
    http: &reqwest::Client,
    metrics: &Metrics,
    workflows: &HashMap<String, config::WorkflowConfig>,
    default_retry_max: u32,
    job: &crate::db::JobRow,
    step: &config::StepConfig,
) -> Result<()> {
    let Some(wf_data) = db.get_workflow_job_data(&job.id).await? else {
        return Ok(());
    };
    let Some(run_id) = wf_data.workflow_run_id.as_ref() else {
        return Ok(());
    };
    let step_index = wf_data.step_index.unwrap_or(0);
    let payload = wf_data.step_input.as_deref().unwrap_or("{}");
    let workflow_name = job.handler.split('/').next().unwrap_or("");
    // Only the existence of the workflow matters here.
    let _ = workflows
        .get(workflow_name)
        .ok_or_else(|| anyhow::anyhow!("Workflow '{}' not found", workflow_name))?;

    let callback_token = format!("{}{}", ulid::Ulid::new(), ulid::Ulid::new());
    let callback_job_id = ulid::Ulid::new().to_string();
    let handler = format!("{}/{}", workflow_name, step.name);
    let max_attempts = step
        .retry
        .as_ref()
        .map(|r| r.max)
        .unwrap_or(default_retry_max);
    // `Duration::seconds` and `NaiveDateTime + Duration` panic when out of
    // range; use the checked variants and surface an error instead.
    let timeout_at = step
        .callback_timeout
        .map(|secs| {
            i64::try_from(secs)
                .ok()
                .and_then(chrono::Duration::try_seconds)
                .and_then(|delay| Utc::now().naive_utc().checked_add_signed(delay))
                .map(|deadline| crate::db::format_dt(deadline))
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "Callback step '{}': callback_timeout {} is out of range",
                        step.name,
                        secs
                    )
                })
        })
        .transpose()?;

    db.insert_callback_job(
        &callback_job_id,
        &job.event_id,
        &handler,
        max_attempts,
        run_id,
        &step.name,
        step_index,
        Some(payload),
        &callback_token,
        timeout_at.as_deref(),
    )
    .await?;
    metrics.inc_jobs_created();
    tracing::info!(
        workflow = workflow_name,
        run_id,
        step = step.name,
        // Only a prefix is logged so the full token never reaches the logs.
        token_prefix = &callback_token[..8],
        "Callback step: waiting for external callback"
    );

    if let Some(ref url) = step.url {
        let notify_payload = serde_json::json!({
            "callback_token": callback_token,
            "workflow": workflow_name,
            "step": step.name,
            "run_id": run_id,
            "payload": serde_json::from_str::<serde_json::Value>(payload).unwrap_or_default(),
        });
        let mut request = http
            .post(url)
            .header("Content-Type", "application/json")
            .header("X-Qhook-Callback-Token", &callback_token);
        for (key, value) in &step.headers {
            request = request.header(key.as_str(), value.as_str());
        }
        // Best-effort notification: failures are logged, never propagated.
        match request.json(&notify_payload).send().await {
            Ok(resp) if resp.status().is_success() => {
                tracing::info!(
                    workflow = workflow_name,
                    step = step.name,
                    url,
                    "Callback token notified to external service"
                );
            }
            Ok(resp) => {
                tracing::warn!(
                    workflow = workflow_name,
                    step = step.name,
                    url,
                    status = resp.status().as_u16(),
                    "Failed to notify callback token (non-2xx)"
                );
            }
            Err(e) => {
                tracing::warn!(
                    workflow = workflow_name,
                    step = step.name,
                    url,
                    error = %e,
                    "Failed to notify callback token"
                );
            }
        }
    }
    Ok(())
}
/// Resumes a workflow run that was parked on a callback step.
///
/// `token` identifies the waiting callback job and `callback_payload` is the
/// body supplied by the external caller. The payload is merged into the
/// step's stored input according to the step's `result_path`, passed through
/// the step's `output` template when one is set, and then used either to
/// finish the run (end step or last step, also resuming a parent run) or as
/// the input of the next step's job.
///
/// # Returns
///
/// `Ok(false)` when no waiting job matches `token`; `Ok(true)` once the
/// callback has been accepted, including the cases where the job has no
/// workflow data, no run id, or its workflow is missing from `workflows`.
///
/// # Errors
///
/// Fails when the callback job cannot be re-read after being resumed, or when
/// a database call fails.
pub async fn resume_callback(
    db: &Database,
    metrics: &Metrics,
    workflows: &HashMap<String, config::WorkflowConfig>,
    default_retry_max: u32,
    token: &str,
    callback_payload: &str,
) -> Result<bool> {
    let Some(job_id) = db.resume_callback_job(token, callback_payload).await? else {
        return Ok(false);
    };
    metrics.inc_callbacks_received();

    let job = db
        .get_callback_job(token)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Callback job not found after resume"))?;
    let Some(wf_data) = db.get_workflow_job_data(&job_id).await? else {
        return Ok(true);
    };
    let Some(run_id) = wf_data.workflow_run_id.as_ref() else {
        return Ok(true);
    };
    let step_index = wf_data.step_index.unwrap_or(0);
    let workflow_name = job.handler.split('/').next().unwrap_or("");
    let Some(workflow) = workflows.get(workflow_name) else {
        return Ok(true);
    };

    let step_name = wf_data.step_name.as_deref().unwrap_or("");
    let current_step = workflow.steps.iter().find(|s| s.name == step_name);
    let step_input_payload = wf_data.step_input.as_deref().unwrap_or("{}");

    // Without a matching step definition the callback body is used verbatim.
    let merged_payload = match current_step {
        Some(step) => merge_result_path(
            step_input_payload,
            callback_payload,
            step.result_path.as_deref(),
        ),
        None => callback_payload.to_string(),
    };
    let output_payload = match current_step.and_then(|s| s.output.as_ref()) {
        Some(output_template) => crate::api::apply_transform(&merged_payload, output_template),
        None => merged_payload,
    };
    metrics.inc_workflow_step_completed(workflow_name);

    if current_step.is_some_and(|s| s.end) {
        db.complete_workflow_run(run_id).await?;
        metrics.inc_workflow_completed(workflow_name);
        resume_parent_workflow(
            db,
            metrics,
            workflows,
            default_retry_max,
            run_id,
            &job.event_id,
            &output_payload,
        )
        .await?;
        return Ok(true);
    }

    let next_index = (step_index + 1).max(0) as usize;
    let Some(next_step) = workflow.steps.get(next_index) else {
        // The callback step was the last step of the workflow.
        db.complete_workflow_run(run_id).await?;
        metrics.inc_workflow_completed(workflow_name);
        resume_parent_workflow(
            db,
            metrics,
            workflows,
            default_retry_max,
            run_id,
            &job.event_id,
            &output_payload,
        )
        .await?;
        return Ok(true);
    };

    db.update_workflow_run_step(run_id, &next_step.name).await?;
    let next_input = if let Some(template) = &next_step.input {
        crate::api::apply_transform(&output_payload, template)
    } else {
        output_payload
    };
    let next_job_id = ulid::Ulid::new().to_string();
    let next_handler = format!("{}/{}", workflow_name, next_step.name);
    let max_attempts = match next_step.retry.as_ref() {
        Some(retry) => retry.max,
        None => default_retry_max,
    };
    db.insert_workflow_job(
        &next_job_id,
        &job.event_id,
        &next_handler,
        next_step.url.as_deref().unwrap_or(""),
        max_attempts,
        run_id,
        &next_step.name,
        i32::try_from(next_index).unwrap_or(i32::MAX),
        Some(&next_input),
    )
    .await?;
    metrics.inc_jobs_created();
    tracing::info!(
        workflow = workflow_name,
        run_id,
        step = next_step.name,
        "Callback received, advancing workflow"
    );
    Ok(true)
}
async fn handle_subworkflow_step(
db: &Database,
metrics: &Metrics,
workflows: &HashMap<String, config::WorkflowConfig>,
default_retry_max: u32,
job: &crate::db::JobRow,
step: &config::StepConfig,
) -> Result<()> {
let wf_data = db.get_workflow_job_data(&job.id).await?;
let wf_data = match wf_data {
Some(d) if d.workflow_run_id.is_some() => d,
_ => return Ok(()),
};
let parent_run_id = wf_data.workflow_run_id.as_ref().unwrap();
let parent_step_index = wf_data.step_index.unwrap_or(0);
let payload = wf_data.step_input.as_deref().unwrap_or("{}");
let parent_workflow_name = job.handler.split('/').next().unwrap_or("");
let sub_workflow_name = step
.workflow
.as_deref()
.ok_or_else(|| anyhow::anyhow!("Sub-workflow step has no workflow name"))?;
let sub_workflow = workflows
.get(sub_workflow_name)
.ok_or_else(|| anyhow::anyhow!("Sub-workflow '{}' not found", sub_workflow_name))?;
if sub_workflow.steps.is_empty() {
anyhow::bail!("Sub-workflow '{}' has no steps", sub_workflow_name);
}
let sub_run_id = ulid::Ulid::new().to_string();
let first_step = &sub_workflow.steps[0];
db.insert_sub_workflow_run(
&sub_run_id,
sub_workflow_name,
&job.event_id,
&first_step.name,
parent_run_id,
parent_step_index,
)
.await?;
let first_input = match &first_step.input {
Some(template) => crate::api::apply_transform(payload, template),
None => payload.to_string(),
};
let first_job_id = ulid::Ulid::new().to_string();
let first_handler = format!("{}/{}", sub_workflow_name, first_step.name);
let max_attempts = first_step
.retry
.as_ref()
.map(|r| r.max)
.unwrap_or(default_retry_max);
db.insert_workflow_job(
&first_job_id,
&job.event_id,
&first_handler,
first_step.url.as_deref().unwrap_or(""),
max_attempts,
&sub_run_id,
&first_step.name,
0,
Some(&first_input),
)
.await?;
metrics.inc_jobs_created();
tracing::info!(
parent_workflow = parent_workflow_name,
parent_run_id,
sub_workflow = sub_workflow_name,
sub_run_id,
step = first_step.name,
"Sub-workflow launched"
);
Ok(())
}
/// Continues the parent run of a finished sub-workflow run, if it has one.
///
/// Looks up the parent of `run_id`. When there is none, or the parent's
/// workflow is missing from `workflows`, nothing happens. Otherwise the step
/// following the parent's recorded step index is enqueued with
/// `output_payload` as input (passed through that step's `input` template
/// when set). If the parent has no further step, the parent run is completed
/// and the function recurses to resume the parent's own parent.
///
/// # Errors
///
/// Fails when the parent run row cannot be found or a database call fails.
async fn resume_parent_workflow(
    db: &Database,
    metrics: &Metrics,
    workflows: &HashMap<String, config::WorkflowConfig>,
    default_retry_max: u32,
    run_id: &str,
    event_id: &str,
    output_payload: &str,
) -> Result<()> {
    let Some((parent_run_id, parent_step_index)) = db.get_parent_workflow_run(run_id).await?
    else {
        // Top-level run: there is nothing to resume.
        return Ok(());
    };
    let parent_run = db
        .get_workflow_run(&parent_run_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Parent workflow run '{}' not found", parent_run_id))?;
    let parent_workflow_name = &parent_run.workflow;
    let Some(parent_workflow) = workflows.get(parent_workflow_name.as_str()) else {
        return Ok(());
    };

    let next_index = (parent_step_index + 1).max(0) as usize;
    let Some(next_step) = parent_workflow.steps.get(next_index) else {
        db.complete_workflow_run(&parent_run_id).await?;
        metrics.inc_workflow_completed(parent_workflow_name);
        tracing::info!(
            workflow = parent_workflow_name.as_str(),
            run_id = parent_run_id,
            "Parent workflow completed (sub-workflow was last step)"
        );
        // The recursive future is boxed so the async fn can call itself.
        return Box::pin(resume_parent_workflow(
            db,
            metrics,
            workflows,
            default_retry_max,
            &parent_run_id,
            event_id,
            output_payload,
        ))
        .await;
    };

    db.update_workflow_run_step(&parent_run_id, &next_step.name)
        .await?;
    let next_input = if let Some(template) = &next_step.input {
        crate::api::apply_transform(output_payload, template)
    } else {
        output_payload.to_string()
    };
    let next_job_id = ulid::Ulid::new().to_string();
    let next_handler = format!("{}/{}", parent_workflow_name, next_step.name);
    let max_attempts = match next_step.retry.as_ref() {
        Some(retry) => retry.max,
        None => default_retry_max,
    };
    db.insert_workflow_job(
        &next_job_id,
        event_id,
        &next_handler,
        next_step.url.as_deref().unwrap_or(""),
        max_attempts,
        &parent_run_id,
        &next_step.name,
        i32::try_from(next_index).unwrap_or(i32::MAX),
        Some(&next_input),
    )
    .await?;
    metrics.inc_jobs_created();
    tracing::info!(
        parent_workflow = parent_workflow_name.as_str(),
        parent_run_id,
        step = next_step.name,
        "Sub-workflow completed, resuming parent workflow"
    );
    Ok(())
}
/// Reports whether `error_type` is covered by any entry of `errors`.
///
/// `ErrorType::All` covers every error; the remaining variants cover exactly
/// one label each: `"timeout"`, `"5xx"`, `"4xx"` or `"network"`. An empty
/// `errors` slice covers nothing.
fn error_type_matches(errors: &[config::ErrorType], error_type: &str) -> bool {
    for candidate in errors {
        let covered = match candidate {
            config::ErrorType::All => true,
            config::ErrorType::Timeout => error_type == "timeout",
            config::ErrorType::Http5xx => error_type == "5xx",
            config::ErrorType::Http4xx => error_type == "4xx",
            config::ErrorType::Network => error_type == "network",
        };
        if covered {
            return true;
        }
    }
    false
}
/// Loads the event behind `job` and delivers it over HTTP.
///
/// The stored event payload is passed through `transform` when a template is
/// given, then handed to `deliver_http` together with the stored event
/// headers, `custom_headers` and `method`.
///
/// # Errors
///
/// Fails when the event data cannot be loaded or the HTTP delivery fails.
async fn deliver(
    db: &Database,
    http: &reqwest::Client,
    job: &crate::db::JobRow,
    transform: Option<&str>,
    custom_headers: Option<&HashMap<String, String>>,
    method: &str,
) -> Result<DeliveryResult> {
    let (raw_payload, headers_json) = db.get_event_data(&job.event_id).await?;
    let payload = if let Some(template) = transform {
        crate::api::apply_transform(&raw_payload, template)
    } else {
        raw_payload
    };
    deliver_http(http, job, &payload, &headers_json, custom_headers, method).await
}
/// Delivers an event to an outbound endpoint as a signed webhook POST.
///
/// The endpoint id is the job handler with its `outbound/` prefix removed.
/// The payload is signed with the endpoint's secret (an empty secret when
/// none is stored) over the job id, the current Unix timestamp and the
/// payload bytes. The request carries `webhook-id`, `webhook-timestamp` and
/// `webhook-signature` headers plus the event id, and the event type when the
/// event row provides one.
///
/// # Returns
///
/// The response status code and any usable `retry-after` value; the response
/// body is not captured.
///
/// # Errors
///
/// Fails when a database query or the HTTP request fails.
async fn deliver_outbound(
    db: &Database,
    http: &reqwest::Client,
    job: &crate::db::JobRow,
) -> Result<DeliveryResult> {
    let endpoint_id = match job.handler.strip_prefix("outbound/") {
        Some(stripped) => stripped,
        None => &job.handler,
    };
    let (payload, _headers_json) = db.get_event_data(&job.event_id).await?;
    let signing_secret = db
        .get_endpoint_secret(endpoint_id)
        .await?
        .unwrap_or_default();

    // The job id doubles as the webhook message id.
    let msg_id = &job.id;
    let timestamp = chrono::Utc::now().timestamp();
    let signature = crate::verify::sign_outbound_payload(
        &signing_secret,
        msg_id,
        timestamp,
        payload.as_bytes(),
    );

    let event_type_row =
        sqlx::query_as::<_, (String,)>("SELECT event_type FROM events WHERE id = $1")
            .bind(&job.event_id)
            .fetch_optional(db.sqlx_pool())
            .await?;
    let event_type: Option<String> = event_type_row.map(|(value,)| value);

    let mut request = http
        .post(&job.url)
        .header("Content-Type", "application/json")
        .header("webhook-id", msg_id.as_str())
        .header("webhook-timestamp", timestamp.to_string())
        .header("webhook-signature", format!("v1,{signature}"))
        .header("X-Qhook-Event-ID", &job.event_id);
    if let Some(et) = event_type.as_deref() {
        request = request.header("X-Qhook-Event-Type", et);
    }

    let response = request.body(payload).send().await?;
    let status_code = response.status().as_u16();
    let retry_after_secs = response
        .headers()
        .get("retry-after")
        .and_then(|raw| raw.to_str().ok())
        .and_then(parse_retry_after);
    Ok(DeliveryResult {
        status_code,
        response_body: None,
        retry_after_secs,
    })
}
/// Parses a `Retry-After` header value given as a whole number of seconds.
///
/// Surrounding whitespace is ignored. Returns `None` when the value is not an
/// integer or lies outside `1..=86400` (one second to one day).
fn parse_retry_after(value: &str) -> Option<i64> {
    match value.trim().parse::<i64>() {
        Ok(secs) if (1..=86400).contains(&secs) => Some(secs),
        _ => None,
    }
}
/// Sends `payload` to `job.url` and reports the response status.
///
/// `method` selects the HTTP verb (`"GET"`, `"PUT"`, `"PATCH"`, `"DELETE"`);
/// any other value results in a POST. Every request carries the job id, event
/// id, handler and 1-based attempt number as `X-Qhook-*` headers, plus a
/// delivery id derived from the SHA-256 of `"<event_id>:<handler>"` (the first
/// 16 bytes, hex-encoded). Stored event headers whose name starts with `ce-`
/// are forwarded, followed by `custom_headers`. GET requests are sent without
/// a body.
///
/// # Returns
///
/// The response status code and any usable `retry-after` value; the response
/// body is not captured.
///
/// # Errors
///
/// Fails when the HTTP request cannot be sent.
async fn deliver_http(
    http: &reqwest::Client,
    job: &crate::db::JobRow,
    payload: &str,
    headers_json: &Option<String>,
    custom_headers: Option<&HashMap<String, String>>,
    method: &str,
) -> Result<DeliveryResult> {
    let reqwest_method = match method {
        "GET" => reqwest::Method::GET,
        "PUT" => reqwest::Method::PUT,
        "PATCH" => reqwest::Method::PATCH,
        "DELETE" => reqwest::Method::DELETE,
        _ => reqwest::Method::POST,
    };
    // Decide this before the method value is moved into the request builder.
    let is_get = reqwest_method == reqwest::Method::GET;

    let mut request = http
        .request(reqwest_method, &job.url)
        .header("Content-Type", "application/json")
        .header("X-Qhook-Job-ID", &job.id)
        .header("X-Qhook-Event-ID", &job.event_id)
        .header("X-Qhook-Handler", &job.handler)
        .header("X-Qhook-Attempt", (job.attempt + 1).to_string());

    {
        // The delivery id depends only on event id and handler, so it is the
        // same for every attempt of the same delivery.
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(format!("{}:{}", job.event_id, job.handler));
        let delivery_id = hex::encode(&hasher.finalize()[..16]);
        request = request.header("X-Qhook-Delivery-ID", &delivery_id);
    }

    // Forward only the `ce-` prefixed headers of the original event; stored
    // headers that are not a JSON string map are skipped.
    let event_headers = headers_json.as_deref().and_then(|raw| {
        serde_json::from_str::<std::collections::HashMap<String, String>>(raw).ok()
    });
    if let Some(event_headers) = event_headers {
        for (name, value) in event_headers
            .iter()
            .filter(|(name, _)| name.starts_with("ce-"))
        {
            request = request.header(name.as_str(), value.as_str());
        }
    }
    if let Some(extra) = custom_headers {
        for (name, value) in extra {
            request = request.header(name.as_str(), value.as_str());
        }
    }

    let request = if is_get {
        request
    } else {
        request.body(payload.to_string())
    };
    let response = request.send().await?;
    let status_code = response.status().as_u16();
    let retry_after_secs = response
        .headers()
        .get("retry-after")
        .and_then(|raw| raw.to_str().ok())
        .and_then(parse_retry_after);
    Ok(DeliveryResult {
        status_code,
        response_body: None,
        retry_after_secs,
    })
}
/// Records a failed delivery attempt and decides between retry and dead-letter.
///
/// When this attempt (`job.attempt + 1`) reaches `job.max_attempts` the job
/// is marked dead. Otherwise it is marked retryable after `retry_after_secs`
/// when given, or after an exponential backoff of `30 * 2^min(attempt, 10)`
/// seconds scaled by a random factor in `0.8..1.2`.
///
/// # Returns
///
/// `Ok(true)` when the job was moved to the dead-letter queue, `Ok(false)`
/// when a retry was scheduled.
///
/// # Errors
///
/// Propagates any database error.
async fn handle_failure(
    db: &Database,
    job: &crate::db::JobRow,
    error: &str,
    retry_after_secs: Option<i64>,
) -> Result<bool> {
    let current_attempt = job.attempt + 1;

    if current_attempt >= job.max_attempts {
        // Out of attempts: park the job in the dead-letter queue.
        db.mark_job_dead(&job.id, error).await?;
        tracing::warn!(
            job_id = job.id,
            handler = job.handler,
            attempts = current_attempt,
            "Job moved to DLQ"
        );
        return Ok(true);
    }

    // A server-provided delay takes precedence over the computed backoff.
    let backoff_secs = match retry_after_secs {
        Some(secs) => secs,
        None => {
            let base_backoff = 30i64 * (1i64 << current_attempt.min(10));
            let jitter = rand::Rng::random_range(&mut rand::rng(), 0.8..1.2);
            ((base_backoff as f64) * jitter) as i64
        }
    };
    let next_at = Utc::now().naive_utc() + chrono::Duration::seconds(backoff_secs);
    db.mark_job_retryable(&job.id, next_at, error).await?;
    tracing::info!(
        job_id = job.id,
        handler = job.handler,
        attempt = current_attempt,
        next_retry_secs = backoff_secs,
        retry_after = retry_after_secs.is_some(),
        error,
        "Job scheduled for retry"
    );
    Ok(false)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a breaker (threshold 2, 1 ms cooldown), trips it with two
    /// failures and sleeps 5 ms so the cooldown has passed.
    fn breaker_past_cooldown() -> CircuitBreaker {
        let cb = CircuitBreaker::new(2, Duration::from_millis(1));
        cb.record_failure();
        cb.record_failure();
        std::thread::sleep(Duration::from_millis(5));
        cb
    }

    // --- merge_result_path ---

    #[test]
    fn test_merge_result_path_default_replaces() {
        let input = r#"{"id": "123", "amount": 5000}"#;
        let response = r#"{"valid": true}"#;
        // No path and the root path both yield the response unchanged.
        for path in [None, Some("$")] {
            assert_eq!(merge_result_path(input, response, path), response);
        }
    }

    #[test]
    fn test_merge_result_path_null_discards() {
        let input = r#"{"id": "123"}"#;
        let response = r#"{"valid": true}"#;
        assert_eq!(merge_result_path(input, response, Some("null")), input);
    }

    #[test]
    fn test_merge_result_path_field_merges() {
        let input = r#"{"id":"123","amount":5000}"#;
        let response = r#"{"valid":true,"score":0.1}"#;
        let merged = merge_result_path(input, response, Some("$.validation"));
        let json: serde_json::Value = serde_json::from_str(&merged).unwrap();
        assert_eq!(json["id"], "123");
        assert_eq!(json["amount"], 5000);
        assert_eq!(json["validation"]["valid"], true);
        assert_eq!(json["validation"]["score"], 0.1);
    }

    // --- error_type_matches ---

    #[test]
    fn test_error_type_matches_all() {
        for label in ["timeout", "5xx", "4xx", "network"] {
            assert!(error_type_matches(&[config::ErrorType::All], label));
        }
    }

    #[test]
    fn test_error_type_matches_specific() {
        let errors = vec![config::ErrorType::Timeout, config::ErrorType::Http5xx];
        for label in ["timeout", "5xx"] {
            assert!(error_type_matches(&errors, label));
        }
        for label in ["4xx", "network"] {
            assert!(!error_type_matches(&errors, label));
        }
    }

    #[test]
    fn test_error_type_matches_empty() {
        assert!(!error_type_matches(&[], "timeout"));
    }

    // --- CircuitBreaker ---

    #[test]
    fn test_circuit_breaker_starts_closed() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.allow_request());
    }

    #[test]
    fn test_circuit_breaker_opens_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        assert!(!cb.record_failure());
        assert!(!cb.record_failure());
        // The third failure reaches the threshold.
        assert!(cb.record_failure());
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.allow_request());
    }

    #[test]
    fn test_circuit_breaker_success_resets() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        cb.record_failure();
        cb.record_failure();
        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Closed);
        // The failure count starts over after a success.
        assert!(!cb.record_failure());
        assert!(!cb.record_failure());
        assert!(cb.record_failure());
        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[test]
    fn test_circuit_breaker_half_open_after_cooldown() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(1));
        cb.record_failure();
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
        std::thread::sleep(Duration::from_millis(5));
        assert_eq!(cb.state(), CircuitState::HalfOpen);
        // Only the first caller gets the probe slot.
        assert!(cb.allow_request());
        assert!(!cb.allow_request());
    }

    #[test]
    fn test_circuit_breaker_half_open_success_closes() {
        let cb = breaker_past_cooldown();
        assert!(cb.allow_request());
        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.allow_request());
    }

    #[test]
    fn test_circuit_breaker_half_open_failure_reopens() {
        let cb = breaker_past_cooldown();
        assert!(cb.allow_request());
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.allow_request());
    }

    // --- parse_retry_after ---

    #[test]
    fn test_parse_retry_after_valid() {
        for (raw, expected) in [("120", 120), ("1", 1), (" 30 ", 30)] {
            assert_eq!(parse_retry_after(raw), Some(expected));
        }
    }

    #[test]
    fn test_parse_retry_after_invalid() {
        for raw in ["0", "-1", "abc", ""] {
            assert_eq!(parse_retry_after(raw), None);
        }
    }

    #[test]
    fn test_parse_retry_after_clamped() {
        assert_eq!(parse_retry_after("86401"), None);
        assert_eq!(parse_retry_after("86400"), Some(86400));
    }
}