Skip to main content

runledger_runtime/
worker.rs

1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8    JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::RuntimeLoopExit;
17use crate::WorkerError;
18use crate::config::JobsConfig;
19use crate::registry::JobRegistry;
20
/// Worker id used when a claimed job record carries no worker id of its own.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";
// Kept stable for clients that already match this code; it also covers leases
// that expired before the worker's lifecycle update reached storage.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
/// Failure code used when a heartbeat could not be persisted for any reason
/// other than a lease-owner mismatch.
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
/// Storage error code that is logged as a conflict with workflow cancellation.
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
/// Failure code recorded when a job handler panics.
const HANDLER_PANIC_CODE: &str = "job.handler_panic";
/// Reason passed to storage when releasing a claim whose running progress
/// could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Storage error code meaning the unstarted-claim release no longer applies.
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";
/// Retry delay, in milliseconds, applied when an unstarted claim is released.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;
/// Upper bound on how long a dead-letter hook may run; shortened in test
/// builds so timeout paths stay fast.
const TERMINAL_HOOK_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(10)
};
36
37pub async fn run_worker_loop(
38    pool: runledger_postgres::DbPool,
39    registry: JobRegistry,
40    config: JobsConfig,
41    mut shutdown: watch::Receiver<bool>,
42) -> RuntimeLoopExit {
43    let registry = Arc::new(registry);
44    let claimable_job_types = registry.registered_types();
45    let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
46    let mut join_set: JoinSet<()> = JoinSet::new();
47
48    loop {
49        drain_finished_tasks(&mut join_set).await;
50
51        if shutdown_requested_or_closed(&shutdown) {
52            return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
53        }
54
55        if claimable_job_types.is_empty() {
56            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
57                return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
58            }
59            continue;
60        }
61
62        let available = semaphore.available_permits();
63        if available == 0 {
64            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
65                return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
66            }
67            continue;
68        }
69
70        let claim_limit = min(available, config.claim_batch_size as usize);
71        let claimed = match jobs::claim_prestart_jobs_for_types(
72            &pool,
73            &config.worker_id,
74            config.lease_ttl_seconds,
75            claim_limit as i64,
76            &claimable_job_types,
77        )
78        .await
79        {
80            Ok(claimed) => claimed,
81            Err(error) => {
82                let error = WorkerError::ClaimJobs {
83                    worker_id: config.worker_id.clone(),
84                    source: error,
85                };
86                warn!(%error, "worker claim failed");
87                Vec::new()
88            }
89        };
90
91        if claimed.is_empty() {
92            wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
93            continue;
94        }
95
96        let claimed_len = claimed.len();
97        for job in claimed {
98            let permit = match Arc::clone(&semaphore).acquire_owned().await {
99                Ok(permit) => permit,
100                Err(_) => {
101                    // The worker owns this semaphore and never closes it. If
102                    // this defensive branch fires, surface it as an unexpected
103                    // loop completion rather than graceful shutdown.
104                    warn!("worker semaphore closed; stopping worker loop");
105                    return drain_in_flight_jobs(join_set, RuntimeLoopExit::Completed).await;
106                }
107            };
108            let pool_clone = pool.clone();
109            let registry_clone = Arc::clone(&registry);
110            let lease_ttl_seconds = config.lease_ttl_seconds;
111            join_set.spawn(async move {
112                let _permit = permit;
113                process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
114            });
115        }
116
117        if claimed_len == claim_limit {
118            continue;
119        }
120
121        if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
122            return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
123        }
124    }
125}
126
127async fn drain_in_flight_jobs(mut join_set: JoinSet<()>, exit: RuntimeLoopExit) -> RuntimeLoopExit {
128    if !join_set.is_empty() {
129        match exit {
130            RuntimeLoopExit::Shutdown => {
131                info!("worker shutdown requested; draining in-flight jobs")
132            }
133            RuntimeLoopExit::Completed => {
134                warn!("worker loop completed before shutdown; draining in-flight jobs");
135            }
136        }
137    }
138    while join_set.join_next().await.is_some() {}
139    exit
140}
141
142async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
143    while let Some(result) = join_set.try_join_next() {
144        if let Err(error) = result {
145            error!(%error, "job task crashed");
146        }
147    }
148}
149
150fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
151    *shutdown.borrow() || shutdown.has_changed().is_err()
152}
153
154async fn wait_for_shutdown_or_poll(
155    shutdown: &mut watch::Receiver<bool>,
156    poll_interval: Duration,
157) -> bool {
158    tokio::select! {
159        changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
160        _ = sleep(poll_interval) => false,
161    }
162}
163
164async fn process_claimed_job(
165    pool: runledger_postgres::DbPool,
166    registry: Arc<JobRegistry>,
167    job: jobs::JobQueueRecord,
168    lease_ttl_seconds: i32,
169) {
170    let worker_id = job
171        .worker_id
172        .clone()
173        .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
174
175    let job_span = info_span!(
176        "job",
177        sentry.name = %job.job_type,
178        sentry.op = "runledger.job",
179        job_id = %job.id,
180        job_type = %job.job_type,
181        run_number = job.run_number,
182        attempt = job.attempt,
183        organization_id = ?job.organization_id,
184        worker_id = %worker_id,
185    );
186    async {
187        let start = Instant::now();
188        let context = JobContext {
189            job_id: job.id,
190            run_number: job.run_number,
191            attempt: job.attempt,
192            organization_id: job.organization_id,
193            worker_id: worker_id.clone(),
194        };
195
196        if !mark_job_running_or_abort(&pool, &context, &job).await {
197            return;
198        }
199
200        match execute_job_handler_with_heartbeats(
201            pool.clone(),
202            Arc::clone(&registry),
203            &context,
204            &job,
205            lease_ttl_seconds,
206        )
207        .await
208        {
209            Ok(progress) => {
210                let completion = JobProgressUpdate {
211                    stage: Some(progress.stage),
212                    progress_done: progress.progress_done,
213                    progress_total: progress.progress_total,
214                    checkpoint: progress.checkpoint.as_ref(),
215                };
216                if let Err(error) = jobs::complete_job_success(
217                    &pool,
218                    job.id,
219                    job.run_number,
220                    job.attempt,
221                    &context.worker_id,
222                    Some(&completion),
223                )
224                .await
225                {
226                    let release_conflict = is_workflow_release_conflict_error(&error);
227                    let error = WorkerError::CompleteSuccess {
228                        job_id: job.id,
229                        attempt: job.attempt,
230                        source: error,
231                    };
232                    if release_conflict {
233                        warn!(
234                            %error,
235                            job_id = %job.id,
236                            "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
237                        );
238                    } else {
239                        error!(%error, job_id = %job.id, "failed to mark job success");
240                    }
241                }
242            }
243            Err(failure) => {
244                if is_lease_maintenance_failure(&failure) {
245                    warn!(
246                        job_id = %job.id,
247                        attempt = job.attempt,
248                        failure_code = failure.code,
249                        "job processing aborted because durable lease maintenance was lost"
250                    );
251                    return;
252                }
253
254                let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
255                    None
256                } else {
257                    Some(retry_delay_ms_for_failure(
258                        registry.as_ref(),
259                        &job,
260                        &failure,
261                    ))
262                };
263                let failure_payload = JobFailureUpdate {
264                    kind: failure.kind,
265                    code: failure.code,
266                    message: failure.message.as_ref(),
267                    retry_delay_ms,
268                };
269                let dead_letter = dead_letter_info(&job, &failure);
270                if let Err(error) = jobs::complete_job_failure(
271                    &pool,
272                    job.id,
273                    job.run_number,
274                    job.attempt,
275                    &context.worker_id,
276                    &failure_payload,
277                )
278                .await
279                {
280                    let release_conflict = is_workflow_release_conflict_error(&error);
281                    let error = WorkerError::CompleteFailure {
282                        job_id: job.id,
283                        attempt: job.attempt,
284                        source: error,
285                    };
286                    if release_conflict {
287                        warn!(
288                            %error,
289                            job_id = %job.id,
290                            "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
291                        );
292                    } else {
293                        error!(%error, job_id = %job.id, "failed to mark job failure");
294                    }
295                } else if let Some(dead_letter) = dead_letter {
296                    warn!(
297                        job_id = %job.id,
298                        job_type = %job.job_type,
299                        run_number = job.run_number,
300                        attempt = job.attempt,
301                        max_attempts = job.max_attempts,
302                        organization_id = ?job.organization_id,
303                        worker_id = %context.worker_id,
304                        dead_letter_reason = ?dead_letter.reason,
305                        failure_kind = ?dead_letter.failure.kind,
306                        failure_code = dead_letter.failure.code,
307                        failure_message = %dead_letter.failure.message,
308                        "job dead lettered after handler failure"
309                    );
310                    notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
311                        .await;
312                }
313            }
314        }
315
316        info!(
317            job_id = %job.id,
318            attempt = job.attempt,
319            run_number = job.run_number,
320            elapsed_ms = start.elapsed().as_millis(),
321            "job processed"
322        );
323    }
324    .instrument(job_span)
325    .await;
326}
327
328async fn mark_job_running_or_abort(
329    pool: &runledger_postgres::DbPool,
330    context: &JobContext,
331    job: &jobs::JobQueueRecord,
332) -> bool {
333    let running_progress = JobProgressUpdate {
334        stage: Some(runledger_core::jobs::JobStage::Running),
335        progress_done: None,
336        progress_total: None,
337        checkpoint: None,
338    };
339
340    let Err(source) = jobs::update_job_progress(
341        pool,
342        job.id,
343        job.run_number,
344        job.attempt,
345        &context.worker_id,
346        &running_progress,
347    )
348    .await
349    else {
350        return true;
351    };
352
353    handle_running_progress_persist_failure(pool, context, job, source).await;
354    false
355}
356
357async fn handle_running_progress_persist_failure(
358    pool: &runledger_postgres::DbPool,
359    context: &JobContext,
360    job: &jobs::JobQueueRecord,
361    source: runledger_postgres::Error,
362) {
363    let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
364    let error = WorkerError::SetRunningProgress {
365        job_id: job.id,
366        attempt: job.attempt,
367        source,
368    };
369
370    if lease_owner_mismatch {
371        warn!(
372            %error,
373            job_id = %job.id,
374            attempt = job.attempt,
375            "aborting job before execution because lease ownership was already lost"
376        );
377        return;
378    }
379
380    match jobs::release_unstarted_job_claim(
381        pool,
382        job.id,
383        job.run_number,
384        job.attempt,
385        &context.worker_id,
386        RUNNING_PROGRESS_PERSIST_FAILED_REASON,
387        UNSTARTED_CLAIM_RETRY_DELAY_MS,
388    )
389    .await
390    {
391        Ok(()) => {
392            warn!(
393                %error,
394                job_id = %job.id,
395                attempt = job.attempt,
396                "running progress could not be persisted; released unstarted claim back to pending"
397            );
398        }
399        Err(release_error) => {
400            let no_longer_releasable =
401                is_unstarted_claim_release_not_applicable_error(&release_error);
402            let release_error = WorkerError::ReleaseUnstartedClaim {
403                job_id: job.id,
404                attempt: job.attempt,
405                source: release_error,
406            };
407            if no_longer_releasable {
408                warn!(
409                    %error,
410                    %release_error,
411                    job_id = %job.id,
412                    attempt = job.attempt,
413                    "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
414                );
415                return;
416            }
417
418            warn!(
419                %error,
420                %release_error,
421                job_id = %job.id,
422                attempt = job.attempt,
423                "running progress could not be persisted; leaving claim for reaper recovery"
424            );
425        }
426    }
427}
428
429async fn execute_job_handler(
430    registry: Arc<JobRegistry>,
431    context: &JobContext,
432    job: &jobs::JobQueueRecord,
433) -> Result<JobProgress, JobFailure> {
434    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
435        return Err(JobFailure::terminal(
436            "job.handler_not_registered",
437            "No handler is registered for this job type.",
438        ));
439    };
440
441    handler
442        .execute(context.clone(), job.payload.clone())
443        .await?;
444
445    Ok(JobProgress {
446        stage: runledger_core::jobs::JobStage::Completed,
447        progress_done: None,
448        progress_total: None,
449        checkpoint: None,
450    })
451}
452
453async fn execute_job_handler_with_heartbeats(
454    pool: runledger_postgres::DbPool,
455    registry: Arc<JobRegistry>,
456    context: &JobContext,
457    job: &jobs::JobQueueRecord,
458    lease_ttl_seconds: i32,
459) -> Result<JobProgress, JobFailure> {
460    let mut execution =
461        Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
462    let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
463    let mut timeout = Box::pin(sleep_until(timeout_deadline));
464
465    let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
466    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
467    ticker.tick().await;
468
469    loop {
470        tokio::select! {
471            result = &mut execution => {
472                return match result {
473                    Ok(result) => result,
474                    Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
475                };
476            }
477            _ = &mut timeout => {
478                return Err(JobFailure::timeout(
479                    "job.timeout_exceeded",
480                    "Job exceeded the configured timeout.",
481                ));
482            }
483            _ = ticker.tick() => {
484                if let Err(error) = jobs::heartbeat_job(
485                    &pool,
486                    job.id,
487                    job.run_number,
488                    job.attempt,
489                    &context.worker_id,
490                    lease_ttl_seconds,
491                )
492                .await
493                {
494                    let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
495                    let error = WorkerError::Heartbeat {
496                        job_id: job.id,
497                        attempt: job.attempt,
498                        source: error,
499                    };
500
501                    if lease_owner_mismatch {
502                        warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
503                        return Err(lease_owner_mismatch_failure());
504                    }
505
506                    warn!(
507                        %error,
508                        job_id = %job.id,
509                        "aborting job because lease heartbeat could not be persisted"
510                    );
511                    return Err(lease_maintenance_failure());
512                }
513            }
514        }
515    }
516}
517
518fn lease_owner_mismatch_failure() -> JobFailure {
519    JobFailure::lease_expired(
520        LEASE_OWNER_MISMATCH_CODE,
521        "Job lease ownership was lost during processing.",
522    )
523}
524
525fn lease_maintenance_failure() -> JobFailure {
526    JobFailure::lease_expired(
527        LEASE_MAINTENANCE_FAILED_CODE,
528        "Job lease could not be durably maintained during processing.",
529    )
530}
531
532fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
533    JobFailure::panicked(
534        HANDLER_PANIC_CODE,
535        format!(
536            "Job handler panicked: {}",
537            panic_payload_message(&*panic_payload)
538        ),
539    )
540}
541
/// Extracts readable text from a panic payload.
///
/// `String` and `&'static str` payloads are returned as-is; any other payload
/// type yields the fixed text `"non-string panic payload"`.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .map(String::as_str)
        .or_else(|| panic_payload.downcast_ref::<&'static str>().copied())
        .unwrap_or("non-string panic payload")
        .to_owned()
}
553
554fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
555    matches!(
556        error,
557        runledger_postgres::Error::QueryError(query_error)
558            if query_error.code() == expected_code
559    )
560}
561
562fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
563    has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
564}
565
566fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
567    has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
568}
569
570fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
571    has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
572}
573
574fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
575    matches!(
576        failure.code,
577        LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
578    )
579}
580
/// Computes how often the lease heartbeat fires for a given lease TTL.
///
/// The interval is one third of the TTL in whole seconds (rounded down) and
/// never less than one second, so zero or negative TTLs also yield one second.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    // Renewing at a third of the TTL means a delayed heartbeat still leaves
    // room for later renewals before the lease expires.
    let third_of_ttl = (lease_ttl_seconds / 3).max(1);
    Duration::from_secs(u64::from(third_of_ttl.unsigned_abs()))
}
587
588fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
589    matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
590}
591
592fn retry_delay_ms_for_failure(
593    registry: &JobRegistry,
594    job: &jobs::JobQueueRecord,
595    failure: &JobFailure,
596) -> i32 {
597    registry
598        .retry_delay_override(job.job_type.as_borrowed(), failure.code)
599        .unwrap_or_else(|| compute_retry_delay_ms(job.attempt, job.id))
600}
601
602fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
603    let reason = if is_non_retryable_failure_kind(failure.kind) {
604        Some(JobDeadLetterReason::FailureKindNonRetryable)
605    } else if job.attempt >= job.max_attempts {
606        Some(JobDeadLetterReason::AttemptsExhausted)
607    } else {
608        None
609    }?;
610
611    Some(JobDeadLetterInfo::new(
612        failure.clone(),
613        reason,
614        Some(job.max_attempts),
615    ))
616}
617
618async fn notify_handler_of_dead_letter(
619    registry: &JobRegistry,
620    context: &JobContext,
621    job: &jobs::JobQueueRecord,
622    dead_letter: JobDeadLetterInfo,
623) {
624    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
625        return;
626    };
627    let context = context.clone();
628    let payload = job.payload.clone();
629
630    let hook_task = tokio::spawn(async move {
631        tokio::time::timeout(
632            TERMINAL_HOOK_TIMEOUT,
633            handler.on_dead_letter(context, payload, dead_letter),
634        )
635        .await
636        .is_ok()
637    });
638    match hook_task.await {
639        Ok(true) => {}
640        Ok(false) => {
641            warn!(
642                job_id = %job.id,
643                job_type = %job.job_type,
644                run_number = job.run_number,
645                attempt = job.attempt,
646                timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
647                "dead-letter hook timed out; continuing worker job task"
648            );
649        }
650        Err(error) => log_dead_letter_hook_join_error(job, error),
651    }
652}
653
654fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
655    if error.is_panic() {
656        warn!(
657            job_id = %job.id,
658            job_type = %job.job_type,
659            run_number = job.run_number,
660            attempt = job.attempt,
661            error = %error,
662            "dead-letter hook panicked; continuing worker job task"
663        );
664    } else if error.is_cancelled() {
665        warn!(
666            job_id = %job.id,
667            job_type = %job.job_type,
668            run_number = job.run_number,
669            attempt = job.attempt,
670            error = %error,
671            "dead-letter hook was cancelled; continuing worker job task"
672        );
673    } else {
674        warn!(
675            job_id = %job.id,
676            job_type = %job.job_type,
677            run_number = job.run_number,
678            attempt = job.attempt,
679            error = %error,
680            "dead-letter hook join failed; continuing worker job task"
681        );
682    }
683}
684
685fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
686    let exp = attempt.clamp(1, 10) as u32;
687    let base_ms: i64 = 5_000;
688    let raw = base_ms * (1_i64 << exp);
689    let capped = raw.min(300_000);
690    let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
691    (capped + jitter).max(1_000) as i32
692}
693
694#[cfg(test)]
695mod tests;