Skip to main content

wfe_core/traits/
step.rs

1use async_trait::async_trait;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::artifact_volume::{ArtifactVolume, ArtifactVolumePackage};
6use crate::models::{
7    ExecutionPointer, ExecutionResult, WorkflowDefinition, WorkflowInstance, WorkflowStep,
8};
9use crate::traits::ArtifactStore;
10use super::persistence::PersistenceProvider;
11
12/// Marker trait for all data types that flow between workflow steps.
13/// Anything that is serializable and deserializable qualifies.
14pub trait WorkflowData: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
15
16/// Blanket implementation: any type satisfying the bounds is WorkflowData.
17impl<T> WorkflowData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
18
19/// Context for steps that need to interact with the workflow host.
20/// Implemented by WorkflowHost to allow steps like SubWorkflow to start child workflows.
21///
22/// The `parent_root_workflow_id` argument carries the UUID of the top-level
23/// ancestor workflow so backends (notably Kubernetes) can place every
24/// descendant of a given root run in the same isolation domain — namespace,
25/// shared volume, RBAC — so sub-workflows can share state like a cloned
26/// repo checkout. Pass `None` when starting a brand-new root workflow.
27pub trait HostContext: Send + Sync {
28    fn start_workflow(
29        &self,
30        definition_id: &str,
31        version: u32,
32        data: serde_json::Value,
33        parent_root_workflow_id: Option<String>,
34    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<String>> + Send + '_>>;
35}
36
37/// Context available to a step during execution.
38pub struct StepExecutionContext<'a> {
39    /// The current item when iterating (ForEach).
40    pub item: Option<&'a serde_json::Value>,
41    /// The current execution pointer.
42    pub execution_pointer: &'a ExecutionPointer,
43    /// Persistence data from a previous execution of this step.
44    pub persistence_data: Option<&'a serde_json::Value>,
45    /// The step definition.
46    pub step: &'a WorkflowStep,
47    /// The running workflow instance.
48    pub workflow: &'a WorkflowInstance,
49    /// The compiled workflow definition the instance was created from.
50    /// `None` on code paths that don't have it available (some test fixtures);
51    /// production execution always populates this so executor-specific
52    /// features (e.g. Kubernetes shared volumes) can inspect the
53    /// definition-level configuration.
54    pub definition: Option<&'a WorkflowDefinition>,
55    /// Cancellation token.
56    pub cancellation_token: tokio_util::sync::CancellationToken,
57    /// Host context for starting child workflows. None if not available.
58    pub host_context: Option<&'a dyn HostContext>,
59    /// Log sink for streaming step output. None if not configured.
60    pub log_sink: Option<&'a dyn super::LogSink>,
61    /// Artifact store for backward compatibility.
62    pub artifact_store: Option<&'a dyn ArtifactStore>,
63    /// Resolved artifact volume for this step. Preferred over `artifact_store`.
64    pub artifact_volume: Option<&'a ArtifactVolume>,
65    /// Serialized artifact package for distributed scenarios.
66    pub artifact_package: Option<ArtifactVolumePackage>,
67    /// Persistence provider. Available to long-running steps (e.g.
68    /// `SignAndWriteCommitStep`) that need to emit a mid-step heartbeat by
69    /// calling `persistence.persist_workflow(&instance)`. The Postgres
70    /// implementation automatically bumps `last_heartbeat_at = now()` on
71    /// every `persist_workflow` call.
72    pub persistence: Option<&'a dyn PersistenceProvider>,
73}
74
75// Manual Debug impl since dyn HostContext/PersistenceProvider are not Debug.
76impl<'a> std::fmt::Debug for StepExecutionContext<'a> {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("StepExecutionContext")
79            .field("item", &self.item)
80            .field("execution_pointer", &self.execution_pointer)
81            .field("persistence_data", &self.persistence_data)
82            .field("step", &self.step)
83            .field("workflow", &self.workflow)
84            .field("definition", &self.definition.is_some())
85            .field("host_context", &self.host_context.is_some())
86            .field("log_sink", &self.log_sink.is_some())
87            .field("artifact_store", &self.artifact_store.is_some())
88            .field("artifact_volume", &self.artifact_volume.is_some())
89            .field("artifact_package", &self.artifact_package.is_some())
90            .field("persistence", &self.persistence.is_some())
91            .finish()
92    }
93}
94
95/// The core unit of work in a workflow. Each step implements this trait.
96///
97/// Steps must be `Send + Sync` because the executor may run them on different
98/// threads. They must also implement `Default` to be usable with the builder API.
99///
100/// # Example
101/// ```ignore
102/// use async_trait::async_trait;
103/// use wfe_core::models::ExecutionResult;
104/// use wfe_core::traits::step::{StepBody, StepExecutionContext};
105///
106/// #[derive(Default)]
107/// struct Greet;
108///
109/// #[async_trait]
110/// impl StepBody for Greet {
111///     async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
112///         println!("Hello, workflow!");
113///         Ok(ExecutionResult::next())
114///     }
115/// }
116/// ```
117#[async_trait]
118pub trait StepBody: Send + Sync {
119    /// Execute the step.
120    ///
121    /// The [`StepExecutionContext`] provides access to workflow data, the current
122    /// execution pointer, persistence data from previous runs, and a cancellation
123    /// token. Return an [`ExecutionResult`] to control flow:
124    ///
125    /// - [`ExecutionResult::next()`](crate::models::ExecutionResult::next) — continue to the next step
126    /// - [`ExecutionResult::branch(values, data)`](crate::models::ExecutionResult::branch) — follow a named outcome
127    /// - [`ExecutionResult::sleep(duration, data)`](crate::models::ExecutionResult::sleep) — pause execution
128    /// - `Err(WfeError::Execution("msg".into()))` — mark the pointer as failed
129    /// - [`ExecutionResult::persist(data)`](crate::models::ExecutionResult::persist) — persist state and pause
130    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult>;
131
132    /// Mount any artifacts this step requires.
133    ///
134    /// Called by the executor loop **before** [`run`](Self::run).
135    /// Default implementation is a no-op for primitives that don't consume
136    /// artifact inputs.
137    async fn mount_artifacts(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result<()> {
138        Ok(())
139    }
140
141    /// Unmount and clean up artifacts.
142    ///
143    /// Called by the executor loop **after** [`run`](Self::run) completes,
144    /// regardless of success or failure.
145    /// Default implementation is a no-op.
146    async fn unmount_artifacts(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result<()> {
147        Ok(())
148    }
149}