1use runledger_core::jobs::{JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tokio::task::{Id, JoinSet};
6use tokio::time::{Duration, sleep, timeout};
7use tracing::{info, warn};
8
9use crate::ReaperError;
10use crate::config::JobsConfig;
11use crate::registry::JobRegistry;
12
/// Worker id placed in the `JobContext` of every terminal hook the reaper runs.
const REAPER_WORKER_ID: &str = "reaper";
/// Failure code attached to the `JobFailure` handed to dead-letter hooks for reaped jobs.
const LEASE_EXPIRED_CODE: &str = "job.lease_expired";
/// Human-readable message paired with `LEASE_EXPIRED_CODE`.
const LEASE_EXPIRED_MESSAGE: &str = "Job lease expired before completion.";
/// Upper bound on terminal hooks that may run concurrently during one fan-out pass.
const REAPER_TERMINAL_HOOK_MAX_CONCURRENCY: usize = 8;
/// Per-hook deadline for `on_dead_letter`; shortened under `cfg(test)` to keep tests fast.
const TERMINAL_HOOK_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(10)
};
/// Budget for each drain phase (graceful, then post-abort) when shutdown interrupts a fan-out;
/// shortened under `cfg(test)`.
const TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(50)
} else {
    Duration::from_millis(250)
};
25
26pub async fn run_reaper_loop(
27 pool: runledger_postgres::DbPool,
28 registry: JobRegistry,
29 config: JobsConfig,
30 mut shutdown: watch::Receiver<bool>,
31) {
32 let registry = Arc::new(registry);
33
34 loop {
35 if shutdown_requested_or_closed(&shutdown) {
36 break;
37 }
38
39 match runledger_postgres::jobs::reap_expired_leases_with_terminal_records(
40 &pool,
41 config.claim_batch_size,
42 config.reaper_retry_delay_ms,
43 )
44 .await
45 {
46 Ok(result) => {
47 if result.processed > 0 {
48 info!(reaped = result.processed, "reaper reclaimed expired leases");
49 }
50
51 let fanout_result = notify_handlers_of_terminal_lease_expirations(
52 registry.as_ref(),
53 &result.terminal_dead_lettered,
54 &mut shutdown,
55 )
56 .await;
57 if matches!(
58 fanout_result,
59 TerminalHookFanoutResult::InterruptedByShutdown
60 ) {
61 break;
62 }
63 }
64 Err(error) => {
65 let error = ReaperError::ReapExpiredLeases {
66 batch_size: config.claim_batch_size,
67 retry_delay_ms: config.reaper_retry_delay_ms,
68 source: error,
69 };
70 warn!(%error, "reaper iteration failed");
71 }
72 }
73
74 if wait_for_shutdown_or_poll(&mut shutdown, config.reaper_interval).await {
75 break;
76 }
77 }
78
79 info!("reaper shutdown complete");
80}
81
82fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
83 *shutdown.borrow() || shutdown.has_changed().is_err()
84}
85
86async fn wait_for_shutdown_or_poll(
87 shutdown: &mut watch::Receiver<bool>,
88 poll_interval: Duration,
89) -> bool {
90 tokio::select! {
91 changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
92 _ = sleep(poll_interval) => false,
93 }
94}
95
96async fn notify_handlers_of_terminal_lease_expirations(
97 registry: &JobRegistry,
98 jobs: &[runledger_postgres::jobs::ReapedTerminalLeaseRecord],
99 shutdown: &mut watch::Receiver<bool>,
100) -> TerminalHookFanoutResult {
101 let mut in_flight = JoinSet::new();
102 let mut metadata: HashMap<Id, HookMetadata> = HashMap::new();
103
104 for job in jobs {
105 if *shutdown.borrow() {
106 abort_terminal_hook_fanout(&mut in_flight, &mut metadata).await;
107 return TerminalHookFanoutResult::InterruptedByShutdown;
108 }
109
110 let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
111 continue;
112 };
113
114 let context = JobContext {
115 job_id: job.job_id,
116 run_number: job.run_number,
117 attempt: job.attempt,
118 organization_id: job.organization_id,
119 worker_id: REAPER_WORKER_ID.to_string(),
120 };
121 let payload = job.payload.clone();
122 let dead_letter = JobDeadLetterInfo::new(
123 lease_expired_failure(),
124 JobDeadLetterReason::LeaseExpired,
125 None,
126 );
127 let hook_meta = HookMetadata {
128 job_id: job.job_id.to_string(),
129 job_type: job.job_type.to_string(),
130 run_number: job.run_number,
131 attempt: job.attempt,
132 };
133
134 let abort_handle = in_flight.spawn(async move {
135 if tokio::time::timeout(
136 TERMINAL_HOOK_TIMEOUT,
137 handler.on_dead_letter(context, payload, dead_letter),
138 )
139 .await
140 .is_ok()
141 {
142 HookOutcome::Completed
143 } else {
144 HookOutcome::TimedOut
145 }
146 });
147 metadata.insert(abort_handle.id(), hook_meta);
148
149 if in_flight.len() >= REAPER_TERMINAL_HOOK_MAX_CONCURRENCY
150 && matches!(
151 await_next_hook_result_or_shutdown(&mut in_flight, &mut metadata, shutdown).await,
152 NextHookResult::InterruptedByShutdown
153 )
154 {
155 abort_terminal_hook_fanout(&mut in_flight, &mut metadata).await;
156 return TerminalHookFanoutResult::InterruptedByShutdown;
157 }
158 }
159
160 while !in_flight.is_empty() {
161 if matches!(
162 await_next_hook_result_or_shutdown(&mut in_flight, &mut metadata, shutdown).await,
163 NextHookResult::InterruptedByShutdown
164 ) {
165 abort_terminal_hook_fanout(&mut in_flight, &mut metadata).await;
166 return TerminalHookFanoutResult::InterruptedByShutdown;
167 }
168 }
169
170 if !metadata.is_empty() {
171 warn!(
172 stale_hook_metadata_entries = metadata.len(),
173 "reaper hook metadata diverged from in-flight task set; clearing stale metadata"
174 );
175 metadata.clear();
176 }
177
178 TerminalHookFanoutResult::Completed
179}
180
181fn lease_expired_failure() -> JobFailure {
182 JobFailure::lease_expired(LEASE_EXPIRED_CODE, LEASE_EXPIRED_MESSAGE)
183}
184
/// How a spawned terminal hook task finished when it was not cancelled or panicking.
#[derive(Debug)]
enum HookOutcome {
    /// `on_dead_letter` returned before `TERMINAL_HOOK_TIMEOUT` elapsed.
    Completed,
    /// `TERMINAL_HOOK_TIMEOUT` elapsed before `on_dead_letter` returned.
    TimedOut,
}
190
/// Result of one terminal-hook fan-out pass over the reaped jobs.
#[derive(Debug, Eq, PartialEq)]
enum TerminalHookFanoutResult {
    /// Every spawned hook settled.
    Completed,
    /// Shutdown was observed before all hooks settled.
    InterruptedByShutdown,
}
196
/// Outcome of waiting for the next terminal hook task (or shutdown).
#[derive(Debug, Eq, PartialEq)]
enum NextHookResult {
    /// A hook task was joined, or the set was already empty.
    HookSettled,
    /// Shutdown was requested or the shutdown channel closed first.
    InterruptedByShutdown,
}
202
/// Identifying details of a spawned terminal hook, kept for logging once its task settles.
#[derive(Debug)]
struct HookMetadata {
    /// Stringified job id.
    job_id: String,
    /// Stringified job type name.
    job_type: String,
    run_number: i32,
    attempt: i32,
}
210
211type HookJoinResult = std::result::Result<(Id, HookOutcome), tokio::task::JoinError>;
212
213async fn await_next_hook_result_or_shutdown(
214 in_flight: &mut JoinSet<HookOutcome>,
215 metadata: &mut HashMap<Id, HookMetadata>,
216 shutdown: &mut watch::Receiver<bool>,
217) -> NextHookResult {
218 if *shutdown.borrow() {
219 return NextHookResult::InterruptedByShutdown;
220 }
221
222 loop {
223 tokio::select! {
224 changed = shutdown.changed() => {
225 if changed.is_err() || *shutdown.borrow() {
226 return NextHookResult::InterruptedByShutdown;
227 }
228 }
229 result = in_flight.join_next_with_id() => {
230 if let Some(result) = result {
231 handle_next_hook_result(result, metadata);
232 }
233 return NextHookResult::HookSettled;
234 }
235 }
236 }
237}
238
239async fn abort_terminal_hook_fanout(
240 in_flight: &mut JoinSet<HookOutcome>,
241 metadata: &mut HashMap<Id, HookMetadata>,
242) {
243 if in_flight.is_empty() {
244 if !metadata.is_empty() {
245 warn!(
246 stale_hook_metadata_entries = metadata.len(),
247 "reaper hook metadata diverged while shutdown fanout abort had no in-flight tasks"
248 );
249 metadata.clear();
250 }
251 return;
252 }
253
254 warn!(
255 in_flight_terminal_hooks = in_flight.len(),
256 drain_timeout_ms = TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT.as_millis(),
257 "shutdown requested while terminal failure hooks are running; waiting briefly for in-flight hooks to finish"
258 );
259
260 let drain_result = timeout(
261 TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT,
262 drain_terminal_hook_results(in_flight, metadata),
263 )
264 .await;
265 match drain_result {
266 Ok(()) => {}
267 Err(_) => {
268 warn!(
269 remaining_in_flight_hooks = in_flight.len(),
270 drain_timeout_ms = TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT.as_millis(),
271 "terminal hooks did not finish before shutdown deadline; aborting remaining hooks"
272 );
273 in_flight.abort_all();
274
275 let abort_drain_result = timeout(
276 TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT,
277 drain_aborted_terminal_hook_results(in_flight, metadata),
278 )
279 .await;
280
281 match abort_drain_result {
282 Ok(cancelled_hook_count) => {
283 if cancelled_hook_count > 0 {
284 info!(
285 cancelled_hook_count,
286 "terminal failure hooks cancelled due to shutdown request"
287 );
288 }
289 }
290 Err(_) => {
291 warn!(
292 remaining_in_flight_hooks = in_flight.len(),
293 undrained_hook_metadata_entries = metadata.len(),
294 drain_timeout_ms = TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT.as_millis(),
295 "terminal hook abort drain timed out during shutdown; dropping unresolved hook tasks"
296 );
297 }
298 }
299 }
300 }
301
302 if !metadata.is_empty() {
303 warn!(
304 stale_hook_metadata_entries = metadata.len(),
305 "reaper hook metadata remains after shutdown fanout abort; clearing stale metadata"
306 );
307 metadata.clear();
308 }
309}
310
311async fn drain_terminal_hook_results(
312 in_flight: &mut JoinSet<HookOutcome>,
313 metadata: &mut HashMap<Id, HookMetadata>,
314) {
315 while let Some(result) = in_flight.join_next_with_id().await {
316 handle_next_hook_result(result, metadata);
317 }
318}
319
320async fn drain_aborted_terminal_hook_results(
321 in_flight: &mut JoinSet<HookOutcome>,
322 metadata: &mut HashMap<Id, HookMetadata>,
323) -> usize {
324 let mut cancelled_hook_count: usize = 0;
325
326 while let Some(result) = in_flight.join_next_with_id().await {
327 match result {
328 Err(error) if error.is_cancelled() => {
329 let id = error.id();
330 if metadata.remove(&id).is_none() {
331 warn!(
332 "terminal failure hook cancellation observed during shutdown; metadata missing in reaper loop"
333 );
334 }
335 cancelled_hook_count += 1;
336 }
337 other => handle_next_hook_result(other, metadata),
338 }
339 }
340
341 cancelled_hook_count
342}
343
344fn handle_next_hook_result(result: HookJoinResult, metadata: &mut HashMap<Id, HookMetadata>) {
345 match result {
346 Ok((id, HookOutcome::Completed)) => {
347 if metadata.remove(&id).is_none() {
348 warn!("terminal failure hook completed; metadata missing in reaper loop");
349 }
350 }
351 Ok((id, HookOutcome::TimedOut)) => {
352 if let Some(meta) = metadata.remove(&id) {
353 warn!(
354 job_id = meta.job_id,
355 job_type = meta.job_type,
356 run_number = meta.run_number,
357 attempt = meta.attempt,
358 timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
359 "terminal failure hook timed out; continuing reaper loop"
360 );
361 } else {
362 warn!(
363 timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
364 "terminal failure hook timed out; metadata missing in reaper loop"
365 );
366 }
367 }
368 Err(error) => {
369 let id = error.id();
370 if let Some(meta) = metadata.remove(&id) {
371 if error.is_panic() {
372 warn!(
373 job_id = meta.job_id,
374 job_type = meta.job_type,
375 run_number = meta.run_number,
376 attempt = meta.attempt,
377 error = %error,
378 "terminal failure hook panicked; continuing reaper loop"
379 );
380 } else if error.is_cancelled() {
381 warn!(
382 job_id = meta.job_id,
383 job_type = meta.job_type,
384 run_number = meta.run_number,
385 attempt = meta.attempt,
386 error = %error,
387 "terminal failure hook was cancelled; continuing reaper loop"
388 );
389 } else {
390 warn!(
391 job_id = meta.job_id,
392 job_type = meta.job_type,
393 run_number = meta.run_number,
394 attempt = meta.attempt,
395 error = %error,
396 "terminal failure hook join failed; continuing reaper loop"
397 );
398 }
399 } else if error.is_panic() {
400 warn!(
401 error = %error,
402 "terminal failure hook panicked; metadata missing in reaper loop"
403 );
404 } else if error.is_cancelled() {
405 warn!(
406 error = %error,
407 "terminal failure hook was cancelled; metadata missing in reaper loop"
408 );
409 } else {
410 warn!(
411 error = %error,
412 "terminal failure hook join failed; metadata missing in reaper loop"
413 );
414 }
415 }
416 }
417}
418
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use runledger_core::jobs::{
        JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobType,
    };
    use runledger_postgres::jobs::ReapedTerminalLeaseRecord;
    use serde_json::{Value, json};
    use sqlx::types::Uuid;
    use tokio::sync::{Notify, watch};
    use tokio::time::{sleep, timeout};

    use crate::registry::{JobHandler, JobRegistry};

    use super::{TerminalHookFanoutResult, notify_handlers_of_terminal_lease_expirations};

    /// Handler whose dead-letter hook never returns, to exercise the per-hook timeout.
    struct HangingTerminalHookHandler;

    /// Handler whose dead-letter hook signals `started`, waits for `release`, then
    /// increments `completions`.
    struct ControlledTerminalHookHandler {
        started: Arc<Notify>,
        release: Arc<Notify>,
        completions: Arc<AtomicUsize>,
    }

    /// Handler that records every `JobDeadLetterInfo` its dead-letter hook receives.
    struct RecordingDeadLetterHandler {
        dead_letters: Arc<Mutex<Vec<JobDeadLetterInfo>>>,
    }

    #[async_trait::async_trait]
    impl JobHandler for HangingTerminalHookHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.test.reaper.hook.hang")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }

        async fn on_dead_letter(
            &self,
            _context: JobContext,
            _payload: Value,
            _dead_letter: JobDeadLetterInfo,
        ) {
            pending::<()>().await;
        }
    }

    #[async_trait::async_trait]
    impl JobHandler for ControlledTerminalHookHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.test.reaper.hook.controlled")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }

        async fn on_dead_letter(
            &self,
            _context: JobContext,
            _payload: Value,
            _dead_letter: JobDeadLetterInfo,
        ) {
            self.started.notify_one();
            self.release.notified().await;
            self.completions.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[async_trait::async_trait]
    impl JobHandler for RecordingDeadLetterHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.test.reaper.dead_letter.record")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }

        async fn on_dead_letter(
            &self,
            _context: JobContext,
            _payload: Value,
            dead_letter: JobDeadLetterInfo,
        ) {
            self.dead_letters
                .lock()
                .expect("dead-letter list lock should not be poisoned")
                .push(dead_letter);
        }
    }

    #[tokio::test]
    async fn notify_handlers_survives_terminal_hook_timeout() {
        let mut registry = JobRegistry::new();
        registry.register(HangingTerminalHookHandler);
        let (_shutdown_tx, mut shutdown_rx) = watch::channel(false);

        let jobs = vec![ReapedTerminalLeaseRecord {
            job_id: Uuid::now_v7(),
            job_type: runledger_core::jobs::JobTypeName::from_static("jobs.test.reaper.hook.hang"),
            organization_id: None,
            run_number: 1,
            attempt: 1,
            payload: json!({ "kind": "hook-timeout" }),
        }];

        let result = timeout(
            Duration::from_secs(2),
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx),
        )
        .await
        .expect("notification pass should return even when terminal hook hangs");
        assert_eq!(result, TerminalHookFanoutResult::Completed);
    }

    #[tokio::test]
    async fn notify_handlers_shutdown_interrupts_terminal_hook_fanout() {
        let mut registry = JobRegistry::new();
        registry.register(HangingTerminalHookHandler);
        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);

        // Far more jobs than the concurrency limit, so the pass is still running when
        // shutdown arrives.
        let jobs: Vec<_> = (0..256)
            .map(|idx| ReapedTerminalLeaseRecord {
                job_id: Uuid::now_v7(),
                job_type: runledger_core::jobs::JobTypeName::from_static(
                    "jobs.test.reaper.hook.hang",
                ),
                organization_id: None,
                run_number: 1,
                attempt: 1,
                payload: json!({ "kind": "hook-shutdown", "index": idx }),
            })
            .collect();

        tokio::spawn(async move {
            sleep(Duration::from_millis(20)).await;
            let _ = shutdown_tx.send(true);
        });

        let result = timeout(
            Duration::from_secs(2),
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx),
        )
        .await
        .expect("notification pass should return quickly after shutdown signal");
        assert_eq!(result, TerminalHookFanoutResult::InterruptedByShutdown);
    }

    #[tokio::test]
    async fn notify_handlers_ignores_non_shutdown_watch_updates() {
        let started = Arc::new(Notify::new());
        let release = Arc::new(Notify::new());
        let completions = Arc::new(AtomicUsize::new(0));
        let mut registry = JobRegistry::new();
        registry.register(ControlledTerminalHookHandler {
            started: started.clone(),
            release: release.clone(),
            completions: completions.clone(),
        });
        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
        let update_tx = shutdown_tx.clone();

        let jobs = vec![ReapedTerminalLeaseRecord {
            job_id: Uuid::now_v7(),
            job_type: runledger_core::jobs::JobTypeName::from_static(
                "jobs.test.reaper.hook.controlled",
            ),
            organization_id: None,
            run_number: 1,
            attempt: 1,
            payload: json!({ "kind": "non-shutdown-watch-update" }),
        }];

        tokio::spawn(async move {
            started.notified().await;
            // A watch update that does not request shutdown must not interrupt the fan-out.
            let _ = update_tx.send(false);
            sleep(Duration::from_millis(20)).await;
            // `notify_one` stores a permit if the hook is not waiting yet, so this release
            // cannot be lost the way a `notify_waiters` call could be.
            release.notify_one();
        });

        // Bounded so a regression fails the test instead of hanging it.
        let result = timeout(
            Duration::from_secs(2),
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx),
        )
        .await
        .expect("notification pass should finish once the controlled hook is released");
        drop(shutdown_tx);

        assert_eq!(result, TerminalHookFanoutResult::Completed);
        assert_eq!(completions.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn notify_handlers_reports_lease_expiration_dead_letter_reason() {
        let dead_letters = Arc::new(Mutex::new(Vec::new()));
        let mut registry = JobRegistry::new();
        registry.register(RecordingDeadLetterHandler {
            dead_letters: dead_letters.clone(),
        });
        let (_shutdown_tx, mut shutdown_rx) = watch::channel(false);

        let jobs = vec![ReapedTerminalLeaseRecord {
            job_id: Uuid::now_v7(),
            job_type: runledger_core::jobs::JobTypeName::from_static(
                "jobs.test.reaper.dead_letter.record",
            ),
            organization_id: None,
            run_number: 1,
            attempt: 1,
            payload: json!({ "kind": "lease-expired" }),
        }];

        let result =
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx).await;
        assert_eq!(result, TerminalHookFanoutResult::Completed);

        let dead_letters = dead_letters
            .lock()
            .expect("dead-letter list lock should not be poisoned");
        assert_eq!(dead_letters.len(), 1);
        let dead_letter = &dead_letters[0];
        assert_eq!(dead_letter.reason, JobDeadLetterReason::LeaseExpired);
        assert_eq!(
            dead_letter.failure.kind,
            runledger_core::jobs::JobFailureKind::LeaseExpired
        );
        assert_eq!(dead_letter.max_attempts, None);
    }
}