use crate::{Chain, Chord, CompensationWorkflow, Group, Signature};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// A compensation workflow bundled with the isolation setting it is tagged with.
///
/// NOTE(review): how `isolation` is enforced is decided by code outside this
/// definition; nothing here interprets it beyond storing and printing it.
pub struct Saga {
/// The wrapped workflow; its `len()` is reported as the step count by `Display`.
pub workflow: CompensationWorkflow,
/// Isolation setting; `Saga::new` initialises it to `SagaIsolation::ReadCommitted`.
pub isolation: SagaIsolation,
}
/// Isolation setting that can be attached to a [`Saga`].
///
/// NOTE(review): the variant names mirror database isolation levels, but their
/// runtime semantics are not defined in this file — confirm with the executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SagaIsolation {
/// Read-uncommitted setting.
ReadUncommitted,
/// Read-committed setting; this is what `Saga::new` picks by default.
ReadCommitted,
/// Serializable setting.
Serializable,
}
impl Saga {
    /// Wraps `workflow` in a saga that starts out with
    /// [`SagaIsolation::ReadCommitted`].
    pub fn new(workflow: CompensationWorkflow) -> Self {
        let isolation = SagaIsolation::ReadCommitted;
        Self { workflow, isolation }
    }

    /// Builder-style setter: consumes the saga and returns it with its
    /// isolation setting replaced by `isolation`.
    pub fn with_isolation(self, isolation: SagaIsolation) -> Self {
        let mut saga = self;
        saga.isolation = isolation;
        saga
    }
}
impl std::fmt::Display for Saga {
    /// Renders as `Saga[<n> steps, isolation=<Debug form of the isolation>]`,
    /// where `<n>` is the workflow's `len()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let step_count = self.workflow.len();
        let isolation = &self.isolation;
        write!(f, "Saga[{} steps, isolation={:?}]", step_count, isolation)
    }
}
/// Scatter-gather description: one scatter signature, a list of worker
/// signatures, one gather signature, and an optional timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScatterGather {
/// Signature of the scatter step.
pub scatter: Signature,
/// Signatures of the workers.
pub workers: Vec<Signature>,
/// Signature of the gather step.
pub gather: Signature,
/// Optional timeout; `None` until `with_timeout` is called.
/// NOTE(review): the unit is not stated in this file — confirm with the consumer.
pub timeout: Option<u64>,
}
impl ScatterGather {
    /// Builds a scatter-gather description from its three parts, with no
    /// timeout configured.
    pub fn new(scatter: Signature, workers: Vec<Signature>, gather: Signature) -> Self {
        let timeout = None;
        Self {
            scatter,
            workers,
            gather,
            timeout,
        }
    }

    /// Builder-style setter: consumes the value and returns it with the
    /// timeout set to `Some(timeout)`.
    pub fn with_timeout(self, timeout: u64) -> Self {
        let mut configured = self;
        configured.timeout = Some(timeout);
        configured
    }
}
impl std::fmt::Display for ScatterGather {
    /// Renders the scatter task, the number of workers and the gather task.
    /// The timeout is not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            scatter,
            workers,
            gather,
            ..
        } = self;
        write!(
            f,
            "ScatterGather[scatter={}, {} workers, gather={}]",
            scatter.task,
            workers.len(),
            gather.task
        )
    }
}
/// An ordered list of stage signatures with an optional buffer size.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
/// Stages in the order they were appended with `stage`.
pub stages: Vec<Signature>,
/// Optional buffer size; `None` until `with_buffer_size` is called.
/// NOTE(review): what is buffered is not visible here — confirm with the executor.
pub buffer_size: Option<usize>,
}
impl Pipeline {
    /// Creates a pipeline with no stages and no buffer size.
    pub fn new() -> Self {
        let stages = Vec::new();
        Self {
            stages,
            buffer_size: None,
        }
    }

