Skip to main content

runledger_runtime/
worker.rs

1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8    JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::WorkerError;
17use crate::config::JobsConfig;
18use crate::registry::JobRegistry;
19
/// Worker id used for tracing and the job context when a claimed record carries no worker id.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";

// Codes compared against query-error codes and job-failure codes in this module.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
const HANDLER_PANIC_CODE: &str = "job.handler_panic";
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";

/// Reason passed along when an unstarted claim is released because the initial
/// running-progress update could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Retry delay, in milliseconds, passed along when an unstarted claim is released.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;

/// Upper bound on how long a dead-letter hook may run; shortened in test builds.
const TERMINAL_HOOK_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(10)
};
33
34pub async fn run_worker_loop(
35    pool: runledger_postgres::DbPool,
36    registry: JobRegistry,
37    config: JobsConfig,
38    mut shutdown: watch::Receiver<bool>,
39) {
40    let registry = Arc::new(registry);
41    let claimable_job_types = registry.registered_types();
42    let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
43    let mut join_set: JoinSet<()> = JoinSet::new();
44
45    loop {
46        drain_finished_tasks(&mut join_set).await;
47
48        if shutdown_requested_or_closed(&shutdown) {
49            break;
50        }
51
52        if claimable_job_types.is_empty() {
53            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
54                break;
55            }
56            continue;
57        }
58
59        let available = semaphore.available_permits();
60        if available == 0 {
61            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
62                break;
63            }
64            continue;
65        }
66
67        let claim_limit = min(available, config.claim_batch_size as usize);
68        let claimed = match jobs::claim_prestart_jobs_for_types(
69            &pool,
70            &config.worker_id,
71            config.lease_ttl_seconds,
72            claim_limit as i64,
73            &claimable_job_types,
74        )
75        .await
76        {
77            Ok(claimed) => claimed,
78            Err(error) => {
79                let error = WorkerError::ClaimJobs {
80                    worker_id: config.worker_id.clone(),
81                    source: error,
82                };
83                warn!(%error, "worker claim failed");
84                Vec::new()
85            }
86        };
87
88        if claimed.is_empty() {
89            wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
90            continue;
91        }
92
93        let claimed_len = claimed.len();
94        for job in claimed {
95            let permit = match Arc::clone(&semaphore).acquire_owned().await {
96                Ok(permit) => permit,
97                Err(_) => break,
98            };
99            let pool_clone = pool.clone();
100            let registry_clone = Arc::clone(&registry);
101            let lease_ttl_seconds = config.lease_ttl_seconds;
102            join_set.spawn(async move {
103                let _permit = permit;
104                process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
105            });
106        }
107
108        if claimed_len == claim_limit {
109            continue;
110        }
111
112        if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
113            break;
114        }
115    }
116
117    info!("worker shutdown requested; draining in-flight jobs");
118    while join_set.join_next().await.is_some() {}
119}
120
121async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
122    while let Some(result) = join_set.try_join_next() {
123        if let Err(error) = result {
124            error!(%error, "job task crashed");
125        }
126    }
127}
128
129fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
130    *shutdown.borrow() || shutdown.has_changed().is_err()
131}
132
133async fn wait_for_shutdown_or_poll(
134    shutdown: &mut watch::Receiver<bool>,
135    poll_interval: Duration,
136) -> bool {
137    tokio::select! {
138        changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
139        _ = sleep(poll_interval) => false,
140    }
141}
142
143async fn process_claimed_job(
144    pool: runledger_postgres::DbPool,
145    registry: Arc<JobRegistry>,
146    job: jobs::JobQueueRecord,
147    lease_ttl_seconds: i32,
148) {
149    let worker_id = job
150        .worker_id
151        .clone()
152        .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
153
154    let job_span = info_span!(
155        "job",
156        sentry.name = %job.job_type,
157        sentry.op = "runledger.job",
158        job_id = %job.id,
159        job_type = %job.job_type,
160        run_number = job.run_number,
161        attempt = job.attempt,
162        organization_id = ?job.organization_id,
163        worker_id = %worker_id,
164    );
165    async {
166        let start = Instant::now();
167        let context = JobContext {
168            job_id: job.id,
169            run_number: job.run_number,
170            attempt: job.attempt,
171            organization_id: job.organization_id,
172            worker_id: worker_id.clone(),
173        };
174
175        if !mark_job_running_or_abort(&pool, &context, &job).await {
176            return;
177        }
178
179        match execute_job_handler_with_heartbeats(
180            pool.clone(),
181            Arc::clone(&registry),
182            &context,
183            &job,
184            lease_ttl_seconds,
185        )
186        .await
187        {
188            Ok(progress) => {
189                let completion = JobProgressUpdate {
190                    stage: Some(progress.stage),
191                    progress_done: progress.progress_done,
192                    progress_total: progress.progress_total,
193                    checkpoint: progress.checkpoint.as_ref(),
194                };
195                if let Err(error) = jobs::complete_job_success(
196                    &pool,
197                    job.id,
198                    job.run_number,
199                    job.attempt,
200                    &context.worker_id,
201                    Some(&completion),
202                )
203                .await
204                {
205                    let release_conflict = is_workflow_release_conflict_error(&error);
206                    let error = WorkerError::CompleteSuccess {
207                        job_id: job.id,
208                        attempt: job.attempt,
209                        source: error,
210                    };
211                    if release_conflict {
212                        warn!(
213                            %error,
214                            job_id = %job.id,
215                            "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
216                        );
217                    } else {
218                        error!(%error, job_id = %job.id, "failed to mark job success");
219                    }
220                }
221            }
222            Err(failure) => {
223                if is_lease_maintenance_failure(&failure) {
224                    warn!(
225                        job_id = %job.id,
226                        attempt = job.attempt,
227                        failure_code = failure.code,
228                        "job processing aborted because durable lease maintenance was lost"
229                    );
230                    return;
231                }
232
233                let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
234                    None
235                } else {
236                    Some(retry_delay_ms_for_failure(
237                        registry.as_ref(),
238                        &job,
239                        &failure,
240                    ))
241                };
242                let failure_payload = JobFailureUpdate {
243                    kind: failure.kind,
244                    code: failure.code,
245                    message: failure.message.as_ref(),
246                    retry_delay_ms,
247                };
248                let dead_letter = dead_letter_info(&job, &failure);
249                if let Err(error) = jobs::complete_job_failure(
250                    &pool,
251                    job.id,
252                    job.run_number,
253                    job.attempt,
254                    &context.worker_id,
255                    &failure_payload,
256                )
257                .await
258                {
259                    let release_conflict = is_workflow_release_conflict_error(&error);
260                    let error = WorkerError::CompleteFailure {
261                        job_id: job.id,
262                        attempt: job.attempt,
263                        source: error,
264                    };
265                    if release_conflict {
266                        warn!(
267                            %error,
268                            job_id = %job.id,
269                            "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
270                        );
271                    } else {
272                        error!(%error, job_id = %job.id, "failed to mark job failure");
273                    }
274                } else if let Some(dead_letter) = dead_letter {
275                    warn!(
276                        job_id = %job.id,
277                        job_type = %job.job_type,
278                        run_number = job.run_number,
279                        attempt = job.attempt,
280                        max_attempts = job.max_attempts,
281                        organization_id = ?job.organization_id,
282                        worker_id = %context.worker_id,
283                        dead_letter_reason = ?dead_letter.reason,
284                        failure_kind = ?dead_letter.failure.kind,
285                        failure_code = dead_letter.failure.code,
286                        failure_message = %dead_letter.failure.message,
287                        "job dead lettered after handler failure"
288                    );
289                    notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
290                        .await;
291                }
292            }
293        }
294
295        info!(
296            job_id = %job.id,
297            attempt = job.attempt,
298            run_number = job.run_number,
299            elapsed_ms = start.elapsed().as_millis(),
300            "job processed"
301        );
302    }
303    .instrument(job_span)
304    .await;
305}
306
307async fn mark_job_running_or_abort(
308    pool: &runledger_postgres::DbPool,
309    context: &JobContext,
310    job: &jobs::JobQueueRecord,
311) -> bool {
312    let running_progress = JobProgressUpdate {
313        stage: Some(runledger_core::jobs::JobStage::Running),
314        progress_done: None,
315        progress_total: None,
316        checkpoint: None,
317    };
318
319    let Err(source) = jobs::update_job_progress(
320        pool,
321        job.id,
322        job.run_number,
323        job.attempt,
324        &context.worker_id,
325        &running_progress,
326    )
327    .await
328    else {
329        return true;
330    };
331
332    handle_running_progress_persist_failure(pool, context, job, source).await;
333    false
334}
335
336async fn handle_running_progress_persist_failure(
337    pool: &runledger_postgres::DbPool,
338    context: &JobContext,
339    job: &jobs::JobQueueRecord,
340    source: runledger_postgres::Error,
341) {
342    let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
343    let error = WorkerError::SetRunningProgress {
344        job_id: job.id,
345        attempt: job.attempt,
346        source,
347    };
348
349    if lease_owner_mismatch {
350        warn!(
351            %error,
352            job_id = %job.id,
353            attempt = job.attempt,
354            "aborting job before execution because lease ownership was already lost"
355        );
356        return;
357    }
358
359    match jobs::release_unstarted_job_claim(
360        pool,
361        job.id,
362        job.run_number,
363        job.attempt,
364        &context.worker_id,
365        RUNNING_PROGRESS_PERSIST_FAILED_REASON,
366        UNSTARTED_CLAIM_RETRY_DELAY_MS,
367    )
368    .await
369    {
370        Ok(()) => {
371            warn!(
372                %error,
373                job_id = %job.id,
374                attempt = job.attempt,
375                "running progress could not be persisted; released unstarted claim back to pending"
376            );
377        }
378        Err(release_error) => {
379            let no_longer_releasable =
380                is_unstarted_claim_release_not_applicable_error(&release_error);
381            let release_error = WorkerError::ReleaseUnstartedClaim {
382                job_id: job.id,
383                attempt: job.attempt,
384                source: release_error,
385            };
386            if no_longer_releasable {
387                warn!(
388                    %error,
389                    %release_error,
390                    job_id = %job.id,
391                    attempt = job.attempt,
392                    "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
393                );
394                return;
395            }
396
397            warn!(
398                %error,
399                %release_error,
400                job_id = %job.id,
401                attempt = job.attempt,
402                "running progress could not be persisted; leaving claim for reaper recovery"
403            );
404        }
405    }
406}
407
408async fn execute_job_handler(
409    registry: Arc<JobRegistry>,
410    context: &JobContext,
411    job: &jobs::JobQueueRecord,
412) -> Result<JobProgress, JobFailure> {
413    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
414        return Err(JobFailure::terminal(
415            "job.handler_not_registered",
416            "No handler is registered for this job type.",
417        ));
418    };
419
420    handler
421        .execute(context.clone(), job.payload.clone())
422        .await?;
423
424    Ok(JobProgress {
425        stage: runledger_core::jobs::JobStage::Completed,
426        progress_done: None,
427        progress_total: None,
428        checkpoint: None,
429    })
430}
431
432async fn execute_job_handler_with_heartbeats(
433    pool: runledger_postgres::DbPool,
434    registry: Arc<JobRegistry>,
435    context: &JobContext,
436    job: &jobs::JobQueueRecord,
437    lease_ttl_seconds: i32,
438) -> Result<JobProgress, JobFailure> {
439    let mut execution =
440        Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
441    let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
442    let mut timeout = Box::pin(sleep_until(timeout_deadline));
443
444    let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
445    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
446    ticker.tick().await;
447
448    loop {
449        tokio::select! {
450            result = &mut execution => {
451                return match result {
452                    Ok(result) => result,
453                    Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
454                };
455            }
456            _ = &mut timeout => {
457                return Err(JobFailure::timeout(
458                    "job.timeout_exceeded",
459                    "Job exceeded the configured timeout.",
460                ));
461            }
462            _ = ticker.tick() => {
463                if let Err(error) = jobs::heartbeat_job(
464                    &pool,
465                    job.id,
466                    job.run_number,
467                    job.attempt,
468                    &context.worker_id,
469                    lease_ttl_seconds,
470                )
471                .await
472                {
473                    let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
474                    let error = WorkerError::Heartbeat {
475                        job_id: job.id,
476                        attempt: job.attempt,
477                        source: error,
478                    };
479
480                    if lease_owner_mismatch {
481                        warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
482                        return Err(lease_owner_mismatch_failure());
483                    }
484
485                    warn!(
486                        %error,
487                        job_id = %job.id,
488                        "aborting job because lease heartbeat could not be persisted"
489                    );
490                    return Err(lease_maintenance_failure());
491                }
492            }
493        }
494    }
495}
496
497fn lease_owner_mismatch_failure() -> JobFailure {
498    JobFailure::lease_expired(
499        LEASE_OWNER_MISMATCH_CODE,
500        "Job lease ownership was lost during processing.",
501    )
502}
503
504fn lease_maintenance_failure() -> JobFailure {
505    JobFailure::lease_expired(
506        LEASE_MAINTENANCE_FAILED_CODE,
507        "Job lease could not be durably maintained during processing.",
508    )
509}
510
511fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
512    JobFailure::panicked(
513        HANDLER_PANIC_CODE,
514        format!(
515            "Job handler panicked: {}",
516            panic_payload_message(&*panic_payload)
517        ),
518    )
519}
520
/// Extracts a readable message from a panic payload.
///
/// `String` and `&'static str` payloads are returned as text; any other payload type yields
/// the fixed text `"non-string panic payload"`.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|text| (*text).to_string())
        })
        .unwrap_or_else(|| "non-string panic payload".to_string())
}
532
533fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
534    matches!(
535        error,
536        runledger_postgres::Error::QueryError(query_error)
537            if query_error.code() == expected_code
538    )
539}
540
541fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
542    has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
543}
544
545fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
546    has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
547}
548
549fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
550    has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
551}
552
553fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
554    matches!(
555        failure.code,
556        LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
557    )
558}
559
/// Computes how often the lease heartbeat should fire for a given lease TTL.
///
/// The lease is renewed at one-third of the TTL so a delayed heartbeat still leaves time for
/// subsequent renewals before the lease expires. Non-positive TTLs are treated as one second.
///
/// For TTLs of three seconds or more the result is a whole number of seconds
/// (`ttl / 3`, rounded down). For TTLs of one or two seconds, whole-second arithmetic would
/// round the interval up to a full second — equal to, or half of, the TTL — so the interval
/// is computed in milliseconds instead to keep it at one-third of the TTL.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    let ttl_seconds = u64::from(lease_ttl_seconds.max(1).unsigned_abs());
    match ttl_seconds / 3 {
        // Sub-second case: only reachable for a TTL of 1 or 2 seconds; never zero.
        0 => Duration::from_millis(ttl_seconds * 1_000 / 3),
        whole_seconds => Duration::from_secs(whole_seconds),
    }
}
566
567fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
568    matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
569}
570
571fn retry_delay_ms_for_failure(
572    registry: &JobRegistry,
573    job: &jobs::JobQueueRecord,
574    failure: &JobFailure,
575) -> i32 {
576    registry
577        .retry_delay_override(job.job_type.as_borrowed(), failure.code)
578        .unwrap_or_else(|| compute_retry_delay_ms(job.attempt, job.id))
579}
580
581fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
582    let reason = if is_non_retryable_failure_kind(failure.kind) {
583        Some(JobDeadLetterReason::FailureKindNonRetryable)
584    } else if job.attempt >= job.max_attempts {
585        Some(JobDeadLetterReason::AttemptsExhausted)
586    } else {
587        None
588    }?;
589
590    Some(JobDeadLetterInfo::new(
591        failure.clone(),
592        reason,
593        Some(job.max_attempts),
594    ))
595}
596
597async fn notify_handler_of_dead_letter(
598    registry: &JobRegistry,
599    context: &JobContext,
600    job: &jobs::JobQueueRecord,
601    dead_letter: JobDeadLetterInfo,
602) {
603    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
604        return;
605    };
606    let context = context.clone();
607    let payload = job.payload.clone();
608
609    let hook_task = tokio::spawn(async move {
610        tokio::time::timeout(
611            TERMINAL_HOOK_TIMEOUT,
612            handler.on_dead_letter(context, payload, dead_letter),
613        )
614        .await
615        .is_ok()
616    });
617    match hook_task.await {
618        Ok(true) => {}
619        Ok(false) => {
620            warn!(
621                job_id = %job.id,
622                job_type = %job.job_type,
623                run_number = job.run_number,
624                attempt = job.attempt,
625                timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
626                "dead-letter hook timed out; continuing worker job task"
627            );
628        }
629        Err(error) => log_dead_letter_hook_join_error(job, error),
630    }
631}
632
633fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
634    if error.is_panic() {
635        warn!(
636            job_id = %job.id,
637            job_type = %job.job_type,
638            run_number = job.run_number,
639            attempt = job.attempt,
640            error = %error,
641            "dead-letter hook panicked; continuing worker job task"
642        );
643    } else if error.is_cancelled() {
644        warn!(
645            job_id = %job.id,
646            job_type = %job.job_type,
647            run_number = job.run_number,
648            attempt = job.attempt,
649            error = %error,
650            "dead-letter hook was cancelled; continuing worker job task"
651        );
652    } else {
653        warn!(
654            job_id = %job.id,
655            job_type = %job.job_type,
656            run_number = job.run_number,
657            attempt = job.attempt,
658            error = %error,
659            "dead-letter hook join failed; continuing worker job task"
660        );
661    }
662}
663
664fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
665    let exp = attempt.clamp(1, 10) as u32;
666    let base_ms: i64 = 5_000;
667    let raw = base_ms * (1_i64 << exp);
668    let capped = raw.min(300_000);
669    let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
670    (capped + jitter).max(1_000) as i32
671}
672
673#[cfg(test)]
674mod tests;