wfe_core/traits/step.rs
1use async_trait::async_trait;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::models::{
6 ExecutionPointer, ExecutionResult, WorkflowDefinition, WorkflowInstance, WorkflowStep,
7};
8
9/// Marker trait for all data types that flow between workflow steps.
10/// Anything that is serializable and deserializable qualifies.
11pub trait WorkflowData: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
12
13/// Blanket implementation: any type satisfying the bounds is WorkflowData.
14impl<T> WorkflowData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
15
16/// Context for steps that need to interact with the workflow host.
17/// Implemented by WorkflowHost to allow steps like SubWorkflow to start child workflows.
18///
19/// The `parent_root_workflow_id` argument carries the UUID of the top-level
20/// ancestor workflow so backends (notably Kubernetes) can place every
21/// descendant of a given root run in the same isolation domain — namespace,
22/// shared volume, RBAC — so sub-workflows can share state like a cloned
23/// repo checkout. Pass `None` when starting a brand-new root workflow.
24pub trait HostContext: Send + Sync {
25 fn start_workflow(
26 &self,
27 definition_id: &str,
28 version: u32,
29 data: serde_json::Value,
30 parent_root_workflow_id: Option<String>,
31 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<String>> + Send + '_>>;
32}
33
34/// Context available to a step during execution.
35pub struct StepExecutionContext<'a> {
36 /// The current item when iterating (ForEach).
37 pub item: Option<&'a serde_json::Value>,
38 /// The current execution pointer.
39 pub execution_pointer: &'a ExecutionPointer,
40 /// Persistence data from a previous execution of this step.
41 pub persistence_data: Option<&'a serde_json::Value>,
42 /// The step definition.
43 pub step: &'a WorkflowStep,
44 /// The running workflow instance.
45 pub workflow: &'a WorkflowInstance,
46 /// The compiled workflow definition the instance was created from.
47 /// `None` on code paths that don't have it available (some test fixtures);
48 /// production execution always populates this so executor-specific
49 /// features (e.g. Kubernetes shared volumes) can inspect the
50 /// definition-level configuration.
51 pub definition: Option<&'a WorkflowDefinition>,
52 /// Cancellation token.
53 pub cancellation_token: tokio_util::sync::CancellationToken,
54 /// Host context for starting child workflows. None if not available.
55 pub host_context: Option<&'a dyn HostContext>,
56 /// Log sink for streaming step output. None if not configured.
57 pub log_sink: Option<&'a dyn super::LogSink>,
58}
59
60// Manual Debug impl since dyn HostContext is not Debug.
61impl<'a> std::fmt::Debug for StepExecutionContext<'a> {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("StepExecutionContext")
64 .field("item", &self.item)
65 .field("execution_pointer", &self.execution_pointer)
66 .field("persistence_data", &self.persistence_data)
67 .field("step", &self.step)
68 .field("workflow", &self.workflow)
69 .field("definition", &self.definition.is_some())
70 .field("host_context", &self.host_context.is_some())
71 .field("log_sink", &self.log_sink.is_some())
72 .finish()
73 }
74}
75
76/// The core unit of work in a workflow. Each step implements this trait.
77#[async_trait]
78pub trait StepBody: Send + Sync {
79 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult>;
80}