Skip to main content

meerkat_runtime/
input_state.rs

1//! §13 InputState -- the lifecycle state machine for inputs.
2//!
3//! Every accepted input has an InputState that tracks its progression
4//! through the runtime lifecycle. Canonical lifecycle state is owned by
5//! [`InputLifecycleAuthority`] -- all transitions flow through
6//! `authority().apply()`.
7
8use chrono::{DateTime, Utc};
9use meerkat_core::lifecycle::{InputId, RunId};
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::PolicyVersion;
13use crate::input::Input;
14use crate::input_lifecycle_authority::{
15    InputLifecycleAuthority, InputLifecycleError, InputLifecycleInput, InputLifecycleMutator,
16    InputLifecycleTransition,
17};
18use crate::policy::PolicyDecision;
19
20/// The lifecycle state of an input.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
22#[serde(rename_all = "snake_case")]
23#[non_exhaustive]
24pub enum InputLifecycleState {
25    /// Input has been accepted but not yet queued.
26    Accepted,
27    /// Input is in the processing queue.
28    Queued,
29    /// Input has been staged at a run boundary.
30    Staged,
31    /// Input's boundary primitive has been applied.
32    Applied,
33    /// Applied and pending consumption (run in progress).
34    AppliedPendingConsumption,
35    /// Input has been fully consumed (terminal).
36    Consumed,
37    /// Input was superseded by a newer input (terminal).
38    Superseded,
39    /// Input was coalesced into an aggregate (terminal).
40    Coalesced,
41    /// Input was abandoned (retire/reset/destroy) (terminal).
42    Abandoned,
43}
44
45impl InputLifecycleState {
46    /// Check if this is a terminal state (no further transitions possible).
47    pub fn is_terminal(&self) -> bool {
48        matches!(
49            self,
50            Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
51        )
52    }
53}
54
55/// Why an input was abandoned.
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58#[non_exhaustive]
59pub enum InputAbandonReason {
60    /// Runtime was retired.
61    Retired,
62    /// Runtime was reset.
63    Reset,
64    /// Runtime was destroyed.
65    Destroyed,
66    /// Input was explicitly cancelled.
67    Cancelled,
68}
69
70/// Terminal outcome of an input.
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(tag = "outcome_type", rename_all = "snake_case")]
73#[non_exhaustive]
74pub enum InputTerminalOutcome {
75    /// Successfully consumed by a run.
76    Consumed,
77    /// Superseded by a newer input.
78    Superseded { superseded_by: InputId },
79    /// Coalesced into an aggregate.
80    Coalesced { aggregate_id: InputId },
81    /// Abandoned.
82    Abandoned { reason: InputAbandonReason },
83}
84
85/// A single entry in the input's state history.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct InputStateHistoryEntry {
88    /// When this transition occurred.
89    pub timestamp: DateTime<Utc>,
90    /// The state transitioned from.
91    pub from: InputLifecycleState,
92    /// The state transitioned to.
93    pub to: InputLifecycleState,
94    /// Optional reason for the transition.
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub reason: Option<String>,
97}
98
99/// Snapshot of the policy that was applied to this input.
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct PolicySnapshot {
102    /// The policy version.
103    pub version: PolicyVersion,
104    /// The decision that was made.
105    pub decision: PolicyDecision,
106}
107
108/// How a derived input can be reconstructed after crash recovery.
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(tag = "source_type", rename_all = "snake_case")]
111#[non_exhaustive]
112pub enum ReconstructionSource {
113    /// Derived from a projection rule.
114    Projection {
115        rule_id: String,
116        source_event_id: String,
117    },
118    /// Derived from coalescing.
119    Coalescing { source_input_ids: Vec<InputId> },
120}
121
122/// An event on an input's state (for event sourcing).
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct InputStateEvent {
125    /// When the event occurred.
126    pub timestamp: DateTime<Utc>,
127    /// The new lifecycle state.
128    pub state: InputLifecycleState,
129    /// Optional detail.
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub detail: Option<String>,
132}
133
134/// Full state of an input in the runtime.
135///
136/// Canonical lifecycle state (phase, terminal_outcome, history, last_run_id,
137/// last_boundary_sequence, updated_at) is owned by the embedded
138/// [`InputLifecycleAuthority`]. All transitions must flow through
139/// [`InputState::apply`]. Direct mutation of canonical fields is not possible.
140///
141/// Operational fields (input_id, policy, durability, idempotency_key, etc.)
142/// are caller-managed and do not participate in the state machine.
143#[derive(Debug, Clone)]
144pub struct InputState {
145    /// The input ID.
146    pub input_id: InputId,
147    /// The canonical lifecycle authority -- owns phase, terminal_outcome,
148    /// history, last_run_id, last_boundary_sequence, updated_at.
149    authority: InputLifecycleAuthority,
150    /// Policy snapshot applied to this input.
151    pub policy: Option<PolicySnapshot>,
152    /// Durability requirement (retained for recovery correctness).
153    pub durability: Option<crate::input::InputDurability>,
154    /// Idempotency key (retained for dedup across restarts).
155    pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
156    /// Number of times this input has been applied (for crash-loop detection).
157    pub attempt_count: u32,
158    /// Number of times this input has been recovered.
159    pub recovery_count: u32,
160    /// How to reconstruct this input (for derived inputs).
161    pub reconstruction_source: Option<ReconstructionSource>,
162    /// Persisted input payload used to rebuild queued work after recovery.
163    pub persisted_input: Option<Input>,
164    /// When the input was created.
165    pub created_at: DateTime<Utc>,
166}
167
168impl InputState {
169    /// Create a new InputState in the Accepted state.
170    pub fn new_accepted(input_id: InputId) -> Self {
171        let now = Utc::now();
172        Self {
173            input_id,
174            authority: InputLifecycleAuthority::new_at(now),
175            policy: None,
176            durability: None,
177            idempotency_key: None,
178            attempt_count: 0,
179            recovery_count: 0,
180            reconstruction_source: None,
181            persisted_input: None,
182            created_at: now,
183        }
184    }
185
186    // ---- Canonical field accessors (delegate to authority) ----
187
188    /// Current lifecycle state.
189    pub fn current_state(&self) -> InputLifecycleState {
190        self.authority.phase()
191    }
192
193    /// Check if the input is in a terminal state.
194    pub fn is_terminal(&self) -> bool {
195        self.authority.is_terminal()
196    }
197
198    /// Terminal outcome (set when state becomes terminal).
199    pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
200        self.authority.terminal_outcome()
201    }
202
203    /// State transition history.
204    pub fn history(&self) -> &[InputStateHistoryEntry] {
205        self.authority.history()
206    }
207
208    /// Last run that touched this input.
209    pub fn last_run_id(&self) -> Option<&RunId> {
210        self.authority.last_run_id()
211    }
212
213    /// Boundary receipt sequence for the last applied run.
214    pub fn last_boundary_sequence(&self) -> Option<u64> {
215        self.authority.last_boundary_sequence()
216    }
217
218    /// When the input was last updated.
219    pub fn updated_at(&self) -> DateTime<Utc> {
220        self.authority.updated_at()
221    }
222
223    // ---- Authority access ----
224
225    /// Apply a lifecycle input through the authority.
226    ///
227    /// This is the ONLY way to mutate canonical lifecycle state.
228    pub fn apply(
229        &mut self,
230        input: InputLifecycleInput,
231    ) -> Result<InputLifecycleTransition, InputLifecycleError> {
232        self.authority.apply(input)
233    }
234
235    /// Check if a transition would be accepted without applying it.
236    pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
237        self.authority.can_accept(input)
238    }
239
240    /// Set the terminal outcome after a transition (for Superseded/Coalesced
241    /// which need caller-provided data).
242    pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
243        self.authority.set_terminal_outcome(outcome);
244    }
245
246    /// Get a reference to the authority (for advanced read-only operations).
247    pub fn authority(&self) -> &InputLifecycleAuthority {
248        &self.authority
249    }
250
251    /// Get a mutable reference to the authority (for recovery paths).
252    pub fn authority_mut(&mut self) -> &mut InputLifecycleAuthority {
253        &mut self.authority
254    }
255}
256
257// ---------------------------------------------------------------------------
258// Custom Serialize/Deserialize to preserve wire format
259// ---------------------------------------------------------------------------
260
261/// Serialization helper to keep the same wire format as the old InputState.
262#[derive(Serialize, Deserialize)]
263struct InputStateSerde {
264    input_id: InputId,
265    current_state: InputLifecycleState,
266    #[serde(skip_serializing_if = "Option::is_none")]
267    policy: Option<PolicySnapshot>,
268    #[serde(skip_serializing_if = "Option::is_none")]
269    terminal_outcome: Option<InputTerminalOutcome>,
270    #[serde(skip_serializing_if = "Option::is_none")]
271    durability: Option<crate::input::InputDurability>,
272    #[serde(skip_serializing_if = "Option::is_none")]
273    idempotency_key: Option<crate::identifiers::IdempotencyKey>,
274    #[serde(default)]
275    attempt_count: u32,
276    #[serde(default)]
277    recovery_count: u32,
278    #[serde(default, skip_serializing_if = "Vec::is_empty")]
279    history: Vec<InputStateHistoryEntry>,
280    #[serde(skip_serializing_if = "Option::is_none")]
281    reconstruction_source: Option<ReconstructionSource>,
282    #[serde(default, skip_serializing_if = "Option::is_none")]
283    persisted_input: Option<Input>,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    last_run_id: Option<RunId>,
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    last_boundary_sequence: Option<u64>,
288    created_at: DateTime<Utc>,
289    updated_at: DateTime<Utc>,
290}
291
292impl Serialize for InputState {
293    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294        let helper = InputStateSerde {
295            input_id: self.input_id.clone(),
296            current_state: self.authority.phase(),
297            policy: self.policy.clone(),
298            terminal_outcome: self.authority.terminal_outcome().cloned(),
299            durability: self.durability,
300            idempotency_key: self.idempotency_key.clone(),
301            attempt_count: self.attempt_count,
302            recovery_count: self.recovery_count,
303            history: self.authority.history().to_vec(),
304            reconstruction_source: self.reconstruction_source.clone(),
305            persisted_input: self.persisted_input.clone(),
306            last_run_id: self.authority.last_run_id().cloned(),
307            last_boundary_sequence: self.authority.last_boundary_sequence(),
308            created_at: self.created_at,
309            updated_at: self.authority.updated_at(),
310        };
311        helper.serialize(serializer)
312    }
313}
314
315impl<'de> Deserialize<'de> for InputState {
316    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
317        let helper = InputStateSerde::deserialize(deserializer)?;
318        let authority = InputLifecycleAuthority::restore(
319            helper.current_state,
320            helper.terminal_outcome,
321            helper.last_run_id,
322            helper.last_boundary_sequence,
323            helper.history,
324            helper.updated_at,
325        );
326        Ok(InputState {
327            input_id: helper.input_id,
328            authority,
329            policy: helper.policy,
330            durability: helper.durability,
331            idempotency_key: helper.idempotency_key,
332            attempt_count: helper.attempt_count,
333            recovery_count: helper.recovery_count,
334            reconstruction_source: helper.reconstruction_source,
335            persisted_input: helper.persisted_input,
336            created_at: helper.created_at,
337        })
338    }
339}
340
/// Unit tests for the input-state types: terminal-state classification and
/// serde round trips, including the wire names fixed by the serde attributes.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, InterruptPolicy, QueueMode, RoutingDisposition,
        WakeMode,
    };
    use meerkat_core::ops::{OpEvent, OperationId};

    /// The four terminal states report terminal; every other state does not.
    #[test]
    fn lifecycle_state_terminal() {
        assert!(InputLifecycleState::Consumed.is_terminal());
        assert!(InputLifecycleState::Superseded.is_terminal());
        assert!(InputLifecycleState::Coalesced.is_terminal());
        assert!(InputLifecycleState::Abandoned.is_terminal());

        assert!(!InputLifecycleState::Accepted.is_terminal());
        assert!(!InputLifecycleState::Queued.is_terminal());
        assert!(!InputLifecycleState::Staged.is_terminal());
        assert!(!InputLifecycleState::Applied.is_terminal());
        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
    }

    /// Every lifecycle state survives a JSON round trip, and the multi-word
    /// variant is written in snake_case.
    #[test]
    fn lifecycle_state_serde() {
        for state in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }

        // Pin the wire name so an accidental change to `rename_all` is caught.
        let json = serde_json::to_value(InputLifecycleState::AppliedPendingConsumption).unwrap();
        assert_eq!(json, "applied_pending_consumption");
    }

    /// A freshly accepted input starts in `Accepted` with no history, no
    /// terminal outcome and no policy.
    #[test]
    fn input_state_new_accepted() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert_eq!(state.current_state(), InputLifecycleState::Accepted);
        assert!(!state.is_terminal());
        assert!(state.history().is_empty());
        assert!(state.terminal_outcome().is_none());
        assert!(state.policy.is_none());
    }

    /// An `InputState` with a policy and one transition survives a JSON round
    /// trip.
    #[test]
    fn input_state_serde_roundtrip() {
        let mut state = InputState::new_accepted(InputId::new());
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: PolicyDecision {
                apply_mode: ApplyMode::StageRunStart,
                wake_mode: WakeMode::WakeIfIdle,
                queue_mode: QueueMode::Fifo,
                consume_point: ConsumePoint::OnRunComplete,
                interrupt_policy: InterruptPolicy::None,
                drain_policy: DrainPolicy::QueueNextTurn,
                routing_disposition: RoutingDisposition::Queue,
                record_transcript: true,
                emit_operator_content: true,
                policy_version: PolicyVersion(1),
            },
        });
        state.apply(InputLifecycleInput::QueueAccepted).unwrap();

        let json = serde_json::to_value(&state).unwrap();
        let parsed: InputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.input_id, state.input_id);
        assert_eq!(parsed.current_state(), state.current_state());
        assert_eq!(parsed.history().len(), 1);
        assert!(parsed.policy.is_some());
    }

    /// Persisted inputs written under the legacy `input_type` tags still
    /// deserialize into the current variants.
    #[test]
    fn input_state_deserializes_legacy_persisted_input_tags() {
        let mut continuation_state = InputState::new_accepted(InputId::new());
        continuation_state.persisted_input = Some(Input::Continuation(
            crate::input::ContinuationInput::terminal_peer_response_for_request(
                "legacy continuation",
                Some("req-legacy".into()),
            ),
        ));
        let mut continuation_json = serde_json::to_value(&continuation_state).unwrap();
        continuation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("system_generated".into());
        let parsed: InputState = serde_json::from_value(continuation_json).unwrap();
        assert!(matches!(
            parsed.persisted_input,
            Some(Input::Continuation(_))
        ));

        let mut operation_state = InputState::new_accepted(InputId::new());
        operation_state.persisted_input = Some(Input::Operation(crate::input::OperationInput {
            header: crate::input::InputHeader {
                id: InputId::new(),
                timestamp: Utc::now(),
                source: crate::input::InputOrigin::System,
                durability: crate::input::InputDurability::Derived,
                visibility: crate::input::InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            operation_id: OperationId::new(),
            event: OpEvent::Cancelled {
                id: OperationId::new(),
            },
        }));
        let mut operation_json = serde_json::to_value(&operation_state).unwrap();
        operation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("projected".into());
        let parsed: InputState = serde_json::from_value(operation_json).unwrap();
        assert!(matches!(parsed.persisted_input, Some(Input::Operation(_))));
    }

    /// Every abandon reason survives a JSON round trip.
    #[test]
    fn abandon_reason_serde() {
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        // Compare the whole value so the carried id is checked too.
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "abandoned");
        assert_eq!(json["reason"], "retired");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// Each reconstruction source is tagged with its snake_case variant name
    /// and re-serializes to the same JSON after a round trip.
    #[test]
    fn reconstruction_source_serde() {
        let cases = [
            (
                ReconstructionSource::Projection {
                    rule_id: "rule-1".into(),
                    source_event_id: "evt-1".into(),
                },
                "projection",
            ),
            (
                ReconstructionSource::Coalescing {
                    source_input_ids: vec![InputId::new(), InputId::new()],
                },
                "coalescing",
            ),
        ];
        for (source, expected_tag) in cases {
            let json = serde_json::to_value(&source).unwrap();
            assert_eq!(json["source_type"], expected_tag);
            let parsed: ReconstructionSource = serde_json::from_value(json.clone()).unwrap();
            // `ReconstructionSource` has no `PartialEq`, so compare the
            // re-serialized form instead of the values themselves.
            assert_eq!(serde_json::to_value(&parsed).unwrap(), json);
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
    }
}
544}