1use chrono::{DateTime, Utc};
21use meerkat_core::lifecycle::{InputId, RunId};
22
23use crate::input_state::{
24 InputAbandonReason, InputLifecycleState, InputStateHistoryEntry, InputTerminalOutcome,
25};
26
/// Inputs (commands) that drive the input lifecycle state machine.
///
/// Each variant is only accepted from specific phases; the authoritative
/// transition table is the `match` in `InputLifecycleAuthority::evaluate`.
/// Any input offered to a terminal phase is rejected with
/// `InputLifecycleError::TerminalState`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputLifecycleInput {
    /// `Accepted` -> `Queued`.
    QueueAccepted,
    /// `Queued` -> `Staged`. Records `run_id` as the last associated run and
    /// counts one staging attempt.
    StageForRun { run_id: RunId },
    /// `Staged` -> `Queued`, or `Staged` -> `Abandoned` once the number of
    /// staging attempts has reached `MAX_STAGE_ATTEMPTS`.
    RollbackStaged,
    /// `Staged` -> `Applied`. Rejected with `GuardFailed` unless `run_id`
    /// equals the run recorded when the input was staged.
    MarkApplied { run_id: RunId },
    /// `Applied` -> `AppliedPendingConsumption`. Records `boundary_sequence`.
    MarkAppliedPendingConsumption { boundary_sequence: u64 },
    /// `AppliedPendingConsumption` -> `Consumed`.
    Consume,
    /// `Queued` -> `Superseded`.
    Supersede,
    /// `Queued` -> `Coalesced`.
    Coalesce,
    /// `Accepted`, `Queued`, `Staged`, `Applied` or
    /// `AppliedPendingConsumption` -> `Abandoned`, carrying `reason` into the
    /// recorded terminal outcome.
    Abandon { reason: InputAbandonReason },
    /// `Accepted` -> `Consumed` directly, without passing through the queue.
    ConsumeOnAccept,
}
58
/// Effects produced by a successful transition.
///
/// They are returned to the caller in `InputLifecycleTransition::effects`,
/// in the order in which the transition emitted them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputLifecycleEffect {
    /// The input moved to `new_state`. Emitted by every transition.
    InputLifecycleNotice { new_state: InputLifecycleState },
    /// The input reached a terminal phase with the given `outcome`.
    RecordTerminalOutcome { outcome: InputTerminalOutcome },
    /// The input was staged for the run identified by `run_id`.
    RecordRunAssociation { run_id: RunId },
    /// The input was marked applied-pending-consumption at `boundary_sequence`.
    RecordBoundarySequence { boundary_sequence: u64 },
}
78
/// Result of a successfully applied input.
#[derive(Debug)]
pub struct InputLifecycleTransition {
    /// Phase the state machine is in after the transition.
    pub next_phase: InputLifecycleState,
    /// Effects emitted by the transition, in emission order.
    pub effects: Vec<InputLifecycleEffect>,
}
91
/// Reasons the state machine rejects an input or a phase check.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum InputLifecycleError {
    /// The current (non-terminal) phase has no transition for the input.
    ///
    /// `input` is a textual rendering: the `Debug` form of the rejected input,
    /// or `require_phase([...])` when produced by
    /// `InputLifecycleAuthority::require_phase`.
    #[error("Invalid transition: {from:?} via {input} (current phase rejects this input)")]
    InvalidTransition {
        from: InputLifecycleState,
        input: String,
    },
    /// The input is already in a terminal phase, which accepts no further inputs.
    #[error("Input is in terminal state {state:?}")]
    TerminalState { state: InputLifecycleState },
    /// The phase accepts this kind of input, but a precondition on its payload
    /// failed. `guard` names the failed check and describes the mismatch.
    #[error("Guard failed: {guard} (from {from:?})")]
    GuardFailed {
        from: InputLifecycleState,
        guard: String,
    },
}
116
/// Staging-attempt bound: once `attempt_count` has reached this value, a
/// `RollbackStaged` abandons the input instead of returning it to `Queued`.
const MAX_STAGE_ATTEMPTS: u32 = 3;
130
/// Bookkeeping data carried alongside the phase.
///
/// `evaluate` works on a clone of this struct, so a rejected input never
/// leaves partially updated fields behind.
#[derive(Debug, Clone)]
struct InputLifecycleFields {
    /// Outcome recorded when the input reached a terminal phase, if any.
    terminal_outcome: Option<InputTerminalOutcome>,
    /// Run the input was most recently staged for (or stamped with).
    last_run_id: Option<RunId>,
    /// Boundary sequence most recently recorded for the input.
    last_boundary_sequence: Option<u64>,
    /// Number of `StageForRun` transitions taken so far.
    attempt_count: u32,
}
142
// Private module for the sealed-trait pattern: `Sealed` is `pub` so it can be
// used as a supertrait of a public trait, but because the module itself is
// private, code outside this file's module cannot name it and therefore
// cannot implement traits that require it.
mod sealed {
    /// Marker supertrait; carries no methods.
    pub trait Sealed {}
}
150
/// Mutating entry point of the input lifecycle state machine.
///
/// The trait requires `sealed::Sealed`; within this file that marker is
/// implemented only for `InputLifecycleAuthority`.
pub trait InputLifecycleMutator: sealed::Sealed {
    /// Applies `input` to the state machine.
    ///
    /// # Errors
    ///
    /// Returns an `InputLifecycleError` when the input is rejected.
    fn apply(
        &mut self,
        input: InputLifecycleInput,
    ) -> Result<InputLifecycleTransition, InputLifecycleError>;
}
166
/// Owner of a single input's lifecycle state.
///
/// All fields are private; phase changes go through
/// `InputLifecycleMutator::apply`, which validates the input before touching
/// any state.
#[derive(Debug, Clone)]
pub struct InputLifecycleAuthority {
    /// Current phase of the input.
    phase: InputLifecycleState,
    /// Bookkeeping data (terminal outcome, run association, attempt count, ...).
    fields: InputLifecycleFields,
    /// One entry per successful transition, oldest first.
    history: Vec<InputStateHistoryEntry>,
    /// Time of construction, or of the most recent successful transition.
    updated_at: DateTime<Utc>,
}
187
// Opts `InputLifecycleAuthority` into the sealed `InputLifecycleMutator` trait.
impl sealed::Sealed for InputLifecycleAuthority {}
189
impl Default for InputLifecycleAuthority {
    /// Equivalent to `InputLifecycleAuthority::new()`: a fresh authority in
    /// the `Accepted` phase, timestamped with the current UTC time.
    fn default() -> Self {
        Self::new()
    }
}
195
196impl InputLifecycleAuthority {
197 pub fn new() -> Self {
199 Self::new_at(Utc::now())
200 }
201
202 pub fn new_at(now: DateTime<Utc>) -> Self {
208 Self {
209 phase: InputLifecycleState::Accepted,
210 fields: InputLifecycleFields {
211 terminal_outcome: None,
212 last_run_id: None,
213 last_boundary_sequence: None,
214 attempt_count: 0,
215 },
216 history: Vec::new(),
217 updated_at: now,
218 }
219 }
220
221 pub fn with_phase(phase: InputLifecycleState) -> Self {
223 Self {
224 phase,
225 fields: InputLifecycleFields {
226 terminal_outcome: None,
227 last_run_id: None,
228 last_boundary_sequence: None,
229 attempt_count: 0,
230 },
231 history: Vec::new(),
232 updated_at: Utc::now(),
233 }
234 }
235
236 pub fn restore(
241 phase: InputLifecycleState,
242 terminal_outcome: Option<InputTerminalOutcome>,
243 last_run_id: Option<RunId>,
244 last_boundary_sequence: Option<u64>,
245 attempt_count: u32,
246 history: Vec<InputStateHistoryEntry>,
247 updated_at: DateTime<Utc>,
248 ) -> Self {
249 Self {
250 phase,
251 fields: InputLifecycleFields {
252 terminal_outcome,
253 last_run_id,
254 last_boundary_sequence,
255 attempt_count,
256 },
257 history,
258 updated_at,
259 }
260 }
261
262 pub fn phase(&self) -> InputLifecycleState {
264 self.phase
265 }
266
267 pub fn is_terminal(&self) -> bool {
269 self.phase.is_terminal()
270 }
271
272 pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
274 self.fields.terminal_outcome.as_ref()
275 }
276
277 pub fn last_run_id(&self) -> Option<&RunId> {
279 self.fields.last_run_id.as_ref()
280 }
281
282 pub fn last_boundary_sequence(&self) -> Option<u64> {
284 self.fields.last_boundary_sequence
285 }
286
287 pub fn attempt_count(&self) -> u32 {
289 self.fields.attempt_count
290 }
291
292 pub fn history(&self) -> &[InputStateHistoryEntry] {
294 &self.history
295 }
296
297 pub fn updated_at(&self) -> DateTime<Utc> {
299 self.updated_at
300 }
301
302 pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
304 self.evaluate(input).is_ok()
305 }
306
307 pub fn require_phase(
309 &self,
310 allowed: &[InputLifecycleState],
311 ) -> Result<(), InputLifecycleError> {
312 if allowed.contains(&self.phase) {
313 Ok(())
314 } else {
315 Err(InputLifecycleError::InvalidTransition {
316 from: self.phase,
317 input: format!("require_phase({allowed:?})"),
318 })
319 }
320 }
321
322 fn evaluate(
324 &self,
325 input: &InputLifecycleInput,
326 ) -> Result<
327 (
328 InputLifecycleState,
329 InputLifecycleFields,
330 Vec<InputLifecycleEffect>,
331 ),
332 InputLifecycleError,
333 > {
334 #[allow(clippy::enum_glob_use)]
335 use InputLifecycleInput::*;
336 #[allow(clippy::enum_glob_use)]
337 use InputLifecycleState::*;
338
339 let phase = self.phase;
340 let mut fields = self.fields.clone();
341 let mut effects = Vec::new();
342
343 if phase.is_terminal() {
345 return Err(InputLifecycleError::TerminalState { state: phase });
346 }
347
348 let next_phase = match (phase, input) {
349 (Accepted, QueueAccepted) => {
351 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
352 Queued
353 }
354
355 (Queued, StageForRun { run_id }) => {
357 fields.last_run_id = Some(run_id.clone());
358 fields.attempt_count += 1;
359 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Staged });
360 effects.push(InputLifecycleEffect::RecordRunAssociation {
361 run_id: run_id.clone(),
362 });
363 Staged
364 }
365
366 (Staged, RollbackStaged) => {
369 if fields.attempt_count >= MAX_STAGE_ATTEMPTS {
370 let outcome = InputTerminalOutcome::Abandoned {
371 reason: crate::input_state::InputAbandonReason::MaxAttemptsExhausted {
372 attempts: fields.attempt_count,
373 },
374 };
375 fields.terminal_outcome = Some(outcome.clone());
376 effects.push(InputLifecycleEffect::InputLifecycleNotice {
377 new_state: Abandoned,
378 });
379 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
380 Abandoned
381 } else {
382 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
383 Queued
384 }
385 }
386
387 (Staged, MarkApplied { run_id }) => {
389 if self.fields.last_run_id.as_ref() != Some(run_id) {
390 return Err(InputLifecycleError::GuardFailed {
391 from: phase,
392 guard: format!(
393 "matches_last_run: expected {:?}, got {run_id:?}",
394 self.fields.last_run_id
395 ),
396 });
397 }
398 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Applied });
399 Applied
400 }
401
402 (Applied, MarkAppliedPendingConsumption { boundary_sequence }) => {
404 fields.last_boundary_sequence = Some(*boundary_sequence);
405 effects.push(InputLifecycleEffect::InputLifecycleNotice {
406 new_state: AppliedPendingConsumption,
407 });
408 effects.push(InputLifecycleEffect::RecordBoundarySequence {
409 boundary_sequence: *boundary_sequence,
410 });
411 AppliedPendingConsumption
412 }
413
414 (AppliedPendingConsumption, Consume) => {
416 let outcome = InputTerminalOutcome::Consumed;
417 fields.terminal_outcome = Some(outcome.clone());
418 effects.push(InputLifecycleEffect::InputLifecycleNotice {
419 new_state: Consumed,
420 });
421 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
422 Consumed
423 }
424
425 (Queued, Supersede) => {
427 let outcome = InputTerminalOutcome::Superseded {
428 superseded_by: InputId::new(), };
430 fields.terminal_outcome = Some(outcome.clone());
431 effects.push(InputLifecycleEffect::InputLifecycleNotice {
432 new_state: Superseded,
433 });
434 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
435 Superseded
436 }
437
438 (Queued, Coalesce) => {
440 let outcome = InputTerminalOutcome::Coalesced {
441 aggregate_id: InputId::new(), };
443 fields.terminal_outcome = Some(outcome.clone());
444 effects.push(InputLifecycleEffect::InputLifecycleNotice {
445 new_state: Coalesced,
446 });
447 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
448 Coalesced
449 }
450
451 (
453 Accepted | Queued | Staged | Applied | AppliedPendingConsumption,
454 Abandon { reason },
455 ) => {
456 let outcome = InputTerminalOutcome::Abandoned {
457 reason: reason.clone(),
458 };
459 fields.terminal_outcome = Some(outcome.clone());
460 effects.push(InputLifecycleEffect::InputLifecycleNotice {
461 new_state: Abandoned,
462 });
463 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
464 Abandoned
465 }
466
467 (Accepted, ConsumeOnAccept) => {
469 let outcome = InputTerminalOutcome::Consumed;
470 fields.terminal_outcome = Some(outcome.clone());
471 effects.push(InputLifecycleEffect::InputLifecycleNotice {
472 new_state: Consumed,
473 });
474 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
475 Consumed
476 }
477
478 _ => {
480 return Err(InputLifecycleError::InvalidTransition {
481 from: phase,
482 input: format!("{input:?}"),
483 });
484 }
485 };
486
487 Ok((next_phase, fields, effects))
488 }
489
490 pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
496 self.fields.terminal_outcome = Some(outcome);
497 }
498
499 pub fn stamp_receipt_metadata(&mut self, run_id: RunId, boundary_sequence: u64) {
505 self.fields.last_run_id = Some(run_id);
506 self.fields.last_boundary_sequence = Some(boundary_sequence);
507 }
508}
509
510impl InputLifecycleMutator for InputLifecycleAuthority {
511 fn apply(
512 &mut self,
513 input: InputLifecycleInput,
514 ) -> Result<InputLifecycleTransition, InputLifecycleError> {
515 let from = self.phase;
516 let reason = format!("{input:?}");
517 let (next_phase, next_fields, effects) = self.evaluate(&input)?;
518
519 let now = Utc::now();
520
521 self.history.push(InputStateHistoryEntry {
523 timestamp: now,
524 from,
525 to: next_phase,
526 reason: Some(reason),
527 });
528
529 self.phase = next_phase;
531 self.fields = next_fields;
532 self.updated_at = now;
533
534 Ok(InputLifecycleTransition {
535 next_phase,
536 effects,
537 })
538 }
539}
540
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::redundant_clone,
    clippy::panic
)]
mod tests {
    //! Unit tests for the input lifecycle state machine.
    //!
    //! The fixtures below drive a fresh authority through the happy path so
    //! that each test only spells out the transition it is actually about.

