Skip to main content

forge_core/workflow/
traits.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use super::context::WorkflowContext;
9use crate::Result;
10
11/// Trait for durable workflow handlers.
12pub trait ForgeWorkflow: crate::__sealed::Sealed + Send + Sync + 'static {
13    type Input: DeserializeOwned + Serialize + Send + Sync;
14    type Output: Serialize + Send;
15
16    fn info() -> WorkflowInfo;
17
18    fn execute(
19        ctx: &WorkflowContext,
20        input: Self::Input,
21    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
22}
23
/// Lifecycle state of a workflow definition version.
///
/// `Active` is the default. The enum is `#[non_exhaustive]`, so code outside
/// this crate must keep a wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum WorkflowDefStatus {
    /// Default state, and the only one for which `is_active` is `true`.
    #[default]
    Active,
    /// The only state for which `is_deprecated` is `true`.
    // NOTE(review): presumably existing runs continue while new runs are refused — confirm.
    Deprecated,
    /// Neither active nor deprecated.
    // NOTE(review): presumably a registered version that is not yet promoted — confirm.
    Staging,
}

impl WorkflowDefStatus {
    /// Returns the lowercase string form of this status.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Active => "active",
            Self::Deprecated => "deprecated",
            Self::Staging => "staging",
        }
    }

    /// Returns `true` only for [`WorkflowDefStatus::Active`].
    #[must_use]
    pub const fn is_active(self) -> bool {
        matches!(self, Self::Active)
    }

    /// Returns `true` only for [`WorkflowDefStatus::Deprecated`].
    #[must_use]
    pub const fn is_deprecated(self) -> bool {
        matches!(self, Self::Deprecated)
    }
}
51
52/// Metadata for a registered workflow handler.
53#[derive(Debug, Clone)]
54pub struct WorkflowInfo {
55    pub name: &'static str,
56    pub version: &'static str,
57    /// Blake3 hash of the persisted contract (name, version, step/wait keys, types).
58    pub signature: &'static str,
59    pub status: WorkflowDefStatus,
60    pub timeout: Duration,
61    pub http_timeout: Option<Duration>,
62    pub is_public: bool,
63    pub required_role: Option<&'static str>,
64}
65
66impl WorkflowInfo {
67    pub fn is_active(&self) -> bool {
68        self.status.is_active()
69    }
70
71    pub fn is_deprecated(&self) -> bool {
72        self.status.is_deprecated()
73    }
74}
75
76impl Default for WorkflowInfo {
77    fn default() -> Self {
78        Self {
79            name: "",
80            version: "v1",
81            signature: "",
82            status: WorkflowDefStatus::Active,
83            timeout: Duration::from_secs(86400), // 24 hours
84            http_timeout: None,
85            is_public: false,
86            required_role: None,
87        }
88    }
89}
90
/// Workflow execution status. Blocked variants are non-terminal.
///
/// `Compensating` is non-terminal — the run is actively unwinding completed
/// steps. `Compensated`, `Retired`, and `CancelledByOperator` are terminal:
/// they each describe a distinct end state that callers must be able to
/// distinguish from a generic `Failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WorkflowStatus {
    /// Non-terminal. The legacy string `created` also parses to this variant.
    Pending,
    /// Non-terminal.
    Running,
    /// Non-terminal.
    // NOTE(review): presumably parked on a timer — confirm.
    Sleeping,
    /// Non-terminal.
    // NOTE(review): presumably parked on an external event — confirm.
    Waiting,
    /// Terminal.
    Completed,
    /// Terminal.
    Failed,
    /// Compensation is running (reverse-order rollback of completed steps).
    /// Non-terminal — the run will transition to `Compensated` (or `Failed`
    /// if a handler errored).
    Compensating,
    /// Compensation finished successfully; the run is rolled back and done.
    Compensated,
    /// Operator marked the run as unresumable (e.g., schema drift after a
    /// failed deploy). Terminal — manual intervention required to re-run.
    Retired,
    /// Operator force-aborted the run. Terminal — distinct from `Failed`
    /// because the cause was external, not the workflow itself.
    CancelledByOperator,
    /// Blocked, non-terminal.
    // NOTE(review): name suggests the run's workflow version is not registered — confirm.
    BlockedMissingVersion,
    /// Blocked, non-terminal.
    // NOTE(review): name suggests the stored signature differs from the handler's — confirm.
    BlockedSignatureMismatch,
    /// Blocked, non-terminal.
    // NOTE(review): name suggests no handler is registered for the workflow — confirm.
    BlockedMissingHandler,
}

