Skip to main content

meerkat_runtime/
input_state.rs

1//! §13 InputState — per-input data shell.
2//!
3//! Canonical lifecycle truth for every input lives in the MeerkatMachine DSL
4//! (`input_phases`, `input_run_associations`, `input_boundary_sequences` plus
5//! the `QueueAccepted` / `StageForRun` / `RecordBoundarySeq` / etc.
6//! transitions). This module owns ONLY the per-input shell metadata that has
7//! no DSL home today: a history log, timestamps, policy snapshot, durability
8//! class, idempotency key, and the cached payload needed to rebuild queued
9//! work after recovery.
10//!
11//! `InputState` still caches terminal outcome and attempt count for persistence,
12//! recovery normalization, and compatibility reads, but the authoritative live
13//! copies now live in the DSL's typed terminal/attempt maps.
14
15use chrono::{DateTime, Utc};
16use meerkat_core::lifecycle::{InputId, RunId};
17use serde::{Deserialize, Serialize};
18
19use crate::identifiers::PolicyVersion;
20use crate::ingress_types::RuntimeInputSemantics;
21use crate::input::Input;
22use crate::policy::PolicyDecision;
23
24/// The lifecycle state of an input — mirrors the DSL's `input_phases` values.
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27#[non_exhaustive]
28pub enum InputLifecycleState {
29    Accepted,
30    Queued,
31    Staged,
32    Applied,
33    AppliedPendingConsumption,
34    Consumed,
35    Superseded,
36    Coalesced,
37    Abandoned,
38}
39
40impl InputLifecycleState {
41    pub fn is_terminal(&self) -> bool {
42        matches!(
43            self,
44            Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
45        )
46    }
47}
48
49/// Why an input was abandoned.
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52#[non_exhaustive]
53pub enum InputAbandonReason {
54    Retired,
55    Reset,
56    Stopped,
57    Destroyed,
58    Cancelled,
59    MaxAttemptsExhausted { attempts: u32 },
60}
61
62/// Terminal outcome cache for an input.
63///
64/// The authoritative live copy is split across the DSL's typed terminal maps.
65/// The shell keeps this typed cache for persistence and compatibility reads.
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(tag = "outcome_type", rename_all = "snake_case")]
68#[non_exhaustive]
69pub enum InputTerminalOutcome {
70    Consumed,
71    Superseded { superseded_by: InputId },
72    Coalesced { aggregate_id: InputId },
73    Abandoned { reason: InputAbandonReason },
74}
75
76/// A single entry in the input's state history (shell bookkeeping).
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct InputStateHistoryEntry {
79    pub timestamp: DateTime<Utc>,
80    pub from: InputLifecycleState,
81    pub to: InputLifecycleState,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub reason: Option<String>,
84}
85
86/// Snapshot of the policy that was applied to this input.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct PolicySnapshot {
89    pub version: PolicyVersion,
90    pub decision: PolicyDecision,
91}
92
93/// How a derived input can be reconstructed after crash recovery.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95#[serde(tag = "source_type", rename_all = "snake_case")]
96#[non_exhaustive]
97pub enum ReconstructionSource {
98    Projection {
99        rule_id: String,
100        source_event_id: String,
101    },
102    Coalescing {
103        source_input_ids: Vec<InputId>,
104    },
105}
106
107/// An event on an input's state (for event sourcing).
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct InputStateEvent {
110    pub timestamp: DateTime<Utc>,
111    pub state: InputLifecycleState,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub detail: Option<String>,
114}
115
/// Cap on stage → rollback cycles for one input.
///
/// Past this cap the shell callsite chooses Abandon rather than
/// RollbackStaged. This is a shell retry-policy knob, not a DSL guard.
pub const MAX_STAGE_ATTEMPTS: u32 = 3;
120
121/// DSL-owned lifecycle projection for an input.
122///
123/// Carries the fields that are authoritative in the MeerkatMachine DSL
124/// (`input_phases`, `input_run_associations`, `input_boundary_sequences`,
125/// `input_terminal_kind` + `input_superseded_by` / `input_aggregate_id` /
126/// `input_abandon_reason` / `input_abandon_attempt_count`, and
127/// `input_attempt_counts`) so they can travel alongside a persisted
128/// [`InputState`] at the store boundary, where no live DSL is available to
129/// query. Inside a running driver, these values are always read from the DSL
130/// directly, never from the seed.
131#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct InputStateSeed {
133    pub phase: InputLifecycleState,
134    pub last_run_id: Option<RunId>,
135    pub last_boundary_sequence: Option<u64>,
136    pub terminal_outcome: Option<InputTerminalOutcome>,
137    pub attempt_count: u32,
138}
139
140impl InputStateSeed {
141    /// Freshly-accepted input: no run association, no boundary sequence,
142    /// no terminal outcome, zero attempts.
143    pub fn new_accepted() -> Self {
144        Self {
145            phase: InputLifecycleState::Accepted,
146            last_run_id: None,
147            last_boundary_sequence: None,
148            terminal_outcome: None,
149            attempt_count: 0,
150        }
151    }
152}
153
154/// Persisted bundle: shell [`InputState`] plus its [`InputStateSeed`].
155///
156/// Used at the store boundary so the DSL-owned fields survive persistence
157/// without being re-shadowed onto `InputState` itself. Recovery treats the
158/// seed as a durable witness and re-enters the recovered facts through typed
159/// machine inputs; it does not hydrate DSL state directly from this bundle.
160#[derive(Debug, Clone)]
161pub struct StoredInputState {
162    pub state: InputState,
163    pub seed: InputStateSeed,
164}
165
166impl StoredInputState {
167    /// Convenience: freshly-accepted bundle.
168    pub fn new_accepted(input_id: InputId) -> Self {
169        Self {
170            state: InputState::new_accepted(input_id),
171            seed: InputStateSeed::new_accepted(),
172        }
173    }
174}
175
176/// Per-input shell data. Plain fields, no hidden state machine.
177///
178/// All DSL-owned lifecycle fields (`phase`, `last_run_id`,
179/// `last_boundary_sequence`, `terminal_outcome`, `attempt_count`) are
180/// authoritative in the DSL. Live code reads them via
181/// `EphemeralRuntimeDriver::input_phase` / `input_last_run_id` /
182/// `input_last_boundary_sequence` / `input_terminal_outcome` /
183/// `input_attempt_count`. Persistence callsites serialize them via
184/// [`InputStateSeed`] bundled on [`StoredInputState`].
185#[derive(Debug, Clone)]
186pub struct InputState {
187    pub input_id: InputId,
188    /// Compatibility cache of the DSL-owned terminal outcome metadata.
189    pub terminal_outcome: Option<InputTerminalOutcome>,
190    /// Compatibility cache of the DSL-owned attempt count.
191    pub attempt_count: u32,
192    pub history: Vec<InputStateHistoryEntry>,
193    pub updated_at: DateTime<Utc>,
194    pub policy: Option<PolicySnapshot>,
195    /// Runtime-stamped run semantics captured at admission and persisted so
196    /// recovery does not reclassify execution kind from payload shape.
197    pub runtime_semantics: Option<RuntimeInputSemantics>,
198    pub durability: Option<crate::input::InputDurability>,
199    pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
200    pub recovery_count: u32,
201    pub reconstruction_source: Option<ReconstructionSource>,
202    pub persisted_input: Option<Input>,
203    pub created_at: DateTime<Utc>,
204}
205
206impl InputState {
207    /// Create a fresh InputState. Paired DSL state starts in the `Accepted`
208    /// phase via [`InputStateSeed::new_accepted`]; callers that need the
209    /// bundle use [`StoredInputState::new_accepted`].
210    pub fn new_accepted(input_id: InputId) -> Self {
211        let now = Utc::now();
212        Self {
213            input_id,
214            terminal_outcome: None,
215            attempt_count: 0,
216            history: Vec::new(),
217            updated_at: now,
218            policy: None,
219            runtime_semantics: None,
220            durability: None,
221            idempotency_key: None,
222            recovery_count: 0,
223            reconstruction_source: None,
224            persisted_input: None,
225            created_at: now,
226        }
227    }
228
229    pub fn is_terminal(&self) -> bool {
230        self.terminal_outcome.is_some()
231    }
232
233    pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
234        self.terminal_outcome.as_ref()
235    }
236
237    pub fn history(&self) -> &[InputStateHistoryEntry] {
238        &self.history
239    }
240
241    pub fn updated_at(&self) -> DateTime<Utc> {
242        self.updated_at
243    }
244
245    pub fn attempt_count(&self) -> u32 {
246        self.attempt_count
247    }
248}
249
250// ---------------------------------------------------------------------------
251// Custom Serialize / Deserialize — preserves the on-disk wire format
252// ---------------------------------------------------------------------------
253//
254// `InputStateSerde` is the on-disk contract exercised by
255// `recovery_contract`, `recovery_replay`, and `driver_persistent` tests.
256// Field names, types, defaults, and `skip_serializing_if` markers are kept
257// verbatim from the pre-5G/1 release. Since `InputState` no longer owns the
258// three DSL-authoritative fields, serialization flows through
259// [`StoredInputState`] where shell + seed can be bundled into the wire
260// struct.
261
262#[derive(Serialize, Deserialize)]
263struct InputStateSerde {
264    input_id: InputId,
265    current_state: InputLifecycleState,
266    #[serde(skip_serializing_if = "Option::is_none")]
267    policy: Option<PolicySnapshot>,
268    #[serde(default, skip_serializing_if = "Option::is_none")]
269    runtime_semantics: Option<RuntimeInputSemantics>,
270    #[serde(skip_serializing_if = "Option::is_none")]
271    terminal_outcome: Option<InputTerminalOutcome>,
272    #[serde(skip_serializing_if = "Option::is_none")]
273    durability: Option<crate::input::InputDurability>,
274    #[serde(skip_serializing_if = "Option::is_none")]
275    idempotency_key: Option<crate::identifiers::IdempotencyKey>,
276    #[serde(default)]
277    attempt_count: u32,
278    #[serde(default)]
279    recovery_count: u32,
280    #[serde(default, skip_serializing_if = "Vec::is_empty")]
281    history: Vec<InputStateHistoryEntry>,
282    #[serde(skip_serializing_if = "Option::is_none")]
283    reconstruction_source: Option<ReconstructionSource>,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    persisted_input: Option<Input>,
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    last_run_id: Option<RunId>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    last_boundary_sequence: Option<u64>,
290    created_at: DateTime<Utc>,
291    updated_at: DateTime<Utc>,
292}
293
294impl Serialize for StoredInputState {
295    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
296        let helper = InputStateSerde {
297            input_id: self.state.input_id.clone(),
298            current_state: self.seed.phase,
299            policy: self.state.policy.clone(),
300            runtime_semantics: self.state.runtime_semantics,
301            terminal_outcome: self.seed.terminal_outcome.clone(),
302            durability: self.state.durability,
303            idempotency_key: self.state.idempotency_key.clone(),
304            attempt_count: self.seed.attempt_count,
305            recovery_count: self.state.recovery_count,
306            history: self.state.history.clone(),
307            reconstruction_source: self.state.reconstruction_source.clone(),
308            persisted_input: self.state.persisted_input.clone(),
309            last_run_id: self.seed.last_run_id.clone(),
310            last_boundary_sequence: self.seed.last_boundary_sequence,
311            created_at: self.state.created_at,
312            updated_at: self.state.updated_at,
313        };
314        helper.serialize(serializer)
315    }
316}
317
318impl<'de> Deserialize<'de> for StoredInputState {
319    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
320        let helper = InputStateSerde::deserialize(deserializer)?;
321        let state = InputState {
322            input_id: helper.input_id,
323            terminal_outcome: helper.terminal_outcome.clone(),
324            attempt_count: helper.attempt_count,
325            history: helper.history,
326            updated_at: helper.updated_at,
327            policy: helper.policy,
328            runtime_semantics: helper.runtime_semantics,
329            durability: helper.durability,
330            idempotency_key: helper.idempotency_key,
331            recovery_count: helper.recovery_count,
332            reconstruction_source: helper.reconstruction_source,
333            persisted_input: helper.persisted_input,
334            created_at: helper.created_at,
335        };
336        let seed = InputStateSeed {
337            phase: helper.current_state,
338            last_run_id: helper.last_run_id,
339            last_boundary_sequence: helper.last_boundary_sequence,
340            terminal_outcome: helper.terminal_outcome,
341            attempt_count: helper.attempt_count,
342        };
343        Ok(StoredInputState { state, seed })
344    }
345}
346
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };
    use meerkat_core::ops::{OpEvent, OperationId};

    /// Serializes `value` to a JSON value and parses it back.
    ///
    /// Returns both the JSON (for wire-shape assertions) and the parsed copy.
    fn json_roundtrip<T>(value: &T) -> (serde_json::Value, T)
    where
        T: Serialize + serde::de::DeserializeOwned,
    {
        let json = serde_json::to_value(value).unwrap();
        let parsed = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    #[test]
    fn new_accepted_starts_with_no_shell_history() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert!(state.history.is_empty());
        assert!(!state.is_terminal());
        assert_eq!(state.attempt_count(), 0);
        assert_eq!(state.recovery_count, 0);
        // Both timestamps come from the same `Utc::now()` call.
        assert_eq!(state.created_at, state.updated_at);
    }

    #[test]
    fn seed_new_accepted_defaults_match_queue_lifecycle() {
        let seed = InputStateSeed::new_accepted();
        assert_eq!(seed.phase, InputLifecycleState::Accepted);
        assert!(seed.last_run_id.is_none());
        assert!(seed.last_boundary_sequence.is_none());
        assert!(seed.terminal_outcome.is_none());
        assert_eq!(seed.attempt_count, 0);
    }

    #[test]
    fn stored_new_accepted_pairs_fresh_shell_and_seed() {
        let id = InputId::new();
        let bundle = StoredInputState::new_accepted(id.clone());
        assert_eq!(bundle.state.input_id, id);
        assert_eq!(bundle.seed, InputStateSeed::new_accepted());
    }

    #[test]
    fn lifecycle_state_terminal() {
        assert!(InputLifecycleState::Consumed.is_terminal());
        assert!(InputLifecycleState::Superseded.is_terminal());
        assert!(InputLifecycleState::Coalesced.is_terminal());
        assert!(InputLifecycleState::Abandoned.is_terminal());

        assert!(!InputLifecycleState::Accepted.is_terminal());
        assert!(!InputLifecycleState::Queued.is_terminal());
        assert!(!InputLifecycleState::Staged.is_terminal());
        assert!(!InputLifecycleState::Applied.is_terminal());
        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
    }

    #[test]
    fn lifecycle_state_serde() {
        for state in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let (_, parsed) = json_roundtrip(&state);
            assert_eq!(state, parsed);
        }
        // `rename_all = "snake_case"` is part of the wire contract.
        assert_eq!(
            serde_json::to_value(InputLifecycleState::AppliedPendingConsumption).unwrap(),
            "applied_pending_consumption"
        );
    }

    #[test]
    fn stored_input_state_serde_roundtrip_preserves_fields() {
        let mut state = InputState::new_accepted(InputId::new());
        let policy = PolicyDecision {
            apply_mode: ApplyMode::StageRunStart,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::Fifo,
            consume_point: ConsumePoint::OnRunComplete,
            drain_policy: DrainPolicy::QueueNextTurn,
            routing_disposition: RoutingDisposition::Queue,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        };
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: policy.clone(),
        });
        state.runtime_semantics = Some(RuntimeInputSemantics::from_policy_and_kind(
            &policy,
            crate::identifiers::InputKind::Prompt,
        ));
        state.history.push(InputStateHistoryEntry {
            timestamp: state.updated_at,
            from: InputLifecycleState::Accepted,
            to: InputLifecycleState::Queued,
            reason: Some("QueueAccepted".into()),
        });
        let bundle = StoredInputState {
            state,
            seed: InputStateSeed {
                phase: InputLifecycleState::Queued,
                last_run_id: None,
                last_boundary_sequence: None,
                terminal_outcome: None,
                attempt_count: 0,
            },
        };

        let (_, parsed) = json_roundtrip(&bundle);
        assert_eq!(parsed.state.input_id, bundle.state.input_id);
        assert_eq!(parsed.seed, bundle.seed);
        assert!(parsed.state.policy.is_some());
        assert_eq!(
            parsed.state.runtime_semantics,
            bundle.state.runtime_semantics
        );
        assert_eq!(parsed.state.history.len(), 1);
    }

    #[test]
    fn stored_input_state_roundtrip_preserves_terminal_seed_and_mirrors_caches() {
        let seed = InputStateSeed {
            phase: InputLifecycleState::Abandoned,
            last_run_id: None,
            last_boundary_sequence: Some(7),
            terminal_outcome: Some(InputTerminalOutcome::Abandoned {
                reason: InputAbandonReason::MaxAttemptsExhausted {
                    attempts: MAX_STAGE_ATTEMPTS,
                },
            }),
            attempt_count: MAX_STAGE_ATTEMPTS,
        };
        let bundle = StoredInputState {
            state: InputState::new_accepted(InputId::new()),
            seed: seed.clone(),
        };

        let (json, parsed) = json_roundtrip(&bundle);
        assert_eq!(json["current_state"], "abandoned");
        assert_eq!(json["last_boundary_sequence"], 7);
        assert_eq!(parsed.seed, seed);
        // Deserialization mirrors the DSL-owned values into the shell caches.
        assert!(parsed.state.is_terminal());
        assert_eq!(
            parsed.state.terminal_outcome(),
            seed.terminal_outcome.as_ref()
        );
        assert_eq!(parsed.state.attempt_count(), MAX_STAGE_ATTEMPTS);
    }

    #[test]
    fn stored_input_state_serializes_seed_over_shell_cache() {
        // The seed is the persisted source of truth for DSL-owned fields.
        let mut bundle = StoredInputState::new_accepted(InputId::new());
        bundle.state.attempt_count = 99;
        bundle.seed.attempt_count = 1;

        let (json, parsed) = json_roundtrip(&bundle);
        assert_eq!(json["attempt_count"], 1);
        assert_eq!(parsed.seed.attempt_count, 1);
        assert_eq!(parsed.state.attempt_count, 1);
    }

    #[test]
    fn fresh_bundle_wire_shape_omits_optional_fields() {
        let bundle = StoredInputState::new_accepted(InputId::new());
        let json = serde_json::to_value(&bundle).unwrap();

        assert_eq!(json["current_state"], "accepted");
        assert_eq!(json["attempt_count"], 0);
        assert_eq!(json["recovery_count"], 0);
        assert!(json.get("input_id").is_some());
        assert!(json.get("created_at").is_some());
        assert!(json.get("updated_at").is_some());
        for absent in [
            "policy",
            "runtime_semantics",
            "terminal_outcome",
            "durability",
            "idempotency_key",
            "history",
            "reconstruction_source",
            "persisted_input",
            "last_run_id",
            "last_boundary_sequence",
        ] {
            assert!(json.get(absent).is_none(), "unexpected key {}", absent);
        }
    }

    #[test]
    fn stored_input_state_deserializes_legacy_persisted_input_tags() {
        let continuation_bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Continuation(
                    crate::input::ContinuationInput::detached_background_op_completed(),
                )),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut continuation_json = serde_json::to_value(&continuation_bundle).unwrap();
        continuation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("system_generated".into());
        let parsed: StoredInputState = serde_json::from_value(continuation_json).unwrap();
        assert!(matches!(
            parsed.state.persisted_input,
            Some(Input::Continuation(_))
        ));

        let operation_bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Operation(crate::input::OperationInput {
                    header: crate::input::InputHeader {
                        id: InputId::new(),
                        timestamp: Utc::now(),
                        source: crate::input::InputOrigin::System,
                        durability: crate::input::InputDurability::Derived,
                        visibility: crate::input::InputVisibility::default(),
                        idempotency_key: None,
                        supersession_key: None,
                        correlation_id: None,
                    },
                    operation_id: OperationId::new(),
                    event: OpEvent::Cancelled {
                        id: OperationId::new(),
                    },
                })),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut operation_json = serde_json::to_value(&operation_bundle).unwrap();
        operation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("projected".into());
        let parsed: StoredInputState = serde_json::from_value(operation_json).unwrap();
        assert!(matches!(
            parsed.state.persisted_input,
            Some(Input::Operation(_))
        ));
    }

    #[test]
    fn abandon_reason_serde() {
        // Cover every variant, including the struct variant.
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Stopped,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
            InputAbandonReason::MaxAttemptsExhausted {
                attempts: MAX_STAGE_ATTEMPTS,
            },
        ] {
            let (_, parsed) = json_roundtrip(&reason);
            assert_eq!(reason, parsed);
        }

        assert_eq!(
            serde_json::to_value(InputAbandonReason::Stopped).unwrap(),
            "stopped"
        );
        let exhausted = serde_json::to_value(InputAbandonReason::MaxAttemptsExhausted {
            attempts: MAX_STAGE_ATTEMPTS,
        })
        .unwrap();
        assert_eq!(
            exhausted["max_attempts_exhausted"]["attempts"],
            MAX_STAGE_ATTEMPTS
        );
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let (json, parsed) = json_roundtrip(&outcome);
        assert_eq!(json["outcome_type"], "consumed");
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let (json, parsed) = json_roundtrip(&outcome);
        assert_eq!(json["outcome_type"], "superseded");
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let (json, parsed) = json_roundtrip(&outcome);
        assert_eq!(json["outcome_type"], "coalesced");
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let (json, parsed) = json_roundtrip(&outcome);
        assert_eq!(json["outcome_type"], "abandoned");
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn reconstruction_source_serde() {
        let projection = ReconstructionSource::Projection {
            rule_id: "rule-1".into(),
            source_event_id: "evt-1".into(),
        };
        let (json, parsed) = json_roundtrip(&projection);
        assert_eq!(json["source_type"], "projection");
        match parsed {
            ReconstructionSource::Projection {
                rule_id,
                source_event_id,
            } => {
                assert_eq!(rule_id, "rule-1");
                assert_eq!(source_event_id, "evt-1");
            }
            other => panic!("expected Projection, got {:?}", other),
        }

        let ids = vec![InputId::new(), InputId::new()];
        let coalescing = ReconstructionSource::Coalescing {
            source_input_ids: ids.clone(),
        };
        let (json, parsed) = json_roundtrip(&coalescing);
        assert_eq!(json["source_type"], "coalescing");
        match parsed {
            ReconstructionSource::Coalescing { source_input_ids } => {
                assert_eq!(source_input_ids, ids);
            }
            other => panic!("expected Coalescing, got {:?}", other),
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let (_, parsed) = json_roundtrip(&event);
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
        assert_eq!(parsed.timestamp, event.timestamp);
    }
}