1use chrono::{DateTime, Utc};
16use meerkat_core::lifecycle::{InputId, RunId};
17use serde::{Deserialize, Serialize};
18
19use crate::identifiers::PolicyVersion;
20use crate::ingress_types::RuntimeInputSemantics;
21use crate::input::Input;
22use crate::policy::PolicyDecision;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27#[non_exhaustive]
28pub enum InputLifecycleState {
29 Accepted,
30 Queued,
31 Staged,
32 Applied,
33 AppliedPendingConsumption,
34 Consumed,
35 Superseded,
36 Coalesced,
37 Abandoned,
38}
39
40impl InputLifecycleState {
41 pub fn is_terminal(&self) -> bool {
42 matches!(
43 self,
44 Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
45 )
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52#[non_exhaustive]
53pub enum InputAbandonReason {
54 Retired,
55 Reset,
56 Stopped,
57 Destroyed,
58 Cancelled,
59 MaxAttemptsExhausted { attempts: u32 },
60}
61
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(tag = "outcome_type", rename_all = "snake_case")]
68#[non_exhaustive]
69pub enum InputTerminalOutcome {
70 Consumed,
71 Superseded { superseded_by: InputId },
72 Coalesced { aggregate_id: InputId },
73 Abandoned { reason: InputAbandonReason },
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct InputStateHistoryEntry {
79 pub timestamp: DateTime<Utc>,
80 pub from: InputLifecycleState,
81 pub to: InputLifecycleState,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub reason: Option<String>,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct PolicySnapshot {
89 pub version: PolicyVersion,
90 pub decision: PolicyDecision,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95#[serde(tag = "source_type", rename_all = "snake_case")]
96#[non_exhaustive]
97pub enum ReconstructionSource {
98 Projection {
99 rule_id: String,
100 source_event_id: String,
101 },
102 Coalescing {
103 source_input_ids: Vec<InputId>,
104 },
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct InputStateEvent {
110 pub timestamp: DateTime<Utc>,
111 pub state: InputLifecycleState,
112 #[serde(skip_serializing_if = "Option::is_none")]
113 pub detail: Option<String>,
114}
115
116pub const MAX_STAGE_ATTEMPTS: u32 = 3;
120
121#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct InputStateSeed {
133 pub phase: InputLifecycleState,
134 pub last_run_id: Option<RunId>,
135 pub last_boundary_sequence: Option<u64>,
136 pub terminal_outcome: Option<InputTerminalOutcome>,
137 pub attempt_count: u32,
138}
139
140impl InputStateSeed {
141 pub fn new_accepted() -> Self {
144 Self {
145 phase: InputLifecycleState::Accepted,
146 last_run_id: None,
147 last_boundary_sequence: None,
148 terminal_outcome: None,
149 attempt_count: 0,
150 }
151 }
152}
153
154#[derive(Debug, Clone)]
161pub struct StoredInputState {
162 pub state: InputState,
163 pub seed: InputStateSeed,
164}
165
166impl StoredInputState {
167 pub fn new_accepted(input_id: InputId) -> Self {
169 Self {
170 state: InputState::new_accepted(input_id),
171 seed: InputStateSeed::new_accepted(),
172 }
173 }
174}
175
176#[derive(Debug, Clone)]
186pub struct InputState {
187 pub input_id: InputId,
188 pub terminal_outcome: Option<InputTerminalOutcome>,
190 pub attempt_count: u32,
192 pub history: Vec<InputStateHistoryEntry>,
193 pub updated_at: DateTime<Utc>,
194 pub policy: Option<PolicySnapshot>,
195 pub runtime_semantics: Option<RuntimeInputSemantics>,
198 pub durability: Option<crate::input::InputDurability>,
199 pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
200 pub recovery_count: u32,
201 pub reconstruction_source: Option<ReconstructionSource>,
202 pub persisted_input: Option<Input>,
203 pub created_at: DateTime<Utc>,
204}
205
206impl InputState {
207 pub fn new_accepted(input_id: InputId) -> Self {
211 let now = Utc::now();
212 Self {
213 input_id,
214 terminal_outcome: None,
215 attempt_count: 0,
216 history: Vec::new(),
217 updated_at: now,
218 policy: None,
219 runtime_semantics: None,
220 durability: None,
221 idempotency_key: None,
222 recovery_count: 0,
223 reconstruction_source: None,
224 persisted_input: None,
225 created_at: now,
226 }
227 }
228
229 pub fn is_terminal(&self) -> bool {
230 self.terminal_outcome.is_some()
231 }
232
233 pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
234 self.terminal_outcome.as_ref()
235 }
236
237 pub fn history(&self) -> &[InputStateHistoryEntry] {
238 &self.history
239 }
240
241 pub fn updated_at(&self) -> DateTime<Utc> {
242 self.updated_at
243 }
244
245 pub fn attempt_count(&self) -> u32 {
246 self.attempt_count
247 }
248}
249
250#[derive(Serialize, Deserialize)]
263struct InputStateSerde {
264 input_id: InputId,
265 current_state: InputLifecycleState,
266 #[serde(skip_serializing_if = "Option::is_none")]
267 policy: Option<PolicySnapshot>,
268 #[serde(default, skip_serializing_if = "Option::is_none")]
269 runtime_semantics: Option<RuntimeInputSemantics>,
270 #[serde(skip_serializing_if = "Option::is_none")]
271 terminal_outcome: Option<InputTerminalOutcome>,
272 #[serde(skip_serializing_if = "Option::is_none")]
273 durability: Option<crate::input::InputDurability>,
274 #[serde(skip_serializing_if = "Option::is_none")]
275 idempotency_key: Option<crate::identifiers::IdempotencyKey>,
276 #[serde(default)]
277 attempt_count: u32,
278 #[serde(default)]
279 recovery_count: u32,
280 #[serde(default, skip_serializing_if = "Vec::is_empty")]
281 history: Vec<InputStateHistoryEntry>,
282 #[serde(skip_serializing_if = "Option::is_none")]
283 reconstruction_source: Option<ReconstructionSource>,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
285 persisted_input: Option<Input>,
286 #[serde(default, skip_serializing_if = "Option::is_none")]
287 last_run_id: Option<RunId>,
288 #[serde(default, skip_serializing_if = "Option::is_none")]
289 last_boundary_sequence: Option<u64>,
290 created_at: DateTime<Utc>,
291 updated_at: DateTime<Utc>,
292}
293
294impl Serialize for StoredInputState {
295 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
296 let helper = InputStateSerde {
297 input_id: self.state.input_id.clone(),
298 current_state: self.seed.phase,
299 policy: self.state.policy.clone(),
300 runtime_semantics: self.state.runtime_semantics,
301 terminal_outcome: self.seed.terminal_outcome.clone(),
302 durability: self.state.durability,
303 idempotency_key: self.state.idempotency_key.clone(),
304 attempt_count: self.seed.attempt_count,
305 recovery_count: self.state.recovery_count,
306 history: self.state.history.clone(),
307 reconstruction_source: self.state.reconstruction_source.clone(),
308 persisted_input: self.state.persisted_input.clone(),
309 last_run_id: self.seed.last_run_id.clone(),
310 last_boundary_sequence: self.seed.last_boundary_sequence,
311 created_at: self.state.created_at,
312 updated_at: self.state.updated_at,
313 };
314 helper.serialize(serializer)
315 }
316}
317
318impl<'de> Deserialize<'de> for StoredInputState {
319 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
320 let helper = InputStateSerde::deserialize(deserializer)?;
321 let state = InputState {
322 input_id: helper.input_id,
323 terminal_outcome: helper.terminal_outcome.clone(),
324 attempt_count: helper.attempt_count,
325 history: helper.history,
326 updated_at: helper.updated_at,
327 policy: helper.policy,
328 runtime_semantics: helper.runtime_semantics,
329 durability: helper.durability,
330 idempotency_key: helper.idempotency_key,
331 recovery_count: helper.recovery_count,
332 reconstruction_source: helper.reconstruction_source,
333 persisted_input: helper.persisted_input,
334 created_at: helper.created_at,
335 };
336 let seed = InputStateSeed {
337 phase: helper.current_state,
338 last_run_id: helper.last_run_id,
339 last_boundary_sequence: helper.last_boundary_sequence,
340 terminal_outcome: helper.terminal_outcome,
341 attempt_count: helper.attempt_count,
342 };
343 Ok(StoredInputState { state, seed })
344 }
345}
346
347#[cfg(test)]
348#[allow(clippy::unwrap_used)]
349mod tests {
350 use super::*;
351 use crate::policy::{
352 ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
353 };
354 use meerkat_core::ops::{OpEvent, OperationId};
355
356 #[test]
357 fn new_accepted_starts_with_no_shell_history() {
358 let id = InputId::new();
359 let state = InputState::new_accepted(id.clone());
360 assert_eq!(state.input_id, id);
361 assert!(state.history.is_empty());
362 }
363
364 #[test]
365 fn seed_new_accepted_defaults_match_queue_lifecycle() {
366 let seed = InputStateSeed::new_accepted();
367 assert_eq!(seed.phase, InputLifecycleState::Accepted);
368 assert!(seed.last_run_id.is_none());
369 assert!(seed.last_boundary_sequence.is_none());
370 assert!(seed.terminal_outcome.is_none());
371 assert_eq!(seed.attempt_count, 0);
372 }
373
374 #[test]
375 fn lifecycle_state_terminal() {
376 assert!(InputLifecycleState::Consumed.is_terminal());
377 assert!(InputLifecycleState::Superseded.is_terminal());
378 assert!(InputLifecycleState::Coalesced.is_terminal());
379 assert!(InputLifecycleState::Abandoned.is_terminal());
380
381 assert!(!InputLifecycleState::Accepted.is_terminal());
382 assert!(!InputLifecycleState::Queued.is_terminal());
383 assert!(!InputLifecycleState::Staged.is_terminal());
384 assert!(!InputLifecycleState::Applied.is_terminal());
385 assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
386 }
387
388 #[test]
389 fn lifecycle_state_serde() {
390 for state in [
391 InputLifecycleState::Accepted,
392 InputLifecycleState::Queued,
393 InputLifecycleState::Staged,
394 InputLifecycleState::Applied,
395 InputLifecycleState::AppliedPendingConsumption,
396 InputLifecycleState::Consumed,
397 InputLifecycleState::Superseded,
398 InputLifecycleState::Coalesced,
399 InputLifecycleState::Abandoned,
400 ] {
401 let json = serde_json::to_value(state).unwrap();
402 let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
403 assert_eq!(state, parsed);
404 }
405 }
406
407 #[test]
408 fn stored_input_state_serde_roundtrip_preserves_fields() {
409 let mut state = InputState::new_accepted(InputId::new());
410 let policy = PolicyDecision {
411 apply_mode: ApplyMode::StageRunStart,
412 wake_mode: WakeMode::WakeIfIdle,
413 queue_mode: QueueMode::Fifo,
414 consume_point: ConsumePoint::OnRunComplete,
415 drain_policy: DrainPolicy::QueueNextTurn,
416 routing_disposition: RoutingDisposition::Queue,
417 record_transcript: true,
418 emit_operator_content: true,
419 policy_version: PolicyVersion(1),
420 };
421 state.policy = Some(PolicySnapshot {
422 version: PolicyVersion(1),
423 decision: policy.clone(),
424 });
425 state.runtime_semantics = Some(RuntimeInputSemantics::from_policy_and_kind(
426 &policy,
427 crate::identifiers::InputKind::Prompt,
428 ));
429 state.history.push(InputStateHistoryEntry {
430 timestamp: state.updated_at,
431 from: InputLifecycleState::Accepted,
432 to: InputLifecycleState::Queued,
433 reason: Some("QueueAccepted".into()),
434 });
435 let bundle = StoredInputState {
436 state,
437 seed: InputStateSeed {
438 phase: InputLifecycleState::Queued,
439 last_run_id: None,
440 last_boundary_sequence: None,
441 terminal_outcome: None,
442 attempt_count: 0,
443 },
444 };
445
446 let json = serde_json::to_value(&bundle).unwrap();
447 let parsed: StoredInputState = serde_json::from_value(json).unwrap();
448 assert_eq!(parsed.state.input_id, bundle.state.input_id);
449 assert_eq!(parsed.seed.phase, bundle.seed.phase);
450 assert_eq!(
451 parsed.state.runtime_semantics,
452 bundle.state.runtime_semantics
453 );
454 assert_eq!(parsed.state.history.len(), 1);
455 }
456
457 #[test]
458 fn stored_input_state_deserializes_legacy_persisted_input_tags() {
459 let continuation_bundle = StoredInputState {
460 state: InputState {
461 persisted_input: Some(Input::Continuation(
462 crate::input::ContinuationInput::detached_background_op_completed(),
463 )),
464 ..InputState::new_accepted(InputId::new())
465 },
466 seed: InputStateSeed::new_accepted(),
467 };
468 let mut continuation_json = serde_json::to_value(&continuation_bundle).unwrap();
469 continuation_json["persisted_input"]["input_type"] =
470 serde_json::Value::String("system_generated".into());
471 let parsed: StoredInputState = serde_json::from_value(continuation_json).unwrap();
472 assert!(matches!(
473 parsed.state.persisted_input,
474 Some(Input::Continuation(_))
475 ));
476
477 let operation_bundle = StoredInputState {
478 state: InputState {
479 persisted_input: Some(Input::Operation(crate::input::OperationInput {
480 header: crate::input::InputHeader {
481 id: InputId::new(),
482 timestamp: Utc::now(),
483 source: crate::input::InputOrigin::System,
484 durability: crate::input::InputDurability::Derived,
485 visibility: crate::input::InputVisibility::default(),
486 idempotency_key: None,
487 supersession_key: None,
488 correlation_id: None,
489 },
490 operation_id: OperationId::new(),
491 event: OpEvent::Cancelled {
492 id: OperationId::new(),
493 },
494 })),
495 ..InputState::new_accepted(InputId::new())
496 },
497 seed: InputStateSeed::new_accepted(),
498 };
499 let mut operation_json = serde_json::to_value(&operation_bundle).unwrap();
500 operation_json["persisted_input"]["input_type"] =
501 serde_json::Value::String("projected".into());
502 let parsed: StoredInputState = serde_json::from_value(operation_json).unwrap();
503 assert!(matches!(
504 parsed.state.persisted_input,
505 Some(Input::Operation(_))
506 ));
507 }
508
509 #[test]
510 fn abandon_reason_serde() {
511 for reason in [
512 InputAbandonReason::Retired,
513 InputAbandonReason::Reset,
514 InputAbandonReason::Destroyed,
515 InputAbandonReason::Cancelled,
516 ] {
517 let json = serde_json::to_value(&reason).unwrap();
518 let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
519 assert_eq!(reason, parsed);
520 }
521 }
522
523 #[test]
524 fn terminal_outcome_consumed_serde() {
525 let outcome = InputTerminalOutcome::Consumed;
526 let json = serde_json::to_value(&outcome).unwrap();
527 assert_eq!(json["outcome_type"], "consumed");
528 let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
529 assert_eq!(outcome, parsed);
530 }
531
532 #[test]
533 fn terminal_outcome_superseded_serde() {
534 let outcome = InputTerminalOutcome::Superseded {
535 superseded_by: InputId::new(),
536 };
537 let json = serde_json::to_value(&outcome).unwrap();
538 assert_eq!(json["outcome_type"], "superseded");
539 let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
540 assert!(matches!(parsed, InputTerminalOutcome::Superseded { .. }));
541 }
542
543 #[test]
544 fn terminal_outcome_abandoned_serde() {
545 let outcome = InputTerminalOutcome::Abandoned {
546 reason: InputAbandonReason::Retired,
547 };
548 let json = serde_json::to_value(&outcome).unwrap();
549 let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
550 assert!(matches!(
551 parsed,
552 InputTerminalOutcome::Abandoned {
553 reason: InputAbandonReason::Retired,
554 }
555 ));
556 }
557
558 #[test]
559 fn reconstruction_source_serde() {
560 let sources = vec![
561 ReconstructionSource::Projection {
562 rule_id: "rule-1".into(),
563 source_event_id: "evt-1".into(),
564 },
565 ReconstructionSource::Coalescing {
566 source_input_ids: vec![InputId::new(), InputId::new()],
567 },
568 ];
569 for source in sources {
570 let json = serde_json::to_value(&source).unwrap();
571 assert!(json["source_type"].is_string());
572 let parsed: ReconstructionSource = serde_json::from_value(json).unwrap();
573 let _ = parsed;
574 }
575 }
576
577 #[test]
578 fn input_state_event_serde() {
579 let event = InputStateEvent {
580 timestamp: Utc::now(),
581 state: InputLifecycleState::Queued,
582 detail: Some("queued for processing".into()),
583 };
584 let json = serde_json::to_value(&event).unwrap();
585 let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
586 assert_eq!(parsed.state, InputLifecycleState::Queued);
587 }
588}