impl WorkflowStatus {
    /// Returns the canonical snake_case string for this status.
    ///
    /// `Retired` maps to `"retired_unresumable"`; every other variant maps to
    /// the snake_case form of its own name.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Sleeping => "sleeping",
            Self::Waiting => "waiting",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Compensating => "compensating",
            Self::Compensated => "compensated",
            Self::Retired => "retired_unresumable",
            Self::CancelledByOperator => "cancelled_by_operator",
            Self::BlockedMissingVersion => "blocked_missing_version",
            Self::BlockedSignatureMismatch => "blocked_signature_mismatch",
            Self::BlockedMissingHandler => "blocked_missing_handler",
        }
    }

    /// Returns `true` for the five end states: `Completed`, `Failed`,
    /// `Compensated`, `Retired` and `CancelledByOperator`.
    ///
    /// The match is exhaustive on purpose: adding a variant fails to compile
    /// here until someone decides whether it is terminal.
    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        match *self {
            Self::Completed
            | Self::Failed
            | Self::Compensated
            | Self::Retired
            | Self::CancelledByOperator => true,
            Self::Pending
            | Self::Running
            | Self::Sleeping
            | Self::Waiting
            | Self::Compensating
            | Self::BlockedMissingVersion
            | Self::BlockedSignatureMismatch
            | Self::BlockedMissingHandler => false,
        }
    }

    /// Returns `true` for the three `Blocked*` variants.
    ///
    /// Exhaustive for the same reason as [`WorkflowStatus::is_terminal`].
    #[must_use]
    pub const fn is_blocked(&self) -> bool {
        match *self {
            Self::BlockedMissingVersion
            | Self::BlockedSignatureMismatch
            | Self::BlockedMissingHandler => true,
            Self::Pending
            | Self::Running
            | Self::Sleeping
            | Self::Waiting
            | Self::Completed
            | Self::Failed
            | Self::Compensating
            | Self::Compensated
            | Self::Retired
            | Self::CancelledByOperator => false,
        }
    }
}
162
/// Error produced when a string does not name a known workflow status.
///
/// The tuple field holds the rejected input unchanged, and `Display` echoes
/// it back inside single quotes. Equality compares that inner string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseWorkflowStatusError(pub String);

impl std::fmt::Display for ParseWorkflowStatusError {
    /// Writes `invalid workflow status: '<input>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        write!(f, "invalid workflow status: '{}'", rejected)
    }
}

