1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use super::context::WorkflowContext;
9use crate::Result;
10
/// Contract for a workflow type: static metadata plus an async entry point.
///
/// The trait requires `crate::__sealed::Sealed` as a supertrait, so only types
/// that also implement that marker can implement `ForgeWorkflow`.
/// NOTE(review): presumably implementations are produced by a macro rather
/// than written by hand — confirm.
pub trait ForgeWorkflow: crate::__sealed::Sealed + Send + Sync + 'static {
    /// Value handed to [`ForgeWorkflow::execute`]; must be both serializable
    /// and deserializable.
    type Input: DeserializeOwned + Serialize + Send + Sync;
    /// Value produced by a successful run; must be serializable.
    type Output: Serialize + Send;

    /// Returns the [`WorkflowInfo`] metadata for this workflow type.
    fn info() -> WorkflowInfo;

    /// Starts the workflow with the given context and input.
    ///
    /// Returns a pinned, boxed, `Send` future that may borrow from `ctx` and
    /// resolves to `Result<Self::Output>`.
    fn execute(
        ctx: &WorkflowContext,
        input: Self::Input,
    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
}
23
/// Status value stored in `WorkflowInfo::status`.
///
/// The default is [`WorkflowDefStatus::Active`]. The enum is
/// `#[non_exhaustive]`, so code outside this crate must include a wildcard arm
/// when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum WorkflowDefStatus {
    /// The default status; string form `"active"`.
    #[default]
    Active,
    /// String form `"deprecated"`.
    Deprecated,
    /// String form `"staging"`.
    Staging,
}

impl WorkflowDefStatus {
    /// Returns the lowercase string form of this status.
    pub fn as_str(self) -> &'static str {
        match self {
            WorkflowDefStatus::Staging => "staging",
            WorkflowDefStatus::Deprecated => "deprecated",
            WorkflowDefStatus::Active => "active",
        }
    }

    /// Returns `true` only for [`WorkflowDefStatus::Active`].
    pub fn is_active(self) -> bool {
        self == WorkflowDefStatus::Active
    }

    /// Returns `true` only for [`WorkflowDefStatus::Deprecated`].
    pub fn is_deprecated(self) -> bool {
        self == WorkflowDefStatus::Deprecated
    }
}
51
52#[derive(Debug, Clone)]
54pub struct WorkflowInfo {
55 pub name: &'static str,
56 pub version: &'static str,
57 pub signature: &'static str,
59 pub status: WorkflowDefStatus,
60 pub timeout: Duration,
61 pub http_timeout: Option<Duration>,
62 pub is_public: bool,
63 pub required_role: Option<&'static str>,
64}
65
66impl WorkflowInfo {
67 pub fn is_active(&self) -> bool {
68 self.status.is_active()
69 }
70
71 pub fn is_deprecated(&self) -> bool {
72 self.status.is_deprecated()
73 }
74}
75
76impl Default for WorkflowInfo {
77 fn default() -> Self {
78 Self {
79 name: "",
80 version: "v1",
81 signature: "",
82 status: WorkflowDefStatus::Active,
83 timeout: Duration::from_secs(86400), http_timeout: None,
85 is_public: false,
86 required_role: None,
87 }
88 }
89}
90
/// Status of a workflow, each variant having a fixed snake_case string form.
///
/// NOTE(review): presumably this is the state of an individual workflow run,
/// as opposed to `WorkflowDefStatus` — confirm.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WorkflowStatus {
    /// String form `"pending"`.
    Pending,
    /// String form `"running"`.
    Running,
    /// String form `"sleeping"`.
    Sleeping,
    /// String form `"waiting"`.
    Waiting,
    /// Terminal; string form `"completed"`.
    Completed,
    /// Terminal; string form `"failed"`.
    Failed,
    /// Not terminal; string form `"compensating"`.
    Compensating,
    /// Terminal; string form `"compensated"`.
    Compensated,
    /// Terminal; note the string form is `"retired_unresumable"`, not `"retired"`.
    Retired,
    /// Terminal; string form `"cancelled_by_operator"`.
    CancelledByOperator,
    /// Blocked, not terminal; string form `"blocked_missing_version"`.
    BlockedMissingVersion,
    /// Blocked, not terminal; string form `"blocked_signature_mismatch"`.
    BlockedSignatureMismatch,
    /// Blocked, not terminal; string form `"blocked_missing_handler"`.
    BlockedMissingHandler,
}