    /// Builder-style: appends `stage` to the end of the stage list.
    pub fn stage(self, stage: Signature) -> Self {
        let mut pipeline = self;
        pipeline.stages.push(stage);
        pipeline
    }

    /// Builder-style: sets the buffer size to `Some(size)`.
    pub fn with_buffer_size(self, size: usize) -> Self {
        let mut pipeline = self;
        pipeline.buffer_size = Some(size);
        pipeline
    }

    /// Returns `true` when no stage has been added.
    pub fn is_empty(&self) -> bool {
        let stages = &self.stages;
        stages.is_empty()
    }

    /// Number of stages in the pipeline.
    pub fn len(&self) -> usize {
        let stages = &self.stages;
        stages.len()
    }
}
impl Default for Pipeline {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for Pipeline {
    /// Renders `Pipeline[<n> stages]`, followed by ` buffer=<size>` only when
    /// a buffer size is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stage_count = self.stages.len();
        write!(f, "Pipeline[{} stages]", stage_count)?;
        match self.buffer_size {
            Some(buf) => write!(f, " buffer={}", buf),
            None => Ok(()),
        }
    }
}
/// One source signature together with the consumer signatures attached to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FanOut {
/// Signature of the source.
pub source: Signature,
/// Consumers in the order they were added with `consumer`.
pub consumers: Vec<Signature>,
}
impl FanOut {
    /// Creates a fan-out for `source` with an empty consumer list.
    pub fn new(source: Signature) -> Self {
        let consumers = Vec::new();
        Self { source, consumers }
    }

    /// Builder-style: appends `consumer` to the consumer list.
    pub fn consumer(self, consumer: Signature) -> Self {
        let mut fan_out = self;
        fan_out.consumers.push(consumer);
        fan_out
    }

    /// Number of consumers (the source is not counted).
    pub fn len(&self) -> usize {
        let consumers = &self.consumers;
        consumers.len()
    }

    /// Returns `true` when no consumer has been added.
    pub fn is_empty(&self) -> bool {
        let consumers = &self.consumers;
        consumers.is_empty()
    }
}
impl std::fmt::Display for FanOut {
    /// Renders the source task and the number of consumers.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { source, consumers } = self;
        write!(
            f,
            "FanOut[source={}, {} consumers]",
            source.task,
            consumers.len()
        )
    }
}
/// A list of source signatures together with one aggregator signature.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FanIn {
/// Sources in the order they were added with `source`.
pub sources: Vec<Signature>,
/// Signature of the aggregator.
pub aggregator: Signature,
}
impl FanIn {
    /// Creates a fan-in for `aggregator` with an empty source list.
    pub fn new(aggregator: Signature) -> Self {
        let sources = Vec::new();
        Self {
            sources,
            aggregator,
        }
    }

    /// Builder-style: appends `source` to the source list.
    pub fn source(self, source: Signature) -> Self {
        let mut fan_in = self;
        fan_in.sources.push(source);
        fan_in
    }

    /// Number of sources (the aggregator is not counted).
    pub fn len(&self) -> usize {
        let sources = &self.sources;
        sources.len()
    }

    /// Returns `true` when no source has been added.
    pub fn is_empty(&self) -> bool {
        let sources = &self.sources;
        sources.is_empty()
    }
}
impl std::fmt::Display for FanIn {
    /// Renders the number of sources and the aggregator task.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            sources,
            aggregator,
        } = self;
        write!(
            f,
            "FanIn[{} sources, aggregator={}]",
            sources.len(),
            aggregator.task
        )
    }
}
/// Outcome of a validation pass: an overall verdict plus the messages
/// collected along the way.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// Overall verdict; the constructors and `add_error` keep it in sync with
    /// `errors`, but the field is public and can be set directly.
    pub valid: bool,
    /// Error messages in the order they were recorded.
    pub errors: Vec<String>,
    /// Warning messages in the order they were recorded; warnings never
    /// change `valid`.
    pub warnings: Vec<String>,
}