impl std::error::Error for ParseWorkflowStatusError {}
173
174impl FromStr for WorkflowStatus {
175    type Err = ParseWorkflowStatusError;
176
177    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
178        match s {
179            "pending" | "created" => Ok(Self::Pending),
180            "running" => Ok(Self::Running),
181            "sleeping" => Ok(Self::Sleeping),
182            "waiting" => Ok(Self::Waiting),
183            "completed" => Ok(Self::Completed),
184            "failed" => Ok(Self::Failed),
185            "compensating" => Ok(Self::Compensating),
186            "compensated" => Ok(Self::Compensated),
187            "retired_unresumable" => Ok(Self::Retired),
188            "cancelled_by_operator" => Ok(Self::CancelledByOperator),
189            "blocked_missing_version" => Ok(Self::BlockedMissingVersion),
190            "blocked_signature_mismatch" => Ok(Self::BlockedSignatureMismatch),
191            "blocked_missing_handler" => Ok(Self::BlockedMissingHandler),
192            _ => Err(ParseWorkflowStatusError(s.to_string())),
193        }
194    }
195}
196
#[cfg(test)]
// Panicking helpers such as `unwrap_err` are acceptable in test code.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Every `WorkflowStatus` variant paired with its canonical string.
    ///
    /// Keep this in sync with the enum: a variant missing here is not covered
    /// by the table-driven tests below.
    const ALL_STATUSES: [(WorkflowStatus, &str); 13] = [
        (WorkflowStatus::Pending, "pending"),
        (WorkflowStatus::Running, "running"),
        (WorkflowStatus::Sleeping, "sleeping"),
        (WorkflowStatus::Waiting, "waiting"),
        (WorkflowStatus::Completed, "completed"),
        (WorkflowStatus::Failed, "failed"),
        (WorkflowStatus::Compensating, "compensating"),
        (WorkflowStatus::Compensated, "compensated"),
        (WorkflowStatus::Retired, "retired_unresumable"),
        (WorkflowStatus::CancelledByOperator, "cancelled_by_operator"),
        (WorkflowStatus::BlockedMissingVersion, "blocked_missing_version"),
        (
            WorkflowStatus::BlockedSignatureMismatch,
            "blocked_signature_mismatch",
        ),
        (WorkflowStatus::BlockedMissingHandler, "blocked_missing_handler"),
    ];

    /// The variants `is_terminal` must accept; all others must be rejected.
    const TERMINAL: [WorkflowStatus; 5] = [
        WorkflowStatus::Completed,
        WorkflowStatus::Failed,
        WorkflowStatus::Compensated,
        WorkflowStatus::Retired,
        WorkflowStatus::CancelledByOperator,
    ];

    /// The variants `is_blocked` must accept; all others must be rejected.
    const BLOCKED: [WorkflowStatus; 3] = [
        WorkflowStatus::BlockedMissingVersion,
        WorkflowStatus::BlockedSignatureMismatch,
        WorkflowStatus::BlockedMissingHandler,
    ];

    #[test]
    fn test_workflow_info_default() {
        let info = WorkflowInfo::default();
        assert_eq!(info.name, "");
        assert_eq!(info.version, "v1");
        assert_eq!(info.status, WorkflowDefStatus::Active);
        assert!(info.is_active());
        assert!(!info.is_deprecated());
    }

    #[test]
    fn workflow_info_default_timeout_is_one_day() {
        let info = WorkflowInfo::default();
        assert_eq!(info.timeout, Duration::from_secs(86_400));
        assert!(info.http_timeout.is_none());
        assert!(!info.is_public);
        assert!(info.required_role.is_none());
        assert!(info.signature.is_empty());
    }

    #[test]
    fn workflow_info_active_and_deprecated_track_status() {
        // (status, expected is_active, expected is_deprecated)
        let cases = [
            (WorkflowDefStatus::Deprecated, false, true),
            (WorkflowDefStatus::Staging, false, false),
            (WorkflowDefStatus::Active, true, false),
        ];
        for (status, active, deprecated) in cases {
            let info = WorkflowInfo {
                status,
                ..WorkflowInfo::default()
            };
            assert_eq!(info.is_active(), active, "is_active for {status:?}");
            assert_eq!(
                info.is_deprecated(),
                deprecated,
                "is_deprecated for {status:?}"
            );
        }
    }

    #[test]
    fn workflow_def_status_default_is_active() {
        // Default must be Active so a freshly-constructed WorkflowInfo accepts
        // new runs without an explicit status set.
        assert_eq!(WorkflowDefStatus::default(), WorkflowDefStatus::Active);
    }

    #[test]
    fn workflow_def_status_as_str_round_trips_all_variants() {
        assert_eq!(WorkflowDefStatus::Active.as_str(), "active");
        assert_eq!(WorkflowDefStatus::Deprecated.as_str(), "deprecated");
        assert_eq!(WorkflowDefStatus::Staging.as_str(), "staging");
    }

    #[test]
    fn workflow_def_status_active_predicate_only_matches_active() {
        assert!(WorkflowDefStatus::Active.is_active());
        assert!(!WorkflowDefStatus::Deprecated.is_active());
        assert!(!WorkflowDefStatus::Staging.is_active());
    }

    #[test]
    fn workflow_def_status_deprecated_predicate_only_matches_deprecated() {
        assert!(!WorkflowDefStatus::Active.is_deprecated());
        assert!(WorkflowDefStatus::Deprecated.is_deprecated());
        assert!(!WorkflowDefStatus::Staging.is_deprecated());
    }

    #[test]
    fn test_workflow_status_conversion() {
        // Both directions for every variant, not just a sample.
        for (status, text) in ALL_STATUSES {
            assert_eq!(status.as_str(), text);
            assert_eq!(
                text.parse::<WorkflowStatus>(),
                Ok(status),
                "{text} did not parse"
            );
        }
    }

    #[test]
    fn test_workflow_status_distinct_parsing() {
        // `created` is the only true legacy alias — every other previously
        // collapsed string now parses to its own distinct variant.
        assert_eq!(
            "created".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Pending)
        );
        for (idx, (left, left_text)) in ALL_STATUSES.iter().enumerate() {
            for (right, right_text) in ALL_STATUSES.iter().skip(idx + 1) {
                assert_ne!(left, right, "variant listed twice");
                assert_ne!(left_text, right_text, "two variants share a string");
            }
        }
    }

    #[test]
    fn test_workflow_status_is_terminal() {
        for (status, text) in ALL_STATUSES {
            assert_eq!(
                status.is_terminal(),
                TERMINAL.contains(&status),
                "unexpected is_terminal for {text}"
            );
        }
        // Compensating is non-terminal — it's actively unwinding.
        assert!(!WorkflowStatus::Compensating.is_terminal());
    }

    #[test]
    fn workflow_status_terminal_variants_round_trip_as_str() {
        // Compensation and operator-driven states. `Compensating` is included
        // although it is non-terminal.
        for variant in [
            WorkflowStatus::Compensating,
            WorkflowStatus::Compensated,
            WorkflowStatus::Retired,
            WorkflowStatus::CancelledByOperator,
        ] {
            let s = variant.as_str();
            let parsed: WorkflowStatus = s.parse().expect("round trip");
            assert_eq!(parsed, variant, "{s} did not round-trip");
        }
    }

    #[test]
    fn workflow_status_every_variant_round_trips_as_str() {
        for (status, _) in ALL_STATUSES {
            let text = status.as_str();
            assert_eq!(
                text.parse::<WorkflowStatus>(),
                Ok(status),
                "{text} did not round-trip"
            );
        }
    }

    #[test]
    fn workflow_status_parse_rejects_unknown() {
        let err = "garbage".parse::<WorkflowStatus>().unwrap_err();
        assert_eq!(err.0, "garbage");
        // Display must echo the bad value so logs pinpoint the typo.
        let msg = err.to_string();
        assert!(msg.contains("garbage"), "display dropped value: {msg}");
        assert!(msg.contains("invalid workflow status"));
    }

    #[test]
    fn parse_workflow_status_error_eq_uses_inner_string() {
        // PartialEq is derived, so equality is by inner String only.
        assert_eq!(
            ParseWorkflowStatusError("x".to_string()),
            ParseWorkflowStatusError("x".to_string())
        );
        assert_ne!(
            ParseWorkflowStatusError("x".to_string()),
            ParseWorkflowStatusError("y".to_string())
        );
    }

    #[test]
    fn workflow_status_blocked_variants_parse_distinctly() {
        for blocked in BLOCKED {
            assert_eq!(
                blocked.as_str().parse::<WorkflowStatus>(),
                Ok(blocked),
                "{} did not parse to its own variant",
                blocked.as_str()
            );
            assert!(blocked.is_blocked());
            assert!(!blocked.is_terminal());
        }
        // The predicate must also reject everything that is not blocked.
        for (status, text) in ALL_STATUSES {
            assert_eq!(
                status.is_blocked(),
                BLOCKED.contains(&status),
                "unexpected is_blocked for {text}"
            );
        }
    }
}