Skip to main content

runledger_runtime/
worker.rs

1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8    JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::WorkerError;
17use crate::config::JobsConfig;
18use crate::registry::JobRegistry;
19
/// Worker identity used when a claimed record carries no `worker_id`.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";

// Codes matched against `runledger_postgres` query errors and/or stored in
// `JobFailure::code`; comparisons are exact string equality.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";

/// Failure code for `JobFailure`s synthesised from a handler panic.
const HANDLER_PANIC_CODE: &str = "job.handler_panic";

/// Reason recorded when an unstarted claim is released because the `Running`
/// stage could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";

/// Retry delay (milliseconds) requested when releasing an unstarted claim.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;

/// Upper bound on a single dead-letter hook run. The test build uses a much
/// shorter bound (presumably so timeout-path tests finish quickly).
#[cfg(test)]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(100);
#[cfg(not(test))]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(10_000);
33
34pub async fn run_worker_loop(
35    pool: runledger_postgres::DbPool,
36    registry: JobRegistry,
37    config: JobsConfig,
38    mut shutdown: watch::Receiver<bool>,
39) {
40    let registry = Arc::new(registry);
41    let claimable_job_types = registry.registered_types();
42    let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
43    let mut join_set: JoinSet<()> = JoinSet::new();
44
45    loop {
46        drain_finished_tasks(&mut join_set).await;
47
48        if shutdown_requested_or_closed(&shutdown) {
49            break;
50        }
51
52        if claimable_job_types.is_empty() {
53            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
54                break;
55            }
56            continue;
57        }
58
59        let available = semaphore.available_permits();
60        if available == 0 {
61            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
62                break;
63            }
64            continue;
65        }
66
67        let claim_limit = min(available, config.claim_batch_size as usize);
68        let claimed = match jobs::claim_prestart_jobs_for_types(
69            &pool,
70            &config.worker_id,
71            config.lease_ttl_seconds,
72            claim_limit as i64,
73            &claimable_job_types,
74        )
75        .await
76        {
77            Ok(claimed) => claimed,
78            Err(error) => {
79                let error = WorkerError::ClaimJobs {
80                    worker_id: config.worker_id.clone(),
81                    source: error,
82                };
83                warn!(%error, "worker claim failed");
84                Vec::new()
85            }
86        };
87
88        if claimed.is_empty() {
89            wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
90            continue;
91        }
92
93        let claimed_len = claimed.len();
94        for job in claimed {
95            let permit = match Arc::clone(&semaphore).acquire_owned().await {
96                Ok(permit) => permit,
97                Err(_) => break,
98            };
99            let pool_clone = pool.clone();
100            let registry_clone = Arc::clone(&registry);
101            let lease_ttl_seconds = config.lease_ttl_seconds;
102            join_set.spawn(async move {
103                let _permit = permit;
104                process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
105            });
106        }
107
108        if claimed_len == claim_limit {
109            continue;
110        }
111
112        if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
113            break;
114        }
115    }
116
117    info!("worker shutdown requested; draining in-flight jobs");
118    while join_set.join_next().await.is_some() {}
119}
120
121async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
122    while let Some(result) = join_set.try_join_next() {
123        if let Err(error) = result {
124            error!(%error, "job task crashed");
125        }
126    }
127}
128
129fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
130    *shutdown.borrow() || shutdown.has_changed().is_err()
131}
132
133async fn wait_for_shutdown_or_poll(
134    shutdown: &mut watch::Receiver<bool>,
135    poll_interval: Duration,
136) -> bool {
137    tokio::select! {
138        changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
139        _ = sleep(poll_interval) => false,
140    }
141}
142
143async fn process_claimed_job(
144    pool: runledger_postgres::DbPool,
145    registry: Arc<JobRegistry>,
146    job: jobs::JobQueueRecord,
147    lease_ttl_seconds: i32,
148) {
149    let worker_id = job
150        .worker_id
151        .clone()
152        .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
153
154    let job_span = info_span!(
155        "job",
156        sentry.name = %job.job_type,
157        sentry.op = "runledger.job",
158        job_id = %job.id,
159        job_type = %job.job_type,
160        run_number = job.run_number,
161        attempt = job.attempt,
162        organization_id = ?job.organization_id,
163        worker_id = %worker_id,
164    );
165    async {
166        let start = Instant::now();
167        let context = JobContext {
168            job_id: job.id,
169            run_number: job.run_number,
170            attempt: job.attempt,
171            organization_id: job.organization_id,
172            worker_id: worker_id.clone(),
173        };
174
175        if !mark_job_running_or_abort(&pool, &context, &job).await {
176            return;
177        }
178
179        match execute_job_handler_with_heartbeats(
180            pool.clone(),
181            Arc::clone(&registry),
182            &context,
183            &job,
184            lease_ttl_seconds,
185        )
186        .await
187        {
188            Ok(progress) => {
189                let completion = JobProgressUpdate {
190                    stage: Some(progress.stage),
191                    progress_done: progress.progress_done,
192                    progress_total: progress.progress_total,
193                    checkpoint: progress.checkpoint.as_ref(),
194                };
195                if let Err(error) = jobs::complete_job_success(
196                    &pool,
197                    job.id,
198                    job.run_number,
199                    job.attempt,
200                    &context.worker_id,
201                    Some(&completion),
202                )
203                .await
204                {
205                    let release_conflict = is_workflow_release_conflict_error(&error);
206                    let error = WorkerError::CompleteSuccess {
207                        job_id: job.id,
208                        attempt: job.attempt,
209                        source: error,
210                    };
211                    if release_conflict {
212                        warn!(
213                            %error,
214                            job_id = %job.id,
215                            "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
216                        );
217                    } else {
218                        error!(%error, job_id = %job.id, "failed to mark job success");
219                    }
220                }
221            }
222            Err(failure) => {
223                if is_lease_maintenance_failure(&failure) {
224                    warn!(
225                        job_id = %job.id,
226                        attempt = job.attempt,
227                        failure_code = failure.code,
228                        "job processing aborted because durable lease maintenance was lost"
229                    );
230                    return;
231                }
232
233                let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
234                    None
235                } else {
236                    Some(compute_retry_delay_ms(job.attempt, job.id))
237                };
238                let failure_payload = JobFailureUpdate {
239                    kind: failure.kind,
240                    code: failure.code,
241                    message: failure.message.as_ref(),
242                    retry_delay_ms,
243                };
244                let dead_letter = dead_letter_info(&job, &failure);
245                if let Err(error) = jobs::complete_job_failure(
246                    &pool,
247                    job.id,
248                    job.run_number,
249                    job.attempt,
250                    &context.worker_id,
251                    &failure_payload,
252                )
253                .await
254                {
255                    let release_conflict = is_workflow_release_conflict_error(&error);
256                    let error = WorkerError::CompleteFailure {
257                        job_id: job.id,
258                        attempt: job.attempt,
259                        source: error,
260                    };
261                    if release_conflict {
262                        warn!(
263                            %error,
264                            job_id = %job.id,
265                            "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
266                        );
267                    } else {
268                        error!(%error, job_id = %job.id, "failed to mark job failure");
269                    }
270                } else if let Some(dead_letter) = dead_letter {
271                    warn!(
272                        job_id = %job.id,
273                        job_type = %job.job_type,
274                        run_number = job.run_number,
275                        attempt = job.attempt,
276                        max_attempts = job.max_attempts,
277                        organization_id = ?job.organization_id,
278                        worker_id = %context.worker_id,
279                        dead_letter_reason = ?dead_letter.reason,
280                        failure_kind = ?dead_letter.failure.kind,
281                        failure_code = dead_letter.failure.code,
282                        failure_message = %dead_letter.failure.message,
283                        "job dead lettered after handler failure"
284                    );
285                    notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
286                        .await;
287                }
288            }
289        }
290
291        info!(
292            job_id = %job.id,
293            attempt = job.attempt,
294            run_number = job.run_number,
295            elapsed_ms = start.elapsed().as_millis(),
296            "job processed"
297        );
298    }
299    .instrument(job_span)
300    .await;
301}
302
303async fn mark_job_running_or_abort(
304    pool: &runledger_postgres::DbPool,
305    context: &JobContext,
306    job: &jobs::JobQueueRecord,
307) -> bool {
308    let running_progress = JobProgressUpdate {
309        stage: Some(runledger_core::jobs::JobStage::Running),
310        progress_done: None,
311        progress_total: None,
312        checkpoint: None,
313    };
314
315    let Err(source) = jobs::update_job_progress(
316        pool,
317        job.id,
318        job.run_number,
319        job.attempt,
320        &context.worker_id,
321        &running_progress,
322    )
323    .await
324    else {
325        return true;
326    };
327
328    handle_running_progress_persist_failure(pool, context, job, source).await;
329    false
330}
331
332async fn handle_running_progress_persist_failure(
333    pool: &runledger_postgres::DbPool,
334    context: &JobContext,
335    job: &jobs::JobQueueRecord,
336    source: runledger_postgres::Error,
337) {
338    let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
339    let error = WorkerError::SetRunningProgress {
340        job_id: job.id,
341        attempt: job.attempt,
342        source,
343    };
344
345    if lease_owner_mismatch {
346        warn!(
347            %error,
348            job_id = %job.id,
349            attempt = job.attempt,
350            "aborting job before execution because lease ownership was already lost"
351        );
352        return;
353    }
354
355    match jobs::release_unstarted_job_claim(
356        pool,
357        job.id,
358        job.run_number,
359        job.attempt,
360        &context.worker_id,
361        RUNNING_PROGRESS_PERSIST_FAILED_REASON,
362        UNSTARTED_CLAIM_RETRY_DELAY_MS,
363    )
364    .await
365    {
366        Ok(()) => {
367            warn!(
368                %error,
369                job_id = %job.id,
370                attempt = job.attempt,
371                "running progress could not be persisted; released unstarted claim back to pending"
372            );
373        }
374        Err(release_error) => {
375            let no_longer_releasable =
376                is_unstarted_claim_release_not_applicable_error(&release_error);
377            let release_error = WorkerError::ReleaseUnstartedClaim {
378                job_id: job.id,
379                attempt: job.attempt,
380                source: release_error,
381            };
382            if no_longer_releasable {
383                warn!(
384                    %error,
385                    %release_error,
386                    job_id = %job.id,
387                    attempt = job.attempt,
388                    "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
389                );
390                return;
391            }
392
393            warn!(
394                %error,
395                %release_error,
396                job_id = %job.id,
397                attempt = job.attempt,
398                "running progress could not be persisted; leaving claim for reaper recovery"
399            );
400        }
401    }
402}
403
404async fn execute_job_handler(
405    registry: Arc<JobRegistry>,
406    context: &JobContext,
407    job: &jobs::JobQueueRecord,
408) -> Result<JobProgress, JobFailure> {
409    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
410        return Err(JobFailure::terminal(
411            "job.handler_not_registered",
412            "No handler is registered for this job type.",
413        ));
414    };
415
416    handler
417        .execute(context.clone(), job.payload.clone())
418        .await?;
419
420    Ok(JobProgress {
421        stage: runledger_core::jobs::JobStage::Completed,
422        progress_done: None,
423        progress_total: None,
424        checkpoint: None,
425    })
426}
427
428async fn execute_job_handler_with_heartbeats(
429    pool: runledger_postgres::DbPool,
430    registry: Arc<JobRegistry>,
431    context: &JobContext,
432    job: &jobs::JobQueueRecord,
433    lease_ttl_seconds: i32,
434) -> Result<JobProgress, JobFailure> {
435    let mut execution =
436        Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
437    let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
438    let mut timeout = Box::pin(sleep_until(timeout_deadline));
439
440    let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
441    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
442    ticker.tick().await;
443
444    loop {
445        tokio::select! {
446            result = &mut execution => {
447                return match result {
448                    Ok(result) => result,
449                    Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
450                };
451            }
452            _ = &mut timeout => {
453                return Err(JobFailure::timeout(
454                    "job.timeout_exceeded",
455                    "Job exceeded the configured timeout.",
456                ));
457            }
458            _ = ticker.tick() => {
459                if let Err(error) = jobs::heartbeat_job(
460                    &pool,
461                    job.id,
462                    job.run_number,
463                    job.attempt,
464                    &context.worker_id,
465                    lease_ttl_seconds,
466                )
467                .await
468                {
469                    let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
470                    let error = WorkerError::Heartbeat {
471                        job_id: job.id,
472                        attempt: job.attempt,
473                        source: error,
474                    };
475
476                    if lease_owner_mismatch {
477                        warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
478                        return Err(lease_owner_mismatch_failure());
479                    }
480
481                    warn!(
482                        %error,
483                        job_id = %job.id,
484                        "aborting job because lease heartbeat could not be persisted"
485                    );
486                    return Err(lease_maintenance_failure());
487                }
488            }
489        }
490    }
491}
492
493fn lease_owner_mismatch_failure() -> JobFailure {
494    JobFailure::lease_expired(
495        LEASE_OWNER_MISMATCH_CODE,
496        "Job lease ownership was lost during processing.",
497    )
498}
499
500fn lease_maintenance_failure() -> JobFailure {
501    JobFailure::lease_expired(
502        LEASE_MAINTENANCE_FAILED_CODE,
503        "Job lease could not be durably maintained during processing.",
504    )
505}
506
507fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
508    JobFailure::panicked(
509        HANDLER_PANIC_CODE,
510        format!(
511            "Job handler panicked: {}",
512            panic_payload_message(&*panic_payload)
513        ),
514    )
515}
516
/// Extracts a readable message from a panic payload.
///
/// `panic!` payloads are either an owned `String` (formatted panics) or a
/// `&'static str` (literal panics). Any other payload type gets a fixed
/// placeholder.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|text| (*text).to_string())
        })
        .unwrap_or_else(|| "non-string panic payload".to_string())
}
528
529fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
530    matches!(
531        error,
532        runledger_postgres::Error::QueryError(query_error)
533            if query_error.code() == expected_code
534    )
535}
536
537fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
538    has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
539}
540
541fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
542    has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
543}
544
545fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
546    has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
547}
548
549fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
550    matches!(
551        failure.code,
552        LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
553    )
554}
555
/// Interval between lease heartbeats: one third of the lease TTL.
///
/// Renewing at a third of the TTL lets a delayed heartbeat still be followed
/// by another renewal before the lease expires. The division is done in
/// milliseconds so that every TTL gets a true one-third interval. Whole-second
/// division would round 1 s and 2 s TTLs to a 1 s interval, i.e. 100% and 50%
/// of the lease, which lets a 1 s lease lapse before it is renewed.
/// Non-positive TTLs are treated as 1 s.
///
/// The result is never zero (minimum 333 ms); `tokio::time::interval` panics
/// on a zero period.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    // `max(1)` guarantees a positive value, so widening to u64 is lossless,
    // and `i32::MAX * 1000` still fits comfortably in u64.
    let lease_ttl_ms = lease_ttl_seconds.max(1) as u64 * 1_000;
    Duration::from_millis(lease_ttl_ms / 3)
}
562
563fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
564    matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
565}
566
567fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
568    let reason = if is_non_retryable_failure_kind(failure.kind) {
569        Some(JobDeadLetterReason::FailureKindNonRetryable)
570    } else if job.attempt >= job.max_attempts {
571        Some(JobDeadLetterReason::AttemptsExhausted)
572    } else {
573        None
574    }?;
575
576    Some(JobDeadLetterInfo::new(
577        failure.clone(),
578        reason,
579        Some(job.max_attempts),
580    ))
581}
582
583async fn notify_handler_of_dead_letter(
584    registry: &JobRegistry,
585    context: &JobContext,
586    job: &jobs::JobQueueRecord,
587    dead_letter: JobDeadLetterInfo,
588) {
589    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
590        return;
591    };
592    let context = context.clone();
593    let payload = job.payload.clone();
594
595    let hook_task = tokio::spawn(async move {
596        tokio::time::timeout(
597            TERMINAL_HOOK_TIMEOUT,
598            handler.on_dead_letter(context, payload, dead_letter),
599        )
600        .await
601        .is_ok()
602    });
603    match hook_task.await {
604        Ok(true) => {}
605        Ok(false) => {
606            warn!(
607                job_id = %job.id,
608                job_type = %job.job_type,
609                run_number = job.run_number,
610                attempt = job.attempt,
611                timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
612                "dead-letter hook timed out; continuing worker job task"
613            );
614        }
615        Err(error) => log_dead_letter_hook_join_error(job, error),
616    }
617}
618
619fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
620    if error.is_panic() {
621        warn!(
622            job_id = %job.id,
623            job_type = %job.job_type,
624            run_number = job.run_number,
625            attempt = job.attempt,
626            error = %error,
627            "dead-letter hook panicked; continuing worker job task"
628        );
629    } else if error.is_cancelled() {
630        warn!(
631            job_id = %job.id,
632            job_type = %job.job_type,
633            run_number = job.run_number,
634            attempt = job.attempt,
635            error = %error,
636            "dead-letter hook was cancelled; continuing worker job task"
637        );
638    } else {
639        warn!(
640            job_id = %job.id,
641            job_type = %job.job_type,
642            run_number = job.run_number,
643            attempt = job.attempt,
644            error = %error,
645            "dead-letter hook join failed; continuing worker job task"
646        );
647    }
648}
649
650fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
651    let exp = attempt.clamp(1, 10) as u32;
652    let base_ms: i64 = 5_000;
653    let raw = base_ms * (1_i64 << exp);
654    let capped = raw.min(300_000);
655    let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
656    (capped + jitter).max(1_000) as i32
657}
658
// Unit tests for this module live in a separate `tests` module file and are
// compiled only for test builds.
#[cfg(test)]
mod tests;