1use crate::context::{SuspendState, WorkflowContext};
2use crate::error::{ErrorKind, WorkflowError, WorkflowResult};
3use crate::events::SharedEventBus;
4use crate::expression::{evaluate_value_expr, ExpressionEngine, ExpressionEngineRegistry};
5use crate::handler::{CallHandler, CustomTaskHandler, HandlerRegistry, RunHandler};
6use crate::json_schema::validate_schema;
7use crate::listener::{WorkflowEvent, WorkflowExecutionListener};
8use crate::secret::SecretManager;
9use crate::status::StatusPhase;
10use crate::task_runner::{TaskRunner, TaskSupport};
11use crate::tasks::DoTaskRunner;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::Duration;
17use swf_core::models::task::TaskDefinition;
18use swf_core::models::workflow::WorkflowDefinition;
19
20#[derive(Clone)]
25pub struct WorkflowHandle {
26 suspend_state: SuspendState,
27}
28
29pub struct ScheduledWorkflow {
33 join_handle: tokio::task::JoinHandle<()>,
34 cancel_tx: tokio::sync::watch::Sender<bool>,
35}
36
37impl ScheduledWorkflow {
38 pub fn cancel(&self) {
40 let _ = self.cancel_tx.send(true);
41 }
42
43 pub async fn join(self) {
45 let _ = self.join_handle.await;
46 }
47}
48
49impl WorkflowHandle {
50 pub fn suspend(&self) -> bool {
52 self.suspend_state.suspend()
53 }
54
55 pub fn resume(&self) -> bool {
57 self.suspend_state.resume()
58 }
59
60 pub fn is_suspended(&self) -> bool {
62 self.suspend_state.is_suspended()
63 }
64}
65
66pub struct WorkflowRunner {
68 workflow: WorkflowDefinition,
69 secret_manager: Option<Arc<dyn SecretManager>>,
70 listener: Option<Arc<dyn WorkflowExecutionListener>>,
71 event_bus: Option<SharedEventBus>,
72 sub_workflows: HashMap<String, WorkflowDefinition>,
73 functions: HashMap<String, TaskDefinition>,
74 handler_registry: HandlerRegistry,
75 expression_engines: ExpressionEngineRegistry,
76 custom_vars: HashMap<String, Value>,
77 suspend_state: SuspendState,
78}
79
80impl WorkflowRunner {
81 pub fn new(workflow: WorkflowDefinition) -> WorkflowResult<Self> {
83 Ok(Self {
84 workflow,
85 secret_manager: None,
86 listener: None,
87 event_bus: None,
88 sub_workflows: HashMap::new(),
89 functions: HashMap::new(),
90 handler_registry: HandlerRegistry::new(),
91 expression_engines: ExpressionEngineRegistry::new(),
92 custom_vars: HashMap::new(),
93 suspend_state: SuspendState::new(),
94 })
95 }
96
97 pub fn with_secret_manager(mut self, manager: Arc<dyn SecretManager>) -> Self {
99 self.secret_manager = Some(manager);
100 self
101 }
102
103 pub fn with_listener(mut self, listener: Arc<dyn WorkflowExecutionListener>) -> Self {
105 self.listener = Some(listener);
106 self
107 }
108
109 pub fn with_event_bus(mut self, bus: SharedEventBus) -> Self {
111 self.event_bus = Some(bus);
112 self
113 }
114
115 pub fn with_sub_workflow(mut self, workflow: WorkflowDefinition) -> Self {
118 let doc = &workflow.document;
119 let key = format!("{}/{}/{}", doc.namespace, doc.name, doc.version);
120 self.sub_workflows.insert(key, workflow);
121 self
122 }
123
124 pub fn with_call_handler(mut self, handler: Box<dyn CallHandler>) -> Self {
127 self.handler_registry.register_call_handler(handler);
128 self
129 }
130
131 pub fn with_run_handler(mut self, handler: Box<dyn RunHandler>) -> Self {
134 self.handler_registry.register_run_handler(handler);
135 self
136 }
137
138 pub fn with_function(mut self, name: &str, task: TaskDefinition) -> Self {
144 self.functions.insert(name.to_string(), task);
145 self
146 }
147
148 pub fn with_handler_registry(mut self, registry: HandlerRegistry) -> Self {
150 self.handler_registry = registry;
151 self
152 }
153
154 pub fn with_custom_task_handler(mut self, handler: Box<dyn CustomTaskHandler>) -> Self {
156 self.handler_registry.register_custom_task_handler(handler);
157 self
158 }
159
160 pub fn with_expression_engine(mut self, engine: Arc<dyn ExpressionEngine>) -> Self {
165 self.expression_engines.register(engine);
166 self
167 }
168
169 pub fn with_expression_engine_registry(mut self, registry: ExpressionEngineRegistry) -> Self {
172 self.expression_engines = registry;
173 self
174 }
175
176 pub fn with_variable(mut self, name: impl Into<String>, value: Value) -> Self {
194 self.custom_vars.insert(name.into(), value);
197 self
198 }
199
200 pub async fn run(&self, input: Value) -> WorkflowResult<Value> {
202 let span = tracing::info_span!(
203 "workflow",
204 name = %self.workflow.document.name,
205 version = %self.workflow.document.version,
206 );
207 let _enter = span.enter();
208
209 let mut context = WorkflowContext::new(&self.workflow)?;
210
211 if let Some(ref mgr) = self.secret_manager {
213 context.set_secret_manager(mgr.clone());
214 }
215
216 if let Some(ref listener) = self.listener {
218 context.set_listener(listener.clone());
219 }
220
221 if !self.sub_workflows.is_empty() {
223 context.set_sub_workflows(self.sub_workflows.clone());
224 }
225
226 if let Some(ref bus) = self.event_bus {
228 context.set_event_bus(bus.clone());
229 }
230
231 context.set_handler_registry(self.handler_registry.clone());
233
234 context.set_expression_engines(self.expression_engines.clone());
236
237 if !self.custom_vars.is_empty() {
239 context.add_local_expr_vars(self.custom_vars.clone());
240 }
241
242 if !self.functions.is_empty() {
244 context.set_functions(self.functions.clone());
245 }
246
247 context.set_suspend_state(self.suspend_state.clone());
249
250 let instance_id = context.instance_id().to_string();
251 tracing::info!(instance_id = %instance_id, "workflow started");
252
253 if let Some(ref schedule) = self.workflow.schedule {
255 if let Some(ref after_duration) = schedule.after {
256 let duration = crate::utils::duration_to_std(after_duration);
257 if !duration.is_zero() {
258 context.set_status(StatusPhase::Waiting);
259 tokio::time::sleep(duration).await;
260 }
261 }
262 }
263
264 let processed_input = self.process_input(&input, &context)?;
266
267 context.set_input(processed_input.clone());
268 context.set_raw_input(&input);
269 context.set_status(StatusPhase::Running);
270
271 context.emit_event(WorkflowEvent::WorkflowStarted {
272 instance_id: instance_id.clone(),
273 input: processed_input.clone(),
274 });
275
276 let do_runner = DoTaskRunner::new_from_workflow(&self.workflow)?;
278
279 let workflow_timeout = self.resolve_workflow_timeout(&processed_input, &context);
281
282 let mut support = TaskSupport::new(&self.workflow, &mut context);
283
284 let run_result = if let Some(timeout_duration) = workflow_timeout {
285 match tokio::time::timeout(
286 timeout_duration,
287 do_runner.run(processed_input, &mut support),
288 )
289 .await
290 {
291 Ok(result) => result,
292 Err(_) => {
293 support.context.cancel();
295 support.context.set_status(StatusPhase::Faulted);
296 support.context.emit_event(WorkflowEvent::WorkflowFailed {
297 instance_id: instance_id.clone(),
298 error: "workflow timed out".to_string(),
299 });
300 return Err(WorkflowError::timeout(
301 format!("workflow timed out after {:?}", timeout_duration),
302 &self.workflow.document.name,
303 ));
304 }
305 }
306 } else {
307 do_runner.run(processed_input, &mut support).await
308 };
309
310 let output = match run_result {
311 Ok(output) => output,
312 Err(e) => {
313 support.context.set_status(StatusPhase::Faulted);
314 support.context.emit_event(WorkflowEvent::WorkflowFailed {
315 instance_id: instance_id.clone(),
316 error: format!("{}", e),
317 });
318 tracing::error!(instance_id = %instance_id, error = %e, "workflow failed");
319 if e.kind() == ErrorKind::Runtime {
321 let reference = support.get_task_reference().unwrap_or("/");
322 return Err(e.with_instance(reference));
323 }
324 return Err(e);
325 }
326 };
327
328 support.context.clear_task_context();
329
330 let processed_output = support.process_task_output(
332 self.workflow.output.as_ref(),
333 &output,
334 &self.workflow.document.name,
335 )?;
336
337 support.context.set_output(processed_output.clone());
338 support.context.set_status(StatusPhase::Completed);
339
340 support
341 .context
342 .emit_event(WorkflowEvent::WorkflowCompleted {
343 instance_id: instance_id.clone(),
344 output: processed_output.clone(),
345 });
346
347 tracing::info!(instance_id = %instance_id, "workflow completed");
348
349 Ok(processed_output)
350 }
351
352 pub fn workflow(&self) -> &WorkflowDefinition {
354 &self.workflow
355 }
356
357 pub fn handle(&self) -> WorkflowHandle {
362 WorkflowHandle {
363 suspend_state: self.suspend_state.clone(),
364 }
365 }
366
367 pub fn schedule(self, input: Value) -> ScheduledWorkflow {
376 if let Some(ref schedule) = self.workflow.schedule {
377 if let Some(ref every_duration) = schedule.every {
378 let interval = crate::utils::duration_to_std(every_duration);
379 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
380 let join_handle = tokio::spawn(async move {
381 let mut interval_timer = tokio::time::interval(interval);
382 loop {
383 tokio::select! {
384 _ = interval_timer.tick() => {
385 let _ = self.run(input.clone()).await;
386 }
387 _ = cancel_rx.changed() => {
388 break;
389 }
390 }
391 }
392 });
393 return ScheduledWorkflow {
394 join_handle,
395 cancel_tx,
396 };
397 }
398 if let Some(ref cron_expr) = schedule.cron {
399 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
400 let cron_expr = cron_expr.clone();
401 let join_handle = tokio::spawn(async move {
402 let schedule = match cron::Schedule::from_str(&cron_expr) {
404 Ok(s) => s,
405 Err(e) => {
406 eprintln!("invalid cron expression '{}': {}", cron_expr, e);
407 return;
408 }
409 };
410 loop {
411 let next = schedule.upcoming(chrono::Utc).next();
412 let next = match next {
413 Some(t) => t,
414 None => break,
415 };
416 let delay = next - chrono::Utc::now();
417 let delay_std = delay.to_std().unwrap_or(Duration::ZERO);
418 if delay_std.is_zero() {
419 let _ = self.run(input.clone()).await;
420 continue;
421 }
422 tokio::select! {
423 _ = tokio::time::sleep(delay_std) => {
424 let _ = self.run(input.clone()).await;
425 }
426 _ = cancel_rx.changed() => {
427 break;
428 }
429 }
430 }
431 });
432 return ScheduledWorkflow {
433 join_handle,
434 cancel_tx,
435 };
436 }
437 }
439
440 let (cancel_tx, _) = tokio::sync::watch::channel(false);
442 let join_handle = tokio::spawn(async move {
443 let _ = self.run(input).await;
444 });
445 ScheduledWorkflow {
446 join_handle,
447 cancel_tx,
448 }
449 }
450
451 fn resolve_workflow_timeout(
453 &self,
454 input: &Value,
455 context: &WorkflowContext,
456 ) -> Option<Duration> {
457 let timeout_def = self.workflow.timeout.as_ref()?;
458 let vars = context.get_vars();
459 crate::utils::parse_duration_with_context(
460 timeout_def,
461 input,
462 &vars,
463 &self.workflow.document.name,
464 Some(&self.workflow),
465 )
466 .ok()
467 }
468
469 fn process_input(&self, input: &Value, context: &WorkflowContext) -> WorkflowResult<Value> {
471 let input_def = match &self.workflow.input {
472 Some(def) => def,
473 None => return Ok(input.clone()),
474 };
475
476 if let Some(ref schema) = input_def.schema {
478 validate_schema(input, schema, "/")?;
479 }
480
481 let vars = context.get_vars();
483 match input_def.from {
484 Some(ref from_val) => evaluate_value_expr(from_val, input, &vars, "/"),
485 None => Ok(input.clone()),
486 }
487 }
488}
489
490#[cfg(test)]
491#[allow(clippy::needless_borrow, clippy::unnecessary_to_owned, clippy::ptr_arg)]
492mod runner_tests;