Skip to main content

meerkat_runtime/
runtime_event.rs

//! §7 RuntimeEvent — full event hierarchy for runtime observability.
//!
//! Five categories: InputLifecycle, RunLifecycle, RuntimeStateChange, Topology, Projection.
4
5use chrono::{DateTime, Utc};
6use meerkat_core::lifecycle::{InputId, RunId};
7use serde::{Deserialize, Serialize};
8
9use crate::identifiers::{
10    CausationId, CorrelationId, EventCodeId, LogicalRuntimeId, RuntimeEventId,
11};
12use crate::input_state::InputLifecycleState;
13use crate::runtime_state::RuntimeState;
14
15/// Envelope for all runtime events.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RuntimeEventEnvelope {
18    /// Unique event ID.
19    pub id: RuntimeEventId,
20    /// When the event occurred.
21    pub timestamp: DateTime<Utc>,
22    /// Which runtime emitted this event.
23    pub runtime_id: LogicalRuntimeId,
24    /// The event payload.
25    pub event: RuntimeEvent,
26    /// Causation chain.
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub causation_id: Option<CausationId>,
29    /// Correlation for cross-boundary tracing.
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub correlation_id: Option<CorrelationId>,
32}
33
34/// Runtime event categories and codes.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(tag = "category", content = "data", rename_all = "snake_case")]
37#[non_exhaustive]
38pub enum RuntimeEvent {
39    /// Input lifecycle state transitions.
40    InputLifecycle(InputLifecycleEvent),
41    /// Run lifecycle state transitions.
42    RunLifecycle(RunLifecycleEvent),
43    /// Runtime state transitions.
44    RuntimeStateChange(RuntimeStateChangeEvent),
45    /// Topology changes (agents joining/leaving).
46    Topology(RuntimeTopologyEvent),
47    /// Projection-derived events.
48    Projection(RuntimeProjectionEvent),
49}
50
51impl RuntimeEvent {
52    /// Get the stable event code for wire formats.
53    pub fn event_code(&self) -> EventCodeId {
54        match self {
55            RuntimeEvent::InputLifecycle(e) => e.event_code(),
56            RuntimeEvent::RunLifecycle(e) => e.event_code(),
57            RuntimeEvent::RuntimeStateChange(_) => EventCodeId::new("runtime.state_changed"),
58            RuntimeEvent::Topology(e) => e.event_code(),
59            RuntimeEvent::Projection(_) => EventCodeId::new("runtime.projection_emitted"),
60        }
61    }
62}
63
64/// Input lifecycle events.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(tag = "code", rename_all = "snake_case")]
67#[non_exhaustive]
68pub enum InputLifecycleEvent {
69    /// Input accepted into the system.
70    Accepted { input_id: InputId },
71    /// Input deduplicated (idempotency key matched).
72    Deduplicated {
73        input_id: InputId,
74        existing_id: InputId,
75    },
76    /// Input superseded by a newer input.
77    Superseded {
78        input_id: InputId,
79        superseded_by: InputId,
80    },
81    /// Input coalesced into an aggregate.
82    Coalesced {
83        input_id: InputId,
84        aggregate_id: InputId,
85    },
86    /// Input queued for processing.
87    Queued { input_id: InputId },
88    /// Input staged at a run boundary.
89    Staged { input_id: InputId, run_id: RunId },
90    /// Input applied (boundary primitive executed).
91    Applied { input_id: InputId, run_id: RunId },
92    /// Input consumed (run completed successfully).
93    Consumed { input_id: InputId, run_id: RunId },
94    /// Input abandoned (retire/reset/destroy).
95    Abandoned { input_id: InputId, reason: String },
96    /// Input state transitioned.
97    StateTransitioned {
98        input_id: InputId,
99        from: InputLifecycleState,
100        to: InputLifecycleState,
101    },
102}
103
104impl InputLifecycleEvent {
105    pub fn event_code(&self) -> EventCodeId {
106        match self {
107            Self::Accepted { .. } => EventCodeId::new("input.accepted"),
108            Self::Deduplicated { .. } => EventCodeId::new("input.deduplicated"),
109            Self::Superseded { .. } => EventCodeId::new("input.superseded"),
110            Self::Coalesced { .. } => EventCodeId::new("input.coalesced"),
111            Self::Queued { .. } => EventCodeId::new("input.queued"),
112            Self::Staged { .. } => EventCodeId::new("input.staged"),
113            Self::Applied { .. } => EventCodeId::new("input.applied"),
114            Self::Consumed { .. } => EventCodeId::new("input.consumed"),
115            Self::Abandoned { .. } => EventCodeId::new("input.abandoned"),
116            Self::StateTransitioned { .. } => EventCodeId::new("input.state_transitioned"),
117        }
118    }
119}
120
121/// Run lifecycle events.
122#[derive(Debug, Clone, Serialize, Deserialize)]
123#[serde(tag = "code", rename_all = "snake_case")]
124#[non_exhaustive]
125pub enum RunLifecycleEvent {
126    /// A run started.
127    Started { run_id: RunId },
128    /// A run completed successfully.
129    Completed { run_id: RunId },
130    /// A run failed.
131    Failed {
132        run_id: RunId,
133        error: String,
134        recoverable: bool,
135    },
136    /// A run was cancelled.
137    Cancelled { run_id: RunId },
138}
139
140impl RunLifecycleEvent {
141    pub fn event_code(&self) -> EventCodeId {
142        match self {
143            Self::Started { .. } => EventCodeId::new("run.started"),
144            Self::Completed { .. } => EventCodeId::new("run.completed"),
145            Self::Failed { .. } => EventCodeId::new("run.failed"),
146            Self::Cancelled { .. } => EventCodeId::new("run.cancelled"),
147        }
148    }
149}
150
151/// Runtime state change events.
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct RuntimeStateChangeEvent {
154    pub from: RuntimeState,
155    pub to: RuntimeState,
156}
157
158/// Topology events (agents joining/leaving the runtime).
159#[derive(Debug, Clone, Serialize, Deserialize)]
160#[serde(tag = "code", rename_all = "snake_case")]
161#[non_exhaustive]
162pub enum RuntimeTopologyEvent {
163    /// A runtime instance was created.
164    RuntimeCreated { runtime_id: LogicalRuntimeId },
165    /// A runtime instance was retired.
166    RuntimeRetired { runtime_id: LogicalRuntimeId },
167    /// A runtime instance was respawned.
168    RuntimeRespawned { runtime_id: LogicalRuntimeId },
169    /// A runtime instance was destroyed.
170    RuntimeDestroyed { runtime_id: LogicalRuntimeId },
171}
172
173impl RuntimeTopologyEvent {
174    pub fn event_code(&self) -> EventCodeId {
175        match self {
176            Self::RuntimeCreated { .. } => EventCodeId::new("topology.runtime_created"),
177            Self::RuntimeRetired { .. } => EventCodeId::new("topology.runtime_retired"),
178            Self::RuntimeRespawned { .. } => EventCodeId::new("topology.runtime_respawned"),
179            Self::RuntimeDestroyed { .. } => EventCodeId::new("topology.runtime_destroyed"),
180        }
181    }
182}
183
184/// Projection-derived events.
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct RuntimeProjectionEvent {
187    /// The projection rule that generated this event.
188    pub rule_id: String,
189    /// The projected input ID.
190    pub projected_input_id: InputId,
191    /// Source event that triggered the projection.
192    pub source_event_id: RuntimeEventId,
193}
194
#[cfg(test)]
// Tests unwrap freely: a failed (de)serialization should abort the test with a panic.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Wraps `event` in an envelope with a fresh ID, the current time, a fixed
    /// test runtime ID, and no tracing identifiers.
    fn make_envelope(event: RuntimeEvent) -> RuntimeEventEnvelope {
        RuntimeEventEnvelope {
            id: RuntimeEventId::new(),
            timestamp: Utc::now(),
            runtime_id: LogicalRuntimeId::new("test-runtime"),
            event,
            causation_id: None,
            correlation_id: None,
        }
    }

    /// Serializes `event` to JSON, asserts its `category` tag, and parses the
    /// JSON back into a `RuntimeEvent`.
    fn round_trip(event: &RuntimeEvent, category: &str) -> RuntimeEvent {
        let json = serde_json::to_value(event).unwrap();
        assert_eq!(json["category"], category);
        serde_json::from_value(json).unwrap()
    }

    #[test]
    fn input_lifecycle_accepted_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
            input_id: InputId::new(),
        });
        let envelope = make_envelope(event);
        let json = serde_json::to_value(&envelope).unwrap();
        assert_eq!(json["event"]["category"], "input_lifecycle");
        assert_eq!(json["event"]["data"]["code"], "accepted");
        // `None` tracing identifiers must be left out, not emitted as `null`.
        assert!(json.get("causation_id").is_none());
        assert!(json.get("correlation_id").is_none());
        let parsed: RuntimeEventEnvelope = serde_json::from_value(json).unwrap();
        assert!(matches!(
            parsed.event,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted { .. })
        ));
        assert!(parsed.causation_id.is_none());
        assert!(parsed.correlation_id.is_none());
    }

    #[test]
    fn input_lifecycle_deduplicated_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        });
        let parsed = round_trip(&event, "input_lifecycle");
        assert!(matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated { .. })
        ));
    }

    #[test]
    fn input_lifecycle_superseded_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded {
            input_id: InputId::new(),
            superseded_by: InputId::new(),
        });
        let parsed = round_trip(&event, "input_lifecycle");
        assert!(matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded { .. })
        ));
    }

    #[test]
    fn input_lifecycle_coalesced_serde() {
        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced {
            input_id: InputId::new(),
            aggregate_id: InputId::new(),
        });
        let parsed = round_trip(&event, "input_lifecycle");
        assert!(matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced { .. })
        ));
    }

    #[test]
    fn run_lifecycle_started_serde() {
        let event = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started {
            run_id: RunId::new(),
        });
        let parsed = round_trip(&event, "run_lifecycle");
        assert!(matches!(
            parsed,
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started { .. })
        ));
    }

    #[test]
    fn run_lifecycle_failed_serde() {
        let event = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
            run_id: RunId::new(),
            error: "timeout".into(),
            recoverable: true,
        });
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["category"], "run_lifecycle");
        assert_eq!(json["data"]["code"], "failed");
        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
        // Check the payload survived, not just the variant.
        match parsed {
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
                error, recoverable, ..
            }) => {
                assert_eq!(error, "timeout");
                assert!(recoverable);
            }
            other => panic!("unexpected event after round trip: {:?}", other),
        }
    }

    #[test]
    fn runtime_state_change_serde() {
        let event = RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
            from: RuntimeState::Idle,
            to: RuntimeState::Running,
        });
        let parsed = round_trip(&event, "runtime_state_change");
        assert!(matches!(
            parsed,
            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                from: RuntimeState::Idle,
                to: RuntimeState::Running,
            })
        ));
    }

    #[test]
    fn topology_created_serde() {
        let event = RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated {
            runtime_id: LogicalRuntimeId::new("mob-agent-1"),
        });
        let parsed = round_trip(&event, "topology");
        assert!(matches!(
            parsed,
            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated { .. })
        ));
    }

    #[test]
    fn projection_serde() {
        let event = RuntimeEvent::Projection(RuntimeProjectionEvent {
            rule_id: "rule-1".into(),
            projected_input_id: InputId::new(),
            source_event_id: RuntimeEventId::new(),
        });
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["category"], "projection");
        assert_eq!(json["data"]["rule_id"], "rule-1");
        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
        match parsed {
            RuntimeEvent::Projection(projection) => assert_eq!(projection.rule_id, "rule-1"),
            other => panic!("unexpected event after round trip: {:?}", other),
        }
    }

    #[test]
    fn event_code_coverage() {
        // Pin the exact wire codes: they are a stable contract, so asserting
        // only non-emptiness would let a swapped or misspelled code through.
        let cases = vec![
            (
                RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
                    input_id: InputId::new(),
                }),
                "input.accepted",
            ),
            (
                RuntimeEvent::InputLifecycle(InputLifecycleEvent::Abandoned {
                    input_id: InputId::new(),
                    reason: "reset".into(),
                }),
                "input.abandoned",
            ),
            (
                RuntimeEvent::RunLifecycle(RunLifecycleEvent::Completed {
                    run_id: RunId::new(),
                }),
                "run.completed",
            ),
            (
                RuntimeEvent::RunLifecycle(RunLifecycleEvent::Cancelled {
                    run_id: RunId::new(),
                }),
                "run.cancelled",
            ),
            (
                RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                    from: RuntimeState::Idle,
                    to: RuntimeState::Running,
                }),
                "runtime.state_changed",
            ),
            (
                RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeRetired {
                    runtime_id: LogicalRuntimeId::new("x"),
                }),
                "topology.runtime_retired",
            ),
            (
                RuntimeEvent::Projection(RuntimeProjectionEvent {
                    rule_id: "rule-1".into(),
                    projected_input_id: InputId::new(),
                    source_event_id: RuntimeEventId::new(),
                }),
                "runtime.projection_emitted",
            ),
        ];
        for (event, expected) in &cases {
            let code = event.event_code();
            assert!(!code.0.is_empty());
            assert_eq!(&*code.0, *expected);
        }
    }
}