Skip to main content

runledger_runtime/
worker.rs

1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8    JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::WorkerError;
17use crate::config::JobsConfig;
18use crate::registry::JobRegistry;
19
/// Fallback worker identifier used when a claimed job record has no `worker_id`.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";
// Kept stable for clients that already match this code; it also covers leases
// that expired before the worker's lifecycle update reached storage.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
/// Failure code used when a heartbeat could not be persisted for a reason
/// other than a lease-owner mismatch.
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
/// Query error code checked when persisting a job's success or failure fails.
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
/// Failure code attached to the failure built from a handler panic.
const HANDLER_PANIC_CODE: &str = "job.handler_panic";
/// Reason passed to storage when an unstarted claim is released because the
/// "running" progress update could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Query error code checked when releasing an unstarted claim fails.
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";
/// Delay value passed to `release_unstarted_job_claim`.
// NOTE(review): presumably the delay before the released job becomes
// claimable again; confirm against the storage layer.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;
/// Upper bound on how long the dead-letter hook may run (shortened under test).
#[cfg(test)]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(100);
/// Upper bound on how long the dead-letter hook may run.
#[cfg(not(test))]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_secs(10);
35
36pub async fn run_worker_loop(
37    pool: runledger_postgres::DbPool,
38    registry: JobRegistry,
39    config: JobsConfig,
40    mut shutdown: watch::Receiver<bool>,
41) {
42    let registry = Arc::new(registry);
43    let claimable_job_types = registry.registered_types();
44    let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
45    let mut join_set: JoinSet<()> = JoinSet::new();
46
47    loop {
48        drain_finished_tasks(&mut join_set).await;
49
50        if shutdown_requested_or_closed(&shutdown) {
51            break;
52        }
53
54        if claimable_job_types.is_empty() {
55            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
56                break;
57            }
58            continue;
59        }
60
61        let available = semaphore.available_permits();
62        if available == 0 {
63            if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
64                break;
65            }
66            continue;
67        }
68
69        let claim_limit = min(available, config.claim_batch_size as usize);
70        let claimed = match jobs::claim_prestart_jobs_for_types(
71            &pool,
72            &config.worker_id,
73            config.lease_ttl_seconds,
74            claim_limit as i64,
75            &claimable_job_types,
76        )
77        .await
78        {
79            Ok(claimed) => claimed,
80            Err(error) => {
81                let error = WorkerError::ClaimJobs {
82                    worker_id: config.worker_id.clone(),
83                    source: error,
84                };
85                warn!(%error, "worker claim failed");
86                Vec::new()
87            }
88        };
89
90        if claimed.is_empty() {
91            wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
92            continue;
93        }
94
95        let claimed_len = claimed.len();
96        for job in claimed {
97            let permit = match Arc::clone(&semaphore).acquire_owned().await {
98                Ok(permit) => permit,
99                Err(_) => break,
100            };
101            let pool_clone = pool.clone();
102            let registry_clone = Arc::clone(&registry);
103            let lease_ttl_seconds = config.lease_ttl_seconds;
104            join_set.spawn(async move {
105                let _permit = permit;
106                process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
107            });
108        }
109
110        if claimed_len == claim_limit {
111            continue;
112        }
113
114        if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
115            break;
116        }
117    }
118
119    info!("worker shutdown requested; draining in-flight jobs");
120    while join_set.join_next().await.is_some() {}
121}
122
123async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
124    while let Some(result) = join_set.try_join_next() {
125        if let Err(error) = result {
126            error!(%error, "job task crashed");
127        }
128    }
129}
130
131fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
132    *shutdown.borrow() || shutdown.has_changed().is_err()
133}
134
135async fn wait_for_shutdown_or_poll(
136    shutdown: &mut watch::Receiver<bool>,
137    poll_interval: Duration,
138) -> bool {
139    tokio::select! {
140        changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
141        _ = sleep(poll_interval) => false,
142    }
143}
144
145async fn process_claimed_job(
146    pool: runledger_postgres::DbPool,
147    registry: Arc<JobRegistry>,
148    job: jobs::JobQueueRecord,
149    lease_ttl_seconds: i32,
150) {
151    let worker_id = job
152        .worker_id
153        .clone()
154        .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
155
156    let job_span = info_span!(
157        "job",
158        sentry.name = %job.job_type,
159        sentry.op = "runledger.job",
160        job_id = %job.id,
161        job_type = %job.job_type,
162        run_number = job.run_number,
163        attempt = job.attempt,
164        organization_id = ?job.organization_id,
165        worker_id = %worker_id,
166    );
167    async {
168        let start = Instant::now();
169        let context = JobContext {
170            job_id: job.id,
171            run_number: job.run_number,
172            attempt: job.attempt,
173            organization_id: job.organization_id,
174            worker_id: worker_id.clone(),
175        };
176
177        if !mark_job_running_or_abort(&pool, &context, &job).await {
178            return;
179        }
180
181        match execute_job_handler_with_heartbeats(
182            pool.clone(),
183            Arc::clone(&registry),
184            &context,
185            &job,
186            lease_ttl_seconds,
187        )
188        .await
189        {
190            Ok(progress) => {
191                let completion = JobProgressUpdate {
192                    stage: Some(progress.stage),
193                    progress_done: progress.progress_done,
194                    progress_total: progress.progress_total,
195                    checkpoint: progress.checkpoint.as_ref(),
196                };
197                if let Err(error) = jobs::complete_job_success(
198                    &pool,
199                    job.id,
200                    job.run_number,
201                    job.attempt,
202                    &context.worker_id,
203                    Some(&completion),
204                )
205                .await
206                {
207                    let release_conflict = is_workflow_release_conflict_error(&error);
208                    let error = WorkerError::CompleteSuccess {
209                        job_id: job.id,
210                        attempt: job.attempt,
211                        source: error,
212                    };
213                    if release_conflict {
214                        warn!(
215                            %error,
216                            job_id = %job.id,
217                            "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
218                        );
219                    } else {
220                        error!(%error, job_id = %job.id, "failed to mark job success");
221                    }
222                }
223            }
224            Err(failure) => {
225                if is_lease_maintenance_failure(&failure) {
226                    warn!(
227                        job_id = %job.id,
228                        attempt = job.attempt,
229                        failure_code = failure.code,
230                        "job processing aborted because durable lease maintenance was lost"
231                    );
232                    return;
233                }
234
235                let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
236                    None
237                } else {
238                    Some(retry_delay_ms_for_failure(
239                        registry.as_ref(),
240                        &job,
241                        &failure,
242                    ))
243                };
244                let failure_payload = JobFailureUpdate {
245                    kind: failure.kind,
246                    code: failure.code,
247                    message: failure.message.as_ref(),
248                    retry_delay_ms,
249                };
250                let dead_letter = dead_letter_info(&job, &failure);
251                if let Err(error) = jobs::complete_job_failure(
252                    &pool,
253                    job.id,
254                    job.run_number,
255                    job.attempt,
256                    &context.worker_id,
257                    &failure_payload,
258                )
259                .await
260                {
261                    let release_conflict = is_workflow_release_conflict_error(&error);
262                    let error = WorkerError::CompleteFailure {
263                        job_id: job.id,
264                        attempt: job.attempt,
265                        source: error,
266                    };
267                    if release_conflict {
268                        warn!(
269                            %error,
270                            job_id = %job.id,
271                            "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
272                        );
273                    } else {
274                        error!(%error, job_id = %job.id, "failed to mark job failure");
275                    }
276                } else if let Some(dead_letter) = dead_letter {
277                    warn!(
278                        job_id = %job.id,
279                        job_type = %job.job_type,
280                        run_number = job.run_number,
281                        attempt = job.attempt,
282                        max_attempts = job.max_attempts,
283                        organization_id = ?job.organization_id,
284                        worker_id = %context.worker_id,
285                        dead_letter_reason = ?dead_letter.reason,
286                        failure_kind = ?dead_letter.failure.kind,
287                        failure_code = dead_letter.failure.code,
288                        failure_message = %dead_letter.failure.message,
289                        "job dead lettered after handler failure"
290                    );
291                    notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
292                        .await;
293                }
294            }
295        }
296
297        info!(
298            job_id = %job.id,
299            attempt = job.attempt,
300            run_number = job.run_number,
301            elapsed_ms = start.elapsed().as_millis(),
302            "job processed"
303        );
304    }
305    .instrument(job_span)
306    .await;
307}
308
309async fn mark_job_running_or_abort(
310    pool: &runledger_postgres::DbPool,
311    context: &JobContext,
312    job: &jobs::JobQueueRecord,
313) -> bool {
314    let running_progress = JobProgressUpdate {
315        stage: Some(runledger_core::jobs::JobStage::Running),
316        progress_done: None,
317        progress_total: None,
318        checkpoint: None,
319    };
320
321    let Err(source) = jobs::update_job_progress(
322        pool,
323        job.id,
324        job.run_number,
325        job.attempt,
326        &context.worker_id,
327        &running_progress,
328    )
329    .await
330    else {
331        return true;
332    };
333
334    handle_running_progress_persist_failure(pool, context, job, source).await;
335    false
336}
337
338async fn handle_running_progress_persist_failure(
339    pool: &runledger_postgres::DbPool,
340    context: &JobContext,
341    job: &jobs::JobQueueRecord,
342    source: runledger_postgres::Error,
343) {
344    let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
345    let error = WorkerError::SetRunningProgress {
346        job_id: job.id,
347        attempt: job.attempt,
348        source,
349    };
350
351    if lease_owner_mismatch {
352        warn!(
353            %error,
354            job_id = %job.id,
355            attempt = job.attempt,
356            "aborting job before execution because lease ownership was already lost"
357        );
358        return;
359    }
360
361    match jobs::release_unstarted_job_claim(
362        pool,
363        job.id,
364        job.run_number,
365        job.attempt,
366        &context.worker_id,
367        RUNNING_PROGRESS_PERSIST_FAILED_REASON,
368        UNSTARTED_CLAIM_RETRY_DELAY_MS,
369    )
370    .await
371    {
372        Ok(()) => {
373            warn!(
374                %error,
375                job_id = %job.id,
376                attempt = job.attempt,
377                "running progress could not be persisted; released unstarted claim back to pending"
378            );
379        }
380        Err(release_error) => {
381            let no_longer_releasable =
382                is_unstarted_claim_release_not_applicable_error(&release_error);
383            let release_error = WorkerError::ReleaseUnstartedClaim {
384                job_id: job.id,
385                attempt: job.attempt,
386                source: release_error,
387            };
388            if no_longer_releasable {
389                warn!(
390                    %error,
391                    %release_error,
392                    job_id = %job.id,
393                    attempt = job.attempt,
394                    "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
395                );
396                return;
397            }
398
399            warn!(
400                %error,
401                %release_error,
402                job_id = %job.id,
403                attempt = job.attempt,
404                "running progress could not be persisted; leaving claim for reaper recovery"
405            );
406        }
407    }
408}
409
410async fn execute_job_handler(
411    registry: Arc<JobRegistry>,
412    context: &JobContext,
413    job: &jobs::JobQueueRecord,
414) -> Result<JobProgress, JobFailure> {
415    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
416        return Err(JobFailure::terminal(
417            "job.handler_not_registered",
418            "No handler is registered for this job type.",
419        ));
420    };
421
422    handler
423        .execute(context.clone(), job.payload.clone())
424        .await?;
425
426    Ok(JobProgress {
427        stage: runledger_core::jobs::JobStage::Completed,
428        progress_done: None,
429        progress_total: None,
430        checkpoint: None,
431    })
432}
433
434async fn execute_job_handler_with_heartbeats(
435    pool: runledger_postgres::DbPool,
436    registry: Arc<JobRegistry>,
437    context: &JobContext,
438    job: &jobs::JobQueueRecord,
439    lease_ttl_seconds: i32,
440) -> Result<JobProgress, JobFailure> {
441    let mut execution =
442        Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
443    let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
444    let mut timeout = Box::pin(sleep_until(timeout_deadline));
445
446    let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
447    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
448    ticker.tick().await;
449
450    loop {
451        tokio::select! {
452            result = &mut execution => {
453                return match result {
454                    Ok(result) => result,
455                    Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
456                };
457            }
458            _ = &mut timeout => {
459                return Err(JobFailure::timeout(
460                    "job.timeout_exceeded",
461                    "Job exceeded the configured timeout.",
462                ));
463            }
464            _ = ticker.tick() => {
465                if let Err(error) = jobs::heartbeat_job(
466                    &pool,
467                    job.id,
468                    job.run_number,
469                    job.attempt,
470                    &context.worker_id,
471                    lease_ttl_seconds,
472                )
473                .await
474                {
475                    let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
476                    let error = WorkerError::Heartbeat {
477                        job_id: job.id,
478                        attempt: job.attempt,
479                        source: error,
480                    };
481
482                    if lease_owner_mismatch {
483                        warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
484                        return Err(lease_owner_mismatch_failure());
485                    }
486
487                    warn!(
488                        %error,
489                        job_id = %job.id,
490                        "aborting job because lease heartbeat could not be persisted"
491                    );
492                    return Err(lease_maintenance_failure());
493                }
494            }
495        }
496    }
497}
498
499fn lease_owner_mismatch_failure() -> JobFailure {
500    JobFailure::lease_expired(
501        LEASE_OWNER_MISMATCH_CODE,
502        "Job lease ownership was lost during processing.",
503    )
504}
505
506fn lease_maintenance_failure() -> JobFailure {
507    JobFailure::lease_expired(
508        LEASE_MAINTENANCE_FAILED_CODE,
509        "Job lease could not be durably maintained during processing.",
510    )
511}
512
513fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
514    JobFailure::panicked(
515        HANDLER_PANIC_CODE,
516        format!(
517            "Job handler panicked: {}",
518            panic_payload_message(&*panic_payload)
519        ),
520    )
521}
522
/// Extracts a human-readable message from a panic payload.
///
/// `String` and `&'static str` payloads are returned as text. Any other
/// payload type yields the fixed text `"non-string panic payload"`.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|text| (*text).to_string())
        })
        .unwrap_or_else(|| "non-string panic payload".to_string())
}
534
535fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
536    matches!(
537        error,
538        runledger_postgres::Error::QueryError(query_error)
539            if query_error.code() == expected_code
540    )
541}
542
543fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
544    has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
545}
546
547fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
548    has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
549}
550
551fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
552    has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
553}
554
555fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
556    matches!(
557        failure.code,
558        LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
559    )
560}
561
/// Computes how often the lease heartbeat fires for a given lease TTL.
///
/// The interval is one-third of the TTL in whole seconds (rounded down) so a
/// delayed heartbeat still leaves time for later renewals before the lease
/// expires. It is never less than one second, including for zero or negative
/// TTLs.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    let ttl_seconds = lease_ttl_seconds.max(1);
    let third_seconds = (ttl_seconds / 3).max(1);
    // `third_seconds` is at least 1, so the unsigned conversion is lossless.
    Duration::from_secs(u64::from(third_seconds.unsigned_abs()))
}
568
569fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
570    matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
571}
572
573fn retry_delay_ms_for_failure(
574    registry: &JobRegistry,
575    job: &jobs::JobQueueRecord,
576    failure: &JobFailure,
577) -> i32 {
578    registry
579        .retry_delay_override(job.job_type.as_borrowed(), failure.code)
580        .unwrap_or_else(|| compute_retry_delay_ms(job.attempt, job.id))
581}
582
583fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
584    let reason = if is_non_retryable_failure_kind(failure.kind) {
585        Some(JobDeadLetterReason::FailureKindNonRetryable)
586    } else if job.attempt >= job.max_attempts {
587        Some(JobDeadLetterReason::AttemptsExhausted)
588    } else {
589        None
590    }?;
591
592    Some(JobDeadLetterInfo::new(
593        failure.clone(),
594        reason,
595        Some(job.max_attempts),
596    ))
597}
598
599async fn notify_handler_of_dead_letter(
600    registry: &JobRegistry,
601    context: &JobContext,
602    job: &jobs::JobQueueRecord,
603    dead_letter: JobDeadLetterInfo,
604) {
605    let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
606        return;
607    };
608    let context = context.clone();
609    let payload = job.payload.clone();
610
611    let hook_task = tokio::spawn(async move {
612        tokio::time::timeout(
613            TERMINAL_HOOK_TIMEOUT,
614            handler.on_dead_letter(context, payload, dead_letter),
615        )
616        .await
617        .is_ok()
618    });
619    match hook_task.await {
620        Ok(true) => {}
621        Ok(false) => {
622            warn!(
623                job_id = %job.id,
624                job_type = %job.job_type,
625                run_number = job.run_number,
626                attempt = job.attempt,
627                timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
628                "dead-letter hook timed out; continuing worker job task"
629            );
630        }
631        Err(error) => log_dead_letter_hook_join_error(job, error),
632    }
633}
634
635fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
636    if error.is_panic() {
637        warn!(
638            job_id = %job.id,
639            job_type = %job.job_type,
640            run_number = job.run_number,
641            attempt = job.attempt,
642            error = %error,
643            "dead-letter hook panicked; continuing worker job task"
644        );
645    } else if error.is_cancelled() {
646        warn!(
647            job_id = %job.id,
648            job_type = %job.job_type,
649            run_number = job.run_number,
650            attempt = job.attempt,
651            error = %error,
652            "dead-letter hook was cancelled; continuing worker job task"
653        );
654    } else {
655        warn!(
656            job_id = %job.id,
657            job_type = %job.job_type,
658            run_number = job.run_number,
659            attempt = job.attempt,
660            error = %error,
661            "dead-letter hook join failed; continuing worker job task"
662        );
663    }
664}
665
666fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
667    let exp = attempt.clamp(1, 10) as u32;
668    let base_ms: i64 = 5_000;
669    let raw = base_ms * (1_i64 << exp);
670    let capped = raw.min(300_000);
671    let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
672    (capped + jitter).max(1_000) as i32
673}
674
675#[cfg(test)]
676mod tests;