1use chrono::{DateTime, Utc};
6use meerkat_core::TurnErrorMetadata;
7use meerkat_core::lifecycle::{InputId, RunId};
8use serde::{Deserialize, Serialize};
9
10use crate::identifiers::{
11 CausationId, CorrelationId, EventCodeId, LogicalRuntimeId, RuntimeEventId,
12};
13use crate::input_state::{InputAbandonReason, InputLifecycleState};
14use crate::runtime_state::RuntimeState;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct RuntimeEventEnvelope {
19 pub id: RuntimeEventId,
21 pub timestamp: DateTime<Utc>,
23 pub runtime_id: LogicalRuntimeId,
25 pub event: RuntimeEvent,
27 #[serde(skip_serializing_if = "Option::is_none")]
29 pub causation_id: Option<CausationId>,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 pub correlation_id: Option<CorrelationId>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(tag = "category", content = "data", rename_all = "snake_case")]
38#[non_exhaustive]
39pub enum RuntimeEvent {
40 InputLifecycle(InputLifecycleEvent),
42 RunLifecycle(RunLifecycleEvent),
44 RuntimeStateChange(RuntimeStateChangeEvent),
46 Topology(RuntimeTopologyEvent),
48 Projection(RuntimeProjectionEvent),
50}
51
52impl RuntimeEvent {
53 pub fn event_code(&self) -> EventCodeId {
55 match self {
56 RuntimeEvent::InputLifecycle(e) => e.event_code(),
57 RuntimeEvent::RunLifecycle(e) => e.event_code(),
58 RuntimeEvent::RuntimeStateChange(_) => EventCodeId::new("runtime.state_changed"),
59 RuntimeEvent::Topology(e) => e.event_code(),
60 RuntimeEvent::Projection(_) => EventCodeId::new("runtime.projection_emitted"),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(tag = "code", rename_all = "snake_case")]
68#[non_exhaustive]
69pub enum InputLifecycleEvent {
70 Accepted { input_id: InputId },
72 Deduplicated {
74 input_id: InputId,
75 existing_id: InputId,
76 },
77 Superseded {
79 input_id: InputId,
80 superseded_by: InputId,
81 },
82 Coalesced {
84 input_id: InputId,
85 aggregate_id: InputId,
86 },
87 Queued { input_id: InputId },
89 Staged { input_id: InputId, run_id: RunId },
91 Applied { input_id: InputId, run_id: RunId },
93 Consumed { input_id: InputId, run_id: RunId },
95 Abandoned {
100 input_id: InputId,
101 reason: InputAbandonReason,
102 },
103 StateTransitioned {
105 input_id: InputId,
106 from: InputLifecycleState,
107 to: InputLifecycleState,
108 },
109}
110
111impl InputLifecycleEvent {
112 pub fn event_code(&self) -> EventCodeId {
113 match self {
114 Self::Accepted { .. } => EventCodeId::new("input.accepted"),
115 Self::Deduplicated { .. } => EventCodeId::new("input.deduplicated"),
116 Self::Superseded { .. } => EventCodeId::new("input.superseded"),
117 Self::Coalesced { .. } => EventCodeId::new("input.coalesced"),
118 Self::Queued { .. } => EventCodeId::new("input.queued"),
119 Self::Staged { .. } => EventCodeId::new("input.staged"),
120 Self::Applied { .. } => EventCodeId::new("input.applied"),
121 Self::Consumed { .. } => EventCodeId::new("input.consumed"),
122 Self::Abandoned { .. } => EventCodeId::new("input.abandoned"),
123 Self::StateTransitioned { .. } => EventCodeId::new("input.state_transitioned"),
124 }
125 }
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130#[serde(tag = "code", rename_all = "snake_case")]
131#[non_exhaustive]
132pub enum RunLifecycleEvent {
133 Started { run_id: RunId },
135 Completed { run_id: RunId },
137 Failed {
144 run_id: RunId,
145 error: TurnErrorMetadata,
146 },
147 Cancelled { run_id: RunId },
149}
150
151impl RunLifecycleEvent {
152 pub fn event_code(&self) -> EventCodeId {
153 match self {
154 Self::Started { .. } => EventCodeId::new("run.started"),
155 Self::Completed { .. } => EventCodeId::new("run.completed"),
156 Self::Failed { .. } => EventCodeId::new("run.failed"),
157 Self::Cancelled { .. } => EventCodeId::new("run.cancelled"),
158 }
159 }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct RuntimeStateChangeEvent {
165 pub from: RuntimeState,
166 pub to: RuntimeState,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171#[serde(tag = "code", rename_all = "snake_case")]
172#[non_exhaustive]
173pub enum RuntimeTopologyEvent {
174 RuntimeCreated { runtime_id: LogicalRuntimeId },
176 RuntimeRetired { runtime_id: LogicalRuntimeId },
178 RuntimeRecycled { runtime_id: LogicalRuntimeId },
180 RuntimeDestroyed { runtime_id: LogicalRuntimeId },
182}
183
184impl RuntimeTopologyEvent {
185 pub fn event_code(&self) -> EventCodeId {
186 match self {
187 Self::RuntimeCreated { .. } => EventCodeId::new("topology.runtime_created"),
188 Self::RuntimeRetired { .. } => EventCodeId::new("topology.runtime_retired"),
189 Self::RuntimeRecycled { .. } => EventCodeId::new("topology.runtime_recycled"),
190 Self::RuntimeDestroyed { .. } => EventCodeId::new("topology.runtime_destroyed"),
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct RuntimeProjectionEvent {
198 pub rule_id: String,
200 pub projected_input_id: InputId,
202 pub source_event_id: RuntimeEventId,
204}
205
#[cfg(test)]
// Tests unwrap freely: a failed (de)serialization should simply fail the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use serde::de::DeserializeOwned;

    /// Wraps `event` in an envelope with a fresh id, the current time, a fixed
    /// runtime id and no causation/correlation ids.
    fn make_envelope(event: RuntimeEvent) -> RuntimeEventEnvelope {
        RuntimeEventEnvelope {
            id: RuntimeEventId::new(),
            timestamp: Utc::now(),
            runtime_id: LogicalRuntimeId::new("test-runtime"),
            event,
            causation_id: None,
            correlation_id: None,
        }
    }

    /// Serializes `value` to JSON and parses it back, returning both the
    /// intermediate JSON (for shape assertions) and the re-parsed value.
    fn round_trip<T>(value: &T) -> (serde_json::Value, T)
    where
        T: Serialize + DeserializeOwned,
    {
        let json = serde_json::to_value(value).unwrap();
        let parsed = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    /// The envelope nests the event under `event`, tagged by `category`/`data`.
    #[test]
    fn input_lifecycle_accepted_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
            input_id: InputId::new(),
        });
        let (json, parsed) = round_trip(&make_envelope(event));
        assert_eq!(json["event"]["category"], "input_lifecycle");
        assert_eq!(json["event"]["data"]["code"], "accepted");
        assert!(matches!(
            parsed.event,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted { .. })
        ));
    }

    #[test]
    fn input_lifecycle_deduplicated_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        });
        let (_, parsed) = round_trip(&event);
        assert!(matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated { .. })
        ));
    }

    #[test]
    fn input_lifecycle_superseded_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded {
            input_id: InputId::new(),
            superseded_by: InputId::new(),
        });
        let (_, parsed) = round_trip(&event);
        assert!(matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded { .. })
        ));
    }

    #[test]
    fn input_lifecycle_coalesced_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced {
            input_id: InputId::new(),
            aggregate_id: InputId::new(),
        });
        let (_, parsed) = round_trip(&event);
        assert!(matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced { .. })
        ));
    }

    #[test]
    fn run_lifecycle_started_serde() {
        let event = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started {
            run_id: RunId::new(),
        });
        let (json, parsed) = round_trip(&event);
        assert_eq!(json["category"], "run_lifecycle");
        assert!(matches!(
            parsed,
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started { .. })
        ));
    }

    /// The typed error metadata must survive the round trip unchanged.
    #[test]
    fn run_lifecycle_failed_serde() {
        let mut metadata = TurnErrorMetadata::runtime_apply_failure("timeout");
        metadata.retryable = Some(true);
        let event = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
            run_id: RunId::new(),
            error: metadata,
        });
        let (_, parsed) = round_trip(&event);
        let RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed { error, .. }) = parsed else {
            panic!("expected run lifecycle failed event");
        };
        assert_eq!(error.retryable, Some(true));
        assert_eq!(
            error.kind,
            meerkat_core::TurnTerminalCauseKind::RuntimeApplyFailure
        );
    }

    /// The abandon reason is carried as a typed value, not a free-form string.
    #[test]
    fn input_lifecycle_abandoned_carries_typed_reason() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Abandoned {
            input_id: InputId::new(),
            reason: InputAbandonReason::MaxAttemptsExhausted { attempts: 3 },
        });
        let (json, parsed) = round_trip(&event);
        assert_eq!(json["category"], "input_lifecycle");
        assert_eq!(json["data"]["code"], "abandoned");
        let RuntimeEvent::InputLifecycle(InputLifecycleEvent::Abandoned { reason, .. }) = parsed
        else {
            panic!("expected input lifecycle abandoned event");
        };
        assert_eq!(
            reason,
            InputAbandonReason::MaxAttemptsExhausted { attempts: 3 }
        );
    }

    #[test]
    fn runtime_state_change_serde() {
        let event = RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
            from: RuntimeState::Idle,
            to: RuntimeState::Running,
        });
        let (_, parsed) = round_trip(&event);
        assert!(matches!(
            parsed,
            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                from: RuntimeState::Idle,
                to: RuntimeState::Running,
            })
        ));
    }

    #[test]
    fn topology_created_serde() {
        let event = RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated {
            runtime_id: LogicalRuntimeId::new("mob-agent-1"),
        });
        let (json, parsed) = round_trip(&event);
        assert_eq!(json["category"], "topology");
        assert!(matches!(
            parsed,
            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated { .. })
        ));
    }

    /// One event per category, each pinned to its exact event code so that a
    /// mistyped or swapped code string is caught (a non-empty check alone
    /// would let it through).
    #[test]
    fn event_code_coverage() {
        let cases = [
            (
                RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
                    input_id: InputId::new(),
                }),
                "input.accepted",
            ),
            (
                RuntimeEvent::RunLifecycle(RunLifecycleEvent::Completed {
                    run_id: RunId::new(),
                }),
                "run.completed",
            ),
            (
                RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                    from: RuntimeState::Idle,
                    to: RuntimeState::Running,
                }),
                "runtime.state_changed",
            ),
            (
                RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeRetired {
                    runtime_id: LogicalRuntimeId::new("x"),
                }),
                "topology.runtime_retired",
            ),
            (
                RuntimeEvent::Projection(RuntimeProjectionEvent {
                    rule_id: "rule-1".into(),
                    projected_input_id: InputId::new(),
                    source_event_id: RuntimeEventId::new(),
                }),
                "runtime.projection_emitted",
            ),
        ];
        for (event, expected) in &cases {
            let code = event.event_code();
            let actual: &str = &code.0;
            assert!(!actual.is_empty());
            assert_eq!(actual, *expected);
        }
    }
}