Skip to main content

meerkat_runtime/
input_state.rs

//! §13 InputState — per-input data shell.
//!
//! Canonical lifecycle truth for every input lives in the MeerkatMachine DSL
//! (`input_phases`, `input_run_associations`, `input_boundary_sequences` plus
//! the `QueueAccepted` / `StageForRun` / `RecordBoundarySeq` / etc.
//! transitions). This module owns ONLY the per-input shell metadata needed for
//! persistence/projection: a history log, timestamps, compatibility policy
//! snapshot, durability observation, idempotency key, and the cached payload
//! needed to rebuild queued work after recovery. Durability admission validity
//! and recovered keep/drop behavior are emitted by generated MeerkatMachine
//! inputs/effects.
//!
//! Terminal outcome and attempt count are DSL-owned facts. Live reads go
//! through `EphemeralRuntimeDriver::input_terminal_outcome` /
//! `input_attempt_count`; persistence carries them on [`InputStateSeed`].
//! `InputState` holds no copy of either.
17
18use chrono::{DateTime, Utc};
19use meerkat_core::lifecycle::{InputId, RunId};
20use meerkat_core::types::HandlingMode;
21use serde::{Deserialize, Serialize};
22
23use crate::identifiers::PolicyVersion;
24use crate::ingress_types::RuntimeInputSemantics;
25use crate::input::Input;
26use crate::policy::PolicyDecision;
27
28/// The lifecycle state of an input โ€” mirrors the DSL's `input_phases` values.
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31#[non_exhaustive]
32pub enum InputLifecycleState {
33    Accepted,
34    Queued,
35    Staged,
36    Applied,
37    AppliedPendingConsumption,
38    Consumed,
39    Superseded,
40    Coalesced,
41    Abandoned,
42}
43
44/// Why an input was abandoned.
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "snake_case")]
47#[non_exhaustive]
48pub enum InputAbandonReason {
49    Retired,
50    Reset,
51    Stopped,
52    Destroyed,
53    Cancelled,
54    MaxAttemptsExhausted { attempts: u32 },
55}
56
57/// Terminal outcome for an input.
58///
59/// The authoritative live copy is split across the DSL's typed terminal maps;
60/// persistence carries it on [`InputStateSeed`].
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(tag = "outcome_type", rename_all = "snake_case")]
63#[non_exhaustive]
64pub enum InputTerminalOutcome {
65    Consumed,
66    Superseded { superseded_by: InputId },
67    Coalesced { aggregate_id: InputId },
68    Abandoned { reason: InputAbandonReason },
69}
70
71/// A single entry in the input's state history (shell bookkeeping).
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct InputStateHistoryEntry {
74    pub timestamp: DateTime<Utc>,
75    pub from: InputLifecycleState,
76    pub to: InputLifecycleState,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub reason: Option<String>,
79}
80
81/// Snapshot of the policy that was applied to this input.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct PolicySnapshot {
84    pub version: PolicyVersion,
85    pub decision: PolicyDecision,
86}
87
88/// How a derived input can be reconstructed after crash recovery.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(tag = "source_type", rename_all = "snake_case")]
91#[non_exhaustive]
92pub enum ReconstructionSource {
93    Projection {
94        rule_id: String,
95        source_event_id: String,
96    },
97    Coalescing {
98        source_input_ids: Vec<InputId>,
99    },
100}
101
102/// An event on an input's state (for event sourcing).
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct InputStateEvent {
105    pub timestamp: DateTime<Utc>,
106    pub state: InputLifecycleState,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub detail: Option<String>,
109}
110
111/// DSL-owned lifecycle projection for an input.
112///
113/// Carries the fields that are authoritative in the MeerkatMachine DSL
114/// (`input_phases`, `input_run_associations`, `input_boundary_sequences`,
115/// `input_terminal_kind` + `input_superseded_by` / `input_aggregate_id` /
116/// `input_abandon_reason` / `input_abandon_attempt_count`, and
117/// `input_attempt_counts` / `input_admission_seq` / `input_recovery_lanes`) so
118/// they can travel alongside a persisted [`InputState`] at the store boundary,
119/// where no live DSL is available to query. Inside a running driver, these
120/// values are always read from the DSL directly, never from the seed.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct InputStateSeed {
123    pub phase: InputLifecycleState,
124    pub last_run_id: Option<RunId>,
125    pub last_boundary_sequence: Option<u64>,
126    pub admission_sequence: Option<u64>,
127    pub terminal_outcome: Option<InputTerminalOutcome>,
128    pub attempt_count: u32,
129    pub recovery_lane: Option<HandlingMode>,
130}
131
132impl InputStateSeed {
133    /// Freshly-accepted input: no run association, no boundary sequence,
134    /// no terminal outcome, zero attempts.
135    pub fn new_accepted() -> Self {
136        Self {
137            phase: InputLifecycleState::Accepted,
138            last_run_id: None,
139            last_boundary_sequence: None,
140            admission_sequence: None,
141            terminal_outcome: None,
142            attempt_count: 0,
143            recovery_lane: None,
144        }
145    }
146}
147
148/// Persisted bundle: shell [`InputState`] plus its [`InputStateSeed`].
149///
150/// Used at the store boundary so the DSL-owned fields survive persistence
151/// without being re-shadowed onto `InputState` itself. Recovery treats the
152/// seed as a durable witness and re-enters the recovered facts through typed
153/// machine inputs; it does not hydrate DSL state directly from this bundle.
154#[derive(Debug, Clone)]
155pub struct StoredInputState {
156    pub state: InputState,
157    pub seed: InputStateSeed,
158}
159
160impl StoredInputState {
161    /// Convenience: freshly-accepted bundle.
162    pub fn new_accepted(input_id: InputId) -> Self {
163        Self {
164            state: InputState::new_accepted(input_id),
165            seed: InputStateSeed::new_accepted(),
166        }
167    }
168}
169
170/// Store-write wrapper for an input-state bundle whose DSL-owned seed facts
171/// came from a generated MeerkatMachine-owned snapshot.
172#[derive(Debug, Clone)]
173pub struct InputStatePersistenceRecord {
174    bundle: StoredInputState,
175}
176
177impl InputStatePersistenceRecord {
178    /// Package a store-bound input-state bundle that was read from generated
179    /// MeerkatMachine authority. This is intentionally crate-private so
180    /// callers cannot mint persistence records from handwritten seed facts.
181    pub(crate) fn from_machine_snapshot(bundle: StoredInputState) -> Result<Self, String> {
182        crate::meerkat_machine::authorize_stored_input_state_seed(
183            &bundle.state.input_id,
184            &bundle.seed,
185        )?;
186        Ok(Self { bundle })
187    }
188
189    /// Raw bundle approved for durable persistence.
190    pub fn as_stored(&self) -> &StoredInputState {
191        &self.bundle
192    }
193
194    /// Clone the approved raw bundle.
195    pub fn clone_stored(&self) -> StoredInputState {
196        self.bundle.clone()
197    }
198
199    /// Consume the approved record into its raw bundle.
200    pub fn into_stored(self) -> StoredInputState {
201        self.bundle
202    }
203}
204
205/// Per-input shell data. Plain fields, no hidden state machine.
206///
207/// All DSL-owned lifecycle fields (`phase`, `last_run_id`,
208/// `last_boundary_sequence`, `terminal_outcome`, `attempt_count`,
209/// `recovery_lane`) are
210/// authoritative in the DSL. Live code reads them via
211/// `EphemeralRuntimeDriver::input_phase` / `input_last_run_id` /
212/// `input_last_boundary_sequence` / `input_terminal_outcome` /
213/// `input_attempt_count` / `input_recovery_lane`. Persistence callsites
214/// serialize them via [`InputStateSeed`] bundled on [`StoredInputState`].
215#[derive(Debug, Clone)]
216pub struct InputState {
217    pub input_id: InputId,
218    pub history: Vec<InputStateHistoryEntry>,
219    pub updated_at: DateTime<Utc>,
220    pub policy: Option<PolicySnapshot>,
221    /// Runtime-stamped run semantics captured at admission and persisted so
222    /// recovery does not reclassify execution kind from payload shape.
223    pub runtime_semantics: Option<RuntimeInputSemantics>,
224    pub durability: Option<crate::input::InputDurability>,
225    pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
226    pub recovery_count: u32,
227    pub reconstruction_source: Option<ReconstructionSource>,
228    pub persisted_input: Option<Input>,
229    pub created_at: DateTime<Utc>,
230}
231
232impl InputState {
233    /// Create a fresh InputState. Paired DSL state starts in the `Accepted`
234    /// phase via [`InputStateSeed::new_accepted`]; callers that need the
235    /// bundle use [`StoredInputState::new_accepted`].
236    pub fn new_accepted(input_id: InputId) -> Self {
237        let now = Utc::now();
238        Self {
239            input_id,
240            history: Vec::new(),
241            updated_at: now,
242            policy: None,
243            runtime_semantics: None,
244            durability: None,
245            idempotency_key: None,
246            recovery_count: 0,
247            reconstruction_source: None,
248            persisted_input: None,
249            created_at: now,
250        }
251    }
252
253    pub fn history(&self) -> &[InputStateHistoryEntry] {
254        &self.history
255    }
256
257    pub fn updated_at(&self) -> DateTime<Utc> {
258        self.updated_at
259    }
260}
261
// ---------------------------------------------------------------------------
// Custom Serialize / Deserialize — preserves the on-disk wire format
// ---------------------------------------------------------------------------
//
// `InputStateSerde` is the on-disk contract exercised by
// `recovery_contract`, `recovery_replay`, and `driver_persistent` tests.
// Field names, types, defaults, and `skip_serializing_if` markers are kept
// verbatim from the pre-5G/1 release. Since `InputState` no longer owns the
// DSL-authoritative fields (they are carried on [`InputStateSeed`]),
// serialization flows through [`StoredInputState`] where shell + seed can be
// bundled into the wire struct.
273
274#[derive(Serialize, Deserialize)]
275struct InputStateSerde {
276    stored_input_state_version: u32,
277    input_id: InputId,
278    current_state: InputLifecycleState,
279    #[serde(skip_serializing_if = "Option::is_none")]
280    policy: Option<PolicySnapshot>,
281    #[serde(default, skip_serializing_if = "Option::is_none")]
282    runtime_semantics: Option<RuntimeInputSemantics>,
283    #[serde(skip_serializing_if = "Option::is_none")]
284    terminal_outcome: Option<InputTerminalOutcome>,
285    #[serde(skip_serializing_if = "Option::is_none")]
286    durability: Option<crate::input::InputDurability>,
287    #[serde(skip_serializing_if = "Option::is_none")]
288    idempotency_key: Option<crate::identifiers::IdempotencyKey>,
289    #[serde(default)]
290    attempt_count: u32,
291    #[serde(default)]
292    recovery_count: u32,
293    #[serde(default, skip_serializing_if = "Vec::is_empty")]
294    history: Vec<InputStateHistoryEntry>,
295    #[serde(skip_serializing_if = "Option::is_none")]
296    reconstruction_source: Option<ReconstructionSource>,
297    #[serde(default, skip_serializing_if = "Option::is_none")]
298    persisted_input: Option<Input>,
299    #[serde(default, skip_serializing_if = "Option::is_none")]
300    last_run_id: Option<RunId>,
301    #[serde(default, skip_serializing_if = "Option::is_none")]
302    last_boundary_sequence: Option<u64>,
303    #[serde(default, skip_serializing_if = "Option::is_none")]
304    admission_sequence: Option<u64>,
305    #[serde(default, skip_serializing_if = "Option::is_none")]
306    recovery_lane: Option<HandlingMode>,
307    created_at: DateTime<Utc>,
308    updated_at: DateTime<Utc>,
309}
310
311impl Serialize for StoredInputState {
312    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
313        let helper = InputStateSerde {
314            stored_input_state_version:
315                meerkat_core::generated::session_persistence_version_authority::stored_input_state_version(
316                ),
317            input_id: self.state.input_id.clone(),
318            current_state: self.seed.phase,
319            policy: self.state.policy.clone(),
320            runtime_semantics: self.state.runtime_semantics,
321            terminal_outcome: self.seed.terminal_outcome.clone(),
322            durability: self.state.durability,
323            idempotency_key: self.state.idempotency_key.clone(),
324            attempt_count: self.seed.attempt_count,
325            recovery_count: self.state.recovery_count,
326            history: self.state.history.clone(),
327            reconstruction_source: self.state.reconstruction_source.clone(),
328            persisted_input: self.state.persisted_input.clone(),
329            last_run_id: self.seed.last_run_id.clone(),
330            last_boundary_sequence: self.seed.last_boundary_sequence,
331            admission_sequence: self.seed.admission_sequence,
332            recovery_lane: self.seed.recovery_lane,
333            created_at: self.state.created_at,
334            updated_at: self.state.updated_at,
335        };
336        helper.serialize(serializer)
337    }
338}
339
340impl<'de> Deserialize<'de> for StoredInputState {
341    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
342        let helper = InputStateSerde::deserialize(deserializer)?;
343        let _stored_input_state_version =
344            meerkat_core::generated::session_persistence_version_authority::restore_stored_input_state_version(
345                helper.stored_input_state_version,
346            )
347            .map_err(<D::Error as serde::de::Error>::custom)?;
348        let state = InputState {
349            input_id: helper.input_id,
350            history: helper.history,
351            updated_at: helper.updated_at,
352            policy: helper.policy,
353            runtime_semantics: helper.runtime_semantics,
354            durability: helper.durability,
355            idempotency_key: helper.idempotency_key,
356            recovery_count: helper.recovery_count,
357            reconstruction_source: helper.reconstruction_source,
358            persisted_input: helper.persisted_input,
359            created_at: helper.created_at,
360        };
361        let seed = InputStateSeed {
362            phase: helper.current_state,
363            last_run_id: helper.last_run_id,
364            last_boundary_sequence: helper.last_boundary_sequence,
365            admission_sequence: helper.admission_sequence,
366            terminal_outcome: helper.terminal_outcome,
367            attempt_count: helper.attempt_count,
368            recovery_lane: helper.recovery_lane,
369        };
370        Ok(StoredInputState { state, seed })
371    }
372}
373
// Unit tests for the shell/seed types and the `StoredInputState` wire format.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };
    use meerkat_core::ops::{OpEvent, OperationId};

    #[test]
    fn new_accepted_starts_with_no_shell_history() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert!(state.history.is_empty());
    }

    #[test]
    fn seed_new_accepted_defaults_match_queue_lifecycle() {
        let seed = InputStateSeed::new_accepted();
        assert_eq!(seed.phase, InputLifecycleState::Accepted);
        assert!(seed.last_run_id.is_none());
        assert!(seed.last_boundary_sequence.is_none());
        assert!(seed.admission_sequence.is_none());
        assert!(seed.terminal_outcome.is_none());
        assert_eq!(seed.attempt_count, 0);
        assert!(seed.recovery_lane.is_none());
    }

    #[test]
    fn lifecycle_state_serde() {
        for state in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }
    }

    #[test]
    fn stored_input_state_serde_roundtrip_preserves_fields() {
        let mut state = InputState::new_accepted(InputId::new());
        let policy = PolicyDecision {
            apply_mode: ApplyMode::StageRunStart,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::Fifo,
            consume_point: ConsumePoint::OnRunComplete,
            drain_policy: DrainPolicy::QueueNextTurn,
            routing_disposition: RoutingDisposition::Queue,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        };
        // `policy` is not used again, so it is moved rather than cloned.
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: policy,
        });
        state.runtime_semantics = Some(
            crate::policy_table::generated_admission_projection_for_kind(
                crate::identifiers::KindId::new(crate::identifiers::InputKind::Prompt),
                true,
            )
            .expect("generated admission projection")
            .runtime_semantics,
        );
        state.history.push(InputStateHistoryEntry {
            timestamp: state.updated_at,
            from: InputLifecycleState::Accepted,
            to: InputLifecycleState::Queued,
            reason: Some("QueueAccepted".into()),
        });
        let bundle = StoredInputState {
            state,
            seed: InputStateSeed {
                phase: InputLifecycleState::Queued,
                last_run_id: None,
                last_boundary_sequence: None,
                admission_sequence: Some(42),
                terminal_outcome: None,
                attempt_count: 0,
                recovery_lane: Some(HandlingMode::Queue),
            },
        };

        let json = serde_json::to_value(&bundle).unwrap();
        let parsed: StoredInputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state.input_id, bundle.state.input_id);
        assert_eq!(parsed.seed.phase, bundle.seed.phase);
        assert_eq!(
            parsed.seed.admission_sequence,
            bundle.seed.admission_sequence
        );
        assert_eq!(parsed.seed.recovery_lane, bundle.seed.recovery_lane);
        assert_eq!(
            parsed.state.runtime_semantics,
            bundle.state.runtime_semantics
        );
        assert_eq!(parsed.state.history.len(), 1);
    }

    #[test]
    fn stored_input_state_rejects_legacy_persisted_input_tags() {
        // Pre-rename `system_generated` / `projected` persisted input tags are
        // retired shapes: a stored row carrying them must fail closed instead
        // of being folded into the canonical `continuation` / `operation` tags.
        let continuation_bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Continuation(
                    crate::input::ContinuationInput::detached_background_op_completed(),
                )),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut continuation_json = serde_json::to_value(&continuation_bundle).unwrap();
        continuation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("system_generated".into());
        serde_json::from_value::<StoredInputState>(continuation_json)
            .expect_err("legacy system_generated persisted input tag must be rejected");

        let operation_bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Operation(crate::input::OperationInput {
                    header: crate::input::InputHeader {
                        id: InputId::new(),
                        timestamp: Utc::now(),
                        source: crate::input::InputOrigin::System,
                        durability: crate::input::InputDurability::Derived,
                        visibility: crate::input::InputVisibility::default(),
                        idempotency_key: None,
                        supersession_key: None,
                        correlation_id: None,
                    },
                    operation_id: OperationId::new(),
                    event: OpEvent::Cancelled {
                        id: OperationId::new(),
                    },
                })),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut operation_json = serde_json::to_value(&operation_bundle).unwrap();
        operation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("projected".into());
        serde_json::from_value::<StoredInputState>(operation_json)
            .expect_err("legacy projected persisted input tag must be rejected");
    }

    #[test]
    fn stored_input_state_rejects_legacy_dual_carrier_persisted_input_shape() {
        // The retired persisted prompt shape carried `text` + optional
        // `blocks`; the single typed `content` owner replaced both. A stored
        // row holding the old shape must fail closed.
        let bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Prompt(crate::input::PromptInput::new("hello", None))),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut json = serde_json::to_value(&bundle).unwrap();
        let persisted = json["persisted_input"]
            .as_object_mut()
            .expect("persisted_input object");
        persisted.remove("content");
        persisted.insert("text".into(), serde_json::Value::String("hello".into()));
        persisted.insert("blocks".into(), serde_json::Value::Null);
        serde_json::from_value::<StoredInputState>(json)
            .expect_err("legacy text+blocks persisted prompt shape must be rejected");
    }

    #[test]
    fn abandon_reason_serde() {
        // Covers every variant, including the data-carrying one.
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Stopped,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
            InputAbandonReason::MaxAttemptsExhausted { attempts: 3 },
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        // Full equality also checks that `superseded_by` survives the trip.
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert!(matches!(
            parsed,
            InputTerminalOutcome::Abandoned {
                reason: InputAbandonReason::Retired,
            }
        ));
    }

    #[test]
    fn reconstruction_source_serde() {
        let sources = vec![
            ReconstructionSource::Projection {
                rule_id: "rule-1".into(),
                source_event_id: "evt-1".into(),
            },
            ReconstructionSource::Coalescing {
                source_input_ids: vec![InputId::new(), InputId::new()],
            },
        ];
        for source in sources {
            let json = serde_json::to_value(&source).unwrap();
            assert!(json["source_type"].is_string());
            // `ReconstructionSource` has no `PartialEq`, so this only checks
            // that the value parses back.
            let parsed: ReconstructionSource = serde_json::from_value(json).unwrap();
            let _ = parsed;
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
    }
}