// runledger_runtime/registry.rs
use std::collections::HashMap;
use std::sync::Arc;

pub use runledger_core::jobs::JobHandler;
use runledger_core::jobs::{JobHandlerRegistry, JobType};
6
7#[derive(Clone, Default)]
8pub struct JobRegistry {
9 handlers: HashMap<JobType<'static>, Arc<dyn JobHandler>>,
10 retry_delay_overrides: HashMap<JobType<'static>, HashMap<&'static str, i32>>,
11}
12
13impl JobRegistry {
14 #[must_use]
15 pub fn new() -> Self {
16 Self::default()
17 }
18
19 pub fn register<H>(&mut self, handler: H)
20 where
21 H: JobHandler + 'static,
22 {
23 self.register_boxed(Arc::new(handler));
24 }
25
26 pub fn register_retry_delay_override(
27 &mut self,
28 job_type: JobType<'static>,
29 failure_code: &'static str,
30 retry_delay_ms: i32,
31 ) {
32 assert!(retry_delay_ms > 0, "retry delay override must be positive");
33
34 self.retry_delay_overrides
35 .entry(job_type)
36 .or_default()
37 .insert(failure_code, retry_delay_ms);
38 }
39
40 #[must_use]
41 pub fn get(&self, job_type: JobType<'_>) -> Option<Arc<dyn JobHandler>> {
42 self.handlers.get(job_type.as_str()).cloned()
43 }
44
45 #[must_use]
46 pub fn retry_delay_override(&self, job_type: JobType<'_>, failure_code: &str) -> Option<i32> {
47 self.retry_delay_overrides
48 .get(job_type.as_str())
49 .and_then(|overrides| overrides.get(failure_code).copied())
50 }
51
52 #[must_use]
53 pub fn registered_types(&self) -> Vec<JobType<'_>> {
54 let mut keys: Vec<JobType<'_>> = self.handlers.keys().copied().collect();
55 keys.sort_unstable();
56 keys
57 }
58}
59
60impl JobHandlerRegistry for JobRegistry {
61 fn register_boxed(&mut self, handler: Arc<dyn JobHandler>) {
62 self.handlers.insert(handler.job_type(), handler);
63 }
64}
65
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use runledger_core::jobs::{JobContext, JobFailure, JobType};
    use serde_json::{Value, json};
    use uuid::Uuid;

    use super::{JobHandler, JobRegistry};

    /// Job type name reported by [`ExampleHandler`].
    const EXAMPLE_JOB_TYPE: &str = "jobs.example";

    /// Failure code used when registering retry delay overrides in these tests.
    const WAIT_FAILURE_CODE: &str = "job.example.wait";

    /// Minimal handler that always succeeds.
    struct ExampleHandler;

    #[async_trait]
    impl JobHandler for ExampleHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new(EXAMPLE_JOB_TYPE)
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// Builds a throwaway execution context for driving a handler directly.
    fn test_context() -> JobContext {
        let job_id = Uuid::now_v7();
        let worker_id = String::from("registry-test-worker");

        JobContext {
            job_id,
            run_number: 1,
            attempt: 1,
            organization_id: None,
            worker_id,
        }
    }

    /// Builds a registry whose only registered handler is [`ExampleHandler`].
    fn registry_with_example_handler() -> JobRegistry {
        let mut registry = JobRegistry::new();
        registry.register(ExampleHandler);
        registry
    }

    #[tokio::test]
    async fn registered_handler_executes_successfully_via_trait_object() {
        let registry = registry_with_example_handler();
        let handler = registry
            .get(JobType::new(EXAMPLE_JOB_TYPE))
            .expect("handler exists");

        let outcome = handler.execute(test_context(), json!({})).await;

        outcome.expect("registered handler should execute");
    }

    #[test]
    fn registered_types_returns_sorted_job_types() {
        let registry = registry_with_example_handler();

        let expected = vec![JobType::new(EXAMPLE_JOB_TYPE)];

        assert_eq!(registry.registered_types(), expected);
    }

    #[test]
    fn retry_delay_override_matches_job_type_and_failure_code() {
        let mut registry = JobRegistry::new();
        registry.register_retry_delay_override(
            JobType::new(EXAMPLE_JOB_TYPE),
            WAIT_FAILURE_CODE,
            42,
        );

        // (job type, failure code, expected override): only the exact pair matches.
        let expectations = [
            ("jobs.example", "job.example.wait", Some(42)),
            ("jobs.other", "job.example.wait", None),
            ("jobs.example", "job.example.other", None),
        ];

        for (job_type, failure_code, expected) in expectations {
            assert_eq!(
                registry.retry_delay_override(JobType::new(job_type), failure_code),
                expected
            );
        }
    }

    #[test]
    #[should_panic(expected = "retry delay override must be positive")]
    fn retry_delay_override_rejects_zero_delay() {
        JobRegistry::new().register_retry_delay_override(
            JobType::new(EXAMPLE_JOB_TYPE),
            WAIT_FAILURE_CODE,
            0,
        );
    }
}