runledger_runtime/catalog/inputs.rs
1use chrono::{DateTime, Utc};
2use runledger_core::jobs::{JobStage, WorkflowStepEnqueueBuilder};
3use runledger_postgres::jobs::{JobEnqueue, JobScheduleUpsert};
4use serde_json::Value;
5use uuid::Uuid;
6
7use super::{CatalogError, JobCatalog};
8
9/// Input for building a [`JobEnqueue`] from a catalog-backed job type.
10#[derive(Debug, Clone)]
11pub struct CatalogJobEnqueueInput<'a> {
12 /// Catalog job type to enqueue.
13 pub job_type: &'a str,
14 /// Optional organization scope copied to the queued job.
15 pub organization_id: Option<Uuid>,
16 /// JSON payload stored with the queued job.
17 pub payload: &'a Value,
18 /// Optional queue priority override.
19 pub priority: Option<i32>,
20 /// Optional maximum-attempts override.
21 pub max_attempts: Option<i32>,
22 /// Optional execution timeout override, in seconds.
23 pub timeout_seconds: Option<i32>,
24 /// Optional future time before which the job should not run.
25 pub next_run_at: Option<DateTime<Utc>>,
26 /// Optional idempotency key for duplicate enqueue protection.
27 pub idempotency_key: Option<&'a str>,
28 /// Optional initial job stage.
29 pub stage: Option<JobStage>,
30}
31
32/// Input for building a [`JobScheduleUpsert`] from a catalog-backed job type.
33#[derive(Debug, Clone)]
34pub struct CatalogJobScheduleInput<'a> {
35 /// Unique schedule name.
36 pub name: &'a str,
37 /// Catalog job type enqueued when the schedule fires.
38 pub job_type: &'a str,
39 /// Optional organization scope copied into jobs for a new schedule.
40 ///
41 /// Existing schedules preserve their stored organization scope on conflict.
42 pub organization_id: Option<Uuid>,
43 /// JSON payload template copied into scheduled jobs.
44 pub payload_template: &'a Value,
45 /// Cron expression used by the runtime scheduler.
46 pub cron_expr: &'a str,
47 /// Whether a new schedule should be active.
48 ///
49 /// Existing schedules preserve their stored active state on conflict.
50 pub is_active: bool,
51 /// Next UTC instant at which the schedule is due.
52 pub next_fire_at: DateTime<Utc>,
53 /// Maximum deterministic jitter, in seconds, applied to future fire times.
54 pub max_jitter_seconds: i32,
55}
56
57impl JobCatalog {
58 /// Builds a [`JobEnqueue`] after validating the job type is registered and enabled.
59 ///
60 /// This checks catalog configuration only. Operator-disabled database rows
61 /// are still enforced by `runledger-postgres` when the job is enqueued.
62 /// Catalog defaults' enabled flag applies to every catalog entry; per-job
63 /// enabled overrides are not modeled yet.
64 pub fn job_enqueue<'a>(
65 &self,
66 input: &CatalogJobEnqueueInput<'a>,
67 ) -> Result<JobEnqueue<'a>, CatalogError> {
68 let job_type = self.require_catalog_enabled_job_type(input.job_type)?;
69 Ok(JobEnqueue {
70 job_type,
71 organization_id: input.organization_id,
72 payload: input.payload,
73 priority: input.priority,
74 max_attempts: input.max_attempts,
75 timeout_seconds: input.timeout_seconds,
76 next_run_at: input.next_run_at,
77 idempotency_key: input.idempotency_key,
78 stage: input.stage,
79 })
80 }
81
82 /// Builds a [`JobScheduleUpsert`] after validating the job type is registered and enabled.
83 ///
84 /// This checks catalog configuration only. Operator-disabled database rows
85 /// are still enforced by `runledger-postgres` when schedule-created jobs are
86 /// materialized.
87 /// Catalog defaults' enabled flag applies to every catalog entry; per-job
88 /// enabled overrides are not modeled yet.
89 pub fn job_schedule<'a>(
90 &self,
91 input: &CatalogJobScheduleInput<'a>,
92 ) -> Result<JobScheduleUpsert<'a>, CatalogError> {
93 let job_type = self.require_catalog_enabled_job_type(input.job_type)?;
94 Ok(JobScheduleUpsert {
95 name: input.name,
96 job_type,
97 organization_id: input.organization_id,
98 payload_template: input.payload_template,
99 cron_expr: input.cron_expr,
100 is_active: input.is_active,
101 next_fire_at: input.next_fire_at,
102 max_jitter_seconds: input.max_jitter_seconds,
103 })
104 }
105
106 /// Builds a workflow step after validating the job type is registered and enabled.
107 pub fn workflow_step<'a>(
108 &self,
109 step_key: &'a str,
110 job_type_name: &str,
111 payload: &'a Value,
112 ) -> Result<WorkflowStepEnqueueBuilder<'a>, CatalogError> {
113 let job_type = self.require_catalog_enabled_job_type(job_type_name)?;
114 WorkflowStepEnqueueBuilder::try_new(step_key, job_type.as_str(), payload)
115 .map_err(CatalogError::WorkflowBuild)
116 }
117}