1use chrono::{DateTime, Utc};
21use meerkat_core::lifecycle::{InputId, RunId};
22
23use crate::input_state::{
24 InputAbandonReason, InputLifecycleState, InputStateHistoryEntry, InputTerminalOutcome,
25};
26
27#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum InputLifecycleInput {
37 QueueAccepted,
39 StageForRun { run_id: RunId },
41 RollbackStaged,
43 MarkApplied { run_id: RunId },
45 MarkAppliedPendingConsumption { boundary_sequence: u64 },
47 Consume,
49 Supersede,
51 Coalesce,
53 Abandon { reason: InputAbandonReason },
55 ConsumeOnAccept,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum InputLifecycleEffect {
69 InputLifecycleNotice { new_state: InputLifecycleState },
71 RecordTerminalOutcome { outcome: InputTerminalOutcome },
73 RecordRunAssociation { run_id: RunId },
75 RecordBoundarySequence { boundary_sequence: u64 },
77}
78
79#[derive(Debug)]
85pub struct InputLifecycleTransition {
86 pub next_phase: InputLifecycleState,
88 pub effects: Vec<InputLifecycleEffect>,
90}
91
92#[derive(Debug, Clone, thiserror::Error)]
98#[non_exhaustive]
99pub enum InputLifecycleError {
100 #[error("Invalid transition: {from:?} via {input} (current phase rejects this input)")]
102 InvalidTransition {
103 from: InputLifecycleState,
104 input: String,
105 },
106 #[error("Input is in terminal state {state:?}")]
108 TerminalState { state: InputLifecycleState },
109 #[error("Guard failed: {guard} (from {from:?})")]
111 GuardFailed {
112 from: InputLifecycleState,
113 guard: String,
114 },
115}
116
117#[derive(Debug, Clone)]
126struct InputLifecycleFields {
127 terminal_outcome: Option<InputTerminalOutcome>,
128 last_run_id: Option<RunId>,
129 last_boundary_sequence: Option<u64>,
130}
131
/// Private home of the [`Sealed`](sealed::Sealed) marker trait.
///
/// The module itself is not `pub`, so code outside this module tree cannot
/// name the trait and therefore cannot implement traits that require it.
mod sealed {
    /// Marker supertrait; carries no methods.
    pub trait Sealed {}
}
139
140pub trait InputLifecycleMutator: sealed::Sealed {
146 fn apply(
151 &mut self,
152 input: InputLifecycleInput,
153 ) -> Result<InputLifecycleTransition, InputLifecycleError>;
154}
155
156#[derive(Debug, Clone)]
166pub struct InputLifecycleAuthority {
167 phase: InputLifecycleState,
169 fields: InputLifecycleFields,
171 history: Vec<InputStateHistoryEntry>,
173 updated_at: DateTime<Utc>,
175}
176
177impl sealed::Sealed for InputLifecycleAuthority {}
178
179impl Default for InputLifecycleAuthority {
180 fn default() -> Self {
181 Self::new()
182 }
183}
184
185impl InputLifecycleAuthority {
186 pub fn new() -> Self {
188 Self::new_at(Utc::now())
189 }
190
191 pub fn new_at(now: DateTime<Utc>) -> Self {
197 Self {
198 phase: InputLifecycleState::Accepted,
199 fields: InputLifecycleFields {
200 terminal_outcome: None,
201 last_run_id: None,
202 last_boundary_sequence: None,
203 },
204 history: Vec::new(),
205 updated_at: now,
206 }
207 }
208
209 pub fn with_phase(phase: InputLifecycleState) -> Self {
211 Self {
212 phase,
213 fields: InputLifecycleFields {
214 terminal_outcome: None,
215 last_run_id: None,
216 last_boundary_sequence: None,
217 },
218 history: Vec::new(),
219 updated_at: Utc::now(),
220 }
221 }
222
223 pub fn restore(
228 phase: InputLifecycleState,
229 terminal_outcome: Option<InputTerminalOutcome>,
230 last_run_id: Option<RunId>,
231 last_boundary_sequence: Option<u64>,
232 history: Vec<InputStateHistoryEntry>,
233 updated_at: DateTime<Utc>,
234 ) -> Self {
235 Self {
236 phase,
237 fields: InputLifecycleFields {
238 terminal_outcome,
239 last_run_id,
240 last_boundary_sequence,
241 },
242 history,
243 updated_at,
244 }
245 }
246
247 pub fn phase(&self) -> InputLifecycleState {
249 self.phase
250 }
251
252 pub fn is_terminal(&self) -> bool {
254 self.phase.is_terminal()
255 }
256
257 pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
259 self.fields.terminal_outcome.as_ref()
260 }
261
262 pub fn last_run_id(&self) -> Option<&RunId> {
264 self.fields.last_run_id.as_ref()
265 }
266
267 pub fn last_boundary_sequence(&self) -> Option<u64> {
269 self.fields.last_boundary_sequence
270 }
271
272 pub fn history(&self) -> &[InputStateHistoryEntry] {
274 &self.history
275 }
276
277 pub fn updated_at(&self) -> DateTime<Utc> {
279 self.updated_at
280 }
281
282 pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
284 self.evaluate(input).is_ok()
285 }
286
287 pub fn require_phase(
289 &self,
290 allowed: &[InputLifecycleState],
291 ) -> Result<(), InputLifecycleError> {
292 if allowed.contains(&self.phase) {
293 Ok(())
294 } else {
295 Err(InputLifecycleError::InvalidTransition {
296 from: self.phase,
297 input: format!("require_phase({allowed:?})"),
298 })
299 }
300 }
301
302 fn evaluate(
304 &self,
305 input: &InputLifecycleInput,
306 ) -> Result<
307 (
308 InputLifecycleState,
309 InputLifecycleFields,
310 Vec<InputLifecycleEffect>,
311 ),
312 InputLifecycleError,
313 > {
314 #[allow(clippy::enum_glob_use)]
315 use InputLifecycleInput::*;
316 #[allow(clippy::enum_glob_use)]
317 use InputLifecycleState::*;
318
319 let phase = self.phase;
320 let mut fields = self.fields.clone();
321 let mut effects = Vec::new();
322
323 if phase.is_terminal() {
325 return Err(InputLifecycleError::TerminalState { state: phase });
326 }
327
328 let next_phase = match (phase, input) {
329 (Accepted, QueueAccepted) => {
331 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
332 Queued
333 }
334
335 (Queued, StageForRun { run_id }) => {
337 fields.last_run_id = Some(run_id.clone());
338 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Staged });
339 effects.push(InputLifecycleEffect::RecordRunAssociation {
340 run_id: run_id.clone(),
341 });
342 Staged
343 }
344
345 (Staged, RollbackStaged) => {
347 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
348 Queued
349 }
350
351 (Staged, MarkApplied { run_id }) => {
353 if self.fields.last_run_id.as_ref() != Some(run_id) {
354 return Err(InputLifecycleError::GuardFailed {
355 from: phase,
356 guard: format!(
357 "matches_last_run: expected {:?}, got {run_id:?}",
358 self.fields.last_run_id
359 ),
360 });
361 }
362 effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Applied });
363 Applied
364 }
365
366 (Applied, MarkAppliedPendingConsumption { boundary_sequence }) => {
368 fields.last_boundary_sequence = Some(*boundary_sequence);
369 effects.push(InputLifecycleEffect::InputLifecycleNotice {
370 new_state: AppliedPendingConsumption,
371 });
372 effects.push(InputLifecycleEffect::RecordBoundarySequence {
373 boundary_sequence: *boundary_sequence,
374 });
375 AppliedPendingConsumption
376 }
377
378 (AppliedPendingConsumption, Consume) => {
380 let outcome = InputTerminalOutcome::Consumed;
381 fields.terminal_outcome = Some(outcome.clone());
382 effects.push(InputLifecycleEffect::InputLifecycleNotice {
383 new_state: Consumed,
384 });
385 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
386 Consumed
387 }
388
389 (Queued, Supersede) => {
391 let outcome = InputTerminalOutcome::Superseded {
392 superseded_by: InputId::new(), };
394 fields.terminal_outcome = Some(outcome.clone());
395 effects.push(InputLifecycleEffect::InputLifecycleNotice {
396 new_state: Superseded,
397 });
398 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
399 Superseded
400 }
401
402 (Queued, Coalesce) => {
404 let outcome = InputTerminalOutcome::Coalesced {
405 aggregate_id: InputId::new(), };
407 fields.terminal_outcome = Some(outcome.clone());
408 effects.push(InputLifecycleEffect::InputLifecycleNotice {
409 new_state: Coalesced,
410 });
411 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
412 Coalesced
413 }
414
415 (
417 Accepted | Queued | Staged | Applied | AppliedPendingConsumption,
418 Abandon { reason },
419 ) => {
420 let outcome = InputTerminalOutcome::Abandoned {
421 reason: reason.clone(),
422 };
423 fields.terminal_outcome = Some(outcome.clone());
424 effects.push(InputLifecycleEffect::InputLifecycleNotice {
425 new_state: Abandoned,
426 });
427 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
428 Abandoned
429 }
430
431 (Accepted, ConsumeOnAccept) => {
433 let outcome = InputTerminalOutcome::Consumed;
434 fields.terminal_outcome = Some(outcome.clone());
435 effects.push(InputLifecycleEffect::InputLifecycleNotice {
436 new_state: Consumed,
437 });
438 effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
439 Consumed
440 }
441
442 _ => {
444 return Err(InputLifecycleError::InvalidTransition {
445 from: phase,
446 input: format!("{input:?}"),
447 });
448 }
449 };
450
451 Ok((next_phase, fields, effects))
452 }
453
454 pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
460 self.fields.terminal_outcome = Some(outcome);
461 }
462
463 pub fn stamp_receipt_metadata(&mut self, run_id: RunId, boundary_sequence: u64) {
469 self.fields.last_run_id = Some(run_id);
470 self.fields.last_boundary_sequence = Some(boundary_sequence);
471 }
472}
473
474impl InputLifecycleMutator for InputLifecycleAuthority {
475 fn apply(
476 &mut self,
477 input: InputLifecycleInput,
478 ) -> Result<InputLifecycleTransition, InputLifecycleError> {
479 let from = self.phase;
480 let reason = format!("{input:?}");
481 let (next_phase, next_fields, effects) = self.evaluate(&input)?;
482
483 let now = Utc::now();
484
485 self.history.push(InputStateHistoryEntry {
487 timestamp: now,
488 from,
489 to: next_phase,
490 reason: Some(reason),
491 });
492
493 self.phase = next_phase;
495 self.fields = next_fields;
496 self.updated_at = now;
497
498 Ok(InputLifecycleTransition {
499 next_phase,
500 effects,
501 })
502 }
503}
504
#[cfg(test)]
// Tests deliberately unwrap/panic on unexpected results and clone ids for readability.
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::redundant_clone,
    clippy::panic
)]
mod tests {
    use super::*;

