runledger_runtime/catalog/
registration.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use runledger_core::jobs::{JobHandler, JobHandlerRegistry, JobType, JobTypeName};
5use runledger_postgres::jobs::JobDefinitionUpsert;
6
7use crate::registry::JobRegistry;
8
9use super::types::CatalogJob;
10use super::{CatalogError, JobCatalog, JobCatalogDefaults, JobCatalogSyncScope};
11
12impl JobCatalog {
13 #[must_use]
15 pub fn new() -> Self {
16 Self {
17 defaults: JobCatalogDefaults::default(),
18 jobs: BTreeMap::new(),
19 }
20 }
21
22 #[must_use]
24 pub fn defaults(mut self, defaults: JobCatalogDefaults) -> Self {
25 self.defaults = defaults;
26 self
27 }
28
29 pub fn try_job<H>(mut self, job_type: &'static str, handler: H) -> Result<Self, CatalogError>
34 where
35 H: JobHandler + 'static,
36 {
37 let declared = Self::validate_declared_job_type(job_type)?;
38 let handler_type = Self::validate_handler_job_type(&handler)?;
39 if declared != handler_type {
40 return Err(CatalogError::HandlerJobTypeMismatch {
41 declared: declared.as_str().to_owned(),
42 handler: handler_type.as_str().to_owned(),
43 });
44 }
45
46 let key = JobTypeName::new(job_type).map_err(|source| CatalogError::InvalidJobType {
47 job_type: job_type.to_owned(),
48 source,
49 })?;
50 if self.jobs.contains_key(&key) {
51 return Err(CatalogError::DuplicateJobType {
52 job_type: job_type.to_owned(),
53 });
54 }
55
56 self.jobs.insert(
57 key,
58 CatalogJob {
59 job_type: declared,
60 handler: Arc::new(handler),
61 retry_delay_overrides: BTreeMap::new(),
62 },
63 );
64 Ok(self)
65 }
66
67 #[must_use]
69 pub fn job<H>(self, job_type: &'static str, handler: H) -> Self
70 where
71 H: JobHandler + 'static,
72 {
73 self.try_job(job_type, handler).unwrap_or_else(|error| {
74 panic!("invalid job catalog registration for {job_type:?}: {error}");
75 })
76 }
77
78 pub fn try_retry_delay_override(
83 mut self,
84 job_type: &str,
85 failure_code: &'static str,
86 retry_delay_ms: i32,
87 ) -> Result<Self, CatalogError> {
88 let key = self.require_job_key(job_type)?;
89 Self::validate_failure_code(failure_code)?;
90 Self::validate_retry_delay(retry_delay_ms)?;
91 self.jobs
92 .get_mut(&key)
93 .expect("job key validated")
94 .retry_delay_overrides
95 .insert(failure_code, retry_delay_ms);
96 Ok(self)
97 }
98
99 #[must_use]
101 pub fn retry_delay_override(
102 self,
103 job_type: &str,
104 failure_code: &'static str,
105 retry_delay_ms: i32,
106 ) -> Self {
107 self.try_retry_delay_override(job_type, failure_code, retry_delay_ms)
108 .unwrap_or_else(|error| {
109 panic!(
110 "invalid retry delay override for job type {job_type:?}, failure code {failure_code:?}: {error}"
111 );
112 })
113 }
114
115 #[must_use]
120 pub fn to_registry(&self) -> JobRegistry {
121 let mut registry = JobRegistry::new();
122 for entry in self.jobs.values() {
123 registry.register_boxed(Arc::clone(&entry.handler));
124 for (failure_code, retry_delay_ms) in &entry.retry_delay_overrides {
125 registry.register_retry_delay_override(
126 entry.job_type,
127 failure_code,
128 *retry_delay_ms,
129 );
130 }
131 }
132 registry
133 }
134
135 #[must_use]
137 pub fn contains(&self, job_type: JobType<'_>) -> bool {
138 self.jobs.contains_key(job_type.as_str())
139 }
140
141 pub fn require_job_type(&self, job_type: &str) -> Result<JobType<'static>, CatalogError> {
146 let key = self.require_job_key(job_type)?;
147 Ok(self.jobs.get(&key).expect("job key validated").job_type)
148 }
149
150 pub fn require_catalog_enabled_job_type(
160 &self,
161 job_type: &str,
162 ) -> Result<JobType<'static>, CatalogError> {
163 let job_type = self.require_job_type(job_type)?;
164 if !self.defaults.is_enabled {
165 return Err(CatalogError::DisabledJobType {
166 job_type: job_type.as_str().to_owned(),
167 });
168 }
169 Ok(job_type)
170 }
171
172 pub(super) fn validate_defaults(&self) -> Result<(), CatalogError> {
173 if self.defaults.version <= 0 {
174 return Err(CatalogError::InvalidDefinitionValue { field: "version" });
175 }
176 if self.defaults.max_attempts <= 0 {
177 return Err(CatalogError::InvalidDefinitionValue {
178 field: "max_attempts",
179 });
180 }
181 if self.defaults.default_timeout_seconds <= 0 {
182 return Err(CatalogError::InvalidDefinitionValue {
183 field: "default_timeout_seconds",
184 });
185 }
186 Ok(())
188 }
189
190 pub(super) fn validate_exact_sync_scope(
191 &self,
192 scope: &JobCatalogSyncScope,
193 ) -> Result<(), CatalogError> {
194 if self.jobs.is_empty() {
195 return Err(CatalogError::EmptyExactSyncCatalog);
196 }
197
198 for entry in self.jobs.values() {
199 if !scope.contains(entry.job_type) {
200 return Err(CatalogError::JobTypeOutsideExactSyncScope {
201 job_type: entry.job_type.as_str().to_owned(),
202 });
203 }
204 }
205
206 Ok(())
207 }
208
209 pub(super) fn materialize_definition(
210 &self,
211 entry: &CatalogJob,
212 ) -> JobDefinitionUpsert<'static> {
213 JobDefinitionUpsert {
214 job_type: entry.job_type,
215 version: self.defaults.version,
216 max_attempts: self.defaults.max_attempts,
217 default_timeout_seconds: self.defaults.default_timeout_seconds,
218 default_priority: self.defaults.default_priority,
219 is_enabled: self.defaults.is_enabled,
220 }
221 }
222
223 pub(super) fn require_job_key(&self, job_type: &str) -> Result<JobTypeName, CatalogError> {
224 let key = JobTypeName::new(job_type).map_err(|source| CatalogError::InvalidJobType {
225 job_type: job_type.to_owned(),
226 source,
227 })?;
228 if self.jobs.contains_key(&key) {
229 Ok(key)
230 } else {
231 Err(CatalogError::UnknownJobType {
232 job_type: job_type.to_owned(),
233 })
234 }
235 }
236
237 fn validate_declared_job_type(
238 job_type: &'static str,
239 ) -> Result<JobType<'static>, CatalogError> {
240 JobType::try_new(job_type).map_err(|source| CatalogError::InvalidJobType {
241 job_type: job_type.to_owned(),
242 source,
243 })
244 }
245
246 fn validate_handler_job_type<H: JobHandler + ?Sized>(
247 handler: &H,
248 ) -> Result<JobType<'static>, CatalogError> {
249 let handler_job_type = handler.job_type();
250 JobType::try_new(handler_job_type.as_str()).map_err(|source| {
251 CatalogError::InvalidHandlerJobType {
252 handler_job_type: handler_job_type.as_str().to_owned(),
253 source,
254 }
255 })
256 }
257
258 fn validate_failure_code(failure_code: &str) -> Result<(), CatalogError> {
259 if failure_code.trim().is_empty() {
260 Err(CatalogError::InvalidFailureCode)
261 } else {
262 Ok(())
263 }
264 }
265
266 fn validate_retry_delay(retry_delay_ms: i32) -> Result<(), CatalogError> {
267 if retry_delay_ms <= 0 {
268 Err(CatalogError::InvalidRetryDelay)
269 } else {
270 Ok(())
271 }
272 }
273}
274
275impl Default for JobCatalog {
276 fn default() -> Self {
277 Self::new()
278 }
279}