use crate::context::{SuspendState, WorkflowContext};
use crate::error::{ErrorKind, WorkflowError, WorkflowResult};
use crate::events::SharedEventBus;
use crate::expression::evaluate_value_expr;
use crate::handler::{CallHandler, CustomTaskHandler, HandlerRegistry, RunHandler};
use crate::json_schema::validate_schema;
use crate::listener::{WorkflowEvent, WorkflowExecutionListener};
use crate::secret::SecretManager;
use crate::status::StatusPhase;
use crate::task_runner::{TaskRunner, TaskSupport};
use crate::tasks::DoTaskRunner;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use swf_core::models::task::TaskDefinition;
use swf_core::models::workflow::WorkflowDefinition;
/// Cloneable handle for suspending and resuming a workflow from outside the runner.
///
/// Created by `WorkflowRunner::handle`, which stores a clone of the runner's
/// `SuspendState` in it.
#[derive(Clone)]
pub struct WorkflowHandle {
// Clone of the issuing runner's suspend state. Presumably clones share the
// same underlying flag — TODO confirm against `SuspendState`'s `Clone` impl.
suspend_state: SuspendState,
}
/// Handle to the Tokio task spawned by `WorkflowRunner::schedule`.
pub struct ScheduledWorkflow {
// Join handle of the spawned task that drives the workflow run(s).
join_handle: tokio::task::JoinHandle<()>,
// Sender half of the watch channel on which `cancel` publishes `true`.
cancel_tx: tokio::sync::watch::Sender<bool>,
}
impl ScheduledWorkflow {
    /// Requests cancellation by publishing `true` on the cancellation channel.
    ///
    /// The result of the send is deliberately discarded: a failure only means
    /// that no receiver is listening any more.
    pub fn cancel(&self) {
        self.cancel_tx.send(true).ok();
    }