impl ValidationResult {
    /// A passing result with no errors and no warnings.
    pub fn valid() -> Self {
        Self {
            valid: true,
            errors: Vec::new(),
            warnings: Vec::new(),
        }
    }

    /// A failing result carrying exactly one error message.
    pub fn invalid(error: impl Into<String>) -> Self {
        let mut result = Self::valid();
        result.add_error(error);
        result
    }

    /// Records an error message and marks the result as invalid.
    pub fn add_error(&mut self, error: impl Into<String>) {
        let message: String = error.into();
        self.valid = false;
        self.errors.push(message);
    }

    /// Records a warning message; the verdict is left untouched.
    pub fn add_warning(&mut self, warning: impl Into<String>) {
        let message: String = warning.into();
        self.warnings.push(message);
    }
}

impl std::fmt::Display for ValidationResult {
    /// Renders `Invalid (<n> errors)` for a failing result; otherwise `Valid`,
    /// followed by ` (<n> warnings)` when at least one warning was recorded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.valid {
            return write!(f, "Invalid ({} errors)", self.errors.len());
        }
        f.write_str("Valid")?;
        match self.warnings.len() {
            0 => Ok(()),
            count => write!(f, " ({} warnings)", count),
        }
    }
}
/// Implemented by workflow types that can check their own structure.
pub trait WorkflowValidator {
/// Inspects `self` and returns the verdict together with any errors and
/// warnings that were found.
fn validate(&self) -> ValidationResult;
}
impl WorkflowValidator for Chain {
    /// Reports an error for an empty chain and a warning for a chain of more
    /// than 100 tasks.
    fn validate(&self) -> ValidationResult {
        let mut report = ValidationResult::valid();
        if self.is_empty() {
            report.add_error("Chain cannot be empty");
        }
        let task_count = self.len();
        if task_count > 100 {
            let warning = format!(
                "Chain has {} tasks, which may be inefficient",
                task_count
            );
            report.add_warning(warning);
        }
        report
    }
}
impl WorkflowValidator for Group {
    /// Reports an error for an empty group and a warning for a group of more
    /// than 1000 tasks.
    fn validate(&self) -> ValidationResult {
        let mut report = ValidationResult::valid();
        if self.is_empty() {
            report.add_error("Group cannot be empty");
        }
        let task_count = self.len();
        if task_count > 1000 {
            let warning = format!(
                "Group has {} tasks, which may overwhelm workers",
                task_count
            );
            report.add_warning(warning);
        }
        report
    }
}
impl WorkflowValidator for Chord {
    /// Reports an error when the chord's header is empty; nothing else is
    /// checked.
    fn validate(&self) -> ValidationResult {
        let header_is_empty = self.header.is_empty();
        let mut report = ValidationResult::valid();
        if header_is_empty {
            report.add_error("Chord header cannot be empty");
        }
        report
    }
}
/// Signal describing how a loop should proceed.
///
/// NOTE(review): the code that consumes this signal is outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoopControl {
/// Keep looping.
Continue,
/// Stop looping without a value.
Break,
/// Stop looping and carry a JSON value along.
BreakWith { value: serde_json::Value },
}
impl LoopControl {
pub fn continue_loop() -> Self {
Self::Continue
}
pub fn break_loop() -> Self {
Self::Break
}
pub fn break_with(value: serde_json::Value) -> Self {
Self::BreakWith { value }
}
}
impl std::fmt::Display for LoopControl {
    /// Writes the bare variant name; the value carried by `BreakWith` is not
    /// included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Continue => "Continue",
            Self::Break => "Break",
            Self::BreakWith { .. } => "BreakWith",
        };
        // `write_str` emits the label verbatim, exactly like a `write!` with
        // a literal-only format string.
        f.write_str(label)
    }
}
/// Policy describing how task failures affect the rest of a workflow.
///
/// `PartialFailureTracker::exceeds_threshold` is where these variants are
/// turned into a stop/continue decision.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub enum ErrorPropagationMode {
/// Default mode: the threshold counts as exceeded as soon as one task failed.
#[default]
StopOnFirstError,
/// The threshold is never exceeded, regardless of failures.
ContinueOnError,
/// Tolerates failures up to a count limit and, optionally, a rate limit.
PartialFailure {
/// Failure count at which the threshold trips (the tracker compares with `>=`).
max_failures: usize,
/// Optional failure-rate limit as a fraction (failed / total); the tracker
/// trips when the rate is strictly greater. `Display` prints it as a percentage.
max_failure_rate: Option<f64>,
},
}
impl ErrorPropagationMode {
    /// Builds a `PartialFailure` mode with a count limit and no rate limit.
    pub fn partial_failure(max_failures: usize) -> Self {
        let max_failure_rate = None;
        Self::PartialFailure {
            max_failures,
            max_failure_rate,
        }
    }

