Skip to main content

runledger_runtime/catalog/
inputs.rs

1use chrono::{DateTime, Utc};
2use runledger_core::jobs::{JobStage, WorkflowStepEnqueueBuilder};
3use runledger_postgres::jobs::{JobEnqueue, JobScheduleUpsert};
4use serde_json::Value;
5use uuid::Uuid;
6
7use super::{CatalogError, JobCatalog};
8
9/// Input for building a [`JobEnqueue`] from a catalog-backed job type.
10#[derive(Debug, Clone)]
11pub struct CatalogJobEnqueueInput<'a> {
12    /// Catalog job type to enqueue.
13    pub job_type: &'a str,
14    /// Optional organization scope copied to the queued job.
15    pub organization_id: Option<Uuid>,
16    /// JSON payload stored with the queued job.
17    pub payload: &'a Value,
18    /// Optional queue priority override.
19    pub priority: Option<i32>,
20    /// Optional maximum-attempts override.
21    pub max_attempts: Option<i32>,
22    /// Optional execution timeout override, in seconds.
23    pub timeout_seconds: Option<i32>,
24    /// Optional future time before which the job should not run.
25    pub next_run_at: Option<DateTime<Utc>>,
26    /// Optional idempotency key for duplicate enqueue protection.
27    pub idempotency_key: Option<&'a str>,
28    /// Optional initial job stage.
29    pub stage: Option<JobStage>,
30}
31
32/// Input for building a [`JobScheduleUpsert`] from a catalog-backed job type.
33#[derive(Debug, Clone)]
34pub struct CatalogJobScheduleInput<'a> {
35    /// Unique schedule name.
36    pub name: &'a str,
37    /// Catalog job type enqueued when the schedule fires.
38    pub job_type: &'a str,
39    /// Optional organization scope copied into jobs for a new schedule.
40    ///
41    /// Existing schedules preserve their stored organization scope on conflict.
42    pub organization_id: Option<Uuid>,
43    /// JSON payload template copied into scheduled jobs.
44    pub payload_template: &'a Value,
45    /// Cron expression used by the runtime scheduler.
46    pub cron_expr: &'a str,
47    /// Whether a new schedule should be active.
48    ///
49    /// Existing schedules preserve their stored active state on conflict.
50    pub is_active: bool,
51    /// Next UTC instant at which the schedule is due.
52    pub next_fire_at: DateTime<Utc>,
53    /// Maximum deterministic jitter, in seconds, applied to future fire times.
54    pub max_jitter_seconds: i32,
55}
56
57impl JobCatalog {
58    /// Builds a [`JobEnqueue`] after validating the job type is registered and enabled.
59    ///
60    /// This checks catalog configuration only. Operator-disabled database rows
61    /// are still enforced by `runledger-postgres` when the job is enqueued.
62    /// Catalog defaults' enabled flag applies to every catalog entry; per-job
63    /// enabled overrides are not modeled yet.
64    pub fn job_enqueue<'a>(
65        &self,
66        input: &CatalogJobEnqueueInput<'a>,
67    ) -> Result<JobEnqueue<'a>, CatalogError> {
68        let job_type = self.require_catalog_enabled_job_type(input.job_type)?;
69        Ok(JobEnqueue {
70            job_type,
71            organization_id: input.organization_id,
72            payload: input.payload,
73            priority: input.priority,
74            max_attempts: input.max_attempts,
75            timeout_seconds: input.timeout_seconds,
76            next_run_at: input.next_run_at,
77            idempotency_key: input.idempotency_key,
78            stage: input.stage,
79        })
80    }
81
82    /// Builds a [`JobScheduleUpsert`] after validating the job type is registered and enabled.
83    ///
84    /// This checks catalog configuration only. Operator-disabled database rows
85    /// are still enforced by `runledger-postgres` when schedule-created jobs are
86    /// materialized.
87    /// Catalog defaults' enabled flag applies to every catalog entry; per-job
88    /// enabled overrides are not modeled yet.
89    pub fn job_schedule<'a>(
90        &self,
91        input: &CatalogJobScheduleInput<'a>,
92    ) -> Result<JobScheduleUpsert<'a>, CatalogError> {
93        let job_type = self.require_catalog_enabled_job_type(input.job_type)?;
94        Ok(JobScheduleUpsert {
95            name: input.name,
96            job_type,
97            organization_id: input.organization_id,
98            payload_template: input.payload_template,
99            cron_expr: input.cron_expr,
100            is_active: input.is_active,
101            next_fire_at: input.next_fire_at,
102            max_jitter_seconds: input.max_jitter_seconds,
103        })
104    }
105
106    /// Builds a workflow step after validating the job type is registered and enabled.
107    pub fn workflow_step<'a>(
108        &self,
109        step_key: &'a str,
110        job_type_name: &str,
111        payload: &'a Value,
112    ) -> Result<WorkflowStepEnqueueBuilder<'a>, CatalogError> {
113        let job_type = self.require_catalog_enabled_job_type(job_type_name)?;
114        WorkflowStepEnqueueBuilder::try_new(step_key, job_type.as_str(), payload)
115            .map_err(CatalogError::WorkflowBuild)
116    }
117}