Skip to main content

runledger_runtime/
registry.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4pub use runledger_core::jobs::JobHandler;
5use runledger_core::jobs::{JobHandlerRegistry, JobType};
6
7#[derive(Clone, Default)]
8pub struct JobRegistry {
9    handlers: HashMap<JobType<'static>, Arc<dyn JobHandler>>,
10    retry_delay_overrides: HashMap<JobType<'static>, HashMap<&'static str, i32>>,
11}
12
13impl JobRegistry {
14    #[must_use]
15    pub fn new() -> Self {
16        Self::default()
17    }
18
19    pub fn register<H>(&mut self, handler: H)
20    where
21        H: JobHandler + 'static,
22    {
23        self.register_boxed(Arc::new(handler));
24    }
25
26    pub fn register_retry_delay_override(
27        &mut self,
28        job_type: JobType<'static>,
29        failure_code: &'static str,
30        retry_delay_ms: i32,
31    ) {
32        assert!(retry_delay_ms > 0, "retry delay override must be positive");
33
34        self.retry_delay_overrides
35            .entry(job_type)
36            .or_default()
37            .insert(failure_code, retry_delay_ms);
38    }
39
40    #[must_use]
41    pub fn get(&self, job_type: JobType<'_>) -> Option<Arc<dyn JobHandler>> {
42        self.handlers.get(job_type.as_str()).cloned()
43    }
44
45    #[must_use]
46    pub fn retry_delay_override(&self, job_type: JobType<'_>, failure_code: &str) -> Option<i32> {
47        self.retry_delay_overrides
48            .get(job_type.as_str())
49            .and_then(|overrides| overrides.get(failure_code).copied())
50    }
51
52    #[must_use]
53    pub fn registered_types(&self) -> Vec<JobType<'_>> {
54        let mut keys: Vec<JobType<'_>> = self.handlers.keys().copied().collect();
55        keys.sort_unstable();
56        keys
57    }
58}
59
60impl JobHandlerRegistry for JobRegistry {
61    fn register_boxed(&mut self, handler: Arc<dyn JobHandler>) {
62        self.handlers.insert(handler.job_type(), handler);
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use async_trait::async_trait;
69    use runledger_core::jobs::{JobContext, JobFailure, JobType};
70    use serde_json::{Value, json};
71    use uuid::Uuid;
72
73    use super::{JobHandler, JobRegistry};
74
75    struct ExampleHandler;
76
77    #[async_trait]
78    impl JobHandler for ExampleHandler {
79        fn job_type(&self) -> JobType<'static> {
80            JobType::new("jobs.example")
81        }
82
83        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
84            Ok(())
85        }
86    }
87
88    fn test_context() -> JobContext {
89        JobContext {
90            job_id: Uuid::now_v7(),
91            run_number: 1,
92            attempt: 1,
93            organization_id: None,
94            worker_id: "registry-test-worker".to_string(),
95        }
96    }
97
98    #[tokio::test]
99    async fn registered_handler_executes_successfully_via_trait_object() {
100        let mut registry = JobRegistry::new();
101        registry.register(ExampleHandler);
102        let handler = registry
103            .get(JobType::new("jobs.example"))
104            .expect("handler exists");
105
106        handler
107            .execute(test_context(), json!({}))
108            .await
109            .expect("registered handler should execute");
110    }
111
112    #[test]
113    fn registered_types_returns_sorted_job_types() {
114        let mut registry = JobRegistry::new();
115        registry.register(ExampleHandler);
116
117        assert_eq!(
118            registry.registered_types(),
119            vec![JobType::new("jobs.example")]
120        );
121    }
122
123    #[test]
124    fn retry_delay_override_matches_job_type_and_failure_code() {
125        let mut registry = JobRegistry::new();
126        registry.register_retry_delay_override(
127            JobType::new("jobs.example"),
128            "job.example.wait",
129            42,
130        );
131
132        assert_eq!(
133            registry.retry_delay_override(JobType::new("jobs.example"), "job.example.wait"),
134            Some(42)
135        );
136        assert_eq!(
137            registry.retry_delay_override(JobType::new("jobs.other"), "job.example.wait"),
138            None
139        );
140        assert_eq!(
141            registry.retry_delay_override(JobType::new("jobs.example"), "job.example.other"),
142            None
143        );
144    }
145
146    #[test]
147    #[should_panic(expected = "retry delay override must be positive")]
148    fn retry_delay_override_rejects_zero_delay() {
149        let mut registry = JobRegistry::new();
150        registry.register_retry_delay_override(JobType::new("jobs.example"), "job.example.wait", 0);
151    }
152}