Skip to main content

swf_runtime/runner/
mod.rs

1use crate::context::{SuspendState, WorkflowContext};
2use crate::error::{ErrorKind, WorkflowError, WorkflowResult};
3use crate::events::SharedEventBus;
4use crate::expression::{evaluate_value_expr, ExpressionEngine, ExpressionEngineRegistry};
5use crate::handler::{CallHandler, CustomTaskHandler, HandlerRegistry, RunHandler};
6use crate::json_schema::validate_schema;
7use crate::listener::{WorkflowEvent, WorkflowExecutionListener};
8use crate::secret::SecretManager;
9use crate::status::StatusPhase;
10use crate::task_runner::{TaskRunner, TaskSupport};
11use crate::tasks::DoTaskRunner;
12use serde_json::Value;
13use std::collections::HashMap;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::Duration;
17use swf_core::models::task::TaskDefinition;
18use swf_core::models::workflow::WorkflowDefinition;
19
/// A handle to control a running workflow from another task/thread
///
/// Allows suspending and resuming a workflow execution externally.
/// Obtain via `WorkflowRunner::handle()` before calling `run()`.
///
/// The handle is `Clone`; each copy carries a clone of the same
/// `SuspendState` value it was created from.
#[derive(Clone)]
pub struct WorkflowHandle {
    // Clone of the runner's suspend state; `run()` hands another clone of that
    // state to the workflow context.
    // NOTE(review): clones are presumed to share one underlying flag — confirm in `SuspendState`.
    suspend_state: SuspendState,
}
28
29/// A handle to a scheduled (recurring) workflow execution
30///
31/// Can be cancelled to stop the recurring schedule.
32pub struct ScheduledWorkflow {
33    join_handle: tokio::task::JoinHandle<()>,
34    cancel_tx: tokio::sync::watch::Sender<bool>,
35}
36
37impl ScheduledWorkflow {
38    /// Cancels the scheduled workflow execution
39    pub fn cancel(&self) {
40        let _ = self.cancel_tx.send(true);
41    }
42
43    /// Waits for the scheduled workflow to complete (after cancellation)
44    pub async fn join(self) {
45        let _ = self.join_handle.await;
46    }
47}
48
49impl WorkflowHandle {
50    /// Suspends the workflow. Returns true if suspended, false if already suspended.
51    pub fn suspend(&self) -> bool {
52        self.suspend_state.suspend()
53    }
54
55    /// Resumes a suspended workflow. Returns true if resumed, false if not suspended.
56    pub fn resume(&self) -> bool {
57        self.suspend_state.resume()
58    }
59
60    /// Checks if the workflow is currently suspended
61    pub fn is_suspended(&self) -> bool {
62        self.suspend_state.is_suspended()
63    }
64}
65
/// The main workflow runner that executes workflow definitions
///
/// Configured through the consuming `with_*` builder methods and executed with
/// `run()` (single execution) or `schedule()` (background execution).
pub struct WorkflowRunner {
    // Definition executed by `run()`.
    workflow: WorkflowDefinition,
    // Optional secret manager; handed to the workflow context when set.
    secret_manager: Option<Arc<dyn SecretManager>>,
    // Optional execution listener; handed to the workflow context when set.
    listener: Option<Arc<dyn WorkflowExecutionListener>>,
    // Optional event bus; handed to the workflow context when set.
    event_bus: Option<SharedEventBus>,
    // Registered sub-workflows, keyed by "namespace/name/version".
    sub_workflows: HashMap<String, WorkflowDefinition>,
    // Named function definitions registered via `with_function`, keyed by name.
    functions: HashMap<String, TaskDefinition>,
    // Call / run / custom-task handlers; cloned into the context on every run.
    handler_registry: HandlerRegistry,
    // Registered expression engines; cloned into the context on every run.
    expression_engines: ExpressionEngineRegistry,
    // Custom expression variables added via `with_variable`.
    custom_vars: HashMap<String, Value>,
    // Suspend/resume state; clones are given to `WorkflowHandle`s and to the context.
    suspend_state: SuspendState,
}
79
80impl WorkflowRunner {
81    /// Creates a new WorkflowRunner for the given workflow definition
82    pub fn new(workflow: WorkflowDefinition) -> WorkflowResult<Self> {
83        Ok(Self {
84            workflow,
85            secret_manager: None,
86            listener: None,
87            event_bus: None,
88            sub_workflows: HashMap::new(),
89            functions: HashMap::new(),
90            handler_registry: HandlerRegistry::new(),
91            expression_engines: ExpressionEngineRegistry::new(),
92            custom_vars: HashMap::new(),
93            suspend_state: SuspendState::new(),
94        })
95    }
96
97    /// Sets the secret manager for $secret expression variable
98    pub fn with_secret_manager(mut self, manager: Arc<dyn SecretManager>) -> Self {
99        self.secret_manager = Some(manager);
100        self
101    }
102
103    /// Sets the execution listener for workflow/task events
104    pub fn with_listener(mut self, listener: Arc<dyn WorkflowExecutionListener>) -> Self {
105        self.listener = Some(listener);
106        self
107    }
108
109    /// Sets the event bus for emit/listen tasks
110    pub fn with_event_bus(mut self, bus: SharedEventBus) -> Self {
111        self.event_bus = Some(bus);
112        self
113    }
114
115    /// Registers a sub-workflow that can be invoked via `run: workflow`
116    /// Keyed by "namespace/name/version"
117    pub fn with_sub_workflow(mut self, workflow: WorkflowDefinition) -> Self {
118        let doc = &workflow.document;
119        let key = format!("{}/{}/{}", doc.namespace, doc.name, doc.version);
120        self.sub_workflows.insert(key, workflow);
121        self
122    }
123
124    /// Registers a custom call handler for a specific call type
125    /// (e.g., "grpc", "openapi", "asyncapi", "a2a")
126    pub fn with_call_handler(mut self, handler: Box<dyn CallHandler>) -> Self {
127        self.handler_registry.register_call_handler(handler);
128        self
129    }
130
131    /// Registers a custom run handler for a specific run type
132    /// (e.g., "container", "script")
133    pub fn with_run_handler(mut self, handler: Box<dyn RunHandler>) -> Self {
134        self.handler_registry.register_run_handler(handler);
135        self
136    }
137
138    /// Registers a named function definition for call.function resolution
139    ///
140    /// This allows registering external function definitions that can be
141    /// referenced by `call: <functionName>` in workflows, similar to
142    /// Java SDK's cataloged function mechanism.
143    pub fn with_function(mut self, name: &str, task: TaskDefinition) -> Self {
144        self.functions.insert(name.to_string(), task);
145        self
146    }
147
148    /// Sets the entire handler registry (used for propagating handlers to child runners)
149    pub fn with_handler_registry(mut self, registry: HandlerRegistry) -> Self {
150        self.handler_registry = registry;
151        self
152    }
153
154    /// Registers a custom task handler for a specific custom task type
155    pub fn with_custom_task_handler(mut self, handler: Box<dyn CustomTaskHandler>) -> Self {
156        self.handler_registry.register_custom_task_handler(handler);
157        self
158    }
159
160    /// Registers a custom expression engine for a specific prefix (e.g., "cel", "js")
161    ///
162    /// Expressions prefixed with `engine_prefix:` will be routed to this engine.
163    /// Unprefixed expressions default to JQ.
164    pub fn with_expression_engine(mut self, engine: Arc<dyn ExpressionEngine>) -> Self {
165        self.expression_engines.register(engine);
166        self
167    }
168
169    /// Sets the expression engine registry (replaces all previously registered engines).
170    /// Useful for propagating engines from a parent runner to a child runner.
171    pub fn with_expression_engine_registry(mut self, registry: ExpressionEngineRegistry) -> Self {
172        self.expression_engines = registry;
173        self
174    }
175
176    /// Injects a custom variable into the JQ expression context.
177    ///
178    /// The variable will be available as `$name` in all expressions (input.from,
179    /// output.as, switch conditions, etc.). This is useful for passing external
180    /// configuration or environment information into the workflow.
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// use swf_runtime::WorkflowRunner;
186    /// use serde_json::json;
187    ///
188    /// let runner = WorkflowRunner::new(workflow)
189    ///     .expect("failed to create runner")
190    ///     .with_variable("config", json!({"base_url": "https://api.example.com"}))
191    ///     .with_variable("env", json!({"region": "us-east-1"}));
192    /// ```
193    pub fn with_variable(mut self, name: impl Into<String>, value: Value) -> Self {
194        // Store as a sub-workflow-independent variable — we'll inject into context in run()
195        // For simplicity, we use a dedicated field on WorkflowRunner
196        self.custom_vars.insert(name.into(), value);
197        self
198    }
199
200    /// Runs the workflow with the given input and returns the output
201    pub async fn run(&self, input: Value) -> WorkflowResult<Value> {
202        let span = tracing::info_span!(
203            "workflow",
204            name = %self.workflow.document.name,
205            version = %self.workflow.document.version,
206        );
207        let _enter = span.enter();
208
209        let mut context = WorkflowContext::new(&self.workflow)?;
210
211        // Set secret manager if configured
212        if let Some(ref mgr) = self.secret_manager {
213            context.set_secret_manager(mgr.clone());
214        }
215
216        // Set listener if configured
217        if let Some(ref listener) = self.listener {
218            context.set_listener(listener.clone());
219        }
220
221        // Set sub-workflow registry
222        if !self.sub_workflows.is_empty() {
223            context.set_sub_workflows(self.sub_workflows.clone());
224        }
225
226        // Set event bus if configured
227        if let Some(ref bus) = self.event_bus {
228            context.set_event_bus(bus.clone());
229        }
230
231        // Set handler registry
232        context.set_handler_registry(self.handler_registry.clone());
233
234        // Set expression engines
235        context.set_expression_engines(self.expression_engines.clone());
236
237        // Inject custom variables into the JQ context
238        if !self.custom_vars.is_empty() {
239            context.add_local_expr_vars(self.custom_vars.clone());
240        }
241
242        // Set registered function definitions
243        if !self.functions.is_empty() {
244            context.set_functions(self.functions.clone());
245        }
246
247        // Share suspend/resume state with context
248        context.set_suspend_state(self.suspend_state.clone());
249
250        let instance_id = context.instance_id().to_string();
251        tracing::info!(instance_id = %instance_id, "workflow started");
252
253        // Handle schedule:after — delay before starting
254        if let Some(ref schedule) = self.workflow.schedule {
255            if let Some(ref after_duration) = schedule.after {
256                let duration = crate::utils::duration_to_std(after_duration);
257                if !duration.is_zero() {
258                    context.set_status(StatusPhase::Waiting);
259                    tokio::time::sleep(duration).await;
260                }
261            }
262        }
263
264        // Process input
265        let processed_input = self.process_input(&input, &context)?;
266
267        context.set_input(processed_input.clone());
268        context.set_raw_input(&input);
269        context.set_status(StatusPhase::Running);
270
271        context.emit_event(WorkflowEvent::WorkflowStarted {
272            instance_id: instance_id.clone(),
273            input: processed_input.clone(),
274        });
275
276        // Run the top-level do tasks (with optional workflow timeout)
277        let do_runner = DoTaskRunner::new_from_workflow(&self.workflow)?;
278
279        // Resolve workflow timeout
280        let workflow_timeout = self.resolve_workflow_timeout(&processed_input, &context);
281
282        let mut support = TaskSupport::new(&self.workflow, &mut context);
283
284        let run_result = if let Some(timeout_duration) = workflow_timeout {
285            match tokio::time::timeout(
286                timeout_duration,
287                do_runner.run(processed_input, &mut support),
288            )
289            .await
290            {
291                Ok(result) => result,
292                Err(_) => {
293                    // Cancel the context so any running wait points respond immediately
294                    support.context.cancel();
295                    support.context.set_status(StatusPhase::Faulted);
296                    support.context.emit_event(WorkflowEvent::WorkflowFailed {
297                        instance_id: instance_id.clone(),
298                        error: "workflow timed out".to_string(),
299                    });
300                    return Err(WorkflowError::timeout(
301                        format!("workflow timed out after {:?}", timeout_duration),
302                        &self.workflow.document.name,
303                    ));
304                }
305            }
306        } else {
307            do_runner.run(processed_input, &mut support).await
308        };
309
310        let output = match run_result {
311            Ok(output) => output,
312            Err(e) => {
313                support.context.set_status(StatusPhase::Faulted);
314                support.context.emit_event(WorkflowEvent::WorkflowFailed {
315                    instance_id: instance_id.clone(),
316                    error: format!("{}", e),
317                });
318                tracing::error!(instance_id = %instance_id, error = %e, "workflow failed");
319                // Only set instance on Runtime errors, preserve error type for others
320                if e.kind() == ErrorKind::Runtime {
321                    let reference = support.get_task_reference().unwrap_or("/");
322                    return Err(e.with_instance(reference));
323                }
324                return Err(e);
325            }
326        };
327
328        support.context.clear_task_context();
329
330        // Process output using TaskSupport (reuses shared output processing logic)
331        let processed_output = support.process_task_output(
332            self.workflow.output.as_ref(),
333            &output,
334            &self.workflow.document.name,
335        )?;
336
337        support.context.set_output(processed_output.clone());
338        support.context.set_status(StatusPhase::Completed);
339
340        support
341            .context
342            .emit_event(WorkflowEvent::WorkflowCompleted {
343                instance_id: instance_id.clone(),
344                output: processed_output.clone(),
345            });
346
347        tracing::info!(instance_id = %instance_id, "workflow completed");
348
349        Ok(processed_output)
350    }
351
352    /// Returns a reference to the workflow definition
353    pub fn workflow(&self) -> &WorkflowDefinition {
354        &self.workflow
355    }
356
357    /// Returns a WorkflowHandle that can suspend/resume the running workflow
358    ///
359    /// Must be called before `run()`. The handle shares suspend/resume state
360    /// with the workflow context via Arc.
361    pub fn handle(&self) -> WorkflowHandle {
362        WorkflowHandle {
363            suspend_state: self.suspend_state.clone(),
364        }
365    }
366
367    /// Runs the workflow on a recurring schedule based on the workflow's
368    /// `schedule.every` or `schedule.cron` definition.
369    ///
370    /// For `every`: runs the workflow at fixed intervals.
371    /// For `cron`: runs the workflow according to the cron expression schedule.
372    ///
373    /// Returns a `ScheduledWorkflow` that can be cancelled to stop the schedule.
374    /// If no schedule is defined, runs once and returns a completed handle.
375    pub fn schedule(self, input: Value) -> ScheduledWorkflow {
376        if let Some(ref schedule) = self.workflow.schedule {
377            if let Some(ref every_duration) = schedule.every {
378                let interval = crate::utils::duration_to_std(every_duration);
379                let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
380                let join_handle = tokio::spawn(async move {
381                    let mut interval_timer = tokio::time::interval(interval);
382                    loop {
383                        tokio::select! {
384                            _ = interval_timer.tick() => {
385                                let _ = self.run(input.clone()).await;
386                            }
387                            _ = cancel_rx.changed() => {
388                                break;
389                            }
390                        }
391                    }
392                });
393                return ScheduledWorkflow {
394                    join_handle,
395                    cancel_tx,
396                };
397            }
398            if let Some(ref cron_expr) = schedule.cron {
399                let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
400                let cron_expr = cron_expr.clone();
401                let join_handle = tokio::spawn(async move {
402                    // Parse the cron expression (standard 5-field: min hour dom month dow)
403                    let schedule = match cron::Schedule::from_str(&cron_expr) {
404                        Ok(s) => s,
405                        Err(e) => {
406                            eprintln!("invalid cron expression '{}': {}", cron_expr, e);
407                            return;
408                        }
409                    };
410                    loop {
411                        let next = schedule.upcoming(chrono::Utc).next();
412                        let next = match next {
413                            Some(t) => t,
414                            None => break,
415                        };
416                        let delay = next - chrono::Utc::now();
417                        let delay_std = delay.to_std().unwrap_or(Duration::ZERO);
418                        if delay_std.is_zero() {
419                            let _ = self.run(input.clone()).await;
420                            continue;
421                        }
422                        tokio::select! {
423                            _ = tokio::time::sleep(delay_std) => {
424                                let _ = self.run(input.clone()).await;
425                            }
426                            _ = cancel_rx.changed() => {
427                                break;
428                            }
429                        }
430                    }
431                });
432                return ScheduledWorkflow {
433                    join_handle,
434                    cancel_tx,
435                };
436            }
437            // after/on: run once (event-based scheduling not yet supported)
438        }
439
440        // No schedule or non-recurring: run once
441        let (cancel_tx, _) = tokio::sync::watch::channel(false);
442        let join_handle = tokio::spawn(async move {
443            let _ = self.run(input).await;
444        });
445        ScheduledWorkflow {
446            join_handle,
447            cancel_tx,
448        }
449    }
450
451    /// Resolves the workflow-level timeout duration, if configured
452    fn resolve_workflow_timeout(
453        &self,
454        input: &Value,
455        context: &WorkflowContext,
456    ) -> Option<Duration> {
457        let timeout_def = self.workflow.timeout.as_ref()?;
458        let vars = context.get_vars();
459        crate::utils::parse_duration_with_context(
460            timeout_def,
461            input,
462            &vars,
463            &self.workflow.document.name,
464            Some(&self.workflow),
465        )
466        .ok()
467    }
468
469    /// Processes workflow input: schema validation and expression transformation
470    fn process_input(&self, input: &Value, context: &WorkflowContext) -> WorkflowResult<Value> {
471        let input_def = match &self.workflow.input {
472            Some(def) => def,
473            None => return Ok(input.clone()),
474        };
475
476        // Validate input schema
477        if let Some(ref schema) = input_def.schema {
478            validate_schema(input, schema, "/")?;
479        }
480
481        // Transform input via from expression
482        let vars = context.get_vars();
483        match input_def.from {
484            Some(ref from_val) => evaluate_value_expr(from_val, input, &vars, "/"),
485            None => Ok(input.clone()),
486        }
487    }
488}
489
// Tests for the runner live in the separate `runner_tests` module, which is only
// compiled for test builds. The clippy lints listed below are silenced for that
// module's code.
#[cfg(test)]
#[allow(clippy::needless_borrow, clippy::unnecessary_to_owned, clippy::ptr_arg)]
mod runner_tests;