Skip to main content

swf_runtime/runner/
mod.rs

1use crate::context::{SuspendState, WorkflowContext};
2use crate::error::{ErrorKind, WorkflowError, WorkflowResult};
3use crate::events::SharedEventBus;
4use crate::expression::evaluate_value_expr;
5use crate::handler::{CallHandler, CustomTaskHandler, HandlerRegistry, RunHandler};
6use crate::json_schema::validate_schema;
7use crate::listener::{WorkflowEvent, WorkflowExecutionListener};
8use crate::secret::SecretManager;
9use crate::status::StatusPhase;
10use crate::task_runner::{TaskRunner, TaskSupport};
11use crate::tasks::DoTaskRunner;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use swf_core::models::task::TaskDefinition;
17use swf_core::models::workflow::WorkflowDefinition;
18
19/// A handle to control a running workflow from another task/thread
20///
21/// Allows suspending and resuming a workflow execution externally.
22/// Obtain via `WorkflowRunner::handle()` before calling `run()`.
23#[derive(Clone)]
24pub struct WorkflowHandle {
25    suspend_state: SuspendState,
26}
27
28/// A handle to a scheduled (recurring) workflow execution
29///
30/// Can be cancelled to stop the recurring schedule.
31pub struct ScheduledWorkflow {
32    join_handle: tokio::task::JoinHandle<()>,
33    cancel_tx: tokio::sync::watch::Sender<bool>,
34}
35
36impl ScheduledWorkflow {
37    /// Cancels the scheduled workflow execution
38    pub fn cancel(&self) {
39        let _ = self.cancel_tx.send(true);
40    }
41
42    /// Waits for the scheduled workflow to complete (after cancellation)
43    pub async fn join(self) {
44        let _ = self.join_handle.await;
45    }
46}
47
48impl WorkflowHandle {
49    /// Suspends the workflow. Returns true if suspended, false if already suspended.
50    pub fn suspend(&self) -> bool {
51        self.suspend_state.suspend()
52    }
53
54    /// Resumes a suspended workflow. Returns true if resumed, false if not suspended.
55    pub fn resume(&self) -> bool {
56        self.suspend_state.resume()
57    }
58
59    /// Checks if the workflow is currently suspended
60    pub fn is_suspended(&self) -> bool {
61        self.suspend_state.is_suspended()
62    }
63}
64
65/// The main workflow runner that executes workflow definitions
66pub struct WorkflowRunner {
67    workflow: WorkflowDefinition,
68    secret_manager: Option<Arc<dyn SecretManager>>,
69    listener: Option<Arc<dyn WorkflowExecutionListener>>,
70    event_bus: Option<SharedEventBus>,
71    sub_workflows: HashMap<String, WorkflowDefinition>,
72    /// External function definitions registered via with_function()
73    functions: HashMap<String, TaskDefinition>,
74    handler_registry: HandlerRegistry,
75    /// Shared suspend/resume state (cloned into WorkflowContext during run())
76    suspend_state: SuspendState,
77}
78
79impl WorkflowRunner {
80    /// Creates a new WorkflowRunner for the given workflow definition
81    pub fn new(workflow: WorkflowDefinition) -> WorkflowResult<Self> {
82        Ok(Self {
83            workflow,
84            secret_manager: None,
85            listener: None,
86            event_bus: None,
87            sub_workflows: HashMap::new(),
88            functions: HashMap::new(),
89            handler_registry: HandlerRegistry::new(),
90            suspend_state: SuspendState::new(),
91        })
92    }
93
94    /// Sets the secret manager for $secret expression variable
95    pub fn with_secret_manager(mut self, manager: Arc<dyn SecretManager>) -> Self {
96        self.secret_manager = Some(manager);
97        self
98    }
99
100    /// Sets the execution listener for workflow/task events
101    pub fn with_listener(mut self, listener: Arc<dyn WorkflowExecutionListener>) -> Self {
102        self.listener = Some(listener);
103        self
104    }
105
106    /// Sets the event bus for emit/listen tasks
107    pub fn with_event_bus(mut self, bus: SharedEventBus) -> Self {
108        self.event_bus = Some(bus);
109        self
110    }
111
112    /// Registers a sub-workflow that can be invoked via `run: workflow`
113    /// Keyed by "namespace/name/version"
114    pub fn with_sub_workflow(mut self, workflow: WorkflowDefinition) -> Self {
115        let doc = &workflow.document;
116        let key = format!("{}/{}/{}", doc.namespace, doc.name, doc.version);
117        self.sub_workflows.insert(key, workflow);
118        self
119    }
120
121    /// Registers a custom call handler for a specific call type
122    /// (e.g., "grpc", "openapi", "asyncapi", "a2a")
123    pub fn with_call_handler(mut self, handler: Box<dyn CallHandler>) -> Self {
124        self.handler_registry.register_call_handler(handler);
125        self
126    }
127
128    /// Registers a custom run handler for a specific run type
129    /// (e.g., "container", "script")
130    pub fn with_run_handler(mut self, handler: Box<dyn RunHandler>) -> Self {
131        self.handler_registry.register_run_handler(handler);
132        self
133    }
134
135    /// Registers a named function definition for call.function resolution
136    ///
137    /// This allows registering external function definitions that can be
138    /// referenced by `call: <functionName>` in workflows, similar to
139    /// Java SDK's cataloged function mechanism.
140    pub fn with_function(mut self, name: &str, task: TaskDefinition) -> Self {
141        self.functions.insert(name.to_string(), task);
142        self
143    }
144
145    /// Sets the entire handler registry (used for propagating handlers to child runners)
146    pub fn with_handler_registry(mut self, registry: HandlerRegistry) -> Self {
147        self.handler_registry = registry;
148        self
149    }
150
151    /// Registers a custom task handler for a specific custom task type
152    pub fn with_custom_task_handler(mut self, handler: Box<dyn CustomTaskHandler>) -> Self {
153        self.handler_registry.register_custom_task_handler(handler);
154        self
155    }
156
157    /// Runs the workflow with the given input and returns the output
158    pub async fn run(&self, input: Value) -> WorkflowResult<Value> {
159        let mut context = WorkflowContext::new(&self.workflow)?;
160
161        // Set secret manager if configured
162        if let Some(ref mgr) = self.secret_manager {
163            context.set_secret_manager(mgr.clone());
164        }
165
166        // Set listener if configured
167        if let Some(ref listener) = self.listener {
168            context.set_listener(listener.clone());
169        }
170
171        // Set sub-workflow registry
172        if !self.sub_workflows.is_empty() {
173            context.set_sub_workflows(self.sub_workflows.clone());
174        }
175
176        // Set event bus if configured
177        if let Some(ref bus) = self.event_bus {
178            context.set_event_bus(bus.clone());
179        }
180
181        // Set handler registry
182        context.set_handler_registry(self.handler_registry.clone());
183
184        // Set registered function definitions
185        if !self.functions.is_empty() {
186            context.set_functions(self.functions.clone());
187        }
188
189        // Share suspend/resume state with context
190        context.set_suspend_state(self.suspend_state.clone());
191
192        let instance_id = context.instance_id().to_string();
193
194        // Handle schedule:after — delay before starting
195        if let Some(ref schedule) = self.workflow.schedule {
196            if let Some(ref after_duration) = schedule.after {
197                let duration = crate::utils::duration_to_std(after_duration);
198                if !duration.is_zero() {
199                    context.set_status(StatusPhase::Waiting);
200                    tokio::time::sleep(duration).await;
201                }
202            }
203        }
204
205        // Process input
206        let processed_input = self.process_input(&input, &context)?;
207
208        context.set_input(processed_input.clone());
209        context.set_raw_input(&input);
210        context.set_status(StatusPhase::Running);
211
212        context.emit_event(WorkflowEvent::WorkflowStarted {
213            instance_id: instance_id.clone(),
214            input: processed_input.clone(),
215        });
216
217        // Run the top-level do tasks (with optional workflow timeout)
218        let do_runner = DoTaskRunner::new_from_workflow(&self.workflow)?;
219
220        // Resolve workflow timeout
221        let workflow_timeout = self.resolve_workflow_timeout(&processed_input, &context);
222
223        let mut support = TaskSupport::new(&self.workflow, &mut context);
224
225        let run_result = if let Some(timeout_duration) = workflow_timeout {
226            match tokio::time::timeout(
227                timeout_duration,
228                do_runner.run(processed_input, &mut support),
229            )
230            .await
231            {
232                Ok(result) => result,
233                Err(_) => {
234                    // Cancel the context so any running wait points respond immediately
235                    support.context.cancel();
236                    support.context.set_status(StatusPhase::Faulted);
237                    support.context.emit_event(WorkflowEvent::WorkflowFailed {
238                        instance_id: instance_id.clone(),
239                        error: "workflow timed out".to_string(),
240                    });
241                    return Err(WorkflowError::timeout(
242                        format!("workflow timed out after {:?}", timeout_duration),
243                        &self.workflow.document.name,
244                    ));
245                }
246            }
247        } else {
248            do_runner.run(processed_input, &mut support).await
249        };
250
251        let output = match run_result {
252            Ok(output) => output,
253            Err(e) => {
254                support.context.set_status(StatusPhase::Faulted);
255                support.context.emit_event(WorkflowEvent::WorkflowFailed {
256                    instance_id: instance_id.clone(),
257                    error: format!("{}", e),
258                });
259                // Only set instance on Runtime errors, preserve error type for others
260                if e.kind() == ErrorKind::Runtime {
261                    let reference = support.get_task_reference().unwrap_or("/");
262                    return Err(e.with_instance(reference));
263                }
264                return Err(e);
265            }
266        };
267
268        support.context.clear_task_context();
269
270        // Process output using TaskSupport (reuses shared output processing logic)
271        let processed_output = support.process_task_output(
272            self.workflow.output.as_ref(),
273            &output,
274            &self.workflow.document.name,
275        )?;
276
277        support.context.set_output(processed_output.clone());
278        support.context.set_status(StatusPhase::Completed);
279
280        support
281            .context
282            .emit_event(WorkflowEvent::WorkflowCompleted {
283                instance_id: instance_id.clone(),
284                output: processed_output.clone(),
285            });
286
287        Ok(processed_output)
288    }
289
290    /// Returns a reference to the workflow definition
291    pub fn workflow(&self) -> &WorkflowDefinition {
292        &self.workflow
293    }
294
295    /// Returns a WorkflowHandle that can suspend/resume the running workflow
296    ///
297    /// Must be called before `run()`. The handle shares suspend/resume state
298    /// with the workflow context via Arc.
299    pub fn handle(&self) -> WorkflowHandle {
300        WorkflowHandle {
301            suspend_state: self.suspend_state.clone(),
302        }
303    }
304
305    /// Runs the workflow on a recurring schedule based on the workflow's
306    /// `schedule.every` or `schedule.cron` definition.
307    ///
308    /// For `every`: runs the workflow at fixed intervals.
309    /// For `cron`: currently not supported (requires cron parsing library).
310    ///
311    /// Returns a `ScheduledWorkflow` that can be cancelled to stop the schedule.
312    /// If no schedule is defined, runs once and returns a completed handle.
313    pub fn schedule(self, input: Value) -> ScheduledWorkflow {
314        if let Some(ref schedule) = self.workflow.schedule {
315            if let Some(ref every_duration) = schedule.every {
316                let interval = crate::utils::duration_to_std(every_duration);
317                let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
318                let join_handle = tokio::spawn(async move {
319                    let mut interval_timer = tokio::time::interval(interval);
320                    loop {
321                        tokio::select! {
322                            _ = interval_timer.tick() => {
323                                let _ = self.run(input.clone()).await;
324                            }
325                            _ = cancel_rx.changed() => {
326                                break;
327                            }
328                        }
329                    }
330                });
331                return ScheduledWorkflow {
332                    join_handle,
333                    cancel_tx,
334                };
335            }
336            // Cron/after/on: run once (cron scheduling not yet supported)
337        }
338
339        // No schedule or non-recurring: run once
340        let (cancel_tx, _) = tokio::sync::watch::channel(false);
341        let join_handle = tokio::spawn(async move {
342            let _ = self.run(input).await;
343        });
344        ScheduledWorkflow {
345            join_handle,
346            cancel_tx,
347        }
348    }
349
350    /// Resolves the workflow-level timeout duration, if configured
351    fn resolve_workflow_timeout(
352        &self,
353        input: &Value,
354        context: &WorkflowContext,
355    ) -> Option<Duration> {
356        let timeout_def = self.workflow.timeout.as_ref()?;
357        let vars = context.get_vars();
358        crate::utils::parse_duration_with_context(
359            timeout_def,
360            input,
361            &vars,
362            &self.workflow.document.name,
363            Some(&self.workflow),
364        )
365        .ok()
366    }
367
368    /// Processes workflow input: schema validation and expression transformation
369    fn process_input(&self, input: &Value, context: &WorkflowContext) -> WorkflowResult<Value> {
370        let input_def = match &self.workflow.input {
371            Some(def) => def,
372            None => return Ok(input.clone()),
373        };
374
375        // Validate input schema
376        if let Some(ref schema) = input_def.schema {
377            validate_schema(input, schema, "/")?;
378        }
379
380        // Transform input via from expression
381        let vars = context.get_vars();
382        match input_def.from {
383            Some(ref from_val) => evaluate_value_expr(from_val, input, &vars, "/"),
384            None => Ok(input.clone()),
385        }
386    }
387}
388
389#[cfg(test)]
390#[allow(clippy::needless_borrow, clippy::unnecessary_to_owned, clippy::ptr_arg)]
391mod runner_tests;