    /// Consumes the handle and waits for the spawned task to finish.
    ///
    /// The join result (including a panic or abort of the task) is discarded.
    pub async fn join(self) {
        self.join_handle.await.ok();
    }
}
impl WorkflowHandle {
/// Requests suspension by delegating to `SuspendState::suspend`.
///
/// Returns whatever that call reports — presumably whether the state
/// actually changed; TODO confirm against `SuspendState`.
pub fn suspend(&self) -> bool {
self.suspend_state.suspend()
}
/// Requests resumption by delegating to `SuspendState::resume`.
///
/// Returns whatever that call reports — presumably whether the state
/// actually changed; TODO confirm against `SuspendState`.
pub fn resume(&self) -> bool {
self.suspend_state.resume()
}
/// Reports the value of `SuspendState::is_suspended` for the wrapped state.
pub fn is_suspended(&self) -> bool {
self.suspend_state.is_suspended()
}
}
/// Executes a single `WorkflowDefinition`.
///
/// Configured through the `with_*` builder methods; each call to `run`
/// creates a fresh `WorkflowContext` and copies this configuration into it.
pub struct WorkflowRunner {
// The workflow definition executed by `run`.
workflow: WorkflowDefinition,
// Optional secret manager installed into the context of every run.
secret_manager: Option<Arc<dyn SecretManager>>,
// Optional listener installed into the context of every run.
listener: Option<Arc<dyn WorkflowExecutionListener>>,
// Optional event bus installed into the context of every run.
event_bus: Option<SharedEventBus>,
// Registered sub-workflows, keyed by "namespace/name/version".
sub_workflows: HashMap<String, WorkflowDefinition>,
// Registered function tasks, keyed by function name.
functions: HashMap<String, TaskDefinition>,
// Registry of call, run and custom-task handlers; cloned into every run.
handler_registry: HandlerRegistry,
// Suspend state cloned into every run and into handles from `handle`.
suspend_state: SuspendState,
}
impl WorkflowRunner {
/// Creates a runner for `workflow` with nothing else configured.
///
/// No secret manager, listener or event bus is set, the sub-workflow and
/// function tables are empty, and the handler registry and suspend state
/// are freshly constructed. This constructor currently always returns `Ok`.
pub fn new(workflow: WorkflowDefinition) -> WorkflowResult<Self> {
    let runner = Self {
        workflow,
        secret_manager: None,
        listener: None,
        event_bus: None,
        sub_workflows: HashMap::new(),
        functions: HashMap::new(),
        handler_registry: HandlerRegistry::new(),
        suspend_state: SuspendState::new(),
    };
    Ok(runner)
}
pub fn with_secret_manager(mut self, manager: Arc<dyn SecretManager>) -> Self {
self.secret_manager = Some(manager);
self
}
pub fn with_listener(mut self, listener: Arc<dyn WorkflowExecutionListener>) -> Self {
self.listener = Some(listener);
self
}
pub fn with_event_bus(mut self, bus: SharedEventBus) -> Self {
self.event_bus = Some(bus);
self
}
/// Registers `workflow` as a sub-workflow.
///
/// The registration key is built from the definition's document as
/// `namespace/name/version`; registering a second definition with the same
/// key replaces the first.
pub fn with_sub_workflow(mut self, workflow: WorkflowDefinition) -> Self {
    let key = {
        let document = &workflow.document;
        format!(
            "{}/{}/{}",
            document.namespace, document.name, document.version
        )
    };
    self.sub_workflows.insert(key, workflow);
    self
}
/// Registers `handler` as a call handler in this runner's handler registry
/// and returns the runner for further chaining.
pub fn with_call_handler(mut self, handler: Box<dyn CallHandler>) -> Self {
self.handler_registry.register_call_handler(handler);
self
}
/// Registers `handler` as a run handler in this runner's handler registry
/// and returns the runner for further chaining.
pub fn with_run_handler(mut self, handler: Box<dyn RunHandler>) -> Self {
self.handler_registry.register_run_handler(handler);
self
}
/// Registers `task` as a function under `name`.
///
/// Registering another task under the same name replaces the earlier one.
pub fn with_function(mut self, name: &str, task: TaskDefinition) -> Self {
    let function_name = name.to_owned();
    self.functions.insert(function_name, task);
    self
}
pub fn with_handler_registry(mut self, registry: HandlerRegistry) -> Self {
self.handler_registry = registry;
self
}
/// Registers `handler` as a custom task handler in this runner's handler
/// registry and returns the runner for further chaining.
pub fn with_custom_task_handler(mut self, handler: Box<dyn CustomTaskHandler>) -> Self {
self.handler_registry.register_custom_task_handler(handler);
self
}
/// Executes the workflow once with `input` and returns its processed output.
///
/// Steps, in order:
/// 1. Build a fresh `WorkflowContext` and copy this runner's configuration
///    (secret manager, listener, sub-workflows, event bus, handler registry,
///    functions, suspend state) into it.
/// 2. If the definition declares a non-zero `schedule.after` delay, set the
///    status to `Waiting` and sleep for that long.
/// 3. Validate/transform the input and build the root `DoTaskRunner`. Only
///    then is the status set to `Running` and `WorkflowStarted` emitted, so a
///    workflow that cannot start never announces a start.
/// 4. Run the task list, bounded by the workflow timeout when
///    `resolve_workflow_timeout` yields one (it yields `None` when no timeout
///    is declared or the declared timeout cannot be resolved).
/// 5. Post-process the output, set the status to `Completed` and emit
///    `WorkflowCompleted`.
///
/// Every failure after `WorkflowStarted` sets the status to `Faulted` and
/// emits `WorkflowFailed` before returning, so listeners always observe a
/// terminal event for a started run.
///
/// # Errors
///
/// * Context creation, input processing or runner construction failed
///   (returned before the workflow is announced as started).
/// * The workflow exceeded its timeout (a timeout error naming the workflow).
/// * A task failed; `Runtime` errors are annotated with the current task
///   reference, or `"/"` when none is set.
/// * Output processing failed.
pub async fn run(&self, input: Value) -> WorkflowResult<Value> {
    let mut context = WorkflowContext::new(&self.workflow)?;
    if let Some(ref mgr) = self.secret_manager {
        context.set_secret_manager(mgr.clone());
    }
    if let Some(ref listener) = self.listener {
        context.set_listener(listener.clone());
    }
    if !self.sub_workflows.is_empty() {
        context.set_sub_workflows(self.sub_workflows.clone());
    }
    if let Some(ref bus) = self.event_bus {
        context.set_event_bus(bus.clone());
    }
    context.set_handler_registry(self.handler_registry.clone());
    if !self.functions.is_empty() {
        context.set_functions(self.functions.clone());
    }
    context.set_suspend_state(self.suspend_state.clone());
    let instance_id = context.instance_id().to_string();

    // Honour the optional start delay before touching the input.
    if let Some(ref schedule) = self.workflow.schedule {
        if let Some(ref after_duration) = schedule.after {
            let duration = crate::utils::duration_to_std(after_duration);
            if !duration.is_zero() {
                context.set_status(StatusPhase::Waiting);
                tokio::time::sleep(duration).await;
            }
        }
    }

    let processed_input = self.process_input(&input, &context)?;
    // Build the root runner BEFORE announcing the start: if construction
    // fails, no `WorkflowStarted` is left dangling without a terminal event.
    let do_runner = DoTaskRunner::new_from_workflow(&self.workflow)?;

    context.set_input(processed_input.clone());
    context.set_raw_input(&input);
    context.set_status(StatusPhase::Running);
    context.emit_event(WorkflowEvent::WorkflowStarted {
        instance_id: instance_id.clone(),
        input: processed_input.clone(),
    });

    let workflow_timeout = self.resolve_workflow_timeout(&processed_input, &context);
    let mut support = TaskSupport::new(&self.workflow, &mut context);

    let run_result = match workflow_timeout {
        Some(timeout_duration) => {
            match tokio::time::timeout(
                timeout_duration,
                do_runner.run(processed_input, &mut support),
            )
            .await
            {
                Ok(result) => result,
                Err(_) => {
                    // The task future has been dropped by `timeout`; mark the
                    // run as cancelled and failed before reporting the error.
                    support.context.cancel();
                    support.context.set_status(StatusPhase::Faulted);
                    support.context.emit_event(WorkflowEvent::WorkflowFailed {
                        instance_id,
                        error: "workflow timed out".to_string(),
                    });
                    return Err(WorkflowError::timeout(
                        format!("workflow timed out after {:?}", timeout_duration),
                        &self.workflow.document.name,
                    ));
                }
            }
        }
        None => do_runner.run(processed_input, &mut support).await,
    };

    let output = match run_result {
        Ok(output) => output,
        Err(e) => {
            support.context.set_status(StatusPhase::Faulted);
            support.context.emit_event(WorkflowEvent::WorkflowFailed {
                instance_id,
                error: e.to_string(),
            });
            if e.kind() == ErrorKind::Runtime {
                // Attach the failing task's reference to runtime errors.
                let reference = support.get_task_reference().unwrap_or("/");
                return Err(e.with_instance(reference));
            }
            return Err(e);
        }
    };

    support.context.clear_task_context();
    let processed_output = match support.process_task_output(
        self.workflow.output.as_ref(),
        &output,
        &self.workflow.document.name,
    ) {
        Ok(processed) => processed,
        Err(e) => {
            // An output-processing failure is a workflow failure too: report
            // it instead of leaving the run in `Running` with no final event.
            support.context.set_status(StatusPhase::Faulted);
            support.context.emit_event(WorkflowEvent::WorkflowFailed {
                instance_id,
                error: e.to_string(),
            });
            return Err(e);
        }
    };

    support.context.set_output(processed_output.clone());
    support.context.set_status(StatusPhase::Completed);
    support.context.emit_event(WorkflowEvent::WorkflowCompleted {
        instance_id,
        output: processed_output.clone(),
    });
    Ok(processed_output)
}
/// Returns a reference to the workflow definition this runner executes.
pub fn workflow(&self) -> &WorkflowDefinition {
&self.workflow
}
/// Creates a `WorkflowHandle` holding a clone of this runner's suspend
/// state, for suspending and resuming from outside the runner.
pub fn handle(&self) -> WorkflowHandle {
    let suspend_state = self.suspend_state.clone();
    WorkflowHandle { suspend_state }
}
/// Spawns this runner on the Tokio runtime and returns a handle to the task.
///
/// * **Periodic** — when the definition declares a non-zero `schedule.every`,
///   the workflow is run on every tick of a `tokio::time::interval` with that
///   period, each time with a clone of `input`. Cancellation is observed
///   between runs: an in-flight run finishes before the loop stops. The loop
///   also stops once the returned handle has been dropped.
/// * **One-shot** — otherwise the workflow runs exactly once.
///   `ScheduledWorkflow::cancel` aborts that run, including its
///   `schedule.after` delay. Dropping the handle without cancelling leaves
///   the run detached and lets it finish.
///
/// A zero `every` period is treated as one-shot, because
/// `tokio::time::interval` panics when given a zero period.
///
/// Results of individual runs are discarded.
///
/// # Panics
///
/// Panics when called outside a Tokio runtime, as `tokio::spawn` requires one.
pub fn schedule(self, input: Value) -> ScheduledWorkflow {
    let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);

