Skip to main content

forge_core/workflow/
step.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use serde::{Serialize, de::DeserializeOwned};
8
9use crate::Result;
10
/// Shorthand for a stored compensation callback.
///
/// The callback receives a value of type `T` and returns a pinned, boxed
/// future of type `C`. It lives behind an [`Arc`] so the handle can be cloned.
type CompensateFn<'a, T, C> = Arc<
    dyn Fn(T) -> Pin<Box<C>> // callable repeatedly; yields the boxed future
        + Send // may be moved to another thread
        + Sync // may be called through a shared reference from several threads
        + 'a, // may borrow data that lives at least as long as `'a`
>;
13
/// Execution state of a single workflow step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepStatus {
    /// The step has not started yet.
    Pending,
    /// The step is currently running.
    Running,
    /// The step finished successfully.
    Completed,
    /// The step failed.
    Failed,
    /// The step's compensation ran.
    Compensated,
    /// The step was skipped.
    Skipped,
    /// The step is waiting (suspended).
    Waiting,
}

impl StepStatus {
    /// Returns the lowercase label used for database storage.
    pub fn as_str(&self) -> &'static str {
        match *self {
            StepStatus::Pending => "pending",
            StepStatus::Running => "running",
            StepStatus::Completed => "completed",
            StepStatus::Failed => "failed",
            StepStatus::Compensated => "compensated",
            StepStatus::Skipped => "skipped",
            StepStatus::Waiting => "waiting",
        }
    }
}

impl FromStr for StepStatus {
    type Err = std::convert::Infallible;

    /// Parses a label produced by [`StepStatus::as_str`].
    ///
    /// Parsing never fails: the match is exact (case-sensitive, no trimming)
    /// and any unrecognised input yields [`StepStatus::Pending`].
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Every variant, so each label accepted here is exactly what
        // `as_str` emits for that variant.
        const KNOWN: [StepStatus; 7] = [
            StepStatus::Pending,
            StepStatus::Running,
            StepStatus::Completed,
            StepStatus::Failed,
            StepStatus::Compensated,
            StepStatus::Skipped,
            StepStatus::Waiting,
        ];

        let status = KNOWN
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .unwrap_or(StepStatus::Pending);
        Ok(status)
    }
}
64
65/// Result of a step execution.
66#[derive(Debug, Clone)]
67pub struct StepResult<T> {
68    /// Step name.
69    pub name: String,
70    /// Step status.
71    pub status: StepStatus,
72    /// Step result (if completed).
73    pub value: Option<T>,
74    /// Error message (if failed).
75    pub error: Option<String>,
76}
77
/// Definition of a workflow step whose result type is `T`.
pub struct Step<T> {
    /// Name of the step.
    pub name: String,
    /// Zero-sized marker that ties the step to its result type `T`.
    _marker: std::marker::PhantomData<T>,
}

impl<T> Step<T> {
    /// Builds a step with the given name.
    ///
    /// Accepts anything convertible into a `String` (for example `&str` or
    /// `String`).
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        Step {
            name,
            _marker: Default::default(),
        }
    }
}
95
96/// Builder for configuring and executing a step.
97pub struct StepBuilder<'a, T, F, C>
98where
99    T: Serialize + DeserializeOwned + Send + 'static,
100    F: Future<Output = Result<T>> + Send + 'a,
101    C: Future<Output = Result<()>> + Send + 'a,
102{
103    name: String,
104    run_fn: Option<Pin<Box<dyn FnOnce() -> F + Send + 'a>>>,
105    compensate_fn: Option<CompensateFn<'a, T, C>>,
106    timeout: Option<Duration>,
107    retry_count: u32,
108    retry_delay: Duration,
109    optional: bool,
110    _marker: std::marker::PhantomData<(T, F, C)>,
111}
112
113impl<'a, T, F, C> StepBuilder<'a, T, F, C>
114where
115    T: Serialize + DeserializeOwned + Send + Clone + 'static,
116    F: Future<Output = Result<T>> + Send + 'a,
117    C: Future<Output = Result<()>> + Send + 'a,
118{
119    /// Create a new step builder.
120    pub fn new(name: impl Into<String>) -> Self {
121        Self {
122            name: name.into(),
123            run_fn: None,
124            compensate_fn: None,
125            timeout: None,
126            retry_count: 0,
127            retry_delay: Duration::from_secs(1),
128            optional: false,
129            _marker: std::marker::PhantomData,
130        }
131    }
132
133    /// Set the step execution function.
134    pub fn run<RF>(mut self, f: RF) -> Self
135    where
136        RF: FnOnce() -> F + Send + 'a,
137    {
138        self.run_fn = Some(Box::pin(f));
139        self
140    }
141
142    /// Set the compensation function.
143    pub fn compensate<CF>(mut self, f: CF) -> Self
144    where
145        CF: Fn(T) -> Pin<Box<C>> + Send + Sync + 'a,
146    {
147        self.compensate_fn = Some(Arc::new(f));
148        self
149    }
150
151    /// Set step timeout.
152    pub fn timeout(mut self, duration: Duration) -> Self {
153        self.timeout = Some(duration);
154        self
155    }
156
157    /// Configure retry behavior.
158    pub fn retry(mut self, count: u32, delay: Duration) -> Self {
159        self.retry_count = count;
160        self.retry_delay = delay;
161        self
162    }
163
164    /// Mark the step as optional (failure won't trigger compensation).
165    pub fn optional(mut self) -> Self {
166        self.optional = true;
167        self
168    }
169
170    /// Get step name.
171    pub fn name(&self) -> &str {
172        &self.name
173    }
174
175    /// Check if step is optional.
176    pub fn is_optional(&self) -> bool {
177        self.optional
178    }
179
180    /// Get retry count.
181    pub fn retry_count(&self) -> u32 {
182        self.retry_count
183    }
184
185    /// Get retry delay.
186    pub fn retry_delay(&self) -> Duration {
187        self.retry_delay
188    }
189
190    /// Get timeout.
191    pub fn get_timeout(&self) -> Option<Duration> {
192        self.timeout
193    }
194}
195
/// Closure-free description of a step, suitable for storage.
#[derive(Debug, Clone)]
pub struct StepConfig {
    /// Name of the step.
    pub name: String,
    /// Timeout for the step, if one is configured.
    pub timeout: Option<Duration>,
    /// Configured retry count.
    pub retry_count: u32,
    /// Configured retry delay.
    pub retry_delay: Duration,
    /// Whether the step is optional.
    pub optional: bool,
    /// Whether the step has a compensation function.
    pub has_compensation: bool,
}

impl Default for StepConfig {
    /// Returns a configuration with an empty name, no timeout, a retry count
    /// of 0, a one-second retry delay, and both flags set to `false`.
    fn default() -> Self {
        let retry_delay = Duration::from_secs(1);
        StepConfig {
            retry_delay,
            retry_count: 0,
            timeout: None,
            has_compensation: false,
            optional: false,
            name: String::new(),
        }
    }
}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `StepStatus` variant paired with its storage label.
    const ALL_STATUSES: [(StepStatus, &str); 7] = [
        (StepStatus::Pending, "pending"),
        (StepStatus::Running, "running"),
        (StepStatus::Completed, "completed"),
        (StepStatus::Failed, "failed"),
        (StepStatus::Compensated, "compensated"),
        (StepStatus::Skipped, "skipped"),
        (StepStatus::Waiting, "waiting"),
    ];

    /// Each variant maps to its label and each label parses back to the variant.
    #[test]
    fn test_step_status_conversion() {
        for &(status, label) in ALL_STATUSES.iter() {
            assert_eq!(status.as_str(), label);
            assert_eq!(label.parse::<StepStatus>(), Ok(status));
        }
    }

    /// Parsing is infallible: unknown labels come back as `Pending`.
    #[test]
    fn test_step_status_unknown_falls_back_to_pending() {
        for label in ["", "bogus", "PENDING", " running"].iter() {
            assert_eq!(label.parse::<StepStatus>(), Ok(StepStatus::Pending));
        }
    }

    /// `Step::new` stores the name it is given.
    #[test]
    fn test_step_new_stores_name() {
        let step: Step<u32> = Step::new("charge");
        assert_eq!(step.name, "charge");
    }

    /// The default configuration has every field at its documented default.
    #[test]
    fn test_step_config_default() {
        let config = StepConfig::default();
        assert!(config.name.is_empty());
        assert!(config.timeout.is_none());
        assert_eq!(config.retry_count, 0);
        assert_eq!(config.retry_delay, Duration::from_secs(1));
        assert!(!config.optional);
        assert!(!config.has_compensation);
    }
}