runledger_runtime/catalog/
mod.rs1mod error;
14mod inputs;
15mod registration;
16mod sync;
17mod types;
18mod workflow;
19
20pub use error::CatalogError;
21pub use inputs::{CatalogJobEnqueueInput, CatalogJobScheduleInput};
22pub use types::{
23 JobCatalog, JobCatalogDefaults, JobCatalogExactSyncReport, JobCatalogSyncReport,
24 JobCatalogSyncScope,
25};
26pub use workflow::CatalogWorkflowDagBuilder;
27
#[cfg(test)]
mod tests {
    //! Unit tests for `JobCatalog`: handler registration checks, retry-delay
    //! overrides, exact sync scopes, enqueue validation, and the errors that
    //! the workflow DAG builder surfaces as `CatalogError::WorkflowBuild`.

    use super::*;
    use async_trait::async_trait;
    use runledger_core::jobs::{
        IdentifierValidationError, JobContext, JobFailure, JobHandler, JobType, WorkflowBuildError,
    };
    use serde_json::{Value, json};

    /// Handler that reports the job type string it wraps and always succeeds.
    struct StaticHandler(&'static str);

    #[async_trait]
    impl JobHandler for StaticHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new(self.0)
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// Handler whose reported job type is a whitespace-only string.
    struct BlankHandler;

    #[async_trait]
    impl JobHandler for BlankHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new(" ")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// Handler that always reports `"jobs.other"`, used to provoke a mismatch
    /// against the job type it is registered under.
    struct MismatchHandler;

    #[async_trait]
    impl JobHandler for MismatchHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.other")
        }

        async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// Builds a catalog holding a single `"jobs.test"` registration backed by
    /// a `StaticHandler` that reports the same job type.
    fn catalog_with_test_job() -> JobCatalog {
        JobCatalog::new().job("jobs.test", StaticHandler("jobs.test"))
    }

    /// A whitespace-only declared job type yields `InvalidJobType` with a
    /// `BlankJobType` source and the offending value echoed back.
    #[test]
    fn rejects_blank_declared_job_type() {
        let err = JobCatalog::new()
            .try_job(" ", StaticHandler("jobs.test"))
            .expect_err("blank declared job type");

        assert!(matches!(
            err,
            CatalogError::InvalidJobType {
                job_type,
                source: IdentifierValidationError::BlankJobType,
            } if job_type == " "
        ));
    }

    /// A handler reporting a whitespace-only job type yields
    /// `InvalidHandlerJobType` with a `BlankJobType` source.
    #[test]
    fn rejects_blank_handler_job_type() {
        let err = JobCatalog::new()
            .try_job("jobs.test", BlankHandler)
            .expect_err("blank handler job type");

        assert!(matches!(
            err,
            CatalogError::InvalidHandlerJobType {
                handler_job_type,
                source: IdentifierValidationError::BlankJobType,
            } if handler_job_type == " "
        ));
    }

    /// Registering a handler under a job type other than the one it reports
    /// yields `HandlerJobTypeMismatch`.
    #[test]
    fn rejects_handler_job_type_mismatch() {
        let err = JobCatalog::new()
            .try_job("jobs.catalog.expected", MismatchHandler)
            .expect_err("handler mismatch");

        assert!(matches!(err, CatalogError::HandlerJobTypeMismatch { .. }));
    }

    /// Registering the same job type twice yields `DuplicateJobType` on the
    /// second attempt.
    #[test]
    fn rejects_duplicate_job_type() {
        let err = JobCatalog::new()
            .try_job("jobs.dup", StaticHandler("jobs.dup"))
            .expect("first registration")
            .try_job("jobs.dup", StaticHandler("jobs.dup"))
            .expect_err("duplicate job type");

        assert!(matches!(err, CatalogError::DuplicateJobType { .. }));
    }

    /// The registry produced by `to_registry` exposes both the registered
    /// handler and the retry-delay override configured on the catalog.
    #[test]
    fn to_registry_preserves_handlers_and_retry_overrides() {
        let catalog = catalog_with_test_job()
            .try_retry_delay_override("jobs.test", "job.test.wait", 42)
            .expect("retry override");
        let registry = catalog.to_registry();

        let handler = registry.get(JobType::new("jobs.test"));
        assert!(handler.is_some());

        let delay = registry.retry_delay_override(JobType::new("jobs.test"), "job.test.wait");
        assert_eq!(delay, Some(42));
    }

    /// A retry-delay override for an unregistered job type yields
    /// `UnknownJobType`.
    #[test]
    fn retry_override_rejects_unknown_job_type() {
        let err = JobCatalog::new()
            .try_retry_delay_override("jobs.missing", "job.test.wait", 42)
            .expect_err("unknown job type");

        assert!(matches!(err, CatalogError::UnknownJobType { .. }));
    }

    /// With an unknown job type, a blank failure code, and a zero delay all
    /// supplied at once, the error reported is `UnknownJobType`.
    #[test]
    fn retry_override_validates_job_type_before_override_values() {
        let err = JobCatalog::new()
            .try_retry_delay_override("jobs.missing", " ", 0)
            .expect_err("unknown job type");

        assert!(matches!(err, CatalogError::UnknownJobType { .. }));
    }

