Skip to main content

meerkat_runtime/
input_state.rs

1//! ยง13 InputState -- the lifecycle state machine for inputs.
2//!
3//! Every accepted input has an InputState that tracks its progression
4//! through the runtime lifecycle. Canonical lifecycle state is owned by
5//! [`InputLifecycleAuthority`] -- all transitions flow through
6//! `authority().apply()`.
7
8use chrono::{DateTime, Utc};
9use meerkat_core::lifecycle::{InputId, RunId};
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::PolicyVersion;
13use crate::input::Input;
14use crate::input_lifecycle_authority::{
15    InputLifecycleAuthority, InputLifecycleError, InputLifecycleInput, InputLifecycleMutator,
16    InputLifecycleTransition,
17};
18use crate::policy::PolicyDecision;
19
20/// The lifecycle state of an input.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
22#[serde(rename_all = "snake_case")]
23#[non_exhaustive]
24pub enum InputLifecycleState {
25    /// Input has been accepted but not yet queued.
26    Accepted,
27    /// Input is in the processing queue.
28    Queued,
29    /// Input has been staged at a run boundary.
30    Staged,
31    /// Input's boundary primitive has been applied.
32    Applied,
33    /// Applied and pending consumption (run in progress).
34    AppliedPendingConsumption,
35    /// Input has been fully consumed (terminal).
36    Consumed,
37    /// Input was superseded by a newer input (terminal).
38    Superseded,
39    /// Input was coalesced into an aggregate (terminal).
40    Coalesced,
41    /// Input was abandoned (retire/reset/destroy) (terminal).
42    Abandoned,
43}
44
45impl InputLifecycleState {
46    /// Check if this is a terminal state (no further transitions possible).
47    pub fn is_terminal(&self) -> bool {
48        matches!(
49            self,
50            Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
51        )
52    }
53}
54
55/// Why an input was abandoned.
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58#[non_exhaustive]
59pub enum InputAbandonReason {
60    /// Runtime was retired.
61    Retired,
62    /// Runtime was reset.
63    Reset,
64    /// Runtime was destroyed.
65    Destroyed,
66    /// Input was explicitly cancelled.
67    Cancelled,
68    /// The input was staged for execution too many times without success.
69    MaxAttemptsExhausted { attempts: u32 },
70}
71
72/// Terminal outcome of an input.
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "outcome_type", rename_all = "snake_case")]
75#[non_exhaustive]
76pub enum InputTerminalOutcome {
77    /// Successfully consumed by a run.
78    Consumed,
79    /// Superseded by a newer input.
80    Superseded { superseded_by: InputId },
81    /// Coalesced into an aggregate.
82    Coalesced { aggregate_id: InputId },
83    /// Abandoned.
84    Abandoned { reason: InputAbandonReason },
85}
86
87/// A single entry in the input's state history.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct InputStateHistoryEntry {
90    /// When this transition occurred.
91    pub timestamp: DateTime<Utc>,
92    /// The state transitioned from.
93    pub from: InputLifecycleState,
94    /// The state transitioned to.
95    pub to: InputLifecycleState,
96    /// Optional reason for the transition.
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub reason: Option<String>,
99}
100
101/// Snapshot of the policy that was applied to this input.
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct PolicySnapshot {
104    /// The policy version.
105    pub version: PolicyVersion,
106    /// The decision that was made.
107    pub decision: PolicyDecision,
108}
109
110/// How a derived input can be reconstructed after crash recovery.
111#[derive(Debug, Clone, Serialize, Deserialize)]
112#[serde(tag = "source_type", rename_all = "snake_case")]
113#[non_exhaustive]
114pub enum ReconstructionSource {
115    /// Derived from a projection rule.
116    Projection {
117        rule_id: String,
118        source_event_id: String,
119    },
120    /// Derived from coalescing.
121    Coalescing { source_input_ids: Vec<InputId> },
122}
123
124/// An event on an input's state (for event sourcing).
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct InputStateEvent {
127    /// When the event occurred.
128    pub timestamp: DateTime<Utc>,
129    /// The new lifecycle state.
130    pub state: InputLifecycleState,
131    /// Optional detail.
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub detail: Option<String>,
134}
135
136/// Full state of an input in the runtime.
137///
138/// Canonical lifecycle state (phase, terminal_outcome, history, last_run_id,
139/// last_boundary_sequence, updated_at) is owned by the embedded
140/// [`InputLifecycleAuthority`]. All transitions must flow through
141/// [`InputState::apply`]. Direct mutation of canonical fields is not possible.
142///
143/// Operational fields (input_id, policy, durability, idempotency_key, etc.)
144/// are caller-managed and do not participate in the state machine.
145#[derive(Debug, Clone)]
146pub struct InputState {
147    /// The input ID.
148    pub input_id: InputId,
149    /// The canonical lifecycle authority -- owns phase, terminal_outcome,
150    /// history, last_run_id, last_boundary_sequence, updated_at.
151    authority: InputLifecycleAuthority,
152    /// Policy snapshot applied to this input.
153    pub policy: Option<PolicySnapshot>,
154    /// Durability requirement (retained for recovery correctness).
155    pub durability: Option<crate::input::InputDurability>,
156    /// Idempotency key (retained for dedup across restarts).
157    pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
158    /// Number of times this input has been recovered.
159    pub recovery_count: u32,
160    /// How to reconstruct this input (for derived inputs).
161    pub reconstruction_source: Option<ReconstructionSource>,
162    /// Persisted input payload used to rebuild queued work after recovery.
163    pub persisted_input: Option<Input>,
164    /// When the input was created.
165    pub created_at: DateTime<Utc>,
166}
167
168impl InputState {
169    /// Create a new InputState in the Accepted state.
170    pub fn new_accepted(input_id: InputId) -> Self {
171        let now = Utc::now();
172        Self {
173            input_id,
174            authority: InputLifecycleAuthority::new_at(now),
175            policy: None,
176            durability: None,
177            idempotency_key: None,
178            recovery_count: 0,
179            reconstruction_source: None,
180            persisted_input: None,
181            created_at: now,
182        }
183    }
184
185    // ---- Canonical field accessors (delegate to authority) ----
186
187    /// Current lifecycle state.
188    pub fn current_state(&self) -> InputLifecycleState {
189        self.authority.phase()
190    }
191
192    /// Check if the input is in a terminal state.
193    pub fn is_terminal(&self) -> bool {
194        self.authority.is_terminal()
195    }
196
197    /// Terminal outcome (set when state becomes terminal).
198    pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
199        self.authority.terminal_outcome()
200    }
201
202    /// State transition history.
203    pub fn history(&self) -> &[InputStateHistoryEntry] {
204        self.authority.history()
205    }
206
207    /// Last run that touched this input.
208    pub fn last_run_id(&self) -> Option<&RunId> {
209        self.authority.last_run_id()
210    }
211
212    /// Boundary receipt sequence for the last applied run.
213    pub fn last_boundary_sequence(&self) -> Option<u64> {
214        self.authority.last_boundary_sequence()
215    }
216
217    /// When the input was last updated.
218    pub fn updated_at(&self) -> DateTime<Utc> {
219        self.authority.updated_at()
220    }
221
222    /// Number of times this input has been staged for a run (retry budget).
223    ///
224    /// Owned by [`InputLifecycleAuthority`]. No shadow copy on InputState.
225    pub fn attempt_count(&self) -> u32 {
226        self.authority.attempt_count()
227    }
228
229    // ---- Authority access ----
230
231    /// Apply a lifecycle input through the authority.
232    ///
233    /// This is the ONLY way to mutate canonical lifecycle state.
234    pub fn apply(
235        &mut self,
236        input: InputLifecycleInput,
237    ) -> Result<InputLifecycleTransition, InputLifecycleError> {
238        self.authority.apply(input)
239    }
240
241    /// Check if a transition would be accepted without applying it.
242    pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
243        self.authority.can_accept(input)
244    }
245
246    /// Set the terminal outcome after a transition (for Superseded/Coalesced
247    /// which need caller-provided data).
248    pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
249        self.authority.set_terminal_outcome(outcome);
250    }
251
252    /// Get a reference to the authority (for advanced read-only operations).
253    pub fn authority(&self) -> &InputLifecycleAuthority {
254        &self.authority
255    }
256
257    /// Get a mutable reference to the authority (for recovery paths).
258    pub fn authority_mut(&mut self) -> &mut InputLifecycleAuthority {
259        &mut self.authority
260    }
261}
262
263// ---------------------------------------------------------------------------
264// Custom Serialize/Deserialize to preserve wire format
265// ---------------------------------------------------------------------------
266
267/// Serialization helper to keep the same wire format as the old InputState.
268#[derive(Serialize, Deserialize)]
269struct InputStateSerde {
270    input_id: InputId,
271    current_state: InputLifecycleState,
272    #[serde(skip_serializing_if = "Option::is_none")]
273    policy: Option<PolicySnapshot>,
274    #[serde(skip_serializing_if = "Option::is_none")]
275    terminal_outcome: Option<InputTerminalOutcome>,
276    #[serde(skip_serializing_if = "Option::is_none")]
277    durability: Option<crate::input::InputDurability>,
278    #[serde(skip_serializing_if = "Option::is_none")]
279    idempotency_key: Option<crate::identifiers::IdempotencyKey>,
280    #[serde(default)]
281    attempt_count: u32,
282    #[serde(default)]
283    recovery_count: u32,
284    #[serde(default, skip_serializing_if = "Vec::is_empty")]
285    history: Vec<InputStateHistoryEntry>,
286    #[serde(skip_serializing_if = "Option::is_none")]
287    reconstruction_source: Option<ReconstructionSource>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    persisted_input: Option<Input>,
290    #[serde(default, skip_serializing_if = "Option::is_none")]
291    last_run_id: Option<RunId>,
292    #[serde(default, skip_serializing_if = "Option::is_none")]
293    last_boundary_sequence: Option<u64>,
294    created_at: DateTime<Utc>,
295    updated_at: DateTime<Utc>,
296}
297
298impl Serialize for InputState {
299    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
300        let helper = InputStateSerde {
301            input_id: self.input_id.clone(),
302            current_state: self.authority.phase(),
303            policy: self.policy.clone(),
304            terminal_outcome: self.authority.terminal_outcome().cloned(),
305            durability: self.durability,
306            idempotency_key: self.idempotency_key.clone(),
307            attempt_count: self.authority.attempt_count(),
308            recovery_count: self.recovery_count,
309            history: self.authority.history().to_vec(),
310            reconstruction_source: self.reconstruction_source.clone(),
311            persisted_input: self.persisted_input.clone(),
312            last_run_id: self.authority.last_run_id().cloned(),
313            last_boundary_sequence: self.authority.last_boundary_sequence(),
314            created_at: self.created_at,
315            updated_at: self.authority.updated_at(),
316        };
317        helper.serialize(serializer)
318    }
319}
320
321impl<'de> Deserialize<'de> for InputState {
322    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
323        let helper = InputStateSerde::deserialize(deserializer)?;
324        let authority = InputLifecycleAuthority::restore(
325            helper.current_state,
326            helper.terminal_outcome,
327            helper.last_run_id,
328            helper.last_boundary_sequence,
329            helper.attempt_count,
330            helper.history,
331            helper.updated_at,
332        );
333        Ok(InputState {
334            input_id: helper.input_id,
335            authority,
336            policy: helper.policy,
337            durability: helper.durability,
338            idempotency_key: helper.idempotency_key,
339            recovery_count: helper.recovery_count,
340            reconstruction_source: helper.reconstruction_source,
341            persisted_input: helper.persisted_input,
342            created_at: helper.created_at,
343        })
344    }
345}
346
347#[cfg(test)]
348#[allow(clippy::unwrap_used)]
349mod tests {
350    use super::*;
351    use crate::policy::{
352        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
353    };
354    use meerkat_core::ops::{OpEvent, OperationId};
355
356    #[test]
357    fn attempt_count_accessor_delegates_to_authority() {
358        let state = InputState::new_accepted(InputId::new());
359        assert_eq!(state.attempt_count(), 0);
360    }
361
362    #[test]
363    fn lifecycle_state_terminal() {
364        assert!(InputLifecycleState::Consumed.is_terminal());
365        assert!(InputLifecycleState::Superseded.is_terminal());
366        assert!(InputLifecycleState::Coalesced.is_terminal());
367        assert!(InputLifecycleState::Abandoned.is_terminal());
368
369        assert!(!InputLifecycleState::Accepted.is_terminal());
370        assert!(!InputLifecycleState::Queued.is_terminal());
371        assert!(!InputLifecycleState::Staged.is_terminal());
372        assert!(!InputLifecycleState::Applied.is_terminal());
373        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
374    }
375
376    #[test]
377    fn lifecycle_state_serde() {
378        for state in [
379            InputLifecycleState::Accepted,
380            InputLifecycleState::Queued,
381            InputLifecycleState::Staged,
382            InputLifecycleState::Applied,
383            InputLifecycleState::AppliedPendingConsumption,
384            InputLifecycleState::Consumed,
385            InputLifecycleState::Superseded,
386            InputLifecycleState::Coalesced,
387            InputLifecycleState::Abandoned,
388        ] {
389            let json = serde_json::to_value(state).unwrap();
390            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
391            assert_eq!(state, parsed);
392        }
393    }
394
395    #[test]
396    fn input_state_new_accepted() {
397        let id = InputId::new();
398        let state = InputState::new_accepted(id.clone());
399        assert_eq!(state.input_id, id);
400        assert_eq!(state.current_state(), InputLifecycleState::Accepted);
401        assert!(!state.is_terminal());
402        assert!(state.history().is_empty());
403        assert!(state.terminal_outcome().is_none());
404        assert!(state.policy.is_none());
405    }
406
407    #[test]
408    fn input_state_serde_roundtrip() {
409        let mut state = InputState::new_accepted(InputId::new());
410        state.policy = Some(PolicySnapshot {
411            version: PolicyVersion(1),
412            decision: PolicyDecision {
413                apply_mode: ApplyMode::StageRunStart,
414                wake_mode: WakeMode::WakeIfIdle,
415                queue_mode: QueueMode::Fifo,
416                consume_point: ConsumePoint::OnRunComplete,
417                drain_policy: DrainPolicy::QueueNextTurn,
418                routing_disposition: RoutingDisposition::Queue,
419                record_transcript: true,
420                emit_operator_content: true,
421                policy_version: PolicyVersion(1),
422            },
423        });
424        state.apply(InputLifecycleInput::QueueAccepted).unwrap();
425
426        let json = serde_json::to_value(&state).unwrap();
427        let parsed: InputState = serde_json::from_value(json).unwrap();
428        assert_eq!(parsed.input_id, state.input_id);
429        assert_eq!(parsed.current_state(), state.current_state());
430        assert_eq!(parsed.history().len(), 1);
431    }
432
433    #[test]
434    fn input_state_deserializes_legacy_persisted_input_tags() {
435        let mut continuation_state = InputState::new_accepted(InputId::new());
436        continuation_state.persisted_input = Some(Input::Continuation(
437            crate::input::ContinuationInput::detached_background_op_completed(),
438        ));
439        let mut continuation_json = serde_json::to_value(&continuation_state).unwrap();
440        continuation_json["persisted_input"]["input_type"] =
441            serde_json::Value::String("system_generated".into());
442        let parsed: InputState = serde_json::from_value(continuation_json).unwrap();
443        assert!(matches!(
444            parsed.persisted_input,
445            Some(Input::Continuation(_))
446        ));
447
448        let mut operation_state = InputState::new_accepted(InputId::new());
449        operation_state.persisted_input = Some(Input::Operation(crate::input::OperationInput {
450            header: crate::input::InputHeader {
451                id: InputId::new(),
452                timestamp: Utc::now(),
453                source: crate::input::InputOrigin::System,
454                durability: crate::input::InputDurability::Derived,
455                visibility: crate::input::InputVisibility::default(),
456                idempotency_key: None,
457                supersession_key: None,
458                correlation_id: None,
459            },
460            operation_id: OperationId::new(),
461            event: OpEvent::Cancelled {
462                id: OperationId::new(),
463            },
464        }));
465        let mut operation_json = serde_json::to_value(&operation_state).unwrap();
466        operation_json["persisted_input"]["input_type"] =
467            serde_json::Value::String("projected".into());
468        let parsed: InputState = serde_json::from_value(operation_json).unwrap();
469        assert!(matches!(parsed.persisted_input, Some(Input::Operation(_))));
470    }
471
472    #[test]
473    fn abandon_reason_serde() {
474        for reason in [
475            InputAbandonReason::Retired,
476            InputAbandonReason::Reset,
477            InputAbandonReason::Destroyed,
478            InputAbandonReason::Cancelled,
479        ] {
480            let json = serde_json::to_value(&reason).unwrap();
481            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
482            assert_eq!(reason, parsed);
483        }
484    }
485
486    #[test]
487    fn terminal_outcome_consumed_serde() {
488        let outcome = InputTerminalOutcome::Consumed;
489        let json = serde_json::to_value(&outcome).unwrap();
490        assert_eq!(json["outcome_type"], "consumed");
491        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
492        assert_eq!(outcome, parsed);
493    }
494
495    #[test]
496    fn terminal_outcome_superseded_serde() {
497        let outcome = InputTerminalOutcome::Superseded {
498            superseded_by: InputId::new(),
499        };
500        let json = serde_json::to_value(&outcome).unwrap();
501        assert_eq!(json["outcome_type"], "superseded");
502        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
503        assert!(matches!(parsed, InputTerminalOutcome::Superseded { .. }));
504    }
505
506    #[test]
507    fn terminal_outcome_abandoned_serde() {
508        let outcome = InputTerminalOutcome::Abandoned {
509            reason: InputAbandonReason::Retired,
510        };
511        let json = serde_json::to_value(&outcome).unwrap();
512        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
513        assert!(matches!(
514            parsed,
515            InputTerminalOutcome::Abandoned {
516                reason: InputAbandonReason::Retired,
517            }
518        ));
519    }
520
521    #[test]
522    fn reconstruction_source_serde() {
523        let sources = vec![
524            ReconstructionSource::Projection {
525                rule_id: "rule-1".into(),
526                source_event_id: "evt-1".into(),
527            },
528            ReconstructionSource::Coalescing {
529                source_input_ids: vec![InputId::new(), InputId::new()],
530            },
531        ];
532        for source in sources {
533            let json = serde_json::to_value(&source).unwrap();
534            assert!(json["source_type"].is_string());
535            let parsed: ReconstructionSource = serde_json::from_value(json).unwrap();
536            let _ = parsed;
537        }
538    }
539
540    #[test]
541    fn input_state_event_serde() {
542        let event = InputStateEvent {
543            timestamp: Utc::now(),
544            state: InputLifecycleState::Queued,
545            detail: Some("queued for processing".into()),
546        };
547        let json = serde_json::to_value(&event).unwrap();
548        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
549        assert_eq!(parsed.state, InputLifecycleState::Queued);
550    }
551}