1use chrono::Utc;
9
10use crate::input_state::{
11 InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
12 InputTerminalOutcome,
13};
14use meerkat_core::lifecycle::InputId;
15
16#[derive(Debug, Clone, thiserror::Error)]
18#[non_exhaustive]
19pub enum InputStateMachineError {
20 #[error("Invalid transition: {from:?} -> {to:?}")]
22 InvalidTransition {
23 from: InputLifecycleState,
24 to: InputLifecycleState,
25 },
26 #[error("Input {input_id} is in terminal state {state:?}")]
28 TerminalState {
29 input_id: InputId,
30 state: InputLifecycleState,
31 },
32}
33
34pub struct InputStateMachine;
36
37impl InputStateMachine {
38 pub fn is_valid_transition(from: InputLifecycleState, to: InputLifecycleState) -> bool {
40 use InputLifecycleState::{
41 Abandoned, Accepted, Applied, AppliedPendingConsumption, Coalesced, Consumed, Queued,
42 Staged, Superseded,
43 };
44
45 if from.is_terminal() {
47 return false;
48 }
49
50 matches!(
51 (from, to),
52 (Accepted, Queued | Consumed | Superseded | Coalesced | Abandoned)
54 | (Queued, Staged | Superseded | Coalesced | Abandoned)
56 | (Staged, Queued | Applied | Superseded | Abandoned)
58 | (Applied, AppliedPendingConsumption | Abandoned)
60 | (AppliedPendingConsumption, Consumed | Abandoned)
63 )
64 }
65
66 pub fn transition(
70 state: &mut InputState,
71 to: InputLifecycleState,
72 reason: Option<String>,
73 ) -> Result<(), InputStateMachineError> {
74 let from = state.current_state;
75
76 if from.is_terminal() {
78 return Err(InputStateMachineError::TerminalState {
79 input_id: state.input_id.clone(),
80 state: from,
81 });
82 }
83
84 if !Self::is_valid_transition(from, to) {
85 return Err(InputStateMachineError::InvalidTransition { from, to });
86 }
87
88 let now = Utc::now();
89
90 state.history.push(InputStateHistoryEntry {
92 timestamp: now,
93 from,
94 to,
95 reason,
96 });
97
98 state.current_state = to;
100 state.updated_at = now;
101
102 if to.is_terminal() && state.terminal_outcome.is_none() {
104 state.terminal_outcome = Some(match to {
105 InputLifecycleState::Consumed => InputTerminalOutcome::Consumed,
106 InputLifecycleState::Abandoned => InputTerminalOutcome::Abandoned {
107 reason: InputAbandonReason::Cancelled,
108 },
109 _ => return Ok(()),
112 });
113 }
114
115 Ok(())
116 }
117
118 pub fn set_terminal_outcome(state: &mut InputState, outcome: InputTerminalOutcome) {
120 state.terminal_outcome = Some(outcome);
121 }
122
123 pub fn abandon(
125 state: &mut InputState,
126 reason: InputAbandonReason,
127 ) -> Result<(), InputStateMachineError> {
128 Self::transition(
129 state,
130 InputLifecycleState::Abandoned,
131 Some(format!("abandoned: {reason:?}")),
132 )?;
133 state.terminal_outcome = Some(InputTerminalOutcome::Abandoned { reason });
134 Ok(())
135 }
136}
137
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Builds a brand-new input state via `InputState::new_accepted`.
    fn new_state() -> InputState {
        InputState::new_accepted(InputId::new())
    }

    /// Attempts a transition without a reason string.
    fn step(
        state: &mut InputState,
        to: InputLifecycleState,
    ) -> Result<(), InputStateMachineError> {
        InputStateMachine::transition(state, to, None)
    }

    /// Builds a fresh state and walks it through `path`, panicking on a refused hop.
    fn state_after(path: &[InputLifecycleState]) -> InputState {
        let mut state = new_state();
        for &to in path {
            step(&mut state, to).unwrap();
        }
        state
    }

    fn queued() -> InputState {
        state_after(&[InputLifecycleState::Queued])
    }

    fn staged() -> InputState {
        state_after(&[InputLifecycleState::Queued, InputLifecycleState::Staged])
    }

    fn applied() -> InputState {
        state_after(&[
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
        ])
    }

    fn applied_pending() -> InputState {
        state_after(&[
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
        ])
    }

    fn consumed() -> InputState {
        state_after(&[
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
        ])
    }

    // ---- happy path -----------------------------------------------------

    #[test]
    fn accepted_to_queued() {
        let mut state = new_state();
        let outcome = InputStateMachine::transition(
            &mut state,
            InputLifecycleState::Queued,
            Some(String::from("policy resolved")),
        );
        assert!(outcome.is_ok());
        assert_eq!(state.current_state, InputLifecycleState::Queued);
        assert_eq!(state.history.len(), 1);
        let first = &state.history[0];
        assert_eq!(first.from, InputLifecycleState::Accepted);
        assert_eq!(first.to, InputLifecycleState::Queued);
    }

    #[test]
    fn queued_to_staged() {
        let mut state = queued();
        assert!(step(&mut state, InputLifecycleState::Staged).is_ok());
        assert_eq!(state.current_state, InputLifecycleState::Staged);
    }

    #[test]
    fn staged_to_applied() {
        let mut state = staged();
        assert!(step(&mut state, InputLifecycleState::Applied).is_ok());
        assert_eq!(state.current_state, InputLifecycleState::Applied);
    }

    #[test]
    fn applied_to_applied_pending_consumption() {
        let mut state = applied();
        assert!(step(&mut state, InputLifecycleState::AppliedPendingConsumption).is_ok());
        assert_eq!(
            state.current_state,
            InputLifecycleState::AppliedPendingConsumption
        );
    }

    #[test]
    fn applied_pending_to_consumed() {
        let mut state = applied_pending();
        assert!(step(&mut state, InputLifecycleState::Consumed).is_ok());
        assert!(state.is_terminal());
        assert!(matches!(
            state.terminal_outcome,
            Some(InputTerminalOutcome::Consumed)
        ));
    }

    #[test]
    fn full_happy_path_history() {
        // Five hops: Queued, Staged, Applied, AppliedPendingConsumption, Consumed.
        let state = consumed();
        assert_eq!(state.history.len(), 5);
    }

    // ---- rollback and rejected rollback ---------------------------------

    #[test]
    fn staged_to_queued_rollback() {
        let mut state = staged();
        let outcome = InputStateMachine::transition(
            &mut state,
            InputLifecycleState::Queued,
            Some(String::from("run failed, rollback")),
        );
        assert!(outcome.is_ok());
        assert_eq!(state.current_state, InputLifecycleState::Queued);
    }

    #[test]
    fn applied_pending_to_queued_rejected() {
        let mut state = applied_pending();

        let outcome = step(&mut state, InputLifecycleState::Queued);
        assert!(matches!(
            outcome,
            Err(InputStateMachineError::InvalidTransition { .. })
        ));
        // A refused transition must leave the state where it was.
        assert_eq!(
            state.current_state,
            InputLifecycleState::AppliedPendingConsumption
        );
    }

    // ---- terminal states refuse further transitions ---------------------

    #[test]
    fn consumed_rejects_all() {
        let mut state = consumed();

        let targets = [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::Consumed,
        ];
        for target in targets {
            assert!(matches!(
                step(&mut state, target),
                Err(InputStateMachineError::TerminalState { .. })
            ));
        }
    }

    #[test]
    fn superseded_rejects_all() {
        let mut state = state_after(&[InputLifecycleState::Superseded]);
        assert!(state.is_terminal());
        assert!(step(&mut state, InputLifecycleState::Queued).is_err());
    }

    #[test]
    fn coalesced_rejects_all() {
        let mut state = state_after(&[InputLifecycleState::Coalesced]);
        assert!(state.is_terminal());
        assert!(step(&mut state, InputLifecycleState::Queued).is_err());
    }

    #[test]
    fn abandoned_rejects_all() {
        let mut state = new_state();
        InputStateMachine::abandon(&mut state, InputAbandonReason::Retired).unwrap();
        assert!(state.is_terminal());
        assert!(step(&mut state, InputLifecycleState::Queued).is_err());
    }

    // ---- abandon is reachable from every non-terminal state -------------

    #[test]
    fn abandon_from_accepted() {
        let mut state = new_state();
        let outcome = InputStateMachine::abandon(&mut state, InputAbandonReason::Retired);
        assert!(outcome.is_ok());
        assert!(matches!(
            state.terminal_outcome,
            Some(InputTerminalOutcome::Abandoned {
                reason: InputAbandonReason::Retired,
            })
        ));
    }

    #[test]
    fn abandon_from_queued() {
        let mut state = queued();
        assert!(InputStateMachine::abandon(&mut state, InputAbandonReason::Reset).is_ok());
    }

    #[test]
    fn abandon_from_staged() {
        let mut state = staged();
        assert!(InputStateMachine::abandon(&mut state, InputAbandonReason::Destroyed).is_ok());
    }

    #[test]
    fn abandon_from_applied() {
        let mut state = applied();
        assert!(InputStateMachine::abandon(&mut state, InputAbandonReason::Cancelled).is_ok());
    }

    #[test]
    fn abandon_from_applied_pending() {
        let mut state = applied_pending();
        assert!(InputStateMachine::abandon(&mut state, InputAbandonReason::Retired).is_ok());
    }

    // ---- direct Accepted -> Consumed ------------------------------------

    #[test]
    fn accepted_to_consumed_ignore_on_accept() {
        let mut state = new_state();
        let outcome = InputStateMachine::transition(
            &mut state,
            InputLifecycleState::Consumed,
            Some(String::from("Ignore + OnAccept")),
        );
        assert!(outcome.is_ok());
        assert!(state.is_terminal());
    }

    // ---- edges that skip a stage are refused ----------------------------

    #[test]
    fn accepted_to_staged_invalid() {
        let mut state = new_state();
        assert!(step(&mut state, InputLifecycleState::Staged).is_err());
    }

    #[test]
    fn accepted_to_applied_invalid() {
        let mut state = new_state();
        assert!(step(&mut state, InputLifecycleState::Applied).is_err());
    }

    #[test]
    fn queued_to_applied_invalid() {
        let mut state = queued();
        assert!(step(&mut state, InputLifecycleState::Applied).is_err());
    }

    #[test]
    fn queued_to_consumed_invalid() {
        let mut state = queued();
        assert!(step(&mut state, InputLifecycleState::Consumed).is_err());
    }

    // ---- explicit terminal outcome --------------------------------------

    #[test]
    fn set_terminal_outcome_superseded() {
        let mut state = state_after(&[InputLifecycleState::Superseded]);
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        InputStateMachine::set_terminal_outcome(&mut state, outcome);
        assert!(matches!(
            state.terminal_outcome,
            Some(InputTerminalOutcome::Superseded { .. })
        ));
    }

    // ---- history bookkeeping --------------------------------------------

    #[test]
    fn history_records_reason() {
        let mut state = new_state();
        InputStateMachine::transition(
            &mut state,
            InputLifecycleState::Queued,
            Some(String::from("test reason")),
        )
        .unwrap();
        let recorded = state.history[0].reason.as_deref();
        assert_eq!(recorded, Some("test reason"));
    }

    #[test]
    fn history_records_timestamps() {
        let mut state = new_state();
        let before = Utc::now();
        step(&mut state, InputLifecycleState::Queued).unwrap();
        let after = Utc::now();
        let stamp = state.history[0].timestamp;
        assert!(stamp >= before);
        assert!(stamp <= after);
    }
}