1use crate::context::{SuspendState, WorkflowContext};
2use crate::error::{ErrorKind, WorkflowError, WorkflowResult};
3use crate::events::SharedEventBus;
4use crate::expression::evaluate_value_expr;
5use crate::handler::{CallHandler, CustomTaskHandler, HandlerRegistry, RunHandler};
6use crate::json_schema::validate_schema;
7use crate::listener::{WorkflowEvent, WorkflowExecutionListener};
8use crate::secret::SecretManager;
9use crate::status::StatusPhase;
10use crate::task_runner::{TaskRunner, TaskSupport};
11use crate::tasks::DoTaskRunner;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use swf_core::models::task::TaskDefinition;
17use swf_core::models::workflow::WorkflowDefinition;
18
/// Cloneable control handle for a [`WorkflowRunner`].
///
/// The handle only carries a [`SuspendState`]; `WorkflowRunner::handle` fills it with
/// a clone of the runner's own state, and every method on the handle forwards to it.
#[derive(Clone)]
pub struct WorkflowHandle {
    /// Suspend/resume state this handle forwards to.
    suspend_state: SuspendState,
}
27
/// Handle to a workflow that `WorkflowRunner::schedule` spawned onto the tokio runtime.
///
/// It bundles the spawned task's join handle with the sending half of the `watch`
/// channel used to signal cancellation.
pub struct ScheduledWorkflow {
    /// Join handle of the spawned background task.
    join_handle: tokio::task::JoinHandle<()>,
    /// Sending half of the cancellation channel; `cancel` publishes `true` on it.
    cancel_tx: tokio::sync::watch::Sender<bool>,
}
35
36impl ScheduledWorkflow {
37 pub fn cancel(&self) {
39 let _ = self.cancel_tx.send(true);
40 }
41
42 pub async fn join(self) {
44 let _ = self.join_handle.await;
45 }
46}
47
48impl WorkflowHandle {
49 pub fn suspend(&self) -> bool {
51 self.suspend_state.suspend()
52 }
53
54 pub fn resume(&self) -> bool {
56 self.suspend_state.resume()
57 }
58
59 pub fn is_suspended(&self) -> bool {
61 self.suspend_state.is_suspended()
62 }
63}
64
/// Executes a [`WorkflowDefinition`] and holds everything a run needs to be wired
/// into its `WorkflowContext`.
///
/// Configure it with the consuming `with_*` methods, then call `run` (or `schedule`).
pub struct WorkflowRunner {
    /// The workflow definition that `run` executes.
    workflow: WorkflowDefinition,
    /// Optional secret manager handed to the context of every run.
    secret_manager: Option<Arc<dyn SecretManager>>,
    /// Optional listener handed to the context of every run.
    listener: Option<Arc<dyn WorkflowExecutionListener>>,
    /// Optional event bus handed to the context of every run.
    event_bus: Option<SharedEventBus>,
    /// Sub-workflow definitions keyed by `"namespace/name/version"`.
    sub_workflows: HashMap<String, WorkflowDefinition>,
    /// Task definitions registered under a function name.
    functions: HashMap<String, TaskDefinition>,
    /// Call, run and custom-task handlers; a clone is handed to the context of every run.
    handler_registry: HandlerRegistry,
    /// Suspend/resume state; clones are handed to each run's context and to `WorkflowHandle`s.
    suspend_state: SuspendState,
}
78
79impl WorkflowRunner {
80 pub fn new(workflow: WorkflowDefinition) -> WorkflowResult<Self> {
82 Ok(Self {
83 workflow,
84 secret_manager: None,
85 listener: None,
86 event_bus: None,
87 sub_workflows: HashMap::new(),
88 functions: HashMap::new(),
89 handler_registry: HandlerRegistry::new(),
90 suspend_state: SuspendState::new(),
91 })
92 }
93
94 pub fn with_secret_manager(mut self, manager: Arc<dyn SecretManager>) -> Self {
96 self.secret_manager = Some(manager);
97 self
98 }
99
100 pub fn with_listener(mut self, listener: Arc<dyn WorkflowExecutionListener>) -> Self {
102 self.listener = Some(listener);
103 self
104 }
105
106 pub fn with_event_bus(mut self, bus: SharedEventBus) -> Self {
108 self.event_bus = Some(bus);
109 self
110 }
111
112 pub fn with_sub_workflow(mut self, workflow: WorkflowDefinition) -> Self {
115 let doc = &workflow.document;
116 let key = format!("{}/{}/{}", doc.namespace, doc.name, doc.version);
117 self.sub_workflows.insert(key, workflow);
118 self
119 }
120
121 pub fn with_call_handler(mut self, handler: Box<dyn CallHandler>) -> Self {
124 self.handler_registry.register_call_handler(handler);
125 self
126 }
127
128 pub fn with_run_handler(mut self, handler: Box<dyn RunHandler>) -> Self {
131 self.handler_registry.register_run_handler(handler);
132 self
133 }
134
135 pub fn with_function(mut self, name: &str, task: TaskDefinition) -> Self {
141 self.functions.insert(name.to_string(), task);
142 self
143 }
144
145 pub fn with_handler_registry(mut self, registry: HandlerRegistry) -> Self {
147 self.handler_registry = registry;
148 self
149 }
150
151 pub fn with_custom_task_handler(mut self, handler: Box<dyn CustomTaskHandler>) -> Self {
153 self.handler_registry.register_custom_task_handler(handler);
154 self
155 }
156
157 pub async fn run(&self, input: Value) -> WorkflowResult<Value> {
159 let mut context = WorkflowContext::new(&self.workflow)?;
160
161 if let Some(ref mgr) = self.secret_manager {
163 context.set_secret_manager(mgr.clone());
164 }
165
166 if let Some(ref listener) = self.listener {
168 context.set_listener(listener.clone());
169 }
170
171 if !self.sub_workflows.is_empty() {
173 context.set_sub_workflows(self.sub_workflows.clone());
174 }
175
176 if let Some(ref bus) = self.event_bus {
178 context.set_event_bus(bus.clone());
179 }
180
181 context.set_handler_registry(self.handler_registry.clone());
183
184 if !self.functions.is_empty() {
186 context.set_functions(self.functions.clone());
187 }
188
189 context.set_suspend_state(self.suspend_state.clone());
191
192 let instance_id = context.instance_id().to_string();
193
194 if let Some(ref schedule) = self.workflow.schedule {
196 if let Some(ref after_duration) = schedule.after {
197 let duration = crate::utils::duration_to_std(after_duration);
198 if !duration.is_zero() {
199 context.set_status(StatusPhase::Waiting);
200 tokio::time::sleep(duration).await;
201 }
202 }
203 }
204
205 let processed_input = self.process_input(&input, &context)?;
207
208 context.set_input(processed_input.clone());
209 context.set_raw_input(&input);
210 context.set_status(StatusPhase::Running);
211
212 context.emit_event(WorkflowEvent::WorkflowStarted {
213 instance_id: instance_id.clone(),
214 input: processed_input.clone(),
215 });
216
217 let do_runner = DoTaskRunner::new_from_workflow(&self.workflow)?;
219
220 let workflow_timeout = self.resolve_workflow_timeout(&processed_input, &context);
222
223 let mut support = TaskSupport::new(&self.workflow, &mut context);
224
225 let run_result = if let Some(timeout_duration) = workflow_timeout {
226 match tokio::time::timeout(
227 timeout_duration,
228 do_runner.run(processed_input, &mut support),
229 )
230 .await
231 {
232 Ok(result) => result,
233 Err(_) => {
234 support.context.cancel();
236 support.context.set_status(StatusPhase::Faulted);
237 support.context.emit_event(WorkflowEvent::WorkflowFailed {
238 instance_id: instance_id.clone(),
239 error: "workflow timed out".to_string(),
240 });
241 return Err(WorkflowError::timeout(
242 format!("workflow timed out after {:?}", timeout_duration),
243 &self.workflow.document.name,
244 ));
245 }
246 }
247 } else {
248 do_runner.run(processed_input, &mut support).await
249 };
250
251 let output = match run_result {
252 Ok(output) => output,
253 Err(e) => {
254 support.context.set_status(StatusPhase::Faulted);
255 support.context.emit_event(WorkflowEvent::WorkflowFailed {
256 instance_id: instance_id.clone(),
257 error: format!("{}", e),
258 });
259 if e.kind() == ErrorKind::Runtime {
261 let reference = support.get_task_reference().unwrap_or("/");
262 return Err(e.with_instance(reference));
263 }
264 return Err(e);
265 }
266 };
267
268 support.context.clear_task_context();
269
270 let processed_output = support.process_task_output(
272 self.workflow.output.as_ref(),
273 &output,
274 &self.workflow.document.name,
275 )?;
276
277 support.context.set_output(processed_output.clone());
278 support.context.set_status(StatusPhase::Completed);
279
280 support
281 .context
282 .emit_event(WorkflowEvent::WorkflowCompleted {
283 instance_id: instance_id.clone(),
284 output: processed_output.clone(),
285 });
286
287 Ok(processed_output)
288 }
289
290 pub fn workflow(&self) -> &WorkflowDefinition {
292 &self.workflow
293 }
294
295 pub fn handle(&self) -> WorkflowHandle {
300 WorkflowHandle {
301 suspend_state: self.suspend_state.clone(),
302 }
303 }
304
305 pub fn schedule(self, input: Value) -> ScheduledWorkflow {
314 if let Some(ref schedule) = self.workflow.schedule {
315 if let Some(ref every_duration) = schedule.every {
316 let interval = crate::utils::duration_to_std(every_duration);
317 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
318 let join_handle = tokio::spawn(async move {
319 let mut interval_timer = tokio::time::interval(interval);
320 loop {
321 tokio::select! {
322 _ = interval_timer.tick() => {
323 let _ = self.run(input.clone()).await;
324 }
325 _ = cancel_rx.changed() => {
326 break;
327 }
328 }
329 }
330 });
331 return ScheduledWorkflow {
332 join_handle,
333 cancel_tx,
334 };
335 }
336 }
338
339 let (cancel_tx, _) = tokio::sync::watch::channel(false);
341 let join_handle = tokio::spawn(async move {
342 let _ = self.run(input).await;
343 });
344 ScheduledWorkflow {
345 join_handle,
346 cancel_tx,
347 }
348 }
349
350 fn resolve_workflow_timeout(
352 &self,
353 input: &Value,
354 context: &WorkflowContext,
355 ) -> Option<Duration> {
356 let timeout_def = self.workflow.timeout.as_ref()?;
357 let vars = context.get_vars();
358 crate::utils::parse_duration_with_context(
359 timeout_def,
360 input,
361 &vars,
362 &self.workflow.document.name,
363 Some(&self.workflow),
364 )
365 .ok()
366 }
367
368 fn process_input(&self, input: &Value, context: &WorkflowContext) -> WorkflowResult<Value> {
370 let input_def = match &self.workflow.input {
371 Some(def) => def,
372 None => return Ok(input.clone()),
373 };
374
375 if let Some(ref schema) = input_def.schema {
377 validate_schema(input, schema, "/")?;
378 }
379
380 let vars = context.get_vars();
382 match input_def.from {
383 Some(ref from_val) => evaluate_value_expr(from_val, input, &vars, "/"),
384 None => Ok(input.clone()),
385 }
386 }
387}
388
// Unit tests live in the separate `runner_tests` module, which is compiled only for
// test builds; the clippy allowances below apply to that module alone.
#[cfg(test)]
#[allow(clippy::needless_borrow, clippy::unnecessary_to_owned, clippy::ptr_arg)]
mod runner_tests;