Skip to main content

runledger_runtime/catalog/
workflow.rs

1use runledger_core::jobs::{WorkflowDagBuilder, WorkflowRunEnqueue};
2use serde_json::Value;
3use uuid::Uuid;
4
5use super::{CatalogError, JobCatalog};
6
7/// Workflow DAG builder that validates job types against a [`JobCatalog`].
8#[derive(Debug, Clone)]
9pub struct CatalogWorkflowDagBuilder<'a, 'catalog> {
10    pub(super) catalog: &'catalog JobCatalog,
11    pub(super) inner: WorkflowDagBuilder<'a>,
12}
13
14impl JobCatalog {
15    /// Starts a workflow DAG builder that validates step job types against the catalog.
16    #[must_use]
17    pub fn workflow_dag<'a>(
18        &self,
19        workflow_type: &'a str,
20        metadata: &'a Value,
21    ) -> CatalogWorkflowDagBuilder<'a, '_> {
22        CatalogWorkflowDagBuilder {
23            catalog: self,
24            inner: WorkflowDagBuilder::new(workflow_type, metadata),
25        }
26    }
27}
28
29impl<'a, 'catalog> CatalogWorkflowDagBuilder<'a, 'catalog> {
30    /// Sets the organization scope for the workflow run and its steps by default.
31    #[must_use]
32    pub fn organization_id(mut self, organization_id: Uuid) -> Self {
33        self.inner = self.inner.organization_id(organization_id);
34        self
35    }
36
37    /// Clears the workflow-level organization scope.
38    #[must_use]
39    pub fn clear_organization_id(mut self) -> Self {
40        self.inner = self.inner.clear_organization_id();
41        self
42    }
43
44    /// Sets the workflow idempotency key.
45    #[must_use]
46    pub fn idempotency_key(mut self, idempotency_key: &'a str) -> Self {
47        self.inner = self.inner.idempotency_key(idempotency_key);
48        self
49    }
50
51    /// Clears the workflow idempotency key.
52    #[must_use]
53    pub fn clear_idempotency_key(mut self) -> Self {
54        self.inner = self.inner.clear_idempotency_key();
55        self
56    }
57
58    /// Adds a job step after validating `job_type_name` against enabled catalog entries.
59    ///
60    /// # Errors
61    /// Returns [`CatalogError`] when the job type is unknown or disabled, or when
62    /// the underlying workflow builder rejects the step.
63    pub fn job(
64        mut self,
65        step_key: &'a str,
66        job_type_name: &str,
67        payload: &'a Value,
68    ) -> Result<Self, CatalogError> {
69        let job_type = self
70            .catalog
71            .require_catalog_enabled_job_type(job_type_name)?;
72        self.inner = self
73            .inner
74            .job(step_key, job_type.as_str(), payload)
75            .map_err(CatalogError::WorkflowBuild)?;
76        Ok(self)
77    }
78
79    /// Adds success dependencies to an existing workflow step.
80    ///
81    /// # Errors
82    /// Returns [`CatalogError::WorkflowBuild`] when the underlying workflow
83    /// builder rejects the dependency edge.
84    pub fn after_success<I>(self, step_key: &'a str, prerequisites: I) -> Result<Self, CatalogError>
85    where
86        I: IntoIterator<Item = &'a str>,
87    {
88        self.inner
89            .after_success(step_key, prerequisites)
90            .map_err(CatalogError::WorkflowBuild)
91            .map(|inner| Self {
92                catalog: self.catalog,
93                inner,
94            })
95    }
96
97    /// Adds terminal-state dependencies to an existing workflow step.
98    ///
99    /// # Errors
100    /// Returns [`CatalogError::WorkflowBuild`] when the underlying workflow
101    /// builder rejects the dependency edge.
102    pub fn after_terminal<I>(
103        self,
104        step_key: &'a str,
105        prerequisites: I,
106    ) -> Result<Self, CatalogError>
107    where
108        I: IntoIterator<Item = &'a str>,
109    {
110        self.inner
111            .after_terminal(step_key, prerequisites)
112            .map_err(CatalogError::WorkflowBuild)
113            .map(|inner| Self {
114                catalog: self.catalog,
115                inner,
116            })
117    }
118
119    /// Builds the workflow enqueue payload.
120    ///
121    /// This is an alias for [`Self::try_build`].
122    ///
123    /// # Errors
124    /// Returns [`CatalogError::WorkflowBuild`] when final workflow validation fails.
125    pub fn build(self) -> Result<WorkflowRunEnqueue<'a>, CatalogError> {
126        self.try_build()
127    }
128
129    /// Builds the workflow enqueue payload.
130    ///
131    /// # Errors
132    /// Returns [`CatalogError::WorkflowBuild`] when final workflow validation fails.
133    pub fn try_build(self) -> Result<WorkflowRunEnqueue<'a>, CatalogError> {
134        self.inner.try_build().map_err(CatalogError::WorkflowBuild)
135    }
136}