wfe_core/traits/step.rs
1use async_trait::async_trait;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::artifact_volume::{ArtifactVolume, ArtifactVolumePackage};
6use crate::models::{
7 ExecutionPointer, ExecutionResult, WorkflowDefinition, WorkflowInstance, WorkflowStep,
8};
9use crate::traits::ArtifactStore;
10use super::persistence::PersistenceProvider;
11
12/// Marker trait for all data types that flow between workflow steps.
13/// Anything that is serializable and deserializable qualifies.
14pub trait WorkflowData: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
15
16/// Blanket implementation: any type satisfying the bounds is WorkflowData.
17impl<T> WorkflowData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
18
19/// Context for steps that need to interact with the workflow host.
20/// Implemented by WorkflowHost to allow steps like SubWorkflow to start child workflows.
21///
22/// The `parent_root_workflow_id` argument carries the UUID of the top-level
23/// ancestor workflow so backends (notably Kubernetes) can place every
24/// descendant of a given root run in the same isolation domain — namespace,
25/// shared volume, RBAC — so sub-workflows can share state like a cloned
26/// repo checkout. Pass `None` when starting a brand-new root workflow.
27pub trait HostContext: Send + Sync {
28 fn start_workflow(
29 &self,
30 definition_id: &str,
31 version: u32,
32 data: serde_json::Value,
33 parent_root_workflow_id: Option<String>,
34 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<String>> + Send + '_>>;
35}
36
37/// Context available to a step during execution.
38pub struct StepExecutionContext<'a> {
39 /// The current item when iterating (ForEach).
40 pub item: Option<&'a serde_json::Value>,
41 /// The current execution pointer.
42 pub execution_pointer: &'a ExecutionPointer,
43 /// Persistence data from a previous execution of this step.
44 pub persistence_data: Option<&'a serde_json::Value>,
45 /// The step definition.
46 pub step: &'a WorkflowStep,
47 /// The running workflow instance.
48 pub workflow: &'a WorkflowInstance,
49 /// The compiled workflow definition the instance was created from.
50 /// `None` on code paths that don't have it available (some test fixtures);
51 /// production execution always populates this so executor-specific
52 /// features (e.g. Kubernetes shared volumes) can inspect the
53 /// definition-level configuration.
54 pub definition: Option<&'a WorkflowDefinition>,
55 /// Cancellation token.
56 pub cancellation_token: tokio_util::sync::CancellationToken,
57 /// Host context for starting child workflows. None if not available.
58 pub host_context: Option<&'a dyn HostContext>,
59 /// Log sink for streaming step output. None if not configured.
60 pub log_sink: Option<&'a dyn super::LogSink>,
61 /// Artifact store for backward compatibility.
62 pub artifact_store: Option<&'a dyn ArtifactStore>,
63 /// Resolved artifact volume for this step. Preferred over `artifact_store`.
64 pub artifact_volume: Option<&'a ArtifactVolume>,
65 /// Serialized artifact package for distributed scenarios.
66 pub artifact_package: Option<ArtifactVolumePackage>,
67 /// Persistence provider. Available to long-running steps (e.g.
68 /// `SignAndWriteCommitStep`) that need to emit a mid-step heartbeat by
69 /// calling `persistence.persist_workflow(&instance)`. The Postgres
70 /// implementation automatically bumps `last_heartbeat_at = now()` on
71 /// every `persist_workflow` call.
72 pub persistence: Option<&'a dyn PersistenceProvider>,
73}
74
75// Manual Debug impl since dyn HostContext/PersistenceProvider are not Debug.
76impl<'a> std::fmt::Debug for StepExecutionContext<'a> {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("StepExecutionContext")
79 .field("item", &self.item)
80 .field("execution_pointer", &self.execution_pointer)
81 .field("persistence_data", &self.persistence_data)
82 .field("step", &self.step)
83 .field("workflow", &self.workflow)
84 .field("definition", &self.definition.is_some())
85 .field("host_context", &self.host_context.is_some())
86 .field("log_sink", &self.log_sink.is_some())
87 .field("artifact_store", &self.artifact_store.is_some())
88 .field("artifact_volume", &self.artifact_volume.is_some())
89 .field("artifact_package", &self.artifact_package.is_some())
90 .field("persistence", &self.persistence.is_some())
91 .finish()
92 }
93}
94
95/// The core unit of work in a workflow. Each step implements this trait.
96///
97/// Steps must be `Send + Sync` because the executor may run them on different
98/// threads. They must also implement `Default` to be usable with the builder API.
99///
100/// # Example
101/// ```ignore
102/// use async_trait::async_trait;
103/// use wfe_core::models::ExecutionResult;
104/// use wfe_core::traits::step::{StepBody, StepExecutionContext};
105///
106/// #[derive(Default)]
107/// struct Greet;
108///
109/// #[async_trait]
110/// impl StepBody for Greet {
111/// async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
112/// println!("Hello, workflow!");
113/// Ok(ExecutionResult::next())
114/// }
115/// }
116/// ```
117#[async_trait]
118pub trait StepBody: Send + Sync {
119 /// Execute the step.
120 ///
121 /// The [`StepExecutionContext`] provides access to workflow data, the current
122 /// execution pointer, persistence data from previous runs, and a cancellation
123 /// token. Return an [`ExecutionResult`] to control flow:
124 ///
125 /// - [`ExecutionResult::next()`](crate::models::ExecutionResult::next) — continue to the next step
126 /// - [`ExecutionResult::branch(values, data)`](crate::models::ExecutionResult::branch) — follow a named outcome
127 /// - [`ExecutionResult::sleep(duration, data)`](crate::models::ExecutionResult::sleep) — pause execution
128 /// - `Err(WfeError::Execution("msg".into()))` — mark the pointer as failed
129 /// - [`ExecutionResult::persist(data)`](crate::models::ExecutionResult::persist) — persist state and pause
130 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult>;
131
132 /// Mount any artifacts this step requires.
133 ///
134 /// Called by the executor loop **before** [`run`](Self::run).
135 /// Default implementation is a no-op for primitives that don't consume
136 /// artifact inputs.
137 async fn mount_artifacts(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result<()> {
138 Ok(())
139 }
140
141 /// Unmount and clean up artifacts.
142 ///
143 /// Called by the executor loop **after** [`run`](Self::run) completes,
144 /// regardless of success or failure.
145 /// Default implementation is a no-op.
146 async fn unmount_artifacts(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result<()> {
147 Ok(())
148 }
149}