Skip to main content

job/
registry.rs

1//! Registry storing job initializers and retry settings.
2
3use es_entity::clock::ClockHandle;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use super::{entity::*, error::JobError, repo::JobRepo, runner::*, spawner::JobSpawner};
8
9/// Internal trait for storing initializers with erased Config type.
10/// Only `init` is needed after registration - job_type and retry_settings
11/// are extracted before boxing and stored separately.
12pub(crate) trait AnyJobInitializer: Send + Sync + 'static {
13    fn init(
14        &self,
15        job: &Job,
16        repo: Arc<JobRepo>,
17        clock: ClockHandle,
18    ) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>>;
19}
20
21impl<T: JobInitializer> AnyJobInitializer for T {
22    fn init(
23        &self,
24        job: &Job,
25        repo: Arc<JobRepo>,
26        clock: ClockHandle,
27    ) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
28        let spawner = JobSpawner::<T::Config>::new(repo, self.job_type(), clock);
29        JobInitializer::init(self, job, spawner)
30    }
31}
32
33/// Keeps track of registered job types and their retry behaviour.
34pub struct JobRegistry {
35    initializers: HashMap<JobType, Box<dyn AnyJobInitializer>>,
36    retry_settings: HashMap<JobType, RetrySettings>,
37}
38
39impl JobRegistry {
40    pub(crate) fn new() -> Self {
41        Self {
42            initializers: HashMap::new(),
43            retry_settings: HashMap::new(),
44        }
45    }
46
47    /// Register a [`JobInitializer`] and its associated retry settings.
48    /// Returns the job type that was registered.
49    pub fn add_initializer<I: JobInitializer>(&mut self, initializer: I) -> JobType {
50        let job_type = initializer.job_type();
51        let retry_settings = initializer.retry_on_error_settings();
52        self.initializers
53            .insert(job_type.clone(), Box::new(initializer));
54        self.retry_settings.insert(job_type.clone(), retry_settings);
55        job_type
56    }
57
58    pub(super) fn init_job(
59        &self,
60        job: &Job,
61        repo: Arc<JobRepo>,
62        clock: ClockHandle,
63    ) -> Result<Box<dyn JobRunner>, JobError> {
64        self.initializers
65            .get(&job.job_type)
66            .ok_or(JobError::NoInitializerPresent)?
67            .init(job, repo, clock)
68            .map_err(|e| JobError::JobInitError(e.to_string()))
69    }
70
71    /// Retrieve retry settings for a given job type.
72    pub(super) fn retry_settings(&self, job_type: &JobType) -> &RetrySettings {
73        self.retry_settings
74            .get(job_type)
75            .expect("Retry settings not found")
76    }
77
78    /// Get a list of all registered job types.
79    pub(crate) fn registered_job_types(&self) -> Vec<JobType> {
80        self.initializers.keys().cloned().collect()
81    }
82}