    /// Fresh authority in the `Accepted` phase.
    fn make_authority() -> InputLifecycleAuthority {
        InputLifecycleAuthority::new()
    }

    /// Authority placed directly in `phase`, bypassing the transition table.
    fn make_at_phase(phase: InputLifecycleState) -> InputLifecycleAuthority {
        InputLifecycleAuthority::with_phase(phase)
    }

    /// Walks a fresh authority along the happy path
    /// (`Accepted` -> `Queued` -> `Staged` -> `Applied` ->
    /// `AppliedPendingConsumption` -> `Consumed`) until it reaches `target`.
    ///
    /// Returns the authority together with the run id used for staging.
    fn drive_to(target: InputLifecycleState) -> (InputLifecycleAuthority, RunId) {
        let run_id = RunId::new();
        let steps = [
            InputLifecycleInput::QueueAccepted,
            InputLifecycleInput::StageForRun {
                run_id: run_id.clone(),
            },
            InputLifecycleInput::MarkApplied {
                run_id: run_id.clone(),
            },
            InputLifecycleInput::MarkAppliedPendingConsumption {
                boundary_sequence: 1,
            },
            InputLifecycleInput::Consume,
        ];

        let mut auth = make_authority();
        for step in steps {
            if auth.phase() == target {
                break;
            }
            auth.apply(step).unwrap();
        }
        assert_eq!(
            auth.phase(),
            target,
            "happy path does not pass through {target:?}"
        );
        (auth, run_id)
    }

