1use chrono::{DateTime, Utc};
19use meerkat_core::lifecycle::{InputId, RunId};
20use meerkat_core::types::HandlingMode;
21use serde::{Deserialize, Serialize};
22
23use crate::identifiers::PolicyVersion;
24use crate::ingress_types::RuntimeInputSemantics;
25use crate::input::Input;
26use crate::policy::PolicyDecision;
27
/// Lifecycle phase of a single runtime input.
///
/// Serialized by serde as a snake_case string, so for example
/// `AppliedPendingConsumption` is written as `"applied_pending_consumption"`.
/// The enum is `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
///
/// NOTE(review): the legal transitions between phases are not defined in this
/// file; confirm the meaning of each phase against the state machine that
/// drives it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputLifecycleState {
    /// Phase assigned by [`InputStateSeed::new_accepted`].
    Accepted,
    /// Presumably: waiting in a queue — TODO confirm.
    Queued,
    /// Presumably: staged for an upcoming run — TODO confirm.
    Staged,
    /// Presumably: applied to a run — TODO confirm.
    Applied,
    /// Presumably: applied, with consumption still outstanding — TODO confirm.
    AppliedPendingConsumption,
    /// Shares its name with [`InputTerminalOutcome::Consumed`].
    Consumed,
    /// Shares its name with [`InputTerminalOutcome::Superseded`].
    Superseded,
    /// Shares its name with [`InputTerminalOutcome::Coalesced`].
    Coalesced,
    /// Shares its name with [`InputTerminalOutcome::Abandoned`].
    Abandoned,
}
43
/// Reason attached to [`InputTerminalOutcome::Abandoned`].
///
/// Uses serde's default (externally tagged) enum representation with
/// snake_case variant names: unit variants serialize as plain strings such as
/// `"retired"`, while `MaxAttemptsExhausted` serializes as
/// `{"max_attempts_exhausted": {"attempts": <n>}}`.
///
/// NOTE(review): which caller picks which reason is decided outside this
/// file; the variants are intentionally left without semantic claims here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputAbandonReason {
    Retired,
    Reset,
    Stopped,
    Destroyed,
    Cancelled,
    /// Carries the attempt count that was reached.
    /// NOTE(review): presumably the limit that was exhausted — confirm.
    MaxAttemptsExhausted { attempts: u32 },
}
56
/// Final outcome recorded for an input, stored in
/// [`InputStateSeed::terminal_outcome`].
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `outcome_type` key and any variant fields sit next to
/// it, e.g. `{"outcome_type": "consumed"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputTerminalOutcome {
    /// The input was consumed; carries no extra data.
    Consumed,
    /// The input was superseded; `superseded_by` identifies the other input.
    Superseded { superseded_by: InputId },
    /// The input was coalesced; `aggregate_id` identifies the aggregate input.
    Coalesced { aggregate_id: InputId },
    /// The input was abandoned for the given reason.
    Abandoned { reason: InputAbandonReason },
}
70
/// One recorded lifecycle transition, kept in [`InputState::history`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputStateHistoryEntry {
    /// Timestamp stored with the transition.
    pub timestamp: DateTime<Utc>,
    /// Phase the input was in before the transition.
    pub from: InputLifecycleState,
    /// Phase the input moved to.
    pub to: InputLifecycleState,
    /// Optional free-form explanation; left out of the serialized form when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
80
/// A policy decision together with the policy version it is stored under.
///
/// Held in [`InputState::policy`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicySnapshot {
    /// Policy version recorded alongside the decision.
    pub version: PolicyVersion,
    /// The recorded decision.
    pub decision: PolicyDecision,
}
87
/// Describes what an input was reconstructed from; stored in
/// [`InputState::reconstruction_source`].
///
/// Serialized as an internally tagged object whose `source_type` key holds
/// the snake_case variant name (`"projection"` or `"coalescing"`).
///
/// NOTE(review): when each source applies is decided outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "source_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum ReconstructionSource {
    /// Identifies a rule and a source event by their string ids.
    Projection {
        rule_id: String,
        source_event_id: String,
    },
    /// Lists the ids of the source inputs.
    Coalescing {
        source_input_ids: Vec<InputId>,
    },
}
101
/// A timestamped lifecycle phase with an optional detail message.
///
/// NOTE(review): nothing in this file produces or consumes this type outside
/// of tests; confirm its users before changing its shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputStateEvent {
    /// Timestamp carried by the event.
    pub timestamp: DateTime<Utc>,
    /// Lifecycle phase carried by the event.
    pub state: InputLifecycleState,
    /// Optional free-form detail; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
