1use chrono::{DateTime, Utc};
9use meerkat_core::lifecycle::{InputId, RunId};
10use serde::{Deserialize, Serialize};
11
12use crate::identifiers::PolicyVersion;
13use crate::input::Input;
14use crate::input_lifecycle_authority::{
15 InputLifecycleAuthority, InputLifecycleError, InputLifecycleInput, InputLifecycleMutator,
16 InputLifecycleTransition,
17};
18use crate::policy::PolicyDecision;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
22#[serde(rename_all = "snake_case")]
23#[non_exhaustive]
24pub enum InputLifecycleState {
25 Accepted,
27 Queued,
29 Staged,
31 Applied,
33 AppliedPendingConsumption,
35 Consumed,
37 Superseded,
39 Coalesced,
41 Abandoned,
43}
44
45impl InputLifecycleState {
46 pub fn is_terminal(&self) -> bool {
48 matches!(
49 self,
50 Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
51 )
52 }
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58#[non_exhaustive]
59pub enum InputAbandonReason {
60 Retired,
62 Reset,
64 Destroyed,
66 Cancelled,
68 MaxAttemptsExhausted { attempts: u32 },
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "outcome_type", rename_all = "snake_case")]
75#[non_exhaustive]
76pub enum InputTerminalOutcome {
77 Consumed,
79 Superseded { superseded_by: InputId },
81 Coalesced { aggregate_id: InputId },
83 Abandoned { reason: InputAbandonReason },
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct InputStateHistoryEntry {
90 pub timestamp: DateTime<Utc>,
92 pub from: InputLifecycleState,
94 pub to: InputLifecycleState,
96 #[serde(skip_serializing_if = "Option::is_none")]
98 pub reason: Option<String>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct PolicySnapshot {
104 pub version: PolicyVersion,
106 pub decision: PolicyDecision,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112#[serde(tag = "source_type", rename_all = "snake_case")]
113#[non_exhaustive]
114pub enum ReconstructionSource {
115 Projection {
117 rule_id: String,
118 source_event_id: String,
119 },
120 Coalescing { source_input_ids: Vec<InputId> },
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct InputStateEvent {
127 pub timestamp: DateTime<Utc>,
129 pub state: InputLifecycleState,
131 #[serde(skip_serializing_if = "Option::is_none")]
133 pub detail: Option<String>,
134}
135
136#[derive(Debug, Clone)]
146pub struct InputState {
147 pub input_id: InputId,
149 authority: InputLifecycleAuthority,
152 pub policy: Option<PolicySnapshot>,
154 pub durability: Option<crate::input::InputDurability>,
156 pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
158 pub recovery_count: u32,
160 pub reconstruction_source: Option<ReconstructionSource>,
162 pub persisted_input: Option<Input>,
164 pub created_at: DateTime<Utc>,
166}
167
168impl InputState {
169 pub fn new_accepted(input_id: InputId) -> Self {
171 let now = Utc::now();
172 Self {
173 input_id,
174 authority: InputLifecycleAuthority::new_at(now),
175 policy: None,
176 durability: None,
177 idempotency_key: None,
178 recovery_count: 0,
179 reconstruction_source: None,
180 persisted_input: None,
181 created_at: now,
182 }
183 }
184
185 pub fn current_state(&self) -> InputLifecycleState {
189 self.authority.phase()
190 }
191
192 pub fn is_terminal(&self) -> bool {
194 self.authority.is_terminal()
195 }
196
197 pub fn terminal_outcome(&self) -> Option<&InputTerminalOutcome> {
199 self.authority.terminal_outcome()
200 }
201
202 pub fn history(&self) -> &[InputStateHistoryEntry] {
204 self.authority.history()
205 }
206
207 pub fn last_run_id(&self) -> Option<&RunId> {
209 self.authority.last_run_id()
210 }
211
212 pub fn last_boundary_sequence(&self) -> Option<u64> {
214 self.authority.last_boundary_sequence()
215 }
216
217 pub fn updated_at(&self) -> DateTime<Utc> {
219 self.authority.updated_at()
220 }
221
222 pub fn attempt_count(&self) -> u32 {
226 self.authority.attempt_count()
227 }
228
229 pub fn apply(
235 &mut self,
236 input: InputLifecycleInput,
237 ) -> Result<InputLifecycleTransition, InputLifecycleError> {
238 self.authority.apply(input)
239 }
240
241 pub fn can_accept(&self, input: &InputLifecycleInput) -> bool {
243 self.authority.can_accept(input)
244 }
245
246 pub fn set_terminal_outcome(&mut self, outcome: InputTerminalOutcome) {
249 self.authority.set_terminal_outcome(outcome);
250 }
251
252 pub fn authority(&self) -> &InputLifecycleAuthority {
254 &self.authority
255 }
256
257 pub fn authority_mut(&mut self) -> &mut InputLifecycleAuthority {
259 &mut self.authority
260 }
261}
262
263#[derive(Serialize, Deserialize)]
269struct InputStateSerde {
270 input_id: InputId,
271 current_state: InputLifecycleState,
272 #[serde(skip_serializing_if = "Option::is_none")]
273 policy: Option<PolicySnapshot>,
274 #[serde(skip_serializing_if = "Option::is_none")]
275 terminal_outcome: Option<InputTerminalOutcome>,
276 #[serde(skip_serializing_if = "Option::is_none")]
277 durability: Option<crate::input::InputDurability>,
278 #[serde(skip_serializing_if = "Option::is_none")]
279 idempotency_key: Option<crate::identifiers::IdempotencyKey>,
280 #[serde(default)]
281 attempt_count: u32,
282 #[serde(default)]
283 recovery_count: u32,
284 #[serde(default, skip_serializing_if = "Vec::is_empty")]
285 history: Vec<InputStateHistoryEntry>,
286 #[serde(skip_serializing_if = "Option::is_none")]
287 reconstruction_source: Option<ReconstructionSource>,
288 #[serde(default, skip_serializing_if = "Option::is_none")]
289 persisted_input: Option<Input>,
290 #[serde(default, skip_serializing_if = "Option::is_none")]
291 last_run_id: Option<RunId>,
292 #[serde(default, skip_serializing_if = "Option::is_none")]
293 last_boundary_sequence: Option<u64>,
294 created_at: DateTime<Utc>,
295 updated_at: DateTime<Utc>,
296}
297
298impl Serialize for InputState {
299 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
300 let helper = InputStateSerde {
301 input_id: self.input_id.clone(),
302 current_state: self.authority.phase(),
303 policy: self.policy.clone(),
304 terminal_outcome: self.authority.terminal_outcome().cloned(),
305 durability: self.durability,
306 idempotency_key: self.idempotency_key.clone(),
307 attempt_count: self.authority.attempt_count(),
308 recovery_count: self.recovery_count,
309 history: self.authority.history().to_vec(),
310 reconstruction_source: self.reconstruction_source.clone(),
311 persisted_input: self.persisted_input.clone(),
312 last_run_id: self.authority.last_run_id().cloned(),
313 last_boundary_sequence: self.authority.last_boundary_sequence(),
314 created_at: self.created_at,
315 updated_at: self.authority.updated_at(),
316 };
317 helper.serialize(serializer)
318 }
319}
320
321impl<'de> Deserialize<'de> for InputState {
322 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
323 let helper = InputStateSerde::deserialize(deserializer)?;
324 let authority = InputLifecycleAuthority::restore(
325 helper.current_state,
326 helper.terminal_outcome,
327 helper.last_run_id,
328 helper.last_boundary_sequence,
329 helper.attempt_count,
330 helper.history,
331 helper.updated_at,
332 );
333 Ok(InputState {
334 input_id: helper.input_id,
335 authority,
336 policy: helper.policy,
337 durability: helper.durability,
338 idempotency_key: helper.idempotency_key,
339 recovery_count: helper.recovery_count,
340 reconstruction_source: helper.reconstruction_source,
341 persisted_input: helper.persisted_input,
342 created_at: helper.created_at,
343 })
344 }
345}
346
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is itself the test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{
        ApplyMode, ConsumePoint, DrainPolicy, QueueMode, RoutingDisposition, WakeMode,
    };
    use meerkat_core::ops::{OpEvent, OperationId};

    /// A fresh state reports an attempt count of zero.
    #[test]
    fn attempt_count_accessor_delegates_to_authority() {
        let state = InputState::new_accepted(InputId::new());
        assert_eq!(state.attempt_count(), 0);
    }

    /// Exactly the four terminal phases report `is_terminal()`.
    #[test]
    fn lifecycle_state_terminal() {
        assert!(InputLifecycleState::Consumed.is_terminal());
        assert!(InputLifecycleState::Superseded.is_terminal());
        assert!(InputLifecycleState::Coalesced.is_terminal());
        assert!(InputLifecycleState::Abandoned.is_terminal());

        assert!(!InputLifecycleState::Accepted.is_terminal());
        assert!(!InputLifecycleState::Queued.is_terminal());
        assert!(!InputLifecycleState::Staged.is_terminal());
        assert!(!InputLifecycleState::Applied.is_terminal());
        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
    }

    /// Every phase survives a JSON round trip, and the wire name is snake_case.
    #[test]
    fn lifecycle_state_serde() {
        for state in [
            InputLifecycleState::Accepted,
            InputLifecycleState::Queued,
            InputLifecycleState::Staged,
            InputLifecycleState::Applied,
            InputLifecycleState::AppliedPendingConsumption,
            InputLifecycleState::Consumed,
            InputLifecycleState::Superseded,
            InputLifecycleState::Coalesced,
            InputLifecycleState::Abandoned,
        ] {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }

        // Pin the wire name of the multi-word variant.
        let json = serde_json::to_value(InputLifecycleState::AppliedPendingConsumption).unwrap();
        assert_eq!(json, "applied_pending_consumption");
    }

    /// `new_accepted` starts in `Accepted` with no history, outcome or policy.
    #[test]
    fn input_state_new_accepted() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert_eq!(state.current_state(), InputLifecycleState::Accepted);
        assert!(!state.is_terminal());
        assert!(state.history().is_empty());
        assert!(state.terminal_outcome().is_none());
        assert!(state.policy.is_none());
    }

    /// A state with a policy and one transition survives a JSON round trip.
    #[test]
    fn input_state_serde_roundtrip() {
        let mut state = InputState::new_accepted(InputId::new());
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: PolicyDecision {
                apply_mode: ApplyMode::StageRunStart,
                wake_mode: WakeMode::WakeIfIdle,
                queue_mode: QueueMode::Fifo,
                consume_point: ConsumePoint::OnRunComplete,
                drain_policy: DrainPolicy::QueueNextTurn,
                routing_disposition: RoutingDisposition::Queue,
                record_transcript: true,
                emit_operator_content: true,
                policy_version: PolicyVersion(1),
            },
        });
        state.apply(InputLifecycleInput::QueueAccepted).unwrap();

        let json = serde_json::to_value(&state).unwrap();
        let parsed: InputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.input_id, state.input_id);
        assert_eq!(parsed.current_state(), state.current_state());
        assert_eq!(parsed.history().len(), 1);
        // The non-authority fields must survive the trip as well.
        assert!(parsed.policy.is_some());
        assert_eq!(parsed.created_at, state.created_at);
    }

    /// Legacy `input_type` tags still deserialize into the current variants.
    #[test]
    fn input_state_deserializes_legacy_persisted_input_tags() {
        let mut continuation_state = InputState::new_accepted(InputId::new());
        continuation_state.persisted_input = Some(Input::Continuation(
            crate::input::ContinuationInput::detached_background_op_completed(),
        ));
        let mut continuation_json = serde_json::to_value(&continuation_state).unwrap();
        continuation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("system_generated".into());
        let parsed: InputState = serde_json::from_value(continuation_json).unwrap();
        assert!(matches!(
            parsed.persisted_input,
            Some(Input::Continuation(_))
        ));

        let mut operation_state = InputState::new_accepted(InputId::new());
        operation_state.persisted_input = Some(Input::Operation(crate::input::OperationInput {
            header: crate::input::InputHeader {
                id: InputId::new(),
                timestamp: Utc::now(),
                source: crate::input::InputOrigin::System,
                durability: crate::input::InputDurability::Derived,
                visibility: crate::input::InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            operation_id: OperationId::new(),
            event: OpEvent::Cancelled {
                id: OperationId::new(),
            },
        }));
        let mut operation_json = serde_json::to_value(&operation_state).unwrap();
        operation_json["persisted_input"]["input_type"] =
            serde_json::Value::String("projected".into());
        let parsed: InputState = serde_json::from_value(operation_json).unwrap();
        assert!(matches!(parsed.persisted_input, Some(Input::Operation(_))));
    }

    /// Every abandon reason, including the data-carrying one, round-trips.
    #[test]
    fn abandon_reason_serde() {
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
            InputAbandonReason::MaxAttemptsExhausted { attempts: 3 },
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }

        // Pin the wire shape of the struct-like variant.
        let json =
            serde_json::to_value(InputAbandonReason::MaxAttemptsExhausted { attempts: 3 }).unwrap();
        assert_eq!(json["max_attempts_exhausted"]["attempts"], 3);
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        // Compare whole values so a lost or altered id is caught too.
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "abandoned");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    /// Both reconstruction sources round-trip without losing data.
    #[test]
    fn reconstruction_source_serde() {
        let sources = vec![
            ReconstructionSource::Projection {
                rule_id: "rule-1".into(),
                source_event_id: "evt-1".into(),
            },
            ReconstructionSource::Coalescing {
                source_input_ids: vec![InputId::new(), InputId::new()],
            },
        ];
        for source in sources {
            let json = serde_json::to_value(&source).unwrap();
            assert!(json["source_type"].is_string());
            let parsed: ReconstructionSource = serde_json::from_value(json.clone()).unwrap();
            // The type has no `PartialEq`, so compare the re-serialized form.
            assert_eq!(serde_json::to_value(&parsed).unwrap(), json);
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
    }
}