Skip to main content

runledger_runtime/catalog/
registration.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use runledger_core::jobs::{JobHandler, JobHandlerRegistry, JobType, JobTypeName};
5use runledger_postgres::jobs::JobDefinitionUpsert;
6
7use crate::registry::JobRegistry;
8
9use super::types::CatalogJob;
10use super::{CatalogError, JobCatalog, JobCatalogDefaults, JobCatalogSyncScope};
11
12impl JobCatalog {
13    /// Creates an empty catalog with default definition values.
14    #[must_use]
15    pub fn new() -> Self {
16        Self {
17            defaults: JobCatalogDefaults::default(),
18            jobs: BTreeMap::new(),
19        }
20    }
21
22    /// Replaces the definition defaults used by subsequent sync operations.
23    #[must_use]
24    pub fn defaults(mut self, defaults: JobCatalogDefaults) -> Self {
25        self.defaults = defaults;
26        self
27    }
28
29    /// Registers a handler after validating declared and handler job types match.
30    ///
31    /// # Errors
32    /// Returns [`CatalogError`] when job types are blank, mismatched, or duplicated.
33    pub fn try_job<H>(mut self, job_type: &'static str, handler: H) -> Result<Self, CatalogError>
34    where
35        H: JobHandler + 'static,
36    {
37        let declared = Self::validate_declared_job_type(job_type)?;
38        let handler_type = Self::validate_handler_job_type(&handler)?;
39        if declared != handler_type {
40            return Err(CatalogError::HandlerJobTypeMismatch {
41                declared: declared.as_str().to_owned(),
42                handler: handler_type.as_str().to_owned(),
43            });
44        }
45
46        let key = JobTypeName::new(job_type).map_err(|source| CatalogError::InvalidJobType {
47            job_type: job_type.to_owned(),
48            source,
49        })?;
50        if self.jobs.contains_key(&key) {
51            return Err(CatalogError::DuplicateJobType {
52                job_type: job_type.to_owned(),
53            });
54        }
55
56        self.jobs.insert(
57            key,
58            CatalogJob {
59                job_type: declared,
60                handler: Arc::new(handler),
61                retry_delay_overrides: BTreeMap::new(),
62            },
63        );
64        Ok(self)
65    }
66
67    /// Registers a handler, panicking when validation fails.
68    #[must_use]
69    pub fn job<H>(self, job_type: &'static str, handler: H) -> Self
70    where
71        H: JobHandler + 'static,
72    {
73        self.try_job(job_type, handler).unwrap_or_else(|error| {
74            panic!("invalid job catalog registration for {job_type:?}: {error}");
75        })
76    }
77
78    /// Registers a retry-delay override for a catalog job type.
79    ///
80    /// # Errors
81    /// Returns [`CatalogError`] when the job type is unknown or override values are invalid.
82    pub fn try_retry_delay_override(
83        mut self,
84        job_type: &str,
85        failure_code: &'static str,
86        retry_delay_ms: i32,
87    ) -> Result<Self, CatalogError> {
88        let key = self.require_job_key(job_type)?;
89        Self::validate_failure_code(failure_code)?;
90        Self::validate_retry_delay(retry_delay_ms)?;
91        self.jobs
92            .get_mut(&key)
93            .expect("job key validated")
94            .retry_delay_overrides
95            .insert(failure_code, retry_delay_ms);
96        Ok(self)
97    }
98
99    /// Registers a retry-delay override, panicking when validation fails.
100    #[must_use]
101    pub fn retry_delay_override(
102        self,
103        job_type: &str,
104        failure_code: &'static str,
105        retry_delay_ms: i32,
106    ) -> Self {
107        self.try_retry_delay_override(job_type, failure_code, retry_delay_ms)
108            .unwrap_or_else(|error| {
109                panic!(
110                    "invalid retry delay override for job type {job_type:?}, failure code {failure_code:?}: {error}"
111                );
112            })
113    }
114
115    /// Converts the catalog into a runtime [`JobRegistry`].
116    ///
117    /// Disabled catalog defaults still register handlers so workers can process
118    /// already-queued work and dead-letter hooks.
119    #[must_use]
120    pub fn to_registry(&self) -> JobRegistry {
121        let mut registry = JobRegistry::new();
122        for entry in self.jobs.values() {
123            registry.register_boxed(Arc::clone(&entry.handler));
124            for (failure_code, retry_delay_ms) in &entry.retry_delay_overrides {
125                registry.register_retry_delay_override(
126                    entry.job_type,
127                    failure_code,
128                    *retry_delay_ms,
129                );
130            }
131        }
132        registry
133    }
134
135    /// Returns whether the catalog has a registered job type.
136    #[must_use]
137    pub fn contains(&self, job_type: JobType<'_>) -> bool {
138        self.jobs.contains_key(job_type.as_str())
139    }
140
141    /// Returns a catalog job type when it is registered.
142    ///
143    /// # Errors
144    /// Returns [`CatalogError::UnknownJobType`] when the name is not in the catalog.
145    pub fn require_job_type(&self, job_type: &str) -> Result<JobType<'static>, CatalogError> {
146        let key = self.require_job_key(job_type)?;
147        Ok(self.jobs.get(&key).expect("job key validated").job_type)
148    }
149
150    /// Returns a catalog job type when it is registered and catalog-enabled.
151    ///
152    /// This checks catalog configuration only. It does not read `job_definitions`;
153    /// operator-disabled database rows are enforced later by persistence APIs.
154    /// Catalog defaults' enabled flag applies to every catalog entry; per-job
155    /// enabled overrides are not modeled yet.
156    ///
157    /// # Errors
158    /// Returns [`CatalogError::UnknownJobType`] or [`CatalogError::DisabledJobType`].
159    pub fn require_catalog_enabled_job_type(
160        &self,
161        job_type: &str,
162    ) -> Result<JobType<'static>, CatalogError> {
163        let job_type = self.require_job_type(job_type)?;
164        if !self.defaults.is_enabled {
165            return Err(CatalogError::DisabledJobType {
166                job_type: job_type.as_str().to_owned(),
167            });
168        }
169        Ok(job_type)
170    }
171
172    pub(super) fn validate_defaults(&self) -> Result<(), CatalogError> {
173        if self.defaults.version <= 0 {
174            return Err(CatalogError::InvalidDefinitionValue { field: "version" });
175        }
176        if self.defaults.max_attempts <= 0 {
177            return Err(CatalogError::InvalidDefinitionValue {
178                field: "max_attempts",
179            });
180        }
181        if self.defaults.default_timeout_seconds <= 0 {
182            return Err(CatalogError::InvalidDefinitionValue {
183                field: "default_timeout_seconds",
184            });
185        }
186        // default_priority intentionally accepts zero and negative values.
187        Ok(())
188    }
189
190    pub(super) fn validate_exact_sync_scope(
191        &self,
192        scope: &JobCatalogSyncScope,
193    ) -> Result<(), CatalogError> {
194        if self.jobs.is_empty() {
195            return Err(CatalogError::EmptyExactSyncCatalog);
196        }
197
198        for entry in self.jobs.values() {
199            if !scope.contains(entry.job_type) {
200                return Err(CatalogError::JobTypeOutsideExactSyncScope {
201                    job_type: entry.job_type.as_str().to_owned(),
202                });
203            }
204        }
205
206        Ok(())
207    }
208
209    pub(super) fn materialize_definition(
210        &self,
211        entry: &CatalogJob,
212    ) -> JobDefinitionUpsert<'static> {
213        JobDefinitionUpsert {
214            job_type: entry.job_type,
215            version: self.defaults.version,
216            max_attempts: self.defaults.max_attempts,
217            default_timeout_seconds: self.defaults.default_timeout_seconds,
218            default_priority: self.defaults.default_priority,
219            is_enabled: self.defaults.is_enabled,
220        }
221    }
222
223    pub(super) fn require_job_key(&self, job_type: &str) -> Result<JobTypeName, CatalogError> {
224        let key = JobTypeName::new(job_type).map_err(|source| CatalogError::InvalidJobType {
225            job_type: job_type.to_owned(),
226            source,
227        })?;
228        if self.jobs.contains_key(&key) {
229            Ok(key)
230        } else {
231            Err(CatalogError::UnknownJobType {
232                job_type: job_type.to_owned(),
233            })
234        }
235    }
236
237    fn validate_declared_job_type(
238        job_type: &'static str,
239    ) -> Result<JobType<'static>, CatalogError> {
240        JobType::try_new(job_type).map_err(|source| CatalogError::InvalidJobType {
241            job_type: job_type.to_owned(),
242            source,
243        })
244    }
245
246    fn validate_handler_job_type<H: JobHandler + ?Sized>(
247        handler: &H,
248    ) -> Result<JobType<'static>, CatalogError> {
249        let handler_job_type = handler.job_type();
250        JobType::try_new(handler_job_type.as_str()).map_err(|source| {
251            CatalogError::InvalidHandlerJobType {
252                handler_job_type: handler_job_type.as_str().to_owned(),
253                source,
254            }
255        })
256    }
257
258    fn validate_failure_code(failure_code: &str) -> Result<(), CatalogError> {
259        if failure_code.trim().is_empty() {
260            Err(CatalogError::InvalidFailureCode)
261        } else {
262            Ok(())
263        }
264    }
265
266    fn validate_retry_delay(retry_delay_ms: i32) -> Result<(), CatalogError> {
267        if retry_delay_ms <= 0 {
268            Err(CatalogError::InvalidRetryDelay)
269        } else {
270            Ok(())
271        }
272    }
273}
274
275impl Default for JobCatalog {
276    fn default() -> Self {
277        Self::new()
278    }
279}