    /// Builds a `PartialFailure` mode with both a count limit and a rate
    /// limit. `max_rate` is stored as given; no range check is performed.
    pub fn partial_failure_with_rate(max_failures: usize, max_rate: f64) -> Self {
        let max_failure_rate = Some(max_rate);
        Self::PartialFailure {
            max_failures,
            max_failure_rate,
        }
    }

    /// Returns `true` for every mode except `StopOnFirstError`.
    pub fn allows_continue(&self) -> bool {
        match self {
            Self::StopOnFirstError => false,
            Self::ContinueOnError | Self::PartialFailure { .. } => true,
        }
    }
}
impl std::fmt::Display for ErrorPropagationMode {
    /// Renders the variant name; `PartialFailure` additionally shows
    /// `(max=<n>)` and, when a rate limit is set, ` rate=<p>%` with the rate
    /// scaled to a percentage and one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StopOnFirstError => f.write_str("StopOnFirstError"),
            Self::ContinueOnError => f.write_str("ContinueOnError"),
            Self::PartialFailure {
                max_failures,
                max_failure_rate,
            } => {
                write!(f, "PartialFailure(max={})", max_failures)?;
                match max_failure_rate {
                    Some(rate) => write!(f, " rate={:.1}%", rate * 100.0),
                    None => Ok(()),
                }
            }
        }
    }
}
/// Running tally of task outcomes, measured against a fixed expected total.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialFailureTracker {
/// Expected number of tasks; used as the denominator of both rates.
pub total_tasks: usize,
/// Number of successes recorded so far.
pub successful_tasks: usize,
/// Number of failures recorded so far.
pub failed_tasks: usize,
/// Ids of the tasks recorded as successful, in recording order.
pub successful_task_ids: Vec<Uuid>,
/// `(task id, error message)` pairs for the tasks recorded as failed, in recording order.
pub failed_task_ids: Vec<(Uuid, String)>,
}
impl PartialFailureTracker {
    /// Creates a tracker expecting `total_tasks` tasks, with nothing recorded
    /// yet.
    pub fn new(total_tasks: usize) -> Self {
        Self {
            total_tasks,
            successful_tasks: 0,
            failed_tasks: 0,
            successful_task_ids: Vec::new(),
            failed_task_ids: Vec::new(),
        }
    }

    /// Records one successful task and remembers its id.
    pub fn record_success(&mut self, task_id: Uuid) {
        self.successful_task_ids.push(task_id);
        self.successful_tasks += 1;
    }

    /// Records one failed task and remembers its id together with `error`.
    pub fn record_failure(&mut self, task_id: Uuid, error: String) {
        let entry = (task_id, error);
        self.failed_task_ids.push(entry);
        self.failed_tasks += 1;
    }

    /// Fraction of `total_tasks` recorded as failed; `0.0` when the total is
    /// zero.
    pub fn failure_rate(&self) -> f64 {
        match self.total_tasks {
            0 => 0.0,
            total => self.failed_tasks as f64 / total as f64,
        }
    }

