Skip to main content

meerkat_runtime/
input_lifecycle_authority.rs

1//! Generated-authority module for the InputLifecycle machine.
2//!
3//! This module provides typed enums and a sealed mutator trait that enforces
4//! all InputLifecycle state mutations flow through the machine authority.
5//! Handwritten shell code calls [`InputLifecycleAuthority::apply`] and executes
6//! returned effects; it cannot mutate canonical state directly.
7//!
8//! The transition table encoded here is the single source of truth, matching
9//! the machine schema in `meerkat-machine-schema/src/catalog/input_lifecycle.rs`:
10//!
11//! - 9 states: Accepted, Queued, Staged, Applied, AppliedPendingConsumption,
12//!   Consumed, Superseded, Coalesced, Abandoned
13//! - 9 inputs: QueueAccepted, StageForRun, RollbackStaged, MarkApplied,
14//!   MarkAppliedPendingConsumption, Consume, Supersede, Coalesce, Abandon
15//! - 4 fields: terminal_outcome, last_run_id, last_boundary_sequence, attempt_count
16//! - 4 terminal states: Consumed, Superseded, Coalesced, Abandoned
17//! - 4 effects: InputLifecycleNotice, RecordTerminalOutcome,
18//!   RecordRunAssociation, RecordBoundarySequence
19
20use chrono::{DateTime, Utc};
21use meerkat_core::lifecycle::{InputId, RunId};
22
23use crate::input_state::{
24    InputAbandonReason, InputLifecycleState, InputStateHistoryEntry, InputTerminalOutcome,
25};
26
27// ---------------------------------------------------------------------------
28// Typed input enum -- mirrors the machine schema's input variants
29// ---------------------------------------------------------------------------
30
31/// Typed inputs for the InputLifecycle machine.
32///
33/// Shell code classifies raw commands into these typed inputs, then calls
34/// [`InputLifecycleAuthority::apply`]. The authority decides transition legality.
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum InputLifecycleInput {
37    /// Accepted -> Queued: input has been policy-resolved and is ready for queueing.
38    QueueAccepted,
39    /// Queued -> Staged: input is being staged for a specific run.
40    StageForRun { run_id: RunId },
41    /// Staged -> Queued: rollback on run failure.
42    RollbackStaged,
43    /// Staged -> Applied: the input's boundary primitive has been applied.
44    MarkApplied { run_id: RunId },
45    /// Applied -> AppliedPendingConsumption: boundary receipt confirms application.
46    MarkAppliedPendingConsumption { boundary_sequence: u64 },
47    /// AppliedPendingConsumption -> Consumed: run completed successfully.
48    Consume,
49    /// Queued -> Superseded: a newer input with the same supersession scope arrived.
50    Supersede,
51    /// Queued -> Coalesced: input was merged into an aggregate.
52    Coalesce,
53    /// Any non-terminal -> Abandoned: input was abandoned (retire/reset/destroy/cancel).
54    Abandon { reason: InputAbandonReason },
55    /// Accepted -> Consumed: shortcut for Ignore+OnAccept policy (no queue/run cycle).
56    ConsumeOnAccept,
57}
58
59// ---------------------------------------------------------------------------
60// Typed effect enum -- mirrors the machine schema's effect variants
61// ---------------------------------------------------------------------------
62
63/// Effects emitted by InputLifecycle transitions.
64///
65/// Shell code receives these from [`InputLifecycleAuthority::apply`] and is
66/// responsible for executing the side effects (e.g. emitting events, persisting).
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum InputLifecycleEffect {
69    /// Notify observers of the new lifecycle state.
70    InputLifecycleNotice { new_state: InputLifecycleState },
71    /// Record the terminal outcome for this input.
72    RecordTerminalOutcome { outcome: InputTerminalOutcome },
73    /// Record the run association (which run touched this input).
74    RecordRunAssociation { run_id: RunId },
75    /// Record the boundary receipt sequence number.
76    RecordBoundarySequence { boundary_sequence: u64 },
77}
78
79// ---------------------------------------------------------------------------
80// Transition result
81// ---------------------------------------------------------------------------
82
83/// Successful transition outcome from the InputLifecycle authority.
84#[derive(Debug)]
85pub struct InputLifecycleTransition {
86    /// The phase after the transition.
87    pub next_phase: InputLifecycleState,
88    /// Effects to be executed by shell code.
89    pub effects: Vec<InputLifecycleEffect>,
90}
91
92// ---------------------------------------------------------------------------
93// Error type
94// ---------------------------------------------------------------------------
95
96/// Errors from the input lifecycle authority.
97#[derive(Debug, Clone, thiserror::Error)]
98#[non_exhaustive]
99pub enum InputLifecycleError {
100    /// The transition is not valid from the current state.
101    #[error("Invalid transition: {from:?} via {input} (current phase rejects this input)")]
102    InvalidTransition {
103        from: InputLifecycleState,
104        input: String,
105    },
106    /// The input is already in a terminal state.
107    #[error("Input is in terminal state {state:?}")]
108    TerminalState { state: InputLifecycleState },
109    /// A guard condition was not met.
110    #[error("Guard failed: {guard} (from {from:?})")]
111    GuardFailed {
112        from: InputLifecycleState,
113        guard: String,
114    },
115}
116
117// ---------------------------------------------------------------------------
118// Canonical machine state (fields)
119// ---------------------------------------------------------------------------
120
121/// Canonical machine-owned fields for InputLifecycle.
122///
123/// These fields are owned exclusively by the authority and cannot be mutated
124/// by handwritten shell code.
125/// Maximum number of stage → rollback cycles before the machine abandons the input.
126///
127/// This prevents unbounded `Queued → Staged → Queued` livelock when an executor
128/// keeps failing on the same input. The value is part of the machine schema.
129const MAX_STAGE_ATTEMPTS: u32 = 3;
130
131#[derive(Debug, Clone)]
132struct InputLifecycleFields {
133    terminal_outcome: Option<InputTerminalOutcome>,
134    last_run_id: Option<RunId>,
135    last_boundary_sequence: Option<u64>,
136    /// How many times this input has been staged for a run. Incremented on
137    /// `StageForRun`, checked on `RollbackStaged`. When `attempt_count >=
138    /// MAX_STAGE_ATTEMPTS`, `RollbackStaged` transitions to `Abandoned`
139    /// instead of `Queued`.
140    attempt_count: u32,
141}
142
143// ---------------------------------------------------------------------------
144// Sealed mutator trait -- only the authority implements this
145// ---------------------------------------------------------------------------
146
147mod sealed {
148    pub trait Sealed {}
149}
150
151/// Sealed trait for InputLifecycle state mutation.
152///
153/// Only [`InputLifecycleAuthority`] implements this. Handwritten code cannot
154/// create alternative implementations, ensuring single-source-of-truth
155/// semantics for lifecycle state.
156pub trait InputLifecycleMutator: sealed::Sealed {
157    /// Apply a typed input to the current machine state.
158    ///
159    /// Returns the transition result including next state and effects,
160    /// or an error if the transition is not legal from the current state.
161    fn apply(
162        &mut self,
163        input: InputLifecycleInput,
164    ) -> Result<InputLifecycleTransition, InputLifecycleError>;
165}
166
167// ---------------------------------------------------------------------------
168// Authority implementation
169// ---------------------------------------------------------------------------
170
171/// The canonical authority for InputLifecycle state.
172///
173/// Holds the canonical phase + fields and delegates all transitions through
174/// the encoded transition table. The authority OWNS the canonical state --
175/// callers cannot get `&mut` access to the inner fields.
176#[derive(Debug, Clone)]
177pub struct InputLifecycleAuthority {
178    /// Canonical phase.
179    phase: InputLifecycleState,
180    /// Canonical machine-owned fields.
181    fields: InputLifecycleFields,
182    /// State transition history.
183    history: Vec<InputStateHistoryEntry>,
184    /// Timestamp of last state change.
185    updated_at: DateTime<Utc>,
186}
187
188impl sealed::Sealed for InputLifecycleAuthority {}
189
190impl Default for InputLifecycleAuthority {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196impl InputLifecycleAuthority {
197    /// Create a new authority in the Accepted state.
198    pub fn new() -> Self {
199        Self::new_at(Utc::now())
200    }
201
202    /// Create a new authority in the Accepted state with a caller-owned timestamp.
203    ///
204    /// Use this when the caller already captured a canonical `now` to ensure
205    /// `updated_at` is consistent with sibling timestamps (e.g., `created_at`
206    /// on the owning `InputState`).
207    pub fn new_at(now: DateTime<Utc>) -> Self {
208        Self {
209            phase: InputLifecycleState::Accepted,
210            fields: InputLifecycleFields {
211                terminal_outcome: None,
212                last_run_id: None,
213                last_boundary_sequence: None,
214                attempt_count: 0,
215            },
216            history: Vec::new(),
217            updated_at: now,
218        }
219    }
220
221    /// Create an authority initialized to a specific phase (for recovery).
222    pub fn with_phase(phase: InputLifecycleState) -> Self {
223        Self {
224            phase,
225            fields: InputLifecycleFields {
226                terminal_outcome: None,
227                last_run_id: None,
228                last_boundary_sequence: None,
229                attempt_count: 0,
230            },
231            history: Vec::new(),
232            updated_at: Utc::now(),
233        }
234    }
235
236    /// Restore an authority from persisted state (for crash recovery).
237    ///
238    /// This reconstructs the authority from a previously-serialized snapshot,
239    /// including all canonical fields and history.
240    pub fn restore(
241        phase: InputLifecycleState,
242        terminal_outcome: Option<InputTerminalOutcome>,
243        last_run_id: Option<RunId>,
244        last_boundary_sequence: Option<u64>,
245        attempt_count: u32,
246        history: Vec<InputStateHistoryEntry>,
247        updated_at: DateTime<Utc>,
248    ) -> Self {
249        Self {
250            phase,
251            fields: InputLifecycleFields {
252                terminal_outcome,
253                last_run_id,
254                last_boundary_sequence,
255                attempt_count,
256            },
257            history,
258            updated_at,
259        }
260    }
261
262    /// Current phase (read from canonical state).
263    pub fn phase(&self) -> InputLifecycleState {
264        self.phase
265    }
266
267    /// Whether the current phase is terminal.
268    pub fn is_terminal(&self) -> bool {
269        self.phase.is_terminal()
270    }
271
272    /// Current terminal outcome (if in a terminal state).
273    pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
274        self.fields.terminal_outcome.as_ref()
275    }
276
277    /// Last run ID that touched this input.
278    pub fn last_run_id(&self) -> Option<&RunId> {
279        self.fields.last_run_id.as_ref()
280    }
281
282    /// Last boundary sequence number.
283    pub fn last_boundary_sequence(&self) -> Option<u64> {
284        self.fields.last_boundary_sequence
285    }
286
287    /// Number of times this input has been staged for a run.
288    pub fn attempt_count(&self) -> u32 {
289        self.fields.attempt_count
290    }
291
292    /// State transition history.
293    pub fn history(&self) -> &[InputStateHistoryEntry] {
294        &self.history
295    }
296
297    /// Timestamp of last state change.
298    pub fn updated_at(&self) -> DateTime<Utc> {
299        self.updated_at
300    }
301
302    /// Check if a transition is legal without applying it.
303    pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
304        self.evaluate(input).is_ok()
305    }
306
307    /// Require that the authority is in one of the given phases.
308    pub fn require_phase(
309        &self,
310        allowed: &[InputLifecycleState],
311    ) -> Result<(), InputLifecycleError> {
312        if allowed.contains(&self.phase) {
313            Ok(())
314        } else {
315            Err(InputLifecycleError::InvalidTransition {
316                from: self.phase,
317                input: format!("require_phase({allowed:?})"),
318            })
319        }
320    }
321
322    /// Evaluate a transition without committing it.
323    fn evaluate(
324        &self,
325        input: &InputLifecycleInput,
326    ) -> Result<
327        (
328            InputLifecycleState,
329            InputLifecycleFields,
330            Vec<InputLifecycleEffect>,
331        ),
332        InputLifecycleError,
333    > {
334        #[allow(clippy::enum_glob_use)]
335        use InputLifecycleInput::*;
336        #[allow(clippy::enum_glob_use)]
337        use InputLifecycleState::*;
338
339        let phase = self.phase;
340        let mut fields = self.fields.clone();
341        let mut effects = Vec::new();
342
343        // Terminal states reject ALL inputs.
344        if phase.is_terminal() {
345            return Err(InputLifecycleError::TerminalState { state: phase });
346        }
347
348        let next_phase = match (phase, input) {
349            // QueueAccepted: Accepted -> Queued
350            (Accepted, QueueAccepted) => {
351                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
352                Queued
353            }
354
355            // StageForRun: Queued -> Staged (increments attempt_count)
356            (Queued, StageForRun { run_id }) => {
357                fields.last_run_id = Some(run_id.clone());
358                fields.attempt_count += 1;
359                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Staged });
360                effects.push(InputLifecycleEffect::RecordRunAssociation {
361                    run_id: run_id.clone(),
362                });
363                Staged
364            }
365
366            // RollbackStaged: Staged -> Queued (if attempts remain)
367            //                  Staged -> Abandoned (if max attempts exhausted)
368            (Staged, RollbackStaged) => {
369                if fields.attempt_count >= MAX_STAGE_ATTEMPTS {
370                    let outcome = InputTerminalOutcome::Abandoned {
371                        reason: crate::input_state::InputAbandonReason::MaxAttemptsExhausted {
372                            attempts: fields.attempt_count,
373                        },
374                    };
375                    fields.terminal_outcome = Some(outcome.clone());
376                    effects.push(InputLifecycleEffect::InputLifecycleNotice {
377                        new_state: Abandoned,
378                    });
379                    effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
380                    Abandoned
381                } else {
382                    effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
383                    Queued
384                }
385            }
386
387            // MarkApplied: Staged -> Applied (guard: matches_last_run)
388            (Staged, MarkApplied { run_id }) => {
389                if self.fields.last_run_id.as_ref() != Some(run_id) {
390                    return Err(InputLifecycleError::GuardFailed {
391                        from: phase,
392                        guard: format!(
393                            "matches_last_run: expected {:?}, got {run_id:?}",
394                            self.fields.last_run_id
395                        ),
396                    });
397                }
398                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Applied });
399                Applied
400            }
401
402            // MarkAppliedPendingConsumption: Applied -> AppliedPendingConsumption
403            (Applied, MarkAppliedPendingConsumption { boundary_sequence }) => {
404                fields.last_boundary_sequence = Some(*boundary_sequence);
405                effects.push(InputLifecycleEffect::InputLifecycleNotice {
406                    new_state: AppliedPendingConsumption,
407                });
408                effects.push(InputLifecycleEffect::RecordBoundarySequence {
409                    boundary_sequence: *boundary_sequence,
410                });
411                AppliedPendingConsumption
412            }
413
414            // Consume: AppliedPendingConsumption -> Consumed
415            (AppliedPendingConsumption, Consume) => {
416                let outcome = InputTerminalOutcome::Consumed;
417                fields.terminal_outcome = Some(outcome.clone());
418                effects.push(InputLifecycleEffect::InputLifecycleNotice {
419                    new_state: Consumed,
420                });
421                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
422                Consumed
423            }
424
425            // Supersede: Queued -> Superseded
426            (Queued, Supersede) => {
427                let outcome = InputTerminalOutcome::Superseded {
428                    superseded_by: InputId::new(), // placeholder, caller sets via set_terminal_outcome
429                };
430                fields.terminal_outcome = Some(outcome.clone());
431                effects.push(InputLifecycleEffect::InputLifecycleNotice {
432                    new_state: Superseded,
433                });
434                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
435                Superseded
436            }
437
438            // Coalesce: Queued -> Coalesced
439            (Queued, Coalesce) => {
440                let outcome = InputTerminalOutcome::Coalesced {
441                    aggregate_id: InputId::new(), // placeholder, caller sets via set_terminal_outcome
442                };
443                fields.terminal_outcome = Some(outcome.clone());
444                effects.push(InputLifecycleEffect::InputLifecycleNotice {
445                    new_state: Coalesced,
446                });
447                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
448                Coalesced
449            }
450
451            // Abandon: any non-terminal -> Abandoned
452            (
453                Accepted | Queued | Staged | Applied | AppliedPendingConsumption,
454                Abandon { reason },
455            ) => {
456                let outcome = InputTerminalOutcome::Abandoned {
457                    reason: reason.clone(),
458                };
459                fields.terminal_outcome = Some(outcome.clone());
460                effects.push(InputLifecycleEffect::InputLifecycleNotice {
461                    new_state: Abandoned,
462                });
463                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
464                Abandoned
465            }
466
467            // ConsumeOnAccept: Accepted -> Consumed (Ignore+OnAccept shortcut)
468            (Accepted, ConsumeOnAccept) => {
469                let outcome = InputTerminalOutcome::Consumed;
470                fields.terminal_outcome = Some(outcome.clone());
471                effects.push(InputLifecycleEffect::InputLifecycleNotice {
472                    new_state: Consumed,
473                });
474                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
475                Consumed
476            }
477
478            // All other combinations are illegal.
479            _ => {
480                return Err(InputLifecycleError::InvalidTransition {
481                    from: phase,
482                    input: format!("{input:?}"),
483                });
484            }
485        };
486
487        Ok((next_phase, fields, effects))
488    }
489
490    /// Set the terminal outcome after a transition.
491    ///
492    /// Used by shell code for Superseded/Coalesced which need caller-provided
493    /// data (superseded_by / aggregate_id) that the authority's transition
494    /// doesn't know at evaluate-time.
495    pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
496        self.fields.terminal_outcome = Some(outcome);
497    }
498
499    /// Stamp receipt metadata onto the authority state.
500    ///
501    /// Used by the store layer after it generates an authoritative receipt
502    /// with a sequence number. This is NOT a lifecycle transition -- it's
503    /// operational metadata enrichment by the persistence layer.
504    pub fn stamp_receipt_metadata(&mut self, run_id: RunId, boundary_sequence: u64) {
505        self.fields.last_run_id = Some(run_id);
506        self.fields.last_boundary_sequence = Some(boundary_sequence);
507    }
508}
509
510impl InputLifecycleMutator for InputLifecycleAuthority {
511    fn apply(
512        &mut self,
513        input: InputLifecycleInput,
514    ) -> Result<InputLifecycleTransition, InputLifecycleError> {
515        let from = self.phase;
516        let reason = format!("{input:?}");
517        let (next_phase, next_fields, effects) = self.evaluate(&input)?;
518
519        let now = Utc::now();
520
521        // Record history.
522        self.history.push(InputStateHistoryEntry {
523            timestamp: now,
524            from,
525            to: next_phase,
526            reason: Some(reason),
527        });
528
529        // Commit: update canonical state.
530        self.phase = next_phase;
531        self.fields = next_fields;
532        self.updated_at = now;
533
534        Ok(InputLifecycleTransition {
535            next_phase,
536            effects,
537        })
538    }
539}
540
541// ---------------------------------------------------------------------------
542// Tests
543// ---------------------------------------------------------------------------
544
545#[cfg(test)]
546#[allow(
547    clippy::unwrap_used,
548    clippy::expect_used,
549    clippy::redundant_clone,
550    clippy::panic
551)]
552mod tests {
553    use super::*;
554
555    fn make_authority() -> InputLifecycleAuthority {
556        InputLifecycleAuthority::new()
557    }
558
559    fn make_at_phase(phase: InputLifecycleState) -> InputLifecycleAuthority {
560        InputLifecycleAuthority::with_phase(phase)
561    }
562
563    // ---- Happy path: full lifecycle ----
564
565    #[test]
566    fn accepted_to_queued() {
567        let mut auth = make_authority();
568        let t = auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
569        assert_eq!(t.next_phase, InputLifecycleState::Queued);
570        assert_eq!(auth.phase(), InputLifecycleState::Queued);
571        assert_eq!(auth.history().len(), 1);
572        assert_eq!(auth.history()[0].from, InputLifecycleState::Accepted);
573        assert_eq!(auth.history()[0].to, InputLifecycleState::Queued);
574    }
575
576    #[test]
577    fn queued_to_staged() {
578        let mut auth = make_authority();
579        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
580        let run_id = RunId::new();
581        let t = auth
582            .apply(InputLifecycleInput::StageForRun {
583                run_id: run_id.clone(),
584            })
585            .unwrap();
586        assert_eq!(t.next_phase, InputLifecycleState::Staged);
587        assert_eq!(auth.last_run_id(), Some(&run_id));
588        assert!(
589            t.effects
590                .iter()
591                .any(|e| matches!(e, InputLifecycleEffect::RecordRunAssociation { .. }))
592        );
593    }
594
595    #[test]
596    fn staged_to_applied() {
597        let mut auth = make_authority();
598        let run_id = RunId::new();
599        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
600        auth.apply(InputLifecycleInput::StageForRun {
601            run_id: run_id.clone(),
602        })
603        .unwrap();
604        let t = auth
605            .apply(InputLifecycleInput::MarkApplied {
606                run_id: run_id.clone(),
607            })
608            .unwrap();
609        assert_eq!(t.next_phase, InputLifecycleState::Applied);
610    }
611
612    #[test]
613    fn applied_to_applied_pending_consumption() {
614        let mut auth = make_authority();
615        let run_id = RunId::new();
616        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
617        auth.apply(InputLifecycleInput::StageForRun {
618            run_id: run_id.clone(),
619        })
620        .unwrap();
621        auth.apply(InputLifecycleInput::MarkApplied {
622            run_id: run_id.clone(),
623        })
624        .unwrap();
625        let t = auth
626            .apply(InputLifecycleInput::MarkAppliedPendingConsumption {
627                boundary_sequence: 42,
628            })
629            .unwrap();
630        assert_eq!(t.next_phase, InputLifecycleState::AppliedPendingConsumption);
631        assert_eq!(auth.last_boundary_sequence(), Some(42));
632        assert!(t.effects.iter().any(|e| matches!(
633            e,
634            InputLifecycleEffect::RecordBoundarySequence {
635                boundary_sequence: 42
636            }
637        )));
638    }
639
640    #[test]
641    fn applied_pending_to_consumed() {
642        let mut auth = make_authority();
643        let run_id = RunId::new();
644        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
645        auth.apply(InputLifecycleInput::StageForRun {
646            run_id: run_id.clone(),
647        })
648        .unwrap();
649        auth.apply(InputLifecycleInput::MarkApplied {
650            run_id: run_id.clone(),
651        })
652        .unwrap();
653        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
654            boundary_sequence: 1,
655        })
656        .unwrap();
657        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
658        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
659        assert!(auth.is_terminal());
660        assert!(matches!(
661            auth.terminal_outcome(),
662            Some(InputTerminalOutcome::Consumed)
663        ));
664    }
665
666    #[test]
667    fn full_happy_path_history() {
668        let mut auth = make_authority();
669        let run_id = RunId::new();
670        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
671        auth.apply(InputLifecycleInput::StageForRun {
672            run_id: run_id.clone(),
673        })
674        .unwrap();
675        auth.apply(InputLifecycleInput::MarkApplied {
676            run_id: run_id.clone(),
677        })
678        .unwrap();
679        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
680            boundary_sequence: 1,
681        })
682        .unwrap();
683        auth.apply(InputLifecycleInput::Consume).unwrap();
684        assert_eq!(auth.history().len(), 5);
685    }
686
687    // ---- Staged rollback ----
688
689    #[test]
690    fn staged_to_queued_rollback() {
691        let mut auth = make_authority();
692        let run_id = RunId::new();
693        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
694        auth.apply(InputLifecycleInput::StageForRun {
695            run_id: run_id.clone(),
696        })
697        .unwrap();
698        let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
699        assert_eq!(t.next_phase, InputLifecycleState::Queued);
700        assert_eq!(auth.phase(), InputLifecycleState::Queued);
701    }
702
703    // ---- MarkApplied guard: matches_last_run ----
704
705    #[test]
706    fn mark_applied_rejects_wrong_run_id() {
707        let mut auth = make_authority();
708        let run_id = RunId::new();
709        let wrong_run_id = RunId::new();
710        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
711        auth.apply(InputLifecycleInput::StageForRun {
712            run_id: run_id.clone(),
713        })
714        .unwrap();
715        let result = auth.apply(InputLifecycleInput::MarkApplied {
716            run_id: wrong_run_id,
717        });
718        assert!(result.is_err());
719        assert!(matches!(
720            result.unwrap_err(),
721            InputLifecycleError::GuardFailed { .. }
722        ));
723        // Phase unchanged on failure.
724        assert_eq!(auth.phase(), InputLifecycleState::Staged);
725    }
726
727    // ---- Hard rule: AppliedPendingConsumption -> Queued REJECTED ----
728
729    #[test]
730    fn applied_pending_to_queued_rejected() {
731        let mut auth = make_authority();
732        let run_id = RunId::new();
733        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
734        auth.apply(InputLifecycleInput::StageForRun {
735            run_id: run_id.clone(),
736        })
737        .unwrap();
738        auth.apply(InputLifecycleInput::MarkApplied {
739            run_id: run_id.clone(),
740        })
741        .unwrap();
742        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
743            boundary_sequence: 1,
744        })
745        .unwrap();
746
747        let result = auth.apply(InputLifecycleInput::RollbackStaged);
748        assert!(result.is_err());
749        assert_eq!(auth.phase(), InputLifecycleState::AppliedPendingConsumption);
750    }
751
752    // ---- Terminal states reject ALL transitions ----
753
754    #[test]
755    fn consumed_rejects_all() {
756        let mut auth = make_authority();
757        let run_id = RunId::new();
758        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
759        auth.apply(InputLifecycleInput::StageForRun {
760            run_id: run_id.clone(),
761        })
762        .unwrap();
763        auth.apply(InputLifecycleInput::MarkApplied {
764            run_id: run_id.clone(),
765        })
766        .unwrap();
767        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
768            boundary_sequence: 1,
769        })
770        .unwrap();
771        auth.apply(InputLifecycleInput::Consume).unwrap();
772
773        let result = auth.apply(InputLifecycleInput::QueueAccepted);
774        assert!(result.is_err());
775        assert!(matches!(
776            result.unwrap_err(),
777            InputLifecycleError::TerminalState { .. }
778        ));
779    }
780
781    #[test]
782    fn superseded_rejects_all() {
783        let mut auth = make_authority();
784        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
785        auth.apply(InputLifecycleInput::Supersede).unwrap();
786        assert!(auth.is_terminal());
787
788        let result = auth.apply(InputLifecycleInput::QueueAccepted);
789        assert!(result.is_err());
790        assert!(matches!(
791            result.unwrap_err(),
792            InputLifecycleError::TerminalState { .. }
793        ));
794    }
795
796    #[test]
797    fn coalesced_rejects_all() {
798        let mut auth = make_authority();
799        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
800        auth.apply(InputLifecycleInput::Coalesce).unwrap();
801        assert!(auth.is_terminal());
802
803        let result = auth.apply(InputLifecycleInput::QueueAccepted);
804        assert!(result.is_err());
805    }
806
807    #[test]
808    fn abandoned_rejects_all() {
809        let mut auth = make_authority();
810        auth.apply(InputLifecycleInput::Abandon {
811            reason: InputAbandonReason::Retired,
812        })
813        .unwrap();
814        assert!(auth.is_terminal());
815
816        let result = auth.apply(InputLifecycleInput::QueueAccepted);
817        assert!(result.is_err());
818    }
819
820    // ---- Abandon from various states ----
821
822    #[test]
823    fn abandon_from_accepted() {
824        let mut auth = make_authority();
825        let t = auth
826            .apply(InputLifecycleInput::Abandon {
827                reason: InputAbandonReason::Retired,
828            })
829            .unwrap();
830        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
831        assert!(matches!(
832            auth.terminal_outcome(),
833            Some(InputTerminalOutcome::Abandoned {
834                reason: InputAbandonReason::Retired,
835            })
836        ));
837    }
838
839    #[test]
840    fn abandon_from_queued() {
841        let mut auth = make_authority();
842        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
843        let t = auth
844            .apply(InputLifecycleInput::Abandon {
845                reason: InputAbandonReason::Reset,
846            })
847            .unwrap();
848        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
849    }
850
851    #[test]
852    fn abandon_from_staged() {
853        let mut auth = make_authority();
854        let run_id = RunId::new();
855        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
856        auth.apply(InputLifecycleInput::StageForRun {
857            run_id: run_id.clone(),
858        })
859        .unwrap();
860        let t = auth
861            .apply(InputLifecycleInput::Abandon {
862                reason: InputAbandonReason::Destroyed,
863            })
864            .unwrap();
865        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
866    }
867
868    #[test]
869    fn abandon_from_applied() {
870        let mut auth = make_authority();
871        let run_id = RunId::new();
872        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
873        auth.apply(InputLifecycleInput::StageForRun {
874            run_id: run_id.clone(),
875        })
876        .unwrap();
877        auth.apply(InputLifecycleInput::MarkApplied {
878            run_id: run_id.clone(),
879        })
880        .unwrap();
881        let t = auth
882            .apply(InputLifecycleInput::Abandon {
883                reason: InputAbandonReason::Cancelled,
884            })
885            .unwrap();
886        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
887    }
888
889    #[test]
890    fn abandon_from_applied_pending() {
891        let mut auth = make_authority();
892        let run_id = RunId::new();
893        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
894        auth.apply(InputLifecycleInput::StageForRun {
895            run_id: run_id.clone(),
896        })
897        .unwrap();
898        auth.apply(InputLifecycleInput::MarkApplied {
899            run_id: run_id.clone(),
900        })
901        .unwrap();
902        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
903            boundary_sequence: 1,
904        })
905        .unwrap();
906        let t = auth
907            .apply(InputLifecycleInput::Abandon {
908                reason: InputAbandonReason::Retired,
909            })
910            .unwrap();
911        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
912    }
913
914    // ---- ConsumeOnAccept (Ignore + OnAccept) ----
915
916    #[test]
917    fn consume_on_accept_from_accepted() {
918        let mut auth = make_authority();
919        let t = auth.apply(InputLifecycleInput::ConsumeOnAccept).unwrap();
920        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
921        assert!(auth.is_terminal());
922        assert!(matches!(
923            auth.terminal_outcome(),
924            Some(InputTerminalOutcome::Consumed)
925        ));
926    }
927
928    #[test]
929    fn consume_on_accept_from_queued_rejected() {
930        let mut auth = make_authority();
931        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
932        let result = auth.apply(InputLifecycleInput::ConsumeOnAccept);
933        assert!(result.is_err());
934    }
935
936    // ---- Invalid transitions ----
937
938    #[test]
939    fn accepted_to_staged_invalid() {
940        let mut auth = make_authority();
941        let result = auth.apply(InputLifecycleInput::StageForRun {
942            run_id: RunId::new(),
943        });
944        assert!(result.is_err());
945    }
946
947    #[test]
948    fn accepted_to_applied_invalid() {
949        let mut auth = make_authority();
950        let result = auth.apply(InputLifecycleInput::MarkApplied {
951            run_id: RunId::new(),
952        });
953        assert!(result.is_err());
954    }
955
956    #[test]
957    fn queued_to_applied_invalid() {
958        let mut auth = make_authority();
959        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
960        let result = auth.apply(InputLifecycleInput::MarkApplied {
961            run_id: RunId::new(),
962        });
963        assert!(result.is_err());
964    }
965
966    #[test]
967    fn queued_to_consumed_invalid() {
968        let mut auth = make_authority();
969        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
970        let result = auth.apply(InputLifecycleInput::Consume);
971        assert!(result.is_err());
972    }
973
974    // ---- Supersede / Coalesce ----
975
976    #[test]
977    fn supersede_from_queued() {
978        let mut auth = make_authority();
979        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
980        let t = auth.apply(InputLifecycleInput::Supersede).unwrap();
981        assert_eq!(t.next_phase, InputLifecycleState::Superseded);
982        assert!(auth.is_terminal());
983        assert!(
984            t.effects
985                .iter()
986                .any(|e| matches!(e, InputLifecycleEffect::RecordTerminalOutcome { .. }))
987        );
988    }
989
990    #[test]
991    fn supersede_from_accepted_rejected() {
992        let mut auth = make_authority();
993        let result = auth.apply(InputLifecycleInput::Supersede);
994        assert!(result.is_err());
995    }
996
997    #[test]
998    fn coalesce_from_queued() {
999        let mut auth = make_authority();
1000        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1001        let t = auth.apply(InputLifecycleInput::Coalesce).unwrap();
1002        assert_eq!(t.next_phase, InputLifecycleState::Coalesced);
1003        assert!(auth.is_terminal());
1004    }
1005
1006    #[test]
1007    fn coalesce_from_accepted_rejected() {
1008        let mut auth = make_authority();
1009        let result = auth.apply(InputLifecycleInput::Coalesce);
1010        assert!(result.is_err());
1011    }
1012
1013    // ---- Set terminal outcome ----
1014
1015    #[test]
1016    fn set_terminal_outcome_superseded() {
1017        let mut auth = make_authority();
1018        let superseder = InputId::new();
1019        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1020        auth.apply(InputLifecycleInput::Supersede).unwrap();
1021        auth.set_terminal_outcome(InputTerminalOutcome::Superseded {
1022            superseded_by: superseder.clone(),
1023        });
1024        match auth.terminal_outcome() {
1025            Some(InputTerminalOutcome::Superseded { superseded_by }) => {
1026                assert_eq!(superseded_by, &superseder);
1027            }
1028            other => panic!("expected Superseded, got {other:?}"),
1029        }
1030    }
1031
1032    #[test]
1033    fn set_terminal_outcome_coalesced() {
1034        let mut auth = make_authority();
1035        let aggregate = InputId::new();
1036        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1037        auth.apply(InputLifecycleInput::Coalesce).unwrap();
1038        auth.set_terminal_outcome(InputTerminalOutcome::Coalesced {
1039            aggregate_id: aggregate.clone(),
1040        });
1041        match auth.terminal_outcome() {
1042            Some(InputTerminalOutcome::Coalesced { aggregate_id }) => {
1043                assert_eq!(aggregate_id, &aggregate);
1044            }
1045            other => panic!("expected Coalesced, got {other:?}"),
1046        }
1047    }
1048
1049    // ---- History recording ----
1050
1051    #[test]
1052    fn history_records_reason() {
1053        let mut auth = make_authority();
1054        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1055        assert!(auth.history()[0].reason.is_some());
1056        assert!(
1057            auth.history()[0]
1058                .reason
1059                .as_deref()
1060                .is_some_and(|r| r.contains("QueueAccepted"))
1061        );
1062    }
1063
1064    #[test]
1065    fn history_records_timestamps() {
1066        let mut auth = make_authority();
1067        let before = Utc::now();
1068        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1069        let after = Utc::now();
1070        assert!(auth.history()[0].timestamp >= before);
1071        assert!(auth.history()[0].timestamp <= after);
1072    }
1073
1074    // ---- can_accept probing ----
1075
1076    #[test]
1077    fn can_accept_probes_without_mutation() {
1078        let auth = make_authority();
1079        assert!(auth.can_accept(&InputLifecycleInput::QueueAccepted));
1080        assert!(!auth.can_accept(&InputLifecycleInput::Consume));
1081        // Phase is still Accepted -- no mutation.
1082        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
1083    }
1084
1085    // ---- require_phase ----
1086
1087    #[test]
1088    fn require_phase_accepts_allowed() {
1089        let auth = make_authority();
1090        assert!(
1091            auth.require_phase(&[InputLifecycleState::Accepted, InputLifecycleState::Queued])
1092                .is_ok()
1093        );
1094    }
1095
1096    #[test]
1097    fn require_phase_rejects_disallowed() {
1098        let auth = make_authority();
1099        let result = auth.require_phase(&[InputLifecycleState::Queued]);
1100        assert!(matches!(
1101            result,
1102            Err(InputLifecycleError::InvalidTransition { .. })
1103        ));
1104    }
1105
1106    // ---- Phase unchanged on failure ----
1107
1108    #[test]
1109    fn phase_unchanged_on_rejected_transition() {
1110        let mut auth = make_authority();
1111        let _ = auth.apply(InputLifecycleInput::Consume);
1112        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
1113        assert!(auth.history().is_empty());
1114    }
1115
1116    // ---- Restore from persisted state ----
1117
1118    #[test]
1119    fn restore_preserves_all_fields() {
1120        let run_id = RunId::new();
1121        let auth = InputLifecycleAuthority::restore(
1122            InputLifecycleState::Applied,
1123            None,
1124            Some(run_id.clone()),
1125            Some(42),
1126            2, // attempt_count
1127            vec![InputStateHistoryEntry {
1128                timestamp: Utc::now(),
1129                from: InputLifecycleState::Accepted,
1130                to: InputLifecycleState::Queued,
1131                reason: Some("restored".into()),
1132            }],
1133            Utc::now(),
1134        );
1135        assert_eq!(auth.phase(), InputLifecycleState::Applied);
1136        assert_eq!(auth.last_run_id(), Some(&run_id));
1137        assert_eq!(auth.last_boundary_sequence(), Some(42));
1138        assert_eq!(auth.attempt_count(), 2);
1139        assert_eq!(auth.history().len(), 1);
1140    }
1141
1142    #[test]
1143    fn restore_preserves_attempt_count_for_retry_bound() {
1144        // Regression test: attempt_count must survive recovery so that
1145        // MAX_STAGE_ATTEMPTS is not reset on restart.
1146        let auth = InputLifecycleAuthority::restore(
1147            InputLifecycleState::Queued,
1148            None,
1149            None,
1150            None,
1151            2, // 2 prior attempts
1152            Vec::new(),
1153            Utc::now(),
1154        );
1155
1156        // One more stage + rollback should push to Abandoned (3 >= MAX_STAGE_ATTEMPTS)
1157        let mut auth = auth;
1158        auth.apply(InputLifecycleInput::StageForRun {
1159            run_id: RunId::new(),
1160        })
1161        .unwrap();
1162        assert_eq!(auth.attempt_count(), 3);
1163
1164        let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
1165        assert_eq!(
1166            t.next_phase,
1167            InputLifecycleState::Abandoned,
1168            "after 3 attempts (2 restored + 1 new), rollback should abandon"
1169        );
1170    }
1171
1172    // ---- Abandon from all non-terminal states ----
1173
1174    #[test]
1175    fn abandon_from_all_non_terminal_states() {
1176        for phase in [
1177            InputLifecycleState::Accepted,
1178            InputLifecycleState::Queued,
1179            InputLifecycleState::Staged,
1180            InputLifecycleState::Applied,
1181            InputLifecycleState::AppliedPendingConsumption,
1182        ] {
1183            let mut auth = make_at_phase(phase);
1184            let t = auth.apply(InputLifecycleInput::Abandon {
1185                reason: InputAbandonReason::Destroyed,
1186            });
1187            assert!(
1188                t.is_ok(),
1189                "abandon should succeed from {phase:?}, got {t:?}"
1190            );
1191            assert!(auth.is_terminal());
1192        }
1193    }
1194
1195    // ---- Abandon from terminal states rejected ----
1196
1197    #[test]
1198    fn abandon_from_terminal_states_rejected() {
1199        for phase in [
1200            InputLifecycleState::Consumed,
1201            InputLifecycleState::Superseded,
1202            InputLifecycleState::Coalesced,
1203            InputLifecycleState::Abandoned,
1204        ] {
1205            let mut auth = make_at_phase(phase);
1206            let result = auth.apply(InputLifecycleInput::Abandon {
1207                reason: InputAbandonReason::Destroyed,
1208            });
1209            assert!(
1210                result.is_err(),
1211                "abandon should be rejected from terminal {phase:?}"
1212            );
1213        }
1214    }
1215
1216    // ---- Effects emitted correctly ----
1217
1218    #[test]
1219    fn consume_emits_notice_and_terminal_outcome() {
1220        let mut auth = make_authority();
1221        let run_id = RunId::new();
1222        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1223        auth.apply(InputLifecycleInput::StageForRun {
1224            run_id: run_id.clone(),
1225        })
1226        .unwrap();
1227        auth.apply(InputLifecycleInput::MarkApplied {
1228            run_id: run_id.clone(),
1229        })
1230        .unwrap();
1231        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
1232            boundary_sequence: 1,
1233        })
1234        .unwrap();
1235        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
1236        assert!(t.effects.iter().any(|e| matches!(
1237            e,
1238            InputLifecycleEffect::InputLifecycleNotice {
1239                new_state: InputLifecycleState::Consumed
1240            }
1241        )));
1242        assert!(t.effects.iter().any(|e| matches!(
1243            e,
1244            InputLifecycleEffect::RecordTerminalOutcome {
1245                outcome: InputTerminalOutcome::Consumed
1246            }
1247        )));
1248    }
1249
1250    #[test]
1251    fn abandon_emits_notice_and_terminal_outcome() {
1252        let mut auth = make_authority();
1253        let t = auth
1254            .apply(InputLifecycleInput::Abandon {
1255                reason: InputAbandonReason::Cancelled,
1256            })
1257            .unwrap();
1258        assert!(t.effects.iter().any(|e| matches!(
1259            e,
1260            InputLifecycleEffect::InputLifecycleNotice {
1261                new_state: InputLifecycleState::Abandoned
1262            }
1263        )));
1264        assert!(t.effects.iter().any(|e| matches!(
1265            e,
1266            InputLifecycleEffect::RecordTerminalOutcome {
1267                outcome: InputTerminalOutcome::Abandoned {
1268                    reason: InputAbandonReason::Cancelled,
1269                },
1270            }
1271        )));
1272    }
1273}