Skip to main content

runledger_runtime/
reaper.rs

1use runledger_core::jobs::{JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tokio::task::{Id, JoinSet};
6use tokio::time::{Duration, sleep, timeout};
7use tracing::{info, warn};
8
9use crate::ReaperError;
10use crate::config::JobsConfig;
11use crate::registry::JobRegistry;
12
/// Worker id placed in the `JobContext` handed to hooks that the reaper invokes.
const REAPER_WORKER_ID: &str = "reaper";
/// Failure code reported for jobs whose lease expired.
const LEASE_EXPIRED_CODE: &str = "job.lease_expired";
/// Human-readable failure message reported for jobs whose lease expired.
const LEASE_EXPIRED_MESSAGE: &str = "Job lease expired before completion.";
/// Maximum number of terminal hooks allowed to run at the same time.
const REAPER_TERMINAL_HOOK_MAX_CONCURRENCY: usize = 8;
/// Time budget for a single terminal hook; shortened in test builds.
const TERMINAL_HOOK_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(100)
} else {
    Duration::from_secs(10)
};
/// Grace period used when draining hooks during shutdown; shortened in test builds.
const TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT: Duration = if cfg!(test) {
    Duration::from_millis(50)
} else {
    Duration::from_millis(250)
};
25
26pub async fn run_reaper_loop(
27    pool: runledger_postgres::DbPool,
28    registry: JobRegistry,
29    config: JobsConfig,
30    mut shutdown: watch::Receiver<bool>,
31) {
32    let registry = Arc::new(registry);
33
34    loop {
35        if shutdown_requested_or_closed(&shutdown) {
36            break;
37        }
38
39        match runledger_postgres::jobs::reap_expired_leases_with_terminal_records(
40            &pool,
41            config.claim_batch_size,
42            config.reaper_retry_delay_ms,
43        )
44        .await
45        {
46            Ok(result) => {
47                if result.processed > 0 {
48                    info!(reaped = result.processed, "reaper reclaimed expired leases");
49                }
50
51                let fanout_result = notify_handlers_of_terminal_lease_expirations(
52                    registry.as_ref(),
53                    &result.terminal_dead_lettered,
54                    &mut shutdown,
55                )
56                .await;
57                if matches!(
58                    fanout_result,
59                    TerminalHookFanoutResult::InterruptedByShutdown
60                ) {
61                    break;
62                }
63            }
64            Err(error) => {
65                let error = ReaperError::ReapExpiredLeases {
66                    batch_size: config.claim_batch_size,
67                    retry_delay_ms: config.reaper_retry_delay_ms,
68                    source: error,
69                };
70                warn!(%error, "reaper iteration failed");
71            }
72        }
73
74        if wait_for_shutdown_or_poll(&mut shutdown, config.reaper_interval).await {
75            break;
76        }
77    }
78
79    info!("reaper shutdown complete");
80}
81
82fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
83    *shutdown.borrow() || shutdown.has_changed().is_err()
84}
85
86async fn wait_for_shutdown_or_poll(
87    shutdown: &mut watch::Receiver<bool>,
88    poll_interval: Duration,
89) -> bool {
90    tokio::select! {
91        changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
92        _ = sleep(poll_interval) => false,
93    }
94}
95
96async fn notify_handlers_of_terminal_lease_expirations(
97    registry: &JobRegistry,
98    jobs: &[runledger_postgres::jobs::ReapedTerminalLeaseRecord],
99    shutdown: &mut watch::Receiver<bool>,
100) -> TerminalHookFanoutResult {
101    let mut in_flight = JoinSet::new();
102    let mut metadata: HashMap<Id, HookMetadata> = HashMap::new();
103
104    for job in jobs {
105        if *shutdown.borrow() {
106            abort_terminal_hook_fanout(&mut in_flight, &mut metadata).await;
107            return TerminalHookFanoutResult::InterruptedByShutdown;
108        }
109
110        let Some(handler) = registry.get(job.job_type.as_borrowed()) else {
111            continue;
112        };
113
114        let context = JobContext {
115            job_id: job.job_id,
116            run_number: job.run_number,
117            attempt: job.attempt,
118            organization_id: job.organization_id,
119            worker_id: REAPER_WORKER_ID.to_string(),
120        };
121        let payload = job.payload.clone();
122        let dead_letter = JobDeadLetterInfo::new(
123            lease_expired_failure(),
124            JobDeadLetterReason::LeaseExpired,
125            None,
126        );
127        let hook_meta = HookMetadata {
128            job_id: job.job_id.to_string(),
129            job_type: job.job_type.to_string(),
130            run_number: job.run_number,
131            attempt: job.attempt,
132        };
133
134        let abort_handle = in_flight.spawn(async move {
135            if tokio::time::timeout(
136                TERMINAL_HOOK_TIMEOUT,
137                handler.on_dead_letter(context, payload, dead_letter),
138            )
139            .await
140            .is_ok()
141            {
142                HookOutcome::Completed
143            } else {
144                HookOutcome::TimedOut
145            }
146        });
147        metadata.insert(abort_handle.id(), hook_meta);
148
149        if in_flight.len() >= REAPER_TERMINAL_HOOK_MAX_CONCURRENCY
150            && matches!(
151                await_next_hook_result_or_shutdown(&mut in_flight, &mut metadata, shutdown).await,
152                NextHookResult::InterruptedByShutdown
153            )
154        {
155            abort_terminal_hook_fanout(&mut in_flight, &mut metadata).await;
156            return TerminalHookFanoutResult::InterruptedByShutdown;
157        }
158    }
159
160    while !in_flight.is_empty() {
161        if matches!(
162            await_next_hook_result_or_shutdown(&mut in_flight, &mut metadata, shutdown).await,
163            NextHookResult::InterruptedByShutdown
164        ) {
165            abort_terminal_hook_fanout(&mut in_flight, &mut metadata).await;
166            return TerminalHookFanoutResult::InterruptedByShutdown;
167        }
168    }
169
170    if !metadata.is_empty() {
171        warn!(
172            stale_hook_metadata_entries = metadata.len(),
173            "reaper hook metadata diverged from in-flight task set; clearing stale metadata"
174        );
175        metadata.clear();
176    }
177
178    TerminalHookFanoutResult::Completed
179}
180
181fn lease_expired_failure() -> JobFailure {
182    JobFailure::lease_expired(LEASE_EXPIRED_CODE, LEASE_EXPIRED_MESSAGE)
183}
184
/// How a spawned terminal hook task finished on its own (i.e. without a join error).
#[derive(Debug)]
enum HookOutcome {
    /// The hook returned before `TERMINAL_HOOK_TIMEOUT` elapsed.
    Completed,
    /// The hook was still running when `TERMINAL_HOOK_TIMEOUT` elapsed.
    TimedOut,
}
190
/// Result of one hook fan-out pass over a batch of reaped terminal records.
#[derive(Debug, PartialEq, Eq)]
enum TerminalHookFanoutResult {
    /// Every spawned hook settled (finished, timed out, or failed to join).
    Completed,
    /// Shutdown was observed before the fan-out finished; the caller should stop.
    InterruptedByShutdown,
}
196
/// Result of waiting for the next in-flight hook while also watching for shutdown.
#[derive(Debug, PartialEq, Eq)]
enum NextHookResult {
    /// A hook task was joined (or the set was already empty).
    HookSettled,
    /// Shutdown was requested, or the shutdown channel closed, before a hook settled.
    InterruptedByShutdown,
}
202
/// Job details remembered per spawned hook task so that timeouts and join
/// failures can be logged with the job they belong to.
#[derive(Debug)]
struct HookMetadata {
    /// String form of the job id.
    job_id: String,
    /// String form of the job type.
    job_type: String,
    /// Run number copied from the reaped record.
    run_number: i32,
    /// Attempt number copied from the reaped record.
    attempt: i32,
}
210
/// Item produced when joining a hook task by id: the task id with its outcome,
/// or the `JoinError` describing why the task did not finish normally.
type HookJoinResult = std::result::Result<(Id, HookOutcome), tokio::task::JoinError>;
212
213async fn await_next_hook_result_or_shutdown(
214    in_flight: &mut JoinSet<HookOutcome>,
215    metadata: &mut HashMap<Id, HookMetadata>,
216    shutdown: &mut watch::Receiver<bool>,
217) -> NextHookResult {
218    if *shutdown.borrow() {
219        return NextHookResult::InterruptedByShutdown;
220    }
221
222    loop {
223        tokio::select! {
224            changed = shutdown.changed() => {
225                if changed.is_err() || *shutdown.borrow() {
226                    return NextHookResult::InterruptedByShutdown;
227                }
228            }
229            result = in_flight.join_next_with_id() => {
230                if let Some(result) = result {
231                    handle_next_hook_result(result, metadata);
232                }
233                return NextHookResult::HookSettled;
234            }
235        }
236    }
237}
238
239async fn abort_terminal_hook_fanout(
240    in_flight: &mut JoinSet<HookOutcome>,
241    metadata: &mut HashMap<Id, HookMetadata>,
242) {
243    if in_flight.is_empty() {
244        if !metadata.is_empty() {
245            warn!(
246                stale_hook_metadata_entries = metadata.len(),
247                "reaper hook metadata diverged while shutdown fanout abort had no in-flight tasks"
248            );
249            metadata.clear();
250        }
251        return;
252    }
253
254    warn!(
255        in_flight_terminal_hooks = in_flight.len(),
256        drain_timeout_ms = TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT.as_millis(),
257        "shutdown requested while terminal failure hooks are running; waiting briefly for in-flight hooks to finish"
258    );
259
260    let drain_result = timeout(
261        TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT,
262        drain_terminal_hook_results(in_flight, metadata),
263    )
264    .await;
265    match drain_result {
266        Ok(()) => {}
267        Err(_) => {
268            warn!(
269                remaining_in_flight_hooks = in_flight.len(),
270                drain_timeout_ms = TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT.as_millis(),
271                "terminal hooks did not finish before shutdown deadline; aborting remaining hooks"
272            );
273            in_flight.abort_all();
274
275            let abort_drain_result = timeout(
276                TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT,
277                drain_aborted_terminal_hook_results(in_flight, metadata),
278            )
279            .await;
280
281            match abort_drain_result {
282                Ok(cancelled_hook_count) => {
283                    if cancelled_hook_count > 0 {
284                        info!(
285                            cancelled_hook_count,
286                            "terminal failure hooks cancelled due to shutdown request"
287                        );
288                    }
289                }
290                Err(_) => {
291                    warn!(
292                        remaining_in_flight_hooks = in_flight.len(),
293                        undrained_hook_metadata_entries = metadata.len(),
294                        drain_timeout_ms = TERMINAL_HOOK_ABORT_DRAIN_TIMEOUT.as_millis(),
295                        "terminal hook abort drain timed out during shutdown; dropping unresolved hook tasks"
296                    );
297                }
298            }
299        }
300    }
301
302    if !metadata.is_empty() {
303        warn!(
304            stale_hook_metadata_entries = metadata.len(),
305            "reaper hook metadata remains after shutdown fanout abort; clearing stale metadata"
306        );
307        metadata.clear();
308    }
309}
310
311async fn drain_terminal_hook_results(
312    in_flight: &mut JoinSet<HookOutcome>,
313    metadata: &mut HashMap<Id, HookMetadata>,
314) {
315    while let Some(result) = in_flight.join_next_with_id().await {
316        handle_next_hook_result(result, metadata);
317    }
318}
319
320async fn drain_aborted_terminal_hook_results(
321    in_flight: &mut JoinSet<HookOutcome>,
322    metadata: &mut HashMap<Id, HookMetadata>,
323) -> usize {
324    let mut cancelled_hook_count: usize = 0;
325
326    while let Some(result) = in_flight.join_next_with_id().await {
327        match result {
328            Err(error) if error.is_cancelled() => {
329                let id = error.id();
330                if metadata.remove(&id).is_none() {
331                    warn!(
332                        "terminal failure hook cancellation observed during shutdown; metadata missing in reaper loop"
333                    );
334                }
335                cancelled_hook_count += 1;
336            }
337            other => handle_next_hook_result(other, metadata),
338        }
339    }
340
341    cancelled_hook_count
342}
343
344fn handle_next_hook_result(result: HookJoinResult, metadata: &mut HashMap<Id, HookMetadata>) {
345    match result {
346        Ok((id, HookOutcome::Completed)) => {
347            if metadata.remove(&id).is_none() {
348                warn!("terminal failure hook completed; metadata missing in reaper loop");
349            }
350        }
351        Ok((id, HookOutcome::TimedOut)) => {
352            if let Some(meta) = metadata.remove(&id) {
353                warn!(
354                    job_id = meta.job_id,
355                    job_type = meta.job_type,
356                    run_number = meta.run_number,
357                    attempt = meta.attempt,
358                    timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
359                    "terminal failure hook timed out; continuing reaper loop"
360                );
361            } else {
362                warn!(
363                    timeout_ms = TERMINAL_HOOK_TIMEOUT.as_millis(),
364                    "terminal failure hook timed out; metadata missing in reaper loop"
365                );
366            }
367        }
368        Err(error) => {
369            let id = error.id();
370            if let Some(meta) = metadata.remove(&id) {
371                if error.is_panic() {
372                    warn!(
373                        job_id = meta.job_id,
374                        job_type = meta.job_type,
375                        run_number = meta.run_number,
376                        attempt = meta.attempt,
377                        error = %error,
378                        "terminal failure hook panicked; continuing reaper loop"
379                    );
380                } else if error.is_cancelled() {
381                    warn!(
382                        job_id = meta.job_id,
383                        job_type = meta.job_type,
384                        run_number = meta.run_number,
385                        attempt = meta.attempt,
386                        error = %error,
387                        "terminal failure hook was cancelled; continuing reaper loop"
388                    );
389                } else {
390                    warn!(
391                        job_id = meta.job_id,
392                        job_type = meta.job_type,
393                        run_number = meta.run_number,
394                        attempt = meta.attempt,
395                        error = %error,
396                        "terminal failure hook join failed; continuing reaper loop"
397                    );
398                }
399            } else if error.is_panic() {
400                warn!(
401                    error = %error,
402                    "terminal failure hook panicked; metadata missing in reaper loop"
403                );
404            } else if error.is_cancelled() {
405                warn!(
406                    error = %error,
407                    "terminal failure hook was cancelled; metadata missing in reaper loop"
408                );
409            } else {
410                warn!(
411                    error = %error,
412                    "terminal failure hook join failed; metadata missing in reaper loop"
413                );
414            }
415        }
416    }
417}
418
// Tests for the terminal-hook fan-out. They call
// `notify_handlers_of_terminal_lease_expirations` directly with in-memory
// handlers, so the database-backed reap call is not exercised here.
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use runledger_core::jobs::{
        JobContext, JobDeadLetterInfo, JobDeadLetterReason, JobFailure, JobType,
    };
    use runledger_postgres::jobs::ReapedTerminalLeaseRecord;
    use serde_json::{Value, json};
    use sqlx::types::Uuid;
    use tokio::sync::{Notify, watch};
    use tokio::time::{sleep, timeout};

    use crate::registry::{JobHandler, JobRegistry};

    use super::{TerminalHookFanoutResult, notify_handlers_of_terminal_lease_expirations};

    /// Handler whose dead-letter hook never returns.
    struct HangingTerminalHookHandler;
    /// Handler whose dead-letter hook signals `started`, then blocks until
    /// `release` is notified, then bumps `completions`.
    struct ControlledTerminalHookHandler {
        started: Arc<Notify>,
        release: Arc<Notify>,
        completions: Arc<AtomicUsize>,
    }

    /// Handler that stores every `JobDeadLetterInfo` it receives.
    struct RecordingDeadLetterHandler {
        dead_letters: Arc<Mutex<Vec<JobDeadLetterInfo>>>,
    }

    #[async_trait::async_trait]
    impl JobHandler for HangingTerminalHookHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.test.reaper.hook.hang")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }

        async fn on_dead_letter(
            &self,
            _context: JobContext,
            _payload: Value,
            _dead_letter: JobDeadLetterInfo,
        ) {
            // Never resolves, so only the hook timeout or an abort can end this task.
            pending::<()>().await;
        }
    }

    #[async_trait::async_trait]
    impl JobHandler for ControlledTerminalHookHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.test.reaper.hook.controlled")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }

        async fn on_dead_letter(
            &self,
            _context: JobContext,
            _payload: Value,
            _dead_letter: JobDeadLetterInfo,
        ) {
            // Tell the test the hook is running, then wait to be released.
            self.started.notify_one();
            self.release.notified().await;
            self.completions.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[async_trait::async_trait]
    impl JobHandler for RecordingDeadLetterHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.test.reaper.dead_letter.record")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }

        async fn on_dead_letter(
            &self,
            _context: JobContext,
            _payload: Value,
            dead_letter: JobDeadLetterInfo,
        ) {
            self.dead_letters
                .lock()
                .expect("dead-letter list lock should not be poisoned")
                .push(dead_letter);
        }
    }

    /// A hook that never returns must not stall the fan-out: the pass is
    /// expected to return `Completed` within the 2 s guard.
    #[tokio::test]
    async fn notify_handlers_survives_terminal_hook_timeout() {
        let mut registry = JobRegistry::new();
        registry.register(HangingTerminalHookHandler);
        // Keep the sender bound for the whole test so the channel stays open.
        let (_shutdown_tx, mut shutdown_rx) = watch::channel(false);

        let jobs = vec![ReapedTerminalLeaseRecord {
            job_id: Uuid::now_v7(),
            job_type: runledger_core::jobs::JobTypeName::from_static("jobs.test.reaper.hook.hang"),
            organization_id: None,
            run_number: 1,
            attempt: 1,
            payload: json!({ "kind": "hook-timeout" }),
        }];

        let result = timeout(
            Duration::from_secs(2),
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx),
        )
        .await
        .expect("notification pass should return even when terminal hook hangs");
        assert_eq!(result, TerminalHookFanoutResult::Completed);
    }

    /// Sending `true` on the shutdown channel while hanging hooks are in flight
    /// must end the fan-out with `InterruptedByShutdown` within the 2 s guard.
    #[tokio::test]
    async fn notify_handlers_shutdown_interrupts_terminal_hook_fanout() {
        let mut registry = JobRegistry::new();
        registry.register(HangingTerminalHookHandler);
        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);

        // 256 records for the same hanging handler.
        let jobs: Vec<_> = (0..256)
            .map(|idx| ReapedTerminalLeaseRecord {
                job_id: Uuid::now_v7(),
                job_type: runledger_core::jobs::JobTypeName::from_static(
                    "jobs.test.reaper.hook.hang",
                ),
                organization_id: None,
                run_number: 1,
                attempt: 1,
                payload: json!({ "kind": "hook-shutdown", "index": idx }),
            })
            .collect();

        // Request shutdown shortly after the fan-out has started.
        tokio::spawn(async move {
            sleep(Duration::from_millis(20)).await;
            let _ = shutdown_tx.send(true);
        });

        let result = timeout(
            Duration::from_secs(2),
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx),
        )
        .await
        .expect("notification pass should return quickly after shutdown signal");
        assert_eq!(result, TerminalHookFanoutResult::InterruptedByShutdown);
    }

    /// A watch update that re-sends `false` must not be mistaken for shutdown:
    /// the hook is expected to run to completion and the pass to return `Completed`.
    #[tokio::test]
    async fn notify_handlers_ignores_non_shutdown_watch_updates() {
        let started = Arc::new(Notify::new());
        let release = Arc::new(Notify::new());
        let completions = Arc::new(AtomicUsize::new(0));
        let mut registry = JobRegistry::new();
        registry.register(ControlledTerminalHookHandler {
            started: started.clone(),
            release: release.clone(),
            completions: completions.clone(),
        });
        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
        let update_tx = shutdown_tx.clone();

        let jobs = vec![ReapedTerminalLeaseRecord {
            job_id: Uuid::now_v7(),
            job_type: runledger_core::jobs::JobTypeName::from_static(
                "jobs.test.reaper.hook.controlled",
            ),
            organization_id: None,
            run_number: 1,
            attempt: 1,
            payload: json!({ "kind": "non-shutdown-watch-update" }),
        }];

        // Once the hook is running: publish a non-shutdown update, wait a bit,
        // then let the hook finish.
        tokio::spawn(async move {
            started.notified().await;
            let _ = update_tx.send(false);
            sleep(Duration::from_millis(20)).await;
            release.notify_waiters();
        });

        let result =
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx).await;
        // The original sender is only dropped after the fan-out has returned.
        drop(shutdown_tx);

        assert_eq!(result, TerminalHookFanoutResult::Completed);
        assert_eq!(completions.load(Ordering::SeqCst), 1);
    }

    /// The dead-letter info handed to the hook must carry the lease-expired
    /// reason and failure kind, with no `max_attempts`.
    #[tokio::test]
    async fn notify_handlers_reports_lease_expiration_dead_letter_reason() {
        let dead_letters = Arc::new(Mutex::new(Vec::new()));
        let mut registry = JobRegistry::new();
        registry.register(RecordingDeadLetterHandler {
            dead_letters: dead_letters.clone(),
        });
        let (_shutdown_tx, mut shutdown_rx) = watch::channel(false);

        let jobs = vec![ReapedTerminalLeaseRecord {
            job_id: Uuid::now_v7(),
            job_type: runledger_core::jobs::JobTypeName::from_static(
                "jobs.test.reaper.dead_letter.record",
            ),
            organization_id: None,
            run_number: 1,
            attempt: 1,
            payload: json!({ "kind": "lease-expired" }),
        }];

        let result =
            notify_handlers_of_terminal_lease_expirations(&registry, &jobs, &mut shutdown_rx).await;
        assert_eq!(result, TerminalHookFanoutResult::Completed);

        let dead_letters = dead_letters
            .lock()
            .expect("dead-letter list lock should not be poisoned");
        assert_eq!(dead_letters.len(), 1);
        let dead_letter = &dead_letters[0];
        assert_eq!(dead_letter.reason, JobDeadLetterReason::LeaseExpired);
        assert_eq!(
            dead_letter.failure.kind,
            runledger_core::jobs::JobFailureKind::LeaseExpired
        );
        assert_eq!(dead_letter.max_attempts, None);
    }
}
648}