1use chrono::{DateTime, Utc};
6use meerkat_core::lifecycle::{InputId, RunId};
7use serde::{Deserialize, Serialize};
8
9use crate::identifiers::{
10 CausationId, CorrelationId, EventCodeId, LogicalRuntimeId, RuntimeEventId,
11};
12use crate::input_state::InputLifecycleState;
13use crate::runtime_state::RuntimeState;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RuntimeEventEnvelope {
18 pub id: RuntimeEventId,
20 pub timestamp: DateTime<Utc>,
22 pub runtime_id: LogicalRuntimeId,
24 pub event: RuntimeEvent,
26 #[serde(skip_serializing_if = "Option::is_none")]
28 pub causation_id: Option<CausationId>,
29 #[serde(skip_serializing_if = "Option::is_none")]
31 pub correlation_id: Option<CorrelationId>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(tag = "category", content = "data", rename_all = "snake_case")]
37#[non_exhaustive]
38pub enum RuntimeEvent {
39 InputLifecycle(InputLifecycleEvent),
41 RunLifecycle(RunLifecycleEvent),
43 RuntimeStateChange(RuntimeStateChangeEvent),
45 Topology(RuntimeTopologyEvent),
47 Projection(RuntimeProjectionEvent),
49}
50
51impl RuntimeEvent {
52 pub fn event_code(&self) -> EventCodeId {
54 match self {
55 RuntimeEvent::InputLifecycle(e) => e.event_code(),
56 RuntimeEvent::RunLifecycle(e) => e.event_code(),
57 RuntimeEvent::RuntimeStateChange(_) => EventCodeId::new("runtime.state_changed"),
58 RuntimeEvent::Topology(e) => e.event_code(),
59 RuntimeEvent::Projection(_) => EventCodeId::new("runtime.projection_emitted"),
60 }
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(tag = "code", rename_all = "snake_case")]
67#[non_exhaustive]
68pub enum InputLifecycleEvent {
69 Accepted { input_id: InputId },
71 Deduplicated {
73 input_id: InputId,
74 existing_id: InputId,
75 },
76 Superseded {
78 input_id: InputId,
79 superseded_by: InputId,
80 },
81 Coalesced {
83 input_id: InputId,
84 aggregate_id: InputId,
85 },
86 Queued { input_id: InputId },
88 Staged { input_id: InputId, run_id: RunId },
90 Applied { input_id: InputId, run_id: RunId },
92 Consumed { input_id: InputId, run_id: RunId },
94 Abandoned { input_id: InputId, reason: String },
96 StateTransitioned {
98 input_id: InputId,
99 from: InputLifecycleState,
100 to: InputLifecycleState,
101 },
102}
103
104impl InputLifecycleEvent {
105 pub fn event_code(&self) -> EventCodeId {
106 match self {
107 Self::Accepted { .. } => EventCodeId::new("input.accepted"),
108 Self::Deduplicated { .. } => EventCodeId::new("input.deduplicated"),
109 Self::Superseded { .. } => EventCodeId::new("input.superseded"),
110 Self::Coalesced { .. } => EventCodeId::new("input.coalesced"),
111 Self::Queued { .. } => EventCodeId::new("input.queued"),
112 Self::Staged { .. } => EventCodeId::new("input.staged"),
113 Self::Applied { .. } => EventCodeId::new("input.applied"),
114 Self::Consumed { .. } => EventCodeId::new("input.consumed"),
115 Self::Abandoned { .. } => EventCodeId::new("input.abandoned"),
116 Self::StateTransitioned { .. } => EventCodeId::new("input.state_transitioned"),
117 }
118 }
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123#[serde(tag = "code", rename_all = "snake_case")]
124#[non_exhaustive]
125pub enum RunLifecycleEvent {
126 Started { run_id: RunId },
128 Completed { run_id: RunId },
130 Failed {
132 run_id: RunId,
133 error: String,
134 recoverable: bool,
135 },
136 Cancelled { run_id: RunId },
138}
139
140impl RunLifecycleEvent {
141 pub fn event_code(&self) -> EventCodeId {
142 match self {
143 Self::Started { .. } => EventCodeId::new("run.started"),
144 Self::Completed { .. } => EventCodeId::new("run.completed"),
145 Self::Failed { .. } => EventCodeId::new("run.failed"),
146 Self::Cancelled { .. } => EventCodeId::new("run.cancelled"),
147 }
148 }
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct RuntimeStateChangeEvent {
154 pub from: RuntimeState,
155 pub to: RuntimeState,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160#[serde(tag = "code", rename_all = "snake_case")]
161#[non_exhaustive]
162pub enum RuntimeTopologyEvent {
163 RuntimeCreated { runtime_id: LogicalRuntimeId },
165 RuntimeRetired { runtime_id: LogicalRuntimeId },
167 RuntimeRespawned { runtime_id: LogicalRuntimeId },
169 RuntimeDestroyed { runtime_id: LogicalRuntimeId },
171}
172
173impl RuntimeTopologyEvent {
174 pub fn event_code(&self) -> EventCodeId {
175 match self {
176 Self::RuntimeCreated { .. } => EventCodeId::new("topology.runtime_created"),
177 Self::RuntimeRetired { .. } => EventCodeId::new("topology.runtime_retired"),
178 Self::RuntimeRespawned { .. } => EventCodeId::new("topology.runtime_respawned"),
179 Self::RuntimeDestroyed { .. } => EventCodeId::new("topology.runtime_destroyed"),
180 }
181 }
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct RuntimeProjectionEvent {
187 pub rule_id: String,
189 pub projected_input_id: InputId,
191 pub source_event_id: RuntimeEventId,
193}
194
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Wraps `event` in an envelope with a fresh id, the current time, a fixed
    /// runtime id, and no causation/correlation ids.
    fn make_envelope(event: RuntimeEvent) -> RuntimeEventEnvelope {
        let id = RuntimeEventId::new();
        let timestamp = Utc::now();
        let runtime_id = LogicalRuntimeId::new("test-runtime");
        RuntimeEventEnvelope {
            id,
            timestamp,
            runtime_id,
            event,
            causation_id: None,
            correlation_id: None,
        }
    }

    /// Serializes `event` to a JSON value and parses it back, returning both
    /// the intermediate JSON and the re-parsed event.
    fn round_trip(event: &RuntimeEvent) -> (serde_json::Value, RuntimeEvent) {
        let json = serde_json::to_value(event).unwrap();
        let parsed = serde_json::from_value(json.clone()).unwrap();
        (json, parsed)
    }

    #[test]
    fn input_lifecycle_accepted_serde() {
        let accepted = InputLifecycleEvent::Accepted {
            input_id: InputId::new(),
        };
        let envelope = make_envelope(RuntimeEvent::InputLifecycle(accepted));

        let json = serde_json::to_value(&envelope).unwrap();
        assert_eq!(json["event"]["category"], "input_lifecycle");
        assert_eq!(json["event"]["data"]["code"], "accepted");

        let decoded: RuntimeEventEnvelope = serde_json::from_value(json).unwrap();
        assert!(matches!(
            decoded.event,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted { .. })
        ));
    }

    #[test]
    fn input_lifecycle_deduplicated_serde() {
        let original = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        });
        let (_, decoded) = round_trip(&original);
        assert!(matches!(
            decoded,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated { .. })
        ));
    }

    #[test]
    fn input_lifecycle_superseded_serde() {
        let original = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded {
            input_id: InputId::new(),
            superseded_by: InputId::new(),
        });
        let (_, decoded) = round_trip(&original);
        assert!(matches!(
            decoded,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded { .. })
        ));
    }

    #[test]
    fn input_lifecycle_coalesced_serde() {
        let original = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced {
            input_id: InputId::new(),
            aggregate_id: InputId::new(),
        });
        let (_, decoded) = round_trip(&original);
        assert!(matches!(
            decoded,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced { .. })
        ));
    }

    #[test]
    fn run_lifecycle_started_serde() {
        let original = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started {
            run_id: RunId::new(),
        });
        let (json, decoded) = round_trip(&original);
        assert_eq!(json["category"], "run_lifecycle");
        assert!(matches!(
            decoded,
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started { .. })
        ));
    }

    #[test]
    fn run_lifecycle_failed_serde() {
        let original = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
            run_id: RunId::new(),
            error: "timeout".into(),
            recoverable: true,
        });
        let (_, decoded) = round_trip(&original);
        // The `recoverable` flag must survive the round trip.
        assert!(matches!(
            decoded,
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
                recoverable: true,
                ..
            })
        ));
    }

    #[test]
    fn runtime_state_change_serde() {
        let original = RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
            from: RuntimeState::Idle,
            to: RuntimeState::Running,
        });
        let (_, decoded) = round_trip(&original);
        assert!(matches!(
            decoded,
            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                from: RuntimeState::Idle,
                to: RuntimeState::Running,
            })
        ));
    }

    #[test]
    fn topology_created_serde() {
        let original = RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated {
            runtime_id: LogicalRuntimeId::new("mob-agent-1"),
        });
        let (json, decoded) = round_trip(&original);
        assert_eq!(json["category"], "topology");
        assert!(matches!(
            decoded,
            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated { .. })
        ));
    }

    #[test]
    fn event_code_coverage() {
        // One representative event per top-level category.
        let samples = [
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
                input_id: InputId::new(),
            }),
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Completed {
                run_id: RunId::new(),
            }),
            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                from: RuntimeState::Idle,
                to: RuntimeState::Running,
            }),
            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeRetired {
                runtime_id: LogicalRuntimeId::new("x"),
            }),
            RuntimeEvent::Projection(RuntimeProjectionEvent {
                rule_id: "rule-1".into(),
                projected_input_id: InputId::new(),
                source_event_id: RuntimeEventId::new(),
            }),
        ];
        for sample in &samples {
            assert!(!sample.event_code().0.is_empty());
        }
    }
}