    /// A whitespace-only failure code on a known job type yields
    /// `InvalidFailureCode`.
    #[test]
    fn retry_override_rejects_blank_failure_code() {
        let err = catalog_with_test_job()
            .try_retry_delay_override("jobs.test", " ", 42)
            .expect_err("blank failure code");

        assert!(matches!(err, CatalogError::InvalidFailureCode));
    }

    /// A zero retry delay on a known job type yields `InvalidRetryDelay`.
    #[test]
    fn retry_override_rejects_non_positive_delay() {
        let err = catalog_with_test_job()
            .try_retry_delay_override("jobs.test", "job.test.wait", 0)
            .expect_err("invalid retry delay");

        assert!(matches!(err, CatalogError::InvalidRetryDelay));
    }

    /// A whitespace-only job type passed to `JobCatalogSyncScope::job_type`
    /// yields `InvalidExactSyncScopeJobType` with a `BlankJobType` source.
    #[test]
    fn exact_sync_scope_rejects_blank_job_type() {
        let err = JobCatalogSyncScope::job_type(" ").expect_err("blank exact sync job type");

        assert!(matches!(
            err,
            CatalogError::InvalidExactSyncScopeJobType {
                job_type,
                source: IdentifierValidationError::BlankJobType,
            } if job_type == " "
        ));
    }

    /// An empty list passed to `JobCatalogSyncScope::job_types` yields
    /// `InvalidExactSyncScope`.
    #[test]
    fn exact_sync_scope_rejects_empty_job_type_list() {
        let no_job_types: Vec<String> = Vec::new();
        let err = JobCatalogSyncScope::job_types(no_job_types).expect_err("empty exact sync scope");

        assert!(matches!(err, CatalogError::InvalidExactSyncScope));
    }

    /// Enqueueing a job type that was never registered yields
    /// `UnknownJobType`.
    #[test]
    fn job_enqueue_rejects_unknown_job_type() {
        let catalog = JobCatalog::new();
        let payload = json!({});
        let input = CatalogJobEnqueueInput {
            job_type: "jobs.missing",
            organization_id: None,
            payload: &payload,
            priority: None,
            max_attempts: None,
            timeout_seconds: None,
            next_run_at: None,
            idempotency_key: None,
            stage: None,
        };

        let err = catalog.job_enqueue(&input).expect_err("unknown job type");

        assert!(matches!(err, CatalogError::UnknownJobType { .. }));
    }

    /// Enqueueing a registered job type while the catalog defaults are set to
    /// `enabled(false)` yields `DisabledJobType`.
    #[test]
    fn job_enqueue_rejects_disabled_catalog_defaults() {
        let registered = catalog_with_test_job();
        let disabled_defaults = JobCatalogDefaults::new().enabled(false);
        let catalog = registered.defaults(disabled_defaults);

        let payload = json!({});
        let input = CatalogJobEnqueueInput {
            job_type: "jobs.test",
            organization_id: None,
            payload: &payload,
            priority: None,
            max_attempts: None,
            timeout_seconds: None,
            next_run_at: None,
            idempotency_key: None,
            stage: None,
        };

        let err = catalog.job_enqueue(&input).expect_err("disabled job type");

        assert!(matches!(err, CatalogError::DisabledJobType { .. }));
    }

    /// A whitespace-only step key surfaces as
    /// `CatalogError::WorkflowBuild(WorkflowBuildError::BlankStepKey)`.
    #[test]
    fn workflow_dag_propagates_blank_step_key_error() {
        let catalog = catalog_with_test_job();
        let workflow_payload = json!({});
        let step_payload = json!({});

        let err = catalog
            .workflow_dag("workflow.test", &workflow_payload)
            .job(" ", "jobs.test", &step_payload)
            .expect_err("blank step key");

        assert!(matches!(
            err,
            CatalogError::WorkflowBuild(WorkflowBuildError::BlankStepKey { .. })
        ));
    }

    /// Calling `after_success` with a step key that was never added surfaces
    /// as `CatalogError::WorkflowBuild(WorkflowBuildError::UnknownStepKey)`.
    #[test]
    fn workflow_dag_propagates_unknown_dependency_error() {
        let catalog = catalog_with_test_job();
        let workflow_payload = json!({});
        let step_payload = json!({});

        let err = catalog
            .workflow_dag("workflow.test", &workflow_payload)
            .job("first", "jobs.test", &step_payload)
            .expect("first step")
            .after_success("missing", ["first"])
            .expect_err("unknown dependency target");

        assert!(matches!(
            err,
            CatalogError::WorkflowBuild(WorkflowBuildError::UnknownStepKey { .. })
        ));
    }

    /// Building a workflow with no steps surfaces as
    /// `CatalogError::WorkflowBuild(WorkflowBuildError::EmptySteps)`.
    #[test]
    fn workflow_dag_propagates_empty_workflow_error() {
        let catalog = JobCatalog::new();
        let workflow_payload = json!({});

        let err = catalog
            .workflow_dag("workflow.test", &workflow_payload)
            .try_build()
            .expect_err("empty workflow");

        assert!(matches!(
            err,
            CatalogError::WorkflowBuild(WorkflowBuildError::EmptySteps)
        ));
    }
}