1use chrono::Utc;
7use ironflow_store::entities::RunStatus;
8use serde::{Deserialize, Serialize};
9
10use super::{Transition, TransitionError};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum RunEvent {
27 PickedUp,
29 AllStepsCompleted,
31 StepFailed,
33 StepFailedRetryable,
35 RetryStarted,
37 MaxRetriesExceeded,
39 CancelRequested,
41}
42
43impl std::fmt::Display for RunEvent {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 RunEvent::PickedUp => f.write_str("picked_up"),
47 RunEvent::AllStepsCompleted => f.write_str("all_steps_completed"),
48 RunEvent::StepFailed => f.write_str("step_failed"),
49 RunEvent::StepFailedRetryable => f.write_str("step_failed_retryable"),
50 RunEvent::RetryStarted => f.write_str("retry_started"),
51 RunEvent::MaxRetriesExceeded => f.write_str("max_retries_exceeded"),
52 RunEvent::CancelRequested => f.write_str("cancel_requested"),
53 }
54 }
55}
56
57#[derive(Debug, Clone)]
93pub struct RunFsm {
94 state: RunStatus,
95 history: Vec<Transition<RunStatus, RunEvent>>,
96}
97
98impl RunFsm {
99 pub fn new() -> Self {
111 Self {
112 state: RunStatus::Pending,
113 history: Vec::new(),
114 }
115 }
116
117 pub fn from_state(state: RunStatus) -> Self {
129 Self {
130 state,
131 history: Vec::new(),
132 }
133 }
134
135 pub fn state(&self) -> RunStatus {
137 self.state
138 }
139
140 pub fn history(&self) -> &[Transition<RunStatus, RunEvent>] {
142 &self.history
143 }
144
145 pub fn is_terminal(&self) -> bool {
147 self.state.is_terminal()
148 }
149
150 pub fn apply(
171 &mut self,
172 event: RunEvent,
173 ) -> Result<RunStatus, TransitionError<RunStatus, RunEvent>> {
174 let next = next_state(self.state, event).ok_or(TransitionError {
175 from: self.state,
176 event,
177 })?;
178
179 let transition = Transition {
180 from: self.state,
181 to: next,
182 event,
183 at: Utc::now(),
184 };
185
186 self.history.push(transition);
187 self.state = next;
188 Ok(next)
189 }
190
191 pub fn can_apply(&self, event: RunEvent) -> bool {
203 next_state(self.state, event).is_some()
204 }
205}
206
207impl Default for RunFsm {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213fn next_state(from: RunStatus, event: RunEvent) -> Option<RunStatus> {
216 match (from, event) {
217 (RunStatus::Pending, RunEvent::PickedUp) => Some(RunStatus::Running),
219 (RunStatus::Pending, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
220
221 (RunStatus::Running, RunEvent::AllStepsCompleted) => Some(RunStatus::Completed),
223 (RunStatus::Running, RunEvent::StepFailed) => Some(RunStatus::Failed),
224 (RunStatus::Running, RunEvent::StepFailedRetryable) => Some(RunStatus::Retrying),
225 (RunStatus::Running, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
226
227 (RunStatus::Retrying, RunEvent::RetryStarted) => Some(RunStatus::Running),
229 (RunStatus::Retrying, RunEvent::MaxRetriesExceeded) => Some(RunStatus::Failed),
230 (RunStatus::Retrying, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
231
232 _ => None,
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh FSM and applies `events` in order, panicking on the
    /// first event that is rejected.
    fn fsm_after(events: &[RunEvent]) -> RunFsm {
        let mut fsm = RunFsm::new();
        for &event in events {
            fsm.apply(event).unwrap();
        }
        fsm
    }

    // --- Valid transitions ---

    #[test]
    fn pending_to_running() {
        let mut fsm = RunFsm::new();
        assert!(fsm.apply(RunEvent::PickedUp).is_ok());
        assert_eq!(fsm.state(), RunStatus::Running);
    }

    #[test]
    fn full_success_path() {
        let fsm = fsm_after(&[RunEvent::PickedUp, RunEvent::AllStepsCompleted]);
        assert_eq!(fsm.state(), RunStatus::Completed);
        assert!(fsm.is_terminal());
        assert_eq!(fsm.history().len(), 2);
    }

    #[test]
    fn full_failure_path() {
        let fsm = fsm_after(&[RunEvent::PickedUp, RunEvent::StepFailed]);
        assert_eq!(fsm.state(), RunStatus::Failed);
        assert!(fsm.is_terminal());
    }

    #[test]
    fn retry_then_success() {
        let mut fsm = fsm_after(&[RunEvent::PickedUp]);

        // Each step is checked against the status it must land in.
        let steps = [
            (RunEvent::StepFailedRetryable, RunStatus::Retrying),
            (RunEvent::RetryStarted, RunStatus::Running),
            (RunEvent::AllStepsCompleted, RunStatus::Completed),
        ];
        for &(event, expected) in steps.iter() {
            fsm.apply(event).unwrap();
            assert_eq!(fsm.state(), expected);
        }

        assert_eq!(fsm.history().len(), 4);
    }

    #[test]
    fn retry_then_max_retries_exceeded() {
        let fsm = fsm_after(&[
            RunEvent::PickedUp,
            RunEvent::StepFailedRetryable,
            RunEvent::MaxRetriesExceeded,
        ]);
        assert_eq!(fsm.state(), RunStatus::Failed);
    }

    #[test]
    fn cancel_from_pending() {
        let fsm = fsm_after(&[RunEvent::CancelRequested]);
        assert_eq!(fsm.state(), RunStatus::Cancelled);
        assert!(fsm.is_terminal());
    }

    #[test]
    fn cancel_from_running() {
        let fsm = fsm_after(&[RunEvent::PickedUp, RunEvent::CancelRequested]);
        assert_eq!(fsm.state(), RunStatus::Cancelled);
    }

    #[test]
    fn cancel_from_retrying() {
        let fsm = fsm_after(&[
            RunEvent::PickedUp,
            RunEvent::StepFailedRetryable,
            RunEvent::CancelRequested,
        ]);
        assert_eq!(fsm.state(), RunStatus::Cancelled);
    }

    // --- Rejected transitions ---

    #[test]
    fn cannot_complete_from_pending() {
        let mut fsm = RunFsm::new();
        assert!(fsm.apply(RunEvent::AllStepsCompleted).is_err());
        // A rejected event must leave the status untouched.
        assert_eq!(fsm.state(), RunStatus::Pending);
    }

    #[test]
    fn cannot_pick_up_running() {
        let mut fsm = fsm_after(&[RunEvent::PickedUp]);
        assert!(fsm.apply(RunEvent::PickedUp).is_err());
    }

    #[test]
    fn cannot_transition_from_terminal() {
        let mut fsm = fsm_after(&[RunEvent::PickedUp, RunEvent::AllStepsCompleted]);

        let rejected = [
            RunEvent::PickedUp,
            RunEvent::CancelRequested,
            RunEvent::StepFailed,
        ];
        for &event in rejected.iter() {
            assert!(fsm.apply(event).is_err());
        }
    }

    // --- Queries ---

    #[test]
    fn can_apply_checks_without_mutation() {
        let fsm = RunFsm::new();

        for &event in [RunEvent::PickedUp, RunEvent::CancelRequested].iter() {
            assert!(fsm.can_apply(event));
        }
        for &event in [RunEvent::AllStepsCompleted, RunEvent::StepFailed].iter() {
            assert!(!fsm.can_apply(event));
        }

        assert_eq!(fsm.state(), RunStatus::Pending);
    }

    #[test]
    fn from_state_resumes_at_given_state() {
        let mut fsm = RunFsm::from_state(RunStatus::Running);
        assert_eq!(fsm.state(), RunStatus::Running);
        assert!(fsm.history().is_empty());

        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
        assert_eq!(fsm.state(), RunStatus::Completed);
    }

    #[test]
    fn history_records_transitions() {
        let fsm = fsm_after(&[
            RunEvent::PickedUp,
            RunEvent::StepFailedRetryable,
            RunEvent::RetryStarted,
        ]);

        // (from, to, event) for each expected entry, oldest first.
        let expected = [
            (RunStatus::Pending, RunStatus::Running, RunEvent::PickedUp),
            (
                RunStatus::Running,
                RunStatus::Retrying,
                RunEvent::StepFailedRetryable,
            ),
            (
                RunStatus::Retrying,
                RunStatus::Running,
                RunEvent::RetryStarted,
            ),
        ];

        let history = fsm.history();
        assert_eq!(history.len(), 3);

        for (entry, &(from, to, event)) in history.iter().zip(expected.iter()) {
            assert_eq!(entry.from, from);
            assert_eq!(entry.to, to);
            assert_eq!(entry.event, event);
        }
    }

    #[test]
    fn transition_error_display() {
        let mut fsm = RunFsm::new();
        let msg = fsm
            .apply(RunEvent::AllStepsCompleted)
            .unwrap_err()
            .to_string();
        assert!(msg.contains("all_steps_completed"));
        assert!(msg.contains("Pending"));
    }
}