use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::{InputId, RunId};
use meerkat_core::types::HandlingMode;
use serde::{Deserialize, Serialize};
use crate::identifiers::PolicyVersion;
use crate::ingress_types::RuntimeInputSemantics;
use crate::input::Input;
use crate::policy::PolicyDecision;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputLifecycleState {
Accepted,
Queued,
Staged,
Applied,
AppliedPendingConsumption,
Consumed,
Superseded,
Coalesced,
Abandoned,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputAbandonReason {
Retired,
Reset,
Stopped,
Destroyed,
Cancelled,
MaxAttemptsExhausted { attempts: u32 },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputTerminalOutcome {
Consumed,
Superseded { superseded_by: InputId },
Coalesced { aggregate_id: InputId },
Abandoned { reason: InputAbandonReason },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputStateHistoryEntry {
pub timestamp: DateTime<Utc>,
pub from: InputLifecycleState,
pub to: InputLifecycleState,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicySnapshot {
pub version: PolicyVersion,
pub decision: PolicyDecision,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "source_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum ReconstructionSource {
Projection {
rule_id: String,
source_event_id: String,
},
Coalescing {
source_input_ids: Vec<InputId>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputStateEvent {
pub timestamp: DateTime<Utc>,
pub state: InputLifecycleState,
#[serde(skip_serializing_if = "Option::is_none")]
pub detail: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputStateSeed {
pub phase: InputLifecycleState,
pub last_run_id: Option<RunId>,
pub last_boundary_sequence: Option<u64>,
pub admission_sequence: Option<u64>,
pub terminal_outcome: Option<InputTerminalOutcome>,
pub attempt_count: u32,
pub recovery_lane: Option<HandlingMode>,
}
impl InputStateSeed {
pub fn new_accepted() -> Self {
Self {
phase: InputLifecycleState::Accepted,
last_run_id: None,
last_boundary_sequence: None,
admission_sequence: None,
terminal_outcome: None,
attempt_count: 0,
recovery_lane: None,
}
}
}
#[derive(Debug, Clone)]
pub struct StoredInputState {
pub state: InputState,
pub seed: InputStateSeed,
}
impl StoredInputState {
pub fn new_accepted(input_id: InputId) -> Self {
Self {
state: InputState::new_accepted(input_id),
seed: InputStateSeed::new_accepted(),
}
}
}
#[derive(Debug, Clone)]
pub struct InputStatePersistenceRecord {
bundle: StoredInputState,
}
impl InputStatePersistenceRecord {
pub(crate) fn from_machine_snapshot(bundle: StoredInputState) -> Result<Self, String> {
crate::meerkat_machine::authorize_stored_input_state_seed(
&bundle.state.input_id,
&bundle.seed,
)?;
Ok(Self { bundle })
}
pub fn as_stored(&self) -> &StoredInputState {
&self.bundle
}
pub fn clone_stored(&self) -> StoredInputState {
self.bundle.clone()
}
pub fn into_stored(self) -> StoredInputState {
self.bundle
}
}
#[derive(Debug, Clone)]
pub struct InputState {
pub input_id: InputId,
pub history: Vec<InputStateHistoryEntry>,
pub updated_at: DateTime<Utc>,
pub policy: Option<PolicySnapshot>,
pub runtime_semantics: Option<RuntimeInputSemantics>,
pub durability: Option<crate::input::InputDurability>,
pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
pub recovery_count: u32,
pub reconstruction_source: Option<ReconstructionSource>,
pub persisted_input: Option<Input>,
pub created_at: DateTime<Utc>,
}
impl InputState {
pub fn new_accepted(input_id: InputId) -> Self {
let now = Utc::now();
Self {
input_id,
history: Vec::new(),
updated_at: now,
policy: None,
runtime_semantics: None,
durability: None,
idempotency_key: None,
recovery_count: 0,
reconstruction_source: None,
persisted_input: None,
created_at: now,
}
}
pub fn history(&self) -> &[InputStateHistoryEntry] {
&self.history
}
pub fn updated_at(&self) -> DateTime<Utc> {
self.updated_at
}
}
#[derive(Serialize, Deserialize)]
struct InputStateSerde {
stored_input_state_version: u32,
input_id: InputId,
current_state: InputLifecycleState,
#[serde(skip_serializing_if = "Option::is_none")]
policy: Option<PolicySnapshot>,
#[serde(default, skip_serializing_if = "Option::is_none")]
runtime_semantics: Option<RuntimeInputSemantics>,
#[serde(skip_serializing_if = "Option::is_none")]
terminal_outcome: Option<InputTerminalOutcome>,
#[serde(skip_serializing_if = "Option::is_none")]
durability: Option<crate::input::InputDurability>,
#[serde(skip_serializing_if = "Option::is_none")]
idempotency_key: Option<crate::identifiers::IdempotencyKey>,
#[serde(default)]
attempt_count: u32,
#[serde(default)]
recovery_count: u32,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
history: Vec<InputStateHistoryEntry>,
#[serde(skip_serializing_if = "Option::is_none")]
reconstruction_source: Option<ReconstructionSource>,
#[serde(default, skip_serializing_if = "Option::is_none")]
persisted_input: Option<Input>,
#[serde(default, skip_serializing_if = "Option::is_none")]
last_run_id: Option<RunId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
last_boundary_sequence: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
admission_sequence: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
recovery_lane: Option<HandlingMode>,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
impl Serialize for StoredInputState {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let helper = InputStateSerde {
stored_input_state_version:
meerkat_core::generated::session_persistence_version_authority::stored_input_state_version(
),
input_id: self.state.input_id.clone(),
current_state: self.seed.phase,
policy: self.state.policy.clone(),
runtime_semantics: self.state.runtime_semantics,
terminal_outcome: self.seed.terminal_outcome.clone(),
durability: self.state.durability,
idempotency_key: self.state.idempotency_key.clone(),
attempt_count: self.seed.attempt_count,
recovery_count: self.state.recovery_count,
history: self.state.history.clone(),
reconstruction_source: self.state.reconstruction_source.clone(),
persisted_input: self.state.persisted_input.clone(),
last_run_id: self.seed.last_run_id.clone(),
last_boundary_sequence: self.seed.last_boundary_sequence,
admission_sequence: self.seed.admission_sequence,
recovery_lane: self.seed.recovery_lane,
created_at: self.state.created_at,
updated_at: self.state.updated_at,
};
helper.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for StoredInputState {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let helper = InputStateSerde::deserialize(deserializer)?;
let _stored_input_state_version =
meerkat_core::generated::session_persistence_version_authority::restore_stored_input_state_version(
helper.stored_input_state_version,
)
.map_err(<D::Error as serde::de::Error>::custom)?;
let state = InputState {
input_id: helper.input_id,
history: helper.history,
updated_at: helper.updated_at,
policy: helper.policy,
runtime_semantics: helper.runtime_semantics,
durability: helper.durability,
idempotency_key: helper.idempotency_key,
recovery_count: helper.recovery_count,
reconstruction_source: helper.reconstruction_source,
persisted_input: helper.persisted_input,
created_at: helper.created_at,
};
let seed = InputStateSeed {
phase: helper.current_state,
last_run_id: helper.last_run_id,
last_boundary_sequence: helper.last_boundary_sequence,
admission_sequence: helper.admission_sequence,
terminal_outcome: helper.terminal_outcome,
attempt_count: helper.attempt_count,
recovery_lane: helper.recovery_lane,
};
Ok(StoredInputState { state, seed })
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::policy::{
ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
};
use meerkat_core::ops::{OpEvent, OperationId};
#[test]
fn new_accepted_starts_with_no_shell_history() {
let id = InputId::new();
let state = InputState::new_accepted(id.clone());
assert_eq!(state.input_id, id);
assert!(state.history.is_empty());
}
#[test]
fn seed_new_accepted_defaults_match_queue_lifecycle() {
let seed = InputStateSeed::new_accepted();
assert_eq!(seed.phase, InputLifecycleState::Accepted);
assert!(seed.last_run_id.is_none());
assert!(seed.last_boundary_sequence.is_none());
assert!(seed.admission_sequence.is_none());
assert!(seed.terminal_outcome.is_none());
assert_eq!(seed.attempt_count, 0);
}
#[test]
fn lifecycle_state_serde() {
for state in [
InputLifecycleState::Accepted,
InputLifecycleState::Queued,
InputLifecycleState::Staged,
InputLifecycleState::Applied,
InputLifecycleState::AppliedPendingConsumption,
InputLifecycleState::Consumed,
InputLifecycleState::Superseded,
InputLifecycleState::Coalesced,
InputLifecycleState::Abandoned,
] {
let json = serde_json::to_value(state).unwrap();
let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
assert_eq!(state, parsed);
}
}
#[test]
fn stored_input_state_serde_roundtrip_preserves_fields() {
let mut state = InputState::new_accepted(InputId::new());
let policy = PolicyDecision {
apply_mode: ApplyMode::StageRunStart,
wake_mode: WakeMode::WakeIfIdle,
queue_mode: QueueMode::Fifo,
consume_point: ConsumePoint::OnRunComplete,
drain_policy: DrainPolicy::QueueNextTurn,
routing_disposition: RoutingDisposition::Queue,
record_transcript: true,
emit_operator_content: true,
policy_version: PolicyVersion(1),
};
state.policy = Some(PolicySnapshot {
version: PolicyVersion(1),
decision: policy.clone(),
});
state.runtime_semantics = Some(
crate::policy_table::generated_admission_projection_for_kind(
crate::identifiers::KindId::new(crate::identifiers::InputKind::Prompt),
true,
)
.expect("generated admission projection")
.runtime_semantics,
);
state.history.push(InputStateHistoryEntry {
timestamp: state.updated_at,
from: InputLifecycleState::Accepted,
to: InputLifecycleState::Queued,
reason: Some("QueueAccepted".into()),
});
let bundle = StoredInputState {
state,
seed: InputStateSeed {
phase: InputLifecycleState::Queued,
last_run_id: None,
last_boundary_sequence: None,
admission_sequence: Some(42),
terminal_outcome: None,
attempt_count: 0,
recovery_lane: Some(HandlingMode::Queue),
},
};
let json = serde_json::to_value(&bundle).unwrap();
let parsed: StoredInputState = serde_json::from_value(json).unwrap();
assert_eq!(parsed.state.input_id, bundle.state.input_id);
assert_eq!(parsed.seed.phase, bundle.seed.phase);
assert_eq!(
parsed.seed.admission_sequence,
bundle.seed.admission_sequence
);
assert_eq!(parsed.seed.recovery_lane, bundle.seed.recovery_lane);
assert_eq!(
parsed.state.runtime_semantics,
bundle.state.runtime_semantics
);
assert_eq!(parsed.state.history.len(), 1);
}
#[test]
fn stored_input_state_rejects_legacy_persisted_input_tags() {
let continuation_bundle = StoredInputState {
state: InputState {
persisted_input: Some(Input::Continuation(
crate::input::ContinuationInput::detached_background_op_completed(),
)),
..InputState::new_accepted(InputId::new())
},
seed: InputStateSeed::new_accepted(),
};
let mut continuation_json = serde_json::to_value(&continuation_bundle).unwrap();
continuation_json["persisted_input"]["input_type"] =
serde_json::Value::String("system_generated".into());
serde_json::from_value::<StoredInputState>(continuation_json)
.expect_err("legacy system_generated persisted input tag must be rejected");
let operation_bundle = StoredInputState {
state: InputState {
persisted_input: Some(Input::Operation(crate::input::OperationInput {
header: crate::input::InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: crate::input::InputOrigin::System,
durability: crate::input::InputDurability::Derived,
visibility: crate::input::InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
operation_id: OperationId::new(),
event: OpEvent::Cancelled {
id: OperationId::new(),
},
})),
..InputState::new_accepted(InputId::new())
},
seed: InputStateSeed::new_accepted(),
};
let mut operation_json = serde_json::to_value(&operation_bundle).unwrap();
operation_json["persisted_input"]["input_type"] =
serde_json::Value::String("projected".into());
serde_json::from_value::<StoredInputState>(operation_json)
.expect_err("legacy projected persisted input tag must be rejected");
}
#[test]
fn stored_input_state_rejects_legacy_dual_carrier_persisted_input_shape() {
let bundle = StoredInputState {
state: InputState {
persisted_input: Some(Input::Prompt(crate::input::PromptInput::new("hello", None))),
..InputState::new_accepted(InputId::new())
},
seed: InputStateSeed::new_accepted(),
};
let mut json = serde_json::to_value(&bundle).unwrap();
let persisted = json["persisted_input"]
.as_object_mut()
.expect("persisted_input object");
persisted.remove("content");
persisted.insert("text".into(), serde_json::Value::String("hello".into()));
persisted.insert("blocks".into(), serde_json::Value::Null);
serde_json::from_value::<StoredInputState>(json)
.expect_err("legacy text+blocks persisted prompt shape must be rejected");
}
#[test]
fn abandon_reason_serde() {
for reason in [
InputAbandonReason::Retired,
InputAbandonReason::Reset,
InputAbandonReason::Destroyed,
InputAbandonReason::Cancelled,
] {
let json = serde_json::to_value(&reason).unwrap();
let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
assert_eq!(reason, parsed);
}
}
#[test]
fn terminal_outcome_consumed_serde() {
let outcome = InputTerminalOutcome::Consumed;
let json = serde_json::to_value(&outcome).unwrap();
assert_eq!(json["outcome_type"], "consumed");
let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
assert_eq!(outcome, parsed);
}
#[test]
fn terminal_outcome_superseded_serde() {
let outcome = InputTerminalOutcome::Superseded {
superseded_by: InputId::new(),
};
let json = serde_json::to_value(&outcome).unwrap();
assert_eq!(json["outcome_type"], "superseded");
let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, InputTerminalOutcome::Superseded { .. }));
}
#[test]
fn terminal_outcome_abandoned_serde() {
let outcome = InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::Retired,
};
let json = serde_json::to_value(&outcome).unwrap();
let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
assert!(matches!(
parsed,
InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::Retired,
}
));
}
#[test]
fn reconstruction_source_serde() {
let sources = vec![
ReconstructionSource::Projection {
rule_id: "rule-1".into(),
source_event_id: "evt-1".into(),
},
ReconstructionSource::Coalescing {
source_input_ids: vec![InputId::new(), InputId::new()],
},
];
for source in sources {
let json = serde_json::to_value(&source).unwrap();
assert!(json["source_type"].is_string());
let parsed: ReconstructionSource = serde_json::from_value(json).unwrap();
let _ = parsed;
}
}
#[test]
fn input_state_event_serde() {
let event = InputStateEvent {
timestamp: Utc::now(),
state: InputLifecycleState::Queued,
detail: Some("queued for processing".into()),
};
let json = serde_json::to_value(&event).unwrap();
let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
assert_eq!(parsed.state, InputLifecycleState::Queued);
}
}