    use super::*;

    // ---- fixtures ---------------------------------------------------------

    /// Fresh authority in `Accepted`.
    fn make_authority() -> InputLifecycleAuthority {
        InputLifecycleAuthority::new()
    }

    /// Authority placed directly in `phase` without running any transitions.
    fn make_at_phase(phase: InputLifecycleState) -> InputLifecycleAuthority {
        InputLifecycleAuthority::with_phase(phase)
    }

    /// Authority advanced to `Queued`.
    fn queued() -> InputLifecycleAuthority {
        let mut auth = make_authority();
        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
        auth
    }

    /// Authority advanced to `Staged`, together with the run it was staged for.
    fn staged() -> (InputLifecycleAuthority, RunId) {
        let mut auth = queued();
        let run_id = RunId::new();
        auth.apply(InputLifecycleInput::StageForRun {
            run_id: run_id.clone(),
        })
        .unwrap();
        (auth, run_id)
    }

    /// Authority advanced to `Applied`.
    fn applied() -> InputLifecycleAuthority {
        let (mut auth, run_id) = staged();
        auth.apply(InputLifecycleInput::MarkApplied { run_id })
            .unwrap();
        auth
    }

    /// Authority advanced to `AppliedPendingConsumption` at `boundary_sequence`.
    fn pending_consumption(boundary_sequence: u64) -> InputLifecycleAuthority {
        let mut auth = applied();
        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption { boundary_sequence })
            .unwrap();
        auth
    }

    /// Authority advanced all the way to `Consumed`.
    fn consumed() -> InputLifecycleAuthority {
        let mut auth = pending_consumption(1);
        auth.apply(InputLifecycleInput::Consume).unwrap();
        auth
    }

    /// Asserts that `auth` is terminal and rejects a further input with
    /// `TerminalState`.
    fn assert_rejects_as_terminal(auth: &mut InputLifecycleAuthority) {
        assert!(auth.is_terminal());
        let result = auth.apply(InputLifecycleInput::QueueAccepted);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            InputLifecycleError::TerminalState { .. }
        ));
    }

    /// Applies `Abandon { reason }` and asserts the machine lands in `Abandoned`.
    fn assert_abandons(mut auth: InputLifecycleAuthority, reason: InputAbandonReason) {
        let t = auth.apply(InputLifecycleInput::Abandon { reason }).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
    }

    // ---- happy path -------------------------------------------------------

    #[test]
    fn accepted_to_queued() {
        let mut auth = make_authority();
        let t = auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Queued);
        assert_eq!(auth.phase(), InputLifecycleState::Queued);
        assert_eq!(auth.history().len(), 1);
        assert_eq!(auth.history()[0].from, InputLifecycleState::Accepted);
        assert_eq!(auth.history()[0].to, InputLifecycleState::Queued);
    }

    #[test]
    fn queued_to_staged() {
        let mut auth = queued();
        let run_id = RunId::new();
        let t = auth
            .apply(InputLifecycleInput::StageForRun {
                run_id: run_id.clone(),
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Staged);
        assert_eq!(auth.last_run_id(), Some(&run_id));
        assert!(
            t.effects
                .iter()
                .any(|e| matches!(e, InputLifecycleEffect::RecordRunAssociation { .. }))
        );
    }

    #[test]
    fn staged_to_applied() {
        let (mut auth, run_id) = staged();
        let t = auth
            .apply(InputLifecycleInput::MarkApplied { run_id })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Applied);
    }

    #[test]
    fn applied_to_applied_pending_consumption() {
        let mut auth = applied();
        let t = auth
            .apply(InputLifecycleInput::MarkAppliedPendingConsumption {
                boundary_sequence: 42,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::AppliedPendingConsumption);
        assert_eq!(auth.last_boundary_sequence(), Some(42));
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::RecordBoundarySequence {
                boundary_sequence: 42
            }
        )));
    }

    #[test]
    fn applied_pending_to_consumed() {
        let mut auth = pending_consumption(1);
        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
        assert!(auth.is_terminal());
        assert!(matches!(
            auth.terminal_outcome(),
            Some(InputTerminalOutcome::Consumed)
        ));
    }

    #[test]
    fn full_happy_path_history() {
        // Accepted -> Queued -> Staged -> Applied -> Pending -> Consumed.
        let auth = consumed();
        assert_eq!(auth.history().len(), 5);
    }

    // ---- rollback and guards ----------------------------------------------

    #[test]
    fn staged_to_queued_rollback() {
        let (mut auth, _run_id) = staged();
        let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Queued);
        assert_eq!(auth.phase(), InputLifecycleState::Queued);
    }

    #[test]
    fn mark_applied_rejects_wrong_run_id() {
        let (mut auth, _run_id) = staged();
        let wrong_run_id = RunId::new();
        let result = auth.apply(InputLifecycleInput::MarkApplied {
            run_id: wrong_run_id,
        });
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            InputLifecycleError::GuardFailed { .. }
        ));
        assert_eq!(auth.phase(), InputLifecycleState::Staged);
    }

    #[test]
    fn applied_pending_to_queued_rejected() {
        let mut auth = pending_consumption(1);
        let result = auth.apply(InputLifecycleInput::RollbackStaged);
        assert!(result.is_err());
        assert_eq!(auth.phase(), InputLifecycleState::AppliedPendingConsumption);
    }

    // ---- terminal phases reject everything --------------------------------

    #[test]
    fn consumed_rejects_all() {
        let mut auth = consumed();
        assert_rejects_as_terminal(&mut auth);
    }

    #[test]
    fn superseded_rejects_all() {
        let mut auth = queued();
        auth.apply(InputLifecycleInput::Supersede).unwrap();
        assert_rejects_as_terminal(&mut auth);
    }

    #[test]
    fn coalesced_rejects_all() {
        let mut auth = queued();
        auth.apply(InputLifecycleInput::Coalesce).unwrap();
        assert_rejects_as_terminal(&mut auth);
    }

    #[test]
    fn abandoned_rejects_all() {
        let mut auth = make_authority();
        auth.apply(InputLifecycleInput::Abandon {
            reason: InputAbandonReason::Retired,
        })
        .unwrap();
        assert_rejects_as_terminal(&mut auth);
    }

    // ---- abandon ----------------------------------------------------------

    #[test]
    fn abandon_from_accepted() {
        let mut auth = make_authority();
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Retired,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
        assert!(matches!(
            auth.terminal_outcome(),
            Some(InputTerminalOutcome::Abandoned {
                reason: InputAbandonReason::Retired,
            })
        ));
    }

    #[test]
    fn abandon_from_queued() {
        assert_abandons(queued(), InputAbandonReason::Reset);
    }

    #[test]
    fn abandon_from_staged() {
        let (auth, _run_id) = staged();
        assert_abandons(auth, InputAbandonReason::Destroyed);
    }

    #[test]
    fn abandon_from_applied() {
        assert_abandons(applied(), InputAbandonReason::Cancelled);
    }

    #[test]
    fn abandon_from_applied_pending() {
        assert_abandons(pending_consumption(1), InputAbandonReason::Retired);
    }

    // ---- consume on accept ------------------------------------------------

    #[test]
    fn consume_on_accept_from_accepted() {
        let mut auth = make_authority();
        let t = auth.apply(InputLifecycleInput::ConsumeOnAccept).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
        assert!(auth.is_terminal());
        assert!(matches!(
            auth.terminal_outcome(),
            Some(InputTerminalOutcome::Consumed)
        ));
    }

    #[test]
    fn consume_on_accept_from_queued_rejected() {
        let mut auth = queued();
        let result = auth.apply(InputLifecycleInput::ConsumeOnAccept);
        assert!(result.is_err());
    }

    // ---- invalid transitions ----------------------------------------------

    #[test]
    fn accepted_to_staged_invalid() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::StageForRun {
            run_id: RunId::new(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn accepted_to_applied_invalid() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::MarkApplied {
            run_id: RunId::new(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn queued_to_applied_invalid() {
        let mut auth = queued();
        let result = auth.apply(InputLifecycleInput::MarkApplied {
            run_id: RunId::new(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn queued_to_consumed_invalid() {
        let mut auth = queued();
        let result = auth.apply(InputLifecycleInput::Consume);
        assert!(result.is_err());
    }

    // ---- supersede / coalesce ---------------------------------------------

    #[test]
    fn supersede_from_queued() {
        let mut auth = queued();
        let t = auth.apply(InputLifecycleInput::Supersede).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Superseded);
        assert!(auth.is_terminal());
        assert!(
            t.effects
                .iter()
                .any(|e| matches!(e, InputLifecycleEffect::RecordTerminalOutcome { .. }))
        );
    }

    #[test]
    fn supersede_from_accepted_rejected() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::Supersede);
        assert!(result.is_err());
    }

    #[test]
    fn coalesce_from_queued() {
        let mut auth = queued();
        let t = auth.apply(InputLifecycleInput::Coalesce).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Coalesced);
        assert!(auth.is_terminal());
    }

    #[test]
    fn coalesce_from_accepted_rejected() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::Coalesce);
        assert!(result.is_err());
    }

    // ---- set_terminal_outcome ---------------------------------------------

    #[test]
    fn set_terminal_outcome_superseded() {
        let mut auth = queued();
        let superseder = InputId::new();
        auth.apply(InputLifecycleInput::Supersede).unwrap();
        auth.set_terminal_outcome(InputTerminalOutcome::Superseded {
            superseded_by: superseder.clone(),
        });
        match auth.terminal_outcome() {
            Some(InputTerminalOutcome::Superseded { superseded_by }) => {
                assert_eq!(superseded_by, &superseder);
            }
            other => panic!("expected Superseded, got {other:?}"),
        }
    }

    #[test]
    fn set_terminal_outcome_coalesced() {
        let mut auth = queued();
        let aggregate = InputId::new();
        auth.apply(InputLifecycleInput::Coalesce).unwrap();
        auth.set_terminal_outcome(InputTerminalOutcome::Coalesced {
            aggregate_id: aggregate.clone(),
        });
        match auth.terminal_outcome() {
            Some(InputTerminalOutcome::Coalesced { aggregate_id }) => {
                assert_eq!(aggregate_id, &aggregate);
            }
            other => panic!("expected Coalesced, got {other:?}"),
        }
    }

    // ---- history ----------------------------------------------------------

    #[test]
    fn history_records_reason() {
        let auth = queued();
        assert!(auth.history()[0].reason.is_some());
        assert!(
            auth.history()[0]
                .reason
                .as_deref()
                .is_some_and(|r| r.contains("QueueAccepted"))
        );
    }

    #[test]
    fn history_records_timestamps() {
        let mut auth = make_authority();
        let before = Utc::now();
        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
        let after = Utc::now();
        assert!(auth.history()[0].timestamp >= before);
        assert!(auth.history()[0].timestamp <= after);
    }

    // ---- probes -----------------------------------------------------------

    #[test]
    fn can_accept_probes_without_mutation() {
        let auth = make_authority();
        assert!(auth.can_accept(&InputLifecycleInput::QueueAccepted));
        assert!(!auth.can_accept(&InputLifecycleInput::Consume));
        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
    }

    #[test]
    fn require_phase_accepts_allowed() {
        let auth = make_authority();
        assert!(
            auth.require_phase(&[InputLifecycleState::Accepted, InputLifecycleState::Queued])
                .is_ok()
        );
    }

    #[test]
    fn require_phase_rejects_disallowed() {
        let auth = make_authority();
        let result = auth.require_phase(&[InputLifecycleState::Queued]);
        assert!(matches!(
            result,
            Err(InputLifecycleError::InvalidTransition { .. })
        ));
    }

    #[test]
    fn phase_unchanged_on_rejected_transition() {
        let mut auth = make_authority();
        let _ = auth.apply(InputLifecycleInput::Consume);
        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
        assert!(auth.history().is_empty());
    }

    // ---- restore ----------------------------------------------------------

    #[test]
    fn restore_preserves_all_fields() {
        let run_id = RunId::new();
        let auth = InputLifecycleAuthority::restore(
            InputLifecycleState::Applied,
            None,
            Some(run_id.clone()),
            Some(42),
            2,
            vec![InputStateHistoryEntry {
                timestamp: Utc::now(),
                from: InputLifecycleState::Accepted,
                to: InputLifecycleState::Queued,
                reason: Some("restored".into()),
            }],
            Utc::now(),
        );
        assert_eq!(auth.phase(), InputLifecycleState::Applied);
        assert_eq!(auth.last_run_id(), Some(&run_id));
        assert_eq!(auth.last_boundary_sequence(), Some(42));
        assert_eq!(auth.attempt_count(), 2);
        assert_eq!(auth.history().len(), 1);
    }

    #[test]
    fn restore_preserves_attempt_count_for_retry_bound() {
        // Two attempts were already made before the restore.
        let mut auth = InputLifecycleAuthority::restore(
            InputLifecycleState::Queued,
            None,
            None,
            None,
            2,
            Vec::new(),
            Utc::now(),
        );

        // The third staging attempt reaches the bound...
        auth.apply(InputLifecycleInput::StageForRun {
            run_id: RunId::new(),
        })
        .unwrap();
        assert_eq!(auth.attempt_count(), 3);

        // ...so rolling it back abandons instead of re-queueing.
        let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
        assert_eq!(
            t.next_phase,
            InputLifecycleState::Abandoned,
            "after 3 attempts (2 restored + 1 new), rollback should abandon"
        );
    }

    // ---- abandon across all phases ----------------------------------------

    #[test]
    fn abandon_from_all_non_terminal_states() {
        for phase in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
        ] {
            let mut auth = make_at_phase(phase);
            let t = auth.apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Destroyed,
            });
            assert!(
                t.is_ok(),
                "abandon should succeed from {phase:?}, got {t:?}"
            );
            assert!(auth.is_terminal());
        }
    }

    #[test]
    fn abandon_from_terminal_states_rejected() {
        for phase in [
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let mut auth = make_at_phase(phase);
            let result = auth.apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Destroyed,
            });
            assert!(
                result.is_err(),
                "abandon should be rejected from terminal {phase:?}"
            );
        }
    }

    // ---- effects ----------------------------------------------------------

    #[test]
    fn consume_emits_notice_and_terminal_outcome() {
        let mut auth = pending_consumption(1);
        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::InputLifecycleNotice {
                new_state: InputLifecycleState::Consumed
            }
        )));
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::RecordTerminalOutcome {
                outcome: InputTerminalOutcome::Consumed
            }
        )));
    }

    #[test]
    fn abandon_emits_notice_and_terminal_outcome() {
        let mut auth = make_authority();
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Cancelled,
            })
            .unwrap();
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::InputLifecycleNotice {
                new_state: InputLifecycleState::Abandoned
            }
        )));
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::RecordTerminalOutcome {
                outcome: InputTerminalOutcome::Abandoned {
                    reason: InputAbandonReason::Cancelled,
                },
            }
        )));
    }
}