1use chrono::{DateTime, Utc};
9use meerkat_core::lifecycle::{InputId, RunId};
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::PolicyVersion;
13use crate::input::Input;
14use crate::input_lifecycle_authority::{
15 InputLifecycleAuthority, InputLifecycleError, InputLifecycleInput, InputLifecycleMutator,
16 InputLifecycleTransition,
17};
18use crate::policy::PolicyDecision;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
22#[serde(rename_all = "snake_case")]
23#[non_exhaustive]
24pub enum InputLifecycleState {
25 Accepted,
27 Queued,
29 Staged,
31 Applied,
33 AppliedPendingConsumption,
35 Consumed,
37 Superseded,
39 Coalesced,
41 Abandoned,
43}
44
45impl InputLifecycleState {
46 pub fn is_terminal(&self) -> bool {
48 matches!(
49 self,
50 Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
51 )
52 }
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58#[non_exhaustive]
59pub enum InputAbandonReason {
60 Retired,
62 Reset,
64 Destroyed,
66 Cancelled,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(tag = "outcome_type", rename_all = "snake_case")]
73#[non_exhaustive]
74pub enum InputTerminalOutcome {
75 Consumed,
77 Superseded { superseded_by: InputId },
79 Coalesced { aggregate_id: InputId },
81 Abandoned { reason: InputAbandonReason },
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct InputStateHistoryEntry {
88 pub timestamp: DateTime<Utc>,
90 pub from: InputLifecycleState,
92 pub to: InputLifecycleState,
94 #[serde(skip_serializing_if = "Option::is_none")]
96 pub reason: Option<String>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct PolicySnapshot {
102 pub version: PolicyVersion,
104 pub decision: PolicyDecision,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(tag = "source_type", rename_all = "snake_case")]
111#[non_exhaustive]
112pub enum ReconstructionSource {
113 Projection {
115 rule_id: String,
116 source_event_id: String,
117 },
118 Coalescing { source_input_ids: Vec<InputId> },
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct InputStateEvent {
125 pub timestamp: DateTime<Utc>,
127 pub state: InputLifecycleState,
129 #[serde(skip_serializing_if = "Option::is_none")]
131 pub detail: Option<String>,
132}
133
134#[derive(Debug, Clone)]
144pub struct InputState {
145 pub input_id: InputId,
147 authority: InputLifecycleAuthority,
150 pub policy: Option<PolicySnapshot>,
152 pub durability: Option<crate::input::InputDurability>,
154 pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
156 pub attempt_count: u32,
158 pub recovery_count: u32,
160 pub reconstruction_source: Option<ReconstructionSource>,
162 pub persisted_input: Option<Input>,
164 pub created_at: DateTime<Utc>,
166}
167
168impl InputState {
169 pub fn new_accepted(input_id: InputId) -> Self {
171 let now = Utc::now();
172 Self {
173 input_id,
174 authority: InputLifecycleAuthority::new_at(now),
175 policy: None,
176 durability: None,
177 idempotency_key: None,
178 attempt_count: 0,
179 recovery_count: 0,
180 reconstruction_source: None,
181 persisted_input: None,
182 created_at: now,
183 }
184 }
185
186 pub fn current_state(&self) -> InputLifecycleState {
190 self.authority.phase()
191 }
192
193 pub fn is_terminal(&self) -> bool {
195 self.authority.is_terminal()
196 }
197
198 pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
200 self.authority.terminal_outcome()
201 }
202
203 pub fn history(&self) -> &[InputStateHistoryEntry] {
205 self.authority.history()
206 }
207
208 pub fn last_run_id(&self) -> Option<&RunId> {
210 self.authority.last_run_id()
211 }
212
213 pub fn last_boundary_sequence(&self) -> Option<u64> {
215 self.authority.last_boundary_sequence()
216 }
217
218 pub fn updated_at(&self) -> DateTime<Utc> {
220 self.authority.updated_at()
221 }
222
223 pub fn apply(
229 &mut self,
230 input: InputLifecycleInput,
231 ) -> Result<InputLifecycleTransition, InputLifecycleError> {
232 self.authority.apply(input)
233 }
234
235 pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
237 self.authority.can_accept(input)
238 }
239
240 pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
243 self.authority.set_terminal_outcome(outcome);
244 }
245
246 pub fn authority(&self) -> &InputLifecycleAuthority {
248 &self.authority
249 }
250
251 pub fn authority_mut(&mut self) -> &mut InputLifecycleAuthority {
253 &mut self.authority
254 }
255}
256
257#[derive(Serialize, Deserialize)]
263struct InputStateSerde {
264 input_id: InputId,
265 current_state: InputLifecycleState,
266 #[serde(skip_serializing_if = "Option::is_none")]
267 policy: Option<PolicySnapshot>,
268 #[serde(skip_serializing_if = "Option::is_none")]
269 terminal_outcome: Option<InputTerminalOutcome>,
270 #[serde(skip_serializing_if = "Option::is_none")]
271 durability: Option<crate::input::InputDurability>,
272 #[serde(skip_serializing_if = "Option::is_none")]
273 idempotency_key: Option<crate::identifiers::IdempotencyKey>,
274 #[serde(default)]
275 attempt_count: u32,
276 #[serde(default)]
277 recovery_count: u32,
278 #[serde(default, skip_serializing_if = "Vec::is_empty")]
279 history: Vec<InputStateHistoryEntry>,
280 #[serde(skip_serializing_if = "Option::is_none")]
281 reconstruction_source: Option<ReconstructionSource>,
282 #[serde(default, skip_serializing_if = "Option::is_none")]
283 persisted_input: Option<Input>,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
285 last_run_id: Option<RunId>,
286 #[serde(default, skip_serializing_if = "Option::is_none")]
287 last_boundary_sequence: Option<u64>,
288 created_at: DateTime<Utc>,
289 updated_at: DateTime<Utc>,
290}
291
292impl Serialize for InputState {
293 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294 let helper = InputStateSerde {
295 input_id: self.input_id.clone(),
296 current_state: self.authority.phase(),
297 policy: self.policy.clone(),
298 terminal_outcome: self.authority.terminal_outcome().cloned(),
299 durability: self.durability,
300 idempotency_key: self.idempotency_key.clone(),
301 attempt_count: self.attempt_count,
302 recovery_count: self.recovery_count,
303 history: self.authority.history().to_vec(),
304 reconstruction_source: self.reconstruction_source.clone(),
305 persisted_input: self.persisted_input.clone(),
306 last_run_id: self.authority.last_run_id().cloned(),
307 last_boundary_sequence: self.authority.last_boundary_sequence(),
308 created_at: self.created_at,
309 updated_at: self.authority.updated_at(),
310 };
311 helper.serialize(serializer)
312 }
313}
314
315impl<'de> Deserialize<'de> for InputState {
316 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
317 let helper = InputStateSerde::deserialize(deserializer)?;
318 let authority = InputLifecycleAuthority::restore(
319 helper.current_state,
320 helper.terminal_outcome,
321 helper.last_run_id,
322 helper.last_boundary_sequence,
323 helper.history,
324 helper.updated_at,
325 );
326 Ok(InputState {
327 input_id: helper.input_id,
328 authority,
329 policy: helper.policy,
330 durability: helper.durability,
331 idempotency_key: helper.idempotency_key,
332 attempt_count: helper.attempt_count,
333 recovery_count: helper.recovery_count,
334 reconstruction_source: helper.reconstruction_source,
335 persisted_input: helper.persisted_input,
336 created_at: helper.created_at,
337 })
338 }
339}
340
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is itself the test failure we want.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, InterruptPolicy, QueueMode, RoutingDisposition,
        WakeMode,
    };
    use meerkat_core::ops::{OpEvent, OperationId};

    /// Exactly the four end states are terminal.
    #[test]
    fn lifecycle_state_terminal() {
        assert!(InputLifecycleState::Consumed.is_terminal());
        assert!(InputLifecycleState::Superseded.is_terminal());
        assert!(InputLifecycleState::Coalesced.is_terminal());
        assert!(InputLifecycleState::Abandoned.is_terminal());

        assert!(!InputLifecycleState::Accepted.is_terminal());
        assert!(!InputLifecycleState::Queued.is_terminal());
        assert!(!InputLifecycleState::Staged.is_terminal());
        assert!(!InputLifecycleState::Applied.is_terminal());
        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
    }

    /// Every lifecycle state survives a JSON round trip.
    #[test]
    fn lifecycle_state_serde() {
        for state in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }
    }

    /// A freshly accepted input starts in `Accepted` with no history, outcome or policy.
    #[test]
    fn input_state_new_accepted() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert_eq!(state.current_state(), InputLifecycleState::Accepted);
        assert!(!state.is_terminal());
        assert!(state.history().is_empty());
        assert!(state.terminal_outcome().is_none());
        assert!(state.policy.is_none());
    }

    /// An `InputState` with a policy and one transition survives a JSON round trip.
    #[test]
    fn input_state_serde_roundtrip() {
        let mut state = InputState::new_accepted(InputId::new());
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: PolicyDecision {
                apply_mode: ApplyMode::StageRunStart,
                wake_mode: WakeMode::WakeIfIdle,
                queue_mode: QueueMode::Fifo,
                consume_point: ConsumePoint::OnRunComplete,
                interrupt_policy: InterruptPolicy::None,
                drain_policy: DrainPolicy::QueueNextTurn,
                routing_disposition: RoutingDisposition::Queue,
                record_transcript: true,
                emit_operator_content: true,
                policy_version: PolicyVersion(1),
            },
        });
        state.apply(InputLifecycleInput::QueueAccepted).unwrap();

        let json = serde_json::to_value(&state).unwrap();
        let parsed: InputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.input_id, state.input_id);
        assert_eq!(parsed.current_state(), state.current_state());
        assert_eq!(parsed.history().len(), 1);
        // The policy snapshot and the plain counters must not be dropped on the way.
        assert_eq!(parsed.policy.as_ref().map(|p| p.version.0), Some(1));
        assert_eq!(parsed.attempt_count, state.attempt_count);
        assert_eq!(parsed.recovery_count, state.recovery_count);
        assert_eq!(parsed.terminal_outcome(), state.terminal_outcome());
    }

    /// Persisted inputs written with the legacy `input_type` tags still deserialize.
    #[test]
    fn input_state_deserializes_legacy_persisted_input_tags() {
        let mut continuation_state = InputState::new_accepted(InputId::new());
        continuation_state.persisted_input = Some(Input::Continuation(
            crate::input::ContinuationInput::terminal_peer_response_for_request(
                "legacy continuation",
                Some("req-legacy".into()),
            ),
        ));
        let mut continuation_json = serde_json::to_value(&continuation_state).unwrap();
        continuation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("system_generated".into());
        let parsed: InputState = serde_json::from_value(continuation_json).unwrap();
        assert!(matches!(
            parsed.persisted_input,
            Some(Input::Continuation(_))
        ));

        let mut operation_state = InputState::new_accepted(InputId::new());
        operation_state.persisted_input = Some(Input::Operation(crate::input::OperationInput {
            header: crate::input::InputHeader {
                id: InputId::new(),
                timestamp: Utc::now(),
                source: crate::input::InputOrigin::System,
                durability: crate::input::InputDurability::Derived,
                visibility: crate::input::InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            operation_id: OperationId::new(),
            event: OpEvent::Cancelled {
                id: OperationId::new(),
            },
        }));
        let mut operation_json = serde_json::to_value(&operation_state).unwrap();
        operation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("projected".into());
        let parsed: InputState = serde_json::from_value(operation_json).unwrap();
        assert!(matches!(parsed.persisted_input, Some(Input::Operation(_))));
    }

    /// Every abandon reason survives a JSON round trip.
    #[test]
    fn abandon_reason_serde() {
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        // Full equality, so a changed `superseded_by` id cannot slip through.
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "abandoned");
        assert_eq!(json["reason"], "retired");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// Both reconstruction sources carry the expected tag and round-trip losslessly.
    #[test]
    fn reconstruction_source_serde() {
        let sources = vec![
            (
                ReconstructionSource::Projection {
                    rule_id: "rule-1".into(),
                    source_event_id: "evt-1".into(),
                },
                "projection",
            ),
            (
                ReconstructionSource::Coalescing {
                    source_input_ids: vec![InputId::new(), InputId::new()],
                },
                "coalescing",
            ),
        ];
        for (source, expected_tag) in sources {
            let json = serde_json::to_value(&source).unwrap();
            assert_eq!(json["source_type"], expected_tag);
            let parsed: ReconstructionSource = serde_json::from_value(json.clone()).unwrap();
            // `ReconstructionSource` has no `PartialEq`, so compare by re-serializing.
            assert_eq!(serde_json::to_value(&parsed).unwrap(), json);
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
    }

    /// A missing `detail` is omitted from the JSON and reads back as `None`.
    #[test]
    fn input_state_event_omits_absent_detail() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Accepted,
            detail: None,
        };
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("detail").is_none());
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Accepted);
        assert!(parsed.detail.is_none());
    }
}