use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::{InputId, RunId};
use serde::{Deserialize, Serialize};
use crate::identifiers::PolicyVersion;
use crate::input::Input;
use crate::policy::PolicyDecision;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputLifecycleState {
Accepted,
Queued,
Staged,
Applied,
AppliedPendingConsumption,
Consumed,
Superseded,
Coalesced,
Abandoned,
}
impl InputLifecycleState {
pub fn is_terminal(&self) -> bool {
matches!(
self,
Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputAbandonReason {
Retired,
Reset,
Destroyed,
Cancelled,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputTerminalOutcome {
Consumed,
Superseded { superseded_by: InputId },
Coalesced { aggregate_id: InputId },
Abandoned { reason: InputAbandonReason },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputStateHistoryEntry {
pub timestamp: DateTime<Utc>,
pub from: InputLifecycleState,
pub to: InputLifecycleState,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicySnapshot {
pub version: PolicyVersion,
pub decision: PolicyDecision,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "source_type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum ReconstructionSource {
Projection {
rule_id: String,
source_event_id: String,
},
Coalescing { source_input_ids: Vec<InputId> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputStateEvent {
pub timestamp: DateTime<Utc>,
pub state: InputLifecycleState,
#[serde(skip_serializing_if = "Option::is_none")]
pub detail: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputState {
pub input_id: InputId,
pub current_state: InputLifecycleState,
#[serde(skip_serializing_if = "Option::is_none")]
pub policy: Option<PolicySnapshot>,
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_outcome: Option<InputTerminalOutcome>,
#[serde(skip_serializing_if = "Option::is_none")]
pub durability: Option<crate::input::InputDurability>,
#[serde(skip_serializing_if = "Option::is_none")]
pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
#[serde(default)]
pub attempt_count: u32,
#[serde(default)]
pub recovery_count: u32,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub history: Vec<InputStateHistoryEntry>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reconstruction_source: Option<ReconstructionSource>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub persisted_input: Option<Input>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_run_id: Option<RunId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_boundary_sequence: Option<u64>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl InputState {
pub fn new_accepted(input_id: InputId) -> Self {
let now = Utc::now();
Self {
input_id,
current_state: InputLifecycleState::Accepted,
policy: None,
terminal_outcome: None,
durability: None,
idempotency_key: None,
attempt_count: 0,
recovery_count: 0,
history: Vec::new(),
reconstruction_source: None,
persisted_input: None,
last_run_id: None,
last_boundary_sequence: None,
created_at: now,
updated_at: now,
}
}
pub fn is_terminal(&self) -> bool {
self.current_state.is_terminal()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::policy::{ApplyMode, ConsumePoint, QueueMode, WakeMode};
#[test]
fn lifecycle_state_terminal() {
assert!(InputLifecycleState::Consumed.is_terminal());
assert!(InputLifecycleState::Superseded.is_terminal());
assert!(InputLifecycleState::Coalesced.is_terminal());
assert!(InputLifecycleState::Abandoned.is_terminal());
assert!(!InputLifecycleState::Accepted.is_terminal());
assert!(!InputLifecycleState::Queued.is_terminal());
assert!(!InputLifecycleState::Staged.is_terminal());
assert!(!InputLifecycleState::Applied.is_terminal());
assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
}
#[test]
fn lifecycle_state_serde() {
for state in [
InputLifecycleState::Accepted,
InputLifecycleState::Queued,
InputLifecycleState::Staged,
InputLifecycleState::Applied,
InputLifecycleState::AppliedPendingConsumption,
InputLifecycleState::Consumed,
InputLifecycleState::Superseded,
InputLifecycleState::Coalesced,
InputLifecycleState::Abandoned,
] {
let json = serde_json::to_value(state).unwrap();
let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
assert_eq!(state, parsed);
}
}
#[test]
fn input_state_new_accepted() {
let id = InputId::new();
let state = InputState::new_accepted(id.clone());
assert_eq!(state.input_id, id);
assert_eq!(state.current_state, InputLifecycleState::Accepted);
assert!(!state.is_terminal());
assert!(state.history.is_empty());
assert!(state.terminal_outcome.is_none());
assert!(state.policy.is_none());
}
#[test]
fn input_state_serde_roundtrip() {
let mut state = InputState::new_accepted(InputId::new());
state.policy = Some(PolicySnapshot {
version: PolicyVersion(1),
decision: PolicyDecision {
apply_mode: ApplyMode::StageRunStart,
wake_mode: WakeMode::WakeIfIdle,
queue_mode: QueueMode::Fifo,
consume_point: ConsumePoint::OnRunComplete,
record_transcript: true,
emit_operator_content: true,
policy_version: PolicyVersion(1),
},
});
state.history.push(InputStateHistoryEntry {
timestamp: Utc::now(),
from: InputLifecycleState::Accepted,
to: InputLifecycleState::Queued,
reason: Some("policy resolved".into()),
});
let json = serde_json::to_value(&state).unwrap();
let parsed: InputState = serde_json::from_value(json).unwrap();
assert_eq!(parsed.input_id, state.input_id);
assert_eq!(parsed.current_state, state.current_state);
assert_eq!(parsed.history.len(), 1);
}
#[test]
fn abandon_reason_serde() {
for reason in [
InputAbandonReason::Retired,
InputAbandonReason::Reset,
InputAbandonReason::Destroyed,
InputAbandonReason::Cancelled,
] {
let json = serde_json::to_value(&reason).unwrap();
let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
assert_eq!(reason, parsed);
}
}
#[test]
fn terminal_outcome_consumed_serde() {
let outcome = InputTerminalOutcome::Consumed;
let json = serde_json::to_value(&outcome).unwrap();
assert_eq!(json["outcome_type"], "consumed");
let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
assert_eq!(outcome, parsed);
}
#[test]
fn terminal_outcome_superseded_serde() {
let outcome = InputTerminalOutcome::Superseded {
superseded_by: InputId::new(),
};
let json = serde_json::to_value(&outcome).unwrap();
assert_eq!(json["outcome_type"], "superseded");
let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
assert!(matches!(parsed, InputTerminalOutcome::Superseded { .. }));
}
#[test]
fn terminal_outcome_abandoned_serde() {
let outcome = InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::Retired,
};
let json = serde_json::to_value(&outcome).unwrap();
let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
assert!(matches!(
parsed,
InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::Retired,
}
));
}
#[test]
fn reconstruction_source_serde() {
let sources = vec![
ReconstructionSource::Projection {
rule_id: "rule-1".into(),
source_event_id: "evt-1".into(),
},
ReconstructionSource::Coalescing {
source_input_ids: vec![InputId::new(), InputId::new()],
},
];
for source in sources {
let json = serde_json::to_value(&source).unwrap();
assert!(json["source_type"].is_string());
let parsed: ReconstructionSource = serde_json::from_value(json).unwrap();
let _ = parsed;
}
}
#[test]
fn input_state_event_serde() {
let event = InputStateEvent {
timestamp: Utc::now(),
state: InputLifecycleState::Queued,
detail: Some("queued for processing".into()),
};
let json = serde_json::to_value(&event).unwrap();
let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
assert_eq!(parsed.state, InputLifecycleState::Queued);
}
}