1use std::any::Any;
2use std::cmp::min;
3use std::panic::AssertUnwindSafe;
4use std::sync::Arc;
5
6use futures_util::FutureExt;
7use runledger_core::jobs::{
8 JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobFailureKind, JobProgress,
9};
10use runledger_postgres::jobs::{self, JobFailureUpdate, JobProgressUpdate};
11use tokio::sync::{Semaphore, watch};
12use tokio::task::JoinSet;
13use tokio::time::{Duration, Instant, MissedTickBehavior, sleep, sleep_until};
14use tracing::{Instrument, error, info, info_span, warn};
15
16use crate::RuntimeLoopExit;
17use crate::WorkerError;
18use crate::config::JobsConfig;
19use crate::registry::JobRegistry;
20
/// Worker id used for the job context and logs when a claimed record has no `worker_id`.
const UNKNOWN_WORKER_ID: &str = "unknown-worker";

/// Query-error code (also reused as a failure code) signalling that lease ownership was lost.
const LEASE_OWNER_MISMATCH_CODE: &str = "job.lease_owner_mismatch";
/// Failure code used when a lease heartbeat could not be persisted.
const LEASE_MAINTENANCE_FAILED_CODE: &str = "job.lease_maintenance_failed";
/// Query-error code returned when completing a job conflicts with workflow release.
const WORKFLOW_RELEASE_CONFLICT_CODE: &str = "workflow.release_conflict";
/// Failure code recorded when a job handler panics.
const HANDLER_PANIC_CODE: &str = "job.handler_panic";

/// Reason passed when releasing a claim whose `Running` stage could not be persisted.
const RUNNING_PROGRESS_PERSIST_FAILED_REASON: &str = "RUNNING_PROGRESS_PERSIST_FAILED";
/// Query-error code meaning an unstarted claim can no longer be released.
const UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE: &str =
    "job.unstarted_claim_release_not_applicable";
/// Retry delay, in milliseconds, passed when releasing an unstarted claim.
const UNSTARTED_CLAIM_RETRY_DELAY_MS: i32 = 1_000;

