use crate::activity::{Activity, ActivityError, ActivityResult};
use crate::capability::{Capability, CapabilityError, CapabilityResult, CapabilityProvider};
use crate::engine::ExecutionContext;
use crate::model::{ExclusiveGateway, ParallelGateway, InclusiveGateway};
use async_trait::async_trait;
use std::collections::HashMap;
/// Activity wrapper around an [`ExclusiveGateway`] model element.
pub struct ExclusiveGatewayActivity {
    /// Gateway definition whose id, name and default flow drive this activity.
    gateway: ExclusiveGateway,
}
impl ExclusiveGatewayActivity {
pub fn new(gateway: ExclusiveGateway) -> Self {
Self { gateway }
}
}
#[async_trait]
impl Activity for ExclusiveGatewayActivity {
    /// Picks exactly one outgoing flow of the gateway and continues to its target.
    ///
    /// The first outgoing flow that is not the gateway's default flow is taken.
    /// The default flow is only a fallback: it is used when no other outgoing
    /// flow is available, independent of its position in the flow list.
    ///
    /// NOTE(review): condition expressions attached to flows are not evaluated
    /// here, so every non-default flow counts as eligible. Once an expression
    /// evaluator is wired in, the selection below is the place to consult it.
    ///
    /// # Errors
    ///
    /// Returns [`ActivityError::ExecutionFailed`] when the gateway has no
    /// outgoing flow to follow.
    async fn execute(&self, context: &mut ExecutionContext) -> Result<ActivityResult, ActivityError> {
        let definition = &context.process_definition;
        let outgoing_flows = definition.get_outgoing_flows(&self.gateway.base.id);

        let mut selected_flow: Option<String> = None;
        let mut default_target: Option<String> = None;

        for flow in outgoing_flows {
            if Some(&flow.id) == self.gateway.default_flow.as_ref() {
                // Remember the default flow but keep scanning: it must not
                // shadow a regular flow that happens to be listed after it.
                if default_target.is_none() {
                    default_target = Some(flow.target_ref.clone());
                }
                continue;
            }

            // An exclusive gateway follows a single path, so the first
            // regular flow ends the search.
            selected_flow = Some(flow.target_ref.clone());
            break;
        }

        match selected_flow.or(default_target) {
            Some(target) => Ok(ActivityResult::Continue {
                next_elements: vec![target],
            }),
            None => Err(ActivityError::ExecutionFailed(
                "No outgoing flow selected from exclusive gateway".to_string(),
            )),
        }
    }

    /// Returns the id of the underlying gateway element.
    fn id(&self) -> &str {
        &self.gateway.base.id
    }

    /// Returns the gateway's name, if one is set.
    fn name(&self) -> Option<&str> {
        self.gateway.base.name.as_deref()
    }
}
/// Activity wrapper around a [`ParallelGateway`] model element.
pub struct ParallelGatewayActivity {
    /// Gateway definition whose id and name identify this activity.
    gateway: ParallelGateway,
}
impl ParallelGatewayActivity {
pub fn new(gateway: ParallelGateway) -> Self {
Self { gateway }
}
}
#[async_trait]
impl Activity for ParallelGatewayActivity {
    /// Continues execution on every outgoing flow of the gateway.
    ///
    /// Returns `ActivityResult::Continue` carrying the targets of all outgoing
    /// flows, or `ActivityResult::Completed` without output variables when the
    /// gateway has no outgoing flow.
    ///
    /// NOTE(review): incoming flows are not inspected, so this method performs
    /// no join synchronization itself — confirm that the engine handles it.
    async fn execute(&self, context: &mut ExecutionContext) -> Result<ActivityResult, ActivityError> {
        let definition = &context.process_definition;
        let outgoing_flows = definition.get_outgoing_flows(&self.gateway.base.id);

        // Nothing to fork into: the gateway simply completes.
        if outgoing_flows.is_empty() {
            return Ok(ActivityResult::Completed { output_variables: None });
        }

        let next_elements: Vec<String> = outgoing_flows
            .iter()
            .map(|flow| flow.target_ref.clone())
            .collect();
        Ok(ActivityResult::Continue { next_elements })
    }

    /// Returns the id of the underlying gateway element.
    fn id(&self) -> &str {
        &self.gateway.base.id
    }

    /// Returns the gateway's name, if one is set.
    fn name(&self) -> Option<&str> {
        self.gateway.base.name.as_deref()
    }
}
/// Activity wrapper around an [`InclusiveGateway`] model element.
pub struct InclusiveGatewayActivity {
    /// Gateway definition whose id, name and default flow drive this activity.
    gateway: InclusiveGateway,
}
impl InclusiveGatewayActivity {
pub fn new(gateway: InclusiveGateway) -> Self {
Self { gateway }
}
}
#[async_trait]
impl Activity for InclusiveGatewayActivity {
    /// Continues execution on every eligible outgoing flow of the gateway.
    ///
    /// All outgoing flows other than the gateway's default flow are taken, in
    /// the order they are listed. The default flow is only a fallback: it is
    /// followed when no other outgoing flow was selected, independent of its
    /// position in the flow list.
    ///
    /// NOTE(review): condition expressions attached to flows are not evaluated
    /// here, so every non-default flow counts as eligible. Once an expression
    /// evaluator is wired in, the loop below is the place to consult it.
    ///
    /// # Errors
    ///
    /// Returns [`ActivityError::ExecutionFailed`] when the gateway has no
    /// outgoing flow to follow.
    async fn execute(&self, context: &mut ExecutionContext) -> Result<ActivityResult, ActivityError> {
        let definition = &context.process_definition;
        let outgoing_flows = definition.get_outgoing_flows(&self.gateway.base.id);

        let mut selected_targets = Vec::new();
        let mut default_target: Option<String> = None;

        for flow in outgoing_flows {
            if Some(&flow.id) == self.gateway.default_flow.as_ref() {
                // Hold the default flow back; whether it is needed is only
                // known after all regular flows have been considered.
                if default_target.is_none() {
                    default_target = Some(flow.target_ref.clone());
                }
                continue;
            }

            selected_targets.push(flow.target_ref.clone());
        }

        // Fall back to the default flow only when nothing else was selected.
        if selected_targets.is_empty() {
            if let Some(target) = default_target {
                selected_targets.push(target);
            }
        }

        if selected_targets.is_empty() {
            Err(ActivityError::ExecutionFailed(
                "No outgoing flow selected from inclusive gateway".to_string(),
            ))
        } else {
            Ok(ActivityResult::Continue {
                next_elements: selected_targets,
            })
        }
    }

    /// Returns the id of the underlying gateway element.
    fn id(&self) -> &str {
        &self.gateway.base.id
    }

    /// Returns the gateway's name, if one is set.
    fn name(&self) -> Option<&str> {
        self.gateway.base.name.as_deref()
    }
}