use uuid::Uuid;
use super::super::identifiers::{StepKey, WorkflowType};
use super::errors::WorkflowBuildError;
use super::run_builder::WorkflowRunEnqueueBuilder;
use super::step_builder::WorkflowStepEnqueueBuilder;
use super::types::WorkflowRunEnqueue;
/// One registered workflow step: its validated key plus the step builder
/// that is still being configured.
#[derive(Debug, Clone)]
struct StepSlot<'a> {
/// Validated key used to look the step up and to detect duplicates.
step_key: StepKey<'a>,
/// Pending step builder. Stored as an `Option` so it can be moved out with
/// `take()`, passed by value to a dependency-attaching function, and put
/// back; the code in this file always restores it to `Some`.
builder: Option<WorkflowStepEnqueueBuilder<'a>>,
}
/// Builder that collects workflow-level settings and an ordered list of
/// steps, then turns them into a [`WorkflowRunEnqueue`] via `try_build`.
///
/// Steps are registered with `job` and wired together with `after_success`
/// or `after_terminal`. All string and JSON inputs are borrowed for `'a`.
#[doc(alias = "dag")]
#[doc(alias = "orchestration")]
#[doc(alias = "dependencies")]
#[derive(Debug, Clone)]
pub struct WorkflowDagBuilder<'a> {
/// Workflow type wrapped from the string passed to the constructor.
workflow_type: WorkflowType<'a>,
/// Optional organization id; `None` until `organization_id` is called.
organization_id: Option<Uuid>,
/// Borrowed JSON metadata forwarded to the run builder.
metadata: &'a serde_json::Value,
/// Optional idempotency key; `None` until `idempotency_key` is called.
idempotency_key: Option<&'a str>,
/// Registered steps, kept in the order `job` was called.
steps: Vec<StepSlot<'a>>,
}
impl<'a> WorkflowDagBuilder<'a> {
/// Creates an empty builder for `workflow_type` carrying `metadata`.
///
/// The workflow type is wrapped with `WorkflowType::new`, so no error is
/// surfaced here; use `try_new` when a `Result` is wanted.
/// NOTE(review): how `WorkflowType::new` treats a blank name is not visible
/// from this file — confirm before relying on it.
#[must_use]
pub fn new(workflow_type: &'a str, metadata: &'a serde_json::Value) -> Self {
    let workflow_type = WorkflowType::new(workflow_type);
    Self {
        workflow_type,
        metadata,
        organization_id: None,
        idempotency_key: None,
        steps: Vec::new(),
    }
}
/// Fallible constructor: creates an empty builder for `workflow_type`
/// carrying `metadata`.
///
/// # Errors
///
/// Returns [`WorkflowBuildError::BlankWorkflowType`] when
/// `WorkflowType::try_new` rejects `workflow_type`.
pub fn try_new(
    workflow_type: &'a str,
    metadata: &'a serde_json::Value,
) -> Result<Self, WorkflowBuildError> {
    match WorkflowType::try_new(workflow_type) {
        Ok(workflow_type) => Ok(Self {
            workflow_type,
            metadata,
            organization_id: None,
            idempotency_key: None,
            steps: Vec::new(),
        }),
        // The underlying validation error is intentionally collapsed into
        // the builder-level variant.
        Err(_) => Err(WorkflowBuildError::BlankWorkflowType),
    }
}
#[must_use]
pub fn organization_id(mut self, organization_id: Uuid) -> Self {
self.organization_id = Some(organization_id);
self
}
#[must_use]
pub fn clear_organization_id(mut self) -> Self {
self.organization_id = None;
self
}
#[must_use]
pub fn idempotency_key(mut self, idempotency_key: &'a str) -> Self {
self.idempotency_key = Some(idempotency_key);
self
}
#[must_use]
pub fn clear_idempotency_key(mut self) -> Self {
self.idempotency_key = None;
self
}
/// Registers a new step identified by `step_key` that runs `job_type` with
/// `payload`. Steps keep the order in which they were added.
///
/// # Errors
///
/// * [`WorkflowBuildError::BlankStepKey`] when `StepKey::try_new` rejects
///   `step_key`.
/// * [`WorkflowBuildError::DuplicateStepKey`] when a step with the same key
///   has already been registered.
/// * Any error returned by `WorkflowStepEnqueueBuilder::try_new`.
pub fn job(
    mut self,
    step_key: &'a str,
    job_type: &'a str,
    payload: &'a serde_json::Value,
) -> Result<Self, WorkflowBuildError> {
    let key = match StepKey::try_new(step_key) {
        Ok(key) => key,
        Err(_) => return Err(WorkflowBuildError::BlankStepKey { step_index: None }),
    };

    // Duplicate detection runs before the step builder is validated, so a
    // repeated key is reported even if the other arguments are invalid too.
    let already_registered = self.steps.iter().any(|existing| existing.step_key == key);
    if already_registered {
        return Err(WorkflowBuildError::DuplicateStepKey {
            step_key: key.as_str().to_owned(),
        });
    }

    let step_builder = WorkflowStepEnqueueBuilder::try_new(step_key, job_type, payload)?;
    self.steps.push(StepSlot {
        step_key: key,
        builder: Some(step_builder),
    });
    Ok(self)
}
pub fn after_success<I>(
self,
step_key: &'a str,
prerequisites: I,
) -> Result<Self, WorkflowBuildError>
where
I: IntoIterator<Item = &'a str>,
{
self.after(
step_key,
prerequisites,
WorkflowStepEnqueueBuilder::depends_on_success,
)
}
pub fn after_terminal<I>(
self,
step_key: &'a str,
prerequisites: I,
) -> Result<Self, WorkflowBuildError>
where
I: IntoIterator<Item = &'a str>,
{
self.after(
step_key,
prerequisites,
WorkflowStepEnqueueBuilder::depends_on_terminal,
)
}
/// Shared implementation behind `after_success` and `after_terminal`.
///
/// Validates `step_key` and every entry of `prerequisites`, finds the slot
/// registered for `step_key`, and replaces its builder with the result of
/// `attach(builder, &prerequisite_keys)`.
///
/// Prerequisite keys are only validated through `StepKey::try_new` here;
/// this function does not check that they refer to registered steps.
///
/// # Errors
///
/// * [`WorkflowBuildError::BlankStepKey`] when `step_key` is rejected by
///   `StepKey::try_new`.
/// * [`WorkflowBuildError::BlankDependencyStepKey`] when any prerequisite is
///   rejected by `StepKey::try_new`.
/// * [`WorkflowBuildError::UnknownStepKey`] when no step was registered
///   under `step_key`.
///
/// # Panics
///
/// Panics if the slot's builder is missing. Every code path in this file
/// puts the builder back after taking it, so this indicates a broken
/// internal invariant rather than bad caller input.
fn after<I, F>(
    mut self,
    step_key: &'a str,
    prerequisites: I,
    attach: F,
) -> Result<Self, WorkflowBuildError>
where
    I: IntoIterator<Item = &'a str>,
    F: FnOnce(WorkflowStepEnqueueBuilder<'a>, &[StepKey<'a>]) -> WorkflowStepEnqueueBuilder<'a>,
{
    let target_step_key = StepKey::try_new(step_key)
        .map_err(|_| WorkflowBuildError::BlankStepKey { step_index: None })?;

    // The owned copy of the target key is only needed for error payloads, so
    // it is created inside the error closures instead of on every call.
    let prerequisite_step_keys = prerequisites
        .into_iter()
        .map(|prerequisite_step_key| {
            StepKey::try_new(prerequisite_step_key).map_err(|_| {
                WorkflowBuildError::BlankDependencyStepKey {
                    step_key: target_step_key.as_str().to_owned(),
                }
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    let slot = self
        .steps
        .iter_mut()
        .find(|slot| slot.step_key == target_step_key)
        .ok_or_else(|| WorkflowBuildError::UnknownStepKey {
            step_key: target_step_key.as_str().to_owned(),
        })?;

    // `attach` consumes the builder by value, so move it out of the slot and
    // store the updated builder back immediately.
    let builder = slot
        .builder
        .take()
        .expect("step builder is present until try_build consumes it");
    slot.builder = Some(attach(builder, &prerequisite_step_keys));
    Ok(self)
}
pub fn build(self) -> Result<WorkflowRunEnqueue<'a>, WorkflowBuildError> {
self.try_build()
}
pub fn try_build(self) -> Result<WorkflowRunEnqueue<'a>, WorkflowBuildError> {
let mut built_steps = Vec::with_capacity(self.steps.len());
for mut slot in self.steps {
let builder = slot
.builder
.take()
.expect("step builder is present until try_build consumes it");
built_steps.push(builder.try_build()?);
}
let mut run_builder = WorkflowRunEnqueueBuilder::new(self.workflow_type, self.metadata);
if let Some(organization_id) = self.organization_id {
run_builder = run_builder.organization_id(organization_id);
}
if let Some(idempotency_key) = self.idempotency_key {
run_builder = run_builder.idempotency_key(idempotency_key);
}
run_builder.extend_steps(built_steps).try_build()
}
}