1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8 JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::WorkerError;
17use crate::config::JobsConfig;
18use crate::registry::JobRegistry;
19
/// Placeholder worker id used when a claimed job record carries no `worker_id`.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";

// Error codes. Some are compared against database query error codes; others are attached to the
// `JobFailure`s this worker constructs.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
const HANDLER_PANIC_CODE: &str = "job.handler_panic";
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";

/// Reason passed when releasing an unstarted claim whose `Running` stage could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Retry delay (milliseconds) passed when releasing an unstarted claim back to pending.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1000;

/// Upper bound on how long a dead-letter hook may run; kept short in test builds.
#[cfg(test)]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(100);
#[cfg(not(test))]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(10_000);
33
34pub async fn run_worker_loop(
35 pool: runledger_postgres::DbPool,
36 registry: JobRegistry,
37 config: JobsConfig,
38 mut shutdown: watch::Receiver<bool>,
39) {
40 let registry = Arc::new(registry);
41 let claimable_job_types = registry.registered_types();
42 let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
43 let mut join_set: JoinSet<()> = JoinSet::new();
44
45 loop {
46 drain_finished_tasks(&mut join_set).await;
47
48 if shutdown_requested_or_closed(&shutdown) {
49 break;
50 }
51
52 if claimable_job_types.is_empty() {
53 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
54 break;
55 }
56 continue;
57 }
58
59 let available = semaphore.available_permits();
60 if available == 0 {
61 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
62 break;
63 }
64 continue;
65 }
66
67 let claim_limit = min(available, config.claim_batch_size as usize);
68 let claimed = match jobs::claim_prestart_jobs_for_types(
69 &pool,
70 &config.worker_id,
71 config.lease_ttl_seconds,
72 claim_limit as i64,
73 &claimable_job_types,
74 )
75 .await
76 {
77 Ok(claimed) => claimed,
78 Err(error) => {
79 let error = WorkerError::ClaimJobs {
80 worker_id: config.worker_id.clone(),
81 source: error,
82 };
83 warn!(%error, "worker claim failed");
84 Vec::new()
85 }
86 };
87
88 if claimed.is_empty() {
89 wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
90 continue;
91 }
92
93 let claimed_len = claimed.len();
94 for job in claimed {
95 let permit = match Arc::clone(&semaphore).acquire_owned().await {
96 Ok(permit) => permit,
97 Err(_) => break,
98 };
99 let pool_clone = pool.clone();
100 let registry_clone = Arc::clone(®istry);
101 let lease_ttl_seconds = config.lease_ttl_seconds;
102 join_set.spawn(async move {
103 let _permit = permit;
104 process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
105 });
106 }
107
108 if claimed_len == claim_limit {
109 continue;
110 }
111
112 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
113 break;
114 }
115 }
116
117 info!("worker shutdown requested; draining in-flight jobs");
118 while join_set.join_next().await.is_some() {}
119}
120
121async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
122 while let Some(result) = join_set.try_join_next() {
123 if let Err(error) = result {
124 error!(%error, "job task crashed");
125 }
126 }
127}
128
129fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
130 *shutdown.borrow() || shutdown.has_changed().is_err()
131}
132
133async fn wait_for_shutdown_or_poll(
134 shutdown: &mut watch::Receiver<bool>,
135 poll_interval: Duration,
136) -> bool {
137 tokio::select! {
138 changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
139 _ = sleep(poll_interval) => false,
140 }
141}
142
143async fn process_claimed_job(
144 pool: runledger_postgres::DbPool,
145 registry: Arc<JobRegistry>,
146 job: jobs::JobQueueRecord,
147 lease_ttl_seconds: i32,
148) {
149 let worker_id = job
150 .worker_id
151 .clone()
152 .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
153
154 let job_span = info_span!(
155 "job",
156 sentry.name = %job.job_type,
157 sentry.op = "runledger.job",
158 job_id = %job.id,
159 job_type = %job.job_type,
160 run_number = job.run_number,
161 attempt = job.attempt,
162 organization_id = ?job.organization_id,
163 worker_id = %worker_id,
164 );
165 async {
166 let start = Instant::now();
167 let context = JobContext {
168 job_id: job.id,
169 run_number: job.run_number,
170 attempt: job.attempt,
171 organization_id: job.organization_id,
172 worker_id: worker_id.clone(),
173 };
174
175 if !mark_job_running_or_abort(&pool, &context, &job).await {
176 return;
177 }
178
179 match execute_job_handler_with_heartbeats(
180 pool.clone(),
181 Arc::clone(®istry),
182 &context,
183 &job,
184 lease_ttl_seconds,
185 )
186 .await
187 {
188 Ok(progress) => {
189 let completion = JobProgressUpdate {
190 stage: Some(progress.stage),
191 progress_done: progress.progress_done,
192 progress_total: progress.progress_total,
193 checkpoint: progress.checkpoint.as_ref(),
194 };
195 if let Err(error) = jobs::complete_job_success(
196 &pool,
197 job.id,
198 job.run_number,
199 job.attempt,
200 &context.worker_id,
201 Some(&completion),
202 )
203 .await
204 {
205 let release_conflict = is_workflow_release_conflict_error(&error);
206 let error = WorkerError::CompleteSuccess {
207 job_id: job.id,
208 attempt: job.attempt,
209 source: error,
210 };
211 if release_conflict {
212 warn!(
213 %error,
214 job_id = %job.id,
215 "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
216 );
217 } else {
218 error!(%error, job_id = %job.id, "failed to mark job success");
219 }
220 }
221 }
222 Err(failure) => {
223 if is_lease_maintenance_failure(&failure) {
224 warn!(
225 job_id = %job.id,
226 attempt = job.attempt,
227 failure_code = failure.code,
228 "job processing aborted because durable lease maintenance was lost"
229 );
230 return;
231 }
232
233 let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
234 None
235 } else {
236 Some(compute_retry_delay_ms(job.attempt, job.id))
237 };
238 let failure_payload = JobFailureUpdate {
239 kind: failure.kind,
240 code: failure.code,
241 message: failure.message.as_ref(),
242 retry_delay_ms,
243 };
244 let dead_letter = dead_letter_info(&job, &failure);
245 if let Err(error) = jobs::complete_job_failure(
246 &pool,
247 job.id,
248 job.run_number,
249 job.attempt,
250 &context.worker_id,
251 &failure_payload,
252 )
253 .await
254 {
255 let release_conflict = is_workflow_release_conflict_error(&error);
256 let error = WorkerError::CompleteFailure {
257 job_id: job.id,
258 attempt: job.attempt,
259 source: error,
260 };
261 if release_conflict {
262 warn!(
263 %error,
264 job_id = %job.id,
265 "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
266 );
267 } else {
268 error!(%error, job_id = %job.id, "failed to mark job failure");
269 }
270 } else if let Some(dead_letter) = dead_letter {
271 warn!(
272 job_id = %job.id,
273 job_type = %job.job_type,
274 run_number = job.run_number,
275 attempt = job.attempt,
276 max_attempts = job.max_attempts,
277 organization_id = ?job.organization_id,
278 worker_id = %context.worker_id,
279 dead_letter_reason = ?dead_letter.reason,
280 failure_kind = ?dead_letter.failure.kind,
281 failure_code = dead_letter.failure.code,
282 failure_message = %dead_letter.failure.message,
283 "job dead lettered after handler failure"
284 );
285 notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
286 .await;
287 }
288 }
289 }
290
291 info!(
292 job_id = %job.id,
293 attempt = job.attempt,
294 run_number = job.run_number,
295 elapsed_ms = start.elapsed().as_millis(),
296 "job processed"
297 );
298 }
299 .instrument(job_span)
300 .await;
301}
302
303async fn mark_job_running_or_abort(
304 pool: &runledger_postgres::DbPool,
305 context: &JobContext,
306 job: &jobs::JobQueueRecord,
307) -> bool {
308 let running_progress = JobProgressUpdate {
309 stage: Some(runledger_core::jobs::JobStage::Running),
310 progress_done: None,
311 progress_total: None,
312 checkpoint: None,
313 };
314
315 let Err(source) = jobs::update_job_progress(
316 pool,
317 job.id,
318 job.run_number,
319 job.attempt,
320 &context.worker_id,
321 &running_progress,
322 )
323 .await
324 else {
325 return true;
326 };
327
328 handle_running_progress_persist_failure(pool, context, job, source).await;
329 false
330}
331
332async fn handle_running_progress_persist_failure(
333 pool: &runledger_postgres::DbPool,
334 context: &JobContext,
335 job: &jobs::JobQueueRecord,
336 source: runledger_postgres::Error,
337) {
338 let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
339 let error = WorkerError::SetRunningProgress {
340 job_id: job.id,
341 attempt: job.attempt,
342 source,
343 };
344
345 if lease_owner_mismatch {
346 warn!(
347 %error,
348 job_id = %job.id,
349 attempt = job.attempt,
350 "aborting job before execution because lease ownership was already lost"
351 );
352 return;
353 }
354
355 match jobs::release_unstarted_job_claim(
356 pool,
357 job.id,
358 job.run_number,
359 job.attempt,
360 &context.worker_id,
361 RUNNING_PROGRESS_PERSIST_FAILED_REASON,
362 UNSTARTED_CLAIM_RETRY_DELAY_MS,
363 )
364 .await
365 {
366 Ok(()) => {
367 warn!(
368 %error,
369 job_id = %job.id,
370 attempt = job.attempt,
371 "running progress could not be persisted; released unstarted claim back to pending"
372 );
373 }
374 Err(release_error) => {
375 let no_longer_releasable =
376 is_unstarted_claim_release_not_applicable_error(&release_error);
377 let release_error = WorkerError::ReleaseUnstartedClaim {
378 job_id: job.id,
379 attempt: job.attempt,
380 source: release_error,
381 };
382 if no_longer_releasable {
383 warn!(
384 %error,
385 %release_error,
386 job_id = %job.id,
387 attempt = job.attempt,
388 "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
389 );
390 return;
391 }
392
393 warn!(
394 %error,
395 %release_error,
396 job_id = %job.id,
397 attempt = job.attempt,
398 "running progress could not be persisted; leaving claim for reaper recovery"
399 );
400 }
401 }
402}
403
404async fn execute_job_handler(
405 registry: Arc<JobRegistry>,
406 context: &JobContext,
407 job: &jobs::JobQueueRecord,
408) -> Result<JobProgress, JobFailure> {
409 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
410 return Err(JobFailure::terminal(
411 "job.handler_not_registered",
412 "No handler is registered for this job type.",
413 ));
414 };
415
416 handler
417 .execute(context.clone(), job.payload.clone())
418 .await?;
419
420 Ok(JobProgress {
421 stage: runledger_core::jobs::JobStage::Completed,
422 progress_done: None,
423 progress_total: None,
424 checkpoint: None,
425 })
426}
427
428async fn execute_job_handler_with_heartbeats(
429 pool: runledger_postgres::DbPool,
430 registry: Arc<JobRegistry>,
431 context: &JobContext,
432 job: &jobs::JobQueueRecord,
433 lease_ttl_seconds: i32,
434) -> Result<JobProgress, JobFailure> {
435 let mut execution =
436 Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
437 let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
438 let mut timeout = Box::pin(sleep_until(timeout_deadline));
439
440 let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
441 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
442 ticker.tick().await;
443
444 loop {
445 tokio::select! {
446 result = &mut execution => {
447 return match result {
448 Ok(result) => result,
449 Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
450 };
451 }
452 _ = &mut timeout => {
453 return Err(JobFailure::timeout(
454 "job.timeout_exceeded",
455 "Job exceeded the configured timeout.",
456 ));
457 }
458 _ = ticker.tick() => {
459 if let Err(error) = jobs::heartbeat_job(
460 &pool,
461 job.id,
462 job.run_number,
463 job.attempt,
464 &context.worker_id,
465 lease_ttl_seconds,
466 )
467 .await
468 {
469 let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
470 let error = WorkerError::Heartbeat {
471 job_id: job.id,
472 attempt: job.attempt,
473 source: error,
474 };
475
476 if lease_owner_mismatch {
477 warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
478 return Err(lease_owner_mismatch_failure());
479 }
480
481 warn!(
482 %error,
483 job_id = %job.id,
484 "aborting job because lease heartbeat could not be persisted"
485 );
486 return Err(lease_maintenance_failure());
487 }
488 }
489 }
490 }
491}
492
493fn lease_owner_mismatch_failure() -> JobFailure {
494 JobFailure::lease_expired(
495 LEASE_OWNER_MISMATCH_CODE,
496 "Job lease ownership was lost during processing.",
497 )
498}
499
500fn lease_maintenance_failure() -> JobFailure {
501 JobFailure::lease_expired(
502 LEASE_MAINTENANCE_FAILED_CODE,
503 "Job lease could not be durably maintained during processing.",
504 )
505}
506
507fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
508 JobFailure::panicked(
509 HANDLER_PANIC_CODE,
510 format!(
511 "Job handler panicked: {}",
512 panic_payload_message(&*panic_payload)
513 ),
514 )
515}
516
/// Extracts a human-readable message from a panic payload.
///
/// `panic!` payloads are a `String` (formatted messages) or a `&'static str` (literal
/// messages). Any other payload type yields a fixed placeholder.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|literal| (*literal).to_owned())
        })
        .unwrap_or_else(|| "non-string panic payload".to_owned())
}
528
529fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
530 matches!(
531 error,
532 runledger_postgres::Error::QueryError(query_error)
533 if query_error.code() == expected_code
534 )
535}
536
537fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
538 has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
539}
540
541fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
542 has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
543}
544
545fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
546 has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
547}
548
549fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
550 matches!(
551 failure.code,
552 LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
553 )
554}
555
/// Returns how often a running job's lease is renewed.
///
/// For TTLs of two seconds or more, the interval is one third of the TTL rounded down to whole
/// seconds, but at least one second. Non-positive TTLs are treated as one second.
///
/// For a one-second TTL, the whole-second rule would yield an interval equal to the TTL, so the
/// lease would lapse exactly when the renewal fires. In that case a third of the TTL with
/// millisecond precision (333 ms) is used instead. The result is therefore always strictly
/// shorter than the (clamped) TTL, and never zero, which `tokio::time::interval` would reject.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    let ttl_seconds = lease_ttl_seconds.max(1) as u64;
    let whole_seconds = (ttl_seconds / 3).max(1);
    if whole_seconds < ttl_seconds {
        Duration::from_secs(whole_seconds)
    } else {
        Duration::from_millis(ttl_seconds * 1_000 / 3)
    }
}
562
563fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
564 matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
565}
566
567fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
568 let reason = if is_non_retryable_failure_kind(failure.kind) {
569 Some(JobDeadLetterReason::FailureKindNonRetryable)
570 } else if job.attempt >= job.max_attempts {
571 Some(JobDeadLetterReason::AttemptsExhausted)
572 } else {
573 None
574 }?;
575
576 Some(JobDeadLetterInfo::new(
577 failure.clone(),
578 reason,
579 Some(job.max_attempts),
580 ))
581}
582
583async fn notify_handler_of_dead_letter(
584 registry: &JobRegistry,
585 context: &JobContext,
586 job: &jobs::JobQueueRecord,
587 dead_letter: JobDeadLetterInfo,
588) {
589 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
590 return;
591 };
592 let context = context.clone();
593 let payload = job.payload.clone();
594
595 let hook_task = tokio::spawn(async move {
596 tokio::time::timeout(
597 TERMINAL_HOOK_TIMEOUT,
598 handler.on_dead_letter(context, payload, dead_letter),
599 )
600 .await
601 .is_ok()
602 });
603 match hook_task.await {
604 Ok(true) => {}
605 Ok(false) => {
606 warn!(
607 job_id = %job.id,
608 job_type = %job.job_type,
609 run_number = job.run_number,
610 attempt = job.attempt,
611 timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
612 "dead-letter hook timed out; continuing worker job task"
613 );
614 }
615 Err(error) => log_dead_letter_hook_join_error(job, error),
616 }
617}
618
619fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
620 if error.is_panic() {
621 warn!(
622 job_id = %job.id,
623 job_type = %job.job_type,
624 run_number = job.run_number,
625 attempt = job.attempt,
626 error = %error,
627 "dead-letter hook panicked; continuing worker job task"
628 );
629 } else if error.is_cancelled() {
630 warn!(
631 job_id = %job.id,
632 job_type = %job.job_type,
633 run_number = job.run_number,
634 attempt = job.attempt,
635 error = %error,
636 "dead-letter hook was cancelled; continuing worker job task"
637 );
638 } else {
639 warn!(
640 job_id = %job.id,
641 job_type = %job.job_type,
642 run_number = job.run_number,
643 attempt = job.attempt,
644 error = %error,
645 "dead-letter hook join failed; continuing worker job task"
646 );
647 }
648}
649
650fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
651 let exp = attempt.clamp(1, 10) as u32;
652 let base_ms: i64 = 5_000;
653 let raw = base_ms * (1_i64 << exp);
654 let capped = raw.min(300_000);
655 let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
656 (capped + jitter).max(1_000) as i32
657}
658
659#[cfg(test)]
660mod tests;