use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::{InputId, RunId};
use serde::{Deserialize, Serialize};

use crate::identifiers::PolicyVersion;
use crate::input::Input;
use crate::policy::PolicyDecision;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17#[non_exhaustive]
18pub enum InputLifecycleState {
19 Accepted,
21 Queued,
23 Staged,
25 Applied,
27 AppliedPendingConsumption,
29 Consumed,
31 Superseded,
33 Coalesced,
35 Abandoned,
37}
38
39impl InputLifecycleState {
40 pub fn is_terminal(&self) -> bool {
42 matches!(
43 self,
44 Self::Consumed | Self::Superseded | Self::Coalesced | Self::Abandoned
45 )
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52#[non_exhaustive]
53pub enum InputAbandonReason {
54 Retired,
56 Reset,
58 Destroyed,
60 Cancelled,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(tag = "outcome_type", rename_all = "snake_case")]
67#[non_exhaustive]
68pub enum InputTerminalOutcome {
69 Consumed,
71 Superseded { superseded_by: InputId },
73 Coalesced { aggregate_id: InputId },
75 Abandoned { reason: InputAbandonReason },
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct InputStateHistoryEntry {
82 pub timestamp: DateTime<Utc>,
84 pub from: InputLifecycleState,
86 pub to: InputLifecycleState,
88 #[serde(skip_serializing_if = "Option::is_none")]
90 pub reason: Option<String>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct PolicySnapshot {
96 pub version: PolicyVersion,
98 pub decision: PolicyDecision,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(tag = "source_type", rename_all = "snake_case")]
105#[non_exhaustive]
106pub enum ReconstructionSource {
107 Projection {
109 rule_id: String,
110 source_event_id: String,
111 },
112 Coalescing { source_input_ids: Vec<InputId> },
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct InputStateEvent {
119 pub timestamp: DateTime<Utc>,
121 pub state: InputLifecycleState,
123 #[serde(skip_serializing_if = "Option::is_none")]
125 pub detail: Option<String>,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct InputState {
131 pub input_id: InputId,
133 pub current_state: InputLifecycleState,
135 #[serde(skip_serializing_if = "Option::is_none")]
137 pub policy: Option<PolicySnapshot>,
138 #[serde(skip_serializing_if = "Option::is_none")]
140 pub terminal_outcome: Option<InputTerminalOutcome>,
141 #[serde(skip_serializing_if = "Option::is_none")]
143 pub durability: Option<crate::input::InputDurability>,
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub idempotency_key: Option<crate::identifiers::IdempotencyKey>,
147 #[serde(default)]
149 pub attempt_count: u32,
150 #[serde(default)]
152 pub recovery_count: u32,
153 #[serde(default, skip_serializing_if = "Vec::is_empty")]
155 pub history: Vec<InputStateHistoryEntry>,
156 #[serde(skip_serializing_if = "Option::is_none")]
158 pub reconstruction_source: Option<ReconstructionSource>,
159 #[serde(default, skip_serializing_if = "Option::is_none")]
161 pub persisted_input: Option<Input>,
162 #[serde(default, skip_serializing_if = "Option::is_none")]
164 pub last_run_id: Option<RunId>,
165 #[serde(default, skip_serializing_if = "Option::is_none")]
167 pub last_boundary_sequence: Option<u64>,
168 pub created_at: DateTime<Utc>,
170 pub updated_at: DateTime<Utc>,
172}
173
174impl InputState {
175 pub fn new_accepted(input_id: InputId) -> Self {
177 let now = Utc::now();
178 Self {
179 input_id,
180 current_state: InputLifecycleState::Accepted,
181 policy: None,
182 terminal_outcome: None,
183 durability: None,
184 idempotency_key: None,
185 attempt_count: 0,
186 recovery_count: 0,
187 history: Vec::new(),
188 reconstruction_source: None,
189 persisted_input: None,
190 last_run_id: None,
191 last_boundary_sequence: None,
192 created_at: now,
193 updated_at: now,
194 }
195 }
196
197 pub fn is_terminal(&self) -> bool {
199 self.current_state.is_terminal()
200 }
201}
202
#[cfg(test)]
// Tests unwrap freely: a failed (de)serialization should abort the test with a panic.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy::{ApplyMode, ConsumePoint, QueueMode, WakeMode};

    /// Every lifecycle state known to this file, for exhaustive round-trip checks.
    const ALL_STATES: [InputLifecycleState; 9] = [
        InputLifecycleState::Accepted,
        InputLifecycleState::Queued,
        InputLifecycleState::Staged,
        InputLifecycleState::Applied,
        InputLifecycleState::AppliedPendingConsumption,
        InputLifecycleState::Consumed,
        InputLifecycleState::Superseded,
        InputLifecycleState::Coalesced,
        InputLifecycleState::Abandoned,
    ];

    #[test]
    fn lifecycle_state_terminal() {
        assert!(InputLifecycleState::Consumed.is_terminal());
        assert!(InputLifecycleState::Superseded.is_terminal());
        assert!(InputLifecycleState::Coalesced.is_terminal());
        assert!(InputLifecycleState::Abandoned.is_terminal());

        assert!(!InputLifecycleState::Accepted.is_terminal());
        assert!(!InputLifecycleState::Queued.is_terminal());
        assert!(!InputLifecycleState::Staged.is_terminal());
        assert!(!InputLifecycleState::Applied.is_terminal());
        assert!(!InputLifecycleState::AppliedPendingConsumption.is_terminal());
    }

    #[test]
    fn lifecycle_state_serde() {
        for state in ALL_STATES {
            let json = serde_json::to_value(state).unwrap();
            let parsed: InputLifecycleState = serde_json::from_value(json).unwrap();
            assert_eq!(state, parsed);
        }

        // Pin the snake_case wire format on the one multi-word variant.
        let json = serde_json::to_value(InputLifecycleState::AppliedPendingConsumption).unwrap();
        assert_eq!(json, "applied_pending_consumption");
    }

    #[test]
    fn input_state_new_accepted() {
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        assert_eq!(state.input_id, id);
        assert_eq!(state.current_state, InputLifecycleState::Accepted);
        assert!(!state.is_terminal());
        assert!(state.history.is_empty());
        assert!(state.terminal_outcome.is_none());
        assert!(state.policy.is_none());
        assert_eq!(state.attempt_count, 0);
        assert_eq!(state.recovery_count, 0);
        assert_eq!(state.created_at, state.updated_at);
    }

    #[test]
    fn input_state_serde_roundtrip() {
        let mut state = InputState::new_accepted(InputId::new());
        state.policy = Some(PolicySnapshot {
            version: PolicyVersion(1),
            decision: PolicyDecision {
                apply_mode: ApplyMode::StageRunStart,
                wake_mode: WakeMode::WakeIfIdle,
                queue_mode: QueueMode::Fifo,
                consume_point: ConsumePoint::OnRunComplete,
                record_transcript: true,
                emit_operator_content: true,
                policy_version: PolicyVersion(1),
            },
        });
        state.history.push(InputStateHistoryEntry {
            timestamp: Utc::now(),
            from: InputLifecycleState::Accepted,
            to: InputLifecycleState::Queued,
            reason: Some("policy resolved".into()),
        });

        let json = serde_json::to_value(&state).unwrap();
        let parsed: InputState = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.input_id, state.input_id);
        assert_eq!(parsed.current_state, state.current_state);
        assert!(parsed.policy.is_some());
        assert_eq!(parsed.history.len(), 1);
        assert_eq!(parsed.history[0].from, InputLifecycleState::Accepted);
        assert_eq!(parsed.history[0].to, InputLifecycleState::Queued);
        assert_eq!(parsed.history[0].reason.as_deref(), Some("policy resolved"));
    }

    #[test]
    fn abandon_reason_serde() {
        for reason in [
            InputAbandonReason::Retired,
            InputAbandonReason::Reset,
            InputAbandonReason::Destroyed,
            InputAbandonReason::Cancelled,
        ] {
            let json = serde_json::to_value(&reason).unwrap();
            let parsed: InputAbandonReason = serde_json::from_value(json).unwrap();
            assert_eq!(reason, parsed);
        }
    }

    #[test]
    fn terminal_outcome_consumed_serde() {
        let outcome = InputTerminalOutcome::Consumed;
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "consumed");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_superseded_serde() {
        let outcome = InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "superseded");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        // Full equality also verifies that the carried ID survives the round trip.
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_coalesced_serde() {
        let outcome = InputTerminalOutcome::Coalesced {
            aggregate_id: InputId::new(),
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "coalesced");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn terminal_outcome_abandoned_serde() {
        let outcome = InputTerminalOutcome::Abandoned {
            reason: InputAbandonReason::Retired,
        };
        let json = serde_json::to_value(&outcome).unwrap();
        assert_eq!(json["outcome_type"], "abandoned");
        let parsed: InputTerminalOutcome = serde_json::from_value(json).unwrap();
        assert_eq!(outcome, parsed);
    }

    #[test]
    fn reconstruction_source_serde() {
        let cases = [
            (
                ReconstructionSource::Projection {
                    rule_id: "rule-1".into(),
                    source_event_id: "evt-1".into(),
                },
                "projection",
            ),
            (
                ReconstructionSource::Coalescing {
                    source_input_ids: vec![InputId::new(), InputId::new()],
                },
                "coalescing",
            ),
        ];
        for (source, expected_tag) in cases {
            let json = serde_json::to_value(&source).unwrap();
            assert_eq!(json["source_type"], expected_tag);

            // `ReconstructionSource` has no `PartialEq`, so compare the
            // re-serialized form to prove nothing was lost in the round trip.
            let parsed: ReconstructionSource = serde_json::from_value(json.clone()).unwrap();
            assert_eq!(serde_json::to_value(&parsed).unwrap(), json);
        }
    }

    #[test]
    fn input_state_event_serde() {
        let event = InputStateEvent {
            timestamp: Utc::now(),
            state: InputLifecycleState::Queued,
            detail: Some("queued for processing".into()),
        };
        let json = serde_json::to_value(&event).unwrap();
        let parsed: InputStateEvent = serde_json::from_value(json).unwrap();
        assert_eq!(parsed.state, InputLifecycleState::Queued);
        assert_eq!(parsed.detail.as_deref(), Some("queued for processing"));
    }
}