1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use serde::{Serialize, de::DeserializeOwned};
8
9use crate::Result;
10
/// Shared compensation callback.
///
/// The callback takes a `T` by value and returns a pinned, boxed `C`. It is reference-counted
/// and `Send + Sync`; `'a` bounds whatever the closure borrows. `StepBuilder` (the user of
/// this alias in this file) additionally requires `C` to be a future.
type CompensateFn<'a, T, C> = Arc<dyn Fn(T) -> Pin<Box<C>> + Send + Sync + 'a>;
13
/// Status of a step.
///
/// Every variant has a lowercase text form returned by [`StepStatus::as_str`]; the same text
/// parses back to the variant through the `FromStr` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepStatus {
    /// Text form `"pending"`. Also the result of parsing any unrecognized string.
    Pending,
    /// Text form `"running"`.
    Running,
    /// Text form `"completed"`.
    Completed,
    /// Text form `"failed"`.
    Failed,
    /// Text form `"compensated"`.
    Compensated,
    /// Text form `"skipped"`.
    Skipped,
    /// Text form `"waiting"`.
    Waiting,
}

impl StepStatus {
    /// Every variant, in declaration order.
    ///
    /// Parsing walks this table and compares against `as_str`, so the two directions of the
    /// conversion share a single list of names. A variant missing from this table would
    /// parse as `Pending`, so keep it complete when adding variants.
    const ALL: [Self; 7] = [
        Self::Pending,
        Self::Running,
        Self::Completed,
        Self::Failed,
        Self::Compensated,
        Self::Skipped,
        Self::Waiting,
    ];

    /// Returns the lowercase text form of this status.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Compensated => "compensated",
            Self::Skipped => "skipped",
            Self::Waiting => "waiting",
        }
    }
}

impl FromStr for StepStatus {
    type Err = std::convert::Infallible;

    /// Parses the exact (case-sensitive) text produced by [`StepStatus::as_str`].
    ///
    /// This never returns `Err`: any string that matches no variant yields
    /// [`StepStatus::Pending`].
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let matched = Self::ALL
            .iter()
            .copied()
            .find(|status| status.as_str() == s);
        // Unrecognized input is deliberately mapped to `Pending` rather than reported.
        Ok(matched.unwrap_or(Self::Pending))
    }
}
64
65#[derive(Debug, Clone)]
67pub struct StepResult<T> {
68 pub name: String,
70 pub status: StepStatus,
72 pub value: Option<T>,
74 pub error: Option<String>,
76}
77
/// A named step tagged with a type `T`.
///
/// No `T` value is stored; the type parameter is carried only through `PhantomData`.
pub struct Step<T> {
    /// Name of the step.
    pub name: String,
    // Zero-sized marker that ties the otherwise unused `T` to this struct.
    _marker: std::marker::PhantomData<T>,
}

