1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8 JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::WorkerError;
17use crate::config::JobsConfig;
18use crate::registry::JobRegistry;
19
/// Worker id used in logs and job context when a claimed record carries no `worker_id`.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";
/// Code shared by storage errors and job failures that report a lost lease owner.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
/// Failure code emitted when a heartbeat could not be persisted for another reason.
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
/// Storage error code that is logged as a workflow-cancellation conflict on completion.
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
/// Failure code attached to a handler that panicked while executing.
const HANDLER_PANIC_CODE: &str = "job.handler_panic";
/// Reason passed to storage when an unstarted claim is released.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Storage error code meaning the unstarted-claim release no longer applies to the job.
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";
/// Delay (milliseconds) handed to storage when releasing an unstarted claim.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;
/// Upper bound on how long a dead-letter hook may run; shortened in test builds.
const TERMINAL_HOOK_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(10)
};
35
36pub async fn run_worker_loop(
37 pool: runledger_postgres::DbPool,
38 registry: JobRegistry,
39 config: JobsConfig,
40 mut shutdown: watch::Receiver<bool>,
41) {
42 let registry = Arc::new(registry);
43 let claimable_job_types = registry.registered_types();
44 let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
45 let mut join_set: JoinSet<()> = JoinSet::new();
46
47 loop {
48 drain_finished_tasks(&mut join_set).await;
49
50 if shutdown_requested_or_closed(&shutdown) {
51 break;
52 }
53
54 if claimable_job_types.is_empty() {
55 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
56 break;
57 }
58 continue;
59 }
60
61 let available = semaphore.available_permits();
62 if available == 0 {
63 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
64 break;
65 }
66 continue;
67 }
68
69 let claim_limit = min(available, config.claim_batch_size as usize);
70 let claimed = match jobs::claim_prestart_jobs_for_types(
71 &pool,
72 &config.worker_id,
73 config.lease_ttl_seconds,
74 claim_limit as i64,
75 &claimable_job_types,
76 )
77 .await
78 {
79 Ok(claimed) => claimed,
80 Err(error) => {
81 let error = WorkerError::ClaimJobs {
82 worker_id: config.worker_id.clone(),
83 source: error,
84 };
85 warn!(%error, "worker claim failed");
86 Vec::new()
87 }
88 };
89
90 if claimed.is_empty() {
91 wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
92 continue;
93 }
94
95 let claimed_len = claimed.len();
96 for job in claimed {
97 let permit = match Arc::clone(&semaphore).acquire_owned().await {
98 Ok(permit) => permit,
99 Err(_) => break,
100 };
101 let pool_clone = pool.clone();
102 let registry_clone = Arc::clone(®istry);
103 let lease_ttl_seconds = config.lease_ttl_seconds;
104 join_set.spawn(async move {
105 let _permit = permit;
106 process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
107 });
108 }
109
110 if claimed_len == claim_limit {
111 continue;
112 }
113
114 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
115 break;
116 }
117 }
118
119 info!("worker shutdown requested; draining in-flight jobs");
120 while join_set.join_next().await.is_some() {}
121}
122
123async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
124 while let Some(result) = join_set.try_join_next() {
125 if let Err(error) = result {
126 error!(%error, "job task crashed");
127 }
128 }
129}
130
131fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
132 *shutdown.borrow() || shutdown.has_changed().is_err()
133}
134
135async fn wait_for_shutdown_or_poll(
136 shutdown: &mut watch::Receiver<bool>,
137 poll_interval: Duration,
138) -> bool {
139 tokio::select! {
140 changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
141 _ = sleep(poll_interval) => false,
142 }
143}
144
145async fn process_claimed_job(
146 pool: runledger_postgres::DbPool,
147 registry: Arc<JobRegistry>,
148 job: jobs::JobQueueRecord,
149 lease_ttl_seconds: i32,
150) {
151 let worker_id = job
152 .worker_id
153 .clone()
154 .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
155
156 let job_span = info_span!(
157 "job",
158 sentry.name = %job.job_type,
159 sentry.op = "runledger.job",
160 job_id = %job.id,
161 job_type = %job.job_type,
162 run_number = job.run_number,
163 attempt = job.attempt,
164 organization_id = ?job.organization_id,
165 worker_id = %worker_id,
166 );
167 async {
168 let start = Instant::now();
169 let context = JobContext {
170 job_id: job.id,
171 run_number: job.run_number,
172 attempt: job.attempt,
173 organization_id: job.organization_id,
174 worker_id: worker_id.clone(),
175 };
176
177 if !mark_job_running_or_abort(&pool, &context, &job).await {
178 return;
179 }
180
181 match execute_job_handler_with_heartbeats(
182 pool.clone(),
183 Arc::clone(®istry),
184 &context,
185 &job,
186 lease_ttl_seconds,
187 )
188 .await
189 {
190 Ok(progress) => {
191 let completion = JobProgressUpdate {
192 stage: Some(progress.stage),
193 progress_done: progress.progress_done,
194 progress_total: progress.progress_total,
195 checkpoint: progress.checkpoint.as_ref(),
196 };
197 if let Err(error) = jobs::complete_job_success(
198 &pool,
199 job.id,
200 job.run_number,
201 job.attempt,
202 &context.worker_id,
203 Some(&completion),
204 )
205 .await
206 {
207 let release_conflict = is_workflow_release_conflict_error(&error);
208 let error = WorkerError::CompleteSuccess {
209 job_id: job.id,
210 attempt: job.attempt,
211 source: error,
212 };
213 if release_conflict {
214 warn!(
215 %error,
216 job_id = %job.id,
217 "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
218 );
219 } else {
220 error!(%error, job_id = %job.id, "failed to mark job success");
221 }
222 }
223 }
224 Err(failure) => {
225 if is_lease_maintenance_failure(&failure) {
226 warn!(
227 job_id = %job.id,
228 attempt = job.attempt,
229 failure_code = failure.code,
230 "job processing aborted because durable lease maintenance was lost"
231 );
232 return;
233 }
234
235 let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
236 None
237 } else {
238 Some(retry_delay_ms_for_failure(
239 registry.as_ref(),
240 &job,
241 &failure,
242 ))
243 };
244 let failure_payload = JobFailureUpdate {
245 kind: failure.kind,
246 code: failure.code,
247 message: failure.message.as_ref(),
248 retry_delay_ms,
249 };
250 let dead_letter = dead_letter_info(&job, &failure);
251 if let Err(error) = jobs::complete_job_failure(
252 &pool,
253 job.id,
254 job.run_number,
255 job.attempt,
256 &context.worker_id,
257 &failure_payload,
258 )
259 .await
260 {
261 let release_conflict = is_workflow_release_conflict_error(&error);
262 let error = WorkerError::CompleteFailure {
263 job_id: job.id,
264 attempt: job.attempt,
265 source: error,
266 };
267 if release_conflict {
268 warn!(
269 %error,
270 job_id = %job.id,
271 "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
272 );
273 } else {
274 error!(%error, job_id = %job.id, "failed to mark job failure");
275 }
276 } else if let Some(dead_letter) = dead_letter {
277 warn!(
278 job_id = %job.id,
279 job_type = %job.job_type,
280 run_number = job.run_number,
281 attempt = job.attempt,
282 max_attempts = job.max_attempts,
283 organization_id = ?job.organization_id,
284 worker_id = %context.worker_id,
285 dead_letter_reason = ?dead_letter.reason,
286 failure_kind = ?dead_letter.failure.kind,
287 failure_code = dead_letter.failure.code,
288 failure_message = %dead_letter.failure.message,
289 "job dead lettered after handler failure"
290 );
291 notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
292 .await;
293 }
294 }
295 }
296
297 info!(
298 job_id = %job.id,
299 attempt = job.attempt,
300 run_number = job.run_number,
301 elapsed_ms = start.elapsed().as_millis(),
302 "job processed"
303 );
304 }
305 .instrument(job_span)
306 .await;
307}
308
309async fn mark_job_running_or_abort(
310 pool: &runledger_postgres::DbPool,
311 context: &JobContext,
312 job: &jobs::JobQueueRecord,
313) -> bool {
314 let running_progress = JobProgressUpdate {
315 stage: Some(runledger_core::jobs::JobStage::Running),
316 progress_done: None,
317 progress_total: None,
318 checkpoint: None,
319 };
320
321 let Err(source) = jobs::update_job_progress(
322 pool,
323 job.id,
324 job.run_number,
325 job.attempt,
326 &context.worker_id,
327 &running_progress,
328 )
329 .await
330 else {
331 return true;
332 };
333
334 handle_running_progress_persist_failure(pool, context, job, source).await;
335 false
336}
337
338async fn handle_running_progress_persist_failure(
339 pool: &runledger_postgres::DbPool,
340 context: &JobContext,
341 job: &jobs::JobQueueRecord,
342 source: runledger_postgres::Error,
343) {
344 let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
345 let error = WorkerError::SetRunningProgress {
346 job_id: job.id,
347 attempt: job.attempt,
348 source,
349 };
350
351 if lease_owner_mismatch {
352 warn!(
353 %error,
354 job_id = %job.id,
355 attempt = job.attempt,
356 "aborting job before execution because lease ownership was already lost"
357 );
358 return;
359 }
360
361 match jobs::release_unstarted_job_claim(
362 pool,
363 job.id,
364 job.run_number,
365 job.attempt,
366 &context.worker_id,
367 RUNNING_PROGRESS_PERSIST_FAILED_REASON,
368 UNSTARTED_CLAIM_RETRY_DELAY_MS,
369 )
370 .await
371 {
372 Ok(()) => {
373 warn!(
374 %error,
375 job_id = %job.id,
376 attempt = job.attempt,
377 "running progress could not be persisted; released unstarted claim back to pending"
378 );
379 }
380 Err(release_error) => {
381 let no_longer_releasable =
382 is_unstarted_claim_release_not_applicable_error(&release_error);
383 let release_error = WorkerError::ReleaseUnstartedClaim {
384 job_id: job.id,
385 attempt: job.attempt,
386 source: release_error,
387 };
388 if no_longer_releasable {
389 warn!(
390 %error,
391 %release_error,
392 job_id = %job.id,
393 attempt = job.attempt,
394 "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
395 );
396 return;
397 }
398
399 warn!(
400 %error,
401 %release_error,
402 job_id = %job.id,
403 attempt = job.attempt,
404 "running progress could not be persisted; leaving claim for reaper recovery"
405 );
406 }
407 }
408}
409
410async fn execute_job_handler(
411 registry: Arc<JobRegistry>,
412 context: &JobContext,
413 job: &jobs::JobQueueRecord,
414) -> Result<JobProgress, JobFailure> {
415 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
416 return Err(JobFailure::terminal(
417 "job.handler_not_registered",
418 "No handler is registered for this job type.",
419 ));
420 };
421
422 handler
423 .execute(context.clone(), job.payload.clone())
424 .await?;
425
426 Ok(JobProgress {
427 stage: runledger_core::jobs::JobStage::Completed,
428 progress_done: None,
429 progress_total: None,
430 checkpoint: None,
431 })
432}
433
434async fn execute_job_handler_with_heartbeats(
435 pool: runledger_postgres::DbPool,
436 registry: Arc<JobRegistry>,
437 context: &JobContext,
438 job: &jobs::JobQueueRecord,
439 lease_ttl_seconds: i32,
440) -> Result<JobProgress, JobFailure> {
441 let mut execution =
442 Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
443 let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
444 let mut timeout = Box::pin(sleep_until(timeout_deadline));
445
446 let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
447 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
448 ticker.tick().await;
449
450 loop {
451 tokio::select! {
452 result = &mut execution => {
453 return match result {
454 Ok(result) => result,
455 Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
456 };
457 }
458 _ = &mut timeout => {
459 return Err(JobFailure::timeout(
460 "job.timeout_exceeded",
461 "Job exceeded the configured timeout.",
462 ));
463 }
464 _ = ticker.tick() => {
465 if let Err(error) = jobs::heartbeat_job(
466 &pool,
467 job.id,
468 job.run_number,
469 job.attempt,
470 &context.worker_id,
471 lease_ttl_seconds,
472 )
473 .await
474 {
475 let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
476 let error = WorkerError::Heartbeat {
477 job_id: job.id,
478 attempt: job.attempt,
479 source: error,
480 };
481
482 if lease_owner_mismatch {
483 warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
484 return Err(lease_owner_mismatch_failure());
485 }
486
487 warn!(
488 %error,
489 job_id = %job.id,
490 "aborting job because lease heartbeat could not be persisted"
491 );
492 return Err(lease_maintenance_failure());
493 }
494 }
495 }
496 }
497}
498
499fn lease_owner_mismatch_failure() -> JobFailure {
500 JobFailure::lease_expired(
501 LEASE_OWNER_MISMATCH_CODE,
502 "Job lease ownership was lost during processing.",
503 )
504}
505
506fn lease_maintenance_failure() -> JobFailure {
507 JobFailure::lease_expired(
508 LEASE_MAINTENANCE_FAILED_CODE,
509 "Job lease could not be durably maintained during processing.",
510 )
511}
512
513fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
514 JobFailure::panicked(
515 HANDLER_PANIC_CODE,
516 format!(
517 "Job handler panicked: {}",
518 panic_payload_message(&*panic_payload)
519 ),
520 )
521}
522
/// Extracts a human-readable message from a panic payload.
///
/// `String` and `&'static str` payloads are returned as text; any other payload type yields
/// the fixed placeholder `"non-string panic payload"`.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|text| (*text).to_owned())
        })
        .unwrap_or_else(|| "non-string panic payload".to_owned())
}
534
535fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
536 matches!(
537 error,
538 runledger_postgres::Error::QueryError(query_error)
539 if query_error.code() == expected_code
540 )
541}
542
543fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
544 has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
545}
546
547fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
548 has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
549}
550
551fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
552 has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
553}
554
555fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
556 matches!(
557 failure.code,
558 LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
559 )
560}
561
/// Derives the heartbeat period from the lease TTL: one third of the TTL in whole seconds,
/// never less than one second. Zero or negative TTLs are treated as one second.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    let ttl_seconds = lease_ttl_seconds.max(1);
    let third_of_ttl = (ttl_seconds / 3).max(1);
    // `third_of_ttl` is at least 1, so taking the unsigned magnitude is lossless.
    Duration::from_secs(u64::from(third_of_ttl.unsigned_abs()))
}
568
569fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
570 matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
571}
572
573fn retry_delay_ms_for_failure(
574 registry: &JobRegistry,
575 job: &jobs::JobQueueRecord,
576 failure: &JobFailure,
577) -> i32 {
578 registry
579 .retry_delay_override(job.job_type.as_borrowed(), failure.code)
580 .unwrap_or_else(|| compute_retry_delay_ms(job.attempt, job.id))
581}
582
583fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
584 let reason = if is_non_retryable_failure_kind(failure.kind) {
585 Some(JobDeadLetterReason::FailureKindNonRetryable)
586 } else if job.attempt >= job.max_attempts {
587 Some(JobDeadLetterReason::AttemptsExhausted)
588 } else {
589 None
590 }?;
591
592 Some(JobDeadLetterInfo::new(
593 failure.clone(),
594 reason,
595 Some(job.max_attempts),
596 ))
597}
598
599async fn notify_handler_of_dead_letter(
600 registry: &JobRegistry,
601 context: &JobContext,
602 job: &jobs::JobQueueRecord,
603 dead_letter: JobDeadLetterInfo,
604) {
605 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
606 return;
607 };
608 let context = context.clone();
609 let payload = job.payload.clone();
610
611 let hook_task = tokio::spawn(async move {
612 tokio::time::timeout(
613 TERMINAL_HOOK_TIMEOUT,
614 handler.on_dead_letter(context, payload, dead_letter),
615 )
616 .await
617 .is_ok()
618 });
619 match hook_task.await {
620 Ok(true) => {}
621 Ok(false) => {
622 warn!(
623 job_id = %job.id,
624 job_type = %job.job_type,
625 run_number = job.run_number,
626 attempt = job.attempt,
627 timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
628 "dead-letter hook timed out; continuing worker job task"
629 );
630 }
631 Err(error) => log_dead_letter_hook_join_error(job, error),
632 }
633}
634
635fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
636 if error.is_panic() {
637 warn!(
638 job_id = %job.id,
639 job_type = %job.job_type,
640 run_number = job.run_number,
641 attempt = job.attempt,
642 error = %error,
643 "dead-letter hook panicked; continuing worker job task"
644 );
645 } else if error.is_cancelled() {
646 warn!(
647 job_id = %job.id,
648 job_type = %job.job_type,
649 run_number = job.run_number,
650 attempt = job.attempt,
651 error = %error,
652 "dead-letter hook was cancelled; continuing worker job task"
653 );
654 } else {
655 warn!(
656 job_id = %job.id,
657 job_type = %job.job_type,
658 run_number = job.run_number,
659 attempt = job.attempt,
660 error = %error,
661 "dead-letter hook join failed; continuing worker job task"
662 );
663 }
664}
665
666fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
667 let exp = attempt.clamp(1, 10) as u32;
668 let base_ms: i64 = 5_000;
669 let raw = base_ms * (1_i64 << exp);
670 let capped = raw.min(300_000);
671 let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
672 (capped + jitter).max(1_000) as i32
673}
674
675#[cfg(test)]
676mod tests;