1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use serde::{Serialize, de::DeserializeOwned};
8
9use crate::Result;
10
11type CompensateFn<'a, T, C> = Arc<dyn Fn(T) -> Pin<Box<C>> + Send + Sync + 'a>;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum StepStatus {
17 Pending,
19 Running,
21 Completed,
23 Failed,
25 Compensated,
27 Skipped,
29 Waiting,
31}
32
33impl StepStatus {
34 pub fn as_str(&self) -> &'static str {
36 match self {
37 Self::Pending => "pending",
38 Self::Running => "running",
39 Self::Completed => "completed",
40 Self::Failed => "failed",
41 Self::Compensated => "compensated",
42 Self::Skipped => "skipped",
43 Self::Waiting => "waiting",
44 }
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct ParseStepStatusError(pub String);
50
51impl std::fmt::Display for ParseStepStatusError {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "invalid step status: '{}'", self.0)
54 }
55}
56
57impl std::error::Error for ParseStepStatusError {}
58
59impl FromStr for StepStatus {
60 type Err = ParseStepStatusError;
61
62 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
63 match s {
64 "pending" => Ok(Self::Pending),
65 "running" => Ok(Self::Running),
66 "completed" => Ok(Self::Completed),
67 "failed" => Ok(Self::Failed),
68 "compensated" => Ok(Self::Compensated),
69 "skipped" => Ok(Self::Skipped),
70 "waiting" => Ok(Self::Waiting),
71 _ => Err(ParseStepStatusError(s.to_string())),
72 }
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct StepResult<T> {
79 pub name: String,
81 pub status: StepStatus,
83 pub value: Option<T>,
85 pub error: Option<String>,
87}
88
89pub struct Step<T> {
91 pub name: String,
93 _marker: std::marker::PhantomData<T>,
95}
96
97impl<T> Step<T> {
98 pub fn new(name: impl Into<String>) -> Self {
100 Self {
101 name: name.into(),
102 _marker: std::marker::PhantomData,
103 }
104 }
105}
106
107pub struct StepBuilder<'a, T, F, C>
109where
110 T: Serialize + DeserializeOwned + Send + 'static,
111 F: Future<Output = Result<T>> + Send + 'a,
112 C: Future<Output = Result<()>> + Send + 'a,
113{
114 name: String,
115 run_fn: Option<Pin<Box<dyn FnOnce() -> F + Send + 'a>>>,
116 compensate_fn: Option<CompensateFn<'a, T, C>>,
117 timeout: Option<Duration>,
118 retry_count: u32,
119 retry_delay: Duration,
120 optional: bool,
121 _marker: std::marker::PhantomData<(T, F, C)>,
122}
123
124impl<'a, T, F, C> StepBuilder<'a, T, F, C>
125where
126 T: Serialize + DeserializeOwned + Send + Clone + 'static,
127 F: Future<Output = Result<T>> + Send + 'a,
128 C: Future<Output = Result<()>> + Send + 'a,
129{
130 pub fn new(name: impl Into<String>) -> Self {
132 Self {
133 name: name.into(),
134 run_fn: None,
135 compensate_fn: None,
136 timeout: None,
137 retry_count: 0,
138 retry_delay: Duration::from_secs(1),
139 optional: false,
140 _marker: std::marker::PhantomData,
141 }
142 }
143
144 pub fn run<RF>(mut self, f: RF) -> Self
146 where
147 RF: FnOnce() -> F + Send + 'a,
148 {
149 self.run_fn = Some(Box::pin(f));
150 self
151 }
152
153 pub fn compensate<CF>(mut self, f: CF) -> Self
155 where
156 CF: Fn(T) -> Pin<Box<C>> + Send + Sync + 'a,
157 {
158 self.compensate_fn = Some(Arc::new(f));
159 self
160 }
161
162 pub fn timeout(mut self, duration: Duration) -> Self {
164 self.timeout = Some(duration);
165 self
166 }
167
168 pub fn retry(mut self, count: u32, delay: Duration) -> Self {
170 self.retry_count = count;
171 self.retry_delay = delay;
172 self
173 }
174
175 pub fn optional(mut self) -> Self {
177 self.optional = true;
178 self
179 }
180
181 pub fn name(&self) -> &str {
183 &self.name
184 }
185
186 pub fn is_optional(&self) -> bool {
188 self.optional
189 }
190
191 pub fn retry_count(&self) -> u32 {
193 self.retry_count
194 }
195
196 pub fn retry_delay(&self) -> Duration {
198 self.retry_delay
199 }
200
201 pub fn get_timeout(&self) -> Option<Duration> {
203 self.timeout
204 }
205}
206
207#[derive(Debug, Clone)]
209pub struct StepConfig {
210 pub name: String,
212 pub timeout: Option<Duration>,
214 pub retry_count: u32,
216 pub retry_delay: Duration,
218 pub optional: bool,
220 pub has_compensation: bool,
222}
223
224impl Default for StepConfig {
225 fn default() -> Self {
226 Self {
227 name: String::new(),
228 timeout: None,
229 retry_count: 0,
230 retry_delay: Duration::from_secs(1),
231 optional: false,
232 has_compensation: false,
233 }
234 }
235}
236
237#[cfg(test)]
238#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
239mod tests {
240 use super::*;
241
242 #[test]
243 fn test_step_status_conversion() {
244 assert_eq!(StepStatus::Pending.as_str(), "pending");
245 assert_eq!(StepStatus::Running.as_str(), "running");
246 assert_eq!(StepStatus::Completed.as_str(), "completed");
247 assert_eq!(StepStatus::Failed.as_str(), "failed");
248 assert_eq!(StepStatus::Compensated.as_str(), "compensated");
249
250 assert_eq!("pending".parse::<StepStatus>(), Ok(StepStatus::Pending));
251 assert_eq!("completed".parse::<StepStatus>(), Ok(StepStatus::Completed));
252 }
253
254 #[test]
255 fn test_step_config_default() {
256 let config = StepConfig::default();
257 assert!(config.name.is_empty());
258 assert!(!config.optional);
259 assert_eq!(config.retry_count, 0);
260 }
261}