forge_core/workflow/
traits.rs1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use super::context::WorkflowContext;
9use crate::Result;
10
11pub trait ForgeWorkflow: Send + Sync + 'static {
13 type Input: DeserializeOwned + Serialize + Send + Sync;
15 type Output: Serialize + Send;
17
18 fn info() -> WorkflowInfo;
20
21 fn execute(
23 ctx: &WorkflowContext,
24 input: Self::Input,
25 ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
26}
27
28#[derive(Debug, Clone)]
30pub struct WorkflowInfo {
31 pub name: &'static str,
33 pub version: &'static str,
35 pub signature: &'static str,
37 pub is_active: bool,
39 pub is_deprecated: bool,
41 pub timeout: Duration,
43 pub http_timeout: Option<Duration>,
45 pub is_public: bool,
47 pub required_role: Option<&'static str>,
49}
50
51impl Default for WorkflowInfo {
52 fn default() -> Self {
53 Self {
54 name: "",
55 version: "v1",
56 signature: "",
57 is_active: true,
58 is_deprecated: false,
59 timeout: Duration::from_secs(86400), http_timeout: None,
61 is_public: false,
62 required_role: None,
63 }
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub enum WorkflowStatus {
70 Created,
72 Running,
74 Waiting,
76 Completed,
78 Compensating,
80 Compensated,
82 Failed,
84 BlockedMissingVersion,
86 BlockedSignatureMismatch,
88 BlockedMissingHandler,
90 RetiredUnresumable,
92 CancelledByOperator,
94}
95
96impl WorkflowStatus {
97 pub fn as_str(&self) -> &'static str {
99 match self {
100 Self::Created => "created",
101 Self::Running => "running",
102 Self::Waiting => "waiting",
103 Self::Completed => "completed",
104 Self::Compensating => "compensating",
105 Self::Compensated => "compensated",
106 Self::Failed => "failed",
107 Self::BlockedMissingVersion => "blocked_missing_version",
108 Self::BlockedSignatureMismatch => "blocked_signature_mismatch",
109 Self::BlockedMissingHandler => "blocked_missing_handler",
110 Self::RetiredUnresumable => "retired_unresumable",
111 Self::CancelledByOperator => "cancelled_by_operator",
112 }
113 }
114
115 pub fn is_terminal(&self) -> bool {
117 matches!(
118 self,
119 Self::Completed
120 | Self::Compensated
121 | Self::Failed
122 | Self::RetiredUnresumable
123 | Self::CancelledByOperator
124 )
125 }
126
127 pub fn is_blocked(&self) -> bool {
129 matches!(
130 self,
131 Self::BlockedMissingVersion
132 | Self::BlockedSignatureMismatch
133 | Self::BlockedMissingHandler
134 )
135 }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq)]
139pub struct ParseWorkflowStatusError(pub String);
140
141impl std::fmt::Display for ParseWorkflowStatusError {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 write!(f, "invalid workflow status: '{}'", self.0)
144 }
145}
146
147impl std::error::Error for ParseWorkflowStatusError {}
148
149impl FromStr for WorkflowStatus {
150 type Err = ParseWorkflowStatusError;
151
152 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
153 match s {
154 "created" => Ok(Self::Created),
155 "running" => Ok(Self::Running),
156 "waiting" => Ok(Self::Waiting),
157 "completed" => Ok(Self::Completed),
158 "compensating" => Ok(Self::Compensating),
159 "compensated" => Ok(Self::Compensated),
160 "failed" => Ok(Self::Failed),
161 "blocked_missing_version" => Ok(Self::BlockedMissingVersion),
162 "blocked_signature_mismatch" => Ok(Self::BlockedSignatureMismatch),
163 "blocked_missing_handler" => Ok(Self::BlockedMissingHandler),
164 "retired_unresumable" => Ok(Self::RetiredUnresumable),
165 "cancelled_by_operator" => Ok(Self::CancelledByOperator),
166 _ => Err(ParseWorkflowStatusError(s.to_string())),
167 }
168 }
169}
170
171#[cfg(test)]
172#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn test_workflow_info_default() {
178 let info = WorkflowInfo::default();
179 assert_eq!(info.name, "");
180 assert_eq!(info.version, "v1");
181 assert!(info.is_active);
182 assert!(!info.is_deprecated);
183 }
184
185 #[test]
186 fn test_workflow_status_conversion() {
187 assert_eq!(WorkflowStatus::Running.as_str(), "running");
188 assert_eq!(WorkflowStatus::Completed.as_str(), "completed");
189 assert_eq!(WorkflowStatus::Compensating.as_str(), "compensating");
190 assert_eq!(
191 WorkflowStatus::BlockedMissingVersion.as_str(),
192 "blocked_missing_version"
193 );
194 assert_eq!(
195 WorkflowStatus::CancelledByOperator.as_str(),
196 "cancelled_by_operator"
197 );
198
199 assert_eq!(
200 "running".parse::<WorkflowStatus>(),
201 Ok(WorkflowStatus::Running)
202 );
203 assert_eq!(
204 "completed".parse::<WorkflowStatus>(),
205 Ok(WorkflowStatus::Completed)
206 );
207 assert_eq!(
208 "blocked_missing_version".parse::<WorkflowStatus>(),
209 Ok(WorkflowStatus::BlockedMissingVersion)
210 );
211 assert_eq!(
212 "cancelled_by_operator".parse::<WorkflowStatus>(),
213 Ok(WorkflowStatus::CancelledByOperator)
214 );
215 }
216
217 #[test]
218 fn test_workflow_status_is_terminal() {
219 assert!(!WorkflowStatus::Running.is_terminal());
220 assert!(!WorkflowStatus::Waiting.is_terminal());
221 assert!(WorkflowStatus::Completed.is_terminal());
222 assert!(WorkflowStatus::Failed.is_terminal());
223 assert!(WorkflowStatus::Compensated.is_terminal());
224 assert!(WorkflowStatus::RetiredUnresumable.is_terminal());
225 assert!(WorkflowStatus::CancelledByOperator.is_terminal());
226 }
227
228 #[test]
229 fn test_workflow_status_is_blocked() {
230 assert!(!WorkflowStatus::Running.is_blocked());
231 assert!(WorkflowStatus::BlockedMissingVersion.is_blocked());
232 assert!(WorkflowStatus::BlockedSignatureMismatch.is_blocked());
233 assert!(WorkflowStatus::BlockedMissingHandler.is_blocked());
234 }
235}