    // Resolve the repeat period up front; a zero period cannot drive an
    // interval timer and falls back to a single run.
    let period = self
        .workflow
        .schedule
        .as_ref()
        .and_then(|schedule| schedule.every.as_ref())
        .map(|every_duration| crate::utils::duration_to_std(every_duration))
        .filter(|period| !period.is_zero());

    let join_handle = match period {
        Some(period) => tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(period);
            loop {
                tokio::select! {
                    _ = interval_timer.tick() => {
                        let _ = self.run(input.clone()).await;
                    }
                    // Fires on `cancel()` and also when the handle is dropped.
                    _ = cancel_rx.changed() => break,
                }
            }
        }),
        None => tokio::spawn(async move {
            let run = self.run(input);
            tokio::pin!(run);
            let cancelled = tokio::select! {
                _ = &mut run => return,
                // `Ok` means `cancel()` published a value; `Err` means the
                // handle was dropped without cancelling.
                changed = cancel_rx.changed() => changed.is_ok(),
            };
            if !cancelled {
                // Nobody can cancel any more: keep the run detached.
                let _ = run.await;
            }
        }),
    };

    ScheduledWorkflow {
        join_handle,
        cancel_tx,
    }
}
/// Resolves the workflow-level timeout, if one is declared.
///
/// Returns `None` when the definition has no `timeout`. Otherwise the
/// timeout definition is parsed against `input` and the context variables.
///
/// NOTE(review): a parse/evaluation error is discarded by `.ok()`, so an
/// unresolvable timeout makes the workflow run with no timeout at all.
/// Consider surfacing the error to the caller.
fn resolve_workflow_timeout(
    &self,
    input: &Value,
    context: &WorkflowContext,
) -> Option<Duration> {
    self.workflow.timeout.as_ref().and_then(|timeout_def| {
        let vars = context.get_vars();
        let parsed = crate::utils::parse_duration_with_context(
            timeout_def,
            input,
            &vars,
            &self.workflow.document.name,
            Some(&self.workflow),
        );
        parsed.ok()
    })
}
/// Validates and transforms the raw workflow input.
///
/// * No `input` section in the definition: the input is returned unchanged.
/// * `input.schema` present: the raw input is validated against it first.
/// * `input.from` present: the expression is evaluated against the raw input
///   and the context variables, and its result is returned; otherwise the
///   input is returned unchanged.
///
/// # Errors
///
/// Propagates schema-validation and expression-evaluation errors.
fn process_input(&self, input: &Value, context: &WorkflowContext) -> WorkflowResult<Value> {
    if let Some(input_def) = self.workflow.input.as_ref() {
        if let Some(schema) = input_def.schema.as_ref() {
            validate_schema(input, schema, "/")?;
        }
        let vars = context.get_vars();
        if let Some(from_val) = input_def.from.as_ref() {
            return evaluate_value_expr(from_val, input, &vars, "/");
        }
    }
    Ok(input.clone())
}
}
// Unit tests for this module live in the separate `runner_tests` module and
// are compiled only for test builds. The clippy allowances below apply to
// that test module only.
// NOTE(review): the reason these three lints are silenced is not recorded
// here — confirm and document it.
#[cfg(test)]
#[allow(clippy::needless_borrow, clippy::unnecessary_to_owned, clippy::ptr_arg)]
mod runner_tests;