1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8 JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::WorkerError;
17use crate::config::JobsConfig;
18use crate::registry::JobRegistry;
19
/// Placeholder worker identity used when a claimed job record carries no `worker_id`.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";
/// Error code for lost lease ownership: matched against query errors from the
/// database layer and reused as the `JobFailure` code when a heartbeat loses the lease.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
/// Failure code used when a lease heartbeat could not be persisted.
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
/// Query error code returned when completing a job conflicts with a workflow release
/// (logged as a conflict with workflow cancellation; the lease is left for the reaper).
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
/// Failure code recorded when a job handler panics.
const HANDLER_PANIC_CODE: &str = "job.handler_panic";
/// Reason passed when releasing an unstarted claim because the `Running` stage
/// could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Query error code indicating that releasing an unstarted claim no longer applies.
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";
/// Retry delay, in milliseconds, passed when an unstarted claim is released back to pending.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;
/// Upper bound on how long a dead-letter hook may run (test builds use a much
/// shorter value — presumably so the timeout path can be exercised quickly).
#[cfg(test)]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(100);
/// Upper bound on how long a dead-letter hook may run (non-test builds).
#[cfg(not(test))]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_secs(10);
33
34pub async fn run_worker_loop(
35 pool: runledger_postgres::DbPool,
36 registry: JobRegistry,
37 config: JobsConfig,
38 mut shutdown: watch::Receiver<bool>,
39) {
40 let registry = Arc::new(registry);
41 let claimable_job_types = registry.registered_types();
42 let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
43 let mut join_set: JoinSet<()> = JoinSet::new();
44
45 loop {
46 drain_finished_tasks(&mut join_set).await;
47
48 if shutdown_requested_or_closed(&shutdown) {
49 break;
50 }
51
52 if claimable_job_types.is_empty() {
53 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
54 break;
55 }
56 continue;
57 }
58
59 let available = semaphore.available_permits();
60 if available == 0 {
61 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
62 break;
63 }
64 continue;
65 }
66
67 let claim_limit = min(available, config.claim_batch_size as usize);
68 let claimed = match jobs::claim_prestart_jobs_for_types(
69 &pool,
70 &config.worker_id,
71 config.lease_ttl_seconds,
72 claim_limit as i64,
73 &claimable_job_types,
74 )
75 .await
76 {
77 Ok(claimed) => claimed,
78 Err(error) => {
79 let error = WorkerError::ClaimJobs {
80 worker_id: config.worker_id.clone(),
81 source: error,
82 };
83 warn!(%error, "worker claim failed");
84 Vec::new()
85 }
86 };
87
88 if claimed.is_empty() {
89 wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
90 continue;
91 }
92
93 let claimed_len = claimed.len();
94 for job in claimed {
95 let permit = match Arc::clone(&semaphore).acquire_owned().await {
96 Ok(permit) => permit,
97 Err(_) => break,
98 };
99 let pool_clone = pool.clone();
100 let registry_clone = Arc::clone(®istry);
101 let lease_ttl_seconds = config.lease_ttl_seconds;
102 join_set.spawn(async move {
103 let _permit = permit;
104 process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
105 });
106 }
107
108 if claimed_len == claim_limit {
109 continue;
110 }
111
112 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
113 break;
114 }
115 }
116
117 info!("worker shutdown requested; draining in-flight jobs");
118 while join_set.join_next().await.is_some() {}
119}
120
121async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
122 while let Some(result) = join_set.try_join_next() {
123 if let Err(error) = result {
124 error!(%error, "job task crashed");
125 }
126 }
127}
128
129fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
130 *shutdown.borrow() || shutdown.has_changed().is_err()
131}
132
133async fn wait_for_shutdown_or_poll(
134 shutdown: &mut watch::Receiver<bool>,
135 poll_interval: Duration,
136) -> bool {
137 tokio::select! {
138 changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
139 _ = sleep(poll_interval) => false,
140 }
141}
142
143async fn process_claimed_job(
144 pool: runledger_postgres::DbPool,
145 registry: Arc<JobRegistry>,
146 job: jobs::JobQueueRecord,
147 lease_ttl_seconds: i32,
148) {
149 let worker_id = job
150 .worker_id
151 .clone()
152 .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
153
154 let job_span = info_span!(
155 "job",
156 sentry.name = %job.job_type,
157 sentry.op = "runledger.job",
158 job_id = %job.id,
159 job_type = %job.job_type,
160 run_number = job.run_number,
161 attempt = job.attempt,
162 organization_id = ?job.organization_id,
163 worker_id = %worker_id,
164 );
165 async {
166 let start = Instant::now();
167 let context = JobContext {
168 job_id: job.id,
169 run_number: job.run_number,
170 attempt: job.attempt,
171 organization_id: job.organization_id,
172 worker_id: worker_id.clone(),
173 };
174
175 if !mark_job_running_or_abort(&pool, &context, &job).await {
176 return;
177 }
178
179 match execute_job_handler_with_heartbeats(
180 pool.clone(),
181 Arc::clone(®istry),
182 &context,
183 &job,
184 lease_ttl_seconds,
185 )
186 .await
187 {
188 Ok(progress) => {
189 let completion = JobProgressUpdate {
190 stage: Some(progress.stage),
191 progress_done: progress.progress_done,
192 progress_total: progress.progress_total,
193 checkpoint: progress.checkpoint.as_ref(),
194 };
195 if let Err(error) = jobs::complete_job_success(
196 &pool,
197 job.id,
198 job.run_number,
199 job.attempt,
200 &context.worker_id,
201 Some(&completion),
202 )
203 .await
204 {
205 let release_conflict = is_workflow_release_conflict_error(&error);
206 let error = WorkerError::CompleteSuccess {
207 job_id: job.id,
208 attempt: job.attempt,
209 source: error,
210 };
211 if release_conflict {
212 warn!(
213 %error,
214 job_id = %job.id,
215 "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
216 );
217 } else {
218 error!(%error, job_id = %job.id, "failed to mark job success");
219 }
220 }
221 }
222 Err(failure) => {
223 if is_lease_maintenance_failure(&failure) {
224 warn!(
225 job_id = %job.id,
226 attempt = job.attempt,
227 failure_code = failure.code,
228 "job processing aborted because durable lease maintenance was lost"
229 );
230 return;
231 }
232
233 let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
234 None
235 } else {
236 Some(retry_delay_ms_for_failure(
237 registry.as_ref(),
238 &job,
239 &failure,
240 ))
241 };
242 let failure_payload = JobFailureUpdate {
243 kind: failure.kind,
244 code: failure.code,
245 message: failure.message.as_ref(),
246 retry_delay_ms,
247 };
248 let dead_letter = dead_letter_info(&job, &failure);
249 if let Err(error) = jobs::complete_job_failure(
250 &pool,
251 job.id,
252 job.run_number,
253 job.attempt,
254 &context.worker_id,
255 &failure_payload,
256 )
257 .await
258 {
259 let release_conflict = is_workflow_release_conflict_error(&error);
260 let error = WorkerError::CompleteFailure {
261 job_id: job.id,
262 attempt: job.attempt,
263 source: error,
264 };
265 if release_conflict {
266 warn!(
267 %error,
268 job_id = %job.id,
269 "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
270 );
271 } else {
272 error!(%error, job_id = %job.id, "failed to mark job failure");
273 }
274 } else if let Some(dead_letter) = dead_letter {
275 warn!(
276 job_id = %job.id,
277 job_type = %job.job_type,
278 run_number = job.run_number,
279 attempt = job.attempt,
280 max_attempts = job.max_attempts,
281 organization_id = ?job.organization_id,
282 worker_id = %context.worker_id,
283 dead_letter_reason = ?dead_letter.reason,
284 failure_kind = ?dead_letter.failure.kind,
285 failure_code = dead_letter.failure.code,
286 failure_message = %dead_letter.failure.message,
287 "job dead lettered after handler failure"
288 );
289 notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
290 .await;
291 }
292 }
293 }
294
295 info!(
296 job_id = %job.id,
297 attempt = job.attempt,
298 run_number = job.run_number,
299 elapsed_ms = start.elapsed().as_millis(),
300 "job processed"
301 );
302 }
303 .instrument(job_span)
304 .await;
305}
306
307async fn mark_job_running_or_abort(
308 pool: &runledger_postgres::DbPool,
309 context: &JobContext,
310 job: &jobs::JobQueueRecord,
311) -> bool {
312 let running_progress = JobProgressUpdate {
313 stage: Some(runledger_core::jobs::JobStage::Running),
314 progress_done: None,
315 progress_total: None,
316 checkpoint: None,
317 };
318
319 let Err(source) = jobs::update_job_progress(
320 pool,
321 job.id,
322 job.run_number,
323 job.attempt,
324 &context.worker_id,
325 &running_progress,
326 )
327 .await
328 else {
329 return true;
330 };
331
332 handle_running_progress_persist_failure(pool, context, job, source).await;
333 false
334}
335
336async fn handle_running_progress_persist_failure(
337 pool: &runledger_postgres::DbPool,
338 context: &JobContext,
339 job: &jobs::JobQueueRecord,
340 source: runledger_postgres::Error,
341) {
342 let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
343 let error = WorkerError::SetRunningProgress {
344 job_id: job.id,
345 attempt: job.attempt,
346 source,
347 };
348
349 if lease_owner_mismatch {
350 warn!(
351 %error,
352 job_id = %job.id,
353 attempt = job.attempt,
354 "aborting job before execution because lease ownership was already lost"
355 );
356 return;
357 }
358
359 match jobs::release_unstarted_job_claim(
360 pool,
361 job.id,
362 job.run_number,
363 job.attempt,
364 &context.worker_id,
365 RUNNING_PROGRESS_PERSIST_FAILED_REASON,
366 UNSTARTED_CLAIM_RETRY_DELAY_MS,
367 )
368 .await
369 {
370 Ok(()) => {
371 warn!(
372 %error,
373 job_id = %job.id,
374 attempt = job.attempt,
375 "running progress could not be persisted; released unstarted claim back to pending"
376 );
377 }
378 Err(release_error) => {
379 let no_longer_releasable =
380 is_unstarted_claim_release_not_applicable_error(&release_error);
381 let release_error = WorkerError::ReleaseUnstartedClaim {
382 job_id: job.id,
383 attempt: job.attempt,
384 source: release_error,
385 };
386 if no_longer_releasable {
387 warn!(
388 %error,
389 %release_error,
390 job_id = %job.id,
391 attempt = job.attempt,
392 "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
393 );
394 return;
395 }
396
397 warn!(
398 %error,
399 %release_error,
400 job_id = %job.id,
401 attempt = job.attempt,
402 "running progress could not be persisted; leaving claim for reaper recovery"
403 );
404 }
405 }
406}
407
408async fn execute_job_handler(
409 registry: Arc<JobRegistry>,
410 context: &JobContext,
411 job: &jobs::JobQueueRecord,
412) -> Result<JobProgress, JobFailure> {
413 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
414 return Err(JobFailure::terminal(
415 "job.handler_not_registered",
416 "No handler is registered for this job type.",
417 ));
418 };
419
420 handler
421 .execute(context.clone(), job.payload.clone())
422 .await?;
423
424 Ok(JobProgress {
425 stage: runledger_core::jobs::JobStage::Completed,
426 progress_done: None,
427 progress_total: None,
428 checkpoint: None,
429 })
430}
431
432async fn execute_job_handler_with_heartbeats(
433 pool: runledger_postgres::DbPool,
434 registry: Arc<JobRegistry>,
435 context: &JobContext,
436 job: &jobs::JobQueueRecord,
437 lease_ttl_seconds: i32,
438) -> Result<JobProgress, JobFailure> {
439 let mut execution =
440 Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
441 let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
442 let mut timeout = Box::pin(sleep_until(timeout_deadline));
443
444 let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
445 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
446 ticker.tick().await;
447
448 loop {
449 tokio::select! {
450 result = &mut execution => {
451 return match result {
452 Ok(result) => result,
453 Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
454 };
455 }
456 _ = &mut timeout => {
457 return Err(JobFailure::timeout(
458 "job.timeout_exceeded",
459 "Job exceeded the configured timeout.",
460 ));
461 }
462 _ = ticker.tick() => {
463 if let Err(error) = jobs::heartbeat_job(
464 &pool,
465 job.id,
466 job.run_number,
467 job.attempt,
468 &context.worker_id,
469 lease_ttl_seconds,
470 )
471 .await
472 {
473 let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
474 let error = WorkerError::Heartbeat {
475 job_id: job.id,
476 attempt: job.attempt,
477 source: error,
478 };
479
480 if lease_owner_mismatch {
481 warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
482 return Err(lease_owner_mismatch_failure());
483 }
484
485 warn!(
486 %error,
487 job_id = %job.id,
488 "aborting job because lease heartbeat could not be persisted"
489 );
490 return Err(lease_maintenance_failure());
491 }
492 }
493 }
494 }
495}
496
497fn lease_owner_mismatch_failure() -> JobFailure {
498 JobFailure::lease_expired(
499 LEASE_OWNER_MISMATCH_CODE,
500 "Job lease ownership was lost during processing.",
501 )
502}
503
504fn lease_maintenance_failure() -> JobFailure {
505 JobFailure::lease_expired(
506 LEASE_MAINTENANCE_FAILED_CODE,
507 "Job lease could not be durably maintained during processing.",
508 )
509}
510
511fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
512 JobFailure::panicked(
513 HANDLER_PANIC_CODE,
514 format!(
515 "Job handler panicked: {}",
516 panic_payload_message(&*panic_payload)
517 ),
518 )
519}
520
/// Extracts a human-readable message from a panic payload.
///
/// `String` and `&'static str` payloads are returned verbatim. Any other payload
/// type yields the fixed text `"non-string panic payload"`.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|text| (*text).to_owned())
        })
        .unwrap_or_else(|| "non-string panic payload".to_owned())
}
532
533fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
534 matches!(
535 error,
536 runledger_postgres::Error::QueryError(query_error)
537 if query_error.code() == expected_code
538 )
539}
540
541fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
542 has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
543}
544
545fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
546 has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
547}
548
549fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
550 has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
551}
552
553fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
554 matches!(
555 failure.code,
556 LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
557 )
558}
559
/// Chooses how often a running job renews its lease.
///
/// The interval is one third of the lease TTL, rounded down to whole seconds.
/// Non-positive TTLs are treated as one second, and the result is never shorter
/// than one second.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    let effective_ttl = lease_ttl_seconds.max(1);
    let period_secs = (effective_ttl / 3).max(1);
    // `period_secs` is at least 1 here, so the widening cast cannot wrap.
    Duration::from_secs(period_secs as u64)
}
566
567fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
568 matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
569}
570
571fn retry_delay_ms_for_failure(
572 registry: &JobRegistry,
573 job: &jobs::JobQueueRecord,
574 failure: &JobFailure,
575) -> i32 {
576 registry
577 .retry_delay_override(job.job_type.as_borrowed(), failure.code)
578 .unwrap_or_else(|| compute_retry_delay_ms(job.attempt, job.id))
579}
580
581fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
582 let reason = if is_non_retryable_failure_kind(failure.kind) {
583 Some(JobDeadLetterReason::FailureKindNonRetryable)
584 } else if job.attempt >= job.max_attempts {
585 Some(JobDeadLetterReason::AttemptsExhausted)
586 } else {
587 None
588 }?;
589
590 Some(JobDeadLetterInfo::new(
591 failure.clone(),
592 reason,
593 Some(job.max_attempts),
594 ))
595}
596
597async fn notify_handler_of_dead_letter(
598 registry: &JobRegistry,
599 context: &JobContext,
600 job: &jobs::JobQueueRecord,
601 dead_letter: JobDeadLetterInfo,
602) {
603 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
604 return;
605 };
606 let context = context.clone();
607 let payload = job.payload.clone();
608
609 let hook_task = tokio::spawn(async move {
610 tokio::time::timeout(
611 TERMINAL_HOOK_TIMEOUT,
612 handler.on_dead_letter(context, payload, dead_letter),
613 )
614 .await
615 .is_ok()
616 });
617 match hook_task.await {
618 Ok(true) => {}
619 Ok(false) => {
620 warn!(
621 job_id = %job.id,
622 job_type = %job.job_type,
623 run_number = job.run_number,
624 attempt = job.attempt,
625 timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
626 "dead-letter hook timed out; continuing worker job task"
627 );
628 }
629 Err(error) => log_dead_letter_hook_join_error(job, error),
630 }
631}
632
633fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
634 if error.is_panic() {
635 warn!(
636 job_id = %job.id,
637 job_type = %job.job_type,
638 run_number = job.run_number,
639 attempt = job.attempt,
640 error = %error,
641 "dead-letter hook panicked; continuing worker job task"
642 );
643 } else if error.is_cancelled() {
644 warn!(
645 job_id = %job.id,
646 job_type = %job.job_type,
647 run_number = job.run_number,
648 attempt = job.attempt,
649 error = %error,
650 "dead-letter hook was cancelled; continuing worker job task"
651 );
652 } else {
653 warn!(
654 job_id = %job.id,
655 job_type = %job.job_type,
656 run_number = job.run_number,
657 attempt = job.attempt,
658 error = %error,
659 "dead-letter hook join failed; continuing worker job task"
660 );
661 }
662}
663
664fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
665 let exp = attempt.clamp(1, 10) as u32;
666 let base_ms: i64 = 5_000;
667 let raw = base_ms * (1_i64 << exp);
668 let capped = raw.min(300_000);
669 let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
670 (capped + jitter).max(1_000) as i32
671}
672
// Unit tests live in an out-of-line `tests` module file and are compiled only for test builds.
#[cfg(test)]
mod tests;