mod error;
mod inputs;
mod registration;
mod sync;
mod types;
mod workflow;
pub use error::CatalogError;
pub use inputs::{CatalogJobEnqueueInput, CatalogJobScheduleInput};
pub use types::{
JobCatalog, JobCatalogDefaults, JobCatalogExactSyncReport, JobCatalogSyncReport,
JobCatalogSyncScope,
};
pub use workflow::CatalogWorkflowDagBuilder;
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use runledger_core::jobs::{JobContext, JobFailure, JobHandler, JobType, WorkflowBuildError};
use serde_json::Value;
use serde_json::json;
struct StaticHandler(&'static str);
#[async_trait]
impl JobHandler for StaticHandler {
fn job_type(&self) -> JobType<'static> {
JobType::new(self.0)
}
async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
Ok(())
}
}
struct BlankHandler;
#[async_trait]
impl JobHandler for BlankHandler {
fn job_type(&self) -> JobType<'static> {
JobType::new(" ")
}
async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
Ok(())
}
}
struct MismatchHandler;
#[async_trait]
impl JobHandler for MismatchHandler {
fn job_type(&self) -> JobType<'static> {
JobType::new("jobs.other")
}
async fn execute(&self, _context: JobContext, _payload: Value) -> Result<(), JobFailure> {
Ok(())
}
}
#[test]
fn rejects_blank_declared_job_type() {
let error = JobCatalog::new()
.try_job(" ", StaticHandler("jobs.test"))
.expect_err("blank declared job type");
assert!(matches!(
error,
CatalogError::InvalidJobType {
job_type,
source: runledger_core::jobs::IdentifierValidationError::BlankJobType,
} if job_type == " "
));
}
#[test]
fn rejects_blank_handler_job_type() {
let error = JobCatalog::new()
.try_job("jobs.test", BlankHandler)
.expect_err("blank handler job type");
assert!(matches!(
error,
CatalogError::InvalidHandlerJobType {
handler_job_type,
source: runledger_core::jobs::IdentifierValidationError::BlankJobType,
} if handler_job_type == " "
));
}
#[test]
fn rejects_handler_job_type_mismatch() {
let error = JobCatalog::new()
.try_job("jobs.catalog.expected", MismatchHandler)
.expect_err("handler mismatch");
assert!(matches!(error, CatalogError::HandlerJobTypeMismatch { .. }));
}
#[test]
fn rejects_duplicate_job_type() {
let error = JobCatalog::new()
.try_job("jobs.dup", StaticHandler("jobs.dup"))
.expect("first registration")
.try_job("jobs.dup", StaticHandler("jobs.dup"))
.expect_err("duplicate job type");
assert!(matches!(error, CatalogError::DuplicateJobType { .. }));
}
#[test]
fn to_registry_preserves_handlers_and_retry_overrides() {
let catalog = JobCatalog::new()
.job("jobs.test", StaticHandler("jobs.test"))
.try_retry_delay_override("jobs.test", "job.test.wait", 42)
.expect("retry override");
let registry = catalog.to_registry();
assert!(registry.get(JobType::new("jobs.test")).is_some());
assert_eq!(
registry.retry_delay_override(JobType::new("jobs.test"), "job.test.wait"),
Some(42)
);
}
#[test]
fn retry_override_rejects_unknown_job_type() {
let error = JobCatalog::new()
.try_retry_delay_override("jobs.missing", "job.test.wait", 42)
.expect_err("unknown job type");
assert!(matches!(error, CatalogError::UnknownJobType { .. }));
}
#[test]
fn retry_override_validates_job_type_before_override_values() {
let error = JobCatalog::new()
.try_retry_delay_override("jobs.missing", " ", 0)
.expect_err("unknown job type");
assert!(matches!(error, CatalogError::UnknownJobType { .. }));
}
#[test]
fn retry_override_rejects_blank_failure_code() {
let error = JobCatalog::new()
.job("jobs.test", StaticHandler("jobs.test"))
.try_retry_delay_override("jobs.test", " ", 42)
.expect_err("blank failure code");
assert!(matches!(error, CatalogError::InvalidFailureCode));
}
#[test]
fn retry_override_rejects_non_positive_delay() {
let error = JobCatalog::new()
.job("jobs.test", StaticHandler("jobs.test"))
.try_retry_delay_override("jobs.test", "job.test.wait", 0)
.expect_err("invalid retry delay");
assert!(matches!(error, CatalogError::InvalidRetryDelay));
}
#[test]
fn exact_sync_scope_rejects_blank_job_type() {
let error = JobCatalogSyncScope::job_type(" ").expect_err("blank exact sync job type");
assert!(matches!(
error,
CatalogError::InvalidExactSyncScopeJobType {
job_type,
source: runledger_core::jobs::IdentifierValidationError::BlankJobType,
} if job_type == " "
));
}
#[test]
fn exact_sync_scope_rejects_empty_job_type_list() {
let error = JobCatalogSyncScope::job_types(Vec::<String>::new())
.expect_err("empty exact sync scope");
assert!(matches!(error, CatalogError::InvalidExactSyncScope));
}
#[test]
fn job_enqueue_rejects_unknown_job_type() {
let catalog = JobCatalog::new();
let error = catalog
.job_enqueue(&CatalogJobEnqueueInput {
job_type: "jobs.missing",
organization_id: None,
payload: &json!({}),
priority: None,
max_attempts: None,
timeout_seconds: None,
next_run_at: None,
idempotency_key: None,
stage: None,
})
.expect_err("unknown job type");
assert!(matches!(error, CatalogError::UnknownJobType { .. }));
}
#[test]
fn job_enqueue_rejects_disabled_catalog_defaults() {
let catalog = JobCatalog::new()
.job("jobs.test", StaticHandler("jobs.test"))
.defaults(JobCatalogDefaults::new().enabled(false));
let error = catalog
.job_enqueue(&CatalogJobEnqueueInput {
job_type: "jobs.test",
organization_id: None,
payload: &json!({}),
priority: None,
max_attempts: None,
timeout_seconds: None,
next_run_at: None,
idempotency_key: None,
stage: None,
})
.expect_err("disabled job type");
assert!(matches!(error, CatalogError::DisabledJobType { .. }));
}
#[test]
fn workflow_dag_propagates_blank_step_key_error() {
let catalog = JobCatalog::new().job("jobs.test", StaticHandler("jobs.test"));
let error = catalog
.workflow_dag("workflow.test", &json!({}))
.job(" ", "jobs.test", &json!({}))
.expect_err("blank step key");
assert!(matches!(
error,
CatalogError::WorkflowBuild(WorkflowBuildError::BlankStepKey { .. })
));
}
#[test]
fn workflow_dag_propagates_unknown_dependency_error() {
let catalog = JobCatalog::new().job("jobs.test", StaticHandler("jobs.test"));
let error = catalog
.workflow_dag("workflow.test", &json!({}))
.job("first", "jobs.test", &json!({}))
.expect("first step")
.after_success("missing", ["first"])
.expect_err("unknown dependency target");
assert!(matches!(
error,
CatalogError::WorkflowBuild(WorkflowBuildError::UnknownStepKey { .. })
));
}
#[test]
fn workflow_dag_propagates_empty_workflow_error() {
let catalog = JobCatalog::new();
let error = catalog
.workflow_dag("workflow.test", &json!({}))
.try_build()
.expect_err("empty workflow");
assert!(matches!(
error,
CatalogError::WorkflowBuild(WorkflowBuildError::EmptySteps)
));
}
}