Skip to main content

runledger_runtime/catalog/
mod.rs

//! Catalog-backed startup API for keeping handlers, job definitions, schedules,
//! and workflow steps aligned.
//!
//! [`JobCatalog`] is the primary facade. Applications register handlers once,
//! sync the catalog's definition defaults to PostgreSQL, and use the catalog
//! helpers to build enqueue, schedule, and workflow inputs with job-type
//! validation at the call site.
//!
//! Catalog defaults apply to every registered entry. This includes
//! [`JobCatalogDefaults::is_enabled`]; per-job enabled overrides are not modeled
//! by the catalog API yet.
12
13mod error;
14mod inputs;
15mod registration;
16mod sync;
17mod types;
18mod workflow;
19
20pub use error::CatalogError;
21pub use inputs::{CatalogJobEnqueueInput, CatalogJobScheduleInput};
22pub use types::{
23    JobCatalog, JobCatalogDefaults, JobCatalogExactSyncReport, JobCatalogSyncReport,
24    JobCatalogSyncScope,
25};
26pub use workflow::CatalogWorkflowDagBuilder;
27
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use runledger_core::jobs::{
        IdentifierValidationError, JobContext, JobFailure, JobHandler, JobType, WorkflowBuildError,
    };
    use serde_json::{Value, json};

    /// Job type registered by most fixtures in this module.
    const TEST_JOB: &str = "jobs.test";
    /// Whitespace-only identifier used to trigger blank-value validation.
    const BLANK: &str = "   ";
    /// Failure code used by the retry-delay override tests.
    const WAIT_CODE: &str = "job.test.wait";
    /// Workflow type handed to every workflow DAG builder under test.
    const WORKFLOW: &str = "workflow.test";

    /// Handler fixture that reports the job type string it was built with.
    /// Its `execute` always succeeds.
    struct StaticHandler(&'static str);

    #[async_trait]
    impl JobHandler for StaticHandler {
        fn job_type(&self) -> JobType<'static> {
            let Self(declared) = *self;
            JobType::new(declared)
        }

        async fn execute(&self, _ctx: JobContext, _body: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// Handler fixture whose reported job type is whitespace only.
    struct BlankHandler;

    #[async_trait]
    impl JobHandler for BlankHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new(BLANK)
        }

        async fn execute(&self, _ctx: JobContext, _body: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// Handler fixture that always reports `jobs.other`, regardless of the
    /// job type it is registered under.
    struct MismatchHandler;

    #[async_trait]
    impl JobHandler for MismatchHandler {
        fn job_type(&self) -> JobType<'static> {
            JobType::new("jobs.other")
        }

        async fn execute(&self, _ctx: JobContext, _body: Value) -> Result<(), JobFailure> {
            Ok(())
        }
    }

    /// A whitespace-only declared job type is rejected with `InvalidJobType`,
    /// carrying the offending string and a `BlankJobType` source.
    #[test]
    fn rejects_blank_declared_job_type() {
        let err = JobCatalog::new()
            .try_job(BLANK, StaticHandler(TEST_JOB))
            .expect_err("blank declared job type");
        assert!(matches!(
            err,
            CatalogError::InvalidJobType {
                job_type,
                source: IdentifierValidationError::BlankJobType,
            } if job_type == BLANK
        ));
    }

    /// A handler reporting a whitespace-only job type is rejected with
    /// `InvalidHandlerJobType`, even though the declared job type is valid.
    #[test]
    fn rejects_blank_handler_job_type() {
        let err = JobCatalog::new()
            .try_job(TEST_JOB, BlankHandler)
            .expect_err("blank handler job type");
        assert!(matches!(
            err,
            CatalogError::InvalidHandlerJobType {
                handler_job_type,
                source: IdentifierValidationError::BlankJobType,
            } if handler_job_type == BLANK
        ));
    }

    /// Declared and handler-reported job types must agree.
    #[test]
    fn rejects_handler_job_type_mismatch() {
        let err = JobCatalog::new()
            .try_job("jobs.catalog.expected", MismatchHandler)
            .expect_err("handler mismatch");
        assert!(matches!(err, CatalogError::HandlerJobTypeMismatch { .. }));
    }

    /// Registering the same job type twice fails on the second registration.
    #[test]
    fn rejects_duplicate_job_type() {
        let err = JobCatalog::new()
            .try_job("jobs.dup", StaticHandler("jobs.dup"))
            .expect("first registration")
            .try_job("jobs.dup", StaticHandler("jobs.dup"))
            .expect_err("duplicate job type");
        assert!(matches!(err, CatalogError::DuplicateJobType { .. }));
    }

    /// The registry built from a catalog exposes both the registered handler
    /// and the retry-delay override configured on the catalog.
    #[test]
    fn to_registry_preserves_handlers_and_retry_overrides() {
        let catalog = JobCatalog::new()
            .job(TEST_JOB, StaticHandler(TEST_JOB))
            .try_retry_delay_override(TEST_JOB, WAIT_CODE, 42)
            .expect("retry override");
        let registry = catalog.to_registry();

        assert!(registry.get(JobType::new(TEST_JOB)).is_some());

        let override_delay = registry.retry_delay_override(JobType::new(TEST_JOB), WAIT_CODE);
        assert_eq!(override_delay, Some(42));
    }

    /// A retry-delay override for a job type that was never registered fails
    /// with `UnknownJobType`.
    #[test]
    fn retry_override_rejects_unknown_job_type() {
        let err = JobCatalog::new()
            .try_retry_delay_override("jobs.missing", WAIT_CODE, 42)
            .expect_err("unknown job type");
        assert!(matches!(err, CatalogError::UnknownJobType { .. }));
    }

    /// When the job type is unknown AND the failure code and delay are also
    /// invalid, the reported error is still `UnknownJobType`.
    #[test]
    fn retry_override_validates_job_type_before_override_values() {
        let err = JobCatalog::new()
            .try_retry_delay_override("jobs.missing", BLANK, 0)
            .expect_err("unknown job type");
        assert!(matches!(err, CatalogError::UnknownJobType { .. }));
    }

    /// A whitespace-only failure code on a registered job type is rejected.
    #[test]
    fn retry_override_rejects_blank_failure_code() {
        let err = JobCatalog::new()
            .job(TEST_JOB, StaticHandler(TEST_JOB))
            .try_retry_delay_override(TEST_JOB, BLANK, 42)
            .expect_err("blank failure code");
        assert!(matches!(err, CatalogError::InvalidFailureCode));
    }

    /// A zero retry delay on a registered job type is rejected.
    #[test]
    fn retry_override_rejects_non_positive_delay() {
        let err = JobCatalog::new()
            .job(TEST_JOB, StaticHandler(TEST_JOB))
            .try_retry_delay_override(TEST_JOB, WAIT_CODE, 0)
            .expect_err("invalid retry delay");
        assert!(matches!(err, CatalogError::InvalidRetryDelay));
    }

    /// Building a single-job-type exact sync scope from a whitespace-only
    /// string fails with `InvalidExactSyncScopeJobType`.
    #[test]
    fn exact_sync_scope_rejects_blank_job_type() {
        let err = JobCatalogSyncScope::job_type(BLANK).expect_err("blank exact sync job type");
        assert!(matches!(
            err,
            CatalogError::InvalidExactSyncScopeJobType {
                job_type,
                source: IdentifierValidationError::BlankJobType,
            } if job_type == BLANK
        ));
    }

    /// An exact sync scope cannot be built from an empty list of job types.
    #[test]
    fn exact_sync_scope_rejects_empty_job_type_list() {
        let no_job_types: Vec<String> = Vec::new();
        let err = JobCatalogSyncScope::job_types(no_job_types).expect_err("empty exact sync scope");
        assert!(matches!(err, CatalogError::InvalidExactSyncScope));
    }

    /// Building an enqueue input for a job type missing from an empty
    /// catalog fails with `UnknownJobType`.
    #[test]
    fn job_enqueue_rejects_unknown_job_type() {
        let catalog = JobCatalog::new();
        let empty_payload = json!({});
        let input = CatalogJobEnqueueInput {
            job_type: "jobs.missing",
            organization_id: None,
            payload: &empty_payload,
            priority: None,
            max_attempts: None,
            timeout_seconds: None,
            next_run_at: None,
            idempotency_key: None,
            stage: None,
        };

        let err = catalog.job_enqueue(&input).expect_err("unknown job type");
        assert!(matches!(err, CatalogError::UnknownJobType { .. }));
    }

    /// With catalog defaults set to disabled, enqueueing a registered job
    /// type fails with `DisabledJobType`.
    #[test]
    fn job_enqueue_rejects_disabled_catalog_defaults() {
        let catalog = JobCatalog::new()
            .job(TEST_JOB, StaticHandler(TEST_JOB))
            .defaults(JobCatalogDefaults::new().enabled(false));
        let empty_payload = json!({});
        let input = CatalogJobEnqueueInput {
            job_type: TEST_JOB,
            organization_id: None,
            payload: &empty_payload,
            priority: None,
            max_attempts: None,
            timeout_seconds: None,
            next_run_at: None,
            idempotency_key: None,
            stage: None,
        };

        let err = catalog.job_enqueue(&input).expect_err("disabled job type");
        assert!(matches!(err, CatalogError::DisabledJobType { .. }));
    }

    /// A whitespace-only step key surfaces the core builder's `BlankStepKey`
    /// error wrapped in `CatalogError::WorkflowBuild`.
    #[test]
    fn workflow_dag_propagates_blank_step_key_error() {
        let catalog = JobCatalog::new().job(TEST_JOB, StaticHandler(TEST_JOB));
        let empty_payload = json!({});
        // The builder chain stays in one statement so intermediate builders
        // live exactly as long as they did before.
        let err = catalog
            .workflow_dag(WORKFLOW, &empty_payload)
            .job(BLANK, TEST_JOB, &empty_payload)
            .expect_err("blank step key");
        assert!(matches!(
            err,
            CatalogError::WorkflowBuild(WorkflowBuildError::BlankStepKey { .. })
        ));
    }

    /// Declaring a dependency for a step key that was never added surfaces
    /// `UnknownStepKey` wrapped in `CatalogError::WorkflowBuild`.
    #[test]
    fn workflow_dag_propagates_unknown_dependency_error() {
        let catalog = JobCatalog::new().job(TEST_JOB, StaticHandler(TEST_JOB));
        let empty_payload = json!({});
        let err = catalog
            .workflow_dag(WORKFLOW, &empty_payload)
            .job("first", TEST_JOB, &empty_payload)
            .expect("first step")
            .after_success("missing", ["first"])
            .expect_err("unknown dependency target");
        assert!(matches!(
            err,
            CatalogError::WorkflowBuild(WorkflowBuildError::UnknownStepKey { .. })
        ));
    }

    /// Building a workflow with no steps surfaces `EmptySteps` wrapped in
    /// `CatalogError::WorkflowBuild`.
    #[test]
    fn workflow_dag_propagates_empty_workflow_error() {
        let catalog = JobCatalog::new();
        let empty_payload = json!({});
        let err = catalog
            .workflow_dag(WORKFLOW, &empty_payload)
            .try_build()
            .expect_err("empty workflow");
        assert!(matches!(
            err,
            CatalogError::WorkflowBuild(WorkflowBuildError::EmptySteps)
        ));
    }
}