impl WorkflowStatus {
    /// Returns the canonical snake_case string for this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            WorkflowStatus::Pending => "pending",
            WorkflowStatus::Running => "running",
            WorkflowStatus::Sleeping => "sleeping",
            WorkflowStatus::Waiting => "waiting",
            WorkflowStatus::Completed => "completed",
            WorkflowStatus::Failed => "failed",
            WorkflowStatus::Compensating => "compensating",
            WorkflowStatus::Compensated => "compensated",
            WorkflowStatus::Retired => "retired_unresumable",
            WorkflowStatus::CancelledByOperator => "cancelled_by_operator",
            WorkflowStatus::BlockedMissingVersion => "blocked_missing_version",
            WorkflowStatus::BlockedSignatureMismatch => "blocked_signature_mismatch",
            WorkflowStatus::BlockedMissingHandler => "blocked_missing_handler",
        }
    }

    /// Returns `true` for `Completed`, `Failed`, `Compensated`, `Retired` and
    /// `CancelledByOperator`; every other variant (including the blocked
    /// ones) is not terminal.
    pub fn is_terminal(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match *self {
            WorkflowStatus::Completed
            | WorkflowStatus::Failed
            | WorkflowStatus::Compensated
            | WorkflowStatus::Retired
            | WorkflowStatus::CancelledByOperator => true,
            WorkflowStatus::Pending
            | WorkflowStatus::Running
            | WorkflowStatus::Sleeping
            | WorkflowStatus::Waiting
            | WorkflowStatus::Compensating
            | WorkflowStatus::BlockedMissingVersion
            | WorkflowStatus::BlockedSignatureMismatch
            | WorkflowStatus::BlockedMissingHandler => false,
        }
    }

    /// Returns `true` for the three `Blocked*` variants and `false` otherwise.
    pub fn is_blocked(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match *self {
            WorkflowStatus::BlockedMissingVersion
            | WorkflowStatus::BlockedSignatureMismatch
            | WorkflowStatus::BlockedMissingHandler => true,
            WorkflowStatus::Pending
            | WorkflowStatus::Running
            | WorkflowStatus::Sleeping
            | WorkflowStatus::Waiting
            | WorkflowStatus::Completed
            | WorkflowStatus::Failed
            | WorkflowStatus::Compensating
            | WorkflowStatus::Compensated
            | WorkflowStatus::Retired
            | WorkflowStatus::CancelledByOperator => false,
        }
    }
}
162
/// Error produced when a string does not name a known workflow status.
///
/// The public tuple field holds the rejected input verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseWorkflowStatusError(pub String);