impl<T> Step<T> {
    /// Creates a step with the given name.
    ///
    /// Accepts anything convertible into a `String` (for example `&str` or `String`).
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            _marker: std::marker::PhantomData,
        }
    }
}
95
96pub struct StepBuilder<'a, T, F, C>
98where
99 T: Serialize + DeserializeOwned + Send + 'static,
100 F: Future<Output = Result<T>> + Send + 'a,
101 C: Future<Output = Result<()>> + Send + 'a,
102{
103 name: String,
104 run_fn: Option<Pin<Box<dyn FnOnce() -> F + Send + 'a>>>,
105 compensate_fn: Option<CompensateFn<'a, T, C>>,
106 timeout: Option<Duration>,
107 retry_count: u32,
108 retry_delay: Duration,
109 optional: bool,
110 _marker: std::marker::PhantomData<(T, F, C)>,
111}
112
113impl<'a, T, F, C> StepBuilder<'a, T, F, C>
114where
115 T: Serialize + DeserializeOwned + Send + Clone + 'static,
116 F: Future<Output = Result<T>> + Send + 'a,
117 C: Future<Output = Result<()>> + Send + 'a,
118{
119 pub fn new(name: impl Into<String>) -> Self {
121 Self {
122 name: name.into(),
123 run_fn: None,
124 compensate_fn: None,
125 timeout: None,
126 retry_count: 0,
127 retry_delay: Duration::from_secs(1),
128 optional: false,
129 _marker: std::marker::PhantomData,
130 }
131 }
132
133 pub fn run<RF>(mut self, f: RF) -> Self
135 where
136 RF: FnOnce() -> F + Send + 'a,
137 {
138 self.run_fn = Some(Box::pin(f));
139 self
140 }
141
142 pub fn compensate<CF>(mut self, f: CF) -> Self
144 where
145 CF: Fn(T) -> Pin<Box<C>> + Send + Sync + 'a,
146 {
147 self.compensate_fn = Some(Arc::new(f));
148 self
149 }
150
151 pub fn timeout(mut self, duration: Duration) -> Self {
153 self.timeout = Some(duration);
154 self
155 }
156
157 pub fn retry(mut self, count: u32, delay: Duration) -> Self {
159 self.retry_count = count;
160 self.retry_delay = delay;
161 self
162 }
163
164 pub fn optional(mut self) -> Self {
166 self.optional = true;
167 self
168 }
169
170 pub fn name(&self) -> &str {
172 &self.name
173 }
174
175 pub fn is_optional(&self) -> bool {
177 self.optional
178 }
179
180 pub fn retry_count(&self) -> u32 {
182 self.retry_count
183 }
184
185 pub fn retry_delay(&self) -> Duration {
187 self.retry_delay
188 }
189
190 pub fn get_timeout(&self) -> Option<Duration> {
192 self.timeout
193 }
194}
195
/// Plain-data description of a step's settings.
#[derive(Debug, Clone)]
pub struct StepConfig {
    /// Name of the step.
    pub name: String,
    /// Timeout for the step, if one is configured.
    pub timeout: Option<Duration>,
    /// Configured retry count.
    pub retry_count: u32,
    /// Delay used together with `retry_count`.
    pub retry_delay: Duration,
    /// Whether the step is marked optional.
    pub optional: bool,
    /// Whether the step has a compensation callback.
    pub has_compensation: bool,
}

impl Default for StepConfig {
    /// Returns an empty name, no timeout, a retry count of `0`, a one-second retry delay,
    /// and both flags `false`.
    fn default() -> Self {
        Self {
            name: String::new(),
            timeout: None,
            retry_count: 0,
            retry_delay: Duration::from_secs(1),
            optional: false,
            has_compensation: false,
        }
    }
}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status paired with the text form it must convert to and from.
    const STATUS_NAMES: [(StepStatus, &str); 7] = [
        (StepStatus::Pending, "pending"),
        (StepStatus::Running, "running"),
        (StepStatus::Completed, "completed"),
        (StepStatus::Failed, "failed"),
        (StepStatus::Compensated, "compensated"),
        (StepStatus::Skipped, "skipped"),
        (StepStatus::Waiting, "waiting"),
    ];

    /// Checks both directions of the conversion for every variant.
    #[test]
    fn test_step_status_conversion() {
        for (status, text) in STATUS_NAMES.iter() {
            assert_eq!(status.as_str(), *text);
            assert_eq!(text.parse::<StepStatus>(), Ok(*status));
        }
    }

    /// Parsing is infallible: input that matches no variant yields `Pending`.
    #[test]
    fn test_step_status_unknown_falls_back_to_pending() {
        assert_eq!("".parse::<StepStatus>(), Ok(StepStatus::Pending));
        assert_eq!("unknown".parse::<StepStatus>(), Ok(StepStatus::Pending));
        // Matching is case-sensitive, so a capitalized name is not recognized.
        assert_eq!("Completed".parse::<StepStatus>(), Ok(StepStatus::Pending));
    }

    /// Checks every field of the default configuration.
    #[test]
    fn test_step_config_default() {
        let config = StepConfig::default();
        assert!(config.name.is_empty());
        assert_eq!(config.timeout, None);
        assert_eq!(config.retry_count, 0);
        assert_eq!(config.retry_delay, Duration::from_secs(1));
        assert!(!config.optional);
        assert!(!config.has_compensation);
    }
}