    /// Fraction of `total_tasks` recorded as successful; `1.0` when the total
    /// is zero.
    pub fn success_rate(&self) -> f64 {
        match self.total_tasks {
            0 => 1.0,
            total => self.successful_tasks as f64 / total as f64,
        }
    }

    /// Decides whether the recorded failures trip the limit defined by `mode`.
    ///
    /// * `StopOnFirstError` — trips once at least one failure is recorded.
    /// * `ContinueOnError` — never trips.
    /// * `PartialFailure` — trips when the failure count has reached
    ///   `max_failures` (`>=`), or when a rate limit is set and the failure
    ///   rate is strictly greater than it.
    pub fn exceeds_threshold(&self, mode: &ErrorPropagationMode) -> bool {
        match mode {
            ErrorPropagationMode::StopOnFirstError => self.failed_tasks > 0,
            ErrorPropagationMode::ContinueOnError => false,
            ErrorPropagationMode::PartialFailure {
                max_failures,
                max_failure_rate,
            } => {
                // The count check is inclusive, so `max_failures == 0` trips
                // even before any failure has been recorded.
                let count_limit_hit = self.failed_tasks >= *max_failures;
                count_limit_hit
                    || matches!(
                        max_failure_rate,
                        Some(limit) if self.failure_rate() > *limit
                    )
            }
        }
    }

    /// Inverse of [`PartialFailureTracker::exceeds_threshold`].
    pub fn should_continue(&self, mode: &ErrorPropagationMode) -> bool {
        let tripped = self.exceeds_threshold(mode);
        !tripped
    }
}
impl std::fmt::Display for PartialFailureTracker {
    /// Renders the success count over the total, the failure count, and the
    /// failure rate as a percentage with one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let failure_percent = self.failure_rate() * 100.0;
        let Self {
            total_tasks,
            successful_tasks,
            failed_tasks,
            ..
        } = self;
        write!(
            f,
            "PartialFailureTracker[success={}/{}, failed={}, rate={:.1}%]",
            successful_tasks, total_tasks, failed_tasks, failure_percent
        )
    }
}
/// Isolation setting for a sub-workflow: none, resource limits, error
/// isolation, or both (`Full`).
///
/// NOTE(review): enforcement of the limits happens outside this file; the
/// units below are inferred from the field names and the `Display` output.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub enum IsolationLevel {
/// Default: no isolation.
#[default]
None,
/// Resource limits only.
Resource {
/// Optional memory limit, printed with an `MB` suffix.
max_memory_mb: Option<u64>,
/// Optional CPU limit, printed with a `%` suffix.
max_cpu_percent: Option<u8>,
},
/// Error isolation only.
Error,
/// Counts as both resource-limited and error-isolated (see
/// `has_resource_limits` / `has_error_isolation`).
Full {
/// Optional memory limit, printed with an `MB` suffix.
max_memory_mb: Option<u64>,
/// Optional CPU limit, printed with a `%` suffix.
max_cpu_percent: Option<u8>,
},
}
impl IsolationLevel {
    /// Builds a `Resource` level with the given memory limit and no CPU limit.
    pub fn resource(max_memory_mb: u64) -> Self {
        let memory_limit = Some(max_memory_mb);
        Self::Resource {
            max_memory_mb: memory_limit,
            max_cpu_percent: None,
        }
    }

    /// Builds a `Full` level with the given memory limit and no CPU limit.
    pub fn full(max_memory_mb: u64) -> Self {
        let memory_limit = Some(max_memory_mb);
        Self::Full {
            max_memory_mb: memory_limit,
            max_cpu_percent: None,
        }
    }

    /// Returns `true` for the `Resource` and `Full` variants, whether or not
    /// any individual limit is actually set.
    pub fn has_resource_limits(&self) -> bool {
        match self {
            Self::Resource { .. } | Self::Full { .. } => true,
            Self::None | Self::Error => false,
        }
    }