impl std::fmt::Display for ParseWorkflowStatusError {
    /// Writes `invalid workflow status: '<input>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        write!(f, "invalid workflow status: '{}'", rejected)
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for ParseWorkflowStatusError {}
173
174impl FromStr for WorkflowStatus {
175 type Err = ParseWorkflowStatusError;
176
177 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
178 match s {
179 "pending" | "created" => Ok(Self::Pending),
180 "running" => Ok(Self::Running),
181 "sleeping" => Ok(Self::Sleeping),
182 "waiting" => Ok(Self::Waiting),
183 "completed" => Ok(Self::Completed),
184 "failed" => Ok(Self::Failed),
185 "compensating" => Ok(Self::Compensating),
186 "compensated" => Ok(Self::Compensated),
187 "retired_unresumable" => Ok(Self::Retired),
188 "cancelled_by_operator" => Ok(Self::CancelledByOperator),
189 "blocked_missing_version" => Ok(Self::BlockedMissingVersion),
190 "blocked_signature_mismatch" => Ok(Self::BlockedSignatureMismatch),
191 "blocked_missing_handler" => Ok(Self::BlockedMissingHandler),
192 _ => Err(ParseWorkflowStatusError(s.to_string())),
193 }
194 }
195}
196
/// Unit tests for the workflow metadata and status types in this file.
#[cfg(test)]
// Test code is allowed to panic on failure, so the panic-related lints are relaxed here.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Every `WorkflowStatus` variant paired with its canonical string form.
    ///
    /// Keep this in sync with the enum: table-driven tests below rely on it
    /// to cover all variants rather than a hand-picked subset.
    const ALL_STATUSES: [(WorkflowStatus, &str); 13] = [
        (WorkflowStatus::Pending, "pending"),
        (WorkflowStatus::Running, "running"),
        (WorkflowStatus::Sleeping, "sleeping"),
        (WorkflowStatus::Waiting, "waiting"),
        (WorkflowStatus::Completed, "completed"),
        (WorkflowStatus::Failed, "failed"),
        (WorkflowStatus::Compensating, "compensating"),
        (WorkflowStatus::Compensated, "compensated"),
        (WorkflowStatus::Retired, "retired_unresumable"),
        (WorkflowStatus::CancelledByOperator, "cancelled_by_operator"),
        (WorkflowStatus::BlockedMissingVersion, "blocked_missing_version"),
        (
            WorkflowStatus::BlockedSignatureMismatch,
            "blocked_signature_mismatch",
        ),
        (WorkflowStatus::BlockedMissingHandler, "blocked_missing_handler"),
    ];

    #[test]
    fn test_workflow_info_default() {
        let info = WorkflowInfo::default();
        assert_eq!(info.name, "");
        assert_eq!(info.version, "v1");
        assert_eq!(info.status, WorkflowDefStatus::Active);
        assert!(info.is_active());
        assert!(!info.is_deprecated());
    }

    #[test]
    fn test_workflow_status_conversion() {
        assert_eq!(WorkflowStatus::Pending.as_str(), "pending");
        assert_eq!(WorkflowStatus::Running.as_str(), "running");
        assert_eq!(WorkflowStatus::Sleeping.as_str(), "sleeping");
        assert_eq!(WorkflowStatus::Waiting.as_str(), "waiting");
        assert_eq!(WorkflowStatus::Completed.as_str(), "completed");
        assert_eq!(WorkflowStatus::Failed.as_str(), "failed");

        assert_eq!(
            "pending".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Pending)
        );
        assert_eq!(
            "running".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Running)
        );
        assert_eq!(
            "sleeping".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Sleeping)
        );
    }

    #[test]
    fn test_workflow_status_distinct_parsing() {
        // "created" is an accepted alias that does not round-trip: it parses
        // to `Pending`, whose canonical string is "pending".
        assert_eq!(
            "created".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Pending)
        );
        assert_eq!(
            "compensating".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Compensating)
        );
        assert_eq!(
            "compensated".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Compensated)
        );
        assert_eq!(
            "retired_unresumable".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::Retired)
        );
        assert_eq!(
            "cancelled_by_operator".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::CancelledByOperator)
        );
        assert_eq!(
            "blocked_missing_version".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::BlockedMissingVersion)
        );
        assert_eq!(
            "blocked_signature_mismatch".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::BlockedSignatureMismatch)
        );
        assert_eq!(
            "blocked_missing_handler".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::BlockedMissingHandler)
        );
    }

    #[test]
    fn test_workflow_status_is_terminal() {
        assert!(!WorkflowStatus::Running.is_terminal());
        assert!(!WorkflowStatus::Waiting.is_terminal());
        assert!(!WorkflowStatus::Sleeping.is_terminal());
        assert!(!WorkflowStatus::Pending.is_terminal());
        assert!(!WorkflowStatus::Compensating.is_terminal());
        assert!(!WorkflowStatus::BlockedMissingVersion.is_terminal());
        assert!(!WorkflowStatus::BlockedSignatureMismatch.is_terminal());
        assert!(!WorkflowStatus::BlockedMissingHandler.is_terminal());
        assert!(WorkflowStatus::Completed.is_terminal());
        assert!(WorkflowStatus::Failed.is_terminal());
        assert!(WorkflowStatus::Compensated.is_terminal());
        assert!(WorkflowStatus::Retired.is_terminal());
        assert!(WorkflowStatus::CancelledByOperator.is_terminal());
    }

    #[test]
    fn workflow_status_terminal_variants_round_trip_as_str() {
        for variant in [
            WorkflowStatus::Compensating,
            WorkflowStatus::Compensated,
            WorkflowStatus::Retired,
            WorkflowStatus::CancelledByOperator,
        ] {
            let s = variant.as_str();
            let parsed: WorkflowStatus = s.parse().expect("round trip");
            assert_eq!(parsed, variant, "{s} did not round-trip");
        }
    }

    /// Covers all variants, including the `Blocked*` ones whose `as_str`
    /// output is not asserted by any other test.
    #[test]
    fn workflow_status_every_variant_round_trips_as_str() {
        for (variant, expected) in ALL_STATUSES {
            assert_eq!(variant.as_str(), expected, "unexpected string form");
            assert_eq!(
                expected.parse::<WorkflowStatus>(),
                Ok(variant),
                "{expected} did not round-trip"
            );
        }
    }

    /// `is_blocked` must be true exactly for the variants whose string form
    /// starts with `blocked_`, and must never overlap with `is_terminal`.
    #[test]
    fn workflow_status_is_blocked_only_for_blocked_variants() {
        for (variant, name) in ALL_STATUSES {
            assert_eq!(
                variant.is_blocked(),
                name.starts_with("blocked_"),
                "wrong is_blocked for {name}"
            );
            assert!(
                !(variant.is_blocked() && variant.is_terminal()),
                "{name} is both blocked and terminal"
            );
        }
    }

    #[test]
    fn workflow_def_status_default_is_active() {
        assert_eq!(WorkflowDefStatus::default(), WorkflowDefStatus::Active);
    }

    #[test]
    fn workflow_def_status_as_str_round_trips_all_variants() {
        assert_eq!(WorkflowDefStatus::Active.as_str(), "active");
        assert_eq!(WorkflowDefStatus::Deprecated.as_str(), "deprecated");
        assert_eq!(WorkflowDefStatus::Staging.as_str(), "staging");
    }

    #[test]
    fn workflow_def_status_active_predicate_only_matches_active() {
        assert!(WorkflowDefStatus::Active.is_active());
        assert!(!WorkflowDefStatus::Deprecated.is_active());
        assert!(!WorkflowDefStatus::Staging.is_active());
    }

    #[test]
    fn workflow_def_status_deprecated_predicate_only_matches_deprecated() {
        assert!(!WorkflowDefStatus::Active.is_deprecated());
        assert!(WorkflowDefStatus::Deprecated.is_deprecated());
        assert!(!WorkflowDefStatus::Staging.is_deprecated());
    }

    #[test]
    fn workflow_info_active_and_deprecated_track_status() {
        let deprecated = WorkflowInfo {
            status: WorkflowDefStatus::Deprecated,
            ..WorkflowInfo::default()
        };
        assert!(!deprecated.is_active());
        assert!(deprecated.is_deprecated());

        let staging = WorkflowInfo {
            status: WorkflowDefStatus::Staging,
            ..WorkflowInfo::default()
        };
        assert!(!staging.is_active());
        assert!(!staging.is_deprecated());

        let active = WorkflowInfo {
            status: WorkflowDefStatus::Active,
            ..WorkflowInfo::default()
        };
        assert!(active.is_active());
        assert!(!active.is_deprecated());
    }

    #[test]
    fn workflow_info_default_timeout_is_one_day() {
        let info = WorkflowInfo::default();
        assert_eq!(info.timeout, Duration::from_secs(86_400));
        assert!(info.http_timeout.is_none());
        assert!(!info.is_public);
        assert!(info.required_role.is_none());
        assert!(info.signature.is_empty());
    }

    #[test]
    fn workflow_status_parse_rejects_unknown() {
        let err = "garbage".parse::<WorkflowStatus>().unwrap_err();
        assert_eq!(err.0, "garbage");
        let msg = err.to_string();
        // The rendered message must keep the offending input for diagnostics.
        assert!(msg.contains("garbage"), "display dropped value: {msg}");
        assert!(msg.contains("invalid workflow status"));
    }

    #[test]
    fn parse_workflow_status_error_eq_uses_inner_string() {
        assert_eq!(
            ParseWorkflowStatusError("x".to_string()),
            ParseWorkflowStatusError("x".to_string())
        );
        assert_ne!(
            ParseWorkflowStatusError("x".to_string()),
            ParseWorkflowStatusError("y".to_string())
        );
    }

    #[test]
    fn workflow_status_blocked_variants_parse_distinctly() {
        assert_eq!(
            "blocked_missing_version".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::BlockedMissingVersion)
        );
        assert_eq!(
            "blocked_signature_mismatch".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::BlockedSignatureMismatch)
        );
        assert_eq!(
            "blocked_missing_handler".parse::<WorkflowStatus>(),
            Ok(WorkflowStatus::BlockedMissingHandler)
        );
        for blocked in [
            WorkflowStatus::BlockedMissingVersion,
            WorkflowStatus::BlockedSignatureMismatch,
            WorkflowStatus::BlockedMissingHandler,
        ] {
            assert!(blocked.is_blocked());
            assert!(!blocked.is_terminal());
        }
    }
}