Skip to main content

forge_core/workflow/
step.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use serde::{Serialize, de::DeserializeOwned};
8
9use crate::Result;
10
11/// Type alias for compensation function to reduce complexity.
12type CompensateFn<'a, T, C> = Arc<dyn Fn(T) -> Pin<Box<C>> + Send + Sync + 'a>;
13
14/// Step execution status.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum StepStatus {
17    /// Step not yet started.
18    Pending,
19    /// Step currently running.
20    Running,
21    /// Step completed successfully.
22    Completed,
23    /// Step failed.
24    Failed,
25    /// Step compensation ran.
26    Compensated,
27    /// Step was skipped.
28    Skipped,
29    /// Step is waiting (suspended).
30    Waiting,
31}
32
33impl StepStatus {
34    /// Convert to string for database storage.
35    pub fn as_str(&self) -> &'static str {
36        match self {
37            Self::Pending => "pending",
38            Self::Running => "running",
39            Self::Completed => "completed",
40            Self::Failed => "failed",
41            Self::Compensated => "compensated",
42            Self::Skipped => "skipped",
43            Self::Waiting => "waiting",
44        }
45    }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct ParseStepStatusError(pub String);
50
51impl std::fmt::Display for ParseStepStatusError {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(f, "invalid step status: '{}'", self.0)
54    }
55}
56
57impl std::error::Error for ParseStepStatusError {}
58
59impl FromStr for StepStatus {
60    type Err = ParseStepStatusError;
61
62    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
63        match s {
64            "pending" => Ok(Self::Pending),
65            "running" => Ok(Self::Running),
66            "completed" => Ok(Self::Completed),
67            "failed" => Ok(Self::Failed),
68            "compensated" => Ok(Self::Compensated),
69            "skipped" => Ok(Self::Skipped),
70            "waiting" => Ok(Self::Waiting),
71            _ => Err(ParseStepStatusError(s.to_string())),
72        }
73    }
74}
75
76/// Result of a step execution.
77#[derive(Debug, Clone)]
78pub struct StepResult<T> {
79    /// Step name.
80    pub name: String,
81    /// Step status.
82    pub status: StepStatus,
83    /// Step result (if completed).
84    pub value: Option<T>,
85    /// Error message (if failed).
86    pub error: Option<String>,
87}
88
89/// A workflow step definition.
90pub struct Step<T> {
91    /// Step name.
92    pub name: String,
93    /// Step result type.
94    _marker: std::marker::PhantomData<T>,
95}
96
97impl<T> Step<T> {
98    /// Create a new step.
99    pub fn new(name: impl Into<String>) -> Self {
100        Self {
101            name: name.into(),
102            _marker: std::marker::PhantomData,
103        }
104    }
105}
106
107/// Builder for configuring and executing a step.
108pub struct StepBuilder<'a, T, F, C>
109where
110    T: Serialize + DeserializeOwned + Send + 'static,
111    F: Future<Output = Result<T>> + Send + 'a,
112    C: Future<Output = Result<()>> + Send + 'a,
113{
114    name: String,
115    run_fn: Option<Pin<Box<dyn FnOnce() -> F + Send + 'a>>>,
116    compensate_fn: Option<CompensateFn<'a, T, C>>,
117    timeout: Option<Duration>,
118    retry_count: u32,
119    retry_delay: Duration,
120    optional: bool,
121    _marker: std::marker::PhantomData<(T, F, C)>,
122}
123
124impl<'a, T, F, C> StepBuilder<'a, T, F, C>
125where
126    T: Serialize + DeserializeOwned + Send + Clone + 'static,
127    F: Future<Output = Result<T>> + Send + 'a,
128    C: Future<Output = Result<()>> + Send + 'a,
129{
130    /// Create a new step builder.
131    pub fn new(name: impl Into<String>) -> Self {
132        Self {
133            name: name.into(),
134            run_fn: None,
135            compensate_fn: None,
136            timeout: None,
137            retry_count: 0,
138            retry_delay: Duration::from_secs(1),
139            optional: false,
140            _marker: std::marker::PhantomData,
141        }
142    }
143
144    /// Set the step execution function.
145    pub fn run<RF>(mut self, f: RF) -> Self
146    where
147        RF: FnOnce() -> F + Send + 'a,
148    {
149        self.run_fn = Some(Box::pin(f));
150        self
151    }
152
153    /// Set the compensation function.
154    pub fn compensate<CF>(mut self, f: CF) -> Self
155    where
156        CF: Fn(T) -> Pin<Box<C>> + Send + Sync + 'a,
157    {
158        self.compensate_fn = Some(Arc::new(f));
159        self
160    }
161
162    /// Set step timeout.
163    pub fn timeout(mut self, duration: Duration) -> Self {
164        self.timeout = Some(duration);
165        self
166    }
167
168    /// Configure retry behavior.
169    pub fn retry(mut self, count: u32, delay: Duration) -> Self {
170        self.retry_count = count;
171        self.retry_delay = delay;
172        self
173    }
174
175    /// Mark the step as optional (failure won't trigger compensation).
176    pub fn optional(mut self) -> Self {
177        self.optional = true;
178        self
179    }
180
181    /// Get step name.
182    pub fn name(&self) -> &str {
183        &self.name
184    }
185
186    /// Check if step is optional.
187    pub fn is_optional(&self) -> bool {
188        self.optional
189    }
190
191    /// Get retry count.
192    pub fn retry_count(&self) -> u32 {
193        self.retry_count
194    }
195
196    /// Get retry delay.
197    pub fn retry_delay(&self) -> Duration {
198        self.retry_delay
199    }
200
201    /// Get timeout.
202    pub fn get_timeout(&self) -> Option<Duration> {
203        self.timeout
204    }
205}
206
207/// Configuration for a step (without closures, for storage).
208#[derive(Debug, Clone)]
209pub struct StepConfig {
210    /// Step name.
211    pub name: String,
212    /// Step timeout.
213    pub timeout: Option<Duration>,
214    /// Retry count.
215    pub retry_count: u32,
216    /// Retry delay.
217    pub retry_delay: Duration,
218    /// Whether the step is optional.
219    pub optional: bool,
220    /// Whether the step has a compensation function.
221    pub has_compensation: bool,
222}
223
224impl Default for StepConfig {
225    fn default() -> Self {
226        Self {
227            name: String::new(),
228            timeout: None,
229            retry_count: 0,
230            retry_delay: Duration::from_secs(1),
231            optional: false,
232            has_compensation: false,
233        }
234    }
235}
236
237#[cfg(test)]
238#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
239mod tests {
240    use super::*;
241
242    #[test]
243    fn test_step_status_conversion() {
244        assert_eq!(StepStatus::Pending.as_str(), "pending");
245        assert_eq!(StepStatus::Running.as_str(), "running");
246        assert_eq!(StepStatus::Completed.as_str(), "completed");
247        assert_eq!(StepStatus::Failed.as_str(), "failed");
248        assert_eq!(StepStatus::Compensated.as_str(), "compensated");
249
250        assert_eq!("pending".parse::<StepStatus>(), Ok(StepStatus::Pending));
251        assert_eq!("completed".parse::<StepStatus>(), Ok(StepStatus::Completed));
252    }
253
254    #[test]
255    fn test_step_config_default() {
256        let config = StepConfig::default();
257        assert!(config.name.is_empty());
258        assert!(!config.optional);
259        assert_eq!(config.retry_count, 0);
260    }
261}