    /// Returns `true` for the `Error` and `Full` variants.
    pub fn has_error_isolation(&self) -> bool {
        match self {
            Self::Error | Self::Full { .. } => true,
            Self::None | Self::Resource { .. } => false,
        }
    }
}
impl std::fmt::Display for IsolationLevel {
    /// Renders the level as `None`, `Error`, `Resource(…)` or `Full(…)`.
    ///
    /// For the two limit-carrying variants the parentheses list whichever
    /// limits are set, as `mem=<n>MB` and/or `cpu=<n>%`, separated by a single
    /// space. The separator is written only when both limits are present, so
    /// a CPU-only limit renders as `Resource(cpu=50%)` without a stray
    /// leading space.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Shared renderer for `Resource` and `Full`, which carry the same
        // pair of optional limits and differ only in their label.
        fn write_limits(
            f: &mut std::fmt::Formatter<'_>,
            label: &str,
            memory_mb: Option<u64>,
            cpu_percent: Option<u8>,
        ) -> std::fmt::Result {
            f.write_str(label)?;
            f.write_str("(")?;
            if let Some(mem) = memory_mb {
                write!(f, "mem={}MB", mem)?;
            }
            if let Some(cpu) = cpu_percent {
                // Only separate from the memory entry when there is one.
                if memory_mb.is_some() {
                    f.write_str(" ")?;
                }
                write!(f, "cpu={}%", cpu)?;
            }
            f.write_str(")")
        }

        match self {
            Self::None => f.write_str("None"),
            Self::Error => f.write_str("Error"),
            Self::Resource {
                max_memory_mb,
                max_cpu_percent,
            } => write_limits(f, "Resource", *max_memory_mb, *max_cpu_percent),
            Self::Full {
                max_memory_mb,
                max_cpu_percent,
            } => write_limits(f, "Full", *max_memory_mb, *max_cpu_percent),
        }
    }
}
/// Isolation configuration for one sub-workflow, including whether errors and
/// cancellation are flagged to propagate.
///
/// NOTE(review): which direction propagation flows (child to parent or the
/// reverse) is not visible in this file — confirm with the executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubWorkflowIsolation {
/// Id of the workflow this configuration applies to.
pub workflow_id: Uuid,
/// Id of the parent workflow; `None` until `with_parent` is called.
pub parent_workflow_id: Option<Uuid>,
/// Isolation level chosen for the workflow.
pub isolation_level: IsolationLevel,
/// Error-propagation flag; `new` sets it to `true`.
pub propagate_errors: bool,
/// Cancellation-propagation flag; `new` sets it to `true`.
pub propagate_cancellation: bool,
}
impl SubWorkflowIsolation {
    /// Creates a configuration for `workflow_id` with no parent and with both
    /// propagation flags switched on.
    pub fn new(workflow_id: Uuid, isolation_level: IsolationLevel) -> Self {
        let parent_workflow_id = None;
        Self {
            workflow_id,
            parent_workflow_id,
            isolation_level,
            propagate_errors: true,
            propagate_cancellation: true,
        }
    }

    /// Builder-style: records `parent_id` as the parent workflow.
    pub fn with_parent(self, parent_id: Uuid) -> Self {
        let mut config = self;
        config.parent_workflow_id = Some(parent_id);
        config
    }

    /// Builder-style: switches the error-propagation flag off.
    pub fn no_error_propagation(self) -> Self {
        let mut config = self;
        config.propagate_errors = false;
        config
    }

    /// Builder-style: switches the cancellation-propagation flag off.
    pub fn no_cancellation_propagation(self) -> Self {
        let mut config = self;
        config.propagate_cancellation = false;
        config
    }
}
impl std::fmt::Display for SubWorkflowIsolation {
    /// Renders the workflow id, the isolation level and both propagation
    /// flags. The parent id is not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            workflow_id,
            isolation_level,
            propagate_errors,
            propagate_cancellation,
            ..
        } = self;
        write!(
            f,
            "SubWorkflowIsolation[id={}, level={}, errors={}, cancel={}]",
            workflow_id, isolation_level, propagate_errors, propagate_cancellation
        )
    }
}