Skip to main content

meerkat_runtime/
input_lifecycle_authority.rs

//! Generated-authority module for the InputLifecycle machine.
//!
//! This module provides typed enums and a sealed mutator trait that enforce
//! that all InputLifecycle state mutations flow through the machine authority.
//! Handwritten shell code calls [`InputLifecycleAuthority::apply`] and executes
//! returned effects; it cannot mutate canonical state directly.
//!
//! The transition table encoded here is the single source of truth, matching
//! the machine schema in `meerkat-machine-schema/src/catalog/input_lifecycle.rs`:
//!
//! - 9 states: Accepted, Queued, Staged, Applied, AppliedPendingConsumption,
//!   Consumed, Superseded, Coalesced, Abandoned
//! - 10 inputs: QueueAccepted, StageForRun, RollbackStaged, MarkApplied,
//!   MarkAppliedPendingConsumption, Consume, Supersede, Coalesce, Abandon,
//!   ConsumeOnAccept
//! - 3 fields: terminal_outcome, last_run_id, last_boundary_sequence
//! - 4 terminal states: Consumed, Superseded, Coalesced, Abandoned
//! - 4 effects: InputLifecycleNotice, RecordTerminalOutcome,
//!   RecordRunAssociation, RecordBoundarySequence
19
20use chrono::{DateTime, Utc};
21use meerkat_core::lifecycle::{InputId, RunId};
22
23use crate::input_state::{
24    InputAbandonReason, InputLifecycleState, InputStateHistoryEntry, InputTerminalOutcome,
25};
26
27// ---------------------------------------------------------------------------
28// Typed input enum -- mirrors the machine schema's input variants
29// ---------------------------------------------------------------------------
30
31/// Typed inputs for the InputLifecycle machine.
32///
33/// Shell code classifies raw commands into these typed inputs, then calls
34/// [`InputLifecycleAuthority::apply`]. The authority decides transition legality.
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum InputLifecycleInput {
37    /// Accepted -> Queued: input has been policy-resolved and is ready for queueing.
38    QueueAccepted,
39    /// Queued -> Staged: input is being staged for a specific run.
40    StageForRun { run_id: RunId },
41    /// Staged -> Queued: rollback on run failure.
42    RollbackStaged,
43    /// Staged -> Applied: the input's boundary primitive has been applied.
44    MarkApplied { run_id: RunId },
45    /// Applied -> AppliedPendingConsumption: boundary receipt confirms application.
46    MarkAppliedPendingConsumption { boundary_sequence: u64 },
47    /// AppliedPendingConsumption -> Consumed: run completed successfully.
48    Consume,
49    /// Queued -> Superseded: a newer input with the same supersession scope arrived.
50    Supersede,
51    /// Queued -> Coalesced: input was merged into an aggregate.
52    Coalesce,
53    /// Any non-terminal -> Abandoned: input was abandoned (retire/reset/destroy/cancel).
54    Abandon { reason: InputAbandonReason },
55    /// Accepted -> Consumed: shortcut for Ignore+OnAccept policy (no queue/run cycle).
56    ConsumeOnAccept,
57}
58
59// ---------------------------------------------------------------------------
60// Typed effect enum -- mirrors the machine schema's effect variants
61// ---------------------------------------------------------------------------
62
63/// Effects emitted by InputLifecycle transitions.
64///
65/// Shell code receives these from [`InputLifecycleAuthority::apply`] and is
66/// responsible for executing the side effects (e.g. emitting events, persisting).
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum InputLifecycleEffect {
69    /// Notify observers of the new lifecycle state.
70    InputLifecycleNotice { new_state: InputLifecycleState },
71    /// Record the terminal outcome for this input.
72    RecordTerminalOutcome { outcome: InputTerminalOutcome },
73    /// Record the run association (which run touched this input).
74    RecordRunAssociation { run_id: RunId },
75    /// Record the boundary receipt sequence number.
76    RecordBoundarySequence { boundary_sequence: u64 },
77}
78
79// ---------------------------------------------------------------------------
80// Transition result
81// ---------------------------------------------------------------------------
82
83/// Successful transition outcome from the InputLifecycle authority.
84#[derive(Debug)]
85pub struct InputLifecycleTransition {
86    /// The phase after the transition.
87    pub next_phase: InputLifecycleState,
88    /// Effects to be executed by shell code.
89    pub effects: Vec<InputLifecycleEffect>,
90}
91
92// ---------------------------------------------------------------------------
93// Error type
94// ---------------------------------------------------------------------------
95
96/// Errors from the input lifecycle authority.
97#[derive(Debug, Clone, thiserror::Error)]
98#[non_exhaustive]
99pub enum InputLifecycleError {
100    /// The transition is not valid from the current state.
101    #[error("Invalid transition: {from:?} via {input} (current phase rejects this input)")]
102    InvalidTransition {
103        from: InputLifecycleState,
104        input: String,
105    },
106    /// The input is already in a terminal state.
107    #[error("Input is in terminal state {state:?}")]
108    TerminalState { state: InputLifecycleState },
109    /// A guard condition was not met.
110    #[error("Guard failed: {guard} (from {from:?})")]
111    GuardFailed {
112        from: InputLifecycleState,
113        guard: String,
114    },
115}
116
117// ---------------------------------------------------------------------------
118// Canonical machine state (fields)
119// ---------------------------------------------------------------------------
120
121/// Canonical machine-owned fields for InputLifecycle.
122///
123/// These fields are owned exclusively by the authority and cannot be mutated
124/// by handwritten shell code.
125#[derive(Debug, Clone)]
126struct InputLifecycleFields {
127    terminal_outcome: Option<InputTerminalOutcome>,
128    last_run_id: Option<RunId>,
129    last_boundary_sequence: Option<u64>,
130}
131
132// ---------------------------------------------------------------------------
133// Sealed mutator trait -- only the authority implements this
134// ---------------------------------------------------------------------------
135
/// Private module holding the sealing marker: because `sealed` itself is not
/// public, code outside this file cannot name `Sealed` and therefore cannot
/// implement traits that require it.
mod sealed {
    /// Marker supertrait used to seal `InputLifecycleMutator`.
    pub trait Sealed {}
}
139
140/// Sealed trait for InputLifecycle state mutation.
141///
142/// Only [`InputLifecycleAuthority`] implements this. Handwritten code cannot
143/// create alternative implementations, ensuring single-source-of-truth
144/// semantics for lifecycle state.
145pub trait InputLifecycleMutator: sealed::Sealed {
146    /// Apply a typed input to the current machine state.
147    ///
148    /// Returns the transition result including next state and effects,
149    /// or an error if the transition is not legal from the current state.
150    fn apply(
151        &mut self,
152        input: InputLifecycleInput,
153    ) -> Result<InputLifecycleTransition, InputLifecycleError>;
154}
155
156// ---------------------------------------------------------------------------
157// Authority implementation
158// ---------------------------------------------------------------------------
159
160/// The canonical authority for InputLifecycle state.
161///
162/// Holds the canonical phase + fields and delegates all transitions through
163/// the encoded transition table. The authority OWNS the canonical state --
164/// callers cannot get `&mut` access to the inner fields.
165#[derive(Debug, Clone)]
166pub struct InputLifecycleAuthority {
167    /// Canonical phase.
168    phase: InputLifecycleState,
169    /// Canonical machine-owned fields.
170    fields: InputLifecycleFields,
171    /// State transition history.
172    history: Vec<InputStateHistoryEntry>,
173    /// Timestamp of last state change.
174    updated_at: DateTime<Utc>,
175}
176
177impl sealed::Sealed for InputLifecycleAuthority {}
178
179impl Default for InputLifecycleAuthority {
180    fn default() -> Self {
181        Self::new()
182    }
183}
184
185impl InputLifecycleAuthority {
186    /// Create a new authority in the Accepted state.
187    pub fn new() -> Self {
188        Self::new_at(Utc::now())
189    }
190
191    /// Create a new authority in the Accepted state with a caller-owned timestamp.
192    ///
193    /// Use this when the caller already captured a canonical `now` to ensure
194    /// `updated_at` is consistent with sibling timestamps (e.g., `created_at`
195    /// on the owning `InputState`).
196    pub fn new_at(now: DateTime<Utc>) -> Self {
197        Self {
198            phase: InputLifecycleState::Accepted,
199            fields: InputLifecycleFields {
200                terminal_outcome: None,
201                last_run_id: None,
202                last_boundary_sequence: None,
203            },
204            history: Vec::new(),
205            updated_at: now,
206        }
207    }
208
209    /// Create an authority initialized to a specific phase (for recovery).
210    pub fn with_phase(phase: InputLifecycleState) -> Self {
211        Self {
212            phase,
213            fields: InputLifecycleFields {
214                terminal_outcome: None,
215                last_run_id: None,
216                last_boundary_sequence: None,
217            },
218            history: Vec::new(),
219            updated_at: Utc::now(),
220        }
221    }
222
223    /// Restore an authority from persisted state (for crash recovery).
224    ///
225    /// This reconstructs the authority from a previously-serialized snapshot,
226    /// including all canonical fields and history.
227    pub fn restore(
228        phase: InputLifecycleState,
229        terminal_outcome: Option<InputTerminalOutcome>,
230        last_run_id: Option<RunId>,
231        last_boundary_sequence: Option<u64>,
232        history: Vec<InputStateHistoryEntry>,
233        updated_at: DateTime<Utc>,
234    ) -> Self {
235        Self {
236            phase,
237            fields: InputLifecycleFields {
238                terminal_outcome,
239                last_run_id,
240                last_boundary_sequence,
241            },
242            history,
243            updated_at,
244        }
245    }
246
247    /// Current phase (read from canonical state).
248    pub fn phase(&self) -> InputLifecycleState {
249        self.phase
250    }
251
252    /// Whether the current phase is terminal.
253    pub fn is_terminal(&self) -> bool {
254        self.phase.is_terminal()
255    }
256
257    /// Current terminal outcome (if in a terminal state).
258    pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
259        self.fields.terminal_outcome.as_ref()
260    }
261
262    /// Last run ID that touched this input.
263    pub fn last_run_id(&self) -> Option<&RunId> {
264        self.fields.last_run_id.as_ref()
265    }
266
267    /// Last boundary sequence number.
268    pub fn last_boundary_sequence(&self) -> Option<u64> {
269        self.fields.last_boundary_sequence
270    }
271
272    /// State transition history.
273    pub fn history(&self) -> &[InputStateHistoryEntry] {
274        &self.history
275    }
276
277    /// Timestamp of last state change.
278    pub fn updated_at(&self) -> DateTime<Utc> {
279        self.updated_at
280    }
281
282    /// Check if a transition is legal without applying it.
283    pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
284        self.evaluate(input).is_ok()
285    }
286
287    /// Require that the authority is in one of the given phases.
288    pub fn require_phase(
289        &self,
290        allowed: &[InputLifecycleState],
291    ) -> Result<(), InputLifecycleError> {
292        if allowed.contains(&self.phase) {
293            Ok(())
294        } else {
295            Err(InputLifecycleError::InvalidTransition {
296                from: self.phase,
297                input: format!("require_phase({allowed:?})"),
298            })
299        }
300    }
301
302    /// Evaluate a transition without committing it.
303    fn evaluate(
304        &self,
305        input: &InputLifecycleInput,
306    ) -> Result<
307        (
308            InputLifecycleState,
309            InputLifecycleFields,
310            Vec<InputLifecycleEffect>,
311        ),
312        InputLifecycleError,
313    > {
314        #[allow(clippy::enum_glob_use)]
315        use InputLifecycleInput::*;
316        #[allow(clippy::enum_glob_use)]
317        use InputLifecycleState::*;
318
319        let phase = self.phase;
320        let mut fields = self.fields.clone();
321        let mut effects = Vec::new();
322
323        // Terminal states reject ALL inputs.
324        if phase.is_terminal() {
325            return Err(InputLifecycleError::TerminalState { state: phase });
326        }
327
328        let next_phase = match (phase, input) {
329            // QueueAccepted: Accepted -> Queued
330            (Accepted, QueueAccepted) => {
331                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
332                Queued
333            }
334
335            // StageForRun: Queued -> Staged
336            (Queued, StageForRun { run_id }) => {
337                fields.last_run_id = Some(run_id.clone());
338                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Staged });
339                effects.push(InputLifecycleEffect::RecordRunAssociation {
340                    run_id: run_id.clone(),
341                });
342                Staged
343            }
344
345            // RollbackStaged: Staged -> Queued
346            (Staged, RollbackStaged) => {
347                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Queued });
348                Queued
349            }
350
351            // MarkApplied: Staged -> Applied (guard: matches_last_run)
352            (Staged, MarkApplied { run_id }) => {
353                if self.fields.last_run_id.as_ref() != Some(run_id) {
354                    return Err(InputLifecycleError::GuardFailed {
355                        from: phase,
356                        guard: format!(
357                            "matches_last_run: expected {:?}, got {run_id:?}",
358                            self.fields.last_run_id
359                        ),
360                    });
361                }
362                effects.push(InputLifecycleEffect::InputLifecycleNotice { new_state: Applied });
363                Applied
364            }
365
366            // MarkAppliedPendingConsumption: Applied -> AppliedPendingConsumption
367            (Applied, MarkAppliedPendingConsumption { boundary_sequence }) => {
368                fields.last_boundary_sequence = Some(*boundary_sequence);
369                effects.push(InputLifecycleEffect::InputLifecycleNotice {
370                    new_state: AppliedPendingConsumption,
371                });
372                effects.push(InputLifecycleEffect::RecordBoundarySequence {
373                    boundary_sequence: *boundary_sequence,
374                });
375                AppliedPendingConsumption
376            }
377
378            // Consume: AppliedPendingConsumption -> Consumed
379            (AppliedPendingConsumption, Consume) => {
380                let outcome = InputTerminalOutcome::Consumed;
381                fields.terminal_outcome = Some(outcome.clone());
382                effects.push(InputLifecycleEffect::InputLifecycleNotice {
383                    new_state: Consumed,
384                });
385                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
386                Consumed
387            }
388
389            // Supersede: Queued -> Superseded
390            (Queued, Supersede) => {
391                let outcome = InputTerminalOutcome::Superseded {
392                    superseded_by: InputId::new(), // placeholder, caller sets via set_terminal_outcome
393                };
394                fields.terminal_outcome = Some(outcome.clone());
395                effects.push(InputLifecycleEffect::InputLifecycleNotice {
396                    new_state: Superseded,
397                });
398                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
399                Superseded
400            }
401
402            // Coalesce: Queued -> Coalesced
403            (Queued, Coalesce) => {
404                let outcome = InputTerminalOutcome::Coalesced {
405                    aggregate_id: InputId::new(), // placeholder, caller sets via set_terminal_outcome
406                };
407                fields.terminal_outcome = Some(outcome.clone());
408                effects.push(InputLifecycleEffect::InputLifecycleNotice {
409                    new_state: Coalesced,
410                });
411                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
412                Coalesced
413            }
414
415            // Abandon: any non-terminal -> Abandoned
416            (
417                Accepted | Queued | Staged | Applied | AppliedPendingConsumption,
418                Abandon { reason },
419            ) => {
420                let outcome = InputTerminalOutcome::Abandoned {
421                    reason: reason.clone(),
422                };
423                fields.terminal_outcome = Some(outcome.clone());
424                effects.push(InputLifecycleEffect::InputLifecycleNotice {
425                    new_state: Abandoned,
426                });
427                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
428                Abandoned
429            }
430
431            // ConsumeOnAccept: Accepted -> Consumed (Ignore+OnAccept shortcut)
432            (Accepted, ConsumeOnAccept) => {
433                let outcome = InputTerminalOutcome::Consumed;
434                fields.terminal_outcome = Some(outcome.clone());
435                effects.push(InputLifecycleEffect::InputLifecycleNotice {
436                    new_state: Consumed,
437                });
438                effects.push(InputLifecycleEffect::RecordTerminalOutcome { outcome });
439                Consumed
440            }
441
442            // All other combinations are illegal.
443            _ => {
444                return Err(InputLifecycleError::InvalidTransition {
445                    from: phase,
446                    input: format!("{input:?}"),
447                });
448            }
449        };
450
451        Ok((next_phase, fields, effects))
452    }
453
454    /// Set the terminal outcome after a transition.
455    ///
456    /// Used by shell code for Superseded/Coalesced which need caller-provided
457    /// data (superseded_by / aggregate_id) that the authority's transition
458    /// doesn't know at evaluate-time.
459    pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
460        self.fields.terminal_outcome = Some(outcome);
461    }
462
463    /// Stamp receipt metadata onto the authority state.
464    ///
465    /// Used by the store layer after it generates an authoritative receipt
466    /// with a sequence number. This is NOT a lifecycle transition -- it's
467    /// operational metadata enrichment by the persistence layer.
468    pub fn stamp_receipt_metadata(&mut self, run_id: RunId, boundary_sequence: u64) {
469        self.fields.last_run_id = Some(run_id);
470        self.fields.last_boundary_sequence = Some(boundary_sequence);
471    }
472}
473
474impl InputLifecycleMutator for InputLifecycleAuthority {
475    fn apply(
476        &mut self,
477        input: InputLifecycleInput,
478    ) -> Result<InputLifecycleTransition, InputLifecycleError> {
479        let from = self.phase;
480        let reason = format!("{input:?}");
481        let (next_phase, next_fields, effects) = self.evaluate(&input)?;
482
483        let now = Utc::now();
484
485        // Record history.
486        self.history.push(InputStateHistoryEntry {
487            timestamp: now,
488            from,
489            to: next_phase,
490            reason: Some(reason),
491        });
492
493        // Commit: update canonical state.
494        self.phase = next_phase;
495        self.fields = next_fields;
496        self.updated_at = now;
497
498        Ok(InputLifecycleTransition {
499            next_phase,
500            effects,
501        })
502    }
503}
504
505// ---------------------------------------------------------------------------
506// Tests
507// ---------------------------------------------------------------------------
508
509#[cfg(test)]
510#[allow(
511    clippy::unwrap_used,
512    clippy::expect_used,
513    clippy::redundant_clone,
514    clippy::panic
515)]
516mod tests {
517    use super::*;
518
519    fn make_authority() -> InputLifecycleAuthority {
520        InputLifecycleAuthority::new()
521    }
522
523    fn make_at_phase(phase: InputLifecycleState) -> InputLifecycleAuthority {
524        InputLifecycleAuthority::with_phase(phase)
525    }
526
527    // ---- Happy path: full lifecycle ----
528
529    #[test]
530    fn accepted_to_queued() {
531        let mut auth = make_authority();
532        let t = auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
533        assert_eq!(t.next_phase, InputLifecycleState::Queued);
534        assert_eq!(auth.phase(), InputLifecycleState::Queued);
535        assert_eq!(auth.history().len(), 1);
536        assert_eq!(auth.history()[0].from, InputLifecycleState::Accepted);
537        assert_eq!(auth.history()[0].to, InputLifecycleState::Queued);
538    }
539
540    #[test]
541    fn queued_to_staged() {
542        let mut auth = make_authority();
543        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
544        let run_id = RunId::new();
545        let t = auth
546            .apply(InputLifecycleInput::StageForRun {
547                run_id: run_id.clone(),
548            })
549            .unwrap();
550        assert_eq!(t.next_phase, InputLifecycleState::Staged);
551        assert_eq!(auth.last_run_id(), Some(&run_id));
552        assert!(
553            t.effects
554                .iter()
555                .any(|e| matches!(e, InputLifecycleEffect::RecordRunAssociation { .. }))
556        );
557    }
558
559    #[test]
560    fn staged_to_applied() {
561        let mut auth = make_authority();
562        let run_id = RunId::new();
563        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
564        auth.apply(InputLifecycleInput::StageForRun {
565            run_id: run_id.clone(),
566        })
567        .unwrap();
568        let t = auth
569            .apply(InputLifecycleInput::MarkApplied {
570                run_id: run_id.clone(),
571            })
572            .unwrap();
573        assert_eq!(t.next_phase, InputLifecycleState::Applied);
574    }
575
576    #[test]
577    fn applied_to_applied_pending_consumption() {
578        let mut auth = make_authority();
579        let run_id = RunId::new();
580        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
581        auth.apply(InputLifecycleInput::StageForRun {
582            run_id: run_id.clone(),
583        })
584        .unwrap();
585        auth.apply(InputLifecycleInput::MarkApplied {
586            run_id: run_id.clone(),
587        })
588        .unwrap();
589        let t = auth
590            .apply(InputLifecycleInput::MarkAppliedPendingConsumption {
591                boundary_sequence: 42,
592            })
593            .unwrap();
594        assert_eq!(t.next_phase, InputLifecycleState::AppliedPendingConsumption);
595        assert_eq!(auth.last_boundary_sequence(), Some(42));
596        assert!(t.effects.iter().any(|e| matches!(
597            e,
598            InputLifecycleEffect::RecordBoundarySequence {
599                boundary_sequence: 42
600            }
601        )));
602    }
603
604    #[test]
605    fn applied_pending_to_consumed() {
606        let mut auth = make_authority();
607        let run_id = RunId::new();
608        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
609        auth.apply(InputLifecycleInput::StageForRun {
610            run_id: run_id.clone(),
611        })
612        .unwrap();
613        auth.apply(InputLifecycleInput::MarkApplied {
614            run_id: run_id.clone(),
615        })
616        .unwrap();
617        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
618            boundary_sequence: 1,
619        })
620        .unwrap();
621        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
622        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
623        assert!(auth.is_terminal());
624        assert!(matches!(
625            auth.terminal_outcome(),
626            Some(InputTerminalOutcome::Consumed)
627        ));
628    }
629
630    #[test]
631    fn full_happy_path_history() {
632        let mut auth = make_authority();
633        let run_id = RunId::new();
634        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
635        auth.apply(InputLifecycleInput::StageForRun {
636            run_id: run_id.clone(),
637        })
638        .unwrap();
639        auth.apply(InputLifecycleInput::MarkApplied {
640            run_id: run_id.clone(),
641        })
642        .unwrap();
643        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
644            boundary_sequence: 1,
645        })
646        .unwrap();
647        auth.apply(InputLifecycleInput::Consume).unwrap();
648        assert_eq!(auth.history().len(), 5);
649    }
650
651    // ---- Staged rollback ----
652
653    #[test]
654    fn staged_to_queued_rollback() {
655        let mut auth = make_authority();
656        let run_id = RunId::new();
657        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
658        auth.apply(InputLifecycleInput::StageForRun {
659            run_id: run_id.clone(),
660        })
661        .unwrap();
662        let t = auth.apply(InputLifecycleInput::RollbackStaged).unwrap();
663        assert_eq!(t.next_phase, InputLifecycleState::Queued);
664        assert_eq!(auth.phase(), InputLifecycleState::Queued);
665    }
666
667    // ---- MarkApplied guard: matches_last_run ----
668
669    #[test]
670    fn mark_applied_rejects_wrong_run_id() {
671        let mut auth = make_authority();
672        let run_id = RunId::new();
673        let wrong_run_id = RunId::new();
674        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
675        auth.apply(InputLifecycleInput::StageForRun {
676            run_id: run_id.clone(),
677        })
678        .unwrap();
679        let result = auth.apply(InputLifecycleInput::MarkApplied {
680            run_id: wrong_run_id,
681        });
682        assert!(result.is_err());
683        assert!(matches!(
684            result.unwrap_err(),
685            InputLifecycleError::GuardFailed { .. }
686        ));
687        // Phase unchanged on failure.
688        assert_eq!(auth.phase(), InputLifecycleState::Staged);
689    }
690
691    // ---- Hard rule: AppliedPendingConsumption -> Queued REJECTED ----
692
693    #[test]
694    fn applied_pending_to_queued_rejected() {
695        let mut auth = make_authority();
696        let run_id = RunId::new();
697        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
698        auth.apply(InputLifecycleInput::StageForRun {
699            run_id: run_id.clone(),
700        })
701        .unwrap();
702        auth.apply(InputLifecycleInput::MarkApplied {
703            run_id: run_id.clone(),
704        })
705        .unwrap();
706        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
707            boundary_sequence: 1,
708        })
709        .unwrap();
710
711        let result = auth.apply(InputLifecycleInput::RollbackStaged);
712        assert!(result.is_err());
713        assert_eq!(auth.phase(), InputLifecycleState::AppliedPendingConsumption);
714    }
715
716    // ---- Terminal states reject ALL transitions ----
717
718    #[test]
719    fn consumed_rejects_all() {
720        let mut auth = make_authority();
721        let run_id = RunId::new();
722        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
723        auth.apply(InputLifecycleInput::StageForRun {
724            run_id: run_id.clone(),
725        })
726        .unwrap();
727        auth.apply(InputLifecycleInput::MarkApplied {
728            run_id: run_id.clone(),
729        })
730        .unwrap();
731        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
732            boundary_sequence: 1,
733        })
734        .unwrap();
735        auth.apply(InputLifecycleInput::Consume).unwrap();
736
737        let result = auth.apply(InputLifecycleInput::QueueAccepted);
738        assert!(result.is_err());
739        assert!(matches!(
740            result.unwrap_err(),
741            InputLifecycleError::TerminalState { .. }
742        ));
743    }
744
745    #[test]
746    fn superseded_rejects_all() {
747        let mut auth = make_authority();
748        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
749        auth.apply(InputLifecycleInput::Supersede).unwrap();
750        assert!(auth.is_terminal());
751
752        let result = auth.apply(InputLifecycleInput::QueueAccepted);
753        assert!(result.is_err());
754        assert!(matches!(
755            result.unwrap_err(),
756            InputLifecycleError::TerminalState { .. }
757        ));
758    }
759
760    #[test]
761    fn coalesced_rejects_all() {
762        let mut auth = make_authority();
763        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
764        auth.apply(InputLifecycleInput::Coalesce).unwrap();
765        assert!(auth.is_terminal());
766
767        let result = auth.apply(InputLifecycleInput::QueueAccepted);
768        assert!(result.is_err());
769    }
770
771    #[test]
772    fn abandoned_rejects_all() {
773        let mut auth = make_authority();
774        auth.apply(InputLifecycleInput::Abandon {
775            reason: InputAbandonReason::Retired,
776        })
777        .unwrap();
778        assert!(auth.is_terminal());
779
780        let result = auth.apply(InputLifecycleInput::QueueAccepted);
781        assert!(result.is_err());
782    }
783
784    // ---- Abandon from various states ----
785
786    #[test]
787    fn abandon_from_accepted() {
788        let mut auth = make_authority();
789        let t = auth
790            .apply(InputLifecycleInput::Abandon {
791                reason: InputAbandonReason::Retired,
792            })
793            .unwrap();
794        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
795        assert!(matches!(
796            auth.terminal_outcome(),
797            Some(InputTerminalOutcome::Abandoned {
798                reason: InputAbandonReason::Retired,
799            })
800        ));
801    }
802
803    #[test]
804    fn abandon_from_queued() {
805        let mut auth = make_authority();
806        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
807        let t = auth
808            .apply(InputLifecycleInput::Abandon {
809                reason: InputAbandonReason::Reset,
810            })
811            .unwrap();
812        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
813    }
814
815    #[test]
816    fn abandon_from_staged() {
817        let mut auth = make_authority();
818        let run_id = RunId::new();
819        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
820        auth.apply(InputLifecycleInput::StageForRun {
821            run_id: run_id.clone(),
822        })
823        .unwrap();
824        let t = auth
825            .apply(InputLifecycleInput::Abandon {
826                reason: InputAbandonReason::Destroyed,
827            })
828            .unwrap();
829        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
830    }
831
832    #[test]
833    fn abandon_from_applied() {
834        let mut auth = make_authority();
835        let run_id = RunId::new();
836        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
837        auth.apply(InputLifecycleInput::StageForRun {
838            run_id: run_id.clone(),
839        })
840        .unwrap();
841        auth.apply(InputLifecycleInput::MarkApplied {
842            run_id: run_id.clone(),
843        })
844        .unwrap();
845        let t = auth
846            .apply(InputLifecycleInput::Abandon {
847                reason: InputAbandonReason::Cancelled,
848            })
849            .unwrap();
850        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
851    }
852
853    #[test]
854    fn abandon_from_applied_pending() {
855        let mut auth = make_authority();
856        let run_id = RunId::new();
857        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
858        auth.apply(InputLifecycleInput::StageForRun {
859            run_id: run_id.clone(),
860        })
861        .unwrap();
862        auth.apply(InputLifecycleInput::MarkApplied {
863            run_id: run_id.clone(),
864        })
865        .unwrap();
866        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
867            boundary_sequence: 1,
868        })
869        .unwrap();
870        let t = auth
871            .apply(InputLifecycleInput::Abandon {
872                reason: InputAbandonReason::Retired,
873            })
874            .unwrap();
875        assert_eq!(t.next_phase, InputLifecycleState::Abandoned);
876    }
877
878    // ---- ConsumeOnAccept (Ignore + OnAccept) ----
879
880    #[test]
881    fn consume_on_accept_from_accepted() {
882        let mut auth = make_authority();
883        let t = auth.apply(InputLifecycleInput::ConsumeOnAccept).unwrap();
884        assert_eq!(t.next_phase, InputLifecycleState::Consumed);
885        assert!(auth.is_terminal());
886        assert!(matches!(
887            auth.terminal_outcome(),
888            Some(InputTerminalOutcome::Consumed)
889        ));
890    }
891
892    #[test]
893    fn consume_on_accept_from_queued_rejected() {
894        let mut auth = make_authority();
895        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
896        let result = auth.apply(InputLifecycleInput::ConsumeOnAccept);
897        assert!(result.is_err());
898    }
899
900    // ---- Invalid transitions ----
901
902    #[test]
903    fn accepted_to_staged_invalid() {
904        let mut auth = make_authority();
905        let result = auth.apply(InputLifecycleInput::StageForRun {
906            run_id: RunId::new(),
907        });
908        assert!(result.is_err());
909    }
910
911    #[test]
912    fn accepted_to_applied_invalid() {
913        let mut auth = make_authority();
914        let result = auth.apply(InputLifecycleInput::MarkApplied {
915            run_id: RunId::new(),
916        });
917        assert!(result.is_err());
918    }
919
920    #[test]
921    fn queued_to_applied_invalid() {
922        let mut auth = make_authority();
923        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
924        let result = auth.apply(InputLifecycleInput::MarkApplied {
925            run_id: RunId::new(),
926        });
927        assert!(result.is_err());
928    }
929
930    #[test]
931    fn queued_to_consumed_invalid() {
932        let mut auth = make_authority();
933        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
934        let result = auth.apply(InputLifecycleInput::Consume);
935        assert!(result.is_err());
936    }
937
938    // ---- Supersede / Coalesce ----
939
940    #[test]
941    fn supersede_from_queued() {
942        let mut auth = make_authority();
943        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
944        let t = auth.apply(InputLifecycleInput::Supersede).unwrap();
945        assert_eq!(t.next_phase, InputLifecycleState::Superseded);
946        assert!(auth.is_terminal());
947        assert!(
948            t.effects
949                .iter()
950                .any(|e| matches!(e, InputLifecycleEffect::RecordTerminalOutcome { .. }))
951        );
952    }
953
954    #[test]
955    fn supersede_from_accepted_rejected() {
956        let mut auth = make_authority();
957        let result = auth.apply(InputLifecycleInput::Supersede);
958        assert!(result.is_err());
959    }
960
961    #[test]
962    fn coalesce_from_queued() {
963        let mut auth = make_authority();
964        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
965        let t = auth.apply(InputLifecycleInput::Coalesce).unwrap();
966        assert_eq!(t.next_phase, InputLifecycleState::Coalesced);
967        assert!(auth.is_terminal());
968    }
969
970    #[test]
971    fn coalesce_from_accepted_rejected() {
972        let mut auth = make_authority();
973        let result = auth.apply(InputLifecycleInput::Coalesce);
974        assert!(result.is_err());
975    }
976
977    // ---- Set terminal outcome ----
978
979    #[test]
980    fn set_terminal_outcome_superseded() {
981        let mut auth = make_authority();
982        let superseder = InputId::new();
983        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
984        auth.apply(InputLifecycleInput::Supersede).unwrap();
985        auth.set_terminal_outcome(InputTerminalOutcome::Superseded {
986            superseded_by: superseder.clone(),
987        });
988        match auth.terminal_outcome() {
989            Some(InputTerminalOutcome::Superseded { superseded_by }) => {
990                assert_eq!(superseded_by, &superseder);
991            }
992            other => panic!("expected Superseded, got {other:?}"),
993        }
994    }
995
996    #[test]
997    fn set_terminal_outcome_coalesced() {
998        let mut auth = make_authority();
999        let aggregate = InputId::new();
1000        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1001        auth.apply(InputLifecycleInput::Coalesce).unwrap();
1002        auth.set_terminal_outcome(InputTerminalOutcome::Coalesced {
1003            aggregate_id: aggregate.clone(),
1004        });
1005        match auth.terminal_outcome() {
1006            Some(InputTerminalOutcome::Coalesced { aggregate_id }) => {
1007                assert_eq!(aggregate_id, &aggregate);
1008            }
1009            other => panic!("expected Coalesced, got {other:?}"),
1010        }
1011    }
1012
1013    // ---- History recording ----
1014
1015    #[test]
1016    fn history_records_reason() {
1017        let mut auth = make_authority();
1018        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1019        assert!(auth.history()[0].reason.is_some());
1020        assert!(
1021            auth.history()[0]
1022                .reason
1023                .as_deref()
1024                .is_some_and(|r| r.contains("QueueAccepted"))
1025        );
1026    }
1027
1028    #[test]
1029    fn history_records_timestamps() {
1030        let mut auth = make_authority();
1031        let before = Utc::now();
1032        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1033        let after = Utc::now();
1034        assert!(auth.history()[0].timestamp >= before);
1035        assert!(auth.history()[0].timestamp <= after);
1036    }
1037
1038    // ---- can_accept probing ----
1039
1040    #[test]
1041    fn can_accept_probes_without_mutation() {
1042        let auth = make_authority();
1043        assert!(auth.can_accept(&InputLifecycleInput::QueueAccepted));
1044        assert!(!auth.can_accept(&InputLifecycleInput::Consume));
1045        // Phase is still Accepted -- no mutation.
1046        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
1047    }
1048
1049    // ---- require_phase ----
1050
1051    #[test]
1052    fn require_phase_accepts_allowed() {
1053        let auth = make_authority();
1054        assert!(
1055            auth.require_phase(&[InputLifecycleState::Accepted, InputLifecycleState::Queued])
1056                .is_ok()
1057        );
1058    }
1059
1060    #[test]
1061    fn require_phase_rejects_disallowed() {
1062        let auth = make_authority();
1063        let result = auth.require_phase(&[InputLifecycleState::Queued]);
1064        assert!(matches!(
1065            result,
1066            Err(InputLifecycleError::InvalidTransition { .. })
1067        ));
1068    }
1069
1070    // ---- Phase unchanged on failure ----
1071
1072    #[test]
1073    fn phase_unchanged_on_rejected_transition() {
1074        let mut auth = make_authority();
1075        let _ = auth.apply(InputLifecycleInput::Consume);
1076        assert_eq!(auth.phase(), InputLifecycleState::Accepted);
1077        assert!(auth.history().is_empty());
1078    }
1079
1080    // ---- Restore from persisted state ----
1081
1082    #[test]
1083    fn restore_preserves_all_fields() {
1084        let run_id = RunId::new();
1085        let auth = InputLifecycleAuthority::restore(
1086            InputLifecycleState::Applied,
1087            None,
1088            Some(run_id.clone()),
1089            Some(42),
1090            vec![InputStateHistoryEntry {
1091                timestamp: Utc::now(),
1092                from: InputLifecycleState::Accepted,
1093                to: InputLifecycleState::Queued,
1094                reason: Some("restored".into()),
1095            }],
1096            Utc::now(),
1097        );
1098        assert_eq!(auth.phase(), InputLifecycleState::Applied);
1099        assert_eq!(auth.last_run_id(), Some(&run_id));
1100        assert_eq!(auth.last_boundary_sequence(), Some(42));
1101        assert_eq!(auth.history().len(), 1);
1102    }
1103
1104    // ---- Abandon from all non-terminal states ----
1105
1106    #[test]
1107    fn abandon_from_all_non_terminal_states() {
1108        for phase in [
1109            InputLifecycleState::Accepted,
1110            InputLifecycleState::Queued,
1111            InputLifecycleState::Staged,
1112            InputLifecycleState::Applied,
1113            InputLifecycleState::AppliedPendingConsumption,
1114        ] {
1115            let mut auth = make_at_phase(phase);
1116            let t = auth.apply(InputLifecycleInput::Abandon {
1117                reason: InputAbandonReason::Destroyed,
1118            });
1119            assert!(
1120                t.is_ok(),
1121                "abandon should succeed from {phase:?}, got {t:?}"
1122            );
1123            assert!(auth.is_terminal());
1124        }
1125    }
1126
1127    // ---- Abandon from terminal states rejected ----
1128
1129    #[test]
1130    fn abandon_from_terminal_states_rejected() {
1131        for phase in [
1132            InputLifecycleState::Consumed,
1133            InputLifecycleState::Superseded,
1134            InputLifecycleState::Coalesced,
1135            InputLifecycleState::Abandoned,
1136        ] {
1137            let mut auth = make_at_phase(phase);
1138            let result = auth.apply(InputLifecycleInput::Abandon {
1139                reason: InputAbandonReason::Destroyed,
1140            });
1141            assert!(
1142                result.is_err(),
1143                "abandon should be rejected from terminal {phase:?}"
1144            );
1145        }
1146    }
1147
1148    // ---- Effects emitted correctly ----
1149
1150    #[test]
1151    fn consume_emits_notice_and_terminal_outcome() {
1152        let mut auth = make_authority();
1153        let run_id = RunId::new();
1154        auth.apply(InputLifecycleInput::QueueAccepted).unwrap();
1155        auth.apply(InputLifecycleInput::StageForRun {
1156            run_id: run_id.clone(),
1157        })
1158        .unwrap();
1159        auth.apply(InputLifecycleInput::MarkApplied {
1160            run_id: run_id.clone(),
1161        })
1162        .unwrap();
1163        auth.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
1164            boundary_sequence: 1,
1165        })
1166        .unwrap();
1167        let t = auth.apply(InputLifecycleInput::Consume).unwrap();
1168        assert!(t.effects.iter().any(|e| matches!(
1169            e,
1170            InputLifecycleEffect::InputLifecycleNotice {
1171                new_state: InputLifecycleState::Consumed
1172            }
1173        )));
1174        assert!(t.effects.iter().any(|e| matches!(
1175            e,
1176            InputLifecycleEffect::RecordTerminalOutcome {
1177                outcome: InputTerminalOutcome::Consumed
1178            }
1179        )));
1180    }
1181
1182    #[test]
1183    fn abandon_emits_notice_and_terminal_outcome() {
1184        let mut auth = make_authority();
1185        let t = auth
1186            .apply(InputLifecycleInput::Abandon {
1187                reason: InputAbandonReason::Cancelled,
1188            })
1189            .unwrap();
1190        assert!(t.effects.iter().any(|e| matches!(
1191            e,
1192            InputLifecycleEffect::InputLifecycleNotice {
1193                new_state: InputLifecycleState::Abandoned
1194            }
1195        )));
1196        assert!(t.effects.iter().any(|e| matches!(
1197            e,
1198            InputLifecycleEffect::RecordTerminalOutcome {
1199                outcome: InputTerminalOutcome::Abandoned {
1200                    reason: InputAbandonReason::Cancelled,
1201                },
1202            }
1203        )));
1204    }
1205}