Skip to main content

meerkat_runtime/
input_state.rs

//! §13 InputState — the lifecycle state machine for inputs.
//!
//! Every accepted input has an InputState that tracks its progression
//! through the runtime lifecycle.
5
6use chrono::{DateTime, Utc};
7use meerkat_core::lifecycle::{InputId, RunId};
8use serde::{Deserialize, Serialize};
9
10use crate::identifiers::PolicyVersion;
11use crate::input::Input;
12use crate::policy::PolicyDecision;
13
14/// The lifecycle state of an input.
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17#[non_exhaustive]
18pub enum InputLifecycleState {
19    /// Input has been accepted but not yet queued.
20    Accepted,
21    /// Input is in the processing queue.
22    Queued,
23    /// Input has been staged at a run boundary.
24    Staged,
25    /// Input's boundary primitive has been applied.
26    Applied,
27    /// Applied and pending consumption (run in progress).
28    AppliedPendingConsumption,
29    /// Input has been fully consumed (terminal).
30    Consumed,
31    /// Input was superseded by a newer input (terminal).
32    Superseded,
33    /// Input was coalesced into an aggregate (terminal).
34    Coalesced,
35    /// Input was abandoned (retire/reset/destroy) (terminal).
36    Abandoned,
37}
38
39impl InputLifecycleState {
40    /// Check if this is a terminal state (no further transitions possible).
41    pub fn is_terminal(&self) -> bool {
42        matches!(
43            self,
44            Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
45        )
46    }
47}
48
49/// Why an input was abandoned.
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52#[non_exhaustive]
53pub enum InputAbandonReason {
54    /// Runtime was retired.
55    Retired,
56    /// Runtime was reset.
57    Reset,
58    /// Runtime was destroyed.
59    Destroyed,
60    /// Input was explicitly cancelled.
61    Cancelled,
62}
63
64/// Terminal outcome of an input.
65#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(tag = "outcome_type", rename_all = "snake_case")]
67#[non_exhaustive]
68pub enum InputTerminalOutcome {
69    /// Successfully consumed by a run.
70    Consumed,
71    /// Superseded by a newer input.
72    Superseded { superseded_by: InputId },
73    /// Coalesced into an aggregate.
74    Coalesced { aggregate_id: InputId },
75    /// Abandoned.
76    Abandoned { reason: InputAbandonReason },
77}
78
79/// A single entry in the input's state history.
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct InputStateHistoryEntry {
82    /// When this transition occurred.
83    pub timestamp: DateTime<Utc>,
84    /// The state transitioned from.
85    pub from: InputLifecycleState,
86    /// The state transitioned to.
87    pub to: InputLifecycleState,
88    /// Optional reason for the transition.
89    #[serde(skip_serializing_if = "Option::is_none")]
90    pub reason: Option<String>,
91}
92
93/// Snapshot of the policy that was applied to this input.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct PolicySnapshot {
96    /// The policy version.
97    pub version: PolicyVersion,
98    /// The decision that was made.
99    pub decision: PolicyDecision,
100}
101
102/// How a derived input can be reconstructed after crash recovery.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(tag = "source_type", rename_all = "snake_case")]
105#[non_exhaustive]
106pub enum ReconstructionSource {
107    /// Derived from a projection rule.
108    Projection {
109        rule_id: String,
110        source_event_id: String,
111    },
112    /// Derived from coalescing.
113    Coalescing { source_input_ids: Vec<InputId> },
114}
115
116/// An event on an input's state (for event sourcing).
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct InputStateEvent {
119    /// When the event occurred.
120    pub timestamp: DateTime<Utc>,
121    /// The new lifecycle state.
122    pub state: InputLifecycleState,
123    /// Optional detail.
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub detail: Option<String>,
126}
127
128/// Full state of an input in the runtime.
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct InputState {
131    /// The input ID.
132    pub input_id: InputId,
133    /// Current lifecycle state.
134    pub current_state: InputLifecycleState,
135    /// Policy snapshot applied to this input.
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub policy: Option<PolicySnapshot>,
138    /// Terminal outcome (set when state becomes terminal).
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub terminal_outcome: Option<InputTerminalOutcome>,
141    /// Durability requirement (retained for recovery correctness).
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub durability: Option<crate::input::InputDurability>,
144    /// Idempotency key (retained for dedup across restarts).
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
147    /// Number of times this input has been applied (for crash-loop detection).
148    #[serde(default)]
149    pub attempt_count: u32,
150    /// Number of times this input has been recovered.
151    #[serde(default)]
152    pub recovery_count: u32,
153    /// State transition history.
154    #[serde(default, skip_serializing_if = "Vec::is_empty")]
155    pub history: Vec<InputStateHistoryEntry>,
156    /// How to reconstruct this input (for derived inputs).
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub reconstruction_source: Option<ReconstructionSource>,
159    /// Persisted input payload used to rebuild queued work after recovery.
160    #[serde(default, skip_serializing_if = "Option::is_none")]
161    pub persisted_input: Option<Input>,
162    /// Last run that touched this input.
163    #[serde(default, skip_serializing_if = "Option::is_none")]
164    pub last_run_id: Option<RunId>,
165    /// Boundary receipt sequence for the last applied run.
166    #[serde(default, skip_serializing_if = "Option::is_none")]
167    pub last_boundary_sequence: Option<u64>,
168    /// When the input was created.
169    pub created_at: DateTime<Utc>,
170    /// When the input was last updated.
171    pub updated_at: DateTime<Utc>,
172}
173
174impl InputState {
175    /// Create a new InputState in the Accepted state.
176    pub fn new_accepted(input_id: InputId) -> Self {
177        let now = Utc::now();
178        Self {
179            input_id,
180            current_state: InputLifecycleState::Accepted,
181            policy: None,
182            terminal_outcome: None,
183            durability: None,
184            idempotency_key: None,
185            attempt_count: 0,
186            recovery_count: 0,
187            history: Vec::new(),
188            reconstruction_source: None,
189            persisted_input: None,
190            last_run_id: None,
191            last_boundary_sequence: None,
192            created_at: now,
193            updated_at: now,
194        }
195    }
196
197    /// Check if the input is in a terminal state.
198    pub fn is_terminal(&self) -> bool {
199        self.current_state.is_terminal()
200    }
201}
202
#[cfg(test)]
// `unwrap` is intentional here: a failed (de)serialization should panic and fail the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{ApplyMode, ConsumePoint, QueueMode, WakeMode};

    /// Every lifecycle variant, in declaration order.
    const ALL_STATES: [InputLifecycleState; 9] = [
        InputLifecycleState::Accepted,
        InputLifecycleState::Queued,
        InputLifecycleState::Staged,
        InputLifecycleState::Applied,
        InputLifecycleState::AppliedPendingConsumption,
        InputLifecycleState::Consumed,
        InputLifecycleState::Superseded,
        InputLifecycleState::Coalesced,
        InputLifecycleState::Abandoned,
    ];

    #[test]
    fn lifecycle_state_terminal() {
        assert!(InputLifecycleState::Consumed.is_terminal());
        assert!(InputLifecycleState::Superseded.is_terminal());
        assert!(InputLifecycleState::Coalesced.is_terminal());
        assert!(InputLifecycleState::Abandoned.is_terminal());

        assert!(!InputLifecycleState::Accepted.is_terminal());
        assert!(!InputLifecycleState::Queued.is_terminal());
        assert!(!InputLifecycleState::Staged.is_terminal());
        assert!(!InputLifecycleState::Applied.is_terminal());
        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
    }

    #[test]
    fn lifecycle_state_serde() {
        for state in ALL_STATES {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }
    }

    #[test]
    fn lifecycle_state_uses_snake_case_names() {
        // Pins the serialized name of the only multi-word variant.
        let json = serde_json::to_value(InputLifecycleState::AppliedPendingConsumption).unwrap();
        assert_eq!(json, "applied_pending_consumption");
    }

    #[test]
    fn input_state_new_accepted() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert_eq!(state.current_state, InputLifecycleState::Accepted);
        assert!(!state.is_terminal());
        assert!(state.history.is_empty());
        assert!(state.terminal_outcome.is_none());
        assert!(state.policy.is_none());
        assert_eq!(state.attempt_count, 0);
        assert_eq!(state.recovery_count, 0);
        assert_eq!(state.created_at, state.updated_at);
    }

    #[test]
    fn input_state_serde_roundtrip() {
        let mut state = InputState::new_accepted(InputId::new());
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: PolicyDecision {
                apply_mode: ApplyMode::StageRunStart,
                wake_mode: WakeMode::WakeIfIdle,
                queue_mode: QueueMode::Fifo,
                consume_point: ConsumePoint::OnRunComplete,
                record_transcript: true,
                emit_operator_content: true,
                policy_version: PolicyVersion(1),
            },
        });
        state.history.push(InputStateHistoryEntry {
            timestamp: Utc::now(),
            from: InputLifecycleState::Accepted,
            to: InputLifecycleState::Queued,
            reason: Some("policy resolved".into()),
        });

        let json = serde_json::to_value(&state).unwrap();
        // Set fields are emitted; unset optional fields are skipped entirely.
        assert!(json.get("policy").is_some());
        assert!(json.get("terminal_outcome").is_none());

        let parsed: InputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.input_id, state.input_id);
        assert_eq!(parsed.current_state, state.current_state);
        assert!(parsed.policy.is_some());
        assert!(parsed.terminal_outcome.is_none());
        assert_eq!(parsed.history.len(), 1);
        assert_eq!(parsed.history[0].from, InputLifecycleState::Accepted);
        assert_eq!(parsed.history[0].to, InputLifecycleState::Queued);
        assert_eq!(parsed.history[0].reason.as_deref(), Some("policy resolved"));
    }

    #[test]
    fn input_state_deserializes_without_defaulted_fields() {
        let state = InputState::new_accepted(InputId::new());
        let mut json = serde_json::to_value(&state).unwrap();
        let fields = json.as_object_mut().unwrap();
        // The counters are `#[serde(default)]`, so payloads that omit them must still load.
        assert!(fields.remove("attempt_count").is_some());
        assert!(fields.remove("recovery_count").is_some());
        // An empty history is skipped on output and defaulted on input.
        assert!(!fields.contains_key("history"));

        let parsed: InputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.input_id, state.input_id);
        assert_eq!(parsed.attempt_count, 0);
        assert_eq!(parsed.recovery_count, 0);
        assert!(parsed.history.is_empty());
        assert!(parsed.policy.is_none());
    }

    #[test]
    fn abandon_reason_serde() {
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        // Full equality, so the carried ID is checked and not just the variant.
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "abandoned");
        assert_eq!(json["reason"], "retired");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn reconstruction_source_serde() {
        let cases = [
            (
                "projection",
                ReconstructionSource::Projection {
                    rule_id: "rule-1".into(),
                    source_event_id: "evt-1".into(),
                },
            ),
            (
                "coalescing",
                ReconstructionSource::Coalescing {
                    source_input_ids: vec![InputId::new(), InputId::new()],
                },
            ),
        ];
        for (tag, source) in cases {
            let json = serde_json::to_value(&source).unwrap();
            assert_eq!(json["source_type"], tag);
            let parsed: ReconstructionSource = serde_json::from_value(json.clone()).unwrap();
            // `ReconstructionSource` has no `PartialEq`, so compare the re-serialized form.
            assert_eq!(serde_json::to_value(&parsed).unwrap(), json);
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
    }

    #[test]
    fn input_state_event_omits_missing_detail() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Consumed,
            detail: None,
        };
        let json = serde_json::to_value(&event).unwrap();
        // `detail` is skipped when `None` and must come back as `None`.
        assert!(json.get("detail").is_none());
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Consumed);
        assert!(parsed.detail.is_none());
    }
}