use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::{InputId, RunId};
use crate::input_state::{
InputAbandonReason, InputLifecycleState, InputStateHistoryEntry, InputTerminalOutcome,
};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputLifecycleInput {
QueueAccepted,
StageForRun { run_id: RunId },
RollbackStaged,
MarkApplied { run_id: RunId },
MarkAppliedPendingConsumption { boundary_sequence: u64 },
Consume,
Supersede,
Coalesce,
Abandon { reason: InputAbandonReason },
ConsumeOnAccept,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputLifecycleEffect {
InputLifecycleNotice { new_state: InputLifecycleState },
RecordTerminalOutcome { outcome: InputTerminalOutcome },
RecordRunAssociation { run_id: RunId },
RecordBoundarySequence { boundary_sequence: u64 },
}
#[derive(Debug)]
pub struct InputLifecycleTransition {
pub next_phase: InputLifecycleState,
pub effects: Vec<InputLifecycleEffect>,
}
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum InputLifecycleError {
#[error("Invalid transition: {from:?} via {input} (current phase rejects this input)")]
InvalidTransition {
from: InputLifecycleState,
input: String,
},
#[error("Input is in terminal state {state:?}")]
TerminalState { state: InputLifecycleState },
#[error("Guard failed: {guard} (from {from:?})")]
GuardFailed {
from: InputLifecycleState,
guard: String,
},
}
const MAX_STAGE_ATTEMPTS: u32 = 3;
#[derive(Debug, Clone)]
struct InputLifecycleFields {
terminal_outcome: Option<InputTerminalOutcome>,
last_run_id: Option<RunId>,
last_boundary_sequence: Option<u64>,
attempt_count: u32,
}
mod sealed {
pub trait Sealed {}
}
pub trait InputLifecycleMutator: sealed::Sealed {
fn apply(
&mut self,
input: InputLifecycleInput,
) -> Result<InputLifecycleTransition, InputLifecycleError>;
}
#[derive(Debug, Clone)]
pub struct InputLifecycleAuthority {
phase: InputLifecycleState,
fields: InputLifecycleFields,
history: Vec<InputStateHistoryEntry>,
updated_at: DateTime<Utc>,
}
impl sealed::Sealed for InputLifecycleAuthority {}
impl Default for InputLifecycleAuthority {
fn default() -> Self {
Self::new()
}
}
impl InputLifecycleAuthority {
pub fn new() -> Self {
Self::new_at(Utc::now())
}
pub fn new_at(now: DateTime<Utc>) -> Self {
Self {
phase: InputLifecycleState::Accepted,
fields: InputLifecycleFields {
terminal_outcome: None,
last_run_id: None,
last_boundary_sequence: None,
attempt_count: 0,
},
history: Vec::new(),
updated_at: now,
}
}
pub fn with_phase(phase: InputLifecycleState) -> Self {
Self {
phase,
fields: InputLifecycleFields {
terminal_outcome: None,
last_run_id: None,
last_boundary_sequence: None,
attempt_count: 0,
},
history: Vec::new(),
updated_at: Utc::now(),
}
}
pub fn restore(
phase: InputLifecycleState,
terminal_outcome: Option<InputTerminalOutcome>,
last_run_id: Option<RunId>,
last_boundary_sequence: Option<u64>,
attempt_count: u32,
history: Vec<InputStateHistoryEntry>,
updated_at: DateTime<Utc>,
) -> Self {
Self {
phase,
fields: InputLifecycleFields {
terminal_outcome,
last_run_id,
last_boundary_sequence,
attempt_count,
},
history,
updated_at,
}
}
pub fn phase(&self) -> InputLifecycleState {
self.phase
}
pub fn is_terminal(&self) -> bool {
self.phase.is_terminal()
}
pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
self.fields.terminal_outcome.as_ref()
}
pub fn last_run_id(&self) -> Option<&RunId> {
self.fields.last_run_id.as_ref()
}
pub fn last_boundary_sequence(&self) -> Option<u64> {
self.fields.last_boundary_sequence
}
pub fn attempt_count(&self) -> u32 {
self.fields.attempt_count
}
pub fn history(&self) -> &[InputStateHistoryEntry] {
&self.history
}
pub fn updated_at(&self) -> DateTime<Utc> {
self.updated_at
}
pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
self.evaluate(input).is_ok()
}
pub fn require_phase(
&self,
allowed: &[InputLifecycleState],
) -> Result<(), InputLifecycleError> {
if allowed.contains(&self.phase) {
Ok(())
} else {
Err(InputLifecycleError::InvalidTransition {
from: self.phase,
input: format!("require_phase({allowed:?})"),
})
}
}
fn evaluate(
&self,
input: &InputLifecycleInput,
) -> Result<
(
InputLifecycleState,
InputLifecycleFields,
Vec<InputLifecycleEffect>,
),
InputLifecycleError,
> {
#[allow(clippy::enum_glob_use)]
use InputLifecycleInput::*;
#[allow(clippy::enum_glob_use)]
use InputLifecycleState::*;
let phase = self.phase;
let mut fields = self.fields.clone();
let mut effects = Vec::new();
if phase.is_terminal() {
return Err(InputLifecycleError::TerminalState { state: phase });
}
let next_phase = match (phase, input) {
(Accepted, QueueAccepted) => {
effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
Queued
}
(Queued, StageForRun { run_id }) => {
fields.last_run_id = Some(run_id.clone());
fields.attempt_count += 1;
effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Staged });
effects.push(InputLifecycleEffect::RecordRunAssociation {
run_id: run_id.clone(),
});
Staged
}
(Staged, RollbackStaged) => {
if fields.attempt_count >= MAX_STAGE_ATTEMPTS {
let outcome = InputTerminalOutcome::Abandoned {
reason: crate::input_state::InputAbandonReason::MaxAttemptsExhausted {
attempts: fields.attempt_count,
},
};
fields.terminal_outcome = Some(outcome.clone());
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: Abandoned,
});
effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
Abandoned
} else {
effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
Queued
}
}
(Staged, MarkApplied { run_id }) => {
if self.fields.last_run_id.as_ref() != Some(run_id) {
return Err(InputLifecycleError::GuardFailed {
from: phase,
guard: format!(
"matches_last_run: expected {:?}, got {run_id:?}",
self.fields.last_run_id
),
});
}
effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Applied });
Applied
}
(Applied, MarkAppliedPendingConsumption { boundary_sequence }) => {
fields.last_boundary_sequence = Some(*boundary_sequence);
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: AppliedPendingConsumption,
});
effects.push(InputLifecycleEffect::RecordBoundarySequence {
boundary_sequence: *boundary_sequence,
});
AppliedPendingConsumption
}
(AppliedPendingConsumption, Consume) => {
let outcome = InputTerminalOutcome::Consumed;
fields.terminal_outcome = Some(outcome.clone());
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: Consumed,
});
effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
Consumed
}
(Queued, Supersede) => {
let outcome = InputTerminalOutcome::Superseded {
superseded_by: InputId::new(), };
fields.terminal_outcome = Some(outcome.clone());
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: Superseded,
});
effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
Superseded
}
(Queued, Coalesce) => {
let outcome = InputTerminalOutcome::Coalesced {
aggregate_id: InputId::new(), };
fields.terminal_outcome = Some(outcome.clone());
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: Coalesced,
});
effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
Coalesced
}
(
Accepted | Queued | Staged | Applied | AppliedPendingConsumption,
Abandon { reason },
) => {
let outcome = InputTerminalOutcome::Abandoned {
reason: reason.clone(),
};
fields.terminal_outcome = Some(outcome.clone());
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: Abandoned,
});
effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
Abandoned
}
(Accepted, ConsumeOnAccept) => {
let outcome = InputTerminalOutcome::Consumed;
fields.terminal_outcome = Some(outcome.clone());
effects.push(InputLifecycleEffect::InputLifecycleNotice {
new_state: Consumed,
});
effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
Consumed
}
_ => {
return Err(InputLifecycleError::InvalidTransition {
from: phase,
input: format!("{input:?}"),
});
}
};
Ok((next_phase, fields, effects))
}
pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
self.fields.terminal_outcome = Some(outcome);
}
pub fn stamp_receipt_metadata(&mut self, run_id: RunId, boundary_sequence: u64) {
self.fields.last_run_id = Some(run_id);
self.fields.last_boundary_sequence = Some(boundary_sequence);
}
}
impl InputLifecycleMutator for InputLifecycleAuthority {
fn apply(
&mut self,
input: InputLifecycleInput,
) -> Result<InputLifecycleTransition, InputLifecycleError> {
let from = self.phase;
let reason = format!("{input:?}");
let (next_phase, next_fields, effects) = self.evaluate(&input)?;
let now = Utc::now();
self.history.push(InputStateHistoryEntry {
timestamp: now,
from,
to: next_phase,
reason: Some(reason),
});
self.phase = next_phase;
self.fields = next_fields;
self.updated_at = now;
Ok(InputLifecycleTransition {
next_phase,
effects,
})
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::redundant_clone,
clippy::panic
)]
mod tests {
use super::*;
fn make_authority() -> InputLifecycleAuthority {
InputLifecycleAuthority::new()
}
fn make_at_phase(phase: InputLifecycleState) -> InputLifecycleAuthority {
InputLifecycleAuthority::with_phase(phase)
}
#[test]
fn accepted_to_queued() {
let mut auth = make_authority();
let t = auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Queued);
assert_eq!(auth.phase(), InputLifecycleState::Queued);
assert_eq!(auth.history().len(), 1);
assert_eq!(auth.history()[0].from, InputLifecycleState::Accepted);
assert_eq!(auth.history()[0].to, InputLifecycleState::Queued);
}
#[test]
fn queued_to_staged() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let run_id = RunId::new();
let t = auth
.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Staged);
assert_eq!(auth.last_run_id(), Some(&run_id));
assert!(
t.effects
.iter()
.any(|e| matches!(e, InputLifecycleEffect::RecordRunAssociation { .. }))
);
}
#[test]
fn staged_to_applied() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
let t = auth
.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Applied);
}
#[test]
fn applied_to_applied_pending_consumption() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
let t = auth
.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 42,
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::AppliedPendingConsumption);
assert_eq!(auth.last_boundary_sequence(), Some(42));
assert!(t.effects.iter().any(|e| matches!(
e,
InputLifecycleEffect::RecordBoundarySequence {
boundary_sequence: 42
}
)));
}
#[test]
fn applied_pending_to_consumed() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 1,
})
.unwrap();
let t = auth.apply(InputLifecycleInput::Consume).unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Consumed);
assert!(auth.is_terminal());
assert!(matches!(
auth.terminal_outcome(),
Some(InputTerminalOutcome::Consumed)
));
}
#[test]
fn full_happy_path_history() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 1,
})
.unwrap();
auth.apply(InputLifecycleInput::Consume).unwrap();
assert_eq!(auth.history().len(), 5);
}
#[test]
fn staged_to_queued_rollback() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Queued);
assert_eq!(auth.phase(), InputLifecycleState::Queued);
}
#[test]
fn mark_applied_rejects_wrong_run_id() {
let mut auth = make_authority();
let run_id = RunId::new();
let wrong_run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
let result = auth.apply(InputLifecycleInput::MarkApplied {
run_id: wrong_run_id,
});
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
InputLifecycleError::GuardFailed { .. }
));
assert_eq!(auth.phase(), InputLifecycleState::Staged);
}
#[test]
fn applied_pending_to_queued_rejected() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 1,
})
.unwrap();
let result = auth.apply(InputLifecycleInput::RollbackStaged);
assert!(result.is_err());
assert_eq!(auth.phase(), InputLifecycleState::AppliedPendingConsumption);
}
#[test]
fn consumed_rejects_all() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 1,
})
.unwrap();
auth.apply(InputLifecycleInput::Consume).unwrap();
let result = auth.apply(InputLifecycleInput::QueueAccepted);
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
InputLifecycleError::TerminalState { .. }
));
}
#[test]
fn superseded_rejects_all() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::Supersede).unwrap();
assert!(auth.is_terminal());
let result = auth.apply(InputLifecycleInput::QueueAccepted);
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
InputLifecycleError::TerminalState { .. }
));
}
#[test]
fn coalesced_rejects_all() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::Coalesce).unwrap();
assert!(auth.is_terminal());
let result = auth.apply(InputLifecycleInput::QueueAccepted);
assert!(result.is_err());
}
#[test]
fn abandoned_rejects_all() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Retired,
})
.unwrap();
assert!(auth.is_terminal());
let result = auth.apply(InputLifecycleInput::QueueAccepted);
assert!(result.is_err());
}
#[test]
fn abandon_from_accepted() {
let mut auth = make_authority();
let t = auth
.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Retired,
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
assert!(matches!(
auth.terminal_outcome(),
Some(InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::Retired,
})
));
}
#[test]
fn abandon_from_queued() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let t = auth
.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Reset,
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
}
#[test]
fn abandon_from_staged() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
let t = auth
.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Destroyed,
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
}
#[test]
fn abandon_from_applied() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
let t = auth
.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Cancelled,
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
}
#[test]
fn abandon_from_applied_pending() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 1,
})
.unwrap();
let t = auth
.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Retired,
})
.unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
}
#[test]
fn consume_on_accept_from_accepted() {
let mut auth = make_authority();
let t = auth.apply(InputLifecycleInput::ConsumeOnAccept).unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Consumed);
assert!(auth.is_terminal());
assert!(matches!(
auth.terminal_outcome(),
Some(InputTerminalOutcome::Consumed)
));
}
#[test]
fn consume_on_accept_from_queued_rejected() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let result = auth.apply(InputLifecycleInput::ConsumeOnAccept);
assert!(result.is_err());
}
#[test]
fn accepted_to_staged_invalid() {
let mut auth = make_authority();
let result = auth.apply(InputLifecycleInput::StageForRun {
run_id: RunId::new(),
});
assert!(result.is_err());
}
#[test]
fn accepted_to_applied_invalid() {
let mut auth = make_authority();
let result = auth.apply(InputLifecycleInput::MarkApplied {
run_id: RunId::new(),
});
assert!(result.is_err());
}
#[test]
fn queued_to_applied_invalid() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let result = auth.apply(InputLifecycleInput::MarkApplied {
run_id: RunId::new(),
});
assert!(result.is_err());
}
#[test]
fn queued_to_consumed_invalid() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let result = auth.apply(InputLifecycleInput::Consume);
assert!(result.is_err());
}
#[test]
fn supersede_from_queued() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let t = auth.apply(InputLifecycleInput::Supersede).unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Superseded);
assert!(auth.is_terminal());
assert!(
t.effects
.iter()
.any(|e| matches!(e, InputLifecycleEffect::RecordTerminalOutcome { .. }))
);
}
#[test]
fn supersede_from_accepted_rejected() {
let mut auth = make_authority();
let result = auth.apply(InputLifecycleInput::Supersede);
assert!(result.is_err());
}
#[test]
fn coalesce_from_queued() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let t = auth.apply(InputLifecycleInput::Coalesce).unwrap();
assert_eq!(t.next_phase, InputLifecycleState::Coalesced);
assert!(auth.is_terminal());
}
#[test]
fn coalesce_from_accepted_rejected() {
let mut auth = make_authority();
let result = auth.apply(InputLifecycleInput::Coalesce);
assert!(result.is_err());
}
#[test]
fn set_terminal_outcome_superseded() {
let mut auth = make_authority();
let superseder = InputId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::Supersede).unwrap();
auth.set_terminal_outcome(InputTerminalOutcome::Superseded {
superseded_by: superseder.clone(),
});
match auth.terminal_outcome() {
Some(InputTerminalOutcome::Superseded { superseded_by }) => {
assert_eq!(superseded_by, &superseder);
}
other => panic!("expected Superseded, got {other:?}"),
}
}
#[test]
fn set_terminal_outcome_coalesced() {
let mut auth = make_authority();
let aggregate = InputId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::Coalesce).unwrap();
auth.set_terminal_outcome(InputTerminalOutcome::Coalesced {
aggregate_id: aggregate.clone(),
});
match auth.terminal_outcome() {
Some(InputTerminalOutcome::Coalesced { aggregate_id }) => {
assert_eq!(aggregate_id, &aggregate);
}
other => panic!("expected Coalesced, got {other:?}"),
}
}
#[test]
fn history_records_reason() {
let mut auth = make_authority();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
assert!(auth.history()[0].reason.is_some());
assert!(
auth.history()[0]
.reason
.as_deref()
.is_some_and(|r| r.contains("QueueAccepted"))
);
}
#[test]
fn history_records_timestamps() {
let mut auth = make_authority();
let before = Utc::now();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
let after = Utc::now();
assert!(auth.history()[0].timestamp >= before);
assert!(auth.history()[0].timestamp <= after);
}
#[test]
fn can_accept_probes_without_mutation() {
let auth = make_authority();
assert!(auth.can_accept(&InputLifecycleInput::QueueAccepted));
assert!(!auth.can_accept(&InputLifecycleInput::Consume));
assert_eq!(auth.phase(), InputLifecycleState::Accepted);
}
#[test]
fn require_phase_accepts_allowed() {
let auth = make_authority();
assert!(
auth.require_phase(&[InputLifecycleState::Accepted, InputLifecycleState::Queued])
.is_ok()
);
}
#[test]
fn require_phase_rejects_disallowed() {
let auth = make_authority();
let result = auth.require_phase(&[InputLifecycleState::Queued]);
assert!(matches!(
result,
Err(InputLifecycleError::InvalidTransition { .. })
));
}
#[test]
fn phase_unchanged_on_rejected_transition() {
let mut auth = make_authority();
let _ = auth.apply(InputLifecycleInput::Consume);
assert_eq!(auth.phase(), InputLifecycleState::Accepted);
assert!(auth.history().is_empty());
}
#[test]
fn restore_preserves_all_fields() {
let run_id = RunId::new();
let auth = InputLifecycleAuthority::restore(
InputLifecycleState::Applied,
None,
Some(run_id.clone()),
Some(42),
2, vec![InputStateHistoryEntry {
timestamp: Utc::now(),
from: InputLifecycleState::Accepted,
to: InputLifecycleState::Queued,
reason: Some("restored".into()),
}],
Utc::now(),
);
assert_eq!(auth.phase(), InputLifecycleState::Applied);
assert_eq!(auth.last_run_id(), Some(&run_id));
assert_eq!(auth.last_boundary_sequence(), Some(42));
assert_eq!(auth.attempt_count(), 2);
assert_eq!(auth.history().len(), 1);
}
#[test]
fn restore_preserves_attempt_count_for_retry_bound() {
let auth = InputLifecycleAuthority::restore(
InputLifecycleState::Queued,
None,
None,
None,
2, Vec::new(),
Utc::now(),
);
let mut auth = auth;
auth.apply(InputLifecycleInput::StageForRun {
run_id: RunId::new(),
})
.unwrap();
assert_eq!(auth.attempt_count(), 3);
let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
assert_eq!(
t.next_phase,
InputLifecycleState::Abandoned,
"after 3 attempts (2 restored + 1 new), rollback should abandon"
);
}
#[test]
fn abandon_from_all_non_terminal_states() {
for phase in [
InputLifecycleState::Accepted,
InputLifecycleState::Queued,
InputLifecycleState::Staged,
InputLifecycleState::Applied,
InputLifecycleState::AppliedPendingConsumption,
] {
let mut auth = make_at_phase(phase);
let t = auth.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Destroyed,
});
assert!(
t.is_ok(),
"abandon should succeed from {phase:?}, got {t:?}"
);
assert!(auth.is_terminal());
}
}
#[test]
fn abandon_from_terminal_states_rejected() {
for phase in [
InputLifecycleState::Consumed,
InputLifecycleState::Superseded,
InputLifecycleState::Coalesced,
InputLifecycleState::Abandoned,
] {
let mut auth = make_at_phase(phase);
let result = auth.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Destroyed,
});
assert!(
result.is_err(),
"abandon should be rejected from terminal {phase:?}"
);
}
}
#[test]
fn consume_emits_notice_and_terminal_outcome() {
let mut auth = make_authority();
let run_id = RunId::new();
auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
auth.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.unwrap();
auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 1,
})
.unwrap();
let t = auth.apply(InputLifecycleInput::Consume).unwrap();
assert!(t.effects.iter().any(|e| matches!(
e,
InputLifecycleEffect::InputLifecycleNotice {
new_state: InputLifecycleState::Consumed
}
)));
assert!(t.effects.iter().any(|e| matches!(
e,
InputLifecycleEffect::RecordTerminalOutcome {
outcome: InputTerminalOutcome::Consumed
}
)));
}
#[test]
fn abandon_emits_notice_and_terminal_outcome() {
let mut auth = make_authority();
let t = auth
.apply(InputLifecycleInput::Abandon {
reason: InputAbandonReason::Cancelled,
})
.unwrap();
assert!(t.effects.iter().any(|e| matches!(
e,
InputLifecycleEffect::InputLifecycleNotice {
new_state: InputLifecycleState::Abandoned
}
)));
assert!(t.effects.iter().any(|e| matches!(
e,
InputLifecycleEffect::RecordTerminalOutcome {
outcome: InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::Cancelled,
},
}
)));
}
}