use uuid::Uuid;
use super::super::identifiers::{JobType, StepKey};
use super::super::status::JobStage;
use super::build_validation::validate_step_enqueue;
use super::errors::WorkflowBuildError;
use super::types::{
WorkflowDependencyReleaseMode, WorkflowStepDependencySpec, WorkflowStepEnqueue,
WorkflowStepExecutionKind,
};
#[derive(Debug, Clone)]
pub struct WorkflowStepEnqueueBuilder<'a> {
step_key: StepKey<'a>,
execution_kind: WorkflowStepExecutionKind,
job_type: Option<JobType<'a>>,
organization_id: Option<Uuid>,
payload: &'a serde_json::Value,
priority: Option<i32>,
max_attempts: Option<i32>,
timeout_seconds: Option<i32>,
stage: Option<JobStage>,
dependencies: Vec<WorkflowStepDependencySpec<'a>>,
}
impl<'a> WorkflowStepEnqueueBuilder<'a> {
#[must_use]
pub fn new(
step_key: StepKey<'a>,
job_type: JobType<'a>,
payload: &'a serde_json::Value,
) -> Self {
Self {
step_key,
execution_kind: WorkflowStepExecutionKind::Job,
job_type: Some(job_type),
organization_id: None,
payload,
priority: None,
max_attempts: None,
timeout_seconds: None,
stage: Some(JobStage::Queued),
dependencies: Vec::new(),
}
}
pub fn try_new(
step_key: &'a str,
job_type: &'a str,
payload: &'a serde_json::Value,
) -> Result<Self, WorkflowBuildError> {
let step_key = StepKey::try_new(step_key)
.map_err(|_| WorkflowBuildError::BlankStepKey { step_index: None })?;
let job_type =
JobType::try_new(job_type).map_err(|_| WorkflowBuildError::BlankStepJobType {
step_key: step_key.as_str().to_owned(),
})?;
Ok(Self::new(step_key, job_type, payload))
}
#[must_use]
pub fn new_external(step_key: StepKey<'a>, payload: &'a serde_json::Value) -> Self {
Self {
step_key,
execution_kind: WorkflowStepExecutionKind::External,
job_type: None,
organization_id: None,
payload,
priority: None,
max_attempts: None,
timeout_seconds: None,
stage: None,
dependencies: Vec::new(),
}
}
#[must_use]
pub fn organization_id(mut self, organization_id: Uuid) -> Self {
self.organization_id = Some(organization_id);
self
}
#[must_use]
pub fn clear_organization_id(mut self) -> Self {
self.organization_id = None;
self
}
pub fn try_new_external(
step_key: &'a str,
payload: &'a serde_json::Value,
) -> Result<Self, WorkflowBuildError> {
let step_key = StepKey::try_new(step_key)
.map_err(|_| WorkflowBuildError::BlankStepKey { step_index: None })?;
Ok(Self::new_external(step_key, payload))
}
#[must_use]
pub fn priority(mut self, priority: i32) -> Self {
self.priority = Some(priority);
self
}
#[must_use]
pub fn clear_priority(mut self) -> Self {
self.priority = None;
self
}
#[must_use]
pub fn max_attempts(mut self, max_attempts: i32) -> Self {
self.max_attempts = Some(max_attempts);
self
}
#[must_use]
pub fn clear_max_attempts(mut self) -> Self {
self.max_attempts = None;
self
}
#[must_use]
pub fn timeout_seconds(mut self, timeout_seconds: i32) -> Self {
self.timeout_seconds = Some(timeout_seconds);
self
}
#[must_use]
pub fn clear_timeout_seconds(mut self) -> Self {
self.timeout_seconds = None;
self
}
#[must_use]
pub fn stage(mut self, stage: JobStage) -> Self {
self.stage = Some(stage);
self
}
#[must_use]
pub fn clear_stage(mut self) -> Self {
self.stage = None;
self
}
#[must_use]
pub fn set_dependencies(
mut self,
dependencies: impl IntoIterator<Item = WorkflowStepDependencySpec<'a>>,
) -> Self {
self.dependencies = dependencies.into_iter().collect();
self
}
#[must_use]
pub fn dependency(mut self, dependency: WorkflowStepDependencySpec<'a>) -> Self {
self.dependencies.push(dependency);
self
}
#[must_use]
pub fn depends_on_terminal(self, prerequisite_step_keys: &[StepKey<'a>]) -> Self {
self.depends_on(
prerequisite_step_keys,
WorkflowDependencyReleaseMode::OnTerminal,
)
}
#[must_use]
pub fn depends_on_success(self, prerequisite_step_keys: &[StepKey<'a>]) -> Self {
self.depends_on(
prerequisite_step_keys,
WorkflowDependencyReleaseMode::OnSuccess,
)
}
fn depends_on(
mut self,
prerequisite_step_keys: &[StepKey<'a>],
release_mode: WorkflowDependencyReleaseMode,
) -> Self {
self.dependencies
.extend(
prerequisite_step_keys
.iter()
.copied()
.map(|prerequisite_step_key| WorkflowStepDependencySpec {
prerequisite_step_key,
release_mode: Some(release_mode),
}),
);
self
}
pub fn try_build(self) -> Result<WorkflowStepEnqueue<'a>, WorkflowBuildError> {
let step = WorkflowStepEnqueue {
step_key: self.step_key,
execution_kind: self.execution_kind,
job_type: self.job_type,
organization_id: self.organization_id,
payload: self.payload,
priority: self.priority,
max_attempts: self.max_attempts,
timeout_seconds: self.timeout_seconds,
stage: self.stage,
dependencies: self.dependencies,
};
validate_step_enqueue(&step, None)?;
Ok(step)
}
}