110
/// The lifecycle-related half of a [`StoredInputState`].
///
/// This type has no serde derive of its own: its fields are written and read
/// through the hand-written `Serialize`/`Deserialize` impls of
/// [`StoredInputState`], which flatten them into the same record as the
/// [`InputState`] fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputStateSeed {
    /// Current lifecycle phase; persisted under the `current_state` key.
    pub phase: InputLifecycleState,
    /// NOTE(review): presumably the most recent run that touched this input —
    /// confirm against the code that sets it.
    pub last_run_id: Option<RunId>,
    /// NOTE(review): presumably the sequence number of the last run boundary
    /// seen for this input — confirm.
    pub last_boundary_sequence: Option<u64>,
    /// NOTE(review): presumably the ordering number assigned at admission —
    /// confirm.
    pub admission_sequence: Option<u64>,
    /// Final outcome, if one has been recorded.
    pub terminal_outcome: Option<InputTerminalOutcome>,
    /// Attempt counter; starts at zero in [`InputStateSeed::new_accepted`].
    pub attempt_count: u32,
    /// NOTE(review): presumably the handling mode to use on recovery —
    /// confirm.
    pub recovery_lane: Option<HandlingMode>,
}
131
132impl InputStateSeed {
133 pub fn new_accepted() -> Self {
136 Self {
137 phase: InputLifecycleState::Accepted,
138 last_run_id: None,
139 last_boundary_sequence: None,
140 admission_sequence: None,
141 terminal_outcome: None,
142 attempt_count: 0,
143 recovery_lane: None,
144 }
145 }
146}
147
/// An [`InputState`] paired with its [`InputStateSeed`].
///
/// `Serialize` and `Deserialize` are implemented by hand for this type: both
/// halves are flattened into a single record that also carries a
/// `stored_input_state_version` field.
#[derive(Debug, Clone)]
pub struct StoredInputState {
    /// Identity, history, timestamps and the other non-seed fields.
    pub state: InputState,
    /// Lifecycle phase and related bookkeeping.
    pub seed: InputStateSeed,
}
159
160impl StoredInputState {
161 pub fn new_accepted(input_id: InputId) -> Self {
163 Self {
164 state: InputState::new_accepted(input_id),
165 seed: InputStateSeed::new_accepted(),
166 }
167 }
168}
169
/// Wrapper around a [`StoredInputState`] with a private field.
///
/// The only constructor in this file is the crate-private
/// `from_machine_snapshot`, which runs an authorization check on the seed
/// before wrapping the bundle.
#[derive(Debug, Clone)]
pub struct InputStatePersistenceRecord {
    // Private so that values can only be built through the constructor.
    bundle: StoredInputState,
}
176
177impl InputStatePersistenceRecord {
178 pub(crate) fn from_machine_snapshot(bundle: StoredInputState) -> Result<Self, String> {
182 crate::meerkat_machine::authorize_stored_input_state_seed(
183 &bundle.state.input_id,
184 &bundle.seed,
185 )?;
186 Ok(Self { bundle })
187 }
188
189 pub fn as_stored(&self) -> &StoredInputState {
191 &self.bundle
192 }
193
194 pub fn clone_stored(&self) -> StoredInputState {
196 self.bundle.clone()
197 }
198
199 pub fn into_stored(self) -> StoredInputState {
201 self.bundle
202 }
203}
204
/// The non-seed half of a [`StoredInputState`]: identity, transition history,
/// timestamps and optional metadata for one input.
///
/// This type has no serde derive of its own; it is persisted through the
/// hand-written serde impls of [`StoredInputState`].
#[derive(Debug, Clone)]
pub struct InputState {
    /// Identifier of the input this state belongs to.
    pub input_id: InputId,
    /// Recorded transitions; empty for a freshly accepted input.
    pub history: Vec<InputStateHistoryEntry>,
    /// Initialized together with `created_at` by [`InputState::new_accepted`].
    /// NOTE(review): presumably bumped on every mutation — confirm.
    pub updated_at: DateTime<Utc>,
    /// Policy snapshot, if one has been recorded.
    pub policy: Option<PolicySnapshot>,
    /// Runtime semantics, if they have been recorded.
    pub runtime_semantics: Option<RuntimeInputSemantics>,
    /// Durability, if it has been recorded.
    pub durability: Option<crate::input::InputDurability>,
    /// Idempotency key, if one has been recorded.
    pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
    /// Recovery counter; starts at zero.
    /// NOTE(review): presumably incremented per recovery pass — confirm.
    pub recovery_count: u32,
    /// What the input was reconstructed from, if recorded.
    pub reconstruction_source: Option<ReconstructionSource>,
    /// The input itself, when it is persisted alongside its state.
    pub persisted_input: Option<Input>,
    /// Set once, from `Utc::now()`, by [`InputState::new_accepted`].
    pub created_at: DateTime<Utc>,
}
231
232impl InputState {
233 pub fn new_accepted(input_id: InputId) -> Self {
237 let now = Utc::now();
238 Self {
239 input_id,
240 history: Vec::new(),
241 updated_at: now,
242 policy: None,
243 runtime_semantics: None,
244 durability: None,
245 idempotency_key: None,
246 recovery_count: 0,
247 reconstruction_source: None,
248 persisted_input: None,
249 created_at: now,
250 }
251 }
252
253 pub fn history(&self) -> &[InputStateHistoryEntry] {
254 &self.history
255 }
256
257 pub fn updated_at(&self) -> DateTime<Utc> {
258 self.updated_at
259 }
260}
261
/// Flat persisted record for [`StoredInputState`].
///
/// The hand-written serde impls of [`StoredInputState`] convert to and from
/// this helper, merging the [`InputState`] fields and the [`InputStateSeed`]
/// fields into a single record. Field names and order here define the
/// persisted shape.
///
/// `Option` fields are left out of the output when `None`, and `history` is
/// left out when empty. Fields marked `default` fall back to their default
/// value when absent on read.
#[derive(Serialize, Deserialize)]
struct InputStateSerde {
    /// Format version: written from the version authority on serialize and
    /// validated by it on deserialize.
    stored_input_state_version: u32,
    /// From [`InputState::input_id`].
    input_id: InputId,
    /// From [`InputStateSeed::phase`]; note the different key name.
    current_state: InputLifecycleState,
    /// From [`InputState::policy`].
    #[serde(skip_serializing_if = "Option::is_none")]
    policy: Option<PolicySnapshot>,
    /// From [`InputState::runtime_semantics`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    runtime_semantics: Option<RuntimeInputSemantics>,
    /// From [`InputStateSeed::terminal_outcome`].
    #[serde(skip_serializing_if = "Option::is_none")]
    terminal_outcome: Option<InputTerminalOutcome>,
    /// From [`InputState::durability`].
    #[serde(skip_serializing_if = "Option::is_none")]
    durability: Option<crate::input::InputDurability>,
    /// From [`InputState::idempotency_key`].
    #[serde(skip_serializing_if = "Option::is_none")]
    idempotency_key: Option<crate::identifiers::IdempotencyKey>,
    /// From [`InputStateSeed::attempt_count`]; zero when absent.
    #[serde(default)]
    attempt_count: u32,
    /// From [`InputState::recovery_count`]; zero when absent.
    #[serde(default)]
    recovery_count: u32,
    /// From [`InputState::history`]; empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    history: Vec<InputStateHistoryEntry>,
    /// From [`InputState::reconstruction_source`].
    #[serde(skip_serializing_if = "Option::is_none")]
    reconstruction_source: Option<ReconstructionSource>,
    /// From [`InputState::persisted_input`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    persisted_input: Option<Input>,
    /// From [`InputStateSeed::last_run_id`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_run_id: Option<RunId>,
    /// From [`InputStateSeed::last_boundary_sequence`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_boundary_sequence: Option<u64>,
    /// From [`InputStateSeed::admission_sequence`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    admission_sequence: Option<u64>,
    /// From [`InputStateSeed::recovery_lane`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    recovery_lane: Option<HandlingMode>,
    /// From [`InputState::created_at`].
    created_at: DateTime<Utc>,
    /// From [`InputState::updated_at`].
    updated_at: DateTime<Utc>,
}
310
311impl Serialize for StoredInputState {
312 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
313 let helper = InputStateSerde {
314 stored_input_state_version:
315 meerkat_core::generated::session_persistence_version_authority::stored_input_state_version(
316 ),
317 input_id: self.state.input_id.clone(),
318 current_state: self.seed.phase,
319 policy: self.state.policy.clone(),
320 runtime_semantics: self.state.runtime_semantics,
321 terminal_outcome: self.seed.terminal_outcome.clone(),
322 durability: self.state.durability,
323 idempotency_key: self.state.idempotency_key.clone(),
324 attempt_count: self.seed.attempt_count,
325 recovery_count: self.state.recovery_count,
326 history: self.state.history.clone(),
327 reconstruction_source: self.state.reconstruction_source.clone(),
328 persisted_input: self.state.persisted_input.clone(),
329 last_run_id: self.seed.last_run_id.clone(),
330 last_boundary_sequence: self.seed.last_boundary_sequence,
331 admission_sequence: self.seed.admission_sequence,
332 recovery_lane: self.seed.recovery_lane,
333 created_at: self.state.created_at,
334 updated_at: self.state.updated_at,
335 };
336 helper.serialize(serializer)
337 }
338}
339
340impl<'de> Deserialize<'de> for StoredInputState {
341 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
342 let helper = InputStateSerde::deserialize(deserializer)?;
343 let _stored_input_state_version =
344 meerkat_core::generated::session_persistence_version_authority::restore_stored_input_state_version(
345 helper.stored_input_state_version,
346 )
347 .map_err(<D::Error as serde::de::Error>::custom)?;
348 let state = InputState {
349 input_id: helper.input_id,
350 history: helper.history,
351 updated_at: helper.updated_at,
352 policy: helper.policy,
353 runtime_semantics: helper.runtime_semantics,
354 durability: helper.durability,
355 idempotency_key: helper.idempotency_key,
356 recovery_count: helper.recovery_count,
357 reconstruction_source: helper.reconstruction_source,
358 persisted_input: helper.persisted_input,
359 created_at: helper.created_at,
360 };
361 let seed = InputStateSeed {
362 phase: helper.current_state,
363 last_run_id: helper.last_run_id,
364 last_boundary_sequence: helper.last_boundary_sequence,
365 admission_sequence: helper.admission_sequence,
366 terminal_outcome: helper.terminal_outcome,
367 attempt_count: helper.attempt_count,
368 recovery_lane: helper.recovery_lane,
369 };
370 Ok(StoredInputState { state, seed })
371 }
372}
373
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };
    use meerkat_core::ops::{OpEvent, OperationId};

    /// A freshly accepted state keeps the given id and starts with empty
    /// history, a zero recovery counter and identical timestamps.
    #[test]
    fn new_accepted_starts_with_no_shell_history() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert!(state.history.is_empty());
        assert_eq!(state.recovery_count, 0);
        assert_eq!(state.created_at, state.updated_at);
    }

    /// Every field of a freshly accepted seed has its documented default.
    #[test]
    fn seed_new_accepted_defaults_match_queue_lifecycle() {
        let seed = InputStateSeed::new_accepted();
        assert_eq!(seed.phase, InputLifecycleState::Accepted);
        assert!(seed.last_run_id.is_none());
        assert!(seed.last_boundary_sequence.is_none());
        assert!(seed.admission_sequence.is_none());
        assert!(seed.terminal_outcome.is_none());
        assert_eq!(seed.attempt_count, 0);
        assert!(seed.recovery_lane.is_none());
    }

    /// Every lifecycle phase survives a JSON round trip.
    #[test]
    fn lifecycle_state_serde() {
        for state in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }
    }

    /// A populated bundle survives a JSON round trip with its seed, policy,
    /// runtime semantics and history intact.
    #[test]
    fn stored_input_state_serde_roundtrip_preserves_fields() {
        let mut state = InputState::new_accepted(InputId::new());
        let policy = PolicyDecision {
            apply_mode: ApplyMode::StageRunStart,
            wake_mode: WakeMode::WakeIfIdle,
            queue_mode: QueueMode::Fifo,
            consume_point: ConsumePoint::OnRunComplete,
            drain_policy: DrainPolicy::QueueNextTurn,
            routing_disposition: RoutingDisposition::Queue,
            record_transcript: true,
            emit_operator_content: true,
            policy_version: PolicyVersion(1),
        };
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: policy,
        });
        state.runtime_semantics = Some(
            crate::policy_table::generated_admission_projection_for_kind(
                crate::identifiers::KindId::new(crate::identifiers::InputKind::Prompt),
                true,
            )
            .expect("generated admission projection")
            .runtime_semantics,
        );
        state.history.push(InputStateHistoryEntry {
            timestamp: state.updated_at,
            from: InputLifecycleState::Accepted,
            to: InputLifecycleState::Queued,
            reason: Some("QueueAccepted".into()),
        });
        let bundle = StoredInputState {
            state,
            seed: InputStateSeed {
                phase: InputLifecycleState::Queued,
                last_run_id: None,
                last_boundary_sequence: None,
                admission_sequence: Some(42),
                terminal_outcome: None,
                attempt_count: 0,
                recovery_lane: Some(HandlingMode::Queue),
            },
        };

        let json = serde_json::to_value(&bundle).unwrap();
        let parsed: StoredInputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state.input_id, bundle.state.input_id);
        // The seed derives `PartialEq`, so compare every seed field at once.
        assert_eq!(parsed.seed, bundle.seed);
        assert_eq!(
            parsed.state.runtime_semantics,
            bundle.state.runtime_semantics
        );
        assert!(parsed.state.policy.is_some());
        assert_eq!(parsed.state.recovery_count, bundle.state.recovery_count);
        assert_eq!(parsed.state.history.len(), 1);
        let entry = &parsed.state.history[0];
        assert_eq!(entry.from, InputLifecycleState::Accepted);
        assert_eq!(entry.to, InputLifecycleState::Queued);
        assert_eq!(entry.reason.as_deref(), Some("QueueAccepted"));
    }

    /// Records whose persisted input carries a legacy `input_type` tag must
    /// fail to deserialize.
    #[test]
    fn stored_input_state_rejects_legacy_persisted_input_tags() {
        // Serialize a valid continuation input, then swap its tag for the
        // legacy `system_generated` value.
        let continuation_bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Continuation(
                    crate::input::ContinuationInput::detached_background_op_completed(),
                )),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut continuation_json = serde_json::to_value(&continuation_bundle).unwrap();
        continuation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("system_generated".into());
        serde_json::from_value::<StoredInputState>(continuation_json)
            .expect_err("legacy system_generated persisted input tag must be rejected");

        // Same idea for an operation input and the legacy `projected` tag.
        let operation_bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Operation(crate::input::OperationInput {
                    header: crate::input::InputHeader {
                        id: InputId::new(),
                        timestamp: Utc::now(),
                        source: crate::input::InputOrigin::System,
                        durability: crate::input::InputDurability::Derived,
                        visibility: crate::input::InputVisibility::default(),
                        idempotency_key: None,
                        supersession_key: None,
                        correlation_id: None,
                    },
                    operation_id: OperationId::new(),
                    event: OpEvent::Cancelled {
                        id: OperationId::new(),
                    },
                })),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut operation_json = serde_json::to_value(&operation_bundle).unwrap();
        operation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("projected".into());
        serde_json::from_value::<StoredInputState>(operation_json)
            .expect_err("legacy projected persisted input tag must be rejected");
    }

    /// A persisted prompt using the legacy `text` + `blocks` fields instead of
    /// `content` must fail to deserialize.
    #[test]
    fn stored_input_state_rejects_legacy_dual_carrier_persisted_input_shape() {
        let bundle = StoredInputState {
            state: InputState {
                persisted_input: Some(Input::Prompt(crate::input::PromptInput::new("hello", None))),
                ..InputState::new_accepted(InputId::new())
            },
            seed: InputStateSeed::new_accepted(),
        };
        let mut json = serde_json::to_value(&bundle).unwrap();
        let persisted = json["persisted_input"]
            .as_object_mut()
            .expect("persisted_input object");
        // Rewrite the current shape into the legacy one.
        persisted.remove("content");
        persisted.insert("text".into(), serde_json::Value::String("hello".into()));
        persisted.insert("blocks".into(), serde_json::Value::Null);
        serde_json::from_value::<StoredInputState>(json)
            .expect_err("legacy text+blocks persisted prompt shape must be rejected");
    }

    /// Every abandon reason, including the data-carrying one, survives a JSON
    /// round trip.
    #[test]
    fn abandon_reason_serde() {
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Stopped,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
            InputAbandonReason::MaxAttemptsExhausted { attempts: 3 },
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }
    }

    /// `Consumed` is tagged as `"consumed"` and round-trips.
    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// `Superseded` is tagged as `"superseded"` and keeps its payload.
    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// `Coalesced` is tagged as `"coalesced"` and keeps its payload.
    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// `Abandoned` is tagged as `"abandoned"` and keeps its reason.
    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "abandoned");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// Both reconstruction sources carry a `source_type` tag and re-serialize
    /// to the same JSON after a round trip.
    #[test]
    fn reconstruction_source_serde() {
        for source in [
            ReconstructionSource::Projection {
                rule_id: "rule-1".into(),
                source_event_id: "evt-1".into(),
            },
            ReconstructionSource::Coalescing {
                source_input_ids: vec![InputId::new(), InputId::new()],
            },
        ] {
            let json = serde_json::to_value(&source).unwrap();
            assert!(json["source_type"].is_string());
            let parsed: ReconstructionSource = serde_json::from_value(json.clone()).unwrap();
            // The type has no `PartialEq`, so compare serialized forms instead.
            assert_eq!(serde_json::to_value(&parsed).unwrap(), json);
        }
    }

    /// An event keeps its phase and detail across a JSON round trip.
    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
    }
}