    /// Asserts that `result` is a `TerminalState` rejection.
    fn assert_terminal_rejection(
        result: &Result<InputLifecycleTransition, InputLifecycleError>,
    ) {
        assert!(
            matches!(result, Err(InputLifecycleError::TerminalState { .. })),
            "expected TerminalState rejection, got {result:?}"
        );
    }

    // ---- Happy path -------------------------------------------------------

    #[test]
    fn accepted_to_queued() {
        let mut auth = make_authority();
        let t = auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Queued);
        assert_eq!(auth.phase(), InputLifecycleState::Queued);

        let history = auth.history();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].from, InputLifecycleState::Accepted);
        assert_eq!(history[0].to, InputLifecycleState::Queued);
    }

    #[test]
    fn queued_to_staged() {
        let (mut auth, run_id) = drive_to(InputLifecycleState::Queued);
        let t = auth
            .apply(InputLifecycleInput::StageForRun {
                run_id: run_id.clone(),
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Staged);
        assert_eq!(auth.last_run_id(), Some(&run_id));
        assert!(
            t.effects
                .iter()
                .any(|e| matches!(e, InputLifecycleEffect::RecordRunAssociation { .. }))
        );
    }

    #[test]
    fn staged_to_applied() {
        let (mut auth, run_id) = drive_to(InputLifecycleState::Staged);
        let t = auth
            .apply(InputLifecycleInput::MarkApplied { run_id })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Applied);
    }

    #[test]
    fn applied_to_applied_pending_consumption() {
        let (mut auth, _) = drive_to(InputLifecycleState::Applied);
        let t = auth
            .apply(InputLifecycleInput::MarkAppliedPendingConsumption {
                boundary_sequence: 42,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::AppliedPendingConsumption);
        assert_eq!(auth.last_boundary_sequence(), Some(42));
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::RecordBoundarySequence {
                boundary_sequence: 42
            }
        )));
    }

    #[test]
    fn applied_pending_to_consumed() {
        let (mut auth, _) = drive_to(InputLifecycleState::AppliedPendingConsumption);
        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
        assert!(auth.is_terminal());
        assert!(matches!(
            auth.terminal_outcome(),
            Some(InputTerminalOutcome::Consumed)
        ));
    }

    #[test]
    fn full_happy_path_history() {
        let (auth, _) = drive_to(InputLifecycleState::Consumed);
        assert_eq!(auth.history().len(), 5);
    }

    // ---- Rollback ---------------------------------------------------------

    #[test]
    fn staged_to_queued_rollback() {
        let (mut auth, _) = drive_to(InputLifecycleState::Staged);
        let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Queued);
        assert_eq!(auth.phase(), InputLifecycleState::Queued);
    }

    // ---- Guards -----------------------------------------------------------

    #[test]
    fn mark_applied_rejects_wrong_run_id() {
        let (mut auth, _) = drive_to(InputLifecycleState::Staged);
        let result = auth.apply(InputLifecycleInput::MarkApplied {
            run_id: RunId::new(),
        });
        assert!(
            matches!(result, Err(InputLifecycleError::GuardFailed { .. })),
            "expected GuardFailed, got {result:?}"
        );
        assert_eq!(auth.phase(), InputLifecycleState::Staged);
    }

    #[test]
    fn applied_pending_to_queued_rejected() {
        let (mut auth, _) = drive_to(InputLifecycleState::AppliedPendingConsumption);
        let result = auth.apply(InputLifecycleInput::RollbackStaged);
        assert!(result.is_err());
        assert_eq!(auth.phase(), InputLifecycleState::AppliedPendingConsumption);
    }

    // ---- Terminal phases reject everything --------------------------------

    #[test]
    fn consumed_rejects_all() {
        let (mut auth, _) = drive_to(InputLifecycleState::Consumed);
        let result = auth.apply(InputLifecycleInput::QueueAccepted);
        assert_terminal_rejection(&result);
    }

    #[test]
    fn superseded_rejects_all() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        auth.apply(InputLifecycleInput::Supersede).unwrap();
        assert!(auth.is_terminal());

        let result = auth.apply(InputLifecycleInput::QueueAccepted);
        assert_terminal_rejection(&result);
    }

    #[test]
    fn coalesced_rejects_all() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        auth.apply(InputLifecycleInput::Coalesce).unwrap();
        assert!(auth.is_terminal());

        let result = auth.apply(InputLifecycleInput::QueueAccepted);
        assert_terminal_rejection(&result);
    }

    #[test]
    fn abandoned_rejects_all() {
        let mut auth = make_authority();
        auth.apply(InputLifecycleInput::Abandon {
            reason: InputAbandonReason::Retired,
        })
        .unwrap();
        assert!(auth.is_terminal());

        let result = auth.apply(InputLifecycleInput::QueueAccepted);
        assert_terminal_rejection(&result);
    }

    // ---- Abandon ----------------------------------------------------------

    #[test]
    fn abandon_from_accepted() {
        let mut auth = make_authority();
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Retired,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
        assert!(matches!(
            auth.terminal_outcome(),
            Some(InputTerminalOutcome::Abandoned {
                reason: InputAbandonReason::Retired,
            })
        ));
    }

    #[test]
    fn abandon_from_queued() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Reset,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
    }

    #[test]
    fn abandon_from_staged() {
        let (mut auth, _) = drive_to(InputLifecycleState::Staged);
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Destroyed,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
    }

    #[test]
    fn abandon_from_applied() {
        let (mut auth, _) = drive_to(InputLifecycleState::Applied);
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Cancelled,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
    }

    #[test]
    fn abandon_from_applied_pending() {
        let (mut auth, _) = drive_to(InputLifecycleState::AppliedPendingConsumption);
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Retired,
            })
            .unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
    }

    // ---- ConsumeOnAccept --------------------------------------------------

    #[test]
    fn consume_on_accept_from_accepted() {
        let mut auth = make_authority();
        let t = auth.apply(InputLifecycleInput::ConsumeOnAccept).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
        assert!(auth.is_terminal());
        assert!(matches!(
            auth.terminal_outcome(),
            Some(InputTerminalOutcome::Consumed)
        ));
    }

    #[test]
    fn consume_on_accept_from_queued_rejected() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        let result = auth.apply(InputLifecycleInput::ConsumeOnAccept);
        assert!(result.is_err());
    }

    // ---- Invalid transitions ----------------------------------------------

    #[test]
    fn accepted_to_staged_invalid() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::StageForRun {
            run_id: RunId::new(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn accepted_to_applied_invalid() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::MarkApplied {
            run_id: RunId::new(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn queued_to_applied_invalid() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        let result = auth.apply(InputLifecycleInput::MarkApplied {
            run_id: RunId::new(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn queued_to_consumed_invalid() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        let result = auth.apply(InputLifecycleInput::Consume);
        assert!(result.is_err());
    }

    // ---- Supersede / Coalesce ---------------------------------------------

    #[test]
    fn supersede_from_queued() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        let t = auth.apply(InputLifecycleInput::Supersede).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Superseded);
        assert!(auth.is_terminal());
        assert!(
            t.effects
                .iter()
                .any(|e| matches!(e, InputLifecycleEffect::RecordTerminalOutcome { .. }))
        );
    }

    #[test]
    fn supersede_from_accepted_rejected() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::Supersede);
        assert!(result.is_err());
    }

    #[test]
    fn coalesce_from_queued() {
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        let t = auth.apply(InputLifecycleInput::Coalesce).unwrap();
        assert_eq!(t.next_phase, InputLifecycleState::Coalesced);
        assert!(auth.is_terminal());
    }

    #[test]
    fn coalesce_from_accepted_rejected() {
        let mut auth = make_authority();
        let result = auth.apply(InputLifecycleInput::Coalesce);
        assert!(result.is_err());
    }

    // ---- Overriding the terminal outcome ----------------------------------

    #[test]
    fn set_terminal_outcome_superseded() {
        let superseder = InputId::new();
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        auth.apply(InputLifecycleInput::Supersede).unwrap();
        auth.set_terminal_outcome(InputTerminalOutcome::Superseded {
            superseded_by: superseder.clone(),
        });
        match auth.terminal_outcome() {
            Some(InputTerminalOutcome::Superseded { superseded_by }) => {
                assert_eq!(superseded_by, &superseder);
            }
            other => panic!("expected Superseded, got {other:?}"),
        }
    }

    #[test]
    fn set_terminal_outcome_coalesced() {
        let aggregate = InputId::new();
        let (mut auth, _) = drive_to(InputLifecycleState::Queued);
        auth.apply(InputLifecycleInput::Coalesce).unwrap();
        auth.set_terminal_outcome(InputTerminalOutcome::Coalesced {
            aggregate_id: aggregate.clone(),
        });
        match auth.terminal_outcome() {
            Some(InputTerminalOutcome::Coalesced { aggregate_id }) => {
                assert_eq!(aggregate_id, &aggregate);
            }
            other => panic!("expected Coalesced, got {other:?}"),
        }
    }

    // ---- History ----------------------------------------------------------

    #[test]
    fn history_records_reason() {
        let (auth, _) = drive_to(InputLifecycleState::Queued);
        let reason = auth.history()[0].reason.as_deref();
        assert!(reason.is_some());
        assert!(reason.is_some_and(|r| r.contains("QueueAccepted")));
    }

    #[test]
    fn history_records_timestamps() {
        let mut auth = make_authority();
        let before = Utc::now();
        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
        let after = Utc::now();

        let stamped = auth.history()[0].timestamp;
        assert!(stamped >= before);
        assert!(stamped <= after);
    }

    // ---- Probing ----------------------------------------------------------

    #[test]
    fn can_accept_probes_without_mutation() {
        let auth = make_authority();
        assert!(auth.can_accept(&InputLifecycleInput::QueueAccepted));
        assert!(!auth.can_accept(&InputLifecycleInput::Consume));
        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
    }

    #[test]
    fn require_phase_accepts_allowed() {
        let auth = make_authority();
        let allowed = [InputLifecycleState::Accepted, InputLifecycleState::Queued];
        assert!(auth.require_phase(&allowed).is_ok());
    }

    #[test]
    fn require_phase_rejects_disallowed() {
        let auth = make_authority();
        let result = auth.require_phase(&[InputLifecycleState::Queued]);
        assert!(matches!(
            result,
            Err(InputLifecycleError::InvalidTransition { .. })
        ));
    }

    // ---- Rejected inputs leave state untouched ----------------------------

    #[test]
    fn phase_unchanged_on_rejected_transition() {
        let mut auth = make_authority();
        let _ = auth.apply(InputLifecycleInput::Consume);
        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
        assert!(auth.history().is_empty());
    }

    // ---- Restore ----------------------------------------------------------

    #[test]
    fn restore_preserves_all_fields() {
        let run_id = RunId::new();
        let stamp = Utc::now();
        let auth = InputLifecycleAuthority::restore(
            InputLifecycleState::Applied,
            None,
            Some(run_id.clone()),
            Some(42),
            vec![InputStateHistoryEntry {
                timestamp: stamp,
                from: InputLifecycleState::Accepted,
                to: InputLifecycleState::Queued,
                reason: Some("restored".into()),
            }],
            stamp,
        );
        assert_eq!(auth.phase(), InputLifecycleState::Applied);
        assert!(auth.terminal_outcome().is_none());
        assert_eq!(auth.last_run_id(), Some(&run_id));
        assert_eq!(auth.last_boundary_sequence(), Some(42));
        assert_eq!(auth.history().len(), 1);
        assert_eq!(auth.updated_at(), stamp);
    }

    // ---- Abandon across every phase ---------------------------------------

    #[test]
    fn abandon_from_all_non_terminal_states() {
        for phase in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
        ] {
            let mut auth = make_at_phase(phase);
            let t = auth.apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Destroyed,
            });
            assert!(
                t.is_ok(),
                "abandon should succeed from {phase:?}, got {t:?}"
            );
            assert!(auth.is_terminal());
        }
    }

    #[test]
    fn abandon_from_terminal_states_rejected() {
        for phase in [
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let mut auth = make_at_phase(phase);
            let result = auth.apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Destroyed,
            });
            assert!(
                matches!(result, Err(InputLifecycleError::TerminalState { .. })),
                "abandon should be rejected from terminal {phase:?}, got {result:?}"
            );
        }
    }

    // ---- Effects ----------------------------------------------------------

    #[test]
    fn consume_emits_notice_and_terminal_outcome() {
        let (mut auth, _) = drive_to(InputLifecycleState::AppliedPendingConsumption);
        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::InputLifecycleNotice {
                new_state: InputLifecycleState::Consumed
            }
        )));
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::RecordTerminalOutcome {
                outcome: InputTerminalOutcome::Consumed
            }
        )));
    }

    #[test]
    fn abandon_emits_notice_and_terminal_outcome() {
        let mut auth = make_authority();
        let t = auth
            .apply(InputLifecycleInput::Abandon {
                reason: InputAbandonReason::Cancelled,
            })
            .unwrap();
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::InputLifecycleNotice {
                new_state: InputLifecycleState::Abandoned
            }
        )));
        assert!(t.effects.iter().any(|e| matches!(
            e,
            InputLifecycleEffect::RecordTerminalOutcome {
                outcome: InputTerminalOutcome::Abandoned {
                    reason: InputAbandonReason::Cancelled,
                },
            }
        )));
    }
}