/// Upper bound on how long a handler's dead-letter hook may run.
/// A much shorter bound is compiled in under `cfg(test)`.
#[cfg(test)]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(100);
#[cfg(not(test))]
const TERMINAL_HOOK_TIMEOUT: Duration = Duration::from_millis(10_000);
36
37pub async fn run_worker_loop(
38 pool: runledger_postgres::DbPool,
39 registry: JobRegistry,
40 config: JobsConfig,
41 mut shutdown: watch::Receiver<bool>,
42) -> RuntimeLoopExit {
43 let registry = Arc::new(registry);
44 let claimable_job_types = registry.registered_types();
45 let semaphore = Arc::new(Semaphore::new(config.max_global_concurrency));
46 let mut join_set: JoinSet<()> = JoinSet::new();
47
48 loop {
49 drain_finished_tasks(&mut join_set).await;
50
51 if shutdown_requested_or_closed(&shutdown) {
52 return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
53 }
54
55 if claimable_job_types.is_empty() {
56 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
57 return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
58 }
59 continue;
60 }
61
62 let available = semaphore.available_permits();
63 if available == 0 {
64 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
65 return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
66 }
67 continue;
68 }
69
70 let claim_limit = min(available, config.claim_batch_size as usize);
71 let claimed = match jobs::claim_prestart_jobs_for_types(
72 &pool,
73 &config.worker_id,
74 config.lease_ttl_seconds,
75 claim_limit as i64,
76 &claimable_job_types,
77 )
78 .await
79 {
80 Ok(claimed) => claimed,
81 Err(error) => {
82 let error = WorkerError::ClaimJobs {
83 worker_id: config.worker_id.clone(),
84 source: error,
85 };
86 warn!(%error, "worker claim failed");
87 Vec::new()
88 }
89 };
90
91 if claimed.is_empty() {
92 wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await;
93 continue;
94 }
95
96 let claimed_len = claimed.len();
97 for job in claimed {
98 let permit = match Arc::clone(&semaphore).acquire_owned().await {
99 Ok(permit) => permit,
100 Err(_) => {
101 warn!("worker semaphore closed; stopping worker loop");
105 return drain_in_flight_jobs(join_set, RuntimeLoopExit::Completed).await;
106 }
107 };
108 let pool_clone = pool.clone();
109 let registry_clone = Arc::clone(®istry);
110 let lease_ttl_seconds = config.lease_ttl_seconds;
111 join_set.spawn(async move {
112 let _permit = permit;
113 process_claimed_job(pool_clone, registry_clone, job, lease_ttl_seconds).await;
114 });
115 }
116
117 if claimed_len == claim_limit {
118 continue;
119 }
120
121 if wait_for_shutdown_or_poll(&mut shutdown, config.poll_interval).await {
122 return drain_in_flight_jobs(join_set, RuntimeLoopExit::Shutdown).await;
123 }
124 }
125}
126
127async fn drain_in_flight_jobs(mut join_set: JoinSet<()>, exit: RuntimeLoopExit) -> RuntimeLoopExit {
128 if !join_set.is_empty() {
129 match exit {
130 RuntimeLoopExit::Shutdown => {
131 info!("worker shutdown requested; draining in-flight jobs")
132 }
133 RuntimeLoopExit::Completed => {
134 warn!("worker loop completed before shutdown; draining in-flight jobs");
135 }
136 }
137 }
138 while join_set.join_next().await.is_some() {}
139 exit
140}
141
142async fn drain_finished_tasks(join_set: &mut JoinSet<()>) {
143 while let Some(result) = join_set.try_join_next() {
144 if let Err(error) = result {
145 error!(%error, "job task crashed");
146 }
147 }
148}
149
150fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
151 *shutdown.borrow() || shutdown.has_changed().is_err()
152}
153
154async fn wait_for_shutdown_or_poll(
155 shutdown: &mut watch::Receiver<bool>,
156 poll_interval: Duration,
157) -> bool {
158 tokio::select! {
159 changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
160 _ = sleep(poll_interval) => false,
161 }
162}
163
164async fn process_claimed_job(
165 pool: runledger_postgres::DbPool,
166 registry: Arc<JobRegistry>,
167 job: jobs::JobQueueRecord,
168 lease_ttl_seconds: i32,
169) {
170 let worker_id = job
171 .worker_id
172 .clone()
173 .unwrap_or_else(|| UNKNOWN_WORKER_ID.to_owned());
174
175 let job_span = info_span!(
176 "job",
177 sentry.name = %job.job_type,
178 sentry.op = "runledger.job",
179 job_id = %job.id,
180 job_type = %job.job_type,
181 run_number = job.run_number,
182 attempt = job.attempt,
183 organization_id = ?job.organization_id,
184 worker_id = %worker_id,
185 );
186 async {
187 let start = Instant::now();
188 let context = JobContext {
189 job_id: job.id,
190 run_number: job.run_number,
191 attempt: job.attempt,
192 organization_id: job.organization_id,
193 worker_id: worker_id.clone(),
194 };
195
196 if !mark_job_running_or_abort(&pool, &context, &job).await {
197 return;
198 }
199
200 match execute_job_handler_with_heartbeats(
201 pool.clone(),
202 Arc::clone(®istry),
203 &context,
204 &job,
205 lease_ttl_seconds,
206 )
207 .await
208 {
209 Ok(progress) => {
210 let completion = JobProgressUpdate {
211 stage: Some(progress.stage),
212 progress_done: progress.progress_done,
213 progress_total: progress.progress_total,
214 checkpoint: progress.checkpoint.as_ref(),
215 };
216 if let Err(error) = jobs::complete_job_success(
217 &pool,
218 job.id,
219 job.run_number,
220 job.attempt,
221 &context.worker_id,
222 Some(&completion),
223 )
224 .await
225 {
226 let release_conflict = is_workflow_release_conflict_error(&error);
227 let error = WorkerError::CompleteSuccess {
228 job_id: job.id,
229 attempt: job.attempt,
230 source: error,
231 };
232 if release_conflict {
233 warn!(
234 %error,
235 job_id = %job.id,
236 "job success completion conflicted with workflow cancellation; leaving lease for reaper recovery"
237 );
238 } else {
239 error!(%error, job_id = %job.id, "failed to mark job success");
240 }
241 }
242 }
243 Err(failure) => {
244 if is_lease_maintenance_failure(&failure) {
245 warn!(
246 job_id = %job.id,
247 attempt = job.attempt,
248 failure_code = failure.code,
249 "job processing aborted because durable lease maintenance was lost"
250 );
251 return;
252 }
253
254 let retry_delay_ms = if is_non_retryable_failure_kind(failure.kind) {
255 None
256 } else {
257 Some(retry_delay_ms_for_failure(
258 registry.as_ref(),
259 &job,
260 &failure,
261 ))
262 };
263 let failure_payload = JobFailureUpdate {
264 kind: failure.kind,
265 code: failure.code,
266 message: failure.message.as_ref(),
267 retry_delay_ms,
268 };
269 let dead_letter = dead_letter_info(&job, &failure);
270 if let Err(error) = jobs::complete_job_failure(
271 &pool,
272 job.id,
273 job.run_number,
274 job.attempt,
275 &context.worker_id,
276 &failure_payload,
277 )
278 .await
279 {
280 let release_conflict = is_workflow_release_conflict_error(&error);
281 let error = WorkerError::CompleteFailure {
282 job_id: job.id,
283 attempt: job.attempt,
284 source: error,
285 };
286 if release_conflict {
287 warn!(
288 %error,
289 job_id = %job.id,
290 "job failure completion conflicted with workflow cancellation; leaving lease for reaper recovery"
291 );
292 } else {
293 error!(%error, job_id = %job.id, "failed to mark job failure");
294 }
295 } else if let Some(dead_letter) = dead_letter {
296 warn!(
297 job_id = %job.id,
298 job_type = %job.job_type,
299 run_number = job.run_number,
300 attempt = job.attempt,
301 max_attempts = job.max_attempts,
302 organization_id = ?job.organization_id,
303 worker_id = %context.worker_id,
304 dead_letter_reason = ?dead_letter.reason,
305 failure_kind = ?dead_letter.failure.kind,
306 failure_code = dead_letter.failure.code,
307 failure_message = %dead_letter.failure.message,
308 "job dead lettered after handler failure"
309 );
310 notify_handler_of_dead_letter(registry.as_ref(), &context, &job, dead_letter)
311 .await;
312 }
313 }
314 }
315
316 info!(
317 job_id = %job.id,
318 attempt = job.attempt,
319 run_number = job.run_number,
320 elapsed_ms = start.elapsed().as_millis(),
321 "job processed"
322 );
323 }
324 .instrument(job_span)
325 .await;
326}
327
328async fn mark_job_running_or_abort(
329 pool: &runledger_postgres::DbPool,
330 context: &JobContext,
331 job: &jobs::JobQueueRecord,
332) -> bool {
333 let running_progress = JobProgressUpdate {
334 stage: Some(runledger_core::jobs::JobStage::Running),
335 progress_done: None,
336 progress_total: None,
337 checkpoint: None,
338 };
339
340 let Err(source) = jobs::update_job_progress(
341 pool,
342 job.id,
343 job.run_number,
344 job.attempt,
345 &context.worker_id,
346 &running_progress,
347 )
348 .await
349 else {
350 return true;
351 };
352
353 handle_running_progress_persist_failure(pool, context, job, source).await;
354 false
355}
356
357async fn handle_running_progress_persist_failure(
358 pool: &runledger_postgres::DbPool,
359 context: &JobContext,
360 job: &jobs::JobQueueRecord,
361 source: runledger_postgres::Error,
362) {
363 let lease_owner_mismatch = is_lease_owner_mismatch_error(&source);
364 let error = WorkerError::SetRunningProgress {
365 job_id: job.id,
366 attempt: job.attempt,
367 source,
368 };
369
370 if lease_owner_mismatch {
371 warn!(
372 %error,
373 job_id = %job.id,
374 attempt = job.attempt,
375 "aborting job before execution because lease ownership was already lost"
376 );
377 return;
378 }
379
380 match jobs::release_unstarted_job_claim(
381 pool,
382 job.id,
383 job.run_number,
384 job.attempt,
385 &context.worker_id,
386 RUNNING_PROGRESS_PERSIST_FAILED_REASON,
387 UNSTARTED_CLAIM_RETRY_DELAY_MS,
388 )
389 .await
390 {
391 Ok(()) => {
392 warn!(
393 %error,
394 job_id = %job.id,
395 attempt = job.attempt,
396 "running progress could not be persisted; released unstarted claim back to pending"
397 );
398 }
399 Err(release_error) => {
400 let no_longer_releasable =
401 is_unstarted_claim_release_not_applicable_error(&release_error);
402 let release_error = WorkerError::ReleaseUnstartedClaim {
403 job_id: job.id,
404 attempt: job.attempt,
405 source: release_error,
406 };
407 if no_longer_releasable {
408 warn!(
409 %error,
410 %release_error,
411 job_id = %job.id,
412 attempt = job.attempt,
413 "running progress could not be persisted; unstarted release no longer applies and the job will continue under the current lease owner"
414 );
415 return;
416 }
417
418 warn!(
419 %error,
420 %release_error,
421 job_id = %job.id,
422 attempt = job.attempt,
423 "running progress could not be persisted; leaving claim for reaper recovery"
424 );
425 }
426 }
427}
428
429async fn execute_job_handler(
430 registry: Arc<JobRegistry>,
431 context: &JobContext,
432 job: &jobs::JobQueueRecord,
433) -> Result<JobProgress, JobFailure> {
434 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
435 return Err(JobFailure::terminal(
436 "job.handler_not_registered",
437 "No handler is registered for this job type.",
438 ));
439 };
440
441 handler
442 .execute(context.clone(), job.payload.clone())
443 .await?;
444
445 Ok(JobProgress {
446 stage: runledger_core::jobs::JobStage::Completed,
447 progress_done: None,
448 progress_total: None,
449 checkpoint: None,
450 })
451}
452
453async fn execute_job_handler_with_heartbeats(
454 pool: runledger_postgres::DbPool,
455 registry: Arc<JobRegistry>,
456 context: &JobContext,
457 job: &jobs::JobQueueRecord,
458 lease_ttl_seconds: i32,
459) -> Result<JobProgress, JobFailure> {
460 let mut execution =
461 Box::pin(AssertUnwindSafe(execute_job_handler(registry, context, job)).catch_unwind());
462 let timeout_deadline = Instant::now() + Duration::from_secs(job.timeout_seconds.max(1) as u64);
463 let mut timeout = Box::pin(sleep_until(timeout_deadline));
464
465 let mut ticker = tokio::time::interval(heartbeat_interval(lease_ttl_seconds));
466 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
467 ticker.tick().await;
468
469 loop {
470 tokio::select! {
471 result = &mut execution => {
472 return match result {
473 Ok(result) => result,
474 Err(panic_payload) => Err(handler_panic_failure(panic_payload)),
475 };
476 }
477 _ = &mut timeout => {
478 return Err(JobFailure::timeout(
479 "job.timeout_exceeded",
480 "Job exceeded the configured timeout.",
481 ));
482 }
483 _ = ticker.tick() => {
484 if let Err(error) = jobs::heartbeat_job(
485 &pool,
486 job.id,
487 job.run_number,
488 job.attempt,
489 &context.worker_id,
490 lease_ttl_seconds,
491 )
492 .await
493 {
494 let lease_owner_mismatch = is_lease_owner_mismatch_error(&error);
495 let error = WorkerError::Heartbeat {
496 job_id: job.id,
497 attempt: job.attempt,
498 source: error,
499 };
500
501 if lease_owner_mismatch {
502 warn!(%error, job_id = %job.id, "job heartbeat lost lease ownership");
503 return Err(lease_owner_mismatch_failure());
504 }
505
506 warn!(
507 %error,
508 job_id = %job.id,
509 "aborting job because lease heartbeat could not be persisted"
510 );
511 return Err(lease_maintenance_failure());
512 }
513 }
514 }
515 }
516}
517
518fn lease_owner_mismatch_failure() -> JobFailure {
519 JobFailure::lease_expired(
520 LEASE_OWNER_MISMATCH_CODE,
521 "Job lease ownership was lost during processing.",
522 )
523}
524
525fn lease_maintenance_failure() -> JobFailure {
526 JobFailure::lease_expired(
527 LEASE_MAINTENANCE_FAILED_CODE,
528 "Job lease could not be durably maintained during processing.",
529 )
530}
531
532fn handler_panic_failure(panic_payload: Box<dyn Any + Send>) -> JobFailure {
533 JobFailure::panicked(
534 HANDLER_PANIC_CODE,
535 format!(
536 "Job handler panicked: {}",
537 panic_payload_message(&*panic_payload)
538 ),
539 )
540}
541
/// Extracts a human-readable message from a panic payload.
///
/// `String` and `&'static str` payloads (what `panic!` produces) are returned verbatim.
/// Anything else yields `"non-string panic payload"`.
fn panic_payload_message(panic_payload: &(dyn Any + Send)) -> String {
    panic_payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            panic_payload
                .downcast_ref::<&'static str>()
                .map(|text| (*text).to_string())
        })
        .unwrap_or_else(|| "non-string panic payload".to_string())
}
553
554fn has_query_error_code(error: &runledger_postgres::Error, expected_code: &str) -> bool {
555 matches!(
556 error,
557 runledger_postgres::Error::QueryError(query_error)
558 if query_error.code() == expected_code
559 )
560}
561
562fn is_lease_owner_mismatch_error(error: &runledger_postgres::Error) -> bool {
563 has_query_error_code(error, LEASE_OWNER_MISMATCH_CODE)
564}
565
566fn is_unstarted_claim_release_not_applicable_error(error: &runledger_postgres::Error) -> bool {
567 has_query_error_code(error, UNSTARTED_CLAIM_RELEASE_NOT_APPLICABLE_CODE)
568}
569
570fn is_workflow_release_conflict_error(error: &runledger_postgres::Error) -> bool {
571 has_query_error_code(error, WORKFLOW_RELEASE_CONFLICT_CODE)
572}
573
574fn is_lease_maintenance_failure(failure: &JobFailure) -> bool {
575 matches!(
576 failure.code,
577 LEASE_OWNER_MISMATCH_CODE | LEASE_MAINTENANCE_FAILED_CODE
578 )
579}
580
/// Heartbeat period for a lease TTL: one third of the TTL in whole seconds.
///
/// Non-positive TTLs are treated as one second, and the result never drops below one
/// second.
fn heartbeat_interval(lease_ttl_seconds: i32) -> Duration {
    let ttl = if lease_ttl_seconds < 1 { 1 } else { lease_ttl_seconds };
    let third = ttl / 3;
    let period_seconds = if third < 1 { 1 } else { third };
    Duration::from_secs(period_seconds as u64)
}
587
588fn is_non_retryable_failure_kind(kind: JobFailureKind) -> bool {
589 matches!(kind, JobFailureKind::Terminal | JobFailureKind::Panicked)
590}
591
592fn retry_delay_ms_for_failure(
593 registry: &JobRegistry,
594 job: &jobs::JobQueueRecord,
595 failure: &JobFailure,
596) -> i32 {
597 registry
598 .retry_delay_override(job.job_type.as_borrowed(), failure.code)
599 .unwrap_or_else(|| compute_retry_delay_ms(job.attempt, job.id))
600}
601
602fn dead_letter_info(job: &jobs::JobQueueRecord, failure: &JobFailure) -> Option<JobDeadLetterInfo> {
603 let reason = if is_non_retryable_failure_kind(failure.kind) {
604 Some(JobDeadLetterReason::FailureKindNonRetryable)
605 } else if job.attempt >= job.max_attempts {
606 Some(JobDeadLetterReason::AttemptsExhausted)
607 } else {
608 None
609 }?;
610
611 Some(JobDeadLetterInfo::new(
612 failure.clone(),
613 reason,
614 Some(job.max_attempts),
615 ))
616}
617
618async fn notify_handler_of_dead_letter(
619 registry: &JobRegistry,
620 context: &JobContext,
621 job: &jobs::JobQueueRecord,
622 dead_letter: JobDeadLetterInfo,
623) {
624 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
625 return;
626 };
627 let context = context.clone();
628 let payload = job.payload.clone();
629
630 let hook_task = tokio::spawn(async move {
631 tokio::time::timeout(
632 TERMINAL_HOOK_TIMEOUT,
633 handler.on_dead_letter(context, payload, dead_letter),
634 )
635 .await
636 .is_ok()
637 });
638 match hook_task.await {
639 Ok(true) => {}
640 Ok(false) => {
641 warn!(
642 job_id = %job.id,
643 job_type = %job.job_type,
644 run_number = job.run_number,
645 attempt = job.attempt,
646 timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
647 "dead-letter hook timed out; continuing worker job task"
648 );
649 }
650 Err(error) => log_dead_letter_hook_join_error(job, error),
651 }
652}
653
654fn log_dead_letter_hook_join_error(job: &jobs::JobQueueRecord, error: tokio::task::JoinError) {
655 if error.is_panic() {
656 warn!(
657 job_id = %job.id,
658 job_type = %job.job_type,
659 run_number = job.run_number,
660 attempt = job.attempt,
661 error = %error,
662 "dead-letter hook panicked; continuing worker job task"
663 );
664 } else if error.is_cancelled() {
665 warn!(
666 job_id = %job.id,
667 job_type = %job.job_type,
668 run_number = job.run_number,
669 attempt = job.attempt,
670 error = %error,
671 "dead-letter hook was cancelled; continuing worker job task"
672 );
673 } else {
674 warn!(
675 job_id = %job.id,
676 job_type = %job.job_type,
677 run_number = job.run_number,
678 attempt = job.attempt,
679 error = %error,
680 "dead-letter hook join failed; continuing worker job task"
681 );
682 }
683}
684
685fn compute_retry_delay_ms(attempt: i32, job_id: uuid::Uuid) -> i32 {
686 let exp = attempt.clamp(1, 10) as u32;
687 let base_ms: i64 = 5_000;
688 let raw = base_ms * (1_i64 << exp);
689 let capped = raw.min(300_000);
690 let jitter = (job_id.as_u128() % 1_000) as i64 - 500;
691 (capped + jitter).max(1_000) as i32
692}
693
694#[cfg(test)]
695mod tests;