use crate::context::WorkflowContext;
use crate::error::{WorkflowError, WorkflowResult};
use crate::expression::{traverse_and_evaluate, traverse_and_evaluate_obj};
use crate::handler::HandlerRegistry;
use crate::json_schema::validate_schema;
use crate::listener::WorkflowEvent;
use crate::status::StatusPhase;
use crate::tasks::*;
use serde_json::Value;
use std::collections::HashMap;
use swf_core::models::input::InputDataModelDefinition;
use swf_core::models::output::OutputDataModelDefinition;
use swf_core::models::task::{TaskDefinition, TaskDefinitionFields};
use swf_core::models::workflow::WorkflowDefinition;
pub struct OwnedTaskSupport {
pub workflow: WorkflowDefinition,
pub context: WorkflowContext,
}
impl OwnedTaskSupport {
pub fn from_support(support: &TaskSupport<'_>) -> Self {
Self {
workflow: support.workflow.clone(),
context: support.context.clone(),
}
}
pub fn as_task_support(&mut self) -> TaskSupport<'_> {
TaskSupport::new(&self.workflow, &mut self.context)
}
}
#[async_trait::async_trait]
pub trait TaskRunner: Send + Sync {
async fn run(&self, input: Value, support: &mut TaskSupport<'_>) -> WorkflowResult<Value>;
fn task_name(&self) -> &str;
}
pub struct TaskSupport<'a> {
pub workflow: &'a WorkflowDefinition,
pub context: &'a mut WorkflowContext,
}
impl<'a> TaskSupport<'a> {
pub fn new(workflow: &'a WorkflowDefinition, context: &'a mut WorkflowContext) -> Self {
Self { workflow, context }
}
pub fn set_task_status(&mut self, task: &str, status: StatusPhase) {
self.context.set_task_status(task, status);
}
pub fn set_task_name(&mut self, name: &str) {
self.context.set_task_name(name);
}
pub fn set_task_raw_input(&mut self, input: &Value) {
self.context.set_task_raw_input(input);
}
pub fn set_task_raw_output(&mut self, output: &Value) {
self.context.set_task_raw_output(output);
}
pub fn set_task_def(&mut self, task: &Value) {
self.context.set_task_def(task);
}
pub fn set_task_started_at(&mut self) {
self.context.set_task_started_at();
}
pub fn set_task_reference_from_name(&mut self, name: &str) -> WorkflowResult<()> {
let reference = self
.context
.get_workflow_json()
.and_then(|json| crate::json_pointer::generate_json_pointer_from_value(json, name).ok())
.unwrap_or_else(|| format!("/{}", name));
self.context.set_task_reference(&reference);
Ok(())
}
pub fn inc_iteration(&mut self, task_name: &str) -> u32 {
self.context.inc_iteration(task_name)
}
pub fn set_retry_attempt(&mut self, attempt: u32) {
self.context.set_retry_attempt(attempt)
}
pub fn get_task_reference(&self) -> Option<&str> {
self.context.get_task_reference()
}
pub fn add_local_expr_vars(&mut self, vars: HashMap<String, Value>) {
self.context.add_local_expr_vars(vars);
}
pub fn remove_local_expr_vars(&mut self, keys: &[&str]) {
self.context.remove_local_expr_vars(keys);
}
pub fn emit_event(&self, event: WorkflowEvent) {
self.context.emit_event(event);
}
pub fn set_instance_ctx(&mut self, value: Value) {
self.context.set_instance_ctx(value);
}
pub fn get_vars(&self) -> HashMap<String, Value> {
self.context.get_vars()
}
pub fn eval_jq(&self, expr: &str, input: &Value, task_name: &str) -> WorkflowResult<Value> {
let vars = self.get_vars();
crate::expression::evaluate_jq(expr, input, &vars)
.map_err(|e| crate::error::WorkflowError::expression(format!("{}", e), task_name))
}
pub fn eval_jq_expr(
&self,
raw_expr: &str,
input: &Value,
task_name: &str,
) -> WorkflowResult<Value> {
let sanitized = crate::expression::prepare_expression(raw_expr);
self.eval_jq(&sanitized, input, task_name)
}
pub fn eval_bool(&self, expr: &str, input: &Value) -> WorkflowResult<bool> {
let vars = self.get_vars();
crate::expression::traverse_and_evaluate_bool(expr, input, &vars)
}
pub fn eval_str(&self, expr: &str, input: &Value, task_name: &str) -> WorkflowResult<String> {
let vars = self.get_vars();
crate::expression::evaluate_expression_str(expr, input, &vars, task_name)
}
pub fn eval_traverse(&self, node: &mut Value, input: &Value) -> WorkflowResult<()> {
let vars = self.get_vars();
traverse_and_evaluate(node, input, &vars)
}
pub fn eval_obj(
&self,
from: Option<&Value>,
input: &Value,
task_name: &str,
) -> WorkflowResult<Value> {
let vars = self.get_vars();
traverse_and_evaluate_obj(from, input, &vars, task_name)
}
pub fn eval_duration(
&self,
expr: &swf_core::models::duration::OneOfDurationOrIso8601Expression,
input: &Value,
task_name: &str,
) -> WorkflowResult<std::time::Duration> {
let vars = self.get_vars();
crate::utils::resolve_duration_expr(expr, input, &vars, task_name)
}
pub fn get_handler_registry(&self) -> &HandlerRegistry {
self.context.get_handler_registry()
}
pub fn clone_event_bus(&self) -> Option<crate::events::SharedEventBus> {
self.context.clone_event_bus()
}
pub fn should_run_task(
&self,
if_condition: Option<&str>,
input: &Value,
) -> WorkflowResult<bool> {
match if_condition {
None => Ok(true),
Some(condition) => self.eval_bool(condition, input),
}
}
pub fn process_task_input(
&self,
input_def: Option<&InputDataModelDefinition>,
input: &Value,
task_name: &str,
) -> WorkflowResult<Value> {
let input_def = match input_def {
Some(def) => def,
None => return Ok(input.clone()),
};
if let Some(ref schema) = input_def.schema {
validate_schema(input, schema, task_name)?;
}
match input_def.from {
Some(ref from_val) => {
crate::expression::evaluate_value_expr(from_val, input, &self.get_vars(), task_name)
}
None => Ok(input.clone()),
}
}
fn process_task_output_with_vars(
&self,
output_def: Option<&OutputDataModelDefinition>,
output: &Value,
task_name: &str,
vars: &HashMap<String, Value>,
) -> WorkflowResult<Value> {
let output_def = match output_def {
Some(def) => def,
None => return Ok(output.clone()),
};
let result = match output_def.as_ {
Some(ref as_val) => {
crate::expression::evaluate_value_expr(as_val, output, vars, task_name)?
}
None => output.clone(),
};
if let Some(ref schema) = output_def.schema {
validate_schema(&result, schema, task_name)?;
}
Ok(result)
}
pub fn process_task_output(
&self,
output_def: Option<&OutputDataModelDefinition>,
output: &Value,
task_name: &str,
) -> WorkflowResult<Value> {
let vars = self.get_vars();
self.process_task_output_with_vars(output_def, output, task_name, &vars)
}
pub fn process_task_export(
&mut self,
export_def: Option<&OutputDataModelDefinition>,
output: &Value,
task_name: &str,
) -> WorkflowResult<()> {
if export_def.is_none() {
return Ok(());
}
let result = self.process_task_output(export_def, output, task_name)?;
self.set_instance_ctx(result);
Ok(())
}
pub async fn execute_task_lifecycle(
&mut self,
task_name: &str,
common: &TaskDefinitionFields,
_input: &Value,
raw_output: Value,
) -> WorkflowResult<Value> {
self.set_task_raw_output(&raw_output);
let vars = self.get_vars();
let output = self.process_task_output_with_vars(
common.output.as_ref(),
&raw_output,
task_name,
&vars,
)?;
if common.export.is_some() {
let export_result = self.process_task_output_with_vars(
common.export.as_ref(),
&output,
task_name,
&vars,
)?;
self.set_instance_ctx(export_result);
}
self.context.clear_authorization();
self.emit_event(WorkflowEvent::TaskCompleted {
instance_id: self.context.instance_id().to_string(),
task_name: task_name.to_string(),
output: output.clone(),
});
Ok(output)
}
pub async fn run_task_with_input_and_timeout(
&mut self,
task_name: &str,
common: &TaskDefinitionFields,
input: &Value,
runner: &dyn TaskRunner,
) -> WorkflowResult<Value> {
self.set_task_started_at();
self.set_task_raw_input(input);
self.set_task_name(task_name);
self.inc_iteration(task_name);
self.emit_event(WorkflowEvent::TaskStarted {
instance_id: self.context.instance_id().to_string(),
task_name: task_name.to_string(),
});
let task_input = self.process_task_input(common.input.as_ref(), input, task_name)?;
if let Some(timeout) = common.timeout.as_ref() {
let vars = self.get_vars();
let duration = crate::utils::parse_duration_with_context(
timeout,
&task_input,
&vars,
task_name,
Some(self.workflow),
)?;
match tokio::time::timeout(duration, runner.run(task_input, self)).await {
Ok(result) => result,
Err(_) => Err(WorkflowError::timeout(
format!("task '{}' timed out after {:?}", task_name, duration),
task_name,
)),
}
} else {
runner.run(task_input, self).await
}
}
}
pub fn create_task_runner(
name: &str,
task: &TaskDefinition,
workflow: &WorkflowDefinition,
) -> WorkflowResult<Box<dyn TaskRunner>> {
match task {
TaskDefinition::Do(t) => Ok(Box::new(DoTaskRunner::new(name, t)?)),
TaskDefinition::Set(t) => Ok(Box::new(SetTaskRunner::new(name, t)?)),
TaskDefinition::Wait(t) => Ok(Box::new(WaitTaskRunner::new(name, t)?)),
TaskDefinition::Raise(t) => Ok(Box::new(RaiseTaskRunner::new(name, t, workflow)?)),
TaskDefinition::For(t) => Ok(Box::new(ForTaskRunner::new(name, t)?)),
TaskDefinition::Switch(t) => Ok(Box::new(SwitchTaskRunner::new(name, t)?)),
TaskDefinition::Fork(t) => Ok(Box::new(ForkTaskRunner::new(name, t, workflow)?)),
TaskDefinition::Try(t) => Ok(Box::new(TryTaskRunner::new(name, t, workflow)?)),
TaskDefinition::Emit(t) => Ok(Box::new(EmitTaskRunner::new(name, t)?)),
TaskDefinition::Listen(t) => Ok(Box::new(ListenTaskRunner::new(name, t)?)),
TaskDefinition::Call(t) => Ok(Box::new(CallTaskRunner::new(name, t)?)),
TaskDefinition::Run(t) => Ok(Box::new(RunTaskRunner::new(name, t)?)),
TaskDefinition::Custom(t) => Ok(Box::new(CustomTaskRunner::new(name, t)?)),
}
}